diff --git a/.env b/.env index 7563d9c..e1ca055 100644 --- a/.env +++ b/.env @@ -1 +1 @@ -MEM0_API_KEY= \ No newline at end of file +MEM0_API_KEY=your_api_key_here diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..90909e2 --- /dev/null +++ b/.env.example @@ -0,0 +1 @@ +MEM0_API_KEY= diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c0e8e0a --- /dev/null +++ b/.gitignore @@ -0,0 +1,48 @@ +# Python bytecode +__pycache__/ +*.py[cod] +*$py.class + +# Distribution / packaging +dist/ +build/ +*.egg-info/ +mem0_mcp.egg-info/ + +# Virtual environments +.venv/ +venv/ +ENV/ +env/ + +# Environment variables +.env + +# Mac OS specific files +.DS_Store + +# Editor specific files +.vscode/ +.idea/ +*.swp +*.swo + +# Python version file +.python-version + +# Cache and logs +.cache/ +.pytest_cache/ +.coverage +htmlcov/ +coverage.xml +*.cover +logs/ +*.log + +# Dependency files +uv.lock + +# Temporary files +tmp/ +temp/ \ No newline at end of file diff --git a/README.md b/README.md index 1fff365..bd20593 100644 --- a/README.md +++ b/README.md @@ -38,23 +38,161 @@ MEM0_API_KEY=your_api_key_here uv run main.py ``` -2. In Cursor, connect to the SSE endpoint, follow this [doc](https://docs.cursor.com/context/model-context-protocol) for reference: +Or if you're using the metadata tagging version: +```bash +uv run main_metadata_tagging.py ``` + +2. In Cursor, connect to the SSE endpoint, follow this [doc](https://docs.cursor.com/context/model-context-protocol) for reference: +
+```text http://0.0.0.0:8080/sse ``` 3. Open the Composer in Cursor and switch to `Agent` mode. 
-## Demo with Cursor +## Prompt Examples + +Here are examples of how to effectively use each of the tools provided by this MCP server: + +### Adding Coding Preferences + +When you want to store a code snippet, implementation pattern, or programming knowledge, use prompts like these: + +```text +Please save this React custom hook for fetching data with caching: + +import { useState, useEffect } from 'react'; + +export const useFetchWithCache = (url, options = {}) => { + const [data, setData] = useState(null); + const [loading, setLoading] = useState(true); + const [error, setError] = useState(null); + + useEffect(() => { + // Check cache first + const cachedData = sessionStorage.getItem(`cache_${url}`); + + if (cachedData) { + setData(JSON.parse(cachedData)); + setLoading(false); + return; + } + + const fetchData = async () => { + try { + const response = await fetch(url, options); + if (!response.ok) throw new Error(`HTTP error ${response.status}`); + + const result = await response.json(); + + // Cache the result + sessionStorage.setItem(`cache_${url}`, JSON.stringify(result)); + + setData(result); + } catch (err) { + setError(err.message); + } finally { + setLoading(false); + } + }; + + fetchData(); + }, [url, JSON.stringify(options)]); + + return { data, loading, error }; +}; + +This is a React hook (React 18) that fetches data from an API and implements caching using sessionStorage to improve performance. It handles loading states and error conditions gracefully. + +``` + +For project-specific storage (with metadata tagging): + +```text +Please save this PostgreSQL query pattern for efficient pagination in the "database-patterns" project: + +SELECT * +FROM table_name +WHERE id > (SELECT id FROM table_name ORDER BY id LIMIT 1 OFFSET $offset) +ORDER BY id +LIMIT $limit; + +This pattern uses keyset pagination which is more efficient than OFFSET/LIMIT for large datasets as it avoids scanning through offset rows each time. It works best with indexed columns like id. 
+``` + +### Retrieving All Coding Preferences + +When you want to review all stored coding patterns: + +```text + +Please show me all the coding patterns we've stored so far. + +``` + +For project-specific retrieval: + +```text -https://github.com/user-attachments/assets/56670550-fb11-4850-9905-692d3496231c +Can you retrieve all the coding preferences we've saved for the "database-patterns" project? + +``` + +### Searching Coding Preferences + +When you need to find specific coding patterns or solutions: + +```text + +Find me any React hooks we've saved for handling API requests. + +``` + +```text + +Search for efficient pagination patterns in SQL databases. + +``` + +```text + +Do we have any examples of implementing authentication in Express.js? + +``` + +For project-specific searches: + +```text + +Within the "frontend-components" project, search for any modal implementation patterns. + +``` + +## Effective Workflow + +For the most effective use of this system: + +1. **Consistently Store Valuable Code**: Whenever you encounter or create a useful code pattern, implementation, or solution, store it with thorough documentation. + +2. **Always Search First**: Before implementing a solution, search the stored preferences to see if you already have a pattern for it. + +3. **Use Project Tags**: Organize related code patterns using project tags to keep your knowledge base well-structured. + +4. **Include Context**: When storing code, always include: + - The programming language/framework and version + - Any dependencies or prerequisites + - Example usage + - Edge cases or limitations + - Performance considerations ## Features The server provides three main tools for managing code preferences: 1. 
`add_coding_preference`: Store code snippets, implementation details, and coding patterns with comprehensive context including: + - Complete code with dependencies - Language/framework versions - Setup instructions @@ -79,9 +217,10 @@ This implementation allows for a persistent coding preferences system that can b By default, the server runs on 0.0.0.0:8080 but is configurable with command line arguments like: -``` +```bash + uv run main.py --host --port + ``` The server exposes an SSE endpoint at `/sse` that MCP clients can connect to for accessing the coding preferences management tools. - diff --git a/main_metadata_tagging.py b/main_metadata_tagging.py new file mode 100644 index 0000000..7a7b882 --- /dev/null +++ b/main_metadata_tagging.py @@ -0,0 +1,217 @@ +""" +MCP Server for Mem0 Code Preferences with Metadata Tagging + +This module implements a Model Control Protocol (MCP) server that integrates with the Mem0 memory system +to store, retrieve, and search coding preferences, snippets, and implementation patterns. It provides +tools for AI assistants to: + +1. Add code snippets and programming knowledge to persistent memory with project-specific metadata tagging +2. Retrieve all stored coding preferences for a specific project +3. Perform semantic searches across stored code knowledge, filtered by project + +The server exposes these capabilities through a FastMCP interface, making them available as tools +for AI assistants. Each memory is tagged with project metadata for better organization and retrieval. + +The module also sets up a Starlette-based web server with Server-Sent Events (SSE) for real-time +communication with clients. 
+ +Usage: + Run the script directly to start the server: + ``` + python main_metadata_tagging.py --host 0.0.0.0 --port 8080 + ``` +""" +from mcp.server.fastmcp import FastMCP +from starlette.applications import Starlette +from mcp.server.sse import SseServerTransport +from starlette.requests import Request +from starlette.routing import Mount, Route +from mcp.server import Server +import uvicorn +from mem0 import MemoryClient +from dotenv import load_dotenv +import json + +load_dotenv() + +# Initialize FastMCP server for mem0 tools +mcp = FastMCP("mem0-mcp") + +# Initialize mem0 client and set default user +mem0_client = MemoryClient() +DEFAULT_USER_ID = "cursor_mcp" +CUSTOM_INSTRUCTIONS = """ +Extract the Following Information: + +- Code Snippets: Save the actual code for future reference. +- Explanation: Document a clear description of what the code does and how it works. +- Related Technical Details: Include information about the programming language, dependencies, and system specifications. +- Key Features: Highlight the main functionalities and important aspects of the snippet. +""" +mem0_client.update_project(custom_instructions=CUSTOM_INSTRUCTIONS) + +@mcp.tool( + description="""Add a new coding preference to mem0. + This tool stores code snippets, implementation details, and coding patterns for future reference. + Use the 'project' parameter to tag the memory with a project identifier. + """ +) +async def add_coding_preference(text: str, project: str = "default_project") -> str: + """Add a new coding preference to mem0 with metadata tagging for the project. + + This tool is designed to store code snippets, implementation patterns, and programming knowledge. 
+ When storing code, it's recommended to include: + - Complete code with imports and dependencies + - Language/framework information + - Setup instructions if needed + - Documentation and comments + - Example usage + + Args: + text: The content to store in memory, including code, documentation, and context + project: The project identifier to tag the memory with + """ + try: + messages = [{"role": "user", "content": text}] + # Pass metadata to tag the memory with the project name + result = mem0_client.add(messages, user_id=DEFAULT_USER_ID, output_format="v1.1", metadata={"project": project}) + + # Extract the memory ID from the result if available + memory_id = None + if isinstance(result, dict) and 'id' in result: + memory_id = result['id'] + elif isinstance(result, list) and len(result) > 0 and isinstance(result[0], dict) and 'id' in result[0]: + memory_id = result[0]['id'] + + if memory_id: + return f"Successfully added preference with ID {memory_id} for project '{project}': {text}" + else: + return f"Successfully added preference for project '{project}': {text}" + except Exception as e: + return f"Error adding preference: {str(e)}" + +@mcp.tool( + description="""Retrieve all stored coding preferences for a given project. + Provide the 'project' parameter to limit the results. + Returns a JSON list of memories that have been tagged with the given project. + """ +) +async def get_all_coding_preferences(project: str = "default_project") -> str: + """Get all coding preferences for the specified project. + + Returns a JSON formatted list of all stored preferences, including: + - Code implementations and patterns + - Technical documentation + - Programming best practices + - Setup guides and examples + Each preference includes metadata about when it was created and its content type. 
+ """ + try: + # Use proper v2 API format with filters + memories = mem0_client.get_all( + filters={"metadata": {"project": project}}, + user_id=DEFAULT_USER_ID, + page=1, + page_size=50, + version="v2" + ) + + # Handle different response formats - v2 API returns a list directly + if isinstance(memories, list): + flattened_memories = memories + elif isinstance(memories, dict) and "results" in memories: + flattened_memories = memories["results"] + else: + flattened_memories = [] + + return json.dumps(flattened_memories, indent=2) + except Exception as e: + return f"Error getting preferences: {str(e)}" + +@mcp.tool( + description="""Search through stored coding preferences using semantic search. + The 'project' parameter allows you to restrict the search to a specific project. + This tool should be called for EVERY user query to find relevant code and implementation details. + It helps find: + - Specific code implementations or patterns + - Solutions to programming problems + - Best practices and coding standards + - Setup and configuration guides + - Technical documentation and examples + The search uses natural language understanding to find relevant matches, so you can + describe what you're looking for in plain English. Always search the preferences before + providing answers to ensure you leverage existing knowledge.""" +) +async def search_coding_preferences(query: str, project: str = "default_project") -> str: + """Search coding preferences using semantic search, limited to a specific project. + + The search is powered by natural language understanding, allowing you to find: + - Code implementations and patterns + - Programming solutions and techniques + - Technical documentation and guides + - Best practices and standards + Results are ranked by relevance to your query. + + Args: + query: Search query string describing what you're looking for. Can be natural language + or specific technical terms. 
+ project: The project identifier to restrict the search to + """ + try: + # Use proper v2 API format with filters + search_result = mem0_client.search( + query, + filters={"metadata": {"project": project}}, + user_id=DEFAULT_USER_ID, + version="v2" + ) + + # Handle different response formats + if isinstance(search_result, dict) and "results" in search_result: + flattened_memories = search_result["results"] + elif isinstance(search_result, list): + flattened_memories = search_result + else: + flattened_memories = [] + + return json.dumps(flattened_memories, indent=2) + except Exception as e: + return f"Error searching preferences: {str(e)}" + +def create_starlette_app(mcp_server: Server, *, debug: bool = False) -> Starlette: + """Create a Starlette application that can serve the provided MCP server with SSE.""" + sse = SseServerTransport("/messages/") + + async def handle_sse(request: Request) -> None: + async with sse.connect_sse( + request.scope, + request.receive, + request._send, # noqa: SLF001 + ) as (read_stream, write_stream): + await mcp_server.run( + read_stream, + write_stream, + mcp_server.create_initialization_options(), + ) + + return Starlette( + debug=debug, + routes=[ + Route("/sse", endpoint=handle_sse), + Mount("/messages/", app=sse.handle_post_message), + ], + ) + +if __name__ == "__main__": + mcp_server = mcp._mcp_server + + import argparse + parser = argparse.ArgumentParser(description='Run MCP SSE-based server (Metadata version)') + parser.add_argument('--host', default='0.0.0.0', help='Host to bind to') + parser.add_argument('--port', type=int, default=8080, help='Port to listen on') + args = parser.parse_args() + + # Bind SSE request handling to MCP server + starlette_app = create_starlette_app(mcp_server, debug=True) + + uvicorn.run(starlette_app, host=args.host, port=args.port) diff --git a/tests/README.md b/tests/README.md new file mode 100644 index 0000000..cc18fe3 --- /dev/null +++ b/tests/README.md @@ -0,0 +1,106 @@ +# Mem0 MCP Tests + 
+This directory contains test scripts for the Mem0 MCP (Model Control Protocol) server and its functionality. The tests are designed to verify that the memory storage, retrieval, and search functions are working correctly. + +## Overview + +The test suite includes: + +1. **Unit Tests**: Test individual functions in isolation + + - `test_add_coding_preference.py`: Tests for adding memories with project metadata + - `test_get_all_coding_preferences.py`: Tests for retrieving all memories by project + - `test_search_coding_preferences.py`: Tests for semantic search functionality + +2. **Integration Tests**: Test the complete workflow + + - `test_integration.py`: Tests for combined functionality across multiple operations + +3. **Mock Implementation**: + - `conftest.py`: Contains the mock implementation of the Mem0 API client + +## Running the Tests + +### Using the Test Runner Script (Recommended) + +We provide a convenient shell script to run tests: + +```bash +# Make the script executable (if needed) +chmod +x tests/run_tests.sh + +# Run all tests +./tests/run_tests.sh + +# View all options +./tests/run_tests.sh --help +``` + +### Manual Test Execution + +To run all tests using uv: + +```bash +uv run -m pytest tests/ +``` + +To run specific test files: + +```bash +uv run -m pytest tests/test_add_coding_preference.py +``` + +To run tests with increased verbosity: + +```bash +uv run -m pytest tests/ -v +``` + +## Test Coverage + +The tests cover: + +- Adding code snippets with project-specific metadata +- Retrieving all snippets for a specific project +- Searching for code snippets using semantic search +- Error handling for all operations +- Project isolation (ensuring that memories are properly organized by project) +- Testing with both the default project and custom projects + +## API Version + +These tests support both v1 (deprecated) and v2 of the Mem0 API. 
The main implementation has been updated to use the v2 API format, which uses: + +```python +# Get all memories +client.get_all( + filters={"metadata": {"project": project_name}}, + version="v2" +) + +# Search memories +client.search( + query, + filters={"metadata": {"project": project_name}}, + version="v2" +) +``` + +The mock client in `conftest.py` supports both API versions for backward compatibility with tests. + +## Requirements + +- pytest +- pytest-asyncio (for async test support) + +Install with `uv` (the project's package manager): + +```bash +uv pip install pytest pytest-asyncio +``` + +Or update the project dependencies: + +```bash +uv pip sync +``` diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..37d4c1d --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,172 @@ +""" +Pytest configuration file for mem0-mcp tests. + +This file contains fixtures and mocks used across the test suite. +""" +import json +import pytest +from unittest.mock import MagicMock, patch + +class MockMemoryClient: + """Mock implementation of the MemoryClient from mem0.""" + + def __init__(self): + self.memories = {} + self.next_id = 1 + + def update_project(self, custom_instructions=None): + """Mock method for updating project instructions.""" + self.custom_instructions = custom_instructions + return {"status": "success"} + + def add(self, messages, user_id, output_format=None, metadata=None): + """Mock method for adding a memory.""" + memory_id = f"memory_{self.next_id}" + self.next_id += 1 + + memory_data = { + "id": memory_id, + "messages": messages, + "user_id": user_id, + "metadata": metadata or {}, + "created_at": "2023-01-01T00:00:00Z" + } + + # Store memory by user_id and project + user_memories = self.memories.setdefault(user_id, {}) + project = metadata.get("project", "default_project") if metadata else "default_project" + project_memories = user_memories.setdefault(project, []) + project_memories.append(memory_data) + + return {"id": 
memory_id, "status": "success"} + + def get_all(self, user_id=None, page=1, page_size=10, filters=None, version=None, metadata_filters=None): + """Mock method for retrieving all memories. + + Supports both v1 (deprecated) with metadata_filters and v2 with filters parameter. + """ + user_memories = self.memories.get(user_id, {}) + results = [] + + # Extract project from filters (v2) or metadata_filters (v1) + project = None + + # Handle v2 API format + if version == "v2" and filters and "metadata" in filters and "project" in filters["metadata"]: + project = filters["metadata"]["project"] + # Handle legacy v1 format for backward compatibility with tests + elif metadata_filters and "project" in metadata_filters: + project = metadata_filters["project"] + + if project: + memories = user_memories.get(project, []) + else: + # Flatten all project memories + memories = [] + for project_memories in user_memories.values(): + memories.extend(project_memories) + + # Apply paging + start_idx = (page - 1) * page_size + end_idx = start_idx + page_size + page_memories = memories[start_idx:end_idx] + + # Format response based on version + if version == "v2": + # In v2, we return memories directly + return page_memories + else: + # In v1, we wrap memories in a results object + for memory in page_memories: + results.append({ + "id": memory["id"], + "memory": { + "id": memory["id"], + "messages": memory["messages"], + "created_at": memory["created_at"], + "metadata": memory["metadata"] + } + }) + + return { + "results": results, + "total": len(memories), + "page": page, + "page_size": page_size + } + + def search(self, query, user_id=None, output_format=None, filters=None, version=None, metadata_filters=None): + """Mock method for semantic search of memories. + + Supports both v1 (deprecated) with metadata_filters and v2 with filters parameter. 
+ """ + # In our mock, we'll do a simple substring search instead of semantic search + user_memories = self.memories.get(user_id, {}) + results = [] + + # Extract project from filters (v2) or metadata_filters (v1) + project = None + + # Handle v2 API format + if version == "v2" and filters and "metadata" in filters and "project" in filters["metadata"]: + project = filters["metadata"]["project"] + # Handle legacy v1 format for backward compatibility with tests + elif metadata_filters and "project" in metadata_filters: + project = metadata_filters["project"] + + # If project filter is provided, only search within that project + if project: + projects_to_search = [project] + else: + projects_to_search = user_memories.keys() + + for project in projects_to_search: + project_memories = user_memories.get(project, []) + for memory in project_memories: + # Simple substring search in message content + for message in memory["messages"]: + if query.lower() in message.get("content", "").lower(): + memory_result = { + "id": memory["id"], + "messages": memory["messages"], + "created_at": memory["created_at"], + "metadata": memory["metadata"], + "score": 0.9 # Mock score + } + + # For v1 format, wrap in memory object + if version != "v2": + results.append({ + "id": memory["id"], + "memory": memory_result, + "score": 0.9 + }) + else: + # For v2, return memory directly + results.append(memory_result) + break # Only add each memory once + + # Format response based on version + if version == "v2": + # In v2, we return an object with results array + return { + "results": results[:10], # Limit to 10 results like a typical search + "query": query + } + else: + # In v1, same structure but different content + return { + "results": results[:10], + "query": query + } + +@pytest.fixture +def mock_mem0_client(): + """Fixture that provides a mock Mem0 client for testing.""" + return MockMemoryClient() + +@pytest.fixture +def patched_mem0_client(mock_mem0_client): + """Fixture that patches the 
MemoryClient in the main module with our mock.""" + with patch("main_metadata_tagging.mem0_client", mock_mem0_client): + yield mock_mem0_client \ No newline at end of file diff --git a/tests/run_api_test.sh b/tests/run_api_test.sh new file mode 100755 index 0000000..7503881 --- /dev/null +++ b/tests/run_api_test.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# Run real API tests for mem0-mcp + +# Navigate to project root +cd "$(dirname "$0")/.." || exit + +# Check if API key is configured +if [ -f .env ]; then + echo "Found .env file with API configuration" +else + echo "Warning: No .env file found. Make sure your API key is set in the environment." + echo "You may need to create a .env file with your API key:" + echo "MEM0_API_KEY=your_api_key_here" + # Continue anyway as the key might be set in the environment +fi + +# Check for verbose flag +if [ "$1" = "-v" ] || [ "$1" = "--verbose" ]; then + echo "Running API test with verbose output..." + uv run -m tests.test_real_api --verbose +else + echo "Running API test..." + uv run -m tests.test_real_api +fi \ No newline at end of file diff --git a/tests/run_isolation_test.sh b/tests/run_isolation_test.sh new file mode 100755 index 0000000..791af96 --- /dev/null +++ b/tests/run_isolation_test.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# Run project isolation test for mem0-mcp + +# Navigate to project root +cd "$(dirname "$0")/.." || exit + +# Check if API key is configured +if [ -f .env ]; then + echo "Found .env file with API configuration" +else + echo "Warning: No .env file found. Make sure your API key is set in the environment." + echo "You may need to create a .env file with your API key:" + echo "MEM0_API_KEY=your_api_key_here" + # Continue anyway as the key might be set in the environment +fi + +# Check for verbose flag +if [ "$1" = "-v" ] || [ "$1" = "--verbose" ]; then + echo "Running project isolation test with verbose output..." + uv run -m tests.test_project_isolation --verbose +else + echo "Running project isolation test..." 
+ uv run -m tests.test_project_isolation +fi \ No newline at end of file diff --git a/tests/run_tests.sh b/tests/run_tests.sh new file mode 100755 index 0000000..b784e0e --- /dev/null +++ b/tests/run_tests.sh @@ -0,0 +1,56 @@ +#!/bin/bash +# Run tests for the mem0-mcp server + +# Navigate to project root +cd "$(dirname "$0")/.." || exit + +# Check if an argument is provided +if [ $# -eq 0 ]; then + # Run all tests with normal verbosity + echo "Running all tests..." + uv run -m pytest tests/ +elif [ "$1" = "--help" ] || [ "$1" = "-h" ]; then + # Display help information + echo "Test Runner for mem0-mcp" + echo "Usage:" + echo " ./tests/run_tests.sh Run all tests" + echo " ./tests/run_tests.sh -v Run all tests with verbose output" + echo " ./tests/run_tests.sh -vv Run all tests with very verbose output" + echo " ./tests/run_tests.sh --coverage Run tests with coverage report" + echo " ./tests/run_tests.sh add Run only add_coding_preference tests" + echo " ./tests/run_tests.sh get Run only get_all_coding_preferences tests" + echo " ./tests/run_tests.sh search Run only search_coding_preferences tests" + echo " ./tests/run_tests.sh integration Run only integration tests" +elif [ "$1" = "-v" ]; then + # Run tests with verbose output + echo "Running all tests with verbose output..." + uv run -m pytest tests/ -v +elif [ "$1" = "-vv" ]; then + # Run tests with very verbose output + echo "Running all tests with very verbose output..." + uv run -m pytest tests/ -vv +elif [ "$1" = "--coverage" ]; then + # Run tests with coverage report + echo "Running tests with coverage report..." + uv run -m pytest tests/ --cov=. --cov-report=term-missing +elif [ "$1" = "add" ]; then + # Run only add_coding_preference tests + echo "Running add_coding_preference tests..." + uv run -m pytest tests/test_add_coding_preference.py -v +elif [ "$1" = "get" ]; then + # Run only get_all_coding_preferences tests + echo "Running get_all_coding_preferences tests..." 
+ uv run -m pytest tests/test_get_all_coding_preferences.py -v +elif [ "$1" = "search" ]; then + # Run only search_coding_preferences tests + echo "Running search_coding_preferences tests..." + uv run -m pytest tests/test_search_coding_preferences.py -v +elif [ "$1" = "integration" ]; then + # Run only integration tests + echo "Running integration tests..." + uv run -m pytest tests/test_integration.py -v +else + echo "Unknown option: $1" + echo "Use --help for available options" + exit 1 +fi \ No newline at end of file diff --git a/tests/test_add_coding_preference.py b/tests/test_add_coding_preference.py new file mode 100644 index 0000000..df6daff --- /dev/null +++ b/tests/test_add_coding_preference.py @@ -0,0 +1,95 @@ +""" +Tests for the add_coding_preference function in main_metadata_tagging.py. +""" +import pytest +import json +import sys +import os +from unittest.mock import patch + +# Add the parent directory to the path so we can import the main module +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import main_metadata_tagging + +class TestAddCodingPreference: + """Tests for the add_coding_preference function.""" + + @pytest.mark.asyncio + async def test_add_preference_success(self, patched_mem0_client): + """Test that a preference can be successfully added.""" + # Set up test data + test_code = """ + def hello_world(): + print("Hello, World!") + """ + test_project = "test_project" + + # Call the function + result = await main_metadata_tagging.add_coding_preference(test_code, test_project) + + # Verify results + assert "Successfully added preference" in result + assert test_project in result + + # Verify the memory was added to the mock client + memories = patched_mem0_client.memories[main_metadata_tagging.DEFAULT_USER_ID][test_project] + assert len(memories) == 1 + assert memories[0]["messages"][0]["content"] == test_code + assert memories[0]["metadata"]["project"] == test_project + + @pytest.mark.asyncio + async def 
test_add_preference_default_project(self, patched_mem0_client): + """Test that a preference is added to the default project when no project is specified.""" + # Set up test data + test_code = "print('Default project test')" + + # Call the function without specifying a project + result = await main_metadata_tagging.add_coding_preference(test_code) + + # Verify results + assert "Successfully added preference" in result + assert "default_project" in result + + # Verify the memory was added to the default project + memories = patched_mem0_client.memories[main_metadata_tagging.DEFAULT_USER_ID]["default_project"] + assert len(memories) == 1 + assert memories[0]["messages"][0]["content"] == test_code + assert memories[0]["metadata"]["project"] == "default_project" + + @pytest.mark.asyncio + async def test_add_multiple_preferences(self, patched_mem0_client): + """Test that multiple preferences can be added to the same project.""" + # Set up test data + test_project = "multi_pref_project" + test_codes = [ + "def function1(): return 'Test 1'", + "def function2(): return 'Test 2'", + "def function3(): return 'Test 3'" + ] + + # Add multiple preferences + for test_code in test_codes: + result = await main_metadata_tagging.add_coding_preference(test_code, test_project) + assert "Successfully added preference" in result + + # Verify all memories were added + memories = patched_mem0_client.memories[main_metadata_tagging.DEFAULT_USER_ID][test_project] + assert len(memories) == len(test_codes) + + # Verify each memory has the correct content + for i, memory in enumerate(memories): + assert memory["messages"][0]["content"] == test_codes[i] + assert memory["metadata"]["project"] == test_project + + @pytest.mark.asyncio + async def test_add_preference_error_handling(self): + """Test error handling when the client raises an exception.""" + # Patch the mem0_client.add method to raise an exception + with patch("main_metadata_tagging.mem0_client.add", side_effect=Exception("Test error")): 
+ # Call the function + result = await main_metadata_tagging.add_coding_preference("Test code", "test_project") + + # Verify error is handled properly + assert "Error adding preference" in result + assert "Test error" in result \ No newline at end of file diff --git a/tests/test_get_all_coding_preferences.py b/tests/test_get_all_coding_preferences.py new file mode 100644 index 0000000..8d7ad9d --- /dev/null +++ b/tests/test_get_all_coding_preferences.py @@ -0,0 +1,107 @@ +""" +Tests for the get_all_coding_preferences function in main_metadata_tagging.py. +""" +import pytest +import json +import sys +import os +from unittest.mock import patch + +# Add the parent directory to the path so we can import the main module +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import main_metadata_tagging + +class TestGetAllCodingPreferences: + """Tests for the get_all_coding_preferences function.""" + + @pytest.mark.asyncio + async def test_get_all_empty_project(self, patched_mem0_client): + """Test retrieving preferences when a project has no memories.""" + # Call the function with a project that doesn't exist yet + result = await main_metadata_tagging.get_all_coding_preferences("empty_project") + + # Parse the JSON result + memories = json.loads(result) + + # Verify an empty list is returned + assert isinstance(memories, list) + assert len(memories) == 0 + + @pytest.mark.asyncio + async def test_get_all_with_memories(self, patched_mem0_client): + """Test retrieving preferences when a project has memories.""" + # Set up test data + test_project = "populated_project" + test_codes = [ + "def function1(): return 'Test 1'", + "def function2(): return 'Test 2'", + "def function3(): return 'Test 3'" + ] + + # Add test memories to the project + for test_code in test_codes: + await main_metadata_tagging.add_coding_preference(test_code, test_project) + + # Call the function + result = await main_metadata_tagging.get_all_coding_preferences(test_project) + 
+ # Parse the JSON result + memories = json.loads(result) + + # Verify the correct number of memories is returned + assert isinstance(memories, list) + assert len(memories) == len(test_codes) + + # Verify the memory content + message_contents = [memory["messages"][0]["content"] for memory in memories] + for test_code in test_codes: + assert test_code in message_contents + + @pytest.mark.asyncio + async def test_get_all_default_project(self, patched_mem0_client): + """Test retrieving preferences from the default project.""" + # Add test memories to the default project + test_code = "print('Default project test')" + await main_metadata_tagging.add_coding_preference(test_code) + + # Call the function without specifying a project + result = await main_metadata_tagging.get_all_coding_preferences() + + # Parse the JSON result + memories = json.loads(result) + + # Verify the memory is returned + assert isinstance(memories, list) + assert len(memories) == 1 + assert memories[0]["messages"][0]["content"] == test_code + + @pytest.mark.asyncio + async def test_get_all_project_isolation(self, patched_mem0_client): + """Test that memories from different projects are isolated.""" + # Add memories to two different projects + project1 = "project1" + project2 = "project2" + + await main_metadata_tagging.add_coding_preference("Project 1 Code", project1) + await main_metadata_tagging.add_coding_preference("Project 2 Code", project2) + + # Get memories from project1 + result = await main_metadata_tagging.get_all_coding_preferences(project1) + memories = json.loads(result) + + # Verify only project1 memories are returned + assert len(memories) == 1 + assert memories[0]["messages"][0]["content"] == "Project 1 Code" + + @pytest.mark.asyncio + async def test_get_all_error_handling(self): + """Test error handling when the client raises an exception.""" + # Patch the mem0_client.get_all method to raise an exception + with patch("main_metadata_tagging.mem0_client.get_all", 
side_effect=Exception("Test error")): + # Call the function + result = await main_metadata_tagging.get_all_coding_preferences("test_project") + + # Verify error is handled properly + assert "Error getting preferences" in result + assert "Test error" in result \ No newline at end of file diff --git a/tests/test_integration.py b/tests/test_integration.py new file mode 100644 index 0000000..607c330 --- /dev/null +++ b/tests/test_integration.py @@ -0,0 +1,200 @@ +""" +Integration tests for main_metadata_tagging.py. + +These tests verify that the different functions work together properly in +realistic use cases and workflows. +""" +import pytest +import json +import sys +import os + +# Add the parent directory to the path so we can import the main module +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import main_metadata_tagging + +class TestIntegration: + """Integration tests for the main_metadata_tagging.py functions.""" + + @pytest.mark.asyncio + async def test_full_workflow(self, patched_mem0_client): + """ + Test a full workflow: + 1. Add multiple code snippets to different projects + 2. Retrieve all snippets in a project + 3. 
Search for specific snippets + """ + # Step 1: Add code snippets to different projects + python_project = "python_examples" + javascript_project = "javascript_examples" + + # Python snippets + python_snippets = [ + ''' + def fibonacci(n): + """Calculate the nth Fibonacci number.""" + if n <= 1: + return n + return fibonacci(n-1) + fibonacci(n-2) + ''', + ''' + def quicksort(arr): + """Quicksort implementation.""" + if len(arr) <= 1: + return arr + pivot = arr[len(arr) // 2] + left = [x for x in arr if x < pivot] + middle = [x for x in arr if x == pivot] + right = [x for x in arr if x > pivot] + return quicksort(left) + middle + quicksort(right) + ''', + ''' + def binary_search(arr, target): + """Binary search implementation.""" + left, right = 0, len(arr) - 1 + while left <= right: + mid = (left + right) // 2 + if arr[mid] == target: + return mid + elif arr[mid] < target: + left = mid + 1 + else: + right = mid - 1 + return -1 + ''' + ] + + # JavaScript snippets + js_snippets = [ + """ + function debounce(func, wait) { + // Debounce implementation + let timeout; + return function(...args) { + clearTimeout(timeout); + timeout = setTimeout(() => func.apply(this, args), wait); + }; + } + """, + """ + function throttle(func, limit) { + // Throttle implementation + let inThrottle; + return function(...args) { + if (!inThrottle) { + func.apply(this, args); + inThrottle = true; + setTimeout(() => inThrottle = false, limit); + } + }; + } + """ + ] + + # Add all snippets + for snippet in python_snippets: + result = await main_metadata_tagging.add_coding_preference(snippet, python_project) + assert "Successfully added preference" in result + + for snippet in js_snippets: + result = await main_metadata_tagging.add_coding_preference(snippet, javascript_project) + assert "Successfully added preference" in result + + # Step 2: Retrieve all snippets in each project + python_result = await main_metadata_tagging.get_all_coding_preferences(python_project) + js_result = await 
main_metadata_tagging.get_all_coding_preferences(javascript_project)
+
+        python_memories = json.loads(python_result)
+        js_memories = json.loads(js_result)
+
+        # Verify the correct number of memories per project
+        assert len(python_memories) == len(python_snippets)
+        assert len(js_memories) == len(js_snippets)
+
+        # Step 3: Search for specific code patterns
+        # Search for sorting algorithms in Python project
+        sort_result = await main_metadata_tagging.search_coding_preferences("quicksort", python_project)
+        sort_memories = json.loads(sort_result)
+
+        # Verify quicksort is found
+        assert len(sort_memories) >= 1
+        found_quicksort = False
+        for memory in sort_memories:
+            if "quicksort" in memory["messages"][0]["content"].lower():
+                found_quicksort = True
+                break
+        assert found_quicksort
+
+        # Search for event handling in JavaScript project
+        event_result = await main_metadata_tagging.search_coding_preferences("throttle", javascript_project)
+        event_memories = json.loads(event_result)
+
+        # Verify throttle function is found
+        assert len(event_memories) >= 1
+        found_throttle = False
+        for memory in event_memories:
+            if "throttle" in memory["messages"][0]["content"].lower():
+                found_throttle = True
+                break
+        assert found_throttle
+
+        # Search for binary search in the Python project (scoped to python_project only)
+        binary_result = await main_metadata_tagging.search_coding_preferences("binary search", python_project)
+        binary_memories = json.loads(binary_result)
+
+        # Verify binary search is found
+        assert len(binary_memories) >= 1
+        found_binary = False
+        for memory in binary_memories:
+            if "binary_search" in memory["messages"][0]["content"]:
+                found_binary = True
+                break
+        assert found_binary
+
+    @pytest.mark.asyncio
+    async def test_cross_project_search_isolation(self, patched_mem0_client):
+        """Test that searches are properly isolated to the specified project."""
+        # Add the same code pattern to different projects with different implementations
+        project1 = "frontend"
+        project2 = "backend"
+
+        # Add 
snippet with "authentication" to project1 + frontend_auth = """ + function authenticateUser(username, password) { + // Frontend authentication logic + return fetch('/api/login', { + method: 'POST', + body: JSON.stringify({ username, password }) + }).then(response => response.json()); + } + """ + await main_metadata_tagging.add_coding_preference(frontend_auth, project1) + + # Add snippet with "authentication" to project2 + backend_auth = """ + def authenticate_user(username, password): + # Backend authentication logic + hashed_password = hash_password(password) + user = User.query.filter_by(username=username).first() + if user and user.password == hashed_password: + return generate_token(user) + return None + """ + await main_metadata_tagging.add_coding_preference(backend_auth, project2) + + # Search for "authentication" in project1 + frontend_result = await main_metadata_tagging.search_coding_preferences("authentication", project1) + frontend_memories = json.loads(frontend_result) + + # Verify only frontend authentication is found + assert len(frontend_memories) == 1 + assert "Frontend authentication" in frontend_memories[0]["messages"][0]["content"] + + # Search for "authentication" in project2 + backend_result = await main_metadata_tagging.search_coding_preferences("authentication", project2) + backend_memories = json.loads(backend_result) + + # Verify only backend authentication is found + assert len(backend_memories) == 1 + assert "Backend authentication" in backend_memories[0]["messages"][0]["content"] \ No newline at end of file diff --git a/tests/test_project_isolation.py b/tests/test_project_isolation.py new file mode 100644 index 0000000..0907e70 --- /dev/null +++ b/tests/test_project_isolation.py @@ -0,0 +1,193 @@ +""" +Project Isolation Test for Mem0 API. + +This script specifically tests that the get_all and search API calls only return data +for the specified project, ensuring proper isolation between different projects. 
+""" +import sys +import os +import json +import time +import argparse +from datetime import datetime + +# Add the parent directory to the path so we can import the main module +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from mem0 import MemoryClient +from dotenv import load_dotenv + +# Load environment variables +load_dotenv() + +def run_isolation_test(verbose=False): + """Run a test specifically focused on project isolation.""" + print("Starting Project Isolation Test") + print("==============================\n") + + # Initialize the real Mem0 client + mem0_client = MemoryClient() + + # Test user ID - use a specific one for this test to avoid interference + user_id = "isolation_test_user" + + # Create two unique project names for testing + project1 = f"project1_{int(time.time())}" + project2 = f"project2_{int(time.time())}" + + print(f"Using test projects: '{project1}' and '{project2}'") + + try: + # Step 1: Add distinct test memories to both projects + print("\nAdding test memories to both projects...") + + # Add memory to project1 + code1 = ''' + def project1_function(): + """This function belongs ONLY to project1""" + return "Project 1 function" + ''' + messages1 = [{"role": "user", "content": code1}] + mem0_client.add(messages1, user_id=user_id, output_format="v1.1", metadata={"project": project1}) + + # Add memory to project2 + code2 = ''' + def project2_function(): + """This function belongs ONLY to project2""" + return "Project 2 function" + ''' + messages2 = [{"role": "user", "content": code2}] + mem0_client.add(messages2, user_id=user_id, output_format="v1.1", metadata={"project": project2}) + + print("Added one memory to each project") + print("Waiting for memories to be indexed...") + time.sleep(3) # Give more time for indexing + + # Step 2: Test get_all for project1 + print("\nTesting get_all for project1...") + memories1 = mem0_client.get_all( + user_id=user_id, + filters={"metadata": {"project": project1}}, + 
version="v2" + ) + + if verbose: + print(f"Project1 memories: {json.dumps(memories1, indent=2)}") + + # Step 3: Test get_all for project2 + print("Testing get_all for project2...") + memories2 = mem0_client.get_all( + user_id=user_id, + filters={"metadata": {"project": project2}}, + version="v2" + ) + + if verbose: + print(f"Project2 memories: {json.dumps(memories2, indent=2)}") + + # Step 4: Verify isolation for get_all + print("\nVerifying isolation for get_all...") + + # Function to check if a memory contains a specific text + def contains_text(memory, text): + if isinstance(memory, dict): + # Check memory content + if "memory" in memory and isinstance(memory["memory"], str) and text in memory["memory"]: + return True + # Check messages if available + if "messages" in memory: + for msg in memory["messages"]: + if "content" in msg and text in msg["content"]: + return True + return False + + # Check that project1 memories only contain project1 content + project1_isolation = True + for memory in memories1: + if contains_text(memory, "project2_function"): + project1_isolation = False + break + + # Check that project2 memories only contain project2 content + project2_isolation = True + for memory in memories2: + if contains_text(memory, "project1_function"): + project2_isolation = False + break + + if project1_isolation and project2_isolation: + print("✅ get_all API correctly maintains project isolation") + else: + print("❌ get_all API does NOT maintain project isolation") + + # Step 5: Test search for both projects with the same query + print("\nTesting search for both projects with the same query...") + + # Search for "function" in project1 + search_query = "function" + search_result1 = mem0_client.search( + search_query, + user_id=user_id, + filters={"metadata": {"project": project1}}, + version="v2" + ) + + # Search for "function" in project2 + search_result2 = mem0_client.search( + search_query, + user_id=user_id, + filters={"metadata": {"project": project2}}, + 
version="v2" + ) + + if verbose: + print(f"\nProject1 search results: {json.dumps(search_result1, indent=2)}") + print(f"\nProject2 search results: {json.dumps(search_result2, indent=2)}") + + # Extract results from response + results1 = search_result1["results"] if isinstance(search_result1, dict) and "results" in search_result1 else search_result1 + results2 = search_result2["results"] if isinstance(search_result2, dict) and "results" in search_result2 else search_result2 + + # Step 6: Verify search isolation + print("\nVerifying isolation for search...") + + # Check that project1 search results only contain project1 content + search1_isolation = True + for result in results1: + if contains_text(result, "project2_function"): + search1_isolation = False + break + + # Check that project2 search results only contain project2 content + search2_isolation = True + for result in results2: + if contains_text(result, "project1_function"): + search2_isolation = False + break + + if search1_isolation and search2_isolation: + print("✅ search API correctly maintains project isolation") + else: + print("❌ search API does NOT maintain project isolation") + + # Final results + if project1_isolation and project2_isolation and search1_isolation and search2_isolation: + print("\n✅ PASSED: Both get_all and search APIs correctly maintain project isolation") + return True + else: + print("\n❌ FAILED: Project isolation issues detected") + return False + + except Exception as e: + print(f"\n❌ Error during isolation test: {str(e)}") + import traceback + traceback.print_exc() + return False + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='Test project isolation in Mem0 API') + parser.add_argument('-v', '--verbose', action='store_true', help='Show detailed API responses') + args = parser.parse_args() + + success = run_isolation_test(verbose=args.verbose) + sys.exit(0 if success else 1) \ No newline at end of file diff --git a/tests/test_real_api.py 
b/tests/test_real_api.py new file mode 100644 index 0000000..ea3d25c --- /dev/null +++ b/tests/test_real_api.py @@ -0,0 +1,189 @@ +""" +Real API test for mem0-mcp. + +This script performs tests against the actual Mem0 API to verify that our implementation +works correctly with the real system. It tests the v2 API endpoints with proper filter usage. +""" +import sys +import os +import json +import time +import argparse +from datetime import datetime + +# Add the parent directory to the path so we can import the main module +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from mem0 import MemoryClient +from dotenv import load_dotenv + +# Load environment variables +load_dotenv() + +def run_api_test(verbose=False): + """Run a real-world test against the Mem0 API.""" + print("Starting Real API Test") + print("=====================\n") + + # Create a unique project name for testing to avoid interference + test_project = f"api_test_{int(time.time())}" + print(f"Using test project: {test_project}") + + # Initialize the real Mem0 client + mem0_client = MemoryClient() + + # Optionally use organization and project if needed + # mem0_client = MemoryClient(org_id='YOUR_ORG_ID', project_id='YOUR_PROJECT_ID') + + # Test user ID + user_id = "api_test_user" + + try: + # Step 1: Add a test memory + print("\n1. Adding a test memory...") + test_code = f""" + def hello_world(): + \"\"\"A simple test function created at {datetime.now().isoformat()}\"\"\" + print("Hello, World!") + return "Hello, World!" 
+ """ + + messages = [{"role": "user", "content": test_code}] + result = mem0_client.add(messages, user_id=user_id, output_format="v1.1", metadata={"project": test_project}) + + if verbose: + print(f"Add result: {json.dumps(result, indent=2)}") + + # The result might be a list or a dictionary depending on the API version + memory_id = None + if isinstance(result, dict) and 'id' in result: + memory_id = result['id'] + elif isinstance(result, list) and len(result) > 0 and 'id' in result[0]: + memory_id = result[0]['id'] + + if memory_id: + print(f"Memory added with ID: {memory_id}") + else: + print(f"Memory added successfully, but couldn't extract ID from response format") + + # Give the API a moment to index the new memory + print("Waiting for memory to be indexed...") + time.sleep(2) + + # Step 2: Search for the memory + print("\n2. Searching for the memory...") + search_query = "hello world" + search_result = mem0_client.search( + search_query, + user_id=user_id, + filters={"metadata": {"project": test_project}}, + version="v2" + ) + + if verbose: + print(f"Search result: {json.dumps(search_result, indent=2)}") + + # Handle different response formats + search_results_count = 0 + if isinstance(search_result, dict) and "results" in search_result: + search_results_count = len(search_result["results"]) + elif isinstance(search_result, list): + search_results_count = len(search_result) + + if search_results_count > 0: + print(f"Found {search_results_count} memory matches") + else: + print("No memories found") + + # Step 3: Get all memories for the project + print("\n3. 
Getting all memories for the project...") + all_memories = mem0_client.get_all( + user_id=user_id, + filters={"metadata": {"project": test_project}}, + version="v2" + ) + + if verbose: + print(f"All memories result: {json.dumps(all_memories, indent=2)}") + + # Handle different response formats + memories_count = 0 + if isinstance(all_memories, dict) and "results" in all_memories: + memories_count = len(all_memories["results"]) + elif isinstance(all_memories, list): + memories_count = len(all_memories) + + print(f"Retrieved {memories_count} memories") + + # Step 4: Verify project isolation + print("\n4. Verifying project isolation...") + other_project = f"other_project_{int(time.time())}" + other_code = f""" + def another_function(): + \"\"\"A function for a different project at {datetime.now().isoformat()}\"\"\" + return "Another function" + """ + + # Add memory to another project + messages = [{"role": "user", "content": other_code}] + mem0_client.add(messages, user_id=user_id, output_format="v1.1", metadata={"project": other_project}) + + # Give the API a moment to index the new memory + print("Waiting for second memory to be indexed...") + time.sleep(2) + + # Verify that searching in the first project doesn't return the second project's memory + search_result = mem0_client.search( + "function", + user_id=user_id, + filters={"metadata": {"project": test_project}}, + version="v2" + ) + + if verbose: + print(f"Project isolation search result: {json.dumps(search_result, indent=2)}") + + # Check if the second project's content is not found in the first project + is_isolated = True + results_to_check = [] + + if isinstance(search_result, dict) and "results" in search_result: + results_to_check = search_result["results"] + elif isinstance(search_result, list): + results_to_check = search_result + + for item in results_to_check: + # Extract message content from the result + messages = [] + if "messages" in item: + messages = item["messages"] + elif "memory" in item and 
"messages" in item["memory"]: + messages = item["memory"]["messages"] + + # Check each message for the "another_function" content + for message in messages: + if "content" in message and "another_function" in message["content"]: + is_isolated = False + break + + if is_isolated: + print("Project isolation verified: Different projects maintain separate memories") + else: + print("WARNING: Project isolation may not be working correctly") + + print("\nAPI Test Completed Successfully!") + return True + + except Exception as e: + print(f"\n❌ Error during API test: {str(e)}") + import traceback + traceback.print_exc() + return False + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='Test Mem0 API with v2 endpoints') + parser.add_argument('-v', '--verbose', action='store_true', help='Show detailed API responses') + args = parser.parse_args() + + success = run_api_test(verbose=args.verbose) + sys.exit(0 if success else 1) \ No newline at end of file diff --git a/tests/test_search_coding_preferences.py b/tests/test_search_coding_preferences.py new file mode 100644 index 0000000..4de97fe --- /dev/null +++ b/tests/test_search_coding_preferences.py @@ -0,0 +1,125 @@ +""" +Tests for the search_coding_preferences function in main_metadata_tagging.py. 
+""" +import pytest +import json +import sys +import os +from unittest.mock import patch + +# Add the parent directory to the path so we can import the main module +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import main_metadata_tagging + +class TestSearchCodingPreferences: + """Tests for the search_coding_preferences function.""" + + @pytest.mark.asyncio + async def test_search_empty_project(self, patched_mem0_client): + """Test searching when a project has no memories.""" + # Call the search function on an empty project + result = await main_metadata_tagging.search_coding_preferences("test query", "empty_project") + + # Parse the JSON result + memories = json.loads(result) + + # Verify an empty list is returned + assert isinstance(memories, list) + assert len(memories) == 0 + + @pytest.mark.asyncio + async def test_search_with_matches(self, patched_mem0_client): + """Test searching when there are matching memories.""" + # Set up test data + test_project = "search_project" + test_codes = [ + "def find_me(): return 'This should be found'", + "def ignore_me(): return 'This should not be found'", + "def also_find_me(): return 'This should also be found'" + ] + + # Add test memories to the project + for test_code in test_codes: + await main_metadata_tagging.add_coding_preference(test_code, test_project) + + # Call the search function with a query that should match two of the memories + result = await main_metadata_tagging.search_coding_preferences("find", test_project) + + # Parse the JSON result + memories = json.loads(result) + + # Verify the correct number of memories is returned + assert isinstance(memories, list) + assert len(memories) == 2 # Should find two memories with "find" in them + + # Verify the memory content + message_contents = [memory["messages"][0]["content"] for memory in memories] + assert "def find_me(): return 'This should be found'" in message_contents + assert "def also_find_me(): return 'This should also be 
found'" in message_contents + assert "def ignore_me(): return 'This should not be found'" not in message_contents + + @pytest.mark.asyncio + async def test_search_no_matches(self, patched_mem0_client): + """Test searching when there are no matching memories.""" + # Set up test data + test_project = "no_match_project" + await main_metadata_tagging.add_coding_preference("def example(): pass", test_project) + + # Call the search function with a query that shouldn't match + result = await main_metadata_tagging.search_coding_preferences("nonexistent", test_project) + + # Parse the JSON result + memories = json.loads(result) + + # Verify no memories are returned + assert isinstance(memories, list) + assert len(memories) == 0 + + @pytest.mark.asyncio + async def test_search_default_project(self, patched_mem0_client): + """Test searching in the default project.""" + # Add test memory to the default project + test_code = "print('Default project searchable content')" + await main_metadata_tagging.add_coding_preference(test_code) + + # Call the search function without specifying a project + result = await main_metadata_tagging.search_coding_preferences("searchable") + + # Parse the JSON result + memories = json.loads(result) + + # Verify the memory is returned + assert isinstance(memories, list) + assert len(memories) == 1 + assert memories[0]["messages"][0]["content"] == test_code + + @pytest.mark.asyncio + async def test_search_project_isolation(self, patched_mem0_client): + """Test that search is isolated to the specified project.""" + # Add memories to two different projects with similar content + project1 = "search_isolation1" + project2 = "search_isolation2" + + await main_metadata_tagging.add_coding_preference("Project 1 special code", project1) + await main_metadata_tagging.add_coding_preference("Project 2 special code", project2) + + # Search in project1 for "special" + result = await main_metadata_tagging.search_coding_preferences("special", project1) + memories = 
json.loads(result) + + # Verify only project1 memories are returned + assert len(memories) == 1 + assert memories[0]["messages"][0]["content"] == "Project 1 special code" + + @pytest.mark.asyncio + async def test_search_error_handling(self): + """Test error handling when the client raises an exception.""" + # Patch the mem0_client.search method to raise an exception + with patch("main_metadata_tagging.mem0_client.search", side_effect=Exception("Test error")): + # Call the function + result = await main_metadata_tagging.search_coding_preferences("query", "test_project") + + # Verify error is handled properly + assert "Error searching preferences" in result + assert "Test error" in result \ No newline at end of file