mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-06-13 16:19:01 +08:00
fix: optimize reg and dereg
This commit is contained in:
@@ -40,7 +40,7 @@ server helpers to handle all node interactions transparently.
|
||||
1. Create configuration objects for the local server and the Chainhook node you'll be interacting
|
||||
with
|
||||
```typescript
|
||||
import { ServerOptions, ChainhookNodeOptions } from "@hirosystems/chainhook-client/dist/server";
|
||||
import { ServerOptions, ChainhookNodeOptions } from "@hirosystems/chainhook-client";
|
||||
|
||||
// Local server options
|
||||
const opts: ServerOptions = {
|
||||
@@ -54,8 +54,7 @@ server helpers to handle all node interactions transparently.
|
||||
|
||||
// Chainhook node options
|
||||
const chainhook: ChainhookNodeOptions = {
|
||||
hostname: "<node_hostname>",
|
||||
port: 20456
|
||||
base_url: "<node_base_url>"
|
||||
};
|
||||
```
|
||||
|
||||
|
||||
@@ -47,3 +47,6 @@ export class ChainhookEventObserver {
|
||||
this.fastify = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
export * from './server';
|
||||
export * from './schemas/payload';
|
||||
|
||||
@@ -27,8 +27,7 @@ const ServerOptionsSchema = Type.Object({
|
||||
export type ServerOptions = Static<typeof ServerOptionsSchema>;
|
||||
|
||||
const ChainhookNodeOptionsSchema = Type.Object({
|
||||
hostname: Type.String(),
|
||||
port: Type.Integer(),
|
||||
base_url: Type.String(),
|
||||
});
|
||||
/** Chainhook node connection options */
|
||||
export type ChainhookNodeOptions = Static<typeof ChainhookNodeOptionsSchema>;
|
||||
@@ -77,13 +76,13 @@ export async function buildServer(
|
||||
predicates: [ServerPredicate],
|
||||
callback: OnEventCallback
|
||||
) {
|
||||
const base_path = `http://${chainhookOpts.hostname}:${chainhookOpts.port}`;
|
||||
|
||||
async function waitForNode(this: FastifyInstance) {
|
||||
logger.info(`EventServer connecting to chainhook node at ${base_path}...`);
|
||||
logger.info(
|
||||
`ChainhookEventObserver connecting to chainhook node at ${chainhookOpts.base_url}...`
|
||||
);
|
||||
while (true) {
|
||||
try {
|
||||
await request(`${base_path}/ping`, { method: 'GET', throwOnError: true });
|
||||
await request(`${chainhookOpts.base_url}/ping`, { method: 'GET', throwOnError: true });
|
||||
break;
|
||||
} catch (error) {
|
||||
logger.error(error, 'Chainhook node not available, retrying...');
|
||||
@@ -93,11 +92,14 @@ export async function buildServer(
|
||||
}
|
||||
|
||||
async function registerPredicates(this: FastifyInstance) {
|
||||
logger.info(predicates, `EventServer registering predicates at ${base_path}...`);
|
||||
logger.info(
|
||||
predicates,
|
||||
`ChainhookEventObserver registering predicates at ${chainhookOpts.base_url}`
|
||||
);
|
||||
for (const predicate of predicates) {
|
||||
const thenThat: ThenThatHttpPost = {
|
||||
http_post: {
|
||||
url: `${serverOpts.external_base_url}/chainhook/${predicate.uuid}`,
|
||||
url: `${serverOpts.external_base_url}/chainhook/${encodeURIComponent(predicate.uuid)}`,
|
||||
authorization_header: `Bearer ${serverOpts.auth_token}`,
|
||||
},
|
||||
};
|
||||
@@ -105,33 +107,49 @@ export async function buildServer(
|
||||
const body = predicate as Predicate;
|
||||
if ('mainnet' in body.networks) body.networks.mainnet.then_that = thenThat;
|
||||
if ('testnet' in body.networks) body.networks.testnet.then_that = thenThat;
|
||||
await request(`${base_path}/v1/chainhooks`, {
|
||||
await request(`${chainhookOpts.base_url}/v1/chainhooks`, {
|
||||
method: 'POST',
|
||||
body: JSON.stringify(body),
|
||||
headers: { 'content-type': 'application/json' },
|
||||
throwOnError: true,
|
||||
});
|
||||
logger.info(`EventServer registered '${predicate.name}' predicate (${predicate.uuid})`);
|
||||
logger.info(
|
||||
`ChainhookEventObserver registered '${predicate.name}' predicate (${predicate.uuid})`
|
||||
);
|
||||
} catch (error) {
|
||||
logger.error(error, `EventServer unable to register predicate`);
|
||||
logger.error(error, `ChainhookEventObserver unable to register predicate`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function removePredicates(this: FastifyInstance) {
|
||||
logger.info(`EventServer closing predicates at ${base_path}...`);
|
||||
for (const predicate of predicates) {
|
||||
try {
|
||||
await request(`${base_path}/v1/chainhooks/${predicate.chain}/${predicate.uuid}`, {
|
||||
method: 'DELETE',
|
||||
headers: { 'content-type': 'application/json' },
|
||||
throwOnError: true,
|
||||
});
|
||||
logger.info(`EventServer removed '${predicate.name}' predicate (${predicate.uuid})`);
|
||||
} catch (error) {
|
||||
logger.error(error, `EventServer unable to deregister predicate`);
|
||||
}
|
||||
}
|
||||
logger.info(`ChainhookEventObserver closing predicates at ${chainhookOpts.base_url}`);
|
||||
const removals = predicates.map(
|
||||
predicate =>
|
||||
new Promise<void>((resolve, reject) => {
|
||||
request(
|
||||
`${chainhookOpts.base_url}/v1/chainhooks/${predicate.chain}/${encodeURIComponent(
|
||||
predicate.uuid
|
||||
)}`,
|
||||
{
|
||||
method: 'DELETE',
|
||||
headers: { 'content-type': 'application/json' },
|
||||
throwOnError: true,
|
||||
}
|
||||
)
|
||||
.then(() => {
|
||||
logger.info(
|
||||
`ChainhookEventObserver removed '${predicate.name}' predicate (${predicate.uuid})`
|
||||
);
|
||||
resolve();
|
||||
})
|
||||
.catch(error => {
|
||||
logger.error(error, `ChainhookEventObserver unable to deregister predicate`);
|
||||
reject(error);
|
||||
});
|
||||
})
|
||||
);
|
||||
await Promise.allSettled(removals);
|
||||
}
|
||||
|
||||
async function isEventAuthorized(request: FastifyRequest, reply: FastifyReply) {
|
||||
@@ -142,11 +160,11 @@ export async function buildServer(
|
||||
await reply.code(403).send();
|
||||
}
|
||||
|
||||
const EventServer: FastifyPluginCallback<Record<never, never>, Server, TypeBoxTypeProvider> = (
|
||||
fastify,
|
||||
options,
|
||||
done
|
||||
) => {
|
||||
const ChainhookEventObserver: FastifyPluginCallback<
|
||||
Record<never, never>,
|
||||
Server,
|
||||
TypeBoxTypeProvider
|
||||
> = (fastify, options, done) => {
|
||||
fastify.addHook('preHandler', isEventAuthorized);
|
||||
fastify.post(
|
||||
'/chainhook/:uuid',
|
||||
@@ -162,7 +180,7 @@ export async function buildServer(
|
||||
try {
|
||||
await callback(request.params.uuid, request.body);
|
||||
} catch (error) {
|
||||
logger.error(error, `EventServer error processing payload`);
|
||||
logger.error(error, `ChainhookEventObserver error processing payload`);
|
||||
await reply.code(422).send();
|
||||
}
|
||||
await reply.code(200).send();
|
||||
@@ -182,6 +200,6 @@ export async function buildServer(
|
||||
fastify.addHook('onReady', registerPredicates);
|
||||
fastify.addHook('onClose', removePredicates);
|
||||
|
||||
await fastify.register(EventServer);
|
||||
await fastify.register(ChainhookEventObserver);
|
||||
return fastify;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user