From 16895f462cda8250d575fc2c72df1b2761136e56 Mon Sep 17 00:00:00 2001 From: pira998 Date: Sat, 20 Sep 2025 20:47:36 +0000 Subject: [PATCH 1/3] feat:Implement cross-workflow observability feature with context propagation and processors Signed-off-by: pira998 --- .../cross_workflow_tracking/README.md | 282 +++++++++++++ .../cross_workflow_tracking_example.py | 119 ++++++ .../cross_workflow_tracking/example.py | 282 +++++++++++++ .../integrated_example.py | 381 +++++++++++++++++ .../observability_config.yml | 41 ++ .../workflow_integration_example.py | 189 +++++++++ src/nat/builder/context.py | 30 ++ src/nat/builder/workflow.py | 17 +- src/nat/observability/__init__.py | 14 + src/nat/observability/context.py | 210 ++++++++++ .../processor/cross_workflow_processor.py | 165 ++++++++ src/nat/observability/workflow_utils.py | 281 +++++++++++++ src/nat/runtime/runner.py | 11 +- .../test_cross_workflow_observability.py | 387 ++++++++++++++++++ 14 files changed, 2404 insertions(+), 5 deletions(-) create mode 100644 examples/observability/cross_workflow_tracking/README.md create mode 100644 examples/observability/cross_workflow_tracking/cross_workflow_tracking_example.py create mode 100644 examples/observability/cross_workflow_tracking/example.py create mode 100644 examples/observability/cross_workflow_tracking/integrated_example.py create mode 100644 examples/observability/cross_workflow_tracking/observability_config.yml create mode 100644 examples/observability/cross_workflow_tracking/workflow_integration_example.py create mode 100644 src/nat/observability/context.py create mode 100644 src/nat/observability/processor/cross_workflow_processor.py create mode 100644 src/nat/observability/workflow_utils.py create mode 100644 tests/nat/observability/test_cross_workflow_observability.py diff --git a/examples/observability/cross_workflow_tracking/README.md b/examples/observability/cross_workflow_tracking/README.md new file mode 100644 index 0000000000..8c5ad370d0 --- /dev/null +++ b/examples/observability/cross_workflow_tracking/README.md @@ -0,0 +1,282 @@ +# Cross-Workflow Observability Example + +This example demonstrates how to use NeMo Agent Toolkit's cross-workflow observability feature to track and monitor execution across multiple interconnected workflows. This feature provides end-to-end visibility into complex workflow chains and enables comprehensive performance analysis and debugging. + +## Overview + +The cross-workflow observability feature allows you to: + +- **Propagate observability context** across multiple workflow executions +- **Track parent-child relationships** between workflows with hierarchical depth tracking +- **Maintain trace continuity** across workflow boundaries for distributed processing +- **Analyze performance and dependencies** across workflow chains with timing data +- **Debug issues** that span multiple workflows with comprehensive error tracking +- **Serialize and restore context** for distributed systems and microservices +- **Add custom attributes** for enhanced filtering and analysis + +## Key Components + +### 1. ObservabilityContext +Manages trace IDs, span hierarchies, and workflow metadata: + +```python +from nat.observability import ObservabilityContext + +# Create a root context +context = ObservabilityContext.create_root_context("main_workflow") + +# Create child context for sub-workflow +child_context = context.create_child_context("sub_workflow") +``` + +### 2. 
ObservabilityWorkflowInvoker +Utility for invoking workflows with context propagation: + +```python +from nat.observability import ObservabilityWorkflowInvoker + +# Invoke workflow with observability context +result = await ObservabilityWorkflowInvoker.invoke_workflow_with_context( + workflow=my_workflow, + message=input_data, + workflow_name="data_processing", + parent_context=parent_context +) +``` + +### 3. Cross-Workflow Processors +Enhance spans with cross-workflow information: + +```python +from nat.observability import CrossWorkflowProcessor, WorkflowRelationshipProcessor + +# Add to your observability configuration +processors = [ + CrossWorkflowProcessor(), + WorkflowRelationshipProcessor() +] +``` + +## Usage Patterns + +### Pattern 1: Parent-Child Workflows + +```python +async def main_workflow(input_data): + # Create root observability context + context = ObservabilityContext.create_root_context("main_workflow") + + # Process data in child workflow + processed_data = await ObservabilityWorkflowInvoker.invoke_workflow_with_context( + workflow=data_processing_workflow, + message=input_data, + workflow_name="data_processing", + parent_context=context + ) + + # Generate report in another child workflow + report = await ObservabilityWorkflowInvoker.invoke_workflow_with_context( + workflow=report_generation_workflow, + message=processed_data, + workflow_name="report_generation", + parent_context=context + ) + + return report +``` + +### Pattern 2: Sequential Workflow Chain + +```python +async def sequential_processing(data): + context = ObservabilityContext.create_root_context("sequential_processing") + + # Step 1: Validation + validated_data = await ObservabilityWorkflowInvoker.invoke_workflow_with_context( + workflow=validation_workflow, + message=data, + workflow_name="validation", + parent_context=context + ) + + # Step 2: Transformation (child of validation) + current_context = ObservabilityWorkflowInvoker.get_current_observability_context() + transformed_data = await ObservabilityWorkflowInvoker.invoke_workflow_with_context( + workflow=transformation_workflow, + message=validated_data, + workflow_name="transformation", + parent_context=current_context + ) + + # Step 3: Storage + result = await ObservabilityWorkflowInvoker.invoke_workflow_with_context( + workflow=storage_workflow, + message=transformed_data, + workflow_name="storage", + parent_context=current_context + ) + + return result +``` + +### Pattern 3: Distributed Workflow Coordination + +```python +async def distributed_processing(tasks): + context = ObservabilityContext.create_root_context("distributed_processing") + + # Process tasks in parallel, each with its own workflow + results = await asyncio.gather(*[ + ObservabilityWorkflowInvoker.invoke_workflow_with_context( + workflow=task_processing_workflow, + message=task, + workflow_name=f"task_processor_{i}", + parent_context=context + ) + for i, task in enumerate(tasks) + ]) + + # Aggregate results + aggregated = await ObservabilityWorkflowInvoker.invoke_workflow_with_context( + workflow=aggregation_workflow, + message=results, + workflow_name="aggregation", + parent_context=context + ) + + return aggregated +``` + +## Observability Data + +The cross-workflow observability feature adds the following attributes to spans: + +### Trace Information +- `observability.trace_id`: Unique trace identifier across all workflows +- `observability.root_span_id`: Root span of the workflow chain +- `observability.current_span_id`: Current span identifier + +### Workflow Information 
+- `observability.workflow_name`: Name of the current workflow
+- `observability.workflow_id`: Unique identifier for the workflow instance
+- `observability.workflow_depth`: Nesting level of the workflow
+- `observability.workflow_status`: Current status (running, completed, failed)
+
+### Relationship Information
+- `relationship.type`: Type of relationship (root_workflow, child_workflow)
+- `relationship.parent_workflow_name`: Name of parent workflow
+- `relationship.hierarchy_path`: Full path showing workflow chain
+- `relationship.nesting_level`: Depth in the workflow hierarchy
+
+### Timing Information
+- `observability.workflow_start_time`: Workflow start timestamp
+- `observability.workflow_end_time`: Workflow end timestamp
+- `observability.workflow_duration`: Execution duration in seconds
+
+## Configuration
+
+To enable cross-workflow observability in your NAT configuration, add the processors to your tracing exporter (note the `_type` keys, matching the convention used by `observability_config.yml` in this example):
+
+```yaml
+general:
+  telemetry:
+    tracing:
+      main_exporter:
+        _type: file
+        output_path: "traces.jsonl"
+        processors:
+          - _type: cross_workflow_processor
+          - _type: workflow_relationship_processor
+```
+
+## Benefits
+
+1. **End-to-End Visibility**: Track execution flow across multiple workflows
+2. **Performance Analysis**: Identify bottlenecks in workflow chains
+3. **Dependency Mapping**: Understand workflow relationships and dependencies
+4. **Debugging**: Trace issues across workflow boundaries
+5. **Compliance**: Maintain audit trails for complex multi-workflow operations
+
+## Best Practices
+
+1. **Use Meaningful Workflow Names**: Choose descriptive names for better observability
+2. **Propagate Context Consistently**: Always pass observability context between workflows
+3. **Add Custom Attributes**: Include relevant metadata for better filtering and analysis
+4. **Monitor Workflow Depth**: Deeply nested workflows add tracing overhead and produce harder-to-read traces
+5. **Handle Errors Gracefully**: Ensure observability context is maintained during error scenarios
+
+## Example Files
+
+This directory contains several example files demonstrating different aspects of cross-workflow observability:
+
+### 1. `example.py`
+Basic demonstration of cross-workflow observability concepts with mock workflows:
+- Sequential workflow processing with parent-child relationships
+- Parallel workflow execution with shared context
+- Nested workflow processing with multiple hierarchy levels
+- Observability context creation, propagation, and serialization
+
+### 2. `cross_workflow_tracking_example.py`
+Real NAT workflow integration demonstrating:
+- Actual NAT workflow execution with observability context
+- Context setting in the workflow's context state
+- Real-world usage with customer support scenarios
+- Trace ID continuity across multiple queries
+
+### 3. `workflow_integration_example.py`
+Advanced integration patterns showing:
+- Enhanced workflow execution with observability
+- Integration with router agent configurations
+- Workflow-to-workflow observability propagation
+- Error handling with observability context preservation
+
+### 4. `observability_config.yml`
+Sample configuration file showing:
+- Telemetry configuration for cross-workflow tracking
+- File exporter setup with cross-workflow processors
+- LLM and workflow configuration integration
+
+### 5. 
`integrated_example.py` +Comprehensive integration example demonstrating: +- Full NAT workflow integration with observability context propagation +- Sequential and parallel multi-workflow processing patterns +- Real workflow execution with router agent configurations +- Cross-workflow span processing and enhancement +- Context serialization for distributed processing +- Production-ready error handling and status tracking + +## Running the Examples + +### Basic Example +```bash +python example.py +``` +Demonstrates core concepts with mock workflows. + +### Real NAT Integration +```bash +python cross_workflow_tracking_example.py +``` +Shows real NAT workflow execution with observability. + +### Advanced Integration +```bash +python workflow_integration_example.py +``` +Demonstrates advanced patterns with router agent. + +### Comprehensive Integration +```bash +python integrated_example.py +``` +Shows full NAT workflow integration with complete observability features. + +## Example Applications + +- **Data Pipeline**: ETL workflows with validation, transformation, and loading stages +- **Document Processing**: OCR → Analysis → Classification → Storage workflows +- **ML Training Pipeline**: Data Prep → Feature Engineering → Training → Evaluation workflows +- **Customer Service**: Request → Routing → Processing → Response workflows +- **Financial Processing**: Validation → Risk Assessment → Processing → Notification workflows +- **Multi-Agent Systems**: Agent coordination with trace continuity across agent interactions \ No newline at end of file diff --git a/examples/observability/cross_workflow_tracking/cross_workflow_tracking_example.py b/examples/observability/cross_workflow_tracking/cross_workflow_tracking_example.py new file mode 100644 index 0000000000..20f847902c --- /dev/null +++ b/examples/observability/cross_workflow_tracking/cross_workflow_tracking_example.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python3 + +# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +""" +Simple Real Cross-Workflow Observability Example + +Uses actual NAT workflows with cross-workflow observability. +""" + +import asyncio +import os +import tempfile + +from nat.observability.context import ObservabilityContext +from nat.runtime.loader import load_workflow + + +async def create_simple_config(): + """Create a simple workflow config using built-in NAT functions.""" + + config_content = """ +llms: + nvidia_llm: + _type: nim + model_name: meta/llama-3.1-8b-instruct + temperature: 0.7 + max_tokens: 1024 + +workflow: + _type: chat_completion + llm_name: nvidia_llm + system_prompt: "You are a helpful customer support assistant. Provide clear, concise, and helpful responses." 
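+
+# NOTE: the nim LLM type calls an NVIDIA-hosted endpoint and expects an
+# NVIDIA API key, typically provided via the NVIDIA_API_KEY environment variable.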
+""" + + # Create temporary config file + with tempfile.NamedTemporaryFile(mode='w', suffix='.yml', delete=False) as f: + f.write(config_content.strip()) + return f.name + + +async def main(): + """Demonstrate real NAT workflow with cross-workflow observability.""" + + print("=" * 50) + print("Real NAT Workflow + Cross-Workflow Observability") + print("=" * 50) + + # Create config file + config_path = await create_simple_config() + + try: + # Create root observability context + root_context = ObservabilityContext.create_root_context("customer_support_pipeline") + root_context.add_attribute("session_id", "demo_123") + root_context.add_attribute("user", "customer") + + print("\nRoot context created:") + print(f" - Trace ID: {root_context.trace_id}") + print(f" - Workflow: {root_context.workflow_chain[0].workflow_name}") + + # Sample customer queries + queries = [ + "What are your business hours?", "How can I return a product?", "What payment methods do you accept?" + ] + + # Load the workflow + async with load_workflow(config_path) as workflow: + + for i, query in enumerate(queries, 1): + print(f"\n--- Processing Query {i} ---") + print(f"Query: {query}") + + # Create child context for this query + query_context = root_context.create_child_context(f"query_handler_{i}") + query_context.add_attribute("query_type", "customer_inquiry") + query_context.add_attribute("priority", "normal") + + # Set observability context in the workflow's context state + workflow._context_state.observability_context.set(query_context) + + # Execute workflow + async with workflow.run(query) as runner: + result = await runner.result(to_type=str) + + print(f"Response: {result}") + print("Observability Info:") + print(f" - Trace ID: {query_context.trace_id}") + print(f" - Workflow Depth: {query_context.get_workflow_depth()}") + print(f" - Parent Workflow: {query_context.workflow_chain[0].workflow_name}") + print(f" - Current Workflow: {query_context.workflow_chain[-1].workflow_name}") + + # Demonstrate context serialization + print("\n--- Context Serialization ---") + context_data = root_context.to_dict() + print(f"Serialized context has {len(context_data)} keys") + print(f"Workflow chain length: {len(context_data['workflow_chain'])}") + print(f"Custom attributes: {context_data['custom_attributes']}") + + # Restore context + restored_context = ObservabilityContext.from_dict(context_data) + print(f"Restored trace ID matches: {restored_context.trace_id == root_context.trace_id}") + + finally: + # Clean up config file + os.unlink(config_path) + + print(f"\n{'='*50}") + print("✅ Real workflow integration successful!") + print("Key features demonstrated:") + print("- Real NAT workflow execution with observability") + print("- Trace ID continuity across multiple queries") + print("- Parent-child workflow relationships") + print("- Custom attributes and metadata") + print("- Context serialization for distributed systems") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/observability/cross_workflow_tracking/example.py b/examples/observability/cross_workflow_tracking/example.py new file mode 100644 index 0000000000..c9532cdcbb --- /dev/null +++ b/examples/observability/cross_workflow_tracking/example.py @@ -0,0 +1,282 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Cross-Workflow Observability Example + +This example demonstrates how to use NeMo Agent Toolkit's cross-workflow +observability feature to track execution across multiple interconnected workflows. +""" + +import asyncio +import time +from typing import Any +from typing import Dict +from typing import Optional + +from nat.observability.context import ObservabilityContext + + +# Mock functions for the example +async def validate_data(data: Dict[str, Any]) -> Dict[str, Any]: + """Validate input data.""" + print(f"Validating data: {data}") + # Add validation logic here + data["validated"] = True + return data + + +async def transform_data(data: Dict[str, Any]) -> Dict[str, Any]: + """Transform validated data.""" + print(f"Transforming data: {data}") + # Add transformation logic here + data["transformed"] = True + data["values"] = [x * 2 for x in data.get("values", [])] + return data + + +async def store_data(data: Dict[str, Any]) -> Dict[str, Any]: + """Store transformed data.""" + print(f"Storing data: {data}") + # Add storage logic here + data["stored"] = True + return data + + +async def generate_report(data: Dict[str, Any]) -> Dict[str, Any]: + """Generate a report from stored data.""" + print(f"Generating report from: {data}") + # Add report generation logic here + report = { + "report_id": "report_001", + "data_summary": f"Processed {len(data.get('values', []))} values", + "status": "completed", + "source_data": data + } + return report + + +async def simulate_workflow_execution(workflow_name: str, + input_data: Dict[str, Any], + parent_context: Optional[ObservabilityContext] = None) -> Dict[str, Any]: + """ + Simulate a workflow execution with observability context. 
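+
+    If a parent_context is supplied, the simulated run joins the parent's trace
+    as a child workflow; otherwise a new root trace is started.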
+
+    Args:
+        workflow_name: Name of the workflow
+        input_data: Input data for the workflow
+        parent_context: Optional parent observability context
+
+    Returns:
+        Result of the workflow execution
+    """
+    # Create or propagate observability context
+    if parent_context:
+        obs_context = parent_context.create_child_context(workflow_name)
+    else:
+        obs_context = ObservabilityContext.create_root_context(workflow_name)
+
+    # Set timing information
+    current_workflow = obs_context.get_current_workflow()
+    if current_workflow:
+        current_workflow.start_time = time.time()
+        current_workflow.status = "running"
+
+    print(f"🔄 Executing workflow: {workflow_name}")
+    print(f" - Trace ID: {obs_context.trace_id}")
+    print(f" - Workflow depth: {obs_context.get_workflow_depth()}")
+    print(f" - Input: {input_data}")
+
+    # Dispatch to the matching mock step. Substring checks are used so that
+    # suffixed names such as "validation_0" or "storage_2" also match.
+    if "validat" in workflow_name:
+        result = await validate_data(input_data)
+    elif "transform" in workflow_name:
+        result = await transform_data(input_data)
+    elif "stor" in workflow_name:
+        result = await store_data(input_data)
+    elif "report" in workflow_name:
+        result = await generate_report(input_data)
+    else:
+        # Generic processing
+        await asyncio.sleep(0.1)  # Simulate work
+        result = input_data.copy()
+        result[f"{workflow_name}_processed"] = True
+
+    # Update completion status
+    if current_workflow:
+        current_workflow.end_time = time.time()
+        current_workflow.status = "completed"
+        if current_workflow.start_time:
+            duration = current_workflow.end_time - current_workflow.start_time
+            print(f" - Duration: {duration:.3f}s")
+
+    return result
+
+
+async def main():
+    """Main function demonstrating cross-workflow observability."""
+
+    print("=" * 60)
+    print("Cross-Workflow Observability Demonstration")
+    print("=" * 60)
+
+    # Example 1: Sequential Processing with Cross-Workflow Observability
+    print("\n=== Example 1: Sequential Processing ===")
+
+    # Create root observability context
+    root_context = ObservabilityContext.create_root_context("data_processing_pipeline")
+    root_context.add_attribute("pipeline_version", "1.0")
+    root_context.add_attribute("environment", "development")
+
+    # Input data
+    input_data = {"values": [1, 2, 3, 4, 5], "metadata": {"source": "api", "timestamp": "2024-01-01T00:00:00Z"}}
+
+    # Step 1: Validation
+    print("\nStep 1: Data Validation")
+    validated_data = await simulate_workflow_execution("validation", input_data, root_context)
+
+    # Step 2: Transformation (child of validation)
+    print("\nStep 2: Data Transformation")
+    validation_context = root_context.create_child_context("validation")
+    transformed_data = await simulate_workflow_execution("transformation", validated_data, validation_context)
+
+    # Step 3: Storage
+    print("\nStep 3: Data Storage")
+    stored_data = await simulate_workflow_execution("storage", transformed_data, validation_context)
+
+    print(f"\nSequential processing result: {stored_data}")
+
+    # Example 2: Parallel Processing with Shared Context
+    print("\n=== Example 2: Parallel Processing ===")
+
+    # Create a new root context for parallel processing
+    parallel_context = ObservabilityContext.create_root_context("parallel_data_processing")
+    parallel_context.add_attribute("processing_mode", "parallel")
+
+    # Multiple data sets to process in parallel
+    datasets = [{
+        "values": [1, 2, 3], "id": "dataset_1"
+    }, {
+        "values": [4, 5, 6], "id": "dataset_2"
+    }, {
+        "values": [7, 8, 
9], "id": "dataset_3" + }] + + # Process datasets in parallel, each with its own workflow chain + print("\nProcessing datasets in parallel:") + tasks = [] + for i, dataset in enumerate(datasets): + task = process_dataset_chain(dataset, i, parallel_context) + tasks.append(task) + + parallel_results = await asyncio.gather(*tasks) + + # Generate consolidated report + print("\nGenerating consolidated report:") + consolidated_report = await simulate_workflow_execution("consolidated_reporting", { + "results": parallel_results, "processing_mode": "parallel" + }, + parallel_context) + + print(f"\nParallel processing report: {consolidated_report}") + + # Example 3: Nested Workflow Processing + print("\n=== Example 3: Nested Workflow Processing ===") + + # Create context for nested processing + nested_context = ObservabilityContext.create_root_context("nested_processing_pipeline") + nested_context.add_attribute("complexity", "high") + + # Complex nested processing + complex_data = { + "batches": [{ + "values": [10, 20, 30], "batch_id": "batch_1" + }, { + "values": [40, 50, 60], "batch_id": "batch_2" + }] + } + + nested_result = await process_complex_nested_workflow(complex_data, nested_context) + + print(f"\nNested processing result: {nested_result}") + + print("\n=== Observability Summary ===") + print("This example demonstrated:") + print("- observability.trace_id: Unique trace across all workflows") + print("- observability.workflow_chain: Full workflow execution path") + print("- relationship.hierarchy_path: Parent-child workflow relationships") + print("- observability.workflow_depth: Nesting level") + print("- Custom attributes and timing information") + print("\nIn a real NAT implementation, this data would be automatically") + print("captured and sent to observability platforms like Phoenix, Weave, or Langfuse.") + + +async def process_dataset_chain(dataset: Dict[str, Any], dataset_index: int, + parent_context: ObservabilityContext) -> Dict[str, Any]: + """Process a single dataset through the complete workflow chain.""" + + # Create a child context for this dataset processing + dataset_context = parent_context.create_child_context(f"dataset_processor_{dataset_index}") + dataset_context.add_attribute("dataset_id", dataset.get("id", f"dataset_{dataset_index}")) + + print(f"\nProcessing dataset {dataset_index + 1}: {dataset.get('id', f'dataset_{dataset_index}')}") + + # Validation + validated = await simulate_workflow_execution(f"validation_{dataset_index}", dataset, dataset_context) + + # Transformation + transformed = await simulate_workflow_execution(f"transformation_{dataset_index}", validated, dataset_context) + + # Storage + stored = await simulate_workflow_execution(f"storage_{dataset_index}", transformed, dataset_context) + + return stored + + +async def process_complex_nested_workflow(data: Dict[str, Any], parent_context: ObservabilityContext) -> Dict[str, Any]: + """Process complex nested workflow with multiple levels of nesting.""" + + # Create context for batch processing + batch_context = parent_context.create_child_context("batch_processor") + batch_context.add_attribute("total_batches", len(data.get("batches", []))) + + processed_batches = [] + + print(f"\nProcessing {len(data.get('batches', []))} batches in nested workflow:") + + # Process each batch + for i, batch in enumerate(data.get("batches", [])): + # Create context for individual batch + individual_batch_context = batch_context.create_child_context(f"batch_{i}") + individual_batch_context.add_attribute("batch_id", 
batch.get("batch_id")) + + print(f"\nBatch {i + 1}: {batch.get('batch_id')}") + + # Process batch through sub-workflows + batch_result = await process_dataset_chain(batch, i, individual_batch_context) + + processed_batches.append(batch_result) + + # Generate batch summary report + print("\nGenerating batch summary report:") + batch_summary = await simulate_workflow_execution( + "batch_summary_report", { + "batches": processed_batches, "total_count": len(processed_batches) + }, batch_context) + + return batch_summary + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/observability/cross_workflow_tracking/integrated_example.py b/examples/observability/cross_workflow_tracking/integrated_example.py new file mode 100644 index 0000000000..c9004a5cc8 --- /dev/null +++ b/examples/observability/cross_workflow_tracking/integrated_example.py @@ -0,0 +1,381 @@ +#!/usr/bin/env python3 + +# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Integrated Cross-Workflow Observability Example + +This example demonstrates how to integrate cross-workflow observability +with actual NAT workflows, showing real workflow execution with observability +context propagation across multiple workflow stages. +""" + +import asyncio +import logging +import os +import tempfile +from typing import Dict, Any + +from nat.runtime.loader import load_workflow +from nat.observability.context import ObservabilityContext +from nat.builder.context import Context, ContextState +from nat.observability.processor.cross_workflow_processor import CrossWorkflowProcessor, WorkflowRelationshipProcessor +from nat.data_models.span import Span + +logger = logging.getLogger(__name__) + + +async def run_workflow_with_observability( + config_file: str, + input_data: Any, + workflow_name: str, + parent_context: ObservabilityContext | None = None +) -> Any: + """ + Run a NAT workflow with observability context propagation. 
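+
+    The observability context is stored on the workflow's context state before
+    the run, so spans emitted during execution can be picked up by the
+    cross-workflow processors.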
+ + Args: + config_file: Path to the NAT workflow configuration + input_data: Input data for the workflow + workflow_name: Name for observability tracking + parent_context: Optional parent observability context + + Returns: + Result from the workflow execution + """ + # Create or propagate observability context + if parent_context: + obs_context = parent_context.create_child_context(workflow_name) + else: + obs_context = ObservabilityContext.create_root_context(workflow_name) + + # Set timing information + import time + current_workflow = obs_context.get_current_workflow() + if current_workflow: + current_workflow.start_time = time.time() + current_workflow.status = "running" + + print(f"🔄 Executing NAT workflow: {workflow_name}") + print(f" - Config: {os.path.basename(config_file)}") + print(f" - Trace ID: {obs_context.trace_id}") + print(f" - Workflow depth: {obs_context.get_workflow_depth()}") + print(f" - Input: {input_data}") + + try: + # Load and run the actual NAT workflow + async with load_workflow(config_file) as workflow: + # Set observability context in the workflow's context state + context_state = workflow._context_state + context_state.observability_context.set(obs_context) + + # Run the workflow without the observability_context parameter + async with workflow.run(input_data) as runner: + result = await runner.result(to_type=str) + + # Update completion status + if current_workflow: + current_workflow.end_time = time.time() + current_workflow.status = "completed" + if current_workflow.start_time: + duration = current_workflow.end_time - current_workflow.start_time + print(f" - Duration: {duration:.3f}s") + print(f" - Result: {result}") + + return result + + except Exception as e: + # Update error status + if current_workflow: + current_workflow.end_time = time.time() + current_workflow.status = "failed" + current_workflow.tags["error"] = str(e) + print(f" - Error: {e}") + raise + + +async def demonstrate_observability_processors(): + """Demonstrate the cross-workflow processors with real span data.""" + + print("\n" + "=" * 60) + print("Cross-Workflow Span Processing Demonstration") + print("=" * 60) + + # Create a mock observability context with nested workflows + root_context = ObservabilityContext.create_root_context("document_processing_pipeline") + root_context.add_attribute("document_type", "research_paper") + root_context.add_attribute("processing_version", "2.1") + + # Create child contexts to simulate nested workflows + analysis_context = root_context.create_child_context("content_analysis") + analysis_context.add_attribute("analysis_type", "semantic") + + summary_context = analysis_context.create_child_context("summarization") + summary_context.add_attribute("summary_length", "medium") + + # Set the context in the global context state + context_state = ContextState.get() + context_state.observability_context.set(summary_context) + + # Create and process spans with the processors + cross_workflow_processor = CrossWorkflowProcessor() + relationship_processor = WorkflowRelationshipProcessor() + + # Create a test span + test_span = Span( + name="summarization_operation", + attributes={ + "operation_type": "text_summarization", + "model_name": "claude-3", + "input_length": 5000 + } + ) + + print("\nOriginal span attributes:") + for key, value in test_span.attributes.items(): + print(f" - {key}: {value}") + + # Process with cross-workflow processor + processed_span = await cross_workflow_processor.process(test_span) + + # Process with relationship processor + final_span 
= await relationship_processor.process(processed_span) + + print("\nEnhanced span attributes after cross-workflow processing:") + for key, value in sorted(final_span.attributes.items()): + if key.startswith(("observability.", "relationship.")): + print(f" - {key}: {value}") + + print("\nKey observability data added:") + print("✅ Trace ID for cross-workflow correlation") + print("✅ Workflow hierarchy and depth information") + print("✅ Parent-child relationship tracking") + print("✅ Custom attributes from workflow context") + print("✅ Workflow chain serialization") + + +async def create_sample_workflow_configs(): + """Create sample workflow configurations for the demonstration.""" + + # Create temporary directory for configs + temp_dir = tempfile.mkdtemp() + + # Data validation workflow config + validation_config = f""" +llms: + mock_llm: + _type: mock + +functions: + data_validator: + _type: mock_data_validator + +workflow: + _type: data_validator + llm_name: mock_llm +""" + + validation_config_path = os.path.join(temp_dir, "validation_config.yml") + with open(validation_config_path, "w") as f: + f.write(validation_config.strip()) + + # Data processing workflow config + processing_config = f""" +llms: + mock_llm: + _type: mock + +functions: + data_processor: + _type: mock_data_processor + +workflow: + _type: data_processor + llm_name: mock_llm +""" + + processing_config_path = os.path.join(temp_dir, "processing_config.yml") + with open(processing_config_path, "w") as f: + f.write(processing_config.strip()) + + # Analysis workflow config + analysis_config = f""" +llms: + mock_llm: + _type: mock + +functions: + data_analyzer: + _type: mock_data_analyzer + +workflow: + _type: data_analyzer + llm_name: mock_llm +""" + + analysis_config_path = os.path.join(temp_dir, "analysis_config.yml") + with open(analysis_config_path, "w") as f: + f.write(analysis_config.strip()) + + return validation_config_path, processing_config_path, analysis_config_path + + +async def main(): + """Main function demonstrating integrated cross-workflow observability.""" + + print("=" * 70) + print("Integrated Cross-Workflow Observability with NAT Workflows") + print("=" * 70) + + # Note: For this demonstration, we'll use the existing router agent example + # In a real scenario, you would have multiple different workflow configs + + current_dir = os.path.dirname(os.path.abspath(__file__)) + router_config = os.path.join(current_dir, "../../control_flow/router_agent/configs/config.yml") + + # Check if the router config exists + if not os.path.exists(router_config): + print(f"Router agent config not found at: {router_config}") + print("Please ensure the control_flow/router_agent example is available.") + print("\nFor demonstration purposes, we'll show the observability processors:") + await demonstrate_observability_processors() + return + + # Example 1: Sequential Multi-Workflow Processing + print("\n=== Example 1: Sequential Multi-Workflow Processing ===") + + # Create root observability context for the entire pipeline + pipeline_context = ObservabilityContext.create_root_context("intelligent_assistant_pipeline") + pipeline_context.add_attribute("session_id", "demo_session_001") + pipeline_context.add_attribute("user_type", "developer") + pipeline_context.add_attribute("pipeline_version", "1.0") + + # Sample inputs for different workflow stages + user_queries = [ + "What yellow fruit would you recommend?", + "I want a red fruit, what do you suggest?", + "Can you recommend a good book to read?" 
+ ] + + results = [] + + for i, query in enumerate(user_queries): + print(f"\nProcessing query {i+1}: {query}") + + try: + # Stage 1: Query Understanding (using router agent as a proxy) + understanding_result = await run_workflow_with_observability( + config_file=router_config, + input_data=query, + workflow_name=f"query_understanding_{i+1}", + parent_context=pipeline_context + ) + + # Stage 2: Response Enhancement (simulated with another call) + enhancement_context = pipeline_context.create_child_context(f"query_understanding_{i+1}") + enhanced_result = await run_workflow_with_observability( + config_file=router_config, + input_data=f"Please elaborate on: {understanding_result}", + workflow_name=f"response_enhancement_{i+1}", + parent_context=enhancement_context + ) + + results.append({ + "query": query, + "understanding": understanding_result, + "enhanced": enhanced_result + }) + + except Exception as e: + print(f"Error processing query {i+1}: {e}") + results.append({ + "query": query, + "error": str(e) + }) + + # Example 2: Parallel Workflow Processing + print("\n=== Example 2: Parallel Workflow Processing ===") + + # Create a new context for parallel processing + parallel_context = ObservabilityContext.create_root_context("parallel_query_processing") + parallel_context.add_attribute("processing_mode", "parallel") + parallel_context.add_attribute("batch_size", len(user_queries)) + + # Process multiple queries in parallel with observability tracking + print("Processing multiple queries in parallel:") + + async def process_query_with_context(query: str, index: int) -> Dict[str, Any]: + """Process a single query with observability context.""" + try: + result = await run_workflow_with_observability( + config_file=router_config, + input_data=query, + workflow_name=f"parallel_query_{index+1}", + parent_context=parallel_context + ) + return {"query": query, "result": result, "index": index} + except Exception as e: + return {"query": query, "error": str(e), "index": index} + + # Execute parallel tasks + parallel_tasks = [ + process_query_with_context(query, i) + for i, query in enumerate(user_queries) + ] + + parallel_results = await asyncio.gather(*parallel_tasks) + + print(f"\nParallel processing completed. 
Processed {len(parallel_results)} queries.") + + # Example 3: Demonstrate Observability Context Serialization + print("\n=== Example 3: Context Serialization for Distributed Processing ===") + + # Serialize the context for potential distributed processing + context_data = parallel_context.to_dict() + print(f"Serialized context keys: {list(context_data.keys())}") + print(f"Workflow chain length: {len(context_data['workflow_chain'])}") + print(f"Custom attributes: {context_data['custom_attributes']}") + + # Demonstrate restoration + restored_context = ObservabilityContext.from_dict(context_data) + print(f"Restored context trace ID: {restored_context.trace_id}") + print(f"Restored workflow depth: {restored_context.get_workflow_depth()}") + + # Demonstrate processor functionality + await demonstrate_observability_processors() + + # Summary + print("\n" + "=" * 70) + print("Integration Summary") + print("=" * 70) + print("✅ Cross-workflow observability successfully integrated with NAT workflows") + print("✅ Trace continuity maintained across multiple workflow executions") + print("✅ Context propagation working in sequential and parallel scenarios") + print("✅ Span enhancement with cross-workflow metadata demonstrated") + print("✅ Context serialization for distributed systems verified") + print("\nIn a production environment, this observability data would be:") + print("- Automatically captured during all workflow executions") + print("- Sent to observability platforms (Phoenix, Weave, Langfuse, etc.)") + print("- Used for performance monitoring, debugging, and compliance tracking") + print("- Correlated across microservices and distributed workflow components") + + +if __name__ == "__main__": + # Set up logging + logging.basicConfig(level=logging.INFO) + + # Run the integrated example + asyncio.run(main()) \ No newline at end of file diff --git a/examples/observability/cross_workflow_tracking/observability_config.yml b/examples/observability/cross_workflow_tracking/observability_config.yml new file mode 100644 index 0000000000..145618db98 --- /dev/null +++ b/examples/observability/cross_workflow_tracking/observability_config.yml @@ -0,0 +1,41 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Configuration demonstrating cross-workflow observability integration + +general: + telemetry: + tracing: + main_exporter: + _type: file + output_path: "cross_workflow_traces.jsonl" + project: "cross_workflow_demo" + mode: append + enable_rolling: false + # processors: + # - _type: cross_workflow_processor + # - _type: workflow_relationship_processor + +llms: + nim_llm: + _type: nim + model_name: meta/llama-3.1-8b-instruct + temperature: 0.7 + max_tokens: 1024 + +workflow: + _type: chat_completion + llm_name: nim_llm + system_prompt: "You are a helpful assistant that provides concise, friendly responses." 
\ No newline at end of file diff --git a/examples/observability/cross_workflow_tracking/workflow_integration_example.py b/examples/observability/cross_workflow_tracking/workflow_integration_example.py new file mode 100644 index 0000000000..4dcd574a98 --- /dev/null +++ b/examples/observability/cross_workflow_tracking/workflow_integration_example.py @@ -0,0 +1,189 @@ +#!/usr/bin/env python3 + +# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +""" +NAT Workflow Integration with Cross-Workflow Observability + +This example shows how to integrate cross-workflow observability with +real NAT workflows using the enhanced workflow.run() method. +""" + +import asyncio +import logging +import os + +from nat.observability.context import ObservabilityContext +from nat.runtime.loader import load_workflow + +logger = logging.getLogger(__name__) + + +async def enhanced_workflow_execution(): + """Demonstrate enhanced workflow execution with observability.""" + + print("=" * 60) + print("NAT Workflow + Cross-Workflow Observability Integration") + print("=" * 60) + + # Create root observability context + root_context = ObservabilityContext.create_root_context("multi_stage_assistant") + root_context.add_attribute("session_id", "demo_001") + root_context.add_attribute("user", "developer") + + # Path to the router agent config (if available) + current_dir = os.path.dirname(os.path.abspath(__file__)) + router_config = os.path.join(current_dir, "../../control_flow/router_agent/configs/config.yml") + + if not os.path.exists(router_config): + print("Router agent config not found. Creating demo workflow...") + await demo_with_mock_workflow(root_context) + return + + print(f"Using router agent config: {router_config}") + + # Example queries to process + queries = [ + "What yellow fruit would you recommend?", + "Can you suggest a good city to visit?", + "What's a good book to read?" 
+ ] + + try: + # Load the workflow once + async with load_workflow(router_config) as workflow: + + for i, query in enumerate(queries): + print(f"\n--- Query {i+1}: {query} ---") + + # Create child context for this query + query_context = root_context.create_child_context(f"query_processing_{i+1}") + query_context.add_attribute("query_type", "user_request") + query_context.add_attribute("query_index", i + 1) + + # Set observability context in the workflow's context state + workflow._context_state.observability_context.set(query_context) + + # Execute workflow + async with workflow.run(query) as runner: + result = await runner.result(to_type=str) + + print(f"Result: {result}") + + # Demonstrate context information + current_workflow = query_context.get_current_workflow() + if current_workflow: + print(f"Workflow depth: {query_context.get_workflow_depth()}") + print(f"Trace ID: {query_context.trace_id}") + + except Exception as e: + print(f"Error in workflow execution: {e}") + await demo_with_mock_workflow(root_context) + + +async def demo_with_mock_workflow(root_context: ObservabilityContext): + """Demo with mock workflow when router agent is not available.""" + + print("\nDemo with mock workflow execution:") + + # Simulate multiple workflow stages + stages = [("input_validation", "Validating user input"), ("intent_classification", "Classifying user intent"), + ("response_generation", "Generating response"), ("quality_check", "Performing quality check")] + + result = {"input": "What's a good programming language to learn?"} + + for stage_name, description in stages: + print(f"\n🔄 {description}") + + # Create child context for this stage + stage_context = root_context.create_child_context(stage_name) + stage_context.add_attribute("stage_type", "processing") + stage_context.add_attribute("description", description) + + # Simulate processing + await asyncio.sleep(0.2) + + # Update result + result[stage_name] = f"Completed {stage_name}" + + print(f" - Stage: {stage_name}") + print(f" - Trace ID: {stage_context.trace_id}") + print(f" - Depth: {stage_context.get_workflow_depth()}") + print(f" - Parent: {stage_context.workflow_chain[-1].parent_workflow_id}") + + print(f"\nFinal result: {result}") + + +async def demonstrate_workflow_with_steps(): + """Demonstrate workflow execution with intermediate steps and observability.""" + + print("\n" + "=" * 60) + print("Workflow with Steps + Observability") + print("=" * 60) + + current_dir = os.path.dirname(os.path.abspath(__file__)) + config_path = os.path.join(current_dir, "observability_config.yml") + + # Create observability context + steps_context = ObservabilityContext.create_root_context("workflow_with_steps") + steps_context.add_attribute("execution_mode", "with_steps") + steps_context.add_attribute("tracking_enabled", True) + + try: + # Load workflow with observability config + async with load_workflow(config_path) as workflow: + # Set observability context in the workflow's context state + workflow._context_state.observability_context.set(steps_context) + + # Execute workflow (SessionManager doesn't have result_with_steps method) + async with workflow.run("What programming language should I learn?") as runner: + result = await runner.result(to_type=str) + steps = [] # SessionManager doesn't provide intermediate steps + + print(f"Result: {result}") + print(f"Number of intermediate steps: {len(steps) if steps else 0}") + + # Show observability context info + print(f"Trace ID: {steps_context.trace_id}") + print(f"Workflow chain: {[w.workflow_name 
for w in steps_context.workflow_chain]}") + + except FileNotFoundError: + print(f"Config file not found: {config_path}") + print("Using mock demonstration...") + + # Mock the workflow execution with steps + await demo_with_mock_workflow(steps_context) + + +async def main(): + """Main function demonstrating the integration.""" + + # Set up logging + logging.basicConfig(level=logging.INFO) + + # Example 1: Enhanced workflow execution + await enhanced_workflow_execution() + + # Example 2: Workflow with steps + await demonstrate_workflow_with_steps() + + print("\n" + "=" * 60) + print("Integration Complete!") + print("=" * 60) + print("\nKey Features Demonstrated:") + print("✅ Observability context integration with workflow execution") + print("✅ Observability context propagation through workflow execution") + print("✅ Automatic context propagation through workflow execution") + print("✅ Trace ID continuity across workflow stages") + print("✅ Custom attributes and metadata tracking") + print("✅ Workflow hierarchy and depth tracking") + + print("\nNext Steps:") + print("1. Add cross-workflow processors to your telemetry configuration") + print("2. Configure observability exporters (Phoenix, Weave, Langfuse)") + print("3. Use ObservabilityWorkflowInvoker for workflow-to-workflow calls") + print("4. Monitor traces in your observability platform") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/nat/builder/context.py b/src/nat/builder/context.py index 072d86f11b..ba750e843e 100644 --- a/src/nat/builder/context.py +++ b/src/nat/builder/context.py @@ -33,6 +33,7 @@ from nat.data_models.intermediate_step import StreamEventData from nat.data_models.intermediate_step import TraceMetadata from nat.data_models.invocation_node import InvocationNode +from nat.observability.context import ObservabilityContext from nat.runtime.user_metadata import RequestAttributes from nat.utils.reactive.subject import Subject @@ -73,6 +74,8 @@ def __init__(self): self._event_stream: ContextVar[Subject[IntermediateStep] | None] = ContextVar("event_stream", default=None) self._active_function: ContextVar[InvocationNode | None] = ContextVar("active_function", default=None) self._active_span_id_stack: ContextVar[list[str] | None] = ContextVar("active_span_id_stack", default=None) + self._observability_context: ContextVar[ObservabilityContext | None] = ContextVar("observability_context", + default=None) # Default is a lambda no-op which returns NoneType self.user_input_callback: ContextVar[Callable[[InteractionPrompt], Awaitable[HumanResponse | None]] @@ -107,6 +110,10 @@ def active_span_id_stack(self) -> ContextVar[list[str]]: self._active_span_id_stack.set(["root"]) return typing.cast(ContextVar[list[str]], self._active_span_id_stack) + @property + def observability_context(self) -> ContextVar[ObservabilityContext | None]: + return self._observability_context + @staticmethod def get() -> "ContextState": return ContextState() @@ -285,6 +292,29 @@ def user_auth_callback(self) -> Callable[[AuthProviderBaseConfig, AuthFlowType], raise RuntimeError("User authentication callback is not set in the context.") return callback + @property + def observability_context(self) -> ObservabilityContext | None: + """ + Retrieves the current observability context from the context state. + + This property provides access to the observability context which contains + trace IDs, span hierarchies, and workflow metadata for cross-workflow + observability tracking. 
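+
+        The value is held in a ContextVar, so it is isolated per async task.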
+ + Returns: + ObservabilityContext | None: The current observability context or None if not set. + """ + return self._context_state.observability_context.get() + + def set_observability_context(self, context: ObservabilityContext | None) -> None: + """ + Sets the observability context in the context state. + + Args: + context: The observability context to set, or None to clear. + """ + self._context_state.observability_context.set(context) + @staticmethod def get() -> "Context": """ diff --git a/src/nat/builder/workflow.py b/src/nat/builder/workflow.py index 63b10cc289..b0f1c64fb7 100644 --- a/src/nat/builder/workflow.py +++ b/src/nat/builder/workflow.py @@ -31,6 +31,7 @@ from nat.experimental.test_time_compute.models.strategy_base import StrategyBase from nat.memory.interfaces import MemoryEditor from nat.object_store.interfaces import ObjectStore +from nat.observability.context import ObservabilityContext from nat.observability.exporter.base_exporter import BaseExporter from nat.observability.exporter_manager import ExporterManager from nat.runtime.runner import Runner @@ -94,23 +95,31 @@ def exporter_manager(self) -> ExporterManager: return self._exporter_manager.get() @asynccontextmanager - async def run(self, message: InputT): + async def run(self, message: InputT, observability_context: ObservabilityContext | None = None): """ Called each time we start a new workflow run. We'll create a new top-level workflow span here. + + Args: + message: The input message for the workflow + observability_context: Optional observability context for cross-workflow tracking """ async with Runner(input_message=message, entry_fn=self._entry_fn, context_state=self._context_state, - exporter_manager=self.exporter_manager) as runner: + exporter_manager=self.exporter_manager, + observability_context=observability_context) as runner: # The caller can `yield runner` so they can do `runner.result()` or `runner.result_stream()` yield runner - async def result_with_steps(self, message: InputT, to_type: type | None = None): + async def result_with_steps(self, + message: InputT, + to_type: type | None = None, + observability_context: ObservabilityContext | None = None): - async with self.run(message) as runner: + async with self.run(message, observability_context=observability_context) as runner: from nat.eval.runtime_event_subscriber import pull_intermediate diff --git a/src/nat/observability/__init__.py b/src/nat/observability/__init__.py index a1744724ec..3b60d2e446 100644 --- a/src/nat/observability/__init__.py +++ b/src/nat/observability/__init__.py @@ -12,3 +12,17 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+ +from nat.observability.context import ObservabilityContext +from nat.observability.context import ObservabilityContextManager +from nat.observability.processor.cross_workflow_processor import CrossWorkflowProcessor +from nat.observability.processor.cross_workflow_processor import WorkflowRelationshipProcessor +from nat.observability.workflow_utils import ObservabilityWorkflowInvoker + +__all__ = [ + "ObservabilityContext", + "ObservabilityContextManager", + "ObservabilityWorkflowInvoker", + "CrossWorkflowProcessor", + "WorkflowRelationshipProcessor" +] diff --git a/src/nat/observability/context.py b/src/nat/observability/context.py new file mode 100644 index 0000000000..144eecf4a5 --- /dev/null +++ b/src/nat/observability/context.py @@ -0,0 +1,210 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import uuid +from contextvars import ContextVar +from dataclasses import dataclass +from dataclasses import field +from typing import Any +from typing import Dict +from typing import List +from typing import Optional + + +@dataclass +class WorkflowMetadata: + """Metadata for a workflow execution.""" + workflow_name: str + workflow_id: str + start_time: Optional[float] = None + end_time: Optional[float] = None + status: str = "running" + parent_workflow_id: Optional[str] = None + tags: Dict[str, Any] = field(default_factory=dict) + + +@dataclass +class ObservabilityContext: + """ + Context for propagating observability information across workflow executions. + + This class manages trace IDs, span hierarchies, and workflow metadata to enable + cross-workflow observability tracking. 
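+
+    A minimal usage sketch::
+
+        root = ObservabilityContext.create_root_context("pipeline")
+        child = root.create_child_context("stage_1")
+        assert child.trace_id == root.trace_id
+        assert child.get_workflow_depth() == 2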
+ """ + + trace_id: str + root_span_id: str + current_span_id: str + workflow_chain: List[WorkflowMetadata] = field(default_factory=list) + custom_attributes: Dict[str, Any] = field(default_factory=dict) + + @classmethod + def create_root_context(cls, + workflow_name: str = "root", + trace_id: Optional[str] = None, + root_span_id: Optional[str] = None) -> "ObservabilityContext": + """Create a new root observability context.""" + trace_id = trace_id or str(uuid.uuid4()) + root_span_id = root_span_id or str(uuid.uuid4()) + + root_workflow = WorkflowMetadata(workflow_name=workflow_name, workflow_id=root_span_id) + + return cls(trace_id=trace_id, + root_span_id=root_span_id, + current_span_id=root_span_id, + workflow_chain=[root_workflow]) + + def create_child_context(self, workflow_name: str, workflow_id: Optional[str] = None) -> "ObservabilityContext": + """Create a child observability context for a sub-workflow.""" + workflow_id = workflow_id or str(uuid.uuid4()) + + child_workflow = WorkflowMetadata(workflow_name=workflow_name, + workflow_id=workflow_id, + parent_workflow_id=self.current_span_id) + + # Create new context with extended workflow chain + new_chain = self.workflow_chain + [child_workflow] + + return ObservabilityContext(trace_id=self.trace_id, + root_span_id=self.root_span_id, + current_span_id=workflow_id, + workflow_chain=new_chain, + custom_attributes=self.custom_attributes.copy()) + + def create_span_context(self, span_id: str) -> "ObservabilityContext": + """Create a new context with a different current span ID.""" + return ObservabilityContext(trace_id=self.trace_id, + root_span_id=self.root_span_id, + current_span_id=span_id, + workflow_chain=self.workflow_chain.copy(), + custom_attributes=self.custom_attributes.copy()) + + def add_attribute(self, key: str, value: Any) -> None: + """Add a custom attribute to the observability context.""" + self.custom_attributes[key] = value + + def get_current_workflow(self) -> Optional[WorkflowMetadata]: + """Get the currently executing workflow metadata.""" + return self.workflow_chain[-1] if self.workflow_chain else None + + def get_workflow_depth(self) -> int: + """Get the depth of workflow nesting.""" + return len(self.workflow_chain) + + def to_dict(self) -> Dict[str, Any]: + """Convert the observability context to a dictionary for serialization.""" + return { + "trace_id": self.trace_id, + "root_span_id": self.root_span_id, + "current_span_id": self.current_span_id, + "workflow_chain": [{ + "workflow_name": w.workflow_name, + "workflow_id": w.workflow_id, + "start_time": w.start_time, + "end_time": w.end_time, + "status": w.status, + "parent_workflow_id": w.parent_workflow_id, + "tags": w.tags + } for w in self.workflow_chain], + "custom_attributes": self.custom_attributes + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "ObservabilityContext": + """Create an observability context from a dictionary.""" + workflow_chain = [ + WorkflowMetadata(workflow_name=w["workflow_name"], + workflow_id=w["workflow_id"], + start_time=w.get("start_time"), + end_time=w.get("end_time"), + status=w.get("status", "running"), + parent_workflow_id=w.get("parent_workflow_id"), + tags=w.get("tags", {})) for w in data.get("workflow_chain", []) + ] + + return cls(trace_id=data["trace_id"], + root_span_id=data["root_span_id"], + current_span_id=data["current_span_id"], + workflow_chain=workflow_chain, + custom_attributes=data.get("custom_attributes", {})) + + +class ObservabilityContextManager: + """ + Manager for observability context 
state using ContextVars. + + This class integrates with NAT's existing ContextState system to provide + thread-safe observability context propagation. + """ + + _observability_context: ContextVar[Optional[ObservabilityContext]] = ContextVar("observability_context", + default=None) + + @classmethod + def get_current_context(cls) -> Optional[ObservabilityContext]: + """Get the current observability context.""" + return cls._observability_context.get() + + @classmethod + def set_context(cls, context: ObservabilityContext) -> None: + """Set the current observability context.""" + cls._observability_context.set(context) + + @classmethod + def create_root_context(cls, workflow_name: str = "root", trace_id: Optional[str] = None) -> ObservabilityContext: + """Create and set a new root observability context.""" + context = ObservabilityContext.create_root_context(workflow_name=workflow_name, trace_id=trace_id) + cls.set_context(context) + return context + + @classmethod + def create_child_context(cls, workflow_name: str) -> Optional[ObservabilityContext]: + """Create a child context from the current context.""" + current = cls.get_current_context() + if current is None: + return None + + child_context = current.create_child_context(workflow_name) + cls.set_context(child_context) + return child_context + + @classmethod + def propagate_context(cls, context: ObservabilityContext) -> None: + """Propagate an existing observability context.""" + cls.set_context(context) + + @classmethod + def clear_context(cls) -> None: + """Clear the current observability context.""" + cls._observability_context.set(None) + + @classmethod + def get_trace_id(cls) -> Optional[str]: + """Get the current trace ID.""" + context = cls.get_current_context() + return context.trace_id if context else None + + @classmethod + def get_current_span_id(cls) -> Optional[str]: + """Get the current span ID.""" + context = cls.get_current_context() + return context.current_span_id if context else None + + @classmethod + def add_workflow_attribute(cls, key: str, value: Any) -> None: + """Add an attribute to the current observability context.""" + context = cls.get_current_context() + if context: + context.add_attribute(key, value) diff --git a/src/nat/observability/processor/cross_workflow_processor.py b/src/nat/observability/processor/cross_workflow_processor.py new file mode 100644 index 0000000000..5a7f7c13bf --- /dev/null +++ b/src/nat/observability/processor/cross_workflow_processor.py @@ -0,0 +1,165 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from nat.data_models.span import Span +from nat.observability.processor.processor import Processor +from nat.utils.type_utils import override + + +class CrossWorkflowProcessor(Processor[Span, Span]): + """ + Processor that enriches spans with cross-workflow observability information. 
+
+    This processor adds information about workflow chains, trace relationships,
+    and cross-workflow attributes to spans to enable better observability
+    across multiple workflow executions.
+    """
+
+    @override
+    async def process(self, item: Span) -> Span:
+        """
+        Process a span by adding cross-workflow observability information.
+
+        Args:
+            item: The original span
+
+        Returns:
+            Enhanced span with cross-workflow information
+        """
+        try:
+            # Get current observability context
+            # Import here to avoid circular import
+            from nat.builder.context import Context
+            context = Context.get()
+            obs_context = context.observability_context
+
+            if obs_context:
+                # Add cross-workflow attributes to the span
+                # Add trace information
+                item.set_attribute("observability.trace_id", obs_context.trace_id)
+                item.set_attribute("observability.root_span_id", obs_context.root_span_id)
+                item.set_attribute("observability.current_span_id", obs_context.current_span_id)
+
+                # Add workflow chain information
+                workflow_chain = obs_context.workflow_chain
+                if workflow_chain:
+                    item.set_attribute("observability.workflow_depth", len(workflow_chain))
+
+                    # Add current workflow information
+                    current_workflow = workflow_chain[-1]
+                    item.set_attribute("observability.workflow_name", current_workflow.workflow_name)
+                    item.set_attribute("observability.workflow_id", current_workflow.workflow_id)
+                    item.set_attribute("observability.workflow_status", current_workflow.status)
+
+                    if current_workflow.parent_workflow_id:
+                        item.set_attribute("observability.parent_workflow_id", current_workflow.parent_workflow_id)
+
+                    # Add timing information if available (duration needs both endpoints)
+                    if current_workflow.start_time:
+                        item.set_attribute("observability.workflow_start_time", current_workflow.start_time)
+                    if current_workflow.end_time:
+                        item.set_attribute("observability.workflow_end_time", current_workflow.end_time)
+                    if current_workflow.start_time and current_workflow.end_time:
+                        item.set_attribute("observability.workflow_duration",
+                                           current_workflow.end_time - current_workflow.start_time)
+
+                    # Add workflow tags
+                    for key, value in current_workflow.tags.items():
+                        item.set_attribute(f"observability.workflow_tag.{key}", value)
+
+                # Add workflow chain as a serialized attribute
+                workflow_chain_info = []
+                for i, workflow in enumerate(workflow_chain):
+                    workflow_info = {
+                        "position": i,
+                        "name": workflow.workflow_name,
+                        "id": workflow.workflow_id,
+                        "status": workflow.status
+                    }
+                    if workflow.parent_workflow_id:
+                        workflow_info["parent_id"] = workflow.parent_workflow_id
+                    workflow_chain_info.append(workflow_info)
+
+                item.set_attribute("observability.workflow_chain", str(workflow_chain_info))
+
+                # Add custom attributes from observability context
+                for key, value in obs_context.custom_attributes.items():
+                    item.set_attribute(f"observability.custom.{key}", value)
+
+        except Exception as e:
+            # If there's any error in processing, log it but don't fail the span
+            item.set_attribute("observability.processing_error", str(e))
+
+        return item
+
+
+class WorkflowRelationshipProcessor(Processor[Span, Span]):
+    """
+    Processor that adds relationship information between workflows.
+
+    This processor analyzes the observability context to add explicit
+    parent-child relationships and workflow hierarchy information.
+    """
+
+    @override
+    async def process(self, item: Span) -> Span:
+        """
+        Process a span by adding workflow relationship information.
+ + Args: + item: The original span + + Returns: + Enhanced span with relationship information + """ + try: + # Get current observability context + # Import here to avoid circular import + from nat.builder.context import Context + context = Context.get() + obs_context = context.observability_context + + if obs_context and obs_context.workflow_chain: + workflow_chain = obs_context.workflow_chain + + # Add relationship information + if len(workflow_chain) > 1: + # This is a child workflow + parent_workflow = workflow_chain[-2] + current_workflow = workflow_chain[-1] + + item.set_attribute("relationship.type", "child_workflow") + item.set_attribute("relationship.parent_workflow_name", parent_workflow.workflow_name) + item.set_attribute("relationship.parent_workflow_id", parent_workflow.workflow_id) + item.set_attribute("relationship.child_workflow_name", current_workflow.workflow_name) + item.set_attribute("relationship.child_workflow_id", current_workflow.workflow_id) + + # Add hierarchy path + hierarchy_path = " -> ".join([w.workflow_name for w in workflow_chain]) + item.set_attribute("relationship.hierarchy_path", hierarchy_path) + + # Add depth information + item.set_attribute("relationship.nesting_level", len(workflow_chain) - 1) + + else: + # This is a root workflow + item.set_attribute("relationship.type", "root_workflow") + item.set_attribute("relationship.nesting_level", 0) + + except Exception as e: + # If there's any error in processing, log it but don't fail the span + item.set_attribute("relationship.processing_error", str(e)) + + return item diff --git a/src/nat/observability/workflow_utils.py b/src/nat/observability/workflow_utils.py new file mode 100644 index 0000000000..327c511d7f --- /dev/null +++ b/src/nat/observability/workflow_utils.py @@ -0,0 +1,281 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +from typing import TYPE_CHECKING +from typing import Any +from typing import AsyncGenerator +from typing import Optional +from typing import TypeVar + +from nat.observability.context import ObservabilityContext + +if TYPE_CHECKING: + from nat.builder.workflow import Workflow + +InputT = TypeVar("InputT") +SingleOutputT = TypeVar("SingleOutputT") +StreamingOutputT = TypeVar("StreamingOutputT") + + +class ObservabilityWorkflowInvoker: + """ + Utility class for invoking workflows with observability context propagation. + + This class provides helper methods to invoke workflows while properly + propagating and managing observability context across workflow boundaries. + """ + + @staticmethod + async def invoke_workflow_with_context(workflow: "Workflow[InputT, StreamingOutputT, SingleOutputT]", + message: InputT, + workflow_name: str, + parent_context: Optional[ObservabilityContext] = None, + to_type: Optional[type] = None) -> SingleOutputT: + """ + Invoke a workflow with observability context propagation. 
+ + Args: + workflow: The workflow to invoke + message: The input message for the workflow + workflow_name: The name of the workflow for observability tracking + parent_context: Optional parent observability context + to_type: Optional type to convert the result to + + Returns: + The result of the workflow execution + """ + # Create or propagate observability context + obs_context = None + if parent_context: + obs_context = parent_context.create_child_context(workflow_name) + else: + # Check if there's a context in the current execution + try: + # Import here to avoid circular import + from nat.builder.context import Context + current_context = Context.get() + existing_obs_context = current_context.observability_context + if existing_obs_context: + obs_context = existing_obs_context.create_child_context(workflow_name) + else: + obs_context = ObservabilityContext.create_root_context(workflow_name) + except Exception: + obs_context = ObservabilityContext.create_root_context(workflow_name) + + # Update workflow metadata with timing + if obs_context: + current_workflow = obs_context.get_current_workflow() + if current_workflow: + current_workflow.start_time = time.time() + + try: + async with workflow.run(message, observability_context=obs_context) as runner: + result = await runner.result(to_type=to_type) + + # Update workflow metadata on completion + if obs_context: + current_workflow = obs_context.get_current_workflow() + if current_workflow: + current_workflow.end_time = time.time() + current_workflow.status = "completed" + + return result + + except Exception as e: + # Update workflow metadata on failure + if obs_context: + current_workflow = obs_context.get_current_workflow() + if current_workflow: + current_workflow.end_time = time.time() + current_workflow.status = "failed" + current_workflow.tags["error"] = str(e) + raise + + @staticmethod + async def invoke_workflow_stream_with_context( + workflow: "Workflow[InputT, StreamingOutputT, SingleOutputT]", + message: InputT, + workflow_name: str, + parent_context: Optional[ObservabilityContext] = None, + to_type: Optional[type] = None) -> AsyncGenerator[StreamingOutputT, None]: + """ + Invoke a workflow with streaming output and observability context propagation. 
+ + Args: + workflow: The workflow to invoke + message: The input message for the workflow + workflow_name: The name of the workflow for observability tracking + parent_context: Optional parent observability context + to_type: Optional type to convert the result to + + Yields: + The streaming output from the workflow + """ + # Create or propagate observability context + obs_context = None + if parent_context: + obs_context = parent_context.create_child_context(workflow_name) + else: + # Check if there's a context in the current execution + try: + # Import here to avoid circular import + from nat.builder.context import Context + current_context = Context.get() + existing_obs_context = current_context.observability_context + if existing_obs_context: + obs_context = existing_obs_context.create_child_context(workflow_name) + else: + obs_context = ObservabilityContext.create_root_context(workflow_name) + except Exception: + obs_context = ObservabilityContext.create_root_context(workflow_name) + + # Update workflow metadata with timing + if obs_context: + current_workflow = obs_context.get_current_workflow() + if current_workflow: + current_workflow.start_time = time.time() + + try: + async with workflow.run(message, observability_context=obs_context) as runner: + async for item in runner.result_stream(to_type=to_type): + yield item + + # Update workflow metadata on completion + if obs_context: + current_workflow = obs_context.get_current_workflow() + if current_workflow: + current_workflow.end_time = time.time() + current_workflow.status = "completed" + + except Exception as e: + # Update workflow metadata on failure + if obs_context: + current_workflow = obs_context.get_current_workflow() + if current_workflow: + current_workflow.end_time = time.time() + current_workflow.status = "failed" + current_workflow.tags["error"] = str(e) + raise + + @staticmethod + def get_current_observability_context() -> Optional[ObservabilityContext]: + """ + Get the current observability context from the execution context. + + Returns: + The current observability context if available, None otherwise + """ + try: + # Import here to avoid circular import + from nat.builder.context import Context + current_context = Context.get() + return current_context.observability_context + except Exception: + return None + + @staticmethod + def create_observability_context(workflow_name: str, trace_id: Optional[str] = None) -> ObservabilityContext: + """ + Create a new observability context and set it in the current execution context. + + Args: + workflow_name: The name of the root workflow + trace_id: Optional trace ID to use + + Returns: + The created observability context + """ + context = ObservabilityContext.create_root_context(workflow_name, trace_id) + + # Set it in the current context if available + try: + # Import here to avoid circular import + from nat.builder.context import Context + current_context = Context.get() + current_context.set_observability_context(context) + except Exception: + pass # Context might not be available + + return context + + @staticmethod + async def invoke_with_steps_and_context(workflow: "Workflow[InputT, StreamingOutputT, SingleOutputT]", + message: InputT, + workflow_name: str, + parent_context: Optional[ObservabilityContext] = None, + to_type: Optional[type] = None) -> tuple[SingleOutputT, Any]: + """ + Invoke a workflow with intermediate steps and observability context propagation. 
+ + Args: + workflow: The workflow to invoke + message: The input message for the workflow + workflow_name: The name of the workflow for observability tracking + parent_context: Optional parent observability context + to_type: Optional type to convert the result to + + Returns: + Tuple of (result, intermediate_steps) + """ + # Create or propagate observability context + obs_context = None + if parent_context: + obs_context = parent_context.create_child_context(workflow_name) + else: + # Check if there's a context in the current execution + try: + # Import here to avoid circular import + from nat.builder.context import Context + current_context = Context.get() + existing_obs_context = current_context.observability_context + if existing_obs_context: + obs_context = existing_obs_context.create_child_context(workflow_name) + else: + obs_context = ObservabilityContext.create_root_context(workflow_name) + except Exception: + obs_context = ObservabilityContext.create_root_context(workflow_name) + + # Update workflow metadata with timing + if obs_context: + current_workflow = obs_context.get_current_workflow() + if current_workflow: + current_workflow.start_time = time.time() + + try: + result, steps = await workflow.result_with_steps( + message, + to_type=to_type, + observability_context=obs_context + ) + + # Update workflow metadata on completion + if obs_context: + current_workflow = obs_context.get_current_workflow() + if current_workflow: + current_workflow.end_time = time.time() + current_workflow.status = "completed" + + return result, steps + + except Exception as e: + # Update workflow metadata on failure + if obs_context: + current_workflow = obs_context.get_current_workflow() + if current_workflow: + current_workflow.end_time = time.time() + current_workflow.status = "failed" + current_workflow.tags["error"] = str(e) + raise diff --git a/src/nat/runtime/runner.py b/src/nat/runtime/runner.py index ad93f984c3..863ba8446c 100644 --- a/src/nat/runtime/runner.py +++ b/src/nat/runtime/runner.py @@ -21,6 +21,7 @@ from nat.builder.context import ContextState from nat.builder.function import Function from nat.data_models.invocation_node import InvocationNode +from nat.observability.context import ObservabilityContext from nat.observability.exporter_manager import ExporterManager from nat.utils.reactive.subject import Subject @@ -48,7 +49,8 @@ def __init__(self, input_message: typing.Any, entry_fn: Function, context_state: ContextState, - exporter_manager: ExporterManager): + exporter_manager: ExporterManager, + observability_context: ObservabilityContext | None = None): """ The Runner class is used to run a workflow. It handles converting input and output data types and running the workflow with the specified concurrency. 
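
The hunk above threads a new optional `observability_context` argument into `Runner.__init__`; the following hunk documents it and has `__aenter__` publish it on the `ContextState`, which is where `CrossWorkflowProcessor` later reads it back via `Context.get()`. As a minimal sketch of the resulting call path (the `run_with_tracking` helper is illustrative; `Workflow.run()` accepting `observability_context` is what this patch series adds):

```python
from nat.builder.workflow import Workflow
from nat.observability.context import ObservabilityContext


async def run_with_tracking(workflow: Workflow, message: str) -> str:
    # Root context for this invocation; trace and span IDs are generated UUIDs.
    obs_context = ObservabilityContext.create_root_context("support_assistant")

    # The keyword is forwarded from Workflow.run() into Runner.__init__ below;
    # Runner.__aenter__ then sets it on the ContextState for span processors.
    async with workflow.run(message, observability_context=obs_context) as runner:
        return await runner.result(to_type=str)
```
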
@@ -63,6 +65,8 @@ def __init__(self, The context state to use exporter_manager : ExporterManager The exporter manager to use + observability_context : ObservabilityContext | None + Optional observability context for cross-workflow tracking """ if (entry_fn is None): @@ -80,6 +84,7 @@ def __init__(self, self._input_message = input_message self._exporter_manager = exporter_manager + self._observability_context = observability_context @property def context(self) -> Context: @@ -100,6 +105,10 @@ async def __aenter__(self): function_id="root", )) + # Set observability context if provided + if self._observability_context: + self._context_state.observability_context.set(self._observability_context) + if (self._state == RunnerState.UNINITIALIZED): self._state = RunnerState.INITIALIZED else: diff --git a/tests/nat/observability/test_cross_workflow_observability.py b/tests/nat/observability/test_cross_workflow_observability.py new file mode 100644 index 0000000000..ba072b9601 --- /dev/null +++ b/tests/nat/observability/test_cross_workflow_observability.py @@ -0,0 +1,387 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import pytest +import uuid +from unittest.mock import Mock, AsyncMock + +from nat.data_models.span import Span +from nat.observability.context import ObservabilityContext, ObservabilityContextManager +from nat.observability.workflow_utils import ObservabilityWorkflowInvoker +from nat.observability.processor.cross_workflow_processor import CrossWorkflowProcessor, WorkflowRelationshipProcessor + + +class TestObservabilityContext: + """Test the ObservabilityContext class.""" + + def test_create_root_context(self): + """Test creating a root observability context.""" + context = ObservabilityContext.create_root_context("test_workflow") + + assert context.trace_id is not None + assert context.root_span_id is not None + assert context.current_span_id == context.root_span_id + assert len(context.workflow_chain) == 1 + assert context.workflow_chain[0].workflow_name == "test_workflow" + assert context.workflow_chain[0].workflow_id == context.root_span_id + + def test_create_root_context_with_custom_ids(self): + """Test creating a root context with custom trace and span IDs.""" + trace_id = "custom-trace-id" + root_span_id = "custom-span-id" + + context = ObservabilityContext.create_root_context( + "test_workflow", trace_id=trace_id, root_span_id=root_span_id + ) + + assert context.trace_id == trace_id + assert context.root_span_id == root_span_id + assert context.current_span_id == root_span_id + + def test_create_child_context(self): + """Test creating a child observability context.""" + parent_context = ObservabilityContext.create_root_context("parent_workflow") + child_context = parent_context.create_child_context("child_workflow") + + assert child_context.trace_id == parent_context.trace_id + assert child_context.root_span_id == parent_context.root_span_id + assert child_context.current_span_id != parent_context.current_span_id + assert len(child_context.workflow_chain) == 2 + assert child_context.workflow_chain[0].workflow_name == "parent_workflow" + assert child_context.workflow_chain[1].workflow_name == "child_workflow" + assert child_context.workflow_chain[1].parent_workflow_id == parent_context.current_span_id + + def test_create_span_context(self): + """Test creating a new context with different span ID.""" + original_context = ObservabilityContext.create_root_context("test_workflow") + new_span_id = str(uuid.uuid4()) + span_context = original_context.create_span_context(new_span_id) + + assert span_context.trace_id == original_context.trace_id + assert span_context.root_span_id == original_context.root_span_id + assert span_context.current_span_id == new_span_id + assert len(span_context.workflow_chain) == len(original_context.workflow_chain) + + def test_add_attribute(self): + """Test adding custom attributes to the context.""" + context = ObservabilityContext.create_root_context("test_workflow") + context.add_attribute("test_key", "test_value") + + assert "test_key" in context.custom_attributes + assert context.custom_attributes["test_key"] == "test_value" + + def test_get_current_workflow(self): + """Test getting the current workflow metadata.""" + context = ObservabilityContext.create_root_context("test_workflow") + current_workflow = context.get_current_workflow() + + assert current_workflow is not None + assert current_workflow.workflow_name == "test_workflow" + + def test_get_workflow_depth(self): + """Test getting workflow depth.""" + parent_context = ObservabilityContext.create_root_context("parent") + child_context = parent_context.create_child_context("child") + 
grandchild_context = child_context.create_child_context("grandchild") + + assert parent_context.get_workflow_depth() == 1 + assert child_context.get_workflow_depth() == 2 + assert grandchild_context.get_workflow_depth() == 3 + + def test_to_dict_and_from_dict(self): + """Test serialization and deserialization of observability context.""" + original_context = ObservabilityContext.create_root_context("test_workflow") + original_context.add_attribute("test_attr", "test_value") + + # Convert to dict + context_dict = original_context.to_dict() + + # Convert back from dict + restored_context = ObservabilityContext.from_dict(context_dict) + + assert restored_context.trace_id == original_context.trace_id + assert restored_context.root_span_id == original_context.root_span_id + assert restored_context.current_span_id == original_context.current_span_id + assert len(restored_context.workflow_chain) == len(original_context.workflow_chain) + assert restored_context.custom_attributes == original_context.custom_attributes + + +class TestObservabilityContextManager: + """Test the ObservabilityContextManager class.""" + + def test_create_root_context(self): + """Test creating and setting a root context.""" + context = ObservabilityContextManager.create_root_context("test_workflow") + + assert context is not None + assert context.workflow_chain[0].workflow_name == "test_workflow" + + current_context = ObservabilityContextManager.get_current_context() + assert current_context is not None + assert current_context.trace_id == context.trace_id + + def test_create_child_context(self): + """Test creating a child context from current context.""" + # First create a root context + ObservabilityContextManager.create_root_context("parent_workflow") + + # Then create a child context + child_context = ObservabilityContextManager.create_child_context("child_workflow") + + assert child_context is not None + assert len(child_context.workflow_chain) == 2 + assert child_context.workflow_chain[1].workflow_name == "child_workflow" + + def test_create_child_context_without_current(self): + """Test creating a child context when no current context exists.""" + ObservabilityContextManager.clear_context() + child_context = ObservabilityContextManager.create_child_context("child_workflow") + + assert child_context is None + + def test_propagate_context(self): + """Test propagating an existing context.""" + external_context = ObservabilityContext.create_root_context("external_workflow") + ObservabilityContextManager.propagate_context(external_context) + + current_context = ObservabilityContextManager.get_current_context() + assert current_context is not None + assert current_context.trace_id == external_context.trace_id + + def test_clear_context(self): + """Test clearing the current context.""" + ObservabilityContextManager.create_root_context("test_workflow") + assert ObservabilityContextManager.get_current_context() is not None + + ObservabilityContextManager.clear_context() + assert ObservabilityContextManager.get_current_context() is None + + def test_get_trace_id(self): + """Test getting the current trace ID.""" + context = ObservabilityContextManager.create_root_context("test_workflow") + trace_id = ObservabilityContextManager.get_trace_id() + + assert trace_id == context.trace_id + + def test_get_current_span_id(self): + """Test getting the current span ID.""" + context = ObservabilityContextManager.create_root_context("test_workflow") + span_id = ObservabilityContextManager.get_current_span_id() + + assert span_id == 
context.current_span_id + + def test_add_workflow_attribute(self): + """Test adding an attribute to the current context.""" + ObservabilityContextManager.create_root_context("test_workflow") + ObservabilityContextManager.add_workflow_attribute("test_key", "test_value") + + current_context = ObservabilityContextManager.get_current_context() + assert current_context is not None + assert current_context.custom_attributes["test_key"] == "test_value" + + +class TestCrossWorkflowProcessor: + """Test the CrossWorkflowProcessor class.""" + + def setup_method(self): + """Set up test environment.""" + self.processor = CrossWorkflowProcessor() + + @pytest.mark.asyncio + async def test_process_span_with_context(self): + """Test processing a span with observability context available.""" + # Create a real observability context and set it in the context state + obs_context = ObservabilityContext.create_root_context("test_workflow") + obs_context.add_attribute("custom_attr", "custom_value") + + from nat.builder.context import Context + context = Context.get() + context.set_observability_context(obs_context) + + try: + span = Span(name="test_span") + processed_span = await self.processor.process(span) + + # Check that observability attributes were added + attributes = processed_span.attributes + assert "observability.trace_id" in attributes + assert "observability.root_span_id" in attributes + assert "observability.current_span_id" in attributes + assert "observability.workflow_depth" in attributes + assert "observability.workflow_name" in attributes + assert "observability.custom.custom_attr" in attributes + assert attributes["observability.custom.custom_attr"] == "custom_value" + + finally: + # Clean up context + context.set_observability_context(None) + + @pytest.mark.asyncio + async def test_process_span_without_context(self): + """Test processing a span without observability context.""" + from nat.builder.context import Context + context = Context.get() + context.set_observability_context(None) + + try: + span = Span(name="test_span") + processed_span = await self.processor.process(span) + + # Should return span with minimal changes + assert processed_span.name == span.name + # Attributes might have been slightly modified but no observability info should be added + assert "observability.trace_id" not in processed_span.attributes + + finally: + # Context already cleared + pass + + + +class TestWorkflowRelationshipProcessor: + """Test the WorkflowRelationshipProcessor class.""" + + def setup_method(self): + """Set up test environment.""" + self.processor = WorkflowRelationshipProcessor() + + @pytest.mark.asyncio + async def test_process_span_root_workflow(self): + """Test processing a span for a root workflow.""" + obs_context = ObservabilityContext.create_root_context("root_workflow") + + from nat.builder.context import Context + context = Context.get() + context.set_observability_context(obs_context) + + try: + span = Span(name="test_span") + processed_span = await self.processor.process(span) + + attributes = processed_span.attributes + assert attributes["relationship.type"] == "root_workflow" + assert attributes["relationship.nesting_level"] == 0 + + finally: + context.set_observability_context(None) + + @pytest.mark.asyncio + async def test_process_span_child_workflow(self): + """Test processing a span for a child workflow.""" + parent_context = ObservabilityContext.create_root_context("parent_workflow") + child_context = parent_context.create_child_context("child_workflow") + + from 
nat.builder.context import Context + context = Context.get() + context.set_observability_context(child_context) + + try: + span = Span(name="test_span") + processed_span = await self.processor.process(span) + + attributes = processed_span.attributes + assert attributes["relationship.type"] == "child_workflow" + assert attributes["relationship.parent_workflow_name"] == "parent_workflow" + assert attributes["relationship.child_workflow_name"] == "child_workflow" + assert attributes["relationship.nesting_level"] == 1 + assert "parent_workflow -> child_workflow" in attributes["relationship.hierarchy_path"] + + finally: + context.set_observability_context(None) + + +class TestObservabilityWorkflowInvoker: + """Test the ObservabilityWorkflowInvoker class.""" + + @pytest.mark.asyncio + async def test_invoke_workflow_with_context(self): + """Test invoking a workflow with observability context.""" + # Create a mock workflow with proper async context manager + mock_workflow = Mock() + mock_runner = Mock() + mock_runner.result = AsyncMock(return_value="test_result") + + # Create a proper async context manager mock + async_context_manager = AsyncMock() + async_context_manager.__aenter__ = AsyncMock(return_value=mock_runner) + async_context_manager.__aexit__ = AsyncMock(return_value=None) + mock_workflow.run = Mock(return_value=async_context_manager) + + parent_context = ObservabilityContext.create_root_context("parent_workflow") + + result = await ObservabilityWorkflowInvoker.invoke_workflow_with_context( + workflow=mock_workflow, + message="test_message", + workflow_name="child_workflow", + parent_context=parent_context + ) + + assert result == "test_result" + mock_workflow.run.assert_called_once() + + # Check that observability context was passed + call_args = mock_workflow.run.call_args + assert "observability_context" in call_args.kwargs + obs_context = call_args.kwargs["observability_context"] + assert obs_context is not None + assert len(obs_context.workflow_chain) == 2 + assert obs_context.workflow_chain[1].workflow_name == "child_workflow" + + @pytest.mark.asyncio + async def test_invoke_workflow_stream_with_context(self): + """Test invoking a workflow with streaming output and observability context.""" + # Create a mock workflow with proper async context manager + mock_workflow = Mock() + mock_runner = Mock() + + async def mock_result_stream(to_type=None): + yield "item1" + yield "item2" + + mock_runner.result_stream = mock_result_stream + + # Create a proper async context manager mock + async_context_manager = AsyncMock() + async_context_manager.__aenter__ = AsyncMock(return_value=mock_runner) + async_context_manager.__aexit__ = AsyncMock(return_value=None) + mock_workflow.run = Mock(return_value=async_context_manager) + + parent_context = ObservabilityContext.create_root_context("parent_workflow") + + results = [] + async for item in ObservabilityWorkflowInvoker.invoke_workflow_stream_with_context( + workflow=mock_workflow, + message="test_message", + workflow_name="child_workflow", + parent_context=parent_context + ): + results.append(item) + + assert results == ["item1", "item2"] + mock_workflow.run.assert_called_once() + + def test_create_observability_context(self): + """Test creating an observability context.""" + context = ObservabilityWorkflowInvoker.create_observability_context("test_workflow") + + assert context is not None + assert context.workflow_chain[0].workflow_name == "test_workflow" + + def test_get_current_observability_context(self): + """Test getting current 
observability context.""" + # This test would need proper context setup + context = ObservabilityWorkflowInvoker.get_current_observability_context() + # Since we don't have a proper context, this should return None + assert context is None or isinstance(context, ObservabilityContext) \ No newline at end of file From 40e724c9aee0b7a327fbd0996c07efa10f49a36f Mon Sep 17 00:00:00 2001 From: pira998 Date: Thu, 2 Oct 2025 08:43:20 +0000 Subject: [PATCH 2/3] fix: coderabbit comments on the pr Signed-off-by: pira998 --- .../cross_workflow_tracking_example.py | 4 ++-- .../integrated_example.py | 6 +++--- .../observability_config.yml | 8 ++++---- external/nat-ui | 2 +- .../processor/cross_workflow_processor.py | 10 ++++++++-- src/nat/observability/workflow_utils.py | 16 +++++++++++++--- 6 files changed, 31 insertions(+), 15 deletions(-) diff --git a/examples/observability/cross_workflow_tracking/cross_workflow_tracking_example.py b/examples/observability/cross_workflow_tracking/cross_workflow_tracking_example.py index 20f847902c..5715da1f83 100644 --- a/examples/observability/cross_workflow_tracking/cross_workflow_tracking_example.py +++ b/examples/observability/cross_workflow_tracking/cross_workflow_tracking_example.py @@ -16,7 +16,7 @@ from nat.runtime.loader import load_workflow -async def create_simple_config(): +async def create_simple_config() -> str: """Create a simple workflow config using built-in NAT functions.""" config_content = """ @@ -39,7 +39,7 @@ async def create_simple_config(): return f.name -async def main(): +async def main() -> None: """Demonstrate real NAT workflow with cross-workflow observability.""" print("=" * 50) diff --git a/examples/observability/cross_workflow_tracking/integrated_example.py b/examples/observability/cross_workflow_tracking/integrated_example.py index c9004a5cc8..ec53eeba30 100644 --- a/examples/observability/cross_workflow_tracking/integrated_example.py +++ b/examples/observability/cross_workflow_tracking/integrated_example.py @@ -174,7 +174,7 @@ async def create_sample_workflow_configs(): temp_dir = tempfile.mkdtemp() # Data validation workflow config - validation_config = f""" + validation_config = """ llms: mock_llm: _type: mock @@ -193,7 +193,7 @@ async def create_sample_workflow_configs(): f.write(validation_config.strip()) # Data processing workflow config - processing_config = f""" + processing_config = """ llms: mock_llm: _type: mock @@ -212,7 +212,7 @@ async def create_sample_workflow_configs(): f.write(processing_config.strip()) # Analysis workflow config - analysis_config = f""" + analysis_config = """ llms: mock_llm: _type: mock diff --git a/examples/observability/cross_workflow_tracking/observability_config.yml b/examples/observability/cross_workflow_tracking/observability_config.yml index 145618db98..64b32d38e7 100644 --- a/examples/observability/cross_workflow_tracking/observability_config.yml +++ b/examples/observability/cross_workflow_tracking/observability_config.yml @@ -30,10 +30,10 @@ general: llms: nim_llm: - _type: nim - model_name: meta/llama-3.1-8b-instruct - temperature: 0.7 - max_tokens: 1024 + _type: nat_test_llm + response_seq: + - "This is a test response for observability demonstration." 
+ delay_ms: 100 workflow: _type: chat_completion diff --git a/external/nat-ui b/external/nat-ui index 16be996ab4..3927806c8e 160000 --- a/external/nat-ui +++ b/external/nat-ui @@ -1 +1 @@ -Subproject commit 16be996ab432c4384acf0223042fae8f28959e70 +Subproject commit 3927806c8ef046f2d34969bea694b5bfc9b3540b diff --git a/src/nat/observability/processor/cross_workflow_processor.py b/src/nat/observability/processor/cross_workflow_processor.py index 5a7f7c13bf..205b207f1a 100644 --- a/src/nat/observability/processor/cross_workflow_processor.py +++ b/src/nat/observability/processor/cross_workflow_processor.py @@ -13,10 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + from nat.data_models.span import Span from nat.observability.processor.processor import Processor from nat.utils.type_utils import override +logger = logging.getLogger(__name__) + class CrossWorkflowProcessor(Processor[Span, Span]): """ @@ -98,8 +102,9 @@ async def process(self, item: Span) -> Span: for key, value in obs_context.custom_attributes.items(): item.set_attribute(f"observability.custom.{key}", value) - except Exception as e: + except (AttributeError, KeyError, TypeError, ValueError) as e: # If there's any error in processing, log it but don't fail the span + logger.warning(f"Error processing cross-workflow observability data: {e}", exc_info=True) item.set_attribute("observability.processing_error", str(e)) return item @@ -158,8 +163,9 @@ async def process(self, item: Span) -> Span: item.set_attribute("relationship.type", "root_workflow") item.set_attribute("relationship.nesting_level", 0) - except Exception as e: + except (AttributeError, IndexError, TypeError) as e: # If there's any error in processing, log it but don't fail the span + logger.warning(f"Error processing workflow relationship data: {e}", exc_info=True) item.set_attribute("relationship.processing_error", str(e)) return item diff --git a/src/nat/observability/workflow_utils.py b/src/nat/observability/workflow_utils.py index 327c511d7f..60ddeee05b 100644 --- a/src/nat/observability/workflow_utils.py +++ b/src/nat/observability/workflow_utils.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import logging +import sys import time from typing import TYPE_CHECKING from typing import Any @@ -22,6 +24,8 @@ from nat.observability.context import ObservabilityContext +logger = logging.getLogger(__name__) + if TYPE_CHECKING: from nat.builder.workflow import Workflow @@ -95,13 +99,15 @@ async def invoke_workflow_with_context(workflow: "Workflow[InputT, StreamingOutp return result except Exception as e: - # Update workflow metadata on failure + # Update workflow metadata on failure and log error if obs_context: current_workflow = obs_context.get_current_workflow() if current_workflow: current_workflow.end_time = time.time() current_workflow.status = "failed" current_workflow.tags["error"] = str(e) + + logger.error(f"Workflow '{workflow_name}' failed with error: {e}", exc_info=True) raise @staticmethod @@ -161,13 +167,15 @@ async def invoke_workflow_stream_with_context( current_workflow.status = "completed" except Exception as e: - # Update workflow metadata on failure + # Update workflow metadata on failure and log error if obs_context: current_workflow = obs_context.get_current_workflow() if current_workflow: current_workflow.end_time = time.time() current_workflow.status = "failed" current_workflow.tags["error"] = str(e) + + logger.error(f"Streaming workflow '{workflow_name}' failed with error: {e}", exc_info=True) raise @staticmethod @@ -271,11 +279,13 @@ async def invoke_with_steps_and_context(workflow: "Workflow[InputT, StreamingOut return result, steps except Exception as e: - # Update workflow metadata on failure + # Update workflow metadata on failure and log error if obs_context: current_workflow = obs_context.get_current_workflow() if current_workflow: current_workflow.end_time = time.time() current_workflow.status = "failed" current_workflow.tags["error"] = str(e) + + logger.error(f"Workflow with steps '{workflow_name}' failed with error: {e}", exc_info=True) raise From 9012d2650a6c01aee3a45f6c7008cf7538953736 Mon Sep 17 00:00:00 2001 From: pira998 Date: Thu, 2 Oct 2025 09:39:06 +0000 Subject: [PATCH 3/3] fix: coderabbit comments on the pr Signed-off-by: pira998 --- .../cross_workflow_tracking_example.py | 13 ++-- .../cross_workflow_tracking/example.py | 2 +- .../processor/cross_workflow_processor.py | 4 +- src/nat/observability/workflow_utils.py | 71 +++++++------------ 4 files changed, 33 insertions(+), 57 deletions(-) diff --git a/examples/observability/cross_workflow_tracking/cross_workflow_tracking_example.py b/examples/observability/cross_workflow_tracking/cross_workflow_tracking_example.py index 5715da1f83..e598d0f75c 100644 --- a/examples/observability/cross_workflow_tracking/cross_workflow_tracking_example.py +++ b/examples/observability/cross_workflow_tracking/cross_workflow_tracking_example.py @@ -21,15 +21,14 @@ async def create_simple_config() -> str: config_content = """ llms: - nvidia_llm: - _type: nim - model_name: meta/llama-3.1-8b-instruct - temperature: 0.7 - max_tokens: 1024 - + demo_llm: + _type: nat_test_llm + response_seq: + - "Stubbed workflow reply." + delay_ms: 0 workflow: _type: chat_completion - llm_name: nvidia_llm + llm_name: demo_llm system_prompt: "You are a helpful customer support assistant. Provide clear, concise, and helpful responses." 
""" diff --git a/examples/observability/cross_workflow_tracking/example.py b/examples/observability/cross_workflow_tracking/example.py index c9532cdcbb..b9e9b7bbab 100644 --- a/examples/observability/cross_workflow_tracking/example.py +++ b/examples/observability/cross_workflow_tracking/example.py @@ -105,7 +105,7 @@ async def simulate_workflow_execution(workflow_name: str, result = await transform_data(input_data) elif workflow_name == "storage" or "store" in workflow_name: result = await store_data(input_data) - elif workflow_name == "report" in workflow_name: + elif "report" in workflow_name: result = await generate_report(input_data) else: # Generic processing diff --git a/src/nat/observability/processor/cross_workflow_processor.py b/src/nat/observability/processor/cross_workflow_processor.py index 205b207f1a..b8245763fb 100644 --- a/src/nat/observability/processor/cross_workflow_processor.py +++ b/src/nat/observability/processor/cross_workflow_processor.py @@ -104,7 +104,7 @@ async def process(self, item: Span) -> Span: except (AttributeError, KeyError, TypeError, ValueError) as e: # If there's any error in processing, log it but don't fail the span - logger.warning(f"Error processing cross-workflow observability data: {e}", exc_info=True) + logger.exception("Error processing cross-workflow observability data: %s", e) item.set_attribute("observability.processing_error", str(e)) return item @@ -165,7 +165,7 @@ async def process(self, item: Span) -> Span: except (AttributeError, IndexError, TypeError) as e: # If there's any error in processing, log it but don't fail the span - logger.warning(f"Error processing workflow relationship data: {e}", exc_info=True) + logger.exception("Error processing workflow relationship data: %s", e) item.set_attribute("relationship.processing_error", str(e)) return item diff --git a/src/nat/observability/workflow_utils.py b/src/nat/observability/workflow_utils.py index 60ddeee05b..696662520a 100644 --- a/src/nat/observability/workflow_utils.py +++ b/src/nat/observability/workflow_utils.py @@ -88,27 +88,19 @@ async def invoke_workflow_with_context(workflow: "Workflow[InputT, StreamingOutp try: async with workflow.run(message, observability_context=obs_context) as runner: result = await runner.result(to_type=to_type) - - # Update workflow metadata on completion - if obs_context: - current_workflow = obs_context.get_current_workflow() - if current_workflow: - current_workflow.end_time = time.time() - current_workflow.status = "completed" - return result - - except Exception as e: - # Update workflow metadata on failure and log error + finally: + exc = sys.exc_info()[1] if obs_context: current_workflow = obs_context.get_current_workflow() if current_workflow: current_workflow.end_time = time.time() - current_workflow.status = "failed" - current_workflow.tags["error"] = str(e) - - logger.error(f"Workflow '{workflow_name}' failed with error: {e}", exc_info=True) - raise + if exc is None: + current_workflow.status = "completed" + else: + current_workflow.status = "failed" + current_workflow.tags["error"] = str(exc) + logger.error("Workflow '%s' failed: %s", workflow_name, exc) @staticmethod async def invoke_workflow_stream_with_context( @@ -158,25 +150,18 @@ async def invoke_workflow_stream_with_context( async with workflow.run(message, observability_context=obs_context) as runner: async for item in runner.result_stream(to_type=to_type): yield item - - # Update workflow metadata on completion - if obs_context: - current_workflow = 
obs_context.get_current_workflow() - if current_workflow: - current_workflow.end_time = time.time() - current_workflow.status = "completed" - - except Exception as e: - # Update workflow metadata on failure and log error + finally: + exc = sys.exc_info()[1] if obs_context: current_workflow = obs_context.get_current_workflow() if current_workflow: current_workflow.end_time = time.time() - current_workflow.status = "failed" - current_workflow.tags["error"] = str(e) - - logger.error(f"Streaming workflow '{workflow_name}' failed with error: {e}", exc_info=True) - raise + if exc is None: + current_workflow.status = "completed" + else: + current_workflow.status = "failed" + current_workflow.tags["error"] = str(exc) + logger.error("Streaming workflow '%s' failed: %s", workflow_name, exc) @staticmethod def get_current_observability_context() -> Optional[ObservabilityContext]: @@ -268,24 +253,16 @@ async def invoke_with_steps_and_context(workflow: "Workflow[InputT, StreamingOut to_type=to_type, observability_context=obs_context ) - - # Update workflow metadata on completion - if obs_context: - current_workflow = obs_context.get_current_workflow() - if current_workflow: - current_workflow.end_time = time.time() - current_workflow.status = "completed" - return result, steps - - except Exception as e: - # Update workflow metadata on failure and log error + finally: + exc = sys.exc_info()[1] if obs_context: current_workflow = obs_context.get_current_workflow() if current_workflow: current_workflow.end_time = time.time() - current_workflow.status = "failed" - current_workflow.tags["error"] = str(e) - - logger.error(f"Workflow with steps '{workflow_name}' failed with error: {e}", exc_info=True) - raise + if exc is None: + current_workflow.status = "completed" + else: + current_workflow.status = "failed" + current_workflow.tags["error"] = str(exc) + logger.error("Workflow with steps '%s' failed: %s", workflow_name, exc)
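
Taken together, these hunks replace the duplicated completed/failed bookkeeping with a single `finally` block: inside a `finally` clause, `sys.exc_info()` still reports the in-flight exception, so one code path can stamp `end_time` and `status` for both outcomes while the exception continues to propagate on its own (no explicit `raise` needed). A standalone sketch of the pattern, with an illustrative `run_tracked` helper and plain-dict metadata rather than the toolkit's `WorkflowMetadata`:

```python
import sys
import time
from collections.abc import Awaitable
from typing import Any


async def run_tracked(step: Awaitable[Any], metadata: dict[str, Any]) -> Any:
    """Await `step`, recording timing and status on `metadata` for success and failure alike."""
    metadata["start_time"] = time.time()
    try:
        return await step
    finally:
        # During `finally`, sys.exc_info() is non-empty only while an exception
        # is unwinding, so it distinguishes the two outcomes without an `except`.
        exc = sys.exc_info()[1]
        metadata["end_time"] = time.time()
        metadata["status"] = "completed" if exc is None else "failed"
        if exc is not None:
            metadata["error"] = str(exc)
```

On success `metadata["status"]` ends up `"completed"`; on failure the caller still sees the original exception while `metadata` carries `"failed"` plus the error string — the same invariant the refactored invoker methods maintain above.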