Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ members = [
"services/event-generator",
"services/kafka-producer",
"services/kafka-consumer-feeder",
"services/dashboard",
"test/integration-harness",
]

Expand Down
169 changes: 169 additions & 0 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,172 @@ cargo test -p integration-harness -- --nocapture --ignored --test-threads=1

echo "==> All tests passed!"
'''

[tasks.demo]
clear = true
description = "Run a CIRR demo with aggregation across 3 CDNs and open the dashboard"
script_runner = "@shell"
script = '''
set -e
cleanup() {
echo ""
echo "Stopping dashboard..."
[ -n "$DASH_PID" ] && kill "$DASH_PID" 2>/dev/null || true
echo "Stopping Golem..."
pkill -f "golem-server" 2>/dev/null || true
pkill -f "golem server run" 2>/dev/null || true
echo "Stopping Kafka..."
docker compose -f test/integration-harness/docker/compose.yml down -v 2>/dev/null || true
echo "Cleanup done."
}
trap cleanup EXIT

echo "╔══════════════════════════════════════════════════════════════╗"
echo "║ CIRR Demo — Connection Induced Rebuffering Ratio ║"
echo "║ ║"
echo "║ duration_where( ║"
echo "║ has_existed(playerStateChange == \"play\") ║"
echo "║ && !has_existed_within(playerStateChange == \"seek\", 5) ║"
echo "║ && latest_event_to_state(playerStateChange) == \"buffer\" ║"
echo "║ ) | aggregate(group_by(cdn), count, sum, avg) ║"
echo "╚══════════════════════════════════════════════════════════════╝"
echo ""

# --- Start infrastructure ---
echo "==> Starting Golem server..."
pkill -f "golem-server" 2>/dev/null || true
pkill -f "golem server run" 2>/dev/null || true
sleep 1
golem server run --clean > /dev/null 2>&1 &
for i in $(seq 1 60); do
nc -z 127.0.0.1 9881 2>/dev/null && break
sleep 1
done
nc -z 127.0.0.1 9881 2>/dev/null || { echo "ERROR: Golem server did not start"; exit 1; }
echo " Golem server ready."

echo "==> Building and deploying component..."
golem build
golem deploy --yes
echo " Component deployed."

echo "==> Starting Kafka..."
docker compose -f test/integration-harness/docker/compose.yml up -d
for i in $(seq 1 30); do
nc -z 127.0.0.1 9092 2>/dev/null && break
sleep 1
done
nc -z 127.0.0.1 9092 2>/dev/null || { echo "ERROR: Kafka did not start"; exit 1; }
echo " Kafka ready."

# --- CIRR timeline with aggregation ---
# Graph:
# 0: TlDurationWhere(1)
# 1: And(2, 6)
# 2: And(3, 4)
# 3: TlHasExisted(playerStateChange == "play")
# 4: Negation(5)
# 5: TlHasExistedWithin(playerStateChange == "seek", 5)
# 6: Comparison(EqualTo, 7, "buffer")
# 7: TlLatestEventToState("playerStateChange")
#
# Node numbering (pre-order DFS):
# node-1: DurationWhere (root, TimelineProcessor)
# node-2: And (TimelineProcessor)
# node-3: And (TimelineProcessor)
# node-4: TlHasExisted (EventProcessor LEAF)
# node-5: Not (TimelineProcessor)
# node-6: TlHasExistedWithin (EventProcessor LEAF)
# node-7: EqualTo (TimelineProcessor)
# node-8: LatestEventToState (EventProcessor LEAF)
#
# Leaves: node-4, node-6, node-8

TIMELINE='{nodes: [tl-duration-where(1), and((2, 6)), and((3, 4)), tl-has-existed({col-name: "playerStateChange", value: string-value("play"), op: equal}), negation(5), tl-has-existed-within(({col-name: "playerStateChange", value: string-value("seek"), op: equal}, 5)), comparison((equal-to, 7, string-value("buffer"))), tl-latest-event-to-state("playerStateChange")]}'
AGG='some({group-by-column: "cdn", aggregations: [count, sum, avg]})'

