From fb76fde76ea8497a503d903134214f3e491d01d3 Mon Sep 17 00:00:00 2001 From: waynebruce0x Date: Tue, 26 Nov 2024 16:03:51 +0000 Subject: [PATCH 1/9] kafka --- coins/package-lock.json | 22 ++++--- coins/package.json | 2 + coins/src/adapters/utils/database.ts | 2 + coins/src/scripts/defiCoins.ts | 7 +-- coins/src/utils/coins3/cli/consume.ts | 25 ++++++++ coins/src/utils/coins3/cli/newTopic.ts | 26 ++++++++ coins/src/utils/coins3/jsonValidation.ts | 61 +++++++++++++++++++ coins/src/utils/coins3/kafka.ts | 36 +++++++++++ coins/src/utils/coins3/produce.ts | 76 ++++++++++++++++++++++++ 9 files changed, 244 insertions(+), 13 deletions(-) create mode 100644 coins/src/utils/coins3/cli/consume.ts create mode 100644 coins/src/utils/coins3/cli/newTopic.ts create mode 100644 coins/src/utils/coins3/jsonValidation.ts create mode 100644 coins/src/utils/coins3/kafka.ts create mode 100644 coins/src/utils/coins3/produce.ts diff --git a/coins/package-lock.json b/coins/package-lock.json index 8674221082..ba6b020f68 100644 --- a/coins/package-lock.json +++ b/coins/package-lock.json @@ -16,6 +16,7 @@ "@sentry/tracing": "^6.19.7", "@solana/web3.js": "^1.73.3", "@supercharge/promise-pool": "^2.1.0", + "ajv": "^6.12.6", "axios": "^1.6.7", "crypto-js": "^4.2.0", "dayjs": "^1.11.10", @@ -25,6 +26,7 @@ "graphql": "^16.6.0", "graphql-request": "^5.1.0", "ioredis": "^5.3.2", + "kafkajs": "^2.2.4", "node-fetch": "^2.7.0", "p-limit": "^3.1.0", "path": "^0.12.7", @@ -6657,7 +6659,6 @@ "version": "6.12.6", "resolved": "https://registry.npmjs.org/ajv/-/ajv-6.12.6.tgz", "integrity": "sha512-j3fVLgvTo527anyYyJOGTYJbG+vnnQYvE0m5mmkc1TK+nxAppkCLMIL0aZ4dblVCNoGShhm+kzE4ZUykBoMg4g==", - "dev": true, "dependencies": { "fast-deep-equal": "^3.1.1", "fast-json-stable-stringify": "^2.0.0", @@ -10412,8 +10413,7 @@ "node_modules/fast-deep-equal": { "version": "3.1.3", "resolved": "https://registry.npmjs.org/fast-deep-equal/-/fast-deep-equal-3.1.3.tgz", - "integrity": "sha512-f3qQ9oQy9j2AhBe/H9VC91wLmKBCCU/gDOnKNAYG5hswO7BLKj09Hc5HYNz9cGI++xlpDCIgDaitVs03ATR84Q==", - "dev": true + "integrity": "sha512-f3qQ9oQy9j2AhBe/H9VC91wLmKBCCU/gDOnKNAYG5hswO7BLKj09Hc5HYNz9cGI++xlpDCIgDaitVs03ATR84Q==" }, "node_modules/fast-glob": { "version": "3.3.2", @@ -10434,8 +10434,7 @@ "node_modules/fast-json-stable-stringify": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/fast-json-stable-stringify/-/fast-json-stable-stringify-2.1.0.tgz", - "integrity": "sha512-lhd/wF+Lk98HZoTCtlVraHtfh5XYijIjalXck7saUtuanSDyLMxnHhSXEDJqHxD7msR8D0uCmqlkwjCV8xvwHw==", - "dev": true + "integrity": "sha512-lhd/wF+Lk98HZoTCtlVraHtfh5XYijIjalXck7saUtuanSDyLMxnHhSXEDJqHxD7msR8D0uCmqlkwjCV8xvwHw==" }, "node_modules/fast-safe-stringify": { "version": "2.1.1", @@ -14127,8 +14126,7 @@ "node_modules/json-schema-traverse": { "version": "0.4.1", "resolved": "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-0.4.1.tgz", - "integrity": "sha512-xbbCH5dCYU5T8LcEhhuh7HJ88HXuW3qsI3Y0zOZFKfZEHcpWiHU/Jxzk629Brsab/mMiHQti9wMP+845RPe3Vg==", - "dev": true + "integrity": "sha512-xbbCH5dCYU5T8LcEhhuh7HJ88HXuW3qsI3Y0zOZFKfZEHcpWiHU/Jxzk629Brsab/mMiHQti9wMP+845RPe3Vg==" }, "node_modules/json-stringify-safe": { "version": "5.0.1", @@ -14376,6 +14374,14 @@ "uuid": "bin/uuid" } }, + "node_modules/kafkajs": { + "version": "2.2.4", + "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz", + "integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==", + "engines": { + "node": ">=14.0.0" + } + }, "node_modules/keyv": { "version": "4.5.4", "resolved": "https://registry.npmjs.org/keyv/-/keyv-4.5.4.tgz", @@ -16550,7 +16556,6 @@ "version": "2.3.1", "resolved": "https://registry.npmjs.org/punycode/-/punycode-2.3.1.tgz", "integrity": "sha512-vYt7UD1U9Wg6138shLtLOvdAu+8DsC/ilFtEVHcH+wydcSpNE20AfSOduf6MkRFahL5FY7X1oU7nKVZFtfq8Fg==", - "dev": true, "engines": { "node": ">=6" } @@ -19880,7 +19885,6 @@ "version": "4.4.1", "resolved": "https://registry.npmjs.org/uri-js/-/uri-js-4.4.1.tgz", "integrity": "sha512-7rKUyy33Q1yc98pQ1DAmLtwX109F7TIfWlW1Ydo8Wl1ii1SeHieeh0HHfPeL2fMXK6z0s8ecKs9frCuLJvndBg==", - "dev": true, "dependencies": { "punycode": "^2.1.0" } diff --git a/coins/package.json b/coins/package.json index d185970365..105edd3261 100644 --- a/coins/package.json +++ b/coins/package.json @@ -53,6 +53,7 @@ "@sentry/tracing": "^6.19.7", "@solana/web3.js": "^1.73.3", "@supercharge/promise-pool": "^2.1.0", + "ajv": "^6.12.6", "axios": "^1.6.7", "crypto-js": "^4.2.0", "dayjs": "^1.11.10", @@ -62,6 +63,7 @@ "graphql": "^16.6.0", "graphql-request": "^5.1.0", "ioredis": "^5.3.2", + "kafkajs": "^2.2.4", "node-fetch": "^2.7.0", "p-limit": "^3.1.0", "path": "^0.12.7", diff --git a/coins/src/adapters/utils/database.ts b/coins/src/adapters/utils/database.ts index 138ce9fe23..8a64a13b05 100644 --- a/coins/src/adapters/utils/database.ts +++ b/coins/src/adapters/utils/database.ts @@ -16,6 +16,7 @@ import { batchWrite2, translateItems } from "../../../coins2"; const confidenceThreshold: number = 0.3; import pLimit from "p-limit"; import { sliceIntoChunks } from "@defillama/sdk/build/util"; +import produceKafkaTopics from "../../utils/coins3/produce"; const rateLimited = pLimit(10); process.env.tableName = "prod-coins-table"; @@ -396,6 +397,7 @@ export async function batchWriteWithAlerts( const filteredItems: AWS.DynamoDB.DocumentClient.PutItemInputAttributeMap[] = await checkMovement(items, previousItems); await batchWrite(filteredItems, failOnError); + await produceKafkaTopics(filteredItems.filter((i) => i.SK == 0) as any[]); } export async function batchWrite2WithAlerts( items: AWS.DynamoDB.DocumentClient.PutItemInputAttributeMap[], diff --git a/coins/src/scripts/defiCoins.ts b/coins/src/scripts/defiCoins.ts index bac4c32426..6dff2ff211 100644 --- a/coins/src/scripts/defiCoins.ts +++ b/coins/src/scripts/defiCoins.ts @@ -1,7 +1,6 @@ require("dotenv").config(); import { batchWriteWithAlerts, - batchWrite2WithAlerts, filterWritesWithLowConfidence, } from "../adapters/utils/database"; import { withTimeout } from "../../../defi/src/utils/shared/withTimeout"; @@ -53,9 +52,9 @@ async function storeDefiCoins() { true, ), ]); - await batchWrite2WithAlerts( - resultsWithoutDuplicates.slice(i, i + step), - ); + // await batchWrite2WithAlerts( + // resultsWithoutDuplicates.slice(i, i + step), + // ); } } catch (e) { console.error(`ERROR: ${adapterKey} adapter failed ${e}`); diff --git a/coins/src/utils/coins3/cli/consume.ts b/coins/src/utils/coins3/cli/consume.ts new file mode 100644 index 0000000000..a970a5d975 --- /dev/null +++ b/coins/src/utils/coins3/cli/consume.ts @@ -0,0 +1,25 @@ +import { topics } from "../jsonValidation"; +import { getConsumer } from "../kafka"; + +const run = async () => { + await Promise.all( + topics.map(async (topic) => { + const consumer = await getConsumer(topic); + await consumer.subscribe({ topic, fromBeginning: true }); + + await consumer.run({ + eachMessage: async ({ topic, partition, message }: any) => { + console.log({ + topic, + partition, + offset: message.offset, + value: message.value.toString(), + }); + }, + }); + }), + ); +}; + +run().catch(console.error); +// ts-node coins/src/utils/coins3/cli/consume.ts diff --git a/coins/src/utils/coins3/cli/newTopic.ts b/coins/src/utils/coins3/cli/newTopic.ts new file mode 100644 index 0000000000..d36f3912a5 --- /dev/null +++ b/coins/src/utils/coins3/cli/newTopic.ts @@ -0,0 +1,26 @@ +import { getKafka } from "../kafka"; + +const createTopic = async () => { + const kafka = getKafka(); + const admin = kafka.admin(); + await admin.connect(); + + // Create a topic + await admin.createTopics({ + topics: [ + { + topic: "current", // Replace with your topic name + numPartitions: 1, // Number of partitions + replicationFactor: 1, // Replication factor + }, + ], + }); + + console.log("Topic created successfully"); + + // Disconnect the admin client + await admin.disconnect(); +}; + +createTopic().catch(console.error); +// ts-node coins/src/utils/coins3/cli/newTopic.ts diff --git a/coins/src/utils/coins3/jsonValidation.ts b/coins/src/utils/coins3/jsonValidation.ts new file mode 100644 index 0000000000..c9a9c29629 --- /dev/null +++ b/coins/src/utils/coins3/jsonValidation.ts @@ -0,0 +1,61 @@ +import Ajv from "ajv"; + +type Schema = { + type: string; + properties: { [prop: string]: { type: string } }; + required: string[]; + additionalProperties: Boolean; +}; + +const schemas: { [topic: string]: Schema } = { + metadata: { + type: "object", + properties: { + decimals: { type: "integer" }, + symbol: { type: "string" }, + address: { type: "string" }, + pid: { type: "string" }, + chain: { type: "string" }, + source: { type: "string" }, + }, + required: ["decimals", "symbol", "address", "pid", "chain"], + additionalProperties: false, + }, + current: { + type: "object", + properties: { + pid: { type: "string" }, + price: { type: "number" }, + confidence: { type: "number" }, + source: { type: "string" }, + }, + required: ["pid", "price", "confidence", "source"], + additionalProperties: false, + }, + timeseries: { + type: "object", + properties: { + ts: { type: "integer" }, + pid: { type: "string" }, + price: { type: "number" }, + confidence: { type: "number" }, + source: { type: "string" }, + }, + required: ["ts", "pid", "price", "confidence", "source"], + additionalProperties: false, + }, +}; + +const ajv = new Ajv(); + +export function validate(data: object, topic: Topic) { + const comp = ajv.compile(schemas[topic]); + const valid = comp(data); + if (!valid) { + console.log(comp.errors); + throw new Error(`${topic} validation error`); + } +} + +export const topics: string[] = Object.keys(schemas); +export type Topic = keyof typeof schemas; diff --git a/coins/src/utils/coins3/kafka.ts b/coins/src/utils/coins3/kafka.ts new file mode 100644 index 0000000000..4c3bf15a3c --- /dev/null +++ b/coins/src/utils/coins3/kafka.ts @@ -0,0 +1,36 @@ +import { Consumer, Kafka, Producer } from "kafkajs"; + +let kafka: Kafka; +let producer: Producer; +const consumers: { [groupId: string]: Consumer } = {}; + +export function getKafka(): Kafka { + if (!kafka) { + const kafkaConfig = process.env.KAFKA_CLIENT_CONFIG; + if (!kafkaConfig) { + throw new Error("Missing KAFKA_CLIENT_CONFIG"); + } + const [brokers, username, password] = kafkaConfig.split("---"); + kafka = new Kafka({ + clientId: "my-app", + brokers: brokers.split(","), + ssl: { + rejectUnauthorized: false, // Allow self-signed certificates + }, + sasl: { mechanism: "scram-sha-256", username, password }, + }); + } + return kafka; +} +export async function getConsumer(groupId: any): Promise { + if (!groupId) throw new Error("Missing groupId"); + if (!consumers[groupId]) + consumers[groupId] = getKafka().consumer({ groupId }); + await consumers[groupId].connect(); + return consumers[groupId]; +} +export async function getProducer(): Promise { + if (!producer) producer = getKafka().producer(); + await producer.connect(); + return producer; +} diff --git a/coins/src/utils/coins3/produce.ts b/coins/src/utils/coins3/produce.ts new file mode 100644 index 0000000000..f4fede16f9 --- /dev/null +++ b/coins/src/utils/coins3/produce.ts @@ -0,0 +1,76 @@ +import { Message, Producer } from "kafkajs"; +import { Topic, topics as allTopics, validate } from "./jsonValidation"; +import { getProducer } from "./kafka"; + +type Dynamo = { + SK: number; + PK: string; + adapter: string; + confidence: number; + price: number; + redirect?: string; + decimals?: number; + symbol?: string; + timestamp?: number; +}; + +async function produceTopics( + items: Dynamo[], + topic: Topic, + producer: Producer, +) { + const messages: Message[] = []; + + items.map((item) => { + const { symbol, decimals } = item; + if (!symbol || !decimals) return; + const message: object = convertToMessage(item, topic); + validate(message, topic); + messages.push({ value: JSON.stringify(message) }); + }); + + await producer.send({ topic: `${topic}`, messages }); +} +function convertToMessage(item: Dynamo, topic: Topic): object { + const { PK, symbol, decimals, price, confidence, timestamp, adapter } = item; + const { chain, address, pid } = splitPk(PK); + + switch (topic) { + case "metadata": + return { symbol, decimals, address, pid, chain, source: adapter }; + case "current": + return { pid, price, confidence, source: adapter }; + case "timeseries": + return { pid, price, confidence, source: adapter, ts: timestamp }; + default: + throw new Error(`Topic '${topic}' is not valid`); + } +} +function splitPk(pk: string): { chain: string; address: string; pid: string } { + const assetPrefix: string = "asset#"; + const coingeckoPrefix: string = "coingecko#"; + + if (pk.toLowerCase().startsWith(coingeckoPrefix)) { + const address = pk.substring(coingeckoPrefix.length).toLowerCase(); + return { + chain: "coingecko", + address, + pid: `coingecko:${address}`, + }; + } + + if (pk.startsWith(assetPrefix)) pk = pk.substring(assetPrefix.length); + const chain = pk.split(":")[0].toLowerCase(); + const address = pk.substring(pk.split(":")[0].length + 1).toLowerCase(); + return { chain, address, pid: `${chain}:${address}` }; +} + +export default async function produce( + items: Dynamo[], + topics: string[] = allTopics, +) { + const producer: Producer = await getProducer(); + await Promise.all( + topics.map((topic: Topic) => produceTopics([items[1]], topic, producer)), + ); +} From 99a56999baf2df99e1b60e62f06aba5be91b300b Mon Sep 17 00:00:00 2001 From: waynebruce0x Date: Tue, 26 Nov 2024 16:06:00 +0000 Subject: [PATCH 2/9] patch debug --- coins/src/utils/coins3/produce.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/coins/src/utils/coins3/produce.ts b/coins/src/utils/coins3/produce.ts index f4fede16f9..a0523f54f5 100644 --- a/coins/src/utils/coins3/produce.ts +++ b/coins/src/utils/coins3/produce.ts @@ -71,6 +71,6 @@ export default async function produce( ) { const producer: Producer = await getProducer(); await Promise.all( - topics.map((topic: Topic) => produceTopics([items[1]], topic, producer)), + topics.map((topic: Topic) => produceTopics(items, topic, producer)), ); } From ada6ba7589d592ad0a9a406d4181f323ec6ccc02 Mon Sep 17 00:00:00 2001 From: waynebruce0x Date: Wed, 27 Nov 2024 11:49:04 +0000 Subject: [PATCH 3/9] feedback --- coins/src/utils/coins3/produce.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/coins/src/utils/coins3/produce.ts b/coins/src/utils/coins3/produce.ts index a0523f54f5..55b74b9aa7 100644 --- a/coins/src/utils/coins3/produce.ts +++ b/coins/src/utils/coins3/produce.ts @@ -23,7 +23,7 @@ async function produceTopics( items.map((item) => { const { symbol, decimals } = item; - if (!symbol || !decimals) return; + if (!symbol || decimals == null) return; const message: object = convertToMessage(item, topic); validate(message, topic); messages.push({ value: JSON.stringify(message) }); From cf1eea9363ddd38d7cc55340fb54b8d40efb0aa0 Mon Sep 17 00:00:00 2001 From: waynebruce0x Date: Wed, 27 Nov 2024 15:22:30 +0000 Subject: [PATCH 4/9] bridges --- coins/src/adapters/bridges/index.ts | 72 ++++++++++------------------- coins/src/utils/coins3/produce.ts | 1 + 2 files changed, 25 insertions(+), 48 deletions(-) diff --git a/coins/src/adapters/bridges/index.ts b/coins/src/adapters/bridges/index.ts index aa6c79fcec..d42b3951ac 100644 --- a/coins/src/adapters/bridges/index.ts +++ b/coins/src/adapters/bridges/index.ts @@ -38,8 +38,8 @@ import fraxtal from "./fraxtal"; import symbiosis from "./symbiosis"; import fuel from "./fuel"; import zircuit from "./zircuit"; -import morph from './morph' -import aptos from './aptosFa' +import morph from "./morph"; +import aptos from "./aptosFa"; export type Token = | { @@ -108,11 +108,12 @@ export const bridges = [ fuel, zircuit, morph, - aptos + aptos, ].map(normalizeBridgeResults) as Bridge[]; import { batchGet, batchWrite } from "../../utils/shared/dynamodb"; import { getCurrentUnixTimestamp } from "../../utils/date"; +import produceKafkaTopics from "../../utils/coins3/produce"; const craftToPK = (to: string) => (to.includes("#") ? to : `asset#${to}`); @@ -146,7 +147,7 @@ async function _storeTokensOfBridge(bridge: Bridge) { const redirectMap: { [redirect: string]: string } = {}; const toRecords = await batchGet( - unlisted.map((t) => ({ + tokens.map((t) => ({ PK: craftToPK(t.to), SK: 0, })), @@ -173,9 +174,26 @@ async function _storeTokensOfBridge(bridge: Bridge) { if (record.price) toAddressToRecord[toPK] = record.PK; }); + const writes2: any[] = []; const writes: any[] = []; await Promise.all( - unlisted.map(async (token) => { + tokens.map(async (token) => { + // coins 3 + if ("decimals" in token && "symbol" in token) { + const toRecord = toRecords.find((r: any) => r.PK == token.to); + writes2.push({ + PK: `asset#${token.from}`, + timestamp: toRecord.timestamp, + decimals: token.decimals, + symbol: token.symbol, + confidence: 0.97, + adapter: "bridges", + price: toRecord.price, + }); + } + + if (!unlisted.includes(token)) return; + const finalPK = toAddressToRecord[craftToPK(token.to)]; if (finalPK === undefined) return; @@ -206,50 +224,8 @@ async function _storeTokensOfBridge(bridge: Bridge) { }), ); - // const writes2: Coin[] = []; - // const data = await readCoins2( - // tokens.map((t: Token) => ({ - // key: t.to.includes("coingecko#") ? t.to.replace("#", ":") : t.to, - // timestamp: getCurrentUnixTimestamp(), - // })), - // ); - // tokens.map(async (token) => { - // const to = token.to.includes("coingecko#") - // ? token.to.replace("#", ":") - // : token.to; - // if (!(to in data)) return; - // let PK: string = token.from.includes("coingecko#") - // ? token.from.replace("#", ":") - // : token.from.substring(token.from.indexOf("#") + 1); - // const chain = PK.split(":")[0]; - // let decimals: number, symbol: string; - // if ("getAllInfo" in token) { - // try { - // const newToken = await token.getAllInfo(); - // decimals = newToken.decimals; - // symbol = newToken.symbol; - // } catch (e) { - // console.log("Skipping token", PK, e); - // return; - // } - // } else { - // decimals = token.decimals; - // symbol = token.symbol; - // } - // writes2.push({ - // timestamp: getCurrentUnixTimestamp(), - // price: data[to].price, - // confidence: Math.min(data[to].confidence, 0.9), - // key: PK, - // chain, - // adapter: "bridges", - // symbol, - // decimals, - // }); - // }); - await batchWrite(writes, true); - // await batchWrite2(writes2, true, undefined, `bridge index ${i}`); + await produceKafkaTopics(writes2); return tokens; } export async function storeTokens() { diff --git a/coins/src/utils/coins3/produce.ts b/coins/src/utils/coins3/produce.ts index 55b74b9aa7..cb10ac2155 100644 --- a/coins/src/utils/coins3/produce.ts +++ b/coins/src/utils/coins3/produce.ts @@ -69,6 +69,7 @@ export default async function produce( items: Dynamo[], topics: string[] = allTopics, ) { + if (!items.length) return const producer: Producer = await getProducer(); await Promise.all( topics.map((topic: Topic) => produceTopics(items, topic, producer)), From c4eef3c198e6b292595b339aa38591862b8df5ef Mon Sep 17 00:00:00 2001 From: waynebruce0x Date: Tue, 3 Dec 2024 13:52:35 +0000 Subject: [PATCH 5/9] cg --- coins/src/adapters/bridges/index.ts | 23 +----- coins/src/scripts/coingecko.ts | 99 ++++++++++++++---------- coins/src/utils/coins3/jsonValidation.ts | 7 +- coins/src/utils/coins3/produce.ts | 50 +++++++++--- 4 files changed, 104 insertions(+), 75 deletions(-) diff --git a/coins/src/adapters/bridges/index.ts b/coins/src/adapters/bridges/index.ts index d42b3951ac..b7d153d070 100644 --- a/coins/src/adapters/bridges/index.ts +++ b/coins/src/adapters/bridges/index.ts @@ -147,7 +147,7 @@ async function _storeTokensOfBridge(bridge: Bridge) { const redirectMap: { [redirect: string]: string } = {}; const toRecords = await batchGet( - tokens.map((t) => ({ + unlisted.map((t) => ({ PK: craftToPK(t.to), SK: 0, })), @@ -174,26 +174,9 @@ async function _storeTokensOfBridge(bridge: Bridge) { if (record.price) toAddressToRecord[toPK] = record.PK; }); - const writes2: any[] = []; const writes: any[] = []; await Promise.all( - tokens.map(async (token) => { - // coins 3 - if ("decimals" in token && "symbol" in token) { - const toRecord = toRecords.find((r: any) => r.PK == token.to); - writes2.push({ - PK: `asset#${token.from}`, - timestamp: toRecord.timestamp, - decimals: token.decimals, - symbol: token.symbol, - confidence: 0.97, - adapter: "bridges", - price: toRecord.price, - }); - } - - if (!unlisted.includes(token)) return; - + unlisted.map(async (token) => { const finalPK = toAddressToRecord[craftToPK(token.to)]; if (finalPK === undefined) return; @@ -225,7 +208,7 @@ async function _storeTokensOfBridge(bridge: Bridge) { ); await batchWrite(writes, true); - await produceKafkaTopics(writes2); + await produceKafkaTopics(writes, ["coins-metadata"]); return tokens; } export async function storeTokens() { diff --git a/coins/src/scripts/coingecko.ts b/coins/src/scripts/coingecko.ts index c7c20a9ac9..38db37f393 100644 --- a/coins/src/scripts/coingecko.ts +++ b/coins/src/scripts/coingecko.ts @@ -5,7 +5,7 @@ import { Coin, CoinMetadata, iterateOverPlatforms, - staleMargin + staleMargin, } from "../utils/coingeckoPlatforms"; import sleep from "../utils/shared/sleep"; import { getCurrentUnixTimestamp, toUNIXTimestamp } from "../utils/date"; @@ -14,6 +14,7 @@ import { batchReadPostgres, getRedisConnection } from "../../coins2"; import chainToCoingeckoId from "../../../common/chainToCoingeckoId"; import { decimals, symbol } from "@defillama/sdk/build/erc20"; import { Connection, PublicKey } from "@solana/web3.js"; +import produceKafkaTopics, { Dynamo } from "../utils/coins3/produce"; enum COIN_TYPES { over100m = "over100m", @@ -63,32 +64,39 @@ interface IdToSymbol { } async function storeCoinData(coinData: Write[]) { - return batchWrite( - coinData - .map((c) => ({ - PK: c.PK, - SK: 0, - price: c.price, - mcap: c.mcap, - timestamp: c.timestamp, - symbol: c.symbol, - confidence: c.confidence, - })) - .filter((c: Write) => c.symbol != null), - false, - ); -} - -async function storeHistoricalCoinData(coinData: Write[]) { - return batchWrite( - coinData.map((c) => ({ - SK: c.SK, + const items = coinData + .map((c) => ({ PK: c.PK, + SK: 0, price: c.price, + mcap: c.mcap, + timestamp: c.timestamp, + symbol: c.symbol, confidence: c.confidence, - })), - false, - ); + })) + .filter((c: Write) => c.symbol != null); + await Promise.all([ + produceKafkaTopics( + items.map((i) => ({ adapter: "coingecko", ...i } as Dynamo)), + ), + batchWrite(items, false), + ]); +} + +async function storeHistoricalCoinData(coinData: Write[]) { + const items = coinData.map((c) => ({ + SK: c.SK, + PK: c.PK, + price: c.price, + confidence: c.confidence, + })); + await Promise.all([ + produceKafkaTopics( + items.map((i) => ({ adapter: "coingecko", ...i })) as Dynamo[], + ["coins-timeseries"], + ), + batchWrite(items, false), + ]); } let solanaTokens: Promise; @@ -278,6 +286,7 @@ async function getAndStoreCoins(coins: Coin[], rejected: Coin[]) { pricesAndMcaps[c.PK] = { price: c.price, mcap: c.mcap }; }); + const kafkaItems: any[] = []; await Promise.all( filteredCoins.map(async (coin) => iterateOverPlatforms( @@ -305,7 +314,7 @@ async function getAndStoreCoins(coins: Coin[], rejected: Coin[]) { return; } - await ddb.put({ + const item = { PK, SK: 0, created, @@ -313,14 +322,16 @@ async function getAndStoreCoins(coins: Coin[], rejected: Coin[]) { symbol, redirect: cgPK(coin.id), confidence: 0.99, - }); + }; + kafkaItems.push(item); + await ddb.put(item); }, coinPlatformData, ), ), ); - await deleteStaleKeysPromise; + await Promise.all([produceKafkaTopics(kafkaItems), deleteStaleKeysPromise]); } const HOUR = 3600; @@ -357,20 +368,24 @@ async function getAndStoreHourly( (c: any) => c.timestamp, ); - await batchWrite( - coinData.prices - .filter((price) => { - const ts = toUNIXTimestamp(price[0]); - return !writtenTimestamps[ts]; - }) - .map((price) => ({ - SK: toUNIXTimestamp(price[0]), - PK, - price: price[1], - confidence: 0.99, - })), - false, - ); + const items = coinData.prices + .filter((price) => { + const ts = toUNIXTimestamp(price[0]); + return !writtenTimestamps[ts]; + }) + .map((price) => ({ + SK: toUNIXTimestamp(price[0]), + PK, + price: price[1], + confidence: 0.99, + })); + + await Promise.all([ + produceKafkaTopics( + items.map((i) => ({ adapter: "coingecko", ...i }), ["coins-timeseries"]), + ), + batchWrite(items, false), + ]); } async function fetchCoingeckoData( @@ -427,7 +442,7 @@ async function triggerFetchCoingeckoData(hourly: boolean, coinType?: string) { let coins = (await fetch( `https://pro-api.coingecko.com/api/v3/coins/list?include_platform=true&x_cg_pro_api_key=${process.env.CG_KEY}`, ).then((r) => r.json())) as Coin[]; - + if (coinType || hourly) { const metadatas = await getCGCoinMetadatas( coins.map((coin) => coin.id), diff --git a/coins/src/utils/coins3/jsonValidation.ts b/coins/src/utils/coins3/jsonValidation.ts index c9a9c29629..ec46015b82 100644 --- a/coins/src/utils/coins3/jsonValidation.ts +++ b/coins/src/utils/coins3/jsonValidation.ts @@ -8,7 +8,7 @@ type Schema = { }; const schemas: { [topic: string]: Schema } = { - metadata: { + "coins-metadata": { type: "object", properties: { decimals: { type: "integer" }, @@ -17,11 +17,12 @@ const schemas: { [topic: string]: Schema } = { pid: { type: "string" }, chain: { type: "string" }, source: { type: "string" }, + redirect: { type: "string" }, }, required: ["decimals", "symbol", "address", "pid", "chain"], additionalProperties: false, }, - current: { + "coins-current": { type: "object", properties: { pid: { type: "string" }, @@ -32,7 +33,7 @@ const schemas: { [topic: string]: Schema } = { required: ["pid", "price", "confidence", "source"], additionalProperties: false, }, - timeseries: { + "coins-timeseries": { type: "object", properties: { ts: { type: "integer" }, diff --git a/coins/src/utils/coins3/produce.ts b/coins/src/utils/coins3/produce.ts index cb10ac2155..a74791c432 100644 --- a/coins/src/utils/coins3/produce.ts +++ b/coins/src/utils/coins3/produce.ts @@ -2,7 +2,7 @@ import { Message, Producer } from "kafkajs"; import { Topic, topics as allTopics, validate } from "./jsonValidation"; import { getProducer } from "./kafka"; -type Dynamo = { +export type Dynamo = { SK: number; PK: string; adapter: string; @@ -12,6 +12,7 @@ type Dynamo = { decimals?: number; symbol?: string; timestamp?: number; + created?: number; }; async function produceTopics( @@ -22,7 +23,8 @@ async function produceTopics( const messages: Message[] = []; items.map((item) => { - const { symbol, decimals } = item; + const { symbol, decimals, price } = item; + if (topic != "coins-metadata" && !price) return; if (!symbol || decimals == null) return; const message: object = convertToMessage(item, topic); validate(message, topic); @@ -32,16 +34,40 @@ async function produceTopics( await producer.send({ topic: `${topic}`, messages }); } function convertToMessage(item: Dynamo, topic: Topic): object { - const { PK, symbol, decimals, price, confidence, timestamp, adapter } = item; + const { + PK, + symbol, + decimals, + price, + confidence, + timestamp, + adapter, + redirect, + created, + } = item; const { chain, address, pid } = splitPk(PK); switch (topic) { - case "metadata": - return { symbol, decimals, address, pid, chain, source: adapter }; - case "current": + case "coins-metadata": + return { + symbol, + decimals, + address, + pid, + chain, + source: adapter, + redirect, + }; + case "coins-current": return { pid, price, confidence, source: adapter }; - case "timeseries": - return { pid, price, confidence, source: adapter, ts: timestamp }; + case "coins-timeseries": + return { + pid, + price, + confidence, + source: adapter, + ts: timestamp ?? created, + }; default: throw new Error(`Topic '${topic}' is not valid`); } @@ -67,9 +93,13 @@ function splitPk(pk: string): { chain: string; address: string; pid: string } { export default async function produce( items: Dynamo[], - topics: string[] = allTopics, + topics: Topic[] = allTopics, ) { - if (!items.length) return + const invalidTopic = topics.find((t: any) => { + !allTopics.includes(t); + }); + if (invalidTopic) throw new Error(`invalid topic: ${invalidTopic}`); + if (!items.length) return; const producer: Producer = await getProducer(); await Promise.all( topics.map((topic: Topic) => produceTopics(items, topic, producer)), From 76afd1275c6be6483f22fa24c7f23968ec30cc43 Mon Sep 17 00:00:00 2001 From: waynebruce0x Date: Tue, 3 Dec 2024 13:58:32 +0000 Subject: [PATCH 6/9] no need for created --- coins/src/utils/coins3/produce.ts | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/coins/src/utils/coins3/produce.ts b/coins/src/utils/coins3/produce.ts index a74791c432..0e9f9ad7a2 100644 --- a/coins/src/utils/coins3/produce.ts +++ b/coins/src/utils/coins3/produce.ts @@ -12,7 +12,6 @@ export type Dynamo = { decimals?: number; symbol?: string; timestamp?: number; - created?: number; }; async function produceTopics( @@ -43,7 +42,6 @@ function convertToMessage(item: Dynamo, topic: Topic): object { timestamp, adapter, redirect, - created, } = item; const { chain, address, pid } = splitPk(PK); @@ -61,13 +59,7 @@ function convertToMessage(item: Dynamo, topic: Topic): object { case "coins-current": return { pid, price, confidence, source: adapter }; case "coins-timeseries": - return { - pid, - price, - confidence, - source: adapter, - ts: timestamp ?? created, - }; + return { pid, price, confidence, source: adapter, ts: timestamp }; default: throw new Error(`Topic '${topic}' is not valid`); } From 81b6806fb77490eb21944be933ca63bb162707a4 Mon Sep 17 00:00:00 2001 From: waynebruce0x Date: Tue, 3 Dec 2024 15:10:13 +0000 Subject: [PATCH 7/9] bridges patch --- coins/src/utils/coins3/produce.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/coins/src/utils/coins3/produce.ts b/coins/src/utils/coins3/produce.ts index 0e9f9ad7a2..e888d6a6a2 100644 --- a/coins/src/utils/coins3/produce.ts +++ b/coins/src/utils/coins3/produce.ts @@ -43,18 +43,20 @@ function convertToMessage(item: Dynamo, topic: Topic): object { adapter, redirect, } = item; + const { chain, address, pid } = splitPk(PK); + const redirectPid = redirect ? splitPk(redirect).pid : undefined; switch (topic) { case "coins-metadata": return { symbol, - decimals, + decimals: Number(decimals), address, pid, chain, source: adapter, - redirect, + redirect: redirectPid, }; case "coins-current": return { pid, price, confidence, source: adapter }; From 734f4c564a32ef88b15947216de247414bb2e1c7 Mon Sep 17 00:00:00 2001 From: waynebruce0x Date: Tue, 3 Dec 2024 15:56:22 +0000 Subject: [PATCH 8/9] cg patches --- coins/src/scripts/coingecko.ts | 21 +++++++++++++++++---- coins/src/utils/coins3/jsonValidation.ts | 2 ++ coins/src/utils/coins3/produce.ts | 10 ++++++---- 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/coins/src/scripts/coingecko.ts b/coins/src/scripts/coingecko.ts index 38db37f393..f63774b8b5 100644 --- a/coins/src/scripts/coingecko.ts +++ b/coins/src/scripts/coingecko.ts @@ -77,7 +77,7 @@ async function storeCoinData(coinData: Write[]) { .filter((c: Write) => c.symbol != null); await Promise.all([ produceKafkaTopics( - items.map((i) => ({ adapter: "coingecko", ...i } as Dynamo)), + items.map((i) => ({ adapter: "coingecko", decimals: 0, ...i } as Dynamo)), ), batchWrite(items, false), ]); @@ -92,7 +92,11 @@ async function storeHistoricalCoinData(coinData: Write[]) { })); await Promise.all([ produceKafkaTopics( - items.map((i) => ({ adapter: "coingecko", ...i })) as Dynamo[], + items.map((i) => ({ + adapter: "coingecko", + timestamp: i.SK, + ...i, + })) as Dynamo[], ["coins-timeseries"], ), batchWrite(items, false), @@ -331,7 +335,13 @@ async function getAndStoreCoins(coins: Coin[], rejected: Coin[]) { ), ); - await Promise.all([produceKafkaTopics(kafkaItems), deleteStaleKeysPromise]); + await Promise.all([ + produceKafkaTopics( + kafkaItems.map((i) => ({ adapter: "coingecko", ...i })), + ["coins-metadata"], + ), + deleteStaleKeysPromise, + ]); } const HOUR = 3600; @@ -382,7 +392,10 @@ async function getAndStoreHourly( await Promise.all([ produceKafkaTopics( - items.map((i) => ({ adapter: "coingecko", ...i }), ["coins-timeseries"]), + items.map( + (i) => ({ adapter: "coingecko", timestamp: i.SK, ...i }), + ["coins-timeseries"], + ), ), batchWrite(items, false), ]); diff --git a/coins/src/utils/coins3/jsonValidation.ts b/coins/src/utils/coins3/jsonValidation.ts index ec46015b82..1186dc0dd4 100644 --- a/coins/src/utils/coins3/jsonValidation.ts +++ b/coins/src/utils/coins3/jsonValidation.ts @@ -29,6 +29,7 @@ const schemas: { [topic: string]: Schema } = { price: { type: "number" }, confidence: { type: "number" }, source: { type: "string" }, + mcap: { type: "number" }, }, required: ["pid", "price", "confidence", "source"], additionalProperties: false, @@ -41,6 +42,7 @@ const schemas: { [topic: string]: Schema } = { price: { type: "number" }, confidence: { type: "number" }, source: { type: "string" }, + mcap: { type: "number" }, }, required: ["ts", "pid", "price", "confidence", "source"], additionalProperties: false, diff --git a/coins/src/utils/coins3/produce.ts b/coins/src/utils/coins3/produce.ts index e888d6a6a2..5b6b6aa09e 100644 --- a/coins/src/utils/coins3/produce.ts +++ b/coins/src/utils/coins3/produce.ts @@ -12,6 +12,7 @@ export type Dynamo = { decimals?: number; symbol?: string; timestamp?: number; + mcap?: number; }; async function produceTopics( @@ -24,7 +25,7 @@ async function produceTopics( items.map((item) => { const { symbol, decimals, price } = item; if (topic != "coins-metadata" && !price) return; - if (!symbol || decimals == null) return; + if (topic == "coins-metadata" && (!symbol || decimals == null)) return; const message: object = convertToMessage(item, topic); validate(message, topic); messages.push({ value: JSON.stringify(message) }); @@ -42,6 +43,7 @@ function convertToMessage(item: Dynamo, topic: Topic): object { timestamp, adapter, redirect, + mcap, } = item; const { chain, address, pid } = splitPk(PK); @@ -59,9 +61,9 @@ function convertToMessage(item: Dynamo, topic: Topic): object { redirect: redirectPid, }; case "coins-current": - return { pid, price, confidence, source: adapter }; + return { pid, price, confidence, source: adapter, mcap }; case "coins-timeseries": - return { pid, price, confidence, source: adapter, ts: timestamp }; + return { pid, price, confidence, source: adapter, ts: timestamp, mcap }; default: throw new Error(`Topic '${topic}' is not valid`); } @@ -89,11 +91,11 @@ export default async function produce( items: Dynamo[], topics: Topic[] = allTopics, ) { + if (!items.length) return; const invalidTopic = topics.find((t: any) => { !allTopics.includes(t); }); if (invalidTopic) throw new Error(`invalid topic: ${invalidTopic}`); - if (!items.length) return; const producer: Producer = await getProducer(); await Promise.all( topics.map((topic: Topic) => produceTopics(items, topic, producer)), From 6b1e28587f749a6bc66269c42713417ffa5f672f Mon Sep 17 00:00:00 2001 From: waynebruce0x Date: Mon, 9 Dec 2024 14:40:23 +0000 Subject: [PATCH 9/9] comments --- coins/src/utils/coins3/jsonValidation.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/coins/src/utils/coins3/jsonValidation.ts b/coins/src/utils/coins3/jsonValidation.ts index 1186dc0dd4..eba8ea37e6 100644 --- a/coins/src/utils/coins3/jsonValidation.ts +++ b/coins/src/utils/coins3/jsonValidation.ts @@ -19,7 +19,7 @@ const schemas: { [topic: string]: Schema } = { source: { type: "string" }, redirect: { type: "string" }, }, - required: ["decimals", "symbol", "address", "pid", "chain"], + required: ["decimals", "symbol", "address", "pid"], additionalProperties: false, }, "coins-current": { @@ -31,7 +31,7 @@ const schemas: { [topic: string]: Schema } = { source: { type: "string" }, mcap: { type: "number" }, }, - required: ["pid", "price", "confidence", "source"], + required: ["pid", "price"], additionalProperties: false, }, "coins-timeseries": { @@ -44,7 +44,7 @@ const schemas: { [topic: string]: Schema } = { source: { type: "string" }, mcap: { type: "number" }, }, - required: ["ts", "pid", "price", "confidence", "source"], + required: ["ts", "pid", "price"], additionalProperties: false, }, };