Skip to content

Commit

Permalink
Merge branch 'main' into kaiyi/feat/update-trace-bundle-0329
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengfeiwang committed Apr 1, 2024
2 parents 5c72047 + baa9c3b commit 53c7e18
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 16 deletions.
24 changes: 15 additions & 9 deletions src/promptflow-core/promptflow/_utils/multimedia_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ def _save_image_to_file(
return image_reference

def get_file_reference_encoder(
self, folder_path: Path, relative_path: Path = None, *, use_absolute_path=False
self, folder_path: Path, relative_path: Path = None, use_absolute_path=False
) -> Callable:
def pfbytes_file_reference_encoder(obj):
"""Dumps PFBytes to a file and returns its reference."""
Expand Down Expand Up @@ -253,11 +253,9 @@ def _process_multimedia_dict_recursively(value: Any, process_funcs: Dict[Callabl
for check_func, process_func in process_funcs.items():
if check_func(value):
return process_func(value)
else:
return {
k: MultimediaProcessor._process_multimedia_dict_recursively(v, process_funcs)
for k, v in value.items()
}
return {
k: MultimediaProcessor._process_multimedia_dict_recursively(v, process_funcs) for k, v in value.items()
}
else:
return value

Expand Down Expand Up @@ -489,11 +487,19 @@ def convert_pfbytes_to_base64_dict(obj: PFBytes):

# TODO: Runtime relies on these old interfaces; they will be removed in the future.
def persist_multimedia_data(
    value: Any,
    base_dir: Path,
    sub_dir: Path = None,
    use_absolute_path=False,
    multimedia_processor: MultimediaProcessor = None,
):
    """Persist multimedia objects found in *value* to files under *base_dir*/*sub_dir*
    and return *value* with those objects replaced by file references.

    :param value: Arbitrary data possibly containing multimedia objects.
    :param base_dir: Directory to persist media files into.
    :param sub_dir: Optional sub-directory below *base_dir*.
    :param use_absolute_path: Forwarded to the processor; controls whether the
        generated references use absolute paths.
    :param multimedia_processor: Processor to use; falls back to
        ``BasicMultimediaProcessor`` when not provided.
    """
    # NOTE(review): `use_absolute_path` was inserted BEFORE `multimedia_processor`,
    # which breaks any caller passing `multimedia_processor` positionally — confirm
    # no such callers exist before relying on this signature.
    if multimedia_processor:
        return multimedia_processor.persist_multimedia_data(
            value, base_dir, sub_dir, use_absolute_path=use_absolute_path
        )
    return BasicMultimediaProcessor().persist_multimedia_data(
        value, base_dir, sub_dir, use_absolute_path=use_absolute_path
    )


def load_multimedia_data_recursively(value: Any, multimedia_processor: MultimediaProcessor = None):
Expand Down
136 changes: 133 additions & 3 deletions src/promptflow/tests/executor/e2etests/test_traces.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,39 @@
import json
import multiprocessing
import sys
import threading
import uuid
from pathlib import Path
from types import GeneratorType
from unittest.mock import patch

import opentelemetry.trace as otel_trace
import pytest
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, SpanExporter, SpanExportResult
from opentelemetry.trace.status import StatusCode

from promptflow._utils.tool_utils import get_inputs_for_prompt_template
from promptflow.batch._result import BatchResult
from promptflow.contracts.run_info import Status
from promptflow.executor import FlowExecutor
from promptflow.executor._line_execution_process_pool import _process_wrapper
from promptflow.executor._process_manager import create_spawned_fork_process_manager
from promptflow.executor._result import LineResult
from promptflow.tracing import trace
from promptflow.tracing._utils import serialize
from promptflow.tracing.contracts.trace import TraceType

from ..process_utils import execute_function_in_subprocess
from ..utils import get_flow_folder, get_flow_sample_inputs, get_yaml_file, load_content, prepare_memory_exporter
from ..process_utils import execute_function_in_subprocess, override_process_pool_targets
from ..utils import (
MemoryRunStorage,
get_flow_folder,
get_flow_sample_inputs,
get_yaml_file,
load_content,
prepare_memory_exporter,
submit_batch_run,
)

LLM_FUNCTION_NAMES = [
"openai.resources.chat.completions.Completions.create",
Expand Down Expand Up @@ -58,6 +76,57 @@
"AzureOpenAI.chat",
]

lock = multiprocessing.Lock()


class JsonSpanExporter(SpanExporter):
    """Span exporter that appends each finished span as JSON to a file.

    Worker threads may export concurrently, so writes are serialized with a
    class-level lock shared by all instances (all instances in a process may
    target files in the same folder).
    """

    # Shared across instances so concurrent exports never interleave writes.
    _lock = threading.Lock()

    def __init__(self, file_path):
        # Target .jsonl file; spans are separated by blank lines ("\n\n").
        self.file_path = file_path

    def export(self, spans):
        """Append *spans* as JSON; best-effort — reports FAILURE on any error."""
        try:
            # BUG FIX: was `if self._lock:` — a Lock object is always truthy, so
            # the lock was never acquired. Use `with` to actually hold it while
            # writing, preventing interleaved output from concurrent exporters.
            with self._lock:
                with open(self.file_path, "a") as f:
                    for span in spans:
                        f.write(span.to_json() + "\n\n")
            return SpanExportResult.SUCCESS
        except Exception:
            # Deliberate best-effort: exporting must never crash the traced code.
            return SpanExportResult.FAILURE

    def shutdown(self):
        # Nothing to flush or close; the file is opened per export() call.
        pass


def mock_exporter_for_batch_tracing():
    """Patch the executor's exporter setup so spans are written to JSON files.

    Patchers are intentionally never stopped: this runs inside short-lived
    worker processes, so the patches live for the process's lifetime.
    """
    targets = [
        "promptflow.executor.flow_executor.setup_exporter_from_environ",
    ]
    for target in targets:
        patch(target, mock_setup_exporter_from_environ).start()


def mock_setup_exporter_from_environ():
    """Replace the real exporter setup: route this process's spans to a
    fresh ``./.span/line_span_<idx>.jsonl`` file."""
    # Hold the lock while picking the next free index and creating the file,
    # so concurrent workers don't claim the same file name.
    # NOTE(review): `lock` is a module-level multiprocessing.Lock — under spawn,
    # each child re-imports the module and gets its OWN lock, so cross-process
    # exclusion is not guaranteed; confirm workers are forked here.
    with lock:
        existing = list(Path("./.span").glob("*.jsonl"))
        span_path = f"./.span/line_span_{len(existing)}.jsonl"
        Path(span_path).touch()
    provider = TracerProvider()
    provider.add_span_processor(SimpleSpanProcessor(JsonSpanExporter(file_path=span_path)))
    otel_trace.set_tracer_provider(provider)


def mock_process_wrapper(*args, **kwargs):
    # Install the JSON span exporter before delegating to the real process
    # wrapper, so spans produced by this worker process are captured to files.
    mock_exporter_for_batch_tracing()
    _process_wrapper(*args, **kwargs)


def mock_process_manager(*args, **kwargs):
    # Same as mock_process_wrapper, but for the spawned-fork process manager
    # code path: mock the exporter, then start the real manager.
    mock_exporter_for_batch_tracing()
    create_spawned_fork_process_manager(*args, **kwargs)


def get_chat_input(stream):
return {
Expand Down Expand Up @@ -515,7 +584,7 @@ def assert_otel_traces_run_flow_then_traced_function(self):
def test_flow_with_nested_tool(self):
memory_exporter = prepare_memory_exporter()

line_result, line_run_id = self.submit_flow_run("flow_with_nested_tool", {"input": "Hello"}, {})
line_result, _ = self.submit_flow_run("flow_with_nested_tool", {"input": "Hello"}, {})
assert line_result.output == {"output": "Hello"}

span_list = memory_exporter.get_finished_spans()
Expand All @@ -527,6 +596,67 @@ def test_flow_with_nested_tool(self):
else:
assert span.name == "nested_tool_node"

def test_otel_trace_with_batch(self, dev_connections):
    # Run the assertion helper in a subprocess so the global tracer provider
    # and monkey-patched targets installed by the mocks cannot leak into
    # other tests in this process.
    flow_file = "flow_with_trace"
    execute_function_in_subprocess(self.assert_otel_traces_with_batch, dev_connections, flow_file)

def assert_otel_traces_with_batch(self, dev_connections, flow_file):
    """Run a batch over *flow_file* with the exporter mocked to write JSON
    span files, then validate the spans each worker process produced."""
    flow_folder = get_flow_folder(flow_file)
    # Start from an empty .span folder so the per-line file indices and the
    # per-file span counts asserted below are deterministic.
    if (span_folder := flow_folder / ".span").exists():
        for file in span_folder.glob("*.jsonl"):
            file.unlink()
    else:
        span_folder.mkdir()

    with override_process_pool_targets(process_manager=mock_process_manager, process_wrapper=mock_process_wrapper):
        mem_run_storage = MemoryRunStorage()
        batch_result = submit_batch_run(
            flow_folder=flow_file,
            inputs_mapping={"user_id": "${data.user_id}"},
            input_file_name="inputs.jsonl",
            connections=dev_connections,
            storage=mem_run_storage,
        )
        assert isinstance(batch_result, BatchResult)

    batch_run_id = list(mem_run_storage._flow_runs.values())[0].root_run_id
    assert span_folder.exists()
    trace_ids = []
    for file in span_folder.glob("*.jsonl"):
        spans = []
        with open(file, "r") as f:
            # Spans are written by JsonSpanExporter separated by blank lines.
            json_chunks = f.read().strip().split("\n\n")
            for chunk in json_chunks:
                spans.append(json.loads(chunk))
        # All spans in one file belong to one line run / one trace.
        trace_ids.append(spans[0]["context"]["trace_id"])
        assert len(spans) == 5, f"Got {len(spans)} spans."
        root_spans = [span for span in spans if span["parent_id"] is None]
        assert len(root_spans) == 1
        root_span = root_spans[0]
        for span in spans:
            # BUG FIX: these three lines used assignment ("=") instead of
            # comparison ("=="), silently overwriting the recorded values
            # rather than verifying them.
            assert span["status"]["status_code"] == "OK"
            assert span["attributes"]["batch_run_id"] == batch_run_id
            assert span["attributes"]["framework"] == "promptflow"
            if span["parent_id"] is None:
                expected_span_type = "Flow"
            elif span["attributes"].get("function", "") in LLM_FUNCTION_NAMES:
                expected_span_type = "LLM"
            elif span["attributes"].get("function", "") in EMBEDDING_FUNCTION_NAMES:
                expected_span_type = "Embedding"
            else:
                expected_span_type = "Function"
            msg = f"span_type: {span['attributes']['span_type']}, expected: {expected_span_type}"
            assert span["attributes"]["span_type"] == expected_span_type, msg
            if span != root_span:  # Non-root spans record the traced function name.
                assert span["attributes"]["function"]
            inputs = json.loads(span["attributes"]["inputs"])
            output = json.loads(span["attributes"]["output"])
            assert isinstance(inputs, dict)
            assert output is not None

    # Every line run's OTel trace id (stored as a hex string of the int form)
    # must correspond to exactly one of the span files written above.
    for run_info in mem_run_storage._flow_runs.values():
        assert f"0x{int(run_info.otel_trace_id, 16):032x}" in trace_ids

def submit_flow_run(self, flow_file, inputs, dev_connections):
executor = FlowExecutor.create(get_yaml_file(flow_file), dev_connections)
line_run_id = str(uuid.uuid4())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,22 +163,32 @@ def test_create_from_yaml(self, flow_folder_name, flow_file, processor_class):
assert isinstance(processor, processor_class)

def test_process_multimedia_dict_recursively(self):
    """Recursive processing should replace each multimedia dict (image or
    text) with its processor's placeholder, at any nesting depth, while
    leaving other values untouched."""
    # Placeholder process funcs make the substitutions easy to assert on.
    def process_func_image(image_dict):
        return "image_placeholder"

    def process_func_text(text_dict):
        return "text_placeholder"

    image_dict = {"data:image/jpg;path": "logo.jpg"}
    text_dict = {"type": "text", "text": "Hello, World!"}
    value = {
        "image": image_dict,
        "text": text_dict,
        "images": [image_dict, image_dict],
        "object": {"image": image_dict, "text": text_dict, "other_data": "other_data"},
    }
    updated_value = MultimediaProcessor._process_multimedia_dict_recursively(
        value,
        {
            BasicMultimediaProcessor.is_multimedia_dict: process_func_image,
            TextProcessor.is_text_dict: process_func_text,
        },
    )
    assert updated_value == {
        "image": "image_placeholder",
        "text": "text_placeholder",
        "images": ["image_placeholder", "image_placeholder"],
        "object": {"image": "image_placeholder", "text": "text_placeholder", "other_data": "other_data"},
    }


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"user_id": 1}
{"user_id": 2}

0 comments on commit 53c7e18

Please sign in to comment.