From 16c8cca16d7e5e6fa16bc9f75817402a9598ca72 Mon Sep 17 00:00:00 2001 From: Artur Zdolinski Date: Mon, 13 Jan 2025 03:24:58 +0100 Subject: [PATCH 01/10] Update mcp_sse.py Uses Python's built-in asyncio.timeout() context manager Properly handles timeout exceptions Maintains the same functionality but with correct async context management --- .../base/langflow/components/tools/mcp_sse.py | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/src/backend/base/langflow/components/tools/mcp_sse.py b/src/backend/base/langflow/components/tools/mcp_sse.py index 8c813a57464c..6f29ae7e6a79 100644 --- a/src/backend/base/langflow/components/tools/mcp_sse.py +++ b/src/backend/base/langflow/components/tools/mcp_sse.py @@ -38,19 +38,23 @@ async def connect_to_server( if headers is None: headers = {} url = await self.pre_check_redirect(url) + + # Use asyncio.timeout() instead of timeout_context + try: + async with asyncio.timeout(timeout_seconds): + sse_transport = await self.exit_stack.enter_async_context( + sse_client(url, headers, timeout_seconds, sse_read_timeout_seconds) + ) + self.sse, self.write = sse_transport + self.session = await self.exit_stack.enter_async_context(ClientSession(self.sse, self.write)) - async with timeout_context(timeout_seconds): - sse_transport = await self.exit_stack.enter_async_context( - sse_client(url, headers, timeout_seconds, sse_read_timeout_seconds) - ) - self.sse, self.write = sse_transport - self.session = await self.exit_stack.enter_async_context(ClientSession(self.sse, self.write)) - - await self.session.initialize() + await self.session.initialize() - # List available tools - response = await self.session.list_tools() - return response.tools + # List available tools + response = await self.session.list_tools() + return response.tools + except asyncio.TimeoutError: + raise TimeoutError(f"Connection to {url} timed out after {timeout_seconds} seconds") class MCPSse(Component): From 14c2a6fdf38728667092d488c4c2465436eb423c Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Mon, 13 Jan 2025 02:46:26 +0000 Subject: [PATCH 02/10] [autofix.ci] apply automated fixes --- src/backend/base/langflow/components/tools/mcp_sse.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/backend/base/langflow/components/tools/mcp_sse.py b/src/backend/base/langflow/components/tools/mcp_sse.py index 6f29ae7e6a79..e73e40657d8e 100644 --- a/src/backend/base/langflow/components/tools/mcp_sse.py +++ b/src/backend/base/langflow/components/tools/mcp_sse.py @@ -10,7 +10,6 @@ from langflow.custom import Component from langflow.field_typing import Tool from langflow.io import MessageTextInput, Output -from langflow.utils.async_helpers import timeout_context # Define constant for status code HTTP_TEMPORARY_REDIRECT = 307 @@ -38,7 +37,7 @@ async def connect_to_server( if headers is None: headers = {} url = await self.pre_check_redirect(url) - + # Use asyncio.timeout() instead of timeout_context try: async with asyncio.timeout(timeout_seconds): From 995b4d388153079cde329903c3ffbcb0a7faf540 Mon Sep 17 00:00:00 2001 From: Artur Zdolinski Date: Mon, 13 Jan 2025 04:09:41 +0100 Subject: [PATCH 03/10] add asyncio +clean up comment --- src/backend/base/langflow/components/tools/mcp_sse.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/backend/base/langflow/components/tools/mcp_sse.py b/src/backend/base/langflow/components/tools/mcp_sse.py index e73e40657d8e..43debf9becd6 100644 --- a/src/backend/base/langflow/components/tools/mcp_sse.py +++ b/src/backend/base/langflow/components/tools/mcp_sse.py @@ -1,5 +1,6 @@ # from langflow.field_typing import Data from contextlib import AsyncExitStack +import asyncio import httpx from mcp import ClientSession, types @@ -37,8 +38,7 @@ async def connect_to_server( if headers is None: headers = {} url = await self.pre_check_redirect(url) - - # Use asyncio.timeout() instead of timeout_context + try: async with asyncio.timeout(timeout_seconds): sse_transport = await self.exit_stack.enter_async_context( From 50b079f23af7f7a12be8cfecf7ac52b0ea37f18d Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Mon, 13 Jan 2025 03:11:14 +0000 Subject: [PATCH 04/10] [autofix.ci] apply automated fixes --- src/backend/base/langflow/components/tools/mcp_sse.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/backend/base/langflow/components/tools/mcp_sse.py b/src/backend/base/langflow/components/tools/mcp_sse.py index 43debf9becd6..5b0eb2da8b1d 100644 --- a/src/backend/base/langflow/components/tools/mcp_sse.py +++ b/src/backend/base/langflow/components/tools/mcp_sse.py @@ -1,6 +1,6 @@ # from langflow.field_typing import Data -from contextlib import AsyncExitStack import asyncio +from contextlib import AsyncExitStack import httpx from mcp import ClientSession, types @@ -38,7 +38,7 @@ async def connect_to_server( if headers is None: headers = {} url = await self.pre_check_redirect(url) - + try: async with asyncio.timeout(timeout_seconds): sse_transport = await self.exit_stack.enter_async_context( From 7daa5d6ebe492895c347d89046448d5a63f4d06d Mon Sep 17 00:00:00 2001 From: Artur Zdolinski Date: Tue, 14 Jan 2025 02:03:32 +0100 Subject: [PATCH 05/10] missing arg_schema in Tool Missing args_schema inside cause that tools are generated without input schema and are not able to be properly executed as agent know tool, but dost know what input field tool have. Same problem looks to be in MCP STDIO. --- src/backend/base/langflow/components/tools/mcp_sse.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/backend/base/langflow/components/tools/mcp_sse.py b/src/backend/base/langflow/components/tools/mcp_sse.py index 5b0eb2da8b1d..212bc574869c 100644 --- a/src/backend/base/langflow/components/tools/mcp_sse.py +++ b/src/backend/base/langflow/components/tools/mcp_sse.py @@ -92,6 +92,7 @@ async def build_output(self) -> list[Tool]: Tool( name=tool.name, # maybe format this description=tool.description, + args_schema=args_schema, coroutine=create_tool_coroutine(tool.name, args_schema, self.client.session), func=create_tool_func(tool.name, self.client.session), ) From d58a4fbf5fc9f58afd771f065b05eebd151f994e Mon Sep 17 00:00:00 2001 From: Artur Zdolinski Date: Tue, 14 Jan 2025 02:08:20 +0100 Subject: [PATCH 06/10] fix Ruff Check Line 56: Error: B904 Within an `except` clause, raise exceptions with `raise ... from err` or `raise ... from None` to distinguish them from errors in exception handling TRY003 Avoid specifying long messages outside the exception class EM102 Exception must not use an f-string literal, assign to variable first --- src/backend/base/langflow/components/tools/mcp_sse.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/backend/base/langflow/components/tools/mcp_sse.py b/src/backend/base/langflow/components/tools/mcp_sse.py index 212bc574869c..b33babf9386b 100644 --- a/src/backend/base/langflow/components/tools/mcp_sse.py +++ b/src/backend/base/langflow/components/tools/mcp_sse.py @@ -52,9 +52,9 @@ async def connect_to_server( # List available tools response = await self.session.list_tools() return response.tools - except asyncio.TimeoutError: - raise TimeoutError(f"Connection to {url} timed out after {timeout_seconds} seconds") - + except asyncio.TimeoutError as err: + error_message = f"Connection to {url} timed out after {timeout_seconds} seconds" + raise TimeoutError(error_message) from err class MCPSse(Component): client = MCPSseClient() From 22c512c70c17b69186ceb47db9de9666f16eea43 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Tue, 14 Jan 2025 01:09:57 +0000 Subject: [PATCH 07/10] [autofix.ci] apply automated fixes --- src/backend/base/langflow/components/tools/mcp_sse.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/backend/base/langflow/components/tools/mcp_sse.py b/src/backend/base/langflow/components/tools/mcp_sse.py index b33babf9386b..62d06081c274 100644 --- a/src/backend/base/langflow/components/tools/mcp_sse.py +++ b/src/backend/base/langflow/components/tools/mcp_sse.py @@ -56,6 +56,7 @@ async def connect_to_server( error_message = f"Connection to {url} timed out after {timeout_seconds} seconds" raise TimeoutError(error_message) from err + class MCPSse(Component): client = MCPSseClient() tools = types.ListToolsResult From fff5ffe648442a32d0a54db447d303fc3db78445 Mon Sep 17 00:00:00 2001 From: Artur Zdolinski Date: Fri, 24 Jan 2025 21:54:17 +0100 Subject: [PATCH 08/10] remove asyncio.timeout Remove asyncio.timeout() (not valid for Py3.10) and replace it by asyncio.wait_for() --- .../base/langflow/components/tools/mcp_sse.py | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/src/backend/base/langflow/components/tools/mcp_sse.py b/src/backend/base/langflow/components/tools/mcp_sse.py index 62d06081c274..f15a84892ab7 100644 --- a/src/backend/base/langflow/components/tools/mcp_sse.py +++ b/src/backend/base/langflow/components/tools/mcp_sse.py @@ -38,24 +38,26 @@ async def connect_to_server( if headers is None: headers = {} url = await self.pre_check_redirect(url) - try: - async with asyncio.timeout(timeout_seconds): - sse_transport = await self.exit_stack.enter_async_context( - sse_client(url, headers, timeout_seconds, sse_read_timeout_seconds) - ) - self.sse, self.write = sse_transport - self.session = await self.exit_stack.enter_async_context(ClientSession(self.sse, self.write)) - - await self.session.initialize() - - # List available tools - response = await self.session.list_tools() - return response.tools + await asyncio.wait_for( + self._connect_with_timeout(url, headers, timeout_seconds, sse_read_timeout_seconds), + timeout=timeout_seconds + ) + # List available tools + response = await self.session.list_tools() + return response.tools except asyncio.TimeoutError as err: error_message = f"Connection to {url} timed out after {timeout_seconds} seconds" raise TimeoutError(error_message) from err + async def _connect_with_timeout(self, url: str, headers: dict[str, str] | None, timeout_seconds: int, sse_read_timeout_seconds: int): + sse_transport = await self.exit_stack.enter_async_context( + sse_client(url, headers, timeout_seconds, sse_read_timeout_seconds) + ) + self.sse, self.write = sse_transport + self.session = await self.exit_stack.enter_async_context(ClientSession(self.sse, self.write)) + await self.session.initialize() + class MCPSse(Component): client = MCPSseClient() From c8ccdce78d1a37b4c459f7986f4e5b7f0c058c2a Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Fri, 24 Jan 2025 20:55:53 +0000 Subject: [PATCH 09/10] [autofix.ci] apply automated fixes --- src/backend/base/langflow/components/tools/mcp_sse.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/backend/base/langflow/components/tools/mcp_sse.py b/src/backend/base/langflow/components/tools/mcp_sse.py index f15a84892ab7..d73cdd25df30 100644 --- a/src/backend/base/langflow/components/tools/mcp_sse.py +++ b/src/backend/base/langflow/components/tools/mcp_sse.py @@ -41,7 +41,7 @@ async def connect_to_server( try: await asyncio.wait_for( self._connect_with_timeout(url, headers, timeout_seconds, sse_read_timeout_seconds), - timeout=timeout_seconds + timeout=timeout_seconds, ) # List available tools response = await self.session.list_tools() @@ -50,7 +50,9 @@ async def connect_to_server( error_message = f"Connection to {url} timed out after {timeout_seconds} seconds" raise TimeoutError(error_message) from err - async def _connect_with_timeout(self, url: str, headers: dict[str, str] | None, timeout_seconds: int, sse_read_timeout_seconds: int): + async def _connect_with_timeout( + self, url: str, headers: dict[str, str] | None, timeout_seconds: int, sse_read_timeout_seconds: int + ): sse_transport = await self.exit_stack.enter_async_context( sse_client(url, headers, timeout_seconds, sse_read_timeout_seconds) ) From 26b307afdb0307b4545be6b292132204ce076fbd Mon Sep 17 00:00:00 2001 From: Artur Zdolinski Date: Fri, 24 Jan 2025 22:12:40 +0100 Subject: [PATCH 10/10] Ruff (TRY300) Move return response.tools inside an else block. This makes it clearer that tools are returned only if the connection is successful, and not if a TimeoutError occurs. --- src/backend/base/langflow/components/tools/mcp_sse.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/backend/base/langflow/components/tools/mcp_sse.py b/src/backend/base/langflow/components/tools/mcp_sse.py index d73cdd25df30..2b119b4173d0 100644 --- a/src/backend/base/langflow/components/tools/mcp_sse.py +++ b/src/backend/base/langflow/components/tools/mcp_sse.py @@ -45,10 +45,11 @@ async def connect_to_server( ) # List available tools response = await self.session.list_tools() - return response.tools except asyncio.TimeoutError as err: error_message = f"Connection to {url} timed out after {timeout_seconds} seconds" raise TimeoutError(error_message) from err + else: # Only executed if no TimeoutError + return response.tools async def _connect_with_timeout( self, url: str, headers: dict[str, str] | None, timeout_seconds: int, sse_read_timeout_seconds: int