From 4d0fc0584dafde34637b80f4d2ccea8512ddb46f Mon Sep 17 00:00:00 2001 From: Matthew Little Date: Mon, 16 Mar 2020 15:38:32 +0100 Subject: [PATCH] feat: initial support for reading JSON messages from the core-node event socket Keeping the stacks-p2p code in separate dir for possible mempool syncing later on. --- package-lock.json | 6 +- src/{binaryReader.ts => binary-reader.ts} | 0 src/event-stream/core-node-message.ts | 67 +++++++++++++++++++ src/event-stream/reader.ts | 17 +++++ src/index.ts | 21 +----- .../block-header.ts} | 2 +- src/{blockReader.ts => p2p/block.ts} | 6 +- src/{stacks-p2p.ts => p2p/messages.ts} | 28 ++++++-- src/{txReader.ts => p2p/tx.ts} | 6 +- 9 files changed, 119 insertions(+), 34 deletions(-) rename src/{binaryReader.ts => binary-reader.ts} (100%) create mode 100644 src/event-stream/core-node-message.ts create mode 100644 src/event-stream/reader.ts rename src/{blockHeaderReader.ts => p2p/block-header.ts} (98%) rename src/{blockReader.ts => p2p/block.ts} (73%) rename src/{stacks-p2p.ts => p2p/messages.ts} (82%) rename src/{txReader.ts => p2p/tx.ts} (99%) diff --git a/package-lock.json b/package-lock.json index 1421a291..8bfab024 100644 --- a/package-lock.json +++ b/package-lock.json @@ -137,9 +137,9 @@ } }, "acorn": { - "version": "7.1.0", - "resolved": "https://registry.npmjs.org/acorn/-/acorn-7.1.0.tgz", - "integrity": "sha512-kL5CuoXA/dgxlBbVrflsflzQ3PAas7RYZB52NOm/6839iVYJgKMJ3cQJD+t2i5+qFa8h3MDpEOJiS64E8JLnSQ==", + "version": "7.1.1", + "resolved": "https://registry.npmjs.org/acorn/-/acorn-7.1.1.tgz", + "integrity": "sha512-add7dgA5ppRPxCFJoAGfMDi7PIBXq1RtGo7BhbLaxwrXPOmw8gq48Y9ozT01hUKy9byMjlR20EJhu5zlkErEkg==", "dev": true }, "acorn-jsx": { diff --git a/src/binaryReader.ts b/src/binary-reader.ts similarity index 100% rename from src/binaryReader.ts rename to src/binary-reader.ts diff --git a/src/event-stream/core-node-message.ts b/src/event-stream/core-node-message.ts new file mode 100644 index 00000000..150d61bf --- /dev/null +++ b/src/event-stream/core-node-message.ts @@ -0,0 +1,67 @@ +export enum CoreNodeEventType { + ContractEvent = 'contract_event', + StxTransferEvent = 'stx_transfer_event', + StxMintEvent = 'stx_mint_event', + StxBurnEvent = 'stx_burn_event', + NftTransferEvent = 'nft_transfer_event', + NftMintEvent = 'nft_mint_event', + FtTransferEvent = 'ft_transfer_event', + FtMintEvent = 'ft_mint_event', +} + +export interface CoreNodeEventMessage { + type: CoreNodeEventType; +} + +export interface SmartContractEvent extends CoreNodeEventMessage { + type: CoreNodeEventType.ContractEvent; + contract_event: { + contract_identifier: string; + topic: string; + value: string; + }; +} + +export interface StxTransferEvent extends CoreNodeEventMessage { + type: CoreNodeEventType.StxTransferEvent; + stx_transfer_event: any; +} + +export interface StxMintEvent extends CoreNodeEventMessage { + type: CoreNodeEventType.StxMintEvent; + stx_mint_event: any; +} + +export interface StxBurnEvent extends CoreNodeEventMessage { + type: CoreNodeEventType.StxBurnEvent; + stx_burn_event: any; +} + +export interface NftTransferEvent extends CoreNodeEventMessage { + type: CoreNodeEventType.NftTransferEvent; + nft_transfer_event: any; +} + +export interface NftMintEvent extends CoreNodeEventMessage { + type: CoreNodeEventType.NftMintEvent; + nft_mint_event: any; +} + +export interface FtTransferEvent extends CoreNodeEventMessage { + type: CoreNodeEventType.FtTransferEvent; + ft_transfer_event: any; +} + +export interface FtMintEvent extends CoreNodeEventMessage { + type: CoreNodeEventType.FtMintEvent; + ft_mint_event: any; +} + +export interface CoreNodeMessage { + block_hash: string; + index_block_hash: string; + parent_block_hash: string; + parent_microblock: string; + events: any[]; + transactions: string[]; +} diff --git a/src/event-stream/reader.ts b/src/event-stream/reader.ts new file mode 100644 index 00000000..db7c72eb --- /dev/null +++ b/src/event-stream/reader.ts @@ -0,0 +1,17 @@ +import * as net from 'net'; +import { CoreNodeMessage } from './core-node-message'; + +/** + * Read JSON messages from a core-node event stream socket. + */ +export async function readMessageFromSocket(socket: net.Socket): Promise { + let data: Buffer = Buffer.alloc(0); + for await (const chunk of socket) { + data = Buffer.concat([data, chunk]); + } + const jsonString = data.toString('utf8'); + const message: CoreNodeMessage = JSON.parse(jsonString); + console.log(`${Date.now()} Received core-node message:`); + console.log(message); + return message; +} diff --git a/src/index.ts b/src/index.ts index 644a5691..f6fa0be5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,26 +1,9 @@ import * as net from 'net'; -import { BinaryReader } from './binaryReader'; -import { readMessages, StacksMessageTypeID } from './stacks-p2p'; -import { NotImplementedError } from './errors'; -import { getEnumDescription } from './helpers'; - -async function readSocket(socket: net.Socket): Promise { - const binaryReader = new BinaryReader(socket); - for await (const message of readMessages(binaryReader)) { - const msgType = message.messageTypeId; - if (msgType === StacksMessageTypeID.Blocks) { - console.log(`${Date.now()} Received Stacks message type: StacksMessageTypeID.Blocks`); - } else if (msgType === StacksMessageTypeID.Transaction) { - console.log(`${Date.now()} Received Stacks message type: StacksMessageTypeID.Transaction`); - } else { - throw new NotImplementedError(`handler for message type: ${getEnumDescription(StacksMessageTypeID, msgType)}`); - } - } -} +import { readMessageFromSocket } from './event-stream/reader'; const server = net.createServer(clientSocket => { console.log('client connected'); - readSocket(clientSocket).catch(error => { + readMessageFromSocket(clientSocket).catch(error => { console.error(`error reading messages from socket: ${error}`); console.error(error); clientSocket.destroy(); diff --git a/src/blockHeaderReader.ts b/src/p2p/block-header.ts similarity index 98% rename from src/blockHeaderReader.ts rename to src/p2p/block-header.ts index e44f3b4a..32bd1cc4 100644 --- a/src/blockHeaderReader.ts +++ b/src/p2p/block-header.ts @@ -1,4 +1,4 @@ -import { BufferReader } from './binaryReader'; +import { BufferReader } from '../binary-reader'; /* const blockHeaderSize = diff --git a/src/blockReader.ts b/src/p2p/block.ts similarity index 73% rename from src/blockReader.ts rename to src/p2p/block.ts index 129680db..0f0eaa3f 100644 --- a/src/blockReader.ts +++ b/src/p2p/block.ts @@ -1,6 +1,6 @@ -import { BufferReader } from './binaryReader'; -import { readBlockHeader, BlockHeader } from './blockHeaderReader'; -import { readTransactions, Transaction } from './txReader'; +import { BufferReader } from '../binary-reader'; +import { readBlockHeader, BlockHeader } from './block-header'; +import { readTransactions, Transaction } from './tx'; export interface Block { header: BlockHeader; diff --git a/src/stacks-p2p.ts b/src/p2p/messages.ts similarity index 82% rename from src/stacks-p2p.ts rename to src/p2p/messages.ts index 6ea3099a..2cded5ba 100644 --- a/src/stacks-p2p.ts +++ b/src/p2p/messages.ts @@ -1,8 +1,9 @@ -import { BinaryReader, BufferReader } from './binaryReader'; -import { readBlocks, Block } from './blockReader'; -import { Transaction, readTransaction } from './txReader'; -import { StacksMessageParsingError, NotImplementedError } from './errors'; -import { getEnumDescription } from './helpers'; +import * as net from 'net'; +import { BinaryReader, BufferReader } from '../binary-reader'; +import { readBlocks, Block } from './block'; +import { Transaction, readTransaction } from './tx'; +import { StacksMessageParsingError, NotImplementedError } from '../errors'; +import { getEnumDescription } from '../helpers'; export enum StacksMessageTypeID { Handshake = 0, @@ -138,3 +139,20 @@ export async function* readMessages(stream: BinaryReader): AsyncGenerator { + const binaryReader = new BinaryReader(socket); + for await (const message of readMessages(binaryReader)) { + const msgType = message.messageTypeId; + if (msgType === StacksMessageTypeID.Blocks) { + console.log(`${Date.now()} Received Stacks message type: StacksMessageTypeID.Blocks`); + } else if (msgType === StacksMessageTypeID.Transaction) { + console.log(`${Date.now()} Received Stacks message type: StacksMessageTypeID.Transaction`); + } else { + throw new NotImplementedError(`handler for message type: ${getEnumDescription(StacksMessageTypeID, msgType)}`); + } + } +} diff --git a/src/txReader.ts b/src/p2p/tx.ts similarity index 99% rename from src/txReader.ts rename to src/p2p/tx.ts index 0fc3823d..be2f5e00 100644 --- a/src/txReader.ts +++ b/src/p2p/tx.ts @@ -1,6 +1,6 @@ -import { BufferReader } from './binaryReader'; -import { getEnumDescription } from './helpers'; -import { StacksMessageParsingError, NotImplementedError } from './errors'; +import { BufferReader } from '../binary-reader'; +import { getEnumDescription } from '../helpers'; +import { StacksMessageParsingError, NotImplementedError } from '../errors'; enum SigHashMode { /** SingleSigHashMode */