Skip to content

Commit

Permalink
Merge branch 'main' into users/yangtongxu/traceReady
Browse files Browse the repository at this point in the history
  • Loading branch information
riddlexu authored Apr 25, 2024
2 parents e49fdf2 + 935176f commit 341ec48
Show file tree
Hide file tree
Showing 24 changed files with 480 additions and 384 deletions.
5 changes: 3 additions & 2 deletions .cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,11 @@
"dcid",
"piezo",
"Piezo",
"cmpop"
"cmpop",
"omap"
],
"flagWords": [
"Prompt Flow"
],
"allowCompoundWords": true
}
}
17 changes: 5 additions & 12 deletions src/promptflow-azure/promptflow/azure/_storage/blob/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
import threading
import traceback
from typing import Optional, Tuple
from typing import Callable, Tuple

from azure.ai.ml import MLClient
from azure.ai.ml._azure_environments import _get_storage_endpoint_from_metadata
Expand All @@ -25,17 +25,10 @@ def get_datastore_container_client(
subscription_id: str,
resource_group_name: str,
workspace_name: str,
credential: Optional[object] = None,
get_credential: Callable,
) -> Tuple[ContainerClient, str]:
try:
if credential is None:
# in cloud scenario, runtime will pass in credential
# so this is local to cloud only code, happens in prompt flow service
# which should rely on Azure CLI credential only
from azure.identity import AzureCliCredential

credential = AzureCliCredential()

