diff --git a/.env.example b/.env.example index 397a0336..743de715 100644 --- a/.env.example +++ b/.env.example @@ -42,7 +42,8 @@ DATABASE_URL=postgres://postgres:postgres@localhost:5432/grants_stack_indexer # METIS_ANDROMEDA_RPC_URL #COINGECKO_API_KEY= -#IPFS_GATEWAY= +#IPFS_GATEWAYS=[] +#WHITELISTED_ADDRESSES=["0x123..","0x456.."] # optional, enable the Postgraphile Pro plugin: https://www.npmjs.com/package/@graphile/pro #GRAPHILE_LICENSE diff --git a/docs/reindexing.md b/docs/reindexing.md index 00f6fc67..8ea7689e 100644 --- a/docs/reindexing.md +++ b/docs/reindexing.md @@ -12,9 +12,15 @@ When deploying changes to the indexer, it's important to clarify the results you - The indexer will create a new schema in Postgres named `chain_data_${version}`. If this schema does not exist, it will be created, all necessary tables will be set up, and indexing will start from scratch. - If the schema already exists, the indexer will resume indexing from the last indexed block unless the `--drop-db` flag is specified via the CLI. This will drop the existing database and start fresh. -### Using `--drop-db` in Development +### Dropping Schemas in Development -- During development, you can use the `--drop-db` flag to ensure the indexer always deletes the existing schema and migrates from scratch. This can be useful for testing schema changes and event handler modifications without retaining old data. +- During development, you can use the `--drop-db` flag to ensure the indexer always deletes all existing schemas (chain, IPFS, and price data) and migrates from scratch. This can be useful for testing schema changes and event handler modifications without retaining old data. + +- During development, you can use the `--drop-chain-db` flag to ensure the indexer always deletes the chain data schema and migrates from scratch. + +- During development, you can use the `--drop-ipfs-db` flag to ensure the indexer always deletes the IPFS data schema and migrates from scratch. + +- During development, you can use the `--drop-price-db` flag to ensure the indexer always deletes the price data schema and migrates from scratch (see the usage sketch below).
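For illustration, here is how the new flags and JSON-array environment variables fit together. This is a sketch: the `npm start --` entrypoint is an assumption (substitute whatever script runs the indexer in your setup), and note that the `*_RPC_URLS` and `IPFS_GATEWAYS` values are read with `JSON.parse`, so they must be set as valid JSON arrays or the indexer will throw at startup:

```sh
# .env — the new variables are JSON arrays, not bare URLs
IPFS_GATEWAYS='["https://ipfs.io"]'
MAINNET_RPC_URLS='["https://mainnet.infura.io/v3/"]'
WHITELISTED_ADDRESSES='["0x123..","0x456.."]'

# Drop and re-create every schema (chain, IPFS, and price data):
npm start -- --drop-db

# Or reset a single schema family:
npm start -- --drop-chain-db   # chain_data_${version}
npm start -- --drop-ipfs-db    # ipfs_data_${version}
npm start -- --drop-price-db   # price_data_${version}
```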
### Important Notes diff --git a/indexer-compose.yml b/indexer-compose.yml index d2cf9758..61d81c33 100644 --- a/indexer-compose.yml +++ b/indexer-compose.yml @@ -20,27 +20,27 @@ services: ENABLE_RESOURCE_MONITOR: ${ENABLE_RESOURCE_MONITOR} ESTIMATES_LINEARQF_WORKER_POOL_SIZE: ${ESTIMATES_LINEARQF_WORKER_POOL_SIZE} PINO_PRETTY: ${PINO_PRETTY} - IPFS_GATEWAY: ${IPFS_GATEWAY} + IPFS_GATEWAYS: ${IPFS_GATEWAYS} COINGECKO_API_KEY: ${COINGECKO_API_KEY} GRAPHILE_LICENSE: ${GRAPHILE_LICENSE} - SEPOLIA_RPC_URL: ${SEPOLIA_RPC_URL} - POLYGON_MUMBAI_RPC_URL: ${POLYGON_MUMBAI_RPC_URL} - AVALANCHE_RPC_URL: ${AVALANCHE_RPC_URL} - OPTIMISM_RPC_URL: ${OPTIMISM_RPC_URL} + SEPOLIA_RPC_URLS: ${SEPOLIA_RPC_URLS} + POLYGON_MUMBAI_RPC_URLS: ${POLYGON_MUMBAI_RPC_URLS} + AVALANCHE_RPC_URLS: ${AVALANCHE_RPC_URLS} + OPTIMISM_RPC_URLS: ${OPTIMISM_RPC_URLS} SENTRY_DSN: ${SENTRY_DSN} - PGN_TESTNET_RPC_URL: ${PGN_TESTNET_RPC_URL} + PGN_TESTNET_RPC_URLS: ${PGN_TESTNET_RPC_URLS} - ARBITRUM_GOERLI_RPC_URL: ${ARBITRUM_GOERLI_RPC_URL} - FANTOM_RPC_URL: ${FANTOM_RPC_URL} - BASE_RPC_URL: ${BASE_RPC_URL} - PGN_RPC_URL: ${PGN_RPC_URL} - GOERLI_RPC_URL: ${GOERLI_RPC_URL} - AVALANCHE_FUJI_RPC_URL: ${AVALANCHE_FUJI_RPC_URL} - ARBITRUM_RPC_URL: ${ARBITRUM_RPC_URL} - SEI_MAINNET_RPC_URL: ${SEI_MAINNET_RPC_URL} - MAINNET_RPC_URL: ${MAINNET_RPC_URL} - POLYGON_RPC_URL: ${POLYGON_RPC_URL} - METIS_ANDROMEDA_RPC_URL: ${METIS_ANDROMEDA_RPC_URL} - SCROLL_SEPOLIA_RPC_URL: ${SCROLL_SEPOLIA_RPC_URL} + ARBITRUM_GOERLI_RPC_URLS: ${ARBITRUM_GOERLI_RPC_URLS} + FANTOM_RPC_URLS: ${FANTOM_RPC_URLS} + BASE_RPC_URLS: ${BASE_RPC_URLS} + PGN_RPC_URLS: ${PGN_RPC_URLS} + GOERLI_RPC_URLS: ${GOERLI_RPC_URLS} + AVALANCHE_FUJI_RPC_URLS: ${AVALANCHE_FUJI_RPC_URLS} + ARBITRUM_RPC_URLS: ${ARBITRUM_RPC_URLS} + SEI_MAINNET_RPC_URLS: ${SEI_MAINNET_RPC_URLS} + MAINNET_RPC_URLS: ${MAINNET_RPC_URLS} + POLYGON_RPC_URLS: ${POLYGON_RPC_URLS} + METIS_ANDROMEDA_RPC_URLS: ${METIS_ANDROMEDA_RPC_URLS} + SCROLL_SEPOLIA_RPC_URLS: ${SCROLL_SEPOLIA_RPC_URLS} DATABASE_URL: "postgresql://postgres:postgres@db:5432/grants_stack_indexer" index: @@ -62,7 +62,7 @@ services: ENABLE_RESOURCE_MONITOR: ${ENABLE_RESOURCE_MONITOR} ESTIMATES_LINEARQF_WORKER_POOL_SIZE: ${ESTIMATES_LINEARQF_WORKER_POOL_SIZE} PINO_PRETTY: ${PINO_PRETTY} - IPFS_GATEWAY: ${IPFS_GATEWAY} + IPFS_GATEWAYS: ${IPFS_GATEWAYS} COINGECKO_API_KEY: ${COINGECKO_API_KEY} GRAPHILE_LICENSE: ${GRAPHILE_LICENSE} SEPOLIA_RPC_URL: ${SEPOLIA_RPC_URL} diff --git a/src/config.ts b/src/config.ts index 8822b1a4..ceff9b7e 100644 --- a/src/config.ts +++ b/src/config.ts @@ -20,7 +20,9 @@ type CoingeckoSupportedChainId = | 42220 | 1088; -const CHAIN_DATA_VERSION = "81"; +const CHAIN_DATA_VERSION = "83"; +const IPFS_DATA_VERSION = "1"; +const PRICE_DATA_VERSION = "1"; export type Token = { code: string; @@ -38,7 +40,7 @@ export type Subscription = { }; export type Chain = { - rpc: string; + rpcs: string[]; name: string; id: ChainId; pricesFromTimestamp: number; @@ -47,15 +49,15 @@ maxGetLogsRange?: number; }; -const rpcUrl = z.string().url(); +const rpcUrl = z.array(z.string().url()); const CHAINS: Chain[] = [ { id: 1, name: "mainnet", - rpc: rpcUrl - .default("https://mainnet.infura.io/v3/") - .parse(process.env.MAINNET_RPC_URL), + rpcs: rpcUrl + .default(["https://mainnet.infura.io/v3/"]) + .parse(JSON.parse(process.env.MAINNET_RPC_URLS!)), pricesFromTimestamp: Date.UTC(2022, 11, 1, 0, 0, 0), tokens: [ { @@ -150,9 +152,9 @@ const CHAINS: Chain[] = [ { id: 10, name: "optimism", - rpc: rpcUrl - .default("https://optimism-rpc.publicnode.com") - 
.parse(process.env.OPTIMISM_RPC_URL), + rpcs: rpcUrl + .default(["https://optimism-rpc.publicnode.com"]) + .parse(JSON.parse(process.env.OPTIMISM_RPC_URLS!)), pricesFromTimestamp: Date.UTC(2022, 11, 1, 0, 0, 0), tokens: [ { @@ -266,9 +268,9 @@ const CHAINS: Chain[] = [ { id: 11155111, name: "sepolia", - rpc: rpcUrl - .default("https://ethereum-sepolia.publicnode.com") - .parse(process.env.SEPOLIA_RPC_URL), + rpcs: rpcUrl + .default(["https://ethereum-sepolia.publicnode.com"]) + .parse(JSON.parse(process.env.SEPOLIA_RPC_URLS!)), pricesFromTimestamp: Date.UTC(2023, 11, 1, 0, 0, 0), tokens: [ { @@ -370,9 +372,9 @@ const CHAINS: Chain[] = [ { id: 250, name: "fantom", - rpc: rpcUrl - .default("https://rpcapi.fantom.network") - .parse(process.env.FANTOM_RPC_URL), + rpcs: rpcUrl + .default(["https://rpcapi.fantom.network"]) + .parse(JSON.parse(process.env.FANTOM_RPC_URLS!)), pricesFromTimestamp: Date.UTC(2022, 11, 1, 0, 0, 0), tokens: [ { @@ -469,9 +471,9 @@ const CHAINS: Chain[] = [ { id: 58008, name: "pgn-testnet", - rpc: rpcUrl - .default("https://sepolia.publicgoods.network") - .parse(process.env.PGN_TESTNET_RPC_URL), + rpcs: rpcUrl + .default(["https://sepolia.publicgoods.network"]) + .parse(JSON.parse(process.env.PGN_TESTNET_RPC_URLS!)), pricesFromTimestamp: Date.UTC(2023, 5, 2, 0, 0, 0), tokens: [ { @@ -522,9 +524,9 @@ const CHAINS: Chain[] = [ { id: 424, name: "pgn-mainnet", - rpc: rpcUrl - .default("https://rpc.publicgoods.network") - .parse(process.env.PGN_RPC_URL), + rpcs: rpcUrl + .default(["https://rpc.publicgoods.network"]) + .parse(JSON.parse(process.env.PGN_RPC_URLS!)), pricesFromTimestamp: Date.UTC(2023, 5, 2, 0, 0, 0), tokens: [ { @@ -600,9 +602,9 @@ const CHAINS: Chain[] = [ { id: 42161, name: "arbitrum", - rpc: rpcUrl - .default("https://arb-mainnet.g.alchemy.com/v2/") - .parse(process.env.ARBITRUM_RPC_URL), + rpcs: rpcUrl + .default(["https://arb-mainnet.g.alchemy.com/v2/"]) + .parse(JSON.parse(process.env.ARBITRUM_RPC_URLS!)), pricesFromTimestamp: Date.UTC(2023, 7, 1, 0, 0, 0), tokens: [ { @@ -713,9 +715,9 @@ const CHAINS: Chain[] = [ { id: 80001, name: "polygon-mumbai", - rpc: rpcUrl - .default("https://rpc-mumbai.maticvigil.com/") - .parse(process.env.POLYGON_MUMBAI_RPC_URL), + rpcs: rpcUrl + .default(["https://rpc-mumbai.maticvigil.com/"]) + .parse(JSON.parse(process.env.POLYGON_MUMBAI_RPC_URLS!)), pricesFromTimestamp: Date.UTC(2023, 8, 19, 0, 0, 0), tokens: [ { @@ -793,9 +795,9 @@ const CHAINS: Chain[] = [ { id: 137, name: "polygon", - rpc: rpcUrl - .default("https://polygon-rpc.com") - .parse(process.env.POLYGON_RPC_URL), + rpcs: rpcUrl + .default(["https://polygon-rpc.com"]) + .parse(JSON.parse(process.env.POLYGON_RPC_URLS!)), pricesFromTimestamp: Date.UTC(2023, 8, 19, 0, 0, 0), tokens: [ { @@ -891,9 +893,9 @@ const CHAINS: Chain[] = [ { id: 8453, name: "base", - rpc: rpcUrl - .default("https://mainnet.base.org/") - .parse(process.env.BASE_RPC_URL), + rpcs: rpcUrl + .default(["https://mainnet.base.org/"]) + .parse(JSON.parse(process.env.BASE_RPC_URLS!)), pricesFromTimestamp: Date.UTC(2023, 12, 1, 0, 0, 0), tokens: [ { @@ -971,9 +973,9 @@ const CHAINS: Chain[] = [ { id: 324, name: "zksync-era-mainnet", - rpc: rpcUrl - .default("https://mainnet.era.zksync.io") - .parse(process.env.ZKSYNC_RPC_URL), + rpcs: rpcUrl + .default(["https://mainnet.era.zksync.io"]) + .parse(JSON.parse(process.env.ZKSYNC_RPC_URLS!)), pricesFromTimestamp: Date.UTC(2023, 12, 1, 0, 0, 0), tokens: [ { @@ -1086,9 +1088,9 @@ const CHAINS: Chain[] = [ { id: 300, name: "zksync-era-testnet", - rpc: rpcUrl 
- .default("https://sepolia.era.zksync.dev") - .parse(process.env.ZKSYNC_TESTNET_RPC_URL), + rpcs: rpcUrl + .default(["https://sepolia.era.zksync.dev"]) + .parse(JSON.parse(process.env.ZKSYNC_TESTNET_RPC_URLS!)), pricesFromTimestamp: Date.UTC(2023, 12, 1, 0, 0, 0), tokens: [ { @@ -1126,9 +1128,9 @@ const CHAINS: Chain[] = [ { id: 43114, name: "avalanche", - rpc: rpcUrl - .default("https://rpc.ankr.com/avalanche") - .parse(process.env.AVALANCHE_RPC_URL), + rpcs: rpcUrl + .default(["https://rpc.ankr.com/avalanche"]) + .parse(JSON.parse(process.env.AVALANCHE_RPC_URLS!)), pricesFromTimestamp: Date.UTC(2023, 8, 19, 0, 0, 0), tokens: [ { @@ -1206,9 +1208,9 @@ const CHAINS: Chain[] = [ { id: 43113, name: "avalanche-fuji", - rpc: rpcUrl - .default("https://avalanche-fuji-c-chain.publicnode.com") - .parse(process.env.AVALANCHE_FUJI_RPC_URL), + rpcs: rpcUrl + .default(["https://avalanche-fuji-c-chain.publicnode.com"]) + .parse(JSON.parse(process.env.AVALANCHE_FUJI_RPC_URLS!)), pricesFromTimestamp: Date.UTC(2023, 8, 19, 0, 0, 0), tokens: [ { @@ -1276,9 +1278,9 @@ const CHAINS: Chain[] = [ { id: 534351, name: "scroll-sepolia", - rpc: rpcUrl - .default("https://sepolia-rpc.scroll.io") - .parse(process.env.SCROLL_SEPOLIA_RPC_URL), + rpcs: rpcUrl + .default(["https://sepolia-rpc.scroll.io"]) + .parse(JSON.parse(process.env.SCROLL_SEPOLIA_RPC_URLS!)), pricesFromTimestamp: Date.UTC(2024, 0, 1, 0, 0, 0), maxGetLogsRange: 2000, tokens: [ @@ -1347,9 +1349,9 @@ const CHAINS: Chain[] = [ { id: 534352, name: "scroll", - rpc: rpcUrl - .default("https://rpc.scroll.io") - .parse(process.env.SCROLL_RPC_URL), + rpcs: rpcUrl + .default(["https://rpc.scroll.io"]) + .parse(JSON.parse(process.env.SCROLL_RPC_URLS!)), pricesFromTimestamp: Date.UTC(2024, 0, 1, 0, 0, 0), maxGetLogsRange: 9000, tokens: [ @@ -1433,9 +1435,9 @@ const CHAINS: Chain[] = [ { id: 713715, name: "sei-devnet", - rpc: rpcUrl - .default("https://evm-rpc-arctic-1.sei-apis.com") - .parse(process.env.SEI_DEVNET_RPC_URL), + rpcs: rpcUrl + .default(["https://evm-rpc-arctic-1.sei-apis.com"]) + .parse(JSON.parse(process.env.SEI_DEVNET_RPC_URLS!)), pricesFromTimestamp: Date.UTC(2024, 0, 1, 0, 0, 0), tokens: [ { @@ -1483,9 +1485,9 @@ const CHAINS: Chain[] = [ { id: 1329, name: "sei-mainnet", - rpc: rpcUrl - .default("https://evm-rpc.sei-apis.com") - .parse(process.env.SEI_MAINNET_RPC_URL), + rpcs: rpcUrl + .default(["https://evm-rpc.sei-apis.com"]) + .parse(JSON.parse(process.env.SEI_MAINNET_RPC_URLS!)), pricesFromTimestamp: Date.UTC(2024, 0, 1, 0, 0, 0), maxGetLogsRange: 10000, tokens: [ @@ -1534,9 +1536,9 @@ const CHAINS: Chain[] = [ { id: 42, name: "lukso-mainnet", - rpc: rpcUrl - .default("https://42.rpc.thirdweb.com") - .parse(process.env.LUKSO_MAINNET_RPC_URL), + rpcs: rpcUrl + .default(["https://42.rpc.thirdweb.com"]) + .parse(JSON.parse(process.env.LUKSO_MAINNET_RPC_URLS!)), pricesFromTimestamp: Date.UTC(2024, 0, 1, 0, 0, 0), tokens: [ { @@ -1584,9 +1586,9 @@ const CHAINS: Chain[] = [ { id: 4201, name: "lukso-testnet", - rpc: rpcUrl - .default("https://4201.rpc.thirdweb.com") - .parse(process.env.LUKSO_TESTNET_RPC_URL), + rpcs: rpcUrl + .default(["https://4201.rpc.thirdweb.com"]) + .parse(JSON.parse(process.env.LUKSO_TESTNET_RPC_URLS!)), pricesFromTimestamp: Date.UTC(2024, 0, 1, 0, 0, 0), tokens: [ { @@ -1625,9 +1627,9 @@ const CHAINS: Chain[] = [ { id: 42220, name: "celo-mainnet", - rpc: rpcUrl - .default("https://forno.celo.org") - .parse(process.env.CELO_MAINNET_RPC_URL), + rpcs: rpcUrl + .default(["https://forno.celo.org"]) + 
.parse(JSON.parse(process.env.CELO_MAINNET_RPC_URLS!)), pricesFromTimestamp: Date.UTC(2024, 0, 1, 0, 0, 0), tokens: [ { @@ -1684,9 +1686,9 @@ const CHAINS: Chain[] = [ { id: 44787, name: "celo-testnet", - rpc: rpcUrl - .default("https://alfajores-forno.celo-testnet.org") - .parse(process.env.CELO_TESTNET_RPC_URL), + rpcs: rpcUrl + .default(["https://alfajores-forno.celo-testnet.org"]) + .parse(JSON.parse(process.env.CELO_TESTNET_RPC_URLS!)), pricesFromTimestamp: Date.UTC(2024, 0, 1, 0, 0, 0), tokens: [ { @@ -1725,9 +1727,9 @@ const CHAINS: Chain[] = [ { id: 1088, name: "metisAndromeda", - rpc: rpcUrl - .default("https://andromeda.metis.io/?owner=1088") - .parse(process.env.METIS_ANDROMEDA_RPC_URL), + rpcs: rpcUrl + .default(["https://andromeda.metis.io/?owner=1088"]) + .parse(JSON.parse(process.env.METIS_ANDROMEDA_RPC_URLS!)), pricesFromTimestamp: Date.UTC(2024, 0, 1, 0, 0, 0), tokens: [ { @@ -1818,7 +1820,7 @@ export type Config = { httpServerWaitForSync: boolean; httpServerEnabled: boolean; indexerEnabled: boolean; - ipfsGateway: string; + ipfsGateways: string[]; coingeckoApiKey: string | null; coingeckoApiUrl: string; chains: Chain[]; @@ -1829,11 +1831,18 @@ export type Config = { readOnlyDatabaseUrl: string; dataVersion: string; databaseSchemaName: string; + ipfsDataVersion: string; + ipfsDatabaseSchemaName: string; + priceDataVersion: string; + priceDatabaseSchemaName: string; hostname: string; pinoPretty: boolean; deploymentEnvironment: "local" | "development" | "staging" | "production"; enableResourceMonitor: boolean; dropDb: boolean; + dropChainDb: boolean; + dropIpfsDb: boolean; + dropPriceDb: boolean; removeCache: boolean; estimatesLinearQfWorkerPoolSize: number | null; }; @@ -1901,6 +1910,15 @@ export function getConfig(): Config { "drop-db": { type: "boolean", }, + "drop-chain-db": { + type: "boolean", + }, + "drop-ipfs-db": { + type: "boolean", + }, + "drop-price-db": { + type: "boolean", + }, "rm-cache": { type: "boolean", }, @@ -1968,10 +1986,11 @@ export function getConfig(): Config { const runOnce = z.boolean().default(false).parse(args["run-once"]); - const ipfsGateway = z + const ipfsGateways = z .string() - .default("https://ipfs.io") - .parse(process.env.IPFS_GATEWAY); + .array() + .default(["https://ipfs.io"]) + .parse(JSON.parse(process.env.IPFS_GATEWAYS!)); const sentryDsn = z .union([z.string(), z.null()]) @@ -1988,7 +2007,16 @@ export function getConfig(): Config { const dataVersion = CHAIN_DATA_VERSION; const databaseSchemaName = `chain_data_${dataVersion}`; + const ipfsDataVersion = IPFS_DATA_VERSION; + const ipfsDatabaseSchemaName = `ipfs_data_${ipfsDataVersion}`; + + const priceDataVersion = PRICE_DATA_VERSION; + const priceDatabaseSchemaName = `price_data_${priceDataVersion}`; + const dropDb = z.boolean().default(false).parse(args["drop-db"]); + const dropChainDb = z.boolean().default(false).parse(args["drop-chain-db"]); + const dropIpfsDb = z.boolean().default(false).parse(args["drop-ipfs-db"]); + const dropPriceDb = z.boolean().default(false).parse(args["drop-price-db"]); const removeCache = z.boolean().default(false).parse(args["rm-cache"]); @@ -2028,7 +2056,7 @@ export function getConfig(): Config { cacheDir, logLevel, runOnce, - ipfsGateway, + ipfsGateways, passportScorerId, apiHttpPort, pinoPretty, @@ -2037,9 +2065,16 @@ export function getConfig(): Config { databaseUrl, readOnlyDatabaseUrl, dropDb, + dropChainDb, + dropIpfsDb, + dropPriceDb, removeCache, dataVersion, databaseSchemaName, + ipfsDataVersion, + ipfsDatabaseSchemaName, + priceDataVersion, + 
priceDatabaseSchemaName, httpServerWaitForSync, httpServerEnabled, indexerEnabled, diff --git a/src/database/changeset.ts b/src/database/changeset.ts index 5eb7f12e..c53d2a14 100644 --- a/src/database/changeset.ts +++ b/src/database/changeset.ts @@ -16,6 +16,7 @@ import { NewPrice, NewLegacyProject, NewApplicationPayout, + NewIpfsData, } from "./schema.js"; export type DataChange = @@ -140,4 +141,8 @@ export type DataChange = | { type: "InsertApplicationPayout"; payout: NewApplicationPayout; + } + | { + type: "InsertIpfsData"; + ipfs: NewIpfsData; }; diff --git a/src/database/index.ts b/src/database/index.ts index 2b749ff2..2fed6a0f 100644 --- a/src/database/index.ts +++ b/src/database/index.ts @@ -14,8 +14,9 @@ import { NewDonation, LegacyProjectTable, ApplicationPayout, + IpfsDataTable, } from "./schema.js"; -import { migrate } from "./migrate.js"; +import { migrate, migrateDataFetcher, migratePriceFetcher } from "./migrate.js"; import { encodeJsonWithBigInts } from "../utils/index.js"; import type { DataChange } from "./changeset.js"; import { Logger } from "pino"; @@ -37,6 +38,7 @@ interface Tables { prices: PriceTable; legacyProjects: LegacyProjectTable; applicationsPayouts: ApplicationPayout; + ipfsData: IpfsDataTable; } type KyselyDb = Kysely<Tables>; @@ -53,13 +55,17 @@ export class Database { #statsTimeout: ReturnType<typeof setTimeout> | null = null; #logger: Logger; - readonly databaseSchemaName: string; + readonly chainDataSchemaName: string; + readonly ipfsDataSchemaName: string; + readonly priceDataSchemaName: string; constructor(options: { statsUpdaterEnabled: boolean; logger: Logger; connectionPool: Pool; - schemaName: string; + chainDataSchemaName: string; + ipfsDataSchemaName: string; + priceDataSchemaName: string; }) { const dialect = new PostgresDialect({ pool: options.connectionPool, }); @@ -72,10 +78,12 @@ plugins: [new CamelCasePlugin()], }); - this.#db = this.#db.withSchema(options.schemaName); + // Initialize schema names + this.chainDataSchemaName = options.chainDataSchemaName; + this.ipfsDataSchemaName = options.ipfsDataSchemaName; + this.priceDataSchemaName = options.priceDataSchemaName; this.#logger = options.logger; - this.databaseSchemaName = options.schemaName; this.scheduleDonationQueueFlush(); @@ -87,26 +95,72 @@ async acquireWriteLock() { const client = await this.#connectionPool.connect(); - // generate lock id based on schema - const lockId = this.databaseSchemaName.split("").reduce((acc, char) => { - return acc + char.charCodeAt(0); - }, 0); - - try { + // Helper function to generate lock ID based on schema name + const generateLockId = (schemaName: string): number => { + return schemaName.split("").reduce((acc, char) => { + return acc + char.charCodeAt(0); + }, 0); + }; + + // Helper function to forcibly acquire a lock for a specific schema + const forciblyAcquireLockForSchema = async (lockId: number) => { + // This will block until the lock is acquired + await client.query(`SELECT pg_advisory_lock(${lockId})`); + }; + + // Helper function to acquire a lock for a specific schema + const acquireLockForSchema = async (lockId: number) => { const result = await client.query( `SELECT pg_try_advisory_lock(${lockId}) as lock` ); - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - if (result.rows[0].lock === true) { - return { - release: async () => { - await client.query(`SELECT pg_advisory_unlock(${lockId})`); - client.release(); - }, - client, - }; - } + return result.rows[0].lock === true; + }; + + // Helper 
function to release a lock for a specific schema + const releaseLockForSchema = async (lockId: number) => { + await client.query(`SELECT pg_advisory_unlock(${lockId})`); + }; + + // Acquire locks for all schemas + const chainDataLockId = generateLockId(this.chainDataSchemaName); + const ipfsDataLockId = generateLockId(this.ipfsDataSchemaName); + const priceDataLockId = generateLockId(this.priceDataSchemaName); + + // Track acquired locks + const acquiredLocks: number[] = []; + + try { + const chainDataLockAcquired = await acquireLockForSchema(chainDataLockId); + if (chainDataLockAcquired) acquiredLocks.push(chainDataLockId); + + // const ipfsDataLockAcquired = await acquireLockForSchema(ipfsDataLockId); + // if (ipfsDataLockAcquired) acquiredLocks.push(ipfsDataLockId); + // const priceDataLockAcquired = await acquireLockForSchema(priceDataLockId); + // if (priceDataLockAcquired) acquiredLocks.push(priceDataLockId); + + // NOTE: We are forcibly acquiring locks for IPFS and Price data schemas + await forciblyAcquireLockForSchema(ipfsDataLockId); + await forciblyAcquireLockForSchema(priceDataLockId); + acquiredLocks.push(ipfsDataLockId); + acquiredLocks.push(priceDataLockId); + + // this.#logger.info(`Lock Status => + // Chain Data (${chainDataLockId}): ${chainDataLockAcquired}, + // IPFS Data (${ipfsDataLockId}): ${ipfsDataLockAcquired}, + // Price Data (${priceDataLockId}): ${priceDataLockAcquired} + // `); + + return { + release: async () => { + for (const lockId of acquiredLocks) { + await releaseLockForSchema(lockId); + } + client.release(); + }, + client, + acquiredLocks, + }; } catch (error) { this.#logger.error({ error }, "Failed to acquire write lock"); } @@ -132,12 +186,12 @@ } private async updateStats() { - const donationsTableRef = `"${this.databaseSchemaName}"."donations"`; + const donationsTableRef = `"${this.chainDataSchemaName}"."donations"`; await sql .raw( ` - UPDATE "${this.databaseSchemaName}"."rounds" AS r + UPDATE "${this.chainDataSchemaName}"."rounds" AS r SET total_amount_donated_in_usd = d.total_amount, total_donations_count = d.donation_count, @@ -160,7 +214,7 @@ await sql .raw( ` - UPDATE "${this.databaseSchemaName}"."applications" AS a + UPDATE "${this.chainDataSchemaName}"."applications" AS a SET total_amount_donated_in_usd = d.total_amount, total_donations_count = d.donation_count, @@ -223,38 +277,86 @@ } } - async dropSchemaIfExists() { + async dropChainDataSchemaIfExists() { + await this.#db.schema + .withSchema(this.chainDataSchemaName) + .dropSchema(this.chainDataSchemaName) + .ifExists() + .cascade() + .execute(); + } + + async dropIpfsDataSchemaIfExists() { await this.#db.schema - .dropSchema(this.databaseSchemaName) + .withSchema(this.ipfsDataSchemaName) + .dropSchema(this.ipfsDataSchemaName) .ifExists() .cascade() .execute(); } - async createSchemaIfNotExists(logger: Logger) { + async dropPriceDataSchemaIfExists() { + await this.#db.schema + .withSchema(this.priceDataSchemaName) + .dropSchema(this.priceDataSchemaName) + .ifExists() + .cascade() + .execute(); + } + + async dropAllSchemaIfExists() { + await this.dropChainDataSchemaIfExists(); + await this.dropIpfsDataSchemaIfExists(); + await this.dropPriceDataSchemaIfExists(); + } + + async createSchemaIfNotExists( + schemaName: string, + migrateFn: (tx: any, schemaName: string) => Promise<void>, + logger: Logger + ) { const exists = await sql<{ exists: boolean }>` - SELECT EXISTS ( - SELECT 1 FROM information_schema.schemata - WHERE 
schema_name = ${this.databaseSchemaName} - )`.execute(this.#db); + SELECT EXISTS ( + SELECT 1 FROM information_schema.schemata + WHERE schema_name = ${schemaName} + )`.execute(this.#db.withSchema(schemaName)); if (exists.rows.length > 0 && exists.rows[0].exists) { logger.info({ - msg: `schema "${this.databaseSchemaName}" exists, skipping creation`, + msg: `schema "${schemaName}" exists, skipping creation`, }); return; } logger.info({ - msg: `schema "${this.databaseSchemaName}" does not exist, creating schema`, + msg: `schema "${schemaName}" does not exist, creating schema`, }); - await this.#db.transaction().execute(async (tx) => { - await tx.schema.createSchema(this.databaseSchemaName).execute(); + await this.#db + .withSchema(schemaName) + .transaction() + .execute(async (tx) => { + await tx.schema.createSchema(schemaName).execute(); + await migrateFn(tx, schemaName); + }); + } - await migrate(tx, this.databaseSchemaName); - }); + async createAllSchemas(logger: Logger) { + await this.createSchemaIfNotExists( + this.chainDataSchemaName, + migrate, + logger + ); + await this.createSchemaIfNotExists( + this.ipfsDataSchemaName, + migrateDataFetcher, + logger + ); + await this.createSchemaIfNotExists( + this.priceDataSchemaName, + migratePriceFetcher, + logger + ); } async applyChanges(changes: DataChange[]): Promise<void> { @@ -267,6 +369,7 @@ switch (change.type) { case "InsertPendingProjectRole": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("pendingProjectRoles") .values(change.pendingProjectRole) .execute(); @@ -275,6 +378,7 @@ case "DeletePendingProjectRoles": { await this.#db + .withSchema(this.chainDataSchemaName) .deleteFrom("pendingProjectRoles") .where("id", "in", change.ids) .execute(); @@ -283,6 +387,7 @@ case "InsertPendingRoundRole": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("pendingRoundRoles") .values(change.pendingRoundRole) .execute(); @@ -291,6 +396,7 @@ case "DeletePendingRoundRoles": { await this.#db + .withSchema(this.chainDataSchemaName) .deleteFrom("pendingRoundRoles") .where("id", "in", change.ids) .execute(); @@ -298,12 +404,17 @@ } case "InsertProject": { - await this.#db.insertInto("projects").values(change.project).execute(); + await this.#db + .withSchema(this.chainDataSchemaName) + .insertInto("projects") + .values(change.project) + .execute(); break; } case "UpdateProject": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("projects") .set(change.project) .where("id", "=", change.projectId) @@ -314,6 +425,7 @@ case "InsertProjectRole": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("projectRoles") .values(change.projectRole) .execute(); @@ -322,6 +434,7 @@ case "DeleteAllProjectRolesByRole": { await this.#db + .withSchema(this.chainDataSchemaName) .deleteFrom("projectRoles") .where("chainId", "=", change.projectRole.chainId) .where("projectId", "=", change.projectRole.projectId) @@ -332,6 +445,7 @@ case "DeleteAllProjectRolesByRoleAndAddress": { await this.#db + .withSchema(this.chainDataSchemaName) .deleteFrom("projectRoles") .where("chainId", "=", change.projectRole.chainId) .where("projectId", "=", change.projectRole.projectId) .where("address", "=", change.projectRole.address) .execute(); break; } case "InsertRound": { - await this.#db.insertInto("rounds").values(change.round).execute(); + await this.#db 
+ .withSchema(this.chainDataSchemaName) + .insertInto("rounds") + .values(change.round) + .execute(); break; } case "UpdateRound": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("rounds") .set(change.round) .where("chainId", "=", change.chainId) @@ -358,6 +477,7 @@ export class Database { case "IncrementRoundFundedAmount": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("rounds") .set((eb) => ({ fundedAmount: eb("fundedAmount", "+", change.fundedAmount), @@ -375,6 +495,7 @@ export class Database { case "UpdateRoundByStrategyAddress": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("rounds") .set(change.round) .where("chainId", "=", change.chainId) @@ -385,6 +506,7 @@ export class Database { case "InsertRoundRole": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("roundRoles") .values(change.roundRole) .execute(); @@ -393,6 +515,7 @@ export class Database { case "DeleteAllRoundRolesByRoleAndAddress": { await this.#db + .withSchema(this.chainDataSchemaName) .deleteFrom("roundRoles") .where("chainId", "=", change.roundRole.chainId) .where("roundId", "=", change.roundRole.roundId) @@ -411,7 +534,11 @@ export class Database { }; } - await this.#db.insertInto("applications").values(application).execute(); + await this.#db + .withSchema(this.chainDataSchemaName) + .insertInto("applications") + .values(application) + .execute(); break; } @@ -425,6 +552,7 @@ export class Database { } await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("applications") .set(application) .where("chainId", "=", change.chainId) @@ -441,6 +569,7 @@ export class Database { case "InsertManyDonations": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("donations") .values(change.donations) .onConflict((c) => c.column("id").doNothing()) @@ -449,12 +578,17 @@ export class Database { } case "InsertManyPrices": { - await this.#db.insertInto("prices").values(change.prices).execute(); + await this.#db + .withSchema(this.priceDataSchemaName) + .insertInto("prices") + .values(change.prices) + .execute(); break; } case "IncrementRoundDonationStats": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("rounds") .set((eb) => ({ totalAmountDonatedInUsd: eb( @@ -472,6 +606,7 @@ export class Database { case "IncrementRoundTotalDistributed": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("rounds") .set((eb) => ({ totalDistributed: eb("totalDistributed", "+", change.amount), @@ -484,6 +619,7 @@ export class Database { case "IncrementApplicationDonationStats": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("applications") .set((eb) => ({ totalAmountDonatedInUsd: eb( @@ -502,6 +638,7 @@ export class Database { case "NewLegacyProject": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("legacyProjects") .values(change.legacyProject) .execute(); @@ -510,12 +647,22 @@ export class Database { case "InsertApplicationPayout": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("applicationsPayouts") .values(change.payout) .execute(); break; } + case "InsertIpfsData": { + await this.#db + .withSchema(this.ipfsDataSchemaName) + .insertInto("ipfsData") + .values(change.ipfs) + .execute(); + break; + } + default: throw new Error(`Unknown changeset type`); } @@ -523,6 +670,7 @@ export class Database { async getPendingProjectRolesByRole(chainId: ChainId, role: string) { const pendingProjectRole = await this.#db + 
.withSchema(this.chainDataSchemaName) .selectFrom("pendingProjectRoles") .where("chainId", "=", chainId) .where("role", "=", role) @@ -534,6 +682,7 @@ export class Database { async getPendingRoundRolesByRole(chainId: ChainId, role: string) { const pendingRoundRole = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("pendingRoundRoles") .where("chainId", "=", chainId) .where("role", "=", role) @@ -545,6 +694,7 @@ export class Database { async getProjectById(chainId: ChainId, projectId: string) { const project = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("projects") .where("chainId", "=", chainId) .where("id", "=", projectId) @@ -556,6 +706,7 @@ export class Database { async getProjectByAnchor(chainId: ChainId, anchorAddress: Address) { const project = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("projects") .where("chainId", "=", chainId) .where("anchorAddress", "=", anchorAddress) @@ -567,6 +718,7 @@ export class Database { async getRoundById(chainId: ChainId, roundId: string) { const round = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("rounds") .where("chainId", "=", chainId) .where("id", "=", roundId) @@ -578,6 +730,7 @@ export class Database { async getRoundByStrategyAddress(chainId: ChainId, strategyAddress: Address) { const round = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("rounds") .where("chainId", "=", chainId) .where("strategyAddress", "=", strategyAddress) @@ -593,6 +746,7 @@ export class Database { roleValue: string ) { const round = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("rounds") .where("chainId", "=", chainId) .where(`${roleName}Role`, "=", roleValue) @@ -615,6 +769,7 @@ export class Database { } const round = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("rounds") .where("chainId", "=", chainId) .where("id", "=", roundId) @@ -631,6 +786,7 @@ export class Database { async getAllChainRounds(chainId: ChainId) { const rounds = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("rounds") .where("chainId", "=", chainId) .selectAll() @@ -641,6 +797,7 @@ export class Database { async getAllRoundApplications(chainId: ChainId, roundId: string) { return await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("applications") .where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -650,6 +807,7 @@ export class Database { async getAllRoundDonations(chainId: ChainId, roundId: string) { return await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("donations") .where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -663,6 +821,7 @@ export class Database { applicationId: string ) { const application = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("applications") .where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -679,6 +838,7 @@ export class Database { projectId: string ) { const application = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("applications") .where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -695,6 +855,7 @@ export class Database { anchorAddress: Address ) { const application = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("applications") .where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -707,6 +868,7 @@ export class Database { async getLatestPriceTimestampForChain(chainId: ChainId) { const latestPriceTimestamp = await this.#db + 
.withSchema(this.priceDataSchemaName) .selectFrom("prices") .where("chainId", "=", chainId) .orderBy("timestamp", "desc") @@ -723,6 +885,7 @@ export class Database { blockNumber: bigint | "latest" ) { let priceQuery = this.#db + .withSchema(this.priceDataSchemaName) .selectFrom("prices") .where("chainId", "=", chainId) .where("tokenAddress", "=", tokenAddress) @@ -741,6 +904,7 @@ export class Database { async getAllChainPrices(chainId: ChainId) { return await this.#db + .withSchema(this.priceDataSchemaName) .selectFrom("prices") .where("chainId", "=", chainId) .orderBy("blockNumber", "asc") @@ -750,6 +914,7 @@ export class Database { async getAllChainProjects(chainId: ChainId) { return await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("projects") .where("chainId", "=", chainId) .selectAll() @@ -761,6 +926,7 @@ export class Database { donorAddress: Address ) { const donations = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("donations") .where("donations.donorAddress", "=", donorAddress) .where("donations.chainId", "=", chainId) @@ -784,6 +950,7 @@ export class Database { async getV2ProjectIdByV1ProjectId(v1ProjectId: string) { const result = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("legacyProjects") .where("v1ProjectId", "=", v1ProjectId) .select("v2ProjectId") @@ -791,4 +958,102 @@ export class Database { return result ?? null; } + + async deleteChainData(chainId: ChainId) { + this.#logger.info("Deleting chain data for chainId:", chainId); + + await this.#db.transaction().execute(async (trx) => { + this.#logger.info("Deleting pending round roles"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("pendingRoundRoles") + .where("chainId", "=", chainId) + .execute(); + + this.#logger.info("Deleting round roles"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("roundRoles") + .where("chainId", "=", chainId) + .execute(); + + this.#logger.info("Deleting pending project roles"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("pendingProjectRoles") + .where("chainId", "=", chainId) + .execute(); + + this.#logger.info("Deleting project roles"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("projectRoles") + .where("chainId", "=", chainId) + .execute(); + + this.#logger.info("Deleting applications"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("applications") + .where("chainId", "=", chainId) + .execute(); + + this.#logger.info("Deleting applications donations"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("donations") + .where("chainId", "=", chainId) + .execute(); + + // this.#logger.info("Deleting donation prices"); + // await trx + // .withSchema(this.priceDataSchemaName) + // .deleteFrom("prices") + // .where("chainId", "=", chainId) + // .execute(); + + this.#logger.info("Deleting applications"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("applications") + .where("chainId", "=", chainId) + .execute(); + + this.#logger.info("Deleting rounds"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("rounds") + .where("chainId", "=", chainId) + .execute(); + + this.#logger.info("Deleting projects"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("projects") + .where("chainId", "=", chainId) + .execute(); + }); + + this.#logger.info("Updating subscriptions indexed_to_block"); + const sqlQuery = ` + UPDATE ${this.chainDataSchemaName}.subscriptions + SET 
indexed_to_block = 0::bigint + WHERE chain_id = ${chainId} + `; + + await sql.raw(sqlQuery).execute(this.#db); + + this.#logger.info(`Deleted chain data for chainId: ${chainId}`); + } + + async getDataByCid(cid: string) { + const metadata = await this.#db + .withSchema(this.ipfsDataSchemaName) + .selectFrom("ipfsData") + .where("cid", "=", cid) + .selectAll() + .executeTakeFirst(); + + return metadata ?? null; + } } diff --git a/src/database/migrate.ts b/src/database/migrate.ts index c473d710..bedfb571 100644 --- a/src/database/migrate.ts +++ b/src/database/migrate.ts @@ -297,22 +297,6 @@ export async function migrate<T>(db: Kysely<T>, schemaName: string) { .columns(["chainId", "roundId", "applicationId"]) .execute(); - await schema - .createTable("prices") - .addColumn("id", "serial", (cb) => cb.primaryKey()) - .addColumn("chainId", CHAIN_ID_TYPE) - .addColumn("tokenAddress", ADDRESS_TYPE) - .addColumn("priceInUSD", "real") - .addColumn("timestamp", "timestamptz") - .addColumn("blockNumber", BIGINT_TYPE) - .execute(); - - await db.schema - .createIndex("idx_prices_chain_token_block") - .on("prices") - .expression(sql`chain_id, token_address, block_number DESC`) - .execute(); - await schema .createTable("legacy_projects") .addColumn("id", "serial", (col) => col.primaryKey()) @@ -392,3 +376,36 @@ export async function migrate<T>(db: Kysely<T>, schemaName: string) { $$ language sql stable; `.execute(db); } + +export async function migrateDataFetcher<T>(db: Kysely<T>, schemaName: string) { + const schema = db.withSchema(schemaName).schema; + + await schema + .createTable("ipfs_data") + .addColumn("cid", "text") + .addColumn("data", "jsonb") + .execute(); +} + +export async function migratePriceFetcher<T>( + db: Kysely<T>, + schemaName: string +) { + const schema = db.withSchema(schemaName).schema; + + await schema + .createTable("prices") + .addColumn("id", "serial", (cb) => cb.primaryKey()) + .addColumn("chainId", CHAIN_ID_TYPE) + .addColumn("tokenAddress", ADDRESS_TYPE) + .addColumn("priceInUSD", "real") + .addColumn("timestamp", "timestamptz") + .addColumn("blockNumber", BIGINT_TYPE) + .execute(); + + await db.schema + .createIndex("idx_prices_chain_token_block") + .on("prices") + .expression(sql`chain_id, token_address, block_number DESC`) + .execute(); +} diff --git a/src/database/schema.ts b/src/database/schema.ts index 06b9a906..8e50c133 100644 --- a/src/database/schema.ts +++ b/src/database/schema.ts @@ -125,6 +125,11 @@ export type ProjectTable = { projectType: ProjectType; }; +export type IpfsDataTable = { + cid: string; + data: unknown; +}; + export type Project = Selectable<ProjectTable>; export type NewProject = Insertable<ProjectTable>; export type PartialProject = Updateable<ProjectTable>; @@ -253,3 +258,8 @@ export type ApplicationPayout = { }; export type NewApplicationPayout = Insertable<ApplicationPayout>; + +export type NewIpfsData = { + cid: string; + data: unknown; +}; diff --git a/src/http/api/v1/status.ts b/src/http/api/v1/status.ts index e4ad3c08..f29c33a9 100644 --- a/src/http/api/v1/status.ts +++ b/src/http/api/v1/status.ts @@ -9,7 +9,9 @@ export const createHandler = (config: HttpApiConfig): express.Router => { res.json({ hostname: config.hostname, buildTag: config.buildTag, - databaseSchema: config.db.databaseSchemaName, + chainDataSchema: config.db.chainDataSchemaName, + ipfsDataSchema: config.db.ipfsDataSchemaName, + priceDataSchema: config.db.priceDataSchemaName, }); }); diff --git a/src/http/app.ts b/src/http/app.ts index d5ef33ea..22a36bd1 100644 --- a/src/http/app.ts +++ b/src/http/app.ts @@ -13,6 +13,8 @@ import { PassportProvider } from 
"../passport/index.js"; import { DataProvider } from "../calculator/dataProvider/index.js"; import { Chain } from "../config.js"; import { Database } from "../database/index.js"; +import { Indexer } from "chainsauce"; +import { recoverMessageAddress } from "viem"; type AsyncRequestHandler = ( req: express.Request, @@ -38,6 +40,7 @@ export interface HttpApiConfig { | { type: "in-thread" } | { type: "worker-pool"; workerPoolSize: number }; }; + indexedChains?: Indexer[] | null; } interface HttpApi { @@ -100,6 +103,109 @@ export const createHttpApi = (config: HttpApiConfig): HttpApi => { res.send(config.dataVersion); }); + app.get("/config", (_req, res) => { + res.send(config); + }); + + app.post("/index", (req, res) => { + try { + const { chainId, address, timestamp, signature } = req.body as { + chainId: string; + address: string; + timestamp: number; + signature: `0x${string}`; + }; + + const reindex = async () => { + if (!chainId || !config.indexedChains) { + return res.status(400).send("chainId is required"); + } + + try { + const isAuthenticated = await recoverEthereumAddress({ + address, + timestamp, + signature, + }); + + config.logger.info( + `Reindexing chain ${chainId} requested by ${address} at ${timestamp}` + ); + + if (isAuthenticated) { + await config.db.deleteChainData(Number(chainId)); + + const filteredIndexedChains = config.indexedChains.filter( + (chain) => + (chain as { context: { chainId: number } }).context.chainId === + Number(chainId) + ); + + if (filteredIndexedChains.length === 0) { + config.logger.error(`Chain ${chainId} not found`); + return res.status(400).send("chain not found"); + } + + const filteredChains = config.chains.filter( + (chain) => chain.id === Number(chainId) + ); + + if (filteredChains.length === 0) { + config.logger.error(`Chain ${chainId} not found`); + return res.status(400).send("chain not found"); + } + + const chain = filteredChains[0]; + const indexedChain = filteredIndexedChains[0]; + + chain.subscriptions.forEach((subscription) => { + indexedChain.unsubscribeFromContract({ + address: subscription.address, + }); + + const contractName = subscription.contractName; + const subscriptionFromBlock = + subscription.fromBlock === undefined + ? 
undefined + : BigInt(subscription.fromBlock); + + indexedChain.subscribeToContract({ + contract: contractName, + address: subscription.address, + fromBlock: subscriptionFromBlock || BigInt(0), + }); + }); + } else { + config.logger.error( + `Reindexing chain ${chainId} requested by ${address} at ${timestamp} failed authentication` + ); + return res.status(401).send("Authentication failed"); + } + } catch { + config.logger.error( + `Reindexing chain ${chainId} requested by ${address} at ${timestamp} failed with error` + ); + return res.status(500).send("An error occurred"); + } + }; + + reindex() + .then(() => { + config.logger.info(`Reindexing of chain ${chainId} finished`); + if (!res.headersSent) res.send("Reindexing finished"); + }) + .catch(() => { + config.logger.error( + `Reindexing of chain ${chainId} failed with error` + ); + if (!res.headersSent) res.status(500).send("An error occurred"); + }); + } catch { + config.logger.error(`Reindexing failed with error`); + res.status(500).send("An error occurred"); + } + }); + app.use("/api/v1", api); if (config.graphqlHandler !== undefined) { @@ -149,3 +255,58 @@ } }; } + +const VALIDITY_PERIOD = 1 * 60 * 1000; // 1 minute validity period + +const recoverEthereumAddress = async ({ + address, + timestamp, + signature, +}: { + address: string; + timestamp: number; + signature: `0x${string}`; +}) => { + if (!address || !timestamp || !signature) { + return false; + } + const whitelistedAddresses: string[] = process.env.WHITELISTED_ADDRESSES + ? (JSON.parse(process.env.WHITELISTED_ADDRESSES) as string[]) + : []; + + if (whitelistedAddresses.length === 0) { + return false; + } + + // Check timestamp validity + const currentTime = Date.now(); + if (currentTime - timestamp > VALIDITY_PERIOD) { + return false; + } + + // Construct the expected message to be signed + const expectedMessage = `Authenticate with timestamp: ${timestamp}`; + try { + // Recover address from signature and expected message + const recoveredAddress = await recoverMessageAddress({ + message: expectedMessage, + signature, + }); + + const whitelistedAddressesLowercase = whitelistedAddresses.map((addr) => + addr.toLowerCase() + ); + + if ( + recoveredAddress.toLowerCase() === address.toLowerCase() && + whitelistedAddressesLowercase.includes(address.toLowerCase()) + ) { + return true; + } else { + return false; + } + } catch (error) { + console.error("Error verifying signature:", error); + return false; + } +}; diff --git a/src/index.ts b/src/index.ts index e7f52d22..b169151d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -151,7 +151,9 @@ async function main(): Promise<void> { logger: baseLogger.child({ subsystem: "Database" }), statsUpdaterEnabled: config.indexerEnabled, connectionPool: databaseConnectionPool, - schemaName: config.databaseSchemaName, + chainDataSchemaName: config.databaseSchemaName, + ipfsDataSchemaName: config.ipfsDatabaseSchemaName, + priceDataSchemaName: config.priceDatabaseSchemaName, }); baseLogger.info({ @@ -162,9 +164,9 @@ (c) => c.name + " (rpc: " + - c.rpc.slice(0, 25) + + c.rpcs[0].slice(0, 25) + "..." 
+ - c.rpc.slice(-5, -1) + + c.rpcs[0].slice(-5, -1) + ")" ), }); @@ -185,11 +187,32 @@ } const chain = getChainConfigById(chainId); - const client = createPublicClient({ - transport: http(chain.rpc), - }); - const block = await client.getBlock({ blockNumber }); + let publicRpcClient: ReturnType<typeof createPublicClient> | null = null; + + for (const rpcUrl of chain.rpcs) { + try { + const client = createPublicClient({ + transport: http(rpcUrl), + }); + + const latestBlockNumber = await client.getBlockNumber(); + if (latestBlockNumber !== null) { + publicRpcClient = client; + break; + } + } catch (error) { + baseLogger.warn( + { error }, + `Failed to connect to RPC at ${rpcUrl}, trying next one...` + ); + } + } + + if (!publicRpcClient) { + throw new Error("All RPC connections failed"); + } + + const block = await publicRpcClient.getBlock({ blockNumber }); const timestamp = Number(block.timestamp); const chainsauceBlock: Block = { @@ -240,15 +263,22 @@ const lock = await db.acquireWriteLock(); if (lock !== null) { - baseLogger.info("acquired write lock"); - if (isFirstRun) { if (config.dropDb) { - baseLogger.info("dropping schema"); - await db.dropSchemaIfExists(); + baseLogger.info("dropping all schemas"); + await db.dropAllSchemaIfExists(); + } else if (config.dropChainDb) { + baseLogger.info("resetting chain data schema"); + await db.dropChainDataSchemaIfExists(); + } else if (config.dropIpfsDb) { + baseLogger.info("resetting ipfs data schema"); + await db.dropIpfsDataSchemaIfExists(); + } else if (config.dropPriceDb) { + baseLogger.info("resetting price data schema"); + await db.dropPriceDataSchemaIfExists(); } - await db.createSchemaIfNotExists(baseLogger); + await db.createAllSchemas(baseLogger); await subscriptionStore.init(); } @@ -324,7 +354,11 @@ const graphqlHandler = postgraphile( readOnlyDatabaseConnectionPool, - config.databaseSchemaName, + [ + config.databaseSchemaName, + config.ipfsDatabaseSchemaName, + config.priceDatabaseSchemaName, + ], { watchPg: false, graphqlRoute: "/graphql", @@ -401,6 +435,7 @@ workerPoolSize: config.estimatesLinearQfWorkerPoolSize, }, }, + indexedChains: await indexChainsPromise, }); await httpApi.start(); @@ -465,43 +500,120 @@ async function catchupAndWatchChain( return undefined; } - const url = `${config.ipfsGateway}/ipfs/${cid}`; - - // chainLogger.trace(`Fetching ${url}`); + // Check if data is already in the IPFS database + const ipfsData = await db.getDataByCid(cid); + if (ipfsData) { + chainLogger.info(`Found IPFS data in database for CID: ${cid}`); + return ipfsData.data as T; + } - const res = await fetch(url, { - timeout: 2000, - onRetry(cause) { - chainLogger.debug({ - msg: "Retrying IPFS request", - url: url, - err: cause, + // Fetch from a single IPFS gateway + const fetchFromGateway = async (url: string): Promise<T | undefined> => { + try { + const res = await fetch(url, { + timeout: 2000, + onRetry(cause) { + chainLogger.debug({ + msg: "Retrying IPFS request", + url: url, + err: cause, + }); + }, + retry: { retries: 3, minTimeout: 2000, maxTimeout: 60 * 10000 }, + // IPFS data is immutable, we can rely entirely on the cache when present + cache: "force-cache", + cachePath: + config.cacheDir !== null + ? 
path.join(config.cacheDir, "ipfs") + : undefined, }); - }, - retry: { retries: 3, minTimeout: 2000, maxTimeout: 60 * 10000 }, - // IPFS data is immutable, we can rely entirely on the cache when present - cache: "force-cache", - cachePath: - config.cacheDir !== null - ? path.join(config.cacheDir, "ipfs") - : undefined, - }); - return (await res.json()) as T; + if (res.ok) { + return (await res.json()) as T; // Return the fetched data + } else { + chainLogger.warn( + `Failed to fetch from ${url}, status: ${res.status} ${res.statusText}` + ); + } + } catch (err) { + chainLogger.error( + `Error fetching from gateway ${url}: ${String(err)}` + ); + } + }; + + // Iterate through each gateway and attempt to fetch data + for (const gateway of config.ipfsGateways) { + const url = `${gateway}/ipfs/${cid}`; + // chainLogger.info(`Trying IPFS gateway: ${gateway} for CID: ${cid}`); + + const result = await fetchFromGateway(url); + if (result !== undefined) { + // chainLogger.info( + // `Fetch successful from gateway: ${gateway} for CID: ${cid}` + // ); + + // Save to IpfsData table + try { + await db.applyChange({ + type: "InsertIpfsData", + ipfs: { + cid, + data: result, // TODO: check is JSON.parse is needed + }, + }); + } catch (err) { + chainLogger.error( + `Error saving IPFS data to database: ${String(err)}` + ); + } + + return result; // Return the result if fetched successfully + } else { + chainLogger.warn( + `IPFS fetch failed for gateway ${gateway} for CID ${cid}` + ); + } + } + + chainLogger.error( + `Failed to fetch IPFS data for CID ${cid} from all gateways.` + ); + return undefined; // Return undefined if all gateways fail }; chainLogger.info("DEBUG: catching up with blockchain events"); const indexerLogger = chainLogger.child({ subsystem: "DataUpdater" }); - const viemRpcClient = createPublicClient({ - transport: http(config.chain.rpc), - }); + let publicRpcClient: ReturnType | null = null; + + for (const rpcUrl of config.chain.rpcs) { + try { + const client = createPublicClient({ + transport: http(rpcUrl), + }); + + const blockNumber = await client.getBlockNumber(); + if (blockNumber !== null) { + publicRpcClient = client; + break; + } + } catch (error) { + chainLogger.warn( + `Failed to connect to RPC at ${rpcUrl}, trying next one...` + ); + } + } + + if (!publicRpcClient) { + throw new Error("All RPC connections failed"); + } const eventHandlerContext: EventHandlerContext = { chainId: config.chain.id, db, - rpcClient: viemRpcClient, + rpcClient: publicRpcClient, ipfsGet: cachedIpfsGet, priceProvider, blockTimestampInMs: blockTimestampInMs, @@ -512,7 +624,7 @@ async function catchupAndWatchChain( retryDelayMs: 1000, maxConcurrentRequests: 10, maxRetries: 3, - url: config.chain.rpc, + url: config.chain.rpcs[0], onRequest({ method, params }) { chainLogger.trace({ msg: `RPC Request ${method}`, params }); }, diff --git a/src/prices/provider.ts b/src/prices/provider.ts index 01f75656..a8d48091 100644 --- a/src/prices/provider.ts +++ b/src/prices/provider.ts @@ -211,10 +211,19 @@ export function createPriceProvider( }); } - await db.applyChange({ - type: "InsertManyPrices", - prices: [newPrice], - }); + // Check if the price is already in the database + const existingPrice = await db.getTokenPriceByBlockNumber( + chainId, + newPrice.tokenAddress, + blockNumber + ); + + if (!existingPrice) { + await db.applyChange({ + type: "InsertManyPrices", + prices: [newPrice], + }); + } return { ...newPrice, tokenDecimals: token.decimals }; }