49 changes: 49 additions & 0 deletions ISSUES.md
@@ -0,0 +1,49 @@
# Issues

## Issue 1: Metric Registry — register, deduplicate, and configure feeder

**Problem**: There's no way to register a metric expression and have the system remember it.
Developers must manually construct WAVE strings, invoke the TimelineDriver by hand, and
configure the feeder with leaf worker names themselves.

**What to build**:
- A `MetricRegistry` (in-memory for now) that stores registered metrics: name, DSL text,
compiled `TimelineOpGraph`, `AggregationConfig`, and the **leaf routing table** (which
leaf node indices exist and what event columns each leaf needs).
- Deduplication: if a developer registers a DSL expression identical to an existing metric,
return the existing metric instead of creating a new one.
- On registration, derive the leaf routing table by walking the compiled graph. This tells
the feeder which `event-processor` agents to send each event to per session.
- REST API in the dashboard:
- `POST /api/metrics` — register a new metric (accepts DSL text + aggregation config),
compiles it, derives the leaf routing table, and configures the feeder.
- `GET /api/metrics` — list all registered metrics with their agent templates.
- The DSL parser (`timeline-dsl`) already exists — use it to compile DSL text to `TimelineOpGraph`.
- The `GET /api/metrics` endpoint must expose the leaf routing table per metric, so that
feeders can poll it to discover new metrics and start routing events to the correct leaves.
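The registry-with-deduplication behavior can be sketched as follows. This is a minimal in-memory sketch, not the real implementation: `RegisteredMetric` and the whitespace-normalized dedup key are assumptions, and the real record would also hold the compiled `TimelineOpGraph`, `AggregationConfig`, and leaf routing table.

```rust
use std::collections::HashMap;

/// Hypothetical registry record; the real one would also carry the compiled
/// TimelineOpGraph, AggregationConfig, and the leaf routing table.
#[derive(Clone, Debug, PartialEq)]
pub struct RegisteredMetric {
    pub name: String,
    pub dsl_text: String,
}

#[derive(Default)]
pub struct MetricRegistry {
    // Keyed by normalized DSL text so identical expressions deduplicate.
    by_dsl: HashMap<String, RegisteredMetric>,
}

impl MetricRegistry {
    /// Registers a metric, or returns the existing one if the same DSL
    /// expression was already registered (deduplication).
    pub fn register(&mut self, name: &str, dsl_text: &str) -> RegisteredMetric {
        // Assumed normalization: collapse whitespace so formatting differences
        // don't defeat deduplication.
        let key = dsl_text.split_whitespace().collect::<Vec<_>>().join(" ");
        self.by_dsl
            .entry(key)
            .or_insert_with(|| RegisteredMetric {
                name: name.to_string(),
                dsl_text: dsl_text.to_string(),
            })
            .clone()
    }
}
```

A second registration of the same expression returns the first metric unchanged, so both callers end up pointing at the same agents.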

**Scope**: Dashboard service (registry + API). Feeder changes are a separate concern —
the timeline project does not manage feeders. See "Feeder deployment — open design question"
in the README for the trade-offs between reusing existing feeders vs spinning up new ones.
No changes to timeline-core or Golem agents.

---

## Issue 2: Dashboard UI — DSL editor, deploy button, per-metric views

**Problem**: The current dashboard has hardcoded CIRR presets. Developers can't write
their own metrics or inspect them dynamically.

**What to build**:
- **Deploy Metric tab**: DSL text editor + optional aggregation config + "Deploy Metric" button.
On deploy, calls `POST /api/metrics`, shows the agent template (full list of Golem agent IDs
with `{session-id}` placeholder and business descriptions derived from the compiled graph).
- **Session lookup**: Developer enters a session ID they know (from their application logs or
event stream) and a query time → Node Inspector queries all agents for that session.
No enumeration of all sessions — that's impossible at scale.
- **Per-metric aggregation**: Aggregator worker names are derived from the metric's
`group_by_column` — show the correct preset buttons per metric.
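The aggregator-name derivation can be sketched in one line, assuming the simple `aggregator-{column}-{value}` scheme implied by the `aggregator("aggregator-cdn-akamai")` example elsewhere in the README:

```rust
/// Hypothetical helper: derives the aggregator worker name from a metric's
/// group_by column and a runtime value, matching names like
/// "aggregator-cdn-akamai" for group_by(cdn) with cdn = "akamai".
pub fn aggregator_worker_name(group_by_column: &str, value: &str) -> String {
    format!("aggregator-{group_by_column}-{value}")
}
```

The dashboard can enumerate known values of the group-by column to render one preset button per aggregator.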

**Depends on**: Issue 1.


267 changes: 259 additions & 8 deletions README.md
@@ -16,6 +16,14 @@ Watch the talk from Afsal at [LambdaConf:2024:Estes-Park:Colorado](https://www.y

![img_1.png](img_1.png)

### A quick demo

```sh
cargo make demo

# go to the dashboard at http://localhost:3000
```

## Timeline Query DSL

A text-based DSL for expressing temporal analytics over event streams. Write a query, deploy it,
@@ -92,14 +100,14 @@ This will:
The CIRR expression compiles into **8 Golem agents per session**. The **Node Inspector** tab in the dashboard lets you query every sub-computation's result and its progress at any point in time:

```
node-1 duration-where ← root: cumulative seconds where CIRR is true
node-2 and ← all 3 conditions combined
node-3 and ← has-existed ∧ ¬has-existed-within
node-4 tl-has-existed ← LEAF: has playerStateChange == "play" ever occurred?
node-5 not ← negation of node-6
node-6 tl-has-existed-within ← LEAF: was there a seek within the last 5 time units?
node-7 equal-to("buffer") ← is the current player state "buffer"?
node-8 latest-event-to-state ← LEAF: what is the latest playerStateChange value?
```

Once the demo is running and the dashboard is open at http://localhost:3000:
@@ -785,6 +793,249 @@ and never hit the network.
| **Cold-start latency** (resuming a suspended agent) | Golem's resume time is ~10ms. For latency-sensitive paths, keep agents warm with periodic heartbeats. |
| **Event ordering across leaves** | The push-based model processes events per-leaf independently. Two leaves in the same session may receive events at different wall-clock times. The `And`/`Or` nodes use `time + 1` lookups to see the latest state, which handles this correctly for monotonically increasing timestamps. |

## Deployment Strategy: One Component, Many Metrics

### The core idea

The `timeline-core` WASM component is deployed **once** to the Golem cluster. It contains all four
agent types (EventProcessor, TimelineProcessor, TimelineDriver, Aggregator) — these are general-purpose
building blocks, not metric-specific. Every metric expression (CIRR, Time-to-First-Play, engagement
scores, etc.) uses the same deployed component. There is no per-metric deployment.

```
┌────────────────────────────────────────────────────────────────┐
│ Golem Cluster │
│ │
│ timeline-core (1 component, 1 deployment) │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ EventProcessor · TimelineProcessor · Aggregator · Driver│ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ Metric: CIRR Metric: Time-to-First-Play │
│ ┌──────────────────────┐ ┌──────────────────────┐ │
│ │ 8 agents per session │ │ 2 agents per session │ │
│ │ sess-42, sess-99 ... │ │ sess-42, sess-99 ... │ │
│ └──────────────────────┘ └──────────────────────┘ │
└────────────────────────────────────────────────────────────────┘
```

### Roles

| Role | Can do | Cannot do |
|---|---|---|
| **Admin** | Deploy/update `timeline-core` to Golem. Manage the cluster, feeders, and Kafka infrastructure. | — |
| **Developer** | Write a metric expression in the DSL. Click "Deploy Metric" to register it. View sub-computation results, session lists, and aggregation data. | Deploy or modify the timeline-core component. |

Admins deploy the platform. Developers use it.

### What "Deploy Metric" does

When a developer writes a metric and clicks deploy, the system does **not** deploy a new component.
Instead:

1. **Registers the metric expression** — stores the DSL text and its compiled `TimelineOpGraph`
plus optional `AggregationConfig` (e.g., `group_by(cdn), count, sum, avg`).

2. **Derives the leaf routing table** — the system walks the compiled graph to identify which
nodes are leaves and what each leaf needs from the event stream. For the CIRR metric:
```
node-4 tl-has-existed → needs events containing "playerStateChange"
node-6 tl-has-existed-within → needs events containing "playerStateChange"
node-8 latest-event-to-state → needs events containing "playerStateChange"
```
This leaf routing table tells the feeder: "for every event in a session, send it to these
leaf agents."

3. **Feeder picks up the new metric** — the feeder (Kafka consumer) must learn about the new
metric's leaf routing table so it can start sending events to the right agents. For every
incoming event on a session:
- If the feeder hasn't seen this session+metric before, it first invokes
`timeline-driver("{session-id}")` → `initialize-timeline(graph, aggregation)` to spawn
and wire all agents for that session.
- It then sends the event to every leaf agent for that metric:
`event-processor("{session-id}-node-4").add-event(...)`,
`event-processor("{session-id}-node-6").add-event(...)`,
`event-processor("{session-id}-node-8").add-event(...)`.
- The push cascade takes care of the rest — each leaf pushes state changes upward
through the derived nodes to the root and aggregator.

4. **Multiple metrics, same event stream** — the feeder routes each event to the leaf agents
of **every** registered metric. If CIRR has 3 leaves and Time-to-First-Play has 1 leaf,
each event triggers 4 leaf invocations (3 + 1). Adding a new metric adds more leaf
invocations per event but does not require redeploying the component.
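Step 2 above, deriving the leaf routing table by walking the compiled graph, can be sketched like this. `GraphNode` is a simplified stand-in for the real `TimelineOpGraph` node type (an assumption for illustration):

```rust
/// Simplified stand-in for a compiled TimelineOpGraph node.
pub enum GraphNode {
    Leaf { index: u32, event_column: &'static str },
    Derived { index: u32, children: Vec<GraphNode> },
}

/// Walks the graph and collects, for each leaf, its node index and the
/// event column it needs from the stream -- exactly what the feeder
/// consumes to route events per session.
pub fn leaf_routing_table(node: &GraphNode, out: &mut Vec<(u32, String)>) {
    match node {
        GraphNode::Leaf { index, event_column } => {
            out.push((*index, event_column.to_string()));
        }
        GraphNode::Derived { children, .. } => {
            for child in children {
                leaf_routing_table(child, out);
            }
        }
    }
}
```

For the CIRR graph this walk would yield nodes 4, 6, and 8, each needing `playerStateChange`, matching the table in step 2.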

### Feeder deployment — open design question

The timeline project has no knowledge of where feeders run. Feeders are external infrastructure —
standalone binaries (`kafka-consumer-feeder`) that consume from Kafka and invoke Golem agents.
The deployment workflow must somehow get the leaf routing table to the feeders.

**What the feeder currently does:**

The feeder takes a hardcoded list of leaf worker names (e.g., `sess-1-node-4, sess-1-node-6`)
and sends every consumed event to every listed leaf. It has no concept of metrics, graphs, or
templates — it's just a static event router.

**The driver returns the leaf agent names:**

`initialize_timeline` returns an `InitializeResult` containing the root worker name and
all leaf worker names. The feeder calls the driver on every event for a session and gets
back exactly which `event-processor` agents to send events to — no graph walking, no
tracking, no in-memory state. The driver is a durable Golem worker: the first call spawns
and wires all agents; subsequent calls are idempotent and return the same result.

**The hard problem — how does a running feeder learn about a new metric?**

| Approach | Trade-off |
|---|---|
| **Feeder polls a config endpoint** | Simple, but introduces latency (with a 30s polling interval, a developer may wait up to 30s for pickup). The feeder is a black box: there is no feedback on whether it picked up the metric. |
| **Push via a config Kafka topic** | The "Deploy Metric" action publishes the graph + leaf table to a dedicated Kafka topic (e.g., `timeline-metrics-config`). The feeder already knows how to consume Kafka — it consumes from both the events topic and the config topic. Instant pickup, and Kafka provides durability + replay. |
| **New feeder per metric** | Each deploy spins up a new feeder instance. Simple isolation but expensive — more processes, more Kafka consumers. |

The **config Kafka topic** approach is likely the best fit: feeders already consume from Kafka,
adding a second topic is minimal work, delivery is instant, and Kafka's log provides an audit
trail of all registered metrics. The feeder consumes from `timeline-metrics-config` on startup
(replays all registered metrics) and watches for new entries. When a new metric arrives, the
feeder immediately starts routing events to its leaf agents — no polling delay, no black box.
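The feeder-side state that the config topic would build up can be sketched as below. This is a sketch under assumptions: the message shape (metric name plus leaf node names) and the per-metric map are illustrative, and replaying the topic from offset 0 reconstructs the full state because later messages for the same metric simply overwrite earlier ones.

```rust
use std::collections::HashMap;

/// Sketch of feeder routing state built by consuming the (assumed)
/// timeline-metrics-config topic: one entry per registered metric.
#[derive(Default)]
pub struct FeederRouting {
    leaves_by_metric: HashMap<String, Vec<String>>,
}

impl FeederRouting {
    /// Applies one config-topic message; re-delivery of the same metric
    /// overwrites its entry, so replay from offset 0 is safe.
    pub fn apply_config(&mut self, metric: &str, leaf_nodes: Vec<String>) {
        self.leaves_by_metric.insert(metric.to_string(), leaf_nodes);
    }

    /// All leaf workers one event must be sent to, across every registered
    /// metric (per-metric namespacing is simplified here).
    pub fn leaves_for_event(&self, session_id: &str) -> Vec<String> {
        let mut out: Vec<String> = self
            .leaves_by_metric
            .values()
            .flatten()
            .map(|node| format!("{session_id}-{node}"))
            .collect();
        out.sort();
        out
    }
}
```

With CIRR (3 leaves) and a second metric (1 leaf) registered, each event fans out to 4 leaf invocations, matching the "multiple metrics, same event stream" rule above.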

### End-to-end example: feeder processing events

A concrete example of the CIRR metric with events arriving from Kafka:

```
Feeder startup:
1. Consumes CIRR graph + aggregation config from timeline-metrics-config topic
2. Ready to process events

Event stream from Kafka (timeline-events topic):
event1: {session: "sess-42", time: 100, playerStateChange: "play", cdn: "akamai"}
event2: {session: "sess-42", time: 200, playerStateChange: "buffer", cdn: "akamai"}
event3: {session: "sess-99", time: 100, playerStateChange: "init", cdn: "fastly"}

Processing event1 (sess-42):
1. Calls: timeline-driver("sess-42").initialize-timeline(cirr_graph, agg_config)
→ Driver spawns 8 agents, wires parent refs
→ Returns InitializeResult:
root_worker: "sess-42-node-1"
leaf_workers: ["sess-42-node-4", "sess-42-node-6", "sess-42-node-8"]
2. Sends event to each leaf from the result:
event-processor("sess-42-node-4").add-event(...)
event-processor("sess-42-node-6").add-event(...)
event-processor("sess-42-node-8").add-event(...)

Processing event2 (sess-42 again):
1. Calls: timeline-driver("sess-42").initialize-timeline(cirr_graph, agg_config)
→ Driver is durable — agents already exist, idempotent, no re-wiring
→ Returns same InitializeResult:
root_worker: "sess-42-node-1"
leaf_workers: ["sess-42-node-4", "sess-42-node-6", "sess-42-node-8"]
2. Sends event to the 3 leaves

Processing event3 (sess-99):
1. Calls: timeline-driver("sess-99").initialize-timeline(...)
→ New session — spawns 8 fresh agents
→ Returns InitializeResult:
root_worker: "sess-99-node-1"
leaf_workers: ["sess-99-node-4", "sess-99-node-6", "sess-99-node-8"]
2. Sends event to sess-99's leaves
```

**Key point**: the feeder calls `initialize_timeline` on every event. No in-memory state,
no tracking which sessions exist. The driver is a durable Golem worker — first call spawns
agents, subsequent calls are idempotent and return the same `InitializeResult`. The feeder
gets the leaf agent names directly from the return value.

### Idempotency — `initialize_timeline` is safe to call repeatedly

Golem workers are durable. Calling `initialize_timeline` on an already-initialized session
simply overwrites `operation`, `children`, and `parent` with the same values (same graph =
same configuration). The accumulated `StateDynamicsTimeLine` state is untouched — `initialize_leaf`
and `initialize_derived` don't clear it. So the feeder can safely call the driver on every
event without tracking which sessions have been initialized.
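The idempotent-initialize contract can be modeled in a few lines. This is a behavioral sketch, not the real worker: `spawn_count` is an illustrative field showing that agents are spawned exactly once, and the `InitializeResult` field names follow the end-to-end example above.

```rust
/// Field names follow the README's InitializeResult example.
#[derive(Clone, Debug, PartialEq)]
pub struct InitializeResult {
    pub root_worker: String,
    pub leaf_workers: Vec<String>,
}

/// Sketch of the durable driver: the first call spawns and wires agents,
/// every subsequent call returns the same result without re-spawning.
#[derive(Default)]
pub struct TimelineDriver {
    initialized: Option<InitializeResult>,
    pub spawn_count: u32, // illustrative: how many times agents were spawned
}

impl TimelineDriver {
    pub fn initialize_timeline(
        &mut self,
        session_id: &str,
        leaf_nodes: &[u32],
    ) -> InitializeResult {
        if let Some(result) = &self.initialized {
            return result.clone(); // idempotent: same result, no re-wiring
        }
        self.spawn_count += 1; // real driver spawns + wires agents here
        let result = InitializeResult {
            root_worker: format!("{session_id}-node-1"),
            leaf_workers: leaf_nodes
                .iter()
                .map(|n| format!("{session_id}-node-{n}"))
                .collect(),
        };
        self.initialized = Some(result.clone());
        result
    }
}
```

Calling it on every event, as the feeder does, is therefore free of duplicate spawns.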



### Session discovery

There is no separate session registry. The feeder discovers session IDs from the event stream
(e.g., Kafka message keys). When an event arrives for a session the feeder hasn't seen before,
it initializes the agents for that session. Developers look up a specific session ID to inspect
its sub-computation results — there is no need to enumerate all sessions (which would be
impossible at scale). The developer either knows the session ID they want to debug, or
finds it from their own application logs / event stream.

### Metric-level compute reuse

If two developers register the **same metric expression** (identical DSL text and aggregation config),
they share the same computation. The system recognizes the duplicate and points both developers
at the same set of agents. No new agents are spawned — both developers see the same sub-computation
results, the same sessions, and the same aggregation data.

This is metric-level reuse: identical expression = shared agents. Sub-expression reuse across
different metrics (e.g., CIRR and Time-to-First-Play sharing a `has_existed(play)` subtree) is
a [future design goal](#future-design-compute-reuse-across-workflows) documented below.

### Agent naming

The `TimelineDriver` names inner agents using pre-order depth-first numbering:
`{session-id}-node-{counter}`. Each node is a separate Golem agent with its own agent type.
The developer sees a **template** of all agents their metric will create per session, with
`{session-id}` as a placeholder. They substitute a real session ID (e.g., one found in their
own application logs or event stream) to inspect or query.
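The pre-order numbering can be sketched as a recursive walk. `Op` here is a simplified stand-in for the real compiled-graph node type:

```rust
/// Simplified stand-in for a compiled graph node.
pub enum Op {
    Leaf,
    Derived(Vec<Op>),
}

/// Pre-order depth-first numbering, as the TimelineDriver names agents:
/// each node receives "{session-id}-node-{counter}" in visit order.
pub fn name_agents(session_id: &str, node: &Op, counter: &mut u32, out: &mut Vec<String>) {
    *counter += 1;
    out.push(format!("{session_id}-node-{counter}"));
    if let Op::Derived(children) = node {
        for child in children {
            name_agents(session_id, child, counter, out);
        }
    }
}
```

Applied to the CIRR graph shape, this yields `sess-42-node-1` through `sess-42-node-8` in exactly the order shown in the table below.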

For the CIRR metric, the full agent template is:

| Golem agent ID | Worker name | Operation | Description |
|---|---|---|---|
| `timeline-driver("{session-id}")` | `{session-id}` | — | Orchestrator (runs once to spawn and wire all agents) |
| `timeline-processor("{session-id}-node-1")` | `{session-id}-node-1` | duration-where | Root: cumulative CIRR rebuffering seconds |
| `timeline-processor("{session-id}-node-2")` | `{session-id}-node-2` | and | All 3 conditions combined |
| `timeline-processor("{session-id}-node-3")` | `{session-id}-node-3` | and | has-existed ∧ ¬has-existed-within |
| `event-processor("{session-id}-node-4")` | `{session-id}-node-4` | tl-has-existed | Leaf: has play ever occurred? |
| `timeline-processor("{session-id}-node-5")` | `{session-id}-node-5` | not | ¬has-existed-within(seek, 5) |
| `event-processor("{session-id}-node-6")` | `{session-id}-node-6` | tl-has-existed-within | Leaf: was there a recent seek? |
| `timeline-processor("{session-id}-node-7")` | `{session-id}-node-7` | equal-to("buffer") | Is current state "buffer"? |
| `event-processor("{session-id}-node-8")` | `{session-id}-node-8` | latest-event-to-state | Leaf: latest playerStateChange |
| `aggregator("aggregator-cdn-{value}")` | `aggregator-cdn-{value}` | aggregator | Shared across sessions per CDN value |

For session `sess-42` on CDN `akamai`, the actual agents are:
`timeline-driver("sess-42")`, `timeline-processor("sess-42-node-1")`, ...,
`event-processor("sess-42-node-4")`, ..., `aggregator("aggregator-cdn-akamai")`.

The developer doesn't need to construct these names manually. The dashboard shows:
- The **agent template** for their metric (the full table above with business descriptions)
- **Sub-computation results** for any session ID the developer looks up (Node Inspector)
- **Aggregation results** across sessions (Live Dashboard)

### What the developer experience looks like

1. Developer writes a metric in the DSL editor:
```
duration_where(
has_existed(playerStateChange == "play")
&& latest_event_to_state(playerStateChange) == "buffer"
) | aggregate(group_by(cdn), count, sum, avg)
```

2. Clicks **Deploy Metric**. The system registers it, names it (e.g., `buffering-duration`),
and shows the agent template:
```
   node-1 duration-where              → How long has the condition been true?
   node-2 and                         → Are both sub-conditions true?
   node-3 tl-has-existed (leaf)       → Has the user ever pressed play?
   node-4 equal-to("buffer")          → Is the player currently buffering?
   node-5 latest-event-to-state (leaf) → What is the current player state?
```

3. Events start flowing. The developer enters a session ID they want to debug
(e.g., `sess-42` from their application logs) and a query time.

4. Node Inspector shows all sub-computation results for that session at that point in time.
Live Dashboard shows aggregation results across all sessions.

5. Another developer registers the exact same DSL expression → they see the same metric,
same sessions, same results. No duplicate computation.

## Future Design: Compute Reuse Across Workflows

### The opportunity