Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion packages/server/src/services/hermes/chat-run-socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import { getSessionDetailFromDb } from '../../db/hermes/sessions-db'
import { getModelContextLength } from './model-context'
import { ChatContextCompressor, countTokens, SUMMARY_PREFIX } from '../../lib/context-compressor'
import { getCompressionSnapshot } from '../../db/hermes/compression-snapshot'
import { inFlightHermesSessionIds } from './session-deleter'
import { logger } from '../logger'

const compressor = new ChatContextCompressor()
Expand Down Expand Up @@ -1201,10 +1202,16 @@ export class ChatRunSocket {
* After sync, enqueues the ephemeral session for deletion.
*/
private syncFromHermes(socket: Socket, localSessionId: string, hermesSessionId: string, profile?: string) {
// Mark this Hermes session as in-flight so SessionDeleter.drain will defer
// any pending deletion until after we finish reading messages — see #352.
inFlightHermesSessionIds.add(hermesSessionId)
getSessionDetailFromDb(hermesSessionId)
.then((detail) => {
if (!detail || !detail.messages?.length) {
logger.warn('[chat-run-socket] syncFromHermes: no data for Hermes session %s', hermesSessionId)
// No data to sync — drop the in-flight guard so the empty Hermes
// session can still be cleaned up by drain on its own schedule.
inFlightHermesSessionIds.delete(hermesSessionId)
return
}

Expand Down Expand Up @@ -1307,10 +1314,18 @@ export class ChatRunSocket {
this.calcAndUpdateUsage(localSessionId, state, emit)
}

// Enqueue ephemeral session for deferred deletion
// Enqueue ephemeral session for deferred deletion.
// IMPORTANT: enqueue BEFORE clearing the in-flight guard so the drain
// loop never sees the row without the guard — see #352.
this.enqueueEphemeralDelete(hermesSessionId, profile)
inFlightHermesSessionIds.delete(hermesSessionId)
})
.catch((err: any) => {
// Release the guard so a stuck id doesn't block deletion forever.
// We deliberately do NOT enqueue on failure; if the run truly produced
// a Hermes session worth deleting, the next successful sync (or a
// restart-time scan) will pick it up.
inFlightHermesSessionIds.delete(hermesSessionId)
logger.warn(err, '[chat-run-socket] syncFromHermes failed for session %s (hermesId: %s, profile: %s)', localSessionId, hermesSessionId, profile || 'default')
})
}
Expand Down
41 changes: 40 additions & 1 deletion packages/server/src/services/hermes/session-deleter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@
* Reads from gc_pending_session_deletes table, executes deletion via
* Hermes CLI, tracks failures (max 3 attempts), and auto-drains on
* a timer + profile switch.
*
* Race-condition guard: callers that are still ASYNCHRONOUSLY reading
* a Hermes session (e.g. chat-run-socket.syncFromHermes) must register
* the Hermes session id in `inFlightHermesSessionIds` BEFORE the read
* starts and remove it AFTER the read completes (success or failure).
* The drain loop skips any id present in that set, so neither the periodic
* timer nor a profile-switch drain can delete a session out from under
* an in-flight reader. See issue #352.
*/
import { getDb } from '../../db/index'
import { deleteSession as hermesDeleteSession } from './hermes-cli'
Expand All @@ -12,6 +20,18 @@ import { logger } from '../logger'
// Upper bound on deletion attempts per session; the file header documents
// this as "max 3 attempts".
const MAX_ATTEMPTS = 3
// 300 000 ms = 5 minutes. Presumably the period of the auto-drain timer —
// confirm at the setInterval call site.
const DRAIN_INTERVAL_MS = 300_000

/**
 * Registry of Hermes session ids that an asynchronous consumer
 * (for example syncFromHermes) is reading right now.
 *
 * The drain loop filters out every id found here, so a periodic deletion
 * pass cannot run while a read is still in progress and leave the local DB
 * without assistant/tool messages. See issue #352.
 *
 * Contract for producers:
 *   inFlightHermesSessionIds.add(id)
 *   try { ...read... } finally { inFlightHermesSessionIds.delete(id) }
 *
 * NOTE(review): this is a plain Set, not a counter. If two readers of the
 * same id ever overlap, the first `delete` clears the guard for both —
 * confirm that reads are never concurrent per Hermes session id.
 */
export const inFlightHermesSessionIds: Set<string> = new Set()

export class SessionDeleter {
private static _instance: SessionDeleter | null = null
private timer: ReturnType<typeof setInterval> | null = null
Expand Down Expand Up @@ -74,11 +94,30 @@ export class SessionDeleter {

if (rows.length === 0) return { deleted: [], skipped: [], failed: [] }

// Skip any session that an async reader (e.g. syncFromHermes) is still
// pulling messages out of. Without this guard, drain can delete a session
// before the reader finishes, dropping assistant/tool messages from the
// local mirror — see issue #352.
const eligibleRows = rows.filter(r => !inFlightHermesSessionIds.has(r.session_id))
if (eligibleRows.length === 0) {
logger.debug(
'[SessionDeleter] all %d candidate(s) deferred (in-flight reads in progress)',
rows.length,
)
return { deleted: [], skipped: rows.map(r => r.session_id), failed: [] }
}
if (eligibleRows.length < rows.length) {
logger.debug(
'[SessionDeleter] deferred %d candidate(s) due to in-flight reads',
rows.length - eligibleRows.length,
)
}

const deleted: string[] = []
const skipped: string[] = []
const failed: string[] = []

for (const row of rows) {
for (const row of eligibleRows) {
try {
const ok = await hermesDeleteSession(row.session_id)
if (ok) {
Expand Down
13 changes: 9 additions & 4 deletions packages/server/src/services/hermes/session-sync.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
/**
* Sync Hermes sessions from all profiles on startup.
* Reads api_server sessions from Hermes state.db and imports into local DB.
* Reads sessions from ALL sources (cli, api_server, telegram, discord, etc.)
* out of Hermes state.db and imports into local DB.
* Only runs when local DB is empty (first startup).
*
* Uses sessions-db.ts query logic to properly aggregate session chains.
* Uses sessions-db.ts query logic to properly aggregate session chains
* (parent_session_id roots only, with descendants merged).
*/
import { readdirSync, existsSync } from 'fs'
import { resolve, join } from 'path'
Expand Down Expand Up @@ -62,8 +64,11 @@ async function syncProfileSessions(profile: string): Promise<{

try {
// Use listSessionSummaries to get aggregated session chains
// This returns only root sessions with aggregated stats from the entire chain
const summaries = await listHermesSessionSummaries('api_server', 10000, profile)
// This returns only root sessions with aggregated stats from the entire chain.
// Pass `undefined` for source to import sessions from ALL sources (cli, api_server,
// telegram, discord, etc.) — previously hardcoded 'api_server' silently dropped
// CLI conversations and any other channels on first-startup sync.
const summaries = await listHermesSessionSummaries(undefined, 10000, profile)

logger.info(`[session-sync] profile '${profile}': found ${summaries.length} aggregated session chains`)

Expand Down