Skip to content

Commit

Permalink
[SDK][Local] Support additional includes for eager flow test (#1808)
Browse files Browse the repository at this point in the history
# Description


This pull request includes a wide range of changes to various files in
the `promptflow` project. The most important changes include
modifications to improve code organization, attribute access, and method
signatures in the `_flow` class, as well as the addition of a new
`Notification::Cache` class for interacting with a cache using a
`PStore` object.

Main changes:

*
[`src/promptflow/promptflow/_sdk/entities/_flow.py`](diffhunk://#diff-339752a6e9c04159a62708efed6b385bd91b016a0f0d2edeb32024fd19886a2bL138-R164):
Modified the `_flow` class to improve code organization and attribute
access, add a new attribute, and update method signatures.
[[1]](diffhunk://#diff-339752a6e9c04159a62708efed6b385bd91b016a0f0d2edeb32024fd19886a2bL138-R164)
[[2]](diffhunk://#diff-339752a6e9c04159a62708efed6b385bd91b016a0f0d2edeb32024fd19886a2bL111-R135)
[[3]](diffhunk://#diff-339752a6e9c04159a62708efed6b385bd91b016a0f0d2edeb32024fd19886a2bL193-R198)
[[4]](diffhunk://#diff-339752a6e9c04159a62708efed6b385bd91b016a0f0d2edeb32024fd19886a2bL210-R209)
[[5]](diffhunk://#diff-339752a6e9c04159a62708efed6b385bd91b016a0f0d2edeb32024fd19886a2bL260-R261)
[[6]](diffhunk://#diff-339752a6e9c04159a62708efed6b385bd91b016a0f0d2edeb32024fd19886a2bL348-R349)
[[7]](diffhunk://#diff-339752a6e9c04159a62708efed6b385bd91b016a0f0d2edeb32024fd19886a2bL95-R103)
[[8]](diffhunk://#diff-339752a6e9c04159a62708efed6b385bd91b016a0f0d2edeb32024fd19886a2bL236-R249)
*
[`src/promptflow/promptflow/_sdk/_submitter/utils.py`](diffhunk://#diff-38c78274ff4a19381a2129bfd86174d225e0d8537b48c80aa8f6feaca25083abL159-R164):
Modified the `variant_overwrite_context()` function to handle both
`EagerFlow` and `ProtectedFlow` objects, and modified the
`resolve_used_connections()` function to replace `flow.dag` with
`flow._data`.
[[1]](diffhunk://#diff-38c78274ff4a19381a2129bfd86174d225e0d8537b48c80aa8f6feaca25083abL159-R164)
[[2]](diffhunk://#diff-38c78274ff4a19381a2129bfd86174d225e0d8537b48c80aa8f6feaca25083abR173-R175)
[[3]](diffhunk://#diff-38c78274ff4a19381a2129bfd86174d225e0d8537b48c80aa8f6feaca25083abL218-R222)
[[4]](diffhunk://#diff-38c78274ff4a19381a2129bfd86174d225e0d8537b48c80aa8f6feaca25083abL150-R150)
[[5]](diffhunk://#diff-38c78274ff4a19381a2129bfd86174d225e0d8537b48c80aa8f6feaca25083abL35-R40)
*
[`src/promptflow/promptflow/_sdk/_submitter/run_submitter.py`](diffhunk://#diff-e3499cf713c66612d91723759de1c5076c22689b2ce2c89678f82de624b57c2cL63-R64):
Modified the `_run_bulk()` method to remove the check for
`local_storage.eager_mode` and call `load_flow()` with `flow_obj`
instead of `run.flow`.
*
[`src/promptflow/promptflow/_sdk/operations/_flow_operations.py`](diffhunk://#diff-afdd40a5d0519512dcf9be48bd46c4caaa2291b808687de77896989af63f47e4L15-R15):
Various changes to ensure the correct flow objects and attributes are
used for building and exporting flow connections, and to remove
unnecessary imports and add necessary imports.
[[1]](diffhunk://#diff-afdd40a5d0519512dcf9be48bd46c4caaa2291b808687de77896989af63f47e4L15-R15)
[[2]](diffhunk://#diff-afdd40a5d0519512dcf9be48bd46c4caaa2291b808687de77896989af63f47e4L412-R412)
[[3]](diffhunk://#diff-afdd40a5d0519512dcf9be48bd46c4caaa2291b808687de77896989af63f47e4L444-R444)
[[4]](diffhunk://#diff-afdd40a5d0519512dcf9be48bd46c4caaa2291b808687de77896989af63f47e4L453-R453)
[[5]](diffhunk://#diff-afdd40a5d0519512dcf9be48bd46c4caaa2291b808687de77896989af63f47e4L428-R428)
[[6]](diffhunk://#diff-afdd40a5d0519512dcf9be48bd46c4caaa2291b808687de77896989af63f47e4L32-R32)
[[7]](diffhunk://#diff-afdd40a5d0519512dcf9be48bd46c4caaa2291b808687de77896989af63f47e4L587-R587)
[[8]](diffhunk://#diff-afdd40a5d0519512dcf9be48bd46c4caaa2291b808687de77896989af63f47e4L604-R604)

Testing improvements:

*
[`src/promptflow/tests/sdk_cli_test/unittests/test_run.py`](diffhunk://#diff-c3d1c4e4539af1a59525218043dad93dc866b761a70a16a21783e57a7d0adac5R29-R50):
Added a new fixture, modified an existing method, and updated an import
statement.
[[1]](diffhunk://#diff-c3d1c4e4539af1a59525218043dad93dc866b761a70a16a21783e57a7d0adac5R29-R50)
[[2]](diffhunk://#diff-c3d1c4e4539af1a59525218043dad93dc866b761a70a16a21783e57a7d0adac5L14-R19)
[[3]](diffhunk://#diff-c3d1c4e4539af1a59525218043dad93dc866b761a70a16a21783e57a7d0adac5L71-R81)
*
[`src/promptflow/tests/sdk_cli_test/e2etests/test_flow_run.py`](diffhunk://#diff-94a59a05643476869fa3c6bc45466f1582944a935488075e2e63b6a6a196958fR1248):
Added a TODO comment for future reference and added a new test method to
test flow runs with additional included files.
[[1]](diffhunk://#diff-94a59a05643476869fa3c6bc45466f1582944a935488075e2e63b6a6a196958fR1248)
[[2]](diffhunk://#diff-94a59a05643476869fa3c6bc45466f1582944a935488075e2e63b6a6a196958fR1269-R1278)
*
[`src/promptflow/tests/test_configs/eager_flows/flow_with_additional_includes/flow.py`](diffhunk://#diff-d9a0b6426d2e584a65983c8fa36cfb185397924c6faba503480c82c006243fbeR1-R7):
Added a new entry function `my_flow_entry` to the `flow.py` file.
*
[`src/promptflow/tests/test_configs/eager_flows/flow_with_additional_includes/flow.dag.yaml`](diffhunk://#diff-a1826a5844be209b8e39da157975cc6ea52e41463dd40b44a48a63c491e37ebbR1-R4):
Modified the `flow.dag.yaml` file to update the configuration of the
flow DAG.
*
[`src/promptflow/tests/sdk_cli_test/e2etests/test_flow_test.py`](diffhunk://#diff-acf3c2285ac2a7051d59e3a69abe02224e2d6d5d69e3a0aabd2afd9311f87657R251-R257):
Added a new test method `test_eager_flow_test_with_additional_includes`
to the `test_flow_test.py` file.
*
[`src/promptflow/promptflow/_sdk/_submitter/test_submitter.py`](diffhunk://#diff-d9dd2c09a149b11eb54626edf065287eeb7b1a28e39719b8dd95377770f64138L61-R70):
Modified the `dataplane_flow` method to use a single context manager to
handle both eager flow initialization and DAG flow initialization.
# All Promptflow Contribution checklist:
- [ ] **The pull request does not introduce [breaking changes].**
- [ ] **CHANGELOG is updated for new features, bug fixes or other
significant changes.**
- [ ] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [ ] **Create an issue and link to the pull request to get dedicated
review from promptflow team. Learn more: [suggested
workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [ ] Title of the pull request is clear and informative.
- [ ] There are a small number of commits, each of which have an
informative message. This means that previously merged commits do not
appear in the history of the PR. For more information on cleaning up the
commits in your PR, [see this
page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [ ] Pull request includes test coverage for the included changes.
  • Loading branch information
D-W- authored Jan 29, 2024
1 parent 9d415c4 commit 7520482
Show file tree
Hide file tree
Showing 14 changed files with 128 additions and 102 deletions.
4 changes: 2 additions & 2 deletions src/promptflow/promptflow/_cli/_pf/_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
)
from promptflow._cli._pf._run import exception_handler
from promptflow._cli._utils import _copy_to_flow, activate_action, confirm, inject_sys_path, list_of_dict_to_dict
from promptflow._constants import LANGUAGE_KEY, FlowLanguage
from promptflow._constants import FlowLanguage
from promptflow._sdk._constants import PROMPT_FLOW_DIR_NAME, ConnectionProvider
from promptflow._sdk._pf_client import PFClient
from promptflow._sdk.operations._flow_operations import FlowOperations
Expand Down Expand Up @@ -451,7 +451,7 @@ def serve_flow(args):
)
os.environ["PROMPTFLOW_PROJECT_PATH"] = source.absolute().as_posix()
flow = load_flow(args.source)
if flow.dag.get(LANGUAGE_KEY, FlowLanguage.Python) == FlowLanguage.CSharp:
if flow.language == FlowLanguage.CSharp:
serve_flow_csharp(args, source)
else:
serve_flow_python(args, source)
Expand Down
2 changes: 1 addition & 1 deletion src/promptflow/promptflow/_sdk/_serving/flow_invoker.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def __init__(
self.logger = kwargs.get("logger", LoggerFactory.get_logger("flowinvoker"))
self.flow_entity = flow if isinstance(flow, Flow) else load_flow(source=flow)
self._executable_flow = ExecutableFlow._from_dict(
flow_dag=self.flow_entity.dag, working_dir=self.flow_entity.code
flow_dag=self.flow_entity._data, working_dir=self.flow_entity.code
)
self.connections = connections or {}
self.connections_name_overrides = connections_name_overrides or {}
Expand Down
10 changes: 3 additions & 7 deletions src/promptflow/promptflow/_sdk/_submitter/run_submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,9 @@ def _run_bulk(self, run: Run, stream=False, **kwargs):

local_storage = LocalStorageOperations(run, stream=stream, run_mode=RunMode.Batch)
with local_storage.logger:
if local_storage.eager_mode:
flow_obj = load_flow(source=run.flow)
self._submit_bulk_run(flow=flow_obj, run=run, local_storage=local_storage)
else:
# running specified variant
with variant_overwrite_context(run.flow, tuning_node, variant, connections=run.connections) as flow:
self._submit_bulk_run(flow=flow, run=run, local_storage=local_storage)
flow_obj = load_flow(source=run.flow)
with variant_overwrite_context(flow_obj, tuning_node, variant, connections=run.connections) as flow:
self._submit_bulk_run(flow=flow, run=run, local_storage=local_storage)

@classmethod
def _validate_inputs(cls, run: Run):
Expand Down
21 changes: 2 additions & 19 deletions src/promptflow/promptflow/_sdk/_submitter/test_submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,33 +58,16 @@ def dataplane_flow(self):

@contextlib.contextmanager
def init(self):
if isinstance(self.flow, EagerFlow):
flow_content_manager = self._eager_flow_init
else:
flow_content_manager = self._dag_flow_init
with flow_content_manager() as submitter:
yield submitter

@contextlib.contextmanager
def _eager_flow_init(self):
# TODO(2901096): validate invalid configs like variant & connections
# no variant overwrite for eager flow
# no connection overwrite for eager flow
# TODO(2897147): support additional includes
with _change_working_dir(self.flow.code):
self._tuning_node = None
self._node_variant = None
yield self
self._dataplane_flow = None

@contextlib.contextmanager
def _dag_flow_init(self):
if self.flow_context.variant:
tuning_node, node_variant = parse_variant(self.flow_context.variant)
else:
tuning_node, node_variant = None, None

with variant_overwrite_context(
flow_path=self._origin_flow.code,
flow=self.flow,
tuning_node=tuning_node,
variant=node_variant,
connections=self.flow_context.connections,
Expand Down
28 changes: 16 additions & 12 deletions src/promptflow/promptflow/_sdk/_submitter/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
from promptflow._sdk._errors import InvalidFlowError
from promptflow._sdk._load_functions import load_flow
from promptflow._sdk._utils import (
_get_additional_includes,
_merge_local_code_and_additional_includes,
get_local_connections_from_executable,
get_used_connection_names_from_dict,
update_dict_value_with_connections,
)
from promptflow._sdk.entities._eager_flow import EagerFlow
from promptflow._sdk.entities._flow import Flow, ProtectedFlow
from promptflow._utils.context_utils import _change_working_dir
from promptflow._utils.flow_utils import dump_flow_dag, load_flow_dag
Expand Down Expand Up @@ -150,7 +150,7 @@ def remove_additional_includes(flow_path: Path):

@contextlib.contextmanager
def variant_overwrite_context(
flow_path: Path,
flow: Flow,
tuning_node: str = None,
variant: str = None,
connections: dict = None,
Expand All @@ -159,19 +159,23 @@ def variant_overwrite_context(
drop_node_variants: bool = False,
):
"""Override variant and connections in the flow."""
flow_dag_path, flow_dag = load_flow_dag(flow_path)
flow_dir_path = flow_dag_path.parent
if _get_additional_includes(flow_dag_path):
# Merge the flow folder and additional includes to temp folder.
with _merge_local_code_and_additional_includes(code_path=flow_path) as temp_dir:
# always overwrite variant since we need to overwrite default variant if not specified.
overwrite_variant(flow_dag, tuning_node, variant, drop_node_variants=drop_node_variants)
overwrite_connections(flow_dag, connections, working_dir=flow_dir_path)
overwrite_flow(flow_dag, overrides)
flow_dag = flow._data
flow_dir_path = Path(flow.code)
if flow.additional_includes:
# Merge the flow folder and additional includes to temp folder for both eager flow & dag flow.
with _merge_local_code_and_additional_includes(code_path=flow_dir_path) as temp_dir:
if not isinstance(flow, EagerFlow):
# always overwrite variant since we need to overwrite default variant if not specified.
overwrite_variant(flow_dag, tuning_node, variant, drop_node_variants=drop_node_variants)
overwrite_connections(flow_dag, connections, working_dir=flow_dir_path)
overwrite_flow(flow_dag, overrides)
flow_dag.pop("additional_includes", None)
dump_flow_dag(flow_dag, Path(temp_dir))
flow = load_flow(temp_dir)
yield flow
elif isinstance(flow, EagerFlow):
# eager flow don't support overwrite variant
yield flow
else:
# Generate a flow, the code path points to the original flow folder,
# the dag path points to the temp dag file after overwriting variant.
Expand Down Expand Up @@ -218,7 +222,7 @@ def resolve_used_connections(flow: ProtectedFlow, tools_meta: dict, client, conn
from .._pf_client import PFClient

client = client or PFClient()
connection_names = SubmitterHelper.get_used_connection_names(tools_meta=tools_meta, flow_dag=flow.dag)
connection_names = SubmitterHelper.get_used_connection_names(tools_meta=tools_meta, flow_dag=flow._data)
connections_to_ignore = connections_to_ignore or []
result = {}
for n in connection_names:
Expand Down
19 changes: 14 additions & 5 deletions src/promptflow/promptflow/_sdk/entities/_eager_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,28 @@ class EagerFlow(FlowBase):
def __init__(
self,
path: Union[str, PathLike],
code: Union[str, PathLike],
entry: str,
data: dict,
**kwargs,
):
self.path = Path(path)
self.code = self.path.parent
# flow.dag.yaml file path or entry.py file path
path = Path(path)
# flow.dag.yaml file's folder or entry.py's folder
code = Path(code)
# entry function name
self.entry = entry
self._data = data
super().__init__(**kwargs)
# TODO(2910062): support eager flow execution cache
super().__init__(data=data, path=path, code=code, content_hash=None, **kwargs)

@property
def language(self) -> str:
return self._data.get(LANGUAGE_KEY, FlowLanguage.Python)

@property
def additional_includes(self) -> list:
return self._data.get("additional_includes", [])

@classmethod
def _create_schema_for_validation(cls, context):
# import here to avoid circular import
Expand All @@ -41,6 +49,7 @@ def _create_schema_for_validation(cls, context):
@classmethod
def _load(cls, path: Path, entry: str = None, data: dict = None, **kwargs):
data = data or {}
code = path.parent
# schema validation on unknown fields
if path.suffix in [".yaml", ".yml"]:
data = cls._create_schema_for_validation(context={BASE_PATH_CONTEXT_KEY: path.parent}).load(data)
Expand All @@ -52,4 +61,4 @@ def _load(cls, path: Path, entry: str = None, data: dict = None, **kwargs):

if entry is None:
raise UserErrorException(f"Entry function is not specified for flow {path}")
return cls(path=path, entry=entry, data=data, **kwargs)
return cls(path=path, code=code, entry=entry, data=data, **kwargs)
73 changes: 36 additions & 37 deletions src/promptflow/promptflow/_sdk/entities/_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,15 @@ def __hash__(self):


class FlowBase(abc.ABC):
def __init__(self, **kwargs):
def __init__(self, *, data: dict, code: Path, path: Path, **kwargs):
self._context = FlowContext()
# flow.dag.yaml's content if provided
self._data = data
# working directory of the flow
self._code = Path(code).resolve()
# flow file path, can be script file or flow definition YAML file
self._path = Path(path).resolve()
# hash of flow's entry file, used to skip invoke if entry file is not changed
self._content_hash = kwargs.pop("content_hash", None)
super().__init__(**kwargs)

Expand All @@ -108,9 +115,24 @@ def context(self, val):
self._context = val

@property
@abc.abstractmethod
def code(self) -> Path:
"""Working directory of the flow."""
return self._code

@property
def path(self) -> Path:
"""Flow file path. Can be script file or flow definition YAML file."""
return self._path

@property
def language(self) -> str:
"""Language of the flow."""
return self._data.get(LANGUAGE_KEY, FlowLanguage.Python)

@property
def additional_includes(self) -> list:
"""Additional includes of the flow."""
return self._data.get("additional_includes", [])

@classmethod
# pylint: disable=unused-argument
Expand All @@ -132,37 +154,12 @@ class Flow(FlowBase):
def __init__(
self,
code: Union[str, PathLike],
path: Union[str, PathLike],
dag: dict,
**kwargs,
):
self._code = Path(code)
path = kwargs.pop("path", None)
self._path = Path(path) if path else None
self.variant = kwargs.pop("variant", None) or {}
self.dag = dag
super().__init__(**kwargs)

@property
def code(self) -> Path:
return self._code

@code.setter
def code(self, value: Union[str, PathLike, Path]):
self._code = value

@property
def path(self) -> Path:
flow_file = self._path or self.code / DAG_FILE_NAME
if not flow_file.is_file():
raise UserErrorException(
"The directory does not contain a valid flow.",
target=ErrorTarget.CONTROL_PLANE_SDK,
)
return flow_file

@property
def language(self) -> str:
return self.dag.get(LANGUAGE_KEY, FlowLanguage.Python)
super().__init__(data=dag, code=code, path=path, **kwargs)

@classmethod
def _is_eager_flow(cls, data: dict):
Expand Down Expand Up @@ -190,13 +187,13 @@ def load(
with open(flow_path, "r", encoding=DEFAULT_ENCODING) as f:
flow_content = f.read()
data = load_yaml_string(flow_content)
kwargs["content_hash"] = hash(flow_content)
content_hash = hash(flow_content)
is_eager_flow = cls._is_eager_flow(data)
if is_eager_flow:
return EagerFlow._load(path=flow_path, entry=entry, data=data, **kwargs)
else:
# TODO: schema validation and warning on unknown fields
return ProtectedFlow._load(path=flow_path, dag=data, **kwargs)
return ProtectedFlow._load(path=flow_path, dag=data, content_hash=content_hash, **kwargs)
# if non-YAML file is provided, treat is as eager flow
return EagerFlow._load(path=flow_path, entry=entry, **kwargs)

Expand All @@ -207,7 +204,7 @@ def _init_executable(self, tuning_node=None, variant=None):
# this is a little wired:
# 1. the executable is created from a temp folder when there is additional includes
# 2. after the executable is returned, the temp folder is deleted
with variant_overwrite_context(self.code, tuning_node, variant) as flow:
with variant_overwrite_context(self, tuning_node, variant) as flow:
from promptflow.contracts.flow import Flow as ExecutableFlow

return ExecutableFlow.from_yaml(flow_file=flow.path, working_dir=flow.code)
Expand All @@ -233,19 +230,21 @@ class ProtectedFlow(Flow, SchemaValidatableMixin):

def __init__(
self,
code: str,
path: Path,
code: Path,
dag: dict,
params_override: Optional[Dict] = None,
**kwargs,
):
super().__init__(code=code, **kwargs)
super().__init__(path=path, code=code, dag=dag, **kwargs)

self._flow_dir, self._dag_file_name = self._get_flow_definition(self.code)
self._executable = None
self._params_override = params_override

@classmethod
def _load(cls, path: Path, dag: dict, **kwargs):
return cls(code=path.parent.absolute().as_posix(), dag=dag, **kwargs)
return cls(path=path, code=path.parent, dag=dag, **kwargs)

@property
def flow_dag_path(self) -> Path:
Expand All @@ -257,7 +256,7 @@ def name(self) -> str:

@property
def display_name(self) -> str:
return self.dag.get("display_name", self.name)
return self._data.get("display_name", self.name)

@property
def tools_meta_path(self) -> Path:
Expand Down Expand Up @@ -345,7 +344,7 @@ def invoke(self, inputs: dict) -> "LineResult":
from promptflow._sdk._submitter.test_submitter import TestSubmitterViaProxy
from promptflow._sdk.operations._flow_context_resolver import FlowContextResolver

if self.dag.get(LANGUAGE_KEY, FlowLanguage.Python) == FlowLanguage.CSharp:
if self.language == FlowLanguage.CSharp:
with TestSubmitterViaProxy(flow=self, flow_context=self.context).init() as submitter:
result = submitter.exec_with_inputs(
inputs=inputs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def resolve(cls, flow: Flow) -> "FlowInvoker":
"""Resolve flow to flow invoker."""
resolver = cls(flow_path=flow.path)
resolver._resolve(flow_context=flow.context)
return resolver._create_invoker(flow=flow, flow_context=flow.context)
return resolver._create_invoker(flow_context=flow.context)

def _resolve(self, flow_context: FlowContext):
"""Resolve flow context."""
Expand Down Expand Up @@ -101,12 +101,12 @@ def _resolve_connection_objs(self, flow_context: FlowContext):
connections[key] = connection_obj._to_execution_connection_dict()
return connections

def _create_invoker(self, flow: Flow, flow_context: FlowContext) -> "FlowInvoker":
def _create_invoker(self, flow_context: FlowContext) -> "FlowInvoker":
from promptflow._sdk._serving.flow_invoker import FlowInvoker

connections = self._resolve_connection_objs(flow_context=flow_context)
# use updated flow dag to create new flow object for invoker
resolved_flow = Flow(code=self.working_dir, dag=self.flow_dag)
resolved_flow = Flow(code=self.working_dir, path=self.flow_path, dag=self.flow_dag)
invoker = FlowInvoker(
flow=resolved_flow,
connections=connections,
Expand Down
Loading

0 comments on commit 7520482

Please sign in to comment.