mirror of
https://github.com/alexgo-io/stacks-blockchain-api.git
synced 2026-01-12 22:43:34 +08:00
fix: catch pg exceptions on queries outside of express (#1348)
* fix: catch db exceptions in token processor * feat: return 503 on API postgres errors * fix: add postgres.js connection errors * test: token queue and handler * fix: block_count on limit=0 for blocks
This commit is contained in:
@@ -5,8 +5,6 @@ import * as expressWinston from 'express-winston';
|
||||
import * as winston from 'winston';
|
||||
import { v4 as uuid } from 'uuid';
|
||||
import * as cors from 'cors';
|
||||
import * as WebSocket from 'ws';
|
||||
import * as SocketIO from 'socket.io';
|
||||
|
||||
import { createTxRouter } from './routes/tx';
|
||||
import { createDebugRouter } from './routes/debug';
|
||||
@@ -47,6 +45,7 @@ import * as fs from 'fs';
|
||||
import { PgStore } from '../datastore/pg-store';
|
||||
import { PgWriteStore } from '../datastore/pg-write-store';
|
||||
import { WebSocketTransmitter } from './routes/ws/web-socket-transmitter';
|
||||
import { isPgConnectionError } from '../datastore/helpers';
|
||||
|
||||
export interface ApiServer {
|
||||
expressApp: express.Express;
|
||||
@@ -300,6 +299,8 @@ export async function startApiServer(opts: {
|
||||
if (error && !res.headersSent) {
|
||||
if (error instanceof InvalidRequestError) {
|
||||
res.status(error.status).json({ error: error.message }).end();
|
||||
} else if (isPgConnectionError(error)) {
|
||||
res.status(503).json({ error: `The database service is unavailable` }).end();
|
||||
} else {
|
||||
res.status(500);
|
||||
const errorTag = uuid();
|
||||
|
||||
@@ -184,6 +184,14 @@ export function isPgConnectionError(error: any): string | false {
|
||||
return 'Postgres connection ENOTFOUND';
|
||||
} else if (error.code === 'ECONNRESET') {
|
||||
return 'Postgres connection ECONNRESET';
|
||||
} else if (error.code === 'CONNECTION_CLOSED') {
|
||||
return 'Postgres connection CONNECTION_CLOSED';
|
||||
} else if (error.code === 'CONNECTION_ENDED') {
|
||||
return 'Postgres connection CONNECTION_ENDED';
|
||||
} else if (error.code === 'CONNECTION_DESTROYED') {
|
||||
return 'Postgres connection CONNECTION_DESTROYED';
|
||||
} else if (error.code === 'CONNECTION_CONNECT_TIMEOUT') {
|
||||
return 'Postgres connection CONNECTION_CONNECT_TIMEOUT';
|
||||
} else if (error.message) {
|
||||
const msg = (error as Error).message.toLowerCase();
|
||||
if (msg.includes('database system is starting up')) {
|
||||
|
||||
@@ -420,7 +420,7 @@ export class PgStore {
|
||||
if (blockHashValues.length === 0) {
|
||||
return {
|
||||
results: [],
|
||||
total: 0,
|
||||
total: block_count,
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@ import { stringCV } from '@stacks/transactions/dist/clarity/types/stringCV';
|
||||
import { getTokenMetadataFetchTimeoutMs } from '../token-metadata/helpers';
|
||||
import { PgWriteStore } from '../datastore/pg-write-store';
|
||||
import { cycleMigrations, runMigrations } from '../datastore/migrations';
|
||||
import { TokensProcessorQueue } from '../token-metadata/tokens-processor-queue';
|
||||
|
||||
const NFT_CONTRACT_ABI: ClarityAbi = {
|
||||
maps: [],
|
||||
@@ -189,6 +190,33 @@ describe('token metadata strict mode', () => {
|
||||
expect(entry.result?.processed).toBe(false);
|
||||
});
|
||||
|
||||
test('db errors are handled gracefully in contract handler', async () => {
|
||||
process.env['STACKS_CORE_RPC_PORT'] = '11111'; // Make node unreachable
|
||||
process.env['STACKS_API_TOKEN_METADATA_STRICT_MODE'] = '1';
|
||||
process.env['STACKS_API_TOKEN_METADATA_MAX_RETRIES'] = '0';
|
||||
const handler = new TokensContractHandler({
|
||||
contractId: contractId,
|
||||
smartContractAbi: NFT_CONTRACT_ABI,
|
||||
datastore: db,
|
||||
chainId: ChainID.Testnet,
|
||||
txId: contractTxId,
|
||||
dbQueueId: 1,
|
||||
});
|
||||
await db.close(); // End connection to trigger postgres error
|
||||
await expect(handler.start()).resolves.not.toThrow();
|
||||
});
|
||||
|
||||
test('db errors are handled gracefully in queue', async () => {
|
||||
const queue = new TokensProcessorQueue(db, ChainID.Testnet);
|
||||
await db.close(); // End connection to trigger postgres error
|
||||
await expect(queue.checkDbQueue()).resolves.not.toThrow();
|
||||
await expect(queue.drainDbQueue()).resolves.not.toThrow();
|
||||
await expect(queue.queueNotificationHandler(1)).resolves.not.toThrow();
|
||||
await expect(
|
||||
queue.queueHandler({ queueId: 1, txId: '0x11', contractId: 'test' })
|
||||
).resolves.not.toThrow();
|
||||
});
|
||||
|
||||
test('node runtime errors get retried', async () => {
|
||||
const mockResponse = {
|
||||
okay: false,
|
||||
|
||||
@@ -14,7 +14,6 @@ import {
|
||||
DbNonFungibleTokenMetadata,
|
||||
} from '../datastore/common';
|
||||
import { startApiServer, ApiServer } from '../api/init';
|
||||
import { PoolClient } from 'pg';
|
||||
import * as fs from 'fs';
|
||||
import { EventStreamServer, startEventServer } from '../event-stream/event-server';
|
||||
import { getStacksTestnetNetwork } from '../rosetta-helpers';
|
||||
|
||||
@@ -293,6 +293,13 @@ describe('other tests', () => {
|
||||
expect(result.body.status).toBe('ready');
|
||||
});
|
||||
|
||||
test('database unavailable responses', async () => {
|
||||
// Close connection so we get an error.
|
||||
await db.close();
|
||||
const result = await supertest(api.server).get(`/extended/v1/block/`);
|
||||
expect(result.body.error).toBe('The database service is unavailable');
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await api.terminate();
|
||||
await db?.close();
|
||||
|
||||
@@ -149,30 +149,40 @@ export class TokensContractHandler {
|
||||
processingFinished = true;
|
||||
} catch (error) {
|
||||
if (error instanceof RetryableTokenMetadataError) {
|
||||
const retries = await this.db.increaseTokenMetadataQueueEntryRetryCount(this.dbQueueId);
|
||||
if (
|
||||
getTokenMetadataProcessingMode() === TokenMetadataProcessingMode.strict ||
|
||||
retries <= getTokenMetadataMaxRetries()
|
||||
) {
|
||||
logger.info(
|
||||
`[token-metadata] a recoverable error happened while processing ${this.contractId}, trying again later: ${error}`
|
||||
);
|
||||
} else {
|
||||
logger.warn(
|
||||
`[token-metadata] max retries reached while processing ${this.contractId}, giving up: ${error}`
|
||||
);
|
||||
try {
|
||||
const retries = await this.db.increaseTokenMetadataQueueEntryRetryCount(this.dbQueueId);
|
||||
if (
|
||||
getTokenMetadataProcessingMode() === TokenMetadataProcessingMode.strict ||
|
||||
retries <= getTokenMetadataMaxRetries()
|
||||
) {
|
||||
logger.info(
|
||||
`[token-metadata] a recoverable error happened while processing ${this.contractId}, trying again later: ${error}`
|
||||
);
|
||||
} else {
|
||||
logger.warn(
|
||||
`[token-metadata] max retries reached while processing ${this.contractId}, giving up: ${error}`
|
||||
);
|
||||
processingFinished = true;
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(error);
|
||||
processingFinished = true;
|
||||
}
|
||||
} else {
|
||||
// Something more serious happened, mark this contract as done.
|
||||
logger.error(error);
|
||||
processingFinished = true;
|
||||
}
|
||||
} finally {
|
||||
if (processingFinished) {
|
||||
await this.db.updateProcessedTokenMetadataQueueEntry(this.dbQueueId);
|
||||
logger.info(
|
||||
`[token-metadata] finished processing ${this.contractId} in ${sw.getElapsed()} ms`
|
||||
);
|
||||
try {
|
||||
await this.db.updateProcessedTokenMetadataQueueEntry(this.dbQueueId);
|
||||
logger.info(
|
||||
`[token-metadata] finished processing ${this.contractId} in ${sw.getElapsed()} ms`
|
||||
);
|
||||
} catch (error) {
|
||||
logger.error(error);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { logError, logger } from '../helpers';
|
||||
import { FoundOrNot, logError, logger } from '../helpers';
|
||||
import { Evt } from 'evt';
|
||||
import PQueue from 'p-queue';
|
||||
import { DbTokenMetadataQueueEntry, TokenMetadataUpdateInfo } from '../datastore/common';
|
||||
@@ -57,15 +57,18 @@ export class TokensProcessorQueue {
|
||||
return;
|
||||
}
|
||||
const queuedEntries = [...this.queuedEntries.keys()];
|
||||
entries = await this.db.getTokenMetadataQueue(
|
||||
TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT,
|
||||
queuedEntries
|
||||
);
|
||||
try {
|
||||
entries = await this.db.getTokenMetadataQueue(
|
||||
TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT,
|
||||
queuedEntries
|
||||
);
|
||||
} catch (error) {
|
||||
logger.error(error);
|
||||
}
|
||||
for (const entry of entries) {
|
||||
await this.queueHandler(entry);
|
||||
}
|
||||
await this.queue.onEmpty();
|
||||
// await this.queue.onIdle();
|
||||
} while (entries.length > 0 || this.queuedEntries.size > 0);
|
||||
}
|
||||
|
||||
@@ -76,10 +79,16 @@ export class TokensProcessorQueue {
|
||||
const queuedEntries = [...this.queuedEntries.keys()];
|
||||
const limit = TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT - this.queuedEntries.size;
|
||||
if (limit > 0) {
|
||||
const entries = await this.db.getTokenMetadataQueue(
|
||||
TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT,
|
||||
queuedEntries
|
||||
);
|
||||
let entries: DbTokenMetadataQueueEntry[];
|
||||
try {
|
||||
entries = await this.db.getTokenMetadataQueue(
|
||||
TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT,
|
||||
queuedEntries
|
||||
);
|
||||
} catch (error) {
|
||||
logger.error(error);
|
||||
return;
|
||||
}
|
||||
for (const entry of entries) {
|
||||
await this.queueHandler(entry);
|
||||
}
|
||||
@@ -87,7 +96,13 @@ export class TokensProcessorQueue {
|
||||
}
|
||||
|
||||
async queueNotificationHandler(queueId: number) {
|
||||
const queueEntry = await this.db.getTokenMetadataQueueEntry(queueId);
|
||||
let queueEntry: FoundOrNot<DbTokenMetadataQueueEntry>;
|
||||
try {
|
||||
queueEntry = await this.db.getTokenMetadataQueueEntry(queueId);
|
||||
} catch (error) {
|
||||
logger.error(error);
|
||||
return;
|
||||
}
|
||||
if (queueEntry.found) {
|
||||
await this.queueHandler(queueEntry.result);
|
||||
}
|
||||
@@ -105,8 +120,15 @@ export class TokensProcessorQueue {
|
||||
) {
|
||||
return;
|
||||
}
|
||||
const contractQuery = await this.db.getSmartContract(queueEntry.contractId);
|
||||
if (!contractQuery.found || !contractQuery.result.abi) {
|
||||
let abi: string;
|
||||
try {
|
||||
const contractQuery = await this.db.getSmartContract(queueEntry.contractId);
|
||||
if (!contractQuery.found || !contractQuery.result.abi) {
|
||||
return;
|
||||
}
|
||||
abi = contractQuery.result.abi;
|
||||
} catch (error) {
|
||||
logger.error(error);
|
||||
return;
|
||||
}
|
||||
logger.info(
|
||||
@@ -114,7 +136,7 @@ export class TokensProcessorQueue {
|
||||
);
|
||||
this.queuedEntries.set(queueEntry.queueId, queueEntry);
|
||||
|
||||
const contractAbi: ClarityAbi = JSON.parse(contractQuery.result.abi);
|
||||
const contractAbi: ClarityAbi = JSON.parse(abi);
|
||||
|
||||
const tokenContractHandler = new TokensContractHandler({
|
||||
contractId: queueEntry.contractId,
|
||||
|
||||
Reference in New Issue
Block a user