Skip to content

Commit

Permalink
fix: move actor state to fdb
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato authored and NathanFlurry committed Feb 20, 2025
1 parent 33697b0 commit 1b5874e
Show file tree
Hide file tree
Showing 36 changed files with 673 additions and 362 deletions.
5 changes: 1 addition & 4 deletions Cargo.toml

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion packages/common/chirp-workflow/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ rivet-runtime.workspace = true
rivet-util.workspace = true
serde = { version = "1.0.198", features = ["derive"] }
serde_json = "1.0.116"
sqlite-util.workspace = true
strum = { version = "0.26", features = ["derive"] }
thiserror = "1.0.59"
tokio = { version = "1.40.0", features = ["full"] }
Expand Down
2 changes: 1 addition & 1 deletion packages/common/chirp-workflow/core/src/ctx/activity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ impl ActivityCtx {
}

pub async fn sqlite_for_workflow(&self, workflow_id: Uuid) -> GlobalResult<SqlitePool> {
common::sqlite_for_workflow(&self.db, &self.conn, workflow_id, false).await
common::sqlite_for_workflow(&self.db, &self.conn, workflow_id, true).await
}

// Backwards compatibility
Expand Down
52 changes: 33 additions & 19 deletions packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,24 +380,23 @@ impl DatabaseDebug for DatabaseFdbSqliteNats {
.unpack::<JustUuid>(entry.key())
.map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?;

if current_workflow_id
.map(|x| workflow_id != x)
.unwrap_or_default()
{
// Save if matches query
if matching_tags == tags.len() && name_matches && state_matches {
workflow_ids.push(workflow_id);

if workflow_ids.len() >= 100 {
current_workflow_id = None;
break;
if let Some(curr) = current_workflow_id {
if workflow_id != curr {
// Save if matches query
if matching_tags == tags.len() && name_matches && state_matches {
workflow_ids.push(curr);

if workflow_ids.len() >= 100 {
current_workflow_id = None;
break;
}
}

// Reset state
matching_tags = 0;
name_matches = name.is_none();
state_matches = state.is_none() || state == Some(WorkflowState::Dead);
}

// Reset state
matching_tags = 0;
name_matches = name.is_none();
state_matches = state.is_none() || state == Some(WorkflowState::Dead);
}

current_workflow_id = Some(workflow_id);
Expand Down Expand Up @@ -499,13 +498,24 @@ impl DatabaseDebug for DatabaseFdbSqliteNats {
.serialize(())
.map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?,
);

let has_wake_condition_key = keys::workflow::HasWakeConditionKey::new(workflow_id);
tx.set(
&self.subspace.pack(&has_wake_condition_key),
&has_wake_condition_key
.serialize(())
.map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?,
);
}

Ok(())
}
})
.await
.map_err(Into::into)
.await?;

self.wake_worker();

Ok(())
}

