Mirror of https://github.com/alexgo-io/stacks-blockchain-api.git
Synced 2026-01-12 22:43:34 +08:00
fix: insert block transaction data in batches (#1760)
* chore: first pass of batching
* fix: tests
* refactor: rename bns tx header type
* chore: more refactors
* chore: upgrade toolkit
* chore: upgrade api toolkit
* fix: do not wait for pg notifications
* fix: parquet function calls
* fix: bns test imports
* fix: batch size
* fix: pox event writes
* chore: reorganize pox3 unlock height
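The core change: instead of issuing one INSERT per row while ingesting a block, event rows are collected and written with multi-row statements. A minimal sketch of that pattern using postgres.js (the `postgres` client this API depends on); the table and column names below are hypothetical placeholders, not the repo's actual schema:

```typescript
import postgres from 'postgres';

const sql = postgres(); // connection settings come from PG* env vars

interface EventRow {
  tx_id: string;
  event_index: number;
  amount: string;
}

// Before: one round trip per row.
async function insertOneByOne(rows: EventRow[]) {
  for (const row of rows) {
    await sql`INSERT INTO events ${sql(row)}`;
  }
}

// After: a single multi-row INSERT for the whole batch.
async function insertBatched(rows: EventRow[]) {
  if (rows.length === 0) return;
  await sql`INSERT INTO events ${sql(rows)}`;
}
```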
package-lock.json (generated; 74 changed lines):
@@ -10,7 +10,7 @@
       "license": "GPL-3.0",
       "dependencies": {
         "@apidevtools/json-schema-ref-parser": "9.0.9",
-        "@hirosystems/api-toolkit": "1.2.2",
+        "@hirosystems/api-toolkit": "1.3.3",
         "@promster/express": "6.0.0",
         "@promster/server": "6.0.6",
         "@promster/types": "3.2.3",
@@ -128,48 +128,6 @@
         "utf-8-validate": "5.0.7"
       }
     },
-    "../api-toolkit": {
-      "name": "@hirosystems/api-toolkit",
-      "version": "1.1.0",
-      "extraneous": true,
-      "license": "Apache 2.0",
-      "dependencies": {
-        "@fastify/cors": "^8.0.0",
-        "@fastify/swagger": "^8.3.1",
-        "@fastify/type-provider-typebox": "^3.2.0",
-        "fastify": "^4.3.0",
-        "fastify-metrics": "^10.2.0",
-        "node-pg-migrate": "^6.2.2",
-        "pino": "^8.11.0",
-        "postgres": "^3.3.4"
-      },
-      "bin": {
-        "api-toolkit-git-info": "bin/api-toolkit-git-info.js"
-      },
-      "devDependencies": {
-        "@commitlint/cli": "^17.5.0",
-        "@commitlint/config-conventional": "^17.4.4",
-        "@stacks/eslint-config": "^1.2.0",
-        "@types/jest": "^29.5.0",
-        "@typescript-eslint/eslint-plugin": "^5.56.0",
-        "@typescript-eslint/parser": "^5.56.0",
-        "babel-jest": "^29.5.0",
-        "copyfiles": "^2.4.1",
-        "eslint": "^8.36.0",
-        "eslint-plugin-prettier": "^4.2.1",
-        "eslint-plugin-tsdoc": "^0.2.17",
-        "husky": "^8.0.3",
-        "jest": "^29.5.0",
-        "prettier": "^2.8.6",
-        "rimraf": "^4.4.1",
-        "ts-jest": "^29.0.5",
-        "ts-node": "^10.9.1",
-        "typescript": "^5.0.2"
-      },
-      "engines": {
-        "node": ">=18"
-      }
-    },
     "node_modules/@aashutoshrathi/word-wrap": {
       "version": "1.2.6",
       "resolved": "https://registry.npmjs.org/@aashutoshrathi/word-wrap/-/word-wrap-1.2.6.tgz",
@@ -1234,9 +1192,9 @@
     }
   },
   "node_modules/@fastify/cors": {
-    "version": "8.4.0",
-    "resolved": "https://registry.npmjs.org/@fastify/cors/-/cors-8.4.0.tgz",
-    "integrity": "sha512-MlVvMTenltToByTpLwlWtO+7dQ3l2J+1OpmGrx9JpSNWo1d+dhfNCOi23zHhxdFhtpDzfwGwCsKu9DTeG7k7nQ==",
+    "version": "8.4.1",
+    "resolved": "https://registry.npmjs.org/@fastify/cors/-/cors-8.4.1.tgz",
+    "integrity": "sha512-iYQJtrY3pFiDS5mo5zRaudzg2OcUdJ96PD6xfkKOOEilly5nnrFZx/W6Sce2T79xxlEn2qpU3t5+qS2phS369w==",
     "dependencies": {
       "fastify-plugin": "^4.0.0",
       "mnemonist": "0.39.5"
@@ -1273,9 +1231,9 @@
     }
   },
   "node_modules/@fastify/swagger/node_modules/yaml": {
-    "version": "2.3.3",
-    "resolved": "https://registry.npmjs.org/yaml/-/yaml-2.3.3.tgz",
-    "integrity": "sha512-zw0VAJxgeZ6+++/su5AFoqBbZbrEakwu+X0M5HmcwUiBL7AzcuPKjj5we4xfQLp78LkEMpD0cOnUhmgOVy3KdQ==",
+    "version": "2.3.4",
+    "resolved": "https://registry.npmjs.org/yaml/-/yaml-2.3.4.tgz",
+    "integrity": "sha512-8aAvwVUSHpfEqTQ4w/KMlf3HcRdt50E5ODIQJBw1fQ5RL34xabzxtUlzTXVqc4rkZsPbvrXKWnABCD7kWSmocA==",
     "engines": {
       "node": ">= 14"
     }
@@ -1289,9 +1247,9 @@
     }
   },
   "node_modules/@hirosystems/api-toolkit": {
-    "version": "1.2.2",
-    "resolved": "https://registry.npmjs.org/@hirosystems/api-toolkit/-/api-toolkit-1.2.2.tgz",
-    "integrity": "sha512-HLaodPN6dHUAkrOwECsMwQicBMhJn8CYlS0QwC4SC3smclno3fB0oKm1QYZFQEDy2KV3IPTAh70B/vkpib/Kkw==",
+    "version": "1.3.3",
+    "resolved": "https://registry.npmjs.org/@hirosystems/api-toolkit/-/api-toolkit-1.3.3.tgz",
+    "integrity": "sha512-0/JjQ54twLdVqf8+hB+8IAKn8JdCdlMfT3BqUWha5qMrjlC3KX+kAl+88+CqpoibY/lgYJ9fs+70KG/weHt3LQ==",
     "dependencies": {
       "@fastify/cors": "^8.0.0",
       "@fastify/swagger": "^8.3.1",
@@ -1378,9 +1336,9 @@
     }
   },
   "node_modules/@hirosystems/api-toolkit/node_modules/postgres": {
-    "version": "3.4.2",
-    "resolved": "https://registry.npmjs.org/postgres/-/postgres-3.4.2.tgz",
-    "integrity": "sha512-0UcCGvDBSaAzLeWwiq+QVmiGfOPTosFb+sxJUUtd+7Pi/ByFPuz6Gq05LbS0sM1ghMWC5atuks3pfl34g0qmFw==",
+    "version": "3.4.3",
+    "resolved": "https://registry.npmjs.org/postgres/-/postgres-3.4.3.tgz",
+    "integrity": "sha512-iHJn4+M9vbTdHSdDzNkC0crHq+1CUdFhx+YqCE+SqWxPjm+Zu63jq7yZborOBF64c8pc58O5uMudyL1FQcHacA==",
     "engines": {
       "node": ">=12"
     },
@@ -6964,9 +6922,9 @@
     }
   },
   "node_modules/fastify-metrics": {
-    "version": "10.3.2",
-    "resolved": "https://registry.npmjs.org/fastify-metrics/-/fastify-metrics-10.3.2.tgz",
-    "integrity": "sha512-02SEIGH02zfguqRMho0LB8L7YVAj5cIgWM0iqZslIErqaUWc1iHVAOC+YXYG3S2DZU6VHdFaMyuxjEOCQHAETA==",
+    "version": "10.3.3",
+    "resolved": "https://registry.npmjs.org/fastify-metrics/-/fastify-metrics-10.3.3.tgz",
+    "integrity": "sha512-TmMcfrMWBSbA7yk31tFtJnWKtNXLSO7jmTRIjPX9HKC4pLmyd0JnOQ3r9XCYnev6NL9/eVRXxNfrsqQdKTLZkw==",
     "dependencies": {
       "fastify-plugin": "^4.3.0",
       "prom-client": "^14.2.0"
package.json:

@@ -85,7 +85,7 @@
   },
   "dependencies": {
     "@apidevtools/json-schema-ref-parser": "9.0.9",
-    "@hirosystems/api-toolkit": "1.2.2",
+    "@hirosystems/api-toolkit": "1.3.3",
     "@promster/express": "6.0.0",
     "@promster/server": "6.0.6",
    "@promster/types": "3.2.3",
src/datastore/common.ts:

@@ -632,6 +632,11 @@ export interface DataStoreBnsBlockData {
   microblock_canonical: boolean;
 }
 
+export type DataStoreBnsBlockTxData = DataStoreBnsBlockData & {
+  tx_id: string;
+  tx_index: number;
+};
+
 export interface DataStoreAttachmentSubdomainData {
   attachment?: DataStoreAttachmentData;
   blockData?: DataStoreBnsBlockData;
@@ -976,7 +981,7 @@ export interface FaucetRequestQueryResult {
   occurred_at: string;
 }
 
-export interface UpdatedEntities {
+export interface ReOrgUpdatedEntities {
   markedCanonical: {
     blocks: number;
    microblocks: number;
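The renamed BNS header type is simply the block header data plus its anchoring transaction. A small illustration with placeholder values (the field set is inferred from the test fixtures later in this diff; the real interface may carry additional fields):

```typescript
import { DataStoreBnsBlockTxData } from './common';

const bnsBlockTx: DataStoreBnsBlockTxData = {
  index_block_hash: '0x11',
  parent_index_block_hash: '0x00',
  microblock_hash: '',
  microblock_sequence: 2147483647, // I32_MAX, as in the tests
  microblock_canonical: true,
  tx_id: '0x1234', // the two fields added by the intersection type
  tx_index: 0,
};
```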
src/datastore/helpers.ts:

@@ -2,6 +2,7 @@ import { parseEnum, unwrapOptionalProp } from '../helpers';
 import {
   BlockQueryResult,
   ContractTxQueryResult,
+  DataStoreBlockUpdateData,
   DbBlock,
   DbEvent,
   DbEventBase,
@@ -42,6 +43,7 @@ import {
   PoxSyntheticEventQueryResult,
   TxQueryResult,
   DbPoxSyntheticRevokeDelegateStxEvent,
+  ReOrgUpdatedEntities,
 } from './common';
 import {
   CoreNodeDropMempoolTxReasonType,
@@ -1229,3 +1231,163 @@ export function convertTxQueryResultToDbMempoolTx(txs: TxQueryResult[]): DbMempo
   }
   return dbMempoolTxs;
 }
+
+export function setTotalBlockUpdateDataExecutionCost(data: DataStoreBlockUpdateData) {
+  const cost = data.txs.reduce(
+    (previousValue, currentValue) => {
+      const {
+        execution_cost_read_count,
+        execution_cost_read_length,
+        execution_cost_runtime,
+        execution_cost_write_count,
+        execution_cost_write_length,
+      } = previousValue;
+      return {
+        execution_cost_read_count:
+          execution_cost_read_count + currentValue.tx.execution_cost_read_count,
+        execution_cost_read_length:
+          execution_cost_read_length + currentValue.tx.execution_cost_read_length,
+        execution_cost_runtime: execution_cost_runtime + currentValue.tx.execution_cost_runtime,
+        execution_cost_write_count:
+          execution_cost_write_count + currentValue.tx.execution_cost_write_count,
+        execution_cost_write_length:
+          execution_cost_write_length + currentValue.tx.execution_cost_write_length,
+      };
+    },
+    {
+      execution_cost_read_count: 0,
+      execution_cost_read_length: 0,
+      execution_cost_runtime: 0,
+      execution_cost_write_count: 0,
+      execution_cost_write_length: 0,
+    }
+  );
+  data.block.execution_cost_read_count = cost.execution_cost_read_count;
+  data.block.execution_cost_read_length = cost.execution_cost_read_length;
+  data.block.execution_cost_runtime = cost.execution_cost_runtime;
+  data.block.execution_cost_write_count = cost.execution_cost_write_count;
+  data.block.execution_cost_write_length = cost.execution_cost_write_length;
+}
+
+export function markBlockUpdateDataAsNonCanonical(data: DataStoreBlockUpdateData): void {
+  data.block = { ...data.block, canonical: false };
+  data.microblocks = data.microblocks.map(mb => ({ ...mb, canonical: false }));
+  data.txs = data.txs.map(tx => ({
+    tx: { ...tx.tx, canonical: false },
+    stxLockEvents: tx.stxLockEvents.map(e => ({ ...e, canonical: false })),
+    stxEvents: tx.stxEvents.map(e => ({ ...e, canonical: false })),
+    ftEvents: tx.ftEvents.map(e => ({ ...e, canonical: false })),
+    nftEvents: tx.nftEvents.map(e => ({ ...e, canonical: false })),
+    contractLogEvents: tx.contractLogEvents.map(e => ({ ...e, canonical: false })),
+    smartContracts: tx.smartContracts.map(e => ({ ...e, canonical: false })),
+    names: tx.names.map(e => ({ ...e, canonical: false })),
+    namespaces: tx.namespaces.map(e => ({ ...e, canonical: false })),
+    pox2Events: tx.pox2Events.map(e => ({ ...e, canonical: false })),
+    pox3Events: tx.pox3Events.map(e => ({ ...e, canonical: false })),
+    pox4Events: tx.pox4Events.map(e => ({ ...e, canonical: false })),
+  }));
+  data.minerRewards = data.minerRewards.map(mr => ({ ...mr, canonical: false }));
+}
+
+export function newReOrgUpdatedEntities(): ReOrgUpdatedEntities {
+  return {
+    markedCanonical: {
+      blocks: 0,
+      microblocks: 0,
+      minerRewards: 0,
+      txs: 0,
+      stxLockEvents: 0,
+      stxEvents: 0,
+      ftEvents: 0,
+      nftEvents: 0,
+      pox2Events: 0,
+      pox3Events: 0,
+      pox4Events: 0,
+      contractLogs: 0,
+      smartContracts: 0,
+      names: 0,
+      namespaces: 0,
+      subdomains: 0,
+    },
+    markedNonCanonical: {
+      blocks: 0,
+      microblocks: 0,
+      minerRewards: 0,
+      txs: 0,
+      stxLockEvents: 0,
+      stxEvents: 0,
+      ftEvents: 0,
+      nftEvents: 0,
+      pox2Events: 0,
+      pox3Events: 0,
+      pox4Events: 0,
+      contractLogs: 0,
+      smartContracts: 0,
+      names: 0,
+      namespaces: 0,
+      subdomains: 0,
+    },
+  };
+}
+
+export function logReorgResultInfo(updatedEntities: ReOrgUpdatedEntities) {
+  const updates = [
+    ['blocks', updatedEntities.markedCanonical.blocks, updatedEntities.markedNonCanonical.blocks],
+    [
+      'microblocks',
+      updatedEntities.markedCanonical.microblocks,
+      updatedEntities.markedNonCanonical.microblocks,
+    ],
+    ['txs', updatedEntities.markedCanonical.txs, updatedEntities.markedNonCanonical.txs],
+    [
+      'miner-rewards',
+      updatedEntities.markedCanonical.minerRewards,
+      updatedEntities.markedNonCanonical.minerRewards,
+    ],
+    [
+      'stx-lock events',
+      updatedEntities.markedCanonical.stxLockEvents,
+      updatedEntities.markedNonCanonical.stxLockEvents,
+    ],
+    [
+      'stx-token events',
+      updatedEntities.markedCanonical.stxEvents,
+      updatedEntities.markedNonCanonical.stxEvents,
+    ],
+    [
+      'non-fungible-token events',
+      updatedEntities.markedCanonical.nftEvents,
+      updatedEntities.markedNonCanonical.nftEvents,
+    ],
+    [
+      'fungible-token events',
+      updatedEntities.markedCanonical.ftEvents,
+      updatedEntities.markedNonCanonical.ftEvents,
+    ],
+    [
+      'contract logs',
+      updatedEntities.markedCanonical.contractLogs,
+      updatedEntities.markedNonCanonical.contractLogs,
+    ],
+    [
+      'smart contracts',
+      updatedEntities.markedCanonical.smartContracts,
+      updatedEntities.markedNonCanonical.smartContracts,
+    ],
+    ['names', updatedEntities.markedCanonical.names, updatedEntities.markedNonCanonical.names],
+    [
+      'namespaces',
+      updatedEntities.markedCanonical.namespaces,
+      updatedEntities.markedNonCanonical.namespaces,
+    ],
+    [
+      'subdomains',
+      updatedEntities.markedCanonical.subdomains,
+      updatedEntities.markedNonCanonical.subdomains,
+    ],
+  ];
+  const markedCanonical = updates.map(e => `${e[1]} ${e[0]}`).join(', ');
+  logger.debug(`Entities marked as canonical: ${markedCanonical}`);
+  const markedNonCanonical = updates.map(e => `${e[2]} ${e[0]}`).join(', ');
+  logger.debug(`Entities marked as non-canonical: ${markedNonCanonical}`);
+}
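A sketch of how the two new re-org helpers fit together; the import path and the counts here are illustrative, not taken from the commit's actual call sites:

```typescript
import { newReOrgUpdatedEntities, logReorgResultInfo } from './helpers';

const updatedEntities = newReOrgUpdatedEntities();
// While walking the fork, tally each entity flipped to the canonical chain...
updatedEntities.markedCanonical.blocks += 1;
updatedEntities.markedCanonical.txs += 42; // e.g. txs restored with that block
// ...and emit one debug line per direction once the pass completes.
logReorgResultInfo(updatedEntities);
```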
(File diff suppressed because it is too large.)
src/event-replay/helpers.ts:

@@ -1,14 +1,6 @@
 import { PgWriteStore } from '../datastore/pg-write-store';
 import * as fs from 'fs';
 import * as readline from 'readline';
-import { DataStoreBnsBlockData, DbTxTypeId } from '../datastore/common';
+import { DataStoreBnsBlockTxData, DbTxTypeId } from '../datastore/common';
 import { readLinesReversed } from './reverse-file-stream';
 import { CoreNodeBlockMessage } from '../event-stream/core-node-message';
 
-export type BnsGenesisBlock = DataStoreBnsBlockData & {
-  tx_id: string;
-  tx_index: number;
-};
-
 /**
  * Traverse a TSV file in reverse to find the last received `/new_block` node message and return
@@ -40,7 +32,7 @@ export async function findTsvBlockHeight(filePath: string): Promise<number> {
 
 export async function getBnsGenesisBlockFromBlockMessage(
   db: PgWriteStore
-): Promise<BnsGenesisBlock> {
+): Promise<DataStoreBnsBlockTxData> {
   const genesisBlock = await db.getBlock({ height: 1 });
   if (!genesisBlock.found) {
     throw new Error('Could not find genesis block');
(parquet-based event-replay importer — attachments):

@@ -3,7 +3,6 @@
 
 import { Readable, Writable } from 'stream';
 import { pipeline } from 'stream/promises';
-import { batchIterate } from '../../../helpers';
 import { PgWriteStore } from '../../../datastore/pg-write-store';
 import { parseAttachment } from '../../../event-stream/event-server';
 import { logger } from '../../../logger';
@@ -11,6 +10,7 @@ import { CoreNodeAttachmentMessage } from '../../../event-stream/core-node-messa
 import { DataStoreAttachmentSubdomainData } from '../../../datastore/common';
 import { DatasetStore } from '../dataset/store';
 import { I32_MAX } from '../../../helpers';
+import { batchIterate } from '@hirosystems/api-toolkit';
 
 const batchInserters: BatchInserter[] = [];
 
(parquet-based event-replay importer — new_block events):

@@ -17,9 +17,10 @@ import {
 } from '../../../datastore/common';
 import { validateZonefileHash } from '../../../datastore/helpers';
 import { logger } from '../../../logger';
-import { getApiConfiguredChainID, batchIterate } from '../../../helpers';
+import { getApiConfiguredChainID } from '../../../helpers';
 import { CoreNodeBlockMessage } from '../../../event-stream/core-node-message';
 import { DatasetStore } from '../dataset/store';
+import { batchIterate } from '@hirosystems/api-toolkit';
 
 const chainID = getApiConfiguredChainID();
 
@@ -361,9 +362,7 @@ const populateBatchInserters = (db: PgWriteStore) => {
 
 const insertSmartContracts = async (dbData: DataStoreBlockUpdateData) => {
   for (const entry of dbData.txs) {
-    for (const smartContract of entry.smartContracts) {
-      await db.updateSmartContract(db.sql, entry.tx, smartContract);
-    }
+    await db.updateSmartContracts(db.sql, entry.tx, entry.smartContracts);
   }
 };
 
@@ -377,39 +376,29 @@ const populateBatchInserters = (db: PgWriteStore) => {
 
 const insertStxLockEvents = async (dbData: DataStoreBlockUpdateData) => {
   for (const entry of dbData.txs) {
-    for (const stxLockEvent of entry.stxLockEvents) {
-      await db.updateStxLockEvent(db.sql, entry.tx, stxLockEvent);
-    }
+    await db.updateStxLockEvents(db.sql, entry.tx, entry.stxLockEvents);
   }
 };
 
 const insertMinerRewards = async (dbData: DataStoreBlockUpdateData) => {
-  for (const minerReward of dbData.minerRewards) {
-    await db.updateMinerReward(db.sql, minerReward);
-  }
+  await db.updateMinerRewards(db.sql, dbData.minerRewards);
 };
 
 const insertPox2Events = async (dbData: DataStoreBlockUpdateData) => {
   for (const entry of dbData.txs) {
-    for (const pox2Event of entry.pox2Events) {
-      await db.updatePoxSyntheticEvent(db.sql, entry.tx, 'pox2_events', pox2Event);
-    }
+    await db.updatePoxSyntheticEvents(db.sql, entry.tx, 'pox2_events', entry.pox2Events);
   }
 };
 
 const insertPox3Events = async (dbData: DataStoreBlockUpdateData) => {
   for (const entry of dbData.txs) {
-    for (const pox3Event of entry.pox3Events) {
-      await db.updatePoxSyntheticEvent(db.sql, entry.tx, 'pox3_events', pox3Event);
-    }
+    await db.updatePoxSyntheticEvents(db.sql, entry.tx, 'pox3_events', entry.pox3Events);
   }
 };
 
 const insertPox4Events = async (dbData: DataStoreBlockUpdateData) => {
   for (const entry of dbData.txs) {
-    for (const pox4Event of entry.pox4Events) {
-      await db.updatePoxSyntheticEvent(db.sql, entry.tx, 'pox4_events', pox4Event);
-    }
+    await db.updatePoxSyntheticEvents(db.sql, entry.tx, 'pox4_events', entry.pox4Events);
   }
 };
 
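Each batched updater now receives the whole event array for a transaction rather than one row at a time. A minimal sketch of what such an updater can look like internally, assuming postgres.js (where `sql(rows)` expands to a multi-row insert) plus `batchIterate` from `@hirosystems/api-toolkit`; the table, columns, and batch cap are hypothetical:

```typescript
import { batchIterate, PgSqlClient } from '@hirosystems/api-toolkit';

const INSERT_BATCH_SIZE = 500; // hypothetical cap to stay under Postgres' bind-parameter limit

interface StxLockEventRow {
  tx_id: string;
  locked_amount: string;
  unlock_height: number;
}

// Array-accepting updater in the style introduced by this diff.
async function updateStxLockEvents(sql: PgSqlClient, events: StxLockEventRow[]) {
  for (const batch of batchIterate(events, INSERT_BATCH_SIZE)) {
    await sql`INSERT INTO stx_lock_events ${sql(batch)}`;
  }
}
```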
(parquet-based event-replay importer — raw events):

@@ -3,8 +3,8 @@ import { pipeline } from 'stream/promises';
 import { PgWriteStore } from '../../../datastore/pg-write-store';
 import { RawEventRequestInsertValues } from '../../../datastore/common';
 import { logger } from '../../../logger';
-import { batchIterate } from '../../../helpers';
 import { DatasetStore } from '../dataset/store';
+import { batchIterate } from '@hirosystems/api-toolkit';
 
 const batchInserters: BatchInserter[] = [];
 
src/helpers.ts:

@@ -440,71 +440,6 @@ export function assertNotNullish<T>(
   return true;
 }
 
-/**
- * Iterate over an array, yielding multiple items at a time. If the size of the given array
- * is not divisible by the given batch size, then the length of the last items returned will
- * be smaller than the given batch size, i.e.:
- * ```typescript
- * items.length % batchSize
- * ```
- * @param items - The array to iterate over.
- * @param batchSize - Maximum number of items to return at a time.
- */
-export function* batchIterate<T>(
-  items: T[],
-  batchSize: number,
-  printBenchmark = isDevEnv
-): Generator<T[]> {
-  if (items.length === 0) {
-    return;
-  }
-  const startTime = Date.now();
-  for (let i = 0; i < items.length; ) {
-    const itemsRemaining = items.length - i;
-    const sliceSize = Math.min(batchSize, itemsRemaining);
-    yield items.slice(i, i + sliceSize);
-    i += sliceSize;
-  }
-
-  if (printBenchmark) {
-    const itemsPerSecond = Math.round((items.length / (Date.now() - startTime)) * 1000);
-    const caller = new Error().stack?.split('at ')[3].trim();
-    logger.debug(`Iterated ${itemsPerSecond} items/second at ${caller}`);
-  }
-}
-
-export async function* asyncBatchIterate<T>(
-  items: AsyncIterable<T>,
-  batchSize: number,
-  printBenchmark = isDevEnv
-): AsyncGenerator<T[], void, unknown> {
-  const startTime = Date.now();
-  let itemCount = 0;
-  let itemBatch: T[] = [];
-  for await (const item of items) {
-    itemBatch.push(item);
-    itemCount++;
-    if (itemBatch.length >= batchSize) {
-      yield itemBatch;
-      itemBatch = [];
-      if (printBenchmark) {
-        const itemsPerSecond = Math.round((itemCount / (Date.now() - startTime)) * 1000);
-        const caller = new Error().stack?.split('at ')[3].trim();
-        logger.debug(`Iterated ${itemsPerSecond} items/second at ${caller}`);
-      }
-    }
-  }
-  if (itemBatch.length > 0) {
-    yield itemBatch;
-  }
-}
-
-export async function* asyncIterableToGenerator<T>(iter: AsyncIterable<T>) {
-  for await (const entry of iter) {
-    yield entry;
-  }
-}
-
 function intMax(args: bigint[]): bigint;
 function intMax(args: number[]): number;
 function intMax(args: bigint[] | number[]): any {
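The removed helpers were not dropped; they moved to `@hirosystems/api-toolkit` (see the import changes above). A brief usage sketch, assuming the toolkit keeps the signatures shown here:

```typescript
import { batchIterate, asyncBatchIterate } from '@hirosystems/api-toolkit';

const items = Array.from({ length: 1050 }, (_, i) => i);
for (const batch of batchIterate(items, 500)) {
  // Yields 500, 500, then 50 items (1050 % 500) in the final batch.
  console.log(batch.length);
}

async function consume(rows: AsyncIterable<string>): Promise<void> {
  // Same batching semantics over an async source, e.g. a row stream.
  for await (const batch of asyncBatchIterate(rows, 500)) {
    console.log(batch.length);
  }
}
```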
src/import-v1/index.ts:

@@ -8,17 +8,18 @@ import * as zlib from 'zlib';
 import { bitcoinToStacksAddress } from 'stacks-encoding-native-js';
 import * as split2 from 'split2';
 import {
+  DataStoreBnsBlockTxData,
   DbBnsName,
   DbBnsNamespace,
   DbBnsSubdomain,
   DbConfigState,
   DbTokenOfferingLocked,
 } from '../datastore/common';
-import { asyncBatchIterate, asyncIterableToGenerator, I32_MAX, REPO_DIR } from '../helpers';
-import { BnsGenesisBlock, getBnsGenesisBlockFromBlockMessage } from '../event-replay/helpers';
+import { REPO_DIR } from '../helpers';
+import { getBnsGenesisBlockFromBlockMessage } from '../event-replay/helpers';
 import { PgWriteStore } from '../datastore/pg-write-store';
 import { logger } from '../logger';
-import { PgSqlClient } from '@hirosystems/api-toolkit';
+import { PgSqlClient, asyncBatchIterate, asyncIterableToGenerator } from '@hirosystems/api-toolkit';
 
 const finished = util.promisify(stream.finished);
 const pipeline = util.promisify(stream.pipeline);
@@ -78,13 +79,13 @@ class ChainProcessor extends stream.Writable {
   namespace: Map<string, DbBnsNamespace>;
   db: PgWriteStore;
   sql: PgSqlClient;
-  genesisBlock: BnsGenesisBlock;
+  genesisBlock: DataStoreBnsBlockTxData;
 
   constructor(
     sql: PgSqlClient,
     db: PgWriteStore,
     zhashes: Map<string, string>,
-    genesisBlock: BnsGenesisBlock
+    genesisBlock: DataStoreBnsBlockTxData
   ) {
     super();
     this.zhashes = zhashes;
@@ -159,7 +160,7 @@ class ChainProcessor extends stream.Writable {
       canonical: true,
       status: 'name-register',
     };
-    await this.db.updateNames(this.sql, this.genesisBlock, obj);
+    await this.db.updateNames(this.sql, this.genesisBlock, [obj]);
     this.rowCount += 1;
     if (obj.zonefile === '') {
       logger.debug(
@@ -186,7 +187,7 @@ class ChainProcessor extends stream.Writable {
       canonical: true,
     };
     this.namespace.set(obj.namespace_id, obj);
-    await this.db.updateNamespaces(this.sql, this.genesisBlock, obj);
+    await this.db.updateNamespaces(this.sql, this.genesisBlock, [obj]);
     this.rowCount += 1;
   }
 }
@@ -230,9 +231,9 @@ function btcToStxAddress(btcAddress: string) {
 }
 
 class SubdomainTransform extends stream.Transform {
-  genesisBlock: BnsGenesisBlock;
+  genesisBlock: DataStoreBnsBlockTxData;
 
-  constructor(genesisBlock: BnsGenesisBlock) {
+  constructor(genesisBlock: DataStoreBnsBlockTxData) {
     super({ objectMode: true, highWaterMark: SUBDOMAIN_BATCH_SIZE });
     this.genesisBlock = genesisBlock;
   }
@@ -304,7 +305,7 @@ async function valid(fileName: string): Promise<boolean> {
   return true;
 }
 
-async function* readSubdomains(importDir: string, genesisBlock: BnsGenesisBlock) {
+async function* readSubdomains(importDir: string, genesisBlock: DataStoreBnsBlockTxData) {
   const metaIter = asyncIterableToGenerator<DbBnsSubdomain>(
     stream.pipeline(
       fs.createReadStream(path.join(importDir, 'subdomains.csv')),
@@ -416,7 +417,7 @@ async function validateBnsImportDir(importDir: string, importFiles: string[]) {
 export async function importV1BnsNames(
   db: PgWriteStore,
   importDir: string,
-  genesisBlock: BnsGenesisBlock
+  genesisBlock: DataStoreBnsBlockTxData
 ) {
   const configState = await db.getConfigState();
   if (configState.bns_names_onchain_imported) {
@@ -444,7 +445,7 @@ export async function importV1BnsNames(
 export async function importV1BnsSubdomains(
   db: PgWriteStore,
   importDir: string,
-  genesisBlock: BnsGenesisBlock
+  genesisBlock: DataStoreBnsBlockTxData
 ) {
   const configState = await db.getConfigState();
   if (configState.bns_subdomains_imported) {
(tests: BNS API):

@@ -129,8 +129,10 @@ describe('BNS API tests', () => {
       microblock_hash: '',
       microblock_sequence: I32_MAX,
       microblock_canonical: true,
+      tx_id: '',
+      tx_index: 0,
     },
-    namespace
+    [namespace]
   );
   const namespace2: DbBnsNamespace = {
     namespace_id: 'blockstack',
@@ -157,8 +159,10 @@ describe('BNS API tests', () => {
       microblock_hash: '',
       microblock_sequence: I32_MAX,
       microblock_canonical: true,
+      tx_id: '',
+      tx_index: 0,
     },
-    namespace2
+    [namespace2]
   );
 });
@@ -434,8 +438,10 @@ describe('BNS API tests', () => {
       microblock_hash: '',
       microblock_sequence: I32_MAX,
       microblock_canonical: true,
+      tx_id: '',
+      tx_index: 0,
     },
-    dbName
+    [dbName]
   );
 
   const query1 = await supertest(api.server).get(`/v1/names/invalid/zonefile/${zonefileHash}`);
@@ -531,8 +537,10 @@ describe('BNS API tests', () => {
       microblock_hash: '',
       microblock_sequence: I32_MAX,
       microblock_canonical: true,
+      tx_id: '',
+      tx_index: 0,
     },
-    dbName2
+    [dbName2]
   );
 
   const query1 = await supertest(api.server).get(`/v1/addresses/${blockchain}/${address}`);
(tests: BNS V1 import):

@@ -4,8 +4,7 @@ import { ChainID } from '@stacks/transactions';
 import { importV1BnsNames, importV1BnsSubdomains } from '../import-v1';
 import * as assert from 'assert';
 import { TestBlockBuilder } from '../test-utils/test-builders';
-import { DataStoreBlockUpdateData } from '../datastore/common';
-import { BnsGenesisBlock } from '../event-replay/helpers';
+import { DataStoreBlockUpdateData, DataStoreBnsBlockTxData } from '../datastore/common';
 import { PgWriteStore } from '../datastore/pg-write-store';
 import { migrate } from '../test-utils/test-helpers';
 
@@ -30,7 +29,7 @@ describe('BNS V1 import', () => {
   });
 
   test('v1-import', async () => {
-    const genesis: BnsGenesisBlock = {
+    const genesis: DataStoreBnsBlockTxData = {
       index_block_hash: block.block.index_block_hash,
       parent_index_block_hash: block.block.parent_index_block_hash,
       microblock_canonical: true,
(tests: postgres datastore):

@@ -19,6 +19,7 @@ import {
   DbBnsSubdomain,
   DbTokenOfferingLocked,
   DbTx,
+  DataStoreBnsBlockTxData,
 } from '../datastore/common';
 import { getBlocksWithMetadata, parseDbEvent } from '../api/controllers/db-controller';
 import * as assert from 'assert';
@@ -95,9 +96,7 @@ describe('postgres datastore', () => {
     createMinerReward('addrB', 0n, 30n, 40n, 7n),
     createMinerReward('addrB', 99999n, 92n, 93n, 0n, false),
   ];
-  for (const reward of minerRewards) {
-    await db.updateMinerReward(client, reward);
-  }
+  await db.updateMinerRewards(client, minerRewards);
 
   const tx: DbTxRaw = {
     tx_id: '0x1234',
@@ -196,9 +195,7 @@ describe('postgres datastore', () => {
     createStxLockEvent('addrA', 222n, 1),
     createStxLockEvent('addrB', 333n, 1),
   ];
-  for (const stxLockEvent of stxLockEvents) {
-    await db.updateStxLockEvent(client, tx, stxLockEvent);
-  }
+  await db.updateStxLockEvents(client, tx, stxLockEvents);
   await db.updateTx(client, tx);
   await db.updateTx(client, tx2);
@@ -3515,9 +3512,7 @@ describe('postgres datastore', () => {
   }
 
   // insert miner rewards directly
-  for (const minerReward of [minerReward1]) {
-    await db.updateMinerReward(client, minerReward);
-  }
+  await db.updateMinerRewards(client, [minerReward1]);
 
   // insert txs directly
   for (const tx of [tx1, tx2]) {
@@ -3525,9 +3520,7 @@ describe('postgres datastore', () => {
   }
 
   // insert stx lock events directly
-  for (const event of [stxLockEvent1]) {
-    await db.updateStxLockEvent(client, tx1, event);
-  }
+  await db.updateStxLockEvents(client, tx1, [stxLockEvent1]);
 
   const block5: DbBlock = {
     block_hash: '0x55',
@@ -4616,8 +4609,8 @@ describe('postgres datastore', () => {
       microblock_hash: '0x00',
       microblock_sequence: I32_MAX,
       microblock_canonical: true,
-    },
-    namespace
+    } as DataStoreBnsBlockTxData,
+    [namespace]
   );
   const { results } = await db.getNamespaceList({ includeUnanchored: false });
   expect(results.length).toBe(1);
@@ -4672,8 +4665,8 @@ describe('postgres datastore', () => {
       microblock_hash: '0x00',
       microblock_sequence: I32_MAX,
       microblock_canonical: true,
-    },
-    name
+    } as DataStoreBnsBlockTxData,
+    [name]
   );
   const { results } = await db.getNamespaceNamesList({
     namespace: 'abc',
(tests: other):

@@ -183,7 +183,7 @@ describe('other tests', () => {
     tx_fees_streamed_confirmed: 2_000_000_000_000n,
     tx_fees_streamed_produced: 3_000_000_000_000n,
   };
-  await db.updateMinerReward(client, minerReward1);
+  await db.updateMinerRewards(client, [minerReward1]);
   const expectedTotalStx3 =
     stxMintEvent1.amount +
     stxMintEvent2.amount -
(tests: search):

@@ -770,7 +770,7 @@ describe('search tests', () => {
     recipient: addr5,
     sender: 'none',
   };
-  await db.updateFtEvent(client, stxTx1, ftEvent1);
+  await db.updateFtEvents(client, stxTx1, [ftEvent1]);
 
   // test address as a ft event recipient
   const searchResult5 = await supertest(api.server).get(`/extended/v1/search/${addr5}`);
@@ -798,7 +798,7 @@ describe('search tests', () => {
     recipient: 'none',
     sender: addr6,
   };
-  await db.updateFtEvent(client, stxTx1, ftEvent2);
+  await db.updateFtEvents(client, stxTx1, [ftEvent2]);
 
   // test address as a ft event sender
   const searchResult6 = await supertest(api.server).get(`/extended/v1/search/${addr6}`);
@@ -826,7 +826,7 @@ describe('search tests', () => {
     recipient: addr7,
     sender: 'none',
   };
-  await db.updateNftEvent(client, stxTx1, nftEvent1, false);
+  await db.updateNftEvents(client, stxTx1, [nftEvent1], false);
 
   // test address as a nft event recipient
   const searchResult7 = await supertest(api.server).get(`/extended/v1/search/${addr7}`);
@@ -854,7 +854,7 @@ describe('search tests', () => {
     recipient: 'none',
     sender: addr8,
   };
-  await db.updateNftEvent(client, stxTx1, nftEvent2, false);
+  await db.updateNftEvents(client, stxTx1, [nftEvent2], false);
 
   // test address as a nft event sender
   const searchResult8 = await supertest(api.server).get(`/extended/v1/search/${addr8}`);
(tests: tx):

@@ -1045,15 +1045,17 @@ describe('tx tests', () => {
     fungible_tokens: [],
     non_fungible_tokens: [],
   };
-  await db.updateSmartContract(client, dbTx, {
-    tx_id: dbTx.tx_id,
-    canonical: true,
-    clarity_version: null,
-    contract_id: 'ST11NJTTKGVT6D1HY4NJRVQWMQM7TVAR091EJ8P2Y.hello-world',
-    block_height: dbBlock.block_height,
-    source_code: '()',
-    abi: JSON.stringify(contractAbi),
-  });
+  await db.updateSmartContracts(client, dbTx, [
+    {
+      tx_id: dbTx.tx_id,
+      canonical: true,
+      clarity_version: null,
+      contract_id: 'ST11NJTTKGVT6D1HY4NJRVQWMQM7TVAR091EJ8P2Y.hello-world',
+      block_height: dbBlock.block_height,
+      source_code: '()',
+      abi: JSON.stringify(contractAbi),
+    },
+  ]);
   const txQuery = await getTxFromDataStore(db, { txId: dbTx.tx_id, includeUnanchored: false });
   expect(txQuery.found).toBe(true);
   if (!txQuery.found) {