Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring core session management #486

Closed
wants to merge 77 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
77 commits
Select commit Hold shift + click to select a range
745b78f
feat(session): integrate OpenTelemetry for event tracing
teocns Nov 21, 2024
0f16593
remove deprecated `header` params from HttpClient
teocns Nov 22, 2024
8e3696a
refactor(http_client): use `header`
teocns Nov 22, 2024
7b1f23c
Move SessionExporter to `session`
teocns Nov 22, 2024
1156d93
draft
teocns Nov 22, 2024
96c72f3
refactor(session): update session tracer provider management
teocns Nov 22, 2024
543dc05
rmv sess. tests
teocns Nov 22, 2024
cb52558
deactivate console exporter
teocns Nov 23, 2024
ea92035
tests: mock processor to use linear simple span processor (no batch)
teocns Nov 23, 2024
de888fe
revert setup fixture and extra tests
teocns Nov 23, 2024
9ee5bb5
refactor(session): restore otel_exporter for span processing
teocns Nov 23, 2024
70e10f4
test: Enhance assertions in session tests
teocns Nov 23, 2024
e888b71
refactor(session): reorder tracer initialization after session start
teocns Nov 23, 2024
78041ca
revert TestSingleSession.test_add_tags
teocns Nov 23, 2024
b713e51
revert TestSingleSession.test_session
teocns Nov 23, 2024
2db1c5c
fix: add missing api_key to endpoint calls
teocns Nov 23, 2024
bf7593d
revert: Session._start_session
teocns Nov 23, 2024
1e10dcf
match order of execution with original / ease of comparison
teocns Nov 23, 2024
19d96f3
remove queue attr
teocns Nov 23, 2024
9b54cdb
refactor(session): persisted span processor attr;
teocns Nov 23, 2024
31b6ced
flush_now flag
teocns Nov 23, 2024
c82de5c
small improves
teocns Nov 23, 2024
b794ad9
Improve general lifecycle management of OTEL
teocns Nov 23, 2024
b68c1dc
Removed class-level state from SessionExporterEach Session now gets i…
teocns Nov 23, 2024
d0b217a
14 errors
teocns Nov 23, 2024
ff776b3
13 errors
teocns Nov 23, 2024
65f4b1c
1 error
teocns Nov 23, 2024
a4e363b
0 error
teocns Nov 23, 2024
59fb0e2
Cleanup deps
teocns Nov 23, 2024
e834aef
refactored code for `get_analytics` method merged in `main`
the-praxs Nov 23, 2024
a71807b
tests for the `get_analytics` method
the-praxs Nov 23, 2024
7198696
Merge branch 'main' into otel-exporter
the-praxs Nov 23, 2024
10aa8db
linting
the-praxs Nov 23, 2024
f549fef
oops
the-praxs Nov 23, 2024
feaa3ab
add tests targeting SessionExporter
teocns Nov 25, 2024
2333dbe
SessionExporter: Added default value using current UTC time when end_…
teocns Nov 25, 2024
8880c12
Moved timestamp handling earlier in the process, before OpenTelemetry…
teocns Nov 25, 2024
62bf845
test: add test for exporting LLM event handling
teocns Nov 25, 2024
a828b90
test: add test for handling missing event ID in export
teocns Nov 25, 2024
2934e56
add session url
areibman Nov 25, 2024
ec26373
feat(HttpClient): add session management and header preparation
teocns Nov 26, 2024
07b9985
feat(HttpClient): Cache host env
teocns Nov 26, 2024
3877595
refactor(HttpClient): improve session management and headers
teocns Nov 26, 2024
259c045
feat(session): add static resource management in exporter
teocns Nov 26, 2024
894c846
feat(session): force flush pending spans on session end
teocns Nov 26, 2024
9589e73
replace core manual test
areibman Nov 26, 2024
8f712b3
remove log flag
areibman Nov 26, 2024
4f8f1eb
refactor(client): simplify host_env retrieval logic
teocns Nov 26, 2024
bd3f774
feat(session): add session management and API interaction classes
teocns Nov 26, 2024
95e9d8a
refactor(http_client): covnert _prepare_headers to classmethod
teocns Nov 26, 2024
07a47c0
Add SessionExporterMixIn to encapsulate OTEL behavior
teocns Nov 26, 2024
6dde179
Move SessionDict to Session
teocns Nov 26, 2024
5f9f397
feat(http_client): add retry_auth for handling JWT reauthorization
teocns Nov 26, 2024
6b2815d
Add SessionProtoocol for mixin to understnad how to use Session
teocns Nov 26, 2024
65ec3a6
refactor the Session class to leverage SessionApi.
teocns Nov 26, 2024
28a8c4a
Modify the Session class to use a threading.Event for is_running
teocns Nov 26, 2024
2067732
Imports
teocns Nov 26, 2024
c42240e
move JWT management to the HttpClient level.
teocns Nov 26, 2024
9aabaf3
feat(session): add methods to manage session collection
teocns Nov 26, 2024
ad24398
refactor(session): reorganize initialization process in Session
teocns Nov 26, 2024
7d9b10d
refactor(session exporter): simplify SessionExporterMixIn class
teocns Nov 26, 2024
072e66a
imports
teocns Nov 26, 2024
59781e2
refactor(config): Make Configuration class using dataclass
teocns Nov 26, 2024
f0ba54b
HttpClient: Dedicated JWT management & tests
teocns Nov 26, 2024
a87a4d6
session/api: add tests & remove unused retry_auth decorator
teocns Nov 26, 2024
57b8bfc
refactor(session exporter): mixin, use of SessionApi...
teocns Nov 26, 2024
6e43ac1
refactor(session): simplify session initialization and methods
teocns Nov 26, 2024
50dcd78
feat(session): add thread safety to SessionsCollection methods & impl…
teocns Nov 26, 2024
65e8e1e
feat(exporter): tests + possibly correct data handling,
teocns Nov 26, 2024
c667cbb
tests(session): use time patching to simulate time wait
teocns Nov 26, 2024
b9d46af
tests(session): migrate span processor tests to exporter tests
teocns Nov 26, 2024
a678705
feat(exporter): add GenericAdapter for span attribute conversion
teocns Nov 26, 2024
5f6eda5
test: add mock HTTP requests for all tests
teocns Nov 26, 2024
189a5e9
feat: atomic decoartor
teocns Nov 29, 2024
de54e42
save
teocns Nov 29, 2024
99a452a
save
teocns Nov 29, 2024
b7dd348
fmt
teocns Nov 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 32 additions & 12 deletions agentops/__init__.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
# agentops/__init__.py
import json
import sys
from typing import Optional, List, Union
import threading
from importlib.metadata import version as get_version
from typing import List, Optional, Union

