@@ -159,15 +159,15 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
159159 }
160160 }
161161 protocol:: ToServer :: ToServerEvents ( events) => {
162- let last_event_idx = events. last ( ) . map ( |event| event. index ) ;
163-
164162 // NOTE: This should not be parallelized because signals should be sent in order
165163 // Forward to actor workflows
166- for event in events {
164+ for event in & events {
167165 let actor_id =
168166 crate :: utils:: event_actor_id ( & event. inner ) . to_string ( ) ;
169167 let res = ctx
170- . signal ( crate :: workflows:: actor:: Event { inner : event. inner } )
168+ . signal ( crate :: workflows:: actor:: Event {
169+ inner : event. inner . clone ( ) ,
170+ } )
171171 . to_workflow :: < crate :: workflows:: actor:: Workflow > ( )
172172 . tag ( "actor_id" , & actor_id)
173173 . send ( )
@@ -186,20 +186,29 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
186186 }
187187 }
188188
189- // Ack every 500 events
190- if let Some ( last_event_idx) = last_event_idx {
191- if last_event_idx > state. last_event_ack_idx . saturating_add ( 500 ) {
192- state. last_event_ack_idx = last_event_idx;
189+ if !events. is_empty ( ) {
190+ ctx. activity ( InsertEventsInput {
191+ events : events. clone ( ) ,
192+ } )
193+ . await ?;
193194
194- ctx. activity ( SendMessageToRunnerInput {
195- runner_id : input. runner_id ,
196- message : protocol:: ToClient :: ToClientAckEvents (
197- protocol:: ToClientAckEvents {
198- last_event_idx : state. last_event_ack_idx ,
199- } ,
200- ) ,
201- } )
202- . await ?;
195+ // Ack every 500 events
196+ let last_event_idx = events. last ( ) . map ( |event| event. index ) ;
197+ if let Some ( last_event_idx) = last_event_idx {
198+ if last_event_idx > state. last_event_ack_idx . saturating_add ( 500 )
199+ {
200+ state. last_event_ack_idx = last_event_idx;
201+
202+ ctx. activity ( SendMessageToRunnerInput {
203+ runner_id : input. runner_id ,
204+ message : protocol:: ToClient :: ToClientAckEvents (
205+ protocol:: ToClientAckEvents {
206+ last_event_idx : state. last_event_ack_idx ,
207+ } ,
208+ ) ,
209+ } )
210+ . await ?;
211+ }
203212 }
204213 }
205214 }
@@ -822,7 +831,6 @@ async fn ack_commands(ctx: &ActivityCtx, input: &AckCommandsInput) -> Result<()>
822831
823832#[ derive( Debug , Serialize , Deserialize , Hash ) ]
824833struct InsertEventsInput {
825- runner_id : Id ,
826834 events : Vec < protocol:: EventWrapper > ,
827835}
828836
0 commit comments