7 changes: 7 additions & 0 deletions pydantic_ai_slim/pydantic_ai/fastapi/__init__.py
@@ -0,0 +1,7 @@
from pydantic_ai.fastapi.agent_router import AgentAPIRouter
from pydantic_ai.fastapi.registry import AgentRegistry

__all__ = [
'AgentRegistry',
'AgentAPIRouter',
]
165 changes: 165 additions & 0 deletions pydantic_ai_slim/pydantic_ai/fastapi/agent_router.py
@@ -0,0 +1,165 @@
import logging
from typing import Any

try:
from fastapi import APIRouter, HTTPException
from fastapi.responses import StreamingResponse
from openai.types import ErrorObject
from openai.types.chat.chat_completion import ChatCompletion
from openai.types.model import Model
from openai.types.responses import Response
except ImportError as _import_error: # pragma: no cover
raise ImportError(
'Please install the `openai` package to enable the fastapi openai compatible endpoint, '
'you can use the `openai` and `fastapi` optional group — `pip install "pydantic-ai-slim[openai,fastapi]"`'
) from _import_error

from pydantic_ai.fastapi.api import AgentChatCompletionsAPI, AgentModelsAPI, AgentResponsesAPI
from pydantic_ai.fastapi.data_models import (
ChatCompletionRequest,
ErrorResponse,
ModelsResponse,
ResponsesRequest,
)
from pydantic_ai.fastapi.registry import AgentRegistry

logger = logging.getLogger(__name__)
Member suggested removing:
    logger = logging.getLogger(__name__)


class AgentAPIRouter(APIRouter):
Member: I don't think it's necessary to implement our own APIRouter.
Reply: Since we use the registry, the registry needs to be set in the API structs, to access the correct agent based on the requested model.
Member: No, you can create a factory that creates the APIRouter and includes the routes based on conditionals.
Reply: Ok, I've pushed that change.
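As a sketch of the factory approach the reviewer describes (the function name and flag names below are illustrative, not part of this PR):

from fastapi import APIRouter
from openai.types.chat.chat_completion import ChatCompletion
from openai.types.responses import Response

from pydantic_ai.fastapi.api import AgentChatCompletionsAPI, AgentResponsesAPI
from pydantic_ai.fastapi.data_models import ChatCompletionRequest, ResponsesRequest
from pydantic_ai.fastapi.registry import AgentRegistry


def create_agent_router(
    registry: AgentRegistry,
    enable_completions_api: bool = True,
    enable_responses_api: bool = True,
) -> APIRouter:
    """Build a plain APIRouter, registering routes conditionally."""
    router = APIRouter()
    completions_api = AgentChatCompletionsAPI(registry)
    responses_api = AgentResponsesAPI(registry)

    if enable_completions_api:
        @router.post('/v1/chat/completions', response_model=ChatCompletion)
        async def chat_completions(request: ChatCompletionRequest) -> ChatCompletion:
            return await completions_api.create_completion(request)

    if enable_responses_api:
        @router.post('/v1/responses', response_model=Response)
        async def responses(request: ResponsesRequest) -> Response:
            return await responses_api.create_response(request)

    return router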

"""FastAPI Router for Pydantic Agent."""

def __init__(
self,
agent_registry: AgentRegistry,
disable_response_api: bool = False,
disable_completions_api: bool = False,
*args: tuple[Any],
**kwargs: tuple[Any],
Member: This is wrong. The type of the *args and **kwargs should represent each element in the case of args, and each value in the case of kwargs - that said, this is a properly typed library, so we can't include this as is.
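For reference, a standalone illustration of those annotation semantics (not PR code):

def takes_ints(*args: int, **kwargs: str) -> None:
    # the annotation on *args types each positional element,
    # and the annotation on **kwargs types each keyword value
    total = sum(args)
    labels = ', '.join(kwargs.values())
    print(total, labels)

takes_ints(1, 2, 3, label='ok')   # accepted by a type checker
# takes_ints('x')                 # rejected: 'x' is not an int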

):
super().__init__(*args, **kwargs)
self.registry = agent_registry
self.responses_api = AgentResponsesAPI(self.registry)
self.completions_api = AgentChatCompletionsAPI(self.registry)
self.models_api = AgentModelsAPI(self.registry)
self.enable_responses_api = not disable_response_api
self.enable_completions_api = not disable_completions_api

# Registers OpenAI/v1 API routes
self._register_routes()

def _register_routes(self) -> None: # noqa: C901
if self.enable_completions_api:

@self.post(
'/v1/chat/completions',
response_model=ChatCompletion,
)
async def chat_completions( # type: ignore
request: ChatCompletionRequest,
) -> ChatCompletion | StreamingResponse:
if not request.messages:
raise HTTPException(
status_code=400,
detail=ErrorResponse(
error=ErrorObject(
type='invalid_request_error',
message='Messages cannot be empty',
),
).model_dump(),
)
try:
if getattr(request, 'stream', False):
return StreamingResponse(
self.completions_api.create_streaming_completion(request),
media_type='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Content-Type': 'text/plain; charset=utf-8',
},
)
else:
return await self.completions_api.create_completion(request)
except Exception as e:
logger.error(f'Error in chat completion: {e}', exc_info=True)
raise HTTPException(
status_code=500,
detail=ErrorResponse(
error=ErrorObject(
type='internal_server_error',
message=str(e),
),
).model_dump(),
)
Member: The exception is already raised, and the server already sends a 500.
Suggested change (remove):
except Exception as e:
    logger.error(f'Error in chat completion: {e}', exc_info=True)
    raise HTTPException(
        status_code=500,
        detail=ErrorResponse(
            error=ErrorObject(
                type='internal_server_error',
                message=str(e),
            ),
        ).model_dump(),
    )
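If the OpenAI-style error body is still wanted on failures, one alternative is a single app-level exception handler instead of per-route try/except. A minimal sketch, assuming the app still wants that payload shape (handler name is illustrative):

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()

@app.exception_handler(Exception)
async def openai_style_errors(request: Request, exc: Exception) -> JSONResponse:
    # any unhandled exception becomes a 500 with an OpenAI-style payload
    return JSONResponse(
        status_code=500,
        content={'error': {'type': 'internal_server_error', 'message': str(exc)}},
    )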


if self.enable_responses_api:

@self.post(
'/v1/responses',
response_model=Response,
Member: We should rename the import above to OpenAIResponse or something like that; Response in the context of a web framework is already used.
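For instance, a one-line sketch of the suggested rename:

from openai.types.responses import Response as OpenAIResponse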

)
Member: Suggested change - collapse the decorator to one line:
@self.post('/v1/responses', response_model=Response)

async def responses( # type: ignore
Member: This type: ignore is too generic. What's the problem here?
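One narrower form, if the suppression is really needed; the error code in brackets depends on what the type checker actually reports here, so [misc] is only a placeholder assumption:

from openai.types.responses import Response
from pydantic_ai.fastapi.data_models import ResponsesRequest

async def responses(  # type: ignore[misc]
    request: ResponsesRequest,
) -> Response:
    ...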

request: ResponsesRequest,
) -> Response:
if not request.input:
raise HTTPException(
status_code=400,
detail=ErrorResponse(
error=ErrorObject(
type='invalid_request_error',
message='Messages cannot be empty',
),
).model_dump(),
)
Member: I don't think we need those extremely lengthy HTTPExceptions, but also, we shouldn't need to validate the input twice!
This line is never reached, since request.input doesn't accept None - unless ResponseInputParam can be an empty dictionary? In any case, we should fix this on the ResponsesRequest definition.
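A sketch of pushing that validation into the request model itself; the field shape is assumed for illustration, since ResponsesRequest isn't shown in this diff:

from pydantic import BaseModel, field_validator


class ResponsesRequestSketch(BaseModel):
    model: str
    input: str | list[dict[str, str]]

    @field_validator('input')
    @classmethod
    def input_not_empty(cls, v: str | list[dict[str, str]]) -> str | list[dict[str, str]]:
        # reject '' and [] at parse time, so the route body never re-checks
        if not v:
            raise ValueError('input cannot be empty')
        return v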

try:
if getattr(request, 'stream', False):
# TODO: add streaming support for responses api
raise HTTPException(status_code=501)
else:
return await self.responses_api.create_response(request)
except Exception as e:
logger.error(f'Error in responses: {e}', exc_info=True)
raise HTTPException(
status_code=500,
detail=ErrorResponse(
error=ErrorObject(
type='internal_server_error',
message=str(e),
),
).model_dump(),
)
Member: No need again.


@self.get('/v1/models', response_model=ModelsResponse)
async def get_models() -> ModelsResponse: # type: ignore
try:
return await self.models_api.list_models()
except Exception as e:
logger.error(f'Error listing models: {e}', exc_info=True)
raise HTTPException(
status_code=500,
detail=ErrorResponse(
error=ErrorObject(
type='internal_server_error',
message=f'Error retrieving models: {str(e)}',
),
).model_dump(),
)

@self.get('/v1/models/{model_id}', response_model=Model)
async def get_model(model_id: str) -> Model: # type: ignore
try:
return await self.models_api.get_model(model_id)
except HTTPException:
raise
except Exception as e:
logger.error(f'Error fetching model info: {e}', exc_info=True)
raise HTTPException(
status_code=500,
detail=ErrorResponse(
error=ErrorObject(
type='internal_server_error',
message=f'Error retrieving model: {str(e)}',
),
).model_dump(),
)
Member: Is this necessary?

9 changes: 9 additions & 0 deletions pydantic_ai_slim/pydantic_ai/fastapi/api/__init__.py
Member: This module shouldn't be called fastapi. The underlying package doesn't matter.
ion-elgreco (Nov 4, 2025): We are open to better name suggestions! Internally we called this differently.

@@ -0,0 +1,9 @@
from pydantic_ai.fastapi.api.completions import AgentChatCompletionsAPI
from pydantic_ai.fastapi.api.models import AgentModelsAPI
from pydantic_ai.fastapi.api.responses import AgentResponsesAPI

__all__ = [
'AgentChatCompletionsAPI',
'AgentModelsAPI',
'AgentResponsesAPI',
]
129 changes: 129 additions & 0 deletions pydantic_ai_slim/pydantic_ai/fastapi/api/completions.py
@@ -0,0 +1,129 @@
import json
import logging
import time
from collections.abc import AsyncGenerator
from typing import Any

try:
from fastapi import HTTPException
from openai.types import ErrorObject
from openai.types.chat.chat_completion import ChatCompletion
from openai.types.chat.chat_completion_chunk import ChatCompletionChunk, Choice as ChunkChoice, ChoiceDelta
except ImportError as _import_error: # pragma: no cover
raise ImportError(
'Please install the `openai` package to enable the fastapi openai compatible endpoint, '
'you can use the `openai` and `fastapi` optional group — `pip install "pydantic-ai-slim[openai,fastapi]"`'
Member: I don't think the extra should be called fastapi. The idea is to expose the chat completions/responses endpoints - so maybe chat-completions.

) from _import_error

from pydantic import TypeAdapter

from pydantic_ai import Agent, _utils
from pydantic_ai.fastapi.convert import (
openai_chat_completions_2pai,
pai_result_to_openai_completions,
)
from pydantic_ai.fastapi.data_models import ChatCompletionRequest, ErrorResponse
from pydantic_ai.fastapi.registry import AgentRegistry
from pydantic_ai.settings import ModelSettings

logger = logging.getLogger(__name__)
Member: There's no need for this.



class AgentChatCompletionsAPI:
"""Chat completions API openai <-> pydantic-ai conversion."""

def __init__(self, registry: AgentRegistry) -> None:
self.registry = registry

def get_agent(self, name: str) -> Agent:
"""Retrieves agent."""
try:
agent = self.registry.get_completions_agent(name)
except KeyError:
raise HTTPException(
status_code=404,
detail=ErrorResponse(
error=ErrorObject(
message=f'Model {name} is not available as chat completions API',
type='not_found_error',
),
).model_dump(),
)

return agent

async def create_completion(self, request: ChatCompletionRequest) -> ChatCompletion:
"""Create a non-streaming chat completion."""
model_name = request.model
agent = self.get_agent(model_name)

model_settings_ta = TypeAdapter(ModelSettings)
messages = openai_chat_completions_2pai(messages=request.messages)

try:
async with agent:
result = await agent.run(
message_history=messages,
model_settings=model_settings_ta.validate_python(
{k: v for k, v in request.model_dump().items() if v is not None},
),
)

return pai_result_to_openai_completions(
result=result,
model=model_name,
)

except Exception as e:
logger.error(f'Error creating completion: {e}')
raise
Member: The error is shown anyway, there's no need for this handling.
Also, in other code sources, you may want to do logger.exception('Error when creating completion'), since the exception is already logged.
Suggested change (remove):
except Exception as e:
    logger.error(f'Error creating completion: {e}')
    raise
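A small standalone illustration of that idiom: logger.exception logs at ERROR level and appends the active traceback automatically, so the exception doesn't need to be interpolated into the message:

import logging

logger = logging.getLogger(__name__)

def risky() -> None:
    try:
        raise RuntimeError('boom')
    except RuntimeError:
        # message plus full traceback, no f-string needed
        logger.exception('Error creating completion')
        raise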


async def create_streaming_completion(self, request: ChatCompletionRequest) -> AsyncGenerator[str]:
"""Create a streaming chat completion."""
model_name = request.model
agent = self.get_agent(model_name)
messages = openai_chat_completions_2pai(messages=request.messages)

role_sent = False

async with (
agent,
agent.run_stream(
message_history=messages,
) as result,
):
async for chunk in result.stream_text(delta=True):
delta = ChoiceDelta(
role='assistant' if not role_sent else None,
content=chunk,
)
role_sent = True

stream_response = ChatCompletionChunk(
id=f'chatcmpl-{_utils.now_utc().isoformat()}',
created=int(_utils.now_utc().timestamp()),
model=model_name,
object='chat.completion.chunk',
choices=[
ChunkChoice(
index=0,
delta=delta,
),
],
)

yield f'data: {stream_response.model_dump_json()}\n\n'

final_chunk: dict[str, Any] = {
'id': f'chatcmpl-{int(time.time())}',
'object': 'chat.completion.chunk',
'model': model_name,
'choices': [
{
'index': 0,
'delta': {},
'finish_reason': 'stop',
},
],
}
yield f'data: {json.dumps(final_chunk)}\n\n'
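Since the stream is plain SSE in OpenAI's chat-chunk shape, any OpenAI client pointed at the server should be able to consume it. A hedged usage sketch; the host, port, and model name 'my-agent' are placeholders, and the model must match an agent name in the AgentRegistry:

from openai import OpenAI

client = OpenAI(base_url='http://localhost:8000/v1', api_key='unused')

stream = client.chat.completions.create(
    model='my-agent',
    messages=[{'role': 'user', 'content': 'Hello!'}],
    stream=True,
)
for chunk in stream:
    # chunks mirror the ChatCompletionChunk objects yielded above
    print(chunk.choices[0].delta.content or '', end='')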
57 changes: 57 additions & 0 deletions pydantic_ai_slim/pydantic_ai/fastapi/api/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import logging
import time

try:
from fastapi import HTTPException
from openai.types import ErrorObject
from openai.types.model import Model
except ImportError as _import_error: # pragma: no cover
raise ImportError(
'Please install the `openai` package to enable the fastapi openai compatible endpoint, '
'you can use the `openai` and `fastapi` optional group — `pip install "pydantic-ai-slim[openai,fastapi]"`'
) from _import_error

from pydantic_ai.fastapi.data_models import (
ErrorResponse,
ModelsResponse,
)
from pydantic_ai.fastapi.registry import AgentRegistry

logger = logging.getLogger(__name__)


class AgentModelsAPI:
"""Models API for pydantic-ai agents."""

def __init__(self, registry: AgentRegistry) -> None:
self.registry = registry

async def list_models(self) -> ModelsResponse:
"""List available models (OpenAI-compatible endpoint)."""
agents = self.registry.all_agents

models = [
Model(
id=name,
object='model',
created=int(time.time()),
owned_by='model_owner',
)
for name in agents
]
return ModelsResponse(data=models)

async def get_model(self, name: str) -> Model:
"""Get information about a specific model (OpenAI-compatible endpoint)."""
if name in self.registry.all_agents:
return Model(id=name, object='model', created=int(time.time()), owned_by='NDIA')
else:
raise HTTPException(
status_code=404,
detail=ErrorResponse(
error=ErrorObject(
type='not_found_error',
message=f"Model '{name}' not found",
),
).model_dump(),
)
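A quick way to exercise these two endpoints once the router is mounted; the host and port are assumptions, and httpx is just one convenient client:

import httpx

base = 'http://localhost:8000'
print(httpx.get(f'{base}/v1/models').json())           # ModelsResponse payload
print(httpx.get(f'{base}/v1/models/my-agent').json())  # Model, or the 404 error body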