Skip to content

Commit 592c2be

Browse files
committed
Omit PeekNotification for frontend peeks
1 parent 5492a2a commit 592c2be

File tree

3 files changed

+18
-9
lines changed

3 files changed

+18
-9
lines changed

src/adapter/src/peek_client.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,7 @@ impl PeekClient {
333333
target_read_hold,
334334
target_replica,
335335
rows_tx,
336+
true,
336337
)
337338
.await
338339
.map_err(AdapterError::concurrent_dependency_drop_from_peek_error)?;

src/compute-client/src/controller.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -675,7 +675,7 @@ where
675675
}
676676

677677
tokio::select! {
678-
resp = self.response_rx.recv() => {
678+
resp = self.response_rx.recv() => { //////////// we get e.g. PeekNotification here
679679
let resp = resp.expect("`self.response_tx` not dropped");
680680
self.stashed_response = Some(resp);
681681
}
@@ -910,6 +910,7 @@ where
910910
read_hold,
911911
target_replica,
912912
peek_response_tx,
913+
false,
913914
)
914915
.expect("validated")
915916
});

src/compute-client/src/controller/instance.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,7 @@ where
275275
target_read_hold: ReadHold<T>,
276276
target_replica: Option<ReplicaId>,
277277
peek_response_tx: oneshot::Sender<PeekResponse>,
278+
is_frontend_peek: bool,
278279
) -> Result<(), crate::controller::error::PeekError> {
279280
self.call_sync(move |i| {
280281
i.peek(
@@ -288,6 +289,7 @@ where
288289
target_read_hold,
289290
target_replica,
290291
peek_response_tx,
292+
is_frontend_peek,
291293
)
292294
})
293295
.await
@@ -1737,6 +1739,7 @@ where
17371739
mut read_hold: ReadHold<T>,
17381740
target_replica: Option<ReplicaId>,
17391741
peek_response_tx: oneshot::Sender<PeekResponse>,
1742+
is_frontend_peek: bool,
17401743
) -> Result<(), PeekError> {
17411744
use PeekError::*;
17421745

@@ -1769,6 +1772,7 @@ where
17691772
peek_response_tx,
17701773
limit: finishing.limit.map(usize::cast_from),
17711774
offset: finishing.offset,
1775+
is_frontend_peek,
17721776
},
17731777
);
17741778

@@ -2088,14 +2092,16 @@ where
20882092
let duration = peek.requested_at.elapsed();
20892093
self.metrics.observe_peek_response(&response, duration);
20902094

2091-
let notification = PeekNotification::new(&response, peek.offset, peek.limit);
2092-
// NOTE: We use the `otel_ctx` from the response, not the pending peek, because we
2093-
// currently want the parent to be whatever the compute worker did with this peek.
2094-
self.deliver_response(ComputeControllerResponse::PeekNotification(
2095-
uuid,
2096-
notification,
2097-
otel_ctx,
2098-
));
2095+
if !peek.is_frontend_peek {
2096+
let notification = PeekNotification::new(&response, peek.offset, peek.limit);
2097+
// NOTE: We use the `otel_ctx` from the response, not the pending peek, because we
2098+
// currently want the parent to be whatever the compute worker did with this peek.
2099+
self.deliver_response(ComputeControllerResponse::PeekNotification(
2100+
uuid,
2101+
notification,
2102+
otel_ctx,
2103+
));
2104+
}
20992105

21002106
self.finish_peek(uuid, response)
21012107
}
@@ -2993,6 +2999,7 @@ struct PendingPeek<T: Timestamp> {
29932999
limit: Option<usize>,
29943000
/// The offset into the peek's result.
29953001
offset: usize,
3002+
is_frontend_peek: bool,
29963003
}
29973004

29983005
#[derive(Debug, Clone)]

0 commit comments

Comments
 (0)