# [SDK] Record aggregation node error in bulk run (#1580)
# Description


When an aggregation node fails during a bulk run, warn the user and record the error.
CLI experience:

![image](https://github.com/microsoft/promptflow/assets/7776147/793517c2-2683-4d46-a3ac-e6ca37dce67f)
SDK experience:

![image](https://github.com/microsoft/promptflow/assets/7776147/91bbdde2-8578-47b6-8b0b-2c892e69b35c)



This pull request includes several changes to improve the functionality
and logging of the codebase. The most important changes include adding a
new variable to improve warning messages during a bulk run, updating
import statements and renaming keys in exception classes for
consistency, and adding new test data and a test method for the
`aggregation_node_failed` flow.

Main functionality and logging improvements:

* <a
href="diffhunk://#diff-e3499cf713c66612d91723759de1c5076c22689b2ce2c89678f82de624b57c2cL99-R110">`src/promptflow/promptflow/_sdk/_submitter/run_submitter.py`</a>:
Added a new variable `error_log` to improve warning messages during a
bulk run.
* <a
href="diffhunk://#diff-1a5c2463c2ef997c5203a6b2e77b247d812961b7a6df5f0f45c02579b6656719L294-R300">`src/promptflow/promptflow/_sdk/operations/_local_storage_operations.py`</a>:
Added a loop to collect error messages from failed aggregation nodes and
renamed a key in the exception dictionary. <a
href="diffhunk://#diff-1a5c2463c2ef997c5203a6b2e77b247d812961b7a6df5f0f45c02579b6656719L294-R300">[1]</a>
<a
href="diffhunk://#diff-1a5c2463c2ef997c5203a6b2e77b247d812961b7a6df5f0f45c02579b6656719L322-R324">[2]</a>
* <a
href="diffhunk://#diff-22682fe335b54b3d9e9535b43000db9a81b718ad54ce82fccc27eb8f51ba99ddL4-R4">`src/promptflow/promptflow/_sdk/_errors.py`</a>:
Updated import statement and renamed a key in the exception class. <a
href="diffhunk://#diff-22682fe335b54b3d9e9535b43000db9a81b718ad54ce82fccc27eb8f51ba99ddL4-R4">[1]</a>
<a
href="diffhunk://#diff-22682fe335b54b3d9e9535b43000db9a81b718ad54ce82fccc27eb8f51ba99ddL71-R80">[2]</a>

Test improvements:

* <a
href="diffhunk://#diff-fd2280ec43a75adf07bef635f79fe8054a7272f14ab4d1464bd24f41a9169953R1-R3">`src/promptflow/tests/test_configs/flows/aggregation_node_failed/data.jsonl`</a>:
Added additional test data for the `aggregation_node_failed` flow.
* <a
href="diffhunk://#diff-94a59a05643476869fa3c6bc45466f1582944a935488075e2e63b6a6a196958fR1061-R1082">`src/promptflow/tests/sdk_cli_test/e2etests/test_flow_run.py`</a>:
Added a new test method `test_aggregation_node_failed` to test the
`aggregation_node_failed` flow and assert the run's status and error
messages.

# All Promptflow Contribution checklist:
- [ ] **The pull request does not introduce [breaking changes].**
- [ ] **CHANGELOG is updated for new features, bug fixes or other
significant changes.**
- [ ] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [ ] **Create an issue and link to the pull request to get dedicated
review from promptflow team. Learn more: [suggested
workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [ ] Title of the pull request is clear and informative.
- [ ] There are a small number of commits, each of which has an
informative message. This means that previously merged commits do not
appear in the history of the PR. For more information on cleaning up the
commits in your PR, [see this
page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [ ] Pull request includes test coverage for the included changes.
D-W- authored Dec 22, 2023
1 parent 6ff0eeb commit 1324b13
Showing 6 changed files with 49 additions and 15 deletions.
2 changes: 1 addition & 1 deletion src/promptflow/promptflow/_sdk/_constants.py

```diff
@@ -100,7 +100,7 @@ def _prepare_home_dir() -> Path:
 DEFAULT_ENCODING = "utf-8"
 LOCAL_STORAGE_BATCH_SIZE = 1
 LOCAL_SERVICE_PORT = 5000
-BULK_RUN_LINE_ERRORS = "BulkRunLineErrors"
+BULK_RUN_ERRORS = "BulkRunErrors"
 RUN_MACRO = "${run}"
 VARIANT_ID_MACRO = "${variant_id}"
 TIMESTAMP_MACRO = "${timestamp}"
```
9 changes: 5 additions & 4 deletions src/promptflow/promptflow/_sdk/_errors.py

```diff
@@ -1,7 +1,7 @@
 # ---------------------------------------------------------
 # Copyright (c) Microsoft Corporation. All rights reserved.
 # ---------------------------------------------------------
-from promptflow._sdk._constants import BULK_RUN_LINE_ERRORS
+from promptflow._sdk._constants import BULK_RUN_ERRORS
 from promptflow.exceptions import ErrorTarget, PromptflowException

@@ -68,15 +68,16 @@ class GenerateFlowToolsJsonError(PromptflowException):
 class BulkRunException(PromptflowException):
     """Exception raised when bulk run failed."""

-    def __init__(self, *, message="", failed_lines, total_lines, line_errors, module: str = None, **kwargs):
+    def __init__(self, *, message="", failed_lines, total_lines, errors, module: str = None, **kwargs):
         self.failed_lines = failed_lines
         self.total_lines = total_lines
         self._additional_info = {
-            BULK_RUN_LINE_ERRORS: line_errors,
+            BULK_RUN_ERRORS: errors,
         }

         message = f"First error message is: {message}"
-        if isinstance(failed_lines, int) and isinstance(total_lines, int):
+        # bulk run error is line error only when failed_lines > 0
+        if isinstance(failed_lines, int) and isinstance(total_lines, int) and failed_lines > 0:
             message = f"Failed to run {failed_lines}/{total_lines} lines. " + message
         super().__init__(message=message, target=ErrorTarget.RUNTIME, module=module, **kwargs)
```
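To see the effect of the new `failed_lines > 0` guard, here is a minimal standalone sketch of the message-building logic in `BulkRunException.__init__`; the base class and error target are omitted, and `build_bulk_run_message` is a hypothetical helper name, not SDK API:

```python
# Sketch of BulkRunException's message construction after this change.
# Only the string logic is reproduced; everything else is simplified.
def build_bulk_run_message(message, failed_lines, total_lines):
    message = f"First error message is: {message}"
    # bulk run error is a line error only when failed_lines > 0
    if isinstance(failed_lines, int) and isinstance(total_lines, int) and failed_lines > 0:
        message = f"Failed to run {failed_lines}/{total_lines} lines. " + message
    return message

# With failed_lines == 0 (an aggregation-only failure) the "Failed to run
# X/Y lines" prefix is now skipped, which is the point of the new guard.
print(build_bulk_run_message("KeyError: 'groundtruth'", 0, 3))
```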
22 changes: 14 additions & 8 deletions src/promptflow/promptflow/_sdk/_submitter/run_submitter.py

```diff
@@ -96,16 +96,22 @@ def _submit_bulk_run(self, flow: Flow, run: Run, local_storage: LocalStorageOper
                 output_dir=local_storage.outputs_folder,
                 run_id=run_id,
             )
-
+            error_logs = []
             if batch_result.failed_lines > 0:
                 # Log warning message when there are failed line run in bulk run.
-                error_log = f"{batch_result.failed_lines} out of {batch_result.total_lines} runs failed in batch run."
-                if run.properties.get(FlowRunProperties.OUTPUT_PATH, None):
-                    error_log = (
-                        error_log
-                        + f" Please check out {run.properties[FlowRunProperties.OUTPUT_PATH]} for more details."
-                    )
-                logger.warning(error_log)
+                error_logs.append(
+                    f"{batch_result.failed_lines} out of {batch_result.total_lines} runs failed in batch run."
+                )
+            if batch_result.error_summary.aggr_error_dict:
+                # log warning message when there are failed aggregation nodes in bulk run.
+                aggregation_nodes = list(batch_result.error_summary.aggr_error_dict.keys())
+                error_logs.append(f"aggregation nodes {aggregation_nodes} failed in batch run.")
+            # update error log
+            if error_logs and run.properties.get(FlowRunProperties.OUTPUT_PATH, None):
+                error_logs.append(
+                    f" Please check out {run.properties[FlowRunProperties.OUTPUT_PATH]} for more details."
+                )
+            logger.warning("\n".join(error_logs))
             # The bulk run is completed if the batch_engine.run successfully completed.
             status = Status.Completed.value
         except Exception as e:
```
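The submitter's new warning assembly can be exercised in isolation. The sketch below uses a hypothetical helper (`build_warning` is not part of the SDK) to reproduce how line-failure and aggregation-node messages are collected into one list, the output-path hint is appended once, and everything is joined into a single warning:

```python
# Hypothetical helper mirroring the new warning assembly in _submit_bulk_run:
# gather per-category messages, add the output-path hint only if there is
# something to warn about, then join with newlines for a single log call.
def build_warning(failed_lines, total_lines, aggr_error_dict, output_path=None):
    error_logs = []
    if failed_lines > 0:
        error_logs.append(f"{failed_lines} out of {total_lines} runs failed in batch run.")
    if aggr_error_dict:
        aggregation_nodes = list(aggr_error_dict.keys())
        error_logs.append(f"aggregation nodes {aggregation_nodes} failed in batch run.")
    if error_logs and output_path:
        error_logs.append(f" Please check out {output_path} for more details.")
    return "\n".join(error_logs)

# An aggregation-only failure now produces a warning even with zero failed lines.
print(build_warning(0, 3, {"aggregate": "KeyError"}, "/tmp/run_outputs"))
```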
src/promptflow/promptflow/_sdk/operations/_local_storage_operations.py

```diff
@@ -290,11 +290,13 @@ def dump_exception(self, exception: Exception, batch_result: BatchResult) -> Non
         :param batch_result: Bulk run outputs. If exception not raised, store line run error messages.
         """
         # extract line run errors
-        message = ""
         errors = []
         if batch_result:
             for line_error in batch_result.error_summary.error_list:
                 errors.append(line_error.to_dict())
+            # collect aggregation node error
+            for node_name, aggr_error in batch_result.error_summary.aggr_error_dict.items():
+                errors.append({"error": aggr_error, "aggregation_node_name": node_name})
         if errors:
             try:
                 # use first line run error message as exception message if no exception raised
@@ -318,7 +320,7 @@ def dump_exception(self, exception: Exception, batch_result: BatchResult) -> Non
                 error=exception,
                 failed_lines=batch_result.failed_lines if batch_result else "unknown",
                 total_lines=batch_result.total_lines if batch_result else "unknown",
-                line_errors={"errors": errors},
+                errors={"errors": errors},
            )
            with open(self._exception_path, mode="w", encoding=DEFAULT_ENCODING) as f:
                json.dump(
```
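The error-collection loop in `dump_exception` can be sketched standalone. The error payload dicts below are invented placeholders, but the merge shape (line errors first, then one entry per failed aggregation node) follows the change:

```python
# Sketch of how dump_exception now merges line errors and aggregation-node
# errors into a single list. The payload contents are illustrative only.
def collect_errors(line_error_dicts, aggr_error_dict):
    errors = []
    for line_error in line_error_dicts:
        errors.append(line_error)
    # collect aggregation node error
    for node_name, aggr_error in aggr_error_dict.items():
        errors.append({"error": aggr_error, "aggregation_node_name": node_name})
    return errors

errors = collect_errors(
    [{"code": "UserError", "message": "division by zero"}],
    {"aggregate": {"code": "UserError", "message": "KeyError: 'groundtruth'"}},
)
# Line errors come first, then one dict per failed aggregation node.
print(len(errors))
```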
22 changes: 22 additions & 0 deletions src/promptflow/tests/sdk_cli_test/e2etests/test_flow_run.py

```diff
@@ -1058,3 +1058,25 @@ def test_create_run_with_existing_run_folder(self, pf):
         assert metrics == {}
         pf.stream(run_name)
         pf.visualize([run_name])
+
+    def test_aggregation_node_failed(self, pf):
+        failed_run = pf.run(
+            flow=f"{FLOWS_DIR}/aggregation_node_failed",
+            data=f"{FLOWS_DIR}/aggregation_node_failed/data.jsonl",
+        )
+        # even if all lines failed, the bulk run's status is completed.
+        assert failed_run.status == "Completed"
+        # error messages will store in local
+        local_storage = LocalStorageOperations(failed_run)
+
+        assert os.path.exists(local_storage._exception_path)
+        exception = local_storage.load_exception()
+        assert "First error message is" in exception["message"]
+        # line run failures will be stored in additionalInfo
+        assert len(exception["additionalInfo"][0]["info"]["errors"]) == 1
+
+        # show run will get error message
+        run = pf.runs.get(name=failed_run.name)
+        run_dict = run._to_dict()
+        assert "error" in run_dict
+        assert run_dict["error"] == exception
```
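The new test implies a rough shape for the persisted exception JSON. The sketch below writes and reloads a file with invented values to show the fields the test reads (`message` and `additionalInfo[0]["info"]["errors"]`); the `"type"` value and error payloads are assumptions, not real SDK output:

```python
import json
import os
import tempfile

# Invented example of the recorded exception's shape, inferred from the
# assertions in test_aggregation_node_failed; values are placeholders.
exception = {
    "message": "First error message is: KeyError: 'groundtruth'",
    "additionalInfo": [
        {
            "type": "BulkRunErrors",  # assumed to mirror the BULK_RUN_ERRORS constant
            "info": {
                "errors": [
                    {"error": {"message": "KeyError"}, "aggregation_node_name": "aggregate"}
                ]
            },
        }
    ],
}

# Round-trip through a file the way dump_exception persists it.
path = os.path.join(tempfile.mkdtemp(), "error.json")
with open(path, mode="w", encoding="utf-8") as f:
    json.dump(exception, f)
with open(path, encoding="utf-8") as f:
    loaded = json.load(f)

assert "First error message is" in loaded["message"]
assert len(loaded["additionalInfo"][0]["info"]["errors"]) == 1
```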
src/promptflow/tests/test_configs/flows/aggregation_node_failed/data.jsonl

```diff
@@ -0,0 +1,3 @@
+{"groundtruth": "Tomorrow's weather will be sunny.","prediction": "The weather will be sunny tomorrow."}
+{"groundtruth": "Hello,","prediction": "World."}
+{"groundtruth": "Promptflow is a super easy-to-use tool, right?","prediction": "Yes!"}
```
