Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 65 additions & 42 deletions backend/app/channels/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,38 +293,56 @@ def _accumulate_stream_text(
return buffers[message_id], message_id


def _extract_artifacts(result: dict | list) -> list[str]:
"""Extract artifact paths from the last AI response cycle only.

Instead of reading the full accumulated ``artifacts`` state (which contains
all artifacts ever produced in the thread), this inspects the messages after
the last human message and collects file paths from ``present_files`` tool
calls. This ensures only newly-produced artifacts are returned.
def _extract_artifacts(result: dict | list, thread_id: str | None = None,
channel_name: str = "",
chat_id: str = "",
topic_id: str | None = None,
store: object | None = None) -> list[str]:
"""Extract artifact paths from the accumulated artifacts state.

Reads the already-normalized virtual paths from the ``artifacts`` state
key. ``present_file_tool`` normalises every path to
``/mnt/user-data/outputs/<filename>`` before writing it to state, so
reading state guarantees that every returned path passes the
``_OUTPUTS_VIRTUAL_PREFIX`` check in ``_resolve_attachments``.

When *channel_name*, *chat_id* and *store* are provided, only returns
artifacts that have not been sent in this IM conversation (using the
ChannelStore to persist sent-artifact records that survive thread
recreation across restarts).
"""
if isinstance(result, list):
messages = result
elif isinstance(result, dict):
messages = result.get("messages", [])
raw: list[str] = []
if isinstance(result, dict):
artifacts_state = result.get("artifacts", [])
if isinstance(artifacts_state, list):
raw = [p for p in artifacts_state if isinstance(p, str)]
if channel_name and chat_id and raw and store is not None:
return _diff_artifacts(channel_name, chat_id, raw, store, topic_id=topic_id)
return raw


def _diff_artifacts(channel_name: str, chat_id: str,
                    raw_artifacts: list[str],
                    store: object,
                    topic_id: str | None = None) -> list[str]:
    """Return only artifacts not yet sent in this IM conversation.

    The "already sent" record is read from and written back to *store*, keyed
    by channel:chat_id[:topic_id] so the record survives thread recreation
    across restarts.

    Args:
        channel_name: IM channel name, part of the store key.
        chat_id: IM chat identifier, part of the store key.
        raw_artifacts: All artifact paths currently present in state.
        store: Object exposing ``get_sent_artifacts`` and
            ``set_sent_artifacts``.
        topic_id: Optional topic within the chat, part of the store key.

    Returns:
        The paths from *raw_artifacts* that were not recorded as sent, in
        first-seen order and without duplicates. The store is updated to
        include them before returning.
    """
    # Copy into a fresh set: tolerates ``None`` (no record yet) and avoids
    # mutating whatever object the store handed out.
    sent = set(store.get_sent_artifacts(channel_name, chat_id, topic_id=topic_id) or ())

    # dict.fromkeys de-duplicates while preserving order, so a path that is
    # listed twice in state is not delivered twice.
    new_only = [path for path in dict.fromkeys(raw_artifacts) if path not in sent]

    logger.info(
        "[_diff_artifacts] channel=%s chat=%s raw=%d sent=%d new=%d paths=%s",
        channel_name, chat_id, len(raw_artifacts), len(sent), len(new_only),
        [p.rsplit("/", 1)[-1] for p in raw_artifacts],
    )

    if new_only:
        sent.update(new_only)
        store.set_sent_artifacts(channel_name, chat_id, sent, topic_id=topic_id)
        logger.info("[_diff_artifacts] updated sent_artifacts for channel=%s chat=%s count=%d",
                    channel_name, chat_id, len(sent))
    else:
        logger.info("[_diff_artifacts] all %d artifacts already sent, skipping", len(raw_artifacts))
    return new_only

def _format_artifact_text(artifacts: list[str]) -> str:
"""Format artifact paths into a human-readable text block listing filenames."""
Expand Down Expand Up @@ -546,12 +564,13 @@ def __init__(
) -> None:
self.bus = bus
self.store = store
self._max_concurrency = max_concurrency
self._langgraph_url = langgraph_url
self._gateway_url = gateway_url
self._assistant_id = assistant_id
self._default_session = _as_dict(default_session)
self._channel_sessions = dict(channel_sessions or {})
self._running = False
self._task: asyncio.Task | None = None


@staticmethod
def _channel_supports_streaming(channel_name: str) -> bool:
from .service import get_channel_service
self._client = None # lazy init — langgraph_sdk async client
self._csrf_token = generate_csrf_token()
self._semaphore: asyncio.Semaphore | None = None
Expand Down Expand Up @@ -759,13 +778,17 @@ async def _handle_chat(self, msg: InboundMessage, extra_context: dict[str, Any]

uploaded = await _ingest_inbound_files(thread_id, msg)
if uploaded:
msg.text = f"{_format_uploaded_files_block(uploaded)}\n\n{msg.text}".strip()
)

if self._channel_supports_streaming(msg.channel_name):
await self._handle_streaming_chat(
client,
msg,
thread_id,
response_text = _extract_response_text(result)
artifacts = _extract_artifacts(result, thread_id=thread_id,
channel_name=msg.channel_name,
chat_id=msg.chat_id,
topic_id=msg.topic_id,
store=self.store)

logger.info(
"[Manager] agent response received: thread_id=%s, response_len=%d, artifacts=%d",
assistant_id,
run_config,
run_context,
Expand Down
28 changes: 28 additions & 0 deletions backend/app/channels/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,34 @@ def remove(self, channel_name: str, chat_id: str, topic_id: str | None = None) -
self._save()
return True

# -- sent artifacts persistence (for _diff_artifacts) ---------------

def get_sent_artifacts(self, channel_name: str, chat_id: str, topic_id: str | None = None) -> set[str] | None:
"""Return the set of artifact paths already sent for a given conversation..

Keyed by ``channel_name:chat_id[:topic_id]`` so the record survives
thread recreation (e.g. after Gateway / DeerFlow restart).

Returns None if no records exist for this conversation (first run).
"""
key = f"sent_artifacts:{self._key(channel_name, chat_id, topic_id)}"
entry = self._data.get(key)
if entry and isinstance(entry, dict):
raw = entry.get("paths")
if isinstance(raw, list):
return set(raw)
return None

def set_sent_artifacts(self, channel_name: str, chat_id: str, paths: set[str], topic_id: str | None = None) -> None:
"""Persist the set of sent artifact paths for a given conversation."""
with self._lock:
key = f"sent_artifacts:{self._key(channel_name, chat_id, topic_id)}"
self._data[key] = {
"paths": sorted(paths),
"updated_at": __import__("time").time(),
}
self._save()

def list_entries(self, channel_name: str | None = None) -> list[dict[str, Any]]:
"""List all stored mappings, optionally filtered by channel."""
results = []
Expand Down