Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ dependencies = [
"cuga~=0.1.11",
"agent-lifecycle-toolkit~=0.4.4",
"astrapy>=2.1.0,<3.0.0",
"aioboto3>=15.2.0,<16.0.0"
"aioboto3>=15.2.0,<16.0.0",
"ag-ui-protocol>=0.1.10",
]


Expand Down
1 change: 1 addition & 0 deletions src/lfx/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ dependencies = [
"validators>=0.34.0,<1.0.0",
"filelock>=3.20.0",
"pypdf>=5.1.0",
"ag-ui-protocol>=0.1.10",
]

[project.scripts]
Expand Down
Empty file.
111 changes: 111 additions & 0 deletions src/lfx/src/lfx/events/observability/lifecycle_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import functools
from collections.abc import Awaitable, Callable
from typing import Any

from ag_ui.encoder import EventEncoder

from lfx.log.logger import logger

AsyncMethod = Callable[..., Awaitable[Any]]


def observable(observed_method: AsyncMethod) -> AsyncMethod:
    """Decorate an async method so it emits lifecycle events via optional hooks on the hosting instance.

    The hosting class may implement any of these optional hooks to produce event payloads:
    - before_callback_event(*args, **kwargs) -> dict: called before the decorated method runs.
    - after_callback_event(result, *args, **kwargs) -> dict: called after the method completes successfully.
    - error_callback_event(exception, *args, **kwargs) -> dict: called if the method raises an exception.

    An implemented hook's payload is encoded with EventEncoder and prepared for publishing; a missing
    hook simply skips that event. Publishing requires a non-None `event_manager` keyword argument on
    the decorated call; without one, event handling is skipped with a warning.

    Returns:
        The wrapped async function that preserves the original method's behavior while invoking
        lifecycle hooks when available.
    """

    async def check_event_manager(self, **kwargs) -> bool:
        """Return True when a non-None 'event_manager' kwarg is available, logging a warning otherwise."""
        if kwargs.get("event_manager") is None:
            await logger.awarning(
                f"EventManager not available/provided, skipping observable event publishing "
                f"from {self.__class__.__name__}"
            )
            return False
        return True

    async def before_callback(self, *args, **kwargs):
        """Encode the pre-execution event produced by `before_callback_event`, if implemented."""
        if not await check_event_manager(self, **kwargs):
            return
        if hasattr(self, "before_callback_event"):
            event_payload = self.before_callback_event(*args, **kwargs)
            EventEncoder().encode(event_payload)
            # TODO: Publish event
        else:
            await logger.awarning(
                f"before_callback_event not implemented for {self.__class__.__name__}. Skipping event publishing."
            )

    async def after_callback(self, res: Any | None = None, *args, **kwargs):
        """Encode the post-execution event produced by `after_callback_event`, if implemented.

        Parameters:
            res (Any | None): The result produced by the observed method; forwarded to the hook.
        """
        if not await check_event_manager(self, **kwargs):
            return
        if hasattr(self, "after_callback_event"):
            event_payload = self.after_callback_event(res, *args, **kwargs)
            EventEncoder().encode(event_payload)
            # TODO: Publish event
        else:
            await logger.awarning(
                f"after_callback_event not implemented for {self.__class__.__name__}. Skipping event publishing."
            )

    async def error_callback(self, exc: Exception, *args, **kwargs):
        """Encode the error event produced by `error_callback_event`, if implemented.

        Mirrors the before/after handlers by first verifying an event manager is
        available, so the (future) publish step has somewhere to send the event.
        """
        if not await check_event_manager(self, **kwargs):
            return
        if hasattr(self, "error_callback_event"):
            error_payload = self.error_callback_event(exc, *args, **kwargs)
            EventEncoder().encode(error_payload)
            # TODO: Publish error event

    @functools.wraps(observed_method)
    async def wrapper(self, *args, **kwargs):
        """Run the observed method, emitting before/after events and an error event on failure.

        Returns:
            The value returned by the wrapped observed method.

        Raises:
            Exception: Propagates any exception raised by the observed method after the
                error event has been encoded (when `error_callback_event` is available).
        """
        await before_callback(self, *args, **kwargs)
        try:
            result = await observed_method(self, *args, **kwargs)
        except Exception as e:
            await logger.aerror(f"Exception in {self.__class__.__name__}: {e}")
            await error_callback(self, e, *args, **kwargs)
            raise
        # Emit the success event outside the try block so a failure raised inside the
        # after-hook is not misattributed to the observed method or reported as an error event.
        await after_callback(self, result, *args, **kwargs)
        return result

    return wrapper
146 changes: 123 additions & 23 deletions src/lfx/src/lfx/graph/graph/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
from itertools import chain
from typing import TYPE_CHECKING, Any, cast

from ag_ui.core import RunFinishedEvent, RunStartedEvent

