diff --git a/src/promptflow/promptflow/_sdk/operations/_local_storage_operations.py b/src/promptflow/promptflow/_sdk/operations/_local_storage_operations.py index f9c916fcbe1..0dc6187c5c4 100644 --- a/src/promptflow/promptflow/_sdk/operations/_local_storage_operations.py +++ b/src/promptflow/promptflow/_sdk/operations/_local_storage_operations.py @@ -36,6 +36,7 @@ from promptflow.contracts.run_info import RunInfo as NodeRunInfo from promptflow.contracts.run_info import Status from promptflow.contracts.run_mode import RunMode +from promptflow.executor._result import LineResult from promptflow.executor.flow_executor import BulkResult from promptflow.storage import AbstractRunStorage @@ -239,7 +240,14 @@ def load_io_spec(self) -> Tuple[Dict[str, Dict[str, str]], Dict[str, Dict[str, s flow_dag = yaml.safe_load(f) return flow_dag["inputs"], flow_dag["outputs"] - def dump_inputs(self, inputs: RunInputs) -> None: + def dump_inputs(self, line_results: List[LineResult]) -> None: + inputs = [] + for line_result in line_results: + try: + inputs.append(line_result.run_info.inputs) + except Exception: + # ignore when single line doesn't have inputs + pass df = pd.DataFrame(inputs) with open(self._inputs_path, mode="w", encoding=DEFAULT_ENCODING) as f: # policy: http://policheck.azurewebsites.net/Pages/TermInfo.aspx?LCID=9&TermID=203588 @@ -389,6 +397,7 @@ def persist_result(self, result: Optional[BulkResult]) -> None: return self.dump_outputs(result.outputs) self.dump_metrics(result.metrics) + self.dump_inputs(result.line_results) @staticmethod def _prepare_folder(path: Union[str, Path]) -> Path: diff --git a/src/promptflow/promptflow/_sdk/operations/_run_submitter.py b/src/promptflow/promptflow/_sdk/operations/_run_submitter.py index c98b32666fa..14058fc938b 100644 --- a/src/promptflow/promptflow/_sdk/operations/_run_submitter.py +++ b/src/promptflow/promptflow/_sdk/operations/_run_submitter.py @@ -330,8 +330,7 @@ def _submit_bulk_run(self, flow: Flow, run: Run, local_storage: LocalStorageOper # persist snapshot and result # snapshot: flow directory and (mapped) inputs local_storage.dump_snapshot(flow) - local_storage.dump_inputs(mapped_inputs) - # result: outputs and metrics + # persist inputs, outputs and metrics local_storage.persist_result(bulk_result) # exceptions local_storage.dump_exception(exception=exception, bulk_results=bulk_result) diff --git a/src/promptflow/tests/sdk_cli_test/e2etests/test_flow_run.py b/src/promptflow/tests/sdk_cli_test/e2etests/test_flow_run.py index 2546edaf478..c5615510098 100644 --- a/src/promptflow/tests/sdk_cli_test/e2etests/test_flow_run.py +++ b/src/promptflow/tests/sdk_cli_test/e2etests/test_flow_run.py @@ -745,3 +745,33 @@ def test_system_metrics_in_properties(self, pf) -> None: assert FlowRunProperties.SYSTEM_METRICS in run.properties assert isinstance(run.properties[FlowRunProperties.SYSTEM_METRICS], dict) assert "total_tokens" in run.properties[FlowRunProperties.SYSTEM_METRICS] + + def test_run_get_inputs(self, pf): + # inputs should be persisted when defaults are used + run = pf.run( + flow=f"{FLOWS_DIR}/default_input", + data=f"{DATAS_DIR}/webClassification1.jsonl", + ) + inputs = pf.runs._get_inputs(run=run) + assert inputs == {"line_number": [0], "question": ["input value from default"]} + + # inputs should be persisted when data value are used + run = pf.run( + flow=f"{FLOWS_DIR}/flow_with_dict_input", + data=f"{DATAS_DIR}/dictInput1.jsonl", + ) + inputs = pf.runs._get_inputs(run=run) + assert inputs == {"key": [{"key": "value in data"}], "line_number": [0]} + + # inputs should be persisted when column-mapping are used + run = pf.run( + flow=f"{FLOWS_DIR}/flow_with_dict_input", + data=f"{DATAS_DIR}/webClassification1.jsonl", + column_mapping={"key": {"value": "value in column-mapping"}, "url": "${data.url}"}, + ) + inputs = pf.runs._get_inputs(run=run) + assert inputs == { + "key": [{"value": "value in column-mapping"}], + "line_number": [0], + "url": ["https://www.youtube.com/watch?v=o5ZQyXaAv1g"], + } diff --git a/src/promptflow/tests/test_configs/datas/dictInput1.jsonl b/src/promptflow/tests/test_configs/datas/dictInput1.jsonl new file mode 100644 index 00000000000..4a9c0bcdbb7 --- /dev/null +++ b/src/promptflow/tests/test_configs/datas/dictInput1.jsonl @@ -0,0 +1 @@ +{"key": {"key": "value in data"}}