feat: add latest smart contract txs materialized view

* feat: add contract_txs materialized view

* chore: refresh view on update

* chore: fix unit tests

* feat: optimize view to include last 50 txs

* feat: refresh on microblock updates as well

* chore: remove unused var

* chore: select only required columns

* fix: race condition on refresh and pg notify

* chore: return to shorthand or-equal notation

* fix: await for pg notification emission

* chore: make helper for refreshing views

* fix: sql syntax error

* chore: fix style for view selection
This commit is contained in:
Rafael Cárdenas
2021-11-23 16:41:01 -06:00
committed by GitHub
parent e7d8efa9f3
commit 67c453cb6c
5 changed files with 264 additions and 102 deletions

View File

@@ -70,32 +70,32 @@ export class PgNotifier {
await this.subscriber.listenTo(this.pgChannelName);
}
public sendBlock(payload: PgBlockNotificationPayload) {
this.notify({ type: 'blockUpdate', payload: payload });
public async sendBlock(payload: PgBlockNotificationPayload) {
await this.notify({ type: 'blockUpdate', payload: payload });
}
public sendMicroblock(payload: PgMicroblockNotificationPayload) {
this.notify({ type: 'microblockUpdate', payload: payload });
public async sendMicroblock(payload: PgMicroblockNotificationPayload) {
await this.notify({ type: 'microblockUpdate', payload: payload });
}
public sendTx(payload: PgTxNotificationPayload) {
this.notify({ type: 'txUpdate', payload: payload });
public async sendTx(payload: PgTxNotificationPayload) {
await this.notify({ type: 'txUpdate', payload: payload });
}
public sendAddress(payload: PgAddressNotificationPayload) {
this.notify({ type: 'addressUpdate', payload: payload });
public async sendAddress(payload: PgAddressNotificationPayload) {
await this.notify({ type: 'addressUpdate', payload: payload });
}
public sendName(payload: PgNameNotificationPayload) {
this.notify({ type: 'nameUpdate', payload: payload });
public async sendName(payload: PgNameNotificationPayload) {
await this.notify({ type: 'nameUpdate', payload: payload });
}
public sendTokenMetadata(payload: PgTokenMetadataNotificationPayload) {
this.notify({ type: 'tokenMetadataUpdateQueued', payload: payload });
public async sendTokenMetadata(payload: PgTokenMetadataNotificationPayload) {
await this.notify({ type: 'tokenMetadataUpdateQueued', payload: payload });
}
public sendTokens(payload: PgTokensNotificationPayload) {
this.notify({ type: 'tokensUpdate', payload: payload });
public async sendTokens(payload: PgTokensNotificationPayload) {
await this.notify({ type: 'tokensUpdate', payload: payload });
}
public async close() {
@@ -103,8 +103,8 @@ export class PgNotifier {
await this.subscriber.close();
}
private notify(notification: PgNotification) {
void this.subscriber
private async notify(notification: PgNotification) {
await this.subscriber
.notify(this.pgChannelName, { notification: notification })
.catch(error =>
logError(`Error sending PgNotifier notification of type: ${notification.type}`, error)

View File

@@ -37,6 +37,8 @@ import {
pipelineAsync,
isProdEnv,
has0xPrefix,
isValidPrincipal,
isSmartContractTx,
} from '../helpers';
import {
DataStore,
@@ -963,6 +965,7 @@ export class PgDataStore
const txs: DataStoreTxEventData[] = [];
let refreshContractTxsView = false;
for (const entry of data.txs) {
// Note: the properties block_hash and burn_block_time are empty here because the anchor block with that data doesn't yet exist.
const dbTx: DbTx = {
@@ -987,12 +990,16 @@ export class PgDataStore
names: entry.names.map(e => ({ ...e, registered_at: blockHeight })),
namespaces: entry.namespaces.map(e => ({ ...e, ready_block: blockHeight })),
});
refreshContractTxsView ||= isSmartContractTx(dbTx, entry.stxEvents);
}
await this.insertMicroblockData(client, dbMicroblocks, txs);
dbMicroblocks.forEach(microblock =>
this.notifier?.sendMicroblock({ microblockHash: microblock.microblock_hash })
);
if (refreshContractTxsView) {
await this.refreshMaterializedView(client, 'latest_contract_txs');
}
dbMicroblocks.forEach(async microblock => {
await this.notifier?.sendMicroblock({ microblockHash: microblock.microblock_hash });
});
// Find any microblocks that have been orphaned by this latest microblock chain tip.
// This function also checks that each microblock parent hash points to an existing microblock in the db.
@@ -1192,7 +1199,8 @@ export class PgDataStore
const blocksUpdated = await this.updateBlock(client, data.block);
if (blocksUpdated !== 0) {
let newNftEvents = false;
let refreshNftCustodyView = false;
let refreshContractTxsView = false;
for (const minerRewards of data.minerRewards) {
await this.updateMinerReward(client, minerRewards);
}
@@ -1207,7 +1215,7 @@ export class PgDataStore
await this.updateFtEvent(client, entry.tx, ftEvent);
}
for (const nftEvent of entry.nftEvents) {
newNftEvents = true;
refreshNftCustodyView = true;
await this.updateNftEvent(client, entry.tx, nftEvent);
}
for (const smartContract of entry.smartContracts) {
@@ -1219,9 +1227,13 @@ export class PgDataStore
for (const namespace of entry.namespaces) {
await this.updateNamespaces(client, entry.tx, namespace);
}
refreshContractTxsView ||= isSmartContractTx(entry.tx, entry.stxEvents);
}
if (newNftEvents && !this.eventReplay) {
await client.query(`REFRESH MATERIALIZED VIEW nft_custody`);
if (refreshNftCustodyView) {
await this.refreshMaterializedView(client, 'nft_custody');
}
if (refreshContractTxsView) {
await this.refreshMaterializedView(client, 'latest_contract_txs');
}
const tokenContractDeployments = data.txs
@@ -1251,13 +1263,13 @@ export class PgDataStore
// Skip sending `PgNotifier` updates altogether if we're in the genesis block since this block is the
// event replay of the v1 blockchain.
if ((data.block.block_height > 1 || !isProdEnv) && this.notifier) {
this.notifier?.sendBlock({ blockHash: data.block.block_hash });
data.txs.forEach(entry => {
this.notifier?.sendTx({ txId: entry.tx.tx_id });
await this.notifier?.sendBlock({ blockHash: data.block.block_hash });
data.txs.forEach(async entry => {
await this.notifier?.sendTx({ txId: entry.tx.tx_id });
});
this.emitAddressTxUpdates(data);
for (const tokenMetadataQueueEntry of tokenMetadataQueueEntries) {
this.notifier?.sendTokenMetadata({ entry: tokenMetadataQueueEntry });
await this.notifier?.sendTokenMetadata({ entry: tokenMetadataQueueEntry });
}
}
}
@@ -1788,7 +1800,7 @@ export class PgDataStore
[zonefile, validZonefileHash]
);
});
this.notifier?.sendName({ nameInfo: tx_id });
await this.notifier?.sendName({ nameInfo: tx_id });
}
private validateZonefileHash(zonefileHash: string) {
@@ -1857,8 +1869,8 @@ export class PgDataStore
break;
}
});
addressTxUpdates.forEach((blockHeight, address) => {
this.notifier?.sendAddress({
addressTxUpdates.forEach(async (blockHeight, address) => {
await this.notifier?.sendAddress({
address: address,
blockHeight: blockHeight,
});
@@ -3190,7 +3202,7 @@ export class PgDataStore
}
});
for (const tx of updatedTxs) {
this.notifier?.sendTx({ txId: tx.tx_id });
await this.notifier?.sendTx({ txId: tx.tx_id });
}
}
@@ -3210,7 +3222,7 @@ export class PgDataStore
updatedTxs = updateResults.rows.map(r => this.parseMempoolTxQueryResult(r));
});
for (const tx of updatedTxs) {
this.notifier?.sendTx({ txId: tx.tx_id });
await this.notifier?.sendTx({ txId: tx.tx_id });
}
}
@@ -4649,6 +4661,23 @@ export class PgDataStore
});
}
/**
* Refreshes a Postgres materialized view.
* @param client - Pg Client
* @param viewName - Materialized view name
* @param skipDuringEventReplay - If we should skip refreshing during event replay
*/
async refreshMaterializedView(
client: ClientBase,
viewName: string,
skipDuringEventReplay = true
) {
if (this.eventReplay && skipDuringEventReplay) {
return;
}
await client.query(`REFRESH MATERIALIZED VIEW ${viewName}`);
}
async getStxBalance({
stxAddress,
includeUnanchored,
@@ -5099,35 +5128,55 @@ export class PgDataStore
atSingleBlock = false;
queryParams.push(blockHeight);
}
const resultQuery = await client.query<TxQueryResult & { count: number }>(
`
WITH principal_txs AS (
WITH event_txs AS (
SELECT tx_id FROM stx_events WHERE stx_events.sender = $1 OR stx_events.recipient = $1
const principal = isValidPrincipal(args.stxAddress);
if (!principal) {
return { results: [], total: 0 };
}
// Smart contracts with a very high tx volume get frequent requests for the last N tx, where N
// is commonly <= 50. We'll query a materialized view if this is the case.
const useMaterializedView =
principal.type == 'contractAddress' && !atSingleBlock && args.limit + args.offset <= 50;
const resultQuery = useMaterializedView
? await client.query<TxQueryResult & { count: number }>(
`
SELECT ${TX_COLUMNS}, (COUNT(*) OVER())::integer as count
FROM latest_contract_txs
WHERE contract_id = $1 AND block_height <= $4
ORDER BY block_height DESC, microblock_sequence DESC, tx_index DESC
LIMIT $2
OFFSET $3
`,
queryParams
)
SELECT *
FROM txs
WHERE canonical = true AND microblock_canonical = true AND (
sender_address = $1 OR
token_transfer_recipient_address = $1 OR
contract_call_contract_id = $1 OR
smart_contract_contract_id = $1
)
UNION
SELECT txs.* FROM txs
INNER JOIN event_txs
ON txs.tx_id = event_txs.tx_id
WHERE txs.canonical = true AND txs.microblock_canonical = true
)
SELECT ${TX_COLUMNS}, (COUNT(*) OVER())::integer as count
FROM principal_txs
${atSingleBlock ? 'WHERE block_height = $4' : 'WHERE block_height <= $4'}
ORDER BY block_height DESC, microblock_sequence DESC, tx_index DESC
LIMIT $2
OFFSET $3
`,
queryParams
);
: await client.query<TxQueryResult & { count: number }>(
`
WITH principal_txs AS (
WITH event_txs AS (
SELECT tx_id FROM stx_events WHERE stx_events.sender = $1 OR stx_events.recipient = $1
)
SELECT *
FROM txs
WHERE canonical = true AND microblock_canonical = true AND (
sender_address = $1 OR
token_transfer_recipient_address = $1 OR
contract_call_contract_id = $1 OR
smart_contract_contract_id = $1
)
UNION
SELECT txs.* FROM txs
INNER JOIN event_txs
ON txs.tx_id = event_txs.tx_id
WHERE txs.canonical = true AND txs.microblock_canonical = true
)
SELECT ${TX_COLUMNS}, (COUNT(*) OVER())::integer as count
FROM principal_txs
${atSingleBlock ? 'WHERE block_height = $4' : 'WHERE block_height <= $4'}
ORDER BY block_height DESC, microblock_sequence DESC, tx_index DESC
LIMIT $2
OFFSET $3
`,
queryParams
);
const count = resultQuery.rowCount > 0 ? resultQuery.rows[0].count : 0;
const parsed = resultQuery.rows.map(r => this.parseTxQueryResult(r));
return { results: parsed, total: count };
@@ -6657,7 +6706,7 @@ export class PgDataStore
);
return result.rowCount;
});
this.notifier?.sendTokens({ contractID: contract_id });
await this.notifier?.sendTokens({ contractID: contract_id });
return rowCount;
}
@@ -6703,7 +6752,7 @@ export class PgDataStore
);
return result.rowCount;
});
this.notifier?.sendTokens({ contractID: contract_id });
await this.notifier?.sendTokens({ contractID: contract_id });
return rowCount;
}
@@ -6797,8 +6846,8 @@ export class PgDataStore
return;
}
await this.queryTx(async client => {
// Refresh postgres materialized views.
await client.query(`REFRESH MATERIALIZED VIEW nft_custody`);
await this.refreshMaterializedView(client, 'nft_custody', false);
await this.refreshMaterializedView(client, 'latest_contract_txs', false);
});
}

