mirror of
https://github.com/alexgo-io/stacks-blockchain-api.git
synced 2026-01-12 22:43:34 +08:00
feat: ability to configure multiple tx broadcast endpoints #765
This commit is contained in:
4
.env
4
.env
@@ -29,6 +29,10 @@ STACKS_CORE_RPC_PORT=20443
|
||||
# STACKS_CORE_PROXY_HOST=127.0.0.1
|
||||
# STACKS_CORE_PROXY_PORT=20443
|
||||
|
||||
# Configure a path to a file containing additional stacks-node `POST /v2/tranascation` URLs for the /v2 proxy to mutlicast.
|
||||
# The file should be a newline-delimited list of URLs.
|
||||
# STACKS_API_EXTRA_TX_ENDPOINTS_FILE=./config/extra-tx-post-endpoints.txt
|
||||
|
||||
# STACKS_FAUCET_NODE_HOST=<IP or hostname>
|
||||
# STACKS_FAUCET_NODE_PORT=<port number>
|
||||
|
||||
|
||||
4
config/extra-tx-post-endpoints.txt
Normal file
4
config/extra-tx-post-endpoints.txt
Normal file
@@ -0,0 +1,4 @@
|
||||
http://another-node-a/v2/transactions
|
||||
http://another-node-b/v2/transactions
|
||||
# http://another-node-c/v2/transactions
|
||||
# http://another-node-d/v2/transactions
|
||||
@@ -1,12 +1,14 @@
|
||||
import * as express from 'express';
|
||||
import * as cors from 'cors';
|
||||
import { createProxyMiddleware, Options } from 'http-proxy-middleware';
|
||||
import { logger, parsePort } from '../../helpers';
|
||||
import { logError, logger, parsePort, pipelineAsync, REPO_DIR } from '../../helpers';
|
||||
import { Agent } from 'http';
|
||||
import * as fs from 'fs';
|
||||
import * as path from 'path';
|
||||
import { addAsync } from '@awaitjs/express';
|
||||
import * as chokidar from 'chokidar';
|
||||
import * as jsoncParser from 'jsonc-parser';
|
||||
import fetch, { RequestInit } from 'node-fetch';
|
||||
|
||||
export function GetStacksNodeProxyEndpoint() {
|
||||
// Use STACKS_CORE_PROXY env vars if available, otherwise fallback to `STACKS_CORE_RPC
|
||||
@@ -79,6 +81,132 @@ export function createCoreNodeRpcProxyRouter(): express.Router {
|
||||
return null;
|
||||
};
|
||||
|
||||
/**
|
||||
* Check for any extra endpoints that have been configured for performing a "multicast" for a tx submission.
|
||||
*/
|
||||
async function getExtraTxPostEndpoints(): Promise<string[] | false> {
|
||||
const STACKS_API_EXTRA_TX_ENDPOINTS_FILE_ENV_VAR = 'STACKS_API_EXTRA_TX_ENDPOINTS_FILE';
|
||||
const extraEndpointsEnvVar = process.env[STACKS_API_EXTRA_TX_ENDPOINTS_FILE_ENV_VAR];
|
||||
if (!extraEndpointsEnvVar) {
|
||||
return false;
|
||||
}
|
||||
const filePath = path.resolve(REPO_DIR, extraEndpointsEnvVar);
|
||||
let fileContents: string;
|
||||
try {
|
||||
fileContents = await fs.promises.readFile(filePath, { encoding: 'utf8' });
|
||||
} catch (error) {
|
||||
logError(`Error reading ${STACKS_API_EXTRA_TX_ENDPOINTS_FILE_ENV_VAR}: ${error}`, error);
|
||||
return false;
|
||||
}
|
||||
const endpoints = fileContents
|
||||
.split(/\r?\n/)
|
||||
.map(r => r.trim())
|
||||
.filter(r => !r.startsWith('#') && r.length !== 0);
|
||||
if (endpoints.length === 0) {
|
||||
return false;
|
||||
}
|
||||
return endpoints;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads an http request stream into a Buffer.
|
||||
*/
|
||||
async function readRequestBody(req: express.Request, maxSizeBytes = Infinity): Promise<Buffer> {
|
||||
return new Promise((resolve, reject) => {
|
||||
let resultBuffer: Buffer = Buffer.alloc(0);
|
||||
req.on('data', chunk => {
|
||||
if (!Buffer.isBuffer(chunk)) {
|
||||
reject(
|
||||
new Error(
|
||||
`Expected request body chunks to be Buffer, received ${chunk.constructor.name}`
|
||||
)
|
||||
);
|
||||
req.destroy();
|
||||
return;
|
||||
}
|
||||
resultBuffer = resultBuffer.length === 0 ? chunk : Buffer.concat([resultBuffer, chunk]);
|
||||
if (resultBuffer.byteLength >= maxSizeBytes) {
|
||||
reject(new Error(`Request body exceeded max byte size`));
|
||||
req.destroy();
|
||||
return;
|
||||
}
|
||||
});
|
||||
req.on('end', () => {
|
||||
if (!req.complete) {
|
||||
return reject(
|
||||
new Error('The connection was terminated while the message was still being sent')
|
||||
);
|
||||
}
|
||||
resolve(resultBuffer);
|
||||
});
|
||||
req.on('error', error => reject(error));
|
||||
});
|
||||
}
|
||||
|
||||
router.postAsync('/transactions', async (req, res, next) => {
|
||||
const extraEndpoints = await getExtraTxPostEndpoints();
|
||||
if (!extraEndpoints) {
|
||||
next();
|
||||
return;
|
||||
}
|
||||
const endpoints = [
|
||||
// The primary proxy endpoint (the http response from this one will be returned to the client)
|
||||
`http://${stacksNodeRpcEndpoint}/v2/transactions`,
|
||||
];
|
||||
endpoints.push(...extraEndpoints);
|
||||
logger.info(`Overriding POST /v2/transactions to multicast to ${endpoints.join(',')}}`);
|
||||
const maxBodySize = 10_000_000; // 10 MB max POST body size
|
||||
const reqBody = await readRequestBody(req, maxBodySize);
|
||||
const reqHeaders: string[][] = [];
|
||||
for (let i = 0; i < req.rawHeaders.length; i += 2) {
|
||||
reqHeaders.push([req.rawHeaders[i], req.rawHeaders[i + 1]]);
|
||||
}
|
||||
const postFn = async (endpoint: string) => {
|
||||
const reqOpts: RequestInit = {
|
||||
method: 'POST',
|
||||
agent: httpAgent,
|
||||
body: reqBody,
|
||||
headers: reqHeaders,
|
||||
};
|
||||
const proxyResult = await fetch(endpoint, reqOpts);
|
||||
return proxyResult;
|
||||
};
|
||||
|
||||
// Here's were we "multicast" the `/v2/transaction` POST, by concurrently sending the http request to all configured endpoints.
|
||||
const results = await Promise.allSettled(endpoints.map(endpoint => postFn(endpoint)));
|
||||
|
||||
// Only the first (non-extra) endpoint http response is proxied back through to the client, so ensure any errors from requests
|
||||
// to the extra endpoints are logged.
|
||||
results.slice(1).forEach(p => {
|
||||
if (p.status === 'rejected') {
|
||||
logError(`Error during POST /v2/transaction to extra endpoint: ${p.reason}`, p.reason);
|
||||
} else {
|
||||
if (!p.value.ok) {
|
||||
logError(
|
||||
`Response ${p.value.status} during POST /v2/transaction to extra endpoint ${p.value.url}`
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Proxy the result of the (non-extra) http response back to the client.
|
||||
const mainResult = results[0];
|
||||
if (mainResult.status === 'rejected') {
|
||||
logError(
|
||||
`Error in primary POST /v2/transaction proxy: ${mainResult.reason}`,
|
||||
mainResult.reason
|
||||
);
|
||||
res.status(500).json({ error: mainResult.reason });
|
||||
} else {
|
||||
const proxyResp = mainResult.value;
|
||||
res.status(proxyResp.status);
|
||||
proxyResp.headers.forEach((value, name) => {
|
||||
res.setHeader(name, value);
|
||||
});
|
||||
await pipelineAsync(proxyResp.body, res);
|
||||
}
|
||||
});
|
||||
|
||||
const proxyOptions: Options = {
|
||||
agent: httpAgent,
|
||||
target: `http://${stacksNodeRpcEndpoint}`,
|
||||
|
||||
@@ -37,3 +37,32 @@ export async function useWithCleanup<T extends [...Disposable<any>[]]>(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export type TestEnvVar = [EnvVarKey: string, EnvVarValue: string];
|
||||
|
||||
/**
|
||||
* Helper function for tests.
|
||||
* Sets local process environment variables, and returns a function that restores them to the original values.
|
||||
*/
|
||||
export function withEnvVars(...envVars: TestEnvVar[]) {
|
||||
const original: { exists: boolean; key: string; value: string | undefined }[] = [];
|
||||
envVars.forEach(([k, v]) => {
|
||||
original.push({
|
||||
exists: k in process.env,
|
||||
key: k,
|
||||
value: v,
|
||||
});
|
||||
});
|
||||
envVars.forEach(([k, v]) => {
|
||||
process.env[k] = v;
|
||||
});
|
||||
return () => {
|
||||
original.forEach(orig => {
|
||||
if (!orig.exists) {
|
||||
delete process.env[orig.key];
|
||||
} else {
|
||||
process.env[orig.key] = orig.value;
|
||||
}
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
85
src/tests/v2-proxy-tests.ts
Normal file
85
src/tests/v2-proxy-tests.ts
Normal file
@@ -0,0 +1,85 @@
|
||||
import * as supertest from 'supertest';
|
||||
import { ChainID } from '@stacks/transactions';
|
||||
import { startApiServer } from '../api/init';
|
||||
import { PgDataStore, cycleMigrations, runMigrations } from '../datastore/postgres-store';
|
||||
import { PoolClient } from 'pg';
|
||||
import { useWithCleanup, withEnvVars } from './test-helpers';
|
||||
import * as fs from 'fs';
|
||||
import * as path from 'path';
|
||||
import * as os from 'os';
|
||||
import * as nock from 'nock';
|
||||
|
||||
describe('v2-proxy tests', () => {
|
||||
let db: PgDataStore;
|
||||
let client: PoolClient;
|
||||
|
||||
beforeEach(async () => {
|
||||
process.env.PG_DATABASE = 'postgres';
|
||||
await cycleMigrations();
|
||||
db = await PgDataStore.connect();
|
||||
client = await db.pool.connect();
|
||||
});
|
||||
|
||||
test('tx post multicast', async () => {
|
||||
const primaryProxyEndpoint = 'proxy-stacks-node:12345';
|
||||
const extraTxEndpoint = 'http://extra-tx-endpoint-a/test';
|
||||
await useWithCleanup(
|
||||
() => {
|
||||
const restoreEnvVars = withEnvVars(
|
||||
['STACKS_CORE_PROXY_HOST', primaryProxyEndpoint.split(':')[0]],
|
||||
['STACKS_CORE_PROXY_PORT', primaryProxyEndpoint.split(':')[1]]
|
||||
);
|
||||
return [, () => restoreEnvVars()] as const;
|
||||
},
|
||||
() => {
|
||||
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'stacks-api-unit-test-'));
|
||||
const extraEndpointsFilePath = path.join(tempDir, 'extra-tx-endpoints.txt');
|
||||
fs.writeFileSync(extraEndpointsFilePath, extraTxEndpoint, { flag: 'w' });
|
||||
const restoreEnvVars = withEnvVars([
|
||||
'STACKS_API_EXTRA_TX_ENDPOINTS_FILE',
|
||||
extraEndpointsFilePath,
|
||||
]);
|
||||
return [, () => restoreEnvVars()] as const;
|
||||
},
|
||||
async () => {
|
||||
const apiServer = await startApiServer({
|
||||
datastore: db,
|
||||
chainId: ChainID.Mainnet,
|
||||
httpLogLevel: 'debug',
|
||||
});
|
||||
return [apiServer, apiServer.terminate] as const;
|
||||
},
|
||||
async (_, __, api) => {
|
||||
const primaryStubbedResponse = 'success stubbed response';
|
||||
const extraStubbedResponse = 'extra success stubbed response';
|
||||
const testRequest = 'fake-tx-data';
|
||||
let mockedRequestBody = 'none';
|
||||
nock(`http://${primaryProxyEndpoint}`)
|
||||
.post('/v2/transactions', testRequest)
|
||||
.once()
|
||||
.reply(200, primaryStubbedResponse);
|
||||
nock(extraTxEndpoint)
|
||||
.post(() => true, testRequest)
|
||||
.once()
|
||||
.reply(200, (_url, body, cb) => {
|
||||
// the "extra" endpoint responses are logged internally and not sent back to the client, so use this mock callback to
|
||||
// test that this endpoint was called correctly
|
||||
mockedRequestBody = body as string;
|
||||
cb(null, extraStubbedResponse);
|
||||
});
|
||||
const postTxReq = await supertest(api.server).post(`/v2/transactions`).send(testRequest);
|
||||
// test that main endpoint response was returned
|
||||
expect(postTxReq.status).toBe(200);
|
||||
expect(postTxReq.text).toBe(primaryStubbedResponse);
|
||||
// test that the extra endpoint was queried
|
||||
expect(mockedRequestBody).toBe(testRequest);
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
client.release();
|
||||
await db?.close();
|
||||
await runMigrations(undefined, 'down');
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user