feat: implement subset of Stacks p2p stream consumer

Refs #5
Matthew Little
2020-02-19 09:57:27 -05:00
parent 3dad3d4660
commit 1a6e6eea90
10 changed files with 1365 additions and 181 deletions

16
.eslintrc.js Normal file

@@ -0,0 +1,16 @@
module.exports = {
root: true,
parser: '@typescript-eslint/parser',
plugins: [
'@typescript-eslint',
],
extends: [
'eslint:recommended',
'plugin:@typescript-eslint/eslint-recommended',
'plugin:@typescript-eslint/recommended',
'prettier/@typescript-eslint',
],
rules: {
'@typescript-eslint/no-use-before-define': 'off',
}
};

7
.vscode/extensions.json vendored Normal file

@@ -0,0 +1,7 @@
{
"recommendations": [
"esbenp.prettier-vscode",
"editorconfig.editorconfig",
"dbaeumer.vscode-eslint"
]
}

1102
package-lock.json generated

File diff suppressed because it is too large

package.json

@@ -5,7 +5,8 @@
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"build": "tsc"
"build": "tsc",
"lint": "eslint . --ext .js,.jsx,.ts,.tsx"
},
"repository": {
"type": "git",
@@ -23,5 +24,11 @@
"smart-buffer": "^4.1.0",
"ts-node": "^8.6.2",
"typescript": "^3.7.5"
},
"devDependencies": {
"@typescript-eslint/eslint-plugin": "^2.20.0",
"@typescript-eslint/parser": "^2.20.0",
"eslint": "^6.8.0",
"eslint-config-prettier": "^6.10.0"
}
}

src/binaryReader.ts

@@ -1,75 +1,14 @@
import { Readable } from 'stream';
import { SmartBuffer } from 'smart-buffer';
function readExact(stream: Readable, byteCount: number, callback: (error: Error | null, data?: Buffer) => void): void {
const chunk: Buffer = stream.read(byteCount);
if (chunk !== null) {
if (chunk.length !== byteCount) {
callback(new Error(`Unexpected chunk length, expected '${byteCount}', received '${chunk.byteLength}'`));
}
callback(null, chunk);
} else {
stream.once('readable', () => {
readExact(stream, byteCount, callback);
});
}
}
export function readBuffer(stream: Readable, byteCount: number): Promise<Buffer> {
return new Promise((resolve, reject) => {
readExact(stream, byteCount, (error, data) => {
if (error) {
reject(error);
} else {
resolve(data);
}
})
});
}
export async function readByte(stream: Readable): Promise<number> {
const buffer = await readBuffer(stream, 1);
return buffer[0];
}
export async function readUInt16BE(stream: Readable): Promise<number> {
const buffer = await readBuffer(stream, 2);
return buffer.readUInt16BE(0);
}
export async function readUInt32BE(stream: Readable): Promise<number> {
const buffer = await readBuffer(stream, 4);
return buffer.readUInt32BE(0);
}
export async function readUInt64BE(stream: Readable): Promise<bigint> {
const buffer = await readBuffer(stream, 8);
return buffer.readBigUInt64BE(0);
}
export async function read32Bytes(stream: Readable): Promise<Buffer> {
const buffer = await readBuffer(stream, 32);
return buffer;
}
export class BufferReader extends SmartBuffer {
constructor(buff: Buffer) {
super({ buff: buff });
}
static async fromStream(stream: Readable, length: number): Promise<BufferReader> {
const buff = await readBuffer(stream, length);
return new BufferReader(buff);
}
class BufferReader extends SmartBuffer {
readBigUIntLE(length: number): bigint {
const buffer = Buffer.from(this.readBuffer(length)).reverse();
const hex = buffer.toString('hex');
const num = BigInt(`0x${hex}`);
return num;
}
readBigUIntBE(length: number): bigint {
const buffer = this.readBuffer(length);
const hex = buffer.toString('hex');
@@ -77,3 +16,64 @@ export class BufferReader extends SmartBuffer {
return num;
}
}
export class BinaryReader {
readonly stream: Readable;
constructor (stream: Readable) {
this.stream = stream;
}
private readExact(length: number, callback: (error: Error | null, data?: Buffer) => void): void {
const chunk: Buffer = this.stream.read(length);
if (chunk !== null) {
if (chunk.length !== length) {
callback(new Error(`Unexpected chunk length, expected '${length}', received '${chunk.length}'`));
// Return early so the callback is not invoked a second time below.
return;
}
callback(null, chunk);
} else {
this.stream.once('readable', () => {
this.readExact(length, callback);
});
}
}
readBuffer(length: number): Promise<Buffer> {
return new Promise((resolve, reject) => {
this.readExact(length, (error, data) => {
if (error) {
reject(error);
} else {
resolve(data);
}
})
});
}
readUInt8(): Promise<number> {
return this.readBuffer(1).then(buffer => buffer[0]);
}
readUInt16BE(): Promise<number> {
return this.readBuffer(2).then(buffer => buffer.readUInt16BE(0));
}
readUInt32BE(): Promise<number> {
return this.readBuffer(4).then(buffer => buffer.readUInt32BE(0));
}
readUInt64BE(): Promise<bigint> {
return this.readBuffer(8).then(buffer => buffer.readBigUInt64BE(0));
}
read32Bytes(): Promise<Buffer> {
return this.readBuffer(32);
}
sync(length: number): Promise<BufferReader> {
return this.readBuffer(length).then(buffer => new BufferReader({ buff: buffer }));
}
}
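
For illustration only (not part of this commit): a minimal sketch of driving the new BinaryReader from a TCP socket, assuming a toy framing of a 4-byte big-endian length prefix followed by the payload; the port number is a placeholder.

import * as net from 'net';
import { BinaryReader } from './binaryReader';

// Toy framing: 4-byte big-endian length prefix, then that many payload bytes.
const demoServer = net.createServer(async socket => {
  const reader = new BinaryReader(socket);
  const length = await reader.readUInt32BE();
  const payload = await reader.readBuffer(length);
  console.log(`received ${payload.byteLength} payload bytes`);
});
demoServer.listen(3999); // placeholder port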

68
src/blockHeaderReader.ts Normal file

@@ -0,0 +1,68 @@
import { BinaryReader } from './binaryReader';
const blockHeaderSize =
1 + // version number
16 + // proof score
80 + // VRF proof
32 + // parent block hash
32 + // parent microblock hash
2 + // parent microblock sequence number
32 + // transaction merkle root
32 + // state merkle root
20 // microblock public key hash
export interface BlockHeader {
/** Version number to describe how to validate the block. */
version: number;
/** How much work has gone into this chain so far. */
workScore: {
/** Number of burn tokens destroyed. */
burn: bigint;
/** In Stacks, "work" == the length of the fork. */
work: bigint;
};
/** RFC-compliant VRF. Must match the burn commitment transaction on the burn chain (in particular, it must hash to its VRF seed). */
vrfProof: {
/** Compressed Ed25519 point. */
gamma: Buffer;
/** Ed25519 scalar - unsigned integer */
c: bigint;
/** Ed25519 scalar - unsigned integer */
s: bigint;
};
/** The SHA512/256 hash of the last anchored block that precedes this block in the fork to which this block is to be appended. */
parentBlockHash: Buffer;
/** The SHA512/256 hash of the last streamed block that precedes this block in the fork to which this block is to be appended. */
parentMicroblockHash: Buffer;
/** The sequence number of the parent microblock to which this anchored block is attached. */
parentMicroblockSequence: number;
/** The SHA512/256 root hash of a binary Merkle tree calculated over the sequence of transactions in this block. */
txMerkleRootHash: Buffer;
/** The SHA512/256 root hash of a MARF index over the state of the blockchain. */
stateMerkleRootHash: Buffer;
/** The Hash160 of a compressed public key whose private key will be used to sign microblocks during the peer's tenure. */
microblockPubkeyHash: Buffer;
}
export async function readBlockHeader(stream: BinaryReader): Promise<BlockHeader> {
const cursor = await stream.sync(blockHeaderSize);
const header: BlockHeader = {
version: cursor.readUInt8(),
workScore: {
burn: cursor.readBigUInt64BE(),
work: cursor.readBigUInt64BE(),
},
vrfProof: {
gamma: cursor.readBuffer(32),
c: cursor.readBigUIntLE(16),
s: cursor.readBigUIntLE(32),
},
parentBlockHash: cursor.readBuffer(32),
parentMicroblockHash: cursor.readBuffer(32),
parentMicroblockSequence: cursor.readUInt16BE(),
txMerkleRootHash: cursor.readBuffer(32),
stateMerkleRootHash: cursor.readBuffer(32),
microblockPubkeyHash: cursor.readBuffer(20)
};
return header;
}
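
For illustration only (not part of this commit): a sketch of parsing a header from an in-memory buffer by pushing it through a PassThrough stream; rawHeader is a hypothetical Buffer assumed to hold at least blockHeaderSize (247) bytes captured off the wire.

import { PassThrough } from 'stream';
import { BinaryReader } from './binaryReader';
import { readBlockHeader } from './blockHeaderReader';

// rawHeader is assumed to contain a serialized block header (247 bytes).
async function parseHeaderFromBuffer(rawHeader: Buffer) {
  const pass = new PassThrough();
  pass.end(rawHeader);
  const header = await readBlockHeader(new BinaryReader(pass));
  console.log(`version ${header.version}, parent microblock seq ${header.parentMicroblockSequence}`);
  return header;
}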

src/blockReader.ts

@@ -1,69 +1,29 @@
import { Readable } from 'stream';
import { BufferReader } from './binaryReader';
import { BinaryReader } from './binaryReader';
import { readBlockHeader, BlockHeader } from './blockHeaderReader';
import { readTransactions, Transaction } from './txReader';
import { StacksMessageTypeID } from './stacks-p2p';
const blockHeaderSize =
1 + // version number
16 + // proof score
80 + // VRF proof
32 + // parent block hash
32 + // parent microblock hash
2 + // parent microblock sequence number
32 + // transaction merkle root
32 + // state merkle root
20 // microblock public key hash
interface BlockHeader {
/** Version number to describe how to validate the block. */
version: number;
/** How much work has gone into this chain so far. */
workScore: {
/** Number of burn tokens destroyed. */
burn: bigint;
/** In Stacks, "work" == the length of the fork. */
work: bigint;
};
/** RFC-compliant VRF. Must match the burn commitment transaction on the burn chain (in particular, it must hash to its VRF seed). */
vrfProof: {
/** Compressed Ed25519 point. */
gamma: Buffer;
/** Ed25519 scalar - unsigned integer */
c: bigint;
/** Ed25519 scalar - unsigned integer */
s: bigint;
};
/** The SHA512/256 hash of the last anchored block that precedes this block in the fork to which this block is to be appended. */
parentBlockHash: Buffer;
/** The SHA512/256 hash of the last streamed block that precedes this block in the fork to which this block is to be appended. */
parentMicroblockHash: Buffer;
/** The sequence number of the parent microblock to which this anchored block is attached. */
parentMicroblockSequence: number;
/** The SHA512/256 root hash of a binary Merkle tree calculated over the sequence of transactions in this block. */
txMerkleRootHash: Buffer;
/** The SHA512/256 root hash of a MARF index over the state of the blockchain. */
stateMerkleRootHash: Buffer;
/** The Hash160 of a compressed public key whose private key will be used to sign microblocks during the peer's tenure. */
microblockPubkeyHash: Buffer;
export interface Block {
header: BlockHeader;
transactions: Transaction[];
}
export async function readBlockHeader(stream: Readable): Promise<BlockHeader> {
const cursor = await BufferReader.fromStream(stream, blockHeaderSize);
const header: BlockHeader = {
version: cursor.readUInt8(),
workScore: {
burn: cursor.readBigUInt64BE(),
work: cursor.readBigUInt64BE(),
},
vrfProof: {
gamma: cursor.readBuffer(32),
c: cursor.readBigUIntLE(16),
s: cursor.readBigUIntLE(32),
},
parentBlockHash: cursor.readBuffer(32),
parentMicroblockHash: cursor.readBuffer(32),
parentMicroblockSequence: cursor.readUInt16BE(),
txMerkleRootHash: cursor.readBuffer(32),
stateMerkleRootHash: cursor.readBuffer(32),
microblockPubkeyHash: cursor.readBuffer(20)
};
return header;
export interface StacksMessageBlocks {
messageTypeId: StacksMessageTypeID.Blocks;
blocks: Block[];
}
export async function readBlocks(stream: BinaryReader): Promise<Block[]> {
const blockCount = await stream.readUInt32BE();
const blocks = new Array<Block>(blockCount);
for (let i = 0; i < blockCount; i++) {
const blockHeader = await readBlockHeader(stream);
const txs = await readTransactions(stream);
const block: Block = {
header: blockHeader,
transactions: txs,
};
blocks[i] = block;
}
return blocks;
}

src/index.ts

@@ -1,34 +1,20 @@
import * as net from 'net';
import { Readable } from 'stream';
import { readUInt32BE } from './binaryReader';
import { readBlockHeader } from './blockReader';
import { readTransactions } from './txReader';
import { BinaryReader } from './binaryReader';
import { readMessages, StacksMessageTypeID } from './stacks-p2p';
async function readBlocks(stream: Readable) {
try {
do {
const eventType = await readUInt32BE(stream);
if (eventType !== 1) {
throw new Error(`Expected event type 1 (block) but received ${eventType}`);
}
const blockHeader = await readBlockHeader(stream);
console.log(blockHeader);
const txs = await readTransactions(stream);
console.log(txs);
console.log(Date.now());
} while (!stream.destroyed)
} catch (error) {
console.error(error);
process.exit(1);
async function readSocket(socket: net.Socket): Promise<void> {
const binaryReader = new BinaryReader(socket);
for await (const message of readMessages(binaryReader)) {
if (message.messageTypeId === StacksMessageTypeID.Blocks) {
console.log(`${Date.now()} Received Stacks message type: StacksMessageTypeID.Blocks`);
}
}
}
const server = net.createServer((c) => {
// 'connection' listener.
console.log('client connected');
// processLineByLine(c);
readBlocks(c);
readSocket(c);
c.on('end', () => {
console.log('client disconnected');
});
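
For illustration only (not part of this commit): a smoke-test client sketch; the server's listen port is not visible in this diff, so 3999 is a placeholder. It sends a Blocks message (type id 8) with a zero block count, which readMessages in src/stacks-p2p.ts can parse without any further payload.

import * as net from 'net';

// Placeholder port; the server's listen() call is not shown in this diff.
const client = net.connect(3999, '127.0.0.1', () => {
  // 0x08 = StacksMessageTypeID.Blocks, followed by a uint32 block count of 0.
  client.write(Buffer.from([0x08, 0x00, 0x00, 0x00, 0x00]));
});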

39
src/stacks-p2p.ts Normal file

@@ -0,0 +1,39 @@
import { BinaryReader } from "./binaryReader";
import { readBlocks, Block, StacksMessageBlocks } from "./blockReader";
export enum StacksMessageTypeID {
Handshake = 0,
HandshakeAccept = 1,
HandshakeReject = 2,
GetNeighbors = 3,
Neighbors = 4,
GetBlocksInv = 5,
BlocksInv = 6,
GetBlocks = 7,
Blocks = 8,
GetMicroblocks = 9,
Microblocks = 10,
Transaction = 11,
Nack = 12,
Ping = 13,
Pong = 14,
Reserved = 255
}
type StacksMessage = StacksMessageBlocks;
export async function* readMessages(stream: BinaryReader): AsyncGenerator<StacksMessage> {
while (!stream.stream.destroyed) {
const messageTypeId: StacksMessageTypeID = await stream.readUInt8();
if (messageTypeId === StacksMessageTypeID.Blocks) {
const blocks = await readBlocks(stream);
const msg: StacksMessageBlocks = {
messageTypeId: StacksMessageTypeID.Blocks,
blocks: blocks,
};
yield msg;
} else {
throw new Error(`Not implemented - StacksMessageID ${messageTypeId}`);
}
}
}
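
For illustration only (not part of this commit): the async generator can also be consumed over an outbound connection; host and port below are placeholders.

import * as net from 'net';
import { BinaryReader } from './binaryReader';
import { readMessages, StacksMessageTypeID } from './stacks-p2p';

async function consume(host: string, port: number): Promise<void> {
  const socket = net.connect(port, host);
  const reader = new BinaryReader(socket);
  for await (const message of readMessages(reader)) {
    if (message.messageTypeId === StacksMessageTypeID.Blocks) {
      console.log(`Blocks message containing ${message.blocks.length} block(s)`);
    }
  }
}

// consume('127.0.0.1', 20444); // placeholder address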

src/txReader.ts

@@ -1,5 +1,5 @@
import { Readable } from 'stream';
import { BufferReader, readUInt32BE, readByte, readBuffer } from './binaryReader';
import { BinaryReader } from './binaryReader';
const enum SingleSigHashMode {
P2PKH = 0x00,
@@ -206,7 +206,7 @@ type TransactionPayload =
| TransactionPayloadSmartContract
| TransactionPayloadPoisonMicroblock;
interface Transaction {
export interface Transaction {
version: TransactionVersion; // u8
chainId: number; // u32
auth: TransactionAuthStandard | TransactionAuthSponsored;
@@ -216,13 +216,13 @@ interface Transaction {
payload: TransactionPayload;
}
export async function readTransactions(stream: Readable): Promise<Transaction[]> {
const txCount = await readUInt32BE(stream);
export async function readTransactions(stream: BinaryReader): Promise<Transaction[]> {
const txCount = await stream.readUInt32BE();
const txs = new Array<Transaction>(txCount);
for (let i = 0; i < txCount; i++) {
const version = await readByte(stream);
const chainId = await readUInt32BE(stream);
const authType: TransactionAuthType = await readByte(stream);
const version = await stream.readUInt8();
const chainId = await stream.readUInt32BE();
const authType: TransactionAuthType = await stream.readUInt8();
let auth: TransactionAuthStandard | TransactionAuthSponsored;
if (authType === TransactionAuthType.Standard) {
@@ -245,13 +245,13 @@ export async function readTransactions(stream: Readable): Promise<Transaction[]>
throw new Error(`Unexpected tx auth type: ${authType}`);
}
const anchorMode: TransactionPostConditionMode = await readByte(stream);
const anchorMode: TransactionPostConditionMode = await stream.readUInt8();
if (anchorMode !== TransactionPostConditionMode.Allow
&& anchorMode !== TransactionPostConditionMode.Deny) {
throw new Error(`Unexpected tx post condition anchor mode: ${anchorMode}`);
}
const postConditionMode: TransactionPostConditionMode = await readByte(stream);
const postConditionMode: TransactionPostConditionMode = await stream.readUInt8();
if (postConditionMode !== TransactionPostConditionMode.Allow
&& postConditionMode !== TransactionPostConditionMode.Deny) {
throw new Error(`Unexpected tx post condition mode: ${postConditionMode}`);
@@ -271,21 +271,20 @@ export async function readTransactions(stream: Readable): Promise<Transaction[]>
payload: txPayload,
};
txs[i] = tx;
console.log('here');
}
return txs;
}
async function readTransactionPayload(stream: Readable): Promise<TransactionPayload> {
const txPayloadType: TransactionPayloadID = await readByte(stream);
async function readTransactionPayload(stream: BinaryReader): Promise<TransactionPayload> {
const txPayloadType: TransactionPayloadID = await stream.readUInt8();
if (txPayloadType === TransactionPayloadID.Coinbase) {
const payload: TransactionPayloadCoinbase = {
typeId: txPayloadType,
payload: await readBuffer(stream, 32),
payload: await stream.readBuffer(32),
};
return payload;
} else if (txPayloadType === TransactionPayloadID.TokenTransfer) {
const cursor = await BufferReader.fromStream(stream, 63);
const cursor = await stream.sync(63);
const payload: TransactionPayloadTokenTransfer = {
typeId: txPayloadType,
address: {
@@ -307,14 +306,14 @@ async function readTransactionPayload(stream: Readable): Promise<TransactionPayl
}
}
async function readTransactionPostConditions(stream: Readable): Promise<TransactionPostCondition[]> {
const conditionCount = await readUInt32BE(stream);
async function readTransactionPostConditions(stream: BinaryReader): Promise<TransactionPostCondition[]> {
const conditionCount = await stream.readUInt32BE();
const conditions = new Array<TransactionPostCondition>(conditionCount);
for (let i = 0; i < conditionCount; i++) {
const typeId: AssetInfoID = await readByte(stream);
const typeId: AssetInfoID = await stream.readUInt8();
if (typeId === AssetInfoID.STX) {
const principal = await readTransactionPostConditionPrincipal(stream);
const cursor = await BufferReader.fromStream(stream, 9);
const cursor = await stream.sync(9);
const conditionCode: FungibleConditionCode = cursor.readUInt8();
const condition: TransactionPostConditionStx = {
assetInfoId: typeId,
@@ -334,15 +333,15 @@ async function readTransactionPostConditions(stream: Readable): Promise<Transact
return conditions;
}
async function readTransactionPostConditionPrincipal(stream: Readable): Promise<PostConditionPrincipal> {
const typeId: PostConditionPrincipalID = await readByte(stream);
async function readTransactionPostConditionPrincipal(stream: BinaryReader): Promise<PostConditionPrincipal> {
const typeId: PostConditionPrincipalID = await stream.readUInt8();
if (typeId === PostConditionPrincipalID.Origin) {
const principal: PostConditionPrincipalOrigin = {
typeId: typeId
};
return principal;
} else if (typeId === PostConditionPrincipalID.Standard) {
const cursor = await BufferReader.fromStream(stream, 21);
const cursor = await stream.sync(21);
const principal: PostConditionPrincipalStandard = {
typeId: typeId,
address: {
@@ -352,13 +351,13 @@ async function readTransactionPostConditionPrincipal(stream: Readable): Promise<
};
return principal;
} else if (typeId === PostConditionPrincipalID.Contract) {
const cursor = await BufferReader.fromStream(stream, 22);
const cursor = await stream.sync(22);
const address: StacksAddress = {
version: cursor.readUInt8(),
bytes: cursor.readBuffer(20),
}
const contractNameLen = cursor.readUInt8();
const contractNameBuff = await readBuffer(stream, contractNameLen);
const contractNameBuff = await stream.readBuffer(contractNameLen);
const principal: PostConditionPrincipalContract = {
typeId: typeId,
address: address,
@@ -370,10 +369,10 @@ async function readTransactionPostConditionPrincipal(stream: Readable): Promise<
}
}
async function readTransactionSpendingCondition(stream: Readable): Promise<TransactionSpendingCondition> {
const conditionType = await readByte(stream);
async function readTransactionSpendingCondition(stream: BinaryReader): Promise<TransactionSpendingCondition> {
const conditionType = await stream.readUInt8();
if (conditionType === SingleSigHashMode.P2PKH || conditionType === SingleSigHashMode.P2WPKH) {
const cursor = await BufferReader.fromStream(stream, 102);
const cursor = await stream.sync(102);
const condition: TransactionSpendingConditionSingleSig = {
hashMode: conditionType,
signer: cursor.readBuffer(20),
@@ -384,7 +383,7 @@ async function readTransactionSpendingCondition(stream: Readable): Promise<Trans
};
return condition;
} else if (conditionType === MultiSigHashMode.P2SH || conditionType === MultiSigHashMode.P2WSH) {
const cursor = await BufferReader.fromStream(stream, 40);
const cursor = await stream.sync(40);
const condition: TransactionSpendingConditionMultiSig = {
hashMode: conditionType,
signer: cursor.readBuffer(20),
@@ -393,19 +392,19 @@ async function readTransactionSpendingCondition(stream: Readable): Promise<Trans
authFields: new Array<TransactionAuthField>(cursor.readUInt32BE()),
};
for (let i = 0; i < condition.authFields.length; i++) {
const authType: TransactionAuthFieldID = await readByte(stream);
const authType: TransactionAuthFieldID = await stream.readUInt8();
if (authType === TransactionAuthFieldID.PublicKeyCompressed
|| authType === TransactionAuthFieldID.PublicKeyUncompressed) {
const authFieldPubkey: TransactionAuthFieldPublicKey = {
typeId: authType,
publicKey: await readBuffer(stream, 33),
publicKey: await stream.readBuffer(33),
};
condition.authFields[i] = authFieldPubkey;
} else if (authType === TransactionAuthFieldID.SignatureCompressed
|| authType === TransactionAuthFieldID.SignatureUncompressed) {
const authFieldSig: TransactionAuthFieldSignature = {
typeId: authType,
signature: await readBuffer(stream, 65),
signature: await stream.readBuffer(65),
}
condition.authFields[i] = authFieldSig;
} else {