Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 43 additions & 45 deletions crates/ntx-builder/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use account_state::TransactionCandidate;
use futures::FutureExt;
use miden_node_proto::clients::{Builder, ValidatorClient};
use miden_node_proto::domain::account::NetworkAccountId;
use miden_node_proto::domain::mempool::MempoolEvent;
use miden_node_utils::ErrorReport;
use miden_node_utils::lru_cache::LruCache;
use miden_protocol::Word;
Expand All @@ -20,7 +19,7 @@ use miden_protocol::block::BlockNumber;
use miden_protocol::note::{Note, NoteScript, Nullifier};
use miden_protocol::transaction::TransactionId;
use miden_remote_prover_client::RemoteTransactionProver;
use tokio::sync::{AcquireError, RwLock, Semaphore, mpsc};
use tokio::sync::{AcquireError, Notify, RwLock, Semaphore, mpsc};
use tokio_util::sync::CancellationToken;
use url::Url;

Expand All @@ -29,16 +28,19 @@ use crate::builder::ChainState;
use crate::db::Db;
use crate::store::StoreClient;

// ACTOR NOTIFICATION
// ACTOR REQUESTS
// ================================================================================================

/// A notification sent from an account actor to the coordinator.
pub enum ActorNotification {
/// A request sent from an account actor to the coordinator via a shared mpsc channel.
pub enum ActorRequest {
/// One or more notes failed during transaction execution and should have their attempt
/// counters incremented.
/// counters incremented. The actor waits for the coordinator to acknowledge the DB write via
/// the oneshot channel, preventing race conditions where the actor could re-select the same
/// notes before the failure is persisted.
NotesFailed {
nullifiers: Vec<Nullifier>,
block_num: BlockNumber,
ack_tx: tokio::sync::oneshot::Sender<()>,
},
/// A note script was fetched from the remote store and should be persisted to the local DB.
CacheNoteScript { script_root: Word, script: NoteScript },
Expand All @@ -49,9 +51,6 @@ pub enum ActorNotification {

/// The reason an actor has shut down.
pub enum ActorShutdownReason {
/// Occurs when an account actor detects failure in the messaging channel used by the
/// coordinator.
EventChannelClosed,
/// Occurs when an account actor detects failure in acquiring the rate-limiting semaphore.
SemaphoreFailed(AcquireError),
/// Occurs when an account actor detects its corresponding cancellation token has been triggered
Expand Down Expand Up @@ -87,8 +86,8 @@ pub struct AccountActorContext {
pub max_note_attempts: usize,
/// Database for persistent state.
pub db: Db,
/// Channel for sending notifications to the coordinator (via the builder event loop).
pub notification_tx: mpsc::Sender<ActorNotification>,
/// Channel for sending requests to the coordinator (via the builder event loop).
pub request_tx: mpsc::Sender<ActorRequest>,
}

// ACCOUNT ORIGIN
Expand Down Expand Up @@ -179,7 +178,7 @@ pub struct AccountActor {
store: StoreClient,
db: Db,
mode: ActorMode,
event_rx: mpsc::Receiver<Arc<MempoolEvent>>,
notify: Arc<Notify>,
cancel_token: CancellationToken,
block_producer: BlockProducerClient,
validator: ValidatorClient,
Expand All @@ -190,17 +189,16 @@ pub struct AccountActor {
max_notes_per_tx: NonZeroUsize,
/// Maximum number of note execution attempts before dropping a note.
max_note_attempts: usize,
/// Channel for sending notifications to the coordinator.
notification_tx: mpsc::Sender<ActorNotification>,
/// Channel for sending requests to the coordinator.
request_tx: mpsc::Sender<ActorRequest>,
}

impl AccountActor {
/// Constructs a new account actor and corresponding messaging channel with the given
/// configuration.
/// Constructs a new account actor with the given configuration.
pub fn new(
origin: AccountOrigin,
actor_context: &AccountActorContext,
event_rx: mpsc::Receiver<Arc<MempoolEvent>>,
notify: Arc<Notify>,
cancel_token: CancellationToken,
) -> Self {
let block_producer = BlockProducerClient::new(actor_context.block_producer_url.clone());
Expand All @@ -217,7 +215,7 @@ impl AccountActor {
store: actor_context.store.clone(),
db: actor_context.db.clone(),
mode: ActorMode::NoViableNotes,
event_rx,
notify,
cancel_token,
block_producer,
validator,
Expand All @@ -226,7 +224,7 @@ impl AccountActor {
script_cache: actor_context.script_cache.clone(),
max_notes_per_tx: actor_context.max_notes_per_tx,
max_note_attempts: actor_context.max_note_attempts,
notification_tx: actor_context.notification_tx.clone(),
request_tx: actor_context.request_tx.clone(),
}
}

Expand Down Expand Up @@ -261,28 +259,22 @@ impl AccountActor {
_ = self.cancel_token.cancelled() => {
return ActorShutdownReason::Cancelled(account_id);
}
// Handle mempool events.
event = self.event_rx.recv() => {
let Some(event) = event else {
return ActorShutdownReason::EventChannelClosed;
};
// Re-enable transaction execution if the transaction being waited on has
// been resolved (added to mempool, committed in a block, or reverted).
if let ActorMode::TransactionInflight(awaited_id) = self.mode {
let should_wake = match event.as_ref() {
MempoolEvent::TransactionAdded { id, .. } => *id == awaited_id,
MempoolEvent::BlockCommitted { txs, .. } => {
txs.contains(&awaited_id)
},
MempoolEvent::TransactionsReverted(tx_ids) => {
tx_ids.contains(&awaited_id)
},
};
if should_wake {
// Handle coordinator notifications. On notification, re-evaluate state from DB.
_ = self.notify.notified() => {
match self.mode {
ActorMode::TransactionInflight(awaited_id) => {
// Check DB: is the inflight tx still pending?
let resolved = self.db
.is_transaction_resolved(account_id, awaited_id)
.await
.expect("should be able to check tx status");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really want to stop everything if a single actor fails on this basis?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If an actor panics, the coordinator catches the error in coordinator.next() and logs it, but the system keeps running without the actor. That said, now that you mention it, the best option would probably be to return a specific error for this DB failure case.

if resolved {
self.mode = ActorMode::NotesAvailable;
}
},
_ => {
self.mode = ActorMode::NotesAvailable;
}
} else {
self.mode = ActorMode::NotesAvailable;
}
},
// Execute transactions.
Expand Down Expand Up @@ -395,25 +387,31 @@ impl AccountActor {
}
}

/// Sends notifications to the coordinator to cache note scripts fetched from the remote store.
/// Sends requests to the coordinator to cache note scripts fetched from the remote store.
async fn cache_note_scripts(&self, scripts: Vec<(Word, NoteScript)>) {
for (script_root, script) in scripts {
let _ = self
.notification_tx
.send(ActorNotification::CacheNoteScript { script_root, script })
.request_tx
.send(ActorRequest::CacheNoteScript { script_root, script })
.await;
}
}

/// Sends a notification to the coordinator to mark notes as failed.
/// Sends a request to the coordinator to mark notes as failed and waits for the DB write to
/// complete. This prevents a race condition where the actor could re-select the same notes
/// before the failure counts are updated in the database.
async fn mark_notes_failed(&self, nullifiers: &[Nullifier], block_num: BlockNumber) {
let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
let _ = self
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_ has type Result<(), SendError<ActorRequest>>. Are we sure we don't want to handle that error here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same with let _ = ack_rx.await; below

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replaced them with .is_err() checks.

.notification_tx
.send(ActorNotification::NotesFailed {
.request_tx
.send(ActorRequest::NotesFailed {
nullifiers: nullifiers.to_vec(),
block_num,
ack_tx,
})
.await;
// Wait for the coordinator to confirm the DB write.
let _ = ack_rx.await;
}
}

Expand Down
51 changes: 23 additions & 28 deletions crates/ntx-builder/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio_stream::StreamExt;
use tonic::Status;

use crate::NtxBuilderConfig;
use crate::actor::{AccountActorContext, AccountOrigin, ActorNotification};
use crate::actor::{AccountActorContext, AccountOrigin, ActorRequest};
use crate::coordinator::Coordinator;
use crate::db::Db;
use crate::store::StoreClient;
Expand Down Expand Up @@ -98,8 +98,8 @@ pub struct NetworkTransactionBuilder {
actor_context: AccountActorContext,
/// Stream of mempool events from the block producer.
mempool_events: MempoolEventStream,
/// Receiver for notifications from account actors (e.g., note failures).
notification_rx: mpsc::Receiver<ActorNotification>,
/// Receiver for requests from account actors (note failures, script caching).
actor_request_rx: mpsc::Receiver<ActorRequest>,
}

impl NetworkTransactionBuilder {
Expand All @@ -112,7 +112,7 @@ impl NetworkTransactionBuilder {
chain_state: Arc<RwLock<ChainState>>,
actor_context: AccountActorContext,
mempool_events: MempoolEventStream,
notification_rx: mpsc::Receiver<ActorNotification>,
actor_request_rx: mpsc::Receiver<ActorRequest>,
) -> Self {
Self {
config,
Expand All @@ -122,7 +122,7 @@ impl NetworkTransactionBuilder {
chain_state,
actor_context,
mempool_events,
notification_rx,
actor_request_rx,
}
}

Expand Down Expand Up @@ -164,17 +164,17 @@ impl NetworkTransactionBuilder {
.context("mempool event stream ended")?
.context("mempool event stream failed")?;

self.handle_mempool_event(event.into()).await?;
self.handle_mempool_event(event).await?;
},
// Handle account batches loaded from the store.
// Once all accounts are loaded, the channel closes and this branch
// becomes inactive (recv returns None and we stop matching).
Some(account_id) = account_rx.recv() => {
self.handle_loaded_account(account_id).await?;
},
// Handle actor notifications (DB writes delegated from actors).
Some(notification) = self.notification_rx.recv() => {
self.handle_actor_notification(notification).await;
// Handle requests from actors.
Some(request) = self.actor_request_rx.recv() => {
self.handle_actor_request(request).await;
},
// Handle account loader task completion/failure.
// If the task fails, we abort since the builder would be in a degraded state
Expand Down Expand Up @@ -227,18 +227,14 @@ impl NetworkTransactionBuilder {
.context("failed to sync account to DB")?;

self.coordinator
.spawn_actor(AccountOrigin::store(account_id), &self.actor_context)
.await?;
.spawn_actor(AccountOrigin::store(account_id), &self.actor_context);
Ok(())
}

/// Handles mempool events by writing to DB first, then routing to actors.
/// Handles mempool events by writing to DB first, then notifying actors.
#[tracing::instrument(name = "ntx.builder.handle_mempool_event", skip(self, event))]
async fn handle_mempool_event(
&mut self,
event: Arc<MempoolEvent>,
) -> Result<(), anyhow::Error> {
match event.as_ref() {
async fn handle_mempool_event(&mut self, event: MempoolEvent) -> Result<(), anyhow::Error> {
match &event {
MempoolEvent::TransactionAdded { account_delta, .. } => {
// Write event effects to DB first.
self.coordinator
Expand All @@ -253,13 +249,11 @@ impl NetworkTransactionBuilder {
// Spawn new actors if a transaction creates a new network account.
let is_creating_account = delta.is_full_state();
if is_creating_account {
self.coordinator
.spawn_actor(network_account, &self.actor_context)
.await?;
self.coordinator.spawn_actor(network_account, &self.actor_context);
}
}
}
self.coordinator.send_targeted(&event).await?;
self.coordinator.send_targeted(&event);
Ok(())
},
// Update chain state and broadcast.
Expand All @@ -271,7 +265,7 @@ impl NetworkTransactionBuilder {
.context("failed to write BlockCommitted to DB")?;

self.update_chain_tip(header.as_ref().clone()).await;
self.coordinator.broadcast(event.clone()).await;
self.coordinator.broadcast();
Ok(())
},
// Broadcast to all actors.
Expand All @@ -283,7 +277,7 @@ impl NetworkTransactionBuilder {
.await
.context("failed to write TransactionsReverted to DB")?;

self.coordinator.broadcast(event.clone()).await;
self.coordinator.broadcast();

// Cancel actors for reverted account creations.
for account_id in &reverted_accounts {
Expand All @@ -294,15 +288,16 @@ impl NetworkTransactionBuilder {
}
}

/// Processes a notification from an account actor by performing the corresponding DB write.
async fn handle_actor_notification(&mut self, notification: ActorNotification) {
match notification {
ActorNotification::NotesFailed { nullifiers, block_num } => {
/// Processes a request from an account actor.
async fn handle_actor_request(&mut self, request: ActorRequest) {
match request {
ActorRequest::NotesFailed { nullifiers, block_num, ack_tx } => {
if let Err(err) = self.db.notes_failed(nullifiers, block_num).await {
tracing::error!(err = %err, "failed to mark notes as failed");
}
let _ = ack_tx.send(());
},
ActorNotification::CacheNoteScript { script_root, script } => {
ActorRequest::CacheNoteScript { script_root, script } => {
if let Err(err) = self.db.insert_note_script(script_root, &script).await {
tracing::error!(err = %err, "failed to cache note script");
}
Expand Down
Loading
Loading