Note: generic arguments that were stripped from the pasted patch (`Result<serde_json::Value>`,
`Result<Vec<u8>>`) are restored by inference - confirm against the target tree. Hunk line counts
are updated for the added lines; the `index` blob hashes are carried over from the original patch.

diff --git a/crates/ov_cli/src/client.rs b/crates/ov_cli/src/client.rs
index deae228f8..78d711581 100644
--- a/crates/ov_cli/src/client.rs
+++ b/crates/ov_cli/src/client.rs
@@ -328,6 +328,24 @@ impl HttpClient {
         self.post("/api/v1/content/reindex", &body).await
     }
 
+    /// Ask the server to shorten over-long memory abstracts under `uri`.
+    ///
+    /// Posts `uri`, `max_abstract_length` and `dry_run` as a JSON body to
+    /// `/api/v1/content/compress` and returns whatever `post` yields.
+    pub async fn compress(
+        &self,
+        uri: &str,
+        max_abstract_length: u32,
+        dry_run: bool,
+    ) -> Result<serde_json::Value> {
+        let body = serde_json::json!({
+            "uri": uri,
+            "max_abstract_length": max_abstract_length,
+            "dry_run": dry_run,
+        });
+        self.post("/api/v1/content/compress", &body).await
+    }
+
     /// Download file as raw bytes
     pub async fn get_bytes(&self, uri: &str) -> Result<Vec<u8>> {
         let url = format!("{}/api/v1/content/download", self.base_url);
diff --git a/crates/ov_cli/src/commands/content.rs b/crates/ov_cli/src/commands/content.rs
index 4803bb180..388a37fe8 100644
--- a/crates/ov_cli/src/commands/content.rs
+++ b/crates/ov_cli/src/commands/content.rs
@@ -51,6 +51,23 @@ pub async fn reindex(
     Ok(())
 }
 
+/// Run the compress request and print the server's result.
+///
+/// The response is rendered with `output_format` / `compact`; request
+/// errors propagate to the caller via `?`.
+pub async fn compress(
+    client: &HttpClient,
+    uri: &str,
+    max_abstract_length: u32,
+    dry_run: bool,
+    output_format: OutputFormat,
+    compact: bool,
+) -> Result<()> {
+    let result = client.compress(uri, max_abstract_length, dry_run).await?;
+    crate::output::output_success(result, output_format, compact);
+    Ok(())
+}
+
 pub async fn get(client: &HttpClient, uri: &str, local_path: &str) -> Result<()> {
     // Check if target path already exists
     let path = Path::new(local_path);
diff --git a/crates/ov_cli/src/main.rs b/crates/ov_cli/src/main.rs
index a8266b2ab..c34172085 100644
--- a/crates/ov_cli/src/main.rs
+++ b/crates/ov_cli/src/main.rs
@@ -332,6 +332,17 @@ enum Commands {
         #[arg(long, default_value = "true")]
         wait: bool,
     },
+    /// Compress verbose memory abstracts in a directory
+    Compress {
+        /// Viking URI (directory to scan)
+        uri: String,
+        /// Maximum abstract length in characters
+        #[arg(long, default_value = "128")]
+        max_abstract_length: u32,
+        /// Preview changes without modifying anything
+        #[arg(long)]
+        dry_run: bool,
+    },
     /// Download file to local path (supports binaries/images)
     Get {
         /// Viking URI
@@ -750,6 +761,11 @@ async fn main() {
             regenerate,
             wait,
         } => handle_reindex(uri, regenerate, wait, ctx).await,
+        Commands::Compress {
+            uri,
+            max_abstract_length,
+            dry_run,
+        } => handle_compress(uri, max_abstract_length, dry_run, ctx).await,
         Commands::Get { uri, local_path } => handle_get(uri, local_path, ctx).await,
         Commands::Find {
             query,
@@ -1193,6 +1209,26 @@ async fn handle_reindex(uri: String, regenerate: bool, wait: bool, ctx: CliConte
     .await
 }
 
+/// CLI handler for `Commands::Compress`: builds a client from `ctx` and
+/// delegates to `commands::content::compress`.
+async fn handle_compress(
+    uri: String,
+    max_abstract_length: u32,
+    dry_run: bool,
+    ctx: CliContext,
+) -> Result<()> {
+    let client = ctx.get_client();
+    commands::content::compress(
+        &client,
+        &uri,
+        max_abstract_length,
+        dry_run,
+        ctx.output_format,
+        ctx.compact,
+    )
+    .await
+}
+
 async fn handle_get(uri: String, local_path: String, ctx: CliContext) -> Result<()> {
     let client = ctx.get_client();
     commands::content::get(&client, &uri, &local_path).await
diff --git a/openviking/server/routers/content.py b/openviking/server/routers/content.py
index d95fdeea0..9373f6e54 100644
--- a/openviking/server/routers/content.py
+++ b/openviking/server/routers/content.py
@@ -28,6 +28,14 @@ class ReindexRequest(BaseModel):
     wait: bool = True
 
 
+class CompressRequest(BaseModel):
+    """Request to compress memory abstracts in a directory."""
+
+    uri: str
+    max_abstract_length: int = 128
+    dry_run: bool = False
+
+
 router = APIRouter(prefix="/api/v1/content", tags=["content"])
 
 
@@ -172,6 +180,33 @@ async def reindex(
     )
 
 
+@router.post("/compress")
+async def compress(
+    request: CompressRequest = Body(...),
+    _ctx: RequestContext = Depends(get_request_context),
+):
+    """Compress verbose memory abstracts in a directory.
+
+    Scans the target directory for memory files with abstracts exceeding
+    max_abstract_length and truncates them. Use dry_run=True to preview
+    what would be compressed without modifying anything.
+    """
+    from openviking.utils.compress_service import CompressService
+
+    service = CompressService(max_abstract_length=request.max_abstract_length)
+    result = await service.compress_directory(
+        uri=request.uri,
+        ctx=_ctx,
+        dry_run=request.dry_run,
+    )
+    if result.get("status") == "error":
+        return Response(
+            status="error",
+            error=ErrorInfo(code="COMPRESS_FAILED", message=result.get("message", "")),
+        )
+    return Response(status="ok", result=result)
+
+
 async def _do_reindex(
     service,
     uri: str,
diff --git a/openviking/utils/compress_service.py b/openviking/utils/compress_service.py
new file mode 100644
index 000000000..2c4d73481
--- /dev/null
+++ b/openviking/utils/compress_service.py
@@ -0,0 +1,117 @@
+# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
+# SPDX-License-Identifier: AGPL-3.0
+"""Compress service for batch memory abstract reduction."""
+
+from typing import Any, Dict, List
+
+from openviking.server.identity import RequestContext
+from openviking.storage.viking_fs import get_viking_fs
+from openviking_cli.utils.logger import get_logger
+
+logger = get_logger(__name__)
+
+# Marker appended to a shortened abstract; it counts towards the length limit.
+_ELLIPSIS = "..."
+
+
+class CompressService:
+    """Scan a directory and truncate memory abstracts that exceed a target length."""
+
+    def __init__(self, max_abstract_length: int = 128):
+        self.max_abstract_length = max_abstract_length
+
+    def _truncate_abstract(self, abstract: str) -> str:
+        """Shorten ``abstract`` to at most ``max_abstract_length`` characters.
+
+        The ellipsis is part of the budget, so an abstract that was compressed
+        once is not reported as verbose again. The cut moves back to the
+        previous space when it would otherwise split a word.
+        """
+        limit = max(self.max_abstract_length, 0)
+        if len(abstract) <= limit:
+            return abstract
+        budget = limit - len(_ELLIPSIS)
+        if budget <= 0:
+            # No room for the marker: fall back to a hard cut.
+            return abstract[:limit]
+        head = abstract[:budget]
+        boundary = head.rfind(" ")
+        # abstract[budget] exists because len(abstract) > limit > budget.
+        if boundary > 0 and not abstract[budget].isspace():
+            head = head[:boundary]
+        return head.rstrip() + _ELLIPSIS
+
+    async def compress_directory(
+        self,
+        uri: str,
+        ctx: RequestContext,
+        dry_run: bool = False,
+    ) -> Dict[str, Any]:
+        """Scan a directory for memories with verbose abstracts and truncate them.
+
+        Only entries whose URI ends with ``.md`` are considered. With
+        ``dry_run=True`` nothing is written and the counters describe what a
+        real run would change.
+
+        Returns a dict with ``status`` and, on success, ``files_scanned``,
+        ``files_compressed``, ``chars_saved``, ``dry_run`` and ``verbose_files``
+        (at most the first 20 offenders). On failure it holds
+        ``status="error"`` and a ``message``.
+        """
+        if self.max_abstract_length < 1:
+            return {"status": "error", "message": "max_abstract_length must be at least 1"}
+
+        viking_fs = get_viking_fs()
+        if not viking_fs:
+            return {"status": "error", "message": "VikingFS not available"}
+
+        try:
+            entries = await viking_fs.list_directory(uri, ctx=ctx)
+        except Exception as e:
+            logger.error("Failed to list directory %s: %s", uri, e)
+            return {"status": "error", "message": str(e)}
+
+        files_scanned = 0
+        files_compressed = 0
+        chars_saved = 0
+        verbose_files: List[Dict[str, Any]] = []
+
+        for entry in entries:
+            # A missing or null field is treated like an empty string.
+            entry_uri = entry.get("uri") or ""
+            if not entry_uri.endswith(".md"):
+                continue
+            files_scanned += 1
+
+            abstract = entry.get("abstract") or ""
+            if len(abstract) <= self.max_abstract_length:
+                continue
+
+            verbose_files.append(
+                {
+                    "uri": entry_uri,
+                    "current_length": len(abstract),
+                    "excess": len(abstract) - self.max_abstract_length,
+                }
+            )
+
+            truncated = self._truncate_abstract(abstract)
+            if not dry_run:
+                try:
+                    await viking_fs.write_metadata(entry_uri, {"abstract": truncated}, ctx=ctx)
+                except Exception as e:
+                    # Best effort: one failed write must not abort the batch.
+                    logger.warning("Failed to compress %s: %s", entry_uri, e)
+                    continue
+            files_compressed += 1
+            # Count what is really removed, which is at least the excess.
+            chars_saved += len(abstract) - len(truncated)
+
+        return {
+            "status": "ok",
+            "files_scanned": files_scanned,
+            "files_compressed": files_compressed,
+            "chars_saved": chars_saved,
+            "dry_run": dry_run,
+            "verbose_files": verbose_files[:20],
+        }
diff --git a/tests/unit/utils/test_compress_service.py b/tests/unit/utils/test_compress_service.py
new file mode 100644
index 000000000..7cb26ce6c
--- /dev/null
+++ b/tests/unit/utils/test_compress_service.py
@@ -0,0 +1,68 @@
+# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
+# SPDX-License-Identifier: AGPL-3.0
+"""Tests for CompressService."""
+
+import pytest
+
+from openviking.utils.compress_service import CompressService
+
+
+class _FakeVikingFS:
+    """In-memory stand-in exposing the two calls CompressService makes."""
+
+    def __init__(self, entries):
+        self.entries = entries
+        self.written = {}
+
+    async def list_directory(self, uri, ctx=None):
+        return self.entries
+
+    async def write_metadata(self, uri, metadata, ctx=None):
+        self.written[uri] = metadata
+
+
+def test_compress_service_init_defaults():
+    service = CompressService()
+    assert service.max_abstract_length == 128
+
+
+def test_compress_service_init_custom():
+    service = CompressService(max_abstract_length=256)
+    assert service.max_abstract_length == 256
+
+
+def test_truncate_abstract_never_exceeds_limit():
+    """The ellipsis counts towards the limit, with or without a space to cut at."""
+    service = CompressService(max_abstract_length=16)
+    assert service._truncate_abstract("x" * 40) == "x" * 13 + "..."
+    assert service._truncate_abstract("alpha beta gamma delta") == "alpha beta..."
+    assert service._truncate_abstract("short") == "short"
+
+
+@pytest.mark.asyncio
+async def test_compress_directory_no_viking_fs(monkeypatch):
+    """Returns error when VikingFS is not available."""
+    monkeypatch.setattr("openviking.utils.compress_service.get_viking_fs", lambda: None)
+    service = CompressService()
+    result = await service.compress_directory("viking://user/memories/", ctx=None)
+    assert result["status"] == "error"
+    assert "VikingFS" in result["message"]
+
+
+@pytest.mark.asyncio
+async def test_compress_directory_truncates_and_tolerates_null_abstract(monkeypatch):
+    """A real run writes the shortened abstract; a null abstract is just skipped."""
+    fake = _FakeVikingFS(
+        [
+            {"uri": "viking://m/a.md", "abstract": "x" * 40},
+            {"uri": "viking://m/b.md", "abstract": None},
+        ]
+    )
+    monkeypatch.setattr("openviking.utils.compress_service.get_viking_fs", lambda: fake)
+    service = CompressService(max_abstract_length=16)
+    result = await service.compress_directory("viking://m/", ctx=None)
+    assert result["status"] == "ok"
+    assert result["files_scanned"] == 2
+    assert result["files_compressed"] == 1
+    assert result["chars_saved"] == 24
+    assert fake.written == {"viking://m/a.md": {"abstract": "x" * 13 + "..."}}