Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ dependencies = [
"cuga~=0.1.11",
"agent-lifecycle-toolkit~=0.4.4",
"astrapy>=2.1.0,<3.0.0",
"aioboto3>=15.2.0,<16.0.0"
"aioboto3>=15.2.0,<16.0.0",
"ag-ui-protocol>=0.1.10",
]


Expand Down
2 changes: 1 addition & 1 deletion src/backend/base/langflow/api/v1/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ async def stop_streamable_http_manager() -> None:
await _streamable_http.stop()


streamable_http_route_config = { # use for all streamable http routes (except for the health check)
streamable_http_route_config = { # use for all streamable http routes (except for the health check)
"methods": ["GET", "POST", "DELETE"],
"response_class": ResponseNoOp,
}
Expand Down
2 changes: 1 addition & 1 deletion src/backend/base/langflow/api/v1/mcp_projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ async def _handle_project_sse_messages(
current_project_ctx.reset(project_token)
current_request_variables_ctx.reset(req_vars_token)


@router.post("/{project_id}")
@router.post("/{project_id}/")
async def handle_project_messages(
Expand Down Expand Up @@ -438,7 +439,6 @@ async def _dispatch_project_streamable_http(
return ResponseNoOp(status_code=200)



streamable_http_route_config = {
"methods": ["GET", "POST", "DELETE"],
"response_class": ResponseNoOp,
Expand Down
2 changes: 1 addition & 1 deletion src/backend/tests/unit/api/utils/test_config_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def _build_server_config(base_url: str, project_id, transport: str):
"""Return URL and server config for a given transport."""
suffix = "streamable" if transport == "streamable" else "sse"
url = f"{base_url}/api/v1/mcp/project/{project_id}/{suffix}"
if transport == "streamable": # noqa: SIM108
if transport == "streamable": # noqa: SIM108
args = ["mcp-proxy", "--transport", "streamablehttp", url]
else:
args = ["mcp-proxy", url]
Expand Down
1 change: 1 addition & 0 deletions src/lfx/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ dependencies = [
"validators>=0.34.0,<1.0.0",
"filelock>=3.20.0",
"pypdf>=5.1.0",
"ag-ui-protocol>=0.1.10",
]

[project.scripts]
Expand Down
Empty file.
126 changes: 126 additions & 0 deletions src/lfx/src/lfx/events/observability/lifecycle_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import functools
from collections.abc import Awaitable, Callable
from typing import Any

from ag_ui.encoder import EventEncoder

from lfx.log.logger import logger

AsyncMethod = Callable[..., Awaitable[Any]]


def observable(observed_method: AsyncMethod) -> AsyncMethod:
    """Make an async method emit lifecycle events through optional hooks on its host object.

    If implemented on the host, the following hooks are invoked to produce event payloads:
    - ``before_callback_event(*args, **kwargs)``: called before the observed method executes.
    - ``after_callback_event(result, *args, **kwargs)``: called after successful completion;
      receives the method result as its first argument.
    - ``error_callback_event(exception, *args, **kwargs)``: called when the observed method
      raises; receives the exception as its first argument.

    Event publishing is skipped when the ``event_manager`` keyword argument is missing or
    None. Hook payloads are encoded with ``EventEncoder``. Missing hooks and exceptions
    raised *inside* a hook are logged and ignored so that observability can never change
    the outcome of the observed call (the original implementation let a failing
    ``after_callback_event`` turn a successful call into a failure).

    Returns:
        A wrapper async function that preserves the original method's metadata, invokes
        the lifecycle hooks at the appropriate times, and returns the original result.
    """

    async def _has_event_manager(self, **kwargs) -> bool:
        """Return True when a non-None ``event_manager`` kwarg is present.

        Logs a warning and returns False when the event manager is missing or None,
        in which case event publishing is skipped entirely.
        """
        if kwargs.get("event_manager") is None:
            await logger.awarning(
                f"EventManager not available/provided, skipping observable event publishing "
                f"from {self.__class__.__name__}"
            )
            return False
        return True

    async def _emit(self, hook_name: str, hook_args: tuple, kwargs: dict) -> None:
        """Invoke the named lifecycle hook on the host (if implemented) and encode its payload.

        Parameters:
            hook_name: Attribute name of the hook on the host object.
            hook_args: Positional arguments forwarded to the hook.
            kwargs: Keyword arguments forwarded to the hook; must contain a non-None
                ``event_manager`` for publishing to proceed.

        Any exception raised while producing or encoding the payload is logged and
        swallowed: a broken event hook must not break the observed method.
        """
        if not await _has_event_manager(self, **kwargs):
            return
        hook = getattr(self, hook_name, None)
        if hook is None:
            await logger.awarning(
                f"{hook_name} not implemented for {self.__class__.__name__}. Skipping event publishing."
            )
            return
        try:
            event_payload = hook(*hook_args, **kwargs)
            event_payload = EventEncoder().encode(event_payload)
            # TODO: Publish event
        except Exception as hook_error:  # noqa: BLE001 - observability must never raise
            await logger.aerror(
                f"Exception during {hook_name} for {self.__class__.__name__}: {hook_error}"
            )

    @functools.wraps(observed_method)
    async def wrapper(self, *args, **kwargs):
        """Run the observed method, emitting before/after/error lifecycle events around it.

        The before hook runs prior to the wrapped call, the after hook runs only on
        success, and the error hook runs only when the wrapped method raises. Hook
        failures are logged, never propagated; the wrapped method's own exception is
        re-raised unchanged (bare ``raise`` preserves the original traceback).

        Returns:
            The result returned by the wrapped async method.

        Raises:
            Exception: Whatever the wrapped method raises, re-raised after the error
                hook has been given a chance to emit an event.
        """
        await _emit(self, "before_callback_event", args, kwargs)
        try:
            result = await observed_method(self, *args, **kwargs)
        except Exception as e:
            await logger.aerror(f"Exception in {self.__class__.__name__}: {e}")
            await _emit(self, "error_callback_event", (e, *args), kwargs)
            raise
        await _emit(self, "after_callback_event", (result, *args), kwargs)
        return result

    return wrapper
90 changes: 74 additions & 16 deletions src/lfx/src/lfx/graph/graph/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
from itertools import chain
from typing import TYPE_CHECKING, Any, cast

from ag_ui.core import RunFinishedEvent, RunStartedEvent

from lfx.events.observability.lifecycle_events import observable
from lfx.exceptions.component import ComponentBuildError
from lfx.graph.edge.base import CycleEdge, Edge
from lfx.graph.graph.constants import Finish, lazy_load_vertex_dict
Expand Down Expand Up @@ -708,11 +711,17 @@ def define_vertices_lists(self) -> None:
self._is_state_vertices.append(vertex.id)

def _set_inputs(self, input_components: list[str], inputs: dict[str, str], input_type: InputType | None) -> None:
"""Updates input vertices' parameters with the provided inputs, filtering by component list and input type.
"""Update parameters of input vertices that match the specified components and input type.

Only vertices listed in `self._is_input_vertices` are considered. If `input_components` is provided, a vertex is updated only if its id or display_name appears in that list; if `input_components` is empty, all input vertices are considered. If `input_type` is provided and not equal to the string `"any"`, a vertex is updated only if `input_type` appears in the vertex's id (case-insensitive). Updated parameters replace existing values.

Only vertices whose IDs or display names match the specified input components and whose IDs contain
the input type (unless input type is 'any' or None) are updated. Raises a ValueError if a specified
vertex is not found.
Parameters:
input_components (list[str]): Component ids or display names to restrict which input vertices to update. An empty list means all input vertices.
inputs (dict[str, str]): Mapping of input parameter names to values to apply to matching vertices.
input_type (InputType | None): Optional type filter; when not `None` and not `"any"`, only vertices whose id contains this type (case-insensitive) are updated.

Raises:
ValueError: If a vertex id listed in `self._is_input_vertices` cannot be found.
"""
for vertex_id in self._is_input_vertices:
vertex = self.get_vertex(vertex_id)
Expand All @@ -728,6 +737,7 @@ def _set_inputs(self, input_components: list[str], inputs: dict[str, str], input
raise ValueError(msg)
vertex.update_raw_params(inputs, overwrite=True)

@observable
async def _run(
self,
*,
Expand All @@ -740,20 +750,16 @@ async def _run(
fallback_to_env_vars: bool,
event_manager: EventManager | None = None,
) -> list[ResultData | None]:
"""Runs the graph with the given inputs.
"""Execute the graph with the provided inputs and collect requested outputs.

Args:
inputs (Dict[str, str]): The input values for the graph.
input_components (list[str]): The components to run for the inputs.
input_type: (Optional[InputType]): The input type.
outputs (list[str]): The outputs to retrieve from the graph.
stream (bool): Whether to stream the results or not.
session_id (str): The session ID for the graph.
fallback_to_env_vars (bool): Whether to fallback to environment variables.
event_manager (EventManager | None): The event manager for the graph.
Runs a single graph execution: applies input values, tags vertices with the session ID, optionally stores the graph in the chat cache, executes the processing pipeline, finalizes tracing, and gathers results from output vertices or explicitly requested outputs.

Returns:
List[Optional["ResultData"]]: The outputs of the graph.
A list containing each collected vertex result in execution order; entries correspond to outputs produced by output vertices or those named in `outputs`.

Raises:
TypeError: If the primary input value is not a string.
ValueError: If `input_components` is not a list, if a required vertex cannot be found, or if an error occurs during graph processing.
"""
if input_components and not isinstance(input_components, list):
msg = f"Invalid components value: {input_components}. Expected list"
Expand Down Expand Up @@ -2288,11 +2294,63 @@ def build_adjacency_maps(edges: list[CycleEdge]) -> tuple[dict[str, list[str]],
return predecessor_map, successor_map

def __to_dict(self) -> dict[str, dict[str, list[str]]]:
"""Converts the graph to a dictionary."""
"""Return a mapping of each vertex id to its predecessor and successor vertex id lists.

Produces a dictionary where each key is a vertex id and the value is a dict with two keys:
"successors" — list of ids of all successor vertices reachable from that vertex;
"predecessors" — list of ids of immediate predecessor vertices for that vertex.

Returns:
graph_map (dict[str, dict[str, list[str]]]): Mapping of vertex id -> {"successors": [...], "predecessors": [...]}.
"""
result: dict = {}
for vertex in self.vertices:
vertex_id = vertex.id
sucessors = [i.id for i in self.get_all_successors(vertex)]
predecessors = [i.id for i in self.get_predecessors(vertex)]
result |= {vertex_id: {"successors": sucessors, "predecessors": predecessors}}
return result

def raw_event_metrics(self, optional_fields: dict | None = None) -> dict:
"""Create a timestamped metrics dictionary and merge in any provided optional fields.

Parameters:
optional_fields (dict | None): Additional key-value pairs to include in the metrics. If omitted, no extra fields are added.

Returns:
dict: A dictionary containing a numeric `timestamp` (seconds since the epoch) plus any keys from `optional_fields`.
"""
if optional_fields is None:
optional_fields = {}
import time

return {"timestamp": time.time(), **optional_fields}

def before_callback_event(self, *args, **kwargs) -> RunStartedEvent:  # noqa: ARG002
    """Build the RunStartedEvent emitted before a graph run begins.

    The event carries this graph's run id, its flow id as the thread id, and —
    when ``raw_event_metrics`` is available on the instance — raw metrics that
    include the total vertex count under ``total_components``.

    Returns:
        RunStartedEvent: Event with ``run_id``, ``thread_id`` (flow id), and
        ``raw_event`` metrics (empty dict when no metrics helper exists).
    """
    raw_metrics = (
        self.raw_event_metrics({"total_components": len(self.vertices)})
        if hasattr(self, "raw_event_metrics")
        else {}
    )
    return RunStartedEvent(run_id=self._run_id, thread_id=self.flow_id, raw_event=raw_metrics)

def after_callback_event(self, result: Any = None, *args, **kwargs) -> RunFinishedEvent:  # noqa: ARG002
    """Build the RunFinishedEvent emitted after a graph run completes.

    Parameters:
        result (Any): Final result of the run; accepted but not embedded in the
            event (the event's ``result`` field is explicitly None).
        *args: Ignored.
        **kwargs: Ignored.

    Returns:
        RunFinishedEvent: Event with this graph's ``run_id``, ``flow_id`` as
        ``thread_id``, ``result`` set to None, and ``raw_event`` metrics
        (including ``total_components`` when ``raw_event_metrics`` is available).
    """
    raw_metrics = (
        self.raw_event_metrics({"total_components": len(self.vertices)})
        if hasattr(self, "raw_event_metrics")
        else {}
    )
    return RunFinishedEvent(run_id=self._run_id, thread_id=self.flow_id, result=None, raw_event=raw_metrics)
Loading