Skip to content

Commit

Permalink
feat: support flow.flex.yaml for csharp serve (#3019)
Browse files Browse the repository at this point in the history
# Description

Allow running `pf flow serve` in the following C# scenarios:
1. flex flow, including class init
2. use yaml as source

Add a utility function to infer the language of a flow.

# All Promptflow Contribution checklist:
- [x] **The pull request does not introduce [breaking changes].**
- [x] **CHANGELOG is updated for new features, bug fixes or other
significant changes.**
- [x] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [x] **Create an issue and link to the pull request to get dedicated
review from promptflow team. Learn more: [suggested
workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [x] Title of the pull request is clear and informative.
- [x] There are a small number of commits, each of which have an
informative message. This means that previously merged commits do not
appear in the history of the PR. For more information on cleaning up the
commits in your PR, [see this
page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [ ] Pull request includes test coverage for the included changes.
  • Loading branch information
elliotzh authored Apr 26, 2024
1 parent 5c510d5 commit 98d3e69
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 107 deletions.
120 changes: 15 additions & 105 deletions src/promptflow-devkit/promptflow/_cli/_pf/_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,10 @@
import importlib
import json
import os
import shutil
import subprocess
import sys
import tempfile
import webbrowser
from pathlib import Path
from urllib.parse import urlencode, urlunparse

from promptflow._cli._params import (
AppendToDictAction,
Expand Down Expand Up @@ -40,14 +37,14 @@
copy_extra_files,
)
from promptflow._cli._utils import _copy_to_flow, activate_action, confirm, inject_sys_path, list_of_dict_to_dict
from promptflow._constants import ConnectionProviderConfig, FlowLanguage
from promptflow._constants import ConnectionProviderConfig
from promptflow._sdk._configuration import Configuration
from promptflow._sdk._constants import PROMPT_FLOW_DIR_NAME
from promptflow._sdk._pf_client import PFClient
from promptflow._sdk._service.utils.utils import encrypt_flow_path
from promptflow._sdk._utils import generate_yaml_entry_without_recover
from promptflow._sdk.operations._flow_operations import FlowOperations
from promptflow._utils.flow_utils import is_flex_flow, resolve_flow_path
from promptflow._sdk._utils.chat_utils import construct_chat_page_url
from promptflow._sdk._utils.serve_utils import start_flow_service
from promptflow._utils.flow_utils import is_flex_flow
from promptflow._utils.logger_utils import get_cli_sdk_logger
from promptflow.exceptions import ErrorTarget, UserErrorException

Expand Down Expand Up @@ -517,23 +514,12 @@ def _test_flow_multi_modal(args, pf_client):
else:
from promptflow._sdk._tracing import _invoke_pf_svc

# Todo: use base64 encode for now, will consider whether need use encryption or use db to store flow path info
def generate_url(flow_path, port, url_params, enable_internal_features=False):
encrypted_flow_path = encrypt_flow_path(flow_path)
query_dict = {"flow": encrypted_flow_path}
if Configuration.get_instance().is_internal_features_enabled() or enable_internal_features:
query_dict.update({"enable_internal_features": "true", **url_params})
query_params = urlencode(query_dict)
return urlunparse(("http", f"127.0.0.1:{port}", "/v1.0/ui/chat", "", query_params, ""))

pfs_port = _invoke_pf_svc()
flow = generate_yaml_entry_without_recover(entry=args.flow)
# flex flow without yaml file doesn't support /eval in chat window
enable_internal_features = True if flow != args.flow else False
flow_path_dir, flow_path_file = resolve_flow_path(flow)
flow_path = str(flow_path_dir / flow_path_file)
chat_page_url = generate_url(
flow_path,
enable_internal_features = Configuration.get_instance().is_internal_features_enabled() or flow != args.flow
chat_page_url = construct_chat_page_url(
flow,
pfs_port,
list_of_dict_to_dict(args.url_params),
enable_internal_features=enable_internal_features,
Expand Down Expand Up @@ -598,95 +584,19 @@ def _test_flow_experiment(args, pf_client, inputs, environment_variables):


def serve_flow(args):
from promptflow._sdk._load_functions import load_flow

logger.info("Start serve model: %s", args.source)
# Set environment variable for local test
source = Path(args.source)
logger.info(
"Start promptflow server with port %s",
args.port,
)
os.environ["PROMPTFLOW_PROJECT_PATH"] = source.absolute().as_posix()
flow = load_flow(args.source)
if flow.language == FlowLanguage.CSharp:
serve_flow_csharp(args, source)
else:
serve_flow_python(args, source)
logger.info("Promptflow app ended")


def serve_flow_csharp(args, source):
from promptflow._proxy._csharp_executor_proxy import EXECUTOR_SERVICE_DLL

try:
# Change working directory to model dir
logger.info(f"Change working directory to model dir {source}")
os.chdir(source)
command = [
"dotnet",
EXECUTOR_SERVICE_DLL,
"--port",
str(args.port),
"--yaml_path",
"flow.dag.yaml",
"--assembly_folder",
".",
"--connection_provider_url",
"",
"--log_path",
"",
"--serving",
]
subprocess.run(command, stdout=sys.stdout, stderr=sys.stderr)
except KeyboardInterrupt:
pass


def _resolve_python_flow_additional_includes(source) -> Path:
# Resolve flow additional includes
from promptflow.client import load_flow

flow = load_flow(source)
with FlowOperations._resolve_additional_includes(flow.path) as resolved_flow_path:
if resolved_flow_path == flow.path:
return source
# Copy resolved flow to temp folder if additional includes exists
# Note: DO NOT use resolved flow path directly, as when inner logic raise exception,
# temp dir will fail due to file occupied by other process.
temp_flow_path = Path(tempfile.TemporaryDirectory().name)
shutil.copytree(src=resolved_flow_path.parent, dst=temp_flow_path, dirs_exist_ok=True)

return temp_flow_path


def serve_flow_python(args, source):
from promptflow._sdk._configuration import Configuration
from promptflow.core._serving.app import create_app

static_folder = args.static_folder
if static_folder:
static_folder = Path(static_folder).absolute().as_posix()
config = list_of_dict_to_dict(args.config)
pf_config = Configuration(overrides=config)
logger.info(f"Promptflow config: {pf_config}")
connection_provider = pf_config.get_connection_provider()
source = _resolve_python_flow_additional_includes(source)
os.environ["PROMPTFLOW_PROJECT_PATH"] = source.absolute().as_posix()
logger.info(f"Change working directory to model dir {source}")
os.chdir(source)
app = create_app(
static_folder=static_folder,
start_flow_service(
source=Path(args.source),
static_folder=args.static_folder,
config=list_of_dict_to_dict(args.config),
environment_variables=list_of_dict_to_dict(args.environment_variables),
connection_provider=connection_provider,
init=list_of_dict_to_dict(args.init),
host=args.host,
port=args.port,
skip_open_browser=args.skip_open_browser,
)
if not args.skip_open_browser:
target = f"http://{args.host}:{args.port}"
logger.info(f"Opening browser {target}...")
webbrowser.open(target)
# Debug is not supported for now as debug will rerun command, and we changed working directory.
app.run(port=args.port, host=args.host)
logger.info("Promptflow app ended")


def build_flow(args):
Expand Down
1 change: 1 addition & 0 deletions src/promptflow-devkit/promptflow/_internal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from promptflow._sdk._constants import LOCAL_MGMT_DB_PATH, CreatedByFieldName
from promptflow._sdk._service.apis.collector import trace_collector
from promptflow._sdk._tracing import process_otlp_trace_request
from promptflow._sdk._utils.general_utils import resolve_flow_language
from promptflow._sdk._version import VERSION
from promptflow._utils.context_utils import _change_working_dir, inject_sys_path
from promptflow._utils.credential_scrubber import CredentialScrubber
Expand Down
21 changes: 21 additions & 0 deletions src/promptflow-devkit/promptflow/_sdk/_utils/chat_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from urllib.parse import urlencode, urlunparse

from promptflow._sdk._service.utils.utils import encrypt_flow_path
from promptflow._utils.flow_utils import resolve_flow_path


def construct_flow_absolute_path(flow: str) -> str:
    """Return the absolute, resolved POSIX-style path of a flow's definition file.

    :param flow: Flow path, split into directory and file name by ``resolve_flow_path``.
    :return: The joined directory/file path, made absolute, resolved, and
        rendered with forward slashes.
    """
    directory, file_name = resolve_flow_path(flow)
    flow_file = directory / file_name
    return flow_file.absolute().resolve().as_posix()


# Todo: use base64 encode for now, will consider whether need use encryption or use db to store flow path info
def construct_chat_page_url(flow, port, url_params, enable_internal_features=False):
    """Build the URL of the locally served chat page for a flow.

    :param flow: Flow path, split into directory and file name by ``resolve_flow_path``.
    :param port: Port used in the ``127.0.0.1:<port>`` host part of the URL.
    :param url_params: Extra query parameters; only merged into the query when
        ``enable_internal_features`` is truthy.
    :param enable_internal_features: When truthy, adds
        ``enable_internal_features=true`` plus ``url_params`` to the query.
    :return: The chat page URL as a string.
    """
    flow_dir, flow_file = resolve_flow_path(flow)
    # The flow path travels in the query string in the form produced by encrypt_flow_path.
    query = {"flow": encrypt_flow_path(str(flow_dir / flow_file))}
    if enable_internal_features:
        query = {**query, "enable_internal_features": "true", **url_params}
    return urlunparse(("http", f"127.0.0.1:{port}", "/v1.0/ui/chat", "", urlencode(query), ""))
25 changes: 24 additions & 1 deletion src/promptflow-devkit/promptflow/_sdk/_utils/general_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from keyring.errors import NoKeyringError
from marshmallow import ValidationError

from promptflow._constants import ENABLE_MULTI_CONTAINER_KEY, EXTENSION_UA, FLOW_FLEX_YAML, FlowLanguage
from promptflow._constants import ENABLE_MULTI_CONTAINER_KEY, EXTENSION_UA, FLOW_FLEX_YAML, LANGUAGE_KEY, FlowLanguage
from promptflow._core.entry_meta_generator import generate_flow_meta as _generate_flow_meta
from promptflow._sdk._constants import (
AZURE_WORKSPACE_REGEX_FORMAT,
Expand Down Expand Up @@ -1103,3 +1103,26 @@ def load_input_data(data_path):
return json.load(f)
else:
raise ValueError("Only support jsonl or json file as input.")


def resolve_flow_language(
    *,
    flow_path: Union[str, Path, PathLike, None] = None,
    yaml_dict: Optional[dict] = None,
    working_dir: Union[str, Path, PathLike, None] = None,
    # add kwargs given this function will be used in runtime and may have more parameters in the future
    **kwargs,
) -> str:
    """Get language of a flow. Will return 'python' for Prompty.

    Exactly one of ``flow_path`` and ``yaml_dict`` must be provided.

    :param flow_path: Path to the flow; resolved with ``resolve_flow_path`` using
        ``working_dir`` as the base path.
    :param yaml_dict: Already-loaded flow definition to read the language from.
    :param working_dir: Base path passed to ``resolve_flow_path`` for ``flow_path``.
    :param kwargs: Accepted and ignored, for forward compatibility.
    :return: The value stored under ``LANGUAGE_KEY``, or ``FlowLanguage.Python``
        when the key is absent or the flow is a Prompty file.
    :raises UserErrorException: If neither or both of ``flow_path`` and
        ``yaml_dict`` are given, or if the resolved file does not exist or is
        neither a YAML nor a Prompty file.
    """
    if flow_path is None and yaml_dict is None:
        raise UserErrorException("Either flow_path or yaml_dict should be provided.")
    if flow_path is not None and yaml_dict is not None:
        raise UserErrorException("Only one of flow_path and yaml_dict should be provided.")
    if flow_path is not None:
        flow_dir, flow_file = resolve_flow_path(flow_path, base_path=working_dir, check_flow_exist=False)
        file_path = flow_dir / flow_file
        suffix = file_path.suffix.lower()
        if not file_path.is_file() or suffix not in (".yaml", ".yml", ".prompty"):
            raise UserErrorException(f"Invalid flow path {file_path.as_posix()}, must exist and of suffix yaml or yml.")
        if suffix == ".prompty":
            # A Prompty file is not a plain YAML document, so it is not parsed here;
            # Prompty flows are always reported as Python, as the docstring promises.
            return FlowLanguage.Python
        yaml_dict = load_yaml(file_path)
    # NOTE(review): presumably load_yaml yields None for an empty file; fall back to
    # an empty mapping so the default language is returned instead of failing on .get.
    return (yaml_dict or {}).get(LANGUAGE_KEY, FlowLanguage.Python)
167 changes: 167 additions & 0 deletions src/promptflow-devkit/promptflow/_sdk/_utils/serve_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import contextlib
import json
import logging
import os
import shutil
import socket
import subprocess
import sys
import tempfile
import uuid
import webbrowser
from pathlib import Path
from typing import Any, Dict, Generator

from promptflow._constants import PROMPT_FLOW_DIR_NAME, FlowLanguage
from promptflow._proxy._csharp_inspector_proxy import EXECUTOR_SERVICE_DLL
from promptflow._sdk._utils.general_utils import resolve_flow_language
from promptflow._utils.flow_utils import resolve_flow_path

logger = logging.getLogger(__name__)


def find_available_port() -> str:
    """Return, as a string, a localhost TCP port that was free at call time.

    A throwaway socket is bound to port 0 so the OS picks the port. The socket
    is closed before returning, so the port is not reserved for the caller.
    """
    # TODO: replace find_available_port in CSharpExecutorProxy with this one
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        probe.bind(("localhost", 0))
        chosen_port = probe.getsockname()[1]
    finally:
        probe.close()
    return str(chosen_port)


def _resolve_python_flow_additional_includes(source) -> Path:
    """Resolve a flow's additional includes and return the path to serve from.

    :param source: Flow path accepted by ``load_flow``.
    :return: ``source`` unchanged when resolving additional includes yields the
        flow's own path; otherwise the path of a new temporary directory holding
        a copy of the resolved flow folder. That directory is not removed here.
    """
    # Resolve flow additional includes
    from promptflow.client import load_flow

    flow = load_flow(source)
    from promptflow._sdk.operations import FlowOperations

    with FlowOperations._resolve_additional_includes(flow.path) as resolved_flow_path:
        if resolved_flow_path == flow.path:
            return source
        # Copy resolved flow to temp folder if additional includes exists
        # Note: DO NOT use resolved flow path directly, as when inner logic raise exception,
        # temp dir will fail due to file occupied by other process.
        # mkdtemp creates a directory that stays until explicitly deleted. The previous
        # TemporaryDirectory().name dropped the owning object at once, so the directory
        # was implicitly cleaned up and then re-created by copytree under a released name.
        temp_flow_path = Path(tempfile.mkdtemp())
        shutil.copytree(src=resolved_flow_path.parent, dst=temp_flow_path, dirs_exist_ok=True)

    return temp_flow_path


def start_flow_service(
    *,
    source: Path,
    static_folder: str = None,
    host: str = "localhost",
    port: int = 8080,
    config: dict = None,
    environment_variables: Dict[str, str] = None,
    init: Dict[str, Any] = None,
    skip_open_browser: bool = True,
):
    """Serve a flow, dispatching on the language resolved from ``source``.

    :param source: Flow path; used both to resolve the language and to locate
        the flow directory and file name.
    :param static_folder: Forwarded to the Python server only.
    :param host: Forwarded to the Python server only.
    :param port: Port to serve on.
    :param config: Forwarded to the Python server only; ``None`` becomes ``{}``.
    :param environment_variables: Forwarded to the Python server only; ``None`` becomes ``{}``.
    :param init: Init arguments for the flow; ``None`` becomes ``{}``.
    :param skip_open_browser: Forwarded to the Python server only.
    """
    logger.info("Start promptflow server with port %s", port)

    language = resolve_flow_language(flow_path=source)
    flow_dir, flow_file_name = resolve_flow_path(source)

    # Anything that is not Python is handed to the C# service.
    if language != FlowLanguage.Python:
        serve_csharp_flow(
            flow_file_name=flow_file_name,
            flow_dir=flow_dir,
            init=init or {},
            port=port,
        )
        return

    serve_python_flow(
        flow_file_name=flow_file_name,
        flow_dir=flow_dir,
        init=init or {},
        port=port,
        static_folder=static_folder,
        host=host,
        config=config or {},
        environment_variables=environment_variables or {},
        skip_open_browser=skip_open_browser,
    )


def serve_python_flow(
    *,
    flow_file_name,
    flow_dir,
    port,
    host,
    static_folder,
    config,
    environment_variables,
    init,
    skip_open_browser: bool,
):
    """Serve a Python flow with the promptflow serving app; blocks in ``app.run``.

    Side effects: sets ``PROMPTFLOW_PROJECT_PATH`` and changes the process
    working directory to the (possibly temporary) flow directory.

    :param flow_file_name: File name of the flow definition inside ``flow_dir``.
    :param flow_dir: Directory containing the flow definition file.
    :param port: Port the app listens on.
    :param host: Host the app binds to; also used for the browser URL.
    :param static_folder: Optional static folder; made absolute before use.
    :param config: Overrides passed to ``Configuration``.
    :param environment_variables: Passed through to ``create_app``.
    :param init: Passed through to ``create_app``.
    :param skip_open_browser: When false, a browser is opened on ``http://host:port``.
    """
    from promptflow._sdk._configuration import Configuration
    from promptflow.core._serving.app import create_app

    resolved = _resolve_python_flow_additional_includes(flow_dir / flow_file_name)
    # The helper hands back its input (the flow *file*) when there are no additional
    # includes, and a directory otherwise. Normalise to a directory so that chdir and
    # PROMPTFLOW_PROJECT_PATH below never receive a file path.
    flow_dir = resolved.parent if resolved.is_file() else resolved

    pf_config = Configuration(overrides=config)
    logger.info(f"Promptflow config: {pf_config}")
    connection_provider = pf_config.get_connection_provider()
    os.environ["PROMPTFLOW_PROJECT_PATH"] = flow_dir.absolute().as_posix()
    logger.info(f"Change working directory to model dir {flow_dir}")
    os.chdir(flow_dir)
    app = create_app(
        static_folder=Path(static_folder).absolute().as_posix() if static_folder else None,
        environment_variables=environment_variables,
        connection_provider=connection_provider,
        init=init,
    )
    if not skip_open_browser:
        target = f"http://{host}:{port}"
        logger.info(f"Opening browser {target}...")
        webbrowser.open(target)
    # Debug is not supported for now as debug will rerun command, and we changed working directory.
    app.run(port=port, host=host)


@contextlib.contextmanager
def construct_csharp_service_start_up_command(
    *, port: int, flow_file_name: str, flow_dir: Path, init: Dict[str, Any] = None
) -> Generator[list, None, None]:
    """Yield the ``dotnet`` argument list that starts the C# serving process.

    When ``init`` is non-empty it is written as JSON to a uniquely named file
    under ``flow_dir / PROMPT_FLOW_DIR_NAME`` and passed via ``--init``. The
    file is removed when the context exits.

    :param port: Port passed to the service via ``--port``.
    :param flow_file_name: Flow definition file name passed via ``--yaml_path``.
    :param flow_dir: Flow directory under which the temporary init file is written.
    :param init: Optional init arguments to serialize for the service.
    :return: Context manager yielding the command as a list of strings.
    """
    cmd = [
        "dotnet",
        EXECUTOR_SERVICE_DLL,
        "--port",
        str(port),
        "--yaml_path",
        flow_file_name,
        "--assembly_folder",
        ".",
        "--connection_provider_url",
        "",
        "--log_path",
        "",
        "--serving",
    ]
    if not init:
        yield cmd
        return

    # Use an absolute path: the consumer may run the command with a different working
    # directory than this process, so a path relative to our cwd might not resolve there.
    init_json_path = (flow_dir / PROMPT_FLOW_DIR_NAME / f"init-{uuid.uuid4()}.json").absolute()
    init_json_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        with open(init_json_path, "w", encoding="utf-8") as f:
            json.dump(init, f)
        cmd.extend(["--init", init_json_path.as_posix()])
        yield cmd
    finally:
        # Also cleans up a partially written file if serialization failed, and
        # tolerates the file having been removed already.
        with contextlib.suppress(FileNotFoundError):
            os.remove(init_json_path)


def serve_csharp_flow(flow_dir: Path, port: int, flow_file_name: str, init: Dict[str, Any] = None):
    """Run the C# serving process for a flow until it exits or is interrupted.

    :param flow_dir: Working directory for the subprocess.
    :param port: Port the service should listen on.
    :param flow_file_name: Flow definition file name handed to the service.
    :param init: Optional init arguments forwarded to the command builder.
    """
    # A KeyboardInterrupt (e.g. Ctrl+C) is swallowed rather than propagated.
    with contextlib.suppress(KeyboardInterrupt):
        with construct_csharp_service_start_up_command(
            port=port,
            flow_file_name=flow_file_name,
            flow_dir=flow_dir,
            init=init,
        ) as start_up_command:
            subprocess.run(start_up_command, cwd=flow_dir, stdout=sys.stdout, stderr=sys.stderr)
Loading

0 comments on commit 98d3e69

Please sign in to comment.