Skip to content

Commit f6d455c

Browse files
committed
WIP
1 parent 5f51769 commit f6d455c

File tree

7 files changed

+199
-173
lines changed

7 files changed

+199
-173
lines changed

crates/node/src/cli.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -406,17 +406,18 @@ impl StartCmd {
406406
}
407407
});
408408

409-
// Spawn attestation removal monitoring task
409+
// Spawn TEE attestation monitoring task
410410
let tx_sender_clone = indexer_api.txn_sender.clone();
411-
let contract_state_receiver = indexer_api.contract_state_receiver.clone();
411+
let tee_accounts_receiver = indexer_api.tee_accounts_receiver.clone();
412412
let account_id_clone = config.my_near_account_id.clone();
413+
413414
tokio::spawn(async move {
414415
if let Err(e) = monitor_attestation_removal(
415416
account_id_clone,
416417
tee_authority,
417418
tx_sender_clone,
418419
tls_public_key,
419-
contract_state_receiver,
420+
tee_accounts_receiver,
420421
)
421422
.await
422423
{

crates/node/src/indexer.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,4 +75,6 @@ pub struct IndexerAPI<TransactionSender> {
7575
pub txn_sender: TransactionSender,
7676
/// Watcher that keeps track of allowed [`AllowedDockerImageHash`]es on the contract.
7777
pub allowed_docker_images_receiver: watch::Receiver<Vec<MpcDockerImageHash>>,
78+
/// Watcher that keeps track of TEE accounts stored in the contract.
79+
pub tee_accounts_receiver: watch::Receiver<Vec<mpc_contract::tee::tee_state::NodeId>>,
7880
}

crates/node/src/indexer/fake.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -765,6 +765,7 @@ impl FakeIndexerManager {
765765
)),
766766
txn_sender: mock_transaction_sender,
767767
allowed_docker_images_receiver,
768+
tee_accounts_receiver: watch::channel(vec![]).1,
768769
};
769770
let currently_running_job_name = Arc::new(std::sync::Mutex::new("".to_string()));
770771
let disabler = NodeDisabler {

crates/node/src/indexer/lib.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use actix::Addr;
22
use anyhow::bail;
33
use mpc_contract::state::ProtocolContractState;
44
use mpc_contract::tee::proposal::MpcDockerImageHash;
5+
use mpc_contract::tee::tee_state::NodeId;
56
use near_client::ClientActor;
67
use near_client::Status;
78
use near_indexer_primitives::types;
@@ -15,6 +16,7 @@ use tokio::time;
1516

1617
const INTERVAL: Duration = Duration::from_millis(500);
1718
const ALLOWED_IMAGE_HASHES_ENDPOINT: &str = "allowed_docker_image_hashes";
19+
const TEE_ACCOUNTS_ENDPOINT: &str = "get_tee_accounts";
1820
const CONTRACT_STATE_ENDPOINT: &str = "state";
1921

2022
pub(crate) async fn wait_for_full_sync(client: &Addr<ClientActor>) {
@@ -83,6 +85,13 @@ pub(crate) async fn get_mpc_allowed_image_hashes(
8385
get_mpc_state(mpc_contract_id, client, ALLOWED_IMAGE_HASHES_ENDPOINT).await
8486
}
8587

88+
pub(crate) async fn get_mpc_tee_accounts(
89+
mpc_contract_id: AccountId,
90+
client: &actix::Addr<near_client::ViewClientActor>,
91+
) -> anyhow::Result<(u64, Vec<NodeId>)> {
92+
get_mpc_state(mpc_contract_id, client, TEE_ACCOUNTS_ENDPOINT).await
93+
}
94+
8695
pub(crate) async fn get_account_balance(
8796
account_id: AccountId,
8897
client: &actix::Addr<near_client::ViewClientActor>,

crates/node/src/indexer/real.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use super::{IndexerAPI, IndexerState};
66
use crate::config::load_listening_blocks_file;
77
use crate::config::{IndexerConfig, RespondConfig};
88
use crate::indexer::balances::monitor_balance;
9-
use crate::indexer::tee::monitor_allowed_docker_images;
9+
use crate::indexer::tee::{monitor_allowed_docker_images, monitor_tee_accounts};
1010
use crate::indexer::tx_sender::{TransactionProcessorHandle, TransactionSender};
1111
use ed25519_dalek::SigningKey;
1212
use mpc_contract::state::ProtocolContractState;
@@ -55,6 +55,7 @@ pub fn spawn_real_indexer(
5555

5656
let (block_update_sender, block_update_receiver) = mpsc::unbounded_channel();
5757
let (allowed_docker_images_sender, allowed_docker_images_receiver) = watch::channel(vec![]);
58+
let (tee_accounts_sender, tee_accounts_receiver) = watch::channel(vec![]);
5859

5960
let my_near_account_id_clone = my_near_account_id.clone();
6061
let respond_config_clone = respond_config.clone();
@@ -119,6 +120,11 @@ pub fn spawn_real_indexer(
119120
allowed_docker_images_sender,
120121
indexer_state.clone(),
121122
));
123+
124+
actix::spawn(monitor_tee_accounts(
125+
tee_accounts_sender,
126+
indexer_state.clone(),
127+
));
122128

123129
// Returns once the contract state is available.
124130
let contract_state_receiver = monitor_contract_state(
@@ -178,5 +184,6 @@ pub fn spawn_real_indexer(
178184
block_update_receiver: Arc::new(Mutex::new(block_update_receiver)),
179185
txn_sender,
180186
allowed_docker_images_receiver,
187+
tee_accounts_receiver,
181188
}
182189
}

crates/node/src/indexer/tee.rs

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,19 @@ use std::{sync::Arc, time::Duration};
22

33
use backon::{BackoffBuilder, ExponentialBuilder};
44
use mpc_contract::tee::proposal::MpcDockerImageHash;
5+
use mpc_contract::tee::tee_state::NodeId;
56
use tokio::sync::watch;
67

78
use crate::indexer::{
8-
lib::{get_mpc_allowed_image_hashes, wait_for_full_sync},
9+
lib::{get_mpc_allowed_image_hashes, get_mpc_tee_accounts, wait_for_full_sync},
910
IndexerState,
1011
};
1112

1213
const ALLOWED_IMAGE_HASHES_REFRESH_INTERVAL: std::time::Duration =
1314
std::time::Duration::from_secs(1);
1415
const MIN_BACKOFF_DURATION: Duration = Duration::from_secs(1);
1516
const MAX_BACKOFF_DURATION: Duration = Duration::from_secs(60);
17+
const TEE_ACCOUNTS_REFRESH_INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);
1618

1719
/// This future waits for the indexer to fully sync, and returns
1820
/// a [`watch::Receiver`] that will be continuously updated with the latest
@@ -71,3 +73,55 @@ pub async fn monitor_allowed_docker_images(
7173
});
7274
}
7375
}
76+
77+
/// Monitor TEE accounts stored in the contract and update the watch channel when changes are detected
78+
pub async fn monitor_tee_accounts(
79+
sender: watch::Sender<Vec<NodeId>>,
80+
indexer_state: Arc<IndexerState>,
81+
) {
82+
let fetch_tee_accounts = {
83+
let indexer_state = indexer_state.clone();
84+
async move || {
85+
let mut backoff = ExponentialBuilder::default()
86+
.with_min_delay(MIN_BACKOFF_DURATION)
87+
.with_max_delay(MAX_BACKOFF_DURATION)
88+
.without_max_times()
89+
.with_jitter()
90+
.build();
91+
92+
loop {
93+
match get_mpc_tee_accounts(
94+
indexer_state.mpc_contract_id.clone(),
95+
&indexer_state.view_client,
96+
)
97+
.await
98+
{
99+
Ok((_block_height, tee_accounts)) => {
100+
break tee_accounts;
101+
}
102+
Err(e) => {
103+
tracing::error!(target: "mpc", "error reading TEE accounts from chain: {:?}", e);
104+
105+
let backoff_duration = backoff.next().unwrap_or(MAX_BACKOFF_DURATION);
106+
tokio::time::sleep(backoff_duration).await;
107+
}
108+
}
109+
}
110+
}
111+
};
112+
113+
wait_for_full_sync(&indexer_state.client).await;
114+
115+
loop {
116+
tokio::time::sleep(TEE_ACCOUNTS_REFRESH_INTERVAL).await;
117+
let tee_accounts = fetch_tee_accounts().await;
118+
sender.send_if_modified(|previous_tee_accounts| {
119+
if *previous_tee_accounts != tee_accounts {
120+
*previous_tee_accounts = tee_accounts;
121+
true
122+
} else {
123+
false
124+
}
125+
});
126+
}
127+
}

0 commit comments

Comments
 (0)