diff --git a/mcpgateway/translate.py b/mcpgateway/translate.py index 990de5b39..5a6321bc2 100644 --- a/mcpgateway/translate.py +++ b/mcpgateway/translate.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- + '''Location: ./mcpgateway/translate.py Copyright 2025 SPDX-License-Identifier: Apache-2.0 @@ -123,7 +124,7 @@ import shlex import signal import sys -from typing import Any, AsyncIterator, cast, Dict, List, Optional, Sequence, Tuple +from typing import Any, AsyncIterator, Dict, List, Optional, Sequence, Tuple from urllib.parse import urlencode import uuid @@ -146,6 +147,7 @@ from mcp.server.streamable_http_manager import StreamableHTTPSessionManager from starlette.applications import Starlette from starlette.routing import Route +from starlette.types import Receive, Scope, Send # First-Party from mcpgateway.services.logging_service import LoggingService @@ -380,6 +382,10 @@ async def start(self, additional_env_vars: Optional[Dict[str, str]] = None) -> N >>> asyncio.run(test_start()) # doctest: +SKIP True """ + # Stop existing subprocess before starting a new one + if self._proc is not None: + await self.stop() + LOGGER.info(f"Starting stdio subprocess: {self._cmd}") # Build environment from base + configured + additional @@ -388,12 +394,18 @@ async def start(self, additional_env_vars: Optional[Dict[str, str]] = None) -> N if additional_env_vars: env.update(additional_env_vars) + # System-critical environment variables that must never be cleared + system_critical_vars = {"PATH", "HOME", "TMPDIR", "TEMP", "TMP", "USER", "LOGNAME", "SHELL", "LANG", "LC_ALL", "LC_CTYPE", "PYTHONHOME", "PYTHONPATH"} + # Clear any mapped env vars that weren't provided in headers to avoid inheritance if self._header_mappings: for env_var_name in self._header_mappings.values(): - if env_var_name not in (additional_env_vars or {}): - env[env_var_name] = "" + if env_var_name not in (additional_env_vars or {}) and env_var_name not in system_critical_vars: + # Delete the variable instead of setting to empty string to avoid + # breaking subprocess initialization + env.pop(env_var_name, None) + LOGGER.debug(f"Subprocess environment variables: {list(env.keys())}") self._proc = await asyncio.create_subprocess_exec( *shlex.split(self._cmd), stdin=asyncio.subprocess.PIPE, @@ -406,6 +418,8 @@ async def start(self, additional_env_vars: Optional[Dict[str, str]] = None) -> N if not self._proc.stdin or not self._proc.stdout: raise RuntimeError(f"Failed to create subprocess with stdin/stdout pipes for command: {self._cmd}") + LOGGER.debug("Subprocess started successfully") + self._stdin = self._proc.stdin self._pump_task = asyncio.create_task(self._pump_stdout()) @@ -677,7 +691,7 @@ def _build_fastapi( # Add CORS middleware if origins specified if cors_origins: app.add_middleware( - cast("type", CORSMiddleware), + CORSMiddleware, allow_origins=cors_origins, allow_credentials=True, allow_methods=["*"], @@ -1073,7 +1087,7 @@ async def _run_stdio_to_sse( log_level=log_level, lifespan="off", ) - server = uvicorn.Server(config) + uvicorn_server = uvicorn.Server(config) shutting_down = asyncio.Event() # 🔄 make shutdown idempotent @@ -1103,24 +1117,15 @@ async def _shutdown() -> None: await stdio.stop() # Graceful shutdown by setting the shutdown event # Use getattr to safely access should_exit attribute - setattr(server, "should_exit", getattr(server, "should_exit", False) or True) + setattr(uvicorn_server, "should_exit", getattr(uvicorn_server, "should_exit", False) or True) loop = asyncio.get_running_loop() for sig in (signal.SIGINT, signal.SIGTERM): with suppress(NotImplementedError): # Windows lacks add_signal_handler - - def shutdown_handler(*args): # pylint: disable=unused-argument - """Handle shutdown signal by creating shutdown task. - - Args: - *args: Signal handler arguments (unused). - """ - asyncio.create_task(_shutdown()) - - loop.add_signal_handler(sig, shutdown_handler) + loop.add_signal_handler(sig, lambda *_: asyncio.create_task(_shutdown())) LOGGER.info(f"Bridge ready → http://{host}:{port}{sse_path}") - await server.serve() + await uvicorn_server.serve() await _shutdown() # final cleanup @@ -1377,7 +1382,7 @@ async def _run_stdio_to_streamable_http( LOGGER.info(f"Starting stdio to streamable HTTP bridge for command: {cmd}") # Create a simple MCP server that will proxy to stdio subprocess - server = MCPServer(name="stdio-proxy") + mcp_server = MCPServer(name="stdio-proxy") # Create subprocess for stdio communication process = await asyncio.create_subprocess_exec( @@ -1392,13 +1397,13 @@ async def _run_stdio_to_streamable_http( # Set up the streamable HTTP session manager with the server session_manager = StreamableHTTPSessionManager( - app=server, + app=mcp_server, stateless=stateless, json_response=json_response, ) # Create Starlette app to host the streamable HTTP endpoint - async def handle_mcp(request) -> None: + async def handle_mcp(request: Request) -> None: """Handle MCP requests via streamable HTTP. Args: @@ -1418,8 +1423,8 @@ async def handle_mcp(request) -> None: >>> asyncio.run(test_handle()) True """ - # The session manager handles all the protocol details - await session_manager.handle_request(request.scope, request.receive, request.send) + # The session manager handles all the protocol details - Note: I don't like accessing _send directly -JPS + await session_manager.handle_request(request.scope, request.receive, request._send) # pylint: disable=W0212 routes = [ Route("/mcp", handle_mcp, methods=["GET", "POST"]), @@ -1430,12 +1435,8 @@ async def handle_mcp(request) -> None: # Add CORS middleware if specified if cors: - # Import here to avoid unnecessary dependency when CORS not used - # Third-Party - from starlette.middleware.cors import CORSMiddleware as StarletteCORS # pylint: disable=import-outside-toplevel - app.add_middleware( - cast("type", StarletteCORS), + CORSMiddleware, allow_origins=cors, allow_credentials=True, allow_methods=["*"], @@ -1450,7 +1451,7 @@ async def handle_mcp(request) -> None: log_level=log_level, lifespan="off", ) - server = uvicorn.Server(config) + uvicorn_server = uvicorn.Server(config) shutting_down = asyncio.Event() @@ -1466,21 +1467,12 @@ async def _shutdown() -> None: await asyncio.wait_for(process.wait(), 5) # Graceful shutdown by setting the shutdown event # Use getattr to safely access should_exit attribute - setattr(server, "should_exit", getattr(server, "should_exit", False) or True) + setattr(uvicorn_server, "should_exit", getattr(uvicorn_server, "should_exit", False) or True) loop = asyncio.get_running_loop() for sig in (signal.SIGINT, signal.SIGTERM): with suppress(NotImplementedError): # Windows lacks add_signal_handler - - def shutdown_handler(*args): # pylint: disable=unused-argument - """Handle shutdown signal by creating shutdown task. - - Args: - *args: Signal handler arguments (unused). - """ - asyncio.create_task(_shutdown()) - - loop.add_signal_handler(sig, shutdown_handler) + loop.add_signal_handler(sig, lambda *_: asyncio.create_task(_shutdown())) # Pump messages between stdio and HTTP async def pump_stdio_to_http() -> None: @@ -1537,7 +1529,7 @@ async def pump_http_to_stdio(data: str) -> None: try: LOGGER.info(f"Streamable HTTP bridge ready → http://{host}:{port}/mcp") - await server.serve() + await uvicorn_server.serve() finally: pump_task.cancel() await _shutdown() @@ -1816,7 +1808,7 @@ async def _run_multi_protocol_server( # pylint: disable=too-many-positional-arg # Add CORS middleware if specified if cors: app.add_middleware( - cast("type", CORSMiddleware), + CORSMiddleware, allow_origins=cors, allow_credentials=True, allow_methods=["*"], @@ -2060,7 +2052,7 @@ async def mcp_post(request: Request) -> Response: return PlainTextResponse("accepted", status_code=status.HTTP_202_ACCEPTED) # ASGI wrapper to route GET/other /mcp scopes to streamable_manager.handle_request - async def mcp_asgi_wrapper(scope, receive, send): + async def mcp_asgi_wrapper(scope: Scope, receive: Receive, send: Send) -> None: """ ASGI middleware that intercepts HTTP requests to the `/mcp` endpoint. @@ -2069,9 +2061,9 @@ async def mcp_asgi_wrapper(scope, receive, send): passed to the original FastAPI application. Args: - scope (dict): The ASGI scope dictionary containing request metadata. - receive (Callable): An awaitable that yields incoming ASGI events. - send (Callable): An awaitable used to send ASGI events. + scope (Scope): The ASGI scope dictionary containing request metadata. + receive (Receive): An awaitable that yields incoming ASGI events. + send (Send): An awaitable used to send ASGI events. """ if scope.get("type") == "http" and scope.get("path") == "/mcp" and streamable_manager: # Let StreamableHTTPSessionManager handle session-oriented streaming @@ -2082,7 +2074,7 @@ async def mcp_asgi_wrapper(scope, receive, send): await original_app(scope, receive, send) # Replace the app used by uvicorn with the ASGI wrapper - app = mcp_asgi_wrapper + app = mcp_asgi_wrapper # type: ignore[assignment] # ---------------------- Server lifecycle ---------------------- config = uvicorn.Config( @@ -2112,16 +2104,7 @@ async def _shutdown() -> None: loop = asyncio.get_running_loop() for sig in (signal.SIGINT, signal.SIGTERM): with suppress(NotImplementedError): - - def shutdown_handler(*args): # pylint: disable=unused-argument - """Handle shutdown signal by creating shutdown task. - - Args: - *args: Signal handler arguments (unused). - """ - asyncio.create_task(_shutdown()) - - loop.add_signal_handler(sig, shutdown_handler) + loop.add_signal_handler(sig, lambda *_: asyncio.create_task(_shutdown())) # If we have a streamable manager, start its context so it can accept ASGI /mcp if streamable_manager: diff --git a/tests/unit/mcpgateway/test_translate.py b/tests/unit/mcpgateway/test_translate.py index 9edacb8cb..67d716452 100644 --- a/tests/unit/mcpgateway/test_translate.py +++ b/tests/unit/mcpgateway/test_translate.py @@ -958,8 +958,12 @@ def _fake_main(argv=None): assert executed == ["main_called"] +@pytest.mark.filterwarnings("ignore::RuntimeWarning") def test_main_function_stdio(monkeypatch, translate): - """Test main() function with --stdio argument.""" + """Test main() function with --stdio argument. + + Note: This test closes coroutines which may generate RuntimeWarnings during garbage collection. + """ executed: list[str] = [] async def _fake_stdio_runner(*args): @@ -982,8 +986,12 @@ def _fake_asyncio_run(coro): assert "asyncio_run" in executed +@pytest.mark.filterwarnings("ignore::RuntimeWarning") def test_main_function_sse(monkeypatch, translate): - """Test main() function with --sse argument.""" + """Test main() function with --sse argument. + + Note: This test closes coroutines which may generate RuntimeWarnings during garbage collection. + """ executed: list[str] = [] async def _fake_sse_runner(*args): @@ -1003,8 +1011,13 @@ def _fake_asyncio_run(coro): assert "asyncio_run" in executed +@pytest.mark.filterwarnings("ignore::RuntimeWarning") def test_main_function_keyboard_interrupt(monkeypatch, translate, capsys): - """Test main() function handles KeyboardInterrupt gracefully.""" + """Test main() function handles KeyboardInterrupt gracefully. + + Note: This test raises KeyboardInterrupt which prevents the coroutine from being awaited, + resulting in a RuntimeWarning during garbage collection. This is expected behavior. + """ def _raise_keyboard_interrupt(*args): raise KeyboardInterrupt() @@ -1019,8 +1032,13 @@ def _raise_keyboard_interrupt(*args): assert captured.out == "\n" # Should print newline to restore shell prompt +@pytest.mark.filterwarnings("ignore::RuntimeWarning") def test_main_function_not_implemented_error(monkeypatch, translate, capsys): - """Test main() function handles NotImplementedError.""" + """Test main() function handles NotImplementedError. + + Note: This test raises NotImplementedError which prevents the coroutine from being awaited, + resulting in a RuntimeWarning during garbage collection. This is expected behavior. + """ # def _raise_not_implemented(coro, *a, **kw): # # close the coroutine if the autouse fixture didn't remove it @@ -1405,29 +1423,9 @@ def __init__(self, routes=None): def add_middleware(self, middleware_class, **kwargs): calls.append(f"add_middleware_{middleware_class.__name__}") - # Mock Starlette CORS middleware import - class MockCORSMiddleware: - def __init__(self, **kwargs): - pass - - # Mock the import path for CORS middleware - # Standard - import types - - cors_module = types.ModuleType("cors") - cors_module.CORSMiddleware = MockCORSMiddleware - middleware_module = types.ModuleType("middleware") - middleware_module.cors = cors_module - starlette_module = types.ModuleType("starlette") - starlette_module.middleware = middleware_module - # Standard import sys - sys.modules["starlette"] = starlette_module - sys.modules["starlette.middleware"] = middleware_module - sys.modules["starlette.middleware.cors"] = cors_module - class MockTask: def cancel(self): pass @@ -1470,7 +1468,7 @@ async def mock_shutdown(): await translate._run_stdio_to_streamable_http("echo test", 8000, "info", cors=["http://example.com"]) # Verify CORS middleware was added (using our Mock class name) - assert "add_middleware_MockCORSMiddleware" in calls + assert "add_middleware_CORSMiddleware" in calls finally: # Clean up sys.modules to avoid affecting other tests sys.modules.pop("starlette", None) diff --git a/tests/unit/mcpgateway/test_translate_stdio_endpoint.py b/tests/unit/mcpgateway/test_translate_stdio_endpoint.py index 4f08cdd9e..c25b04818 100644 --- a/tests/unit/mcpgateway/test_translate_stdio_endpoint.py +++ b/tests/unit/mcpgateway/test_translate_stdio_endpoint.py @@ -1,4 +1,4 @@ -# -*- coding: utf-8 -*- +#│ -*- coding: utf-8 -*- """Unit tests for StdIOEndpoint with environment variable support. Location: ./tests/unit/mcpgateway/test_translate_stdio_endpoint.py @@ -9,49 +9,47 @@ Tests for StdIOEndpoint class modifications to support dynamic environment variables. """ -import sys import asyncio -import pytest -import tempfile +import json +import logging import os +import sys +import tempfile from unittest.mock import Mock, patch +import pytest + # First-Party -from mcpgateway.translate import StdIOEndpoint, _PubSub +from mcpgateway.translate import _PubSub, StdIOEndpoint class TestStdIOEndpointEnvironmentVariables: """Test StdIOEndpoint with environment variable support.""" - @pytest.fixture - def test_script(self): - """Create a test script that prints environment variables.""" - script_content = """#!/usr/bin/env python3 -import os -import json -import sys + async def _read_output(self, pubsub: _PubSub, timeout: float = 2.0) -> str: + """Helper method to read output from pubsub subscriber. -# Print specified environment variables -env_vars = {} -for var in sys.argv[1:]: - if var in os.environ: - env_vars[var] = os.environ[var] + Args: + pubsub: The pubsub instance to subscribe to + timeout: Maximum time to wait for output in seconds -print(json.dumps(env_vars)) -sys.stdout.flush() -""" + Returns: + The output string received from the subprocess - with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f: - f.write(script_content) - f.flush() - os.chmod(f.name, 0o755) - try: - yield f.name - finally: - try: - os.unlink(f.name) - except OSError: - pass + Raises: + asyncio.TimeoutError: If no output received within timeout + """ + queue = pubsub.subscribe() + try: + output = await asyncio.wait_for(queue.get(), timeout=timeout) + return output + finally: + pubsub.unsubscribe(queue) + + @pytest.fixture + def test_script(self): + """Return the path to the test script that echoes environment variables.""" + return os.path.join(os.path.dirname(__file__), "fixtures", "stdio_echo_env.py") @pytest.fixture def echo_script(self): @@ -98,22 +96,28 @@ def test_stdio_endpoint_init_without_env_vars(self): assert endpoint._pubsub is pubsub assert endpoint._env_vars == {} assert endpoint._proc is None + assert endpoint._stdin is None + assert endpoint._pump_task is None @pytest.mark.asyncio - async def test_start_with_initial_env_vars(self, test_script): + async def test_start_with_initial_env_vars(self, test_script, caplog): """Test starting StdIOEndpoint with initial environment variables.""" + caplog.set_level(logging.DEBUG, logger="mcpgateway.translate") pubsub = _PubSub() env_vars = {"GITHUB_TOKEN": "test-token-123", "TENANT_ID": "acme-corp"} - endpoint = StdIOEndpoint(f"python3 {test_script}", pubsub, env_vars) + # Pass env var names as command line arguments, not via stdin + endpoint = StdIOEndpoint("jq -cMn env", pubsub, env_vars) await endpoint.start() try: - # Send request to check environment variables - await endpoint.send('["GITHUB_TOKEN", "TENANT_ID"]\n') + # Read output from pubsub to verify environment variables were set + output = await self._read_output(pubsub) + result = json.loads(output.strip()) - # Wait for response - await asyncio.sleep(0.1) + # Verify environment variables were properly set + assert result["GITHUB_TOKEN"] == "test-token-123" + assert result["TENANT_ID"] == "acme-corp" # Check that process was started assert endpoint._proc is not None @@ -124,21 +128,26 @@ async def test_start_with_initial_env_vars(self, test_script): await endpoint.stop() @pytest.mark.asyncio - async def test_start_with_additional_env_vars(self, test_script): + async def test_start_with_additional_env_vars(self, test_script, caplog): """Test starting StdIOEndpoint with additional environment variables.""" + caplog.set_level(logging.DEBUG, logger="mcpgateway.translate") pubsub = _PubSub() initial_env_vars = {"BASE_VAR": "base-value"} additional_env_vars = {"GITHUB_TOKEN": "additional-token", "TENANT_ID": "additional-tenant"} - endpoint = StdIOEndpoint(f"python3 {test_script}", pubsub, initial_env_vars) + # Pass env var names as command line arguments + endpoint = StdIOEndpoint("jq -cMn env", pubsub, initial_env_vars) await endpoint.start(additional_env_vars=additional_env_vars) try: - # Send request to check environment variables - await endpoint.send('["BASE_VAR", "GITHUB_TOKEN", "TENANT_ID"]\n') + # Read output from pubsub to verify environment variables + output = await self._read_output(pubsub) + result = json.loads(output.strip()) - # Wait for response - await asyncio.sleep(0.1) + # Verify all environment variables were properly set + assert result["BASE_VAR"] == "base-value" + assert result["GITHUB_TOKEN"] == "additional-token" + assert result["TENANT_ID"] == "additional-tenant" # Check that process was started assert endpoint._proc is not None @@ -147,21 +156,25 @@ async def test_start_with_additional_env_vars(self, test_script): await endpoint.stop() @pytest.mark.asyncio - async def test_environment_variable_override(self, test_script): + async def test_environment_variable_override(self, test_script, caplog): """Test that additional environment variables override initial ones.""" + caplog.set_level(logging.DEBUG, logger="mcpgateway.translate") pubsub = _PubSub() initial_env_vars = {"GITHUB_TOKEN": "initial-token", "BASE_VAR": "base-value"} additional_env_vars = {"GITHUB_TOKEN": "override-token"} # Override initial - endpoint = StdIOEndpoint(f"python3 {test_script}", pubsub, initial_env_vars) + # Pass env var names as command line arguments + endpoint = StdIOEndpoint("jq -cMn env", pubsub, initial_env_vars) await endpoint.start(additional_env_vars=additional_env_vars) try: - # Send request to check environment variables - await endpoint.send('["GITHUB_TOKEN", "BASE_VAR"]\n') + # Read output from pubsub to verify environment variables + output = await self._read_output(pubsub) + result = json.loads(output.strip()) - # Wait for response - await asyncio.sleep(0.1) + # Verify that additional env var overrode the initial one + assert result["GITHUB_TOKEN"] == "override-token" + assert result["BASE_VAR"] == "base-value" # Check that process was started assert endpoint._proc is not None @@ -174,7 +187,7 @@ async def test_start_without_env_vars(self, echo_script): """Test starting StdIOEndpoint without environment variables.""" pubsub = _PubSub() - endpoint = StdIOEndpoint(f"python3 {echo_script}", pubsub) + endpoint = StdIOEndpoint(f"{sys.executable} {echo_script}", pubsub) await endpoint.start() try: @@ -191,11 +204,12 @@ async def test_start_without_env_vars(self, echo_script): await endpoint.stop() @pytest.mark.asyncio + @pytest.mark.filterwarnings("error::RuntimeWarning") async def test_start_twice_handled_gracefully(self, echo_script): """Test that starting an already started endpoint is handled gracefully.""" pubsub = _PubSub() - endpoint = StdIOEndpoint(f"python3 {echo_script}", pubsub) + endpoint = StdIOEndpoint(f"{sys.executable} {echo_script}", pubsub) await endpoint.start() try: @@ -232,7 +246,7 @@ async def test_stop_after_start(self, echo_script): """Test stopping after starting.""" pubsub = _PubSub() - endpoint = StdIOEndpoint(f"python3 {echo_script}", pubsub) + endpoint = StdIOEndpoint(f"{sys.executable} {echo_script}", pubsub) await endpoint.start() assert endpoint._proc is not None @@ -251,31 +265,35 @@ async def test_stop_after_start(self, echo_script): assert endpoint._pump_task.done() # Task should be finished @pytest.mark.asyncio - async def test_multiple_env_vars(self, test_script): + async def test_multiple_env_vars(self, test_script, caplog): """Test with multiple environment variables.""" + caplog.set_level(logging.DEBUG, logger="mcpgateway.translate") + pubsub = _PubSub() - env_vars = os.environ.copy() - env_vars.update( - { - "GITHUB_TOKEN": "github-token-123", - "TENANT_ID": "acme-corp", - "API_KEY": "api-key-456", - "ENVIRONMENT": "production", - "DEBUG": "false", - } - ) + env_vars = { + "GITHUB_TOKEN": "github-token-123", + "TENANT_ID": "acme-corp", + "API_KEY": "api-key-456", + "ENVIRONMENT": "production", + "DEBUG": "false", + } - endpoint = StdIOEndpoint(f"{sys.executable} {test_script}", pubsub, env_vars) + endpoint = StdIOEndpoint( "jq -cMn env", pubsub, env_vars) await endpoint.start() try: - # Send request to check all environment variables - await endpoint.send('["GITHUB_TOKEN", "TENANT_ID", "API_KEY", "ENVIRONMENT", "DEBUG"]\n') + # Read output from pubsub to verify environment variables + output = await self._read_output(pubsub) + result = json.loads(output.strip()) - # Wait for response - await asyncio.sleep(0.1) + # Verify all environment variables were properly set + assert result["GITHUB_TOKEN"] == "github-token-123" + assert result["TENANT_ID"] == "acme-corp" + assert result["API_KEY"] == "api-key-456" + assert result["ENVIRONMENT"] == "production" + assert result["DEBUG"] == "false" # Check that process was started assert endpoint._proc is not None @@ -289,7 +307,7 @@ async def test_empty_env_vars(self, echo_script): pubsub = _PubSub() env_vars: dict[str, str] = {} - endpoint = StdIOEndpoint(f"python3 {echo_script}", pubsub, env_vars) + endpoint = StdIOEndpoint(f"{sys.executable} {echo_script}", pubsub, env_vars) await endpoint.start() try: @@ -310,7 +328,7 @@ async def test_none_env_vars(self, echo_script): """Test with None environment variables.""" pubsub = _PubSub() - endpoint = StdIOEndpoint(f"python3 {echo_script}", pubsub, None) + endpoint = StdIOEndpoint(f"{sys.executable} {echo_script}", pubsub, None) await endpoint.start() try: @@ -327,8 +345,9 @@ async def test_none_env_vars(self, echo_script): await endpoint.stop() @pytest.mark.asyncio - async def test_env_vars_with_special_characters(self, test_script): + async def test_env_vars_with_special_characters(self, test_script, caplog): """Test environment variables with special characters.""" + caplog.set_level(logging.DEBUG, logger="mcpgateway.translate") pubsub = _PubSub() env_vars = { "API_TOKEN": "Bearer token-123!@#$%^&*()", @@ -336,15 +355,19 @@ async def test_env_vars_with_special_characters(self, test_script): "JSON_CONFIG": '{"key": "value", "number": 123}', } - endpoint = StdIOEndpoint(f"python3 {test_script}", pubsub, env_vars) + # Pass env var names as command line arguments + endpoint = StdIOEndpoint("jq -cMn env", pubsub, env_vars) await endpoint.start() try: - # Send request to check environment variables - await endpoint.send('["API_TOKEN", "URL", "JSON_CONFIG"]\n') + # Read output from pubsub to verify environment variables + output = await self._read_output(pubsub) + result = json.loads(output.strip()) - # Wait for response - await asyncio.sleep(0.1) + # Verify environment variables with special characters were properly set + assert result["API_TOKEN"] == "Bearer token-123!@#$%^&*()" + assert result["URL"] == "https://api.example.com/v1" + assert result["JSON_CONFIG"] == '{"key": "value", "number": 123}' # Check that process was started assert endpoint._proc is not None @@ -353,8 +376,10 @@ async def test_env_vars_with_special_characters(self, test_script): await endpoint.stop() @pytest.mark.asyncio - async def test_large_env_vars(self, test_script): + async def test_large_env_vars(self, test_script, caplog): """Test with large environment variable values.""" + caplog.set_level(logging.DEBUG, logger="mcpgateway.translate") + pubsub = _PubSub() large_value = "x" * 1000 # 1KB value env_vars = { @@ -362,15 +387,18 @@ async def test_large_env_vars(self, test_script): "NORMAL_VAR": "normal", } - endpoint = StdIOEndpoint(f"python3 {test_script}", pubsub, env_vars) + # Pass env var names as command line arguments + endpoint = StdIOEndpoint("jq -cMn env", pubsub, env_vars) await endpoint.start() try: - # Send request to check environment variables - await endpoint.send('["LARGE_TOKEN", "NORMAL_VAR"]\n') + # Read output from pubsub to verify environment variables + output = await self._read_output(pubsub) + result = json.loads(output.strip()) - # Wait for response - await asyncio.sleep(0.1) + # Verify large environment variable was properly set + assert result["LARGE_TOKEN"] == large_value + assert result["NORMAL_VAR"] == "normal" # Check that process was started assert endpoint._proc is not None @@ -495,7 +523,7 @@ async def test_old_start_method_still_works(self, echo_script): """Test that old start method still works.""" pubsub = _PubSub() - endpoint = StdIOEndpoint(f"python3 {echo_script}", pubsub) + endpoint = StdIOEndpoint(f"{sys.executable} {echo_script}", pubsub) await endpoint.start() # No additional_env_vars parameter try: