Skip to content

Commit 08dc4ae

Browse files
vamganVamil Gandhi
andauthored
feat: implement concurrent message reading for session managers (#897)
Replace sequential message loading with async concurrent reading in both S3SessionManager and FileSessionManager to improve performance for long conversations. Uses asyncio.gather() with run_in_executor() to read multiple messages simultaneously while maintaining proper ordering. Resolves: #874 Co-authored-by: Vamil Gandhi <[email protected]>
1 parent 428750b commit 08dc4ae

File tree

2 files changed

+33
-13
lines changed

2 files changed

+33
-13
lines changed

src/strands/session/file_session_manager.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""File-based session manager for local filesystem storage."""
22

3+
import asyncio
34
import json
45
import logging
56
import os
@@ -231,11 +232,20 @@ def list_messages(
231232
else:
232233
message_files = message_files[offset:]
233234

234-
# Load only the message files
235-
messages: list[SessionMessage] = []
236-
for filename in message_files:
235+
return asyncio.run(self._load_messages_concurrently(messages_dir, message_files))
236+
237+
async def _load_messages_concurrently(self, messages_dir: str, message_files: list[str]) -> list[SessionMessage]:
238+
"""Load multiple message files concurrently using async."""
239+
if not message_files:
240+
return []
241+
242+
async def load_message(filename: str) -> SessionMessage:
237243
file_path = os.path.join(messages_dir, filename)
238-
message_data = self._read_file(file_path)
239-
messages.append(SessionMessage.from_dict(message_data))
244+
loop = asyncio.get_event_loop()
245+
message_data = await loop.run_in_executor(None, self._read_file, file_path)
246+
return SessionMessage.from_dict(message_data)
247+
248+
tasks = [load_message(filename) for filename in message_files]
249+
messages = await asyncio.gather(*tasks)
240250

241251
return messages

src/strands/session/s3_session_manager.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""S3-based session manager for cloud storage."""
22

3+
import asyncio
34
import json
45
import logging
56
from typing import Any, Dict, List, Optional, cast
@@ -283,14 +284,23 @@ def list_messages(
283284
else:
284285
message_keys = message_keys[offset:]
285286

286-
# Load only the required message objects
287-
messages: List[SessionMessage] = []
288-
for key in message_keys:
289-
message_data = self._read_s3_object(key)
290-
if message_data:
291-
messages.append(SessionMessage.from_dict(message_data))
292-
293-
return messages
287+
# Load message objects concurrently using async
288+
return asyncio.run(self._load_messages_concurrently(message_keys))
294289

295290
except ClientError as e:
296291
raise SessionException(f"S3 error reading messages: {e}") from e
292+
293+
async def _load_messages_concurrently(self, message_keys: List[str]) -> List[SessionMessage]:
294+
"""Load multiple message objects concurrently using async."""
295+
if not message_keys:
296+
return []
297+
298+
async def load_message(key: str) -> Optional[SessionMessage]:
299+
loop = asyncio.get_event_loop()
300+
message_data = await loop.run_in_executor(None, self._read_s3_object, key)
301+
return SessionMessage.from_dict(message_data) if message_data else None
302+
303+
tasks = [load_message(key) for key in message_keys]
304+
loaded_messages = await asyncio.gather(*tasks)
305+
306+
return [msg for msg in loaded_messages if msg is not None]

0 commit comments

Comments
 (0)