fix: changed processing order

This commit is contained in:
Chris Guimaraes
2023-07-31 16:49:41 +01:00
parent c77ac57a96
commit 62a12bdef9
3 changed files with 49 additions and 9 deletions

View File

@@ -3308,4 +3308,30 @@ export class PgWriteStore extends PgStore {
throw new Error(`No updates made while toggling table indexes`);
}
}
/**
* (event-replay) Reindex all DB tables.
*/
async reindexAllTables(sql: PgSqlClient): Promise<void> {
const dbName = sql.options.database;
const tableSchema = sql.options.connection.search_path ?? 'public';
const tablesQuery = await sql<{ tablename: string }[]>`
SELECT tablename FROM pg_catalog.pg_tables
WHERE tablename != ${MIGRATIONS_TABLE}
AND schemaname = ${tableSchema}`;
if (tablesQuery.length === 0) {
const errorMsg = `No tables found in database '${dbName}', schema '${tableSchema}'`;
console.error(errorMsg);
throw new Error(errorMsg);
}
const tables: string[] = tablesQuery.map((r: { tablename: string }) => r.tablename);
for (const table of tables) {
console.log(table)
const result = await sql`REINDEX TABLE ${sql(table)}`;
if (result.count === 0) {
throw new Error(`No updates made while toggling table indexes`);
}
}
}
}

View File

@@ -87,7 +87,7 @@ export class DatasetStore {
const con = this.db.connect();
return new Promise(resolve => {
con.all(
"SELECT payload FROM READ_PARQUET('events/attachments/new/*.parquet')",
"SELECT payload FROM READ_PARQUET('events/attachments/new/canonical/*.parquet')",
(err: any, result: any) => {
if (err) {
throw err;
@@ -109,7 +109,7 @@ export class DatasetStore {
con.all(
`SELECT method, payload FROM READ_PARQUET([
'events/new_burn_block/canonical/*.parquet',
'events/attachments/new/*.parquet',
'events/attachments/new/canonical/*.parquet',
'events/new_microblocks/*.parquet',
'events/drop_mempool_tx/*.parquet',
'events/new_mempool_tx/*.parquet',

View File

@@ -240,7 +240,11 @@ export class ReplayController {
logger.info({ component: 'event-replay' }, 'Re-enabling indexes and constraints on tables');
await this.db.toggleAllTableIndexes(this.db.sql, IndexesState.On);
// to be replayed with regular HTTP POSTs
// Re-indexing tables
logger.info({ component: 'event-replay' }, 'Re-indexing tables');
await this.db.reindexAllTables(this.db.sql);
// Remainder events to be replayed with regular HTTP POSTs
await this.ingestRemainderEvents();
// Refreshing materialized views
@@ -260,13 +264,23 @@ export class ReplayController {
*
*/
do = async () => {
// NEW_BURN_BLOCK and ATTACHMENTS/NEW events
await Promise.all([this.ingestNewBurnBlockEvents(), this.ingestAttachmentNewEvents()]);
// RAW events to event_observer_requests table
await Promise.all([this.ingestRawEvents(), this.ingestRawNewBlockEvents()]);
// NEW_BLOCK events
await this.ingestNewBlockEvents();
// RAW events to event_observer_requests table
await Promise.all(
[
this.ingestRawEvents(),
this.ingestRawNewBlockEvents()
]
);
// NEW_BURN_BLOCK and ATTACHMENTS/NEW events
await Promise.all(
[
this.ingestNewBurnBlockEvents(),
this.ingestAttachmentNewEvents()
]
);
};
}