diff --git a/src/promptflow/promptflow/batch/_csharp_base_executor_proxy.py b/src/promptflow/promptflow/batch/_csharp_base_executor_proxy.py new file mode 100644 index 00000000000..48ad9c51ee7 --- /dev/null +++ b/src/promptflow/promptflow/batch/_csharp_base_executor_proxy.py @@ -0,0 +1,102 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +import json +from pathlib import Path +from typing import Any, List, Mapping, Optional + +from promptflow._core._errors import MetaFileNotFound, MetaFileReadError +from promptflow._sdk._constants import DEFAULT_ENCODING, FLOW_TOOLS_JSON, PROMPT_FLOW_DIR_NAME +from promptflow.batch._base_executor_proxy import APIBasedExecutorProxy +from promptflow.executor._result import AggregationResult + +EXECUTOR_SERVICE_DLL = "Promptflow.dll" + + +class CSharpBaseExecutorProxy(APIBasedExecutorProxy): + """Base class for csharp executor proxy for local and runtime.""" + + def __init__( + self, + *, + working_dir: Path, + ): + self._working_dir = working_dir + + @property + def working_dir(self) -> Path: + return self._working_dir + + def _get_flow_meta(self) -> dict: + # TODO: this should be got from flow.json for all languages by default? If so, we need to promote working_dir + # to be a required parameter in the super constructor. + flow_meta_json_path = self.working_dir / ".promptflow" / "flow.json" + if not flow_meta_json_path.is_file(): + raise MetaFileNotFound( + message_format=( + # TODO: pf flow validate should be able to generate flow.json + "Failed to fetch meta of inputs: cannot find {file_path}, please retry." + ), + file_path=flow_meta_json_path.absolute().as_posix(), + ) + + with open(flow_meta_json_path, mode="r", encoding=DEFAULT_ENCODING) as flow_meta_json_path: + return json.load(flow_meta_json_path) + + async def exec_aggregation_async( + self, + batch_inputs: Mapping[str, Any], + aggregation_inputs: Mapping[str, Any], + run_id: Optional[str] = None, + ) -> AggregationResult: + # TODO: aggregation is not supported for now? + return AggregationResult({}, {}, {}) + + @classmethod + def _construct_service_startup_command( + cls, + port, + log_path, + error_file_path, + yaml_path: str = "flow.dag.yaml", + log_level: str = "Warning", + assembly_folder: str = ".", + ) -> List[str]: + return [ + "dotnet", + EXECUTOR_SERVICE_DLL, + "--execution_service", + "--port", + port, + "--yaml_path", + yaml_path, + "--assembly_folder", + assembly_folder, + "--log_path", + log_path, + "--log_level", + log_level, + "--error_file_path", + error_file_path, + ] + + @classmethod + def _get_tool_metadata(cls, flow_file: Path, working_dir: Path) -> dict: + # TODO: this should be got from flow.tools.json for all languages by default? If so, + # we need to promote working_dir to be a required parameter in the super constructor. + flow_tools_json_path = working_dir / PROMPT_FLOW_DIR_NAME / FLOW_TOOLS_JSON + if flow_tools_json_path.is_file(): + with open(flow_tools_json_path, mode="r", encoding=DEFAULT_ENCODING) as f: + try: + return json.load(f) + except json.JSONDecodeError: + raise MetaFileReadError( + message_format="Failed to fetch meta of tools: {file_path} is not a valid json file.", + file_path=flow_tools_json_path.absolute().as_posix(), + ) + raise MetaFileNotFound( + message_format=( + "Failed to fetch meta of tools: cannot find {file_path}, please build the flow project first." + ), + file_path=flow_tools_json_path.absolute().as_posix(), + ) diff --git a/src/promptflow/promptflow/batch/_csharp_executor_proxy.py b/src/promptflow/promptflow/batch/_csharp_executor_proxy.py index cfe9c540557..c2b3c314a4c 100644 --- a/src/promptflow/promptflow/batch/_csharp_executor_proxy.py +++ b/src/promptflow/promptflow/batch/_csharp_executor_proxy.py @@ -1,54 +1,44 @@ # --------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # --------------------------------------------------------- -import json import os import socket import subprocess import tempfile import uuid from pathlib import Path -from typing import Any, Mapping, Optional +from typing import Optional -from promptflow._core._errors import MetaFileNotFound, MetaFileReadError, UnexpectedError -from promptflow._sdk._constants import DEFAULT_ENCODING, FLOW_TOOLS_JSON, PROMPT_FLOW_DIR_NAME +from promptflow._core._errors import UnexpectedError +from promptflow._sdk._constants import DEFAULT_ENCODING from promptflow._utils.yaml_utils import dump_yaml -from promptflow.batch._base_executor_proxy import APIBasedExecutorProxy -from promptflow.executor._result import AggregationResult +from promptflow.batch._csharp_base_executor_proxy import CSharpBaseExecutorProxy from promptflow.storage._run_storage import AbstractRunStorage EXECUTOR_SERVICE_DOMAIN = "http://localhost:" EXECUTOR_SERVICE_DLL = "Promptflow.dll" -class CSharpExecutorProxy(APIBasedExecutorProxy): +class CSharpExecutorProxy(CSharpBaseExecutorProxy): def __init__( - self, *, process: subprocess.Popen, port: str, working_dir: Path, temp_dag_file: Optional[Path] = None + self, + *, + process: subprocess.Popen, + port: str, + working_dir: Path, + temp_dag_file: Optional[Path] = None, ): self._process = process self._port = port - self._working_dir = working_dir + super().__init__( + working_dir=working_dir, + ) self._temp_dag_file = temp_dag_file @property def api_endpoint(self) -> str: return EXECUTOR_SERVICE_DOMAIN + self._port - def _get_flow_meta(self) -> dict: - # TODO: this should be got from flow.json for all languages by default? - flow_meta_json_path = self._working_dir / ".promptflow" / "flow.json" - if not flow_meta_json_path.is_file(): - raise MetaFileNotFound( - message_format=( - # TODO: pf flow validate should be able to generate flow.json - "Failed to fetch meta of inputs: cannot find {file_path}, please retry." - ), - file_path=flow_meta_json_path.absolute().as_posix(), - ) - - with open(flow_meta_json_path, mode="r", encoding=DEFAULT_ENCODING) as flow_meta_json_path: - return json.load(flow_meta_json_path) - @classmethod def _generate_flow_meta(cls, flow_file: str, assembly_folder: Path): command = [ @@ -107,24 +97,14 @@ async def create( else: temp_dag_file = None - command = [ - "dotnet", - EXECUTOR_SERVICE_DLL, - "--execution_service", - "--port", - port, - "--yaml_path", - flow_file.as_posix(), - "--assembly_folder", - ".", - "--log_path", - log_path, - "--log_level", - "Warning", - "--error_file_path", - init_error_file, - ] - process = subprocess.Popen(command) + process = subprocess.Popen( + cls._construct_service_startup_command( + port=port, + log_path=log_path, + error_file_path=init_error_file, + yaml_path=flow_file.as_posix(), + ) + ) executor_proxy = cls( process=process, port=port, @@ -148,38 +128,11 @@ async def destroy(self): if self._temp_dag_file and os.path.isfile(self._temp_dag_file): Path(self._temp_dag_file).unlink() - async def exec_aggregation_async( - self, - batch_inputs: Mapping[str, Any], - aggregation_inputs: Mapping[str, Any], - run_id: Optional[str] = None, - ) -> AggregationResult: - return AggregationResult({}, {}, {}) - def _is_executor_active(self): """Check if the process is still running and return False if it has exited""" # get the exit code of the process by poll() and if it is None, it means the process is still running return self._process.poll() is None - @classmethod - def _get_tool_metadata(cls, flow_file: Path, working_dir: Path) -> dict: - flow_tools_json_path = working_dir / PROMPT_FLOW_DIR_NAME / FLOW_TOOLS_JSON - if flow_tools_json_path.is_file(): - with open(flow_tools_json_path, mode="r", encoding=DEFAULT_ENCODING) as f: - try: - return json.load(f) - except json.JSONDecodeError: - raise MetaFileReadError( - message_format="Failed to fetch meta of tools: {file_path} is not a valid json file.", - file_path=flow_tools_json_path.absolute().as_posix(), - ) - raise MetaFileNotFound( - message_format=( - "Failed to fetch meta of tools: cannot find {file_path}, please build the flow project first." - ), - file_path=flow_tools_json_path.absolute().as_posix(), - ) - @classmethod def find_available_port(cls) -> str: """Find an available port on localhost"""