credential = get_credential()
datastore_definition, datastore_credential = _get_default_datastore(
subscription_id, resource_group_name, workspace_name, credential
)
Expand Down Expand Up @@ -68,7 +61,7 @@ def get_datastore_container_client(


def _get_default_datastore(
subscription_id: str, resource_group_name: str, workspace_name: str, credential: Optional[object]
subscription_id: str, resource_group_name: str, workspace_name: str, credential
) -> Tuple[Datastore, str]:

datastore_key = _get_datastore_client_key(subscription_id, resource_group_name, workspace_name)
Expand Down Expand Up @@ -103,7 +96,7 @@ def _get_datastore_client_key(subscription_id: str, resource_group_name: str, wo


def _get_aml_default_datastore(
subscription_id: str, resource_group_name: str, workspace_name: str, credential: Optional[object]
subscription_id: str, resource_group_name: str, workspace_name: str, credential
) -> Tuple[Datastore, str]:

ml_client = MLClient(
Expand Down
14 changes: 4 additions & 10 deletions src/promptflow-azure/promptflow/azure/_storage/cosmosdb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import ast
import datetime
import threading
from typing import Optional
from typing import Callable

client_map = {}
_thread_lock = threading.Lock()
Expand All @@ -18,7 +18,7 @@ def get_client(
subscription_id: str,
resource_group_name: str,
workspace_name: str,
credential: Optional[object] = None,
get_credential: Callable,
):
client_key = _get_db_client_key(container_name, subscription_id, resource_group_name, workspace_name)
container_client = _get_client_from_map(client_key)
Expand All @@ -28,13 +28,7 @@ def get_client(
with container_lock:
container_client = _get_client_from_map(client_key)
if container_client is None:
if credential is None:
# in cloud scenario, runtime will pass in credential
# so this is local to cloud only code, happens in prompt flow service
# which should rely on Azure CLI credential only
from azure.identity import AzureCliCredential

credential = AzureCliCredential()
credential = get_credential()
token = _get_resource_token(
container_name, subscription_id, resource_group_name, workspace_name, credential
)
Expand Down Expand Up @@ -77,7 +71,7 @@ def _get_resource_token(
subscription_id: str,
resource_group_name: str,
workspace_name: str,
credential: Optional[object],
credential,
) -> object:
from promptflow.azure import PFClient

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def check_local_to_cloud_run(pf: PFClient, run: Run, check_run_details_in_cloud:
assert cloud_run.properties["azureml.promptflow.local_to_cloud"] == "true"
assert cloud_run.properties["azureml.promptflow.snapshot_id"]
assert cloud_run.properties[Local2CloudProperties.TOTAL_TOKENS]
assert cloud_run.properties[Local2CloudProperties.EVAL_ARTIFACTS]

# if no description or tags, skip the check, since one could be {} but the other is None
if run.description:
Expand All @@ -74,12 +75,12 @@ def check_local_to_cloud_run(pf: PFClient, run: Run, check_run_details_in_cloud:
"mock_set_headers_with_user_aml_token",
"single_worker_thread_pool",
"vcr_recording",
"mock_isinstance_for_mock_datastore",
"mock_get_azure_pf_client",
"mock_trace_destination_to_cloud",
)
class TestFlowRunUpload:
@pytest.mark.skipif(condition=not pytest.is_live, reason="Bug - 3089145 Replay failed for test 'test_upload_run'")
@pytest.mark.usefixtures(
"mock_isinstance_for_mock_datastore", "mock_get_azure_pf_client", "mock_trace_destination_to_cloud"
)
def test_upload_run(
self,
pf: PFClient,
Expand All @@ -103,9 +104,6 @@ def test_upload_run(
Local2CloudTestHelper.check_local_to_cloud_run(pf, run, check_run_details_in_cloud=True)

@pytest.mark.skipif(condition=not pytest.is_live, reason="Bug - 3089145 Replay failed for test 'test_upload_run'")
@pytest.mark.usefixtures(
"mock_isinstance_for_mock_datastore", "mock_get_azure_pf_client", "mock_trace_destination_to_cloud"
)
def test_upload_flex_flow_run_with_yaml(self, pf: PFClient, randstr: Callable[[str], str]):
name = randstr("flex_run_name_with_yaml_for_upload")
local_pf = Local2CloudTestHelper.get_local_pf(name)
Expand All @@ -125,9 +123,6 @@ def test_upload_flex_flow_run_with_yaml(self, pf: PFClient, randstr: Callable[[s
Local2CloudTestHelper.check_local_to_cloud_run(pf, run)

@pytest.mark.skipif(condition=not pytest.is_live, reason="Bug - 3089145 Replay failed for test 'test_upload_run'")
@pytest.mark.usefixtures(
"mock_isinstance_for_mock_datastore", "mock_get_azure_pf_client", "mock_trace_destination_to_cloud"
)
def test_upload_flex_flow_run_without_yaml(self, pf: PFClient, randstr: Callable[[str], str]):
name = randstr("flex_run_name_without_yaml_for_upload")
local_pf = Local2CloudTestHelper.get_local_pf(name)
Expand All @@ -148,9 +143,6 @@ def test_upload_flex_flow_run_without_yaml(self, pf: PFClient, randstr: Callable
Local2CloudTestHelper.check_local_to_cloud_run(pf, run)

@pytest.mark.skipif(condition=not pytest.is_live, reason="Bug - 3089145 Replay failed for test 'test_upload_run'")
@pytest.mark.usefixtures(
"mock_isinstance_for_mock_datastore", "mock_get_azure_pf_client", "mock_trace_destination_to_cloud"
)
def test_upload_prompty_run(self, pf: PFClient, randstr: Callable[[str], str]):
# currently prompty run is skipped for upload, this test should be finished without error
name = randstr("prompty_run_name_for_upload")
Expand All @@ -167,9 +159,6 @@ def test_upload_prompty_run(self, pf: PFClient, randstr: Callable[[str], str]):
Local2CloudTestHelper.check_local_to_cloud_run(pf, run)

@pytest.mark.skipif(condition=not pytest.is_live, reason="Bug - 3089145 Replay failed for test 'test_upload_run'")
@pytest.mark.usefixtures(
"mock_isinstance_for_mock_datastore", "mock_get_azure_pf_client", "mock_trace_destination_to_cloud"
)
def test_upload_run_with_customized_run_properties(self, pf: PFClient, randstr: Callable[[str], str]):
name = randstr("batch_run_name_for_upload_with_customized_properties")
local_pf = Local2CloudTestHelper.get_local_pf(name)
Expand Down Expand Up @@ -200,9 +189,6 @@ def test_upload_run_with_customized_run_properties(self, pf: PFClient, randstr:
assert cloud_run.properties[Local2CloudUserProperties.EVAL_ARTIFACTS] == eval_artifacts

@pytest.mark.skipif(condition=not pytest.is_live, reason="Bug - 3089145 Replay failed for test 'test_upload_run'")
@pytest.mark.usefixtures(
"mock_isinstance_for_mock_datastore", "mock_get_azure_pf_client", "mock_trace_destination_to_cloud"
)
def test_upload_eval_run(self, pf: PFClient, randstr: Callable[[str], str]):
main_run_name = randstr("main_run_name_for_test_upload_eval_run")
local_pf = Local2CloudTestHelper.get_local_pf(main_run_name)
Expand All @@ -216,8 +202,8 @@ def test_upload_eval_run(self, pf: PFClient, randstr: Callable[[str], str]):

# run an evaluation run
eval_run_name = randstr("eval_run_name_for_test_upload_eval_run")
local_lpf = Local2CloudTestHelper.get_local_pf(eval_run_name)
eval_run = local_lpf.run(
local_pf = Local2CloudTestHelper.get_local_pf(eval_run_name)
eval_run = local_pf.run(
flow=f"{FLOWS_DIR}/simple_hello_world",
data=f"{DATAS_DIR}/webClassification3.jsonl",
run=main_run_name,
Expand All @@ -229,7 +215,6 @@ def test_upload_eval_run(self, pf: PFClient, randstr: Callable[[str], str]):
assert eval_run.properties["azureml.promptflow.variant_run_id"] == main_run_name

@pytest.mark.skipif(condition=not pytest.is_live, reason="Bug - 3089145 Replay failed for test 'test_upload_run'")
@pytest.mark.usefixtures("mock_isinstance_for_mock_datastore", "mock_get_azure_pf_client")
def test_upload_flex_flow_run_with_global_azureml(self, pf: PFClient, randstr: Callable[[str], str]):
with patch("promptflow._sdk._configuration.Configuration.get_config", return_value="azureml"):
name = randstr("flex_run_name_with_global_azureml_for_upload")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from sdk_cli_azure_test.conftest import DATAS_DIR, EAGER_FLOWS_DIR, FLOWS_DIR

from promptflow._sdk._errors import RunOperationParameterError, UploadUserError, UserAuthenticationError
from promptflow._sdk._utils import parse_otel_span_status_code
from promptflow._sdk._utils.tracing import _parse_otel_span_status_code
from promptflow._sdk.entities import Run
from promptflow._sdk.operations._run_operations import RunOperations
from promptflow._utils.async_utils import async_run_allowing_running_loop
Expand Down Expand Up @@ -88,7 +88,7 @@ def test_flex_flow_with_imported_func(self, pf: PFClient):
# TODO(3017093): won't support this for now
with pytest.raises(UserErrorException) as e:
pf.run(
flow=parse_otel_span_status_code,
flow=_parse_otel_span_status_code,
data=f"{DATAS_DIR}/simple_eager_flow_data.jsonl",
# set code folder to avoid snapshot too big
code=f"{EAGER_FLOWS_DIR}/multiple_entries",
Expand Down
5 changes: 5 additions & 0 deletions src/promptflow-core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
# promptflow-core package

## v1.10.0 (Upcoming)

### Features Added
- Add prompty feature to simplify the development of prompt templates for customers; see [here](https://microsoft.github.io/promptflow/how-to-guides/develop-a-prompty/index.html) for more details.

### Others
- Add fastapi serving engine support.

## v1.9.0 (2024.04.17)
Expand Down
5 changes: 3 additions & 2 deletions src/promptflow-core/promptflow/_utils/flow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
)
from promptflow._core._errors import MetaFileNotFound, MetaFileReadError
from promptflow._utils.logger_utils import LoggerFactory
from promptflow._utils.utils import strip_quotation
from promptflow._utils.utils import convert_ordered_dict_to_dict, strip_quotation
from promptflow._utils.yaml_utils import dump_yaml, load_yaml
from promptflow.contracts.flow import Flow as ExecutableFlow
from promptflow.exceptions import ErrorTarget, UserErrorException, ValidationException
Expand Down Expand Up @@ -157,7 +157,8 @@ def dump_flow_dag(flow_dag: dict, flow_path: Path):
flow_dir, flow_filename = resolve_flow_path(flow_path, check_flow_exist=False)
flow_path = flow_dir / flow_filename
with open(flow_path, "w", encoding=DEFAULT_ENCODING) as f:
dump_yaml(flow_dag, f)
# directly dumping ordered dict will bring !!omap tag in yaml
dump_yaml(convert_ordered_dict_to_dict(flow_dag, remove_empty=False), f)
return flow_path


Expand Down
45 changes: 45 additions & 0 deletions src/promptflow-core/promptflow/_utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,3 +434,48 @@ def strip_quotation(value):
return value[1:-1]
else:
return value


def is_empty_target(obj: Optional[Dict]) -> bool:
    """Check whether *obj* counts as an "empty" target.

    :param obj: The object to check
    :type obj: Optional[Dict]
    :return: True if obj is None or an empty dict, False otherwise
    :rtype: bool
    """
    if obj is None:
        return True
    # Compare via isinstance + len rather than "== {}": some objects overload
    # "==" and would raise an error here (e.g. CommandComponent objects).
    return isinstance(obj, dict) and len(obj) == 0


def convert_ordered_dict_to_dict(target_object: Union[Dict, List], remove_empty: bool = True) -> Union[Dict, List]:
    """Recursively turn (possibly nested) ordered dicts into plain dicts.

    This is a workaround for REST requests, which must be plain dicts instead
    of ordered dicts. Values that are None or empty dicts are dropped unless
    ``remove_empty`` is False.

    :param target_object: The object to convert
    :type target_object: Union[Dict, List]
    :param remove_empty: Whether to omit values that are None or empty dictionaries. Defaults to True.
    :type remove_empty: bool
    :return: The converted object, with any ordered dicts replaced by dicts
    :rtype: Union[Dict, List]
    """

    def _is_droppable(value) -> bool:
        # "Empty target" check: None, or a dict with no entries. Uses len()
        # instead of "== {}" because some objects overload "==" and may raise.
        return value is None or (isinstance(value, dict) and len(value) == 0)

    # Ordered dicts can be nested inside lists as well as inside dicts.
    if isinstance(target_object, list):
        converted_list = []
        for entry in target_object:
            converted_entry = convert_ordered_dict_to_dict(entry, remove_empty=remove_empty)
            if not remove_empty or not _is_droppable(converted_entry):
                converted_list.append(converted_entry)
        return converted_list
    if isinstance(target_object, dict):
        converted_dict = {}
        for key, raw_value in target_object.items():
            converted_value = convert_ordered_dict_to_dict(raw_value, remove_empty=remove_empty)
            if not remove_empty or not _is_droppable(converted_value):
                converted_dict[key] = converted_value
        return converted_dict
    # Scalars and other types pass through untouched.
    return target_object
2 changes: 2 additions & 0 deletions src/promptflow-devkit/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
- The `pf config set <key=value>` support set the folder where the config is saved by `--path config_folder` parameter,
and the config will take effect when **os.getcwd** is a subdirectory of the specified folder.
- Local serving container support using fastapi engine and tuning worker/thread num via environment variables, reach [here](https://microsoft.github.io/promptflow/how-to-guides/deploy-a-flow/deploy-using-docker.html) for more details.
- Prompty supports flow test and batch run; see [here](https://microsoft.github.io/promptflow/how-to-guides/develop-a-prompty/index.html#testing-prompty) for more details.


## v1.9.0 (2024.04.17)

Expand Down
2 changes: 1 addition & 1 deletion src/promptflow-devkit/promptflow/_sdk/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,13 +483,13 @@ class Local2CloudProperties:
"""Run properties that server needs when uploading local run to cloud."""

TOTAL_TOKENS = "azureml.promptflow.total_tokens"
EVAL_ARTIFACTS = "_azureml.evaluate_artifacts"


class Local2CloudUserProperties:
"""Run properties that user can specify when uploading local run to cloud."""

RUN_TYPE = "runType"
EVAL_ARTIFACTS = "_azureml.evaluate_artifacts"

@staticmethod
def get_all_values():
Expand Down
37 changes: 29 additions & 8 deletions src/promptflow-devkit/promptflow/_sdk/_service/apis/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
from flask import request
from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ExportTraceServiceRequest

from promptflow._sdk._tracing import process_otlp_trace_request
from promptflow._sdk._errors import MissingAzurePackage
from promptflow._sdk._tracing import _is_azure_ext_installed, process_otlp_trace_request


def trace_collector(
Expand Down Expand Up @@ -41,13 +42,33 @@ def trace_collector(
if "application/x-protobuf" in content_type:
trace_request = ExportTraceServiceRequest()
trace_request.ParseFromString(request.data)
process_otlp_trace_request(
trace_request=trace_request,
get_created_by_info_with_cache=get_created_by_info_with_cache,
logger=logger,
cloud_trace_only=cloud_trace_only,
credential=credential,
)
# this function will be called in some old runtime versions
# where runtime will pass either credential object, or the function to get credential
# as we need to be compatible with this, need to handle both cases
if credential is not None:
# local prompt flow service will not pass credential, so this is runtime scenario
get_credential = credential if callable(credential) else lambda: credential # noqa: F841
process_otlp_trace_request(
trace_request=trace_request,
get_created_by_info_with_cache=get_created_by_info_with_cache,
logger=logger,
get_credential=get_credential,
cloud_trace_only=cloud_trace_only,
)
else:
# if `promptflow-azure` is not installed, pass an exception class to the function
get_credential = MissingAzurePackage
if _is_azure_ext_installed():
from azure.identity import AzureCliCredential

get_credential = AzureCliCredential
process_otlp_trace_request(
trace_request=trace_request,
get_created_by_info_with_cache=get_created_by_info_with_cache,
logger=logger,
get_credential=get_credential,
cloud_trace_only=cloud_trace_only,
)
return "Traces received", 200

# JSON protobuf encoding
Expand Down
Loading

0 comments on commit 341ec48

Please sign in to comment.