Refine sdk resume test #2593

Merged: 15 commits, Apr 12, 2024
Changes from all commits
@@ -100,6 +100,76 @@ def test_run_resume(self, pf: PFClient, randstr: Callable[[str], str]):
assert run2.name == name2
assert run2._resume_from == run.name

def test_run_resume_token(self, pf: PFClient, randstr: Callable[[str], str], capfd: pytest.CaptureFixture):
name = "resume_from_run_with_llm_and_token"
try:
original_run = pf.runs.get(run=name)
except RunNotFoundError:
original_run = pf.run(
flow=f"{FLOWS_DIR}/web_classification_random_fail",
data=f"{FLOWS_DIR}/web_classification_random_fail/data.jsonl",
column_mapping={"url": "${data.url}"},
variant="${summarize_text_content.variant_0}",
name=name,
)
original_run = pf.runs.stream(run=name)
assert isinstance(original_run, Run)
assert original_run.name == name
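# Total token consumption is surfaced on the run's properties under "azureml.promptflow.total_tokens".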
original_token = original_run.properties["azureml.promptflow.total_tokens"]
assert original_run.status == "Completed"
# Since the data has 15 lines, the original run should have some succeeded lines in over 99% of cases
original_details = pf.get_details(original_run)
original_success_count = len(original_details[original_details["outputs.category"].notnull()])

resume_name = randstr("name")
resume_run = pf.run(resume_from=original_run, name=resume_name)
resume_run = pf.runs.stream(run=resume_name)
assert isinstance(resume_run, Run)
assert resume_run.name == resume_name
assert resume_run._resume_from == original_run.name
resume_token = resume_run.properties["azureml.promptflow.total_tokens"]
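# A resume run reuses the original run's succeeded results, so its cumulative token count should exceed the original's.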
assert int(original_token) < int(resume_token)

# assert skip in the log
out, _ = capfd.readouterr()
assert f"Skipped the execution of {original_success_count} existing results." in out

def test_run_resume_with_image_aggregation(
self, pf: PFClient, randstr: Callable[[str], str], capfd: pytest.CaptureFixture
):
name = "resume_from_run_with_image_and_aggregation_node"
try:
original_run = pf.runs.get(run=name)
except RunNotFoundError:
original_run = pf.run(
flow=f"{FLOWS_DIR}/eval_flow_with_image_resume_random_fail",
data=f"{FLOWS_DIR}/eval_flow_with_image_resume_random_fail/input_data",
column_mapping={"input_image": "${data.input_image}"},
name=name,
)
original_run = pf.runs.stream(run=name)
assert isinstance(original_run, Run)
assert original_run.name == name
assert original_run.status == "Completed"
# Since the data has 15 lines, the original run should have some succeeded lines in over 99% of cases
original_details = pf.get_details(original_run)
original_success_count = len(original_details[original_details["outputs.output_image"].notnull()])

resume_name = randstr("name")
resume_run = pf.run(resume_from=original_run, name=resume_name)
resume_run = pf.runs.stream(run=resume_name)
assert isinstance(resume_run, Run)
assert resume_run.name == resume_name
assert resume_run._resume_from == original_run.name

original_metrics = pf.runs.get_metrics(run=name)
resume_metrics = pf.runs.get_metrics(run=resume_name)
assert original_metrics["image_count"] < resume_metrics["image_count"]

# assert skip in the log
out, _ = capfd.readouterr()
assert f"Skipped the execution of {original_success_count} existing results." in out

def test_run_bulk_from_yaml(self, pf, runtime: str, randstr: Callable[[str], str]):
run_id = randstr("run_id")
run = load_run(
src/promptflow-devkit/tests/sdk_cli_test/e2etests/test_cli.py (165 changes: 156 additions & 9 deletions)
@@ -21,6 +21,7 @@

from promptflow._cli._pf.entry import main
from promptflow._constants import FLOW_FLEX_YAML, LINE_NUMBER_KEY, PF_USER_AGENT
+ from promptflow._core.metric_logger import add_metric_logger
from promptflow._sdk._constants import LOGGER_NAME, SCRUBBED_VALUE, ExperimentStatus
from promptflow._sdk._errors import RunNotFoundError
from promptflow._sdk.operations._local_storage_operations import LocalStorageOperations
@@ -2157,7 +2158,7 @@ def test_pf_flow_test_single_node_with_detail(self, tmpdir):
path = Path(tmpdir) / filename
assert path.is_file()

- def test_flow_run_resume_from(self, capfd, local_client) -> None:
+ def test_flow_run_resume_from(self, local_client) -> None:
run_id = str(uuid.uuid4())
# create the original run
run_pf_command(
@@ -2170,8 +2171,13 @@ def test_flow_run_resume_from(self, capfd, local_client) -> None:
"--name",
run_id,
)
- out, _ = capfd.readouterr()
- assert "Completed" in out
+ original_run = local_client.runs.get(name=run_id)
+ assert original_run.status == "Completed"
+ output_path = os.path.join(original_run.properties["output_path"], "flow_outputs", "output.jsonl")
+ with open(output_path, "r") as file:
+     original_output = [json.loads(line) for line in file]
+ # Since the data has 15 lines, the original run should have some succeeded lines in over 99% of cases
+ original_success_count = len(original_output)

new_run_id = str(uuid.uuid4())
display_name = "test"
@@ -2189,12 +2195,22 @@
"tags.A=A",
"tags.B=B",
)
- run = local_client.runs.get(name=new_run_id)
- assert run.name == new_run_id
- assert run.display_name == display_name
- assert run.description == description
- assert run.tags == {"A": "A", "B": "B"}
- assert run._resume_from == run_id
+ resume_run = local_client.runs.get(name=new_run_id)
+ assert resume_run.name == new_run_id
+ assert resume_run.display_name == display_name
+ assert resume_run.description == description
+ assert resume_run.tags == {"A": "A", "B": "B"}
+ assert resume_run._resume_from == run_id
+ # assert the new run resumed from the original run
+ output_path = os.path.join(resume_run.properties["output_path"], "flow_outputs", "output.jsonl")
+ with open(output_path, "r") as file:
+     resume_output = [json.loads(line) for line in file]
+ assert len(resume_output) == len(original_output)
+
+ log_path = os.path.join(resume_run.properties["output_path"], "logs.txt")
+ with open(log_path, "r") as file:
+     log_text = file.read()
+ assert f"Skipped the execution of {original_success_count} existing results." in log_text

def test_flow_run_resume_partially_failed_run(self, capfd, local_client) -> None:
run_id = str(uuid.uuid4())
@@ -2239,6 +2255,137 @@ def get_successful_lines(output_path):
)
run_id = new_run_id

def test_flow_run_resume_from_token(self, local_client) -> None:
run_id = str(uuid.uuid4())
# create the original run
run_pf_command(
"run",
"create",
"--flow",
f"{FLOWS_DIR}/web_classification_random_fail",
"--data",
f"{FLOWS_DIR}/web_classification_random_fail/data.jsonl",
"--column-mapping",
"url='${data.url}'",
"--name",
run_id,
)
original_run = local_client.runs.get(name=run_id)
assert original_run.status == "Completed"
output_path = os.path.join(original_run.properties["output_path"], "flow_outputs", "output.jsonl")
with open(output_path, "r") as file:
original_output = [json.loads(line) for line in file]
# Since the data has 15 lines, the original run should have some succeeded lines in over 99% of cases
original_success_count = len(original_output)

new_run_id = str(uuid.uuid4())
display_name = "test"
description = "new description"
run_pf_command(
"run",
"create",
"--resume-from",
run_id,
"--name",
new_run_id,
"--set",
f"display_name={display_name}",
f"description={description}",
"tags.A=A",
"tags.B=B",
)
resume_run = local_client.runs.get(name=new_run_id)
assert resume_run.name == new_run_id
assert resume_run.display_name == display_name
assert resume_run.description == description
assert resume_run.tags == {"A": "A", "B": "B"}
assert resume_run._resume_from == run_id

# assert the new run resumed from the original run
output_path = os.path.join(resume_run.properties["output_path"], "flow_outputs", "output.jsonl")
with open(output_path, "r") as file:
resume_output = [json.loads(line) for line in file]
assert len(resume_output) > len(original_output)

log_path = os.path.join(resume_run.properties["output_path"], "logs.txt")
with open(log_path, "r") as file:
log_text = file.read()
assert f"Skipped the execution of {original_success_count} existing results." in log_text

# assert the new run consumed more tokens than the original run
assert (
original_run.properties["system_metrics"]["total_tokens"]
< resume_run.properties["system_metrics"]["total_tokens"]
)

def test_flow_run_resume_with_image_aggregation(self, local_client) -> None:
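# add_metric_logger registers a callback that fires whenever the flow logs a metric,
# letting the test capture the aggregation node's metrics (e.g. image_count) locally.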
metrics = {}

def test_metric_logger(key, value):
metrics[key] = value

add_metric_logger(test_metric_logger)

run_id = str(uuid.uuid4())
# create the original run
run_pf_command(
"run",
"create",
"--flow",
f"{FLOWS_DIR}/eval_flow_with_image_resume_random_fail",
"--data",
f"{FLOWS_DIR}/eval_flow_with_image_resume_random_fail/data.jsonl",
"--column-mapping",
"input_image='${data.input_image}'",
"--name",
run_id,
)
original_run = local_client.runs.get(name=run_id)
assert original_run.status == "Completed"
output_path = os.path.join(original_run.properties["output_path"], "flow_outputs", "output.jsonl")
with open(output_path, "r") as file:
original_output = [json.loads(line) for line in file]
original_success_count = len(original_output)
original_image_count = metrics.get("image_count", None)

new_run_id = str(uuid.uuid4())
display_name = "test"
description = "new description"
run_pf_command(
"run",
"create",
"--resume-from",
run_id,
"--name",
new_run_id,
"--set",
f"display_name={display_name}",
f"description={description}",
"tags.A=A",
"tags.B=B",
)
resume_run = local_client.runs.get(name=new_run_id)
resume_image_count = metrics.get("image_count", None)
assert resume_run.name == new_run_id
assert resume_run.display_name == display_name
assert resume_run.description == description
assert resume_run.tags == {"A": "A", "B": "B"}
assert resume_run._resume_from == run_id

# assert the new run resumed from the original run
output_path = os.path.join(resume_run.properties["output_path"], "flow_outputs", "output.jsonl")
with open(output_path, "r") as file:
resume_output = [json.loads(line) for line in file]
assert len(resume_output) > len(original_output)

log_path = os.path.join(resume_run.properties["output_path"], "logs.txt")
with open(log_path, "r") as file:
log_text = file.read()
assert f"Skipped the execution of {original_success_count} existing results." in log_text

# assert aggregation node works
assert original_image_count < resume_image_count
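# For context, the aggregation node in eval_flow_with_image_resume_random_fail presumably logs
# its metric along these lines (a sketch, assuming promptflow's @tool and log_metric APIs; the
# real node's name and inputs may differ):
#
#     from promptflow import log_metric, tool
#
#     @tool
#     def aggregate(processed_images: list):
#         # On resume, aggregation re-runs over all completed lines (original
#         # successes plus newly resumed ones), so image_count should grow.
#         image_count = len(processed_images)
#         log_metric(key="image_count", value=image_count)
#         return image_count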

def test_flow_run_exclusive_param(self, capfd) -> None:
# fetch std out
with pytest.raises(SystemExit):