feat: add ENV configs for DB close and API shutdown timeouts (#1366)

* style: async/await in postgres classes

* chore: tag read only pg txs

* feat: pg close timeout

* feat: force kill timeout env

* fix: remaining read only

* fix: correct use of scoped sql conns

* fix: resolveOrTimeout promise handling
This commit is contained in:
Rafael Cárdenas
2022-10-20 08:40:01 -05:00
committed by GitHub
parent f15df41fa9
commit 444f008fe2
7 changed files with 180 additions and 117 deletions

6
.env
View File

@@ -9,6 +9,8 @@ PG_SSL=false
# PG_IDLE_TIMEOUT=30 # PG_IDLE_TIMEOUT=30
# Max connection lifetime in seconds, defaults to 60 # Max connection lifetime in seconds, defaults to 60
# PG_MAX_LIFETIME=60 # PG_MAX_LIFETIME=60
# Seconds before force-ending running queries on connection close, defaults to 5
# PG_CLOSE_TIMEOUT=5
# Can be any string, use to specify a use case specific to a deployment # Can be any string, use to specify a use case specific to a deployment
PG_APPLICATION_NAME=stacks-blockchain-api PG_APPLICATION_NAME=stacks-blockchain-api
@@ -33,6 +35,7 @@ PG_APPLICATION_NAME=stacks-blockchain-api
# PG_PRIMARY_SSL= # PG_PRIMARY_SSL=
# PG_PRIMARY_IDLE_TIMEOUT= # PG_PRIMARY_IDLE_TIMEOUT=
# PG_PRIMARY_MAX_LIFETIME= # PG_PRIMARY_MAX_LIFETIME=
# PG_PRIMARY_CLOSE_TIMEOUT=
# The connection URI below can be used in place of the PG variables above, # The connection URI below can be used in place of the PG variables above,
# but if enabled it must be defined without others or omitted. # but if enabled it must be defined without others or omitted.
# PG_PRIMARY_CONNECTION_URI= # PG_PRIMARY_CONNECTION_URI=
@@ -84,6 +87,9 @@ STACKS_CORE_RPC_PORT=20443
## configure the chainID/networkID; testnet: 0x80000000, mainnet: 0x00000001 ## configure the chainID/networkID; testnet: 0x80000000, mainnet: 0x00000001
STACKS_CHAIN_ID=0x00000001 STACKS_CHAIN_ID=0x00000001
# Seconds to allow API components to shut down gracefully before force-killing them, defaults to 60
# STACKS_SHUTDOWN_FORCE_KILL_TIMEOUT=60
BTC_RPC_HOST=http://127.0.0.1 BTC_RPC_HOST=http://127.0.0.1
BTC_RPC_PORT=18443 BTC_RPC_PORT=18443
BTC_RPC_USER=btc BTC_RPC_USER=btc

View File

@@ -115,6 +115,20 @@ export async function connectPostgres({
return sql; return sql;
} }
/**
* Retrieve a postgres ENV value depending on the target database server (read-replica/default or
* primary). We will fall back to read-replica values if a primary value was not given. See the
* `.env` file for more information on these options.
*/
export function getPgConnectionEnvValue(
name: string,
pgServer: PgServer = PgServer.default
): string | undefined {
return pgServer === PgServer.primary
? process.env[`PG_PRIMARY_${name}`] ?? process.env[`PG_${name}`]
: process.env[`PG_${name}`];
}
export function getPostgres({ export function getPostgres({
usageName, usageName,
pgServer, pgServer,
@@ -122,28 +136,21 @@ export function getPostgres({
usageName: string; usageName: string;
pgServer?: PgServer; pgServer?: PgServer;
}): PgSqlClient { }): PgSqlClient {
// Retrieve a postgres ENV value depending on the target database server (read-replica/default or primary).
// We will fall back to read-replica values if a primary value was not given.
// See the `.env` file for more information on these options.
const pgEnvValue = (name: string): string | undefined =>
pgServer === PgServer.primary
? process.env[`PG_PRIMARY_${name}`] ?? process.env[`PG_${name}`]
: process.env[`PG_${name}`];
const pgEnvVars = { const pgEnvVars = {
database: pgEnvValue('DATABASE'), database: getPgConnectionEnvValue('DATABASE', pgServer),
user: pgEnvValue('USER'), user: getPgConnectionEnvValue('USER', pgServer),
password: pgEnvValue('PASSWORD'), password: getPgConnectionEnvValue('PASSWORD', pgServer),
host: pgEnvValue('HOST'), host: getPgConnectionEnvValue('HOST', pgServer),
port: pgEnvValue('PORT'), port: getPgConnectionEnvValue('PORT', pgServer),
ssl: pgEnvValue('SSL'), ssl: getPgConnectionEnvValue('SSL', pgServer),
schema: pgEnvValue('SCHEMA'), schema: getPgConnectionEnvValue('SCHEMA', pgServer),
applicationName: pgEnvValue('APPLICATION_NAME'), applicationName: getPgConnectionEnvValue('APPLICATION_NAME', pgServer),
idleTimeout: parseInt(pgEnvValue('IDLE_TIMEOUT') ?? '30'), idleTimeout: parseInt(getPgConnectionEnvValue('IDLE_TIMEOUT', pgServer) ?? '30'),
maxLifetime: parseInt(pgEnvValue('MAX_LIFETIME') ?? '60'), maxLifetime: parseInt(getPgConnectionEnvValue('MAX_LIFETIME', pgServer) ?? '60'),
poolMax: parseInt(process.env['PG_CONNECTION_POOL_MAX'] ?? '10'), poolMax: parseInt(process.env['PG_CONNECTION_POOL_MAX'] ?? '10'),
}; };
const defaultAppName = 'stacks-blockchain-api'; const defaultAppName = 'stacks-blockchain-api';
const pgConnectionUri = pgEnvValue('CONNECTION_URI'); const pgConnectionUri = getPgConnectionEnvValue('CONNECTION_URI', pgServer);
const pgConfigEnvVar = Object.entries(pgEnvVars).find(([, v]) => typeof v === 'string')?.[0]; const pgConfigEnvVar = Object.entries(pgEnvVars).find(([, v]) => typeof v === 'string')?.[0];
if (pgConfigEnvVar && pgConnectionUri) { if (pgConfigEnvVar && pgConnectionUri) {
throw new Error( throw new Error(

View File

@@ -69,7 +69,7 @@ import {
StxUnlockEvent, StxUnlockEvent,
TransferQueryResult, TransferQueryResult,
} from './common'; } from './common';
import { connectPostgres, PgServer, PgSqlClient } from './connection'; import { connectPostgres, getPgConnectionEnvValue, PgServer, PgSqlClient } from './connection';
import { import {
abiColumn, abiColumn,
BLOCK_COLUMNS, BLOCK_COLUMNS,
@@ -110,6 +110,9 @@ export class PgStore {
readonly sql: PgSqlClient; readonly sql: PgSqlClient;
readonly eventEmitter: PgStoreEventEmitter; readonly eventEmitter: PgStoreEventEmitter;
readonly notifier?: PgNotifier; readonly notifier?: PgNotifier;
protected get closeTimeout(): number {
return parseInt(getPgConnectionEnvValue('CLOSE_TIMEOUT', PgServer.default) ?? '5');
}
constructor(sql: PgSqlClient, notifier: PgNotifier | undefined = undefined) { constructor(sql: PgSqlClient, notifier: PgNotifier | undefined = undefined) {
this.sql = sql; this.sql = sql;
@@ -133,7 +136,7 @@ export class PgStore {
async close(): Promise<void> { async close(): Promise<void> {
await this.notifier?.close(); await this.notifier?.close();
await this.sql.end(); await this.sql.end({ timeout: this.closeTimeout });
} }
/** /**
@@ -211,7 +214,7 @@ export class PgStore {
blockIdentifer: BlockIdentifier, blockIdentifer: BlockIdentifier,
metadata?: DbGetBlockWithMetadataOpts<TWithTxs, TWithMicroblocks> metadata?: DbGetBlockWithMetadataOpts<TWithTxs, TWithMicroblocks>
): Promise<FoundOrNot<DbGetBlockWithMetadataResponse<TWithTxs, TWithMicroblocks>>> { ): Promise<FoundOrNot<DbGetBlockWithMetadataResponse<TWithTxs, TWithMicroblocks>>> {
return await this.sql.begin(async sql => { return await this.sql.begin('READ ONLY', async sql => {
const block = await this.getBlockInternal(sql, blockIdentifer); const block = await this.getBlockInternal(sql, blockIdentifer);
if (!block.found) { if (!block.found) {
return { found: false }; return { found: false };
@@ -377,24 +380,6 @@ export class PgStore {
return { found: true, result: block } as const; return { found: true, result: block } as const;
} }
async getBlocks({ limit, offset }: { limit: number; offset: number }) {
return await this.sql.begin(async sql => {
const total = await sql<{ count: number }[]>`
SELECT block_count AS count FROM chain_tip
`;
const results = await sql<BlockQueryResult[]>`
SELECT ${sql(BLOCK_COLUMNS)}
FROM blocks
WHERE canonical = true
ORDER BY block_height DESC
LIMIT ${limit}
OFFSET ${offset}
`;
const parsed = results.map(r => parseBlockQueryResult(r));
return { results: parsed, total: total[0].count } as const;
});
}
/** /**
* Returns Block information with metadata, including accepted and streamed microblocks hash * Returns Block information with metadata, including accepted and streamed microblocks hash
* @returns `BlocksWithMetadata` object including list of Blocks with metadata and total count. * @returns `BlocksWithMetadata` object including list of Blocks with metadata and total count.
@@ -406,9 +391,21 @@ export class PgStore {
limit: number; limit: number;
offset: number; offset: number;
}): Promise<BlocksWithMetadata> { }): Promise<BlocksWithMetadata> {
return await this.sql.begin(async sql => { return await this.sql.begin('READ ONLY', async sql => {
// get block list // Get blocks with count.
const { results: blocks, total: block_count } = await this.getBlocks({ limit, offset }); const countQuery = await sql<{ count: number }[]>`
SELECT block_count AS count FROM chain_tip
`;
const block_count = countQuery[0].count;
const blocksQuery = await sql<BlockQueryResult[]>`
SELECT ${sql(BLOCK_COLUMNS)}
FROM blocks
WHERE canonical = true
ORDER BY block_height DESC
LIMIT ${limit}
OFFSET ${offset}
`;
const blocks = blocksQuery.map(r => parseBlockQueryResult(r));
const blockHashValues: string[] = []; const blockHashValues: string[] = [];
const indexBlockHashValues: string[] = []; const indexBlockHashValues: string[] = [];
blocks.forEach(block => { blocks.forEach(block => {
@@ -502,7 +499,7 @@ export class PgStore {
} }
async getBlockTxsRows(blockHash: string): Promise<FoundOrNot<DbTx[]>> { async getBlockTxsRows(blockHash: string): Promise<FoundOrNot<DbTx[]>> {
return await this.sql.begin(async sql => { return await this.sql.begin('READ ONLY', async sql => {
const blockQuery = await this.getBlockInternal(sql, { hash: blockHash }); const blockQuery = await this.getBlockInternal(sql, { hash: blockHash });
if (!blockQuery.found) { if (!blockQuery.found) {
throw new Error(`Could not find block by hash ${blockHash}`); throw new Error(`Could not find block by hash ${blockHash}`);
@@ -525,7 +522,7 @@ export class PgStore {
async getMicroblock(args: { async getMicroblock(args: {
microblockHash: string; microblockHash: string;
}): Promise<FoundOrNot<{ microblock: DbMicroblock; txs: string[] }>> { }): Promise<FoundOrNot<{ microblock: DbMicroblock; txs: string[] }>> {
return await this.sql.begin(async sql => { return await this.sql.begin('READ ONLY', async sql => {
const result = await sql<MicroblockQueryResult[]>` const result = await sql<MicroblockQueryResult[]>`
SELECT ${sql(MICROBLOCK_COLUMNS)} SELECT ${sql(MICROBLOCK_COLUMNS)}
FROM microblocks FROM microblocks
@@ -552,7 +549,7 @@ export class PgStore {
limit: number; limit: number;
offset: number; offset: number;
}): Promise<{ result: { microblock: DbMicroblock; txs: string[] }[]; total: number }> { }): Promise<{ result: { microblock: DbMicroblock; txs: string[] }[]; total: number }> {
return await this.sql.begin(async sql => { return await this.sql.begin('READ ONLY', async sql => {
const countQuery = await sql< const countQuery = await sql<
{ total: number }[] { total: number }[]
>`SELECT microblock_count AS total FROM chain_tip`; >`SELECT microblock_count AS total FROM chain_tip`;
@@ -603,7 +600,7 @@ export class PgStore {
} }
async getUnanchoredTxs(): Promise<{ txs: DbTx[] }> { async getUnanchoredTxs(): Promise<{ txs: DbTx[] }> {
return await this.sql.begin(async sql => { return await this.sql.begin('READ ONLY', async sql => {
return this.getUnanchoredTxsInternal(sql); return this.getUnanchoredTxsInternal(sql);
}); });
} }
@@ -612,7 +609,7 @@ export class PgStore {
stxAddress: string; stxAddress: string;
blockIdentifier: BlockIdentifier; blockIdentifier: BlockIdentifier;
}): Promise<FoundOrNot<{ lastExecutedTxNonce: number | null; possibleNextNonce: number }>> { }): Promise<FoundOrNot<{ lastExecutedTxNonce: number | null; possibleNextNonce: number }>> {
return await this.sql.begin(async sql => { return await this.sql.begin('READ ONLY', async sql => {
const dbBlock = await this.getBlockInternal(sql, args.blockIdentifier); const dbBlock = await this.getBlockInternal(sql, args.blockIdentifier);
if (!dbBlock.found) { if (!dbBlock.found) {
return { found: false }; return { found: false };
@@ -644,7 +641,7 @@ export class PgStore {
possibleNextNonce: number; possibleNextNonce: number;
detectedMissingNonces: number[]; detectedMissingNonces: number[];
}> { }> {
return await this.sql.begin(async sql => { return await this.sql.begin('READ ONLY', async sql => {
const executedTxNonce = await sql<{ nonce: number | null }[]>` const executedTxNonce = await sql<{ nonce: number | null }[]>`
SELECT MAX(nonce) nonce SELECT MAX(nonce) nonce
FROM txs FROM txs
@@ -790,7 +787,7 @@ export class PgStore {
limit: number, limit: number,
offset: number offset: number
): Promise<FoundOrNot<{ results: DbTx[]; total: number }>> { ): Promise<FoundOrNot<{ results: DbTx[]; total: number }>> {
return this.sql.begin(async sql => { return await this.sql.begin('READ ONLY', async sql => {
const blockQuery = await this.getBlockInternal(sql, blockIdentifer); const blockQuery = await this.getBlockInternal(sql, blockIdentifer);
if (!blockQuery.found) { if (!blockQuery.found) {
return { found: false }; return { found: false };
@@ -951,13 +948,13 @@ export class PgStore {
if (args.txIds.length === 0) { if (args.txIds.length === 0) {
return []; return [];
} }
return this.sql.begin(async client => { return await this.sql.begin('READ ONLY', async sql => {
const result = await this.sql<MempoolTxQueryResult[]>` const result = await sql<MempoolTxQueryResult[]>`
SELECT ${unsafeCols(this.sql, [...MEMPOOL_TX_COLUMNS, abiColumn('mempool_txs')])} SELECT ${unsafeCols(sql, [...MEMPOOL_TX_COLUMNS, abiColumn('mempool_txs')])}
FROM mempool_txs FROM mempool_txs
WHERE tx_id IN ${this.sql(args.txIds)} WHERE tx_id IN ${sql(args.txIds)}
`; `;
return await this.parseMempoolTransactions(result, client, args.includeUnanchored); return await this.parseMempoolTransactions(result, sql, args.includeUnanchored);
}); });
} }
@@ -969,8 +966,8 @@ export class PgStore {
txId: string; txId: string;
includeUnanchored: boolean; includeUnanchored: boolean;
includePruned?: boolean; includePruned?: boolean;
}) { }): Promise<FoundOrNot<DbMempoolTx>> {
return this.sql.begin(async sql => { return await this.sql.begin('READ ONLY', async sql => {
const result = await sql<MempoolTxQueryResult[]>` const result = await sql<MempoolTxQueryResult[]>`
SELECT ${unsafeCols(sql, [...MEMPOOL_TX_COLUMNS, abiColumn('mempool_txs')])} SELECT ${unsafeCols(sql, [...MEMPOOL_TX_COLUMNS, abiColumn('mempool_txs')])}
FROM mempool_txs FROM mempool_txs
@@ -1014,7 +1011,7 @@ export class PgStore {
limit: number; limit: number;
offset: number; offset: number;
}): Promise<{ results: DbMempoolTx[]; total: number }> { }): Promise<{ results: DbMempoolTx[]; total: number }> {
return await this.sql.begin(async sql => { return await this.sql.begin('READ ONLY', async sql => {
const droppedStatuses = [ const droppedStatuses = [
DbTxStatus.DroppedReplaceByFee, DbTxStatus.DroppedReplaceByFee,
DbTxStatus.DroppedReplaceAcrossFork, DbTxStatus.DroppedReplaceAcrossFork,
@@ -1051,7 +1048,7 @@ export class PgStore {
} }
async getMempoolStats({ lastBlockCount }: { lastBlockCount?: number }): Promise<DbMempoolStats> { async getMempoolStats({ lastBlockCount }: { lastBlockCount?: number }): Promise<DbMempoolStats> {
return await this.sql.begin(async sql => { return await this.sql.begin('READ ONLY', async sql => {
return await this.getMempoolStatsInternal({ sql, lastBlockCount }); return await this.getMempoolStatsInternal({ sql, lastBlockCount });
}); });
} }
@@ -1228,7 +1225,7 @@ export class PgStore {
recipientAddress?: string; recipientAddress?: string;
address?: string; address?: string;
}): Promise<{ results: DbMempoolTx[]; total: number }> { }): Promise<{ results: DbMempoolTx[]; total: number }> {
const queryResult = await this.sql.begin(async sql => { const queryResult = await this.sql.begin('READ ONLY', async sql => {
// If caller did not opt-in to unanchored tx data, then treat unanchored txs as pending mempool txs. // If caller did not opt-in to unanchored tx data, then treat unanchored txs as pending mempool txs.
const unanchoredTxs: string[] = !includeUnanchored const unanchoredTxs: string[] = !includeUnanchored
? (await this.getUnanchoredTxsInternal(sql)).txs.map(tx => tx.tx_id) ? (await this.getUnanchoredTxsInternal(sql)).txs.map(tx => tx.tx_id)
@@ -1289,8 +1286,14 @@ export class PgStore {
return { found: true, result: { digest: result[0].digest } }; return { found: true, result: { digest: result[0].digest } };
} }
async getTx({ txId, includeUnanchored }: { txId: string; includeUnanchored: boolean }) { async getTx({
return this.sql.begin(async sql => { txId,
includeUnanchored,
}: {
txId: string;
includeUnanchored: boolean;
}): Promise<FoundOrNot<DbTx>> {
return await this.sql.begin('READ ONLY', async sql => {
const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored }); const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored });
const result = await sql<ContractTxQueryResult[]>` const result = await sql<ContractTxQueryResult[]>`
SELECT ${unsafeCols(sql, [...TX_COLUMNS, abiColumn()])} SELECT ${unsafeCols(sql, [...TX_COLUMNS, abiColumn()])}
@@ -1330,16 +1333,14 @@ export class PgStore {
offset: number; offset: number;
txTypeFilter: TransactionType[]; txTypeFilter: TransactionType[];
includeUnanchored: boolean; includeUnanchored: boolean;
}) { }): Promise<{ results: DbTx[]; total: number }> {
let totalQuery: { count: number }[]; let totalQuery: { count: number }[];
let resultQuery: ContractTxQueryResult[]; let resultQuery: ContractTxQueryResult[];
return this.sql.begin(async sql => { return await this.sql.begin('READ ONLY', async sql => {
const maxHeight = await this.getMaxBlockHeight(sql, { includeUnanchored }); const maxHeight = await this.getMaxBlockHeight(sql, { includeUnanchored });
if (txTypeFilter.length === 0) { if (txTypeFilter.length === 0) {
totalQuery = await sql<{ count: number }[]>` totalQuery = await sql<{ count: number }[]>`
SELECT ${ SELECT ${includeUnanchored ? sql('tx_count_unanchored') : sql('tx_count')} AS count
includeUnanchored ? this.sql('tx_count_unanchored') : this.sql('tx_count')
} AS count
FROM chain_tip FROM chain_tip
`; `;
resultQuery = await sql<ContractTxQueryResult[]>` resultQuery = await sql<ContractTxQueryResult[]>`
@@ -1373,15 +1374,15 @@ export class PgStore {
}); });
} }
getTxListEvents(args: { async getTxListEvents(args: {
txs: { txs: {
txId: string; txId: string;
indexBlockHash: string; indexBlockHash: string;
}[]; }[];
limit: number; limit: number;
offset: number; offset: number;
}) { }): Promise<{ results: DbEvent[] }> {
return this.sql.begin(async sql => { return await this.sql.begin('READ ONLY', async sql => {
if (args.txs.length === 0) return { results: [] }; if (args.txs.length === 0) return { results: [] };
// TODO: This hack has to be done because postgres.js can't figure out how to interpolate // TODO: This hack has to be done because postgres.js can't figure out how to interpolate
// these `bytea` VALUES comparisons yet. // these `bytea` VALUES comparisons yet.
@@ -1494,13 +1495,18 @@ export class PgStore {
/** /**
* TODO investigate if this method needs be deprecated in favor of {@link getTransactionEvents} * TODO investigate if this method needs be deprecated in favor of {@link getTransactionEvents}
*/ */
async getTxEvents(args: { txId: string; indexBlockHash: string; limit: number; offset: number }) { async getTxEvents(args: {
txId: string;
indexBlockHash: string;
limit: number;
offset: number;
}): Promise<{ results: DbEvent[] }> {
// Note: when this is used to fetch events for an unanchored microblock tx, the `indexBlockHash` is empty // Note: when this is used to fetch events for an unanchored microblock tx, the `indexBlockHash` is empty
// which will cause the sql queries to also match micro-orphaned tx data (resulting in duplicate event results). // which will cause the sql queries to also match micro-orphaned tx data (resulting in duplicate event results).
// To prevent that, all micro-orphaned events are excluded using `microblock_orphaned=false`. // To prevent that, all micro-orphaned events are excluded using `microblock_orphaned=false`.
// That means, unlike regular orphaned txs, if a micro-orphaned tx is never re-mined, the micro-orphaned event data // That means, unlike regular orphaned txs, if a micro-orphaned tx is never re-mined, the micro-orphaned event data
// will never be returned. // will never be returned.
return this.sql.begin(async sql => { return await this.sql.begin('READ ONLY', async sql => {
const eventIndexStart = args.offset; const eventIndexStart = args.offset;
const eventIndexEnd = args.offset + args.limit - 1; const eventIndexEnd = args.offset + args.limit - 1;
const stxLockResults = await sql< const stxLockResults = await sql<
@@ -1617,8 +1623,8 @@ export class PgStore {
eventTypeFilter: DbEventTypeId[]; eventTypeFilter: DbEventTypeId[];
limit: number; limit: number;
offset: number; offset: number;
}) { }): Promise<{ results: DbEvent[] }> {
return this.sql.begin(async sql => { return await this.sql.begin('READ ONLY', async sql => {
const refValue = args.addressOrTxId.address ?? args.addressOrTxId.txId; const refValue = args.addressOrTxId.address ?? args.addressOrTxId.txId;
const isAddress = args.addressOrTxId.address !== undefined; const isAddress = args.addressOrTxId.address !== undefined;
const emptyEvents = sql`SELECT NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL`; const emptyEvents = sql`SELECT NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL`;
@@ -1947,7 +1953,7 @@ export class PgStore {
stxAddress: string; stxAddress: string;
includeUnanchored: boolean; includeUnanchored: boolean;
}): Promise<DbStxBalance> { }): Promise<DbStxBalance> {
return this.sql.begin(async sql => { return await this.sql.begin('READ ONLY', async sql => {
const blockQuery = await this.getCurrentBlockInternal(sql); const blockQuery = await this.getCurrentBlockInternal(sql);
if (!blockQuery.found) { if (!blockQuery.found) {
throw new Error(`Could not find current block`); throw new Error(`Could not find current block`);
@@ -1967,7 +1973,7 @@ export class PgStore {
} }
async getStxBalanceAtBlock(stxAddress: string, blockHeight: number): Promise<DbStxBalance> { async getStxBalanceAtBlock(stxAddress: string, blockHeight: number): Promise<DbStxBalance> {
return this.sql.begin(async sql => { return await this.sql.begin('READ ONLY', async sql => {
const chainTip = await this.getChainTip(sql); const chainTip = await this.getChainTip(sql);
const blockHeightToQuery = const blockHeightToQuery =
blockHeight > chainTip.blockHeight ? chainTip.blockHeight : blockHeight; blockHeight > chainTip.blockHeight ? chainTip.blockHeight : blockHeight;
@@ -2079,8 +2085,8 @@ export class PgStore {
blockHeight: number; blockHeight: number;
} }
| { includeUnanchored: boolean } | { includeUnanchored: boolean }
) { ): Promise<{ stx: bigint; blockHeight: number }> {
return this.sql.begin(async sql => { return await this.sql.begin('READ ONLY', async sql => {
let atBlockHeight: number; let atBlockHeight: number;
let atMatureBlockHeight: number; let atMatureBlockHeight: number;
if ('blockHeight' in args) { if ('blockHeight' in args) {
@@ -2337,7 +2343,7 @@ export class PgStore {
} }
async getTxStatus(txId: string): Promise<FoundOrNot<DbTxGlobalStatus>> { async getTxStatus(txId: string): Promise<FoundOrNot<DbTxGlobalStatus>> {
return this.sql.begin(async sql => { return await this.sql.begin('READ ONLY', async sql => {
const chainResult = await sql<DbTxGlobalStatus[]>` const chainResult = await sql<DbTxGlobalStatus[]>`
SELECT status, index_block_hash, microblock_hash SELECT status, index_block_hash, microblock_hash
FROM txs FROM txs
@@ -2663,7 +2669,7 @@ export class PgStore {
async searchHash({ hash }: { hash: string }): Promise<FoundOrNot<DbSearchResult>> { async searchHash({ hash }: { hash: string }): Promise<FoundOrNot<DbSearchResult>> {
// TODO(mb): add support for searching for microblock by hash // TODO(mb): add support for searching for microblock by hash
return this.sql.begin(async sql => { return await this.sql.begin('READ ONLY', async sql => {
const txQuery = await sql<ContractTxQueryResult[]>` const txQuery = await sql<ContractTxQueryResult[]>`
SELECT ${unsafeCols(sql, [...TX_COLUMNS, abiColumn()])} SELECT ${unsafeCols(sql, [...TX_COLUMNS, abiColumn()])}
FROM txs WHERE tx_id = ${hash} LIMIT 1 FROM txs WHERE tx_id = ${hash} LIMIT 1
@@ -2722,7 +2728,7 @@ export class PgStore {
entity_id: principal, entity_id: principal,
}, },
} as const; } as const;
return await this.sql.begin(async sql => { return await this.sql.begin('READ ONLY', async sql => {
if (isContract) { if (isContract) {
const contractMempoolTxResult = await sql<MempoolTxQueryResult[]>` const contractMempoolTxResult = await sql<MempoolTxQueryResult[]>`
SELECT ${unsafeCols(sql, [...MEMPOOL_TX_COLUMNS, abiColumn('mempool_txs')])} SELECT ${unsafeCols(sql, [...MEMPOOL_TX_COLUMNS, abiColumn('mempool_txs')])}
@@ -3114,7 +3120,7 @@ export class PgStore {
if (txIds.length === 0) { if (txIds.length === 0) {
return []; return [];
} }
return this.sql.begin(async sql => { return await this.sql.begin('READ ONLY', async sql => {
const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored }); const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored });
const result = await sql<ContractTxQueryResult[]>` const result = await sql<ContractTxQueryResult[]>`
SELECT ${unsafeCols(sql, [...TX_COLUMNS, abiColumn()])} SELECT ${unsafeCols(sql, [...TX_COLUMNS, abiColumn()])}
@@ -3134,7 +3140,7 @@ export class PgStore {
} }
async getNamespaceList({ includeUnanchored }: { includeUnanchored: boolean }) { async getNamespaceList({ includeUnanchored }: { includeUnanchored: boolean }) {
const queryResult = await this.sql.begin(async sql => { const queryResult = await this.sql.begin('READ ONLY', async sql => {
const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored }); const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored });
return await sql<{ namespace_id: string }[]>` return await sql<{ namespace_id: string }[]>`
SELECT DISTINCT ON (namespace_id) namespace_id SELECT DISTINCT ON (namespace_id) namespace_id
@@ -3160,7 +3166,7 @@ export class PgStore {
results: string[]; results: string[];
}> { }> {
const offset = page * 100; const offset = page * 100;
const queryResult = await this.sql.begin(async sql => { const queryResult = await this.sql.begin('READ ONLY', async sql => {
const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored }); const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored });
return await sql<{ name: string }[]>` return await sql<{ name: string }[]>`
SELECT DISTINCT ON (name) name SELECT DISTINCT ON (name) name
@@ -3184,7 +3190,7 @@ export class PgStore {
namespace: string; namespace: string;
includeUnanchored: boolean; includeUnanchored: boolean;
}): Promise<FoundOrNot<DbBnsNamespace & { index_block_hash: string }>> { }): Promise<FoundOrNot<DbBnsNamespace & { index_block_hash: string }>> {
const queryResult = await this.sql.begin(async sql => { const queryResult = await this.sql.begin('READ ONLY', async sql => {
const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored }); const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored });
return await sql<(DbBnsNamespace & { tx_id: string; index_block_hash: string })[]>` return await sql<(DbBnsNamespace & { tx_id: string; index_block_hash: string })[]>`
SELECT DISTINCT ON (namespace_id) namespace_id, * SELECT DISTINCT ON (namespace_id) namespace_id, *
@@ -3218,7 +3224,7 @@ export class PgStore {
includeUnanchored: boolean; includeUnanchored: boolean;
chainId: ChainID; chainId: ChainID;
}): Promise<FoundOrNot<DbBnsName & { index_block_hash: string }>> { }): Promise<FoundOrNot<DbBnsName & { index_block_hash: string }>> {
const queryResult = await this.sql.begin(async sql => { const queryResult = await this.sql.begin('READ ONLY', async sql => {
const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored }); const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored });
const nameZonefile = await sql<(DbBnsName & { tx_id: string; index_block_hash: string })[]>` const nameZonefile = await sql<(DbBnsName & { tx_id: string; index_block_hash: string })[]>`
SELECT n.*, z.zonefile SELECT n.*, z.zonefile
@@ -3257,7 +3263,7 @@ export class PgStore {
zoneFileHash: string; zoneFileHash: string;
includeUnanchored: boolean; includeUnanchored: boolean;
}): Promise<FoundOrNot<DbBnsZoneFile>> { }): Promise<FoundOrNot<DbBnsZoneFile>> {
const queryResult = await this.sql.begin(async sql => { const queryResult = await this.sql.begin('READ ONLY', async sql => {
const maxBlockHeight = await this.getMaxBlockHeight(sql, { const maxBlockHeight = await this.getMaxBlockHeight(sql, {
includeUnanchored: args.includeUnanchored, includeUnanchored: args.includeUnanchored,
}); });
@@ -3313,7 +3319,7 @@ export class PgStore {
name: string; name: string;
includeUnanchored: boolean; includeUnanchored: boolean;
}): Promise<FoundOrNot<DbBnsZoneFile>> { }): Promise<FoundOrNot<DbBnsZoneFile>> {
const queryResult = await this.sql.begin(async sql => { const queryResult = await this.sql.begin('READ ONLY', async sql => {
const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored }); const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored });
// Depending on the kind of name we got, use the correct table to pivot on canonical chain // Depending on the kind of name we got, use the correct table to pivot on canonical chain
// state to get the zonefile. We can't pivot on the `txs` table because some names/subdomains // state to get the zonefile. We can't pivot on the `txs` table because some names/subdomains
@@ -3366,7 +3372,7 @@ export class PgStore {
includeUnanchored: boolean; includeUnanchored: boolean;
chainId: ChainID; chainId: ChainID;
}): Promise<FoundOrNot<string[]>> { }): Promise<FoundOrNot<string[]>> {
const queryResult = await this.sql.begin(async sql => { const queryResult = await this.sql.begin('READ ONLY', async sql => {
const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored }); const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored });
// 1. Get subdomains owned by this address. // 1. Get subdomains owned by this address.
// These don't produce NFT events so we have to look directly at the `subdomains` table. // These don't produce NFT events so we have to look directly at the `subdomains` table.
@@ -3450,7 +3456,7 @@ export class PgStore {
name: string; name: string;
includeUnanchored: boolean; includeUnanchored: boolean;
}): Promise<{ results: string[] }> { }): Promise<{ results: string[] }> {
const queryResult = await this.sql.begin(async sql => { const queryResult = await this.sql.begin('READ ONLY', async sql => {
const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored }); const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored });
return await sql<{ fully_qualified_subdomain: string }[]>` return await sql<{ fully_qualified_subdomain: string }[]>`
SELECT DISTINCT ON (fully_qualified_subdomain) fully_qualified_subdomain SELECT DISTINCT ON (fully_qualified_subdomain) fully_qualified_subdomain
@@ -3474,7 +3480,7 @@ export class PgStore {
includeUnanchored: boolean; includeUnanchored: boolean;
}) { }) {
const offset = page * 100; const offset = page * 100;
const queryResult = await this.sql.begin(async sql => { const queryResult = await this.sql.begin('READ ONLY', async sql => {
const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored }); const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored });
return await sql<{ fully_qualified_subdomain: string }[]>` return await sql<{ fully_qualified_subdomain: string }[]>`
SELECT DISTINCT ON (fully_qualified_subdomain) fully_qualified_subdomain SELECT DISTINCT ON (fully_qualified_subdomain) fully_qualified_subdomain
@@ -3492,7 +3498,7 @@ export class PgStore {
async getNamesList({ page, includeUnanchored }: { page: number; includeUnanchored: boolean }) { async getNamesList({ page, includeUnanchored }: { page: number; includeUnanchored: boolean }) {
const offset = page * 100; const offset = page * 100;
const queryResult = await this.sql.begin(async sql => { const queryResult = await this.sql.begin('READ ONLY', async sql => {
const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored }); const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored });
return await sql<{ name: string }[]>` return await sql<{ name: string }[]>`
SELECT DISTINCT ON (name) name SELECT DISTINCT ON (name) name
@@ -3515,7 +3521,7 @@ export class PgStore {
subdomain: string; subdomain: string;
includeUnanchored: boolean; includeUnanchored: boolean;
}): Promise<FoundOrNot<DbBnsSubdomain & { index_block_hash: string }>> { }): Promise<FoundOrNot<DbBnsSubdomain & { index_block_hash: string }>> {
const queryResult = await this.sql.begin(async sql => { const queryResult = await this.sql.begin('READ ONLY', async sql => {
const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored }); const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored });
return await sql<(DbBnsSubdomain & { tx_id: string; index_block_hash: string })[]>` return await sql<(DbBnsSubdomain & { tx_id: string; index_block_hash: string })[]>`
SELECT s.*, z.zonefile SELECT s.*, z.zonefile
@@ -3602,7 +3608,7 @@ export class PgStore {
} }
async getUnlockedAddressesAtBlock(block: DbBlock): Promise<StxUnlockEvent[]> { async getUnlockedAddressesAtBlock(block: DbBlock): Promise<StxUnlockEvent[]> {
return this.sql.begin(async client => { return await this.sql.begin('READ ONLY', async client => {
return await this.internalGetUnlockedAccountsAtHeight(client, block); return await this.internalGetUnlockedAccountsAtHeight(client, block);
}); });
} }
@@ -3724,14 +3730,14 @@ export class PgStore {
} }
} }
getFtMetadataList({ async getFtMetadataList({
limit, limit,
offset, offset,
}: { }: {
limit: number; limit: number;
offset: number; offset: number;
}): Promise<{ results: DbFungibleTokenMetadata[]; total: number }> { }): Promise<{ results: DbFungibleTokenMetadata[]; total: number }> {
return this.sql.begin(async sql => { return await this.sql.begin('READ ONLY', async sql => {
const totalQuery = await sql<{ count: number }[]>` const totalQuery = await sql<{ count: number }[]>`
SELECT COUNT(*)::integer SELECT COUNT(*)::integer
FROM ft_metadata FROM ft_metadata
@@ -3761,14 +3767,14 @@ export class PgStore {
}); });
} }
getNftMetadataList({ async getNftMetadataList({
limit, limit,
offset, offset,
}: { }: {
limit: number; limit: number;
offset: number; offset: number;
}): Promise<{ results: DbNonFungibleTokenMetadata[]; total: number }> { }): Promise<{ results: DbNonFungibleTokenMetadata[]; total: number }> {
return this.sql.begin(async sql => { return await this.sql.begin('READ ONLY', async sql => {
const totalQuery = await sql<{ count: number }[]>` const totalQuery = await sql<{ count: number }[]>`
SELECT COUNT(*)::integer SELECT COUNT(*)::integer
FROM nft_metadata FROM nft_metadata

View File

@@ -1,4 +1,3 @@
import * as fs from 'fs';
import { logger, logError, getOrAdd, batchIterate, isProdEnv, I32_MAX } from '../helpers'; import { logger, logError, getOrAdd, batchIterate, isProdEnv, I32_MAX } from '../helpers';
import { import {
DbBlock, DbBlock,
@@ -74,7 +73,13 @@ import {
} from './helpers'; } from './helpers';
import { PgNotifier } from './pg-notifier'; import { PgNotifier } from './pg-notifier';
import { PgStore } from './pg-store'; import { PgStore } from './pg-store';
import { connectPostgres, PgJsonb, PgServer, PgSqlClient } from './connection'; import {
connectPostgres,
getPgConnectionEnvValue,
PgJsonb,
PgServer,
PgSqlClient,
} from './connection';
import { runMigrations } from './migrations'; import { runMigrations } from './migrations';
import { getPgClientConfig } from './connection-legacy'; import { getPgClientConfig } from './connection-legacy';
import { isProcessableTokenMetadata } from '../token-metadata/helpers'; import { isProcessableTokenMetadata } from '../token-metadata/helpers';
@@ -96,7 +101,9 @@ class MicroblockGapError extends Error {
*/ */
export class PgWriteStore extends PgStore { export class PgWriteStore extends PgStore {
readonly isEventReplay: boolean; readonly isEventReplay: boolean;
private cachedParameterizedInsertStrings = new Map<string, string>(); protected get closeTimeout(): number {
return parseInt(getPgConnectionEnvValue('CLOSE_TIMEOUT', PgServer.primary) ?? '5');
}
constructor( constructor(
sql: PgSqlClient, sql: PgSqlClient,
@@ -1182,7 +1189,7 @@ export class PgWriteStore extends PgStore {
burnchainBlockHeight: number; burnchainBlockHeight: number;
rewards: DbBurnchainReward[]; rewards: DbBurnchainReward[];
}): Promise<void> { }): Promise<void> {
return this.sql.begin(async sql => { return await this.sql.begin(async sql => {
const existingRewards = await sql< const existingRewards = await sql<
{ {
reward_recipient: string; reward_recipient: string;
@@ -1331,16 +1338,13 @@ export class PgWriteStore extends PgStore {
} }
async dropMempoolTxs({ status, txIds }: { status: DbTxStatus; txIds: string[] }): Promise<void> { async dropMempoolTxs({ status, txIds }: { status: DbTxStatus; txIds: string[] }): Promise<void> {
let updatedTxs: DbMempoolTx[] = []; const updateResults = await this.sql<MempoolTxQueryResult[]>`
await this.sql.begin(async sql => { UPDATE mempool_txs
const updateResults = await sql<MempoolTxQueryResult[]>` SET pruned = true, status = ${status}
UPDATE mempool_txs WHERE tx_id IN ${this.sql(txIds)}
SET pruned = true, status = ${status} RETURNING ${this.sql(MEMPOOL_TX_COLUMNS)}
WHERE tx_id IN ${sql(txIds)} `;
RETURNING ${sql(MEMPOOL_TX_COLUMNS)} const updatedTxs = updateResults.map(r => parseMempoolTxQueryResult(r));
`;
updatedTxs = updateResults.map(r => parseMempoolTxQueryResult(r));
});
await this.refreshMaterializedView('mempool_digest'); await this.refreshMaterializedView('mempool_digest');
for (const tx of updatedTxs) { for (const tx of updatedTxs) {
await this.notifier?.sendTx({ txId: tx.tx_id }); await this.notifier?.sendTx({ txId: tx.tx_id });

View File

@@ -705,10 +705,11 @@ export async function resolveOrTimeout(
) { ) {
let timer: NodeJS.Timeout; let timer: NodeJS.Timeout;
const result = await Promise.race([ const result = await Promise.race([
new Promise(async (resolve, _) => { new Promise((resolve, reject) => {
await promise; promise
clearTimeout(timer); .then(() => resolve(true))
resolve(true); .catch(error => reject(error))
.finally(() => clearTimeout(timer));
}), }),
new Promise((resolve, _) => { new Promise((resolve, _) => {
timer = setInterval(() => { timer = setInterval(() => {

View File

@@ -19,7 +19,7 @@ async function startShutdown() {
return; return;
} }
isShuttingDown = true; isShuttingDown = true;
const timeoutMs = 5000; const timeoutMs = parseInt(process.env['STACKS_SHUTDOWN_FORCE_KILL_TIMEOUT'] ?? '60') * 1000;
let errorEncountered = false; let errorEncountered = false;
for (const config of shutdownConfigs) { for (const config of shutdownConfigs) {
try { try {

View File

@@ -25,7 +25,7 @@ import { getBlocksWithMetadata, parseDbEvent } from '../api/controllers/db-contr
import * as assert from 'assert'; import * as assert from 'assert';
import { PgWriteStore } from '../datastore/pg-write-store'; import { PgWriteStore } from '../datastore/pg-write-store';
import { cycleMigrations, runMigrations } from '../datastore/migrations'; import { cycleMigrations, runMigrations } from '../datastore/migrations';
import { getPostgres, PgSqlClient } from '../datastore/connection'; import { getPostgres, PgServer, PgSqlClient } from '../datastore/connection';
import { bnsNameCV, bufferToHexPrefixString, I32_MAX } from '../helpers'; import { bnsNameCV, bufferToHexPrefixString, I32_MAX } from '../helpers';
import { ChainID } from '@stacks/transactions'; import { ChainID } from '@stacks/transactions';
import { TestBlockBuilder } from '../test-utils/test-builders'; import { TestBlockBuilder } from '../test-utils/test-builders';
@@ -215,6 +215,45 @@ describe('postgres datastore', () => {
); );
}); });
test('postgres primary env var config fallback', () => {
testEnvVars(
{
PG_CONNECTION_URI: undefined,
PG_DATABASE: 'pg_db_db1',
PG_USER: 'pg_user_user1',
PG_PASSWORD: 'pg_password_password1',
PG_HOST: 'pg_host_host1',
PG_PORT: '9876',
PG_SSL: 'true',
PG_SCHEMA: 'pg_schema_schema1',
PG_APPLICATION_NAME: 'test-env-vars',
PG_MAX_LIFETIME: '5',
PG_IDLE_TIMEOUT: '1',
// Primary values:
PG_PRIMARY_DATABASE: 'primary_db',
PG_PRIMARY_USER: 'primary_user',
PG_PRIMARY_PASSWORD: 'primary_password',
PG_PRIMARY_HOST: 'primary_host',
PG_PRIMARY_PORT: '9999',
},
() => {
const sql = getPostgres({ usageName: 'tests', pgServer: PgServer.primary });
// Primary values take precedence.
expect(sql.options.database).toBe('primary_db');
expect(sql.options.user).toBe('primary_user');
expect(sql.options.pass).toBe('primary_password');
expect(sql.options.host).toStrictEqual(['primary_host']);
expect(sql.options.port).toStrictEqual([9999]);
// Other values come from defaults.
expect(sql.options.ssl).toBe(true);
expect(sql.options.max_lifetime).toBe(5);
expect(sql.options.idle_timeout).toBe(1);
expect(sql.options.connection.search_path).toBe('pg_schema_schema1');
expect(sql.options.connection.application_name).toBe('test-env-vars:tests');
}
);
});
test('postgres connection application_name', async () => { test('postgres connection application_name', async () => {
await testEnvVars( await testEnvVars(
{ {