Merged
2 changes: 1 addition & 1 deletion AGENTS.md
@@ -20,7 +20,7 @@ Durable timeline analytics on [Golem](https://learn.golem.cloud) — a push-base
- The WASM component uses `golem-rust` SDK with `#[agent_definition]` / `#[agent_implementation]` macros — not raw WIT exports.
- Agent functions are invoked via `golem agent invoke` using WAVE format. Enum variants are kebab-case (e.g., `string-value("x")`), struct fields are kebab-case (e.g., `{col-name: "x"}`).
- `TimelineOpGraph` is the flat, non-recursive graph encoding sent over the wire (`nodes[0]` = root, children referenced by `NodeIndex`). `TimeLineOp` is the recursive internal representation.
- `setup_node` in the driver uses pre-order depth-first numbering: counter increments before recursion, producing worker names like `{session}-node-{counter}`.
- `setup_node` in the driver uses pre-order depth-first numbering: counter increments before recursion, producing agent names like `{session}-{operation}-{counter}` (e.g., `sess-42-has-existed-4`, `sess-42-duration-where-1`).
- The push cascade flows leaf → root via `on_child_state_changed`. Point-in-time queries (`get_leaf_result`, `get_derived_result`) are local lookups — no RPC cascade.

## Coding conventions
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion ISSUES.md
@@ -39,7 +39,7 @@ their own metrics or inspect them dynamically.
On deploy, calls `POST /api/metrics`, shows the agent template (full list of Golem agent IDs
with `{session-id}` placeholder and business descriptions derived from the compiled graph).
- **Session lookup**: Developer enters a session ID they know (from their application logs or
event stream) and a query time → Node Inspector queries all agents for that session.
event stream) and a query time → Computation Progress queries all agents for that session.
No enumeration of all sessions — that's impossible at scale.
- **Per-metric aggregation**: Aggregator worker names are derived from the metric's
`group_by_column` — show the correct preset buttons per metric.
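The "per-metric aggregation" bullet says aggregator worker names are derived from the metric's `group_by_column`. A minimal sketch of that derivation, inferred purely from the `aggregator("aggregator-cdn-akamai")` style invocations in the Makefile (the helper name and exact scheme are assumptions — the real naming lives in the driver):

```rust
// Hypothetical name derivation, inferred from the Makefile's
// `aggregator("aggregator-cdn-${cdn}")` invocations: one aggregator
// agent per distinct value of the group-by column.
fn aggregator_agent_name(group_by_column: &str, group_value: &str) -> String {
    format!("aggregator-{group_by_column}-{group_value}")
}

fn main() {
    // The dashboard's per-metric preset buttons would enumerate known
    // group values and target the corresponding aggregator agents.
    for cdn in ["akamai", "cloudfront", "fastly"] {
        println!("{}", aggregator_agent_name("cdn", cdn));
    }
}
```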
115 changes: 18 additions & 97 deletions Makefile.toml
@@ -114,112 +114,33 @@ done
nc -z 127.0.0.1 9092 2>/dev/null || { echo "ERROR: Kafka did not start"; exit 1; }
echo " Kafka ready."

# --- CIRR timeline with aggregation ---
# Graph:
# 0: TlDurationWhere(1)
# 1: And(2, 6)
# 2: And(3, 4)
# 3: TlHasExisted(playerStateChange == "play")
# 4: Negation(5)
# 5: TlHasExistedWithin(playerStateChange == "seek", 5)
# 6: Comparison(EqualTo, 7, "buffer")
# 7: TlLatestEventToState("playerStateChange")
#
# Node numbering (pre-order DFS):
# node-1: DurationWhere (root, TimelineProcessor)
# node-2: And (TimelineProcessor)
# node-3: And (TimelineProcessor)
# node-4: TlHasExisted (EventProcessor LEAF)
# node-5: Not (TimelineProcessor)
# node-6: TlHasExistedWithin (EventProcessor LEAF)
# node-7: EqualTo (TimelineProcessor)
# node-8: LatestEventToState (EventProcessor LEAF)
#
# Leaves: node-4, node-6, node-8

TIMELINE='{nodes: [tl-duration-where(1), and((2, 6)), and((3, 4)), tl-has-existed({col-name: "playerStateChange", value: string-value("play"), op: equal}), negation(5), tl-has-existed-within(({col-name: "playerStateChange", value: string-value("seek"), op: equal}, 5)), comparison((equal-to, 7, string-value("buffer"))), tl-latest-event-to-state("playerStateChange")]}'
AGG='some({group-by-column: "cdn", aggregations: [count, sum, avg]})'

CDNS=("akamai" "cloudfront" "fastly")
SESSIONS_PER_CDN=3
TOTAL_SESSIONS=$((SESSIONS_PER_CDN * ${#CDNS[@]}))

echo ""
echo "==> Initializing $TOTAL_SESSIONS CIRR sessions across ${#CDNS[@]} CDNs..."

SESSION_IDS=()
SESSION_CDNS=()
for cdn in "${CDNS[@]}"; do
for j in $(seq 1 $SESSIONS_PER_CDN); do
sid="demo-${cdn}-${j}"
golem agent invoke --format json "timeline-driver(\"$sid\")" initialize-timeline "$TIMELINE" "$AGG" > /dev/null 2>&1
SESSION_IDS+=("$sid")
SESSION_CDNS+=("$cdn")
echo " Initialized session $sid (cdn=$cdn)"
done
done

# --- Generate events into Kafka ---
echo ""
echo "==> Feeding events to all sessions..."

# For each session, feed a realistic sequence of events to all 3 leaves
for idx in "${!SESSION_IDS[@]}"; do
sid="${SESSION_IDS[$idx]}"
cdn="${SESSION_CDNS[$idx]}"
leaf4="${sid}-node-4" # has_existed(playerStateChange == "play")
leaf6="${sid}-node-6" # has_existed_within(playerStateChange == "seek", 5)
leaf8="${sid}-node-8" # latest_event_to_state(playerStateChange)

# Use different timing offsets per session to create variety
base=$((idx * 100 + 100))

# Event 1: playerStateChange = "init"
t=$((base))
ev="{time: ${t}, event: [(\"playerStateChange\", string-value(\"init\")), (\"cdn\", string-value(\"${cdn}\"))]}"
golem agent invoke --format json "event-processor(\"$leaf4\")" add-event "$ev" > /dev/null 2>&1
golem agent invoke --format json "event-processor(\"$leaf6\")" add-event "$ev" > /dev/null 2>&1
golem agent invoke --format json "event-processor(\"$leaf8\")" add-event "$ev" > /dev/null 2>&1

# Event 2: playerStateChange = "play"
t=$((base + 10))
ev="{time: ${t}, event: [(\"playerStateChange\", string-value(\"play\")), (\"cdn\", string-value(\"${cdn}\"))]}"
golem agent invoke --format json "event-processor(\"$leaf4\")" add-event "$ev" > /dev/null 2>&1
golem agent invoke --format json "event-processor(\"$leaf6\")" add-event "$ev" > /dev/null 2>&1
golem agent invoke --format json "event-processor(\"$leaf8\")" add-event "$ev" > /dev/null 2>&1

# Event 3: playerStateChange = "buffer" (triggers CIRR condition)
t=$((base + 30))
ev="{time: ${t}, event: [(\"playerStateChange\", string-value(\"buffer\")), (\"cdn\", string-value(\"${cdn}\"))]}"
golem agent invoke --format json "event-processor(\"$leaf4\")" add-event "$ev" > /dev/null 2>&1
golem agent invoke --format json "event-processor(\"$leaf6\")" add-event "$ev" > /dev/null 2>&1
golem agent invoke --format json "event-processor(\"$leaf8\")" add-event "$ev" > /dev/null 2>&1

# Event 4: playerStateChange = "play" (ends rebuffering)
t=$((base + 50))
ev="{time: ${t}, event: [(\"playerStateChange\", string-value(\"play\")), (\"cdn\", string-value(\"${cdn}\"))]}"
golem agent invoke --format json "event-processor(\"$leaf4\")" add-event "$ev" > /dev/null 2>&1
golem agent invoke --format json "event-processor(\"$leaf6\")" add-event "$ev" > /dev/null 2>&1
golem agent invoke --format json "event-processor(\"$leaf8\")" add-event "$ev" > /dev/null 2>&1

echo " Fed 4 events to $sid"
done

echo ""
echo "==> Querying aggregation results..."
for cdn in "${CDNS[@]}"; do
result=$(golem agent invoke --format json "aggregator(\"aggregator-cdn-${cdn}\")" get-aggregation-result 2>/dev/null || echo '{"error": "not found"}')
echo " aggregator-cdn-${cdn}: $result"
done
echo "==> Generating events into Kafka..."
KAFKA_TOPIC=timeline-events SESSION_COUNT=9 EVENTS_PER_SESSION=50 cargo run -p kafka-producer 2>/dev/null
echo " Events published to Kafka topic 'timeline-events'."

# --- Start dashboard ---
echo ""
echo "==> Starting dashboard at http://localhost:3000 ..."
cargo run -p dashboard &
KAFKA_BROKER=localhost:9092 KAFKA_TOPIC=timeline-events cargo run -p dashboard &
DASH_PID=$!

# Wait for dashboard to start
sleep 3

echo ""
echo "════════════════════════════════════════════════════════════════"
echo " Dashboard running at http://localhost:3000"
echo " Click 'Live Dashboard' tab → select a CDN preset"
echo ""
echo " 1. Click the 'Deploy Metric' tab"
echo " 2. Enter a metric name (e.g., 'cirr') and the DSL expression"
echo " 3. Click 'Deploy Metric'"
echo " → The feeder starts consuming from Kafka and feeding events"
echo " to the Golem agents automatically"
echo " 4. Switch to 'Computation Progress' to see per-session results"
echo " 5. Switch to 'Live Dashboard' to see aggregation across CDNs"
echo ""
echo " Press Ctrl+C to stop everything"
echo "════════════════════════════════════════════════════════════════"
