diff --git a/.env.base b/.env.base index 1a137bf..c974a4d 100644 --- a/.env.base +++ b/.env.base @@ -9,4 +9,4 @@ HASURA_GRAPHQL_ADMIN_SECRET=JBw5-wS9B-rOV2 HASURA_GRAPHQL_SERVER_PORT=40180 NODE_DATABASE_URL=postgresql://postgres:postgres@localhost:40432/sui_data_sync_db SYNC_AUTO_CREATE_SCHEMAS=true -SYNC_AUTO_CREATE_TABLES=false +SYNC_AUTO_CREATE_TABLES=true diff --git a/package.json b/package.json index 350e4ef..926db9d 100644 --- a/package.json +++ b/package.json @@ -18,6 +18,8 @@ "slonik": "^45.4.1", "slonik-interceptor-query-logging": "^45.4.1", "slonik-sql-tag-raw": "^45.4.1", + "p-queue": "^6.6.2", + "p-retry": "^4.6.2", "tslib": "^2.6.2", "zod": "^3.23.8" }, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 36e7181..0917d6f 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -32,6 +32,12 @@ importers: memoizee: specifier: ^0.4.15 version: 0.4.15 + p-queue: + specifier: ^6.6.2 + version: 6.6.2 + p-retry: + specifier: ^4.6.2 + version: 4.6.2 slonik: specifier: ^45.4.1 version: 45.4.1(zod@3.23.8) @@ -1590,6 +1596,9 @@ packages: '@types/responselike@1.0.0': resolution: {integrity: sha512-85Y2BjiufFzaMIlvJDvTTB8Fxl2xfLo4HgmHzVBz08w4wDePCTjYw66PdrolO0kzli3yam/YCgRufyo1DdQVTA==} + '@types/retry@0.12.0': + resolution: {integrity: sha512-wWKOClTTiizcZhXnPY4wikVAwmdYHp8q6DmC+EJUzAMsycb7HB32Kh9RN4+0gExjmPmZSAQjgURXIGATPegAvA==} + '@types/semver-utils@1.1.3': resolution: {integrity: sha512-T+YwkslhsM+CeuhYUxyAjWm7mJ5am/K10UX40RuA6k6Lc7eGtq8iY2xOzy7Vq0GOqhl/xZl5l2FwURZMTPTUww==} @@ -3304,6 +3313,10 @@ packages: resolution: {integrity: sha512-mlVgR3PGuzlo0MmTdk4cXqXWlwQDLnONTAg6sm62XkMJEiRxN3GL3SffkYvqwonbkJBcrI7Uvv5Zh9yjvn2iUw==} engines: {node: '>=12.20'} + p-finally@1.0.0: + resolution: {integrity: sha512-LICb2p9CB7FS+0eR1oqWnHhp0FljGLZCWBE9aix0Uye9W8LTQPwMTYVGWQWIw9RdQiDg4+epXQODwIYJtSJaow==} + engines: {node: '>=4'} + p-limit@3.1.0: resolution: {integrity: sha512-TYOanM3wGwNGsZN2cVTYPArw454xnXj5qmWF1bEoAc4+cU/ol7GVh7odevjp1FNHduHc3KZMcFduxU5Xc6uJRQ==} engines: {node: '>=10'} @@ -3320,6 +3333,18 @@ packages: resolution: {integrity: sha512-/bjOqmgETBYB5BoEeGVea8dmvHb2m9GLy1E9W43yeyfP6QQCZGFNa+XRceJEuDB6zqr+gKpIAmlLebMpykw/MQ==} engines: {node: '>=10'} + p-queue@6.6.2: + resolution: {integrity: sha512-RwFpb72c/BhQLEXIZ5K2e+AhgNVmIejGlTgiB9MzZ0e93GRvqZ7uSi0dvRF7/XIXDeNkra2fNHBxTyPDGySpjQ==} + engines: {node: '>=8'} + + p-retry@4.6.2: + resolution: {integrity: sha512-312Id396EbJdvRONlngUx0NydfrIQ5lsYu0znKVUzVvArzEIt08V1qhtyESbGVd1FGX7UKtiFp5uwKZdM8wIuQ==} + engines: {node: '>=8'} + + p-timeout@3.2.0: + resolution: {integrity: sha512-rhIwUycgwwKcP9yTOOFK/AKsAopjjCakVqLHePO3CC6Mir1Z99xT+R63jZxAT5lFZLa2inS5h+ZS2GvR99/FBg==} + engines: {node: '>=8'} + package-json@8.1.1: resolution: {integrity: sha512-cbH9IAIJHNj9uXi196JVsRlt7cHKak6u/e6AkL/bkRelZ7rlL3X1YKxsZwa36xipOEKAsdtmaG6aAJoM1fx2zA==} engines: {node: '>=14.16'} @@ -3685,6 +3710,10 @@ packages: resolution: {integrity: sha512-9LkiTwjUh6rT555DtE9rTX+BKByPfrMzEAtnlEtdEwr3Nkffwiihqe2bWADg+OQRjt9gl6ICdmB/ZFDCGAtSow==} engines: {node: '>= 4'} + retry@0.13.1: + resolution: {integrity: sha512-XQBQ3I8W1Cge0Seh+6gjj03LbmRFWuoszgK9ooCpwYIrhhoO80pfq4cUkU5DkknwfOfFteRwlZ56PYOGYyFWdg==} + engines: {node: '>= 4'} + reusify@1.0.4: resolution: {integrity: sha512-U9nH88a3fc/ekCF1l0/UP1IosiuIjyTh7hBvXVMHYgVcfGvt897Xguj2UOLDeI5BG2m7/uwyaLVT6fbtCwTyzw==} engines: {iojs: '>=1.0.0', node: '>=0.10.0'} @@ -5996,6 +6025,8 @@ snapshots: dependencies: '@types/node': 20.12.12 + '@types/retry@0.12.0': {} + '@types/semver-utils@1.1.3': {} '@typescript-eslint/eslint-plugin@7.10.0(@typescript-eslint/parser@7.10.0(eslint@8.57.0)(typescript@5.4.5))(eslint@8.57.0)(typescript@5.4.5)': @@ -7975,6 +8006,8 @@ snapshots: p-cancelable@3.0.0: {} + p-finally@1.0.0: {} + p-limit@3.1.0: dependencies: yocto-queue: 0.1.0 @@ -7991,6 +8024,20 @@ snapshots: dependencies: aggregate-error: 3.1.0 + p-queue@6.6.2: + dependencies: + eventemitter3: 4.0.7 + p-timeout: 3.2.0 + + p-retry@4.6.2: + dependencies: + '@types/retry': 0.12.0 + retry: 0.13.1 + + p-timeout@3.2.0: + dependencies: + p-finally: 1.0.0 + package-json@8.1.1: dependencies: got: 12.6.1 @@ -8343,6 +8390,8 @@ snapshots: retry@0.12.0: {} + retry@0.13.1: {} + reusify@1.0.4: {} rimraf@3.0.2: diff --git a/src/lib/api/index.ts b/src/lib/api/index.ts index 39f449e..8f81c10 100644 --- a/src/lib/api/index.ts +++ b/src/lib/api/index.ts @@ -1,11 +1,11 @@ import got from 'got-cjs'; import { env } from '../env'; import { + MethodTypes, SUIQueryRequest, SUIQueryResponse, SUIQuerySchema, } from '../model/json-rpc'; -import { parseJSON, stringifyJSON } from '../utils/json'; class QueryInconsistencyError extends Error { constructor(message: string) { @@ -14,10 +14,10 @@ class QueryInconsistencyError extends Error { } } -export async function sui_query( - method: T['method'], - params: T['request'], -): Promise { +export async function sui_query( + method: T, + params: Extract['request'], +): Promise['response']> { const randomID = Math.floor(Math.random() * 1000000); const request = SUIQueryRequest.parse({ jsonrpc: '2.0', @@ -29,6 +29,12 @@ export async function sui_query( const data = await got .post(env().SUI_NODE_RPC_URL, { json: request, + // https://github.com/sindresorhus/got/blob/main/documentation/7-retry.md + retry: { + methods: ['post'], + limit: 10, // Number of retries to attempt. the last one is (2 ** (attemptCount − 1) /60) ~= 8 mins + statusCodes: [408, 413, 429, 500, 502, 503, 504, 521, 522, 524], + }, }) .json(); @@ -45,17 +51,17 @@ export async function sui_query( response: typedData.result, }); - return validator.response; + return validator.response as any; } -sui_query('suix_queryEvents', [ - { - MoveEventType: - '0xceba50ec29ada96392373f340fe4eeffab45140ac66acc9459770e5a3c58abf8::simple_gift_box::GiftBoxMinted', - }, - null, - 5, - true, -]).then(a => { - console.log(parseJSON(stringifyJSON(a, 2))); -}); //? +// sui_query('suix_queryEvents', [ +// { +// MoveEventType: +// '0xceba50ec29ada96392373f340fe4eeffab45140ac66acc9459770e5a3c58abf8::simple_gift_box::GiftBoxMinted', +// }, +// null, +// 5, +// true, +// ]).then(a => { +// console.log(parseJSON(stringifyJSON(a, 2))); +// }); //? diff --git a/src/lib/data-sync/data-sync.interface.ts b/src/lib/data-sync/data-sync.interface.ts index 4e12663..152d28c 100644 --- a/src/lib/data-sync/data-sync.interface.ts +++ b/src/lib/data-sync/data-sync.interface.ts @@ -1,2 +1,15 @@ export abstract class DataSyncService { + abstract startSync(): void; + abstract stopSync(): void; + abstract setSchemas(schemas: EventSyncSchema[]): void; + abstract addSyncMoveEventType(type: string): void; } + +export type EventSyncSchema = { + tableSchema: string; + transactionModule: string; + events: { + eventName: string; + fields: { [name: string]: 'string' | 'buffer' | 'number' | 'bool' }; + }[]; +}; diff --git a/src/lib/data-sync/data-sync.repository.ts b/src/lib/data-sync/data-sync.repository.ts index cc1dbf0..d33108a 100644 --- a/src/lib/data-sync/data-sync.repository.ts +++ b/src/lib/data-sync/data-sync.repository.ts @@ -1,21 +1,17 @@ import { Inject, Logger } from '@nestjs/common'; import assert from 'assert'; -import { DatabaseTransactionConnection, SqlToken } from 'slonik'; +import { + BinarySqlToken, + DatabaseTransactionConnection, + JsonBinarySqlToken, +} from 'slonik'; import { raw } from 'slonik-sql-tag-raw'; import { z } from 'zod'; import { env } from '../env'; import { PageEvent } from '../model/json-rpc'; import { SQL } from '../persistent/SQL'; import { PersistentService } from '../persistent/persistent.interface'; - -export type EventSyncSchema = { - tableSchema: string; - transactionModule: string; - events: { - eventName: string; - fields: { [name: string]: 'string' | 'buffer' | 'number' | 'bool' }; - }[]; -}; +import { EventSyncSchema } from './data-sync.interface'; export class DataSyncRepository { private readonly logger = new Logger(DataSyncRepository.name, { @@ -112,8 +108,9 @@ export class DataSyncRepository { "transactionModule" text NOT NULL, "sender" bytea NOT NULL, "bcs" text NOT NULL, - "timestampMs" timestamp NOT NULL, - "parsedJson" jsonb NOT NULL, + "type" text NOT NULL, + "timestampMs" BIGINT NOT NULL, + "parsedJson" jsonb NOT NULL, ${fieldsSql} PRIMARY KEY ("txDigest", "eventSeq") ); @@ -228,7 +225,13 @@ ${fieldsSql} SQL.identifier(['timestampMs']), SQL.identifier(['parsedJson']), ]; - const values: (SqlToken | string)[] = [ + const values: ( + | string + | number + | boolean + | BinarySqlToken + | JsonBinarySqlToken + )[] = [ SQL.string(event.id.txDigest), SQL.string(event.id.eventSeq), SQL.string(eventTypeName), @@ -243,7 +246,9 @@ ${fieldsSql} for (const [name, type] of Object.entries(eventAbi.fields)) { fields.push(SQL.identifier([name])); - values.push(SQL.jsonb(event.parsedJson[name])); + const value = event.parsedJson[name] as any; + // @ts-ignore + values.push(SQL[type](value)); } const keyFragments = SQL.join(fields, SQL.fragment`, `); @@ -258,4 +263,25 @@ ${fieldsSql} on conflict do nothing; `); } + + async getLatestEventDigest(type: string, tableSchema: string) { + const types = type.split('::'); + assert(types.length === 3, `invalid event type: ${type}`); + const [packageId, transactionModule, event] = types; + const tableName = `${transactionModule}_evt_${event}`; + const digest = await this.persistentService.pgPool.maybeOne(SQL.type( + z.object({ txDigest: z.string(), eventSeq: z.string() }), + ) // language=sql format=false + ` + SELECT "txDigest", "eventSeq" + FROM ${SQL.identifier([tableSchema, tableName])} + WHERE "packageId" = ${SQL.string(packageId!)} + and "transactionModule" = ${SQL.string(transactionModule!)} + and "eventName" = ${SQL.string(event!)} + ORDER BY "timestampMs" DESC + LIMIT 1; + `); + + return digest; + } } diff --git a/src/lib/data-sync/data-sync.service.ts b/src/lib/data-sync/data-sync.service.ts index 3f0015e..5d95db6 100644 --- a/src/lib/data-sync/data-sync.service.ts +++ b/src/lib/data-sync/data-sync.service.ts @@ -1,7 +1,177 @@ -import { DataSyncService } from './data-sync.interface'; +import { Inject, Logger } from '@nestjs/common'; +import assert from 'assert'; +import PQueue from 'p-queue'; +import { sui_query } from '../api'; +import { PageCursorEvent, PageEvent } from '../model/json-rpc'; +import { noAwait } from '../utils/no-await'; +import { DataSyncService, EventSyncSchema } from './data-sync.interface'; +import { DataSyncRepository } from './data-sync.repository'; +const kQueryLimit = 50; +const kFetchLimit = 100000; export class DefaultDataSyncService implements DataSyncService { - constructor() { + private syncingTypes: string[] = []; + private readonly logger = new Logger(DataSyncService.name); + public syncInterval = 10e3; + private syncing = false; + public schemas: EventSyncSchema[] = []; + + constructor( + @Inject(DataSyncRepository) + private readonly dataSyncRepository: DataSyncRepository, + ) {} + + startSync() { + noAwait(this.loopSync()); + } + + stopSync() { + this.syncing = false; + } + + setSchemas(schemas: EventSyncSchema[]) { + this.schemas = schemas; + } + + async loopSync() { + for (const schema of this.schemas) { + await this.dataSyncRepository.preCheckEventsAbi(schema); + } + this.syncing = true; + while (this.syncing) { + try { + await this.syncMoveEventTypes(); + } catch (e) { + this.logger.error(`sync error: ${e}`); + } + await new Promise(r => setTimeout(r, this.syncInterval)); + } + } + + private findModuleEvent(type: string) { + const types = type.split('::'); + if (types.length !== 3) { + throw new Error(`invalid event type: ${type}`); + } + const packageId = types[0]; + assert(packageId, `missing package id: ${type}`); + const moduleName = types[1]; + assert(moduleName, `missing module name: ${type}`); + const schema = this.schemas.find(s => { + return s.transactionModule == moduleName; + }); + if (!schema) { + throw new Error(`missing schema for ${moduleName}`); + } + + const typeName = types[2]; + assert(typeName, `missing event type name: ${type}`); + const event = schema.events.find(e => { + return e.eventName == typeName; + }); + if (!event) { + throw new Error(`missing event ${typeName} in schema ${moduleName}`); + } + + return { + packageId, + transactionModule: moduleName, + typeName, + event, + tableSchema: schema.tableSchema, + }; + } + + addSyncMoveEventType(type: string) { + const types = type.split('::'); + if (types.length !== 3) { + throw new Error(`invalid event type: ${type}`); + } + if (this.syncingTypes.includes(type)) { + return; + } + this.findModuleEvent(type); + + this.syncingTypes.push(type); + } + + private async syncMoveEventType(type: string) { + const { tableSchema } = this.findModuleEvent(type); + const latestInDb = await this.dataSyncRepository.getLatestEventDigest( + type, + tableSchema, + ); + const containLatestResult = (results: PageEvent[]) => { + if (!latestInDb) { + return false; + } + return results.find( + r => + r.id.txDigest === latestInDb.txDigest && + r.id.eventSeq === latestInDb.eventSeq, + ); + }; + + let cursor: PageCursorEvent | null = null; + const sink: PageEvent[] = []; + for (let i = 0; i < kFetchLimit; i++) { + const response = await sui_query('suix_queryEvents', [ + { MoveEventType: type }, + cursor, + kQueryLimit, + true, + ]); + const data = response.data; + if (data === null) { + break; + } + // building sink until finished + sink.push(...data); + if (containLatestResult(data)) { + break; + } + if (!response.hasNextPage) { + break; + } + + if ( + response.nextCursor == null || + response.nextCursor.eventSeq == null || + response.nextCursor.txDigest == null + ) { + this.logger.error(`invalid cursor: ${JSON.stringify(response)}`); + break; + } + cursor = response.nextCursor as any; + // this.logger.verbose(`next cursor: ${JSON.stringify(cursor)}`); + + if (i === kFetchLimit - 1) { + throw new Error(`fetch limit reached: ${kFetchLimit}`); + } + } + + // write to db + await this.dataSyncRepository.saveEvents({ + events: sink, + schemas: this.schemas, + }); + } + + private internalSyncCounter = 0; + private async syncMoveEventTypes() { + this.logger.verbose( + `[${++this.internalSyncCounter}] syncing ${ + this.syncingTypes.length + } types...`, + ); + const queue = new PQueue({ concurrency: this.syncingTypes.length }); + for (const type of this.syncingTypes) { + noAwait(queue.add(() => this.syncMoveEventType(type))); + } + await queue.onIdle(); + this.logger.debug( + `[${this.internalSyncCounter}] syncing ${this.syncingTypes.length} types done`, + ); } } diff --git a/src/lib/data-sync/index.ts b/src/lib/data-sync/index.ts index 4df4914..2c41fbd 100644 --- a/src/lib/data-sync/index.ts +++ b/src/lib/data-sync/index.ts @@ -1,6 +1,6 @@ import { NestFactory } from '@nestjs/core'; +import { DataSyncService, EventSyncSchema } from './data-sync.interface'; import { DataSyncModule } from './data-sync.module'; -import { DataSyncRepository, EventSyncSchema } from './data-sync.repository'; export * from './data-sync.interface'; export * from './data-sync.module'; @@ -20,11 +20,14 @@ async function main() { const app = await NestFactory.createMicroservice(DataSyncModule, { logger: ['log', 'verbose', 'error', 'warn', 'debug', 'fatal'], }); - const repo = app.get(DataSyncRepository); + const syncService = app.get(DataSyncService); - repo.preCheckEventsAbi(schema).then(a => { - console.log(a); //? - }); + syncService.setSchemas([schema]); + syncService.addSyncMoveEventType( + `0xceba50ec29ada96392373f340fe4eeffab45140ac66acc9459770e5a3c58abf8::simple_gift_box::GiftBoxMinted`, + ); + + syncService.startSync(); } main().catch(console.error); diff --git a/src/lib/model/json-rpc/base.model.ts b/src/lib/model/json-rpc/base.model.ts index 226afcc..16f3dab 100644 --- a/src/lib/model/json-rpc/base.model.ts +++ b/src/lib/model/json-rpc/base.model.ts @@ -9,7 +9,7 @@ export function makePaginationResponseSchema(data: z.ZodType) { txDigest: z.string(), eventSeq: z.string(), }) - .nullish(), + .nullable(), }); } @@ -73,4 +73,5 @@ export const EventFilter = z.union([ export const PageCursorEvent = z.object({ txDigest: z.string(), eventSeq: z.string(), -}); \ No newline at end of file +}); +export type PageCursorEvent = z.infer; \ No newline at end of file diff --git a/src/lib/model/json-rpc/query-schema.model.ts b/src/lib/model/json-rpc/query-schema.model.ts index ae0f529..6283079 100644 --- a/src/lib/model/json-rpc/query-schema.model.ts +++ b/src/lib/model/json-rpc/query-schema.model.ts @@ -45,7 +45,7 @@ export const SUIQuerySchemaTransactionBlocks = z.object({ request: z.tuple([ z.string(), //query ]), - response: makePaginationResponseSchema(z.object({})), + response: makePaginationResponseSchema(z.object({ abc: z.string() })), }); // endregion @@ -55,4 +55,6 @@ export const SUIQuerySchema = z.discriminatedUnion('method', [ SUIQuerySchemaTransactionBlocks, ]); export type SUIQuerySchema = z.infer; + +export type MethodTypes = SUIQuerySchema['method']; // endregion diff --git a/src/lib/persistent/SQL.ts b/src/lib/persistent/SQL.ts index 5544179..ee7e981 100644 --- a/src/lib/persistent/SQL.ts +++ b/src/lib/persistent/SQL.ts @@ -16,6 +16,7 @@ export const SQL = { number: (num: number) => num, string: (str: string) => str, boolean: (bool: boolean) => bool, + bool: (bool: boolean) => bool, buffer: (buffer: string | Buffer) => { if (typeof buffer === 'string') { return SQL.binary(BufferSchema.parse(buffer)); diff --git a/src/lib/utils/no-await.ts b/src/lib/utils/no-await.ts new file mode 100644 index 0000000..1fe2e44 --- /dev/null +++ b/src/lib/utils/no-await.ts @@ -0,0 +1,4 @@ +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export const noAwait = (_: Promise) => { + void _; +}; diff --git a/tsconfig.lib.json b/tsconfig.lib.json index 1c95782..92f9ffc 100644 --- a/tsconfig.lib.json +++ b/tsconfig.lib.json @@ -2,7 +2,6 @@ "extends": "./tsconfig.json", "compilerOptions": { "outDir": "./dist/out-tsc", - "declaration": true, "types": ["node"] }, "include": ["src/**/*.ts"],