Skip to content

Commit

Permalink
persist input from run results
Browse files Browse the repository at this point in the history
  • Loading branch information
D-W- committed Oct 23, 2023
1 parent 14a49a0 commit a3765c4
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from promptflow.contracts.run_info import RunInfo as NodeRunInfo
from promptflow.contracts.run_info import Status
from promptflow.contracts.run_mode import RunMode
from promptflow.executor._result import LineResult
from promptflow.executor.flow_executor import BulkResult
from promptflow.storage import AbstractRunStorage

Expand Down Expand Up @@ -239,7 +240,14 @@ def load_io_spec(self) -> Tuple[Dict[str, Dict[str, str]], Dict[str, Dict[str, s
flow_dag = yaml.safe_load(f)
return flow_dag["inputs"], flow_dag["outputs"]

def dump_inputs(self, inputs: RunInputs) -> None:
def dump_inputs(self, line_results: List[LineResult]) -> None:
inputs = []
for line_result in line_results:
try:
inputs.append(line_result.run_info.inputs)
except Exception:
# ignore when single line doesn't have inputs
pass
df = pd.DataFrame(inputs)
with open(self._inputs_path, mode="w", encoding=DEFAULT_ENCODING) as f:
# policy: http://policheck.azurewebsites.net/Pages/TermInfo.aspx?LCID=9&TermID=203588
Expand Down Expand Up @@ -389,6 +397,7 @@ def persist_result(self, result: Optional[BulkResult]) -> None:
return
self.dump_outputs(result.outputs)
self.dump_metrics(result.metrics)
self.dump_inputs(result.line_results)

@staticmethod
def _prepare_folder(path: Union[str, Path]) -> Path:
Expand Down
3 changes: 1 addition & 2 deletions src/promptflow/promptflow/_sdk/operations/_run_submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,8 +330,7 @@ def _submit_bulk_run(self, flow: Flow, run: Run, local_storage: LocalStorageOper
# persist snapshot and result
# snapshot: flow directory and (mapped) inputs
local_storage.dump_snapshot(flow)
local_storage.dump_inputs(mapped_inputs)
# result: outputs and metrics
# persist inputs, outputs and metrics
local_storage.persist_result(bulk_result)
# exceptions
local_storage.dump_exception(exception=exception, bulk_results=bulk_result)
Expand Down
30 changes: 30 additions & 0 deletions src/promptflow/tests/sdk_cli_test/e2etests/test_flow_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -745,3 +745,33 @@ def test_system_metrics_in_properties(self, pf) -> None:
assert FlowRunProperties.SYSTEM_METRICS in run.properties
assert isinstance(run.properties[FlowRunProperties.SYSTEM_METRICS], dict)
assert "total_tokens" in run.properties[FlowRunProperties.SYSTEM_METRICS]

def test_run_get_inputs(self, pf):
# inputs should be persisted when defaults are used
run = pf.run(
flow=f"{FLOWS_DIR}/default_input",
data=f"{DATAS_DIR}/webClassification1.jsonl",
)
inputs = pf.runs._get_inputs(run=run)
assert inputs == {"line_number": [0], "question": ["input value from default"]}

# inputs should be persisted when data value are used
run = pf.run(
flow=f"{FLOWS_DIR}/flow_with_dict_input",
data=f"{DATAS_DIR}/dictInput1.jsonl",
)
inputs = pf.runs._get_inputs(run=run)
assert inputs == {"key": [{"key": "value in data"}], "line_number": [0]}

# inputs should be persisted when column-mapping are used
run = pf.run(
flow=f"{FLOWS_DIR}/flow_with_dict_input",
data=f"{DATAS_DIR}/webClassification1.jsonl",
column_mapping={"key": {"value": "value in column-mapping"}, "url": "${data.url}"},
)
inputs = pf.runs._get_inputs(run=run)
assert inputs == {
"key": [{"value": "value in column-mapping"}],
"line_number": [0],
"url": ["https://www.youtube.com/watch?v=o5ZQyXaAv1g"],
}
1 change: 1 addition & 0 deletions src/promptflow/tests/test_configs/datas/dictInput1.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"key": {"key": "value in data"}}

0 comments on commit a3765c4

Please sign in to comment.