2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -12,6 +12,8 @@
- Pluggable architecture for chunkers, embedders, and vector databases.
- Hybrid storage with Qdrant and MongoDB.

## v0.1.1-beta.5 (2025-11-21)

## v0.1.1-beta.4 (2025-11-20)

## v0.1.1-beta.3 (2025-10-25)
215 changes: 215 additions & 0 deletions CLAUDE.md
@@ -284,6 +284,218 @@ Documents → Extract Content → Graphiti Processing → Neo4j Graph
4. **LLM cost** – Entity extraction uses LLM calls; consider batching for large documents
5. **Phase 1 feature** – Hybrid retrieval (merging graph + vector results) is planned for Phase 2

## Async Processing & Celery (NEW)

### Overview

Async processing with Celery + Redis enables:
- Non-blocking document submission (returns task_id immediately)
- Background processing of documents (entity/relationship extraction)
- Horizontal worker scaling for throughput
- Task monitoring and status tracking
- Automatic retry logic for failed tasks

### Architecture

```
FastAPI Request
    ↓
GraphRAGClient.submit_add_documents_async()
    ↓
Celery Task Submission → Redis Queue "default"
    ↓
Worker Pool (multiple processes)
    ↓
Task Execution (Neo4j entity extraction)
    ↓
Results → Redis Backend
    ↓
Client polls /tasks/{task_id} for status
    ↓
Results Retrieved
```

### Key Components

1. **src/insta_rag/celery_app.py** - Celery initialization
- Reads CELERY_BROKER_URL and CELERY_RESULT_BACKEND from environment
- Configures task serialization, timeouts, retry logic
- Registers task modules

2. **src/insta_rag/tasks/graph_rag_tasks.py** - Async task definitions
- `@app.task` decorated functions
- Handles async GraphRAGClient in sync Celery context
- Auto-retry (3 times) with exponential backoff
- Returns JSON-serializable results

3. **src/insta_rag/graph_rag/client.py** - Modified GraphRAGClient
- `submit_add_documents_async()` - Submit without waiting
- `submit_add_chunk_async()` - Submit single chunk
- Both return task_id immediately

4. **src/insta_rag/task_monitoring.py** - Task tracking
- `get_task_status(task_id)` - Returns PENDING/STARTED/SUCCESS/FAILURE
- `get_task_result(task_id)` - Returns task results
- `get_queue_length()` - Queue depth monitoring
- `cancel_task(task_id)` - Cancel pending tasks
   - Singleton pattern so all callers share a single monitoring instance

5. **src/insta_rag/worker_manager.py** - Single worker management
- `start_worker()` - Start single worker process
- `stop_worker()` - Graceful shutdown
- `is_worker_running()` - Check status
- Subprocess-based process management

6. **src/insta_rag/worker_pool.py** - Multiple workers (horizontal scaling)
- `start_worker_pool(num_workers)` - Start N workers
- `stop_worker_pool()` - Stop all workers
- `scale_pool(target_workers)` - Dynamic scaling
- `auto_scale_if_needed()` - Auto-scale based on queue depth
- `get_pool_status()` - Worker pool metrics
- `get_queue_depth()` - Pending task count

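To make the worker-management helpers concrete, here is a minimal usage sketch based on the function names listed above; the exact return shapes (for example, the keys in the pool-status dictionary) are assumptions.

```python
# Hedged usage sketch of the worker_pool helpers; signatures follow the list
# above, but return shapes are assumptions.
from insta_rag.worker_pool import (
    start_worker_pool,
    get_pool_status,
    get_queue_depth,
    auto_scale_if_needed,
    stop_worker_pool,
)

# Start two workers, each running up to four tasks concurrently
start_worker_pool(num_workers=2, concurrency_per_worker=4)

# Inspect pool health and pending work
print(get_pool_status())   # assumed shape, e.g. {"workers": 2, "running": True}
print(get_queue_depth())   # number of tasks waiting in the "default" queue

# Grow or shrink the pool when the queue backs up
auto_scale_if_needed()

# Graceful shutdown of all workers
stop_worker_pool()
```
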
### Configuration Environment Variables

```dotenv
# Redis Configuration (required for async processing)
CELERY_BROKER_URL=redis://default:password@host:6379/0
CELERY_RESULT_BACKEND=redis://default:password@host:6379/1

# Redis databases:
# /0 - Task queue (broker) - holds pending tasks
# /1 - Result storage (backend) - stores task results
```

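Below is a minimal sketch of how `src/insta_rag/celery_app.py` might wire these variables into the Celery instance; only standard Celery setting names are used, and the specific values (timeout, included modules) are assumptions.

```python
# Hedged sketch of src/insta_rag/celery_app.py; the project's real configuration
# may differ. Note that no default credentials are baked in.
import os
from celery import Celery

app = Celery(
    "insta_rag",
    broker=os.getenv("CELERY_BROKER_URL"),        # Redis DB /0 - task queue
    backend=os.getenv("CELERY_RESULT_BACKEND"),   # Redis DB /1 - result storage
    include=["insta_rag.tasks.graph_rag_tasks"],  # register task modules
)

app.conf.update(
    task_serializer="json",        # JSON, not pickle
    result_serializer="json",
    accept_content=["json"],
    task_default_queue="default",  # workers must listen on "default"
    task_time_limit=600,           # assumed timeout value
)
```
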
### Critical Implementation Details

#### 1. Queue Configuration
- **CRITICAL**: Workers must listen to queue "default"
- Task submission uses `.delay()` which queues to "default"
- Worker startup includes `-Q default` parameter

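A hedged sketch of what a task in `src/insta_rag/tasks/graph_rag_tasks.py` might look like, showing the retry policy and how the async GraphRAGClient can be driven from the synchronous Celery context (the task name and the processing call are assumptions):

```python
# Hedged task sketch; the real task in graph_rag_tasks.py may differ.
import asyncio

from insta_rag.celery_app import app


@app.task(
    bind=True,
    autoretry_for=(Exception,),  # auto-retry on failure
    max_retries=3,               # up to 3 attempts
    retry_backoff=True,          # exponential backoff between retries
)
def add_documents_task(self, documents: list[dict], collection: str) -> dict:
    """Run the async GraphRAGClient inside a sync Celery worker."""
    from insta_rag.graph_rag.client import GraphRAGClient

    async def _run() -> dict:
        async with GraphRAGClient() as client:
            await client.initialize()
            # Hypothetical processing call; the actual method name may differ
            await client.add_documents(documents, collection)
            # Return only JSON-serializable data
            return {"added": len(documents), "collection": collection}

    return asyncio.run(_run())
```
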
#### 2. load_dotenv() Ordering (CRITICAL)
In testing_api/main.py, **load_dotenv() MUST be called BEFORE importing insta_rag**:
```python
from dotenv import load_dotenv

# CRITICAL: Load environment BEFORE imports
load_dotenv()

# Now celery_app.py will find CELERY_BROKER_URL and CELERY_RESULT_BACKEND
from insta_rag import ...
```

#### 3. Security (No Hardcoded Credentials)
- src/insta_rag/celery_app.py uses `os.getenv()` with NO defaults
- src/insta_rag/core/config.py has `broker_url: Optional[str] = None`
- All credentials come from environment variables only
- Credentials in testing_api/.env, not in code

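A sketch of the corresponding config object in `src/insta_rag/core/config.py`; only `broker_url: Optional[str] = None` is confirmed above, the other field and the `from_env()` helper are assumptions:

```python
# Hedged sketch of CeleryConfig; defaults are None so no credentials live in code.
import os
from dataclasses import dataclass
from typing import Optional


@dataclass
class CeleryConfig:
    broker_url: Optional[str] = None
    result_backend: Optional[str] = None  # assumed companion field

    @classmethod
    def from_env(cls) -> "CeleryConfig":
        # Hypothetical loader; credentials come only from the environment
        return cls(
            broker_url=os.getenv("CELERY_BROKER_URL"),
            result_backend=os.getenv("CELERY_RESULT_BACKEND"),
        )
```
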
#### 4. Task Serialization
- Tasks serialized as JSON (not pickle)
- Documents converted to dicts for transmission
- Results converted to JSON-serializable dicts
- Compatible with distributed workers

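A hedged sketch of the submission path: documents are converted to plain dicts, queued with `.delay()`, and the resulting task id is returned immediately (the task name and the dict-conversion mechanism are assumptions):

```python
# Hedged sketch of the submission path used by GraphRAGClient.
from typing import Any, List

from insta_rag.tasks.graph_rag_tasks import add_documents_task  # task name assumed


async def submit_add_documents_async(documents: List[Any], collection_name: str) -> str:
    """Queue documents for background processing and return the Celery task id."""
    # Convert documents to JSON-serializable dicts before queuing (no pickle)
    payload = [doc if isinstance(doc, dict) else doc.__dict__ for doc in documents]
    # .delay() pushes the task onto the "default" Redis queue and returns at once
    async_result = add_documents_task.delay(payload, collection_name)
    return async_result.id  # the task_id that clients poll for status
```
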
### How It's Called from testing_api

#### 1. API Initialization (testing_api/main.py)
```python
@app.on_event("startup")
async def startup_event():
    from insta_rag.worker_pool import start_worker_pool
    start_worker_pool(num_workers=2, concurrency_per_worker=4)
```

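Because the pool is auto-started, a matching shutdown hook keeps workers from outliving the API process; a sketch of what the stop side in testing_api/main.py might look like:

```python
@app.on_event("shutdown")
async def shutdown_event():
    from insta_rag.worker_pool import stop_worker_pool

    stop_worker_pool()  # graceful shutdown of all workers
```
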
#### 2. Document Submission (testing_api/graph_rag_routes.py)
```python
@router.post("/graph-rag/add-documents")
async def add_documents(documents: List[DocumentInput]):
    async with GraphRAGClient() as client:
        await client.initialize()
        # Submit without waiting - returns task_id
        task_id = await client.submit_add_documents_async(documents, "collection")

    return {"task_id": task_id, "status": "submitted"}
```

#### 3. Task Status Check (testing_api/graph_rag_routes.py)
```python
@router.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    from insta_rag.task_monitoring import get_task_monitoring
    monitor = get_task_monitoring()
    status = monitor.get_task_status(task_id)

    if status == "SUCCESS":
        result = monitor.get_task_result(task_id)
        return {"status": status, "result": result}

    return {"status": status}
```

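From the caller's side the flow is: submit once, then poll `/tasks/{task_id}` until a terminal state. A minimal polling sketch using httpx (one of the added dependencies); the base URL and request payload shape are assumptions:

```python
# Hedged client-side polling loop; endpoint paths follow the routes above,
# but the base URL and payload shape are assumptions.
import time

import httpx

BASE_URL = "http://localhost:8000"  # assumed API address

with httpx.Client(base_url=BASE_URL) as http:
    # Submit documents; the API returns a task_id immediately
    resp = http.post("/graph-rag/add-documents", json=[{"content": "example doc"}])
    task_id = resp.json()["task_id"]

    # Poll until the task reaches a terminal state
    while True:
        status = http.get(f"/tasks/{task_id}").json()
        if status["status"] in ("SUCCESS", "FAILURE"):
            print(status)
            break
        time.sleep(1)  # avoid hammering the API
```
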
### Files Modified/Created

**Created:**
- src/insta_rag/celery_app.py
- src/insta_rag/tasks/graph_rag_tasks.py
- src/insta_rag/task_monitoring.py
- src/insta_rag/worker_manager.py
- src/insta_rag/worker_pool.py

**Modified:**
- src/insta_rag/graph_rag/client.py - Added async submission methods
- src/insta_rag/core/config.py - CeleryConfig with environment loading
- src/insta_rag/__init__.py - Export worker functions
- testing_api/main.py - Worker pool auto-start/stop
- testing_api/graph_rag_routes.py - Async endpoints
- testing_api/.env - Redis configuration
- pyproject.toml - Added httpcore, httpx dependencies

### Worker Startup Command

```bash
# Start single worker
celery -A insta_rag.celery_app worker -l debug -Q default -c 4

# Parameters:
# -A: Celery app module (insta_rag.celery_app)
# -l: Log level (debug, info, warning, error)
# -Q: Queue name (CRITICAL: must be "default")
# -c: Concurrency (number of parallel tasks)
```

### Task States and Lifecycle

```
Submission → PENDING → STARTED → SUCCESS / FAILURE
                                 (or RETRY)
```

- **PENDING**: Waiting in Redis queue
- **STARTED**: Worker picked up task
- **SUCCESS**: Task completed successfully
- **FAILURE**: Task failed (after retries)
- **RETRY**: Task being retried (up to 3 times)

### Testing Async Processing

```python
# Test task submission
task_id = await client.submit_add_documents_async(docs, "test")

# Test status polling
monitor = get_task_monitoring()
status = monitor.get_task_status(task_id)
assert status in ["PENDING", "STARTED", "SUCCESS", "FAILURE"]

# Test result retrieval
if status == "SUCCESS":
    result = monitor.get_task_result(task_id)
    assert result is not None
```

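The monitoring helpers also cover queue depth and cancellation; a short sketch continuing from the snippet above (whether these are module functions or methods on the monitor object, and their return types, are assumptions):

```python
# Hedged sketch of the remaining task_monitoring helpers.
# Assumes `monitor` and `task_id` from the testing snippet above.
pending = monitor.get_queue_length()   # tasks still waiting in the "default" queue
print(f"pending tasks: {pending}")

# Cancel a task that has not started yet (no effect once a worker picks it up)
monitor.cancel_task(task_id)
```
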
## Code Quality Standards

### Pre-commit Hooks (Enforced on commit)
@@ -466,6 +678,7 @@ Note: Pre-commit hooks automatically manage `uv.lock` and `requirements.txt`.
- ✅ **Vector RAG** – Semantic chunking, hybrid retrieval, HyDE, reranking
- ✅ **Graph RAG** – Knowledge graph-based retrieval with Neo4j and Graphiti (Phase 1)
- ✅ **Hybrid Storage** – Qdrant for vectors, MongoDB for content
- ✅ **Async Processing** – Celery + Redis for non-blocking document ingestion and horizontal scaling

### Roadmap (Phase 2+)
- Graph RAG scoring – Implement semantic similarity + BM25 scoring for edges
@@ -475,6 +688,8 @@ Note: Pre-commit hooks automatically manage `uv.lock` and `requirements.txt`.
- CLI GA – General availability of command-line interface
- LangChain/LlamaIndex adapters – Deep integration with popular frameworks
- Streaming/tracing hooks – OpenTelemetry support, real-time monitoring
- Task persistence – Save task state to database for recovery
- Advanced scheduling – Cron jobs and delayed task execution

### Known Limitations
