feat: syncing to db

This commit is contained in:
Zitao Xiong
2024-05-24 16:40:47 +08:00
parent 209fb58669
commit 4c5573bc9f
13 changed files with 319 additions and 43 deletions

View File

@@ -9,4 +9,4 @@ HASURA_GRAPHQL_ADMIN_SECRET=JBw5-wS9B-rOV2
HASURA_GRAPHQL_SERVER_PORT=40180
NODE_DATABASE_URL=postgresql://postgres:postgres@localhost:40432/sui_data_sync_db
SYNC_AUTO_CREATE_SCHEMAS=true
SYNC_AUTO_CREATE_TABLES=false
SYNC_AUTO_CREATE_TABLES=true

View File

@@ -18,6 +18,8 @@
"slonik": "^45.4.1",
"slonik-interceptor-query-logging": "^45.4.1",
"slonik-sql-tag-raw": "^45.4.1",
"p-queue": "^6.6.2",
"p-retry": "^4.6.2",
"tslib": "^2.6.2",
"zod": "^3.23.8"
},

49
pnpm-lock.yaml generated
View File

@@ -32,6 +32,12 @@ importers:
memoizee:
specifier: ^0.4.15
version: 0.4.15
p-queue:
specifier: ^6.6.2
version: 6.6.2
p-retry:
specifier: ^4.6.2
version: 4.6.2
slonik:
specifier: ^45.4.1
version: 45.4.1(zod@3.23.8)
@@ -1590,6 +1596,9 @@ packages:
'@types/responselike@1.0.0':
resolution: {integrity: sha512-85Y2BjiufFzaMIlvJDvTTB8Fxl2xfLo4HgmHzVBz08w4wDePCTjYw66PdrolO0kzli3yam/YCgRufyo1DdQVTA==}
'@types/retry@0.12.0':
resolution: {integrity: sha512-wWKOClTTiizcZhXnPY4wikVAwmdYHp8q6DmC+EJUzAMsycb7HB32Kh9RN4+0gExjmPmZSAQjgURXIGATPegAvA==}
'@types/semver-utils@1.1.3':
resolution: {integrity: sha512-T+YwkslhsM+CeuhYUxyAjWm7mJ5am/K10UX40RuA6k6Lc7eGtq8iY2xOzy7Vq0GOqhl/xZl5l2FwURZMTPTUww==}
@@ -3304,6 +3313,10 @@ packages:
resolution: {integrity: sha512-mlVgR3PGuzlo0MmTdk4cXqXWlwQDLnONTAg6sm62XkMJEiRxN3GL3SffkYvqwonbkJBcrI7Uvv5Zh9yjvn2iUw==}
engines: {node: '>=12.20'}
p-finally@1.0.0:
resolution: {integrity: sha512-LICb2p9CB7FS+0eR1oqWnHhp0FljGLZCWBE9aix0Uye9W8LTQPwMTYVGWQWIw9RdQiDg4+epXQODwIYJtSJaow==}
engines: {node: '>=4'}
p-limit@3.1.0:
resolution: {integrity: sha512-TYOanM3wGwNGsZN2cVTYPArw454xnXj5qmWF1bEoAc4+cU/ol7GVh7odevjp1FNHduHc3KZMcFduxU5Xc6uJRQ==}
engines: {node: '>=10'}
@@ -3320,6 +3333,18 @@ packages:
resolution: {integrity: sha512-/bjOqmgETBYB5BoEeGVea8dmvHb2m9GLy1E9W43yeyfP6QQCZGFNa+XRceJEuDB6zqr+gKpIAmlLebMpykw/MQ==}
engines: {node: '>=10'}
p-queue@6.6.2:
resolution: {integrity: sha512-RwFpb72c/BhQLEXIZ5K2e+AhgNVmIejGlTgiB9MzZ0e93GRvqZ7uSi0dvRF7/XIXDeNkra2fNHBxTyPDGySpjQ==}
engines: {node: '>=8'}
p-retry@4.6.2:
resolution: {integrity: sha512-312Id396EbJdvRONlngUx0NydfrIQ5lsYu0znKVUzVvArzEIt08V1qhtyESbGVd1FGX7UKtiFp5uwKZdM8wIuQ==}
engines: {node: '>=8'}
p-timeout@3.2.0:
resolution: {integrity: sha512-rhIwUycgwwKcP9yTOOFK/AKsAopjjCakVqLHePO3CC6Mir1Z99xT+R63jZxAT5lFZLa2inS5h+ZS2GvR99/FBg==}
engines: {node: '>=8'}
package-json@8.1.1:
resolution: {integrity: sha512-cbH9IAIJHNj9uXi196JVsRlt7cHKak6u/e6AkL/bkRelZ7rlL3X1YKxsZwa36xipOEKAsdtmaG6aAJoM1fx2zA==}
engines: {node: '>=14.16'}
@@ -3685,6 +3710,10 @@ packages:
resolution: {integrity: sha512-9LkiTwjUh6rT555DtE9rTX+BKByPfrMzEAtnlEtdEwr3Nkffwiihqe2bWADg+OQRjt9gl6ICdmB/ZFDCGAtSow==}
engines: {node: '>= 4'}
retry@0.13.1:
resolution: {integrity: sha512-XQBQ3I8W1Cge0Seh+6gjj03LbmRFWuoszgK9ooCpwYIrhhoO80pfq4cUkU5DkknwfOfFteRwlZ56PYOGYyFWdg==}
engines: {node: '>= 4'}
reusify@1.0.4:
resolution: {integrity: sha512-U9nH88a3fc/ekCF1l0/UP1IosiuIjyTh7hBvXVMHYgVcfGvt897Xguj2UOLDeI5BG2m7/uwyaLVT6fbtCwTyzw==}
engines: {iojs: '>=1.0.0', node: '>=0.10.0'}
@@ -5996,6 +6025,8 @@ snapshots:
dependencies:
'@types/node': 20.12.12
'@types/retry@0.12.0': {}
'@types/semver-utils@1.1.3': {}
'@typescript-eslint/eslint-plugin@7.10.0(@typescript-eslint/parser@7.10.0(eslint@8.57.0)(typescript@5.4.5))(eslint@8.57.0)(typescript@5.4.5)':
@@ -7975,6 +8006,8 @@ snapshots:
p-cancelable@3.0.0: {}
p-finally@1.0.0: {}
p-limit@3.1.0:
dependencies:
yocto-queue: 0.1.0
@@ -7991,6 +8024,20 @@ snapshots:
dependencies:
aggregate-error: 3.1.0
p-queue@6.6.2:
dependencies:
eventemitter3: 4.0.7
p-timeout: 3.2.0
p-retry@4.6.2:
dependencies:
'@types/retry': 0.12.0
retry: 0.13.1
p-timeout@3.2.0:
dependencies:
p-finally: 1.0.0
package-json@8.1.1:
dependencies:
got: 12.6.1
@@ -8343,6 +8390,8 @@ snapshots:
retry@0.12.0: {}
retry@0.13.1: {}
reusify@1.0.4: {}
rimraf@3.0.2:

