Skip to content

Commit

Permalink
use forkserver
Browse files Browse the repository at this point in the history
  • Loading branch information
Hhhilulu committed Dec 4, 2023
1 parent 6596468 commit 8fd1d31
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 7 deletions.
20 changes: 20 additions & 0 deletions src/promptflow/promptflow/executor/_line_execution_process_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,13 @@
from promptflow.executor._errors import LineExecutionTimeoutError
from promptflow.executor._result import LineResult
from promptflow.executor.flow_executor import DEFAULT_CONCURRENCY_BULK, FlowExecutor
from ._preloaded_resolved_tools import preloaded_obj
from promptflow.storage import AbstractRunStorage

flow_file = None
connections = None
working_dir = None


def signal_handler(signum, frame):
signame = signal.Signals(signum).name
Expand Down Expand Up @@ -159,18 +164,31 @@ def __init__(
)
bulk_logger.info(f"Set start method to default {multiprocessing.get_start_method()}.")
multiprocessing_start_method = None
elif ((multiprocessing_start_method == "fork" or
multiprocessing.get_start_method() == "fork") and
"forkserver" in sys_start_methods):
bulk_logger.info(
"Current method is 'fork' and 'forkserver' is available. Set start method to 'forkserver'.")
multiprocessing_start_method = "forkserver"
self.context = get_multiprocessing_context(multiprocessing_start_method)
use_fork = self.context.get_start_method() == "fork"
# When using fork, we use this method to create the executor to avoid reloading the flow
# which will introduce a lot more memory.
if use_fork:
self._executor_creation_func = partial(create_executor_fork, flow_executor=flow_executor)
elif flow_executor._flow_file:
global flow_file
flow_file = flow_executor._flow_file
global connections
connections = flow_executor._connections
global working_dir
working_dir = flow_executor._working_dir
self._executor_creation_func = partial(
FlowExecutor.create,
flow_file=flow_executor._flow_file,
connections=flow_executor._connections,
working_dir=flow_executor._working_dir,
loaded_tools=preloaded_obj.tools,
raise_ex=False,
)
else: # Legacy flow executor, will be deprecated with the legacy pf portal.
Expand Down Expand Up @@ -362,6 +380,8 @@ def run(self, batch_inputs):
),
):
try:
if self.context.get_start_method() == "forkserver":
self.context.set_forkserver_preload(['_preloaded_resolved_tools'])
# The variable 'async_result' here is not the actual result of the batch run
# but an AsyncResult object that can be used to check if the execution are finished
# The actual results of the batch run are stored in 'result_list'
Expand Down
23 changes: 23 additions & 0 deletions src/promptflow/promptflow/executor/_preloaded_resolved_tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from promptflow.contracts.flow import Flow
from promptflow.executor._tool_resolver import ToolResolver
from promptflow._utils.context_utils import _change_working_dir
from ._line_execution_process_pool import flow_file, connections, working_dir


class PreloadeResolvedTools:
    """Resolve every node tool of a flow once, eagerly, so the resolved tools
    can be shared with worker processes (e.g. via forkserver preload) instead
    of being re-resolved in each child.

    Bug fix: the original stored the resolved list in ``self.tools`` while also
    defining a read-only ``tools`` property on the class — the assignment in
    ``__init__`` raises ``AttributeError`` (property without a setter), and the
    property body ``return self.tools`` would recurse infinitely. The resolved
    list is now kept in the private ``self._tools`` attribute.
    """

    def __init__(self, flow_file, connections, working_dir):
        # Load the flow definition and normalize its working directory.
        flow = Flow.from_yaml(flow_file, working_dir=working_dir)
        working_dir = Flow._resolve_working_dir(flow_file, working_dir)
        # Collapse node variants to the defaults before resolving tools.
        flow = flow._apply_default_node_variants()
        # Only nodes backed by a package tool contribute a tool key.
        package_tool_keys = [node.source.tool for node in flow.nodes if node.source and node.source.tool]
        tool_resolver = ToolResolver(working_dir, connections, package_tool_keys)

        # Tool resolution may touch files relative to the flow's directory.
        with _change_working_dir(working_dir):
            self._tools = [tool_resolver.resolve_tool_by_node(node) for node in flow.nodes]

    @property
    def tools(self):
        """The list of resolved tools, one per flow node."""
        return self._tools


# Module-level singleton: resolving the tools at import time lets a forkserver
# parent that preloads this module share the resolved tools with every worker.
# NOTE(review): flow_file / connections / working_dir are imported from
# _line_execution_process_pool, where they are initialized to None — this
# constructor call will fail unless those globals are populated before this
# module is first imported; the two modules also import each other, which
# risks a circular-import failure. TODO confirm the intended import order.
preloaded_obj = PreloadeResolvedTools(flow_file, connections, working_dir)
22 changes: 15 additions & 7 deletions src/promptflow/promptflow/executor/flow_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ def create(
raise_ex: bool = True,
node_override: Optional[Dict[str, Dict[str, Any]]] = None,
line_timeout_sec: int = LINE_TIMEOUT_SEC,
loaded_tools: Optional[list] = None
) -> "FlowExecutor":
"""Create a new instance of FlowExecutor.
Expand Down Expand Up @@ -207,6 +208,7 @@ def create(
raise_ex=raise_ex,
node_override=node_override,
line_timeout_sec=line_timeout_sec,
loaded_tools=loaded_tools,
)

@classmethod
Expand All @@ -221,19 +223,25 @@ def _create_from_flow(
raise_ex: bool = True,
node_override: Optional[Dict[str, Dict[str, Any]]] = None,
line_timeout_sec: int = LINE_TIMEOUT_SEC,
loaded_tools: Optional[list] = None
):
working_dir = Flow._resolve_working_dir(flow_file, working_dir)
if node_override:
flow = flow._apply_node_overrides(node_override)
flow = flow._apply_default_node_variants()
package_tool_keys = [node.source.tool for node in flow.nodes if node.source and node.source.tool]
tool_resolver = ToolResolver(working_dir, connections, package_tool_keys)
if loaded_tools:
flow = Flow(
flow.id, flow.name, [r.node for r in loaded_tools], inputs=flow.inputs, outputs=flow.outputs, tools=[]
)
else:
package_tool_keys = [node.source.tool for node in flow.nodes if node.source and node.source.tool]
tool_resolver = ToolResolver(working_dir, connections, package_tool_keys)

with _change_working_dir(working_dir):
resolved_tools = [tool_resolver.resolve_tool_by_node(node) for node in flow.nodes]
flow = Flow(
flow.id, flow.name, [r.node for r in resolved_tools], inputs=flow.inputs, outputs=flow.outputs, tools=[]
)
with _change_working_dir(working_dir):
resolved_tools = [tool_resolver.resolve_tool_by_node(node) for node in flow.nodes]
flow = Flow(
flow.id, flow.name, [r.node for r in resolved_tools], inputs=flow.inputs, outputs=flow.outputs, tools=[]
)
# ensure_flow_valid including validation + resolve
# Todo: 1) split pure validation + resolve from below method 2) provide completed validation()
flow = FlowValidator._validate_nodes_topology(flow)
Expand Down

0 comments on commit 8fd1d31

Please sign in to comment.