From 09bd62a2f980278bf9b3eeb5121970a4d47b144e Mon Sep 17 00:00:00 2001 From: Lina Tang Date: Thu, 25 Apr 2024 17:02:59 +0800 Subject: [PATCH 1/3] Fix flex flow metrices --- .../promptflow/executor/_script_executor.py | 28 ++++++++++++++----- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/src/promptflow-core/promptflow/executor/_script_executor.py b/src/promptflow-core/promptflow/executor/_script_executor.py index 744fa5f6834..442de15a509 100644 --- a/src/promptflow-core/promptflow/executor/_script_executor.py +++ b/src/promptflow-core/promptflow/executor/_script_executor.py @@ -195,11 +195,22 @@ def _exec_aggregation( output, metrics = None, {} try: output = self._aggr_func(**{self._aggr_input_name: inputs}) - metrics = output if isinstance(output, dict) else {"metrics": output} - for k, v in metrics.items(): - log_metric(k, v) + if isinstance(output, dict): + metrics = output + for k, v in metrics.items(): + log_metric(k, v) + else: + logger.warning("The output of aggregation function isn't a dictionary, skip the metrices update.") except Exception: - pass + error_type_and_message = f"({e.__class__.__name__}) {e}" + e = ScriptExecutionError( + message_format="Execution failure in '{func_name}': {error_type_and_message}", + func_name=self._aggr_func.__name__, + error_type_and_message=error_type_and_message, + ) + error = ExceptionPresenter.create(e).to_dict(include_debug_info=True) + logger.warning(f"Failed to execute aggregation function with error: {error}") + logger.warning("The flow will have empty metrics.") return AggregationResult(output, metrics, {}) async def exec_aggregation_async( @@ -219,9 +230,12 @@ async def _exec_aggregation_async(self, inputs): output = None try: output = await self._aggr_func_async(**{self._aggr_input_name: inputs}) - metrics = output if isinstance(output, dict) else {"metrics": output} - for k, v in metrics.items(): - log_metric(k, v) + if isinstance(output, dict): + metrics = output + for k, v in metrics.items(): + log_metric(k, v) + else: + logger.warning("The output of aggregation function isn't a dictionary, skip the metrices update.") except Exception as e: error_type_and_message = f"({e.__class__.__name__}) {e}" e = ScriptExecutionError( From 260806db1a5611c9a9b525e385ae10633a2791b1 Mon Sep 17 00:00:00 2001 From: Lina Tang Date: Thu, 25 Apr 2024 17:21:19 +0800 Subject: [PATCH 2/3] Fix tests --- src/promptflow-core/promptflow/executor/_script_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/promptflow-core/promptflow/executor/_script_executor.py b/src/promptflow-core/promptflow/executor/_script_executor.py index 442de15a509..04fed87db5e 100644 --- a/src/promptflow-core/promptflow/executor/_script_executor.py +++ b/src/promptflow-core/promptflow/executor/_script_executor.py @@ -201,7 +201,7 @@ def _exec_aggregation( log_metric(k, v) else: logger.warning("The output of aggregation function isn't a dictionary, skip the metrices update.") - except Exception: + except Exception as e: error_type_and_message = f"({e.__class__.__name__}) {e}" e = ScriptExecutionError( message_format="Execution failure in '{func_name}': {error_type_and_message}", From 250db56e16ea19623d2c832ff5154b3ee6806c9e Mon Sep 17 00:00:00 2001 From: Lina Tang Date: Thu, 25 Apr 2024 18:54:55 +0800 Subject: [PATCH 3/3] Fix bugs --- src/promptflow-core/promptflow/executor/_script_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/promptflow-core/promptflow/executor/_script_executor.py b/src/promptflow-core/promptflow/executor/_script_executor.py index 04fed87db5e..251d68c1b96 100644 --- a/src/promptflow-core/promptflow/executor/_script_executor.py +++ b/src/promptflow-core/promptflow/executor/_script_executor.py @@ -227,7 +227,7 @@ async def exec_aggregation_async( return await self._exec_aggregation_async(aggregation_inputs) async def _exec_aggregation_async(self, inputs): - output = None + output, metrics = None, {} try: output = await self._aggr_func_async(**{self._aggr_input_name: inputs}) if isinstance(output, dict):