feat: ability to configure multiple tx broadcast endpoints #765

This commit is contained in:
Matthew Little
2021-09-20 21:50:06 +02:00
committed by GitHub
parent 101922bc84
commit 8a9222a3cb
5 changed files with 251 additions and 1 deletions

4
.env
View File

@@ -29,6 +29,10 @@ STACKS_CORE_RPC_PORT=20443
# STACKS_CORE_PROXY_HOST=127.0.0.1
# STACKS_CORE_PROXY_PORT=20443
# Configure a path to a file containing additional stacks-node `POST /v2/tranascation` URLs for the /v2 proxy to mutlicast.
# The file should be a newline-delimited list of URLs.
# STACKS_API_EXTRA_TX_ENDPOINTS_FILE=./config/extra-tx-post-endpoints.txt
# STACKS_FAUCET_NODE_HOST=<IP or hostname>
# STACKS_FAUCET_NODE_PORT=<port number>

View File

@@ -0,0 +1,4 @@
http://another-node-a/v2/transactions
http://another-node-b/v2/transactions
# http://another-node-c/v2/transactions
# http://another-node-d/v2/transactions

View File

@@ -1,12 +1,14 @@
import * as express from 'express';
import * as cors from 'cors';
import { createProxyMiddleware, Options } from 'http-proxy-middleware';
import { logger, parsePort } from '../../helpers';
import { logError, logger, parsePort, pipelineAsync, REPO_DIR } from '../../helpers';
import { Agent } from 'http';
import * as fs from 'fs';
import * as path from 'path';
import { addAsync } from '@awaitjs/express';
import * as chokidar from 'chokidar';
import * as jsoncParser from 'jsonc-parser';
import fetch, { RequestInit } from 'node-fetch';
export function GetStacksNodeProxyEndpoint() {
// Use STACKS_CORE_PROXY env vars if available, otherwise fallback to `STACKS_CORE_RPC
@@ -79,6 +81,132 @@ export function createCoreNodeRpcProxyRouter(): express.Router {
return null;
};
/**
* Check for any extra endpoints that have been configured for performing a "multicast" for a tx submission.
*/
async function getExtraTxPostEndpoints(): Promise<string[] | false> {
const STACKS_API_EXTRA_TX_ENDPOINTS_FILE_ENV_VAR = 'STACKS_API_EXTRA_TX_ENDPOINTS_FILE';
const extraEndpointsEnvVar = process.env[STACKS_API_EXTRA_TX_ENDPOINTS_FILE_ENV_VAR];
if (!extraEndpointsEnvVar) {
return false;
}
const filePath = path.resolve(REPO_DIR, extraEndpointsEnvVar);
let fileContents: string;
try {
fileContents = await fs.promises.readFile(filePath, { encoding: 'utf8' });
} catch (error) {
logError(`Error reading ${STACKS_API_EXTRA_TX_ENDPOINTS_FILE_ENV_VAR}: ${error}`, error);
return false;
}
const endpoints = fileContents
.split(/\r?\n/)
.map(r => r.trim())
.filter(r => !r.startsWith('#') && r.length !== 0);
if (endpoints.length === 0) {
return false;
}
return endpoints;
}
/**
* Reads an http request stream into a Buffer.
*/
async function readRequestBody(req: express.Request, maxSizeBytes = Infinity): Promise<Buffer> {
return new Promise((resolve, reject) => {
let resultBuffer: Buffer = Buffer.alloc(0);
req.on('data', chunk => {
if (!Buffer.isBuffer(chunk)) {
reject(
new Error(
`Expected request body chunks to be Buffer, received ${chunk.constructor.name}`
)
);
req.destroy();
return;
}
resultBuffer = resultBuffer.length === 0 ? chunk : Buffer.concat([resultBuffer, chunk]);
if (resultBuffer.byteLength >= maxSizeBytes) {
reject(new Error(`Request body exceeded max byte size`));
req.destroy();
return;
}
});
req.on('end', () => {
if (!req.complete) {
return reject(
new Error('The connection was terminated while the message was still being sent')
);
}
resolve(resultBuffer);
});
req.on('error', error => reject(error));
});
}
router.postAsync('/transactions', async (req, res, next) => {
const extraEndpoints = await getExtraTxPostEndpoints();
if (!extraEndpoints) {
next();
return;
}
const endpoints = [
// The primary proxy endpoint (the http response from this one will be returned to the client)
`http://${stacksNodeRpcEndpoint}/v2/transactions`,
];
endpoints.push(...extraEndpoints);
logger.info(`Overriding POST /v2/transactions to multicast to ${endpoints.join(',')}}`);
const maxBodySize = 10_000_000; // 10 MB max POST body size
const reqBody = await readRequestBody(req, maxBodySize);
const reqHeaders: string[][] = [];
for (let i = 0; i < req.rawHeaders.length; i += 2) {
reqHeaders.push([req.rawHeaders[i], req.rawHeaders[i + 1]]);
}
const postFn = async (endpoint: string) => {
const reqOpts: RequestInit = {
method: 'POST',
agent: httpAgent,
body: reqBody,
headers: reqHeaders,
};
const proxyResult = await fetch(endpoint, reqOpts);
return proxyResult;
};
// Here's were we "multicast" the `/v2/transaction` POST, by concurrently sending the http request to all configured endpoints.
const results = await Promise.allSettled(endpoints.map(endpoint => postFn(endpoint)));
// Only the first (non-extra) endpoint http response is proxied back through to the client, so ensure any errors from requests
// to the extra endpoints are logged.
results.slice(1).forEach(p => {
if (p.status === 'rejected') {
logError(`Error during POST /v2/transaction to extra endpoint: ${p.reason}`, p.reason);
} else {
if (!p.value.ok) {
logError(
`Response ${p.value.status} during POST /v2/transaction to extra endpoint ${p.value.url}`
);
}
}
});
// Proxy the result of the (non-extra) http response back to the client.
const mainResult = results[0];
if (mainResult.status === 'rejected') {
logError(
`Error in primary POST /v2/transaction proxy: ${mainResult.reason}`,
mainResult.reason
);
res.status(500).json({ error: mainResult.reason });
} else {
const proxyResp = mainResult.value;
res.status(proxyResp.status);
proxyResp.headers.forEach((value, name) => {
res.setHeader(name, value);
});
await pipelineAsync(proxyResp.body, res);
}
});
const proxyOptions: Options = {
agent: httpAgent,
target: `http://${stacksNodeRpcEndpoint}`,

View File

@@ -37,3 +37,32 @@ export async function useWithCleanup<T extends [...Disposable<any>[]]>(
}
}
}
export type TestEnvVar = [EnvVarKey: string, EnvVarValue: string];
/**
* Helper function for tests.
* Sets local process environment variables, and returns a function that restores them to the original values.
*/
export function withEnvVars(...envVars: TestEnvVar[]) {
const original: { exists: boolean; key: string; value: string | undefined }[] = [];
envVars.forEach(([k, v]) => {
original.push({
exists: k in process.env,
key: k,
value: v,
});
});
envVars.forEach(([k, v]) => {
process.env[k] = v;
});
return () => {
original.forEach(orig => {
if (!orig.exists) {
delete process.env[orig.key];
} else {
process.env[orig.key] = orig.value;
}
});
};
}

