feat: class interface

This commit is contained in:
Rafael Cárdenas
2023-04-20 14:33:07 -06:00
parent 198cdaa6c8
commit 9dfec454f5
6 changed files with 299 additions and 228 deletions

View File

@@ -1,147 +1,30 @@
import { Type, TypeBoxTypeProvider } from '@fastify/type-provider-typebox';
import Fastify, {
FastifyInstance,
FastifyPluginCallback,
FastifyReply,
FastifyRequest,
} from 'fastify';
import { Server } from 'http';
import { request } from 'undici';
import { logger, PINO_CONFIG } from './util/logger';
import { timeout } from './util/helpers';
import { Payload, PayloadSchema } from './schemas';
import { Predicate, ThenThat } from './schemas/predicate';
import { FastifyInstance } from 'fastify';
import {
ServerOptions,
ChainhookNodeOptions,
ServerPredicate,
OnEventCallback,
buildServer,
} from './server';
export type OnEventCallback = (uuid: string, payload: Payload) => Promise<void>;
export class ChainhookEventServer {
private fastify?: FastifyInstance;
private serverOpts: ServerOptions;
private chainhookOpts: ChainhookNodeOptions;
type ServerOptions = {
server: {
host: string;
port: number;
auth_token: string;
external_hostname: string;
};
chainhook_node: {
hostname: string;
port: number;
};
};
/**
* Starts the chainhook event server.
* @returns Fastify instance
*/
export async function startServer(
opts: ServerOptions,
predicates: [Predicate],
callback: OnEventCallback
) {
const base_path = `http://${opts.chainhook_node.hostname}:${opts.chainhook_node.port}`;
async function waitForNode(this: FastifyInstance) {
logger.info(`EventServer connecting to chainhook node...`);
while (true) {
try {
await request(`${base_path}/ping`, { method: 'GET', throwOnError: true });
break;
} catch (error) {
logger.error(error, 'Chainhook node not available, retrying...');
await timeout(1000);
}
}
constructor(serverOpts: ServerOptions, chainhookOpts: ChainhookNodeOptions) {
this.serverOpts = serverOpts;
this.chainhookOpts = chainhookOpts;
}
async function registerPredicates(this: FastifyInstance) {
logger.info(predicates, `EventServer registering predicates on ${base_path}...`);
for (const predicate of predicates) {
const thenThat: ThenThat = {
http_post: {
url: `http://${opts.server.external_hostname}/chainhook/${predicate.uuid}`,
authorization_header: `Bearer ${opts.server.auth_token}`,
},
};
try {
const body = predicate;
if ('mainnet' in body.networks) body.networks.mainnet.then_that = thenThat;
if ('testnet' in body.networks) body.networks.testnet.then_that = thenThat;
await request(`${base_path}/v1/chainhooks`, {
method: 'POST',
body: JSON.stringify(body),
headers: { 'content-type': 'application/json' },
throwOnError: true,
});
logger.info(`EventServer registered '${predicate.name}' predicate (${predicate.uuid})`);
} catch (error) {
logger.error(error, `EventServer unable to register predicate`);
}
}
async start(predicates: [ServerPredicate], callback: OnEventCallback) {
if (this.fastify) return;
this.fastify = await buildServer(this.serverOpts, this.chainhookOpts, predicates, callback);
return this.fastify.listen({ host: this.serverOpts.hostname, port: this.serverOpts.port });
}
async function removePredicates(this: FastifyInstance) {
logger.info(`EventServer closing predicates...`);
for (const predicate of predicates) {
try {
await request(`${base_path}/v1/chainhooks/${predicate.chain}/${predicate.uuid}`, {
method: 'DELETE',
headers: { 'content-type': 'application/json' },
throwOnError: true,
});
logger.info(`EventServer removed '${predicate.name}' predicate (${predicate.uuid})`);
} catch (error) {
logger.error(error, `EventServer unable to deregister predicate`);
}
}
async close() {
await this.fastify?.close();
this.fastify = undefined;
}
async function isEventAuthorized(request: FastifyRequest, reply: FastifyReply) {
const authHeader = request.headers.authorization;
if (authHeader && authHeader === `Bearer ${opts.server.auth_token}`) {
return;
}
await reply.code(403).send();
}
const EventServer: FastifyPluginCallback<Record<never, never>, Server, TypeBoxTypeProvider> = (
fastify,
options,
done
) => {
fastify.addHook('preHandler', isEventAuthorized);
fastify.post(
'/chainhook/:uuid',
{
schema: {
params: Type.Object({
uuid: Type.String({ format: 'uuid' }),
}),
body: PayloadSchema,
},
},
async (request, reply) => {
try {
await callback(request.params.uuid, request.body);
} catch (error) {
logger.error(error, `EventServer error processing payload`);
await reply.code(422).send();
}
await reply.code(200).send();
}
);
done();
};
const fastify = Fastify({
trustProxy: true,
logger: PINO_CONFIG,
pluginTimeout: 0, // Disable so ping can retry indefinitely
bodyLimit: 41943040, // 40 MB
}).withTypeProvider<TypeBoxTypeProvider>();
fastify.addHook('onReady', waitForNode);
fastify.addHook('onReady', registerPredicates);
fastify.addHook('onClose', removePredicates);
await fastify.register(EventServer);
await fastify.listen({ host: opts.server.host, port: opts.server.port });
return fastify;
}

View File

@@ -1,4 +1,5 @@
import { Type } from '@sinclair/typebox';
import { Static, Type } from '@sinclair/typebox';
import { ThenThatSchema } from '../predicate';
export const BitcoinIfThisTxIdSchema = Type.Object({
scope: Type.Literal('txid'),
@@ -70,3 +71,38 @@ export const BitcoinIfThisOrdinalsFeedSchema = Type.Object({
scope: Type.Literal('ordinals_protocol'),
operation: Type.Literal('inscription_feed'),
});
export const BitcoinIfThisOptionsSchema = Type.Object({
start_block: Type.Optional(Type.Integer()),
end_block: Type.Optional(Type.Integer()),
expire_after_occurrence: Type.Optional(Type.Integer()),
include_proof: Type.Optional(Type.Boolean()),
include_inputs: Type.Optional(Type.Boolean()),
include_outputs: Type.Optional(Type.Boolean()),
include_witness: Type.Optional(Type.Boolean()),
});
export const BitcoinIfThisSchema = Type.Union([
BitcoinIfThisTxIdSchema,
BitcoinIfThisOpReturnStartsWithSchema,
BitcoinIfThisOpReturnEqualsSchema,
BitcoinIfThisOpReturnEndsWithSchema,
BitcoinIfThisP2PKHSchema,
BitcoinIfThisP2SHSchema,
BitcoinIfThisP2WPKHSchema,
BitcoinIfThisP2WSHSchema,
BitcoinIfThisStacksBlockCommittedSchema,
BitcoinIfThisStacksLeaderKeyRegisteredSchema,
BitcoinIfThisStacksStxTransferredSchema,
BitcoinIfThisStacksStxLockedSchema,
BitcoinIfThisOrdinalsFeedSchema,
]);
export type BitcoinIfThis = Static<typeof BitcoinIfThisSchema>;
export const BitcoinIfThisThenThatSchema = Type.Composite([
BitcoinIfThisOptionsSchema,
Type.Object({
if_this: BitcoinIfThisSchema,
then_that: ThenThatSchema,
}),
]);

View File

@@ -1,7 +1,8 @@
import { Static, Type } from '@sinclair/typebox';
import { StacksEvent } from './stacks';
import { BitcoinEvent } from './bitcoin';
import { IfThisSchema } from './predicate';
import { BitcoinIfThisSchema } from './bitcoin/if_this';
import { StacksIfThisSchema } from './stacks/if_this';
const EventArray = Type.Union([Type.Array(StacksEvent), Type.Array(BitcoinEvent)]);
@@ -10,7 +11,7 @@ export const PayloadSchema = Type.Object({
rollback: EventArray,
chainhook: Type.Object({
uuid: Type.String(),
predicate: IfThisSchema,
predicate: Type.Union([BitcoinIfThisSchema, StacksIfThisSchema]),
}),
});
export type Payload = Static<typeof PayloadSchema>;

View File

@@ -1,97 +1,44 @@
import { Static, Type } from '@sinclair/typebox';
import {
BitcoinIfThisTxIdSchema,
BitcoinIfThisOpReturnStartsWithSchema,
BitcoinIfThisOpReturnEqualsSchema,
BitcoinIfThisOpReturnEndsWithSchema,
BitcoinIfThisP2PKHSchema,
BitcoinIfThisP2SHSchema,
BitcoinIfThisP2WPKHSchema,
BitcoinIfThisP2WSHSchema,
BitcoinIfThisStacksBlockCommittedSchema,
BitcoinIfThisStacksLeaderKeyRegisteredSchema,
BitcoinIfThisStacksStxTransferredSchema,
BitcoinIfThisStacksStxLockedSchema,
BitcoinIfThisOrdinalsFeedSchema,
} from './bitcoin/predicate';
import {
StacksIfThisTxIdSchema,
StacksIfThisBlockHeightHigherThanSchema,
StacksIfThisFtEventSchema,
StacksIfThisNftEventSchema,
StacksIfThisStxEventSchema,
StacksIfThisPrintEventSchema,
StacksIfThisContractCallSchema,
StacksIfThisContractDeploymentSchema,
StacksIfThisContractDeploymentTraitSchema,
} from './stacks/predicate';
import { BitcoinIfThisThenThatSchema } from './bitcoin/if_this';
import { StacksIfThisThenThatSchema } from './stacks/if_this';
export const IfThisSchema = Type.Union([
BitcoinIfThisTxIdSchema,
BitcoinIfThisOpReturnStartsWithSchema,
BitcoinIfThisOpReturnEqualsSchema,
BitcoinIfThisOpReturnEndsWithSchema,
BitcoinIfThisP2PKHSchema,
BitcoinIfThisP2SHSchema,
BitcoinIfThisP2WPKHSchema,
BitcoinIfThisP2WSHSchema,
BitcoinIfThisStacksBlockCommittedSchema,
BitcoinIfThisStacksLeaderKeyRegisteredSchema,
BitcoinIfThisStacksStxTransferredSchema,
BitcoinIfThisStacksStxLockedSchema,
BitcoinIfThisOrdinalsFeedSchema,
StacksIfThisTxIdSchema,
StacksIfThisBlockHeightHigherThanSchema,
StacksIfThisFtEventSchema,
StacksIfThisNftEventSchema,
StacksIfThisStxEventSchema,
StacksIfThisPrintEventSchema,
StacksIfThisContractCallSchema,
StacksIfThisContractDeploymentSchema,
StacksIfThisContractDeploymentTraitSchema,
]);
export type IfThis = Static<typeof IfThisSchema>;
export const ThenThatFileAppendSchema = Type.Object({
file_append: Type.Object({
path: Type.String(),
}),
});
export type ThenThatFileAppend = Static<typeof ThenThatFileAppendSchema>;
export const ThenThatSchema = Type.Union([
Type.Object({
file_append: Type.Object({
path: Type.String(),
}),
export const ThenThatHttpPostSchema = Type.Object({
http_post: Type.Object({
url: Type.String({ format: 'uri' }),
authorization_header: Type.String(),
}),
Type.Object({
http_post: Type.Object({
url: Type.String({ format: 'uri' }),
authorization_header: Type.String(),
}),
}),
]);
});
export type ThenThatHttpPost = Static<typeof ThenThatHttpPostSchema>;
export const ThenThatSchema = Type.Union([ThenThatFileAppendSchema, ThenThatHttpPostSchema]);
export type ThenThat = Static<typeof ThenThatSchema>;
export const IfThisThenThatSchema = Type.Object({
start_block: Type.Optional(Type.Integer()),
end_block: Type.Optional(Type.Integer()),
expire_after_occurrence: Type.Optional(Type.Integer()),
include_proof: Type.Optional(Type.Boolean()),
include_inputs: Type.Optional(Type.Boolean()),
include_outputs: Type.Optional(Type.Boolean()),
include_witness: Type.Optional(Type.Boolean()),
if_this: IfThisSchema,
then_that: ThenThatSchema,
});
export type IfThisThenThat = Static<typeof IfThisThenThatSchema>;
export const PredicateSchema = Type.Object({
export const PredicateHeaderSchema = Type.Object({
uuid: Type.String({ format: 'uuid' }),
name: Type.String(),
version: Type.Integer(),
chain: Type.String(),
networks: Type.Union([
Type.Object({
mainnet: IfThisThenThatSchema,
}),
Type.Object({
testnet: IfThisThenThatSchema,
}),
]),
});
export type PredicateHeader = Static<typeof PredicateHeaderSchema>;
export const PredicateSchema = Type.Composite([
PredicateHeaderSchema,
Type.Object({
networks: Type.Union([
Type.Object({
mainnet: Type.Union([BitcoinIfThisThenThatSchema, StacksIfThisThenThatSchema]),
}),
Type.Object({
testnet: Type.Union([BitcoinIfThisThenThatSchema, StacksIfThisThenThatSchema]),
}),
]),
}),
]);
export type Predicate = Static<typeof PredicateSchema>;

View File

@@ -1,4 +1,5 @@
import { Type } from '@sinclair/typebox';
import { Static, Type } from '@sinclair/typebox';
import { ThenThatSchema } from '../predicate';
export const StacksIfThisTxIdSchema = Type.Object({
scope: Type.Literal('txid'),
@@ -49,3 +50,31 @@ export const StacksIfThisContractDeploymentTraitSchema = Type.Object({
scope: Type.Literal('contract_deployment'),
implement_trait: Type.String(),
});
export const StacksIfThisOptionsSchema = Type.Object({
start_block: Type.Optional(Type.Integer()),
end_block: Type.Optional(Type.Integer()),
expire_after_occurrence: Type.Optional(Type.Integer()),
decode_clarity_values: Type.Optional(Type.Boolean()),
});
export const StacksIfThisSchema = Type.Union([
StacksIfThisTxIdSchema,
StacksIfThisBlockHeightHigherThanSchema,
StacksIfThisFtEventSchema,
StacksIfThisNftEventSchema,
StacksIfThisStxEventSchema,
StacksIfThisPrintEventSchema,
StacksIfThisContractCallSchema,
StacksIfThisContractDeploymentSchema,
StacksIfThisContractDeploymentTraitSchema,
]);
export type StacksIfThis = Static<typeof StacksIfThisSchema>;
export const StacksIfThisThenThatSchema = Type.Composite([
StacksIfThisOptionsSchema,
Type.Object({
if_this: StacksIfThisSchema,
then_that: ThenThatSchema,
}),
]);

View File

@@ -0,0 +1,175 @@
import { Static, Type, TypeBoxTypeProvider } from '@fastify/type-provider-typebox';
import Fastify, {
FastifyInstance,
FastifyPluginCallback,
FastifyReply,
FastifyRequest,
} from 'fastify';
import { Server } from 'http';
import { request } from 'undici';
import { logger, PINO_CONFIG } from './util/logger';
import { timeout } from './util/helpers';
import { Payload, PayloadSchema } from './schemas';
import { Predicate, PredicateHeaderSchema, ThenThatHttpPost } from './schemas/predicate';
import { BitcoinIfThisOptionsSchema, BitcoinIfThisSchema } from './schemas/bitcoin/if_this';
import { StacksIfThisOptionsSchema, StacksIfThisSchema } from './schemas/stacks/if_this';
export type OnEventCallback = (uuid: string, payload: Payload) => Promise<void>;
const ServerOptionsSchema = Type.Object({
hostname: Type.String(),
port: Type.Integer(),
auth_token: Type.String(),
external_host: Type.String(),
});
export type ServerOptions = Static<typeof ServerOptionsSchema>;
const ChainhookNodeOptionsSchema = Type.Object({
hostname: Type.String(),
port: Type.Integer(),
});
export type ChainhookNodeOptions = Static<typeof ChainhookNodeOptionsSchema>;
const IfThisThenNothingSchema = Type.Union([
Type.Composite([
BitcoinIfThisOptionsSchema,
Type.Object({
if_this: BitcoinIfThisSchema,
}),
]),
Type.Composite([
StacksIfThisOptionsSchema,
Type.Object({
if_this: StacksIfThisSchema,
}),
]),
]);
const ServerPredicateSchema = Type.Composite([
PredicateHeaderSchema,
Type.Object({
networks: Type.Union([
Type.Object({
mainnet: IfThisThenNothingSchema,
}),
Type.Object({
testnet: IfThisThenNothingSchema,
}),
]),
}),
]);
export type ServerPredicate = Static<typeof ServerPredicateSchema>;
export async function buildServer(
serverOpts: ServerOptions,
chainhookOpts: ChainhookNodeOptions,
predicates: [ServerPredicate],
callback: OnEventCallback
) {
const base_path = `http://${chainhookOpts.hostname}:${chainhookOpts.port}`;
async function waitForNode(this: FastifyInstance) {
logger.info(`EventServer connecting to chainhook node...`);
while (true) {
try {
await request(`${base_path}/ping`, { method: 'GET', throwOnError: true });
break;
} catch (error) {
logger.error(error, 'Chainhook node not available, retrying...');
await timeout(1000);
}
}
}
async function registerPredicates(this: FastifyInstance) {
logger.info(predicates, `EventServer registering predicates on ${base_path}...`);
for (const predicate of predicates) {
const thenThat: ThenThatHttpPost = {
http_post: {
url: `http://${serverOpts.external_host}/chainhook/${predicate.uuid}`,
authorization_header: `Bearer ${serverOpts.auth_token}`,
},
};
try {
const body = predicate as Predicate;
if ('mainnet' in body.networks) body.networks.mainnet.then_that = thenThat;
if ('testnet' in body.networks) body.networks.testnet.then_that = thenThat;
await request(`${base_path}/v1/chainhooks`, {
method: 'POST',
body: JSON.stringify(body),
headers: { 'content-type': 'application/json' },
throwOnError: true,
});
logger.info(`EventServer registered '${predicate.name}' predicate (${predicate.uuid})`);
} catch (error) {
logger.error(error, `EventServer unable to register predicate`);
}
}
}
async function removePredicates(this: FastifyInstance) {
logger.info(`EventServer closing predicates...`);
for (const predicate of predicates) {
try {
await request(`${base_path}/v1/chainhooks/${predicate.chain}/${predicate.uuid}`, {
method: 'DELETE',
headers: { 'content-type': 'application/json' },
throwOnError: true,
});
logger.info(`EventServer removed '${predicate.name}' predicate (${predicate.uuid})`);
} catch (error) {
logger.error(error, `EventServer unable to deregister predicate`);
}
}
}
async function isEventAuthorized(request: FastifyRequest, reply: FastifyReply) {
const authHeader = request.headers.authorization;
if (authHeader && authHeader === `Bearer ${serverOpts.auth_token}`) {
return;
}
await reply.code(403).send();
}
const EventServer: FastifyPluginCallback<Record<never, never>, Server, TypeBoxTypeProvider> = (
fastify,
options,
done
) => {
fastify.addHook('preHandler', isEventAuthorized);
fastify.post(
'/chainhook/:uuid',
{
schema: {
params: Type.Object({
uuid: Type.String({ format: 'uuid' }),
}),
body: PayloadSchema,
},
},
async (request, reply) => {
try {
await callback(request.params.uuid, request.body);
} catch (error) {
logger.error(error, `EventServer error processing payload`);
await reply.code(422).send();
}
await reply.code(200).send();
}
);
done();
};
const fastify = Fastify({
trustProxy: true,
logger: PINO_CONFIG,
pluginTimeout: 0, // Disable so ping can retry indefinitely
bodyLimit: 41943040, // 40 MB
}).withTypeProvider<TypeBoxTypeProvider>();
fastify.addHook('onReady', waitForNode);
fastify.addHook('onReady', registerPredicates);
fastify.addHook('onClose', removePredicates);
await fastify.register(EventServer);
return fastify;
}