Skip to content

Commit a8d3e24

Browse files
committed
JoinHandle is infallible
Tasks can fail in theory for two ways: - Panic! However, we disable panics in production. (And for tests, we percolate the panic upward.) - Task is explicitly aborted. This is statically ruled out by the wrapper. - Runtime is shutting down. This is not interesting - it means the process is shutting down - and there's no value to percolating errors in that case. Here, we just remain pending indefinitely until the runtime dies.
1 parent 51f8b10 commit a8d3e24

File tree

52 files changed

+149
-228
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+149
-228
lines changed

src/adapter/src/catalog.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3372,7 +3372,7 @@ mod tests {
33723372

33733373
let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
33743374
for handle in handles {
3375-
handle.await.expect("must succeed");
3375+
handle.await;
33763376
}
33773377
}
33783378

src/adapter/src/catalog/apply.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1420,7 +1420,7 @@ impl CatalogState {
14201420
// Wait for a view to be ready.
14211421
let (handle, _idx, remaining) = future::select_all(handles).await;
14221422
handles = remaining;
1423-
let (id, global_id, res) = handle.expect("must join");
1423+
let (id, global_id, res) = handle;
14241424
let mut insert_cached_expr = |cached_expr| {
14251425
if let Some(cached_expr) = cached_expr {
14261426
local_expression_cache.insert_cached_expression(global_id, cached_expr);

src/adapter/src/coord.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2553,15 +2553,12 @@ impl Coordinator {
25532553

25542554
let retractions_res = futures::future::join_all(retraction_tasks).await;
25552555
for retractions in retractions_res {
2556-
let retractions = retractions.expect("cannot fail to fetch snapshot");
25572556
builtin_table_updates.push(retractions);
25582557
}
25592558

25602559
let audit_join_start = Instant::now();
25612560
info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
2562-
let audit_log_updates = audit_log_task
2563-
.await
2564-
.expect("cannot fail to fetch audit log updates");
2561+
let audit_log_updates = audit_log_task.await;
25652562
let audit_log_builtin_table_updates = self
25662563
.catalog()
25672564
.state()

src/adapter/src/coord/sequencer/inner.rs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -252,16 +252,7 @@ impl Coordinator {
252252
spawn(|| "sequence_staged", async move {
253253
tokio::select! {
254254
res = handle => {
255-
let next = match res {
256-
Ok(next) => return_if_err!(next, ctx),
257-
Err(err) => {
258-
tracing::error!("sequence_staged join error {err}");
259-
ctx.retire(Err(AdapterError::Internal(
260-
"sequence_staged join error".into(),
261-
)));
262-
return;
263-
}
264-
};
255+
let next = return_if_err!(res, ctx);
265256
f(ctx, next);
266257
}
267258
_ = rx, if cancel_enabled => {

src/adapter/src/explain/insights.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ impl PlanInsights {
132132
});
133133
for task in tasks {
134134
let res = task.await;
135-
let Ok(Ok((name, plan))) = res else {
135+
let Ok((name, plan)) = res else {
136136
continue;
137137
};
138138
let (plan, _, _) = plan.unapply();

src/adapter/src/webhook.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ impl AppendWebhookValidator {
223223
.map_err(|e| {
224224
tracing::error!("Failed to run validation for webhook, {e}");
225225
AppendWebhookError::ValidationError
226-
})??;
226+
})?;
227227

228228
valid
229229
}

src/aws-util/src/s3_uploader.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use bytes::{Bytes, BytesMut};
2020
use bytesize::ByteSize;
2121
use mz_ore::cast::CastFrom;
2222
use mz_ore::error::ErrorExt;
23-
use mz_ore::task::{JoinHandle, JoinHandleExt, spawn};
23+
use mz_ore::task::{JoinHandle, spawn};
2424

2525
/// A multi part uploader which can upload a single object across multiple parts
2626
/// and keeps track of state to eventually finish the upload process.
@@ -226,7 +226,7 @@ impl S3MultiPartUploader {
226226

227227
let mut parts: Vec<CompletedPart> = Vec::with_capacity(self.upload_handles.len());
228228
for handle in self.upload_handles {
229-
let (etag, part_num) = handle.wait_and_assert_finished().await?;
229+
let (etag, part_num) = handle.await?;
230230
match etag {
231231
Some(etag) => {
232232
parts.push(

src/balancerd/tests/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ async fn test_balancer() {
334334
handles.push(handle);
335335
}
336336
for handle in handles {
337-
handle.await.unwrap();
337+
handle.await;
338338
}
339339
let end_auth_count = *frontegg_server.auth_requests.lock().unwrap();
340340
// We expect that the auth count increased by fewer than the number of connections.

src/catalog/tests/debug.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -589,7 +589,7 @@ async fn test_concurrent_debugs(state_builder: TestCatalogStateBuilder) {
589589
.await
590590
.unwrap();
591591

592-
state_handle.await.unwrap();
592+
state_handle.await;
593593

594594
let mut state = state_builder
595595
.clone()
@@ -625,7 +625,7 @@ async fn test_concurrent_debugs(state_builder: TestCatalogStateBuilder) {
625625
.await
626626
.unwrap();
627627

628-
state_handle.await.unwrap();
628+
state_handle.await;
629629

630630
let configs = state_builder
631631
.clone()

src/catalog/tests/open.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -970,7 +970,7 @@ async fn test_concurrent_open(state_builder: TestCatalogStateBuilder) {
970970
.unwrap()
971971
.0;
972972

973-
state_handle.await.unwrap();
973+
state_handle.await;
974974

975975
// Open again to ensure that we didn't commit an invalid retraction.
976976
let _state = state_builder

0 commit comments

Comments
 (0)