diff --git a/docs/patient_context.md b/docs/patient_context.md new file mode 100644 index 0000000..10ffaa3 --- /dev/null +++ b/docs/patient_context.md @@ -0,0 +1,272 @@ +# Patient Context Management + +The Healthcare Agent Orchestrator uses an ephemeral, registry‑backed model to maintain isolated conversational state per patient inside a single conversation. This document explains the current implementation, how patient IDs are detected and validated, and how the system persists, restores, and clears patient context safely. + +> [!IMPORTANT] +> `PATIENT_CONTEXT_JSON` system snapshot messages are ephemeral. They are injected each turn and never persisted. The registry is the single source of truth for the active patient and roster. + +## Core Objectives + +| Objective | Mechanism | +|-----------|-----------| +| Patient isolation | Separate per‑patient history files (`patient_{id}_context.json`) | +| Multi‑patient roster | Central registry file (`patient_context_registry.json`) | +| Ephemeral grounding | Fresh `PATIENT_CONTEXT_JSON` snapshot every turn (index 0) | +| Low‑noise storage | Snapshots stripped before persistence | +| Safe switching & activation | LLM analyzer + service validation + kernel reset on change | +| Complete clear/reset | Archives session, all patient histories, and registry in timestamped folder | + +## High‑Level Turn Flow + +1. Load session `ChatContext` (no patient file yet). +2. Check for clear command (archive + reset if present). +3. Run `PatientContextService.decide_and_apply()`: + - Hydrate registry into `chat_ctx.patient_contexts`. + - Attempt silent restore if no active patient. + - Invoke analyzer (unless short-message heuristic skip). + - Apply decision (activate / switch / clear / none). +4. If patient active: load that patient’s stored history into memory. +5. Strip any previous `PATIENT_CONTEXT_JSON` system snapshot(s). +6. Inject a new snapshot (ephemeral). +7. Append user message. +8. Run multi-agent orchestration (selection + termination). +9. (Teams) Append single guarded `PT_CTX` audit footer. +10. Persist updated history (patient-specific if active else session). +11. Registry already reflects new active pointer (if changed). + +> [!NOTE] +> Only one snapshot should exist in memory at any time. The system enforces this by stripping before reinjection. + +## Decision Engine (PatientContextAnalyzer) + +Structured LLM classifier producing `PatientContextDecision (action, patient_id?, reasoning)`. + +| Action | Meaning | +|--------|---------| +| `NONE` | General / meta turn (no context change) | +| `ACTIVATE_NEW` | Activate a new patient (ID extracted) | +| `SWITCH_EXISTING` | Switch to known patient | +| `UNCHANGED` | Keep current active patient | +| `CLEAR` | User intent to wipe contexts | +| (Service) `RESTORED_FROM_STORAGE` | Silent revival of last active from registry | +| (Service) `NEEDS_PATIENT_ID` | User intended change but no valid ID provided | + +Service may reinterpret `ACTIVATE_NEW` as `NEW_BLANK` (new record). + +### Patient ID Detection + +| Stage | Logic | +|-------|-------| +| Heuristic Skip | Short (≤ 15 chars) and no `patient|clear|switch` → bypass analyzer | +| LLM Extraction | Analyzer only returns `patient_id` for `ACTIVATE_NEW` / `SWITCH_EXISTING` | +| Regex Validation | Must match `^patient_[0-9]+$` (`PATIENT_ID_PATTERN`) | +| New vs Existing | In registry → switch; not in registry → new blank context | +| Invalid / Missing | Activation intent without valid pattern → `NEEDS_PATIENT_ID` | +| Silent Restore | Action `NONE` + no active + registry has prior active → restore | +| Isolation Reset | Patient change triggers `analyzer.reset_kernel()` | + +**Examples** + +| User Input | Analyzer Action | Service Decision | Notes | +|------------|-----------------|------------------|-------| +| `start review for patient_4` | `ACTIVATE_NEW` | `NEW_BLANK` | New patient | +| `switch to patient_4` | `SWITCH_EXISTING` | `SWITCH_EXISTING` | Already known | +| `patient_4` | `SWITCH_EXISTING` | `SWITCH_EXISTING` | Minimal intent | +| `switch patient please` | `ACTIVATE_NEW` | `NEEDS_PATIENT_ID` | Missing ID | +| `clear patient` | `CLEAR` | `CLEAR` | Full reset | +| `ok` | (Skipped) | `UNCHANGED` or restore | Too short for analysis | + +> [!TIP] +> To support additional formats (e.g., MRN), update `PATIENT_ID_PATTERN` and adjust the analyzer prompt description. + +### Customizing Patient ID Format + +The system validates patient IDs using a configurable regex pattern. + +**Default Pattern:** `^patient_[0-9]+$` (e.g., `patient_4`, `patient_123`) + +**To Use a Different Format:** + +Set the `PATIENT_ID_PATTERN` environment variable before starting the application: + +```bash +# Example: Accept MRN format +export PATIENT_ID_PATTERN="^mrn-[A-Z0-9]{6}$" + +# Example: Accept multiple formats (either patient_N or mrn-XXXXXX) +export PATIENT_ID_PATTERN="^(patient_[0-9]+|mrn-[A-Z0-9]{6})$" + +# Then start the app +python src/app.py +``` + +**Important:** When changing the pattern, ensure the analyzer prompt in `patient_context_analyzer.py` reflects the new format so the LLM extracts IDs correctly. + +## Registry (Source of Truth) + +`patient_context_registry.json` stores: +- `active_patient_id` +- `patient_registry` map of patient entries: + - `patient_id` + - `facts` (lightweight dict, extensible) + - `conversation_id` + - timestamps + +No system snapshots or timing entries are stored here. + +## Storage Layout + +``` +{conversation_id}/ +├── session_context.json +├── patient_{patient_id}_context.json +├── patient_context_registry.json +└── archive/ + └── {timestamp}/ + ├── {conversation_id}/ + │ ├── {timestamp}_session_archived.json + │ ├── {timestamp}_patient_patient_4_archived.json + │ └── {timestamp}_patient_patient_15_archived.json + └── {timestamp}_patient_context_registry_archived.json +``` + +> [!NOTE] +> Only dialogue and display/output arrays are persisted—never ephemeral snapshots. + +## Ephemeral Snapshot + +Format (in memory only, first message): + +```text +PATIENT_CONTEXT_JSON: {"conversation_id":"uuid","patient_id":"patient_16","all_patient_ids":["patient_4","patient_15","patient_16"],"generated_at":"2025-09-30T16:32:11.019Z"} +``` + +## Runtime Data Model + +```python +ChatContext: + conversation_id: str + patient_id: Optional[str] + patient_contexts: Dict[str, PatientContext] + chat_history: ChatHistory +``` + +Hydration each turn: + +```python +await patient_context_service._ensure_patient_contexts_from_registry(chat_ctx) +``` + +## Isolation & Transitions + +| Operation | Result | +|-----------|--------| +| New patient | Kernel reset + new context file | +| Switch patient | Kernel reset + load patient history | +| Clear | Archive all + wipe memory state | +| Restore | Silent reactivation from registry pointer | +| General turn | Session-only if no active patient | + +## Short-Message Heuristic + +Skip analyzer when: +- Length ≤ 15 +- No key substrings (`patient`, `clear`, `switch`) + +Outcomes: +- Active patient → `UNCHANGED` +- None → attempt restore → `RESTORED_FROM_STORAGE` or `NONE` + +## PatientContextService Responsibilities + +- Hydrate registry → memory each invocation. +- Attempt restoration if no active. +- Run analyzer (unless skipped). +- Apply decision + side effects: + - Activation / switch → registry update, optional kernel reset + - Clear → archive + wipe +- Return `(decision, TimingInfo)`. +- Never inject snapshot (caller handles ephemeral injection). + +Decision union: +``` +"NONE" | "UNCHANGED" | "NEW_BLANK" | "SWITCH_EXISTING" | +"CLEAR" | "RESTORED_FROM_STORAGE" | "NEEDS_PATIENT_ID" +``` + + +## Example Turn (Persisted vs In-Memory) + +In memory: + +``` +[System] PATIENT_CONTEXT_JSON: {...} +[User] Start review for patient_4 +[Assistant:Orchestrator] Plan... +``` + +Persisted (`patient_4_context.json`): + +```json +{ + "conversation_id": "c123", + "patient_id": "patient_4", + "chat_history": [ + {"role": "user", "content": "Start review for patient_4"}, + {"role": "assistant", "name": "Orchestrator", "content": "Plan..."} + ] +} +``` + +Snapshot intentionally absent. + +## Clear Operation + +Triggers on: +``` +clear | clear patient | clear context | clear patient context +``` + +Procedure: +1. Archive (session, each patient file, registry). +2. Reset in-memory context + histories. +3. Persist empty session context. +4. Respond with confirmation. + +## Roster & Meta Queries + +Agents derive: +- Active patient → `patient_id` +- Roster → `all_patient_ids` (sorted) + +Rules: +- No hallucinated IDs. +- Avoid redundant re-planning for same active patient mention. + +## Code Reference (Filtering & Injection) + +```python +# Strip prior snapshot(s) +chat_ctx.chat_history.messages = [ + m for m in chat_ctx.chat_history.messages + if not ( + m.role == AuthorRole.SYSTEM + and getattr(m, "items", None) + and m.items + and getattr(m.items[0], "text", "").startswith(PATIENT_CONTEXT_PREFIX) + ) +] + +snapshot = { + "conversation_id": chat_ctx.conversation_id, + "patient_id": chat_ctx.patient_id, + "all_patient_ids": sorted(chat_ctx.patient_contexts.keys()), + "generated_at": datetime.utcnow().isoformat() + "Z", +} + +line = f"{PATIENT_CONTEXT_PREFIX}: {json.dumps(snapshot, separators=(',', ':'))}" +sys_msg = ChatMessageContent(role=AuthorRole.SYSTEM, items=[TextContent(text=line)]) +chat_ctx.chat_history.messages.insert(0, sys_msg) +``` + + diff --git a/docs/patient_context_comprehensive.md b/docs/patient_context_comprehensive.md new file mode 100644 index 0000000..94a41d6 --- /dev/null +++ b/docs/patient_context_comprehensive.md @@ -0,0 +1,971 @@ +# Patient Context Management - Technical Guide + +This document explains how the Healthcare Agent Orchestrator now handles **multiple patients in a single conversation** using a registry-backed architecture with ephemeral snapshots. + +--- + +## Table of Contents + +- [What Changed and Why](#what-changed-and-why) +- [How It Works Now](#how-it-works-now) +- [New Components](#new-components) +- [Modified Components](#modified-components) +- [Step-by-Step Turn Flow](#step-by-step-turn-flow) +- [Configuration](#configuration) + +--- + +## What Changed and Why + +### The Problem + +**Before**, the system could only handle **one context per conversation**: + +``` +❌ All messages in one file (chat_context.json) +❌ No way to switch between patients +❌ Agents had no idea which patient they were discussing +❌ "Clear" just archived one file +``` + +**Example of the problem:** +``` +User: "Review patient_4's labs" +[Agent responds about patient_4] +User: "Now check patient_15's imaging" +[Agent gets confused - both patients' messages mixed together] +``` + +### The Solution + +**Now**, the system supports **multiple patients with isolated histories**: + +``` +✅ Each patient gets their own history file +✅ Registry tracks which patient is currently active +✅ Agents see a "snapshot" showing current patient context +✅ Switch between patients seamlessly +✅ Clear archives everything properly +``` + +**How it works now:** +``` +User: "Review patient_4's labs" +[System activates patient_4, creates patient_4_context.json] +[Agent sees snapshot: "You're working on patient_4"] + +User: "Now check patient_15's imaging" +[System switches to patient_15, creates patient_15_context.json] +[Agent sees new snapshot: "You're now working on patient_15"] +[patient_4's history is safely stored and separate] +``` + +### Quick Comparison + +| Feature | Before | After | +|---------|--------|-------| +| **Storage** | 1 file for everything | Separate file per patient + registry | +| **Patient Switching** | Not supported | Automatic detection and switching | +| **Agent Awareness** | No idea about patient context | Fresh snapshot each turn | +| **Clear Command** | Archives 1 file | Archives all patient files + registry | +| **Patient Detection** | Manual/hardcoded | LLM automatically detects intent | + +--- + +## How It Works Now + +### Architecture Overview + +``` +User Message + ↓ +┌───────────────────────────────────────┐ +│ 1. Load Registry │ ← "Which patient is active?" +│ (patient_context_registry.json) │ +└─────────────┬─────────────────────────┘ + ↓ +┌───────────────────────────────────────┐ +│ 2. Analyze User Intent │ ← LLM determines what user wants +│ (PatientContextAnalyzer) │ "Review patient_4" = NEW +│ │ "Switch to patient_15" = SWITCH +│ │ "What's the diagnosis?" = UNCHANGED +└─────────────┬─────────────────────────┘ + ↓ +┌───────────────────────────────────────┐ +│ 3. Apply Decision │ ← Update registry, load history +│ (PatientContextService) │ +└─────────────┬─────────────────────────┘ + ↓ +┌───────────────────────────────────────┐ +│ 4. Load Patient-Specific History │ ← Get isolated history +│ (patient_4_context.json) │ +└─────────────┬─────────────────────────┘ + ↓ +┌───────────────────────────────────────┐ +│ 5. Inject Fresh Snapshot │ ← Add system message +│ "PATIENT_CONTEXT_JSON: {...}" │ (agents see this) +│ (EPHEMERAL - never saved) │ +└─────────────┬─────────────────────────┘ + ↓ +┌───────────────────────────────────────┐ +│ 6. Group Chat Orchestration │ ← Agents process with context +│ (Agents see snapshot + history) │ +└─────────────┬─────────────────────────┘ + ↓ +┌───────────────────────────────────────┐ +│ 7. Save History │ ← Snapshot is filtered out +│ (only real messages saved) │ +└───────────────────────────────────────┘ +``` + +### Storage Structure + +``` +conversation_abc123/ +├── session_context.json ← Messages before any patient mentioned +├── patient_patient_4_context.json ← patient_4's isolated history +├── patient_patient_15_context.json ← patient_15's isolated history +└── patient_context_registry.json ← SOURCE OF TRUTH + { + "active_patient_id": "patient_4", + "patient_registry": { + "patient_4": { "created_at": "...", "updated_at": "..." }, + "patient_15": { "created_at": "...", "updated_at": "..." } + } + } +``` + +--- + +## New Components + +### 1. PatientContextAnalyzer + +**File:** `src/services/patient_context_analyzer.py` + +**What it does:** Uses an LLM to automatically detect what the user wants to do with patient context. + +**Before:** +```python +# No automatic detection - had to manually parse or hardcode +if "patient" in message: + # Do something... but what? +``` + +**After:** +```python +# LLM analyzes the message and returns structured decision +decision = await analyzer.analyze_patient_context( + user_text="review patient_4", + prior_patient_id=None, + known_patient_ids=[] +) +# Returns: PatientContextDecision( +# action="ACTIVATE_NEW", +# patient_id="patient_4", +# reasoning="User explicitly requests patient_4" +# ) +``` + +**Examples:** + +| User Input | Current Patient | Decision | Explanation | +|------------|----------------|----------|-------------| +| `"review patient_4"` | None | `ACTIVATE_NEW` | Start working on patient_4 | +| `"switch to patient_15"` | patient_4 | `SWITCH_EXISTING` | Change to patient_15 | +| `"what's the diagnosis?"` | patient_4 | `UNCHANGED` | Continue with patient_4 | +| `"clear patient"` | patient_4 | `CLEAR` | Reset everything | + +**Key Features:** +- ✅ Automatic detection (no manual parsing) +- ✅ Considers current state +- ✅ Structured output (reliable format) +- ✅ Efficiency: skips LLM for short messages like "ok" or "yes" + +--- + +### 2. PatientContextService + +**File:** `src/services/patient_context_service.py` + +**What it does:** Orchestrates the entire patient context lifecycle - deciding what to do and making it happen. + +**Before:** +```python +# Logic was scattered across multiple files +# No central place handling patient context +``` + +**After:** +```python +# One method handles everything +decision, timing = await service.decide_and_apply( + user_text="switch to patient_15", + chat_ctx=chat_context +) +# Service handles: +# - Loading registry +# - Calling analyzer +# - Validating decision +# - Updating registry +# - Resetting kernel (if switching) +``` + +**Decision Flow:** + +``` +User says: "switch to patient_15" + ↓ +1. Load registry → "patient_4 is active" + ↓ +2. Ask analyzer → "SWITCH_EXISTING to patient_15" + ↓ +3. Validate → "patient_15 matches pattern, exists in registry" + ↓ +4. Apply: + - Update registry: active = patient_15 + - Reset kernel (prevents cross-contamination) + - Return decision: "SWITCH_EXISTING" +``` + +**Service Decisions:** + +| Decision | Meaning | +|----------|---------| +| `NONE` | No patient context needed | +| `UNCHANGED` | Keep current patient active | +| `NEW_BLANK` | Activate a new patient (first time) | +| `SWITCH_EXISTING` | Switch to a known patient | +| `CLEAR` | Archive everything and reset | +| `RESTORED_FROM_STORAGE` | Silently reactivated from registry | +| `NEEDS_PATIENT_ID` | User intent unclear, need valid ID | + +--- + +### 3. PatientContextRegistry Accessor + +**File:** `src/data_models/patient_context_registry_accessor.py` + +**What it does:** Manages the **source of truth** file that tracks which patient is active and which patients exist. + +**Registry File Structure:** + +```json +{ + "active_patient_id": "patient_4", + "patient_registry": { + "patient_4": { + "patient_id": "patient_4", + "facts": {}, + "created_at": "2025-09-30T16:30:00.000Z", + "updated_at": "2025-09-30T16:45:00.000Z" + }, + "patient_15": { + "patient_id": "patient_15", + "facts": {}, + "created_at": "2025-09-30T16:32:00.000Z", + "updated_at": "2025-09-30T16:40:00.000Z" + } + } +} +``` + +**Why this exists:** +- ✅ Single source of truth for "which patient is active" +- ✅ Tracks all patients in the session (roster) +- ✅ Supports future features (facts, metadata) +- ✅ Clean archival during clear operations + +--- + +### 4. Data Models + +**File:** `src/data_models/patient_context_models.py` + +**What it does:** Type-safe models for all patient context operations. + +**Key Models:** + +```python +# Represents a patient's context +class PatientContext: + patient_id: str + facts: dict = {} + created_at: datetime + updated_at: datetime + +# LLM's structured decision +class PatientContextDecision: + action: str # "NONE" | "ACTIVATE_NEW" | "SWITCH_EXISTING" | ... + patient_id: Optional[str] + reasoning: str + +# Performance tracking +class TimingInfo: + analyzer_ms: float + service_total_ms: float +``` + +--- + +## Modified Components + +### 1. ChatContext (Data Model) + +**File:** `src/data_models/chat_context.py` + +**What changed:** Added fields to track active patient and multi-patient roster. + +**Before:** +```python +class ChatContext: + conversation_id: str + chat_history: ChatHistory + patient_id: str = None # ❌ Existed but never used +``` + +**After:** +```python +class ChatContext: + conversation_id: str + chat_history: ChatHistory + patient_id: str = None # ✅ NOW USED: Points to active patient + patient_contexts: Dict[str, PatientContext] = {} # ✅ NEW: Roster of all patients +``` + +**Example:** + +```python +# Turn 1: Mention patient_4 +chat_ctx.patient_id = "patient_4" +chat_ctx.patient_contexts = { + "patient_4": PatientContext(...) +} + +# Turn 5: Switch to patient_15 +chat_ctx.patient_id = "patient_15" +chat_ctx.patient_contexts = { + "patient_4": PatientContext(...), + "patient_15": PatientContext(...) +} +``` + +--- + +### 2. ChatContextAccessor (Storage Layer) + +**File:** `src/data_models/chat_context_accessor.py` + +**What changed:** +1. Routes to different files based on patient +2. Filters out ephemeral snapshots when saving + +#### Change 1: File Routing + +**Before:** +```python +def get_blob_path(self, conversation_id: str) -> str: + # ❌ Always the same file + return f"{conversation_id}/chat_context.json" +``` + +**After:** +```python +def get_blob_path(self, conversation_id: str, patient_id: str = None) -> str: + # ✅ Different file per patient + if patient_id: + return f"{conversation_id}/patient_{patient_id}_context.json" + return f"{conversation_id}/session_context.json" +``` + +**Result:** + +``` +BEFORE: +conversation_123/ + └── chat_context.json ← Everything mixed together + +AFTER: +conversation_123/ + ├── session_context.json ← Pre-patient messages + ├── patient_patient_4_context.json ← Isolated history + └── patient_patient_15_context.json← Isolated history +``` + +#### Change 2: Snapshot Filtering (CRITICAL) + +**Before:** +```python +def serialize(chat_ctx: ChatContext) -> str: + # ❌ Saves everything including snapshots + return json.dumps({ + "chat_history": chat_ctx.chat_history.serialize() + }) +``` + +**After:** +```python +def serialize(chat_ctx: ChatContext) -> str: + chat_messages = [] + + for msg in chat_ctx.chat_history.messages: + content = extract_content(msg) + + # ✅ CRITICAL: Filter out ephemeral snapshots + if msg.role == AuthorRole.SYSTEM and content.startswith("PATIENT_CONTEXT_JSON"): + continue # Don't save this - it's ephemeral + + chat_messages.append({...}) + + return json.dumps({"chat_history": chat_messages}) +``` + +**Why this is critical:** + +```python +# What agents see in memory: +[ + SYSTEM: "PATIENT_CONTEXT_JSON: {...}", ← Ephemeral snapshot + USER: "review patient_4", + ASSISTANT: "Here's the plan..." +] + +# What gets saved to disk: +[ + USER: "review patient_4", ← Snapshot filtered out! + ASSISTANT: "Here's the plan..." +] +``` + +**Benefits:** +- ✅ Snapshot is **never** persisted +- ✅ Registry is always the source of truth +- ✅ Fresh snapshot generated every turn +- ✅ No stale data + +--- + +### 3. Healthcare Agents + +**File:** `src/healthcare_agents/agent.py` + +**What changed:** Message structure to enable consistent filtering. + +**Before:** +```python +# ❌ Content was just a string +response_message = ChatMessageContent( + role=AuthorRole.ASSISTANT, + name=agent.name, + content=response_text +) +``` + +**After:** +```python +# ✅ Content is structured with items +response_message = ChatMessageContent( + role=AuthorRole.ASSISTANT, + name=agent.name, + items=[TextContent(text=response_text)] +) +``` + +**Why:** Accessor needs consistent structure to reliably filter snapshots. + +--- + +### 4. Group Chat Orchestration + +**File:** `src/group_chat.py` + +**What changed:** Added confirmation gate and termination overrides for stability. + +#### Change 1: Confirmation Gate + +**The Problem:** +``` +User: "review patient_4" +Orchestrator: "Plan: 1. PatientHistory, 2. Radiology..." +PatientHistory: [immediately starts executing] ❌ No user confirmation! +``` + +**The Solution:** +```python +# Added to selection prompt: +""" +CONFIRMATION GATE: If the most recent message is from Orchestrator +and contains a multi-step plan, WAIT for user confirmation. +Do not proceed to other agents yet. +""" +``` + +**Now it works:** +``` +User: "review patient_4" +Orchestrator: "Plan: 1. PatientHistory, 2. Radiology... Good?" +[🛑 GATE: Wait for user] +User: "yes" +PatientHistory: [now executes] ✅ +``` + +#### Change 2: Termination Overrides + +**The Problem:** +``` +Orchestrator: "PATIENT_CONTEXT_JSON: {...}" +LLM: "This looks like a conclusion" ❌ False termination! +``` + +**The Solution:** +```python +def evaluate_termination(result): + last_text = extract_last_message_text(chat_ctx) + + # ✅ Override 1: Ignore snapshots + if last_text.lower().startswith("patient_context_json"): + return False # Don't terminate + + # ✅ Override 2: Ignore handoffs + if "back to you" in last_text.lower(): + return False # Don't terminate + + # Fall back to LLM evaluation + return llm_verdict() +``` + +**Benefits:** +- ✅ System messages don't end conversation +- ✅ Agent handoffs continue smoothly +- ✅ More reliable orchestration + +--- + +### 5. Entry Points (Bot & API) + +**Files:** `src/bots/assistant_bot.py`, `src/routes/api/chats.py` + +**What changed:** Both entry points now follow the same pattern for patient context. + +**Before:** +```python +async def on_message_activity(turn_context): + # ❌ Simple, no patient awareness + chat_ctx = await accessor.read(conversation_id) + chat_ctx.chat_history.add_user_message(user_text) + await process_chat(chat, chat_ctx) + await accessor.write(chat_ctx) +``` + +**After:** +```python +async def on_message_activity(turn_context): + # STEP 1: Load session context + chat_ctx = await accessor.read(conversation_id, None) + + # STEP 2: Check for clear command + if await handle_clear_command(user_text, chat_ctx): + return # Archives everything + + # STEP 3: ✅ NEW: Patient context decision + decision, timing = await patient_service.decide_and_apply( + user_text, chat_ctx + ) + + # STEP 4: ✅ NEW: Handle error cases + if decision == "NEEDS_PATIENT_ID": + await send_error("I need a valid patient ID") + return + + # STEP 5: ✅ NEW: Load patient-specific history + if chat_ctx.patient_id: + isolated = await accessor.read(conversation_id, chat_ctx.patient_id) + if isolated: + chat_ctx.chat_history = isolated.chat_history + + # STEP 6: ✅ NEW: Inject fresh ephemeral snapshot + snapshot = create_snapshot(chat_ctx) + chat_ctx.chat_history.messages.insert(0, snapshot) + + # STEP 7: Add user message and process + chat_ctx.chat_history.add_user_message(user_text) + await process_chat(chat, chat_ctx) + + # STEP 8: Save (snapshot auto-filtered by accessor) + await accessor.write(chat_ctx) +``` + +**Key Additions:** +1. ✅ Patient context service integration +2. ✅ Enhanced clear command (bulk archive) +3. ✅ Isolated history loading +4. ✅ Ephemeral snapshot injection +5. ✅ Error handling for invalid IDs + +--- + +## Step-by-Step Turn Flow + +### Scenario: User Discusses Two Patients + +This example shows a complete conversation where the user works with two different patients. + +--- + +#### **Turn 1: First time mentioning patient_4** + +**User types:** `"review patient_4 labs"` + +**What happens:** + +``` +1. Load session file + Result: Empty (brand new conversation) + +2. Check if user said "clear" + Result: No + +3. Patient context decision + • Load registry → No registry file exists yet + • Ask analyzer: What does user want? + Analyzer says: "ACTIVATE_NEW patient_4" + • Validate: "patient_4" matches our pattern ✅ + • Decision: NEW_BLANK (create new patient) + • Action: + - Create a PatientContext for patient_4 + - Write registry file with patient_4 as active + +4. Load patient_4's history file + Result: Doesn't exist yet (first time) → Use empty history + +5. Create and inject snapshot + Add to position [0]: SYSTEM: "PATIENT_CONTEXT_JSON: {patient_id: 'patient_4'}" + This tells agents: "You're working on patient_4" + +6. Add user's message + [1] USER: "review patient_4 labs" + +7. Agents respond + [2] ASSISTANT: "Plan: 1. PatientHistory will load labs..." + +8. Save everything + • Snapshot is automatically filtered out + • Only save: [USER message, ASSISTANT message] + • File: patient_patient_4_context.json +``` + +**Storage after Turn 1:** +``` +conversation_abc123/ +├── patient_patient_4_context.json ← Created with 2 messages +└── patient_context_registry.json ← Created + { + "active_patient_id": "patient_4", + "patient_registry": { + "patient_4": {...} + } + } +``` + +--- + +#### **Turn 2: Continuing with patient_4** + +**User types:** `"yes proceed"` + +**What happens:** + +``` +1. Load session file + Result: Empty (still no session-level messages) + +2. Check if user said "clear" + Result: No + +3. Patient context decision + • Load registry → patient_4 is currently active + • Ask analyzer: What does user want? + Message is short ("yes proceed") + Heuristic: Skip analyzer, assume UNCHANGED + • Decision: UNCHANGED (keep patient_4) + +4. Load patient_4's history file + Result: Contains 2 messages from Turn 1: + [0] USER: "review patient_4 labs" + [1] ASSISTANT: "Plan: ..." + +5. Create and inject fresh snapshot + Add to position [0]: SYSTEM: "PATIENT_CONTEXT_JSON: {patient_id: 'patient_4'}" + Now history looks like: + [0] SYSTEM: snapshot + [1] USER: "review patient_4 labs" + [2] ASSISTANT: "Plan: ..." + +6. Add user's new message + [3] USER: "yes proceed" + +7. Agents respond + PatientHistory: "Here are patient_4's labs... Back to you Orchestrator." + [Termination check: Sees "back to you" → Continue, don't stop] + Orchestrator: "Labs received. Moving to next step..." + +8. Save everything + • Snapshot is automatically filtered out + • Save: [4 messages total now] +``` + +--- + +#### **Turn 3: Switching to a different patient** + +**User types:** `"switch to patient_15"` + +**What happens:** + +``` +1. Load session file + Result: Empty + +2. Check if user said "clear" + Result: No + +3. Patient context decision + • Load registry → patient_4 is currently active + • Ask analyzer: What does user want? + Analyzer says: "ACTIVATE_NEW patient_15" + • Validate: "patient_15" matches pattern, NOT in registry yet + • Decision: NEW_BLANK (create new patient) + • Action: + - Create PatientContext for patient_15 + - Update registry: active = patient_15 + - ⚠️ RESET KERNEL (clear analyzer's memory to prevent patient_4 data leaking) + - Write updated registry + +4. Load patient_15's history file + Result: Doesn't exist → Use empty history + (patient_4's history remains untouched in its own file) + +5. Create and inject fresh snapshot + Add to position [0]: SYSTEM: "PATIENT_CONTEXT_JSON: { + patient_id: 'patient_15', + all_patient_ids: ['patient_4', 'patient_15'] + }" + Shows agents: "Now working on patient_15, patient_4 still exists" + +6. Add user's message + [1] USER: "switch to patient_15" + +7. Agents respond + [2] ASSISTANT: "Switched to patient_15. What would you like to review?" + +8. Save everything + • Snapshot filtered out + • Save to: patient_patient_15_context.json (NEW FILE) +``` + +**Storage after Turn 3:** +``` +conversation_abc123/ +├── patient_patient_4_context.json ← Still has patient_4's history (4 messages) +├── patient_patient_15_context.json ← NEW: patient_15's history (2 messages) +└── patient_context_registry.json ← Updated + { + "active_patient_id": "patient_15", ← Changed from patient_4 + "patient_registry": { + "patient_4": {...}, + "patient_15": {...} ← Added + } + } +``` + +**Key Point:** patient_4's history is completely isolated and unchanged! + +--- + +#### **Turn 4: Clearing everything** + +**User types:** `"clear patient context"` + +**What happens:** + +``` +1. Load session file + Result: Empty + +2. Check if user said "clear" + Result: YES ✅ + +3. Clear command handler runs + • Find all files: + - patient_patient_4_context.json + - patient_patient_15_context.json + - patient_context_registry.json + + • Create archive folder: archive/20250930T164500/ + + • Copy each file to archive with timestamp: + archive/20250930T164500/conversation_abc123/ + ├── 20250930T164500_patient_patient_4_archived.json + ├── 20250930T164500_patient_patient_15_archived.json + └── 20250930T164500_patient_context_registry_archived.json + + • Delete original files + + • Reset in-memory state: + - chat_ctx.patient_id = None + - chat_ctx.patient_contexts = {} + + • Send message: "Patient context cleared" + +4-8. Skipped (already returned after clear) +``` + +**Storage after Turn 4:** +``` +conversation_abc123/ +└── archive/ + └── 20250930T164500/ + └── conversation_abc123/ + ├── 20250930T164500_patient_patient_4_archived.json + ├── 20250930T164500_patient_patient_15_archived.json + └── 20250930T164500_patient_context_registry_archived.json +``` + +**Result:** Clean slate! Ready for new patients. + +--- + +### Visual Summary + +``` +Turn 1: "review patient_4" + → Create patient_4 ✅ + → patient_4_context.json created + → Registry: active = patient_4 + +Turn 2: "yes proceed" + → Continue with patient_4 ✅ + → patient_4_context.json updated (more messages) + → Registry: active = patient_4 (unchanged) + +Turn 3: "switch to patient_15" + → Create patient_15 ✅ + → patient_15_context.json created + → patient_4_context.json untouched + → Registry: active = patient_15 + +Turn 4: "clear patient context" + → Archive all files ✅ + → Delete originals + → Ready for fresh start +``` + +--- + +## Configuration + +### Customizing Patient ID Pattern + +By default, the system accepts IDs like `patient_4`, `patient_15`, etc. + +You can customize this via environment variable: + +```bash +# Default pattern +export PATIENT_ID_PATTERN="^patient_[0-9]+$" + +# Medical Record Number format +export PATIENT_ID_PATTERN="^MRN[0-9]{7}$" +# Accepts: MRN1234567 + +# Multiple formats +export PATIENT_ID_PATTERN="^(patient_[0-9]+|MRN[0-9]{7})$" +# Accepts: patient_4 OR MRN1234567 +``` + +> [!IMPORTANT] +> If you change the pattern, update the analyzer prompt in `patient_context_analyzer.py` to match. + +--- + +## Key Concepts Explained + +### What is "Ephemeral Snapshot"? + +**Simple explanation:** A temporary system message that tells agents about the current patient. It's generated fresh every turn and **never saved**. + +```python +# Generated every turn: +snapshot = { + "patient_id": "patient_4", + "all_patient_ids": ["patient_4", "patient_15"], + "generated_at": "2025-09-30T16:45:00Z" +} + +# Injected as message: +SYSTEM: "PATIENT_CONTEXT_JSON: {snapshot}" + +# Agents see this and know: "I'm working on patient_4" + +# When saving: This message is filtered out (never persisted) +``` + +### What is "Kernel Reset"? + +**Simple explanation:** When switching patients, the analyzer's AI is reset to prevent mixing patient data. + +```python +# Without reset: +User: "review patient_4" +Analyzer: [builds understanding of patient_4] +User: "switch to patient_15" +Analyzer: [still has patient_4 context in memory] ❌ + +# With reset: +User: "review patient_4" +Analyzer: [builds understanding of patient_4] +User: "switch to patient_15" +Service: kernel.reset() # Clears analyzer memory +Analyzer: [fresh start for patient_15] ✅ +``` + +### What is "Registry as Source of Truth"? + +**Simple explanation:** The registry file always has the correct answer for "which patient is active". + +```python +# Registry file: +{ "active_patient_id": "patient_4" } + +# When loading: +1. Read registry → "patient_4 is active" +2. Load patient_4_context.json +3. Generate fresh snapshot from registry +4. Inject snapshot + +# Benefits: +- No stale snapshots +- Always accurate +- Single source of truth +``` + +--- + +## Summary + +**What you need to remember:** + +1. **Each patient gets their own history file** - Complete isolation +2. **Registry tracks which patient is active** - Single source of truth +3. **LLM automatically detects patient intent** - No manual parsing +4. **Fresh snapshot injected every turn** - Never persisted +5. **Agents see current patient context** - Grounded responses +6. **Safe switching with kernel reset** - No cross-contamination +7. **Bulk archival on clear** - Organized and complete + +**The result:** You can work on multiple patients in one conversation, switch between them seamlessly, and the system keeps everything organized and isolated. + +--- + +**Last Updated:** October 1, 2025 +**Status:** Production-ready (`sekar/pc_poc` branch) \ No newline at end of file diff --git a/scripts/uploadPatientData.ps1 b/scripts/uploadPatientData.ps1 index 341a7bb..c0689d1 100644 --- a/scripts/uploadPatientData.ps1 +++ b/scripts/uploadPatientData.ps1 @@ -77,4 +77,4 @@ Get-ChildItem -Path $localFolderPath | ForEach-Object { Write-Output "Uploading patient data from $path" az storage blob upload-batch --account-name $storageAccountName --destination "$containerName/$patientFolder" --source $path --auth-mode login --overwrite true } -} +} \ No newline at end of file diff --git a/src/bots/assistant_bot.py b/src/bots/assistant_bot.py index d7a6f66..9620fb3 100644 --- a/src/bots/assistant_bot.py +++ b/src/bots/assistant_bot.py @@ -1,20 +1,25 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. import asyncio import logging import os +import json +from datetime import datetime, timezone from botbuilder.core import MessageFactory, TurnContext from botbuilder.core.teams import TeamsActivityHandler from botbuilder.integration.aiohttp import CloudAdapter from botbuilder.schema import Activity, ActivityTypes from semantic_kernel.agents import AgentGroupChat +from semantic_kernel.contents import ChatMessageContent, TextContent, AuthorRole from data_models.app_context import AppContext from data_models.chat_context import ChatContext -from errors import NotAuthorizedError + from group_chat import create_group_chat +from services.patient_context_service import PatientContextService, PATIENT_CONTEXT_PREFIX +from services.patient_context_analyzer import PatientContextAnalyzer logger = logging.getLogger(__name__) @@ -32,44 +37,32 @@ def __init__( self.name = agent["name"] self.turn_contexts = turn_contexts self.adapters = adapters - self.adapters[self.name].on_turn_error = self.on_error # add error handling + self.adapters[self.name].on_turn_error = self.on_error self.data_access = app_context.data_access self.root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) - async def get_bot_context( - self, conversation_id: str, bot_name: str, turn_context: TurnContext - ): + analyzer = PatientContextAnalyzer(token_provider=app_context.cognitive_services_token_provider) + self.patient_context_service = PatientContextService( + analyzer=analyzer, + registry_accessor=app_context.data_access.patient_context_registry_accessor, + context_accessor=app_context.data_access.chat_context_accessor + ) + + async def get_bot_context(self, conversation_id: str, bot_name: str, turn_context: TurnContext): if conversation_id not in self.turn_contexts: self.turn_contexts[conversation_id] = {} - if bot_name not in self.turn_contexts[conversation_id]: context = await self.create_turn_context(bot_name, turn_context) - self.turn_contexts[conversation_id][bot_name] = context - return self.turn_contexts[conversation_id][bot_name] async def create_turn_context(self, bot_name, turn_context): - app_id = next( - agent["bot_id"] for agent in self.all_agents if agent["name"] == bot_name - ) - - # Lookup adapter for bot_name. bot_name maybe different from self.name. + app_id = next(agent["bot_id"] for agent in self.all_agents if agent["name"] == bot_name) adapter = self.adapters[bot_name] claims_identity = adapter.create_claims_identity(app_id) - connector_factory = ( - adapter.bot_framework_authentication.create_connector_factory( - claims_identity - ) - ) - connector_client = await connector_factory.create( - turn_context.activity.service_url, "https://api.botframework.com" - ) - user_token_client = ( - await adapter.bot_framework_authentication.create_user_token_client( - claims_identity - ) - ) + connector_factory = adapter.bot_framework_authentication.create_connector_factory(claims_identity) + connector_client = await connector_factory.create(turn_context.activity.service_url, "https://api.botframework.com") + user_token_client = await adapter.bot_framework_authentication.create_user_token_client(claims_identity) async def logic(context: TurnContext): pass @@ -81,131 +74,186 @@ async def logic(context: TurnContext): context.turn_state[CloudAdapter.CONNECTOR_FACTORY_KEY] = connector_factory context.turn_state[CloudAdapter.BOT_OAUTH_SCOPE_KEY] = "https://api.botframework.com/.default" context.turn_state[CloudAdapter.BOT_CALLBACK_HANDLER_KEY] = logic - return context + async def _handle_clear_command(self, content: str, chat_ctx: ChatContext, conversation_id: str) -> bool: + content_lower = content.lower().strip() + if content_lower in ["clear", "clear patient", "clear context", "clear patient context"]: + logger.info(f"Processing clear command for conversation: {conversation_id}") + timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M-%S-%f") + archive_folder = f"archive/{timestamp}" + try: + await self.data_access.chat_context_accessor.archive_to_folder(conversation_id, None, archive_folder) + try: + patient_registry, _ = await self.patient_context_service.registry_accessor.read_registry(conversation_id) + if patient_registry: + for pid in patient_registry.keys(): + await self.data_access.chat_context_accessor.archive_to_folder(conversation_id, pid, archive_folder) + except Exception: + if getattr(chat_ctx, "patient_contexts", None): + for pid in chat_ctx.patient_contexts.keys(): + await self.data_access.chat_context_accessor.archive_to_folder(conversation_id, pid, archive_folder) + await self.patient_context_service.registry_accessor.archive_registry(conversation_id) + except Exception as e: + logger.warning(f"Clear archival issues: {e}") + finally: + chat_ctx.patient_context = None + if hasattr(chat_ctx, "patient_contexts"): + chat_ctx.patient_contexts.clear() + chat_ctx.chat_history.clear() + chat_ctx.patient_id = None + await self.data_access.chat_context_accessor.write(chat_ctx) + return True + return False + async def on_message_activity(self, turn_context: TurnContext) -> None: conversation_id = turn_context.activity.conversation.id chat_context_accessor = self.data_access.chat_context_accessor chat_artifact_accessor = self.data_access.chat_artifact_accessor - # Load chat context - chat_ctx = await chat_context_accessor.read(conversation_id) + raw_user_text = turn_context.remove_recipient_mention(turn_context.activity).strip() + + try: + chat_ctx = await chat_context_accessor.read(conversation_id, None) + except Exception: + chat_ctx = ChatContext(conversation_id) - # Delete thread if user asks - if turn_context.activity.text.endswith("clear"): - # Add clear message to chat history - chat_ctx.chat_history.add_user_message(turn_context.activity.text.strip()) - await chat_context_accessor.archive(chat_ctx) + if await self._handle_clear_command(raw_user_text, chat_ctx, conversation_id): await chat_artifact_accessor.archive(conversation_id) await turn_context.send_activity("Conversation cleared!") return + + decision, timing = await self.patient_context_service.decide_and_apply(raw_user_text, chat_ctx) + if decision == "NEEDS_PATIENT_ID": + await turn_context.send_activity( + "I need a patient ID like 'patient_4' (e.g., '@Orchestrator start tumor board review for patient_4')." + ) + return + + if chat_ctx.patient_id: + try: + isolated = await chat_context_accessor.read(conversation_id, chat_ctx.patient_id) + if isolated and isolated.chat_history.messages: + chat_ctx.chat_history = isolated.chat_history + except Exception: + pass + + # Inject fresh ephemeral PATIENT_CONTEXT_JSON snapshot + filtered = [] + for m in chat_ctx.chat_history.messages: + if not (m.role == AuthorRole.SYSTEM and hasattr(m, "items") and m.items + and getattr(m.items[0], "text", "").startswith(PATIENT_CONTEXT_PREFIX)): + filtered.append(m) + chat_ctx.chat_history.messages = filtered + snapshot = { + "conversation_id": chat_ctx.conversation_id, + "patient_id": chat_ctx.patient_id, + "all_patient_ids": sorted(getattr(chat_ctx, "patient_contexts", {}).keys()), + "generated_at": datetime.utcnow().isoformat() + "Z" + } + line = f"{PATIENT_CONTEXT_PREFIX}: {json.dumps(snapshot, separators=(',', ':'))}" + sys_msg = ChatMessageContent(role=AuthorRole.SYSTEM, items=[TextContent(text=line)]) + chat_ctx.chat_history.messages.insert(0, sys_msg) + agents = self.all_agents - if len(chat_ctx.chat_history.messages) == 0: - # new conversation. Let's see which agents are available. - async def is_part_of_conversation(agent): - context = await self.get_bot_context(turn_context.activity.conversation.id, agent["name"], turn_context) - typing_activity = Activity( - type=ActivityTypes.typing, - relates_to=turn_context.activity.relates_to, - ) - typing_activity.apply_conversation_reference( - turn_context.activity.get_conversation_reference() - ) - context.activity = typing_activity + if len(chat_ctx.chat_history.messages) == 1: # only the snapshot present + async def is_part(agent): + context = await self.get_bot_context(conversation_id, agent["name"], turn_context) + typing = Activity(type=ActivityTypes.typing, relates_to=turn_context.activity.relates_to) + typing.apply_conversation_reference(turn_context.activity.get_conversation_reference()) + context.activity = typing try: - await context.send_activity(typing_activity) + await context.send_activity(typing) return True - except Exception as e: - logger.info(f"Failed to send typing activity to {agent['name']}: {e}") - # This happens if the agent is not part of the group chat. - # Remove the agent from the list of available agents + except Exception: return False - part_of_conversation = await asyncio.gather(*(is_part_of_conversation(agent) for agent in self.all_agents)) - agents = [agent for agent, should_include in zip(self.all_agents, part_of_conversation) if should_include] + flags = await asyncio.gather(*(is_part(a) for a in self.all_agents)) + agents = [a for a, ok in zip(self.all_agents, flags) if ok] (chat, chat_ctx) = create_group_chat(self.app_context, chat_ctx, participants=agents) - # Add user message to chat history - text = turn_context.remove_recipient_mention(turn_context.activity).strip() - text = f"{self.name}: {text}" - chat_ctx.chat_history.add_user_message(text) + # Add raw user message + chat_ctx.chat_history.add_user_message(raw_user_text) chat.is_complete = False await self.process_chat(chat, chat_ctx, turn_context) - # Save chat context try: await chat_context_accessor.write(chat_ctx) - except: + except Exception: logger.exception("Failed to save chat context.") async def on_error(self, context: TurnContext, error: Exception): - # This error is raised as Exception, so we can only use the message to handle the error. + from errors import NotAuthorizedError if str(error) == "Unable to proceed while another agent is active.": await context.send_activity("Please wait for the current agent to finish.") elif isinstance(error, NotAuthorizedError): - logger.warning(error) await context.send_activity("You are not authorized to access this agent.") else: - # default exception handling - logger.exception(f"Agent {self.name} encountered an error") - await context.send_activity(f"Orchestrator is working on solving your problems, please retype your request") + await context.send_activity("Orchestrator encountered an error. Please retry your request.") - async def process_chat( - self, chat: AgentGroupChat, chat_ctx: ChatContext, turn_context: TurnContext - ): - # If the mentioned agent is a facilitator, proceed with group chat. - # Otherwise, proceed with standalone chat using the mentioned agent. - agent_config = next(agent_config for agent_config in self.all_agents if agent_config["name"] == self.name) - mentioned_agent = None if agent_config.get("facilitator", False) \ - else next(agent for agent in chat.agents if agent.name == self.name) + async def process_chat(self, chat: AgentGroupChat, chat_ctx: ChatContext, turn_context: TurnContext): + agent_cfg = next(cfg for cfg in self.all_agents if cfg["name"] == self.name) + mentioned_agent = None if agent_cfg.get("facilitator", False) else next( + a for a in chat.agents if a.name == self.name) async for response in chat.invoke(agent=mentioned_agent): - context = await self.get_bot_context( - turn_context.activity.conversation.id, response.name, turn_context - ) - if response.content.strip() == "": + if not response.content.strip(): continue + active_pid = chat_ctx.patient_id + all_pids = sorted(getattr(chat_ctx, "patient_contexts", {}).keys()) + final_content = response.content + + # Option 3 guard + added Session ID line + if all_pids and "PT_CTX:" not in response.content: + roster = ", ".join(f"`{p}`{' (active)' if p == active_pid else ''}" for p in all_pids) + pt_ctx_block = "\n\n---\n*PT_CTX:*\n" + pt_ctx_block += f"- **Session ID:** `{chat_ctx.conversation_id}`\n" + pt_ctx_block += f"- **Patient ID:** `{active_pid}`\n" if active_pid else "- *No active patient.*\n" + pt_ctx_block += f"- **Session Patients:** {roster}" + final_content = f"{response.content}{pt_ctx_block}" + + if hasattr(response, "items") and response.items: + response.items[0].text = final_content + else: + response = ChatMessageContent( + role=response.role, + items=[TextContent(text=final_content)], + name=getattr(response, "name", None) + ) + msgText = self._append_links_to_msg(response.content, chat_ctx) msgText = await self.generate_sas_for_blob_urls(msgText, chat_ctx) + context = await self.get_bot_context(turn_context.activity.conversation.id, response.name, turn_context) activity = MessageFactory.text(msgText) - activity.apply_conversation_reference( - turn_context.activity.get_conversation_reference() - ) + activity.apply_conversation_reference(turn_context.activity.get_conversation_reference()) context.activity = activity - await context.send_activity(activity) if chat.is_complete: break def _append_links_to_msg(self, msgText: str, chat_ctx: ChatContext) -> str: - # Add patient data links to response try: - image_urls = chat_ctx.display_image_urls - clinical_trial_urls = chat_ctx.display_clinical_trials - - # Display loaded images - if image_urls: + imgs = getattr(chat_ctx, "display_image_urls", []) + trials = chat_ctx.display_clinical_trials + if imgs: msgText += "

