Skip to content

Commit

Permalink
Record root span name for each child span
Browse files Browse the repository at this point in the history
  • Loading branch information
robbenwang committed Apr 26, 2024
1 parent 091fa17 commit 8f3724d
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,17 +99,18 @@ def _persist_running_item(self, client: ContainerProxy):
trace_id = self.span.trace_id
session_id = self.session_id

attributes: dict = self.span.attributes
item = SummaryLine(
id=trace_id,
partition_key=self.collection_id,
session_id=session_id,
trace_id=trace_id,
name=attributes.get(SpanAttributeFieldName.ROOT_SPAN_NAME, None),
status=RUNNING_LINE_RUN_STATUS,
collection_id=self.collection_id,
created_by=self.created_by,
start_time=self.span.start_time.isoformat(),
)
attributes: dict = self.span.attributes
if SpanAttributeFieldName.LINE_RUN_ID in attributes:
item.line_run_id = attributes[SpanAttributeFieldName.LINE_RUN_ID]
elif SpanAttributeFieldName.BATCH_RUN_ID in attributes and SpanAttributeFieldName.LINE_NUMBER in attributes:
Expand Down
1 change: 1 addition & 0 deletions src/promptflow-core/promptflow/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ class SpanAttributeFieldName:
FUNCTION = "function"
INPUTS = "inputs"
OUTPUT = "output"
ROOT_SPAN_NAME = "root_span_name" # Record root span name in all child spans to show correct name for running item
# token metrics
COMPLETION_TOKEN_COUNT = "llm.usage.completion_tokens"
PROMPT_TOKEN_COUNT = "llm.usage.prompt_tokens"
Expand Down
16 changes: 15 additions & 1 deletion src/promptflow-core/promptflow/executor/flow_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,15 @@ def _start_flow_span(self, inputs: Mapping[str, Any]):
with otel_tracer.start_as_current_span(self._flow.name) as span:
# Store otel trace id in context for correlation
OperationContext.get_instance()["otel_trace_id"] = f"0x{format_trace_id(span.get_span_context().trace_id)}"

root_span_name_key = "root_span_name"
attributes = OperationContext.get_instance()._get_otel_attributes()
exist_root_span_name = root_span_name_key in attributes
# Add root span name to attributes to make sure we could show correct name for running flow in UI.
# For scenario like chat group, the root span name is already set, we don't need to set it again.
if not exist_root_span_name:
OperationContext.get_instance()._add_otel_attributes(root_span_name_key, span.name)

# initialize span
span.set_attributes(
{
Expand All @@ -853,7 +862,12 @@ def _start_flow_span(self, inputs: Mapping[str, Any]):
enrich_span_with_context(span)
# enrich span with input
enrich_span_with_input(span, inputs)
yield span
try:
yield span
finally:
# Remove root span name from attributes to make sure the next execution could set it correctly.
if not exist_root_span_name:
OperationContext.get_instance()._remove_otel_attributes(root_span_name_key)

async def _exec_inner_with_trace_async(
self,
Expand Down
31 changes: 27 additions & 4 deletions src/promptflow-tracing/promptflow/tracing/_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,25 @@ def _record_keyboard_interrupt_to_span(span: Span):
raise


@contextlib.contextmanager
def _process_root_span_name(span: Span):
# We will create placeholder for root span when the process is running.
# It's very necessary to show root span name in the running placeholder.
# So, we need to add root span name to the attributes of all child spans.
root_span_name_key = "root_span_name"
attributes = OperationContext.get_instance()._get_otel_attributes()
exist_root_span_name = root_span_name_key in attributes
if not exist_root_span_name:
OperationContext.get_instance()._add_otel_attributes(root_span_name_key, span.name)
try:
yield
finally:
# It's critical to remove the root span name from the attributes after the process is done.
# If not, the root span name will be added to the attributes of the next process.
if not exist_root_span_name:
OperationContext.get_instance()._remove_otel_attributes(root_span_name_key)


class TokenCollector:
_lock = Lock()

Expand Down Expand Up @@ -187,7 +206,7 @@ def traced_generator(original_span: ReadableSpan, inputs, generator):
with otel_tracer.start_as_current_span(
f"Iterated({original_span.name})",
links=[link],
) as span, _record_keyboard_interrupt_to_span(span):
) as span, _process_root_span_name(span), _record_keyboard_interrupt_to_span(span):
enrich_span_with_original_attributes(span, original_span.attributes)
# Enrich the new span with input before generator iteration to prevent loss of input information.
# The input is as an event within this span.
Expand All @@ -211,7 +230,7 @@ async def traced_async_generator(original_span: ReadableSpan, inputs, generator)
with otel_tracer.start_as_current_span(
f"Iterated({original_span.name})",
links=[link],
) as span, _record_keyboard_interrupt_to_span(span):
) as span, _process_root_span_name(span), _record_keyboard_interrupt_to_span(span):
enrich_span_with_original_attributes(span, original_span.attributes)
# Enrich the new span with input before generator iteration to prevent loss of input information.
# The input is as an event within this span.
Expand Down Expand Up @@ -388,7 +407,9 @@ async def wrapped(*args, **kwargs):
span_name = get_node_name_from_context(used_for_span_name=True) or trace.name
# need to get everytime to ensure tracer is latest
otel_tracer = otel_trace.get_tracer("promptflow")
with otel_tracer.start_as_current_span(span_name) as span, _record_keyboard_interrupt_to_span(span):
with otel_tracer.start_as_current_span(span_name) as span, _process_root_span_name(
span
), _record_keyboard_interrupt_to_span(span):
# Store otel trace id in context for correlation
OperationContext.get_instance()["otel_trace_id"] = f"0x{format_trace_id(span.get_span_context().trace_id)}"
enrich_span_with_trace(span, trace)
Expand Down Expand Up @@ -454,7 +475,9 @@ def wrapped(*args, **kwargs):
span_name = get_node_name_from_context(used_for_span_name=True) or trace.name
# need to get everytime to ensure tracer is latest
otel_tracer = otel_trace.get_tracer("promptflow")
with otel_tracer.start_as_current_span(span_name) as span, _record_keyboard_interrupt_to_span(span):
with otel_tracer.start_as_current_span(span_name) as span, _process_root_span_name(
span
), _record_keyboard_interrupt_to_span(span):
# Store otel trace id in context for correlation
OperationContext.get_instance()["otel_trace_id"] = f"0x{format_trace_id(span.get_span_context().trace_id)}"
enrich_span_with_trace(span, trace)
Expand Down

0 comments on commit 8f3724d

Please sign in to comment.