Skip to content

Commit

Permalink
refactor: add a shared base for local executor proxy and runtime exec…
Browse files Browse the repository at this point in the history
…utor proxy (#1943)

# Description

This pull request involves significant changes to the
`src/promptflow/promptflow/batch/_csharp_executor_proxy.py` file and the
addition of a new file
`src/promptflow/promptflow/batch/_csharp_base_executor_proxy.py`. The
changes mainly revolve around the refactoring of the
`CSharpExecutorProxy` class into a new `CSharpBaseExecutorProxy` class
and a revised `CSharpLocalExecutorProxy` class. The refactoring aims to
improve code reuse and organization.

Here are the most important changes:

**Refactoring and code organization:**

*
[`src/promptflow/promptflow/batch/_csharp_base_executor_proxy.py`](diffhunk://#diff-2237f977aca2e9808fd5c2957c79d4be28b0525c02cdfd801c0160ad3315836dR1-R103):
This is a new file that contains the base class
`CSharpBaseExecutorProxy` for the C# executor proxy. It includes methods
and properties that were previously in the `CSharpExecutorProxy` class.

*
[`src/promptflow/promptflow/batch/_csharp_executor_proxy.py`](diffhunk://#diff-f8219170007d8a89eed104a166ab8c0c9d3af5b5c5e61225659d982f470aeb95L4-L51):
The `CSharpExecutorProxy` class has been renamed to
`CSharpLocalExecutorProxy` and now inherits from the new
`CSharpBaseExecutorProxy` class. Several methods previously in this
class have been moved to the base class.

**Code simplification:**

*
[`src/promptflow/promptflow/batch/_csharp_executor_proxy.py`](diffhunk://#diff-f8219170007d8a89eed104a166ab8c0c9d3af5b5c5e61225659d982f470aeb95L110-R107):
The `create` method has been simplified by using the
`_construct_service_startup_command` method from the base class to
generate the command for starting the service.

*
[`src/promptflow/promptflow/batch/_csharp_executor_proxy.py`](diffhunk://#diff-f8219170007d8a89eed104a166ab8c0c9d3af5b5c5e61225659d982f470aeb95L151-L182):
The `exec_aggregation_async` and `_get_tool_metadata` methods, which are
not supported or used in the local executor proxy, have been removed.

# All Promptflow Contribution checklist:
- [ ] **The pull request does not introduce [breaking changes].**
- [ ] **CHANGELOG is updated for new features, bug fixes or other
significant changes.**
- [ ] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [ ] **Create an issue and link to the pull request to get dedicated
review from promptflow team. Learn more: [suggested
workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [ ] Title of the pull request is clear and informative.
- [ ] There are a small number of commits, each of which have an
informative message. This means that previously merged commits do not
appear in the history of the PR. For more information on cleaning up the
commits in your PR, [see this
page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [ ] Pull request includes test coverage for the included changes.
  • Loading branch information
elliotzh authored Feb 4, 2024
1 parent 66220aa commit 8cc7e20
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 69 deletions.
102 changes: 102 additions & 0 deletions src/promptflow/promptflow/batch/_csharp_base_executor_proxy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import json
from pathlib import Path
from typing import Any, List, Mapping, Optional

from promptflow._core._errors import MetaFileNotFound, MetaFileReadError
from promptflow._sdk._constants import DEFAULT_ENCODING, FLOW_TOOLS_JSON, PROMPT_FLOW_DIR_NAME
from promptflow.batch._base_executor_proxy import APIBasedExecutorProxy
from promptflow.executor._result import AggregationResult

EXECUTOR_SERVICE_DLL = "Promptflow.dll"


class CSharpBaseExecutorProxy(APIBasedExecutorProxy):
"""Base class for csharp executor proxy for local and runtime."""

def __init__(
self,
*,
working_dir: Path,
):
self._working_dir = working_dir

@property
def working_dir(self) -> Path:
return self._working_dir

def _get_flow_meta(self) -> dict:
# TODO: this should be got from flow.json for all languages by default? If so, we need to promote working_dir
# to be a required parameter in the super constructor.
flow_meta_json_path = self.working_dir / ".promptflow" / "flow.json"
if not flow_meta_json_path.is_file():
raise MetaFileNotFound(
message_format=(
# TODO: pf flow validate should be able to generate flow.json
"Failed to fetch meta of inputs: cannot find {file_path}, please retry."
),
file_path=flow_meta_json_path.absolute().as_posix(),
)

with open(flow_meta_json_path, mode="r", encoding=DEFAULT_ENCODING) as flow_meta_json_path:
return json.load(flow_meta_json_path)

async def exec_aggregation_async(
self,
batch_inputs: Mapping[str, Any],
aggregation_inputs: Mapping[str, Any],
run_id: Optional[str] = None,
) -> AggregationResult:
# TODO: aggregation is not supported for now?
return AggregationResult({}, {}, {})

@classmethod
def _construct_service_startup_command(
cls,
port,
log_path,
error_file_path,
yaml_path: str = "flow.dag.yaml",
log_level: str = "Warning",
assembly_folder: str = ".",
) -> List[str]:
return [
"dotnet",
EXECUTOR_SERVICE_DLL,
"--execution_service",
"--port",
port,
"--yaml_path",
yaml_path,
"--assembly_folder",
assembly_folder,
"--log_path",
log_path,
"--log_level",
log_level,
"--error_file_path",
error_file_path,
]

@classmethod
def _get_tool_metadata(cls, flow_file: Path, working_dir: Path) -> dict:
# TODO: this should be got from flow.tools.json for all languages by default? If so,
# we need to promote working_dir to be a required parameter in the super constructor.
flow_tools_json_path = working_dir / PROMPT_FLOW_DIR_NAME / FLOW_TOOLS_JSON
if flow_tools_json_path.is_file():
with open(flow_tools_json_path, mode="r", encoding=DEFAULT_ENCODING) as f:
try:
return json.load(f)
except json.JSONDecodeError:
raise MetaFileReadError(
message_format="Failed to fetch meta of tools: {file_path} is not a valid json file.",
file_path=flow_tools_json_path.absolute().as_posix(),
)
raise MetaFileNotFound(
message_format=(
"Failed to fetch meta of tools: cannot find {file_path}, please build the flow project first."
),
file_path=flow_tools_json_path.absolute().as_posix(),
)
91 changes: 22 additions & 69 deletions src/promptflow/promptflow/batch/_csharp_executor_proxy.py
Original file line number Diff line number Diff line change
@@ -1,54 +1,44 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import json
import os
import socket
import subprocess
import tempfile
import uuid
from pathlib import Path
from typing import Any, Mapping, Optional
from typing import Optional

from promptflow._core._errors import MetaFileNotFound, MetaFileReadError, UnexpectedError
from promptflow._sdk._constants import DEFAULT_ENCODING, FLOW_TOOLS_JSON, PROMPT_FLOW_DIR_NAME
from promptflow._core._errors import UnexpectedError
from promptflow._sdk._constants import DEFAULT_ENCODING
from promptflow._utils.yaml_utils import dump_yaml
from promptflow.batch._base_executor_proxy import APIBasedExecutorProxy
from promptflow.executor._result import AggregationResult
from promptflow.batch._csharp_base_executor_proxy import CSharpBaseExecutorProxy
from promptflow.storage._run_storage import AbstractRunStorage

EXECUTOR_SERVICE_DOMAIN = "http://localhost:"
EXECUTOR_SERVICE_DLL = "Promptflow.dll"


class CSharpExecutorProxy(APIBasedExecutorProxy):
class CSharpExecutorProxy(CSharpBaseExecutorProxy):
def __init__(
self, *, process: subprocess.Popen, port: str, working_dir: Path, temp_dag_file: Optional[Path] = None
self,
*,
process: subprocess.Popen,
port: str,
working_dir: Path,
temp_dag_file: Optional[Path] = None,
):
self._process = process
self._port = port
self._working_dir = working_dir
super().__init__(
working_dir=working_dir,
)
self._temp_dag_file = temp_dag_file

@property
def api_endpoint(self) -> str:
return EXECUTOR_SERVICE_DOMAIN + self._port

def _get_flow_meta(self) -> dict:
# TODO: this should be got from flow.json for all languages by default?
flow_meta_json_path = self._working_dir / ".promptflow" / "flow.json"
if not flow_meta_json_path.is_file():
raise MetaFileNotFound(
message_format=(
# TODO: pf flow validate should be able to generate flow.json
"Failed to fetch meta of inputs: cannot find {file_path}, please retry."
),
file_path=flow_meta_json_path.absolute().as_posix(),
)

with open(flow_meta_json_path, mode="r", encoding=DEFAULT_ENCODING) as flow_meta_json_path:
return json.load(flow_meta_json_path)

@classmethod
def _generate_flow_meta(cls, flow_file: str, assembly_folder: Path):
command = [
Expand Down Expand Up @@ -107,24 +97,14 @@ async def create(
else:
temp_dag_file = None

command = [
"dotnet",
EXECUTOR_SERVICE_DLL,
"--execution_service",
"--port",
port,
"--yaml_path",
flow_file.as_posix(),
"--assembly_folder",
".",
"--log_path",
log_path,
"--log_level",
"Warning",
"--error_file_path",
init_error_file,
]
process = subprocess.Popen(command)
process = subprocess.Popen(
cls._construct_service_startup_command(
port=port,
log_path=log_path,
error_file_path=init_error_file,
yaml_path=flow_file.as_posix(),
)
)
executor_proxy = cls(
process=process,
port=port,
Expand All @@ -148,38 +128,11 @@ async def destroy(self):
if self._temp_dag_file and os.path.isfile(self._temp_dag_file):
Path(self._temp_dag_file).unlink()

async def exec_aggregation_async(
self,
batch_inputs: Mapping[str, Any],
aggregation_inputs: Mapping[str, Any],
run_id: Optional[str] = None,
) -> AggregationResult:
return AggregationResult({}, {}, {})

def _is_executor_active(self):
"""Check if the process is still running and return False if it has exited"""
# get the exit code of the process by poll() and if it is None, it means the process is still running
return self._process.poll() is None

@classmethod
def _get_tool_metadata(cls, flow_file: Path, working_dir: Path) -> dict:
flow_tools_json_path = working_dir / PROMPT_FLOW_DIR_NAME / FLOW_TOOLS_JSON
if flow_tools_json_path.is_file():
with open(flow_tools_json_path, mode="r", encoding=DEFAULT_ENCODING) as f:
try:
return json.load(f)
except json.JSONDecodeError:
raise MetaFileReadError(
message_format="Failed to fetch meta of tools: {file_path} is not a valid json file.",
file_path=flow_tools_json_path.absolute().as_posix(),
)
raise MetaFileNotFound(
message_format=(
"Failed to fetch meta of tools: cannot find {file_path}, please build the flow project first."
),
file_path=flow_tools_json_path.absolute().as_posix(),
)

@classmethod
def find_available_port(cls) -> str:
"""Find an available port on localhost"""
Expand Down

0 comments on commit 8cc7e20

Please sign in to comment.