diff --git a/coins/package-lock.json b/coins/package-lock.json index 8674221082..ba6b020f68 100644 --- a/coins/package-lock.json +++ b/coins/package-lock.json @@ -16,6 +16,7 @@ "@sentry/tracing": "^6.19.7", "@solana/web3.js": "^1.73.3", "@supercharge/promise-pool": "^2.1.0", + "ajv": "^6.12.6", "axios": "^1.6.7", "crypto-js": "^4.2.0", "dayjs": "^1.11.10", @@ -25,6 +26,7 @@ "graphql": "^16.6.0", "graphql-request": "^5.1.0", "ioredis": "^5.3.2", + "kafkajs": "^2.2.4", "node-fetch": "^2.7.0", "p-limit": "^3.1.0", "path": "^0.12.7", @@ -6657,7 +6659,6 @@ "version": "6.12.6", "resolved": "https://registry.npmjs.org/ajv/-/ajv-6.12.6.tgz", "integrity": "sha512-j3fVLgvTo527anyYyJOGTYJbG+vnnQYvE0m5mmkc1TK+nxAppkCLMIL0aZ4dblVCNoGShhm+kzE4ZUykBoMg4g==", - "dev": true, "dependencies": { "fast-deep-equal": "^3.1.1", "fast-json-stable-stringify": "^2.0.0", @@ -10412,8 +10413,7 @@ "node_modules/fast-deep-equal": { "version": "3.1.3", "resolved": "https://registry.npmjs.org/fast-deep-equal/-/fast-deep-equal-3.1.3.tgz", - "integrity": "sha512-f3qQ9oQy9j2AhBe/H9VC91wLmKBCCU/gDOnKNAYG5hswO7BLKj09Hc5HYNz9cGI++xlpDCIgDaitVs03ATR84Q==", - "dev": true + "integrity": "sha512-f3qQ9oQy9j2AhBe/H9VC91wLmKBCCU/gDOnKNAYG5hswO7BLKj09Hc5HYNz9cGI++xlpDCIgDaitVs03ATR84Q==" }, "node_modules/fast-glob": { "version": "3.3.2", @@ -10434,8 +10434,7 @@ "node_modules/fast-json-stable-stringify": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/fast-json-stable-stringify/-/fast-json-stable-stringify-2.1.0.tgz", - "integrity": "sha512-lhd/wF+Lk98HZoTCtlVraHtfh5XYijIjalXck7saUtuanSDyLMxnHhSXEDJqHxD7msR8D0uCmqlkwjCV8xvwHw==", - "dev": true + "integrity": "sha512-lhd/wF+Lk98HZoTCtlVraHtfh5XYijIjalXck7saUtuanSDyLMxnHhSXEDJqHxD7msR8D0uCmqlkwjCV8xvwHw==" }, "node_modules/fast-safe-stringify": { "version": "2.1.1", @@ -14127,8 +14126,7 @@ "node_modules/json-schema-traverse": { "version": "0.4.1", "resolved": "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-0.4.1.tgz", - "integrity": "sha512-xbbCH5dCYU5T8LcEhhuh7HJ88HXuW3qsI3Y0zOZFKfZEHcpWiHU/Jxzk629Brsab/mMiHQti9wMP+845RPe3Vg==", - "dev": true + "integrity": "sha512-xbbCH5dCYU5T8LcEhhuh7HJ88HXuW3qsI3Y0zOZFKfZEHcpWiHU/Jxzk629Brsab/mMiHQti9wMP+845RPe3Vg==" }, "node_modules/json-stringify-safe": { "version": "5.0.1", @@ -14376,6 +14374,14 @@ "uuid": "bin/uuid" } }, + "node_modules/kafkajs": { + "version": "2.2.4", + "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz", + "integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==", + "engines": { + "node": ">=14.0.0" + } + }, "node_modules/keyv": { "version": "4.5.4", "resolved": "https://registry.npmjs.org/keyv/-/keyv-4.5.4.tgz", @@ -16550,7 +16556,6 @@ "version": "2.3.1", "resolved": "https://registry.npmjs.org/punycode/-/punycode-2.3.1.tgz", "integrity": "sha512-vYt7UD1U9Wg6138shLtLOvdAu+8DsC/ilFtEVHcH+wydcSpNE20AfSOduf6MkRFahL5FY7X1oU7nKVZFtfq8Fg==", - "dev": true, "engines": { "node": ">=6" } @@ -19880,7 +19885,6 @@ "version": "4.4.1", "resolved": "https://registry.npmjs.org/uri-js/-/uri-js-4.4.1.tgz", "integrity": "sha512-7rKUyy33Q1yc98pQ1DAmLtwX109F7TIfWlW1Ydo8Wl1ii1SeHieeh0HHfPeL2fMXK6z0s8ecKs9frCuLJvndBg==", - "dev": true, "dependencies": { "punycode": "^2.1.0" } diff --git a/coins/package.json b/coins/package.json index d185970365..105edd3261 100644 --- a/coins/package.json +++ b/coins/package.json @@ -53,6 +53,7 @@ "@sentry/tracing": "^6.19.7", "@solana/web3.js": "^1.73.3", "@supercharge/promise-pool": "^2.1.0", + "ajv": "^6.12.6", "axios": "^1.6.7", "crypto-js": "^4.2.0", "dayjs": "^1.11.10", @@ -62,6 +63,7 @@ "graphql": "^16.6.0", "graphql-request": "^5.1.0", "ioredis": "^5.3.2", + "kafkajs": "^2.2.4", "node-fetch": "^2.7.0", "p-limit": "^3.1.0", "path": "^0.12.7", diff --git a/coins/src/adapters/bridges/index.ts b/coins/src/adapters/bridges/index.ts index aa6c79fcec..b7d153d070 100644 --- a/coins/src/adapters/bridges/index.ts +++ b/coins/src/adapters/bridges/index.ts @@ -38,8 +38,8 @@ import fraxtal from "./fraxtal"; import symbiosis from "./symbiosis"; import fuel from "./fuel"; import zircuit from "./zircuit"; -import morph from './morph' -import aptos from './aptosFa' +import morph from "./morph"; +import aptos from "./aptosFa"; export type Token = | { @@ -108,11 +108,12 @@ export const bridges = [ fuel, zircuit, morph, - aptos + aptos, ].map(normalizeBridgeResults) as Bridge[]; import { batchGet, batchWrite } from "../../utils/shared/dynamodb"; import { getCurrentUnixTimestamp } from "../../utils/date"; +import produceKafkaTopics from "../../utils/coins3/produce"; const craftToPK = (to: string) => (to.includes("#") ? to : `asset#${to}`); @@ -206,50 +207,8 @@ async function _storeTokensOfBridge(bridge: Bridge) { }), ); - // const writes2: Coin[] = []; - // const data = await readCoins2( - // tokens.map((t: Token) => ({ - // key: t.to.includes("coingecko#") ? t.to.replace("#", ":") : t.to, - // timestamp: getCurrentUnixTimestamp(), - // })), - // ); - // tokens.map(async (token) => { - // const to = token.to.includes("coingecko#") - // ? token.to.replace("#", ":") - // : token.to; - // if (!(to in data)) return; - // let PK: string = token.from.includes("coingecko#") - // ? token.from.replace("#", ":") - // : token.from.substring(token.from.indexOf("#") + 1); - // const chain = PK.split(":")[0]; - // let decimals: number, symbol: string; - // if ("getAllInfo" in token) { - // try { - // const newToken = await token.getAllInfo(); - // decimals = newToken.decimals; - // symbol = newToken.symbol; - // } catch (e) { - // console.log("Skipping token", PK, e); - // return; - // } - // } else { - // decimals = token.decimals; - // symbol = token.symbol; - // } - // writes2.push({ - // timestamp: getCurrentUnixTimestamp(), - // price: data[to].price, - // confidence: Math.min(data[to].confidence, 0.9), - // key: PK, - // chain, - // adapter: "bridges", - // symbol, - // decimals, - // }); - // }); - await batchWrite(writes, true); - // await batchWrite2(writes2, true, undefined, `bridge index ${i}`); + await produceKafkaTopics(writes, ["coins-metadata"]); return tokens; } export async function storeTokens() { diff --git a/coins/src/adapters/utils/database.ts b/coins/src/adapters/utils/database.ts index 138ce9fe23..8a64a13b05 100644 --- a/coins/src/adapters/utils/database.ts +++ b/coins/src/adapters/utils/database.ts @@ -16,6 +16,7 @@ import { batchWrite2, translateItems } from "../../../coins2"; const confidenceThreshold: number = 0.3; import pLimit from "p-limit"; import { sliceIntoChunks } from "@defillama/sdk/build/util"; +import produceKafkaTopics from "../../utils/coins3/produce"; const rateLimited = pLimit(10); process.env.tableName = "prod-coins-table"; @@ -396,6 +397,7 @@ export async function batchWriteWithAlerts( const filteredItems: AWS.DynamoDB.DocumentClient.PutItemInputAttributeMap[] = await checkMovement(items, previousItems); await batchWrite(filteredItems, failOnError); + await produceKafkaTopics(filteredItems.filter((i) => i.SK == 0) as any[]); } export async function batchWrite2WithAlerts( items: AWS.DynamoDB.DocumentClient.PutItemInputAttributeMap[], diff --git a/coins/src/scripts/coingecko.ts b/coins/src/scripts/coingecko.ts index c7c20a9ac9..f63774b8b5 100644 --- a/coins/src/scripts/coingecko.ts +++ b/coins/src/scripts/coingecko.ts @@ -5,7 +5,7 @@ import { Coin, CoinMetadata, iterateOverPlatforms, - staleMargin + staleMargin, } from "../utils/coingeckoPlatforms"; import sleep from "../utils/shared/sleep"; import { getCurrentUnixTimestamp, toUNIXTimestamp } from "../utils/date"; @@ -14,6 +14,7 @@ import { batchReadPostgres, getRedisConnection } from "../../coins2"; import chainToCoingeckoId from "../../../common/chainToCoingeckoId"; import { decimals, symbol } from "@defillama/sdk/build/erc20"; import { Connection, PublicKey } from "@solana/web3.js"; +import produceKafkaTopics, { Dynamo } from "../utils/coins3/produce"; enum COIN_TYPES { over100m = "over100m", @@ -63,32 +64,43 @@ interface IdToSymbol { } async function storeCoinData(coinData: Write[]) { - return batchWrite( - coinData - .map((c) => ({ - PK: c.PK, - SK: 0, - price: c.price, - mcap: c.mcap, - timestamp: c.timestamp, - symbol: c.symbol, - confidence: c.confidence, - })) - .filter((c: Write) => c.symbol != null), - false, - ); -} - -async function storeHistoricalCoinData(coinData: Write[]) { - return batchWrite( - coinData.map((c) => ({ - SK: c.SK, + const items = coinData + .map((c) => ({ PK: c.PK, + SK: 0, price: c.price, + mcap: c.mcap, + timestamp: c.timestamp, + symbol: c.symbol, confidence: c.confidence, - })), - false, - ); + })) + .filter((c: Write) => c.symbol != null); + await Promise.all([ + produceKafkaTopics( + items.map((i) => ({ adapter: "coingecko", decimals: 0, ...i } as Dynamo)), + ), + batchWrite(items, false), + ]); +} + +async function storeHistoricalCoinData(coinData: Write[]) { + const items = coinData.map((c) => ({ + SK: c.SK, + PK: c.PK, + price: c.price, + confidence: c.confidence, + })); + await Promise.all([ + produceKafkaTopics( + items.map((i) => ({ + adapter: "coingecko", + timestamp: i.SK, + ...i, + })) as Dynamo[], + ["coins-timeseries"], + ), + batchWrite(items, false), + ]); } let solanaTokens: Promise; @@ -278,6 +290,7 @@ async function getAndStoreCoins(coins: Coin[], rejected: Coin[]) { pricesAndMcaps[c.PK] = { price: c.price, mcap: c.mcap }; }); + const kafkaItems: any[] = []; await Promise.all( filteredCoins.map(async (coin) => iterateOverPlatforms( @@ -305,7 +318,7 @@ async function getAndStoreCoins(coins: Coin[], rejected: Coin[]) { return; } - await ddb.put({ + const item = { PK, SK: 0, created, @@ -313,14 +326,22 @@ async function getAndStoreCoins(coins: Coin[], rejected: Coin[]) { symbol, redirect: cgPK(coin.id), confidence: 0.99, - }); + }; + kafkaItems.push(item); + await ddb.put(item); }, coinPlatformData, ), ), ); - await deleteStaleKeysPromise; + await Promise.all([ + produceKafkaTopics( + kafkaItems.map((i) => ({ adapter: "coingecko", ...i })), + ["coins-metadata"], + ), + deleteStaleKeysPromise, + ]); } const HOUR = 3600; @@ -357,20 +378,27 @@ async function getAndStoreHourly( (c: any) => c.timestamp, ); - await batchWrite( - coinData.prices - .filter((price) => { - const ts = toUNIXTimestamp(price[0]); - return !writtenTimestamps[ts]; - }) - .map((price) => ({ - SK: toUNIXTimestamp(price[0]), - PK, - price: price[1], - confidence: 0.99, - })), - false, - ); + const items = coinData.prices + .filter((price) => { + const ts = toUNIXTimestamp(price[0]); + return !writtenTimestamps[ts]; + }) + .map((price) => ({ + SK: toUNIXTimestamp(price[0]), + PK, + price: price[1], + confidence: 0.99, + })); + + await Promise.all([ + produceKafkaTopics( + items.map( + (i) => ({ adapter: "coingecko", timestamp: i.SK, ...i }), + ["coins-timeseries"], + ), + ), + batchWrite(items, false), + ]); } async function fetchCoingeckoData( @@ -427,7 +455,7 @@ async function triggerFetchCoingeckoData(hourly: boolean, coinType?: string) { let coins = (await fetch( `https://pro-api.coingecko.com/api/v3/coins/list?include_platform=true&x_cg_pro_api_key=${process.env.CG_KEY}`, ).then((r) => r.json())) as Coin[]; - + if (coinType || hourly) { const metadatas = await getCGCoinMetadatas( coins.map((coin) => coin.id), diff --git a/coins/src/scripts/defiCoins.ts b/coins/src/scripts/defiCoins.ts index bac4c32426..6dff2ff211 100644 --- a/coins/src/scripts/defiCoins.ts +++ b/coins/src/scripts/defiCoins.ts @@ -1,7 +1,6 @@ require("dotenv").config(); import { batchWriteWithAlerts, - batchWrite2WithAlerts, filterWritesWithLowConfidence, } from "../adapters/utils/database"; import { withTimeout } from "../../../defi/src/utils/shared/withTimeout"; @@ -53,9 +52,9 @@ async function storeDefiCoins() { true, ), ]); - await batchWrite2WithAlerts( - resultsWithoutDuplicates.slice(i, i + step), - ); + // await batchWrite2WithAlerts( + // resultsWithoutDuplicates.slice(i, i + step), + // ); } } catch (e) { console.error(`ERROR: ${adapterKey} adapter failed ${e}`); diff --git a/coins/src/utils/coins3/cli/consume.ts b/coins/src/utils/coins3/cli/consume.ts new file mode 100644 index 0000000000..a970a5d975 --- /dev/null +++ b/coins/src/utils/coins3/cli/consume.ts @@ -0,0 +1,25 @@ +import { topics } from "../jsonValidation"; +import { getConsumer } from "../kafka"; + +const run = async () => { + await Promise.all( + topics.map(async (topic) => { + const consumer = await getConsumer(topic); + await consumer.subscribe({ topic, fromBeginning: true }); + + await consumer.run({ + eachMessage: async ({ topic, partition, message }: any) => { + console.log({ + topic, + partition, + offset: message.offset, + value: message.value.toString(), + }); + }, + }); + }), + ); +}; + +run().catch(console.error); +// ts-node coins/src/utils/coins3/cli/consume.ts diff --git a/coins/src/utils/coins3/cli/newTopic.ts b/coins/src/utils/coins3/cli/newTopic.ts new file mode 100644 index 0000000000..d36f3912a5 --- /dev/null +++ b/coins/src/utils/coins3/cli/newTopic.ts @@ -0,0 +1,26 @@ +import { getKafka } from "../kafka"; + +const createTopic = async () => { + const kafka = getKafka(); + const admin = kafka.admin(); + await admin.connect(); + + // Create a topic + await admin.createTopics({ + topics: [ + { + topic: "current", // Replace with your topic name + numPartitions: 1, // Number of partitions + replicationFactor: 1, // Replication factor + }, + ], + }); + + console.log("Topic created successfully"); + + // Disconnect the admin client + await admin.disconnect(); +}; + +createTopic().catch(console.error); +// ts-node coins/src/utils/coins3/cli/newTopic.ts diff --git a/coins/src/utils/coins3/jsonValidation.ts b/coins/src/utils/coins3/jsonValidation.ts new file mode 100644 index 0000000000..eba8ea37e6 --- /dev/null +++ b/coins/src/utils/coins3/jsonValidation.ts @@ -0,0 +1,64 @@ +import Ajv from "ajv"; + +type Schema = { + type: string; + properties: { [prop: string]: { type: string } }; + required: string[]; + additionalProperties: Boolean; +}; + +const schemas: { [topic: string]: Schema } = { + "coins-metadata": { + type: "object", + properties: { + decimals: { type: "integer" }, + symbol: { type: "string" }, + address: { type: "string" }, + pid: { type: "string" }, + chain: { type: "string" }, + source: { type: "string" }, + redirect: { type: "string" }, + }, + required: ["decimals", "symbol", "address", "pid"], + additionalProperties: false, + }, + "coins-current": { + type: "object", + properties: { + pid: { type: "string" }, + price: { type: "number" }, + confidence: { type: "number" }, + source: { type: "string" }, + mcap: { type: "number" }, + }, + required: ["pid", "price"], + additionalProperties: false, + }, + "coins-timeseries": { + type: "object", + properties: { + ts: { type: "integer" }, + pid: { type: "string" }, + price: { type: "number" }, + confidence: { type: "number" }, + source: { type: "string" }, + mcap: { type: "number" }, + }, + required: ["ts", "pid", "price"], + additionalProperties: false, + }, +}; + +const ajv = new Ajv(); + +export function validate(data: object, topic: Topic) { + const comp = ajv.compile(schemas[topic]); + const valid = comp(data); + if (!valid) { + console.log(comp.errors); + throw new Error(`${topic} validation error`); + } +} + +export const topics: string[] = Object.keys(schemas); +export type Topic = keyof typeof schemas; diff --git a/coins/src/utils/coins3/kafka.ts b/coins/src/utils/coins3/kafka.ts new file mode 100644 index 0000000000..4c3bf15a3c --- /dev/null +++ b/coins/src/utils/coins3/kafka.ts @@ -0,0 +1,36 @@ +import { Consumer, Kafka, Producer } from "kafkajs"; + +let kafka: Kafka; +let producer: Producer; +const consumers: { [groupId: string]: Consumer } = {}; + +export function getKafka(): Kafka { + if (!kafka) { + const kafkaConfig = process.env.KAFKA_CLIENT_CONFIG; + if (!kafkaConfig) { + throw new Error("Missing KAFKA_CLIENT_CONFIG"); + } + const [brokers, username, password] = kafkaConfig.split("---"); + kafka = new Kafka({ + clientId: "my-app", + brokers: brokers.split(","), + ssl: { + rejectUnauthorized: false, // Allow self-signed certificates + }, + sasl: { mechanism: "scram-sha-256", username, password }, + }); + } + return kafka; +} +export async function getConsumer(groupId: any): Promise { + if (!groupId) throw new Error("Missing groupId"); + if (!consumers[groupId]) + consumers[groupId] = getKafka().consumer({ groupId }); + await consumers[groupId].connect(); + return consumers[groupId]; +} +export async function getProducer(): Promise { + if (!producer) producer = getKafka().producer(); + await producer.connect(); + return producer; +} diff --git a/coins/src/utils/coins3/produce.ts b/coins/src/utils/coins3/produce.ts new file mode 100644 index 0000000000..5b6b6aa09e --- /dev/null +++ b/coins/src/utils/coins3/produce.ts @@ -0,0 +1,103 @@ +import { Message, Producer } from "kafkajs"; +import { Topic, topics as allTopics, validate } from "./jsonValidation"; +import { getProducer } from "./kafka"; + +export type Dynamo = { + SK: number; + PK: string; + adapter: string; + confidence: number; + price: number; + redirect?: string; + decimals?: number; + symbol?: string; + timestamp?: number; + mcap?: number; +}; + +async function produceTopics( + items: Dynamo[], + topic: Topic, + producer: Producer, +) { + const messages: Message[] = []; + + items.map((item) => { + const { symbol, decimals, price } = item; + if (topic != "coins-metadata" && !price) return; + if (topic == "coins-metadata" && (!symbol || decimals == null)) return; + const message: object = convertToMessage(item, topic); + validate(message, topic); + messages.push({ value: JSON.stringify(message) }); + }); + + await producer.send({ topic: `${topic}`, messages }); +} +function convertToMessage(item: Dynamo, topic: Topic): object { + const { + PK, + symbol, + decimals, + price, + confidence, + timestamp, + adapter, + redirect, + mcap, + } = item; + + const { chain, address, pid } = splitPk(PK); + const redirectPid = redirect ? splitPk(redirect).pid : undefined; + + switch (topic) { + case "coins-metadata": + return { + symbol, + decimals: Number(decimals), + address, + pid, + chain, + source: adapter, + redirect: redirectPid, + }; + case "coins-current": + return { pid, price, confidence, source: adapter, mcap }; + case "coins-timeseries": + return { pid, price, confidence, source: adapter, ts: timestamp, mcap }; + default: + throw new Error(`Topic '${topic}' is not valid`); + } +} +function splitPk(pk: string): { chain: string; address: string; pid: string } { + const assetPrefix: string = "asset#"; + const coingeckoPrefix: string = "coingecko#"; + + if (pk.toLowerCase().startsWith(coingeckoPrefix)) { + const address = pk.substring(coingeckoPrefix.length).toLowerCase(); + return { + chain: "coingecko", + address, + pid: `coingecko:${address}`, + }; + } + + if (pk.startsWith(assetPrefix)) pk = pk.substring(assetPrefix.length); + const chain = pk.split(":")[0].toLowerCase(); + const address = pk.substring(pk.split(":")[0].length + 1).toLowerCase(); + return { chain, address, pid: `${chain}:${address}` }; +} + +export default async function produce( + items: Dynamo[], + topics: Topic[] = allTopics, +) { + if (!items.length) return; + const invalidTopic = topics.find((t: any) => { + !allTopics.includes(t); + }); + if (invalidTopic) throw new Error(`invalid topic: ${invalidTopic}`); + const producer: Producer = await getProducer(); + await Promise.all( + topics.map((topic: Topic) => produceTopics(items, topic, producer)), + ); +}