Skip to content

Commit

Permalink
Fixed pruning logic to handle data832 raw and scratch file deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
davramov committed Sep 4, 2024
1 parent 155e1fb commit bafc54e
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 26 deletions.
13 changes: 7 additions & 6 deletions orchestration/_tests/test_globus_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,13 @@ def test_process_new_832_ALCF_flow(mocker: MockFixture):
alcf_scratch_path_zarr=scratch_path_zarr,
nersc_scratch_path_tiff=None,
nersc_scratch_path_zarr=None,
data832_scratch_path_tiff=f"scratch/{scratch_path_tiff}",
data832_scratch_path_zarr=f"scratch/{scratch_path_zarr}",
data832_raw_path=alcf_raw_path,
data832_scratch_path_tiff=f"{scratch_path_tiff}",
data832_scratch_path_zarr=f"{scratch_path_zarr}",
one_minute=True
)
assert isinstance(result, list), "Result should be a list"
assert result == [True, True, True, True], "Result does not match expected values"
assert result == [True, True, True, True, True], "Result does not match expected values"
mock_transfer_to_alcf.reset_mock()
mock_reconstruction_flow.reset_mock()
mock_alcf_tiff_to_zarr_flow.reset_mock()
Expand All @@ -293,7 +294,7 @@ def test_process_new_832_ALCF_flow(mocker: MockFixture):
mock_transfer_to_nersc.assert_not_called()
mock_schedule_pruning.assert_not_called()
assert isinstance(result, list), "Result should be a list"
assert result == [False, False, False, False], "Result does not match expected values"
assert result == [False, False, False, False, False], "Result does not match expected values"

mock_transfer_to_alcf.reset_mock()
mock_reconstruction_flow.reset_mock()
Expand All @@ -314,7 +315,7 @@ def test_process_new_832_ALCF_flow(mocker: MockFixture):
mock_transfer_to_nersc.assert_not_called()
mock_schedule_pruning.assert_not_called()
assert isinstance(result, list), "Result should be a list"
assert result == [False, False, False, False], "Result does not match expected values"
assert result == [False, False, False, False, False], "Result does not match expected values"

