fix: optimize reg and dereg

This commit is contained in:
Rafael Cardenas
2023-06-02 14:50:25 -06:00
parent f97007b4b4
commit c2ec1b5283
3 changed files with 54 additions and 34 deletions

View File

@@ -40,7 +40,7 @@ server helpers to handle all node interactions transparently.
1. Create configuration objects for the local server and the Chainhook node you'll be interacting
with
```typescript
import { ServerOptions, ChainhookNodeOptions } from "@hirosystems/chainhook-client/dist/server";
import { ServerOptions, ChainhookNodeOptions } from "@hirosystems/chainhook-client";
// Local server options
const opts: ServerOptions = {
@@ -54,8 +54,7 @@ server helpers to handle all node interactions transparently.
// Chainhook node options
const chainhook: ChainhookNodeOptions = {
hostname: "<node_hostname>",
port: 20456
base_url: "<node_base_url>"
};
```

View File

@@ -47,3 +47,6 @@ export class ChainhookEventObserver {
this.fastify = undefined;
}
}
export * from './server';
export * from './schemas/payload';

View File

@@ -27,8 +27,7 @@ const ServerOptionsSchema = Type.Object({
export type ServerOptions = Static<typeof ServerOptionsSchema>;
const ChainhookNodeOptionsSchema = Type.Object({
hostname: Type.String(),
port: Type.Integer(),
base_url: Type.String(),
});
/** Chainhook node connection options */
export type ChainhookNodeOptions = Static<typeof ChainhookNodeOptionsSchema>;
@@ -77,13 +76,13 @@ export async function buildServer(
predicates: [ServerPredicate],
callback: OnEventCallback
) {
const base_path = `http://${chainhookOpts.hostname}:${chainhookOpts.port}`;
async function waitForNode(this: FastifyInstance) {
logger.info(`EventServer connecting to chainhook node at ${base_path}...`);
logger.info(
`ChainhookEventObserver connecting to chainhook node at ${chainhookOpts.base_url}...`
);
while (true) {
try {
await request(`${base_path}/ping`, { method: 'GET', throwOnError: true });
await request(`${chainhookOpts.base_url}/ping`, { method: 'GET', throwOnError: true });
break;
} catch (error) {
logger.error(error, 'Chainhook node not available, retrying...');
@@ -93,11 +92,14 @@ export async function buildServer(
}
async function registerPredicates(this: FastifyInstance) {
logger.info(predicates, `EventServer registering predicates at ${base_path}...`);
logger.info(
predicates,
`ChainhookEventObserver registering predicates at ${chainhookOpts.base_url}`
);
for (const predicate of predicates) {
const thenThat: ThenThatHttpPost = {
http_post: {
url: `${serverOpts.external_base_url}/chainhook/${predicate.uuid}`,
url: `${serverOpts.external_base_url}/chainhook/${encodeURIComponent(predicate.uuid)}`,
authorization_header: `Bearer ${serverOpts.auth_token}`,
},
};
@@ -105,33 +107,49 @@ export async function buildServer(
const body = predicate as Predicate;
if ('mainnet' in body.networks) body.networks.mainnet.then_that = thenThat;
if ('testnet' in body.networks) body.networks.testnet.then_that = thenThat;
await request(`${base_path}/v1/chainhooks`, {
await request(`${chainhookOpts.base_url}/v1/chainhooks`, {
method: 'POST',
body: JSON.stringify(body),
headers: { 'content-type': 'application/json' },
throwOnError: true,
});
logger.info(`EventServer registered '${predicate.name}' predicate (${predicate.uuid})`);
logger.info(
`ChainhookEventObserver registered '${predicate.name}' predicate (${predicate.uuid})`
);
} catch (error) {
logger.error(error, `EventServer unable to register predicate`);
logger.error(error, `ChainhookEventObserver unable to register predicate`);
}
}
}
async function removePredicates(this: FastifyInstance) {
logger.info(`EventServer closing predicates at ${base_path}...`);
for (const predicate of predicates) {
try {
await request(`${base_path}/v1/chainhooks/${predicate.chain}/${predicate.uuid}`, {
method: 'DELETE',
headers: { 'content-type': 'application/json' },
throwOnError: true,
});
logger.info(`EventServer removed '${predicate.name}' predicate (${predicate.uuid})`);
} catch (error) {
logger.error(error, `EventServer unable to deregister predicate`);
}
}
logger.info(`ChainhookEventObserver closing predicates at ${chainhookOpts.base_url}`);
const removals = predicates.map(
predicate =>
new Promise<void>((resolve, reject) => {
request(
`${chainhookOpts.base_url}/v1/chainhooks/${predicate.chain}/${encodeURIComponent(
predicate.uuid
)}`,
{
method: 'DELETE',
headers: { 'content-type': 'application/json' },
throwOnError: true,
}
)
.then(() => {
logger.info(
`ChainhookEventObserver removed '${predicate.name}' predicate (${predicate.uuid})`
);
resolve();
})
.catch(error => {
logger.error(error, `ChainhookEventObserver unable to deregister predicate`);
reject(error);
});
})
);
await Promise.allSettled(removals);
}
async function isEventAuthorized(request: FastifyRequest, reply: FastifyReply) {
@@ -142,11 +160,11 @@ export async function buildServer(
await reply.code(403).send();
}
const EventServer: FastifyPluginCallback<Record<never, never>, Server, TypeBoxTypeProvider> = (
fastify,
options,
done
) => {
const ChainhookEventObserver: FastifyPluginCallback<
Record<never, never>,
Server,
TypeBoxTypeProvider
> = (fastify, options, done) => {
fastify.addHook('preHandler', isEventAuthorized);
fastify.post(
'/chainhook/:uuid',
@@ -162,7 +180,7 @@ export async function buildServer(
try {
await callback(request.params.uuid, request.body);
} catch (error) {
logger.error(error, `EventServer error processing payload`);
logger.error(error, `ChainhookEventObserver error processing payload`);
await reply.code(422).send();
}
await reply.code(200).send();
@@ -182,6 +200,6 @@ export async function buildServer(
fastify.addHook('onReady', registerPredicates);
fastify.addHook('onClose', removePredicates);
await fastify.register(EventServer);
await fastify.register(ChainhookEventObserver);
return fastify;
}