Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions examples/04-catalog/catalog_no_copy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""
You can execute this pipeline by:

python examples/04-catalog/catalog_python.py
"""

from examples.common.functions import check_files_do_not_exist, write_files
from runnable import Catalog, Pipeline, PythonTask, ShellTask


def main():
write_catalog = Catalog(put=["df.csv", "data_folder/data.txt"], store_copy=False)
generate_data = PythonTask(
name="generate_data_python",
function=write_files,
catalog=write_catalog,
)

delete_files_command = """
rm df.csv || true && \
rm data_folder/data.txt || true
"""
# delete from local files after generate
# since its local catalog, we delete to show "get from catalog"
delete_local_after_generate = ShellTask(
name="delete_after_generate",
command=delete_files_command,
)

# Since store_copy was set to False, this step should fail
check_files_do_not_exist_task = PythonTask(
name="check_files_do_not_exist",
function=check_files_do_not_exist,
terminate_with_success=True,
)

pipeline = Pipeline(
steps=[
generate_data,
delete_local_after_generate,
check_files_do_not_exist_task,
]
)
_ = pipeline.execute()

return pipeline


if __name__ == "__main__":
main()
18 changes: 18 additions & 0 deletions examples/11-jobs/catalog_no_copy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from examples.common.functions import write_files
from runnable import Catalog, PythonJob


def main():
    """Run a job whose outputs are registered in the catalog without storing copies."""
    catalog = Catalog(put=["df.csv", "data_folder/data.txt"], store_copy=False)

    job = PythonJob(
        function=write_files,
        catalog=catalog,
    )
    job.execute()

    return job


if __name__ == "__main__":
    main()
7 changes: 7 additions & 0 deletions examples/common/functions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import os
from pathlib import Path
from typing import Dict, List, Union

Expand Down Expand Up @@ -124,6 +125,12 @@ def read_files():
assert data.strip() == "hello world"


def check_files_do_not_exist():
    """Assert that the catalog of the current run holds no copy of the example artifacts.

    The run id is read from the RUNNABLE_RUN_ID environment variable (presumably
    set by the executor) and the local ``.catalog/<run_id>/`` folder is probed.
    """
    run_id = os.environ.get("RUNNABLE_RUN_ID")
    catalog_root = Path(".catalog") / str(run_id)
    for artifact in ("df.csv", "data_folder/data.txt"):
        assert not (catalog_root / artifact).exists()


def process_chunk(chunk: int):
"""
An example function that processes a chunk of data.
Expand Down
15 changes: 13 additions & 2 deletions extensions/catalog/any_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@ def get(self, name: str) -> List[DataCatalog]:
return data_catalogs

def put(
self, name: str, allow_file_not_found_exc: bool = False
self,
name: str,
allow_file_not_found_exc: bool = False,
store_copy: bool = True,
) -> List[DataCatalog]:
"""
Put the files matching the glob pattern into the catalog.
Expand Down Expand Up @@ -154,7 +157,15 @@ def put(
data_catalogs.append(data_catalog)

# TODO: Think about syncing only if the file is changed
self.upload_to_catalog(file)
if store_copy:
logger.debug(
f"Copying file {file} to the catalog location for run_id: {run_id}"
)
self.upload_to_catalog(file)
else:
logger.debug(
f"Not copying file {file} to the catalog location for run_id: {run_id}"
)

if not data_catalogs and not allow_file_not_found_exc:
raise Exception(f"Did not find any files matching {name} in {copy_from}")
Expand Down
5 changes: 4 additions & 1 deletion extensions/job_executor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class GenericJobExecutor(BaseJobExecutor):
@property
def _context(self):
assert context.run_context
assert isinstance(context.run_context, context.JobContext)
return context.run_context

def _get_parameters(self) -> Dict[str, JsonParameter]:
Expand Down Expand Up @@ -147,7 +148,9 @@ def _sync_catalog(
data_catalogs = []
for name_pattern in catalog_settings:
data_catalog = self._context.catalog.put(
name=name_pattern, allow_file_not_found_exc=allow_file_not_found_exc
name=name_pattern,
allow_file_not_found_exc=allow_file_not_found_exc,
store_copy=self._context.catalog_store_copy,
)

logger.debug(f"Added data catalog: {data_catalog} to job log")
Expand Down
4 changes: 3 additions & 1 deletion extensions/pipeline_executor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,9 @@ def _sync_catalog(

elif stage == "put":
data_catalog = self._context.catalog.put(
name=name_pattern, allow_file_not_found_exc=allow_file_no_found_exc
name=name_pattern,
allow_file_not_found_exc=allow_file_no_found_exc,
store_copy=node_catalog_settings.get("store_copy", True),
)
else:
raise Exception(f"Stage {stage} not supported")
Expand Down
7 changes: 5 additions & 2 deletions runnable/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def get(self, name: str) -> List[DataCatalog]:

@abstractmethod
def put(
self, name: str, allow_file_not_found_exc: bool = False
self, name: str, allow_file_not_found_exc: bool = False, store_copy: bool = True
) -> List[DataCatalog]:
"""
Put the file by 'name' from the 'compute_data_folder' in the catalog for the run_id.
Expand Down Expand Up @@ -120,7 +120,10 @@ def get(self, name: str) -> List[DataCatalog]:
return []