View File

@@ -1,11 +1,11 @@
import got from 'got-cjs';
import { env } from '../env';
import {
MethodTypes,
SUIQueryRequest,
SUIQueryResponse,
SUIQuerySchema,
} from '../model/json-rpc';
import { parseJSON, stringifyJSON } from '../utils/json';
class QueryInconsistencyError extends Error {
constructor(message: string) {
@@ -14,10 +14,10 @@ class QueryInconsistencyError extends Error {
}
}
export async function sui_query<T extends SUIQuerySchema>(
method: T['method'],
params: T['request'],
): Promise<T['response']> {
export async function sui_query<T extends MethodTypes>(
method: T,
params: Extract<SUIQuerySchema, { method: T }>['request'],
): Promise<Extract<SUIQuerySchema, { method: T }>['response']> {
const randomID = Math.floor(Math.random() * 1000000);
const request = SUIQueryRequest.parse({
jsonrpc: '2.0',
@@ -29,6 +29,12 @@ export async function sui_query<T extends SUIQuerySchema>(
const data = await got
.post(env().SUI_NODE_RPC_URL, {
json: request,
// https://github.com/sindresorhus/got/blob/main/documentation/7-retry.md
retry: {
methods: ['post'],
limit: 10, // Number of retries to attempt. the last one is (2 ** (attemptCount 1) /60) ~= 8 mins
statusCodes: [408, 413, 429, 500, 502, 503, 504, 521, 522, 524],
},
})
.json();
@@ -45,17 +51,17 @@ export async function sui_query<T extends SUIQuerySchema>(
response: typedData.result,
});
return validator.response;
return validator.response as any;
}
sui_query('suix_queryEvents', [
{
MoveEventType:
'0xceba50ec29ada96392373f340fe4eeffab45140ac66acc9459770e5a3c58abf8::simple_gift_box::GiftBoxMinted',
},
null,
5,
true,
]).then(a => {
console.log(parseJSON(stringifyJSON(a, 2)));
}); //?
// sui_query('suix_queryEvents', [
// {
// MoveEventType:
// '0xceba50ec29ada96392373f340fe4eeffab45140ac66acc9459770e5a3c58abf8::simple_gift_box::GiftBoxMinted',
// },
// null,
// 5,
// true,
// ]).then(a => {
// console.log(parseJSON(stringifyJSON(a, 2)));
// }); //?

View File

@@ -1,2 +1,15 @@
export abstract class DataSyncService {
abstract startSync(): void;
abstract stopSync(): void;
abstract setSchemas(schemas: EventSyncSchema[]): void;
abstract addSyncMoveEventType(type: string): void;
}
export type EventSyncSchema = {
tableSchema: string;
transactionModule: string;
events: {
eventName: string;
fields: { [name: string]: 'string' | 'buffer' | 'number' | 'bool' };
}[];
};

View File

@@ -1,21 +1,17 @@
import { Inject, Logger } from '@nestjs/common';
import assert from 'assert';
import { DatabaseTransactionConnection, SqlToken } from 'slonik';
import {
BinarySqlToken,
DatabaseTransactionConnection,
JsonBinarySqlToken,
} from 'slonik';
import { raw } from 'slonik-sql-tag-raw';
import { z } from 'zod';
import { env } from '../env';
import { PageEvent } from '../model/json-rpc';
import { SQL } from '../persistent/SQL';
import { PersistentService } from '../persistent/persistent.interface';
export type EventSyncSchema = {
tableSchema: string;
transactionModule: string;
events: {
eventName: string;
fields: { [name: string]: 'string' | 'buffer' | 'number' | 'bool' };
}[];
};
import { EventSyncSchema } from './data-sync.interface';
export class DataSyncRepository {
private readonly logger = new Logger(DataSyncRepository.name, {
@@ -112,8 +108,9 @@ export class DataSyncRepository {
"transactionModule" text NOT NULL,
"sender" bytea NOT NULL,
"bcs" text NOT NULL,
"timestampMs" timestamp NOT NULL,
"parsedJson" jsonb NOT NULL,
"type" text NOT NULL,
"timestampMs" BIGINT NOT NULL,
"parsedJson" jsonb NOT NULL,
${fieldsSql}
PRIMARY KEY ("txDigest", "eventSeq")
);
@@ -228,7 +225,13 @@ ${fieldsSql}
SQL.identifier(['timestampMs']),
SQL.identifier(['parsedJson']),
];
const values: (SqlToken | string)[] = [
const values: (
| string
| number
| boolean
| BinarySqlToken
| JsonBinarySqlToken
)[] = [
SQL.string(event.id.txDigest),
SQL.string(event.id.eventSeq),
SQL.string(eventTypeName),
@@ -243,7 +246,9 @@ ${fieldsSql}
for (const [name, type] of Object.entries(eventAbi.fields)) {
fields.push(SQL.identifier([name]));
values.push(SQL.jsonb(event.parsedJson[name]));
const value = event.parsedJson[name] as any;
// @ts-ignore
values.push(SQL[type](value));
}
const keyFragments = SQL.join(fields, SQL.fragment`, `);
@@ -258,4 +263,25 @@ ${fieldsSql}
on conflict do nothing;
`);
}
async getLatestEventDigest(type: string, tableSchema: string) {
const types = type.split('::');
assert(types.length === 3, `invalid event type: ${type}`);
const [packageId, transactionModule, event] = types;
const tableName = `${transactionModule}_evt_${event}`;
const digest = await this.persistentService.pgPool.maybeOne(SQL.type(
z.object({ txDigest: z.string(), eventSeq: z.string() }),
) // language=sql format=false
`
SELECT "txDigest", "eventSeq"
FROM ${SQL.identifier([tableSchema, tableName])}
WHERE "packageId" = ${SQL.string(packageId!)}
and "transactionModule" = ${SQL.string(transactionModule!)}
and "eventName" = ${SQL.string(event!)}
ORDER BY "timestampMs" DESC
LIMIT 1;
`);
return digest;
}
}

View File

@@ -1,7 +1,177 @@
import { DataSyncService } from './data-sync.interface';
import { Inject, Logger } from '@nestjs/common';
import assert from 'assert';
import PQueue from 'p-queue';
import { sui_query } from '../api';
import { PageCursorEvent, PageEvent } from '../model/json-rpc';
import { noAwait } from '../utils/no-await';
import { DataSyncService, EventSyncSchema } from './data-sync.interface';
import { DataSyncRepository } from './data-sync.repository';
const kQueryLimit = 50;
const kFetchLimit = 100000;
export class DefaultDataSyncService implements DataSyncService {
constructor() {
private syncingTypes: string[] = [];
private readonly logger = new Logger(DataSyncService.name);
public syncInterval = 10e3;
private syncing = false;
public schemas: EventSyncSchema[] = [];
constructor(
@Inject(DataSyncRepository)
private readonly dataSyncRepository: DataSyncRepository,
) {}
startSync() {
noAwait(this.loopSync());
}
stopSync() {
this.syncing = false;
}
setSchemas(schemas: EventSyncSchema[]) {
this.schemas = schemas;
}
async loopSync() {
for (const schema of this.schemas) {
await this.dataSyncRepository.preCheckEventsAbi(schema);
}
this.syncing = true;
while (this.syncing) {
try {
await this.syncMoveEventTypes();
} catch (e) {
this.logger.error(`sync error: ${e}`);
}
await new Promise(r => setTimeout(r, this.syncInterval));
}
}
private findModuleEvent(type: string) {
const types = type.split('::');
if (types.length !== 3) {
throw new Error(`invalid event type: ${type}`);
}
const packageId = types[0];
assert(packageId, `missing package id: ${type}`);
const moduleName = types[1];
assert(moduleName, `missing module name: ${type}`);
const schema = this.schemas.find(s => {
return s.transactionModule == moduleName;
});
if (!schema) {
throw new Error(`missing schema for ${moduleName}`);
}
const typeName = types[2];
assert(typeName, `missing event type name: ${type}`);
const event = schema.events.find(e => {
return e.eventName == typeName;
});
if (!event) {
throw new Error(`missing event ${typeName} in schema ${moduleName}`);
}
return {
packageId,
transactionModule: moduleName,
typeName,
event,
tableSchema: schema.tableSchema,
};
}
addSyncMoveEventType(type: string) {
const types = type.split('::');
if (types.length !== 3) {
throw new Error(`invalid event type: ${type}`);
}
if (this.syncingTypes.includes(type)) {
return;
}
this.findModuleEvent(type);
this.syncingTypes.push(type);
}
private async syncMoveEventType(type: string) {
const { tableSchema } = this.findModuleEvent(type);
const latestInDb = await this.dataSyncRepository.getLatestEventDigest(
type,
tableSchema,
);
const containLatestResult = (results: PageEvent[]) => {
if (!latestInDb) {
return false;
}
return results.find(
r =>
r.id.txDigest === latestInDb.txDigest &&
r.id.eventSeq === latestInDb.eventSeq,
);
};
let cursor: PageCursorEvent | null = null;
const sink: PageEvent[] = [];
for (let i = 0; i < kFetchLimit; i++) {
const response = await sui_query('suix_queryEvents', [
{ MoveEventType: type },
cursor,
kQueryLimit,
true,
]);
const data = response.data;
if (data === null) {
break;
}
// building sink until finished
sink.push(...data);
if (containLatestResult(data)) {
break;
}
if (!response.hasNextPage) {
break;
}
if (
response.nextCursor == null ||
response.nextCursor.eventSeq == null ||
response.nextCursor.txDigest == null
) {
this.logger.error(`invalid cursor: ${JSON.stringify(response)}`);
break;
}
cursor = response.nextCursor as any;
// this.logger.verbose(`next cursor: ${JSON.stringify(cursor)}`);
if (i === kFetchLimit - 1) {
throw new Error(`fetch limit reached: ${kFetchLimit}`);
}
}
// write to db
await this.dataSyncRepository.saveEvents({
events: sink,
schemas: this.schemas,
});
}
private internalSyncCounter = 0;
private async syncMoveEventTypes() {
this.logger.verbose(
`[${++this.internalSyncCounter}] syncing ${
this.syncingTypes.length
} types...`,
);
const queue = new PQueue({ concurrency: this.syncingTypes.length });
for (const type of this.syncingTypes) {
noAwait(queue.add(() => this.syncMoveEventType(type)));
}
await queue.onIdle();
this.logger.debug(
`[${this.internalSyncCounter}] syncing ${this.syncingTypes.length} types done`,
);
}
}

View File

@@ -1,6 +1,6 @@
import { NestFactory } from '@nestjs/core';
import { DataSyncService, EventSyncSchema } from './data-sync.interface';
import { DataSyncModule } from './data-sync.module';
import { DataSyncRepository, EventSyncSchema } from './data-sync.repository';
export * from './data-sync.interface';
export * from './data-sync.module';
@@ -20,11 +20,14 @@ async function main() {
const app = await NestFactory.createMicroservice(DataSyncModule, {
logger: ['log', 'verbose', 'error', 'warn', 'debug', 'fatal'],
});
const repo = app.get(DataSyncRepository);
const syncService = app.get(DataSyncService);
repo.preCheckEventsAbi(schema).then(a => {
console.log(a); //?
});
syncService.setSchemas([schema]);
syncService.addSyncMoveEventType(
`0xceba50ec29ada96392373f340fe4eeffab45140ac66acc9459770e5a3c58abf8::simple_gift_box::GiftBoxMinted`,
);
syncService.startSync();
}
main().catch(console.error);

View File

@@ -9,7 +9,7 @@ export function makePaginationResponseSchema<T>(data: z.ZodType<T>) {
txDigest: z.string(),
eventSeq: z.string(),
})
.nullish(),
.nullable(),
});
}
@@ -73,4 +73,5 @@ export const EventFilter = z.union([
export const PageCursorEvent = z.object({
txDigest: z.string(),
eventSeq: z.string(),
});
});
export type PageCursorEvent = z.infer<typeof PageCursorEvent>;

View File

@@ -45,7 +45,7 @@ export const SUIQuerySchemaTransactionBlocks = z.object({
request: z.tuple([
z.string(), //query
]),
response: makePaginationResponseSchema(z.object({})),
response: makePaginationResponseSchema(z.object({ abc: z.string() })),
});
// endregion
@@ -55,4 +55,6 @@ export const SUIQuerySchema = z.discriminatedUnion('method', [
SUIQuerySchemaTransactionBlocks,
]);
export type SUIQuerySchema = z.infer<typeof SUIQuerySchema>;
export type MethodTypes = SUIQuerySchema['method'];
// endregion

View File

@@ -16,6 +16,7 @@ export const SQL = {
number: (num: number) => num,
string: (str: string) => str,
boolean: (bool: boolean) => bool,
bool: (bool: boolean) => bool,
buffer: (buffer: string | Buffer) => {
if (typeof buffer === 'string') {
return SQL.binary(BufferSchema.parse(buffer));

View File

@@ -0,0 +1,4 @@
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export const noAwait = (_: Promise<any>) => {
void _;
};

View File

@@ -2,7 +2,6 @@
"extends": "./tsconfig.json",
"compilerOptions": {
"outDir": "./dist/out-tsc",
"declaration": true,
"types": ["node"]
},
"include": ["src/**/*.ts"],