Skip to content

Commit 75d581a

Browse files
committed
[invoker] removing unnecessary async on action handling
Removes the unnecessary tainted async methods from InvokerHandle. This makes it clear to callers when a method can yield or not.
1 parent 2d57b56 commit 75d581a

File tree

9 files changed

+47
-63
lines changed

9 files changed

+47
-63
lines changed

crates/invoker-api/src/handle.rs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,21 @@
88
// the Business Source License, use of this software will be governed
99
// by the Apache License, Version 2.0.
1010

11-
use super::Effect;
12-
use super::JournalMetadata;
11+
use std::ops::RangeInclusive;
12+
13+
use tokio::sync::mpsc;
1314

14-
use crate::invocation_reader::JournalEntry;
1515
use restate_errors::NotRunningError;
1616
use restate_types::identifiers::PartitionKey;
1717
use restate_types::identifiers::{InvocationId, PartitionLeaderEpoch};
1818
use restate_types::invocation::{InvocationEpoch, InvocationTarget};
1919
use restate_types::journal::Completion;
2020
use restate_types::journal_v2::CommandIndex;
2121
use restate_types::journal_v2::raw::RawNotification;
22-
use std::future::Future;
23-
use std::ops::RangeInclusive;
24-
use tokio::sync::mpsc;
22+
23+
use super::Effect;
24+
use super::JournalMetadata;
25+
use crate::invocation_reader::JournalEntry;
2526

2627
#[derive(Debug, Eq, PartialEq, Default)]
2728
pub enum InvokeInputJournal {
@@ -38,56 +39,56 @@ pub trait InvokerHandle<SR> {
3839
invocation_epoch: InvocationEpoch,
3940
invocation_target: InvocationTarget,
4041
journal: InvokeInputJournal,
41-
) -> impl Future<Output = Result<(), NotRunningError>> + Send;
42+
) -> Result<(), NotRunningError>;
4243

4344
fn notify_completion(
4445
&mut self,
4546
partition: PartitionLeaderEpoch,
4647
invocation_id: InvocationId,
4748
completion: Completion,
48-
) -> impl Future<Output = Result<(), NotRunningError>> + Send;
49+
) -> Result<(), NotRunningError>;
4950

5051
fn notify_notification(
5152
&mut self,
5253
partition: PartitionLeaderEpoch,
5354
invocation_id: InvocationId,
5455
invocation_epoch: InvocationEpoch,
5556
entry: RawNotification,
56-
) -> impl Future<Output = Result<(), NotRunningError>> + Send;
57+
) -> Result<(), NotRunningError>;
5758

5859
fn retry_invocation_now(
5960
&mut self,
6061
partition: PartitionLeaderEpoch,
6162
invocation_id: InvocationId,
6263
invocation_epoch: InvocationEpoch,
63-
) -> impl Future<Output = Result<(), NotRunningError>> + Send;
64+
) -> Result<(), NotRunningError>;
6465

6566
fn notify_stored_command_ack(
6667
&mut self,
6768
partition: PartitionLeaderEpoch,
6869
invocation_id: InvocationId,
6970
invocation_epoch: InvocationEpoch,
7071
command_index: CommandIndex,
71-
) -> impl Future<Output = Result<(), NotRunningError>> + Send;
72+
) -> Result<(), NotRunningError>;
7273

7374
fn abort_all_partition(
7475
&mut self,
7576
partition: PartitionLeaderEpoch,
76-
) -> impl Future<Output = Result<(), NotRunningError>> + Send;
77+
) -> Result<(), NotRunningError>;
7778

7879
/// *Note*: When aborting an invocation, and restarting it, the `invocation_epoch` MUST be bumped.
7980
fn abort_invocation(
8081
&mut self,
8182
partition_leader_epoch: PartitionLeaderEpoch,
8283
invocation_id: InvocationId,
8384
invocation_epoch: InvocationEpoch,
84-
) -> impl Future<Output = Result<(), NotRunningError>> + Send;
85+
) -> Result<(), NotRunningError>;
8586

8687
fn register_partition(
8788
&mut self,
8889
partition: PartitionLeaderEpoch,
8990
partition_key_range: RangeInclusive<PartitionKey>,
9091
storage_reader: SR,
9192
sender: mpsc::Sender<Box<Effect>>,
92-
) -> impl Future<Output = Result<(), NotRunningError>> + Send;
93+
) -> Result<(), NotRunningError>;
9394
}

crates/invoker-api/src/lib.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ pub mod test_util {
100100
}
101101