View File

@@ -16,6 +16,7 @@ import {
NpmConfigSetLevels,
SyslogConfigSetLevels,
} from 'winston/lib/winston/config';
import { DbStxEvent, DbTx } from './datastore/common';
export const isDevEnv = process.env.NODE_ENV === 'development';
export const isTestEnv = process.env.NODE_ENV === 'test';
@@ -956,3 +957,30 @@ export function getSendManyContract(chainId: ChainID) {
: process.env.TESTNET_SEND_MANY_CONTRACT_ID;
return contractId;
}
/**
* Determines if a transaction involved a smart contract.
* @param dbTx - Transaction DB entry
* @param stxEvents - Associated STX Events for this tx
* @returns true if tx involved a smart contract, false otherwise
*/
export function isSmartContractTx(dbTx: DbTx, stxEvents: DbStxEvent[] = []): boolean {
if (
dbTx.smart_contract_contract_id ||
dbTx.contract_call_contract_id ||
isValidContractName(dbTx.sender_address) ||
(dbTx.token_transfer_recipient_address &&
isValidContractName(dbTx.token_transfer_recipient_address))
) {
return true;
}
for (const stxEvent of stxEvents) {
if (
(stxEvent.sender && isValidContractName(stxEvent.sender)) ||
(stxEvent.recipient && isValidContractName(stxEvent.recipient))
) {
return true;
}
}
return false;
}

