Skip to content

Commit ff15327

Browse files
committed
fix(pb): rewrite runner wf to handle batch signals
1 parent 457234c commit ff15327

File tree

7 files changed

+1187
-10
lines changed

7 files changed

+1187
-10
lines changed

engine/packages/pegboard-runner/src/conn.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ pub async fn init_conn(
154154
};
155155

156156
// Forward to runner wf
157-
ctx.signal(pegboard::workflows::runner::Forward { inner: packet })
157+
ctx.signal(pegboard::workflows::runner2::Forward { inner: packet })
158158
.to_workflow_id(workflow_id)
159159
.send()
160160
.await

engine/packages/pegboard-runner/src/ping_task.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ async fn update_runner_ping(ctx: &StandaloneCtx, conn: &Conn) -> Result<()> {
5555
if let RunnerEligibility::ReEligible = notif.eligibility {
5656
tracing::debug!(runner_id=?notif.runner_id, "runner has become eligible again");
5757

58-
ctx.signal(pegboard::workflows::runner::CheckQueue {})
58+
ctx.signal(pegboard::workflows::runner2::CheckQueue {})
5959
.to_workflow_id(notif.workflow_id)
6060
.send()
6161
.await?;

engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ async fn handle_message(
358358
| protocol::ToServer::ToServerEvents(_)
359359
| protocol::ToServer::ToServerAckCommands(_)
360360
| protocol::ToServer::ToServerStopping => {
361-
ctx.signal(pegboard::workflows::runner::Forward {
361+
ctx.signal(pegboard::workflows::runner2::Forward {
362362
inner: protocol::ToServer::try_from(msg)
363363
.context("failed to convert message for workflow forwarding")?,
364364
})

engine/packages/pegboard/src/workflows/actor/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
320320
.await?;
321321

322322
// Send signal to stop actor now that we know it will be sleeping
323-
ctx.signal(crate::workflows::runner::Command {
323+
ctx.signal(crate::workflows::runner2::Command {
324324
inner: protocol::Command::CommandStopActor(
325325
protocol::CommandStopActor {
326326
actor_id: input.actor_id.to_string(),
@@ -349,7 +349,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
349349
})
350350
.await?;
351351

352-
ctx.signal(crate::workflows::runner::Command {
352+
ctx.signal(crate::workflows::runner2::Command {
353353
inner: protocol::Command::CommandStopActor(
354354
protocol::CommandStopActor {
355355
actor_id: input.actor_id.to_string(),
@@ -492,7 +492,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
492492
})
493493
.await?;
494494

495-
ctx.signal(crate::workflows::runner::Command {
495+
ctx.signal(crate::workflows::runner2::Command {
496496
inner: protocol::Command::CommandStopActor(protocol::CommandStopActor {
497497
actor_id: input.actor_id.to_string(),
498498
generation: state.generation,
@@ -506,7 +506,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
506506
Main::Destroy(_) => {
507507
// If allocated, send stop actor command
508508
if let Some(runner_workflow_id) = state.runner_workflow_id {
509-
ctx.signal(crate::workflows::runner::Command {
509+
ctx.signal(crate::workflows::runner2::Command {
510510
inner: protocol::Command::CommandStopActor(protocol::CommandStopActor {
511511
actor_id: input.actor_id.to_string(),
512512
generation: state.generation,
@@ -623,7 +623,7 @@ async fn handle_stopped(
623623
if let (StoppedVariant::Lost { .. }, Some(old_runner_workflow_id)) =
624624
(&variant, old_runner_workflow_id)
625625
{
626-
ctx.signal(crate::workflows::runner::Command {
626+
ctx.signal(crate::workflows::runner2::Command {
627627
inner: protocol::Command::CommandStopActor(protocol::CommandStopActor {
628628
actor_id: input.actor_id.to_string(),
629629
generation: state.generation,

engine/packages/pegboard/src/workflows/actor/runtime.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -517,7 +517,7 @@ pub async fn spawn_actor(
517517
.send()
518518
.await?;
519519

520-
ctx.signal(crate::workflows::runner::Command {
520+
ctx.signal(crate::workflows::runner2::Command {
521521
inner: protocol::Command::CommandStartActor(protocol::CommandStartActor {
522522
actor_id: input.actor_id.to_string(),
523523
generation,
@@ -576,7 +576,7 @@ pub async fn spawn_actor(
576576
})
577577
.await?;
578578

579-
ctx.signal(crate::workflows::runner::Command {
579+
ctx.signal(crate::workflows::runner2::Command {
580580
inner: protocol::Command::CommandStartActor(protocol::CommandStartActor {
581581
actor_id: input.actor_id.to_string(),
582582
generation,
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
pub mod actor;
22
pub mod runner;
3+
pub mod runner2;

0 commit comments

Comments
 (0)