Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support flow.flex.yaml for csharp serve #3019

Merged
merged 5 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 15 additions & 105 deletions src/promptflow-devkit/promptflow/_cli/_pf/_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,10 @@
import importlib
import json
import os
import shutil
import subprocess
import sys
import tempfile
import webbrowser
from pathlib import Path
from urllib.parse import urlencode, urlunparse

from promptflow._cli._params import (
AppendToDictAction,
Expand Down Expand Up @@ -40,14 +37,14 @@
copy_extra_files,
)
from promptflow._cli._utils import _copy_to_flow, activate_action, confirm, inject_sys_path, list_of_dict_to_dict
from promptflow._constants import ConnectionProviderConfig, FlowLanguage
from promptflow._constants import ConnectionProviderConfig
from promptflow._sdk._configuration import Configuration
from promptflow._sdk._constants import PROMPT_FLOW_DIR_NAME
from promptflow._sdk._pf_client import PFClient
from promptflow._sdk._service.utils.utils import encrypt_flow_path
from promptflow._sdk._utils import generate_yaml_entry_without_recover
from promptflow._sdk.operations._flow_operations import FlowOperations
from promptflow._utils.flow_utils import is_flex_flow, resolve_flow_path
from promptflow._sdk._utils.chat_utils import construct_chat_page_url
from promptflow._sdk._utils.serve_utils import start_flow_service
from promptflow._utils.flow_utils import is_flex_flow
from promptflow._utils.logger_utils import get_cli_sdk_logger
from promptflow.exceptions import ErrorTarget, UserErrorException

Expand Down Expand Up @@ -517,23 +514,12 @@ def _test_flow_multi_modal(args, pf_client):
else:
from promptflow._sdk._tracing import _invoke_pf_svc

# Todo: use base64 encode for now, will consider whether need use encryption or use db to store flow path info
def generate_url(flow_path, port, url_params, enable_internal_features=False):
encrypted_flow_path = encrypt_flow_path(flow_path)
query_dict = {"flow": encrypted_flow_path}
if Configuration.get_instance().is_internal_features_enabled() or enable_internal_features:
query_dict.update({"enable_internal_features": "true", **url_params})
query_params = urlencode(query_dict)
return urlunparse(("http", f"127.0.0.1:{port}", "/v1.0/ui/chat", "", query_params, ""))

