Skip to content

Commit

Permalink
Independent get_analytics method (#522)
Browse files Browse the repository at this point in the history
* Refactor Session class and add get_analytics function

- Refactored token cost and duration handling into reusable methods.
- Added get_analytics function to allow independent session statistics retrieval.
- Improved code readability and modularity.

* run black on session.py

* update with the latest commit

* merge `session.py` from `upstream/origin`

* Revert "merge `session.py` from `upstream/origin`"

This reverts commit 6c12755.

* add `_format_duration`, `get_response`, `_format_token_cost` and `get_analytics` methods

* add test for a single session

* linting

* Revert "linting"

This reverts commit 7d31cbb.

* add test for multiple sessions

* little cleanup

* refactored `Session` for a more intuitive approach with complete documentation

* refactored to reflect changes in `session.py`

* linting

* update docs and clean a little

* fix code group

* fix scripts path

* Revert "fix scripts path"

This reverts commit 0bc2b05.

* add output for the given code

---------

Co-authored-by: Dragutin Oreški <[email protected]>
  • Loading branch information
the-praxs and dragutin-oreski authored Nov 22, 2024
1 parent 517c93e commit 882f371
Show file tree
Hide file tree
Showing 3 changed files with 230 additions and 70 deletions.
179 changes: 112 additions & 67 deletions agentops/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import time
from decimal import ROUND_HALF_UP, Decimal
from termcolor import colored
from typing import Optional, List, Union
from typing import Any, Optional, List, Union
from uuid import UUID, uuid4
from datetime import datetime

Expand All @@ -15,7 +15,7 @@
from .log_config import logger
from .config import Configuration
from .helpers import get_ISO_time, filter_unjsonable, safe_serialize
from .http_client import HttpClient
from .http_client import HttpClient, Response


class Session:
Expand All @@ -24,14 +24,30 @@ class Session:
Args:
session_id (UUID): The session id is used to record particular runs.
config (Configuration): The configuration object for the session.
tags (List[str], optional): Tags that can be used for grouping or sorting later. Examples could be ["GPT-4"].
host_env (dict, optional): A dictionary containing host and environment data.
Attributes:
init_timestamp (float): The timestamp for when the session started, represented as seconds since the epoch.
end_timestamp (float, optional): The timestamp for when the session ended, represented as seconds since the epoch. This is only set after end_session is called.
end_state (str, optional): The final state of the session. Suggested: "Success", "Fail", "Indeterminate". Defaults to "Indeterminate".
init_timestamp (str): The ISO timestamp for when the session started.
end_timestamp (str, optional): The ISO timestamp for when the session ended. Only set after end_session is called.
end_state (str, optional): The final state of the session. Options: "Success", "Fail", "Indeterminate". Defaults to "Indeterminate".
end_state_reason (str, optional): The reason for ending the session.
session_id (UUID): Unique identifier for the session.
tags (List[str]): List of tags associated with the session for grouping and filtering.
video (str, optional): URL to a video recording of the session.
host_env (dict, optional): Dictionary containing host and environment data.
config (Configuration): Configuration object containing settings for the session.
jwt (str, optional): JSON Web Token for authentication with the AgentOps API.
token_cost (Decimal): Running total of token costs for the session.
event_counts (dict): Counter for different types of events:
- llms: Number of LLM calls
- tools: Number of tool calls
- actions: Number of actions
- errors: Number of errors
- apis: Number of API calls
session_url (str, optional): URL to view the session in the AgentOps dashboard.
is_running (bool): Flag indicating if the session is currently active.
"""

def __init__(
Expand All @@ -52,14 +68,16 @@ def __init__(
self.config = config
self.jwt = None
self.lock = threading.Lock()
self.queue = []
self.queue: List[Any] = []
self.token_cost = Decimal(0)
self.event_counts = {
"llms": 0,
"tools": 0,
"actions": 0,
"errors": 0,
"apis": 0,
}
self.session_url: Optional[str] = None

self.stop_flag = threading.Event()
self.thread = threading.Thread(target=self._run)
Expand Down Expand Up @@ -87,10 +105,11 @@ def end_session(
video: Optional[str] = None,
) -> Union[Decimal, None]:
if not self.is_running:
return
return None

if not any(end_state == state.value for state in EndState):
return logger.warning("Invalid end_state. Please use one of the EndState enums")
logger.warning("Invalid end_state. Please use one of the EndState enums")
return None

self.end_timestamp = get_ISO_time()
self.end_state = end_state
Expand All @@ -101,77 +120,28 @@ def end_session(
self.stop_flag.set()
self.thread.join(timeout=1)
self._flush_queue()

def format_duration(start_time, end_time):
start = datetime.fromisoformat(start_time.replace("Z", "+00:00"))
end = datetime.fromisoformat(end_time.replace("Z", "+00:00"))
duration = end - start

hours, remainder = divmod(duration.total_seconds(), 3600)
minutes, seconds = divmod(remainder, 60)

parts = []
if hours > 0:
parts.append(f"{int(hours)}h")
if minutes > 0:
parts.append(f"{int(minutes)}m")
parts.append(f"{seconds:.1f}s")

return " ".join(parts)

with self.lock:
payload = {"session": self.__dict__}
try:
res = HttpClient.post(
f"{self.config.endpoint}/v2/update_session",
json.dumps(filter_unjsonable(payload)).encode("utf-8"),
jwt=self.jwt,
)
except ApiServerException as e:
return logger.error(f"Could not end session - {e}")

logger.debug(res.body)
token_cost = res.body.get("token_cost", "unknown")

formatted_duration = format_duration(self.init_timestamp, self.end_timestamp)

if token_cost == "unknown" or token_cost is None:
token_cost_d = Decimal(0)
else:
token_cost_d = Decimal(token_cost)

formatted_cost = (
"{:.2f}".format(token_cost_d)
if token_cost_d == 0
else "{:.6f}".format(token_cost_d.quantize(Decimal("0.000001"), rounding=ROUND_HALF_UP))
)
analytics_stats = self.get_analytics()

analytics = (
f"Session Stats - "
f"{colored('Duration:', attrs=['bold'])} {formatted_duration} | "
f"{colored('Cost:', attrs=['bold'])} ${formatted_cost} | "
f"{colored('LLMs:', attrs=['bold'])} {self.event_counts['llms']} | "
f"{colored('Tools:', attrs=['bold'])} {self.event_counts['tools']} | "
f"{colored('Actions:', attrs=['bold'])} {self.event_counts['actions']} | "
f"{colored('Errors:', attrs=['bold'])} {self.event_counts['errors']}"
f"{colored('Duration:', attrs=['bold'])} {analytics_stats['Duration']} | "
f"{colored('Cost:', attrs=['bold'])} ${analytics_stats['Cost']} | "
f"{colored('LLMs:', attrs=['bold'])} {analytics_stats['LLM calls']} | "
f"{colored('Tools:', attrs=['bold'])} {analytics_stats['Tool calls']} | "
f"{colored('Actions:', attrs=['bold'])} {analytics_stats['Actions']} | "
f"{colored('Errors:', attrs=['bold'])} {analytics_stats['Errors']}"
)
logger.info(analytics)

session_url = res.body.get(
"session_url",
f"https://app.agentops.ai/drilldown?session_id={self.session_id}",
)

logger.info(
colored(
f"\x1b[34mSession Replay: {session_url}\x1b[0m",
f"\x1b[34mSession Replay: {self.session_url}\x1b[0m",
"blue",
)
)

active_sessions.remove(self)

return token_cost_d
return self.token_cost

def add_tags(self, tags: List[str]) -> None:
"""
Expand Down Expand Up @@ -388,5 +358,80 @@ def wrapper(*args, **kwargs):

return wrapper

@staticmethod
def _format_duration(start_time, end_time):
start = datetime.fromisoformat(start_time.replace("Z", "+00:00"))
end = datetime.fromisoformat(end_time.replace("Z", "+00:00"))
duration = end - start

hours, remainder = divmod(duration.total_seconds(), 3600)
minutes, seconds = divmod(remainder, 60)

parts = []
if hours > 0:
parts.append(f"{int(hours)}h")
if minutes > 0:
parts.append(f"{int(minutes)}m")
parts.append(f"{seconds:.1f}s")

return " ".join(parts)

def _get_response(self) -> Optional[Response]:
with self.lock:
payload = {"session": self.__dict__}
try:
response = HttpClient.post(
f"{self.config.endpoint}/v2/update_session",
json.dumps(filter_unjsonable(payload)).encode("utf-8"),
jwt=self.jwt,
)
except ApiServerException as e:
logger.error(f"Could not fetch response from server - {e}")
return None

logger.debug(response.body)
return response

def _get_token_cost(self, response: Response) -> Decimal:
token_cost = response.body.get("token_cost", "unknown")
if token_cost == "unknown" or token_cost is None:
return Decimal(0)
return Decimal(token_cost)

@staticmethod
def _format_token_cost(token_cost_d):
return (
"{:.2f}".format(token_cost_d)
if token_cost_d == 0
else "{:.6f}".format(token_cost_d.quantize(Decimal("0.000001"), rounding=ROUND_HALF_UP))
)

def get_analytics(self) -> Optional[dict[str, Union[Decimal, str]]]:
if not self.end_timestamp:
self.end_timestamp = get_ISO_time()

formatted_duration = self._format_duration(self.init_timestamp, self.end_timestamp)

response = self._get_response()
if response is None:
return None

self.token_cost = self._get_token_cost(response)
formatted_cost = self._format_token_cost(self.token_cost)

self.session_url = response.body.get(
"session_url",
f"https://app.agentops.ai/drilldown?session_id={self.session_id}",
)

return {
"LLM calls": self.event_counts["llms"],
"Tool calls": self.event_counts["tools"],
"Actions": self.event_counts["actions"],
"Errors": self.event_counts["errors"],
"Duration": formatted_duration,
"Cost": formatted_cost,
}


active_sessions: List[Session] = []
38 changes: 35 additions & 3 deletions docs/v1/concepts/sessions.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ Optionally, sessions may include:

_Note: Overrides any current tags_

#### `get_analytics`
**Returns** (dict): A dictionary containing various analytics metrics for the session.


## Starting a Session
When you call `agentops.init()`, a session is automatically started.
Expand All @@ -62,7 +65,7 @@ Both `agentops.init()` and `agentops.start_session()` work as a factory pattern

## Ending a Session
If a process ends without any call to agentops, it will show in the dashboard as `Indeterminate`.
To end with a state, call either `agentops.end_session(...)` [(reference)](/v1/usage/sdk-reference/#end-session) if only one session is in use. Otherwise use `session.end_session(...)`
To end with a state, call either `agentops.end_session(...)` [(reference)](/v1/usage/sdk-reference/#end-session) if only one session is in use. Otherwise use `session.end_session(...)`.

## Inherited Sessions
When working with multiple agents running in different processes, it's possible to initialize AgentOps or start a session
Expand All @@ -71,22 +74,51 @@ with an existing session_id.
`agentops.init(inherited_session_id=<id>)`
`agentops.start_session(inherited_session_id=<id>)`

You can retrieve the current `session_id` by assigning the returned value from `init()` or `start_session()`
You can retrieve the current `session_id` by assigning the returned value from `init()` or `start_session()`.

<CodeGroup>
```python python

```python
import agentops
session = agentops.init()
# pass session.session_id to the other process
```

```python
# -- other process --
session_id = retrieve_session_id() # <-- your function
agentops.init(inherited_session_id=<id>)
```

</CodeGroup>

Both processes will now contribute data to the same session.

## Session Analytics
You can retrieve the analytics for a session by calling `session.get_analytics()`.

The example below shows how to record events and retrieve analytics.

<CodeGroup>

```python
import agentops
session = agentops.init()
session.record(ActionEvent("llms"))
session.record(ActionEvent("tools"))
analytics = session.get_analytics()
print(analytics)
session.end_session("Success")
```

The output will look like this -

```bash
{'LLM calls': 0, 'Tool calls': 0, 'Actions': 0, 'Errors': 0, 'Duration': '0.9s', 'Cost': '0.00'}
```

</CodeGroup>

## The AgentOps SDK Client
_More info for the curious_

Expand Down
Loading

0 comments on commit 882f371

Please sign in to comment.