fix: on attachments_new events processing

This commit is contained in:
Chris Guimaraes
2023-08-02 11:29:52 +01:00
parent 82eadcbe2f
commit 07073139cc
6 changed files with 147 additions and 149 deletions

View File

@@ -3224,48 +3224,6 @@ export class PgWriteStore extends PgStore {
`;
}
async updateBatchSubdomainsEventReplay(
sql: PgSqlClient,
data: DataStoreAttachmentSubdomainData[]
): Promise<void> {
const subdomainValues: BnsSubdomainInsertValues[] = [];
for (const dataItem of data) {
if (dataItem.subdomains && dataItem.blockData) {
for (const subdomain of dataItem.subdomains) {
subdomainValues.push({
name: subdomain.name,
namespace_id: subdomain.namespace_id,
fully_qualified_subdomain: subdomain.fully_qualified_subdomain,
owner: subdomain.owner,
zonefile_hash: validateZonefileHash(subdomain.zonefile_hash),
parent_zonefile_hash: subdomain.parent_zonefile_hash,
parent_zonefile_index: subdomain.parent_zonefile_index,
block_height: subdomain.block_height,
tx_index: subdomain.tx_index,
zonefile_offset: subdomain.zonefile_offset,
resolver: subdomain.resolver,
canonical: subdomain.canonical,
tx_id: subdomain.tx_id,
index_block_hash: dataItem.blockData.index_block_hash,
parent_index_block_hash: dataItem.blockData.parent_index_block_hash,
microblock_hash: dataItem.blockData.microblock_hash,
microblock_sequence: dataItem.blockData.microblock_sequence,
microblock_canonical: dataItem.blockData.microblock_canonical,
});
}
}
}
if (subdomainValues.length === 0) {
return;
}
const result = await sql`
INSERT INTO subdomains ${sql(subdomainValues)}
`;
if (result.count !== subdomainValues.length) {
throw new Error(`Expected ${subdomainValues.length} subdomain inserts, got ${result.count}`);
}
}
async insertRawEventRequestBatch(
sql: PgSqlClient,
events: RawEventRequestInsertValues[]
@@ -3276,7 +3234,7 @@ export class PgWriteStore extends PgStore {
}
/**
* (event-replay) Enable or disable indexes for all DB tables.
* (event-replay) Enable or disable indexes for DB tables.
*/
async toggleAllTableIndexes(sql: PgSqlClient, state: IndexesState): Promise<void> {
const enable: boolean = Boolean(state);
@@ -3293,12 +3251,16 @@ export class PgWriteStore extends PgStore {
}
const tables: string[] = tablesQuery.map((r: { tablename: string }) => r.tablename);
// Exclude subdomains table since its constraints
// are need to handle the ingestion of attachments_new events.
const filtered = tables.filter(item => item !== 'subdomains');
const result = await sql`
UPDATE pg_index
SET ${sql({ indisready: enable, indisvalid: enable })}
WHERE indrelid = ANY (
SELECT oid FROM pg_class
WHERE relname IN ${sql(tables)}
WHERE relname IN ${sql(filtered)}
AND relnamespace = (
SELECT oid FROM pg_namespace WHERE nspname = ${tableSchema}
)

View File

@@ -83,19 +83,14 @@ export class DatasetStore {
// ATTACHMENTS_NEW EVENTS
//
attachmentsNewEvents = (): Promise<TableData> => {
attachmentsCanonicalEvents = (): Promise<QueryResult> => {
const con = this.db.connect();
return new Promise(resolve => {
con.all(
"SELECT payload FROM READ_PARQUET('events/attachments/new/canonical/*.parquet')",
(err: any, result: any) => {
if (err) {
throw err;
}
resolve(result);
}
const res = con.stream(
"SELECT payload FROM READ_PARQUET('events/attachments/new/canonical/*.parquet') ORDER BY id"
);
resolve(res);
});
};
@@ -155,4 +150,24 @@ export class DatasetStore {
);
});
};
//
// CANONICAL BLOCK_HASHES
//
canonicalBlockHashes = (): Promise<QueryResult> => {
return new Promise(resolve => {
const con = this.db.connect();
con.all(
"SELECT * FROM READ_PARQUET('events/canonical/block_hashes/*.parquet')",
(err: any, res: any) => {
if (err) {
throw err;
}
resolve(res);
}
);
});
};
}

View File

@@ -1,69 +1,123 @@
/* eslint-disable @typescript-eslint/no-non-null-assertion */
/* eslint-disable @typescript-eslint/no-unsafe-return */
import { Readable, Writable } from 'stream';
import { pipeline } from 'stream/promises';
import { batchIterate } from '../../../helpers';
import { PgWriteStore } from '../../../datastore/pg-write-store';
import { parseAttachmentMessage } from '../../../event-stream/event-server';
import { parseAttachment } from '../../../event-stream/event-server';
import { logger } from '../../../logger';
import { CoreNodeAttachmentMessage } from '../../../event-stream/core-node-message';
import { DataStoreAttachmentSubdomainData, DataStoreBnsBlockData } from '../../../datastore/common';
import { DataStoreAttachmentSubdomainData } from '../../../datastore/common';
import { DatasetStore } from '../dataset/store';
import { I32_MAX } from '../../../helpers';
const batchInserters: BatchInserter[] = [];
interface BatchInserter<T = any> {
push(entries: T[]): Promise<void>;
flush(): Promise<void>;
}
function createBatchInserter<T>({
batchSize,
insertFn,
}: {
batchSize: number;
insertFn: (entries: T[]) => Promise<void>;
}): BatchInserter<T> {
let entryBuffer: T[] = [];
return {
async push(entries: T[]) {
entries.length === 1
? entryBuffer.push(entries[0])
: entries.forEach(e => entryBuffer.push(e));
if (entryBuffer.length === batchSize) {
await insertFn(entryBuffer);
entryBuffer.length = 0;
} else if (entryBuffer.length > batchSize) {
for (const batch of batchIterate(entryBuffer, batchSize)) {
await insertFn(batch);
}
entryBuffer.length = 0;
}
},
async flush() {
logger.debug({ component: 'event-replay' }, 'Flushing remaining data...');
if (entryBuffer.length > 0) {
await insertFn(entryBuffer);
entryBuffer = [];
}
},
};
}
const insertInBatch = (db: PgWriteStore, canonicalBlockHashes: any) => {
const dbAttachmentEventBatchInserter = createBatchInserter<DataStoreAttachmentSubdomainData>({
batchSize: 1,
insertFn: async entries => {
logger.debug({ component: 'event-replay' }, 'Inserting into subdomains table...');
return await db.updateBatchSubdomains(db.sql, entries);
},
});
batchInserters.push(dbAttachmentEventBatchInserter);
return new Writable({
objectMode: true,
write: async (data, _encoding, next) => {
const dataStoreAttachments: DataStoreAttachmentSubdomainData[] = [];
const attachmentMsg: CoreNodeAttachmentMessage[] = JSON.parse(data.payload);
const attachments = parseAttachment(attachmentMsg);
for (const subdomain of attachments.subdomains) {
const dataStoreAttachment: DataStoreAttachmentSubdomainData = {};
const indexBlockHash = subdomain.index_block_hash!;
const blockEntityData = canonicalBlockHashes[subdomain.block_height - 1];
const parentIndexBlockHash =
canonicalBlockHashes[subdomain.block_height - 2]['index_block_hash'];
const microblocks = JSON.parse(blockEntityData['microblock']);
const microblockIndex = microblocks.findIndex(
(mb: any, index: any) => index > 0 && mb[1].includes(subdomain.tx_id)
);
// derive from entity hash index
subdomain.tx_index = JSON.parse(blockEntityData['microblock'])
.flatMap((m: any) => m[1])
.findIndex((tx: any) => tx === subdomain.tx_id);
const blockData = {
index_block_hash: indexBlockHash,
parent_index_block_hash: parentIndexBlockHash,
microblock_hash: microblockIndex !== -1 ? microblocks[microblockIndex][0] : '',
microblock_sequence: microblockIndex !== -1 ? microblockIndex - 1 : I32_MAX,
microblock_canonical: true,
};
dataStoreAttachment.blockData = blockData;
dataStoreAttachment.subdomains = attachments.subdomains;
dataStoreAttachments.push(dataStoreAttachment);
}
await dbAttachmentEventBatchInserter.push(dataStoreAttachments);
next();
},
});
};
export const processAttachmentNewEvents = async (db: PgWriteStore, dataset: DatasetStore) => {
logger.info({ component: 'event-replay' }, 'ATTACHMENTS_NEW events process started');
const attachmentsNewEvents = await dataset.attachmentsNewEvents();
const ary: DataStoreAttachmentSubdomainData[] = [];
const canonicalEvents = await dataset.attachmentsCanonicalEvents();
const canonicalBlockHashes: any = await dataset.canonicalBlockHashes();
const insert = insertInBatch(db, canonicalBlockHashes);
for await (const event of attachmentsNewEvents) {
const blockData: DataStoreBnsBlockData = {
index_block_hash: '',
parent_index_block_hash: '',
microblock_hash: '',
microblock_sequence: I32_MAX,
microblock_canonical: true,
};
const dataStore: DataStoreAttachmentSubdomainData = {};
const attachmentMsg: CoreNodeAttachmentMessage[] = JSON.parse(event.payload);
const attachments = parseAttachmentMessage(attachmentMsg);
dataStore.subdomains = attachments.subdomainObj.dbBnsSubdomain;
blockData.index_block_hash = attachments.subdomainObj.attachmentData.indexBlockHash;
dataStore.blockData = blockData;
dataStore.attachment = attachments.subdomainObj.attachmentData;
ary.push(dataStore);
await pipeline(
Readable.from(canonicalEvents),
insert.on('finish', async () => {
for (const batchInserter of batchInserters) {
await batchInserter.flush();
}
const blockHeights = [];
for (const el of ary) {
if (el.subdomains!.length !== 0) {
blockHeights.push(el.attachment!.blockHeight);
}
}
if (blockHeights.length > 0) {
// get events from block heights
const blockEvents = await dataset.getNewBlockEventsInBlockHeights(blockHeights);
for (const event of blockEvents) {
for (const ds of ary) {
if (ds.blockData?.index_block_hash === event.index_block_hash) {
const txs = JSON.parse(event.payload).transactions;
for (const tx of txs) {
if (ds.attachment!.txId === tx.txid) {
ds.blockData!.microblock_hash = tx.microblock_hash || '';
ds.blockData!.microblock_sequence = tx.microblock_sequence || I32_MAX;
}
}
ds.blockData!.index_block_hash = event.index_block_hash;
ds.blockData!.parent_index_block_hash = event.parent_index_block_hash;
}
}
}
}
await db.updateBatchSubdomainsEventReplay(db.sql, ary);
})
);
};

View File

@@ -1,13 +1,11 @@
import { Readable, Writable, Transform } from 'stream';
import { Readable, Writable } from 'stream';
import { pipeline } from 'stream/promises';
import { PgWriteStore } from '../../../datastore/pg-write-store';
import { RawEventRequestInsertValues } from '../../../datastore/common';
import { logger } from '../../../logger';
import { getApiConfiguredChainID, batchIterate } from '../../../helpers';
import { batchIterate } from '../../../helpers';
import { DatasetStore } from '../dataset/store';
const chainID = getApiConfiguredChainID();
const batchInserters: BatchInserter[] = [];
interface BatchInserter<T = any> {

View File

@@ -267,7 +267,7 @@ export class ReplayController {
// NEW_BLOCK events
await this.ingestNewBlockEvents();
// RAW events to event_observer_requests table
// // RAW events to event_observer_requests table
await Promise.all([this.ingestRawEvents(), this.ingestRawNewBlockEvents()]);
// NEW_BURN_BLOCK and ATTACHMENTS/NEW events

View File

@@ -1081,26 +1081,9 @@ export function parseNewBlockMessage(chainId: ChainID, msg: CoreNodeBlockMessage
return dbData;
}
export function parseAttachmentMessage(msg: CoreNodeAttachmentMessage[]) {
export function parseAttachment(msg: CoreNodeAttachmentMessage[]) {
const zoneFiles: { zonefile: string; zonefileHash: string; txId: string }[] = [];
const subdomainObj: {
attachmentData: DataStoreAttachmentData;
dbBnsSubdomain: DbBnsSubdomain[];
} = {
attachmentData: {
op: '',
name: '',
namespace: '',
zonefile: '',
zonefileHash: '',
txId: '',
indexBlockHash: '',
blockHeight: 0,
},
dbBnsSubdomain: [],
};
const subdomains: DbBnsSubdomain[] = [];
for (const attachment of msg) {
if (
attachment.contract_id === BnsContractIdentifier.mainnet ||
@@ -1149,25 +1132,11 @@ export function parseAttachmentMessage(msg: CoreNodeAttachmentMessage[]) {
resolver: zoneFileContents.uri ? parseResolver(zoneFileContents.uri) : '',
index_block_hash: attachment.index_block_hash,
};
const attachmentData: DataStoreAttachmentData = {
op: op,
name: subdomain.name,
namespace: '',
zonefile: subdomain.zonefile,
zonefileHash: subdomain.zonefile_hash,
txId: subdomain.tx_id,
indexBlockHash: subdomain.index_block_hash || '',
blockHeight: subdomain.block_height,
};
subdomainObj.dbBnsSubdomain.push(subdomain);
subdomainObj.attachmentData = attachmentData;
subdomains.push(subdomain);
}
}
}
}
}
return { zoneFiles, subdomainObj };
return { zoneFiles, subdomains };
}