Changes from 11 commits
4 changes: 1 addition & 3 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -251,11 +251,9 @@ Pulumi.*.yaml
chat-server/tests/chat_server_tests/*.json
chat-server/tests/chat_server_tests/output

# Chat-server Test datasets
chat-server/tests/e2e/datasets
chat-server/tests/e2e/datasets/*.csv

# Visualization test artifacts
chat-server/tests/e2e/viz_utils/examples_arguments_syntax
chat-server/tests/e2e/datasets
chat-server/tests/e2e/output
chat-server/tests/dspy/output
4 changes: 3 additions & 1 deletion chat-server/.vscode/settings.json
@@ -1,4 +1,6 @@
{
"python.analysis.typeCheckingMode": "basic",
"python.analysis.autoImportCompletions": true
"python.analysis.autoImportCompletions": true,
"cursorpyright.analysis.autoImportCompletions": true,
"cursorpyright.analysis.typeCheckingMode": "basic"
}
163 changes: 163 additions & 0 deletions chat-server/README.md
@@ -0,0 +1,163 @@
# Gopie Chat Server

An AI agent for data analysis, built with LangGraph, LangChain, and FastAPI.

## Quick Start

```bash
cd chat-server
uv sync
uvicorn app.main:app --host 0.0.0.0 --port 8001 --reload
```

## Development Commands

| Command | Description |
|---------|-------------|
| `uvicorn app.main:app --host 0.0.0.0 --port 8001 --reload` | Start development server |
| `pytest` | Run all tests |
| `pytest -m unit` | Run unit tests only |
| `pytest -m e2e` | Run E2E tests only |
| `black .` | Code formatting |
| `isort .` | Import sorting |
| `uv sync` | Install dependencies |

## Configuration

### Environment Variables

#### OLAP Backend Configuration

The chat server supports multiple OLAP backends through the `CHAT_OLAP_DB_TYPE` environment variable:

| Value | Description |
|-------|-------------|
| `duckdb` | DuckDB (default) |
| `motherduck` | MotherDuck cloud DuckDB |
| `motherduck_org` | MotherDuck organization mode |
| `clickhouse` | ClickHouse single-node |
| `clickhouse_cluster` | ClickHouse cluster mode |
| `clickhouse_org` | ClickHouse organization mode |

Example:
```bash
export CHAT_OLAP_DB_TYPE=clickhouse
```

The OLAP backend type controls:
- SQL dialect used in generated queries (sampling syntax, function names)
- System table queries for table metadata
- Fuzzy string matching functions (`levenshtein` vs `levenshteinDistance`)
- LLM prompts for database-aware SQL generation
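As an illustrative sketch (not the server's actual code), the backend selection can be thought of as a lookup from `CHAT_OLAP_DB_TYPE` to dialect-specific names — here for the fuzzy-match function; the mapping values come from the tables in this README:

```python
import os

# Hypothetical mapping from CHAT_OLAP_DB_TYPE values to the dialect's
# fuzzy string-matching function, per the backend table above.
FUZZY_FUNCTIONS = {
    "duckdb": "levenshtein",
    "motherduck": "levenshtein",
    "motherduck_org": "levenshtein",
    "clickhouse": "levenshteinDistance",
    "clickhouse_cluster": "levenshteinDistance",
    "clickhouse_org": "levenshteinDistance",
}


def fuzzy_function_for(db_type: str) -> str:
    """Return the fuzzy-match function name for the configured backend."""
    try:
        return FUZZY_FUNCTIONS[db_type]
    except KeyError:
        raise ValueError(f"Unsupported CHAT_OLAP_DB_TYPE: {db_type}")


# Defaults to DuckDB when the variable is unset, matching the README.
db_type = os.environ.get("CHAT_OLAP_DB_TYPE", "duckdb")
print(fuzzy_function_for(db_type))
```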

#### Other Configuration

See `app/core/config.py` for complete configuration options:

```bash
# API Configuration
PROJECT_NAME="Gopie Chat Server"
API_V1_STR="/api/v1"
MODE="development"

# LLM Providers
PORTKEY_API_KEY="your-portkey-key"
OPENAI_API_KEY="your-openai-key"
DEFAULT_LLM_MODEL="gpt-4o"

# Vector Database (Qdrant)
QDRANT_HOST="localhost"
QDRANT_PORT=6333
QDRANT_COLLECTION="dataset_collection"

# Code Execution (E2B)
E2B_API_KEY="your-e2b-key"
E2B_TIMEOUT=120

# External Services
GOPIE_API_ENDPOINT="http://localhost:8000"
```

## Architecture

### OLAP Query Builder

The chat server uses an abstraction layer for database-specific SQL generation:

```
┌─────────────────────────────────────────┐
│ Business Logic (table_utils, etc.) │
└────────────────────┬────────────────────┘
┌────────────────────▼────────────────────┐
│ OlapQueryBuilder (Abstract Base) │
│ - get_estimated_size_query() │
│ - build_sample_query() │
│ - build_levenshtein_query() │
│ - get_db_type() │
└────────────────────┬────────────────────┘
┌────────────┴────────────┐
▼ ▼
DuckDBQueryBuilder ClickHouseQueryBuilder
```
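A minimal sketch of this abstraction, assuming the method names from the diagram; the SQL strings follow the syntax table in this README and are illustrative, not the implementations in `app/utils/olap/`:

```python
from abc import ABC, abstractmethod


class OlapQueryBuilder(ABC):
    """Dialect-agnostic interface for generated SQL (sketch)."""

    @abstractmethod
    def get_db_type(self) -> str: ...

    @abstractmethod
    def build_sample_query(self, table: str, n: int) -> str: ...


class DuckDBQueryBuilder(OlapQueryBuilder):
    def get_db_type(self) -> str:
        return "duckdb"

    def build_sample_query(self, table: str, n: int) -> str:
        # DuckDB supports system sampling natively.
        return f"SELECT * FROM {table} USING SAMPLE {n}% (system)"


class ClickHouseQueryBuilder(OlapQueryBuilder):
    def get_db_type(self) -> str:
        return "clickhouse"

    def build_sample_query(self, table: str, n: int) -> str:
        # ClickHouse is sampled here via random ordering plus a limit.
        return f"SELECT * FROM {table} ORDER BY rand() LIMIT {n}"


def get_query_builder(db_type: str) -> OlapQueryBuilder:
    """Factory mapping a CHAT_OLAP_DB_TYPE value to a builder (sketch)."""
    builders = {"duckdb": DuckDBQueryBuilder, "clickhouse": ClickHouseQueryBuilder}
    return builders[db_type]()
```

Business logic depends only on the abstract interface, so adding a backend means adding one subclass and one factory entry.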

#### SQL Syntax Differences

| Feature | DuckDB | ClickHouse |
|---------|--------|------------|
| Table Stats | `duckdb_tables()` | `system.tables` |
| Row Count Column | `estimated_size` | `total_rows` |
| Sampling | `USING SAMPLE X% (system)` | `ORDER BY rand() LIMIT n` |
| Fuzzy Match | `levenshtein()` | `levenshteinDistance()` |
| Random | `random()` | `rand()` |
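For illustration, the row-count metadata lookups implied by the table above might look like this; the `WHERE` filters and placeholders are assumptions, not the server's exact queries:

```python
# Hedged examples of per-backend metadata queries for table statistics,
# following the Table Stats and Row Count Column rows above.
DUCKDB_STATS_SQL = """
SELECT estimated_size
FROM duckdb_tables()
WHERE table_name = ?
"""

CLICKHOUSE_STATS_SQL = """
SELECT total_rows
FROM system.tables
WHERE name = ?
"""

print(DUCKDB_STATS_SQL.strip())
print(CLICKHOUSE_STATS_SQL.strip())
```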

### Key Components

- **`app/utils/olap/`** - OLAP query builder abstraction
- `base.py` - Abstract base class
- `duckdb.py` - DuckDB implementation
- `clickhouse.py` - ClickHouse implementation
- `factory.py` - Factory function

- **`app/workflow/`** - LangGraph agent workflows
- **`app/tool_utils/`** - LangChain tools for SQL execution
- **`app/services/`** - External service integrations

## Testing

### Running Tests

```bash
# All tests
pytest

# Unit tests only
pytest -m unit

# E2E tests only
pytest -m e2e

# Specific test file
pytest tests/unit/test_olap_query_builders.py -v
```

### Test Structure

```
tests/
├── unit/ # Unit tests
│ └── test_olap_query_builders.py # OLAP query builder tests
├── e2e/ # End-to-end tests
└── conftest.py # Pytest configuration
```

## Docker

```bash
# Start chat server only
docker-compose up

# Full stack (with Go server, Qdrant, etc.)
cd .. && docker-compose -f docker-compose-noauth.yaml up
```
19 changes: 14 additions & 5 deletions chat-server/app/api/v1/routers/dataset_upload.py
@@ -1,6 +1,7 @@
import asyncio
from typing import Annotated

from fastapi import APIRouter, HTTPException, status
from fastapi import APIRouter, Header, HTTPException, status

from app.core.session import SingletonAiohttp
from app.models.router import UploadResponse, UploadSchemaRequest
@@ -17,7 +18,10 @@


@dataset_router.post("/upload_schema", response_model=UploadResponse)
async def upload_schema(payload: UploadSchemaRequest):
async def upload_schema(
payload: UploadSchemaRequest,
x_organization_id: Annotated[str | None, Header()] = None,
):
"""
    Process and index the dataset schema.

@@ -28,16 +32,21 @@ async def upload_schema(payload: UploadSchemaRequest):
project_id = payload.project_id
dataset_id = payload.dataset_id
dataset_details, project_details = await asyncio.gather(
get_dataset_info(dataset_id, project_id),
get_project_info(project_id),
get_dataset_info(
dataset_id, project_id, org_id=x_organization_id, is_view=payload.is_view
),
get_project_info(project_id, org_id=x_organization_id),
)
dataset_summary, sample_data = await generate_summary(
dataset_details.name, org_id=x_organization_id
)
dataset_summary, sample_data = await generate_summary(dataset_details.name)

success = await store_schema_in_qdrant(
dataset_summary=dataset_summary,
sample_data=sample_data,
dataset_details=dataset_details,
project_details=project_details,
org_id=x_organization_id,
)
if not success:
raise HTTPException(
42 changes: 42 additions & 0 deletions chat-server/app/api/v1/routers/fetch_sql.py
@@ -0,0 +1,42 @@
from fastapi import APIRouter, HTTPException, status

from app.core.log import custom_logger as logger
from app.models.router import FetchSqlRequest, FetchSqlResponse
from app.workflow.graph.nl_to_sql_graph.graph import nl_to_sql_graph

fetch_sql_router = APIRouter()


@fetch_sql_router.post("/fetch-sql", response_model=FetchSqlResponse)
async def fetch_sql(payload: FetchSqlRequest):
"""
Generate SQL queries from a natural language description.

- Single dataset: provide `dataset_ids` with one ID
- Multi-dataset: provide `dataset_ids` with multiple IDs and/or `project_ids`

Returns a list of generated SQL queries with their explanations.
"""
try:
dataset_ids = [did for did in (payload.dataset_ids or []) if did]
project_ids = [pid for pid in (payload.project_ids or []) if pid]

result = await nl_to_sql_graph.ainvoke(
{
"user_query": payload.description,
"dataset_ids": dataset_ids or None,
"project_ids": project_ids or None,
}
)

return FetchSqlResponse(
sql_queries=result.get("sql_queries", []),
message=result.get("message"),
)
except Exception as e:
logger.error(f"Error in fetch_sql: {e}")

raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to generate SQL queries, please try again.",
) from e
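As a hedged usage sketch, a request payload for this new endpoint could be assembled like so; the field names follow `FetchSqlRequest` as used in the handler above, while the example values and the target URL are hypothetical:

```python
import json

# Hypothetical payload for POST /fetch-sql (e.g. http://localhost:8001,
# per this PR's README Quick Start). All values below are illustrative.
payload = {
    "description": "Average order value per month in 2024",
    "dataset_ids": ["sales_2024"],  # one ID for single-dataset mode
    "project_ids": [],              # optionally scope by project instead
}

# The handler drops empty/falsy IDs, so empty lists become None internally.
print(json.dumps(payload))
```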
17 changes: 8 additions & 9 deletions chat-server/app/api/v1/routers/health.py
@@ -2,13 +2,12 @@
from datetime import datetime
from typing import Any, Dict

import aiohttp
from fastapi import APIRouter
from fastapi.responses import JSONResponse
from qdrant_client import AsyncQdrantClient

from app.core.config import settings
from app.core.session import SingletonAiohttp
from app.core.log import logger

router = APIRouter()

@@ -51,6 +50,7 @@ async def check_qdrant_health() -> Dict[str, Any]:
"collection_exists": False,
}
except Exception as e:
logger.exception("Error checking Qdrant health: %s", e)
return {"status": "unhealthy", "error": str(e), "collection_exists": False}


@@ -87,6 +87,7 @@ async def check_llm_provider_health() -> Dict[str, Any]:
"note": "Provider instantiated but no default model configured",
}
except Exception as e:
logger.exception("Error checking LLM provider health: %s", e)
return {
"status": "unhealthy",
"provider_type": settings.LLM_GATEWAY_PROVIDER,
@@ -103,18 +104,15 @@ async def check_gopie_server_health() -> Dict[str, Any]:
return {"status": "not_configured", "error": "GOPIE_API_ENDPOINT not configured"}

try:
http_session = SingletonAiohttp.get_aiohttp_client()
from app.services.gopie.client import GopieClient

client = GopieClient()
# Test basic connectivity to Gopie server
url = settings.GOPIE_API_ENDPOINT.rstrip("/")
headers = {"accept": "application/json"}

async with http_session.get(
url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
) as response:
async with await client.get("/") as response:
# Any response (even 404) means the server is reachable
return {"status": "healthy", "response_code": response.status, "server_reachable": True}
except Exception as e:
logger.exception("Error checking Gopie server health: %s", e)
return {"status": "unhealthy", "error": str(e), "server_reachable": False}


@@ -154,6 +152,7 @@ async def check_embedding_provider_health() -> Dict[str, Any]:
"note": "Provider instantiated but no default model configured",
}
except Exception as e:
logger.exception("Error checking embedding provider health: %s", e)
return {
"status": "unhealthy",
"provider_type": settings.EMBEDDING_GATEWAY_PROVIDER,
6 changes: 5 additions & 1 deletion chat-server/app/api/v1/routers/query.py
@@ -1,6 +1,7 @@
import uuid
from typing import Annotated

from fastapi import APIRouter
from fastapi import APIRouter, Header
from fastapi.responses import JSONResponse, StreamingResponse

from app.utils.adapters.openai.input import (
@@ -22,6 +23,7 @@ async def root():
@router.post("/chat/completions")
async def create(
openai_format_request: RequestNonStreaming | RequestStreaming,
x_organization_id: Annotated[str | None, Header()] = None,
):
"""
Handle chat completion requests, supporting both streaming and non-streaming responses.
@@ -53,6 +55,7 @@ async def create(
chat_id=chat_id,
dataset_ids=request.dataset_ids,
project_ids=request.project_ids,
org_id=x_organization_id,
)
),
media_type="text/event-stream",
@@ -65,5 +68,6 @@
chat_id=chat_id,
dataset_ids=request.dataset_ids,
project_ids=request.project_ids,
org_id=x_organization_id,
)
)