mirror of https://github.com/alexgo-io/stacks-blockchain-api.git
synced 2026-01-12 16:53:19 +08:00

feat: events folder as environment var
@@ -181,3 +181,6 @@ STACKS_NODE_TYPE=L1
 # Some examples of processing that are avoided are:
 # REFRESH MATERIALIZED VIEW SQLs that are extremely CPU intensive on the PG instance, Mempool messages, etc.,
 # IBD_MODE_UNTIL_BLOCK=
+
+# Folder with events to be imported by the event-replay.
+STACKS_EVENTS_DIR=./events
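
The value committed here is only a default. Assuming the repo's loadDotEnv() helper wraps the standard dotenv package (whose config() does not overwrite variables already present in the process environment), a shell-level STACKS_EVENTS_DIR wins over the .env entry. A minimal sketch of that precedence:

    import * as dotenv from 'dotenv';

    // dotenv keeps any STACKS_EVENTS_DIR already exported in the shell, so the
    // .env entry acts as a default rather than a hard override.
    dotenv.config();
    console.log(process.env.STACKS_EVENTS_DIR);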
@@ -1,5 +1,10 @@
+import { loadDotEnv } from '../../../helpers';
 import { Database, QueryResult, TableData } from 'duckdb';
 
+loadDotEnv();
+
+const EVENTS_DIR = process.env.STACKS_EVENTS_DIR;
+
 export class DatasetStore {
   private readonly db;
 
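
Note that process.env.STACKS_EVENTS_DIR is typed string | undefined, and an unset variable would interpolate the literal string 'undefined' into every READ_PARQUET glob below, yielding empty results rather than an error. A minimal guard, as a sketch (the './events' fallback mirrors the old hardcoded path; it is not part of the commit):

    import { loadDotEnv } from '../../../helpers';

    loadDotEnv();

    // Fall back to the previous hardcoded location when the variable is unset,
    // so existing deployments keep working without a .env change.
    const EVENTS_DIR: string = process.env.STACKS_EVENTS_DIR ?? './events';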
@@ -19,7 +24,7 @@ export class DatasetStore {
     const con = this.db.connect();
     return new Promise(resolve => {
       con.all(
-        "SELECT id FROM READ_PARQUET('events/new_block/canonical/*.parquet')",
+        `SELECT id FROM READ_PARQUET('${EVENTS_DIR}/new_block/canonical/*.parquet')`,
         (err: any, result: any) => {
           if (err) {
             throw err;
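
Each reader below repeats the same new Promise wrapper around duckdb's callback API, and a throw err inside the callback escapes the Promise instead of rejecting it. A hedged refactoring sketch (not in the commit) that centralizes the wrapper and rejects properly:

    import { Connection, TableData } from 'duckdb';

    // Promisify Connection.all once; reject on error so the awaiting caller
    // sees the failure instead of an uncaught exception.
    function queryAll(con: Connection, sql: string): Promise<TableData> {
      return new Promise((resolve, reject) => {
        con.all(sql, (err: any, rows: TableData) => (err ? reject(err) : resolve(rows)));
      });
    }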
@@ -36,7 +41,7 @@ export class DatasetStore {
     return new Promise(resolve => {
       const con = this.db.connect();
       const res = con.stream(
-        `SELECT payload FROM READ_PARQUET('events/new_block/canonical/*.parquet') WHERE id IN (${ids}) ORDER BY id`
+        `SELECT payload FROM READ_PARQUET('${EVENTS_DIR}/new_block/canonical/*.parquet') WHERE id IN (${ids}) ORDER BY id`
       );
 
       resolve(res);
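
con.stream defers materialization; with the duckdb Node bindings the returned QueryResult is async-iterable, so the payload column can be consumed row by row without loading every block into memory. A usage sketch (streamBlockPayloads and handlePayload are illustrative names, not identifiers from the commit):

    // stream is the QueryResult resolved from the con.stream call above.
    const stream: QueryResult = await datasetStore.streamBlockPayloads(ids);
    for await (const row of stream) {
      handlePayload(row.payload); // hypothetical consumer of one event payload
    }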
@@ -47,7 +52,7 @@ export class DatasetStore {
     const con = this.db.connect();
     return new Promise(resolve => {
       con.all(
-        `SELECT * FROM READ_PARQUET('events/new_block/*.parquet') WHERE block_height IN (${blockHeights})`,
+        `SELECT * FROM READ_PARQUET('${EVENTS_DIR}/new_block/*.parquet') WHERE block_height IN (${blockHeights})`,
         (err: any, res: any) => {
           if (err) {
             throw err;
@@ -67,7 +72,7 @@ export class DatasetStore {
     return new Promise(resolve => {
       const con = this.db.connect();
       con.all(
-        "SELECT * FROM READ_PARQUET('events/new_burn_block/canonical/*.parquet') ORDER BY id",
+        `SELECT * FROM READ_PARQUET('${EVENTS_DIR}/new_burn_block/canonical/*.parquet') ORDER BY id`,
         (err: any, result: any) => {
           if (err) {
             throw err;
@@ -87,7 +92,7 @@ export class DatasetStore {
     const con = this.db.connect();
     return new Promise(resolve => {
       const res = con.stream(
-        "SELECT payload FROM READ_PARQUET('events/attachments/new/canonical/*.parquet') ORDER BY id"
+        `SELECT payload FROM READ_PARQUET('${EVENTS_DIR}/attachments/new/canonical/*.parquet') ORDER BY id`
       );
 
       resolve(res);
@@ -103,11 +108,11 @@ export class DatasetStore {
       const con = this.db.connect();
       con.all(
         `SELECT method, payload FROM READ_PARQUET([
-          'events/new_burn_block/canonical/*.parquet',
-          'events/attachments/new/canonical/*.parquet',
-          'events/new_microblocks/*.parquet',
-          'events/drop_mempool_tx/*.parquet',
-          'events/new_mempool_tx/*.parquet',
+          '${EVENTS_DIR}/new_burn_block/canonical/*.parquet',
+          '${EVENTS_DIR}/attachments/new/canonical/*.parquet',
+          '${EVENTS_DIR}/new_microblocks/*.parquet',
+          '${EVENTS_DIR}/drop_mempool_tx/*.parquet',
+          '${EVENTS_DIR}/new_mempool_tx/*.parquet',
         ]) ORDER BY id`,
         (err: any, result: any) => {
           if (err) {
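
The five interpolations in this hunk differ only by subdirectory. A purely illustrative sketch (the commit keeps the literals inline) that derives the READ_PARQUET list from one array:

    const SUBDIRS = [
      'new_burn_block/canonical',
      'attachments/new/canonical',
      'new_microblocks',
      'drop_mempool_tx',
      'new_mempool_tx',
    ];

    // READ_PARQUET accepts a list of globs; building it from EVENTS_DIR keeps
    // a future directory rename to a single line.
    const files = SUBDIRS.map(d => `'${EVENTS_DIR}/${d}/*.parquet'`).join(', ');
    const sql = `SELECT method, payload FROM READ_PARQUET([${files}]) ORDER BY id`;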
@@ -124,7 +129,7 @@ export class DatasetStore {
     return new Promise(resolve => {
       const con = this.db.connect();
       const res = con.stream(
-        `SELECT method, payload FROM READ_PARQUET('events/new_block/canonical/*.parquet') WHERE id IN (${ids}) ORDER BY id`
+        `SELECT method, payload FROM READ_PARQUET('${EVENTS_DIR}/new_block/canonical/*.parquet') WHERE id IN (${ids}) ORDER BY id`
       );
 
       resolve(res);
@@ -139,7 +144,7 @@ export class DatasetStore {
     return new Promise(resolve => {
       const con = this.db.connect();
       con.all(
-        `SELECT method, payload FROM READ_PARQUET('events/remainder/*.parquet') ORDER BY id`,
+        `SELECT method, payload FROM READ_PARQUET('${EVENTS_DIR}/remainder/*.parquet') ORDER BY id`,
         (err: any, res: any) => {
           if (err) {
             throw err;
@@ -159,7 +164,7 @@ export class DatasetStore {
     return new Promise(resolve => {
       const con = this.db.connect();
       con.all(
-        "SELECT * FROM READ_PARQUET('events/canonical/block_hashes/*.parquet')",
+        `SELECT * FROM READ_PARQUET('${EVENTS_DIR}/canonical/block_hashes/*.parquet')`,
         (err: any, res: any) => {
           if (err) {
             throw err;
@@ -76,7 +76,7 @@ const genIdsFiles = async (dataset: DatasetStore) => {
 
   logger.info({ component: 'event-replay' }, `Generating ID files for ${workers} workers`);
 
-  const dir = './events/new_block';
+  const dir = `${process.env.STACKS_EVENTS_DIR}/new_block`;
 
   const ids: number[] = await dataset.newBlockEventsIds();
   const batchSize = Math.ceil(ids.length / workers);
@@ -95,7 +95,7 @@ const genIdsFiles = async (dataset: DatasetStore) => {
 
   // create id files
   chunks.forEach((chunk, idx) => {
-    const filename = `./events/new_block/ids_${idx + 1}.txt`;
+    const filename = `${dir}/ids_${idx + 1}.txt`;
     chunk.forEach(id => {
       fs.writeFileSync(filename, id.toString() + '\n', { flag: 'a' });
     });
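
The chunks variable is produced outside this hunk; the batchSize line above implies a slice-based split of the id list across workers. A sketch of that split under that assumption, writing each id file in one call instead of appending per id:

    const batchSize = Math.ceil(ids.length / workers);
    const chunks: number[][] = [];
    for (let i = 0; i < ids.length; i += batchSize) {
      chunks.push(ids.slice(i, i + batchSize));
    }

    // One writeFileSync per chunk avoids reopening the file for every id,
    // unlike the per-id append with { flag: 'a' } shown above.
    chunks.forEach((chunk, idx) => {
      fs.writeFileSync(`${dir}/ids_${idx + 1}.txt`, chunk.join('\n') + '\n');
    });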
@@ -20,7 +20,7 @@ const ingestNewBlock = async (idFile?: string) => {
 
   const timeTracker = createTimeTracker();
 
-  const dir = './events/new_block';
+  const dir = `${process.env.STACKS_EVENTS_DIR}/new_block`;
 
   try {
     const idsFileContent = fs.readFileSync(`${dir}/${idFile}`, 'utf-8');
@@ -20,7 +20,7 @@ const ingestNewBlock = async (idFile?: string) => {
 
   const timeTracker = createTimeTracker();
 
-  const dir = './events/new_block';
+  const dir = `${process.env.STACKS_EVENTS_DIR}/new_block`;
 
   try {
     const idsFileContent = fs.readFileSync(`${dir}/${idFile}`, 'utf-8');
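
Both ingestNewBlock variants read their worker's id file back from the directory that genIdsFiles wrote. The parsing itself is outside the diff; a sketch of what presumably follows the readFileSync above, under that assumption:

    // ids_N.txt holds one decimal id per line, as written by genIdsFiles.
    const ids: number[] = idsFileContent
      .split('\n')
      .filter(line => line.length > 0)
      .map(Number);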