
Commit 902bc29

feat: compress sqlite db (#2278)

1 parent ea92b9d, commit 902bc29

10 files changed: +234, -79 lines changed

packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs

Lines changed: 8 additions & 19 deletions

@@ -205,18 +205,7 @@ impl Database for DatabaseFdbSqliteNats {
     fn from_pools(pools: rivet_pools::Pools) -> Result<Arc<Self>, rivet_pools::Error> {
         // Start background flush handler task
         let (flush_tx, flush_rx) = mpsc::unbounded_channel();
-        let pools2 = pools.clone();
-        tokio::spawn(async move {
-            if let Err(err) = flush_handler(pools2, flush_rx).await {
-                tracing::error!(
-                    ?err,
-                    "fdb_sqlite_nats workflow driver flush handler task failed"
-                );
-
-                // Shut down entire runtime
-                rivet_runtime::shutdown().await;
-            }
-        });
+        tokio::spawn(flush_handler(pools.clone(), flush_rx));

         Ok(Arc::new(DatabaseFdbSqliteNats {
             pools,
@@ -3237,24 +3226,24 @@ enum WorkflowState {
     Silenced,
 }

-async fn flush_handler(
-    pools: rivet_pools::Pools,
-    mut flush_rx: mpsc::UnboundedReceiver<Uuid>,
-) -> WorkflowResult<()> {
+async fn flush_handler(pools: rivet_pools::Pools, mut flush_rx: mpsc::UnboundedReceiver<Uuid>) {
     while let Some(workflow_id) = flush_rx.recv().await {
         tracing::debug!(?workflow_id, "flushing workflow");

-        pools
+        if let Err(err) = pools
             .sqlite_manager()
             .flush(vec![
                 sqlite::db_name_internal(workflow_id),
                 crate::db::sqlite_db_name_data(workflow_id),
             ])
-            .await?;
+            .await
+        {
+            // TODO: Somehow forward the error to the workflow so it can die
+            tracing::error!(?workflow_id, ?err, "failed to flush workflow databases");
+        }
     }

     // If the channel is closed that means the db driver instance was dropped which is not an error
-    Ok(())
 }

 fn value_to_str(v: &serde_json::Value) -> WorkflowResult<String> {
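The first hunk removes the fail-fast behavior (the old handler shut down the entire runtime when any flush failed); the second makes flush_handler infallible, logging each workflow's flush error and continuing. A minimal sketch of the resulting pattern, with do_flush standing in for pools.sqlite_manager().flush(..) and the surrounding names simplified:

use tokio::sync::mpsc;
use uuid::Uuid;

// Stand-in for pools.sqlite_manager().flush(..); always succeeds here.
async fn do_flush(_workflow_id: Uuid) -> anyhow::Result<()> {
    Ok(())
}

fn spawn_flush_handler() -> mpsc::UnboundedSender<Uuid> {
    let (flush_tx, mut flush_rx) = mpsc::unbounded_channel::<Uuid>();
    tokio::spawn(async move {
        while let Some(workflow_id) = flush_rx.recv().await {
            if let Err(err) = do_flush(workflow_id).await {
                // Log and keep draining: one failed flush no longer
                // tears down the whole runtime.
                tracing::error!(?workflow_id, ?err, "flush failed");
            }
        }
        // recv() returning None means every sender was dropped,
        // i.e. normal shutdown rather than an error.
    });
    flush_tx
}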

packages/common/fdb-util/src/keys.rs

Lines changed: 1 addition & 0 deletions

@@ -45,6 +45,7 @@ pub const CLIENTS_BY_REMAINING_MEM: usize = 43;
 pub const SQLITE: usize = 44;
 pub const INTERNAL: usize = 45;
 pub const METADATA: usize = 46;
+pub const COMPRESSED_DATA: usize = 47;

 // Directories with fdbrs must use string paths instead of tuples
 pub mod dir {
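COMPRESSED_DATA extends this flat list of tuple discriminators; the pools keys.rs diff below places it in the third tuple position so compressed chunks live in a key range disjoint from the uncompressed DATA chunks. A tiny illustration of why distinct usize discriminators give disjoint ranges (every value except COMPRESSED_DATA = 47 is a stand-in for this example):

use foundationdb::tuple::pack;

// Stand-ins for the real discriminators, which live in other modules.
const DBS: usize = 1;
const DATA: usize = 2;
const COMPRESSED_DATA: usize = 47;

fn main() {
    let name = b"example-db".to_vec();
    let plain = pack(&(DBS, &name, DATA, 0usize));
    let compressed = pack(&(DBS, &name, COMPRESSED_DATA, 0usize));
    // Different discriminators encode to different bytes at the same
    // position, so a range scan over one subspace never sees the other.
    assert_ne!(plain, compressed);
}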

packages/common/pools/Cargo.toml

Lines changed: 1 addition & 0 deletions

@@ -20,6 +20,7 @@ hex.workspace = true
 hyper = { version = "0.14" }
 hyper-tls = { version = "0.5.0" }
 lazy_static = "1.4"
+lz4_flex = "0.11.3"
 rivet-config.workspace = true
 rivet-metrics.workspace = true
 service-discovery.workspace = true
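lz4_flex is the compression backend implied by the commit title; the actual call sites are in files not shown in this excerpt. For orientation, a sketch of the crate's size-prepended API, which round-trips a byte buffer:

fn roundtrip_example() {
    let db_bytes: Vec<u8> = vec![0u8; 4096]; // pretend this is a sqlite db image

    // Compress, prepending the uncompressed length so decompression can
    // allocate the output buffer up front.
    let compressed = lz4_flex::compress_prepend_size(&db_bytes);

    // Decompress using that prepended length.
    let restored = lz4_flex::decompress_size_prepended(&compressed)
        .expect("payload was produced by compress_prepend_size");
    assert_eq!(restored, db_bytes);
}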

packages/common/pools/src/db/sqlite/keys.rs

Lines changed: 45 additions & 24 deletions

@@ -11,26 +11,6 @@ impl DbDataKey {
     }
 }

-//impl FormalChunkedKey for SqliteDbKey {
-//    type ChunkKey = SqliteDbKey;
-//    type Value = Vec<u8>;
-//
-//    fn chunk(&self, chunk: usize) -> Self::ChunkKey {
-//        SqliteDbKey {
-//            workflow_id: self.workflow_id,
-//            chunk,
-//        }
-//    }
-//
-//    fn combine(&self, chunks: Vec<FdbValue>) -> Result<Self::Value> {
-//        TODO
-//    }
-//
-//    fn split(&self, value: Self::Value) -> Result<Vec<Vec<u8>>> {
-//        self.split_ref(value.as_ref())
-//    }
-//}
-
 impl TuplePack for DbDataKey {
     fn pack<W: std::io::Write>(
         &self,
@@ -42,27 +22,68 @@ impl TuplePack for DbDataKey {
     }
 }

+/// Uncompressed data.
 pub struct DbDataChunkKey {
+    #[allow(dead_code)]
+    pub db_name_segment: Arc<Vec<u8>>,
+    pub chunk: usize,
+}
+
+impl<'de> TupleUnpack<'de> for DbDataChunkKey {
+    fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> {
+        let (input, (_, db_name_segment, _, chunk)) =
+            <(usize, Vec<u8>, usize, usize)>::unpack(input, tuple_depth)?;
+        let v = DbDataChunkKey {
+            db_name_segment: Arc::new(db_name_segment),
+            chunk,
+        };
+
+        Ok((input, v))
+    }
+}
+
+pub struct CompressedDbDataKey {
+    db_name_segment: Arc<Vec<u8>>,
+}
+
+impl CompressedDbDataKey {
+    pub fn new(db_name_segment: Arc<Vec<u8>>) -> Self {
+        CompressedDbDataKey { db_name_segment }
+    }
+}
+
+impl TuplePack for CompressedDbDataKey {
+    fn pack<W: std::io::Write>(
+        &self,
+        w: &mut W,
+        tuple_depth: TupleDepth,
+    ) -> std::io::Result<VersionstampOffset> {
+        let t = (DBS, &*self.db_name_segment, COMPRESSED_DATA);
+        t.pack(w, tuple_depth)
+    }
+}
+
+pub struct CompressedDbDataChunkKey {
     pub db_name_segment: Arc<Vec<u8>>,
     pub chunk: usize,
 }

-impl TuplePack for DbDataChunkKey {
+impl TuplePack for CompressedDbDataChunkKey {
     fn pack<W: std::io::Write>(
         &self,
         w: &mut W,
         tuple_depth: TupleDepth,
     ) -> std::io::Result<VersionstampOffset> {
-        let t = (DBS, &*self.db_name_segment, DATA, self.chunk);
+        let t = (DBS, &*self.db_name_segment, COMPRESSED_DATA, self.chunk);
         t.pack(w, tuple_depth)
     }
 }

-impl<'de> TupleUnpack<'de> for DbDataChunkKey {
+impl<'de> TupleUnpack<'de> for CompressedDbDataChunkKey {
     fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> {
         let (input, (_, db_name_segment, _, chunk)) =
             <(usize, Vec<u8>, usize, usize)>::unpack(input, tuple_depth)?;
-        let v = DbDataChunkKey {
+        let v = CompressedDbDataChunkKey {
             db_name_segment: Arc::new(db_name_segment),
             chunk,
         };
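Taken together, the new keys point at a layout where the compressed payload is split across (DBS, db_name_segment, COMPRESSED_DATA, chunk) entries, with the chunkless CompressedDbDataKey tuple available as a range prefix for reading or clearing every chunk at once. Chunking is still needed after compression because FoundationDB caps individual values at 100 kB. A hedged sketch of such a write path; the chunk size and the helper itself are assumptions, not code from this commit:

use std::sync::Arc;
use foundationdb::tuple::pack;

// Assumed chunk size, comfortably under FDB's 100 kB value limit.
const CHUNK_SIZE: usize = 10_000;

// Hypothetical helper: compress a db image and lay it out as (key, value)
// pairs under the new COMPRESSED_DATA subspace.
fn compressed_chunks(
    db_name_segment: Arc<Vec<u8>>,
    db_bytes: &[u8],
) -> Vec<(Vec<u8>, Vec<u8>)> {
    let compressed = lz4_flex::compress_prepend_size(db_bytes);
    compressed
        .chunks(CHUNK_SIZE)
        .enumerate()
        .map(|(chunk, data)| {
            let key = CompressedDbDataChunkKey {
                db_name_segment: db_name_segment.clone(),
                chunk,
            };
            // TuplePack (implemented above) yields the raw FDB key bytes.
            (pack(&key), data.to_vec())
        })
        .collect()
}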
