diff --git a/.github/workflows/docker-images.yml b/.github/workflows/docker-images.yml new file mode 100644 index 0000000..8191402 --- /dev/null +++ b/.github/workflows/docker-images.yml @@ -0,0 +1,37 @@ +name: Build Docker Images + +on: + release: + types: [published] + +permissions: + contents: read + +jobs: + build-api-image: + runs-on: ubuntu-latest + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Docker Login + uses: docker/login-action@v3 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + + - name: Build and push + uses: docker/build-push-action@v5 + with: + context: . + file: Dockerfile + platforms: linux/amd64,linux/arm64 + push: true + tags: ${{ vars.DOCKERHUB_NAMESPACE }}/agent-api:dev, ${{ vars.DOCKERHUB_NAMESPACE }}/agent-api:prd diff --git a/.github/workflows/ecr-images.yml b/.github/workflows/ecr-images.yml new file mode 100644 index 0000000..31c1947 --- /dev/null +++ b/.github/workflows/ecr-images.yml @@ -0,0 +1,44 @@ +name: Build ECR Images + +on: workflow_dispatch + +permissions: + # For AWS OIDC Token access as per https://docs.github.com/en/actions/deployment/security-hardening-your-deployments/configuring-openid-connect-in-amazon-web-services#updating-your-github-actions-workflow + id-token: write # This is required for requesting the JWT + contents: read # This is required for actions/checkout + +env: + ECR_REPO: YOUR ECR REPO + # Create role using https://aws.amazon.com/blogs/security/use-iam-roles-to-connect-github-actions-to-actions-in-aws/ + AWS_ROLE: YOUR_ROLE_ARN + AWS_REGION: us-east-1 + +jobs: + build-api-image: + runs-on: ubuntu-latest + + steps: + - name: Checkout + uses: actions/checkout@v3 + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + # https://github.com/marketplace/actions/configure-aws-credentials-action-for-github-actions + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v4 + with: + role-to-assume: ${{ env.AWS_ROLE }} + aws-region: ${{ env.AWS_REGION }} + # https://github.com/marketplace/actions/amazon-ecr-login-action-for-github-actions + - name: ECR Login + id: login-ecr + uses: aws-actions/amazon-ecr-login@v2 + - name: Build, tag, and push docker image to Amazon ECR + uses: docker/build-push-action@v5 + with: + context: . + file: Dockerfile + platforms: linux/amd64,linux/arm64 + push: true + tags: ${{ env.ECR_REPO }}/agent-api:dev, ${{ env.ECR_REPO }}/agent-api:prd diff --git a/.github/workflows/validate.yml b/.github/workflows/validate.yml new file mode 100644 index 0000000..1d2dc19 --- /dev/null +++ b/.github/workflows/validate.yml @@ -0,0 +1,49 @@ +name: Validate + +on: + push: + branches: + - "main" + pull_request: + types: + - opened + - synchronize + - reopened + branches: + - "main" + +jobs: + validate: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.12"] + + steps: + - uses: actions/checkout@v4 + + - name: Install uv + uses: astral-sh/setup-uv@v3 + with: + enable-cache: true + cache-dependency-glob: "requirements**.txt" + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install the project + run: uv pip sync requirements.txt + + - name: Format with ruff + run: uv run ruff format . + + - name: Lint with ruff + run: uv run ruff check . + + - name: Type-check with mypy + run: uv run mypy . + + # - name: Run tests + # run: uv run pytest tests diff --git a/README.md b/README.md index ccbab69..512b8a3 100644 --- a/README.md +++ b/README.md @@ -1,64 +1,69 @@ -## Api Workspace Template +## Sample Agentic Application -This repo contains the code for running an agent-app and supports 2 environments: +This repo contains the code for a running an agentic application with: -1. **dev**: A development environment running locally on docker -2. **prd**: A production environment running on AWS ECS +1. A FastAPI server +2. A Postgres database with the PgVector extension. -## Setup Workspace +You can run the agentic application in 2 environments: -1. [Install uv](https://docs.astral.sh/uv/#getting-started): `curl -LsSf https://astral.sh/uv/install.sh | sh` - -> from the `agent-api-template` dir: +1. A development environment running locally on docker +2. A production environment running on AWS ECS -2. Install workspace and activate the virtual env: +## Setup -```sh -./scripts/install.sh -source .venv/bin/activate -``` +1. [Install uv](https://docs.astral.sh/uv/#getting-started): `curl -LsSf https://astral.sh/uv/install.sh | sh` -3. Copy `workspace/example_secrets` to `workspace/secrets`: +2. Create a virtual environment and install dependencies: ```sh -cp -r workspace/example_secrets workspace/secrets +./scripts/dev_setup.sh ``` -4. Optional: Create `.env` file: +3. Activate virtual environment -```sh -cp example.env .env +``` +source .venv/bin/activate ``` -## Run Api Workspace Template locally +## Run application locally using docker 1. Install [docker desktop](https://www.docker.com/products/docker-desktop) -2. Set OpenAI Key +2. Export API keys -Set the `OPENAI_API_KEY` environment variable using +Required: Set the `OPENAI_API_KEY` environment variable using ```sh -export OPENAI_API_KEY=sk-*** +export OPENAI_API_KEY=*** ``` -**OR** set in the `.env` file +> you may use any model provider, just need to update the /agents + +Optional: Set the `EXA_API_KEY` if you'd like to use Exa search + +```sh +export EXA_API_KEY=*** +``` -3. Start the workspace using: +3. Start the workspace: ```sh -ag ws up dev +ag ws up ``` -Open [localhost:8000/docs](http://localhost:8000/docs) to view the FastAPI docs. -Open [localhost:8501](http://localhost:8501) to view the Streamlit App. +- This will run 3 containers: + - FastAPI on [localhost:8000](http://localhost:8000/docs) + - Postgres on [localhost:5432](http://localhost:5432) +- Open [localhost:8501](http://localhost:8501) to view the Streamlit App. +- Open [localhost:8000/docs](http://localhost:8000/docs) to view the FastAPI docs. 4. Stop the workspace using: ```sh -ag ws down dev +ag ws down ``` -## Next Steps: +## Learn more: -- Run the Api Workspace Template on AWS - Docs coming soon +- Learn more about this template and how to customize it. diff --git a/agents/agents.py b/agents/operator.py similarity index 56% rename from agents/agents.py rename to agents/operator.py index c570d2b..386c625 100644 --- a/agents/agents.py +++ b/agents/operator.py @@ -1,17 +1,28 @@ -from typing import Optional +from enum import Enum +from typing import List, Optional from agents.sage import get_sage from agents.scholar import get_scholar +class AgentType(Enum): + SAGE = "sage" + SCHOLAR = "scholar" + + +def get_available_agents() -> List[str]: + """Returns a list of all available agent IDs.""" + return [agent.value for agent in AgentType] + + def get_agent( model_id: str = "gpt-4o", - agent_id: Optional[str] = None, + agent_id: Optional[AgentType] = None, user_id: Optional[str] = None, session_id: Optional[str] = None, debug_mode: bool = True, ): - if agent_id == "sage": + if agent_id == AgentType.SAGE: return get_sage(model_id=model_id, user_id=user_id, session_id=session_id, debug_mode=debug_mode) else: return get_scholar(model_id=model_id, user_id=user_id, session_id=session_id, debug_mode=debug_mode) diff --git a/api/routes/agents.py b/api/routes/agents.py index 7d64320..5257aa8 100644 --- a/api/routes/agents.py +++ b/api/routes/agents.py @@ -1,12 +1,13 @@ -from typing import Any, Dict, Generator, List, Literal, Optional +from enum import Enum +from typing import Any, AsyncGenerator, Dict, List, Optional from agno.agent import Agent from agno.storage.agent.session import AgentSession -from fastapi import APIRouter +from fastapi import APIRouter, HTTPException, status from fastapi.responses import StreamingResponse from pydantic import BaseModel -from agents.agents import get_agent +from agents.operator import AgentType, get_agent, get_available_agents from utils.log import logger ###################################################### @@ -16,45 +17,78 @@ agents_router = APIRouter(prefix="/agents", tags=["Agents"]) -class LoadKnowledgeBaseRequest(BaseModel): - session_id: Optional[str] = None - user_id: Optional[str] = None - agent_id: Optional[Literal["sage", "scholar"]] = None +class Model(str, Enum): + gpt_4o = "gpt-4o" + o3_mini = "o3-mini" + +@agents_router.get("", response_model=List[str]) +async def list_agents(): + """ + GET /agents -@agents_router.post("/load-knowledge-base") -def load_knowledge_base(body: LoadKnowledgeBaseRequest): - """Loads the knowledge base for an Agent""" + Returns a list of all available agent IDs. - agent: Agent = get_agent(session_id=body.session_id, user_id=body.user_id, agent_id=body.agent_id) - if agent.knowledge: - agent.knowledge.load(recreate=False) - return {"message": "Knowledge Base Loaded"} + Returns: + List[str]: List of agent identifiers + """ + return get_available_agents() -def chat_response_streamer(agent: Agent, message: str) -> Generator: - for chunk in agent.run(message, stream=True): - # Ideally we would yield chunk +async def chat_response_streamer(agent: Agent, message: str) -> AsyncGenerator: + """ + Stream agent responses chunk by chunk. + + Args: + agent: The agent instance to interact with + message: User message to process + + Yields: + Text chunks from the agent response + """ + run_response = await agent.arun(message, stream=True) + async for chunk in run_response: + # chunk.content only contains the text response from the Agent. + # For advanced use cases, we should yield the entire chunk + # that contains the tool calls and intermediate steps. yield chunk.content class RunRequest(BaseModel): + """Request model for an running an agent""" + message: str stream: bool = True - model_id: str = "gpt-4o" - session_id: Optional[str] = None + model: Model = Model.gpt_4o user_id: Optional[str] = None - agent_id: Optional[Literal["sage", "scholar"]] = None + session_id: Optional[str] = None + +@agents_router.post("/{agent_id}/run", status_code=status.HTTP_200_OK) +async def run_agent(agent_id: AgentType, body: RunRequest): + """ + POST /agents/{agent_id}/run -@agents_router.post("/run") -def run(body: RunRequest): - """Sends a message to an Agent and returns the response""" + Sends a message to a specific agent and returns the response. + Args: + agent_id: The ID of the agent to interact with + body: Request parameters including the message + + Returns: + Either a streaming response or the complete agent response + """ logger.debug(f"RunRequest: {body}") - agent: Agent = get_agent( - session_id=body.session_id, user_id=body.user_id, agent_id=body.agent_id, model_id=body.model_id - ) + + try: + agent: Agent = get_agent( + model_id=body.model.value, + agent_id=agent_id, + user_id=body.user_id, + session_id=body.session_id, + ) + except Exception as e: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Agent not found: {str(e)}") if body.stream: return StreamingResponse( @@ -62,146 +96,195 @@ def run(body: RunRequest): media_type="text/event-stream", ) else: - response = agent.run(body.message, stream=False) - # Ideally we would return response + response = await agent.arun(body.message, stream=False) + # response.content only contains the text response from the Agent. + # For advanced use cases, we should yield the entire response + # that contains the tool calls and intermediate steps. return response.content -class RunHistoryRequest(BaseModel): - session_id: Optional[str] = None - user_id: Optional[str] = None - agent_id: Optional[Literal["sage", "scholar"]] = None - - -@agents_router.post("/history", response_model=List[Dict[str, Any]]) -def get_chat_history(body: RunHistoryRequest): - """Return the chat history for an Agent run""" - - logger.debug(f"RunHistoryRequest: {body}") - agent: Agent = get_agent(session_id=body.session_id, user_id=body.user_id, agent_id=body.agent_id) - # Load the agent from the database - agent.read_from_storage() +@agents_router.get("/{agent_id}/sessions", response_model=List[str]) +async def get_agent_session_ids(agent_id: AgentType, user_id: Optional[str] = None): + """ + GET /agents/{agent_id}/sessions - if agent.memory: - return agent.memory.get_messages() - else: - return [] + Returns all session IDs for a specific agent and user. + Args: + agent_id: The agent type + user_id: Optional user ID -class GetAgentRunRequest(BaseModel): - session_id: Optional[str] = None - user_id: Optional[str] = None - agent_id: Optional[Literal["sage", "scholar"]] = None + Returns: + List[str]: List of session identifiers + """ + logger.debug(f"GetAgentSessionsRequest: agent_id={agent_id}, user_id={user_id}") + return get_agent(user_id=user_id, agent_id=agent_id).storage.get_all_session_ids() -@agents_router.post("/get", response_model=Optional[AgentSession]) -def get_agent_run(body: GetAgentRunRequest): - """Returns the Agent run""" +@agents_router.get("/{agent_id}/sessions/{session_id}", response_model=Optional[AgentSession]) +async def get_agent_session(agent_id: AgentType, session_id: str, user_id: Optional[str] = None): + """ + GET /agents/{agent_id}/sessions/{session_id} - logger.debug(f"GetAgentRunRequest: {body}") - agent: Agent = get_agent(session_id=body.session_id, user_id=body.user_id, agent_id=body.agent_id) + Retrieves details about a specific agent session. - return agent.read_from_storage() + Args: + agent_id: Agent ID + session_id: The session ID to retrieve + user_id: Optional user ID + Returns: + AgentSession or None if not found + """ + logger.debug(f"GetAgentSessionRequest: agent_id={agent_id}, session_id={session_id}, user_id={user_id}") -class GetAllAgentSessionsRequest(BaseModel): - user_id: str - agent_id: Optional[Literal["sage", "scholar"]] = None + try: + agent: Agent = get_agent(session_id=session_id, user_id=user_id, agent_id=agent_id) + return agent.read_from_storage() + except Exception as e: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Session not found: {str(e)}") -@agents_router.post("/get-all", response_model=List[AgentSession]) -def get_agents(body: GetAllAgentSessionsRequest): - """Return all Agent sessions for a user""" +@agents_router.get("/{agent_id}/sessions/{session_id}/messages", response_model=List[Dict[str, Any]]) +async def get_session_messages(agent_id: AgentType, session_id: str, user_id: Optional[str] = None): + """ + GET /agents/{agent_id}/sessions/{session_id}/messages - logger.debug(f"GetAllAgentSessionsRequest: {body}") - return get_agent(user_id=body.user_id, agent_id=body.agent_id).storage.get_all_sessions() + Retrieves the messages for a specific agent session. + Args: + agent_id: Agent ID + session_id: The session ID to retrieve history for + user_id: Optional user ID -class GetAllAgentSessionIdsRequest(BaseModel): - user_id: str - agent_id: Optional[Literal["sage", "scholar"]] = None + Returns: + List of message objects representing the conversation history + """ + logger.debug(f"GetSessionHistoryRequest: agent_id={agent_id}, session_id={session_id}, user_id={user_id}") + try: + agent: Agent = get_agent(session_id=session_id, user_id=user_id, agent_id=agent_id) + # Load the agent from the database + agent.read_from_storage() + except Exception as e: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Session not found: {str(e)}") -@agents_router.post("/get-all-ids", response_model=List[str]) -def get_session_ids(body: GetAllAgentSessionIdsRequest): - """Return all session_ids for a user""" - - logger.debug(f"GetAllAgentSessionIdsRequest: {body}") - return get_agent(user_id=body.user_id, agent_id=body.agent_id).storage.get_all_session_ids() - - -class RenameAgentRequest(BaseModel): - session_id: str - agent_name: str - user_id: Optional[str] = None - agent_id: Optional[Literal["sage", "scholar"]] = None - + if agent.memory: + return agent.memory.get_messages() + else: + return [] -class RenameAgentResponse(BaseModel): - session_id: str - agent_name: str +@agents_router.delete("/{agent_id}/sessions/{session_id}", response_model=dict) +async def delete_session(agent_id: AgentType, session_id: str, user_id: Optional[str] = None): + """ + DELETE /agents/{agent_id}/sessions/{session_id} -@agents_router.post("/rename_agent", response_model=RenameAgentResponse) -def rename_agent(body: RenameAgentRequest): - """Rename an Agent""" + Deletes a specific agent session. + """ + logger.debug(f"DeleteSessionRequest: agent_id={agent_id}, session_id={session_id}") - logger.debug(f"RenameAgentRequest: {body}") - agent: Agent = get_agent(session_id=body.session_id, user_id=body.user_id, agent_id=body.agent_id) - agent.rename(name=body.agent_name) + try: + agent: Agent = get_agent(user_id=user_id, agent_id=agent_id, session_id=session_id) + agent.delete_session(session_id=session_id) + return {"message": "Session deleted"} + except Exception as e: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Session not found: {str(e)}") - return RenameAgentResponse( - session_id=agent.session_id, - agent_name=agent.name, - ) +class RenameSessionRequest(BaseModel): + """Request model for renaming a session""" -class RenameAgentSessionRequest(BaseModel): - session_id: str session_name: str - user_id: Optional[str] = None - agent_id: Optional[Literal["sage", "scholar"]] = None -class RenameAgentSessionResponse(BaseModel): - session_id: str - session_name: str +@agents_router.patch("/{agent_id}/sessions/{session_id}/rename", response_model=dict) +async def rename_session( + agent_id: AgentType, session_id: str, body: RenameSessionRequest, user_id: Optional[str] = None +): + """ + PATCH /agents/{agent_id}/sessions/{session_id}/rename + Renames a specific agent session. -@agents_router.post("/rename_session", response_model=RenameAgentSessionResponse) -def rename_agent_session(body: RenameAgentSessionRequest): - """Rename an Agent Session""" + Args: + agent_id: Agent ID + session_id: The session ID to rename + body: Request containing the new session name + user_id: Optional user ID - logger.debug(f"RenameAgentSessionRequest: {body}") - agent: Agent = get_agent(session_id=body.session_id, user_id=body.user_id, agent_id=body.agent_id) - agent.rename_session(session_name=body.session_name) - - return RenameAgentSessionResponse( - session_id=agent.session_id, - session_name=agent.session_name, + Returns: + Updated session information + """ + logger.debug( + f"RenameSessionRequest: agent_id={agent_id}, session_id={session_id}, session_name={body.session_name}" ) - -class AutoRenameAgentSessionRequest(BaseModel): - session_id: str - user_id: Optional[str] = None - agent_id: Optional[Literal["sage", "scholar"]] = None - - -class AutoRenameAgentSessionResponse(BaseModel): - session_id: str - session_name: str - - -@agents_router.post("/auto_rename_session", response_model=AutoRenameAgentSessionResponse) -def auto_rename_agent_session(body: AutoRenameAgentSessionRequest): - """Rename a agent session using the LLM""" - - logger.debug(f"AutoRenameAgentSessionRequest: {body}") - agent: Agent = get_agent(session_id=body.session_id, user_id=body.user_id, agent_id=body.agent_id) - agent.auto_rename_session() - - return AutoRenameAgentSessionResponse( - session_id=agent.session_id, - session_name=agent.session_name, - ) + try: + agent: Agent = get_agent(user_id=user_id, agent_id=agent_id, session_id=session_id) + agent.rename_session(session_name=body.session_name) + + return { + "session_id": agent.session_id, + "session_name": agent.session_name, + } + except Exception as e: + # Use a more appropriate status code for errors that might not be "not found" + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Failed to rename session: {str(e)}") + + +@agents_router.post("/{agent_id}/sessions/{session_id}/auto-rename", response_model=dict) +async def auto_rename_session(agent_id: AgentType, session_id: str, user_id: Optional[str] = None): + """ + POST /agents/{agent_id}/sessions/{session_id}/auto-rename + + Automatically renames a session using the LLM based on conversation context. + + Args: + session_id: The session ID to auto-rename + user_id: Optional user ID + agent_id: Optional agent type + + Returns: + Updated session information with the auto-generated name + """ + logger.debug(f"AutoRenameSessionRequest: agent_id={agent_id}, session_id={session_id}") + + try: + agent: Agent = get_agent(user_id=user_id, agent_id=agent_id, session_id=session_id) + agent.auto_rename_session() + + return { + "session_id": agent.session_id, + "session_name": agent.session_name, + } + except Exception as e: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Failed to auto-rename session: {str(e)}") + + +@agents_router.post("/{agent_id}/load-knowledge", status_code=status.HTTP_200_OK) +async def load_knowledge(agent_id: AgentType, user_id: Optional[str] = None, recreate: bool = False): + """ + POST /agents/{agent_id}/load-knowledge + + Loads the knowledge base for a specific agent. Please update the Agent Knowledge Base with the required data before calling this endpoint. Example: PDFUrlKnowledgeBase, CSVKnowledgeBase, etc. + + Args: + agent_id: The agent type + user_id: Optional user ID + recreate: Whether to recreate the knowledge base + Returns: + Confirmation message + """ + logger.debug(f"LoadKnowledgeRequest: agent_id={agent_id}, user_id={user_id}") + + try: + agent: Agent = get_agent(user_id=user_id, agent_id=agent_id) + logger.debug(f"Agent: {agent.knowledge}") + if agent.knowledge is not None: + agent.knowledge.load(recreate=recreate) + return {"message": "Knowledge Base Loaded"} + except Exception as e: + logger.error(f"Failed to load knowledge base: {str(e)}") + # Consider more specific error handling here - not all exceptions should be 404 + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Failed to load knowledge base: {str(e)}")