From 605714864eea3546bd63ae82f92ce4396a6d1139 Mon Sep 17 00:00:00 2001 From: Chris Czub Date: Thu, 12 Dec 2024 23:48:50 -0500 Subject: [PATCH] First pass at client filtering --- crates/relayer-cli/src/chain_registry.rs | 39 ++-- crates/relayer/src/chain/cosmos/config.rs | 4 + crates/relayer/src/chain/penumbra/config.rs | 7 +- crates/relayer/src/config.rs | 14 +- crates/relayer/src/config/filter.rs | 187 +++++++++++++++++- crates/relayer/src/supervisor/scan.rs | 71 +++++-- tools/integration-test/src/tests/ica.rs | 4 +- tools/test-framework/src/types/single/node.rs | 1 + 8 files changed, 292 insertions(+), 35 deletions(-) diff --git a/crates/relayer-cli/src/chain_registry.rs b/crates/relayer-cli/src/chain_registry.rs index 61b1401a09..43641ca206 100644 --- a/crates/relayer-cli/src/chain_registry.rs +++ b/crates/relayer-cli/src/chain_registry.rs @@ -22,7 +22,7 @@ use ibc_relayer::{ config::{ default, dynamic_gas::DynamicGasPrice, - filter::{FilterPattern, PacketFilter}, + filter::{ClientFilter, FilterPattern, PacketFilter}, gas_multiplier::GasMultiplier, types::{MaxMsgNum, MaxTxSize, Memo, TrustThreshold}, AddressType, ChainConfig, EventSourceMode, GasPrice, @@ -36,34 +36,40 @@ use tracing::{error, trace}; const MAX_HEALTHY_QUERY_RETRIES: u8 = 5; -/// Generate packet filters from Vec and load them in a Map(chain_name -> filter). -fn construct_packet_filters(ibc_paths: Vec) -> HashMap { - let mut packet_filters: HashMap<_, Vec<_>> = HashMap::new(); +/// Generate packet and client filters from Vec and load them in a Map(chain_name -> (packet_filters, client_filters)). +fn construct_filters(ibc_paths: Vec) -> HashMap { + let mut filters: HashMap<_, (Vec<_>, Vec<_>)> = HashMap::new(); for path in ibc_paths { for channel in path.channels { let chain_1 = path.chain_1.chain_name.to_owned(); let chain_2 = path.chain_2.chain_name.to_owned(); - let filters_1 = packet_filters.entry(chain_1).or_default(); + let filters_1 = filters.entry(chain_1).or_default(); - filters_1.push(( + filters_1.0.push(( FilterPattern::Exact(channel.chain_1.port_id.clone()), FilterPattern::Exact(channel.chain_1.channel_id.clone()), )); + filters_1 + .1 + .push(FilterPattern::Exact(path.chain_1.client_id.clone())); - let filters_2 = packet_filters.entry(chain_2).or_default(); + let filters_2 = filters.entry(chain_2).or_default(); - filters_2.push(( + filters_2.0.push(( FilterPattern::Exact(channel.chain_2.port_id.clone()), FilterPattern::Exact(channel.chain_2.channel_id.clone()), )); + filters_2 + .1 + .push(FilterPattern::Exact(path.chain_2.client_id.clone())); } } - packet_filters + filters .into_iter() - .map(|(k, v)| (k, PacketFilter::allow(v))) + .map(|(k, v)| (k, (PacketFilter::allow(v.0), ClientFilter::allow(v.1)))) .collect() } @@ -72,6 +78,7 @@ async fn hermes_config( chain_data: ChainData, assets: AssetList, packet_filter: Option, + client_filter: Option, ) -> Result where GrpcQuerier: @@ -171,6 +178,7 @@ where denom: asset.base.to_owned(), }, packet_filter: packet_filter.unwrap_or_default(), + client_filter: client_filter.unwrap_or_default(), address_type: AddressType::default(), sequential_batch_tx: false, extension_options: Vec::new(), @@ -341,19 +349,24 @@ pub async fn get_configs( }) .collect(); - let mut packet_filters = construct_packet_filters(path_data); + let mut filters = construct_filters(path_data); // Construct ChainConfig let config_handles: Vec<_> = chain_data_array .into_iter() .zip(asset_lists.into_iter()) .map(|((chain_name, chain_data), (_, assets))| { - let packet_filter = packet_filters.remove(&chain_name); + let (packet_filter, client_filter) = match filters.remove(&chain_name) { + Some(filters) => (Some(filters.0), Some(filters.1)), + None => (None, None), + }; let handle = tokio::spawn(hermes_config::< GrpcHealthCheckQuerier, SimpleHermesRpcQuerier, SimpleGrpcFormatter, - >(chain_data, assets, packet_filter)); + >( + chain_data, assets, packet_filter, client_filter + )); (chain_name, handle) }) diff --git a/crates/relayer/src/chain/cosmos/config.rs b/crates/relayer/src/chain/cosmos/config.rs index 8aa79cc849..d4297fc57a 100644 --- a/crates/relayer/src/chain/cosmos/config.rs +++ b/crates/relayer/src/chain/cosmos/config.rs @@ -18,6 +18,7 @@ use crate::{ compat_mode::CompatMode, default, dynamic_gas::DynamicGasPrice, + filter::ClientFilter, gas_multiplier::GasMultiplier, types::{MaxMsgNum, MaxTxSize, Memo, TrustThreshold}, AddressType, EventSourceMode, ExtensionOption, GasPrice, GenesisRestart, PacketFilter, @@ -144,6 +145,9 @@ pub struct CosmosSdkConfig { #[serde(default)] pub packet_filter: PacketFilter, + #[serde(default)] + pub client_filter: ClientFilter, + #[serde(default)] pub dynamic_gas_price: DynamicGasPrice, diff --git a/crates/relayer/src/chain/penumbra/config.rs b/crates/relayer/src/chain/penumbra/config.rs index e20fd9de42..2a44fcaffc 100644 --- a/crates/relayer/src/chain/penumbra/config.rs +++ b/crates/relayer/src/chain/penumbra/config.rs @@ -6,8 +6,8 @@ use serde_derive::{Deserialize, Serialize}; use tendermint_rpc::Url; use crate::config::{ - compat_mode::CompatMode, default, types::TrustThreshold, EventSourceMode, GenesisRestart, - PacketFilter, RefreshRate, + compat_mode::CompatMode, default, filter::ClientFilter, types::TrustThreshold, EventSourceMode, + GenesisRestart, PacketFilter, RefreshRate, }; #[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] @@ -42,6 +42,9 @@ pub struct PenumbraConfig { /// Controls which packets will be relayed. #[serde(default)] pub packet_filter: PacketFilter, + /// Controls which connections will be scanned. + #[serde(default)] + pub client_filter: ClientFilter, pub clear_interval: Option, /// How many packets to fetch at once from the chain when clearing packets #[serde(default = "default::query_packets_chunk_size")] diff --git a/crates/relayer/src/config.rs b/crates/relayer/src/config.rs index 73a4e4a35d..cf1f3327cd 100644 --- a/crates/relayer/src/config.rs +++ b/crates/relayer/src/config.rs @@ -16,6 +16,7 @@ use core::{ str::FromStr, time::Duration, }; +use filter::ClientFilter; use std::{ borrow::Cow, fs::{self, File}, @@ -42,7 +43,10 @@ use tendermint_rpc::{Url, WebSocketClientUrl}; pub use crate::config::Error as ConfigError; use crate::{ - chain::{cosmos::config::CosmosSdkConfig, penumbra::config::PenumbraConfig}, + chain::{ + cosmos::config::{self, CosmosSdkConfig}, + penumbra::config::PenumbraConfig, + }, config::types::{ics20_field_size_limit::Ics20FieldSizeLimit, TrustThreshold}, error::Error as RelayerError, extension_options::ExtensionOptionDynamicFeeTx, @@ -677,6 +681,14 @@ impl ChainConfig { } } + pub fn client_filter(&self) -> &ClientFilter { + match self { + Self::CosmosSdk(config) => &config.client_filter, + Self::Astria(config) => &config.client_filter, + Self::Penumbra(config) => &config.client_filter, + } + } + pub fn packet_filter(&self) -> &PacketFilter { match self { Self::CosmosSdk(config) => &config.packet_filter, diff --git a/crates/relayer/src/config/filter.rs b/crates/relayer/src/config/filter.rs index 64e07768f6..97341740f0 100644 --- a/crates/relayer/src/config/filter.rs +++ b/crates/relayer/src/config/filter.rs @@ -6,7 +6,7 @@ use std::{collections::HashMap, hash::Hash}; use ibc_relayer_types::{ applications::transfer::RawCoin, bigint::U256, - core::ics24_host::identifier::{ChannelId, PortId}, + core::ics24_host::identifier::{ChannelId, ClientId, PortId}, events::IbcEventType, }; use itertools::Itertools; @@ -42,14 +42,40 @@ impl PacketFilter { } } - pub fn allow(filters: Vec<(PortFilterMatch, ChannelFilterMatch)>) -> PacketFilter { + pub fn allow(channel_filters: Vec<(PortFilterMatch, ChannelFilterMatch)>) -> PacketFilter { PacketFilter::new( - ChannelPolicy::Allow(ChannelFilters::new(filters)), + ChannelPolicy::Allow(ChannelFilters::new(channel_filters)), HashMap::new(), ) } } +/// Represents all the filtering policies for clients. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct ClientFilter { + #[serde(flatten)] + pub client_policy: ClientPolicy, +} + +impl Default for ClientFilter { + /// By default, allows all clients. + fn default() -> Self { + Self { + client_policy: ClientPolicy::default(), + } + } +} + +impl ClientFilter { + pub fn new(client_policy: ClientPolicy) -> Self { + Self { client_policy } + } + + pub fn allow(client_filters: Vec) -> ClientFilter { + ClientFilter::new(ClientPolicy::Allow(ClientFilters::new(client_filters))) + } +} + /// Represents the ways in which packets can be filtered. #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde( @@ -231,6 +257,119 @@ impl Serialize for ChannelFilters { } } +#[derive(Clone, Debug, Default, PartialEq, Eq, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct ClientFilters(Vec); + +impl ClientFilters { + /// Create a new filter from the given list of client filters. + pub fn new(filters: Vec) -> Self { + Self(filters) + } + + /// Returns the number of filters. + pub fn len(&self) -> usize { + self.0.len() + } + + /// Returns true if there are no filters, false otherwise. + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + + /// Indicates whether a match for the given [`ClientId`] + /// exists in the filter policy. + pub fn matches(&self, client: &ClientId) -> bool { + self.0 + .iter() + .any(|client_filter| client_filter.matches(client)) + } + + /// Indicates whether this filter policy contains only exact patterns. + #[inline] + pub fn is_exact(&self) -> bool { + self.0.iter().all(|client_filter| client_filter.is_exact()) + } + + /// An iterator over the [`ClientId`]s that don't contain wildcards. + pub fn iter_exact(&self) -> impl Iterator { + self.0.iter().filter_map(|client_filter| { + if let FilterPattern::Exact(ref client_id) = client_filter { + Some(client_id) + } else { + None + } + }) + } +} + +impl fmt::Display for ClientFilters { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "{}", + self.0 + .iter() + .map(|client_id| format!("{client_id}")) + .join(", ") + ) + } +} + +impl Serialize for ClientFilters { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + use serde::ser::SerializeSeq; + + struct Item<'a> { + a: &'a FilterPattern, + } + + impl<'a> Serialize for Item<'a> { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut seq = serializer.serialize_seq(Some(1))?; + seq.serialize_element(self.a)?; + seq.end() + } + } + + let mut outer_seq = serializer.serialize_seq(Some(self.0.len()))?; + + for client in &self.0 { + outer_seq.serialize_element(&Item { a: client })?; + } + + outer_seq.end() + } +} + +/// Represents the ways in which clients can be filtered. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde( + rename_all = "lowercase", + tag = "policy", + content = "list", + deny_unknown_fields +)] +pub enum ClientPolicy { + /// Only scan the specified clients. + Allow(ClientFilters), + /// Scan all available clients. + AllowAll, +} + +impl Default for ClientPolicy { + /// By default, allows all clients. + fn default() -> Self { + Self::AllowAll + } +} + /// Newtype wrapper for expressing wildcard patterns compiled to a [`regex::Regex`]. #[derive(Clone, Debug)] pub struct Wildcard { @@ -288,12 +427,13 @@ impl Hash for Wildcard { } } -/// Represents a single channel to be filtered in a [`ChannelFilters`] list. +/// Represents either a single channel to be filtered in a [`ChannelFilters`] list or +/// a single client to be filtered in a [`ClientFilters`] list. #[derive(Clone, Debug, Hash, PartialEq, Eq)] pub enum FilterPattern { - /// A channel specified exactly with its [`PortId`] & [`ChannelId`]. + /// A channel specified exactly with its [`PortId`] & [`ChannelId`] or a client with its [`ClientId`]. Exact(T), - /// A glob of channel(s) specified with a wildcard in either or both [`PortId`] & [`ChannelId`]. + /// A glob of channel(s) specified with a wildcard in either or both [`PortId`] & [`ChannelId`], or a wildcard channel. Wildcard(Wildcard), } @@ -358,6 +498,8 @@ where pub type PortFilterMatch = FilterPattern; /// Type alias for a [`FilterPattern`] containing a [`ChannelId`]. pub type ChannelFilterMatch = FilterPattern; +/// Type alias for a [`FilterPattern`] containing a [`ClientId`]. +pub type ClientFilterMatch = FilterPattern; impl<'de> Deserialize<'de> for PortFilterMatch { fn deserialize>(deserializer: D) -> Result { @@ -371,6 +513,12 @@ impl<'de> Deserialize<'de> for ChannelFilterMatch { } } +impl<'de> Deserialize<'de> for ClientFilterMatch { + fn deserialize>(deserializer: D) -> Result { + deserializer.deserialize_string(client::ClientFilterMatchVisitor) + } +} + pub(crate) mod port { use super::*; @@ -398,6 +546,33 @@ pub(crate) mod port { } } +pub(crate) mod client { + use super::*; + + pub struct ClientFilterMatchVisitor; + + impl<'de> de::Visitor<'de> for ClientFilterMatchVisitor { + type Value = ClientFilterMatch; + + fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + formatter.write_str("valid ClientId or wildcard") + } + + fn visit_str(self, v: &str) -> Result { + tracing::info!("ClientFilterMatchVisitor: visit_str: {}", v); + if let Ok(client_id) = ClientId::from_str(v) { + Ok(ClientFilterMatch::Exact(client_id)) + } else { + let wildcard = v.parse().map_err(E::custom)?; + Ok(ClientFilterMatch::Wildcard(wildcard)) + } + } + + fn visit_string(self, v: String) -> Result { + self.visit_str(&v) + } + } +} pub(crate) mod channel { use super::*; diff --git a/crates/relayer/src/supervisor/scan.rs b/crates/relayer/src/supervisor/scan.rs index df371ef9f3..bd576cea85 100644 --- a/crates/relayer/src/supervisor/scan.rs +++ b/crates/relayer/src/supervisor/scan.rs @@ -1,5 +1,5 @@ use core::fmt::{Display, Error as FmtError, Formatter}; -use std::collections::BTreeMap; +use std::{borrow::Borrow as _, collections::BTreeMap}; use ibc_relayer_types::core::{ ics03_connection::connection::{IdentifiedConnectionEnd, State as ConnectionState}, @@ -27,7 +27,7 @@ use crate::{ }, client_state::IdentifiedAnyClientState, config::{ - filter::{ChannelFilters, ChannelPolicy}, + filter::{ChannelFilters, ChannelPolicy, ClientFilters, ClientPolicy}, ChainConfig, Config, }, error::Error as RelayerError, @@ -320,18 +320,21 @@ impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { let mut scan = ChainScan::new(chain_config.id().clone()); - match self.use_allow_list(chain_config) { + // TODO: hack to avoid complicating the code for now + let client_filters = self.use_client_allow_list(chain_config); + + match self.use_channel_allow_list(chain_config) { Some(spec) if self.scan_mode == ScanMode::Auto => { info!( "chain uses an allow list (without wildcards), skipping scan for fast startup" ); info!("allowed ports/channels: {}", spec); - self.query_allowed_channels(&chain, spec, &mut scan)?; + self.query_allowed_channels(&chain, spec, client_filters, &mut scan)?; } _ => { - info!("scanning chain for all clients, connections and channels"); - self.scan_all_clients(&chain, &mut scan)?; + info!("scanning chain for all clients, and channels"); + self.scan_all_clients(&chain, &mut scan, client_filters)?; } }; @@ -341,12 +344,13 @@ impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { pub fn query_allowed_channels( &mut self, chain: &Chain, - filters: &ChannelFilters, + channel_filters: &ChannelFilters, + client_filters: Option<&ClientFilters>, scan: &mut ChainScan, ) -> Result<(), Error> { info!("querying allowed channels..."); - for (port_id, channel_id) in filters.iter_exact() { + for (port_id, channel_id) in channel_filters.iter_exact() { let result = scan_allowed_channel(self.registry, chain, port_id, channel_id); match result { @@ -357,6 +361,17 @@ impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { counterparty_connection_state, client, }) => { + if let Some(client_filters) = client_filters { + let allowed_client_ids = client_filters.iter_exact().collect_vec(); + if !allowed_client_ids.contains(&&client.client_id) { + warn!( + client = %client.client_id, + "skipping client, reason: client is not allowed" + ); + continue; + } + } + let counterparty_chain_id = client.client_state.chain_id(); if let Some(counterparty_channel) = &counterparty_channel { init_telemetry( @@ -395,12 +410,28 @@ impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { Ok(()) } - pub fn scan_all_clients(&mut self, chain: &Chain, scan: &mut ChainScan) -> Result<(), Error> { - info!("scanning all clients..."); + pub fn scan_all_clients( + &mut self, + chain: &Chain, + scan: &mut ChainScan, + client_filters: Option<&ClientFilters>, + ) -> Result<(), Error> { + info!(?client_filters, "scanning filtered clients..."); let clients = query_all_clients(chain)?; for client in clients { + if let Some(client_filters) = client_filters { + let exact_client_filters = client_filters.iter_exact().collect_vec(); + if !exact_client_filters.contains(&&client.client_id) { + warn!( + client = %client.client_id, + "skipping client, reason: client is not allowed" + ); + continue; + } + } + if let Some(client_scan) = self.scan_client(chain, client)? { if self.config.telemetry.enabled { // discovery phase : query every chain, connections and channels @@ -581,7 +612,10 @@ impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { true } - fn use_allow_list<'b>(&self, chain_config: &'b ChainConfig) -> Option<&'b ChannelFilters> { + fn use_channel_allow_list<'b>( + &self, + chain_config: &'b ChainConfig, + ) -> Option<&'b ChannelFilters> { if !self.filtering_enabled() { return None; } @@ -592,6 +626,21 @@ impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { } } + fn use_client_allow_list<'b>( + &self, + chain_config: &'b ChainConfig, + ) -> Option<&'b ClientFilters> { + if !self.filtering_enabled() { + return None; + } + + let policy = &chain_config.client_filter().client_policy; + match policy { + ClientPolicy::Allow(ref filters) if filters.is_exact() => Some(filters), + _ => None, + } + } + fn client_allowed(&mut self, chain: &Chain, client: &IdentifiedAnyClientState) -> bool { if !self.filtering_enabled() { return true; diff --git a/tools/integration-test/src/tests/ica.rs b/tools/integration-test/src/tests/ica.rs index 15f989e552..b2f6f6d19c 100644 --- a/tools/integration-test/src/tests/ica.rs +++ b/tools/integration-test/src/tests/ica.rs @@ -198,14 +198,14 @@ impl TestOverrides for IcaFilterTestDeny { for chain in &mut config.chains { match chain { ChainConfig::CosmosSdk(chain_config) => { - chain_config.packet_filter.channel_policy = + chain_config.packet_filter.cpolicy = ChannelPolicy::Deny(ChannelFilters::new(vec![( FilterPattern::Wildcard("ica*".parse().unwrap()), FilterPattern::Wildcard("*".parse().unwrap()), )])); } ChainConfig::Penumbra(chain_config) => { - chain_config.packet_filter.channel_policy = + chain_config.packet_filter.policy = ChannelPolicy::Deny(ChannelFilters::new(vec![( FilterPattern::Wildcard("ica*".parse().unwrap()), FilterPattern::Wildcard("*".parse().unwrap()), diff --git a/tools/test-framework/src/types/single/node.rs b/tools/test-framework/src/types/single/node.rs index fbd47f286a..8d5713277c 100644 --- a/tools/test-framework/src/types/single/node.rs +++ b/tools/test-framework/src/types/single/node.rs @@ -194,6 +194,7 @@ impl FullNode { trust_threshold: Default::default(), gas_price, packet_filter: Default::default(), + client_filter: Default::default(), address_type: chain_type.address_type(), memo_prefix: Default::default(), memo_overwrite: None,