pfs_port = _invoke_pf_svc()
flow = generate_yaml_entry_without_recover(entry=args.flow)
# flex flow without yaml file doesn't support /eval in chat window
enable_internal_features = True if flow != args.flow else False
flow_path_dir, flow_path_file = resolve_flow_path(flow)
flow_path = str(flow_path_dir / flow_path_file)
chat_page_url = generate_url(
flow_path,
enable_internal_features = Configuration.get_instance().is_internal_features_enabled() or flow != args.flow
chat_page_url = construct_chat_page_url(
flow,
pfs_port,
list_of_dict_to_dict(args.url_params),
enable_internal_features=enable_internal_features,
Expand Down Expand Up @@ -598,95 +584,19 @@ def _test_flow_experiment(args, pf_client, inputs, environment_variables):


def serve_flow(args):
from promptflow._sdk._load_functions import load_flow

logger.info("Start serve model: %s", args.source)
# Set environment variable for local test
source = Path(args.source)
logger.info(
"Start promptflow server with port %s",
args.port,
)
os.environ["PROMPTFLOW_PROJECT_PATH"] = source.absolute().as_posix()
flow = load_flow(args.source)
if flow.language == FlowLanguage.CSharp:
serve_flow_csharp(args, source)
else:
serve_flow_python(args, source)
logger.info("Promptflow app ended")


def serve_flow_csharp(args, source):
from promptflow._proxy._csharp_executor_proxy import EXECUTOR_SERVICE_DLL

try:
# Change working directory to model dir
logger.info(f"Change working directory to model dir {source}")
os.chdir(source)
command = [
"dotnet",
EXECUTOR_SERVICE_DLL,
"--port",
str(args.port),
"--yaml_path",
"flow.dag.yaml",
"--assembly_folder",
".",
"--connection_provider_url",
"",
"--log_path",
"",
"--serving",
]
subprocess.run(command, stdout=sys.stdout, stderr=sys.stderr)
except KeyboardInterrupt:
pass


def _resolve_python_flow_additional_includes(source) -> Path:
# Resolve flow additional includes
from promptflow.client import load_flow

flow = load_flow(source)
with FlowOperations._resolve_additional_includes(flow.path) as resolved_flow_path:
if resolved_flow_path == flow.path:
return source
# Copy resolved flow to temp folder if additional includes exists
# Note: DO NOT use resolved flow path directly, as when inner logic raise exception,
# temp dir will fail due to file occupied by other process.
temp_flow_path = Path(tempfile.TemporaryDirectory().name)
shutil.copytree(src=resolved_flow_path.parent, dst=temp_flow_path, dirs_exist_ok=True)

return temp_flow_path


def serve_flow_python(args, source):
from promptflow._sdk._configuration import Configuration
from promptflow.core._serving.app import create_app

static_folder = args.static_folder
if static_folder:
static_folder = Path(static_folder).absolute().as_posix()
config = list_of_dict_to_dict(args.config)
pf_config = Configuration(overrides=config)
logger.info(f"Promptflow config: {pf_config}")
connection_provider = pf_config.get_connection_provider()
source = _resolve_python_flow_additional_includes(source)
os.environ["PROMPTFLOW_PROJECT_PATH"] = source.absolute().as_posix()
logger.info(f"Change working directory to model dir {source}")
os.chdir(source)
app = create_app(
static_folder=static_folder,
start_flow_service(
source=Path(args.source),
static_folder=args.static_folder,
config=list_of_dict_to_dict(args.config),
environment_variables=list_of_dict_to_dict(args.environment_variables),
connection_provider=connection_provider,
init=list_of_dict_to_dict(args.init),
host=args.host,
port=args.port,
skip_open_browser=args.skip_open_browser,
)
if not args.skip_open_browser:
target = f"http://{args.host}:{args.port}"
logger.info(f"Opening browser {target}...")
webbrowser.open(target)
# Debug is not supported for now as debug will rerun command, and we changed working directory.
app.run(port=args.port, host=args.host)
logger.info("Promptflow app ended")


def build_flow(args):
Expand Down
1 change: 1 addition & 0 deletions src/promptflow-devkit/promptflow/_internal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from promptflow._sdk._constants import LOCAL_MGMT_DB_PATH, CreatedByFieldName
from promptflow._sdk._service.apis.collector import trace_collector
from promptflow._sdk._tracing import process_otlp_trace_request
from promptflow._sdk._utils.general_utils import resolve_flow_language
from promptflow._sdk._version import VERSION
from promptflow._utils.context_utils import _change_working_dir, inject_sys_path
from promptflow._utils.credential_scrubber import CredentialScrubber
Expand Down
21 changes: 21 additions & 0 deletions src/promptflow-devkit/promptflow/_sdk/_utils/chat_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from urllib.parse import urlencode, urlunparse

from promptflow._sdk._service.utils.utils import encrypt_flow_path
from promptflow._utils.flow_utils import resolve_flow_path


def construct_flow_absolute_path(flow: str) -> str:
flow_dir, flow_file = resolve_flow_path(flow)
return (flow_dir / flow_file).absolute().resolve().as_posix()


# Todo: use base64 encode for now, will consider whether need use encryption or use db to store flow path info
def construct_chat_page_url(flow, port, url_params, enable_internal_features=False):
flow_path_dir, flow_path_file = resolve_flow_path(flow)
flow_path = str(flow_path_dir / flow_path_file)
encrypted_flow_path = encrypt_flow_path(flow_path)
query_dict = {"flow": encrypted_flow_path}
if enable_internal_features:
query_dict.update({"enable_internal_features": "true", **url_params})
query_params = urlencode(query_dict)
return urlunparse(("http", f"127.0.0.1:{port}", "/v1.0/ui/chat", "", query_params, ""))
25 changes: 24 additions & 1 deletion src/promptflow-devkit/promptflow/_sdk/_utils/general_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from keyring.errors import NoKeyringError
from marshmallow import ValidationError

from promptflow._constants import ENABLE_MULTI_CONTAINER_KEY, EXTENSION_UA, FLOW_FLEX_YAML, FlowLanguage
from promptflow._constants import ENABLE_MULTI_CONTAINER_KEY, EXTENSION_UA, FLOW_FLEX_YAML, LANGUAGE_KEY, FlowLanguage
from promptflow._core.entry_meta_generator import generate_flow_meta as _generate_flow_meta
from promptflow._sdk._constants import (
AZURE_WORKSPACE_REGEX_FORMAT,
Expand Down Expand Up @@ -1103,3 +1103,26 @@ def load_input_data(data_path):
return json.load(f)
else:
raise ValueError("Only support jsonl or json file as input.")


def resolve_flow_language(
*,
flow_path: Union[str, Path, PathLike, None] = None,
yaml_dict: Optional[dict] = None,
working_dir: Union[str, Path, PathLike, None] = None,
# add kwargs given this function will be used in runtime and may have more parameters in the future
**kwargs,
) -> str:
"""Get language of a flow. Will return 'python' for Prompty."""
if flow_path is None and yaml_dict is None:
raise UserErrorException("Either flow_path or yaml_dict should be provided.")
if flow_path is not None and yaml_dict is not None:
raise UserErrorException("Only one of flow_path and yaml_dict should be provided.")
if flow_path is not None:
flow_path, flow_file = resolve_flow_path(flow_path, base_path=working_dir, check_flow_exist=False)
file_path = flow_path / flow_file
if file_path.is_file() and file_path.suffix.lower() in (".yaml", ".yml"):
yaml_dict = load_yaml(file_path)
else:
raise UserErrorException(f"Invalid flow path {file_path.as_posix()}, must exist and of suffix yaml or yml.")
return yaml_dict.get(LANGUAGE_KEY, FlowLanguage.Python)
167 changes: 167 additions & 0 deletions src/promptflow-devkit/promptflow/_sdk/_utils/serve_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import contextlib
import json
import logging
import os
import shutil
import socket
import subprocess
import sys
import tempfile
import uuid
import webbrowser
from pathlib import Path
from typing import Any, Dict, Generator

from promptflow._constants import PROMPT_FLOW_DIR_NAME, FlowLanguage
from promptflow._proxy._csharp_inspector_proxy import EXECUTOR_SERVICE_DLL
from promptflow._sdk._utils.general_utils import resolve_flow_language
from promptflow._utils.flow_utils import resolve_flow_path

logger = logging.getLogger(__name__)


def find_available_port() -> str:
"""Find an available port on localhost"""
# TODO: replace find_available_port in CSharpExecutorProxy with this one
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("localhost", 0))
_, port = s.getsockname()
return str(port)


def _resolve_python_flow_additional_includes(source) -> Path:
# Resolve flow additional includes
from promptflow.client import load_flow

flow = load_flow(source)
from promptflow._sdk.operations import FlowOperations

with FlowOperations._resolve_additional_includes(flow.path) as resolved_flow_path:
if resolved_flow_path == flow.path:
return source
# Copy resolved flow to temp folder if additional includes exists
# Note: DO NOT use resolved flow path directly, as when inner logic raise exception,
# temp dir will fail due to file occupied by other process.
temp_flow_path = Path(tempfile.TemporaryDirectory().name)
shutil.copytree(src=resolved_flow_path.parent, dst=temp_flow_path, dirs_exist_ok=True)

return temp_flow_path


def start_flow_service(
*,
source: Path,
static_folder: str = None,
host: str = "localhost",
port: int = 8080,
config: dict = None,
environment_variables: Dict[str, str] = None,
init: Dict[str, Any] = None,
skip_open_browser: bool = True,
):
logger.info(
"Start promptflow server with port %s",
port,
)
language = resolve_flow_language(flow_path=source)

flow_dir, flow_file_name = resolve_flow_path(source)
if language == FlowLanguage.Python:
serve_python_flow(
flow_file_name=flow_file_name,
flow_dir=flow_dir,
init=init or {},
port=port,
static_folder=static_folder,
host=host,
config=config or {},
environment_variables=environment_variables or {},
skip_open_browser=skip_open_browser,
)
else:
serve_csharp_flow(
flow_file_name=flow_file_name,
flow_dir=flow_dir,
init=init or {},
port=port,
)


def serve_python_flow(
*,
flow_file_name,
flow_dir,
port,
host,
static_folder,
config,
environment_variables,
init,
skip_open_browser: bool,
):
from promptflow._sdk._configuration import Configuration
from promptflow.core._serving.app import create_app

flow_dir = _resolve_python_flow_additional_includes(flow_dir / flow_file_name)

pf_config = Configuration(overrides=config)
logger.info(f"Promptflow config: {pf_config}")
connection_provider = pf_config.get_connection_provider()
os.environ["PROMPTFLOW_PROJECT_PATH"] = flow_dir.absolute().as_posix()
logger.info(f"Change working directory to model dir {flow_dir}")
os.chdir(flow_dir)
app = create_app(
static_folder=Path(static_folder).absolute().as_posix() if static_folder else None,
environment_variables=environment_variables,
connection_provider=connection_provider,
init=init,
)
if not skip_open_browser:
target = f"http://{host}:{port}"
logger.info(f"Opening browser {target}...")
webbrowser.open(target)
# Debug is not supported for now as debug will rerun command, and we changed working directory.
app.run(port=port, host=host)


@contextlib.contextmanager
def construct_csharp_service_start_up_command(
*, port: int, flow_file_name: str, flow_dir: Path, init: Dict[str, Any] = None
) -> Generator[str, None, None]:
cmd = [
"dotnet",
EXECUTOR_SERVICE_DLL,
"--port",
str(port),
"--yaml_path",
flow_file_name,
"--assembly_folder",
".",
"--connection_provider_url",
"",
"--log_path",
"",
"--serving",
]
if init:
init_json_path = flow_dir / PROMPT_FLOW_DIR_NAME / f"init-{uuid.uuid4()}.json"
init_json_path.parent.mkdir(parents=True, exist_ok=True)
with open(init_json_path, "w") as f:
json.dump(init, f)
cmd.extend(["--init", init_json_path.as_posix()])
try:
yield cmd
finally:
os.remove(init_json_path)
else:
yield cmd


def serve_csharp_flow(flow_dir: Path, port: int, flow_file_name: str, init: Dict[str, Any] = None):
try:
with construct_csharp_service_start_up_command(
port=port, flow_file_name=flow_file_name, flow_dir=flow_dir, init=init
) as command:
subprocess.run(command, cwd=flow_dir, stdout=sys.stdout, stderr=sys.stderr)
except KeyboardInterrupt:
pass
Loading
Loading