Skip to content

Commit

Permalink
Add a simple check to ensure executor process is ready
Browse files Browse the repository at this point in the history
  • Loading branch information
weedqian committed Sep 22, 2023
1 parent 9b9db37 commit ab8386a
Showing 1 changed file with 31 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ def __exit__(self, exc_type, exc_val, exc_tb):
def _new_process(self):
input_queue = Queue()
output_queue = Queue()

# Create a new process and wait for it to be ready.
# Test if the process can receive the message.
input_queue.put("start")

current_log_context = LogContext.get_current()
process = Process(
target=_process_wrapper,
Expand All @@ -118,8 +123,19 @@ def _new_process(self):
current_log_context.get_initializer() if current_log_context else None,
OperationContext.get_instance().get_context_dict(),
),
daemon=True,
)
process.start()

try:
# Wait for the process to be ready.
ready_msg = output_queue.get(timeout=30)
assert ready_msg == "ready"
except queue.Empty:
logger.info(f"Sub process {process.pid} not ready.")
self.end_process(process)
return None, None, None

return process, input_queue, output_queue

def end_process(self, process):
Expand All @@ -128,6 +144,10 @@ def end_process(self, process):

def _timeout_process_wrapper(self, task_queue: Queue, idx: int, timeout_time, result_list):
process, input_queue, output_queue = self._new_process()
if process is None:
return
logger.info(f"TW Process {os.getpid()} created new process {process.pid}")

while True:
try:
args = task_queue.get(timeout=1)
Expand All @@ -139,7 +159,7 @@ def _timeout_process_wrapper(self, task_queue: Queue, idx: int, timeout_time, re
input_queue.put(args)
inputs, line_number, run_id = args[:3]

self._processing_idx[line_number] = process.name
self._processing_idx[line_number] = f"{process.name}::{process.pid}"

start_time = datetime.now()
completed = False
Expand Down Expand Up @@ -301,6 +321,7 @@ def _process_wrapper(
log_context_initialization_func,
operation_contexts_dict: dict,
):
logger.info(f"Sub process {os.getpid()} started")
OperationContext.get_instance().update(operation_contexts_dict) # Update the operation context for the new process.
if log_context_initialization_func:
with log_context_initialization_func():
Expand All @@ -326,6 +347,15 @@ def create_executor_fork(*, flow_executor: FlowExecutor, storage: AbstractRunSto
def exec_line_for_queue(executor_creation_func, input_queue: Queue, output_queue: Queue):
run_storage = QueueRunStorage(output_queue)
executor: FlowExecutor = executor_creation_func(storage=run_storage)

# Wait for the start signal
start_msg = input_queue.get()
logger.info(f"Sub process {os.getpid()} received start signal: {start_msg}")

# Send ready signal
output_queue.put("ready")
logger.info(f"Sub process {os.getpid()} sent ready signal")

while True:
try:
args = input_queue.get(1)
Expand Down

0 comments on commit ab8386a

Please sign in to comment.