Commit

[Internal][Executor] Add average execution time and estimated execution time to bulk run logs (#692)

# Description

- Add average execution time and estimated execution time to bulk run logs in the executor.
- Fix a bug in the SDK: the run mode is now specified when submitting a bulk run, so batch run logs display correctly.
- Related issue: #519


![image](https://github.com/microsoft/promptflow/assets/111329184/f29798b6-c1ac-492f-9477-57ef75d37307)
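
How the two figures are computed: the executor divides the wall-clock time elapsed since the bulk run started by the number of completed lines, then extrapolates over the lines that remain. A condensed sketch of the `log_progress` change in `promptflow/_utils/utils.py` (see the diff below; the `formatter` parameter is omitted here for brevity):

```python
import logging
from datetime import datetime


def log_progress(start_time: datetime, logger: logging.Logger, count: int, total_count: int):
    """Log progress plus average/estimated execution times for a bulk run."""
    # Log roughly every 10% of lines, and always when the last line finishes.
    log_interval = max(int(total_count / 10), 1)
    if count > 0 and (count % log_interval == 0 or count == total_count):
        # Average seconds per completed line since the run started.
        average_execution_time = round((datetime.now().timestamp() - start_time.timestamp()) / count, 2)
        # Projected seconds for the lines that have not finished yet.
        estimated_execution_time = round(average_execution_time * (total_count - count), 2)
        logger.info(f"{count} / {total_count} finished.")
        logger.info(
            f"Average execution time for completed lines: {average_execution_time} seconds. "
            f"Estimated time for incomplete lines: {estimated_execution_time} seconds."
        )
```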


# All Promptflow Contribution checklist:
- [X] **The pull request does not introduce [breaking changes].**
- [x] **CHANGELOG is updated for new features, bug fixes or other
significant changes.**
- [X] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [ ] **Create an issue and link to the pull request to get dedicated
review from promptflow team. Learn more: [suggested
workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [X] Title of the pull request is clear and informative.
- [X] There are a small number of commits, each of which has an
informative message. This means that previously merged commits do not
appear in the history of the PR. For more information on cleaning up the
commits in your PR, [see this
page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [X] Pull request includes test coverage for the included changes.
PeiwenGaoMS authored Oct 10, 2023
1 parent cd55ffa commit b8453f0
Showing 10 changed files with 85 additions and 42 deletions.
2 changes: 2 additions & 0 deletions src/promptflow/CHANGELOG.md
@@ -3,10 +3,12 @@
 ## 0.1.0b8 (Upcoming)
 
 ### Features Added
+- [Executor] Add average execution time and estimated execution time to batch run logs
 
 ### Bugs Fixed
 - **pf config set**:
   - Fix bug for workspace `connection.provider=azureml` doesn't work as expected.
+- [SDK/CLI] Fix the bug that using sdk/cli to submit batch run did not display the log correctly.
 
 ### Improvements
 
@@ -35,6 +35,7 @@
 from promptflow.contracts.run_info import FlowRunInfo
 from promptflow.contracts.run_info import RunInfo as NodeRunInfo
 from promptflow.contracts.run_info import Status
+from promptflow.contracts.run_mode import RunMode
 from promptflow.executor.flow_executor import BulkResult
 from promptflow.storage import AbstractRunStorage
 
@@ -172,11 +173,13 @@ class LocalStorageOperations(AbstractRunStorage):
 
     LINE_NUMBER_WIDTH = 9
 
-    def __init__(self, run: Run, stream=False):
+    def __init__(self, run: Run, stream=False, run_mode=RunMode.Test):
         self._run = run
         self.path = self._prepare_folder(get_run_output_path(self._run))
 
-        self.logger = LoggerOperations(file_path=self.path / LocalStorageFilenames.LOG, stream=stream)
+        self.logger = LoggerOperations(
+            file_path=self.path / LocalStorageFilenames.LOG, stream=stream, run_mode=run_mode
+        )
         # snapshot
         self._snapshot_folder_path = self._prepare_folder(self.path / LocalStorageFilenames.SNAPSHOT_FOLDER)
         self._dag_path = self._snapshot_folder_path / LocalStorageFilenames.DAG
3 changes: 2 additions & 1 deletion src/promptflow/promptflow/_sdk/operations/_run_submitter.py
@@ -46,6 +46,7 @@
 from promptflow._utils.utils import reverse_transpose
 from promptflow.contracts.flow import Flow as ExecutableFlow
 from promptflow.contracts.run_info import Status
+from promptflow.contracts.run_mode import RunMode
 from promptflow.exceptions import UserErrorException
 from promptflow.executor import FlowExecutor
 
@@ -266,7 +267,7 @@ def _run_bulk(self, run: Run, stream=False, **kwargs):
 
         # running specified variant
         with variant_overwrite_context(run.flow, tuning_node, variant, connections=run.connections) as flow:
-            local_storage = LocalStorageOperations(run, stream=stream)
+            local_storage = LocalStorageOperations(run, stream=stream, run_mode=RunMode.Batch)
             with local_storage.logger:
                 self._submit_bulk_run(flow=flow, run=run, local_storage=local_storage)
 
14 changes: 9 additions & 5 deletions src/promptflow/promptflow/_utils/logger_utils.py
@@ -267,11 +267,15 @@ def update_log_path(log_path: str, input_logger: logging.Logger = None):
     if input_logger:
         logger_list.append(input_logger)
     for logger_ in logger_list:
-        for wrapper in logger_.handlers:
-            if isinstance(wrapper, FileHandlerConcurrentWrapper):
-                handler: FileHandler = wrapper.handler
-                if handler:
-                    wrapper.handler = type(handler)(log_path, handler._formatter)
+        update_single_log_path(log_path, logger_)
+
+
+def update_single_log_path(log_path: str, logger_: logging.Logger):
+    for wrapper in logger_.handlers:
+        if isinstance(wrapper, FileHandlerConcurrentWrapper):
+            handler: FileHandler = wrapper.handler
+            if handler:
+                wrapper.handler = type(handler)(log_path, handler._formatter)
 
 
 def scrub_credentials(s: str):
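
For reviewers, a minimal usage sketch of the new helper. This assumes `LogContext` is exported from the same `logger_utils` module and that the handler swap only takes effect while a `LogContext` has installed a `FileHandlerConcurrentWrapper` on the logger; the file paths are illustrative:

```python
# NOTE: the LogContext import location is assumed; paths are illustrative.
from promptflow._utils.logger_utils import LogContext, bulk_logger, update_single_log_path
from promptflow.contracts.run_mode import RunMode

with LogContext("original.log", run_mode=RunMode.Batch):
    bulk_logger.info("written to original.log")
    # Retarget just this one logger; update_log_path, by contrast,
    # fans the new path out to every logger in its list.
    update_single_log_path("updated.log", bulk_logger)
    bulk_logger.info("written to updated.log")
```
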
16 changes: 14 additions & 2 deletions src/promptflow/promptflow/_utils/utils.py
@@ -117,10 +117,22 @@ def count_and_log_progress(
         yield item
 
 
-def log_progress(logger: logging.Logger, count: int, total_count: int, formatter="{count} / {total_count} finished."):
+def log_progress(
+    start_time: datetime,
+    logger: logging.Logger,
+    count: int,
+    total_count: int,
+    formatter="{count} / {total_count} finished.",
+):
     log_interval = max(int(total_count / 10), 1)
-    if count % log_interval == 0 or count == total_count:
+    if count > 0 and (count % log_interval == 0 or count == total_count):
+        average_execution_time = round((datetime.now().timestamp() - start_time.timestamp()) / count, 2)
+        estimated_execution_time = round(average_execution_time * (total_count - count), 2)
         logger.info(formatter.format(count=count, total_count=total_count))
+        logger.info(
+            f"Average execution time for completed lines: {average_execution_time} seconds. "
+            f"Estimated time for incomplete lines: {estimated_execution_time} seconds."
+        )
 
 
 def extract_user_frame_summaries(frame_summaries: List[traceback.FrameSummary]):
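
To illustrate the new call pattern, a self-contained sketch (not executor code: the loop and sleep are stand-ins, and the logger here is a plain stand-in for `bulk_logger`; the real call site passes `count=len(result_list)` and `total_count=self._nlines`, as shown in the diff further down):

```python
import logging
import time
from datetime import datetime

from promptflow._utils.utils import log_progress

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("execution.bulk")

start_time = datetime.now()  # captured once, before the first line runs
total = 25
for count in range(1, total + 1):
    time.sleep(0.01)  # stand-in for executing one line of the bulk run
    # Emits "<count> / <total> finished." plus the average/estimated times
    # at roughly every 10% of lines and on the final line.
    log_progress(start_time=start_time, logger=logger, count=count, total_count=total)
```
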
@@ -71,7 +71,7 @@ def start_new(self, task_queue: Queue):
             ),
             # Set the process as a daemon process to automatically terminated and release system resources
             # when the main process exits.
-            daemon=True
+            daemon=True,
         )
 
         self.process = process
@@ -108,10 +108,12 @@ def format_current_process(self, line_number: int, is_completed=False):
         process_pid = self.process.pid if self.process else None
         if is_completed:
             logger.info(
-                f"Process name: {process_name}, Process id: {process_pid}, Line number: {line_number} completed.")
+                f"Process name: {process_name}, Process id: {process_pid}, Line number: {line_number} completed."
+            )
         else:
             logger.info(
-                f"Process name: {process_name}, Process id: {process_pid}, Line number: {line_number} start execution.")
+                f"Process name: {process_name}, Process id: {process_pid}, Line number: {line_number} start execution."
+            )
 
         return f"Process name({process_name})-Process id({process_pid})-Line number({line_number})"
 
@@ -236,6 +238,7 @@ def _timeout_process_wrapper(self, task_queue: Queue, idx: int, timeout_time, re
 
                 self._processing_idx.pop(line_number)
                 log_progress(
+                    start_time=start_time,
                     logger=bulk_logger,
                     count=len(result_list),
                     total_count=self._nlines,
3 changes: 3 additions & 0 deletions src/promptflow/tests/executor/e2etests/test_logs.py
@@ -81,7 +81,10 @@ def test_executor_logs(self, folder_name):
         executor.exec_bulk(bulk_inputs)
         log_content = load_content(bulk_run_log_path)
         loggers_name_list = ["execution", "execution.bulk"]
+        # bulk logger will print the average execution time and estimated time
+        bulk_logs_keywords = ["Average execution time for completed lines", "Estimated time for incomplete lines"]
         assert all(logger in log_content for logger in loggers_name_list)
+        assert all(keyword in log_content for keyword in bulk_logs_keywords)
 
     @pytest.mark.parametrize(
         "folder_name",
@@ -18,6 +18,7 @@
     bulk_logger,
     scrub_credentials,
     update_log_path,
+    update_single_log_path,
 )
 from promptflow.contracts.run_mode import RunMode
 
@@ -213,6 +214,36 @@ def test_update_log_path(self):
             keywords = ["test update log", "test update input log", "execution.bulk", "input_logger", "INFO", "WARNING"]
             assert all(keyword in log for keyword in keywords)
 
+    def test_update_single_log_path(self):
+        log_handler = FileHandlerConcurrentWrapper()
+        input_logger = logging.getLogger("input_logger")
+        input_logger.addHandler(log_handler)
+
+        folder_path = Path(mkdtemp())
+        original_log_path = str(folder_path / "original_log.log")
+        with LogContext(original_log_path, input_logger=input_logger, run_mode=RunMode.Batch):
+            bulk_logger.info("test log")
+            input_logger.warning("test input log")
+            original_log = load_content(original_log_path)
+            keywords = ["test log", "test input log", "execution.bulk", "input_logger", "INFO", "WARNING"]
+            assert all(keyword in original_log for keyword in keywords)
+
+            # Update log path
+            bulk_log_path = str(folder_path / "update_bulk_log.log")
+            update_single_log_path(bulk_log_path, bulk_logger)
+            input_log_path = str(folder_path / "update_input_log.log")
+            update_single_log_path(input_log_path, input_logger)
+            bulk_logger.info("test update log")
+            input_logger.warning("test update input log")
+            bulk_log = load_content(bulk_log_path)
+            input_log = load_content(input_log_path)
+            bulk_keywords = ["test update log", "execution.bulk", "INFO"]
+            input_keywords = ["test update input log", "input_logger", "WARNING"]
+            assert all(keyword in bulk_log for keyword in bulk_keywords)
+            assert all(keyword not in bulk_log for keyword in input_keywords)
+            assert all(keyword in input_log for keyword in input_keywords)
+            assert all(keyword not in input_log for keyword in bulk_keywords)
+
     def test_scrub_credentials(self):
         log_content = "sig=signature&key=accountkey"
         folder_path = Path(mkdtemp())
30 changes: 8 additions & 22 deletions src/promptflow/tests/sdk_cli_test/e2etests/test_cli.py
Expand Up @@ -861,22 +861,8 @@ def test_flow_test_with_default_chat_history(self):
with open(detail_path, "r") as f:
details = json.load(f)
expect_chat_history = [
{
"inputs": {
"question": "hi"
},
"outputs": {
"answer": "hi"
}
},
{
"inputs": {
"question": "who are you"
},
"outputs": {
"answer": "who are you"
}
}
{"inputs": {"question": "hi"}, "outputs": {"answer": "hi"}},
{"inputs": {"question": "who are you"}, "outputs": {"answer": "who are you"}},
]
assert details["flow_runs"][0]["inputs"]["chat_history"] == expect_chat_history

@@ -1151,12 +1137,12 @@ def test_pf_run_with_stream_log(self):
             "extra=${data.url}",
             "--stream",
         )
-        assert "user log" in f.getvalue()
-        assert "error log" in f.getvalue()
-        # flow logs will stream
-        assert "Executing node print_val. node run id:" in f.getvalue()
-        # executor logs will stream
-        assert "Node print_val completes." in f.getvalue()
+        logs = f.getvalue()
+        # For Batch run, the executor uses bulk logger to print logs, and only prints the error log of the nodes.
+        existing_keywords = ["execution", "execution.bulk", "WARNING", "error log"]
+        assert all([keyword in logs for keyword in existing_keywords])
+        non_existing_keywords = ["execution.flow", "user log"]
+        assert all([keyword not in logs for keyword in non_existing_keywords])
 
     def test_pf_run_no_stream_log(self):
         f = io.StringIO()
12 changes: 5 additions & 7 deletions src/promptflow/tests/sdk_cli_test/e2etests/test_flow_run.py
@@ -667,13 +667,11 @@ def test_run_logs(self, pf):
         )
         local_storage = LocalStorageOperations(run=run)
         logs = local_storage.logger.get_logs()
-        assert "user log" in logs
-        # error logs can be stored
-        assert "error log" in logs
-        # flow logs can be stored
-        assert "Executing node print_val. node run id:" in logs
-        # executor logs can be stored
-        assert "Node print_val completes." in logs
+        # For Batch run, the executor uses bulk logger to print logs, and only prints the error log of the nodes.
+        existing_keywords = ["execution", "execution.bulk", "WARNING", "error log"]
+        assert all([keyword in logs for keyword in existing_keywords])
+        non_existing_keywords = ["execution.flow", "user log"]
+        assert all([keyword not in logs for keyword in non_existing_keywords])
 
     def test_get_detail_against_partial_fail_run(self, pf: PFClient) -> None:
         run = pf.run(
