Skip to content

Commit

Permalink
use forkserver to create new processes
Browse files Browse the repository at this point in the history
  • Loading branch information
Hhhilulu committed Dec 8, 2023
1 parent 40f6f64 commit b11be03
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
from promptflow.executor._result import LineResult
from promptflow.executor.flow_executor import DEFAULT_CONCURRENCY_BULK, FlowExecutor
from promptflow.storage import AbstractRunStorage
from ._preloaded_resolved_tools import preloaded_obj
from . import _shared_vars


def signal_handler(signum, frame):
Expand Down Expand Up @@ -159,12 +161,31 @@ def __init__(
)
bulk_logger.info(f"Set start method to default {multiprocessing.get_start_method()}.")
multiprocessing_start_method = None
elif ((multiprocessing_start_method == "fork" or
multiprocessing.get_start_method() == "fork") and
"forkserver" in sys_start_methods):
bulk_logger.info(
"Current method is 'fork' and 'forkserver' is available. Set start method to 'forkserver'.")
multiprocessing_start_method = "forkserver"
self.context = get_multiprocessing_context(multiprocessing_start_method)
use_fork = self.context.get_start_method() == "fork"
use_forkserver = self.context.get_start_method() == "forkserver"
# When using fork, we use this method to create the executor to avoid reloading the flow
# which will introduce a lot more memory.
if use_fork:
self._executor_creation_func = partial(create_executor_fork, flow_executor=flow_executor)
elif use_forkserver:
_shared_vars.flow_file = flow_executor._flow_file
_shared_vars.connections = flow_executor._connections
_shared_vars.working_dir = flow_executor._working_dir

