From 5f51769049f501a2d2c8af2ef9f67439ff86f905 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20B=C4=99za?= Date: Fri, 3 Oct 2025 15:51:48 +0200 Subject: [PATCH 01/11] feat: re-submit attestation if node detects it has no attestation on chain Fixes #1198 --- crates/node/src/cli.rs | 35 +++- crates/node/src/tee/remote_attestation.rs | 237 +++++++++++++++++++++- 2 files changed, 261 insertions(+), 11 deletions(-) diff --git a/crates/node/src/cli.rs b/crates/node/src/cli.rs index 648440a41..13323a4dc 100644 --- a/crates/node/src/cli.rs +++ b/crates/node/src/cli.rs @@ -45,7 +45,9 @@ use url::Url; use { crate::tee::{ monitor_allowed_image_hashes, - remote_attestation::{periodic_attestation_submission, submit_remote_attestation}, + remote_attestation::{ + monitor_attestation_removal, periodic_attestation_submission, submit_remote_attestation, + }, AllowedImageHashesFile, }, mpc_contract::tee::proposal::MpcDockerImageHash, @@ -388,10 +390,14 @@ impl StartCmd { // Spawn periodic attestation submission task let tx_sender_clone = indexer_api.txn_sender.clone(); + let tee_authority_clone = tee_authority.clone(); tokio::spawn(async move { - if let Err(e) = - periodic_attestation_submission(tee_authority, tx_sender_clone, tls_public_key) - .await + if let Err(e) = periodic_attestation_submission( + tee_authority_clone, + tx_sender_clone, + tls_public_key, + ) + .await { tracing::error!( error = ?e, @@ -400,6 +406,27 @@ impl StartCmd { } }); + // Spawn attestation removal monitoring task + let tx_sender_clone = indexer_api.txn_sender.clone(); + let contract_state_receiver = indexer_api.contract_state_receiver.clone(); + let account_id_clone = config.my_near_account_id.clone(); + tokio::spawn(async move { + if let Err(e) = monitor_attestation_removal( + account_id_clone, + tee_authority, + tx_sender_clone, + tls_public_key, + contract_state_receiver, + ) + .await + { + tracing::error!( + error = ?e, + "attestation removal monitoring task failed" + ); + } + }); + let coordinator = 
Coordinator { clock: Clock::real(), config_file: config, diff --git a/crates/node/src/tee/remote_attestation.rs b/crates/node/src/tee/remote_attestation.rs index 212255011..7e616efa7 100644 --- a/crates/node/src/tee/remote_attestation.rs +++ b/crates/node/src/tee/remote_attestation.rs @@ -1,20 +1,24 @@ use std::time::Duration; -use anyhow::Context; -use attestation::{attestation::Attestation, report_data::ReportData}; -use backon::{BackoffBuilder, ExponentialBuilder, Retryable}; -use ed25519_dalek::VerifyingKey; -use tee_authority::tee_authority::TeeAuthority; -use tokio_util::time::FutureExt; - use crate::{ + config::ParticipantsConfig, indexer::{ + participants::ContractState, tx_sender::{TransactionSender, TransactionStatus}, types::{ChainSendTransactionRequest, SubmitParticipantInfoArgs}, }, providers::PublicKeyConversion, trait_extensions::convert_to_contract_dto::IntoDtoType, }; +use anyhow::Context; +use attestation::{attestation::Attestation, report_data::ReportData}; +use backon::{BackoffBuilder, ExponentialBuilder, Retryable}; +use ed25519_dalek::VerifyingKey; +use tee_authority::tee_authority::TeeAuthority; +use tokio_util::time::FutureExt; + +use near_sdk::AccountId; +use tokio::sync::watch; const ATTESTATION_RESUBMISSION_INTERVAL: Duration = Duration::from_secs(10 * 60); const MIN_BACKOFF_DURATION: Duration = Duration::from_millis(100); @@ -134,6 +138,97 @@ async fn periodic_attestation_submission_with_interval( + node_account_id: &AccountId, + tee_authority: &TeeAuthority, + tx_sender: &T, + tls_public_key: VerifyingKey, +) -> anyhow::Result<()> { + let tls_sdk_public_key = tls_public_key.to_near_sdk_public_key()?; + let report_data = ReportData::new(tls_sdk_public_key.clone()); + let fresh_attestation = tee_authority.generate_attestation(report_data).await?; + submit_remote_attestation(tx_sender.clone(), fresh_attestation, tls_public_key).await?; + tracing::info!(%node_account_id, "successfully resubmitted attestation after state change detection"); 
+ Ok(()) +} + +fn is_participant_in_config( + participants: &ParticipantsConfig, + node_account_id: &AccountId, +) -> bool { + participants + .participants + .iter() + .any(|p| p.near_account_id == *node_account_id) +} + +fn extract_participant_info(state: &ContractState, node_account_id: &AccountId) -> bool { + match state { + ContractState::Running(s) => is_participant_in_config(&s.participants, node_account_id), + ContractState::Initializing(s) => { + is_participant_in_config(&s.participants, node_account_id) + } + ContractState::Invalid => false, + } +} + +/// Monitors contract state changes and triggers attestation resubmission when appropriate. +/// +/// This function watches contract state transitions and resubmits attestations when +/// the node transitions from being a participant to not being a participant. +pub async fn monitor_attestation_removal( + node_account_id: AccountId, + tee_authority: TeeAuthority, + tx_sender: T, + tls_public_key: VerifyingKey, + mut contract_state_receiver: watch::Receiver, +) -> anyhow::Result<()> { + tracing::info!( + %node_account_id, + "starting attestation removal monitoring" + ); + + let initial_state = contract_state_receiver.borrow().clone(); + let initially_participant = extract_participant_info(&initial_state, &node_account_id); + + tracing::info!( + %node_account_id, + initially_participant = initially_participant, + "established initial attestation monitoring baseline" + ); + + let mut was_participant = initially_participant; + + loop { + if contract_state_receiver.changed().await.is_err() { + tracing::warn!("contract state receiver closed, stopping attestation monitoring"); + break; + } + + let current_state = contract_state_receiver.borrow().clone(); + let currently_participant = extract_participant_info(&current_state, &node_account_id); + let participant_removed = was_participant && !currently_participant; + + if participant_removed { + tracing::warn!(%node_account_id, "detected transition from participant to
non-participant, triggering attestation resubmission"); + + if let Err(error) = + resubmit_attestation(&node_account_id, &tee_authority, &tx_sender, tls_public_key) + .await + { + tracing::debug!( + ?error, + "attestation resubmission failed, will retry on next state change" + ); + } + } + + was_participant = currently_participant; + } + + Ok(()) +} + /// Allows repeatedly awaiting for something, like a `tokio::time::Interval`. pub trait Tick { async fn tick(&mut self); @@ -229,4 +324,132 @@ mod tests { assert_eq!(sender.count(), TEST_SUBMISSION_COUNT); handle.abort(); } + + #[tokio::test] + async fn test_attestation_removal_detection() { + use crate::{ + config::ParticipantInfo, + indexer::participants::{ContractRunningState, ContractState}, + primitives::ParticipantId, + }; + use mpc_contract::primitives::key_state::{EpochId, Keyset}; + use near_sdk::AccountId; + use tokio::sync::watch; + + let tee_authority = TeeAuthority::from(LocalTeeAuthorityConfig::default()); + let sender = MockSender::new(); + let mut rng = rand::rngs::StdRng::seed_from_u64(42); + let key = SigningKey::generate(&mut rng).verifying_key(); + let account_id: AccountId = "test.near".parse().unwrap(); + + // Create initial contract state with our node as a participant + let our_participant = ParticipantInfo { + id: ParticipantId::from_raw(0), + address: "127.0.0.1".to_string(), + port: 8080, + p2p_public_key: key, + near_account_id: account_id.clone(), + }; + + let initial_participants = ParticipantsConfig { + participants: vec![our_participant.clone()], + threshold: 1, + }; + + let initial_state = ContractState::Running(ContractRunningState { + keyset: Keyset { + epoch_id: EpochId::new(1), + domains: vec![], + }, + participants: initial_participants.clone(), + resharing_state: None, + }); + + let (state_sender, state_receiver) = watch::channel(initial_state); + + // Start the monitoring task + let monitor_handle = tokio::spawn(monitor_attestation_removal( + account_id.clone(), + tee_authority, 
+ sender.clone(), + key, + state_receiver, + )); + + // Give the monitor time to process the initial state + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + + // Check initial submission count (should be 0 since no resubmissions yet) + let initial_count = sender.count(); + assert_eq!(initial_count, 0, "Expected no initial resubmissions"); + + // Trigger initial state processing by sending the same state again + let initial_state_copy = initial_participants.clone(); + let initial_state_trigger = ContractState::Running(ContractRunningState { + keyset: Keyset { + epoch_id: EpochId::new(1), + domains: vec![], + }, + participants: initial_state_copy, + resharing_state: None, + }); + state_sender.send_replace(initial_state_trigger); + tokio::time::sleep(tokio::time::Duration::from_millis(20)).await; + + // Test 1: Participant removal (resharing scenario) + let other_participant = ParticipantInfo { + id: ParticipantId::from_raw(1), + address: "127.0.0.1".to_string(), + port: 8081, + p2p_public_key: SigningKey::generate(&mut rng).verifying_key(), + near_account_id: "other.near".parse().unwrap(), + }; + + let removed_participants = ParticipantsConfig { + participants: vec![other_participant], + threshold: 1, + }; + + // Send state where our node is no longer a participant (same epoch) + let removed_state = ContractState::Running(ContractRunningState { + keyset: Keyset { + epoch_id: EpochId::new(1), // Same epoch - participant removal + domains: vec![], + }, + participants: removed_participants.clone(), + resharing_state: None, + }); + + state_sender.send(removed_state).unwrap(); + tokio::time::sleep(tokio::time::Duration::from_millis(20)).await; + + // Should have detected participant removal and triggered resubmission + assert!( + sender.count() >= 1, + "Expected attestation resubmission after participant removal" + ); + + // Test 2: Epoch change without participant removal should NOT trigger resubmission + let epoch_change_state = 
ContractState::Running(ContractRunningState { + keyset: Keyset { + epoch_id: EpochId::new(2), // New epoch - should NOT trigger resubmission + domains: vec![], + }, + participants: removed_participants, // Same participants as before + resharing_state: None, + }); + + let count_before_epoch_change = sender.count(); + state_sender.send(epoch_change_state).unwrap(); + tokio::time::sleep(tokio::time::Duration::from_millis(20)).await; + + // Should NOT have triggered resubmission for epoch change alone + assert_eq!( + sender.count(), + count_before_epoch_change, + "Epoch change alone should not trigger attestation resubmission" + ); + + monitor_handle.abort(); + } } From adb633ec27a6f87edd4f30ad98d42f8381cc6450 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20B=C4=99za?= Date: Fri, 3 Oct 2025 17:36:40 +0200 Subject: [PATCH 02/11] WIP --- crates/node/src/cli.rs | 7 +- crates/node/src/indexer.rs | 2 + crates/node/src/indexer/fake.rs | 1 + crates/node/src/indexer/lib.rs | 9 + crates/node/src/indexer/real.rs | 9 +- crates/node/src/indexer/tee.rs | 56 ++++- crates/node/src/tee/remote_attestation.rs | 288 +++++++++------------- 7 files changed, 199 insertions(+), 173 deletions(-) diff --git a/crates/node/src/cli.rs b/crates/node/src/cli.rs index 13323a4dc..b7e4daa0f 100644 --- a/crates/node/src/cli.rs +++ b/crates/node/src/cli.rs @@ -406,17 +406,18 @@ impl StartCmd { } }); - // Spawn attestation removal monitoring task + // Spawn TEE attestation monitoring task let tx_sender_clone = indexer_api.txn_sender.clone(); - let contract_state_receiver = indexer_api.contract_state_receiver.clone(); + let tee_accounts_receiver = indexer_api.tee_accounts_receiver.clone(); let account_id_clone = config.my_near_account_id.clone(); + tokio::spawn(async move { if let Err(e) = monitor_attestation_removal( account_id_clone, tee_authority, tx_sender_clone, tls_public_key, - contract_state_receiver, + tee_accounts_receiver, ) .await { diff --git a/crates/node/src/indexer.rs 
b/crates/node/src/indexer.rs index 9980d4f04..67d54e8bc 100644 --- a/crates/node/src/indexer.rs +++ b/crates/node/src/indexer.rs @@ -75,4 +75,6 @@ pub struct IndexerAPI { pub txn_sender: TransactionSender, /// Watcher that keeps track of allowed [`AllowedDockerImageHash`]es on the contract. pub allowed_docker_images_receiver: watch::Receiver>, + /// Watcher that keeps track of TEE accounts stored in the contract. + pub tee_accounts_receiver: watch::Receiver>, } diff --git a/crates/node/src/indexer/fake.rs b/crates/node/src/indexer/fake.rs index 579801e5c..99033db0b 100644 --- a/crates/node/src/indexer/fake.rs +++ b/crates/node/src/indexer/fake.rs @@ -765,6 +765,7 @@ impl FakeIndexerManager { )), txn_sender: mock_transaction_sender, allowed_docker_images_receiver, + tee_accounts_receiver: watch::channel(vec![]).1, }; let currently_running_job_name = Arc::new(std::sync::Mutex::new("".to_string())); let disabler = NodeDisabler { diff --git a/crates/node/src/indexer/lib.rs b/crates/node/src/indexer/lib.rs index 6ae41d232..ecba53890 100644 --- a/crates/node/src/indexer/lib.rs +++ b/crates/node/src/indexer/lib.rs @@ -2,6 +2,7 @@ use actix::Addr; use anyhow::bail; use mpc_contract::state::ProtocolContractState; use mpc_contract::tee::proposal::MpcDockerImageHash; +use mpc_contract::tee::tee_state::NodeId; use near_client::ClientActor; use near_client::Status; use near_indexer_primitives::types; @@ -15,6 +16,7 @@ use tokio::time; const INTERVAL: Duration = Duration::from_millis(500); const ALLOWED_IMAGE_HASHES_ENDPOINT: &str = "allowed_docker_image_hashes"; +const TEE_ACCOUNTS_ENDPOINT: &str = "get_tee_accounts"; const CONTRACT_STATE_ENDPOINT: &str = "state"; pub(crate) async fn wait_for_full_sync(client: &Addr) { @@ -83,6 +85,13 @@ pub(crate) async fn get_mpc_allowed_image_hashes( get_mpc_state(mpc_contract_id, client, ALLOWED_IMAGE_HASHES_ENDPOINT).await } +pub(crate) async fn get_mpc_tee_accounts( + mpc_contract_id: AccountId, + client: &actix::Addr, +) -> 
anyhow::Result<(u64, Vec)> { + get_mpc_state(mpc_contract_id, client, TEE_ACCOUNTS_ENDPOINT).await +} + pub(crate) async fn get_account_balance( account_id: AccountId, client: &actix::Addr, diff --git a/crates/node/src/indexer/real.rs b/crates/node/src/indexer/real.rs index 52582d26d..1dc7c4377 100644 --- a/crates/node/src/indexer/real.rs +++ b/crates/node/src/indexer/real.rs @@ -6,7 +6,7 @@ use super::{IndexerAPI, IndexerState}; use crate::config::load_listening_blocks_file; use crate::config::{IndexerConfig, RespondConfig}; use crate::indexer::balances::monitor_balance; -use crate::indexer::tee::monitor_allowed_docker_images; +use crate::indexer::tee::{monitor_allowed_docker_images, monitor_tee_accounts}; use crate::indexer::tx_sender::{TransactionProcessorHandle, TransactionSender}; use ed25519_dalek::SigningKey; use mpc_contract::state::ProtocolContractState; @@ -55,6 +55,7 @@ pub fn spawn_real_indexer( let (block_update_sender, block_update_receiver) = mpsc::unbounded_channel(); let (allowed_docker_images_sender, allowed_docker_images_receiver) = watch::channel(vec![]); + let (tee_accounts_sender, tee_accounts_receiver) = watch::channel(vec![]); let my_near_account_id_clone = my_near_account_id.clone(); let respond_config_clone = respond_config.clone(); @@ -120,6 +121,11 @@ pub fn spawn_real_indexer( indexer_state.clone(), )); + actix::spawn(monitor_tee_accounts( + tee_accounts_sender, + indexer_state.clone(), + )); + // Returns once the contract state is available. 
let contract_state_receiver = monitor_contract_state( indexer_state.clone(), @@ -178,5 +184,6 @@ pub fn spawn_real_indexer( block_update_receiver: Arc::new(Mutex::new(block_update_receiver)), txn_sender, allowed_docker_images_receiver, + tee_accounts_receiver, } } diff --git a/crates/node/src/indexer/tee.rs b/crates/node/src/indexer/tee.rs index 76e65206d..8bb12e159 100644 --- a/crates/node/src/indexer/tee.rs +++ b/crates/node/src/indexer/tee.rs @@ -2,10 +2,11 @@ use std::{sync::Arc, time::Duration}; use backon::{BackoffBuilder, ExponentialBuilder}; use mpc_contract::tee::proposal::MpcDockerImageHash; +use mpc_contract::tee::tee_state::NodeId; use tokio::sync::watch; use crate::indexer::{ - lib::{get_mpc_allowed_image_hashes, wait_for_full_sync}, + lib::{get_mpc_allowed_image_hashes, get_mpc_tee_accounts, wait_for_full_sync}, IndexerState, }; @@ -13,6 +14,7 @@ const ALLOWED_IMAGE_HASHES_REFRESH_INTERVAL: std::time::Duration = std::time::Duration::from_secs(1); const MIN_BACKOFF_DURATION: Duration = Duration::from_secs(1); const MAX_BACKOFF_DURATION: Duration = Duration::from_secs(60); +const TEE_ACCOUNTS_REFRESH_INTERVAL: std::time::Duration = std::time::Duration::from_secs(1); /// This future waits for the indexer to fully sync, and returns /// a [`watch::Receiver`] that will be continuously updated with the latest @@ -71,3 +73,55 @@ pub async fn monitor_allowed_docker_images( }); } } + +/// Monitor TEE accounts stored in the contract and update the watch channel when changes are detected +pub async fn monitor_tee_accounts( + sender: watch::Sender>, + indexer_state: Arc, +) { + let fetch_tee_accounts = { + let indexer_state = indexer_state.clone(); + async move || { + let mut backoff = ExponentialBuilder::default() + .with_min_delay(MIN_BACKOFF_DURATION) + .with_max_delay(MAX_BACKOFF_DURATION) + .without_max_times() + .with_jitter() + .build(); + + loop { + match get_mpc_tee_accounts( + indexer_state.mpc_contract_id.clone(), + &indexer_state.view_client, + ) + 
.await + { + Ok((_block_height, tee_accounts)) => { + break tee_accounts; + } + Err(e) => { + tracing::error!(target: "mpc", "error reading TEE accounts from chain: {:?}", e); + + let backoff_duration = backoff.next().unwrap_or(MAX_BACKOFF_DURATION); + tokio::time::sleep(backoff_duration).await; + } + } + } + } + }; + + wait_for_full_sync(&indexer_state.client).await; + + loop { + tokio::time::sleep(TEE_ACCOUNTS_REFRESH_INTERVAL).await; + let tee_accounts = fetch_tee_accounts().await; + sender.send_if_modified(|previous_tee_accounts| { + if *previous_tee_accounts != tee_accounts { + *previous_tee_accounts = tee_accounts; + true + } else { + false + } + }); + } +} diff --git a/crates/node/src/tee/remote_attestation.rs b/crates/node/src/tee/remote_attestation.rs index 7e616efa7..e022519dc 100644 --- a/crates/node/src/tee/remote_attestation.rs +++ b/crates/node/src/tee/remote_attestation.rs @@ -1,9 +1,7 @@ use std::time::Duration; use crate::{ - config::ParticipantsConfig, indexer::{ - participants::ContractState, tx_sender::{TransactionSender, TransactionStatus}, types::{ChainSendTransactionRequest, SubmitParticipantInfoArgs}, }, @@ -17,6 +15,7 @@ use ed25519_dalek::VerifyingKey; use tee_authority::tee_authority::TeeAuthority; use tokio_util::time::FutureExt; +use mpc_contract::tee::tee_state::NodeId; use near_sdk::AccountId; use tokio::sync::watch; @@ -25,6 +24,7 @@ const MIN_BACKOFF_DURATION: Duration = Duration::from_millis(100); const MAX_BACKOFF_DURATION: Duration = Duration::from_secs(60); const MAX_RETRY_DURATION: Duration = Duration::from_secs(60 * 60 * 12); // 12 hours. const BACKOFF_FACTOR: f32 = 1.5; +const RESUBMISSION_RETRY_DELAY: Duration = Duration::from_secs(2); /// Submits a remote attestation transaction to the MPC contract, retrying with backoff until success. 
/// @@ -144,86 +144,105 @@ async fn resubmit_attestation( tx_sender: &T, tls_public_key: VerifyingKey, ) -> anyhow::Result<()> { - let tls_sdk_public_key = tls_public_key.to_near_sdk_public_key()?; - let report_data = ReportData::new(tls_sdk_public_key.clone()); - let fresh_attestation = tee_authority.generate_attestation(report_data).await?; - submit_remote_attestation(tx_sender.clone(), fresh_attestation, tls_public_key).await?; - tracing::info!(%node_account_id, "successfully resubmitted attestation after state change detection"); - Ok(()) -} + const MAX_RETRIES: usize = 3; + let mut retry_interval = tokio::time::interval(RESUBMISSION_RETRY_DELAY); -fn is_participant_in_config( - participants: &ParticipantsConfig, - node_account_id: &AccountId, -) -> bool { - participants - .participants - .iter() - .any(|p| p.near_account_id == *node_account_id) -} + for attempt in 1..=MAX_RETRIES { + let tls_sdk_public_key = tls_public_key.to_near_sdk_public_key()?; + let report_data = ReportData::new(tls_sdk_public_key.clone()); + let fresh_attestation = tee_authority.generate_attestation(report_data).await?; -fn extract_participant_info(state: &ContractState, node_account_id: &AccountId) -> bool { - match state { - ContractState::Running(s) => is_participant_in_config(&s.participants, node_account_id), - ContractState::Initializing(s) => { - is_participant_in_config(&s.participants, node_account_id) + match submit_remote_attestation(tx_sender.clone(), fresh_attestation, tls_public_key).await + { + Ok(_) => { + tracing::info!(%node_account_id, attempt, "successfully resubmitted attestation"); + return Ok(()); + } + Err(error) if attempt == MAX_RETRIES => { + tracing::error!(%node_account_id, %error, "attestation resubmission failed after {MAX_RETRIES} attempts"); + return Err(error); + } + Err(error) => { + tracing::warn!(%node_account_id, attempt, %error, "attestation resubmission failed, retrying"); + if attempt < MAX_RETRIES { + retry_interval.tick().await; + } + } } - 
ContractState::Invalid => false, } + + unreachable!() } -/// Monitors contract state changes and triggers attestation resubmission when appropriate. +/// Checks if TEE attestation is available for the given node in the TEE accounts list. +fn is_tee_attestation_available(tee_accounts: &[NodeId], node_id: &NodeId) -> bool { + tee_accounts.iter().any(|tee_node_id| { + tee_node_id.account_id == node_id.account_id + && tee_node_id.tls_public_key == node_id.tls_public_key + }) +} + +/// Monitors the contract for TEE attestation removal and triggers resubmission when needed. /// -/// This function watches contract state transitions and resubmits attestations when -/// the node transitions from being a participant to not being a participant. +/// This function watches TEE account changes in the contract and resubmits attestations when +/// the node's TEE attestation is no longer available. This covers all removal scenarios: +/// - Attestation timeout/expiration +/// - Node removal during resharing +/// - TEE validation failures pub async fn monitor_attestation_removal( node_account_id: AccountId, tee_authority: TeeAuthority, tx_sender: T, tls_public_key: VerifyingKey, - mut contract_state_receiver: watch::Receiver, + mut tee_accounts_receiver: watch::Receiver>, ) -> anyhow::Result<()> { + let node_id = NodeId { + account_id: node_account_id.clone(), + tls_public_key: near_sdk::PublicKey::from_parts( + near_sdk::CurveType::ED25519, + tls_public_key.to_bytes().to_vec(), + ) + .expect("Failed to create PublicKey from TLS public key"), + }; + tracing::info!( %node_account_id, - "starting attestation removal monitoring" + "starting TEE attestation removal monitoring" ); - let initial_state = contract_state_receiver.borrow().clone(); - let initially_participant = extract_participant_info(&initial_state, &node_account_id); + let initial_tee_accounts = tee_accounts_receiver.borrow().clone(); + let initially_available = is_tee_attestation_available(&initial_tee_accounts, &node_id); 
tracing::info!( %node_account_id, - initially_participant = initially_participant, - "established initial attestation monitoring baseline" + initially_available, + "initial TEE attestation status" ); - let mut was_participant = initially_participant; + let mut was_available = initially_available; - loop { - if contract_state_receiver.changed().await.is_err() { - tracing::warn!("contract state receiver closed, stopping attestation monitoring"); - break; - } + while tee_accounts_receiver.changed().await.is_ok() { + let tee_accounts = tee_accounts_receiver.borrow().clone(); + let is_available = is_tee_attestation_available(&tee_accounts, &node_id); - let current_state = contract_state_receiver.borrow().clone(); - let currently_participant = extract_participant_info(&current_state, &node_account_id); - let participant_removed = was_participant && !currently_participant; + tracing::debug!( + %node_account_id, + is_available, + was_available, + "TEE attestation status check" + ); - if participant_removed { - tracing::warn!(%node_account_id, "detected transition from participant to non-participant, triggering attestation resubmission"); + if was_available && !is_available { + tracing::warn!( + %node_account_id, + "TEE attestation removed from contract, resubmitting" + ); - if let Err(error) = - resubmit_attestation(&node_account_id, &tee_authority, &tx_sender, tls_public_key) - .await - { - tracing::debug!( - ?error, - "attestation resubmission failed, will retry on next state change" - ); - } + resubmit_attestation(&node_account_id, &tee_authority, &tx_sender, tls_public_key) + .await?; } - was_participant = currently_participant; + was_available = is_available; } Ok(()) @@ -326,130 +345,63 @@ mod tests { } #[tokio::test] - async fn test_attestation_removal_detection() { - use crate::{ - config::ParticipantInfo, - indexer::participants::{ContractRunningState, ContractState}, - primitives::ParticipantId, - }; - use mpc_contract::primitives::key_state::{EpochId, Keyset}; - use
near_sdk::AccountId; - use tokio::sync::watch; - - let tee_authority = TeeAuthority::from(LocalTeeAuthorityConfig::default()); - let sender = MockSender::new(); + async fn test_tee_attestation_removal_detection() { + let node_account_id: AccountId = "test_node.near".parse().unwrap(); let mut rng = rand::rngs::StdRng::seed_from_u64(42); - let key = SigningKey::generate(&mut rng).verifying_key(); - let account_id: AccountId = "test.near".parse().unwrap(); - - // Create initial contract state with our node as a participant - let our_participant = ParticipantInfo { - id: ParticipantId::from_raw(0), - address: "127.0.0.1".to_string(), - port: 8080, - p2p_public_key: key, - near_account_id: account_id.clone(), - }; - - let initial_participants = ParticipantsConfig { - participants: vec![our_participant.clone()], - threshold: 1, + let tls_public_key = SigningKey::generate(&mut rng).verifying_key(); + let tee_authority = TeeAuthority::from(LocalTeeAuthorityConfig::default()); + let mock_sender = MockSender::new(); + + let node_id = NodeId { + account_id: node_account_id.clone(), + tls_public_key: near_sdk::PublicKey::from_parts( + near_sdk::CurveType::ED25519, + tls_public_key.to_bytes().to_vec(), + ) + .expect("Failed to create PublicKey from TLS public key"), }; - let initial_state = ContractState::Running(ContractRunningState { - keyset: Keyset { - epoch_id: EpochId::new(1), - domains: vec![], - }, - participants: initial_participants.clone(), - resharing_state: None, - }); + // Create initial TEE accounts list including our node + let initial_tee_accounts = vec![node_id.clone()]; + let (sender, receiver) = watch::channel(initial_tee_accounts); - let (state_sender, state_receiver) = watch::channel(initial_state); - - // Start the monitoring task - let monitor_handle = tokio::spawn(monitor_attestation_removal( - account_id.clone(), + // Start monitoring task + let monitoring_task = tokio::spawn(monitor_attestation_removal( + node_account_id.clone(), tee_authority, - 
sender.clone(), - key, - state_receiver, + mock_sender.clone(), + tls_public_key, + receiver, )); - // Give the monitor time to process the initial state - tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; - - // Check initial submission count (should be 0 since no resubmissions yet) - let initial_count = sender.count(); - assert_eq!(initial_count, 0, "Expected no initial resubmissions"); - - // Trigger initial state processing by sending the same state again - let initial_state_copy = initial_participants.clone(); - let initial_state_trigger = ContractState::Running(ContractRunningState { - keyset: Keyset { - epoch_id: EpochId::new(1), - domains: vec![], - }, - participants: initial_state_copy, - resharing_state: None, - }); - state_sender.send_replace(initial_state_trigger); - tokio::time::sleep(tokio::time::Duration::from_millis(20)).await; - - // Test 1: Participant removal (resharing scenario) - let other_participant = ParticipantInfo { - id: ParticipantId::from_raw(1), - address: "127.0.0.1".to_string(), - port: 8081, - p2p_public_key: SigningKey::generate(&mut rng).verifying_key(), - near_account_id: "other.near".parse().unwrap(), - }; + // Wait for initial setup + tokio::time::sleep(Duration::from_millis(10)).await; - let removed_participants = ParticipantsConfig { - participants: vec![other_participant], - threshold: 1, - }; + // Verify no submission occurred initially (node is in TEE accounts) + assert_eq!(mock_sender.count(), 0); - // Send state where our node is no longer a participant (same epoch) - let removed_state = ContractState::Running(ContractRunningState { - keyset: Keyset { - epoch_id: EpochId::new(1), // Same epoch - participant removal - domains: vec![], - }, - participants: removed_participants.clone(), - resharing_state: None, - }); - - state_sender.send(removed_state).unwrap(); - tokio::time::sleep(tokio::time::Duration::from_millis(20)).await; - - // Should have detected participant removal and triggered resubmission - 
assert!( - sender.count() >= 1, - "Expected attestation resubmission after participant removal" - ); + // Remove the node from TEE accounts (simulate attestation removal) + let removed_tee_accounts = vec![]; // Node is no longer in TEE accounts + sender.send(removed_tee_accounts).unwrap(); - // Test 2: Epoch change without participant removal should NOT trigger resubmission - let epoch_change_state = ContractState::Running(ContractRunningState { - keyset: Keyset { - epoch_id: EpochId::new(2), // New epoch - should NOT trigger resubmission - domains: vec![], - }, - participants: removed_participants, // Same participants as before - resharing_state: None, - }); - - let count_before_epoch_change = sender.count(); - state_sender.send(epoch_change_state).unwrap(); - tokio::time::sleep(tokio::time::Duration::from_millis(20)).await; - - // Should NOT have triggered resubmission for epoch change alone - assert_eq!( - sender.count(), - count_before_epoch_change, - "Epoch change alone should not trigger attestation resubmission" - ); + // Wait for monitoring to detect the change + tokio::time::sleep(Duration::from_millis(50)).await; + + // Verify attestation resubmission occurred + assert_eq!(mock_sender.count(), 1); + + // Add the node back to TEE accounts + let restored_tee_accounts = vec![node_id]; + sender.send(restored_tee_accounts).unwrap(); + + // Wait for state update + tokio::time::sleep(Duration::from_millis(10)).await; + + // Verify no additional submission (node is back in TEE accounts) + assert_eq!(mock_sender.count(), 1); - monitor_handle.abort(); + // Clean up + monitoring_task.abort(); + let _ = monitoring_task.await; } } From 348e4da6435fff0d6911943c658bd1adaffdd12f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20B=C4=99za?= Date: Mon, 6 Oct 2025 12:46:47 +0200 Subject: [PATCH 03/11] WIP --- crates/node/src/tee/remote_attestation.rs | 53 ++++++++++------------- 1 file changed, 24 insertions(+), 29 deletions(-) diff --git 
a/crates/node/src/tee/remote_attestation.rs b/crates/node/src/tee/remote_attestation.rs index e022519dc..d34d768fe 100644 --- a/crates/node/src/tee/remote_attestation.rs +++ b/crates/node/src/tee/remote_attestation.rs @@ -19,12 +19,12 @@ use mpc_contract::tee::tee_state::NodeId; use near_sdk::AccountId; use tokio::sync::watch; +const ATTESTATION_RESUBMISSION_RETRY_DELAY: Duration = Duration::from_secs(2); const ATTESTATION_RESUBMISSION_INTERVAL: Duration = Duration::from_secs(10 * 60); const MIN_BACKOFF_DURATION: Duration = Duration::from_millis(100); const MAX_BACKOFF_DURATION: Duration = Duration::from_secs(60); const MAX_RETRY_DURATION: Duration = Duration::from_secs(60 * 60 * 12); // 12 hours. const BACKOFF_FACTOR: f32 = 1.5; -const RESUBMISSION_RETRY_DELAY: Duration = Duration::from_secs(2); /// Submits a remote attestation transaction to the MPC contract, retrying with backoff until success. /// @@ -145,14 +145,18 @@ async fn resubmit_attestation( tls_public_key: VerifyingKey, ) -> anyhow::Result<()> { const MAX_RETRIES: usize = 3; - let mut retry_interval = tokio::time::interval(RESUBMISSION_RETRY_DELAY); + let mut retry_interval = tokio::time::interval(ATTESTATION_RESUBMISSION_RETRY_DELAY); + let tls_sdk_public_key = tls_public_key.to_near_sdk_public_key()?; + let report_data = ReportData::new(tls_sdk_public_key.clone()); + let fresh_attestation = tee_authority.generate_attestation(report_data).await?; for attempt in 1..=MAX_RETRIES { - let tls_sdk_public_key = tls_public_key.to_near_sdk_public_key()?; - let report_data = ReportData::new(tls_sdk_public_key.clone()); - let fresh_attestation = tee_authority.generate_attestation(report_data).await?; - - match submit_remote_attestation(tx_sender.clone(), fresh_attestation, tls_public_key).await + match submit_remote_attestation( + tx_sender.clone(), + fresh_attestation.clone(), + tls_public_key, + ) + .await { Ok(_) => { tracing::info!(%node_account_id, attempt, "successfully resubmitted attestation"); @@ 
-175,20 +179,18 @@ async fn resubmit_attestation( } /// Checks if TEE attestation is available for the given node in the TEE accounts list. -fn is_tee_attestation_available(tee_accounts: &[NodeId], node_id: &NodeId) -> bool { - tee_accounts.iter().any(|tee_node_id| { - tee_node_id.account_id == node_id.account_id - && tee_node_id.tls_public_key == node_id.tls_public_key - }) +fn is_node_in_contract_tee_accounts( + tee_accounts_receiver: &mut watch::Receiver>, + node_id: &NodeId, +) -> bool { + let tee_accounts = tee_accounts_receiver.borrow_and_update(); + tee_accounts.contains(node_id) } /// Monitors the contract for TEE attestation removal and triggers resubmission when needed. /// /// This function watches TEE account changes in the contract and resubmits attestations when -/// the node's TEE attestation is no longer available. This covers all removal scenarios: -/// - Attestation timeout/expiration -/// - Node removal during resharing -/// - TEE validation failures +/// the node's TEE attestation is no longer available. 
pub async fn monitor_attestation_removal( node_account_id: AccountId, tee_authority: TeeAuthority, @@ -202,28 +204,22 @@ pub async fn monitor_attestation_removal( near_sdk::CurveType::ED25519, tls_public_key.to_bytes().to_vec(), ) - .expect("Failed to create PublicKey from TLS public key"), + .map_err(|e| anyhow::anyhow!("Failed to create PublicKey from TLS public key: {}", e))?, }; - tracing::info!( - %node_account_id, - "starting TEE attestation removal monitoring" - ); - - let initial_tee_accounts = tee_accounts_receiver.borrow().clone(); - let initially_available = is_tee_attestation_available(&initial_tee_accounts, &node_id); + let initially_available = + is_node_in_contract_tee_accounts(&mut tee_accounts_receiver, &node_id); tracing::info!( %node_account_id, initially_available, - "initial TEE attestation status" + "starting TEE attestation removal monitoring; initial TEE attestation status" ); let mut was_available = initially_available; while tee_accounts_receiver.changed().await.is_ok() { - let tee_accounts = tee_accounts_receiver.borrow().clone(); - let is_available = is_tee_attestation_available(&tee_accounts, &node_id); + let is_available = is_node_in_contract_tee_accounts(&mut tee_accounts_receiver, &node_id); tracing::debug!( %node_account_id, @@ -358,14 +354,13 @@ mod tests { near_sdk::CurveType::ED25519, tls_public_key.to_bytes().to_vec(), ) - .expect("Failed to create PublicKey from TLS public key"), + .unwrap(), }; // Create initial TEE accounts list including our node let initial_tee_accounts = vec![node_id.clone()]; let (sender, receiver) = watch::channel(initial_tee_accounts); - // Start monitoring task let monitoring_task = tokio::spawn(monitor_attestation_removal( node_account_id.clone(), tee_authority, From 5c2cc312228ae4860f1f33e69a34044b89c62f83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20B=C4=99za?= Date: Mon, 6 Oct 2025 13:35:59 +0200 Subject: [PATCH 04/11] WIP --- crates/node/src/tee/remote_attestation.rs | 74 
++++++++++++++++++----- 1 file changed, 59 insertions(+), 15 deletions(-) diff --git a/crates/node/src/tee/remote_attestation.rs b/crates/node/src/tee/remote_attestation.rs index d34d768fe..cbdc28b52 100644 --- a/crates/node/src/tee/remote_attestation.rs +++ b/crates/node/src/tee/remote_attestation.rs @@ -286,15 +286,25 @@ mod tests { } } + /// Simulates contract behavior by automatically adding the node back to TEE accounts + /// when an attestation submission occurs, mimicking real contract response to successful submissions. + struct ContractSimulator { + sender: watch::Sender>, + node_id: NodeId, + } + + /// Mock that tracks attestation submissions and simulates contract responses. #[derive(Clone)] struct MockSender { submissions: Arc>, + contract_simulator: Arc, } impl MockSender { - fn new() -> Self { + fn new(sender: watch::Sender>, node_id: NodeId) -> Self { Self { submissions: Arc::new(Mutex::new(0)), + contract_simulator: Arc::new(ContractSimulator { sender, node_id }), } } @@ -309,6 +319,11 @@ mod tests { _: ChainSendTransactionRequest, ) -> Result<(), TransactionProcessorError> { *self.submissions.lock().unwrap() += 1; + + // Simulate contract adding the node back to TEE accounts after successful submission + let updated_tee_accounts = vec![self.contract_simulator.node_id.clone()]; + let _ = self.contract_simulator.sender.send(updated_tee_accounts); + Ok(()) } @@ -324,7 +339,17 @@ mod tests { #[tokio::test] async fn test_periodic_attestation_submission() { let tee_authority = TeeAuthority::from(LocalTeeAuthorityConfig::default()); - let sender = MockSender::new(); + + let (dummy_sender, _) = watch::channel(vec![]); + let dummy_node_id = NodeId { + account_id: "dummy.near".parse().unwrap(), + tls_public_key: near_sdk::PublicKey::from_parts( + near_sdk::CurveType::ED25519, + vec![0u8; 32], + ) + .unwrap(), + }; + let sender = MockSender::new(dummy_sender, dummy_node_id); let mut rng = rand::rngs::StdRng::seed_from_u64(42); let key = 
SigningKey::generate(&mut rng).verifying_key(); @@ -346,7 +371,6 @@ mod tests { let mut rng = rand::rngs::StdRng::seed_from_u64(42); let tls_public_key = SigningKey::generate(&mut rng).verifying_key(); let tee_authority = TeeAuthority::from(LocalTeeAuthorityConfig::default()); - let mock_sender = MockSender::new(); let node_id = NodeId { account_id: node_account_id.clone(), @@ -359,7 +383,10 @@ mod tests { // Create initial TEE accounts list including our node let initial_tee_accounts = vec![node_id.clone()]; - let (sender, receiver) = watch::channel(initial_tee_accounts); + let (tee_accounts_sender, receiver) = watch::channel(initial_tee_accounts); + + // Create mock sender with contract simulator built-in + let mock_sender = MockSender::new(tee_accounts_sender.clone(), node_id.clone()); let monitoring_task = tokio::spawn(monitor_attestation_removal( node_account_id.clone(), @@ -377,23 +404,40 @@ mod tests { // Remove the node from TEE accounts (simulate attestation removal) let removed_tee_accounts = vec![]; // Node is no longer in TEE accounts - sender.send(removed_tee_accounts).unwrap(); + tee_accounts_sender.send(removed_tee_accounts).unwrap(); - // Wait for monitoring to detect the change + // Wait for monitoring to detect removal and resubmit tokio::time::sleep(Duration::from_millis(50)).await; - // Verify attestation resubmission occurred - assert_eq!(mock_sender.count(), 1); + // Verify attestation resubmission occurred (monitoring detected removal) + assert_eq!( + mock_sender.count(), + 1, + "Expected exactly one resubmission when node was removed" + ); - // Add the node back to TEE accounts - let restored_tee_accounts = vec![node_id]; - sender.send(restored_tee_accounts).unwrap(); + // Wait a bit more to ensure the monitoring service has processed the automatic re-addition + tokio::time::sleep(Duration::from_millis(20)).await; - // Wait for state update - tokio::time::sleep(Duration::from_millis(10)).await; + // Verify no additional submissions occurred 
(node should be back in TEE accounts automatically) + assert_eq!( + mock_sender.count(), + 1, + "Expected no additional submissions after node was automatically re-added" + ); - // Verify no additional submission (node is back in TEE accounts) - assert_eq!(mock_sender.count(), 1); + // Test another removal cycle to ensure monitoring continues working + tee_accounts_sender.send(vec![]).unwrap(); + + // Wait for second detection and resubmission + tokio::time::sleep(Duration::from_millis(50)).await; + + // Verify second resubmission occurred + assert_eq!( + mock_sender.count(), + 2, + "Expected second resubmission when node was removed again" + ); // Clean up monitoring_task.abort(); From 85d2d1200a756557542e86201105ff13a1468459 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20B=C4=99za?= Date: Mon, 6 Oct 2025 13:43:34 +0200 Subject: [PATCH 05/11] WIP --- crates/node/src/tee/remote_attestation.rs | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/crates/node/src/tee/remote_attestation.rs b/crates/node/src/tee/remote_attestation.rs index cbdc28b52..1d23ed5c0 100644 --- a/crates/node/src/tee/remote_attestation.rs +++ b/crates/node/src/tee/remote_attestation.rs @@ -426,21 +426,19 @@ mod tests { "Expected no additional submissions after node was automatically re-added" ); - // Test another removal cycle to ensure monitoring continues working - tee_accounts_sender.send(vec![]).unwrap(); + // Stop monitoring service and verify no further submissions occur + monitoring_task.abort(); + let _ = monitoring_task.await; - // Wait for second detection and resubmission - tokio::time::sleep(Duration::from_millis(50)).await; + // Wait a bit to ensure the monitoring task has fully stopped + tokio::time::sleep(Duration::from_millis(10)).await; - // Verify second resubmission occurred + // Verify the submission count remains unchanged after stopping monitoring + // (This confirms that only the monitoring service triggers resubmissions) assert_eq!( 
mock_sender.count(), - 2, - "Expected second resubmission when node was removed again" + 1, + "Expected submission count to remain stable after stopping monitoring service" ); - - // Clean up - monitoring_task.abort(); - let _ = monitoring_task.await; } } From e76fdd2cb0a1b9b6111aa3fc4fa42f5cc01d024e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20B=C4=99za?= Date: Mon, 6 Oct 2025 14:03:55 +0200 Subject: [PATCH 06/11] Cleanup --- crates/node/src/indexer.rs | 9 ++--- crates/node/src/indexer/tee.rs | 63 ++++++++++++++++------------------ 2 files changed, 34 insertions(+), 38 deletions(-) diff --git a/crates/node/src/indexer.rs b/crates/node/src/indexer.rs index 67d54e8bc..b629c557d 100644 --- a/crates/node/src/indexer.rs +++ b/crates/node/src/indexer.rs @@ -1,11 +1,12 @@ use self::stats::IndexerStats; use handler::ChainBlockUpdate; -use mpc_contract::tee::proposal::MpcDockerImageHash; +use mpc_contract::tee::{proposal::MpcDockerImageHash, tee_state::NodeId}; use near_indexer_primitives::types::AccountId; use participants::ContractState; use std::sync::Arc; -use tokio::sync::Mutex; -use tokio::sync::{mpsc, watch}; +use tokio::sync::{ + Mutex, {mpsc, watch}, +}; use types::ChainSendTransactionRequest; pub mod balances; @@ -76,5 +77,5 @@ pub struct IndexerAPI { /// Watcher that keeps track of allowed [`AllowedDockerImageHash`]es on the contract. pub allowed_docker_images_receiver: watch::Receiver>, /// Watcher that keeps track of TEE accounts stored in the contract. 
- pub tee_accounts_receiver: watch::Receiver>, + pub tee_accounts_receiver: watch::Receiver>, } diff --git a/crates/node/src/indexer/tee.rs b/crates/node/src/indexer/tee.rs index 8bb12e159..1dd5104f0 100644 --- a/crates/node/src/indexer/tee.rs +++ b/crates/node/src/indexer/tee.rs @@ -74,47 +74,41 @@ pub async fn monitor_allowed_docker_images( } } -/// Monitor TEE accounts stored in the contract and update the watch channel when changes are detected -pub async fn monitor_tee_accounts( - sender: watch::Sender>, - indexer_state: Arc, -) { - let fetch_tee_accounts = { - let indexer_state = indexer_state.clone(); - async move || { - let mut backoff = ExponentialBuilder::default() - .with_min_delay(MIN_BACKOFF_DURATION) - .with_max_delay(MAX_BACKOFF_DURATION) - .without_max_times() - .with_jitter() - .build(); - - loop { - match get_mpc_tee_accounts( - indexer_state.mpc_contract_id.clone(), - &indexer_state.view_client, - ) - .await - { - Ok((_block_height, tee_accounts)) => { - break tee_accounts; - } - Err(e) => { - tracing::error!(target: "mpc", "error reading TEE accounts from chain: {:?}", e); +/// Fetches TEE accounts from the contract with retry logic. 
+async fn fetch_tee_accounts_with_retry(indexer_state: &IndexerState) -> Vec { + let mut backoff = ExponentialBuilder::default() + .with_min_delay(MIN_BACKOFF_DURATION) + .with_max_delay(MAX_BACKOFF_DURATION) + .without_max_times() + .with_jitter() + .build(); - let backoff_duration = backoff.next().unwrap_or(MAX_BACKOFF_DURATION); - tokio::time::sleep(backoff_duration).await; - } - } + loop { + match get_mpc_tee_accounts( + indexer_state.mpc_contract_id.clone(), + &indexer_state.view_client, + ) + .await + { + Ok((_block_height, tee_accounts)) => return tee_accounts, + Err(e) => { + tracing::error!(target: "mpc", "error reading TEE accounts from chain: {:?}", e); + let backoff_duration = backoff.next().unwrap_or(MAX_BACKOFF_DURATION); + tokio::time::sleep(backoff_duration).await; } } - }; + } +} +/// Monitor TEE accounts stored in the contract and update the watch channel when changes are detected. +pub async fn monitor_tee_accounts( + sender: watch::Sender>, + indexer_state: Arc, +) { wait_for_full_sync(&indexer_state.client).await; loop { - tokio::time::sleep(TEE_ACCOUNTS_REFRESH_INTERVAL).await; - let tee_accounts = fetch_tee_accounts().await; + let tee_accounts = fetch_tee_accounts_with_retry(&indexer_state).await; sender.send_if_modified(|previous_tee_accounts| { if *previous_tee_accounts != tee_accounts { *previous_tee_accounts = tee_accounts; @@ -123,5 +117,6 @@ pub async fn monitor_tee_accounts( false } }); + tokio::time::sleep(TEE_ACCOUNTS_REFRESH_INTERVAL).await; } } From 0c54b4133789d3310bf95d0abdf619e8f98bcf1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20B=C4=99za?= Date: Mon, 6 Oct 2025 14:13:24 +0200 Subject: [PATCH 07/11] Cleanup --- crates/node/src/cli.rs | 2 +- crates/node/src/indexer.rs | 4 ++-- crates/node/src/indexer/fake.rs | 2 +- crates/node/src/indexer/real.rs | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/node/src/cli.rs b/crates/node/src/cli.rs index c0ee2e085..03f26dd67 100644 --- a/crates/node/src/cli.rs 
+++ b/crates/node/src/cli.rs @@ -408,7 +408,7 @@ impl StartCmd { // Spawn TEE attestation monitoring task let tx_sender_clone = indexer_api.txn_sender.clone(); - let tee_accounts_receiver = indexer_api.tee_accounts_receiver.clone(); + let tee_accounts_receiver = indexer_api.attested_nodes_receiver.clone(); let account_id_clone = config.my_near_account_id.clone(); tokio::spawn(async move { diff --git a/crates/node/src/indexer.rs b/crates/node/src/indexer.rs index b629c557d..e9b02a54c 100644 --- a/crates/node/src/indexer.rs +++ b/crates/node/src/indexer.rs @@ -76,6 +76,6 @@ pub struct IndexerAPI { pub txn_sender: TransactionSender, /// Watcher that keeps track of allowed [`AllowedDockerImageHash`]es on the contract. pub allowed_docker_images_receiver: watch::Receiver>, - /// Watcher that keeps track of TEE accounts stored in the contract. - pub tee_accounts_receiver: watch::Receiver>, + /// Watcher that tracks node IDs that have TEE attestations in the contract. + pub attested_nodes_receiver: watch::Receiver>, } diff --git a/crates/node/src/indexer/fake.rs b/crates/node/src/indexer/fake.rs index 99033db0b..4fd5d454f 100644 --- a/crates/node/src/indexer/fake.rs +++ b/crates/node/src/indexer/fake.rs @@ -765,7 +765,7 @@ impl FakeIndexerManager { )), txn_sender: mock_transaction_sender, allowed_docker_images_receiver, - tee_accounts_receiver: watch::channel(vec![]).1, + attested_nodes_receiver: watch::channel(vec![]).1, }; let currently_running_job_name = Arc::new(std::sync::Mutex::new("".to_string())); let disabler = NodeDisabler { diff --git a/crates/node/src/indexer/real.rs b/crates/node/src/indexer/real.rs index 1dc7c4377..715543b36 100644 --- a/crates/node/src/indexer/real.rs +++ b/crates/node/src/indexer/real.rs @@ -184,6 +184,6 @@ pub fn spawn_real_indexer( block_update_receiver: Arc::new(Mutex::new(block_update_receiver)), txn_sender, allowed_docker_images_receiver, - tee_accounts_receiver, + attested_nodes_receiver: tee_accounts_receiver, } } From 
0678db71d31d53e74e6ea8fed042886d537d4281 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20B=C4=99za?= Date: Mon, 6 Oct 2025 14:23:27 +0200 Subject: [PATCH 08/11] Fix --- crates/node/src/tee/remote_attestation.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/crates/node/src/tee/remote_attestation.rs b/crates/node/src/tee/remote_attestation.rs index 1d23ed5c0..4da98f81b 100644 --- a/crates/node/src/tee/remote_attestation.rs +++ b/crates/node/src/tee/remote_attestation.rs @@ -440,5 +440,19 @@ mod tests { 1, "Expected submission count to remain stable after stopping monitoring service" ); + + // Remove the node from TEE accounts again to verify monitoring service is truly stopped + let removed_tee_accounts = vec![]; // Node is no longer in TEE accounts + let _ = tee_accounts_sender.send(removed_tee_accounts); + + // Wait to ensure no resubmission occurs when monitoring is stopped + tokio::time::sleep(Duration::from_millis(50)).await; + + // Verify no resubmission occurred (monitoring service is stopped) + assert_eq!( + mock_sender.count(), + 1, + "Expected no resubmission when monitoring service is stopped" + ); } } From a8e9301fc25612640c62228eb2b103fa2e25204d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20B=C4=99za?= Date: Mon, 6 Oct 2025 15:02:19 +0200 Subject: [PATCH 09/11] Address Copilot's code review comments --- crates/node/src/tee/remote_attestation.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/crates/node/src/tee/remote_attestation.rs b/crates/node/src/tee/remote_attestation.rs index 4da98f81b..9d4c5df18 100644 --- a/crates/node/src/tee/remote_attestation.rs +++ b/crates/node/src/tee/remote_attestation.rs @@ -151,6 +151,8 @@ async fn resubmit_attestation( let fresh_attestation = tee_authority.generate_attestation(report_data).await?; for attempt in 1..=MAX_RETRIES { + let is_final_attempt = attempt == MAX_RETRIES; + match submit_remote_attestation( tx_sender.clone(), fresh_attestation.clone(), @@ 
-162,20 +164,19 @@ async fn resubmit_attestation( tracing::info!(%node_account_id, attempt, "successfully resubmitted attestation"); return Ok(()); } - Err(error) if attempt == MAX_RETRIES => { - tracing::error!(%node_account_id, %error, "attestation resubmission failed after {MAX_RETRIES} attempts"); - return Err(error); - } Err(error) => { - tracing::warn!(%node_account_id, attempt, %error, "attestation resubmission failed, retrying"); - if attempt < MAX_RETRIES { + if is_final_attempt { + tracing::error!(%node_account_id, %error, "attestation resubmission failed after {MAX_RETRIES} attempts"); + return Err(error); + } else { + tracing::warn!(%node_account_id, attempt, %error, "attestation resubmission failed, retrying"); retry_interval.tick().await; } } } } - unreachable!() + Ok(()) // This line is unreachable but satisfies the compiler } /// Checks if TEE attestation is available for the given node in the TEE accounts list. From d9778d220d1b3c69b031a8a260cd8f8f12f9a204 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20B=C4=99za?= Date: Mon, 6 Oct 2025 17:03:16 +0200 Subject: [PATCH 10/11] Don't use sleep() in tests --- crates/node/src/tee/remote_attestation.rs | 57 ++++++++++++++--------- 1 file changed, 36 insertions(+), 21 deletions(-) diff --git a/crates/node/src/tee/remote_attestation.rs b/crates/node/src/tee/remote_attestation.rs index b477b73c3..a12632b8d 100644 --- a/crates/node/src/tee/remote_attestation.rs +++ b/crates/node/src/tee/remote_attestation.rs @@ -266,6 +266,8 @@ mod tests { use tee_authority::tee_authority::{LocalTeeAuthorityConfig, TeeAuthority}; const TEST_SUBMISSION_COUNT: usize = 2; + const TEST_EXPECTED_ATTESTATION_RESUBMISSION_TIMEOUT: Duration = Duration::from_millis(100); + const TEST_VERIFY_NO_ATTESTATION_RESUBMISSION_TIMEOUT: Duration = Duration::from_millis(100); struct MockTicker { count: usize, @@ -299,6 +301,7 @@ mod tests { struct MockSender { submissions: Arc>, contract_simulator: Arc, + notify: Arc, } impl MockSender { @@ 
-306,12 +309,17 @@ mod tests { Self { submissions: Arc::new(Mutex::new(0)), contract_simulator: Arc::new(ContractSimulator { sender, node_id }), + notify: Arc::new(tokio::sync::Notify::new()), } } fn count(&self) -> usize { *self.submissions.lock().unwrap() } + + async fn wait_for_submission(&self) { + self.notify.notified().await; + } } impl TransactionSender for MockSender { @@ -325,6 +333,9 @@ mod tests { let updated_tee_accounts = vec![self.contract_simulator.node_id.clone()]; let _ = self.contract_simulator.sender.send(updated_tee_accounts); + // Notify that a submission occurred + self.notify.notify_one(); + Ok(()) } @@ -397,8 +408,9 @@ mod tests { receiver, )); - // Wait for initial setup - tokio::time::sleep(Duration::from_millis(10)).await; + // Yield control to allow the monitoring task to start and process initial state. + // This is preferred over sleep() as it doesn't introduce arbitrary timing delays + tokio::task::yield_now().await; // Verify no submission occurred initially (node is in TEE accounts) assert_eq!(mock_sender.count(), 0); @@ -407,35 +419,27 @@ mod tests { let removed_tee_accounts = vec![]; // Node is no longer in TEE accounts tee_accounts_sender.send(removed_tee_accounts).unwrap(); - // Wait for monitoring to detect removal and resubmit - tokio::time::sleep(Duration::from_millis(50)).await; + // Wait for the resubmission to occur (with timeout to avoid hanging) + tokio::time::timeout( + TEST_EXPECTED_ATTESTATION_RESUBMISSION_TIMEOUT, + mock_sender.wait_for_submission(), + ) + .await + .expect("Expected resubmission to occur within timeout"); - // Verify attestation resubmission occurred (monitoring detected removal) + // Verify attestation resubmission occurred and no additional submissions occurred + // (node should be back in TEE accounts automatically after resubmission) assert_eq!( mock_sender.count(), 1, "Expected exactly one resubmission when node was removed" ); - // Wait a bit more to ensure the monitoring service has processed 
the automatic re-addition - tokio::time::sleep(Duration::from_millis(20)).await; - - // Verify no additional submissions occurred (node should be back in TEE accounts automatically) - assert_eq!( - mock_sender.count(), - 1, - "Expected no additional submissions after node was automatically re-added" - ); - // Stop monitoring service and verify no further submissions occur monitoring_task.abort(); let _ = monitoring_task.await; - // Wait a bit to ensure the monitoring task has fully stopped - tokio::time::sleep(Duration::from_millis(10)).await; - // Verify the submission count remains unchanged after stopping monitoring - // (This confirms that only the monitoring service triggers resubmissions) assert_eq!( mock_sender.count(), 1, @@ -446,8 +450,19 @@ mod tests { let removed_tee_accounts = vec![]; // Node is no longer in TEE accounts let _ = tee_accounts_sender.send(removed_tee_accounts); - // Wait to ensure no resubmission occurs when monitoring is stopped - tokio::time::sleep(Duration::from_millis(50)).await; + // Give a brief moment to ensure no resubmission occurs when monitoring is stopped + // Since the monitoring task is stopped, we use a timeout to verify no submission happens + let timeout_result = tokio::time::timeout( + TEST_VERIFY_NO_ATTESTATION_RESUBMISSION_TIMEOUT, + mock_sender.wait_for_submission(), + ) + .await; + + // Verify the timeout occurred (no submission) + assert!( + timeout_result.is_err(), + "Expected no resubmission when monitoring service is stopped" + ); // Verify no resubmission occurred (monitoring service is stopped) assert_eq!( From 377ec8e88f1082abbf26559f3642b5f715ab3e92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20B=C4=99za?= Date: Tue, 7 Oct 2025 12:12:19 +0200 Subject: [PATCH 11/11] Address code review comments --- crates/node/src/cli.rs | 23 +++--- crates/node/src/tee/remote_attestation.rs | 94 +++-------------------- 2 files changed, 23 insertions(+), 94 deletions(-) diff --git a/crates/node/src/cli.rs 
b/crates/node/src/cli.rs index e86fcb114..a471d23ca 100644 --- a/crates/node/src/cli.rs +++ b/crates/node/src/cli.rs @@ -1,15 +1,12 @@ -use crate::config::{CKDConfig, PersistentSecrets, RespondConfig}; -use crate::indexer::tx_sender::TransactionSender; -use crate::providers::PublicKeyConversion; -use crate::web::{static_web_data, DebugRequest}; use crate::{ config::{ - load_config_file, BlockArgs, ConfigFile, IndexerConfig, KeygenConfig, PresignatureConfig, - SecretsConfig, SignatureConfig, SyncMode, TripleConfig, WebUIConfig, + load_config_file, BlockArgs, CKDConfig, ConfigFile, IndexerConfig, KeygenConfig, + PersistentSecrets, PresignatureConfig, RespondConfig, SecretsConfig, SignatureConfig, + SyncMode, TripleConfig, WebUIConfig, }, coordinator::Coordinator, db::SecretDB, - indexer::{real::spawn_real_indexer, IndexerAPI}, + indexer::{real::spawn_real_indexer, tx_sender::TransactionSender, IndexerAPI}, keyshare::{ compat::legacy_ecdsa_key_from_keyshares, local::LocalPermanentKeyStorageBackend, @@ -17,22 +14,23 @@ use crate::{ GcpPermanentKeyStorageConfig, KeyStorageConfig, }, p2p::testing::{generate_test_p2p_configs, PortSeed}, + providers::PublicKeyConversion, tracking::{self, start_root_task}, - web::start_web_server, + web::{start_web_server, static_web_data, DebugRequest}, }; use anyhow::{anyhow, Context}; -use attestation::attestation::Attestation; -use attestation::report_data::ReportData; +use attestation::{attestation::Attestation, report_data::ReportData}; use clap::{Args, Parser, Subcommand, ValueEnum}; use hex::FromHex; use mpc_contract::state::ProtocolContractState; use near_indexer_primitives::types::Finality; use near_sdk::AccountId; use near_time::Clock; -use std::sync::OnceLock; use std::{ path::PathBuf, + sync::OnceLock, sync::{Arc, Mutex}, + time::Duration, }; use tee_authority::tee_authority::{ DstackTeeAuthorityConfig, LocalTeeAuthorityConfig, TeeAuthority, DEFAULT_DSTACK_ENDPOINT, @@ -54,6 +52,8 @@ use { tracing::info, }; +pub const 
ATTESTATION_RESUBMISSION_INTERVAL: Duration = Duration::from_secs(10 * 60); + #[derive(Parser, Debug)] #[command(name = "mpc-node")] #[command(about = "MPC Node for Near Protocol")] @@ -396,6 +396,7 @@ impl StartCmd { tee_authority_clone, tx_sender_clone, tls_public_key, + tokio::time::interval(ATTESTATION_RESUBMISSION_INTERVAL), ) .await { diff --git a/crates/node/src/tee/remote_attestation.rs b/crates/node/src/tee/remote_attestation.rs index a12632b8d..a88f8598d 100644 --- a/crates/node/src/tee/remote_attestation.rs +++ b/crates/node/src/tee/remote_attestation.rs @@ -19,8 +19,6 @@ use mpc_contract::tee::tee_state::NodeId; use near_sdk::AccountId; use tokio::sync::watch; -const ATTESTATION_RESUBMISSION_RETRY_DELAY: Duration = Duration::from_secs(2); -const ATTESTATION_RESUBMISSION_INTERVAL: Duration = Duration::from_secs(10 * 60); const MIN_BACKOFF_DURATION: Duration = Duration::from_millis(100); const MAX_BACKOFF_DURATION: Duration = Duration::from_secs(60); const MAX_RETRY_DURATION: Duration = Duration::from_secs(60 * 60 * 12); // 12 hours. @@ -88,95 +86,22 @@ pub async fn submit_remote_attestation( .context("failed to submit attestation after multiple retry attempts")? } -/// Periodically generates and submits fresh attestations at regular intervals. -/// -/// This future runs indefinitely, generating a fresh attestation every 10 minutes -/// and submitting it to the blockchain. 
-pub async fn periodic_attestation_submission( - tee_authority: TeeAuthority, - tx_sender: T, - tls_public_key: VerifyingKey, -) -> anyhow::Result<()> { - periodic_attestation_submission_with_interval( - tee_authority, - tx_sender, - tls_public_key, - tokio::time::interval(ATTESTATION_RESUBMISSION_INTERVAL), - ) - .await -} - -async fn periodic_attestation_submission_with_interval( +pub async fn periodic_attestation_submission( tee_authority: TeeAuthority, tx_sender: T, tls_public_key: VerifyingKey, mut interval_ticker: I, ) -> anyhow::Result<()> { - loop { - interval_ticker.tick().await; - - let tls_sdk_public_key = tls_public_key.to_near_sdk_public_key()?; - let report_data = ReportData::new(tls_sdk_public_key); - let fresh_attestation = match tee_authority.generate_attestation(report_data).await { - Ok(attestation) => attestation, - Err(error) => { - tracing::error!( - ?error, - "failed to generate fresh attestation, skipping this cycle" - ); - continue; - } - }; - - match submit_remote_attestation(tx_sender.clone(), fresh_attestation, tls_public_key).await - { - Ok(()) => tracing::info!("successfully submitted fresh remote attestation"), - Err(error) => { - tracing::error!(?error, "failed to submit fresh remote attestation"); - } - } - } -} - -async fn resubmit_attestation( - node_account_id: &AccountId, - tee_authority: &TeeAuthority, - tx_sender: &T, - tls_public_key: VerifyingKey, -) -> anyhow::Result<()> { - const MAX_RETRIES: usize = 3; - let mut retry_interval = tokio::time::interval(ATTESTATION_RESUBMISSION_RETRY_DELAY); let tls_sdk_public_key = tls_public_key.to_near_sdk_public_key()?; - let report_data = ReportData::new(tls_sdk_public_key.clone()); + let report_data = ReportData::new(tls_sdk_public_key); let fresh_attestation = tee_authority.generate_attestation(report_data).await?; - for attempt in 1..=MAX_RETRIES { - let is_final_attempt = attempt == MAX_RETRIES; + loop { + interval_ticker.tick().await; - match submit_remote_attestation( - 
tx_sender.clone(), - fresh_attestation.clone(), - tls_public_key, - ) - .await - { - Ok(_) => { - tracing::info!(%node_account_id, attempt, "successfully resubmitted attestation"); - return Ok(()); - } - Err(error) => { - if is_final_attempt { - tracing::error!(%node_account_id, %error, "attestation resubmission failed after {MAX_RETRIES} attempts"); - return Err(error); - } else { - tracing::warn!(%node_account_id, attempt, %error, "attestation resubmission failed, retrying"); - retry_interval.tick().await; - } - } - } + submit_remote_attestation(tx_sender.clone(), fresh_attestation.clone(), tls_public_key) + .await?; } - - Ok(()) // This line is unreachable but satisfies the compiler } /// Checks if TEE attestation is available for the given node in the TEE accounts list. @@ -218,6 +143,9 @@ pub async fn monitor_attestation_removal( ); let mut was_available = initially_available; + let tls_sdk_public_key = tls_public_key.to_near_sdk_public_key()?; + let report_data = ReportData::new(tls_sdk_public_key); + let fresh_attestation = tee_authority.generate_attestation(report_data).await?; while tee_accounts_receiver.changed().await.is_ok() { let is_available = is_node_in_contract_tee_accounts(&mut tee_accounts_receiver, &node_id); @@ -235,7 +163,7 @@ pub async fn monitor_attestation_removal( "TEE attestation removed from contract, resubmitting" ); - resubmit_attestation(&node_account_id, &tee_authority, &tx_sender, tls_public_key) + submit_remote_attestation(tx_sender.clone(), fresh_attestation.clone(), tls_public_key) .await?; } @@ -365,7 +293,7 @@ mod tests { let mut rng = rand::rngs::StdRng::seed_from_u64(42); let key = SigningKey::generate(&mut rng).verifying_key(); - let handle = tokio::spawn(periodic_attestation_submission_with_interval( + let handle = tokio::spawn(periodic_attestation_submission( tee_authority, sender.clone(), key,