diff --git a/packages/server/src/db/hermes/session-store.ts b/packages/server/src/db/hermes/session-store.ts index 7669f50a..36406ef5 100644 --- a/packages/server/src/db/hermes/session-store.ts +++ b/packages/server/src/db/hermes/session-store.ts @@ -129,14 +129,16 @@ function mapMessageRow(row: Record): HermesMessageRow { export function createSession(data: { id: string profile?: string + source?: string model?: string title?: string workspace?: string }): HermesSessionRow { const now = Math.floor(Date.now() / 1000) + const source = data.source || 'api_server' if (!isSqliteAvailable()) { return { - id: data.id, profile: data.profile || 'default', source: 'api_server', + id: data.id, profile: data.profile || 'default', source, user_id: null, model: data.model || '', title: data.title || null, started_at: now, ended_at: null, end_reason: null, message_count: 0, tool_call_count: 0, @@ -148,8 +150,8 @@ export function createSession(data: { const db = getDb()! db.prepare( `INSERT INTO ${SESSIONS_TABLE} (id, profile, source, model, title, started_at, last_active, workspace) - VALUES (?, ?, 'api_server', ?, ?, ?, ?, ?)`, - ).run(data.id, data.profile || 'default', data.model || '', data.title || null, now, now, data.workspace || null) + VALUES (?, ?, ?, ?, ?, ?, ?, ?)`, + ).run(data.id, data.profile || 'default', source, data.model || '', data.title || null, now, now, data.workspace || null) return getSession(data.id)! } diff --git a/packages/server/src/services/hermes/session-sync.ts b/packages/server/src/services/hermes/session-sync.ts index 17a4e2ba..0e5edbcf 100644 --- a/packages/server/src/services/hermes/session-sync.ts +++ b/packages/server/src/services/hermes/session-sync.ts @@ -1,39 +1,32 @@ /** - * Sync Hermes sessions from all profiles on startup. - * Reads api_server sessions from Hermes state.db and imports into local DB. - * Only runs when local DB is empty (first startup). + * Incrementally sync Hermes sessions from all profiles on startup. + * Reads durable Hermes state.db sessions and mirrors missing chat-visible + * sessions into WebUI's local session DB. * * Uses sessions-db.ts query logic to properly aggregate session chains. */ import { readdirSync, existsSync } from 'fs' -import { resolve, join } from 'path' -import { homedir } from 'os' -import { randomBytes } from 'crypto' -import { getProfileDir } from './hermes-profile' -import { createSession, addMessage, updateSession } from '../../db/hermes/session-store' +import { join } from 'path' +import { + createSession, + addMessages, + updateSession, + deleteSession, +} from '../../db/hermes/session-store' import { getDb } from '../../db/index' import { logger } from '../logger' -import { listSessionSummaries as listHermesSessionSummaries } from '../../db/hermes/sessions-db' +import { + listSessionSummaries as listHermesSessionSummaries, + getSessionDetailFromDbWithProfile, +} from '../../db/hermes/sessions-db' import { detectHermesHome } from './hermes-path' const HERMES_BASE = detectHermesHome() const PROFILES_DIR = join(HERMES_BASE, 'profiles') +const SYNC_SOURCES = ['api_server', 'webui'] as const -/** - * Generate a UUID v4 without external dependencies - */ -function generateUuid(): string { - const bytes = randomBytes(16) - bytes[6] = (bytes[6]! & 0x0f) | 0x40 // Version 4 - bytes[8] = (bytes[8]! & 0x3f) | 0x80 // Variant 10 - return [ - bytes.subarray(0, 4).toString('hex'), - bytes.subarray(4, 6).toString('hex'), - bytes.subarray(6, 8).toString('hex'), - bytes.subarray(8, 10).toString('hex'), - bytes.subarray(10, 16).toString('hex'), - ].join('-') -} +type SyncSource = typeof SYNC_SOURCES[number] +type HermesSessionSummary = Awaited>[number] /** * Get all available profile names including 'default' @@ -51,92 +44,145 @@ function getAllProfiles(): string[] { return profiles } +function localSessionExists(profile: string, hermesSession: HermesSessionSummary, source: SyncSource): boolean { + const db = getDb() + if (!db) return false + + const canonical = db.prepare('SELECT id FROM sessions WHERE id = ? LIMIT 1').get(hermesSession.id) + if (canonical) return true + + // Older startup imports used a random local UUID. Detect those rows by their + // stable summary fingerprint so the first incremental sync does not create a + // duplicate copy under the canonical Hermes session id. + const legacy = db.prepare(` + SELECT id FROM sessions + WHERE profile = ? + AND source = ? + AND started_at = ? + AND last_active = ? + AND COALESCE(title, '') = ? + AND COALESCE(preview, '') = ? + LIMIT 1 + `).get( + profile, + hermesSession.source || source, + hermesSession.started_at, + hermesSession.last_active, + hermesSession.title || '', + hermesSession.preview || '', + ) + + return Boolean(legacy) +} + /** - * Sync api_server sessions from a single profile. + * Sync chat-visible sessions from a single profile. * Uses sessions-db.ts query logic to properly aggregate session chains. */ async function syncProfileSessions(profile: string): Promise<{ synced: number + skipped: number errors: string[] }> { - const result = { synced: 0, errors: [] as string[] } - - try { - // Use listSessionSummaries to get aggregated session chains - // This returns only root sessions with aggregated stats from the entire chain - const summaries = await listHermesSessionSummaries('api_server', 10000, profile) - - logger.info(`[session-sync] profile '${profile}': found ${summaries.length} aggregated session chains`) - - for (const hermesSession of summaries) { - // Skip ephemeral sessions (created internally by chat-run-socket) - if (hermesSession.id.startsWith('eph_')) continue - try { - // Generate new session ID for local DB - const newSessionId = generateUuid() - - // Create session in local DB - createSession({ - id: newSessionId, - profile, - model: hermesSession.model, - title: hermesSession.title || undefined, - }) - - // Get full detail including all messages from the session chain - const { getSessionDetailFromDbWithProfile } = await import('../../db/hermes/sessions-db') - const detail = await getSessionDetailFromDbWithProfile(hermesSession.id, profile) - - if (!detail || !detail.messages) { - result.errors.push(`session ${hermesSession.id}: failed to load messages`) - logger.warn(`[session-sync] failed to load messages for session ${hermesSession.id}`) + const result = { synced: 0, skipped: 0, errors: [] as string[] } + + for (const source of SYNC_SOURCES) { + try { + // Use listSessionSummaries to get aggregated session chains. + // This returns only root sessions with aggregated stats from the entire chain. + const summaries = await listHermesSessionSummaries(source, 10000, profile) + + logger.info(`[session-sync] profile '${profile}' source '${source}': found ${summaries.length} aggregated session chains`) + + for (const hermesSession of summaries) { + // Skip ephemeral sessions created internally by chat-run-socket. + if (hermesSession.id.startsWith('eph_')) { + result.skipped++ continue } - // Insert all messages from the entire chain - for (const msg of detail.messages) { - addMessage({ - session_id: newSessionId, - role: msg.role, - content: msg.content, - tool_call_id: msg.tool_call_id, - tool_calls: msg.tool_calls, - tool_name: msg.tool_name, - timestamp: msg.timestamp, - token_count: msg.token_count, - finish_reason: msg.finish_reason, - reasoning: msg.reasoning, - reasoning_details: msg.reasoning_details, - reasoning_content: msg.reasoning_content, - }) + if (localSessionExists(profile, hermesSession, source)) { + result.skipped++ + continue } - // Update session with aggregated stats from Hermes - updateSession(newSessionId, { - started_at: hermesSession.started_at, - ended_at: hermesSession.ended_at, - end_reason: hermesSession.end_reason, - input_tokens: hermesSession.input_tokens, - output_tokens: hermesSession.output_tokens, - cache_read_tokens: hermesSession.cache_read_tokens, - cache_write_tokens: hermesSession.cache_write_tokens, - reasoning_tokens: hermesSession.reasoning_tokens, - estimated_cost_usd: hermesSession.estimated_cost_usd, - last_active: hermesSession.last_active, - preview: hermesSession.preview, - }) - - result.synced++ - logger.info(`[session-sync] synced Hermes session ${hermesSession.id} -> ${newSessionId} (${detail.messages.length} messages, thread_session_count=${detail.thread_session_count})`) - } catch (err: any) { - result.errors.push(`session ${hermesSession.id}: ${err.message}`) - logger.warn(err, `[session-sync] failed to sync session ${hermesSession.id}`) + try { + // Get full detail including all messages from the session chain before + // creating the local row. That avoids partial imports when detail load fails. + const detail = await getSessionDetailFromDbWithProfile(hermesSession.id, profile) + + if (!detail || !detail.messages) { + result.errors.push(`session ${hermesSession.id}: failed to load messages`) + logger.warn(`[session-sync] failed to load messages for session ${hermesSession.id}`) + continue + } + + let createdLocalSession = false + try { + createSession({ + id: hermesSession.id, + profile, + source: hermesSession.source || source, + model: hermesSession.model, + title: hermesSession.title || undefined, + }) + createdLocalSession = true + + addMessages(detail.messages.map(msg => ({ + session_id: hermesSession.id, + role: msg.role, + content: msg.content, + tool_call_id: msg.tool_call_id, + tool_calls: msg.tool_calls, + tool_name: msg.tool_name, + timestamp: msg.timestamp, + token_count: msg.token_count, + finish_reason: msg.finish_reason, + reasoning: msg.reasoning, + reasoning_details: msg.reasoning_details, + reasoning_content: msg.reasoning_content, + }))) + + // Update session with aggregated stats from Hermes. + updateSession(hermesSession.id, { + source: hermesSession.source || source, + user_id: hermesSession.user_id, + started_at: hermesSession.started_at, + ended_at: hermesSession.ended_at, + end_reason: hermesSession.end_reason, + message_count: hermesSession.message_count, + tool_call_count: hermesSession.tool_call_count, + input_tokens: hermesSession.input_tokens, + output_tokens: hermesSession.output_tokens, + cache_read_tokens: hermesSession.cache_read_tokens, + cache_write_tokens: hermesSession.cache_write_tokens, + reasoning_tokens: hermesSession.reasoning_tokens, + billing_provider: hermesSession.billing_provider, + estimated_cost_usd: hermesSession.estimated_cost_usd, + actual_cost_usd: hermesSession.actual_cost_usd, + cost_status: hermesSession.cost_status, + preview: hermesSession.preview, + last_active: hermesSession.last_active, + }) + } catch (err) { + if (createdLocalSession) { + deleteSession(hermesSession.id) + } + throw err + } + + result.synced++ + logger.info(`[session-sync] synced Hermes session ${hermesSession.id} (${detail.messages.length} messages, thread_session_count=${detail.thread_session_count})`) + } catch (err: any) { + result.errors.push(`session ${hermesSession.id}: ${err.message}`) + logger.warn(err, `[session-sync] failed to sync session ${hermesSession.id}`) + } + } + } catch (err: any) { + if (!err.message.includes('state.db not found')) { + result.errors.push(`${source}: ${err.message}`) + logger.warn(err, `[session-sync] failed to open state.db for profile '${profile}', source '${source}'`) } - } - } catch (err: any) { - if (!err.message.includes('state.db not found')) { - result.errors.push(err.message) - logger.warn(err, `[session-sync] failed to open state.db for profile '${profile}'`) } } @@ -144,36 +190,30 @@ async function syncProfileSessions(profile: string): Promise<{ } /** - * Main entry point: sync all profiles on startup - * Only runs if local DB is empty (first startup or after DB reset) + * Main entry point: sync all profiles on startup. + * Runs incrementally so sessions created outside the local WebUI DB are imported + * after every restart without duplicating sessions already mirrored locally. */ export async function syncAllHermesSessionsOnStartup(): Promise { - // Check if local DB has any sessions - only sync if completely empty const db = getDb() if (!db) { logger.info('[session-sync] SQLite not available, skipping Hermes sync') return } - const countResult = db.prepare('SELECT COUNT(*) as count FROM sessions').get() as { count: number } | undefined - const hasExistingSessions = countResult && countResult.count > 0 - - if (hasExistingSessions) { - logger.info('[session-sync] local DB has %d sessions, skipping Hermes sync', countResult!.count) - return - } - - logger.info('[session-sync] local DB is empty, starting Hermes session sync...') + logger.info('[session-sync] starting incremental Hermes session sync...') const profiles = getAllProfiles() logger.info(`[session-sync] found ${profiles.length} profiles: ${profiles.join(', ')}`) let totalSynced = 0 + let totalSkipped = 0 let totalErrors = 0 for (const profile of profiles) { const result = await syncProfileSessions(profile) totalSynced += result.synced + totalSkipped += result.skipped totalErrors += result.errors.length if (result.errors.length > 0) { @@ -187,5 +227,5 @@ export async function syncAllHermesSessionsOnStartup(): Promise { } } - logger.info(`[session-sync] sync complete: synced=${totalSynced}, errors=${totalErrors}`) + logger.info(`[session-sync] sync complete: synced=${totalSynced}, skipped=${totalSkipped}, errors=${totalErrors}`) } diff --git a/tests/server/session-sync.test.ts b/tests/server/session-sync.test.ts index 780f091f..fc458372 100644 --- a/tests/server/session-sync.test.ts +++ b/tests/server/session-sync.test.ts @@ -4,7 +4,10 @@ import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest' import { getDb } from '../../packages/server/src/db/index' import { initAllStores } from '../../packages/server/src/db/hermes/init' -import { listSessionSummaries } from '../../packages/server/src/db/hermes/sessions-db' +import { + listSessionSummaries, + getSessionDetailFromDbWithProfile, +} from '../../packages/server/src/db/hermes/sessions-db' import { syncAllHermesSessionsOnStartup } from '../../packages/server/src/services/hermes/session-sync' vi.mock('../../packages/server/src/db/hermes/sessions-db', () => ({ @@ -12,6 +15,71 @@ vi.mock('../../packages/server/src/db/hermes/sessions-db', () => ({ getSessionDetailFromDbWithProfile: vi.fn(), })) +const apiServerSession = { + id: 'hermes-api-session-1', + source: 'api_server', + user_id: null, + model: 'gpt-4', + title: 'Imported API session', + started_at: 1000, + ended_at: 1010, + end_reason: 'completed', + message_count: 2, + tool_call_count: 0, + input_tokens: 11, + output_tokens: 7, + cache_read_tokens: 0, + cache_write_tokens: 0, + reasoning_tokens: 0, + billing_provider: null, + estimated_cost_usd: 0.01, + actual_cost_usd: null, + cost_status: '', + preview: 'api preview', + last_active: 1010, +} + +const webuiSession = { + ...apiServerSession, + id: 'hermes-webui-session-1', + source: 'webui', + title: 'Imported WebUI session', + started_at: 2000, + ended_at: 2010, + preview: 'webui preview', + last_active: 2010, +} + +const ephemeralSession = { + ...apiServerSession, + id: 'eph_internal-session', + source: 'webui', + title: 'Ephemeral session', + started_at: 3000, + ended_at: 3010, + preview: 'ephemeral preview', + last_active: 3010, +} + +const sessionDetails = new Map([ + [apiServerSession.id, { + ...apiServerSession, + thread_session_count: 1, + messages: [ + { id: 1, session_id: apiServerSession.id, role: 'user', content: 'api question', tool_call_id: null, tool_calls: null, tool_name: null, timestamp: 1001, token_count: 2, finish_reason: null, reasoning: null }, + { id: 2, session_id: apiServerSession.id, role: 'assistant', content: 'api answer', tool_call_id: null, tool_calls: null, tool_name: null, timestamp: 1002, token_count: 3, finish_reason: 'stop', reasoning: null }, + ], + }], + [webuiSession.id, { + ...webuiSession, + thread_session_count: 1, + messages: [ + { id: 1, session_id: webuiSession.id, role: 'user', content: 'webui question', tool_call_id: null, tool_calls: null, tool_name: null, timestamp: 2001, token_count: 2, finish_reason: null, reasoning: null }, + { id: 2, session_id: webuiSession.id, role: 'assistant', content: 'webui answer', tool_call_id: null, tool_calls: null, tool_name: null, timestamp: 2002, token_count: 3, finish_reason: 'stop', reasoning: null }, + ], + }], +]) + function resetSessionTables(): void { initAllStores() @@ -22,49 +90,134 @@ function resetSessionTables(): void { } } +function sessionCount(): number { + const db = getDb()! + const result = db.prepare('SELECT COUNT(*) as count FROM sessions').get() as { count: number } + return result.count +} + +function messageCount(sessionId: string): number { + const db = getDb()! + const result = db.prepare('SELECT COUNT(*) as count FROM messages WHERE session_id = ?').get(sessionId) as { count: number } + return result.count +} + +function sessionRow(sessionId: string): any { + return getDb()!.prepare('SELECT * FROM sessions WHERE id = ?').get(sessionId) +} + +function insertLocalSession(id: string, overrides: Record = {}): void { + const db = getDb()! + const row = { + profile: 'default', + source: 'api_server', + model: 'gpt-4', + title: 'Existing local session', + started_at: 900, + last_active: 910, + preview: 'existing preview', + ...overrides, + } + db.prepare(` + INSERT INTO sessions (id, profile, source, model, title, started_at, last_active, preview) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + `).run(id, row.profile, row.source, row.model, row.title, row.started_at, row.last_active, row.preview) +} + +function mockHermesSessions(): void { + vi.mocked(listSessionSummaries).mockImplementation(async (source?: string, _limit?: number, profile?: string) => { + if (profile !== 'default') return [] + if (source === 'api_server') return [apiServerSession] + if (source === 'webui') return [webuiSession, ephemeralSession] + return [] + }) + vi.mocked(getSessionDetailFromDbWithProfile).mockImplementation(async (sessionId: string) => { + return sessionDetails.get(sessionId) as any + }) +} + describe('session-sync', () => { beforeEach(() => { vi.clearAllMocks() resetSessionTables() + vi.mocked(listSessionSummaries).mockResolvedValue([]) + vi.mocked(getSessionDetailFromDbWithProfile).mockResolvedValue(null as any) }) afterEach(() => { resetSessionTables() }) - it('should skip sync when local DB is not empty', async () => { + it('imports missing Hermes sessions even when the local DB already has sessions', async () => { const db = getDb() expect(db).not.toBeNull() + insertLocalSession('existing-local-session') + mockHermesSessions() + + await syncAllHermesSessionsOnStartup() - // Insert a test session - db!.prepare(` - INSERT INTO sessions (id, profile, source, model, title, started_at, last_active) - VALUES ('test-session-1', 'default', 'api_server', 'gpt-4', 'Test Session', ${Date.now()}, ${Date.now()}) - `).run() + expect(vi.mocked(listSessionSummaries)).toHaveBeenCalledWith('api_server', 10000, 'default') + expect(vi.mocked(listSessionSummaries)).toHaveBeenCalledWith('webui', 10000, 'default') + expect(sessionCount()).toBe(3) + expect(sessionRow(apiServerSession.id)).toMatchObject({ id: apiServerSession.id, source: 'api_server', title: apiServerSession.title }) + expect(sessionRow(webuiSession.id)).toMatchObject({ id: webuiSession.id, source: 'webui', title: webuiSession.title }) + expect(sessionRow(ephemeralSession.id)).toBeUndefined() + expect(messageCount(apiServerSession.id)).toBe(2) + expect(messageCount(webuiSession.id)).toBe(2) + }) - // Check that session exists - const countResult = db!.prepare('SELECT COUNT(*) as count FROM sessions').get() as { count: number } - expect(countResult.count).toBe(1) + it('is idempotent when startup sync runs repeatedly', async () => { + mockHermesSessions() - // Run sync - should skip because DB is not empty await syncAllHermesSessionsOnStartup() - expect(vi.mocked(listSessionSummaries)).not.toHaveBeenCalled() + await syncAllHermesSessionsOnStartup() - // Verify session still exists (no changes) - const countAfter = db!.prepare('SELECT COUNT(*) as count FROM sessions').get() as { count: number } - expect(countAfter.count).toBe(1) + expect(sessionCount()).toBe(2) + expect(messageCount(apiServerSession.id)).toBe(2) + expect(messageCount(webuiSession.id)).toBe(2) }) - it('should attempt sync when local DB is empty', async () => { - const db = getDb() - expect(db).not.toBeNull() + it('does not duplicate older random-ID imports with the same session fingerprint', async () => { + insertLocalSession('legacy-random-id', { + source: webuiSession.source, + title: webuiSession.title, + started_at: webuiSession.started_at, + last_active: webuiSession.last_active, + preview: webuiSession.preview, + }) + mockHermesSessions() - // Verify DB is empty - const countBefore = db!.prepare('SELECT COUNT(*) as count FROM sessions').get() as { count: number } - expect(countBefore.count).toBe(0) + await syncAllHermesSessionsOnStartup() - // Run sync - should attempt to sync from Hermes - await expect(syncAllHermesSessionsOnStartup()).resolves.toBeUndefined() - expect(vi.mocked(listSessionSummaries)).toHaveBeenCalledWith('api_server', 10000, 'default') + expect(sessionRow('legacy-random-id')).toBeTruthy() + expect(sessionRow(webuiSession.id)).toBeUndefined() + expect(sessionRow(apiServerSession.id)).toBeTruthy() + expect(sessionCount()).toBe(2) + }) + + it('removes partial imports so a later startup can retry the same canonical session id', async () => { + vi.mocked(listSessionSummaries).mockImplementation(async (source?: string, _limit?: number, profile?: string) => { + if (profile !== 'default') return [] + if (source === 'api_server') return [apiServerSession] + return [] + }) + vi.mocked(getSessionDetailFromDbWithProfile).mockResolvedValueOnce({ + ...apiServerSession, + thread_session_count: 1, + messages: [ + { id: 1, session_id: apiServerSession.id, role: 'user', content: null, tool_call_id: null, tool_calls: null, tool_name: null, timestamp: 1001, token_count: 2, finish_reason: null, reasoning: null }, + ], + } as any) + + await syncAllHermesSessionsOnStartup() + + expect(sessionRow(apiServerSession.id)).toBeUndefined() + + vi.mocked(getSessionDetailFromDbWithProfile).mockResolvedValue(sessionDetails.get(apiServerSession.id) as any) + + await syncAllHermesSessionsOnStartup() + + expect(sessionRow(apiServerSession.id)).toMatchObject({ id: apiServerSession.id, source: 'api_server' }) + expect(messageCount(apiServerSession.id)).toBe(2) }) })