fix: ignore out-of-order microblocks causing API to crash

Matthew Little
2021-09-17 22:42:22 +02:00
committed by GitHub
parent 42402b95cf
commit 1e0b3d0e18
6 changed files with 153 additions and 12 deletions

.env

@@ -45,6 +45,9 @@ BTC_FAUCET_PK=29c028009a8331358adcc61bb6397377c995d327ac0343ed8e8f1d4d3ef85c27
TESTNET_SEND_MANY_CONTRACT_ID=STR8P3RD1EHA8AA37ERSSSZSWKS9T2GYQFGXNA4C.send-many-memo
MAINNET_SEND_MANY_CONTRACT_ID=SP3FBR2AGK5H9QBDH3EEN6DF8EK8JY7RX8QJ5SVTE.send-many-memo
# Enable debug logging
# STACKS_API_LOG_LEVEL=debug
# Directory containing Stacks 1.0 BNS data extracted from https://storage.googleapis.com/blockstack-v1-migration-data/export-data.tar.gz
# BNS_IMPORT_DIR=/extracted/export-data-dir/


@@ -526,6 +526,14 @@ export interface RawTxQueryResult {
raw_tx: Buffer;
}
class MicroblockGapError extends Error {
constructor(message: string) {
super(message);
this.message = message;
this.name = this.constructor.name;
}
}
// Enable this when debugging potential sql leaks.
const SQL_QUERY_LEAK_DETECTION = false;
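The catch in `updateMicroblocks` below relies on an `instanceof` check against this subclass. A minimal standalone sketch of the same custom-error pattern (the class name here is illustrative), including a TypeScript target caveat worth knowing:

class ExampleGapError extends Error {
  constructor(message: string) {
    super(message);
    // Setting `name` from the constructor makes logs read
    // "ExampleGapError: ..." instead of "Error: ...".
    this.name = this.constructor.name;
    // Caveat: when compiling to an ES5 target, subclasses of Error also need
    // `Object.setPrototypeOf(this, new.target.prototype);` for `instanceof`
    // checks like the one below to work reliably; ES2015+ targets do not.
  }
}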
@@ -829,6 +837,23 @@ export class PgDataStore
}
async updateMicroblocks(data: DataStoreMicroblockUpdateData): Promise<void> {
try {
await this.updateMicroblocksInternal(data);
} catch (error) {
if (error instanceof MicroblockGapError) {
// Log and ignore this error for now, see https://github.com/blockstack/stacks-blockchain/issues/2850
// for more details.
// In theory it would be possible for the API to cache out-of-order microblock data and use it to
// restore data in this condition, but it would require several changes to sensitive re-org code,
// as well as introduce a new kind of statefulness and responsibility to the API.
logger.warn(error.message);
} else {
throw error;
}
}
}
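The throw here is used purely as a rollback signal. A minimal sketch of that shape, assuming a `queryTx`-style helper that rolls back the SQL transaction whenever its callback throws (all names below are illustrative):

class RecoverableGapError extends Error {} // stand-in for MicroblockGapError

async function ingestSafely(
  runInTx: (fn: () => Promise<void>) => Promise<void>, // assumed to roll back on throw
  applyWrites: () => Promise<void>,
  warn: (msg: string) => void
): Promise<void> {
  try {
    await runInTx(applyWrites);
  } catch (error) {
    if (error instanceof RecoverableGapError) {
      // The event's writes were rolled back, but the process keeps running.
      warn(error.message);
    } else {
      // Anything unexpected still propagates and fails loudly.
      throw error;
    }
  }
}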
async updateMicroblocksInternal(data: DataStoreMicroblockUpdateData): Promise<void> {
await this.queryTx(async client => {
// Sanity check: ensure incoming microblocks have a `parent_index_block_hash` that matches the API's
// current known canonical chain tip. We assume this holds true so incoming microblock data is always
@@ -903,12 +928,19 @@ export class PgDataStore
// Find any microblocks that have been orphaned by this latest microblock chain tip.
// This function also checks that each microblock parent hash points to an existing microblock in the db.
const currentMicroblockTip = dbMicroblocks[dbMicroblocks.length - 1];
-const { orphanedMicroblocks } = await this.findUnanchoredMicroblocksAtChainTip(
const unanchoredMicroblocksAtTip = await this.findUnanchoredMicroblocksAtChainTip(
client,
currentMicroblockTip.parent_index_block_hash,
blockHeight,
currentMicroblockTip
);
if ('microblockGap' in unanchoredMicroblocksAtTip) {
// Throw in order to trigger a SQL tx rollback to undo any db writes so far, but catch, log, and ignore this specific error.
throw new MicroblockGapError(
`Gap in parent microblock stream for ${currentMicroblockTip.microblock_hash}, missing microblock ${unanchoredMicroblocksAtTip.missingMicroblockHash}, the oldest microblock ${unanchoredMicroblocksAtTip.oldestParentMicroblockHash} found in the chain has sequence ${unanchoredMicroblocksAtTip.oldestParentMicroblockSequence} rather than 0`
);
}
const { orphanedMicroblocks } = unanchoredMicroblocksAtTip;
if (orphanedMicroblocks.length > 0) {
// Handle microblock reorgs here, these _should_ only be micro-forks off the same
// unanchored chain tip, e.g. a leader orphaning its own unconfirmed microblocks
@@ -1154,15 +1186,19 @@ export class PgDataStore
}
// Identify microblocks that were either accepted or orphaned by this anchor block.
-const {
-acceptedMicroblocks,
-orphanedMicroblocks,
-} = await this.findUnanchoredMicroblocksAtChainTip(
const unanchoredMicroblocksAtTip = await this.findUnanchoredMicroblocksAtChainTip(
client,
blockData.parentIndexBlockHash,
blockData.blockHeight,
acceptedMicroblockTip
);
if ('microblockGap' in unanchoredMicroblocksAtTip) {
throw new Error(
`Gap in parent microblock stream for block ${blockData.blockHash}, missing microblock ${unanchoredMicroblocksAtTip.missingMicroblockHash}, the oldest microblock ${unanchoredMicroblocksAtTip.oldestParentMicroblockHash} found in the chain has sequence ${unanchoredMicroblocksAtTip.oldestParentMicroblockSequence} rather than 0`
);
}
const { acceptedMicroblocks, orphanedMicroblocks } = unanchoredMicroblocksAtTip;
let orphanedMicroblockTxs: DbTx[] = [];
if (orphanedMicroblocks.length > 0) {
@@ -1342,6 +1378,7 @@ export class PgDataStore
* latest unanchored microblock tip. Microblocks that are chained to the given tip are
* returned as accepted, and all others are returned as orphaned/rejected. This function
* only performs the lookup, it does not perform any updates to the db.
* If a gap in the microblock stream is detected, that error information is returned instead.
* @param microblockChainTip - undefined if processing an anchor block that doesn't point to a parent microblock.
*/
async findUnanchoredMicroblocksAtChainTip(
@@ -1349,7 +1386,15 @@ export class PgDataStore
parentIndexBlockHash: string,
blockHeight: number,
microblockChainTip: DbMicroblock | undefined
-): Promise<{ acceptedMicroblocks: string[]; orphanedMicroblocks: string[] }> {
): Promise<
| { acceptedMicroblocks: string[]; orphanedMicroblocks: string[] }
| {
microblockGap: true;
missingMicroblockHash: string;
oldestParentMicroblockHash: string;
oldestParentMicroblockSequence: number;
}
> {
// Get any microblocks that this anchor block is responsible for accepting or rejecting.
// Note: we don't filter on `microblock_canonical=true` here because that could have been flipped in a previous anchor block
// which could now be in the process of being re-org'd.
@@ -1376,9 +1421,12 @@ export class PgDataStore
);
// Sanity check that the first microblock in the chain is sequence 0
if (!foundMb && prevMicroblock.microblock_sequence !== 0) {
-throw new Error(
-`First microblock ${prevMicroblock.microblock_parent_hash} found in the chain has sequence ${prevMicroblock.microblock_sequence}`
-);
return {
microblockGap: true,
missingMicroblockHash: prevMicroblock?.microblock_parent_hash,
oldestParentMicroblockHash: prevMicroblock.microblock_hash,
oldestParentMicroblockSequence: prevMicroblock.microblock_sequence,
};
}
prevMicroblock = foundMb;
}
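Returning a tagged object instead of throwing lets the two call sites above choose their own severity: microblock ingestion treats a gap as recoverable, anchor block ingestion treats it as fatal. A standalone illustration of the `in`-operator narrowing that makes this pattern type-safe (types simplified from the signature above):

type MicroblockLookupResult =
  | { acceptedMicroblocks: string[]; orphanedMicroblocks: string[] }
  | { microblockGap: true; missingMicroblockHash: string };

function acceptedOrThrow(result: MicroblockLookupResult): string[] {
  if ('microblockGap' in result) {
    // TypeScript narrows `result` to the gap variant in this branch.
    throw new Error(`missing microblock ${result.missingMicroblockHash}`);
  }
  // ...and to the success variant here, so these fields are safe to read.
  return result.acceptedMicroblocks;
}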


@@ -210,6 +210,9 @@ async function handleMicroblockMessage(
parsedTxs.push(parsedTx);
}
});
parsedTxs.forEach(tx => {
logger.verbose(`Received microblock mined tx: ${tx.core_tx.txid}`);
});
const updateData: DataStoreMicroblockUpdateData = {
microblocks: dbMicroblocks,
txs: parseDataStoreTxEventData(parsedTxs, msg.events, {
@@ -294,6 +297,10 @@ async function handleBlockMessage(
return microblock;
});
parsedTxs.forEach(tx => {
logger.verbose(`Received anchor block mined tx: ${tx.core_tx.txid}`);
});
const dbData: DataStoreBlockUpdateData = {
block: dbBlock,
microblocks: dbMicroblocks,
@@ -313,7 +320,6 @@ function parseDataStoreTxEventData(
}
): DataStoreTxEventData[] {
const dbData: DataStoreTxEventData[] = parsedTxs.map(tx => {
-logger.verbose(`Received mined tx: ${tx.core_tx.txid}`);
const dbTx: DataStoreBlockUpdateData['txs'][number] = {
tx: createDbTxFromCoreMsg(tx),
stxEvents: [],
@@ -770,8 +776,15 @@ export async function startEventServer(opts: {
app.postAsync('*', async (req, res) => {
const eventPath = req.path;
-const payload = JSON.stringify(req.body);
let payload = JSON.stringify(req.body);
await messageHandler.handleRawEventRequest(eventPath, payload, db);
if (logger.isDebugEnabled()) {
// Skip logging massive event payloads, this _should_ only exclude the genesis block payload which is ~80 MB.
if (payload.length > 10_000_000) {
payload = 'payload body too large for logging';
}
logger.debug(`[stacks-node event] ${eventPath} ${payload}`);
}
});
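The debug guard above avoids both the string-building cost when debug logging is off and multi-megabyte log lines when it is on. The same pattern as a small standalone function, with the threshold copied from the code above (function and parameter names are illustrative):

const MAX_LOGGED_PAYLOAD_CHARS = 10_000_000; // same ~10 MB cutoff as above

function logEventPayload(
  log: { isDebugEnabled(): boolean; debug(msg: string): void },
  eventPath: string,
  payload: string
): void {
  if (!log.isDebugEnabled()) {
    return; // cheap early exit: no string work unless debug is enabled
  }
  const body =
    payload.length > MAX_LOGGED_PAYLOAD_CHARS ? 'payload body too large for logging' : payload;
  log.debug(`[stacks-node event] ${eventPath} ${body}`);
}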
app.postAsync('/new_block', async (req, res) => {


@@ -146,7 +146,25 @@ type DisabledLogLevels = Exclude<
>;
type LoggerInterface = Omit<winston.Logger, DisabledLogLevels> & { level: LogLevel };
-export const defaultLogLevel: LogLevel = isDevEnv || isTestEnv ? 'debug' : 'verbose';
const LOG_LEVELS: LogLevel[] = ['error', 'warn', 'info', 'http', 'verbose', 'debug', 'silly'];
export const defaultLogLevel: LogLevel = (() => {
const STACKS_API_LOG_LEVEL_ENV_VAR = 'STACKS_API_LOG_LEVEL';
const logLevelEnvVar = process.env[
STACKS_API_LOG_LEVEL_ENV_VAR
]?.toLowerCase().trim() as LogLevel;
if (logLevelEnvVar) {
if (LOG_LEVELS.includes(logLevelEnvVar)) {
return logLevelEnvVar;
}
throw new Error(
`Invalid ${STACKS_API_LOG_LEVEL_ENV_VAR}, should be one of ${LOG_LEVELS.join(',')}`
);
}
if (isDevEnv) {
return 'debug';
}
return 'http';
})();
export const logger = winston.createLogger({
level: defaultLogLevel,

File diff suppressed because one or more lines are too long


@@ -41,6 +41,62 @@ describe('microblock tests', () => {
client = await db.pool.connect();
});
test('microblock out of order events', async () => {
// test that the event observer can ingest events with out of order microblocks
await useWithCleanup(
() => {
const origLevel = logger.level;
logger.level = 'error';
return [, () => (logger.level = origLevel)] as const;
},
() => {
const readStream = fs.createReadStream(
'src/tests/event-replay-logs/mainnet-out-of-order-microblock.tsv'
);
const rawEventsIterator = PgDataStore.getRawEventRequests(readStream);
return [rawEventsIterator, () => readStream.close()] as const;
},
async () => {
const eventServer = await startEventServer({
datastore: db,
chainId: ChainID.Mainnet,
serverHost: '127.0.0.1',
serverPort: 0,
httpLogLevel: 'debug',
});
return [eventServer, eventServer.closeAsync] as const;
},
async () => {
const apiServer = await startApiServer({
datastore: db,
chainId: ChainID.Mainnet,
httpLogLevel: 'debug',
});
return [apiServer, apiServer.terminate] as const;
},
async (_, rawEventsIterator, eventServer, api) => {
for await (const rawEvents of rawEventsIterator) {
for (const rawEvent of rawEvents) {
await httpPostRequest({
host: '127.0.0.1',
port: eventServer.serverAddress.port,
path: rawEvent.event_path,
headers: { 'Content-Type': 'application/json' },
body: Buffer.from(rawEvent.payload, 'utf8'),
throwOnNotOK: true,
});
}
}
// test that the out-of-order microblocks were not stored
const mbHash1 = '0xb714e75a7dae26fee0e77788317a0c84e513d1d8647a376b21b1c864e55c135a';
const mbResult1 = await supertest(api.server).get(`/extended/v1/microblock/${mbHash1}`);
expect(mbResult1.status).toBe(404);
const mbHash2 = '0xab9112694f13f7b04996d4b4554af5b5890271fa4e0c9099e67353b42dcf9989';
const mbResult2 = await supertest(api.server).get(`/extended/v1/microblock/${mbHash2}`);
expect(mbResult2.status).toBe(404);
}
);
});
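This test drives everything through `useWithCleanup`, which from its usage appears to accept init functions that each return a `[resource, cleanup]` tuple, plus a final body receiving the resources. A two-resource sketch of such a helper under that assumed contract (the repo's real helper is variadic and may differ):

type InitResult<T> = readonly [T, () => void | Promise<void>];

// Hypothetical two-resource version of a useWithCleanup-style helper.
// Cleanups run in reverse init order, even when the body throws.
async function useWithCleanup2<A, B, R>(
  initA: () => InitResult<A> | Promise<InitResult<A>>,
  initB: () => InitResult<B> | Promise<InitResult<B>>,
  body: (a: A, b: B) => Promise<R>
): Promise<R> {
  const [a, cleanupA] = await initA();
  try {
    const [b, cleanupB] = await initB();
    try {
      return await body(a, b);
    } finally {
      await cleanupB();
    }
  } finally {
    await cleanupA();
  }
}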
test('microblock re-org scenario 1', async () => {
const lostTx = '0x03484817283a83a0b0c23e84c2659f39c9a06d81a63329464d979ec2af476596';
const canonicalBlockHash = '0x4d27059a847f3c3f6dbbd43343d11981b67409a2710597c6cb1814945cfc4d48';