mock_transfer_to_alcf.reset_mock()
mock_reconstruction_flow.reset_mock()
Expand All @@ -335,4 +336,4 @@ def test_process_new_832_ALCF_flow(mocker: MockFixture):
mock_transfer_to_nersc.assert_not_called()
mock_schedule_pruning.assert_not_called()
assert isinstance(result, list), "Result should be a list"
assert result == [False, False, False, False], "Result does not match expected values"
assert result == [False, False, False, False, False], "Result does not match expected values"
48 changes: 30 additions & 18 deletions orchestration/flows/bl832/alcf.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ def schedule_pruning(
alcf_scratch_path_zarr: str = None,
nersc_scratch_path_tiff: str = None,
nersc_scratch_path_zarr: str = None,
data832_raw_path: str = None,
data832_scratch_path_tiff: str = None,
data832_scratch_path_zarr: str = None,
one_minute: bool = False) -> bool:
Expand Down Expand Up @@ -246,6 +247,7 @@ def schedule_pruning(
(alcf_scratch_path_zarr, "alcf832_scratch", alcf_delay),
(nersc_scratch_path_tiff, "nersc832_alsdev_scratch", nersc_delay),
(nersc_scratch_path_zarr, "nersc832_alsdev_scratch", nersc_delay),
(data832_raw_path, "data832_raw", data832_delay),
(data832_scratch_path_tiff, "data832_scratch", data832_delay),
(data832_scratch_path_zarr, "data832_scratch", data832_delay)
]
Expand Down Expand Up @@ -582,41 +584,48 @@ def process_new_832_ALCF_flow(folder_name: str,
logger.info("Tiff to Zarr Successful.")

# Step 3: Send reconstructed data (tiffs and zarr) to data832
# Send reconstructed data (tiff) to data832
# Transfer A: Send reconstructed data (tiff) to data832
logger.info(f"Transferring {file_name} from {alcf_raw_path} at ALCF to {data832_scratch_path} at data832")
logger.info(f"Reconstructed file path: {scratch_path_tiff}")
data832_transfer_success = transfer_data_to_data832(scratch_path_tiff,
config.tc,
config.alcf832_scratch,
config.data832_scratch)
if not data832_transfer_success:
data832_tiff_transfer_success = transfer_data_to_data832(scratch_path_tiff,
config.tc,
config.alcf832_scratch,
config.data832_scratch)
if not data832_tiff_transfer_success:
logger.error("Transfer failed due to configuration or authorization issues.")
else:
logger.info("Transfer successful.")

# Send reconstructed data (zarr) to data832
# Transfer B: Send reconstructed data (zarr) to data832
logger.info(f"Transferring {file_name} from {alcf_raw_path} at ALCF to {data832_scratch_path} at data832")
logger.info(f"Reconstructed file path: {scratch_path_zarr}")
data832_transfer_success = transfer_data_to_data832(scratch_path_zarr,
config.tc,
config.alcf832_scratch,
config.data832_scratch)
if not data832_transfer_success:
data832_zarr_transfer_success = transfer_data_to_data832(scratch_path_zarr,
config.tc,
config.alcf832_scratch,
config.data832_scratch)
if not data832_zarr_transfer_success:
logger.error("Transfer failed due to configuration or authorization issues.")
else:
logger.info("Transfer successful.")

# Step 4: Schedule deletion of files from ALCF, NERSC, and data832
logger.info("Scheduling deletion of files from ALCF, NERSC, and data832")
# alcf_transfer_success = True
nersc_transfer_success = False
# alcf_reconstruction_success = True
# alcf_tiff_to_zarr_success = True
# data832_tiff_transfer_success = True
# data832_zarr_transfer_success = True

schedule_pruning(
alcf_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None,
alcf_scratch_path_tiff=f"{scratch_path_tiff}" if alcf_reconstruction_success else None,
alcf_scratch_path_zarr=f"{scratch_path_zarr}" if alcf_reconstruction_success else None,
alcf_scratch_path_zarr=f"{scratch_path_zarr}" if alcf_tiff_to_zarr_success else None,
nersc_scratch_path_tiff=f"{scratch_path_tiff}" if nersc_transfer_success else None,
nersc_scratch_path_zarr=f"{scratch_path_zarr}" if nersc_transfer_success else None,
data832_scratch_path_tiff=f"scratch/{scratch_path_tiff}" if data832_transfer_success else None,
data832_scratch_path_zarr=f"scratch/{scratch_path_zarr}" if data832_transfer_success else None,
data832_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None,
data832_scratch_path_tiff=f"{scratch_path_tiff}" if data832_tiff_transfer_success else None,
data832_scratch_path_zarr=f"{scratch_path_zarr}" if data832_zarr_transfer_success else None,
one_minute=True # Set to False for production durations
)

Expand All @@ -627,17 +636,20 @@ def process_new_832_ALCF_flow(folder_name: str,
f"alcf_reconstruction_success: {alcf_reconstruction_success}, "
f"alcf_tiff_to_zarr_success: {alcf_tiff_to_zarr_success}, "
# f"nersc_transfer_success: {nersc_transfer_success}"
f"data832_transfer_success: {data832_transfer_success}"
f"data832_tiff_transfer_success: {data832_tiff_transfer_success}, "
f"data832_zarr_transfer_success: {data832_zarr_transfer_success}"

)

return [alcf_transfer_success,
alcf_reconstruction_success,
alcf_tiff_to_zarr_success,
data832_transfer_success]
data832_tiff_transfer_success,
data832_zarr_transfer_success]

else:
logger.info("Export control is enabled or send_to_alcf is set to False. No action taken.")
return [False, False, False, False]
return [False, False, False, False, False]


if __name__ == "__main__":
Expand Down
4 changes: 2 additions & 2 deletions orchestration/flows/bl832/prune.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def prune_alcf832_raw(relative_path: str):
if_older_than_days=0,
tranfer_client=config.tc,
source_endpoint=config.alcf832_raw,
check_endpoint=config.nersc832_alsdev_raw,
check_endpoint=config.data832_raw,
logger=p_logger,
max_wait_seconds=max_wait_seconds,
)
Expand All @@ -101,7 +101,7 @@ def prune_alcf832_scratch(relative_path: str):
if_older_than_days=0,
tranfer_client=config.tc,
source_endpoint=config.alcf832_scratch,
check_endpoint=config.nersc832_alsdev_scratch,
check_endpoint=config.data832_scratch,
logger=p_logger,
max_wait_seconds=max_wait_seconds,
)
Expand Down

0 comments on commit bafc54e

Please sign in to comment.