diff --git a/.env.example b/.env.example index 6fa2d5d5..5c5e790f 100644 --- a/.env.example +++ b/.env.example @@ -14,7 +14,7 @@ TIMEZONE=Asia/Shanghai # 1. Database Configuration (Required) # ============================================================================= # Choose your database provider: sqlite, oceanbase, postgres -DATABASE_PROVIDER=sqlite +DATABASE_PROVIDER=oceanbase # ----------------------------------------------------------------------------- # SQLite Configuration (Default - Recommended for development) @@ -27,7 +27,9 @@ SQLITE_COLLECTION=memories # ----------------------------------------------------------------------------- # OceanBase Configuration # ----------------------------------------------------------------------------- -OCEANBASE_HOST=127.0.0.1 +# Connection mode: set OCEANBASE_HOST for remote, leave empty for embedded SeekDB +OCEANBASE_HOST= +OCEANBASE_PATH=./seekdb_data OCEANBASE_PORT=2881 OCEANBASE_USER=root@sys OCEANBASE_PASSWORD=your_password @@ -35,7 +37,7 @@ OCEANBASE_DATABASE=powermem OCEANBASE_COLLECTION=memories ## Keep the default settings, as modifications are generally not needed. -OCEANBASE_INDEX_TYPE=IVF_FLAT +OCEANBASE_INDEX_TYPE=HNSW OCEANBASE_VECTOR_METRIC_TYPE=cosine OCEANBASE_TEXT_FIELD=document OCEANBASE_VECTOR_FIELD=embedding diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 2ccf7e25..d5504cc2 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -12,10 +12,9 @@ on: python_version: description: 'Python version to build with' required: false - default: '3.10' + default: '3.11' type: choice options: - - '3.10' - '3.11' - '3.12' @@ -53,7 +52,7 @@ jobs: needs: build-dashboard strategy: matrix: - python-version: ["3.10", "3.11", "3.12"] + python-version: ["3.11", "3.12"] fail-fast: false steps: diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index ef5f4934..6f461ea5 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -36,7 +36,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v4 with: - python-version: "3.10" + python-version: "3.11" - name: Inject Frontend and Build Package run: | diff --git a/.github/workflows/regression.yml b/.github/workflows/regression.yml index c7a83387..e5aab811 100644 --- a/.github/workflows/regression.yml +++ b/.github/workflows/regression.yml @@ -36,7 +36,7 @@ jobs: if: github.event_name == 'push' || github.event_name == 'workflow_dispatch' || github.event_name == 'schedule' || (github.event_name == 'pull_request_target' && github.event.pull_request.head.repo.full_name == github.repository) strategy: matrix: - python-version: ["3.10"] + python-version: ["3.11"] fail-fast: false steps: @@ -174,7 +174,7 @@ jobs: if: github.event_name == 'push' || github.event_name == 'workflow_dispatch' || github.event_name == 'schedule' || (github.event_name == 'pull_request_target' && github.event.pull_request.head.repo.full_name == github.repository) strategy: matrix: - python-version: ["3.10"] + python-version: ["3.11"] fail-fast: false steps: diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index bdc559a2..559e406a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -26,7 +26,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: ["3.10", "3.11", "3.12"] + python-version: ["3.11", "3.12"] fail-fast: false steps: diff --git a/README.md b/README.md index 124281a4..21263430 100644 --- a/README.md +++ b/README.md @@ -27,8 +27,8 @@ One command to add PowerMem memory to OpenClaw: `openclaw plugins install memory license - - pyversions + + pyversions Ask DeepWiki diff --git a/README_CN.md b/README_CN.md index d733b3e9..035209b9 100644 --- a/README_CN.md +++ b/README_CN.md @@ -27,8 +27,8 @@ license - - pyversions + + pyversions Ask DeepWiki diff --git a/README_JP.md b/README_JP.md index 21e0cb1f..c7eeca00 100644 --- a/README_JP.md +++ b/README_JP.md @@ -27,8 +27,8 @@ license - - pyversions + + pyversions Ask DeepWiki diff --git a/benchmark/server/main.py b/benchmark/server/main.py index 37615d6f..77f84101 100644 --- a/benchmark/server/main.py +++ b/benchmark/server/main.py @@ -153,7 +153,7 @@ def create(self, *args: Any, **kwargs: Any) -> Any: app = FastAPI( title="PowerMem Benchmark REST APIs", description="A REST API for managing and searching memories for benchmark testing scenarios.", - version="1.0.0", + version="1.1.0", docs_url="/docs", redoc_url="/redoc", ) diff --git a/docs/api/0002-async_memory.md b/docs/api/0002-async_memory.md index 434a9b08..1e4bbc67 100644 --- a/docs/api/0002-async_memory.md +++ b/docs/api/0002-async_memory.md @@ -263,6 +263,24 @@ async def batch_process(): asyncio.run(batch_process()) ``` +### Limitation: Embedded SeekDB Does Not Support Async + +Embedded SeekDB (local file mode with no `host` configured) uses a single-threaded C++ engine that **does not support concurrent multi-threaded access**. `AsyncMemory` internally submits synchronous operations to a `ThreadPoolExecutor`, which causes multiple threads to read and write the same embedded SeekDB instance simultaneously. This leads to C++-level crashes such as `pure virtual method called` or `Segmentation fault`. + +**`AsyncMemory` cannot be used with embedded SeekDB.** Use the synchronous `Memory` class instead. + +```python +# ❌ Not supported with embedded SeekDB +from powermem import AsyncMemory +async_memory = AsyncMemory(config=embedded_seekdb_config) # crashes + +# ✓ Use the synchronous interface with embedded SeekDB +from powermem import Memory +memory = Memory(config=embedded_seekdb_config) +``` + +Remote OceanBase (with `host` configured) is not affected by this limitation and fully supports `AsyncMemory`. + ### When to Use AsyncMemory Use `AsyncMemory` when: @@ -270,9 +288,11 @@ Use `AsyncMemory` when: - Building async web applications (FastAPI, aiohttp) - Implementing batch processing pipelines - Need non-blocking memory operations +- Using **remote OceanBase** (with `host` configured) Use `Memory` when: - Simple synchronous scripts - Interactive notebooks - Simple use cases without concurrency needs +- Using **embedded SeekDB** (local file mode, no `host`) diff --git a/docs/benchmark/overview.md b/docs/benchmark/overview.md index 09470bd0..ea17d193 100644 --- a/docs/benchmark/overview.md +++ b/docs/benchmark/overview.md @@ -54,7 +54,7 @@ bash run.sh results ### Prerequisites -- Python 3.10 or higher +- Python 3.11 or higher - pip or poetry for dependency management - LLM and embedding API keys (OpenAI, Qwen, etc. — see root `.env.example`) - Database: OceanBase, PostgreSQL, or SQLite (depending on your configuration) @@ -398,7 +398,7 @@ The benchmark evaluates performance using multiple metrics: - **Solution**: - Install all dependencies: `pip install -r benchmark/locomo/requirements.txt` - Ensure you're running from the correct directory - - Check Python version (requires 3.10+) + - Check Python version (requires 3.11+) #### Slow performance - **Solution**: diff --git a/docs/development/overview.md b/docs/development/overview.md index 8fdfc555..78b67b18 100644 --- a/docs/development/overview.md +++ b/docs/development/overview.md @@ -25,7 +25,7 @@ This guide provides comprehensive instructions for developers who want to contri ### Prerequisites -- Python 3.10 or higher +- Python 3.11 or higher - pip (Python package manager) - Git diff --git a/docs/examples/overview.md b/docs/examples/overview.md index d597c4bb..44120eef 100644 --- a/docs/examples/overview.md +++ b/docs/examples/overview.md @@ -67,7 +67,7 @@ Each notebook includes: ## Requirements -- Python 3.10+ +- Python 3.11+ - powermem installed (`pip install powermem`) - Jupyter Notebook (for interactive notebooks) - LLM provider configured (for intelligent features in Scenario 2+) diff --git a/docs/examples/scenario_10_sparse_vector.ipynb b/docs/examples/scenario_10_sparse_vector.ipynb index b53c22af..e8f74aec 100644 --- a/docs/examples/scenario_10_sparse_vector.ipynb +++ b/docs/examples/scenario_10_sparse_vector.ipynb @@ -10,7 +10,7 @@ "\n", "## Prerequisites\n", "\n", - "- Python 3.10+\n", + "- Python 3.11+\n", "- powermem installed (`pip install powermem`)\n", "- Database: **seekdb** or **OceanBase >= 4.5.0**\n" ] diff --git a/docs/examples/scenario_10_sparse_vector.md b/docs/examples/scenario_10_sparse_vector.md index a9e7692c..90341f9f 100644 --- a/docs/examples/scenario_10_sparse_vector.md +++ b/docs/examples/scenario_10_sparse_vector.md @@ -4,7 +4,7 @@ This example demonstrates how to use the sparse vector feature, including config ## Prerequisites -- Python 3.10+ +- Python 3.11+ - powermem installed (`pip install powermem`) - Database: **seekdb** or **OceanBase >= 4.5.0** diff --git a/docs/examples/scenario_1_basic_usage.ipynb b/docs/examples/scenario_1_basic_usage.ipynb index 535890fd..4770e5ce 100644 --- a/docs/examples/scenario_1_basic_usage.ipynb +++ b/docs/examples/scenario_1_basic_usage.ipynb @@ -15,7 +15,7 @@ "source": [ "## Prerequisites\n", "\n", - "- Python 3.10+\n", + "- Python 3.11+\n", "- powermem installed (`pip install powermem`)\n" ] }, diff --git a/docs/examples/scenario_1_basic_usage.md b/docs/examples/scenario_1_basic_usage.md index 8720fd81..0e0385a4 100644 --- a/docs/examples/scenario_1_basic_usage.md +++ b/docs/examples/scenario_1_basic_usage.md @@ -4,7 +4,7 @@ This scenario guides you through the basics of powermem - storing, retrieving, a ## Prerequisites -- Python 3.10+ +- Python 3.11+ - powermem installed (`pip install powermem`) ## Configuration diff --git a/docs/examples/scenario_7_multimodal.ipynb b/docs/examples/scenario_7_multimodal.ipynb index d89655be..2794265c 100644 --- a/docs/examples/scenario_7_multimodal.ipynb +++ b/docs/examples/scenario_7_multimodal.ipynb @@ -17,7 +17,7 @@ "source": [ "## Prerequisites\n", "\n", - "- Python 3.10+\n", + "- Python 3.11+\n", "- powermem installed (`pip install powermem`)\n", "- Multimodal LLM API support\n" ] diff --git a/docs/examples/scenario_7_multimodal.md b/docs/examples/scenario_7_multimodal.md index 10ae83e5..8d0a173f 100644 --- a/docs/examples/scenario_7_multimodal.md +++ b/docs/examples/scenario_7_multimodal.md @@ -4,7 +4,7 @@ This scenario demonstrates PowerMem's multimodal capability - storing and retrie ## Prerequisites -- Python 3.10+ +- Python 3.11+ - powermem installed (`pip install powermem`) - Multimodal LLM API support diff --git a/docs/examples/scenario_8_ebbinghaus_forgetting_curve.ipynb b/docs/examples/scenario_8_ebbinghaus_forgetting_curve.ipynb index 2c8c15b8..5d0aefbb 100644 --- a/docs/examples/scenario_8_ebbinghaus_forgetting_curve.ipynb +++ b/docs/examples/scenario_8_ebbinghaus_forgetting_curve.ipynb @@ -15,7 +15,7 @@ "source": [ "## Prerequisites\n", "\n", - "- Python 3.10+\n", + "- Python 3.11+\n", "- powermem installed (`pip install powermem`)\n", "- matplotlib and numpy for visualization (optional)\n" ] diff --git a/docs/examples/scenario_8_ebbinghaus_forgetting_curve.md b/docs/examples/scenario_8_ebbinghaus_forgetting_curve.md index 920f9d44..68aac445 100644 --- a/docs/examples/scenario_8_ebbinghaus_forgetting_curve.md +++ b/docs/examples/scenario_8_ebbinghaus_forgetting_curve.md @@ -4,7 +4,7 @@ This scenario demonstrates how to implement and utilize the Ebbinghaus Forgettin ## Prerequisites -- Python 3.10+ +- Python 3.11+ - powermem installed (`pip install powermem`) - matplotlib and numpy for visualization (optional) diff --git a/docs/examples/scenario_9_user_memory.ipynb b/docs/examples/scenario_9_user_memory.ipynb index 7d08cf15..b7497c8e 100644 --- a/docs/examples/scenario_9_user_memory.ipynb +++ b/docs/examples/scenario_9_user_memory.ipynb @@ -11,7 +11,7 @@ "\n", "## Prerequisites\n", "\n", - "- Python 3.10+\n", + "- Python 3.11+\n", "- powermem installed (`pip install powermem`)\n", "- LLM provider configured (for profile extraction)\n", "- OceanBase configured as vector store (UserMemory requires OceanBase)\n", diff --git a/docs/examples/scenario_9_user_memory.md b/docs/examples/scenario_9_user_memory.md index 5859462d..d94a0f06 100644 --- a/docs/examples/scenario_9_user_memory.md +++ b/docs/examples/scenario_9_user_memory.md @@ -4,7 +4,7 @@ This scenario demonstrates powermem's UserMemory feature - automatic user profil ## Prerequisites -- Python 3.10+ +- Python 3.11+ - powermem installed (`pip install powermem`) - LLM provider configured (for profile extraction) - OceanBase configured as vector store (UserMemory requires OceanBase) diff --git a/docs/guides/0001-getting_started.md b/docs/guides/0001-getting_started.md index 30ceffa2..33550dd0 100644 --- a/docs/guides/0001-getting_started.md +++ b/docs/guides/0001-getting_started.md @@ -1,6 +1,6 @@ ## Prerequisites -- Python 3.10+ +- Python 3.11+ - powermem installed (`pip install powermem`) ## Configuration diff --git a/docs/guides/0002-getting_started_async.md b/docs/guides/0002-getting_started_async.md index 318062c7..a30cab29 100644 --- a/docs/guides/0002-getting_started_async.md +++ b/docs/guides/0002-getting_started_async.md @@ -1,7 +1,7 @@ ## Prerequisites -- Python 3.10+ +- Python 3.11+ - powermem installed (`pip install powermem`) - Basic understanding of Python async/await syntax diff --git a/docs/guides/0005-multi_agent.md b/docs/guides/0005-multi_agent.md index fd79f51e..ff56ea3e 100644 --- a/docs/guides/0005-multi_agent.md +++ b/docs/guides/0005-multi_agent.md @@ -1,6 +1,6 @@ ## Prerequisites -- Python 3.10+ +- Python 3.11+ - powermem installed (`pip install powermem`) - Basic understanding of powermem (see [Getting Started Guide](./0001-getting_started.md)) diff --git a/docs/guides/0007-multimodal.md b/docs/guides/0007-multimodal.md index bd6362d6..f7ed9724 100644 --- a/docs/guides/0007-multimodal.md +++ b/docs/guides/0007-multimodal.md @@ -1,6 +1,6 @@ ## Prerequisites -- Python 3.10+ +- Python 3.11+ - powermem installed (`pip install powermem`) - Multimodal LLM API support diff --git a/docs/guides/0008-ebbinghaus_forgetting_curve.md b/docs/guides/0008-ebbinghaus_forgetting_curve.md index 6a6fecdb..b582625a 100644 --- a/docs/guides/0008-ebbinghaus_forgetting_curve.md +++ b/docs/guides/0008-ebbinghaus_forgetting_curve.md @@ -1,6 +1,6 @@ ## Prerequisites -- Python 3.10+ +- Python 3.11+ - powermem installed (`pip install powermem`) - matplotlib and numpy for visualization (optional) diff --git a/docs/guides/0010-user_memory.md b/docs/guides/0010-user_memory.md index 091164d1..6be0e702 100644 --- a/docs/guides/0010-user_memory.md +++ b/docs/guides/0010-user_memory.md @@ -4,7 +4,7 @@ UserMemory is PowerMem's advanced user profile management module. It automatical ## Prerequisites -- Python 3.10+ +- Python 3.11+ - PowerMem installed (`pip install powermem`) - LLM and embedding services configured (for profile extraction) - Vector store configured (for storing memories and profiles) diff --git a/docs/guides/0011-sparse_vector.md b/docs/guides/0011-sparse_vector.md index e6e39283..56402ced 100644 --- a/docs/guides/0011-sparse_vector.md +++ b/docs/guides/0011-sparse_vector.md @@ -4,7 +4,7 @@ This guide explains how to use the Sparse Vector feature in PowerMem, including ## Prerequisites -- Python 3.10+ +- Python 3.11+ - powermem installed (`pip install powermem`) - Database requirements: **seekdb** or **OceanBase >= 4.5.0** diff --git a/examples/langchain/README.md b/examples/langchain/README.md index 270b1fe6..fec9ed85 100644 --- a/examples/langchain/README.md +++ b/examples/langchain/README.md @@ -40,7 +40,7 @@ This example demonstrates how to build an AI Healthcare Support Bot using **Powe ## Prerequisites -1. **Python 3.10+** +1. **Python 3.11+** 2. **OceanBase Database** (configured and running) 3. **API Keys**: - LLM API key (OpenAI, Qwen, etc.) diff --git a/examples/langgraph/README.md b/examples/langgraph/README.md index 12c310e5..61f39ddc 100644 --- a/examples/langgraph/README.md +++ b/examples/langgraph/README.md @@ -40,7 +40,7 @@ This example demonstrates how to build an AI Customer Service Bot using **PowerM ## Prerequisites -1. **Python 3.10+** +1. **Python 3.11+** 2. **OceanBase Database** (configured and running) 3. **API Keys**: - LLM API key (OpenAI, Qwen, etc.) diff --git a/pyproject.toml b/pyproject.toml index 554b94f5..2ef95f77 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "powermem" -version = "1.0.2" +version = "1.1.0" description = "Intelligent Memory System - Persistent memory layer for LLM applications" readme = "README.md" license = {text = "Apache-2.0"} @@ -16,13 +16,12 @@ classifiers = [ "Intended Audience :: Developers", "License :: OSI Approved :: Apache Software License", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", "Topic :: Software Development :: Libraries :: Python Modules", "Topic :: Scientific/Engineering :: Artificial Intelligence", ] -requires-python = ">=3.10" +requires-python = ">=3.11" dependencies = [ "pydantic>=2.0.0", "pydantic-settings>=2.0.0", @@ -55,6 +54,7 @@ dependencies = [ "dashscope>=1.14.0", "psutil>=5.9.0", "zai-sdk>=0.2.0", + "pyseekdb>=0.1.0", ] [project.optional-dependencies] @@ -106,7 +106,7 @@ powermem-cli = "powermem.cli.main:cli" [tool.black] line-length = 88 -target-version = ['py310'] +target-version = ['py311'] include = '\.pyi?$' extend-exclude = ''' /( @@ -129,7 +129,7 @@ line_length = 88 known_first_party = ["powermem"] [tool.mypy] -python_version = "3.10" +python_version = "3.11" warn_return_any = true warn_unused_configs = true disallow_untyped_defs = true diff --git a/src/powermem/cli/commands/config.py b/src/powermem/cli/commands/config.py index c8f7d168..dd626812 100644 --- a/src/powermem/cli/commands/config.py +++ b/src/powermem/cli/commands/config.py @@ -829,8 +829,13 @@ def _wizard_database(existing: Dict[str, str]) -> Dict[str, str]: if provider == "oceanbase": updates["OCEANBASE_HOST"] = click.prompt( - "OceanBase host", - default=existing.get("OCEANBASE_HOST") or "127.0.0.1", + "OceanBase host (empty for embedded SeekDB)", + default=existing.get("OCEANBASE_HOST") or "", + show_default=True, + ) + updates["OCEANBASE_PATH"] = click.prompt( + "OceanBase embedded SeekDB path (used when host is empty)", + default=existing.get("OCEANBASE_PATH") or "./seekdb_data", show_default=True, ) updates["OCEANBASE_PORT"] = click.prompt( @@ -918,8 +923,13 @@ def _wizard_database_quickstart(existing: Dict[str, str]) -> Dict[str, str]: if provider == "oceanbase": updates["OCEANBASE_HOST"] = click.prompt( - "OceanBase host", - default=existing.get("OCEANBASE_HOST") or "127.0.0.1", + "OceanBase host (empty for embedded SeekDB)", + default=existing.get("OCEANBASE_HOST") or "", + show_default=True, + ) + updates["OCEANBASE_PATH"] = click.prompt( + "OceanBase embedded SeekDB path (used when host is empty)", + default=existing.get("OCEANBASE_PATH") or "./seekdb_data", show_default=True, ) updates["OCEANBASE_PORT"] = click.prompt( diff --git a/src/powermem/config_loader.py b/src/powermem/config_loader.py index ce4e5f68..27477697 100644 --- a/src/powermem/config_loader.py +++ b/src/powermem/config_loader.py @@ -160,7 +160,7 @@ def to_config(self) -> Dict[str, Any]: # 4. For OceanBase, build connection_args for backward compatibility if db_provider == "oceanbase": connection_args = {} - for key in ["host", "port", "user", "password", "db_name"]: + for key in ["host", "port", "user", "password", "db_name", "ob_path"]: if key in vector_store_config: connection_args[key] = vector_store_config[key] diff --git a/src/powermem/core/audit.py b/src/powermem/core/audit.py index ff993ffc..d2c3dbe8 100644 --- a/src/powermem/core/audit.py +++ b/src/powermem/core/audit.py @@ -92,7 +92,7 @@ def log_event( "user_id": user_id, "agent_id": agent_id, "details": details, - "version": "1.0.2", + "version": "1.1.0", } # Log to file diff --git a/src/powermem/core/memory.py b/src/powermem/core/memory.py index 09c4254e..ac2155c3 100644 --- a/src/powermem/core/memory.py +++ b/src/powermem/core/memory.py @@ -1225,13 +1225,27 @@ def search( # Intelligent plugin lifecycle management on search if self._intelligence_plugin and self._intelligence_plugin.enabled: updates, deletes = self._intelligence_plugin.on_search(processed_results) + # For embedded SeekDB the engine is single-threaded (NullPool, not + # thread-safe). Background threads opening concurrent connections + # crash the C++ layer. Run updates/deletes synchronously instead. + _is_embedded_store = ( + hasattr(self.storage, 'vector_store') + and hasattr(self.storage.vector_store, 'connection_args') + and not self.storage.vector_store.connection_args.get("host") + ) if updates: for mem_id, upd in updates: - _BACKGROUND_EXECUTOR.submit(self.storage.update_memory,mem_id,{**upd},user_id,agent_id) + if _is_embedded_store: + self.storage.update_memory(mem_id, {**upd}, user_id, agent_id) + else: + _BACKGROUND_EXECUTOR.submit(self.storage.update_memory, mem_id, {**upd}, user_id, agent_id) logger.info(f"Submitted {len(updates)} update operations to background executor") if deletes: for mem_id in deletes: - _BACKGROUND_EXECUTOR.submit(self.storage.delete_memory,mem_id,user_id,agent_id) + if _is_embedded_store: + self.storage.delete_memory(mem_id, user_id, agent_id) + else: + _BACKGROUND_EXECUTOR.submit(self.storage.delete_memory, mem_id, user_id, agent_id) logger.info(f"Submitted {len(deletes)} delete operations to background executor") # Transform results to match benchmark expected format diff --git a/src/powermem/core/telemetry.py b/src/powermem/core/telemetry.py index a0162968..df52d770 100644 --- a/src/powermem/core/telemetry.py +++ b/src/powermem/core/telemetry.py @@ -82,7 +82,7 @@ def capture_event( "user_id": user_id, "agent_id": agent_id, "timestamp": get_current_datetime().isoformat(), - "version": "1.0.2", + "version": "1.1.0", } self.events.append(event) @@ -182,7 +182,7 @@ def set_user_properties(self, user_id: str, properties: Dict[str, Any]) -> None: "properties": properties, "user_id": user_id, "timestamp": get_current_datetime().isoformat(), - "version": "1.0.2", + "version": "1.1.0", } self.events.append(event) diff --git a/src/powermem/storage/config/base.py b/src/powermem/storage/config/base.py index 3a99bede..423f2736 100644 --- a/src/powermem/storage/config/base.py +++ b/src/powermem/storage/config/base.py @@ -102,13 +102,23 @@ class BaseGraphStoreConfig(BaseVectorStoreConfig): # Override connection fields with GRAPH_STORE_ fallback aliases host: str = Field( - default="127.0.0.1", + default="", validation_alias=AliasChoices( "host", "GRAPH_STORE_HOST", # Priority 1 "OCEANBASE_HOST", # Priority 2 (fallback) ), - description="Database server host" + description="Database server host (empty means embedded SeekDB mode)" + ) + + ob_path: str = Field( + default="./seekdb_data", + validation_alias=AliasChoices( + "ob_path", + "GRAPH_STORE_PATH", + "OCEANBASE_PATH", + ), + description="Path for embedded SeekDB data directory (used when host is empty)" ) port: str = Field( diff --git a/src/powermem/storage/config/oceanbase.py b/src/powermem/storage/config/oceanbase.py index 91b830ee..f24102a8 100644 --- a/src/powermem/storage/config/oceanbase.py +++ b/src/powermem/storage/config/oceanbase.py @@ -30,12 +30,21 @@ class OceanBaseConfig(BaseVectorStoreConfig): # Connection parameters host: str = Field( - default="127.0.0.1", + default="", validation_alias=AliasChoices( "host", "OCEANBASE_HOST", ), - description="OceanBase server host" + description="OceanBase server host (empty means embedded SeekDB mode)" + ) + + ob_path: str = Field( + default="./seekdb_data", + validation_alias=AliasChoices( + "ob_path", + "OCEANBASE_PATH", + ), + description="Path for embedded SeekDB data directory (used when host is empty)" ) port: str = Field( diff --git a/src/powermem/storage/oceanbase/constants.py b/src/powermem/storage/oceanbase/constants.py index 410d9ff2..f3ae3715 100644 --- a/src/powermem/storage/oceanbase/constants.py +++ b/src/powermem/storage/oceanbase/constants.py @@ -21,11 +21,12 @@ class VecIndexType: # ============================================================================= DEFAULT_OCEANBASE_CONNECTION = { - "host": "127.0.0.1", + "host": "", "port": "2881", "user": "root@test", "password": "", "db_name": "test", + "ob_path": "./seekdb_data", } diff --git a/src/powermem/storage/oceanbase/oceanbase.py b/src/powermem/storage/oceanbase/oceanbase.py index 08221c63..7e222d4b 100644 --- a/src/powermem/storage/oceanbase/oceanbase.py +++ b/src/powermem/storage/oceanbase/oceanbase.py @@ -65,6 +65,7 @@ def __init__( user: Optional[str] = None, password: Optional[str] = None, db_name: Optional[str] = None, + ob_path: Optional[str] = None, hybrid_search: bool = True, fulltext_parser: str = constants.DEFAULT_FULLTEXT_PARSER, vector_weight: float = 0.5, @@ -134,6 +135,7 @@ def __init__( "user": user or connection_args.get("user", constants.DEFAULT_OCEANBASE_CONNECTION["user"]), "password": password or connection_args.get("password", constants.DEFAULT_OCEANBASE_CONNECTION["password"]), "db_name": db_name or connection_args.get("db_name", constants.DEFAULT_OCEANBASE_CONNECTION["db_name"]), + "ob_path": ob_path or connection_args.get("ob_path", constants.DEFAULT_OCEANBASE_CONNECTION["ob_path"]), } self.connection_args = final_connection_args @@ -189,18 +191,23 @@ def __init__( def _create_client(self, **kwargs): """Create and initialize the OceanBase vector client.""" host = self.connection_args.get("host") - port = self.connection_args.get("port") - user = self.connection_args.get("user") - password = self.connection_args.get("password") db_name = self.connection_args.get("db_name") - self.obvector = ObVecClient( - uri=f"{host}:{port}", - user=user, - password=password, - db_name=db_name, - **kwargs, - ) + if host: + port = self.connection_args.get("port") + user = self.connection_args.get("user") + password = self.connection_args.get("password") + self.obvector = ObVecClient( + uri=f"{host}:{port}", + user=user, + password=password, + db_name=db_name, + **kwargs, + ) + else: + ob_path = self.connection_args.get("ob_path", "./seekdb_data") + OceanBaseUtil.ensure_embedded_database_exists(ob_path, db_name) + self.obvector = ObVecClient(path=ob_path, db_name=db_name) def _configure_vector_index_settings(self): """Configure OceanBase vector index settings automatically.""" @@ -383,6 +390,20 @@ def _create_col(self): "Please configure embedding_model_dims in your OceanBaseConfig." ) + # Embedded SeekDB does not tolerate IVF-family indexes on small datasets: + # IVF requires at least nlist training vectors; fewer vectors causes a native + # SIGSEGV that cannot be caught by Python. Switch to HNSW automatically. + is_embedded = not self.connection_args.get("host") + if is_embedded and self.index_type in constants.INDEX_TYPE_IVF: + nlist = (self.vidx_algo_params or {}).get("nlist", constants.DEFAULT_OCEANBASE_IVF_BUILD_PARAM.get("nlist", 128)) + logger.warning( + "Embedded SeekDB: index_type '%s' (nlist=%d) requires at least %d vectors " + "and may crash on small datasets. Auto-switching to HNSW.", + self.index_type, nlist, nlist, + ) + self.index_type = "HNSW" + self.vidx_algo_params = constants.DEFAULT_OCEANBASE_HNSW_BUILD_PARAM.copy() + # Set up vector index parameters if self.vidx_metric_type not in ("l2", "inner_product", "cosine"): raise ValueError( @@ -599,12 +620,26 @@ def _row_to_model(self, row): # Create a new Model instance (not bound to Session) record = self.model_class() + # Support both SQLAlchemy Row objects and plain dicts (used when rows + # are materialised early to avoid embedded SeekDB cursor crashes). + mapping = row._mapping if hasattr(row, '_mapping') else row + + # Build a normalized lookup: strip table-name prefix that embedded SeekDB + # may add (e.g. "memories.document" → "document") so we can always find + # the value regardless of whether the driver returns bare or prefixed keys. + normalized: Dict[str, any] = {} + for k in mapping.keys(): + bare = k.split(".")[-1] if "." in str(k) else k + # Prefer the bare key; only store prefixed key if bare not yet seen + if bare not in normalized: + normalized[bare] = mapping[k] + # Iterate through all columns in the table, map values from Row to Model instance for col_name in self.model_class.__table__.c.keys(): - # Check if Row contains this column (queries may not include all columns) - if col_name in row._mapping.keys(): + value = normalized.get(col_name) + if value is not None or col_name in normalized: attr_name = 'metadata_' if col_name == 'metadata' else col_name - setattr(record, attr_name, row._mapping[col_name]) + setattr(record, attr_name, value) return record @@ -715,12 +750,13 @@ def _parse_row_to_dict(self, row, include_vector: bool = False, extract_score: b # Extract additional score/distance fields (these fields are not in Model, need to get from original row) if extract_score: - if 'score' in row._mapping.keys(): - score_or_distance = row._mapping['score'] - elif 'distance' in row._mapping.keys(): - score_or_distance = row._mapping['distance'] - elif 'anon_1' in row._mapping.keys(): - score_or_distance = row._mapping['anon_1'] + mapping = row._mapping if hasattr(row, '_mapping') else row + if 'score' in mapping.keys(): + score_or_distance = mapping['score'] + elif 'distance' in mapping.keys(): + score_or_distance = mapping['distance'] + elif 'anon_1' in mapping.keys(): + score_or_distance = mapping['anon_1'] # Build standard metadata metadata = { @@ -873,7 +909,7 @@ def _vector_search(self, # Convert results to OutputData objects search_results = [] - for row in results.fetchall(): + for row in OceanBaseUtil.safe_fetchall(results): parsed = self._parse_row_to_dict(row, include_vector=False, extract_score=True) # Convert distance to similarity score (0-1 range, higher is better) @@ -963,12 +999,14 @@ def _fulltext_search(self, query: str, limit: int = 5, filters: Optional[Dict] = stmt = stmt.limit(limit) # Execute the query with parameters - use direct parameter passing + # Materialize rows to dicts inside the connection context to avoid + # "pure virtual method called" crash in embedded SeekDB (the C++ + # cursor is invalidated once the transaction/connection closes). with self.obvector.engine.connect() as conn: with conn.begin(): logger.info(f"Executing FTS query with parameters: query={query}") - # Execute with parameter dictionary - the standard SQLAlchemy way results = conn.execute(stmt) - rows = results.fetchall() + rows = [dict(r._mapping) for r in OceanBaseUtil.safe_fetchall(results)] except Exception as e: logger.warning(f"Full-text search failed, falling back to LIKE search: {e}") @@ -1001,9 +1039,8 @@ def _fulltext_search(self, query: str, limit: int = 5, filters: Optional[Dict] = with self.obvector.engine.connect() as conn: with conn.begin(): logger.info(f"Executing LIKE fallback query with parameters: like_query={like_query}") - # Execute with parameter dictionary - the standard SQLAlchemy way results = conn.execute(stmt) - rows = results.fetchall() + rows = [dict(r._mapping) for r in OceanBaseUtil.safe_fetchall(results)] except Exception as fallback_error: logger.error(f"Both full-text search and LIKE fallback failed: {fallback_error}") return [] @@ -1082,12 +1119,13 @@ def _sparse_search(self, sparse_embedding: Dict[int, float], limit: int = 5, fil stmt = stmt.limit(limit) # Execute the query + # Materialize rows to dicts inside the connection context to avoid + # "pure virtual method called" crash in embedded SeekDB. with self.obvector.engine.connect() as conn: with conn.begin(): logger.debug(f"Executing sparse vector search query with sparse_vector: {sparse_vector_str}") - # Execute the query results = conn.execute(stmt) - rows = results.fetchall() + rows = [dict(r._mapping) for r in OceanBaseUtil.safe_fetchall(results)] # Convert results to OutputData objects sparse_results = [] @@ -1229,7 +1267,8 @@ def _native_hybrid_search( with self.obvector.engine.connect() as conn: with conn.begin(): - res = conn.execute(sql, {"index": self.collection_name, "body_str": body_str}).fetchone() + res_result = conn.execute(sql, {"index": self.collection_name, "body_str": body_str}) + res = OceanBaseUtil.safe_fetchone(res_result) result_json_str = res[0] if res else None # 6. Parse and return results @@ -1318,44 +1357,64 @@ def _hybrid_search(self, query: str, vectors: List[List[float]], limit: int = 5, # Determine which searches to perform perform_sparse = self.include_sparse and sparse_embedding is not None - # Perform searches in parallel for better performance - search_tasks = [] - with ThreadPoolExecutor(max_workers=3 if perform_sparse else 2) as executor: - # Submit vector search - vector_future = executor.submit(self._vector_search, query, vectors, candidate_limit, filters) - search_tasks.append(('vector', vector_future)) - - # Submit full-text search - fts_future = executor.submit(self._fulltext_search, query, candidate_limit, filters) - search_tasks.append(('fts', fts_future)) + is_embedded = not self.connection_args.get("host") - # Submit sparse vector search if enabled - if perform_sparse: - sparse_future = executor.submit(self._sparse_search, sparse_embedding, candidate_limit, filters) - search_tasks.append(('sparse', sparse_future)) + if is_embedded: + # SeekDB embedded engine does not support concurrent SQL across threads + try: + vector_results = self._vector_search(query, vectors, candidate_limit, filters) + except Exception as e: + logger.warning(f"vector search failed: {e}") + vector_results = [] - # Wait for all searches to complete and get results - vector_results = None - fts_results = None - sparse_results = None + try: + fts_results = self._fulltext_search(query, candidate_limit, filters) + except Exception as e: + logger.warning(f"fts search failed: {e}") + fts_results = [] - for search_type, future in search_tasks: + sparse_results = [] + if perform_sparse: try: - results = future.result() - if search_type == 'vector': - vector_results = results - elif search_type == 'fts': - fts_results = results - elif search_type == 'sparse': - sparse_results = results + sparse_results = self._sparse_search(sparse_embedding, candidate_limit, filters) except Exception as e: - logger.warning(f"{search_type} search failed: {e}") - if search_type == 'vector': - vector_results = [] - elif search_type == 'fts': - fts_results = [] - elif search_type == 'sparse': - sparse_results = [] + logger.warning(f"sparse search failed: {e}") + sparse_results = [] + else: + # Remote mode: parallel searches for better performance + search_tasks = [] + with ThreadPoolExecutor(max_workers=3 if perform_sparse else 2) as executor: + vector_future = executor.submit(self._vector_search, query, vectors, candidate_limit, filters) + search_tasks.append(('vector', vector_future)) + + fts_future = executor.submit(self._fulltext_search, query, candidate_limit, filters) + search_tasks.append(('fts', fts_future)) + + if perform_sparse: + sparse_future = executor.submit(self._sparse_search, sparse_embedding, candidate_limit, filters) + search_tasks.append(('sparse', sparse_future)) + + vector_results = None + fts_results = None + sparse_results = None + + for search_type, future in search_tasks: + try: + results = future.result() + if search_type == 'vector': + vector_results = results + elif search_type == 'fts': + fts_results = results + elif search_type == 'sparse': + sparse_results = results + except Exception as e: + logger.warning(f"{search_type} search failed: {e}") + if search_type == 'vector': + vector_results = [] + elif search_type == 'fts': + fts_results = [] + elif search_type == 'sparse': + sparse_results = [] # Ensure we have at least empty lists if vector_results is None: @@ -1645,11 +1704,12 @@ def _rrf_fusion(self, vector_results: List[OutputData], fts_results: List[Output # Convert to final results and sort by RRF score heap = [] for doc_id, doc_data in all_docs.items(): - # Use document ID as tiebreaker to avoid dict comparison when rrf_scores are equal + # Use document ID as tiebreaker; coerce None to 0 for safe comparison + safe_id = doc_id if doc_id is not None else 0 if len(heap) < limit: - heapq.heappush(heap, (doc_data['rrf_score'], doc_id, doc_data)) + heapq.heappush(heap, (doc_data['rrf_score'], safe_id, doc_data)) elif doc_data['rrf_score'] > heap[0][0]: - heapq.heapreplace(heap, (doc_data['rrf_score'], doc_id, doc_data)) + heapq.heapreplace(heap, (doc_data['rrf_score'], safe_id, doc_data)) final_results = [] for score, _, doc_data in sorted(heap, key=lambda x: x[0], reverse=True): @@ -1819,62 +1879,76 @@ def delete(self, vector_id: int): logger.error(f"Failed to delete vector with ID {vector_id} from collection '{self.collection_name}': {e}", exc_info=True) raise + def _get_records_by_id(self, vector_id, output_columns: List[str]) -> list: + """Fetch rows by primary key while keeping the connection open during fetchall. + + pyobvector.get() returns the cursor *after* committing the transaction via + ``with conn.begin()``. In embedded SeekDB the commit invalidates the cursor, + so calling fetchall() on it afterwards triggers a C++ ``pure virtual method + called`` crash. This helper avoids that by running fetchall() inside the + ``with engine.connect()`` block. + """ + table = Table(self.collection_name, self.obvector.metadata_obj, autoload_with=self.obvector.engine) + cols = [table.c[col] for col in output_columns if col in table.c] + stmt = select(*cols).where(table.c[self.primary_field].in_([vector_id])) + with self.obvector.engine.connect() as conn: + result = conn.execute(stmt) + return OceanBaseUtil.safe_fetchall(result) + def update(self, vector_id: int, vector: Optional[List[float]] = None, payload: Optional[Dict] = None): """Update a vector and its payload.""" try: - # Get existing record to preserve fields not being updated - # Always try to get sparse_vector_field to preserve it even when include_sparse=False - # This prevents accidentally clearing sparse_embedding when using a non-sparse Memory instance - output_columns = [self.vector_field] + # Fetch ALL existing columns so a partial payload never wipes fields + output_columns = self._get_standard_column_names(include_vector_field=True) has_sparse_column = OceanBaseUtil.check_column_exists(self.obvector, self.collection_name, self.sparse_vector_field) - if has_sparse_column: + if has_sparse_column and self.sparse_vector_field not in output_columns: output_columns.append(self.sparse_vector_field) - existing_result = self.obvector.get( - table_name=self.collection_name, - ids=[vector_id], - output_column_name=output_columns - ) - - existing_rows = existing_result.fetchall() + existing_rows = self._get_records_by_id(vector_id, output_columns) if not existing_rows: logger.warning(f"Vector with ID {vector_id} not found in collection '{self.collection_name}'") return - # Prepare update data - update_data: Dict[str, Any] = { - self.primary_field: vector_id, + # Parse existing row into a dict so we can merge with the new payload + existing = self._parse_row_to_dict(existing_rows[0], include_vector=True, extract_score=False) + + # Rebuild the existing payload dict that _build_record_for_insert expects + existing_payload: Dict[str, Any] = { + "data": existing.get("text_content", ""), + "metadata": existing.get("metadata", {}).get("metadata", {}), + "user_id": existing.get("user_id", ""), + "agent_id": existing.get("agent_id", ""), + "run_id": existing.get("run_id", ""), + "actor_id": existing.get("actor_id", ""), + "hash": existing.get("hash_val", ""), + "created_at": existing.get("created_at", ""), + "updated_at": existing.get("updated_at", ""), + "category": existing.get("category", ""), } + # Note: sparse_embedding is handled separately below via the column check - # Extract existing values from row - existing_vector = existing_rows[0][0] if existing_rows[0] else None - existing_sparse_embedding = existing_rows[0][1] if has_sparse_column and len(existing_rows[0]) > 1 else None - - if vector is not None: - update_data[self.vector_field] = ( - vector if not self.normalize else OceanBaseUtil.normalize(vector) - ) + # Merge: new payload keys override existing ones + if payload is not None: + merged_payload = {**existing_payload, **payload} else: - # Preserve the existing vector to avoid it being cleared by upsert - if existing_vector is not None: - update_data[self.vector_field] = existing_vector - logger.debug(f"Preserving existing vector for ID {vector_id}") + merged_payload = existing_payload - if payload is not None: - # Use the helper method to build fields, then merge with update_data - temp_record = self._build_record_for_insert(vector or [], payload) + # Build the full record from the merged payload + existing_vector = existing.get("vector") + update_vector = vector if vector is not None else existing_vector + temp_record = self._build_record_for_insert(update_vector if update_vector is not None else [], merged_payload) - # Copy relevant fields from temp_record (excluding primary key and vector if not updating) - for key, value in temp_record.items(): - if key != self.primary_field and (vector is not None or key != self.vector_field): - update_data[key] = value + # Prepare update data + update_data: Dict[str, Any] = {self.primary_field: vector_id} + for key, value in temp_record.items(): + if key != self.primary_field: + update_data[key] = value # Preserve existing sparse_embedding if not explicitly provided in payload - # This prevents intelligence_plugin updates from accidentally clearing sparse_embedding - # Check column existence instead of include_sparse to protect data even when sparse is disabled if has_sparse_column and self.sparse_vector_field not in update_data: - if existing_sparse_embedding is not None: - update_data[self.sparse_vector_field] = existing_sparse_embedding + existing_sparse = existing.get("sparse_embedding") + if existing_sparse is not None: + update_data[self.sparse_vector_field] = existing_sparse logger.debug(f"Preserving existing sparse_embedding for ID {vector_id}") # Update record @@ -1894,13 +1968,7 @@ def get(self, vector_id: int): # Build output column name list output_columns = self._get_standard_column_names(include_vector_field=True) - results = self.obvector.get( - table_name=self.collection_name, - ids=[vector_id], - output_column_name=output_columns, - ) - - rows = results.fetchall() + rows = self._get_records_by_id(vector_id, output_columns) if not rows: logger.debug(f"Vector with ID {vector_id} not found in collection '{self.collection_name}'") return None @@ -2030,7 +2098,7 @@ def list(self, filters: Optional[Dict] = None, limit: Optional[int] = None, # Execute query with self.obvector.engine.connect() as conn: results = conn.execute(stmt) - rows = results.fetchall() + rows = OceanBaseUtil.safe_fetchall(results) memories = [] for row in rows: @@ -2342,7 +2410,7 @@ def execute_sql(self, sql: str, params: Optional[Dict[str, Any]] = None) -> List # Try to fetch results (for SELECT queries) try: - rows = result.fetchall() + rows = OceanBaseUtil.safe_fetchall(result) # Convert rows to dictionaries if rows and result.keys(): return [dict(zip(result.keys(), row)) for row in rows] diff --git a/src/powermem/storage/oceanbase/oceanbase_graph.py b/src/powermem/storage/oceanbase/oceanbase_graph.py index 90560590..f189522b 100644 --- a/src/powermem/storage/oceanbase/oceanbase_graph.py +++ b/src/powermem/storage/oceanbase/oceanbase_graph.py @@ -37,6 +37,7 @@ from powermem.integrations import EmbedderFactory, LLMFactory from powermem.storage.base import GraphStoreBase from powermem.utils.utils import format_entities, remove_code_blocks, generate_snowflake_id, get_current_datetime +from powermem.utils.oceanbase_util import OceanBaseUtil try: from rank_bm25 import BM25Okapi @@ -123,18 +124,23 @@ def get_config_value(key: str, default: Any = None) -> Any: ) # Initialize OceanBase client - host = get_config_value("host", "127.0.0.1") - port = get_config_value("port", "2881") - user = get_config_value("user", "root") - password = get_config_value("password", "") + host = get_config_value("host", "") db_name = get_config_value("db_name", "test") - self.client = ObVecClient( - uri=f"{host}:{port}", - user=user, - password=password, - db_name=db_name, - ) + if host: + port = get_config_value("port", "2881") + user = get_config_value("user", "root") + password = get_config_value("password", "") + self.client = ObVecClient( + uri=f"{host}:{port}", + user=user, + password=password, + db_name=db_name, + ) + else: + ob_path = get_config_value("ob_path", "./seekdb_data") + OceanBaseUtil.ensure_embedded_database_exists(ob_path, db_name) + self.client = ObVecClient(path=ob_path, db_name=db_name) self.engine = self.client.engine self.metadata = MetaData() @@ -540,7 +546,7 @@ def delete_all(self, filters: Dict[str, Any]) -> None: # Collect unique entity IDs from relationships entity_ids = set() - for rel in relationships_results.fetchall(): + for rel in OceanBaseUtil.safe_fetchall(relationships_results): entity_ids.add(rel[1]) # source_entity_id entity_ids.add(rel[2]) # destination_entity_id @@ -583,7 +589,7 @@ def get_all(self, filters: Dict[str, Any], limit: int = 100) -> List[Dict[str, s where_clause=where_clause ) - relationships = relationships_results.fetchall() + relationships = OceanBaseUtil.safe_fetchall(relationships_results) if not relationships: return [] @@ -605,7 +611,7 @@ def get_all(self, filters: Dict[str, Any], limit: int = 100) -> List[Dict[str, s ) # Create a mapping from entity_id to entity_name - entity_map = {entity[0]: entity[1] for entity in entities_results.fetchall()} + entity_map = {entity[0]: entity[1] for entity in OceanBaseUtil.safe_fetchall(entities_results)} # Build final results with updated_at for sorting final_results = [] @@ -869,12 +875,12 @@ def _execute_single_hop_query( if conn is not None: # Reuse existing connection (transactional) result = conn.execute(text(query), params) - rows = result.fetchall() + rows = OceanBaseUtil.safe_fetchall(result) else: # Create new connection (backward compatibility) with self.engine.connect() as new_conn: result = new_conn.execute(text(query), params) - rows = result.fetchall() + rows = OceanBaseUtil.safe_fetchall(result) # Format results and filter out cycles formatted_results = [] @@ -1071,8 +1077,8 @@ def _delete_entities( ) # Get entity IDs - source_rows = source_entities.fetchall() if source_entities else [] - dest_rows = dest_entities.fetchall() if dest_entities else [] + source_rows = OceanBaseUtil.safe_fetchall(source_entities) if source_entities else [] + dest_rows = OceanBaseUtil.safe_fetchall(dest_entities) if dest_entities else [] source_ids = [e[0] for e in source_rows] dest_ids = [e[0] for e in dest_rows] @@ -1239,7 +1245,7 @@ def _search_node( where_clause=where_clause, ) - rows = results.fetchall() + rows = OceanBaseUtil.safe_fetchall(results) if rows: if limit == 1: row = rows[0] @@ -1336,7 +1342,7 @@ def _create_or_update_relationship( where_clause=[where_clause_with_params] ) - existing_rows = existing_relationships.fetchall() + existing_rows = OceanBaseUtil.safe_fetchall(existing_relationships) if not existing_rows: # Relationship doesn't exist, create new one current_time = get_current_datetime() @@ -1359,17 +1365,17 @@ def _create_or_update_relationship( # Get the names for return value using pyobvector get method # First get the entities - source_entity = self.client.get( + source_entity = OceanBaseUtil.safe_fetchone(self.client.get( table_name=constants.TABLE_ENTITIES, ids=[source_id], output_column_name=["id", "name"] - ).fetchone() + )) - dest_entity = self.client.get( + dest_entity = OceanBaseUtil.safe_fetchone(self.client.get( table_name=constants.TABLE_ENTITIES, ids=[dest_id], output_column_name=["id", "name"] - ).fetchone() + )) return { "source": source_entity[1] if source_entity else None, diff --git a/src/powermem/user_memory/storage/user_profile.py b/src/powermem/user_memory/storage/user_profile.py index 93c50376..f3501a1d 100644 --- a/src/powermem/user_memory/storage/user_profile.py +++ b/src/powermem/user_memory/storage/user_profile.py @@ -10,6 +10,7 @@ from sqlalchemy import and_, or_, func, literal, null, Index from ...storage.oceanbase import constants +from ...utils.oceanbase_util import OceanBaseUtil from ...utils.utils import serialize_datetime, generate_snowflake_id, get_current_datetime try: @@ -41,6 +42,7 @@ def __init__( user: Optional[str] = None, password: Optional[str] = None, db_name: Optional[str] = None, + ob_path: Optional[str] = None, **kwargs, ): """ @@ -49,11 +51,12 @@ def __init__( Args: table_name (str): Name of the table to store user profiles. connection_args (Optional[Dict[str, Any]]): Connection parameters for OceanBase. - host (Optional[str]): OceanBase server host. + host (Optional[str]): OceanBase server host (empty means embedded SeekDB mode). port (Optional[str]): OceanBase server port. user (Optional[str]): OceanBase username. password (Optional[str]): OceanBase password. db_name (Optional[str]): OceanBase database name. + ob_path (Optional[str]): Path for embedded SeekDB data directory. """ self.table_name = table_name self.primary_field = "id" @@ -69,6 +72,7 @@ def __init__( "user": user or connection_args.get("user", constants.DEFAULT_OCEANBASE_CONNECTION["user"]), "password": password or connection_args.get("password", constants.DEFAULT_OCEANBASE_CONNECTION["password"]), "db_name": db_name or connection_args.get("db_name", constants.DEFAULT_OCEANBASE_CONNECTION["db_name"]), + "ob_path": ob_path or connection_args.get("ob_path", constants.DEFAULT_OCEANBASE_CONNECTION["ob_path"]), } self.connection_args = final_connection_args @@ -83,18 +87,23 @@ def __init__( def _create_client(self, **kwargs): """Create and initialize the OceanBase client.""" host = self.connection_args.get("host") - port = self.connection_args.get("port") - user = self.connection_args.get("user") - password = self.connection_args.get("password") db_name = self.connection_args.get("db_name") - self.obvector = ObVecClient( - uri=f"{host}:{port}", - user=user, - password=password, - db_name=db_name, - **kwargs, - ) + if host: + port = self.connection_args.get("port") + user = self.connection_args.get("user") + password = self.connection_args.get("password") + self.obvector = ObVecClient( + uri=f"{host}:{port}", + user=user, + password=password, + db_name=db_name, + **kwargs, + ) + else: + ob_path = self.connection_args.get("ob_path", "./seekdb_data") + OceanBaseUtil.ensure_embedded_database_exists(ob_path, db_name) + self.obvector = ObVecClient(path=ob_path, db_name=db_name) def _create_table(self) -> None: """Create user profiles table if it doesn't exist.""" @@ -159,7 +168,7 @@ def save_profile( stmt = self.table.select().where(and_(*conditions)).limit(1) result = conn.execute(stmt) - existing_row = result.fetchone() + existing_row = OceanBaseUtil.safe_fetchone(result) # Prepare update/insert values values = { @@ -226,7 +235,7 @@ def get_profile_by_user_id(self, user_id: str) -> Optional[Dict[str, Any]]: stmt = stmt.limit(1) result = conn.execute(stmt) - row = result.fetchone() + row = OceanBaseUtil.safe_fetchone(result) if row: return { @@ -413,7 +422,7 @@ def get_profile( # Execute query and build results result = conn.execute(stmt) - rows = result.fetchall() + rows = OceanBaseUtil.safe_fetchall(result) return [ self._build_profile_dict(row, main_topic, sub_topic) diff --git a/src/powermem/user_memory/user_memory.py b/src/powermem/user_memory/user_memory.py index bc3e4c81..8bc25651 100644 --- a/src/powermem/user_memory/user_memory.py +++ b/src/powermem/user_memory/user_memory.py @@ -74,6 +74,7 @@ def __init__( "user": connection_args.get("user"), "password": connection_args.get("password"), "db_name": connection_args.get("db_name"), + "ob_path": connection_args.get("ob_path"), } # Use factory to create UserProfileStore based on storage_type diff --git a/src/powermem/utils/oceanbase_util.py b/src/powermem/utils/oceanbase_util.py index 03e654b1..df4a2e07 100644 --- a/src/powermem/utils/oceanbase_util.py +++ b/src/powermem/utils/oceanbase_util.py @@ -783,4 +783,43 @@ def parse_native_hybrid_results( return [] except Exception as e: logger.error(f"Error parsing native hybrid search results: {e}") - return [] \ No newline at end of file + return [] + + @staticmethod + def safe_fetchall(result): + """Safely fetch all rows, returning empty list when SeekDB embedded returns no-row result for empty tables.""" + if not getattr(result, 'returns_rows', True): + return [] + return result.fetchall() + + @staticmethod + def safe_fetchone(result): + """Safely fetch one row, returning None when SeekDB embedded returns no-row result for empty tables.""" + if not getattr(result, 'returns_rows', True): + return None + return result.fetchone() + + @staticmethod + def ensure_embedded_database_exists(ob_path: str, db_name: str) -> None: + """ + For embedded SeekDB mode only: ensure the target database exists, creating it if necessary. + + Connects to the default 'test' database first, then executes + CREATE DATABASE IF NOT EXISTS for the target database. + + Args: + ob_path: Path for embedded SeekDB data directory. + db_name: Target database name to ensure exists. + """ + if not db_name or db_name == "test": + return + + try: + from pyobvector import ObVecClient + temp_client = ObVecClient(path=ob_path, db_name="test") + with temp_client.engine.connect() as conn: + conn.execute(text(f"CREATE DATABASE IF NOT EXISTS `{db_name}`")) + conn.commit() + logger.info(f"Ensured embedded database '{db_name}' exists") + except Exception as e: + logger.warning(f"Failed to create embedded database '{db_name}': {e}") \ No newline at end of file diff --git a/src/powermem/version.py b/src/powermem/version.py index 3ac559be..6374fe15 100644 --- a/src/powermem/version.py +++ b/src/powermem/version.py @@ -2,11 +2,12 @@ Version information management """ -__version__ = "1.0.2" +__version__ = "1.1.0" __version_info__ = tuple(map(int, __version__.split("."))) # Version history VERSION_HISTORY = { + "1.1.0": "2026-04-02 - Version 1.1.0 release", "1.0.0": "2026-03-16 - Version 1.0.0 release", "0.5.0": "2026-02-06 - Version 0.5.0 release", "0.4.0": "2026-01-20 - Version 0.4.0 release", diff --git a/src/script/scripts/upgrade_sparse_vector.py b/src/script/scripts/upgrade_sparse_vector.py index 988581d9..a54bf6b0 100644 --- a/src/script/scripts/upgrade_sparse_vector.py +++ b/src/script/scripts/upgrade_sparse_vector.py @@ -69,31 +69,40 @@ def _validate_and_parse_config(config: Dict[str, Any]) -> Tuple[ObVecClient, str user = connection_args.get('user') password = connection_args.get('password') db_name = connection_args.get('db_name') + ob_path = connection_args.get('ob_path', './seekdb_data') collection_name = ob_config.get('collection_name', 'power_mem') - # 6. Validate required parameters - if not all([host, port, user, db_name]): - missing = [] - if not host: missing.append('host') - if not port: missing.append('port') - if not user: missing.append('user') - if not db_name: missing.append('db_name') - raise ValueError( - f"Missing required OceanBase connection parameters: {', '.join(missing)}. " - f"Please ensure config contains 'vector_store.config.connection_args.{{host, port, user, db_name}}'." - ) - - # 7. Create database connection + # 6. Validate required parameters and create connection try: - logger.info(f"Connecting to OceanBase at {host}:{port}...") - obvector = ObVecClient( - uri=f"{host}:{port}", - user=user, - password=password or "", - db_name=db_name - ) - logger.info(f"Connected successfully to database '{db_name}'") + if host: + if not all([port, user, db_name]): + missing = [] + if not port: missing.append('port') + if not user: missing.append('user') + if not db_name: missing.append('db_name') + raise ValueError( + f"Missing required OceanBase connection parameters: {', '.join(missing)}. " + f"Please ensure config contains 'vector_store.config.connection_args.{{port, user, db_name}}'." + ) + logger.info(f"Connecting to OceanBase at {host}:{port}...") + obvector = ObVecClient( + uri=f"{host}:{port}", + user=user, + password=password or "", + db_name=db_name + ) + logger.info(f"Connected successfully to database '{db_name}'") + else: + if not db_name: + raise ValueError( + "Missing required parameter 'db_name' for embedded SeekDB connection." + ) + logger.info(f"Connecting to embedded SeekDB at {ob_path}...") + obvector = ObVecClient(path=ob_path, db_name=db_name) + logger.info(f"Connected successfully to embedded SeekDB database '{db_name}'") return obvector, collection_name + except (ValueError, RuntimeError): + raise except Exception as e: raise RuntimeError(f"Failed to connect to OceanBase: {e}") from e diff --git a/src/server/__init__.py b/src/server/__init__.py index 34924e67..f04b959c 100644 --- a/src/server/__init__.py +++ b/src/server/__init__.py @@ -5,4 +5,4 @@ for memory management, search, user profiles, and multi-agent support. """ -__version__ = "0.1.0" +__version__ = "1.1.0" diff --git a/src/server/api/v1/agents.py b/src/server/api/v1/agents.py index aaf94e52..f4d3ae4b 100644 --- a/src/server/api/v1/agents.py +++ b/src/server/api/v1/agents.py @@ -16,9 +16,17 @@ router = APIRouter(prefix="/agents", tags=["agents"]) -def get_agent_service() -> AgentService: - """Dependency to get agent service""" - return AgentService() +def get_agent_service(request: Request) -> AgentService: + """Dependency to get agent service singleton from app state""" + service = request.app.state.agent_service + if service is None: + from ...models.errors import ErrorCode, APIError + raise APIError( + code=ErrorCode.INTERNAL_ERROR, + message="Agent service unavailable: storage backend initialization failed", + status_code=503, + ) + return service @router.get( diff --git a/src/server/api/v1/memories.py b/src/server/api/v1/memories.py index 8405c6ce..98354070 100644 --- a/src/server/api/v1/memories.py +++ b/src/server/api/v1/memories.py @@ -31,9 +31,17 @@ router = APIRouter(prefix="/memories", tags=["memories"]) -def get_memory_service() -> MemoryService: - """Dependency to get memory service""" - return MemoryService() +def get_memory_service(request: Request) -> MemoryService: + """Dependency to get memory service singleton from app state""" + service = request.app.state.memory_service + if service is None: + from ...models.errors import ErrorCode, APIError + raise APIError( + code=ErrorCode.INTERNAL_ERROR, + message="Memory service unavailable: storage backend initialization failed", + status_code=503, + ) + return service @router.post( diff --git a/src/server/api/v1/search.py b/src/server/api/v1/search.py index 47cc38dd..d5f2a93f 100644 --- a/src/server/api/v1/search.py +++ b/src/server/api/v1/search.py @@ -17,9 +17,17 @@ router = APIRouter(prefix="/memories", tags=["search"]) -def get_search_service() -> SearchService: - """Dependency to get search service""" - return SearchService() +def get_search_service(request: Request) -> SearchService: + """Dependency to get search service singleton from app state""" + service = request.app.state.search_service + if service is None: + from ...models.errors import ErrorCode, APIError + raise APIError( + code=ErrorCode.INTERNAL_ERROR, + message="Search service unavailable: storage backend initialization failed", + status_code=503, + ) + return service @router.post( diff --git a/src/server/api/v1/users.py b/src/server/api/v1/users.py index bce201d9..9f44180a 100644 --- a/src/server/api/v1/users.py +++ b/src/server/api/v1/users.py @@ -16,9 +16,17 @@ router = APIRouter(prefix="/users", tags=["users"]) -def get_user_service() -> UserService: - """Dependency to get user service""" - return UserService() +def get_user_service(request: Request) -> UserService: + """Dependency to get user service singleton from app state""" + service = request.app.state.user_service + if service is None: + from ...models.errors import ErrorCode, APIError + raise APIError( + code=ErrorCode.INTERNAL_ERROR, + message="User service unavailable: storage backend initialization failed", + status_code=503, + ) + return service @router.get( diff --git a/src/server/cli/server.py b/src/server/cli/server.py index 7ad24817..90451f02 100644 --- a/src/server/cli/server.py +++ b/src/server/cli/server.py @@ -8,6 +8,33 @@ from ..middleware.logging import setup_logging +def _is_embedded_storage() -> bool: + """ + Check whether the configured storage backend is an embedded (single-process) database. + + Returns True for: + - SQLite (always embedded, file-based) + - OceanBase/SeekDB in embedded mode (OCEANBASE_HOST is empty) + """ + try: + from powermem.config_loader import DatabaseSettings + db_settings = DatabaseSettings() + provider = db_settings.provider.lower() + + if provider == "sqlite": + return True + + if provider == "oceanbase": + from powermem.storage.config.oceanbase import OceanBaseConfig + ob_config = OceanBaseConfig() + return not (ob_config.host or "").strip() + + except Exception: + pass + + return False + + @click.command() @click.option("--host", default=None, help="Host to bind to") @click.option("--port", default=None, type=int, help="Port to bind to") @@ -21,6 +48,8 @@ def server(host, port, workers, reload, log_level): Example: powermem-server --host 0.0.0.0 --port 8000 --reload """ + import sys + # Override config with CLI options if host: config.host = host @@ -32,11 +61,20 @@ def server(host, port, workers, reload, log_level): config.reload = True if log_level: config.log_level = log_level - + + # Embedded databases (SQLite / embedded SeekDB) only support a single process. + # Force workers=1 automatically so users don't have to set it manually. + if not config.reload and config.workers != 1 and _is_embedded_storage(): + print( + f"[server] Embedded storage detected (SQLite or SeekDB without host). " + f"Forcing workers=1 (was {config.workers}).", + file=sys.stderr, + ) + config.workers = 1 + # Debug: Print current log format (can be removed later) - import sys print(f"[DEBUG] Current log_format: {config.log_format}", file=sys.stderr) - + # Setup logging BEFORE starting uvicorn to ensure all logs have timestamps setup_logging() diff --git a/src/server/main.py b/src/server/main.py index 74c56a13..e6288f8a 100644 --- a/src/server/main.py +++ b/src/server/main.py @@ -2,6 +2,7 @@ Main FastAPI application for PowerMem API Server """ +from contextlib import asynccontextmanager from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles @@ -19,13 +20,44 @@ from .middleware.auth import verify_api_key import os +import logging # Setup logging setup_logging() +logger = logging.getLogger("server") + # Setup templates BASE_DIR = os.path.dirname(os.path.abspath(__file__)) + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Initialize shared service singletons at startup and clean up on shutdown.""" + from .services.memory_service import MemoryService + from .services.search_service import SearchService + from .services.user_service import UserService + from .services.agent_service import AgentService + + logger.info("Initializing service singletons...") + try: + app.state.memory_service = MemoryService() + app.state.search_service = SearchService() + app.state.user_service = UserService() + app.state.agent_service = AgentService() + logger.info("Service singletons initialized") + except Exception as e: + logger.error(f"Failed to initialize service singletons: {e}", exc_info=True) + app.state.memory_service = None + app.state.search_service = None + app.state.user_service = None + app.state.agent_service = None + + yield + + logger.info("Shutting down services...") + + # Create FastAPI app app = FastAPI( title=config.api_title, @@ -34,6 +66,7 @@ docs_url="/docs", redoc_url="/redoc", openapi_url="/openapi.json", + lifespan=lifespan, ) # Setup CORS