feat: add influxdb to redstone api

This commit is contained in:
Wojciech Rybakiewicz
2023-12-06 11:31:54 +01:00
committed by GitHub
7 changed files with 3450 additions and 1617 deletions

View File

@@ -19,7 +19,7 @@ jobs:
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: eu-north-1
aws-region: eu-west-1
- name: Login to Amazon ECR
id: login-ecr

View File

@@ -14,6 +14,8 @@
},
"dependencies": {
"@amplitude/node": "^1.9.1",
"@redstone-finance/sdk": "^0.3.5",
"@redstone-finance/utils": "^0.3.5",
"@vendia/serverless-express": "^4.3.9",
"arweave": "^1.10.14",
"aws-sdk": "^2.931.0",
@@ -22,6 +24,7 @@
"body-parser": "^1.19.0",
"consola": "^2.15.3",
"cors": "^2.8.5",
"csv-file-to-json": "^4.0.5",
"deep-sort-object": "^1.0.2",
"errorhandler": "^1.5.1",
"express": "^4.18.1",
@@ -48,7 +51,7 @@
"ts-jest": "^28.0.3",
"ts-node": "^10.9.1",
"tsc": "^2.0.4",
"typescript": "^4.7.2",
"typescript": "^4.9.5",
"uuid-random": "^1.3.2"
}
}

View File

@@ -320,6 +320,48 @@ const providers = {
},
};
// Maps legacy provider names - including per-node aliases such as
// "redstone-avalanche-prod-node-3" - to the canonical data-service id
// used by the oracle gateways and InfluxDB queries. Demo/prod ids map to
// themselves; historical node names collapse onto their service.
export const providerToDataServiceId = {
"redstone-rapid-demo": "redstone-rapid-demo",
"redstone-main-demo": "redstone-main-demo",
"redstone-stocks-demo": "redstone-stocks-demo",
"redstone-avalanche-demo": "redstone-avalanche-demo",
"redstone-twaps-demo": "redstone-twaps-demo",
"redstone-arbitrum-prod": "redstone-arbitrum-prod",
"redstone-custom-urls-demo": "redstone-custom-urls-demo",
"redstone-arbitrum-demo": "redstone-arbitrum-demo",
"redstone-avalanche-prod": "redstone-avalanche-prod",
"redstone-primary-demo": "redstone-primary-demo",
"redstone-primary-prod": "redstone-primary-prod",
"redstone-rapid": "redstone-rapid-demo",
redstone: "redstone-main-demo",
"redstone-stocks": "redstone-stocks-demo",
"redstone-avalanche": "redstone-avalanche-demo",
"redstone-avalanche-node-1": "redstone-avalanche-demo",
"redstone-avalanche-node-2": "redstone-avalanche-demo",
"redstone-twaps-1": "redstone-twaps-demo",
"redstone-custom-urls-1": "redstone-custom-urls-demo",
"redstone-custom-urls-2": "redstone-custom-urls-demo",
"redstone-arbitrum-dev-node-1": "redstone-arbitrum-demo",
"redstone-arbitrum-dev-node-2": "redstone-arbitrum-demo",
"redstone-avalanche-prod-node-1": "redstone-avalanche-prod",
"redstone-avalanche-prod-node-2": "redstone-avalanche-prod",
"redstone-avalanche-prod-node-3": "redstone-avalanche-prod",
"redstone-avalanche-prod-node-4": "redstone-avalanche-prod",
"redstone-avalanche-prod-node-5": "redstone-avalanche-prod",
"redstone-primary-demo-node-1": "redstone-primary-demo",
"redstone-primary-demo-node-2": "redstone-primary-demo",
"redstone-primary-prod-node-1": "redstone-primary-prod",
"redstone-primary-prod-node-2": "redstone-primary-prod",
"redstone-primary-prod-node-3": "redstone-primary-prod",
"redstone-primary-prod-node-4": "redstone-primary-prod",
"redstone-primary-prod-node-5": "redstone-primary-prod",
"redstone-arbitrum-prod-node-1": "redstone-arbitrum-prod",
"redstone-arbitrum-prod-node-2": "redstone-arbitrum-prod",
"redstone-arbitrum-prod-node-3": "redstone-arbitrum-prod",
"redstone-arbitrum-prod-node-4": "redstone-arbitrum-prod",
"redstone-arbitrum-prod-node-5": "redstone-arbitrum-prod",
};
// Accessor for the module-level provider registry.
export const getProviders = () => providers;
@@ -348,3 +390,25 @@ export const findProviderDetailsByAddress = (address: string) => {
// throw new Error(`Public key not found for provider address: ${address}`);
return {};
};
/**
 * Finds the provider name whose Arweave address or EVM address matches the
 * given address (EVM comparison is case-insensitive).
 *
 * @param address - Arweave or EVM address of a provider.
 * @returns the matching provider name, or undefined when none matches.
 */