def put(
self, name: str, allow_file_not_found_exc: bool = False
self,
name: str,
allow_file_not_found_exc: bool = False,
store_copy: bool = True,
) -> List[DataCatalog]:
"""
Does nothing
Expand Down
1 change: 1 addition & 0 deletions runnable/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ class JobContext(RunnableContext):
default=None,
description="Catalog settings to be used for the job.",
)
catalog_store_copy: bool = Field(default=True, alias="catalog_store_copy")

@computed_field # type: ignore
@cached_property
Expand Down
2 changes: 2 additions & 0 deletions runnable/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,11 +411,13 @@ def _get_executor_config(self, executor_type) -> str:
return self.overrides.get(executor_type) or ""


# Unfortunately, this is defined in 2 places. Look in SDK
class CatalogStructure(BaseModel):
    """Catalog settings of a node: glob patterns moving data in and out of the catalog."""

    model_config = ConfigDict(extra="forbid") # Need to forbid

    # glob patterns of files to fetch from the central catalog
    get: List[str] = Field(default_factory=list)
    # glob patterns of files to place into the central catalog
    put: List[str] = Field(default_factory=list)
    # when False, "put" entries are recorded but files are not copied into the catalog
    store_copy: bool = Field(default=True, alias="store_copy")


class ExecutableNode(TraversalNode):
Expand Down
8 changes: 8 additions & 0 deletions runnable/sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class Catalog(BaseModel):
Attributes:
get (List[str]): List of glob patterns to get from central catalog to the compute data folder.
put (List[str]): List of glob patterns to put into central catalog from the compute data folder.
store_copy (bool): Whether to store a copy of the data in the central catalog.

Examples:
>>> from runnable import Catalog
Expand All @@ -74,6 +75,7 @@ class Catalog(BaseModel):
# compute_data_folder: str = Field(default="", alias="compute_data_folder")
get: List[str] = Field(default_factory=list, alias="get")
put: List[str] = Field(default_factory=list, alias="put")
store_copy: bool = Field(default=True, alias="store_copy")


class BaseTraversal(ABC, BaseModel):
Expand Down Expand Up @@ -845,6 +847,11 @@ def return_catalog_settings(self) -> Optional[List[str]]:
return []
return self.catalog.put

def return_bool_catalog_store_copy(self) -> bool:
    """Return the effective ``store_copy`` flag: the catalog's setting, or True when no catalog is attached."""
    if self.catalog is not None:
        return self.catalog.store_copy
    return True

def _is_called_for_definition(self) -> bool:
"""
If the run context is set, we are coming in only to get the pipeline definition.
Expand Down Expand Up @@ -888,6 +895,7 @@ def execute(
}

run_context = context.JobContext.model_validate(configurations)
run_context.catalog_store_copy = self.return_bool_catalog_store_copy()

assert isinstance(run_context.job_executor, BaseJobExecutor)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ def test_sync_catalog_put(test_executor, mock_context, mocker):

# Verify catalog.put was called with correct params
mock_context.catalog.put.assert_called_once_with(
name="pattern1", allow_file_not_found_exc=True
name="pattern1", allow_file_not_found_exc=True, store_copy=True
)

# Verify result contains the data catalog
Expand Down
6 changes: 5 additions & 1 deletion tests/runnable/test_nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,5 +345,9 @@ def test_executable_node_parse_config():
assert node.next_node == "next_step"
assert node.max_attempts == 3
assert node.on_failure == "failure_node"
assert node._get_catalog_settings() == {"get": ["input1"], "put": ["output1"]}
assert node._get_catalog_settings() == {
"get": ["input1"],
"put": ["output1"],
"store_copy": True,
}
assert node._get_executor_config("local") == "custom_config"
8 changes: 8 additions & 0 deletions tests/test_job_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ def emulator_context():
partial(conditions.should_job_have_output_parameters, {}),
],
),
(
"11-jobs/catalog_no_copy",
"",
[
partial(conditions.should_have_job_and_status),
partial(conditions.should_job_have_output_parameters, {}),
],
),
(
"11-jobs/passing_parameters_python",
"",
Expand Down
14 changes: 14 additions & 0 deletions tests/test_pipeline_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,20 @@ def argo_context():
),
],
),
(
"04-catalog/catalog_no_copy",
True,
False,
[],
"",
[
partial(conditions.should_have_num_steps, 4),
partial(conditions.should_have_catalog_execution_logs),
partial(conditions.should_be_successful),
partial(conditions.should_step_be_successful, "generate_data_python"),
partial(conditions.should_step_be_successful, "check_files_do_not_exist"),
],
),
(
"04-catalog/catalog_on_fail",
False,
Expand Down