mirror of
https://github.com/alexgo-io/stacks-blockchain-api.git
synced 2026-01-12 22:43:34 +08:00
fix: ignore out of order microblocks causing API to crash
This commit is contained in:
3
.env
3
.env
@@ -45,6 +45,9 @@ BTC_FAUCET_PK=29c028009a8331358adcc61bb6397377c995d327ac0343ed8e8f1d4d3ef85c27
|
||||
TESTNET_SEND_MANY_CONTRACT_ID=STR8P3RD1EHA8AA37ERSSSZSWKS9T2GYQFGXNA4C.send-many-memo
|
||||
MAINNET_SEND_MANY_CONTRACT_ID=SP3FBR2AGK5H9QBDH3EEN6DF8EK8JY7RX8QJ5SVTE.send-many-memo
|
||||
|
||||
# Enable debug logging
|
||||
# STACKS_API_LOG_LEVEL=debug
|
||||
|
||||
# Directory containing Stacks 1.0 BNS data extracted from https://storage.googleapis.com/blockstack-v1-migration-data/export-data.tar.gz
|
||||
# BNS_IMPORT_DIR=/extracted/export-data-dir/
|
||||
|
||||
|
||||
@@ -526,6 +526,14 @@ export interface RawTxQueryResult {
|
||||
raw_tx: Buffer;
|
||||
}
|
||||
|
||||
class MicroblockGapError extends Error {
|
||||
constructor(message: string) {
|
||||
super(message);
|
||||
this.message = message;
|
||||
this.name = this.constructor.name;
|
||||
}
|
||||
}
|
||||
|
||||
// Enable this when debugging potential sql leaks.
|
||||
const SQL_QUERY_LEAK_DETECTION = false;
|
||||
|
||||
@@ -829,6 +837,23 @@ export class PgDataStore
|
||||
}
|
||||
|
||||
async updateMicroblocks(data: DataStoreMicroblockUpdateData): Promise<void> {
|
||||
try {
|
||||
await this.updateMicroblocksInternal(data);
|
||||
} catch (error) {
|
||||
if (error instanceof MicroblockGapError) {
|
||||
// Log and ignore this error for now, see https://github.com/blockstack/stacks-blockchain/issues/2850
|
||||
// for more details.
|
||||
// In theory it would be possible for the API to cache out-of-order microblock data and use it to
|
||||
// restore data in this condition, but it would require several changes to sensitive re-org code,
|
||||
// as well as introduce a new kind of statefulness and responsibility to the API.
|
||||
logger.warn(error.message);
|
||||
} else {
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async updateMicroblocksInternal(data: DataStoreMicroblockUpdateData): Promise<void> {
|
||||
await this.queryTx(async client => {
|
||||
// Sanity check: ensure incoming microblocks have a `parent_index_block_hash` that matches the API's
|
||||
// current known canonical chain tip. We assume this holds true so incoming microblock data is always
|
||||
@@ -903,12 +928,19 @@ export class PgDataStore
|
||||
// Find any microblocks that have been orphaned by this latest microblock chain tip.
|
||||
// This function also checks that each microblock parent hash points to an existing microblock in the db.
|
||||
const currentMicroblockTip = dbMicroblocks[dbMicroblocks.length - 1];
|
||||
const { orphanedMicroblocks } = await this.findUnanchoredMicroblocksAtChainTip(
|
||||
const unanchoredMicroblocksAtTip = await this.findUnanchoredMicroblocksAtChainTip(
|
||||
client,
|
||||
currentMicroblockTip.parent_index_block_hash,
|
||||
blockHeight,
|
||||
currentMicroblockTip
|
||||
);
|
||||
if ('microblockGap' in unanchoredMicroblocksAtTip) {
|
||||
// Throw in order to trigger a SQL tx rollback to undo and db writes so far, but catch, log, and ignore this specific error.
|
||||
throw new MicroblockGapError(
|
||||
`Gap in parent microblock stream for ${currentMicroblockTip.microblock_hash}, missing microblock ${unanchoredMicroblocksAtTip.missingMicroblockHash}, the oldest microblock ${unanchoredMicroblocksAtTip.oldestParentMicroblockHash} found in the chain has sequence ${unanchoredMicroblocksAtTip.oldestParentMicroblockSequence} rather than 0`
|
||||
);
|
||||
}
|
||||
const { orphanedMicroblocks } = unanchoredMicroblocksAtTip;
|
||||
if (orphanedMicroblocks.length > 0) {
|
||||
// Handle microblocks reorgs here, these _should_ only be micro-forks off the same same
|
||||
// unanchored chain tip, e.g. a leader orphaning it's own unconfirmed microblocks
|
||||
@@ -1154,15 +1186,19 @@ export class PgDataStore
|
||||
}
|
||||
|
||||
// Identify microblocks that were either excepted or orphaned by this anchor block.
|
||||
const {
|
||||
acceptedMicroblocks,
|
||||
orphanedMicroblocks,
|
||||
} = await this.findUnanchoredMicroblocksAtChainTip(
|
||||
const unanchoredMicroblocksAtTip = await this.findUnanchoredMicroblocksAtChainTip(
|
||||
client,
|
||||
blockData.parentIndexBlockHash,
|
||||
blockData.blockHeight,
|
||||
acceptedMicroblockTip
|
||||
);
|
||||
if ('microblockGap' in unanchoredMicroblocksAtTip) {
|
||||
throw new Error(
|
||||
`Gap in parent microblock stream for block ${blockData.blockHash}, missing microblock ${unanchoredMicroblocksAtTip.missingMicroblockHash}, the oldest microblock ${unanchoredMicroblocksAtTip.oldestParentMicroblockHash} found in the chain has sequence ${unanchoredMicroblocksAtTip.oldestParentMicroblockSequence} rather than 0`
|
||||
);
|
||||
}
|
||||
|
||||
const { acceptedMicroblocks, orphanedMicroblocks } = unanchoredMicroblocksAtTip;
|
||||
|
||||
let orphanedMicroblockTxs: DbTx[] = [];
|
||||
if (orphanedMicroblocks.length > 0) {
|
||||
@@ -1342,6 +1378,7 @@ export class PgDataStore
|
||||
* latest unanchored microblock tip. Microblocks that are chained to the given tip are
|
||||
* returned as accepted, and all others are returned as orphaned/rejected. This function
|
||||
* only performs the lookup, it does not perform any updates to the db.
|
||||
* If a gap in the microblock stream is detected, that error information is returned instead.
|
||||
* @param microblockChainTip - undefined if processing an anchor block that doesn't point to a parent microblock.
|
||||
*/
|
||||
async findUnanchoredMicroblocksAtChainTip(
|
||||
@@ -1349,7 +1386,15 @@ export class PgDataStore
|
||||
parentIndexBlockHash: string,
|
||||
blockHeight: number,
|
||||
microblockChainTip: DbMicroblock | undefined
|
||||
): Promise<{ acceptedMicroblocks: string[]; orphanedMicroblocks: string[] }> {
|
||||
): Promise<
|
||||
| { acceptedMicroblocks: string[]; orphanedMicroblocks: string[] }
|
||||
| {
|
||||
microblockGap: true;
|
||||
missingMicroblockHash: string;
|
||||
oldestParentMicroblockHash: string;
|
||||
oldestParentMicroblockSequence: number;
|
||||
}
|
||||
> {
|
||||
// Get any microblocks that this anchor block is responsible for accepting or rejecting.
|
||||
// Note: we don't filter on `microblock_canonical=true` here because that could have been flipped in a previous anchor block
|
||||
// which could now be in the process of being re-org'd.
|
||||
@@ -1376,9 +1421,12 @@ export class PgDataStore
|
||||
);
|
||||
// Sanity check that the first microblock in the chain is sequence 0
|
||||
if (!foundMb && prevMicroblock.microblock_sequence !== 0) {
|
||||
throw new Error(
|
||||
`First microblock ${prevMicroblock.microblock_parent_hash} found in the chain has sequence ${prevMicroblock.microblock_sequence}`
|
||||
);
|
||||
return {
|
||||
microblockGap: true,
|
||||
missingMicroblockHash: prevMicroblock?.microblock_parent_hash,
|
||||
oldestParentMicroblockHash: prevMicroblock.microblock_hash,
|
||||
oldestParentMicroblockSequence: prevMicroblock.microblock_sequence,
|
||||
};
|
||||
}
|
||||
prevMicroblock = foundMb;
|
||||
}
|
||||
|
||||
@@ -210,6 +210,9 @@ async function handleMicroblockMessage(
|
||||
parsedTxs.push(parsedTx);
|
||||
}
|
||||
});
|
||||
parsedTxs.forEach(tx => {
|
||||
logger.verbose(`Received microblock mined tx: ${tx.core_tx.txid}`);
|
||||
});
|
||||
const updateData: DataStoreMicroblockUpdateData = {
|
||||
microblocks: dbMicroblocks,
|
||||
txs: parseDataStoreTxEventData(parsedTxs, msg.events, {
|
||||
@@ -294,6 +297,10 @@ async function handleBlockMessage(
|
||||
return microblock;
|
||||
});
|
||||
|
||||
parsedTxs.forEach(tx => {
|
||||
logger.verbose(`Received anchor block mined tx: ${tx.core_tx.txid}`);
|
||||
});
|
||||
|
||||
const dbData: DataStoreBlockUpdateData = {
|
||||
block: dbBlock,
|
||||
microblocks: dbMicroblocks,
|
||||
@@ -313,7 +320,6 @@ function parseDataStoreTxEventData(
|
||||
}
|
||||
): DataStoreTxEventData[] {
|
||||
const dbData: DataStoreTxEventData[] = parsedTxs.map(tx => {
|
||||
logger.verbose(`Received mined tx: ${tx.core_tx.txid}`);
|
||||
const dbTx: DataStoreBlockUpdateData['txs'][number] = {
|
||||
tx: createDbTxFromCoreMsg(tx),
|
||||
stxEvents: [],
|
||||
@@ -770,8 +776,15 @@ export async function startEventServer(opts: {
|
||||
|
||||
app.postAsync('*', async (req, res) => {
|
||||
const eventPath = req.path;
|
||||
const payload = JSON.stringify(req.body);
|
||||
let payload = JSON.stringify(req.body);
|
||||
await messageHandler.handleRawEventRequest(eventPath, payload, db);
|
||||
if (logger.isDebugEnabled()) {
|
||||
// Skip logging massive event payloads, this _should_ only exclude the genesis block payload which is ~80 MB.
|
||||
if (payload.length > 10_000_000) {
|
||||
payload = 'payload body too large for logging';
|
||||
}
|
||||
logger.debug(`[stacks-node event] ${eventPath} ${payload}`);
|
||||
}
|
||||
});
|
||||
|
||||
app.postAsync('/new_block', async (req, res) => {
|
||||
|
||||
@@ -146,7 +146,25 @@ type DisabledLogLevels = Exclude<
|
||||
>;
|
||||
type LoggerInterface = Omit<winston.Logger, DisabledLogLevels> & { level: LogLevel };
|
||||
|
||||
export const defaultLogLevel: LogLevel = isDevEnv || isTestEnv ? 'debug' : 'verbose';
|
||||
const LOG_LEVELS: LogLevel[] = ['error', 'warn', 'info', 'http', 'verbose', 'debug', 'silly'];
|
||||
export const defaultLogLevel: LogLevel = (() => {
|
||||
const STACKS_API_LOG_LEVEL_ENV_VAR = 'STACKS_API_LOG_LEVEL';
|
||||
const logLevelEnvVar = process.env[
|
||||
STACKS_API_LOG_LEVEL_ENV_VAR
|
||||
]?.toLowerCase().trim() as LogLevel;
|
||||
if (logLevelEnvVar) {
|
||||
if (LOG_LEVELS.includes(logLevelEnvVar)) {
|
||||
return logLevelEnvVar;
|
||||
}
|
||||
throw new Error(
|
||||
`Invalid ${STACKS_API_LOG_LEVEL_ENV_VAR}, should be one of ${LOG_LEVELS.join(',')}`
|
||||
);
|
||||
}
|
||||
if (isDevEnv) {
|
||||
return 'debug';
|
||||
}
|
||||
return 'http';
|
||||
})();
|
||||
|
||||
export const logger = winston.createLogger({
|
||||
level: defaultLogLevel,
|
||||
|
||||
File diff suppressed because one or more lines are too long
@@ -41,6 +41,62 @@ describe('microblock tests', () => {
|
||||
client = await db.pool.connect();
|
||||
});
|
||||
|
||||
test('microblock out of order events', async () => {
|
||||
// test that the event observer can ingest events with out of order microblocks
|
||||
await useWithCleanup(
|
||||
() => {
|
||||
const origLevel = logger.level;
|
||||
logger.level = 'error';
|
||||
return [, () => (logger.level = origLevel)] as const;
|
||||
},
|
||||
() => {
|
||||
const readStream = fs.createReadStream(
|
||||
'src/tests/event-replay-logs/mainnet-out-of-order-microblock.tsv'
|
||||
);
|
||||
const rawEventsIterator = PgDataStore.getRawEventRequests(readStream);
|
||||
return [rawEventsIterator, () => readStream.close()] as const;
|
||||
},
|
||||
async () => {
|
||||
const eventServer = await startEventServer({
|
||||
datastore: db,
|
||||
chainId: ChainID.Mainnet,
|
||||
serverHost: '127.0.0.1',
|
||||
serverPort: 0,
|
||||
httpLogLevel: 'debug',
|
||||
});
|
||||
return [eventServer, eventServer.closeAsync] as const;
|
||||
},
|
||||
async () => {
|
||||
const apiServer = await startApiServer({
|
||||
datastore: db,
|
||||
chainId: ChainID.Mainnet,
|
||||
httpLogLevel: 'debug',
|
||||
});
|
||||
return [apiServer, apiServer.terminate] as const;
|
||||
},
|
||||
async (_, rawEventsIterator, eventServer, api) => {
|
||||
for await (const rawEvents of rawEventsIterator) {
|
||||
for (const rawEvent of rawEvents) {
|
||||
await httpPostRequest({
|
||||
host: '127.0.0.1',
|
||||
port: eventServer.serverAddress.port,
|
||||
path: rawEvent.event_path,
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: Buffer.from(rawEvent.payload, 'utf8'),
|
||||
throwOnNotOK: true,
|
||||
});
|
||||
}
|
||||
}
|
||||
// test that the out-of-order microblocks were not stored
|
||||
const mbHash1 = '0xb714e75a7dae26fee0e77788317a0c84e513d1d8647a376b21b1c864e55c135a';
|
||||
const mbResult1 = await supertest(api.server).get(`/extended/v1/microblock/${mbHash1}`);
|
||||
expect(mbResult1.status).toBe(404);
|
||||
const mbHash2 = '0xab9112694f13f7b04996d4b4554af5b5890271fa4e0c9099e67353b42dcf9989';
|
||||
const mbResult2 = await supertest(api.server).get(`/extended/v1/microblock/${mbHash2}`);
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
test('microblock re-org scenario 1', async () => {
|
||||
const lostTx = '0x03484817283a83a0b0c23e84c2659f39c9a06d81a63329464d979ec2af476596';
|
||||
const canonicalBlockHash = '0x4d27059a847f3c3f6dbbd43343d11981b67409a2710597c6cb1814945cfc4d48';
|
||||
|
||||
Reference in New Issue
Block a user