Skip to content

Commit

Permalink
[Executor] Fix flex flow metrices (#3010)
Browse files Browse the repository at this point in the history
# Description

Currently, we always update metrices no matter whether the output of
aggregation function is a dictionary or not, which is not reasonable.
The expected output of aggregation function should be a dictionary, if
not, we will skip metrices update and add a warning in logger.

# All Promptflow Contribution checklist:
- [x] **The pull request does not introduce [breaking changes].**
- [ ] **CHANGELOG is updated for new features, bug fixes or other
significant changes.**
- [x] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [ ] **Create an issue and link to the pull request to get dedicated
review from promptflow team. Learn more: [suggested
workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [x] Title of the pull request is clear and informative.
- [x] There are a small number of commits, each of which have an
informative message. This means that previously merged commits do not
appear in the history of the PR. For more information on cleaning up the
commits in your PR, [see this
page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [ ] Pull request includes test coverage for the included changes.

---------

Co-authored-by: Lina Tang <[email protected]>
  • Loading branch information
lumoslnt and Lina Tang authored Apr 26, 2024
1 parent 4351ea8 commit e3574a9
Showing 1 changed file with 23 additions and 9 deletions.
32 changes: 23 additions & 9 deletions src/promptflow-core/promptflow/executor/_script_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,22 @@ def _exec_aggregation(
output, metrics = None, {}
try:
output = self._aggr_func(**{self._aggr_input_name: inputs})
metrics = output if isinstance(output, dict) else {"metrics": output}
for k, v in metrics.items():
log_metric(k, v)
except Exception:
pass
if isinstance(output, dict):
metrics = output
for k, v in metrics.items():
log_metric(k, v)
else:
logger.warning("The output of aggregation function isn't a dictionary, skip the metrices update.")
except Exception as e:
error_type_and_message = f"({e.__class__.__name__}) {e}"
e = ScriptExecutionError(
message_format="Execution failure in '{func_name}': {error_type_and_message}",
func_name=self._aggr_func.__name__,
error_type_and_message=error_type_and_message,
)
error = ExceptionPresenter.create(e).to_dict(include_debug_info=True)
logger.warning(f"Failed to execute aggregation function with error: {error}")
logger.warning("The flow will have empty metrics.")
return AggregationResult(output, metrics, {})

async def exec_aggregation_async(
Expand All @@ -216,12 +227,15 @@ async def exec_aggregation_async(
return await self._exec_aggregation_async(aggregation_inputs)

async def _exec_aggregation_async(self, inputs):
output = None
output, metrics = None, {}
try:
output = await self._aggr_func_async(**{self._aggr_input_name: inputs})
metrics = output if isinstance(output, dict) else {"metrics": output}
for k, v in metrics.items():
log_metric(k, v)
if isinstance(output, dict):
metrics = output
for k, v in metrics.items():
log_metric(k, v)
else:
logger.warning("The output of aggregation function isn't a dictionary, skip the metrices update.")
except Exception as e:
error_type_and_message = f"({e.__class__.__name__}) {e}"
e = ScriptExecutionError(
Expand Down

0 comments on commit e3574a9

Please sign in to comment.