Skip to content

Commit

Permalink
Merging refactored test_globus_flow.py code, as well as cleanup/linting
Browse files Browse the repository at this point in the history
  • Loading branch information
davramov committed Aug 22, 2024
1 parent 50b0882 commit 5633309
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 33 deletions.
9 changes: 0 additions & 9 deletions orchestration/_tests/test_832.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,6 @@
import time

from globus_sdk import TransferData
from pytest import MonkeyPatch

# Mock Secret.load globally at the start of the file
mock_secret = MagicMock()
mock_secret.value = str(uuid4())
with patch('prefect.blocks.system.Secret.load', return_value=mock_secret):
# Import modules that might use Prefect secrets
from ..flows.bl832 import move
from orchestration.flows.bl832.move import process_new_832_file


class MockTransferClient:
Expand Down
22 changes: 13 additions & 9 deletions orchestration/_tests/test_globus_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from pydantic import BaseModel, ConfigDict, PydanticDeprecatedSince20
import pytest
from pytest_mock import MockFixture
# from unittest.mock import MagicMock, patch, call

from .test_globus import MockTransferClient

Expand Down Expand Up @@ -214,11 +213,15 @@ def test_process_new_832_ALCF_flow(mocker: MockFixture):

# Mock the Config832 class inserting into the module being tested
mock_config = MockConfig832()

mock_transfer_to_alcf = mocker.patch('orchestration.flows.bl832.alcf.transfer_data_to_alcf', return_value=True)
mock_reconstruction_flow = mocker.patch('orchestration.flows.bl832.alcf.alcf_tomopy_reconstruction_flow', return_value=True)
mock_transfer_to_nersc = mocker.patch('orchestration.flows.bl832.alcf.transfer_data_to_nersc', return_value=True)
mock_schedule_pruning = mocker.patch('orchestration.flows.bl832.alcf.schedule_pruning', return_value=True)

mock_transfer_to_alcf = mocker.patch('orchestration.flows.bl832.alcf.transfer_data_to_alcf',
return_value=True)
mock_reconstruction_flow = mocker.patch('orchestration.flows.bl832.alcf.alcf_tomopy_reconstruction_flow',
return_value=True)
mock_transfer_to_nersc = mocker.patch('orchestration.flows.bl832.alcf.transfer_data_to_nersc',
return_value=True)
mock_schedule_pruning = mocker.patch('orchestration.flows.bl832.alcf.schedule_pruning',
return_value=True)

alcf_raw_path = f"{folder_name}/{file_name}.h5" if True else None
scratch_path_tiff = f"{folder_name}/rec{file_name}/" if True else None
Expand All @@ -241,8 +244,10 @@ def test_process_new_832_ALCF_flow(mocker: MockFixture):
folder_name=folder_name, file_name=f"{file_name}.h5")

mock_transfer_to_nersc.assert_has_calls([
mocker.call(scratch_path_tiff, mock_config.tc, mock_config.alcf832_scratch, mock_config.nersc832_alsdev_scratch),
mocker.call(scratch_path_zarr, mock_config.tc, mock_config.alcf832_scratch, mock_config.nersc832_alsdev_scratch)
mocker.call(scratch_path_tiff,
mock_config.tc, mock_config.alcf832_scratch, mock_config.nersc832_alsdev_scratch),
mocker.call(scratch_path_zarr,
mock_config.tc, mock_config.alcf832_scratch, mock_config.nersc832_alsdev_scratch)
])

mock_schedule_pruning.assert_called_once_with(
Expand Down Expand Up @@ -305,4 +310,3 @@ def test_process_new_832_ALCF_flow(mocker: MockFixture):
mock_schedule_pruning.assert_not_called()
assert isinstance(result, list), "Result should be a list"
assert result == [False, False, False], "Result does not match expected values"

23 changes: 8 additions & 15 deletions orchestration/flows/bl832/alcf.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import datetime
from dotenv import load_dotenv
import os
from pathlib import Path
import time

from globus_compute_sdk import Client, Executor
import globus_sdk
from globus_sdk import TransferClient
from prefect import flow, task, get_run_logger
from prefect.blocks.system import JSON, Secret

from orchestration.flows.bl832.config import Config832
from orchestration.globus.flows import get_flows_client, get_specific_flow_client
from orchestration.globus.transfer import GlobusEndpoint, start_transfer
from orchestration.prefect import schedule_prefect_flow
import os
from pathlib import Path
from prefect import flow, task, get_run_logger
from prefect.blocks.system import JSON
# from pydantic import BaseModel
import time
from prefect.blocks.system import Secret


dotenv_file = load_dotenv()
Expand Down Expand Up @@ -279,10 +279,6 @@ def alcf_tomopy_reconstruction_flow(

# Initialize the Globus Compute Client
gcc = Client()
# polaris_endpoint_id = os.getenv("GLOBUS_COMPUTE_ENDPOINT") # COMPUTE endpoint, not TRANSFER endpoint
# reconstruction_func = os.getenv("GLOBUS_RECONSTRUCTION_FUNC")
# source_collection_endpoint = os.getenv("GLOBUS_IRIBETA_CGS_ENDPOINT")
# destination_collection_endpoint = os.getenv("GLOBUS_IRIBETA_CGS_ENDPOINT")

polaris_endpoint_id = Secret.load("globus-compute-endpoint")
if polaris_endpoint_id is None:
Expand All @@ -294,7 +290,7 @@ def alcf_tomopy_reconstruction_flow(
destination_collection_endpoint = Secret.load("globus-iribeta-cgs-endpoint")

# logger.info(f"Using compute_endpoint_id: {polaris_endpoint_id.get()}")
# logger.info(f"Using reconstruction_func: {reconstruction_func.get()}")
logger.info(f"Using reconstruction_func: {reconstruction_func.get()}")
# logger.info(f"Using source_collection_endpoint: {source_collection_endpoint.get()}")

function_inputs = {"rundir": "/eagle/IRIBeta/als/bl832_test/raw",
Expand Down Expand Up @@ -331,10 +327,7 @@ def alcf_tomopy_reconstruction_flow(

collection_ids = [flow_input["input"]["source"]["id"], flow_input["input"]["destination"]["id"]]

# collection_ids = [flow_input.source["id"], flow_input.destination["id"]]

# Flow ID (only generate once!)
# flow_id = os.getenv("GLOBUS_FLOW_ID")
flow_id = Secret.load("globus-flow-id")

logger.info(f"reconstruction_func: {reconstruction_func.get()}")
Expand Down

0 comments on commit 5633309

Please sign in to comment.