From 8703eed2930f55cce168595b419133d11c9aa3e7 Mon Sep 17 00:00:00 2001
From: Brynn Yin <24237253+brynn-code@users.noreply.github.com>
Date: Wed, 24 Apr 2024 19:09:32 +0800
Subject: [PATCH] [Test] Move serving test to core (#2974)

# Description

Move the flex (eager) flow serving end-to-end tests from `promptflow-devkit` to `promptflow-core`:

- Move the eager-flow cases of `test_flow_serve.py` and `test_flow_serve_fastapi.py` into new `tests/core/e2etests/test_eager_flow_serve.py` and `test_eager_flow_serve_fastapi.py` files, and remove them (together with the related fixtures) from the devkit `sdk_cli_test` suite.
- Add the serving, connection-provider and recording-injection fixtures these tests need to `src/promptflow-core/tests/conftest.py`.
- Update `promptflow-core-test.yml` to set the test mode (replay for pull requests, live otherwise), log in to Azure, and generate test connection configs for both the core and azureml-serving test jobs.
- Add `pytest-mock` and `mock` as dev dependencies of `promptflow-core` and refresh the local recordings.

# All Promptflow Contribution checklist:
- [ ] **The pull request does not introduce [breaking changes].**
- [ ] **CHANGELOG is updated for new features, bug fixes or other significant changes.**
- [ ] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [ ] **Create an issue and link to the pull request to get dedicated review from promptflow team. Learn more: [suggested workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [ ] Title of the pull request is clear and informative.
- [ ] There are a small number of commits, each of which has an informative message. This means that previously merged commits do not appear in the history of the PR. For more information on cleaning up the commits in your PR, [see this page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [ ] Pull request includes test coverage for the included changes.

---------

Signed-off-by: Brynn Yin
(cherry picked from commit 4af2968d24279028d16e552a247f1073cdf1efed)
---
 .github/workflows/promptflow-core-test.yml    |  36 ++
 src/promptflow-core/pyproject.toml            |   2 +
 .../azureml-serving/e2etests/__init__.py      |   0
 src/promptflow-core/tests/conftest.py         | 365 +++++++++++++++++-
 .../tests/core/e2etests/test_eager_flow.py    |   6 +-
 .../core/e2etests/test_eager_flow_serve.py    | 259 +++++++++++++
 .../e2etests/test_eager_flow_serve_fastapi.py | 261 +++++++++++++
 .../tests/sdk_cli_test/conftest.py            | 144 -------
 .../sdk_cli_test/e2etests/test_flow_serve.py  | 271 -------------
 .../e2etests/test_flow_serve_fastapi.py       | 273 -------------
 .../promptflow/recording/local/test_utils.py  |   6 +-
 .../recordings/local/node_cache.shelve.bak    |   1 +
 .../recordings/local/node_cache.shelve.dat    | Bin 630994 -> 632970 bytes
 .../recordings/local/node_cache.shelve.dir    |   1 +
 14 files changed, 920 insertions(+), 705 deletions(-)
 create mode 100644 src/promptflow-core/tests/azureml-serving/e2etests/__init__.py
 create mode 100644 src/promptflow-core/tests/core/e2etests/test_eager_flow_serve.py
 create mode 100644 src/promptflow-core/tests/core/e2etests/test_eager_flow_serve_fastapi.py

diff --git a/.github/workflows/promptflow-core-test.yml b/.github/workflows/promptflow-core-test.yml
index 8f53b1cb990..c5ab5b3e71b 100644
--- a/.github/workflows/promptflow-core-test.yml
+++ b/.github/workflows/promptflow-core-test.yml
@@ -43,6 +43,24 @@ jobs:
       - name: generate end-to-end test config from secret
         run: echo '${{ secrets.PF_TRACING_E2E_TEST_CONFIG }}' >> connections.json
         working-directory: ${{ env.WORKING_DIRECTORY }}
+      - name: set test mode
+        run: |
+          echo "PROMPT_FLOW_TEST_MODE=$(if [[ "${{ github.event_name }}" == "pull_request" ]]; then echo replay; else echo live; fi)" >> $GITHUB_ENV
+      - name: Azure login (non pull_request workflow)
+        if: github.event_name != 'pull_request'
+        uses: azure/login@v1
+        with:
+          creds: ${{ secrets.AZURE_CREDENTIALS }}
+      - name: generate live test resources (non pull_request workflow)
+        if: github.event_name != 'pull_request'
+        uses: "./.github/actions/step_generate_configs"
+        with:
+          targetFolder: ${{ env.PROMPTFLOW_DIRECTORY }}
+      - name: generate live test resources (pull_request workflow)
+        if: github.event_name == 'pull_request'
+        working-directory: ${{ env.PROMPTFLOW_DIRECTORY }}
+        run: |
+          cp ${{ github.workspace }}/src/promptflow/dev-connections.json.example ${{ github.workspace }}/src/promptflow/connections.json
       - name: run core tests
         run: poetry run pytest ./tests/core --cov=promptflow --cov-config=pyproject.toml --cov-report=term --cov-report=html --cov-report=xml
         working-directory: ${{ env.WORKING_DIRECTORY }}
@@ -77,6 +95,24 @@
           poetry run pip show promptflow-tracing
           poetry run pip show promptflow-core
         working-directory: ${{ env.WORKING_DIRECTORY }}
+      - name: set test mode
+        run: |
+          echo "PROMPT_FLOW_TEST_MODE=$(if [[ "${{ github.event_name }}" == "pull_request" ]]; then echo replay; else echo live; fi)" >> $GITHUB_ENV
+      - name: Azure login (non pull_request workflow)
+        if: github.event_name != 'pull_request'
+        uses: azure/login@v1
+        with:
+          creds: ${{ secrets.AZURE_CREDENTIALS }}
+      - name: generate live test resources (non pull_request workflow)
+        if: github.event_name != 'pull_request'
+        uses: "./.github/actions/step_generate_configs"
+        with:
+          targetFolder: ${{ env.PROMPTFLOW_DIRECTORY }}
+      - name: generate live test resources (pull_request workflow)
+        if: github.event_name == 'pull_request'
+        working-directory: ${{ env.PROMPTFLOW_DIRECTORY }}
+        run: |
+          cp ${{ github.workspace }}/src/promptflow/dev-connections.json.example ${{ github.workspace }}/src/promptflow/connections.json
       - name: run azureml-serving tests
         run: poetry run pytest ./tests/azureml-serving --cov=promptflow --cov-config=pyproject.toml --cov-report=term --cov-report=html --cov-report=xml
         working-directory: ${{ env.WORKING_DIRECTORY }}
diff --git a/src/promptflow-core/pyproject.toml b/src/promptflow-core/pyproject.toml
index 390bfb1fc18..e0c82a39e3a 100644
--- a/src/promptflow-core/pyproject.toml
+++ b/src/promptflow-core/pyproject.toml
@@ -77,6 +77,8 @@ promptflow-recording = {path = "../promptflow-recording"}
 pytest = "*"
 pytest-cov = "*"
 pytest-xdist = "*"
+pytest-mock = "*"
+mock = "*"

 [build-system]
 requires = [
diff --git a/src/promptflow-core/tests/azureml-serving/e2etests/__init__.py b/src/promptflow-core/tests/azureml-serving/e2etests/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/src/promptflow-core/tests/conftest.py b/src/promptflow-core/tests/conftest.py
index 026fc330c9c..e692b29f45b 100644
--- a/src/promptflow-core/tests/conftest.py
+++ b/src/promptflow-core/tests/conftest.py
@@ -1,16 +1,30 @@
 import json
+import multiprocessing
+import os
 from pathlib import Path
 from unittest.mock import patch

 import pytest
+from fastapi.testclient import TestClient
+from pytest_mock import MockerFixture

 from promptflow._utils.flow_utils import resolve_flow_path
+from promptflow.core._connection_provider._connection_provider import ConnectionProvider
 from promptflow.core._connection_provider._dict_connection_provider import DictConnectionProvider
+from promptflow.core._serving.app import create_app as create_serving_app
+from promptflow.executor._line_execution_process_pool import _process_wrapper
+from promptflow.executor._process_manager import create_spawned_fork_process_manager
+from promptflow.recording.local import recording_array_reset
+from promptflow.recording.record_mode import is_in_ci_pipeline, is_live, is_record, is_replay
+from promptflow.tracing._integrations._openai_injector import inject_openai_api

+PROMPTFLOW_ROOT = Path(__file__).parent.parent.parent / "promptflow"
 TEST_CONFIG_ROOT = 
Path(__file__).parent.parent.parent / "promptflow" / "tests" / "test_configs" FLOW_ROOT = TEST_CONFIG_ROOT / "flows" EAGER_FLOW_ROOT = TEST_CONFIG_ROOT / "eager_flows" -CONNECTION_FILE = Path(__file__).parent.parent / "connections.json" +CONNECTION_FILE = PROMPTFLOW_ROOT / "connections.json" +RECORDINGS_TEST_CONFIGS_ROOT = Path(PROMPTFLOW_ROOT / "../promptflow-recording/recordings/local").resolve() +COUNTER_FILE = (Path(__file__) / "../count.json").resolve() def get_flow_folder(folder_name, root: str = FLOW_ROOT) -> Path: @@ -28,21 +42,350 @@ def get_yaml_file(folder_name, root: str = FLOW_ROOT, file_name: str = None) -> return yaml_file -@pytest.fixture -def dev_connections() -> dict: - with open(CONNECTION_FILE, "r") as f: - return json.load(f) +SpawnProcess = multiprocessing.get_context("spawn").Process + + +class MockSpawnProcess(SpawnProcess): + def __init__(self, group=None, target=None, *args, **kwargs): + if target == _process_wrapper: + target = _mock_process_wrapper + if target == create_spawned_fork_process_manager: + target = _mock_create_spawned_fork_process_manager + super().__init__(group, target, *args, **kwargs) @pytest.fixture -def mock_dict_azure_open_ai_connection(dev_connections): - connection = dev_connections["azure_open_ai_connection"] - # TODO(3128519): Remove this after the connection type is added to github secrets - if "type" not in connection: - connection["type"] = "AzureOpenAIConnection" +def recording_injection(mocker: MockerFixture): + original_process_class = multiprocessing.get_context("spawn").Process + multiprocessing.get_context("spawn").Process = MockSpawnProcess + if "spawn" == multiprocessing.get_start_method(): + multiprocessing.Process = MockSpawnProcess + + patches = setup_recording_injection_if_enabled() + + try: + yield + finally: + if is_replay() or is_record(): + from promptflow.recording.local import RecordStorage + + RecordStorage.get_instance().delete_lock_file() + if is_live(): + from promptflow.recording.local import Counter + + Counter.set_file(COUNTER_FILE) + Counter.delete_count_lock_file() + recording_array_reset() + + multiprocessing.get_context("spawn").Process = original_process_class + if "spawn" == multiprocessing.get_start_method(): + multiprocessing.Process = original_process_class + + for patcher in patches: + patcher.stop() + + +def setup_recording_injection_if_enabled(): + patches = [] + + def start_patches(patch_targets): + for target, mock_func in patch_targets.items(): + patcher = patch(target, mock_func) + patches.append(patcher) + patcher.start() + + if is_replay() or is_record(): + from promptflow.recording.local import ( + RecordStorage, + inject_async_with_recording, + inject_sync_with_recording, + mock_tool, + ) + from promptflow.recording.record_mode import check_pydantic_v2 + + check_pydantic_v2() + file_path = RECORDINGS_TEST_CONFIGS_ROOT / "node_cache.shelve" + RecordStorage.get_instance(file_path) + from promptflow._core.tool import tool as original_tool + + mocked_tool = mock_tool(original_tool) + patch_targets = { + "promptflow._core.tool.tool": mocked_tool, + # "promptflow.tool": mocked_tool, + "promptflow.core.tool": mocked_tool, + "promptflow.tracing._integrations._openai_injector.inject_sync": inject_sync_with_recording, + "promptflow.tracing._integrations._openai_injector.inject_async": inject_async_with_recording, + } + start_patches(patch_targets) + + if is_live() and is_in_ci_pipeline(): + from promptflow.recording.local import Counter, inject_async_with_recording, inject_sync_with_recording + + 
Counter.set_file(COUNTER_FILE) + patch_targets = { + "promptflow.tracing._integrations._openai_injector.inject_sync": inject_sync_with_recording, + "promptflow.tracing._integrations._openai_injector.inject_async": inject_async_with_recording, + } + start_patches(patch_targets) + + inject_openai_api() + return patches + + +def _mock_process_wrapper(*args, **kwargs): + setup_recording_injection_if_enabled() + return _process_wrapper(*args, **kwargs) + + +def _mock_create_spawned_fork_process_manager(*args, **kwargs): + setup_recording_injection_if_enabled() + return create_spawned_fork_process_manager(*args, **kwargs) + + +@pytest.fixture +def setup_connection_provider(): + if not ConnectionProvider._instance: + connection_dict = json.loads(open(CONNECTION_FILE, "r").read()) + ConnectionProvider._instance = DictConnectionProvider(connection_dict) + # patch get instance as executor run with sub-process and lost class instance with patch( "promptflow.connections.ConnectionProvider.get_instance", - return_value=DictConnectionProvider({"azure_open_ai_connection": connection}), + return_value=ConnectionProvider._instance, ): yield + + +# ==================== serving fixtures ==================== + + +@pytest.fixture +def serving_inject_dict_provider(setup_connection_provider): + with patch( + "promptflow.core._serving.flow_invoker.ConnectionProvider.init_from_provider_config", + return_value=ConnectionProvider._instance, + ): + yield + + +def create_client_by_model( + model_name: str, + mocker: MockerFixture, + connections: dict = {}, + extension_type=None, + environment_variables={}, + model_root=FLOW_ROOT, + init=None, +): + model_path = (Path(model_root) / model_name).resolve().absolute().as_posix() + mocker.patch.dict(os.environ, {"PROMPTFLOW_PROJECT_PATH": model_path}) + if connections: + mocker.patch.dict(os.environ, connections) + if extension_type and extension_type == "azureml": + environment_variables["API_TYPE"] = "${azure_open_ai_connection.api_type}" + app = create_serving_app(environment_variables=environment_variables, extension_type=extension_type, init=init) + app.config.update( + { + "TESTING": True, + } + ) + return app.test_client() + + +@pytest.fixture +def flow_serving_client(mocker: MockerFixture): + model_path = (Path(FLOW_ROOT) / "basic-with-connection").resolve().absolute().as_posix() + mocker.patch.dict(os.environ, {"PROMPTFLOW_PROJECT_PATH": model_path}) + mocker.patch.dict(os.environ, {"USER_AGENT": "test-user-agent"}) + app = create_serving_app(environment_variables={"API_TYPE": "${azure_open_ai_connection.api_type}"}) + app.config.update( + { + "TESTING": True, + } + ) + return app.test_client() + + +@pytest.fixture +def simple_eager_flow(mocker: MockerFixture): + return create_client_by_model("simple_with_dict_output", mocker, model_root=EAGER_FLOW_ROOT) + + +@pytest.fixture +def simple_eager_flow_primitive_output(mocker: MockerFixture): + return create_client_by_model("primitive_output", mocker, model_root=EAGER_FLOW_ROOT) + + +@pytest.fixture +def simple_eager_flow_dataclass_output(mocker: MockerFixture): + return create_client_by_model("flow_with_dataclass_output", mocker, model_root=EAGER_FLOW_ROOT) + + +@pytest.fixture +def non_json_serializable_output(mocker: MockerFixture): + return create_client_by_model("non_json_serializable_output", mocker, model_root=EAGER_FLOW_ROOT) + + +@pytest.fixture +def stream_output(mocker: MockerFixture): + return create_client_by_model("stream_output", mocker, model_root=EAGER_FLOW_ROOT) + + +@pytest.fixture +def 
multiple_stream_outputs(mocker: MockerFixture): + return create_client_by_model("multiple_stream_outputs", mocker, model_root=EAGER_FLOW_ROOT) + + +@pytest.fixture +def eager_flow_evc(mocker: MockerFixture): + return create_client_by_model("environment_variables_connection", mocker, model_root=EAGER_FLOW_ROOT) + + +@pytest.fixture +def eager_flow_evc_override(mocker: MockerFixture): + return create_client_by_model( + "environment_variables_connection", + mocker, + model_root=EAGER_FLOW_ROOT, + environment_variables={"TEST": "${azure_open_ai_connection.api_base}"}, + ) + + +@pytest.fixture +def eager_flow_evc_override_not_exist(mocker: MockerFixture): + return create_client_by_model( + "environment_variables", + mocker, + model_root=EAGER_FLOW_ROOT, + environment_variables={"TEST": "${azure_open_ai_connection.api_type}"}, + ) + + +@pytest.fixture +def eager_flow_evc_connection_not_exist(mocker: MockerFixture): + return create_client_by_model( + "evc_connection_not_exist", + mocker, + model_root=EAGER_FLOW_ROOT, + environment_variables={"TEST": "VALUE"}, + ) + + +@pytest.fixture +def callable_class(mocker: MockerFixture): + return create_client_by_model( + "basic_callable_class", mocker, model_root=EAGER_FLOW_ROOT, init={"obj_input": "input1"} + ) + + +# ==================== FastAPI serving fixtures ==================== + + +def create_fastapi_app(**kwargs): + return create_serving_app(engine="fastapi", **kwargs) + + +@pytest.fixture +def fastapi_flow_serving_client(mocker: MockerFixture): + # model_path = (Path(MODEL_ROOT) / "basic-with-connection").resolve().absolute().as_posix() + # mocker.patch.dict(os.environ, {"PROMPTFLOW_PROJECT_PATH": model_path}) + # mocker.patch.dict(os.environ, {"USER_AGENT": "test-user-agent"}) + # app = create_fastapi_app(environment_variables={"API_TYPE": "${azure_open_ai_connection.api_type}"}) + return fastapi_create_client_by_model( + "basic-with-connection", + mocker, + mock_envs={"USER_AGENT": "test-user-agent"}, + environment_variables={"API_TYPE": "${azure_open_ai_connection.api_type}"}, + ) + # return TestClient(app) + + +def fastapi_create_client_by_model( + model_name: str, + mocker: MockerFixture, + mock_envs: dict = {}, + extension_type=None, + environment_variables={}, + model_root=FLOW_ROOT, + init=None, +): + model_path = (Path(model_root) / model_name).resolve().absolute().as_posix() + mocker.patch.dict(os.environ, {"PROMPTFLOW_PROJECT_PATH": model_path}) + if mock_envs: + mocker.patch.dict(os.environ, mock_envs) + if extension_type and extension_type == "azureml": + environment_variables["API_TYPE"] = "${azure_open_ai_connection.api_type}" + app = create_fastapi_app(environment_variables=environment_variables, extension_type=extension_type, init=init) + return TestClient(app) + + +@pytest.fixture +def fastapi_simple_eager_flow(mocker: MockerFixture): + return fastapi_create_client_by_model("simple_with_dict_output", mocker, model_root=EAGER_FLOW_ROOT) + + +@pytest.fixture +def fastapi_simple_eager_flow_primitive_output(mocker: MockerFixture): + return fastapi_create_client_by_model("primitive_output", mocker, model_root=EAGER_FLOW_ROOT) + + +@pytest.fixture +def fastapi_simple_eager_flow_dataclass_output(mocker: MockerFixture): + return fastapi_create_client_by_model("flow_with_dataclass_output", mocker, model_root=EAGER_FLOW_ROOT) + + +@pytest.fixture +def fastapi_non_json_serializable_output(mocker: MockerFixture): + return fastapi_create_client_by_model("non_json_serializable_output", mocker, model_root=EAGER_FLOW_ROOT) + + +@pytest.fixture 
+def fastapi_stream_output(mocker: MockerFixture): + return fastapi_create_client_by_model("stream_output", mocker, model_root=EAGER_FLOW_ROOT) + + +@pytest.fixture +def fastapi_multiple_stream_outputs(mocker: MockerFixture): + return fastapi_create_client_by_model("multiple_stream_outputs", mocker, model_root=EAGER_FLOW_ROOT) + + +@pytest.fixture +def fastapi_eager_flow_evc(mocker: MockerFixture): + return fastapi_create_client_by_model("environment_variables_connection", mocker, model_root=EAGER_FLOW_ROOT) + + +@pytest.fixture +def fastapi_eager_flow_evc_override(mocker: MockerFixture): + return fastapi_create_client_by_model( + "environment_variables_connection", + mocker, + model_root=EAGER_FLOW_ROOT, + environment_variables={"TEST": "${azure_open_ai_connection.api_base}"}, + ) + + +@pytest.fixture +def fastapi_eager_flow_evc_override_not_exist(mocker: MockerFixture): + return fastapi_create_client_by_model( + "environment_variables", + mocker, + model_root=EAGER_FLOW_ROOT, + environment_variables={"TEST": "${azure_open_ai_connection.api_type}"}, + ) + + +@pytest.fixture +def fastapi_eager_flow_evc_connection_not_exist(mocker: MockerFixture): + return fastapi_create_client_by_model( + "evc_connection_not_exist", + mocker, + model_root=EAGER_FLOW_ROOT, + environment_variables={"TEST": "VALUE"}, + ) + + +@pytest.fixture +def fastapi_callable_class(mocker: MockerFixture): + return fastapi_create_client_by_model( + "basic_callable_class", mocker, model_root=EAGER_FLOW_ROOT, init={"obj_input": "input1"} + ) diff --git a/src/promptflow-core/tests/core/e2etests/test_eager_flow.py b/src/promptflow-core/tests/core/e2etests/test_eager_flow.py index daff96da76b..588a29a6169 100644 --- a/src/promptflow-core/tests/core/e2etests/test_eager_flow.py +++ b/src/promptflow-core/tests/core/e2etests/test_eager_flow.py @@ -39,7 +39,7 @@ async def func_entry_async(input_str: str) -> str: ] -@pytest.mark.usefixtures("dev_connections") +@pytest.mark.usefixtures("recording_injection", "setup_connection_provider") @pytest.mark.e2etest class TestEagerFlow: @pytest.mark.parametrize( @@ -89,7 +89,7 @@ class TestEagerFlow: ), ], ) - def test_flow_run(self, flow_folder, inputs, ensure_output, init_kwargs, mock_dict_azure_open_ai_connection): + def test_flow_run(self, flow_folder, inputs, ensure_output, init_kwargs): flow_file = get_yaml_file(flow_folder, root=EAGER_FLOW_ROOT) # Test submitting eager flow to script executor @@ -112,7 +112,7 @@ def test_flow_run(self, flow_folder, inputs, ensure_output, init_kwargs, mock_di line_result2 = executor.exec_line(inputs=inputs, index=0) assert line_result1.output == line_result2.output - def test_flow_run_with_openai_chat(self, mock_dict_azure_open_ai_connection): + def test_flow_run_with_openai_chat(self): flow_file = get_yaml_file("callable_class_with_openai", root=EAGER_FLOW_ROOT, file_name="flow.flex.yaml") executor = ScriptExecutor(flow_file=flow_file, init_kwargs={"connection": "azure_open_ai_connection"}) diff --git a/src/promptflow-core/tests/core/e2etests/test_eager_flow_serve.py b/src/promptflow-core/tests/core/e2etests/test_eager_flow_serve.py new file mode 100644 index 00000000000..046856db43d --- /dev/null +++ b/src/promptflow-core/tests/core/e2etests/test_eager_flow_serve.py @@ -0,0 +1,259 @@ +import json + +import pytest +from tests.conftest import PROMPTFLOW_ROOT + +from promptflow.core._serving.utils import load_feedback_swagger +from promptflow.exceptions import UserErrorException + +TEST_CONFIGS = PROMPTFLOW_ROOT / "tests" / "test_configs" / "eager_flows" 
+ + +@pytest.mark.e2etest +@pytest.mark.usefixtures("recording_injection", "serving_inject_dict_provider") +class TestEagerFlowServe: + def test_eager_flow_serve(self, simple_eager_flow): + response = simple_eager_flow.post("/score", data=json.dumps({"input_val": "hi"})) + assert ( + response.status_code == 200 + ), f"Response code indicates error {response.status_code} - {response.data.decode()}" + response = json.loads(response.data.decode()) + assert response == {"output": "Hello world! hi"} + + def test_eager_flow_swagger(self, simple_eager_flow): + swagger_dict = json.loads(simple_eager_flow.get("/swagger.json").data.decode()) + expected_swagger = { + "components": {"securitySchemes": {"bearerAuth": {"scheme": "bearer", "type": "http"}}}, + "info": { + "title": "Promptflow[simple_with_dict_output] API", + "version": "1.0.0", + "x-flow-name": "simple_with_dict_output", + }, + "openapi": "3.0.0", + "paths": { + "/score": { + "post": { + "requestBody": { + "content": { + "application/json": { + "example": {}, + "schema": { + "properties": {"input_val": {"default": "gpt", "type": "string"}}, + "required": ["input_val"], + "type": "object", + }, + } + }, + "description": "promptflow " "input data", + "required": True, + }, + "responses": { + "200": { + "content": { + "application/json": { + "schema": { + "properties": {"output": {"type": "string"}}, + "type": "object", + } + } + }, + "description": "successful " "operation", + }, + "400": {"description": "Invalid " "input"}, + "default": {"description": "unexpected " "error"}, + }, + "summary": "run promptflow: " "simple_with_dict_output with an " "given input", + } + } + }, + "security": [{"bearerAuth": []}], + } + feedback_swagger = load_feedback_swagger() + expected_swagger["paths"]["/feedback"] = feedback_swagger + assert swagger_dict == expected_swagger + + def test_eager_flow_serve_primitive_output(self, simple_eager_flow_primitive_output): + response = simple_eager_flow_primitive_output.post("/score", data=json.dumps({"input_val": "hi"})) + assert ( + response.status_code == 200 + ), f"Response code indicates error {response.status_code} - {response.data.decode()}" + response = json.loads(response.data.decode()) + # response original value + assert response == "Hello world! 
hi" + + def test_eager_flow_primitive_output_swagger(self, simple_eager_flow_primitive_output): + swagger_dict = json.loads(simple_eager_flow_primitive_output.get("/swagger.json").data.decode()) + expected_swagger = { + "components": {"securitySchemes": {"bearerAuth": {"scheme": "bearer", "type": "http"}}}, + "info": { + "title": "Promptflow[primitive_output] API", + "version": "1.0.0", + "x-flow-name": "primitive_output", + }, + "openapi": "3.0.0", + "paths": { + "/score": { + "post": { + "requestBody": { + "content": { + "application/json": { + "example": {}, + "schema": { + "properties": {"input_val": {"default": "gpt", "type": "string"}}, + "required": ["input_val"], + "type": "object", + }, + } + }, + "description": "promptflow " "input data", + "required": True, + }, + "responses": { + "200": { + "content": {"application/json": {"schema": {"type": "object"}}}, + "description": "successful " "operation", + }, + "400": {"description": "Invalid " "input"}, + "default": {"description": "unexpected " "error"}, + }, + "summary": "run promptflow: primitive_output " "with an given input", + } + } + }, + "security": [{"bearerAuth": []}], + } + feedback_swagger = load_feedback_swagger() + expected_swagger["paths"]["/feedback"] = feedback_swagger + assert swagger_dict == expected_swagger + + def test_eager_flow_serve_dataclass_output(self, simple_eager_flow_dataclass_output): + response = simple_eager_flow_dataclass_output.post( + "/score", data=json.dumps({"text": "my_text", "models": ["my_model"]}) + ) + assert ( + response.status_code == 200 + ), f"Response code indicates error {response.status_code} - {response.data.decode()}" + response = json.loads(response.data.decode()) + # response dict of dataclass + assert response == {"models": ["my_model"], "text": "my_text"} + + def test_eager_flow_serve_non_json_serializable_output(self, mocker): + with pytest.raises(UserErrorException, match="Parse interface for 'my_flow' failed:"): + # instead of giving 400 response for all requests, we raise user error on serving now + + from tests.conftest import create_client_by_model + + create_client_by_model( + "non_json_serializable_output", + mocker, + model_root=TEST_CONFIGS, + ) + + @pytest.mark.parametrize( + "accept, expected_status_code, expected_content_type", + [ + ("text/event-stream", 200, "text/event-stream; charset=utf-8"), + ("text/html", 406, "application/json"), + ("application/json", 200, "application/json"), + ("*/*", 200, "application/json"), + ("text/event-stream, application/json", 200, "text/event-stream; charset=utf-8"), + ("application/json, */*", 200, "application/json"), + ("", 200, "application/json"), + ], + ) + def test_eager_flow_stream_output( + self, + stream_output, + accept, + expected_status_code, + expected_content_type, + ): + payload = { + "input_val": "val", + } + headers = { + "Content-Type": "application/json", + "Accept": accept, + } + response = stream_output.post("/score", json=payload, headers=headers) + error_msg = f"Response code indicates error {response.status_code} - {response.data.decode()}" + assert response.status_code == expected_status_code, error_msg + assert response.content_type == expected_content_type + + if response.status_code == 406: + assert response.json["error"]["code"] == "UserError" + assert ( + f"Media type {accept} in Accept header is not acceptable. 
Supported media type(s) -" + in response.json["error"]["message"] + ) + + if "text/event-stream" in response.content_type: + for line in response.data.decode().split("\n"): + print(line) + else: + result = response.json + print(result) + + def test_eager_flow_multiple_stream_output(self, multiple_stream_outputs): + headers = { + "Content-Type": "application/json", + "Accept": "text/event-stream", + } + response = multiple_stream_outputs.post("/score", data=json.dumps({"input_val": 1}), headers=headers) + assert ( + response.status_code == 400 + ), f"Response code indicates error {response.status_code} - {response.data.decode()}" + response = json.loads(response.data.decode()) + assert response == {"error": {"code": "UserError", "message": "Multiple stream output fields not supported."}} + + def test_eager_flow_evc(self, eager_flow_evc): + # Supported: flow with EVC in definition + response = eager_flow_evc.post("/score", data=json.dumps({})) + assert ( + response.status_code == 200 + ), f"Response code indicates error {response.status_code} - {response.data.decode()}" + response = json.loads(response.data.decode()) + assert response == "Hello world! azure" + + def test_eager_flow_evc_override(self, eager_flow_evc_override): + # Supported: EVC's connection exist in flow definition + response = eager_flow_evc_override.post("/score", data=json.dumps({})) + assert ( + response.status_code == 200 + ), f"Response code indicates error {response.status_code} - {response.data.decode()}" + response = json.loads(response.data.decode()) + assert response != "Hello world! ${azure_open_ai_connection.api_base}" + + def test_eager_flow_evc_override_not_exist(self, eager_flow_evc_override_not_exist): + # EVC's connection not exist in flow definition, will resolve it. + response = eager_flow_evc_override_not_exist.post("/score", data=json.dumps({})) + assert ( + response.status_code == 200 + ), f"Response code indicates error {response.status_code} - {response.data.decode()}" + response = json.loads(response.data.decode()) + # EVC not resolved since the connection not exist in flow definition + assert response == "Hello world! azure" + + def test_eager_flow_evc_connection_not_exist(self, eager_flow_evc_connection_not_exist): + # Won't get not existed connection since it's override + response = eager_flow_evc_connection_not_exist.post("/score", data=json.dumps({})) + assert ( + response.status_code == 200 + ), f"Response code indicates error {response.status_code} - {response.data.decode()}" + response = json.loads(response.data.decode()) + # EVC not resolved since the connection not exist in flow definition + assert response == "Hello world! 
VALUE" + + def test_eager_flow_with_init(self, callable_class): + response1 = callable_class.post("/score", data=json.dumps({"func_input": "input2"})) + assert ( + response1.status_code == 200 + ), f"Response code indicates error {response1.status_code} - {response1.data.decode()}" + response1 = json.loads(response1.data.decode()) + + response2 = callable_class.post("/score", data=json.dumps({"func_input": "input2"})) + assert ( + response2.status_code == 200 + ), f"Response code indicates error {response2.status_code} - {response2.data.decode()}" + response2 = json.loads(response2.data.decode()) + assert response1 == response2 diff --git a/src/promptflow-core/tests/core/e2etests/test_eager_flow_serve_fastapi.py b/src/promptflow-core/tests/core/e2etests/test_eager_flow_serve_fastapi.py new file mode 100644 index 00000000000..bb7d06441b2 --- /dev/null +++ b/src/promptflow-core/tests/core/e2etests/test_eager_flow_serve_fastapi.py @@ -0,0 +1,261 @@ +import json + +import pytest +from tests.conftest import PROMPTFLOW_ROOT + +from promptflow.core._serving.utils import load_feedback_swagger +from promptflow.exceptions import UserErrorException + +TEST_CONFIGS = PROMPTFLOW_ROOT / "tests" / "test_configs" / "eager_flows" + + +@pytest.mark.e2etest +@pytest.mark.usefixtures("recording_injection", "serving_inject_dict_provider") +class TestEagerFlowServeFastApi: + def test_eager_flow_serve(self, fastapi_simple_eager_flow): + response = fastapi_simple_eager_flow.post("/score", data=json.dumps({"input_val": "hi"})) + assert ( + response.status_code == 200 + ), f"Response code indicates error {response.status_code} - {response.content.decode()}" + response = response.json() + assert response == {"output": "Hello world! hi"} + + def test_eager_flow_swagger(self, fastapi_simple_eager_flow): + swagger_dict = fastapi_simple_eager_flow.get("/swagger.json").json() + expected_swagger = { + "components": {"securitySchemes": {"bearerAuth": {"scheme": "bearer", "type": "http"}}}, + "info": { + "title": "Promptflow[simple_with_dict_output] API", + "version": "1.0.0", + "x-flow-name": "simple_with_dict_output", + }, + "openapi": "3.0.0", + "paths": { + "/score": { + "post": { + "requestBody": { + "content": { + "application/json": { + "example": {}, + "schema": { + "properties": {"input_val": {"default": "gpt", "type": "string"}}, + "required": ["input_val"], + "type": "object", + }, + } + }, + "description": "promptflow " "input data", + "required": True, + }, + "responses": { + "200": { + "content": { + "application/json": { + "schema": { + "properties": {"output": {"type": "string"}}, + "type": "object", + } + } + }, + "description": "successful " "operation", + }, + "400": {"description": "Invalid " "input"}, + "default": {"description": "unexpected " "error"}, + }, + "summary": "run promptflow: " "simple_with_dict_output with an " "given input", + } + } + }, + "security": [{"bearerAuth": []}], + } + feedback_swagger = load_feedback_swagger() + expected_swagger["paths"]["/feedback"] = feedback_swagger + assert swagger_dict == expected_swagger + + def test_eager_flow_serve_primitive_output(self, fastapi_simple_eager_flow_primitive_output): + response = fastapi_simple_eager_flow_primitive_output.post("/score", data=json.dumps({"input_val": "hi"})) + assert ( + response.status_code == 200 + ), f"Response code indicates error {response.status_code} - {response.content.decode()}" + response = response.json() + # response original value + assert response == "Hello world! 
hi" + + def test_eager_flow_primitive_output_swagger(self, fastapi_simple_eager_flow_primitive_output): + swagger_dict = fastapi_simple_eager_flow_primitive_output.get("/swagger.json").json() + expected_swagger = { + "components": {"securitySchemes": {"bearerAuth": {"scheme": "bearer", "type": "http"}}}, + "info": { + "title": "Promptflow[primitive_output] API", + "version": "1.0.0", + "x-flow-name": "primitive_output", + }, + "openapi": "3.0.0", + "paths": { + "/score": { + "post": { + "requestBody": { + "content": { + "application/json": { + "example": {}, + "schema": { + "properties": {"input_val": {"default": "gpt", "type": "string"}}, + "required": ["input_val"], + "type": "object", + }, + } + }, + "description": "promptflow " "input data", + "required": True, + }, + "responses": { + "200": { + "content": {"application/json": {"schema": {"type": "object"}}}, + "description": "successful " "operation", + }, + "400": {"description": "Invalid " "input"}, + "default": {"description": "unexpected " "error"}, + }, + "summary": "run promptflow: primitive_output " "with an given input", + } + } + }, + "security": [{"bearerAuth": []}], + } + feedback_swagger = load_feedback_swagger() + expected_swagger["paths"]["/feedback"] = feedback_swagger + assert swagger_dict == expected_swagger + + def test_eager_flow_serve_dataclass_output(self, fastapi_simple_eager_flow_dataclass_output): + response = fastapi_simple_eager_flow_dataclass_output.post( + "/score", data=json.dumps({"text": "my_text", "models": ["my_model"]}) + ) + assert ( + response.status_code == 200 + ), f"Response code indicates error {response.status_code} - {response.content.decode()}" + response = response.json() + # response dict of dataclass + assert response == {"models": ["my_model"], "text": "my_text"} + + def test_eager_flow_serve_non_json_serializable_output(self, mocker): + with pytest.raises(UserErrorException, match="Parse interface for 'my_flow' failed:"): + # instead of giving 400 response for all requests, we raise user error on serving now + + from tests.conftest import fastapi_create_client_by_model + + fastapi_create_client_by_model( + "non_json_serializable_output", + mocker, + model_root=TEST_CONFIGS, + ) + + @pytest.mark.parametrize( + "accept, expected_status_code, expected_content_type", + [ + ("text/event-stream", 200, "text/event-stream; charset=utf-8"), + ("text/html", 406, "application/json"), + ("application/json", 200, "application/json"), + ("*/*", 200, "application/json"), + ("text/event-stream, application/json", 200, "text/event-stream; charset=utf-8"), + ("application/json, */*", 200, "application/json"), + ("", 200, "application/json"), + ], + ) + def test_eager_flow_stream_output( + self, + fastapi_stream_output, + accept, + expected_status_code, + expected_content_type, + ): + payload = { + "input_val": "val", + } + headers = { + "Content-Type": "application/json", + "Accept": accept, + } + response = fastapi_stream_output.post("/score", json=payload, headers=headers) + error_msg = f"Response code indicates error {response.status_code} - {response.content.decode()}" + res_content_type = response.headers.get("content-type") + assert response.status_code == expected_status_code, error_msg + assert res_content_type == expected_content_type + + if response.status_code == 406: + data = response.json() + assert data["error"]["code"] == "UserError" + assert ( + f"Media type {accept} in Accept header is not acceptable. 
Supported media type(s) -" + in data["error"]["message"] + ) + + if "text/event-stream" in res_content_type: + for line in response.content.decode().split("\n"): + print(line) + else: + result = response.json() + print(result) + + def test_eager_flow_multiple_stream_output(self, fastapi_multiple_stream_outputs): + headers = { + "Content-Type": "application/json", + "Accept": "text/event-stream", + } + response = fastapi_multiple_stream_outputs.post("/score", data=json.dumps({"input_val": 1}), headers=headers) + assert ( + response.status_code == 400 + ), f"Response code indicates error {response.status_code} - {response.content.decode()}" + response = response.json() + assert response == {"error": {"code": "UserError", "message": "Multiple stream output fields not supported."}} + + def test_eager_flow_evc(self, fastapi_eager_flow_evc): + # Supported: flow with EVC in definition + response = fastapi_eager_flow_evc.post("/score", data=json.dumps({})) + assert ( + response.status_code == 200 + ), f"Response code indicates error {response.status_code} - {response.content.decode()}" + response = response.json() + assert response == "Hello world! azure" + + def test_eager_flow_evc_override(self, fastapi_eager_flow_evc_override): + # Supported: EVC's connection exist in flow definition + response = fastapi_eager_flow_evc_override.post("/score", data=json.dumps({})) + assert ( + response.status_code == 200 + ), f"Response code indicates error {response.status_code} - {response.content.decode()}" + response = response.json() + assert response != "Hello world! ${azure_open_ai_connection.api_base}" + + def test_eager_flow_evc_override_not_exist(self, fastapi_eager_flow_evc_override_not_exist): + # EVC's connection not exist in flow definition, will resolve it. + response = fastapi_eager_flow_evc_override_not_exist.post("/score", data=json.dumps({})) + assert ( + response.status_code == 200 + ), f"Response code indicates error {response.status_code} - {response.content.decode()}" + response = response.json() + # EVC not resolved since the connection not exist in flow definition + assert response == "Hello world! azure" + + def test_eager_flow_evc_connection_not_exist(self, fastapi_eager_flow_evc_connection_not_exist): + # Won't get not existed connection since it's override + response = fastapi_eager_flow_evc_connection_not_exist.post("/score", data=json.dumps({})) + assert ( + response.status_code == 200 + ), f"Response code indicates error {response.status_code} - {response.content.decode()}" + response = response.json() + # EVC not resolved since the connection not exist in flow definition + assert response == "Hello world! 
VALUE" + + def test_eager_flow_with_init(self, fastapi_callable_class): + response1 = fastapi_callable_class.post("/score", data=json.dumps({"func_input": "input2"})) + assert ( + response1.status_code == 200 + ), f"Response code indicates error {response1.status_code} - {response1.content.decode()}" + response1 = response1.json() + + response2 = fastapi_callable_class.post("/score", data=json.dumps({"func_input": "input2"})) + assert ( + response2.status_code == 200 + ), f"Response code indicates error {response2.status_code} - {response2.content.decode()}" + response2 = response2.json() + assert response1 == response2 diff --git a/src/promptflow-devkit/tests/sdk_cli_test/conftest.py b/src/promptflow-devkit/tests/sdk_cli_test/conftest.py index cab32c4a106..95aefcc52fd 100644 --- a/src/promptflow-devkit/tests/sdk_cli_test/conftest.py +++ b/src/promptflow-devkit/tests/sdk_cli_test/conftest.py @@ -232,78 +232,6 @@ def serving_client_with_environment_variables(mocker: MockerFixture): ) -@pytest.fixture -def simple_eager_flow(mocker: MockerFixture): - return create_client_by_model("simple_with_dict_output", mocker, model_root=EAGER_FLOW_ROOT) - - -@pytest.fixture -def simple_eager_flow_primitive_output(mocker: MockerFixture): - return create_client_by_model("primitive_output", mocker, model_root=EAGER_FLOW_ROOT) - - -@pytest.fixture -def simple_eager_flow_dataclass_output(mocker: MockerFixture): - return create_client_by_model("flow_with_dataclass_output", mocker, model_root=EAGER_FLOW_ROOT) - - -@pytest.fixture -def non_json_serializable_output(mocker: MockerFixture): - return create_client_by_model("non_json_serializable_output", mocker, model_root=EAGER_FLOW_ROOT) - - -@pytest.fixture -def stream_output(mocker: MockerFixture): - return create_client_by_model("stream_output", mocker, model_root=EAGER_FLOW_ROOT) - - -@pytest.fixture -def multiple_stream_outputs(mocker: MockerFixture): - return create_client_by_model("multiple_stream_outputs", mocker, model_root=EAGER_FLOW_ROOT) - - -@pytest.fixture -def eager_flow_evc(mocker: MockerFixture): - return create_client_by_model("environment_variables_connection", mocker, model_root=EAGER_FLOW_ROOT) - - -@pytest.fixture -def eager_flow_evc_override(mocker: MockerFixture): - return create_client_by_model( - "environment_variables_connection", - mocker, - model_root=EAGER_FLOW_ROOT, - environment_variables={"TEST": "${azure_open_ai_connection.api_base}"}, - ) - - -@pytest.fixture -def eager_flow_evc_override_not_exist(mocker: MockerFixture): - return create_client_by_model( - "environment_variables", - mocker, - model_root=EAGER_FLOW_ROOT, - environment_variables={"TEST": "${azure_open_ai_connection.api_type}"}, - ) - - -@pytest.fixture -def eager_flow_evc_connection_not_exist(mocker: MockerFixture): - return create_client_by_model( - "evc_connection_not_exist", - mocker, - model_root=EAGER_FLOW_ROOT, - environment_variables={"TEST": "VALUE"}, - ) - - -@pytest.fixture -def callable_class(mocker: MockerFixture): - return create_client_by_model( - "basic_callable_class", mocker, model_root=EAGER_FLOW_ROOT, init={"obj_input": "input1"} - ) - - # ==================== FastAPI serving fixtures ==================== @@ -384,78 +312,6 @@ def fastapi_serving_client_with_environment_variables(mocker: MockerFixture): ) -@pytest.fixture -def fastapi_simple_eager_flow(mocker: MockerFixture): - return fastapi_create_client_by_model("simple_with_dict_output", mocker, model_root=EAGER_FLOW_ROOT) - - -@pytest.fixture -def 
fastapi_simple_eager_flow_primitive_output(mocker: MockerFixture): - return fastapi_create_client_by_model("primitive_output", mocker, model_root=EAGER_FLOW_ROOT) - - -@pytest.fixture -def fastapi_simple_eager_flow_dataclass_output(mocker: MockerFixture): - return fastapi_create_client_by_model("flow_with_dataclass_output", mocker, model_root=EAGER_FLOW_ROOT) - - -@pytest.fixture -def fastapi_non_json_serializable_output(mocker: MockerFixture): - return fastapi_create_client_by_model("non_json_serializable_output", mocker, model_root=EAGER_FLOW_ROOT) - - -@pytest.fixture -def fastapi_stream_output(mocker: MockerFixture): - return fastapi_create_client_by_model("stream_output", mocker, model_root=EAGER_FLOW_ROOT) - - -@pytest.fixture -def fastapi_multiple_stream_outputs(mocker: MockerFixture): - return fastapi_create_client_by_model("multiple_stream_outputs", mocker, model_root=EAGER_FLOW_ROOT) - - -@pytest.fixture -def fastapi_eager_flow_evc(mocker: MockerFixture): - return fastapi_create_client_by_model("environment_variables_connection", mocker, model_root=EAGER_FLOW_ROOT) - - -@pytest.fixture -def fastapi_eager_flow_evc_override(mocker: MockerFixture): - return fastapi_create_client_by_model( - "environment_variables_connection", - mocker, - model_root=EAGER_FLOW_ROOT, - environment_variables={"TEST": "${azure_open_ai_connection.api_base}"}, - ) - - -@pytest.fixture -def fastapi_eager_flow_evc_override_not_exist(mocker: MockerFixture): - return fastapi_create_client_by_model( - "environment_variables", - mocker, - model_root=EAGER_FLOW_ROOT, - environment_variables={"TEST": "${azure_open_ai_connection.api_type}"}, - ) - - -@pytest.fixture -def fastapi_eager_flow_evc_connection_not_exist(mocker: MockerFixture): - return fastapi_create_client_by_model( - "evc_connection_not_exist", - mocker, - model_root=EAGER_FLOW_ROOT, - environment_variables={"TEST": "VALUE"}, - ) - - -@pytest.fixture -def fastapi_callable_class(mocker: MockerFixture): - return fastapi_create_client_by_model( - "basic_callable_class", mocker, model_root=EAGER_FLOW_ROOT, init={"obj_input": "input1"} - ) - - # ==================== Recording injection ==================== # To inject patches in subprocesses, add new mock method in setup_recording_injection_if_enabled # in fork mode, this is automatically enabled. 
diff --git a/src/promptflow-devkit/tests/sdk_cli_test/e2etests/test_flow_serve.py b/src/promptflow-devkit/tests/sdk_cli_test/e2etests/test_flow_serve.py index e6c033e85dd..f13362adc7e 100644 --- a/src/promptflow-devkit/tests/sdk_cli_test/e2etests/test_flow_serve.py +++ b/src/promptflow-devkit/tests/sdk_cli_test/e2etests/test_flow_serve.py @@ -3,7 +3,6 @@ import re import pytest -from _constants import PROMPTFLOW_ROOT from opentelemetry import trace from opentelemetry.sdk.resources import SERVICE_NAME, Resource from opentelemetry.sdk.trace import TracerProvider @@ -13,11 +12,8 @@ from promptflow._utils.multimedia_utils import OpenaiVisionMultimediaProcessor from promptflow.core._serving.constants import FEEDBACK_TRACE_FIELD_NAME from promptflow.core._serving.utils import load_feedback_swagger -from promptflow.exceptions import UserErrorException from promptflow.tracing._operation_context import OperationContext -TEST_CONFIGS = PROMPTFLOW_ROOT / "tests" / "test_configs" / "eager_flows" - @pytest.mark.usefixtures("recording_injection", "setup_local_connection") @pytest.mark.e2etest @@ -436,270 +432,3 @@ def test_flow_with_environment_variables(serving_client_with_environment_variabl response = json.loads(response.data.decode()) assert {"output"} == response.keys() assert response["output"] == value - - -@pytest.mark.e2etest -def test_eager_flow_serve(simple_eager_flow): - response = simple_eager_flow.post("/score", data=json.dumps({"input_val": "hi"})) - assert ( - response.status_code == 200 - ), f"Response code indicates error {response.status_code} - {response.data.decode()}" - response = json.loads(response.data.decode()) - assert response == {"output": "Hello world! hi"} - - -@pytest.mark.e2etest -def test_eager_flow_swagger(simple_eager_flow): - swagger_dict = json.loads(simple_eager_flow.get("/swagger.json").data.decode()) - expected_swagger = { - "components": {"securitySchemes": {"bearerAuth": {"scheme": "bearer", "type": "http"}}}, - "info": { - "title": "Promptflow[simple_with_dict_output] API", - "version": "1.0.0", - "x-flow-name": "simple_with_dict_output", - }, - "openapi": "3.0.0", - "paths": { - "/score": { - "post": { - "requestBody": { - "content": { - "application/json": { - "example": {}, - "schema": { - "properties": {"input_val": {"default": "gpt", "type": "string"}}, - "required": ["input_val"], - "type": "object", - }, - } - }, - "description": "promptflow " "input data", - "required": True, - }, - "responses": { - "200": { - "content": { - "application/json": { - "schema": { - "properties": {"output": {"type": "string"}}, - "type": "object", - } - } - }, - "description": "successful " "operation", - }, - "400": {"description": "Invalid " "input"}, - "default": {"description": "unexpected " "error"}, - }, - "summary": "run promptflow: " "simple_with_dict_output with an " "given input", - } - } - }, - "security": [{"bearerAuth": []}], - } - feedback_swagger = load_feedback_swagger() - expected_swagger["paths"]["/feedback"] = feedback_swagger - assert swagger_dict == expected_swagger - - -@pytest.mark.e2etest -def test_eager_flow_serve_primitive_output(simple_eager_flow_primitive_output): - response = simple_eager_flow_primitive_output.post("/score", data=json.dumps({"input_val": "hi"})) - assert ( - response.status_code == 200 - ), f"Response code indicates error {response.status_code} - {response.data.decode()}" - response = json.loads(response.data.decode()) - # response original value - assert response == "Hello world! 
hi" - - -@pytest.mark.e2etest -def test_eager_flow_primitive_output_swagger(simple_eager_flow_primitive_output): - swagger_dict = json.loads(simple_eager_flow_primitive_output.get("/swagger.json").data.decode()) - expected_swagger = { - "components": {"securitySchemes": {"bearerAuth": {"scheme": "bearer", "type": "http"}}}, - "info": {"title": "Promptflow[primitive_output] API", "version": "1.0.0", "x-flow-name": "primitive_output"}, - "openapi": "3.0.0", - "paths": { - "/score": { - "post": { - "requestBody": { - "content": { - "application/json": { - "example": {}, - "schema": { - "properties": {"input_val": {"default": "gpt", "type": "string"}}, - "required": ["input_val"], - "type": "object", - }, - } - }, - "description": "promptflow " "input data", - "required": True, - }, - "responses": { - "200": { - "content": {"application/json": {"schema": {"type": "object"}}}, - "description": "successful " "operation", - }, - "400": {"description": "Invalid " "input"}, - "default": {"description": "unexpected " "error"}, - }, - "summary": "run promptflow: primitive_output " "with an given input", - } - } - }, - "security": [{"bearerAuth": []}], - } - feedback_swagger = load_feedback_swagger() - expected_swagger["paths"]["/feedback"] = feedback_swagger - assert swagger_dict == expected_swagger - - -@pytest.mark.e2etest -def test_eager_flow_serve_dataclass_output(simple_eager_flow_dataclass_output): - response = simple_eager_flow_dataclass_output.post( - "/score", data=json.dumps({"text": "my_text", "models": ["my_model"]}) - ) - assert ( - response.status_code == 200 - ), f"Response code indicates error {response.status_code} - {response.data.decode()}" - response = json.loads(response.data.decode()) - # response dict of dataclass - assert response == {"models": ["my_model"], "text": "my_text"} - - -@pytest.mark.e2etest -def test_eager_flow_serve_non_json_serializable_output(mocker): - with pytest.raises(UserErrorException, match="Parse interface for 'my_flow' failed:"): - # instead of giving 400 response for all requests, we raise user error on serving now - - from ..conftest import create_client_by_model - - create_client_by_model( - "non_json_serializable_output", - mocker, - model_root=TEST_CONFIGS, - ) - - -@pytest.mark.e2etest -@pytest.mark.parametrize( - "accept, expected_status_code, expected_content_type", - [ - ("text/event-stream", 200, "text/event-stream; charset=utf-8"), - ("text/html", 406, "application/json"), - ("application/json", 200, "application/json"), - ("*/*", 200, "application/json"), - ("text/event-stream, application/json", 200, "text/event-stream; charset=utf-8"), - ("application/json, */*", 200, "application/json"), - ("", 200, "application/json"), - ], -) -def test_eager_flow_stream_output( - stream_output, - accept, - expected_status_code, - expected_content_type, -): - payload = { - "input_val": "val", - } - headers = { - "Content-Type": "application/json", - "Accept": accept, - } - response = stream_output.post("/score", json=payload, headers=headers) - error_msg = f"Response code indicates error {response.status_code} - {response.data.decode()}" - assert response.status_code == expected_status_code, error_msg - assert response.content_type == expected_content_type - - if response.status_code == 406: - assert response.json["error"]["code"] == "UserError" - assert ( - f"Media type {accept} in Accept header is not acceptable. 
Supported media type(s) -" - in response.json["error"]["message"] - ) - - if "text/event-stream" in response.content_type: - for line in response.data.decode().split("\n"): - print(line) - else: - result = response.json - print(result) - - -@pytest.mark.e2etest -def test_eager_flow_multiple_stream_output(multiple_stream_outputs): - headers = { - "Content-Type": "application/json", - "Accept": "text/event-stream", - } - response = multiple_stream_outputs.post("/score", data=json.dumps({"input_val": 1}), headers=headers) - assert ( - response.status_code == 400 - ), f"Response code indicates error {response.status_code} - {response.data.decode()}" - response = json.loads(response.data.decode()) - assert response == {"error": {"code": "UserError", "message": "Multiple stream output fields not supported."}} - - -@pytest.mark.e2etest -def test_eager_flow_evc(eager_flow_evc): - # Supported: flow with EVC in definition - response = eager_flow_evc.post("/score", data=json.dumps({})) - assert ( - response.status_code == 200 - ), f"Response code indicates error {response.status_code} - {response.data.decode()}" - response = json.loads(response.data.decode()) - assert response == "Hello world! azure" - - -@pytest.mark.e2etest -def test_eager_flow_evc_override(eager_flow_evc_override): - # Supported: EVC's connection exist in flow definition - response = eager_flow_evc_override.post("/score", data=json.dumps({})) - assert ( - response.status_code == 200 - ), f"Response code indicates error {response.status_code} - {response.data.decode()}" - response = json.loads(response.data.decode()) - assert response != "Hello world! ${azure_open_ai_connection.api_base}" - - -@pytest.mark.e2etest -def test_eager_flow_evc_override_not_exist(eager_flow_evc_override_not_exist): - # EVC's connection not exist in flow definition, will resolve it. - response = eager_flow_evc_override_not_exist.post("/score", data=json.dumps({})) - assert ( - response.status_code == 200 - ), f"Response code indicates error {response.status_code} - {response.data.decode()}" - response = json.loads(response.data.decode()) - # EVC not resolved since the connection not exist in flow definition - assert response == "Hello world! azure" - - -@pytest.mark.e2etest -def test_eager_flow_evc_connection_not_exist(eager_flow_evc_connection_not_exist): - # Won't get not existed connection since it's override - response = eager_flow_evc_connection_not_exist.post("/score", data=json.dumps({})) - assert ( - response.status_code == 200 - ), f"Response code indicates error {response.status_code} - {response.data.decode()}" - response = json.loads(response.data.decode()) - # EVC not resolved since the connection not exist in flow definition - assert response == "Hello world! 
VALUE" - - -@pytest.mark.e2etest -def test_eager_flow_with_init(callable_class): - response1 = callable_class.post("/score", data=json.dumps({"func_input": "input2"})) - assert ( - response1.status_code == 200 - ), f"Response code indicates error {response1.status_code} - {response1.data.decode()}" - response1 = json.loads(response1.data.decode()) - - response2 = callable_class.post("/score", data=json.dumps({"func_input": "input2"})) - assert ( - response2.status_code == 200 - ), f"Response code indicates error {response2.status_code} - {response2.data.decode()}" - response2 = json.loads(response2.data.decode()) - assert response1 == response2 diff --git a/src/promptflow-devkit/tests/sdk_cli_test/e2etests/test_flow_serve_fastapi.py b/src/promptflow-devkit/tests/sdk_cli_test/e2etests/test_flow_serve_fastapi.py index c3c1ff9ad95..7f97f511e13 100644 --- a/src/promptflow-devkit/tests/sdk_cli_test/e2etests/test_flow_serve_fastapi.py +++ b/src/promptflow-devkit/tests/sdk_cli_test/e2etests/test_flow_serve_fastapi.py @@ -3,7 +3,6 @@ import re import pytest -from _constants import PROMPTFLOW_ROOT from opentelemetry import trace from opentelemetry.sdk.resources import SERVICE_NAME, Resource from opentelemetry.sdk.trace import TracerProvider @@ -13,11 +12,8 @@ from promptflow._utils.multimedia_utils import OpenaiVisionMultimediaProcessor from promptflow.core._serving.constants import FEEDBACK_TRACE_FIELD_NAME from promptflow.core._serving.utils import load_feedback_swagger -from promptflow.exceptions import UserErrorException from promptflow.tracing._operation_context import OperationContext -TEST_CONFIGS = PROMPTFLOW_ROOT / "tests" / "test_configs" / "eager_flows" - @pytest.mark.usefixtures("recording_injection", "setup_local_connection") @pytest.mark.e2etest @@ -442,272 +438,3 @@ def test_flow_with_environment_variables(fastapi_serving_client_with_environment response = response.json() assert {"output"} == response.keys() assert response["output"] == value - - -@pytest.mark.e2etest -def test_eager_flow_serve(fastapi_simple_eager_flow): - response = fastapi_simple_eager_flow.post("/score", data=json.dumps({"input_val": "hi"})) - assert ( - response.status_code == 200 - ), f"Response code indicates error {response.status_code} - {response.content.decode()}" - response = response.json() - assert response == {"output": "Hello world! 
hi"} - - -@pytest.mark.e2etest -def test_eager_flow_swagger(fastapi_simple_eager_flow): - swagger_dict = fastapi_simple_eager_flow.get("/swagger.json").json() - expected_swagger = { - "components": {"securitySchemes": {"bearerAuth": {"scheme": "bearer", "type": "http"}}}, - "info": { - "title": "Promptflow[simple_with_dict_output] API", - "version": "1.0.0", - "x-flow-name": "simple_with_dict_output", - }, - "openapi": "3.0.0", - "paths": { - "/score": { - "post": { - "requestBody": { - "content": { - "application/json": { - "example": {}, - "schema": { - "properties": {"input_val": {"default": "gpt", "type": "string"}}, - "required": ["input_val"], - "type": "object", - }, - } - }, - "description": "promptflow " "input data", - "required": True, - }, - "responses": { - "200": { - "content": { - "application/json": { - "schema": { - "properties": {"output": {"type": "string"}}, - "type": "object", - } - } - }, - "description": "successful " "operation", - }, - "400": {"description": "Invalid " "input"}, - "default": {"description": "unexpected " "error"}, - }, - "summary": "run promptflow: " "simple_with_dict_output with an " "given input", - } - } - }, - "security": [{"bearerAuth": []}], - } - feedback_swagger = load_feedback_swagger() - expected_swagger["paths"]["/feedback"] = feedback_swagger - assert swagger_dict == expected_swagger - - -@pytest.mark.e2etest -def test_eager_flow_serve_primitive_output(fastapi_simple_eager_flow_primitive_output): - response = fastapi_simple_eager_flow_primitive_output.post("/score", data=json.dumps({"input_val": "hi"})) - assert ( - response.status_code == 200 - ), f"Response code indicates error {response.status_code} - {response.content.decode()}" - response = response.json() - # response original value - assert response == "Hello world! 
hi" - - -@pytest.mark.e2etest -def test_eager_flow_primitive_output_swagger(fastapi_simple_eager_flow_primitive_output): - swagger_dict = fastapi_simple_eager_flow_primitive_output.get("/swagger.json").json() - expected_swagger = { - "components": {"securitySchemes": {"bearerAuth": {"scheme": "bearer", "type": "http"}}}, - "info": {"title": "Promptflow[primitive_output] API", "version": "1.0.0", "x-flow-name": "primitive_output"}, - "openapi": "3.0.0", - "paths": { - "/score": { - "post": { - "requestBody": { - "content": { - "application/json": { - "example": {}, - "schema": { - "properties": {"input_val": {"default": "gpt", "type": "string"}}, - "required": ["input_val"], - "type": "object", - }, - } - }, - "description": "promptflow " "input data", - "required": True, - }, - "responses": { - "200": { - "content": {"application/json": {"schema": {"type": "object"}}}, - "description": "successful " "operation", - }, - "400": {"description": "Invalid " "input"}, - "default": {"description": "unexpected " "error"}, - }, - "summary": "run promptflow: primitive_output " "with an given input", - } - } - }, - "security": [{"bearerAuth": []}], - } - feedback_swagger = load_feedback_swagger() - expected_swagger["paths"]["/feedback"] = feedback_swagger - assert swagger_dict == expected_swagger - - -@pytest.mark.e2etest -def test_eager_flow_serve_dataclass_output(fastapi_simple_eager_flow_dataclass_output): - response = fastapi_simple_eager_flow_dataclass_output.post( - "/score", data=json.dumps({"text": "my_text", "models": ["my_model"]}) - ) - assert ( - response.status_code == 200 - ), f"Response code indicates error {response.status_code} - {response.content.decode()}" - response = response.json() - # response dict of dataclass - assert response == {"models": ["my_model"], "text": "my_text"} - - -@pytest.mark.e2etest -def test_eager_flow_serve_non_json_serializable_output(mocker): - with pytest.raises(UserErrorException, match="Parse interface for 'my_flow' failed:"): - # instead of giving 400 response for all requests, we raise user error on serving now - - from ..conftest import fastapi_create_client_by_model - - fastapi_create_client_by_model( - "non_json_serializable_output", - mocker, - model_root=TEST_CONFIGS, - ) - - -@pytest.mark.e2etest -@pytest.mark.parametrize( - "accept, expected_status_code, expected_content_type", - [ - ("text/event-stream", 200, "text/event-stream; charset=utf-8"), - ("text/html", 406, "application/json"), - ("application/json", 200, "application/json"), - ("*/*", 200, "application/json"), - ("text/event-stream, application/json", 200, "text/event-stream; charset=utf-8"), - ("application/json, */*", 200, "application/json"), - ("", 200, "application/json"), - ], -) -def test_eager_flow_stream_output( - fastapi_stream_output, - accept, - expected_status_code, - expected_content_type, -): - payload = { - "input_val": "val", - } - headers = { - "Content-Type": "application/json", - "Accept": accept, - } - response = fastapi_stream_output.post("/score", json=payload, headers=headers) - error_msg = f"Response code indicates error {response.status_code} - {response.content.decode()}" - res_content_type = response.headers.get("content-type") - assert response.status_code == expected_status_code, error_msg - assert res_content_type == expected_content_type - - if response.status_code == 406: - data = response.json() - assert data["error"]["code"] == "UserError" - assert ( - f"Media type {accept} in Accept header is not acceptable. 
Supported media type(s) -" - in data["error"]["message"] - ) - - if "text/event-stream" in res_content_type: - for line in response.content.decode().split("\n"): - print(line) - else: - result = response.json() - print(result) - - -@pytest.mark.e2etest -def test_eager_flow_multiple_stream_output(fastapi_multiple_stream_outputs): - headers = { - "Content-Type": "application/json", - "Accept": "text/event-stream", - } - response = fastapi_multiple_stream_outputs.post("/score", data=json.dumps({"input_val": 1}), headers=headers) - assert ( - response.status_code == 400 - ), f"Response code indicates error {response.status_code} - {response.content.decode()}" - response = response.json() - assert response == {"error": {"code": "UserError", "message": "Multiple stream output fields not supported."}} - - -@pytest.mark.e2etest -def test_eager_flow_evc(fastapi_eager_flow_evc): - # Supported: flow with EVC in definition - response = fastapi_eager_flow_evc.post("/score", data=json.dumps({})) - assert ( - response.status_code == 200 - ), f"Response code indicates error {response.status_code} - {response.content.decode()}" - response = response.json() - assert response == "Hello world! azure" - - -@pytest.mark.e2etest -def test_eager_flow_evc_override(fastapi_eager_flow_evc_override): - # Supported: EVC's connection exist in flow definition - response = fastapi_eager_flow_evc_override.post("/score", data=json.dumps({})) - assert ( - response.status_code == 200 - ), f"Response code indicates error {response.status_code} - {response.content.decode()}" - response = response.json() - assert response != "Hello world! ${azure_open_ai_connection.api_base}" - - -@pytest.mark.e2etest -def test_eager_flow_evc_override_not_exist(fastapi_eager_flow_evc_override_not_exist): - # EVC's connection not exist in flow definition, will resolve it. - response = fastapi_eager_flow_evc_override_not_exist.post("/score", data=json.dumps({})) - assert ( - response.status_code == 200 - ), f"Response code indicates error {response.status_code} - {response.content.decode()}" - response = response.json() - # EVC not resolved since the connection not exist in flow definition - assert response == "Hello world! azure" - - -@pytest.mark.e2etest -def test_eager_flow_evc_connection_not_exist(fastapi_eager_flow_evc_connection_not_exist): - # Won't get not existed connection since it's override - response = fastapi_eager_flow_evc_connection_not_exist.post("/score", data=json.dumps({})) - assert ( - response.status_code == 200 - ), f"Response code indicates error {response.status_code} - {response.content.decode()}" - response = response.json() - # EVC not resolved since the connection not exist in flow definition - assert response == "Hello world! 
VALUE" - - -@pytest.mark.e2etest -def test_eager_flow_with_init(fastapi_callable_class): - response1 = fastapi_callable_class.post("/score", data=json.dumps({"func_input": "input2"})) - assert ( - response1.status_code == 200 - ), f"Response code indicates error {response1.status_code} - {response1.content.decode()}" - response1 = response1.json() - - response2 = fastapi_callable_class.post("/score", data=json.dumps({"func_input": "input2"})) - assert ( - response2.status_code == 200 - ), f"Response code indicates error {response2.status_code} - {response2.content.decode()}" - response2 = response2.json() - assert response1 == response2 diff --git a/src/promptflow-recording/promptflow/recording/local/test_utils.py b/src/promptflow-recording/promptflow/recording/local/test_utils.py index 0b8e34826f4..6e7970e1172 100644 --- a/src/promptflow-recording/promptflow/recording/local/test_utils.py +++ b/src/promptflow-recording/promptflow/recording/local/test_utils.py @@ -7,14 +7,14 @@ import requests -from promptflow._cli._pf._service import _start_background_service_on_unix, _start_background_service_on_windows -from promptflow._sdk._service.utils.utils import get_pfs_port - def invoke_prompt_flow_service() -> str: # invoke prompt flow service as a standby service # so use some private APIs, instead of existing API # then this port won't be recorded in pf.config + from promptflow._cli._pf._service import _start_background_service_on_unix, _start_background_service_on_windows + from promptflow._sdk._service.utils.utils import get_pfs_port + port = str(get_pfs_port()) if platform.system() == "Windows": _start_background_service_on_windows(port) diff --git a/src/promptflow-recording/recordings/local/node_cache.shelve.bak b/src/promptflow-recording/recordings/local/node_cache.shelve.bak index 1c6ca0ed3a6..3409e81e5a0 100644 --- a/src/promptflow-recording/recordings/local/node_cache.shelve.bak +++ b/src/promptflow-recording/recordings/local/node_cache.shelve.bak @@ -110,3 +110,4 @@ '07226955e1e29e7dc59073f46aabbde06cb4d6cf', (613376, 11396) 'f7ec26ad9b25d8c0391b005443cb2707d4212360', (625152, 2616) '5dbada5cafa016d2171e6c8643ba4c8397f03d7f', (628224, 2770) +'ab3563e99d1ee052e067952ab332536a6bf5c025', (631296, 1674) diff --git a/src/promptflow-recording/recordings/local/node_cache.shelve.dat b/src/promptflow-recording/recordings/local/node_cache.shelve.dat index 2fdd46e7b4e3c1eb782a51107b3a766fb438e44f..729f9c681ac1003de26ffb764229482277b9b5e8 100644 GIT binary patch delta 866 zcmZ9KO-~b16oy-Rr(YVdwm>Nbn~G&nKtK@Ch=S1&3Wb)U16b;GI_({pc6vH931}uN z854D3Vo11gV{q-pL?dgq?quP|&gc(N|ADvDxRBjF@0|0_d!Bm_4`RBg*k-fsWLCn&v4vVa46UX`Syss4XA~qqIW*Ge zrCN#K%xcPpSXRLwKf%@7vV66uLLdXp^|GLs1!y6^TbN)|tIj!Ti=t~4U4tO;y+lzG ziF$l|ThAj18PLvh?wUt7F9}*U3t*Mw3~z6m+=bCx|V{hmDT!wcXuDPyZ8*`3=l|i4f!AB<2HQR z0FlL43@EVp>Qj8Jeu;cRgrbh844|9;FH1VUVZi!Pc|*t32FOR{lCJB#T8~y{aVJjW g2%g7_IEft|no&tX1&@=YiwW3)*t00$sOtTH0r%1f@Bjb+ delta 27 icmeCWsCMa~T0;wC3sVbo3rh=Y3tJ0&3&$4DOA7#&j|wgT diff --git a/src/promptflow-recording/recordings/local/node_cache.shelve.dir b/src/promptflow-recording/recordings/local/node_cache.shelve.dir index 1c6ca0ed3a6..3409e81e5a0 100644 --- a/src/promptflow-recording/recordings/local/node_cache.shelve.dir +++ b/src/promptflow-recording/recordings/local/node_cache.shelve.dir @@ -110,3 +110,4 @@ '07226955e1e29e7dc59073f46aabbde06cb4d6cf', (613376, 11396) 'f7ec26ad9b25d8c0391b005443cb2707d4212360', (625152, 2616) '5dbada5cafa016d2171e6c8643ba4c8397f03d7f', (628224, 2770) +'ab3563e99d1ee052e067952ab332536a6bf5c025', (631296, 
1674)