diff --git a/src/promptflow-core/promptflow/_utils/flow_utils.py b/src/promptflow-core/promptflow/_utils/flow_utils.py index 0962a675de80..88a4c3770ca2 100644 --- a/src/promptflow-core/promptflow/_utils/flow_utils.py +++ b/src/promptflow-core/promptflow/_utils/flow_utils.py @@ -16,8 +16,10 @@ FLOW_DAG_YAML, FLOW_FILE_SUFFIX, FLOW_FLEX_YAML, + LANGUAGE_KEY, PROMPT_FLOW_DIR_NAME, PROMPTY_EXTENSION, + FlowLanguage, ) from promptflow._core._errors import MetaFileNotFound, MetaFileReadError from promptflow._utils.logger_utils import LoggerFactory @@ -184,6 +186,25 @@ def is_flex_flow( return isinstance(yaml_dict, dict) and "entry" in yaml_dict +def resolve_flow_language( + *, + flow_path: Union[str, Path, PathLike, None] = None, + yaml_dict: Optional[dict] = None, + working_dir: Union[str, Path, PathLike, None] = None, +) -> str: + """Get language of a flow. Will return 'python' for Prompty.""" + if flow_path is None and yaml_dict is None: + raise UserErrorException("Either file_path or yaml_dict should be provided.") + if flow_path is not None and yaml_dict is not None: + raise UserErrorException("Only one of file_path and yaml_dict should be provided.") + if flow_path is not None: + flow_path, flow_file = resolve_flow_path(flow_path, base_path=working_dir, check_flow_exist=False) + file_path = flow_path / flow_file + if file_path.is_file() and file_path.suffix.lower() in (".yaml", ".yml"): + yaml_dict = load_yaml(file_path) + return yaml_dict.get(LANGUAGE_KEY, FlowLanguage.Python) + + def is_prompty_flow(file_path: Union[str, Path], raise_error: bool = False): """Check if the flow is a prompty flow by extension of the flow file is .prompty.""" if not file_path or not Path(file_path).exists(): diff --git a/src/promptflow-devkit/promptflow/_cli/_pf/_flow.py b/src/promptflow-devkit/promptflow/_cli/_pf/_flow.py index dffe6d6c8554..fd4ccd885d5c 100644 --- a/src/promptflow-devkit/promptflow/_cli/_pf/_flow.py +++ b/src/promptflow-devkit/promptflow/_cli/_pf/_flow.py @@ -6,13 +6,10 @@ import importlib import json import os -import shutil -import subprocess import sys import tempfile import webbrowser from pathlib import Path -from urllib.parse import urlencode, urlunparse from promptflow._cli._params import ( AppendToDictAction, @@ -40,14 +37,14 @@ copy_extra_files, ) from promptflow._cli._utils import _copy_to_flow, activate_action, confirm, inject_sys_path, list_of_dict_to_dict -from promptflow._constants import ConnectionProviderConfig, FlowLanguage +from promptflow._constants import ConnectionProviderConfig from promptflow._sdk._configuration import Configuration from promptflow._sdk._constants import PROMPT_FLOW_DIR_NAME from promptflow._sdk._pf_client import PFClient -from promptflow._sdk._service.utils.utils import encrypt_flow_path from promptflow._sdk._utils import generate_yaml_entry_without_recover -from promptflow._sdk.operations._flow_operations import FlowOperations -from promptflow._utils.flow_utils import is_flex_flow, resolve_flow_path +from promptflow._sdk._utils.chat_utils import construct_chat_page_url, construct_flow_absolute_path +from promptflow._sdk._utils.serve_utils import start_flow_service +from promptflow._utils.flow_utils import is_flex_flow from promptflow._utils.logger_utils import get_cli_sdk_logger from promptflow.exceptions import ErrorTarget, UserErrorException @@ -515,27 +512,22 @@ def _test_flow_multi_modal(args, pf_client): logger.info("Start streamlit with main script generated at: %s", main_script_path) pf_client.flows._chat_with_ui(script=main_script_path, skip_open_browser=args.skip_open_browser) else: + from promptflow._sdk._load_functions import load_flow from promptflow._sdk._tracing import _invoke_pf_svc - # Todo: use base64 encode for now, will consider whether need use encryption or use db to store flow path info - def generate_url(flow_path, port, url_params, enable_internal_features=False): - encrypted_flow_path = encrypt_flow_path(flow_path) - query_dict = {"flow": encrypted_flow_path} - if Configuration.get_instance().is_internal_features_enabled() or enable_internal_features: - query_dict.update({"enable_internal_features": "true", **url_params}) - query_params = urlencode(query_dict) - return urlunparse(("http", f"127.0.0.1:{port}", "/v1.0/ui/chat", "", query_params, "")) - pfs_port = _invoke_pf_svc() flow = generate_yaml_entry_without_recover(entry=args.flow) # flex flow without yaml file doesn't support /eval in chat window - enable_internal_features = True if flow != args.flow else False - flow_path_dir, flow_path_file = resolve_flow_path(flow) - flow_path = str(flow_path_dir / flow_path_file) - chat_page_url = generate_url( - flow_path, - pfs_port, - list_of_dict_to_dict(args.url_params), + enable_internal_features = flow != args.flow or Configuration.get_instance().is_internal_features_enabled() + + # for entry like "package:entry_function", a temp flow.flex.yaml will be generated at flow + flow_entity = load_flow(flow) + flow_absolute_path = construct_flow_absolute_path(flow) + chat_page_url = construct_chat_page_url( + flow_absolute_path, + flow_dir=flow_entity.code, + pfs_port=pfs_port, + url_params=list_of_dict_to_dict(args.url_params), enable_internal_features=enable_internal_features, ) print(f"You can begin chat flow on {chat_page_url}") @@ -598,95 +590,19 @@ def _test_flow_experiment(args, pf_client, inputs, environment_variables): def serve_flow(args): - from promptflow._sdk._load_functions import load_flow - logger.info("Start serve model: %s", args.source) # Set environment variable for local test - source = Path(args.source) - logger.info( - "Start promptflow server with port %s", - args.port, - ) - os.environ["PROMPTFLOW_PROJECT_PATH"] = source.absolute().as_posix() - flow = load_flow(args.source) - if flow.language == FlowLanguage.CSharp: - serve_flow_csharp(args, source) - else: - serve_flow_python(args, source) - logger.info("Promptflow app ended") - - -def serve_flow_csharp(args, source): - from promptflow._proxy._csharp_executor_proxy import EXECUTOR_SERVICE_DLL - - try: - # Change working directory to model dir - logger.info(f"Change working directory to model dir {source}") - os.chdir(source) - command = [ - "dotnet", - EXECUTOR_SERVICE_DLL, - "--port", - str(args.port), - "--yaml_path", - "flow.dag.yaml", - "--assembly_folder", - ".", - "--connection_provider_url", - "", - "--log_path", - "", - "--serving", - ] - subprocess.run(command, stdout=sys.stdout, stderr=sys.stderr) - except KeyboardInterrupt: - pass - - -def _resolve_python_flow_additional_includes(source) -> Path: - # Resolve flow additional includes - from promptflow.client import load_flow - - flow = load_flow(source) - with FlowOperations._resolve_additional_includes(flow.path) as resolved_flow_path: - if resolved_flow_path == flow.path: - return source - # Copy resolved flow to temp folder if additional includes exists - # Note: DO NOT use resolved flow path directly, as when inner logic raise exception, - # temp dir will fail due to file occupied by other process. - temp_flow_path = Path(tempfile.TemporaryDirectory().name) - shutil.copytree(src=resolved_flow_path.parent, dst=temp_flow_path, dirs_exist_ok=True) - - return temp_flow_path - - -def serve_flow_python(args, source): - from promptflow._sdk._configuration import Configuration - from promptflow.core._serving.app import create_app - - static_folder = args.static_folder - if static_folder: - static_folder = Path(static_folder).absolute().as_posix() - config = list_of_dict_to_dict(args.config) - pf_config = Configuration(overrides=config) - logger.info(f"Promptflow config: {pf_config}") - connection_provider = pf_config.get_connection_provider() - source = _resolve_python_flow_additional_includes(source) - os.environ["PROMPTFLOW_PROJECT_PATH"] = source.absolute().as_posix() - logger.info(f"Change working directory to model dir {source}") - os.chdir(source) - app = create_app( - static_folder=static_folder, + start_flow_service( + source=Path(args.source), + static_folder=args.static_folder, + config=list_of_dict_to_dict(args.config), environment_variables=list_of_dict_to_dict(args.environment_variables), - connection_provider=connection_provider, init=list_of_dict_to_dict(args.init), + host=args.host, + port=args.port, + skip_open_browser=args.skip_open_browser, ) - if not args.skip_open_browser: - target = f"http://{args.host}:{args.port}" - logger.info(f"Opening browser {target}...") - webbrowser.open(target) - # Debug is not supported for now as debug will rerun command, and we changed working directory. - app.run(port=args.port, host=args.host) + logger.info("Promptflow app ended") def build_flow(args): diff --git a/src/promptflow-devkit/promptflow/_sdk/_utils/chat_utils.py b/src/promptflow-devkit/promptflow/_sdk/_utils/chat_utils.py new file mode 100644 index 000000000000..7687b58ea899 --- /dev/null +++ b/src/promptflow-devkit/promptflow/_sdk/_utils/chat_utils.py @@ -0,0 +1,23 @@ +from pathlib import Path +from urllib.parse import urlencode, urlunparse + +from promptflow._utils.flow_utils import resolve_flow_path + + +def construct_flow_absolute_path(flow: str) -> str: + flow_dir, flow_file = resolve_flow_path(flow) + return (flow_dir / flow_file).absolute().resolve().as_posix() + + +def construct_chat_page_url( + flow_path: str, flow_dir: Path, pfs_port, url_params: dict, enable_internal_features: bool +) -> str: + from promptflow._sdk._service.utils.utils import encrypt_flow_path + + # Todo: use base64 encode for now, will consider whether need use encryption or use db to store flow path info + query_dict = {"flow": encrypt_flow_path(flow_path), **url_params} + if enable_internal_features: + query_dict["enable_internal_features"] = "true" + query_params = urlencode(query_dict) + + return urlunparse(("http", f"127.0.0.1:{pfs_port}", "/v1.0/ui/chat", "", query_params, "")) diff --git a/src/promptflow-devkit/promptflow/_sdk/_utils/serve_utils.py b/src/promptflow-devkit/promptflow/_sdk/_utils/serve_utils.py new file mode 100644 index 000000000000..51b4b0c5ff74 --- /dev/null +++ b/src/promptflow-devkit/promptflow/_sdk/_utils/serve_utils.py @@ -0,0 +1,185 @@ +import abc +import logging +import os +import shutil +import socket +import subprocess +import sys +import tempfile +import webbrowser +from pathlib import Path +from typing import Any, Dict + +from promptflow._constants import FlowLanguage +from promptflow._proxy._csharp_inspector_proxy import EXECUTOR_SERVICE_DLL +from promptflow._utils.flow_utils import resolve_flow_language, resolve_flow_path + +logger = logging.getLogger(__name__) + + +def find_available_port() -> str: + """Find an available port on localhost""" + # TODO: replace find_available_port in CSharpExecutorProxy with this one + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("localhost", 0)) + _, port = s.getsockname() + return str(port) + + +def _resolve_python_flow_additional_includes(source) -> Path: + # Resolve flow additional includes + from promptflow.client import load_flow + + flow = load_flow(source) + from promptflow._sdk.operations import FlowOperations + + with FlowOperations._resolve_additional_includes(flow.path) as resolved_flow_path: + if resolved_flow_path == flow.path: + return source + # Copy resolved flow to temp folder if additional includes exists + # Note: DO NOT use resolved flow path directly, as when inner logic raise exception, + # temp dir will fail due to file occupied by other process. + temp_flow_path = Path(tempfile.TemporaryDirectory().name) + shutil.copytree(src=resolved_flow_path.parent, dst=temp_flow_path, dirs_exist_ok=True) + + return temp_flow_path + + +def start_flow_service( + *, + source: Path, + static_folder: str = None, + host: str = "localhost", + port: int = 8080, + config: dict = None, + environment_variables: Dict[str, str] = None, + init: Dict[str, Any] = None, + skip_open_browser: bool = True, +): + logger.info( + "Start promptflow server with port %s", + port, + ) + language = resolve_flow_language(flow_path=source) + + flow_dir, flow_file_name = resolve_flow_path(source) + + os.environ["PROMPTFLOW_PROJECT_PATH"] = flow_dir.absolute().as_posix() + if language == FlowLanguage.Python: + helper = PythonFlowServiceHelper( + flow_file_name=flow_file_name, + flow_dir=flow_dir, + init=init or {}, + port=port, + static_folder=static_folder, + host=host, + config=config or {}, + environment_variables=environment_variables or {}, + skip_open_browser=skip_open_browser, + ) + else: + helper = CSharpFlowServiceHelper( + flow_file_name=flow_file_name, + flow_dir=flow_dir, + init=init or {}, + port=port, + ) + helper.run() + + +class BaseFlowServiceHelper: + def __init__(self): + pass + + @abc.abstractmethod + def run(self): + pass + + +class PythonFlowServiceHelper(BaseFlowServiceHelper): + def __init__( + self, + *, + flow_file_name, + flow_dir, + port, + host, + static_folder, + config, + environment_variables, + init, + skip_open_browser: bool, + ): + self._static_folder = static_folder + self.flow_file_name = flow_file_name + self.flow_dir = flow_dir + self.host = host + self.port = port + self.config = config + self.environment_variables = environment_variables + self.init = init + self.skip_open_browser = skip_open_browser + super().__init__() + + @property + def static_folder(self): + if self._static_folder is None: + return None + return Path(self._static_folder).absolute().as_posix() + + def run(self): + from promptflow._sdk._configuration import Configuration + from promptflow.core._serving.app import create_app + + flow_dir = _resolve_python_flow_additional_includes(self.flow_dir / self.flow_file_name) + + pf_config = Configuration(overrides=self.config) + logger.info(f"Promptflow config: {pf_config}") + connection_provider = pf_config.get_connection_provider() + os.environ["PROMPTFLOW_PROJECT_PATH"] = flow_dir.absolute().as_posix() + logger.info(f"Change working directory to model dir {flow_dir}") + os.chdir(flow_dir) + app = create_app( + static_folder=self.static_folder, + environment_variables=self.environment_variables, + connection_provider=connection_provider, + init=self.init, + ) + if not self.skip_open_browser: + target = f"http://{self.host}:{self.port}" + logger.info(f"Opening browser {target}...") + webbrowser.open(target) + # Debug is not supported for now as debug will rerun command, and we changed working directory. + app.run(port=self.port, host=self.host) + + +class CSharpFlowServiceHelper(BaseFlowServiceHelper): + def __init__(self, *, flow_file_name, flow_dir, init, port): + self.port = port + self._init = init + self.flow_dir, self.flow_file_name = flow_dir, flow_file_name + super().__init__() + + def _construct_command(self): + return [ + "dotnet", + EXECUTOR_SERVICE_DLL, + "--port", + str(self.port), + "--yaml_path", + self.flow_file_name, + "--assembly_folder", + ".", + "--connection_provider_url", + "", + "--log_path", + "", + "--serving", + ] + + def run(self): + try: + command = self._construct_command() + subprocess.run(command, cwd=self.flow_dir, stdout=sys.stdout, stderr=sys.stderr) + except KeyboardInterrupt: + pass diff --git a/src/promptflow-devkit/tests/sdk_cli_test/unittests/test_flow_serve.py b/src/promptflow-devkit/tests/sdk_cli_test/unittests/test_flow_serve.py index 4988b7f02c62..85ff66cd8a56 100644 --- a/src/promptflow-devkit/tests/sdk_cli_test/unittests/test_flow_serve.py +++ b/src/promptflow-devkit/tests/sdk_cli_test/unittests/test_flow_serve.py @@ -3,7 +3,7 @@ import pytest from _constants import PROMPTFLOW_ROOT -from promptflow._cli._pf._flow import _resolve_python_flow_additional_includes +from promptflow._sdk._utils.serve_utils import _resolve_python_flow_additional_includes @pytest.mark.unittest