diff --git a/lib/auth/events.test.ts b/lib/auth/events.test.ts index 720dd40a..d5edc4eb 100644 --- a/lib/auth/events.test.ts +++ b/lib/auth/events.test.ts @@ -309,9 +309,9 @@ describe('Authentication Events', () => { expect.anything(), ) - // Should not start services without access token + // Should not set auth token without access token, but should start sync (for self-hosted users) expect(mockGrpcClient.setAuthToken).not.toHaveBeenCalled() - expect(mockSyncService.start).not.toHaveBeenCalled() + expect(mockSyncService.start).toHaveBeenCalled() }) test('should only store access token when ID token is null', () => { @@ -341,12 +341,13 @@ describe('Authentication Events', () => { expect(mockSyncService.start).toHaveBeenCalled() }) - test('should not setup services when no access token provided', () => { + test('should start sync service even without access token (for self-hosted)', () => { handleLogin(testProfile, 'id-token', null) - // Services should not be started + // gRPC auth should not be set without access token expect(mockGrpcClient.setAuthToken).not.toHaveBeenCalled() - expect(mockSyncService.start).not.toHaveBeenCalled() + // But sync service should start (for self-hosted users) + expect(mockSyncService.start).toHaveBeenCalled() }) test('should setup services in correct order when access token present', () => { diff --git a/lib/auth/events.ts b/lib/auth/events.ts index d7edbfc0..ead88b0e 100644 --- a/lib/auth/events.ts +++ b/lib/auth/events.ts @@ -317,10 +317,9 @@ export const handleLogin = ( if (accessToken) { mainStore.set(STORE_KEYS.ACCESS_TOKEN, accessToken) grpcClient.setAuthToken(accessToken) - syncService.start() } - // For self-hosted users, we don't start sync service since they don't have tokens + syncService.start() } export const handleLogout = () => { diff --git a/lib/clients/grpcClient.ts b/lib/clients/grpcClient.ts index 93ea5ea0..af0be0db 100644 --- a/lib/clients/grpcClient.ts +++ b/lib/clients/grpcClient.ts @@ -474,16 +474,6 @@ class GrpcClient { } async getAdvancedSettings(): Promise { - // Check if user is self-hosted and skip server sync - const userId = getCurrentUserId() - const isSelfHosted = userId === 'self-hosted' - - if (isSelfHosted) { - console.log('Self-hosted user detected, using local advanced settings') - // Return null for self-hosted users since they don't sync with server - return null - } - return this.withRetry(async () => { const request = create(GetAdvancedSettingsRequestSchema, {}) return await this.client.getAdvancedSettings(request, { diff --git a/lib/main/main.ts b/lib/main/main.ts index 12bf95aa..dbb8656d 100644 --- a/lib/main/main.ts +++ b/lib/main/main.ts @@ -29,7 +29,6 @@ import { initializeMicrophoneSelection } from '../media/microphoneSetUp' import { validateStoredTokens, ensureValidTokens } from '../auth/events' import { Auth0Config, validateAuth0Config } from '../auth/config' import { createAppTray } from './tray' -import { itoSessionManager } from './itoSessionManager' import { initializeAutoUpdater } from './autoUpdaterWrapper' import { teardown } from './teardown' import { ITO_ENV } from './env' @@ -84,10 +83,11 @@ app.whenReady().then(async () => { | undefined if (accessToken) { grpcClient.setAuthToken(accessToken) - syncService.start() } } + syncService.start() + // Setup protocol handling for deep links setupProtocolHandling() diff --git a/lib/main/syncService.test.ts b/lib/main/syncService.test.ts index 9f667c16..fb5309e4 100644 --- a/lib/main/syncService.test.ts +++ b/lib/main/syncService.test.ts @@ -421,14 +421,13 @@ describe('SyncService Integration Tests', () => { await syncService.start() - // Should NOT push anything (no lastSyncedAt means first sync) + // Should check for local changes to push (notes and dictionary items) expect(mockNotesTable.findModifiedSince).toHaveBeenCalledWith(epoch) - expect(mockInteractionsTable.findModifiedSince).toHaveBeenCalledWith( - epoch, - ) expect(mockDictionaryTable.findModifiedSince).toHaveBeenCalledWith(epoch) + // Interactions are created by server, so we don't push them + expect(mockInteractionsTable.findModifiedSince).not.toHaveBeenCalled() - // Should still pull everything + // Should pull everything from server expect(mockGrpcClient.listNotesSince).toHaveBeenCalledWith(epoch) expect(mockGrpcClient.listInteractionsSince).toHaveBeenCalledWith(epoch) expect(mockGrpcClient.listDictionaryItemsSince).toHaveBeenCalledWith( @@ -441,18 +440,17 @@ describe('SyncService Integration Tests', () => { await syncService.start() - // Should push (checking for modifications since last sync) + // Should push local changes (notes and dictionary items) expect(mockNotesTable.findModifiedSince).toHaveBeenCalledWith( '2024-01-01T00:00:00.000Z', ) - expect(mockInteractionsTable.findModifiedSince).toHaveBeenCalledWith( - '2024-01-01T00:00:00.000Z', - ) expect(mockDictionaryTable.findModifiedSince).toHaveBeenCalledWith( '2024-01-01T00:00:00.000Z', ) + // Interactions are created by server, so we don't push them + expect(mockInteractionsTable.findModifiedSince).not.toHaveBeenCalled() - // Should also pull + // Should pull updates from server expect(mockGrpcClient.listNotesSince).toHaveBeenCalledWith( '2024-01-01T00:00:00.000Z', ) diff --git a/lib/main/syncService.ts b/lib/main/syncService.ts index 678e174c..b8cb4f05 100644 --- a/lib/main/syncService.ts +++ b/lib/main/syncService.ts @@ -9,8 +9,6 @@ import { Note, Interaction, DictionaryItem } from './sqlite/models' import mainStore from './store' import { STORE_KEYS } from '../constants/store-keys' import type { AdvancedSettings } from './store' -import { DEFAULT_ADVANCED_SETTINGS } from '../constants/generated-defaults.js' -import { main } from 'bun' import { mainWindow } from './app' const LAST_SYNCED_AT_KEY = 'lastSyncedAt' @@ -91,7 +89,6 @@ export class SyncService { // ================================================================= let processedChanges = 0 processedChanges += await this.pushNotes(lastSyncedAt) - processedChanges += await this.pushInteractions(lastSyncedAt) processedChanges += await this.pushDictionaryItems(lastSyncedAt) // ================================================================= @@ -138,27 +135,6 @@ export class SyncService { return modifiedNotes.length } - private async pushInteractions(lastSyncedAt: string): Promise { - const modifiedInteractions = - await InteractionsTable.findModifiedSince(lastSyncedAt) - if (modifiedInteractions.length > 0) { - for (const interaction of modifiedInteractions) { - try { - if (new Date(interaction.created_at) > new Date(lastSyncedAt)) { - await grpcClient.createInteraction(interaction) - } else if (interaction.deleted_at) { - await grpcClient.deleteInteraction(interaction) - } else { - await grpcClient.updateInteraction(interaction) - } - } catch (e) { - console.error(`Failed to push interaction ${interaction.id}:`, e) - } - } - } - return modifiedInteractions.length - } - private async pushDictionaryItems(lastSyncedAt: string): Promise { const modifiedItems = await DictionaryTable.findModifiedSince(lastSyncedAt) if (modifiedItems.length > 0) { @@ -240,7 +216,7 @@ export class SyncService { created_at: remoteInteraction.createdAt, updated_at: remoteInteraction.updatedAt, deleted_at: remoteInteraction.deletedAt || null, - raw_audio_id: remoteInteraction.rawAudioId, + raw_audio_id: remoteInteraction.rawAudioId || null, sample_rate: null, } await InteractionsTable.upsert(localInteraction) @@ -277,7 +253,6 @@ export class SyncService { // Get remote advanced settings const remoteSettings = await grpcClient.getAdvancedSettings() if (!remoteSettings) { - console.warn('No remote advanced settings found, skipping sync.') return } diff --git a/server/.env.example b/server/.env.example index cf69f956..6d52dc8b 100644 --- a/server/.env.example +++ b/server/.env.example @@ -21,7 +21,7 @@ CEREBRAS_API_KEY="your_cerebras_api_key" BLOB_STORAGE_BUCKET=ito-blob-storage TIMING_BUCKET=ito-timing-storage AWS_REGION=us-east-1 -S3_ENDPOINT=http://localhost:9000 +S3_ENDPOINT=http://minio:9000 S3_ACCESS_KEY_ID=minioadmin S3_SECRET_ACCESS_KEY=minioadmin S3_FORCE_PATH_STYLE=true diff --git a/server/docker-compose.yml b/server/docker-compose.yml index 30f8fe11..01d76f94 100644 --- a/server/docker-compose.yml +++ b/server/docker-compose.yml @@ -12,7 +12,10 @@ services: container_name: ito-server restart: always depends_on: - - db + db: + condition: service_started + createbuckets: + condition: service_completed_successfully environment: NODE_ENV: development @@ -44,9 +47,31 @@ services: - minio_data:/data healthcheck: test: ['CMD', 'mc', 'ready', 'local'] - interval: 30s - timeout: 20s - retries: 3 + interval: 5s + timeout: 5s + retries: 5 + + createbuckets: + image: minio/mc:latest + container_name: ito-minio-init + depends_on: + minio: + condition: service_healthy + entrypoint: > + /bin/sh -c " + /usr/bin/mc alias set myminio http://minio:9000 $${S3_ACCESS_KEY_ID} $${S3_SECRET_ACCESS_KEY}; + /usr/bin/mc mb myminio/$${BLOB_STORAGE_BUCKET} --ignore-existing; + /usr/bin/mc mb myminio/$${TIMING_BUCKET} --ignore-existing; + /usr/bin/mc anonymous set download myminio/$${BLOB_STORAGE_BUCKET}; + /usr/bin/mc anonymous set download myminio/$${TIMING_BUCKET}; + echo 'Buckets created successfully'; + exit 0; + " + environment: + S3_ACCESS_KEY_ID: '$S3_ACCESS_KEY_ID' + S3_SECRET_ACCESS_KEY: '$S3_SECRET_ACCESS_KEY' + BLOB_STORAGE_BUCKET: '$BLOB_STORAGE_BUCKET' + TIMING_BUCKET: '$TIMING_BUCKET' volumes: pgdata: diff --git a/server/src/auth/authHelpers.ts b/server/src/auth/authHelpers.ts new file mode 100644 index 00000000..c7e189db --- /dev/null +++ b/server/src/auth/authHelpers.ts @@ -0,0 +1,30 @@ +import type { FastifyRequest } from 'fastify' + +/** + * Extracts the user ID from a Fastify request. + * In production (requireAuth=true), returns the authenticated user's sub from JWT. + * In dev mode (requireAuth=false), returns 'self-hosted' as a default user ID. + * + * @param request - The Fastify request object + * @param requireAuth - Whether authentication is required (from REQUIRE_AUTH env var) + * @returns The user ID, or undefined if not authenticated and auth is required + */ +export function getUserIdFromRequest( + request: FastifyRequest, + requireAuth: boolean, +): string | undefined { + // Try to get authenticated user ID from JWT + const authenticatedUserId = (request as any).user?.sub + + if (authenticatedUserId) { + return authenticatedUserId + } + + // In dev mode (auth disabled), use 'self-hosted' as default user + if (!requireAuth) { + return 'self-hosted' + } + + // Auth required but no user found + return undefined +} diff --git a/server/src/db/repo.ts b/server/src/db/repo.ts index c8bb1382..57c39f53 100644 --- a/server/src/db/repo.ts +++ b/server/src/db/repo.ts @@ -11,7 +11,6 @@ import { import { CreateNoteRequest, UpdateNoteRequest, - CreateInteractionRequest, UpdateInteractionRequest, CreateDictionaryItemRequest, UpdateDictionaryItemRequest, @@ -98,12 +97,15 @@ export class NotesRepository { } export class InteractionsRepository { - static async create( - interactionData: Omit & { - userId: string - rawAudioId?: string - }, - ): Promise { + static async create(interactionData: { + id: string + userId: string + title: string + asrOutput: string + llmOutput: string | null + rawAudioId?: string + durationMs: number + }): Promise { const res = await pool.query( `INSERT INTO interactions (id, user_id, title, asr_output, llm_output, raw_audio_id, duration_ms) VALUES ($1, $2, $3, $4, $5, $6, $7) @@ -337,7 +339,6 @@ export class AdvancedSettingsRepository { ) const llmSettings = res.rows[0] - console.log('Upserted advanced settings:', llmSettings) return { id: llmSettings.id, user_id: llmSettings.user_id, diff --git a/server/src/server.ts b/server/src/server.ts index c1b07df6..fbb9391d 100644 --- a/server/src/server.ts +++ b/server/src/server.ts @@ -170,6 +170,10 @@ export const startServer = async () => { if (REQUIRE_AUTH && request.user && request.user.sub) { return createContextValues().set(kUser, request.user) } + // In dev mode (auth disabled), use a self-hosted user + if (!REQUIRE_AUTH) { + return createContextValues().set(kUser, { sub: 'self-hosted' }) + } return createContextValues() }, }) diff --git a/server/src/services/billing.ts b/server/src/services/billing.ts index dc579283..a3f18e95 100644 --- a/server/src/services/billing.ts +++ b/server/src/services/billing.ts @@ -5,6 +5,7 @@ import { getAuth0ManagementToken, getUserInfoFromAuth0, } from '../auth/auth0Helpers.js' +import { getUserIdFromRequest } from '../auth/authHelpers.js' type Options = { requireAuth: boolean @@ -40,7 +41,7 @@ export const registerBillingRoutes = async ( fastify.post('/billing/checkout', async (request, reply) => { console.log('billing/checkout', request.body) try { - const userSub = (requireAuth && (request as any).user?.sub) || undefined + const userSub = getUserIdFromRequest(request, requireAuth) if (!userSub) { reply.code(401).send({ success: false, error: 'Unauthorized' }) return @@ -79,7 +80,7 @@ export const registerBillingRoutes = async ( fastify.post('/billing/confirm', async (request, reply) => { try { - const userSub = (requireAuth && (request as any).user?.sub) || undefined + const userSub = getUserIdFromRequest(request, requireAuth) if (!userSub) { reply.code(401).send({ success: false, error: 'Unauthorized' }) return @@ -175,7 +176,7 @@ export const registerBillingRoutes = async ( fastify.post('/billing/cancel', async (request, reply) => { try { - const userSub = (requireAuth && (request as any).user?.sub) || undefined + const userSub = getUserIdFromRequest(request, requireAuth) if (!userSub) { reply.code(401).send({ success: false, error: 'Unauthorized' }) return @@ -236,7 +237,7 @@ export const registerBillingRoutes = async ( fastify.get('/billing/status', async (request, reply) => { try { - const userSub = (requireAuth && (request as any).user?.sub) || undefined + const userSub = getUserIdFromRequest(request, requireAuth) if (!userSub) { reply.code(401).send({ success: false, error: 'Unauthorized' }) return @@ -323,7 +324,7 @@ export const registerBillingRoutes = async ( fastify.post('/billing/reactivate', async (request, reply) => { try { - const userSub = (requireAuth && (request as any).user?.sub) || undefined + const userSub = getUserIdFromRequest(request, requireAuth) if (!userSub) { reply.code(401).send({ success: false, error: 'Unauthorized' }) return diff --git a/server/src/services/ito/interactionHelpers.ts b/server/src/services/ito/interactionHelpers.ts new file mode 100644 index 00000000..12d9d799 --- /dev/null +++ b/server/src/services/ito/interactionHelpers.ts @@ -0,0 +1,70 @@ +import { v4 as uuidv4 } from 'uuid' +import { getStorageClient } from '../../clients/s3storageClient.js' +import { createAudioKey } from '../../constants/storage.js' +import { InteractionsRepository } from '../../db/repo.js' +import type { Interaction } from '../../db/models.js' + +export interface CreateInteractionParams { + id: string + userId: string + title: string + asrOutput: string + llmOutput: string | null + durationMs: number + rawAudio?: Buffer +} + +/** + * Creates an interaction in the database, optionally uploading audio to S3. + * This helper is shared between the gRPC createInteraction endpoint and + * the transcribeStreamV2Handler. + */ +export async function createInteractionWithAudio( + params: CreateInteractionParams, +): Promise { + const { id, userId, title, asrOutput, llmOutput, durationMs, rawAudio } = + params + + let rawAudioId: string | undefined + + // If raw audio is provided, upload to S3 + if (rawAudio && rawAudio.length > 0) { + try { + const storageClient = getStorageClient() + rawAudioId = uuidv4() + const audioKey = createAudioKey(userId, rawAudioId) + + await storageClient.uploadObject( + audioKey, + rawAudio, + undefined, // ContentType + { + userId, + interactionId: id, + timestamp: new Date().toISOString(), + }, + ) + + console.log( + `✅ [${new Date().toISOString()}] Uploaded audio to S3: ${audioKey}`, + ) + } catch (error) { + console.error('Failed to upload raw audio to S3:', error) + } + } + + // Create interaction in database + const interaction = await InteractionsRepository.create({ + id, + userId, + title, + asrOutput, + llmOutput, + rawAudioId, + durationMs, + }) + + console.log(`✅ [${new Date().toISOString()}] Created interaction: ${id}`) + + return interaction +} diff --git a/server/src/services/ito/itoService.ts b/server/src/services/ito/itoService.ts index 3034fab5..96d02a30 100644 --- a/server/src/services/ito/itoService.ts +++ b/server/src/services/ito/itoService.ts @@ -16,8 +16,8 @@ import { import { create } from '@bufbuild/protobuf' import type { HandlerContext } from '@connectrpc/connect' import { getStorageClient } from '../../clients/s3storageClient.js' -import { v4 as uuidv4 } from 'uuid' import { createAudioKey } from '../../constants/storage.js' +import { createInteractionWithAudio } from './interactionHelpers.js' import { DictionaryRepository, InteractionsRepository, @@ -194,51 +194,28 @@ export default (router: ConnectRouter) => { throw new ConnectError('User not authenticated', Code.Unauthenticated) } - let rawAudioId: string | undefined - - // If raw audio is provided, upload to S3 - if (request.rawAudio && request.rawAudio.length > 0) { - try { - const storageClient = getStorageClient() - rawAudioId = uuidv4() - const audioKey = createAudioKey(userId, rawAudioId) - - // Upload audio to S3 - await storageClient.uploadObject( - audioKey, - Buffer.from(request.rawAudio), - undefined, // ContentType - { - userId, - interactionId: request.id, - timestamp: new Date().toISOString(), - }, - ) - - // Create interaction with UUID reference instead of blob - const interactionRequest = { - ...request, - userId, - rawAudioId, - rawAudio: undefined, // Don't store the blob in DB - } - const newInteraction = - await InteractionsRepository.create(interactionRequest) - return dbToInteractionPb(newInteraction) - } catch (error) { - console.error('Failed to upload audio to S3:', error) + try { + // Convert raw audio from Uint8Array to Buffer if provided + const rawAudio = + request.rawAudio && request.rawAudio.length > 0 + ? Buffer.from(request.rawAudio) + : undefined + + // Use shared helper to create interaction and upload audio + const newInteraction = await createInteractionWithAudio({ + id: request.id, + userId, + title: request.title, + asrOutput: request.asrOutput, + llmOutput: request.llmOutput, + durationMs: request.durationMs, + rawAudio, + }) - throw new ConnectError( - 'Failed to store interaction audio', - Code.Internal, - ) - } - } else { - // No audio provided - const interactionRequest = { ...request, userId } - const newInteraction = - await InteractionsRepository.create(interactionRequest) return dbToInteractionPb(newInteraction) + } catch (error) { + console.error('Failed to create interaction:', error) + throw new ConnectError('Failed to store interaction', Code.Internal) } }, diff --git a/server/src/services/ito/transcribeStreamV2Handler.ts b/server/src/services/ito/transcribeStreamV2Handler.ts index 8888d038..988ac1c0 100644 --- a/server/src/services/ito/transcribeStreamV2Handler.ts +++ b/server/src/services/ito/transcribeStreamV2Handler.ts @@ -29,6 +29,8 @@ import { ServerTimingEventName, } from '../timing/ServerTimingCollector.js' import { kUser } from '../../auth/userContext.js' +import { createInteractionWithAudio } from './interactionHelpers.js' +import { v4 as uuidv4 } from 'uuid' export class TranscribeStreamV2Handler { private readonly MODE_CHANGE_GRACE_PERIOD_MS = 100 @@ -123,6 +125,9 @@ export class TranscribeStreamV2Handler { asrConfig.noSpeechThreshold, ) + // Store original ASR transcript before adjustment + const originalTranscript = transcript + // Time transcript adjustment (only happens in EDIT mode) // transcript = await serverTimingCollector.timeAsync( // ServerTimingEventName.LLM_ADJUSTMENT, @@ -144,6 +149,57 @@ export class TranscribeStreamV2Handler { const duration = Date.now() - startTime + // Create interaction in database if we have a user ID + if (!userId) { + console.error( + `❌ [${new Date().toISOString()}] Cannot create interaction: userId is missing. This should not happen - check server authentication configuration.`, + ) + } else { + try { + // Generate interaction ID if not provided by client + const finalInteractionId = interactionId || uuidv4() + + // Generate a meaningful title from the transcript + const displayTranscript = + mode === ItoMode.EDIT ? transcript : originalTranscript + const title = + displayTranscript && displayTranscript.length > 50 + ? displayTranscript.substring(0, 50) + '...' + : displayTranscript || 'Voice interaction' + + // Create ASR output object + const asrOutput = JSON.stringify({ + transcript: originalTranscript, + timestamp: new Date().toISOString(), + durationMs: duration, + }) + + // Create LLM output object (only if transcript was adjusted in EDIT mode) + const llmOutput = + mode === ItoMode.EDIT && transcript !== originalTranscript + ? JSON.stringify({ + adjustedTranscript: transcript, + mode: 'EDIT', + timestamp: new Date().toISOString(), + }) + : null + + // Use shared helper to create interaction and upload audio + await createInteractionWithAudio({ + id: finalInteractionId, + userId, + title, + asrOutput, + llmOutput, + durationMs: duration, + rawAudio: fullAudioWAV, + }) + } catch (error) { + console.error('Failed to create interaction:', error) + // Don't throw - we don't want to fail the transcription if interaction creation fails + } + } + // Finalize timing serverTimingCollector.endTiming( ServerTimingEventName.TOTAL_PROCESSING, diff --git a/server/src/services/logging.ts b/server/src/services/logging.ts index 0cbf51dc..4f596bf3 100644 --- a/server/src/services/logging.ts +++ b/server/src/services/logging.ts @@ -1,5 +1,6 @@ import type { FastifyInstance } from 'fastify' import { CloudWatchLogger } from './cloudWatchLogger.js' +import { getUserIdFromRequest } from '../auth/authHelpers.js' type LogEvent = { ts: number @@ -36,7 +37,7 @@ export const registerLoggingRoutes = async ( } const events = body.events - const userSub = (requireAuth && (request as any).user?.sub) || undefined + const userSub = getUserIdFromRequest(request, requireAuth) const now = Date.now() const entries = events.map(e => { diff --git a/server/src/services/trial.ts b/server/src/services/trial.ts index 842bfae2..2e046325 100644 --- a/server/src/services/trial.ts +++ b/server/src/services/trial.ts @@ -5,6 +5,7 @@ import { getAuth0ManagementToken, getUserInfoFromAuth0, } from '../auth/auth0Helpers.js' +import { getUserIdFromRequest } from '../auth/authHelpers.js' const TRIAL_DAYS = 14 const MS_PER_DAY = 24 * 60 * 60 * 1000 @@ -144,7 +145,7 @@ export const registerTrialRoutes = async ( fastify.post('/trial/start', async (request, reply) => { console.log('trial/start', request.body) try { - const userSub = (requireAuth && (request as any).user?.sub) || undefined + const userSub = getUserIdFromRequest(request, requireAuth) if (!userSub) { reply.code(401).send({ success: false, error: 'Unauthorized' }) return @@ -276,7 +277,7 @@ export const registerTrialRoutes = async ( fastify.post('/trial/complete', async (request, reply) => { try { - const userSub = (requireAuth && (request as any).user?.sub) || undefined + const userSub = getUserIdFromRequest(request, requireAuth) if (!userSub) { reply.code(401).send({ success: false, error: 'Unauthorized' }) return