diff --git a/CHANGELOG.md b/CHANGELOG.md index 81c2f88..e621bcd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ - Pluggable architecture for chunkers, embedders, and vector databases. - Hybrid storage with Qdrant and MongoDB. +## v0.1.1-beta.5 (2025-11-21) + ## v0.1.1-beta.4 (2025-11-20) ## v0.1.1-beta.3 (2025-10-25) diff --git a/CLAUDE.md b/CLAUDE.md index 66a27b7..2e6739e 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -284,6 +284,218 @@ Documents → Extract Content → Graphiti Processing → Neo4j Graph 4. **LLM cost** – Entity extraction uses LLM calls; consider batching for large documents 5. **Phase 1 feature** – Hybrid retrieval (merging graph + vector results) is planned for Phase 2 +## Async Processing & Celery (NEW) + +### Overview + +Async processing with Celery + Redis enables: +- Non-blocking document submission (returns task_id immediately) +- Background processing of documents (entity/relationship extraction) +- Horizontal worker scaling for throughput +- Task monitoring and status tracking +- Automatic retry logic for failed tasks + +### Architecture + +``` +FastAPI Request + ↓ +GraphRAGClient.submit_add_documents_async() + ↓ +Celery Task Submission → Redis Queue "default" + ↓ +Worker Pool (multiple processes) + ↓ +Task Execution (Neo4j entity extraction) + ↓ +Results → Redis Backend + ↓ +Client polls /tasks/{task_id} for status + ↓ +Results Retrieved +``` + +### Key Components + +1. **src/insta_rag/celery_app.py** - Celery initialization + - Reads CELERY_BROKER_URL and CELERY_RESULT_BACKEND from environment + - Configures task serialization, timeouts, retry logic + - Registers task modules + +2. **src/insta_rag/tasks/graph_rag_tasks.py** - Async task definitions + - `@app.task` decorated functions + - Handles async GraphRAGClient in sync Celery context + - Auto-retry (3 times) with exponential backoff + - Returns JSON-serializable results + +3. **src/insta_rag/graph_rag/client.py** - Modified GraphRAGClient + - `submit_add_documents_async()` - Submit without waiting + - `submit_add_chunk_async()` - Submit single chunk + - Both return task_id immediately + +4. **src/insta_rag/task_monitoring.py** - Task tracking + - `get_task_status(task_id)` - Returns PENDING/STARTED/SUCCESS/FAILURE + - `get_task_result(task_id)` - Returns task results + - `get_queue_length()` - Queue depth monitoring + - `cancel_task(task_id)` - Cancel pending tasks + - Singleton pattern for single instance + +5. **src/insta_rag/worker_manager.py** - Single worker management + - `start_worker()` - Start single worker process + - `stop_worker()` - Graceful shutdown + - `is_worker_running()` - Check status + - Subprocess-based process management + +6. **src/insta_rag/worker_pool.py** - Multiple workers (horizontal scaling) + - `start_worker_pool(num_workers)` - Start N workers + - `stop_worker_pool()` - Stop all workers + - `scale_pool(target_workers)` - Dynamic scaling + - `auto_scale_if_needed()` - Auto-scale based on queue depth + - `get_pool_status()` - Worker pool metrics + - `get_queue_depth()` - Pending task count + +### Configuration Environment Variables + +```dotenv +# Redis Configuration (required for async processing) +CELERY_BROKER_URL=redis://default:password@host:6379/0 +CELERY_RESULT_BACKEND=redis://default:password@host:6379/1 + +# Redis databases: +# /0 - Task queue (broker) - holds pending tasks +# /1 - Result storage (backend) - stores task results +``` + +### Critical Implementation Details + +#### 1. 
Queue Configuration +- **CRITICAL**: Workers must listen to queue "default" +- Task submission uses `.delay()` which queues to "default" +- Worker startup includes `-Q default` parameter + +#### 2. load_dotenv() Ordering (CRITICAL) +In testing_api/main.py, **load_dotenv() MUST be called BEFORE importing insta_rag**: +```python +from dotenv import load_dotenv + +# CRITICAL: Load environment BEFORE imports +load_dotenv() + +# Now celery_app.py will find CELERY_BROKER_URL and CELERY_RESULT_BACKEND +from insta_rag import ... +``` + +#### 3. Security (No Hardcoded Credentials) +- src/insta_rag/celery_app.py uses `os.getenv()` with NO defaults +- src/insta_rag/core/config.py has `broker_url: Optional[str] = None` +- All credentials come from environment variables only +- Credentials in testing_api/.env, not in code + +#### 4. Task Serialization +- Tasks serialized as JSON (not pickle) +- Documents converted to dicts for transmission +- Results converted to JSON-serializable dicts +- Compatible with distributed workers + +### How It's Called from testing_api + +#### 1. API Initialization (testing_api/main.py) +```python +@app.on_event("startup") +async def startup_event(): + from insta_rag.worker_pool import start_worker_pool + start_worker_pool(num_workers=2, concurrency_per_worker=4) +``` + +#### 2. Document Submission (testing_api/graph_rag_routes.py) +```python +@router.post("/graph-rag/add-documents") +async def add_documents(documents: List[DocumentInput]): + async with GraphRAGClient() as client: + await client.initialize() + # Submit without waiting - returns task_id + task_id = await client.submit_add_documents_async(documents, "collection") + + return {"task_id": task_id, "status": "submitted"} +``` + +#### 3. Task Status Check (testing_api/graph_rag_routes.py) +```python +@router.get("/tasks/{task_id}") +async def get_task_status(task_id: str): + from insta_rag.task_monitoring import get_task_monitoring + monitor = get_task_monitoring() + status = monitor.get_task_status(task_id) + + if status == "SUCCESS": + result = monitor.get_task_result(task_id) + return {"status": status, "result": result} + + return {"status": status} +``` + +### Files Modified/Created + +**Created:** +- src/insta_rag/celery_app.py +- src/insta_rag/tasks/graph_rag_tasks.py +- src/insta_rag/task_monitoring.py +- src/insta_rag/worker_manager.py +- src/insta_rag/worker_pool.py + +**Modified:** +- src/insta_rag/graph_rag/client.py - Added async submission methods +- src/insta_rag/core/config.py - CeleryConfig with environment loading +- src/insta_rag/__init__.py - Export worker functions +- testing_api/main.py - Worker pool auto-start/stop +- testing_api/graph_rag_routes.py - Async endpoints +- testing_api/.env - Redis configuration +- pyproject.toml - Added httpcore, httpx dependencies + +### Worker Startup Command + +```bash +# Start single worker +celery -A insta_rag.celery_app worker -l debug -Q default -c 4 + +# Parameters: +# -A: Celery app module (insta_rag.celery_app) +# -l: Log level (debug, info, warning, error) +# -Q: Queue name (CRITICAL: must be "default") +# -c: Concurrency (number of parallel tasks) +``` + +### Task States and Lifecycle + +``` +Submission → PENDING → STARTED → SUCCESS / FAILURE + ↑ + (or RETRY) +``` + +- **PENDING**: Waiting in Redis queue +- **STARTED**: Worker picked up task +- **SUCCESS**: Task completed successfully +- **FAILURE**: Task failed (after retries) +- **RETRY**: Task being retried (up to 3 times) + +### Testing Async Processing + +```python +# Test task submission 
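+# (Assumes an initialized GraphRAGClient `client`, a list of DocumentInput `docs`, and
+# `from insta_rag.task_monitoring import get_task_monitoring`; see the sections above.)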
+task_id = await client.submit_add_documents_async(docs, "test") + +# Test status polling +monitor = get_task_monitoring() +status = monitor.get_task_status(task_id) +assert status in ["PENDING", "STARTED", "SUCCESS", "FAILURE"] + +# Test result retrieval +if status == "SUCCESS": + result = monitor.get_task_result(task_id) + assert result is not None +``` + ## Code Quality Standards ### Pre-commit Hooks (Enforced on commit) @@ -466,6 +678,7 @@ Note: Pre-commit hooks automatically manage `uv.lock` and `requirements.txt`. - ✅ **Vector RAG** – Semantic chunking, hybrid retrieval, HyDE, reranking - ✅ **Graph RAG** – Knowledge graph-based retrieval with Neo4j and Graphiti (Phase 1) - ✅ **Hybrid Storage** – Qdrant for vectors, MongoDB for content +- ✅ **Async Processing** – Celery + Redis for non-blocking document ingestion and horizontal scaling ### Roadmap (Phase 2+) - Graph RAG scoring – Implement semantic similarity + BM25 scoring for edges @@ -475,6 +688,8 @@ Note: Pre-commit hooks automatically manage `uv.lock` and `requirements.txt`. - CLI GA – General availability of command-line interface - LangChain/LlamaIndex adapters – Deep integration with popular frameworks - Streaming/tracing hooks – OpenTelemetry support, real-time monitoring +- Task persistence – Save task state to database for recovery +- Advanced scheduling – Cron jobs and delayed task execution ### Known Limitations diff --git a/GRAPH_RAG_CELERY_IMPLEMENTATION_GUIDE.md b/GRAPH_RAG_CELERY_IMPLEMENTATION_GUIDE.md new file mode 100644 index 0000000..10b226a --- /dev/null +++ b/GRAPH_RAG_CELERY_IMPLEMENTATION_GUIDE.md @@ -0,0 +1,1269 @@ +# Graph RAG + Celery Implementation Guide + +## Complete Guide to Building Scalable Knowledge Graph Ingestion + +This guide explains how to implement **Graph RAG with Celery** for production-grade, scalable document processing and knowledge graph construction. + +**Table of Contents:** +1. [Overview](#overview) +2. [Architecture](#architecture) +3. [Prerequisites](#prerequisites) +4. [Complete Setup Guide](#complete-setup-guide) +5. [Implementation Patterns](#implementation-patterns) +6. [Production Deployment](#production-deployment) +7. [Troubleshooting](#troubleshooting) +8. [Best Practices](#best-practices) + +--- + +## Overview + +### What is Graph RAG + Celery? + +**Graph RAG** extracts entities and relationships from documents and stores them in a Neo4j knowledge graph. 
**Celery** with **Redis** enables non-blocking async processing, allowing you to: + +- Submit documents and return immediately with task IDs +- Process documents in the background without blocking your API +- Scale horizontally by running multiple workers +- Monitor task progress in real-time +- Automatically retry failed tasks + +### When to Use This Pattern + +✅ **Use Graph RAG + Celery if you:** +- Need to process large documents (long entity extraction time) +- Want responsive APIs that don't block on processing +- Need to scale document ingestion horizontally +- Want to monitor individual task progress +- Need automatic retry logic for failed extractions + +❌ **Don't use if you:** +- Processing documents is very fast (< 1 second) +- You can afford to block the API during processing +- You don't need task monitoring or retries +- Your deployment is single-machine only + +--- + +## Architecture + +### Component Diagram + +``` +┌─────────────────────────────────────────────────────────────┐ +│ FastAPI Server │ +├─────────────────────────────────────────────────────────────┤ +│ POST /graph-rag/add-documents │ +│ ├─ GraphRAGClient.submit_add_documents_async() │ +│ └─ Return: { task_id: "abc-123" } │ +│ │ +│ GET /tasks/{task_id} │ +│ └─ TaskMonitoring.get_task_status() │ +└─────────────────────────────────────────────────────────────┘ + ↓ +┌─────────────────────────────────────────────────────────────┐ +│ Redis (Broker) │ +├─────────────────────────────────────────────────────────────┤ +│ Database /0: Task Queue "default" │ +│ Database /1: Result Storage (task results) │ +│ │ +│ Message: { │ +│ "task": "insta_rag.tasks.add_documents_task", │ +│ "args": [documents_list, "collection_name"], │ +│ "task_id": "abc-123" │ +│ } │ +└─────────────────────────────────────────────────────────────┘ + ↓ +┌─────────────────────────────────────────────────────────────┐ +│ Celery Worker Pool (Processes) │ +├─────────────────────────────────────────────────────────────┤ +│ Worker-0 │ Worker-1 │ Worker-2 │ +│ (PID: xxx)│ (PID: yyy)│ (PID: zzz) │ +│ │ +│ 1. Poll Redis "default" queue │ +│ 2. Pick up task message │ +│ 3. Deserialize documents from JSON │ +│ 4. Call add_documents_task(documents, collection) │ +└─────────────────────────────────────────────────────────────┘ + ↓ +┌─────────────────────────────────────────────────────────────┐ +│ GraphRAGClient (Async Context) │ +├─────────────────────────────────────────────────────────────┤ +│ await client.initialize() → Neo4j connection │ +│ await client.add_documents() → Entity extraction │ +│ ├─ Convert documents to Graphiti episodes │ +│ ├─ Call LLM for entity extraction │ +│ ├─ Call LLM for relationship extraction │ +│ └─ Store in Neo4j graph │ +│ return result │ +└─────────────────────────────────────────────────────────────┘ + ↓ +┌─────────────────────────────────────────────────────────────┐ +│ Neo4j Graph Database │ +├─────────────────────────────────────────────────────────────┤ +│ Nodes: [Alice, TechCorp, Engineer, ...] │ +│ Edges: [works_at, builds, builds_for, ...] 
│ +│ │ +│ Sample node: { │ +│ name: "Alice", │ +│ labels: ["Person"], │ +│ properties: { job_title: "Engineer" } │ +│ } │ +│ │ +│ Sample edge: { │ +│ fact: "Alice works at TechCorp", │ +│ relationship_type: "works_at", │ +│ source: Alice_uuid, │ +│ target: TechCorp_uuid │ +│ } │ +└─────────────────────────────────────────────────────────────┘ + ↓ +┌─────────────────────────────────────────────────────────────┐ +│ Task Result Stored in Redis Backend │ +├─────────────────────────────────────────────────────────────┤ +│ Key: "celery-task-result-abc-123" │ +│ Value: { │ +│ "status": "success", │ +│ "data": { │ +│ "nodes_created": 5, │ +│ "edges_created": 3, │ +│ "collection_name": "company" │ +│ } │ +│ } │ +└─────────────────────────────────────────────────────────────┘ +``` + +### Request/Response Flow + +**Step 1: Client Submits Documents** +``` +POST /graph-rag/add-documents +Body: { + "documents": [ + { "text": "Alice works at TechCorp as an engineer" }, + { "text": "TechCorp builds AI products" } + ] +} + +Response: +{ + "task_id": "3f7a8c2d-1b9f-4e5c-9a1d-2c8f5e9a1b3d", + "status": "submitted" +} +(API returns immediately - no waiting!) +``` + +**Step 2: API Endpoint Logic** +```python +async def add_documents(documents: List[DocumentInput]): + async with GraphRAGClient() as client: + await client.initialize() + + # Submit to Celery (non-blocking) + task_id = await client.submit_add_documents_async( + documents, + collection_name="documents" + ) + + # Return immediately with task_id + return {"task_id": task_id, "status": "submitted"} +``` + +**Step 3: Documents Serialized and Queued** +```python +# GraphRAGClient.submit_add_documents_async() +doc_dicts = [doc.model_dump() for doc in documents] + +# Call Celery task with .delay() (non-blocking) +task = add_documents_task.delay(doc_dicts, "documents") + +return task.id # Return task_id immediately +``` + +**Step 4: Task Placed in Redis Queue** +``` +Redis Queue "default": + [ + { + "task": "insta_rag.tasks.add_documents_task", + "args": [[{...documents...}], "documents"], + "id": "3f7a8c2d-1b9f-4e5c-9a1d-2c8f5e9a1b3d" + } + ] +``` + +**Step 5: Worker Picks Up Task** +``` +Worker-0: + 1. Detect new message in Redis queue "default" + 2. Deserialize: { documents: [...], collection_name: "documents" } + 3. Call: add_documents_task([...], "documents") + 4. 
Status → STARTED (in Redis backend) +``` + +**Step 6: Task Execution (Neo4j Operations)** +```python +# add_documents_task (sync Celery task) +async def _add_documents_async(documents, collection_name): + async with GraphRAGClient() as client: + await client.initialize() + + # Neo4j entity extraction via Graphiti + result = await client.add_documents(documents, collection_name) + + return result # { nodes_created: 5, edges_created: 3 } + +# Celery task runs async code in event loop +result = loop.run_until_complete(_add_documents_async(...)) +return {"status": "success", "data": result} +``` + +**Step 7: Results Stored in Redis** +``` +Redis Backend Database /1: + celery-task-result-3f7a8c2d-1b9f-4e5c-9a1d-2c8f5e9a1b3d: + { + "status": "success", + "data": { + "nodes_created": 5, + "edges_created": 3, + "collection_name": "company" + } + } + + Task status changed to: SUCCESS +``` + +**Step 8: Client Polls Task Status** +``` +GET /tasks/3f7a8c2d-1b9f-4e5c-9a1d-2c8f5e9a1b3d + +Response: +{ + "task_id": "3f7a8c2d-1b9f-4e5c-9a1d-2c8f5e9a1b3d", + "status": "SUCCESS", + "result": { + "nodes_created": 5, + "edges_created": 3, + "collection_name": "company" + } +} +``` + +--- + +## Prerequisites + +### Required Services + +1. **Redis** (Message Broker + Result Backend) + ```bash + # Docker + docker run -d -p 6379:6379 redis:7 + + # Or install locally + brew install redis # macOS + sudo apt-get install redis-server # Ubuntu + ``` + +2. **Neo4j** (Graph Database) + ```bash + # Docker + docker run -d -p 7687:7687 -e NEO4J_AUTH=neo4j/password neo4j:5 + + # Or use managed: Aura Cloud, AuraDB, etc. + ``` + +### Python Dependencies + +```bash +# Core async processing +celery>=5.3.0 +redis>=4.5.0 + +# HTTP clients +httpcore>=1.0.0 +httpx>=0.24.0 + +# Already in insta_rag +graphiti-core>=0.1.0 # Entity extraction +qdrant-client>=1.7.0 # Vector DB (optional) +openai>=1.12.0 # LLM calls +pydantic>=2.5.0 # Data validation +``` + +### Environment Setup + +```bash +# 1. Install package with all dependencies +uv pip install -e . --group dev + +# 2. Create .env file with credentials +cat > testing_api/.env << 'EOF' +# Redis Configuration +CELERY_BROKER_URL=redis://default:password@localhost:6379/0 +CELERY_RESULT_BACKEND=redis://default:password@localhost:6379/1 + +# Neo4j Configuration +NEO4J_URI=bolt://localhost:7687 +NEO4J_USER=neo4j +NEO4J_PASSWORD=password +NEO4J_DATABASE=neo4j + +# LLM Configuration (for entity extraction) +GRAPHITI_LLM_MODEL=gpt-4.1 +GRAPHITI_EMBEDDING_MODEL=text-embedding-3-large +AZURE_OPENAI_API_KEY=your_key +AZURE_OPENAI_ENDPOINT=your_endpoint + +# Vector DB (optional) +QDRANT_URL=https://your-qdrant:6333 +QDRANT_API_KEY=your_key +EOF + +# 3. 
Verify connectivity +redis-cli ping # Should return PONG +cypher-shell -u neo4j -p password # Should connect +``` + +--- + +## Complete Setup Guide + +### Phase 1: Local Development Setup + +#### Step 1: Start Infrastructure + +```bash +# Terminal 1: Redis +docker run -d -p 6379:6379 --name redis redis:7 + +# Terminal 2: Neo4j +docker run -d -p 7687:7687 \ + -e NEO4J_AUTH=neo4j/password \ + --name neo4j neo4j:5 + +# Terminal 3: Verify connectivity +redis-cli ping +# PONG + +# Check Neo4j +curl -u neo4j:password -X GET http://localhost:7474/db/data/ +``` + +#### Step 2: Start Celery Workers + +```bash +# Terminal 4: Start single worker +celery -A insta_rag.celery_app worker -l debug -Q default -c 4 + +# Or start worker pool (multiple workers) +python3 << 'EOF' +from insta_rag import start_worker_pool +start_worker_pool(num_workers=2, concurrency_per_worker=4) + +# Keep running... +input("Workers started. Press Enter to stop...") +EOF +``` + +#### Step 3: Start FastAPI Server + +```bash +# Terminal 5: Start API +cd testing_api +uvicorn main:app --reload --host 0.0.0.0 --port 8000 + +# Swagger available at: http://localhost:8000/docs +``` + +#### Step 4: Test End-to-End + +```bash +# Terminal 6: Test the flow +python3 << 'EOF' +import asyncio +import httpx +import time + +async def test_async_flow(): + client = httpx.AsyncClient() + + # 1. Submit documents + response = await client.post( + "http://localhost:8000/graph-rag/add-documents", + json={ + "documents": [ + {"text": "Alice works at TechCorp as a Senior Engineer"}, + {"text": "TechCorp builds AI products for enterprises"} + ] + } + ) + + task_id = response.json()["task_id"] + print(f"✓ Task submitted: {task_id}") + + # 2. Poll task status + for i in range(30): # Poll for up to 30 seconds + response = await client.get(f"http://localhost:8000/tasks/{task_id}") + task = response.json() + + print(f" Status: {task['status']}") + + if task["status"] == "SUCCESS": + print(f"✓ Task completed!") + print(f" Result: {task.get('result')}") + break + + await asyncio.sleep(1) + else: + print("✗ Task didn't complete in 30 seconds") + +asyncio.run(test_async_flow()) +EOF +``` + +### Phase 2: Production Deployment + +#### Option A: Docker Compose + +```yaml +# docker-compose.yml +version: '3.8' + +services: + redis: + image: redis:7 + ports: + - "6379:6379" + volumes: + - redis_data:/data + environment: + - REDIS_PASSWORD=your_secure_password + + neo4j: + image: neo4j:5 + ports: + - "7687:7687" + - "7474:7474" + environment: + - NEO4J_AUTH=neo4j/your_secure_password + volumes: + - neo4j_data:/var/lib/neo4j/data + + api: + build: . + ports: + - "8000:8000" + depends_on: + - redis + - neo4j + environment: + - CELERY_BROKER_URL=redis://:your_secure_password@redis:6379/0 + - CELERY_RESULT_BACKEND=redis://:your_secure_password@redis:6379/1 + - NEO4J_URI=bolt://neo4j:7687 + - NEO4J_USER=neo4j + - NEO4J_PASSWORD=your_secure_password + command: uvicorn testing_api.main:app --host 0.0.0.0 --port 8000 + + worker: + build: . 
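+    # Same image as the api service; the command below runs a Celery worker instead of uvicorn.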
+ depends_on: + - redis + - neo4j + environment: + - CELERY_BROKER_URL=redis://:your_secure_password@redis:6379/0 + - CELERY_RESULT_BACKEND=redis://:your_secure_password@redis:6379/1 + - NEO4J_URI=bolt://neo4j:7687 + - NEO4J_USER=neo4j + - NEO4J_PASSWORD=your_secure_password + command: celery -A insta_rag.celery_app worker -l info -Q default -c 4 + scale: 2 # Run 2 workers + +volumes: + redis_data: + neo4j_data: +``` + +Start with: `docker-compose up` + +#### Option B: Kubernetes + +```yaml +# kubernetes/celery-deployment.yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: celery-worker +spec: + replicas: 3 # 3 workers + selector: + matchLabels: + app: celery-worker + template: + metadata: + labels: + app: celery-worker + spec: + containers: + - name: worker + image: your-registry/insta-rag:latest + command: + - celery + - -A + - insta_rag.celery_app + - worker + - -l + - info + - -Q + - default + - -c + - "4" + env: + - name: CELERY_BROKER_URL + valueFrom: + secretKeyRef: + name: app-secrets + key: celery-broker-url + - name: CELERY_RESULT_BACKEND + valueFrom: + secretKeyRef: + name: app-secrets + key: celery-result-backend + - name: NEO4J_URI + valueFrom: + secretKeyRef: + name: app-secrets + key: neo4j-uri + resources: + limits: + memory: "2Gi" + cpu: "1000m" + requests: + memory: "1Gi" + cpu: "500m" +``` + +--- + +## Implementation Patterns + +### Pattern 1: Simple Non-Blocking Ingestion + +**Use Case:** Submit documents and get task ID immediately + +```python +@app.post("/ingest") +async def ingest_documents(documents: List[DocumentInput]): + """Submit documents for async processing.""" + async with GraphRAGClient() as client: + await client.initialize() + task_id = await client.submit_add_documents_async( + documents, + collection_name="docs" + ) + + return { + "task_id": task_id, + "status": "queued", + "message": f"Processing {len(documents)} documents" + } +``` + +**Client Usage:** +```python +# Get task ID immediately +response = await client.post("/ingest", json={"documents": [...]}) +task_id = response.json()["task_id"] + +# Check status later +while True: + result = await client.get(f"/tasks/{task_id}") + if result.json()["status"] == "SUCCESS": + print(result.json()["result"]) + break + await asyncio.sleep(1) +``` + +### Pattern 2: Batch Processing with Progress + +**Use Case:** Process many documents in batches with progress tracking + +```python +@app.post("/batch-ingest") +async def batch_ingest(batch: BatchRequest): + """Submit batch of documents for processing.""" + task_ids = [] + + # Submit each document as separate task for progress tracking + async with GraphRAGClient() as client: + await client.initialize() + + for doc in batch.documents: + task_id = await client.submit_add_documents_async( + [doc], + collection_name=batch.collection_name + ) + task_ids.append(task_id) + + return { + "batch_id": str(uuid.uuid4()), + "task_ids": task_ids, + "total": len(task_ids), + "status": "processing" + } + + +@app.get("/batch/{batch_id}") +async def get_batch_status(batch_id: str, task_ids: List[str]): + """Get status of all tasks in batch.""" + monitor = get_task_monitoring() + + completed = 0 + failed = 0 + pending = 0 + + results = {} + for task_id in task_ids: + status = monitor.get_task_status(task_id) + results[task_id] = status + + if status == "SUCCESS": + completed += 1 + elif status == "FAILURE": + failed += 1 + else: + pending += 1 + + return { + "batch_id": batch_id, + "total": len(task_ids), + "completed": completed, + "failed": failed, + "pending": 
pending, + "tasks": results + } +``` + +### Pattern 3: Auto-Scaling Based on Queue Depth + +**Use Case:** Automatically scale workers based on task volume + +```python +# In main.py startup +import asyncio +from apscheduler.schedulers.background import BackgroundScheduler + +scheduler = BackgroundScheduler() + +def check_queue_and_scale(): + """Auto-scale workers based on queue depth.""" + from insta_rag.task_monitoring import get_task_monitoring + from insta_rag.worker_pool import auto_scale_if_needed + + monitor = get_task_monitoring() + queue_depth = monitor.get_queue_length() + + print(f"Queue depth: {queue_depth}") + + # Scale based on queue depth + auto_scale_if_needed( + queue_depth_threshold=10, # If > 10 tasks, add workers + min_workers=1, + max_workers=8 + ) + +scheduler.add_job( + check_queue_and_scale, + trigger="interval", + seconds=30 # Check every 30 seconds +) + +@app.on_event("startup") +async def startup(): + scheduler.start() + start_worker_pool(num_workers=2, concurrency_per_worker=4) + +@app.on_event("shutdown") +async def shutdown(): + scheduler.shutdown() + stop_worker_pool() +``` + +### Pattern 4: Priority Queues + +**Use Case:** Process high-priority documents faster + +```python +# Multiple queues based on priority +# celery_app.py + +app.conf.update( + task_routes={ + 'insta_rag.tasks.add_documents_task': {'queue': 'default'}, + } +) + +# Start workers for different queues +# Worker 1 (high priority): celery -A insta_rag.celery_app worker -Q priority +# Worker 2 (default): celery -A insta_rag.celery_app worker -Q default + +# Submit to priority queue +@app.post("/ingest/priority") +async def ingest_priority(documents: List[DocumentInput]): + """Submit high-priority documents.""" + task = add_documents_task.apply_async( + args=[doc_dicts, collection_name], + queue='priority' + ) + return {"task_id": task.id} +``` + +### Pattern 5: Scheduled/Delayed Processing + +**Use Case:** Process documents at scheduled time + +```python +@app.post("/schedule-ingest") +async def schedule_ingest( + documents: List[DocumentInput], + delay_seconds: int +): + """Schedule documents for processing after delay.""" + async with GraphRAGClient() as client: + await client.initialize() + doc_dicts = [doc.model_dump() for doc in documents] + + # Schedule for processing after delay + task = add_documents_task.apply_async( + args=[doc_dicts, "documents"], + countdown=delay_seconds # Delay in seconds + ) + + return { + "task_id": task.id, + "scheduled_in_seconds": delay_seconds + } +``` + +--- + +## Production Deployment + +### Security Checklist + +- [ ] **Redis Authentication** + ``` + CELERY_BROKER_URL=redis://:your_strong_password@host:6379/0 + ``` + +- [ ] **Redis TLS/SSL** + ``` + CELERY_BROKER_URL=rediss://:password@host:6379/0 + CELERY_ACCEPT_CONTENT=['json'] # Don't use pickle + ``` + +- [ ] **Neo4j Authentication** + ``` + NEO4J_URI=bolt://neo4j:password@host:7687 + ``` + +- [ ] **Network Isolation** + - Redis not exposed to internet + - Neo4j only accessible from worker pods + - Only API exposed publicly + +- [ ] **Rate Limiting** + ```python + from slowapi import Limiter + from slowapi.util import get_remote_address + + limiter = Limiter(key_func=get_remote_address) + + @app.post("/ingest") + @limiter.limit("10/minute") # Max 10 requests per minute + async def ingest_documents(request: Request, ...): + pass + ``` + +- [ ] **Input Validation** + ```python + class DocumentInput(BaseModel): + text: str = Field(..., max_length=1000000) # Max 1MB + metadata: Optional[Dict] = 
Field(default={}, max_items=10) + ``` + +- [ ] **Monitoring & Alerts** + ```python + # Monitor task failures + @app.get("/health/tasks") + async def task_health(): + monitor = get_task_monitoring() + failed_count = len(monitor.get_failed_tasks()) + + if failed_count > 10: + # Alert! + send_alert(f"Too many failed tasks: {failed_count}") + + return {"failed_tasks": failed_count} + ``` + +### Performance Tuning + +**Redis Configuration:** +```ini +# redis.conf +maxmemory 4gb +maxmemory-policy allkeys-lru +appendonly yes # Persistence +``` + +**Celery Configuration:** +```python +# Tune for your workload +app.conf.update( + # Prefetch tasks + worker_prefetch_multiplier=1, # Don't prefetch (better for long tasks) + + # Worker recycling + worker_max_tasks_per_child=1000, # Recycle after 1000 tasks + + # Task timeouts + task_soft_time_limit=3600, # 1 hour soft limit + task_time_limit=7200, # 2 hour hard limit + + # Result expiration + result_expires=86400, # Keep results for 24 hours + + # Concurrency + worker_concurrency=4, # Depends on CPU/memory +) +``` + +**Neo4j Performance:** +```cypher +-- Create indices for faster lookups +CREATE INDEX FOR (n:GraphNode) ON (n.name); +CREATE INDEX FOR (n:GraphNode) ON (n.group_id); +CREATE INDEX FOR (e:GraphEdge) ON (e.relationship_type); + +-- Monitor slowest queries +PROFILE MATCH (n:GraphNode)-[:works_at]->(c) RETURN n, c; +``` + +### Monitoring & Observability + +**Prometheus Metrics:** +```python +from prometheus_client import Counter, Histogram, Gauge + +task_submitted = Counter('celery_task_submitted', 'Tasks submitted', ['task_name']) +task_duration = Histogram('celery_task_duration', 'Task duration', ['task_name']) +worker_count = Gauge('celery_worker_count', 'Active workers') +queue_depth = Gauge('celery_queue_depth', 'Pending tasks') + +@app.get("/metrics") +async def metrics(): + from prometheus_client import generate_latest + return Response(generate_latest(), media_type="text/plain") +``` + +**Logging:** +```python +import logging +from pythonjsonlogger import jsonlogger + +logger = logging.getLogger() +logHandler = logging.StreamHandler() +formatter = jsonlogger.JsonFormatter() +logHandler.setFormatter(formatter) +logger.addHandler(logHandler) + +logger.info("Task started", extra={ + "task_id": task_id, + "collection": collection_name, + "document_count": len(documents) +}) +``` + +--- + +## Troubleshooting + +### Common Issues + +#### Issue 1: Tasks Always PENDING + +**Symptoms:** Tasks submitted but never processed (status stays PENDING) + +**Causes:** +1. Workers not listening to correct queue +2. Redis not connected +3. 
Worker not started + +**Solution:** +```bash +# Check workers are running on "default" queue +celery -A insta_rag.celery_app inspect active_queues + +# Should show: +# worker1@hostname: - default + +# If not showing, restart with -Q default +celery -A insta_rag.celery_app worker -l debug -Q default + +# Check Redis connectivity +redis-cli ping +# PONG +``` + +#### Issue 2: "ConnectionError: No Redis" + +**Symptoms:** Worker crashes with "Error: No broker" + +**Cause:** CELERY_BROKER_URL environment variable not set + +**Solution:** +```bash +# Check environment variable +echo $CELERY_BROKER_URL +# Should print Redis URL + +# If empty, load from .env +source .env +export CELERY_BROKER_URL + +# Or verify .env is in correct location +ls testing_api/.env +``` + +#### Issue 3: Load_dotenv() Not Loading + +**Symptoms:** Redis credentials from .env not found + +**Cause:** load_dotenv() called AFTER imports + +**Solution:** +```python +# testing_api/main.py - CORRECT ORDER + +from dotenv import load_dotenv + +# Call FIRST - before any insta_rag imports +load_dotenv() + +# Now these imports will find environment variables +from insta_rag import ... +from insta_rag.celery_app import app +``` + +#### Issue 4: Memory Growing Unbounded + +**Symptoms:** Worker memory usage increases over time + +**Cause:** Results not expiring from Redis + +**Solution:** +```python +# celery_app.py + +app.conf.update( + result_expires=3600, # Results expire after 1 hour (not 24) + worker_max_tasks_per_child=500, # Recycle worker frequently +) + +# Or manually clean old results +python3 << 'EOF' +import redis +from datetime import datetime, timedelta + +r = redis.Redis.from_url(os.getenv('CELERY_RESULT_BACKEND')) + +# Delete results older than 1 hour +for key in r.scan_iter(match='celery-task-result-*'): + if r.ttl(key) == -1: # No TTL set + r.delete(key) +EOF +``` + +#### Issue 5: Worker Crashes on Large Documents + +**Symptoms:** Worker process dies when processing large documents + +**Cause:** Memory limit hit during entity extraction + +**Solution:** +```python +# Increase worker memory limit +celery -A insta_rag.celery_app worker \ + --max-memory-per-child 500000 \ # 500MB + -l debug -Q default -c 2 + +# Or batch large documents +documents_batches = [ + documents[i:i+10] + for i in range(0, len(documents), 10) +] + +for batch in documents_batches: + task_id = await client.submit_add_documents_async(batch, collection) + # Wait for completion before next batch +``` + +--- + +## Best Practices + +### 1. Always Use Async Context Manager + +```python +# ✅ GOOD +async with GraphRAGClient() as client: + await client.initialize() + task_id = await client.submit_add_documents_async(docs, collection) + +# ❌ BAD +client = GraphRAGClient() +await client.initialize() +task_id = await client.submit_add_documents_async(docs, collection) +# No cleanup! +``` + +### 2. Monitor Task Progress + +```python +# ✅ GOOD - Store task IDs and check progress +async def submit_batch(): + task_ids = [] + for doc in documents: + task_id = await client.submit_add_documents_async([doc], collection) + task_ids.append(task_id) + + # Log task + logger.info(f"Submitted task {task_id}") + + return task_ids + +# ❌ BAD - Fire and forget +task_id = await client.submit_add_documents_async(documents, collection) +# No tracking of success/failure +``` + +### 3. 
Handle Failures Gracefully + +```python +# ✅ GOOD - Check status and retry +monitor = get_task_monitoring() +status = monitor.get_task_status(task_id) + +if status == "FAILURE": + logger.error(f"Task {task_id} failed, retrying...") + # Resubmit + new_task_id = await client.submit_add_documents_async(docs, collection) + +# ❌ BAD - Assume success +# No failure handling +``` + +### 4. Limit Concurrency Per Task + +```python +# ✅ GOOD - Reasonable limits +# Worker with concurrency=2 means max 2 tasks in parallel +celery -A insta_rag.celery_app worker -c 2 + +# Prevents memory exhaustion + +# ❌ BAD - Too high +celery -A insta_rag.celery_app worker -c 100 +# Will crash with OOM +``` + +### 5. Use Appropriate Timeouts + +```python +# ✅ GOOD - Reasonable timeout for entity extraction +app.conf.update( + task_soft_time_limit=3600, # 1 hour + task_time_limit=7200, # 2 hours +) + +# ❌ BAD - Too short +app.conf.update( + task_time_limit=10, # 10 seconds - not enough for LLM calls +) +``` + +### 6. Log Comprehensively + +```python +# ✅ GOOD - Log all stages +logger.info("Task submitted", extra={"task_id": task_id}) +logger.debug("Processing started", extra={"task_id": task_id}) +logger.info("Task completed", extra={ + "task_id": task_id, + "nodes_created": result['nodes_created'], + "edges_created": result['edges_created'] +}) + +# ❌ BAD - No logging +# Can't debug issues +``` + +### 7. Clean Up Old Results + +```python +# ✅ GOOD - Regular cleanup +@app.on_event("startup") +async def cleanup_task(): + # Clean old results every hour + async def cleanup_job(): + while True: + monitor = get_task_monitoring() + # Results expire automatically based on result_expires config + await asyncio.sleep(3600) + + asyncio.create_task(cleanup_job()) + +# ❌ BAD - Results pile up forever +# Redis memory keeps growing +``` + +--- + +## Complete Example: End-to-End Implementation + +```python +# testing_api/main.py - Complete setup + +from fastapi import FastAPI, HTTPException +from dotenv import load_dotenv +import asyncio +import logging + +# CRITICAL: Load environment BEFORE imports +load_dotenv() + +from insta_rag import ( + DocumentInput, + start_worker_pool, + stop_worker_pool, + GraphRAGClient +) +from insta_rag.task_monitoring import get_task_monitoring + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +app = FastAPI(title="Graph RAG API") + +# Auto-start/stop workers +@app.on_event("startup") +async def startup_event(): + logger.info("Starting worker pool...") + start_worker_pool(num_workers=2, concurrency_per_worker=4) + logger.info("✓ Worker pool started") + +@app.on_event("shutdown") +async def shutdown_event(): + logger.info("Stopping worker pool...") + stop_worker_pool() + logger.info("✓ Worker pool stopped") + +# Submit documents asynchronously +@app.post("/graph-rag/add-documents") +async def add_documents(documents: list[DocumentInput]): + """Submit documents for async processing.""" + try: + async with GraphRAGClient() as client: + await client.initialize() + + task_id = await client.submit_add_documents_async( + documents, + collection_name="documents" + ) + + logger.info(f"Task submitted: {task_id}") + + return { + "task_id": task_id, + "status": "submitted", + "message": f"Processing {len(documents)} document(s)" + } + + except Exception as e: + logger.error(f"Error submitting task: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +# Check task status +@app.get("/tasks/{task_id}") +async def get_task_status(task_id: str): + """Get 
status and results of a task.""" + try: + monitor = get_task_monitoring() + status = monitor.get_task_status(task_id) + + response = { + "task_id": task_id, + "status": status + } + + if status == "SUCCESS": + result = monitor.get_task_result(task_id) + response["result"] = result + elif status == "FAILURE": + error = monitor.get_task_result(task_id) + response["error"] = str(error) + + return response + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +# Health check +@app.get("/health") +async def health(): + """Health check endpoint.""" + try: + monitor = get_task_monitoring() + queue_depth = monitor.get_queue_length() + + return { + "status": "healthy", + "queue_depth": queue_depth + } + except Exception as e: + return {"status": "unhealthy", "error": str(e)}, 503 + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8000) +``` + +--- + +## Summary + +**Key Takeaways:** + +1. **Graph RAG** extracts entities/relationships → Neo4j +2. **Celery** handles async task processing +3. **Redis** stores task queue and results +4. **Workers** process documents in background +5. **Client** gets task ID immediately, polls for results + +**Critical Steps:** +- ✅ Load environment BEFORE imports +- ✅ Start workers on "default" queue +- ✅ Use async context manager for GraphRAGClient +- ✅ Store credentials in .env, not code +- ✅ Monitor queue depth and worker health + +**Next Steps:** +1. Follow "Complete Setup Guide" to get running locally +2. Deploy with Docker Compose or Kubernetes +3. Implement monitoring and alerting +4. Scale workers based on load +5. Iterate on entity extraction quality + diff --git a/README.md b/README.md index 08ce6e7..4ad7f57 100644 --- a/README.md +++ b/README.md @@ -406,6 +406,173 @@ async def hybrid_retrieval(): --- +## Async Document Processing (NEW) + +**Async Processing with Celery** allows you to submit documents for Graph RAG processing without blocking your API, enabling horizontal scaling and better resource management. 
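+
+Under the hood, `submit_add_documents_async()` enqueues a Celery task that wraps the async `GraphRAGClient`. The sketch below is illustrative only, not the shipped implementation (that lives in `src/insta_rag/tasks/graph_rag_tasks.py`); the exact decorator options and the `DocumentInput(**d)` round-trip are assumptions, but it shows the pattern: a synchronous task runs the async client in an event loop and auto-retries with exponential backoff.
+
+```python
+# Illustrative sketch of the background task; see src/insta_rag/tasks/graph_rag_tasks.py
+# for the real implementation (its signature and retry settings may differ).
+import asyncio
+
+from insta_rag.celery_app import app
+from insta_rag.graph_rag.client import GraphRAGClient
+from insta_rag.models.document import DocumentInput
+
+
+@app.task(autoretry_for=(Exception,), retry_backoff=True, max_retries=3)
+def add_documents_task(documents: list[dict], collection_name: str) -> dict:
+    """Run the async GraphRAGClient inside a synchronous Celery worker."""
+
+    async def _run() -> dict:
+        async with GraphRAGClient() as client:
+            await client.initialize()
+            # Documents arrive as plain dicts (JSON-serialized); rebuild the models
+            # (assumed constructor; the real task may deserialize differently).
+            docs = [DocumentInput(**d) for d in documents]
+            return await client.add_documents(docs, collection_name)
+
+    data = asyncio.run(_run())
+    return {"status": "success", "data": data}
+```
+
+You normally never call this task directly; the sections below cover the public submission and monitoring APIs.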
+ +### When to Use Async Processing + +- Processing large documents that take long to extract entities/relationships +- Building responsive APIs that should return immediately with task IDs +- Scaling horizontally with multiple workers +- Monitoring task progress in real-time +- Retrying failed tasks automatically + +### Quick Start with Celery + +#### 1) Configure Redis + +Add to `.env`: + +```dotenv +# Redis Configuration (for Celery async task processing) +CELERY_BROKER_URL=redis://default:your_password@your_host:6379/0 +CELERY_RESULT_BACKEND=redis://default:your_password@your_host:6379/1 +``` + +#### 2) Start Workers + +```bash +# Single worker with 4 concurrent tasks +celery -A insta_rag.celery_app worker -l debug -Q default -c 4 + +# Or use the library function to start multiple workers +python3 -c "from insta_rag import start_worker_pool; start_worker_pool(num_workers=2, concurrency_per_worker=4)" +``` + +#### 3) Submit Documents Asynchronously + +```python +import asyncio +from insta_rag.graph_rag import GraphRAGClient +from insta_rag import DocumentInput + +async def main(): + async with GraphRAGClient() as client: + await client.initialize() + + docs = [DocumentInput.from_text("Alice works at TechCorp")] + + # Submit without waiting (returns immediately with task_id) + task_id = await client.submit_add_documents_async( + docs, + collection_name="company" + ) + + print(f"Task submitted: {task_id}") + + # Check status anytime + from insta_rag.task_monitoring import get_task_monitoring + monitor = get_task_monitoring() + status = monitor.get_task_status(task_id) + + print(f"Task status: {status}") # PENDING, STARTED, SUCCESS, or FAILURE + +asyncio.run(main()) +``` + +#### 4) Monitor Tasks + +```python +from insta_rag.task_monitoring import get_task_monitoring + +monitor = get_task_monitoring() + +# Get task status +status = monitor.get_task_status(task_id) + +# Get task result (when complete) +if status == "SUCCESS": + result = monitor.get_task_result(task_id) + print(result) + +# Get queue depth +queue_length = monitor.get_queue_length() +print(f"Pending tasks: {queue_length}") +``` + +#### 5) Scale Workers Horizontally + +```python +from insta_rag import scale_pool, get_pool_status, auto_scale_if_needed + +# Scale to specific number +scale_pool(target_workers=4) + +# Get pool status +status = get_pool_status() +print(f"Active workers: {status['active_workers']}") + +# Auto-scale based on queue depth +auto_scale_if_needed(queue_depth_threshold=10, min_workers=1, max_workers=8) +``` + +### FastAPI Integration + +```python +from fastapi import FastAPI +from insta_rag import start_worker_pool, stop_worker_pool, DocumentInput +from insta_rag.graph_rag import GraphRAGClient +from insta_rag.task_monitoring import get_task_monitoring + +app = FastAPI() + +@app.on_event("startup") +async def startup(): + # Auto-start worker pool + start_worker_pool(num_workers=2, concurrency_per_worker=4) + +@app.on_event("shutdown") +async def shutdown(): + # Auto-stop worker pool + stop_worker_pool() + +@app.post("/graph-rag/add-documents") +async def add_documents(documents: list[DocumentInput]): + """Submit documents for async processing (non-blocking).""" + async with GraphRAGClient() as client: + await client.initialize() + task_id = await client.submit_add_documents_async( + documents, + collection_name="documents" + ) + + return {"task_id": task_id, "status": "submitted"} + +@app.get("/tasks/{task_id}") +async def get_task_status(task_id: str): + """Get status and results of a task.""" + monitor = 
get_task_monitoring() + status = monitor.get_task_status(task_id) + + response = {"task_id": task_id, "status": status} + + if status == "SUCCESS": + response["result"] = monitor.get_task_result(task_id) + + return response +``` + +### Architecture + +``` +Document Submission (FastAPI) + ↓ + Celery Task Queue (Redis) + ↓ + Workers (multiple processes) + ↓ + Graph RAG Processing + ↓ + Results stored in Redis + ↓ + Task Status Polling (FastAPI) + ↓ + Results Retrieved +``` + +--- + ## FastAPI Example ```python @@ -480,6 +647,7 @@ We welcome contributions! Please check out the **Contributing Guide** for: - [x] **Hybrid Storage** – Qdrant vectors + MongoDB content - [x] **Hybrid Retrieval** – Semantic + BM25 search - [x] **HyDE & Reranking** – Query transformation and SOTA reranking +- [x] **Async Processing** – Celery + Redis for non-blocking document ingestion and horizontal scaling ### Coming Soon (Phase 2+) - [ ] Graph RAG Scoring – Semantic similarity + BM25 for edges @@ -489,6 +657,8 @@ We welcome contributions! Please check out the **Contributing Guide** for: - [ ] LangChain/LlamaIndex adapters - [ ] Streaming & tracing hooks (OpenTelemetry) - [ ] Native PDF/HTML loaders with auto‑chunk profiles +- [ ] Task persistence and recovery +- [ ] Advanced scheduling and cron job support --- diff --git a/START_WORKER.sh b/START_WORKER.sh new file mode 100755 index 0000000..2458d68 --- /dev/null +++ b/START_WORKER.sh @@ -0,0 +1,118 @@ +#!/bin/bash + +# Celery Worker Startup Script for Insta RAG +# Starts a Celery worker for async document ingestion tasks + +set -e + +echo "==========================================" +echo "Insta RAG - Celery Worker Startup" +echo "==========================================" + +# Check if .env file exists +if [ ! -f ".env" ]; then + echo "❌ Error: .env file not found!" + echo "" + echo "Please create a .env file with the following variables:" + echo " CELERY_BROKER_URL=redis://default:...@52.140.76.45:6379/0" + echo " CELERY_RESULT_BACKEND=redis://default:...@52.140.76.45:6379/1" + echo " NEO4J_URI=bolt://..." + echo " NEO4J_USER=neo4j" + echo " NEO4J_PASSWORD=..." + exit 1 +fi + +# Load environment variables +export $(cat .env | xargs) + +# Test Redis connection +echo "✓ Testing Redis connection..." +python3 << 'EOF' +import socket +import sys + +redis_host = "52.140.76.45" +redis_port = 6379 + +try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(5) + result = sock.connect_ex((redis_host, redis_port)) + + if result == 0: + print("✓ Redis connection successful") + sock.close() + sys.exit(0) + else: + print("❌ Cannot connect to Redis") + sys.exit(1) +except Exception as e: + print(f"❌ Redis connection failed: {e}") + sys.exit(1) +EOF + +if [ $? 
-ne 0 ]; then + exit 1 +fi + +# Parse arguments +CONCURRENCY=4 +LOG_LEVEL="info" +QUEUE="default" + +while [[ $# -gt 0 ]]; do + case $1 in + -c|--concurrency) + CONCURRENCY="$2" + shift 2 + ;; + -l|--log-level) + LOG_LEVEL="$2" + shift 2 + ;; + -q|--queue) + QUEUE="$2" + shift 2 + ;; + -h|--help) + echo "Usage: ./START_WORKER.sh [OPTIONS]" + echo "" + echo "Options:" + echo " -c, --concurrency Number of concurrent tasks (default: 4)" + echo " -l, --log-level Logging level: debug, info, warning (default: info)" + echo " -q, --queue Queue to process (default: default)" + echo " -h, --help Show this help message" + echo "" + echo "Examples:" + echo " ./START_WORKER.sh" + echo " ./START_WORKER.sh --concurrency 8" + echo " ./START_WORKER.sh --log-level debug --concurrency 2" + exit 0 + ;; + *) + echo "Unknown option: $1" + exit 1 + ;; + esac +done + +echo "==========================================" +echo "Worker Configuration:" +echo " Concurrency: $CONCURRENCY tasks" +echo " Log Level: $LOG_LEVEL" +echo " Queue: $QUEUE" +echo "==========================================" +echo "" + +# Start Celery worker +echo "✓ Starting Celery worker..." +echo "" +echo "Press Ctrl+C to stop the worker" +echo "==========================================" +echo "" + +celery -A insta_rag.celery_app worker \ + -l $LOG_LEVEL \ + -c $CONCURRENCY \ + -Q $QUEUE \ + --loglevel=$LOG_LEVEL diff --git a/examples/celery_async_ingestion.py b/examples/celery_async_ingestion.py new file mode 100644 index 0000000..08bd775 --- /dev/null +++ b/examples/celery_async_ingestion.py @@ -0,0 +1,281 @@ +"""Example: Async document ingestion using Celery for Graph RAG. + +This example demonstrates how to submit document ingestion tasks to Celery, +allowing documents to be processed asynchronously without blocking the application. +The task ID is returned immediately, and you can track progress using the +TaskMonitoring service. + +Prerequisites: + - Redis server running at 52.140.76.45:6379 + - Celery worker running: celery -A insta_rag.celery_app worker -l info + - Neo4j database configured and running +""" + +import asyncio +import logging +import time +from insta_rag.graph_rag.client import GraphRAGClient +from insta_rag.models.document import DocumentInput +from insta_rag.task_monitoring import get_task_monitoring + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def example_1_basic_async_submission(): + """Example 1: Submit documents asynchronously and immediately return task ID. + + This demonstrates the key benefit of Celery integration - the API call + returns immediately with a task ID, while the actual processing happens + in the background. + """ + logger.info("\n=== Example 1: Basic Async Submission ===") + + # Create documents for ingestion + documents = [ + DocumentInput.from_text( + "Alice works at TechCorp as a senior engineer. " + "She specializes in machine learning and distributed systems." + ), + DocumentInput.from_text( + "TechCorp is an AI company founded in 2015. " + "It focuses on building cutting-edge AI products." + ), + DocumentInput.from_text( + "Bob is the CEO of TechCorp. He has 20 years of experience in tech." 
+ ), + ] + + # Initialize client (no async required for submission) + client = GraphRAGClient() + + # Submit documents for async ingestion + # This returns IMMEDIATELY with a task ID + result = client.submit_add_documents_async( + documents=documents, + collection_name="company_info", + ) + + logger.info(f"Task submitted successfully!") + logger.info(f" Task ID: {result['task_id']}") + logger.info(f" Status: {result['status']}") + logger.info(f" Message: {result['message']}") + logger.info(f" Documents: {result['num_documents']}") + + return result["task_id"] + + +def example_2_monitor_task_status(task_id: str): + """Example 2: Monitor task status and progress. + + After submitting a task, you can check its status, progress, and final results. + """ + logger.info("\n=== Example 2: Monitor Task Status ===") + + monitor = get_task_monitoring() + + # Initial status check + status = monitor.get_task_status(task_id) + logger.info(f"Initial task status: {status['state']}") + + # Monitor until completion + max_wait_time = 300 # 5 minutes max wait + start_time = time.time() + + while not status["ready"] and (time.time() - start_time) < max_wait_time: + logger.info(f"Task state: {status['state']}") + + if "progress" in status: + logger.info(f" Progress: {status['progress']}") + + time.sleep(2) # Check every 2 seconds + status = monitor.get_task_status(task_id) + + logger.info(f"Final task state: {status['state']}") + + if status["successful"]: + logger.info("✓ Task completed successfully!") + logger.info(f"Result: {status['result']}") + elif status["failed"]: + logger.info("✗ Task failed!") + logger.info(f"Error: {status['error']}") + + return status + + +def example_3_wait_for_result(task_id: str): + """Example 3: Wait for task result with timeout. + + This is a more direct approach - wait for the task to complete and get results. + """ + logger.info("\n=== Example 3: Wait for Result ===") + + monitor = get_task_monitoring() + + try: + # Wait up to 5 minutes for the result + result = monitor.get_task_result(task_id, timeout=300) + logger.info("✓ Task completed successfully!") + logger.info(f"Result: {result}") + return result + except TimeoutError: + logger.warning("Task did not complete within timeout period") + return None + except Exception as e: + logger.error(f"Task failed with error: {e}") + return None + + +def example_4_queue_monitoring(): + """Example 4: Monitor queued and active tasks. + + See what tasks are currently queued or being processed by workers. 
+ """ + logger.info("\n=== Example 4: Queue Monitoring ===") + + monitor = get_task_monitoring() + + # Get queue statistics + queue_length = monitor.get_queue_length() + logger.info(f"Total tasks in queue: {queue_length}") + + # Get active tasks (currently executing) + active = monitor.get_active_tasks() + if active: + logger.info("Active tasks:") + for worker, tasks in active.items(): + logger.info(f" {worker}: {len(tasks)} tasks") + for task in tasks[:2]: # Show first 2 tasks + logger.info(f" - {task.get('name')}: {task.get('id')[:8]}...") + else: + logger.info("No active tasks") + + # Get reserved tasks (pending execution) + reserved = monitor.get_reserved_tasks() + if reserved: + logger.info("Reserved (pending) tasks:") + for worker, tasks in reserved.items(): + logger.info(f" {worker}: {len(tasks)} tasks") + else: + logger.info("No reserved tasks") + + # Get worker statistics + stats = monitor.get_worker_stats() + if stats: + logger.info("Worker statistics:") + for worker_name, worker_stats in stats.items(): + logger.info(f" {worker_name}:") + logger.info(f" Pool size: {worker_stats['pool_size']}") + logger.info(f" Active: {worker_stats['active_tasks']}") + logger.info(f" Reserved: {worker_stats['reserved_tasks']}") + else: + logger.info("No workers available") + + +def example_5_batch_ingestion(): + """Example 5: Submit multiple ingestion tasks concurrently. + + Submit multiple document batches to be processed in parallel by workers. + """ + logger.info("\n=== Example 5: Batch Concurrent Ingestion ===") + + client = GraphRAGClient() + monitor = get_task_monitoring() + + # Define multiple document batches + batches = [ + { + "collection": "company_info", + "docs": [ + DocumentInput.from_text("Alice is a senior engineer at TechCorp."), + DocumentInput.from_text("Bob is the CEO of TechCorp."), + ], + }, + { + "collection": "products", + "docs": [ + DocumentInput.from_text("TechCorp develops AI-powered analytics platform."), + DocumentInput.from_text("The platform uses machine learning for predictions."), + ], + }, + { + "collection": "partnerships", + "docs": [ + DocumentInput.from_text("TechCorp partners with Google Cloud for infrastructure."), + ], + }, + ] + + # Submit all batches concurrently + task_ids = [] + for batch in batches: + result = client.submit_add_documents_async( + documents=batch["docs"], + collection_name=batch["collection"], + ) + task_ids.append(result["task_id"]) + logger.info( + f"Submitted batch to '{batch['collection']}': " + f"Task ID = {result['task_id'][:8]}..." + ) + + logger.info(f"\nSubmitted {len(task_ids)} batches for parallel processing") + + # Monitor all tasks + pending = set(task_ids) + completed = set() + + while pending: + for task_id in list(pending): + status = monitor.get_task_status(task_id) + + if status["ready"]: + if status["successful"]: + logger.info(f"✓ Task {task_id[:8]}... completed successfully") + completed.add(task_id) + else: + logger.warning(f"✗ Task {task_id[:8]}... 
failed") + completed.add(task_id) + + pending.discard(task_id) + + if pending: + logger.info(f"Waiting for {len(pending)} tasks to complete...") + time.sleep(3) + + logger.info(f"All {len(completed)} batches completed!") + + +def main(): + """Run all examples.""" + logger.info("Celery Async Ingestion Examples for Graph RAG") + logger.info("=" * 50) + + try: + # Example 1: Submit async + logger.info("\nStarting Example 1...") + task_id = example_1_basic_async_submission() + + # Example 2: Monitor status + logger.info("\nStarting Example 2...") + time.sleep(2) # Give task a moment to start + example_2_monitor_task_status(task_id) + + # Example 4: Queue monitoring + logger.info("\nStarting Example 4...") + example_4_queue_monitoring() + + # Example 5: Batch ingestion + logger.info("\nStarting Example 5...") + example_5_batch_ingestion() + + logger.info("\n" + "=" * 50) + logger.info("All examples completed!") + + except Exception as e: + logger.error(f"Error running examples: {e}", exc_info=True) + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml index f28c9ca..4047dd0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "insta_rag" -version = "0.1.1-beta.4" +version = "0.1.1-beta.5" description = "A RAG (Retrieval-Augmented Generation) library for document processing and retrieval." authors = [ { name = "Aukik Aurnab", email = "aukikaurnabx@gmail.com" }, @@ -39,6 +39,10 @@ dependencies = [ "requests>=2.32.5", "rank-bm25>=0.2.2", "graphiti-core>=0.1.0", + "celery>=5.3.0", + "redis>=4.5.0", + "httpcore>=1.0.0", + "httpx>=0.24.0", ] [project.urls] diff --git a/src/insta_rag/__init__.py b/src/insta_rag/__init__.py index 07ff65c..7f5b892 100644 --- a/src/insta_rag/__init__.py +++ b/src/insta_rag/__init__.py @@ -8,10 +8,33 @@ from .core.config import RAGConfig from .models.document import DocumentInput from .models.response import AddDocumentsResponse +from .worker_manager import start_worker, stop_worker, is_worker_running, get_worker_pid +from .worker_pool import ( + start_worker_pool, + stop_worker_pool, + get_pool_status, + get_active_worker_count, + is_pool_healthy, + scale_pool, + get_queue_depth, + auto_scale_if_needed, +) __all__ = [ "RAGClient", "RAGConfig", "DocumentInput", "AddDocumentsResponse", + "start_worker", + "stop_worker", + "is_worker_running", + "get_worker_pid", + "start_worker_pool", + "stop_worker_pool", + "get_pool_status", + "get_active_worker_count", + "is_pool_healthy", + "scale_pool", + "get_queue_depth", + "auto_scale_if_needed", ] diff --git a/src/insta_rag/celery_app.py b/src/insta_rag/celery_app.py new file mode 100644 index 0000000..97bd58e --- /dev/null +++ b/src/insta_rag/celery_app.py @@ -0,0 +1,65 @@ +"""Celery application configuration for Insta RAG async task processing.""" + +import os + +from celery import Celery + +# Initialize Celery app +app = Celery("insta_rag") + +# Get broker and backend URLs from environment +# IMPORTANT: These MUST be set via environment variables (e.g., from .env file) +# They are required for async task processing to work +celery_broker_url = os.getenv("CELERY_BROKER_URL") +celery_result_backend = os.getenv("CELERY_RESULT_BACKEND") + +# Validate that required Redis configuration is present +if not celery_broker_url: + print( + "[Celery] WARNING: CELERY_BROKER_URL not set in environment. " + "Async tasks may not work properly. " + "Set CELERY_BROKER_URL in your .env file or environment." 
+ ) +if not celery_result_backend: + print( + "[Celery] WARNING: CELERY_RESULT_BACKEND not set in environment. " + "Task results may not be stored. " + "Set CELERY_RESULT_BACKEND in your .env file or environment." + ) + +# Celery configuration +app.conf.update( + broker_url=celery_broker_url, + result_backend=celery_result_backend, + task_serializer="json", + accept_content=["json"], + result_serializer="json", + timezone="UTC", + enable_utc=True, + task_track_started=True, # Track task started state + task_time_limit=3600, # 1 hour hard time limit + task_soft_time_limit=3300, # 55 minutes soft time limit + worker_prefetch_multiplier=1, # Don't prefetch tasks (better for long-running) + worker_max_tasks_per_child=1000, # Recycle worker after 1000 tasks + result_expires=86400, # Results expire after 24 hours +) + +# Import tasks to register them with Celery +# This must happen AFTER app configuration to avoid circular imports +try: + print("[Celery] Attempting to import tasks...") + from insta_rag.tasks import graph_rag_tasks # noqa: F401 + print("[Celery] ✓ Tasks imported successfully") +except ImportError as e: + print(f"[Celery] ✗ Failed to import tasks: {e}") +except Exception as e: + print(f"[Celery] ✗ Unexpected error importing tasks: {e}") + + +def get_celery_app(): + """Get the Celery app instance. + + Returns: + Celery: The configured Celery application instance. + """ + return app diff --git a/src/insta_rag/core/config.py b/src/insta_rag/core/config.py index fcc4847..841fea0 100644 --- a/src/insta_rag/core/config.py +++ b/src/insta_rag/core/config.py @@ -258,6 +258,92 @@ def to_dict(self) -> Dict[str, Any]: } +@dataclass +class CeleryConfig: + """Celery configuration for async task processing. + + Used for configuring background task queue for document ingestion + and other long-running operations via Celery with Redis broker. + + IMPORTANT: Redis credentials must come from environment variables: + - CELERY_BROKER_URL: Set to your Redis broker URL + - CELERY_RESULT_BACKEND: Set to your Redis result backend URL + + These are REQUIRED and cannot be None. + """ + + broker_url: Optional[str] = None + result_backend: Optional[str] = None + enabled: bool = True + task_serializer: str = "json" + result_serializer: str = "json" + task_track_started: bool = True + task_time_limit: int = 3600 # 1 hour + task_soft_time_limit: int = 3300 # 55 minutes + worker_prefetch_multiplier: int = 1 + worker_max_tasks_per_child: int = 1000 + result_expires: int = 86400 # 24 hours + + def validate(self) -> None: + """Validate Celery configuration.""" + if not self.broker_url: + raise ConfigurationError("Celery broker URL is required") + if not self.result_backend: + raise ConfigurationError("Celery result backend URL is required") + + @classmethod + def from_env(cls, **kwargs) -> "CeleryConfig": + """Create Celery configuration from environment variables. 
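+
+        CELERY_BROKER_URL and CELERY_RESULT_BACKEND have no fallback defaults:
+        if they are missing from the environment they stay None, and a later
+        call to validate() raises ConfigurationError. The remaining settings
+        fall back to the dataclass defaults (e.g. enabled=True, 3600s hard
+        time limit).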
+ + Environment variables: + CELERY_BROKER_URL: Redis broker URL + CELERY_RESULT_BACKEND: Redis result backend URL + CELERY_ENABLED: Enable/disable Celery (default: True) + CELERY_TASK_TIME_LIMIT: Task hard time limit in seconds + CELERY_TASK_SOFT_TIME_LIMIT: Task soft time limit in seconds + + Args: + **kwargs: Override specific configuration values + + Returns: + CeleryConfig instance + """ + config = cls( + broker_url=os.getenv( + "CELERY_BROKER_URL", + + ), + result_backend=os.getenv( + "CELERY_RESULT_BACKEND", + + ), + enabled=os.getenv("CELERY_ENABLED", "true").lower() == "true", + task_time_limit=int(os.getenv("CELERY_TASK_TIME_LIMIT", "3600")), + task_soft_time_limit=int(os.getenv("CELERY_TASK_SOFT_TIME_LIMIT", "3300")), + ) + + # Apply any overrides from kwargs + for key, value in kwargs.items(): + if hasattr(config, key): + setattr(config, key, value) + + return config + + def to_dict(self) -> Dict[str, Any]: + """Convert configuration to dictionary (without sensitive data).""" + return { + "enabled": self.enabled, + "task_serializer": self.task_serializer, + "result_serializer": self.result_serializer, + "task_track_started": self.task_track_started, + "task_time_limit": self.task_time_limit, + "task_soft_time_limit": self.task_soft_time_limit, + "worker_prefetch_multiplier": self.worker_prefetch_multiplier, + "worker_max_tasks_per_child": self.worker_max_tasks_per_child, + "result_expires": self.result_expires, + } + + @dataclass class RAGConfig: """Main RAG system configuration.""" diff --git a/src/insta_rag/graph_rag/client.py b/src/insta_rag/graph_rag/client.py index 519266a..4937cb5 100644 --- a/src/insta_rag/graph_rag/client.py +++ b/src/insta_rag/graph_rag/client.py @@ -3,7 +3,7 @@ from __future__ import annotations import os -from typing import Any, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple from openai import AsyncOpenAI @@ -207,6 +207,133 @@ async def add_chunk( return await self._builder.add_chunk(chunk, collection_name) + # ======================== Async Task Submission (Non-blocking) ======================== + + def submit_add_documents_async( + self, + documents: List[DocumentInput], + collection_name: str = "default", + neo4j_uri: Optional[str] = None, + neo4j_user: Optional[str] = None, + neo4j_password: Optional[str] = None, + neo4j_database: Optional[str] = None, + ) -> Dict[str, Any]: + """Submit documents for async ingestion via Celery (non-blocking). + + This method immediately returns a task ID without waiting for the documents + to be processed. Use the task monitoring service to track progress and retrieve results. + + Args: + documents: List of DocumentInput objects to add + collection_name: Name of collection to add to (default: 'default') + neo4j_uri: Neo4j URI (optional, uses env var if not provided) + neo4j_user: Neo4j username (optional, uses env var if not provided) + neo4j_password: Neo4j password (optional, uses env var if not provided) + neo4j_database: Neo4j database name (optional, uses env var if not provided) + + Returns: + Dict with keys: + - 'task_id': Celery task ID for tracking + - 'status': 'submitted' + - 'message': Task submission details + - 'collection_name': The collection name + - 'num_documents': Number of documents submitted + + Raises: + ValueError: If documents list is empty + ImportError: If Celery is not configured + + Example: + >>> documents = [DocumentInput.from_text("..."), ...] 
+ >>> result = client.submit_add_documents_async(documents, "my_collection") + >>> task_id = result['task_id'] + >>> # Later, check status: + >>> from insta_rag.task_monitoring import get_task_monitoring + >>> monitor = get_task_monitoring() + >>> status = monitor.get_task_status(task_id) + """ + if not documents: + raise ValueError("documents list cannot be empty") + + try: + from insta_rag.tasks.graph_rag_tasks import add_documents_task + except ImportError: + raise ImportError( + "Celery is required for async document submission. " + "Install with: pip install celery redis" + ) + + # Convert DocumentInput objects to dicts for serialization + documents_data = [] + for doc in documents: + doc_dict = { + "source": doc.source, + "source_type": doc.source_type.value, + "metadata": doc.metadata or {}, + } + documents_data.append(doc_dict) + + # Submit task to Celery + celery_result = add_documents_task.apply_async( + args=[documents_data, collection_name], + kwargs={ + "neo4j_uri": neo4j_uri, + "neo4j_user": neo4j_user, + "neo4j_password": neo4j_password, + "neo4j_database": neo4j_database, + "group_id": self.group_id, + }, + queue="default", + ) + + return { + "task_id": celery_result.id, + "status": "submitted", + "message": f"Document ingestion task submitted for {len(documents)} documents", + "collection_name": collection_name, + "num_documents": len(documents), + } + + def submit_add_chunk_async( + self, + chunk: Chunk, + collection_name: str = "default", + neo4j_uri: Optional[str] = None, + neo4j_user: Optional[str] = None, + neo4j_password: Optional[str] = None, + neo4j_database: Optional[str] = None, + ) -> Dict[str, Any]: + """Submit a chunk for async ingestion via Celery (non-blocking). + + Args: + chunk: Chunk object to add + collection_name: Name of collection to add to (default: 'default') + neo4j_uri: Neo4j URI (optional) + neo4j_user: Neo4j username (optional) + neo4j_password: Neo4j password (optional) + neo4j_database: Neo4j database name (optional) + + Returns: + Dict with task submission details (see submit_add_documents_async) + + Raises: + ImportError: If Celery is not configured + """ + # Convert chunk to DocumentInput + doc = DocumentInput.from_text(chunk.content) + if chunk.metadata: + doc.metadata = chunk.metadata.to_dict() if hasattr(chunk.metadata, 'to_dict') else dict(chunk.metadata) + + # Use the documents submission method + return self.submit_add_documents_async( + documents=[doc], + collection_name=collection_name, + neo4j_uri=neo4j_uri, + neo4j_user=neo4j_user, + neo4j_password=neo4j_password, + neo4j_database=neo4j_database, + ) + # ======================== Retrieval Operations ======================== async def retrieve( diff --git a/src/insta_rag/task_monitoring.py b/src/insta_rag/task_monitoring.py new file mode 100644 index 0000000..d8c6c1c --- /dev/null +++ b/src/insta_rag/task_monitoring.py @@ -0,0 +1,317 @@ +"""Task monitoring and tracking service for Celery-based async operations.""" + +import logging +from typing import Any, Dict, List, Optional + +from celery import states +from celery.result import AsyncResult + +from insta_rag.celery_app import app + +logger = logging.getLogger(__name__) + + +class TaskMonitoring: + """Service for monitoring and tracking Celery task execution. + + Provides methods to check task status, retrieve results, monitor queues, + and get worker statistics for the document ingestion pipeline. 
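+
+    Example (illustrative; the task ID comes from an earlier async submission):
+        >>> monitor = TaskMonitoring()
+        >>> status = monitor.get_task_status("some-task-id")
+        >>> if status["ready"] and status["successful"]:
+        ...     result = monitor.get_task_result("some-task-id")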
+ """ + + def __init__(self): + """Initialize TaskMonitoring service with the Celery app.""" + self.celery_app = app + + def get_task_status(self, task_id: str) -> Dict[str, Any]: + """Get the current status of a task. + + Args: + task_id: The Celery task ID + + Returns: + Dict with keys: + - 'task_id': The task ID + - 'state': Current state (PENDING, STARTED, SUCCESS, FAILURE, etc.) + - 'ready': Boolean indicating if task is complete + - 'successful': Boolean indicating if task succeeded + - 'failed': Boolean indicating if task failed + - 'progress': Dict with 'current' and 'total' keys (if available) + - 'result': Task result if state is SUCCESS + - 'error': Error message if state is FAILURE + """ + try: + result = AsyncResult(task_id, app=self.celery_app) + + status_dict = { + "task_id": task_id, + "state": result.state, + "ready": result.ready(), + "successful": result.successful() if result.ready() else None, + "failed": result.failed() if result.ready() else None, + } + + # Add progress info if available + if result.state == "PROGRESS": + status_dict["progress"] = result.info + elif result.ready() and result.state == states.SUCCESS: + status_dict["result"] = result.result + elif result.ready() and result.state == states.FAILURE: + status_dict["error"] = str(result.info) + + return status_dict + except Exception as e: + logger.error(f"Error getting task status for {task_id}: {e}") + return { + "task_id": task_id, + "state": "UNKNOWN", + "error": str(e), + } + + def get_task_result( + self, task_id: str, timeout: Optional[float] = None + ) -> Any: + """Get the result of a task, waiting if necessary. + + Args: + task_id: The Celery task ID + timeout: Maximum time to wait in seconds (None = infinite) + + Returns: + The task result (dict with success/failure info) + + Raises: + TimeoutError: If timeout is exceeded + Exception: If the task failed + """ + result = AsyncResult(task_id, app=self.celery_app) + return result.get(timeout=timeout, propagate=True) + + def is_task_ready(self, task_id: str) -> bool: + """Check if a task has completed (succeeded or failed). + + Args: + task_id: The Celery task ID + + Returns: + True if task is complete, False otherwise + """ + result = AsyncResult(task_id, app=self.celery_app) + return result.ready() + + def is_task_successful(self, task_id: str) -> bool: + """Check if a task completed successfully. + + Args: + task_id: The Celery task ID + + Returns: + True if task succeeded, False otherwise + """ + result = AsyncResult(task_id, app=self.celery_app) + return result.successful() if result.ready() else False + + def is_task_failed(self, task_id: str) -> bool: + """Check if a task failed. + + Args: + task_id: The Celery task ID + + Returns: + True if task failed, False otherwise + """ + result = AsyncResult(task_id, app=self.celery_app) + return result.failed() if result.ready() else False + + def get_all_queued_tasks(self) -> Dict[str, List[Dict[str, Any]]]: + """Get all tasks currently in queues (pending and reserved). + + Returns: + Dict with worker names as keys and lists of task info as values. + Each task info dict contains: name, id, args, kwargs, eta, etc. 
+ """ + try: + inspector = self.celery_app.control.inspect() + + # Get reserved (not yet executed) tasks + reserved = inspector.reserved() or {} + # Get active (currently executing) tasks + active = inspector.active() or {} + + queued_tasks = {} + + # Combine reserved tasks (pending) + for worker, tasks in reserved.items(): + if tasks: + queued_tasks[f"{worker}_reserved"] = tasks + + # Add info that tasks are active + for worker, tasks in active.items(): + if tasks: + queued_tasks[f"{worker}_active"] = tasks + + return queued_tasks + + except Exception as e: + logger.error(f"Error getting queued tasks: {e}") + return {} + + def get_active_tasks(self) -> Dict[str, List[Dict[str, Any]]]: + """Get all currently executing tasks. + + Returns: + Dict with worker names as keys and lists of active task info as values. + """ + try: + inspector = self.celery_app.control.inspect() + active = inspector.active() or {} + + return { + worker: tasks + for worker, tasks in active.items() + if tasks + } + + except Exception as e: + logger.error(f"Error getting active tasks: {e}") + return {} + + def get_reserved_tasks(self) -> Dict[str, List[Dict[str, Any]]]: + """Get all reserved tasks (pending execution). + + Returns: + Dict with worker names as keys and lists of reserved task info as values. + """ + try: + inspector = self.celery_app.control.inspect() + reserved = inspector.reserved() or {} + + return { + worker: tasks + for worker, tasks in reserved.items() + if tasks + } + + except Exception as e: + logger.error(f"Error getting reserved tasks: {e}") + return {} + + def get_queue_length(self) -> int: + """Get total number of tasks in all queues. + + Returns: + Total count of queued tasks + """ + try: + active = self.get_active_tasks() + reserved = self.get_reserved_tasks() + + active_count = sum(len(tasks) for tasks in active.values()) + reserved_count = sum(len(tasks) for tasks in reserved.values()) + + return active_count + reserved_count + + except Exception as e: + logger.error(f"Error getting queue length: {e}") + return 0 + + def get_worker_stats(self) -> Dict[str, Any]: + """Get statistics about all active workers. + + Returns: + Dict with worker names as keys and stats as values. + Stats include: pool size, active tasks, processed tasks, etc. + """ + try: + inspector = self.celery_app.control.inspect() + + # Get stats for all workers + stats = inspector.stats() or {} + + worker_info = {} + for worker_name, worker_stats in stats.items(): + active_tasks = self.get_active_tasks().get(worker_name, []) + reserved_tasks = self.get_reserved_tasks().get(worker_name, []) + + worker_info[worker_name] = { + "pool_size": worker_stats.get("pool", {}).get("max-concurrency", "N/A"), + "active_tasks": len(active_tasks), + "reserved_tasks": len(reserved_tasks), + "total_tasks": len(active_tasks) + len(reserved_tasks), + "processed": worker_stats.get("total", {}), + "status": "online", + } + + return worker_info + + except Exception as e: + logger.error(f"Error getting worker stats: {e}") + return {} + + def cancel_task(self, task_id: str, terminate: bool = False) -> bool: + """Cancel a running task. 
+ + Args: + task_id: The Celery task ID + terminate: If True, force terminate; if False, wait for graceful shutdown + + Returns: + True if cancellation was successful + """ + try: + if terminate: + self.celery_app.control.revoke(task_id, terminate=True) + else: + self.celery_app.control.revoke(task_id, terminate=False) + + logger.info(f"Task {task_id} cancelled successfully") + return True + + except Exception as e: + logger.error(f"Error cancelling task {task_id}: {e}") + return False + + def get_task_info_summary(self, task_id: str) -> Dict[str, Any]: + """Get a comprehensive summary of a task's information. + + Args: + task_id: The Celery task ID + + Returns: + Dict with comprehensive task information + """ + status = self.get_task_status(task_id) + + summary = { + "task_id": task_id, + "state": status.get("state"), + "is_ready": status.get("ready"), + "is_successful": status.get("successful"), + "is_failed": status.get("failed"), + } + + if status.get("progress"): + summary["progress"] = status["progress"] + + if status.get("result"): + summary["result"] = status["result"] + + if status.get("error"): + summary["error"] = status["error"] + + return summary + + +# Global instance +_task_monitoring = None + + +def get_task_monitoring() -> TaskMonitoring: + """Get or create the global TaskMonitoring instance. + + Returns: + TaskMonitoring instance + """ + global _task_monitoring + if _task_monitoring is None: + _task_monitoring = TaskMonitoring() + return _task_monitoring diff --git a/src/insta_rag/tasks/__init__.py b/src/insta_rag/tasks/__init__.py new file mode 100644 index 0000000..7607f36 --- /dev/null +++ b/src/insta_rag/tasks/__init__.py @@ -0,0 +1,5 @@ +"""Celery tasks for async operations in Insta RAG.""" + +from .graph_rag_tasks import add_documents_task + +__all__ = ["add_documents_task"] diff --git a/src/insta_rag/tasks/graph_rag_tasks.py b/src/insta_rag/tasks/graph_rag_tasks.py new file mode 100644 index 0000000..98e03ff --- /dev/null +++ b/src/insta_rag/tasks/graph_rag_tasks.py @@ -0,0 +1,229 @@ +"""Celery tasks for Graph RAG document ingestion operations.""" + +import asyncio +import logging +from typing import Any, Dict, List, Optional + +from insta_rag.celery_app import app +from insta_rag.graph_rag.client import GraphRAGClient +from insta_rag.models.document import DocumentInput + +logger = logging.getLogger(__name__) + + +@app.task( + bind=True, + name="insta_rag.tasks.add_documents_task", + track_started=True, + autoretry_for=(Exception,), + retry_kwargs={"max_retries": 3}, + retry_backoff=True, + retry_backoff_max=600, # Max retry delay: 10 minutes + retry_jitter=True, +) +def add_documents_task( + self, + documents_data: List[Dict[str, Any]], + collection_name: str = "default", + neo4j_uri: Optional[str] = None, + neo4j_user: Optional[str] = None, + neo4j_password: Optional[str] = None, + neo4j_database: Optional[str] = None, + group_id: str = "insta_rag", +) -> Dict[str, Any]: + """Add documents to Graph RAG knowledge graph asynchronously. + + This task runs as a Celery task in a background worker, allowing document + ingestion to proceed without blocking the caller. The task returns immediately + with a task ID, and results can be retrieved using the task monitoring service. 
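+
+    Failures are retried automatically (up to 3 retries with exponential
+    backoff, capped at 10 minutes between attempts) before the task is marked
+    as FAILURE.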
+ + Args: + documents_data: List of document dictionaries with keys: + - 'source': The document text or content + - 'source_type': 'TEXT', 'FILE', or 'BINARY' + - 'metadata': Optional dict of custom metadata + collection_name: Name of the collection in knowledge graph (default: 'default') + neo4j_uri: Neo4j connection URI (optional, uses env var if not provided) + neo4j_user: Neo4j username (optional, uses env var if not provided) + neo4j_password: Neo4j password (optional, uses env var if not provided) + neo4j_database: Neo4j database name (optional, uses env var if not provided) + group_id: Group ID for organizing graph data (default: 'insta_rag') + + Returns: + Dict with keys: + - 'status': 'success' or 'failure' + - 'task_id': The Celery task ID + - 'collection_name': The collection name used + - 'total_documents': Number of documents processed + - 'succeeded': Number of successfully added documents + - 'failed': Number of failed documents + - 'results': List of GraphAddResult dicts + - 'error': Error message if status is 'failure' + - 'processing_time_ms': Total processing time in milliseconds + + Raises: + ValueError: If documents_data is empty or invalid + Exception: If Neo4j connection or document processing fails + """ + if not documents_data: + raise ValueError("documents_data cannot be empty") + + self.update_state(state="PROGRESS", meta={"current": 0, "total": len(documents_data)}) + + try: + # Convert document dicts to DocumentInput objects + documents = [] + for i, doc_data in enumerate(documents_data): + try: + source = doc_data.get("source", "") + source_type = doc_data.get("source_type", "TEXT").upper() + metadata = doc_data.get("metadata", {}) + + # Create DocumentInput based on source type + if source_type == "TEXT": + doc = DocumentInput.from_text(source) + if metadata: + doc.metadata = metadata + elif source_type == "FILE": + doc = DocumentInput.from_file(source) + if metadata: + doc.metadata = metadata + elif source_type == "BINARY": + doc = DocumentInput.from_binary(source) + if metadata: + doc.metadata = metadata + else: + raise ValueError(f"Invalid source_type: {source_type}") + + documents.append(doc) + except Exception as e: + logger.warning(f"Failed to parse document {i}: {e}") + continue + + if not documents: + raise ValueError("No valid documents could be parsed from documents_data") + + # Run async GraphRAGClient in sync context using asyncio + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + result = loop.run_until_complete( + _add_documents_async( + documents=documents, + collection_name=collection_name, + neo4j_uri=neo4j_uri, + neo4j_user=neo4j_user, + neo4j_password=neo4j_password, + neo4j_database=neo4j_database, + group_id=group_id, + task_id=self.request.id, + update_progress=self.update_state, + ) + ) + return result + finally: + loop.close() + + except Exception as e: + logger.error(f"Document ingestion task failed: {e}", exc_info=True) + raise + + +async def _add_documents_async( + documents: List[DocumentInput], + collection_name: str, + neo4j_uri: Optional[str], + neo4j_user: Optional[str], + neo4j_password: Optional[str], + neo4j_database: Optional[str], + group_id: str, + task_id: str, + update_progress, +) -> Dict[str, Any]: + """Internal async function to add documents using GraphRAGClient. 
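+
+    Creates its own GraphRAGClient, runs the ingestion, and always closes the
+    client before returning. Failures are not raised; they are reported in the
+    returned dict with status "failure".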
+ + Args: + documents: List of DocumentInput objects + collection_name: Collection name + neo4j_uri: Neo4j URI + neo4j_user: Neo4j username + neo4j_password: Neo4j password + neo4j_database: Neo4j database + group_id: Group ID + task_id: Celery task ID (for logging/tracking) + update_progress: Function to update task progress + + Returns: + Dict with success/failure status and results + """ + try: + # Initialize GraphRAGClient with optional parameters + client = GraphRAGClient( + neo4j_uri=neo4j_uri, + neo4j_user=neo4j_user, + neo4j_password=neo4j_password, + neo4j_database=neo4j_database, + group_id=group_id, + ) + + # Initialize the client + await client.initialize() + + try: + logger.info( + f"[Task {task_id}] Starting document ingestion: {len(documents)} documents " + f"to collection '{collection_name}'" + ) + + # Add documents with progress tracking + results = await client.add_documents(documents, collection_name) + + logger.info( + f"[Task {task_id}] Document ingestion completed: {len(results)} results" + ) + + # Convert results to serializable dicts + results_dicts = [] + total_nodes = 0 + total_edges = 0 + + for result in results: + total_nodes += result.nodes_created + total_edges += result.edges_created + results_dicts.append( + { + "episode_uuid": result.episode_uuid, + "nodes_created": result.nodes_created, + "edges_created": result.edges_created, + "group_id": result.group_id, + "processing_time_ms": result.processing_time_ms, + } + ) + + return { + "status": "success", + "task_id": task_id, + "collection_name": collection_name, + "total_documents": len(documents), + "succeeded": len(results), + "failed": len(documents) - len(results), + "total_nodes_created": total_nodes, + "total_edges_created": total_edges, + "results": results_dicts, + } + + finally: + await client.close() + + except Exception as e: + logger.error(f"[Task {task_id}] Document ingestion failed: {e}", exc_info=True) + return { + "status": "failure", + "task_id": task_id, + "collection_name": collection_name, + "total_documents": len(documents), + "succeeded": 0, + "failed": len(documents), + "error": str(e), + } diff --git a/src/insta_rag/worker_manager.py b/src/insta_rag/worker_manager.py new file mode 100644 index 0000000..36a0c4e --- /dev/null +++ b/src/insta_rag/worker_manager.py @@ -0,0 +1,180 @@ +"""Celery worker lifecycle management for Insta RAG. + +This module provides utilities to start and stop Celery workers programmatically, +useful for applications that want to manage workers without manual terminal commands. +""" + +import subprocess +import atexit +import logging +from typing import Optional + +logger = logging.getLogger(__name__) + +# Global worker process +_celery_worker_process: Optional[subprocess.Popen] = None + + +def start_worker( + log_file: str = "/tmp/celery_worker.log", + queue: str = "default", + loglevel: str = "info", + concurrency: int = 4, +) -> Optional[int]: + """Start Celery worker in a background subprocess. + + This function starts a Celery worker process that handles async document + ingestion tasks. The worker runs independently and processes tasks from + the Redis queue. 
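+
+    The worker subprocess is detached from the caller (start_new_session=True)
+    and its output is appended to the given log file. An atexit hook stops the
+    worker when the parent process exits, and if a worker started by this
+    module is already running its PID is returned instead of spawning another.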
+ + Args: + log_file: Path to write worker logs (default: /tmp/celery_worker.log) + queue: Queue to listen to (default: 'default') + loglevel: Logging level: debug, info, warning (default: 'info') + concurrency: Number of concurrent tasks (default: 4) + + Returns: + Process ID (PID) of worker if started successfully, None otherwise + + Example: + >>> from insta_rag.worker_manager import start_worker + >>> pid = start_worker(queue="default", loglevel="info") + >>> print(f"Worker started with PID: {pid}") + """ + global _celery_worker_process + + try: + # Check if worker is already running + if _celery_worker_process is not None and _celery_worker_process.poll() is None: + logger.info("✓ Celery worker is already running") + return _celery_worker_process.pid + + # Open log file for writing + worker_log_file = open(log_file, "a") + + # Start the worker process with specified configuration + _celery_worker_process = subprocess.Popen( + [ + "celery", + "-A", + "insta_rag.celery_app", + "worker", + "-l", + loglevel, + "-c", + str(concurrency), + "-Q", + queue, + ], + stdout=worker_log_file, + stderr=worker_log_file, + start_new_session=True, # Run in new process group (detached) + ) + + logger.info(f"✓ Celery worker started (PID: {_celery_worker_process.pid})") + logger.info(f" Queue: {queue}") + logger.info(f" Concurrency: {concurrency}") + logger.info(f" Log file: {log_file}") + + # Register cleanup handler to stop worker on exit + atexit.register(stop_worker) + + return _celery_worker_process.pid + + except FileNotFoundError: + logger.error("✗ Celery not found. Install with: pip install celery") + return None + except Exception as e: + logger.error(f"✗ Failed to start Celery worker: {e}") + _celery_worker_process = None + return None + + +def stop_worker(timeout: int = 5) -> bool: + """Stop the Celery worker process gracefully. + + This function gracefully shuts down the background Celery worker. + If graceful shutdown fails, it force-kills the process. + + Args: + timeout: Seconds to wait for graceful shutdown (default: 5) + + Returns: + True if stopped successfully, False otherwise + + Example: + >>> from insta_rag.worker_manager import stop_worker + >>> success = stop_worker(timeout=5) + >>> print("Worker stopped" if success else "Failed to stop worker") + """ + global _celery_worker_process + + if _celery_worker_process is None: + return True + + try: + # Check if process is still running + if _celery_worker_process.poll() is None: + # Terminate gracefully + _celery_worker_process.terminate() + + # Wait for graceful shutdown + try: + _celery_worker_process.wait(timeout=timeout) + logger.info("✓ Celery worker stopped gracefully") + return True + except subprocess.TimeoutExpired: + # Force kill if graceful shutdown fails + _celery_worker_process.kill() + logger.warning("⚠ Celery worker force-killed (graceful shutdown timeout)") + return False + + _celery_worker_process = None + return True + + except Exception as e: + logger.error(f"✗ Error stopping Celery worker: {e}") + _celery_worker_process = None + return False + + +def is_worker_running() -> bool: + """Check if Celery worker is currently running. + + Returns: + True if worker is running, False otherwise + + Example: + >>> from insta_rag.worker_manager import is_worker_running + >>> if is_worker_running(): + ... 
print("Worker is active") + """ + global _celery_worker_process + + if _celery_worker_process is None: + return False + + return _celery_worker_process.poll() is None + + +def get_worker_pid() -> Optional[int]: + """Get the PID of the running worker process. + + Returns: + Process ID if worker is running, None otherwise + + Example: + >>> from insta_rag.worker_manager import get_worker_pid + >>> pid = get_worker_pid() + >>> if pid: + ... print(f"Worker PID: {pid}") + """ + global _celery_worker_process + + if _celery_worker_process is None: + return None + + if _celery_worker_process.poll() is None: + return _celery_worker_process.pid + + return None diff --git a/src/insta_rag/worker_pool.py b/src/insta_rag/worker_pool.py new file mode 100644 index 0000000..0bae1c8 --- /dev/null +++ b/src/insta_rag/worker_pool.py @@ -0,0 +1,351 @@ +"""Celery worker pool management for horizontal scaling. + +This module provides utilities to manage multiple Celery workers for horizontal +scaling across multiple instances or containers. +""" + +import subprocess +import logging +import time +from typing import List, Optional, Dict, Any + +from .task_monitoring import get_task_monitoring + +logger = logging.getLogger(__name__) + +# Global worker processes registry +_worker_processes: List[subprocess.Popen] = [] +_worker_configs: Dict[str, Dict[str, Any]] = {} + + +def start_worker_pool( + num_workers: int = 2, + queue: str = "default", + loglevel: str = "info", + concurrency_per_worker: int = 4, + log_dir: str = "/tmp", + auto_scale: bool = False, +) -> List[int]: + """Start a pool of Celery workers for horizontal scaling. + + This starts multiple worker processes that all connect to the same Redis + broker, enabling horizontal scaling of document processing tasks. + + Args: + num_workers: Number of worker processes to start (default: 2) + queue: Queue to listen to (default: 'default') + loglevel: Logging level: debug, info, warning (default: 'info') + concurrency_per_worker: Concurrent tasks per worker (default: 4) + log_dir: Directory for worker logs (default: '/tmp') + auto_scale: Enable auto-scaling based on queue depth (default: False) + + Returns: + List of process IDs (PIDs) of started workers + + Example: + >>> from insta_rag.worker_pool import start_worker_pool + >>> pids = start_worker_pool(num_workers=4, concurrency_per_worker=8) + >>> print(f"Started {len(pids)} workers: {pids}") + """ + global _worker_processes, _worker_configs + + if num_workers < 1: + logger.error("num_workers must be >= 1") + return [] + + try: + # Check if workers are already running + running_workers = [p for p in _worker_processes if p.poll() is None] + if running_workers: + logger.warning( + f"✓ {len(running_workers)} workers already running. " + f"Call stop_worker_pool() first to restart." 
+ ) + return [p.pid for p in running_workers] + + started_pids = [] + + for worker_id in range(num_workers): + worker_name = f"worker{worker_id + 1}" + log_file_path = f"{log_dir}/celery_{worker_name}.log" + + try: + log_file = open(log_file_path, "a") + + worker_process = subprocess.Popen( + [ + "celery", + "-A", + "insta_rag.celery_app", + "worker", + "-n", + worker_name, + "-l", + loglevel, + "-c", + str(concurrency_per_worker), + "-Q", + queue, + ], + stdout=log_file, + stderr=log_file, + start_new_session=True, + ) + + _worker_processes.append(worker_process) + _worker_configs[worker_name] = { + "pid": worker_process.pid, + "queue": queue, + "concurrency": concurrency_per_worker, + "log_file": log_file_path, + "started_at": time.time(), + } + + started_pids.append(worker_process.pid) + logger.info( + f"✓ Worker {worker_name} started (PID: {worker_process.pid}, " + f"concurrency: {concurrency_per_worker})" + ) + + except Exception as e: + logger.error(f"✗ Failed to start worker {worker_name}: {e}") + continue + + if started_pids: + logger.info( + f"✓ Started {len(started_pids)} workers. " + f"Log directory: {log_dir}" + ) + if auto_scale: + logger.info("✓ Auto-scaling enabled. Monitoring queue depth...") + + return started_pids + + except Exception as e: + logger.error(f"✗ Failed to start worker pool: {e}") + return [] + + +def stop_worker_pool(timeout: int = 10) -> bool: + """Stop all Celery workers in the pool. + + Args: + timeout: Seconds to wait for graceful shutdown (default: 10) + + Returns: + True if all workers stopped successfully, False otherwise + + Example: + >>> from insta_rag.worker_pool import stop_worker_pool + >>> success = stop_worker_pool(timeout=10) + >>> print("All workers stopped" if success else "Some workers remained") + """ + global _worker_processes, _worker_configs + + if not _worker_processes: + logger.info("No workers running") + return True + + all_stopped = True + + for worker_process in _worker_processes: + if worker_process.poll() is None: # Still running + try: + worker_process.terminate() + + try: + worker_process.wait(timeout=timeout) + logger.info(f"✓ Worker PID {worker_process.pid} stopped gracefully") + except subprocess.TimeoutExpired: + worker_process.kill() + logger.warning( + f"⚠ Worker PID {worker_process.pid} force-killed " + f"(graceful shutdown timeout)" + ) + all_stopped = False + + except Exception as e: + logger.error(f"✗ Error stopping worker PID {worker_process.pid}: {e}") + all_stopped = False + + _worker_processes.clear() + _worker_configs.clear() + + if all_stopped: + logger.info("✓ All workers stopped gracefully") + else: + logger.warning("⚠ Some workers required force-kill") + + return all_stopped + + +def get_pool_status() -> Dict[str, Any]: + """Get the status of the worker pool. 
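+
+    Combines the local subprocess registry (workers started by this module)
+    with broker-side data from the task monitoring service, such as queue
+    depth and per-worker statistics.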
+ + Returns: + Dict with pool information and statistics + + Example: + >>> from insta_rag.worker_pool import get_pool_status + >>> status = get_pool_status() + >>> print(f"Active workers: {status['active_workers']}") + >>> print(f"Queue depth: {status['queue_depth']}") + """ + global _worker_processes, _worker_configs + + active_workers = [p for p in _worker_processes if p.poll() is None] + monitoring = get_task_monitoring() + + queue_depth = monitoring.get_queue_length() + worker_stats = monitoring.get_worker_stats() + + total_concurrency = sum( + config.get("concurrency", 0) for config in _worker_configs.values() + ) + + return { + "total_workers": len(_worker_processes), + "active_workers": len(active_workers), + "inactive_workers": len(_worker_processes) - len(active_workers), + "total_concurrency": total_concurrency, + "queue_depth": queue_depth, + "workers": list(_worker_configs.keys()), + "worker_details": _worker_configs, + "worker_stats": worker_stats, + } + + +def get_active_worker_count() -> int: + """Get number of active workers. + + Returns: + Number of currently running workers + """ + global _worker_processes + return sum(1 for p in _worker_processes if p.poll() is None) + + +def is_pool_healthy() -> bool: + """Check if worker pool is healthy. + + Returns: + True if at least one worker is active, False otherwise + """ + return get_active_worker_count() > 0 + + +def scale_pool(target_workers: int, timeout: int = 10) -> bool: + """Scale worker pool to target number of workers. + + This function adjusts the pool size by stopping excess workers or + starting additional workers as needed. + + Args: + target_workers: Target number of workers to maintain + timeout: Seconds to wait for graceful shutdown (default: 10) + + Returns: + True if scaling succeeded, False otherwise + + Example: + >>> from insta_rag.worker_pool import scale_pool + >>> success = scale_pool(target_workers=8) + >>> print("Scaled to 8 workers" if success else "Scaling failed") + """ + global _worker_processes + + current_active = get_active_worker_count() + + if current_active == target_workers: + logger.info(f"✓ Pool already at target size: {target_workers} workers") + return True + + if current_active > target_workers: + logger.info(f"Scaling down from {current_active} to {target_workers} workers") + return stop_worker_pool(timeout=timeout) + + # Scale up + logger.info(f"Scaling up from {current_active} to {target_workers} workers") + additional_workers = target_workers - current_active + + # Get config from first worker to maintain consistency + if _worker_configs: + first_config = list(_worker_configs.values())[0] + queue = first_config.get("queue", "default") + concurrency = first_config.get("concurrency", 4) + + pids = start_worker_pool( + num_workers=additional_workers, + queue=queue, + concurrency_per_worker=concurrency, + ) + return len(pids) == additional_workers + + return False + + +def get_queue_depth() -> int: + """Get current queue depth (number of pending tasks). + + Returns: + Number of tasks waiting in queue + """ + monitoring = get_task_monitoring() + return monitoring.get_queue_length() + + +def auto_scale_if_needed( + queue_depth_threshold: int = 50, + min_workers: int = 2, + max_workers: int = 8, +) -> Optional[int]: + """Auto-scale worker pool based on queue depth. + + Increases workers if queue depth exceeds threshold, + decreases if queue is empty. 
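+
+    Scaling up targets two additional workers at a time (capped at
+    max_workers); scaling down targets one fewer worker (floored at
+    min_workers). The actual resize is delegated to scale_pool().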
+ + Args: + queue_depth_threshold: Queue depth to trigger scaling up (default: 50) + min_workers: Minimum workers to maintain (default: 2) + max_workers: Maximum workers to allow (default: 8) + + Returns: + New worker count if scaled, None if no scaling occurred + + Example: + >>> from insta_rag.worker_pool import auto_scale_if_needed + >>> new_count = auto_scale_if_needed( + ... queue_depth_threshold=30, + ... min_workers=2, + ... max_workers=10 + ... ) + >>> if new_count: + ... print(f"Scaled to {new_count} workers") + """ + current_active = get_active_worker_count() + queue_depth = get_queue_depth() + + logger.info(f"Queue depth: {queue_depth}, Active workers: {current_active}") + + # Scale up if queue is backing up + if queue_depth > queue_depth_threshold and current_active < max_workers: + new_count = min(current_active + 2, max_workers) + logger.info( + f"Queue depth ({queue_depth}) exceeds threshold ({queue_depth_threshold}). " + f"Scaling up to {new_count} workers" + ) + if scale_pool(new_count): + return new_count + return None + + # Scale down if queue is empty and we have extra workers + if queue_depth == 0 and current_active > min_workers: + new_count = max(current_active - 1, min_workers) + logger.info( + f"Queue empty. Scaling down to {new_count} workers" + ) + if scale_pool(new_count): + return new_count + return None + + return None diff --git a/tests/test_celery_integration.py b/tests/test_celery_integration.py new file mode 100644 index 0000000..188e997 --- /dev/null +++ b/tests/test_celery_integration.py @@ -0,0 +1,399 @@ +"""Tests for Celery async task integration.""" + +import asyncio +import time +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from insta_rag.core.config import CeleryConfig +from insta_rag.graph_rag.client import GraphRAGClient +from insta_rag.models.document import DocumentInput +from insta_rag.task_monitoring import TaskMonitoring, get_task_monitoring +from insta_rag.tasks.graph_rag_tasks import add_documents_task + + +class TestCeleryConfig: + """Tests for CeleryConfig dataclass.""" + + def test_celery_config_defaults(self): + """Test CeleryConfig with default values.""" + config = CeleryConfig() + assert config.enabled is True + assert config.task_serializer == "json" + assert config.task_track_started is True + assert config.task_time_limit == 3600 + assert config.task_soft_time_limit == 3300 + + def test_celery_config_from_env(self, monkeypatch): + """Test CeleryConfig.from_env() with environment variables.""" + monkeypatch.setenv("CELERY_ENABLED", "false") + monkeypatch.setenv("CELERY_TASK_TIME_LIMIT", "7200") + + config = CeleryConfig.from_env() + assert config.enabled is False + assert config.task_time_limit == 7200 + + def test_celery_config_validate(self): + """Test CeleryConfig validation.""" + config = CeleryConfig(broker_url="", result_backend="redis://") + + with pytest.raises(Exception): # ConfigurationError + config.validate() + + def test_celery_config_to_dict(self): + """Test CeleryConfig.to_dict() method.""" + config = CeleryConfig() + config_dict = config.to_dict() + + assert "enabled" in config_dict + assert "task_serializer" in config_dict + assert "result_expires" in config_dict + assert config_dict["enabled"] is True + + +class TestGraphRAGClientAsyncMethods: + """Tests for GraphRAGClient async submission methods.""" + + def test_submit_add_documents_async_returns_task_id(self): + """Test that submit_add_documents_async returns task ID.""" + client = GraphRAGClient() + + documents = [ + 
DocumentInput.from_text("Test document 1"), + DocumentInput.from_text("Test document 2"), + ] + + with patch("insta_rag.tasks.graph_rag_tasks.add_documents_task") as mock_task: + # Mock the async result + mock_result = MagicMock() + mock_result.id = "test-task-id-123" + mock_task.apply_async.return_value = mock_result + + result = client.submit_add_documents_async( + documents=documents, + collection_name="test_collection", + ) + + # Verify return format + assert "task_id" in result + assert result["task_id"] == "test-task-id-123" + assert result["status"] == "submitted" + assert result["num_documents"] == 2 + assert result["collection_name"] == "test_collection" + + # Verify task was submitted with correct args + mock_task.apply_async.assert_called_once() + + def test_submit_add_documents_async_empty_list_raises(self): + """Test that submitting empty document list raises ValueError.""" + client = GraphRAGClient() + + with pytest.raises(ValueError, match="documents list cannot be empty"): + client.submit_add_documents_async(documents=[], collection_name="test") + + def test_submit_add_documents_async_missing_celery_raises(self): + """Test that missing Celery raises ImportError.""" + client = GraphRAGClient() + + documents = [DocumentInput.from_text("Test")] + + with patch("insta_rag.graph_rag.client.add_documents_task", side_effect=ImportError): + with pytest.raises(ImportError, match="Celery is required"): + client.submit_add_documents_async(documents, "test") + + def test_submit_add_documents_async_with_custom_neo4j_params(self): + """Test submitting with custom Neo4j parameters.""" + client = GraphRAGClient() + + documents = [DocumentInput.from_text("Test")] + + with patch("insta_rag.tasks.graph_rag_tasks.add_documents_task") as mock_task: + mock_result = MagicMock() + mock_result.id = "task-id" + mock_task.apply_async.return_value = mock_result + + result = client.submit_add_documents_async( + documents=documents, + collection_name="test", + neo4j_uri="bolt://custom:7687", + neo4j_user="custom_user", + neo4j_password="custom_pass", + neo4j_database="custom_db", + ) + + # Verify custom params were passed + call_kwargs = mock_task.apply_async.call_args[1]["kwargs"] + assert call_kwargs["neo4j_uri"] == "bolt://custom:7687" + assert call_kwargs["neo4j_user"] == "custom_user" + assert call_kwargs["neo4j_password"] == "custom_pass" + assert call_kwargs["neo4j_database"] == "custom_db" + + def test_submit_add_chunk_async(self): + """Test submitting a chunk asynchronously.""" + from insta_rag.models.chunk import Chunk, ChunkMetadata + + client = GraphRAGClient() + + metadata = ChunkMetadata( + document_id="doc1", + source="test.txt", + chunk_index=0, + total_chunks=1, + ) + chunk = Chunk( + chunk_id="chunk1", + content="Test chunk content", + metadata=metadata, + ) + + with patch("insta_rag.tasks.graph_rag_tasks.add_documents_task") as mock_task: + mock_result = MagicMock() + mock_result.id = "task-id" + mock_task.apply_async.return_value = mock_result + + result = client.submit_add_chunk_async(chunk, "test_collection") + + assert result["task_id"] == "task-id" + assert result["status"] == "submitted" + assert result["num_documents"] == 1 + + +class TestTaskMonitoring: + """Tests for TaskMonitoring service.""" + + def test_task_monitoring_initialization(self): + """Test TaskMonitoring initialization.""" + monitor = TaskMonitoring() + assert monitor.celery_app is not None + + def test_get_task_monitoring_singleton(self): + """Test get_task_monitoring returns singleton.""" + monitor1 = 
get_task_monitoring() + monitor2 = get_task_monitoring() + assert monitor1 is monitor2 + + @patch("insta_rag.task_monitoring.AsyncResult") + def test_get_task_status(self, mock_async_result): + """Test get_task_status method.""" + monitor = TaskMonitoring() + + # Mock AsyncResult + mock_result = MagicMock() + mock_result.state = "SUCCESS" + mock_result.ready.return_value = True + mock_result.successful.return_value = True + mock_result.failed.return_value = False + mock_result.result = {"status": "success"} + + mock_async_result.return_value = mock_result + + status = monitor.get_task_status("task-id-123") + + assert status["task_id"] == "task-id-123" + assert status["state"] == "SUCCESS" + assert status["ready"] is True + assert status["successful"] is True + assert status["failed"] is False + + @patch("insta_rag.task_monitoring.AsyncResult") + def test_is_task_ready(self, mock_async_result): + """Test is_task_ready method.""" + monitor = TaskMonitoring() + + mock_result = MagicMock() + mock_result.ready.return_value = True + mock_async_result.return_value = mock_result + + assert monitor.is_task_ready("task-id") is True + + mock_result.ready.return_value = False + assert monitor.is_task_ready("task-id") is False + + @patch("insta_rag.task_monitoring.AsyncResult") + def test_is_task_successful(self, mock_async_result): + """Test is_task_successful method.""" + monitor = TaskMonitoring() + + mock_result = MagicMock() + mock_result.ready.return_value = True + mock_result.successful.return_value = True + mock_async_result.return_value = mock_result + + assert monitor.is_task_successful("task-id") is True + + @patch("insta_rag.task_monitoring.AsyncResult") + def test_is_task_failed(self, mock_async_result): + """Test is_task_failed method.""" + monitor = TaskMonitoring() + + mock_result = MagicMock() + mock_result.ready.return_value = True + mock_result.failed.return_value = True + mock_async_result.return_value = mock_result + + assert monitor.is_task_failed("task-id") is True + + @patch("insta_rag.task_monitoring.AsyncResult") + def test_get_task_result_success(self, mock_async_result): + """Test get_task_result with successful task.""" + monitor = TaskMonitoring() + + mock_result = MagicMock() + mock_result.get.return_value = {"status": "success", "data": "result"} + mock_async_result.return_value = mock_result + + result = monitor.get_task_result("task-id", timeout=10) + + assert result == {"status": "success", "data": "result"} + mock_result.get.assert_called_once_with(timeout=10, propagate=True) + + def test_get_queue_length(self): + """Test get_queue_length method.""" + monitor = TaskMonitoring() + + with patch.object(monitor, "get_active_tasks") as mock_active: + with patch.object(monitor, "get_reserved_tasks") as mock_reserved: + mock_active.return_value = { + "worker1": [{"id": "1"}, {"id": "2"}], + "worker2": [{"id": "3"}], + } + mock_reserved.return_value = { + "worker1": [{"id": "4"}, {"id": "5"}, {"id": "6"}], + } + + length = monitor.get_queue_length() + assert length == 6 # 3 active + 3 reserved + + @patch("insta_rag.task_monitoring.AsyncResult") + def test_cancel_task(self, mock_async_result): + """Test cancel_task method.""" + monitor = TaskMonitoring() + + with patch.object(monitor.celery_app.control, "revoke") as mock_revoke: + result = monitor.cancel_task("task-id", terminate=False) + + assert result is True + mock_revoke.assert_called_once_with("task-id", terminate=False) + + def test_get_active_tasks(self): + """Test get_active_tasks method.""" + monitor = 
TaskMonitoring() + + with patch.object(monitor.celery_app.control, "inspect") as mock_inspect: + mock_inspector = MagicMock() + mock_inspector.active.return_value = { + "worker1": [{"name": "task1", "id": "id1"}], + "worker2": [{"name": "task2", "id": "id2"}], + } + mock_inspect.return_value = mock_inspector + + active = monitor.get_active_tasks() + + assert "worker1" in active + assert len(active["worker1"]) == 1 + + def test_get_worker_stats(self): + """Test get_worker_stats method.""" + monitor = TaskMonitoring() + + with patch.object(monitor.celery_app.control, "inspect") as mock_inspect: + mock_inspector = MagicMock() + mock_inspector.stats.return_value = { + "worker1": {"pool": {"max-concurrency": 4}, "total": 100}, + } + mock_inspect.return_value = mock_inspector + + with patch.object(monitor, "get_active_tasks") as mock_active: + with patch.object(monitor, "get_reserved_tasks") as mock_reserved: + mock_active.return_value = { + "worker1": [{"id": "1"}], + } + mock_reserved.return_value = { + "worker1": [{"id": "2"}, {"id": "3"}], + } + + stats = monitor.get_worker_stats() + + assert "worker1" in stats + assert stats["worker1"]["pool_size"] == 4 + assert stats["worker1"]["active_tasks"] == 1 + assert stats["worker1"]["reserved_tasks"] == 2 + + +class TestAddDocumentsTask: + """Tests for add_documents_task Celery task.""" + + @pytest.mark.asyncio + async def test_add_documents_task_structure(self): + """Test add_documents_task structure and properties.""" + assert hasattr(add_documents_task, "apply_async") + assert hasattr(add_documents_task, "delay") + assert add_documents_task.name == "insta_rag.tasks.add_documents_task" + + def test_document_dict_conversion(self): + """Test document serialization to dict format.""" + doc = DocumentInput.from_text("Test content") + + doc_dict = { + "source": doc.source, + "source_type": doc.source_type.value, + "metadata": doc.metadata or {}, + } + + assert doc_dict["source"] == "Test content" + assert doc_dict["source_type"] == "TEXT" + assert isinstance(doc_dict["metadata"], dict) + + +class TestAsyncIntegration: + """Integration tests for async document submission flow.""" + + def test_submit_and_track_flow(self): + """Test complete flow: submit -> check status -> get result.""" + client = GraphRAGClient() + monitor = TaskMonitoring() + + documents = [DocumentInput.from_text("Test document")] + + with patch("insta_rag.tasks.graph_rag_tasks.add_documents_task") as mock_task: + # Setup mock task + mock_result = MagicMock() + task_id = "test-task-123" + mock_result.id = task_id + mock_task.apply_async.return_value = mock_result + + # Submit + submit_result = client.submit_add_documents_async(documents, "test") + assert submit_result["task_id"] == task_id + + # Status check + with patch("insta_rag.task_monitoring.AsyncResult") as mock_async: + mock_async_instance = MagicMock() + mock_async_instance.state = "SUCCESS" + mock_async_instance.ready.return_value = True + mock_async_instance.successful.return_value = True + mock_async.return_value = mock_async_instance + + status = monitor.get_task_status(task_id) + assert status["state"] == "SUCCESS" + assert status["ready"] is True + + +# Fixtures for integration testing +@pytest.fixture +def celery_config(): + """Fixture providing CeleryConfig.""" + return CeleryConfig() + + +@pytest.fixture +def graph_rag_client(): + """Fixture providing GraphRAGClient.""" + return GraphRAGClient() + + +@pytest.fixture +def task_monitoring(): + """Fixture providing TaskMonitoring service.""" + return TaskMonitoring() 
diff --git a/uv.lock b/uv.lock index e968a49..2c2905d 100644 --- a/uv.lock +++ b/uv.lock @@ -8,6 +8,18 @@ resolution-markers = [ "python_full_version < '3.11'", ] +[[package]] +name = "amqp" +version = "5.3.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "vine" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/79/fc/ec94a357dfc6683d8c86f8b4cfa5416a4c36b28052ec8260c77aca96a443/amqp-5.3.1.tar.gz", hash = "sha256:cddc00c725449522023bad949f70fff7b48f0b1ade74d170a6f10ab044739432", size = 129013, upload-time = "2024-11-12T19:55:44.051Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/26/99/fc813cd978842c26c82534010ea849eee9ab3a13ea2b74e95cb9c99e747b/amqp-5.3.1-py3-none-any.whl", hash = "sha256:43b3319e1b4e7d1251833a93d672b4af1e40f3d632d479b98661a95f117880a2", size = 50944, upload-time = "2024-11-12T19:55:41.782Z" }, +] + [[package]] name = "annotated-types" version = "0.7.0" @@ -41,6 +53,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/74/f5/9373290775639cb67a2fce7f629a1c240dce9f12fe927bc32b2736e16dfc/argcomplete-3.6.3-py3-none-any.whl", hash = "sha256:f5007b3a600ccac5d25bbce33089211dfd49eab4a7718da3f10e3082525a92ce", size = 43846, upload-time = "2025-10-20T03:33:33.021Z" }, ] +[[package]] +name = "async-timeout" +version = "5.0.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/a5/ae/136395dfbfe00dfc94da3f3e136d0b13f394cba8f4841120e34226265780/async_timeout-5.0.1.tar.gz", hash = "sha256:d9321a7a3d5a6a5e187e824d2fa0793ce379a202935782d555d6e9d2735677d3", size = 9274, upload-time = "2024-11-06T16:41:39.6Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/fe/ba/e2081de779ca30d473f21f5b30e0e737c438205440784c7dfc81efc2b029/async_timeout-5.0.1-py3-none-any.whl", hash = "sha256:39e3809566ff85354557ec2398b55e096c8364bacac9405a7a1fa429e77fe76c", size = 6233, upload-time = "2024-11-06T16:41:37.9Z" }, +] + [[package]] name = "backoff" version = "2.2.1" @@ -50,6 +71,34 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/df/73/b6e24bd22e6720ca8ee9a85a0c4a2971af8497d8f3193fa05390cbd46e09/backoff-2.2.1-py3-none-any.whl", hash = "sha256:63579f9a0628e06278f7e47b7d7d5b6ce20dc65c5e96a6f3ca99a6adca0396e8", size = 15148, upload-time = "2022-10-05T19:19:30.546Z" }, ] +[[package]] +name = "billiard" +version = "4.2.3" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/6a/50/cc2b8b6e6433918a6b9a3566483b743dcd229da1e974be9b5f259db3aad7/billiard-4.2.3.tar.gz", hash = "sha256:96486f0885afc38219d02d5f0ccd5bec8226a414b834ab244008cbb0025b8dcb", size = 156450, upload-time = "2025-11-16T17:47:30.281Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b3/cc/38b6f87170908bd8aaf9e412b021d17e85f690abe00edf50192f1a4566b9/billiard-4.2.3-py3-none-any.whl", hash = "sha256:989e9b688e3abf153f307b68a1328dfacfb954e30a4f920005654e276c69236b", size = 87042, upload-time = "2025-11-16T17:47:29.005Z" }, +] + +[[package]] +name = "celery" +version = "5.5.3" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "billiard" }, + { name = "click" }, + { name = "click-didyoumean" }, + { name = "click-plugins" }, + { name = "click-repl" }, + { name = "kombu" }, + { name = "python-dateutil" }, + { name = "vine" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/bb/7d/6c289f407d219ba36d8b384b42489ebdd0c84ce9c413875a8aae0c85f35b/celery-5.5.3.tar.gz", hash = 
"sha256:6c972ae7968c2b5281227f01c3a3f984037d21c5129d07bf3550cc2afc6b10a5", size = 1667144, upload-time = "2025-06-01T11:08:12.563Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c9/af/0dcccc7fdcdf170f9a1585e5e96b6fb0ba1749ef6be8c89a6202284759bd/celery-5.5.3-py3-none-any.whl", hash = "sha256:0b5761a07057acee94694464ca482416b959568904c9dfa41ce8413a7d65d525", size = 438775, upload-time = "2025-06-01T11:08:09.94Z" }, +] + [[package]] name = "certifi" version = "2025.10.5" @@ -239,6 +288,55 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/0a/4c/925909008ed5a988ccbb72dcc897407e5d6d3bd72410d69e051fc0c14647/charset_normalizer-3.4.4-py3-none-any.whl", hash = "sha256:7a32c560861a02ff789ad905a2fe94e3f840803362c84fecf1851cb4cf3dc37f", size = 53402, upload-time = "2025-10-14T04:42:31.76Z" }, ] +[[package]] +name = "click" +version = "8.3.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "colorama", marker = "sys_platform == 'win32'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/3d/fa/656b739db8587d7b5dfa22e22ed02566950fbfbcdc20311993483657a5c0/click-8.3.1.tar.gz", hash = "sha256:12ff4785d337a1bb490bb7e9c2b1ee5da3112e94a8622f26a6c77f5d2fc6842a", size = 295065, upload-time = "2025-11-15T20:45:42.706Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/98/78/01c019cdb5d6498122777c1a43056ebb3ebfeef2076d9d026bfe15583b2b/click-8.3.1-py3-none-any.whl", hash = "sha256:981153a64e25f12d547d3426c367a4857371575ee7ad18df2a6183ab0545b2a6", size = 108274, upload-time = "2025-11-15T20:45:41.139Z" }, +] + +[[package]] +name = "click-didyoumean" +version = "0.3.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "click" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/30/ce/217289b77c590ea1e7c24242d9ddd6e249e52c795ff10fac2c50062c48cb/click_didyoumean-0.3.1.tar.gz", hash = "sha256:4f82fdff0dbe64ef8ab2279bd6aa3f6a99c3b28c05aa09cbfc07c9d7fbb5a463", size = 3089, upload-time = "2024-03-24T08:22:07.499Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/1b/5b/974430b5ffdb7a4f1941d13d83c64a0395114503cc357c6b9ae4ce5047ed/click_didyoumean-0.3.1-py3-none-any.whl", hash = "sha256:5c4bb6007cfea5f2fd6583a2fb6701a22a41eb98957e63d0fac41c10e7c3117c", size = 3631, upload-time = "2024-03-24T08:22:06.356Z" }, +] + +[[package]] +name = "click-plugins" +version = "1.1.1.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "click" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/c3/a4/34847b59150da33690a36da3681d6bbc2ec14ee9a846bc30a6746e5984e4/click_plugins-1.1.1.2.tar.gz", hash = "sha256:d7af3984a99d243c131aa1a828331e7630f4a88a9741fd05c927b204bcf92261", size = 8343, upload-time = "2025-06-25T00:47:37.555Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/3d/9a/2abecb28ae875e39c8cad711eb1186d8d14eab564705325e77e4e6ab9ae5/click_plugins-1.1.1.2-py2.py3-none-any.whl", hash = "sha256:008d65743833ffc1f5417bf0e78e8d2c23aab04d9745ba817bd3e71b0feb6aa6", size = 11051, upload-time = "2025-06-25T00:47:36.731Z" }, +] + +[[package]] +name = "click-repl" +version = "0.3.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "click" }, + { name = "prompt-toolkit" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/cb/a2/57f4ac79838cfae6912f997b4d1a64a858fb0c86d7fcaae6f7b58d267fca/click-repl-0.3.0.tar.gz", hash = "sha256:17849c23dba3d667247dc4defe1757fff98694e90fe37474f3feebb69ced26a9", size = 10449, 
upload-time = "2023-06-15T12:43:51.141Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/52/40/9d857001228658f0d59e97ebd4c346fe73e138c6de1bce61dc568a57c7f8/click_repl-0.3.0-py3-none-any.whl", hash = "sha256:fb7e06deb8da8de86180a33a9da97ac316751c094c6899382da7feeeeb51b812", size = 10289, upload-time = "2023-06-15T12:43:48.626Z" }, +] + [[package]] name = "cohere" version = "5.19.0" @@ -825,11 +923,14 @@ wheels = [ [[package]] name = "insta-rag" -version = "0.1.1b4" +version = "0.1.1b5" source = { editable = "." } dependencies = [ + { name = "celery" }, { name = "cohere" }, { name = "graphiti-core" }, + { name = "httpcore" }, + { name = "httpx" }, { name = "numpy", version = "2.2.6", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" }, { name = "numpy", version = "2.3.4", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11'" }, { name = "openai" }, @@ -839,6 +940,7 @@ dependencies = [ { name = "python-dotenv" }, { name = "qdrant-client" }, { name = "rank-bm25" }, + { name = "redis" }, { name = "requests" }, { name = "tiktoken" }, ] @@ -858,8 +960,11 @@ dev = [ [package.metadata] requires-dist = [ + { name = "celery", specifier = ">=5.3.0" }, { name = "cohere", specifier = ">=4.47.0" }, { name = "graphiti-core", specifier = ">=0.1.0" }, + { name = "httpcore", specifier = ">=1.0.0" }, + { name = "httpx", specifier = ">=0.24.0" }, { name = "numpy", specifier = ">=1.24.0" }, { name = "openai", specifier = ">=1.12.0" }, { name = "pdfplumber", specifier = ">=0.10.3" }, @@ -868,6 +973,7 @@ requires-dist = [ { name = "python-dotenv", specifier = ">=1.0.0" }, { name = "qdrant-client", specifier = ">=1.7.0" }, { name = "rank-bm25", specifier = ">=0.2.2" }, + { name = "redis", specifier = ">=4.5.0" }, { name = "requests", specifier = ">=2.32.5" }, { name = "tiktoken", specifier = ">=0.5.2" }, ] @@ -994,6 +1100,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d9/71/71408b02c6133153336d29fa3ba53000f1e1a3f78bb2fc2d1a1865d2e743/jiter-0.11.1-graalpy312-graalpy250_312_native-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:18c77aaa9117510d5bdc6a946baf21b1f0cfa58ef04d31c8d016f206f2118960", size = 343697, upload-time = "2025-10-17T11:31:13.773Z" }, ] +[[package]] +name = "kombu" +version = "5.5.4" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "amqp" }, + { name = "packaging" }, + { name = "tzdata" }, + { name = "vine" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/0f/d3/5ff936d8319ac86b9c409f1501b07c426e6ad41966fedace9ef1b966e23f/kombu-5.5.4.tar.gz", hash = "sha256:886600168275ebeada93b888e831352fe578168342f0d1d5833d88ba0d847363", size = 461992, upload-time = "2025-06-01T10:19:22.281Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ef/70/a07dcf4f62598c8ad579df241af55ced65bed76e42e45d3c368a6d82dbc1/kombu-5.5.4-py3-none-any.whl", hash = "sha256:a12ed0557c238897d8e518f1d1fdf84bd1516c5e305af2dacd85c2015115feb8", size = 210034, upload-time = "2025-06-01T10:19:20.436Z" }, +] + [[package]] name = "markupsafe" version = "3.0.3" @@ -1899,6 +2020,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/2a/21/f691fb2613100a62b3fa91e9988c991e9ca5b89ea31c0d3152a3210344f9/rank_bm25-0.2.2-py3-none-any.whl", hash = "sha256:7bd4a95571adadfc271746fa146a4bcfd89c0cf731e49c3d1ad863290adbe8ae", size = 8584, upload-time = "2022-02-16T12:10:50.626Z" }, ] +[[package]] +name = "redis" +version = "7.1.0" +source = { registry = 
"https://pypi.org/simple" } +dependencies = [ + { name = "async-timeout", marker = "python_full_version < '3.11.3'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/43/c8/983d5c6579a411d8a99bc5823cc5712768859b5ce2c8afe1a65b37832c81/redis-7.1.0.tar.gz", hash = "sha256:b1cc3cfa5a2cb9c2ab3ba700864fb0ad75617b41f01352ce5779dabf6d5f9c3c", size = 4796669, upload-time = "2025-11-19T15:54:39.961Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/89/f0/8956f8a86b20d7bb9d6ac0187cf4cd54d8065bc9a1a09eb8011d4d326596/redis-7.1.0-py3-none-any.whl", hash = "sha256:23c52b208f92b56103e17c5d06bdc1a6c2c0b3106583985a76a18f83b265de2b", size = 354159, upload-time = "2025-11-19T15:54:38.064Z" }, +] + [[package]] name = "regex" version = "2025.9.18" @@ -2272,6 +2405,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/dc/9b/47798a6c91d8bdb567fe2698fe81e0c6b7cb7ef4d13da4114b41d239f65d/typing_inspection-0.4.2-py3-none-any.whl", hash = "sha256:4ed1cacbdc298c220f1bd249ed5287caa16f34d44ef4e9c3d0cbad5b521545e7", size = 14611, upload-time = "2025-10-01T02:14:40.154Z" }, ] +[[package]] +name = "tzdata" +version = "2025.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/95/32/1a225d6164441be760d75c2c42e2780dc0873fe382da3e98a2e1e48361e5/tzdata-2025.2.tar.gz", hash = "sha256:b60a638fcc0daffadf82fe0f57e53d06bdec2f36c4df66280ae79bce6bd6f2b9", size = 196380, upload-time = "2025-03-23T13:54:43.652Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5c/23/c7abc0ca0a1526a0774eca151daeb8de62ec457e77262b66b359c3c7679e/tzdata-2025.2-py2.py3-none-any.whl", hash = "sha256:1a403fada01ff9221ca8044d701868fa132215d84beb92242d9acd2147f667a8", size = 347839, upload-time = "2025-03-23T13:54:41.845Z" }, +] + [[package]] name = "urllib3" version = "2.5.0" @@ -2281,6 +2423,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a7/c2/fe1e52489ae3122415c51f387e221dd0773709bad6c6cdaa599e8a2c5185/urllib3-2.5.0-py3-none-any.whl", hash = "sha256:e6b01673c0fa6a13e374b50871808eb3bf7046c4b125b216f6bf1cc604cff0dc", size = 129795, upload-time = "2025-06-18T14:07:40.39Z" }, ] +[[package]] +name = "vine" +version = "5.1.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/bd/e4/d07b5f29d283596b9727dd5275ccbceb63c44a1a82aa9e4bfd20426762ac/vine-5.1.0.tar.gz", hash = "sha256:8b62e981d35c41049211cf62a0a1242d8c1ee9bd15bb196ce38aefd6799e61e0", size = 48980, upload-time = "2023-11-05T08:46:53.857Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/03/ff/7c0c86c43b3cbb927e0ccc0255cb4057ceba4799cd44ae95174ce8e8b5b2/vine-5.1.0-py3-none-any.whl", hash = "sha256:40fdf3c48b2cfe1c38a49e9ae2da6fda88e4794c810050a728bd7413811fb1dc", size = 9636, upload-time = "2023-11-05T08:46:51.205Z" }, +] + [[package]] name = "virtualenv" version = "20.35.3"