[price-pusher] add injective price listener (#627)

* add injective price listener

* minor change

* description fix

* add fixme comments
This commit is contained in:
Dev Kalra
2023-02-25 01:51:07 +05:30
committed by GitHub
parent eecebe86cc
commit fd0257d7d9
4 changed files with 3629 additions and 293 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -43,6 +43,7 @@
"typescript": "^4.6.3"
},
"dependencies": {
"@injectivelabs/sdk-ts": "^1.0.457",
"@pythnetwork/pyth-common-js": "^1.2.0",
"@pythnetwork/pyth-evm-js": "^1.1.0",
"@pythnetwork/pyth-sdk-solidity": "^2.2.0",

View File

@@ -1,5 +1,7 @@
#!/usr/bin/env node
// FIXME: refactor this file and command structure
// FIXME: update readme and compose files
// FIXME: release a new version
import yargs from "yargs";
import { hideBin } from "yargs/helpers";
import {
@@ -11,13 +13,21 @@ import { EvmPriceListener, EvmPricePusher, PythContractFactory } from "./evm";
import { PythPriceListener } from "./pyth-price-listener";
import fs from "fs";
import { readPriceConfigFile } from "./price-config";
import { PriceServiceConnection } from "@pythnetwork/pyth-common-js";
import { InjectivePriceListener, InjectivePricePusher } from "./injective";
const argv = yargs(hideBin(process.argv))
.option("evm-endpoint", {
.option("network", {
description: "the blockchain network to push to",
type: "string",
choices: ["evm", "injective"],
required: true,
})
.option("endpoint", {
description:
"RPC endpoint URL for the EVM network. If you provide a normal HTTP endpoint, the pusher " +
"RPC endpoint URL for the network. If you provide a normal HTTP endpoint, the pusher " +
"will periodically poll for updates. The polling interval is configurable via the " +
"`evm-polling-frequency` command-line argument. If you provide a websocket RPC " +
"`polling-frequency` command-line argument. for the evm chains, if you provide a websocket RPC " +
"endpoint (`ws[s]://...`), the price pusher will use event subscriptions to read " +
"the current EVM price in addition to polling. ",
type: "string",
@@ -55,9 +65,9 @@ const argv = yargs(hideBin(process.argv))
required: false,
default: 10,
})
.option("evm-polling-frequency", {
.option("polling-frequency", {
description:
"The frequency to poll price info data from the EVM network if the RPC is not a websocket.",
"The frequency to poll price info data from the network if the RPC is not a websocket.",
type: "number",
required: false,
default: 5,
@@ -79,13 +89,45 @@ if (CONTRACT_ADDR[argv.pythContract] !== undefined) {
const priceConfigs = readPriceConfigFile(argv.priceConfigFile);
async function run() {
async function injectiveRun() {
const connection = new PriceServiceConnection(argv.priceEndpoint, {
logger: console,
});
const pythPriceListener = new PythPriceListener(connection, priceConfigs);
const injectivePriceListener = new InjectivePriceListener(
pythContractAddr,
argv.endpoint,
priceConfigs,
{ pollingFrequency: argv.pollingFrequency }
);
const handler = new Controller(
priceConfigs,
pythPriceListener,
injectivePriceListener,
new InjectivePricePusher(),
{
cooldownDuration: argv.cooldownDuration,
}
);
await injectivePriceListener.start();
await pythPriceListener.start();
// Handler starts after the above listeners are started
// which means that they have fetched their initial price information.
await handler.start();
}
async function evmRun() {
const connection = new EvmPriceServiceConnection(argv.priceEndpoint, {
logger: console,
});
const pythContractFactory = new PythContractFactory(
argv.evmEndpoint,
argv.endpoint,
fs.readFileSync(argv.mnemonicFile, "utf-8").trim(),
pythContractAddr
);
@@ -94,7 +136,7 @@ async function run() {
pythContractFactory,
priceConfigs,
{
pollingFrequency: argv.evmPollingFrequency,
pollingFrequency: argv.pollingFrequency,
}
);
@@ -123,4 +165,9 @@ async function run() {
await handler.start();
}
function run() {
if (argv.network === "injective") injectiveRun();
else if (argv.network === "evm") evmRun();
}
run();

View File

@@ -0,0 +1,112 @@
import { HexString } from "@pythnetwork/pyth-common-js";
import { ChainPricePusher, PriceInfo, PriceListener } from "./interface";
import { DurationInSeconds } from "./utils";
import { PriceConfig } from "./price-config";
import { ChainGrpcWasmApi } from "@injectivelabs/sdk-ts";
type PriceQueryResponse = {
price_feed: {
id: string;
price: {
price: string;
conf: string;
expo: number;
publish_time: number;
};
};
};
// this use price without leading 0x
// FIXME: implement common methods in the parent class
export class InjectivePriceListener implements PriceListener {
private latestPriceInfo: Map<HexString, PriceInfo>;
private priceIds: HexString[];
private pollingFrequency: DurationInSeconds;
constructor(
private contractAddress: string,
private grpcEndpoint: string,
priceConfigs: PriceConfig[],
config: {
pollingFrequency: DurationInSeconds;
}
) {
this.latestPriceInfo = new Map();
this.priceIds = priceConfigs.map((priceConfig) => priceConfig.id);
this.pollingFrequency = config.pollingFrequency;
}
async start() {
console.log(`Polling the prices every ${this.pollingFrequency} seconds...`);
setInterval(this.pollPrices.bind(this), this.pollingFrequency * 1000);
await this.pollPrices();
}
private async pollPrices() {
console.log("Polling injective prices...");
for (const priceId of this.priceIds) {
const currentPriceInfo = await this.getOnChainPriceInfo(priceId);
if (currentPriceInfo !== undefined) {
this.updateLatestPriceInfo(priceId, currentPriceInfo);
}
}
}
async getOnChainPriceInfo(
priceId: HexString
): Promise<PriceInfo | undefined> {
let priceQueryResponse: PriceQueryResponse;
try {
const api = new ChainGrpcWasmApi(this.grpcEndpoint);
const { data } = await api.fetchSmartContractState(
this.contractAddress,
Buffer.from(`{"price_feed":{"id":"${priceId}"}}`).toString("base64")
);
const json = Buffer.from(data as string, "base64").toString();
priceQueryResponse = JSON.parse(json);
} catch (e) {
console.error(`Getting on-chain price for ${priceId} failed. Error:`);
console.error(e);
return undefined;
}
return {
conf: priceQueryResponse.price_feed.price.conf,
price: priceQueryResponse.price_feed.price.price,
publishTime: priceQueryResponse.price_feed.price.publish_time,
};
}
private updateLatestPriceInfo(priceId: HexString, observedPrice: PriceInfo) {
const cachedLatestPriceInfo = this.getLatestPriceInfo(priceId);
// Ignore the observed price if the cache already has newer
// price. This could happen because we are using polling and
// subscription at the same time.
if (
cachedLatestPriceInfo !== undefined &&
cachedLatestPriceInfo.publishTime > observedPrice.publishTime
) {
return;
}
this.latestPriceInfo.set(priceId, observedPrice);
}
getLatestPriceInfo(priceId: string): PriceInfo | undefined {
return this.latestPriceInfo.get(priceId);
}
}
export class InjectivePricePusher implements ChainPricePusher {
async updatePriceFeed(
priceIds: string[],
pubTimesToPush: number[]
): Promise<void> {
console.log("dummy pushed");
}
}