Skip to content

Commit a1789bc

Browse files
authored
Migrate the yaml used by the eager flow to flow.flex.yaml, code support both flow.flex.yaml and flow.dag.yaml. (#2548)
# Description Please add an informative description that covers that changes made by the pull request and link all relevant issues. 1. Modify the **yaml** used for all **eager tests** to **flow.flex.yaml.** 2. Modify the **yaml** of the **eager flow** in the code to **flow.flex.yaml**. 3. Add **flow** when saving run to/retrieving run from the **database** # All Promptflow Contribution checklist: - [x] **The pull request does not introduce [breaking changes].** - [ ] **CHANGELOG is updated for new features, bug fixes or other significant changes.** - [ ] **I have read the [contribution guidelines](../CONTRIBUTING.md).** - [ ] **Create an issue and link to the pull request to get dedicated review from promptflow team. Learn more: [suggested workflow](../CONTRIBUTING.md#suggested-workflow).** ## General Guidelines and Best Practices - [x] Title of the pull request is clear and informative. - [ ] There are a small number of commits, each of which have an informative message. This means that previously merged commits do not appear in the history of the PR. For more information on cleaning up the commits in your PR, [see this page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md). ### Testing Guidelines - [x] Pull request includes test coverage for the included changes.
1 parent cc2adc0 commit a1789bc

File tree

50 files changed

+167
-134
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+167
-134
lines changed

src/promptflow-core/promptflow/_constants.py

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
FLOW_DAG_YAML = "flow.dag.yaml"
2828
FLOW_FLEX_YAML = "flow.flex.yaml"
2929
PROMPTY_EXTENSION = ".prompty"
30+
FLOW_FILE_SUFFIX = (".yaml", ".yml", PROMPTY_EXTENSION)
3031

3132
CHAT_HISTORY = "chat_history"
3233

src/promptflow-core/promptflow/_utils/flow_utils.py

+26-14
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
CHAT_HISTORY,
1414
DEFAULT_ENCODING,
1515
FLOW_DAG_YAML,
16+
FLOW_FILE_SUFFIX,
1617
FLOW_FLEX_YAML,
1718
PROMPT_FLOW_DIR_NAME,
1819
PROMPTY_EXTENSION,
@@ -95,13 +96,19 @@ def resolve_flow_path(
9596
f"Please specify a file or remove the extra YAML.",
9697
privacy_info=[str(flow_path)],
9798
)
98-
elif flow_path.is_file() or flow_path.suffix in (".yaml", ".yml"):
99+
elif flow_path.is_file() or flow_path.suffix.lower() in FLOW_FILE_SUFFIX:
99100
flow_folder = flow_path.parent
100101
flow_file = flow_path.name
101102
else: # flow_path doesn't exist
102103
flow_folder = flow_path
103104
flow_file = FLOW_DAG_YAML
104105

106+
file_path = flow_folder / flow_file
107+
if file_path.suffix.lower() not in FLOW_FILE_SUFFIX:
108+
raise UserErrorException(
109+
error_format=f"The flow file suffix must be yaml or yml, " f"and cannot be {file_path.suffix}"
110+
)
111+
105112
if not check_flow_exist:
106113
return flow_folder.resolve().absolute(), flow_file
107114

@@ -111,10 +118,10 @@ def resolve_flow_path(
111118
privacy_info=[flow_path.absolute().as_posix()],
112119
)
113120

114-
if not (flow_folder / flow_file).is_file():
121+
if not file_path.is_file():
115122
raise UserErrorException(
116-
f"Can't find file {flow_file}, " f"in the flow path {flow_folder.absolute().as_posix()}.",
117-
privacy_info=[flow_folder.absolute().as_posix()],
123+
f"Flow file {file_path.absolute().as_posix()} does not exist.",
124+
privacy_info=[file_path.absolute().as_posix()],
118125
)
119126

120127
return flow_folder.resolve().absolute(), flow_file
@@ -141,20 +148,25 @@ def dump_flow_dag(flow_dag: dict, flow_path: Path):
141148

142149

143150
def is_flex_flow(
144-
*, file_path: Union[str, Path, None] = None, yaml_dict: Optional[dict] = None, working_dir: Optional[Path] = None
151+
*,
152+
flow_path: Union[str, Path, PathLike, None] = None,
153+
yaml_dict: Optional[dict] = None,
154+
working_dir: Union[str, Path, PathLike, None] = None,
155+
check_flow_exist=True,
145156
):
146157
"""Check if the flow is a flex flow."""
147-
if file_path is None and yaml_dict is None:
158+
if flow_path is None and yaml_dict is None:
148159
raise UserErrorException("Either file_path or yaml_dict should be provided.")
149-
if file_path is not None and yaml_dict is not None:
160+
if flow_path is not None and yaml_dict is not None:
150161
raise UserErrorException("Only one of file_path and yaml_dict should be provided.")
151-
if file_path is not None:
152-
file_path = Path(file_path)
153-
if working_dir is not None and not file_path.is_absolute():
154-
file_path = working_dir / file_path
155-
if file_path.suffix.lower() not in [".yaml", ".yml"]:
156-
return False
157-
yaml_dict = load_yaml(file_path)
162+
if flow_path is not None:
163+
flow_path, flow_file = resolve_flow_path(flow_path, base_path=working_dir, check_flow_exist=False)
164+
file_path = flow_path / flow_file
165+
if file_path.is_file() and file_path.suffix.lower() in (".yaml", ".yml"):
166+
yaml_dict = load_yaml(file_path)
167+
elif not check_flow_exist:
168+
return flow_file == FLOW_FLEX_YAML
169+
158170
return isinstance(yaml_dict, dict) and "entry" in yaml_dict
159171

160172

src/promptflow-core/promptflow/executor/flow_executor.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ def create(
211211
return ScriptExecutor(flow_file, storage=storage)
212212
if not isinstance(flow_file, (Path, str)):
213213
raise NotImplementedError("Only support Path or str for flow_file.")
214-
if is_flex_flow(file_path=flow_file, working_dir=working_dir):
214+
if is_flex_flow(flow_path=flow_file, working_dir=working_dir):
215215
from ._script_executor import ScriptExecutor
216216

217217
return ScriptExecutor(

src/promptflow-core/tests/conftest.py

+9-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from pathlib import Path
22

3+
from promptflow._utils.flow_utils import resolve_flow_path
4+
35
TEST_CONFIG_ROOT = Path(__file__).parent.parent.parent / "promptflow" / "tests" / "test_configs"
46
FLOW_ROOT = TEST_CONFIG_ROOT / "flows"
57
EAGER_FLOW_ROOT = TEST_CONFIG_ROOT / "eager_flows"
@@ -10,6 +12,11 @@ def get_flow_folder(folder_name, root: str = FLOW_ROOT) -> Path:
1012
return flow_folder_path
1113

1214

13-
def get_yaml_file(folder_name, root: str = FLOW_ROOT, file_name: str = "flow.dag.yaml") -> Path:
14-
yaml_file = get_flow_folder(folder_name, root) / file_name
15+
def get_yaml_file(folder_name, root: str = FLOW_ROOT, file_name: str = None) -> Path:
16+
if file_name is None:
17+
flow_path, flow_file = resolve_flow_path(get_flow_folder(folder_name, root), check_flow_exist=False)
18+
yaml_file = flow_path / flow_file
19+
else:
20+
yaml_file = get_flow_folder(folder_name, root) / file_name
21+
1522
return yaml_file

src/promptflow-devkit/promptflow/_proxy/_base_executor_proxy.py

+5-6
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
import httpx
1313

14-
from promptflow._constants import DEFAULT_ENCODING, LINE_TIMEOUT_SEC
14+
from promptflow._constants import DEFAULT_ENCODING, FLOW_FLEX_YAML, LINE_TIMEOUT_SEC
1515
from promptflow._core._errors import NotSupported, UnexpectedError
1616
from promptflow._proxy._errors import ExecutorServiceUnhealthy
1717
from promptflow._sdk._constants import (
@@ -23,7 +23,7 @@
2323
)
2424
from promptflow._utils.async_utils import async_run_allowing_running_loop
2525
from promptflow._utils.exception_utils import ErrorResponse, ExceptionPresenter
26-
from promptflow._utils.flow_utils import is_flex_flow, read_json_content, resolve_flow_path
26+
from promptflow._utils.flow_utils import is_flex_flow, read_json_content
2727
from promptflow._utils.logger_utils import bulk_logger
2828
from promptflow._utils.utils import load_json
2929
from promptflow.contracts.run_info import FlowRunInfo
@@ -52,7 +52,7 @@ def generate_flow_tools_json(
5252
load_in_subprocess: bool = True,
5353
) -> dict:
5454
"""Generate flow.tools.json for the specified flow."""
55-
if is_flex_flow(file_path=flow_file, working_dir=working_dir):
55+
if is_flex_flow(flow_path=flow_file, working_dir=working_dir):
5656
return {}
5757
else:
5858
return cls._generate_flow_tools_json(flow_file, working_dir, dump, timeout, load_in_subprocess)
@@ -93,7 +93,7 @@ def generate_flow_json(
9393
:return: The metadata of the flow.
9494
:rtype: Dict[str, Any]
9595
"""
96-
if is_flex_flow(file_path=flow_file, working_dir=working_dir):
96+
if is_flex_flow(flow_path=flow_file, working_dir=working_dir):
9797
return cls._generate_flow_json(flow_file, working_dir, dump, timeout, load_in_subprocess)
9898
else:
9999
return {}
@@ -239,9 +239,8 @@ def get_inputs_definition(self):
239239
"""Get the inputs definition of an eager flow"""
240240
from promptflow.contracts.flow import FlowInputDefinition
241241

242-
_, flow_file = resolve_flow_path(self.working_dir, check_flow_exist=False)
243242
flow_meta = self.generate_flow_json(
244-
flow_file=self.working_dir / flow_file,
243+
flow_file=self.working_dir / FLOW_FLEX_YAML,
245244
working_dir=self.working_dir,
246245
dump=False,
247246
)

src/promptflow-devkit/promptflow/_proxy/_csharp_executor_proxy.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ def dump_metadata(cls, flow_file: Path, working_dir: Path) -> NoReturn:
8585
@classmethod
8686
def get_outputs_definition(cls, flow_file: Path, working_dir: Path) -> dict:
8787
# TODO: no outputs definition for eager flow for now
88-
if is_flex_flow(file_path=flow_file, working_dir=working_dir):
88+
if is_flex_flow(flow_path=flow_file, working_dir=working_dir):
8989
return {}
9090

9191
# TODO: get this from self._get_flow_meta for both eager flow and non-eager flow then remove

src/promptflow-devkit/promptflow/_sdk/_constants.py

+4
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,10 @@ class OSType:
489489
LINUX = "Linux"
490490

491491

492+
class RunMode:
493+
EAGER = "Eager"
494+
495+
492496
# Note: Keep these for backward compatibility
493497
CustomStrongTypeConnectionConfigs = CustomStrongTypeConnectionConfigs
494498
ConnectionType = ConnectionType

src/promptflow-devkit/promptflow/_sdk/_service/apis/ui.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ def get(self):
168168
flow_path = get_set_flow_yaml(flow, experiment)
169169
flow_path_dir, flow_path_file = resolve_flow_path(flow_path)
170170
flow_info = load_yaml(flow_path_dir / flow_path_file)
171-
if is_flex_flow(file_path=flow_path_dir / flow_path_file):
171+
if is_flex_flow(flow_path=flow_path_dir / flow_path_file):
172172
# call api provided by han to get flow input
173173
flow_input = {}
174174
flow_info.update(flow_input)

src/promptflow-devkit/promptflow/_sdk/_utils.py

+22-3
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
from keyring.errors import NoKeyringError
3333
from marshmallow import ValidationError
3434

35-
from promptflow._constants import ENABLE_MULTI_CONTAINER_KEY, EXTENSION_UA, FLOW_DAG_YAML, FlowLanguage
35+
from promptflow._constants import ENABLE_MULTI_CONTAINER_KEY, EXTENSION_UA, FLOW_FLEX_YAML, FlowLanguage
3636
from promptflow._core.entry_meta_generator import generate_flow_meta as _generate_flow_meta
3737
from promptflow._sdk._constants import (
3838
AZURE_WORKSPACE_REGEX_FORMAT,
@@ -54,6 +54,8 @@
5454
VARIANTS,
5555
AzureMLWorkspaceTriad,
5656
CommonYamlFields,
57+
RunInfoSources,
58+
RunMode,
5759
)
5860
from promptflow._sdk._errors import (
5961
DecryptConnectionError,
@@ -62,7 +64,7 @@
6264
UnsecureConnectionError,
6365
)
6466
from promptflow._sdk._vendor import IgnoreFile, get_ignore_file, get_upload_files_from_folder
65-
from promptflow._utils.flow_utils import resolve_flow_path
67+
from promptflow._utils.flow_utils import is_flex_flow, resolve_flow_path
6668
from promptflow._utils.logger_utils import get_cli_sdk_logger
6769
from promptflow._utils.user_agent_utils import ClientUserAgentUtil
6870
from promptflow._utils.yaml_utils import dump_yaml, load_yaml, load_yaml_string
@@ -984,6 +986,7 @@ def generate_yaml_entry(entry: Union[str, PathLike, Callable], code: Path = None
984986
@contextmanager
985987
def create_temp_flex_flow_yaml(entry: Union[str, PathLike, Callable], code: Path = None):
986988
"""Create a temporary flow.dag.yaml in code folder"""
989+
987990
logger.info("Create temporary entry for flex flow.")
988991
if callable(entry):
989992
entry = callable_to_entry_string(entry)
@@ -994,7 +997,7 @@ def create_temp_flex_flow_yaml(entry: Union[str, PathLike, Callable], code: Path
994997
code = Path(code)
995998
if not code.exists():
996999
raise UserErrorException(f"Code path {code.as_posix()} does not exist.")
997-
flow_yaml_path = code / FLOW_DAG_YAML
1000+
flow_yaml_path = code / FLOW_FLEX_YAML
9981001
existing_content = None
9991002

10001003
try:
@@ -1048,6 +1051,22 @@ def callable_to_entry_string(callable_obj: Callable) -> str:
10481051
return f"{module_str}:{func_str}"
10491052

10501053

1054+
def is_flex_run(run: "Run") -> bool:
1055+
if run._run_source == RunInfoSources.LOCAL:
1056+
try:
1057+
# The flow yaml may have been temporarily generated and deleted after creating a run.
1058+
# So check_flow_exist=False.
1059+
return is_flex_flow(flow_path=run.flow, check_flow_exist=False)
1060+
except Exception as e:
1061+
# For run with incomplete flow snapshot, ignore load flow error to make sure it can still show.
1062+
logger.debug(f"Failed to check is flex flow from {run.flow} due to {e}.")
1063+
return False
1064+
elif run._run_source in [RunInfoSources.INDEX_SERVICE, RunInfoSources.RUN_HISTORY]:
1065+
return run._properties.get("azureml.promptflow.run_mode") == RunMode.EAGER
1066+
# TODO(2901279): support eager mode for run created from run folder
1067+
return False
1068+
1069+
10511070
generate_flow_meta = _generate_flow_meta
10521071
# DO NOT remove the following line, it's used by the runtime imports from _sdk/_utils directly
10531072
get_used_connection_names_from_dict = get_used_connection_names_from_dict

src/promptflow-devkit/promptflow/_sdk/entities/_flows/base.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from pathlib import Path
77
from typing import Union
88

9-
from promptflow._constants import DEFAULT_ENCODING, PROMPTY_EXTENSION
9+
from promptflow._constants import DEFAULT_ENCODING, FLOW_FILE_SUFFIX
1010
from promptflow._sdk.entities._validation import SchemaValidatableMixin
1111
from promptflow._utils.flow_utils import is_flex_flow, is_prompty_flow, resolve_flow_path
1212
from promptflow._utils.yaml_utils import load_yaml_string
@@ -182,7 +182,7 @@ def _load_prepare(cls, source: Union[str, PathLike]):
182182
flow_dir, flow_filename = resolve_flow_path(source)
183183
flow_path = flow_dir / flow_filename
184184

185-
if flow_path.suffix not in [".yaml", ".yml", PROMPTY_EXTENSION]:
185+
if flow_path.suffix not in FLOW_FILE_SUFFIX:
186186
raise UserErrorException("Source must be a directory or a 'flow.dag.yaml' file or a prompty file")
187187
return flow_dir, flow_path
188188

src/promptflow-devkit/promptflow/_sdk/operations/_flow_operations.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import pydash
2121

22-
from promptflow._constants import FLOW_DAG_YAML, LANGUAGE_KEY, PROMPT_FLOW_DIR_NAME, FlowLanguage
22+
from promptflow._constants import FLOW_FLEX_YAML, LANGUAGE_KEY, PROMPT_FLOW_DIR_NAME, FlowLanguage
2323
from promptflow._proxy import ProxyFactory
2424
from promptflow._sdk._configuration import Configuration
2525
from promptflow._sdk._constants import (
@@ -1109,7 +1109,7 @@ def _save(
11091109
if LANGUAGE_KEY in kwargs:
11101110
data[LANGUAGE_KEY] = language
11111111

1112-
target_flow_file = target_flow_directory / FLOW_DAG_YAML
1112+
target_flow_file = target_flow_directory / FLOW_FLEX_YAML
11131113
# schema validation, here target_flow_file doesn't exist actually
11141114
FlexFlow(path=target_flow_file, code=code, data=data, entry=data["entry"])._validate(raise_error=True)
11151115

src/promptflow-devkit/promptflow/_sdk/operations/_local_storage_operations.py

+5-21
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,19 @@
1414

1515
from filelock import FileLock
1616

17-
from promptflow._constants import FLOW_DAG_YAML, OUTPUT_FILE_NAME, OutputsFolderName
17+
from promptflow._constants import FLOW_DAG_YAML, FLOW_FLEX_YAML, OUTPUT_FILE_NAME, OutputsFolderName
1818
from promptflow._sdk._constants import (
1919
HOME_PROMPT_FLOW_DIR,
2020
LINE_NUMBER,
2121
LOCAL_STORAGE_BATCH_SIZE,
2222
PROMPT_FLOW_DIR_NAME,
2323
LocalStorageFilenames,
24-
RunInfoSources,
2524
)
2625
from promptflow._sdk._errors import BulkRunException, InvalidRunError
27-
from promptflow._sdk._load_functions import load_flow
2826
from promptflow._sdk._utils import (
2927
PromptflowIgnoreFile,
3028
generate_flow_tools_json,
29+
is_flex_run,
3130
json_dump,
3231
json_load,
3332
json_loads_parse_const_as_str,
@@ -36,7 +35,7 @@
3635
write_open,
3736
)
3837
from promptflow._sdk.entities import Run
39-
from promptflow._sdk.entities._flows import FlexFlow, Flow
38+
from promptflow._sdk.entities._flows import Flow
4039
from promptflow._utils.exception_utils import PromptflowExceptionPresenter
4140
from promptflow._utils.flow_utils import is_prompty_flow
4241
from promptflow._utils.logger_utils import LogContext, get_cli_sdk_logger
@@ -201,9 +200,10 @@ def __init__(self, run: Run, stream=False, run_mode=RunMode.Test):
201200
run_mode=run_mode,
202201
flow_logs_folder=self.path / LocalStorageFilenames.FLOW_LOGS_FOLDER,
203202
)
203+
self._eager_mode = is_flex_run(run)
204204
# snapshot
205205
self._snapshot_folder_path = prepare_folder(self.path / LocalStorageFilenames.SNAPSHOT_FOLDER)
206-
self._dag_path = self._snapshot_folder_path / FLOW_DAG_YAML
206+
self._dag_path = self._snapshot_folder_path / (FLOW_FLEX_YAML if self._eager_mode else FLOW_DAG_YAML)
207207
self._flow_tools_json_path = (
208208
self._snapshot_folder_path / PROMPT_FLOW_DIR_NAME / LocalStorageFilenames.FLOW_TOOLS_JSON
209209
)
@@ -229,28 +229,12 @@ def __init__(self, run: Run, stream=False, run_mode=RunMode.Test):
229229
self._exception_path = self.path / LocalStorageFilenames.EXCEPTION
230230

231231
self._dump_meta_file()
232-
self._eager_mode = self._calculate_eager_mode(run)
233232
self._is_prompty_flow = is_prompty_flow(run.flow)
234233

235234
@property
236235
def eager_mode(self) -> bool:
237236
return self._eager_mode
238237

239-
@classmethod
240-
def _calculate_eager_mode(cls, run: Run) -> bool:
241-
if run._run_source == RunInfoSources.LOCAL:
242-
try:
243-
flow_obj = load_flow(source=run.flow)
244-
return isinstance(flow_obj, FlexFlow)
245-
except Exception as e:
246-
# For run with incomplete flow snapshot, ignore load flow error to make sure it can still show.
247-
logger.debug(f"Failed to load flow from {run.flow} due to {e}.")
248-
return False
249-
elif run._run_source in [RunInfoSources.INDEX_SERVICE, RunInfoSources.RUN_HISTORY]:
250-
return run._properties.get("azureml.promptflow.run_mode") == "Eager"
251-
# TODO(2901279): support eager mode for run created from run folder
252-
return False
253-
254238
def delete(self) -> None:
255239
def on_rmtree_error(func, path, exc_info):
256240
raise InvalidRunError(f"Failed to delete run {self.path} due to {exc_info[1]}.")

src/promptflow-devkit/promptflow/_sdk/operations/_run_operations.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
FlowRunProperties,
1717
ListViewType,
1818
RunInfoSources,
19+
RunMode,
1920
RunStatus,
2021
)
2122
from promptflow._sdk._errors import InvalidRunStatusError, RunExistsError, RunNotFoundError, RunOperationParameterError
@@ -360,7 +361,7 @@ def _visualize(self, runs: List[Run], html_path: Optional[str] = None) -> None:
360361
metrics=local_storage.load_metrics(parse_const_as_str=True),
361362
dag=local_storage.load_dag_as_string(),
362363
flow_tools_json=local_storage.load_flow_tools_json(),
363-
mode="eager" if local_storage.eager_mode else "",
364+
mode=RunMode.EAGER.lower() if local_storage.eager_mode else "",
364365
)
365366
details.append(copy.deepcopy(detail))
366367
metadatas.append(asdict(metadata))

0 commit comments

Comments
 (0)