mirror of
https://github.com/alexgo-io/stacks-blockchain-api.git
synced 2026-01-12 22:43:34 +08:00
This commit is contained in:
@@ -1034,8 +1034,8 @@ export class PgDataStore
|
||||
const candidateTxIds = data.txs.map(d => d.tx.tx_id);
|
||||
const removedTxsResult = await this.pruneMempoolTxs(client, candidateTxIds);
|
||||
if (removedTxsResult.removedTxs.length > 0) {
|
||||
logger.debug(
|
||||
`Removed ${removedTxsResult.removedTxs.length} microblock-txs from mempool table`
|
||||
logger.verbose(
|
||||
`Removed ${removedTxsResult.removedTxs.length} microblock-txs from mempool table during microblock ingestion`
|
||||
);
|
||||
}
|
||||
});
|
||||
@@ -1068,7 +1068,9 @@ export class PgDataStore
|
||||
const candidateTxIds = data.txs.map(d => d.tx.tx_id);
|
||||
const removedTxsResult = await this.pruneMempoolTxs(client, candidateTxIds);
|
||||
if (removedTxsResult.removedTxs.length > 0) {
|
||||
logger.debug(`Removed ${removedTxsResult.removedTxs.length} txs from mempool table`);
|
||||
logger.verbose(
|
||||
`Removed ${removedTxsResult.removedTxs.length} txs from mempool table during new block ingestion`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1110,6 +1112,8 @@ export class PgDataStore
|
||||
data.block.execution_cost_write_count = totalCost.execution_cost_write_count;
|
||||
data.block.execution_cost_write_length = totalCost.execution_cost_write_length;
|
||||
|
||||
let batchedTxData: DataStoreTxEventData[] = data.txs;
|
||||
|
||||
// Find microblocks that weren't already inserted via the unconfirmed microblock event.
|
||||
// This happens when a stacks-node is syncing and receives confirmed microblocks with their anchor block at the same time.
|
||||
if (data.microblocks.length > 0) {
|
||||
@@ -1136,44 +1140,50 @@ export class PgDataStore
|
||||
const missingTxs = data.txs.filter(entry =>
|
||||
missingMicroblockHashes.has(entry.tx.microblock_hash)
|
||||
);
|
||||
// TODO(mb): the microblock code after this line should take into account this already inserted confirmed microblock data,
|
||||
// right now it performs redundant updates, blindly treating all microblock txs as unconfirmed.
|
||||
await this.insertMicroblockData(client, missingMicroblocks, missingTxs);
|
||||
|
||||
// Clear already inserted microblock txs from the anchor-block update data to avoid duplicate inserts.
|
||||
batchedTxData = batchedTxData.filter(entry => {
|
||||
return !missingMicroblockHashes.has(entry.tx.microblock_hash);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let batchedTxData: DataStoreTxEventData[] = data.txs;
|
||||
const { acceptedMicroblockTxs, orphanedMicroblockTxs } = await this.updateMicroCanonical(
|
||||
client,
|
||||
{
|
||||
isCanonical: isCanonical,
|
||||
blockHeight: data.block.block_height,
|
||||
blockHash: data.block.block_hash,
|
||||
indexBlockHash: data.block.index_block_hash,
|
||||
parentIndexBlockHash: data.block.parent_index_block_hash,
|
||||
parentMicroblockHash: data.block.parent_microblock_hash,
|
||||
parentMicroblockSequence: data.block.parent_microblock_sequence,
|
||||
burnBlockTime: data.block.burn_block_time,
|
||||
}
|
||||
);
|
||||
// When processing an immediately-non-canonical block, do not orphan and possible existing microblocks
|
||||
// which may be still considered canonical by the canonical block at this height.
|
||||
if (isCanonical) {
|
||||
const { acceptedMicroblockTxs, orphanedMicroblockTxs } = await this.updateMicroCanonical(
|
||||
client,
|
||||
{
|
||||
isCanonical: isCanonical,
|
||||
blockHeight: data.block.block_height,
|
||||
blockHash: data.block.block_hash,
|
||||
indexBlockHash: data.block.index_block_hash,
|
||||
parentIndexBlockHash: data.block.parent_index_block_hash,
|
||||
parentMicroblockHash: data.block.parent_microblock_hash,
|
||||
parentMicroblockSequence: data.block.parent_microblock_sequence,
|
||||
burnBlockTime: data.block.burn_block_time,
|
||||
}
|
||||
);
|
||||
|
||||
// Identify any micro-orphaned txs that also didn't make it into this anchor block, and restore them into the mempool
|
||||
const orphanedAndMissingTxs = orphanedMicroblockTxs.filter(
|
||||
tx => !data.txs.find(r => tx.tx_id === r.tx.tx_id)
|
||||
);
|
||||
const restoredMempoolTxs = await this.restoreMempoolTxs(
|
||||
client,
|
||||
orphanedAndMissingTxs.map(tx => tx.tx_id)
|
||||
);
|
||||
restoredMempoolTxs.restoredTxs.forEach(txId => {
|
||||
logger.info(`Restored micro-orphaned tx to mempool ${txId}`);
|
||||
});
|
||||
// Identify any micro-orphaned txs that also didn't make it into this anchor block, and restore them into the mempool
|
||||
const orphanedAndMissingTxs = orphanedMicroblockTxs.filter(
|
||||
tx => !data.txs.find(r => tx.tx_id === r.tx.tx_id)
|
||||
);
|
||||
const restoredMempoolTxs = await this.restoreMempoolTxs(
|
||||
client,
|
||||
orphanedAndMissingTxs.map(tx => tx.tx_id)
|
||||
);
|
||||
restoredMempoolTxs.restoredTxs.forEach(txId => {
|
||||
logger.info(`Restored micro-orphaned tx to mempool ${txId}`);
|
||||
});
|
||||
|
||||
// Clear accepted microblock txs from the anchor-block update data to avoid duplicate inserts.
|
||||
batchedTxData = data.txs.filter(entry => {
|
||||
const matchingTx = acceptedMicroblockTxs.find(tx => tx.tx_id === entry.tx.tx_id);
|
||||
return !matchingTx;
|
||||
});
|
||||
// Clear accepted microblock txs from the anchor-block update data to avoid duplicate inserts.
|
||||
batchedTxData = batchedTxData.filter(entry => {
|
||||
const matchingTx = acceptedMicroblockTxs.find(tx => tx.tx_id === entry.tx.tx_id);
|
||||
return !matchingTx;
|
||||
});
|
||||
}
|
||||
|
||||
// TODO(mb): sanity tests on tx_index on batchedTxData, re-normalize if necessary
|
||||
|
||||
@@ -1267,7 +1277,12 @@ export class PgDataStore
|
||||
parentMicroblockSequence: number;
|
||||
burnBlockTime: number;
|
||||
}
|
||||
): Promise<{ acceptedMicroblockTxs: DbTx[]; orphanedMicroblockTxs: DbTx[] }> {
|
||||
): Promise<{
|
||||
acceptedMicroblockTxs: DbTx[];
|
||||
orphanedMicroblockTxs: DbTx[];
|
||||
acceptedMicroblocks: string[];
|
||||
orphanedMicroblocks: string[];
|
||||
}> {
|
||||
// Find the parent microblock if this anchor block points to one. If not, perform a sanity check for expected block headers in this case:
|
||||
// > Anchored blocks that do not have parent microblock streams will have their parent microblock header hashes set to all 0's, and the parent microblock sequence number set to 0.
|
||||
let acceptedMicroblockTip: DbMicroblock | undefined;
|
||||
@@ -1294,7 +1309,7 @@ export class PgDataStore
|
||||
acceptedMicroblockTip = this.parseMicroblockQueryResult(microblockTipQuery.rows[0]);
|
||||
}
|
||||
|
||||
// Identify microblocks that were either excepted or orphaned by this anchor block.
|
||||
// Identify microblocks that were either accepted or orphaned by this anchor block.
|
||||
const unanchoredMicroblocksAtTip = await this.findUnanchoredMicroblocksAtChainTip(
|
||||
client,
|
||||
blockData.parentIndexBlockHash,
|
||||
@@ -1337,6 +1352,8 @@ export class PgDataStore
|
||||
return {
|
||||
acceptedMicroblockTxs,
|
||||
orphanedMicroblockTxs,
|
||||
acceptedMicroblocks,
|
||||
orphanedMicroblocks,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -1454,8 +1471,17 @@ export class PgDataStore
|
||||
args.microblocks.map(mb => hexToBuffer(mb)),
|
||||
]
|
||||
);
|
||||
|
||||
// Any txs restored need to be pruned from the mempool
|
||||
const updatedMbTxs = updatedMbTxsQuery.rows.map(r => this.parseTxQueryResult(r));
|
||||
const txsToPrune = updatedMbTxs
|
||||
.filter(tx => tx.canonical && tx.microblock_canonical)
|
||||
.map(tx => tx.tx_id);
|
||||
const removedTxsResult = await this.pruneMempoolTxs(client, txsToPrune);
|
||||
if (removedTxsResult.removedTxs.length > 0) {
|
||||
logger.verbose(
|
||||
`Removed ${removedTxsResult.removedTxs.length} txs from mempool table during micro-reorg handling`
|
||||
);
|
||||
}
|
||||
|
||||
// Update the `index_block_hash` and `microblock_canonical` properties on all the tables containing other
|
||||
// microblock-tx metadata that have been accepted or orphaned in this anchor block.
|
||||
@@ -1902,29 +1928,6 @@ export class PgDataStore
|
||||
canonical: boolean,
|
||||
updatedEntities: UpdatedEntities
|
||||
): Promise<{ txsMarkedCanonical: string[]; txsMarkedNonCanonical: string[] }> {
|
||||
const microblockResult = await client.query<{ microblock_hash: Buffer }>(
|
||||
`
|
||||
UPDATE microblocks
|
||||
SET canonical = $2
|
||||
WHERE index_block_hash = $1 AND canonical != $2
|
||||
RETURNING microblock_hash
|
||||
`,
|
||||
[indexBlockHash, canonical]
|
||||
);
|
||||
const microblockHashes = microblockResult.rows.map(row =>
|
||||
bufferToHexPrefixString(row.microblock_hash)
|
||||
);
|
||||
if (canonical) {
|
||||
updatedEntities.markedCanonical.microblocks += microblockResult.rowCount;
|
||||
} else {
|
||||
updatedEntities.markedNonCanonical.microblocks += microblockResult.rowCount;
|
||||
}
|
||||
for (const microblockHash of microblockHashes) {
|
||||
logger.verbose(
|
||||
`Marked microblock as ${canonical ? 'canonical' : 'non-canonical'}: ${microblockHash}`
|
||||
);
|
||||
}
|
||||
|
||||
const txResult = await client.query<TxQueryResult>(
|
||||
`
|
||||
UPDATE txs
|
||||
@@ -2118,18 +2121,6 @@ export class PgDataStore
|
||||
}
|
||||
updatedEntities.markedCanonical.blocks++;
|
||||
|
||||
const restoredBlock = this.parseBlockQueryResult(restoredBlockResult.rows[0]);
|
||||
await this.updateMicroCanonical(client, {
|
||||
isCanonical: true,
|
||||
blockHeight: restoredBlock.block_height,
|
||||
blockHash: restoredBlock.block_hash,
|
||||
indexBlockHash: restoredBlock.index_block_hash,
|
||||
parentIndexBlockHash: restoredBlock.parent_index_block_hash,
|
||||
parentMicroblockHash: restoredBlock.parent_microblock_hash,
|
||||
parentMicroblockSequence: restoredBlock.parent_microblock_sequence,
|
||||
burnBlockTime: restoredBlock.burn_block_time,
|
||||
});
|
||||
|
||||
const orphanedBlockResult = await client.query<BlockQueryResult>(
|
||||
`
|
||||
-- orphan the now conflicting block at the same height
|
||||
@@ -2140,10 +2131,14 @@ export class PgDataStore
|
||||
`,
|
||||
[restoredBlockResult.rows[0].block_height, indexBlockHash]
|
||||
);
|
||||
|
||||
const microblocksOrphaned = new Set<string>();
|
||||
const microblocksAccepted = new Set<string>();
|
||||
|
||||
if (orphanedBlockResult.rowCount > 0) {
|
||||
const orphanedBlocks = orphanedBlockResult.rows.map(b => this.parseBlockQueryResult(b));
|
||||
for (const orphanedBlock of orphanedBlocks) {
|
||||
await this.updateMicroCanonical(client, {
|
||||
const microCanonicalUpdateResult = await this.updateMicroCanonical(client, {
|
||||
isCanonical: false,
|
||||
blockHeight: orphanedBlock.block_height,
|
||||
blockHash: orphanedBlock.block_hash,
|
||||
@@ -2153,6 +2148,14 @@ export class PgDataStore
|
||||
parentMicroblockSequence: orphanedBlock.parent_microblock_sequence,
|
||||
burnBlockTime: orphanedBlock.burn_block_time,
|
||||
});
|
||||
microCanonicalUpdateResult.orphanedMicroblocks.forEach(mb => {
|
||||
microblocksOrphaned.add(mb);
|
||||
microblocksAccepted.delete(mb);
|
||||
});
|
||||
microCanonicalUpdateResult.acceptedMicroblocks.forEach(mb => {
|
||||
microblocksOrphaned.delete(mb);
|
||||
microblocksAccepted.add(mb);
|
||||
});
|
||||
}
|
||||
|
||||
updatedEntities.markedNonCanonical.blocks++;
|
||||
@@ -2165,14 +2168,49 @@ export class PgDataStore
|
||||
await this.restoreMempoolTxs(client, markNonCanonicalResult.txsMarkedNonCanonical);
|
||||
}
|
||||
|
||||
// The canonical microblock tables _must_ be restored _after_ orphaning all other blocks at a given height,
|
||||
// because there is only 1 row per microblock hash, and both the orphaned blocks at this height and the
|
||||
// canonical block can be pointed to the same microblocks.
|
||||
const restoredBlock = this.parseBlockQueryResult(restoredBlockResult.rows[0]);
|
||||
const microCanonicalUpdateResult = await this.updateMicroCanonical(client, {
|
||||
isCanonical: true,
|
||||
blockHeight: restoredBlock.block_height,
|
||||
blockHash: restoredBlock.block_hash,
|
||||
indexBlockHash: restoredBlock.index_block_hash,
|
||||
parentIndexBlockHash: restoredBlock.parent_index_block_hash,
|
||||
parentMicroblockHash: restoredBlock.parent_microblock_hash,
|
||||
parentMicroblockSequence: restoredBlock.parent_microblock_sequence,
|
||||
burnBlockTime: restoredBlock.burn_block_time,
|
||||
});
|
||||
microCanonicalUpdateResult.orphanedMicroblocks.forEach(mb => {
|
||||
microblocksOrphaned.add(mb);
|
||||
microblocksAccepted.delete(mb);
|
||||
});
|
||||
microCanonicalUpdateResult.acceptedMicroblocks.forEach(mb => {
|
||||
microblocksOrphaned.delete(mb);
|
||||
microblocksAccepted.add(mb);
|
||||
});
|
||||
updatedEntities.markedCanonical.microblocks += microblocksAccepted.size;
|
||||
updatedEntities.markedNonCanonical.microblocks += microblocksOrphaned.size;
|
||||
|
||||
microblocksOrphaned.forEach(mb => logger.verbose(`Marked microblock as non-canonical: ${mb}`));
|
||||
microblocksAccepted.forEach(mb => logger.verbose(`Marked microblock as canonical: ${mb}`));
|
||||
|
||||
const markCanonicalResult = await this.markEntitiesCanonical(
|
||||
client,
|
||||
indexBlockHash,
|
||||
true,
|
||||
updatedEntities
|
||||
);
|
||||
await this.pruneMempoolTxs(client, markCanonicalResult.txsMarkedCanonical);
|
||||
|
||||
const removedTxsResult = await this.pruneMempoolTxs(
|
||||
client,
|
||||
markCanonicalResult.txsMarkedCanonical
|
||||
);
|
||||
if (removedTxsResult.removedTxs.length > 0) {
|
||||
logger.verbose(
|
||||
`Removed ${removedTxsResult.removedTxs.length} txs from mempool table during reorg handling`
|
||||
);
|
||||
}
|
||||
const parentResult = await client.query<{ index_block_hash: Buffer }>(
|
||||
`
|
||||
-- check if the parent block is also orphaned
|
||||
|
||||
Reference in New Issue
Block a user