View File

@@ -0,0 +1,66 @@
/* eslint-disable @typescript-eslint/camelcase */
import { MigrationBuilder, ColumnDefinitions } from 'node-pg-migrate';
export const shorthands: ColumnDefinitions | undefined = undefined;
export async function up(pgm: MigrationBuilder): Promise<void> {
pgm.createMaterializedView('latest_contract_txs', {}, `
WITH contract_txs AS (
SELECT
contract_call_contract_id AS contract_id, tx_id,
block_height, microblock_sequence, tx_index
FROM txs
WHERE
contract_call_contract_id IS NOT NULL
AND canonical = TRUE
AND microblock_canonical = TRUE
UNION
SELECT
smart_contract_contract_id AS contract_id, tx_id,
block_height, microblock_sequence, tx_index
FROM txs
WHERE
smart_contract_contract_id IS NOT NULL
AND canonical = TRUE
AND microblock_canonical = TRUE
UNION
SELECT
sender_address AS contract_id, tx_id,
block_height, microblock_sequence, tx_index
FROM txs
WHERE
sender_address LIKE '%.%'
AND canonical = TRUE
AND microblock_canonical = TRUE
UNION
SELECT
token_transfer_recipient_address AS contract_id, tx_id,
block_height, microblock_sequence, tx_index
FROM txs
WHERE
token_transfer_recipient_address LIKE '%.%'
AND canonical = TRUE
AND microblock_canonical = TRUE
),
numbered_txs AS (
SELECT
ROW_NUMBER() OVER (
PARTITION BY contract_id
ORDER BY block_height DESC, microblock_sequence DESC, tx_index DESC
) AS r,
contract_txs.*
FROM contract_txs
)
SELECT numbered_txs.contract_id, txs.*
FROM numbered_txs
INNER JOIN txs USING (tx_id)
WHERE numbered_txs.r <= 50
`);
pgm.createIndex('latest_contract_txs', 'contract_id');
pgm.createIndex('latest_contract_txs', [
{ name: 'block_height', sort: 'DESC' },
{ name: 'microblock_sequence', sort: 'DESC'},
{ name: 'tx_index', sort: 'DESC' }
]);
}

