mirror of
https://github.com/alexgo-io/stacks-blockchain-api.git
synced 2026-01-12 16:53:19 +08:00
fix: changed processing order
This commit is contained in:
@@ -3308,4 +3308,30 @@ export class PgWriteStore extends PgStore {
|
||||
throw new Error(`No updates made while toggling table indexes`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* (event-replay) Reindex all DB tables.
|
||||
*/
|
||||
async reindexAllTables(sql: PgSqlClient): Promise<void> {
|
||||
const dbName = sql.options.database;
|
||||
const tableSchema = sql.options.connection.search_path ?? 'public';
|
||||
const tablesQuery = await sql<{ tablename: string }[]>`
|
||||
SELECT tablename FROM pg_catalog.pg_tables
|
||||
WHERE tablename != ${MIGRATIONS_TABLE}
|
||||
AND schemaname = ${tableSchema}`;
|
||||
if (tablesQuery.length === 0) {
|
||||
const errorMsg = `No tables found in database '${dbName}', schema '${tableSchema}'`;
|
||||
console.error(errorMsg);
|
||||
throw new Error(errorMsg);
|
||||
}
|
||||
const tables: string[] = tablesQuery.map((r: { tablename: string }) => r.tablename);
|
||||
|
||||
for (const table of tables) {
|
||||
console.log(table)
|
||||
const result = await sql`REINDEX TABLE ${sql(table)}`;
|
||||
if (result.count === 0) {
|
||||
throw new Error(`No updates made while toggling table indexes`);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,7 +87,7 @@ export class DatasetStore {
|
||||
const con = this.db.connect();
|
||||
return new Promise(resolve => {
|
||||
con.all(
|
||||
"SELECT payload FROM READ_PARQUET('events/attachments/new/*.parquet')",
|
||||
"SELECT payload FROM READ_PARQUET('events/attachments/new/canonical/*.parquet')",
|
||||
(err: any, result: any) => {
|
||||
if (err) {
|
||||
throw err;
|
||||
@@ -109,7 +109,7 @@ export class DatasetStore {
|
||||
con.all(
|
||||
`SELECT method, payload FROM READ_PARQUET([
|
||||
'events/new_burn_block/canonical/*.parquet',
|
||||
'events/attachments/new/*.parquet',
|
||||
'events/attachments/new/canonical/*.parquet',
|
||||
'events/new_microblocks/*.parquet',
|
||||
'events/drop_mempool_tx/*.parquet',
|
||||
'events/new_mempool_tx/*.parquet',
|
||||
|
||||
@@ -240,7 +240,11 @@ export class ReplayController {
|
||||
logger.info({ component: 'event-replay' }, 'Re-enabling indexes and constraints on tables');
|
||||
await this.db.toggleAllTableIndexes(this.db.sql, IndexesState.On);
|
||||
|
||||
// to be replayed with regular HTTP POSTs
|
||||
// Re-indexing tables
|
||||
logger.info({ component: 'event-replay' }, 'Re-indexing tables');
|
||||
await this.db.reindexAllTables(this.db.sql);
|
||||
|
||||
// Remainder events to be replayed with regular HTTP POSTs
|
||||
await this.ingestRemainderEvents();
|
||||
|
||||
// Refreshing materialized views
|
||||
@@ -260,13 +264,23 @@ export class ReplayController {
|
||||
*
|
||||
*/
|
||||
do = async () => {
|
||||
// NEW_BURN_BLOCK and ATTACHMENTS/NEW events
|
||||
await Promise.all([this.ingestNewBurnBlockEvents(), this.ingestAttachmentNewEvents()]);
|
||||
|
||||
// RAW events to event_observer_requests table
|
||||
await Promise.all([this.ingestRawEvents(), this.ingestRawNewBlockEvents()]);
|
||||
|
||||
// NEW_BLOCK events
|
||||
await this.ingestNewBlockEvents();
|
||||
|
||||
// RAW events to event_observer_requests table
|
||||
await Promise.all(
|
||||
[
|
||||
this.ingestRawEvents(),
|
||||
this.ingestRawNewBlockEvents()
|
||||
]
|
||||
);
|
||||
|
||||
// NEW_BURN_BLOCK and ATTACHMENTS/NEW events
|
||||
await Promise.all(
|
||||
[
|
||||
this.ingestNewBurnBlockEvents(),
|
||||
this.ingestAttachmentNewEvents()
|
||||
]
|
||||
);
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user