diff --git a/src/promptflow/promptflow/executor/_line_execution_process_pool.py b/src/promptflow/promptflow/executor/_line_execution_process_pool.py index 8bb4803a0ede..094b1837c164 100644 --- a/src/promptflow/promptflow/executor/_line_execution_process_pool.py +++ b/src/promptflow/promptflow/executor/_line_execution_process_pool.py @@ -108,6 +108,11 @@ def __exit__(self, exc_type, exc_val, exc_tb): def _new_process(self): input_queue = Queue() output_queue = Queue() + + # Create a new process and wait for it to be ready. + # Test if the process can receive the message. + input_queue.put("start") + current_log_context = LogContext.get_current() process = Process( target=_process_wrapper, @@ -118,8 +123,19 @@ def _new_process(self): current_log_context.get_initializer() if current_log_context else None, OperationContext.get_instance().get_context_dict(), ), + daemon=True, ) process.start() + + try: + # Wait for the process to be ready. + ready_msg = output_queue.get(timeout=30) + assert ready_msg == "ready" + except queue.Empty: + logger.info(f"Sub process {process.pid} not ready.") + self.end_process(process) + return None, None, None + return process, input_queue, output_queue def end_process(self, process): @@ -128,6 +144,10 @@ def end_process(self, process): def _timeout_process_wrapper(self, task_queue: Queue, idx: int, timeout_time, result_list): process, input_queue, output_queue = self._new_process() + if process is None: + return + logger.info(f"TW Process {os.getpid()} created new process {process.pid}") + while True: try: args = task_queue.get(timeout=1) @@ -139,7 +159,7 @@ def _timeout_process_wrapper(self, task_queue: Queue, idx: int, timeout_time, re input_queue.put(args) inputs, line_number, run_id = args[:3] - self._processing_idx[line_number] = process.name + self._processing_idx[line_number] = f"{process.name}::{process.pid}" start_time = datetime.now() completed = False @@ -301,6 +321,7 @@ def _process_wrapper( log_context_initialization_func, operation_contexts_dict: dict, ): + logger.info(f"Sub process {os.getpid()} started") OperationContext.get_instance().update(operation_contexts_dict) # Update the operation context for the new process. if log_context_initialization_func: with log_context_initialization_func(): @@ -326,6 +347,15 @@ def create_executor_fork(*, flow_executor: FlowExecutor, storage: AbstractRunSto def exec_line_for_queue(executor_creation_func, input_queue: Queue, output_queue: Queue): run_storage = QueueRunStorage(output_queue) executor: FlowExecutor = executor_creation_func(storage=run_storage) + + # Wait for the start signal + start_msg = input_queue.get() + logger.info(f"Sub process {os.getpid()} received start signal: {start_msg}") + + # Send ready signal + output_queue.put("ready") + logger.info(f"Sub process {os.getpid()} sent ready signal") + while True: try: args = input_queue.get(1)