Skip to content

Commit c6de652

Browse files
committed
stop trying to recv or send pg framed messages if the connection is canceled
1 parent 58763fb commit c6de652

File tree

1 file changed

+10
-5
lines changed

1 file changed

+10
-5
lines changed

crates/corro-pg/src/lib.rs

+10-5
Original file line numberDiff line numberDiff line change
@@ -556,9 +556,11 @@ pub async fn start(
556556
let cancel = cancel.clone();
557557
async move {
558558
// cancel stuff if this loop breaks
559-
let _drop_guard = cancel.drop_guard();
559+
let _drop_guard = cancel.clone().drop_guard();
560560

561-
while let Some(decode_res) = stream.next().await {
561+
while let Outcome::Completed(Some(decode_res)) =
562+
stream.next().preemptible(cancel.cancelled()).await
563+
{
562564
let msg = match decode_res {
563565
Ok(msg) => msg,
564566
Err(PgWireError::IoError(io_error)) => {
@@ -585,9 +587,9 @@ pub async fn start(
585587
break;
586588
}
587589
};
588-
589590
front_tx.send(msg).await?;
590591
}
592+
591593
debug!("frontend stream is done");
592594

593595
Ok::<_, BoxError>(())
@@ -597,8 +599,11 @@ pub async fn start(
597599
tokio::spawn({
598600
let cancel = cancel.clone();
599601
async move {
600-
let _drop_guard = cancel.drop_guard();
601-
while let Some(back) = back_rx.recv().await {
602+
let _drop_guard = cancel.clone().drop_guard();
603+
604+
while let Outcome::Completed(Some(back)) =
605+
back_rx.recv().preemptible(cancel.cancelled()).await
606+
{
602607
match back {
603608
BackendResponse::Message { message, flush } => {
604609
if let PgWireBackendMessage::ErrorResponse(e) = &message {

0 commit comments

Comments
 (0)