Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ dependencies = [
"langchain-mcp-adapters>=0.1.14,<0.2.0", # Pin to avoid incompatibility with langchain-core<1.0.0
"agent-lifecycle-toolkit~=0.4.4",
"astrapy>=2.1.0,<3.0.0",
"aioboto3>=15.2.0,<16.0.0"
"aioboto3>=15.2.0,<16.0.0",
"ag-ui-protocol>=0.1.10",
]


Expand Down
1 change: 1 addition & 0 deletions src/lfx/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ dependencies = [
"filelock>=3.20.0",
"pypdf>=5.1.0",
"cryptography>=43.0.0",
"ag-ui-protocol>=0.1.10",
]

[project.scripts]
Expand Down
Empty file.
111 changes: 111 additions & 0 deletions src/lfx/src/lfx/events/observability/lifecycle_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import functools
from collections.abc import Awaitable, Callable
from typing import Any

from ag_ui.encoder.encoder import EventEncoder

from lfx.log.logger import logger

AsyncMethod = Callable[..., Awaitable[Any]]

encoder: EventEncoder = EventEncoder()


def observable(observed_method: AsyncMethod) -> AsyncMethod:
"""Decorator to make an async method observable by emitting lifecycle events.

Decorated classes are expected to implement specific methods to emit AGUI events:
- `before_callback_event(*args, **kwargs)`: Called before the decorated method executes.
It should return a dictionary representing the event payload.
- `after_callback_event(result, *args, **kwargs)`: Called after the decorated method
successfully completes. It should return a dictionary representing the event payload.
The `result` of the decorated method is passed as the first argument.
- `error_callback_event(exception, *args, **kwargs)`: (Optional) Called if the decorated
method raises an exception. It should return a dictionary representing the error event payload.
The `exception` is passed as the first argument.

If these methods are implemented, the decorator will call them to generate event payloads.
If an implementation is missing, the corresponding event publishing will be skipped without error.

Payloads returned by these methods can include custom metrics by placing them
under the 'langflow' key within the 'raw_events' dictionary.

Example:
class MyClass:
display_name = "My Observable Class"

def before_callback_event(self, *args, **kwargs):
return {"event_name": "my_method_started", "data": {"input_args": args}}

async def my_method(self, event_manager: EventManager, data: str):
# ... method logic ...
return "processed_data"

def after_callback_event(self, result, *args, **kwargs):
return {"event_name": "my_method_completed", "data": {"output": result}}

def error_callback_event(self, exception, *args, **kwargs):
return {"event_name": "my_method_failed", "error": str(exception)}

@observable
async def my_observable_method(self, event_manager: EventManager, data: str):
# ... method logic ...
pass
"""

async def check_event_manager(self, **kwargs):
if "event_manager" not in kwargs or kwargs["event_manager"] is None:
await logger.awarning(
f"EventManager not available/provided, skipping observable event publishing "
f"from {self.__class__.__name__}"
)
return False
return True

async def before_callback(self, *args, **kwargs):
if not await check_event_manager(self, **kwargs):
return

if hasattr(self, "before_callback_event"):
event_payload = self.before_callback_event(*args, **kwargs)
event_payload = encoder.encode(event_payload)
# TODO: Publish event per request, would required context based queues
else:
await logger.awarning(
f"before_callback_event not implemented for {self.__class__.__name__}. Skipping event publishing."
)

async def after_callback(self, res: Any | None = None, *args, **kwargs):
if not await check_event_manager(self, **kwargs):
return
if hasattr(self, "after_callback_event"):
event_payload = self.after_callback_event(res, *args, **kwargs)
event_payload = encoder.encode(event_payload)
# TODO: Publish event per request, would required context based queues
else:
await logger.awarning(
f"after_callback_event not implemented for {self.__class__.__name__}. Skipping event publishing."
)

@functools.wraps(observed_method)
async def wrapper(self, *args, **kwargs):
await before_callback(self, *args, **kwargs)
result = None
try:
result = await observed_method(self, *args, **kwargs)
await after_callback(self, result, *args, **kwargs)
except Exception as e:
await logger.aerror(f"Exception in {self.__class__.__name__}: {e}")
if hasattr(self, "error_callback_event"):
try:
event_payload = self.error_callback_event(e, *args, **kwargs)
event_payload = encoder.encode(event_payload)
# TODO: Publish event per request, would required context based queues
except Exception as callback_e: # noqa: BLE001
await logger.aerror(
f"Exception during error_callback_event for {self.__class__.__name__}: {callback_e}"
)
raise
return result

return wrapper
23 changes: 23 additions & 0 deletions src/lfx/src/lfx/graph/graph/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
from itertools import chain
from typing import TYPE_CHECKING, Any, cast

from ag_ui.core import RunFinishedEvent, RunStartedEvent

