Skip to content

Commit

Permalink
[PromptFlow][ProcessPool] Solve spawned fork process manager hangs fo…
Browse files Browse the repository at this point in the history
…r a long time when parameters are passed incorrectly. (#1839)

# Description

**Problem**
Currently, in fork mode, when the spawned process raises an exception, the
process hangs and the exception is not propagated to the main process. This
change fixes that.

**Solution**

1. In fork mode, the spawned process is the bridge between the main and
forked processes. If the spawned process is no longer running, exit the
main process.
2. Add a try/except in the spawned process; if an exception occurs, log
the error message.
3. In the monitor thread, if the process info is not obtained within 60
seconds, raise an exception.

# All Promptflow Contribution checklist:
- [ ] **The pull request does not introduce [breaking changes].**
- [ ] **CHANGELOG is updated for new features, bug fixes or other
significant changes.**
- [ ] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [ ] **Create an issue and link to the pull request to get dedicated
review from promptflow team. Learn more: [suggested
workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [ ] Title of the pull request is clear and informative.
- [ ] There are a small number of commits, each of which have an
informative message. This means that previously merged commits do not
appear in the history of the PR. For more information on cleaning up the
commits in your PR, [see this
page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [ ] Pull request includes test coverage for the included changes.
  • Loading branch information
Hhhilulu authored Jan 29, 2024
1 parent 8812961 commit 072f995
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 3 deletions.
7 changes: 7 additions & 0 deletions src/promptflow/promptflow/executor/_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,13 @@ def __init__(self, timeout):
super().__init__(message=f"Failed to get process info after {timeout} seconds", target=ErrorTarget.EXECUTOR)


class SpawnedForkProcessManagerStartFailure(SystemErrorException):
    """System error signalling that the spawned fork process manager could not be started."""

    def __init__(self):
        # Fixed message; the exception takes no arguments from the caller.
        failure_message = "Failed to start spawned fork process manager"
        super().__init__(message=failure_message, target=ErrorTarget.EXECUTOR)


class EmptyLLMApiMapping(UserErrorException):
"""Exception raised when connection_type_to_api_mapping is empty and llm node provider can't be inferred"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ def __enter__(self):
self._processes_manager = SpawnProcessManager(executor_creation_func, **common_kwargs)

self._processes_manager.start_processes()
self._processes_manager.ensure_healthy()

monitor_pool = ThreadPool(self._n_process, initializer=set_context, initargs=(contextvars.copy_context(),))
self._monitor_pool = monitor_pool
Expand All @@ -192,6 +193,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
def _get_process_info(self, index):
start_time = time.time()
while True:
self._processes_manager.ensure_healthy()
try:
if time.time() - start_time > self._PROCESS_INFO_OBTAINED_TIMEOUT:
raise ProcessInfoObtainedTimeout(self._PROCESS_INFO_OBTAINED_TIMEOUT)
Expand All @@ -205,7 +207,7 @@ def _get_process_info(self, index):
time.sleep(1)
continue
except Exception as e:
bulk_logger.warning(f"Unexpected error occurred while get process info. Exception: {e}")
raise Exception(f"Unexpected error occurred while get process info. Exception: {e}")

def _ensure_process_terminated_within_timeout(self, process_id):
start_time = time.time()
Expand Down Expand Up @@ -244,6 +246,7 @@ def _monitor_workers_and_process_tasks_in_thread(
# 1. The task queue is not empty, meaning there are lines yet to be executed.
# 2. The batch run has not reached the batch timeout limit.
while not self._batch_timeout_expired(batch_start_time):
self._processes_manager.ensure_healthy()
try:
args = task_queue.get(timeout=1)
except queue.Empty:
Expand Down Expand Up @@ -495,7 +498,7 @@ def run(self, batch_inputs):
total_count=self._nlines,
)
last_log_count = current_result_count
# Check every 1 second
# Check every 1 second
async_result.wait(1)
# To ensure exceptions in thread-pool calls are propagated to the main process for proper handling
# The exceptions raised will be re-raised by the get() method.
Expand Down
33 changes: 32 additions & 1 deletion src/promptflow/promptflow/executor/_process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from promptflow._core.operation_context import OperationContext
from promptflow._utils.logger_utils import LogContext, bulk_logger
from promptflow.executor._errors import SpawnedForkProcessManagerStartFailure
from promptflow.executor.flow_executor import FlowExecutor


Expand Down Expand Up @@ -52,7 +53,8 @@ def __init__(
output_queues: List[Queue],
process_info: dict,
process_target_func,
*args, **kwargs,
*args,
**kwargs,
) -> None:
self._input_queues = input_queues
self._output_queues = output_queues
Expand Down Expand Up @@ -89,6 +91,14 @@ def end_process(self, i):
"""
raise NotImplementedError("AbstractProcessManager is an abstract class, no implementation for end_process.")

def ensure_healthy(self):
    """
    Checks the health of the managed processes.

    This method should be implemented in subclasses to provide specific health check mechanisms.

    :raises NotImplementedError: Always, because the abstract base class provides no health check.
    """
    # The message names this method (it previously named end_process by mistake).
    raise NotImplementedError("AbstractProcessManager is an abstract class, no implementation for ensure_healthy.")


class SpawnProcessManager(AbstractProcessManager):
"""
Expand Down Expand Up @@ -178,6 +188,16 @@ def end_process(self, i):
f"Exception: {e}"
)

def ensure_healthy(self):
    """
    Health check hook for spawn mode; intentionally does nothing.

    Note:
        No health checks are performed for spawn mode processes at the moment.
        Detailed checks can be added here later if they become necessary.
    """
    return None


class ForkProcessManager(AbstractProcessManager):
'''
Expand Down Expand Up @@ -225,6 +245,7 @@ def start_processes(self):
),
)
process.start()
self._spawned_fork_process_manager_pid = process.pid

def restart_process(self, i):
"""
Expand Down Expand Up @@ -253,6 +274,16 @@ def new_process(self, i):
"""
self._control_signal_queue.put((ProcessControlSignal.START, i))

def ensure_healthy(self):
    """
    Verify that the spawned fork process manager is still usable.

    :raises SpawnedForkProcessManagerStartFailure: If the spawned process is a zombie
        or no longer exists in the process table.
    """
    # A 'zombie' process is a process that has finished running but still remains in
    # the process table, waiting for its parent process to collect and handle its exit status.
    # The normal state of the spawned process is 'running'. If the process does not start successfully
    # or exits unexpectedly, its state will be 'zombie'.
    try:
        status = psutil.Process(self._spawned_fork_process_manager_pid).status()
    except psutil.NoSuchProcess:
        # The dead child may already have been reaped (e.g. multiprocessing joins finished
        # children when another process is started), in which case the pid is gone entirely.
        # That is the same failure as a zombie, so report it the same way instead of
        # leaking a psutil exception to the caller.
        status = None

    if status is None or status == psutil.STATUS_ZOMBIE:
        bulk_logger.error("The spawned fork process manager failed to start.")
        raise SpawnedForkProcessManagerStartFailure()


class SpawnedForkProcessManager(AbstractProcessManager):
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import multiprocessing
import os
import sys
import uuid
from multiprocessing import Queue
from pathlib import Path
Expand All @@ -13,13 +14,15 @@
from promptflow.contracts.run_info import Status
from promptflow.exceptions import ErrorTarget, UserErrorException
from promptflow.executor import FlowExecutor
from promptflow.executor._errors import SpawnedForkProcessManagerStartFailure
from promptflow.executor._line_execution_process_pool import (
LineExecutionProcessPool,
_exec_line,
format_current_process_info,
get_available_max_worker_count,
log_process_status,
)
from promptflow.executor._process_manager import create_spawned_fork_process_manager
from promptflow.executor._result import LineResult

from ...utils import get_flow_sample_inputs, get_yaml_file
Expand Down Expand Up @@ -188,6 +191,10 @@ def not_set_environment_in_subprocess(dev_connections):
assert use_fork == (multiprocessing.get_start_method() == "fork")


def custom_create_spawned_fork_process_manager(*args, **kwargs):
    """Call create_spawned_fork_process_manager with an extra leading positional argument.

    The string "test" is inserted ahead of the caller's arguments, shifting them all by one
    position. Presumably this makes the spawned manager fail on startup so the
    failure path can be tested -- confirm against create_spawned_fork_process_manager's signature.
    """
    extra_leading_arg = "test"
    create_spawned_fork_process_manager(extra_leading_arg, *args, **kwargs)


@pytest.mark.unittest
class TestLineExecutionProcessPool:
@pytest.mark.parametrize(
Expand Down Expand Up @@ -408,6 +415,33 @@ def test_process_not_set_environment_variable(self, dev_connections):
p.join()
assert p.exitcode == 0

@pytest.mark.skipif(sys.platform == "win32" or sys.platform == "darwin", reason="Only test on linux")
@pytest.mark.parametrize(
    "flow_folder",
    [
        SAMPLE_FLOW,
    ],
)
@patch(
    "promptflow.executor._process_manager.create_spawned_fork_process_manager",
    custom_create_spawned_fork_process_manager,
)
def test_spawned_fork_process_manager_crashed_in_fork_mode(self, flow_folder, dev_connections):
    """The pool must raise SpawnedForkProcessManagerStartFailure when the spawned manager fails.

    create_spawned_fork_process_manager is patched with a wrapper that passes an extra
    positional argument; entering or running the pool is then expected to raise
    rather than hang.
    """
    executor = FlowExecutor.create(get_yaml_file(flow_folder), dev_connections)
    run_id = str(uuid.uuid4())
    bulk_inputs = get_bulk_inputs()
    nlines = len(bulk_inputs)
    with pytest.raises(SpawnedForkProcessManagerStartFailure) as e:
        with LineExecutionProcessPool(
            executor,
            nlines,
            run_id,
            None,
        ) as pool:
            pool.run(zip(range(nlines), bulk_inputs))
    # e.value is only populated once the pytest.raises block has exited, so the
    # message check has to live outside of it to actually run.
    assert "Failed to start spawned fork process manager" in str(e.value)


class TestGetAvailableMaxWorkerCount:
@pytest.mark.parametrize(
Expand Down

0 comments on commit 072f995

Please sign in to comment.