Skip to content

Commit 45c6cc6

Browse files
committed
feat(workflows): defererred sqlite tasks
1 parent 1318c02 commit 45c6cc6

File tree

16 files changed

+196
-172
lines changed

16 files changed

+196
-172
lines changed

docker/dev-full/grafana/dashboards/chirp-workflow.json

+4-4
Original file line numberDiff line numberDiff line change
@@ -1662,14 +1662,14 @@
16621662
"uid": "prometheus"
16631663
},
16641664
"editorMode": "code",
1665-
"expr": "sum(increase(rivet_chirp_workflow_branch_upsert_duration_bucket{workflow_name=~\"[[workflow_name]]\"} [$__rate_interval])) by (le)",
1665+
"expr": "sum(increase(rivet_chirp_workflow_loop_iteration_duration_bucket{workflow_name=~\"[[workflow_name]]\"} [$__rate_interval])) by (le)",
16661666
"format": "heatmap",
16671667
"legendFormat": "{{le}}",
16681668
"range": true,
16691669
"refId": "A"
16701670
}
16711671
],
1672-
"title": "Branch Upsert Duration",
1672+
"title": "Loop Iteration Duration",
16731673
"tooltip": {
16741674
"show": true,
16751675
"showHistogram": true
@@ -1779,13 +1779,13 @@
17791779
"uid": "prometheus"
17801780
},
17811781
"editorMode": "code",
1782-
"expr": "sum(irate(rivet_chirp_workflow_branch_upsert_duration_count{workflow_name=~\"[[workflow_name]]\"} [$__rate_interval])) by (workflow_name)",
1782+
"expr": "sum(irate(rivet_chirp_workflow_loop_iteration_duration_count{workflow_name=~\"[[workflow_name]]\"} [$__rate_interval])) by (workflow_name)",
17831783
"legendFormat": "{{workflow_name}}",
17841784
"range": true,
17851785
"refId": "A"
17861786
}
17871787
],
1788-
"title": "Branch Events Per Second",
1788+
"title": "Loop Events Per Second",
17891789
"type": "timeseries"
17901790
},
17911791
{

examples/system-test/yarn.lock

-8
Original file line numberDiff line numberDiff line change
@@ -277,13 +277,6 @@ __metadata:
277277
languageName: node
278278
linkType: hard
279279

280-
"@types/deno@npm:^2.2.0":
281-
version: 2.2.0
282-
resolution: "@types/deno@npm:2.2.0"
283-
checksum: 10c0/cb45bbffe66a3008224a509c6bcb338921cc68b9045363f77ba5d84650d879b8fd4c810db24369a93fbce4a8e2855808bb141c0447feb47d911a7512ba374bde
284-
languageName: node
285-
linkType: hard
286-
287280
"@types/node@npm:*, @types/node@npm:^22.13.9":
288281
version: 22.13.9
289282
resolution: "@types/node@npm:22.13.9"
@@ -1526,7 +1519,6 @@ __metadata:
15261519
"@hono/node-ws": "npm:^1.1.0"
15271520
"@rivet-gg/actor-core": "npm:^5.1.2"
15281521
"@rivet-gg/api": "npm:^24.6.2"
1529-
"@types/deno": "npm:^2.2.0"
15301522
"@types/node": "npm:^22.13.9"
15311523
"@types/ws": "npm:^8.18.0"
15321524
hono: "npm:^4.6.17"

packages/common/cache/build/src/driver.rs

+10-2
Original file line numberDiff line numberDiff line change
@@ -370,14 +370,22 @@ impl moka::Expiry<String, ExpiringValue> for ValueExpiry {
370370
}
371371

372372
/// In-memory cache driver implementation using the moka crate
373-
#[derive(Clone, Debug)]
373+
#[derive(Clone)]
374374
pub struct InMemoryDriver {
375375
service_name: String,
376376
cache: Cache<String, ExpiringValue>,
377377
/// In-memory rate limiting store - maps keys to hit counts with expiration
378378
rate_limits: Cache<String, ExpiringValue>,
379379
}
380380

381+
impl Debug for InMemoryDriver {
382+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
383+
f.debug_struct("InMemoryDriver")
384+
.field("service_name", &self.service_name)
385+
.finish()
386+
}
387+
}
388+
381389
impl InMemoryDriver {
382390
pub fn new(service_name: String, max_capacity: u64) -> Self {
383391
// Create a cache with ValueExpiry implementation for custom expiration times
@@ -391,7 +399,7 @@ impl InMemoryDriver {
391399
.build();
392400

393401
Self {
394-
service_name, // Kept for future expansion (e.g., namespacing keys by service)
402+
service_name,
395403
cache,
396404
rate_limits,
397405
}

packages/common/chirp-workflow/core/src/builder/workflow/message.rs

+18-26
Original file line numberDiff line numberDiff line change
@@ -107,32 +107,24 @@ impl<'a, M: Message> MessageBuilder<'a, M> {
107107
let tags = serde_json::Value::Object(self.tags);
108108
let tags2 = tags.clone();
109109

110-
let (msg, write) = tokio::join!(
111-
async {
112-
self.ctx
113-
.db()
114-
.commit_workflow_message_send_event(
115-
self.ctx.workflow_id(),
116-
&location,
117-
self.version,
118-
&tags,
119-
M::NAME,
120-
&body_val,
121-
self.ctx.loop_location(),
122-
)
123-
.await
124-
},
125-
async {
126-
if self.wait {
127-
self.ctx.msg_ctx().message_wait(tags2, self.body).await
128-
} else {
129-
self.ctx.msg_ctx().message(tags2, self.body).await
130-
}
131-
},
132-
);
133-
134-
msg.map_err(GlobalError::raw)?;
135-
write.map_err(GlobalError::raw)?;
110+
self.ctx
111+
.db()
112+
.commit_workflow_message_send_event(
113+
self.ctx.workflow_id(),
114+
&location,
115+
self.version,
116+
&tags,
117+
M::NAME,
118+
&body_val,
119+
self.ctx.loop_location(),
120+
)
121+
.await?;
122+
123+
if self.wait {
124+
self.ctx.msg_ctx().message_wait(tags2, self.body).await?;
125+
} else {
126+
self.ctx.msg_ctx().message(tags2, self.body).await?;
127+
}
136128

137129
let dt = start_instant.elapsed().as_secs_f64();
138130
metrics::MESSAGE_SEND_DURATION

packages/common/chirp-workflow/core/src/ctx/workflow.rs

+2-9
Original file line numberDiff line numberDiff line change
@@ -774,8 +774,6 @@ impl WorkflowCtx {
774774
.map_err(WorkflowError::SerializeLoopOutput)
775775
.map_err(GlobalError::raw)?;
776776

777-
let start_instant = Instant::now();
778-
779777
// Insert event before loop is run so the history is consistent
780778
self.db
781779
.upsert_workflow_loop_event(
@@ -790,11 +788,6 @@ impl WorkflowCtx {
790788
)
791789
.await?;
792790

793-
let dt = start_instant.elapsed().as_secs_f64();
794-
metrics::BRANCH_UPSERT_DURATION
795-
.with_label_values(&[&self.name])
796-
.observe(dt);
797-
798791
(0, state, None)
799792
};
800793

@@ -875,7 +868,7 @@ impl WorkflowCtx {
875868
.map_err(GlobalError::raw)?;
876869

877870
let dt = start_instant.elapsed().as_secs_f64();
878-
metrics::BRANCH_UPSERT_DURATION
871+
metrics::LOOP_ITERATION_DURATION
879872
.with_label_values(&[&self.name])
880873
.observe(dt - dt2);
881874
}
@@ -910,7 +903,7 @@ impl WorkflowCtx {
910903
.map_err(GlobalError::raw)?;
911904

912905
let dt = start_instant.elapsed().as_secs_f64();
913-
metrics::BRANCH_UPSERT_DURATION
906+
metrics::LOOP_ITERATION_DURATION
914907
.with_label_values(&[&self.name])
915908
.observe(dt - dt2);
916909

0 commit comments

Comments
 (0)