diff --git a/node/CHANGELOG.md b/node/CHANGELOG.md index f613a469f0..a11531c278 100644 --- a/node/CHANGELOG.md +++ b/node/CHANGELOG.md @@ -14,6 +14,7 @@ All notable changes to this project will be documented in this file. The format ### Added * Add `BinaryPort` interface along with the relevant config entries. * Added chainspec settings `finders_fee`, `finality_signature_proportion` and `signature_rewards_max_delay` to control behavior of the new seigniorage model. +* Isolated sync handling, which comes online with only local data and rejects peers. Useful for testing, auditing, and similar scenarios. ### Changed * All SSE events are emitted via the `/events` endpoint. None of the previous ones (`/events/main`, `/events/deploys`, and `/events/sigs`) is available any longer. diff --git a/node/src/components/network.rs b/node/src/components/network.rs index d1a96ede50..c141c660a1 100644 --- a/node/src/components/network.rs +++ b/node/src/components/network.rs @@ -248,6 +248,7 @@ where registry: &Registry, chain_info_source: C, validator_matrix: ValidatorMatrix, + allow_handshake: bool, ) -> Result> { let net_metrics = Arc::new(Metrics::new(registry)?); @@ -285,6 +286,7 @@ where node_key_pair.map(NodeKeyPair::new), chain_info_source.into(), &net_metrics, + allow_handshake, )); let component = Network { @@ -662,6 +664,7 @@ where | ConnectionError::TlsHandshake(_) | ConnectionError::HandshakeSend(_) | ConnectionError::HandshakeRecv(_) + | ConnectionError::HandshakeNotAllowed | ConnectionError::IncompatibleVersion(_) => None, // These errors are potential bugs on our side. @@ -1121,6 +1124,10 @@ where { type Event = Event