export const findProviderNameByAddress = (address: string) => {
  // Hoist the lowercase form out of the loop instead of recomputing it.
  const needle = address.toLowerCase();
  for (const providerName in providers) {
    const details = providers[providerName];
    if (
      details.address === address ||
      // Guard with ?. - an entry without an evmAddress previously crashed
      // the lookup with "Cannot read properties of undefined".
      details.evmAddress?.toLowerCase() === needle
    ) {
      return providerName;
    }
  }
  return undefined;
};
// Resolves a provider identifier (name or address) to its data-service id:
// direct name lookup first, then resolve the address to a name and retry.
export const getDataServiceId = (provider: string) => {
  return (
    providerToDataServiceId[provider] ||
    providerToDataServiceId[findProviderNameByAddress(provider)]
  );
};

View File

@@ -3,7 +3,11 @@ import { Router } from "express";
// import tokens from "redstone-node/dist/src/config/tokens.json";
const TOKENS_CONFIG_URL =
"https://raw.githubusercontent.com/redstone-finance/redstone-oracles-monorepo/main/packages/oracle-node/src/config/tokens.json";
"https://raw.githubusercontent.com/redstone-finance/redstone-app/main/src/config/tokens.json";
// Fetches the tokens configuration JSON from the remote config URL.
export const getConfig = async () => {
  const { data } = await axios.get(TOKENS_CONFIG_URL);
  return data;
};
export const configs = (router: Router) => {
/**
@@ -11,7 +15,13 @@ export const configs = (router: Router) => {
* This endpoint creation was requested by JF#1885 on Discord
*/
router.get("/configs/tokens", async (req, res) => {
const tokensConfigResponse = await axios.get(TOKENS_CONFIG_URL);
res.json(tokensConfigResponse.data);
const tokensConfigResponse = await getConfig();
res.json(tokensConfigResponse);
});
};
// Signals that this legacy endpoint has been sunset; the message points
// callers at the replacement SDK and a temporary fallback cache URL.
export function throwExpiredApiError() {
  const message =
    'This API is expired. You can switch to redstone-sdk https://www.npmjs.com/package/redstone-sdk. If this API is necessary for your application - send us an email: dev@redstone.finance and set endpoint redstone.setCacheApiUrl("https://expiring.b.redstone.finance/prices") which will be also expired in few days.';
  throw new Error(message);
}

View File

