fix: catch pg exceptions on queries outside of express (#1348)

* fix: catch db exceptions in token processor

* feat: return 503 on API postgres errors

* fix: add postgres.js connection errors

* test: token queue and handler

* fix: block_count on limit=0 for blocks
This commit is contained in:
Rafael Cárdenas
2022-10-12 10:24:14 -05:00
committed by GitHub
parent fa08a9c9a9
commit 1f07b8587c
8 changed files with 109 additions and 34 deletions

View File

@@ -5,8 +5,6 @@ import * as expressWinston from 'express-winston';
import * as winston from 'winston';
import { v4 as uuid } from 'uuid';
import * as cors from 'cors';
import * as WebSocket from 'ws';
import * as SocketIO from 'socket.io';
import { createTxRouter } from './routes/tx';
import { createDebugRouter } from './routes/debug';
@@ -47,6 +45,7 @@ import * as fs from 'fs';
import { PgStore } from '../datastore/pg-store';
import { PgWriteStore } from '../datastore/pg-write-store';
import { WebSocketTransmitter } from './routes/ws/web-socket-transmitter';
import { isPgConnectionError } from '../datastore/helpers';
export interface ApiServer {
expressApp: express.Express;
@@ -300,6 +299,8 @@ export async function startApiServer(opts: {
if (error && !res.headersSent) {
if (error instanceof InvalidRequestError) {
res.status(error.status).json({ error: error.message }).end();
} else if (isPgConnectionError(error)) {
res.status(503).json({ error: `The database service is unavailable` }).end();
} else {
res.status(500);
const errorTag = uuid();

View File

@@ -184,6 +184,14 @@ export function isPgConnectionError(error: any): string | false {
return 'Postgres connection ENOTFOUND';
} else if (error.code === 'ECONNRESET') {
return 'Postgres connection ECONNRESET';
} else if (error.code === 'CONNECTION_CLOSED') {
return 'Postgres connection CONNECTION_CLOSED';
} else if (error.code === 'CONNECTION_ENDED') {
return 'Postgres connection CONNECTION_ENDED';
} else if (error.code === 'CONNECTION_DESTROYED') {
return 'Postgres connection CONNECTION_DESTROYED';
} else if (error.code === 'CONNECTION_CONNECT_TIMEOUT') {
return 'Postgres connection CONNECTION_CONNECT_TIMEOUT';
} else if (error.message) {
const msg = (error as Error).message.toLowerCase();
if (msg.includes('database system is starting up')) {

View File

@@ -420,7 +420,7 @@ export class PgStore {
if (blockHashValues.length === 0) {
return {
results: [],
total: 0,
total: block_count,
};
}

View File

@@ -12,6 +12,7 @@ import { stringCV } from '@stacks/transactions/dist/clarity/types/stringCV';
import { getTokenMetadataFetchTimeoutMs } from '../token-metadata/helpers';
import { PgWriteStore } from '../datastore/pg-write-store';
import { cycleMigrations, runMigrations } from '../datastore/migrations';
import { TokensProcessorQueue } from '../token-metadata/tokens-processor-queue';
const NFT_CONTRACT_ABI: ClarityAbi = {
maps: [],
@@ -189,6 +190,33 @@ describe('token metadata strict mode', () => {
expect(entry.result?.processed).toBe(false);
});
test('db errors are handled gracefully in contract handler', async () => {
process.env['STACKS_CORE_RPC_PORT'] = '11111'; // Make node unreachable
process.env['STACKS_API_TOKEN_METADATA_STRICT_MODE'] = '1';
process.env['STACKS_API_TOKEN_METADATA_MAX_RETRIES'] = '0';
const handler = new TokensContractHandler({
contractId: contractId,
smartContractAbi: NFT_CONTRACT_ABI,
datastore: db,
chainId: ChainID.Testnet,
txId: contractTxId,
dbQueueId: 1,
});
await db.close(); // End connection to trigger postgres error
await expect(handler.start()).resolves.not.toThrow();
});
test('db errors are handled gracefully in queue', async () => {
const queue = new TokensProcessorQueue(db, ChainID.Testnet);
await db.close(); // End connection to trigger postgres error
await expect(queue.checkDbQueue()).resolves.not.toThrow();
await expect(queue.drainDbQueue()).resolves.not.toThrow();
await expect(queue.queueNotificationHandler(1)).resolves.not.toThrow();
await expect(
queue.queueHandler({ queueId: 1, txId: '0x11', contractId: 'test' })
).resolves.not.toThrow();
});
test('node runtime errors get retried', async () => {
const mockResponse = {
okay: false,

View File

@@ -14,7 +14,6 @@ import {
DbNonFungibleTokenMetadata,
} from '../datastore/common';
import { startApiServer, ApiServer } from '../api/init';
import { PoolClient } from 'pg';
import * as fs from 'fs';
import { EventStreamServer, startEventServer } from '../event-stream/event-server';
import { getStacksTestnetNetwork } from '../rosetta-helpers';

View File

@@ -293,6 +293,13 @@ describe('other tests', () => {
expect(result.body.status).toBe('ready');
});
test('database unavailable responses', async () => {
// Close connection so we get an error.
await db.close();
const result = await supertest(api.server).get(`/extended/v1/block/`);
expect(result.body.error).toBe('The database service is unavailable');
});
afterEach(async () => {
await api.terminate();
await db?.close();

View File

@@ -149,30 +149,40 @@ export class TokensContractHandler {
processingFinished = true;
} catch (error) {
if (error instanceof RetryableTokenMetadataError) {
const retries = await this.db.increaseTokenMetadataQueueEntryRetryCount(this.dbQueueId);
if (
getTokenMetadataProcessingMode() === TokenMetadataProcessingMode.strict ||
retries <= getTokenMetadataMaxRetries()
) {
logger.info(
`[token-metadata] a recoverable error happened while processing ${this.contractId}, trying again later: ${error}`
);
} else {
logger.warn(
`[token-metadata] max retries reached while processing ${this.contractId}, giving up: ${error}`
);
try {
const retries = await this.db.increaseTokenMetadataQueueEntryRetryCount(this.dbQueueId);
if (
getTokenMetadataProcessingMode() === TokenMetadataProcessingMode.strict ||
retries <= getTokenMetadataMaxRetries()
) {
logger.info(
`[token-metadata] a recoverable error happened while processing ${this.contractId}, trying again later: ${error}`
);
} else {
logger.warn(
`[token-metadata] max retries reached while processing ${this.contractId}, giving up: ${error}`
);
processingFinished = true;
}
} catch (error) {
logger.error(error);
processingFinished = true;
}
} else {
// Something more serious happened, mark this contract as done.
logger.error(error);
processingFinished = true;
}
} finally {
if (processingFinished) {
await this.db.updateProcessedTokenMetadataQueueEntry(this.dbQueueId);
logger.info(
`[token-metadata] finished processing ${this.contractId} in ${sw.getElapsed()} ms`
);
try {
await this.db.updateProcessedTokenMetadataQueueEntry(this.dbQueueId);
logger.info(
`[token-metadata] finished processing ${this.contractId} in ${sw.getElapsed()} ms`
);
} catch (error) {
logger.error(error);
}
}
}
}

View File

@@ -1,4 +1,4 @@
import { logError, logger } from '../helpers';
import { FoundOrNot, logError, logger } from '../helpers';
import { Evt } from 'evt';
import PQueue from 'p-queue';
import { DbTokenMetadataQueueEntry, TokenMetadataUpdateInfo } from '../datastore/common';
@@ -57,15 +57,18 @@ export class TokensProcessorQueue {
return;
}
const queuedEntries = [...this.queuedEntries.keys()];
entries = await this.db.getTokenMetadataQueue(
TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT,
queuedEntries
);
try {
entries = await this.db.getTokenMetadataQueue(
TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT,
queuedEntries
);
} catch (error) {
logger.error(error);
}
for (const entry of entries) {
await this.queueHandler(entry);
}
await this.queue.onEmpty();
// await this.queue.onIdle();
} while (entries.length > 0 || this.queuedEntries.size > 0);
}
@@ -76,10 +79,16 @@ export class TokensProcessorQueue {
const queuedEntries = [...this.queuedEntries.keys()];
const limit = TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT - this.queuedEntries.size;
if (limit > 0) {
const entries = await this.db.getTokenMetadataQueue(
TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT,
queuedEntries
);
let entries: DbTokenMetadataQueueEntry[];
try {
entries = await this.db.getTokenMetadataQueue(
TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT,
queuedEntries
);
} catch (error) {
logger.error(error);
return;
}
for (const entry of entries) {
await this.queueHandler(entry);
}
@@ -87,7 +96,13 @@ export class TokensProcessorQueue {
}
async queueNotificationHandler(queueId: number) {
const queueEntry = await this.db.getTokenMetadataQueueEntry(queueId);
let queueEntry: FoundOrNot<DbTokenMetadataQueueEntry>;
try {
queueEntry = await this.db.getTokenMetadataQueueEntry(queueId);
} catch (error) {
logger.error(error);
return;
}
if (queueEntry.found) {
await this.queueHandler(queueEntry.result);
}
@@ -105,8 +120,15 @@ export class TokensProcessorQueue {
) {
return;
}
const contractQuery = await this.db.getSmartContract(queueEntry.contractId);
if (!contractQuery.found || !contractQuery.result.abi) {
let abi: string;
try {
const contractQuery = await this.db.getSmartContract(queueEntry.contractId);
if (!contractQuery.found || !contractQuery.result.abi) {
return;
}
abi = contractQuery.result.abi;
} catch (error) {
logger.error(error);
return;
}
logger.info(
@@ -114,7 +136,7 @@ export class TokensProcessorQueue {
);
this.queuedEntries.set(queueEntry.queueId, queueEntry);
const contractAbi: ClarityAbi = JSON.parse(contractQuery.result.abi);
const contractAbi: ClarityAbi = JSON.parse(abi);
const tokenContractHandler = new TokensContractHandler({
contractId: queueEntry.contractId,