Patient Images

" - for url in image_urls: - filename = url.split("/")[-1] - msgText += f"{filename}" - - # Display clinical trials - if clinical_trial_urls: + for url in imgs: + fname = url.split("/")[-1] + msgText += f"{fname}" + if trials: msgText += "

Clinical trials

" - for url in clinical_trial_urls: + for url in trials: trial = url.split("/")[-1] msgText += f"
  • {trial}
  • " - return msgText finally: - chat_ctx.display_image_urls = [] + if hasattr(chat_ctx, "display_image_urls"): + chat_ctx.display_image_urls = [] chat_ctx.display_clinical_trials = [] async def generate_sas_for_blob_urls(self, msgText: str, chat_ctx: ChatContext) -> str: @@ -213,7 +261,6 @@ async def generate_sas_for_blob_urls(self, msgText: str, chat_ctx: ChatContext) for blob_url in chat_ctx.display_blob_urls: blob_sas_url = await self.data_access.blob_sas_delegate.get_blob_sas_url(blob_url) msgText = msgText.replace(blob_url, blob_sas_url) - return msgText finally: chat_ctx.display_blob_urls = [] diff --git a/src/data_models/chat_context.py b/src/data_models/chat_context.py index 8a1d64c..5ee7e8f 100644 --- a/src/data_models/chat_context.py +++ b/src/data_models/chat_context.py @@ -2,19 +2,35 @@ # Licensed under the MIT license. import os +from dataclasses import dataclass, field +from typing import Dict, Any from semantic_kernel.contents.chat_history import ChatHistory +@dataclass +class PatientContext: + """ + Minimal per-patient context for patient isolation. + """ + patient_id: str + facts: Dict[str, Any] = field(default_factory=dict) + + class ChatContext: def __init__(self, conversation_id: str): self.conversation_id = conversation_id self.chat_history = ChatHistory() + + # Patient context fields self.patient_id = None + self.patient_contexts: Dict[str, PatientContext] = {} + + # Legacy / display fields (still in use by various UI & agents) self.patient_data = [] self.display_blob_urls = [] self.display_image_urls = [] self.display_clinical_trials = [] self.output_data = [] - self.root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) self.healthcare_agents = {} + self.root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) diff --git a/src/data_models/chat_context_accessor.py b/src/data_models/chat_context_accessor.py index 772ab3b..b2e5659 100644 --- a/src/data_models/chat_context_accessor.py +++ b/src/data_models/chat_context_accessor.py @@ -8,9 +8,10 @@ from azure.core.exceptions import ResourceNotFoundError from azure.storage.blob.aio import BlobServiceClient -from semantic_kernel.contents.chat_history import ChatHistory +from semantic_kernel.contents import ChatMessageContent, AuthorRole, TextContent -from data_models.chat_context import ChatContext +from data_models.chat_context import ChatContext, PatientContext +from services.patient_context_service import PATIENT_CONTEXT_PREFIX # reuse constant logger = logging.getLogger(__name__) @@ -33,87 +34,175 @@ class ChatContextAccessor: - Append the "clear" message to chat history. - Save ChatContext to `{datetime}_chat_context.json`. - Delete `chat_context.json` + 7. Hybrid accessor supporting session + per-patient isolation. + 8. Ephemeral PATIENT_CONTEXT_JSON system messages are stripped (never persisted). """ def __init__(self, blob_service_client: BlobServiceClient, container_name: str = "chat-sessions",): self.blob_service_client = blob_service_client self.container_client = blob_service_client.get_container_client(container_name) - def get_blob_path(self, conversation_id: str) -> str: - return f"{conversation_id}/chat_context.json" + def get_blob_path(self, conversation_id: str, patient_id: str = None) -> str: + if patient_id: + return f"{conversation_id}/patient_{patient_id}_context.json" + return f"{conversation_id}/session_context.json" - async def read(self, conversation_id: str) -> ChatContext: - """Read the chat context for a given conversation ID.""" + async def read(self, conversation_id: str, patient_id: str = None) -> ChatContext: start = time() try: - blob_path = self.get_blob_path(conversation_id) + blob_path = self.get_blob_path(conversation_id, patient_id) blob_client = self.container_client.get_blob_client(blob_path) blob = await blob_client.download_blob() - blob_str = await blob.readall() - decoded_str = blob_str.decode("utf-8") - return self.deserialize(decoded_str) - except: - return ChatContext(conversation_id) + decoded_str = (await blob.readall()).decode("utf-8") + context = self.deserialize(decoded_str) + + if patient_id: + context.patient_id = patient_id + if patient_id not in context.patient_contexts: + context.patient_contexts[patient_id] = PatientContext(patient_id=patient_id) + else: + context.patient_id = None + return context + except ResourceNotFoundError: + context = ChatContext(conversation_id) + if patient_id: + context.patient_id = patient_id + context.patient_contexts[patient_id] = PatientContext(patient_id=patient_id) + return context + except Exception as e: + logger.warning("Failed to read context %s/%s: %s", + conversation_id, patient_id or "session", e) + context = ChatContext(conversation_id) + if patient_id: + context.patient_id = patient_id + context.patient_contexts[patient_id] = PatientContext(patient_id=patient_id) + return context finally: - logger.info(f"Read ChatContext for {conversation_id}. Duration: {time() - start}s") + logger.info("Read ChatContext %s/%s in %.3fs", + conversation_id, patient_id or "session", time() - start) async def write(self, chat_ctx: ChatContext) -> None: - """Write the chat context for a given conversation ID.""" start = time() try: - blob_path = self.get_blob_path(chat_ctx.conversation_id) + blob_path = self.get_blob_path(chat_ctx.conversation_id, chat_ctx.patient_id) blob_client = self.container_client.get_blob_client(blob_path) - blob_str = self.serialize(chat_ctx) - await blob_client.upload_blob(blob_str, overwrite=True) + await blob_client.upload_blob(self.serialize(chat_ctx), overwrite=True) finally: - logger.info(f"Wrote ChatContext for {chat_ctx.conversation_id}. Duration: {time() - start}s") + logger.info("Wrote ChatContext %s/%s in %.3fs", + chat_ctx.conversation_id, chat_ctx.patient_id or "session", time() - start) async def archive(self, chat_ctx: ChatContext) -> None: - """Archive the chat context for a given conversation ID by renaming the blob.""" start = time() try: - # Archive the chat context timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S") - archive_blob_path = f"{chat_ctx.conversation_id}/{timestamp}_chat_context.json" - archive_blob_str = self.serialize(chat_ctx) - await self.container_client.upload_blob(archive_blob_path, archive_blob_str, overwrite=True) + if chat_ctx.patient_id: + archive_blob_path = f"{chat_ctx.conversation_id}/{timestamp}_patient_{chat_ctx.patient_id}_archived.json" + else: + archive_blob_path = f"{chat_ctx.conversation_id}/{timestamp}_session_archived.json" + + await self.container_client.upload_blob(archive_blob_path, self.serialize(chat_ctx), overwrite=True) + blob_path = self.get_blob_path(chat_ctx.conversation_id, chat_ctx.patient_id) + try: + await self.container_client.delete_blob(blob_path) + except ResourceNotFoundError: + pass + finally: + logger.info("Archived ChatContext %s/%s in %.3fs", + chat_ctx.conversation_id, chat_ctx.patient_id or "session", time() - start) - # Delete the original chat context - blob_path = self.get_blob_path(chat_ctx.conversation_id) - await self.container_client.delete_blob(blob_path) - except ResourceNotFoundError: - # If the blob is not found, it means it has already been deleted or never existed. - pass + async def archive_to_folder(self, conversation_id: str, patient_id: str, archive_folder: str) -> None: + start = time() + try: + current_blob_path = self.get_blob_path(conversation_id, patient_id) + blob_client = self.container_client.get_blob_client(current_blob_path) + try: + blob = await blob_client.download_blob() + data = await blob.readall() + + timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S") + if patient_id: + archive_blob_path = f"{archive_folder}/{conversation_id}/{timestamp}_patient_{patient_id}_archived.json" + else: + archive_blob_path = f"{archive_folder}/{conversation_id}/{timestamp}_session_archived.json" + + await self.container_client.upload_blob(archive_blob_path, data, overwrite=True) + await blob_client.delete_blob() + logger.info("Archived %s", archive_blob_path) + except ResourceNotFoundError: + logger.warning("Nothing to archive for %s/%s", conversation_id, patient_id or "session") + except Exception as e: + logger.error("Archive to folder failed %s/%s: %s", conversation_id, patient_id or "session", e) finally: - logger.info(f"Archive ran for {chat_ctx.conversation_id}. Duration: {time() - start}s") + logger.info("Archive-to-folder %s/%s finished in %.3fs", + conversation_id, patient_id or "session", time() - start) @staticmethod def serialize(chat_ctx: ChatContext) -> str: - """Serialize the chat context to a string.""" - return json.dumps( - { - "conversation_id": chat_ctx.conversation_id, - "chat_history": chat_ctx.chat_history.serialize(), - "patient_id": chat_ctx.patient_id, - "patient_data": chat_ctx.patient_data, - "display_blob_urls": chat_ctx.display_blob_urls, - "display_clinical_trials": chat_ctx.display_clinical_trials, - "output_data": chat_ctx.output_data, - "healthcare_agents": chat_ctx.healthcare_agents, - }, - indent=2, - ) + chat_messages = [] + skipped_pc = 0 + for msg in chat_ctx.chat_history.messages: + if hasattr(msg, "items") and msg.items: + content = msg.items[0].text if hasattr(msg.items[0], "text") else str(msg.items[0]) + else: + content = str(getattr(msg, "content", "") or "") + + # Skip ephemeral patient context snapshot + if msg.role == AuthorRole.SYSTEM and content.startswith(PATIENT_CONTEXT_PREFIX): + skipped_pc += 1 + continue + + chat_messages.append({ + "role": msg.role.value, + "content": content, + "name": getattr(msg, "name", None) + }) + + if skipped_pc: + logger.debug("Filtered %d PATIENT_CONTEXT_JSON system message(s) from serialization", skipped_pc) + + data = { + "conversation_id": chat_ctx.conversation_id, + "patient_id": chat_ctx.patient_id, + "chat_history": chat_messages, + "patient_data": chat_ctx.patient_data, + "display_blob_urls": chat_ctx.display_blob_urls, + "display_image_urls": getattr(chat_ctx, "display_image_urls", []), + "display_clinical_trials": chat_ctx.display_clinical_trials, + "output_data": chat_ctx.output_data, + "healthcare_agents": chat_ctx.healthcare_agents, + } + return json.dumps(data, indent=2, default=str) @staticmethod def deserialize(data_str: str) -> ChatContext: - """Deserialize the chat context from a string.""" data = json.loads(data_str) - ctx = ChatContext(data["conversation_id"]) - ctx.chat_history = ChatHistory.restore_chat_history(data["chat_history"]) - ctx.patient_id = data["patient_id"] - ctx.patient_data = data["patient_data"] - ctx.display_blob_urls = data["display_blob_urls"] - ctx.display_clinical_trials = data["display_clinical_trials"] - ctx.output_data = data["output_data"] - ctx.healthcare_agents = data.get("healthcare_agents", {}) - return ctx + context = ChatContext(data["conversation_id"]) + context.patient_id = data.get("patient_id") + + for msg_data in data.get("chat_history", []): + role_val = msg_data.get("role") + if not role_val: + continue + role = AuthorRole(role_val) + content_str = msg_data.get("content", "") + # Defensive skip in case an old file contained ephemeral snapshot + if role == AuthorRole.SYSTEM and content_str.startswith(PATIENT_CONTEXT_PREFIX): + continue + if role == AuthorRole.TOOL and not content_str: + continue + msg = ChatMessageContent( + role=role, + items=[TextContent(text=str(content_str))] + ) + name = msg_data.get("name") + if name: + msg.name = name + context.chat_history.messages.append(msg) + + context.patient_data = data.get("patient_data", []) + context.display_blob_urls = data.get("display_blob_urls", []) + context.display_image_urls = data.get("display_image_urls", []) + context.display_clinical_trials = data.get("display_clinical_trials", []) + context.output_data = data.get("output_data", []) + context.healthcare_agents = data.get("healthcare_agents", {}) + return context diff --git a/src/data_models/data_access.py b/src/data_models/data_access.py index cabcffc..b9d2ad3 100644 --- a/src/data_models/data_access.py +++ b/src/data_models/data_access.py @@ -16,9 +16,11 @@ from data_models.fabric.fabric_clinical_note_accessor import FabricClinicalNoteAccessor from data_models.fhir.fhir_clinical_note_accessor import FhirClinicalNoteAccessor from data_models.image_accessor import ImageAccessor +from data_models.patient_context_accessor import PatientContextRegistryAccessor logger = logging.getLogger(__name__) + class UserDelegationKeyDelegate: def __init__(self, blob_service_client: BlobServiceClient): self.blob_service_client = blob_service_client @@ -81,19 +83,20 @@ async def get_blob_sas_url( @dataclass(frozen=True) class DataAccess: - """ Data access layer for the application. """ + """Data access layer for the application.""" blob_sas_delegate: BlobSasDelegate chat_artifact_accessor: ChatArtifactAccessor chat_context_accessor: ChatContextAccessor clinical_note_accessor: ClinicalNoteAccessor image_accessor: ImageAccessor + patient_context_registry_accessor: PatientContextRegistryAccessor def create_data_access( blob_service_client: BlobServiceClient, credential: AsyncTokenCredential ) -> DataAccess: - """ Factory function to create a DataAccess object. """ + """Factory function to create a DataAccess object.""" # Create clinical note accessor based on the source clinical_notes_source = os.getenv("CLINICAL_NOTES_SOURCE") if clinical_notes_source == "fhir": @@ -116,4 +119,5 @@ def create_data_access( chat_context_accessor=ChatContextAccessor(blob_service_client), clinical_note_accessor=clinical_note_accessor, image_accessor=ImageAccessor(blob_service_client), + patient_context_registry_accessor=PatientContextRegistryAccessor(blob_service_client), ) diff --git a/src/data_models/patient_context_accessor.py b/src/data_models/patient_context_accessor.py new file mode 100644 index 0000000..dcc4a1d --- /dev/null +++ b/src/data_models/patient_context_accessor.py @@ -0,0 +1,117 @@ +import json +import logging +from datetime import datetime, timezone +from time import time +from typing import Dict, Optional, Tuple + +from azure.core.exceptions import ResourceNotFoundError +from azure.storage.blob.aio import BlobServiceClient + +logger = logging.getLogger(__name__) + + +class PatientContextRegistryAccessor: + """ + Manages patient context registry JSON files in blob storage. + Tracks which patients have been encountered in each conversation session. + """ + + def __init__(self, blob_service_client: BlobServiceClient, container_name: str = "chat-sessions"): + self.blob_service_client = blob_service_client + self.container_client = blob_service_client.get_container_client(container_name) + + def get_registry_blob_path(self, conversation_id: str) -> str: + """Get blob path for patient context registry file.""" + return f"{conversation_id}/patient_context_registry.json" + + async def _write_json_to_blob(self, blob_path: str, data: dict) -> None: + """Write JSON data to blob storage.""" + json_str = json.dumps(data, indent=2) + blob_client = self.container_client.get_blob_client(blob_path) + await blob_client.upload_blob(json_str, overwrite=True) + + async def read_registry(self, conversation_id: str) -> Tuple[Dict[str, Dict], Optional[str]]: + """Read patient context registry. Returns (patient_registry, active_patient_id).""" + start = time() + try: + blob_path = self.get_registry_blob_path(conversation_id) + blob_client = self.container_client.get_blob_client(blob_path) + blob = await blob_client.download_blob() + blob_str = await blob.readall() + decoded_str = blob_str.decode("utf-8") + registry_data = json.loads(decoded_str) + + logger.info(f"Read patient context registry for {conversation_id}. Duration: {time() - start}s") + return registry_data.get("patient_registry", {}), registry_data.get("active_patient_id") + + except ResourceNotFoundError: + logger.info(f"No existing patient context registry for {conversation_id}") + return {}, None + except Exception as e: + logger.warning(f"Failed to read patient context registry for {conversation_id}: {e}") + return {}, None + + async def write_registry(self, conversation_id: str, patient_registry: Dict[str, Dict], active_patient_id: str = None): + """Write patient context registry to blob storage.""" + try: + registry_data = { + "conversation_id": conversation_id, + "active_patient_id": active_patient_id, + "patient_registry": patient_registry, + "last_updated": datetime.utcnow().isoformat() + } + + blob_path = self.get_registry_blob_path(conversation_id) + await self._write_json_to_blob(blob_path, registry_data) + logger.info(f"Wrote patient registry for conversation {conversation_id}") + + except Exception as e: + logger.error(f"Failed to write patient registry: {e}") + raise + + async def update_patient_registry(self, conversation_id: str, patient_id: str, registry_entry: Dict, active_patient_id: str = None) -> None: + """Update registry entry for a specific patient in the conversation.""" + current_registry, current_active = await self.read_registry(conversation_id) + current_registry[patient_id] = { + **registry_entry, + "last_updated": datetime.now(timezone.utc).isoformat() + } + # Use provided active_patient_id or keep current + final_active = active_patient_id if active_patient_id is not None else current_active + await self.write_registry(conversation_id, current_registry, final_active) + + async def archive_registry(self, conversation_id: str) -> None: + """Archive patient context registry before clearing.""" + start = time() + try: + # Read current registry + current_registry, active_patient_id = await self.read_registry(conversation_id) + if not current_registry: + logger.info("No patient context registry to archive for %s", conversation_id) + return + + # Create archive + timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S") + archive_blob_path = "%s/%s_patient_context_registry_archived.json" % (conversation_id, timestamp) + + archive_data = { + "conversation_id": conversation_id, + "archived_at": datetime.now(timezone.utc).isoformat(), + "active_patient_id": active_patient_id, + "patient_registry": current_registry + } + + await self._write_json_to_blob(archive_blob_path, archive_data) + + # Clear current registry by deleting the blob + try: + blob_path = self.get_registry_blob_path(conversation_id) + await self.container_client.delete_blob(blob_path) + logger.info("Cleared patient context registry for %s", conversation_id) + except ResourceNotFoundError: + logger.info("No patient context registry to clear for %s", conversation_id) + + logger.info("Archived patient context registry for %s. Duration: %ss", conversation_id, time() - start) + except Exception as e: + logger.error("Failed to archive patient context registry for %s: %s", conversation_id, e) + raise diff --git a/src/data_models/patient_context_models.py b/src/data_models/patient_context_models.py new file mode 100644 index 0000000..44a4374 --- /dev/null +++ b/src/data_models/patient_context_models.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Optional, Literal +from pydantic import BaseModel + +# Canonical analyzer action literal (shared across analyzer + service typing) +AnalyzerAction = Literal[ + "NONE", + "CLEAR", + "ACTIVATE_NEW", + "SWITCH_EXISTING", + "UNCHANGED", +] + + +class PatientContextDecision(BaseModel): + """ + Structured output returned by the patient context analyzer. + - action: one of AnalyzerAction + - patient_id: only populated when action implies activation/switch; otherwise None + - reasoning: brief natural language explanation (kept short by prompt) + """ + action: AnalyzerAction + patient_id: Optional[str] + reasoning: str + + +@dataclass +class TimingInfo: + """ + Timing breakdown for a single patient context decision cycle. + All values are seconds (float) measured with time.time() deltas. + - analyzer: model inference / structured output duration + - storage_fallback: time spent attempting restore-from-storage fallback (if any) + - service: total wall-clock for decide_and_apply orchestration + """ + analyzer: float + storage_fallback: float + service: float diff --git a/src/group_chat.py b/src/group_chat.py index be7af31..e31fb2c 100644 --- a/src/group_chat.py +++ b/src/group_chat.py @@ -57,6 +57,8 @@ async def auth_callback(): return auth_callback # Need to introduce a CustomChatCompletionAgent and a CustomHistoryChannel because of issue https://github.com/microsoft/semantic-kernel/issues/12095 + + class CustomHistoryChannel(ChatHistoryChannel): @override async def receive(self, history: list[ChatMessageContent],) -> None: @@ -178,7 +180,7 @@ def _create_agent(agent_config: dict): instructions = agent_config.get("instructions") if agent_config.get("facilitator") and instructions: instructions = instructions.replace( - "{{aiAgents}}", "\n\t\t".join([f"- {agent['name']}: {agent["description"]}" for agent in all_agents_config])) + "{{aiAgents}}", "\n\t\t".join([f"- {agent['name']}: {agent['description']}" for agent in all_agents_config])) return (CustomChatCompletionAgent(kernel=agent_kernel, name=agent_config["name"], @@ -219,7 +221,8 @@ def _create_agent(agent_config: dict): - **Once per turn**: Each participant can only speak once per turn. - **Default to {facilitator}**: Always default to {facilitator}. If no other participant is specified, {facilitator} goes next. - **Use best judgment**: If the rules are unclear, use your best judgment to determine who should go next, for the natural flow of the conversation. - + - **CONFIRMATION GATE (PLAN ONLY)**: If (a) the MOST RECENT message is from {facilitator} AND (b) it contains a multi-step plan (look for "Plan", "plan:", numbered steps like "1.", "2.", or multiple leading "-" bullet lines) AND (c) no user message has appeared AFTER that plan yet, then do NOT advance to another agent. Wait for a user reply. Output {facilitator} ONLY if absolutely necessary to politely prompt the user for confirmation (do not restate the entire plan). As soon as ANY user reply appears (question, modification, or confirmation), this gate is lifted. If the user used a confirmation token (confirm, yes, proceed, continue, ok, okay, sure, sounds good, go ahead), you may advance to the next required non-facilitator agent; otherwise select the participant that best addresses the user’s reply. + **Output**: Give the full reasoning for your choice and the verdict. The reasoning should include careful evaluation of each rule with an explanation. The verdict should be the name of the participant who should go next. History: @@ -247,6 +250,8 @@ def _create_agent(agent_config: dict): Commands addressed to a specific agent should result in 'no' if there is clear identification of the agent. Commands addressed to "you" or "User" should result in 'yes'. If you are not certain, return "yes". + Ignore any system metadata or patient context snapshots such as lines starting with "PATIENT_CONTEXT_JSON". + Treat internal handoff phrases like "back to you " as NOT terminating (answer is still being routed). EXAMPLES: - "User, can you confirm the correct patient ID?" => "yes" @@ -264,6 +269,27 @@ def _create_agent(agent_config: dict): def evaluate_termination(result): logger.info(f"Termination function result: {result}") + try: + if chat_ctx.chat_history.messages: + last = chat_ctx.chat_history.messages[-1] + # Robust flatten + if hasattr(last, "content"): + c = last.content + if isinstance(c, list): + last_text = " ".join(getattr(p, "text", str(p)) for p in c) + else: + last_text = str(c) + else: + last_text = str(last) + lt = last_text.lower() + if lt.startswith("patient_context_json"): + logger.debug("Termination override: patient context system message -> continue") + return False + if "back to you" in lt: + logger.debug("Termination override: internal handoff phrase -> continue") + return False + except Exception: + pass rule = ChatRule.model_validate_json(str(result.value[0])) return rule.verdict == "yes" diff --git a/src/healthcare_agents/agent.py b/src/healthcare_agents/agent.py index e05a5dd..2e52e81 100644 --- a/src/healthcare_agents/agent.py +++ b/src/healthcare_agents/agent.py @@ -9,7 +9,7 @@ from azure.keyvault.secrets.aio import SecretClient from semantic_kernel.agents.agent import Agent from semantic_kernel.agents.channels.agent_channel import AgentChannel -from semantic_kernel.contents import AuthorRole, ChatMessageContent +from semantic_kernel.contents import AuthorRole, ChatMessageContent, TextContent from semantic_kernel.exceptions import AgentInvokeException from data_models.app_context import AppContext @@ -39,8 +39,10 @@ async def receive(self, history: list[ChatMessageContent]) -> None: async def invoke(self, agent: "HealthcareAgent") -> AsyncIterable[tuple[bool, ChatMessageContent]]: logger.debug("Invoking agent: %s, with user input: %s", agent.name, self.history[-1].content) user_input = self.history[-1].content - user_message = ChatMessageContent(role=AuthorRole.USER, - content=user_input) + user_message = ChatMessageContent( + role=AuthorRole.USER, + items=[TextContent(text=str(user_input))] + ) self.history.append(user_message) if agent.client: @@ -49,7 +51,8 @@ async def invoke(self, agent: "HealthcareAgent") -> AsyncIterable[tuple[bool, Ch response_message = ChatMessageContent( role=AuthorRole.ASSISTANT, name=agent.name, - content=response_dict.get("text", "")) + items=[TextContent(text=response_dict.get("text", ""))] + ) self.history.append(response_message) yield True, response_message else: @@ -109,6 +112,7 @@ def __init__(self, retry_delay=config.retry_delay, timeout=config.timeout ) + # Restore conversation ID if it exists if name in self._chat_ctx.healthcare_agents: self._client.set_conversation_id( @@ -131,7 +135,7 @@ async def get_response(self, message: str) -> ChatMessageContent: return ChatMessageContent( role=AuthorRole.ASSISTANT, name=self.name, - content=response_dict.get("text", "") + items=[TextContent(text=response_dict.get("text", ""))] ) @override diff --git a/src/routes/api/chats.py b/src/routes/api/chats.py index 10fb08d..0677475 100644 --- a/src/routes/api/chats.py +++ b/src/routes/api/chats.py @@ -1,5 +1,5 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. import logging import uuid @@ -11,25 +11,30 @@ from fastapi.responses import JSONResponse from pydantic import BaseModel +from semantic_kernel.contents import AuthorRole, ChatMessageContent, TextContent +from services.patient_context_service import PatientContextService, PATIENT_CONTEXT_PREFIX +from services.patient_context_analyzer import PatientContextAnalyzer + from data_models.app_context import AppContext import group_chat logger = logging.getLogger(__name__) -# Custom JSON encoder that handles datetime + class DateTimeEncoder(json.JSONEncoder): def default(self, obj: Any) -> Any: if isinstance(obj, datetime): return obj.isoformat() return super().default(obj) -# Pydantic models for request/response + class MessageRequest(BaseModel): content: str sender: str mentions: Optional[List[str]] = None channelData: Optional[Dict] = None + class Message(BaseModel): id: str content: str @@ -37,47 +42,138 @@ class Message(BaseModel): timestamp: datetime isBot: bool mentions: Optional[List[str]] = None - + def dict(self, *args, **kwargs): - # Override dict method to handle datetime serialization d = super().dict(*args, **kwargs) - # Convert datetime to ISO format string - if isinstance(d.get('timestamp'), datetime): - d['timestamp'] = d['timestamp'].isoformat() + if isinstance(d.get("timestamp"), datetime): + d["timestamp"] = d["timestamp"].isoformat() return d + class MessageResponse(BaseModel): message: Message error: Optional[str] = None + class MessagesResponse(BaseModel): messages: List[Message] error: Optional[str] = None + class AgentsResponse(BaseModel): agents: List[str] error: Optional[str] = None -# Create a helper function to create JSON responses with datetime handling + def create_json_response(content, headers=None): - """Create a JSONResponse with proper datetime handling.""" return JSONResponse( content=content, headers=headers or {}, encoder=DateTimeEncoder ) + def chats_routes(app_context: AppContext): router = APIRouter() - - # Extract needed values from app_context agent_config = app_context.all_agent_configs data_access = app_context.data_access - - # Find the facilitator agent - facilitator_agent = next((agent for agent in agent_config if agent.get("facilitator")), agent_config[0]) + + analyzer = PatientContextAnalyzer(token_provider=app_context.cognitive_services_token_provider) + patient_context_service = PatientContextService( + analyzer=analyzer, + registry_accessor=app_context.data_access.patient_context_registry_accessor, + context_accessor=app_context.data_access.chat_context_accessor + ) + + facilitator_agent = next((a for a in agent_config if a.get("facilitator")), agent_config[0]) facilitator = facilitator_agent["name"] - + + # ===== Legacy helper retained (now always sees freshly injected snapshot) ===== + def _get_system_patient_context_json(chat_context) -> str | None: + """Return JSON payload from most recent (first-in-list after injection) PATIENT_CONTEXT system message.""" + for msg in reversed(chat_context.chat_history.messages): + if msg.role == AuthorRole.SYSTEM: + # Extract text + if hasattr(msg, "items") and msg.items: + text = getattr(msg.items[0], "text", "") or "" + else: + text = getattr(msg, "content", "") or "" + if text.startswith(PATIENT_CONTEXT_PREFIX): + json_part = text[len(PATIENT_CONTEXT_PREFIX):].lstrip() + if json_part.startswith(":"): + json_part = json_part[1:].lstrip() + return json_part or None + return None + + def _append_pc_ctx_display(base: str, chat_context) -> str: + """Append user-friendly PT_CTX block for UI (optional cosmetic).""" + json_payload = _get_system_patient_context_json(chat_context) + if not json_payload: + return base + try: + obj = json.loads(json_payload) + except Exception: + return base + + pid = obj.get("patient_id") + all_pids = obj.get("all_patient_ids") or [] + convo_id = obj.get("conversation_id") + + # Build lines with explicit leading newlines for clarity + lines: list[str] = [] + lines.append("\n\n---") + lines.append("\n*PT_CTX:*") + if pid: + lines.append(f"\n- **Patient ID:** `{pid}`") + else: + lines.append("\n- *No active patient.*") + if all_pids: + ids_str = ", ".join( + f"`{p}`{' (active)' if p == pid else ''}" for p in sorted(all_pids) + ) + lines.append(f"\n- **Session Patients:** {ids_str}") + if convo_id: + lines.append(f"\n- **Conversation ID:** `{convo_id}`") + + # If we only ended up with the header and separator, skip (unlikely) + if len(lines) <= 2: + return base + + return base + "".join(lines) + + async def _handle_clear_command(content: str, chat_context) -> bool: + content_lower = content.lower().strip() + if content_lower in ["clear", "clear patient", "clear context", "clear patient context"]: + logger.info("Processing clear command for conversation: %s", chat_context.conversation_id) + timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M-%S-%f") + archive_folder = f"archive/{timestamp}" + try: + # Archive session + await data_access.chat_context_accessor.archive_to_folder(chat_context.conversation_id, None, archive_folder) + # Archive each patient file from registry + try: + patient_registry, _ = await patient_context_service.registry_accessor.read_registry(chat_context.conversation_id) + if patient_registry: + for pid in patient_registry.keys(): + await data_access.chat_context_accessor.archive_to_folder(chat_context.conversation_id, pid, archive_folder) + except Exception: + if getattr(chat_context, "patient_contexts", None): + for pid in chat_context.patient_contexts.keys(): + await data_access.chat_context_accessor.archive_to_folder(chat_context.conversation_id, pid, archive_folder) + # Archive registry + await patient_context_service.registry_accessor.archive_registry(chat_context.conversation_id) + except Exception as e: + logger.warning("Clear archival issues: %s", e) + finally: + chat_context.patient_context = None + if hasattr(chat_context, "patient_contexts"): + chat_context.patient_contexts.clear() + chat_context.chat_history.messages.clear() + chat_context.patient_id = None + await data_access.chat_context_accessor.write(chat_context) + return True + return False + @router.get("/api/agents", response_model=AgentsResponse) async def get_available_agents(): """ @@ -86,7 +182,7 @@ async def get_available_agents(): try: # Extract agent names from the agent_config agent_names = [agent["name"] for agent in agent_config if "name" in agent] - + # Return the list of agent names return JSONResponse( content={"agents": agent_names, "error": None} @@ -97,96 +193,135 @@ async def get_available_agents(): content={"agents": [], "error": str(e)}, status_code=500 ) - + @router.websocket("/api/ws/chats/{chat_id}/messages") async def websocket_chat_endpoint(websocket: WebSocket, chat_id: str): - """WebSocket endpoint for streaming chat messages""" + await websocket.accept() + logger.info("WebSocket connection established for chat: %s", chat_id) + try: - await websocket.accept() - logger.info(f"WebSocket connection established for chat: {chat_id}") - - # Wait for the first message from the client - client_message = await websocket.receive_json() - logger.info(f"Received message over WebSocket: {client_message}") - - # Extract message content, sender and mentions - content = client_message.get("content", "") - sender = client_message.get("sender", "User") - mentions = client_message.get("mentions", []) - - # Try to read existing chat context or create a new one if it doesn't exist - try: - chat_context = await data_access.chat_context_accessor.read(chat_id) - except: - # If the chat doesn't exist, create a new one - chat_context = await data_access.chat_context_accessor.create_new(chat_id) - - # Add user message to history - chat_context.chat_history.add_user_message(content) - - # Create group chat instance - chat, chat_context = group_chat.create_group_chat(app_context, chat_context) - - # Process the message - determine target agent based on mentions - target_agent_name = facilitator # Default to facilitator agent - - if mentions and len(mentions) > 0: - # Use the first mentioned agent - target_agent_name = mentions[0] - - # Find the agent by name - target_agent = next( - (agent for agent in chat.agents if agent.name.lower() == target_agent_name.lower()), - chat.agents[0] # Fallback to first agent - ) - - logger.info(f"Using agent: {target_agent.name} to respond to WebSocket message") - - - # Check if the agent is the facilitator - if target_agent.name == facilitator: - target_agent = None # Force facilitator mode when target is the facilitator - - response_sent = False - - # Get responses from the target agent - async for response in chat.invoke(agent=target_agent): - # Skip responses with no content - if not response or not response.content: + while True: + data = await websocket.receive_json() + content = data.get("content", "").strip() + if not content: + await websocket.send_json({"error": "Empty message content"}) continue - - # Create bot response message for each response - bot_message = Message( - id=str(uuid.uuid4()), - content=response.content, - sender=response.name, - timestamp=datetime.now(timezone.utc), - isBot=True, - mentions=[] - ) - - # Convert to dict for JSON serialization - message_dict = bot_message.dict() - - # Send message over WebSocket - await websocket.send_json(message_dict) - - # Save chat context after all messages are processed - await data_access.chat_context_accessor.write(chat_context) - - # Send done signal - await websocket.send_json({"type": "done"}) - - + + try: + # STEP 1: Load session context + chat_context = await data_access.chat_context_accessor.read(chat_id, None) + + # STEP 2: Clear? + if await _handle_clear_command(content, chat_context): + msg = Message( + id=str(uuid.uuid4()), + content="The conversation has been cleared. How can I assist you today?", + sender="Orchestrator", + timestamp=datetime.now(timezone.utc), + isBot=True + ) + await websocket.send_json(msg.dict()) + await websocket.send_json({"type": "done"}) + continue + + # STEP 3: Patient decision + try: + decision, timing = await patient_context_service.decide_and_apply(content, chat_context) + logger.info("Patient context decision=%s active=%s", decision, chat_context.patient_id) + except Exception as e: + logger.warning("Patient context decision failed: %s", e) + decision = "NONE" + + # STEP 4: Special outcomes + if decision == "NEEDS_PATIENT_ID": + err = Message( + id=str(uuid.uuid4()), + content="I need a patient ID to proceed. Provide one like 'patient_4'.", + sender="Orchestrator", + timestamp=datetime.now(timezone.utc), + isBot=True + ) + await websocket.send_json(err.dict()) + await websocket.send_json({"type": "done"}) + continue + + # STEP 5: Load isolated patient history if active + if chat_context.patient_id: + try: + isolated = await data_access.chat_context_accessor.read(chat_id, chat_context.patient_id) + if isolated and isolated.chat_history.messages: + chat_context.chat_history = isolated.chat_history + except Exception as e: + logger.debug("Isolated load failed for %s: %s", chat_context.patient_id, e) + + # STEP 5.5: Inject fresh ephemeral PATIENT_CONTEXT_JSON system message (rebuild from current in-memory state) + # Remove existing snapshot(s) + new_messages = [] + for m in chat_context.chat_history.messages: + if not (m.role == AuthorRole.SYSTEM and hasattr(m, "items") and m.items + and getattr(m.items[0], "text", "").startswith(PATIENT_CONTEXT_PREFIX)): + new_messages.append(m) + chat_context.chat_history.messages = new_messages + snapshot = { + "conversation_id": chat_context.conversation_id, + "patient_id": chat_context.patient_id, + "all_patient_ids": sorted(getattr(chat_context, "patient_contexts", {}).keys()), + "generated_at": datetime.utcnow().isoformat() + "Z" + } + system_line = f"{PATIENT_CONTEXT_PREFIX}: {json.dumps(snapshot, separators=(',', ':'))}" + system_msg = ChatMessageContent(role=AuthorRole.SYSTEM, items=[TextContent(text=system_line)]) + chat_context.chat_history.messages.insert(0, system_msg) + + # STEP 6: Group chat & add user message + chat, chat_context = group_chat.create_group_chat(app_context, chat_context) + user_message = ChatMessageContent(role=AuthorRole.USER, items=[TextContent(text=content)]) + chat_context.chat_history.add_message(user_message) + + # STEP 7: Agent selection + target_agent_name = facilitator + if ":" in content: + candidate = content.split(":", 1)[0].strip() + if any(a.name.lower() == candidate.lower() for a in chat.agents): + target_agent_name = candidate + target_agent = next( + (a for a in chat.agents if a.name.lower() == target_agent_name.lower()), + chat.agents[0] + ) + if target_agent.name == facilitator: + target_agent = None + + # STEP 8: Invoke agents + async for response in chat.invoke(agent=target_agent): + if not response or not response.content: + continue + + # Optional UI block (system snapshot already grounds LLM) + response_with_ctx = _append_pc_ctx_display(response.content, chat_context) + + bot_message = Message( + id=str(uuid.uuid4()), + content=response_with_ctx, + sender=response.name, + timestamp=datetime.now(timezone.utc), + isBot=True + ) + await websocket.send_json(bot_message.dict()) + + # STEP 9: Persist (system snapshot filtered in accessor) + await data_access.chat_context_accessor.write(chat_context) + + except Exception as e: + logger.error("Error in WebSocket chat: %s", e) + await websocket.send_json({"error": str(e)}) + + await websocket.send_json({"type": "done"}) except WebSocketDisconnect: - logger.info(f"WebSocket client disconnected from chat: {chat_id}") + logger.info("WebSocket disconnected for chat: %s", chat_id) except Exception as e: - logger.exception(f"Error in WebSocket chat: {e}") + logger.error("WebSocket error: %s", e) try: - # Try to send error message to client await websocket.send_json({"error": str(e)}) - await websocket.send_json({"type": "done"}) - except: + except Exception: pass return router diff --git a/src/scenarios/default/config/agents.yaml b/src/scenarios/default/config/agents.yaml index f32f237..651cdb5 100644 --- a/src/scenarios/default/config/agents.yaml +++ b/src/scenarios/default/config/agents.yaml @@ -1,41 +1,80 @@ - name: Orchestrator instructions: | - You are an AI agent facilitating a discussion between group of AI agent experts and the user. You are not to make clinical recommendations or treatment plans. Follow these guidelines: - - 1. **Moderate the Discussion**: - Your primary role is to facilitate the discussion and ensure a smooth flow of conversation among the participants. - When a question is asked, think through who could best answer it. Formulate a plan and present it to the user. - Rely on other agents to provide missing information. First ask the agent what information they need to answer a question. - When asking the user for information, mention the user explicitly. "*User*, can you provide me with the patient's #BLANK?" - When addressing an agent, mention the agent explicitly. "*PatientHistory*, proceed with #BLANK." + You are an AI agent facilitating a discussion between a group of AI agent experts and the user. You are not to make clinical recommendations or treatment plans. Follow these guidelines: + + **Patient Context Awareness**: + When you receive a message with `PATIENT_CONTEXT_JSON`, extract: + - `patient_id` (current active patient) + - `all_patient_ids` (all patients discussed/activated this session) + Use these for patient-specific grounding. + IMPORTANT: Always review the actual chat history (messages for the current conversation) to confirm what has already been done for the active patient. + Do NOT assume tasks were done just because a patient exists in metadata; confirm by reading the history. + Never invent patient IDs. If no active patient and the user asks for patient-specific actions, ask for a valid patient ID. + + **Roster / Multi-Patient Meta Queries**: + For any user question like: + - "How many patients have we discussed?" + - "Which patients have we discussed?" + - "What other patients have been covered?" + - "List the patients in this session." + ALWAYS read the most recent `PATIENT_CONTEXT_JSON` system message and answer directly using: + - count = length of `all_patient_ids` + - list = the values in `all_patient_ids` + - active = `patient_id` + If `all_patient_ids` is empty, say no patients have been discussed. + Do NOT guess or rely on memory alone; do NOT omit patients that appear in `all_patient_ids`. + If the user requests a switch to a patient already active, simply acknowledge that the patient is already active (do not re-plan). + If the user requests a switch to a different patient, acknowledge the switch and proceed with planning for the new patient (do not claim you were already on that patient). + + **Conversation State Tracking**: + Before responding, review the chat history and determine: + - The original request or current goal. + - Which agents have already responded. + - The next logical agent or action. + - Whether the user has confirmed a proposed plan. + If a task (e.g., report creation) is already completed, do NOT restart it unless explicitly asked. + + **CRITICAL**: + Never restart completed processes unless the user explicitly requests a re-run. If ReportCreation has already completed a tumor board report for the active patient, treat that workflow as done. + + 1. **Moderate the Discussion**: + Facilitate orderly, purposeful agent participation. When a user request comes in, think through which agents are needed, form a short plan, and present it for confirmation (unless trivial continuation). + When asking the user for needed info, address them explicitly: "*User*, could you provide ...?" + When prompting an agent, address the agent explicitly: "*PatientHistory*, please provide ..." + 2. **Participants**: - The following ai experts can help with answering queries about the user. - {{aiAgents}} - If during the course of the conversation, information is missing, think through, who could be the best to answer it, then ask that agent explicitly for - the information by mentioning the agent. Only ask the user for plan confirmation! - When an agent has provided their input, acknowledge it and move on to the next agent in the plan. - 3. **Allow user to confirm**: When you create a plan with a step by step execution, ask the user for confirmation on the plan. If the plan changes, - inform the user and ask for confirmation again. Stick to the confirmed plan and as the plan progresses as expected, you can skip the confirmation step. - 4. **Explain the Purpose and Order**: At the beginning of the conversation, explain the plan and the expected order of participants. - Please think hard about the order of the individual agents called. For example, the current status and the historical background should be clarified - early in the discussion such that the other agents can make use of that knowledge. Treatment recommendation and research agents should be called later - in the discussion. Report creation should always happen in the end. - 5. **Role Limitation**: Remember, your role is to moderate and facilitate, not to provide clinical recommendations or treatment plans. - DON'T: Provide clinical recommendations or treatment plans. Please only call ONE agent at a time. - 6. **Conclude the plan**: - Don't conclude the conversation until all agents have provided their input. Instead, address the agents that have not yet provided their input. - When all agents have provided their input, the plan has concluded, and the user's question has been answered, summarize the response in one or two sentences. - Ask the user if they have any further questions or need additional assistance. - For follow up questions, formulate a new plan and suggest the order of participants. - + The following AI experts are available: + {{aiAgents}} + If information is missing, decide which specific agent is best positioned to supply it. Ask that agent explicitly. Only ask the user for plan confirmation or missing user-only data. + + 3. **Handle User Commands**: + On "proceed"/"continue"/confirmation, advance to the next logical step—do NOT repeat already completed agent outputs. + Avoid re-asking agents for the same data unless the user clarified new scope. + + 4. **Plan Confirmation**: + Present multi-step plans and ask the user to confirm. If the plan changes midstream, clearly restate the updated plan and request confirmation again. Once stable and progressing, you may skip reconfirmation unless scope shifts. + + 5. **Order & Sequencing**: + Early: establish history (PatientHistory) and current status (PatientStatus). + Mid: radiology/imaging insights (Radiology) if needed. + Later: ClinicalGuidelines, ClinicalTrials, MedicalResearch. + Final: ReportCreation only after upstream context has been gathered. + Do NOT call multiple agents at the same time; strictly one agent per turn. + + 6. **Role Limitation**: + You do NOT provide clinical recommendations or detailed treatment plans yourself. You orchestrate and summarize. + + 7. **Plan Conclusion**: + When all required agents have contributed and the user’s goal is satisfied, provide a succinct (1–2 sentence) summary and ask if further assistance is needed. For a follow-up question, create a new tailored plan rather than replaying the old one. + **IMPORTANT**: - When presenting the plan, ALWAYS specify the following rule: - Each agent, after completing their task, should yield the chat back to you (Orchestrator). Specifically instruct each agent to say "back to you: *Orchestrator*" after their response. + When presenting the plan, ALWAYS specify the rule: + Each agent, after completing their task, must yield the chat back to you (Orchestrator) by saying "back to you: *Orchestrator*". facilitator: true description: | - Your role is to moderate the discussion, present the order of participants, and facilitate the conversation. - + Your role is to moderate the discussion, present the agent sequence, and ensure efficient progress without repetition or unauthorized clinical advice. + - name: PatientHistory instructions: | You are an AI agent tasked with loading and presenting patient data. Your primary purpose is to present the initial patient data, but also to respond to individual requests for additional information. @@ -61,7 +100,14 @@ - Do not provide analysis or opinions on the data. - Do provide answers to questions about the patient's history and data. Use the tools at your disposal to answer those questions. 7. Yield back the chat. When requested, yield the chat back to *Orchestrator* by saying "back to you: *Orchestrator*" or "back to you: *PatientStatus*". - temperature: 0 + + Context rule - Patient Context: + - Before answering or calling tools, read the SYSTEM message that begins with "PATIENT_CONTEXT_JSON:" and use its patient_id. + - If the patient context is missing or unclear, ask the user for the patient ID and wait to proceed after it’s provided. Do not assume or invent one. + - When calling tools (e.g., load_patient_data, create_timeline), pass the patient_id from the SYSTEM patient context. + - Prefer the SYSTEM patient context over any "PC_CTX" audit lines. Do not attempt to set/switch/clear patient context yourself. + + temperature: 0.0 tools: - name: patient_data description: | @@ -76,6 +122,12 @@ For example, you can say: "I have used the CXRReportGen model to analyze the chest x-ray. Here are the findings." You will comment on whether those findings are consistent with the patient's medical history and other data. + + Context rule - Patient Context: + - Before answering or calling tools, read the SYSTEM message that begins with "PATIENT_CONTEXT_JSON:" and use its patient_id. + - Infer indications/history from the conversation but anchor on the current patient context. + - Prefer the SYSTEM patient context over any "PC_CTX" audit lines. Do not attempt to set/switch/clear patient context yourself. + tools: - name: cxr_report_gen description: | @@ -99,6 +151,12 @@ If this information is not available, ask PatientHistory specifically for the missing information. DO: Ask PatientHistory. EXAMPLE: "*PatientHistory*, can you provide me with the patient's #BLANK?. Try to infer the information if not available". + + Context rule - Patient Context: + - Before answering, read the SYSTEM message that begins with "PATIENT_CONTEXT_JSON:" and use its patient_id. + - If key attributes are missing, request them for the current patient (not a different one). + - Prefer the SYSTEM patient context over any "PC_CTX" audit lines. Do not attempt to set/switch/clear patient context yourself. + description: | A PatientStatus agent. You provide current status of a patient using. **You provide**: current status of a patient. **You need**: age, staging, primary site, histology, biomarkers, treatment history, ecog performance status. This can be obtained by PatientHistory. @@ -106,12 +164,24 @@ instructions: | You are a board-certified medical oncologist writing a treatment-plan note. You will be provided with patient information ( demographics, stage, prior therapies, biomarkers, current status ) prepared by a clinical assistant. Your task is to produce a succint "Patient Summary" and "Treatment Plan" that is (1) continuous with what the patient is already receiving, (2) explicitly addresses next-step options at progression / response (e.g., maintenance vs. switch therapy), and (3) integrates every molecular or clinical detail provided. When writing the plan start with "Continue/Initiate/Modify" and clearly state whether you are continuing an existing regimen or starting something new. Cite all relevant biomarkers and comorbidities to justify targeted drugs or trials (e.g., "MET amplification → cabozantinib"). Include follow-up diagnostics / consults that are mentioned or clinically mandatory (MRI, CA-19-9, cardiology eval, ctDNA, etc.). Provide a progression-contingency line ("If progression, consider"). List maintenance strategy when appropriate. Do not invent allergies, symptoms, or medications; if key data are absent, state "Need:" rather than guessing. Output of "Treatment Plan" should include: Primary recommendation (continue vs initiate), Rationale (biomarker / guideline), Surveillance & consults, Progression-contingency options, Maintenance / supportive care. + + Context rule - Patient Context: + - Use the current patient from the SYSTEM "PATIENT_CONTEXT_JSON:" message as the authoritative context. + - If required details are missing, request them for the current patient. + - Prefer the SYSTEM patient context over any "PC_CTX" audit lines. Do not attempt to set/switch/clear patient context yourself. + description: | A Clinical Guidelines agent. You provide treatment recommendations. **You provide**: treatment recommendations. **You need**: patient status from PatientStatus. - name: ReportCreation instructions: | You are an AI agent that assemble tumor board word document using information previously prepared by other agents. Do not summarize the conversation or provide additional analysis. Use the full information provided by the other agents to assemble the tumor board content. You are provided a tool to export the tumor board as the content to a word doc. When user asks to create or export a word document, you must use the provided tool. + + Context rule - Patient Context: + - Assemble content for the currently active patient as defined by the SYSTEM "PATIENT_CONTEXT_JSON:" message. + - If the context is missing, ask the user to provide/confirm the patient before exporting. + - Prefer the SYSTEM patient context over any "PC_CTX" audit lines. Do not attempt to set/switch/clear patient context yourself. + temperature: 0 tools: - name: content_export @@ -140,6 +210,12 @@ Only present clinical trials for which the patient is eligible. If follow up questions are asked, you may additionally explain why a specific trial is not suitable for the patient. Offer to present additional information about the trial to the user, at which point you can call the `display_more_information_about_a_trial` tool. + + Context rule - Patient Context: + - Use the patient from the SYSTEM "PATIENT_CONTEXT_JSON:" message when forming search criteria. + - If required attributes are missing, ask for them for the current patient. + - Prefer the SYSTEM patient context over any "PC_CTX" audit lines. Do not attempt to set/switch/clear patient context yourself. + tools: - name: clinical_trials description: | @@ -180,6 +256,11 @@ - Source ID: 78722 [The wisdom of programming](https://www.example.com/source2) + Context rule - Patient Context: + - When a question is patient-specific, align with the current patient in the SYSTEM "PATIENT_CONTEXT_JSON:" message. + - If not patient-specific, proceed normally. + - Prefer the SYSTEM patient context over any "PC_CTX" audit lines. Do not attempt to set/switch/clear patient context yourself. + graph_rag_url: "https://ncsls.azure-api.net/" graph_rag_index_name: "nsclc-index-360MB" tools: diff --git a/src/services/__init__.py b/src/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/services/patient_context_analyzer.py b/src/services/patient_context_analyzer.py new file mode 100644 index 0000000..515de99 --- /dev/null +++ b/src/services/patient_context_analyzer.py @@ -0,0 +1,249 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +import logging +import os +import time +from typing import Optional, Tuple + +from semantic_kernel import Kernel +from semantic_kernel.connectors.ai.open_ai.services.azure_chat_completion import AzureChatCompletion +from semantic_kernel.connectors.ai.open_ai.prompt_execution_settings.azure_chat_prompt_execution_settings import ( + AzureChatPromptExecutionSettings, +) +from semantic_kernel.contents import ChatHistory +from semantic_kernel.functions import kernel_function + +from data_models.patient_context_models import ( + PatientContextDecision, + AnalyzerAction, # For legacy wrapper typing +) + +logger = logging.getLogger(__name__) + + +class PatientContextAnalyzer: + """ + Patient context analyzer using Semantic Kernel structured output with JSON schema. + Produces a PatientContextDecision (action + optional patient_id + reasoning). + """ + + def __init__( + self, + deployment_name: Optional[str] = None, + token_provider=None, + api_version: Optional[str] = None, + ): + # Resolve deployment (priority: explicit arg > dedicated env var > general deployment var) + self.deployment_name = ( + deployment_name + or os.getenv("PATIENT_CONTEXT_DECIDER_DEPLOYMENT_NAME") + or os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME") + ) + if not self.deployment_name: + raise ValueError("No deployment name for patient context analyzer.") + + # Resolve API version (explicit > env > stable default) + self.api_version = api_version or os.getenv("AZURE_OPENAI_API_VERSION") or "2024-10-21" + + self._token_provider = token_provider + + logger.info( + "PatientContextAnalyzer initialized with deployment=%s api_version=%s", + self.deployment_name, + self.api_version, + ) + + # Initialize kernel + service once (lightweight) + self._kernel = Kernel() + self._kernel.add_service( + AzureChatCompletion( + service_id="patient_context_analyzer", + deployment_name=self.deployment_name, + api_version=self.api_version, + ad_token_provider=token_provider, + ) + ) + + @kernel_function( + description="Analyze user input for patient context decisions", + name="analyze_patient_context", + ) + async def analyze_decision( + self, + user_text: str, + prior_patient_id: Optional[str] = None, + known_patient_ids: Optional[list[str]] = None, + ) -> PatientContextDecision: + """ + Analyze user input and return structured patient context decision. + + Args: + user_text: The user's input message + prior_patient_id: Current active patient ID (if any) + known_patient_ids: List of known patient IDs in this session + + Returns: + PatientContextDecision + """ + if known_patient_ids is None: + known_patient_ids = [] + + if not user_text or not user_text.strip(): + return PatientContextDecision( + action="NONE", + patient_id=None, + reasoning="Empty or whitespace user input; no action needed.", + ) + + system_prompt = f"""You are a patient context analyzer for healthcare conversations. + +TASK: Analyze user input and decide the appropriate patient context action. + +AVAILABLE ACTIONS: +- NONE: No patient context needed (general questions, greetings, system commands) +- CLEAR: User wants to clear/reset all patient context +- ACTIVATE_NEW: User mentions a new patient ID not in the known patient list +- SWITCH_EXISTING: User wants to switch to a different known patient +- UNCHANGED: Continue with current patient context + +CURRENT STATE: +- Active patient ID: {prior_patient_id or "None"} +- Known patient IDs: {known_patient_ids} + +ANALYSIS RULES: +1. Extract patient_id ONLY if action is ACTIVATE_NEW or SWITCH_EXISTING +2. Patient IDs typically follow "patient_X" format or are explicit medical record numbers +3. For CLEAR/NONE/UNCHANGED actions, set patient_id to null +4. Prioritize explicit patient mentions over implicit context +5. Keep reasoning brief and specific (max 50 words) + +Respond with a structured JSON object matching the required schema.""" + + try: + chat_history = ChatHistory() + chat_history.add_system_message(system_prompt) + chat_history.add_user_message(f"User input: {user_text}") + + execution_settings = AzureChatPromptExecutionSettings( + service_id="patient_context_analyzer", + max_tokens=200, + temperature=0.1, + response_format=PatientContextDecision, # Structured JSON schema enforced + ) + + svc = self._kernel.get_service("patient_context_analyzer") + results = await svc.get_chat_message_contents( + chat_history=chat_history, + settings=execution_settings, + ) + + if not results or not results[0].content: + logger.warning("No response from patient context analyzer") + return PatientContextDecision( + action="NONE", + patient_id=None, + reasoning="No response from analyzer; defaulting to NONE.", + ) + + content = results[0].content + + # Parse string JSON or direct dict (SK provider may return either) + if isinstance(content, str): + try: + decision = PatientContextDecision.model_validate_json(content) + except Exception as e: + logger.error("Failed to parse structured response: %s", e) + return PatientContextDecision( + action="NONE", + patient_id=None, + reasoning=f"Parse error: {str(e)[:30]}...", + ) + elif isinstance(content, dict): + try: + decision = PatientContextDecision.model_validate(content) + except Exception as e: + logger.error("Failed to validate structured response: %s", e) + return PatientContextDecision( + action="NONE", + patient_id=None, + reasoning=f"Validation error: {str(e)[:30]}...", + ) + else: + logger.warning("Unexpected response type: %s", type(content)) + return PatientContextDecision( + action="NONE", + patient_id=None, + reasoning="Unexpected response format; defaulting to NONE.", + ) + + logger.info( + "Patient context decision: action=%s patient_id=%s reasoning=%s", + decision.action, + decision.patient_id, + decision.reasoning, + ) + return decision + + except Exception as e: + logger.error("Patient context analysis failed: %s", e) + return PatientContextDecision( + action="NONE", + patient_id=None, + reasoning=f"Analysis error: {str(e)[:30]}...", + ) + + async def analyze_with_timing( + self, + user_text: str, + prior_patient_id: Optional[str], + known_patient_ids: list[str], + ) -> Tuple[PatientContextDecision, float]: + """ + Analyze with timing information (backward-compat convenience wrapper). + """ + start_time = time.time() + decision = await self.analyze_decision( + user_text=user_text, + prior_patient_id=prior_patient_id, + known_patient_ids=known_patient_ids, + ) + duration = time.time() - start_time + return decision, duration + + async def analyze( + self, + user_text: str, + prior_patient_id: Optional[str], + known_patient_ids: list[str], + ) -> tuple[AnalyzerAction, Optional[str], float]: + """ + Legacy wrapper returning (action, patient_id, duration). + Prefer analyze_decision() in new code. + """ + decision, duration = await self.analyze_with_timing( + user_text, prior_patient_id, known_patient_ids + ) + return decision.action, decision.patient_id, duration + + def reset_kernel(self): + """ + Reset the kernel/service (useful when switching patients to avoid cross-contamination). + """ + try: + current_deployment = self.deployment_name + current_api_version = self.api_version + token_provider = self._token_provider + + self._kernel = Kernel() + self._kernel.add_service( + AzureChatCompletion( + service_id="patient_context_analyzer", + deployment_name=current_deployment, + api_version=current_api_version, + ad_token_provider=token_provider, + ) + ) + logger.info("Kernel reset completed for patient context isolation") + except Exception as e: + logger.warning("Error during kernel reset: %s", e) diff --git a/src/services/patient_context_service.py b/src/services/patient_context_service.py new file mode 100644 index 0000000..08609d2 --- /dev/null +++ b/src/services/patient_context_service.py @@ -0,0 +1,252 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +import logging +import re +import time +from datetime import datetime, timezone +from typing import Literal +import os + +from data_models.chat_context import ChatContext, PatientContext +from data_models.patient_context_models import TimingInfo +from services.patient_context_analyzer import PatientContextAnalyzer + +logger = logging.getLogger(__name__) + +# Exported constants / types +PATIENT_CONTEXT_PREFIX = "PATIENT_CONTEXT_JSON" +PATIENT_ID_PATTERN = re.compile(os.getenv("PATIENT_ID_PATTERN", r"^patient_[0-9]+$")) +Decision = Literal[ + "NONE", + "UNCHANGED", + "NEW_BLANK", + "SWITCH_EXISTING", + "CLEAR", + "RESTORED_FROM_STORAGE", + "NEEDS_PATIENT_ID", +] + + +class PatientContextService: + """ + Registry-based patient context manager: + - Registry is authoritative patient roster + active pointer. + - Analyzer determines activation/switch/clear intent. + - Ephemeral system snapshot injection is performed by caller (routes/bots). + - Per‑patient isolation for chat history handled by caller after decision. + """ + + def __init__(self, analyzer: PatientContextAnalyzer, registry_accessor=None, context_accessor=None): + self.analyzer = analyzer + self.registry_accessor = registry_accessor + self.context_accessor = context_accessor + logger.info( + "PatientContextService initialized (registry enabled: %s)", + registry_accessor is not None, + ) + + async def _ensure_patient_contexts_from_registry(self, chat_ctx: ChatContext): + """ + Rebuild in-memory patient_contexts from the authoritative registry each turn. + Safe to call every turn (clears and repopulates). + """ + if not self.registry_accessor: + return + try: + patient_registry, _ = await self.registry_accessor.read_registry(chat_ctx.conversation_id) + chat_ctx.patient_contexts.clear() + if patient_registry: + for pid, entry in patient_registry.items(): + chat_ctx.patient_contexts[pid] = PatientContext( + patient_id=pid, + facts=entry.get("facts", {}), + ) + except Exception as e: + logger.warning("Failed to load patient contexts from registry: %s", e) + + async def decide_and_apply(self, user_text: str, chat_ctx: ChatContext) -> tuple[Decision, TimingInfo]: + """ + Analyze user input, decide patient context transition, and apply. + Flow: + 1. Hydrate registry → in-memory contexts. + 2. If no active patient, attempt silent restore (record timing if used). + 3. Always run analyzer (enables first-turn activation). + 4. Interpret analyzer action into service Decision. + 5. Perform activation / switch / clear side-effects. + 6. Return (Decision, TimingInfo). + """ + service_start = time.time() + await self._ensure_patient_contexts_from_registry(chat_ctx) + + restored = False + fallback_dur = 0.0 + if not chat_ctx.patient_id: + fb_start = time.time() + if await self._try_restore_from_storage(chat_ctx): + restored = True + fallback_dur = time.time() - fb_start + + decision_model, analyzer_dur = await self.analyzer.analyze_with_timing( + user_text=user_text, + prior_patient_id=chat_ctx.patient_id, + known_patient_ids=list(chat_ctx.patient_contexts.keys()), + ) + action = decision_model.action + pid = decision_model.patient_id + + if action == "CLEAR": + await self._archive_all_and_recreate(chat_ctx) + timing = TimingInfo( + analyzer=analyzer_dur, + storage_fallback=fallback_dur, + service=time.time() - service_start, + ) + return "CLEAR", timing + + if action in ("ACTIVATE_NEW", "SWITCH_EXISTING"): + if not pid or not PATIENT_ID_PATTERN.match(pid): + decision: Decision = "NEEDS_PATIENT_ID" + else: + decision = await self._activate_patient_with_registry(pid, chat_ctx) + elif action == "UNCHANGED": + decision = "UNCHANGED" + elif action == "NONE": + decision = "RESTORED_FROM_STORAGE" if restored and chat_ctx.patient_id else "NONE" + else: + decision = "NONE" + + timing = TimingInfo( + analyzer=analyzer_dur, + storage_fallback=fallback_dur, + service=time.time() - service_start, + ) + return decision, timing + + async def set_explicit_patient_context(self, patient_id: str, chat_ctx: ChatContext) -> bool: + """ + Explicitly set active patient (external caller / override path). + Returns True if set; False if invalid patient_id. + """ + if not patient_id or not PATIENT_ID_PATTERN.match(patient_id): + return False + + if chat_ctx.patient_id and patient_id != chat_ctx.patient_id: + self.analyzer.reset_kernel() + + await self._ensure_patient_contexts_from_registry(chat_ctx) + + if patient_id not in chat_ctx.patient_contexts: + chat_ctx.patient_contexts[patient_id] = PatientContext(patient_id=patient_id) + + chat_ctx.patient_id = patient_id + + if self.registry_accessor: + await self._update_registry_storage(chat_ctx) + return True + + async def _archive_all_and_recreate(self, chat_ctx: ChatContext) -> None: + """ + Archive session + all patient histories + registry, then clear in-memory state. + """ + if chat_ctx.patient_id: + self.analyzer.reset_kernel() + + all_patient_ids = [] + if self.registry_accessor: + try: + patient_registry, _ = await self.registry_accessor.read_registry(chat_ctx.conversation_id) + all_patient_ids = list(patient_registry.keys()) if patient_registry else [] + except Exception: + all_patient_ids = list(chat_ctx.patient_contexts.keys()) + + timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M-%S-%f") + folder = f"archive/{timestamp}" + + if self.context_accessor: + # Session + try: + await self.context_accessor.archive_to_folder(chat_ctx.conversation_id, None, folder) + except Exception as e: + logger.warning("Archive session failed: %s", e) + # Each patient + for pid in all_patient_ids: + try: + await self.context_accessor.archive_to_folder(chat_ctx.conversation_id, pid, folder) + except Exception as e: + logger.warning("Archive patient %s failed: %s", pid, e) + + if self.registry_accessor: + try: + await self.registry_accessor.archive_registry(chat_ctx.conversation_id) + except Exception as e: + logger.warning("Archive registry failed: %s", e) + + chat_ctx.patient_id = None + chat_ctx.patient_contexts.clear() + + async def _update_registry_storage(self, chat_ctx: ChatContext): + """ + Write/merge current active patient entry into registry (active pointer updated). + """ + if not (self.registry_accessor and chat_ctx.patient_id): + return + current = chat_ctx.patient_contexts.get(chat_ctx.patient_id) + if not current: + return + entry = { + "patient_id": chat_ctx.patient_id, + "facts": current.facts, + "conversation_id": chat_ctx.conversation_id, + } + try: + await self.registry_accessor.update_patient_registry( + chat_ctx.conversation_id, + chat_ctx.patient_id, + entry, + chat_ctx.patient_id, # update active pointer + ) + except Exception as e: + logger.warning("Failed registry update: %s", e) + + async def _activate_patient_with_registry(self, patient_id: str, chat_ctx: ChatContext) -> Decision: + """ + Activate or switch patient. Returns: + - UNCHANGED if already active + - SWITCH_EXISTING if switching to existing + - NEW_BLANK if creating a new patient context + """ + if chat_ctx.patient_id == patient_id: + return "UNCHANGED" + if chat_ctx.patient_id and patient_id != chat_ctx.patient_id: + self.analyzer.reset_kernel() + + await self._ensure_patient_contexts_from_registry(chat_ctx) + + if patient_id in chat_ctx.patient_contexts: + chat_ctx.patient_id = patient_id + await self._update_registry_storage(chat_ctx) + return "SWITCH_EXISTING" + + # New patient + chat_ctx.patient_contexts[patient_id] = PatientContext(patient_id=patient_id) + chat_ctx.patient_id = patient_id + await self._update_registry_storage(chat_ctx) + return "NEW_BLANK" + + async def _try_restore_from_storage(self, chat_ctx: ChatContext) -> bool: + """ + If there is no active patient in-memory, attempt to restore last active from registry. + Returns True if restored. + """ + if not self.registry_accessor: + return False + try: + patient_registry, active = await self.registry_accessor.read_registry(chat_ctx.conversation_id) + if patient_registry and active and active in patient_registry: + await self._ensure_patient_contexts_from_registry(chat_ctx) + chat_ctx.patient_id = active + return True + except Exception as e: + logger.warning("Restore from registry failed: %s", e) + return False