Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions beacon_skill/relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,70 @@ def heartbeat(
},
}


def batch_heartbeat(
self,
heartbeats: List[Dict[str, Any]],
default_token: Optional[str] = None,
) -> Dict[str, Any]:
"""Process a batched relay heartbeat.

Args:
heartbeats: List of dicts, each with 'agent_id', 'status', and optionally 'token', 'health'.
default_token: Token to use if a heartbeat doesn't provide one.
"""
results = {}
agents = self._load_agents()
now = int(time.time())
dirty = False

for hb in heartbeats:
agent_id = hb.get("agent_id")
token = hb.get("token") or default_token
status = hb.get("status", "alive")
health = hb.get("health")

if not agent_id:
continue
Comment on lines +355 to +356
Copy link

Copilot AI Mar 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In batch_heartbeat, heartbeats missing agent_id are silently skipped (continue), so the caller gets no indication that an input item was invalid. Consider recording an explicit error result for invalid items (e.g., under a synthetic key like an index) or returning a top-level errors list so clients can reconcile inputs to outputs.

Copilot uses AI. Check for mistakes.

agent_data = agents.get(agent_id)
if not agent_data:
results[agent_id] = {"error": "Agent not registered", "code": "NOT_FOUND"}
continue

if agent_data.get("relay_token") != token:
results[agent_id] = {"error": "Invalid relay token", "code": "AUTH_FAILED"}
continue

if agent_data.get("token_expires", 0) < now:
results[agent_id] = {"error": "Token expired — re-register", "code": "TOKEN_EXPIRED"}
continue

# Update heartbeat state
agent_data["last_heartbeat"] = now
agent_data["beat_count"] = agent_data.get("beat_count", 0) + 1
agent_data["status"] = status

# Refresh token TTL
agent_data["token_expires"] = now + RELAY_TOKEN_TTL_S

if health:
Copy link

Copilot AI Mar 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

health is guarded with if health:, which will skip saving valid-but-falsy payloads (e.g., {} or 0). If an empty dict is a meaningful “no issues” health report, this will be lost. Use a is not None check (and keep the setdefault("metadata", {})) so all provided health values are persisted.

Suggested change
if health:
if health is not None:

Copilot uses AI. Check for mistakes.
agent_data.setdefault("metadata", {})
agent_data["metadata"]["last_health"] = health

results[agent_id] = {
"ok": True,
"expires_in": RELAY_TOKEN_TTL_S,
Comment on lines +384 to +385
Copy link

Copilot AI Mar 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The per-agent success payload differs from heartbeat() in a way that’s likely to complicate clients: heartbeat() returns token_expires (absolute) and assessment, while batch_heartbeat() returns only expires_in. Consider returning token_expires (and optionally assessment/status) per agent for consistency with the single-heartbeat API.

Suggested change
"ok": True,
"expires_in": RELAY_TOKEN_TTL_S,
"ok": True,
"expires_in": RELAY_TOKEN_TTL_S,
"token_expires": agent_data["token_expires"],

Copilot uses AI. Check for mistakes.
"agent_id": agent_id,
"beat_count": agent_data["beat_count"]
}
dirty = True

if dirty:
self._save_agents(agents)

Copy link

Copilot AI Mar 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unlike heartbeat(), batch_heartbeat() doesn’t emit any relay log entries. This reduces auditability/diagnostics, especially when batch is expected to become the common path at scale. Consider logging a single aggregated event (e.g., action batch_heartbeat with count and maybe ok/error totals) to preserve observability without per-agent log spam.

Suggested change
# Log a single aggregated event for observability without per-agent spam.
ok_count = 0
error_counts: Dict[str, int] = {}
for _agent_id, res in results.items():
if res.get("ok"):
ok_count += 1
else:
code = res.get("code", "UNKNOWN")
error_counts[code] = error_counts.get(code, 0) + 1
log_entry = {
"ts": now,
"action": "batch_heartbeat",
"total": len(heartbeats),
"ok": ok_count,
"errors": error_counts,
}
append_jsonl(_dir(RELAY_LOG_FILE), log_entry)

