@@ -5,6 +5,7 @@ use epoxy_protocol::{
55	versioned, 
66} ; 
77use  futures_util:: { StreamExt ,  stream:: FuturesUnordered } ; 
8+ use  rivet_api_builder:: ApiCtx ; 
89use  std:: future:: Future ; 
910use  versioned_data_util:: OwnedVersionedData ; 
1011
5657	. await ; 
5758	tracing:: info!( ?quorum_size,  len = ?responses. len( ) ,  ?quorum_type,  "fanout quorum size" ) ; 
5859
59- 	// Choow  how many successful responses we need before considering a success 
60+ 	// Choose  how many successful responses we need before considering a success 
6061	let  target_responses = match  quorum_type { 
6162		// Only require 1 response 
6263		utils:: QuorumType :: Any  => 1 , 
@@ -93,19 +94,32 @@ where
9394} 
9495
9596pub  async  fn  send_message ( 
97+ 	ctx :  & ApiCtx , 
9698	config :  & protocol:: ClusterConfig , 
97- 	to_replica_id :  ReplicaId , 
9899	request :  protocol:: Request , 
99100)  -> Result < protocol:: Response >  { 
100- 	let  replica_url = find_replica_address ( config,  to_replica_id) ?; 
101- 	send_message_to_address ( replica_url ,  to_replica_id ,  request) . await 
101+ 	let  replica_url = find_replica_address ( config,  request . to_replica_id ) ?; 
102+ 	send_message_to_address ( ctx ,  replica_url ,  request) . await 
102103} 
103104
104105pub  async  fn  send_message_to_address ( 
106+ 	ctx :  & ApiCtx , 
105107	replica_url :  String , 
106- 	to_replica_id :  ReplicaId , 
107108	request :  protocol:: Request , 
108109)  -> Result < protocol:: Response >  { 
110+ 	let  from_replica_id = request. from_replica_id ; 
111+ 	let  to_replica_id = request. to_replica_id ; 
112+ 
113+ 	if  from_replica_id == to_replica_id { 
114+ 		tracing:: info!( 
115+ 			to_replica = to_replica_id, 
116+ 			"sending message to replica directly" 
117+ 		) ; 
118+ 
119+ 		return  crate :: replica:: message_request:: message_request ( & ctx,  from_replica_id,  request) 
120+ 			. await ; 
121+ 	} 
122+ 
109123	let  mut  replica_url = url:: Url :: parse ( & replica_url) ?; 
110124	replica_url. set_path ( & format ! ( "/v{PROTOCOL_VERSION}/epoxy/message" ) ) ; 
111125
0 commit comments