diff --git a/Cargo.lock b/Cargo.lock index 3f6982bf..6137c615 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6575,37 +6575,6 @@ dependencies = [ "der", ] -[[package]] -name = "spo-api" -version = "3.0.0" -dependencies = [ - "anyhow", - "async-graphql", - "async-graphql-axum", - "axum", - "byte-unit-serde", - "clap", - "derive_more 2.1.1", - "fastrace", - "fastrace-axum", - "futures", - "indexer-common", - "indoc", - "log", - "metrics", - "once_cell", - "regex", - "serde", - "serde_with", - "sqlx", - "thiserror 2.0.17", - "tokio", - "tower", - "tower-http", - "trait-variant", - "uuid", -] - [[package]] name = "spo-indexer" version = "3.0.0" diff --git a/Cargo.toml b/Cargo.toml index 4ed1d605..d6e6e4d9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,6 @@ members = [ "indexer-standalone", "indexer-tests", "spo-indexer", - "spo-api", ] [workspace.package] diff --git a/indexer-api/graphql/schema-v3.graphql b/indexer-api/graphql/schema-v3.graphql index 4e4fe531..e5b2caef 100644 --- a/indexer-api/graphql/schema-v3.graphql +++ b/indexer-api/graphql/schema-v3.graphql @@ -75,6 +75,19 @@ type CollapsedMerkleTree { protocolVersion: Int! } +""" +Committee member for an epoch. +""" +type CommitteeMember { + epochNo: Int! + position: Int! + sidechainPubkeyHex: String! + expectedSlots: Int! + auraPubkeyHex: String + poolIdHex: String + spoSkHex: String +} + """ A contract action. """ @@ -354,6 +367,37 @@ type DustSpendProcessed implements DustLedgerEvent { maxId: Int! } +""" +Current epoch information. +""" +type EpochInfo { + epochNo: Int! + durationSeconds: Int! + elapsedSeconds: Int! +} + +""" +SPO performance for an epoch. +""" +type EpochPerf { + epochNo: Int! + spoSkHex: String! + produced: Int! + expected: Int! + identityLabel: String + stakeSnapshot: String + poolIdHex: String + validatorClass: String +} + +""" +First valid epoch for an SPO identity. +""" +type FirstValidEpoch { + idKey: String! + firstValidEpoch: Int! +} + scalar HexEncoded type Mutation { @@ -382,6 +426,28 @@ type ParamChange implements DustLedgerEvent { maxId: Int! } +""" +Pool metadata from Cardano. +""" +type PoolMetadata { + poolIdHex: String! + hexId: String + name: String + ticker: String + homepageUrl: String + logoUrl: String +} + +""" +Presence event for an SPO in an epoch. +""" +type PresenceEvent { + epochNo: Int! + idKey: String! + source: String! + status: String +} + type Query { """ Find a block for the given optional offset; if not present, the latest block is returned. @@ -407,6 +473,107 @@ type Query { Get the full history of Terms and Conditions changes for governance auditability. """ termsAndConditionsHistory: [TermsAndConditionsChange!]! + """ + List SPO identities with pagination. + """ + spoIdentities(limit: Int, offset: Int): [SpoIdentity!]! + """ + Get SPO identity by pool ID. + """ + spoIdentityByPoolId(poolIdHex: String!): SpoIdentity + """ + Get total count of SPOs. + """ + spoCount: Int + """ + Get pool metadata by pool ID. + """ + poolMetadata(poolIdHex: String!): PoolMetadata + """ + List pool metadata with pagination. + """ + poolMetadataList(limit: Int, offset: Int, withNameOnly: Boolean): [PoolMetadata!]! + """ + Get SPO with metadata by pool ID. + """ + spoByPoolId(poolIdHex: String!): Spo + """ + List SPOs with optional search. + """ + spoList(limit: Int, offset: Int, search: String): [Spo!]! + """ + Get composite SPO data (identity + metadata + performance). + """ + spoCompositeByPoolId(poolIdHex: String!): SpoComposite + """ + Get SPO identifiers ordered by performance. + """ + stakePoolOperators(limit: Int): [String!]! + """ + Get latest SPO performance entries. + """ + spoPerformanceLatest(limit: Int, offset: Int): [EpochPerf!]! + """ + Get SPO performance by SPO key. + """ + spoPerformanceBySpoSk(spoSkHex: String!, limit: Int, offset: Int): [EpochPerf!]! + """ + Get epoch performance for all SPOs. + """ + epochPerformance(epoch: Int!, limit: Int, offset: Int): [EpochPerf!]! + """ + Get current epoch information. + """ + currentEpochInfo: EpochInfo + """ + Get epoch utilization (produced/expected ratio). + """ + epochUtilization(epoch: Int!): Float + """ + Get committee membership for an epoch. + """ + committee(epoch: Int!): [CommitteeMember!]! + """ + Get cumulative registration totals for an epoch range. + """ + registeredTotalsSeries(fromEpoch: Int!, toEpoch: Int!): [RegisteredTotals!]! + """ + Get registration statistics for an epoch range. + """ + registeredSpoSeries(fromEpoch: Int!, toEpoch: Int!): [RegisteredStat!]! + """ + Get raw presence events for an epoch range. + """ + registeredPresence(fromEpoch: Int!, toEpoch: Int!): [PresenceEvent!]! + """ + Get first valid epoch for each SPO identity. + """ + registeredFirstValidEpochs(uptoEpoch: Int): [FirstValidEpoch!]! + """ + Get stake distribution with search and ordering. + """ + stakeDistribution(limit: Int, offset: Int, search: String, orderByStakeDesc: Boolean): [StakeShare!]! +} + +""" +Registration statistics for an epoch. +""" +type RegisteredStat { + epochNo: Int! + federatedValidCount: Int! + federatedInvalidCount: Int! + registeredValidCount: Int! + registeredInvalidCount: Int! + dparam: Float +} + +""" +Cumulative registration totals for an epoch. +""" +type RegisteredTotals { + epochNo: Int! + totalRegistered: Int! + newlyRegistered: Int! } """ @@ -539,6 +706,58 @@ type ShieldedTransactionsProgress { highestRelevantEndIndex: Int! } +""" +SPO with optional metadata. +""" +type Spo { + poolIdHex: String! + validatorClass: String! + sidechainPubkeyHex: String! + auraPubkeyHex: String + name: String + ticker: String + homepageUrl: String + logoUrl: String +} + +""" +Composite SPO data (identity + metadata + performance). +""" +type SpoComposite { + identity: SpoIdentity + metadata: PoolMetadata + performance: [EpochPerf!]! +} + +""" +SPO identity information. +""" +type SpoIdentity { + poolIdHex: String! + mainchainPubkeyHex: String! + sidechainPubkeyHex: String! + auraPubkeyHex: String + validatorClass: String! +} + +""" +Stake share information for an SPO. +""" +type StakeShare { + poolIdHex: String! + name: String + ticker: String + homepageUrl: String + logoUrl: String + liveStake: String + activeStake: String + liveDelegators: Int + liveSaturation: Float + declaredPledge: String + livePledge: String + stakeShare: Float +} + type Subscription { """ Subscribe to blocks starting at the given offset or at the latest block if the offset is diff --git a/indexer-api/src/domain.rs b/indexer-api/src/domain.rs index 81093208..f69f88c2 100644 --- a/indexer-api/src/domain.rs +++ b/indexer-api/src/domain.rs @@ -19,6 +19,7 @@ mod contract_action; pub mod dust; mod ledger_event; mod ledger_state; +pub mod spo; pub mod system_parameters; mod transaction; mod unshielded; diff --git a/indexer-api/src/domain/spo.rs b/indexer-api/src/domain/spo.rs new file mode 100644 index 00000000..e6a5da4a --- /dev/null +++ b/indexer-api/src/domain/spo.rs @@ -0,0 +1,139 @@ +// This file is part of midnight-indexer. +// Copyright (C) 2025 Midnight Foundation +// SPDX-License-Identifier: Apache-2.0 +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// SPO identity information. +#[derive(Debug, Clone)] +pub struct SpoIdentity { + pub pool_id_hex: String, + pub mainchain_pubkey_hex: String, + pub sidechain_pubkey_hex: String, + pub aura_pubkey_hex: Option, + pub validator_class: String, +} + +/// Pool metadata from Cardano. +#[derive(Debug, Clone)] +pub struct PoolMetadata { + pub pool_id_hex: String, + pub hex_id: Option, + pub name: Option, + pub ticker: Option, + pub homepage_url: Option, + pub logo_url: Option, +} + +/// SPO with optional metadata. +#[derive(Debug, Clone)] +pub struct Spo { + pub pool_id_hex: String, + pub validator_class: String, + pub sidechain_pubkey_hex: String, + pub aura_pubkey_hex: Option, + pub name: Option, + pub ticker: Option, + pub homepage_url: Option, + pub logo_url: Option, +} + +/// Composite SPO data (identity + metadata + performance). +#[derive(Debug, Clone)] +pub struct SpoComposite { + pub identity: Option, + pub metadata: Option, + pub performance: Vec, +} + +/// SPO performance for an epoch. +#[derive(Debug, Clone)] +pub struct EpochPerf { + pub epoch_no: i64, + pub spo_sk_hex: String, + pub produced: i64, + pub expected: i64, + pub identity_label: Option, + pub stake_snapshot: Option, + pub pool_id_hex: Option, + pub validator_class: Option, +} + +/// Current epoch information. +#[derive(Debug, Clone)] +pub struct EpochInfo { + pub epoch_no: i64, + pub duration_seconds: i64, + pub elapsed_seconds: i64, +} + +/// Committee member for an epoch. +#[derive(Debug, Clone)] +pub struct CommitteeMember { + pub epoch_no: i64, + pub position: i32, + pub sidechain_pubkey_hex: String, + pub expected_slots: i32, + pub aura_pubkey_hex: Option, + pub pool_id_hex: Option, + pub spo_sk_hex: Option, +} + +/// Registration statistics for an epoch. +#[derive(Debug, Clone)] +pub struct RegisteredStat { + pub epoch_no: i64, + pub federated_valid_count: i64, + pub federated_invalid_count: i64, + pub registered_valid_count: i64, + pub registered_invalid_count: i64, + pub dparam: Option, +} + +/// Cumulative registration totals for an epoch. +#[derive(Debug, Clone)] +pub struct RegisteredTotals { + pub epoch_no: i64, + pub total_registered: i64, + pub newly_registered: i64, +} + +/// Presence event for an SPO in an epoch. +#[derive(Debug, Clone)] +pub struct PresenceEvent { + pub epoch_no: i64, + pub id_key: String, + pub source: String, + pub status: Option, +} + +/// First valid epoch for an SPO identity. +#[derive(Debug, Clone)] +pub struct FirstValidEpoch { + pub id_key: String, + pub first_valid_epoch: i64, +} + +/// Stake share information for an SPO. +#[derive(Debug, Clone)] +pub struct StakeShare { + pub pool_id_hex: String, + pub name: Option, + pub ticker: Option, + pub homepage_url: Option, + pub logo_url: Option, + pub live_stake: Option, + pub active_stake: Option, + pub live_delegators: Option, + pub live_saturation: Option, + pub declared_pledge: Option, + pub live_pledge: Option, + pub stake_share: Option, +} diff --git a/indexer-api/src/domain/storage.rs b/indexer-api/src/domain/storage.rs index 83780676..8f0978cc 100644 --- a/indexer-api/src/domain/storage.rs +++ b/indexer-api/src/domain/storage.rs @@ -18,6 +18,7 @@ pub mod contract_action; pub mod dust; pub mod ledger_events; pub mod ledger_state; +pub mod spo; pub mod system_parameters; pub mod transaction; pub mod unshielded; @@ -25,7 +26,7 @@ pub mod wallet; use crate::domain::storage::{ block::BlockStorage, contract_action::ContractActionStorage, dust::DustStorage, - ledger_events::LedgerEventStorage, ledger_state::LedgerStateStorage, + ledger_events::LedgerEventStorage, ledger_state::LedgerStateStorage, spo::SpoStorage, system_parameters::SystemParametersStorage, transaction::TransactionStorage, unshielded::UnshieldedUtxoStorage, wallet::WalletStorage, }; @@ -39,6 +40,7 @@ where + DustStorage + LedgerEventStorage + LedgerStateStorage + + SpoStorage + SystemParametersStorage + TransactionStorage + UnshieldedUtxoStorage diff --git a/indexer-api/src/domain/storage/spo.rs b/indexer-api/src/domain/storage/spo.rs new file mode 100644 index 00000000..d7c574e4 --- /dev/null +++ b/indexer-api/src/domain/storage/spo.rs @@ -0,0 +1,283 @@ +// This file is part of midnight-indexer. +// Copyright (C) 2025 Midnight Foundation +// SPDX-License-Identifier: Apache-2.0 +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::domain::{ + spo::{ + CommitteeMember, EpochInfo, EpochPerf, FirstValidEpoch, PoolMetadata, PresenceEvent, + RegisteredStat, RegisteredTotals, Spo, SpoComposite, SpoIdentity, StakeShare, + }, + storage::NoopStorage, +}; + +/// Storage abstraction for SPO data. +#[trait_variant::make(Send)] +pub trait SpoStorage +where + Self: Clone + Send + Sync + 'static, +{ + /// Get SPO identities with pagination. + async fn get_spo_identities( + &self, + limit: i64, + offset: i64, + ) -> Result, sqlx::Error>; + + /// Get SPO identity by pool ID. + async fn get_spo_identity_by_pool_id( + &self, + pool_id: &str, + ) -> Result, sqlx::Error>; + + /// Get total count of SPOs. + async fn get_spo_count(&self) -> Result; + + /// Get pool metadata by pool ID. + async fn get_pool_metadata(&self, pool_id: &str) -> Result, sqlx::Error>; + + /// Get pool metadata list with pagination. + async fn get_pool_metadata_list( + &self, + limit: i64, + offset: i64, + with_name_only: bool, + ) -> Result, sqlx::Error>; + + /// Get SPO with metadata by pool ID. + async fn get_spo_by_pool_id(&self, pool_id: &str) -> Result, sqlx::Error>; + + /// Get SPO list with optional search. + async fn get_spo_list( + &self, + limit: i64, + offset: i64, + search: Option<&str>, + ) -> Result, sqlx::Error>; + + /// Get composite SPO data (identity + metadata + performance). + async fn get_spo_composite_by_pool_id( + &self, + pool_id: &str, + perf_limit: i64, + ) -> Result, sqlx::Error>; + + /// Get SPO identifiers ordered by performance. + async fn get_stake_pool_operator_ids(&self, limit: i64) -> Result, sqlx::Error>; + + /// Get latest SPO performance entries. + async fn get_spo_performance_latest( + &self, + limit: i64, + offset: i64, + ) -> Result, sqlx::Error>; + + /// Get SPO performance by SPO key. + async fn get_spo_performance_by_spo_sk( + &self, + spo_sk: &str, + limit: i64, + offset: i64, + ) -> Result, sqlx::Error>; + + /// Get epoch performance for all SPOs. + async fn get_epoch_performance( + &self, + epoch: i64, + limit: i64, + offset: i64, + ) -> Result, sqlx::Error>; + + /// Get current epoch information. + async fn get_current_epoch_info(&self) -> Result, sqlx::Error>; + + /// Get epoch utilization (produced/expected ratio). + async fn get_epoch_utilization(&self, epoch: i64) -> Result, sqlx::Error>; + + /// Get committee membership for an epoch. + async fn get_committee(&self, epoch: i64) -> Result, sqlx::Error>; + + /// Get cumulative registration totals for an epoch range. + async fn get_registered_totals_series( + &self, + from_epoch: i64, + to_epoch: i64, + ) -> Result, sqlx::Error>; + + /// Get registration statistics for an epoch range. + async fn get_registered_spo_series( + &self, + from_epoch: i64, + to_epoch: i64, + ) -> Result, sqlx::Error>; + + /// Get raw presence events for an epoch range. + async fn get_registered_presence( + &self, + from_epoch: i64, + to_epoch: i64, + ) -> Result, sqlx::Error>; + + /// Get first valid epoch for each SPO identity. + async fn get_registered_first_valid_epochs( + &self, + upto_epoch: Option, + ) -> Result, sqlx::Error>; + + /// Get stake distribution with search and ordering. + /// Returns (stake_shares, total_live_stake). + async fn get_stake_distribution( + &self, + limit: i64, + offset: i64, + search: Option<&str>, + order_desc: bool, + ) -> Result<(Vec, f64), sqlx::Error>; +} + +#[allow(unused_variables)] +impl SpoStorage for NoopStorage { + async fn get_spo_identities( + &self, + limit: i64, + offset: i64, + ) -> Result, sqlx::Error> { + unimplemented!() + } + + async fn get_spo_identity_by_pool_id( + &self, + pool_id: &str, + ) -> Result, sqlx::Error> { + unimplemented!() + } + + async fn get_spo_count(&self) -> Result { + unimplemented!() + } + + async fn get_pool_metadata(&self, pool_id: &str) -> Result, sqlx::Error> { + unimplemented!() + } + + async fn get_pool_metadata_list( + &self, + limit: i64, + offset: i64, + with_name_only: bool, + ) -> Result, sqlx::Error> { + unimplemented!() + } + + async fn get_spo_by_pool_id(&self, pool_id: &str) -> Result, sqlx::Error> { + unimplemented!() + } + + async fn get_spo_list( + &self, + limit: i64, + offset: i64, + search: Option<&str>, + ) -> Result, sqlx::Error> { + unimplemented!() + } + + async fn get_spo_composite_by_pool_id( + &self, + pool_id: &str, + perf_limit: i64, + ) -> Result, sqlx::Error> { + unimplemented!() + } + + async fn get_stake_pool_operator_ids(&self, limit: i64) -> Result, sqlx::Error> { + unimplemented!() + } + + async fn get_spo_performance_latest( + &self, + limit: i64, + offset: i64, + ) -> Result, sqlx::Error> { + unimplemented!() + } + + async fn get_spo_performance_by_spo_sk( + &self, + spo_sk: &str, + limit: i64, + offset: i64, + ) -> Result, sqlx::Error> { + unimplemented!() + } + + async fn get_epoch_performance( + &self, + epoch: i64, + limit: i64, + offset: i64, + ) -> Result, sqlx::Error> { + unimplemented!() + } + + async fn get_current_epoch_info(&self) -> Result, sqlx::Error> { + unimplemented!() + } + + async fn get_epoch_utilization(&self, epoch: i64) -> Result, sqlx::Error> { + unimplemented!() + } + + async fn get_committee(&self, epoch: i64) -> Result, sqlx::Error> { + unimplemented!() + } + + async fn get_registered_totals_series( + &self, + from_epoch: i64, + to_epoch: i64, + ) -> Result, sqlx::Error> { + unimplemented!() + } + + async fn get_registered_spo_series( + &self, + from_epoch: i64, + to_epoch: i64, + ) -> Result, sqlx::Error> { + unimplemented!() + } + + async fn get_registered_presence( + &self, + from_epoch: i64, + to_epoch: i64, + ) -> Result, sqlx::Error> { + unimplemented!() + } + + async fn get_registered_first_valid_epochs( + &self, + upto_epoch: Option, + ) -> Result, sqlx::Error> { + unimplemented!() + } + + async fn get_stake_distribution( + &self, + limit: i64, + offset: i64, + search: Option<&str>, + order_desc: bool, + ) -> Result<(Vec, f64), sqlx::Error> { + unimplemented!() + } +} diff --git a/indexer-api/src/infra/api/v3.rs b/indexer-api/src/infra/api/v3.rs index 3782266a..84e36f4b 100644 --- a/indexer-api/src/infra/api/v3.rs +++ b/indexer-api/src/infra/api/v3.rs @@ -17,6 +17,7 @@ pub mod dust; pub mod ledger_events; pub mod mutation; pub mod query; +pub mod spo; pub mod subscription; pub mod system_parameters; pub mod transaction; diff --git a/indexer-api/src/infra/api/v3/query.rs b/indexer-api/src/infra/api/v3/query.rs index f7da9188..b9d1128a 100644 --- a/indexer-api/src/infra/api/v3/query.rs +++ b/indexer-api/src/infra/api/v3/query.rs @@ -20,6 +20,11 @@ use crate::{ block::{Block, BlockOffset}, contract_action::{ContractAction, ContractActionOffset}, dust::DustGenerationStatus, + spo::{ + CommitteeMember, EpochInfo, EpochPerf, FirstValidEpoch, PoolMetadata, + PresenceEvent, RegisteredStat, RegisteredTotals, Spo, SpoComposite, SpoIdentity, + StakeShare, + }, system_parameters::{DParameterChange, TermsAndConditionsChange}, transaction::{Transaction, TransactionOffset}, }, @@ -29,6 +34,8 @@ use async_graphql::{Context, Object}; use fastrace::trace; use std::marker::PhantomData; +const DEFAULT_PERFORMANCE_LIMIT: i64 = 20; + /// GraphQL queries. pub struct Query { _s: PhantomData, @@ -268,4 +275,394 @@ where .map(TermsAndConditionsChange::from) .collect()) } + + /// List SPO identities with pagination. + #[trace] + async fn spo_identities( + &self, + cx: &Context<'_>, + limit: Option, + offset: Option, + ) -> ApiResult> { + let storage = cx.get_storage::(); + let limit = limit.unwrap_or(50).clamp(1, 500) as i64; + let offset = offset.unwrap_or(0).max(0) as i64; + + let identities = storage + .get_spo_identities(limit, offset) + .await + .map_err_into_server_error(|| "get SPO identities")?; + + Ok(identities.into_iter().map(Into::into).collect()) + } + + /// Get SPO identity by pool ID. + #[trace] + async fn spo_identity_by_pool_id( + &self, + cx: &Context<'_>, + pool_id_hex: String, + ) -> ApiResult> { + let pool_id = normalize_hex(&pool_id_hex); + let storage = cx.get_storage::(); + + let identity = storage + .get_spo_identity_by_pool_id(&pool_id) + .await + .map_err_into_server_error(|| "get SPO identity by pool ID")?; + + Ok(identity.map(Into::into)) + } + + /// Get total count of SPOs. + #[trace] + async fn spo_count(&self, cx: &Context<'_>) -> ApiResult> { + let storage = cx.get_storage::(); + + let count = storage + .get_spo_count() + .await + .map_err_into_server_error(|| "get SPO count")?; + + Ok(Some(count)) + } + + /// Get pool metadata by pool ID. + #[trace] + async fn pool_metadata( + &self, + cx: &Context<'_>, + pool_id_hex: String, + ) -> ApiResult> { + let pool_id = normalize_hex(&pool_id_hex); + let storage = cx.get_storage::(); + + let metadata = storage + .get_pool_metadata(&pool_id) + .await + .map_err_into_server_error(|| "get pool metadata")?; + + Ok(metadata.map(Into::into)) + } + + /// List pool metadata with pagination. + #[trace] + async fn pool_metadata_list( + &self, + cx: &Context<'_>, + limit: Option, + offset: Option, + with_name_only: Option, + ) -> ApiResult> { + let storage = cx.get_storage::(); + let limit = limit.unwrap_or(50).clamp(1, 500) as i64; + let offset = offset.unwrap_or(0).max(0) as i64; + let with_name_only = with_name_only.unwrap_or(false); + + let metadata = storage + .get_pool_metadata_list(limit, offset, with_name_only) + .await + .map_err_into_server_error(|| "get pool metadata list")?; + + Ok(metadata.into_iter().map(Into::into).collect()) + } + + /// Get SPO with metadata by pool ID. + #[trace] + async fn spo_by_pool_id( + &self, + cx: &Context<'_>, + pool_id_hex: String, + ) -> ApiResult> { + let pool_id = normalize_hex(&pool_id_hex); + let storage = cx.get_storage::(); + + let spo = storage + .get_spo_by_pool_id(&pool_id) + .await + .map_err_into_server_error(|| "get SPO by pool ID")?; + + Ok(spo.map(Into::into)) + } + + /// List SPOs with optional search. + #[trace] + async fn spo_list( + &self, + cx: &Context<'_>, + limit: Option, + offset: Option, + search: Option, + ) -> ApiResult> { + let storage = cx.get_storage::(); + let limit = limit.unwrap_or(20).clamp(1, 200) as i64; + let offset = offset.unwrap_or(0).max(0) as i64; + let search_ref = search.as_deref().and_then(|s| { + let trimmed = s.trim(); + if trimmed.is_empty() { + None + } else { + Some(trimmed) + } + }); + + let spos = storage + .get_spo_list(limit, offset, search_ref) + .await + .map_err_into_server_error(|| "get SPO list")?; + + Ok(spos.into_iter().map(Into::into).collect()) + } + + /// Get composite SPO data (identity + metadata + performance). + #[trace] + async fn spo_composite_by_pool_id( + &self, + cx: &Context<'_>, + pool_id_hex: String, + ) -> ApiResult> { + let pool_id = normalize_hex(&pool_id_hex); + let storage = cx.get_storage::(); + + let composite = storage + .get_spo_composite_by_pool_id(&pool_id, DEFAULT_PERFORMANCE_LIMIT) + .await + .map_err_into_server_error(|| "get SPO composite by pool ID")?; + + Ok(composite.map(Into::into)) + } + + /// Get SPO identifiers ordered by performance. + #[trace] + async fn stake_pool_operators( + &self, + cx: &Context<'_>, + limit: Option, + ) -> ApiResult> { + let storage = cx.get_storage::(); + let limit = limit.unwrap_or(20).clamp(1, 100) as i64; + + let ids = storage + .get_stake_pool_operator_ids(limit) + .await + .map_err_into_server_error(|| "get stake pool operators")?; + + Ok(ids) + } + + /// Get latest SPO performance entries. + #[trace] + async fn spo_performance_latest( + &self, + cx: &Context<'_>, + limit: Option, + offset: Option, + ) -> ApiResult> { + let storage = cx.get_storage::(); + let limit = limit + .unwrap_or(DEFAULT_PERFORMANCE_LIMIT as i32) + .clamp(1, 500) as i64; + let offset = offset.unwrap_or(0).max(0) as i64; + + let perfs = storage + .get_spo_performance_latest(limit, offset) + .await + .map_err_into_server_error(|| "get SPO performance latest")?; + + Ok(perfs.into_iter().map(Into::into).collect()) + } + + /// Get SPO performance by SPO key. + #[trace] + async fn spo_performance_by_spo_sk( + &self, + cx: &Context<'_>, + spo_sk_hex: String, + limit: Option, + offset: Option, + ) -> ApiResult> { + let spo_sk = normalize_hex(&spo_sk_hex); + let storage = cx.get_storage::(); + let limit = limit.unwrap_or(100).clamp(1, 500) as i64; + let offset = offset.unwrap_or(0).max(0) as i64; + + let perfs = storage + .get_spo_performance_by_spo_sk(&spo_sk, limit, offset) + .await + .map_err_into_server_error(|| "get SPO performance by SPO key")?; + + Ok(perfs.into_iter().map(Into::into).collect()) + } + + /// Get epoch performance for all SPOs. + #[trace] + async fn epoch_performance( + &self, + cx: &Context<'_>, + epoch: i64, + limit: Option, + offset: Option, + ) -> ApiResult> { + let storage = cx.get_storage::(); + let limit = limit.unwrap_or(100).clamp(1, 500) as i64; + let offset = offset.unwrap_or(0).max(0) as i64; + + let perfs = storage + .get_epoch_performance(epoch, limit, offset) + .await + .map_err_into_server_error(|| "get epoch performance")?; + + Ok(perfs.into_iter().map(Into::into).collect()) + } + + /// Get current epoch information. + #[trace] + async fn current_epoch_info(&self, cx: &Context<'_>) -> ApiResult> { + let storage = cx.get_storage::(); + + let info = storage + .get_current_epoch_info() + .await + .map_err_into_server_error(|| "get current epoch info")?; + + Ok(info.map(Into::into)) + } + + /// Get epoch utilization (produced/expected ratio). + #[trace] + async fn epoch_utilization(&self, cx: &Context<'_>, epoch: i32) -> ApiResult> { + let storage = cx.get_storage::(); + + let utilization = storage + .get_epoch_utilization(epoch as i64) + .await + .map_err_into_server_error(|| "get epoch utilization")?; + + Ok(utilization) + } + + /// Get committee membership for an epoch. + #[trace] + async fn committee(&self, cx: &Context<'_>, epoch: i64) -> ApiResult> { + let storage = cx.get_storage::(); + + let members = storage + .get_committee(epoch) + .await + .map_err_into_server_error(|| "get committee")?; + + Ok(members.into_iter().map(Into::into).collect()) + } + + /// Get cumulative registration totals for an epoch range. + #[trace] + async fn registered_totals_series( + &self, + cx: &Context<'_>, + from_epoch: i64, + to_epoch: i64, + ) -> ApiResult> { + let storage = cx.get_storage::(); + + let totals = storage + .get_registered_totals_series(from_epoch, to_epoch) + .await + .map_err_into_server_error(|| "get registered totals series")?; + + Ok(totals.into_iter().map(Into::into).collect()) + } + + /// Get registration statistics for an epoch range. + #[trace] + async fn registered_spo_series( + &self, + cx: &Context<'_>, + from_epoch: i64, + to_epoch: i64, + ) -> ApiResult> { + let storage = cx.get_storage::(); + + let stats = storage + .get_registered_spo_series(from_epoch, to_epoch) + .await + .map_err_into_server_error(|| "get registered SPO series")?; + + Ok(stats.into_iter().map(Into::into).collect()) + } + + /// Get raw presence events for an epoch range. + #[trace] + async fn registered_presence( + &self, + cx: &Context<'_>, + from_epoch: i64, + to_epoch: i64, + ) -> ApiResult> { + let storage = cx.get_storage::(); + + let events = storage + .get_registered_presence(from_epoch, to_epoch) + .await + .map_err_into_server_error(|| "get registered presence")?; + + Ok(events.into_iter().map(Into::into).collect()) + } + + /// Get first valid epoch for each SPO identity. + #[trace] + async fn registered_first_valid_epochs( + &self, + cx: &Context<'_>, + upto_epoch: Option, + ) -> ApiResult> { + let storage = cx.get_storage::(); + + let epochs = storage + .get_registered_first_valid_epochs(upto_epoch) + .await + .map_err_into_server_error(|| "get registered first valid epochs")?; + + Ok(epochs.into_iter().map(Into::into).collect()) + } + + /// Get stake distribution with search and ordering. + #[trace] + async fn stake_distribution( + &self, + cx: &Context<'_>, + limit: Option, + offset: Option, + search: Option, + order_by_stake_desc: Option, + ) -> ApiResult> { + let storage = cx.get_storage::(); + let limit = limit.unwrap_or(50).clamp(1, 500) as i64; + let offset = offset.unwrap_or(0).max(0) as i64; + let search_ref = search.as_deref().and_then(|s| { + let trimmed = s.trim(); + if trimmed.is_empty() { + None + } else { + Some(trimmed) + } + }); + let order_desc = order_by_stake_desc.unwrap_or(true); + + let (shares, _total) = storage + .get_stake_distribution(limit, offset, search_ref, order_desc) + .await + .map_err_into_server_error(|| "get stake distribution")?; + + Ok(shares.into_iter().map(Into::into).collect()) + } +} + +/// Normalize hex string by stripping 0x prefix and lowercasing. +fn normalize_hex(input: &str) -> String { + let s = input + .strip_prefix("0x") + .unwrap_or(input) + .strip_prefix("0X") + .unwrap_or(input); + s.to_ascii_lowercase() } diff --git a/indexer-api/src/infra/api/v3/spo.rs b/indexer-api/src/infra/api/v3/spo.rs new file mode 100644 index 00000000..fe0e9284 --- /dev/null +++ b/indexer-api/src/infra/api/v3/spo.rs @@ -0,0 +1,312 @@ +// This file is part of midnight-indexer. +// Copyright (C) 2025 Midnight Foundation +// SPDX-License-Identifier: Apache-2.0 +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::domain::spo::{ + CommitteeMember as DomainCommitteeMember, EpochInfo as DomainEpochInfo, + EpochPerf as DomainEpochPerf, FirstValidEpoch as DomainFirstValidEpoch, + PoolMetadata as DomainPoolMetadata, PresenceEvent as DomainPresenceEvent, + RegisteredStat as DomainRegisteredStat, RegisteredTotals as DomainRegisteredTotals, + Spo as DomainSpo, SpoComposite as DomainSpoComposite, SpoIdentity as DomainSpoIdentity, + StakeShare as DomainStakeShare, +}; +use async_graphql::SimpleObject; + +/// SPO identity information. +#[derive(SimpleObject)] +#[graphql(rename_fields = "camelCase")] +pub struct SpoIdentity { + pub pool_id_hex: String, + pub mainchain_pubkey_hex: String, + pub sidechain_pubkey_hex: String, + pub aura_pubkey_hex: Option, + pub validator_class: String, +} + +impl From for SpoIdentity { + fn from(d: DomainSpoIdentity) -> Self { + Self { + pool_id_hex: d.pool_id_hex, + mainchain_pubkey_hex: d.mainchain_pubkey_hex, + sidechain_pubkey_hex: d.sidechain_pubkey_hex, + aura_pubkey_hex: d.aura_pubkey_hex, + validator_class: d.validator_class, + } + } +} + +/// Pool metadata from Cardano. +#[derive(SimpleObject)] +#[graphql(rename_fields = "camelCase")] +pub struct PoolMetadata { + pub pool_id_hex: String, + pub hex_id: Option, + pub name: Option, + pub ticker: Option, + pub homepage_url: Option, + pub logo_url: Option, +} + +impl From for PoolMetadata { + fn from(d: DomainPoolMetadata) -> Self { + Self { + pool_id_hex: d.pool_id_hex, + hex_id: d.hex_id, + name: d.name, + ticker: d.ticker, + homepage_url: d.homepage_url, + logo_url: d.logo_url, + } + } +} + +/// SPO with optional metadata. +#[derive(SimpleObject)] +#[graphql(rename_fields = "camelCase")] +pub struct Spo { + pub pool_id_hex: String, + pub validator_class: String, + pub sidechain_pubkey_hex: String, + pub aura_pubkey_hex: Option, + pub name: Option, + pub ticker: Option, + pub homepage_url: Option, + pub logo_url: Option, +} + +impl From for Spo { + fn from(d: DomainSpo) -> Self { + Self { + pool_id_hex: d.pool_id_hex, + validator_class: d.validator_class, + sidechain_pubkey_hex: d.sidechain_pubkey_hex, + aura_pubkey_hex: d.aura_pubkey_hex, + name: d.name, + ticker: d.ticker, + homepage_url: d.homepage_url, + logo_url: d.logo_url, + } + } +} + +/// Composite SPO data (identity + metadata + performance). +#[derive(SimpleObject)] +#[graphql(rename_fields = "camelCase")] +pub struct SpoComposite { + pub identity: Option, + pub metadata: Option, + pub performance: Vec, +} + +impl From for SpoComposite { + fn from(d: DomainSpoComposite) -> Self { + Self { + identity: d.identity.map(Into::into), + metadata: d.metadata.map(Into::into), + performance: d.performance.into_iter().map(Into::into).collect(), + } + } +} + +/// SPO performance for an epoch. +#[derive(SimpleObject)] +#[graphql(rename_fields = "camelCase")] +pub struct EpochPerf { + pub epoch_no: i64, + pub spo_sk_hex: String, + pub produced: i64, + pub expected: i64, + pub identity_label: Option, + pub stake_snapshot: Option, + pub pool_id_hex: Option, + pub validator_class: Option, +} + +impl From for EpochPerf { + fn from(d: DomainEpochPerf) -> Self { + Self { + epoch_no: d.epoch_no, + spo_sk_hex: d.spo_sk_hex, + produced: d.produced, + expected: d.expected, + identity_label: d.identity_label, + stake_snapshot: d.stake_snapshot, + pool_id_hex: d.pool_id_hex, + validator_class: d.validator_class, + } + } +} + +/// Current epoch information. +#[derive(SimpleObject)] +#[graphql(rename_fields = "camelCase")] +pub struct EpochInfo { + pub epoch_no: i64, + pub duration_seconds: i64, + pub elapsed_seconds: i64, +} + +impl From for EpochInfo { + fn from(d: DomainEpochInfo) -> Self { + Self { + epoch_no: d.epoch_no, + duration_seconds: d.duration_seconds, + elapsed_seconds: d.elapsed_seconds, + } + } +} + +/// Committee member for an epoch. +#[derive(SimpleObject)] +#[graphql(rename_fields = "camelCase")] +pub struct CommitteeMember { + pub epoch_no: i64, + pub position: i32, + pub sidechain_pubkey_hex: String, + pub expected_slots: i32, + pub aura_pubkey_hex: Option, + pub pool_id_hex: Option, + pub spo_sk_hex: Option, +} + +impl From for CommitteeMember { + fn from(d: DomainCommitteeMember) -> Self { + Self { + epoch_no: d.epoch_no, + position: d.position, + sidechain_pubkey_hex: d.sidechain_pubkey_hex, + expected_slots: d.expected_slots, + aura_pubkey_hex: d.aura_pubkey_hex, + pool_id_hex: d.pool_id_hex, + spo_sk_hex: d.spo_sk_hex, + } + } +} + +/// Registration statistics for an epoch. +#[derive(SimpleObject)] +#[graphql(rename_fields = "camelCase")] +pub struct RegisteredStat { + pub epoch_no: i64, + pub federated_valid_count: i64, + pub federated_invalid_count: i64, + pub registered_valid_count: i64, + pub registered_invalid_count: i64, + pub dparam: Option, +} + +impl From for RegisteredStat { + fn from(d: DomainRegisteredStat) -> Self { + Self { + epoch_no: d.epoch_no, + federated_valid_count: d.federated_valid_count, + federated_invalid_count: d.federated_invalid_count, + registered_valid_count: d.registered_valid_count, + registered_invalid_count: d.registered_invalid_count, + dparam: d.dparam, + } + } +} + +/// Cumulative registration totals for an epoch. +#[derive(SimpleObject)] +#[graphql(rename_fields = "camelCase")] +pub struct RegisteredTotals { + pub epoch_no: i64, + pub total_registered: i64, + pub newly_registered: i64, +} + +impl From for RegisteredTotals { + fn from(d: DomainRegisteredTotals) -> Self { + Self { + epoch_no: d.epoch_no, + total_registered: d.total_registered, + newly_registered: d.newly_registered, + } + } +} + +/// Presence event for an SPO in an epoch. +#[derive(SimpleObject)] +#[graphql(rename_fields = "camelCase")] +pub struct PresenceEvent { + pub epoch_no: i64, + pub id_key: String, + pub source: String, + pub status: Option, +} + +impl From for PresenceEvent { + fn from(d: DomainPresenceEvent) -> Self { + Self { + epoch_no: d.epoch_no, + id_key: d.id_key, + source: d.source, + status: d.status, + } + } +} + +/// First valid epoch for an SPO identity. +#[derive(SimpleObject)] +#[graphql(rename_fields = "camelCase")] +pub struct FirstValidEpoch { + pub id_key: String, + pub first_valid_epoch: i64, +} + +impl From for FirstValidEpoch { + fn from(d: DomainFirstValidEpoch) -> Self { + Self { + id_key: d.id_key, + first_valid_epoch: d.first_valid_epoch, + } + } +} + +/// Stake share information for an SPO. +#[derive(SimpleObject)] +#[graphql(rename_fields = "camelCase")] +pub struct StakeShare { + pub pool_id_hex: String, + pub name: Option, + pub ticker: Option, + pub homepage_url: Option, + pub logo_url: Option, + pub live_stake: Option, + pub active_stake: Option, + pub live_delegators: Option, + pub live_saturation: Option, + pub declared_pledge: Option, + pub live_pledge: Option, + pub stake_share: Option, +} + +impl From for StakeShare { + fn from(d: DomainStakeShare) -> Self { + Self { + pool_id_hex: d.pool_id_hex, + name: d.name, + ticker: d.ticker, + homepage_url: d.homepage_url, + logo_url: d.logo_url, + live_stake: d.live_stake, + active_stake: d.active_stake, + live_delegators: d.live_delegators, + live_saturation: d.live_saturation, + declared_pledge: d.declared_pledge, + live_pledge: d.live_pledge, + stake_share: d.stake_share, + } + } +} diff --git a/indexer-api/src/infra/storage.rs b/indexer-api/src/infra/storage.rs index eb169f0f..81132bc6 100644 --- a/indexer-api/src/infra/storage.rs +++ b/indexer-api/src/infra/storage.rs @@ -16,6 +16,7 @@ mod contract_action; mod dust; mod ledger_events; mod ledger_state; +mod spo; mod system_parameters; mod transaction; mod unshielded; diff --git a/indexer-api/src/infra/storage/spo.rs b/indexer-api/src/infra/storage/spo.rs new file mode 100644 index 00000000..f6477d7e --- /dev/null +++ b/indexer-api/src/infra/storage/spo.rs @@ -0,0 +1,1133 @@ +// This file is part of midnight-indexer. +// Copyright (C) 2025 Midnight Foundation +// SPDX-License-Identifier: Apache-2.0 +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::{ + domain::{ + spo::{ + CommitteeMember, EpochInfo, EpochPerf, FirstValidEpoch, PoolMetadata, PresenceEvent, + RegisteredStat, RegisteredTotals, Spo, SpoComposite, SpoIdentity, StakeShare, + }, + storage::spo::SpoStorage, + }, + infra::storage::Storage, +}; +use fastrace::trace; +use indoc::indoc; + +impl SpoStorage for Storage { + #[trace] + async fn get_spo_identities( + &self, + limit: i64, + offset: i64, + ) -> Result, sqlx::Error> { + let query = indoc! {" + SELECT pool_id AS pool_id_hex, + mainchain_pubkey AS mainchain_pubkey_hex, + sidechain_pubkey AS sidechain_pubkey_hex, + aura_pubkey AS aura_pubkey_hex, + 'UNKNOWN' AS validator_class + FROM spo_identity + WHERE pool_id IS NOT NULL + ORDER BY mainchain_pubkey + LIMIT $1 OFFSET $2 + "}; + + sqlx::query_as::<_, (String, String, String, Option, String)>(query) + .bind(limit) + .bind(offset) + .fetch_all(&*self.pool) + .await + .map(|rows| { + rows.into_iter() + .map( + |( + pool_id_hex, + mainchain_pubkey_hex, + sidechain_pubkey_hex, + aura_pubkey_hex, + validator_class, + )| SpoIdentity { + pool_id_hex, + mainchain_pubkey_hex, + sidechain_pubkey_hex, + aura_pubkey_hex, + validator_class, + }, + ) + .collect() + }) + } + + #[trace] + async fn get_spo_identity_by_pool_id( + &self, + pool_id: &str, + ) -> Result, sqlx::Error> { + let query = indoc! {" + SELECT pool_id AS pool_id_hex, + mainchain_pubkey AS mainchain_pubkey_hex, + sidechain_pubkey AS sidechain_pubkey_hex, + aura_pubkey AS aura_pubkey_hex, + 'UNKNOWN' AS validator_class + FROM spo_identity + WHERE pool_id = $1 + LIMIT 1 + "}; + + sqlx::query_as::<_, (String, String, String, Option, String)>(query) + .bind(pool_id) + .fetch_optional(&*self.pool) + .await + .map(|opt| { + opt.map( + |( + pool_id_hex, + mainchain_pubkey_hex, + sidechain_pubkey_hex, + aura_pubkey_hex, + validator_class, + )| SpoIdentity { + pool_id_hex, + mainchain_pubkey_hex, + sidechain_pubkey_hex, + aura_pubkey_hex, + validator_class, + }, + ) + }) + } + + #[trace] + async fn get_spo_count(&self) -> Result { + let query = indoc! {" + SELECT COUNT(1)::BIGINT FROM spo_stake_snapshot + "}; + + sqlx::query_scalar::<_, i64>(query) + .fetch_one(&*self.pool) + .await + } + + #[trace] + async fn get_pool_metadata(&self, pool_id: &str) -> Result, sqlx::Error> { + let query = indoc! {" + SELECT pool_id AS pool_id_hex, + hex_id AS hex_id, + name, ticker, homepage_url, url AS logo_url + FROM pool_metadata_cache + WHERE pool_id = $1 + LIMIT 1 + "}; + + sqlx::query_as::< + _, + ( + String, + Option, + Option, + Option, + Option, + Option, + ), + >(query) + .bind(pool_id) + .fetch_optional(&*self.pool) + .await + .map(|opt| { + opt.map( + |(pool_id_hex, hex_id, name, ticker, homepage_url, logo_url)| PoolMetadata { + pool_id_hex, + hex_id, + name, + ticker, + homepage_url, + logo_url, + }, + ) + }) + } + + #[trace] + async fn get_pool_metadata_list( + &self, + limit: i64, + offset: i64, + with_name_only: bool, + ) -> Result, sqlx::Error> { + let query = if with_name_only { + indoc! {" + SELECT pool_id AS pool_id_hex, + hex_id AS hex_id, + name, ticker, homepage_url, url AS logo_url + FROM pool_metadata_cache + WHERE name IS NOT NULL OR ticker IS NOT NULL + ORDER BY pool_id + LIMIT $1 OFFSET $2 + "} + } else { + indoc! {" + SELECT pool_id AS pool_id_hex, + hex_id AS hex_id, + name, ticker, homepage_url, url AS logo_url + FROM pool_metadata_cache + ORDER BY pool_id + LIMIT $1 OFFSET $2 + "} + }; + + sqlx::query_as::< + _, + ( + String, + Option, + Option, + Option, + Option, + Option, + ), + >(query) + .bind(limit) + .bind(offset) + .fetch_all(&*self.pool) + .await + .map(|rows| { + rows.into_iter() + .map( + |(pool_id_hex, hex_id, name, ticker, homepage_url, logo_url)| PoolMetadata { + pool_id_hex, + hex_id, + name, + ticker, + homepage_url, + logo_url, + }, + ) + .collect() + }) + } + + #[trace] + async fn get_spo_by_pool_id(&self, pool_id: &str) -> Result, sqlx::Error> { + let query = indoc! {" + SELECT si.pool_id AS pool_id_hex, + 'UNKNOWN' AS validator_class, + si.sidechain_pubkey AS sidechain_pubkey_hex, + si.aura_pubkey AS aura_pubkey_hex, + pm.name, pm.ticker, pm.homepage_url, pm.url AS logo_url + FROM spo_identity si + LEFT JOIN pool_metadata_cache pm ON pm.pool_id = si.pool_id + WHERE si.pool_id = $1 + LIMIT 1 + "}; + + sqlx::query_as::< + _, + ( + String, + String, + String, + Option, + Option, + Option, + Option, + Option, + ), + >(query) + .bind(pool_id) + .fetch_optional(&*self.pool) + .await + .map(|opt| { + opt.map( + |( + pool_id_hex, + validator_class, + sidechain_pubkey_hex, + aura_pubkey_hex, + name, + ticker, + homepage_url, + logo_url, + )| Spo { + pool_id_hex, + validator_class, + sidechain_pubkey_hex, + aura_pubkey_hex, + name, + ticker, + homepage_url, + logo_url, + }, + ) + }) + } + + #[trace] + async fn get_spo_list( + &self, + limit: i64, + offset: i64, + search: Option<&str>, + ) -> Result, sqlx::Error> { + let rows = if let Some(s) = search { + let s_like = format!("%{s}%"); + let s_hex = normalize_hex(s).unwrap_or_else(|| s.to_ascii_lowercase()); + let s_hex_like = format!("%{s_hex}%"); + + let query = indoc! {" + SELECT s.pool_id AS pool_id_hex, + 'UNKNOWN' AS validator_class, + si.sidechain_pubkey AS sidechain_pubkey_hex, + si.aura_pubkey AS aura_pubkey_hex, + pm.name, pm.ticker, pm.homepage_url, pm.url AS logo_url + FROM spo_stake_snapshot s + LEFT JOIN spo_identity si ON si.pool_id = s.pool_id + LEFT JOIN pool_metadata_cache pm ON pm.pool_id = s.pool_id + WHERE ( + pm.name ILIKE $3 OR pm.ticker ILIKE $3 OR pm.homepage_url ILIKE $3 OR s.pool_id ILIKE $4 + OR si.sidechain_pubkey ILIKE $4 OR si.aura_pubkey ILIKE $4 OR si.mainchain_pubkey ILIKE $4 + ) + ORDER BY COALESCE(si.mainchain_pubkey, s.pool_id) + LIMIT $1 OFFSET $2 + "}; + + sqlx::query_as::< + _, + ( + String, + String, + String, + Option, + Option, + Option, + Option, + Option, + ), + >(query) + .bind(limit) + .bind(offset) + .bind(s_like) + .bind(s_hex_like) + .fetch_all(&*self.pool) + .await? + } else { + let query = indoc! {" + SELECT s.pool_id AS pool_id_hex, + 'UNKNOWN' AS validator_class, + si.sidechain_pubkey AS sidechain_pubkey_hex, + si.aura_pubkey AS aura_pubkey_hex, + pm.name, pm.ticker, pm.homepage_url, pm.url AS logo_url + FROM spo_stake_snapshot s + LEFT JOIN spo_identity si ON si.pool_id = s.pool_id + LEFT JOIN pool_metadata_cache pm ON pm.pool_id = s.pool_id + ORDER BY COALESCE(si.mainchain_pubkey, s.pool_id) + LIMIT $1 OFFSET $2 + "}; + + sqlx::query_as::< + _, + ( + String, + String, + String, + Option, + Option, + Option, + Option, + Option, + ), + >(query) + .bind(limit) + .bind(offset) + .fetch_all(&*self.pool) + .await? + }; + + Ok(rows + .into_iter() + .map( + |( + pool_id_hex, + validator_class, + sidechain_pubkey_hex, + aura_pubkey_hex, + name, + ticker, + homepage_url, + logo_url, + )| Spo { + pool_id_hex, + validator_class, + sidechain_pubkey_hex, + aura_pubkey_hex, + name, + ticker, + homepage_url, + logo_url, + }, + ) + .collect()) + } + + #[trace] + async fn get_spo_composite_by_pool_id( + &self, + pool_id: &str, + perf_limit: i64, + ) -> Result, sqlx::Error> { + // Get identity. + let identity = self.get_spo_identity_by_pool_id(pool_id).await?; + + // Get metadata. + let metadata = self.get_pool_metadata(pool_id).await?; + + // Get performance if identity exists. + let performance = if let Some(ref id) = identity { + self.get_spo_performance_by_spo_sk(&id.sidechain_pubkey_hex, perf_limit, 0) + .await? + } else { + vec![] + }; + + // Return None only if both identity and metadata are missing. + if identity.is_none() && metadata.is_none() { + return Ok(None); + } + + Ok(Some(SpoComposite { + identity, + metadata, + performance, + })) + } + + #[trace] + async fn get_stake_pool_operator_ids(&self, limit: i64) -> Result, sqlx::Error> { + let query = indoc! {" + SELECT encode(sep.spo_sk,'hex') AS spo_sk_hex + FROM spo_epoch_performance sep + GROUP BY sep.spo_sk + ORDER BY MAX(sep.produced_blocks) DESC + LIMIT $1 + "}; + + sqlx::query_scalar::<_, String>(query) + .bind(limit) + .fetch_all(&*self.pool) + .await + } + + #[trace] + async fn get_spo_performance_latest( + &self, + limit: i64, + offset: i64, + ) -> Result, sqlx::Error> { + let query = indoc! {" + SELECT sep.epoch_no, + sep.spo_sk AS spo_sk_hex, + sep.produced_blocks, + sep.expected_blocks, + sep.identity_label, + NULL::TEXT AS stake_snapshot, + si.pool_id AS pool_id_hex, + 'UNKNOWN' AS validator_class + FROM spo_epoch_performance sep + LEFT JOIN spo_identity si ON si.spo_sk = sep.spo_sk + ORDER BY sep.epoch_no DESC, sep.produced_blocks DESC + LIMIT $1 OFFSET $2 + "}; + + sqlx::query_as::< + _, + ( + i64, + String, + i32, + i32, + Option, + Option, + Option, + Option, + ), + >(query) + .bind(limit) + .bind(offset) + .fetch_all(&*self.pool) + .await + .map(|rows| rows.into_iter().map(epoch_perf_from_row).collect()) + } + + #[trace] + async fn get_spo_performance_by_spo_sk( + &self, + spo_sk: &str, + limit: i64, + offset: i64, + ) -> Result, sqlx::Error> { + let query = indoc! {" + SELECT sep.epoch_no, + sep.spo_sk AS spo_sk_hex, + sep.produced_blocks, + sep.expected_blocks, + sep.identity_label, + NULL::TEXT AS stake_snapshot, + si.pool_id AS pool_id_hex, + 'UNKNOWN' AS validator_class + FROM spo_epoch_performance sep + LEFT JOIN spo_identity si ON si.spo_sk = sep.spo_sk + WHERE sep.spo_sk = $1 + ORDER BY sep.epoch_no DESC + LIMIT $2 OFFSET $3 + "}; + + sqlx::query_as::< + _, + ( + i64, + String, + i32, + i32, + Option, + Option, + Option, + Option, + ), + >(query) + .bind(spo_sk) + .bind(limit) + .bind(offset) + .fetch_all(&*self.pool) + .await + .map(|rows| rows.into_iter().map(epoch_perf_from_row).collect()) + } + + #[trace] + async fn get_epoch_performance( + &self, + epoch: i64, + limit: i64, + offset: i64, + ) -> Result, sqlx::Error> { + let query = indoc! {" + SELECT sep.epoch_no, + sep.spo_sk AS spo_sk_hex, + sep.produced_blocks, + sep.expected_blocks, + sep.identity_label, + NULL::TEXT AS stake_snapshot, + si.pool_id AS pool_id_hex, + 'UNKNOWN' AS validator_class + FROM spo_epoch_performance sep + LEFT JOIN spo_identity si ON si.spo_sk = sep.spo_sk + WHERE sep.epoch_no = $1 + ORDER BY sep.produced_blocks DESC + LIMIT $2 OFFSET $3 + "}; + + sqlx::query_as::< + _, + ( + i64, + String, + i32, + i32, + Option, + Option, + Option, + Option, + ), + >(query) + .bind(epoch) + .bind(limit) + .bind(offset) + .fetch_all(&*self.pool) + .await + .map(|rows| rows.into_iter().map(epoch_perf_from_row).collect()) + } + + #[trace] + async fn get_current_epoch_info(&self) -> Result, sqlx::Error> { + let query = indoc! {" + WITH last AS ( + SELECT + epoch_no, + EXTRACT(EPOCH FROM starts_at)::BIGINT AS starts_s, + EXTRACT(EPOCH FROM ends_at)::BIGINT AS ends_s, + EXTRACT(EPOCH FROM (ends_at - starts_at))::BIGINT AS dur_s, + EXTRACT(EPOCH FROM NOW())::BIGINT AS now_s + FROM epochs + ORDER BY epoch_no DESC + LIMIT 1 + ), calc AS ( + SELECT + epoch_no, starts_s, ends_s, dur_s, now_s, + CASE WHEN ends_s > now_s THEN 0 + ELSE ((now_s - ends_s) / dur_s)::BIGINT + 1 END AS n + FROM last + ), synth AS ( + SELECT + (epoch_no + n) AS epoch_no, + dur_s AS duration_seconds, + CASE WHEN n = 0 THEN LEAST(GREATEST(now_s - starts_s, 0), dur_s) + ELSE LEAST(GREATEST(now_s - (ends_s + (n - 1) * dur_s), 0), dur_s) + END AS elapsed_seconds + FROM calc + ) + SELECT epoch_no, duration_seconds, elapsed_seconds FROM synth + "}; + + sqlx::query_as::<_, (i64, i64, i64)>(query) + .fetch_optional(&*self.pool) + .await + .map(|opt| { + opt.map(|(epoch_no, duration_seconds, elapsed_seconds)| EpochInfo { + epoch_no, + duration_seconds, + elapsed_seconds, + }) + }) + } + + #[trace] + async fn get_epoch_utilization(&self, epoch: i64) -> Result, sqlx::Error> { + let query = indoc! {" + SELECT COALESCE( + CASE WHEN SUM(expected_blocks) > 0 + THEN SUM(produced_blocks)::DOUBLE PRECISION / SUM(expected_blocks) + ELSE 0.0 END, + 0.0) AS utilization + FROM spo_epoch_performance + WHERE epoch_no = $1 + "}; + + sqlx::query_scalar::<_, Option>(query) + .bind(epoch) + .fetch_one(&*self.pool) + .await + .map(|v| v.or(Some(0.0))) + } + + #[trace] + async fn get_committee(&self, epoch: i64) -> Result, sqlx::Error> { + let query = indoc! {" + SELECT + cm.epoch_no, + cm.position, + cm.sidechain_pubkey AS sidechain_pubkey_hex, + cm.expected_slots, + si.aura_pubkey AS aura_pubkey_hex, + si.pool_id AS pool_id_hex, + si.spo_sk AS spo_sk_hex + FROM committee_membership cm + LEFT JOIN spo_identity si ON si.sidechain_pubkey = cm.sidechain_pubkey + WHERE cm.epoch_no = $1 + ORDER BY cm.position + "}; + + sqlx::query_as::< + _, + ( + i64, + i32, + String, + i32, + Option, + Option, + Option, + ), + >(query) + .bind(epoch) + .fetch_all(&*self.pool) + .await + .map(|rows| { + rows.into_iter() + .map( + |( + epoch_no, + position, + sidechain_pubkey_hex, + expected_slots, + aura_pubkey_hex, + pool_id_hex, + spo_sk_hex, + )| CommitteeMember { + epoch_no, + position, + sidechain_pubkey_hex, + expected_slots, + aura_pubkey_hex, + pool_id_hex, + spo_sk_hex, + }, + ) + .collect() + }) + } + + #[trace] + async fn get_registered_totals_series( + &self, + from_epoch: i64, + to_epoch: i64, + ) -> Result, sqlx::Error> { + let start = from_epoch.min(to_epoch); + let end = to_epoch.max(from_epoch); + + let query = indoc! {" + WITH rng AS ( + SELECT generate_series($1::BIGINT, $2::BIGINT) AS epoch_no + ), + cur AS ( + SELECT s.pool_id + FROM spo_stake_snapshot s + ), + union_firsts AS ( + SELECT si.pool_id AS pool_id, MIN(sh.epoch_no)::BIGINT AS first_seen_epoch + FROM spo_history sh + LEFT JOIN spo_identity si ON si.spo_sk = sh.spo_sk + WHERE si.pool_id IS NOT NULL + GROUP BY si.pool_id + UNION ALL + SELECT si.pool_id AS pool_id, MIN(cm.epoch_no)::BIGINT AS first_seen_epoch + FROM committee_membership cm + LEFT JOIN spo_identity si ON si.sidechain_pubkey = cm.sidechain_pubkey + WHERE si.pool_id IS NOT NULL + GROUP BY si.pool_id + UNION ALL + SELECT si.pool_id AS pool_id, MIN(sep.epoch_no)::BIGINT AS first_seen_epoch + FROM spo_epoch_performance sep + LEFT JOIN spo_identity si ON si.spo_sk = sep.spo_sk + WHERE si.pool_id IS NOT NULL + GROUP BY si.pool_id + ), + firsts0 AS ( + SELECT pool_id, MIN(first_seen_epoch)::BIGINT AS first_seen_epoch + FROM union_firsts + GROUP BY pool_id + ), + firsts_cur AS ( + SELECT c.pool_id, + COALESCE(f0.first_seen_epoch, $2::BIGINT) AS first_seen_epoch + FROM cur c + LEFT JOIN firsts0 f0 ON f0.pool_id = c.pool_id + ), + agg AS ( + SELECT r.epoch_no, + COUNT(*) FILTER (WHERE fc.first_seen_epoch <= r.epoch_no) AS total_registered, + COUNT(*) FILTER (WHERE fc.first_seen_epoch = r.epoch_no) AS newly_registered + FROM rng r + CROSS JOIN firsts_cur fc + GROUP BY r.epoch_no + ) + SELECT epoch_no, total_registered, newly_registered + FROM agg + ORDER BY epoch_no + "}; + + sqlx::query_as::<_, (i64, i64, i64)>(query) + .bind(start) + .bind(end) + .fetch_all(&*self.pool) + .await + .map(|rows| { + rows.into_iter() + .map( + |(epoch_no, total_registered, newly_registered)| RegisteredTotals { + epoch_no, + total_registered, + newly_registered, + }, + ) + .collect() + }) + } + + #[trace] + async fn get_registered_spo_series( + &self, + from_epoch: i64, + to_epoch: i64, + ) -> Result, sqlx::Error> { + let start = from_epoch.min(to_epoch); + let end = to_epoch.max(from_epoch); + + let query = indoc! {" + WITH rng AS ( + SELECT generate_series($1::BIGINT, $2::BIGINT) AS epoch_no + ), + hist_valid AS ( + SELECT sh.epoch_no, + COUNT(DISTINCT si.pool_id) AS cnt + FROM spo_history sh + LEFT JOIN spo_identity si ON si.spo_sk = sh.spo_sk + WHERE sh.status IN ('VALID','Valid') + AND sh.epoch_no BETWEEN $1::BIGINT AND $2::BIGINT + AND si.pool_id IS NOT NULL + GROUP BY sh.epoch_no + ), + hist_invalid AS ( + SELECT sh.epoch_no, + COUNT(DISTINCT si.pool_id) AS cnt + FROM spo_history sh + LEFT JOIN spo_identity si ON si.spo_sk = sh.spo_sk + WHERE sh.status IN ('INVALID','Invalid') + AND sh.epoch_no BETWEEN $1::BIGINT AND $2::BIGINT + AND si.pool_id IS NOT NULL + GROUP BY sh.epoch_no + ), + fed AS ( + SELECT c.epoch_no, + COUNT(DISTINCT c.sidechain_pubkey) FILTER (WHERE c.expected_slots > 0) AS federated_valid_count, + 0::BIGINT AS federated_invalid_count + FROM committee_membership c + WHERE c.epoch_no BETWEEN $1::BIGINT AND $2::BIGINT + GROUP BY c.epoch_no + ) + SELECT r.epoch_no, + COALESCE(f.federated_valid_count, 0) AS federated_valid_count, + COALESCE(f.federated_invalid_count, 0) AS federated_invalid_count, + COALESCE(hv.cnt, 0) AS registered_valid_count, + COALESCE(hi.cnt, 0) AS registered_invalid_count, + COALESCE(hv.cnt, 0)::DOUBLE PRECISION AS dparam + FROM rng r + LEFT JOIN hist_valid hv ON hv.epoch_no = r.epoch_no + LEFT JOIN hist_invalid hi ON hi.epoch_no = r.epoch_no + LEFT JOIN fed f ON f.epoch_no = r.epoch_no + ORDER BY r.epoch_no + "}; + + sqlx::query_as::<_, (i64, i64, i64, i64, i64, Option)>(query) + .bind(start) + .bind(end) + .fetch_all(&*self.pool) + .await + .map(|rows| { + rows.into_iter() + .map( + |( + epoch_no, + federated_valid_count, + federated_invalid_count, + registered_valid_count, + registered_invalid_count, + dparam, + )| RegisteredStat { + epoch_no, + federated_valid_count, + federated_invalid_count, + registered_valid_count, + registered_invalid_count, + dparam, + }, + ) + .collect() + }) + } + + #[trace] + async fn get_registered_presence( + &self, + from_epoch: i64, + to_epoch: i64, + ) -> Result, sqlx::Error> { + let start = from_epoch.min(to_epoch); + let end = to_epoch.max(from_epoch); + + let query = indoc! {" + WITH history AS ( + SELECT sh.epoch_no::BIGINT AS epoch_no, + COALESCE(si.pool_id, sh.spo_sk) AS id_key, + 'history'::TEXT AS source, + sh.status::TEXT AS status + FROM spo_history sh + LEFT JOIN spo_identity si ON si.spo_sk = sh.spo_sk + WHERE sh.epoch_no BETWEEN $1::BIGINT AND $2::BIGINT + ), + committee AS ( + SELECT cm.epoch_no::BIGINT AS epoch_no, + COALESCE(si.pool_id, cm.sidechain_pubkey) AS id_key, + 'committee'::TEXT AS source, + NULL::TEXT AS status + FROM committee_membership cm + LEFT JOIN spo_identity si ON si.sidechain_pubkey = cm.sidechain_pubkey + WHERE cm.epoch_no BETWEEN $1::BIGINT AND $2::BIGINT + ), + performance AS ( + SELECT sep.epoch_no::BIGINT AS epoch_no, + COALESCE(si.pool_id, sep.spo_sk) AS id_key, + 'performance'::TEXT AS source, + NULL::TEXT AS status + FROM spo_epoch_performance sep + LEFT JOIN spo_identity si ON si.spo_sk = sep.spo_sk + WHERE sep.epoch_no BETWEEN $1::BIGINT AND $2::BIGINT + ) + SELECT epoch_no, id_key, source, status FROM history + UNION ALL + SELECT epoch_no, id_key, source, status FROM committee + UNION ALL + SELECT epoch_no, id_key, source, status FROM performance + ORDER BY epoch_no, source, id_key + "}; + + sqlx::query_as::<_, (i64, String, String, Option)>(query) + .bind(start) + .bind(end) + .fetch_all(&*self.pool) + .await + .map(|rows| { + rows.into_iter() + .map(|(epoch_no, id_key, source, status)| PresenceEvent { + epoch_no, + id_key, + source, + status, + }) + .collect() + }) + } + + #[trace] + async fn get_registered_first_valid_epochs( + &self, + upto_epoch: Option, + ) -> Result, sqlx::Error> { + let query = indoc! {" + SELECT COALESCE(si.pool_id, sh.spo_sk) AS id_key, + MIN(sh.epoch_no)::BIGINT AS first_valid_epoch + FROM spo_history sh + LEFT JOIN spo_identity si ON si.spo_sk = sh.spo_sk + WHERE sh.status IN ('VALID','Valid') + AND ($1::BIGINT IS NULL OR sh.epoch_no <= $1::BIGINT) + GROUP BY 1 + ORDER BY first_valid_epoch + "}; + + sqlx::query_as::<_, (String, i64)>(query) + .bind(upto_epoch) + .fetch_all(&*self.pool) + .await + .map(|rows| { + rows.into_iter() + .map(|(id_key, first_valid_epoch)| FirstValidEpoch { + id_key, + first_valid_epoch, + }) + .collect() + }) + } + + #[trace] + async fn get_stake_distribution( + &self, + limit: i64, + offset: i64, + search: Option<&str>, + order_desc: bool, + ) -> Result<(Vec, f64), sqlx::Error> { + // First get total live stake. + let total_query = indoc! {" + SELECT COALESCE(SUM(s.live_stake), 0)::TEXT + FROM spo_stake_snapshot s + "}; + let total_live_str: String = sqlx::query_scalar(total_query) + .fetch_one(&*self.pool) + .await?; + let total_live_f64: f64 = total_live_str.parse().unwrap_or(0.0); + + // Build the main query. + let base_select = if search.is_some() { + indoc! {" + SELECT + pm.pool_id AS pool_id_hex, + pm.name, pm.ticker, pm.homepage_url, pm.url AS logo_url, + (s.live_stake)::TEXT, (s.active_stake)::TEXT, s.live_delegators, s.live_saturation, + (s.declared_pledge)::TEXT, (s.live_pledge)::TEXT + FROM spo_stake_snapshot s + JOIN pool_metadata_cache pm ON pm.pool_id = s.pool_id + WHERE ( + pm.name ILIKE $3 OR pm.ticker ILIKE $3 OR pm.homepage_url ILIKE $3 OR pm.pool_id ILIKE $4 + ) + ORDER BY COALESCE(s.live_stake, 0) DESC, pm.pool_id + LIMIT $1 OFFSET $2 + "} + } else { + indoc! {" + SELECT + pm.pool_id AS pool_id_hex, + pm.name, pm.ticker, pm.homepage_url, pm.url AS logo_url, + (s.live_stake)::TEXT, (s.active_stake)::TEXT, s.live_delegators, s.live_saturation, + (s.declared_pledge)::TEXT, (s.live_pledge)::TEXT + FROM spo_stake_snapshot s + JOIN pool_metadata_cache pm ON pm.pool_id = s.pool_id + ORDER BY COALESCE(s.live_stake, 0) DESC, pm.pool_id + LIMIT $1 OFFSET $2 + "} + }; + + let sql = if order_desc { + base_select.to_string() + } else { + base_select.replace("DESC", "ASC") + }; + + let rows = if let Some(s) = search { + let s_like = format!("%{s}%"); + sqlx::query_as::< + _, + ( + String, // pool_id_hex + Option, // name + Option, // ticker + Option, // homepage_url + Option, // logo_url + Option, // live_stake + Option, // active_stake + Option, // live_delegators + Option, // live_saturation + Option, // declared_pledge + Option, // live_pledge + ), + >(&sql) + .bind(limit) + .bind(offset) + .bind(s_like.clone()) + .bind(s_like) + .fetch_all(&*self.pool) + .await? + } else { + sqlx::query_as::< + _, + ( + String, + Option, + Option, + Option, + Option, + Option, + Option, + Option, + Option, + Option, + Option, + ), + >(&sql) + .bind(limit) + .bind(offset) + .fetch_all(&*self.pool) + .await? + }; + + let stake_shares = rows + .into_iter() + .map( + |( + pool_id_hex, + name, + ticker, + homepage_url, + logo_url, + live_stake, + active_stake, + live_delegators, + live_saturation, + declared_pledge, + live_pledge, + )| { + let share = { + let ls = live_stake.as_deref().unwrap_or("0"); + let lv = ls.parse::().unwrap_or(0.0); + if total_live_f64 > 0.0 { + lv / total_live_f64 + } else { + 0.0 + } + }; + let live_delegators_i64 = live_delegators.map(|v| v as i64); + StakeShare { + pool_id_hex, + name, + ticker, + homepage_url, + logo_url, + live_stake, + active_stake, + live_delegators: live_delegators_i64, + live_saturation, + declared_pledge, + live_pledge, + stake_share: Some(share), + } + }, + ) + .collect(); + + Ok((stake_shares, total_live_f64)) + } +} + +/// Row type for epoch performance query results. +type EpochPerfRow = ( + i64, + String, + i32, + i32, + Option, + Option, + Option, + Option, +); + +/// Helper to convert epoch performance row to domain type. +fn epoch_perf_from_row(row: EpochPerfRow) -> EpochPerf { + let ( + epoch_no, + spo_sk_hex, + produced_i32, + expected_i32, + identity_label, + stake_snapshot, + pool_id_hex, + validator_class, + ) = row; + EpochPerf { + epoch_no, + spo_sk_hex, + produced: produced_i32 as i64, + expected: expected_i32 as i64, + identity_label, + stake_snapshot, + pool_id_hex, + validator_class, + } +} + +/// Normalize hex string by stripping 0x prefix and lowercasing. +fn normalize_hex(input: &str) -> Option { + if input.is_empty() { + return None; + } + let s = input + .strip_prefix("0x") + .unwrap_or(input) + .strip_prefix("0X") + .unwrap_or(input); + if !s.len().is_multiple_of(2) || s.len() > 256 { + return None; + } + // Validate hex characters. + if !s.chars().all(|c| c.is_ascii_hexdigit()) { + return None; + } + Some(s.to_ascii_lowercase()) +}