From 8a9222a3cb6ba47ee2c90473e34f433b88e73572 Mon Sep 17 00:00:00 2001 From: Matthew Little Date: Mon, 20 Sep 2021 21:50:06 +0200 Subject: [PATCH] feat: ability to configure multiple tx broadcast endpoints #765 --- .env | 4 + config/extra-tx-post-endpoints.txt | 4 + src/api/routes/core-node-rpc-proxy.ts | 130 +++++++++++++++++++++++++- src/tests/test-helpers.ts | 29 ++++++ src/tests/v2-proxy-tests.ts | 85 +++++++++++++++++ 5 files changed, 251 insertions(+), 1 deletion(-) create mode 100644 config/extra-tx-post-endpoints.txt create mode 100644 src/tests/v2-proxy-tests.ts diff --git a/.env b/.env index c7a915b6..9371d238 100644 --- a/.env +++ b/.env @@ -29,6 +29,10 @@ STACKS_CORE_RPC_PORT=20443 # STACKS_CORE_PROXY_HOST=127.0.0.1 # STACKS_CORE_PROXY_PORT=20443 +# Configure a path to a file containing additional stacks-node `POST /v2/tranascation` URLs for the /v2 proxy to mutlicast. +# The file should be a newline-delimited list of URLs. +# STACKS_API_EXTRA_TX_ENDPOINTS_FILE=./config/extra-tx-post-endpoints.txt + # STACKS_FAUCET_NODE_HOST= # STACKS_FAUCET_NODE_PORT= diff --git a/config/extra-tx-post-endpoints.txt b/config/extra-tx-post-endpoints.txt new file mode 100644 index 00000000..b367ee25 --- /dev/null +++ b/config/extra-tx-post-endpoints.txt @@ -0,0 +1,4 @@ +http://another-node-a/v2/transactions +http://another-node-b/v2/transactions +# http://another-node-c/v2/transactions +# http://another-node-d/v2/transactions diff --git a/src/api/routes/core-node-rpc-proxy.ts b/src/api/routes/core-node-rpc-proxy.ts index 092d31d2..f74b3a17 100644 --- a/src/api/routes/core-node-rpc-proxy.ts +++ b/src/api/routes/core-node-rpc-proxy.ts @@ -1,12 +1,14 @@ import * as express from 'express'; import * as cors from 'cors'; import { createProxyMiddleware, Options } from 'http-proxy-middleware'; -import { logger, parsePort } from '../../helpers'; +import { logError, logger, parsePort, pipelineAsync, REPO_DIR } from '../../helpers'; import { Agent } from 'http'; import * as fs from 'fs'; +import * as path from 'path'; import { addAsync } from '@awaitjs/express'; import * as chokidar from 'chokidar'; import * as jsoncParser from 'jsonc-parser'; +import fetch, { RequestInit } from 'node-fetch'; export function GetStacksNodeProxyEndpoint() { // Use STACKS_CORE_PROXY env vars if available, otherwise fallback to `STACKS_CORE_RPC @@ -79,6 +81,132 @@ export function createCoreNodeRpcProxyRouter(): express.Router { return null; }; + /** + * Check for any extra endpoints that have been configured for performing a "multicast" for a tx submission. + */ + async function getExtraTxPostEndpoints(): Promise { + const STACKS_API_EXTRA_TX_ENDPOINTS_FILE_ENV_VAR = 'STACKS_API_EXTRA_TX_ENDPOINTS_FILE'; + const extraEndpointsEnvVar = process.env[STACKS_API_EXTRA_TX_ENDPOINTS_FILE_ENV_VAR]; + if (!extraEndpointsEnvVar) { + return false; + } + const filePath = path.resolve(REPO_DIR, extraEndpointsEnvVar); + let fileContents: string; + try { + fileContents = await fs.promises.readFile(filePath, { encoding: 'utf8' }); + } catch (error) { + logError(`Error reading ${STACKS_API_EXTRA_TX_ENDPOINTS_FILE_ENV_VAR}: ${error}`, error); + return false; + } + const endpoints = fileContents + .split(/\r?\n/) + .map(r => r.trim()) + .filter(r => !r.startsWith('#') && r.length !== 0); + if (endpoints.length === 0) { + return false; + } + return endpoints; + } + + /** + * Reads an http request stream into a Buffer. + */ + async function readRequestBody(req: express.Request, maxSizeBytes = Infinity): Promise { + return new Promise((resolve, reject) => { + let resultBuffer: Buffer = Buffer.alloc(0); + req.on('data', chunk => { + if (!Buffer.isBuffer(chunk)) { + reject( + new Error( + `Expected request body chunks to be Buffer, received ${chunk.constructor.name}` + ) + ); + req.destroy(); + return; + } + resultBuffer = resultBuffer.length === 0 ? chunk : Buffer.concat([resultBuffer, chunk]); + if (resultBuffer.byteLength >= maxSizeBytes) { + reject(new Error(`Request body exceeded max byte size`)); + req.destroy(); + return; + } + }); + req.on('end', () => { + if (!req.complete) { + return reject( + new Error('The connection was terminated while the message was still being sent') + ); + } + resolve(resultBuffer); + }); + req.on('error', error => reject(error)); + }); + } + + router.postAsync('/transactions', async (req, res, next) => { + const extraEndpoints = await getExtraTxPostEndpoints(); + if (!extraEndpoints) { + next(); + return; + } + const endpoints = [ + // The primary proxy endpoint (the http response from this one will be returned to the client) + `http://${stacksNodeRpcEndpoint}/v2/transactions`, + ]; + endpoints.push(...extraEndpoints); + logger.info(`Overriding POST /v2/transactions to multicast to ${endpoints.join(',')}}`); + const maxBodySize = 10_000_000; // 10 MB max POST body size + const reqBody = await readRequestBody(req, maxBodySize); + const reqHeaders: string[][] = []; + for (let i = 0; i < req.rawHeaders.length; i += 2) { + reqHeaders.push([req.rawHeaders[i], req.rawHeaders[i + 1]]); + } + const postFn = async (endpoint: string) => { + const reqOpts: RequestInit = { + method: 'POST', + agent: httpAgent, + body: reqBody, + headers: reqHeaders, + }; + const proxyResult = await fetch(endpoint, reqOpts); + return proxyResult; + }; + + // Here's were we "multicast" the `/v2/transaction` POST, by concurrently sending the http request to all configured endpoints. + const results = await Promise.allSettled(endpoints.map(endpoint => postFn(endpoint))); + + // Only the first (non-extra) endpoint http response is proxied back through to the client, so ensure any errors from requests + // to the extra endpoints are logged. + results.slice(1).forEach(p => { + if (p.status === 'rejected') { + logError(`Error during POST /v2/transaction to extra endpoint: ${p.reason}`, p.reason); + } else { + if (!p.value.ok) { + logError( + `Response ${p.value.status} during POST /v2/transaction to extra endpoint ${p.value.url}` + ); + } + } + }); + + // Proxy the result of the (non-extra) http response back to the client. + const mainResult = results[0]; + if (mainResult.status === 'rejected') { + logError( + `Error in primary POST /v2/transaction proxy: ${mainResult.reason}`, + mainResult.reason + ); + res.status(500).json({ error: mainResult.reason }); + } else { + const proxyResp = mainResult.value; + res.status(proxyResp.status); + proxyResp.headers.forEach((value, name) => { + res.setHeader(name, value); + }); + await pipelineAsync(proxyResp.body, res); + } + }); + const proxyOptions: Options = { agent: httpAgent, target: `http://${stacksNodeRpcEndpoint}`, diff --git a/src/tests/test-helpers.ts b/src/tests/test-helpers.ts index c747bba2..6c238a91 100644 --- a/src/tests/test-helpers.ts +++ b/src/tests/test-helpers.ts @@ -37,3 +37,32 @@ export async function useWithCleanup[]]>( } } } + +export type TestEnvVar = [EnvVarKey: string, EnvVarValue: string]; + +/** + * Helper function for tests. + * Sets local process environment variables, and returns a function that restores them to the original values. + */ +export function withEnvVars(...envVars: TestEnvVar[]) { + const original: { exists: boolean; key: string; value: string | undefined }[] = []; + envVars.forEach(([k, v]) => { + original.push({ + exists: k in process.env, + key: k, + value: v, + }); + }); + envVars.forEach(([k, v]) => { + process.env[k] = v; + }); + return () => { + original.forEach(orig => { + if (!orig.exists) { + delete process.env[orig.key]; + } else { + process.env[orig.key] = orig.value; + } + }); + }; +} diff --git a/src/tests/v2-proxy-tests.ts b/src/tests/v2-proxy-tests.ts new file mode 100644 index 00000000..918ce08e --- /dev/null +++ b/src/tests/v2-proxy-tests.ts @@ -0,0 +1,85 @@ +import * as supertest from 'supertest'; +import { ChainID } from '@stacks/transactions'; +import { startApiServer } from '../api/init'; +import { PgDataStore, cycleMigrations, runMigrations } from '../datastore/postgres-store'; +import { PoolClient } from 'pg'; +import { useWithCleanup, withEnvVars } from './test-helpers'; +import * as fs from 'fs'; +import * as path from 'path'; +import * as os from 'os'; +import * as nock from 'nock'; + +describe('v2-proxy tests', () => { + let db: PgDataStore; + let client: PoolClient; + + beforeEach(async () => { + process.env.PG_DATABASE = 'postgres'; + await cycleMigrations(); + db = await PgDataStore.connect(); + client = await db.pool.connect(); + }); + + test('tx post multicast', async () => { + const primaryProxyEndpoint = 'proxy-stacks-node:12345'; + const extraTxEndpoint = 'http://extra-tx-endpoint-a/test'; + await useWithCleanup( + () => { + const restoreEnvVars = withEnvVars( + ['STACKS_CORE_PROXY_HOST', primaryProxyEndpoint.split(':')[0]], + ['STACKS_CORE_PROXY_PORT', primaryProxyEndpoint.split(':')[1]] + ); + return [, () => restoreEnvVars()] as const; + }, + () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'stacks-api-unit-test-')); + const extraEndpointsFilePath = path.join(tempDir, 'extra-tx-endpoints.txt'); + fs.writeFileSync(extraEndpointsFilePath, extraTxEndpoint, { flag: 'w' }); + const restoreEnvVars = withEnvVars([ + 'STACKS_API_EXTRA_TX_ENDPOINTS_FILE', + extraEndpointsFilePath, + ]); + return [, () => restoreEnvVars()] as const; + }, + async () => { + const apiServer = await startApiServer({ + datastore: db, + chainId: ChainID.Mainnet, + httpLogLevel: 'debug', + }); + return [apiServer, apiServer.terminate] as const; + }, + async (_, __, api) => { + const primaryStubbedResponse = 'success stubbed response'; + const extraStubbedResponse = 'extra success stubbed response'; + const testRequest = 'fake-tx-data'; + let mockedRequestBody = 'none'; + nock(`http://${primaryProxyEndpoint}`) + .post('/v2/transactions', testRequest) + .once() + .reply(200, primaryStubbedResponse); + nock(extraTxEndpoint) + .post(() => true, testRequest) + .once() + .reply(200, (_url, body, cb) => { + // the "extra" endpoint responses are logged internally and not sent back to the client, so use this mock callback to + // test that this endpoint was called correctly + mockedRequestBody = body as string; + cb(null, extraStubbedResponse); + }); + const postTxReq = await supertest(api.server).post(`/v2/transactions`).send(testRequest); + // test that main endpoint response was returned + expect(postTxReq.status).toBe(200); + expect(postTxReq.text).toBe(primaryStubbedResponse); + // test that the extra endpoint was queried + expect(mockedRequestBody).toBe(testRequest); + } + ); + }); + + afterEach(async () => { + client.release(); + await db?.close(); + await runMigrations(undefined, 'down'); + }); +});