View File

@@ -0,0 +1,85 @@
import * as supertest from 'supertest';
import { ChainID } from '@stacks/transactions';
import { startApiServer } from '../api/init';
import { PgDataStore, cycleMigrations, runMigrations } from '../datastore/postgres-store';
import { PoolClient } from 'pg';
import { useWithCleanup, withEnvVars } from './test-helpers';
import * as fs from 'fs';
import * as path from 'path';
import * as os from 'os';
import * as nock from 'nock';
describe('v2-proxy tests', () => {
let db: PgDataStore;
let client: PoolClient;
beforeEach(async () => {
process.env.PG_DATABASE = 'postgres';
await cycleMigrations();
db = await PgDataStore.connect();
client = await db.pool.connect();
});
test('tx post multicast', async () => {
const primaryProxyEndpoint = 'proxy-stacks-node:12345';
const extraTxEndpoint = 'http://extra-tx-endpoint-a/test';
await useWithCleanup(
() => {
const restoreEnvVars = withEnvVars(
['STACKS_CORE_PROXY_HOST', primaryProxyEndpoint.split(':')[0]],
['STACKS_CORE_PROXY_PORT', primaryProxyEndpoint.split(':')[1]]
);
return [, () => restoreEnvVars()] as const;
},
() => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'stacks-api-unit-test-'));
const extraEndpointsFilePath = path.join(tempDir, 'extra-tx-endpoints.txt');
fs.writeFileSync(extraEndpointsFilePath, extraTxEndpoint, { flag: 'w' });
const restoreEnvVars = withEnvVars([
'STACKS_API_EXTRA_TX_ENDPOINTS_FILE',
extraEndpointsFilePath,
]);
return [, () => restoreEnvVars()] as const;
},
async () => {
const apiServer = await startApiServer({
datastore: db,
chainId: ChainID.Mainnet,
httpLogLevel: 'debug',
});
return [apiServer, apiServer.terminate] as const;
},
async (_, __, api) => {
const primaryStubbedResponse = 'success stubbed response';
const extraStubbedResponse = 'extra success stubbed response';
const testRequest = 'fake-tx-data';
let mockedRequestBody = 'none';
nock(`http://${primaryProxyEndpoint}`)
.post('/v2/transactions', testRequest)
.once()
.reply(200, primaryStubbedResponse);
nock(extraTxEndpoint)
.post(() => true, testRequest)
.once()
.reply(200, (_url, body, cb) => {
// the "extra" endpoint responses are logged internally and not sent back to the client, so use this mock callback to
// test that this endpoint was called correctly
mockedRequestBody = body as string;
cb(null, extraStubbedResponse);
});
const postTxReq = await supertest(api.server).post(`/v2/transactions`).send(testRequest);
// test that main endpoint response was returned
expect(postTxReq.status).toBe(200);
expect(postTxReq.text).toBe(primaryStubbedResponse);
// test that the extra endpoint was queried
expect(mockedRequestBody).toBe(testRequest);
}
);
});
afterEach(async () => {
client.release();
await db?.close();
await runMigrations(undefined, 'down');
});
});