102102
impl<SR: Send> InvokerHandle<SR> for MockInvokerHandle<SR> {
103-
async fn invoke(
103+
fn invoke(
104104
&mut self,
105105
_partition: PartitionLeaderEpoch,
106106
_invocation_id: InvocationId,
@@ -111,7 +111,7 @@ pub mod test_util {
111111
Ok(())
112112
}
113113

114-
async fn notify_completion(
114+
fn notify_completion(
115115
&mut self,
116116
_partition: PartitionLeaderEpoch,
117117
_invocation_id: InvocationId,
@@ -120,7 +120,7 @@ pub mod test_util {
120120
Ok(())
121121
}
122122

123-
async fn notify_notification(
123+
fn notify_notification(
124124
&mut self,
125125
_partition: PartitionLeaderEpoch,
126126
_invocation_id: InvocationId,
@@ -130,7 +130,7 @@ pub mod test_util {
130130
Ok(())
131131
}
132132

133-
async fn notify_stored_command_ack(
133+
fn notify_stored_command_ack(
134134
&mut self,
135135
_partition: PartitionLeaderEpoch,
136136
_invocation_id: InvocationId,
@@ -140,14 +140,14 @@ pub mod test_util {
140140
Ok(())
141141
}
142142

143-
async fn abort_all_partition(
143+
fn abort_all_partition(
144144
&mut self,
145145
_partition: PartitionLeaderEpoch,
146146
) -> Result<(), NotRunningError> {
147147
Ok(())
148148
}
149149

150-
async fn abort_invocation(
150+
fn abort_invocation(
151151
&mut self,
152152
_partition_leader_epoch: PartitionLeaderEpoch,
153153
_invocation_id: InvocationId,
@@ -156,7 +156,7 @@ pub mod test_util {
156156
Ok(())
157157
}
158158

159-
async fn retry_invocation_now(
159+
fn retry_invocation_now(
160160
&mut self,
161161
_partition_leader_epoch: PartitionLeaderEpoch,
162162
_invocation_id: InvocationId,
@@ -165,7 +165,7 @@ pub mod test_util {
165165
Ok(())
166166
}
167167

168-
async fn register_partition(
168+
fn register_partition(
169169
&mut self,
170170
_partition: PartitionLeaderEpoch,
171171
_partition_key_range: RangeInclusive<PartitionKey>,

crates/invoker-impl/src/input_command.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ pub struct InvokerHandle<SR> {
8888
}
8989

9090
impl<SR: Send> restate_invoker_api::InvokerHandle<SR> for InvokerHandle<SR> {
91-
async fn invoke(
91+
fn invoke(
9292
&mut self,
9393
partition: PartitionLeaderEpoch,
9494
invocation_id: InvocationId,
@@ -107,7 +107,7 @@ impl<SR: Send> restate_invoker_api::InvokerHandle<SR> for InvokerHandle<SR> {
107107
.map_err(|_| NotRunningError)
108108
}
109109

110-
async fn notify_completion(
110+
fn notify_completion(
111111
&mut self,
112112
partition: PartitionLeaderEpoch,
113113
invocation_id: InvocationId,
@@ -122,7 +122,7 @@ impl<SR: Send> restate_invoker_api::InvokerHandle<SR> for InvokerHandle<SR> {
122122
.map_err(|_| NotRunningError)
123123
}
124124

125-
async fn notify_notification(
125+
fn notify_notification(
126126
&mut self,
127127
partition: PartitionLeaderEpoch,
128128
invocation_id: InvocationId,
@@ -139,7 +139,7 @@ impl<SR: Send> restate_invoker_api::InvokerHandle<SR> for InvokerHandle<SR> {
139139
.map_err(|_| NotRunningError)
140140
}
141141

142-
async fn notify_stored_command_ack(
142+
fn notify_stored_command_ack(
143143
&mut self,
144144
partition: PartitionLeaderEpoch,
145145
invocation_id: InvocationId,
@@ -156,7 +156,7 @@ impl<SR: Send> restate_invoker_api::InvokerHandle<SR> for InvokerHandle<SR> {
156156
.map_err(|_| NotRunningError)
157157
}
158158

159-
async fn abort_all_partition(
159+
fn abort_all_partition(
160160
&mut self,
161161
partition: PartitionLeaderEpoch,
162162
) -> Result<(), NotRunningError> {
@@ -165,7 +165,7 @@ impl<SR: Send> restate_invoker_api::InvokerHandle<SR> for InvokerHandle<SR> {
165165
.map_err(|_| NotRunningError)
166166
}
167167

168-
async fn abort_invocation(
168+
fn abort_invocation(
169169
&mut self,
170170
partition: PartitionLeaderEpoch,
171171
invocation_id: InvocationId,
@@ -180,7 +180,7 @@ impl<SR: Send> restate_invoker_api::InvokerHandle<SR> for InvokerHandle<SR> {
180180
.map_err(|_| NotRunningError)
181181
}
182182

183-
async fn retry_invocation_now(
183+
fn retry_invocation_now(
184184
&mut self,
185185
partition: PartitionLeaderEpoch,
186186
invocation_id: InvocationId,
@@ -195,7 +195,7 @@ impl<SR: Send> restate_invoker_api::InvokerHandle<SR> for InvokerHandle<SR> {
195195
.map_err(|_| NotRunningError)
196196
}
197197

198-
async fn register_partition(
198+
fn register_partition(
199199
&mut self,
200200
partition: PartitionLeaderEpoch,
201201
partition_key_range: RangeInclusive<PartitionKey>,

crates/invoker-impl/src/lib.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1811,7 +1811,6 @@ mod tests {
18111811
EmptyStorageReader,
18121812
output_tx,
18131813
)
1814-
.await
18151814
.unwrap();
18161815
handle
18171816
.invoke(
@@ -1821,7 +1820,6 @@ mod tests {
18211820
invocation_target,
18221821
InvokeInputJournal::NoCachedJournal,
18231822
)
1824-
.await
18251823
.unwrap();
18261824

18271825
// If input order between 'register partition' and 'invoke' is not maintained, then it can happen

crates/worker/src/partition/leadership/leader_state.rs

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -209,11 +209,8 @@ impl LeaderState {
209209
// It's ok to not check the abort_result because either it succeeded or the invoker
210210
// is not running. If the invoker is not running, and we are not shutting down, then
211211
// we will fail the next time we try to invoke.
212-
let (shuffle_result, cleaner_result, _abort_result) = tokio::join!(
213-
shuffle_handle,
214-
cleaner_handle,
215-
invoker_handle.abort_all_partition((self.partition_id, self.leader_epoch)),
216-
);
212+
let _ = invoker_handle.abort_all_partition((self.partition_id, self.leader_epoch));
213+
let (shuffle_result, cleaner_result) = tokio::join!(shuffle_handle, cleaner_handle);
217214

218215
if let Some(shuffle_result) = shuffle_result {
219216
let _ = shuffle_result.expect("graceful termination of shuffle task");
@@ -351,7 +348,7 @@ impl LeaderState {
351348
}
352349
}
353350

354-
pub async fn handle_actions(
351+
pub fn handle_actions(
355352
&mut self,
356353
invoker_tx: &mut impl restate_invoker_api::InvokerHandle<InvokerStorageReader<PartitionStore>>,
357354
actions: impl Iterator<Item = Action>,
@@ -369,13 +366,13 @@ impl LeaderState {
369366
)
370367
.increment(1);
371368

372-
self.handle_action(action, invoker_tx).await?;
369+
self.handle_action(action, invoker_tx)?;
373370
}
374371

375372
Ok(())
376373
}
377374

378-
async fn handle_action(
375+
fn handle_action(
379376
&mut self,
380377
action: Action,
381378
invoker_tx: &mut impl restate_invoker_api::InvokerHandle<InvokerStorageReader<PartitionStore>>,
@@ -395,7 +392,6 @@ impl LeaderState {
395392
invocation_target,
396393
invoke_input_journal,
397394
)
398-
.await
399395
.map_err(Error::Invoker)?,
400396
Action::NewOutboxMessage {
401397
seq_number,
@@ -421,22 +417,19 @@ impl LeaderState {
421417
invocation_epoch,
422418
command_index,
423419
)
424-
.await
425420
.map_err(Error::Invoker)?;
426421
}
427422
Action::ForwardCompletion {
428423
invocation_id,
429424
completion,
430425
} => invoker_tx
431426
.notify_completion(partition_leader_epoch, invocation_id, completion)
432-
.await
433427
.map_err(Error::Invoker)?,
434428
Action::AbortInvocation {
435429
invocation_id,
436430
invocation_epoch,
437431
} => invoker_tx
438432
.abort_invocation(partition_leader_epoch, invocation_id, invocation_epoch)
439-
.await
440433
.map_err(Error::Invoker)?,
441434
Action::IngressResponse {
442435
request_id,
@@ -493,7 +486,6 @@ impl LeaderState {
493486
invocation_epoch,
494487
notification,
495488
)
496-
.await
497489
.map_err(Error::Invoker)?;
498490
}
499491
Action::ForwardKillResponse {

crates/worker/src/partition/leadership/mod.rs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,6 @@ where
447447
InvokerStorageReader::new(partition_store.clone()),
448448
invoker_tx,
449449
)
450-
.await
451450
.map_err(Error::Invoker)?;
452451

453452
{
@@ -472,7 +471,6 @@ where
472471
invocation_target,
473472
InvokeInputJournal::NoCachedJournal,
474473
)
475-
.await
476474
.map_err(Error::Invoker)?;
477475
count += 1;
478476
}
@@ -502,18 +500,13 @@ where
502500
}
503501
}
504502

505-
pub async fn handle_actions(
506-
&mut self,
507-
actions: impl Iterator<Item = Action>,
508-
) -> Result<(), Error> {
503+
pub fn handle_actions(&mut self, actions: impl Iterator<Item = Action>) -> Result<(), Error> {
509504
match &mut self.state {
510505
State::Follower | State::Candidate { .. } => {
511506
// nothing to do :-)
512507
}
513508
State::Leader(leader_state) => {
514-
leader_state
515-
.handle_actions(&mut self.invoker_tx, actions)
516-
.await?;
509+
leader_state.handle_actions(&mut self.invoker_tx, actions)?;
517510
}
518511
}
519512

crates/worker/src/partition/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -559,7 +559,7 @@ where
559559

560560
// Commit our changes and notify actuators about actions if we are the leader
561561
transaction.commit().await?;
562-
self.leadership_state.handle_actions(action_collector.drain(..)).await?;
562+
self.leadership_state.handle_actions(action_collector.drain(..))?;
563563
},
564564
result = self.leadership_state.run() => {
565565
let action_effects = result?;

0 commit comments

Comments
 (0)