feat: event-replay supporting parallel insertions

This commit is contained in:
Chris Guimaraes
2023-07-11 11:35:09 +01:00
parent 4211328438
commit f33ecee858
38 changed files with 6347 additions and 1918 deletions

View File

@@ -1,4 +1,7 @@
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
exports.up = pgm => {
pgm.createTable('blocks', {
index_block_hash: {
@@ -71,9 +74,9 @@ exports.up = pgm => {
},
});
pgm.createIndex('blocks', 'block_hash', { method: 'hash' });
pgm.createIndex('blocks', 'burn_block_hash', { method: 'hash' });
pgm.createIndex('blocks', 'index_block_hash', { method: 'hash' });
pgm.createIndex('blocks', 'block_hash', { method: INDEX_METHOD });
pgm.createIndex('blocks', 'burn_block_hash', { method: INDEX_METHOD });
pgm.createIndex('blocks', 'index_block_hash', { method: INDEX_METHOD });
pgm.createIndex('blocks', [{ name: 'block_height', sort: 'DESC' }]);
pgm.createIndex('blocks', [{ name: 'burn_block_height', sort: 'DESC' }]);
}

View File

@@ -1,4 +1,7 @@
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
exports.up = pgm => {
pgm.createTable('txs', {
id: {
@@ -164,15 +167,15 @@ exports.up = pgm => {
coinbase_alt_recipient: 'string',
});
pgm.createIndex('txs', 'tx_id', { method: 'hash' });
pgm.createIndex('txs', 'contract_call_contract_id', { method: 'hash' });
pgm.createIndex('txs', 'index_block_hash', { method: 'hash' });
pgm.createIndex('txs', 'microblock_hash', { method: 'hash' });
pgm.createIndex('txs', 'sender_address', { method: 'hash' });
pgm.createIndex('txs', 'smart_contract_contract_id', { method: 'hash' });
pgm.createIndex('txs', 'sponsor_address', { method: 'hash' });
pgm.createIndex('txs', 'token_transfer_recipient_address', { method: 'hash' });
pgm.createIndex('txs', 'coinbase_alt_recipient');
pgm.createIndex('txs', 'tx_id', { method: INDEX_METHOD });
pgm.createIndex('txs', 'contract_call_contract_id', { method: INDEX_METHOD });
pgm.createIndex('txs', 'index_block_hash', { method: INDEX_METHOD });
pgm.createIndex('txs', 'microblock_hash', { method: INDEX_METHOD });
pgm.createIndex('txs', 'sender_address', { method: INDEX_METHOD });
pgm.createIndex('txs', 'smart_contract_contract_id', { method: INDEX_METHOD });
pgm.createIndex('txs', 'sponsor_address', { method: INDEX_METHOD });
pgm.createIndex('txs', 'token_transfer_recipient_address', { method: INDEX_METHOD });
pgm.createIndex('txs', 'coinbase_alt_recipient', { method: INDEX_METHOD });
pgm.createIndex('txs', 'type_id');
pgm.createIndex('txs', [{ name: 'tx_index', sort: 'DESC' }]);
pgm.createIndex('txs', [

View File

@@ -1,4 +1,7 @@
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
exports.up = pgm => {
pgm.createTable('stx_events', {
id: {
@@ -58,11 +61,11 @@ exports.up = pgm => {
memo: 'bytea',
});
pgm.createIndex('stx_events', 'tx_id', { method: 'hash' });
pgm.createIndex('stx_events', 'index_block_hash', { method: 'hash' });
pgm.createIndex('stx_events', 'microblock_hash', { method: 'hash' });
pgm.createIndex('stx_events', 'sender', { method: 'hash' });
pgm.createIndex('stx_events', 'recipient', { method: 'hash' });
pgm.createIndex('stx_events', 'tx_id', { method: INDEX_METHOD });
pgm.createIndex('stx_events', 'index_block_hash', { method: INDEX_METHOD });
pgm.createIndex('stx_events', 'microblock_hash', { method: INDEX_METHOD });
pgm.createIndex('stx_events', 'sender', { method: INDEX_METHOD });
pgm.createIndex('stx_events', 'recipient', { method: INDEX_METHOD });
pgm.createIndex('stx_events', 'event_index');
pgm.createIndex('stx_events', [{ name: 'block_height', sort: 'DESC' }]);

View File

@@ -1,4 +1,7 @@
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
exports.up = pgm => {
pgm.createTable('ft_events', {
id: {
@@ -61,11 +64,11 @@ exports.up = pgm => {
recipient: 'string',
});
pgm.createIndex('ft_events', 'tx_id', { method: 'hash' });
pgm.createIndex('ft_events', 'index_block_hash', { method: 'hash' });
pgm.createIndex('ft_events', 'microblock_hash', { method: 'hash' });
pgm.createIndex('ft_events', 'sender', { method: 'hash' });
pgm.createIndex('ft_events', 'recipient', { method: 'hash' });
pgm.createIndex('ft_events', 'tx_id', { method: INDEX_METHOD });
pgm.createIndex('ft_events', 'index_block_hash', { method: INDEX_METHOD });
pgm.createIndex('ft_events', 'microblock_hash', { method: INDEX_METHOD });
pgm.createIndex('ft_events', 'sender', { method: INDEX_METHOD });
pgm.createIndex('ft_events', 'recipient', { method: INDEX_METHOD });
pgm.createIndex('ft_events', 'event_index');
pgm.createIndex('ft_events', [{ name: 'block_height', sort: 'DESC' }]);

View File

@@ -1,4 +1,7 @@
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
exports.up = pgm => {
pgm.createTable('nft_events', {
id: {
@@ -61,11 +64,11 @@ exports.up = pgm => {
recipient: 'string',
});
pgm.createIndex('nft_events', 'tx_id', { method: 'hash' });
pgm.createIndex('nft_events', 'index_block_hash', { method: 'hash' });
pgm.createIndex('nft_events', 'microblock_hash', { method: 'hash' });
pgm.createIndex('nft_events', 'sender', { method: 'hash' });
pgm.createIndex('nft_events', 'recipient', { method: 'hash' });
pgm.createIndex('nft_events', 'tx_id', { method: INDEX_METHOD });
pgm.createIndex('nft_events', 'index_block_hash', { method: INDEX_METHOD });
pgm.createIndex('nft_events', 'microblock_hash', { method: INDEX_METHOD });
pgm.createIndex('nft_events', 'sender', { method: INDEX_METHOD });
pgm.createIndex('nft_events', 'recipient', { method: INDEX_METHOD });
pgm.createIndex('nft_events', 'event_index');
pgm.createIndex('nft_events', ['asset_identifier', 'value']);
pgm.createIndex('nft_events', 'asset_identifier', { where: 'asset_event_type_id = 2', method: 'hash' }); // Mints

View File

@@ -1,4 +1,7 @@
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
exports.up = pgm => {
pgm.createTable('contract_logs', {
id: {
@@ -59,8 +62,8 @@ exports.up = pgm => {
},
});
pgm.createIndex('contract_logs', 'tx_id', { method: 'hash' });
pgm.createIndex('contract_logs', 'index_block_hash', { method: 'hash' });
pgm.createIndex('contract_logs', 'microblock_hash', { method: 'hash' });
pgm.createIndex('contract_logs', 'tx_id', { method: INDEX_METHOD });
pgm.createIndex('contract_logs', 'index_block_hash', { method: INDEX_METHOD });
pgm.createIndex('contract_logs', 'microblock_hash', { method: INDEX_METHOD });
pgm.createIndex('contract_logs', 'event_index');
}

View File

@@ -1,4 +1,7 @@
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
exports.up = pgm => {
pgm.createTable('smart_contracts', {
id: {
@@ -54,7 +57,7 @@ exports.up = pgm => {
},
});
pgm.createIndex('smart_contracts', 'index_block_hash', { method: 'hash' });
pgm.createIndex('smart_contracts', 'microblock_hash', { method: 'hash' });
pgm.createIndex('smart_contracts', 'contract_id', { method: 'hash' });
pgm.createIndex('smart_contracts', 'index_block_hash', { method: INDEX_METHOD });
pgm.createIndex('smart_contracts', 'microblock_hash', { method: INDEX_METHOD });
pgm.createIndex('smart_contracts', 'contract_id', { method: INDEX_METHOD });
}

View File

@@ -1,4 +1,7 @@
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
exports.up = pgm => {
pgm.createTable('faucet_requests', {
id: {
@@ -23,5 +26,5 @@ exports.up = pgm => {
},
});
pgm.createIndex('faucet_requests', 'address', { method: 'hash' });
pgm.createIndex('faucet_requests', 'address', { method: INDEX_METHOD });
}

View File

@@ -1,4 +1,7 @@
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
exports.up = pgm => {
pgm.createTable('mempool_txs', {
id: {
@@ -103,13 +106,13 @@ exports.up = pgm => {
}
});
pgm.createIndex('mempool_txs', 'tx_id', { method: 'hash' });
pgm.createIndex('mempool_txs', 'contract_call_contract_id', { method: 'hash' });
pgm.createIndex('mempool_txs', 'tx_id', { method: INDEX_METHOD });
pgm.createIndex('mempool_txs', 'contract_call_contract_id', { method: INDEX_METHOD });
pgm.createIndex('mempool_txs', 'nonce');
pgm.createIndex('mempool_txs', 'sender_address', { method: 'hash' });
pgm.createIndex('mempool_txs', 'smart_contract_contract_id', { method: 'hash' });
pgm.createIndex('mempool_txs', 'sponsor_address', { method: 'hash' });
pgm.createIndex('mempool_txs', 'token_transfer_recipient_address', { method: 'hash' });
pgm.createIndex('mempool_txs', 'sender_address', { method: INDEX_METHOD });
pgm.createIndex('mempool_txs', 'smart_contract_contract_id', { method: INDEX_METHOD });
pgm.createIndex('mempool_txs', 'sponsor_address', { method: INDEX_METHOD });
pgm.createIndex('mempool_txs', 'token_transfer_recipient_address', { method: INDEX_METHOD });
pgm.createIndex('mempool_txs', [{ name: 'receipt_time', sort: 'DESC' }]);
pgm.createIndex('mempool_txs', ['type_id', 'receipt_block_height'], { where: 'pruned = false'});
pgm.createIndex('mempool_txs', ['type_id', 'fee_rate'], { where: 'pruned = false'});

View File

@@ -1,4 +1,7 @@
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
exports.up = pgm => {
pgm.createTable('stx_lock_events', {
id: {
@@ -69,10 +72,10 @@ exports.up = pgm => {
{ name: 'tx_index', sort: 'DESC' },
]);
pgm.createIndex('stx_lock_events', 'tx_id', { method: 'hash' });
pgm.createIndex('stx_lock_events', 'index_block_hash', { method: 'hash' });
pgm.createIndex('stx_lock_events', 'microblock_hash', { method: 'hash' });
pgm.createIndex('stx_lock_events', 'locked_address', { method: 'hash' });
pgm.createIndex('stx_lock_events', 'tx_id', { method: INDEX_METHOD });
pgm.createIndex('stx_lock_events', 'index_block_hash', { method: INDEX_METHOD });
pgm.createIndex('stx_lock_events', 'microblock_hash', { method: INDEX_METHOD });
pgm.createIndex('stx_lock_events', 'locked_address', { method: INDEX_METHOD });
pgm.createIndex('stx_lock_events', [{ name: 'block_height', sort: 'DESC' }]);
pgm.createIndex('stx_lock_events', [{ name: 'unlock_height', sort: 'DESC' }]);
pgm.createIndex('stx_lock_events', 'contract_name');

View File

@@ -1,4 +1,7 @@
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
exports.up = pgm => {
pgm.createTable('miner_rewards', {
id: {
@@ -50,8 +53,8 @@ exports.up = pgm => {
}
});
pgm.createIndex('miner_rewards', 'index_block_hash', { method: 'hash' });
pgm.createIndex('miner_rewards', 'recipient', { method: 'hash' });
pgm.createIndex('miner_rewards', 'index_block_hash', { method: INDEX_METHOD });
pgm.createIndex('miner_rewards', 'recipient', { method: INDEX_METHOD });
pgm.createIndex('miner_rewards', 'miner_address');
pgm.createIndex('miner_rewards', [{ name: 'mature_block_height', sort: 'DESC' }]);
}

View File

@@ -1,4 +1,7 @@
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
exports.up = pgm => {
pgm.createTable('burnchain_rewards', {
id: {
@@ -35,7 +38,7 @@ exports.up = pgm => {
},
});
pgm.createIndex('burnchain_rewards', 'burn_block_hash', { method: 'hash' });
pgm.createIndex('burnchain_rewards', 'reward_recipient', { method: 'hash' });
pgm.createIndex('burnchain_rewards', 'burn_block_hash', { method: INDEX_METHOD });
pgm.createIndex('burnchain_rewards', 'reward_recipient', { method: INDEX_METHOD });
pgm.createIndex('burnchain_rewards', [{ name: 'burn_block_height', sort: 'DESC' }]);
}

View File

@@ -1,4 +1,7 @@
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
exports.up = pgm => {
pgm.createTable('namespaces', {
id: {
@@ -88,7 +91,7 @@ exports.up = pgm => {
},
});
pgm.createIndex('namespaces', 'index_block_hash');
pgm.createIndex('namespaces', 'index_block_hash', { method: INDEX_METHOD });
pgm.createIndex('namespaces', [
{ name: 'ready_block', sort: 'DESC' },
{ name: 'microblock_sequence', sort: 'DESC' },

View File

@@ -1,4 +1,7 @@
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
exports.up = pgm => {
pgm.createTable('names', {
id: {
@@ -81,8 +84,8 @@ exports.up = pgm => {
},
});
pgm.createIndex('names', 'namespace_id');
pgm.createIndex('names', 'index_block_hash');
pgm.createIndex('names', 'namespace_id', { method: INDEX_METHOD });
pgm.createIndex('names', 'index_block_hash', { method: INDEX_METHOD });
pgm.createIndex('names', [
{ name: 'registered_at', sort: 'DESC' },
{ name: 'microblock_sequence', sort: 'DESC' },

View File

@@ -1,4 +1,7 @@
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
exports.up = pgm => {
pgm.createTable('subdomains', {
id: {
@@ -80,8 +83,8 @@ exports.up = pgm => {
},
});
pgm.createIndex('subdomains', 'name');
pgm.createIndex('subdomains', 'index_block_hash');
pgm.createIndex('subdomains', 'name', { method: INDEX_METHOD });
pgm.createIndex('subdomains', 'index_block_hash', { method: INDEX_METHOD });
pgm.createIndex('subdomains', [
{ name: 'block_height', sort: 'DESC' },
{ name: 'microblock_sequence', sort: 'DESC' },

View File

@@ -1,4 +1,7 @@
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
exports.up = pgm => {
pgm.createTable('reward_slot_holders', {
id: {
@@ -27,6 +30,6 @@ exports.up = pgm => {
},
});
pgm.createIndex('reward_slot_holders', 'burn_block_hash', { method: 'hash' });
pgm.createIndex('reward_slot_holders', 'burn_block_hash', { method: INDEX_METHOD });
pgm.createIndex('reward_slot_holders', [{ name: 'burn_block_height', sort: 'DESC' }]);
}

View File

@@ -1,4 +1,7 @@
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
exports.up = pgm => {
pgm.createTable('token_offering_locked', {
id: {
@@ -19,5 +22,5 @@ exports.up = pgm => {
},
});
pgm.createIndex('token_offering_locked', 'address', { method: 'hash' });
pgm.createIndex('token_offering_locked', 'address', { method: INDEX_METHOD });
}

View File

@@ -1,4 +1,7 @@
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
exports.up = pgm => {
pgm.createTable('microblocks', {
id: {
@@ -26,7 +29,7 @@ exports.up = pgm => {
type: 'integer',
notNull: true,
},
// For the first microblock (sequence number 0), this points to the parent/anchor block hash,
// For the first microblock (sequence number 0), this points to the parent/anchor block hash,
// for subsequent microblocks it points to the previous microblock's hash.
microblock_parent_hash: {
type: 'bytea',
@@ -70,8 +73,8 @@ exports.up = pgm => {
}
});
pgm.createIndex('microblocks', 'microblock_hash', { method: 'hash' });
pgm.createIndex('microblocks', 'parent_index_block_hash', { method: 'hash' });
pgm.createIndex('microblocks', 'microblock_hash', { method: INDEX_METHOD });
pgm.createIndex('microblocks', 'parent_index_block_hash', { method: INDEX_METHOD });
pgm.createIndex('microblocks', [
{ name: 'block_height', sort: 'DESC' },
{ name: 'microblock_sequence', sort: 'DESC' }

View File

@@ -1,4 +1,7 @@
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
exports.up = pgm => {
pgm.createTable('nft_metadata', {
id: {
@@ -20,14 +23,14 @@ exports.up = pgm => {
image_uri: {
type: 'string',
notNull: true,
},
},
image_canonical_uri: {
type: 'string',
type: 'string',
notNull: true,
},
},
contract_id: {
type: 'string',
notNull: true,
type: 'string',
notNull: true,
unique: true,
},
tx_id: {
@@ -35,10 +38,10 @@ exports.up = pgm => {
notNull: true,
},
sender_address: {
type: 'string',
notNull: true,
type: 'string',
notNull: true,
}
});
pgm.createIndex('nft_metadata', 'contract_id', { method: 'hash' });
pgm.createIndex('nft_metadata', 'contract_id', { method: INDEX_METHOD });
}

View File

@@ -1,4 +1,7 @@
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
exports.up = pgm => {
pgm.createTable('ft_metadata', {
id: {
@@ -20,33 +23,33 @@ exports.up = pgm => {
image_uri: {
type: 'string',
notNull: true,
},
},
image_canonical_uri: {
type: 'string',
type: 'string',
notNull: true,
},
},
contract_id: {
type: 'string',
notNull: true,
type: 'string',
notNull: true,
unique: true,
},
symbol: {
type: 'string',
notNull: true,
type: 'string',
notNull: true,
},
decimals: {
type: 'integer',
notNull: true,
type: 'integer',
notNull: true,
},
tx_id: {
type: 'bytea',
notNull: true,
},
sender_address: {
type: 'string',
notNull: true,
type: 'string',
notNull: true,
}
});
pgm.createIndex('ft_metadata', 'contract_id', { method: 'hash' });
pgm.createIndex('ft_metadata', 'contract_id', { method: INDEX_METHOD });
}

View File

@@ -1,4 +1,7 @@
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
exports.up = pgm => {
pgm.createTable('zonefiles', {
id: {
@@ -27,7 +30,7 @@ exports.up = pgm => {
}
});
pgm.addIndex('zonefiles', 'zonefile_hash');
pgm.addIndex('zonefiles', 'zonefile_hash', { method: INDEX_METHOD });
pgm.addConstraint(
'zonefiles',
'unique_name_zonefile_hash_tx_id_index_block_hash',

View File

@@ -1,4 +1,7 @@
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
exports.up = pgm => {
/**
* Stores all `tx_id`s of transactions that affect a principal's STX balance since that cannot be
@@ -47,8 +50,8 @@ exports.up = pgm => {
},
});
pgm.createIndex('principal_stx_txs', 'tx_id', { method: 'hash' });
pgm.createIndex('principal_stx_txs', 'principal', { method: 'hash' });
pgm.createIndex('principal_stx_txs', 'tx_id', { method: INDEX_METHOD });
pgm.createIndex('principal_stx_txs', 'principal', { method: INDEX_METHOD });
pgm.createIndex('principal_stx_txs', [
{ name: 'block_height', sort: 'DESC' },
{ name: 'microblock_sequence', sort: 'DESC' },

6440
package-lock.json generated

File diff suppressed because it is too large Load Diff

70
run_event_replay Executable file
View File

@@ -0,0 +1,70 @@
#!/usr/bin/env bash
#
# 1. Parsing comman line arguments
#
while getopts w: flag
do
case "${flag}" in
w) workers=${OPTARG};
esac
done
#
# 2. Prepare DB for event-replay
#
PG_USER=postgres \
PG_PASSWORD=postgres \
PG_DATABASE=stacks_blockchain_api \
PG_SCHEMA=public \
PG_PORT=5490 \
node ./lib/event-replay/parquet-based/pre-replay-db.js
wait
#
# 3. NEW_BURN_BLOCK events
#
STACKS_CHAIN_ID=0x00000001 node ./lib/index.js from-parquet-events --new-burn-block
wait
#
# 4. ATTACHMENTS_NEW events
#
STACKS_CHAIN_ID=0x00000001 node ./lib/index.js from-parquet-events --attachment-new
wait
#
# 5. NEW_BLOCK events
#
# Generate ID chunks in accordance to workers amount
node ./lib/event-replay/parquet-based/gen-ids-file.js --workers=${workers}
wait
id_files=(./events/new_block/ids_*.txt)
for id_file in "${id_files[@]}"
do
NODE_OPTIONS=--max-old-space-size=8192 \
STACKS_CHAIN_ID=0x00000001 \
node ./lib/index.js from-parquet-events --new-block --ids-path=${id_file} &
done
wait
#
# 6. Prepare DB for regular operations after event-replay
#
PG_USER=postgres \
PG_PASSWORD=postgres \
PG_DATABASE=stacks_blockchain_api \
PG_SCHEMA=public \
PG_PORT=5490 \
node ./lib/event-replay/parquet-based/post-replay-db.js
wait
exit 0

View File

@@ -704,6 +704,7 @@ export interface DbBnsSubdomain {
tx_id: string;
tx_index: number;
canonical: boolean;
index_block_hash?: string;
}
export interface DbConfigState {

View File

@@ -552,109 +552,32 @@ export class PgWriteStore extends PgStore {
return result.count;
}
async insertBlockBatch(sql: PgSqlClient, blocks: DbBlock[]) {
const values: BlockInsertValues[] = blocks.map(block => ({
block_hash: block.block_hash,
index_block_hash: block.index_block_hash,
parent_index_block_hash: block.parent_index_block_hash,
parent_block_hash: block.parent_block_hash,
parent_microblock_hash: block.parent_microblock_hash,
parent_microblock_sequence: block.parent_microblock_sequence,
block_height: block.block_height,
burn_block_time: block.burn_block_time,
burn_block_hash: block.burn_block_hash,
burn_block_height: block.burn_block_height,
miner_txid: block.miner_txid,
canonical: block.canonical,
execution_cost_read_count: block.execution_cost_read_count,
execution_cost_read_length: block.execution_cost_read_length,
execution_cost_runtime: block.execution_cost_runtime,
execution_cost_write_count: block.execution_cost_write_count,
execution_cost_write_length: block.execution_cost_write_length,
}));
async insertStxEventBatch(sql: PgSqlClient, stxEvents: StxEventInsertValues[]) {
const values = stxEvents.map(s => {
const value: StxEventInsertValues = {
event_index: s.event_index,
tx_id: s.tx_id,
tx_index: s.tx_index,
block_height: s.block_height,
index_block_hash: s.index_block_hash,
parent_index_block_hash: s.parent_index_block_hash,
microblock_hash: s.microblock_hash,
microblock_sequence: s.microblock_sequence,
microblock_canonical: s.microblock_canonical,
canonical: s.canonical,
asset_event_type_id: s.asset_event_type_id,
sender: s.sender,
recipient: s.recipient,
amount: s.amount,
memo: s.memo ?? null,
};
return value;
});
await sql`
INSERT INTO blocks ${sql(values)}
INSERT INTO stx_events ${sql(values)}
`;
}
async insertMicroblock(sql: PgSqlClient, microblocks: DbMicroblock[]): Promise<void> {
const values: MicroblockInsertValues[] = microblocks.map(mb => ({
canonical: mb.canonical,
microblock_canonical: mb.microblock_canonical,
microblock_hash: mb.microblock_hash,
microblock_sequence: mb.microblock_sequence,
microblock_parent_hash: mb.microblock_parent_hash,
parent_index_block_hash: mb.parent_index_block_hash,
block_height: mb.block_height,
parent_block_height: mb.parent_block_height,
parent_block_hash: mb.parent_block_hash,
index_block_hash: mb.index_block_hash,
block_hash: mb.block_hash,
parent_burn_block_height: mb.parent_burn_block_height,
parent_burn_block_hash: mb.parent_burn_block_hash,
parent_burn_block_time: mb.parent_burn_block_time,
}));
const mbResult = await sql`
INSERT INTO microblocks ${sql(values)}
`;
if (mbResult.count !== microblocks.length) {
throw new Error(
`Unexpected row count after inserting microblocks: ${mbResult.count} vs ${values.length}`
);
}
}
async insertTxBatch(sql: PgSqlClient, txs: DbTx[]): Promise<void> {
const values: TxInsertValues[] = txs.map(tx => ({
tx_id: tx.tx_id,
raw_tx: tx.raw_result,
tx_index: tx.tx_index,
index_block_hash: tx.index_block_hash,
parent_index_block_hash: tx.parent_index_block_hash,
block_hash: tx.block_hash,
parent_block_hash: tx.parent_block_hash,
block_height: tx.block_height,
burn_block_time: tx.burn_block_time,
parent_burn_block_time: tx.parent_burn_block_time,
type_id: tx.type_id,
anchor_mode: tx.anchor_mode,
status: tx.status,
canonical: tx.canonical,
post_conditions: tx.post_conditions,
nonce: tx.nonce,
fee_rate: tx.fee_rate,
sponsored: tx.sponsored,
sponsor_nonce: tx.sponsor_nonce ?? null,
sponsor_address: tx.sponsor_address ?? null,
sender_address: tx.sender_address,
origin_hash_mode: tx.origin_hash_mode,
microblock_canonical: tx.microblock_canonical,
microblock_sequence: tx.microblock_sequence,
microblock_hash: tx.microblock_hash,
token_transfer_recipient_address: tx.token_transfer_recipient_address ?? null,
token_transfer_amount: tx.token_transfer_amount ?? null,
token_transfer_memo: tx.token_transfer_memo ?? null,
smart_contract_clarity_version: tx.smart_contract_clarity_version ?? null,
smart_contract_contract_id: tx.smart_contract_contract_id ?? null,
smart_contract_source_code: tx.smart_contract_source_code ?? null,
contract_call_contract_id: tx.contract_call_contract_id ?? null,
contract_call_function_name: tx.contract_call_function_name ?? null,
contract_call_function_args: tx.contract_call_function_args ?? null,
poison_microblock_header_1: tx.poison_microblock_header_1 ?? null,
poison_microblock_header_2: tx.poison_microblock_header_2 ?? null,
coinbase_payload: tx.coinbase_payload ?? null,
coinbase_alt_recipient: tx.coinbase_alt_recipient ?? null,
raw_result: tx.raw_result,
event_count: tx.event_count,
execution_cost_read_count: tx.execution_cost_read_count,
execution_cost_read_length: tx.execution_cost_read_length,
execution_cost_runtime: tx.execution_cost_runtime,
execution_cost_write_count: tx.execution_cost_write_count,
execution_cost_write_length: tx.execution_cost_write_length,
}));
await sql`INSERT INTO txs ${sql(values)}`;
}
async updateBurnchainRewardSlotHolders({
burnchainBlockHash,
burnchainBlockHeight,
@@ -1726,7 +1649,7 @@ export class PgWriteStore extends PgStore {
if (result.count !== slotValues.length) {
throw new Error(`Failed to insert slot holder for ${slotValues}`);
}
};
}
async insertBurnchainRewardsBatch(sql: PgSqlClient, rewards: DbBurnchainReward[]): Promise<void> {
const rewardValues: BurnchainRewardInsertValues[] = rewards.map(reward => ({
@@ -1743,10 +1666,10 @@ export class PgWriteStore extends PgStore {
INSERT into burnchain_rewards ${sql(rewardValues)}
`;
if(res.count !== rewardValues.length) {
if (res.count !== rewardValues.length) {
throw new Error(`Failed to insert burnchain reward for ${rewardValues}`);
}
};
}
async updateTx(sql: PgSqlClient, tx: DbTxRaw): Promise<number> {
const values: TxInsertValues = {
@@ -3099,7 +3022,7 @@ export class PgWriteStore extends PgStore {
}
/**
* Called when a full event import is complete.
* (event-replay) Finishes DB setup after an event-replay.
*/
async finishEventReplay() {
if (!this.isEventReplay) {
@@ -3113,7 +3036,235 @@ export class PgWriteStore extends PgStore {
});
}
/** Enable or disable indexes for the provided set of tables. */
/**
* batch operations (mainly for event-replay)
*/
async insertBlockBatch(sql: PgSqlClient, blocks: DbBlock[]) {
const values: BlockInsertValues[] = blocks.map(block => ({
block_hash: block.block_hash,
index_block_hash: block.index_block_hash,
parent_index_block_hash: block.parent_index_block_hash,
parent_block_hash: block.parent_block_hash,
parent_microblock_hash: block.parent_microblock_hash,
parent_microblock_sequence: block.parent_microblock_sequence,
block_height: block.block_height,
burn_block_time: block.burn_block_time,
burn_block_hash: block.burn_block_hash,
burn_block_height: block.burn_block_height,
miner_txid: block.miner_txid,
canonical: block.canonical,
execution_cost_read_count: block.execution_cost_read_count,
execution_cost_read_length: block.execution_cost_read_length,
execution_cost_runtime: block.execution_cost_runtime,
execution_cost_write_count: block.execution_cost_write_count,
execution_cost_write_length: block.execution_cost_write_length,
}));
await sql`
INSERT INTO blocks ${sql(values)}
`;
}
async insertMicroblock(sql: PgSqlClient, microblocks: DbMicroblock[]): Promise<void> {
const values: MicroblockInsertValues[] = microblocks.map(mb => ({
canonical: mb.canonical,
microblock_canonical: mb.microblock_canonical,
microblock_hash: mb.microblock_hash,
microblock_sequence: mb.microblock_sequence,
microblock_parent_hash: mb.microblock_parent_hash,
parent_index_block_hash: mb.parent_index_block_hash,
block_height: mb.block_height,
parent_block_height: mb.parent_block_height,
parent_block_hash: mb.parent_block_hash,
index_block_hash: mb.index_block_hash,
block_hash: mb.block_hash,
parent_burn_block_height: mb.parent_burn_block_height,
parent_burn_block_hash: mb.parent_burn_block_hash,
parent_burn_block_time: mb.parent_burn_block_time,
}));
const mbResult = await sql`
INSERT INTO microblocks ${sql(values)}
`;
if (mbResult.count !== microblocks.length) {
throw new Error(
`Unexpected row count after inserting microblocks: ${mbResult.count} vs ${values.length}`
);
}
}
// alias to insertMicroblock
async insertMicroblockBatch(sql: PgSqlClient, microblocks: DbMicroblock[]): Promise<void> {
return this.insertMicroblock(sql, microblocks);
}
async insertTxBatch(sql: PgSqlClient, txs: DbTx[]): Promise<void> {
const values: TxInsertValues[] = txs.map(tx => ({
tx_id: tx.tx_id,
raw_tx: tx.raw_result,
tx_index: tx.tx_index,
index_block_hash: tx.index_block_hash,
parent_index_block_hash: tx.parent_index_block_hash,
block_hash: tx.block_hash,
parent_block_hash: tx.parent_block_hash,
block_height: tx.block_height,
burn_block_time: tx.burn_block_time,
parent_burn_block_time: tx.parent_burn_block_time,
type_id: tx.type_id,
anchor_mode: tx.anchor_mode,
status: tx.status,
canonical: tx.canonical,
post_conditions: tx.post_conditions,
nonce: tx.nonce,
fee_rate: tx.fee_rate,
sponsored: tx.sponsored,
sponsor_nonce: tx.sponsor_nonce ?? null,
sponsor_address: tx.sponsor_address ?? null,
sender_address: tx.sender_address,
origin_hash_mode: tx.origin_hash_mode,
microblock_canonical: tx.microblock_canonical,
microblock_sequence: tx.microblock_sequence,
microblock_hash: tx.microblock_hash,
token_transfer_recipient_address: tx.token_transfer_recipient_address ?? null,
token_transfer_amount: tx.token_transfer_amount ?? null,
token_transfer_memo: tx.token_transfer_memo ?? null,
smart_contract_clarity_version: tx.smart_contract_clarity_version ?? null,
smart_contract_contract_id: tx.smart_contract_contract_id ?? null,
smart_contract_source_code: tx.smart_contract_source_code ?? null,
contract_call_contract_id: tx.contract_call_contract_id ?? null,
contract_call_function_name: tx.contract_call_function_name ?? null,
contract_call_function_args: tx.contract_call_function_args ?? null,
poison_microblock_header_1: tx.poison_microblock_header_1 ?? null,
poison_microblock_header_2: tx.poison_microblock_header_2 ?? null,
coinbase_payload: tx.coinbase_payload ?? null,
coinbase_alt_recipient: tx.coinbase_alt_recipient ?? null,
raw_result: tx.raw_result,
event_count: tx.event_count,
execution_cost_read_count: tx.execution_cost_read_count,
execution_cost_read_length: tx.execution_cost_read_length,
execution_cost_runtime: tx.execution_cost_runtime,
execution_cost_write_count: tx.execution_cost_write_count,
execution_cost_write_length: tx.execution_cost_write_length,
}));
await sql`INSERT INTO txs ${sql(values)}`;
}
async insertPrincipalStxTxsBatch(sql: PgSqlClient, values: PrincipalStxTxsInsertValues[]) {
await sql`
INSERT INTO principal_stx_txs ${sql(values)}
`;
}
async insertContractEventBatch(sql: PgSqlClient, values: SmartContractEventInsertValues[]) {
await sql`
INSERT INTO contract_logs ${sql(values)}
`;
}
async insertFtEventBatch(sql: PgSqlClient, values: FtEventInsertValues[]) {
await sql`
INSERT INTO ft_events ${sql(values)}
`;
}
async insertNftEventBatch(sql: PgSqlClient, values: NftEventInsertValues[]) {
await sql`INSERT INTO nft_events ${sql(values)}`;
}
async insertNameBatch(sql: PgSqlClient, values: BnsNameInsertValues[]) {
await sql`
INSERT INTO names ${sql(values)}
`;
}
async insertNamespace(
sql: PgSqlClient,
blockData: {
index_block_hash: string;
parent_index_block_hash: string;
microblock_hash: string;
microblock_sequence: number;
microblock_canonical: boolean;
},
bnsNamespace: DbBnsNamespace
) {
const values: BnsNamespaceInsertValues = {
namespace_id: bnsNamespace.namespace_id,
launched_at: bnsNamespace.launched_at ?? null,
address: bnsNamespace.address,
reveal_block: bnsNamespace.reveal_block,
ready_block: bnsNamespace.ready_block,
buckets: bnsNamespace.buckets,
base: bnsNamespace.base.toString(),
coeff: bnsNamespace.coeff.toString(),
nonalpha_discount: bnsNamespace.nonalpha_discount.toString(),
no_vowel_discount: bnsNamespace.no_vowel_discount.toString(),
lifetime: bnsNamespace.lifetime,
status: bnsNamespace.status ?? null,
tx_index: bnsNamespace.tx_index,
tx_id: bnsNamespace.tx_id,
canonical: bnsNamespace.canonical,
index_block_hash: blockData.index_block_hash,
parent_index_block_hash: blockData.parent_index_block_hash,
microblock_hash: blockData.microblock_hash,
microblock_sequence: blockData.microblock_sequence,
microblock_canonical: blockData.microblock_canonical,
};
await sql`
INSERT INTO namespaces ${sql(values)}
`;
}
async insertZonefileBatch(sql: PgSqlClient, values: BnsZonefileInsertValues[]) {
await sql`
INSERT INTO zonefiles ${sql(values)}
`;
}
async updateBatchSubdomainsEventReplay(
sql: PgSqlClient,
data: DataStoreAttachmentSubdomainData[]
): Promise<void> {
const subdomainValues: BnsSubdomainInsertValues[] = [];
for (const dataItem of data) {
if (dataItem.subdomains && dataItem.blockData) {
for (const subdomain of dataItem.subdomains) {
subdomainValues.push({
name: subdomain.name,
namespace_id: subdomain.namespace_id,
fully_qualified_subdomain: subdomain.fully_qualified_subdomain,
owner: subdomain.owner,
zonefile_hash: validateZonefileHash(subdomain.zonefile_hash),
parent_zonefile_hash: subdomain.parent_zonefile_hash,
parent_zonefile_index: subdomain.parent_zonefile_index,
block_height: subdomain.block_height,
tx_index: subdomain.tx_index,
zonefile_offset: subdomain.zonefile_offset,
resolver: subdomain.resolver,
canonical: subdomain.canonical,
tx_id: subdomain.tx_id,
index_block_hash: dataItem.blockData.index_block_hash,
parent_index_block_hash: dataItem.blockData.parent_index_block_hash,
microblock_hash: dataItem.blockData.microblock_hash,
microblock_sequence: dataItem.blockData.microblock_sequence,
microblock_canonical: dataItem.blockData.microblock_canonical,
});
}
}
}
if (subdomainValues.length === 0) {
return;
}
const result = await sql`
INSERT INTO subdomains ${sql(subdomainValues)}
`;
if (result.count !== subdomainValues.length) {
throw new Error(`Expected ${subdomainValues.length} subdomain inserts, got ${result.count}`);
}
}
/**
* (event-replay) Enable or disable indexes for the provided set of tables.
*/
async toggleTableIndexes(sql: PgSqlClient, tables: string[], enabled: boolean): Promise<void> {
const tableSchema = this.sql.options.connection.search_path ?? 'public';
const result = await sql`

View File

@@ -1,20 +1,23 @@
import { Database, QueryResult } from "duckdb";
import { Database, QueryResult, TableData } from 'duckdb';
export class DatasetStore {
private readonly db;
constructor() {
this.db = new Database(':memory:');
};
}
static async connect(): Promise<DatasetStore> {
static connect(): DatasetStore {
return new DatasetStore();
};
}
newBlockEventsIds = () => {
var con = this.db.connect();
return new Promise((resolve) => {
//
// NEW_BLOCK canonical payload manipulation/queries
//
newBlockEventsIds = (): Promise<number[]> => {
const con = this.db.connect();
return new Promise(resolve => {
con.all(
"SELECT ID FROM READ_PARQUET('events/new_block/canonical/*.parquet')",
(err: any, result: any) => {
@@ -22,27 +25,47 @@ export class DatasetStore {
throw err;
}
let res = result.map((a: any) => a.id); // extract IDs as an Array
const res: number[] = result.map((a: { id: number }) => a.id); // extract IDs as an Array
resolve(res);
}
);
});
};
newBlockEventsOrderedPayloadStream = (): Promise<QueryResult> => {
return new Promise(async (resolve) => {
var con = this.db.connect();
newBlockEventsPayloadStream = (ids: number[]): Promise<QueryResult> => {
return new Promise(resolve => {
const con = this.db.connect();
const res = con.stream(
"SELECT payload FROM READ_PARQUET('events/new_block/canonical/*.parquet') ORDER BY id",
`SELECT payload FROM READ_PARQUET('events/new_block/canonical/*.parquet') WHERE id IN (${ids}) ORDER BY id`
);
resolve(res);
});
};
getNewBlockEventsInBlockHeights = (blockHeights: number[]): Promise<TableData> => {
const con = this.db.connect();
return new Promise(resolve => {
con.all(
`SELECT * FROM READ_PARQUET('events/new_block/*.parquet') WHERE block_height IN (${blockHeights})`,
(err: any, res: any) => {
if (err) {
throw err;
}
resolve(res);
}
);
});
};
//
// NEW_BURN_BLOCK EVENTS
//
newBurnBlockEventsOrdered = () => {
return new Promise((resolve) => {
var con = this.db.connect();
return new Promise(resolve => {
const con = this.db.connect();
con.all(
"SELECT * FROM READ_PARQUET('events/new_burn_block/canonical/*.parquet') ORDER BY id",
(err: any, result: any) => {
@@ -55,4 +78,24 @@ export class DatasetStore {
);
});
};
};
//
// ATTACHMENTS_NEW EVENTS
//
attachmentsNewEvents = (): Promise<TableData> => {
const con = this.db.connect();
return new Promise(resolve => {
con.all(
"SELECT payload FROM READ_PARQUET('events/attachments/new/*.parquet')",
(err: any, result: any) => {
if (err) {
throw err;
}
resolve(result);
}
);
});
};
}

View File

@@ -1,74 +1,102 @@
import * as tty from 'tty';
import * as fs from 'fs';
import { PgWriteStore } from '../../datastore/pg-write-store';
import { cycleMigrations, dangerousDropAllTables } from '../../datastore/migrations';
import { logger } from '../../logger';
import { createTimeTracker } from './helpers';
import { insertNewBurnBlockEvents } from './importers/new_burn_block_importer';
import { insertNewBlockEvents } from './importers/new_block_importer';
import { processNewBurnBlockEvents } from './importers/new-burn-block-importer';
import { processNewBlockEvents } from './importers/new-block-importer';
import { processAttachmentNewEvents } from './importers/attachment-new-importer';
import { DatasetStore } from './dataset/store';
const MIGRATIONS_TABLE = 'pgmigrations';
const run = async (wipeDB: boolean = false, disableIndexes: boolean = false) => {
/**
*
*/
const ingestNewBurnBlock = async () => {
const timeTracker = createTimeTracker();
const db = await PgWriteStore.connect({
usageName: 'import-events',
usageName: 'event-replay',
skipMigrations: true,
withNotifier: false,
isEventReplay: true,
});
const dataset = await DatasetStore.connect();
if (wipeDB) {
await dangerousDropAllTables({ acknowledgePotentialCatastrophicConsequences: 'yes' });
}
const dataset = DatasetStore.connect();
try {
await cycleMigrations({ dangerousAllowDataLoss: true, checkForEmptyData: true });
} catch (error) {
logger.error(error);
throw new Error(
`DB migration cycle failed, possibly due to an incompatible API version upgrade. Add --wipe-db --force or perform a manual DB wipe before importing.`
);
}
let tables: string[] = [];
if (disableIndexes) {
// Get DB tables
const dbName = db.sql.options.database; // stacks-blockchain-api
const tableSchema = db.sql.options.connection.search_path ?? 'public';
const tablesQuery = await db.sql<{ tablename: string }[]>`
SELECT tablename FROM pg_catalog.pg_tables
WHERE tablename != ${MIGRATIONS_TABLE}
AND schemaname = ${tableSchema}`;
if (tablesQuery.length === 0) {
const errorMsg = `No tables found in database '${dbName}', schema '${tableSchema}'`;
logger.error(errorMsg);
throw new Error(errorMsg);
}
tables = tablesQuery.map(r => r.tablename);
// Disable indexing and constraints on tables to speed up insertion
logger.info(`Disable indexes on tables: ${tables.join(', ')}`);
db.toggleTableIndexes(db.sql, tables, false);
}
try {
await Promise.all([
insertNewBurnBlockEvents(db, dataset, timeTracker),
insertNewBlockEvents(db, dataset, timeTracker)
]);
await timeTracker.track('NEW_BURN_BLOCK_EVENTS', async () => {
await processNewBurnBlockEvents(db, dataset);
});
} catch (err) {
throw err;
} finally {
if (disableIndexes) {
logger.info(`Enable indexes on tables: ${tables.join(', ')}`);
db.toggleTableIndexes(db.sql, tables, true);
if (true || tty.isatty(1)) {
console.log('Tracked function times:');
console.table(timeTracker.getDurations(3));
} else {
logger.info(`Tracked function times`, timeTracker.getDurations(3));
}
await db.close();
}
};
/**
*
*/
const ingestAttachmentNew = async () => {
const timeTracker = createTimeTracker();
const db = await PgWriteStore.connect({
usageName: 'event-replay',
skipMigrations: true,
withNotifier: false,
isEventReplay: true,
});
const dataset = DatasetStore.connect();
try {
await timeTracker.track('ATTACHMENTS_NEW_EVENTS', async () => {
await processAttachmentNewEvents(db, dataset);
});
} catch (err) {
throw err;
} finally {
if (true || tty.isatty(1)) {
console.log('Tracked function times:');
console.table(timeTracker.getDurations(3));
} else {
logger.info(`Tracked function times`, timeTracker.getDurations(3));
}
await db.close();
}
};
const ingestNewBlock = async (idsPath?: string) => {
const timeTracker = createTimeTracker();
const db = await PgWriteStore.connect({
usageName: 'event-replay',
skipMigrations: true,
withNotifier: false,
isEventReplay: true,
});
const dataset = DatasetStore.connect();
try {
const idsFileContent = fs.readFileSync(`${idsPath}`, 'utf-8');
const ids = idsFileContent.split(/\r?\n/);
await timeTracker.track('NEW_BLOCK_EVENTS', async () => {
await processNewBlockEvents(db, dataset, ids);
});
} catch (err) {
throw err;
} finally {
if (true || tty.isatty(1)) {
console.log('Tracked function times:');
console.table(timeTracker.getDurations(3));
@@ -76,6 +104,6 @@ const run = async (wipeDB: boolean = false, disableIndexes: boolean = false) =>
logger.info(`Tracked function times`, timeTracker.getDurations(3));
}
}
}
};
export { run };
export { ingestNewBlock, ingestAttachmentNew, ingestNewBurnBlock };

View File

@@ -0,0 +1,44 @@
import * as fs from 'fs';
import { logger } from '../../logger';
import { DatasetStore } from './dataset/store';
import { splitIntoChunks } from './helpers';
(async () => {
const args = process.argv.slice(2);
const workers: number = Number(args[0].split('=')[1]);
logger.info({ component: 'event-replay' }, `Generating ID files for ${workers} parallel workers`);
const dir = './events/new_block';
const dataset = DatasetStore.connect();
const ids: number[] = await dataset.newBlockEventsIds();
const batchSize = Math.ceil(ids.length / workers);
const chunks = splitIntoChunks(ids, batchSize);
const files = fs.readdirSync(dir).filter(f => f.endsWith('txt'));
// delete previous files
files.map(
file =>
new Promise((resolve, reject) => {
try {
fs.unlinkSync(`${dir}/${file}`);
} catch (err) {
throw err;
}
})
);
// create id files
chunks.forEach((chunk, idx) => {
const filename = `./events/new_block/ids_${idx + 1}.txt`;
chunk.forEach(id => {
fs.writeFileSync(filename, id.toString() + '\n', { flag: 'a' });
});
});
})().catch(err => {
throw new err();
});

View File

@@ -1,12 +1,12 @@
interface TimeTracker {
track<T = void>(name: string, fn: () => Promise<T>): Promise<T>;
trackSync<T = void>(name: string, fn: () => T): T;
getDurations: (
roundDecimals?: number
) => {
name: string;
seconds: string;
}[];
track<T = void>(name: string, fn: () => Promise<T>): Promise<T>;
trackSync<T = void>(name: string, fn: () => T): T;
getDurations: (
roundDecimals?: number
) => {
name: string;
seconds: string;
}[];
}
const createTimeTracker = (): TimeTracker => {
@@ -49,6 +49,40 @@ const createTimeTracker = (): TimeTracker => {
});
},
};
};
interface Stopwatch {
/** Milliseconds since stopwatch was created. */
getElapsed: () => number;
/** Seconds since stopwatch was created. */
getElapsedSeconds: (roundDecimals?: number) => number;
getElapsedAndRestart: () => number;
restart(): void;
}
function stopwatch(): Stopwatch {
let start = process.hrtime.bigint();
const result: Stopwatch = {
getElapsedSeconds: (roundDecimals?: number) => {
const elapsedMs = result.getElapsed();
const seconds = elapsedMs / 1000;
return roundDecimals === undefined ? seconds : +seconds.toFixed(roundDecimals);
},
getElapsed: () => {
const end = process.hrtime.bigint();
return Number((end - start) / 1_000_000n);
},
getElapsedAndRestart: () => {
const end = process.hrtime.bigint();
const result = Number((end - start) / 1_000_000n);
start = process.hrtime.bigint();
return result;
},
restart: () => {
start = process.hrtime.bigint();
},
};
return result;
}
function* chunks<T>(arr: T[], n: number): Generator<T[], void> {
@@ -57,8 +91,8 @@ function* chunks<T>(arr: T[], n: number): Generator<T[], void> {
}
}
const splitIntoChunks = async (data: object[], chunk_size: number) => {
const splitIntoChunks = (data: number[], chunk_size: number) => {
return [...chunks(data, chunk_size)];
};
export { TimeTracker, createTimeTracker, splitIntoChunks };
export { createTimeTracker, splitIntoChunks };

View File

@@ -0,0 +1,66 @@
/* eslint-disable @typescript-eslint/no-non-null-assertion */
import { PgWriteStore } from '../../../datastore/pg-write-store';
import { parseAttachmentMessage } from '../../../event-stream/event-server';
import { logger } from '../../../logger';
import { CoreNodeAttachmentMessage } from '../../../event-stream/core-node-message';
import { DataStoreAttachmentSubdomainData, DataStoreBnsBlockData } from '../../../datastore/common';
import { DatasetStore } from '../dataset/store';
import { I32_MAX } from '../../../helpers';
export const processAttachmentNewEvents = async (db: PgWriteStore, dataset: DatasetStore) => {
logger.info({ component: 'event-replay' }, 'ATTACHMENTS_NEW events process started');
const attachmentsNewEvents = await dataset.attachmentsNewEvents();
const ary: DataStoreAttachmentSubdomainData[] = [];
for await (const event of attachmentsNewEvents) {
const blockData: DataStoreBnsBlockData = {
index_block_hash: '',
parent_index_block_hash: '',
microblock_hash: '',
microblock_sequence: I32_MAX,
microblock_canonical: true,
};
const dataStore: DataStoreAttachmentSubdomainData = {};
const attachmentMsg: CoreNodeAttachmentMessage[] = JSON.parse(event.payload);
const attachments = parseAttachmentMessage(attachmentMsg);
dataStore.subdomains = attachments.subdomainObj.dbBnsSubdomain;
blockData.index_block_hash = attachments.subdomainObj.attachmentData.indexBlockHash;
dataStore.blockData = blockData;
dataStore.attachment = attachments.subdomainObj.attachmentData;
ary.push(dataStore);
}
const blockHeights = [];
for (const el of ary) {
if (el.subdomains!.length !== 0) {
blockHeights.push(el.attachment!.blockHeight);
}
}
// get events from block heights
const blockEvents = await dataset.getNewBlockEventsInBlockHeights(blockHeights);
for (const event of blockEvents) {
for (const ds of ary) {
if (ds.blockData?.index_block_hash === event.index_block_hash) {
const txs = JSON.parse(event.payload).transactions;
for (const tx of txs) {
if (ds.attachment!.txId === tx.txid) {
ds.blockData!.microblock_hash = tx.microblock_hash || '';
ds.blockData!.microblock_sequence = tx.microblock_sequence || I32_MAX;
}
}
ds.blockData!.index_block_hash = event.index_block_hash;
ds.blockData!.parent_index_block_hash = event.parent_index_block_hash;
}
}
}
await db.updateBatchSubdomainsEventReplay(db.sql, ary);
};

View File

@@ -0,0 +1,445 @@
import { Readable, Writable, Transform } from 'stream';
import { pipeline } from 'stream/promises';
import { PgWriteStore } from '../../../datastore/pg-write-store';
import { parseNewBlockMessage } from '../../../event-stream/event-server';
import {
DbBlock,
DbMicroblock,
DbTx,
SmartContractEventInsertValues,
StxEventInsertValues,
PrincipalStxTxsInsertValues,
FtEventInsertValues,
NftEventInsertValues,
BnsNameInsertValues,
BnsZonefileInsertValues,
DataStoreBlockUpdateData,
} from '../../../datastore/common';
import { validateZonefileHash } from '../../../datastore/helpers';
import { logger } from '../../../logger';
import { getApiConfiguredChainID, batchIterate } from '../../../helpers';
import { CoreNodeBlockMessage } from '../../../event-stream/core-node-message';
import { DatasetStore } from '../dataset/store';
const chainID = getApiConfiguredChainID();
const batchInserters: BatchInserter[] = [];
interface BatchInserter<T = any> {
push(entries: T[]): Promise<void>;
flush(): Promise<void>;
}
function createBatchInserter<T>({
batchSize,
insertFn,
}: {
batchSize: number;
insertFn: (entries: T[]) => Promise<void>;
}): BatchInserter<T> {
let entryBuffer: T[] = [];
return {
async push(entries: T[]) {
entries.length === 1
? entryBuffer.push(entries[0])
: entries.forEach(e => entryBuffer.push(e));
if (entryBuffer.length === batchSize) {
await insertFn(entryBuffer);
entryBuffer.length = 0;
} else if (entryBuffer.length > batchSize) {
for (const batch of batchIterate(entryBuffer, batchSize)) {
await insertFn(batch);
}
entryBuffer.length = 0;
}
},
async flush() {
logger.debug({ component: 'event-replay' }, 'Flushing remaining data...');
if (entryBuffer.length > 0) {
await insertFn(entryBuffer);
entryBuffer = [];
}
},
};
}
const populateBatchInserters = (db: PgWriteStore) => {
const dbBlockBatchInserter = createBatchInserter<DbBlock>({
batchSize: 500,
insertFn: entries => {
logger.debug({ component: 'event-replay' }, 'Inserting into blocks table...');
return db.insertBlockBatch(db.sql, entries);
},
});
batchInserters.push(dbBlockBatchInserter);
const dbMicroblockBatchInserter = createBatchInserter<DbMicroblock>({
batchSize: 500,
insertFn: entries => {
logger.debug({ component: 'event-replay' }, 'Inserting into microblocks table...');
return db.insertMicroblockBatch(db.sql, entries);
},
});
batchInserters.push(dbMicroblockBatchInserter);
const dbTxBatchInserter = createBatchInserter<DbTx>({
batchSize: 1400,
insertFn: entries => {
logger.debug({ component: 'event-replay' }, 'Inserting into txs table...');
return db.insertTxBatch(db.sql, entries);
},
});
batchInserters.push(dbTxBatchInserter);
const dbStxEventBatchInserter = createBatchInserter<StxEventInsertValues>({
batchSize: 500,
insertFn: entries => {
logger.debug({ component: 'event-replay' }, 'Inserting into stx_events table...');
return db.insertStxEventBatch(db.sql, entries);
},
});
batchInserters.push(dbStxEventBatchInserter);
const dbPrincipalStxTxBatchInserter = createBatchInserter<PrincipalStxTxsInsertValues>({
batchSize: 500,
insertFn: entries => {
logger.debug({ component: 'event-replay' }, 'Inserting into stx_events table...');
return db.insertPrincipalStxTxsBatch(db.sql, entries);
},
});
batchInserters.push(dbPrincipalStxTxBatchInserter);
const dbContractEventBatchInserter = createBatchInserter<SmartContractEventInsertValues>({
batchSize: 500,
insertFn: entries => {
logger.debug({ component: 'event-replay' }, 'Inserting into contract_logs table...');
return db.insertContractEventBatch(db.sql, entries);
},
});
batchInserters.push(dbContractEventBatchInserter);
const dbFtEventBatchInserter = createBatchInserter<FtEventInsertValues>({
batchSize: 500,
insertFn: entries => {
logger.debug({ component: 'event-replay' }, 'Inserting into ft_events table...');
return db.insertFtEventBatch(db.sql, entries);
},
});
batchInserters.push(dbFtEventBatchInserter);
const dbNftEventBatchInserter = createBatchInserter<NftEventInsertValues>({
batchSize: 500,
insertFn: entries => {
logger.debug({ component: 'event-replay' }, 'Inserting into nft_events table...');
return db.insertNftEventBatch(db.sql, entries);
},
});
batchInserters.push(dbNftEventBatchInserter);
const dbNameBatchInserter = createBatchInserter<BnsNameInsertValues>({
batchSize: 500,
insertFn: entries => {
logger.debug({ component: 'event-replay' }, 'Inserting into names table...');
return db.insertNameBatch(db.sql, entries);
},
});
batchInserters.push(dbNameBatchInserter);
const dbZonefileBatchInserter = createBatchInserter<BnsZonefileInsertValues>({
batchSize: 500,
insertFn: entries => {
logger.debug({ component: 'event-replay' }, 'Inserting into zonefiles table...');
return db.insertZonefileBatch(db.sql, entries);
},
});
batchInserters.push(dbZonefileBatchInserter);
return new Writable({
objectMode: true,
write: async (data: CoreNodeBlockMessage, _encoding, next) => {
let dbData: DataStoreBlockUpdateData;
try {
dbData = parseNewBlockMessage(chainID, data);
} catch (err) {
logger.error({ component: 'event-replay' }, 'Error when parsing new_block event');
console.error(err);
throw err;
}
const insertTxs = async (dbData: DataStoreBlockUpdateData) => {
for (const entry of dbData.txs) {
await dbTxBatchInserter.push([entry.tx]);
}
};
const insertContractLogs = async (dbData: DataStoreBlockUpdateData) => {
for (const entry of dbData.txs) {
await dbContractEventBatchInserter.push(
entry.contractLogEvents.map((contractEvent: any) => ({
event_index: contractEvent.event_index,
tx_id: contractEvent.tx_id,
tx_index: contractEvent.tx_index,
block_height: contractEvent.block_height,
index_block_hash: entry.tx.index_block_hash,
parent_index_block_hash: entry.tx.parent_index_block_hash,
microblock_hash: entry.tx.microblock_hash,
microblock_sequence: entry.tx.microblock_sequence,
microblock_canonical: entry.tx.microblock_canonical,
canonical: contractEvent.canonical,
contract_identifier: contractEvent.contract_identifier,
topic: contractEvent.topic,
value: contractEvent.value,
}))
);
}
};
const insertStxEvents = async (dbData: DataStoreBlockUpdateData) => {
for (const entry of dbData.txs) {
for (const stxEvent of entry.stxEvents) {
await dbStxEventBatchInserter.push([
{
...stxEvent,
index_block_hash: entry.tx.index_block_hash,
parent_index_block_hash: entry.tx.parent_index_block_hash,
microblock_hash: entry.tx.microblock_hash,
microblock_sequence: entry.tx.microblock_sequence,
microblock_canonical: entry.tx.microblock_canonical,
sender: stxEvent.sender ?? null,
recipient: stxEvent.recipient ?? null,
amount: stxEvent.amount ?? null,
memo: stxEvent.memo ?? null,
},
]);
}
}
};
const insertPrincipalStxTxs = async (dbData: DataStoreBlockUpdateData) => {
for (const entry of dbData.txs) {
// string key: `principal, tx_id, index_block_hash, microblock_hash`
const alreadyInsertedRowKeys = new Set<string>();
const values: PrincipalStxTxsInsertValues[] = [];
const push = (principal: string) => {
// Check if this row has already been inserted by comparing the same columns used in the
// sql unique constraint defined on the table. This prevents later errors during re-indexing
// when the table indexes/constraints are temporarily disabled during inserts.
const constraintKey = `${principal},${entry.tx.tx_id},${entry.tx.index_block_hash},${entry.tx.microblock_hash}`;
if (!alreadyInsertedRowKeys.has(constraintKey)) {
alreadyInsertedRowKeys.add(constraintKey);
values.push({
principal: principal,
tx_id: entry.tx.tx_id,
block_height: entry.tx.block_height,
index_block_hash: entry.tx.index_block_hash,
microblock_hash: entry.tx.microblock_hash,
microblock_sequence: entry.tx.microblock_sequence,
tx_index: entry.tx.tx_index,
canonical: entry.tx.canonical,
microblock_canonical: entry.tx.microblock_canonical,
});
}
};
const principals = new Set<string>();
// Insert tx data
[
entry.tx.sender_address,
entry.tx.token_transfer_recipient_address,
entry.tx.contract_call_contract_id,
entry.tx.smart_contract_contract_id,
]
.filter((p): p is string => !!p)
.forEach(p => principals.add(p));
// Insert stx_event data
entry.stxEvents.forEach((event: any) => {
if (event.sender) {
principals.add(event.sender);
}
if (event.recipient) {
principals.add(event.recipient);
}
});
principals.forEach(principal => push(principal));
await dbPrincipalStxTxBatchInserter.push(values);
}
};
const insertFTEvents = async (dbData: DataStoreBlockUpdateData) => {
for (const entry of dbData.txs) {
await dbFtEventBatchInserter.push(
entry.ftEvents.map((ftEvent: any) => ({
event_index: ftEvent.event_index,
tx_id: ftEvent.tx_id,
tx_index: ftEvent.tx_index,
block_height: ftEvent.block_height,
index_block_hash: entry.tx.index_block_hash,
parent_index_block_hash: entry.tx.parent_index_block_hash,
microblock_hash: entry.tx.microblock_hash,
microblock_sequence: entry.tx.microblock_sequence,
microblock_canonical: entry.tx.microblock_canonical,
canonical: ftEvent.canonical,
asset_event_type_id: ftEvent.asset_event_type_id,
sender: ftEvent.sender ?? null,
recipient: ftEvent.recipient ?? null,
asset_identifier: ftEvent.asset_identifier,
amount: ftEvent.amount.toString(),
}))
);
}
};
const insertNFTEvents = async (dbData: DataStoreBlockUpdateData) => {
for (const entry of dbData.txs) {
await dbNftEventBatchInserter.push(
entry.nftEvents.map((nftEvent: any) => ({
tx_id: nftEvent.tx_id,
index_block_hash: entry.tx.index_block_hash,
parent_index_block_hash: entry.tx.parent_index_block_hash,
microblock_hash: entry.tx.microblock_hash,
microblock_sequence: entry.tx.microblock_sequence,
microblock_canonical: entry.tx.microblock_canonical,
sender: nftEvent.sender ?? null,
recipient: nftEvent.recipient ?? null,
event_index: nftEvent.event_index,
tx_index: nftEvent.tx_index,
block_height: nftEvent.block_height,
canonical: nftEvent.canonical,
asset_event_type_id: nftEvent.asset_event_type_id,
asset_identifier: nftEvent.asset_identifier,
value: nftEvent.value,
}))
);
}
};
const insertNames = async (dbData: DataStoreBlockUpdateData) => {
for (const entry of dbData.txs) {
await dbNameBatchInserter.push(
entry.names.map((bnsName: any) => ({
name: bnsName.name,
address: bnsName.address,
registered_at: bnsName.registered_at,
expire_block: bnsName.expire_block,
zonefile_hash: validateZonefileHash(bnsName.zonefile_hash),
namespace_id: bnsName.namespace_id,
grace_period: bnsName.grace_period ?? null,
renewal_deadline: bnsName.renewal_deadline ?? null,
resolver: bnsName.resolver ?? null,
tx_id: bnsName.tx_id ?? null,
tx_index: bnsName.tx_index,
event_index: bnsName.event_index ?? null,
status: bnsName.status ?? null,
canonical: bnsName.canonical,
index_block_hash: entry.tx.index_block_hash ?? null,
parent_index_block_hash: entry.tx.parent_index_block_hash,
microblock_hash: entry.tx.microblock_hash,
microblock_sequence: entry.tx.microblock_sequence,
microblock_canonical: entry.tx.microblock_canonical,
}))
);
}
};
const insertZoneFiles = async (dbData: DataStoreBlockUpdateData) => {
for (const entry of dbData.txs) {
await dbZonefileBatchInserter.push(
entry.names.map((bnsName: any) => ({
name: bnsName.name,
zonefile: bnsName.zonefile,
zonefile_hash: validateZonefileHash(bnsName.zonefile_hash),
tx_id: bnsName.tx_id,
index_block_hash: bnsName.index_block_hash ?? null,
}))
);
}
};
const insertSmartContracts = async (dbData: DataStoreBlockUpdateData) => {
for (const entry of dbData.txs) {
for (const smartContract of entry.smartContracts) {
await db.updateSmartContract(db.sql, entry.tx, smartContract);
}
}
};
const insertNamespaces = async (dbData: DataStoreBlockUpdateData) => {
for (const entry of dbData.txs) {
for (const namespace of entry.namespaces) {
await db.insertNamespace(db.sql, entry.tx, namespace);
}
}
};
const insertStxLockEvents = async (dbData: DataStoreBlockUpdateData) => {
for (const entry of dbData.txs) {
for (const stxLockEvent of entry.stxLockEvents) {
await db.updateStxLockEvent(db.sql, entry.tx, stxLockEvent);
}
}
};
await Promise.all([
// Insert blocks
dbBlockBatchInserter.push([dbData.block]),
// // Insert microblocks
dbMicroblockBatchInserter.push(dbData.microblocks),
// // Insert txs
insertTxs(dbData),
// // Insert stx_events
insertStxEvents(dbData),
// // Insert principal_stx_txs
insertPrincipalStxTxs(dbData),
// // Insert contract_logs
insertContractLogs(dbData),
// // Insert ft_events
insertFTEvents(dbData),
// // Insert nft_events
insertNFTEvents(dbData),
// // Insert names
insertNames(dbData),
// Insert zonefiles
insertZoneFiles(dbData),
// Insert smart_contracts
insertSmartContracts(dbData),
// Insert namespaces
insertNamespaces(dbData),
// Insert stx_lock_events
insertStxLockEvents(dbData),
]);
next();
},
});
};
const transformDataToJSON = () => {
return new Transform({
objectMode: true,
transform: (data, _encoding, callback) => {
callback(null, JSON.parse(data.payload));
},
});
};
export const processNewBlockEvents = async (db: PgWriteStore, dataset: DatasetStore, ids?: any) => {
logger.info({ component: 'event-replay' }, 'NEW_BLOCK events process started');
const payload = await dataset.newBlockEventsPayloadStream(ids);
const toJSON = transformDataToJSON();
const insertBatchData = populateBatchInserters(db);
await pipeline(
Readable.from(payload),
toJSON,
insertBatchData.on('finish', async () => {
for (const batchInserter of batchInserters) {
await batchInserter.flush();
}
})
);
};

View File

@@ -2,7 +2,7 @@ import { PgWriteStore } from '../../../datastore/pg-write-store';
import { DbBurnchainReward, DbRewardSlotHolder } from '../../../datastore/common';
import { CoreNodeBurnBlockMessage } from '../../../event-stream/core-node-message';
import { logger } from '../../../logger';
import { TimeTracker, splitIntoChunks } from '../helpers';
import { splitIntoChunks } from '../helpers';
import { DatasetStore } from '../dataset/store';
const INSERT_BATCH_SIZE = 500;
@@ -42,15 +42,15 @@ const DbRewardSlotHolderParse = (payload: CoreNodeBurnBlockMessage) => {
const insertBurnchainRewardsAndSlotHolders = async (db: PgWriteStore, chunks: any) => {
for (const chunk of chunks) {
let burnchainRewards: DbBurnchainReward[] = [];
let slotHolders: DbRewardSlotHolder[] = [];
const burnchainRewards: DbBurnchainReward[] = [];
const slotHolders: DbRewardSlotHolder[] = [];
for (const event of chunk) {
const payload: CoreNodeBurnBlockMessage = JSON.parse(event['payload']);
const burnchainRewardsData = DbBurnchainRewardParse(payload);
const slotHoldersData = DbRewardSlotHolderParse(payload);
burnchainRewardsData.forEach(reward => burnchainRewards.push(reward));
slotHoldersData.forEach(slotHolder => slotHolders.push(slotHolder));
};
}
if (burnchainRewards.length !== 0) {
await db.insertBurnchainRewardsBatch(db.sql, burnchainRewards);
@@ -59,15 +59,14 @@ const insertBurnchainRewardsAndSlotHolders = async (db: PgWriteStore, chunks: an
if (slotHolders.length !== 0) {
await db.insertSlotHoldersBatch(db.sql, slotHolders);
}
};
}
};
export const insertNewBurnBlockEvents = async (db: PgWriteStore, dataset: DatasetStore, timeTracker: TimeTracker) => {
logger.info(`Inserting NEW_BURN_BLOCK events to db...`);
export const processNewBurnBlockEvents = async (db: PgWriteStore, dataset: DatasetStore) => {
logger.info({ component: 'event-replay' }, 'NEW_BURN_BLOCK events process started');
await timeTracker.track('insertNewBurnBlockEvents', async () => {
return dataset.newBurnBlockEventsOrdered()
.then(async (data: any) => await splitIntoChunks(data, INSERT_BATCH_SIZE))
.then(async (chunks: any) => await insertBurnchainRewardsAndSlotHolders(db, chunks));
});
return dataset
.newBurnBlockEventsOrdered()
.then((data: any) => splitIntoChunks(data, INSERT_BATCH_SIZE))
.then(async (chunks: any) => await insertBurnchainRewardsAndSlotHolders(db, chunks));
};

View File

@@ -1,144 +0,0 @@
import { Readable, Writable, Transform } from 'stream';
import { pipeline } from 'stream/promises';
import { PgWriteStore } from '../../../datastore/pg-write-store';
import { parseNewBlockMessage } from '../../../event-stream/event-server';
import { DbBlock, DbMicroblock, DbTx } from '../../../datastore/common';
import { logger } from '../../../logger';
import { TimeTracker } from '../helpers';
import { getApiConfiguredChainID, batchIterate } from '../../../helpers';
import { CoreNodeBlockMessage } from '../../../event-stream/core-node-message';
import { DatasetStore } from '../dataset/store';
const batchInserters: BatchInserter[] = [];
const chainID = getApiConfiguredChainID();
interface BatchInserter<T = any> {
push(entries: T[]): Promise<void>;
flush(): Promise<void>;
}
function createBatchInserter<T>({
batchSize,
insertFn,
}: {
batchSize: number;
insertFn: (entries: T[]) => Promise<void>;
}): BatchInserter<T> {
let entryBuffer: T[] = [];
return {
async push(entries: T[]) {
entries.length === 1
? entryBuffer.push(entries[0])
: entries.forEach(e => entryBuffer.push(e));
if (entryBuffer.length === batchSize) {
await insertFn(entryBuffer);
entryBuffer.length = 0;
} else if (entryBuffer.length > batchSize) {
for (const batch of batchIterate(entryBuffer, batchSize)) {
await insertFn(batch);
}
entryBuffer.length = 0;
}
},
async flush() {
logger.info('Flushing remaining data...');
if (entryBuffer.length > 0) {
await insertFn(entryBuffer);
entryBuffer = [];
}
},
};
}
const populateBatchInserters = async (db: PgWriteStore) => {
const dbBlockBatchInserter = createBatchInserter<DbBlock>({
batchSize: 100,
insertFn: (entries) => {
logger.info('Inserting blocks...');
return db.insertBlockBatch(db.sql, entries);
},
});
batchInserters.push(dbBlockBatchInserter);
const dbMicroblockBatchInserter = createBatchInserter<DbMicroblock>({
batchSize: 200,
insertFn: (entries) => {
logger.info('Inserting microblocks...');
return db.insertMicroblock(db.sql, entries);
},
});
batchInserters.push(dbMicroblockBatchInserter);
const dbTxBatchInserter = createBatchInserter<DbTx>({
batchSize: 1000,
insertFn: (entries) => {
logger.info('Inserting txs...');
return db.insertTxBatch(db.sql, entries);
},
});
batchInserters.push(dbTxBatchInserter);
return new Writable({
objectMode: true,
write: async (data: CoreNodeBlockMessage, _encoding, next) => {
let dbData;
try {
dbData = parseNewBlockMessage(chainID, data);
} catch (err) {
logger.error('Error when parsing new_block event');
console.error(err);
throw err;
}
const insertTxs = async (dbData: any) => {
for (const entry of dbData.txs) {
await dbTxBatchInserter.push([entry.tx]);
}
};
await Promise.all([
// Insert blocks
dbBlockBatchInserter.push([dbData.block]),
// Insert microblocks
dbMicroblockBatchInserter.push(dbData.microblocks),
// Insert Txs
insertTxs(dbData)
]);
next();
}
});
}
const transformDataToJSON = async () => {
return new Transform({
objectMode: true,
transform: async (data, _encoding, callback) => {
callback(null, JSON.parse(data.payload));
}
});
};
export const insertNewBlockEvents = async (db: PgWriteStore, dataset: DatasetStore, timeTracker: TimeTracker) => {
logger.info(`Inserting NEW_BLOCK events to db...`);
await timeTracker.track('insertNewBlockEvents', async () => {
const payload = await dataset.newBlockEventsOrderedPayloadStream();
const toJSON = await transformDataToJSON();
const insertBatchData = await populateBatchInserters(db);
await pipeline(
Readable.from(payload),
toJSON,
insertBatchData
.on('finish', async () => {
for (const batchInserter of batchInserters) {
await batchInserter.flush();
}
})
)
});
};

View File

@@ -0,0 +1,17 @@
import { logger } from '../../logger';
import { PgWriteStore } from '../../datastore/pg-write-store';
(async () => {
const db = await PgWriteStore.connect({
usageName: 'post-event-replay',
skipMigrations: true,
withNotifier: false,
isEventReplay: true,
});
// Refreshing materialized views
logger.info({ component: 'event-replay' }, `Refreshing materialized views`);
await db.finishEventReplay();
})().catch(err => {
throw err;
});

View File

@@ -0,0 +1,47 @@
import { logger } from '../../logger';
import { cycleMigrations, dangerousDropAllTables } from '../../datastore/migrations';
import { PgWriteStore } from '../../datastore/pg-write-store';
const MIGRATIONS_TABLE = 'pgmigrations';
(async () => {
const db = await PgWriteStore.connect({
usageName: 'pre-event-replay',
skipMigrations: true,
withNotifier: false,
isEventReplay: true,
});
logger.info({ component: 'event-replay' }, 'Cleaning up the Database');
await dangerousDropAllTables({ acknowledgePotentialCatastrophicConsequences: 'yes' });
logger.info({ component: 'event-replay' }, 'Migrating tables');
try {
await cycleMigrations({ dangerousAllowDataLoss: true, checkForEmptyData: true });
} catch (error) {
logger.error(error);
throw new Error('DB migration cycle failed');
}
const dbName = db.sql.options.database;
const tableSchema = db.sql.options.connection.search_path ?? 'public';
const tablesQuery = await db.sql<{ tablename: string }[]>`
SELECT tablename FROM pg_catalog.pg_tables
WHERE tablename != ${MIGRATIONS_TABLE}
AND schemaname = ${tableSchema}`;
if (tablesQuery.length === 0) {
const errorMsg = `No tables found in database '${dbName}', schema '${tableSchema}'`;
console.error(errorMsg);
throw new Error(errorMsg);
}
const tables: string[] = tablesQuery.map((r: { tablename: string }) => r.tablename);
logger.info(
{ component: 'event-replay' },
'Disabling indexes and constraints to speed up insertion'
);
await db.toggleTableIndexes(db.sql, tables, false);
logger.info({ component: 'event-replay' }, `Indexes disabled on tables: ${tables.join(', ')}`);
})().catch(err => {
throw err;
});

View File

@@ -36,6 +36,8 @@ import {
DataStoreAttachmentData,
DbPox2Event,
DbTxStatus,
DbBnsSubdomain,
DataStoreBnsBlockData,
} from '../datastore/common';
import {
getTxSenderAddress,
@@ -58,6 +60,8 @@ import {
parseNameFromContractEvent,
parseNameRenewalWithNoZonefileHashFromContractCall,
parseNamespaceFromContractEvent,
parseZoneFileTxt,
parseResolver,
} from './bns/bns-helpers';
import { PgWriteStore } from '../datastore/pg-write-store';
import {
@@ -69,6 +73,7 @@ import { handleBnsImport } from '../import-v1';
import { Pox2ContractIdentifer } from '../pox-helpers';
import { decodePox2PrintEvent } from './pox2-event-parsing';
import { logger, loggerMiddleware } from '../logger';
import * as zoneFileParser from 'zone-file';
export const IBD_PRUNABLE_ROUTES = ['/new_mempool_tx', '/drop_mempool_tx', '/new_microblocks'];
@@ -1075,3 +1080,94 @@ export function parseNewBlockMessage(chainId: ChainID, msg: CoreNodeBlockMessage
return dbData;
}
export function parseAttachmentMessage(msg: CoreNodeAttachmentMessage[]) {
const zoneFiles: { zonefile: string; zonefileHash: string; txId: string }[] = [];
const subdomainObj: {
attachmentData: DataStoreAttachmentData;
dbBnsSubdomain: DbBnsSubdomain[];
} = {
attachmentData: {
op: '',
name: '',
namespace: '',
zonefile: '',
zonefileHash: '',
txId: '',
indexBlockHash: '',
blockHeight: 0,
},
dbBnsSubdomain: [],
};
for (const attachment of msg) {
if (
attachment.contract_id === BnsContractIdentifier.mainnet ||
attachment.contract_id === BnsContractIdentifier.testnet
) {
const metadataCV = decodeClarityValue<
ClarityValueTuple<{
op: ClarityValueStringAscii;
name: ClarityValueBuffer;
namespace: ClarityValueBuffer;
}>
>(attachment.metadata);
const op = metadataCV.data['op'].data;
const zonefile = Buffer.from(attachment.content.slice(2), 'hex').toString();
const zonefileHash = attachment.content_hash;
zoneFiles.push({
zonefile,
zonefileHash,
txId: attachment.tx_id,
});
if (op === 'name-update') {
const name = hexToBuffer(metadataCV.data['name'].buffer).toString('utf8');
const namespace = hexToBuffer(metadataCV.data['namespace'].buffer).toString('utf8');
const zoneFileContents = zoneFileParser.parseZoneFile(zonefile);
const zoneFileTxt = zoneFileContents.txt;
// Case for subdomain
if (zoneFileTxt) {
for (let i = 0; i < zoneFileTxt.length; i++) {
const zoneFile = zoneFileTxt[i];
const parsedTxt = parseZoneFileTxt(zoneFile.txt);
if (parsedTxt.owner === '') continue; //if txt has no owner , skip it
const subdomain: DbBnsSubdomain = {
name: name.concat('.', namespace),
namespace_id: namespace,
fully_qualified_subdomain: zoneFile.name.concat('.', name, '.', namespace),
owner: parsedTxt.owner,
zonefile_hash: parsedTxt.zoneFileHash,
zonefile: parsedTxt.zoneFile,
tx_id: attachment.tx_id,
tx_index: -1,
canonical: true,
parent_zonefile_hash: attachment.content_hash.slice(2),
parent_zonefile_index: 0, // TODO need to figure out this field
block_height: Number.parseInt(attachment.block_height, 10),
zonefile_offset: 1,
resolver: zoneFileContents.uri ? parseResolver(zoneFileContents.uri) : '',
index_block_hash: attachment.index_block_hash,
};
const attachmentData: DataStoreAttachmentData = {
op: op,
name: subdomain.name,
namespace: '',
zonefile: subdomain.zonefile,
zonefileHash: subdomain.zonefile_hash,
txId: subdomain.tx_id,
indexBlockHash: subdomain.index_block_hash || '',
blockHeight: subdomain.block_height,
};
subdomainObj.dbBnsSubdomain.push(subdomain);
subdomainObj.attachmentData = attachmentData;
}
}
}
}
}
return { zoneFiles, subdomainObj };
}

View File

@@ -27,7 +27,11 @@ import { isFtMetadataEnabled, isNftMetadataEnabled } from './token-metadata/help
import { TokensProcessorQueue } from './token-metadata/tokens-processor-queue';
import { registerMempoolPromStats } from './datastore/helpers';
import { logger } from './logger';
import { run } from './event-replay/parquet-based/event-replay';
import {
ingestAttachmentNew,
ingestNewBlock,
ingestNewBurnBlock,
} from './event-replay/parquet-based/event-replay';
enum StacksApiMode {
/**
@@ -280,9 +284,11 @@ function getProgramArgs() {
| {
operand: 'from-parquet-events';
options: {
['wipe-db']?: boolean;
['disable-indexes']?: boolean;
}
['new-burn-block']?: boolean;
['attachment-new']?: boolean;
['new-block']?: boolean;
['ids-path']?: string;
};
};
return { args, parsedOpts };
}
@@ -299,7 +305,17 @@ async function handleProgramArgs() {
args.options.force
);
} else if (args.operand === 'from-parquet-events') {
await run(args.options['wipe-db'], args.options['disable-indexes']);
if (args.options['new-burn-block']) {
await ingestNewBurnBlock();
}
if (args.options['attachment-new']) {
await ingestAttachmentNew();
}
if (args.options['new-block']) {
await ingestNewBlock(args.options['ids-path']);
}
} else if (parsedOpts._[0]) {
throw new Error(`Unexpected program argument: ${parsedOpts._[0]}`);
} else {