Skip to content

Commit

Permalink
Otel exporter (#529)
Browse files Browse the repository at this point in the history
* feat(session): integrate OpenTelemetry for event tracing

refactor(session): simplify jwt check in session method

* remove deprecated `header` params from HttpClient

Signed-off-by: Teo <[email protected]>

* refactor(http_client): use `header`

* Move SessionExporter to `session`

Signed-off-by: Teo <[email protected]>

* draft

Signed-off-by: Teo <[email protected]>

* refactor(session): update session tracer provider management

* rmv sess. tests

* deactivate console exporter

Signed-off-by: Teo <[email protected]>

* tests: mock processor to use linear simple span processor (no batch)

* revert setup fixture and extra tests

Signed-off-by: Teo <[email protected]>

* refactor(session): restore otel_exporter for span processing

* test: Enhance assertions in session tests

* refactor(session): reorder tracer initialization after session start

* revert TestSingleSession.test_add_tags

Signed-off-by: Teo <[email protected]>

* revert TestSingleSession.test_session

Signed-off-by: Teo <[email protected]>

* fix: add missing api_key to endpoint calls

* revert: Session._start_session

Signed-off-by: Teo <[email protected]>

* match order of execution with original / ease of comparison

Signed-off-by: Teo <[email protected]>

* remove queue attr

Signed-off-by: Teo <[email protected]>

* refactor(session): persisted span processor attr;

* flush_now flag

Signed-off-by: Teo <[email protected]>

* small improvements

Signed-off-by: Teo <[email protected]>

* Improve general lifecycle management of OTEL

Signed-off-by: Teo <[email protected]>

* Removed class-level state from SessionExporter; each Session now gets its own TracerProvider instance; shutdown flag is now instance-level

Signed-off-by: Teo <[email protected]>

* 14 errors

Signed-off-by: Teo <[email protected]>

* 13 errors

Signed-off-by: Teo <[email protected]>

* 1 error

Signed-off-by: Teo <[email protected]>

* 0 error

Signed-off-by: Teo <[email protected]>

* Cleanup deps

Signed-off-by: Teo <[email protected]>

* refactored code for `get_analytics` method merged in `main`

* tests for the `get_analytics` method

* linting

* oops

* add tests targeting SessionExporter

Failing: test_export_with_missing_timestamp
Signed-off-by: Teo <[email protected]>

* SessionExporter: Added default value using current UTC time when end_timestamp is None

Signed-off-by: Teo <[email protected]>

* Moved timestamp handling earlier in the process, before OpenTelemetry validation

Signed-off-by: Teo <[email protected]>

* test: add test for exporting LLM event handling

* test: add test for handling missing event ID in export

* add session url

* feat(HttpClient): add session management and header preparation

* feat(HttpClient): Cache host env

* refactor(HttpClient): improve session management and headers

* feat(session): add static resource management in exporter

delete(tests): remove conftest.py test configuration file

refactor(session): remove unused resource and tracer provider methods

* feat(session): force flush pending spans on session end

* replace core manual test

* remove log flag

* refactor(client): simplify host_env retrieval logic

* refactor(http_client): convert _prepare_headers to classmethod

* ruff

Signed-off-by: Teo <[email protected]>

* added cost param to LLMEvent (how was this not here before??)

* fix autogen. Added costs and completions

* remove prints

* better completion grabber

* revert autogen line

* updated autogen completions

* black fixes

* Revert "updated autogen completions"

This reverts commit 8d542c0.

* ruff format

* revert notebook

* revert math notebook

* revert ollama notebook

* revert anthropic notebook

---------

Signed-off-by: Teo <[email protected]>
Co-authored-by: Pratyush Shukla <[email protected]>
Co-authored-by: reibs <[email protected]>
  • Loading branch information
3 people authored Dec 2, 2024
1 parent 0ce29b3 commit 76d3b7e
Show file tree
Hide file tree
Showing 15 changed files with 2,326 additions and 1,751 deletions.
27 changes: 17 additions & 10 deletions agentops/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,29 @@
Client: Provides methods to interact with the AgentOps service.
"""

import inspect
import atexit
import inspect
import logging
import os
import signal
import sys
import threading
import traceback
from decimal import Decimal
from functools import cached_property
from typing import List, Optional, Tuple, Union
from uuid import UUID, uuid4
from typing import Optional, List, Union, Tuple

from termcolor import colored

from .event import Event, ErrorEvent
from .singleton import (
conditional_singleton,
)
from .session import Session, active_sessions
from .config import Configuration
from .event import ErrorEvent, Event
from .host_env import get_host_env
from .llms import LlmTracker
from .log_config import logger
from .meta_client import MetaClient
from .config import Configuration
from .llms import LlmTracker
from .session import Session, active_sessions
from .singleton import conditional_singleton


@conditional_singleton
Expand All @@ -39,6 +39,7 @@ def __init__(self):
self._sessions: List[Session] = active_sessions
self._config = Configuration()
self._pre_init_queue = {"agents": []}
self._host_env = None # Cache host env data

self.configure(
api_key=os.environ.get("AGENTOPS_API_KEY"),
Expand Down Expand Up @@ -111,6 +112,7 @@ def initialize(self) -> Union[Session, None]:
def _initialize_autogen_logger(self) -> None:
try:
import autogen

from .partners.autogen_logger import AutogenLogger

autogen.runtime_logging.start(logger=AutogenLogger())
Expand Down Expand Up @@ -224,7 +226,7 @@ def start_session(
session = Session(
session_id=session_id,
tags=list(session_tags),
host_env=get_host_env(self._config.env_data_opt_out),
host_env=self.host_env,
config=self._config,
)

Expand Down Expand Up @@ -430,3 +432,8 @@ def api_key(self):
@property
def parent_key(self):
return self._config.parent_key

@cached_property
def host_env(self):
"""Return host environment data, computed once per client instance.

Calls ``get_host_env`` with the configured ``env_data_opt_out`` value on
first access. ``cached_property`` stores the result on the instance, so
later accesses (such as each ``start_session`` call) reuse it instead of
collecting the data again.

NOTE(review): because the value is cached, a later change to
``env_data_opt_out`` is not reflected here -- confirm that is intended.
"""
return get_host_env(self._config.env_data_opt_out)
1 change: 1 addition & 0 deletions agentops/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class LLMEvent(Event):
prompt_tokens: Optional[int] = None
completion: Union[str, object] = None
completion_tokens: Optional[int] = None
cost: Optional[float] = None
model: Optional[str] = None


Expand Down
108 changes: 75 additions & 33 deletions agentops/http_client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from enum import Enum
from typing import Optional
from requests.adapters import Retry, HTTPAdapter
from typing import Optional, Dict, Any

import requests
from requests.adapters import HTTPAdapter, Retry
import json

from .exceptions import ApiServerException

Expand Down Expand Up @@ -54,33 +56,79 @@ def get_status(code: int) -> HttpStatus:


class HttpClient:
@staticmethod
_session: Optional[requests.Session] = None

@classmethod
def get_session(cls) -> requests.Session:
"""Return the shared ``requests.Session``, creating it on first use.

The session is stored on the class attribute ``_session`` and returned
as-is on later calls. When created, it gets one pooled, retrying adapter
mounted for both ``http://`` and ``https://``, plus default keep-alive
and JSON content-type headers.

NOTE(review): indentation is not preserved in this view; the setup below
is presumably all inside the ``is None`` branch -- confirm against the file.
"""
if cls._session is None:
cls._session = requests.Session()

# Configure connection pooling
adapter = requests.adapters.HTTPAdapter(
pool_connections=15, # Number of connection pools
pool_maxsize=256, # Connections per pool
# Retry up to 3 times, with backoff, on the listed 5xx status codes
max_retries=Retry(total=3, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504]),
)

# Mount adapter for both HTTP and HTTPS
cls._session.mount("http://", adapter)
cls._session.mount("https://", adapter)

# Set default headers
cls._session.headers.update(
{
"Connection": "keep-alive",
"Keep-Alive": "timeout=10, max=1000",
"Content-Type": "application/json",
}
)

return cls._session

@classmethod
def _prepare_headers(
cls,
api_key: Optional[str] = None,
parent_key: Optional[str] = None,
jwt: Optional[str] = None,
custom_headers: Optional[dict] = None,
) -> dict:
"""Build the header dict for a single request.

Starts from a copy of ``JSON_HEADER`` so the shared dict is never
mutated, then adds one entry for each argument that is not ``None``.

Args:
    api_key: Sent as ``X-Agentops-Api-Key``.
    parent_key: Sent as ``X-Agentops-Parent-Key``.
    jwt: Sent as ``Authorization: Bearer <jwt>``.
    custom_headers: Extra headers merged in last, so they override any
        entry set above that uses the same key.

Returns:
    A new dict of headers for this request.
"""
# Copy so per-request keys do not leak into the shared JSON_HEADER
headers = JSON_HEADER.copy()

if api_key is not None:
headers["X-Agentops-Api-Key"] = api_key

if parent_key is not None:
headers["X-Agentops-Parent-Key"] = parent_key

if jwt is not None:
headers["Authorization"] = f"Bearer {jwt}"

# Applied last: caller-supplied headers take precedence
if custom_headers is not None:
headers.update(custom_headers)

return headers

@classmethod
def post(
cls,
url: str,
payload: bytes,
api_key: Optional[str] = None,
parent_key: Optional[str] = None,
jwt: Optional[str] = None,
header=None,
header: Optional[Dict[str, str]] = None,
) -> Response:
"""Make HTTP POST request using connection pooling"""
result = Response()
try:
# Create request session with retries configured
request_session = requests.Session()
request_session.mount(url, HTTPAdapter(max_retries=retry_config))

if api_key is not None:
JSON_HEADER["X-Agentops-Api-Key"] = api_key

if parent_key is not None:
JSON_HEADER["X-Agentops-Parent-Key"] = parent_key

if jwt is not None:
JSON_HEADER["Authorization"] = f"Bearer {jwt}"

res = request_session.post(url, data=payload, headers=JSON_HEADER, timeout=20)

headers = cls._prepare_headers(api_key, parent_key, jwt, header)
session = cls.get_session()
res = session.post(url, data=payload, headers=headers, timeout=20)
result.parse(res)

except requests.exceptions.Timeout:
result.code = 408
result.status = HttpStatus.TIMEOUT
Expand Down Expand Up @@ -112,28 +160,22 @@ def post(

return result

@staticmethod
@classmethod
def get(
cls,
url: str,
api_key: Optional[str] = None,
jwt: Optional[str] = None,
header=None,
header: Optional[Dict[str, str]] = None,
) -> Response:
"""Make HTTP GET request using connection pooling"""
result = Response()
try:
# Create request session with retries configured
request_session = requests.Session()
request_session.mount(url, HTTPAdapter(max_retries=retry_config))

if api_key is not None:
JSON_HEADER["X-Agentops-Api-Key"] = api_key

if jwt is not None:
JSON_HEADER["Authorization"] = f"Bearer {jwt}"

res = request_session.get(url, headers=JSON_HEADER, timeout=20)

headers = cls._prepare_headers(api_key, None, jwt, header)
session = cls.get_session()
res = session.get(url, headers=headers, timeout=20)
result.parse(res)

except requests.exceptions.Timeout:
result.code = 408
result.status = HttpStatus.TIMEOUT
Expand Down
6 changes: 3 additions & 3 deletions agentops/llms/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ async def async_generator():
The raw response has the following structure:
{
'id': str, # Message ID (e.g. 'msg_018Gk9N2pcWaYLS7mxXbPD5i')
'id': str, # Message ID (e.g. 'msg_018Gk9N2pcWaYLS7mxXbPD5i')
'type': str, # Type of response (e.g. 'message')
'role': str, # Role of responder (e.g. 'assistant')
'model': str, # Model used (e.g. 'claude-3-5-sonnet-20241022')
Expand All @@ -151,7 +151,7 @@ async def async_generator():
}
Note: We import Anthropic types here since the package must be installed
for raw responses to be available; doing so in the global scope would
for raw responses to be available; doing so in the global scope would
result in dependencies error since this provider is not lazily imported (tests fail)
"""
from anthropic import APIResponse
Expand All @@ -167,7 +167,7 @@ async def async_generator():
llm_event.model = response_data["model"]
llm_event.completion = {
"role": response_data.get("role"),
"content": response_data.get("content")[0].get("text") if response_data.get("content") else "",
"content": (response_data.get("content")[0].get("text") if response_data.get("content") else ""),
}
if usage := response_data.get("usage"):
llm_event.prompt_tokens = usage.get("input_tokens")
Expand Down
8 changes: 5 additions & 3 deletions agentops/partners/autogen_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
from openai.types.chat import ChatCompletion

from autogen.logger.base_logger import BaseLogger, LLMConfig
from autogen.logger.logger_utils import get_current_ts, to_dict

from agentops.enums import EndState
from agentops.helpers import get_ISO_time

from agentops import LLMEvent, ToolEvent, ActionEvent
from uuid import uuid4
Expand Down Expand Up @@ -55,17 +55,19 @@ def log_chat_completion(
start_time: str,
) -> None:
"""Records an LLMEvent to AgentOps session"""
end_time = get_current_ts()

completion = response.choices[len(response.choices) - 1]

# Note: Autogen tokens are not included in the request and function call tokens are not counted in the completion
llm_event = LLMEvent(
prompt=request["messages"],
completion=completion.message,
model=response.model,
cost=cost,
returns=completion.message.to_json(),
)
llm_event.init_timestamp = start_time
llm_event.end_timestamp = end_time
llm_event.end_timestamp = get_ISO_time()
llm_event.agent_id = self._get_agentops_id_from_agent(str(id(agent)))
agentops.record(llm_event)

Expand Down
Loading

0 comments on commit 76d3b7e

Please sign in to comment.