Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions api/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -1545,6 +1545,43 @@ def _sse(handler, event, data):
handler.wfile.flush()


def _materialize_pending_user_turn_before_error(session) -> bool:
"""Persist the pending user prompt before clearing runtime stream state.

Error paths often clear ``pending_user_message`` before appending an assistant
error marker. In deferred session-save mode that pending field can be the
only durable copy of the user's current turn, so clearing it makes the user
bubble disappear on reload/reconcile. Return True when a recovered user turn
was appended.
"""
pending_text = str(getattr(session, 'pending_user_message', None) or '')
if not pending_text:
return False
normalized_pending = " ".join(pending_text.split())
if normalized_pending:
for existing in reversed(list(getattr(session, 'messages', None) or [])[-8:]):
if not isinstance(existing, dict) or existing.get('role') != 'user':
continue
existing_text = " ".join(str(existing.get('content') or '').split())
if existing_text == normalized_pending:
return False
recovered_ts = int(time.time())
pending_started_at = getattr(session, 'pending_started_at', None)
if isinstance(pending_started_at, (int, float)) and pending_started_at > 0:
recovered_ts = int(pending_started_at)
recovered = {
'role': 'user',
'content': pending_text,
'timestamp': recovered_ts,
'_recovered': True,
}
pending_attachments = getattr(session, 'pending_attachments', None)
if pending_attachments:
recovered['attachments'] = list(pending_attachments)
session.messages.append(recovered)
return True


def _last_resort_sync_from_core(session, stream_id, agent_lock):
"""Final-exit guard: if the stream exits with pending_user_message still set,
sync messages from the core transcript or add an error marker.
Expand Down Expand Up @@ -2568,6 +2605,7 @@ def _periodic_checkpoint():
# Persist the error so it survives page reload.
# _error=True ensures _sanitize_messages_for_api excludes it from
# subsequent API calls so the LLM never sees its own error as prior context.
_materialize_pending_user_turn_before_error(s)
s.active_stream_id = None
s.pending_user_message = None
s.pending_attachments = []
Expand Down Expand Up @@ -3029,6 +3067,7 @@ def _periodic_checkpoint():
# API calls so the LLM never sees its own error as prior context on the next turn.
_lock_ctx = _agent_lock if _agent_lock is not None else contextlib.nullcontext()
with _lock_ctx:
_materialize_pending_user_turn_before_error(s)
s.active_stream_id = None
s.pending_user_message = None
s.pending_attachments = []
Expand Down
49 changes: 49 additions & 0 deletions tests/test_issue1361_cancel_data_loss.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,3 +315,52 @@ def test_no_reasoning_no_tools_no_partial(self):
f"Expected no partial msg when nothing was streamed. Got partials: {partial_msgs}"
assert len(cancel_msgs) == 1, \
f"Expected exactly 1 cancel marker. Got: {cancel_msgs}"

# ── §D: Error paths must not lose pending user turn ─────────────────────────

def test_stream_error_materializes_pending_user_turn_before_clearing_runtime_state():
"""If a stream errors before normal merge, pending_user_message must become a
durable user message before the error marker is saved. Otherwise reload/server
reconcile makes the user's just-submitted prompt disappear.
"""
from api.streaming import _materialize_pending_user_turn_before_error

sid = "test_pending_error_d1"
s = _make_session(
session_id=sid,
pending_msg="please restart the WebUI",
messages=[{"role": "assistant", "content": "previous answer"}],
)
s.pending_started_at = 1778098700.0
s.pending_attachments = [{"name": "screenshot.png"}]

appended = _materialize_pending_user_turn_before_error(s)

assert appended is True
assert s.messages[-1]["role"] == "user"
assert s.messages[-1]["content"] == "please restart the WebUI"
assert s.messages[-1]["timestamp"] == 1778098700
assert s.messages[-1]["attachments"] == [{"name": "screenshot.png"}]
assert s.pending_user_message == "please restart the WebUI"


def test_stream_error_pending_materialization_does_not_duplicate_eager_checkpoint():
"""Eager session-save mode may already have checkpointed the current user turn;
the error materializer must not append the same user message again.
"""
from api.streaming import _materialize_pending_user_turn_before_error

sid = "test_pending_error_d2"
s = _make_session(
session_id=sid,
pending_msg="please restart the WebUI",
messages=[
{"role": "assistant", "content": "previous answer"},
{"role": "user", "content": "please restart the WebUI"},
],
)

appended = _materialize_pending_user_turn_before_error(s)

assert appended is False
assert [m.get("role") for m in s.messages].count("user") == 1
Loading