fix: attachments_new events processing
@@ -3224,48 +3224,6 @@ export class PgWriteStore extends PgStore {
     `;
   }
 
-  async updateBatchSubdomainsEventReplay(
-    sql: PgSqlClient,
-    data: DataStoreAttachmentSubdomainData[]
-  ): Promise<void> {
-    const subdomainValues: BnsSubdomainInsertValues[] = [];
-    for (const dataItem of data) {
-      if (dataItem.subdomains && dataItem.blockData) {
-        for (const subdomain of dataItem.subdomains) {
-          subdomainValues.push({
-            name: subdomain.name,
-            namespace_id: subdomain.namespace_id,
-            fully_qualified_subdomain: subdomain.fully_qualified_subdomain,
-            owner: subdomain.owner,
-            zonefile_hash: validateZonefileHash(subdomain.zonefile_hash),
-            parent_zonefile_hash: subdomain.parent_zonefile_hash,
-            parent_zonefile_index: subdomain.parent_zonefile_index,
-            block_height: subdomain.block_height,
-            tx_index: subdomain.tx_index,
-            zonefile_offset: subdomain.zonefile_offset,
-            resolver: subdomain.resolver,
-            canonical: subdomain.canonical,
-            tx_id: subdomain.tx_id,
-            index_block_hash: dataItem.blockData.index_block_hash,
-            parent_index_block_hash: dataItem.blockData.parent_index_block_hash,
-            microblock_hash: dataItem.blockData.microblock_hash,
-            microblock_sequence: dataItem.blockData.microblock_sequence,
-            microblock_canonical: dataItem.blockData.microblock_canonical,
-          });
-        }
-      }
-    }
-    if (subdomainValues.length === 0) {
-      return;
-    }
-    const result = await sql`
-      INSERT INTO subdomains ${sql(subdomainValues)}
-    `;
-    if (result.count !== subdomainValues.length) {
-      throw new Error(`Expected ${subdomainValues.length} subdomain inserts, got ${result.count}`);
-    }
-  }
-
   async insertRawEventRequestBatch(
     sql: PgSqlClient,
     events: RawEventRequestInsertValues[]
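
Note: the replay-only `updateBatchSubdomainsEventReplay` helper is removed; the rewritten importer further down now feeds the retained `updateBatchSubdomains` method instead. As a reference, a minimal standalone sketch of the same "insert, then verify the row count" idiom with postgres.js — the `rows` payload and connection setup here are illustrative, not part of the commit:

import postgres from 'postgres';

const sql = postgres(process.env.PG_CONNECTION_URI as string);

async function insertSubdomainRows(rows: Record<string, unknown>[]): Promise<void> {
  if (rows.length === 0) return;
  // postgres.js expands sql(rows) into a multi-row VALUES list.
  const result = await sql`INSERT INTO subdomains ${sql(rows)}`;
  // Guard against silent partial inserts, as the removed method did.
  if (result.count !== rows.length) {
    throw new Error(`Expected ${rows.length} subdomain inserts, got ${result.count}`);
  }
}
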
@@ -3276,7 +3234,7 @@ export class PgWriteStore extends PgStore {
   }
 
   /**
-   * (event-replay) Enable or disable indexes for all DB tables.
+   * (event-replay) Enable or disable indexes for DB tables.
   */
   async toggleAllTableIndexes(sql: PgSqlClient, state: IndexesState): Promise<void> {
     const enable: boolean = Boolean(state);
@@ -3293,12 +3251,16 @@ export class PgWriteStore extends PgStore {
     }
     const tables: string[] = tablesQuery.map((r: { tablename: string }) => r.tablename);
 
+    // Exclude the subdomains table, since its constraints
+    // are needed to handle the ingestion of attachments_new events.
+    const filtered = tables.filter(item => item !== 'subdomains');
+
     const result = await sql`
       UPDATE pg_index
       SET ${sql({ indisready: enable, indisvalid: enable })}
       WHERE indrelid = ANY (
         SELECT oid FROM pg_class
-        WHERE relname IN ${sql(tables)}
+        WHERE relname IN ${sql(filtered)}
         AND relnamespace = (
           SELECT oid FROM pg_namespace WHERE nspname = ${tableSchema}
         )
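
Note: flipping `indisready`/`indisvalid` directly in `pg_index` is how event-replay skips index maintenance during bulk load; the new `filtered` list keeps the `subdomains` indexes live because, per the added comment, its constraints must stay enforceable while attachments_new events are ingested. A condensed standalone sketch of the toggle, assuming a postgres.js client and a precomputed table list (names are illustrative):

import postgres from 'postgres';

const sql = postgres(process.env.PG_CONNECTION_URI as string);

async function setIndexesEnabled(tables: string[], enable: boolean): Promise<void> {
  // Keep the subdomains indexes active so its constraints keep working.
  const filtered = tables.filter(t => t !== 'subdomains');
  await sql`
    UPDATE pg_index
    SET ${sql({ indisready: enable, indisvalid: enable })}
    WHERE indrelid = ANY (SELECT oid FROM pg_class WHERE relname IN ${sql(filtered)})
  `;
}
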
@@ -83,19 +83,14 @@ export class DatasetStore {
   // ATTACHMENTS_NEW EVENTS
   //
 
-  attachmentsNewEvents = (): Promise<TableData> => {
+  attachmentsCanonicalEvents = (): Promise<QueryResult> => {
     const con = this.db.connect();
     return new Promise(resolve => {
-      con.all(
-        "SELECT payload FROM READ_PARQUET('events/attachments/new/canonical/*.parquet')",
-        (err: any, result: any) => {
-          if (err) {
-            throw err;
-          }
-
-          resolve(result);
-        }
+      const res = con.stream(
+        "SELECT payload FROM READ_PARQUET('events/attachments/new/canonical/*.parquet') ORDER BY id"
       );
+
+      resolve(res);
     });
   };
 
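
Note: `con.all` buffers the full result set and delivers it through a callback, while `con.stream` hands rows to the caller incrementally; the added `ORDER BY id` makes replay order deterministic. A consumption sketch, assuming duckdb's Node bindings where the streamed QueryResult is async-iterable (the database setup here is illustrative):

import duckdb from 'duckdb';

const db = new duckdb.Database(':memory:');
const con = db.connect();
const res = con.stream(
  "SELECT payload FROM READ_PARQUET('events/attachments/new/canonical/*.parquet') ORDER BY id"
);
void (async () => {
  for await (const row of res) {
    // Each row carries one raw attachments_new payload as JSON text.
    const messages = JSON.parse((row as any).payload);
    console.log(messages.length);
  }
})();
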
@@ -155,4 +150,24 @@ export class DatasetStore {
       );
     });
   };
+
+  //
+  // CANONICAL BLOCK_HASHES
+  //
+
+  canonicalBlockHashes = (): Promise<QueryResult> => {
+    return new Promise(resolve => {
+      const con = this.db.connect();
+      con.all(
+        "SELECT * FROM READ_PARQUET('events/canonical/block_hashes/*.parquet')",
+        (err: any, res: any) => {
+          if (err) {
+            throw err;
+          }
+
+          resolve(res);
+        }
+      );
+    });
+  };
 }
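
Note: `canonicalBlockHashes` intentionally stays on the callback-style `con.all`, since the importer needs the whole table in memory for random access: the importer below indexes it as `canonicalBlockHashes[subdomain.block_height - 1]`, which presumes row i is the canonical block at height i + 1. A hypothetical helper making that assumption explicit:

const blockAtHeight = (canonicalBlockHashes: any[], height: number) =>
  canonicalBlockHashes[height - 1];
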
@@ -1,69 +1,123 @@
 /* eslint-disable @typescript-eslint/no-non-null-assertion */
 /* eslint-disable @typescript-eslint/no-unsafe-return */
 
 import { Readable, Writable } from 'stream';
 import { pipeline } from 'stream/promises';
+import { batchIterate } from '../../../helpers';
 import { PgWriteStore } from '../../../datastore/pg-write-store';
-import { parseAttachmentMessage } from '../../../event-stream/event-server';
+import { parseAttachment } from '../../../event-stream/event-server';
 import { logger } from '../../../logger';
 import { CoreNodeAttachmentMessage } from '../../../event-stream/core-node-message';
-import { DataStoreAttachmentSubdomainData, DataStoreBnsBlockData } from '../../../datastore/common';
+import { DataStoreAttachmentSubdomainData } from '../../../datastore/common';
 import { DatasetStore } from '../dataset/store';
 import { I32_MAX } from '../../../helpers';
 
+const batchInserters: BatchInserter[] = [];
+
+interface BatchInserter<T = any> {
+  push(entries: T[]): Promise<void>;
+  flush(): Promise<void>;
+}
+
+function createBatchInserter<T>({
+  batchSize,
+  insertFn,
+}: {
+  batchSize: number;
+  insertFn: (entries: T[]) => Promise<void>;
+}): BatchInserter<T> {
+  let entryBuffer: T[] = [];
+  return {
+    async push(entries: T[]) {
+      entries.length === 1
+        ? entryBuffer.push(entries[0])
+        : entries.forEach(e => entryBuffer.push(e));
+      if (entryBuffer.length === batchSize) {
+        await insertFn(entryBuffer);
+        entryBuffer.length = 0;
+      } else if (entryBuffer.length > batchSize) {
+        for (const batch of batchIterate(entryBuffer, batchSize)) {
+          await insertFn(batch);
+        }
+        entryBuffer.length = 0;
+      }
+    },
+    async flush() {
+      logger.debug({ component: 'event-replay' }, 'Flushing remaining data...');
+      if (entryBuffer.length > 0) {
+        await insertFn(entryBuffer);
+        entryBuffer = [];
+      }
+    },
+  };
+}
+
+const insertInBatch = (db: PgWriteStore, canonicalBlockHashes: any) => {
+  const dbAttachmentEventBatchInserter = createBatchInserter<DataStoreAttachmentSubdomainData>({
+    batchSize: 1,
+    insertFn: async entries => {
+      logger.debug({ component: 'event-replay' }, 'Inserting into subdomains table...');
+      return await db.updateBatchSubdomains(db.sql, entries);
+    },
+  });
+  batchInserters.push(dbAttachmentEventBatchInserter);
+  return new Writable({
+    objectMode: true,
+    write: async (data, _encoding, next) => {
+      const dataStoreAttachments: DataStoreAttachmentSubdomainData[] = [];
+      const attachmentMsg: CoreNodeAttachmentMessage[] = JSON.parse(data.payload);
+      const attachments = parseAttachment(attachmentMsg);
+
+      for (const subdomain of attachments.subdomains) {
+        const dataStoreAttachment: DataStoreAttachmentSubdomainData = {};
+        const indexBlockHash = subdomain.index_block_hash!;
+        const blockEntityData = canonicalBlockHashes[subdomain.block_height - 1];
+        const parentIndexBlockHash =
+          canonicalBlockHashes[subdomain.block_height - 2]['index_block_hash'];
+        const microblocks = JSON.parse(blockEntityData['microblock']);
+
+        const microblockIndex = microblocks.findIndex(
+          (mb: any, index: any) => index > 0 && mb[1].includes(subdomain.tx_id)
+        );
+
+        // derive tx_index from the entity hash index
+        subdomain.tx_index = JSON.parse(blockEntityData['microblock'])
+          .flatMap((m: any) => m[1])
+          .findIndex((tx: any) => tx === subdomain.tx_id);
+
+        const blockData = {
+          index_block_hash: indexBlockHash,
+          parent_index_block_hash: parentIndexBlockHash,
+          microblock_hash: microblockIndex !== -1 ? microblocks[microblockIndex][0] : '',
+          microblock_sequence: microblockIndex !== -1 ? microblockIndex - 1 : I32_MAX,
+          microblock_canonical: true,
+        };
+
+        dataStoreAttachment.blockData = blockData;
+        dataStoreAttachment.subdomains = attachments.subdomains;
+        dataStoreAttachments.push(dataStoreAttachment);
+      }
+
+      await dbAttachmentEventBatchInserter.push(dataStoreAttachments);
+
+      next();
+    },
+  });
+};
+
 export const processAttachmentNewEvents = async (db: PgWriteStore, dataset: DatasetStore) => {
   logger.info({ component: 'event-replay' }, 'ATTACHMENTS_NEW events process started');
 
-  const attachmentsNewEvents = await dataset.attachmentsNewEvents();
-  const ary: DataStoreAttachmentSubdomainData[] = [];
+  const canonicalEvents = await dataset.attachmentsCanonicalEvents();
+  const canonicalBlockHashes: any = await dataset.canonicalBlockHashes();
+  const insert = insertInBatch(db, canonicalBlockHashes);
 
-  for await (const event of attachmentsNewEvents) {
-    const blockData: DataStoreBnsBlockData = {
-      index_block_hash: '',
-      parent_index_block_hash: '',
-      microblock_hash: '',
-      microblock_sequence: I32_MAX,
-      microblock_canonical: true,
-    };
-    const dataStore: DataStoreAttachmentSubdomainData = {};
-
-    const attachmentMsg: CoreNodeAttachmentMessage[] = JSON.parse(event.payload);
-    const attachments = parseAttachmentMessage(attachmentMsg);
-    dataStore.subdomains = attachments.subdomainObj.dbBnsSubdomain;
-
-    blockData.index_block_hash = attachments.subdomainObj.attachmentData.indexBlockHash;
-    dataStore.blockData = blockData;
-
-    dataStore.attachment = attachments.subdomainObj.attachmentData;
-
-    ary.push(dataStore);
-  }
-
-  const blockHeights = [];
-  for (const el of ary) {
-    if (el.subdomains!.length !== 0) {
-      blockHeights.push(el.attachment!.blockHeight);
-    }
-  }
-
-  if (blockHeights.length > 0) {
-    // get events from block heights
-    const blockEvents = await dataset.getNewBlockEventsInBlockHeights(blockHeights);
-
-    for (const event of blockEvents) {
-      for (const ds of ary) {
-        if (ds.blockData?.index_block_hash === event.index_block_hash) {
-          const txs = JSON.parse(event.payload).transactions;
-          for (const tx of txs) {
-            if (ds.attachment!.txId === tx.txid) {
-              ds.blockData!.microblock_hash = tx.microblock_hash || '';
-              ds.blockData!.microblock_sequence = tx.microblock_sequence || I32_MAX;
-            }
-          }
-
-          ds.blockData!.index_block_hash = event.index_block_hash;
-          ds.blockData!.parent_index_block_hash = event.parent_index_block_hash;
-        }
-      }
-    }
-  }
-
-  await db.updateBatchSubdomainsEventReplay(db.sql, ary);
+  await pipeline(
+    Readable.from(canonicalEvents),
+    insert.on('finish', async () => {
+      for (const batchInserter of batchInserters) {
+        await batchInserter.flush();
+      }
+    })
+  );
 };
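
Note: the importer now streams canonical attachment events through a Writable instead of buffering everything into an array and patching block data afterwards. The batching contract of `createBatchInserter`, shown with a hypothetical numeric inserter:

const inserter = createBatchInserter<number>({
  batchSize: 2,
  insertFn: async batch => console.log('insert', batch),
});
await inserter.push([1]); // buffered: below batchSize
await inserter.push([2]); // buffer reaches batchSize, insertFn([1, 2]) runs
await inserter.push([3]); // buffered again
await inserter.flush();   // drains the remainder, insertFn([3]) runs

With `batchSize: 1`, as configured above, every streamed event is written to the subdomains table immediately.
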
@@ -1,13 +1,11 @@
-import { Readable, Writable, Transform } from 'stream';
+import { Readable, Writable } from 'stream';
 import { pipeline } from 'stream/promises';
 import { PgWriteStore } from '../../../datastore/pg-write-store';
 import { RawEventRequestInsertValues } from '../../../datastore/common';
 import { logger } from '../../../logger';
-import { getApiConfiguredChainID, batchIterate } from '../../../helpers';
+import { batchIterate } from '../../../helpers';
 import { DatasetStore } from '../dataset/store';
 
-const chainID = getApiConfiguredChainID();
-
 const batchInserters: BatchInserter[] = [];
 
 interface BatchInserter<T = any> {
@@ -267,7 +267,7 @@ export class ReplayController {
     // NEW_BLOCK events
     await this.ingestNewBlockEvents();
 
-    // RAW events to event_observer_requests table
+    // // RAW events to event_observer_requests table
     await Promise.all([this.ingestRawEvents(), this.ingestRawNewBlockEvents()]);
 
     // NEW_BURN_BLOCK and ATTACHMENTS/NEW events
@@ -1081,26 +1081,9 @@ export function parseNewBlockMessage(chainId: ChainID, msg: CoreNodeBlockMessage
   return dbData;
 }
 
-export function parseAttachmentMessage(msg: CoreNodeAttachmentMessage[]) {
+export function parseAttachment(msg: CoreNodeAttachmentMessage[]) {
   const zoneFiles: { zonefile: string; zonefileHash: string; txId: string }[] = [];
 
-  const subdomainObj: {
-    attachmentData: DataStoreAttachmentData;
-    dbBnsSubdomain: DbBnsSubdomain[];
-  } = {
-    attachmentData: {
-      op: '',
-      name: '',
-      namespace: '',
-      zonefile: '',
-      zonefileHash: '',
-      txId: '',
-      indexBlockHash: '',
-      blockHeight: 0,
-    },
-    dbBnsSubdomain: [],
-  };
-
   const subdomains: DbBnsSubdomain[] = [];
   for (const attachment of msg) {
     if (
       attachment.contract_id === BnsContractIdentifier.mainnet ||
@@ -1149,25 +1132,11 @@ export function parseAttachmentMessage(msg: CoreNodeAttachmentMessage[]) {
               resolver: zoneFileContents.uri ? parseResolver(zoneFileContents.uri) : '',
               index_block_hash: attachment.index_block_hash,
             };
-
-            const attachmentData: DataStoreAttachmentData = {
-              op: op,
-              name: subdomain.name,
-              namespace: '',
-              zonefile: subdomain.zonefile,
-              zonefileHash: subdomain.zonefile_hash,
-              txId: subdomain.tx_id,
-              indexBlockHash: subdomain.index_block_hash || '',
-              blockHeight: subdomain.block_height,
-            };
-
-            subdomainObj.dbBnsSubdomain.push(subdomain);
-            subdomainObj.attachmentData = attachmentData;
+            subdomains.push(subdomain);
           }
         }
       }
     }
   }
-
-  return { zoneFiles, subdomainObj };
+  return { zoneFiles, subdomains };
 }
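
Note: with the `subdomainObj` wrapper gone, `parseAttachment` returns a flat `{ zoneFiles, subdomains }` pair and no longer materializes a per-attachment `DataStoreAttachmentData`, which the old replay path removed above relied on. A caller-side sketch under the new shape (the `rawPayload` input is hypothetical):

const messages: CoreNodeAttachmentMessage[] = JSON.parse(rawPayload);
const { zoneFiles, subdomains } = parseAttachment(messages);
for (const subdomain of subdomains) {
  // subdomain is a DbBnsSubdomain; block/microblock linkage is filled in
  // later by the replay importer from canonical block data.
  logger.debug({ subdomain: subdomain.fully_qualified_subdomain }, 'parsed');
}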