View File

@@ -39,6 +39,7 @@ import {
DbRewardSlotHolder,
DbMinerReward,
DbTokenOfferingLocked,
DataStoreTxEventData,
} from '../datastore/common';
import { startApiServer, ApiServer } from '../api/init';
import { PgDataStore, cycleMigrations, runMigrations } from '../datastore/postgres-store';
@@ -2901,7 +2902,7 @@ describe('api tests', () => {
parent_block_hash: '0x5678',
parent_microblock_hash: '',
parent_microblock_sequence: 0,
block_height: 100123123,
block_height: 1,
burn_block_time: 39486,
burn_block_hash: '0x1234',
burn_block_height: 100123123,
@@ -2913,7 +2914,6 @@ describe('api tests', () => {
execution_cost_write_count: 0,
execution_cost_write_length: 0,
};
await db.updateBlock(client, block);
let indexIdIndex = 0;
const createStxTx = (
@@ -2928,10 +2928,10 @@ describe('api tests', () => {
anchor_mode: 3,
nonce: 0,
raw_tx: Buffer.alloc(0),
index_block_hash: '0x5432',
block_hash: '0x9876',
block_height: 68456,
burn_block_time: 1594647994,
index_block_hash: block.index_block_hash,
block_hash: block.block_hash,
block_height: block.block_height,
burn_block_time: block.burn_block_time,
parent_burn_block_time: 1626122935,
type_id: DbTxTypeId.TokenTransfer,
token_transfer_amount: BigInt(amount),
@@ -2969,9 +2969,6 @@ describe('api tests', () => {
createStxTx(testContractAddr, testAddr4, 15),
createStxTx(testAddr2, testAddr4, 35),
];
for (const tx of txs) {
await db.updateTx(client, tx);
}
const tx: DbTx = {
tx_id: '0x1234',
@@ -2979,10 +2976,10 @@ describe('api tests', () => {
anchor_mode: 3,
nonce: 0,
raw_tx: Buffer.alloc(0),
index_block_hash: '0x5432',
block_hash: '0x9876',
block_height: 68456,
burn_block_time: 1594647994,
index_block_hash: block.index_block_hash,
block_hash: block.block_hash,
block_height: block.block_height,
burn_block_time: block.burn_block_time,
parent_burn_block_time: 1626122935,
type_id: DbTxTypeId.Coinbase,
coinbase_payload: Buffer.from('coinbase hi'),
@@ -3035,9 +3032,6 @@ describe('api tests', () => {
createStxEvent(testContractAddr, testAddr4, 15),
createStxEvent(testAddr2, testAddr4, 35),
];
for (const event of events) {
await db.updateStxEvent(client, tx, event);
}
const createFtEvent = (
sender: string,
@@ -3077,9 +3071,6 @@ describe('api tests', () => {
createFtEvent(testAddr1, testAddr2, 'cash', 500_000),
createFtEvent(testAddr2, testAddr1, 'tendies', 1_000_000),
];
for (const event of ftEvents) {
await db.updateFtEvent(client, tx, event);
}
const createNFtEvents = (
sender: string,
@@ -3123,14 +3114,42 @@ describe('api tests', () => {
createNFtEvents(testAddr1, testAddr2, 'cash', 500),
createNFtEvents(testAddr2, testAddr1, 'tendies', 100),
];
for (const event of nftEvents.flat()) {
await db.updateNftEvent(client, tx, event);
}
const dataStoreTxs = txs.map(dbTx => {
return {
tx: dbTx,
stxLockEvents: [],
stxEvents: [],
ftEvents: [],
nftEvents: [],
contractLogEvents: [],
smartContracts: [],
names: [],
namespaces: [],
} as DataStoreTxEventData;
});
dataStoreTxs.push({
tx: tx,
stxLockEvents: [],
stxEvents: events,
ftEvents: ftEvents,
nftEvents: nftEvents.flat(),
contractLogEvents: [],
smartContracts: [],
names: [],
namespaces: [],
});
await db.update({
block: block,
microblocks: [],
minerRewards: [],
txs: dataStoreTxs,
});
const tokenOfferingLocked: DbTokenOfferingLocked = {
address: testAddr2,
value: BigInt(4139394444),
block: 33477,
block: 1,
};
await db.updateBatchTokenOfferingLocked(client, [tokenOfferingLocked]);
@@ -3170,7 +3189,7 @@ describe('api tests', () => {
unlock_schedule: [
{
amount: '4139394444',
block_height: 33477,
block_height: 1,
},
],
},
@@ -3209,7 +3228,7 @@ describe('api tests', () => {
const tokenLocked: DbTokenOfferingLocked = {
address: testContractAddr,
value: BigInt(4139391122),
block: 20477,
block: 1,
};
await db.updateBatchTokenOfferingLocked(client, [tokenLocked]);
@@ -3235,7 +3254,7 @@ describe('api tests', () => {
unlock_schedule: [
{
amount: '4139391122',
block_height: 20477,
block_height: 1,
},
],
},
@@ -3377,10 +3396,10 @@ describe('api tests', () => {
sponsored: false,
post_condition_mode: 'allow',
post_conditions: [],
block_hash: '0x9876',
block_height: 68456,
burn_block_time: 1594647994,
burn_block_time_iso: '2020-07-13T13:46:34.000Z',
block_hash: '0x1234',
block_height: 1,
burn_block_time: 39486,
burn_block_time_iso: '1970-01-01T10:58:06.000Z',
canonical: true,
microblock_canonical: true,
microblock_hash: '',
@@ -3418,10 +3437,10 @@ describe('api tests', () => {
sponsored: false,
post_condition_mode: 'allow',
post_conditions: [],
block_hash: '0x9876',
block_height: 68456,
burn_block_time: 1594647994,
burn_block_time_iso: '2020-07-13T13:46:34.000Z',
block_hash: '0x1234',
block_height: 1,
burn_block_time: 39486,
burn_block_time_iso: '1970-01-01T10:58:06.000Z',
canonical: true,
microblock_canonical: true,
microblock_hash: '',
@@ -3459,10 +3478,10 @@ describe('api tests', () => {
sponsored: false,
post_condition_mode: 'allow',
post_conditions: [],
block_hash: '0x9876',
block_height: 68456,
burn_block_time: 1594647994,
burn_block_time_iso: '2020-07-13T13:46:34.000Z',
block_hash: '0x1234',
block_height: 1,
burn_block_time: 39486,
burn_block_time_iso: '1970-01-01T10:58:06.000Z',
canonical: true,
microblock_canonical: true,
microblock_hash: '',