diff --git a/crates/ov_cli/src/client.rs b/crates/ov_cli/src/client.rs index deae228f8..11df78c2a 100644 --- a/crates/ov_cli/src/client.rs +++ b/crates/ov_cli/src/client.rs @@ -4,7 +4,6 @@ use serde_json::Value; use std::fs::File; use std::path::Path; use tempfile::{Builder, NamedTempFile}; -use url::Url; use zip::CompressionMethod; use zip::write::FileOptions; @@ -314,6 +313,34 @@ impl HttpClient { self.get("/api/v1/content/overview", &params).await } + pub async fn write( + &self, + uri: &str, + content: &str, + mode: &str, + wait: bool, + timeout: Option<f64>, + ) -> Result<Value> { + let body = Self::build_write_body(uri, content, mode, wait, timeout); + self.post("/api/v1/content/write", &body).await + } + + fn build_write_body( + uri: &str, + content: &str, + mode: &str, + wait: bool, + timeout: Option<f64>, + ) -> Value { + serde_json::json!({ + "uri": uri, + "content": content, + "mode": mode, + "wait": wait, + "timeout": timeout, + }) + } + pub async fn reindex( &self, uri: &str, @@ -858,6 +885,7 @@ impl HttpClient { #[cfg(test)] mod tests { use super::HttpClient; + use serde_json::json; #[test] fn build_headers_includes_tenant_identity_headers() { @@ -897,4 +925,28 @@ mod tests { Some("alice") ); } + + #[test] + fn build_write_body_omits_removed_semantic_flags() { + let body = HttpClient::build_write_body( + "viking://resources/demo.md", + "updated", + "replace", + true, + Some(3.0), + ); + + assert_eq!( + body, + json!({ + "uri": "viking://resources/demo.md", + "content": "updated", + "mode": "replace", + "wait": true, + "timeout": 3.0, + }) + ); + assert!(body.get("regenerate_semantics").is_none()); + assert!(body.get("revectorize").is_none()); + } } diff --git a/crates/ov_cli/src/commands/content.rs b/crates/ov_cli/src/commands/content.rs index 4803bb180..e032e9802 100644 --- a/crates/ov_cli/src/commands/content.rs +++ b/crates/ov_cli/src/commands/content.rs @@ -38,6 +38,29 @@ pub async fn overview( Ok(()) } +pub async fn write( + client: &HttpClient, + uri: &str, + 
content: &str, + append: bool, + wait: bool, + timeout: Option<f64>, + output_format: OutputFormat, + compact: bool, +) -> Result<()> { + let result = client + .write( + uri, + content, + if append { "append" } else { "replace" }, + wait, + timeout, + ) + .await?; + crate::output::output_success(result, output_format, compact); + Ok(()) +} + pub async fn reindex( client: &HttpClient, uri: &str, diff --git a/crates/ov_cli/src/main.rs b/crates/ov_cli/src/main.rs index 3b83c2487..4963da5ec 100644 --- a/crates/ov_cli/src/main.rs +++ b/crates/ov_cli/src/main.rs @@ -321,6 +321,26 @@ enum Commands { /// Viking URI uri: String, }, + /// Write text content to an existing file + Write { + /// Viking URI + uri: String, + /// Content to write + #[arg(long, conflicts_with = "from_file")] + content: Option<String>, + /// Read content from a local file + #[arg(long = "from-file", conflicts_with = "content")] + from_file: Option<String>, + /// Append instead of replacing the file + #[arg(long)] + append: bool, + /// Wait for async processing to finish + #[arg(long, default_value = "false")] + wait: bool, + /// Optional wait timeout in seconds + #[arg(long)] + timeout: Option<f64>, + }, /// Reindex content at URI (regenerates .abstract.md and .overview.md) Reindex { /// Viking URI uri: String, @@ -751,6 +771,15 @@ async fn main() { Commands::Read { uri } => handle_read(uri, ctx).await, Commands::Abstract { uri } => handle_abstract(uri, ctx).await, Commands::Overview { uri } => handle_overview(uri, ctx).await, + Commands::Write { + uri, + content, + from_file, + append, + wait, + timeout, + } => handle_write(uri, content, from_file, append, wait, timeout, ctx) + .await, Commands::Reindex { uri, regenerate, @@ -1186,6 +1215,35 @@ async fn handle_overview(uri: String, ctx: CliContext) -> Result<()> { commands::content::overview(&client, &uri, ctx.output_format, ctx.compact).await } +async fn handle_write( + uri: String, + content: Option<String>, + from_file: Option<String>, + append: bool, + wait: bool, + timeout: Option<f64>, + ctx: 
CliContext, +) -> Result<()> { + let client = ctx.get_client(); + let payload = match (content, from_file) { + (Some(value), None) => value, + (None, Some(path)) => std::fs::read_to_string(path) + .map_err(|e| Error::Client(format!("Failed to read --from-file: {}", e)))?, + _ => return Err(Error::Client("Specify exactly one of --content or --from-file".into())), + }; + commands::content::write( + &client, + &uri, + &payload, + append, + wait, + timeout, + ctx.output_format, + ctx.compact, + ) + .await +} + async fn handle_reindex(uri: String, regenerate: bool, wait: bool, ctx: CliContext) -> Result<()> { let client = ctx.get_client(); commands::content::reindex( @@ -1476,4 +1534,19 @@ mod tests { assert_eq!(ctx.config.user.as_deref(), Some("from-cli-user")); assert_eq!(ctx.config.agent_id.as_deref(), Some("from-cli-agent")); } + + #[test] + fn cli_write_rejects_removed_semantic_flags() { + let result = Cli::try_parse_from([ + "ov", + "write", + "viking://resources/demo.md", + "--content", + "updated", + "--no-semantics", + "--no-vectorize", + ]); + + assert!(result.is_err(), "removed write flags should not parse"); + } } diff --git a/docs/en/api/01-overview.md b/docs/en/api/01-overview.md index fe65f4aaa..8085e4cfc 100644 --- a/docs/en/api/01-overview.md +++ b/docs/en/api/01-overview.md @@ -310,6 +310,7 @@ Compact JSON with status wrapper (when `--compact` is true, which is the default | GET | `/api/v1/content/read` | Read full content (L2) | | GET | `/api/v1/content/abstract` | Read abstract (L0) | | GET | `/api/v1/content/overview` | Read overview (L1) | +| POST | `/api/v1/content/write` | Update an existing file and refresh semantics/vectors | ### Search diff --git a/docs/en/api/03-filesystem.md b/docs/en/api/03-filesystem.md index e1a762e2a..75f28ba5b 100644 --- a/docs/en/api/03-filesystem.md +++ b/docs/en/api/03-filesystem.md @@ -143,6 +143,95 @@ openviking read viking://resources/docs/api.md --- +### write() + +Update an existing file and automatically 
refresh related semantics and vectors. + +**Parameters** + +| Parameter | Type | Required | Default | Description | +|-----------|------|----------|---------|-------------| +| uri | str | Yes | - | Existing file URI | +| content | str | Yes | - | New content to write | +| mode | str | No | `replace` | `replace` or `append` | +| wait | bool | No | `false` | Wait for background semantic/vector refresh | +| timeout | float | No | `null` | Timeout in seconds when `wait=true` | + +**Notes** + +- Only existing files are supported; directories are rejected. +- Derived semantic files cannot be written directly: `.abstract.md`, `.overview.md`, `.relations.json`. +- The public API no longer accepts `regenerate_semantics` or `revectorize`; write always refreshes related semantics and vectors. + +**Python SDK (Embedded / HTTP)** + +```python +result = client.write( + "viking://resources/docs/api.md", + "# Updated API\n\nFresh content.", + mode="replace", + wait=True, +) +print(result["root_uri"]) +``` + +**HTTP API** + +``` +POST /api/v1/content/write +``` + +```bash +curl -X POST "http://localhost:1933/api/v1/content/write" \ + -H "Content-Type: application/json" \ + -H "X-API-Key: your-key" \ + -d '{ + "uri": "viking://resources/docs/api.md", + "content": "# Updated API\n\nFresh content.", + "mode": "replace", + "wait": true + }' +``` + +**CLI** + +```bash +openviking write viking://resources/docs/api.md \ + --content "# Updated API\n\nFresh content." \ + --wait +``` + +**Response** + +```json +{ + "status": "ok", + "result": { + "uri": "viking://resources/docs/api.md", + "root_uri": "viking://resources/docs", + "context_type": "resource", + "mode": "replace", + "written_bytes": 29, + "semantic_updated": true, + "vector_updated": true, + "queue_status": { + "Semantic": { + "processed": 1, + "error_count": 0, + "errors": [] + }, + "Embedding": { + "processed": 2, + "error_count": 0, + "errors": [] + } + } + } +} +``` + +--- + ### ls() List directory contents. 
diff --git a/docs/zh/api/01-overview.md b/docs/zh/api/01-overview.md index e85dcb816..436572193 100644 --- a/docs/zh/api/01-overview.md +++ b/docs/zh/api/01-overview.md @@ -311,6 +311,7 @@ openviking -o json ls viking://resources/ | GET | `/api/v1/content/read` | 读取完整内容(L2) | | GET | `/api/v1/content/abstract` | 读取摘要(L0) | | GET | `/api/v1/content/overview` | 读取概览(L1) | +| POST | `/api/v1/content/write` | 修改已有文件并自动刷新语义与向量 | ### 搜索 diff --git a/docs/zh/api/03-filesystem.md b/docs/zh/api/03-filesystem.md index 75c94f9a4..f16af6cd1 100644 --- a/docs/zh/api/03-filesystem.md +++ b/docs/zh/api/03-filesystem.md @@ -143,6 +143,95 @@ openviking read viking://resources/docs/api.md --- +### write() + +修改一个已存在的文件,并自动刷新相关语义与向量。 + +**参数** + +| 参数 | 类型 | 必填 | 默认值 | 说明 | +|------|------|------|--------|------| +| uri | str | 是 | - | 已存在文件的 URI | +| content | str | 是 | - | 要写入的新内容 | +| mode | str | 否 | `replace` | `replace` 或 `append` | +| wait | bool | 否 | `false` | 是否等待后台语义/向量刷新完成 | +| timeout | float | 否 | `null` | 当 `wait=true` 时的超时时间(秒) | + +**说明** + +- 只支持已存在文件;目录会被拒绝。 +- 不允许直接写入派生语义文件:`.abstract.md`、`.overview.md`、`.relations.json`。 +- 公共 API 已不再接受 `regenerate_semantics` 或 `revectorize`;写入后一定会自动刷新相关语义与向量。 + +**Python SDK (Embedded / HTTP)** + +```python +result = client.write( + "viking://resources/docs/api.md", + "# Updated API\n\nFresh content.", + mode="replace", + wait=True, +) +print(result["root_uri"]) +``` + +**HTTP API** + +``` +POST /api/v1/content/write +``` + +```bash +curl -X POST "http://localhost:1933/api/v1/content/write" \ + -H "Content-Type: application/json" \ + -H "X-API-Key: your-key" \ + -d '{ + "uri": "viking://resources/docs/api.md", + "content": "# Updated API\n\nFresh content.", + "mode": "replace", + "wait": true + }' +``` + +**CLI** + +```bash +openviking write viking://resources/docs/api.md \ + --content "# Updated API\n\nFresh content." 
\ + --wait +``` + +**响应** + +```json +{ + "status": "ok", + "result": { + "uri": "viking://resources/docs/api.md", + "root_uri": "viking://resources/docs", + "context_type": "resource", + "mode": "replace", + "written_bytes": 29, + "semantic_updated": true, + "vector_updated": true, + "queue_status": { + "Semantic": { + "processed": 1, + "error_count": 0, + "errors": [] + }, + "Embedding": { + "processed": 2, + "error_count": 0, + "errors": [] + } + } + } +} +``` + +--- + ### ls() 列出目录内容。 diff --git a/openviking/async_client.py b/openviking/async_client.py index c57e9b8e5..15e0eac8d 100644 --- a/openviking/async_client.py +++ b/openviking/async_client.py @@ -373,6 +373,26 @@ async def read(self, uri: str, offset: int = 0, limit: int = -1) -> str: await self._ensure_initialized() return await self._client.read(uri, offset=offset, limit=limit) + async def write( + self, + uri: str, + content: str, + mode: str = "replace", + wait: bool = False, + timeout: Optional[float] = None, + telemetry: TelemetryRequest = False, + ) -> Dict[str, Any]: + """Write text content to an existing file and refresh semantics/vectors.""" + await self._ensure_initialized() + return await self._client.write( + uri=uri, + content=content, + mode=mode, + wait=wait, + timeout=timeout, + telemetry=telemetry, + ) + async def ls(self, uri: str, **kwargs) -> List[Any]: """ List directory contents. 
diff --git a/openviking/client/local.py b/openviking/client/local.py index 4a45a0383..a118c2735 100644 --- a/openviking/client/local.py +++ b/openviking/client/local.py @@ -227,6 +227,33 @@ async def overview(self, uri: str) -> str: """Read L1 overview.""" return await self._service.fs.overview(uri, ctx=self._ctx) + async def write( + self, + uri: str, + content: str, + mode: str = "replace", + wait: bool = False, + timeout: Optional[float] = None, + telemetry: TelemetryRequest = False, + ) -> Dict[str, Any]: + """Write text content to an existing file and refresh semantics/vectors.""" + execution = await run_with_telemetry( + operation="content.write", + telemetry=telemetry, + fn=lambda: self._service.fs.write( + uri=uri, + content=content, + ctx=self._ctx, + mode=mode, + wait=wait, + timeout=timeout, + ), + ) + return attach_telemetry_payload( + execution.result, + execution.telemetry, + ) + # ============= Search ============= async def find( diff --git a/openviking/server/routers/content.py b/openviking/server/routers/content.py index d95fdeea0..7f2931595 100644 --- a/openviking/server/routers/content.py +++ b/openviking/server/routers/content.py @@ -7,12 +7,14 @@ from fastapi import APIRouter, Body, Depends, Query from fastapi.responses import Response as FastAPIResponse -from pydantic import BaseModel +from pydantic import BaseModel, ConfigDict from openviking.server.auth import get_request_context from openviking.server.dependencies import get_service from openviking.server.identity import RequestContext from openviking.server.models import ErrorInfo, Response +from openviking.server.telemetry import run_operation +from openviking.telemetry import TelemetryRequest from openviking_cli.utils import get_logger logger = get_logger(__name__) @@ -28,6 +30,19 @@ class ReindexRequest(BaseModel): wait: bool = True +class WriteContentRequest(BaseModel): + """Request to write or append text content to an existing file.""" + + model_config = ConfigDict(extra="forbid") + 
+ uri: str + content: str + mode: str = "replace" + wait: bool = False + timeout: float | None = None + telemetry: TelemetryRequest = False + + router = APIRouter(prefix="/api/v1/content", tags=["content"]) @@ -105,6 +120,32 @@ async def download( ) +@router.post("/write") +async def write( + request: WriteContentRequest = Body(...), + _ctx: RequestContext = Depends(get_request_context), +): + """Write text content to an existing file and refresh semantics/vectors.""" + service = get_service() + execution = await run_operation( + operation="content.write", + telemetry=request.telemetry, + fn=lambda: service.fs.write( + uri=request.uri, + content=request.content, + ctx=_ctx, + mode=request.mode, + wait=request.wait, + timeout=request.timeout, + ), + ) + return Response( + status="ok", + result=execution.result, + telemetry=execution.telemetry, + ).model_dump(exclude_none=True) + + @router.post("/reindex") async def reindex( request: ReindexRequest = Body(...), diff --git a/openviking/service/fs_service.py b/openviking/service/fs_service.py index f6e763fdf..336d2dcb2 100644 --- a/openviking/service/fs_service.py +++ b/openviking/service/fs_service.py @@ -9,6 +9,7 @@ from typing import Any, Dict, List, Optional from openviking.server.identity import RequestContext +from openviking.storage.content_write import ContentWriteCoordinator from openviking.storage.viking_fs import VikingFS from openviking_cli.exceptions import NotInitializedError from openviking_cli.utils import get_logger @@ -185,3 +186,24 @@ async def read_file_bytes(self, uri: str, ctx: RequestContext) -> bytes: """Read file as raw bytes.""" viking_fs = self._ensure_initialized() return await viking_fs.read_file_bytes(uri, ctx=ctx) + + async def write( + self, + uri: str, + content: str, + ctx: RequestContext, + mode: str = "replace", + wait: bool = False, + timeout: Optional[float] = None, + ) -> Dict[str, Any]: + """Write to an existing file and refresh semantics/vectors.""" + viking_fs = 
self._ensure_initialized() + coordinator = ContentWriteCoordinator(viking_fs=viking_fs) + return await coordinator.write( + uri=uri, + content=content, + ctx=ctx, + mode=mode, + wait=wait, + timeout=timeout, + ) diff --git a/openviking/storage/collection_schemas.py b/openviking/storage/collection_schemas.py index 9fdb6fa72..ae09e8219 100644 --- a/openviking/storage/collection_schemas.py +++ b/openviking/storage/collection_schemas.py @@ -227,6 +227,8 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, embedding_msg: Optional[EmbeddingMsg] = None collector = None + report_success = False + report_error_args: Optional[tuple[str, Optional[Dict[str, Any]]]] = None try: queue_data = json.loads(data["data"]) # Parse EmbeddingMsg from data @@ -239,14 +241,14 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, if self._vikingdb.is_closing: logger.debug("Skip embedding dequeue during shutdown") self._merge_request_stats(embedding_msg.telemetry_id, processed=1) - self.report_success() + report_success = True return None # Only process string messages if not isinstance(embedding_msg.message, str): logger.debug(f"Skipping non-string message type: {type(embedding_msg.message)}") self._merge_request_stats(embedding_msg.telemetry_id, processed=1) - self.report_success() + report_success = True return data # Circuit breaker: if API is known-broken, re-enqueue and wait @@ -261,10 +263,10 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, if wait > 0: await asyncio.sleep(wait) await self._vikingdb.enqueue_embedding_msg(embedding_msg) - self.report_success() + report_success = True return None # No queue manager — cannot re-enqueue, drop with error - self.report_error("Circuit breaker open and no queue manager", data) + report_error_args = ("Circuit breaker open and no queue manager", data) return None # Initialize embedder if not already initialized @@ -304,7 +306,7 @@ async def 
on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, logger.critical(error_msg) self._circuit_breaker.record_failure(embed_err) self._merge_request_stats(embedding_msg.telemetry_id, error_count=1) - self.report_error(error_msg, data) + report_error_args = (error_msg, data) return None # Transient or unknown — re-enqueue for retry @@ -316,13 +318,13 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, logger.info( f"Re-enqueued embedding message after transient error: {embedding_msg.id}" ) - self.report_success() + report_success = True return None except Exception as requeue_err: logger.error(f"Failed to re-enqueue message: {requeue_err}") self._merge_request_stats(embedding_msg.telemetry_id, error_count=1) - self.report_error(error_msg, data) + report_error_args = (error_msg, data) return None # Add dense vector @@ -333,7 +335,7 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, error_msg = f"Dense vector dimension mismatch: expected {self._vector_dim}, got {len(result.dense_vector)}" logger.error(error_msg) self._merge_request_stats(embedding_msg.telemetry_id, error_count=1) - self.report_error(error_msg, data) + report_error_args = (error_msg, data) return None # Add sparse vector if present @@ -346,7 +348,7 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, error_msg = "Embedder not initialized, skipping vector generation" logger.warning(error_msg) self._merge_request_stats(embedding_msg.telemetry_id, error_count=1) - self.report_error(error_msg, data) + report_error_args = (error_msg, data) return None # Write to vector database @@ -375,28 +377,28 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, if self._vikingdb.is_closing: logger.debug(f"Skip embedding write during shutdown: {db_err}") self._merge_request_stats(embedding_msg.telemetry_id, processed=1) - self.report_success() + report_success = True return None 
logger.error(f"Failed to write to vector database: {db_err}") self._merge_request_stats(embedding_msg.telemetry_id, error_count=1) - self.report_error(str(db_err), data) + report_error_args = (str(db_err), data) return None except Exception as db_err: if self._vikingdb.is_closing: logger.debug(f"Skip embedding write during shutdown: {db_err}") self._merge_request_stats(embedding_msg.telemetry_id, processed=1) - self.report_success() + report_success = True return None logger.error(f"Failed to write to vector database: {db_err}") import traceback traceback.print_exc() self._merge_request_stats(embedding_msg.telemetry_id, error_count=1) - self.report_error(str(db_err), data) + report_error_args = (str(db_err), data) return None self._merge_request_stats(embedding_msg.telemetry_id, processed=1) - self.report_success() + report_success = True self._circuit_breaker.record_success() return inserted_data @@ -407,7 +409,7 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, traceback.print_exc() if embedding_msg is not None: self._merge_request_stats(embedding_msg.telemetry_id, error_count=1) - self.report_error(str(e), data) + report_error_args = (str(e), data) return None finally: if embedding_msg and embedding_msg.semantic_msg_id: @@ -418,3 +420,7 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, await tracker.decrement(embedding_msg.semantic_msg_id) except Exception as tracker_err: logger.warning(f"Failed to decrement embedding tracker: {tracker_err}") + if report_error_args is not None: + self.report_error(*report_error_args) + elif report_success: + self.report_success() diff --git a/openviking/storage/content_write.py b/openviking/storage/content_write.py new file mode 100644 index 000000000..2c5fc8ace --- /dev/null +++ b/openviking/storage/content_write.py @@ -0,0 +1,420 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. 
+# SPDX-License-Identifier: Apache-2.0 +"""Coordinator for content write operations.""" + +from __future__ import annotations + +import os +from typing import Any, Dict, Optional + +from openviking.server.identity import RequestContext +from openviking.session.memory.utils.content import deserialize_full, serialize_with_metadata +from openviking.storage.queuefs import SemanticMsg, get_queue_manager +from openviking.storage.queuefs.semantic_processor import SemanticProcessor +from openviking.storage.transaction import get_lock_manager +from openviking.storage.viking_fs import VikingFS +from openviking.telemetry import get_current_telemetry +from openviking.telemetry.resource_summary import build_queue_status_payload +from openviking.utils.embedding_utils import vectorize_file +from openviking_cli.exceptions import DeadlineExceededError, InvalidArgumentError, NotFoundError +from openviking_cli.utils import VikingURI +from openviking_cli.utils.logger import get_logger + +logger = get_logger(__name__) + +_DERIVED_FILENAMES = frozenset({".abstract.md", ".overview.md", ".relations.json"}) + + +class ContentWriteCoordinator: + """Write an existing file and trigger downstream maintenance.""" + + def __init__(self, viking_fs: VikingFS): + self._viking_fs = viking_fs + + async def write( + self, + *, + uri: str, + content: str, + ctx: RequestContext, + mode: str = "replace", + wait: bool = False, + timeout: Optional[float] = None, + ) -> Dict[str, Any]: + normalized_uri = VikingURI.normalize(uri) + self._validate_mode(mode) + self._validate_target_uri(normalized_uri) + + stat = await self._safe_stat(normalized_uri, ctx=ctx) + if stat.get("isDir"): + raise InvalidArgumentError(f"write only supports existing files, got directory: {uri}") + + context_type = self._context_type_for_uri(normalized_uri) + root_uri = await self._resolve_root_uri(normalized_uri, ctx=ctx) + written_bytes = len(content.encode("utf-8")) + + if context_type == "memory": + return await 
self._write_memory_with_refresh( + uri=normalized_uri, + root_uri=root_uri, + content=content, + mode=mode, + wait=wait, + timeout=timeout, + ctx=ctx, + written_bytes=written_bytes, + ) + + lock_manager = get_lock_manager() + handle = lock_manager.create_handle() + lock_path = self._viking_fs._uri_to_path(root_uri, ctx=ctx) + acquired = await lock_manager.acquire_subtree(handle, lock_path) + if not acquired: + await lock_manager.release(handle) + raise InvalidArgumentError( + f"resource is busy and cannot be written now: {normalized_uri}" + ) + + temp_root_uri = "" + lock_transferred = False + try: + temp_root_uri, temp_target_uri = await self._prepare_temp_write( + uri=normalized_uri, + root_uri=root_uri, + content=content, + mode=mode, + ctx=ctx, + ) + await self._enqueue_semantic_refresh( + temp_root_uri=temp_root_uri, + target_root_uri=root_uri, + temp_target_uri=temp_target_uri, + context_type=context_type, + ctx=ctx, + lifecycle_lock_handle_id=handle.id, + ) + lock_transferred = True + queue_status = await self._wait_for_queues(timeout=timeout) if wait else None + return { + "uri": normalized_uri, + "root_uri": root_uri, + "context_type": context_type, + "mode": mode, + "written_bytes": written_bytes, + "semantic_updated": True, + "vector_updated": True, + "queue_status": queue_status, + } + except Exception: + if not lock_transferred and temp_root_uri: + try: + await self._viking_fs.delete_temp(temp_root_uri, ctx=ctx) + except Exception: + logger.debug("Failed to clean temp tree after write failure", exc_info=True) + if not lock_transferred: + await lock_manager.release(handle) + raise + + def _validate_mode(self, mode: str) -> None: + if mode not in {"replace", "append"}: + raise InvalidArgumentError(f"unsupported write mode: {mode}") + + def _validate_target_uri(self, uri: str) -> None: + name = uri.rstrip("/").split("/")[-1] + if name in _DERIVED_FILENAMES: + raise InvalidArgumentError(f"cannot write derived semantic file directly: {uri}") + + parsed = 
VikingURI(uri) + if parsed.scope not in {"resources", "user", "agent"}: + raise InvalidArgumentError(f"write is not supported for scope: {parsed.scope}") + + async def _safe_stat(self, uri: str, *, ctx: RequestContext) -> Dict[str, Any]: + try: + return await self._viking_fs.stat(uri, ctx=ctx) + except Exception as exc: + if isinstance(exc, NotFoundError): + raise + raise NotFoundError(uri, "file") from exc + + async def _write_in_place( + self, + uri: str, + content: str, + *, + mode: str, + ctx: RequestContext, + ) -> None: + if mode == "replace" and self._context_type_for_uri(uri) == "memory": + existing_raw = await self._viking_fs.read_file(uri, ctx=ctx) + _, metadata = deserialize_full(existing_raw) + if metadata: + content = serialize_with_metadata(content, metadata) + await self._viking_fs.write_file(uri, content, ctx=ctx) + return + + if mode == "append": + existing_raw = await self._viking_fs.read_file(uri, ctx=ctx) + existing_content, metadata = deserialize_full(existing_raw) + updated_content = existing_content + content + if metadata: + updated_raw = serialize_with_metadata(updated_content, metadata) + else: + updated_raw = updated_content + await self._viking_fs.write_file(uri, updated_raw, ctx=ctx) + return + await self._viking_fs.write_file(uri, content, ctx=ctx) + + async def _prepare_temp_write( + self, + *, + uri: str, + root_uri: str, + content: str, + mode: str, + ctx: RequestContext, + ) -> tuple[str, str]: + temp_base = self._viking_fs.create_temp_uri() + await self._viking_fs.mkdir(temp_base, exist_ok=True, ctx=ctx) + root_name = root_uri.rstrip("/").split("/")[-1] + temp_root_uri = f"{temp_base.rstrip('/')}/{root_name}" + await self._copy_tree(root_uri, temp_root_uri, ctx=ctx) + + temp_target_uri = self._translate_to_temp_uri( + uri=uri, root_uri=root_uri, temp_root_uri=temp_root_uri + ) + await self._write_in_place(temp_target_uri, content, mode=mode, ctx=ctx) + return temp_root_uri, temp_target_uri + + async def _copy_tree(self, src_uri: 
str, dst_uri: str, *, ctx: RequestContext) -> None: + stat = await self._safe_stat(src_uri, ctx=ctx) + if not stat.get("isDir"): + raise InvalidArgumentError(f"incremental write root must be a directory: {src_uri}") + + await self._viking_fs.mkdir(dst_uri, exist_ok=True, ctx=ctx) + entries = await self._viking_fs.ls( + src_uri, output="original", show_all_hidden=True, ctx=ctx + ) + for entry in entries: + name = entry.get("name", "") + if not name or name in {".", ".."}: + continue + src_child = VikingURI(src_uri).join(name).uri + dst_child = VikingURI(dst_uri).join(name).uri + if entry.get("isDir", False): + await self._copy_tree(src_child, dst_child, ctx=ctx) + continue + content = await self._viking_fs.read_file_bytes(src_child, ctx=ctx) + await self._viking_fs.write_file_bytes(dst_child, content, ctx=ctx) + + def _translate_to_temp_uri(self, *, uri: str, root_uri: str, temp_root_uri: str) -> str: + if uri == root_uri: + return temp_root_uri + prefix = root_uri.rstrip("/") + "/" + if not uri.startswith(prefix): + raise InvalidArgumentError(f"uri {uri} is not inside write root {root_uri}") + relative = uri[len(prefix) :] + return f"{temp_root_uri.rstrip('/')}/{relative}" + + async def _enqueue_semantic_refresh( + self, + *, + temp_root_uri: str, + target_root_uri: str, + temp_target_uri: str, + context_type: str, + ctx: RequestContext, + lifecycle_lock_handle_id: str, + ) -> None: + queue_manager = get_queue_manager() + semantic_queue = queue_manager.get_queue(queue_manager.SEMANTIC, allow_create=True) + telemetry = get_current_telemetry() + msg = SemanticMsg( + uri=temp_root_uri, + target_uri=target_root_uri, + context_type=context_type, + account_id=ctx.account_id, + user_id=ctx.user.user_id, + agent_id=ctx.user.agent_id, + role=ctx.role.value, + skip_vectorization=False, + telemetry_id=telemetry.telemetry_id if telemetry.enabled else "", + lifecycle_lock_handle_id=lifecycle_lock_handle_id, + changes={"modified": [temp_target_uri]}, + ) + await 
semantic_queue.enqueue(msg) + + async def _enqueue_memory_refresh( + self, + *, + root_uri: str, + modified_uri: str, + ctx: RequestContext, + lifecycle_lock_handle_id: str, + ) -> None: + queue_manager = get_queue_manager() + semantic_queue = queue_manager.get_queue(queue_manager.SEMANTIC, allow_create=True) + telemetry = get_current_telemetry() + msg = SemanticMsg( + uri=root_uri, + context_type="memory", + account_id=ctx.account_id, + user_id=ctx.user.user_id, + agent_id=ctx.user.agent_id, + role=ctx.role.value, + skip_vectorization=False, + telemetry_id=telemetry.telemetry_id if telemetry.enabled else "", + lifecycle_lock_handle_id=lifecycle_lock_handle_id, + changes={"modified": [modified_uri]}, + ) + await semantic_queue.enqueue(msg) + + async def _wait_for_queues(self, *, timeout: Optional[float]) -> Dict[str, Any]: + queue_manager = get_queue_manager() + try: + status = await queue_manager.wait_complete(timeout=timeout) + except TimeoutError as exc: + raise DeadlineExceededError("queue processing", timeout) from exc + return build_queue_status_payload(status) + + async def _vectorize_single_file( + self, + uri: str, + *, + context_type: str, + ctx: RequestContext, + ) -> None: + parent = VikingURI(uri).parent + if parent is None: + raise InvalidArgumentError(f"file has no parent directory: {uri}") + summary_dict = await self._summary_dict_for_vectorize( + uri, context_type=context_type, ctx=ctx + ) + await vectorize_file( + file_path=uri, + summary_dict=summary_dict, + parent_uri=parent.uri, + context_type=context_type, + ctx=ctx, + ) + + async def _summary_dict_for_vectorize( + self, + uri: str, + *, + context_type: str, + ctx: RequestContext, + ) -> Dict[str, str]: + file_name = os.path.basename(uri) + if context_type != "memory": + return {"name": file_name} + + try: + processor = SemanticProcessor(max_concurrent_llm=1) + return await processor._generate_single_file_summary(uri, ctx=ctx) + except Exception: + logger.warning( + "Failed to generate summary 
for memory write vector refresh: %s", + uri, + exc_info=True, + ) + return {"name": file_name} + + async def _write_memory_with_refresh( + self, + *, + uri: str, + root_uri: str, + content: str, + mode: str, + wait: bool, + timeout: Optional[float], + ctx: RequestContext, + written_bytes: int, + ) -> Dict[str, Any]: + lock_manager = get_lock_manager() + handle = lock_manager.create_handle() + lock_path = self._viking_fs._uri_to_path(root_uri, ctx=ctx) + acquired = await lock_manager.acquire_subtree(handle, lock_path) + if not acquired: + await lock_manager.release(handle) + raise InvalidArgumentError(f"resource is busy and cannot be written now: {uri}") + + lock_transferred = False + try: + await self._write_in_place(uri, content, mode=mode, ctx=ctx) + await self._vectorize_single_file(uri, context_type="memory", ctx=ctx) + await self._enqueue_memory_refresh( + root_uri=root_uri, + modified_uri=uri, + ctx=ctx, + lifecycle_lock_handle_id=handle.id, + ) + lock_transferred = True + queue_status = await self._wait_for_queues(timeout=timeout) if wait else None + return { + "uri": uri, + "root_uri": root_uri, + "context_type": "memory", + "mode": mode, + "written_bytes": written_bytes, + "semantic_updated": True, + "vector_updated": True, + "queue_status": queue_status, + } + except Exception: + if not lock_transferred: + await lock_manager.release(handle) + raise + + async def _resolve_root_uri(self, uri: str, *, ctx: RequestContext) -> str: + parsed = VikingURI(uri) + parts = [part for part in parsed.full_path.split("/") if part] + if not parts: + raise InvalidArgumentError(f"invalid write uri: {uri}") + + root_uri = uri + if parts[0] == "resources": + if len(parts) >= 2: + root_uri = VikingURI.build("resources", parts[1]) + elif parts[0] == "user": + try: + memories_idx = parts.index("memories") + except ValueError as exc: + raise InvalidArgumentError( + f"write only supports memory files under user scope: {uri}" + ) from exc + if len(parts) <= memories_idx + 1: + 
raise InvalidArgumentError( + f"memory write target must be inside a memory type directory: {uri}" + ) + root_uri = VikingURI.build(*parts[: memories_idx + 2]) + elif parts[0] == "agent": + if len(parts) >= 3 and parts[1] == "skills": + root_uri = VikingURI.build(*parts[:3]) + else: + try: + memories_idx = parts.index("memories") + except ValueError as exc: + raise InvalidArgumentError( + f"write only supports memory or skill files under agent scope: {uri}" + ) from exc + if len(parts) <= memories_idx + 1: + raise InvalidArgumentError( + f"memory write target must be inside a memory type directory: {uri}" + ) + root_uri = VikingURI.build(*parts[: memories_idx + 2]) + + stat = await self._safe_stat(root_uri, ctx=ctx) + if not stat.get("isDir"): + parent = VikingURI(uri).parent + if parent is None: + raise InvalidArgumentError(f"could not resolve write root for {uri}") + root_uri = parent.uri + return root_uri + + def _context_type_for_uri(self, uri: str) -> str: + if "/memories/" in uri: + return "memory" + if "/skills/" in uri or uri.startswith("viking://agent/skills/"): + return "skill" + return "resource" diff --git a/openviking/storage/queuefs/semantic_dag.py b/openviking/storage/queuefs/semantic_dag.py index 3eac0905f..fb45e9feb 100644 --- a/openviking/storage/queuefs/semantic_dag.py +++ b/openviking/storage/queuefs/semantic_dag.py @@ -125,6 +125,7 @@ async def sync_diff_callback() -> None: self._target_uri, ctx=self._ctx, file_change_status=self._file_change_status, + lifecycle_lock_handle_id=self._lifecycle_lock_handle_id, ) logger.info( f"[SyncDiff] Diff computed: " diff --git a/openviking/storage/queuefs/semantic_processor.py b/openviking/storage/queuefs/semantic_processor.py index 042b0128f..a069c0bf9 100644 --- a/openviking/storage/queuefs/semantic_processor.py +++ b/openviking/storage/queuefs/semantic_processor.py @@ -237,6 +237,7 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, """Process dequeued SemanticMsg, 
recursively process all subdirectories.""" msg: Optional[SemanticMsg] = None collector = None + release_lock_in_finally = True try: import json @@ -306,6 +307,9 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, is_code_repo=msg.is_code_repo, ) self._dag_executor = executor + if msg.lifecycle_lock_handle_id: + # The DAG owns lifecycle lock release after this point. + release_lock_in_finally = False await executor.run(msg.uri) self._cache_dag_stats( msg.telemetry_id, @@ -351,7 +355,7 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, finally: # Safety net: release lifecycle lock if still held (e.g. on exception # before the DAG executor took ownership) - if msg and msg.lifecycle_lock_handle_id: + if release_lock_in_finally and msg and msg.lifecycle_lock_handle_id: try: from openviking.storage.transaction import get_lock_manager @@ -412,6 +416,8 @@ async def _process_memory_directory(self, msg: SemanticMsg) -> None: entries = await viking_fs.ls(dir_uri, ctx=ctx) except Exception as e: logger.warning(f"Failed to list memory directory {dir_uri}: {e}") + if msg.lifecycle_lock_handle_id: + await self._release_memory_lifecycle_lock(msg.lifecycle_lock_handle_id) return file_paths: List[str] = [] @@ -425,6 +431,8 @@ async def _process_memory_directory(self, msg: SemanticMsg) -> None: if not file_paths: logger.info(f"No memory files found in {dir_uri}") + if msg.lifecycle_lock_handle_id: + await self._release_memory_lifecycle_lock(msg.lifecycle_lock_handle_id) return file_summaries: List[Dict[str, str]] = [] @@ -497,17 +505,34 @@ async def _gen(idx: int, file_path: str) -> None: logger.info(f"Generated abstract.md and overview.md for {dir_uri}") except Exception as e: logger.error(f"Failed to write abstract/overview for {dir_uri}: {e}") + if msg.lifecycle_lock_handle_id: + await self._release_memory_lifecycle_lock(msg.lifecycle_lock_handle_id) return - await self._vectorize_directory( - uri=dir_uri, - 
context_type="memory", - abstract=abstract, - overview=overview, - ctx=ctx, - semantic_msg_id=msg.id, - ) - logger.info(f"Vectorized abstract.md and overview.md for {dir_uri}") + try: + await self._vectorize_directory( + uri=dir_uri, + context_type="memory", + abstract=abstract, + overview=overview, + ctx=ctx, + semantic_msg_id=msg.id, + ) + logger.info(f"Vectorized abstract.md and overview.md for {dir_uri}") + finally: + if msg.lifecycle_lock_handle_id: + await self._release_memory_lifecycle_lock(msg.lifecycle_lock_handle_id) + + async def _release_memory_lifecycle_lock(self, handle_id: str) -> None: + """Release a lifecycle lock held by in-place memory refresh.""" + try: + from openviking.storage.transaction import get_lock_manager + + handle = get_lock_manager().get_handle(handle_id) + if handle: + await get_lock_manager().release(handle) + except Exception as e: + logger.warning(f"[SemanticProcessor] Failed to release memory lifecycle lock: {e}") async def _sync_topdown_recursive( self, @@ -515,9 +540,15 @@ async def _sync_topdown_recursive( target_uri: str, ctx: Optional[RequestContext] = None, file_change_status: Optional[Dict[str, bool]] = None, + lifecycle_lock_handle_id: str = "", ) -> DiffResult: viking_fs = get_viking_fs() diff = DiffResult() + lock_handle = None + if lifecycle_lock_handle_id: + from openviking.storage.transaction import get_lock_manager + + lock_handle = get_lock_manager().get_handle(lifecycle_lock_handle_id) async def list_children(dir_uri: str) -> Tuple[Dict[str, str], Dict[str, str]]: files: Dict[str, str] = {} @@ -546,7 +577,12 @@ async def sync_dir(root_dir: str, target_dir: str) -> None: target_files, target_dirs = await list_children(target_dir) try: - await viking_fs._mv_vector_store_l0_l1(root_dir, target_dir, ctx=ctx) + await viking_fs._mv_vector_store_l0_l1( + root_dir, + target_dir, + ctx=ctx, + lock_handle=lock_handle, + ) except Exception as e: logger.error( f"[SyncDiff] Failed to move L0/L1 index: {root_dir} -> 
{target_dir}, error={e}" @@ -560,7 +596,12 @@ async def sync_dir(root_dir: str, target_dir: str) -> None: if root_file and name in target_dirs: target_conflict_dir = target_dirs[name] try: - await viking_fs.rm(target_conflict_dir, recursive=True, ctx=ctx) + await viking_fs.rm( + target_conflict_dir, + recursive=True, + ctx=ctx, + lock_handle=lock_handle, + ) diff.deleted_dirs.append(target_conflict_dir) target_dirs.pop(name, None) except Exception as e: @@ -571,7 +612,7 @@ async def sync_dir(root_dir: str, target_dir: str) -> None: if target_file and name in root_dirs and not root_file: try: - await viking_fs.rm(target_file, ctx=ctx) + await viking_fs.rm(target_file, ctx=ctx, lock_handle=lock_handle) diff.deleted_files.append(target_file) target_files.pop(name, None) except Exception as e: @@ -582,7 +623,7 @@ async def sync_dir(root_dir: str, target_dir: str) -> None: if target_file and not root_file: try: - await viking_fs.rm(target_file, ctx=ctx) + await viking_fs.rm(target_file, ctx=ctx, lock_handle=lock_handle) diff.deleted_files.append(target_file) except Exception as e: logger.error(f"[SyncDiff] Failed to delete file: {target_file}, error={e}") @@ -605,13 +646,18 @@ async def sync_dir(root_dir: str, target_dir: str) -> None: if changed: diff.updated_files.append(root_file) try: - await viking_fs.rm(target_file, ctx=ctx) + await viking_fs.rm(target_file, ctx=ctx, lock_handle=lock_handle) except Exception as e: logger.error( f"[SyncDiff] Failed to remove old file before update: {target_file}, error={e}" ) try: - await viking_fs.mv(root_file, target_file, ctx=ctx) + await viking_fs.mv( + root_file, + target_file, + ctx=ctx, + lock_handle=lock_handle, + ) except Exception as e: logger.error( f"[SyncDiff] Failed to move updated file: {root_file} -> {target_file}, error={e}" @@ -622,7 +668,12 @@ async def sync_dir(root_dir: str, target_dir: str) -> None: diff.added_files.append(root_file) target_file_uri = VikingURI(target_dir).join(name).uri try: - await 
viking_fs.mv(root_file, target_file_uri, ctx=ctx) + await viking_fs.mv( + root_file, + target_file_uri, + ctx=ctx, + lock_handle=lock_handle, + ) except Exception as e: logger.error( f"[SyncDiff] Failed to move added file: {root_file} -> {target_file_uri}, error={e}" @@ -636,7 +687,11 @@ async def sync_dir(root_dir: str, target_dir: str) -> None: if root_subdir and name in target_files: target_conflict_file = target_files[name] try: - await viking_fs.rm(target_conflict_file, ctx=ctx) + await viking_fs.rm( + target_conflict_file, + ctx=ctx, + lock_handle=lock_handle, + ) diff.deleted_files.append(target_conflict_file) target_files.pop(name, None) except Exception as e: @@ -647,7 +702,12 @@ async def sync_dir(root_dir: str, target_dir: str) -> None: if target_subdir and not root_subdir: try: - await viking_fs.rm(target_subdir, recursive=True, ctx=ctx) + await viking_fs.rm( + target_subdir, + recursive=True, + ctx=ctx, + lock_handle=lock_handle, + ) diff.deleted_dirs.append(target_subdir) except Exception as e: logger.error( @@ -659,7 +719,12 @@ async def sync_dir(root_dir: str, target_dir: str) -> None: diff.added_dirs.append(root_subdir) target_subdir_uri = VikingURI(target_dir).join(name).uri try: - await viking_fs.mv(root_subdir, target_subdir_uri, ctx=ctx) + await viking_fs.mv( + root_subdir, + target_subdir_uri, + ctx=ctx, + lock_handle=lock_handle, + ) except Exception as e: logger.error( f"[SyncDiff] Failed to move added directory: {root_subdir} -> {target_subdir_uri}, error={e}" @@ -675,7 +740,7 @@ async def sync_dir(root_dir: str, target_dir: str) -> None: if parent_uri: await viking_fs.mkdir(parent_uri.uri, exist_ok=True, ctx=ctx) diff.added_dirs.append(root_uri) - await viking_fs.mv(root_uri, target_uri, ctx=ctx) + await viking_fs.mv(root_uri, target_uri, ctx=ctx, lock_handle=lock_handle) return diff await sync_dir(root_uri, target_uri) diff --git a/openviking/storage/transaction/lock_context.py b/openviking/storage/transaction/lock_context.py index 
e60525d09..01eee15b0 100644 --- a/openviking/storage/transaction/lock_context.py +++ b/openviking/storage/transaction/lock_context.py @@ -23,16 +23,22 @@ def __init__( lock_mode: str = "point", mv_dst_parent_path: Optional[str] = None, src_is_dir: bool = True, + handle: Optional[LockHandle] = None, ): self._manager = lock_manager self._paths = paths self._lock_mode = lock_mode self._mv_dst_parent_path = mv_dst_parent_path self._src_is_dir = src_is_dir - self._handle: Optional[LockHandle] = None + self._handle: Optional[LockHandle] = handle + self._owns_handle = handle is None + self._locks_before: list[str] = [] + self._acquired_lock_paths: list[str] = [] async def __aenter__(self) -> LockHandle: - self._handle = self._manager.create_handle() + if self._handle is None: + self._handle = self._manager.create_handle() + self._locks_before = list(self._handle.locks) success = False if self._lock_mode == "subtree": @@ -55,8 +61,15 @@ async def __aenter__(self) -> LockHandle: if not success: break + self._acquired_lock_paths = [ + lock_path for lock_path in self._handle.locks if lock_path not in self._locks_before + ] + if not success: - await self._manager.release(self._handle) + if self._owns_handle: + await self._manager.release(self._handle) + else: + await self._manager.release_selected(self._handle, self._acquired_lock_paths) raise LockAcquisitionError( f"Failed to acquire {self._lock_mode} lock for {self._paths}" ) @@ -64,5 +77,8 @@ async def __aenter__(self) -> LockHandle: async def __aexit__(self, exc_type, exc_val, exc_tb): if self._handle: - await self._manager.release(self._handle) + if self._owns_handle: + await self._manager.release(self._handle) + else: + await self._manager.release_selected(self._handle, self._acquired_lock_paths) return False diff --git a/openviking/storage/transaction/lock_manager.py b/openviking/storage/transaction/lock_manager.py index 5748ef69d..28878fe5e 100644 --- a/openviking/storage/transaction/lock_manager.py +++ 
b/openviking/storage/transaction/lock_manager.py @@ -160,6 +160,9 @@ async def release(self, handle: LockHandle) -> None: await self._path_lock.release(handle) self._handles.pop(handle.id, None) + async def release_selected(self, handle: LockHandle, lock_paths: List[str]) -> None: + await self._path_lock.release_selected(handle, lock_paths) + async def _stale_cleanup_loop(self) -> None: """Check and release leaked handles every 60 s (in-process safety net).""" while self._running: diff --git a/openviking/storage/transaction/path_lock.py b/openviking/storage/transaction/path_lock.py index 0f62a68cf..867afc064 100644 --- a/openviking/storage/transaction/path_lock.py +++ b/openviking/storage/transaction/path_lock.py @@ -116,6 +116,26 @@ async def _verify_lock_ownership(self, lock_path: str, owner_id: str) -> bool: lock_owner, _, _ = _parse_fencing_token(token) return lock_owner == owner_id + async def _owned_lock_type(self, path: str, owner: LockOwner) -> Optional[str]: + lock_path = self._get_lock_path(path) + if lock_path not in owner.locks: + return None + token = self._read_token(lock_path) + if token is None: + return None + lock_owner, _, lock_type = _parse_fencing_token(token) + if lock_owner != owner.id: + return None + return lock_type + + async def _has_owned_ancestor_subtree(self, path: str, owner: LockOwner) -> bool: + current = path.rstrip("/") + while current: + if await self._owned_lock_type(current, owner) == LOCK_TYPE_SUBTREE: + return True + current = self._get_parent_path(current) or "" + return False + async def _remove_lock_file(self, lock_path: str) -> bool: try: self._agfs.rm(lock_path) @@ -174,12 +194,22 @@ async def _scan_descendants_for_locks(self, path: str, exclude_owner_id: str) -> logger.warning(f"Failed to scan descendants of {path}: {e}") return None - async def acquire_point(self, path: str, owner: LockOwner, timeout: Optional[float] = 0.0) -> bool: + async def acquire_point( + self, path: str, owner: LockOwner, timeout: 
Optional[float] = 0.0 + ) -> bool: owner_id = owner.id lock_path = self._get_lock_path(path) + owned_lock_type = await self._owned_lock_type(path, owner) + if owned_lock_type in {LOCK_TYPE_POINT, LOCK_TYPE_SUBTREE}: + owner.add_lock(lock_path) + logger.debug(f"[POINT] Reusing owned lock on: {path}") + return True + if await self._has_owned_ancestor_subtree(path, owner): + logger.debug(f"[POINT] Reusing owned ancestor SUBTREE lock on: {path}") + return True if timeout is None: # 无限等待 - deadline = float('inf') + deadline = float("inf") else: # 有限超时 deadline = asyncio.get_running_loop().time() + timeout @@ -255,12 +285,22 @@ async def acquire_point(self, path: str, owner: LockOwner, timeout: Optional[flo logger.debug(f"[POINT] Lock acquired: {lock_path}") return True - async def acquire_subtree(self, path: str, owner: LockOwner, timeout: Optional[float] = 0.0) -> bool: + async def acquire_subtree( + self, path: str, owner: LockOwner, timeout: Optional[float] = 0.0 + ) -> bool: owner_id = owner.id lock_path = self._get_lock_path(path) + owned_lock_type = await self._owned_lock_type(path, owner) + if owned_lock_type == LOCK_TYPE_SUBTREE: + owner.add_lock(lock_path) + logger.debug(f"[SUBTREE] Reusing owned SUBTREE lock on: {path}") + return True + if await self._has_owned_ancestor_subtree(path, owner): + logger.debug(f"[SUBTREE] Reusing owned ancestor SUBTREE lock on: {path}") + return True if timeout is None: # 无限等待 - deadline = float('inf') + deadline = float("inf") else: # 有限超时 deadline = asyncio.get_running_loop().time() + timeout @@ -419,3 +459,12 @@ async def release(self, owner: LockOwner) -> None: owner.remove_lock(lock_path) logger.debug(f"Released {lock_count} locks for owner {owner.id}") + + async def release_selected(self, owner: LockOwner, lock_paths: list[str]) -> None: + for lock_path in reversed(lock_paths): + if lock_path not in owner.locks: + continue + if not await self._verify_lock_ownership(lock_path, owner.id): + continue + await 
self._remove_lock_file(lock_path) + owner.remove_lock(lock_path) diff --git a/openviking/storage/viking_fs.py b/openviking/storage/viking_fs.py index de830a6a0..d9ed13d42 100644 --- a/openviking/storage/viking_fs.py +++ b/openviking/storage/viking_fs.py @@ -33,6 +33,7 @@ from openviking_cli.utils.uri import VikingURI if TYPE_CHECKING: + from openviking.storage.transaction.lock_handle import LockHandle from openviking.storage.viking_vector_index_backend import VikingVectorIndexBackend from openviking_cli.utils.config import RerankConfig @@ -358,7 +359,11 @@ async def mkdir( self.agfs.mkdir(path) async def rm( - self, uri: str, recursive: bool = False, ctx: Optional[RequestContext] = None + self, + uri: str, + recursive: bool = False, + ctx: Optional[RequestContext] = None, + lock_handle: Optional["LockHandle"] = None, ) -> Dict[str, Any]: """Delete file/directory + recursively update vector index. @@ -397,7 +402,12 @@ async def rm( lock_mode = "point" try: - async with LockContext(get_lock_manager(), lock_paths, lock_mode=lock_mode): + async with LockContext( + get_lock_manager(), + lock_paths, + lock_mode=lock_mode, + handle=lock_handle, + ): uris_to_delete = await self._collect_uris(path, recursive, ctx=ctx) uris_to_delete.append(target_uri) await self._delete_from_vector_store(uris_to_delete, ctx=ctx) @@ -411,6 +421,7 @@ async def mv( old_uri: str, new_uri: str, ctx: Optional[RequestContext] = None, + lock_handle: Optional["LockHandle"] = None, ) -> Dict[str, Any]: """Move file/directory + recursively update vector index. 
@@ -441,6 +452,7 @@ async def mv( lock_mode="mv", mv_dst_parent_path=dst_parent, src_is_dir=is_dir, + handle=lock_handle, ): uris_to_move = await self._collect_uris(old_path, recursive=True, ctx=ctx) uris_to_move.append(target_uri) @@ -1409,6 +1421,7 @@ async def _mv_vector_store_l0_l1( old_uri: str, new_uri: str, ctx: Optional[RequestContext] = None, + lock_handle: Optional["LockHandle"] = None, ) -> None: from openviking.storage.errors import LockAcquisitionError, ResourceBusyError from openviking.storage.transaction import LockContext, get_lock_manager @@ -1451,6 +1464,7 @@ async def _mv_vector_store_l0_l1( lock_mode="mv", mv_dst_parent_path=dst_parent, src_is_dir=True, + handle=lock_handle, ): await self._update_vector_store_uris( uris=[old_dir], diff --git a/openviking/sync_client.py b/openviking/sync_client.py index 5b7e89ea4..bfd4f7aed 100644 --- a/openviking/sync_client.py +++ b/openviking/sync_client.py @@ -197,6 +197,27 @@ def read(self, uri: str, offset: int = 0, limit: int = -1) -> str: """Read file""" return run_async(self._async_client.read(uri, offset=offset, limit=limit)) + def write( + self, + uri: str, + content: str, + mode: str = "replace", + wait: bool = False, + timeout: Optional[float] = None, + telemetry: TelemetryRequest = False, + ) -> Dict[str, Any]: + """Write text content to an existing file and refresh semantics/vectors.""" + return run_async( + self._async_client.write( + uri=uri, + content=content, + mode=mode, + wait=wait, + timeout=timeout, + telemetry=telemetry, + ) + ) + def ls(self, uri: str, **kwargs) -> List[Any]: """ List directory contents. diff --git a/openviking_cli/client/base.py b/openviking_cli/client/base.py index 89d85f5e7..aa8703b09 100644 --- a/openviking_cli/client/base.py +++ b/openviking_cli/client/base.py @@ -133,6 +133,19 @@ async def overview(self, uri: str) -> str: """Read L1 overview (.overview.md).""" ... 
+ @abstractmethod + async def write( + self, + uri: str, + content: str, + mode: str = "replace", + wait: bool = False, + timeout: Optional[float] = None, + telemetry: TelemetryRequest = False, + ) -> Dict[str, Any]: + """Write text content to an existing file and refresh semantics/vectors.""" + ... + # ============= Search ============= @abstractmethod diff --git a/openviking_cli/client/http.py b/openviking_cli/client/http.py index f17047e3a..ace0d1dac 100644 --- a/openviking_cli/client/http.py +++ b/openviking_cli/client/http.py @@ -552,6 +552,32 @@ async def overview(self, uri: str) -> str: ) return self._handle_response(response) + async def write( + self, + uri: str, + content: str, + mode: str = "replace", + wait: bool = False, + timeout: Optional[float] = None, + telemetry: TelemetryRequest = False, + ) -> Dict[str, Any]: + """Write text content to an existing file and refresh semantics/vectors.""" + telemetry = self._validate_telemetry(telemetry) + uri = VikingURI.normalize(uri) + response = await self._http.post( + "/api/v1/content/write", + json={ + "uri": uri, + "content": content, + "mode": mode, + "wait": wait, + "timeout": timeout, + "telemetry": telemetry, + }, + ) + response_data = self._handle_response_data(response) + return self._attach_telemetry(response_data.get("result") or {}, response_data) + # ============= Search ============= async def find( diff --git a/openviking_cli/client/sync_http.py b/openviking_cli/client/sync_http.py index a75b7368c..f43b8f9fe 100644 --- a/openviking_cli/client/sync_http.py +++ b/openviking_cli/client/sync_http.py @@ -330,6 +330,27 @@ def overview(self, uri: str) -> str: """Read L1 overview.""" return run_async(self._async_client.overview(uri)) + def write( + self, + uri: str, + content: str, + mode: str = "replace", + wait: bool = False, + timeout: Optional[float] = None, + telemetry: TelemetryRequest = False, + ) -> Dict[str, Any]: + """Write text content to an existing file and refresh semantics/vectors.""" + 
return run_async( + self._async_client.write( + uri=uri, + content=content, + mode=mode, + wait=wait, + timeout=timeout, + telemetry=telemetry, + ) + ) + # ============= Relations ============= def relations(self, uri: str) -> List[Dict[str, Any]]: diff --git a/tests/api_test/api/client.py b/tests/api_test/api/client.py index 509ac7128..c4254ce0b 100644 --- a/tests/api_test/api/client.py +++ b/tests/api_test/api/client.py @@ -327,10 +327,25 @@ def fs_read(self, uri: str) -> requests.Response: url = self._build_url(self.server_url, endpoint, params) return self._request_with_retry("GET", url) - def fs_write(self, uri: str, content: str) -> requests.Response: + def fs_write( + self, + uri: str, + content: str, + mode: str = "replace", + wait: bool = False, + timeout: Optional[float] = None, + ) -> requests.Response: endpoint = "/api/v1/content/write" url = self._build_url(self.server_url, endpoint) - return self._request_with_retry("POST", url, json={"uri": uri, "content": content}) + payload = { + "uri": uri, + "content": content, + "mode": mode, + "wait": wait, + } + if timeout is not None: + payload["timeout"] = timeout + return self._request_with_retry("POST", url, json=payload) def fs_rm(self, uri: str, recursive: bool = False) -> requests.Response: endpoint = "/api/v1/fs" diff --git a/tests/api_test/tools/checkers/check_endpoints.py b/tests/api_test/tools/checkers/check_endpoints.py index ca297c240..3fc8121a1 100644 --- a/tests/api_test/tools/checkers/check_endpoints.py +++ b/tests/api_test/tools/checkers/check_endpoints.py @@ -71,19 +71,26 @@ def check_endpoints(): except Exception as e: print(f"2. /api/v1/fs/ls: ERROR - {e}") - # Check fs_write (will likely fail but let's try) - try: - import uuid - - test_uri = f"viking://user/test-{uuid.uuid4()[:8]}.md" - response = client.fs_write(test_uri, "test content") - print(f"\n7. 
/api/v1/content/write: {response.status_code}") - if response.status_code == 200: - print(f" Response: {response.json()}") - else: - print(f" Response text: {response.text}") - except Exception as e: - print(f"7. /api/v1/content/write: ERROR - {e}") + # Check fs_write only when the caller provides an existing file URI. + write_check_uri = os.getenv("OPENVIKING_WRITE_CHECK_URI") + if write_check_uri: + try: + response = client.fs_write( + write_check_uri, + "endpoint write smoke check", + wait=False, + ) + print(f"\n7. /api/v1/content/write: {response.status_code}") + if response.status_code == 200: + print(f" Response: {response.json()}") + else: + print(f" Response text: {response.text}") + except Exception as e: + print(f"7. /api/v1/content/write: ERROR - {e}") + else: + print( + "\n7. /api/v1/content/write: SKIPPED - set OPENVIKING_WRITE_CHECK_URI to an existing file" + ) print("\n" + "=" * 80) diff --git a/tests/client/test_filesystem.py b/tests/client/test_filesystem.py index b4611bbad..82ed1e66e 100644 --- a/tests/client/test_filesystem.py +++ b/tests/client/test_filesystem.py @@ -3,9 +3,11 @@ """Filesystem operation tests""" +from unittest.mock import AsyncMock + import pytest -from openviking import AsyncOpenViking +from openviking import AsyncOpenViking, OpenViking class TestLs: @@ -115,3 +117,34 @@ async def test_tree_specific_directory(self, client_with_resource): tree = await client.tree(parent_uri) assert isinstance(tree, (list, dict)) + + +async def test_sync_openviking_write_updates_existing_file(test_data_dir, sample_markdown_file): + """Sync OpenViking exposes write() and delegates to the async client.""" + await AsyncOpenViking.reset() + client = OpenViking(path=str(test_data_dir)) + + try: + client._async_client.write = AsyncMock(return_value={"uri": "viking://resources/demo.md"}) + + write_result = client.write( + "viking://resources/demo.md", + "updated content", + mode="append", + wait=True, + timeout=3.0, + telemetry=False, + ) + + assert 
write_result == {"uri": "viking://resources/demo.md"} + client._async_client.write.assert_awaited_once_with( + uri="viking://resources/demo.md", + content="updated content", + mode="append", + wait=True, + timeout=3.0, + telemetry=False, + ) + finally: + client.close() + await AsyncOpenViking.reset() diff --git a/tests/client/test_http_client_local_upload.py b/tests/client/test_http_client_local_upload.py index 1bed80a8b..006f8086b 100644 --- a/tests/client/test_http_client_local_upload.py +++ b/tests/client/test_http_client_local_upload.py @@ -3,6 +3,8 @@ """Tests for client-side temp uploads when using localhost URLs.""" +import json + import pytest from openviking_cli.client.http import AsyncHTTPClient @@ -17,6 +19,36 @@ async def post(self, path, json=None, files=None): return object() +@pytest.fixture(autouse=True) +def isolated_ovcli_config(tmp_path, monkeypatch): + config_path = tmp_path / "ovcli.conf" + config_path.write_text(json.dumps({"url": "http://localhost:1933"})) + monkeypatch.setenv("OPENVIKING_CLI_CONFIG_FILE", str(config_path)) + + +@pytest.mark.asyncio +async def test_write_omits_removed_semantic_flags_from_http_payload(tmp_path, monkeypatch): + client = AsyncHTTPClient(url="http://localhost:1933") + fake_http = _FakeHTTPClient() + client._http = fake_http + client._handle_response_data = lambda _response: { + "result": {"uri": "viking://resources/demo.md"} + } + + await client.write("viking://resources/demo.md", "updated", wait=True) + + call = fake_http.calls[-1] + assert call["path"] == "/api/v1/content/write" + assert call["json"] == { + "uri": "viking://resources/demo.md", + "content": "updated", + "mode": "replace", + "wait": True, + "timeout": None, + "telemetry": False, + } + + @pytest.mark.asyncio async def test_add_skill_uploads_local_file_even_when_url_is_localhost(tmp_path): skill_file = tmp_path / "SKILL.md" diff --git a/tests/server/test_api_content_write.py b/tests/server/test_api_content_write.py new file mode 100644 index 
000000000..cea70741f --- /dev/null +++ b/tests/server/test_api_content_write.py @@ -0,0 +1,115 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: Apache-2.0 + +"""Tests for content write endpoint.""" + +import pytest + + +async def _first_file_uri(client, root_uri: str) -> str: + resp = await client.get( + "/api/v1/fs/ls", + params={"uri": root_uri, "simple": True, "recursive": True, "output": "original"}, + ) + assert resp.status_code == 200 + children = resp.json().get("result", []) + assert children + return children[0] + + +async def test_write_endpoint_registered(client): + resp = await client.get("/api/v1/content/write") + assert resp.status_code == 405 + + +async def test_write_rejects_directory_uri(client_with_resource): + client, uri = client_with_resource + resp = await client.post( + "/api/v1/content/write", + json={"uri": uri, "content": "new content"}, + ) + assert resp.status_code == 400 + body = resp.json() + assert body["status"] == "error" + assert body["error"]["code"] == "INVALID_ARGUMENT" + + +async def test_write_rejects_derived_file_uri(client_with_resource): + client, uri = client_with_resource + resp = await client.post( + "/api/v1/content/write", + json={"uri": f"{uri}/.overview.md", "content": "new content"}, + ) + assert resp.status_code == 400 + body = resp.json() + assert body["status"] == "error" + assert body["error"]["code"] == "INVALID_ARGUMENT" + + +async def test_write_replaces_existing_resource_file(client_with_resource): + client, uri = client_with_resource + file_uri = await _first_file_uri(client, uri) + + write_resp = await client.post( + "/api/v1/content/write", + json={ + "uri": file_uri, + "content": "# Updated\n\nFresh content.", + "mode": "replace", + "wait": True, + }, + ) + assert write_resp.status_code == 200 + body = write_resp.json() + assert body["status"] == "ok" + assert body["result"]["uri"] == file_uri + assert body["result"]["mode"] == "replace" + + read_resp = 
await client.get("/api/v1/content/read", params={"uri": file_uri}) + assert read_resp.status_code == 200 + assert read_resp.json()["result"] == "# Updated\n\nFresh content." + + +async def test_write_appends_existing_resource_file(client_with_resource): + client, uri = client_with_resource + file_uri = await _first_file_uri(client, uri) + original = (await client.get("/api/v1/content/read", params={"uri": file_uri})).json()["result"] + + write_resp = await client.post( + "/api/v1/content/write", + json={ + "uri": file_uri, + "content": "\n\nAppended section.", + "mode": "append", + "wait": True, + }, + ) + assert write_resp.status_code == 200 + + read_resp = await client.get("/api/v1/content/read", params={"uri": file_uri}) + assert read_resp.status_code == 200 + assert read_resp.json()["result"] == original + "\n\nAppended section." + + +@pytest.mark.asyncio +async def test_write_missing_uri_validation(client): + resp = await client.post("/api/v1/content/write", json={"content": "missing uri"}) + assert resp.status_code == 422 + + +@pytest.mark.asyncio +async def test_write_rejects_removed_semantic_flags(client_with_resource): + client, uri = client_with_resource + file_uri = await _first_file_uri(client, uri) + + resp = await client.post( + "/api/v1/content/write", + json={ + "uri": file_uri, + "content": "updated", + "regenerate_semantics": False, + "revectorize": False, + }, + ) + + assert resp.status_code == 422 diff --git a/tests/server/test_content_write_service.py b/tests/server/test_content_write_service.py new file mode 100644 index 000000000..6fb72c40a --- /dev/null +++ b/tests/server/test_content_write_service.py @@ -0,0 +1,264 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. 
+# SPDX-License-Identifier: Apache-2.0 + +"""Service-level tests for content write coordination.""" + +import pytest + +from openviking.server.identity import RequestContext, Role +from openviking.session.memory.utils.content import deserialize_full, serialize_with_metadata +from openviking.storage.content_write import ContentWriteCoordinator +from openviking_cli.exceptions import DeadlineExceededError, NotFoundError +from openviking_cli.session.user_id import UserIdentifier + + +@pytest.mark.asyncio +async def test_write_updates_memory_file_and_parent_overview(service): + ctx = RequestContext(user=service.user, role=Role.USER) + memory_dir = f"viking://user/{ctx.user.user_space_name()}/memories/preferences" + memory_uri = f"{memory_dir}/theme.md" + + await service.viking_fs.write_file(memory_uri, "Original preference", ctx=ctx) + + result = await service.fs.write( + memory_uri, + content="Updated preference", + ctx=ctx, + mode="replace", + wait=True, + ) + + assert result["context_type"] == "memory" + assert await service.viking_fs.read_file(memory_uri, ctx=ctx) == "Updated preference" + assert await service.viking_fs.read_file(f"{memory_dir}/.overview.md", ctx=ctx) + assert await service.viking_fs.read_file(f"{memory_dir}/.abstract.md", ctx=ctx) + + +@pytest.mark.asyncio +async def test_write_denies_foreign_user_memory_space(service): + owner_ctx = RequestContext(user=service.user, role=Role.USER) + memory_uri = ( + f"viking://user/{owner_ctx.user.user_space_name()}/memories/preferences/private-note.md" + ) + await service.viking_fs.write_file(memory_uri, "Owner note", ctx=owner_ctx) + + foreign_ctx = RequestContext( + user=UserIdentifier(owner_ctx.account_id, "other_user", owner_ctx.user.agent_id), + role=Role.USER, + ) + + with pytest.raises(NotFoundError): + await service.fs.write( + memory_uri, + content="Intruder update", + ctx=foreign_ctx, + ) + + +@pytest.mark.asyncio +async def test_memory_replace_preserves_metadata(service): + ctx = 
RequestContext(user=service.user, role=Role.USER) + memory_uri = f"viking://user/{ctx.user.user_space_name()}/memories/preferences/theme.md" + metadata = { + "tags": ["ui", "preference"], + "created_at": "2026-04-01T10:00:00", + "updated_at": "2026-04-01T10:05:00", + "fields": {"topic": "theme"}, + } + full_content = serialize_with_metadata("Original preference", metadata) + _, expected_metadata = deserialize_full(full_content) + await service.viking_fs.write_file(memory_uri, full_content, ctx=ctx) + + await service.fs.write( + memory_uri, + content="Updated preference", + ctx=ctx, + mode="replace", + ) + + stored = await service.viking_fs.read_file(memory_uri, ctx=ctx) + stored_content, stored_metadata = deserialize_full(stored) + + assert stored_content == "Updated preference" + assert stored_metadata == expected_metadata + + +@pytest.mark.asyncio +async def test_memory_write_vector_refresh_includes_generated_summary(monkeypatch): + file_uri = "viking://user/default/memories/preferences/theme.md" + root_uri = "viking://user/default/memories/preferences" + ctx = RequestContext(user=UserIdentifier.the_default_user(), role=Role.USER) + coordinator = ContentWriteCoordinator( + viking_fs=_FakeVikingFS(file_uri=file_uri, root_uri=root_uri) + ) + + captured = {} + + async def _fake_generate_single_file_summary(self, file_path, llm_sem=None, ctx=None): + del self, llm_sem, ctx + return {"name": "theme.md", "summary": f"summary for {file_path}"} + + async def _fake_vectorize_file( + *, + file_path, + summary_dict, + parent_uri, + context_type, + ctx, + semantic_msg_id=None, + use_summary=False, + ): + del ctx, semantic_msg_id, use_summary + captured["file_path"] = file_path + captured["summary_dict"] = summary_dict + captured["parent_uri"] = parent_uri + captured["context_type"] = context_type + + monkeypatch.setattr( + "openviking.storage.queuefs.semantic_processor.SemanticProcessor._generate_single_file_summary", + _fake_generate_single_file_summary, + ) + 
monkeypatch.setattr( + "openviking.storage.content_write.vectorize_file", + _fake_vectorize_file, + ) + + await coordinator._vectorize_single_file(file_uri, context_type="memory", ctx=ctx) + + assert captured["file_path"] == file_uri + assert captured["parent_uri"] == root_uri + assert captured["context_type"] == "memory" + assert captured["summary_dict"] == { + "name": "theme.md", + "summary": f"summary for {file_uri}", + } + + +class _FakeHandle: + def __init__(self, handle_id: str): + self.id = handle_id + + +class _FakeLockManager: + def __init__(self): + self.handle = _FakeHandle("lock-1") + self.release_calls = [] + + def create_handle(self): + return self.handle + + async def acquire_subtree(self, handle, path): + del handle, path + return True + + async def release(self, handle): + self.release_calls.append(handle.id) + + +class _FakeVikingFS: + def __init__(self, file_uri: str, root_uri: str): + self._file_uri = file_uri + self._root_uri = root_uri + self.delete_temp_calls = [] + + async def stat(self, uri: str, ctx=None): + del ctx + if uri == self._file_uri: + return {"isDir": False} + if uri == self._root_uri: + return {"isDir": True} + raise AssertionError(f"unexpected stat uri: {uri}") + + def _uri_to_path(self, uri: str, ctx=None): + del ctx + return f"/fake/{uri.replace('://', '/').strip('/')}" + + async def delete_temp(self, temp_uri: str, ctx=None): + del ctx + self.delete_temp_calls.append(temp_uri) + + +@pytest.mark.asyncio +async def test_write_timeout_after_enqueue_does_not_release_resource_lock(monkeypatch): + file_uri = "viking://resources/demo/doc.md" + root_uri = "viking://resources/demo" + ctx = RequestContext(user=UserIdentifier.the_default_user(), role=Role.USER) + viking_fs = _FakeVikingFS(file_uri=file_uri, root_uri=root_uri) + coordinator = ContentWriteCoordinator(viking_fs=viking_fs) + lock_manager = _FakeLockManager() + + monkeypatch.setattr( + "openviking.storage.content_write.get_lock_manager", + lambda: lock_manager, + ) + + 
async def _fake_prepare_temp_write(**kwargs): + del kwargs + return "viking://temp/demo", "viking://temp/demo/doc.md" + + async def _fake_enqueue_semantic_refresh(**kwargs): + del kwargs + return None + + async def _fake_wait_for_queues(*, timeout): + raise DeadlineExceededError("queue processing", timeout) + + monkeypatch.setattr(coordinator, "_prepare_temp_write", _fake_prepare_temp_write) + monkeypatch.setattr(coordinator, "_enqueue_semantic_refresh", _fake_enqueue_semantic_refresh) + monkeypatch.setattr(coordinator, "_wait_for_queues", _fake_wait_for_queues) + + with pytest.raises(DeadlineExceededError): + await coordinator.write( + uri=file_uri, + content="updated", + ctx=ctx, + wait=True, + ) + + assert lock_manager.release_calls == [] + assert viking_fs.delete_temp_calls == [] + + +@pytest.mark.asyncio +async def test_memory_write_timeout_after_enqueue_does_not_release_lock(monkeypatch): + file_uri = "viking://user/default/memories/preferences/theme.md" + root_uri = "viking://user/default/memories/preferences" + ctx = RequestContext(user=UserIdentifier.the_default_user(), role=Role.USER) + viking_fs = _FakeVikingFS(file_uri=file_uri, root_uri=root_uri) + coordinator = ContentWriteCoordinator(viking_fs=viking_fs) + lock_manager = _FakeLockManager() + + monkeypatch.setattr( + "openviking.storage.content_write.get_lock_manager", + lambda: lock_manager, + ) + + async def _fake_write_in_place(uri, content, *, mode, ctx): + del uri, content, mode, ctx + return None + + async def _fake_vectorize_single_file(uri, *, context_type, ctx): + del uri, context_type, ctx + return None + + async def _fake_enqueue_memory_refresh(**kwargs): + del kwargs + return None + + async def _fake_wait_for_queues(*, timeout): + raise DeadlineExceededError("queue processing", timeout) + + monkeypatch.setattr(coordinator, "_write_in_place", _fake_write_in_place) + monkeypatch.setattr(coordinator, "_vectorize_single_file", _fake_vectorize_single_file) + monkeypatch.setattr(coordinator, 
"_enqueue_memory_refresh", _fake_enqueue_memory_refresh) + monkeypatch.setattr(coordinator, "_wait_for_queues", _fake_wait_for_queues) + + with pytest.raises(DeadlineExceededError): + await coordinator.write( + uri=file_uri, + content="updated", + ctx=ctx, + wait=True, + ) + + assert lock_manager.release_calls == [] diff --git a/tests/storage/test_collection_schemas.py b/tests/storage/test_collection_schemas.py index c10958213..59618d22c 100644 --- a/tests/storage/test_collection_schemas.py +++ b/tests/storage/test_collection_schemas.py @@ -1,6 +1,7 @@ # Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. # SPDX-License-Identifier: AGPL-3.0 +import asyncio import inspect import json from types import SimpleNamespace @@ -144,6 +145,60 @@ async def upsert(self, data, *, ctx): assert captured["data"]["parent_uri"] == "viking://resources" +@pytest.mark.asyncio +async def test_embedding_handler_marks_success_only_after_tracker_completion(monkeypatch): + class _CapturingVikingDB: + is_closing = False + mode = "local" + + async def upsert(self, _data, *, ctx): + return "rec-1" + + embedder = _DummyEmbedder() + monkeypatch.setattr( + "openviking_cli.utils.config.get_openviking_config", + lambda: _DummyConfig(embedder), + ) + + decrement_started = asyncio.Event() + allow_decrement_finish = asyncio.Event() + + class _FakeTracker: + async def decrement(self, _semantic_msg_id): + decrement_started.set() + await allow_decrement_finish.wait() + return 0 + + monkeypatch.setattr( + "openviking.storage.queuefs.embedding_tracker.EmbeddingTaskTracker.get_instance", + lambda: _FakeTracker(), + ) + + handler = TextEmbeddingHandler(_CapturingVikingDB()) + status = {"success": 0, "error": 0} + handler.set_callbacks( + on_success=lambda: status.__setitem__("success", status["success"] + 1), + on_error=lambda *_: status.__setitem__("error", status["error"] + 1), + ) + + payload = _build_queue_payload() + queue_data = json.loads(payload["data"]) + queue_data["semantic_msg_id"] = 
"semantic-1" + payload["data"] = json.dumps(queue_data) + + task = asyncio.create_task(handler.on_dequeue(payload)) + await decrement_started.wait() + + assert status["success"] == 0 + assert status["error"] == 0 + + allow_decrement_finish.set() + await task + + assert status["success"] == 1 + assert status["error"] == 0 + + def test_context_collection_excludes_parent_uri(): schema = CollectionSchemas.context_collection("ctx", 8) diff --git a/tests/storage/test_semantic_processor_lifecycle_lock.py b/tests/storage/test_semantic_processor_lifecycle_lock.py new file mode 100644 index 000000000..9fd43e9ae --- /dev/null +++ b/tests/storage/test_semantic_processor_lifecycle_lock.py @@ -0,0 +1,88 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: AGPL-3.0 + +from __future__ import annotations + +import pytest + +from openviking.storage.queuefs.semantic_dag import DagStats +from openviking.storage.queuefs.semantic_msg import SemanticMsg +from openviking.storage.queuefs.semantic_processor import SemanticProcessor + + +class _FakeHandle: + def __init__(self, handle_id: str): + self.id = handle_id + + +class _FakeLockManager: + def __init__(self): + self._handles = {"lock-1": _FakeHandle("lock-1")} + self.release_calls = [] + + def get_handle(self, handle_id: str): + return self._handles.get(handle_id) + + async def release(self, handle): + self.release_calls.append(handle.id) + self._handles.pop(handle.id, None) + + def create_handle(self): + handle = _FakeHandle("new-lock") + self._handles[handle.id] = handle + return handle + + async def acquire_subtree(self, handle, lock_path): + del handle, lock_path + return True + + +class _FakeVikingFS: + async def exists(self, uri, ctx=None): + del uri, ctx + return False + + def _uri_to_path(self, uri, ctx=None): + del ctx + return f"/fake/{uri.replace('://', '/').strip('/')}" + + +@pytest.mark.asyncio +async def test_semantic_processor_does_not_release_lock_owned_by_dag(monkeypatch): + 
processor = SemanticProcessor() + lock_manager = _FakeLockManager() + + class _FakeDagExecutor: + def __init__(self, **kwargs): + self.lifecycle_lock_handle_id = kwargs.get("lifecycle_lock_handle_id", "") + + async def run(self, root_uri): + assert root_uri == "viking://resources/demo" + assert self.lifecycle_lock_handle_id == "lock-1" + + def get_stats(self): + return DagStats() + + monkeypatch.setattr( + "openviking.storage.queuefs.semantic_processor.get_viking_fs", + lambda: _FakeVikingFS(), + ) + monkeypatch.setattr( + "openviking.storage.queuefs.semantic_processor.SemanticDagExecutor", + lambda **kwargs: _FakeDagExecutor(**kwargs), + ) + monkeypatch.setattr( + "openviking.storage.transaction.get_lock_manager", + lambda: lock_manager, + ) + + await processor.on_dequeue( + SemanticMsg( + uri="viking://resources/demo", + context_type="resource", + recursive=False, + lifecycle_lock_handle_id="lock-1", + ).to_dict() + ) + + assert lock_manager.release_calls == [] diff --git a/tests/transaction/test_lock_context.py b/tests/transaction/test_lock_context.py index 67aa2e922..65bc1f06c 100644 --- a/tests/transaction/test_lock_context.py +++ b/tests/transaction/test_lock_context.py @@ -83,3 +83,27 @@ async def test_handle_cleaned_up_on_failure(self, lm): pass assert len(lm.get_active_handles()) == 0 + + +class TestLockContextExternalHandle: + async def test_external_handle_reuses_existing_subtree_lock(self, agfs_client, lm, test_dir): + lock_path = f"{test_dir}/{LOCK_FILE_NAME}" + + async with LockContext(lm, [test_dir], lock_mode="subtree") as handle: + before = agfs_client.cat(lock_path) + before_token = before.decode("utf-8") if isinstance(before, bytes) else before + assert ":S" in before_token + + async with LockContext(lm, [test_dir], lock_mode="point", handle=handle): + current = agfs_client.cat(lock_path) + current_token = current.decode("utf-8") if isinstance(current, bytes) else current + assert current_token == before_token + assert ":S" in current_token + + 
still_owned = agfs_client.cat(lock_path) + still_owned_token = ( + still_owned.decode("utf-8") if isinstance(still_owned, bytes) else still_owned + ) + assert still_owned_token == before_token + + assert _lock_file_gone(agfs_client, lock_path) diff --git a/tests/transaction/test_lock_reentrancy_unit.py b/tests/transaction/test_lock_reentrancy_unit.py new file mode 100644 index 000000000..44687e7bc --- /dev/null +++ b/tests/transaction/test_lock_reentrancy_unit.py @@ -0,0 +1,141 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: AGPL-3.0 + +from __future__ import annotations + +import os + +import pytest + +from openviking.storage.transaction.lock_context import LockContext +from openviking.storage.transaction.lock_manager import LockManager +from openviking.storage.transaction.path_lock import LOCK_FILE_NAME, PathLock + + +class _FakeAGFS: + def __init__(self): + self._dirs = {"/"} + self._files = {} + + def stat(self, path: str): + if path in self._dirs: + return {"isDir": True} + if path in self._files: + return {"isDir": False} + raise FileNotFoundError(path) + + def mkdir(self, path: str): + current = "" + for part in path.split("/"): + if not part: + current = "/" + continue + current = os.path.join(current, part) if current != "/" else f"/{part}" + self._dirs.add(current) + + def write(self, path: str, data: bytes): + parent = os.path.dirname(path) or "/" + self.mkdir(parent) + self._files[path] = data + + def read(self, path: str): + if path not in self._files: + raise FileNotFoundError(path) + return self._files[path] + + def rm(self, path: str, recursive: bool = False): + if path in self._files: + self._files.pop(path, None) + return + if path not in self._dirs: + raise FileNotFoundError(path) + prefix = path.rstrip("/") + "/" + has_children = any( + child.startswith(prefix) + for child in self._dirs | set(self._files.keys()) + if child != path + ) + if has_children and not recursive: + raise OSError("directory 
not empty") + for child in list(self._files.keys()): + if child.startswith(prefix): + self._files.pop(child, None) + for child in sorted(self._dirs, reverse=True): + if child == path or child.startswith(prefix): + self._dirs.discard(child) + + def ls(self, path: str): + if path not in self._dirs: + raise FileNotFoundError(path) + prefix = path.rstrip("/") + "/" + children = {} + for child in self._dirs: + if not child.startswith(prefix) or child == path: + continue + rest = child[len(prefix) :] + if "/" in rest or not rest: + continue + children[rest] = {"name": rest, "isDir": True} + for child in self._files: + if not child.startswith(prefix): + continue + rest = child[len(prefix) :] + if "/" in rest or not rest: + continue + children[rest] = {"name": rest, "isDir": False} + return list(children.values()) + + +@pytest.mark.asyncio +async def test_path_lock_reuses_same_owner_subtree_without_overwriting_token(): + agfs = _FakeAGFS() + agfs.mkdir("/root") + lock = PathLock(agfs) + handle = LockManager(agfs).create_handle() + + assert await lock.acquire_subtree("/root", handle, timeout=0.1) is True + lock_path = f"/root/{LOCK_FILE_NAME}" + before = agfs.read(lock_path).decode("utf-8") + assert before.endswith(":S") + + assert await lock.acquire_point("/root", handle, timeout=0.1) is True + after = agfs.read(lock_path).decode("utf-8") + assert after == before + assert after.endswith(":S") + + +@pytest.mark.asyncio +async def test_path_lock_reuses_ancestor_subtree_without_creating_child_lock(): + agfs = _FakeAGFS() + agfs.mkdir("/root") + agfs.mkdir("/root/child") + lock = PathLock(agfs) + handle = LockManager(agfs).create_handle() + + assert await lock.acquire_subtree("/root", handle, timeout=0.1) is True + assert await lock.acquire_point("/root/child", handle, timeout=0.1) is True + + with pytest.raises(FileNotFoundError): + agfs.read(f"/root/child/{LOCK_FILE_NAME}") + + +@pytest.mark.asyncio +async def 
test_lock_context_with_external_handle_keeps_outer_subtree_lock(): + agfs = _FakeAGFS() + agfs.mkdir("/root") + lm = LockManager(agfs=agfs, lock_timeout=0.1, lock_expire=60.0) + lock_path = f"/root/{LOCK_FILE_NAME}" + + async with LockContext(lm, ["/root"], lock_mode="subtree") as handle: + before = agfs.read(lock_path).decode("utf-8") + assert before.endswith(":S") + + async with LockContext(lm, ["/root"], lock_mode="point", handle=handle): + current = agfs.read(lock_path).decode("utf-8") + assert current == before + + still_owned = agfs.read(lock_path).decode("utf-8") + assert still_owned == before + + with pytest.raises(FileNotFoundError): + agfs.read(lock_path) diff --git a/tests/transaction/test_path_lock.py b/tests/transaction/test_path_lock.py index d1840910c..fb3e59709 100644 --- a/tests/transaction/test_path_lock.py +++ b/tests/transaction/test_path_lock.py @@ -334,3 +334,53 @@ async def test_subtree_same_path_mutual_exclusion(self, agfs_client, test_dir): ok2_retry = await lock.acquire_subtree(target, tx2, timeout=3.0) assert ok2_retry is True await lock.release(tx2) + + async def test_point_reuses_same_owner_subtree_lock_on_same_path(self, agfs_client, test_dir): + lock = PathLock(agfs_client) + tx = LockHandle(id="tx-reentrant-same-path") + + ok = await lock.acquire_subtree(test_dir, tx, timeout=3.0) + assert ok is True + + lock_path = f"{test_dir}/{LOCK_FILE_NAME}" + before = agfs_client.cat(lock_path) + before_token = before.decode("utf-8") if isinstance(before, bytes) else before + assert ":S" in before_token + + ok_reuse = await lock.acquire_point(test_dir, tx, timeout=0.5) + assert ok_reuse is True + + after = agfs_client.cat(lock_path) + after_token = after.decode("utf-8") if isinstance(after, bytes) else after + assert after_token == before_token + assert ":S" in after_token + + await lock.release(tx) + + async def test_point_under_same_owner_subtree_does_not_create_child_lock( + self, agfs_client, test_dir + ): + import uuid as _uuid + + child = 
f"{test_dir}/child-reentrant-{_uuid.uuid4().hex}" + agfs_client.mkdir(child) + + lock = PathLock(agfs_client) + tx = LockHandle(id="tx-reentrant-child") + + ok = await lock.acquire_subtree(test_dir, tx, timeout=3.0) + assert ok is True + + ok_child = await lock.acquire_point(child, tx, timeout=0.5) + assert ok_child is True + + child_lock_path = f"{child}/{LOCK_FILE_NAME}" + try: + agfs_client.stat(child_lock_path) + raise AssertionError("child lock should not be created when ancestor subtree is owned") + except AssertionError: + raise + except Exception: + pass + + await lock.release(tx)