from packaging import version

from .client import Client
from .event import Event, ActionEvent, LLMEvent, ToolEvent, ErrorEvent
from .decorators import record_action, track_agent, record_tool, record_function
from .decorators import record_action, record_function, record_tool, track_agent
from .event import ActionEvent, ErrorEvent, Event, LLMEvent, ToolEvent
from .helpers import check_agentops_update
from .log_config import logger
from .session import Session
import threading
from importlib.metadata import version as get_version
from packaging import version
from .session import Session, active_sessions

try:
from .partners.langchain_callback_handler import (
LangchainCallbackHandler,
AsyncLangchainCallbackHandler,
)
from .partners.langchain_callback_handler import AsyncLangchainCallbackHandler, LangchainCallbackHandler
except ModuleNotFoundError:
pass

Expand Down Expand Up @@ -150,7 +149,7 @@ def configure(
def start_session(
tags: Optional[List[str]] = None,
inherited_session_id: Optional[str] = None,
) -> Union[Session, None]:
) -> Session:
"""
Start a new session for recording events.

Expand Down Expand Up @@ -321,3 +320,24 @@ def get_session(session_id: str):
# prevents unexpected sessions on new tests
def end_all_sessions() -> None:
return Client().end_all_sessions()


def flush():
"""
Publish all pending events to API and return the flushed events

Returns:
bool: Whether the flush was successful
"""
# from agentops.session.exporter import get_tracer_provider
from opentelemetry import trace

from agentops.client import Client

# assert Client()
# Get the provider and force flush
provider = trace.get_tracer_provider()

print("@@@PROVIDER:" + str(id(provider)))
breakpoint()
return provider.force_flush() # !!
36 changes: 25 additions & 11 deletions agentops/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,36 @@
Client: Provides methods to interact with the AgentOps service.
"""

import inspect
import atexit
import inspect
import logging
import os
import signal
import sys
import threading
import traceback
from decimal import Decimal
from functools import cached_property
from typing import List, Optional, Tuple, Union
from uuid import UUID, uuid4
from typing import Optional, List, Union, Tuple

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from termcolor import colored

