1- use  std:: { 
2- 	borrow:: Cow , 
3- 	collections:: HashMap  as  StdHashMap , 
4- 	net:: SocketAddr , 
5- 	sync:: Arc , 
6- 	time:: { Duration ,  Instant } , 
7- } ; 
8- 
91use  anyhow:: * ; 
102use  bytes:: Bytes ; 
113use  futures_util:: { SinkExt ,  StreamExt } ; 
@@ -16,17 +8,30 @@ use hyper_util::{client::legacy::Client, rt::TokioExecutor};
168use  moka:: future:: Cache ; 
179use  rand; 
1810use  rivet_api_builder:: { ErrorResponse ,  RawErrorResponse } ; 
19- use  rivet_error:: RivetError ; 
11+ use  rivet_error:: { INTERNAL_ERROR ,   RivetError } ; 
2012use  rivet_metrics:: KeyValue ; 
2113use  rivet_util:: Id ; 
2214use  serde_json; 
15+ use  std:: { 
16+ 	borrow:: Cow , 
17+ 	collections:: HashMap  as  StdHashMap , 
18+ 	net:: SocketAddr , 
19+ 	sync:: Arc , 
20+ 	time:: { Duration ,  Instant } , 
21+ } ; 
2322use  tokio:: sync:: Mutex ; 
2423use  tokio:: time:: timeout; 
25- use  tokio_tungstenite:: tungstenite:: client:: IntoClientRequest ; 
24+ use  tokio_tungstenite:: tungstenite:: { 
25+ 	client:: IntoClientRequest , 
26+ 	protocol:: { CloseFrame ,  frame:: coding:: CloseCode } , 
27+ } ; 
2628use  tracing:: Instrument ; 
2729use  url:: Url ; 
2830
29- use  crate :: { custom_serve:: CustomServeTrait ,  errors,  metrics,  request_context:: RequestContext } ; 
31+ use  crate :: { 
32+ 	WebSocketHandle ,  custom_serve:: CustomServeTrait ,  errors,  metrics, 
33+ 	request_context:: RequestContext , 
34+ } ; 
3035
3136pub  const  X_FORWARDED_FOR :  HeaderName  = HeaderName :: from_static ( "x-forwarded-for" ) ; 
3237pub  const  X_RIVET_ERROR :  HeaderName  = HeaderName :: from_static ( "x-rivet-error" ) ; 
@@ -1432,9 +1437,9 @@ impl ProxyService {
14321437
14331438									// Close the WebSocket connection with the response message 
14341439									let  _ = client_ws. close ( Some ( tokio_tungstenite:: tungstenite:: protocol:: CloseFrame  { 
1435- 								code :  tokio_tungstenite:: tungstenite:: protocol:: frame:: coding:: CloseCode :: Error , 
1436- 								reason :  response. message . as_ref ( ) . into ( ) , 
1437- 							} ) ) . await ; 
1440+ 										 code :  tokio_tungstenite:: tungstenite:: protocol:: frame:: coding:: CloseCode :: Error , 
1441+ 										 reason :  response. message . as_ref ( ) . into ( ) , 
1442+ 									 } ) ) . await ; 
14381443									return ; 
14391444								} 
14401445								Result :: Ok ( ResolveRouteOutput :: CustomServe ( _) )  => { 
@@ -1813,31 +1818,42 @@ impl ProxyService {
18131818						let  mut  attempts = 0u32 ; 
18141819						let  mut  client_ws = client_websocket; 
18151820
1821+ 						let  ws_handle = WebSocketHandle :: new ( client_ws) ; 
1822+ 
18161823						loop  { 
18171824							match  handlers
18181825								. handle_websocket ( 
1819- 									client_ws , 
1826+ 									ws_handle . clone ( ) , 
18201827									& req_headers, 
18211828									& req_path, 
18221829									& mut  request_context, 
18231830								) 
18241831								. await 
18251832							{ 
1826- 								Result :: Ok ( ( ) )  => break , 
1827- 								Result :: Err ( ( returned_client_ws,  err) )  => { 
1833+ 								Result :: Ok ( ( ) )  => { 
1834+ 									tracing:: debug!( "websocket closed" ) ; 
1835+ 
1836+ 									// Send graceful close 
1837+ 									ws_handle. send ( hyper_tungstenite:: tungstenite:: Message :: Close ( Some ( 
1838+ 										hyper_tungstenite:: tungstenite:: protocol:: CloseFrame  { 
1839+ 											code :  hyper_tungstenite:: tungstenite:: protocol:: frame:: coding:: CloseCode :: Normal , 
1840+ 											reason :  format ! ( "Closed" ) . into ( ) , 
1841+ 										} , 
1842+ 									) ) ) ; 
1843+ 
1844+ 									break ; 
1845+ 								} 
1846+ 								Result :: Err ( err)  => { 
18281847									attempts += 1 ; 
18291848									if  attempts > max_attempts || !is_retryable_ws_error ( & err)  { 
1830- 										// Accept and close the client websocket with an error reason 
1831- 										if  let  Result :: Ok ( mut  ws)  = returned_client_ws. await  { 
1832- 											let  _ = ws
1833- 													. send ( hyper_tungstenite:: tungstenite:: Message :: Close ( Some ( 
1834- 														hyper_tungstenite:: tungstenite:: protocol:: CloseFrame  { 
1835- 															code :  hyper_tungstenite:: tungstenite:: protocol:: frame:: coding:: CloseCode :: Error , 
1836- 															reason :  format ! ( "{}" ,  err) . into ( ) , 
1837- 													} , 
1838- 												) ) ) 
1839- 												. await ; 
1840- 										} 
1849+ 										// Close WebSocket with error 
1850+ 										ws_handle
1851+ 											. accept_and_send ( 
1852+ 												hyper_tungstenite:: tungstenite:: Message :: Close ( 
1853+ 													Some ( err_to_close_frame ( err) ) , 
1854+ 												) , 
1855+ 											) 
1856+ 											. await ?; 
18411857
18421858										break ; 
18431859									}  else  { 
@@ -1861,49 +1877,38 @@ impl ProxyService {
18611877												new_handlers, 
18621878											) )  => { 
18631879												handlers = new_handlers; 
1864- 												client_ws = returned_client_ws; 
18651880												continue ; 
18661881											} 
18671882											Result :: Ok ( ResolveRouteOutput :: Response ( response) )  => { 
1868- 												if  let  Result :: Ok ( mut  ws)  = returned_client_ws. await 
1869- 												{ 
1870- 													let  _ = ws
1871- 															. send ( hyper_tungstenite:: tungstenite:: Message :: Close ( Some ( 
1872- 																hyper_tungstenite:: tungstenite:: protocol:: CloseFrame  { 
1873- 																code :  hyper_tungstenite:: tungstenite:: protocol:: frame:: coding:: CloseCode :: Error , 
1874- 																reason :  response. message . as_ref ( ) . into ( ) , 
1875- 															} , 
1876- 															) ) ) 
1877- 															. await ; 
1878- 												} 
1879- 												break ; 
1883+ 												ws_handle
1884+ 													. accept_and_send ( hyper_tungstenite:: tungstenite:: Message :: Close ( Some ( 
1885+ 														hyper_tungstenite:: tungstenite:: protocol:: CloseFrame  { 
1886+ 															code :  hyper_tungstenite:: tungstenite:: protocol:: frame:: coding:: CloseCode :: Error , 
1887+ 															reason :  response. message . as_ref ( ) . into ( ) , 
1888+ 														} , 
1889+ 													) ) ) 
1890+ 													. await ; 
18801891											} 
18811892											Result :: Ok ( ResolveRouteOutput :: Target ( _) )  => { 
1882- 												if  let  Result :: Ok ( mut  ws)  = returned_client_ws. await 
1883- 												{ 
1884- 													let  _ = ws
1885- 															. send ( hyper_tungstenite:: tungstenite:: Message :: Close ( Some ( 
1886- 																hyper_tungstenite:: tungstenite:: protocol:: CloseFrame  { 
1887- 																code :  hyper_tungstenite:: tungstenite:: protocol:: frame:: coding:: CloseCode :: Error , 
1888- 																reason :  "Cannot retry WebSocket with non-custom serve route" . into ( ) , 
1889- 															} , 
1890- 															) ) ) 
1891- 															. await ; 
1892- 												} 
1893+ 												ws_handle
1894+ 													. accept_and_send ( hyper_tungstenite:: tungstenite:: Message :: Close ( Some ( 
1895+ 														hyper_tungstenite:: tungstenite:: protocol:: CloseFrame  { 
1896+ 															code :  hyper_tungstenite:: tungstenite:: protocol:: frame:: coding:: CloseCode :: Error , 
1897+ 															reason :  "Cannot retry WebSocket with non-custom serve route" . into ( ) , 
1898+ 														} , 
1899+ 													) ) ) 
1900+ 													. await ; 
18931901												break ; 
18941902											} 
18951903											Err ( res_err)  => { 
1896- 												if  let  Result :: Ok ( mut  ws)  = returned_client_ws. await 
1897- 												{ 
1898- 													let  _ = ws
1899- 															. send ( hyper_tungstenite:: tungstenite:: Message :: Close ( Some ( 
1900- 																hyper_tungstenite:: tungstenite:: protocol:: CloseFrame  { 
1901- 																code :  hyper_tungstenite:: tungstenite:: protocol:: frame:: coding:: CloseCode :: Error , 
1902- 																reason :  format ! ( "Routing error: {}" ,  res_err) . into ( ) , 
1903- 															} , 
1904- 															) ) ) 
1905- 															. await ; 
1906- 												} 
1904+ 												ws_handle
1905+ 													. accept_and_send ( hyper_tungstenite:: tungstenite:: Message :: Close ( Some ( 
1906+ 														hyper_tungstenite:: tungstenite:: protocol:: CloseFrame  { 
1907+ 															code :  hyper_tungstenite:: tungstenite:: protocol:: frame:: coding:: CloseCode :: Error , 
1908+ 															reason :  format ! ( "Routing error: {}" ,  res_err) . into ( ) , 
1909+ 														} , 
1910+ 													) ) ) 
1911+ 													. await ; 
19071912												break ; 
19081913											} 
19091914										} 
@@ -2242,3 +2247,26 @@ fn is_retryable_ws_error(err: &anyhow::Error) -> bool {
22422247		false 
22432248	} 
22442249} 
2250+ 
2251+ pub  fn  err_to_close_frame ( err :  anyhow:: Error )  -> CloseFrame  { 
2252+ 	let  rivet_err = err
2253+ 		. chain ( ) 
2254+ 		. find_map ( |x| x. downcast_ref :: < RivetError > ( ) ) 
2255+ 		. cloned ( ) 
2256+ 		. unwrap_or_else ( || RivetError :: from ( & INTERNAL_ERROR ) ) ; 
2257+ 
2258+ 	let  code = match  ( rivet_err. group ( ) ,  rivet_err. code ( ) )  { 
2259+ 		( "ws" ,  "connection_closed" )  => CloseCode :: Normal , 
2260+ 		_ => CloseCode :: Error , 
2261+ 	} ; 
2262+ 
2263+ 	// NOTE: reason cannot be more than 123 bytes as per the WS protocol 
2264+ 	let  reason = rivet_util:: safe_slice ( 
2265+ 		& format ! ( "{}.{}" ,  rivet_err. group( ) ,  rivet_err. code( ) ) , 
2266+ 		0 , 
2267+ 		123 , 
2268+ 	) 
2269+ 	. into ( ) ; 
2270+ 
2271+ 	CloseFrame  {  code,  reason } 
2272+ } 
0 commit comments