diff --git a/ISSUES.md b/ISSUES.md new file mode 100644 index 0000000..bf9ee47 --- /dev/null +++ b/ISSUES.md @@ -0,0 +1,49 @@ +# Issues + +## Issue 1: Metric Registry — register, deduplicate, and configure feeder + +**Problem**: There's no way to register a metric expression and have the system remember it. +Developers must manually construct WAVE strings, invoke the TimelineDriver by hand, and +manually configure the feeder with leaf worker names. + +**What to build**: +- A `MetricRegistry` (in-memory for now) that stores registered metrics: name, DSL text, + compiled `TimelineOpGraph`, `AggregationConfig`, and the **leaf routing table** (which + leaf node indices exist and what event columns each leaf needs). +- Deduplication: if a developer registers a DSL expression identical to an existing metric, + return the existing metric instead of creating a new one. +- On registration, derive the leaf routing table by walking the compiled graph. This tells + the feeder which `event-processor` agents to send each event to per session. +- REST API in the dashboard: + - `POST /api/metrics` — register a new metric (accepts DSL text + aggregation config), + compiles it, derives the leaf routing table, and configures the feeder. + - `GET /api/metrics` — list all registered metrics with their agent templates. +- The DSL parser (`timeline-dsl`) already exists — use it to compile DSL text to `TimelineOpGraph`. +- The `GET /api/metrics` endpoint must expose the leaf routing table per metric, so that + feeders can poll it to discover new metrics and start routing events to the correct leaves. + +**Scope**: Dashboard service (registry + API). Feeder changes are a separate concern — +the timeline project does not manage feeders. See "Feeder deployment — open design question" +in the README for the trade-offs between reusing existing feeders vs spinning up new ones. +No changes to timeline-core or Golem agents. 
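The deduplication rule above can be sketched as an in-memory map keyed by the DSL text, where registering an identical expression returns the existing entry. All names here (`Metric`, `MetricRegistry`, `register`) are illustrative, not a prescribed API; the real registry would also hold the compiled `TimelineOpGraph`, the `AggregationConfig`, and the leaf routing table:

```rust
use std::collections::HashMap;

// Illustrative sketch only — the real registry would also store the
// compiled TimelineOpGraph, AggregationConfig, and leaf routing table.
#[derive(Clone, Debug, PartialEq)]
pub struct Metric {
    pub name: String,
    pub dsl: String,
}

#[derive(Default)]
pub struct MetricRegistry {
    by_dsl: HashMap<String, Metric>, // keyed by the exact DSL text
}

impl MetricRegistry {
    /// Registering an identical DSL expression returns the existing
    /// metric instead of creating a new one.
    pub fn register(&mut self, name: &str, dsl: &str) -> &Metric {
        self.by_dsl.entry(dsl.to_string()).or_insert_with(|| Metric {
            name: name.to_string(),
            dsl: dsl.to_string(),
        })
    }
}
```

A production version would likely normalize the DSL text (whitespace, formatting) before using it as a key, so that trivially reformatted expressions also deduplicate.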
+ +--- + +## Issue 2: Dashboard UI — DSL editor, deploy button, per-metric views + +**Problem**: The current dashboard has hardcoded CIRR presets. Developers can't write +their own metrics or inspect them dynamically. + +**What to build**: +- **Deploy Metric tab**: DSL text editor + optional aggregation config + "Deploy Metric" button. + On deploy, calls `POST /api/metrics`, shows the agent template (full list of Golem agent IDs + with `{session-id}` placeholder and business descriptions derived from the compiled graph). +- **Session lookup**: Developer enters a session ID they know (from their application logs or + event stream) and a query time → Node Inspector queries all agents for that session. + No enumeration of all sessions — that's impossible at scale. +- **Per-metric aggregation**: Aggregator worker names are derived from the metric's + `group_by_column` — show the correct preset buttons per metric. + +**Depends on**: Issue 1. + + diff --git a/README.md b/README.md index fb49c65..b7c9411 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,14 @@ Watch the talk from Afsal at [LambdaConf:2024:Estes-Park:Colorado](https://www.y ![img_1.png](img_1.png) +### A quick demo + +```sh +cargo make demo + +# go to the dashboard at http://localhost:3000 +``` + ## Timeline Query DSL A text-based DSL for expressing temporal analytics over event streams. Write a query, deploy it, @@ -92,14 +100,14 @@ This will: The CIRR expression compiles into **8 Golem agents per session**. The **Node Inspector** tab in the dashboard lets you query every sub-computation's result and its progress at any point in time: ``` -node-1 DurationWhere ← root: cumulative seconds where CIRR is true -node-2 And ← all 3 conditions combined -node-3 And ← has_existed ∧ ¬has_existed_within -node-4 TlHasExisted ← LEAF: has playerStateChange == "play" ever occurred? -node-5 Not ← negation of node-6 -node-6 TlHasExistedWithin ← LEAF: was there a seek within the last 5 time units? 
-node-7 EqualTo "buffer" ← is the current player state "buffer"? -node-8 LatestEventToState ← LEAF: what is the latest playerStateChange value? +node-1 duration-where ← root: cumulative seconds where CIRR is true +node-2 and ← all 3 conditions combined +node-3 and ← has-existed ∧ ¬has-existed-within +node-4 tl-has-existed ← LEAF: has playerStateChange == "play" ever occurred? +node-5 not ← negation of node-6 +node-6 tl-has-existed-within ← LEAF: was there a seek within the last 5 time units? +node-7 equal-to("buffer") ← is the current player state "buffer"? +node-8 latest-event-to-state ← LEAF: what is the latest playerStateChange value? ``` Once the demo is running and the dashboard is open at http://localhost:3000: @@ -785,6 +793,249 @@ and never hit the network. | **Cold-start latency** (resuming a suspended agent) | Golem's resume time is ~10ms. For latency-sensitive paths, keep agents warm with periodic heartbeats. | | **Event ordering across leaves** | The push-based model processes events per-leaf independently. Two leaves in the same session may receive events at different wall-clock times. The `And`/`Or` nodes use `time + 1` lookups to see the latest state, which handles this correctly for monotonically increasing timestamps. | +## Deployment Strategy: One Component, Many Metrics + +### The core idea + +The `timeline-core` WASM component is deployed **once** to the Golem cluster. It contains all four +agent types (EventProcessor, TimelineProcessor, TimelineDriver, Aggregator) — these are general-purpose +building blocks, not metric-specific. Every metric expression (CIRR, Time-to-First-Play, engagement +scores, etc.) uses the same deployed component. There is no per-metric deployment. 
+ +``` +┌────────────────────────────────────────────────────────────────┐ +│ Golem Cluster │ +│ │ +│ timeline-core (1 component, 1 deployment) │ +│ ┌──────────────────────────────────────────────────────────┐ │ +│ │ EventProcessor · TimelineProcessor · Aggregator · Driver│ │ +│ └──────────────────────────────────────────────────────────┘ │ +│ │ +│ Metric: CIRR Metric: Time-to-First-Play │ +│ ┌──────────────────────┐ ┌──────────────────────┐ │ +│ │ 8 agents per session │ │ 2 agents per session │ │ +│ │ sess-42, sess-99 ... │ │ sess-42, sess-99 ... │ │ +│ └──────────────────────┘ └──────────────────────┘ │ +└────────────────────────────────────────────────────────────────┘ +``` + +### Roles + +| Role | Can do | Cannot do | +|---|---|---| +| **Admin** | Deploy/update `timeline-core` to Golem. Manage the cluster, feeders, and Kafka infrastructure. | — | +| **Developer** | Write a metric expression in the DSL. Click "Deploy Metric" to register it. View sub-computation results, session lists, and aggregation data. | Deploy or modify the timeline-core component. | + +Admins deploy the platform. Developers use it. + +### What "Deploy Metric" does + +When a developer writes a metric and clicks deploy, the system does **not** deploy a new component. +Instead: + +1. **Registers the metric expression** — stores the DSL text and its compiled `TimelineOpGraph` + plus optional `AggregationConfig` (e.g., `group_by(cdn), count, sum, avg`). + +2. **Derives the leaf routing table** — the system walks the compiled graph to identify which + nodes are leaves and what each leaf needs from the event stream. For the CIRR metric: + ``` + node-4 tl-has-existed → needs events containing "playerStateChange" + node-6 tl-has-existed-within → needs events containing "playerStateChange" + node-8 latest-event-to-state → needs events containing "playerStateChange" + ``` + This leaf routing table tells the feeder: "for every event in a session, send it to these + leaf agents." + +3. 
**Feeder picks up the new metric** — the feeder (Kafka consumer) must learn about the new + metric's leaf routing table so it can start sending events to the right agents. For every + incoming event on a session: + - If the feeder hasn't seen this session+metric before, it first invokes + `timeline-driver("{session-id}")` → `initialize-timeline(graph, aggregation)` to spawn + and wire all agents for that session. + - It then sends the event to every leaf agent for that metric: + `event-processor("{session-id}-node-4").add-event(...)`, + `event-processor("{session-id}-node-6").add-event(...)`, + `event-processor("{session-id}-node-8").add-event(...)`. + - The push cascade takes care of the rest — each leaf pushes state changes upward + through the derived nodes to the root and aggregator. + +4. **Multiple metrics, same event stream** — the feeder routes each event to the leaf agents + of **every** registered metric. If CIRR has 3 leaves and Time-to-First-Play has 1 leaf, + each event triggers 4 leaf invocations (3 + 1). Adding a new metric adds more leaf + invocations per event but does not require redeploying the component. + +### Feeder deployment — open design question + +The timeline project has no knowledge of where feeders run. Feeders are external infrastructure — +standalone binaries (`kafka-consumer-feeder`) that consume from Kafka and invoke Golem agents. +The deployment workflow must somehow get the leaf routing table to the feeders. + +**What the feeder currently does:** + +The feeder takes a hardcoded list of leaf worker names (e.g., `sess-1-node-4, sess-1-node-6`) +and sends every consumed event to every listed leaf. It has no concept of metrics, graphs, or +templates — it's just a static event router. + +**The driver returns the leaf agent names:** + +`initialize_timeline` returns an `InitializeResult` containing the root worker name and +all leaf worker names. 
The feeder calls the driver on every event for a session and gets +back exactly which `event-processor` agents to send events to — no graph walking, no +tracking, no in-memory state. The driver is a durable Golem worker: the first call spawns +and wires all agents; subsequent calls are idempotent and return the same result. + +**The hard problem — how does a running feeder learn about a new metric?** + +| Approach | Trade-off | +|---|---| +| **Feeder polls a config endpoint** | Simple but introduces latency (if polling every 30s, the developer waits 30s). The feeder is a blackbox — no feedback on whether it picked up the metric. | +| **Push via a config Kafka topic** | The "Deploy Metric" action publishes the graph + leaf table to a dedicated Kafka topic (e.g., `timeline-metrics-config`). The feeder already knows how to consume Kafka — it consumes from both the events topic and the config topic. Instant pickup, and Kafka provides durability + replay. | +| **New feeder per metric** | Each deploy spins up a new feeder instance. Simple isolation but expensive — more processes, more Kafka consumers. | + +The **config Kafka topic** approach is likely the best fit: feeders already consume from Kafka, +adding a second topic is minimal work, delivery is instant, and Kafka's log provides an audit +trail of all registered metrics. The feeder consumes from `timeline-metrics-config` on startup +(replays all registered metrics) and watches for new entries. When a new metric arrives, the +feeder immediately starts routing events to its leaf agents — no polling delay, no blackbox. + +### End-to-end example: feeder processing events + +A concrete example of the CIRR metric with events arriving from Kafka: + +``` +Feeder startup: + 1. Consumes CIRR graph + aggregation config from timeline-metrics-config topic + 2. 
Ready to process events + +Event stream from Kafka (timeline-events topic): + event1: {session: "sess-42", time: 100, playerStateChange: "play", cdn: "akamai"} + event2: {session: "sess-42", time: 200, playerStateChange: "buffer", cdn: "akamai"} + event3: {session: "sess-99", time: 100, playerStateChange: "init", cdn: "fastly"} + +Processing event1 (sess-42): + 1. Calls: timeline-driver("sess-42").initialize-timeline(cirr_graph, agg_config) + → Driver spawns 8 agents, wires parent refs + → Returns InitializeResult: + root_worker: "sess-42-node-1" + leaf_workers: ["sess-42-node-4", "sess-42-node-6", "sess-42-node-8"] + 2. Sends event to each leaf from the result: + event-processor("sess-42-node-4").add-event(...) + event-processor("sess-42-node-6").add-event(...) + event-processor("sess-42-node-8").add-event(...) + +Processing event2 (sess-42 again): + 1. Calls: timeline-driver("sess-42").initialize-timeline(cirr_graph, agg_config) + → Driver is durable — agents already exist, idempotent, no re-wiring + → Returns same InitializeResult: + root_worker: "sess-42-node-1" + leaf_workers: ["sess-42-node-4", "sess-42-node-6", "sess-42-node-8"] + 2. Sends event to the 3 leaves + +Processing event3 (sess-99): + 1. Calls: timeline-driver("sess-99").initialize-timeline(...) + → New session — spawns 8 fresh agents + → Returns InitializeResult: + root_worker: "sess-99-node-1" + leaf_workers: ["sess-99-node-4", "sess-99-node-6", "sess-99-node-8"] + 2. Sends event to sess-99's leaves +``` + +**Key point**: the feeder calls `initialize_timeline` on every event. No in-memory state, +no tracking which sessions exist. The driver is a durable Golem worker — first call spawns +agents, subsequent calls are idempotent and return the same `InitializeResult`. The feeder +gets the leaf agent names directly from the return value. + +### Idempotency — `initialize_timeline` is safe to call repeatedly + +Golem workers are durable. 
Calling `initialize_timeline` on an already-initialized session +simply overwrites `operation`, `children`, and `parent` with the same values (same graph = +same configuration). The accumulated `StateDynamicsTimeLine` state is untouched — `initialize_leaf` +and `initialize_derived` don't clear it. So the feeder can safely call the driver on every +event without tracking which sessions have been initialized. + + + +### Session discovery + +There is no separate session registry. The feeder discovers session IDs from the event stream +(e.g., Kafka message keys). When an event arrives for a session the feeder hasn't seen before, +it initializes the agents for that session. Developers look up a specific session ID to inspect +its sub-computation results — there is no need to enumerate all sessions (which would be +impossible at scale). The developer either knows the session ID they want to debug, or +finds it from their own application logs / event stream. + +### Metric-level compute reuse + +If two developers register the **same metric expression** (identical DSL text and aggregation config), +they share the same computation. The system recognizes the duplicate and points both developers +at the same set of agents. No new agents are spawned — both developers see the same sub-computation +results, the same sessions, and the same aggregation data. + +This is metric-level reuse: identical expression = shared agents. Sub-expression reuse across +different metrics (e.g., CIRR and Time-to-First-Play sharing a `has_existed(play)` subtree) is +a [future design goal](#future-design-compute-reuse-across-workflows) documented below. + +### Agent naming + +The `TimelineDriver` names inner agents using pre-order depth-first numbering: +`{session-id}-node-{counter}`. Each node is a separate Golem agent with its own agent type. +The developer sees a **template** of all agents their metric will create per session, with +`{session-id}` as a placeholder. 
They enter a real session ID they already know (e.g., from their
+application logs or event stream) to inspect or query.
+
+For the CIRR metric, the full agent template is:
+
+| Golem agent ID | Worker name | Operation | Description |
+|---|---|---|---|
+| `timeline-driver("{session-id}")` | `{session-id}` | — | Orchestrator (runs once to spawn and wire all agents) |
+| `timeline-processor("{session-id}-node-1")` | `{session-id}-node-1` | duration-where | Root: cumulative CIRR rebuffering seconds |
+| `timeline-processor("{session-id}-node-2")` | `{session-id}-node-2` | and | All 3 conditions combined |
+| `timeline-processor("{session-id}-node-3")` | `{session-id}-node-3` | and | has-existed ∧ ¬has-existed-within |
+| `event-processor("{session-id}-node-4")` | `{session-id}-node-4` | tl-has-existed | Leaf: has play ever occurred? |
+| `timeline-processor("{session-id}-node-5")` | `{session-id}-node-5` | not | ¬has-existed-within(seek, 5) |
+| `event-processor("{session-id}-node-6")` | `{session-id}-node-6` | tl-has-existed-within | Leaf: was there a recent seek? |
+| `timeline-processor("{session-id}-node-7")` | `{session-id}-node-7` | equal-to("buffer") | Is current state "buffer"? |
+| `event-processor("{session-id}-node-8")` | `{session-id}-node-8` | latest-event-to-state | Leaf: latest playerStateChange |
+| `aggregator("aggregator-cdn-{value}")` | `aggregator-cdn-{value}` | aggregator | Shared across sessions per CDN value |
+
+For session `sess-42` on CDN `akamai`, the actual agents are:
+`timeline-driver("sess-42")`, `timeline-processor("sess-42-node-1")`, ...,
+`event-processor("sess-42-node-4")`, ..., `aggregator("aggregator-cdn-akamai")`.
+
+The developer doesn't need to construct these names manually.
The dashboard shows:
+- The **agent template** for their metric (the full table above with business descriptions)
+- **Sub-computation results** for any session ID the developer looks up (Node Inspector)
+- **Aggregation results** across sessions (Live Dashboard)
+
+### What the developer experience looks like
+
+1. Developer writes a metric in the DSL editor:
+   ```
+   duration_where(
+     has_existed(playerStateChange == "play")
+     && latest_event_to_state(playerStateChange) == "buffer"
+   ) | aggregate(group_by(cdn), count, sum, avg)
+   ```
+
+2. Clicks **Deploy Metric**. The system registers it, names it (e.g., `buffering-duration`),
+   and shows the agent template:
+   ```
+   node-1 duration-where              → How long has the condition been true?
+   node-2 and                         → Are both sub-conditions true?
+   node-3 tl-has-existed (leaf)       → Has the user ever pressed play?
+   node-4 equal-to("buffer")          → Is the player currently buffering?
+   node-5 latest-event-to-state (leaf)→ What is the current player state?
+   ```
+
+3. Events start flowing. The developer enters a session ID they want to debug
+   (e.g., `sess-42` from their application logs) and a query time.
+
+4. Node Inspector shows all sub-computation results for that session at that point in time.
+   Live Dashboard shows aggregation results across all sessions.
+
+5. Another developer registers the exact same DSL expression → they see the same metric,
+   same sessions, same results. No duplicate computation.
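The pre-order DFS numbering and leaf-table derivation described in this section can be sketched as follows. `Node`, `walk`, and `leaf_workers` are illustrative stand-ins for the real `TimelineOpGraph` walk inside the driver, shown only to make the numbering concrete (no agent spawning or wiring):

```rust
// Illustrative sketch: pre-order DFS assigns {session-id}-node-{n}
// names; leaf names are collected as the feeder's routing targets.
pub enum Node {
    Leaf,               // ingests raw events (event-processor)
    Derived(Vec<Node>), // recomputes from children (timeline-processor)
}

fn walk(node: &Node, session: &str, counter: &mut u32, leaves: &mut Vec<String>) {
    *counter += 1; // this node's number in pre-order
    match node {
        Node::Leaf => leaves.push(format!("{}-node-{}", session, counter)),
        Node::Derived(children) => {
            for child in children {
                walk(child, session, counter, leaves);
            }
        }
    }
}

/// Leaf worker names for one session, in pre-order.
pub fn leaf_workers(root: &Node, session: &str) -> Vec<String> {
    let (mut counter, mut leaves) = (0, Vec::new());
    walk(root, session, &mut counter, &mut leaves);
    leaves
}
```

For the CIRR tree shape, this walk yields `sess-42-node-4`, `sess-42-node-6`, and `sess-42-node-8` for session `sess-42`, matching the agent template above.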
+
 ## Future Design: Compute Reuse Across Workflows
 
 ### The opportunity
 
diff --git a/components-rust/timeline-core/src/agents/timeline_driver.rs b/components-rust/timeline-core/src/agents/timeline_driver.rs
index df68952..2eaf762 100644
--- a/components-rust/timeline-core/src/agents/timeline_driver.rs
+++ b/components-rust/timeline-core/src/agents/timeline_driver.rs
@@ -55,7 +55,7 @@ pub trait TimelineDriver {
         &self,
         timeline: TimelineOpGraph,
         aggregation: Option<AggregationConfig>,
-    ) -> Result<String, String>;
+    ) -> Result<InitializeResult, String>;
 }
 
 struct TimelineDriverImpl {
@@ -72,7 +72,7 @@ impl TimelineDriver for TimelineDriverImpl {
         &self,
         timeline: TimelineOpGraph,
         aggregation: Option<AggregationConfig>,
-    ) -> Result<String, String> {
+    ) -> Result<InitializeResult, String> {
         let recursive_op = timeline.to_recursive();
         let mut leaves = Vec::new();
         let (result, _) = self
@@ -98,10 +98,10 @@ impl TimelineDriver for TimelineDriverImpl {
             }
         }
 
-        Ok(format!(
-            "Timeline initialized. Result worker: {}",
-            result.worker_name
-        ))
+        Ok(InitializeResult {
+            root_worker: result.worker_name,
+            leaf_workers: leaves,
+        })
     }
 }
diff --git a/components-rust/timeline-core/src/types.rs b/components-rust/timeline-core/src/types.rs
index dd5dbbc..bc66217 100644
--- a/components-rust/timeline-core/src/types.rs
+++ b/components-rust/timeline-core/src/types.rs
@@ -153,6 +153,16 @@ pub struct AggregationConfig {
     pub aggregations: Vec,
 }
 
+/// Result of initializing a timeline via the TimelineDriver.
+///
+/// Contains the root worker name and all leaf worker names so the feeder
+/// knows exactly where to send events — no graph walking needed.
+#[derive(Clone, Debug, Schema)]
+pub struct InitializeResult {
+    pub root_worker: String,
+    pub leaf_workers: Vec<String>,
+}
+
 /// Result of querying an aggregator.
 #[derive(Clone, Debug, Schema)]
 pub struct AggregationResult {
diff --git a/services/dashboard/src/dashboard.html b/services/dashboard/src/dashboard.html
index c6f3a93..adf9151 100644
--- a/services/dashboard/src/dashboard.html
+++ b/services/dashboard/src/dashboard.html
@@ -302,14 +302,14 @@

Watch aggregation results

What the node tree means

The CIRR expression compiles into 8 Golem agents per session:

- node-1 DurationWhere ← root (how long is CIRR true?) -node-2 And ← all 3 conditions combined -node-3 And ← has_existed ∧ ¬has_existed_within -node-4 TlHasExisted ← LEAF: playerStateChange == "play" ever? -node-5 Not ← negation of node-6 -node-6 TlHasExistedWithin ← LEAF: seek within last 5 time units? -node-7 EqualTo "buffer" ← current state == "buffer"? -node-8 LatestEventToState ← LEAF: latest playerStateChange value + node-1 duration-where ← root (how long is CIRR true?) +node-2 and ← all 3 conditions combined +node-3 and ← has-existed ∧ ¬has-existed-within +node-4 tl-has-existed ← LEAF: playerStateChange == "play" ever? +node-5 not ← negation of node-6 +node-6 tl-has-existed-within ← LEAF: seek within last 5 time units? +node-7 equal-to("buffer") ← current state == "buffer"? +node-8 latest-event-to-state ← LEAF: latest playerStateChange value

Leaf nodes ingest raw events. Derived nodes recompute from children. @@ -347,10 +347,10 @@

Cleanup

-
duration_where( ← node-1 (root) - has_existed(playerStateChange == "play") ← node-4 (leaf) - && !has_existed_within(playerStateChange == "seek", 5) ← !node-6 (leaf) via node-5 - && latest_event_to_state(playerStateChange) == "buffer" ← node-8 (leaf) via node-7 +
duration_where( ← {session-id}-node-1 (root) + has_existed(playerStateChange == "play") ← {session-id}-node-4 (leaf) + && !has_existed_within(playerStateChange == "seek", 5) ← {session-id}-node-5 ¬{session-id}-node-6 (leaf) + && latest_event_to_state(playerStateChange) == "buffer" ← {session-id}-node-7{session-id}-node-8 (leaf) ) | aggregate(group_by(cdn), count, sum, avg)
@@ -417,21 +417,21 @@

Cleanup

// ========== Node Inspector ========== // The CIRR tree structure (pre-order DFS numbering) const CIRR_TREE = [ - { node: 1, op: 'DurationWhere', type: 'derived', depth: 0, + { node: 1, op: 'duration-where', type: 'derived', agent: 'timeline-processor', depth: 0, desc: 'How many seconds has the user been experiencing connection-induced rebuffering?' }, - { node: 2, op: 'And', type: 'derived', depth: 1, + { node: 2, op: 'and', type: 'derived', agent: 'timeline-processor', depth: 1, desc: 'Are ALL three rebuffering conditions true at the same time?' }, - { node: 3, op: 'And', type: 'derived', depth: 2, + { node: 3, op: 'and', type: 'derived', agent: 'timeline-processor', depth: 2, desc: 'Did playback start AND was there no recent seek? (rules out seek-induced buffering)' }, - { node: 4, op: 'TlHasExisted', type: 'leaf', depth: 3, + { node: 4, op: 'tl-has-existed', type: 'leaf', agent: 'event-processor', depth: 3, desc: 'Has the user ever pressed play? (playerStateChange == "play" at any point in the past)' }, - { node: 5, op: 'Not', type: 'derived', depth: 3, + { node: 5, op: 'not', type: 'derived', agent: 'timeline-processor', depth: 3, desc: 'Was there NO recent seek? (true = no seek recently, so buffering is connection-induced)' }, - { node: 6, op: 'TlHasExistedWithin', type: 'leaf', depth: 4, + { node: 6, op: 'tl-has-existed-within', type: 'leaf', agent: 'event-processor', depth: 4, desc: 'Did the user seek in the last 5 time units? (if yes, buffering is seek-induced, not connection-induced)' }, - { node: 7, op: 'EqualTo "buffer"', type: 'derived', depth: 2, + { node: 7, op: 'equal-to("buffer")', type: 'derived', agent: 'timeline-processor', depth: 2, desc: 'Is the player currently buffering? (latest state == "buffer")' }, - { node: 8, op: 'LatestEventToState', type: 'leaf', depth: 3, + { node: 8, op: 'latest-event-to-state', type: 'leaf', agent: 'event-processor', depth: 3, desc: 'What is the player doing right now? 
(latest playerStateChange value: play, buffer, pause, etc.)' }, ]; @@ -510,14 +510,15 @@

Cleanup

const cardCls = r.type; const resultCls = r.status === 'ok' ? 'ok' : (r.status === 'err' ? 'err' : 'pending'); + const workerName = session + '-node-' + r.node; + const agentId = r.agent + '("' + workerName + '")'; html += '
' + '
' + indent + '
' + '
' + '
' + '' + r.type + '' + - 'node-' + r.node + '' + - '' + escHtml(r.op) + '' + + '' + escHtml(agentId) + '' + '→ ' + escHtml(r.display) + '' + '
' + '
' + escHtml(r.desc) + '
' +