@@ -4,9 +4,10 @@ import { Package } from "../models/package";
import { Price } from "../models/price";
import { getProviderFromParams } from "../utils";
import { tryCleanCollection } from "../helpers/mongo";
import { enableLiteMode, cacheTTLMilliseconds } from "../config";
import { enableLiteMode, cacheTTLMilliseconds, } from "../config";
import { Router } from "express";
import { Document } from "mongoose";
import {throwExpiredApiError} from "./configs"
const dbItemToObj = (item: Document<unknown, any, Package> & Package) => {
return _.omit(item.toObject(), ["_id", "__v"]);
@@ -94,8 +95,8 @@ export const packages = (router: Router) => {
router.get(
"/packages/latest",
asyncHandler(async (req, res) => {
const initialMongoQuery = {};
return await findPackage(req, res, initialMongoQuery);
console.log("Getting latest packages")
throwExpiredApiError()
})
);
@@ -106,14 +107,8 @@ export const packages = (router: Router) => {
router.get(
"/packages",
asyncHandler(async (req, res) => {
if (!req.query.toTimestamp) {
throw new Error("toTimestamp query param is required");
}
const initialMongoQuery = {
timestamp: { $lte: req.query.toTimestamp },
};
return await findPackage(req, res, initialMongoQuery);
console.log("Getting packages by timestamp")
throwExpiredApiError()
})
);
};

View File

@@ -9,12 +9,20 @@ import {
maxLimitForPrices,
enableLiteMode,
} from "../config";
import { getConfig } from "./configs";
import { getDataServiceId } from "../providers";
import { Price, priceToObject } from "../models/price";
import { logEvent } from "../helpers/amplitude-event-logger";
import { assertValidSignature } from "../helpers/signature-verifier";
import { priceParamsToPriceObj, getProviderFromParams } from "../utils";
import { logger } from "../helpers/logger";
import { tryCleanCollection } from "../helpers/mongo";
import { requestDataPackages, fetchDataPackages } from "@redstone-finance/sdk";
import { providerToDataServiceId } from "../providers";
import axios from "axios";
import csvToJSON from "csv-file-to-json";
import { String } from "aws-sdk/clients/cloudsearch";
import { time } from "console";
export interface PriceWithParams
extends Omit<Price, "signature" | "evmSignature" | "liteEvmSignature"> {
@@ -259,23 +267,436 @@ interface QueryParams extends PriceWithParams {
providerPublicKey?: string;
}
/**
 * Converts a signed data package (redstone SDK shape, points expose toObj())
 * into the legacy REST response rows expected by redstone-api clients.
 *
 * @param dataPackage - SDK data-package wrapper (has .dataPackage inside).
 * @param provider - provider details supplying address/publicKey fields.
 * @returns one response row per data point.
 */
const mapFromSdkToResponse = (dataPackage: any, provider: any) => {
  const timestamp = dataPackage.dataPackage.timestampMilliseconds;
  return dataPackage.dataPackage.dataPoints.map((point: any) => {
    // Call toObj() once per point - the original called it twice,
    // deserializing the same data point a second time for .value.
    const pointObj = point.toObj();
    const sourcesFormatted: Record<string, number> = {};
    for (const [name, value] of Object.entries(
      pointObj.metadata.sourceMetadata
    )) {
      sourcesFormatted[name] = Number((value as any).value);
    }
    return {
      symbol: point.dataFeedId,
      provider: provider.address,
      value: pointObj.value,
      source: sourcesFormatted,
      timestamp: timestamp,
      providerPublicKey: provider.publicKey,
      permawebTx: "mock-permaweb-tx",
      version: "0.3",
    };
  });
};
// Maps a plain gateway data package (already a POJO - no toObj()) into the
// legacy REST response rows expected by redstone-api clients.
const mapFromGatewayToResponse = (dataPackage: any, provider: any) => {
  const { timestampMilliseconds } = dataPackage;
  return dataPackage.dataPoints.map((point: any) => {
    // Per-source numeric values, keyed by source name.
    const source = Object.fromEntries(
      Object.entries(point.metadata.sourceMetadata).map(([name, meta]) => [
        name,
        Number((meta as any).value),
      ])
    );
    return {
      symbol: point.dataFeedId,
      provider: provider.address,
      value: point.value,
      source,
      timestamp: timestampMilliseconds,
      providerPublicKey: provider.publicKey,
      permawebTx: "mock-permaweb-tx",
      version: "0.3",
    };
  });
};
// Indexes a list of price rows by symbol; on duplicate symbols the later
// row wins, matching the original loop's overwrite semantics.
const toMap = (priceList: any) =>
  priceList.reduce((acc: any, price: any) => {
    acc[price.symbol] = price;
    return acc;
  }, {});
// Whitelist guard for values interpolated into Flux queries - rejects any
// character outside the allowed set to prevent query injection.
// (Function name kept as-is, with its typo, for existing call sites.)
function validatePareter(parameter: string) {
  const allowedCharacters = /^[A-Z a-z.0-9=/_$-]+$/;
  if (!allowedCharacters.test(parameter)) {
    throw new Error(`Invalid parameter: ${parameter}`);
  }
  return parameter;
}
/**
 * Executes a Flux query against InfluxDB and parses the annotated-CSV
 * response into JSON rows.
 *
 * Reads INFLUXDB_URL and INFLUXDB_TOKEN from the environment.
 * On any transport/parse failure the details are logged server-side and a
 * generic "Request failed" error is thrown (no internals leak to callers).
 *
 * Fixed: parameter was typed with the boxed `String` alias accidentally
 * imported from aws-sdk/clients/cloudsearch; use the primitive `string`.
 */
async function requestInflux(query: string) {
  const config = {
    headers: {
      Authorization: `Token ${process.env.INFLUXDB_TOKEN}`,
      "Content-Type": "application/vnd.flux",
    },
  };
  try {
    const result = await axios.post(
      `${process.env.INFLUXDB_URL}/api/v2/query?org=redstone`,
      query,
      config
    );
    return csvToJSON({ data: result.data });
  } catch (error) {
    console.error(error);
    throw new Error("Request failed");
  }
}
export const prices = (router: Router) => {
/**
* This endpoint is used for fetching prices data.
* It is used in redstone-api
*/
// Rolls a 0-99 die and returns true with the probability given by the env
// value (a percentage string); a missing/empty value disables the feature.
function shouldRunTestFeature(percentOfTestFeatureEnv) {
  if (!percentOfTestFeatureEnv) {
    return false;
  }
  return Math.floor(Math.random() * 100) < Number(percentOfTestFeatureEnv);
}
/**
 * Serves a /prices request directly from the oracle gateways instead of the
 * local DB. Branches on the query shape:
 *  - ?symbol=X     -> latest signed package for one feed (SDK fetch)
 *  - ?symbol=      -> empty list (legacy behaviour for an empty symbol)
 *  - ?symbols=A,B  -> latest packages for several feeds (gateway fetch)
 *  - neither       -> the "___ALL_FEEDS___" package
 * Errors are logged with the offending params and rethrown for the
 * surrounding asyncHandler.
 */
async function handleByOracleGateway(req, res, dataServiceId, params) {
  try {
    const provider = await getProviderFromParams(
      req.query as { provider: string }
    );
    const symbol = req.query.symbol as string;
    const symbols = req.query.symbols as string;
    if (symbol !== undefined && symbol !== "") {
      const dataPackageResponse = await requestDataPackages({
        dataServiceId: dataServiceId,
        uniqueSignersCount: 1,
        dataFeeds: [symbol],
      });
      const dataPackage = dataPackageResponse[symbol][0];
      return res.json(mapFromSdkToResponse(dataPackage, provider));
    } else if (symbol === "") {
      // An explicitly empty symbol historically returned an empty list.
      return res.json([]);
    } else if (symbols !== undefined) {
      const tokens = symbols.split(",");
      const dataPackages = await fetchDataPackages({
        dataServiceId: dataServiceId,
      });
      return res.json(
        toMap(
          tokens
            .filter((token) => dataPackages[token] !== undefined)
            .map((token) => dataPackages[token][0])
            .flatMap((dataPackage) =>
              mapFromGatewayToResponse(dataPackage, provider)
            )
        )
      );
    } else {
      const dataPackageResponse = await requestDataPackages({
        dataServiceId: dataServiceId,
        uniqueSignersCount: 1,
      });
      const dataPackage = dataPackageResponse["___ALL_FEEDS___"][0];
      return res.json(toMap(mapFromSdkToResponse(dataPackage, provider)));
    }
  } catch (e) {
    console.error(e);
    // Typo fixed in log message: "feautre" -> "feature".
    console.log(`Failed running test feature: ${JSON.stringify(params)}`);
    throw e;
  }
}
// Serves a single-symbol /prices request with ?interval aggregation from
// InfluxDB: averages values per interval window between fromTimestamp and
// toTimestamp and returns rows in the legacy Mongo-backed response shape.
// Requires both fromTimestamp and toTimestamp query params (ms).
async function handleByInfluxWithSymbolAndInterval(
res,
params,
dataServiceId,
providerDetails
) {
console.log("Executing single token with interval");
if (
params.fromTimestamp === undefined ||
params.toTimestamp === undefined
) {
throw new Error(
`Param fromTimestamp and toTimestamp are required when using interval`
);
}
// Influx range bounds are in seconds; start is widened by one interval so
// the first aggregate window has data to average.
const start = Math.ceil((params.fromTimestamp - params.interval) / 1000);
const stop = Math.floor(params.toTimestamp / 1000);
const limit = params.limit !== undefined ? params.limit : 100000;
const offset = params.offset !== undefined ? params.offset : 0;
// Flux query: mean per interval window for one feed of one data service;
// timestamp is re-derived in ms from _time for the response rows.
const request = `
from(bucket: "redstone")
|> range(start: ${start}, stop: ${stop})
|> filter(fn: (r) => r._measurement == "dataPackages")
|> filter(fn: (r) => r.dataFeedId == "${validatePareter(
params.symbol
)}")
|> filter(fn: (r) => r.dataServiceId == "${validatePareter(
dataServiceId
)}")
|> keep(columns: ["_time", "_value", "_field", "dataFeedId", "dataServiceId"])
|> aggregateWindow(every: ${
params.interval
}ms, fn: mean, createEmpty: false)
|> map(fn: (r) => ({ r with timestamp: int(v: r._time) / 1000000 }))
|> limit(n: ${limit}, offset: ${offset})
`;
const results = await requestInflux(request);
// Results are in long format (one row per field): split per-source rows
// (fields like "value-<sourceName>") from the aggregated "value" rows.
const sourceResults = results.filter(
(element) =>
element._field !== "value" && element._field !== "metadataValue"
);
const valueResults = results.filter(
(element) =>
element._field === "value" && element._field !== "metadataValue"
);
const mappedResults = valueResults.map((element) => {
// Re-attach the per-source rows sharing this row's timestamp.
const sourceResultsForTimestamp = sourceResults.filter(
(result) => result.timestamp === element.timestamp
);
const source = {};
for (let i = 0; i < sourceResultsForTimestamp.length; i++) {
// Field names are "value-<sourceName>"; strip the prefix.
const sourceName = sourceResultsForTimestamp[i]._field.replace(
"value-",
""
);
source[sourceName] = Number(sourceResultsForTimestamp[i]._value);
}
return {
symbol: element.dataFeedId,
provider: providerDetails.address,
value: Number(element._value),
source: source,
timestamp: Number(element.timestamp),
providerPublicKey: providerDetails.publicKey,
permawebTx: "mock-permaweb-tx",
version: "0.3",
};
});
console.log("Executed single token with interval");
return res.json(mappedResults);
}
// Serves a single-symbol /prices request without ?interval from InfluxDB:
// the newest `limit` rows before toTimestamp (or now), offset-paginated,
// mapped into the legacy response shape.
async function handleByInfluxWithSymbolAndNoInterval(
res,
params,
providerDetails,
dataServiceId
) {
console.log("Executing single token with toTimestamp");
const limit = params.limit !== undefined ? Number(params.limit) : 1;
const offset = params.offset !== undefined ? Number(params.offset) : 0;
// NOTE(review): the error text says "When not passing fromTimestamp" but
// the condition fires when fromTimestamp IS passed - one of the two looks
// inverted; confirm intended behaviour before relying on it.
if (params.fromTimestamp !== undefined && limit + offset > 1000) {
throw new Error(
`When not passing fromTimestamp limit + offset can't be more than 1000, is: ${limit} + ${offset}`
);
}
// Range bounds in seconds; stop defaults to "now".
const stop = params.toTimestamp
? Math.floor(params.toTimestamp / 1000)
: Math.ceil(Date.now() / 1000);
// Without fromTimestamp, search back roughly one minute per requested row
// (minimum 3 minutes) so enough rows fall inside the range.
const searchWindow = Math.max(limit + offset, 3);
const start =
params.fromTimestamp !== undefined
? Math.ceil(params.fromTimestamp / 1000)
: stop - searchWindow * 60;
console.log(
`limit: ${limit}, offset: ${offset} Start: ${start}, stop: ${stop}`
);
// Flux query: newest-first rows for one feed of one data service;
// timestamp is re-derived in ms from _time.
const request = `
from(bucket: "redstone")
|> range(start: ${start}, stop: ${stop})
|> filter(fn: (r) => r._measurement == "dataPackages")
|> filter(fn: (r) => r.dataFeedId == "${validatePareter(
params.symbol
)}")
|> filter(fn: (r) => r.dataServiceId == "${validatePareter(
dataServiceId
)}")
|> keep(columns: ["_time", "_value", "_field", "dataFeedId", "dataServiceId"])
|> sort(columns: ["_time"], desc: true)
|> limit(n: ${limit}, offset: ${offset})
|> map(fn: (r) => ({ r with timestamp: int(v: r._time) / 1000000 }))
`;
const results = await requestInflux(request);
// Long-format rows: per-source fields ("value-<sourceName>") vs the main
// "value" field.
const sourceResults = results.filter(
(element) =>
element._field !== "value" && element._field !== "metadataValue"
);
const mappedResults = results
.filter(
(element) =>
element._field === "value" && element._field !== "metadataValue"
)
.map((element) => {
// Re-attach per-source rows sharing this row's timestamp.
const sourceResultsForTimestamp = sourceResults.filter(
(result) => result.timestamp === element.timestamp
);
const source = {};
for (let i = 0; i < sourceResultsForTimestamp.length; i++) {
const sourceName = sourceResultsForTimestamp[i]._field.replace(
"value-",
""
);
source[sourceName] = Number(sourceResultsForTimestamp[i]._value);
}
return {
symbol: element.dataFeedId,
provider: providerDetails.address,
value: Number(element._value),
source: source,
timestamp: Number(element.timestamp),
providerPublicKey: providerDetails.publicKey,
permawebTx: "mock-permaweb-tx",
version: "0.3",
};
});
console.log("Executed single token with toTimestamp");
return res.json(mappedResults);
}
// Serves a multi-symbol /prices request from InfluxDB: for each requested
// feed, returns only its newest row inside a 2-minute window ending at
// toTimestamp (or now), keyed by symbol in the response object.
async function handleByInfluxWithManyTokens(
res,
params,
dataServiceId,
providerDetails
) {
let tokens = [];
if (params.symbols !== undefined) {
tokens = params.symbols.split(",");
}
console.log("Executing for many tokens");
// Range bounds in seconds; a fixed 2-minute lookback window.
const stop =
params.toTimestamp !== undefined
? Math.floor(params.toTimestamp / 1000)
: Math.ceil(Date.now() / 1000);
const start = stop - 2 * 60;
// Each token is validated before being interpolated into the Flux query.
tokens.forEach((token) => validatePareter(token));
console.log(
`Start: ${start} stop ${stop}, tokens: ${JSON.stringify(tokens)}`
);
const request = `
from(bucket: "redstone")
|> range(start: ${start}, stop: ${stop})
|> filter(fn: (r) => r._measurement == "dataPackages")
|> filter(fn: (r) => r.dataServiceId == "${validatePareter(
dataServiceId
)}")
|> filter(fn: (r) => contains(value: r.dataFeedId, set: ${JSON.stringify(
tokens
)}))
|> keep(columns: ["_time", "_value", "_field", "dataFeedId", "dataServiceId"])
|> map(fn: (r) => ({ r with timestamp: int(v: r._time) / 1000000 }))
|> sort(columns: ["_time"], desc: true)
`;
const results = await requestInflux(request);
// Long-format rows: per-source fields vs the main "value" field.
const sourceResults = results.filter(
(element) =>
element._field !== "value" && element._field !== "metadataValue"
);
const response = {};
results
.filter((element) => element._field === "value")
.forEach((element) => {
// Collect all timestamps seen for this feed to find its newest row.
const timestampsForDataFeedId = [
...new Set(
results
.filter((result) => result.dataFeedId == element.dataFeedId)
.map((result) => result.timestamp)
),
];
timestampsForDataFeedId.sort();
// Only the row with the feed's latest timestamp makes it into the
// response; earlier rows in the window are skipped.
if (
Number(
timestampsForDataFeedId[timestampsForDataFeedId.length - 1]
) === Number(element.timestamp)
) {
console.log("Filling timestamp");
const sourceResultsForTimestamp = sourceResults.filter(
(result) =>
result.timestamp === element.timestamp &&
result.dataFeedId === element.dataFeedId
);
const source = {};
for (let i = 0; i < sourceResultsForTimestamp.length; i++) {
// Field names are "value-<sourceName>"; strip the prefix.
const sourceName = sourceResultsForTimestamp[i]._field.replace(
"value-",
""
);
source[sourceName] = Number(sourceResultsForTimestamp[i]._value);
}
response[element.dataFeedId] = {
symbol: element.dataFeedId,
provider: providerDetails.address,
value: Number(element._value),
source: source,
timestamp: Number(element.timestamp),
providerPublicKey: providerDetails.publicKey,
permawebTx: "mock-permaweb-tx",
version: "0.3",
};
}
});
console.log("Executed for many tokens");
return res.json(response);
}
// Formats a ms timestamp as a "date time" pair in the pl-PL locale
// (used only for human-readable debug logging).
function getDateTimeString(timestamp) {
  const asDate = new Date(timestamp);
  const datePart = asDate.toLocaleDateString("pl-PL");
  const timePart = asDate.toLocaleTimeString("pl-PL");
  return `${datePart} ${timePart}`;
}
/**
 * Decides whether the request targets data older than 30 days (such
 * requests are eligible for the Influx-backed test path).
 *
 * With fromTimestamp: old iff fromTimestamp is more than 30 days ago.
 * Without it: the reach-back is estimated from toTimestamp (default: now)
 * minus one minute per requested row (limit + offset).
 */
function isOldDataRequest(params) {
  const days30Ago = Date.now() - 30 * 24 * 60 * 60 * 1000;
  // Fixed: the DEBUG line previously printed fromTimestamp twice; also
  // removed an unused `now` local.
  console.log(`DEBUG ${params.fromTimestamp} ${days30Ago}`);
  if (params.fromTimestamp && days30Ago > Number(params.fromTimestamp)) {
    console.log(
      `isOldDataRequest with fromTimestamp: ${getDateTimeString(
        days30Ago
      )} > ${getDateTimeString(Number(params.fromTimestamp))}`
    );
    return true;
  } else {
    const toTimestamp = params.toTimestamp
      ? Number(params.toTimestamp)
      : Date.now();
    const limit = params.limit !== undefined ? Number(params.limit) : 1;
    const offset = params.offset !== undefined ? Number(params.offset) : 0;
    // Assume roughly one data point per minute when estimating how far
    // back the query reaches.
    const goBackInTime = (limit + offset) * 60 * 1000;
    const fromTimestamp = toTimestamp - goBackInTime;
    const result = days30Ago > fromTimestamp;
    console.log(
      `isOldDataRequest no from result: ${result} toTimestamp: ${getDateTimeString(
        toTimestamp
      )} limit: ${params.limit} offset: ${
        params.offset
      } goBackInTime: ${goBackInTime} fromTimestamp: ${getDateTimeString(
        fromTimestamp
      )}`
    );
    return result;
  }
}
router.get(
"/prices",
asyncHandler(async (req, res) => {
// Request validation
console.log(`Query: ${JSON.stringify(req.query)}`);
const params = req.query as unknown as QueryParams;
// Saving API read event in amplitude
logEvent({
eventName: "api-get-request",
eventProps: params,
ip: getIp(req),
});
const dataServiceId = getDataServiceId(req.query.provider as string);
getIp(req);
if (!params.fromTimestamp && !params.toTimestamp && !params.limit) {
return handleByOracleGateway(req, res, dataServiceId, params);
}
// Getting provider details
const providerDetails = await getProviderFromParams(params);
@@ -284,29 +705,90 @@ export const prices = (router: Router) => {
// If query params contain "symbol" we fetch price for this symbol
if (params.symbol !== undefined) {
let body: _.Omit<
Document<unknown, any, Price> & Price & { providerPublicKey: any },
"_id" | "__v"
>[];
if (params.interval !== undefined) {
body = await getPricesInTimeRangeForSingleToken(params);
if (
shouldRunTestFeature(process.env.TEST_SYMBOL_INTERVAL_PERCENT) &&
isOldDataRequest(params)
) {
console.log(
`Running TEST_SYMBOL_INTERVAL_PERCENT: ${JSON.stringify(
req.query
)}`
);
return handleByInfluxWithSymbolAndInterval(
res,
params,
dataServiceId,
providerDetails
);
}
return res.json(await getPricesInTimeRangeForSingleToken(params));
} else if (params.toTimestamp !== undefined) {
body = await getHistoricalPricesForSingleToken(params);
if (
shouldRunTestFeature(
process.env.TEST_SYMBOL_NO_INTERVAL_TO_TIMESTAMP_PERCENT
) &&
isOldDataRequest(params)
) {
console.log(
`Running TEST_SYMBOL_NO_INTERVAL_TO_TIMESTAMP_PERCENT: ${JSON.stringify(
req.query
)}`
);
return handleByInfluxWithSymbolAndNoInterval(
res,
params,
providerDetails,
dataServiceId
);
} else {
return res.json(await getHistoricalPricesForSingleToken(params));
}
} else {
body = await getLatestPricesForSingleToken(params);
if (
shouldRunTestFeature(
process.env.TEST_SYMBOL_NO_INTERVAL_NO_TO_TIMESTAMP_PERCENT
) &&
isOldDataRequest(params)
) {
console.log(
`Running TEST_SYMBOL_NO_INTERVAL_NO_TO_TIMESTAMP_PERCENT: ${JSON.stringify(
req.query
)}`
);
return handleByInfluxWithSymbolAndNoInterval(
res,
params,
providerDetails,
dataServiceId
);
} else {
return res.json(await getLatestPricesForSingleToken(params));
}
}
return res.json(body);
}
// Otherwise we fetch prices for many symbols
else {
let tokens = [];
if (params.symbols !== undefined) {
tokens = params.symbols.split(",");
}
params.tokens = tokens;
} else {
if (
shouldRunTestFeature(process.env.TEST_MANY_SYMBOLS_PERCENT) &&
isOldDataRequest(params)
) {
console.log(
`Running TEST_MANY_SYMBOLS_PERCENT: ${JSON.stringify(req.query)}`
);
return handleByInfluxWithManyTokens(
res,
params,
dataServiceId,
providerDetails
);
} else {
let tokens = [];
if (params.symbols !== undefined) {
tokens = params.symbols.split(",");
}
params.tokens = tokens;
const body = await getPriceForManyTokens(params);
return res.json(body);
return res.json(await getPriceForManyTokens(params));
}
}
})
);

4429
yarn.lock

File diff suppressed because it is too large Load Diff