Skip to content
Open
186 changes: 111 additions & 75 deletions crates/ntx-builder/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use account_state::TransactionCandidate;
use futures::FutureExt;
use miden_node_proto::clients::{Builder, ValidatorClient};
use miden_node_proto::domain::account::NetworkAccountId;
use miden_node_proto::domain::mempool::MempoolEvent;
use miden_node_utils::ErrorReport;
use miden_node_utils::lru_cache::LruCache;
use miden_protocol::Word;
Expand All @@ -20,7 +19,7 @@ use miden_protocol::block::BlockNumber;
use miden_protocol::note::{Note, NoteScript, Nullifier};
use miden_protocol::transaction::TransactionId;
use miden_remote_prover_client::RemoteTransactionProver;
use tokio::sync::{AcquireError, RwLock, Semaphore, mpsc};
use tokio::sync::{AcquireError, Notify, RwLock, Semaphore, mpsc};
use tokio_util::sync::CancellationToken;
use url::Url;

Expand All @@ -29,16 +28,31 @@ use crate::builder::ChainState;
use crate::db::Db;
use crate::store::StoreClient;

// ACTOR NOTIFICATION
/// Converts a database result into an `ActorShutdownReason` error, logging the error on failure.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docs are a bit misleading (it's an internal fn so feel free to disregard):

Suggested change
/// Converts a database result into an `ActorShutdownReason` error, logging the error on failure.
/// Converts a database result, mapping any error into an `ActorShutdownReason` and logging it on failure.

fn db_query<T>(
account_id: NetworkAccountId,
result: Result<T, miden_node_db::DatabaseError>,
context: &str,
) -> Result<T, ActorShutdownReason> {
result.map_err(|err| {
tracing::error!(err = err.as_report(), account_id = %account_id, "{context}");
ActorShutdownReason::DbError(account_id)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this error variant include the underlying `miden_node_db::DatabaseError`?

})
}

// ACTOR REQUESTS
// ================================================================================================

