Skip to content

Commit

Permalink
Use same structure for main flow and evaluation value in LineSummary (#…
Browse files Browse the repository at this point in the history
…2845)

# Description
Keep the same behavior as the local trace.
Use the same structure for the main flow value and the evaluation values in LineSummary.
This may free us from having to add fields one by one.


# All Promptflow Contribution checklist:
- [X] **The pull request does not introduce [breaking changes].**
- [ ] **CHANGELOG is updated for new features, bug fixes or other
significant changes.**
- [X] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [ ] **Create an issue and link to the pull request to get dedicated
review from promptflow team. Learn more: [suggested
workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [X] Title of the pull request is clear and informative.
- [X] There are a small number of commits, each of which have an
informative message. This means that previously merged commits do not
appear in the history of the PR. For more information on cleaning up the
commits in your PR, [see this
page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [X] Pull request includes test coverage for the included changes.

Co-authored-by: robbenwang <[email protected]>
  • Loading branch information
huaiyan and robbenwang authored Apr 17, 2024
1 parent 6d91214 commit 5901354
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 166 deletions.
55 changes: 14 additions & 41 deletions src/promptflow-azure/promptflow/azure/_storage/cosmosdb/summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
@dataclass
class SummaryLine:
"""
This class represents an Item in Summary container
This class represents an Item in LineSummary container and each value for evaluations dict.
"""

id: str
Expand All @@ -54,27 +54,6 @@ class SummaryLine:
line_run_id: str = None


@dataclass
class LineEvaluation:
    """
    This class represents an evaluation value in Summary container item.

    Each instance is stored as one entry of a LineSummary item's ``evaluations``
    dict, linking an evaluation run back to its main run.
    """

    # Output values produced by the evaluation flow.
    outputs: typing.Dict
    # Trace id of the evaluation's root span.
    trace_id: str
    # Span id of the evaluation's root span.
    root_span_id: str
    # Display name of the evaluation (the root span's name).
    name: str
    # Identity info of the user who created the evaluation.
    created_by: typing.Dict
    # Collection the evaluation belongs to.
    collection_id: str
    flow_id: typing.Optional[str] = None
    # Only for batch run
    batch_run_id: typing.Optional[str] = None
    line_number: typing.Optional[str] = None
    # Only for line run
    line_run_id: typing.Optional[str] = None


class Summary:
def __init__(self, span: Span, collection_id: str, created_by: typing.Dict, logger: logging.Logger) -> None:
self.span = span
Expand All @@ -93,7 +72,7 @@ def persist(self, client: ContainerProxy):
# For non root span, write a placeholder item to LineSummary table.
self._persist_running_item(client)
return
self._parse_inputs_outputs_from_events()
self._prepare_db_item()

# Persist root span as a line run.
self._persist_line_run(client)
Expand Down Expand Up @@ -191,9 +170,8 @@ def _process_value(value):
else:
return _process_value(content)

def _persist_line_run(self, client: ContainerProxy):
attributes: dict = self.span.attributes

def _prepare_db_item(self):
self._parse_inputs_outputs_from_events()
session_id = self.session_id
start_time = self.span.start_time.isoformat()
end_time = self.span.end_time.isoformat()
Expand All @@ -202,6 +180,7 @@ def _persist_line_run(self, client: ContainerProxy):
# Convert ISO 8601 formatted strings to datetime objects
latency = (self.span.end_time - self.span.start_time).total_seconds()
# calculate `cumulative_token_count`
attributes: dict = self.span.attributes
completion_token_count = int(attributes.get(SpanAttributeFieldName.COMPLETION_TOKEN_COUNT, 0))
prompt_token_count = int(attributes.get(SpanAttributeFieldName.PROMPT_TOKEN_COUNT, 0))
total_token_count = int(attributes.get(SpanAttributeFieldName.TOTAL_TOKEN_COUNT, 0))
Expand Down Expand Up @@ -237,10 +216,13 @@ def _persist_line_run(self, client: ContainerProxy):
elif SpanAttributeFieldName.BATCH_RUN_ID in attributes and SpanAttributeFieldName.LINE_NUMBER in attributes:
item.batch_run_id = attributes[SpanAttributeFieldName.BATCH_RUN_ID]
item.line_number = attributes[SpanAttributeFieldName.LINE_NUMBER]
self.item = item

self.logger.info(f"Persist main run for LineSummary id: {item.id}")
def _persist_line_run(self, client: ContainerProxy):
    """
    Write the prepared LineSummary item (``self.item``) to the container.

    Assumes the item was already populated (see ``_prepare_db_item``).
    Returns the result of the upsert call.
    """
    self.logger.info(f"Persist main run for LineSummary id: {self.item.id}")
    # Use upsert because we may create running item in advance.
    return client.upsert_item(body=asdict(self.item))

def _insert_evaluation_with_retry(self, client: ContainerProxy):
for attempt in range(3):
Expand All @@ -258,15 +240,6 @@ def _insert_evaluation_with_retry(self, client: ContainerProxy):

def _insert_evaluation(self, client: ContainerProxy):
attributes: dict = self.span.attributes
item = LineEvaluation(
trace_id=self.span.trace_id,
root_span_id=self.span.span_id,
collection_id=self.collection_id,
outputs=self.outputs,
name=self.span.name,
created_by=self.created_by,
)

# None is the default value for the field.
referenced_line_run_id = attributes.get(SpanAttributeFieldName.REFERENCED_LINE_RUN_ID, None)
referenced_batch_run_id = attributes.get(SpanAttributeFieldName.REFERENCED_BATCH_RUN_ID, None)
Expand Down Expand Up @@ -296,18 +269,18 @@ def _insert_evaluation(self, client: ContainerProxy):
raise InsertEvaluationsRetriableException(f"Cannot find main run by parameter {parameters}.")

if SpanAttributeFieldName.LINE_RUN_ID in attributes:
item.line_run_id = attributes[SpanAttributeFieldName.LINE_RUN_ID]
key = self.span.name
else:
batch_run_id = attributes[SpanAttributeFieldName.BATCH_RUN_ID]
item.batch_run_id = batch_run_id
item.line_number = line_number
# Use the batch run id, instead of the name, as the key in the evaluations dictionary.
# Customers may execute the same evaluation flow multiple times for a batch run.
# We should be able to save all evaluations, as customers use batch runs in a critical manner.
key = batch_run_id

patch_operations = [{"op": "add", "path": f"/evaluations/{key}", "value": asdict(item)}]
item_dict = asdict(self.item)
# Remove unnecessary fields from the item
del item_dict["evaluations"]
patch_operations = [{"op": "add", "path": f"/evaluations/{key}", "value": item_dict}]
self.logger.info(f"Insert evaluation for LineSummary main_id: {main_id}")
return client.patch_item(item=main_id, partition_key=main_partition_key, patch_operations=patch_operations)

Expand Down
Loading

0 comments on commit 5901354

Please sign in to comment.