Skip to content

Commit 6123a67

Browse files
mrveissclaude
andauthored
feat(agents): integrate HeartbeatPanel with live events (#1522)
* feat(agents): integrate HeartbeatPanel with live events (#1522) Emit heartbeat_run_started and heartbeat_run_completed live events on channel agent:{id} from HeartbeatScheduler, and subscribe HeartbeatPanel to those events so run history refreshes in real-time without polling. * fix(agents): watch agentId with immediate:true to subscribe on mount (#1522) Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com> --------- Co-authored-by: Claude Haiku 4.5 <noreply@anthropic.com>
1 parent a86db77 commit 6123a67

File tree

2 files changed

+66
-2
lines changed

2 files changed

+66
-2
lines changed

autobot-backend/services/heartbeat_scheduler.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from sqlalchemy import select
2222
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
2323

24+
from live_event_manager import publish_live_event
2425
from models.heartbeat import (
2526
AgentRuntimeState,
2627
AgentWakeupRequest,
@@ -190,6 +191,11 @@ async def _start_run(
190191
await _append_event(session, run_id, "run_started", "Heartbeat run started")
191192
await session.commit()
192193
logger.info("Heartbeat run %s started for agent %s", run_id, agent_id)
194+
await publish_live_event(
195+
f"agent:{agent_id}",
196+
"heartbeat_run_started",
197+
{"run_id": str(run_id), "agent_id": agent_id, "trigger": trigger.value},
198+
)
193199
return run_id, state_id, timeout
194200

195201
async def _invoke_agent(
@@ -250,6 +256,18 @@ async def _finalize_run(
250256
f"Run finished with status={final_status}",
251257
)
252258
await session.commit()
259+
await publish_live_event(
260+
f"agent:{agent_id}",
261+
"heartbeat_run_completed",
262+
{
263+
"run_id": str(run_id),
264+
"agent_id": agent_id,
265+
"status": final_status,
266+
"error_message": error_msg,
267+
"tokens_used": usage.get("tokens_used"),
268+
"cost_usd": usage.get("cost_usd"),
269+
},
270+
)
253271
logger.info(
254272
"Run %s finished: status=%s agent=%s", run_id, final_status, agent_id
255273
)

autobot-frontend/src/components/agents/HeartbeatPanel.vue

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,12 +150,15 @@
150150
</template>
151151

152152
<script setup lang="ts">
153-
import { computed, ref } from 'vue'
153+
import { computed, onUnmounted, ref, watch } from 'vue'
154154
155155
import { getBackendUrl } from '@/config/ssot-config'
156156
import { createLogger } from '@/utils/debugUtils'
157+
import { useLiveEvents } from '@/composables/useLiveEvents'
158+
import type { LiveEvent } from '@/services/LiveEventService'
157159
158160
const logger = createLogger('HeartbeatPanel')
161+
const { subscribe, unsubscribe } = useLiveEvents()
159162
160163
interface HeartbeatConfig {
161164
agent_id: string
@@ -221,6 +224,47 @@ const editMaxDuration = ref(600)
221224
const expandedRunId = ref<string | null>(null)
222225
const expandedRun = computed(() => runs.value.find((r) => r.id === expandedRunId.value) ?? null)
223226
227+
// ── Live event subscription ───────────────────────────────────────────────────
228+
229+
let _liveUnsub: (() => void) | null = null
230+
231+
function _onLiveEvent(event: LiveEvent): void {
232+
if (event.event_type === 'heartbeat_run_started') {
233+
logger.debug('heartbeat_run_started received, refreshing data', { run_id: event.payload.run_id })
234+
void loadData()
235+
} else if (event.event_type === 'heartbeat_run_completed') {
236+
logger.debug('heartbeat_run_completed received, refreshing data', {
237+
run_id: event.payload.run_id,
238+
status: event.payload.status,
239+
})
240+
void loadData()
241+
}
242+
}
243+
244+
watch(agentId, (newId: string, oldId: string) => {
245+
if (oldId) {
246+
const oldChannel = `agent:${oldId}`
247+
if (_liveUnsub) {
248+
_liveUnsub()
249+
_liveUnsub = null
250+
}
251+
unsubscribe(oldChannel, _onLiveEvent)
252+
}
253+
if (newId) {
254+
_liveUnsub = subscribe(`agent:${newId}`, _onLiveEvent)
255+
logger.debug('Subscribed to live events for agent', { agentId: newId })
256+
}
257+
}, { immediate: true })
258+
259+
onUnmounted(() => {
260+
if (_liveUnsub) {
261+
_liveUnsub()
262+
_liveUnsub = null
263+
}
264+
})
265+
266+
// ─────────────────────────────────────────────────────────────────────────────
267+
224268
function apiBase(): string {
225269
return `${getBackendUrl()}/api/heartbeat`
226270
}
@@ -292,7 +336,9 @@ async function triggerManual(): Promise<void> {
292336
error.value = null
293337
try {
294338
await apiFetch(`/${agentId.value}/trigger`, { method: 'POST' })
295-
setTimeout(() => loadData(), 1500)
339+
// Refresh will happen automatically via heartbeat_run_started live event;
340+
// load immediately to reflect the queued status while we wait.
341+
void loadData()
296342
} catch (err) {
297343
error.value = String(err)
298344
} finally {

0 commit comments

Comments
 (0)