Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions openviking/storage/queuefs/semantic_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,7 @@ async def _process_memory_directory(self, msg: SemanticMsg) -> None:
try:
entries = await viking_fs.ls(dir_uri, ctx=ctx)
except Exception as e:
logger.warning(f"Failed to list memory directory {dir_uri}: {e}")
return
raise RuntimeError(f"Failed to list memory directory {dir_uri}: {e}") from e
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Bug] (blocking) Re-raising here fixes the silent early return, but it does not actually make this failure path report an error in production. on_dequeue() still routes non-permanent exceptions through the re-enqueue branch, and classify_api_error() only recognizes 401/403/5xx/timeout patterns. That means common filesystem failures here, such as FileNotFoundError, Permission denied, or local I/O errors, are classified as unknown, re-enqueued, and ultimately counted as success instead of report_error(). So this PR removes the stuck in_progress symptom, but it does not guarantee the intended issue behavior from the PR description, and it can turn invalid memory URIs into infinite retries. Please either classify these directory read/write failures as permanent at the source, or extend the error-classification path so local filesystem failures are reported as queue errors rather than retried forever.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in b8b504b. Added _PERMANENT_IO_ERRORS = (FileNotFoundError, PermissionError, IsADirectoryError, NotADirectoryError) with an isinstance check at the top of classify_api_error(), so filesystem errors are classified as "permanent" and hit report_error() instead of being re-enqueued. This prevents both the infinite retry loop and the false success counting.


file_paths: List[str] = []
for entry in entries:
Expand Down Expand Up @@ -496,8 +495,7 @@ async def _gen(idx: int, file_path: str) -> None:
await viking_fs.write_file(f"{dir_uri}/.abstract.md", abstract, ctx=ctx)
logger.info(f"Generated abstract.md and overview.md for {dir_uri}")
except Exception as e:
logger.error(f"Failed to write abstract/overview for {dir_uri}: {e}")
return
raise RuntimeError(f"Failed to write abstract/overview for {dir_uri}: {e}") from e

await self._vectorize_directory(
uri=dir_uri,
Expand Down
130 changes: 130 additions & 0 deletions tests/storage/test_memory_semantic_stall.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: Apache-2.0

"""Tests for memory semantic queue stall fix (issue #864).

Ensures that _process_memory_directory() error paths propagate exceptions
so that on_dequeue() always calls report_success() or report_error().
"""

from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from openviking.storage.queuefs.semantic_msg import SemanticMsg
from openviking.storage.queuefs.semantic_processor import SemanticProcessor


def _make_msg(uri="viking://user/memories", context_type="memory", **kwargs):
"""Build a minimal SemanticMsg for testing."""
defaults = {
"id": "test-msg-1",
"uri": uri,
"context_type": context_type,
"recursive": False,
"role": "root",
"account_id": "acc1",
"user_id": "usr1",
"agent_id": "test-agent",
"telemetry_id": "",
"target_uri": "",
"lifecycle_lock_handle_id": "",
"changes": None,
"is_code_repo": False,
}
defaults.update(kwargs)
return SemanticMsg.from_dict(defaults)


def _build_data(msg: SemanticMsg) -> dict:
"""Wrap a SemanticMsg into the dict format on_dequeue expects."""
return msg.to_dict()


@pytest.mark.asyncio
async def test_memory_empty_dir_still_reports_success():
"""When viking_fs.ls returns an empty list, report_success() must be called."""
processor = SemanticProcessor()

fake_fs = MagicMock()
fake_fs.ls = AsyncMock(return_value=[])

msg = _make_msg()
data = _build_data(msg)

success_called = False

def on_success():
nonlocal success_called
success_called = True

error_called = False

def on_error(error_msg, error_data=None):
nonlocal error_called
error_called = True

processor.set_callbacks(on_success, on_error)

with (
patch(
"openviking.storage.queuefs.semantic_processor.get_viking_fs",
return_value=fake_fs,
),
patch(
"openviking.storage.queuefs.semantic_processor.resolve_telemetry",
return_value=None,
),
):
await processor.on_dequeue(data)

assert success_called, "report_success() was not called for empty memory directory"
assert not error_called, "report_error() should not be called for empty directory"


@pytest.mark.asyncio
async def test_memory_ls_error_reports_error():
"""When viking_fs.ls raises, report_error() must be called (not stuck)."""
processor = SemanticProcessor()

fake_fs = MagicMock()
fake_fs.ls = AsyncMock(side_effect=OSError("disk read failed"))

msg = _make_msg()
data = _build_data(msg)

success_called = False

def on_success():
nonlocal success_called
success_called = True

error_called = False
error_info = {}

def on_error(error_msg, error_data=None):
nonlocal error_called, error_info
error_called = True
error_info["msg"] = error_msg

processor.set_callbacks(on_success, on_error)

with (
patch(
"openviking.storage.queuefs.semantic_processor.get_viking_fs",
return_value=fake_fs,
),
patch(
"openviking.storage.queuefs.semantic_processor.resolve_telemetry",
return_value=None,
),
patch(
"openviking.storage.queuefs.semantic_processor.classify_api_error",
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Suggestion] (non-blocking) This test currently proves the desired behavior only because classify_api_error() is mocked to return "permanent". In the real code path, OSError("disk read failed") is classified as unknown, so on_dequeue() re-enqueues it instead of calling report_error(). Please add at least one test that exercises the real classifier behavior, and ideally a second one for the new write_file() failure path as well, so the tests match production semantics.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in b8b504b. Removed the classify_api_error mock — tests now use real FileNotFoundError and PermissionError which the updated classifier handles as permanent. Also added a write-failure test path and 2 new tests in test_circuit_breaker.py verifying all 4 filesystem error types + chained cause detection.

return_value="permanent",
),
):
await processor.on_dequeue(data)

assert error_called, "report_error() was not called when ls() raised an exception"
assert not success_called, "report_success() should not be called on ls() error"
assert "disk read failed" in error_info["msg"]
Loading