From e3574a9f577e0407ad84ee790a798d945e20c9ce Mon Sep 17 00:00:00 2001 From: Lina Tang Date: Fri, 26 Apr 2024 11:27:11 +0800 Subject: [PATCH] [Executor] Fix flex flow metrices (#3010) # Description Currently, we always update metrics regardless of whether the output of the aggregation function is a dictionary, which is not reasonable. The aggregation function is expected to return a dictionary; if it does not, we skip the metrics update and log a warning. # All Promptflow Contribution checklist: - [x] **The pull request does not introduce [breaking changes].** - [ ] **CHANGELOG is updated for new features, bug fixes or other significant changes.** - [x] **I have read the [contribution guidelines](../CONTRIBUTING.md).** - [ ] **Create an issue and link to the pull request to get dedicated review from promptflow team. Learn more: [suggested workflow](../CONTRIBUTING.md#suggested-workflow).** ## General Guidelines and Best Practices - [x] Title of the pull request is clear and informative. - [x] There are a small number of commits, each of which have an informative message. This means that previously merged commits do not appear in the history of the PR. For more information on cleaning up the commits in your PR, [see this page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md). ### Testing Guidelines - [ ] Pull request includes test coverage for the included changes. 
--------- Co-authored-by: Lina Tang --- .../promptflow/executor/_script_executor.py | 32 +++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/src/promptflow-core/promptflow/executor/_script_executor.py b/src/promptflow-core/promptflow/executor/_script_executor.py index 744fa5f6834..251d68c1b96 100644 --- a/src/promptflow-core/promptflow/executor/_script_executor.py +++ b/src/promptflow-core/promptflow/executor/_script_executor.py @@ -195,11 +195,22 @@ def _exec_aggregation( output, metrics = None, {} try: output = self._aggr_func(**{self._aggr_input_name: inputs}) - metrics = output if isinstance(output, dict) else {"metrics": output} - for k, v in metrics.items(): - log_metric(k, v) - except Exception: - pass + if isinstance(output, dict): + metrics = output + for k, v in metrics.items(): + log_metric(k, v) + else: + logger.warning("The output of aggregation function isn't a dictionary, skip the metrices update.") + except Exception as e: + error_type_and_message = f"({e.__class__.__name__}) {e}" + e = ScriptExecutionError( + message_format="Execution failure in '{func_name}': {error_type_and_message}", + func_name=self._aggr_func.__name__, + error_type_and_message=error_type_and_message, + ) + error = ExceptionPresenter.create(e).to_dict(include_debug_info=True) + logger.warning(f"Failed to execute aggregation function with error: {error}") + logger.warning("The flow will have empty metrics.") return AggregationResult(output, metrics, {}) async def exec_aggregation_async( @@ -216,12 +227,15 @@ async def exec_aggregation_async( return await self._exec_aggregation_async(aggregation_inputs) async def _exec_aggregation_async(self, inputs): - output = None + output, metrics = None, {} try: output = await self._aggr_func_async(**{self._aggr_input_name: inputs}) - metrics = output if isinstance(output, dict) else {"metrics": output} - for k, v in metrics.items(): - log_metric(k, v) + if isinstance(output, dict): + metrics = output + for k, v 
in metrics.items(): + log_metric(k, v) + else: + logger.warning("The output of aggregation function isn't a dictionary, skip the metrices update.") except Exception as e: error_type_and_message = f"({e.__class__.__name__}) {e}" e = ScriptExecutionError(