CDNS=("akamai" "cloudfront" "fastly")
SESSIONS_PER_CDN=3
TOTAL_SESSIONS=$((SESSIONS_PER_CDN * ${#CDNS[@]}))

echo ""
echo "==> Initializing $TOTAL_SESSIONS CIRR sessions across ${#CDNS[@]} CDNs..."

SESSION_IDS=()
SESSION_CDNS=()
for cdn in "${CDNS[@]}"; do
for j in $(seq 1 $SESSIONS_PER_CDN); do
sid="demo-${cdn}-${j}"
golem agent invoke --format json "timeline-driver(\"$sid\")" initialize-timeline "$TIMELINE" "$AGG" > /dev/null 2>&1
SESSION_IDS+=("$sid")
SESSION_CDNS+=("$cdn")
echo " Initialized session $sid (cdn=$cdn)"
done
done

echo ""
echo "==> Feeding events to all sessions..."

# For each session, feed a realistic sequence of events to all 3 leaves
for idx in "${!SESSION_IDS[@]}"; do
sid="${SESSION_IDS[$idx]}"
cdn="${SESSION_CDNS[$idx]}"
leaf4="${sid}-node-4" # has_existed(playerStateChange == "play")
leaf6="${sid}-node-6" # has_existed_within(playerStateChange == "seek", 5)
leaf8="${sid}-node-8" # latest_event_to_state(playerStateChange)

# Use different timing offsets per session to create variety
base=$((idx * 100 + 100))

# Event 1: playerStateChange = "init"
t=$((base))
ev="{time: ${t}, event: [(\"playerStateChange\", string-value(\"init\")), (\"cdn\", string-value(\"${cdn}\"))]}"
golem agent invoke --format json "event-processor(\"$leaf4\")" add-event "$ev" > /dev/null 2>&1
golem agent invoke --format json "event-processor(\"$leaf6\")" add-event "$ev" > /dev/null 2>&1
golem agent invoke --format json "event-processor(\"$leaf8\")" add-event "$ev" > /dev/null 2>&1

# Event 2: playerStateChange = "play"
t=$((base + 10))
ev="{time: ${t}, event: [(\"playerStateChange\", string-value(\"play\")), (\"cdn\", string-value(\"${cdn}\"))]}"
golem agent invoke --format json "event-processor(\"$leaf4\")" add-event "$ev" > /dev/null 2>&1
golem agent invoke --format json "event-processor(\"$leaf6\")" add-event "$ev" > /dev/null 2>&1
golem agent invoke --format json "event-processor(\"$leaf8\")" add-event "$ev" > /dev/null 2>&1

# Event 3: playerStateChange = "buffer" (triggers CIRR condition)
t=$((base + 30))
ev="{time: ${t}, event: [(\"playerStateChange\", string-value(\"buffer\")), (\"cdn\", string-value(\"${cdn}\"))]}"
golem agent invoke --format json "event-processor(\"$leaf4\")" add-event "$ev" > /dev/null 2>&1
golem agent invoke --format json "event-processor(\"$leaf6\")" add-event "$ev" > /dev/null 2>&1
golem agent invoke --format json "event-processor(\"$leaf8\")" add-event "$ev" > /dev/null 2>&1

# Event 4: playerStateChange = "play" (ends rebuffering)
t=$((base + 50))
ev="{time: ${t}, event: [(\"playerStateChange\", string-value(\"play\")), (\"cdn\", string-value(\"${cdn}\"))]}"
golem agent invoke --format json "event-processor(\"$leaf4\")" add-event "$ev" > /dev/null 2>&1
golem agent invoke --format json "event-processor(\"$leaf6\")" add-event "$ev" > /dev/null 2>&1
golem agent invoke --format json "event-processor(\"$leaf8\")" add-event "$ev" > /dev/null 2>&1

echo " Fed 4 events to $sid"
done

echo ""
echo "==> Querying aggregation results..."
for cdn in "${CDNS[@]}"; do
result=$(golem agent invoke --format json "aggregator(\"aggregator-cdn-${cdn}\")" get-aggregation-result 2>/dev/null || echo '{"error": "not found"}')
echo " aggregator-cdn-${cdn}: $result"
done

echo ""
echo "==> Starting dashboard at http://localhost:3000 ..."
cargo run -p dashboard &
DASH_PID=$!

echo ""
echo "════════════════════════════════════════════════════════════════"
echo " Dashboard running at http://localhost:3000"
echo " Click 'Live Dashboard' tab → select a CDN preset"
echo " Press Ctrl+C to stop everything"
echo "════════════════════════════════════════════════════════════════"

wait $DASH_PID
'''
Loading