Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions backend/app/api/routes/session_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@
# In-memory storage for active sessions
active_sessions: Dict[str, Dict] = {}

@router.websocket("/ws/video")
async def websocket_video_endpoint(websocket: WebSocket):
"""WebSocket endpoint for real-time video processing - No auth required"""
await handle_websocket(websocket)

@router.post("/analyze")
async def analyze_video():
"""Endpoint for post-session analysis - No auth required"""
# Add your analysis logic here
return {"message": "Analysis complete"}

@router.post("/sessions/start")
async def start_session(current_user: User = Depends(auth_service.get_current_user)):
"""Start a new interview session"""
Expand Down
149 changes: 20 additions & 129 deletions backend/app/services/websocket_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,104 +2,22 @@
from typing import Dict, List
import json
import asyncio
import sys
import os
from datetime import datetime

sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from app.services.video_processor import VideoProcessor
from app.services.recording_storage import RecordingStorage
from app.services.post_processor import PostProcessor
from app.db.models.analysis_models import AnalysisStorage
from app.core.config import get_settings
import cv2
import numpy as np
import base64

settings = get_settings()
class InterviewSessionManager:
class VideoAnalysisManager:
def __init__(self):
self.active_sessions: Dict[str, WebSocket] = {}
self.video_processor = VideoProcessor()
self.recording_storage = RecordingStorage(
settings.MONGODB_URL,
settings.DATABASE_NAME
)
self.analysis_storage = AnalysisStorage(self.recording_storage.db)
self.post_processor = PostProcessor(
self.recording_storage,
self.analysis_storage
)

# Track recording IDs for each session
self.session_recordings: Dict[str, str] = {}

async def connect(self, websocket: WebSocket, session_id: str):
"""Initialize session and start recording"""
await websocket.accept()
self.active_sessions[session_id] = websocket

# Start new recording
recording_id = await self.recording_storage.start_recording(session_id)
self.session_recordings[session_id] = recording_id

# Send confirmation to client
await websocket.send_json({
"type": "session_started",
"session_id": session_id,
"recording_id": recording_id
})

async def disconnect(self, session_id: str):
"""Clean up session and trigger post-processing"""
if session_id in self.active_sessions:
# End recording if exists
if session_id in self.session_recordings:
recording_id = self.session_recordings[session_id]
try:
# End recording
await self.recording_storage.end_recording(recording_id)

# Start post-processing
analysis_id = await self.post_processor.process_recording(
recording_id,
session_id
)

# Send final analysis to client if still connected
websocket = self.active_sessions.get(session_id)
if websocket:
analysis = await self.analysis_storage.get_analysis(analysis_id)
if analysis:
await websocket.send_json({
"type": "analysis_complete",
"analysis_id": analysis_id,
"results": analysis.dict()
})

except Exception as e:
print(f"Error during session cleanup: {e}")

del self.session_recordings[session_id]

del self.active_sessions[session_id]

async def process_frame(self, frame_data: str, session_id: str):
"""Process and store video frame"""

async def process_frame(self, frame_data: str):
"""Process and analyze video frame"""
try:
# Store frame in recording
if session_id in self.session_recordings:
recording_id = self.session_recordings[session_id]
await self.recording_storage.store_chunk(
recording_id,
frame_data,
"video",
datetime.utcnow()
)

# Process frame for real-time feedback
encoded_data = frame_data.split(',')[1] if ',' in frame_data else frame_data
import base64
import numpy as np
import cv2

nparr = np.frombuffer(base64.b64decode(encoded_data), np.uint8)
frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
Expand All @@ -121,57 +39,31 @@ async def process_frame(self, frame_data: str, session_id: str):
"attention_status": "error",
"sentiment": "neutral"
}

async def process_audio(self, audio_data: str, session_id: str):
"""Store audio chunk"""
try:
if session_id in self.session_recordings:
recording_id = self.session_recordings[session_id]
await self.recording_storage.store_chunk(
recording_id,
audio_data,
"audio",
datetime.utcnow()
)
return {"status": "success"}
except Exception as e:
print(f"Error processing audio: {e}")
return {"status": "error"}

# Create a global session manager
session_manager = InterviewSessionManager()
# Create a global manager
analysis_manager = VideoAnalysisManager()

async def handle_websocket(websocket: WebSocket, session_id: str):
"""Main WebSocket handler"""
await session_manager.connect(websocket, session_id)
async def handle_websocket(websocket: WebSocket):
"""Main WebSocket handler - No session management"""
await websocket.accept()

try:
while True:
# Receive and process messages
data = await websocket.receive_json()
response = {"session_id": session_id}
response = {}

if data["type"] == "video":
feedback = await session_manager.process_frame(
data["frame"],
session_id
)
response["type"] = "video_feedback"
response["feedback"] = feedback

elif data["type"] == "audio":
result = await session_manager.process_audio(
data["audio"],
session_id
)
response["type"] = "audio_received"
response["status"] = result["status"]
feedback = await analysis_manager.process_frame(data["frame"])
response = {
"type": "video_feedback",
"feedback": feedback
}

await websocket.send_json(response)

except WebSocketDisconnect:
await session_manager.disconnect(session_id)

print("Client disconnected")
except Exception as e:
print(f"WebSocket error: {e}")
try:
Expand All @@ -180,5 +72,4 @@ async def handle_websocket(websocket: WebSocket, session_id: str):
"message": str(e)
})
except:
pass
await session_manager.disconnect(session_id)
pass
12 changes: 6 additions & 6 deletions backend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@

sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from app.api.routes import session_routes
from app.api.routes import auth_routes # Add this line
# Import routes with proper paths
from app.api.routes.session_routes import router as session_router
from app.api.routes.auth_routes import router as auth_router

app = FastAPI(
title="Intreview API",
Expand All @@ -19,20 +20,19 @@
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Update this with your frontend URL in production
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)

# Mount static files directory
static_dir = Path(__file__).parent / "app" / "static"
print(f"Static directory path: {static_dir}") # Add this line to debug
app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")

# Include routers
app.include_router(auth_routes.router, prefix="/auth", tags=["authentication"]) # Add this line
app.include_router(session_routes.router, prefix="/api", tags=["sessions"])
app.include_router(session_router, prefix="/api", tags=["sessions"]) # This contains both auth and non-auth routes
app.include_router(auth_router, prefix="/auth", tags=["authentication"])

@app.get("/")
async def root():
Expand Down