fix: guarantee db is empty before performing a replay (#1374)

* feat: add event-replay test suite

* test: bns genesis block

* chore: rename tsv to mainnet

* test: export import cycle

* chore: reduce mainnet tsv size

* fix: db has data check

* fix: drop views first, tables second

* feat: friendly error when migration cycle failed
This commit is contained in:
Rafael Cárdenas
2022-10-27 15:29:56 -05:00
committed by GitHub
parent eb9fd0e551
commit ef8e7a9185
11 changed files with 265 additions and 40 deletions

17
.vscode/launch.json vendored
View File

@@ -155,6 +155,23 @@
"preLaunchTask": "stacks-node:deploy-dev",
"postDebugTask": "stacks-node:stop-dev"
},
{
"type": "node",
"request": "launch",
"name": "Jest: Event Replay",
"program": "${workspaceFolder}/node_modules/.bin/jest",
"args": [
"--testTimeout=3600000",
"--runInBand",
"--no-cache",
"--config",
"${workspaceRoot}/jest.config.event-replay.js"
],
"outputCapture": "std",
"console": "integratedTerminal",
"preLaunchTask": "stacks-node:deploy-dev",
"postDebugTask": "stacks-node:stop-dev"
},
{
"type": "node",
"request": "launch",

View File

@@ -0,0 +1,15 @@
module.exports = {
preset: 'ts-jest',
rootDir: 'src',
testMatch: ['<rootDir>/tests-event-replay/**/*.ts'],
testPathIgnorePatterns: [
'<rootDir>/tests-event-replay/setup.ts',
'<rootDir>/tests-event-replay/teardown.ts',
],
collectCoverageFrom: ['<rootDir>/**/*.ts'],
coveragePathIgnorePatterns: ['<rootDir>/tests*'],
coverageDirectory: '../coverage',
globalSetup: '<rootDir>/tests-event-replay/setup.ts',
globalTeardown: '<rootDir>/tests-event-replay/teardown.ts',
testTimeout: 20000,
};

View File

@@ -1,7 +1,7 @@
import { pipelineAsync } from '../helpers';
import { Readable, Writable } from 'stream';
import { DbRawEventRequest } from './common';
import { PgServer } from './connection';
import { connectPostgres, PgServer } from './connection';
import { connectPgPool, connectWithRetry } from './connection-legacy';
import * as pgCopyStreams from 'pg-copy-streams';
import * as PgCursor from 'pg-cursor';
@@ -119,22 +119,36 @@ export async function* getRawEventRequests(
}
}
export async function containsAnyRawEventRequests(): Promise<boolean> {
const pool = await connectPgPool({
usageName: 'contains-raw-events-check',
/**
* Check the `pg_class` table for any data structures contained in the database. We will consider
* any and all results here as "data" contained in the DB, since anything that is not a completely
* empty DB could lead to strange errors when running the API. See:
* https://www.postgresql.org/docs/current/catalog-pg-class.html
* @returns `boolean` if the DB has data
*/
export async function databaseHasData(args?: {
ignoreMigrationTables?: boolean;
}): Promise<boolean> {
const sql = await connectPostgres({
usageName: 'contains-data-check',
pgServer: PgServer.primary,
});
const client = await pool.connect();
try {
const result = await client.query('SELECT id from event_observer_requests LIMIT 1');
return result.rowCount > 0;
const ignoreMigrationTables = args?.ignoreMigrationTables ?? false;
const result = await sql<{ count: number }[]>`
SELECT COUNT(*)
FROM pg_class c
JOIN pg_namespace s ON s.oid = c.relnamespace
WHERE s.nspname = ${sql.options.connection.search_path}
${ignoreMigrationTables ? sql`AND c.relname NOT LIKE 'pgmigrations%'` : sql``}
`;
return result.count > 0 && result[0].count > 0;
} catch (error: any) {
if (error.message?.includes('does not exist')) {
return false;
}
throw error;
} finally {
client.release();
await pool.end();
await sql.end();
}
}

View File

@@ -3,6 +3,8 @@ import PgMigrate, { RunnerOption } from 'node-pg-migrate';
import { Client } from 'pg';
import { APP_DIR, isDevEnv, isTestEnv, logError, logger } from '../helpers';
import { getPgClientConfig, PgClientConfig } from './connection-legacy';
import { connectPostgres, PgServer } from './connection';
import { databaseHasData } from './event-requests';
const MIGRATIONS_TABLE = 'pgmigrations';
const MIGRATIONS_DIR = path.join(APP_DIR, 'migrations');
@@ -52,10 +54,14 @@ export async function runMigrations(
export async function cycleMigrations(opts?: {
// Bypass the NODE_ENV check when performing a "down" migration which irreversibly drops data.
dangerousAllowDataLoss?: boolean;
checkForEmptyData?: boolean;
}): Promise<void> {
const clientConfig = getPgClientConfig({ usageName: 'cycle-migrations' });
await runMigrations(clientConfig, 'down', opts);
if (opts?.checkForEmptyData && (await databaseHasData({ ignoreMigrationTables: true }))) {
throw new Error('Migration down process did not completely remove DB tables');
}
await runMigrations(clientConfig, 'up', opts);
}
@@ -65,30 +71,31 @@ export async function dangerousDropAllTables(opts?: {
if (opts?.acknowledgePotentialCatastrophicConsequences !== 'yes') {
throw new Error('Dangerous usage error.');
}
const clientConfig = getPgClientConfig({ usageName: 'dangerous-drop-all-tables' });
const client = new Client(clientConfig);
const sql = await connectPostgres({
usageName: 'dangerous-drop-all-tables',
pgServer: PgServer.primary,
});
const schema = sql.options.connection.search_path;
try {
await client.connect();
await client.query('BEGIN');
const getTablesQuery = await client.query<{ table_name: string }>(
`
SELECT table_name
FROM information_schema.tables
WHERE table_schema = $1
AND table_catalog = $2
AND table_type = 'BASE TABLE'
`,
[clientConfig.schema, clientConfig.database]
);
const tables = getTablesQuery.rows.map(r => r.table_name);
for (const table of tables) {
await client.query(`DROP TABLE IF EXISTS ${table} CASCADE`);
}
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
await sql.begin(async sql => {
const relNamesQuery = async (kind: string) => sql<{ relname: string }[]>`
SELECT relname
FROM pg_class c
JOIN pg_namespace s ON s.oid = c.relnamespace
WHERE s.nspname = ${schema} AND c.relkind = ${kind}
`;
// Remove materialized views first and tables second.
// Using CASCADE in these DROP statements also removes associated indexes and constraints.
const views = await relNamesQuery('m');
for (const view of views) {
await sql`DROP MATERIALIZED VIEW IF EXISTS ${sql(view.relname)} CASCADE`;
}
const tables = await relNamesQuery('r');
for (const table of tables) {
await sql`DROP TABLE IF EXISTS ${sql(table.relname)} CASCADE`;
}
});
} finally {
await client.end();
await sql.end();
}
}

View File

@@ -5,7 +5,7 @@ import { defaultLogLevel, getApiConfiguredChainID, httpPostRequest, logger } fro
import { findBnsGenesisBlockData, findTsvBlockHeight, getDbBlockHeight } from './helpers';
import { importV1BnsNames, importV1BnsSubdomains, importV1TokenOfferingData } from '../import-v1';
import {
containsAnyRawEventRequests,
databaseHasData,
exportRawEventRequests,
getRawEventRequests,
} from '../datastore/event-requests';
@@ -90,7 +90,7 @@ export async function importEventsFromTsv(
default:
throw new Error(`Invalid event import mode: ${importMode}`);
}
const hasData = await containsAnyRawEventRequests();
const hasData = await databaseHasData();
if (!wipeDb && hasData) {
throw new Error(`Database contains existing data. Add --wipe-db to drop the existing tables.`);
}
@@ -98,10 +98,14 @@ export async function importEventsFromTsv(
await dangerousDropAllTables({ acknowledgePotentialCatastrophicConsequences: 'yes' });
}
// This performs a "migration down" which drops the tables, then re-creates them.
// If there's a breaking change in the migration files, this will throw, and the pg database needs wiped manually,
// or the `--force` option can be used.
await cycleMigrations({ dangerousAllowDataLoss: true });
try {
await cycleMigrations({ dangerousAllowDataLoss: true, checkForEmptyData: true });
} catch (error) {
logger.error(error);
throw new Error(
`DB migration cycle failed, possibly due to an incompatible API version upgrade. Add --wipe-db --force or perform a manual DB wipe before importing.`
);
}
// Look for the TSV's block height and determine the prunable block window.
const tsvBlockHeight = await findTsvBlockHeight(resolvedFilePath);

View File

@@ -1,8 +1,8 @@
import * as fs from 'fs';
import { findTsvBlockHeight } from '../event-replay/helpers';
import { findBnsGenesisBlockData, findTsvBlockHeight } from '../event-replay/helpers';
import { ReverseFileStream } from '../event-replay/reverse-file-stream';
describe('event replay tests', () => {
describe('helper tests', () => {
function writeTmpFile(fileName: string, contents: string): string {
try {
fs.mkdirSync('./.tmp');
@@ -119,4 +119,17 @@ line4`;
fs.unlinkSync(testFilePath);
}
});
test('BNS genesis block data is found', async () => {
const genesisBlock = await findBnsGenesisBlockData('src/tests-event-replay/tsv/mainnet.tsv');
expect(genesisBlock).toEqual({
index_block_hash: '0x918697ef63f9d8bdf844c3312b299e72a231cde542f3173f7755bb8c1cdaf3a7',
parent_index_block_hash: '0x55c9861be5cff984a20ce6d99d4aa65941412889bdc665094136429b84f8c2ee',
microblock_hash: '0x0000000000000000000000000000000000000000000000000000000000000000',
microblock_sequence: 0,
microblock_canonical: true,
tx_id: '0x2f079994c9bd92b2272258b9de73e278824d76efe1b5a83a3b00941f9559de8a',
tx_index: 7,
});
});
});

View File

@@ -0,0 +1,101 @@
import * as fs from 'fs';
import { exportEventsAsTsv, importEventsFromTsv } from '../event-replay/event-replay';
import { PgWriteStore } from '../datastore/pg-write-store';
import { dangerousDropAllTables, runMigrations } from '../datastore/migrations';
import { databaseHasData } from '../datastore/event-requests';
import { getPgClientConfig } from '../datastore/connection-legacy';
describe('import/export tests', () => {
let db: PgWriteStore;
beforeEach(async () => {
process.env.PG_DATABASE = 'postgres';
db = await PgWriteStore.connect({
usageName: 'tests',
withNotifier: false,
skipMigrations: true,
});
});
test('event import and export cycle', async () => {
// Import from mocknet TSV
await importEventsFromTsv('src/tests-event-replay/tsv/mocknet.tsv', 'archival', true, true);
const chainTip = await db.getUnanchoredChainTip();
expect(chainTip.found).toBe(true);
expect(chainTip.result?.blockHeight).toBe(28);
expect(chainTip.result?.indexBlockHash).toBe(
'0x76cd67a65c0dfd5ea450bb9efe30da89fa125bfc077c953802f718353283a533'
);
expect(chainTip.result?.blockHash).toBe(
'0x7682af212d3c1ef62613412f9b5a727269b4548f14eca2e3f941f7ad8b3c11b2'
);
// Export into temp TSV
const tmpDir = 'src/tests-event-replay/.tmp';
try {
fs.mkdirSync(tmpDir);
} catch (error: any) {
if (error.code != 'EEXIST') throw error;
}
const tmpTsvPath = `${tmpDir}/export.tsv`;
await exportEventsAsTsv(tmpTsvPath, true);
// Re-import with exported TSV and check that chain tip matches.
try {
await importEventsFromTsv(`${tmpDir}/export.tsv`, 'archival', true, true);
const newChainTip = await db.getUnanchoredChainTip();
expect(newChainTip.found).toBe(true);
expect(newChainTip.result?.blockHeight).toBe(28);
expect(newChainTip.result?.indexBlockHash).toBe(
'0x76cd67a65c0dfd5ea450bb9efe30da89fa125bfc077c953802f718353283a533'
);
expect(newChainTip.result?.blockHash).toBe(
'0x7682af212d3c1ef62613412f9b5a727269b4548f14eca2e3f941f7ad8b3c11b2'
);
} finally {
fs.rmSync(tmpDir, { force: true, recursive: true });
}
});
test('import with db wipe options', async () => {
// Migrate first so we have some data.
const clientConfig = getPgClientConfig({ usageName: 'cycle-migrations' });
await runMigrations(clientConfig, 'up', {});
await expect(
importEventsFromTsv('src/tests-event-replay/tsv/mocknet.tsv', 'archival', false, false)
).rejects.toThrowError('contains existing data');
// Create strange table
await db.sql`CREATE TABLE IF NOT EXISTS test (a varchar(10))`;
await expect(
importEventsFromTsv('src/tests-event-replay/tsv/mocknet.tsv', 'archival', true, false)
).rejects.toThrowError('migration cycle failed');
// Force and test
await expect(
importEventsFromTsv('src/tests-event-replay/tsv/mocknet.tsv', 'archival', true, true)
).resolves.not.toThrow();
});
test('db contains data', async () => {
const clientConfig = getPgClientConfig({ usageName: 'cycle-migrations' });
await runMigrations(clientConfig, 'up', {});
// Having tables counts as having data as this may change across major versions.
await expect(databaseHasData()).resolves.toBe(true);
// Dropping all tables removes everything.
await dangerousDropAllTables({ acknowledgePotentialCatastrophicConsequences: 'yes' });
await expect(databaseHasData()).resolves.toBe(false);
// Cycling migrations leaves the `pgmigrations` table.
await runMigrations(clientConfig, 'up', {});
await runMigrations(clientConfig, 'down', {});
await expect(databaseHasData()).resolves.toBe(true);
await expect(databaseHasData({ ignoreMigrationTables: true })).resolves.toBe(false);
});
afterEach(async () => {
await db?.close();
});
});

View File

@@ -0,0 +1,11 @@
import { loadDotEnv } from '../helpers';
// ts-unused-exports:disable-next-line
export default (): void => {
console.log('Jest - setup..');
if (!process.env.NODE_ENV) {
process.env.NODE_ENV = 'test';
}
loadDotEnv();
console.log('Jest - setup done');
};

View File

@@ -0,0 +1,5 @@
// ts-unused-exports:disable-next-line
export default (): void => {
console.log('Jest - teardown');
console.log('Jest - teardown done');
};

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long