feat: add listener callback to socket-io client subscription functions (#1799)

* feat: add listener callback to socket-io client subscription functions

* chore: fix test

* test: add unit tests for socket-io-client

* docs: update socket-io client docs

* chore: lint fix in client lib src

* test: fix flaky pox test
This commit is contained in:
Matthew Little
2024-01-05 16:41:32 +01:00
committed by GitHub
parent 5050763e3f
commit 5634522132
11 changed files with 2721 additions and 2127 deletions

1
client/.prettierignore Normal file
View File

@@ -0,0 +1 @@
src/generated/*

View File

@@ -51,12 +51,13 @@ import * as stacks from '@stacks/blockchain-api-client';
// for testnet, replace with https://api.testnet.hiro.so/ // for testnet, replace with https://api.testnet.hiro.so/
const socketUrl = "https://api.mainnet.hiro.so/"; const socketUrl = "https://api.mainnet.hiro.so/";
const socket = io(socketUrl, { const socket = io(socketUrl);
transports: [ "websocket" ]
});
const sc = new stacks.StacksApiSocketClient(socket); const sc = new stacks.StacksApiSocketClient(socket);
sc.subscribeAddressTransactions('ST3GQB6WGCWKDNFNPSQRV8DY93JN06XPZ2ZE9EVMA'); sc.subscribeAddressTransactions('ST3GQB6WGCWKDNFNPSQRV8DY93JN06XPZ2ZE9EVMA', (address, tx) => {
console.log('address:', address);
console.log('tx:', tx);
});
``` ```
## Available Updates ## Available Updates
@@ -108,7 +109,7 @@ client.subscribeBlocks(event => {});
``` ```
Subscribe via Socket.io: Subscribe via Socket.io:
```js ```js
sc.subscribeBlocks(); sc.subscribeBlocks(block => {});
``` ```
### Microblock Updates ### Microblock Updates
@@ -149,7 +150,7 @@ client.subscribeMicroblocks(event => {});
``` ```
Subscribe via Socket.io: Subscribe via Socket.io:
```js ```js
sc.subscribeMicroblocks(); sc.subscribeMicroblocks(microblock => {});
``` ```
### Mempool Updates ### Mempool Updates
@@ -210,7 +211,7 @@ client.subscribeMempool(event => {});
``` ```
Subscribe via Socket.io: Subscribe via Socket.io:
```js ```js
sc.subscribeMempool(); sc.subscribeMempool(mempoolTx => {});
``` ```
### Transaction Updates ### Transaction Updates
@@ -277,7 +278,7 @@ client.subscribeTxUpdates('0xd78988664aaa9a1b751cd58c55b253914f790e95ca6f3d402a8
``` ```
Subscribe via Socket.io: Subscribe via Socket.io:
```js ```js
sc.subscribeTransaction('0xd78988664aaa9a1b751cd58c55b253914f790e95ca6f3d402a866559e1cbe0b3'); sc.subscribeTransaction('0xd78988664aaa9a1b751cd58c55b253914f790e95ca6f3d402a866559e1cbe0b3', tx => {});
``` ```
### Address Transaction Updates ### Address Transaction Updates
@@ -366,7 +367,7 @@ client.subscribeAddressTransactions('SP3C5SSYVKPAWTR8Y63CVYBR65GD3MG7K80526D1Q',
``` ```
Subscribe via Socket.io: Subscribe via Socket.io:
```js ```js
sc.subscribeAddressTransactions('SP3C5SSYVKPAWTR8Y63CVYBR65GD3MG7K80526D1Q'); sc.subscribeAddressTransactions('SP3C5SSYVKPAWTR8Y63CVYBR65GD3MG7K80526D1Q', (address, tx) => {});
``` ```
### Address Balance Updates ### Address Balance Updates
@@ -412,7 +413,7 @@ client.subscribeAddressBalanceUpdates('SP3C5SSYVKPAWTR8Y63CVYBR65GD3MG7K80526D1Q
``` ```
Subscribe via Socket.io: Subscribe via Socket.io:
```js ```js
sc.subscribeAddressStxBalance('SP3C5SSYVKPAWTR8Y63CVYBR65GD3MG7K80526D1Q'); sc.subscribeAddressStxBalance('SP3C5SSYVKPAWTR8Y63CVYBR65GD3MG7K80526D1Q', (addr, balance) => {});
``` ```
### NFT event updates ### NFT event updates
@@ -451,13 +452,15 @@ client.subscribeNftCollectionEventUpdates(
``` ```
Subscribe via Socket.io: Subscribe via Socket.io:
```js ```js
sc.subscribeNftEventUpdates(); sc.subscribeNftEvent(nftEvent => {});
sc.subscribeNftAssetEventUpdates( sc.subscribeNftAssetEvent(
'SP176ZMV706NZGDDX8VSQRGMB7QN33BBDVZ6BMNHD.project-indigo-act1::Project-Indigo-Act1', 'SP176ZMV706NZGDDX8VSQRGMB7QN33BBDVZ6BMNHD.project-indigo-act1::Project-Indigo-Act1',
'0x0100000000000000000000000000000095', '0x0100000000000000000000000000000095',
(assetId, value, nftEvent) => {}
); );
sc.subscribeNftCollectionEventUpdates( sc.subscribeNftCollectionEvent(
'SP176ZMV706NZGDDX8VSQRGMB7QN33BBDVZ6BMNHD.project-indigo-act1::Project-Indigo-Act1', 'SP176ZMV706NZGDDX8VSQRGMB7QN33BBDVZ6BMNHD.project-indigo-act1::Project-Indigo-Act1',
(assetId, nftEvent) => {}
); );
``` ```

4495
client/package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -24,8 +24,9 @@
"build:browser:watch": "npm run build:browser -- watch", "build:browser:watch": "npm run build:browser -- watch",
"start": "concurrently npm:build:browser:watch npm:open", "start": "concurrently npm:build:browser:watch npm:open",
"test": "ts-node test/test.ts", "test": "ts-node test/test.ts",
"lint": "eslint . --ext .ts -f codeframe", "lint": "eslint . --ext .ts -f unix",
"lint:prettier": "prettier --check ./src/**/*.ts", "lint:prettier": "prettier --check ./src/**/*.ts",
"lint:fix": "eslint . --ext .js,.jsx,.ts,.tsx -f unix --fix && prettier --write --check src/**/*.ts",
"open": "http-server -o 9222 -o index.html", "open": "http-server -o 9222 -o index.html",
"prep-openapi": "rimraf ./.tmp && rimraf ./src/generated && swagger-cli bundle --dereference -o ./.tmp/openapi-temp.json ../docs/openapi.yaml && shx sed -i '^.*\\$schema.*$' '' ./.tmp/openapi-temp.json > ./.tmp/openapi.json", "prep-openapi": "rimraf ./.tmp && rimraf ./src/generated && swagger-cli bundle --dereference -o ./.tmp/openapi-temp.json ../docs/openapi.yaml && shx sed -i '^.*\\$schema.*$' '' ./.tmp/openapi-temp.json > ./.tmp/openapi.json",
"generate-openapi": "npm run prep-openapi && openapi-generator-cli generate --skip-validate-spec -g typescript-fetch --additional-properties=withInterfaces=true,typescriptThreePlus=true,supportsES6=true,legacyDiscriminatorBehavior=false,enumPropertyNaming=original,modelPropertyNaming=original -i ./.tmp/openapi.json -o ./src/generated > ./.tmp/gen.log", "generate-openapi": "npm run prep-openapi && openapi-generator-cli generate --skip-validate-spec -g typescript-fetch --additional-properties=withInterfaces=true,typescriptThreePlus=true,supportsES6=true,legacyDiscriminatorBehavior=false,enumPropertyNaming=original,modelPropertyNaming=original -i ./.tmp/openapi.json -o ./src/generated > ./.tmp/gen.log",
@@ -43,27 +44,29 @@
"cross-fetch": "3.1.5", "cross-fetch": "3.1.5",
"eventemitter3": "4.0.7", "eventemitter3": "4.0.7",
"jsonrpc-lite": "2.2.0", "jsonrpc-lite": "2.2.0",
"socket.io-client": "4.6.1", "socket.io-client": "4.7.3",
"ws": "7.5.6" "ws": "8.16.0"
}, },
"devDependencies": { "devDependencies": {
"@apidevtools/swagger-cli": "4.0.4", "@apidevtools/swagger-cli": "4.0.4",
"@openapitools/openapi-generator-cli": "2.4.21", "@openapitools/openapi-generator-cli": "2.4.21",
"@stacks/eslint-config": "1.2.0", "@stacks/eslint-config": "2.0.0",
"@stacks/prettier-config": "0.0.7", "@stacks/prettier-config": "0.0.10",
"@typescript-eslint/eslint-plugin": "4.33.0", "@types/node": "20.10.6",
"@typescript-eslint/parser": "4.33.0", "@typescript-eslint/eslint-plugin": "6.17.0",
"@typescript-eslint/parser": "6.17.0",
"concurrently": "7.6.0", "concurrently": "7.6.0",
"eslint": "7.32.0", "eslint": "8.56.0",
"eslint-config-prettier": "8.3.0", "eslint-config-prettier": "9.1.0",
"eslint-plugin-prettier": "3.4.1", "eslint-plugin-prettier": "5.1.2",
"eslint-plugin-unused-imports": "3.0.0",
"http-server": "14.0.0", "http-server": "14.0.0",
"microbundle": "0.13.3", "microbundle": "0.13.3",
"prettier": "2.8.6", "prettier": "3.1.1",
"rimraf": "5.0.0", "rimraf": "5.0.0",
"shx": "0.3.3", "shx": "0.3.3",
"ts-node": "9.1.1", "ts-node": "10.9.2",
"typedoc": "0.23.10", "typedoc": "0.25.6",
"typescript": "4.6.2" "typescript": "5.3.3"
} }
} }

