diff --git a/pyproject.toml b/pyproject.toml index 6463fc637490..609691310425 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -138,7 +138,8 @@ dependencies = [ "cuga~=0.1.11", "agent-lifecycle-toolkit~=0.4.4", "astrapy>=2.1.0,<3.0.0", - "aioboto3>=15.2.0,<16.0.0" + "aioboto3>=15.2.0,<16.0.0", + "ag-ui-protocol>=0.1.10", ] diff --git a/src/backend/base/langflow/api/v1/mcp.py b/src/backend/base/langflow/api/v1/mcp.py index 4f00afb455f0..f043e36f3549 100644 --- a/src/backend/base/langflow/api/v1/mcp.py +++ b/src/backend/base/langflow/api/v1/mcp.py @@ -215,7 +215,7 @@ async def stop_streamable_http_manager() -> None: await _streamable_http.stop() -streamable_http_route_config = { # use for all streamable http routes (except for the health check) +streamable_http_route_config = { # use for all streamable http routes (except for the health check) "methods": ["GET", "POST", "DELETE"], "response_class": ResponseNoOp, } diff --git a/src/backend/base/langflow/api/v1/mcp_projects.py b/src/backend/base/langflow/api/v1/mcp_projects.py index e873bad4a190..38bada8c129c 100644 --- a/src/backend/base/langflow/api/v1/mcp_projects.py +++ b/src/backend/base/langflow/api/v1/mcp_projects.py @@ -386,6 +386,7 @@ async def _handle_project_sse_messages( current_project_ctx.reset(project_token) current_request_variables_ctx.reset(req_vars_token) + @router.post("/{project_id}") @router.post("/{project_id}/") async def handle_project_messages( @@ -438,7 +439,6 @@ async def _dispatch_project_streamable_http( return ResponseNoOp(status_code=200) - streamable_http_route_config = { "methods": ["GET", "POST", "DELETE"], "response_class": ResponseNoOp, diff --git a/src/backend/tests/unit/api/utils/test_config_utils.py b/src/backend/tests/unit/api/utils/test_config_utils.py index 0dc9efcb74da..9dc0238eb8f3 100644 --- a/src/backend/tests/unit/api/utils/test_config_utils.py +++ b/src/backend/tests/unit/api/utils/test_config_utils.py @@ -20,7 +20,7 @@ def _build_server_config(base_url: str, project_id, transport: str): """Return URL and server config for a given transport.""" suffix = "streamable" if transport == "streamable" else "sse" url = f"{base_url}/api/v1/mcp/project/{project_id}/{suffix}" - if transport == "streamable": # noqa: SIM108 + if transport == "streamable": # noqa: SIM108 args = ["mcp-proxy", "--transport", "streamablehttp", url] else: args = ["mcp-proxy", url] diff --git a/src/lfx/pyproject.toml b/src/lfx/pyproject.toml index 0c59307455f4..5378dcb92056 100644 --- a/src/lfx/pyproject.toml +++ b/src/lfx/pyproject.toml @@ -41,6 +41,7 @@ dependencies = [ "validators>=0.34.0,<1.0.0", "filelock>=3.20.0", "pypdf>=5.1.0", + "ag-ui-protocol>=0.1.10", ] [project.scripts] diff --git a/src/lfx/src/lfx/events/observability/__init__.py b/src/lfx/src/lfx/events/observability/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/src/lfx/src/lfx/events/observability/lifecycle_events.py b/src/lfx/src/lfx/events/observability/lifecycle_events.py new file mode 100644 index 000000000000..ef52a68fdae3 --- /dev/null +++ b/src/lfx/src/lfx/events/observability/lifecycle_events.py @@ -0,0 +1,126 @@ +import functools +from collections.abc import Awaitable, Callable +from typing import Any + +from ag_ui.encoder import EventEncoder + +from lfx.log.logger import logger + +AsyncMethod = Callable[..., Awaitable[Any]] + + +def observable(observed_method: AsyncMethod) -> AsyncMethod: + """Make an async method emit lifecycle events by invoking optional lifecycle hooks on its host before execution, after successful completion, and on exceptions. + + If implemented on the host object, the following hooks will be called to produce event payloads: + - before_callback_event(*args, **kwargs): called before the observed method executes. + - after_callback_event(result, *args, **kwargs): called after the observed method completes successfully; receives the method result as the first argument. + - error_callback_event(exception, *args, **kwargs): called if the observed method raises an exception; receives the exception as the first argument. + + Event publishing is skipped if an "event_manager" keyword argument is not provided or is None. Payloads produced by the hooks are encoded with EventEncoder; payloads may include custom metrics under the 'langflow' key within 'raw_events'. Missing hook implementations are ignored (no error). + + Returns: + A wrapper async function that preserves the original method's metadata, invokes the described lifecycle hooks at the appropriate times, and returns the original method's result. + """ + + async def check_event_manager(self, **kwargs): + """Verify that an EventManager is provided in the call's keyword arguments. + + Parameters: + **kwargs: Keyword arguments from the observed method call; expected to include an `event_manager` entry. + + Returns: + bool: `true` if an `event_manager` keyword with a non-None value is present, `false` otherwise. + + Notes: + Logs a warning when the `event_manager` is missing or None. + """ + if "event_manager" not in kwargs or kwargs["event_manager"] is None: + await logger.awarning( + f"EventManager not available/provided, skipping observable event publishing " + f"from {self.__class__.__name__}" + ) + return False + return True + + async def before_callback(self, *args, **kwargs): + """Invoke the instance's `before_callback_event` hook (if implemented), encode its returned payload, and prepare it for publishing. + + Checks that a non-None `event_manager` is present in `kwargs` before proceeding; if absent, the function logs a warning and returns without action. If `before_callback_event` exists on the instance, calls it with `*args` and `**kwargs`, encodes the resulting payload with `EventEncoder`, and leaves the payload ready to be published (publishing is not implemented here). If the hook is not implemented, logs a warning and skips publishing. + + Parameters: + *args: Positional arguments forwarded to `before_callback_event`. + **kwargs: Keyword arguments forwarded to `before_callback_event` (must include `event_manager` to enable publishing). + """ + if not await check_event_manager(self, **kwargs): + return + + if hasattr(self, "before_callback_event"): + event_payload = self.before_callback_event(*args, **kwargs) + encoder = EventEncoder() + event_payload = encoder.encode(event_payload) + # TODO: Publish event + else: + await logger.awarning( + f"before_callback_event not implemented for {self.__class__.__name__}. Skipping event publishing." + ) + + async def after_callback(self, res: Any | None = None, *args, **kwargs): + """Handle the post-execution lifecycle event by encoding and publishing the payload produced by the host's `after_callback_event` hook. + + If an `event_manager` is not present in kwargs, the function does nothing. If the hosting object defines `after_callback_event`, that hook is called with the method result and any forwarded arguments; its returned payload is encoded via EventEncoder and prepared for publishing (actual publish is TODO). If `after_callback_event` is not implemented, a warning is logged and no event is published. + + Parameters: + res (Any | None): The result returned by the observed method; may be None. + *args: Positional arguments forwarded to the host's `after_callback_event` hook. + **kwargs: Keyword arguments forwarded to the host's `after_callback_event` hook; used to locate `event_manager`. + """ + if not await check_event_manager(self, **kwargs): + return + if hasattr(self, "after_callback_event"): + event_payload = self.after_callback_event(res, *args, **kwargs) + encoder = EventEncoder() + event_payload = encoder.encode(event_payload) + # TODO: Publish event + else: + await logger.awarning( + f"after_callback_event not implemented for {self.__class__.__name__}. Skipping event publishing." + ) + + @functools.wraps(observed_method) + async def wrapper(self, *args, **kwargs): + """Wraps the original async method to emit lifecycle events before execution, after successful completion, and on error. + + The wrapper calls a before callback (if implemented) prior to invoking the original method, an after callback (if implemented) after a successful call, and an error callback (if implemented) when the wrapped method raises. Event payloads produced by these callbacks are encoded via EventEncoder; publishing is performed elsewhere. + + Parameters: + *args: Positional arguments forwarded to the before/after/error callbacks and the wrapped method. + **kwargs: Keyword arguments forwarded to the before/after/error callbacks and the wrapped method. + + Returns: + The result returned by the wrapped async method. + + Raises: + Exception: Re-raises any exception thrown by the wrapped method after attempting to handle and encode an error event. + """ + await before_callback(self, *args, **kwargs) + result = None + try: + result = await observed_method(self, *args, **kwargs) + await after_callback(self, result, *args, **kwargs) + except Exception as e: + await logger.aerror(f"Exception in {self.__class__.__name__}: {e}") + if hasattr(self, "error_callback_event"): + try: + event_payload = self.error_callback_event(e, *args, **kwargs) + encoder = EventEncoder() + event_payload = encoder.encode(event_payload) + # TODO: Publish error event + except Exception as callback_e: + await logger.aerror( + f"Exception during error_callback_event for {self.__class__.__name__}: {callback_e}" + ) + raise e + return result + + return wrapper diff --git a/src/lfx/src/lfx/graph/graph/base.py b/src/lfx/src/lfx/graph/graph/base.py index b6e969465ed2..0c5630aa0274 100644 --- a/src/lfx/src/lfx/graph/graph/base.py +++ b/src/lfx/src/lfx/graph/graph/base.py @@ -15,6 +15,9 @@ from itertools import chain from typing import TYPE_CHECKING, Any, cast +from ag_ui.core import RunFinishedEvent, RunStartedEvent + +from lfx.events.observability.lifecycle_events import observable from lfx.exceptions.component import ComponentBuildError from lfx.graph.edge.base import CycleEdge, Edge from lfx.graph.graph.constants import Finish, lazy_load_vertex_dict @@ -708,11 +711,17 @@ def define_vertices_lists(self) -> None: self._is_state_vertices.append(vertex.id) def _set_inputs(self, input_components: list[str], inputs: dict[str, str], input_type: InputType | None) -> None: - """Updates input vertices' parameters with the provided inputs, filtering by component list and input type. + """Update parameters of input vertices that match the specified components and input type. + + Only vertices listed in `self._is_input_vertices` are considered. If `input_components` is provided, a vertex is updated only if its id or display_name appears in that list; if `input_components` is empty, all input vertices are considered. If `input_type` is provided and not equal to the string `"any"`, a vertex is updated only if `input_type` appears in the vertex's id (case-insensitive). Updated parameters replace existing values. - Only vertices whose IDs or display names match the specified input components and whose IDs contain - the input type (unless input type is 'any' or None) are updated. Raises a ValueError if a specified - vertex is not found. + Parameters: + input_components (list[str]): Component ids or display names to restrict which input vertices to update. An empty list means all input vertices. + inputs (dict[str, str]): Mapping of input parameter names to values to apply to matching vertices. + input_type (InputType | None): Optional type filter; when not `None` and not `"any"`, only vertices whose id contains this type (case-insensitive) are updated. + + Raises: + ValueError: If a vertex id listed in `self._is_input_vertices` cannot be found. """ for vertex_id in self._is_input_vertices: vertex = self.get_vertex(vertex_id) @@ -728,6 +737,7 @@ def _set_inputs(self, input_components: list[str], inputs: dict[str, str], input raise ValueError(msg) vertex.update_raw_params(inputs, overwrite=True) + @observable async def _run( self, *, @@ -740,20 +750,16 @@ async def _run( fallback_to_env_vars: bool, event_manager: EventManager | None = None, ) -> list[ResultData | None]: - """Runs the graph with the given inputs. + """Execute the graph with the provided inputs and collect requested outputs. - Args: - inputs (Dict[str, str]): The input values for the graph. - input_components (list[str]): The components to run for the inputs. - input_type: (Optional[InputType]): The input type. - outputs (list[str]): The outputs to retrieve from the graph. - stream (bool): Whether to stream the results or not. - session_id (str): The session ID for the graph. - fallback_to_env_vars (bool): Whether to fallback to environment variables. - event_manager (EventManager | None): The event manager for the graph. + Runs a single graph execution: applies input values, tags vertices with the session ID, optionally stores the graph in the chat cache, executes the processing pipeline, finalizes tracing, and gathers results from output vertices or explicitly requested outputs. Returns: - List[Optional["ResultData"]]: The outputs of the graph. + A list containing each collected vertex result in execution order; entries correspond to outputs produced by output vertices or those named in `outputs`. + + Raises: + TypeError: If the primary input value is not a string. + ValueError: If `input_components` is not a list, if a required vertex cannot be found, or if an error occurs during graph processing. """ if input_components and not isinstance(input_components, list): msg = f"Invalid components value: {input_components}. Expected list" @@ -2288,7 +2294,15 @@ def build_adjacency_maps(edges: list[CycleEdge]) -> tuple[dict[str, list[str]], return predecessor_map, successor_map def __to_dict(self) -> dict[str, dict[str, list[str]]]: - """Converts the graph to a dictionary.""" + """Return a mapping of each vertex id to its predecessor and successor vertex id lists. + + Produces a dictionary where each key is a vertex id and the value is a dict with two keys: + "successors" — list of ids of all successor vertices reachable from that vertex; + "predecessors" — list of ids of immediate predecessor vertices for that vertex. + + Returns: + graph_map (dict[str, dict[str, list[str]]]): Mapping of vertex id -> {"successors": [...], "predecessors": [...]}. + """ result: dict = {} for vertex in self.vertices: vertex_id = vertex.id @@ -2296,3 +2310,47 @@ def __to_dict(self) -> dict[str, dict[str, list[str]]]: predecessors = [i.id for i in self.get_predecessors(vertex)] result |= {vertex_id: {"successors": sucessors, "predecessors": predecessors}} return result + + def raw_event_metrics(self, optional_fields: dict | None = None) -> dict: + """Create a timestamped metrics dictionary and merge in any provided optional fields. + + Parameters: + optional_fields (dict | None): Additional key-value pairs to include in the metrics. If omitted, no extra fields are added. + + Returns: + dict: A dictionary containing a numeric `timestamp` (seconds since the epoch) plus any keys from `optional_fields`. + """ + if optional_fields is None: + optional_fields = {} + import time + + return {"timestamp": time.time(), **optional_fields} + + def before_callback_event(self, *args, **kwargs) -> RunStartedEvent: # noqa: ARG002 + """Create a RunStartedEvent capturing the current run id, flow id, and optional raw metrics. + + If the Graph implements `raw_event_metrics`, that method is called with `{"total_components": }` and its result is included as the event's `raw_event`; otherwise `raw_event` is an empty dict. + + Returns: + RunStartedEvent: Event containing `run_id`, `thread_id` (flow id), and `raw_event` metrics. + """ + metrics = {} + if hasattr(self, "raw_event_metrics"): + metrics = self.raw_event_metrics({"total_components": len(self.vertices)}) + return RunStartedEvent(run_id=self._run_id, thread_id=self.flow_id, raw_event=metrics) + + def after_callback_event(self, result: Any = None, *args, **kwargs) -> RunFinishedEvent: # noqa: ARG002 + """Create a RunFinishedEvent for the current run, attaching run identifiers and optional metrics. + + Parameters: + result (Any): Final result of the run (ignored when constructing the event). + *args: Ignored. + **kwargs: Ignored. + + Returns: + RunFinishedEvent: Event populated with the graph's run_id, flow_id as thread_id, `result` set to None, and `raw_event` metrics (includes `total_components` when available). + """ + metrics = {} + if hasattr(self, "raw_event_metrics"): + metrics = self.raw_event_metrics({"total_components": len(self.vertices)}) + return RunFinishedEvent(run_id=self._run_id, thread_id=self.flow_id, result=None, raw_event=metrics) diff --git a/src/lfx/src/lfx/graph/vertex/base.py b/src/lfx/src/lfx/graph/vertex/base.py index 1d8cdbb595ce..32329e3eb129 100644 --- a/src/lfx/src/lfx/graph/vertex/base.py +++ b/src/lfx/src/lfx/graph/vertex/base.py @@ -8,6 +8,9 @@ from enum import Enum from typing import TYPE_CHECKING, Any +from ag_ui.core import StepFinishedEvent, StepStartedEvent + +from lfx.events.observability.lifecycle_events import observable from lfx.exceptions.component import ComponentBuildError from lfx.graph.schema import INPUT_COMPONENTS, OUTPUT_COMPONENTS, InterfaceComponentTypes, ResultData from lfx.graph.utils import UnbuiltObject, UnbuiltResult, log_transaction @@ -168,6 +171,15 @@ def get_built_result(self): # If the Vertex.type is a power component # then we need to return the built object # instead of the result dict + """Return the vertex's public build result, normalizing interface-component outputs and unbuilt markers. + + Returns: + The public result for the vertex: + - For interface components, returns the built object (or `built_object.content` if the object exposes a `content` attribute). + - If `built_object` is a string, it is used as `built_result`. + - If `built_result` is an `UnbuiltResult`, returns an empty dict `{}`. + - If `built_result` is a dict, returns it unchanged; otherwise returns `{"result": built_result}`. + """ if self.is_interface_component and not isinstance(self.built_object, UnbuiltObject): result = self.built_object # if it is not a dict or a string and hasattr model_dump then @@ -180,10 +192,14 @@ def get_built_result(self): if isinstance(self.built_result, UnbuiltResult): return {} + return self.built_result if isinstance(self.built_result, dict) else {"result": self.built_result} def set_artifacts(self) -> None: - pass + """No-op placeholder that attaches build artifacts to the vertex. + + Intended to store or update artifacts produced during a component's build; current implementation performs no action. + """ @property def edges(self) -> list[CycleEdge]: @@ -375,22 +391,35 @@ def update_raw_params(self, new_params: Mapping[str, str | list[str]], *, overwr self.updated_raw_params = True def instantiate_component(self, user_id=None) -> None: + """Lazily instantiate and attach the component instance for this vertex. + + If the vertex does not already have a component instance, calls the loader to create one and stores it on self.custom_component. Accepts an optional user_id to provide user context during instantiation. + """ if not self.custom_component: self.custom_component, _ = initialize.loading.instantiate_class( user_id=user_id, vertex=self, ) + @observable async def _build( self, fallback_to_env_vars, user_id=None, event_manager: EventManager | None = None, ) -> None: - """Initiate the build process.""" + """Build this vertex by preparing params, instantiating the component, running the component build, and marking the vertex as built. + + Parameters: + fallback_to_env_vars (bool): If True, allow environment variables as fallback values for missing parameters. + user_id (Optional[str]): User identifier to use when instantiating the component. + event_manager (Optional[EventManager]): Event manager to attach to the instantiated component and to forward to the loader. + + Raises: + ValueError: If the vertex's base type is not found. + """ await logger.adebug(f"Building {self.display_name}") await self._build_each_vertex_in_params_dict() - if self.base_type is None: msg = f"Base type for vertex {self.display_name} not found" raise ValueError(msg) @@ -819,8 +848,60 @@ def built_object_repr(self) -> str: return "Built successfully ✨" if self.built_object is not None else "Failed to build 😵‍💫" def apply_on_outputs(self, func: Callable[[Any], Any]) -> None: - """Applies a function to the outputs of the vertex.""" + """Apply a function to each output value provided by the vertex's custom component. + + Parameters: + func (Callable[[Any], Any]): Function to be called with each output value; return values are ignored. + + """ if not self.custom_component or not self.custom_component.outputs: return # Apply the function to each output [func(output) for output in self.custom_component.get_outputs_map().values()] + + # AGUI/AG UI Event Streaming Callbacks/Methods - (Optional, see Observable decorator) + def raw_event_metrics(self, optional_fields: dict | None) -> dict: + """Compose a metrics payload containing a timestamp and any provided optional fields. + + Parameters: + optional_fields (dict | None): Additional key/value pairs to include in the payload; None is treated as an empty dict. + + Returns: + dict: A dictionary with a "timestamp" key (float seconds since the epoch) merged with the provided optional fields. + """ + if optional_fields is None: + optional_fields = {} + import time + + return {"timestamp": time.time(), **optional_fields} + + def before_callback_event(self, *args, **kwargs) -> StepStartedEvent: # noqa: ARG002 + """Create an AGUI StepStartedEvent representing this vertex starting execution. + + The event contains an AGUI-compatible payload under `raw_event` with a `langflow` + metrics object. The metrics include `component_id` (this vertex's id) and any + additional entries provided by the vertex's `raw_event_metrics` method, if present. + + Returns: + StepStartedEvent: event with `step_name` set to the vertex display name and + `raw_event` containing the `langflow` metrics. + """ + metrics = {} + if hasattr(self, "raw_event_metrics"): + metrics = self.raw_event_metrics({"component_id": self.id}) + + return StepStartedEvent(step_name=self.display_name, raw_event={"langflow": metrics}) + + def after_callback_event(self, result, *args, **kwargs) -> StepFinishedEvent: # noqa: ARG002 + """Create an AGUI-compatible StepFinishedEvent for this vertex. + + Parameters: + result: The final result produced by the vertex. + + Returns: + StepFinishedEvent: Event with `step_name` set to the vertex `display_name` and `raw_event` containing a `langflow` metrics payload (includes `component_id` when available). + """ + metrics = {} + if hasattr(self, "raw_event_metrics"): + metrics = self.raw_event_metrics({"component_id": self.id}) + return StepFinishedEvent(step_name=self.display_name, raw_event={"langflow": metrics}) diff --git a/src/lfx/tests/unit/events/observability/__init__.py b/src/lfx/tests/unit/events/observability/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/src/lfx/tests/unit/events/observability/test_lifecycle_events.py b/src/lfx/tests/unit/events/observability/test_lifecycle_events.py new file mode 100644 index 000000000000..ba08b8b1a597 --- /dev/null +++ b/src/lfx/tests/unit/events/observability/test_lifecycle_events.py @@ -0,0 +1,320 @@ +import asyncio +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +# Import the actual decorator we want to test +from lfx.events.observability.lifecycle_events import observable + + +# Mock classes for dependencies +class MockEventManager: + """Mock for lfx.events.event_manager.EventManager.""" + + def __init__(self): + # We'll use AsyncMock for publish + """Create a mock event manager with an asynchronous `publish` method. + + Initializes the instance by assigning `self.publish` to an `AsyncMock` so tests can await and assert calls to event publishing. + """ + self.publish = AsyncMock() + + +class MockLogger: + """Mock for lfx.log.logger.logger.""" + + def __init__(self): + """Initialize a mock logger with asynchronous warning and error callables. + + Creates two async-capable attributes used in tests: + - `awarning`: mock for async warning calls. + - `aerror`: mock for async error calls. + """ + self.awarning = AsyncMock() + self.aerror = AsyncMock() + + +# --- Pytest Fixtures --- + + +@pytest.fixture +def mock_dependencies(): + """Create and patch test doubles for the logger, event manager, and event encoder used by lifecycle_events. + + Patches lfx.events.observability.lifecycle_events.logger with a MockLogger instance and patches EventEncoder with a mock class that returns an encoder whose `encode` method returns {"encoded": True, "original_event": payload}. Yields a dictionary containing the prepared mocks: + + Yields: + dict: A mapping with keys: + - event_manager: MockEventManager instance used to simulate publishing. + - logger: MockLogger instance with async warning/error methods. + - encoder_cls: MagicMock that returns the mock encoder instance when called. + """ + # 1. Logger Mock + mock_logger_instance = MockLogger() + + # 2. EventManager Mock + mock_event_manager = MockEventManager() + + # 3. Encoder Mock - create a mock instance with a mocked encode method + mock_encoder_instance = MagicMock() + mock_encoder_instance.encode = MagicMock(side_effect=lambda payload: {"encoded": True, "original_event": payload}) + + # Mock the EventEncoder class to return our mock instance + mock_encoder_cls = MagicMock(return_value=mock_encoder_instance) + + # Patch the actual imports in the lifecycle_events module + with ( + patch("lfx.events.observability.lifecycle_events.logger", mock_logger_instance), + patch("lfx.events.observability.lifecycle_events.EventEncoder", mock_encoder_cls), + ): + yield { + "event_manager": mock_event_manager, + "logger": mock_logger_instance, + "encoder_cls": mock_encoder_cls, + } + + +@pytest.fixture(autouse=True) +def reset_mocks(mock_dependencies): + """Resets the state of the mocks before each test.""" + # Ensure all mocks are reset before test execution + mock_dependencies["logger"].awarning.reset_mock() + mock_dependencies["logger"].aerror.reset_mock() + mock_dependencies["encoder_cls"].reset_mock() + mock_dependencies["event_manager"].publish.reset_mock() + + +# --- Test Classes (remain largely the same, but now used by pytest functions) --- + + +class TestClassWithCallbacks: + display_name = "ObservableTest" + + def before_callback_event(self, *args, **kwargs): + """Builds a payload describing a "start" lifecycle event for the observable method. + + Parameters: + *args: Positional arguments passed to the observable method. + **kwargs: Keyword arguments passed to the observable method. + + Returns: + dict: Payload with keys: + - "lifecycle": "start" + - "args_len": number of positional arguments + - "kw_keys": list of keyword argument names + """ + return {"lifecycle": "start", "args_len": len(args), "kw_keys": list(kwargs.keys())} + + def after_callback_event(self, result: Any, *args, **kwargs): # noqa: ARG002 + """Create the "end" lifecycle event payload containing the method result and the names of keyword arguments. + + Parameters: + result: The value returned by the wrapped method. + *args: Positional arguments passed to the wrapped method (ignored in payload). + **kwargs: Keyword arguments passed to the wrapped method; their keys are recorded. + + Returns: + dict: Payload with keys: + - "lifecycle": "end" + - "result": the provided `result` + - "kw_keys": list of keyword argument names from `kwargs` + """ + return {"lifecycle": "end", "result": result, "kw_keys": list(kwargs.keys())} + + def error_callback_event(self, exception: Exception, *args, **kwargs): # noqa: ARG002 + """Builds an "error" lifecycle payload from an exception and the provided keyword arguments. + + Parameters: + exception (Exception): The exception to extract message and type from. + *args: Ignored positional arguments. + **kwargs: Keyword arguments whose keys will be included in the payload. + + Returns: + dict: Payload with keys: + - `lifecycle`: the string "error". + - `error`: the exception message (string). + - `error_type`: the exception class name (string). + - `kw_keys`: list of keys from `kwargs`. + """ + return { + "lifecycle": "error", + "error": str(exception), + "error_type": type(exception).__name__, + "kw_keys": list(kwargs.keys()), + } + + # Mock observable method + @observable + async def run_success(self, event_manager: MockEventManager, data: str) -> str: # noqa: ARG002 + """Process input data and return it prefixed with "Processed:". + + Returns: + result (str): The input `data` formatted as "Processed:{data}". + """ + await asyncio.sleep(0.001) + return f"Processed:{data}" + + @observable + async def run_exception(self, event_manager: MockEventManager, data: str) -> str: # noqa: ARG002 + """Test helper that sleeps briefly then raises a ValueError. + + Raises: + ValueError: Always raised after the brief delay. + """ + await asyncio.sleep(0.001) + raise ValueError + + +class TestClassWithoutCallbacks: + display_name = "NonObservableTest" + + @observable + async def run_success(self, event_manager: MockEventManager, data: str) -> str: # noqa: ARG002 + """Process input data and return it prefixed with "Processed:". + + Returns: + result (str): The input `data` formatted as "Processed:{data}". + """ + await asyncio.sleep(0.001) + return f"Processed:{data}" + + +# --- Pytest Test Functions --- + + +# Use pytest.mark.asyncio for running async functions +@pytest.mark.asyncio +async def test_successful_run_with_callbacks(mock_dependencies): + instance = TestClassWithCallbacks() + data = "test_data" + + event_manager = mock_dependencies["event_manager"] + + result = await instance.run_success(event_manager=event_manager, data=data) + + # 1. Assert result + assert result == f"Processed:{data}" + + # 2. Assert encoder was called twice (once for BEFORE, once for AFTER) + assert mock_dependencies["encoder_cls"].call_count == 2 + + # 3. Verify the encoder was called with the correct payloads + encoder_instance = mock_dependencies["encoder_cls"].return_value + assert encoder_instance.encode.call_count == 2 + + # Get the actual calls to encode + encode_calls = encoder_instance.encode.call_args_list + + # First call should be the BEFORE event + before_payload = encode_calls[0][0][0] + assert before_payload["lifecycle"] == "start" + assert before_payload["args_len"] == 0 + assert "event_manager" in before_payload["kw_keys"] + assert "data" in before_payload["kw_keys"] + + # Second call should be the AFTER event + after_payload = encode_calls[1][0][0] + assert after_payload["lifecycle"] == "end" + assert after_payload["result"] == f"Processed:{data}" + assert "event_manager" in after_payload["kw_keys"] + assert "data" in after_payload["kw_keys"] + + # 4. Assert no warnings or errors were logged + mock_dependencies["logger"].awarning.assert_not_called() + mock_dependencies["logger"].aerror.assert_not_called() + + +@pytest.mark.asyncio +async def test_exception_run_with_callbacks(mock_dependencies): + instance = TestClassWithCallbacks() + + event_manager = mock_dependencies["event_manager"] + + # The decorator now re-raises the exception after logging and encoding the error event + with pytest.raises(ValueError): + await instance.run_exception(event_manager=event_manager, data="fail_data") + + # 1. Assert error was logged + mock_dependencies["logger"].aerror.assert_called_once() + mock_dependencies["logger"].aerror.assert_called_with("Exception in TestClassWithCallbacks: ") + + # 2. Assert encoder was called twice (once for BEFORE event, once for ERROR event) + assert mock_dependencies["encoder_cls"].call_count == 2 + + # 3. Verify the encoder was called with the correct payloads + encoder_instance = mock_dependencies["encoder_cls"].return_value + assert encoder_instance.encode.call_count == 2 + + # Get the actual calls to encode + encode_calls = encoder_instance.encode.call_args_list + + # First call should be the BEFORE event + before_payload = encode_calls[0][0][0] + assert before_payload["lifecycle"] == "start" + + # Second call should be the ERROR event + error_payload = encode_calls[1][0][0] + assert error_payload["lifecycle"] == "error" + assert error_payload["error"] == "" + assert error_payload["error_type"] == "ValueError" + + # 4. Assert no warnings were logged + mock_dependencies["logger"].awarning.assert_not_called() + + +@pytest.mark.asyncio +async def test_run_without_event_manager(mock_dependencies): + """Run TestClassWithCallbacks.run_success with no EventManager and verify behavior. + + Parameters: + mock_dependencies (dict): Fixture-provided mocks including 'logger', 'event_manager', and 'encoder_cls'. + + Details: + - Calls run_success with event_manager set to None and verifies the method returns the expected processed string. + - Verifies the logger.awarning was called twice with a message indicating the EventManager is not available. + """ + instance = TestClassWithCallbacks() + data = "no_manager" + + # No event_manager passed (or explicitly passed as None) + result = await instance.run_success(event_manager=None, data=data) + + # 1. Assert result is correct + assert result == f"Processed:{data}" + + # 2. Assert warning for missing EventManager was logged twice (once for before, once for after) + assert mock_dependencies["logger"].awarning.call_count == 2 + mock_dependencies["logger"].awarning.assert_any_call( + "EventManager not available/provided, skipping observable event publishing from TestClassWithCallbacks" + ) + + +@pytest.mark.asyncio +async def test_run_without_callbacks(mock_dependencies): + """Verify behavior when an observable-decorated method is called on a class that does not implement lifecycle callbacks. + + Checks that the method returns its expected result, that two warning messages are logged for the missing `before_callback_event` and `after_callback_event` callbacks, and that no error logs are emitted. + """ + instance = TestClassWithoutCallbacks() + data = "no_callbacks" + + event_manager = mock_dependencies["event_manager"] + + # Run the method with a manager + result = await instance.run_success(event_manager=event_manager, data=data) + + # 1. Assert result is correct + assert result == f"Processed:{data}" + + # 2. Assert warnings for missing callbacks were logged + assert mock_dependencies["logger"].awarning.call_count == 2 + mock_dependencies["logger"].awarning.assert_any_call( + "before_callback_event not implemented for TestClassWithoutCallbacks. Skipping event publishing." + ) + mock_dependencies["logger"].awarning.assert_any_call( + "after_callback_event not implemented for TestClassWithoutCallbacks. Skipping event publishing." + ) + + # 3. Assert no errors were logged + mock_dependencies["logger"].aerror.assert_not_called() diff --git a/src/lfx/tests/unit/graph/graph/test_base.py b/src/lfx/tests/unit/graph/graph/test_base.py index dddb61689a01..4081f4c89d8f 100644 --- a/src/lfx/tests/unit/graph/graph/test_base.py +++ b/src/lfx/tests/unit/graph/graph/test_base.py @@ -1,6 +1,7 @@ from collections import deque import pytest +from ag_ui.core import RunFinishedEvent, RunStartedEvent from lfx.components.input_output import ChatInput, ChatOutput, TextOutputComponent from lfx.graph import Graph from lfx.graph.graph.constants import Finish @@ -111,3 +112,117 @@ def test_graph_set_with_valid_component(): tool = YfinanceToolComponent() tool_calling_agent = ToolCallingAgentComponent() tool_calling_agent.set(tools=[tool]) + + +def test_graph_before_callback_event(): + """Test that before_callback_event generates the correct RunStartedEvent payload.""" + # Create a simple graph with two components and a flow_id + chat_input = ChatInput(_id="chat_input") + chat_output = ChatOutput(input_value="test", _id="chat_output") + chat_output.set(sender_name=chat_input.message_response) + graph = Graph(chat_input, chat_output, flow_id="test_flow_id") + + # Call before_callback_event + event = graph.before_callback_event() + + # Assert the event is a RunStartedEvent + assert isinstance(event, RunStartedEvent) + + # Assert the event has the correct run_id and thread_id + assert event.run_id == graph._run_id + assert event.thread_id == graph.flow_id + assert event.thread_id == "test_flow_id" + + # Assert the raw_event contains metrics + assert event.raw_event is not None + assert isinstance(event.raw_event, dict) + + # Assert the raw_event contains timestamp + assert "timestamp" in event.raw_event + assert isinstance(event.raw_event["timestamp"], float) + + # Assert the raw_event contains total_components + assert "total_components" in event.raw_event + assert event.raw_event["total_components"] == len(graph.vertices) + assert event.raw_event["total_components"] == 2 # chat_input and chat_output + + +def test_graph_after_callback_event(): + """Test that after_callback_event generates the correct RunFinishedEvent payload.""" + # Create a simple graph with two components and a flow_id + chat_input = ChatInput(_id="chat_input") + chat_output = ChatOutput(input_value="test", _id="chat_output") + chat_output.set(sender_name=chat_input.message_response) + graph = Graph(chat_input, chat_output, flow_id="test_flow_id") + + # Call after_callback_event + event = graph.after_callback_event(result="test_result") + + # Assert the event is a RunFinishedEvent + assert isinstance(event, RunFinishedEvent) + + # Assert the event has the correct run_id and thread_id + assert event.run_id == graph._run_id + assert event.thread_id == graph.flow_id + assert event.thread_id == "test_flow_id" + + # Assert the result is None (as per the implementation) + assert event.result is None + + # Assert the raw_event contains metrics + assert event.raw_event is not None + assert isinstance(event.raw_event, dict) + + # Assert the raw_event contains timestamp + assert "timestamp" in event.raw_event + assert isinstance(event.raw_event["timestamp"], float) + + # Assert the raw_event contains total_components + assert "total_components" in event.raw_event + assert event.raw_event["total_components"] == len(graph.vertices) + assert event.raw_event["total_components"] == 2 # chat_input and chat_output + + +def test_graph_raw_event_metrics(): + """Test that raw_event_metrics generates the correct metrics dictionary.""" + # Create a simple graph with flow_id + chat_input = ChatInput(_id="chat_input") + chat_output = ChatOutput(input_value="test", _id="chat_output") + chat_output.set(sender_name=chat_input.message_response) + graph = Graph(chat_input, chat_output, flow_id="test_flow_id") + + # Call raw_event_metrics with optional fields + metrics = graph.raw_event_metrics({"custom_field": "custom_value"}) + + # Assert metrics is a dictionary + assert isinstance(metrics, dict) + + # Assert timestamp is present and is a float + assert "timestamp" in metrics + assert isinstance(metrics["timestamp"], float) + + # Assert custom field is present + assert "custom_field" in metrics + assert metrics["custom_field"] == "custom_value" + + +def test_graph_raw_event_metrics_no_optional_fields(): + """Test that raw_event_metrics works without optional fields.""" + # Create a simple graph with flow_id + chat_input = ChatInput(_id="chat_input") + chat_output = ChatOutput(input_value="test", _id="chat_output") + chat_output.set(sender_name=chat_input.message_response) + graph = Graph(chat_input, chat_output, flow_id="test_flow_id") + + # Call raw_event_metrics without optional fields + metrics = graph.raw_event_metrics() + + # Assert metrics is a dictionary + assert isinstance(metrics, dict) + + # Assert timestamp is present and is a float + assert "timestamp" in metrics + assert isinstance(metrics["timestamp"], float) + + # Assert only timestamp is present (no optional fields) + assert len(metrics) == 1 diff --git a/src/lfx/tests/unit/graph/vertex/test_vertex_base.py b/src/lfx/tests/unit/graph/vertex/test_vertex_base.py index f1e1ea2623cf..27271650210d 100644 --- a/src/lfx/tests/unit/graph/vertex/test_vertex_base.py +++ b/src/lfx/tests/unit/graph/vertex/test_vertex_base.py @@ -7,6 +7,8 @@ from unittest.mock import Mock import pytest +from ag_ui.core import StepFinishedEvent, StepStartedEvent +from lfx.components.input_output import ChatInput from lfx.graph.edge.base import Edge from lfx.graph.vertex.base import ParameterHandler, Vertex from lfx.services.storage.service import StorageService @@ -263,3 +265,133 @@ def test_process_field_parameters_table_field_invalid(parameter_handler, mock_ve with pytest.raises(ValueError, match="Invalid value type"): parameter_handler.process_field_parameters() + + +def test_vertex_before_callback_event(): + """Test that Vertex.before_callback_event generates the correct StepStartedEvent payload.""" + # Create a graph with a ChatInput component, which creates a vertex + from lfx.graph import Graph + + chat_input = ChatInput(_id="test_vertex_id") + chat_output = ChatInput(_id="output_id") # Need two components for Graph + graph = Graph(chat_input, chat_output, flow_id="test_flow") + + # Get the vertex from the graph + vertex = graph.vertices[0] # First vertex should be chat_input + assert vertex.id == "test_vertex_id" + + # Call before_callback_event + event = vertex.before_callback_event() + + # Assert the event is a StepStartedEvent + assert isinstance(event, StepStartedEvent) + + # Assert the event has the correct step_name + assert event.step_name == vertex.display_name + + # Assert the raw_event contains the langflow metrics + assert event.raw_event is not None + assert isinstance(event.raw_event, dict) + assert "langflow" in event.raw_event + + # Assert the langflow metrics contain expected fields + langflow_metrics = event.raw_event["langflow"] + assert isinstance(langflow_metrics, dict) + assert "timestamp" in langflow_metrics + assert isinstance(langflow_metrics["timestamp"], float) + assert "component_id" in langflow_metrics + assert langflow_metrics["component_id"] == vertex.id + assert langflow_metrics["component_id"] == "test_vertex_id" + + +def test_vertex_after_callback_event(): + """Test that Vertex.after_callback_event generates the correct StepFinishedEvent payload.""" + # Create a graph with a ChatInput component, which creates a vertex + from lfx.graph import Graph + + chat_input = ChatInput(_id="test_vertex_id") + chat_output = ChatInput(_id="output_id") # Need two components for Graph + graph = Graph(chat_input, chat_output, flow_id="test_flow") + + # Get the vertex from the graph + vertex = graph.vertices[0] # First vertex should be chat_input + assert vertex.id == "test_vertex_id" + + # Call after_callback_event with a result + test_result = "test_result_value" + event = vertex.after_callback_event(result=test_result) + + # Assert the event is a StepFinishedEvent + assert isinstance(event, StepFinishedEvent) + + # Assert the event has the correct step_name + assert event.step_name == vertex.display_name + + # Assert the raw_event contains the langflow metrics + assert event.raw_event is not None + assert isinstance(event.raw_event, dict) + assert "langflow" in event.raw_event + + # Assert the langflow metrics contain expected fields + langflow_metrics = event.raw_event["langflow"] + assert isinstance(langflow_metrics, dict) + assert "timestamp" in langflow_metrics + assert isinstance(langflow_metrics["timestamp"], float) + assert "component_id" in langflow_metrics + assert langflow_metrics["component_id"] == vertex.id + assert langflow_metrics["component_id"] == "test_vertex_id" + + +def test_vertex_raw_event_metrics(): + """Test that Vertex.raw_event_metrics generates the correct metrics dictionary.""" + # Create a graph with a ChatInput component, which creates a vertex + from lfx.graph import Graph + + chat_input = ChatInput(_id="test_vertex_id") + chat_output = ChatInput(_id="output_id") # Need two components for Graph + graph = Graph(chat_input, chat_output, flow_id="test_flow") + + # Get the vertex from the graph + vertex = graph.vertices[0] # First vertex should be chat_input + assert vertex.id == "test_vertex_id" + + # Call raw_event_metrics with optional fields + metrics = vertex.raw_event_metrics({"custom_field": "custom_value"}) + + # Assert metrics is a dictionary + assert isinstance(metrics, dict) + + # Assert timestamp is present and is a float + assert "timestamp" in metrics + assert isinstance(metrics["timestamp"], float) + + # Assert custom field is present + assert "custom_field" in metrics + assert metrics["custom_field"] == "custom_value" + + +def test_vertex_raw_event_metrics_no_optional_fields(): + """Test that Vertex.raw_event_metrics works without optional fields.""" + # Create a graph with a ChatInput component, which creates a vertex + from lfx.graph import Graph + + chat_input = ChatInput(_id="test_vertex_id") + chat_output = ChatInput(_id="output_id") # Need two components for Graph + graph = Graph(chat_input, chat_output, flow_id="test_flow") + + # Get the vertex from the graph + vertex = graph.vertices[0] # First vertex should be chat_input + assert vertex.id == "test_vertex_id" + + # Call raw_event_metrics without optional fields (pass None) + metrics = vertex.raw_event_metrics(None) + + # Assert metrics is a dictionary + assert isinstance(metrics, dict) + + # Assert timestamp is present and is a float + assert "timestamp" in metrics + assert isinstance(metrics["timestamp"], float) + + # The metrics should contain only timestamp when no optional fields are provided + assert len(metrics) == 1 diff --git a/src/lfx/tests/unit/services/settings/test_mcp_composer.py b/src/lfx/tests/unit/services/settings/test_mcp_composer.py index 7cd4dbfc41eb..14fd44683b1c 100644 --- a/src/lfx/tests/unit/services/settings/test_mcp_composer.py +++ b/src/lfx/tests/unit/services/settings/test_mcp_composer.py @@ -364,9 +364,7 @@ async def test_legacy_sse_url_preserved_in_composer_state(self, mcp_service): kwargs = mock_start.call_args.kwargs assert kwargs["legacy_sse_url"] == legacy_url assert mcp_service.project_composers[project_id]["legacy_sse_url"] == legacy_url - assert ( - mcp_service.project_composers[project_id]["streamable_http_url"] == streamable_url - ) + assert mcp_service.project_composers[project_id]["streamable_http_url"] == streamable_url @pytest.mark.asyncio async def test_legacy_sse_url_defaults_when_not_provided(self, mcp_service): diff --git a/uv.lock b/uv.lock index 5f01f43f7a85..c03f6d2e3ffe 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 2 +revision = 3 requires-python = ">=3.10, <3.14" resolution-markers = [ "python_full_version >= '3.13' and platform_machine == 'arm64' and sys_platform == 'darwin'", @@ -49,6 +49,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/9f/d2/c581486aa6c4fbd7394c23c47b83fa1a919d34194e16944241daf9e762dd/accelerate-1.12.0-py3-none-any.whl", hash = "sha256:3e2091cd341423207e2f084a6654b1efcd250dc326f2a37d6dde446e07cabb11", size = 380935, upload-time = "2025-11-21T11:27:44.522Z" }, ] +[[package]] +name = "ag-ui-protocol" +version = "0.1.10" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pydantic" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/67/bb/5a5ec893eea5805fb9a3db76a9888c3429710dfb6f24bbb37568f2cf7320/ag_ui_protocol-0.1.10.tar.gz", hash = "sha256:3213991c6b2eb24bb1a8c362ee270c16705a07a4c5962267a083d0959ed894f4", size = 6945, upload-time = "2025-11-06T15:17:17.068Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/8f/78/eb55fabaab41abc53f52c0918a9a8c0f747807e5306273f51120fd695957/ag_ui_protocol-0.1.10-py3-none-any.whl", hash = "sha256:c81e6981f30aabdf97a7ee312bfd4df0cd38e718d9fc10019c7d438128b93ab5", size = 7889, upload-time = "2025-11-06T15:17:15.325Z" }, +] + [[package]] name = "agent-lifecycle-toolkit" version = "0.4.4" @@ -5531,6 +5543,7 @@ name = "langflow" version = "1.7.0" source = { editable = "." } dependencies = [ + { name = "ag-ui-protocol" }, { name = "agent-lifecycle-toolkit" }, { name = "aioboto3" }, { name = "aiofile" }, @@ -5740,6 +5753,7 @@ dev = [ [package.metadata] requires-dist = [ + { name = "ag-ui-protocol", specifier = ">=0.1.10" }, { name = "agent-lifecycle-toolkit", specifier = "~=0.4.4" }, { name = "aioboto3", specifier = ">=15.2.0,<16.0.0" }, { name = "aiofile", specifier = ">=3.9.0,<4.0.0" }, @@ -6393,6 +6407,7 @@ name = "lfx" version = "0.2.0" source = { editable = "src/lfx" } dependencies = [ + { name = "ag-ui-protocol" }, { name = "aiofile" }, { name = "aiofiles" }, { name = "asyncer" }, @@ -6442,6 +6457,7 @@ dev = [ [package.metadata] requires-dist = [ + { name = "ag-ui-protocol", specifier = ">=0.1.10" }, { name = "aiofile", specifier = ">=3.8.0,<4.0.0" }, { name = "aiofiles", specifier = ">=24.1.0,<25.0.0" }, { name = "asyncer", specifier = ">=0.0.8,<1.0.0" },