Skip to content

Commit 560107a

Browse files
authored
Agent communication via redis (#141)
* support https in the agent * send all commands via redis
1 parent 1caa6c1 commit 560107a

File tree

6 files changed

+288
-407
lines changed

6 files changed

+288
-407
lines changed

backend/app/fastapi.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import asyncio
22
import json
33
import os
4-
from contextlib import asynccontextmanager, suppress
4+
from contextlib import asynccontextmanager
55
from httpx import AsyncClient, Limits
66
from fastapi.staticfiles import StaticFiles
77
from fastapi.responses import JSONResponse, HTMLResponse
@@ -13,8 +13,6 @@
1313
from fastapi.middleware.gzip import GZipMiddleware
1414
from app.middleware import GzipRequestMiddleware
1515
from app.ws.agent_ws import router as agent_ws_router
16-
from app.ws.agent_tasks import start_background_listener
17-
from app.redis import create_redis_connection
1816
from app.websockets import register_ws_routes, redis_listener
1917
from app.config import config
2018
from app.utils.sentry import initialize_sentry
@@ -24,14 +22,10 @@ async def lifespan(app: FastAPI):
2422
initialize_sentry()
2523
app.state.http_client = AsyncClient(limits=Limits(max_connections=20, max_keepalive_connections=10))
2624
app.state.http_semaphore = asyncio.Semaphore(10)
27-
app.state.redis = create_redis_connection()
2825
task = asyncio.create_task(redis_listener())
29-
start_background_listener(app)
3026
yield
3127
await app.state.http_client.aclose()
3228
task.cancel()
33-
with suppress(Exception):
34-
await app.state.redis.close()
3529

3630
app = FastAPI(lifespan=lifespan)
3731
app.add_middleware(GZipMiddleware)

backend/app/tasks/deploy_agent.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ async def _create_local_build_archive(
194194
cmd = (
195195
f"cd {source_dir} && nimble setup && "
196196
f"{self.nim_path} compile --os:linux --cpu:{cpu} "
197-
f"--compileOnly --genScript --nimcache:{build_dir} "
197+
f"--compileOnly --genScript --nimcache:{build_dir} -d:ssl "
198198
f"{debug_opts} src/frameos_agent.nim 2>&1"
199199
)
200200

backend/app/utils/remote_exec.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ async def _exec_via_agent(
6868
"timeout": timeout,
6969
}
7070

71-
await redis.rpush("agent:cmd:queue", json.dumps(message).encode())
71+
await redis.rpush(f"agent:cmd:{frame.id}", json.dumps(message).encode())
7272

7373
resp_key = f"agent:resp:{cmd_id}"
7474
res = await redis.blpop(resp_key, timeout=timeout)
@@ -108,7 +108,7 @@ async def _file_write_via_agent(
108108
"blob": base64.b64encode(zipped).decode(),
109109
}
110110

111-
await redis.rpush("agent:cmd:queue", json.dumps(message).encode())
111+
await redis.rpush(f"agent:cmd:{frame.id}", json.dumps(message).encode())
112112

113113
resp_key = f"agent:resp:{cmd_id}"
114114
res = await redis.blpop(resp_key, timeout=timeout)

backend/app/ws/agent_bridge.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
from __future__ import annotations
2+
3+
import base64
4+
import json
5+
import uuid
6+
from arq import ArqRedis as Redis
7+
8+
CMD_KEY = "agent:cmd:{id}" # per-frame inbound queue
9+
RESP_KEY = "agent:resp:{id}" # per-command outbound queue
10+
STREAM_KEY = "agent:cmd:stream:{id}"
11+
12+
async def send_cmd(
13+
redis: Redis,
14+
frame_id: int,
15+
payload: dict,
16+
*,
17+
blob: bytes | None = None,
18+
timeout: int = 120,
19+
):
20+
cmd_id = str(uuid.uuid4())
21+
job = {
22+
"id": cmd_id,
23+
"frame_id": frame_id,
24+
"payload": payload,
25+
"timeout": timeout,
26+
}
27+
if blob is not None:
28+
job["blob"] = base64.b64encode(blob).decode()
29+
30+
await redis.rpush(CMD_KEY.format(id=frame_id), json.dumps(job).encode())
31+
32+
res = await redis.blpop(RESP_KEY.format(id=cmd_id), timeout=timeout)
33+
if res is None: # ⏰ timed out
34+
raise TimeoutError(f"agent timed-out after {timeout}s")
35+
36+
key, raw = res
37+
reply = json.loads(raw)
38+
if not reply.get("ok"):
39+
raise RuntimeError(reply.get("result", {}).get("error") or reply.get("error", "agent error"))
40+
41+
if reply.get("binary"):
42+
return base64.b64decode(reply["result"])
43+
res = reply.get("result")
44+
45+
# --- nested binary inside an http dict ------------------------------
46+
if isinstance(res, dict) and res.get("binary"):
47+
body = res.get("body")
48+
if isinstance(body, str): # base64 string → bytes
49+
res["body"] = base64.b64decode(body)
50+
return res

backend/app/ws/agent_tasks.py

Lines changed: 0 additions & 72 deletions
This file was deleted.

0 commit comments

Comments
 (0)