Copilot uses AI. Check for mistakes.
return {"batch_results": results}

# ── Discovery ──

def discover(self, provider: Optional[str] = None, capability: Optional[str] = None) -> List[Dict[str, Any]]:
Expand Down
22 changes: 22 additions & 0 deletions beacon_skill/transports/relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,28 @@ def heartbeat_seo(
)
return resp.json()

def batch_heartbeat(
self,
heartbeats: List[Dict[str, Any]],
token: Optional[str] = None,
) -> Dict[str, Any]:
"""Send multiple heartbeats in a single request.

Args:
heartbeats: List of dicts, each with 'agent_id', 'status', and optionally 'health', 'token'.
token: Global auth token for the batch.
"""
body = {
"heartbeats": heartbeats
}
resp = requests.post(
self._url("/relay/heartbeat/batch"),
json=body,
headers=self._headers(token or ""),
timeout=self.timeout_s,
)
return resp.json()

def discover(
self,
provider: Optional[str] = None,
Expand Down
47 changes: 47 additions & 0 deletions tests/test_relay_batch_heartbeat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import pytest
import time
from beacon_skill.relay import RelayManager
from beacon_skill.transports.relay import RelayClient

def test_relay_batch_heartbeat(tmp_path):
mgr = RelayManager(data_dir=tmp_path)

now = int(time.time())
# Register a few agents directly into the manager
agents = {
"agent_1": {
"relay_token": "token1",
"token_expires": now + 86400,
"status": "dormant"
},
"agent_2": {
"relay_token": "token2",
"token_expires": now + 86400,
"status": "dormant"
}
}
mgr._save_agents(agents)

heartbeats = [
{"agent_id": "agent_1", "status": "active", "token": "token1"},
{"agent_id": "agent_2", "status": "idle", "token": "token2"},
{"agent_id": "agent_3", "status": "busy", "token": "wrong"} # not registered
]

# We patch RELAY_TOKEN_TTL_S to be accessible in globals if needed, or it's built-in

Comment on lines +1 to +32
Copy link

Copilot AI Mar 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test imports pytest and RelayClient but doesn’t use them. If the project runs any linting, this will fail; even without linting it adds noise. Consider removing unused imports (and the stray comment about patching TTL) to keep the test focused.

Copilot uses AI. Check for mistakes.
result = mgr.batch_heartbeat(heartbeats)

assert "batch_results" in result
assert "agent_1" in result["batch_results"]
assert "agent_2" in result["batch_results"]
assert "agent_3" in result["batch_results"]

assert result["batch_results"]["agent_1"].get("ok") is True
assert result["batch_results"]["agent_2"].get("ok") is True
assert result["batch_results"]["agent_3"].get("code") == "NOT_FOUND"

updated = mgr._load_agents()
assert updated["agent_1"]["status"] == "active"
assert updated["agent_2"]["status"] == "idle"

Comment on lines +3 to +47
Copy link

Copilot AI Mar 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test claims to cover TTL logic, but it hard-codes 86400 and sets token_expires to now + 86400, which can’t reliably prove that batch_heartbeat() refreshed the TTL. Consider importing RELAY_TOKEN_TTL_S, initializing token_expires to an old value (or near-expiry), and asserting token_expires is updated and beat_count increments for each successful heartbeat. Adding a case for a registered agent with a wrong token would also cover the AUTH_FAILED branch.

Suggested change
from beacon_skill.relay import RelayManager
from beacon_skill.transports.relay import RelayClient
def test_relay_batch_heartbeat(tmp_path):
mgr = RelayManager(data_dir=tmp_path)
now = int(time.time())
# Register a few agents directly into the manager
agents = {
"agent_1": {
"relay_token": "token1",
"token_expires": now + 86400,
"status": "dormant"
},
"agent_2": {
"relay_token": "token2",
"token_expires": now + 86400,
"status": "dormant"
}
}
mgr._save_agents(agents)
heartbeats = [
{"agent_id": "agent_1", "status": "active", "token": "token1"},
{"agent_id": "agent_2", "status": "idle", "token": "token2"},
{"agent_id": "agent_3", "status": "busy", "token": "wrong"} # not registered
]
# We patch RELAY_TOKEN_TTL_S to be accessible in globals if needed, or it's built-in
result = mgr.batch_heartbeat(heartbeats)
assert "batch_results" in result
assert "agent_1" in result["batch_results"]
assert "agent_2" in result["batch_results"]
assert "agent_3" in result["batch_results"]
assert result["batch_results"]["agent_1"].get("ok") is True
assert result["batch_results"]["agent_2"].get("ok") is True
assert result["batch_results"]["agent_3"].get("code") == "NOT_FOUND"
updated = mgr._load_agents()
assert updated["agent_1"]["status"] == "active"
assert updated["agent_2"]["status"] == "idle"
from beacon_skill.relay import RelayManager, RELAY_TOKEN_TTL_S
from beacon_skill.transports.relay import RelayClient
def test_relay_batch_heartbeat(tmp_path):
mgr = RelayManager(data_dir=tmp_path)
now = int(time.time())
# Register a few agents directly into the manager with expired tokens
expired_time = now - 10
agents = {
"agent_1": {
"relay_token": "token1",
"token_expires": expired_time,
"status": "dormant",
"beat_count": 0,
},
"agent_2": {
"relay_token": "token2",
"token_expires": expired_time,
"status": "dormant",
"beat_count": 0,
},
# Registered agent with wrong token in heartbeat to cover AUTH_FAILED
"agent_3": {
"relay_token": "token3",
"token_expires": expired_time,
"status": "dormant",
"beat_count": 0,
},
}
original_expires = {
agent_id: data["token_expires"] for agent_id, data in agents.items()
}
original_beats = {
agent_id: data["beat_count"] for agent_id, data in agents.items()
}
mgr._save_agents(agents)
heartbeats = [
{"agent_id": "agent_1", "status": "active", "token": "token1"},
{"agent_id": "agent_2", "status": "idle", "token": "token2"},
# Registered but wrong token -> AUTH_FAILED
{"agent_id": "agent_3", "status": "busy", "token": "wrong"},
# Not registered at all -> NOT_FOUND
{"agent_id": "agent_4", "status": "busy", "token": "wrong"},
]
result = mgr.batch_heartbeat(heartbeats)
assert "batch_results" in result
assert "agent_1" in result["batch_results"]
assert "agent_2" in result["batch_results"]
assert "agent_3" in result["batch_results"]
assert "agent_4" in result["batch_results"]
# Successful heartbeats
assert result["batch_results"]["agent_1"].get("ok") is True
assert result["batch_results"]["agent_2"].get("ok") is True
# Auth failure for registered agent with wrong token
assert result["batch_results"]["agent_3"].get("code") == "AUTH_FAILED"
# Not found for completely unknown agent
assert result["batch_results"]["agent_4"].get("code") == "NOT_FOUND"
updated = mgr._load_agents()
# Status updates for successful agents
assert updated["agent_1"]["status"] == "active"
assert updated["agent_2"]["status"] == "idle"
# TTL should be refreshed for successful agents
for agent_id in ("agent_1", "agent_2"):
assert updated[agent_id]["token_expires"] > original_expires[agent_id]
assert updated[agent_id]["token_expires"] >= now
# beat_count should increment on each successful heartbeat
assert updated[agent_id]["beat_count"] == original_beats[agent_id] + 1
# For AUTH_FAILED, TTL and beat_count should not change
assert updated["agent_3"]["token_expires"] == original_expires["agent_3"]
assert updated["agent_3"]["beat_count"] == original_beats["agent_3"]
assert updated["agent_3"]["status"] == agents["agent_3"]["status"]

Copilot uses AI. Check for mistakes.
Loading