fix: microblock related re-org bug causing txs to be incorrectly orphaned #804 #818

This commit is contained in:
Matthew Little
2021-10-22 21:38:47 +02:00
committed by GitHub
parent 4ade5ee39b
commit bae619d653

View File

@@ -1034,8 +1034,8 @@ export class PgDataStore
const candidateTxIds = data.txs.map(d => d.tx.tx_id);
const removedTxsResult = await this.pruneMempoolTxs(client, candidateTxIds);
if (removedTxsResult.removedTxs.length > 0) {
logger.debug(
`Removed ${removedTxsResult.removedTxs.length} microblock-txs from mempool table`
logger.verbose(
`Removed ${removedTxsResult.removedTxs.length} microblock-txs from mempool table during microblock ingestion`
);
}
});
@@ -1068,7 +1068,9 @@ export class PgDataStore
const candidateTxIds = data.txs.map(d => d.tx.tx_id);
const removedTxsResult = await this.pruneMempoolTxs(client, candidateTxIds);
if (removedTxsResult.removedTxs.length > 0) {
logger.debug(`Removed ${removedTxsResult.removedTxs.length} txs from mempool table`);
logger.verbose(
`Removed ${removedTxsResult.removedTxs.length} txs from mempool table during new block ingestion`
);
}
}
@@ -1110,6 +1112,8 @@ export class PgDataStore
data.block.execution_cost_write_count = totalCost.execution_cost_write_count;
data.block.execution_cost_write_length = totalCost.execution_cost_write_length;
let batchedTxData: DataStoreTxEventData[] = data.txs;
// Find microblocks that weren't already inserted via the unconfirmed microblock event.
// This happens when a stacks-node is syncing and receives confirmed microblocks with their anchor block at the same time.
if (data.microblocks.length > 0) {
@@ -1136,44 +1140,50 @@ export class PgDataStore
const missingTxs = data.txs.filter(entry =>
missingMicroblockHashes.has(entry.tx.microblock_hash)
);
// TODO(mb): the microblock code after this line should take into account this already inserted confirmed microblock data,
// right now it performs redundant updates, blindly treating all microblock txs as unconfirmed.
await this.insertMicroblockData(client, missingMicroblocks, missingTxs);
// Clear already inserted microblock txs from the anchor-block update data to avoid duplicate inserts.
batchedTxData = batchedTxData.filter(entry => {
return !missingMicroblockHashes.has(entry.tx.microblock_hash);
});
}
}
let batchedTxData: DataStoreTxEventData[] = data.txs;
const { acceptedMicroblockTxs, orphanedMicroblockTxs } = await this.updateMicroCanonical(
client,
{
isCanonical: isCanonical,
blockHeight: data.block.block_height,
blockHash: data.block.block_hash,
indexBlockHash: data.block.index_block_hash,
parentIndexBlockHash: data.block.parent_index_block_hash,
parentMicroblockHash: data.block.parent_microblock_hash,
parentMicroblockSequence: data.block.parent_microblock_sequence,
burnBlockTime: data.block.burn_block_time,
}
);
// When processing an immediately-non-canonical block, do not orphan and possible existing microblocks
// which may be still considered canonical by the canonical block at this height.
if (isCanonical) {
const { acceptedMicroblockTxs, orphanedMicroblockTxs } = await this.updateMicroCanonical(
client,
{
isCanonical: isCanonical,
blockHeight: data.block.block_height,
blockHash: data.block.block_hash,
indexBlockHash: data.block.index_block_hash,
parentIndexBlockHash: data.block.parent_index_block_hash,
parentMicroblockHash: data.block.parent_microblock_hash,
parentMicroblockSequence: data.block.parent_microblock_sequence,
burnBlockTime: data.block.burn_block_time,
}
);
// Identify any micro-orphaned txs that also didn't make it into this anchor block, and restore them into the mempool
const orphanedAndMissingTxs = orphanedMicroblockTxs.filter(
tx => !data.txs.find(r => tx.tx_id === r.tx.tx_id)
);
const restoredMempoolTxs = await this.restoreMempoolTxs(
client,
orphanedAndMissingTxs.map(tx => tx.tx_id)
);
restoredMempoolTxs.restoredTxs.forEach(txId => {
logger.info(`Restored micro-orphaned tx to mempool ${txId}`);
});
// Identify any micro-orphaned txs that also didn't make it into this anchor block, and restore them into the mempool
const orphanedAndMissingTxs = orphanedMicroblockTxs.filter(
tx => !data.txs.find(r => tx.tx_id === r.tx.tx_id)
);
const restoredMempoolTxs = await this.restoreMempoolTxs(
client,
orphanedAndMissingTxs.map(tx => tx.tx_id)
);
restoredMempoolTxs.restoredTxs.forEach(txId => {
logger.info(`Restored micro-orphaned tx to mempool ${txId}`);
});
// Clear accepted microblock txs from the anchor-block update data to avoid duplicate inserts.
batchedTxData = data.txs.filter(entry => {
const matchingTx = acceptedMicroblockTxs.find(tx => tx.tx_id === entry.tx.tx_id);
return !matchingTx;
});
// Clear accepted microblock txs from the anchor-block update data to avoid duplicate inserts.
batchedTxData = batchedTxData.filter(entry => {
const matchingTx = acceptedMicroblockTxs.find(tx => tx.tx_id === entry.tx.tx_id);
return !matchingTx;
});
}
// TODO(mb): sanity tests on tx_index on batchedTxData, re-normalize if necessary
@@ -1267,7 +1277,12 @@ export class PgDataStore
parentMicroblockSequence: number;
burnBlockTime: number;
}
): Promise<{ acceptedMicroblockTxs: DbTx[]; orphanedMicroblockTxs: DbTx[] }> {
): Promise<{
acceptedMicroblockTxs: DbTx[];
orphanedMicroblockTxs: DbTx[];
acceptedMicroblocks: string[];
orphanedMicroblocks: string[];
}> {
// Find the parent microblock if this anchor block points to one. If not, perform a sanity check for expected block headers in this case:
// > Anchored blocks that do not have parent microblock streams will have their parent microblock header hashes set to all 0's, and the parent microblock sequence number set to 0.
let acceptedMicroblockTip: DbMicroblock | undefined;
@@ -1294,7 +1309,7 @@ export class PgDataStore
acceptedMicroblockTip = this.parseMicroblockQueryResult(microblockTipQuery.rows[0]);
}
// Identify microblocks that were either excepted or orphaned by this anchor block.
// Identify microblocks that were either accepted or orphaned by this anchor block.
const unanchoredMicroblocksAtTip = await this.findUnanchoredMicroblocksAtChainTip(
client,
blockData.parentIndexBlockHash,
@@ -1337,6 +1352,8 @@ export class PgDataStore
return {
acceptedMicroblockTxs,
orphanedMicroblockTxs,
acceptedMicroblocks,
orphanedMicroblocks,
};
}
@@ -1454,8 +1471,17 @@ export class PgDataStore
args.microblocks.map(mb => hexToBuffer(mb)),
]
);
// Any txs restored need to be pruned from the mempool
const updatedMbTxs = updatedMbTxsQuery.rows.map(r => this.parseTxQueryResult(r));
const txsToPrune = updatedMbTxs
.filter(tx => tx.canonical && tx.microblock_canonical)
.map(tx => tx.tx_id);
const removedTxsResult = await this.pruneMempoolTxs(client, txsToPrune);
if (removedTxsResult.removedTxs.length > 0) {
logger.verbose(
`Removed ${removedTxsResult.removedTxs.length} txs from mempool table during micro-reorg handling`
);
}
// Update the `index_block_hash` and `microblock_canonical` properties on all the tables containing other
// microblock-tx metadata that have been accepted or orphaned in this anchor block.
@@ -1902,29 +1928,6 @@ export class PgDataStore
canonical: boolean,
updatedEntities: UpdatedEntities
): Promise<{ txsMarkedCanonical: string[]; txsMarkedNonCanonical: string[] }> {
const microblockResult = await client.query<{ microblock_hash: Buffer }>(
`
UPDATE microblocks
SET canonical = $2
WHERE index_block_hash = $1 AND canonical != $2
RETURNING microblock_hash
`,
[indexBlockHash, canonical]
);
const microblockHashes = microblockResult.rows.map(row =>
bufferToHexPrefixString(row.microblock_hash)
);
if (canonical) {
updatedEntities.markedCanonical.microblocks += microblockResult.rowCount;
} else {
updatedEntities.markedNonCanonical.microblocks += microblockResult.rowCount;
}
for (const microblockHash of microblockHashes) {
logger.verbose(
`Marked microblock as ${canonical ? 'canonical' : 'non-canonical'}: ${microblockHash}`
);
}
const txResult = await client.query<TxQueryResult>(
`
UPDATE txs
@@ -2118,18 +2121,6 @@ export class PgDataStore
}
updatedEntities.markedCanonical.blocks++;
const restoredBlock = this.parseBlockQueryResult(restoredBlockResult.rows[0]);
await this.updateMicroCanonical(client, {
isCanonical: true,
blockHeight: restoredBlock.block_height,
blockHash: restoredBlock.block_hash,
indexBlockHash: restoredBlock.index_block_hash,
parentIndexBlockHash: restoredBlock.parent_index_block_hash,
parentMicroblockHash: restoredBlock.parent_microblock_hash,
parentMicroblockSequence: restoredBlock.parent_microblock_sequence,
burnBlockTime: restoredBlock.burn_block_time,
});
const orphanedBlockResult = await client.query<BlockQueryResult>(
`
-- orphan the now conflicting block at the same height
@@ -2140,10 +2131,14 @@ export class PgDataStore
`,
[restoredBlockResult.rows[0].block_height, indexBlockHash]
);
const microblocksOrphaned = new Set<string>();
const microblocksAccepted = new Set<string>();
if (orphanedBlockResult.rowCount > 0) {
const orphanedBlocks = orphanedBlockResult.rows.map(b => this.parseBlockQueryResult(b));
for (const orphanedBlock of orphanedBlocks) {
await this.updateMicroCanonical(client, {
const microCanonicalUpdateResult = await this.updateMicroCanonical(client, {
isCanonical: false,
blockHeight: orphanedBlock.block_height,
blockHash: orphanedBlock.block_hash,
@@ -2153,6 +2148,14 @@ export class PgDataStore
parentMicroblockSequence: orphanedBlock.parent_microblock_sequence,
burnBlockTime: orphanedBlock.burn_block_time,
});
microCanonicalUpdateResult.orphanedMicroblocks.forEach(mb => {
microblocksOrphaned.add(mb);
microblocksAccepted.delete(mb);
});
microCanonicalUpdateResult.acceptedMicroblocks.forEach(mb => {
microblocksOrphaned.delete(mb);
microblocksAccepted.add(mb);
});
}
updatedEntities.markedNonCanonical.blocks++;
@@ -2165,14 +2168,49 @@ export class PgDataStore
await this.restoreMempoolTxs(client, markNonCanonicalResult.txsMarkedNonCanonical);
}
// The canonical microblock tables _must_ be restored _after_ orphaning all other blocks at a given height,
// because there is only 1 row per microblock hash, and both the orphaned blocks at this height and the
// canonical block can be pointed to the same microblocks.
const restoredBlock = this.parseBlockQueryResult(restoredBlockResult.rows[0]);
const microCanonicalUpdateResult = await this.updateMicroCanonical(client, {
isCanonical: true,
blockHeight: restoredBlock.block_height,
blockHash: restoredBlock.block_hash,
indexBlockHash: restoredBlock.index_block_hash,
parentIndexBlockHash: restoredBlock.parent_index_block_hash,
parentMicroblockHash: restoredBlock.parent_microblock_hash,
parentMicroblockSequence: restoredBlock.parent_microblock_sequence,
burnBlockTime: restoredBlock.burn_block_time,
});
microCanonicalUpdateResult.orphanedMicroblocks.forEach(mb => {
microblocksOrphaned.add(mb);
microblocksAccepted.delete(mb);
});
microCanonicalUpdateResult.acceptedMicroblocks.forEach(mb => {
microblocksOrphaned.delete(mb);
microblocksAccepted.add(mb);
});
updatedEntities.markedCanonical.microblocks += microblocksAccepted.size;
updatedEntities.markedNonCanonical.microblocks += microblocksOrphaned.size;
microblocksOrphaned.forEach(mb => logger.verbose(`Marked microblock as non-canonical: ${mb}`));
microblocksAccepted.forEach(mb => logger.verbose(`Marked microblock as canonical: ${mb}`));
const markCanonicalResult = await this.markEntitiesCanonical(
client,
indexBlockHash,
true,
updatedEntities
);
await this.pruneMempoolTxs(client, markCanonicalResult.txsMarkedCanonical);
const removedTxsResult = await this.pruneMempoolTxs(
client,
markCanonicalResult.txsMarkedCanonical
);
if (removedTxsResult.removedTxs.length > 0) {
logger.verbose(
`Removed ${removedTxsResult.removedTxs.length} txs from mempool table during reorg handling`
);
}
const parentResult = await client.query<{ index_block_hash: Buffer }>(
`
-- check if the parent block is also orphaned