[promptflow][Release] 1.0.0 release branch merge back #1064

Merged: 17 commits, Nov 10, 2023
33 changes: 15 additions & 18 deletions examples/flows/chat/chat-with-pdf/tests/chat_with_pdf_test.py
@@ -2,7 +2,6 @@
import unittest
import promptflow
from base_test import BaseTest
from promptflow.exceptions import ValidationException


class TestChatWithPDF(BaseTest):
@@ -72,28 +71,26 @@ def test_bulk_run_valid_mapping(self):
self.assertEqual(details.shape[0], 3)

def test_bulk_run_mapping_missing_one_column(self):
# in this case, run won't be created because the question column is missing from the data
data_path = os.path.join(
self.flow_path, "data/invalid-data-missing-column.jsonl"
)
with self.assertRaises(ValidationException):
self.create_chat_run(
column_mapping={
"question": "${data.question}",
},
data=data_path
)
run = self.create_chat_run(
column_mapping={
"question": "${data.question}",
},
data=data_path
)
self.assertEqual(run.status, "Failed")

def test_bulk_run_invalid_mapping(self):
# in this case, run won't be created.
with self.assertRaises(ValidationException):
self.create_chat_run(
column_mapping={
"question": "${data.question_not_exist}",
"pdf_url": "${data.pdf_url}",
"chat_history": "${data.chat_history}",
}
)
run = self.create_chat_run(
column_mapping={
"question": "${data.question_not_exist}",
"pdf_url": "${data.pdf_url}",
"chat_history": "${data.chat_history}",
}
)
self.assertEqual(run.status, "Failed")


if __name__ == "__main__":
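The behavior change behind this test update: with 1.0.0, an unmatched column mapping no longer raises `ValidationException` at submission time; the batch run is created and finishes with status `Failed`. A minimal sketch of exercising the new behavior through `PFClient` (the flow and data paths are placeholders, not files from this repo):

```python
from promptflow import PFClient

pf = PFClient()
run = pf.run(
    flow="<path to a chat flow>",                        # placeholder
    data="<data missing the question column>.jsonl",     # placeholder
    column_mapping={"question": "${data.question}"},
)
# Submission succeeds; the failure is reported on the run itself.
assert run.status == "Failed"
```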
7 changes: 6 additions & 1 deletion src/promptflow/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Release History

## 0.1.0b9 (Upcoming)
## 1.0.0 (2023.11.09)

### Features Added

@@ -11,6 +11,11 @@
### Bugs Fixed

- [SDK/CLI] Keep original format in run output.jsonl.
- [Executor] Fix the bug where an error is raised when an aggregation node references a bypassed node

### Improvements

- [Executor] Set the outputs of bypassed nodes to None

## 0.1.0b8 (2023.10.26)

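Context for the two executor entries above: bypassed nodes now expose `None` as their output, so an aggregation node that references them receives `None` for those lines instead of triggering an error. Illustrative sketch only; the aggregation function and its input shape are assumptions, not promptflow API:

```python
from typing import List, Optional

def aggregate(line_outputs: List[Optional[str]]) -> dict:
    # Lines whose node was bypassed contribute None after this release;
    # filter them out before computing the aggregate.
    valid = [o for o in line_outputs if o is not None]
    return {"valid_lines": len(valid), "bypassed_lines": len(line_outputs) - len(valid)}
```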
@@ -261,14 +261,9 @@ def load_outputs(self) -> RunOutputs:
df = pd.read_json(f, orient="records", lines=True)
return df.to_dict("list")

# get total number of line runs from inputs
num_line_runs = len(list(self.load_inputs().values())[0])
with open(self._outputs_path, mode="r", encoding=DEFAULT_ENCODING) as f:
df = pd.read_json(f, orient="records", lines=True)
# if all line runs are failed, no need to fill
if len(df) > 0:
df = self._outputs_padding(df, num_line_runs)
df.fillna(value="(Failed)", inplace=True) # replace nan with explicit prompt
df = df.set_index(LINE_NUMBER)
return df.to_dict("list")

@@ -429,14 +424,14 @@ def _prepare_folder(path: Union[str, Path]) -> Path:
return path

@staticmethod
def _outputs_padding(df: pd.DataFrame, expected_rows: int) -> pd.DataFrame:
def _outputs_padding(df: pd.DataFrame, inputs_line_numbers: List[int]) -> pd.DataFrame:
if len(df) == len(inputs_line_numbers):
return df
missing_lines = []
lines_set = set(df[LINE_NUMBER].values)
for i in range(expected_rows):
for i in inputs_line_numbers:
if i not in lines_set:
missing_lines.append({LINE_NUMBER: i})
if len(missing_lines) == 0:
return df
df_to_append = pd.DataFrame(missing_lines)
res = pd.concat([df, df_to_append], ignore_index=True)
res = res.sort_values(by=LINE_NUMBER, ascending=True)
@@ -452,7 +447,7 @@ def load_inputs_and_outputs(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
outputs = pd.read_json(f, orient="records", lines=True)
# if all line runs are failed, no need to fill
if len(outputs) > 0:
outputs = self._outputs_padding(outputs, len(inputs))
outputs = self._outputs_padding(outputs, inputs["line_number"].tolist())
outputs.fillna(value="(Failed)", inplace=True) # replace nan with explicit prompt
outputs = outputs.set_index(LINE_NUMBER)
return inputs, outputs
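The signature change above switches `_outputs_padding` from padding to a row count to padding against the actual input line numbers, which matters when an evaluation run's inputs do not cover every line. A standalone sketch of the same idea; the function name and the value of the `LINE_NUMBER` constant are assumptions for illustration:

```python
from typing import List

import pandas as pd

LINE_NUMBER = "line_number"  # assumed value of the promptflow LINE_NUMBER constant

def pad_outputs(df: pd.DataFrame, inputs_line_numbers: List[int]) -> pd.DataFrame:
    """Append an empty row for every input line number missing from the outputs."""
    if len(df) == len(inputs_line_numbers):
        return df
    present = set(df[LINE_NUMBER].values)
    missing = [{LINE_NUMBER: i} for i in inputs_line_numbers if i not in present]
    if not missing:
        return df
    padded = pd.concat([df, pd.DataFrame(missing)], ignore_index=True)
    return padded.sort_values(by=LINE_NUMBER, ascending=True)
```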
@@ -22,7 +22,7 @@
)
from promptflow._sdk._errors import InvalidRunStatusError, RunExistsError, RunNotFoundError, RunOperationParameterError
from promptflow._sdk._orm import RunInfo as ORMRun
from promptflow._sdk._utils import incremental_print, safe_parse_object_list
from promptflow._sdk._utils import incremental_print, print_red_error, safe_parse_object_list
from promptflow._sdk._visualize_functions import dump_html, generate_html_string
from promptflow._sdk.entities import Run
from promptflow._sdk.operations._local_storage_operations import LocalStorageOperations
@@ -140,7 +140,10 @@ def stream(self, name: Union[str, Run]) -> Run:
available_logs = local_storage.logger.get_logs()
incremental_print(available_logs, printed, file_handler)
self._print_run_summary(run)
# won't print error here, put it in run dict
# print the error message when the run failed
if run.status == RunStatus.FAILED:
error_message = local_storage.load_exception()["message"]
print_red_error(error_message)
except KeyboardInterrupt:
error_message = "The output streaming for the run was interrupted, but the run is still executing."
print(error_message)
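With this change, streaming a failed run ends with the stored error message printed in red after the run summary, instead of leaving it only in the run dict. A hedged usage sketch; the flow and data paths are placeholders:

```python
from promptflow import PFClient

pf = PFClient()
run = pf.run(flow="<flow dir>", data="<data.jsonl>")  # placeholders
# If the run failed, stream() now prints the persisted exception message after the summary.
pf.stream(run.name)
```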
@@ -1,3 +1,4 @@
import re
import sys
import time
from io import StringIO
@@ -29,12 +30,13 @@ def test_context_manager(self):
log_message_function=generate_elapsed_time_messages,
args=("Test", start_time, interval_seconds, None),
):
time.sleep(10)
time.sleep(10.5)
logs = s.getvalue().split("\n")
for i in range(1, 10):
assert (
logs[i - 1]
== f"Test has been running for {i} seconds, "
+ "thread None cannot be found in sys._current_frames, "
+ "maybe it has been terminated due to unexpected errors."
)
logs = [log for log in logs if log]
log_pattern = re.compile(
r"^Test has been running for [0-9]+ seconds, thread None cannot be found in sys._current_frames, "
r"maybe it has been terminated due to unexpected errors.$"
)
assert logs, "Logs are empty."
for log in logs:
assert re.match(log_pattern, log), f"The wrong log: {log}"
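The rewrite above trades exact per-second assertions, which are sensitive to scheduling jitter, for a pattern match over whatever lines were actually emitted. A compact sketch of the same assertion style; the sample log line is illustrative:

```python
import re

log_pattern = re.compile(
    r"^Test has been running for [0-9]+ seconds, thread None cannot be found in "
    r"sys\._current_frames, maybe it has been terminated due to unexpected errors\.$"
)
logs = [
    "Test has been running for 3 seconds, thread None cannot be found in "
    "sys._current_frames, maybe it has been terminated due to unexpected errors."
]
assert logs, "Logs are empty."
assert all(re.match(log_pattern, line) for line in logs)
```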
@@ -153,6 +153,10 @@ def test_resolve_tool_by_node_with_duplicated_inputs(self, resolver, mocker):
assert isinstance(exec_info.value.inner_exception, NodeInputValidationError)
assert "These inputs are duplicated" in exec_info.value.message

@pytest.mark.skipif(
condition=(sys.version_info.major == 3 and sys.version_info.minor == 11),
reason="BUG 2709800: known issue on enum in Python 3.11",
)
def test_ensure_node_inputs_type(self):
# Case 1: conn_name not in connections, should raise conn_name not found error
tool = Tool(name="mock", type="python", inputs={"conn": InputDefinition(type=["CustomConnection"])})
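For reference, the same version guard written against only the standard `sys`/`pytest` APIs; the bug ID and reason string come from the diff above, while the test body is a placeholder:

```python
import sys

import pytest

@pytest.mark.skipif(
    sys.version_info[:2] == (3, 11),
    reason="BUG 2709800: known issue on enum in Python 3.11",
)
def test_enum_dependent_behavior():
    ...  # placeholder body
```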
@@ -27,7 +27,7 @@ def end_process(healthy_ensured_process):
class TestHealthyEnsuredProcess:

def test_healthy_ensured_process(self):
context = get_multiprocessing_context("fork")
context = get_multiprocessing_context("spawn")
healthy_ensured_process = HealthyEnsuredProcess(executor_creation_func, context)
assert healthy_ensured_process.is_ready is False
task_queue = Queue()
@@ -38,7 +38,7 @@ def test_healthy_ensured_process(self):
assert healthy_ensured_process.process.is_alive() is False

def test_unhealthy_process(self):
context = get_multiprocessing_context("fork")
context = get_multiprocessing_context("spawn")
healthy_ensured_process = HealthyEnsuredProcess(executor_creation_func_timeout, context)
assert healthy_ensured_process.is_ready is False
task_queue = Queue()
@@ -49,7 +49,7 @@ def test_unhealthy_process(self):
assert healthy_ensured_process.process.is_alive() is False

def test_format_current_process(self):
context = get_multiprocessing_context("fork")
context = get_multiprocessing_context("spawn")
healthy_ensured_process = HealthyEnsuredProcess(executor_creation_func, context)
healthy_ensured_process.process = patch(
'promptflow.executor._line_execution_process_pool.Process', autospec=True)
@@ -66,7 +66,7 @@ def test_format_current_process(self):

@patch('promptflow.executor._line_execution_process_pool.logger.info', autospec=True)
def test_format_completed_process(self, mock_logger_info):
context = get_multiprocessing_context("fork")
context = get_multiprocessing_context("spawn")
healthy_ensured_process = HealthyEnsuredProcess(executor_creation_func, context)
healthy_ensured_process.process = patch(
'promptflow.executor._line_execution_process_pool.Process', autospec=True)
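These tests now use the "spawn" start method rather than "fork". A sketch with the standard-library equivalent of `get_multiprocessing_context` to show the practical difference: "fork" is unavailable on Windows and inherits the parent's state (including locks and threads), while "spawn" starts a clean interpreter and requires the entry-point guard below:

```python
import multiprocessing

def double(x: int) -> int:
    return x * 2

if __name__ == "__main__":  # required under "spawn": child processes re-import this module
    ctx = multiprocessing.get_context("spawn")
    with ctx.Pool(processes=2) as pool:
        print(pool.map(double, [1, 2, 3]))  # [2, 4, 6]
```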
4 changes: 2 additions & 2 deletions src/promptflow/tests/sdk_cli_test/e2etests/test_cli.py
@@ -306,13 +306,13 @@ def test_pf_flow_test_with_non_english_input_output(self, capsys):
stdout, _ = capsys.readouterr()
output_path = Path(FLOWS_DIR) / "chat_flow" / ".promptflow" / "flow.output.json"
assert output_path.exists()
with open(output_path, "r") as f:
with open(output_path, "r", encoding="utf-8") as f:
outputs = json.load(f)
assert outputs["answer"] in json.loads(stdout)["answer"]

detail_path = Path(FLOWS_DIR) / "chat_flow" / ".promptflow" / "flow.detail.json"
assert detail_path.exists()
with open(detail_path, "r") as f:
with open(detail_path, "r", encoding="utf-8") as f:
detail = json.load(f)
assert detail["flow_runs"][0]["inputs"]["question"] == question
assert detail["flow_runs"][0]["output"]["answer"] == outputs["answer"]
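The explicit `encoding="utf-8"` matters because `open()` otherwise falls back to the platform default (for example cp1252 on Windows), which can reject the non-English answers this test writes and reads. A small sketch of the failure mode being avoided; the file name is illustrative:

```python
import json

payload = {"answer": "你好，世界"}

# Write and read with a matching, explicit encoding.
with open("flow.output.json", "w", encoding="utf-8") as f:
    json.dump(payload, f, ensure_ascii=False)

with open("flow.output.json", "r", encoding="utf-8") as f:
    assert json.load(f)["answer"] == payload["answer"]
```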
42 changes: 26 additions & 16 deletions src/promptflow/tests/sdk_cli_test/e2etests/test_flow_run.py
@@ -58,6 +58,23 @@ def create_run_against_run(client, run: Run) -> Run:
)


def assert_run_with_invalid_column_mapping(client: PFClient, run: Run, capfd: pytest.CaptureFixture) -> None:
assert run.status == RunStatus.FAILED

expected_error_message = "The input for batch run is incorrect. Couldn't find these mapping relations"

client.stream(run.name)
out, _ = capfd.readouterr()
assert expected_error_message in out

local_storage = LocalStorageOperations(run)
assert os.path.exists(local_storage._exception_path)

exception = local_storage.load_exception()
assert expected_error_message in exception["message"]
assert exception["code"] == "BulkRunException"


@pytest.mark.usefixtures("use_secrets_config_file", "setup_local_connection", "install_custom_tool_pkg")
@pytest.mark.sdk_test
@pytest.mark.e2etest
@@ -321,7 +338,7 @@ def test_run_reference_failed_run(self, pf):
with pytest.raises(RunNotFoundError):
pf.runs.get(name=run_name)

def test_referenced_output_not_exist(self, pf):
def test_referenced_output_not_exist(self, pf: PFClient, capfd: pytest.CaptureFixture) -> None:
# failed run won't generate output
failed_run = pf.run(
flow=f"{FLOWS_DIR}/failed_flow",
Expand All @@ -336,13 +353,7 @@ def test_referenced_output_not_exist(self, pf):
flow=f"{FLOWS_DIR}/failed_flow",
column_mapping={"text": "${run.outputs.text}"},
)

local_storage = LocalStorageOperations(run)
assert os.path.exists(local_storage._exception_path)

exception = local_storage.load_exception()
assert "The input for batch run is incorrect. Couldn't find these mapping relations" in exception["message"]
assert exception["code"] == "BulkRunException"
assert_run_with_invalid_column_mapping(pf, run, capfd)

def test_connection_overwrite_file(self, local_client, local_aoai_connection):
run = create_yaml_run(
@@ -650,7 +661,12 @@ def test_flow_bulk_run_with_additional_includes(self, azure_open_ai_connection:
additional_includes = _get_additional_includes(snapshot_path / "flow.dag.yaml")
assert not additional_includes

def test_input_mapping_source_not_found_error(self, azure_open_ai_connection: AzureOpenAIConnection, pf):
def test_input_mapping_source_not_found_error(
self,
azure_open_ai_connection: AzureOpenAIConnection,
pf: PFClient,
capfd: pytest.CaptureFixture,
):
# input_mapping source not found error won't create run
name = str(uuid.uuid4())
data_path = f"{DATAS_DIR}/webClassification3.jsonl"
Expand All @@ -660,13 +676,7 @@ def test_input_mapping_source_not_found_error(self, azure_open_ai_connection: Az
column_mapping={"not_exist": "${data.not_exist_key}"},
name=name,
)

local_storage = LocalStorageOperations(run)
assert os.path.exists(local_storage._exception_path)

exception = local_storage.load_exception()
assert "The input for batch run is incorrect. Couldn't find these mapping relations" in exception["message"]
assert exception["code"] == "BulkRunException"
assert_run_with_invalid_column_mapping(pf, run, capfd)

def test_input_mapping_with_dict(self, azure_open_ai_connection: AzureOpenAIConnection, pf):
data_path = f"{DATAS_DIR}/webClassification3.jsonl"
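Both tests above now delegate to the shared `assert_run_with_invalid_column_mapping` helper, which checks the run status, the streamed output, and the persisted exception in one place. A hedged sketch of writing a further test against it; the flow/data paths, the mapping key, and the `pf` fixture are placeholders:

```python
import pytest

from promptflow import PFClient

def test_another_invalid_mapping(pf: PFClient, capfd: pytest.CaptureFixture):
    run = pf.run(
        flow="<flow dir>",       # placeholder
        data="<data.jsonl>",     # placeholder
        column_mapping={"not_exist": "${data.not_exist_key}"},
    )
    # Shared helper defined earlier in this test module (see the diff above).
    assert_run_with_invalid_column_mapping(pf, run, capfd)
```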
@@ -17,12 +17,20 @@ def test_outputs_padding(self) -> None:
{LINE_NUMBER: 2, "col": "b"},
]
df = pd.DataFrame(data)
expected_rows = 5
df_with_padding = LocalStorageOperations._outputs_padding(df, expected_rows)

df_with_padding = LocalStorageOperations._outputs_padding(df, inputs_line_numbers=[0, 1, 2, 3, 4])
df_with_padding.fillna("", inplace=True)
assert len(df_with_padding) == expected_rows
assert len(df_with_padding) == 5
assert df_with_padding.iloc[0].to_dict() == {LINE_NUMBER: 0, "col": ""}
assert df_with_padding.iloc[1].to_dict() == {LINE_NUMBER: 1, "col": "a"}
assert df_with_padding.iloc[2].to_dict() == {LINE_NUMBER: 2, "col": "b"}
assert df_with_padding.iloc[3].to_dict() == {LINE_NUMBER: 3, "col": ""}
assert df_with_padding.iloc[4].to_dict() == {LINE_NUMBER: 4, "col": ""}

# in an evaluation run, inputs may not contain every line number
df_with_padding = LocalStorageOperations._outputs_padding(df, inputs_line_numbers=[1, 2, 4])
df_with_padding.fillna("", inplace=True)
assert len(df_with_padding) == 3
assert df_with_padding.iloc[0].to_dict() == {LINE_NUMBER: 1, "col": "a"}
assert df_with_padding.iloc[1].to_dict() == {LINE_NUMBER: 2, "col": "b"}
assert df_with_padding.iloc[2].to_dict() == {LINE_NUMBER: 4, "col": ""}
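A worked restatement of the sparse case exercised above, using pandas directly (illustrative, not the promptflow implementation): only line 4 gets a padding row, and lines 0 and 3 are never created because they are not among the inputs.

```python
import pandas as pd

outputs = pd.DataFrame([{"line_number": 1, "col": "a"}, {"line_number": 2, "col": "b"}])
inputs_line_numbers = [1, 2, 4]  # an evaluation run that skipped lines 0 and 3

present = set(outputs["line_number"])
missing = [{"line_number": i} for i in inputs_line_numbers if i not in present]
padded = pd.concat([outputs, pd.DataFrame(missing)], ignore_index=True).sort_values("line_number")
print(padded.fillna(""))  # rows for 1 ("a"), 2 ("b") and an empty row for 4
```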