@@ -62,7 +62,7 @@ pub(crate) struct RemoteMap {
6262 local_addrs : n0_watcher:: Direct < BTreeSet < DirectAddr > > ,
6363 disco : DiscoState ,
6464 sender : TransportsSender ,
65- actor_tasks : Mutex < JoinSet < Vec < RemoteStateMessage > > > ,
65+ actor_tasks : Mutex < JoinSet < ( EndpointId , Vec < RemoteStateMessage > ) > > ,
6666}
6767
6868impl RemoteMap {
@@ -118,7 +118,22 @@ impl RemoteMap {
118118 senders. retain ( |_eid, sender| !sender. is_closed ( ) ) ;
119119 while let Some ( result) = self . actor_tasks . lock ( ) . expect ( "poisoned" ) . try_join_next ( ) {
120120 match result {
121- Ok ( leftover_msgs) => debug ! ( ?leftover_msgs, "TODO: handle leftover messages" ) ,
121+ Ok ( ( eid, leftover_msgs) ) => {
122+ let entry = senders. entry ( eid) ;
123+ if leftover_msgs. is_empty ( ) {
124+ match entry {
125+ hash_map:: Entry :: Occupied ( occupied_entry) => occupied_entry. remove ( ) ,
126+ hash_map:: Entry :: Vacant ( _) => {
127+ panic ! ( "this should be impossible TODO(matheus23)" ) ;
128+ }
129+ } ;
130+ } else {
131+ // The remote actor got messages while it was closing, so we're restarting
132+ debug ! ( %eid, "restarting terminated remote state actor: messages received during shutdown" ) ;
133+ let sender = self . start_remote_state_actor ( eid, leftover_msgs) ;
134+ entry. insert_entry ( sender) ;
135+ }
136+ }
122137 Err ( err) => {
123138 if let Ok ( panic) = err. try_into_panic ( ) {
124139 error ! ( "RemoteStateActor panicked." ) ;
@@ -139,7 +154,7 @@ impl RemoteMap {
139154 match handles. entry ( eid) {
140155 hash_map:: Entry :: Occupied ( entry) => entry. get ( ) . clone ( ) ,
141156 hash_map:: Entry :: Vacant ( entry) => {
142- let sender = self . start_remote_state_actor ( eid) ;
157+ let sender = self . start_remote_state_actor ( eid, vec ! [ ] ) ;
143158 entry. insert ( sender. clone ( ) ) ;
144159 sender
145160 }
@@ -149,7 +164,11 @@ impl RemoteMap {
149164 /// Starts a new remote state actor and returns a handle and a sender.
150165 ///
151166 /// The handle is not inserted into the endpoint map, this must be done by the caller of this function.
152- fn start_remote_state_actor ( & self , eid : EndpointId ) -> mpsc:: Sender < RemoteStateMessage > {
167+ fn start_remote_state_actor (
168+ & self ,
169+ eid : EndpointId ,
170+ initial_msgs : Vec < RemoteStateMessage > ,
171+ ) -> mpsc:: Sender < RemoteStateMessage > {
153172 // Ensure there is a RemoteMappedAddr for this EndpointId.
154173 self . endpoint_mapped_addrs . get ( & eid) ;
155174 RemoteStateActor :: new (
@@ -161,7 +180,10 @@ impl RemoteMap {
161180 self . metrics . clone ( ) ,
162181 self . sender . clone ( ) ,
163182 )
164- . start ( self . actor_tasks . lock ( ) . expect ( "poisoned" ) . deref_mut ( ) )
183+ . start (
184+ initial_msgs,
185+ self . actor_tasks . lock ( ) . expect ( "poisoned" ) . deref_mut ( ) ,
186+ )
165187 }
166188
167189 pub ( super ) fn handle_ping ( & self , msg : disco:: Ping , sender : EndpointId , src : transports:: Addr ) {
0 commit comments