diff --git a/readme.md b/readme.md
index a57fc0aa..45952357 100644
--- a/readme.md
+++ b/readme.md
@@ -103,7 +103,7 @@ be upgraded and its database cannot be migrated to a new schema. One way to hand
 and stacks-node working directory, and re-sync from scratch.
 
 Alternatively, an event-replay feature is available where the API records the HTTP POST requests from the stacks-node event emitter, then streams
-these events back to itself. Essentially simulating a wipe & full re-sync, but much quicker -- typically around 10 minutes.
+these events back to itself. Essentially simulating a wipe & full re-sync, but much quicker.
 
 The feature can be used via program args. For example, if there are breaking changes in the API's SQL schema, like adding a new column which requires
 events to be replayed, the following steps could be run:
@@ -125,6 +125,10 @@ events to be replayed, the following steps could be run:
   ```
   node ./lib/index.js import-events --file /tmp/stacks-node-events.tsv --wipe-db --force
   ```
 
+  This command has two modes of operation, specified by the `--mode` option:
+  * `archival` (default): The process will import and ingest *all* blockchain events that have happened since the first block.
+  * `pruned`: The import process will ignore some prunable events (mempool, microblocks) until the import block height has reached `chain tip - 256` blocks. This saves a considerable amount of time during import, but sacrifices some historical data. You can use this mode if you're mostly interested in running an API that prioritizes real-time information.
+
 Alternatively, instead of performing the `export-events` command in step 1, an environment variable can be set which enables events to be streamed to a file
 as they are received, while the application is running normally. To enable this feature, set the `STACKS_EXPORT_EVENTS_FILE` env var to the file path where events should be appended. Example:
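Note: combined with the flags already documented in the readme above, a pruned-mode import would look like `node ./lib/index.js import-events --file /tmp/stacks-node-events.tsv --wipe-db --force --mode pruned`; the `--mode` value is parsed by `importEventsFromTsv` below and defaults to `archival` when omitted.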
diff --git a/src/event-replay/event-replay.ts b/src/event-replay/event-replay.ts
new file mode 100644
index 00000000..f38f8717
--- /dev/null
+++ b/src/event-replay/event-replay.ts
@@ -0,0 +1,166 @@
+import * as path from 'path';
+import * as fs from 'fs';
+import { cycleMigrations, dangerousDropAllTables, PgDataStore } from '../datastore/postgres-store';
+import { startEventServer } from '../event-stream/event-server';
+import { getApiConfiguredChainID, httpPostRequest, logger } from '../helpers';
+import { findTsvBlockHeight, getDbBlockHeight } from './helpers';
+
+enum EventImportMode {
+  /**
+   * The Event Server will ingest and process every single Stacks node event contained in the TSV file
+   * from block 0 to the latest block. This is the default mode.
+   */
+  archival = 'archival',
+  /**
+   * The Event Server will ignore certain "prunable" events (see `PRUNABLE_EVENT_PATHS`) from
+   * the imported TSV file if they are received outside of a block window, usually set to
+   * the TSV's `block_height` - 256.
+   * This allows the import to be faster at the expense of historical blockchain information.
+   */
+  pruned = 'pruned',
+}
+
+/**
+ * Event paths that will be ignored during `EventImportMode.pruned` if received outside of the
+ * pruned block window.
+ */
+const PRUNABLE_EVENT_PATHS = ['/new_mempool_tx', '/drop_mempool_tx', '/new_microblocks'];
+
+/**
+ * Exports all Stacks node events stored in the `event_observer_requests` table to a TSV file.
+ * @param filePath - Path to TSV file to write
+ * @param overwriteFile - If we should overwrite the file
+ */
+export async function exportEventsAsTsv(
+  filePath?: string,
+  overwriteFile: boolean = false
+): Promise<void> {
+  if (!filePath) {
+    throw new Error(`A file path should be specified with the --file option`);
+  }
+  const resolvedFilePath = path.resolve(filePath);
+  if (fs.existsSync(resolvedFilePath) && overwriteFile !== true) {
+    throw new Error(
+      `A file already exists at ${resolvedFilePath}. Add --overwrite-file to truncate an existing file`
+    );
+  }
+  console.log(`Export event data to file: ${resolvedFilePath}`);
+  const writeStream = fs.createWriteStream(resolvedFilePath);
+  console.log(`Export started...`);
+  await PgDataStore.exportRawEventRequests(writeStream);
+  console.log('Export successful.');
+}
+
+/**
+ * Imports Stacks node events from a TSV file and ingests them through the Event Server.
+ * @param filePath - Path to TSV file to read
+ * @param importMode - Event import mode
+ * @param wipeDb - If we should wipe the DB before importing
+ * @param force - If we should force drop all tables
+ */
+export async function importEventsFromTsv(
+  filePath?: string,
+  importMode?: string,
+  wipeDb: boolean = false,
+  force: boolean = false
+): Promise<void> {
+  if (!filePath) {
+    throw new Error(`A file path should be specified with the --file option`);
+  }
+  const resolvedFilePath = path.resolve(filePath);
+  if (!fs.existsSync(resolvedFilePath)) {
+    throw new Error(`File does not exist: ${resolvedFilePath}`);
+  }
+  let eventImportMode: EventImportMode;
+  switch (importMode) {
+    case 'pruned':
+      eventImportMode = EventImportMode.pruned;
+      break;
+    case 'archival':
+    case undefined:
+      eventImportMode = EventImportMode.archival;
+      break;
+    default:
+      throw new Error(`Invalid event import mode: ${importMode}`);
+  }
+  const hasData = await PgDataStore.containsAnyRawEventRequests();
+  if (!wipeDb && hasData) {
+    throw new Error(`Database contains existing data. Add --wipe-db to drop the existing tables.`);
+  }
+  if (force) {
+    await dangerousDropAllTables({ acknowledgePotentialCatastrophicConsequences: 'yes' });
+  }
+
+  // This performs a "migration down" which drops the tables, then re-creates them.
+  // If there's a breaking change in the migration files, this will throw, and the pg database
+  // needs to be wiped manually, or the `--force` option can be used.
+  await cycleMigrations({ dangerousAllowDataLoss: true });
+
+  // Look for the TSV's block height and determine the prunable block window.
+  const tsvBlockHeight = await findTsvBlockHeight(resolvedFilePath);
+  const blockWindowSize = parseInt(
+    process.env['STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD'] ?? '256'
+  );
+  const prunedBlockHeight = Math.max(tsvBlockHeight - blockWindowSize, 0);
+  console.log(`Event file's block height: ${tsvBlockHeight}`);
+  console.log(`Starting event import and playback in ${eventImportMode} mode`);
+  if (eventImportMode === EventImportMode.pruned) {
+    console.log(`Ignoring all prunable events before block height: ${prunedBlockHeight}`);
+  }
+
+  const db = await PgDataStore.connect({
+    usageName: 'import-events',
+    skipMigrations: true,
+    withNotifier: false,
+    eventReplay: true,
+  });
+  const eventServer = await startEventServer({
+    datastore: db,
+    chainId: getApiConfiguredChainID(),
+    serverHost: '127.0.0.1',
+    serverPort: 0,
+    httpLogLevel: 'debug',
+  });
+
+  const readStream = fs.createReadStream(resolvedFilePath);
+  const rawEventsIterator = PgDataStore.getRawEventRequests(readStream, status => {
+    console.log(status);
+  });
+  // Set logger to only output for warnings/errors, otherwise the event replay will result
+  // in the equivalent of months/years of API log output.
+  logger.level = 'warn';
+  // Disable this feature so a redundant export file isn't created while importing from an existing one.
+  delete process.env['STACKS_EXPORT_EVENTS_FILE'];
+  // The current import block height. Will be updated with every `/new_block` event.
+  let blockHeight = 0;
+  let isPruneFinished = false;
+  for await (const rawEvents of rawEventsIterator) {
+    for (const rawEvent of rawEvents) {
+      if (eventImportMode === EventImportMode.pruned) {
+        if (PRUNABLE_EVENT_PATHS.includes(rawEvent.event_path) && blockHeight < prunedBlockHeight) {
+          // Prunable events are ignored here.
+          continue;
+        }
+        if (blockHeight === prunedBlockHeight && !isPruneFinished) {
+          isPruneFinished = true;
+          console.log(`Resuming prunable event import...`);
+        }
+      }
+      await httpPostRequest({
+        host: '127.0.0.1',
+        port: eventServer.serverAddress.port,
+        path: rawEvent.event_path,
+        headers: { 'Content-Type': 'application/json' },
+        body: Buffer.from(rawEvent.payload, 'utf8'),
+        throwOnNotOK: true,
+      });
+      if (rawEvent.event_path === '/new_block') {
+        blockHeight = await getDbBlockHeight(db);
+      }
+    }
+  }
+  await db.finishEventReplay();
+  console.log(`Event import and playback successful.`);
+  await eventServer.closeAsync();
+  await db.close();
+}
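The CLI's `import-events` operand (see `src/index.ts` below) is now a thin wrapper over this function. A minimal sketch of the equivalent programmatic call, assuming a script living under `src/` (the TSV path is illustrative):

```ts
import { importEventsFromTsv } from './event-replay/event-replay';

// Equivalent to:
//   node ./lib/index.js import-events --file /tmp/stacks-node-events.tsv --mode pruned --wipe-db
importEventsFromTsv('/tmp/stacks-node-events.tsv', 'pruned', true, false).catch(error => {
  console.error(error);
  process.exit(1);
});
```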
diff --git a/src/event-replay/helpers.ts b/src/event-replay/helpers.ts
new file mode 100644
index 00000000..cf2c1332
--- /dev/null
+++ b/src/event-replay/helpers.ts
@@ -0,0 +1,47 @@
+import { PgDataStore } from '../datastore/postgres-store';
+import { ReverseFileStream } from './reverse-file-stream';
+
+/**
+ * Traverse a TSV file in reverse to find the last received `/new_block` node message and return
+ * the `block_height` reported by that event. Even though the block produced by that event might
+ * end up being re-org'd, it gives us a reasonable idea as to what the Stacks node thought
+ * the block height was the moment it was sent.
+ * @param filePath - TSV path
+ * @returns `number` found block height, 0 if not found
+ */
+export async function findTsvBlockHeight(filePath: string): Promise<number> {
+  let blockHeight = 0;
+  const reverseStream = new ReverseFileStream(filePath);
+  for await (const data of reverseStream) {
+    const columns = data.toString().split('\t');
+    const eventName = columns[2];
+    if (eventName === '/new_block') {
+      const payload = columns[3];
+      blockHeight = JSON.parse(payload).block_height;
+      break;
+    }
+  }
+
+  reverseStream.destroy();
+  return blockHeight;
+}
+
+/**
+ * Get the current block height from the DB. We won't use the `getChainTip` method since that
+ * adds some conversions from block hashes into strings that we're not interested in. We also can't
+ * use the `chain_tip` materialized view since it is unavailable during replay, so we'll use the
+ * `block_height DESC` index.
+ * @param db - Data store
+ * @returns Block height
+ */
+export async function getDbBlockHeight(db: PgDataStore): Promise<number> {
+  const result = await db.query(async client => {
+    return await client.query<{ block_height: number }>(
+      `SELECT MAX(block_height) as block_height FROM blocks WHERE canonical = TRUE`
+    );
+  });
+  if (result.rowCount === 0) {
+    return 0;
+  }
+  return result.rows[0].block_height;
+}
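For reference, each TSV row that `findTsvBlockHeight` inspects carries an id, a timestamp, the event path, and the JSON payload, tab-separated. A small sketch using a row borrowed from the test fixtures in `event-replay-tests.ts` below:

```ts
// Column layout assumed from the parsing code above: [2] = event path, [3] = payload.
const row = '744275\t2022-02-21 16:07:01.123587+00\t/new_block\t{"block_height": 1200}';
const columns = row.split('\t');
console.log(columns[2]); // '/new_block'
console.log(JSON.parse(columns[3]).block_height); // 1200
```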
diff --git a/src/event-replay/reverse-file-stream.ts b/src/event-replay/reverse-file-stream.ts
new file mode 100644
index 00000000..4c7a9601
--- /dev/null
+++ b/src/event-replay/reverse-file-stream.ts
@@ -0,0 +1,54 @@
+import * as stream from 'stream';
+import * as fs from 'fs';
+
+/**
+ * Streams lines from a text file in reverse, starting from the end of the file.
+ * Modernized version of https://www.npmjs.com/package/fs-reverse
+ */
+export class ReverseFileStream extends stream.Readable {
+  private fileDescriptor: number;
+  private position: number;
+
+  private lineBuffer: string[] = [];
+  private remainder: string = '';
+
+  public readonly fileLength: number;
+  public bytesRead: number = 0;
+
+  constructor(filePath: string, opts?: stream.ReadableOptions) {
+    // `objectMode` avoids the `Buffer->utf8->Buffer->utf8` conversions when pushing strings
+    super({ ...{ objectMode: true, autoDestroy: true }, ...opts });
+    this.fileLength = fs.statSync(filePath).size;
+    this.position = this.fileLength;
+    this.fileDescriptor = fs.openSync(filePath, 'r', 0o666);
+  }
+
+  _read(size: number): void {
+    while (this.lineBuffer.length === 0 && this.position > 0) {
+      // Read `size` bytes from the end of the file.
+      const length = Math.min(size, this.position);
+      const buffer = Buffer.alloc(length);
+      this.position = this.position - length;
+      this.bytesRead += fs.readSync(this.fileDescriptor, buffer, 0, length, this.position);
+
+      // Split into lines to fill the `lineBuffer`
+      this.remainder = buffer.toString('utf8') + this.remainder;
+      this.lineBuffer = this.remainder.split(/\r?\n/);
+      // Ignore empty/trailing lines, `readable.push('')` is not recommended
+      this.lineBuffer = this.lineBuffer.filter(line => line.length > 0);
+      this.remainder = this.lineBuffer.shift() ?? '';
+    }
+    if (this.lineBuffer.length) {
+      this.push(this.lineBuffer.pop());
+    } else if (this.remainder.length) {
+      this.push(this.remainder);
+      this.remainder = '';
+    } else {
+      this.push(null);
+    }
+  }
+
+  _destroy(error: Error | null, callback: (error?: Error | null) => void): void {
+    fs.closeSync(this.fileDescriptor);
+    callback(error);
+  }
+}
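`ReverseFileStream` is self-contained, so it can be exercised outside of replay. A minimal `tail`-style sketch, assuming the class above (the helper name is hypothetical):

```ts
import { ReverseFileStream } from './event-replay/reverse-file-stream';

// Read the last `count` lines of a file without scanning it from the start.
async function tailLines(filePath: string, count: number): Promise<string[]> {
  const reverseStream = new ReverseFileStream(filePath);
  const lines: string[] = [];
  for await (const line of reverseStream) {
    lines.push(line);
    if (lines.length === count) {
      break; // stop early; only the tail of the file has been read
    }
  }
  reverseStream.destroy(); // closes the underlying file descriptor via _destroy
  return lines;
}
```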
diff --git a/src/helpers.ts b/src/helpers.ts
index e2ad4970..30c122a5 100644
--- a/src/helpers.ts
+++ b/src/helpers.ts
@@ -17,6 +17,7 @@ import {
   SyslogConfigSetLevels,
 } from 'winston/lib/winston/config';
 import { DbStxEvent, DbTx } from './datastore/common';
+import { StacksCoreRpcClient } from './core-rpc/client';
 
 export const isDevEnv = process.env.NODE_ENV === 'development';
 export const isTestEnv = process.env.NODE_ENV === 'test';
@@ -960,3 +961,34 @@ export function isSmartContractTx(dbTx: DbTx, stxEvents: DbStxEvent[] = []): boo
   }
   return false;
 }
+
+/**
+ * Gets the chain id as reported by the Stacks node.
+ * @returns `ChainID` Chain id
+ */
+export async function getStacksNodeChainID(): Promise<ChainID> {
+  const client = new StacksCoreRpcClient();
+  await client.waitForConnection(Infinity);
+  const coreInfo = await client.getInfo();
+  if (coreInfo.network_id === ChainID.Mainnet) {
+    return ChainID.Mainnet;
+  } else if (coreInfo.network_id === ChainID.Testnet) {
+    return ChainID.Testnet;
+  } else {
+    throw new Error(`Unexpected network_id "${coreInfo.network_id}"`);
+  }
+}
+
+/**
+ * Gets the chain id as configured by the `STACKS_CHAIN_ID` API env variable.
+ * @returns `ChainID` Chain id
+ */
+export function getApiConfiguredChainID() {
+  if (!('STACKS_CHAIN_ID' in process.env)) {
+    const error = new Error(`Env var STACKS_CHAIN_ID is not set`);
+    logError(error.message, error);
+    throw error;
+  }
+  const configuredChainID: ChainID = parseInt(process.env['STACKS_CHAIN_ID'] as string);
+  return configuredChainID;
+}
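Since `getApiConfiguredChainID` uses a bare `parseInt`, the env var should work in both decimal and `0x`-prefixed hex form. A sketch, assuming the `ChainID` values exported by `@stacks/transactions` (`Mainnet = 0x00000001`, `Testnet = 0x80000000`):

```ts
import { getApiConfiguredChainID } from './helpers';

// Illustrative only; in a real deployment STACKS_CHAIN_ID comes from the environment, not code.
process.env['STACKS_CHAIN_ID'] = '0x80000000'; // or '2147483648'
const chainID = getApiConfiguredChainID(); // ChainID.Testnet
```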
diff --git a/src/index.ts b/src/index.ts
index a299d5e9..90b30b9e 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -5,12 +5,13 @@ import {
   logError,
   isProdEnv,
   numberToHex,
-  httpPostRequest,
   parseArgBoolean,
+  getApiConfiguredChainID,
+  getStacksNodeChainID,
 } from './helpers';
 import * as sourceMapSupport from 'source-map-support';
 import { DataStore } from './datastore/common';
-import { cycleMigrations, dangerousDropAllTables, PgDataStore } from './datastore/postgres-store';
+import { PgDataStore } from './datastore/postgres-store';
 import { startApiServer } from './api/init';
 import { startProfilerServer } from './inspector-util';
 import { startEventServer } from './event-stream/event-server';
@@ -21,15 +22,14 @@ import {
 } from './event-stream/tokens-contract-handler';
 import { StacksCoreRpcClient } from './core-rpc/client';
 import { createServer as createPrometheusServer } from '@promster/server';
-import { ChainID } from '@stacks/transactions';
 import { registerShutdownConfig } from './shutdown-handler';
 import { importV1TokenOfferingData, importV1BnsData } from './import-v1';
 import { OfflineDummyStore } from './datastore/offline-dummy-store';
 import { Socket } from 'net';
 import * as getopts from 'getopts';
 import * as fs from 'fs';
-import * as path from 'path';
 import { injectC32addressEncodeCache } from './c32-addr-cache';
+import { exportEventsAsTsv, importEventsFromTsv } from './event-replay/event-replay';
 
 enum StacksApiMode {
   /**
@@ -103,29 +103,6 @@ async function monitorCoreRpcConnection(): Promise<void> {
   }
 }
 
-async function getCoreChainID(): Promise<ChainID> {
-  const client = new StacksCoreRpcClient();
-  await client.waitForConnection(Infinity);
-  const coreInfo = await client.getInfo();
-  if (coreInfo.network_id === ChainID.Mainnet) {
-    return ChainID.Mainnet;
-  } else if (coreInfo.network_id === ChainID.Testnet) {
-    return ChainID.Testnet;
-  } else {
-    throw new Error(`Unexpected network_id "${coreInfo.network_id}"`);
-  }
-}
-
-function getConfiguredChainID() {
-  if (!('STACKS_CHAIN_ID' in process.env)) {
-    const error = new Error(`Env var STACKS_CHAIN_ID is not set`);
-    logError(error.message, error);
-    throw error;
-  }
-  const configuredChainID: ChainID = parseInt(process.env['STACKS_CHAIN_ID'] as string);
-  return configuredChainID;
-}
-
 async function init(): Promise<void> {
   if (isProdEnv && !fs.existsSync('.git-info')) {
     throw new Error(
@@ -161,7 +138,7 @@ async function init(): Promise<void> {
     }
   }
 
-  const configuredChainID = getConfiguredChainID();
+  const configuredChainID = getApiConfiguredChainID();
 
   const eventServer = await startEventServer({
     datastore: db,
@@ -173,7 +150,7 @@ async function init(): Promise<void> {
     forceKillable: false,
   });
 
-  const networkChainId = await getCoreChainID();
+  const networkChainId = await getStacksNodeChainID();
   if (networkChainId !== configuredChainID) {
     const chainIdConfig = numberToHex(configuredChainID);
     const chainIdNode = numberToHex(networkChainId);
@@ -207,7 +184,7 @@ async function init(): Promise<void> {
   }
 
   if (apiMode !== StacksApiMode.writeOnly) {
-    const apiServer = await startApiServer({ datastore: db, chainId: getConfiguredChainID() });
+    const apiServer = await startApiServer({ datastore: db, chainId: getApiConfiguredChainID() });
     logger.info(`API server listening on: http://${apiServer.address}`);
     registerShutdownConfig({
       name: 'API Server',
@@ -286,6 +263,7 @@ function getProgramArgs() {
       operand: 'import-events';
       options: {
         ['file']?: string;
+        ['mode']?: string;
        ['wipe-db']?: boolean;
         ['force']?: boolean;
       };
@@ -296,83 +274,14 @@ async function handleProgramArgs() {
   const { args, parsedOpts } = getProgramArgs();
   if (args.operand === 'export-events') {
-    if (!args.options.file) {
-      throw new Error(`A file path should be specified with the --file option`);
-    }
-    const filePath = path.resolve(args.options.file);
-    if (fs.existsSync(filePath) && args.options['overwrite-file'] !== true) {
-      throw new Error(
-        `A file already exists at ${filePath}. Add --overwrite-file to truncate an existing file`
-      );
-    }
-    console.log(`Export event data to file: ${filePath}`);
-    const writeStream = fs.createWriteStream(filePath);
-    console.log(`Export started...`);
-    await PgDataStore.exportRawEventRequests(writeStream);
-    console.log('Export successful.');
+    await exportEventsAsTsv(args.options.file, args.options['overwrite-file']);
   } else if (args.operand === 'import-events') {
-    if (!args.options.file) {
-      throw new Error(`A file path should be specified with the --file option`);
-    }
-    const filePath = path.resolve(args.options.file);
-    if (!fs.existsSync(filePath)) {
-      throw new Error(`File does not exist: ${filePath}`);
-    }
-    const hasData = await PgDataStore.containsAnyRawEventRequests();
-    if (!args.options['wipe-db'] && hasData) {
-      throw new Error(
-        `Database contains existing data. Add --wipe-db to drop the existing tables.`
-      );
-    }
-
-    if (args.options['force']) {
-      await dangerousDropAllTables({ acknowledgePotentialCatastrophicConsequences: 'yes' });
-    }
-
-    // This performs a "migration down" which drops the tables, then re-creates them.
-    // If there's a breaking change in the migration files, this will throw, and the pg database needs wiped manually,
-    // or the `--force` option can be used.
-    await cycleMigrations({ dangerousAllowDataLoss: true });
-
-    const db = await PgDataStore.connect({
-      usageName: 'import-events',
-      skipMigrations: true,
-      withNotifier: false,
-      eventReplay: true,
-    });
-    const eventServer = await startEventServer({
-      datastore: db,
-      chainId: getConfiguredChainID(),
-      serverHost: '127.0.0.1',
-      serverPort: 0,
-      httpLogLevel: 'debug',
-    });
-
-    const readStream = fs.createReadStream(filePath);
-    const rawEventsIterator = PgDataStore.getRawEventRequests(readStream, status => {
-      console.log(status);
-    });
-    // Set logger to only output for warnings/errors, otherwise the event replay will result
-    // in the equivalent of months/years of API log output.
-    logger.level = 'warn';
-    // Disable this feature so a redundant export file isn't created while importing from an existing one.
-    delete process.env['STACKS_EXPORT_EVENTS_FILE'];
-    for await (const rawEvents of rawEventsIterator) {
-      for (const rawEvent of rawEvents) {
-        await httpPostRequest({
-          host: '127.0.0.1',
-          port: eventServer.serverAddress.port,
-          path: rawEvent.event_path,
-          headers: { 'Content-Type': 'application/json' },
-          body: Buffer.from(rawEvent.payload, 'utf8'),
-          throwOnNotOK: true,
-        });
-      }
-    }
-    await db.finishEventReplay();
-    console.log(`Event import and playback successful.`);
-    await eventServer.closeAsync();
-    await db.close();
+    await importEventsFromTsv(
+      args.options.file,
+      args.options.mode,
+      args.options['wipe-db'],
+      args.options.force
+    );
   } else if (parsedOpts._[0]) {
     throw new Error(`Unexpected program argument: ${parsedOpts._[0]}`);
   } else {
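For context, the `--file`, `--mode`, `--wipe-db`, and `--force` options above are parsed with `getopts`, which this file already imports. The diff doesn't show the body of `getProgramArgs`, so the following is only an illustrative sketch of how such flags could be declared, not the repo's exact implementation:

```ts
import * as getopts from 'getopts';

// Hypothetical parse of: import-events --file /tmp/events.tsv --mode pruned --wipe-db --force
const parsedOpts = getopts(process.argv.slice(2), {
  string: ['file', 'mode'],
  boolean: ['overwrite-file', 'wipe-db', 'force'],
});
const operand = parsedOpts._[0]; // e.g. 'import-events'
```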
diff --git a/src/tests/event-replay-tests.ts b/src/tests/event-replay-tests.ts
new file mode 100644
index 00000000..cd42e87f
--- /dev/null
+++ b/src/tests/event-replay-tests.ts
@@ -0,0 +1,122 @@
+import * as fs from 'fs';
+import { findTsvBlockHeight } from '../event-replay/helpers';
+import { ReverseFileStream } from '../event-replay/reverse-file-stream';
+
+describe('event replay tests', () => {
+  function writeTmpFile(fileName: string, contents: string): string {
+    try {
+      fs.mkdirSync('./.tmp');
+    } catch (error: any) {
+      if (error.code !== 'EEXIST') throw error;
+    }
+    const path = `./.tmp/${fileName}`;
+    fs.writeFileSync(path, contents, { encoding: 'utf-8' });
+    return path;
+  }
+
+  test('ReverseFileStream handles backpressure', async () => {
+    let contents = '';
+    for (let i = 1; i <= 1000; i++) {
+      contents += `line${i}\n`;
+    }
+    const testFilePath = writeTmpFile('test1.txt', contents);
+    try {
+      // Default stream buffer is 64KB, set to 300 bytes so file is larger than memory buffer
+      const reverseStream = new ReverseFileStream(testFilePath, { highWaterMark: 300 });
+      const output: string[] = [];
+      let linesStreamed = 0;
+      for await (const data of reverseStream) {
+        linesStreamed++;
+        output.push(data);
+        if (linesStreamed === 4) {
+          break;
+        }
+      }
+      expect(linesStreamed).toEqual(4);
+      expect(output).toEqual(['line1000', 'line999', 'line998', 'line997']);
+      expect(reverseStream.bytesRead).toBeLessThan(reverseStream.fileLength);
+
+      // Read whole file
+      const reverseStream2 = new ReverseFileStream(testFilePath, { highWaterMark: 300 });
+      const output2: string[] = [];
+      let linesStreamed2 = 0;
+      for await (const data of reverseStream2) {
+        linesStreamed2++;
+        output2.push(data);
+      }
+      expect(linesStreamed2).toEqual(1000);
+      expect(output2[0]).toBe('line1000');
+      expect(output2[output2.length - 1]).toBe('line1');
+      expect(reverseStream2.bytesRead).toBe(reverseStream2.fileLength);
+    } finally {
+      fs.unlinkSync(testFilePath);
+    }
+  });
+
+  test('ReverseFileStream streams file in reverse', async () => {
+    const contents = `line1
+line2
+line3
+line4`;
+    const testFilePath = writeTmpFile('test1.txt', contents);
+    try {
+      const reverseStream = new ReverseFileStream(testFilePath);
+      const output: string[] = [];
+      let linesStreamed = 0;
+      for await (const data of reverseStream) {
+        linesStreamed++;
+        output.push(data);
+      }
+      expect(linesStreamed).toEqual(4);
+      expect(output).toEqual(['line4', 'line3', 'line2', 'line1']);
+    } finally {
+      fs.unlinkSync(testFilePath);
+    }
+  });
+
+  test('ReverseFileStream streams file in reverse with CRLF line endings', async () => {
+    const contents = ['line1', 'line2', 'line3', 'line4'].join('\r\n');
+    const testFilePath = writeTmpFile('test1.txt', contents);
+    try {
+      const reverseStream = new ReverseFileStream(testFilePath);
+      const output: string[] = [];
+      let linesStreamed = 0;
+      for await (const data of reverseStream) {
+        linesStreamed++;
+        output.push(data);
+      }
+      expect(linesStreamed).toEqual(4);
+      expect(output).toEqual(['line4', 'line3', 'line2', 'line1']);
+    } finally {
+      fs.unlinkSync(testFilePath);
+    }
+  });
+
+  test('TSV block height is found', async () => {
+    const contents = `744275\t2022-02-21 16:07:01.123587+00\t/new_mempool_tx\t[]
+744275\t2022-02-21 16:07:01.123587+00\t/new_block\t{"block_height": 1200}
+744275\t2022-02-21 16:07:01.123587+00\t/new_block\t{"block_height": 1201}
+744275\t2022-02-21 16:07:01.123587+00\t/new_mempool_tx\t[]`;
+    const testFilePath = writeTmpFile('test1.tsv', contents);
+    try {
+      const blockHeight = await findTsvBlockHeight(testFilePath);
+      expect(blockHeight).toEqual(1201);
+    } finally {
+      fs.unlinkSync(testFilePath);
+    }
+  });
+
+  test('TSV block height is 0 if not found', async () => {
+    const contents = `744275\t2022-02-21 16:07:01.123587+00\t/new_mempool_tx\t[]
+744275\t2022-02-21 16:07:01.123587+00\t/new_mempool_tx\t[]
+744275\t2022-02-21 16:07:01.123587+00\t/new_mempool_tx\t[]
+744275\t2022-02-21 16:07:01.123587+00\t/new_mempool_tx\t[]`;
+    const testFilePath = writeTmpFile('test1.tsv', contents);
+    try {
+      const blockHeight = await findTsvBlockHeight(testFilePath);
+      expect(blockHeight).toEqual(0);
+    } finally {
+      fs.unlinkSync(testFilePath);
+    }
+  });
+});
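Assuming the repo's standard Jest setup for files under `src/tests/`, this suite should run in isolation with something like `npx jest src/tests/event-replay-tests.ts`; the tests clean up their fixture files but leave the `./.tmp` scratch directory in place.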