feat: add pruned event import mode that ignores some historical events (#1125)

* chore: move event replay functions to separate file

* feat: add fs-reverse

* feat: start reading block height

* feat: start ignoring prunable events

* refactor: name updates

* feat: integrate pruned mode, create replay helpers file

* fix: unused export

* feat: also try pruning microblock events

* feat: remove fs-reverse and implement again with modern streams

* docs: add mode to readme

* fix: avoid emitting LF bytes in reverse file stream, support CRLF endings, avoid multiple conversions between buffers and strings

* chore: explicit buffer utf8 conversion

* chore: test reading whole file with backpressure

* chore: remove unused waiter import

Co-authored-by: Matthew Little <zone117x@gmail.com>
Rafael Cárdenas
2022-04-11 10:08:45 -05:00
committed by GitHub
parent 303eaaa8cd
commit da992d77b1
7 changed files with 441 additions and 107 deletions


@@ -103,7 +103,7 @@ be upgraded and its database cannot be migrated to a new schema. One way to hand
and stacks-node working directory, and re-sync from scratch.
Alternatively, an event-replay feature is available where the API records the HTTP POST requests from the stacks-node event emitter, then streams
these events back to itself. Essentially simulating a wipe & full re-sync, but much quicker -- typically around 10 minutes.
these events back to itself. Essentially simulating a wipe & full re-sync, but much quicker.
The feature can be used via program args. For example, if there are breaking changes in the API's sql schema, like adding a new column which requires
events to be re-played, the following steps could be run:
@@ -125,6 +125,10 @@ event's to be re-played, the following steps could be ran:
node ./lib/index.js import-events --file /tmp/stacks-node-events.tsv --wipe-db --force
```
This command has two modes of operation, specified by the `--mode` option:
* `archival` (default): The process will import and ingest *all* blockchain events that have happened since the first block.
* `pruned`: The import process will ignore some prunable events (mempool, microblocks) until the import block height has reached `chain tip - 256` blocks. This saves a considerable amount of time during import but sacrifices some historical data. Use this mode if you're mostly interested in running an API that prioritizes real-time information, as shown in the example below.
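For example, a pruned import of the same TSV file can be started by adding the `--mode` option to the command above:
```shell
node ./lib/index.js import-events --file /tmp/stacks-node-events.tsv --wipe-db --force --mode pruned
```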
Alternatively, instead of performing the `export-events` command in step 1, an environment variable can be set that enables events to be streamed to a file
as they are received, while the application is running normally. To enable this feature, set the `STACKS_EXPORT_EVENTS_FILE` env var to the file path where
events should be appended. Example:


@@ -0,0 +1,166 @@
import * as path from 'path';
import * as fs from 'fs';
import { cycleMigrations, dangerousDropAllTables, PgDataStore } from '../datastore/postgres-store';
import { startEventServer } from '../event-stream/event-server';
import { getApiConfiguredChainID, httpPostRequest, logger } from '../helpers';
import { findTsvBlockHeight, getDbBlockHeight } from './helpers';
enum EventImportMode {
/**
* The Event Server will ingest and process every single Stacks node event contained in the TSV file
* from block 0 to the latest block. This is the default mode.
*/
archival = 'archival',
/**
* The Event Server will ignore certain "prunable" events (see `PRUNABLE_EVENT_PATHS`) from
* the imported TSV file if they are received outside of a block window, usually set to the
* TSV's `block_height` - 256.
* This allows the import to be faster at the expense of historical blockchain information.
*/
pruned = 'pruned',
}
/**
* Event paths that will be ignored during `EventImportMode.pruned` if received outside of the
* pruned block window.
*/
const PRUNABLE_EVENT_PATHS = ['/new_mempool_tx', '/drop_mempool_tx', '/new_microblocks'];
/**
* Exports all Stacks node events stored in the `event_observer_requests` table to a TSV file.
* @param filePath - Path to TSV file to write
* @param overwriteFile - If we should overwrite the file
*/
export async function exportEventsAsTsv(
filePath?: string,
overwriteFile: boolean = false
): Promise<void> {
if (!filePath) {
throw new Error(`A file path should be specified with the --file option`);
}
const resolvedFilePath = path.resolve(filePath);
if (fs.existsSync(resolvedFilePath) && overwriteFile !== true) {
throw new Error(
`A file already exists at ${resolvedFilePath}. Add --overwrite-file to truncate an existing file`
);
}
console.log(`Export event data to file: ${resolvedFilePath}`);
const writeStream = fs.createWriteStream(resolvedFilePath);
console.log(`Export started...`);
await PgDataStore.exportRawEventRequests(writeStream);
console.log('Export successful.');
}
/**
* Imports Stacks node events from a TSV file and ingests them through the Event Server.
* @param filePath - Path to TSV file to read
* @param importMode - Event import mode
* @param wipeDb - If we should wipe the DB before importing
* @param force - If we should force drop all tables
*/
export async function importEventsFromTsv(
filePath?: string,
importMode?: string,
wipeDb: boolean = false,
force: boolean = false
): Promise<void> {
if (!filePath) {
throw new Error(`A file path should be specified with the --file option`);
}
const resolvedFilePath = path.resolve(filePath);
if (!fs.existsSync(resolvedFilePath)) {
throw new Error(`File does not exist: ${resolvedFilePath}`);
}
let eventImportMode: EventImportMode;
switch (importMode) {
case 'pruned':
eventImportMode = EventImportMode.pruned;
break;
case 'archival':
case undefined:
eventImportMode = EventImportMode.archival;
break;
default:
throw new Error(`Invalid event import mode: ${importMode}`);
}
const hasData = await PgDataStore.containsAnyRawEventRequests();
if (!wipeDb && hasData) {
throw new Error(`Database contains existing data. Add --wipe-db to drop the existing tables.`);
}
if (force) {
await dangerousDropAllTables({ acknowledgePotentialCatastrophicConsequences: 'yes' });
}
// This performs a "migration down" which drops the tables, then re-creates them.
// If there's a breaking change in the migration files, this will throw, and the pg database will need to be wiped manually,
// or the `--force` option can be used.
await cycleMigrations({ dangerousAllowDataLoss: true });
// Look for the TSV's block height and determine the prunable block window.
const tsvBlockHeight = await findTsvBlockHeight(resolvedFilePath);
const blockWindowSize = parseInt(
process.env['STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD'] ?? '256'
);
const prunedBlockHeight = Math.max(tsvBlockHeight - blockWindowSize, 0);
console.log(`Event file's block height: ${tsvBlockHeight}`);
console.log(`Starting event import and playback in ${eventImportMode} mode`);
if (eventImportMode === EventImportMode.pruned) {
console.log(`Ignoring all prunable events before block height: ${prunedBlockHeight}`);
}
const db = await PgDataStore.connect({
usageName: 'import-events',
skipMigrations: true,
withNotifier: false,
eventReplay: true,
});
const eventServer = await startEventServer({
datastore: db,
chainId: getApiConfiguredChainID(),
serverHost: '127.0.0.1',
serverPort: 0,
httpLogLevel: 'debug',
});
const readStream = fs.createReadStream(resolvedFilePath);
const rawEventsIterator = PgDataStore.getRawEventRequests(readStream, status => {
console.log(status);
});
// Set logger to only output for warnings/errors, otherwise the event replay will result
// in the equivalent of months/years of API log output.
logger.level = 'warn';
// Disable this feature so a redundant export file isn't created while importing from an existing one.
delete process.env['STACKS_EXPORT_EVENTS_FILE'];
// The current import block height. Will be updated with every `/new_block` event.
let blockHeight = 0;
let isPruneFinished = false;
for await (const rawEvents of rawEventsIterator) {
for (const rawEvent of rawEvents) {
if (eventImportMode === EventImportMode.pruned) {
if (PRUNABLE_EVENT_PATHS.includes(rawEvent.event_path) && blockHeight < prunedBlockHeight) {
// Prunable events are ignored here.
continue;
}
if (blockHeight === prunedBlockHeight && !isPruneFinished) {
isPruneFinished = true;
console.log(`Resuming prunable event import...`);
}
}
await httpPostRequest({
host: '127.0.0.1',
port: eventServer.serverAddress.port,
path: rawEvent.event_path,
headers: { 'Content-Type': 'application/json' },
body: Buffer.from(rawEvent.payload, 'utf8'),
throwOnNotOK: true,
});
if (rawEvent.event_path === '/new_block') {
blockHeight = await getDbBlockHeight(db);
}
}
}
await db.finishEventReplay();
console.log(`Event import and playback successful.`);
await eventServer.closeAsync();
await db.close();
}
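As a reference, here is a minimal sketch of driving the import programmatically rather than through the CLI; the script location (alongside `index.ts`) and the TSV path are assumptions:
```typescript
import { importEventsFromTsv } from './event-replay/event-replay';

async function runPrunedImport(): Promise<void> {
  // Wipe the existing tables and replay events in pruned mode, skipping
  // prunable events until the pruned block window is reached.
  await importEventsFromTsv('/tmp/stacks-node-events.tsv', 'pruned', true, true);
}

runPrunedImport().catch(error => {
  console.error('Event import failed', error);
  process.exit(1);
});
```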


@@ -0,0 +1,47 @@
import { PgDataStore } from '../datastore/postgres-store';
import { ReverseFileStream } from './reverse-file-stream';
/**
* Traverse a TSV file in reverse to find the last received `/new_block` node message and return
* the `block_height` reported by that event. Even though the block produced by that event might
* end up being re-org'd, it gives us a reasonable idea as to what the Stacks node thought
* the block height was at the moment the event was sent.
* @param filePath - TSV path
* @returns `number` found block height, 0 if not found
*/
export async function findTsvBlockHeight(filePath: string): Promise<number> {
let blockHeight = 0;
const reverseStream = new ReverseFileStream(filePath);
for await (const data of reverseStream) {
const columns = data.toString().split('\t');
const eventName = columns[2];
if (eventName === '/new_block') {
const payload = columns[3];
blockHeight = JSON.parse(payload).block_height;
break;
}
}
reverseStream.destroy();
return blockHeight;
}
/**
* Get the current block height from the DB. We won't use the `getChainTip` method since that
* adds some conversions from block hashes into strings that we're not interested in. We also can't
* use the `chain_tip` materialized view since it is unavailable during replay, so we'll use the
* `block_height DESC` index.
* @param db - Data store
* @returns Block height
*/
export async function getDbBlockHeight(db: PgDataStore): Promise<number> {
const result = await db.query(async client => {
return await client.query<{ block_height: number }>(
`SELECT MAX(block_height) as block_height FROM blocks WHERE canonical = TRUE`
);
});
if (result.rowCount === 0) {
return 0;
}
return result.rows[0].block_height;
}
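A short usage sketch for these helpers, e.g. to preview the pruned block window before an import; the relative import path and TSV path are assumptions, and the 256-block default mirrors `importEventsFromTsv` above:
```typescript
import { findTsvBlockHeight } from './event-replay/helpers';

async function printPrunedWindow(tsvPath: string): Promise<void> {
  // Block height of the last `/new_block` event in the TSV, or 0 if none was found.
  const tsvBlockHeight = await findTsvBlockHeight(tsvPath);
  // Prunable events below this height would be ignored in pruned mode.
  const prunedBlockHeight = Math.max(tsvBlockHeight - 256, 0);
  console.log(`TSV block height: ${tsvBlockHeight}, pruned window starts at: ${prunedBlockHeight}`);
}
```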


@@ -0,0 +1,54 @@
import * as stream from 'stream';
import * as fs from 'fs';
/**
* Streams lines from a text file in reverse, starting from the end of the file.
* Modernized version of https://www.npmjs.com/package/fs-reverse
*/
export class ReverseFileStream extends stream.Readable {
private fileDescriptor: number;
private position: number;
private lineBuffer: string[] = [];
private remainder: string = '';
public readonly fileLength: number;
public bytesRead: number = 0;
constructor(filePath: string, opts?: stream.ReadableOptions) {
// `objectMode` avoids the `Buffer->utf8->Buffer->utf8` conversions when pushing strings
super({ ...{ objectMode: true, autoDestroy: true }, ...opts });
this.fileLength = fs.statSync(filePath).size;
this.position = this.fileLength;
this.fileDescriptor = fs.openSync(filePath, 'r', 0o666);
}
_read(size: number): void {
while (this.lineBuffer.length === 0 && this.position > 0) {
// Read `size` bytes from the end of the file.
const length = Math.min(size, this.position);
const buffer = Buffer.alloc(length);
this.position = this.position - length;
this.bytesRead += fs.readSync(this.fileDescriptor, buffer, 0, length, this.position);
// Split into lines to fill the `lineBuffer`
this.remainder = buffer.toString('utf8') + this.remainder;
this.lineBuffer = this.remainder.split(/\r?\n/);
// Ignore empty/trailing lines, `readable.push('')` is not recommended
this.lineBuffer = this.lineBuffer.filter(line => line.length > 0);
this.remainder = this.lineBuffer.shift() ?? '';
}
if (this.lineBuffer.length) {
this.push(this.lineBuffer.pop());
} else if (this.remainder.length) {
this.push(this.remainder);
this.remainder = '';
} else {
this.push(null);
}
}
_destroy(error: Error | null, callback: (error?: Error | null) => void): void {
fs.closeSync(this.fileDescriptor);
callback(error);
}
}
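A brief consumption sketch, mirroring how `findTsvBlockHeight` and the tests below read the stream; the file path is a placeholder:
```typescript
import { ReverseFileStream } from './event-replay/reverse-file-stream';

async function printLastLine(filePath: string): Promise<void> {
  const reverseStream = new ReverseFileStream(filePath);
  // Lines are yielded one at a time, starting from the last line of the file.
  for await (const line of reverseStream) {
    console.log(`Last line: ${line}`);
    break;
  }
  // Close the underlying file descriptor.
  reverseStream.destroy();
}
```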


@@ -17,6 +17,7 @@ import {
SyslogConfigSetLevels,
} from 'winston/lib/winston/config';
import { DbStxEvent, DbTx } from './datastore/common';
import { StacksCoreRpcClient } from './core-rpc/client';
export const isDevEnv = process.env.NODE_ENV === 'development';
export const isTestEnv = process.env.NODE_ENV === 'test';
@@ -960,3 +961,34 @@ export function isSmartContractTx(dbTx: DbTx, stxEvents: DbStxEvent[] = []): boo
}
return false;
}
/**
* Gets the chain id as reported by the Stacks node.
* @returns `ChainID` Chain id
*/
export async function getStacksNodeChainID(): Promise<ChainID> {
const client = new StacksCoreRpcClient();
await client.waitForConnection(Infinity);
const coreInfo = await client.getInfo();
if (coreInfo.network_id === ChainID.Mainnet) {
return ChainID.Mainnet;
} else if (coreInfo.network_id === ChainID.Testnet) {
return ChainID.Testnet;
} else {
throw new Error(`Unexpected network_id "${coreInfo.network_id}"`);
}
}
/**
* Gets the chain id as configured by the `STACKS_CHAIN_ID` API env variable.
* @returns `ChainID` Chain id
*/
export function getApiConfiguredChainID() {
if (!('STACKS_CHAIN_ID' in process.env)) {
const error = new Error(`Env var STACKS_CHAIN_ID is not set`);
logError(error.message, error);
throw error;
}
const configuredChainID: ChainID = parseInt(process.env['STACKS_CHAIN_ID'] as string);
return configuredChainID;
}


@@ -5,12 +5,13 @@ import {
logError,
isProdEnv,
numberToHex,
httpPostRequest,
parseArgBoolean,
getApiConfiguredChainID,
getStacksNodeChainID,
} from './helpers';
import * as sourceMapSupport from 'source-map-support';
import { DataStore } from './datastore/common';
import { cycleMigrations, dangerousDropAllTables, PgDataStore } from './datastore/postgres-store';
import { PgDataStore } from './datastore/postgres-store';
import { startApiServer } from './api/init';
import { startProfilerServer } from './inspector-util';
import { startEventServer } from './event-stream/event-server';
@@ -21,15 +22,14 @@ import {
} from './event-stream/tokens-contract-handler';
import { StacksCoreRpcClient } from './core-rpc/client';
import { createServer as createPrometheusServer } from '@promster/server';
import { ChainID } from '@stacks/transactions';
import { registerShutdownConfig } from './shutdown-handler';
import { importV1TokenOfferingData, importV1BnsData } from './import-v1';
import { OfflineDummyStore } from './datastore/offline-dummy-store';
import { Socket } from 'net';
import * as getopts from 'getopts';
import * as fs from 'fs';
import * as path from 'path';
import { injectC32addressEncodeCache } from './c32-addr-cache';
import { exportEventsAsTsv, importEventsFromTsv } from './event-replay/event-replay';
enum StacksApiMode {
/**
@@ -103,29 +103,6 @@ async function monitorCoreRpcConnection(): Promise<void> {
}
}
async function getCoreChainID(): Promise<ChainID> {
const client = new StacksCoreRpcClient();
await client.waitForConnection(Infinity);
const coreInfo = await client.getInfo();
if (coreInfo.network_id === ChainID.Mainnet) {
return ChainID.Mainnet;
} else if (coreInfo.network_id === ChainID.Testnet) {
return ChainID.Testnet;
} else {
throw new Error(`Unexpected network_id "${coreInfo.network_id}"`);
}
}
function getConfiguredChainID() {
if (!('STACKS_CHAIN_ID' in process.env)) {
const error = new Error(`Env var STACKS_CHAIN_ID is not set`);
logError(error.message, error);
throw error;
}
const configuredChainID: ChainID = parseInt(process.env['STACKS_CHAIN_ID'] as string);
return configuredChainID;
}
async function init(): Promise<void> {
if (isProdEnv && !fs.existsSync('.git-info')) {
throw new Error(
@@ -161,7 +138,7 @@ async function init(): Promise<void> {
}
}
const configuredChainID = getConfiguredChainID();
const configuredChainID = getApiConfiguredChainID();
const eventServer = await startEventServer({
datastore: db,
@@ -173,7 +150,7 @@ async function init(): Promise<void> {
forceKillable: false,
});
const networkChainId = await getCoreChainID();
const networkChainId = await getStacksNodeChainID();
if (networkChainId !== configuredChainID) {
const chainIdConfig = numberToHex(configuredChainID);
const chainIdNode = numberToHex(networkChainId);
@@ -207,7 +184,7 @@ async function init(): Promise<void> {
}
if (apiMode !== StacksApiMode.writeOnly) {
const apiServer = await startApiServer({ datastore: db, chainId: getConfiguredChainID() });
const apiServer = await startApiServer({ datastore: db, chainId: getApiConfiguredChainID() });
logger.info(`API server listening on: http://${apiServer.address}`);
registerShutdownConfig({
name: 'API Server',
@@ -286,6 +263,7 @@ function getProgramArgs() {
operand: 'import-events';
options: {
['file']?: string;
['mode']?: string;
['wipe-db']?: boolean;
['force']?: boolean;
};
@@ -296,83 +274,14 @@ function getProgramArgs() {
async function handleProgramArgs() {
const { args, parsedOpts } = getProgramArgs();
if (args.operand === 'export-events') {
if (!args.options.file) {
throw new Error(`A file path should be specified with the --file option`);
}
const filePath = path.resolve(args.options.file);
if (fs.existsSync(filePath) && args.options['overwrite-file'] !== true) {
throw new Error(
`A file already exists at ${filePath}. Add --overwrite-file to truncate an existing file`
);
}
console.log(`Export event data to file: ${filePath}`);
const writeStream = fs.createWriteStream(filePath);
console.log(`Export started...`);
await PgDataStore.exportRawEventRequests(writeStream);
console.log('Export successful.');
await exportEventsAsTsv(args.options.file, args.options['overwrite-file']);
} else if (args.operand === 'import-events') {
if (!args.options.file) {
throw new Error(`A file path should be specified with the --file option`);
}
const filePath = path.resolve(args.options.file);
if (!fs.existsSync(filePath)) {
throw new Error(`File does not exist: ${filePath}`);
}
const hasData = await PgDataStore.containsAnyRawEventRequests();
if (!args.options['wipe-db'] && hasData) {
throw new Error(
`Database contains existing data. Add --wipe-db to drop the existing tables.`
);
}
if (args.options['force']) {
await dangerousDropAllTables({ acknowledgePotentialCatastrophicConsequences: 'yes' });
}
// This performs a "migration down" which drops the tables, then re-creates them.
// If there's a breaking change in the migration files, this will throw, and the pg database will need to be wiped manually,
// or the `--force` option can be used.
await cycleMigrations({ dangerousAllowDataLoss: true });
const db = await PgDataStore.connect({
usageName: 'import-events',
skipMigrations: true,
withNotifier: false,
eventReplay: true,
});
const eventServer = await startEventServer({
datastore: db,
chainId: getConfiguredChainID(),
serverHost: '127.0.0.1',
serverPort: 0,
httpLogLevel: 'debug',
});
const readStream = fs.createReadStream(filePath);
const rawEventsIterator = PgDataStore.getRawEventRequests(readStream, status => {
console.log(status);
});
// Set logger to only output for warnings/errors, otherwise the event replay will result
// in the equivalent of months/years of API log output.
logger.level = 'warn';
// Disable this feature so a redundant export file isn't created while importing from an existing one.
delete process.env['STACKS_EXPORT_EVENTS_FILE'];
for await (const rawEvents of rawEventsIterator) {
for (const rawEvent of rawEvents) {
await httpPostRequest({
host: '127.0.0.1',
port: eventServer.serverAddress.port,
path: rawEvent.event_path,
headers: { 'Content-Type': 'application/json' },
body: Buffer.from(rawEvent.payload, 'utf8'),
throwOnNotOK: true,
});
}
}
await db.finishEventReplay();
console.log(`Event import and playback successful.`);
await eventServer.closeAsync();
await db.close();
await importEventsFromTsv(
args.options.file,
args.options.mode,
args.options['wipe-db'],
args.options.force
);
} else if (parsedOpts._[0]) {
throw new Error(`Unexpected program argument: ${parsedOpts._[0]}`);
} else {


@@ -0,0 +1,122 @@
import * as fs from 'fs';
import { findTsvBlockHeight } from '../event-replay/helpers';
import { ReverseFileStream } from '../event-replay/reverse-file-stream';
describe('event replay tests', () => {
function writeTmpFile(fileName: string, contents: string): string {
try {
fs.mkdirSync('./.tmp');
} catch (error: any) {
if (error.code != 'EEXIST') throw error;
}
const path = `./.tmp/${fileName}`;
fs.writeFileSync(path, contents, { encoding: 'utf-8' });
return path;
}
test('ReverseFileStream handles backpressure', async () => {
let contents = '';
for (let i = 1; i <= 1000; i++) {
contents += `line${i}\n`;
}
const testFilePath = writeTmpFile('test1.txt', contents);
try {
// Default stream buffer is 64KB, set to 300 bytes so file is larger than memory buffer
const reverseStream = new ReverseFileStream(testFilePath, { highWaterMark: 300 });
const output: string[] = [];
let linesStreamed = 0;
for await (const data of reverseStream) {
linesStreamed++;
output.push(data);
if (linesStreamed === 4) {
break;
}
}
expect(linesStreamed).toEqual(4);
expect(output).toEqual(['line1000', 'line999', 'line998', 'line997']);
expect(reverseStream.bytesRead).toBeLessThan(reverseStream.fileLength);
// Read whole file
const reverseStream2 = new ReverseFileStream(testFilePath, { highWaterMark: 300 });
const output2: string[] = [];
let linesStreamed2 = 0;
for await (const data of reverseStream2) {
linesStreamed2++;
output2.push(data);
}
expect(linesStreamed2).toEqual(1000);
expect(output2[0]).toBe('line1000');
expect(output2[output2.length - 1]).toBe('line1');
expect(reverseStream2.bytesRead).toBe(reverseStream2.fileLength);
} finally {
fs.unlinkSync(testFilePath);
}
});
test('ReverseFileStream streams file in reverse', async () => {
const contents = `line1
line2
line3
line4`;
const testFilePath = writeTmpFile('test1.txt', contents);
try {
const reverseStream = new ReverseFileStream(testFilePath);
const output: string[] = [];
let linesStreamed = 0;
for await (const data of reverseStream) {
linesStreamed++;
output.push(data);
}
expect(linesStreamed).toEqual(4);
expect(output).toEqual(['line4', 'line3', 'line2', 'line1']);
} finally {
fs.unlinkSync(testFilePath);
}
});
test('ReverseFileStream streams file in reverse with CRLF line endings', async () => {
const contents = ['line1', 'line2', 'line3', 'line4'].join('\r\n');
const testFilePath = writeTmpFile('test1.txt', contents);
try {
const reverseStream = new ReverseFileStream(testFilePath);
const output: string[] = [];
let linesStreamed = 0;
for await (const data of reverseStream) {
linesStreamed++;
output.push(data);
}
expect(linesStreamed).toEqual(4);
expect(output).toEqual(['line4', 'line3', 'line2', 'line1']);
} finally {
fs.unlinkSync(testFilePath);
}
});
test('TSV block height is found', async () => {
const contents = `744275\t2022-02-21 16:07:01.123587+00\t/new_mempool_tx\t[]
744275\t2022-02-21 16:07:01.123587+00\t/new_block\t{"block_height": 1200}
744275\t2022-02-21 16:07:01.123587+00\t/new_block\t{"block_height": 1201}
744275\t2022-02-21 16:07:01.123587+00\t/new_mempool_tx\t[]`;
const testFilePath = writeTmpFile('test1.tsv', contents);
try {
const blockHeight = await findTsvBlockHeight(testFilePath);
expect(blockHeight).toEqual(1201);
} finally {
fs.unlinkSync(testFilePath);
}
});
test('TSV block height is 0 if not found', async () => {
const contents = `744275\t2022-02-21 16:07:01.123587+00\t/new_mempool_tx\t[]
744275\t2022-02-21 16:07:01.123587+00\t/new_mempool_tx\t[]
744275\t2022-02-21 16:07:01.123587+00\t/new_mempool_tx\t[]
744275\t2022-02-21 16:07:01.123587+00\t/new_mempool_tx\t[]`;
const testFilePath = writeTmpFile('test1.tsv', contents);
try {
const blockHeight = await findTsvBlockHeight(testFilePath);
expect(blockHeight).toEqual(0);
} finally {
fs.unlinkSync(testFilePath);
}
});
});