; + fn name(&self) -> &str { + COMPONENT_NAME + } + fn handle_event( &mut self, effect_builder: EffectBuilder, @@ -1281,10 +1288,6 @@ where }, } } - - fn name(&self) -> &str { - COMPONENT_NAME - } } impl InitializedComponent for Network diff --git a/node/src/components/network/error.rs b/node/src/components/network/error.rs index 2a641c99a3..ef7e4bdf81 100644 --- a/node/src/components/network/error.rs +++ b/node/src/components/network/error.rs @@ -196,6 +196,9 @@ pub enum ConnectionError { /// This is usually a bug. #[error("handshake sink/stream could not be reunited")] FailedToReuniteHandshakeSinkAndStream, + /// Handshake not allowed (Isolated mode) + #[error("handshake not allowed (Isolated mode)")] + HandshakeNotAllowed, } /// IO operation that can time out or close. diff --git a/node/src/components/network/tasks.rs b/node/src/components/network/tasks.rs index f2a637a384..f4ddcd56cc 100644 --- a/node/src/components/network/tasks.rs +++ b/node/src/components/network/tasks.rs @@ -236,6 +236,8 @@ where max_in_flight_demands: usize, /// Flag indicating whether this node is syncing. is_syncing: AtomicBool, + /// If false, will not allow handshake. + allow_handshake: bool, } impl NetworkContext { @@ -245,6 +247,7 @@ impl NetworkContext { node_key_pair: Option, chain_info: ChainInfo, net_metrics: &Arc, + allow_handshake: bool, ) -> Self { // Set the demand max from configuration, regarding `0` as "unlimited". let max_in_flight_demands = if cfg.max_in_flight_demands == 0 { @@ -277,6 +280,7 @@ impl NetworkContext { tarpit_chance: cfg.tarpit_chance, max_in_flight_demands, is_syncing: AtomicBool::new(false), + allow_handshake, } } @@ -363,6 +367,14 @@ where peer_consensus_public_key, is_peer_syncing: _, }) => { + if !context.allow_handshake { + return IncomingConnection::Failed { + peer_addr, + peer_id, + error: ConnectionError::HandshakeNotAllowed, + }; + } + if let Some(ref public_key) = peer_consensus_public_key { Span::current().record("consensus_key", &field::display(public_key)); } diff --git a/node/src/components/network/tests.rs b/node/src/components/network/tests.rs index b2c43aaf56..73bd10a1ab 100644 --- a/node/src/components/network/tests.rs +++ b/node/src/components/network/tests.rs @@ -21,13 +21,13 @@ use tracing::{debug, info}; use casper_types::{Chainspec, ChainspecRawBytes, SecretKey}; use super::{ - chain_info::ChainInfo, Config, Event as NetworkEvent, FromIncoming, GossipedAddress, Identity, + chain_info::ChainInfo, Event as NetworkEvent, FromIncoming, GossipedAddress, Identity, MessageKind, Network, Payload, }; use crate::{ components::{ gossiper::{self, GossipItem, Gossiper}, - Component, InitializedComponent, + network, Component, InitializedComponent, }, effect::{ announcements::{ControlAnnouncement, GossiperAnnouncement, PeerBehaviorAnnouncement}, @@ -39,13 +39,13 @@ use crate::{ EffectBuilder, Effects, }, protocol, - reactor::{self, EventQueueHandle, Finalize, Reactor, Runner}, + reactor::{self, main_reactor::Config, EventQueueHandle, Finalize, Reactor, Runner}, testing::{ self, init_logging, network::{NetworkedReactor, Nodes, TestingNetwork}, ConditionCheckReactor, }, - types::{NodeId, ValidatorMatrix}, + types::{NodeId, SyncHandling, ValidatorMatrix}, NodeRng, }; @@ -182,43 +182,6 @@ impl Reactor for TestReactor { type Config = Config; type Error = anyhow::Error; - fn new( - cfg: Self::Config, - _chainspec: Arc, - _chainspec_raw_bytes: Arc, - our_identity: Identity, - registry: &Registry, - _event_queue: EventQueueHandle, - rng: &mut NodeRng, - ) -> anyhow::Result<(Self, Effects)> { - let secret_key = SecretKey::random(rng); - let mut net = Network::new( - cfg, - our_identity, - None, - registry, - ChainInfo::create_for_testing(), - ValidatorMatrix::new_with_validator(Arc::new(secret_key)), - )?; - let gossiper_config = gossiper::Config::new_with_small_timeouts(); - let address_gossiper = Gossiper::<{ GossipedAddress::ID_IS_COMPLETE_ITEM }, _>::new( - "address_gossiper", - gossiper_config, - registry, - )?; - - net.start_initialization(); - let effects = smallvec![async { smallvec![Event::Net(NetworkEvent::Initialize)] }.boxed()]; - - Ok(( - TestReactor { - net, - address_gossiper, - }, - effects, - )) - } - fn dispatch_event( &mut self, effect_builder: EffectBuilder, @@ -277,6 +240,45 @@ impl Reactor for TestReactor { Event::BlocklistAnnouncement(_announcement) => Effects::new(), } } + + fn new( + cfg: Self::Config, + _chainspec: Arc, + _chainspec_raw_bytes: Arc, + our_identity: Identity, + registry: &Registry, + _event_queue: EventQueueHandle, + rng: &mut NodeRng, + ) -> anyhow::Result<(Self, Effects)> { + let secret_key = SecretKey::random(rng); + let allow_handshake = cfg.node.sync_handling != SyncHandling::Isolated; + let mut net = Network::new( + cfg.network.clone(), + our_identity, + None, + registry, + ChainInfo::create_for_testing(), + ValidatorMatrix::new_with_validator(Arc::new(secret_key)), + allow_handshake, + )?; + let gossiper_config = gossiper::Config::new_with_small_timeouts(); + let address_gossiper = Gossiper::<{ GossipedAddress::ID_IS_COMPLETE_ITEM }, _>::new( + "address_gossiper", + gossiper_config, + registry, + )?; + + net.start_initialization(); + let effects = smallvec![async { smallvec![Event::Net(NetworkEvent::Initialize)] }.boxed()]; + + Ok(( + TestReactor { + net, + address_gossiper, + }, + effects, + )) + } } impl NetworkedReactor for TestReactor { @@ -351,13 +353,15 @@ async fn run_two_node_network_five_times() { let mut net = TestingNetwork::new(); let start = Instant::now(); - net.add_node_with_config( - Config::default_local_net_first_node(first_node_port), - &mut rng, - ) - .await - .unwrap(); - net.add_node_with_config(Config::default_local_net(first_node_port), &mut rng) + + let cfg = Config::default().with_network_config( + network::Config::default_local_net_first_node(first_node_port), + ); + net.add_node_with_config(cfg, &mut rng).await.unwrap(); + + let cfg = Config::default() + .with_network_config(network::Config::default_local_net(first_node_port)); + net.add_node_with_config(cfg.clone(), &mut rng) .await .unwrap(); let end = Instant::now(); @@ -417,12 +421,11 @@ async fn bind_to_real_network_interface() { .ip(); let port = testing::unused_port_on_localhost(); - let local_net_config = Config::new((local_addr, port).into()); + let cfg = + Config::default().with_network_config(network::Config::new((local_addr, port).into())); let mut net = TestingNetwork::::new(); - net.add_node_with_config(local_net_config, &mut rng) - .await - .unwrap(); + net.add_node_with_config(cfg, &mut rng).await.unwrap(); // The network should be fully connected. let timeout = Duration::from_secs(2); @@ -452,17 +455,16 @@ async fn check_varying_size_network_connects() { // Pick a random port in the higher ranges that is likely to be unused. let first_node_port = testing::unused_port_on_localhost(); + let cfg = Config::default().with_network_config( + network::Config::default_local_net_first_node(first_node_port), + ); - let _ = net - .add_node_with_config( - Config::default_local_net_first_node(first_node_port), - &mut rng, - ) - .await - .unwrap(); + let _ = net.add_node_with_config(cfg, &mut rng).await.unwrap(); + let cfg = Config::default() + .with_network_config(network::Config::default_local_net(first_node_port)); for _ in 1..number_of_nodes { - net.add_node_with_config(Config::default_local_net(first_node_port), &mut rng) + net.add_node_with_config(cfg.clone(), &mut rng) .await .unwrap(); } @@ -506,16 +508,17 @@ async fn ensure_peers_metric_is_correct() { // Pick a random port in the higher ranges that is likely to be unused. let first_node_port = testing::unused_port_on_localhost(); - let _ = net - .add_node_with_config( - Config::default_local_net_first_node(first_node_port), - &mut rng, - ) - .await - .unwrap(); + let cfg = Config::default().with_network_config( + network::Config::default_local_net_first_node(first_node_port), + ); + + let _ = net.add_node_with_config(cfg, &mut rng).await.unwrap(); + + let cfg = Config::default() + .with_network_config(network::Config::default_local_net(first_node_port)); for _ in 1..number_of_nodes { - net.add_node_with_config(Config::default_local_net(first_node_port), &mut rng) + net.add_node_with_config(cfg.clone(), &mut rng) .await .unwrap(); } diff --git a/node/src/reactor/main_reactor.rs b/node/src/reactor/main_reactor.rs index 0cd799f49f..7ddd3b25dc 100644 --- a/node/src/reactor/main_reactor.rs +++ b/node/src/reactor/main_reactor.rs @@ -1130,6 +1130,8 @@ impl reactor::Reactor for MainReactor { registry, )?; + let allow_handshake = config.node.sync_handling != SyncHandling::Isolated; + let network = Network::new( config.network.clone(), network_identity, @@ -1137,6 +1139,7 @@ impl reactor::Reactor for MainReactor { registry, chainspec.as_ref(), validator_matrix.clone(), + allow_handshake, )?; let address_gossiper = Gossiper::<{ GossipedAddress::ID_IS_COMPLETE_ITEM }, _>::new( diff --git a/node/src/reactor/main_reactor/config.rs b/node/src/reactor/main_reactor/config.rs index 0432a56c63..510eddb48c 100644 --- a/node/src/reactor/main_reactor/config.rs +++ b/node/src/reactor/main_reactor/config.rs @@ -72,4 +72,11 @@ impl Config { chainspec.transaction_config.max_timestamp_leeway; } } + + /// Set network config. + #[cfg(test)] + pub(crate) fn with_network_config(mut self, network_config: NetworkConfig) -> Self { + self.network = network_config; + self + } } diff --git a/node/src/reactor/main_reactor/tests.rs b/node/src/reactor/main_reactor/tests.rs index dba0ec477c..3bfc43adf7 100644 --- a/node/src/reactor/main_reactor/tests.rs +++ b/node/src/reactor/main_reactor/tests.rs @@ -48,9 +48,9 @@ use casper_types::{ AccountConfig, AccountsConfig, ActivationPoint, AddressableEntityHash, AvailableBlockRange, Block, BlockHash, BlockHeader, BlockV2, CLValue, Chainspec, ChainspecRawBytes, ConsensusProtocolName, Deploy, EraId, FeeHandling, Gas, HoldBalanceHandling, Key, Motes, - NextUpgrade, PricingHandling, PricingMode, ProtocolVersion, PublicKey, RefundHandling, Rewards, - SecretKey, StoredValue, SystemHashRegistry, TimeDiff, Timestamp, Transaction, TransactionHash, - TransactionV1Config, ValidatorConfig, U512, + NextUpgrade, Peers, PricingHandling, PricingMode, ProtocolVersion, PublicKey, RefundHandling, + Rewards, SecretKey, StoredValue, SystemHashRegistry, TimeDiff, Timestamp, Transaction, + TransactionHash, TransactionV1Config, ValidatorConfig, U512, }; use crate::{ @@ -1440,38 +1440,94 @@ async fn should_start_in_isolation() { }; let (mut client, finish_cranking) = setup_network_and_get_binary_port_handle(initial_stakes, spec_override).await; - let request = BinaryRequest::Get( - InformationRequest::Uptime - .try_into() - .expect("should convert"), - ); - let header = - BinaryRequestHeader::new(ProtocolVersion::from_parts(2, 0, 0), request.tag(), 1_u16); - let header_bytes = ToBytes::to_bytes(&header).expect("should serialize"); - let original_request_bytes = header_bytes - .iter() - .chain( - ToBytes::to_bytes(&request) - .expect("should serialize") - .iter(), - ) - .cloned() - .collect::>(); + + let uptime_request_bytes = { + let request = BinaryRequest::Get( + InformationRequest::Uptime + .try_into() + .expect("should convert"), + ); + let header = + BinaryRequestHeader::new(ProtocolVersion::from_parts(2, 0, 0), request.tag(), 1_u16); + let header_bytes = ToBytes::to_bytes(&header).expect("should serialize"); + header_bytes + .iter() + .chain( + ToBytes::to_bytes(&request) + .expect("should serialize") + .iter(), + ) + .cloned() + .collect::>() + }; client - .send(BinaryMessage::new(original_request_bytes.clone())) + .send(BinaryMessage::new(uptime_request_bytes)) .await .expect("should send message"); - let response = timeout(Duration::from_secs(10), client.next()) + let response = timeout(Duration::from_secs(20), client.next()) .await - .unwrap_or_else(|err| panic!("should complete without timeout: {}", err)) + .unwrap_or_else(|err| panic!("should complete uptime request without timeout: {}", err)) .unwrap_or_else(|| panic!("should have bytes")) .unwrap_or_else(|err| panic!("should have ok response: {}", err)); let uptime: Uptime = FromBytes::from_bytes(response.payload()) .expect("Uptime should be deserializable") .0; assert!(uptime.into_inner() > 0); + let (_net, _rng) = timeout(Duration::from_secs(20), finish_cranking) + .await + .unwrap_or_else(|_| panic!("should finish cranking without timeout")); +} + +#[tokio::test] +async fn should_be_peerless_in_isolation() { + let initial_stakes = InitialStakes::Random { count: 1 }; + let spec_override = ConfigsOverride { + node_config_override: NodeConfigOverride { + sync_handling_override: Some(SyncHandling::Isolated), + }, + ..Default::default() + }; + let (mut client, finish_cranking) = + setup_network_and_get_binary_port_handle(initial_stakes, spec_override).await; + + let peers_request_bytes = { + let request = BinaryRequest::Get( + InformationRequest::Peers + .try_into() + .expect("should convert"), + ); + let header = + BinaryRequestHeader::new(ProtocolVersion::from_parts(2, 0, 0), request.tag(), 1_u16); + let header_bytes = ToBytes::to_bytes(&header).expect("should serialize"); + header_bytes + .iter() + .chain( + ToBytes::to_bytes(&request) + .expect("should serialize") + .iter(), + ) + .cloned() + .collect::>() + }; + client + .send(BinaryMessage::new(peers_request_bytes)) + .await + .expect("should send message"); + let response = timeout(Duration::from_secs(20), client.next()) + .await + .unwrap_or_else(|err| panic!("should complete peers request without timeout: {}", err)) + .unwrap_or_else(|| panic!("should have bytes")) + .unwrap_or_else(|err| panic!("should have ok response: {}", err)); + + let peers: Peers = FromBytes::from_bytes(response.payload()) + .expect("Peers should be deserializable") + .0; + assert!( + peers.into_inner().len() == 0, + "should not have peers in isolated mode" + ); - let (_net, _rng) = timeout(Duration::from_secs(10), finish_cranking) + let (_net, _rng) = timeout(Duration::from_secs(20), finish_cranking) .await .unwrap_or_else(|_| panic!("should finish cranking without timeout")); } diff --git a/node/src/types/node_config.rs b/node/src/types/node_config.rs index 617664efd7..f0f2081a3d 100644 --- a/node/src/types/node_config.rs +++ b/node/src/types/node_config.rs @@ -10,7 +10,7 @@ const DEFAULT_SHUTDOWN_FOR_UPGRADE_TIMEOUT: &str = "2min"; const DEFAULT_UPGRADE_TIMEOUT: &str = "30sec"; /// Node sync configuration. -#[derive(DataSize, Debug, Deserialize, Serialize, Clone, Default)] +#[derive(DataSize, Debug, Deserialize, Serialize, Clone, Default, Eq, PartialEq)] #[serde(rename_all = "lowercase")] pub enum SyncHandling { /// Attempt to acquire all historical state back to genesis. diff --git a/resources/local/config.toml b/resources/local/config.toml index 64eaa48f7e..3110f23a84 100644 --- a/resources/local/config.toml +++ b/resources/local/config.toml @@ -10,6 +10,7 @@ # 'genesis' (node will attempt to acquire all block data back to genesis) # 'ttl' (node will attempt to acquire all block data to comply with time to live enforcement) # 'nosync' (node will only acquire blocks moving forward) +# 'isolated' (node will initialize without peers and will not accept peers) # note: ttl is a chainsepc configured behavior on a given network; consult the `max_ttl` chainspec setting # (it is currently ~18 hours by default on production and production-like networks but subject to change). # note: `nosync` is incompatible with validator behavior; a nosync node is prevented from participating diff --git a/resources/production/config-example.toml b/resources/production/config-example.toml index 530632f9e5..1a56bafb5c 100644 --- a/resources/production/config-example.toml +++ b/resources/production/config-example.toml @@ -8,13 +8,17 @@ # Historical sync behavior for this node. Options are: # 'genesis' (node will attempt to acquire all block data back to genesis) +# note: as time goes on, the time to sync all the way back to genesis takes progressively longer. # 'ttl' (node will attempt to acquire all block data to comply with time to live enforcement) -# 'nosync' (node will only acquire blocks moving forward) # note: ttl is a chainsepc configured behavior on a given network; consult the `max_ttl` chainspec setting # (it is currently ~18 hours by default on production and production-like networks but subject to change). +# 'nosync' (node will only acquire blocks moving forward) # note: `nosync` is incompatible with validator behavior; a nosync node is prevented from participating # in consensus / switching to validate mode. it is primarily for lightweight nodes that are # only interested in recent activity. +# 'isolated' (node will initialize without peers and will not accept peers) +# note: an isolated node will not connect to, sync with, or keep up with the network, but will respond to +# binary port, rest server, event server, and diagnostic port connections. sync_handling = 'ttl' # Idle time after which the syncing process is considered stalled.