from lfx.events.observability.lifecycle_events import observable
from lfx.exceptions.component import ComponentBuildError
from lfx.graph.edge.base import CycleEdge, Edge
from lfx.graph.graph.constants import Finish, lazy_load_vertex_dict
Expand Down Expand Up @@ -728,6 +731,7 @@ def _set_inputs(self, input_components: list[str], inputs: dict[str, str], input
raise ValueError(msg)
vertex.update_raw_params(inputs, overwrite=True)

@observable
Copy link
Collaborator Author

@dkaushik94 dkaushik94 Dec 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heads up: I will remove this once the PR is reviewed and everybody has understood what the intended use of the observable decorator is. This shouldn't go to production since it's not necessary to be called until we are ready for our streaming APIs.

async def _run(
self,
*,
Expand Down Expand Up @@ -2309,3 +2313,22 @@ def __to_dict(self) -> dict[str, dict[str, list[str]]]:
predecessors = [i.id for i in self.get_predecessors(vertex)]
result |= {vertex_id: {"successors": sucessors, "predecessors": predecessors}}
return result

def raw_event_metrics(self, optional_fields: dict | None = None) -> dict:
if optional_fields is None:
optional_fields = {}
import time

return {"timestamp": time.time(), **optional_fields}

def before_callback_event(self, *args, **kwargs) -> RunStartedEvent: # noqa: ARG002
metrics = {}
if hasattr(self, "raw_event_metrics"):
metrics = self.raw_event_metrics({"total_components": len(self.vertices)})
return RunStartedEvent(run_id=self._run_id, thread_id=self.flow_id, raw_event=metrics)

def after_callback_event(self, result: Any = None, *args, **kwargs) -> RunFinishedEvent: # noqa: ARG002
metrics = {}
if hasattr(self, "raw_event_metrics"):
metrics = self.raw_event_metrics({"total_components": len(self.vertices)})
return RunFinishedEvent(run_id=self._run_id, thread_id=self.flow_id, result=None, raw_event=metrics)
42 changes: 41 additions & 1 deletion src/lfx/src/lfx/graph/vertex/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
from enum import Enum
from typing import TYPE_CHECKING, Any

from ag_ui.core import StepFinishedEvent, StepStartedEvent

from lfx.events.observability.lifecycle_events import observable
from lfx.exceptions.component import ComponentBuildError
from lfx.graph.schema import INPUT_COMPONENTS, OUTPUT_COMPONENTS, InterfaceComponentTypes, ResultData
from lfx.graph.utils import UnbuiltObject, UnbuiltResult, log_transaction
Expand Down Expand Up @@ -179,6 +182,7 @@ def get_built_result(self):

if isinstance(self.built_result, UnbuiltResult):
return {}

return self.built_result if isinstance(self.built_result, dict) else {"result": self.built_result}

def set_artifacts(self) -> None:
Expand Down Expand Up @@ -380,6 +384,7 @@ def instantiate_component(self, user_id=None) -> None:
vertex=self,
)

@observable
async def _build(
self,
fallback_to_env_vars,
Expand All @@ -389,7 +394,6 @@ async def _build(
"""Initiate the build process."""
await logger.adebug(f"Building {self.display_name}")
await self._build_each_vertex_in_params_dict()

if self.base_type is None:
msg = f"Base type for vertex {self.display_name} not found"
raise ValueError(msg)
Expand Down Expand Up @@ -833,3 +837,39 @@ def apply_on_outputs(self, func: Callable[[Any], Any]) -> None:
return
# Apply the function to each output
[func(output) for output in self.custom_component.get_outputs_map().values()]

# AGUI/AG UI Event Streaming Callbacks/Methods - (Optional, see Observable decorator)
def raw_event_metrics(self, optional_fields: dict | None) -> dict:
"""This method is used to get the metrics of the vertex by the Observable decorator.

If the vertex has a get_metrics method, it will be called, and the metrics will be captured
to stream back to the user in an AGUI compliant format.
Additional fields/metrics to be captured can be modified in this method, or in the callback methods,
which are before_callback_event and after_callback_event before returning the AGUI event.
"""
if optional_fields is None:
optional_fields = {}
import time

return {"timestamp": time.time(), **optional_fields}

def before_callback_event(self, *args, **kwargs) -> StepStartedEvent: # noqa: ARG002
"""Should be a AGUI compatible event.

VERTEX class generates a StepStartedEvent event.
"""
metrics = {}
if hasattr(self, "raw_event_metrics"):
metrics = self.raw_event_metrics({"component_id": self.id})

return StepStartedEvent(step_name=self.display_name, raw_event={"langflow": metrics})

def after_callback_event(self, result, *args, **kwargs) -> StepFinishedEvent: # noqa: ARG002
"""Should be a AGUI compatible event.

VERTEX class generates a StepFinishedEvent event.
"""
metrics = {}
if hasattr(self, "raw_event_metrics"):
metrics = self.raw_event_metrics({"component_id": self.id})
return StepFinishedEvent(step_name=self.display_name, raw_event={"langflow": metrics})
Empty file.
Loading
Loading