Skip to content

Commit

Permalink
introduce SessionsCollection
Browse files Browse the repository at this point in the history
  • Loading branch information
teocns committed Nov 12, 2024
1 parent ce45bc9 commit 908a278
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 79 deletions.
23 changes: 22 additions & 1 deletion agentops/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,26 @@ def __init__(self, **kwargs):
super().__init__(**kwargs)


class SessionsCollection(WeakSet):
"""
A custom collection for managing Session objects that combines WeakSet's automatic cleanup
with list-like indexing capabilities.
This class is needed because:
1. We want WeakSet's automatic cleanup of unreferenced sessions
2. We need to access sessions by index (e.g., self._sessions[0]) for backwards compatibility
3. Standard WeakSet doesn't support indexing
"""

def __getitem__(self, index: int) -> Session:
"""
Enable indexing into the collection (e.g., sessions[0]).
"""
# Convert to list for indexing since sets aren't ordered
items = list(self)
return items[index]


class SessionApi:
"""
Solely focuses on interacting with the API
Expand Down Expand Up @@ -814,7 +834,8 @@ def stop(self) -> None:
if self.is_alive():
self.join(timeout=0.5)

active_sessions = WeakSet()

active_sessions = SessionsCollection()

__all__ = ["Session"]

Expand Down
148 changes: 70 additions & 78 deletions tests/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,86 +376,78 @@ def test_add_tags(self, mock_req):
@pytest.fixture(autouse=True)
def cleanup_sessions():
yield
# Force cleanup of any remaining sessions
for session in active_sessions: # Create a copy of the list to iterate
try:
session.stop()
except:
pass
active_sessions.clear()
active_sessions.clear() # clear is sufficient; __del__ takes care of stopping gracefully

def test_event_publisher_thread_run():
try:
# Setup
session = Session(session_id=uuid4(), config=Configuration())
active_sessions.append(session) # Add to active sessions list

# Mock the API batch method to track calls
session.api.batch = MagicMock()

# Ensure session is running before starting tests
assert session.start()

# Create some test events
test_events = [
{"id": str(uuid4()), "event_type": "test", "data": f"event_{i}"}
for i in range(5)
]

# Test Case 1: Batch size trigger
session.config.max_queue_size = 3 # Small batch size for testing

# Use session's publisher thread instead of creating new ones
publisher = session.publisher_thread

for event in test_events[:3]:
session._enqueue(event)

# Wait briefly for processing
time.sleep(0.2)

# Verify batch was published due to size
session.api.batch.assert_called_once()
published_events = session.api.batch.call_args[0][0]
assert len(published_events) == 3
assert all(e["data"] in [f"event_{i}" for i in range(3)] for e in published_events)

# Reset for next test
session.api.batch.reset_mock()

# Test Case 2: Time trigger
session.config.max_wait_time = 100 # 100ms wait time
session._enqueue(test_events[3])

# Wait for time trigger
time.sleep(0.2)

# Verify batch was published due to time
session.api.batch.assert_called_once()
published_events = session.api.batch.call_args[0][0]
assert len(published_events) == 1
assert published_events[0]["data"] == "event_3"

# Reset for final test
session.api.batch.reset_mock()

# Test Case 3: Empty queue trigger
session._enqueue(test_events[4])

# Wait briefly for processing
time.sleep(0.2)

# Verify batch was published
session.api.batch.assert_called_once()
published_events = session.api.batch.call_args[0][0]
assert len(published_events) == 1
assert published_events[0]["data"] == "event_4"

finally:
# Cleanup
if session in active_sessions:
active_sessions.remove(session)
session.stop()
# Setup
session = Session(session_id=uuid4(), config=Configuration())
active_sessions.add(session) # Add to active sessions list

# Mock the API batch method to track calls
session.api.batch = MagicMock()

# Ensure session is running before starting tests
assert session.start()

# Create some test events
test_events = [
{"id": str(uuid4()), "event_type": "test", "data": f"event_{i}"}
for i in range(5)
]

# Test Case 1: Batch size trigger
session.config.max_queue_size = 3 # Small batch size for testing

# Use session's publisher thread instead of creating new ones
publisher = session.publisher_thread

for event in test_events[:3]:
session._enqueue(event)

# Wait briefly for processing
time.sleep(0.2)

# Verify batch was published due to size
session.api.batch.assert_called_once()
published_events = session.api.batch.call_args[0][0]
assert len(published_events) == 3
assert all(e["data"] in [f"event_{i}" for i in range(3)] for e in published_events)

# Reset for next test
session.api.batch.reset_mock()

# Test Case 2: Time trigger
session.config.max_wait_time = 100 # 100ms wait time
session._enqueue(test_events[3])

# Wait for time trigger
time.sleep(0.2)

# Verify batch was published due to time
session.api.batch.assert_called_once()
published_events = session.api.batch.call_args[0][0]
assert len(published_events) == 1
assert published_events[0]["data"] == "event_3"

# Reset for final test
session.api.batch.reset_mock()

# Test Case 3: Empty queue trigger
session._enqueue(test_events[4])

# Wait briefly for processing
time.sleep(0.2)

# Verify batch was published
session.api.batch.assert_called_once()
published_events = session.api.batch.call_args[0][0]
assert len(published_events) == 1
assert published_events[0]["data"] == "event_4"

# Cleanup
if session in active_sessions:
active_sessions.discard(session)
session.stop()

def test_event_publisher_thread_error_handling():
# Setup
Expand Down

0 comments on commit 908a278

Please sign in to comment.