
Commit

Merge branch 'main' into task/addSimulator
nagkumar91 authored Apr 15, 2024
2 parents 89d1aec + 9f51888 commit b58e70b
Showing 21 changed files with 807 additions and 492 deletions.
514 changes: 279 additions & 235 deletions .github/workflows/promptflow-release-testing-matrix.yml

Large diffs are not rendered by default.

@@ -10,8 +10,7 @@
_ScopeDependentOperations,
)

from promptflow._sdk._errors import ConnectionClassNotFoundError
from promptflow._sdk.entities._connection import CustomConnection, _Connection
from promptflow._sdk.entities._connection import _Connection
from promptflow.azure._restclient.flow_service_caller import FlowServiceCaller
from promptflow.core._connection_provider._workspace_connection_provider import WorkspaceConnectionProvider
from promptflow.core._errors import OpenURLFailedUserError
@@ -45,29 +44,8 @@ def __init__(
self._credential,
)

@classmethod
def _convert_core_connection_to_sdk_connection(cls, core_conn):
# TODO: Refine this and connection operation ones to (devkit) _Connection._from_core_object
sdk_conn_mapping = _Connection.SUPPORTED_TYPES
sdk_conn_cls = sdk_conn_mapping.get(core_conn.type)
if sdk_conn_cls is None:
raise ConnectionClassNotFoundError(
f"Correspond sdk connection type not found for core connection type: {core_conn.type!r}, "
f"please re-install the 'promptflow' package."
)
common_args = {
"name": core_conn.name,
"module": core_conn.module,
"expiry_time": core_conn.expiry_time,
"created_date": core_conn.created_date,
"last_modified_date": core_conn.last_modified_date,
}
if sdk_conn_cls is CustomConnection:
return sdk_conn_cls(configs=core_conn.configs, secrets=core_conn.secrets, **common_args)
return sdk_conn_cls(**dict(core_conn), **common_args)

def get(self, name, **kwargs):
return self._convert_core_connection_to_sdk_connection(self._provider.get(name))
return _Connection._from_core_connection(self._provider.get(name))

@classmethod
def _direct_get(cls, name, subscription_id, resource_group_name, workspace_name, credential):
@@ -76,7 +54,7 @@ def _direct_get(cls, name, subscription_id, resource_group_name, workspace_name,
permission(workspace/list secrets). As create azure pf_client requires workspace read permission.
"""
provider = WorkspaceConnectionProvider(subscription_id, resource_group_name, workspace_name, credential)
return provider.get(name=name)
return _Connection._from_core_connection(provider.get(name=name))

# Keep this as promptflow tools is using this method
_build_connection_dict = WorkspaceConnectionProvider._build_connection_dict
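Both get paths now delegate to _Connection._from_core_connection, whose body is outside this diff. The helper deleted above shows the mapping it replaces, so a reconstruction is straightforward; a sketch assuming the devkit classmethod keeps the same logic (the actual implementation may differ):

@classmethod
def _from_core_connection(cls, core_conn):
    # Assumed to live on the devkit _Connection class, mirroring the
    # operation-level helper removed in this commit.
    sdk_conn_cls = cls.SUPPORTED_TYPES.get(core_conn.type)
    if sdk_conn_cls is None:
        raise ConnectionClassNotFoundError(
            f"Corresponding sdk connection type not found for core connection "
            f"type: {core_conn.type!r}, please re-install the 'promptflow' package."
        )
    common_args = {
        "name": core_conn.name,
        "module": core_conn.module,
        "expiry_time": core_conn.expiry_time,
        "created_date": core_conn.created_date,
        "last_modified_date": core_conn.last_modified_date,
    }
    if sdk_conn_cls is CustomConnection:
        return sdk_conn_cls(configs=core_conn.configs, secrets=core_conn.secrets, **common_args)
    return sdk_conn_cls(**dict(core_conn), **common_args)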
@@ -167,10 +167,11 @@ def open_url(cls, token, url, action, host="management.azure.com", method="GET",

@classmethod
def validate_and_fallback_connection_type(cls, name, type_name, category, metadata):
# Note: Legacy CustomKeys may store different connection types, e.g. openai, serp.
# In this case, type name will not be None.
if type_name:
return type_name
# Below category has corresponding connection type in PromptFlow, so we can fall back directly.
# Note: CustomKeys may store different connection types for now, e.g. openai, serp.
if category in [
ConnectionCategory.AzureOpenAI,
ConnectionCategory.OpenAI,
@@ -179,6 +180,8 @@ def validate_and_fallback_connection_type(cls, name, type_name, category, metada
ConnectionCategory.Serverless,
]:
return category
if category == ConnectionCategory.CustomKeys:
return CustomConnection.__name__
if category == ConnectionCategory.CognitiveService:
kind = get_case_insensitive_key(metadata, "Kind")
if kind == "Content Safety":
@@ -343,6 +346,8 @@ def _build_connection_dict(cls, name, subscription_id, resource_group_name, work
raise OpenURLUserAuthenticationError(message=auth_error_message)
except ClientAuthenticationError as e:
raise UserErrorException(target=ErrorTarget.CORE, message=str(e), error=e)
except UserErrorException:
raise
except Exception as e:
raise SystemErrorException(target=ErrorTarget.CORE, message=str(e), error=e)

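Taken together, the patched fallback resolves a connection type in a fixed order: an explicit legacy type in metadata wins, then categories with a 1:1 PromptFlow counterpart, then CustomKeys, then CognitiveService by kind. A simplified, self-contained sketch of that order (category names inlined as plain strings; the real method uses ConnectionCategory constants, CustomConnection.__name__, and a case-insensitive metadata lookup):

def fallback_connection_type(type_name, category, metadata):
    # 1. Legacy connections (e.g. openai, serp stored under CustomKeys)
    #    carry an explicit type in metadata; honor it first.
    if type_name:
        return type_name
    # 2. Categories with a direct PromptFlow connection type map 1:1
    #    (plus others hidden behind the collapsed hunk above).
    if category in ("AzureOpenAI", "OpenAI", "Serverless"):
        return category
    # 3. CustomKeys without an explicit type falls back to CustomConnection.
    if category == "CustomKeys":
        return "CustomConnection"
    # 4. CognitiveService is disambiguated by its "Kind" metadata; the
    #    concrete mapping continues beyond this diff's visible context.
    if category == "CognitiveService":
        if metadata.get("Kind") == "Content Safety":
            ...
    return None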
@@ -213,7 +213,7 @@ def record_tracing_metrics(self, flow_run: FlowRunInfo, node_runs: Dict[str, Run
try:
for _, run in node_runs.items():
flow_id = flow_run.flow_id if flow_run is not None else "default"
if len(run.system_metrics) > 0:
if run.system_metrics and len(run.system_metrics) > 0:
duration = run.system_metrics.get("duration", None)
if duration is not None:
duration = duration * 1000
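The added truthiness check guards against system_metrics being None, where len(None) would raise TypeError; once the value is a non-empty dict, the len(...) > 0 test passes as before. A minimal illustration:

system_metrics = None
# len(system_metrics) > 0  # would raise: object of type 'NoneType' has no len()
if system_metrics and len(system_metrics) > 0:  # short-circuits safely on None
    print("has metrics")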
8 changes: 8 additions & 0 deletions src/promptflow-core/promptflow/executor/_process_manager.py
@@ -20,6 +20,7 @@
ProcessTerminatedTimeout,
SpawnedForkProcessManagerStartFailure,
)
from promptflow.executor._prompty_executor import PromptyExecutor
from promptflow.executor._script_executor import ScriptExecutor
from promptflow.executor.flow_executor import FlowExecutor
from promptflow.storage import AbstractRunStorage
@@ -509,6 +510,13 @@ def create_spawned_fork_process_manager(


def _create_executor_fork(*, flow_executor: FlowExecutor, storage: AbstractRunStorage):
if isinstance(flow_executor, PromptyExecutor):
return PromptyExecutor(
flow_file=flow_executor._flow_file,
connections=flow_executor._connections,
working_dir=flow_executor._working_dir,
storage=storage,
)
if isinstance(flow_executor, ScriptExecutor):
return ScriptExecutor(
flow_file=flow_executor._flow_file,
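One ordering detail (an assumption, since the class hierarchy is outside this diff): if PromptyExecutor subclasses ScriptExecutor, the new branch must come before the ScriptExecutor check, because isinstance also matches base classes. A generic illustration with stand-in classes:

class Script: ...
class Prompty(Script): ...  # stand-ins for the real executor classes

executor = Prompty()
# Testing the more specific type first keeps Prompty instances from being
# rebuilt as plain Script instances in the forked process:
if isinstance(executor, Prompty):
    rebuilt = Prompty()
elif isinstance(executor, Script):
    rebuilt = Script()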
91 changes: 80 additions & 11 deletions src/promptflow-core/promptflow/executor/_script_executor.py
@@ -1,5 +1,7 @@
import asyncio
import contextlib
import dataclasses
import functools
import importlib
import inspect
import uuid
@@ -60,6 +62,15 @@ def __init__(
self._message_format = MessageFormatType.BASIC
self._multimedia_processor = BasicMultimediaProcessor()

@contextlib.contextmanager
def _exec_line_context(self, run_id, line_number):
# TODO: refactor NodeLogManager, for script executor, we don't have node concept.
log_manager = NodeLogManager()
# No need to clear node context, log_manager will be cleared after the with block.
log_manager.set_node_context(run_id, "Flex", line_number)
with log_manager, self._update_operation_context(run_id, line_number):
yield

def exec_line(
self,
inputs: Mapping[str, Any],
@@ -69,20 +80,16 @@ def exec_line(
**kwargs,
) -> LineResult:
run_id = run_id or str(uuid.uuid4())
# TODO: refactor NodeLogManager, for script executor, we don't have node concept.
log_manager = NodeLogManager()
# No need to clear node context, log_manger will be cleared after the with block.
log_manager.set_node_context(run_id, "Flex", index)
with log_manager, self._update_operation_context(run_id, index):
with self._exec_line_context(run_id, index):
return self._exec_line(inputs, index, run_id, allow_generator_output=allow_generator_output)

def _exec_line(
def _exec_line_preprocess(
self,
inputs: Mapping[str, Any],
index: Optional[int] = None,
run_id: Optional[str] = None,
allow_generator_output: bool = False,
) -> LineResult:
):
line_run_id = run_id if index is None else f"{run_id}_{index}"
run_tracker = RunTracker(self._storage)
run_tracker.allow_generator_types = allow_generator_output
@@ -98,8 +105,22 @@ def _exec_line(
# Executor will add line_number to batch inputs if there is no line_number in the original inputs,
# which should be removed, so, we only preserve the inputs that are contained in self._inputs.
inputs = {k: inputs[k] for k in self._inputs if k in inputs}
output = None
traces = []
return run_info, inputs, run_tracker, None, []

def _exec_line(
self,
inputs: Mapping[str, Any],
index: Optional[int] = None,
run_id: Optional[str] = None,
allow_generator_output: bool = False,
) -> LineResult:
run_info, inputs, run_tracker, output, traces = self._exec_line_preprocess(
inputs,
index,
run_id,
allow_generator_output,
)
line_run_id = run_info.run_id
try:
Tracer.start_tracing(line_run_id)
if self._is_async:
@@ -118,12 +139,60 @@
run_tracker.end_run(line_run_id, ex=e, traces=traces)
finally:
run_tracker.persist_flow_run(run_info)
return self._construct_line_result(output, run_info)

def _construct_line_result(self, output, run_info):
line_result = LineResult(output, {}, run_info, {})
# Return line result with index
if index is not None and isinstance(line_result.output, dict):
line_result.output[LINE_NUMBER_KEY] = index
if run_info.index is not None and isinstance(line_result.output, dict):
line_result.output[LINE_NUMBER_KEY] = run_info.index
return line_result

async def exec_line_async(
self,
inputs: Mapping[str, Any],
index: Optional[int] = None,
run_id: Optional[str] = None,
allow_generator_output: bool = False,
**kwargs,
) -> LineResult:
run_id = run_id or str(uuid.uuid4())
with self._exec_line_context(run_id, index):
return await self._exec_line_async(inputs, index, run_id, allow_generator_output=allow_generator_output)

async def _exec_line_async(
self,
inputs: Mapping[str, Any],
index: Optional[int] = None,
run_id: Optional[str] = None,
allow_generator_output: bool = False,
) -> LineResult:
run_info, inputs, run_tracker, output, traces = self._exec_line_preprocess(
inputs,
index,
run_id,
allow_generator_output,
)
line_run_id = run_info.run_id
try:
Tracer.start_tracing(line_run_id)
if self._is_async:
output = await self._func(**inputs)
else:
partial_func = functools.partial(self._func, **inputs)
output = await asyncio.get_event_loop().run_in_executor(None, partial_func)
output = self._stringify_generator_output(output) if not allow_generator_output else output
traces = Tracer.end_tracing(line_run_id)
output_dict = convert_eager_flow_output_to_dict(output)
run_tracker.end_run(line_run_id, result=output_dict, traces=traces)
except Exception as e:
if not traces:
traces = Tracer.end_tracing(line_run_id)
run_tracker.end_run(line_run_id, ex=e, traces=traces)
finally:
run_tracker.persist_flow_run(run_info)
return self._construct_line_result(output, run_info)

def _stringify_generator_output(self, output):
if isinstance(output, dict):
return super()._stringify_generator_output(output)
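The new async path mirrors the sync one, and one idiom is worth calling out: a synchronous entry function is wrapped in functools.partial and handed to run_in_executor, so it runs on a worker thread instead of blocking the event loop. A standalone sketch of the same pattern:

import asyncio
import functools

def blocking_entry(input_str: str) -> str:
    # Imagine slow, synchronous work here.
    return "Hello " + input_str

async def main():
    loop = asyncio.get_event_loop()
    partial_func = functools.partial(blocking_entry, input_str="world")
    # Runs in the default ThreadPoolExecutor, leaving the event loop free
    # to schedule other lines concurrently.
    result = await loop.run_in_executor(None, partial_func)
    print(result)  # Hello world

asyncio.run(main())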
@@ -56,6 +56,33 @@ def test_build_azure_openai_connection_from_rest_object(self):
}
build_from_data_and_assert(data, expected)

def test_build_legacy_openai_connection_from_rest_object(self):
# Legacy OpenAI connection with type in metadata
# Test that this is not converted to CustomConnection
data = {
"id": "mock_id",
"name": "legacy_open_ai",
"type": "Microsoft.MachineLearningServices/workspaces/connections",
"properties": {
"authType": "CustomKeys",
"credentials": {"keys": {"api_key": "***"}},
"category": "CustomKeys",
"target": "<api-base>",
"metadata": {
"azureml.flow.connection_type": "OpenAI",
"azureml.flow.module": "promptflow.connections",
"organization": "mock",
},
},
}
expected = {
"type": "OpenAIConnection",
"module": "promptflow.connections",
"name": "legacy_open_ai",
"value": {"api_key": "***", "organization": "mock"},
}
build_from_data_and_assert(data, expected)

def test_build_strong_type_openai_connection_from_rest_object(self):
data = {
"id": "mock_id",
@@ -199,6 +226,31 @@ def test_build_custom_keys_connection_from_rest_object(self):
}
build_from_data_and_assert(data, expected)

def test_build_strong_type_custom_connection_from_rest_object(self):
# Test on CustomKeys type without meta
data = {
"id": "mock_id",
"name": "custom_connection",
"type": "Microsoft.MachineLearningServices/workspaces/connections",
"properties": {
"authType": "CustomKeys",
"credentials": {"keys": {"my_key1": "***", "my_key2": "***"}},
"category": "CustomKeys",
"target": "<api-base>",
"metadata": {
"general_key": "general_value",
},
},
}
expected = {
"type": "CustomConnection",
"module": "promptflow.connections",
"name": "custom_connection",
"value": {"my_key1": "***", "my_key2": "***", "general_key": "general_value"},
"secret_keys": ["my_key1", "my_key2"],
}
build_from_data_and_assert(data, expected)

def test_build_cognitive_search_connection_from_rest_object(self):
# Test on ApiKey type with CognitiveSearch category
data = {
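The two new tests pin down the disambiguation rule for CustomKeys payloads: identical authType/category values produce different connection types depending solely on whether metadata carries azureml.flow.connection_type. A compact, illustrative restatement (the "Connection" suffixing here is a simplification of the provider's actual class resolution):

def expected_type(metadata: dict) -> str:
    # An explicit legacy type wins; otherwise CustomKeys payloads fall
    # back to CustomConnection (illustrative only).
    legacy_type = metadata.get("azureml.flow.connection_type")
    return f"{legacy_type}Connection" if legacy_type else "CustomConnection"

assert expected_type({"azureml.flow.connection_type": "OpenAI"}) == "OpenAIConnection"
assert expected_type({"general_key": "general_value"}) == "CustomConnection"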
33 changes: 29 additions & 4 deletions src/promptflow-core/tests/core/e2etests/test_eager_flow.py
@@ -1,3 +1,4 @@
import asyncio
from dataclasses import is_dataclass

import pytest
@@ -25,6 +26,18 @@ def func_entry(input_str: str) -> str:
return "Hello " + input_str


async def func_entry_async(input_str: str) -> str:
await asyncio.sleep(1)
return "Hello " + input_str


function_entries = [
(ClassEntry(), {"input_str": "world"}, "Hello world"),
(func_entry, {"input_str": "world"}, "Hello world"),
(func_entry_async, {"input_str": "world"}, "Hello world"),
]


@pytest.mark.e2etest
class TestEagerFlow:
@pytest.mark.parametrize(
@@ -64,16 +77,28 @@ def test_flow_run(self, flow_folder, inputs, ensure_output, init_kwargs):
line_result2 = executor.exec_line(inputs=inputs, index=0)
assert line_result1.output == line_result2.output

@pytest.mark.parametrize(
"entry, inputs, expected_output",
[(ClassEntry(), {"input_str": "world"}, "Hello world"), (func_entry, {"input_str": "world"}, "Hello world")],
)
@pytest.mark.parametrize("entry, inputs, expected_output", function_entries)
def test_flow_run_with_function_entry(self, entry, inputs, expected_output):
executor = FlowExecutor.create(entry, {})
line_result = executor.exec_line(inputs=inputs)
assert line_result.run_info.status == Status.Completed
assert line_result.output == expected_output

@pytest.mark.asyncio
@pytest.mark.parametrize("entry, inputs, expected_output", function_entries)
async def test_flow_run_with_function_entry_async(self, entry, inputs, expected_output):
executor = FlowExecutor.create(entry, {})
task1 = asyncio.create_task(executor.exec_line_async(inputs=inputs))
task2 = asyncio.create_task(executor.exec_line_async(inputs=inputs))
line_result1, line_result2 = await asyncio.gather(task1, task2)
for line_result in [line_result1, line_result2]:
assert line_result.run_info.status == Status.Completed
assert line_result.output == expected_output
delta_sec = (line_result2.run_info.end_time - line_result1.run_info.end_time).total_seconds()
delta_desc = f"{delta_sec}s from {line_result1.run_info.end_time} to {line_result2.run_info.end_time}"
msg = f"The two tasks should run concurrently, but got {delta_desc}"
assert 0 <= delta_sec < 0.1, msg

def test_flow_run_with_invalid_case(self):
flow_folder = "dummy_flow_with_exception"
flow_file = get_yaml_file(flow_folder, root=EAGER_FLOW_ROOT)
