From 701bd1a984c4ab064ddb1273a74cdb25975d7c1c Mon Sep 17 00:00:00 2001
From: Chris Guimaraes
Date: Fri, 11 Aug 2023 17:54:37 +0100
Subject: [PATCH] feat: events folder as environment var

---
 .env                                          | 13 +++++---
 .../parquet-based/dataset/store.ts            | 31 +++++++++++--------
 src/event-replay/parquet-based/helpers.ts     |  4 +--
 .../parquet-based/workers/new-block-worker.ts |  2 +-
 .../parquet-based/workers/raw-worker.ts       |  2 +-
 5 files changed, 30 insertions(+), 22 deletions(-)

diff --git a/.env b/.env
index 1348d855..095a6907 100644
--- a/.env
+++ b/.env
@@ -172,12 +172,15 @@ STACKS_NODE_TYPE=L1
 # STACKS_ADDRESS_CACHE_SIZE=10000
 
 # Specify a URL to redirect from /doc. If this URL is not provided, server renders local documentation
-# of openapi.yaml for test / development NODE_ENV.
+# of openapi.yaml for test / development NODE_ENV.
 # For production, /doc is not served if this env var is not provided.
-# API_DOCS_URL="https://docs.hiro.so/api"
+# API_DOCS_URL="https://docs.hiro.so/api"
 
-# For use while syncing. Places the API into an "Initial Block Download(IBD)" mode,
-# forcing it to stop any redundant processing until the node is fully synced up to its peers.
-# Some examples of processing that are avoided are:
+# For use while syncing. Places the API into an "Initial Block Download(IBD)" mode,
+# forcing it to stop any redundant processing until the node is fully synced up to its peers.
+# Some examples of processing that are avoided are:
 # REFRESH MATERIALIZED VIEW SQLs that are extremely CPU intensive on the PG instance, Mempool messages, etc.,
 # IBD_MODE_UNTIL_BLOCK=
+
+# Folder with events to be imported by the event-replay.
+STACKS_EVENTS_DIR=./events
diff --git a/src/event-replay/parquet-based/dataset/store.ts b/src/event-replay/parquet-based/dataset/store.ts
index 7185b9e8..adafda30 100644
--- a/src/event-replay/parquet-based/dataset/store.ts
+++ b/src/event-replay/parquet-based/dataset/store.ts
@@ -1,5 +1,10 @@
+import { loadDotEnv } from '../../../helpers';
 import { Database, QueryResult, TableData } from 'duckdb';
 
+loadDotEnv();
+
+const EVENTS_DIR = process.env.STACKS_EVENTS_DIR;
+
 export class DatasetStore {
   private readonly db;
 
@@ -19,7 +24,7 @@ export class DatasetStore {
     const con = this.db.connect();
     return new Promise(resolve => {
       con.all(
-        "SELECT id FROM READ_PARQUET('events/new_block/canonical/*.parquet')",
+        `SELECT id FROM READ_PARQUET('${EVENTS_DIR}/new_block/canonical/*.parquet')`,
         (err: any, result: any) => {
           if (err) {
             throw err;
@@ -36,7 +41,7 @@ export class DatasetStore {
     return new Promise(resolve => {
       const con = this.db.connect();
       const res = con.stream(
-        `SELECT payload FROM READ_PARQUET('events/new_block/canonical/*.parquet') WHERE id IN (${ids}) ORDER BY id`
+        `SELECT payload FROM READ_PARQUET('${EVENTS_DIR}/new_block/canonical/*.parquet') WHERE id IN (${ids}) ORDER BY id`
       );
 
       resolve(res);
@@ -47,7 +52,7 @@ export class DatasetStore {
     const con = this.db.connect();
     return new Promise(resolve => {
       con.all(
-        `SELECT * FROM READ_PARQUET('events/new_block/*.parquet') WHERE block_height IN (${blockHeights})`,
+        `SELECT * FROM READ_PARQUET('${EVENTS_DIR}/new_block/*.parquet') WHERE block_height IN (${blockHeights})`,
         (err: any, res: any) => {
           if (err) {
             throw err;
@@ -67,7 +72,7 @@ export class DatasetStore {
     return new Promise(resolve => {
       const con = this.db.connect();
       con.all(
-        "SELECT * FROM READ_PARQUET('events/new_burn_block/canonical/*.parquet') ORDER BY id",
+        `SELECT * FROM READ_PARQUET('${EVENTS_DIR}/new_burn_block/canonical/*.parquet') ORDER BY id`,
         (err: any, result: any) => {
           if (err) {
             throw err;
@@ -87,7 +92,7 @@ export class DatasetStore {
     const con = this.db.connect();
     return new Promise(resolve => {
       const res = con.stream(
-        "SELECT payload FROM READ_PARQUET('events/attachments/new/canonical/*.parquet') ORDER BY id"
+        `SELECT payload FROM READ_PARQUET('${EVENTS_DIR}/attachments/new/canonical/*.parquet') ORDER BY id`
       );
 
       resolve(res);
@@ -103,11 +108,11 @@ export class DatasetStore {
       const con = this.db.connect();
       con.all(
         `SELECT method, payload FROM READ_PARQUET([
-          'events/new_burn_block/canonical/*.parquet',
-          'events/attachments/new/canonical/*.parquet',
-          'events/new_microblocks/*.parquet',
-          'events/drop_mempool_tx/*.parquet',
-          'events/new_mempool_tx/*.parquet',
+          '${EVENTS_DIR}/new_burn_block/canonical/*.parquet',
+          '${EVENTS_DIR}/attachments/new/canonical/*.parquet',
+          '${EVENTS_DIR}/new_microblocks/*.parquet',
+          '${EVENTS_DIR}/drop_mempool_tx/*.parquet',
+          '${EVENTS_DIR}/new_mempool_tx/*.parquet',
         ]) ORDER BY id`,
         (err: any, result: any) => {
           if (err) {
@@ -124,7 +129,7 @@ export class DatasetStore {
     return new Promise(resolve => {
       const con = this.db.connect();
       const res = con.stream(
-        `SELECT method, payload FROM READ_PARQUET('events/new_block/canonical/*.parquet') WHERE id IN (${ids}) ORDER BY id`
+        `SELECT method, payload FROM READ_PARQUET('${EVENTS_DIR}/new_block/canonical/*.parquet') WHERE id IN (${ids}) ORDER BY id`
       );
 
       resolve(res);
@@ -139,7 +144,7 @@ export class DatasetStore {
     return new Promise(resolve => {
      const con = this.db.connect();
       con.all(
-        `SELECT method, payload FROM READ_PARQUET('events/remainder/*.parquet') ORDER BY id`,
+        `SELECT method, payload FROM READ_PARQUET('${EVENTS_DIR}/remainder/*.parquet') ORDER BY id`,
         (err: any, res: any) => {
           if (err) {
             throw err;
@@ -159,7 +164,7 @@ export class DatasetStore {
     return new Promise(resolve => {
       const con = this.db.connect();
       con.all(
-        "SELECT * FROM READ_PARQUET('events/canonical/block_hashes/*.parquet')",
+        `SELECT * FROM READ_PARQUET('${EVENTS_DIR}/canonical/block_hashes/*.parquet')`,
         (err: any, res: any) => {
           if (err) {
             throw err;
diff --git a/src/event-replay/parquet-based/helpers.ts b/src/event-replay/parquet-based/helpers.ts
index 32655ae5..63ccddfe 100644
--- a/src/event-replay/parquet-based/helpers.ts
+++ b/src/event-replay/parquet-based/helpers.ts
@@ -76,7 +76,7 @@ const genIdsFiles = async (dataset: DatasetStore) => {
 
   logger.info({ component: 'event-replay' }, `Generating ID files for ${workers} workers`);
 
-  const dir = './events/new_block';
+  const dir = `${process.env.STACKS_EVENTS_DIR}/new_block`;
   const ids: number[] = await dataset.newBlockEventsIds();
   const batchSize = Math.ceil(ids.length / workers);
 
@@ -95,7 +95,7 @@ const genIdsFiles = async (dataset: DatasetStore) => {
 
   // create id files
   chunks.forEach((chunk, idx) => {
-    const filename = `./events/new_block/ids_${idx + 1}.txt`;
+    const filename = `${dir}/ids_${idx + 1}.txt`;
     chunk.forEach(id => {
       fs.writeFileSync(filename, id.toString() + '\n', { flag: 'a' });
     });
diff --git a/src/event-replay/parquet-based/workers/new-block-worker.ts b/src/event-replay/parquet-based/workers/new-block-worker.ts
index 2cfd8c93..a1e715ad 100644
--- a/src/event-replay/parquet-based/workers/new-block-worker.ts
+++ b/src/event-replay/parquet-based/workers/new-block-worker.ts
@@ -20,7 +20,7 @@ const ingestNewBlock = async (idFile?: string) => {
 
   const timeTracker = createTimeTracker();
 
-  const dir = './events/new_block';
+  const dir = `${process.env.STACKS_EVENTS_DIR}/new_block`;
 
   try {
     const idsFileContent = fs.readFileSync(`${dir}/${idFile}`, 'utf-8');
diff --git a/src/event-replay/parquet-based/workers/raw-worker.ts b/src/event-replay/parquet-based/workers/raw-worker.ts
index 2795cb57..8262d4cb 100644
--- a/src/event-replay/parquet-based/workers/raw-worker.ts
+++ b/src/event-replay/parquet-based/workers/raw-worker.ts
@@ -20,7 +20,7 @@ const ingestNewBlock = async (idFile?: string) => {
 
   const timeTracker = createTimeTracker();
 
-  const dir = './events/new_block';
+  const dir = `${process.env.STACKS_EVENTS_DIR}/new_block`;
 
   try {
     const idsFileContent = fs.readFileSync(`${dir}/${idFile}`, 'utf-8');
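
Note: the patch reads process.env.STACKS_EVENTS_DIR directly, so an unset variable would interpolate as the literal string "undefined" inside the parquet paths. Below is a minimal sketch, not part of the patch, of how a caller could fall back to the previously hardcoded ./events location before querying DuckDB; the resolveEventsDir helper name is hypothetical.

import { Database } from 'duckdb';

// Hypothetical helper (not in the patch): use STACKS_EVENTS_DIR when set,
// otherwise fall back to the old hardcoded ./events directory.
const resolveEventsDir = (): string => process.env.STACKS_EVENTS_DIR ?? './events';

const eventsDir = resolveEventsDir();
const db = new Database(':memory:');
const con = db.connect();

// Same query shape as DatasetStore.newBlockEventsIds(), using the resolved directory.
con.all(
  `SELECT id FROM READ_PARQUET('${eventsDir}/new_block/canonical/*.parquet')`,
  (err: any, result: any) => {
    if (err) {
      throw err;
    }
    console.log(`${result.length} canonical new_block event IDs found`);
  }
);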