Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions crates/node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ use url::Url;
use {
crate::tee::{
monitor_allowed_image_hashes,
remote_attestation::{periodic_attestation_submission, submit_remote_attestation},
remote_attestation::{
monitor_attestation_removal, periodic_attestation_submission, submit_remote_attestation,
},
AllowedImageHashesFile,
},
mpc_contract::tee::proposal::MpcDockerImageHash,
Expand Down Expand Up @@ -388,10 +390,14 @@ impl StartCmd {

// Spawn periodic attestation submission task
let tx_sender_clone = indexer_api.txn_sender.clone();
let tee_authority_clone = tee_authority.clone();
tokio::spawn(async move {
if let Err(e) =
periodic_attestation_submission(tee_authority, tx_sender_clone, tls_public_key)
.await
if let Err(e) = periodic_attestation_submission(
tee_authority_clone,
tx_sender_clone,
tls_public_key,
)
.await
{
tracing::error!(
error = ?e,
Expand All @@ -400,6 +406,28 @@ impl StartCmd {
}
});

// Spawn TEE attestation monitoring task
let tx_sender_clone = indexer_api.txn_sender.clone();
let tee_accounts_receiver = indexer_api.attested_nodes_receiver.clone();
let account_id_clone = config.my_near_account_id.clone();

tokio::spawn(async move {
if let Err(e) = monitor_attestation_removal(
account_id_clone,
tee_authority,
tx_sender_clone,
tls_public_key,
tee_accounts_receiver,
)
.await
{
tracing::error!(
error = ?e,
"attestation removal monitoring task failed"
);
}
});

let coordinator = Coordinator {
clock: Clock::real(),
config_file: config,
Expand Down
9 changes: 6 additions & 3 deletions crates/node/src/indexer.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use self::stats::IndexerStats;
use handler::ChainBlockUpdate;
use mpc_contract::tee::proposal::MpcDockerImageHash;
use mpc_contract::tee::{proposal::MpcDockerImageHash, tee_state::NodeId};
use near_indexer_primitives::types::AccountId;
use participants::ContractState;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::{mpsc, watch};
use tokio::sync::{
Mutex, {mpsc, watch},
};
use types::ChainSendTransactionRequest;

pub mod balances;
Expand Down Expand Up @@ -75,4 +76,6 @@ pub struct IndexerAPI<TransactionSender> {
pub txn_sender: TransactionSender,
/// Watcher that keeps track of allowed [`AllowedDockerImageHash`]es on the contract.
pub allowed_docker_images_receiver: watch::Receiver<Vec<MpcDockerImageHash>>,
/// Watcher that tracks node IDs that have TEE attestations in the contract.
pub attested_nodes_receiver: watch::Receiver<Vec<NodeId>>,
}
1 change: 1 addition & 0 deletions crates/node/src/indexer/fake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,7 @@ impl FakeIndexerManager {
)),
txn_sender: mock_transaction_sender,
allowed_docker_images_receiver,
attested_nodes_receiver: watch::channel(vec![]).1,
};
let currently_running_job_name = Arc::new(std::sync::Mutex::new("".to_string()));
let disabler = NodeDisabler {
Expand Down
9 changes: 9 additions & 0 deletions crates/node/src/indexer/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use actix::Addr;
use anyhow::bail;
use mpc_contract::state::ProtocolContractState;
use mpc_contract::tee::proposal::MpcDockerImageHash;
use mpc_contract::tee::tee_state::NodeId;
use near_client::ClientActor;
use near_client::Status;
use near_indexer_primitives::types;
Expand All @@ -15,6 +16,7 @@ use tokio::time;

const INTERVAL: Duration = Duration::from_millis(500);
const ALLOWED_IMAGE_HASHES_ENDPOINT: &str = "allowed_docker_image_hashes";
const TEE_ACCOUNTS_ENDPOINT: &str = "get_tee_accounts";
const CONTRACT_STATE_ENDPOINT: &str = "state";

pub(crate) async fn wait_for_full_sync(client: &Addr<ClientActor>) {
Expand Down Expand Up @@ -83,6 +85,13 @@ pub(crate) async fn get_mpc_allowed_image_hashes(
get_mpc_state(mpc_contract_id, client, ALLOWED_IMAGE_HASHES_ENDPOINT).await
}

pub(crate) async fn get_mpc_tee_accounts(
mpc_contract_id: AccountId,
client: &actix::Addr<near_client::ViewClientActor>,
) -> anyhow::Result<(u64, Vec<NodeId>)> {
get_mpc_state(mpc_contract_id, client, TEE_ACCOUNTS_ENDPOINT).await
}

pub(crate) async fn get_account_balance(
account_id: AccountId,
client: &actix::Addr<near_client::ViewClientActor>,
Expand Down
9 changes: 8 additions & 1 deletion crates/node/src/indexer/real.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use super::{IndexerAPI, IndexerState};
use crate::config::load_listening_blocks_file;
use crate::config::{IndexerConfig, RespondConfig};
use crate::indexer::balances::monitor_balance;
use crate::indexer::tee::monitor_allowed_docker_images;
use crate::indexer::tee::{monitor_allowed_docker_images, monitor_tee_accounts};
use crate::indexer::tx_sender::{TransactionProcessorHandle, TransactionSender};
use ed25519_dalek::SigningKey;
use mpc_contract::state::ProtocolContractState;
Expand Down Expand Up @@ -55,6 +55,7 @@ pub fn spawn_real_indexer(

let (block_update_sender, block_update_receiver) = mpsc::unbounded_channel();
let (allowed_docker_images_sender, allowed_docker_images_receiver) = watch::channel(vec![]);
let (tee_accounts_sender, tee_accounts_receiver) = watch::channel(vec![]);

let my_near_account_id_clone = my_near_account_id.clone();
let respond_config_clone = respond_config.clone();
Expand Down Expand Up @@ -120,6 +121,11 @@ pub fn spawn_real_indexer(
indexer_state.clone(),
));

actix::spawn(monitor_tee_accounts(
tee_accounts_sender,
indexer_state.clone(),
));

// Returns once the contract state is available.
let contract_state_receiver = monitor_contract_state(
indexer_state.clone(),
Expand Down Expand Up @@ -178,5 +184,6 @@ pub fn spawn_real_indexer(
block_update_receiver: Arc::new(Mutex::new(block_update_receiver)),
txn_sender,
allowed_docker_images_receiver,
attested_nodes_receiver: tee_accounts_receiver,
}
}
51 changes: 50 additions & 1 deletion crates/node/src/indexer/tee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@ use std::{sync::Arc, time::Duration};

use backon::{BackoffBuilder, ExponentialBuilder};
use mpc_contract::tee::proposal::MpcDockerImageHash;
use mpc_contract::tee::tee_state::NodeId;
use tokio::sync::watch;

use crate::indexer::{
lib::{get_mpc_allowed_image_hashes, wait_for_full_sync},
lib::{get_mpc_allowed_image_hashes, get_mpc_tee_accounts, wait_for_full_sync},
IndexerState,
};

const ALLOWED_IMAGE_HASHES_REFRESH_INTERVAL: std::time::Duration =
std::time::Duration::from_secs(1);
const MIN_BACKOFF_DURATION: Duration = Duration::from_secs(1);
const MAX_BACKOFF_DURATION: Duration = Duration::from_secs(60);
const TEE_ACCOUNTS_REFRESH_INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);

/// This future waits for the indexer to fully sync, and returns
/// a [`watch::Receiver`] that will be continuously updated with the latest
Expand Down Expand Up @@ -71,3 +73,50 @@ pub async fn monitor_allowed_docker_images(
});
}
}

/// Fetches TEE accounts from the contract with retry logic.
async fn fetch_tee_accounts_with_retry(indexer_state: &IndexerState) -> Vec<NodeId> {
let mut backoff = ExponentialBuilder::default()
.with_min_delay(MIN_BACKOFF_DURATION)
.with_max_delay(MAX_BACKOFF_DURATION)
.without_max_times()
.with_jitter()
.build();

loop {
match get_mpc_tee_accounts(
indexer_state.mpc_contract_id.clone(),
&indexer_state.view_client,
)
.await
{
Ok((_block_height, tee_accounts)) => return tee_accounts,
Err(e) => {
tracing::error!(target: "mpc", "error reading TEE accounts from chain: {:?}", e);
let backoff_duration = backoff.next().unwrap_or(MAX_BACKOFF_DURATION);
tokio::time::sleep(backoff_duration).await;
}
}
}
}

/// Monitor TEE accounts stored in the contract and update the watch channel when changes are detected.
pub async fn monitor_tee_accounts(
sender: watch::Sender<Vec<NodeId>>,
indexer_state: Arc<IndexerState>,
) {
wait_for_full_sync(&indexer_state.client).await;

loop {
let tee_accounts = fetch_tee_accounts_with_retry(&indexer_state).await;
sender.send_if_modified(|previous_tee_accounts| {
if *previous_tee_accounts != tee_accounts {
*previous_tee_accounts = tee_accounts;
true
} else {
false
}
});
tokio::time::sleep(TEE_ACCOUNTS_REFRESH_INTERVAL).await;
}
}
Loading