Skip to content

Commit b35afcd

Browse files
authored
feat: Added no copy catalog (#229)
1 parent 893ea07 commit b35afcd

File tree

14 files changed

+139
-8
lines changed

14 files changed

+139
-8
lines changed
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
"""
2+
You can execute this pipeline by:
3+
4+
python examples/04-catalog/catalog_python.py
5+
"""
6+
7+
from examples.common.functions import check_files_do_not_exist, write_files
8+
from runnable import Catalog, Pipeline, PythonTask, ShellTask
9+
10+
11+
def main():
    """Run a pipeline that catalogs files without storing a physical copy.

    With store_copy=False the catalog records the produced files but does
    not copy them into the central catalog store, so the final step can
    verify their absence from the catalog folder.
    """
    no_copy_catalog = Catalog(put=["df.csv", "data_folder/data.txt"], store_copy=False)

    generate_data = PythonTask(
        name="generate_data_python",
        function=write_files,
        catalog=no_copy_catalog,
    )

    delete_files_command = """
rm df.csv || true && \
rm data_folder/data.txt || true
"""
    # The catalog is local, so we delete the generated files afterwards to
    # demonstrate that nothing was copied into the catalog store.
    delete_local_after_generate = ShellTask(
        name="delete_after_generate",
        command=delete_files_command,
    )

    # store_copy=False means the files were never copied into the catalog,
    # so this check (files absent from .catalog/<run_id>/) succeeds.
    check_files_do_not_exist_task = PythonTask(
        name="check_files_do_not_exist",
        function=check_files_do_not_exist,
        terminate_with_success=True,
    )

    steps = [
        generate_data,
        delete_local_after_generate,
        check_files_do_not_exist_task,
    ]
    pipeline = Pipeline(steps=steps)
    pipeline.execute()

    return pipeline


if __name__ == "__main__":
    main()
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
from examples.common.functions import write_files
2+
from runnable import Catalog, PythonJob
3+
4+
5+
def main():
    """Run a single Python job whose catalog records files without copying them.

    store_copy=False makes the catalog track "df.csv" and
    "data_folder/data.txt" without placing copies in the catalog store.
    """
    no_copy_catalog = Catalog(put=["df.csv", "data_folder/data.txt"], store_copy=False)

    job = PythonJob(function=write_files, catalog=no_copy_catalog)
    job.execute()

    return job


if __name__ == "__main__":
    main()

examples/common/functions.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
import os
23
from pathlib import Path
34
from typing import Dict, List, Union
45

@@ -124,6 +125,12 @@ def read_files():
124125
assert data.strip() == "hello world"
125126

126127

128+
def check_files_do_not_exist():
    """Assert that the catalog for the current run holds no copy of the files.

    Used by the store_copy=False examples: with copying disabled, the
    generated files must be absent from the ``.catalog/<run_id>/`` folder.

    Raises:
        AssertionError: if RUNNABLE_RUN_ID is unset, or if either file is
            present in the catalog folder for this run.
    """
    run_id = os.environ.get("RUNNABLE_RUN_ID")
    # Fail loudly when the run id is missing; otherwise the checks below
    # would inspect ".catalog/None/..." and pass vacuously.
    assert run_id, "RUNNABLE_RUN_ID environment variable is not set"
    assert not Path(f".catalog/{run_id}/df.csv").exists()
    assert not Path(f".catalog/{run_id}/data_folder/data.txt").exists()
132+
133+
127134
def process_chunk(chunk: int):
128135
"""
129136
An example function that processes a chunk of data.

extensions/catalog/any_path.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,10 @@ def get(self, name: str) -> List[DataCatalog]:
9595
return data_catalogs
9696

9797
def put(
98-
self, name: str, allow_file_not_found_exc: bool = False
98+
self,
99+
name: str,
100+
allow_file_not_found_exc: bool = False,
101+
store_copy: bool = True,
99102
) -> List[DataCatalog]:
100103
"""
101104
Put the files matching the glob pattern into the catalog.
@@ -154,7 +157,15 @@ def put(
154157
data_catalogs.append(data_catalog)
155158

156159
# TODO: Think about syncing only if the file is changed
157-
self.upload_to_catalog(file)
160+
if store_copy:
161+
logger.debug(
162+
f"Copying file {file} to the catalog location for run_id: {run_id}"
163+
)
164+
self.upload_to_catalog(file)
165+
else:
166+
logger.debug(
167+
f"Not copying file {file} to the catalog location for run_id: {run_id}"
168+
)
158169

159170
if not data_catalogs and not allow_file_not_found_exc:
160171
raise Exception(f"Did not find any files matching {name} in {copy_from}")

extensions/job_executor/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ class GenericJobExecutor(BaseJobExecutor):
2929
@property
3030
def _context(self):
3131
assert context.run_context
32+
assert isinstance(context.run_context, context.JobContext)
3233
return context.run_context
3334

3435
def _get_parameters(self) -> Dict[str, JsonParameter]:
@@ -147,7 +148,9 @@ def _sync_catalog(
147148
data_catalogs = []
148149
for name_pattern in catalog_settings:
149150
data_catalog = self._context.catalog.put(
150-
name=name_pattern, allow_file_not_found_exc=allow_file_not_found_exc
151+
name=name_pattern,
152+
allow_file_not_found_exc=allow_file_not_found_exc,
153+
store_copy=self._context.catalog_store_copy,
151154
)
152155

153156
logger.debug(f"Added data catalog: {data_catalog} to job log")

extensions/pipeline_executor/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,9 @@ def _sync_catalog(
160160

161161
elif stage == "put":
162162
data_catalog = self._context.catalog.put(
163-
name=name_pattern, allow_file_not_found_exc=allow_file_no_found_exc
163+
name=name_pattern,
164+
allow_file_not_found_exc=allow_file_no_found_exc,
165+
store_copy=node_catalog_settings.get("store_copy", True),
164166
)
165167
else:
166168
raise Exception(f"Stage {stage} not supported")

runnable/catalog.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ def get(self, name: str) -> List[DataCatalog]:
5757

5858
@abstractmethod
5959
def put(
60-
self, name: str, allow_file_not_found_exc: bool = False
60+
self, name: str, allow_file_not_found_exc: bool = False, store_copy: bool = True
6161
) -> List[DataCatalog]:
6262
"""
6363
Put the file by 'name' from the 'compute_data_folder' in the catalog for the run_id.
@@ -120,7 +120,10 @@ def get(self, name: str) -> List[DataCatalog]:
120120
return []
121121

122122
def put(
123-
self, name: str, allow_file_not_found_exc: bool = False
123+
self,
124+
name: str,
125+
allow_file_not_found_exc: bool = False,
126+
store_copy: bool = True,
124127
) -> List[DataCatalog]:
125128
"""
126129
Does nothing

runnable/context.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,7 @@ class JobContext(RunnableContext):
475475
default=None,
476476
description="Catalog settings to be used for the job.",
477477
)
478+
catalog_store_copy: bool = Field(default=True, alias="catalog_store_copy")
478479

479480
@computed_field # type: ignore
480481
@cached_property

runnable/nodes.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,11 +411,13 @@ def _get_executor_config(self, executor_type) -> str:
411411
return self.overrides.get(executor_type) or ""
412412

413413

414+
# Unfortunately, this is defined in 2 places. Look in SDK
414415
class CatalogStructure(BaseModel):
415416
model_config = ConfigDict(extra="forbid") # Need to forbid
416417

417418
get: List[str] = Field(default_factory=list)
418419
put: List[str] = Field(default_factory=list)
420+
store_copy: bool = Field(default=True, alias="store_copy")
419421

420422

421423
class ExecutableNode(TraversalNode):

runnable/sdk.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ class Catalog(BaseModel):
6060
Attributes:
6161
get (List[str]): List of glob patterns to get from central catalog to the compute data folder.
6262
put (List[str]): List of glob patterns to put into central catalog from the compute data folder.
63+
store_copy (bool): Whether to store a copy of the data in the central catalog.
6364
6465
Examples:
6566
>>> from runnable import Catalog
@@ -74,6 +75,7 @@ class Catalog(BaseModel):
7475
# compute_data_folder: str = Field(default="", alias="compute_data_folder")
7576
get: List[str] = Field(default_factory=list, alias="get")
7677
put: List[str] = Field(default_factory=list, alias="put")
78+
store_copy: bool = Field(default=True, alias="store_copy")
7779

7880

7981
class BaseTraversal(ABC, BaseModel):
@@ -845,6 +847,11 @@ def return_catalog_settings(self) -> Optional[List[str]]:
845847
return []
846848
return self.catalog.put
847849

850+
def return_bool_catalog_store_copy(self) -> bool:
    """Return the catalog's store_copy flag; True when no catalog is configured."""
    if self.catalog is not None:
        return self.catalog.store_copy
    return True
854+
848855
def _is_called_for_definition(self) -> bool:
849856
"""
850857
If the run context is set, we are coming in only to get the pipeline definition.
@@ -888,6 +895,7 @@ def execute(
888895
}
889896

890897
run_context = context.JobContext.model_validate(configurations)
898+
run_context.catalog_store_copy = self.return_bool_catalog_store_copy()
891899

892900
assert isinstance(run_context.job_executor, BaseJobExecutor)
893901

0 commit comments

Comments
 (0)