from .event import Event, ErrorEvent
from .singleton import (
conditional_singleton,
)
from .session import Session, active_sessions
from .config import Configuration
from .event import ErrorEvent, Event
from .host_env import get_host_env
from .llms import LlmTracker
from .log_config import logger
from .meta_client import MetaClient
from .config import Configuration
from .llms import LlmTracker
from .session import Session, active_sessions
from .singleton import conditional_singleton

# This can only be done once, a warning will be logged if any further attempt is made.
_provider = TracerProvider()
print("@@@PROVIDER:" + str(id(_provider)))
trace.set_tracer_provider(_provider)


@conditional_singleton
Expand All @@ -39,6 +46,7 @@ def __init__(self):
self._sessions: List[Session] = active_sessions
self._config = Configuration()
self._pre_init_queue = {"agents": []}
self._host_env = None # Cache host env data

self.configure(
api_key=os.environ.get("AGENTOPS_API_KEY"),
Expand Down Expand Up @@ -111,6 +119,7 @@ def initialize(self) -> Union[Session, None]:
def _initialize_autogen_logger(self) -> None:
try:
import autogen

from .partners.autogen_logger import AutogenLogger

autogen.runtime_logging.start(logger=AutogenLogger())
Expand Down Expand Up @@ -196,7 +205,7 @@ def start_session(
self,
tags: Optional[List[str]] = None,
inherited_session_id: Optional[str] = None,
) -> Union[Session, None]:
) -> Session:
"""
Start a new session for recording events.

Expand Down Expand Up @@ -224,7 +233,7 @@ def start_session(
session = Session(
session_id=session_id,
tags=list(session_tags),
host_env=get_host_env(self._config.env_data_opt_out),
host_env=self.host_env,
config=self._config,
)

Expand Down Expand Up @@ -430,3 +439,8 @@ def api_key(self):
@property
def parent_key(self):
return self._config.parent_key

@cached_property
def host_env(self):
"""Cache and reuse host environment data"""
return get_host_env(self._config.env_data_opt_out)
111 changes: 43 additions & 68 deletions agentops/config.py
Original file line number Diff line number Diff line change
@@ -1,74 +1,49 @@
from typing import List, Optional
from dataclasses import dataclass, field
from typing import Optional, Set
from uuid import UUID

from .log_config import logger

# TODO: Use annotations to clarify the purpose of each attribute.
# Details are defined in a docstrings found in __init__.py, but
# it's good to have those right on the fields at class definition

class Configuration:
def __init__(self):
self.api_key: Optional[str] = None
self.parent_key: Optional[str] = None
self.endpoint: str = "https://api.agentops.ai"
self.max_wait_time: int = 5000
self.max_queue_size: int = 512
self.default_tags: set[str] = set()
self.instrument_llm_calls: bool = True
self.auto_start_session: bool = True
self.skip_auto_end_session: bool = False
self.env_data_opt_out: bool = False

def configure(
self,
client,
api_key: Optional[str] = None,
parent_key: Optional[str] = None,
endpoint: Optional[str] = None,
max_wait_time: Optional[int] = None,
max_queue_size: Optional[int] = None,
default_tags: Optional[List[str]] = None,
instrument_llm_calls: Optional[bool] = None,
auto_start_session: Optional[bool] = None,
skip_auto_end_session: Optional[bool] = None,
env_data_opt_out: Optional[bool] = None,
):
if api_key is not None:
try:
UUID(api_key)
self.api_key = api_key
except ValueError:
message = f"API Key is invalid: {{{api_key}}}.\n\t Find your API key at https://app.agentops.ai/settings/projects"
client.add_pre_init_warning(message)
logger.error(message)

if parent_key is not None:
try:
UUID(parent_key)
self.parent_key = parent_key
except ValueError:
message = f"Parent Key is invalid: {parent_key}"
client.add_pre_init_warning(message)
logger.warning(message)

if endpoint is not None:
self.endpoint = endpoint

if max_wait_time is not None:
self.max_wait_time = max_wait_time

if max_queue_size is not None:
self.max_queue_size = max_queue_size

if default_tags is not None:
self.default_tags.update(default_tags)

if instrument_llm_calls is not None:
self.instrument_llm_calls = instrument_llm_calls

if auto_start_session is not None:
self.auto_start_session = auto_start_session

if skip_auto_end_session is not None:
self.skip_auto_end_session = skip_auto_end_session

if env_data_opt_out is not None:
self.env_data_opt_out = env_data_opt_out
@dataclass
class Configuration:
api_key: Optional[str] = None
parent_key: Optional[str] = None
endpoint: str = "https://api.agentops.ai"
max_wait_time: int = 5000
max_queue_size: int = 512
default_tags: Set[str] = field(default_factory=set)
instrument_llm_calls: bool = True
auto_start_session: bool = True
skip_auto_end_session: bool = False
env_data_opt_out: bool = False

def configure(self, client, **kwargs):
# Special handling for keys that need UUID validation
for key_name in ["api_key", "parent_key"]:
if key_name in kwargs and kwargs[key_name] is not None:
try:
UUID(kwargs[key_name])
setattr(self, key_name, kwargs[key_name])
except ValueError:
message = (
f"API Key is invalid: {{{kwargs[key_name]}}}.\n\t Find your API key at https://app.agentops.ai/settings/projects"
if key_name == "api_key"
else f"Parent Key is invalid: {kwargs[key_name]}"
)
client.add_pre_init_warning(message)
logger.error(message) if key_name == "api_key" else logger.warning(message)
kwargs.pop(key_name)

# Special handling for default_tags which needs update() instead of assignment
if "default_tags" in kwargs and kwargs["default_tags"] is not None:
self.default_tags.update(kwargs.pop("default_tags"))

# Handle all other attributes
for key, value in kwargs.items():
if value is not None and hasattr(self, key):
setattr(self, key, value)
97 changes: 97 additions & 0 deletions agentops/decorators.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import functools
import inspect
from types import MethodType
from typing import Optional, Union
from uuid import uuid4

Expand Down Expand Up @@ -361,3 +362,99 @@ def new_init(self, *args, **kwargs):
return obj

return decorator


# >>> multi-purpose atomic-ops decorator
import threading
import weakref
from functools import wraps
from typing import Dict, Union


class atomic:
"""A decorator class that provides thread-safe atomic operations.