from lfx.events.observability.lifecycle_events import observable
from lfx.exceptions.component import ComponentBuildError
from lfx.graph.edge.base import CycleEdge, Edge
from lfx.graph.graph.constants import Finish, lazy_load_vertex_dict
Expand Down Expand Up @@ -68,12 +71,27 @@ def __init__(
log_config: LogConfig | None = None,
context: dict[str, Any] | None = None,
) -> None:
"""Initializes a new Graph instance.
"""Create a new Graph instance and initialize its internal execution state.

Parameters:
start (Component | None): Optional start component for the graph; when provided together with `end`
the graph is added and prepared for execution.
end (Component | None): Optional end component for the graph; must be provided together with `start`.
flow_id (str | None): Optional identifier for the flow.
flow_name (str | None): Optional human-readable flow name.
description (str | None): Optional flow description.
user_id (str | None): Optional user identifier used when instantiating components.
log_config (LogConfig | None): Optional logging configuration; if provided, logging is configured.
context (dict[str, Any] | None): Optional execution context; must be a dictionary if provided.

Raises:
TypeError: If `context` is provided and is not a dict.
ValueError: If exactly one of `start` or `end` is provided.

If both start and end components are provided, the graph is initialized and prepared for execution.
If only one is provided, a ValueError is raised. The context must be a dictionary if specified,
otherwise a TypeError is raised. Internal data structures for vertices, edges, state management,
run management, and tracing are set up during initialization.
Notes:
- When both `start` and `end` are provided the graph will be wired and prepared (prepare is called).
- The constructor initializes internal structures used for vertices, edges, run management, caching,
tracing, and snapshotting.
"""
if log_config:
configure(**log_config)
Expand Down Expand Up @@ -144,7 +162,13 @@ def __init__(

@property
def lock(self):
    """Lazily created asyncio lock for this instance.

    The lock is built on first access rather than at construction time so it is
    not bound to whichever event loop happened to be running when the object
    was created.

    Returns:
        asyncio.Lock: The instance's lock, created once and reused afterwards.
    """
    existing = self._lock
    if existing is None:
        existing = asyncio.Lock()
        self._lock = existing
    return existing
Expand Down Expand Up @@ -708,11 +732,15 @@ def define_vertices_lists(self) -> None:
self._is_state_vertices.append(vertex.id)

def _set_inputs(self, input_components: list[str], inputs: dict[str, str], input_type: InputType | None) -> None:
"""Updates input vertices' parameters with the provided inputs, filtering by component list and input type.
"""Update input vertices' raw parameters from the provided inputs, filtering which vertices are updated by a list of component identifiers and an optional input type.

Parameters:
input_components (list[str]): Vertex IDs or display names to target; if empty, all input vertices are considered.
inputs (dict[str, str]): Mapping of input field names to values to set on each matched input vertex.
input_type (InputType | None): If `None` or `"any"`, do not filter by type; otherwise only update vertices whose ID contains this type (case-insensitive).

Only vertices whose IDs or display names match the specified input components and whose IDs contain
the input type (unless input type is 'any' or None) are updated. Raises a ValueError if a specified
vertex is not found.
Raises:
ValueError: If a referenced input vertex cannot be found.
"""
for vertex_id in self._is_input_vertices:
vertex = self.get_vertex(vertex_id)
Expand All @@ -728,6 +756,7 @@ def _set_inputs(self, input_components: list[str], inputs: dict[str, str], input
raise ValueError(msg)
vertex.update_raw_params(inputs, overwrite=True)

@observable
async def _run(
self,
*,
Expand All @@ -740,20 +769,24 @@ async def _run(
fallback_to_env_vars: bool,
event_manager: EventManager | None = None,
) -> list[ResultData | None]:
"""Runs the graph with the given inputs.

Args:
inputs (Dict[str, str]): The input values for the graph.
input_components (list[str]): The components to run for the inputs.
input_type: (Optional[InputType]): The input type.
outputs (list[str]): The outputs to retrieve from the graph.
stream (bool): Whether to stream the results or not.
session_id (str): The session ID for the graph.
fallback_to_env_vars (bool): Whether to fallback to environment variables.
event_manager (EventManager | None): The event manager for the graph.
"""Execute the graph using the provided inputs and return the results collected from output vertices.

Parameters:
inputs (dict[str, str]): Map of input field names to string values used to populate graph inputs.
input_components (list[str]): IDs of components that should receive the inputs; empty list means no specific mapping.
input_type (InputType | None): Optional type hint for the provided inputs.
outputs (list[str]): List of output component IDs or display names to collect results from; empty list collects all output vertices.
stream (bool): If True, streaming outputs may be left as generators; if False, streaming generators will be consumed to produce final results.
session_id (str): Session identifier to attach to vertices that accept session-scoped parameters.
fallback_to_env_vars (bool): If True, allow components to read missing inputs from environment variables where supported.
event_manager (EventManager | None): Optional event manager used during processing for lifecycle or observability hooks.

Returns:
List[Optional["ResultData"]]: The outputs of the graph.
list[ResultData | None]: A list of results corresponding to output vertices; each element is a vertex result (`ResultData`) or `None` if a vertex produced no result.

Raises:
TypeError: If an expected input value is not a string.
ValueError: If provided component lists are invalid, a referenced vertex is missing, or graph processing fails.
"""
if input_components and not isinstance(input_components, list):
msg = f"Invalid components value: {input_components}. Expected list"
Expand Down Expand Up @@ -1399,6 +1432,18 @@ async def astep(
user_id: str | None = None,
event_manager: EventManager | None = None,
):
"""Advance the graph execution by building the next scheduled vertex and update run state.

Parameters:
inputs (InputValueRequest | None): Optional input values for the vertex being built.
files (list[str] | None): Optional list of file paths to provide to the vertex build.
user_id (str | None): Optional identifier of the user initiating the step.
event_manager (EventManager | None): Optional event manager used during vertex build.

Returns:
VertexBuildResult: Result of building the next vertex when a vertex was processed.
Finish: A sentinel `Finish` instance when the run queue is empty and execution is complete.
"""
if not self._prepared:
msg = "Graph not prepared. Call prepare() first."
raise ValueError(msg)
Expand All @@ -1421,6 +1466,11 @@ async def get_cache_func(*args, **kwargs): # noqa: ARG001
return None

async def set_cache_func(*args, **kwargs) -> bool:  # noqa: ARG001
    """Fallback no-op cache writer that ignores every argument.

    Returns:
        bool: Always True, signalling the cache operation was accepted.
    """
    return True

vertex_build_result = await self.build_vertex(
Expand Down Expand Up @@ -2288,11 +2338,61 @@ def build_adjacency_maps(edges: list[CycleEdge]) -> tuple[dict[str, list[str]],
return predecessor_map, successor_map

def __to_dict(self) -> dict[str, dict[str, list[str]]]:
    """Map each vertex ID to the IDs of its successors and predecessors.

    Returns:
        dict[str, dict[str, list[str]]]: For every vertex ID, a dict with:
            - "successors": IDs produced by `get_all_successors` for the vertex.
            - "predecessors": IDs produced by `get_predecessors` for the vertex.
    """
    adjacency: dict = {}
    for vertex in self.vertices:
        adjacency[vertex.id] = {
            "successors": [succ.id for succ in self.get_all_successors(vertex)],
            "predecessors": [pred.id for pred in self.get_predecessors(vertex)],
        }
    return adjacency

def raw_event_metrics(self, optional_fields: dict | None = None) -> dict:
"""Create a timestamped metrics payload merged with any provided fields.

Parameters:
optional_fields (dict | None): Additional key-value metrics to include in the payload.

Returns:
dict: A dictionary containing a "timestamp" key with the current POSIX time in seconds and the merged optional fields.
"""
if optional_fields is None:
optional_fields = {}
import time

return {"timestamp": time.time(), **optional_fields}

def before_callback_event(self, *args, **kwargs) -> RunStartedEvent:  # noqa: ARG002
    """Build the RunStartedEvent emitted when a graph run begins.

    When the instance provides `raw_event_metrics`, its output — including a
    `total_components` count of the graph's vertices — is attached as `raw_event`.

    Returns:
        RunStartedEvent: Event carrying the graph's `_run_id` as `run_id`, its
            `flow_id` as `thread_id`, and any collected metrics as `raw_event`.
    """
    raw_event = (
        self.raw_event_metrics({"total_components": len(self.vertices)})
        if hasattr(self, "raw_event_metrics")
        else {}
    )
    return RunStartedEvent(run_id=self._run_id, thread_id=self.flow_id, raw_event=raw_event)

def after_callback_event(self, result: Any = None, *args, **kwargs) -> RunFinishedEvent:  # noqa: ARG002
    """Build the RunFinishedEvent emitted when a graph run completes.

    Parameters:
        result (Any): Final run result; accepted for hook compatibility but not
            placed on the event (its `result` field is deliberately None).
        *args: Ignored.
        **kwargs: Ignored.

    Returns:
        RunFinishedEvent: Event carrying `run_id`, `thread_id` (the flow_id), a
            `result` of None, and metrics (including `total_components`) as `raw_event`.
    """
    raw_event = (
        self.raw_event_metrics({"total_components": len(self.vertices)})
        if hasattr(self, "raw_event_metrics")
        else {}
    )
    return RunFinishedEvent(run_id=self._run_id, thread_id=self.flow_id, result=None, raw_event=raw_event)
Loading