View File

@@ -1,9 +1,16 @@
import { io } from 'socket.io-client'; import { io, Socket } from 'socket.io-client';
import type { Socket } from 'socket.io-client'; import type { ManagerOptions, SocketOptions } from 'socket.io-client';
import { import {
ClientToServerMessages, ClientToServerMessages,
Topic, Topic,
ServerToClientMessages, ServerToClientMessages,
MempoolTransaction,
Transaction,
Block,
Microblock,
AddressTransactionWithTransfers,
NftEvent,
AddressStxBalanceResponse,
} from '@stacks/stacks-blockchain-api-types'; } from '@stacks/stacks-blockchain-api-types';
import { BASE_PATH } from '../generated/runtime'; import { BASE_PATH } from '../generated/runtime';
@@ -23,34 +30,46 @@ function getWsUrl(url: string): URL {
return urlObj; return urlObj;
} }
export interface StacksApiSocketConnectionOptions { export type StacksApiSocketConnectionOptions = {
url?: string; url?: string;
/** Initial topics to subscribe to. */ /** Initial topics to subscribe to. */
subscriptions?: Topic[]; subscriptions?: Topic[];
socketOpts?: Partial<ManagerOptions & SocketOptions>;
};
function createStacksApiSocket(opts?: StacksApiSocketConnectionOptions) {
const socketOpts = {
...opts?.socketOpts,
query: {
...opts?.socketOpts?.query,
// Subscriptions can be specified on init using this handshake query param.
subscriptions: Array.from(new Set(opts?.subscriptions)).join(','),
},
};
const socket: StacksApiSocket = io(getWsUrl(opts?.url ?? BASE_PATH).href, socketOpts);
return socket;
} }
export class StacksApiSocketClient { export class StacksApiSocketClient {
readonly socket: StacksApiSocket; readonly socket: StacksApiSocket;
constructor(socket: StacksApiSocket) { constructor(socket: StacksApiSocket);
this.socket = socket; constructor(opts?: StacksApiSocketConnectionOptions);
constructor(args?: StacksApiSocket | StacksApiSocketConnectionOptions) {
if (args instanceof Socket) {
this.socket = args;
} else {
this.socket = createStacksApiSocket(args);
}
} }
public static connect({ public static connect(opts?: StacksApiSocketConnectionOptions) {
url = BASE_PATH, return new StacksApiSocketClient(opts);
subscriptions = [],
}: StacksApiSocketConnectionOptions = {}) {
const socket: StacksApiSocket = io(getWsUrl(url).href, {
query: {
// Subscriptions can be specified on init using this handshake query param.
subscriptions: Array.from(new Set(subscriptions)).join(','),
},
});
return new StacksApiSocketClient(socket);
} }
handleSubscription(topic: Topic, subscribe = false) { handleSubscription(topic: Topic, subscribe = false, listener?: (...args: any[]) => void) {
const subscriptions = new Set(this.socket.io.opts.query?.subscriptions.split(',') ?? []); const subsQuery = this.socket.io.opts.query?.subscriptions as string | undefined;
const subscriptions = new Set(subsQuery?.split(',') ?? []);
if (subscribe) { if (subscribe) {
this.socket.emit('subscribe', topic, error => { this.socket.emit('subscribe', topic, error => {
if (error) console.error(`Error subscribing: ${error}`); if (error) console.error(`Error subscribing: ${error}`);
@@ -67,81 +86,106 @@ export class StacksApiSocketClient {
this.socket.io.opts.query.subscriptions = Array.from(subscriptions).join(','); this.socket.io.opts.query.subscriptions = Array.from(subscriptions).join(',');
return { return {
unsubscribe: () => { unsubscribe: () => {
if (listener) {
this.socket.off(topic, listener);
}
this.handleSubscription(topic, false); this.handleSubscription(topic, false);
}, },
}; };
} }
subscribeBlocks() { subscribeBlocks(listener?: (tx: Block) => void) {
return this.handleSubscription('block', true); if (listener) this.socket.on('block', listener);
return this.handleSubscription('block', true, listener);
} }
unsubscribeBlocks() { unsubscribeBlocks() {
this.handleSubscription('block', false); this.handleSubscription('block', false);
} }
subscribeMicroblocks() { subscribeMicroblocks(listener?: (tx: Microblock) => void) {
return this.handleSubscription('microblock', true); if (listener) this.socket.on('microblock', listener);
return this.handleSubscription('microblock', true, listener);
} }
unsubscribeMicroblocks() { unsubscribeMicroblocks() {
this.handleSubscription('microblock', false); this.handleSubscription('microblock', false);
} }
subscribeMempool() { subscribeMempool(listener?: (tx: MempoolTransaction) => void) {
return this.handleSubscription('mempool', true); if (listener) this.socket.on('mempool', listener);
return this.handleSubscription('mempool', true, listener);
} }
unsubscribeMempool() { unsubscribeMempool() {
this.handleSubscription('mempool', false); this.handleSubscription('mempool', false);
} }
subscribeAddressTransactions(address: string) { subscribeAddressTransactions(
return this.handleSubscription(`address-transaction:${address}` as const, true); address: string,
listener?: (address: string, tx: AddressTransactionWithTransfers) => void
) {
if (listener) this.socket.on(`address-transaction:${address}`, listener);
return this.handleSubscription(`address-transaction:${address}`, true, listener);
} }
unsubscribeAddressTransactions(address: string) { unsubscribeAddressTransactions(address: string) {
this.handleSubscription(`address-transaction:${address}` as const, false); this.handleSubscription(`address-transaction:${address}`, false);
} }
subscribeAddressStxBalance(address: string) { subscribeAddressStxBalance(
return this.handleSubscription(`address-stx-balance:${address}` as const, true); address: string,
listener?: (address: string, stxBalance: AddressStxBalanceResponse) => void
) {
if (listener) this.socket.on(`address-stx-balance:${address}`, listener);
return this.handleSubscription(`address-stx-balance:${address}`, true, listener);
} }
unsubscribeAddressStxBalance(address: string) { unsubscribeAddressStxBalance(address: string) {
this.handleSubscription(`address-stx-balance:${address}` as const, false); this.handleSubscription(`address-stx-balance:${address}`, false);
} }
subscribeTransaction(txId: string) { subscribeTransaction(txId: string, listener?: (tx: MempoolTransaction | Transaction) => void) {
return this.handleSubscription(`transaction:${txId}` as const, true); if (listener) this.socket.on(`transaction:${txId}`, listener);
return this.handleSubscription(`transaction:${txId}`, true, listener);
} }
unsubscribeTransaction(txId: string) { unsubscribeTransaction(txId: string) {
this.handleSubscription(`transaction:${txId}` as const, false); this.handleSubscription(`transaction:${txId}`, false);
} }
subscribeNftEvent() { subscribeNftEvent(listener?: (event: NftEvent) => void) {
return this.handleSubscription('nft-event', true); if (listener) this.socket.on('nft-event', listener);
return this.handleSubscription('nft-event', true, listener);
} }
unsubscribeNftEvent() { unsubscribeNftEvent() {
this.handleSubscription('nft-event', false); this.handleSubscription('nft-event', false);
} }
subscribeNftAssetEvent(assetIdentifier: string, value: string) { subscribeNftAssetEvent(
return this.handleSubscription(`nft-asset-event:${assetIdentifier}+${value}` as const, true); assetIdentifier: string,
value: string,
listener?: (assetIdentifier: string, value: string, event: NftEvent) => void
) {
if (listener) this.socket.on(`nft-asset-event:${assetIdentifier}+${value}`, listener);
return this.handleSubscription(`nft-asset-event:${assetIdentifier}+${value}`, true, listener);
} }
unsubscribeNftAssetEvent(assetIdentifier: string, value: string) { unsubscribeNftAssetEvent(assetIdentifier: string, value: string) {
this.handleSubscription(`nft-asset-event:${assetIdentifier}+${value}` as const, false); this.handleSubscription(`nft-asset-event:${assetIdentifier}+${value}`, false);
} }
subscribeNftCollectionEvent(assetIdentifier: string) { subscribeNftCollectionEvent(
return this.handleSubscription(`nft-collection-event:${assetIdentifier}` as const, true); assetIdentifier: string,
listener?: (assetIdentifier: string, event: NftEvent) => void
) {
if (listener) this.socket.on(`nft-collection-event:${assetIdentifier}`, listener);
return this.handleSubscription(`nft-collection-event:${assetIdentifier}`, true, listener);
} }
unsubscribeNftCollectionEvent(assetIdentifier: string) { unsubscribeNftCollectionEvent(assetIdentifier: string) {
this.handleSubscription(`nft-collection-event:${assetIdentifier}` as const, false); this.handleSubscription(`nft-collection-event:${assetIdentifier}`, false);
} }
logEvents() { logEvents() {
@@ -151,18 +195,6 @@ export class StacksApiSocketClient {
this.socket.on('block', block => console.log('block', block)); this.socket.on('block', block => console.log('block', block));
this.socket.on('microblock', microblock => console.log('microblock', microblock)); this.socket.on('microblock', microblock => console.log('microblock', microblock));
this.socket.on('mempool', tx => console.log('mempool', tx)); this.socket.on('mempool', tx => console.log('mempool', tx));
this.socket.on('address-transaction', (address, data) =>
console.log('address-transaction', address, data)
);
this.socket.on('address-stx-balance', (address, data) =>
console.log('address-stx-balance', address, data)
);
this.socket.on('nft-event', event => console.log('nft-event', event)); this.socket.on('nft-event', event => console.log('nft-event', event));
this.socket.on('nft-asset-event', (assetIdentifier, value, event) =>
console.log('nft-asset-event', assetIdentifier, value, event)
);
this.socket.on('nft-collection-event', (assetIdentifier, event) =>
console.log('nft-collection-event', assetIdentifier, event)
);
} }
} }

View File

@@ -74,7 +74,7 @@ export class StacksApiWebSocketClient {
constructor(webSocket: IWebSocket) { constructor(webSocket: IWebSocket) {
this.webSocket = webSocket; this.webSocket = webSocket;
(webSocket as WebSocket).addEventListener('message', event => { (webSocket as WebSocket).addEventListener('message', event => {
const parsed = JsonRpcLite.parse(event.data); const parsed = JsonRpcLite.parse(event.data as string);
const rpcObjects = Array.isArray(parsed) ? parsed : [parsed]; const rpcObjects = Array.isArray(parsed) ? parsed : [parsed];
rpcObjects.forEach(obj => { rpcObjects.forEach(obj => {
if (obj.type === JsonRpcLite.RpcStatusType.notification) { if (obj.type === JsonRpcLite.RpcStatusType.notification) {
@@ -100,11 +100,10 @@ export class StacksApiWebSocketClient {
const method = data.method as RpcSubscriptionType; const method = data.method as RpcSubscriptionType;
switch (method) { switch (method) {
case 'tx_update': case 'tx_update':
this.eventEmitter.emit('txUpdate', data.params as (Transaction | MempoolTransaction)); this.eventEmitter.emit('txUpdate', data.params as Transaction | MempoolTransaction);
break; break;
case 'address_tx_update': case 'address_tx_update':
this.eventEmitter.emit('addressTxUpdate', this.eventEmitter.emit('addressTxUpdate', data.params as RpcAddressTxNotificationParams);
data.params as RpcAddressTxNotificationParams);
break; break;
case 'address_balance_update': case 'address_balance_update':
this.eventEmitter.emit( this.eventEmitter.emit(
@@ -134,7 +133,7 @@ export class StacksApiWebSocketClient {
} }
private rpcCall<TResult = void>(method: string, params: any): Promise<TResult> { private rpcCall<TResult = void>(method: string, params: any): Promise<TResult> {
const rpcReq = JsonRpcLite.request(++this.idCursor, method, params); const rpcReq = JsonRpcLite.request(++this.idCursor, method, params as JsonRpcLite.RpcParams);
return new Promise<TResult>((resolve, reject) => { return new Promise<TResult>((resolve, reject) => {
this.pendingRequests.set(rpcReq.id, { resolve, reject }); this.pendingRequests.set(rpcReq.id, { resolve, reject });
this.webSocket.send(rpcReq.serialize()); this.webSocket.send(rpcReq.serialize());
@@ -276,9 +275,15 @@ export class StacksApiWebSocketClient {
asset_identifier: assetIdentifier, asset_identifier: assetIdentifier,
value, value,
}; };
const subscribed = await this.rpcCall<{ asset_identifier: string, value: string }>('subscribe', params); const subscribed = await this.rpcCall<{ asset_identifier: string; value: string }>(
'subscribe',
params
);
const listener = (event: NftEvent) => { const listener = (event: NftEvent) => {
if (event.asset_identifier === subscribed.asset_identifier && event.value.hex === subscribed.value) { if (
event.asset_identifier === subscribed.asset_identifier &&
event.value.hex === subscribed.value
) {
update(event); update(event);
} }
}; };

View File

@@ -1,6 +1,6 @@
{ {
"compilerOptions": { "compilerOptions": {
"target": "es2017", "target": "es2020",
"module": "commonjs", "module": "commonjs",
"moduleResolution": "node", "moduleResolution": "node",
"declaration": true, "declaration": true,

View File

@@ -37,25 +37,10 @@ export interface ServerToClientMessages<isSender extends boolean = false> {
block: (block: Block, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void; block: (block: Block, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
microblock: (microblock: Microblock, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void; microblock: (microblock: Microblock, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
mempool: (transaction: MempoolTransaction, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void; mempool: (transaction: MempoolTransaction, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
transaction: (transaction: Transaction | MempoolTransaction, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
// @ts-ignore scheduled for support in TS v4.3 https://github.com/microsoft/TypeScript/pull/26797
[key: 'nft-event']: (event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
'nft-event': (event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void; 'nft-event': (event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
[key: TransactionTopic]: (transaction: Transaction | MempoolTransaction, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
// @ts-ignore scheduled for support in TS v4.3 https://github.com/microsoft/TypeScript/pull/26797
[key: NftAssetEventTopic]: (assetIdentifier: string, value: string, event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void; [key: NftAssetEventTopic]: (assetIdentifier: string, value: string, event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
'nft-asset-event': (assetIdentifier: string, value: string, event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
// @ts-ignore scheduled for support in TS v4.3 https://github.com/microsoft/TypeScript/pull/26797
[key: NftCollectionEventTopic]: (assetIdentifier: string, event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void; [key: NftCollectionEventTopic]: (assetIdentifier: string, event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
'nft-collection-event': (assetIdentifier: string, event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
// @ts-ignore scheduled for support in TS v4.3 https://github.com/microsoft/TypeScript/pull/26797
[key: AddressTransactionTopic]: (address: string, stxBalance: AddressTransactionWithTransfers, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void; [key: AddressTransactionTopic]: (address: string, stxBalance: AddressTransactionWithTransfers, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
'address-transaction': (address: string, tx: AddressTransactionWithTransfers, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
// @ts-ignore scheduled for support in TS v4.3 https://github.com/microsoft/TypeScript/pull/26797
[key: AddressStxBalanceTopic]: (address: string, stxBalance: AddressStxBalanceResponse, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void; [key: AddressStxBalanceTopic]: (address: string, stxBalance: AddressStxBalanceResponse, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
'address-stx-balance': (address: string, stxBalance: AddressStxBalanceResponse, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
} }

View File

@@ -2,8 +2,11 @@ import {
AddressStxBalanceTopic, AddressStxBalanceTopic,
AddressTransactionTopic, AddressTransactionTopic,
ClientToServerMessages, ClientToServerMessages,
NftAssetEventTopic,
NftCollectionEventTopic,
ServerToClientMessages, ServerToClientMessages,
Topic, Topic,
TransactionTopic,
} from 'docs/socket-io'; } from 'docs/socket-io';
import * as http from 'http'; import * as http from 'http';
import { Server as SocketIOServer } from 'socket.io'; import { Server as SocketIOServer } from 'socket.io';
@@ -63,8 +66,8 @@ export class SocketIOChannel extends WebSocketChannel {
const topics = [...[subscriptions]].flat().flatMap(r => r.split(',')); const topics = [...[subscriptions]].flat().flatMap(r => r.split(','));
for (const topic of topics) { for (const topic of topics) {
this.prometheus?.subscribe(socket, topic); this.prometheus?.subscribe(socket, topic);
await socket.join(topic);
} }
await socket.join(topics);
} }
socket.on('disconnect', reason => { socket.on('disconnect', reason => {
logger.debug(`disconnected ${socket.id}: ${reason}`, component); logger.debug(`disconnected ${socket.id}: ${reason}`, component);
@@ -170,15 +173,7 @@ export class SocketIOChannel extends WebSocketChannel {
if (!this.io) { if (!this.io) {
return; return;
} }
const sockets = []; return await this.io.to(room).fetchSockets();
const socketIds = await this.io.to(room).allSockets();
for (const id of socketIds) {
const socket = this.io.sockets.sockets.get(id);
if (socket) {
sockets.push(socket);
}
}
return sockets;
} }
send<P extends keyof WebSocketPayload>( send<P extends keyof WebSocketPayload>(
@@ -225,9 +220,10 @@ export class SocketIOChannel extends WebSocketChannel {
case 'transaction': { case 'transaction': {
const [tx] = args as ListenerType<WebSocketPayload['transaction']>; const [tx] = args as ListenerType<WebSocketPayload['transaction']>;
this.prometheus?.sendEvent('transaction'); this.prometheus?.sendEvent('transaction');
void this.getTopicSockets(`transaction:${tx.tx_id}`).then(sockets => const topic: TransactionTopic = `transaction:${tx.tx_id}`;
void this.getTopicSockets(topic).then(sockets =>
sockets?.forEach(socket => sockets?.forEach(socket =>
socket.timeout(timeout).emit('transaction', tx, _ => socket.disconnect(true)) socket.timeout(timeout).emit(topic, tx, _ => socket.disconnect(true))
) )
); );
break; break;
@@ -247,11 +243,12 @@ export class SocketIOChannel extends WebSocketChannel {
WebSocketPayload['nftAssetEvent'] WebSocketPayload['nftAssetEvent']
>; >;
this.prometheus?.sendEvent('nft-asset-event'); this.prometheus?.sendEvent('nft-asset-event');
void this.getTopicSockets(`nft-event`).then(sockets => const topic: NftAssetEventTopic = `nft-asset-event:${assetIdentifier}+${value}`;
void this.getTopicSockets(topic).then(sockets =>
sockets?.forEach(socket => sockets?.forEach(socket =>
socket socket
.timeout(timeout) .timeout(timeout)
.emit('nft-asset-event', assetIdentifier, value, event, _ => socket.disconnect(true)) .emit(topic, assetIdentifier, value, event, _ => socket.disconnect(true))
) )
); );
break; break;
@@ -261,11 +258,12 @@ export class SocketIOChannel extends WebSocketChannel {
WebSocketPayload['nftCollectionEvent'] WebSocketPayload['nftCollectionEvent']
>; >;
this.prometheus?.sendEvent('nft-collection-event'); this.prometheus?.sendEvent('nft-collection-event');
void this.getTopicSockets(`nft-event`).then(sockets => const topic: NftCollectionEventTopic = `nft-collection-event:${assetIdentifier}`;
void this.getTopicSockets(topic).then(sockets =>
sockets?.forEach(socket => sockets?.forEach(socket =>
socket socket
.timeout(timeout) .timeout(timeout)
.emit('nft-collection-event', assetIdentifier, event, _ => socket.disconnect(true)) .emit(topic, assetIdentifier, event, _ => socket.disconnect(true))
) )
); );
break; break;
@@ -276,9 +274,6 @@ export class SocketIOChannel extends WebSocketChannel {
this.prometheus?.sendEvent('address-transaction'); this.prometheus?.sendEvent('address-transaction');
void this.getTopicSockets(topic).then(sockets => void this.getTopicSockets(topic).then(sockets =>
sockets?.forEach(socket => { sockets?.forEach(socket => {
socket
.timeout(timeout)
.emit('address-transaction', principal, tx, _ => socket.disconnect(true));
socket.timeout(timeout).emit(topic, principal, tx, _ => socket.disconnect(true)); socket.timeout(timeout).emit(topic, principal, tx, _ => socket.disconnect(true));
}) })
); );
@@ -290,9 +285,6 @@ export class SocketIOChannel extends WebSocketChannel {
this.prometheus?.sendEvent('address-stx-balance'); this.prometheus?.sendEvent('address-stx-balance');
void this.getTopicSockets(topic).then(sockets => void this.getTopicSockets(topic).then(sockets =>
sockets?.forEach(socket => { sockets?.forEach(socket => {
socket
.timeout(timeout)
.emit('address-stx-balance', principal, balance, _ => socket.disconnect(true));
socket.timeout(timeout).emit(topic, principal, balance, _ => socket.disconnect(true)); socket.timeout(timeout).emit(topic, principal, balance, _ => socket.disconnect(true));
}) })
); );

View File

@@ -427,6 +427,7 @@ describe('PoX-3 - Delegate aggregation increase operations', () => {
pox1: { height: poxStatus1.current_burnchain_block_height, ...poxStatus1.next_cycle }, pox1: { height: poxStatus1.current_burnchain_block_height, ...poxStatus1.next_cycle },
pox2: { height: poxStatus2.current_burnchain_block_height, ...poxStatus2.next_cycle }, pox2: { height: poxStatus2.current_burnchain_block_height, ...poxStatus2.next_cycle },
}); });
await standByForPoxCycle();
}); });
test('Validate account balances are unlocked', async () => { test('Validate account balances are unlocked', async () => {

View File

@@ -19,6 +19,7 @@ import {
} from '../test-utils/test-builders'; } from '../test-utils/test-builders';
import { PgWriteStore } from '../datastore/pg-write-store'; import { PgWriteStore } from '../datastore/pg-write-store';
import { cycleMigrations, runMigrations } from '../datastore/migrations'; import { cycleMigrations, runMigrations } from '../datastore/migrations';
import { StacksApiSocketClient } from '../../client/src/socket-io';
describe('socket-io', () => { describe('socket-io', () => {
let apiServer: ApiServer; let apiServer: ApiServer;
@@ -34,6 +35,78 @@ describe('socket-io', () => {
}); });
}); });
test('socket-io-client > block updates', async () => {
const client = new StacksApiSocketClient({
url: `http://${apiServer.address}`,
socketOpts: { reconnection: false },
});
const updateWaiter: Waiter<Block> = waiter();
const subResult = client.subscribeBlocks(block => updateWaiter.finish(block));
const block = new TestBlockBuilder({ block_hash: '0x1234', burn_block_hash: '0x5454' })
.addTx({ tx_id: '0x4321' })
.build();
await db.update(block);
const result = await updateWaiter;
try {
expect(result.hash).toEqual('0x1234');
expect(result.burn_block_hash).toEqual('0x5454');
expect(result.txs[0]).toEqual('0x4321');
} finally {
subResult.unsubscribe();
client.socket.close();
}
});
test('socket-io-client > tx updates', async () => {
const client = new StacksApiSocketClient({
url: `http://${apiServer.address}`,
socketOpts: { reconnection: false },
});
const mempoolWaiter: Waiter<MempoolTransaction> = waiter();
const txWaiters: Waiter<MempoolTransaction | Transaction>[] = [waiter(), waiter()];
const mempoolSub = client.subscribeMempool(tx => mempoolWaiter.finish(tx));
const txSub = client.subscribeTransaction('0x01', tx => {
if (tx.tx_status === 'pending') {
txWaiters[0].finish(tx);
} else {
txWaiters[1].finish(tx);
}
});
const block = new TestBlockBuilder().addTx().build();
await db.update(block);
const mempoolTx = testMempoolTx({ tx_id: '0x01', status: DbTxStatus.Pending });
await db.updateMempoolTxs({ mempoolTxs: [mempoolTx] });
const mempoolResult = await mempoolWaiter;
const txResult = await txWaiters[0];
const microblock = new TestMicroblockStreamBuilder()
.addMicroblock()
.addTx({ tx_id: '0x01' })
.build();
await db.updateMicroblocks(microblock);
const txMicroblockResult = await txWaiters[1];
try {
expect(mempoolResult.tx_status).toEqual('pending');
expect(mempoolResult.tx_id).toEqual('0x01');
expect(txResult.tx_status).toEqual('pending');
expect(txResult.tx_id).toEqual('0x01');
expect(txMicroblockResult.tx_id).toEqual('0x01');
expect(txMicroblockResult.tx_status).toEqual('success');
} finally {
mempoolSub.unsubscribe();
txSub.unsubscribe();
client.socket.close();
}
});
test('socket-io > block updates', async () => { test('socket-io > block updates', async () => {
const address = apiServer.address; const address = apiServer.address;
const socket = io(`http://${address}`, { const socket = io(`http://${address}`, {
@@ -106,7 +179,7 @@ describe('socket-io', () => {
socket.on('mempool', tx => { socket.on('mempool', tx => {
mempoolWaiter.finish(tx); mempoolWaiter.finish(tx);
}); });
socket.on('transaction', tx => { socket.on('transaction:0x01', tx => {
if (tx.tx_status === 'pending') { if (tx.tx_status === 'pending') {
txWaiters[0].finish(tx); txWaiters[0].finish(tx);
} else { } else {
@@ -293,12 +366,12 @@ describe('socket-io', () => {
socket.on(`nft-event`, event => { socket.on(`nft-event`, event => {
nftEventWaiters[event.event_index].finish(event); nftEventWaiters[event.event_index].finish(event);
}); });
socket.on(`nft-asset-event`, (assetIdentifier, value, event) => { socket.on(`nft-asset-event:${crashPunks}+${valueHex1}`, (assetIdentifier, value, event) => {
if (assetIdentifier == crashPunks && value == valueHex1) { if (assetIdentifier == crashPunks && value == valueHex1) {
crashPunksWaiter.finish(event); crashPunksWaiter.finish(event);
} }
}); });
socket.on(`nft-collection-event`, (assetIdentifier, event) => { socket.on(`nft-collection-event:${wastelandApes}`, (assetIdentifier, event) => {
if (assetIdentifier == wastelandApes) { if (assetIdentifier == wastelandApes) {
if (event.event_index == 2) { if (event.event_index == 2) {
apeWaiters[0].finish(event); apeWaiters[0].finish(event);