From 24e52da6156c23f66eb3a3cb6a752f5418dc81c9 Mon Sep 17 00:00:00 2001 From: g1nt0ki <99907941+g1nt0ki@users.noreply.github.com> Date: Thu, 11 Jul 2024 19:15:42 +0300 Subject: [PATCH] pipe json files --- api2/cache.ts | 3 -- api2/cron-task/index.ts | 43 +++++++++++-------- api2/cron-task/storeCharts.ts | 77 ++++++++++++++++++++--------------- api2/file-cache.ts | 4 ++ api2/routes/index.ts | 36 ++++++++++------ src/utils/discord.ts | 1 - 6 files changed, 98 insertions(+), 66 deletions(-) diff --git a/api2/cache.ts b/api2/cache.ts index 2a43bbd..ded7359 100644 --- a/api2/cache.ts +++ b/api2/cache.ts @@ -27,9 +27,6 @@ export const cache: { historicalPrices?: DailyPeggedPrices[] } = {} -const MINUTES = 60 * 1000 -const HOUR = 60 * MINUTES - const cacheFile = 'stablecoin-cache' export async function initCache(cacheType = CacheType.API_SERVER) { diff --git a/api2/cron-task/index.ts b/api2/cron-task/index.ts index 2b1aaa2..fcd481c 100644 --- a/api2/cron-task/index.ts +++ b/api2/cron-task/index.ts @@ -24,12 +24,9 @@ async function run() { storeConfig(), storePeggedPrices(), ]) - // this also pulls data from ddb and sets to cache await storeCharts() const allStablecoinsData = await storeStablecoins({ peggedPrices: cache.peggedPrices }) - await storePrices() - await storeStablecoinChains() const allChainsSet: Set = new Set() const assetChainMap: { [asset: string]: Set @@ -43,18 +40,27 @@ async function run() { const chainChartMap: any = {} const recentProtocolData: any = {} - await storePeggedAssets() - await storeStablecoinDominance() - await storeChainChartData() + + const timeWrapper = { + storePrices, + storeStablecoinChains, + storePeggedAssets, + storeStablecoinDominance, + storeChainChartData, + alertOutdated, + } + + for (const key in timeWrapper) { + console.time(key) + await timeWrapper[key]() + console.timeEnd(key) + } await storeRouteData('stablecoins', allStablecoinsData) await storeRouteData('stablecoincharts2/all-dominance-chain-breakdown', { dominanceMap, chainChartMap, }) await storeRouteData('stablecoincharts2/recent-protocol-data', recentProtocolData) await saveCache() - await alertOutdated() - - async function storePeggedAssets() { for (const peggedAssetData of allStablecoinsData.peggedAssets) { const { id } = peggedAssetData @@ -111,6 +117,7 @@ async function run() { const data = await getChainData(chain) chainChartMap[getChainDisplayName(chain, true)] = data.aggregated await storeRouteData('stablecoincharts2/' + chain, data) + await storeRouteData('stablecoincharts/' + chain, data.aggregated) } catch (e) { console.error('Error fetching chain data', e) } @@ -119,19 +126,16 @@ async function run() { async function getChainData(chain: string) { let startTimestamp = chain === frontendKey ? allChartsStartTimestamp : undefined chain = chain === frontendKey ? 'all' : chain - const aggregated = removeEmptyItems(await craftChartsResponse({ chain, startTimestamp, })) + const aggregated = removeEmptyItems(await craftChartsResponse({ chain, startTimestamp, assetChainMap})) const breakdown: any = {} for (const [peggedAsset, chainMap] of Object.entries(assetChainMap)) { if (chain !== 'all' && !(chainMap as any).has(chain)) continue - const allPeggedAssetsData = await craftChartsResponse({ chain, peggedID: peggedAsset, startTimestamp }) + const allPeggedAssetsData = await craftChartsResponse({ chain, peggedID: peggedAsset, startTimestamp, assetChainMap }) if (chain === 'all') recentProtocolData[peggedAsset] = allPeggedAssetsData.slice(-32) - if (peggedAsset === '3') // special case for terraUSD which is hardcoded as 0 after the depeg - breakdown[peggedAsset] = allPeggedAssetsData - else - breakdown[peggedAsset] = removeEmptyItems(allPeggedAssetsData) + breakdown[peggedAsset] = removeEmptyItems(allPeggedAssetsData) } @@ -185,9 +189,14 @@ async function alertOutdated() { } - +// filter out empty items from array but retain the last item as is function removeEmptyItems(array: any[] = []) { - return array.map(removeEmpty).filter((item: any) => item) + if (array.length < 2) return array + const last = array[array.length - 1] + let items = array.slice(0, array.length - 1) + items = items.map(removeEmpty).filter((item: any) => item) + items.push(last) + return items } function removeEmpty(item: any) { diff --git a/api2/cron-task/storeCharts.ts b/api2/cron-task/storeCharts.ts index 6b13c05..11c2ada 100644 --- a/api2/cron-task/storeCharts.ts +++ b/api2/cron-task/storeCharts.ts @@ -43,9 +43,9 @@ export default async function handler() { cache.rateTimestamps = rateTimestamps console.timeEnd(timeKey) + /* console.time('storeCharts') - - /* const commonOptions = { + const commonOptions = { lastPrices, historicalPrices, historicalRates, @@ -89,14 +89,26 @@ async function getPeggedAssetsData() { const lastBalance = await getLastRecord(hourlyPeggedBalances(pegged.id)); const allBalances = cache.peggedAssetsData[pegged.id]?.balances || [] const highestSK = allBalances.reduce((highest, item) => Math.max(highest, item.SK), -1) - const newItems = await getHistoricalValues(dailyPeggedBalances(pegged.id), highestSK) - allBalances.push(...newItems) + const pullData = !lastBalance || (lastBalance.SK - highestSK > secondsInDay) // if last item on the table is more than 1 day older than the last balance, pull data + if (pullData) { + console.info('fetching new data for', pegged.id) + const newItems = await getHistoricalValues(dailyPeggedBalances(pegged.id), highestSK) + allBalances.push(...newItems) + } cache.peggedAssetsData[pegged.id] = { balances: allBalances, lastBalance } })) + const { SK, ...terraLastBalance } = cache.peggedAssetsData['3'].lastBalance + cache.peggedAssetsData['3'].balances.forEach((item: any, index: any) => { // the token deppeged after this + if (item.SK > 1655891865) { + cache.peggedAssetsData['3'].balances[index] = { ...item, ...terraLastBalance } + return + } + }) + return cache.peggedAssetsData } @@ -155,11 +167,17 @@ function extractResultOfBinarySearch(ar: any[], binarySearchResult: number) { return ar[binarySearchResult]; } +const _assetCache: any = {}; +let lastDailyTimestamp = 0; + export function craftChartsResponse( - { chain = 'all', peggedID, startTimestamp }: { + { chain = 'all', peggedID, startTimestamp, assetChainMap, }: { chain?: string, peggedID?: string, startTimestamp?: string | number, + assetChainMap: { + [asset: string]: Set + } } ) { if (startTimestamp && typeof startTimestamp === 'string') startTimestamp = parseInt(startTimestamp) @@ -181,7 +199,6 @@ export function craftChartsResponse( }; const normalizedChain = normalizeChain(chain!); - let lastDailyTimestamp = 0; /* * whenever "chain" and "peggedAsset", and peggedAsset has no entry in lastBalance for that chain, @@ -191,6 +208,15 @@ export function craftChartsResponse( if (peggedID && pegged.id !== peggedID) { return; } + const chainMap = assetChainMap[pegged.id]; + if (chain !== "all" && !chainMap.has(chain)) return; // if the coin is not found an given chain, dont process it + if (!_assetCache[pegged.id]) addToAssetCache(pegged); + return _assetCache[pegged.id]; + }).filter((i) => i); + + + function addToAssetCache(pegged: any) { + const { balance: lastBalance, balances } = peggedAssetsData[pegged.id] // if (chain !== "all" && !lastBalance?.[normalizedChain]) @@ -221,12 +247,12 @@ export function craftChartsResponse( ); lastDailyTimestamp = Math.max(lastDailyTimestamp, lastTimestamp); - return { + _assetCache[pegged.id] = { pegged, historicalBalance: historicalBalance.Items, lastTimestamp, }; - }) + } const lastDailyItem = historicalPrices[historicalPrices.length - 1]; if ( @@ -239,51 +265,36 @@ export function craftChartsResponse( } historicalPeggedBalances.map((peggedBalance) => { - if (peggedBalance === undefined) { - return; - } let { historicalBalance, pegged, lastTimestamp } = peggedBalance; const pegType = pegged.pegType; const peggedGeckoID = pegged.gecko_id; const lastBalance = historicalBalance[historicalBalance.length - 1]; + // fill missing data with last available data while (lastTimestamp < lastDailyTimestamp) { - lastTimestamp = getClosestDayStartTimestamp( - lastTimestamp + 24 * secondsInHour - ); + lastTimestamp = getClosestDayStartTimestamp(lastTimestamp + 24 * secondsInHour); historicalBalance.push({ ...lastBalance, SK: lastTimestamp, }); + peggedBalance.lastTimestamp = lastTimestamp; } historicalBalance.map((item: any) => { - const timestamp = getClosestDayStartTimestamp(item.SK); + const timestamp = getClosestDayStartTimestamp(item.SK) let itemBalance: any = {}; - const closestPriceIndex = timestampsBinarySearch( - priceTimestamps, - timestamp, - pricesCompareFn - ); - const closestPrices = extractResultOfBinarySearch( - historicalPrices, - closestPriceIndex - ); + const closestPriceIndex = timestampsBinarySearch(priceTimestamps, timestamp, pricesCompareFn); + const closestPrices = extractResultOfBinarySearch(historicalPrices, closestPriceIndex); + let fallbackPrice = 1; const historicalPrice = closestPrices?.prices[peggedGeckoID]; if (pegType === "peggedVAR") { fallbackPrice = 0; } else if (pegType !== "peggedUSD" && !historicalPrice) { - const closestRatesIndex = timestampsBinarySearch( - rateTimestamps, - timestamp, - ratesCompareFn - ); - const closestRates = extractResultOfBinarySearch( - historicalRates, - closestRatesIndex - ); + const closestRatesIndex = timestampsBinarySearch(rateTimestamps, timestamp, ratesCompareFn); + + const closestRates = extractResultOfBinarySearch(historicalRates, closestRatesIndex); const ticker = pegType.slice(-3); fallbackPrice = 1 / closestRates?.rates?.[ticker]; if (typeof fallbackPrice !== "number") { diff --git a/api2/file-cache.ts b/api2/file-cache.ts index cd4398e..037bf95 100644 --- a/api2/file-cache.ts +++ b/api2/file-cache.ts @@ -36,6 +36,10 @@ export async function readRouteData(subPath: string) { return readFileData(subPath) } +export function getRouteDataPath(subPath: string) { + return path.join(ROUTES_DATA_DIR, subPath) +} + async function storeData(subPath: string, data: any) { const filePath = path.join(CACHE_DIR!, subPath) const dirPath = path.dirname(filePath) diff --git a/api2/routes/index.ts b/api2/routes/index.ts index f6d7453..ad23e8e 100644 --- a/api2/routes/index.ts +++ b/api2/routes/index.ts @@ -1,14 +1,18 @@ import * as HyperExpress from "hyper-express"; import { successResponse, errorResponse, errorWrapper as ew } from "./utils"; -import { readRouteData } from "../file-cache"; +import { getRouteDataPath, readRouteData } from "../file-cache"; import { normalizeChain } from "../../src/utils/normalizeChain"; +import { createReadStream } from 'fs' + +const breakdownData: { + [chain: string]: any +} = {} export default function setRoutes(router: HyperExpress.Router) { router.get("/config", defaultFileHandler); router.get("/rates", defaultFileHandler); - router.get("/stablecoin", defaultFileHandler); router.get("/stablecoinprices", defaultFileHandler); router.get("/stablecoinchains", defaultFileHandler); router.get("/stablecoins", defaultFileHandler); @@ -24,22 +28,26 @@ export default function setRoutes(router: HyperExpress.Router) { chain = normalizeChain(chain) return fileResponse('/stablecoincharts2/' + chain, res); })); - // router.get("/stablecoincharts2/all-llama-app", defaultFileHandler); - // router.get("/stablecoincharts2/all-dominance-chain-breakdown", defaultFileHandler); - // router.get("/stablecoincharts2/recent-protocol-data", defaultFileHandler); // TOO: nuke this route to reduce load on the server router.get("/stablecoincharts/:chain", ew(async (req: any, res: any) => { - const { chain } = req.path_parameters; - let { stablecoin, starts, startts } = req.query; + let { chain } = req.path_parameters; + let { stablecoin, starts, startts } = req.query + chain = normalizeChain(chain) + if (!stablecoin) return fileResponse('/stablecoincharts/' + chain, res); const startTimestamp = starts ?? startts - let data = { breakdown: {}, aggregated: [] } + let data = [] try { - data = await readRouteData('stablecoincharts2/' + normalizeChain(chain)) - } catch (e) { } + if (!breakdownData[chain]) { + breakdownData[chain] = readRouteData('stablecoincharts2/' + chain) + breakdownData[chain] = (await breakdownData[chain] as any).breakdown || {} + } + data = breakdownData[chain][stablecoin] + } catch (e) { + console.error(e) + } - data = (stablecoin ? data.breakdown[stablecoin] : data.aggregated) ?? [] if (startTimestamp) data = (data as any).filter((d: any) => d.timestamp >= startTimestamp) @@ -55,7 +63,11 @@ export default function setRoutes(router: HyperExpress.Router) { async function fileResponse(filePath: string, res: HyperExpress.Response) { try { res.set('Cache-Control', 'public, max-age=1800'); // Set caching to 30 minutes - res.json(await readRouteData(filePath)) + // set response headers as json + res.setHeader('Content-Type', 'application/json'); + // res.json(await readRouteData(filePath)) + const fileStream = createReadStream(getRouteDataPath(filePath)) + fileStream.pipe(res) } catch (e) { console.error(e); return errorResponse(res, 'Internal server error', { statusCode: 500 }) diff --git a/src/utils/discord.ts b/src/utils/discord.ts index 1f0ac94..526d3d8 100644 --- a/src/utils/discord.ts +++ b/src/utils/discord.ts @@ -27,5 +27,4 @@ export async function sendMessage( content: formattedMessage, }), }).then((body) => body.json()); - console.log("discord", response); }