From 563330924cf2c24371644178eda610850b5bfef0 Mon Sep 17 00:00:00 2001
From: David Abramov
Date: Thu, 22 Aug 2024 13:38:56 -0700
Subject: [PATCH] Merging refactored test_globus_flow.py code, as well as
 cleanup/linting

---
 orchestration/_tests/test_832.py         |  9 ---------
 orchestration/_tests/test_globus_flow.py | 22 +++++++++++++---------
 orchestration/flows/bl832/alcf.py        | 23 ++++++++---------------
 3 files changed, 21 insertions(+), 33 deletions(-)

diff --git a/orchestration/_tests/test_832.py b/orchestration/_tests/test_832.py
index ff4df73..a5562cb 100644
--- a/orchestration/_tests/test_832.py
+++ b/orchestration/_tests/test_832.py
@@ -4,15 +4,6 @@ import time
 
 from globus_sdk import TransferData
 
-from pytest import MonkeyPatch
-
-# Mock Secret.load globally at the start of the file
-mock_secret = MagicMock()
-mock_secret.value = str(uuid4())
-with patch('prefect.blocks.system.Secret.load', return_value=mock_secret):
-    # Import modules that might use Prefect secrets
-    from ..flows.bl832 import move
-    from orchestration.flows.bl832.move import process_new_832_file
 
 
 class MockTransferClient:
diff --git a/orchestration/_tests/test_globus_flow.py b/orchestration/_tests/test_globus_flow.py
index 06dc32f..d9f5e6b 100644
--- a/orchestration/_tests/test_globus_flow.py
+++ b/orchestration/_tests/test_globus_flow.py
@@ -9,7 +9,6 @@ from pydantic import BaseModel, ConfigDict, PydanticDeprecatedSince20
 import pytest
 from pytest_mock import MockFixture
 
-# from unittest.mock import MagicMock, patch, call
 from .test_globus import MockTransferClient
 
 
@@ -214,11 +213,15 @@ def test_process_new_832_ALCF_flow(mocker: MockFixture):
 
     # Mock the Config832 class inserting into the module being tested
     mock_config = MockConfig832()
-
-    mock_transfer_to_alcf = mocker.patch('orchestration.flows.bl832.alcf.transfer_data_to_alcf', return_value=True)
-    mock_reconstruction_flow = mocker.patch('orchestration.flows.bl832.alcf.alcf_tomopy_reconstruction_flow', return_value=True)
-    mock_transfer_to_nersc = mocker.patch('orchestration.flows.bl832.alcf.transfer_data_to_nersc', return_value=True)
-    mock_schedule_pruning = mocker.patch('orchestration.flows.bl832.alcf.schedule_pruning', return_value=True)
+
+    mock_transfer_to_alcf = mocker.patch('orchestration.flows.bl832.alcf.transfer_data_to_alcf',
+                                         return_value=True)
+    mock_reconstruction_flow = mocker.patch('orchestration.flows.bl832.alcf.alcf_tomopy_reconstruction_flow',
+                                            return_value=True)
+    mock_transfer_to_nersc = mocker.patch('orchestration.flows.bl832.alcf.transfer_data_to_nersc',
+                                          return_value=True)
+    mock_schedule_pruning = mocker.patch('orchestration.flows.bl832.alcf.schedule_pruning',
+                                         return_value=True)
 
     alcf_raw_path = f"{folder_name}/{file_name}.h5" if True else None
     scratch_path_tiff = f"{folder_name}/rec{file_name}/" if True else None
@@ -241,8 +244,10 @@ def test_process_new_832_ALCF_flow(mocker: MockFixture):
         folder_name=folder_name,
         file_name=f"{file_name}.h5")
     mock_transfer_to_nersc.assert_has_calls([
-        mocker.call(scratch_path_tiff, mock_config.tc, mock_config.alcf832_scratch, mock_config.nersc832_alsdev_scratch),
-        mocker.call(scratch_path_zarr, mock_config.tc, mock_config.alcf832_scratch, mock_config.nersc832_alsdev_scratch)
+        mocker.call(scratch_path_tiff,
+                    mock_config.tc, mock_config.alcf832_scratch, mock_config.nersc832_alsdev_scratch),
+        mocker.call(scratch_path_zarr,
+                    mock_config.tc, mock_config.alcf832_scratch, mock_config.nersc832_alsdev_scratch)
     ])
 
     mock_schedule_pruning.assert_called_once_with(
@@ -305,4 +310,3 @@ def test_process_new_832_ALCF_flow(mocker: MockFixture):
     mock_schedule_pruning.assert_not_called()
     assert isinstance(result, list), "Result should be a list"
    assert result == [False, False, False], "Result does not match expected values"
-
diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py
index 7a3cc76..76e65ac 100644
--- a/orchestration/flows/bl832/alcf.py
+++ b/orchestration/flows/bl832/alcf.py
@@ -1,19 +1,19 @@
 import datetime
 from dotenv import load_dotenv
+import os
+from pathlib import Path
+import time
+
 from globus_compute_sdk import Client, Executor
 import globus_sdk
 from globus_sdk import TransferClient
+from prefect import flow, task, get_run_logger
+from prefect.blocks.system import JSON, Secret
+
 from orchestration.flows.bl832.config import Config832
 from orchestration.globus.flows import get_flows_client, get_specific_flow_client
 from orchestration.globus.transfer import GlobusEndpoint, start_transfer
 from orchestration.prefect import schedule_prefect_flow
-import os
-from pathlib import Path
-from prefect import flow, task, get_run_logger
-from prefect.blocks.system import JSON
-# from pydantic import BaseModel
-import time
-from prefect.blocks.system import Secret
 
 
 dotenv_file = load_dotenv()
@@ -279,10 +279,6 @@ def alcf_tomopy_reconstruction_flow(
     # Initialize the Globus Compute Client
     gcc = Client()
 
-    # polaris_endpoint_id = os.getenv("GLOBUS_COMPUTE_ENDPOINT")  # COMPUTE endpoint, not TRANSFER endpoint
-    # reconstruction_func = os.getenv("GLOBUS_RECONSTRUCTION_FUNC")
-    # source_collection_endpoint = os.getenv("GLOBUS_IRIBETA_CGS_ENDPOINT")
-    # destination_collection_endpoint = os.getenv("GLOBUS_IRIBETA_CGS_ENDPOINT")
 
     polaris_endpoint_id = Secret.load("globus-compute-endpoint")
     if polaris_endpoint_id is None:
@@ -294,7 +290,7 @@ def alcf_tomopy_reconstruction_flow(
     destination_collection_endpoint = Secret.load("globus-iribeta-cgs-endpoint")
 
     # logger.info(f"Using compute_endpoint_id: {polaris_endpoint_id.get()}")
-    # logger.info(f"Using reconstruction_func: {reconstruction_func.get()}")
+    logger.info(f"Using reconstruction_func: {reconstruction_func.get()}")
     # logger.info(f"Using source_collection_endpoint: {source_collection_endpoint.get()}")
 
     function_inputs = {"rundir": "/eagle/IRIBeta/als/bl832_test/raw",
@@ -331,10 +327,7 @@ def alcf_tomopy_reconstruction_flow(
     collection_ids = [flow_input["input"]["source"]["id"],
                       flow_input["input"]["destination"]["id"]]
 
-    # collection_ids = [flow_input.source["id"], flow_input.destination["id"]]
-    # Flow ID (only generate once!)
-    # flow_id = os.getenv("GLOBUS_FLOW_ID")
 
     flow_id = Secret.load("globus-flow-id")
 
     logger.info(f"reconstruction_func: {reconstruction_func.get()}")
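
Note on the test changes above: test_832.py no longer patches prefect.blocks.system.Secret.load at module import time, which implies the refactored tests scope that mock per test instead. Below is a minimal sketch of one way to do that with a pytest fixture; the fixture and test names are illustrative assumptions and do not appear anywhere in this patch.

# Sketch (assumed approach, not part of the patch): scope the Secret.load mock
# to each test instead of patching it when the test module is imported.
from unittest.mock import MagicMock, patch
from uuid import uuid4

import pytest


@pytest.fixture
def mock_secret_load():
    # Hypothetical fixture: yields a fake Secret whose value is a random UUID.
    mock_secret = MagicMock()
    mock_secret.value = str(uuid4())
    with patch('prefect.blocks.system.Secret.load', return_value=mock_secret):
        yield mock_secret


def test_secret_is_mocked(mock_secret_load):
    # Any code under test that calls Secret.load(...) now receives the mock.
    from prefect.blocks.system import Secret
    assert Secret.load("globus-flow-id") is mock_secret_load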