/// A notification sent from an account actor to the coordinator.
pub enum ActorNotification {
/// A request sent from an account actor to the coordinator via a shared mpsc channel.
pub enum ActorRequest {
/// One or more notes failed during transaction execution and should have their attempt
/// counters incremented.
/// counters incremented. The actor waits for the coordinator to acknowledge the DB write via
/// the oneshot channel, preventing race conditions where the actor could re-select the same
/// notes before the failure is persisted.
NotesFailed {
nullifiers: Vec<Nullifier>,
block_num: BlockNumber,
ack_tx: tokio::sync::oneshot::Sender<()>,
},
/// A note script was fetched from the remote store and should be persisted to the local DB.
CacheNoteScript { script_root: Word, script: NoteScript },
Expand All @@ -49,15 +63,14 @@ pub enum ActorNotification {

/// The reason an actor has shut down.
pub enum ActorShutdownReason {
/// Occurs when an account actor detects failure in the messaging channel used by the
/// coordinator.
EventChannelClosed,
/// Occurs when an account actor detects failure in acquiring the rate-limiting semaphore.
SemaphoreFailed(AcquireError),
/// Occurs when an account actor detects its corresponding cancellation token has been triggered
/// by the coordinator. Cancellation tokens are triggered by the coordinator to initiate
/// graceful shutdown of actors.
Cancelled(NetworkAccountId),
/// Occurs when the actor encounters a database error it cannot recover from.
DbError(NetworkAccountId),
}

// ACCOUNT ACTOR CONFIG
Expand Down Expand Up @@ -87,8 +100,8 @@ pub struct AccountActorContext {
pub max_note_attempts: usize,
/// Database for persistent state.
pub db: Db,
/// Channel for sending notifications to the coordinator (via the builder event loop).
pub notification_tx: mpsc::Sender<ActorNotification>,
/// Channel for sending requests to the coordinator (via the builder event loop).
pub request_tx: mpsc::Sender<ActorRequest>,
}

// ACCOUNT ORIGIN
Expand Down Expand Up @@ -179,7 +192,7 @@ pub struct AccountActor {
store: StoreClient,
db: Db,
mode: ActorMode,
event_rx: mpsc::Receiver<Arc<MempoolEvent>>,
notify: Arc<Notify>,
cancel_token: CancellationToken,
block_producer: BlockProducerClient,
validator: ValidatorClient,
Expand All @@ -190,17 +203,16 @@ pub struct AccountActor {
max_notes_per_tx: NonZeroUsize,
/// Maximum number of note execution attempts before dropping a note.
max_note_attempts: usize,
/// Channel for sending notifications to the coordinator.
notification_tx: mpsc::Sender<ActorNotification>,
/// Channel for sending requests to the coordinator.
request_tx: mpsc::Sender<ActorRequest>,
}

impl AccountActor {
/// Constructs a new account actor and corresponding messaging channel with the given
/// configuration.
/// Constructs a new account actor with the given configuration.
pub fn new(
origin: AccountOrigin,
actor_context: &AccountActorContext,
event_rx: mpsc::Receiver<Arc<MempoolEvent>>,
notify: Arc<Notify>,
cancel_token: CancellationToken,
) -> Self {
let block_producer = BlockProducerClient::new(actor_context.block_producer_url.clone());
Expand All @@ -217,7 +229,7 @@ impl AccountActor {
store: actor_context.store.clone(),
db: actor_context.db.clone(),
mode: ActorMode::NoViableNotes,
event_rx,
notify,
cancel_token,
block_producer,
validator,
Expand All @@ -226,7 +238,7 @@ impl AccountActor {
script_cache: actor_context.script_cache.clone(),
max_notes_per_tx: actor_context.max_notes_per_tx,
max_note_attempts: actor_context.max_note_attempts,
notification_tx: actor_context.notification_tx.clone(),
request_tx: actor_context.request_tx.clone(),
}
}

Expand All @@ -237,11 +249,14 @@ impl AccountActor {

// Determine initial mode by checking DB for available notes.
let block_num = self.chain_state.read().await.chain_tip_header.block_num();
let has_notes = self
.db
.has_available_notes(account_id, block_num, self.max_note_attempts)
.await
.expect("actor should be able to check for available notes");
let has_notes = match db_query(
account_id,
self.db.has_available_notes(account_id, block_num, self.max_note_attempts).await,
"failed to check for available notes",
) {
Comment on lines +252 to +256
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor nit, feel free to disregard: A pattern similar to

self.db.has_available_notes(...).await.map_err(|err| map_db_err(err, account_id, "failed to check notes"))

reads a bit better IMO

Ok(v) => v,
Err(reason) => return reason,
};

if has_notes {
self.mode = ActorMode::NotesAvailable;
Expand All @@ -261,28 +276,28 @@ impl AccountActor {
_ = self.cancel_token.cancelled() => {
return ActorShutdownReason::Cancelled(account_id);
}
// Handle mempool events.
event = self.event_rx.recv() => {
let Some(event) = event else {
return ActorShutdownReason::EventChannelClosed;
};
// Re-enable transaction execution if the transaction being waited on has
// been resolved (added to mempool, committed in a block, or reverted).
if let ActorMode::TransactionInflight(awaited_id) = self.mode {
let should_wake = match event.as_ref() {
MempoolEvent::TransactionAdded { id, .. } => *id == awaited_id,
MempoolEvent::BlockCommitted { txs, .. } => {
txs.contains(&awaited_id)
},
MempoolEvent::TransactionsReverted(tx_ids) => {
tx_ids.contains(&awaited_id)
},
};
if should_wake {
// Handle coordinator notifications. On notification, re-evaluate state from DB.
_ = self.notify.notified() => {
match self.mode {
ActorMode::TransactionInflight(awaited_id) => {
// Check DB: has the inflight tx been resolved (i.e. is it no longer pending)?
let resolved = match db_query(
account_id,
self.db
.is_transaction_resolved(account_id, awaited_id)
.await,
"failed to check transaction status",
) {
Ok(v) => v,
Err(reason) => return reason,
};
if resolved {
self.mode = ActorMode::NotesAvailable;
}
},
_ => {
self.mode = ActorMode::NotesAvailable;
}
} else {
self.mode = ActorMode::NotesAvailable;
}
},
// Execute transactions.
Expand All @@ -293,10 +308,13 @@ impl AccountActor {
let chain_state = self.chain_state.read().await.clone();

// Query DB for latest account and available notes.
let tx_candidate = self.select_candidate_from_db(
let tx_candidate = match self.select_candidate_from_db(
account_id,
chain_state,
).await;
).await {
Ok(candidate) => candidate,
Err(shutdown_reason) => return shutdown_reason,
};

if let Some(tx_candidate) = tx_candidate {
self.execute_transactions(account_id, tx_candidate).await;
Expand All @@ -319,30 +337,32 @@ impl AccountActor {
&self,
account_id: NetworkAccountId,
chain_state: ChainState,
) -> Option<TransactionCandidate> {
) -> Result<Option<TransactionCandidate>, ActorShutdownReason> {
let block_num = chain_state.chain_tip_header.block_num();
let max_notes = self.max_notes_per_tx.get();

let (latest_account, notes) = self
.db
.select_candidate(account_id, block_num, self.max_note_attempts)
.await
.expect("actor should be able to query DB for candidate");
let (latest_account, notes) = db_query(
account_id,
self.db.select_candidate(account_id, block_num, self.max_note_attempts).await,
"failed to query DB for transaction candidate",
)?;

let account = latest_account?;
let Some(account) = latest_account else {
return Ok(None);
};

let notes: Vec<_> = notes.into_iter().take(max_notes).collect();
if notes.is_empty() {
return None;
return Ok(None);
}

let (chain_tip_header, chain_mmr) = chain_state.into_parts();
Some(TransactionCandidate {
Ok(Some(TransactionCandidate {
account,
notes,
chain_tip_header,
chain_mmr,
})
}))
}

/// Execute a transaction candidate and mark notes as failed as required.
Expand All @@ -369,17 +389,13 @@ impl AccountActor {
let notes = tx_candidate.notes.clone();
let execution_result = context.execute_transaction(tx_candidate).await;
match execution_result {
// Execution completed without failed notes.
Ok((tx_id, failed, scripts_to_cache)) if failed.is_empty() => {
self.cache_note_scripts(scripts_to_cache).await;
self.mode = ActorMode::TransactionInflight(tx_id);
},
// Execution completed with some failed notes.
Ok((tx_id, failed, scripts_to_cache)) => {
self.cache_note_scripts(scripts_to_cache).await;
let nullifiers: Vec<_> =
failed.into_iter().map(|note| note.note.nullifier()).collect();
self.mark_notes_failed(&nullifiers, block_num).await;
if !failed.is_empty() {
let nullifiers: Vec<_> =
failed.into_iter().map(|note| note.note.nullifier()).collect();
self.mark_notes_failed(&nullifiers, block_num).await;
}
self.mode = ActorMode::TransactionInflight(tx_id);
},
// Transaction execution failed.
Expand All @@ -395,25 +411,45 @@ impl AccountActor {
}
}

/// Sends notifications to the coordinator to cache note scripts fetched from the remote store.
/// Sends requests to the coordinator to cache note scripts fetched from the remote store.
async fn cache_note_scripts(&self, scripts: Vec<(Word, NoteScript)>) {
for (script_root, script) in scripts {
let _ = self
.notification_tx
.send(ActorNotification::CacheNoteScript { script_root, script })
.await;
if self
.request_tx
.send(ActorRequest::CacheNoteScript { script_root, script })
.await
.is_err()
{
tracing::warn!(
"failed to send cache note script request, coordinator is shutting down"
);
break;
}
}
}

/// Sends a notification to the coordinator to mark notes as failed.
/// Sends a request to the coordinator to mark notes as failed and waits for the DB write to
/// complete. This prevents a race condition where the actor could re-select the same notes
/// before the failure counts are updated in the database.
async fn mark_notes_failed(&self, nullifiers: &[Nullifier], block_num: BlockNumber) {
let _ = self
.notification_tx
.send(ActorNotification::NotesFailed {
let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
if self
.request_tx
.send(ActorRequest::NotesFailed {
nullifiers: nullifiers.to_vec(),
block_num,
ack_tx,
})
.await;
.await
.is_err()
{
tracing::warn!("failed to send notes failed request, coordinator is shutting down");
return;
}
// Wait for the coordinator to confirm the DB write.
if ack_rx.await.is_err() {
tracing::warn!("failed to receive notes failed ack from coordinator");
}
}
}

Expand Down
Loading
Loading