@@ -281,9 +281,8 @@ impl PegboardGateway {
281281 let mut ws_rx = client_ws. accept ( ) . await ?;
282282
283283 // Spawn task to forward messages from server to client
284- let mut msg_rx_for_task = msg_rx;
285- tokio:: spawn ( async move {
286- while let Some ( msg) = msg_rx_for_task. recv ( ) . await {
284+ let mut server_to_client = tokio:: spawn ( async move {
285+ while let Some ( msg) = msg_rx. recv ( ) . await {
287286 match msg {
288287 TunnelMessageData :: Message (
289288 protocol:: ToServerTunnelMessageKind :: ToServerWebSocketMessage ( ws_msg) ,
@@ -302,6 +301,7 @@ impl PegboardGateway {
302301 protocol:: ToServerTunnelMessageKind :: ToServerWebSocketClose ( close) ,
303302 ) => {
304303 tracing:: info!( ?close, "server closed websocket" ) ;
304+ // Exit the task - websocket will be closed when handle_websocket_inner exits
305305 break ;
306306 }
307307 TunnelMessageData :: Timeout => {
@@ -313,48 +313,76 @@ impl PegboardGateway {
313313 }
314314 } ) ;
315315
316- // Forward messages from client to server
317- let mut close_reason = None ;
318- while let Some ( msg) = ws_rx. next ( ) . await {
319- match msg {
320- Result :: Ok ( Message :: Binary ( data) ) => {
321- let ws_message = protocol:: ToClientTunnelMessageKind :: ToClientWebSocketMessage (
322- protocol:: ToClientWebSocketMessage {
323- data : data. into ( ) ,
324- binary : true ,
325- } ,
326- ) ;
327- if let Err ( err) = self . shared_state . send_message ( request_id, ws_message) . await {
328- if is_tunnel_service_unavailable ( & err) {
329- tracing:: warn!( "tunnel closed sending binary message" ) ;
330- close_reason = Some ( "Tunnel closed" . to_string ( ) ) ;
331- break ;
332- } else {
333- tracing:: error!( ?err, "error sending binary message" ) ;
316+ // Spawn task to forward messages from client to server
317+ let shared_state_clone = self . shared_state . clone ( ) ;
318+ let mut client_to_server = tokio:: spawn ( async move {
319+ let mut close_reason = None ;
320+ while let Some ( msg) = ws_rx. next ( ) . await {
321+ match msg {
322+ Result :: Ok ( Message :: Binary ( data) ) => {
323+ let ws_message =
324+ protocol:: ToClientTunnelMessageKind :: ToClientWebSocketMessage (
325+ protocol:: ToClientWebSocketMessage {
326+ data : data. into ( ) ,
327+ binary : true ,
328+ } ,
329+ ) ;
330+ if let Err ( err) = shared_state_clone
331+ . send_message ( request_id, ws_message)
332+ . await
333+ {
334+ if is_tunnel_service_unavailable ( & err) {
335+ tracing:: warn!( "tunnel closed sending binary message" ) ;
336+ close_reason = Some ( "Tunnel closed" . to_string ( ) ) ;
337+ break ;
338+ } else {
339+ tracing:: error!( ?err, "error sending binary message" ) ;
340+ }
334341 }
335342 }
336- }
337- Result :: Ok ( Message :: Text ( text) ) => {
338- let ws_message = protocol:: ToClientTunnelMessageKind :: ToClientWebSocketMessage (
339- protocol:: ToClientWebSocketMessage {
340- data : text. as_bytes ( ) . to_vec ( ) ,
341- binary : false ,
342- } ,
343- ) ;
344- if let Err ( err) = self . shared_state . send_message ( request_id, ws_message) . await {
345- if is_tunnel_service_unavailable ( & err) {
346- tracing:: warn!( "tunnel closed sending text message" ) ;
347- close_reason = Some ( "Tunnel closed" . to_string ( ) ) ;
348- break ;
349- } else {
350- tracing:: error!( ?err, "error sending text message" ) ;
343+ Result :: Ok ( Message :: Text ( text) ) => {
344+ let ws_message =
345+ protocol:: ToClientTunnelMessageKind :: ToClientWebSocketMessage (
346+ protocol:: ToClientWebSocketMessage {
347+ data : text. as_bytes ( ) . to_vec ( ) ,
348+ binary : false ,
349+ } ,
350+ ) ;
351+ if let Err ( err) = shared_state_clone
352+ . send_message ( request_id, ws_message)
353+ . await
354+ {
355+ if is_tunnel_service_unavailable ( & err) {
356+ tracing:: warn!( "tunnel closed sending text message" ) ;
357+ close_reason = Some ( "Tunnel closed" . to_string ( ) ) ;
358+ break ;
359+ } else {
360+ tracing:: error!( ?err, "error sending text message" ) ;
361+ }
351362 }
352363 }
364+ Result :: Ok ( Message :: Close ( _) ) | Err ( _) => break ,
365+ _ => { }
353366 }
354- Result :: Ok ( Message :: Close ( _) ) | Err ( _) => break ,
355- _ => { }
356367 }
357- }
368+ close_reason
369+ } ) ;
370+
371+ // Wait for either task to complete
372+ let close_reason = tokio:: select! {
373+ _ = & mut server_to_client => {
374+ tracing:: info!( "server to client task completed" ) ;
375+ None
376+ }
377+ res = & mut client_to_server => {
378+ tracing:: info!( "client to server task completed" ) ;
379+ res. unwrap_or( None )
380+ }
381+ } ;
382+
383+ // Abort remaining tasks
384+ server_to_client. abort ( ) ;
385+ client_to_server. abort ( ) ;
358386
359387 // Send WebSocket close message
360388 let close_message = protocol:: ToClientTunnelMessageKind :: ToClientWebSocketClose (
0 commit comments