feat: add ENV configs for DB close and API shutdown timeouts (#1366)

* style: async/await in postgres classes

* chore: tag read only pg txs

* feat: pg close timeout

* feat: force kill timeout env

* fix: remaining read only

* fix: correct use of scoped sql conns

* fix: resolveOrTimeout promise handling

Author: Rafael Cárdenas
Date: 2022-10-20 08:40:01 -05:00 (committed by GitHub)
Parent: f15df41fa9
Commit: 444f008fe2
7 changed files with 180 additions and 117 deletions

.env

@@ -9,6 +9,8 @@ PG_SSL=false
# PG_IDLE_TIMEOUT=30
# Max connection lifetime in seconds, defaults to 60
# PG_MAX_LIFETIME=60
+# Seconds before force-ending running queries on connection close, defaults to 5
+# PG_CLOSE_TIMEOUT=5
# Can be any string, use to specify a use case specific to a deployment
PG_APPLICATION_NAME=stacks-blockchain-api
@@ -33,6 +35,7 @@ PG_APPLICATION_NAME=stacks-blockchain-api
# PG_PRIMARY_SSL=
# PG_PRIMARY_IDLE_TIMEOUT=
# PG_PRIMARY_MAX_LIFETIME=
+# PG_PRIMARY_CLOSE_TIMEOUT=
# The connection URI below can be used in place of the PG variables above,
# but if it is set, the other PG variables above must be omitted.
# PG_PRIMARY_CONNECTION_URI=
@@ -84,6 +87,9 @@ STACKS_CORE_RPC_PORT=20443
## configure the chainID/networkID; testnet: 0x80000000, mainnet: 0x00000001
STACKS_CHAIN_ID=0x00000001
+# Seconds to allow API components to shut down gracefully before force-killing them, defaults to 60
+# STACKS_SHUTDOWN_FORCE_KILL_TIMEOUT=60
BTC_RPC_HOST=http://127.0.0.1
BTC_RPC_PORT=18443
BTC_RPC_USER=btc
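
Taken together, these knobs bound both how long Postgres connections may wait on running queries at close and how long API components get to shut down before being force-killed. An illustrative (non-default) configuration:

# Let running queries finish for up to 10 seconds when closing PG connections.
PG_CLOSE_TIMEOUT=10
# Use a tighter close window for the primary (event-writer) server.
PG_PRIMARY_CLOSE_TIMEOUT=5
# Give API components 30 seconds to shut down gracefully before force-kill.
STACKS_SHUTDOWN_FORCE_KILL_TIMEOUT=30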

src/datastore/connection.ts

@@ -115,6 +115,20 @@ export async function connectPostgres({
return sql;
}
+/**
+ * Retrieve a postgres ENV value depending on the target database server (read-replica/default or
+ * primary). We will fall back to read-replica values if a primary value was not given. See the
+ * `.env` file for more information on these options.
+ */
+export function getPgConnectionEnvValue(
+  name: string,
+  pgServer: PgServer = PgServer.default
+): string | undefined {
+  return pgServer === PgServer.primary
+    ? process.env[`PG_PRIMARY_${name}`] ?? process.env[`PG_${name}`]
+    : process.env[`PG_${name}`];
+}
export function getPostgres({
usageName,
pgServer,
@@ -122,28 +136,21 @@ export function getPostgres({
usageName: string;
pgServer?: PgServer;
}): PgSqlClient {
-  // Retrieve a postgres ENV value depending on the target database server (read-replica/default or primary).
-  // We will fall back to read-replica values if a primary value was not given.
-  // See the `.env` file for more information on these options.
-  const pgEnvValue = (name: string): string | undefined =>
-    pgServer === PgServer.primary
-      ? process.env[`PG_PRIMARY_${name}`] ?? process.env[`PG_${name}`]
-      : process.env[`PG_${name}`];
const pgEnvVars = {
-    database: pgEnvValue('DATABASE'),
-    user: pgEnvValue('USER'),
-    password: pgEnvValue('PASSWORD'),
-    host: pgEnvValue('HOST'),
-    port: pgEnvValue('PORT'),
-    ssl: pgEnvValue('SSL'),
-    schema: pgEnvValue('SCHEMA'),
-    applicationName: pgEnvValue('APPLICATION_NAME'),
-    idleTimeout: parseInt(pgEnvValue('IDLE_TIMEOUT') ?? '30'),
-    maxLifetime: parseInt(pgEnvValue('MAX_LIFETIME') ?? '60'),
+    database: getPgConnectionEnvValue('DATABASE', pgServer),
+    user: getPgConnectionEnvValue('USER', pgServer),
+    password: getPgConnectionEnvValue('PASSWORD', pgServer),
+    host: getPgConnectionEnvValue('HOST', pgServer),
+    port: getPgConnectionEnvValue('PORT', pgServer),
+    ssl: getPgConnectionEnvValue('SSL', pgServer),
+    schema: getPgConnectionEnvValue('SCHEMA', pgServer),
+    applicationName: getPgConnectionEnvValue('APPLICATION_NAME', pgServer),
+    idleTimeout: parseInt(getPgConnectionEnvValue('IDLE_TIMEOUT', pgServer) ?? '30'),
+    maxLifetime: parseInt(getPgConnectionEnvValue('MAX_LIFETIME', pgServer) ?? '60'),
poolMax: parseInt(process.env['PG_CONNECTION_POOL_MAX'] ?? '10'),
};
const defaultAppName = 'stacks-blockchain-api';
-  const pgConnectionUri = pgEnvValue('CONNECTION_URI');
+  const pgConnectionUri = getPgConnectionEnvValue('CONNECTION_URI', pgServer);
const pgConfigEnvVar = Object.entries(pgEnvVars).find(([, v]) => typeof v === 'string')?.[0];
if (pgConfigEnvVar && pgConnectionUri) {
throw new Error(
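
For reference, a minimal sketch of how the extracted helper's fallback behaves (hostnames illustrative; the helper only consults `PG_PRIMARY_*` when the primary server is targeted):

import { getPgConnectionEnvValue, PgServer } from './connection';

process.env.PG_HOST = 'replica.local';
process.env.PG_PRIMARY_HOST = 'primary.local';
process.env.PG_PORT = '5432'; // no PG_PRIMARY_PORT set

getPgConnectionEnvValue('HOST');                   // 'replica.local' (default server)
getPgConnectionEnvValue('HOST', PgServer.primary); // 'primary.local' (primary override wins)
getPgConnectionEnvValue('PORT', PgServer.primary); // '5432' (falls back to PG_PORT)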

src/datastore/pg-store.ts

@@ -69,7 +69,7 @@ import {
StxUnlockEvent,
TransferQueryResult,
} from './common';
-import { connectPostgres, PgServer, PgSqlClient } from './connection';
+import { connectPostgres, getPgConnectionEnvValue, PgServer, PgSqlClient } from './connection';
import {
abiColumn,
BLOCK_COLUMNS,
@@ -110,6 +110,9 @@ export class PgStore {
readonly sql: PgSqlClient;
readonly eventEmitter: PgStoreEventEmitter;
readonly notifier?: PgNotifier;
+  protected get closeTimeout(): number {
+    return parseInt(getPgConnectionEnvValue('CLOSE_TIMEOUT', PgServer.default) ?? '5');
+  }
constructor(sql: PgSqlClient, notifier: PgNotifier | undefined = undefined) {
this.sql = sql;
@@ -133,7 +136,7 @@ export class PgStore {
async close(): Promise<void> {
await this.notifier?.close();
-    await this.sql.end();
+    await this.sql.end({ timeout: this.closeTimeout });
}
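
In postgres.js, `sql.end({ timeout })` waits up to `timeout` seconds for active queries and then force-closes all connections, so store shutdown is now bounded by `PG_CLOSE_TIMEOUT`. A minimal sketch:

import postgres from 'postgres';

const sql = postgres('postgres://postgres@localhost/stacks_blockchain_api');

async function close(): Promise<void> {
  // Allow in-flight queries up to 5 seconds to finish, then force-end them.
  await sql.end({ timeout: 5 });
}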
/**
@@ -211,7 +214,7 @@ export class PgStore {
blockIdentifer: BlockIdentifier,
metadata?: DbGetBlockWithMetadataOpts<TWithTxs, TWithMicroblocks>
): Promise<FoundOrNot<DbGetBlockWithMetadataResponse<TWithTxs, TWithMicroblocks>>> {
-    return await this.sql.begin(async sql => {
+    return await this.sql.begin('READ ONLY', async sql => {
const block = await this.getBlockInternal(sql, blockIdentifer);
if (!block.found) {
return { found: false };
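
postgres.js appends the string passed to `begin` to the `BEGIN` statement, so `sql.begin('READ ONLY', fn)` runs the whole callback inside a `BEGIN READ ONLY` transaction and Postgres rejects any accidental write. A minimal sketch:

import postgres from 'postgres';

const sql = postgres('postgres://postgres@localhost/stacks_blockchain_api');

async function blockCount(): Promise<number> {
  return await sql.begin('READ ONLY', async sql => {
    // An UPDATE here would fail: "cannot execute UPDATE in a read-only transaction".
    const rows = await sql<{ count: number }[]>`SELECT block_count AS count FROM chain_tip`;
    return rows[0].count;
  });
}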
@@ -377,24 +380,6 @@ export class PgStore {
return { found: true, result: block } as const;
}
-  async getBlocks({ limit, offset }: { limit: number; offset: number }) {
-    return await this.sql.begin(async sql => {
-      const total = await sql<{ count: number }[]>`
-        SELECT block_count AS count FROM chain_tip
-      `;
-      const results = await sql<BlockQueryResult[]>`
-        SELECT ${sql(BLOCK_COLUMNS)}
-        FROM blocks
-        WHERE canonical = true
-        ORDER BY block_height DESC
-        LIMIT ${limit}
-        OFFSET ${offset}
-      `;
-      const parsed = results.map(r => parseBlockQueryResult(r));
-      return { results: parsed, total: total[0].count } as const;
-    });
-  }
/**
* Returns Block information with metadata, including accepted and streamed microblocks hash
* @returns `BlocksWithMetadata` object including list of Blocks with metadata and total count.
@@ -406,9 +391,21 @@ export class PgStore {
limit: number;
offset: number;
}): Promise<BlocksWithMetadata> {
-    return await this.sql.begin(async sql => {
-      // get block list
-      const { results: blocks, total: block_count } = await this.getBlocks({ limit, offset });
+    return await this.sql.begin('READ ONLY', async sql => {
+      // Get blocks with count.
+      const countQuery = await sql<{ count: number }[]>`
+        SELECT block_count AS count FROM chain_tip
+      `;
+      const block_count = countQuery[0].count;
+      const blocksQuery = await sql<BlockQueryResult[]>`
+        SELECT ${sql(BLOCK_COLUMNS)}
+        FROM blocks
+        WHERE canonical = true
+        ORDER BY block_height DESC
+        LIMIT ${limit}
+        OFFSET ${offset}
+      `;
+      const blocks = blocksQuery.map(r => parseBlockQueryResult(r));
const blockHashValues: string[] = [];
const indexBlockHashValues: string[] = [];
blocks.forEach(block => {
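
Inlining the old `getBlocks` body does more than remove a now-unused method: the block count and the block page are read inside the same `READ ONLY` transaction, so the total can never disagree with the page about the chain tip. The pattern in miniature:

import postgres from 'postgres';

const sql = postgres('postgres://postgres@localhost/stacks_blockchain_api');

async function blockPage(limit: number, offset: number) {
  return await sql.begin('READ ONLY', async sql => {
    const [{ count }] = await sql<{ count: number }[]>`SELECT block_count AS count FROM chain_tip`;
    const blocks = await sql`
      SELECT * FROM blocks WHERE canonical = true
      ORDER BY block_height DESC LIMIT ${limit} OFFSET ${offset}
    `;
    return { total: count, results: blocks };
  });
}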
@@ -502,7 +499,7 @@ export class PgStore {
}
async getBlockTxsRows(blockHash: string): Promise<FoundOrNot<DbTx[]>> {
-    return await this.sql.begin(async sql => {
+    return await this.sql.begin('READ ONLY', async sql => {
const blockQuery = await this.getBlockInternal(sql, { hash: blockHash });
if (!blockQuery.found) {
throw new Error(`Could not find block by hash ${blockHash}`);
@@ -525,7 +522,7 @@ export class PgStore {
async getMicroblock(args: {
microblockHash: string;
}): Promise<FoundOrNot<{ microblock: DbMicroblock; txs: string[] }>> {
-    return await this.sql.begin(async sql => {
+    return await this.sql.begin('READ ONLY', async sql => {
const result = await sql<MicroblockQueryResult[]>`
SELECT ${sql(MICROBLOCK_COLUMNS)}
FROM microblocks
@@ -552,7 +549,7 @@ export class PgStore {
limit: number;
offset: number;
}): Promise<{ result: { microblock: DbMicroblock; txs: string[] }[]; total: number }> {
-    return await this.sql.begin(async sql => {
+    return await this.sql.begin('READ ONLY', async sql => {
const countQuery = await sql<
{ total: number }[]
>`SELECT microblock_count AS total FROM chain_tip`;
@@ -603,7 +600,7 @@ export class PgStore {
}
async getUnanchoredTxs(): Promise<{ txs: DbTx[] }> {
-    return await this.sql.begin(async sql => {
+    return await this.sql.begin('READ ONLY', async sql => {
return this.getUnanchoredTxsInternal(sql);
});
}
@@ -612,7 +609,7 @@ export class PgStore {
stxAddress: string;
blockIdentifier: BlockIdentifier;
}): Promise<FoundOrNot<{ lastExecutedTxNonce: number | null; possibleNextNonce: number }>> {
-    return await this.sql.begin(async sql => {
+    return await this.sql.begin('READ ONLY', async sql => {
const dbBlock = await this.getBlockInternal(sql, args.blockIdentifier);
if (!dbBlock.found) {
return { found: false };
@@ -644,7 +641,7 @@ export class PgStore {
possibleNextNonce: number;
detectedMissingNonces: number[];
}> {
-    return await this.sql.begin(async sql => {
+    return await this.sql.begin('READ ONLY', async sql => {
const executedTxNonce = await sql<{ nonce: number | null }[]>`
SELECT MAX(nonce) nonce
FROM txs
@@ -790,7 +787,7 @@ export class PgStore {
limit: number,
offset: number
): Promise<FoundOrNot<{ results: DbTx[]; total: number }>> {
-    return this.sql.begin(async sql => {
+    return await this.sql.begin('READ ONLY', async sql => {
const blockQuery = await this.getBlockInternal(sql, blockIdentifer);
if (!blockQuery.found) {
return { found: false };
@@ -951,13 +948,13 @@ export class PgStore {
if (args.txIds.length === 0) {
return [];
}
-    return this.sql.begin(async client => {
-      const result = await this.sql<MempoolTxQueryResult[]>`
-        SELECT ${unsafeCols(this.sql, [...MEMPOOL_TX_COLUMNS, abiColumn('mempool_txs')])}
+    return await this.sql.begin('READ ONLY', async sql => {
+      const result = await sql<MempoolTxQueryResult[]>`
+        SELECT ${unsafeCols(sql, [...MEMPOOL_TX_COLUMNS, abiColumn('mempool_txs')])}
FROM mempool_txs
-        WHERE tx_id IN ${this.sql(args.txIds)}
+        WHERE tx_id IN ${sql(args.txIds)}
`;
-      return await this.parseMempoolTransactions(result, client, args.includeUnanchored);
+      return await this.parseMempoolTransactions(result, sql, args.includeUnanchored);
});
}
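
The scoped-connection fixes above matter because the `sql` handed to the callback is bound to the transaction's connection; queries issued through the outer client are checked out from the pool separately and run outside the transaction (and its `READ ONLY` tag). A minimal sketch of the hazard:

import postgres from 'postgres';

const sql = postgres('postgres://postgres@localhost/stacks_blockchain_api');

await sql.begin('READ ONLY', async txSql => {
  // Correct: runs on the transaction's connection.
  await txSql`SELECT count(*)::int FROM mempool_txs`;
  // Bug pattern fixed in this commit: the outer client uses a different
  // pooled connection, so this query silently escapes the transaction.
  await sql`SELECT count(*)::int FROM mempool_txs`;
});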
@@ -969,8 +966,8 @@ export class PgStore {
txId: string;
includeUnanchored: boolean;
includePruned?: boolean;
-  }) {
-    return this.sql.begin(async sql => {
+  }): Promise<FoundOrNot<DbMempoolTx>> {
+    return await this.sql.begin('READ ONLY', async sql => {
const result = await sql<MempoolTxQueryResult[]>`
SELECT ${unsafeCols(sql, [...MEMPOOL_TX_COLUMNS, abiColumn('mempool_txs')])}
FROM mempool_txs
@@ -1014,7 +1011,7 @@ export class PgStore {
limit: number;
offset: number;
}): Promise<{ results: DbMempoolTx[]; total: number }> {
-    return await this.sql.begin(async sql => {
+    return await this.sql.begin('READ ONLY', async sql => {
const droppedStatuses = [
DbTxStatus.DroppedReplaceByFee,
DbTxStatus.DroppedReplaceAcrossFork,
@@ -1051,7 +1048,7 @@ export class PgStore {
}
async getMempoolStats({ lastBlockCount }: { lastBlockCount?: number }): Promise<DbMempoolStats> {
-    return await this.sql.begin(async sql => {
+    return await this.sql.begin('READ ONLY', async sql => {
return await this.getMempoolStatsInternal({ sql, lastBlockCount });
});
}
@@ -1228,7 +1225,7 @@ export class PgStore {
recipientAddress?: string;
address?: string;
}): Promise<{ results: DbMempoolTx[]; total: number }> {
-    const queryResult = await this.sql.begin(async sql => {
+    const queryResult = await this.sql.begin('READ ONLY', async sql => {
// If caller did not opt-in to unanchored tx data, then treat unanchored txs as pending mempool txs.
const unanchoredTxs: string[] = !includeUnanchored
? (await this.getUnanchoredTxsInternal(sql)).txs.map(tx => tx.tx_id)
@@ -1289,8 +1286,14 @@ export class PgStore {
return { found: true, result: { digest: result[0].digest } };
}
-  async getTx({ txId, includeUnanchored }: { txId: string; includeUnanchored: boolean }) {
-    return this.sql.begin(async sql => {
+  async getTx({
+    txId,
+    includeUnanchored,
+  }: {
+    txId: string;
+    includeUnanchored: boolean;
+  }): Promise<FoundOrNot<DbTx>> {
+    return await this.sql.begin('READ ONLY', async sql => {
const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored });
const result = await sql<ContractTxQueryResult[]>`
SELECT ${unsafeCols(sql, [...TX_COLUMNS, abiColumn()])}
@@ -1330,16 +1333,14 @@ export class PgStore {
offset: number;
txTypeFilter: TransactionType[];
includeUnanchored: boolean;
-  }) {
+  }): Promise<{ results: DbTx[]; total: number }> {
let totalQuery: { count: number }[];
let resultQuery: ContractTxQueryResult[];
-    return this.sql.begin(async sql => {
+    return await this.sql.begin('READ ONLY', async sql => {
const maxHeight = await this.getMaxBlockHeight(sql, { includeUnanchored });
if (txTypeFilter.length === 0) {
totalQuery = await sql<{ count: number }[]>`
-          SELECT ${
-            includeUnanchored ? this.sql('tx_count_unanchored') : this.sql('tx_count')
-          } AS count
+          SELECT ${includeUnanchored ? sql('tx_count_unanchored') : sql('tx_count')} AS count
FROM chain_tip
`;
resultQuery = await sql<ContractTxQueryResult[]>`
@@ -1373,15 +1374,15 @@ export class PgStore {
});
}
-  getTxListEvents(args: {
+  async getTxListEvents(args: {
txs: {
txId: string;
indexBlockHash: string;
}[];
limit: number;
offset: number;
-  }) {
-    return this.sql.begin(async sql => {
+  }): Promise<{ results: DbEvent[] }> {
+    return await this.sql.begin('READ ONLY', async sql => {
if (args.txs.length === 0) return { results: [] };
// TODO: This hack has to be done because postgres.js can't figure out how to interpolate
// these `bytea` VALUES comparisons yet.
@@ -1494,13 +1495,18 @@ export class PgStore {
/**
* TODO investigate if this method needs be deprecated in favor of {@link getTransactionEvents}
*/
-  async getTxEvents(args: { txId: string; indexBlockHash: string; limit: number; offset: number }) {
+  async getTxEvents(args: {
+    txId: string;
+    indexBlockHash: string;
+    limit: number;
+    offset: number;
+  }): Promise<{ results: DbEvent[] }> {
// Note: when this is used to fetch events for an unanchored microblock tx, the `indexBlockHash` is empty
// which will cause the sql queries to also match micro-orphaned tx data (resulting in duplicate event results).
// To prevent that, all micro-orphaned events are excluded using `microblock_orphaned=false`.
// That means, unlike regular orphaned txs, if a micro-orphaned tx is never re-mined, the micro-orphaned event data
// will never be returned.
-    return this.sql.begin(async sql => {
+    return await this.sql.begin('READ ONLY', async sql => {
const eventIndexStart = args.offset;
const eventIndexEnd = args.offset + args.limit - 1;
const stxLockResults = await sql<
@@ -1617,8 +1623,8 @@ export class PgStore {
eventTypeFilter: DbEventTypeId[];
limit: number;
offset: number;
-  }) {
-    return this.sql.begin(async sql => {
+  }): Promise<{ results: DbEvent[] }> {
+    return await this.sql.begin('READ ONLY', async sql => {
const refValue = args.addressOrTxId.address ?? args.addressOrTxId.txId;
const isAddress = args.addressOrTxId.address !== undefined;
const emptyEvents = sql`SELECT NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL`;
@@ -1947,7 +1953,7 @@ export class PgStore {
stxAddress: string;
includeUnanchored: boolean;
}): Promise<DbStxBalance> {
-    return this.sql.begin(async sql => {
+    return await this.sql.begin('READ ONLY', async sql => {
const blockQuery = await this.getCurrentBlockInternal(sql);
if (!blockQuery.found) {
throw new Error(`Could not find current block`);
@@ -1967,7 +1973,7 @@ export class PgStore {
}
async getStxBalanceAtBlock(stxAddress: string, blockHeight: number): Promise<DbStxBalance> {
-    return this.sql.begin(async sql => {
+    return await this.sql.begin('READ ONLY', async sql => {
const chainTip = await this.getChainTip(sql);
const blockHeightToQuery =
blockHeight > chainTip.blockHeight ? chainTip.blockHeight : blockHeight;
@@ -2079,8 +2085,8 @@ export class PgStore {
blockHeight: number;
}
| { includeUnanchored: boolean }
-  ) {
-    return this.sql.begin(async sql => {
+  ): Promise<{ stx: bigint; blockHeight: number }> {
+    return await this.sql.begin('READ ONLY', async sql => {
let atBlockHeight: number;
let atMatureBlockHeight: number;
if ('blockHeight' in args) {
@@ -2337,7 +2343,7 @@ export class PgStore {
}
async getTxStatus(txId: string): Promise<FoundOrNot<DbTxGlobalStatus>> {
-    return this.sql.begin(async sql => {
+    return await this.sql.begin('READ ONLY', async sql => {
const chainResult = await sql<DbTxGlobalStatus[]>`
SELECT status, index_block_hash, microblock_hash
FROM txs
@@ -2663,7 +2669,7 @@ export class PgStore {
async searchHash({ hash }: { hash: string }): Promise<FoundOrNot<DbSearchResult>> {
// TODO(mb): add support for searching for microblock by hash
-    return this.sql.begin(async sql => {
+    return await this.sql.begin('READ ONLY', async sql => {
const txQuery = await sql<ContractTxQueryResult[]>`
SELECT ${unsafeCols(sql, [...TX_COLUMNS, abiColumn()])}
FROM txs WHERE tx_id = ${hash} LIMIT 1
@@ -2722,7 +2728,7 @@ export class PgStore {
entity_id: principal,
},
} as const;
-    return await this.sql.begin(async sql => {
+    return await this.sql.begin('READ ONLY', async sql => {
if (isContract) {
const contractMempoolTxResult = await sql<MempoolTxQueryResult[]>`
SELECT ${unsafeCols(sql, [...MEMPOOL_TX_COLUMNS, abiColumn('mempool_txs')])}
@@ -3114,7 +3120,7 @@ export class PgStore {
if (txIds.length === 0) {
return [];
}
-    return this.sql.begin(async sql => {
+    return await this.sql.begin('READ ONLY', async sql => {
const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored });
const result = await sql<ContractTxQueryResult[]>`
SELECT ${unsafeCols(sql, [...TX_COLUMNS, abiColumn()])}
@@ -3134,7 +3140,7 @@ export class PgStore {
}
async getNamespaceList({ includeUnanchored }: { includeUnanchored: boolean }) {
-    const queryResult = await this.sql.begin(async sql => {
+    const queryResult = await this.sql.begin('READ ONLY', async sql => {
const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored });
return await sql<{ namespace_id: string }[]>`
SELECT DISTINCT ON (namespace_id) namespace_id
@@ -3160,7 +3166,7 @@ export class PgStore {
results: string[];
}> {
const offset = page * 100;
-    const queryResult = await this.sql.begin(async sql => {
+    const queryResult = await this.sql.begin('READ ONLY', async sql => {
const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored });
return await sql<{ name: string }[]>`
SELECT DISTINCT ON (name) name
@@ -3184,7 +3190,7 @@ export class PgStore {
namespace: string;
includeUnanchored: boolean;
}): Promise<FoundOrNot<DbBnsNamespace & { index_block_hash: string }>> {
-    const queryResult = await this.sql.begin(async sql => {
+    const queryResult = await this.sql.begin('READ ONLY', async sql => {
const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored });
return await sql<(DbBnsNamespace & { tx_id: string; index_block_hash: string })[]>`
SELECT DISTINCT ON (namespace_id) namespace_id, *
@@ -3218,7 +3224,7 @@ export class PgStore {
includeUnanchored: boolean;
chainId: ChainID;
}): Promise<FoundOrNot<DbBnsName & { index_block_hash: string }>> {
-    const queryResult = await this.sql.begin(async sql => {
+    const queryResult = await this.sql.begin('READ ONLY', async sql => {
const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored });
const nameZonefile = await sql<(DbBnsName & { tx_id: string; index_block_hash: string })[]>`
SELECT n.*, z.zonefile
@@ -3257,7 +3263,7 @@ export class PgStore {
zoneFileHash: string;
includeUnanchored: boolean;
}): Promise<FoundOrNot<DbBnsZoneFile>> {
-    const queryResult = await this.sql.begin(async sql => {
+    const queryResult = await this.sql.begin('READ ONLY', async sql => {
const maxBlockHeight = await this.getMaxBlockHeight(sql, {
includeUnanchored: args.includeUnanchored,
});
@@ -3313,7 +3319,7 @@ export class PgStore {
name: string;
includeUnanchored: boolean;
}): Promise<FoundOrNot<DbBnsZoneFile>> {
-    const queryResult = await this.sql.begin(async sql => {
+    const queryResult = await this.sql.begin('READ ONLY', async sql => {
const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored });
// Depending on the kind of name we got, use the correct table to pivot on canonical chain
// state to get the zonefile. We can't pivot on the `txs` table because some names/subdomains
@@ -3366,7 +3372,7 @@ export class PgStore {
includeUnanchored: boolean;
chainId: ChainID;
}): Promise<FoundOrNot<string[]>> {
-    const queryResult = await this.sql.begin(async sql => {
+    const queryResult = await this.sql.begin('READ ONLY', async sql => {
const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored });
// 1. Get subdomains owned by this address.
// These don't produce NFT events so we have to look directly at the `subdomains` table.
@@ -3450,7 +3456,7 @@ export class PgStore {
name: string;
includeUnanchored: boolean;
}): Promise<{ results: string[] }> {
-    const queryResult = await this.sql.begin(async sql => {
+    const queryResult = await this.sql.begin('READ ONLY', async sql => {
const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored });
return await sql<{ fully_qualified_subdomain: string }[]>`
SELECT DISTINCT ON (fully_qualified_subdomain) fully_qualified_subdomain
@@ -3474,7 +3480,7 @@ export class PgStore {
includeUnanchored: boolean;
}) {
const offset = page * 100;
-    const queryResult = await this.sql.begin(async sql => {
+    const queryResult = await this.sql.begin('READ ONLY', async sql => {
const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored });
return await sql<{ fully_qualified_subdomain: string }[]>`
SELECT DISTINCT ON (fully_qualified_subdomain) fully_qualified_subdomain
@@ -3492,7 +3498,7 @@ export class PgStore {
async getNamesList({ page, includeUnanchored }: { page: number; includeUnanchored: boolean }) {
const offset = page * 100;
-    const queryResult = await this.sql.begin(async sql => {
+    const queryResult = await this.sql.begin('READ ONLY', async sql => {
const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored });
return await sql<{ name: string }[]>`
SELECT DISTINCT ON (name) name
@@ -3515,7 +3521,7 @@ export class PgStore {
subdomain: string;
includeUnanchored: boolean;
}): Promise<FoundOrNot<DbBnsSubdomain & { index_block_hash: string }>> {
-    const queryResult = await this.sql.begin(async sql => {
+    const queryResult = await this.sql.begin('READ ONLY', async sql => {
const maxBlockHeight = await this.getMaxBlockHeight(sql, { includeUnanchored });
return await sql<(DbBnsSubdomain & { tx_id: string; index_block_hash: string })[]>`
SELECT s.*, z.zonefile
@@ -3602,7 +3608,7 @@ export class PgStore {
}
async getUnlockedAddressesAtBlock(block: DbBlock): Promise<StxUnlockEvent[]> {
-    return this.sql.begin(async client => {
+    return await this.sql.begin('READ ONLY', async client => {
return await this.internalGetUnlockedAccountsAtHeight(client, block);
});
}
@@ -3724,14 +3730,14 @@ export class PgStore {
}
}
-  getFtMetadataList({
+  async getFtMetadataList({
limit,
offset,
}: {
limit: number;
offset: number;
}): Promise<{ results: DbFungibleTokenMetadata[]; total: number }> {
-    return this.sql.begin(async sql => {
+    return await this.sql.begin('READ ONLY', async sql => {
const totalQuery = await sql<{ count: number }[]>`
SELECT COUNT(*)::integer
FROM ft_metadata
@@ -3761,14 +3767,14 @@ export class PgStore {
});
}
-  getNftMetadataList({
+  async getNftMetadataList({
limit,
offset,
}: {
limit: number;
offset: number;
}): Promise<{ results: DbNonFungibleTokenMetadata[]; total: number }> {
-    return this.sql.begin(async sql => {
+    return await this.sql.begin('READ ONLY', async sql => {
const totalQuery = await sql<{ count: number }[]>`
SELECT COUNT(*)::integer
FROM nft_metadata

src/datastore/pg-write-store.ts

@@ -1,4 +1,3 @@
-import * as fs from 'fs';
import { logger, logError, getOrAdd, batchIterate, isProdEnv, I32_MAX } from '../helpers';
import {
DbBlock,
@@ -74,7 +73,13 @@ import {
} from './helpers';
import { PgNotifier } from './pg-notifier';
import { PgStore } from './pg-store';
-import { connectPostgres, PgJsonb, PgServer, PgSqlClient } from './connection';
+import {
+  connectPostgres,
+  getPgConnectionEnvValue,
+  PgJsonb,
+  PgServer,
+  PgSqlClient,
+} from './connection';
import { runMigrations } from './migrations';
import { getPgClientConfig } from './connection-legacy';
import { isProcessableTokenMetadata } from '../token-metadata/helpers';
@@ -96,7 +101,9 @@ class MicroblockGapError extends Error {
*/
export class PgWriteStore extends PgStore {
readonly isEventReplay: boolean;
private cachedParameterizedInsertStrings = new Map<string, string>();
+  protected get closeTimeout(): number {
+    return parseInt(getPgConnectionEnvValue('CLOSE_TIMEOUT', PgServer.primary) ?? '5');
+  }
constructor(
sql: PgSqlClient,
@@ -1182,7 +1189,7 @@ export class PgWriteStore extends PgStore {
burnchainBlockHeight: number;
rewards: DbBurnchainReward[];
}): Promise<void> {
-    return this.sql.begin(async sql => {
+    return await this.sql.begin(async sql => {
const existingRewards = await sql<
{
reward_recipient: string;
@@ -1331,16 +1338,13 @@ export class PgWriteStore extends PgStore {
}
async dropMempoolTxs({ status, txIds }: { status: DbTxStatus; txIds: string[] }): Promise<void> {
-    let updatedTxs: DbMempoolTx[] = [];
-    await this.sql.begin(async sql => {
-      const updateResults = await sql<MempoolTxQueryResult[]>`
-        UPDATE mempool_txs
-        SET pruned = true, status = ${status}
-        WHERE tx_id IN ${sql(txIds)}
-        RETURNING ${sql(MEMPOOL_TX_COLUMNS)}
-      `;
-      updatedTxs = updateResults.map(r => parseMempoolTxQueryResult(r));
-    });
+    const updateResults = await this.sql<MempoolTxQueryResult[]>`
+      UPDATE mempool_txs
+      SET pruned = true, status = ${status}
+      WHERE tx_id IN ${this.sql(txIds)}
+      RETURNING ${this.sql(MEMPOOL_TX_COLUMNS)}
+    `;
+    const updatedTxs = updateResults.map(r => parseMempoolTxQueryResult(r));
await this.refreshMaterializedView('mempool_digest');
for (const tx of updatedTxs) {
await this.notifier?.sendTx({ txId: tx.tx_id });
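
Unwrapping `dropMempoolTxs` is safe because a single `UPDATE ... RETURNING` statement is already atomic in Postgres; the explicit `begin` only added round-trips without changing semantics. The resulting shape, in miniature (tx ids illustrative):

import postgres from 'postgres';

const sql = postgres('postgres://postgres@localhost/stacks_blockchain_api');

// One atomic statement: update and read back the affected rows together.
const pruned = await sql<{ tx_id: string }[]>`
  UPDATE mempool_txs
  SET pruned = true
  WHERE tx_id IN ${sql(['0x01', '0x02'])}
  RETURNING tx_id
`;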

src/helpers.ts

@@ -705,10 +705,11 @@ export async function resolveOrTimeout(
) {
let timer: NodeJS.Timeout;
const result = await Promise.race([
-    new Promise(async (resolve, _) => {
-      await promise;
-      clearTimeout(timer);
-      resolve(true);
+    new Promise((resolve, reject) => {
+      promise
+        .then(() => resolve(true))
+        .catch(error => reject(error))
+        .finally(() => clearTimeout(timer));
}),
new Promise((resolve, _) => {
timer = setInterval(() => {
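
The old version used an `async` Promise executor, an antipattern: if `promise` rejected, the rejection was swallowed inside the executor instead of propagating through `Promise.race`. A self-contained sketch of the corrected shape (simplified; the hunk does not show the helper's full signature):

async function resolveOrTimeout(promise: Promise<void>, timeoutMs: number): Promise<boolean> {
  let timer: NodeJS.Timeout | undefined;
  return await Promise.race<boolean>([
    new Promise<boolean>((resolve, reject) => {
      // Chain instead of awaiting inside the executor so rejections propagate.
      promise
        .then(() => resolve(true))
        .catch(error => reject(error))
        .finally(() => clearTimeout(timer));
    }),
    new Promise<boolean>(resolve => {
      timer = setTimeout(() => resolve(false), timeoutMs);
    }),
  ]);
}

The helper resolves `true` when the promise settles in time, `false` on timeout, and now surfaces rejections to the caller.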

src/shutdown-handler.ts

@@ -19,7 +19,7 @@ async function startShutdown() {
return;
}
isShuttingDown = true;
-  const timeoutMs = 5000;
+  const timeoutMs = parseInt(process.env['STACKS_SHUTDOWN_FORCE_KILL_TIMEOUT'] ?? '60') * 1000;
let errorEncountered = false;
for (const config of shutdownConfigs) {
try {
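
With the new variable, every registered shutdown handler gets the same configurable window before the process gives up on it. A self-contained sketch of the flow (the `ShutdownConfig` shape is assumed here for illustration, not the file's exact type):

interface ShutdownConfig {
  name: string;
  handler: () => void | Promise<void>;
}

const shutdownConfigs: ShutdownConfig[] = [];

async function startShutdown(): Promise<void> {
  // Seconds to allow graceful shutdown before force-kill, defaults to 60.
  const timeoutMs = parseInt(process.env['STACKS_SHUTDOWN_FORCE_KILL_TIMEOUT'] ?? '60') * 1000;
  for (const config of shutdownConfigs) {
    const finished = await Promise.race([
      Promise.resolve(config.handler()).then(() => true, () => false),
      new Promise<boolean>(resolve => setTimeout(() => resolve(false), timeoutMs).unref()),
    ]);
    if (!finished) {
      console.error(`'${config.name}' did not finish within ${timeoutMs}ms; force-killing`);
    }
  }
}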

src/tests/datastore-tests.ts

@@ -25,7 +25,7 @@ import { getBlocksWithMetadata, parseDbEvent } from '../api/controllers/db-contr
import * as assert from 'assert';
import { PgWriteStore } from '../datastore/pg-write-store';
import { cycleMigrations, runMigrations } from '../datastore/migrations';
-import { getPostgres, PgSqlClient } from '../datastore/connection';
+import { getPostgres, PgServer, PgSqlClient } from '../datastore/connection';
import { bnsNameCV, bufferToHexPrefixString, I32_MAX } from '../helpers';
import { ChainID } from '@stacks/transactions';
import { TestBlockBuilder } from '../test-utils/test-builders';
@@ -215,6 +215,45 @@ describe('postgres datastore', () => {
);
});
+  test('postgres primary env var config fallback', () => {
+    testEnvVars(
+      {
+        PG_CONNECTION_URI: undefined,
+        PG_DATABASE: 'pg_db_db1',
+        PG_USER: 'pg_user_user1',
+        PG_PASSWORD: 'pg_password_password1',
+        PG_HOST: 'pg_host_host1',
+        PG_PORT: '9876',
+        PG_SSL: 'true',
+        PG_SCHEMA: 'pg_schema_schema1',
+        PG_APPLICATION_NAME: 'test-env-vars',
+        PG_MAX_LIFETIME: '5',
+        PG_IDLE_TIMEOUT: '1',
+        // Primary values:
+        PG_PRIMARY_DATABASE: 'primary_db',
+        PG_PRIMARY_USER: 'primary_user',
+        PG_PRIMARY_PASSWORD: 'primary_password',
+        PG_PRIMARY_HOST: 'primary_host',
+        PG_PRIMARY_PORT: '9999',
+      },
+      () => {
+        const sql = getPostgres({ usageName: 'tests', pgServer: PgServer.primary });
+        // Primary values take precedence.
+        expect(sql.options.database).toBe('primary_db');
+        expect(sql.options.user).toBe('primary_user');
+        expect(sql.options.pass).toBe('primary_password');
+        expect(sql.options.host).toStrictEqual(['primary_host']);
+        expect(sql.options.port).toStrictEqual([9999]);
+        // Other values come from defaults.
+        expect(sql.options.ssl).toBe(true);
+        expect(sql.options.max_lifetime).toBe(5);
+        expect(sql.options.idle_timeout).toBe(1);
+        expect(sql.options.connection.search_path).toBe('pg_schema_schema1');
+        expect(sql.options.connection.application_name).toBe('test-env-vars:tests');
+      }
+    );
+  });
test('postgres connection application_name', async () => {
await testEnvVars(
{