diff --git a/engine/src/invocation/mod.rs b/engine/src/invocation/mod.rs index 4adb344f4..429831b2e 100644 --- a/engine/src/invocation/mod.rs +++ b/engine/src/invocation/mod.rs @@ -155,6 +155,9 @@ impl InvocationHandler { acc.invocations_total.fetch_add(1, std::sync::atomic::Ordering::Relaxed); acc.invocations_success.fetch_add(1, std::sync::atomic::Ordering::Relaxed); acc.increment_function(&function_id); + if !crate::workers::telemetry::is_iii_standard_function_id(&function_id) { + let _ = acc.first_user_success_fn.set(function_id.clone()); + } let _ = invocation.sender.send(Ok(result)); } @@ -190,6 +193,9 @@ impl InvocationHandler { acc.invocations_total.fetch_add(1, std::sync::atomic::Ordering::Relaxed); acc.invocations_error.fetch_add(1, std::sync::atomic::Ordering::Relaxed); acc.increment_function(&function_id); + if !crate::workers::telemetry::is_iii_standard_function_id(&function_id) { + let _ = acc.first_user_failure_fn.set(function_id.clone()); + } let _ = invocation.sender.send(Err(error)); } diff --git a/engine/src/workers/observability/metrics.rs b/engine/src/workers/observability/metrics.rs index 5bf308484..789102f3f 100644 --- a/engine/src/workers/observability/metrics.rs +++ b/engine/src/workers/observability/metrics.rs @@ -354,6 +354,10 @@ pub struct MetricsAccumulator { pub invocations_by_function: dashmap::DashMap, pub workers_spawns: std::sync::atomic::AtomicU64, pub workers_deaths: std::sync::atomic::AtomicU64, + /// Set once on the first successful invocation of a user-defined function. + pub first_user_success_fn: std::sync::OnceLock, + /// Set once on the first failed invocation of a user-defined function. + pub first_user_failure_fn: std::sync::OnceLock, } impl Default for MetricsAccumulator { @@ -366,6 +370,8 @@ impl Default for MetricsAccumulator { invocations_by_function: dashmap::DashMap::new(), workers_spawns: std::sync::atomic::AtomicU64::new(0), workers_deaths: std::sync::atomic::AtomicU64::new(0), + first_user_success_fn: std::sync::OnceLock::new(), + first_user_failure_fn: std::sync::OnceLock::new(), } } } @@ -3449,6 +3455,48 @@ mod tests { assert_eq!(counts.get("func_y"), Some(&1)); } + #[test] + fn test_first_user_success_fn_set_once() { + let acc = MetricsAccumulator::default(); + assert!(acc.first_user_success_fn.get().is_none()); + + let _ = acc.first_user_success_fn.set("math::add".to_string()); + assert_eq!(acc.first_user_success_fn.get(), Some(&"math::add".to_string())); + + let _ = acc.first_user_success_fn.set("math::multiply".to_string()); + assert_eq!( + acc.first_user_success_fn.get(), + Some(&"math::add".to_string()), + "OnceLock should retain the first value" + ); + } + + #[test] + fn test_first_user_failure_fn_set_once() { + let acc = MetricsAccumulator::default(); + assert!(acc.first_user_failure_fn.get().is_none()); + + let _ = acc.first_user_failure_fn.set("math::add".to_string()); + assert_eq!(acc.first_user_failure_fn.get(), Some(&"math::add".to_string())); + + let _ = acc.first_user_failure_fn.set("other::fn".to_string()); + assert_eq!( + acc.first_user_failure_fn.get(), + Some(&"math::add".to_string()), + "OnceLock should retain the first value" + ); + } + + #[test] + fn test_first_user_fns_independent() { + let acc = MetricsAccumulator::default(); + let _ = acc.first_user_success_fn.set("math::add".to_string()); + let _ = acc.first_user_failure_fn.set("math::divide".to_string()); + + assert_eq!(acc.first_user_success_fn.get(), Some(&"math::add".to_string())); + assert_eq!(acc.first_user_failure_fn.get(), Some(&"math::divide".to_string())); + } + // ========================================================================= // TimeIndexedMetricStorage len / is_empty // ========================================================================= diff --git a/engine/src/workers/telemetry/mod.rs b/engine/src/workers/telemetry/mod.rs index a3ddb2e64..96a822151 100644 --- a/engine/src/workers/telemetry/mod.rs +++ b/engine/src/workers/telemetry/mod.rs @@ -58,6 +58,7 @@ impl Default for TelemetryConfig { struct ProjectContext { project_id: Option, project_name: Option, + source: Option, } fn find_project_root() -> Option { @@ -79,12 +80,19 @@ fn find_project_root() -> Option { None } -fn read_project_ini(root: &std::path::Path) -> Option<(Option, Option)> { +struct ProjectIniData { + project_id: Option, + project_name: Option, + source: Option, +} + +fn read_project_ini(root: &std::path::Path) -> Option { let ini_path = root.join(".iii").join("project.ini"); let contents = std::fs::read_to_string(&ini_path).ok()?; let mut project_id: Option = None; let mut project_name: Option = None; + let mut source: Option = None; for line in contents.lines() { let line = line.trim(); @@ -98,11 +106,20 @@ fn read_project_ini(root: &std::path::Path) -> Option<(Option, Option bool { +pub fn is_iii_standard_function_id(id: &str) -> bool { id.starts_with("engine::") || id.starts_with("state::") || id.starts_with("stream::") - || id == "iii::durable::publish" - || id == "publish" - || id.starts_with("bridge.") || id.starts_with("iii::") + || id.starts_with("bridge.") + || id.starts_with("motia::") + || id == "publish" + || id == "motia_step_get" + || id.starts_with("steps::") } fn check_disabled(config: &TelemetryConfig) -> Option { @@ -190,8 +212,6 @@ struct FunctionTriggerData { functions: Vec, trigger_count: usize, trigger_types: Vec, - functions_iii_builtin_count: usize, - functions_non_iii_builtin_count: usize, } struct EngineSnapshot { @@ -217,6 +237,9 @@ fn build_base_properties(snap: &EngineSnapshot) -> serde_json::Map serde_json::Map serde_json::Map FunctionTriggerData { - let mut functions_iii_builtin_count = 0usize; - let mut functions_non_iii_builtin_count = 0usize; - for entry in engine.functions.iter() { - let id = entry.key(); - if is_iii_builtin_function_id(id) { - functions_iii_builtin_count += 1; - } else { - functions_non_iii_builtin_count += 1; - } - } - let functions: Vec = engine .functions .iter() .map(|entry| entry.key().clone()) - .filter(|id| !id.starts_with("engine::")) + .filter(|id| !is_iii_standard_function_id(id)) .collect(); let function_count = functions.len(); @@ -411,8 +415,6 @@ fn collect_functions_and_triggers(engine: &Engine) -> FunctionTriggerData { functions, trigger_count, trigger_types: trigger_types_used.into_iter().collect(), - functions_iii_builtin_count, - functions_non_iii_builtin_count, } } @@ -569,6 +571,27 @@ impl TelemetryContext { } } +const TEMPLATE_POLL_INTERVAL_SECS: u64 = 3; +const TEMPLATE_POLL_TIMEOUT_SECS: u64 = 60 * 60; + +fn build_template_lifecycle_properties( + event_type: &str, + function_id: &str, + source: &str, + project: &ProjectContext, +) -> (String, serde_json::Value) { + let mut props = serde_json::Map::new(); + props.insert("function_id".into(), serde_json::json!(function_id)); + props.insert("source".into(), serde_json::json!(source)); + if let Some(pid) = &project.project_id { + props.insert("project_id".into(), serde_json::json!(pid)); + } + if let Some(pname) = &project.project_name { + props.insert("project_name".into(), serde_json::json!(pname)); + } + (event_type.to_string(), serde_json::Value::Object(props)) +} + pub struct TelemetryWorker { engine: Arc, config: TelemetryConfig, @@ -793,6 +816,70 @@ impl Worker for TelemetryWorker { } }); + // Template lifecycle polling: fires template_success / template_failure + // once each when the first user function succeeds or fails. + let project_ctx = resolve_project_context(None); + if let Some(source) = project_ctx.source { + let client_for_template = Arc::clone(self.active_client()); + let ctx_for_template = self.ctx.clone(); + let project_for_template = resolve_project_context(None); + tokio::spawn(async move { + let mut success_sent = false; + let mut failure_sent = false; + let timeout = std::time::Duration::from_secs(TEMPLATE_POLL_TIMEOUT_SECS); + + loop { + if start_time.elapsed() > timeout || (success_sent && failure_sent) { + break; + } + + tokio::time::sleep(std::time::Duration::from_secs( + TEMPLATE_POLL_INTERVAL_SECS, + )) + .await; + + let acc = + crate::workers::observability::metrics::get_metrics_accumulator(); + + if !success_sent { + if let Some(fn_id) = acc.first_user_success_fn.get() { + let (event_type, props) = build_template_lifecycle_properties( + "template_success", + fn_id, + &source, + &project_for_template, + ); + let event = ctx_for_template.build_event( + &event_type, + props, + None, + ); + let _ = client_for_template.send_event(event).await; + success_sent = true; + } + } + + if !failure_sent { + if let Some(fn_id) = acc.first_user_failure_fn.get() { + let (event_type, props) = build_template_lifecycle_properties( + "template_failure", + fn_id, + &source, + &project_for_template, + ); + let event = ctx_for_template.build_event( + &event_type, + props, + None, + ); + let _ = client_for_template.send_event(event).await; + failure_sent = true; + } + } + } + }); + } + Ok(()) } @@ -1066,11 +1153,42 @@ mod tests { ) .unwrap(); - let result = read_project_ini(dir.path()); - assert!(result.is_some()); - let (project_id, project_name) = result.unwrap(); - assert_eq!(project_id, Some("abc-123".to_string())); - assert_eq!(project_name, Some("my-project".to_string())); + let data = read_project_ini(dir.path()).unwrap(); + assert_eq!(data.project_id, Some("abc-123".to_string())); + assert_eq!(data.project_name, Some("my-project".to_string())); + assert_eq!(data.source, None); + } + + #[test] + fn test_read_project_ini_parses_source() { + let dir = tempfile::tempdir().unwrap(); + let iii_dir = dir.path().join(".iii"); + std::fs::create_dir_all(&iii_dir).unwrap(); + std::fs::write( + iii_dir.join("project.ini"), + "project_id=abc-123\nproject_name=my-project\nsource=quickstart\n", + ) + .unwrap(); + + let data = read_project_ini(dir.path()).unwrap(); + assert_eq!(data.project_id, Some("abc-123".to_string())); + assert_eq!(data.project_name, Some("my-project".to_string())); + assert_eq!(data.source, Some("quickstart".to_string())); + } + + #[test] + fn test_read_project_ini_source_none_when_missing() { + let dir = tempfile::tempdir().unwrap(); + let iii_dir = dir.path().join(".iii"); + std::fs::create_dir_all(&iii_dir).unwrap(); + std::fs::write( + iii_dir.join("project.ini"), + "project_id=abc-123\nproject_name=my-project\n", + ) + .unwrap(); + + let data = read_project_ini(dir.path()).unwrap(); + assert_eq!(data.source, None); } #[test] @@ -1560,22 +1678,39 @@ mod tests { assert_eq!(result.trigger_count, 0); assert!(result.functions.is_empty()); assert!(result.trigger_types.is_empty()); - assert_eq!(result.functions_iii_builtin_count, 0); - assert_eq!(result.functions_non_iii_builtin_count, 0); } #[test] - fn test_collect_functions_and_triggers_filters_engine_prefix() { + fn test_collect_functions_and_triggers_filters_engine_and_iii_prefixes() { let engine = make_test_engine(); let handler: Arc = Arc::new(|_inv_id, _input, _session| { Box::pin(async { crate::function::FunctionResult::NoResult }) }); + + for id in &[ + "engine::internal_fn", + "iii::durable::publish", + "iii::queue::redrive", + ] { + engine.functions.register_function( + id.to_string(), + crate::function::Function { + handler: handler.clone(), + _function_id: id.to_string(), + _description: None, + request_format: None, + response_format: None, + metadata: None, + }, + ); + } + engine.functions.register_function( - "engine::internal_fn".to_string(), + "user::my_function".to_string(), crate::function::Function { handler: handler.clone(), - _function_id: "engine::internal_fn".to_string(), + _function_id: "user::my_function".to_string(), _description: None, request_format: None, response_format: None, @@ -1584,10 +1719,10 @@ mod tests { ); engine.functions.register_function( - "user::my_function".to_string(), + "math::add".to_string(), crate::function::Function { handler, - _function_id: "user::my_function".to_string(), + _function_id: "math::add".to_string(), _description: None, request_format: None, response_format: None, @@ -1596,11 +1731,10 @@ mod tests { ); let result = collect_functions_and_triggers(&engine); - assert_eq!(result.function_count, 1); - assert_eq!(result.functions.len(), 1); - assert_eq!(result.functions[0], "user::my_function"); - assert_eq!(result.functions_iii_builtin_count, 1); - assert_eq!(result.functions_non_iii_builtin_count, 1); + assert_eq!(result.function_count, 2); + let mut fns = result.functions.clone(); + fns.sort(); + assert_eq!(fns, vec!["math::add", "user::my_function"]); } #[test] @@ -1774,15 +1908,23 @@ mod tests { } #[test] - fn test_is_iii_builtin_function_id() { - assert!(is_iii_builtin_function_id("engine::x")); - assert!(is_iii_builtin_function_id("state::get")); - assert!(is_iii_builtin_function_id("stream::list")); - assert!(is_iii_builtin_function_id("iii::durable::publish")); - assert!(is_iii_builtin_function_id("publish")); - assert!(is_iii_builtin_function_id("bridge.invoke")); - assert!(is_iii_builtin_function_id("iii::queue::redrive")); - assert!(!is_iii_builtin_function_id("orders::process")); + fn test_is_iii_standard_function_id() { + assert!(is_iii_standard_function_id("engine::x")); + assert!(is_iii_standard_function_id("state::get")); + assert!(is_iii_standard_function_id("stream::list")); + assert!(is_iii_standard_function_id("iii::durable::publish")); + assert!(is_iii_standard_function_id("publish")); + assert!(is_iii_standard_function_id("bridge.invoke")); + assert!(is_iii_standard_function_id("iii::queue::redrive")); + assert!(is_iii_standard_function_id("motia::stream::authenticate")); + assert!(is_iii_standard_function_id("motia::stream::join")); + assert!(is_iii_standard_function_id("motia::stream::leave")); + assert!(is_iii_standard_function_id("motia_step_get")); + assert!(is_iii_standard_function_id("steps::my-step::trigger::http(GET /api)")); + assert!(is_iii_standard_function_id("steps::my-step::trigger::http(GET /api)::middleware::0")); + assert!(is_iii_standard_function_id("steps::my-step::trigger::http(GET /api)::conditions::1")); + assert!(!is_iii_standard_function_id("orders::process")); + assert!(!is_iii_standard_function_id("math::add")); } // ========================================================================= @@ -2133,4 +2275,154 @@ mod tests { reset_telemetry_globals(); } + + // ========================================================================= + // build_template_lifecycle_properties + // ========================================================================= + + #[test] + fn test_template_success_properties() { + let project = ProjectContext { + project_id: Some("proj-123".to_string()), + project_name: Some("my-project".to_string()), + source: Some("quickstart".to_string()), + }; + + let (event_type, props) = build_template_lifecycle_properties( + "template_success", + "math::add", + "quickstart", + &project, + ); + + assert_eq!(event_type, "template_success"); + assert_eq!(props["function_id"], "math::add"); + assert_eq!(props["source"], "quickstart"); + assert_eq!(props["project_id"], "proj-123"); + assert_eq!(props["project_name"], "my-project"); + } + + #[test] + fn test_template_failure_properties() { + let project = ProjectContext { + project_id: Some("proj-456".to_string()), + project_name: Some("other".to_string()), + source: Some("quickstart".to_string()), + }; + + let (event_type, props) = build_template_lifecycle_properties( + "template_failure", + "math::divide", + "quickstart", + &project, + ); + + assert_eq!(event_type, "template_failure"); + assert_eq!(props["function_id"], "math::divide"); + assert_eq!(props["source"], "quickstart"); + assert_eq!(props["project_id"], "proj-456"); + } + + #[test] + fn test_template_properties_with_custom_source() { + let project = ProjectContext { + project_id: Some("proj-789".to_string()), + project_name: None, + source: Some("multi-worker-orchestration".to_string()), + }; + + let (_, props) = build_template_lifecycle_properties( + "template_success", + "orders::process", + "multi-worker-orchestration", + &project, + ); + + assert_eq!(props["source"], "multi-worker-orchestration"); + assert_eq!(props["function_id"], "orders::process"); + assert!( + props.get("project_name").is_none(), + "None project_name should be omitted" + ); + } + + #[test] + fn test_template_properties_no_project_id() { + let project = ProjectContext { + project_id: None, + project_name: None, + source: Some("quickstart".to_string()), + }; + + let (_, props) = build_template_lifecycle_properties( + "template_success", + "math::add", + "quickstart", + &project, + ); + + assert!(props.get("project_id").is_none()); + assert!(props.get("project_name").is_none()); + assert_eq!(props["function_id"], "math::add"); + assert_eq!(props["source"], "quickstart"); + } + + #[test] + fn test_template_success_event_construction() { + let ctx = TelemetryContext { + device_id: "test-device".to_string(), + env_info: make_env_info(), + }; + + let project = ProjectContext { + project_id: Some("proj-1".to_string()), + project_name: Some("test-proj".to_string()), + source: Some("quickstart".to_string()), + }; + + let (event_type, props) = build_template_lifecycle_properties( + "template_success", + "math::add", + "quickstart", + &project, + ); + + let event = ctx.build_event(&event_type, props, None); + assert_eq!(event.event_type, "template_success"); + assert_eq!(event.device_id, "test-device"); + assert_eq!(event.platform, "iii-engine"); + assert_eq!(event.event_properties["function_id"], "math::add"); + assert_eq!(event.event_properties["source"], "quickstart"); + } + + #[test] + fn test_template_failure_event_construction() { + let ctx = TelemetryContext { + device_id: "test-device".to_string(), + env_info: make_env_info(), + }; + + let project = ProjectContext { + project_id: Some("proj-1".to_string()), + project_name: Some("test-proj".to_string()), + source: Some("quickstart".to_string()), + }; + + let (event_type, props) = build_template_lifecycle_properties( + "template_failure", + "math::divide", + "quickstart", + &project, + ); + + let event = ctx.build_event(&event_type, props, None); + assert_eq!(event.event_type, "template_failure"); + assert_eq!(event.event_properties["function_id"], "math::divide"); + } + + #[test] + fn test_template_constants() { + assert_eq!(TEMPLATE_POLL_INTERVAL_SECS, 3); + assert_eq!(TEMPLATE_POLL_TIMEOUT_SECS, 60 * 60); + } }