Mirror of https://github.com/alexgo-io/stacks-blockchain-api.git (synced 2026-01-12 16:53:19 +08:00)
feat: event-replay supporting parallel insertions
@@ -1,4 +1,7 @@
|
||||
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
|
||||
|
||||
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
|
||||
|
||||
exports.up = pgm => {
|
||||
pgm.createTable('blocks', {
|
||||
index_block_hash: {
|
||||
@@ -71,9 +74,9 @@ exports.up = pgm => {
|
||||
},
|
||||
});
|
||||
|
||||
pgm.createIndex('blocks', 'block_hash', { method: 'hash' });
|
||||
pgm.createIndex('blocks', 'burn_block_hash', { method: 'hash' });
|
||||
pgm.createIndex('blocks', 'index_block_hash', { method: 'hash' });
|
||||
pgm.createIndex('blocks', 'block_hash', { method: INDEX_METHOD });
|
||||
pgm.createIndex('blocks', 'burn_block_hash', { method: INDEX_METHOD });
|
||||
pgm.createIndex('blocks', 'index_block_hash', { method: INDEX_METHOD });
|
||||
pgm.createIndex('blocks', [{ name: 'block_height', sort: 'DESC' }]);
|
||||
pgm.createIndex('blocks', [{ name: 'burn_block_height', sort: 'DESC' }]);
|
||||
}
|
||||
|
||||
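The change repeated across these migration files is the same: indexes on identifier columns that were hard-coded as `{ method: 'hash' }` now take their method from the `PG_IDENT_INDEX_TYPE` environment variable, while the ordered (`sort: 'DESC'`) indexes are left on the default btree method. A minimal sketch of the idea, with a fallback added purely for illustration (the migrations above use the variable as-is, without a default):

const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE ?? 'hash'; // assumption: fall back to the previously hard-coded method
pgm.createIndex('blocks', 'index_block_hash', { method: INDEX_METHOD });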
@@ -1,4 +1,7 @@
|
||||
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
|
||||
|
||||
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
|
||||
|
||||
exports.up = pgm => {
|
||||
pgm.createTable('txs', {
|
||||
id: {
|
||||
@@ -164,15 +167,15 @@ exports.up = pgm => {
|
||||
coinbase_alt_recipient: 'string',
|
||||
});
|
||||
|
||||
pgm.createIndex('txs', 'tx_id', { method: 'hash' });
|
||||
pgm.createIndex('txs', 'contract_call_contract_id', { method: 'hash' });
|
||||
pgm.createIndex('txs', 'index_block_hash', { method: 'hash' });
|
||||
pgm.createIndex('txs', 'microblock_hash', { method: 'hash' });
|
||||
pgm.createIndex('txs', 'sender_address', { method: 'hash' });
|
||||
pgm.createIndex('txs', 'smart_contract_contract_id', { method: 'hash' });
|
||||
pgm.createIndex('txs', 'sponsor_address', { method: 'hash' });
|
||||
pgm.createIndex('txs', 'token_transfer_recipient_address', { method: 'hash' });
|
||||
pgm.createIndex('txs', 'coinbase_alt_recipient');
|
||||
pgm.createIndex('txs', 'tx_id', { method: INDEX_METHOD });
|
||||
pgm.createIndex('txs', 'contract_call_contract_id', { method: INDEX_METHOD });
|
||||
pgm.createIndex('txs', 'index_block_hash', { method: INDEX_METHOD });
|
||||
pgm.createIndex('txs', 'microblock_hash', { method: INDEX_METHOD });
|
||||
pgm.createIndex('txs', 'sender_address', { method: INDEX_METHOD });
|
||||
pgm.createIndex('txs', 'smart_contract_contract_id', { method: INDEX_METHOD });
|
||||
pgm.createIndex('txs', 'sponsor_address', { method: INDEX_METHOD });
|
||||
pgm.createIndex('txs', 'token_transfer_recipient_address', { method: INDEX_METHOD });
|
||||
pgm.createIndex('txs', 'coinbase_alt_recipient', { method: INDEX_METHOD });
|
||||
pgm.createIndex('txs', 'type_id');
|
||||
pgm.createIndex('txs', [{ name: 'tx_index', sort: 'DESC' }]);
|
||||
pgm.createIndex('txs', [
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
|
||||
|
||||
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
|
||||
|
||||
exports.up = pgm => {
|
||||
pgm.createTable('stx_events', {
|
||||
id: {
|
||||
@@ -58,11 +61,11 @@ exports.up = pgm => {
|
||||
memo: 'bytea',
|
||||
});
|
||||
|
||||
pgm.createIndex('stx_events', 'tx_id', { method: 'hash' });
|
||||
pgm.createIndex('stx_events', 'index_block_hash', { method: 'hash' });
|
||||
pgm.createIndex('stx_events', 'microblock_hash', { method: 'hash' });
|
||||
pgm.createIndex('stx_events', 'sender', { method: 'hash' });
|
||||
pgm.createIndex('stx_events', 'recipient', { method: 'hash' });
|
||||
pgm.createIndex('stx_events', 'tx_id', { method: INDEX_METHOD });
|
||||
pgm.createIndex('stx_events', 'index_block_hash', { method: INDEX_METHOD });
|
||||
pgm.createIndex('stx_events', 'microblock_hash', { method: INDEX_METHOD });
|
||||
pgm.createIndex('stx_events', 'sender', { method: INDEX_METHOD });
|
||||
pgm.createIndex('stx_events', 'recipient', { method: INDEX_METHOD });
|
||||
pgm.createIndex('stx_events', 'event_index');
|
||||
pgm.createIndex('stx_events', [{ name: 'block_height', sort: 'DESC' }]);
|
||||
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
|
||||
|
||||
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
|
||||
|
||||
exports.up = pgm => {
|
||||
pgm.createTable('ft_events', {
|
||||
id: {
|
||||
@@ -61,11 +64,11 @@ exports.up = pgm => {
|
||||
recipient: 'string',
|
||||
});
|
||||
|
||||
pgm.createIndex('ft_events', 'tx_id', { method: 'hash' });
|
||||
pgm.createIndex('ft_events', 'index_block_hash', { method: 'hash' });
|
||||
pgm.createIndex('ft_events', 'microblock_hash', { method: 'hash' });
|
||||
pgm.createIndex('ft_events', 'sender', { method: 'hash' });
|
||||
pgm.createIndex('ft_events', 'recipient', { method: 'hash' });
|
||||
pgm.createIndex('ft_events', 'tx_id', { method: INDEX_METHOD });
|
||||
pgm.createIndex('ft_events', 'index_block_hash', { method: INDEX_METHOD });
|
||||
pgm.createIndex('ft_events', 'microblock_hash', { method: INDEX_METHOD });
|
||||
pgm.createIndex('ft_events', 'sender', { method: INDEX_METHOD });
|
||||
pgm.createIndex('ft_events', 'recipient', { method: INDEX_METHOD });
|
||||
pgm.createIndex('ft_events', 'event_index');
|
||||
pgm.createIndex('ft_events', [{ name: 'block_height', sort: 'DESC' }]);
|
||||
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
|
||||
|
||||
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
|
||||
|
||||
exports.up = pgm => {
|
||||
pgm.createTable('nft_events', {
|
||||
id: {
|
||||
@@ -61,11 +64,11 @@ exports.up = pgm => {
|
||||
recipient: 'string',
|
||||
});
|
||||
|
||||
pgm.createIndex('nft_events', 'tx_id', { method: 'hash' });
|
||||
pgm.createIndex('nft_events', 'index_block_hash', { method: 'hash' });
|
||||
pgm.createIndex('nft_events', 'microblock_hash', { method: 'hash' });
|
||||
pgm.createIndex('nft_events', 'sender', { method: 'hash' });
|
||||
pgm.createIndex('nft_events', 'recipient', { method: 'hash' });
|
||||
pgm.createIndex('nft_events', 'tx_id', { method: INDEX_METHOD });
|
||||
pgm.createIndex('nft_events', 'index_block_hash', { method: INDEX_METHOD });
|
||||
pgm.createIndex('nft_events', 'microblock_hash', { method: INDEX_METHOD });
|
||||
pgm.createIndex('nft_events', 'sender', { method: INDEX_METHOD });
|
||||
pgm.createIndex('nft_events', 'recipient', { method: INDEX_METHOD });
|
||||
pgm.createIndex('nft_events', 'event_index');
|
||||
pgm.createIndex('nft_events', ['asset_identifier', 'value']);
|
||||
pgm.createIndex('nft_events', 'asset_identifier', { where: 'asset_event_type_id = 2', method: 'hash' }); // Mints
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
|
||||
|
||||
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
|
||||
|
||||
exports.up = pgm => {
|
||||
pgm.createTable('contract_logs', {
|
||||
id: {
|
||||
@@ -59,8 +62,8 @@ exports.up = pgm => {
|
||||
},
|
||||
});
|
||||
|
||||
pgm.createIndex('contract_logs', 'tx_id', { method: 'hash' });
|
||||
pgm.createIndex('contract_logs', 'index_block_hash', { method: 'hash' });
|
||||
pgm.createIndex('contract_logs', 'microblock_hash', { method: 'hash' });
|
||||
pgm.createIndex('contract_logs', 'tx_id', { method: INDEX_METHOD });
|
||||
pgm.createIndex('contract_logs', 'index_block_hash', { method: INDEX_METHOD });
|
||||
pgm.createIndex('contract_logs', 'microblock_hash', { method: INDEX_METHOD });
|
||||
pgm.createIndex('contract_logs', 'event_index');
|
||||
}
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
|
||||
|
||||
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
|
||||
|
||||
exports.up = pgm => {
|
||||
pgm.createTable('smart_contracts', {
|
||||
id: {
|
||||
@@ -54,7 +57,7 @@ exports.up = pgm => {
|
||||
},
|
||||
});
|
||||
|
||||
pgm.createIndex('smart_contracts', 'index_block_hash', { method: 'hash' });
|
||||
pgm.createIndex('smart_contracts', 'microblock_hash', { method: 'hash' });
|
||||
pgm.createIndex('smart_contracts', 'contract_id', { method: 'hash' });
|
||||
pgm.createIndex('smart_contracts', 'index_block_hash', { method: INDEX_METHOD });
|
||||
pgm.createIndex('smart_contracts', 'microblock_hash', { method: INDEX_METHOD });
|
||||
pgm.createIndex('smart_contracts', 'contract_id', { method: INDEX_METHOD });
|
||||
}
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
|
||||
|
||||
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
|
||||
|
||||
exports.up = pgm => {
|
||||
pgm.createTable('faucet_requests', {
|
||||
id: {
|
||||
@@ -23,5 +26,5 @@ exports.up = pgm => {
|
||||
},
|
||||
});
|
||||
|
||||
pgm.createIndex('faucet_requests', 'address', { method: 'hash' });
|
||||
pgm.createIndex('faucet_requests', 'address', { method: INDEX_METHOD });
|
||||
}
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
|
||||
|
||||
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
|
||||
|
||||
exports.up = pgm => {
|
||||
pgm.createTable('mempool_txs', {
|
||||
id: {
|
||||
@@ -103,13 +106,13 @@ exports.up = pgm => {
|
||||
}
|
||||
});
|
||||
|
||||
pgm.createIndex('mempool_txs', 'tx_id', { method: 'hash' });
|
||||
pgm.createIndex('mempool_txs', 'contract_call_contract_id', { method: 'hash' });
|
||||
pgm.createIndex('mempool_txs', 'tx_id', { method: INDEX_METHOD });
|
||||
pgm.createIndex('mempool_txs', 'contract_call_contract_id', { method: INDEX_METHOD });
|
||||
pgm.createIndex('mempool_txs', 'nonce');
|
||||
pgm.createIndex('mempool_txs', 'sender_address', { method: 'hash' });
|
||||
pgm.createIndex('mempool_txs', 'smart_contract_contract_id', { method: 'hash' });
|
||||
pgm.createIndex('mempool_txs', 'sponsor_address', { method: 'hash' });
|
||||
pgm.createIndex('mempool_txs', 'token_transfer_recipient_address', { method: 'hash' });
|
||||
pgm.createIndex('mempool_txs', 'sender_address', { method: INDEX_METHOD });
|
||||
pgm.createIndex('mempool_txs', 'smart_contract_contract_id', { method: INDEX_METHOD });
|
||||
pgm.createIndex('mempool_txs', 'sponsor_address', { method: INDEX_METHOD });
|
||||
pgm.createIndex('mempool_txs', 'token_transfer_recipient_address', { method: INDEX_METHOD });
|
||||
pgm.createIndex('mempool_txs', [{ name: 'receipt_time', sort: 'DESC' }]);
|
||||
pgm.createIndex('mempool_txs', ['type_id', 'receipt_block_height'], { where: 'pruned = false'});
|
||||
pgm.createIndex('mempool_txs', ['type_id', 'fee_rate'], { where: 'pruned = false'});
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
|
||||
|
||||
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
|
||||
|
||||
exports.up = pgm => {
|
||||
pgm.createTable('stx_lock_events', {
|
||||
id: {
|
||||
@@ -69,10 +72,10 @@ exports.up = pgm => {
|
||||
{ name: 'tx_index', sort: 'DESC' },
|
||||
]);
|
||||
|
||||
pgm.createIndex('stx_lock_events', 'tx_id', { method: 'hash' });
|
||||
pgm.createIndex('stx_lock_events', 'index_block_hash', { method: 'hash' });
|
||||
pgm.createIndex('stx_lock_events', 'microblock_hash', { method: 'hash' });
|
||||
pgm.createIndex('stx_lock_events', 'locked_address', { method: 'hash' });
|
||||
pgm.createIndex('stx_lock_events', 'tx_id', { method: INDEX_METHOD });
|
||||
pgm.createIndex('stx_lock_events', 'index_block_hash', { method: INDEX_METHOD });
|
||||
pgm.createIndex('stx_lock_events', 'microblock_hash', { method: INDEX_METHOD });
|
||||
pgm.createIndex('stx_lock_events', 'locked_address', { method: INDEX_METHOD });
|
||||
pgm.createIndex('stx_lock_events', [{ name: 'block_height', sort: 'DESC' }]);
|
||||
pgm.createIndex('stx_lock_events', [{ name: 'unlock_height', sort: 'DESC' }]);
|
||||
pgm.createIndex('stx_lock_events', 'contract_name');
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
|
||||
|
||||
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
|
||||
|
||||
exports.up = pgm => {
|
||||
pgm.createTable('miner_rewards', {
|
||||
id: {
|
||||
@@ -50,8 +53,8 @@ exports.up = pgm => {
|
||||
}
|
||||
});
|
||||
|
||||
pgm.createIndex('miner_rewards', 'index_block_hash', { method: 'hash' });
|
||||
pgm.createIndex('miner_rewards', 'recipient', { method: 'hash' });
|
||||
pgm.createIndex('miner_rewards', 'index_block_hash', { method: INDEX_METHOD });
|
||||
pgm.createIndex('miner_rewards', 'recipient', { method: INDEX_METHOD });
|
||||
pgm.createIndex('miner_rewards', 'miner_address');
|
||||
pgm.createIndex('miner_rewards', [{ name: 'mature_block_height', sort: 'DESC' }]);
|
||||
}
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
|
||||
|
||||
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
|
||||
|
||||
exports.up = pgm => {
|
||||
pgm.createTable('burnchain_rewards', {
|
||||
id: {
|
||||
@@ -35,7 +38,7 @@ exports.up = pgm => {
|
||||
},
|
||||
});
|
||||
|
||||
pgm.createIndex('burnchain_rewards', 'burn_block_hash', { method: 'hash' });
|
||||
pgm.createIndex('burnchain_rewards', 'reward_recipient', { method: 'hash' });
|
||||
pgm.createIndex('burnchain_rewards', 'burn_block_hash', { method: INDEX_METHOD });
|
||||
pgm.createIndex('burnchain_rewards', 'reward_recipient', { method: INDEX_METHOD });
|
||||
pgm.createIndex('burnchain_rewards', [{ name: 'burn_block_height', sort: 'DESC' }]);
|
||||
}
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
|
||||
|
||||
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
|
||||
|
||||
exports.up = pgm => {
|
||||
pgm.createTable('namespaces', {
|
||||
id: {
|
||||
@@ -88,7 +91,7 @@ exports.up = pgm => {
|
||||
},
|
||||
});
|
||||
|
||||
pgm.createIndex('namespaces', 'index_block_hash');
|
||||
pgm.createIndex('namespaces', 'index_block_hash', { method: INDEX_METHOD });
|
||||
pgm.createIndex('namespaces', [
|
||||
{ name: 'ready_block', sort: 'DESC' },
|
||||
{ name: 'microblock_sequence', sort: 'DESC' },
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
|
||||
|
||||
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
|
||||
|
||||
exports.up = pgm => {
|
||||
pgm.createTable('names', {
|
||||
id: {
|
||||
@@ -81,8 +84,8 @@ exports.up = pgm => {
|
||||
},
|
||||
});
|
||||
|
||||
pgm.createIndex('names', 'namespace_id');
|
||||
pgm.createIndex('names', 'index_block_hash');
|
||||
pgm.createIndex('names', 'namespace_id', { method: INDEX_METHOD });
|
||||
pgm.createIndex('names', 'index_block_hash', { method: INDEX_METHOD });
|
||||
pgm.createIndex('names', [
|
||||
{ name: 'registered_at', sort: 'DESC' },
|
||||
{ name: 'microblock_sequence', sort: 'DESC' },
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
|
||||
|
||||
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
|
||||
|
||||
exports.up = pgm => {
|
||||
pgm.createTable('subdomains', {
|
||||
id: {
|
||||
@@ -80,8 +83,8 @@ exports.up = pgm => {
|
||||
},
|
||||
});
|
||||
|
||||
pgm.createIndex('subdomains', 'name');
|
||||
pgm.createIndex('subdomains', 'index_block_hash');
|
||||
pgm.createIndex('subdomains', 'name', { method: INDEX_METHOD });
|
||||
pgm.createIndex('subdomains', 'index_block_hash', { method: INDEX_METHOD });
|
||||
pgm.createIndex('subdomains', [
|
||||
{ name: 'block_height', sort: 'DESC' },
|
||||
{ name: 'microblock_sequence', sort: 'DESC' },
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
|
||||
|
||||
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
|
||||
|
||||
exports.up = pgm => {
|
||||
pgm.createTable('reward_slot_holders', {
|
||||
id: {
|
||||
@@ -27,6 +30,6 @@ exports.up = pgm => {
|
||||
},
|
||||
});
|
||||
|
||||
pgm.createIndex('reward_slot_holders', 'burn_block_hash', { method: 'hash' });
|
||||
pgm.createIndex('reward_slot_holders', 'burn_block_hash', { method: INDEX_METHOD });
|
||||
pgm.createIndex('reward_slot_holders', [{ name: 'burn_block_height', sort: 'DESC' }]);
|
||||
}
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
|
||||
|
||||
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
|
||||
|
||||
exports.up = pgm => {
|
||||
pgm.createTable('token_offering_locked', {
|
||||
id: {
|
||||
@@ -19,5 +22,5 @@ exports.up = pgm => {
|
||||
},
|
||||
});
|
||||
|
||||
pgm.createIndex('token_offering_locked', 'address', { method: 'hash' });
|
||||
pgm.createIndex('token_offering_locked', 'address', { method: INDEX_METHOD });
|
||||
}
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
|
||||
|
||||
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
|
||||
|
||||
exports.up = pgm => {
|
||||
pgm.createTable('microblocks', {
|
||||
id: {
|
||||
@@ -26,7 +29,7 @@ exports.up = pgm => {
|
||||
type: 'integer',
|
||||
notNull: true,
|
||||
},
|
||||
// For the first microblock (sequence number 0), this points to the parent/anchor block hash,
|
||||
// For the first microblock (sequence number 0), this points to the parent/anchor block hash,
|
||||
// for subsequent microblocks it points to the previous microblock's hash.
|
||||
microblock_parent_hash: {
|
||||
type: 'bytea',
|
||||
@@ -70,8 +73,8 @@ exports.up = pgm => {
|
||||
}
|
||||
});
|
||||
|
||||
pgm.createIndex('microblocks', 'microblock_hash', { method: 'hash' });
|
||||
pgm.createIndex('microblocks', 'parent_index_block_hash', { method: 'hash' });
|
||||
pgm.createIndex('microblocks', 'microblock_hash', { method: INDEX_METHOD });
|
||||
pgm.createIndex('microblocks', 'parent_index_block_hash', { method: INDEX_METHOD });
|
||||
pgm.createIndex('microblocks', [
|
||||
{ name: 'block_height', sort: 'DESC' },
|
||||
{ name: 'microblock_sequence', sort: 'DESC' }
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
|
||||
|
||||
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
|
||||
|
||||
exports.up = pgm => {
|
||||
pgm.createTable('nft_metadata', {
|
||||
id: {
|
||||
@@ -20,14 +23,14 @@ exports.up = pgm => {
|
||||
image_uri: {
|
||||
type: 'string',
|
||||
notNull: true,
|
||||
},
|
||||
},
|
||||
image_canonical_uri: {
|
||||
type: 'string',
|
||||
type: 'string',
|
||||
notNull: true,
|
||||
},
|
||||
},
|
||||
contract_id: {
|
||||
type: 'string',
|
||||
notNull: true,
|
||||
type: 'string',
|
||||
notNull: true,
|
||||
unique: true,
|
||||
},
|
||||
tx_id: {
|
||||
@@ -35,10 +38,10 @@ exports.up = pgm => {
|
||||
notNull: true,
|
||||
},
|
||||
sender_address: {
|
||||
type: 'string',
|
||||
notNull: true,
|
||||
type: 'string',
|
||||
notNull: true,
|
||||
}
|
||||
});
|
||||
|
||||
pgm.createIndex('nft_metadata', 'contract_id', { method: 'hash' });
|
||||
pgm.createIndex('nft_metadata', 'contract_id', { method: INDEX_METHOD });
|
||||
}
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
|
||||
|
||||
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
|
||||
|
||||
exports.up = pgm => {
|
||||
pgm.createTable('ft_metadata', {
|
||||
id: {
|
||||
@@ -20,33 +23,33 @@ exports.up = pgm => {
|
||||
image_uri: {
|
||||
type: 'string',
|
||||
notNull: true,
|
||||
},
|
||||
},
|
||||
image_canonical_uri: {
|
||||
type: 'string',
|
||||
type: 'string',
|
||||
notNull: true,
|
||||
},
|
||||
},
|
||||
contract_id: {
|
||||
type: 'string',
|
||||
notNull: true,
|
||||
type: 'string',
|
||||
notNull: true,
|
||||
unique: true,
|
||||
},
|
||||
symbol: {
|
||||
type: 'string',
|
||||
notNull: true,
|
||||
type: 'string',
|
||||
notNull: true,
|
||||
},
|
||||
decimals: {
|
||||
type: 'integer',
|
||||
notNull: true,
|
||||
type: 'integer',
|
||||
notNull: true,
|
||||
},
|
||||
tx_id: {
|
||||
type: 'bytea',
|
||||
notNull: true,
|
||||
},
|
||||
sender_address: {
|
||||
type: 'string',
|
||||
notNull: true,
|
||||
type: 'string',
|
||||
notNull: true,
|
||||
}
|
||||
});
|
||||
|
||||
pgm.createIndex('ft_metadata', 'contract_id', { method: 'hash' });
|
||||
pgm.createIndex('ft_metadata', 'contract_id', { method: INDEX_METHOD });
|
||||
}
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
|
||||
|
||||
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
|
||||
|
||||
exports.up = pgm => {
|
||||
pgm.createTable('zonefiles', {
|
||||
id: {
|
||||
@@ -27,7 +30,7 @@ exports.up = pgm => {
|
||||
}
|
||||
});
|
||||
|
||||
pgm.addIndex('zonefiles', 'zonefile_hash');
|
||||
pgm.addIndex('zonefiles', 'zonefile_hash', { method: INDEX_METHOD });
|
||||
pgm.addConstraint(
|
||||
'zonefiles',
|
||||
'unique_name_zonefile_hash_tx_id_index_block_hash',
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
|
||||
|
||||
const INDEX_METHOD = process.env.PG_IDENT_INDEX_TYPE;
|
||||
|
||||
exports.up = pgm => {
|
||||
/**
|
||||
* Stores all `tx_id`s of transactions that affect a principal's STX balance since that cannot be
|
||||
@@ -47,8 +50,8 @@ exports.up = pgm => {
|
||||
},
|
||||
});
|
||||
|
||||
pgm.createIndex('principal_stx_txs', 'tx_id', { method: 'hash' });
|
||||
pgm.createIndex('principal_stx_txs', 'principal', { method: 'hash' });
|
||||
pgm.createIndex('principal_stx_txs', 'tx_id', { method: INDEX_METHOD });
|
||||
pgm.createIndex('principal_stx_txs', 'principal', { method: INDEX_METHOD });
|
||||
pgm.createIndex('principal_stx_txs', [
|
||||
{ name: 'block_height', sort: 'DESC' },
|
||||
{ name: 'microblock_sequence', sort: 'DESC' },
|
||||
|
||||
package-lock.json (generated, 6440 lines changed) — diff suppressed because it is too large
run_event_replay (new executable file, 70 lines)
@@ -0,0 +1,70 @@
#!/usr/bin/env bash

#
# 1. Parse command-line arguments
#
while getopts w: flag
do
    case "${flag}" in
        w) workers=${OPTARG};
    esac
done

#
# 2. Prepare DB for event-replay
#
PG_USER=postgres \
PG_PASSWORD=postgres \
PG_DATABASE=stacks_blockchain_api \
PG_SCHEMA=public \
PG_PORT=5490 \
node ./lib/event-replay/parquet-based/pre-replay-db.js

wait

#
# 3. NEW_BURN_BLOCK events
#
STACKS_CHAIN_ID=0x00000001 node ./lib/index.js from-parquet-events --new-burn-block

wait

#
# 4. ATTACHMENTS_NEW events
#
STACKS_CHAIN_ID=0x00000001 node ./lib/index.js from-parquet-events --attachment-new

wait

#
# 5. NEW_BLOCK events
#

# Generate ID chunk files according to the number of workers
node ./lib/event-replay/parquet-based/gen-ids-file.js --workers=${workers}

wait

id_files=(./events/new_block/ids_*.txt)
for id_file in "${id_files[@]}"
do
    NODE_OPTIONS=--max-old-space-size=8192 \
    STACKS_CHAIN_ID=0x00000001 \
    node ./lib/index.js from-parquet-events --new-block --ids-path=${id_file} &
done

wait

#
# 6. Prepare DB for regular operations after event-replay
#
PG_USER=postgres \
PG_PASSWORD=postgres \
PG_DATABASE=stacks_blockchain_api \
PG_SCHEMA=public \
PG_PORT=5490 \
node ./lib/event-replay/parquet-based/post-replay-db.js

wait

exit 0
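Taken together, the script drives one replay pass: for example, `./run_event_replay -w 4` prepares the database, replays NEW_BURN_BLOCK and ATTACHMENTS_NEW events, has `gen-ids-file.js` split the NEW_BLOCK event IDs into four `ids_*.txt` chunks, runs four `--new-block` importer processes in parallel (one per chunk), and finally restores the database for regular operation. The worker count `4` here is only an example.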
@@ -704,6 +704,7 @@ export interface DbBnsSubdomain {
|
||||
tx_id: string;
|
||||
tx_index: number;
|
||||
canonical: boolean;
|
||||
index_block_hash?: string;
|
||||
}
|
||||
|
||||
export interface DbConfigState {
|
||||
|
||||
@@ -552,109 +552,32 @@ export class PgWriteStore extends PgStore {
|
||||
return result.count;
|
||||
}
|
||||
|
||||
async insertBlockBatch(sql: PgSqlClient, blocks: DbBlock[]) {
|
||||
const values: BlockInsertValues[] = blocks.map(block => ({
|
||||
block_hash: block.block_hash,
|
||||
index_block_hash: block.index_block_hash,
|
||||
parent_index_block_hash: block.parent_index_block_hash,
|
||||
parent_block_hash: block.parent_block_hash,
|
||||
parent_microblock_hash: block.parent_microblock_hash,
|
||||
parent_microblock_sequence: block.parent_microblock_sequence,
|
||||
block_height: block.block_height,
|
||||
burn_block_time: block.burn_block_time,
|
||||
burn_block_hash: block.burn_block_hash,
|
||||
burn_block_height: block.burn_block_height,
|
||||
miner_txid: block.miner_txid,
|
||||
canonical: block.canonical,
|
||||
execution_cost_read_count: block.execution_cost_read_count,
|
||||
execution_cost_read_length: block.execution_cost_read_length,
|
||||
execution_cost_runtime: block.execution_cost_runtime,
|
||||
execution_cost_write_count: block.execution_cost_write_count,
|
||||
execution_cost_write_length: block.execution_cost_write_length,
|
||||
}));
|
||||
async insertStxEventBatch(sql: PgSqlClient, stxEvents: StxEventInsertValues[]) {
|
||||
const values = stxEvents.map(s => {
|
||||
const value: StxEventInsertValues = {
|
||||
event_index: s.event_index,
|
||||
tx_id: s.tx_id,
|
||||
tx_index: s.tx_index,
|
||||
block_height: s.block_height,
|
||||
index_block_hash: s.index_block_hash,
|
||||
parent_index_block_hash: s.parent_index_block_hash,
|
||||
microblock_hash: s.microblock_hash,
|
||||
microblock_sequence: s.microblock_sequence,
|
||||
microblock_canonical: s.microblock_canonical,
|
||||
canonical: s.canonical,
|
||||
asset_event_type_id: s.asset_event_type_id,
|
||||
sender: s.sender,
|
||||
recipient: s.recipient,
|
||||
amount: s.amount,
|
||||
memo: s.memo ?? null,
|
||||
};
|
||||
return value;
|
||||
});
|
||||
await sql`
|
||||
INSERT INTO blocks ${sql(values)}
|
||||
INSERT INTO stx_events ${sql(values)}
|
||||
`;
|
||||
}
|
||||
|
||||
async insertMicroblock(sql: PgSqlClient, microblocks: DbMicroblock[]): Promise<void> {
|
||||
const values: MicroblockInsertValues[] = microblocks.map(mb => ({
|
||||
canonical: mb.canonical,
|
||||
microblock_canonical: mb.microblock_canonical,
|
||||
microblock_hash: mb.microblock_hash,
|
||||
microblock_sequence: mb.microblock_sequence,
|
||||
microblock_parent_hash: mb.microblock_parent_hash,
|
||||
parent_index_block_hash: mb.parent_index_block_hash,
|
||||
block_height: mb.block_height,
|
||||
parent_block_height: mb.parent_block_height,
|
||||
parent_block_hash: mb.parent_block_hash,
|
||||
index_block_hash: mb.index_block_hash,
|
||||
block_hash: mb.block_hash,
|
||||
parent_burn_block_height: mb.parent_burn_block_height,
|
||||
parent_burn_block_hash: mb.parent_burn_block_hash,
|
||||
parent_burn_block_time: mb.parent_burn_block_time,
|
||||
}));
|
||||
const mbResult = await sql`
|
||||
INSERT INTO microblocks ${sql(values)}
|
||||
`;
|
||||
if (mbResult.count !== microblocks.length) {
|
||||
throw new Error(
|
||||
`Unexpected row count after inserting microblocks: ${mbResult.count} vs ${values.length}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async insertTxBatch(sql: PgSqlClient, txs: DbTx[]): Promise<void> {
|
||||
const values: TxInsertValues[] = txs.map(tx => ({
|
||||
tx_id: tx.tx_id,
|
||||
raw_tx: tx.raw_result,
|
||||
tx_index: tx.tx_index,
|
||||
index_block_hash: tx.index_block_hash,
|
||||
parent_index_block_hash: tx.parent_index_block_hash,
|
||||
block_hash: tx.block_hash,
|
||||
parent_block_hash: tx.parent_block_hash,
|
||||
block_height: tx.block_height,
|
||||
burn_block_time: tx.burn_block_time,
|
||||
parent_burn_block_time: tx.parent_burn_block_time,
|
||||
type_id: tx.type_id,
|
||||
anchor_mode: tx.anchor_mode,
|
||||
status: tx.status,
|
||||
canonical: tx.canonical,
|
||||
post_conditions: tx.post_conditions,
|
||||
nonce: tx.nonce,
|
||||
fee_rate: tx.fee_rate,
|
||||
sponsored: tx.sponsored,
|
||||
sponsor_nonce: tx.sponsor_nonce ?? null,
|
||||
sponsor_address: tx.sponsor_address ?? null,
|
||||
sender_address: tx.sender_address,
|
||||
origin_hash_mode: tx.origin_hash_mode,
|
||||
microblock_canonical: tx.microblock_canonical,
|
||||
microblock_sequence: tx.microblock_sequence,
|
||||
microblock_hash: tx.microblock_hash,
|
||||
token_transfer_recipient_address: tx.token_transfer_recipient_address ?? null,
|
||||
token_transfer_amount: tx.token_transfer_amount ?? null,
|
||||
token_transfer_memo: tx.token_transfer_memo ?? null,
|
||||
smart_contract_clarity_version: tx.smart_contract_clarity_version ?? null,
|
||||
smart_contract_contract_id: tx.smart_contract_contract_id ?? null,
|
||||
smart_contract_source_code: tx.smart_contract_source_code ?? null,
|
||||
contract_call_contract_id: tx.contract_call_contract_id ?? null,
|
||||
contract_call_function_name: tx.contract_call_function_name ?? null,
|
||||
contract_call_function_args: tx.contract_call_function_args ?? null,
|
||||
poison_microblock_header_1: tx.poison_microblock_header_1 ?? null,
|
||||
poison_microblock_header_2: tx.poison_microblock_header_2 ?? null,
|
||||
coinbase_payload: tx.coinbase_payload ?? null,
|
||||
coinbase_alt_recipient: tx.coinbase_alt_recipient ?? null,
|
||||
raw_result: tx.raw_result,
|
||||
event_count: tx.event_count,
|
||||
execution_cost_read_count: tx.execution_cost_read_count,
|
||||
execution_cost_read_length: tx.execution_cost_read_length,
|
||||
execution_cost_runtime: tx.execution_cost_runtime,
|
||||
execution_cost_write_count: tx.execution_cost_write_count,
|
||||
execution_cost_write_length: tx.execution_cost_write_length,
|
||||
}));
|
||||
await sql`INSERT INTO txs ${sql(values)}`;
|
||||
}
|
||||
|
||||
async updateBurnchainRewardSlotHolders({
|
||||
burnchainBlockHash,
|
||||
burnchainBlockHeight,
|
||||
@@ -1726,7 +1649,7 @@ export class PgWriteStore extends PgStore {
|
||||
if (result.count !== slotValues.length) {
|
||||
throw new Error(`Failed to insert slot holder for ${slotValues}`);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
async insertBurnchainRewardsBatch(sql: PgSqlClient, rewards: DbBurnchainReward[]): Promise<void> {
|
||||
const rewardValues: BurnchainRewardInsertValues[] = rewards.map(reward => ({
|
||||
@@ -1743,10 +1666,10 @@ export class PgWriteStore extends PgStore {
|
||||
INSERT into burnchain_rewards ${sql(rewardValues)}
|
||||
`;
|
||||
|
||||
if(res.count !== rewardValues.length) {
|
||||
if (res.count !== rewardValues.length) {
|
||||
throw new Error(`Failed to insert burnchain reward for ${rewardValues}`);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
async updateTx(sql: PgSqlClient, tx: DbTxRaw): Promise<number> {
|
||||
const values: TxInsertValues = {
|
||||
@@ -3099,7 +3022,7 @@ export class PgWriteStore extends PgStore {
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when a full event import is complete.
|
||||
* (event-replay) Finishes DB setup after an event-replay.
|
||||
*/
|
||||
async finishEventReplay() {
|
||||
if (!this.isEventReplay) {
|
||||
@@ -3113,7 +3036,235 @@ export class PgWriteStore extends PgStore {
|
||||
});
|
||||
}
|
||||
|
||||
/** Enable or disable indexes for the provided set of tables. */
|
||||
/**
|
||||
* batch operations (mainly for event-replay)
|
||||
*/
|
||||
|
||||
async insertBlockBatch(sql: PgSqlClient, blocks: DbBlock[]) {
|
||||
const values: BlockInsertValues[] = blocks.map(block => ({
|
||||
block_hash: block.block_hash,
|
||||
index_block_hash: block.index_block_hash,
|
||||
parent_index_block_hash: block.parent_index_block_hash,
|
||||
parent_block_hash: block.parent_block_hash,
|
||||
parent_microblock_hash: block.parent_microblock_hash,
|
||||
parent_microblock_sequence: block.parent_microblock_sequence,
|
||||
block_height: block.block_height,
|
||||
burn_block_time: block.burn_block_time,
|
||||
burn_block_hash: block.burn_block_hash,
|
||||
burn_block_height: block.burn_block_height,
|
||||
miner_txid: block.miner_txid,
|
||||
canonical: block.canonical,
|
||||
execution_cost_read_count: block.execution_cost_read_count,
|
||||
execution_cost_read_length: block.execution_cost_read_length,
|
||||
execution_cost_runtime: block.execution_cost_runtime,
|
||||
execution_cost_write_count: block.execution_cost_write_count,
|
||||
execution_cost_write_length: block.execution_cost_write_length,
|
||||
}));
|
||||
await sql`
|
||||
INSERT INTO blocks ${sql(values)}
|
||||
`;
|
||||
}
|
||||
|
||||
async insertMicroblock(sql: PgSqlClient, microblocks: DbMicroblock[]): Promise<void> {
|
||||
const values: MicroblockInsertValues[] = microblocks.map(mb => ({
|
||||
canonical: mb.canonical,
|
||||
microblock_canonical: mb.microblock_canonical,
|
||||
microblock_hash: mb.microblock_hash,
|
||||
microblock_sequence: mb.microblock_sequence,
|
||||
microblock_parent_hash: mb.microblock_parent_hash,
|
||||
parent_index_block_hash: mb.parent_index_block_hash,
|
||||
block_height: mb.block_height,
|
||||
parent_block_height: mb.parent_block_height,
|
||||
parent_block_hash: mb.parent_block_hash,
|
||||
index_block_hash: mb.index_block_hash,
|
||||
block_hash: mb.block_hash,
|
||||
parent_burn_block_height: mb.parent_burn_block_height,
|
||||
parent_burn_block_hash: mb.parent_burn_block_hash,
|
||||
parent_burn_block_time: mb.parent_burn_block_time,
|
||||
}));
|
||||
const mbResult = await sql`
|
||||
INSERT INTO microblocks ${sql(values)}
|
||||
`;
|
||||
if (mbResult.count !== microblocks.length) {
|
||||
throw new Error(
|
||||
`Unexpected row count after inserting microblocks: ${mbResult.count} vs ${values.length}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// alias to insertMicroblock
|
||||
async insertMicroblockBatch(sql: PgSqlClient, microblocks: DbMicroblock[]): Promise<void> {
|
||||
return this.insertMicroblock(sql, microblocks);
|
||||
}
|
||||
|
||||
async insertTxBatch(sql: PgSqlClient, txs: DbTx[]): Promise<void> {
|
||||
const values: TxInsertValues[] = txs.map(tx => ({
|
||||
tx_id: tx.tx_id,
|
||||
raw_tx: tx.raw_result,
|
||||
tx_index: tx.tx_index,
|
||||
index_block_hash: tx.index_block_hash,
|
||||
parent_index_block_hash: tx.parent_index_block_hash,
|
||||
block_hash: tx.block_hash,
|
||||
parent_block_hash: tx.parent_block_hash,
|
||||
block_height: tx.block_height,
|
||||
burn_block_time: tx.burn_block_time,
|
||||
parent_burn_block_time: tx.parent_burn_block_time,
|
||||
type_id: tx.type_id,
|
||||
anchor_mode: tx.anchor_mode,
|
||||
status: tx.status,
|
||||
canonical: tx.canonical,
|
||||
post_conditions: tx.post_conditions,
|
||||
nonce: tx.nonce,
|
||||
fee_rate: tx.fee_rate,
|
||||
sponsored: tx.sponsored,
|
||||
sponsor_nonce: tx.sponsor_nonce ?? null,
|
||||
sponsor_address: tx.sponsor_address ?? null,
|
||||
sender_address: tx.sender_address,
|
||||
origin_hash_mode: tx.origin_hash_mode,
|
||||
microblock_canonical: tx.microblock_canonical,
|
||||
microblock_sequence: tx.microblock_sequence,
|
||||
microblock_hash: tx.microblock_hash,
|
||||
token_transfer_recipient_address: tx.token_transfer_recipient_address ?? null,
|
||||
token_transfer_amount: tx.token_transfer_amount ?? null,
|
||||
token_transfer_memo: tx.token_transfer_memo ?? null,
|
||||
smart_contract_clarity_version: tx.smart_contract_clarity_version ?? null,
|
||||
smart_contract_contract_id: tx.smart_contract_contract_id ?? null,
|
||||
smart_contract_source_code: tx.smart_contract_source_code ?? null,
|
||||
contract_call_contract_id: tx.contract_call_contract_id ?? null,
|
||||
contract_call_function_name: tx.contract_call_function_name ?? null,
|
||||
contract_call_function_args: tx.contract_call_function_args ?? null,
|
||||
poison_microblock_header_1: tx.poison_microblock_header_1 ?? null,
|
||||
poison_microblock_header_2: tx.poison_microblock_header_2 ?? null,
|
||||
coinbase_payload: tx.coinbase_payload ?? null,
|
||||
coinbase_alt_recipient: tx.coinbase_alt_recipient ?? null,
|
||||
raw_result: tx.raw_result,
|
||||
event_count: tx.event_count,
|
||||
execution_cost_read_count: tx.execution_cost_read_count,
|
||||
execution_cost_read_length: tx.execution_cost_read_length,
|
||||
execution_cost_runtime: tx.execution_cost_runtime,
|
||||
execution_cost_write_count: tx.execution_cost_write_count,
|
||||
execution_cost_write_length: tx.execution_cost_write_length,
|
||||
}));
|
||||
await sql`INSERT INTO txs ${sql(values)}`;
|
||||
}
|
||||
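// Note on the `${sql(values)}` interpolation used by these batch helpers: with a
// postgres.js client, passing an array of row objects to `sql(...)` expands into a
// single multi-row INSERT. Hedged illustration only (columns trimmed for brevity,
// so this exact statement would not satisfy the table's NOT NULL constraints):
//   await sql`INSERT INTO blocks ${sql([{ block_hash: '0x00', block_height: 1 }])}`;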
|
||||
async insertPrincipalStxTxsBatch(sql: PgSqlClient, values: PrincipalStxTxsInsertValues[]) {
|
||||
await sql`
|
||||
INSERT INTO principal_stx_txs ${sql(values)}
|
||||
`;
|
||||
}
|
||||
|
||||
async insertContractEventBatch(sql: PgSqlClient, values: SmartContractEventInsertValues[]) {
|
||||
await sql`
|
||||
INSERT INTO contract_logs ${sql(values)}
|
||||
`;
|
||||
}
|
||||
|
||||
async insertFtEventBatch(sql: PgSqlClient, values: FtEventInsertValues[]) {
|
||||
await sql`
|
||||
INSERT INTO ft_events ${sql(values)}
|
||||
`;
|
||||
}
|
||||
|
||||
async insertNftEventBatch(sql: PgSqlClient, values: NftEventInsertValues[]) {
|
||||
await sql`INSERT INTO nft_events ${sql(values)}`;
|
||||
}
|
||||
|
||||
async insertNameBatch(sql: PgSqlClient, values: BnsNameInsertValues[]) {
|
||||
await sql`
|
||||
INSERT INTO names ${sql(values)}
|
||||
`;
|
||||
}
|
||||
|
||||
async insertNamespace(
|
||||
sql: PgSqlClient,
|
||||
blockData: {
|
||||
index_block_hash: string;
|
||||
parent_index_block_hash: string;
|
||||
microblock_hash: string;
|
||||
microblock_sequence: number;
|
||||
microblock_canonical: boolean;
|
||||
},
|
||||
bnsNamespace: DbBnsNamespace
|
||||
) {
|
||||
const values: BnsNamespaceInsertValues = {
|
||||
namespace_id: bnsNamespace.namespace_id,
|
||||
launched_at: bnsNamespace.launched_at ?? null,
|
||||
address: bnsNamespace.address,
|
||||
reveal_block: bnsNamespace.reveal_block,
|
||||
ready_block: bnsNamespace.ready_block,
|
||||
buckets: bnsNamespace.buckets,
|
||||
base: bnsNamespace.base.toString(),
|
||||
coeff: bnsNamespace.coeff.toString(),
|
||||
nonalpha_discount: bnsNamespace.nonalpha_discount.toString(),
|
||||
no_vowel_discount: bnsNamespace.no_vowel_discount.toString(),
|
||||
lifetime: bnsNamespace.lifetime,
|
||||
status: bnsNamespace.status ?? null,
|
||||
tx_index: bnsNamespace.tx_index,
|
||||
tx_id: bnsNamespace.tx_id,
|
||||
canonical: bnsNamespace.canonical,
|
||||
index_block_hash: blockData.index_block_hash,
|
||||
parent_index_block_hash: blockData.parent_index_block_hash,
|
||||
microblock_hash: blockData.microblock_hash,
|
||||
microblock_sequence: blockData.microblock_sequence,
|
||||
microblock_canonical: blockData.microblock_canonical,
|
||||
};
|
||||
await sql`
|
||||
INSERT INTO namespaces ${sql(values)}
|
||||
`;
|
||||
}
|
||||
|
||||
async insertZonefileBatch(sql: PgSqlClient, values: BnsZonefileInsertValues[]) {
|
||||
await sql`
|
||||
INSERT INTO zonefiles ${sql(values)}
|
||||
`;
|
||||
}
|
||||
|
||||
async updateBatchSubdomainsEventReplay(
|
||||
sql: PgSqlClient,
|
||||
data: DataStoreAttachmentSubdomainData[]
|
||||
): Promise<void> {
|
||||
const subdomainValues: BnsSubdomainInsertValues[] = [];
|
||||
for (const dataItem of data) {
|
||||
if (dataItem.subdomains && dataItem.blockData) {
|
||||
for (const subdomain of dataItem.subdomains) {
|
||||
subdomainValues.push({
|
||||
name: subdomain.name,
|
||||
namespace_id: subdomain.namespace_id,
|
||||
fully_qualified_subdomain: subdomain.fully_qualified_subdomain,
|
||||
owner: subdomain.owner,
|
||||
zonefile_hash: validateZonefileHash(subdomain.zonefile_hash),
|
||||
parent_zonefile_hash: subdomain.parent_zonefile_hash,
|
||||
parent_zonefile_index: subdomain.parent_zonefile_index,
|
||||
block_height: subdomain.block_height,
|
||||
tx_index: subdomain.tx_index,
|
||||
zonefile_offset: subdomain.zonefile_offset,
|
||||
resolver: subdomain.resolver,
|
||||
canonical: subdomain.canonical,
|
||||
tx_id: subdomain.tx_id,
|
||||
index_block_hash: dataItem.blockData.index_block_hash,
|
||||
parent_index_block_hash: dataItem.blockData.parent_index_block_hash,
|
||||
microblock_hash: dataItem.blockData.microblock_hash,
|
||||
microblock_sequence: dataItem.blockData.microblock_sequence,
|
||||
microblock_canonical: dataItem.blockData.microblock_canonical,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
if (subdomainValues.length === 0) {
|
||||
return;
|
||||
}
|
||||
const result = await sql`
|
||||
INSERT INTO subdomains ${sql(subdomainValues)}
|
||||
`;
|
||||
if (result.count !== subdomainValues.length) {
|
||||
throw new Error(`Expected ${subdomainValues.length} subdomain inserts, got ${result.count}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* (event-replay) Enable or disable indexes for the provided set of tables.
|
||||
*/
|
||||
async toggleTableIndexes(sql: PgSqlClient, tables: string[], enabled: boolean): Promise<void> {
|
||||
const tableSchema = this.sql.options.connection.search_path ?? 'public';
|
||||
const result = await sql`
|
||||
|
||||
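The body of the query in `toggleTableIndexes` is cut off in this hunk. As a hedged sketch only (an assumption about the approach, not code from this commit), one way to implement it in PostgreSQL is to flip the `pg_index` flags for the affected tables so index maintenance is skipped during bulk inserts:

const result = await sql`
  UPDATE pg_index
  SET indisready = ${enabled}, indisvalid = ${enabled}
  WHERE indrelid = ANY (
    SELECT c.oid FROM pg_class c
    JOIN pg_namespace n ON n.oid = c.relnamespace
    WHERE n.nspname = ${tableSchema} AND c.relname IN ${sql(tables)}
  )
`;
// Indexes disabled this way must be rebuilt (e.g. REINDEX) after re-enabling,
// since entries written while they were off are missing.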
@@ -1,20 +1,23 @@
|
||||
import { Database, QueryResult } from "duckdb";
|
||||
import { Database, QueryResult, TableData } from 'duckdb';
|
||||
|
||||
export class DatasetStore {
|
||||
|
||||
private readonly db;
|
||||
|
||||
constructor() {
|
||||
this.db = new Database(':memory:');
|
||||
};
|
||||
}
|
||||
|
||||
static async connect(): Promise<DatasetStore> {
|
||||
static connect(): DatasetStore {
|
||||
return new DatasetStore();
|
||||
};
|
||||
}
|
||||
|
||||
newBlockEventsIds = () => {
|
||||
var con = this.db.connect();
|
||||
return new Promise((resolve) => {
|
||||
//
|
||||
// NEW_BLOCK canonical payload manipulation/queries
|
||||
//
|
||||
|
||||
newBlockEventsIds = (): Promise<number[]> => {
|
||||
const con = this.db.connect();
|
||||
return new Promise(resolve => {
|
||||
con.all(
|
||||
"SELECT ID FROM READ_PARQUET('events/new_block/canonical/*.parquet')",
|
||||
(err: any, result: any) => {
|
||||
@@ -22,27 +25,47 @@ export class DatasetStore {
|
||||
throw err;
|
||||
}
|
||||
|
||||
let res = result.map((a: any) => a.id); // extract IDs as an Array
|
||||
const res: number[] = result.map((a: { id: number }) => a.id); // extract IDs as an Array
|
||||
resolve(res);
|
||||
}
|
||||
);
|
||||
});
|
||||
};
|
||||
|
||||
newBlockEventsOrderedPayloadStream = (): Promise<QueryResult> => {
|
||||
return new Promise(async (resolve) => {
|
||||
var con = this.db.connect();
|
||||
newBlockEventsPayloadStream = (ids: number[]): Promise<QueryResult> => {
|
||||
return new Promise(resolve => {
|
||||
const con = this.db.connect();
|
||||
const res = con.stream(
|
||||
"SELECT payload FROM READ_PARQUET('events/new_block/canonical/*.parquet') ORDER BY id",
|
||||
`SELECT payload FROM READ_PARQUET('events/new_block/canonical/*.parquet') WHERE id IN (${ids}) ORDER BY id`
|
||||
);
|
||||
|
||||
resolve(res);
|
||||
});
|
||||
};
|
||||
|
||||
getNewBlockEventsInBlockHeights = (blockHeights: number[]): Promise<TableData> => {
|
||||
const con = this.db.connect();
|
||||
return new Promise(resolve => {
|
||||
con.all(
|
||||
`SELECT * FROM READ_PARQUET('events/new_block/*.parquet') WHERE block_height IN (${blockHeights})`,
|
||||
(err: any, res: any) => {
|
||||
if (err) {
|
||||
throw err;
|
||||
}
|
||||
|
||||
resolve(res);
|
||||
}
|
||||
);
|
||||
});
|
||||
};
|
||||
|
||||
//
|
||||
// NEW_BURN_BLOCK EVENTS
|
||||
//
|
||||
|
||||
newBurnBlockEventsOrdered = () => {
|
||||
return new Promise((resolve) => {
|
||||
var con = this.db.connect();
|
||||
return new Promise(resolve => {
|
||||
const con = this.db.connect();
|
||||
con.all(
|
||||
"SELECT * FROM READ_PARQUET('events/new_burn_block/canonical/*.parquet') ORDER BY id",
|
||||
(err: any, result: any) => {
|
||||
@@ -55,4 +78,24 @@ export class DatasetStore {
|
||||
);
|
||||
});
|
||||
};
|
||||
};
|
||||
|
||||
//
|
||||
// ATTACHMENTS_NEW EVENTS
|
||||
//
|
||||
|
||||
attachmentsNewEvents = (): Promise<TableData> => {
|
||||
const con = this.db.connect();
|
||||
return new Promise(resolve => {
|
||||
con.all(
|
||||
"SELECT payload FROM READ_PARQUET('events/attachments/new/*.parquet')",
|
||||
(err: any, result: any) => {
|
||||
if (err) {
|
||||
throw err;
|
||||
}
|
||||
|
||||
resolve(result);
|
||||
}
|
||||
);
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
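A standalone sketch of the DuckDB access pattern the class above relies on (assumes the `duckdb` Node package and the same `events/new_block` parquet layout; the query itself is illustrative):

import { Database } from 'duckdb';

const db = new Database(':memory:');
const con = db.connect();
// Query the canonical NEW_BLOCK parquet files directly, without loading them into a table.
con.all(
  "SELECT COUNT(*) AS n FROM READ_PARQUET('events/new_block/canonical/*.parquet')",
  (err: any, rows: any) => {
    if (err) throw err;
    console.log(`canonical new_block events: ${rows[0].n}`);
  }
);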
@@ -1,74 +1,102 @@
|
||||
import * as tty from 'tty';
|
||||
import * as fs from 'fs';
|
||||
|
||||
import { PgWriteStore } from '../../datastore/pg-write-store';
|
||||
import { cycleMigrations, dangerousDropAllTables } from '../../datastore/migrations';
|
||||
import { logger } from '../../logger';
|
||||
import { createTimeTracker } from './helpers';
|
||||
import { insertNewBurnBlockEvents } from './importers/new_burn_block_importer';
|
||||
import { insertNewBlockEvents } from './importers/new_block_importer';
|
||||
import { processNewBurnBlockEvents } from './importers/new-burn-block-importer';
|
||||
import { processNewBlockEvents } from './importers/new-block-importer';
|
||||
import { processAttachmentNewEvents } from './importers/attachment-new-importer';
|
||||
import { DatasetStore } from './dataset/store';
|
||||
|
||||
const MIGRATIONS_TABLE = 'pgmigrations';
|
||||
|
||||
const run = async (wipeDB: boolean = false, disableIndexes: boolean = false) => {
|
||||
/**
|
||||
*
|
||||
*/
|
||||
const ingestNewBurnBlock = async () => {
|
||||
const timeTracker = createTimeTracker();
|
||||
|
||||
const db = await PgWriteStore.connect({
|
||||
usageName: 'import-events',
|
||||
usageName: 'event-replay',
|
||||
skipMigrations: true,
|
||||
withNotifier: false,
|
||||
isEventReplay: true,
|
||||
});
|
||||
|
||||
const dataset = await DatasetStore.connect();
|
||||
|
||||
if (wipeDB) {
|
||||
await dangerousDropAllTables({ acknowledgePotentialCatastrophicConsequences: 'yes' });
|
||||
}
|
||||
const dataset = DatasetStore.connect();
|
||||
|
||||
try {
|
||||
await cycleMigrations({ dangerousAllowDataLoss: true, checkForEmptyData: true });
|
||||
} catch (error) {
|
||||
logger.error(error);
|
||||
throw new Error(
|
||||
`DB migration cycle failed, possibly due to an incompatible API version upgrade. Add --wipe-db --force or perform a manual DB wipe before importing.`
|
||||
);
|
||||
}
|
||||
|
||||
let tables: string[] = [];
|
||||
if (disableIndexes) {
|
||||
// Get DB tables
|
||||
const dbName = db.sql.options.database; // stacks-blockchain-api
|
||||
const tableSchema = db.sql.options.connection.search_path ?? 'public';
|
||||
const tablesQuery = await db.sql<{ tablename: string }[]>`
|
||||
SELECT tablename FROM pg_catalog.pg_tables
|
||||
WHERE tablename != ${MIGRATIONS_TABLE}
|
||||
AND schemaname = ${tableSchema}`;
|
||||
if (tablesQuery.length === 0) {
|
||||
const errorMsg = `No tables found in database '${dbName}', schema '${tableSchema}'`;
|
||||
logger.error(errorMsg);
|
||||
throw new Error(errorMsg);
|
||||
}
|
||||
tables = tablesQuery.map(r => r.tablename);
|
||||
|
||||
// Disable indexing and constraints on tables to speed up insertion
|
||||
logger.info(`Disable indexes on tables: ${tables.join(', ')}`);
|
||||
db.toggleTableIndexes(db.sql, tables, false);
|
||||
}
|
||||
|
||||
try {
|
||||
await Promise.all([
|
||||
insertNewBurnBlockEvents(db, dataset, timeTracker),
|
||||
insertNewBlockEvents(db, dataset, timeTracker)
|
||||
]);
|
||||
await timeTracker.track('NEW_BURN_BLOCK_EVENTS', async () => {
|
||||
await processNewBurnBlockEvents(db, dataset);
|
||||
});
|
||||
} catch (err) {
|
||||
throw err;
|
||||
} finally {
|
||||
if (disableIndexes) {
|
||||
logger.info(`Enable indexes on tables: ${tables.join(', ')}`);
|
||||
db.toggleTableIndexes(db.sql, tables, true);
|
||||
if (true || tty.isatty(1)) {
|
||||
console.log('Tracked function times:');
|
||||
console.table(timeTracker.getDurations(3));
|
||||
} else {
|
||||
logger.info(`Tracked function times`, timeTracker.getDurations(3));
|
||||
}
|
||||
|
||||
await db.close();
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
const ingestAttachmentNew = async () => {
|
||||
const timeTracker = createTimeTracker();
|
||||
|
||||
const db = await PgWriteStore.connect({
|
||||
usageName: 'event-replay',
|
||||
skipMigrations: true,
|
||||
withNotifier: false,
|
||||
isEventReplay: true,
|
||||
});
|
||||
|
||||
const dataset = DatasetStore.connect();
|
||||
|
||||
try {
|
||||
await timeTracker.track('ATTACHMENTS_NEW_EVENTS', async () => {
|
||||
await processAttachmentNewEvents(db, dataset);
|
||||
});
|
||||
} catch (err) {
|
||||
throw err;
|
||||
} finally {
|
||||
if (true || tty.isatty(1)) {
|
||||
console.log('Tracked function times:');
|
||||
console.table(timeTracker.getDurations(3));
|
||||
} else {
|
||||
logger.info(`Tracked function times`, timeTracker.getDurations(3));
|
||||
}
|
||||
|
||||
await db.close();
|
||||
}
|
||||
};
|
||||
|
||||
const ingestNewBlock = async (idsPath?: string) => {
|
||||
const timeTracker = createTimeTracker();
|
||||
|
||||
const db = await PgWriteStore.connect({
|
||||
usageName: 'event-replay',
|
||||
skipMigrations: true,
|
||||
withNotifier: false,
|
||||
isEventReplay: true,
|
||||
});
|
||||
|
||||
const dataset = DatasetStore.connect();
|
||||
|
||||
try {
|
||||
const idsFileContent = fs.readFileSync(`${idsPath}`, 'utf-8');
|
||||
const ids = idsFileContent.split(/\r?\n/);
|
||||
|
||||
await timeTracker.track('NEW_BLOCK_EVENTS', async () => {
|
||||
await processNewBlockEvents(db, dataset, ids);
|
||||
});
|
||||
} catch (err) {
|
||||
throw err;
|
||||
} finally {
|
||||
if (true || tty.isatty(1)) {
|
||||
console.log('Tracked function times:');
|
||||
console.table(timeTracker.getDurations(3));
|
||||
@@ -76,6 +104,6 @@ const run = async (wipeDB: boolean = false, disableIndexes: boolean = false) =>
|
||||
logger.info(`Tracked function times`, timeTracker.getDurations(3));
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
export { run };
|
||||
export { ingestNewBlock, ingestAttachmentNew, ingestNewBurnBlock };
|
||||
|
||||
src/event-replay/parquet-based/gen-ids-file.ts (new file, 44 lines)
@@ -0,0 +1,44 @@
import * as fs from 'fs';

import { logger } from '../../logger';
import { DatasetStore } from './dataset/store';
import { splitIntoChunks } from './helpers';

(async () => {
  const args = process.argv.slice(2);
  const workers: number = Number(args[0].split('=')[1]);

  logger.info({ component: 'event-replay' }, `Generating ID files for ${workers} parallel workers`);

  const dir = './events/new_block';

  const dataset = DatasetStore.connect();

  const ids: number[] = await dataset.newBlockEventsIds();
  const batchSize = Math.ceil(ids.length / workers);
  const chunks = splitIntoChunks(ids, batchSize);

  const files = fs.readdirSync(dir).filter(f => f.endsWith('txt'));

  // delete ID files left over from a previous run
  files.map(
    file =>
      new Promise((resolve, reject) => {
        try {
          fs.unlinkSync(`${dir}/${file}`);
        } catch (err) {
          throw err;
        }
      })
  );

  // create one ID file per worker chunk
  chunks.forEach((chunk, idx) => {
    const filename = `./events/new_block/ids_${idx + 1}.txt`;
    chunk.forEach(id => {
      fs.writeFileSync(filename, id.toString() + '\n', { flag: 'a' });
    });
  });
})().catch(err => {
  // re-throw the original error (`err` is an Error instance, not a constructor)
  throw err;
});
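In the replay script above, this file is invoked as `node ./lib/event-replay/parquet-based/gen-ids-file.js --workers=${workers}`; with, say, four workers it writes `ids_1.txt` through `ids_4.txt` under `./events/new_block/`, one chunk of NEW_BLOCK event IDs per worker.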
@@ -1,12 +1,12 @@
|
||||
interface TimeTracker {
|
||||
track<T = void>(name: string, fn: () => Promise<T>): Promise<T>;
|
||||
trackSync<T = void>(name: string, fn: () => T): T;
|
||||
getDurations: (
|
||||
roundDecimals?: number
|
||||
) => {
|
||||
name: string;
|
||||
seconds: string;
|
||||
}[];
|
||||
track<T = void>(name: string, fn: () => Promise<T>): Promise<T>;
|
||||
trackSync<T = void>(name: string, fn: () => T): T;
|
||||
getDurations: (
|
||||
roundDecimals?: number
|
||||
) => {
|
||||
name: string;
|
||||
seconds: string;
|
||||
}[];
|
||||
}
|
||||
|
||||
const createTimeTracker = (): TimeTracker => {
|
||||
@@ -49,6 +49,40 @@ const createTimeTracker = (): TimeTracker => {
|
||||
});
|
||||
},
|
||||
};
|
||||
};
|
||||
|
||||
interface Stopwatch {
|
||||
/** Milliseconds since stopwatch was created. */
|
||||
getElapsed: () => number;
|
||||
/** Seconds since stopwatch was created. */
|
||||
getElapsedSeconds: (roundDecimals?: number) => number;
|
||||
getElapsedAndRestart: () => number;
|
||||
restart(): void;
|
||||
}
|
||||
|
||||
function stopwatch(): Stopwatch {
|
||||
let start = process.hrtime.bigint();
|
||||
const result: Stopwatch = {
|
||||
getElapsedSeconds: (roundDecimals?: number) => {
|
||||
const elapsedMs = result.getElapsed();
|
||||
const seconds = elapsedMs / 1000;
|
||||
return roundDecimals === undefined ? seconds : +seconds.toFixed(roundDecimals);
|
||||
},
|
||||
getElapsed: () => {
|
||||
const end = process.hrtime.bigint();
|
||||
return Number((end - start) / 1_000_000n);
|
||||
},
|
||||
getElapsedAndRestart: () => {
|
||||
const end = process.hrtime.bigint();
|
||||
const result = Number((end - start) / 1_000_000n);
|
||||
start = process.hrtime.bigint();
|
||||
return result;
|
||||
},
|
||||
restart: () => {
|
||||
start = process.hrtime.bigint();
|
||||
},
|
||||
};
|
||||
return result;
|
||||
}
|
||||
|
||||
function* chunks<T>(arr: T[], n: number): Generator<T[], void> {
|
||||
@@ -57,8 +91,8 @@ function* chunks<T>(arr: T[], n: number): Generator<T[], void> {
|
||||
}
|
||||
}
|
||||
|
||||
const splitIntoChunks = async (data: object[], chunk_size: number) => {
|
||||
const splitIntoChunks = (data: number[], chunk_size: number) => {
|
||||
return [...chunks(data, chunk_size)];
|
||||
};
|
||||
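For example (illustrative numbers), `splitIntoChunks([1, 2, 3, 4, 5], 2)` returns `[[1, 2], [3, 4], [5]]`, assuming the usual remainder-in-last-chunk behavior of the `chunks` generator whose body is elided here; gen-ids-file.ts uses this to divide the NEW_BLOCK event IDs across workers.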
|
||||
export { TimeTracker, createTimeTracker, splitIntoChunks };
|
||||
export { createTimeTracker, splitIntoChunks };
|
||||
|
||||
@@ -0,0 +1,66 @@
|
||||
/* eslint-disable @typescript-eslint/no-non-null-assertion */
|
||||
|
||||
import { PgWriteStore } from '../../../datastore/pg-write-store';
|
||||
import { parseAttachmentMessage } from '../../../event-stream/event-server';
|
||||
import { logger } from '../../../logger';
|
||||
import { CoreNodeAttachmentMessage } from '../../../event-stream/core-node-message';
|
||||
import { DataStoreAttachmentSubdomainData, DataStoreBnsBlockData } from '../../../datastore/common';
|
||||
import { DatasetStore } from '../dataset/store';
|
||||
import { I32_MAX } from '../../../helpers';
|
||||
|
||||
export const processAttachmentNewEvents = async (db: PgWriteStore, dataset: DatasetStore) => {
|
||||
logger.info({ component: 'event-replay' }, 'ATTACHMENTS_NEW events process started');
|
||||
|
||||
const attachmentsNewEvents = await dataset.attachmentsNewEvents();
|
||||
const ary: DataStoreAttachmentSubdomainData[] = [];
|
||||
|
||||
for await (const event of attachmentsNewEvents) {
|
||||
const blockData: DataStoreBnsBlockData = {
|
||||
index_block_hash: '',
|
||||
parent_index_block_hash: '',
|
||||
microblock_hash: '',
|
||||
microblock_sequence: I32_MAX,
|
||||
microblock_canonical: true,
|
||||
};
|
||||
const dataStore: DataStoreAttachmentSubdomainData = {};
|
||||
|
||||
const attachmentMsg: CoreNodeAttachmentMessage[] = JSON.parse(event.payload);
|
||||
const attachments = parseAttachmentMessage(attachmentMsg);
|
||||
dataStore.subdomains = attachments.subdomainObj.dbBnsSubdomain;
|
||||
|
||||
blockData.index_block_hash = attachments.subdomainObj.attachmentData.indexBlockHash;
|
||||
dataStore.blockData = blockData;
|
||||
|
||||
dataStore.attachment = attachments.subdomainObj.attachmentData;
|
||||
|
||||
ary.push(dataStore);
|
||||
}
|
||||
|
||||
const blockHeights = [];
|
||||
for (const el of ary) {
|
||||
if (el.subdomains!.length !== 0) {
|
||||
blockHeights.push(el.attachment!.blockHeight);
|
||||
}
|
||||
}
|
||||
// get events from block heights
|
||||
const blockEvents = await dataset.getNewBlockEventsInBlockHeights(blockHeights);
|
||||
|
||||
for (const event of blockEvents) {
|
||||
for (const ds of ary) {
|
||||
if (ds.blockData?.index_block_hash === event.index_block_hash) {
|
||||
const txs = JSON.parse(event.payload).transactions;
|
||||
for (const tx of txs) {
|
||||
if (ds.attachment!.txId === tx.txid) {
|
||||
ds.blockData!.microblock_hash = tx.microblock_hash || '';
|
||||
ds.blockData!.microblock_sequence = tx.microblock_sequence || I32_MAX;
|
||||
}
|
||||
}
|
||||
|
||||
ds.blockData!.index_block_hash = event.index_block_hash;
|
||||
ds.blockData!.parent_index_block_hash = event.parent_index_block_hash;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
await db.updateBatchSubdomainsEventReplay(db.sql, ary);
|
||||
};
|
||||
src/event-replay/parquet-based/importers/new-block-importer.ts (new file, 445 lines)
@@ -0,0 +1,445 @@
import { Readable, Writable, Transform } from 'stream';
import { pipeline } from 'stream/promises';
import { PgWriteStore } from '../../../datastore/pg-write-store';
import { parseNewBlockMessage } from '../../../event-stream/event-server';
import {
  DbBlock,
  DbMicroblock,
  DbTx,
  SmartContractEventInsertValues,
  StxEventInsertValues,
  PrincipalStxTxsInsertValues,
  FtEventInsertValues,
  NftEventInsertValues,
  BnsNameInsertValues,
  BnsZonefileInsertValues,
  DataStoreBlockUpdateData,
} from '../../../datastore/common';
import { validateZonefileHash } from '../../../datastore/helpers';
import { logger } from '../../../logger';
import { getApiConfiguredChainID, batchIterate } from '../../../helpers';
import { CoreNodeBlockMessage } from '../../../event-stream/core-node-message';
import { DatasetStore } from '../dataset/store';

const chainID = getApiConfiguredChainID();

const batchInserters: BatchInserter[] = [];

interface BatchInserter<T = any> {
  push(entries: T[]): Promise<void>;
  flush(): Promise<void>;
}

function createBatchInserter<T>({
  batchSize,
  insertFn,
}: {
  batchSize: number;
  insertFn: (entries: T[]) => Promise<void>;
}): BatchInserter<T> {
  let entryBuffer: T[] = [];
  return {
    async push(entries: T[]) {
      entries.length === 1
        ? entryBuffer.push(entries[0])
        : entries.forEach(e => entryBuffer.push(e));
      if (entryBuffer.length === batchSize) {
        await insertFn(entryBuffer);
        entryBuffer.length = 0;
      } else if (entryBuffer.length > batchSize) {
        for (const batch of batchIterate(entryBuffer, batchSize)) {
          await insertFn(batch);
        }
        entryBuffer.length = 0;
      }
    },
    async flush() {
      logger.debug({ component: 'event-replay' }, 'Flushing remaining data...');
      if (entryBuffer.length > 0) {
        await insertFn(entryBuffer);
        entryBuffer = [];
      }
    },
  };
}

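As a quick illustration of the buffering contract above (hypothetical numeric rows and a console-logging insert function): push() accumulates entries until batchSize is reached, and flush() must be called once the input is exhausted so a partially filled buffer is not lost.

const logInserter = createBatchInserter<number>({
  batchSize: 3,
  insertFn: async rows => console.log('insert batch', rows),
});
await logInserter.push([1]);
await logInserter.push([2, 3]); // buffer reaches batchSize -> inserts [1, 2, 3]
await logInserter.push([4]);
await logInserter.flush();      // inserts the leftover [4]
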
const populateBatchInserters = (db: PgWriteStore) => {
  const dbBlockBatchInserter = createBatchInserter<DbBlock>({
    batchSize: 500,
    insertFn: entries => {
      logger.debug({ component: 'event-replay' }, 'Inserting into blocks table...');
      return db.insertBlockBatch(db.sql, entries);
    },
  });
  batchInserters.push(dbBlockBatchInserter);

  const dbMicroblockBatchInserter = createBatchInserter<DbMicroblock>({
    batchSize: 500,
    insertFn: entries => {
      logger.debug({ component: 'event-replay' }, 'Inserting into microblocks table...');
      return db.insertMicroblockBatch(db.sql, entries);
    },
  });
  batchInserters.push(dbMicroblockBatchInserter);

  const dbTxBatchInserter = createBatchInserter<DbTx>({
    batchSize: 1400,
    insertFn: entries => {
      logger.debug({ component: 'event-replay' }, 'Inserting into txs table...');
      return db.insertTxBatch(db.sql, entries);
    },
  });
  batchInserters.push(dbTxBatchInserter);

  const dbStxEventBatchInserter = createBatchInserter<StxEventInsertValues>({
    batchSize: 500,
    insertFn: entries => {
      logger.debug({ component: 'event-replay' }, 'Inserting into stx_events table...');
      return db.insertStxEventBatch(db.sql, entries);
    },
  });
  batchInserters.push(dbStxEventBatchInserter);

  const dbPrincipalStxTxBatchInserter = createBatchInserter<PrincipalStxTxsInsertValues>({
    batchSize: 500,
    insertFn: entries => {
      logger.debug({ component: 'event-replay' }, 'Inserting into principal_stx_txs table...');
      return db.insertPrincipalStxTxsBatch(db.sql, entries);
    },
  });
  batchInserters.push(dbPrincipalStxTxBatchInserter);

  const dbContractEventBatchInserter = createBatchInserter<SmartContractEventInsertValues>({
    batchSize: 500,
    insertFn: entries => {
      logger.debug({ component: 'event-replay' }, 'Inserting into contract_logs table...');
      return db.insertContractEventBatch(db.sql, entries);
    },
  });
  batchInserters.push(dbContractEventBatchInserter);

  const dbFtEventBatchInserter = createBatchInserter<FtEventInsertValues>({
    batchSize: 500,
    insertFn: entries => {
      logger.debug({ component: 'event-replay' }, 'Inserting into ft_events table...');
      return db.insertFtEventBatch(db.sql, entries);
    },
  });
  batchInserters.push(dbFtEventBatchInserter);

  const dbNftEventBatchInserter = createBatchInserter<NftEventInsertValues>({
    batchSize: 500,
    insertFn: entries => {
      logger.debug({ component: 'event-replay' }, 'Inserting into nft_events table...');
      return db.insertNftEventBatch(db.sql, entries);
    },
  });
  batchInserters.push(dbNftEventBatchInserter);

  const dbNameBatchInserter = createBatchInserter<BnsNameInsertValues>({
    batchSize: 500,
    insertFn: entries => {
      logger.debug({ component: 'event-replay' }, 'Inserting into names table...');
      return db.insertNameBatch(db.sql, entries);
    },
  });
  batchInserters.push(dbNameBatchInserter);

  const dbZonefileBatchInserter = createBatchInserter<BnsZonefileInsertValues>({
    batchSize: 500,
    insertFn: entries => {
      logger.debug({ component: 'event-replay' }, 'Inserting into zonefiles table...');
      return db.insertZonefileBatch(db.sql, entries);
    },
  });
  batchInserters.push(dbZonefileBatchInserter);

  return new Writable({
    objectMode: true,
    write: async (data: CoreNodeBlockMessage, _encoding, next) => {
      let dbData: DataStoreBlockUpdateData;
      try {
        dbData = parseNewBlockMessage(chainID, data);
      } catch (err) {
        logger.error({ component: 'event-replay' }, 'Error when parsing new_block event');
        console.error(err);

        throw err;
      }

      const insertTxs = async (dbData: DataStoreBlockUpdateData) => {
        for (const entry of dbData.txs) {
          await dbTxBatchInserter.push([entry.tx]);
        }
      };

      const insertContractLogs = async (dbData: DataStoreBlockUpdateData) => {
        for (const entry of dbData.txs) {
          await dbContractEventBatchInserter.push(
            entry.contractLogEvents.map((contractEvent: any) => ({
              event_index: contractEvent.event_index,
              tx_id: contractEvent.tx_id,
              tx_index: contractEvent.tx_index,
              block_height: contractEvent.block_height,
              index_block_hash: entry.tx.index_block_hash,
              parent_index_block_hash: entry.tx.parent_index_block_hash,
              microblock_hash: entry.tx.microblock_hash,
              microblock_sequence: entry.tx.microblock_sequence,
              microblock_canonical: entry.tx.microblock_canonical,
              canonical: contractEvent.canonical,
              contract_identifier: contractEvent.contract_identifier,
              topic: contractEvent.topic,
              value: contractEvent.value,
            }))
          );
        }
      };

      const insertStxEvents = async (dbData: DataStoreBlockUpdateData) => {
        for (const entry of dbData.txs) {
          for (const stxEvent of entry.stxEvents) {
            await dbStxEventBatchInserter.push([
              {
                ...stxEvent,
                index_block_hash: entry.tx.index_block_hash,
                parent_index_block_hash: entry.tx.parent_index_block_hash,
                microblock_hash: entry.tx.microblock_hash,
                microblock_sequence: entry.tx.microblock_sequence,
                microblock_canonical: entry.tx.microblock_canonical,
                sender: stxEvent.sender ?? null,
                recipient: stxEvent.recipient ?? null,
                amount: stxEvent.amount ?? null,
                memo: stxEvent.memo ?? null,
              },
            ]);
          }
        }
      };

      const insertPrincipalStxTxs = async (dbData: DataStoreBlockUpdateData) => {
        for (const entry of dbData.txs) {
          // string key: `principal, tx_id, index_block_hash, microblock_hash`
          const alreadyInsertedRowKeys = new Set<string>();
          const values: PrincipalStxTxsInsertValues[] = [];
          const push = (principal: string) => {
            // Check if this row has already been inserted by comparing the same columns used in the
            // sql unique constraint defined on the table. This prevents later errors during re-indexing
            // when the table indexes/constraints are temporarily disabled during inserts.
            const constraintKey = `${principal},${entry.tx.tx_id},${entry.tx.index_block_hash},${entry.tx.microblock_hash}`;
            if (!alreadyInsertedRowKeys.has(constraintKey)) {
              alreadyInsertedRowKeys.add(constraintKey);
              values.push({
                principal: principal,
                tx_id: entry.tx.tx_id,
                block_height: entry.tx.block_height,
                index_block_hash: entry.tx.index_block_hash,
                microblock_hash: entry.tx.microblock_hash,
                microblock_sequence: entry.tx.microblock_sequence,
                tx_index: entry.tx.tx_index,
                canonical: entry.tx.canonical,
                microblock_canonical: entry.tx.microblock_canonical,
              });
            }
          };

          const principals = new Set<string>();

          // Insert tx data
          [
            entry.tx.sender_address,
            entry.tx.token_transfer_recipient_address,
            entry.tx.contract_call_contract_id,
            entry.tx.smart_contract_contract_id,
          ]
            .filter((p): p is string => !!p)
            .forEach(p => principals.add(p));

          // Insert stx_event data
          entry.stxEvents.forEach((event: any) => {
            if (event.sender) {
              principals.add(event.sender);
            }
            if (event.recipient) {
              principals.add(event.recipient);
            }
          });

          principals.forEach(principal => push(principal));
          await dbPrincipalStxTxBatchInserter.push(values);
        }
      };

      const insertFTEvents = async (dbData: DataStoreBlockUpdateData) => {
        for (const entry of dbData.txs) {
          await dbFtEventBatchInserter.push(
            entry.ftEvents.map((ftEvent: any) => ({
              event_index: ftEvent.event_index,
              tx_id: ftEvent.tx_id,
              tx_index: ftEvent.tx_index,
              block_height: ftEvent.block_height,
              index_block_hash: entry.tx.index_block_hash,
              parent_index_block_hash: entry.tx.parent_index_block_hash,
              microblock_hash: entry.tx.microblock_hash,
              microblock_sequence: entry.tx.microblock_sequence,
              microblock_canonical: entry.tx.microblock_canonical,
              canonical: ftEvent.canonical,
              asset_event_type_id: ftEvent.asset_event_type_id,
              sender: ftEvent.sender ?? null,
              recipient: ftEvent.recipient ?? null,
              asset_identifier: ftEvent.asset_identifier,
              amount: ftEvent.amount.toString(),
            }))
          );
        }
      };

      const insertNFTEvents = async (dbData: DataStoreBlockUpdateData) => {
        for (const entry of dbData.txs) {
          await dbNftEventBatchInserter.push(
            entry.nftEvents.map((nftEvent: any) => ({
              tx_id: nftEvent.tx_id,
              index_block_hash: entry.tx.index_block_hash,
              parent_index_block_hash: entry.tx.parent_index_block_hash,
              microblock_hash: entry.tx.microblock_hash,
              microblock_sequence: entry.tx.microblock_sequence,
              microblock_canonical: entry.tx.microblock_canonical,
              sender: nftEvent.sender ?? null,
              recipient: nftEvent.recipient ?? null,
              event_index: nftEvent.event_index,
              tx_index: nftEvent.tx_index,
              block_height: nftEvent.block_height,
              canonical: nftEvent.canonical,
              asset_event_type_id: nftEvent.asset_event_type_id,
              asset_identifier: nftEvent.asset_identifier,
              value: nftEvent.value,
            }))
          );
        }
      };

      const insertNames = async (dbData: DataStoreBlockUpdateData) => {
        for (const entry of dbData.txs) {
          await dbNameBatchInserter.push(
            entry.names.map((bnsName: any) => ({
              name: bnsName.name,
              address: bnsName.address,
              registered_at: bnsName.registered_at,
              expire_block: bnsName.expire_block,
              zonefile_hash: validateZonefileHash(bnsName.zonefile_hash),
              namespace_id: bnsName.namespace_id,
              grace_period: bnsName.grace_period ?? null,
              renewal_deadline: bnsName.renewal_deadline ?? null,
              resolver: bnsName.resolver ?? null,
              tx_id: bnsName.tx_id ?? null,
              tx_index: bnsName.tx_index,
              event_index: bnsName.event_index ?? null,
              status: bnsName.status ?? null,
              canonical: bnsName.canonical,
              index_block_hash: entry.tx.index_block_hash ?? null,
              parent_index_block_hash: entry.tx.parent_index_block_hash,
              microblock_hash: entry.tx.microblock_hash,
              microblock_sequence: entry.tx.microblock_sequence,
              microblock_canonical: entry.tx.microblock_canonical,
            }))
          );
        }
      };

      const insertZoneFiles = async (dbData: DataStoreBlockUpdateData) => {
        for (const entry of dbData.txs) {
          await dbZonefileBatchInserter.push(
            entry.names.map((bnsName: any) => ({
              name: bnsName.name,
              zonefile: bnsName.zonefile,
              zonefile_hash: validateZonefileHash(bnsName.zonefile_hash),
              tx_id: bnsName.tx_id,
              index_block_hash: bnsName.index_block_hash ?? null,
            }))
          );
        }
      };

      const insertSmartContracts = async (dbData: DataStoreBlockUpdateData) => {
        for (const entry of dbData.txs) {
          for (const smartContract of entry.smartContracts) {
            await db.updateSmartContract(db.sql, entry.tx, smartContract);
          }
        }
      };

      const insertNamespaces = async (dbData: DataStoreBlockUpdateData) => {
        for (const entry of dbData.txs) {
          for (const namespace of entry.namespaces) {
            await db.insertNamespace(db.sql, entry.tx, namespace);
          }
        }
      };

      const insertStxLockEvents = async (dbData: DataStoreBlockUpdateData) => {
        for (const entry of dbData.txs) {
          for (const stxLockEvent of entry.stxLockEvents) {
            await db.updateStxLockEvent(db.sql, entry.tx, stxLockEvent);
          }
        }
      };

      await Promise.all([
        // Insert blocks
        dbBlockBatchInserter.push([dbData.block]),
        // // Insert microblocks
        dbMicroblockBatchInserter.push(dbData.microblocks),
        // // Insert txs
        insertTxs(dbData),
        // // Insert stx_events
        insertStxEvents(dbData),
        // // Insert principal_stx_txs
        insertPrincipalStxTxs(dbData),
        // // Insert contract_logs
        insertContractLogs(dbData),
        // // Insert ft_events
        insertFTEvents(dbData),
        // // Insert nft_events
        insertNFTEvents(dbData),
        // // Insert names
        insertNames(dbData),
        // Insert zonefiles
        insertZoneFiles(dbData),
        // Insert smart_contracts
        insertSmartContracts(dbData),
        // Insert namespaces
        insertNamespaces(dbData),
        // Insert stx_lock_events
        insertStxLockEvents(dbData),
      ]);

      next();
    },
  });
};

const transformDataToJSON = () => {
  return new Transform({
    objectMode: true,
    transform: (data, _encoding, callback) => {
      callback(null, JSON.parse(data.payload));
    },
  });
};

export const processNewBlockEvents = async (db: PgWriteStore, dataset: DatasetStore, ids?: any) => {
  logger.info({ component: 'event-replay' }, 'NEW_BLOCK events process started');

  const payload = await dataset.newBlockEventsPayloadStream(ids);
  const toJSON = transformDataToJSON();
  const insertBatchData = populateBatchInserters(db);

  await pipeline(
    Readable.from(payload),
    toJSON,
    insertBatchData.on('finish', async () => {
      for (const batchInserter of batchInserters) {
        await batchInserter.flush();
      }
    })
  );
};
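One caveat about the pipeline wiring above (an observation, not a change in this commit): pipeline() resolves on the writable's 'finish' event, and Node does not await async 'finish' listeners, so the flush loop can still be running (with any failures surfacing as unhandled rejections) when processNewBlockEvents returns. A more defensive sketch flushes after the pipeline has settled:

await pipeline(Readable.from(payload), toJSON, insertBatchData);
for (const batchInserter of batchInserters) {
  await batchInserter.flush(); // runs strictly after the writable has finished
}
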
@@ -2,7 +2,7 @@ import { PgWriteStore } from '../../../datastore/pg-write-store';
import { DbBurnchainReward, DbRewardSlotHolder } from '../../../datastore/common';
import { CoreNodeBurnBlockMessage } from '../../../event-stream/core-node-message';
import { logger } from '../../../logger';
import { TimeTracker, splitIntoChunks } from '../helpers';
import { splitIntoChunks } from '../helpers';
import { DatasetStore } from '../dataset/store';

const INSERT_BATCH_SIZE = 500;
@@ -42,15 +42,15 @@ const DbRewardSlotHolderParse = (payload: CoreNodeBurnBlockMessage) => {

const insertBurnchainRewardsAndSlotHolders = async (db: PgWriteStore, chunks: any) => {
  for (const chunk of chunks) {
    let burnchainRewards: DbBurnchainReward[] = [];
    let slotHolders: DbRewardSlotHolder[] = [];
    const burnchainRewards: DbBurnchainReward[] = [];
    const slotHolders: DbRewardSlotHolder[] = [];
    for (const event of chunk) {
      const payload: CoreNodeBurnBlockMessage = JSON.parse(event['payload']);
      const burnchainRewardsData = DbBurnchainRewardParse(payload);
      const slotHoldersData = DbRewardSlotHolderParse(payload);
      burnchainRewardsData.forEach(reward => burnchainRewards.push(reward));
      slotHoldersData.forEach(slotHolder => slotHolders.push(slotHolder));
    };
    }

    if (burnchainRewards.length !== 0) {
      await db.insertBurnchainRewardsBatch(db.sql, burnchainRewards);
@@ -59,15 +59,14 @@ const insertBurnchainRewardsAndSlotHolders = async (db: PgWriteStore, chunks: an
    if (slotHolders.length !== 0) {
      await db.insertSlotHoldersBatch(db.sql, slotHolders);
    }
  };
  }
};

export const insertNewBurnBlockEvents = async (db: PgWriteStore, dataset: DatasetStore, timeTracker: TimeTracker) => {
  logger.info(`Inserting NEW_BURN_BLOCK events to db...`);
export const processNewBurnBlockEvents = async (db: PgWriteStore, dataset: DatasetStore) => {
  logger.info({ component: 'event-replay' }, 'NEW_BURN_BLOCK events process started');

  await timeTracker.track('insertNewBurnBlockEvents', async () => {
    return dataset.newBurnBlockEventsOrdered()
      .then(async (data: any) => await splitIntoChunks(data, INSERT_BATCH_SIZE))
      .then(async (chunks: any) => await insertBurnchainRewardsAndSlotHolders(db, chunks));
  });
  return dataset
    .newBurnBlockEventsOrdered()
    .then((data: any) => splitIntoChunks(data, INSERT_BATCH_SIZE))
    .then(async (chunks: any) => await insertBurnchainRewardsAndSlotHolders(db, chunks));
};
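The promise chain in the new processNewBurnBlockEvents is equivalent to the following async/await shape (a sketch only; behavior unchanged):

export const processNewBurnBlockEvents = async (db: PgWriteStore, dataset: DatasetStore) => {
  logger.info({ component: 'event-replay' }, 'NEW_BURN_BLOCK events process started');
  const events = await dataset.newBurnBlockEventsOrdered();
  const chunks = splitIntoChunks(events, INSERT_BATCH_SIZE);
  await insertBurnchainRewardsAndSlotHolders(db, chunks);
};
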
@@ -1,144 +0,0 @@
|
||||
import { Readable, Writable, Transform } from 'stream';
|
||||
import { pipeline } from 'stream/promises';
|
||||
import { PgWriteStore } from '../../../datastore/pg-write-store';
|
||||
import { parseNewBlockMessage } from '../../../event-stream/event-server';
|
||||
import { DbBlock, DbMicroblock, DbTx } from '../../../datastore/common';
|
||||
import { logger } from '../../../logger';
|
||||
import { TimeTracker } from '../helpers';
|
||||
import { getApiConfiguredChainID, batchIterate } from '../../../helpers';
|
||||
import { CoreNodeBlockMessage } from '../../../event-stream/core-node-message';
|
||||
import { DatasetStore } from '../dataset/store';
|
||||
|
||||
const batchInserters: BatchInserter[] = [];
|
||||
|
||||
const chainID = getApiConfiguredChainID();
|
||||
|
||||
interface BatchInserter<T = any> {
|
||||
push(entries: T[]): Promise<void>;
|
||||
flush(): Promise<void>;
|
||||
}
|
||||
|
||||
function createBatchInserter<T>({
|
||||
batchSize,
|
||||
insertFn,
|
||||
}: {
|
||||
batchSize: number;
|
||||
insertFn: (entries: T[]) => Promise<void>;
|
||||
}): BatchInserter<T> {
|
||||
let entryBuffer: T[] = [];
|
||||
return {
|
||||
async push(entries: T[]) {
|
||||
entries.length === 1
|
||||
? entryBuffer.push(entries[0])
|
||||
: entries.forEach(e => entryBuffer.push(e));
|
||||
if (entryBuffer.length === batchSize) {
|
||||
await insertFn(entryBuffer);
|
||||
entryBuffer.length = 0;
|
||||
} else if (entryBuffer.length > batchSize) {
|
||||
for (const batch of batchIterate(entryBuffer, batchSize)) {
|
||||
await insertFn(batch);
|
||||
}
|
||||
entryBuffer.length = 0;
|
||||
}
|
||||
},
|
||||
async flush() {
|
||||
logger.info('Flushing remaining data...');
|
||||
if (entryBuffer.length > 0) {
|
||||
await insertFn(entryBuffer);
|
||||
entryBuffer = [];
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
const populateBatchInserters = async (db: PgWriteStore) => {
|
||||
const dbBlockBatchInserter = createBatchInserter<DbBlock>({
|
||||
batchSize: 100,
|
||||
insertFn: (entries) => {
|
||||
logger.info('Inserting blocks...');
|
||||
return db.insertBlockBatch(db.sql, entries);
|
||||
},
|
||||
});
|
||||
batchInserters.push(dbBlockBatchInserter);
|
||||
|
||||
const dbMicroblockBatchInserter = createBatchInserter<DbMicroblock>({
|
||||
batchSize: 200,
|
||||
insertFn: (entries) => {
|
||||
logger.info('Inserting microblocks...');
|
||||
return db.insertMicroblock(db.sql, entries);
|
||||
},
|
||||
});
|
||||
batchInserters.push(dbMicroblockBatchInserter);
|
||||
|
||||
const dbTxBatchInserter = createBatchInserter<DbTx>({
|
||||
batchSize: 1000,
|
||||
insertFn: (entries) => {
|
||||
logger.info('Inserting txs...');
|
||||
return db.insertTxBatch(db.sql, entries);
|
||||
},
|
||||
});
|
||||
batchInserters.push(dbTxBatchInserter);
|
||||
|
||||
return new Writable({
|
||||
objectMode: true,
|
||||
write: async (data: CoreNodeBlockMessage, _encoding, next) => {
|
||||
|
||||
let dbData;
|
||||
try {
|
||||
dbData = parseNewBlockMessage(chainID, data);
|
||||
} catch (err) {
|
||||
logger.error('Error when parsing new_block event');
|
||||
console.error(err);
|
||||
|
||||
throw err;
|
||||
}
|
||||
|
||||
const insertTxs = async (dbData: any) => {
|
||||
for (const entry of dbData.txs) {
|
||||
await dbTxBatchInserter.push([entry.tx]);
|
||||
}
|
||||
};
|
||||
|
||||
await Promise.all([
|
||||
// Insert blocks
|
||||
dbBlockBatchInserter.push([dbData.block]),
|
||||
// Insert microblocks
|
||||
dbMicroblockBatchInserter.push(dbData.microblocks),
|
||||
// Insert Txs
|
||||
insertTxs(dbData)
|
||||
]);
|
||||
|
||||
next();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
const transformDataToJSON = async () => {
|
||||
return new Transform({
|
||||
objectMode: true,
|
||||
transform: async (data, _encoding, callback) => {
|
||||
callback(null, JSON.parse(data.payload));
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
export const insertNewBlockEvents = async (db: PgWriteStore, dataset: DatasetStore, timeTracker: TimeTracker) => {
|
||||
logger.info(`Inserting NEW_BLOCK events to db...`);
|
||||
|
||||
await timeTracker.track('insertNewBlockEvents', async () => {
|
||||
const payload = await dataset.newBlockEventsOrderedPayloadStream();
|
||||
const toJSON = await transformDataToJSON();
|
||||
const insertBatchData = await populateBatchInserters(db);
|
||||
|
||||
await pipeline(
|
||||
Readable.from(payload),
|
||||
toJSON,
|
||||
insertBatchData
|
||||
.on('finish', async () => {
|
||||
for (const batchInserter of batchInserters) {
|
||||
await batchInserter.flush();
|
||||
}
|
||||
})
|
||||
)
|
||||
});
|
||||
};
|
||||
src/event-replay/parquet-based/post-replay-db.ts (new file, 17 lines)
@@ -0,0 +1,17 @@
import { logger } from '../../logger';
import { PgWriteStore } from '../../datastore/pg-write-store';

(async () => {
  const db = await PgWriteStore.connect({
    usageName: 'post-event-replay',
    skipMigrations: true,
    withNotifier: false,
    isEventReplay: true,
  });

  // Refreshing materialized views
  logger.info({ component: 'event-replay' }, `Refreshing materialized views`);
  await db.finishEventReplay();
})().catch(err => {
  throw err;
});
src/event-replay/parquet-based/pre-replay-db.ts (new file, 47 lines)
@@ -0,0 +1,47 @@
import { logger } from '../../logger';
import { cycleMigrations, dangerousDropAllTables } from '../../datastore/migrations';
import { PgWriteStore } from '../../datastore/pg-write-store';

const MIGRATIONS_TABLE = 'pgmigrations';

(async () => {
  const db = await PgWriteStore.connect({
    usageName: 'pre-event-replay',
    skipMigrations: true,
    withNotifier: false,
    isEventReplay: true,
  });

  logger.info({ component: 'event-replay' }, 'Cleaning up the Database');
  await dangerousDropAllTables({ acknowledgePotentialCatastrophicConsequences: 'yes' });

  logger.info({ component: 'event-replay' }, 'Migrating tables');
  try {
    await cycleMigrations({ dangerousAllowDataLoss: true, checkForEmptyData: true });
  } catch (error) {
    logger.error(error);
    throw new Error('DB migration cycle failed');
  }

  const dbName = db.sql.options.database;
  const tableSchema = db.sql.options.connection.search_path ?? 'public';
  const tablesQuery = await db.sql<{ tablename: string }[]>`
    SELECT tablename FROM pg_catalog.pg_tables
    WHERE tablename != ${MIGRATIONS_TABLE}
    AND schemaname = ${tableSchema}`;
  if (tablesQuery.length === 0) {
    const errorMsg = `No tables found in database '${dbName}', schema '${tableSchema}'`;
    console.error(errorMsg);
    throw new Error(errorMsg);
  }
  const tables: string[] = tablesQuery.map((r: { tablename: string }) => r.tablename);

  logger.info(
    { component: 'event-replay' },
    'Disabling indexes and constraints to speed up insertion'
  );
  await db.toggleTableIndexes(db.sql, tables, false);
  logger.info({ component: 'event-replay' }, `Indexes disabled on tables: ${tables.join(', ')}`);
})().catch(err => {
  throw err;
});
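For symmetry with the step above, ingestion presumably ends with the indexes being switched back on; the post-replay script in this commit only refreshes materialized views, so the following is just a sketch of what that counterpart could look like (it assumes toggleTableIndexes accepts an enable flag, as the disable call above suggests):

const reenableIndexes = async (db: PgWriteStore, tables: string[]) => {
  await db.toggleTableIndexes(db.sql, tables, true); // assumed counterpart of the disable call above
  logger.info({ component: 'event-replay' }, `Indexes re-enabled on tables: ${tables.join(', ')}`);
};
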
@@ -36,6 +36,8 @@ import {
  DataStoreAttachmentData,
  DbPox2Event,
  DbTxStatus,
  DbBnsSubdomain,
  DataStoreBnsBlockData,
} from '../datastore/common';
import {
  getTxSenderAddress,
@@ -58,6 +60,8 @@ import {
  parseNameFromContractEvent,
  parseNameRenewalWithNoZonefileHashFromContractCall,
  parseNamespaceFromContractEvent,
  parseZoneFileTxt,
  parseResolver,
} from './bns/bns-helpers';
import { PgWriteStore } from '../datastore/pg-write-store';
import {
@@ -69,6 +73,7 @@ import { handleBnsImport } from '../import-v1';
import { Pox2ContractIdentifer } from '../pox-helpers';
import { decodePox2PrintEvent } from './pox2-event-parsing';
import { logger, loggerMiddleware } from '../logger';
import * as zoneFileParser from 'zone-file';

export const IBD_PRUNABLE_ROUTES = ['/new_mempool_tx', '/drop_mempool_tx', '/new_microblocks'];

@@ -1075,3 +1080,94 @@ export function parseNewBlockMessage(chainId: ChainID, msg: CoreNodeBlockMessage

  return dbData;
}

export function parseAttachmentMessage(msg: CoreNodeAttachmentMessage[]) {
  const zoneFiles: { zonefile: string; zonefileHash: string; txId: string }[] = [];

  const subdomainObj: {
    attachmentData: DataStoreAttachmentData;
    dbBnsSubdomain: DbBnsSubdomain[];
  } = {
    attachmentData: {
      op: '',
      name: '',
      namespace: '',
      zonefile: '',
      zonefileHash: '',
      txId: '',
      indexBlockHash: '',
      blockHeight: 0,
    },
    dbBnsSubdomain: [],
  };

  for (const attachment of msg) {
    if (
      attachment.contract_id === BnsContractIdentifier.mainnet ||
      attachment.contract_id === BnsContractIdentifier.testnet
    ) {
      const metadataCV = decodeClarityValue<
        ClarityValueTuple<{
          op: ClarityValueStringAscii;
          name: ClarityValueBuffer;
          namespace: ClarityValueBuffer;
        }>
      >(attachment.metadata);
      const op = metadataCV.data['op'].data;
      const zonefile = Buffer.from(attachment.content.slice(2), 'hex').toString();
      const zonefileHash = attachment.content_hash;
      zoneFiles.push({
        zonefile,
        zonefileHash,
        txId: attachment.tx_id,
      });
      if (op === 'name-update') {
        const name = hexToBuffer(metadataCV.data['name'].buffer).toString('utf8');
        const namespace = hexToBuffer(metadataCV.data['namespace'].buffer).toString('utf8');
        const zoneFileContents = zoneFileParser.parseZoneFile(zonefile);
        const zoneFileTxt = zoneFileContents.txt;
        // Case for subdomain
        if (zoneFileTxt) {
          for (let i = 0; i < zoneFileTxt.length; i++) {
            const zoneFile = zoneFileTxt[i];
            const parsedTxt = parseZoneFileTxt(zoneFile.txt);
            if (parsedTxt.owner === '') continue; // if the TXT record has no owner, skip it
            const subdomain: DbBnsSubdomain = {
              name: name.concat('.', namespace),
              namespace_id: namespace,
              fully_qualified_subdomain: zoneFile.name.concat('.', name, '.', namespace),
              owner: parsedTxt.owner,
              zonefile_hash: parsedTxt.zoneFileHash,
              zonefile: parsedTxt.zoneFile,
              tx_id: attachment.tx_id,
              tx_index: -1,
              canonical: true,
              parent_zonefile_hash: attachment.content_hash.slice(2),
              parent_zonefile_index: 0, // TODO need to figure out this field
              block_height: Number.parseInt(attachment.block_height, 10),
              zonefile_offset: 1,
              resolver: zoneFileContents.uri ? parseResolver(zoneFileContents.uri) : '',
              index_block_hash: attachment.index_block_hash,
            };

            const attachmentData: DataStoreAttachmentData = {
              op: op,
              name: subdomain.name,
              namespace: '',
              zonefile: subdomain.zonefile,
              zonefileHash: subdomain.zonefile_hash,
              txId: subdomain.tx_id,
              indexBlockHash: subdomain.index_block_hash || '',
              blockHeight: subdomain.block_height,
            };

            subdomainObj.dbBnsSubdomain.push(subdomain);
            subdomainObj.attachmentData = attachmentData;
          }
        }
      }
    }
  }

  return { zoneFiles, subdomainObj };
}

src/index.ts (26 lines changed)
@@ -27,7 +27,11 @@ import { isFtMetadataEnabled, isNftMetadataEnabled } from './token-metadata/help
import { TokensProcessorQueue } from './token-metadata/tokens-processor-queue';
import { registerMempoolPromStats } from './datastore/helpers';
import { logger } from './logger';
import { run } from './event-replay/parquet-based/event-replay';
import {
  ingestAttachmentNew,
  ingestNewBlock,
  ingestNewBurnBlock,
} from './event-replay/parquet-based/event-replay';

enum StacksApiMode {
  /**
@@ -280,9 +284,11 @@ function getProgramArgs() {
    | {
        operand: 'from-parquet-events';
        options: {
          ['wipe-db']?: boolean;
          ['disable-indexes']?: boolean;
        }
          ['new-burn-block']?: boolean;
          ['attachment-new']?: boolean;
          ['new-block']?: boolean;
          ['ids-path']?: string;
        };
      };
  return { args, parsedOpts };
}
@@ -299,7 +305,17 @@ async function handleProgramArgs() {
      args.options.force
    );
  } else if (args.operand === 'from-parquet-events') {
    await run(args.options['wipe-db'], args.options['disable-indexes']);
    if (args.options['new-burn-block']) {
      await ingestNewBurnBlock();
    }

    if (args.options['attachment-new']) {
      await ingestAttachmentNew();
    }

    if (args.options['new-block']) {
      await ingestNewBlock(args.options['ids-path']);
    }
  } else if (parsedOpts._[0]) {
    throw new Error(`Unexpected program argument: ${parsedOpts._[0]}`);
  } else {