feat: remove load_as_component
elliotzh committed Oct 8, 2023
1 parent 3170e5c commit 1dae18b
Showing 9 changed files with 60 additions and 471 deletions.
60 changes: 1 addition & 59 deletions src/promptflow/promptflow/azure/_pf_client.py
@@ -4,7 +4,7 @@
import os
from os import PathLike
from pathlib import Path
from typing import IO, Any, AnyStr, Dict, List, Optional, Union
from typing import Dict, List, Optional, Union

from azure.ai.ml import MLClient
from azure.core.credentials import TokenCredential
@@ -14,7 +14,6 @@
from promptflow._sdk._errors import RunOperationParameterError
from promptflow._sdk._user_agent import USER_AGENT
from promptflow._sdk.entities import Run
from promptflow.azure._load_functions import load_flow
from promptflow.azure._restclient.service_caller_factory import _FlowServiceCallerFactory
from promptflow.azure._utils.gerneral import is_remote_uri
from promptflow.azure.operations import RunOperations
@@ -300,63 +299,6 @@ def visualize(self, runs: Union[List[str], List[Run]]) -> None:
"""
self.runs.visualize(runs)

def load_as_component(
self,
source: Union[str, PathLike, IO[AnyStr]],
*,
component_type: str,
columns_mapping: Dict[str, Union[str, float, int, bool]] = None,
variant: str = None,
environment_variables: Dict[str, Any] = None,
is_deterministic: bool = True,
**kwargs,
) -> "Component":
"""
Load a flow as a component.
:param source: Source of the flow. Should be a path to a flow dag yaml file or a flow directory.
:type source: Union[str, PathLike, IO[AnyStr]]
:param component_type: Type of the loaded component, support parallel only for now.
:type component_type: str
:param variant: Node variant used for the flow.
:type variant: str
:param environment_variables: Environment variables to set for the flow.
:type environment_variables: dict
:param columns_mapping: Inputs mapping for the flow.
:type columns_mapping: dict
:param is_deterministic: Whether the loaded component is deterministic.
:type is_deterministic: bool
"""
name = kwargs.pop("name", None)
version = kwargs.pop("version", None)
description = kwargs.pop("description", None)
display_name = kwargs.pop("display_name", None)
tags = kwargs.pop("tags", None)

flow = load_flow(
source=source,
relative_origin=kwargs.pop("relative_origin", None),
**kwargs,
)

if component_type != "parallel":
raise NotImplementedError(f"Component type {component_type} is not supported yet.")

# TODO: confirm if we should keep flow operations
component = self._flows.load_as_component(
flow=flow,
columns_mapping=columns_mapping,
variant=variant,
environment_variables=environment_variables,
name=name,
version=version,
description=description,
is_deterministic=is_deterministic,
display_name=display_name,
tags=tags,
)
return component

def _add_user_agent(self, kwargs) -> None:
user_agent = kwargs.pop("user_agent", None)
user_agent = f"{user_agent} {USER_AGENT}" if user_agent else USER_AGENT
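For reference, the removed public PFClient.load_as_component API was invoked roughly as follows. This is an illustrative sketch based on the docstring deleted above, assuming the usual workspace-scoped PFClient constructor; "parallel" was the only supported component_type.

from azure.identity import DefaultAzureCredential
from promptflow.azure import PFClient

# Hypothetical workspace details for illustration only.
pf = PFClient(
    credential=DefaultAzureCredential(),
    subscription_id="<subscription-id>",
    resource_group_name="<resource-group>",
    workspace_name="<workspace>",
)

# "parallel" was the only component_type accepted before this removal.
component = pf.load_as_component(
    source="./my_flow",  # path to a flow directory or flow dag yaml
    component_type="parallel",
    columns_mapping={"question": "${data.question}"},
    environment_variables={"EXAMPLE_VAR": "example-value"},
    is_deterministic=True,
)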
@@ -9,7 +9,7 @@
import uuid
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, Optional, Tuple, TypeVar, Union
from typing import Dict, Optional, TypeVar, Union

from azure.ai.ml._artifacts._blob_storage_helper import BlobStorageClient
from azure.ai.ml._artifacts._gen2_storage_helper import Gen2StorageClient
@@ -34,7 +34,6 @@
get_artifact_path_from_storage_url,
get_storage_client,
)
from azure.ai.ml._utils.utils import is_mlflow_uri, is_url
from azure.ai.ml.constants._common import SHORT_URI_FORMAT, STORAGE_ACCOUNT_URLS
from azure.ai.ml.entities import Environment
from azure.ai.ml.entities._assets._artifacts.artifact import Artifact, ArtifactStorageInfo
@@ -357,56 +356,6 @@ def _update_gen2_metadata(name, version, indicator_file, storage_client) -> None
T = TypeVar("T", bound=Artifact)


def _check_and_upload_path(
artifact: T,
asset_operations: Union["DataOperations", "ModelOperations", "CodeOperations", "FeatureSetOperations"],
artifact_type: str,
datastore_name: Optional[str] = None,
sas_uri: Optional[str] = None,
show_progress: bool = True,
) -> Tuple[T, str]:
"""Checks whether `artifact` is a path or a uri and uploads it to the datastore if necessary.
param T artifact: artifact to check and upload param
Union["DataOperations", "ModelOperations", "CodeOperations"]
asset_operations: the asset operations to use for uploading
param str datastore_name: the name of the datastore to upload to
param str sas_uri: the sas uri to use for uploading
"""

datastore_name = artifact.datastore
if (
hasattr(artifact, "local_path")
and artifact.local_path is not None
or (
hasattr(artifact, "path")
and artifact.path is not None
and not (is_url(artifact.path) or is_mlflow_uri(artifact.path))
)
):
path = (
Path(artifact.path)
if hasattr(artifact, "path") and artifact.path is not None
else Path(artifact.local_path)
)
if not path.is_absolute():
path = Path(artifact.base_path, path).resolve()
uploaded_artifact = _upload_to_datastore(
asset_operations._operation_scope,
asset_operations._datastore_operation,
path,
datastore_name=datastore_name,
asset_name=artifact.name,
asset_version=str(artifact.version),
asset_hash=artifact._upload_hash if hasattr(artifact, "_upload_hash") else None,
sas_uri=sas_uri,
artifact_type=artifact_type,
show_progress=show_progress,
ignore_file=getattr(artifact, "_ignore_file", None),
)
return uploaded_artifact


def _check_and_upload_env_build_context(
environment: Environment,
operations: "EnvironmentOperations",
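The _check_and_upload_path helper removed in this file decided whether an artifact pointed at a local path (to be uploaded) or an already-remote URI (to be reused). A minimal sketch of that gating logic, assuming only that is_url and is_mlflow_uri from azure-ai-ml behave as their names suggest:

from pathlib import Path

from azure.ai.ml._utils.utils import is_mlflow_uri, is_url


def needs_upload(artifact) -> bool:
    # Upload only when the artifact carries a local path rather than an
    # http(s)/azureml/MLflow URI (mirrors the removed helper's condition).
    if getattr(artifact, "local_path", None) is not None:
        return True
    path = getattr(artifact, "path", None)
    return path is not None and not (is_url(path) or is_mlflow_uri(path))


def resolve_local_path(artifact) -> Path:
    # Relative paths are resolved against the artifact's base_path, as in the
    # removed helper.
    raw = artifact.path if getattr(artifact, "path", None) is not None else artifact.local_path
    path = Path(raw)
    return path if path.is_absolute() else Path(artifact.base_path, path).resolve()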
151 changes: 2 additions & 149 deletions src/promptflow/promptflow/azure/operations/_flow_operations.py
@@ -5,9 +5,8 @@

import logging
import os
import re
from pathlib import Path
from typing import Any, Dict
from typing import Dict

from azure.ai.ml._artifacts._artifact_utilities import _check_and_upload_path
from azure.ai.ml._scope_dependent_operations import (
@@ -16,11 +15,7 @@
OperationScope,
_ScopeDependentOperations,
)
from azure.ai.ml._utils._storage_utils import AzureMLDatastorePathUri
from azure.ai.ml._utils.utils import hash_dict
from azure.ai.ml.constants._common import SHORT_URI_FORMAT, AzureMLResourceType
from azure.ai.ml.operations import ComponentOperations
from azure.ai.ml.operations._code_operations import CodeOperations
from azure.ai.ml.constants._common import SHORT_URI_FORMAT
from azure.ai.ml.operations._operation_orchestrator import OperationOrchestrator
from azure.core.exceptions import HttpResponseError

@@ -32,12 +27,8 @@
)
from promptflow._sdk._utils import PromptflowIgnoreFile, generate_flow_tools_json
from promptflow._sdk._vendor._asset_utils import traverse_directory
from promptflow.azure._constants._flow import DEFAULT_STORAGE
from promptflow.azure._entities._flow import Flow
from promptflow.azure._ml import Component
from promptflow.azure._restclient.flow.models import FlowRunMode, LoadFlowAsComponentRequest
from promptflow.azure._restclient.flow_service_caller import FlowServiceCaller
from promptflow.azure._utils import is_arm_id
from promptflow.exceptions import SystemErrorException


@@ -63,10 +54,6 @@ def __init__(
self._service_caller = service_caller
self._credential = credential

@property
def _code_operations(self) -> CodeOperations:
return self._all_operations.get_operation(AzureMLResourceType.CODE, lambda x: isinstance(x, CodeOperations))

def _create_or_update(self, flow, **kwargs):
# upload to file share
self._resolve_arm_id_or_upload_dependencies(flow)
@@ -102,140 +89,6 @@ def _download(self, source, dest):
# TODO: support download flow
raise NotImplementedError("Not implemented yet")

@classmethod
def _clear_empty_item(cls, obj):
if not isinstance(obj, dict):
return obj
return {k: cls._clear_empty_item(v) for k, v in obj.items() if v is not None}

@classmethod
def _get_component_hash(cls, rest_object):
"""this hash should include all the burn-in information:
- code
- keys of inputs_mapping
- environment_variables, it will be burned into something like component.task.environment_variables?
some other fields will be burned into component but will impact default value of inputs:
- variant
- connections
- values of inputs_mapping
Now we use all of them as hash key.
"""
obj = rest_object.as_dict()

return hash_dict(cls._clear_empty_item(obj))

@classmethod
def _get_name_and_version(cls, *, rest_object, name: str = None, version: str = None):
if name and version:
return name, version
if name or version:
raise ValueError("name and version of the component must be provided together")
# the hash will be impacted by all editable fields, including default value of inputs_mapping
# so components with different default value of columns_mapping can't be reused from each other
return "azureml_anonymous_flow", cls._get_component_hash(rest_object)

def load_as_component(
self,
flow,
name: str = None,
version: str = None,
display_name: str = None,
description: str = None,
tags: Dict[str, str] = None,
variant: str = None,
columns_mapping: Dict[str, str] = None,
environment_variables: Dict[str, Any] = None,
connections: Dict[str, Dict[str, str]] = None,
is_deterministic: bool = True,
**kwargs,
) -> Component:
"""Load a flow as a component."""
rest_object = LoadFlowAsComponentRequest(
node_variant=variant,
inputs_mapping=columns_mapping,
environment_variables=environment_variables,
connections=connections,
display_name=display_name,
description=description,
tags=tags,
is_deterministic=is_deterministic,
# hack: MT support this only for now, will remove after MT release new version
run_mode=FlowRunMode.BULK_TEST,
)

if is_arm_id(flow):
rest_object.flow_definition_resource_id = flow.id
else:
# upload to file share
self._resolve_arm_id_or_upload_dependencies(flow)
if flow.path.startswith("azureml://"):
# upload via _check_and_upload_path
# submit with params FlowDefinitionDataStoreName and FlowDefinitionBlobPath
path_uri = AzureMLDatastorePathUri(flow.path)
rest_object.flow_definition_data_store_name = path_uri.datastore
rest_object.flow_definition_blob_path = path_uri.path
else:
# upload via CodeOperations.create_or_update
# submit with param FlowDefinitionDataUri
rest_object.flow_definition_data_uri = flow.path

rest_object.component_name, rest_object.component_version = self._get_name_and_version(
rest_object=rest_object, name=name, version=version
)

component_id = self._service_caller.create_component_from_flow(
subscription_id=self._operation_scope.subscription_id,
resource_group_name=self._operation_scope.resource_group_name,
workspace_name=self._operation_scope.workspace_name,
body=rest_object,
)
name, version = re.match(r".*/components/(.*)/versions/(.*)", component_id).groups()
return self._all_operations.get_operation(
AzureMLResourceType.COMPONENT,
lambda x: isinstance(x, ComponentOperations),
).get(name, version)

def _resolve_arm_id_or_upload_dependencies_to_file_share(self, flow: Flow) -> None:
ops = OperationOrchestrator(self._all_operations, self._operation_scope, self._operation_config)
# resolve flow's code
self._try_resolve_code_for_flow_to_file_share(flow=flow, ops=ops)

@classmethod
def _try_resolve_code_for_flow_to_file_share(cls, flow: Flow, ops: OperationOrchestrator) -> None:
from ._artifact_utilities import _check_and_upload_path

if flow.path:
if flow.path.startswith("azureml://datastores"):
# remote path
path_uri = AzureMLDatastorePathUri(flow.path)
if path_uri.datastore != DEFAULT_STORAGE:
raise ValueError(f"Only {DEFAULT_STORAGE} is supported as remote storage for now.")
flow.path = path_uri.path
flow._code_uploaded = True
return
else:
raise ValueError("Path is required for flow.")

with flow._build_code() as code:
if code is None:
return
if flow._code_uploaded:
return
code.datastore = DEFAULT_STORAGE
uploaded_code_asset = _check_and_upload_path(
artifact=code,
asset_operations=ops._code_assets,
artifact_type="Code",
show_progress=False,
)
if "remote_path" in uploaded_code_asset:
path = uploaded_code_asset["remote_path"]
elif "remote path" in uploaded_code_asset:
path = uploaded_code_asset["remote path"]
flow.code = path
flow.path = (Path(path) / flow.path).as_posix()
flow._code_uploaded = True

def _resolve_arm_id_or_upload_dependencies(self, flow: Flow, ignore_tools_json=False) -> None:
ops = OperationOrchestrator(self._all_operations, self._operation_scope, self._operation_config)
# resolve flow's code
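Two of the helpers removed from _flow_operations.py implemented the anonymous-component naming scheme: the load request was serialized, None values were stripped recursively, and the hash of the result became the component version, so identical requests resolved to the same anonymous component. A rough reconstruction of that scheme, using hash_dict from azure-ai-ml as in the removed import:

from azure.ai.ml._utils.utils import hash_dict


def clear_empty_items(obj):
    # Recursively drop None values so omitted fields do not affect the hash.
    if not isinstance(obj, dict):
        return obj
    return {k: clear_empty_items(v) for k, v in obj.items() if v is not None}


def anonymous_name_and_version(request_dict, name=None, version=None):
    # Mirrors the removed _get_name_and_version: an explicit name/version pair
    # wins; otherwise fall back to an anonymous name plus a hash-based version.
    if name and version:
        return name, version
    if name or version:
        raise ValueError("name and version of the component must be provided together")
    return "azureml_anonymous_flow", hash_dict(clear_empty_items(request_dict))

For example, two requests that differ only in fields left as None would hash to the same anonymous version and be reused rather than re-registered.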
2 changes: 1 addition & 1 deletion src/promptflow/setup.py
@@ -75,7 +75,7 @@
"azure-core>=1.26.4,<2.0.0",
"azure-storage-blob>=12.13.0,<13.0.0",
"azure-identity>=1.12.0,<2.0.0",
"azure-ai-ml>=1.9.0,<2.0.0",
"azure-ai-ml>=1.11.0,<2.0.0",
"pyjwt>=2.4.0,<3.0.0", # requirement of control plane SDK
],
},