async fn get_workflow_history(
Expand Down Expand Up @@ -721,7 +731,11 @@ impl DatabaseDebug for DatabaseFdbSqliteNats {
sql_fetch_all!(
[SqlStub {}, ActivityErrorRow, pool]
"
SELECT json(location) AS location, error, COUNT(error), MAX(ts) AS latest_ts
SELECT
json(location) AS location,
error,
COUNT(error) AS count,
MAX(ts) AS latest_ts
FROM workflow_activity_errors
GROUP BY location, error
ORDER BY latest_ts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::collections::HashMap;
use include_dir::{include_dir, Dir, File};
use indoc::indoc;
use rivet_pools::prelude::*;
use sqlite_util::SqlitePoolExt;
use sqlx::Acquire;
use uuid::Uuid;

Expand Down
20 changes: 11 additions & 9 deletions packages/common/hub-embed/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,17 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("stderr:\n{}", String::from_utf8_lossy(&output.stderr));
assert!(output.status.success(), "yarn install failed");

// println!("Running yarn build");
// let output = Command::new("yarn")
// .current_dir(&hub_path)
// .args(["dlx", "turbo", "run", "build:embedded"])
// .env("VITE_APP_API_URL", "__APP_API_URL__")
// .output()?;
// println!("stdout:\n{}", String::from_utf8_lossy(&output.stdout));
// println!("stderr:\n{}", String::from_utf8_lossy(&output.stderr));
// assert!(output.status.success(), "hub build failed");
if std::env::var("RIVET_SKIP_BUILD_HUB").is_err() {
println!("Running yarn build");
let output = Command::new("yarn")
.current_dir(&hub_path)
.args(["dlx", "turbo", "run", "build:embedded"])
.env("VITE_APP_API_URL", "__APP_API_URL__")
.output()?;
println!("stdout:\n{}", String::from_utf8_lossy(&output.stdout));
println!("stderr:\n{}", String::from_utf8_lossy(&output.stderr));
assert!(output.status.success(), "hub build failed");
}

// Copy dist directory to out_dir
let dist_path = hub_path.join("dist");
Expand Down
1 change: 0 additions & 1 deletion packages/common/pools/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ rand = "0.8"
rivet-config.workspace = true
rivet-metrics.workspace = true
service-discovery.workspace = true
sqlite-util.workspace = true
tempfile = "3.13.0"
thiserror = "1.0"
tokio = { version = "1.40", features = ["tracing"] }
Expand Down
4 changes: 2 additions & 2 deletions packages/common/pools/src/db/sqlite/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use dirs;
use fdb_util::prelude::*;
use fdb_util::{SERIALIZABLE, prelude::*};
use foundationdb::{self as fdb, options::StreamingMode, tuple::Subspace, FdbBindingError};
use uuid::Uuid;

Expand Down Expand Up @@ -268,7 +268,7 @@ impl SqlitePoolManager {
mode: StreamingMode::WantAll,
..(&db_data_subspace).into()
},
false,
SERIALIZABLE,
);

// Aggregate data
Expand Down
3 changes: 2 additions & 1 deletion packages/common/server-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ clap = { version = "4.3", features = ["derive"] }
colored_json = "5.0.0"
futures-util = "0.3"
global-error.workspace = true
hex = "0.4"
hex.workspace = true
include_dir = "0.7.4"
indoc = "2.0.5"
reqwest = "0.12.9"
foundationdb.workspace = true
rivet-api.workspace = true
rivet-config.workspace = true
rivet-logs.workspace = true
rivet-migrate.workspace = true
rivet-pools.workspace = true
rivet-runtime.workspace = true
Expand Down
33 changes: 24 additions & 9 deletions packages/common/server-cli/src/commands/fdb/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,7 @@ use crate::util::{
format::indent_string,
};

// #[derive(Parser)]
// #[command(name = "")]
// struct Command {
// #[command(subcommand)]
// command: Commands,
// }

// TODO: Tab completion
#[derive(Parser)]
#[command(name = "")]
pub enum SubCommand {
Expand Down Expand Up @@ -72,6 +66,9 @@ pub enum SubCommand {
/// In what manner to clear the current key. Range clears the entire subspace.
#[arg(value_enum)]
clear_type: Option<ClearType>,
/// Disable confirmation prompt.
#[arg(short = 'y', long, default_value_t = false)]
yes: bool,
},

#[command(name = "exit")]
Expand All @@ -95,7 +92,10 @@ impl SubCommand {
Err(err) => println!("{err:#}"),
},
// TODO: chunks
SubCommand::Get { type_hint, chunks: _chunks } => {
SubCommand::Get {
type_hint,
chunks: _chunks,
} => {
let fut = pool.run(|tx, _mc| {
let current_tuple = current_tuple.clone();
async move {
Expand Down Expand Up @@ -206,6 +206,7 @@ impl SubCommand {
);

last_key = curr.clone();
current_hidden_subspace = None;
hidden_count = 0;
}

Expand Down Expand Up @@ -323,7 +324,21 @@ impl SubCommand {
Err(_) => println!("txn timed out"),
}
}
SubCommand::Clear { clear_type } => {
SubCommand::Clear { clear_type, yes } => {
if !yes {
let term = rivet_term::terminal();
let response = rivet_term::prompt::PromptBuilder::default()
.message("Are you sure?")
.build()
.expect("failed to build prompt")
.bool(&term)
.await
.expect("failed to show prompt");
if !response {
return CommandResult::Error;
}
}

let fut = pool.run(|tx, _mc| {
let current_tuple = current_tuple.clone();
async move {
Expand Down
12 changes: 9 additions & 3 deletions packages/common/server-cli/src/commands/fdb/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::result::Result::{Err, Ok};
use std::{
path::Path,
result::Result::{Err, Ok},
};

use anyhow::*;
use clap::Parser;
Expand Down Expand Up @@ -29,7 +32,10 @@ impl Opts {
run_commands(&pool, &mut current_tuple, query).await;
} else {
let mut rl = DefaultEditor::new()?;
rl.load_history("/tmp/history.txt")?;
let history_location = Path::new("/tmp/rivet-server-fdb-history");
if history_location.exists() {
rl.load_history(&history_location)?;
}

println!("FDB Viewer\n");

Expand All @@ -53,7 +59,7 @@ impl Opts {
}
}

rl.save_history("/tmp/history.txt")?;
rl.save_history(&history_location)?;
}

Ok(())
Expand Down
20 changes: 20 additions & 0 deletions packages/common/server-cli/src/commands/start.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use std::{path::Path, time::Duration};

use anyhow::*;
use clap::Parser;
use rivet_service_manager::{CronConfig, RunConfig};

// 7 day logs retention
const LOGS_RETENTION: Duration = Duration::from_secs(7 * 24 * 60 * 60);

#[derive(Parser)]
pub struct Opts {
#[arg(long)]
Expand Down Expand Up @@ -42,6 +47,21 @@ impl Opts {
config: rivet_config::Config,
run_config: &RunConfig,
) -> Result<()> {
// Redirect logs if enabled on the edge
if config
.server()
.ok()
.and_then(|x| x.rivet.edge.as_ref())
.and_then(|x| x.redirect_logs)
.unwrap_or_default()
{
let logs_path = Path::new("/var/log/rivet-edge-server");
std::fs::create_dir_all(logs_path)?;
rivet_logs::Logs::new(logs_path.to_path_buf(), LOGS_RETENTION)
.start()
.await?;
}

// Provision services before starting server
if !self.skip_provision {
s3_util::provision(config.clone(), &run_config.s3_buckets).await?;
Expand Down
51 changes: 45 additions & 6 deletions packages/common/server-cli/src/util/fdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ pub enum SimpleTupleValue {

impl SimpleTupleValue {
fn parse(value: &str) -> Self {
if let Ok(v) = value.parse::<u64>() {
SimpleTupleValue::U64(v)
} else if let Ok(v) = value.parse::<i64>() {
if let Ok(v) = value.parse::<i64>() {
SimpleTupleValue::I64(v)
} else if let Ok(v) = value.parse::<u64>() {
SimpleTupleValue::U64(v)
} else if let Ok(v) = value.parse::<f64>() {
SimpleTupleValue::F64(v)
} else if let Ok(v) = Uuid::from_str(value) {
Expand All @@ -54,7 +54,24 @@ impl fmt::Display for SimpleTupleValue {
write!(f, "{}", style(v).green())
}
}
SimpleTupleValue::Bytes(v) => write!(f, "{:?}", style(v).italic()),
SimpleTupleValue::Bytes(v) => {
let hex_string = if v.len() > 512 { &v[..512] } else { v }
.iter()
.map(|byte| format!("{:02x}", byte))
.collect::<String>();
write!(f, "{}", style(hex_string).italic())?;

if v.len() > 512 {
write!(
f,
"{} {}",
style("...").italic(),
style(format!("({} bytes)", v.len())).dim()
)?;
}

Ok(())
}
}
}
}
Expand Down Expand Up @@ -218,7 +235,7 @@ impl SimpleValue {
Some("str") => SimpleValue::String(value.to_string()),
Some("bytes") | Some("b") => {
let bytes = hex::decode(value.as_bytes())
.with_context(|| format!("Could not parse `{value:?}` as hex encoded bytes"))?;
.with_context(|| format!("Could not parse `{value}` as hex encoded bytes"))?;
SimpleValue::Bytes(bytes)
}
Some(type_hint) => bail!("unknown type: `{type_hint}`"),
Expand Down Expand Up @@ -258,7 +275,24 @@ impl fmt::Display for SimpleValue {
}
}
SimpleValue::String(v) => write!(f, "{}", style(v).green()),
SimpleValue::Bytes(v) => write!(f, "{:?}", style(v).italic()),
SimpleValue::Bytes(v) => {
let hex_string = if v.len() > 512 { &v[..512] } else { v }
.iter()
.map(|byte| format!("{:02x}", byte))
.collect::<String>();
write!(f, "{}", style(hex_string).italic())?;

if v.len() > 512 {
write!(
f,
"{} {}",
style("...").italic(),
style(format!("({} bytes)", v.len())).dim()
)?;
}

Ok(())
}
}
}
}
Expand Down Expand Up @@ -299,6 +333,11 @@ impl SimpleTupleSegment {
Some("uuid") => Uuid::from_str(value)
.map(SimpleTupleValue::Uuid)
.with_context(|| format!("Could not parse `{value}` as UUID"))?,
Some("bytes") | Some("b") => {
let bytes = hex::decode(value.as_bytes())
.with_context(|| format!("Could not parse `{value}` as hex encoded bytes"))?;
SimpleTupleValue::Bytes(bytes)
}
Some("str") => SimpleTupleValue::String(value.to_string()),
Some(prefix) => bail!("unknown type: `{prefix}`"),
_ => SimpleTupleValue::parse(value),
Expand Down
Loading

0 comments on commit 1b5874e

Please sign in to comment.