This decorator ensures that decorated functions/methods execute atomically by using locks.
It supports both function and method decoration, with flexible locking strategies.

Key Features:
- Thread-safe execution of decorated functions/methods
- Supports both @atomic and @atomic() syntax
- Configurable locking scope via custom keys
- Optional reentrant locking
- Instance-specific locking for methods

Usage:
@atomic # Uses function name as lock key
def my_func():
pass

@atomic(key="custom_key") # Uses custom key for lock
def another_func():
pass

@atomic(key=lambda self: f"instance_{id(self)}") # Instance-specific locks
def method(self):
pass

Args:
key: Optional[Union[str, Callable]]. Lock key or key generator function.
If None, uses function's qualified name.
reentrant: bool. If True, allows reentrant locking. Default False.
"""

_locks = {} # Class-level registry of all locks
_registry_lock = threading.Lock() # Lock for thread-safe registry access

def __init__(self, func=None, *, key=None, reentrant=False):
# Support both @atomic and @atomic() syntax
self.func = func
self.key = key
self.reentrant = reentrant

if func is not None: # @atomic case
self._wrapped = self._wrap(func)

def __call__(self, *args, **kwargs):
if self.func is None: # @atomic() case
self.func = args[0]
self._wrapped = self._wrap(self.func)
return self._wrapped
return self._wrapped(*args, **kwargs)

def _wrap(self, func):
@wraps(func)
def wrapper(*args, **kwargs):
# Get instance for instance methods
instance = args[0] if args and not isinstance(args[0], type) else None

# Generate lock key
if callable(self.key) and instance:
# For instance-specific locks, use the key generator
lock_key = self.key(instance)
else:
# For shared locks or function locks, use the provided key or function qualname
lock_key = self.key or f"{func.__module__}.{func.__qualname__}"

# Get or create lock
with self._registry_lock:
if lock_key not in self._locks:
self._locks[lock_key] = threading.RLock() # THIS HAS TO BE A RLOCK NO MATTER WHAT
lock = self._locks[lock_key]

if not self.reentrant and lock._is_owned():
raise RuntimeError("Lock is not reentrant")

with lock:
return func(*args, **kwargs)

return wrapper

def __get__(self, obj, objtype=None):
"""Support descriptor protocol for instance methods"""
if obj is None:
return self
return MethodType(self._wrapped, obj)


# >>> @atomic
Loading
Loading