self._executor_creation_func = partial(
FlowExecutor.create,
flow_file=flow_executor._flow_file,
connections=flow_executor._connections,
working_dir=flow_executor._working_dir,
raise_ex=False,
)
elif flow_executor._flow_file:
self._executor_creation_func = partial(
FlowExecutor.create,
Expand Down Expand Up @@ -355,6 +376,8 @@ def run(self, batch_inputs):
),
):
try:
if self.context.get_start_method() == "forkserver":
self.context.set_forkserver_preload(['_preloaded_resolved_tools'])
# The variable 'async_result' here is not the actual result of the batch run
# but an AsyncResult object that can be used to check if the execution are finished
# The actual results of the batch run are stored in 'result_list'
Expand Down Expand Up @@ -518,7 +541,8 @@ def create_executor_fork(*, flow_executor: FlowExecutor, storage: AbstractRunSto

def exec_line_for_queue(executor_creation_func, input_queue: Queue, output_queue: Queue):
run_storage = QueueRunStorage(output_queue)
executor: FlowExecutor = executor_creation_func(storage=run_storage)
loaded_tools = preloaded_obj.tools
executor: FlowExecutor = executor_creation_func(storage=run_storage, loaded_tools=loaded_tools)

# Wait for the start signal message
input_queue.get()
Expand Down
26 changes: 26 additions & 0 deletions src/promptflow/promptflow/executor/_preloaded_resolved_tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from promptflow.contracts.flow import Flow
from promptflow.executor._tool_resolver import ToolResolver
from promptflow._utils.context_utils import _change_working_dir
from ._shared_vars import flow_file, connections, working_dir


class PreloadeResolvedTools:
    """Resolve every node's tool for a flow, eagerly, at construction time.

    Intended to run while a forkserver preloads this module, so the resolved
    tools are inherited by worker processes instead of being re-resolved in
    each one.  If any of the three inputs is missing/falsy, resolution is
    skipped and ``tools`` is ``None``.

    NOTE(review): class name looks like a typo for ``PreloadedResolvedTools``;
    kept unchanged because the module-level singleton depends on this name.
    """

    def __init__(self, flow_file, connections, working_dir):
        # Guard clause: without all three inputs (e.g. a plain import where
        # the shared vars were never populated) there is nothing to resolve.
        if not (flow_file and connections and working_dir):
            self._tools = None
            return

        working_dir = Flow._resolve_working_dir(flow_file, working_dir)
        flow = Flow.from_yaml(flow_file, working_dir=working_dir)
        flow = flow._apply_default_node_variants()
        # Only nodes backed by a package tool contribute a key for the resolver.
        tool_keys = [n.source.tool for n in flow.nodes if n.source and n.source.tool]
        resolver = ToolResolver(working_dir, connections, tool_keys)

        # Resolution may load user code relative to the flow's directory.
        with _change_working_dir(working_dir):
            self._tools = [resolver.resolve_tool_by_node(n) for n in flow.nodes]

    @property
    def tools(self):
        """The resolved tools (one per flow node), or ``None`` if skipped."""
        return self._tools


preloaded_obj = PreloadeResolvedTools(flow_file, connections, working_dir)
3 changes: 3 additions & 0 deletions src/promptflow/promptflow/executor/_shared_vars.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Mutable module-level globals used to hand flow parameters from the process
# pool (which assigns _shared_vars.flow_file / .connections / .working_dir)
# to _preloaded_resolved_tools, which reads them at its own import time.
# NOTE(review): _preloaded_resolved_tools imports these names with
# `from ._shared_vars import ...`, which copies the values once at import —
# mutations after that import are not observed there; confirm the intended
# set-then-preload ordering.
flow_file = None
connections = None
working_dir = None
23 changes: 15 additions & 8 deletions src/promptflow/promptflow/executor/flow_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ def create(
raise_ex: bool = True,
node_override: Optional[Dict[str, Dict[str, Any]]] = None,
line_timeout_sec: int = LINE_TIMEOUT_SEC,
loaded_tools: Optional[list] = None
) -> "FlowExecutor":
"""Create a new instance of FlowExecutor.
Expand Down Expand Up @@ -193,6 +194,7 @@ def create(
raise_ex=raise_ex,
node_override=node_override,
line_timeout_sec=line_timeout_sec,
loaded_tools=loaded_tools
)

@classmethod
Expand All @@ -207,19 +209,24 @@ def _create_from_flow(
raise_ex: bool = True,
node_override: Optional[Dict[str, Dict[str, Any]]] = None,
line_timeout_sec: int = LINE_TIMEOUT_SEC,
loaded_tools: Optional[list] = None
):
working_dir = Flow._resolve_working_dir(flow_file, working_dir)
if node_override:
flow = flow._apply_node_overrides(node_override)
flow = flow._apply_default_node_variants()
package_tool_keys = [node.source.tool for node in flow.nodes if node.source and node.source.tool]
tool_resolver = ToolResolver(working_dir, connections, package_tool_keys)

with _change_working_dir(working_dir):
resolved_tools = [tool_resolver.resolve_tool_by_node(node) for node in flow.nodes]
flow = Flow(
flow.id, flow.name, [r.node for r in resolved_tools], inputs=flow.inputs, outputs=flow.outputs, tools=[]
)
if loaded_tools:
flow = Flow(
flow.id, flow.name, [r.node for r in loaded_tools], inputs=flow.inputs, outputs=flow.outputs, tools=[]
)
else:
package_tool_keys = [node.source.tool for node in flow.nodes if node.source and node.source.tool]
tool_resolver = ToolResolver(working_dir, connections, package_tool_keys)
with _change_working_dir(working_dir):
resolved_tools = [tool_resolver.resolve_tool_by_node(node) for node in flow.nodes]
flow = Flow(
flow.id, flow.name, [r.node for r in resolved_tools], inputs=flow.inputs, outputs=flow.outputs, tools=[]
)
# ensure_flow_valid including validation + resolve
# Todo: 1) split pure validation + resolve from below method 2) provide completed validation()
flow = FlowValidator._validate_nodes_topology(flow)
Expand Down

0 comments on commit b11be03

Please sign in to comment.