diff --git a/src/promptflow/promptflow/executor/_line_execution_process_pool.py b/src/promptflow/promptflow/executor/_line_execution_process_pool.py
index 1da653b4a49..29153d9dc64 100644
--- a/src/promptflow/promptflow/executor/_line_execution_process_pool.py
+++ b/src/promptflow/promptflow/executor/_line_execution_process_pool.py
@@ -31,6 +31,8 @@
 from promptflow.executor._result import LineResult
 from promptflow.executor.flow_executor import DEFAULT_CONCURRENCY_BULK, FlowExecutor
 from promptflow.storage import AbstractRunStorage
+from ._preloaded_resolved_tools import preloaded_obj
+from . import _shared_vars
 
 
 def signal_handler(signum, frame):
@@ -159,12 +161,31 @@ def __init__(
             )
             bulk_logger.info(f"Set start method to default {multiprocessing.get_start_method()}.")
             multiprocessing_start_method = None
+        elif ((multiprocessing_start_method == "fork" or
+               multiprocessing.get_start_method() == "fork") and
+              "forkserver" in sys_start_methods):
+            bulk_logger.info(
+                "Current method is 'fork' and 'forkserver' is available. Set start method to 'forkserver'.")
+            multiprocessing_start_method = "forkserver"
         self.context = get_multiprocessing_context(multiprocessing_start_method)
         use_fork = self.context.get_start_method() == "fork"
+        use_forkserver = self.context.get_start_method() == "forkserver"
         # When using fork, we use this method to create the executor to avoid reloading the flow
         # which will introduce a lot more memory.
         if use_fork:
             self._executor_creation_func = partial(create_executor_fork, flow_executor=flow_executor)
+        elif use_forkserver:
+            _shared_vars.flow_file = flow_executor._flow_file
+            _shared_vars.connections = flow_executor._connections
+            _shared_vars.working_dir = flow_executor._working_dir
+
+            self._executor_creation_func = partial(
+                FlowExecutor.create,
+                flow_file=flow_executor._flow_file,
+                connections=flow_executor._connections,
+                working_dir=flow_executor._working_dir,
+                raise_ex=False,
+            )
         elif flow_executor._flow_file:
             self._executor_creation_func = partial(
                 FlowExecutor.create,
@@ -362,6 +383,8 @@ def run(self, batch_inputs):
                 ),
             ):
                 try:
+                    if self.context.get_start_method() == "forkserver":
+                        self.context.set_forkserver_preload(['promptflow.executor._preloaded_resolved_tools'])
                     # The variable 'async_result' here is not the actual result of the batch run
                     # but an AsyncResult object that can be used to check if the execution are finished
                     # The actual results of the batch run are stored in 'result_list'
@@ -489,7 +512,12 @@ def create_executor_fork(*, flow_executor: FlowExecutor, storage: AbstractRunSto
 def exec_line_for_queue(executor_creation_func, input_queue: Queue, output_queue: Queue):
     run_storage = QueueRunStorage(output_queue)
-    executor: FlowExecutor = executor_creation_func(storage=run_storage)
+    loaded_tools = preloaded_obj.tools
+    if loaded_tools is not None:
+        executor: FlowExecutor = executor_creation_func(storage=run_storage, loaded_tools=loaded_tools)
+    else:
+        # The fork path binds create_executor_fork, which does not accept loaded_tools.
+        executor: FlowExecutor = executor_creation_func(storage=run_storage)
 
     # Wait for the start signal message
     input_queue.get()
 
diff --git a/src/promptflow/promptflow/executor/_preloaded_resolved_tools.py b/src/promptflow/promptflow/executor/_preloaded_resolved_tools.py
new file mode 100644
index 00000000000..06e82a9c4e4
--- /dev/null
+++ b/src/promptflow/promptflow/executor/_preloaded_resolved_tools.py
@@ -0,0 +1,26 @@
+from promptflow.contracts.flow import Flow
+from promptflow.executor._tool_resolver import ToolResolver
+from promptflow._utils.context_utils import _change_working_dir
+from ._shared_vars import flow_file, connections, working_dir
+
+
+class PreloadedResolvedTools:
+    def __init__(self, flow_file, connections, working_dir):
+        if flow_file and connections and working_dir:
+            working_dir = Flow._resolve_working_dir(flow_file, working_dir)
+            flow = Flow.from_yaml(flow_file, working_dir=working_dir)
+            flow = flow._apply_default_node_variants()
+            package_tool_keys = [node.source.tool for node in flow.nodes if node.source and node.source.tool]
+            tool_resolver = ToolResolver(working_dir, connections, package_tool_keys)
+
+            with _change_working_dir(working_dir):
+                self._tools = [tool_resolver.resolve_tool_by_node(node) for node in flow.nodes]
+        else:
+            self._tools = None
+
+    @property
+    def tools(self):
+        return self._tools
+
+
+preloaded_obj = PreloadedResolvedTools(flow_file, connections, working_dir)
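A note on the new module above before the remaining files. Resolving tools at import time is exactly what `forkserver` preloading rewards: the fork server imports the module once, and every worker forked from the server inherits the already-built module state. The sketch below demonstrates the mechanism with nothing but the standard library; the module name `expensive_init` and everything in it are invented stand-ins for `_preloaded_resolved_tools`, not code from this PR.

```python
import multiprocessing
import os
import sys
import textwrap

# Stand-in for _preloaded_resolved_tools: all expensive work happens at import time.
PRELOAD_SOURCE = textwrap.dedent(
    """
    import os
    print(f"[pid {os.getpid()}] expensive_init: building BIG_OBJECT")
    BIG_OBJECT = [i * i for i in range(1_000_000)]  # stands in for resolved tools
    """
)


def worker(_):
    import expensive_init  # already in sys.modules of every forked worker
    return len(expensive_init.BIG_OBJECT)


if __name__ == "__main__":
    with open("expensive_init.py", "w") as f:
        f.write(PRELOAD_SOURCE)
    sys.path.insert(0, os.getcwd())  # sys.path is forwarded to the fork server

    ctx = multiprocessing.get_context("forkserver")  # POSIX only
    # Preload must be requested before the fork server starts,
    # i.e. before the first worker process is created.
    ctx.set_forkserver_preload(["expensive_init"])
    with ctx.Pool(processes=4) as pool:
        print(pool.map(worker, range(4)))
```

Run on Linux or macOS, the "building BIG_OBJECT" line prints once, from the fork server's pid, rather than once per worker. Plain `fork` achieves the same sharing by inheriting the parent's memory directly, which is what the existing `create_executor_fork` path already exploits.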
diff --git a/src/promptflow/promptflow/executor/_shared_vars.py b/src/promptflow/promptflow/executor/_shared_vars.py
new file mode 100644
index 00000000000..c8178b6b456
--- /dev/null
+++ b/src/promptflow/promptflow/executor/_shared_vars.py
@@ -0,0 +1,3 @@
+flow_file = None
+connections = None
+working_dir = None
diff --git a/src/promptflow/promptflow/executor/flow_executor.py b/src/promptflow/promptflow/executor/flow_executor.py
index 4160058ce16..b82de26f42d 100644
--- a/src/promptflow/promptflow/executor/flow_executor.py
+++ b/src/promptflow/promptflow/executor/flow_executor.py
@@ -177,6 +177,7 @@ def create(
         raise_ex: bool = True,
         node_override: Optional[Dict[str, Dict[str, Any]]] = None,
         line_timeout_sec: int = LINE_TIMEOUT_SEC,
+        loaded_tools: Optional[list] = None,
     ) -> "FlowExecutor":
         """Create a new instance of FlowExecutor.
 
@@ -207,6 +208,7 @@
             raise_ex=raise_ex,
             node_override=node_override,
             line_timeout_sec=line_timeout_sec,
+            loaded_tools=loaded_tools,
         )
 
     @classmethod
@@ -221,19 +223,24 @@ def _create_from_flow(
         raise_ex: bool = True,
         node_override: Optional[Dict[str, Dict[str, Any]]] = None,
         line_timeout_sec: int = LINE_TIMEOUT_SEC,
+        loaded_tools: Optional[list] = None,
     ):
         working_dir = Flow._resolve_working_dir(flow_file, working_dir)
         if node_override:
             flow = flow._apply_node_overrides(node_override)
         flow = flow._apply_default_node_variants()
-        package_tool_keys = [node.source.tool for node in flow.nodes if node.source and node.source.tool]
-        tool_resolver = ToolResolver(working_dir, connections, package_tool_keys)
-
-        with _change_working_dir(working_dir):
-            resolved_tools = [tool_resolver.resolve_tool_by_node(node) for node in flow.nodes]
-        flow = Flow(
-            flow.id, flow.name, [r.node for r in resolved_tools], inputs=flow.inputs, outputs=flow.outputs, tools=[]
-        )
+        if loaded_tools:
+            flow = Flow(
+                flow.id, flow.name, [r.node for r in loaded_tools], inputs=flow.inputs, outputs=flow.outputs, tools=[]
+            )
+        else:
+            package_tool_keys = [node.source.tool for node in flow.nodes if node.source and node.source.tool]
+            tool_resolver = ToolResolver(working_dir, connections, package_tool_keys)
+            with _change_working_dir(working_dir):
+                resolved_tools = [tool_resolver.resolve_tool_by_node(node) for node in flow.nodes]
+            flow = Flow(
+                flow.id, flow.name, [r.node for r in resolved_tools], inputs=flow.inputs, outputs=flow.outputs, tools=[]
+            )
         # ensure_flow_valid including validation + resolve
         # Todo: 1) split pure validation + resolve from below method 2) provide completed validation()
         flow = FlowValidator._validate_nodes_topology(flow)
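Two standard-library details are worth flagging for reviewers of the `set_forkserver_preload` call in the first file. First, the fork server imports each requested name with a bare `__import__` wrapped in `try/except ImportError`, so a name that cannot be resolved from `sys.path` is skipped silently rather than raising; the fully qualified `promptflow.executor._preloaded_resolved_tools` is therefore the safe spelling. Second, CPython launches the fork server as a fresh interpreter, forwarding only `sys.path` and the main-module path, so the preload module sees whatever `_shared_vars` holds when that fresh process imports it, not necessarily the values assigned by the parent in `__init__`; a log line in `_preloaded_resolved_tools` is a cheap way to confirm the tools are actually resolved inside the server. The silent-skip behavior is easy to observe with the standard library alone:

```python
import multiprocessing

if __name__ == "__main__":
    ctx = multiprocessing.get_context("forkserver")
    # A bogus module name raises nothing, here or at server start-up:
    # the failed import is swallowed inside the fork server.
    ctx.set_forkserver_preload(["no_such_module_anywhere"])
    p = ctx.Process(target=print, args=("worker ran despite the bad preload",))
    p.start()
    p.join()
```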