From 80a91436fe0f6121a0169d25696a0c9fe3eb881b Mon Sep 17 00:00:00 2001
From: David Abramov
Date: Mon, 26 Aug 2024 15:53:18 -0700
Subject: [PATCH] Cleaning up orchestration/scripts, linting, rename/refactor
 check_globus_compute.py, check_globus_transfer.py, and
 globus_tomopy_flow_init.py

---
 examples/alcf_compute_endpoint_test.py         |  49 ----
 examples/test_cc_auth.py                       | 151 -----------
 .../scripts}/Tomopy_for_ALS.ipynb              |   2 +-
 orchestration/{flows/bl832 => scripts}/alcf.py |   0
 orchestration/scripts/check_globus_compute.py  |  85 ++++++
 .../scripts/check_globus_compute_status.py     |  54 ----
 .../scripts/check_globus_transfer.py           | 249 ++++++++++++++++++
 .../scripts/globus_tomopy_flow_init.py         |  97 ++++---
 .../scripts/login_to_globus_and_prefect.sh     |  18 ++
 9 files changed, 420 insertions(+), 285 deletions(-)
 delete mode 100644 examples/alcf_compute_endpoint_test.py
 delete mode 100644 examples/test_cc_auth.py
 rename {examples => orchestration/scripts}/Tomopy_for_ALS.ipynb (99%)
 rename orchestration/{flows/bl832 => scripts}/alcf.py (100%)
 create mode 100644 orchestration/scripts/check_globus_compute.py
 delete mode 100644 orchestration/scripts/check_globus_compute_status.py
 create mode 100644 orchestration/scripts/check_globus_transfer.py
 create mode 100755 orchestration/scripts/login_to_globus_and_prefect.sh

diff --git a/examples/alcf_compute_endpoint_test.py b/examples/alcf_compute_endpoint_test.py
deleted file mode 100644
index c8d3d75..0000000
--- a/examples/alcf_compute_endpoint_test.py
+++ /dev/null
@@ -1,49 +0,0 @@
-import os
-import globus_sdk
-import logging
-from globus_compute_sdk import Client as ComputeClient
-from dotenv import load_dotenv
-
-# Set up detailed logging
-logging.basicConfig(level=logging.DEBUG)
-logger = logging.getLogger(__name__)
-
-# Load environment variables from .env file
-load_dotenv()
-
-# Load environment variables
-CLIENT_ID = os.getenv('GLOBUS_CLIENT_ID')
-CLIENT_SECRET = os.getenv('GLOBUS_CLIENT_SECRET')
-COMPUTE_ENDPOINT_ID = os.getenv('GLOBUS_COMPUTE_ENDPOINT')
-
-# Ensure that CLIENT_ID and CLIENT_SECRET are loaded
-if not CLIENT_ID or not CLIENT_SECRET:
-    logger.error("CLIENT_ID or CLIENT_SECRET environment variables are not set.")
-    exit(1)
-else:
-    logger.debug(f"CLIENT_ID: {CLIENT_ID}")
-
-# Define the necessary scopes for Globus Compute
-SCOPES = "https://auth.globus.org/scopes/compute.api.globus.org/all"
-
-# Initialize the ConfidentialAppAuthClient
-auth_client = globus_sdk.ConfidentialAppAuthClient(CLIENT_ID, CLIENT_SECRET)
-
-try:
-    # Request a token with the required scopes
-    token_response = auth_client.oauth2_client_credentials_tokens(requested_scopes=SCOPES)
-    compute_tokens = token_response.by_resource_server['compute.api.globus.org']
-    access_token = compute_tokens['access_token']
-
-    # Initialize the ComputeClient with the token
-    compute_client = ComputeClient(authorizer=globus_sdk.AccessTokenAuthorizer(access_token))
-
-    # Test the Compute endpoint by listing tasks or any other available method
-    try:
-        # Assuming you want to list tasks or similar action
-        tasks = compute_client.get_tasks()
-        logger.info(f"Compute tasks: {tasks}")
-    except globus_sdk.GlobusAPIError as e:
-        logger.error(f"Failed to retrieve tasks: {e}")
-except globus_sdk.GlobusAPIError as e:
-    logger.error(f"Failed to obtain tokens: {e}")
diff --git a/examples/test_cc_auth.py b/examples/test_cc_auth.py
deleted file mode 100644
index b71136a..0000000
--- a/examples/test_cc_auth.py
+++ /dev/null
@@ -1,151 +0,0 @@
-from dotenv import load_dotenv
-import globus_sdk
-import os
-import time
-from prefect import flow, task, get_run_logger
-
-# Load environment variables
-load_dotenv()
-
-# Set the client ID and fetch client secret from environment
-CLIENT_ID = os.getenv('GLOBUS_CLIENT_ID')
-CLIENT_SECRET = os.getenv('GLOBUS_CLIENT_SECRET')
-SCOPES = "urn:globus:auth:scope:transfer.api.globus.org:all"
-ENDPOINT_ID = "UUID"
-
-
-def initialize_transfer_client():
-    confidential_client = globus_sdk.ConfidentialAppAuthClient(client_id=CLIENT_ID, client_secret=CLIENT_SECRET)
-    cc_authorizer = globus_sdk.ClientCredentialsAuthorizer(confidential_client, SCOPES)
-    return globus_sdk.TransferClient(authorizer=cc_authorizer)
-
-
-@task
-def check_permissions(transfer_client, endpoint_id):
-    logger = get_run_logger()
-    try:
-        endpoint = transfer_client.get_endpoint(endpoint_id)
-        logger.info(f"Endpoint ID: {endpoint['id']}")
-        logger.info(f"Endpoint display name: {endpoint['display_name']}")
-        logger.info(f"Endpoint owner: {endpoint['owner_string']}")
-        # Print other relevant information about the endpoint
-    except globus_sdk.GlobusAPIError as err:
-        logger.error(f"Error fetching endpoint information: {err.message}")
-        raise
-
-
-@task
-def list_directory(transfer_client, endpoint_id, path):
-    logger = get_run_logger()
-    start_time = time.time()
-    try:
-        response = transfer_client.operation_ls(endpoint_id, path=path)
-        logger.info(f"Contents of {path} in endpoint {endpoint_id}:")
-        if not response:
-            logger.info(f"No contents found in {path}.")
-        for item in response:
-            logger.info(f"{item['type']} - {item['name']}")
-    except globus_sdk.GlobusAPIError as err:
-        logger.error(f"Error accessing {path} in endpoint {endpoint_id}: {err.message}")
-        if err.info.consent_required:
-            logger.error(f"Got a ConsentRequired error with scopes: {err.info.consent_required.required_scopes}")
-        elif err.code == "PermissionDenied":
-            logger.error(f"Permission denied for accessing {path}. Ensure proper permissions are set.")
-        elif err.http_status == 500:
-            logger.error(f"Server error when accessing {path} in endpoint {endpoint_id}.")
-        else:
-            logger.error(f"An unexpected error occurred: {err}")
-    finally:
-        elapsed_time = time.time() - start_time
-        logger.info(f"list_directory task took {elapsed_time:.2f} seconds")
-
-
-@task
-def create_directory(transfer_client, endpoint_id, base_path, directory_name):
-    logger = get_run_logger()
-    start_time = time.time()
-    try:
-        # Ensure base path is realistic and ends with '/'
-        if not base_path.endswith('/'):
-            base_path += '/'
-        if base_path.startswith('/'):
-            base_path = base_path.lstrip('/')
-
-        full_path = base_path + directory_name
-
-        # Validate the path
-        if full_path.startswith('/'):
-            raise ValueError(f"Invalid directory path: {full_path}")
-
-        # Attempt to create the directory
-        transfer_client.operation_mkdir(endpoint_id, full_path)
-        logger.info(f"Successfully created directory {full_path} in endpoint {endpoint_id}.")
-    except ValueError as ve:
-        logger.error(f"ValueError: {ve}")
-    except globus_sdk.GlobusAPIError as err:
-        logger.error(f"Error creating directory {full_path} in endpoint {endpoint_id}: {err.message}")
-        if err.info.consent_required:
-            logger.error(f"Got a ConsentRequired error with scopes: {err.info.consent_required.required_scopes}")
-        elif err.code == "PermissionDenied":
-            logger.error(f"Permission denied for creating directory {full_path}. Ensure proper permissions.")
-        elif err.http_status == 500:
-            logger.error(f"Server error when creating directory {full_path} in endpoint {endpoint_id}.")
-        else:
-            logger.error(f"An unexpected error occurred: {err}")
-    finally:
-        elapsed_time = time.time() - start_time
-        logger.info(f"create_directory task took {elapsed_time:.2f} seconds")
-
-
-@task
-def remove_directory(transfer_client, endpoint_id, path):
-    logger = get_run_logger()
-    start_time = time.time()
-    try:
-        delete_data = globus_sdk.DeleteData(transfer_client, endpoint_id, recursive=True)
-        delete_data.add_item(path)
-        transfer_result = transfer_client.submit_delete(delete_data)
-        logger.info(f"Successfully submitted request to remove directory {path} in endpoint {endpoint_id}.")
-        logger.info(f"Task ID: {transfer_result['task_id']}")
-    except globus_sdk.GlobusAPIError as err:
-        logger.error(f"Error removing directory {path} in endpoint {endpoint_id}: {err.message}")
-        if err.info.consent_required:
-            logger.error(f"Got a ConsentRequired error with scopes: {err.info.consent_required.required_scopes}")
-        elif err.code == "PermissionDenied":
-            logger.error(f"Permission denied for removing directory {path}. Ensure proper permissions are set.")
-        elif err.http_status == 500:
-            logger.error(f"Server error when removing directory {path} in endpoint {endpoint_id}.")
-        else:
-            logger.error(f"An unexpected error occurred: {err}")
-    finally:
-        elapsed_time = time.time() - start_time
-        logger.info(f"remove_directory task took {elapsed_time:.2f} seconds")
-
-
-@flow
-def main_flow():
-    transfer_client = initialize_transfer_client()
-    endpoint_id = ENDPOINT_ID
-    base_path = ""
-
-    # Check permissions for the endpoint
-    check_permissions(transfer_client, endpoint_id)
-
-    # List the contents of the root directory
-    logger = get_run_logger()
-    logger.info("Listing / directory:")
-    list_directory(transfer_client, endpoint_id, base_path)
-
-    # Create a new directory in the root directory
-    new_directory_name = "test/"
-    create_directory(transfer_client, endpoint_id, base_path, new_directory_name)
-
-    remove_directory(transfer_client, endpoint_id, "bl832_test/scratch/BLS-00564_dyparkinson/")
-
-    # List the contents again to verify the new directory
-    logger.info(f"\nListing / directory after creating {new_directory_name}:")
-    list_directory(transfer_client, endpoint_id, base_path+new_directory_name)
-
-
-if __name__ == "__main__":
-    main_flow()
diff --git a/examples/Tomopy_for_ALS.ipynb b/orchestration/scripts/Tomopy_for_ALS.ipynb
similarity index 99%
rename from examples/Tomopy_for_ALS.ipynb
rename to orchestration/scripts/Tomopy_for_ALS.ipynb
index 70ac1fe..8d6e5d1 100644
--- a/examples/Tomopy_for_ALS.ipynb
+++ b/orchestration/scripts/Tomopy_for_ALS.ipynb
@@ -467,7 +467,7 @@
     "import globus_sdk\n",
     "\n",
     "# from utils import get_flows_client, get_specific_flow_client\n",
-    "from ..orchestration.globus.flows import get_flows_client, get_specific_flow_client\n",
+    "from orchestration.globus.flows import get_flows_client, get_specific_flow_client\n",
     "\n",
     "# Tutorial client ID\n",
     "# We recommend replacing this with your own client for any production use-cases\n",
diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/scripts/alcf.py
similarity index 100%
rename from orchestration/flows/bl832/alcf.py
rename to orchestration/scripts/alcf.py
diff --git a/orchestration/scripts/check_globus_compute.py b/orchestration/scripts/check_globus_compute.py
new file mode 100644
index 0000000..f62e61d
--- /dev/null
+++ b/orchestration/scripts/check_globus_compute.py
@@ -0,0 +1,85 @@
+import argparse
+from dotenv import load_dotenv
+from typing import Optional
+
+from globus_compute_sdk.sdk.login_manager import LoginManager
+from globus_compute_sdk import Client
+from prefect import task, flow, get_run_logger
+
+load_dotenv()
+
+
+@task
+def get_login_manager(environment: Optional[str] = None) -> LoginManager:
+    """
+    Create and return a LoginManager instance for Globus Compute.
+
+    :param environment: Optional environment name for token storage.
+    :return: LoginManager instance
+    """
+    return LoginManager(environment=environment)
+
+
+@flow(name="check-compute-status")
+def check_globus_compute_status(endpoint_id: str) -> bool:
+    """
+    Check the status of a Globus Compute endpoint and determine if it is online.
+
+    :param endpoint_id: UUID of the Globus Compute endpoint.
+    :return: bool - True if the status is 'online', False otherwise.
+    """
+    logger = get_run_logger()
+    try:
+        # Initialize the LoginManager
+        login_manager = get_login_manager()
+
+        # Ensure the user is logged in
+        login_manager.ensure_logged_in()
+
+        # Initialize the Globus Compute client with the LoginManager
+        compute_client = Client(login_manager=login_manager)
+
+        # Check endpoint status
+        endpoint_status = compute_client.get_endpoint_status(endpoint_id)
+
+        # Log the full status details
+        logger.info(f"Endpoint {endpoint_id} status: {endpoint_status}")
+
+        # Determine if the status is 'online'
+        status = endpoint_status.get('status')
+        if status == 'online':
+            logger.info(f"Endpoint {endpoint_id} is online.")
+            return True
+        else:
+            logger.info(f"Endpoint {endpoint_id} is not online. Status: {status}")
+            return False
+    except Exception as e:
+        logger.error(f"Failed to check endpoint status: {str(e)}")
+        return False
+
+
+def main() -> None:
+    """
+    Main function to parse command-line arguments and check the Globus Compute endpoint status.
+    Example usage:
+        python check_globus_compute.py --endpoint_id "your-uuid-here"
+
+    IMPORTANT:
+    Ensure you are logged in to Globus Compute first:
+    export GLOBUS_COMPUTE_CLIENT_ID="your-client-id" && export GLOBUS_COMPUTE_CLIENT_SECRET="your-client-secret"
+    :return: None
+    """
+    parser = argparse.ArgumentParser(description="Check the status of a Globus Compute endpoint.")
+    parser.add_argument('--endpoint_id', type=str, required=True, help="The UUID of the Globus Compute endpoint.")
+
+    args = parser.parse_args()
+
+    online = check_globus_compute_status(args.endpoint_id)
+    if online:
+        print(f"Endpoint {args.endpoint_id} is online.")
+    else:
+        print(f"Endpoint {args.endpoint_id} is not online.")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/orchestration/scripts/check_globus_compute_status.py b/orchestration/scripts/check_globus_compute_status.py
deleted file mode 100644
index eb3346f..0000000
--- a/orchestration/scripts/check_globus_compute_status.py
+++ /dev/null
@@ -1,54 +0,0 @@
-#!/usr/bin/env python
-import os
-from dotenv import load_dotenv
-from globus_compute_sdk.sdk.login_manager import LoginManager
-from globus_compute_sdk import Client
-
-# Load environment variables from .env file
-load_dotenv()
-
-
-def get_login_manager(environment: str = None) -> LoginManager:
-    """
-    Create and return a LoginManager instance for Globus Compute.
-
-    :param environment: Optional environment name for token storage.
-    :return: LoginManager instance
-    """
-    return LoginManager(environment=environment)
-
-
-def check_endpoint(endpoint_id):
-    """
-    Check the status of a Globus Compute endpoint.
-
-    :param endpoint_id: UUID of the Globus Compute endpoint
-    :return: None
-    """
-    try:
-        # Initialize the LoginManager
-        login_manager = get_login_manager()
-
-        # Ensure the user is logged in
-        login_manager.ensure_logged_in()
-
-        # Initialize the Globus Compute client with the LoginManager
-        compute_client = Client(login_manager=login_manager)
-
-        # Check endpoint status
-        endpoint_status = compute_client.get_endpoint_status(endpoint_id)
-        print(f"Endpoint {endpoint_id} status: {endpoint_status}")
-    except Exception as e:
-        print(f"Failed to check endpoint status: {e}")
-
-
-if __name__ == "__main__":
-    """
-    Check the status of the Globus Compute endpoint specified by the
-    GLOBUS_COMPUTE_ENDPOINT environment variable.
-
-    IMPORTANT: run this in the terminal to login before running this script:
-    export export GLOBUS_COMPUTE_CLIENT_ID="uuid" & GLOBUS_COMPUTE_CLIENT_SECRET="uuid"
-    """
-    endpoint_id = os.getenv("GLOBUS_COMPUTE_ENDPOINT")
-    check_endpoint(endpoint_id)
diff --git a/orchestration/scripts/check_globus_transfer.py b/orchestration/scripts/check_globus_transfer.py
new file mode 100644
index 0000000..816ef9e
--- /dev/null
+++ b/orchestration/scripts/check_globus_transfer.py
@@ -0,0 +1,249 @@
+import argparse
+from dotenv import load_dotenv
+import os
+import time
+
+import globus_sdk
+from prefect import flow, task, get_run_logger
+from typing import Tuple, Optional
+
+load_dotenv()
+
+# Set the client ID and fetch client secret from environment
+CLIENT_ID: Optional[str] = os.getenv('GLOBUS_CLIENT_ID')
+CLIENT_SECRET: Optional[str] = os.getenv('GLOBUS_CLIENT_SECRET')
+SCOPES: str = "urn:globus:auth:scope:transfer.api.globus.org:all"
+
+
+@task
+def initialize_transfer_client() -> Tuple[Optional[globus_sdk.TransferClient], bool]:
+    """
+    Initialize and return a Globus TransferClient using confidential client credentials.
+
+    Returns:
+        Tuple[Optional[globus_sdk.TransferClient], bool]: The TransferClient object and a success flag.
+    """
+    logger = get_run_logger()
+    try:
+        logger.info("Initializing TransferClient...")
+        confidential_client = globus_sdk.ConfidentialAppAuthClient(client_id=CLIENT_ID,
+                                                                   client_secret=CLIENT_SECRET)
+        cc_authorizer = globus_sdk.ClientCredentialsAuthorizer(confidential_client, SCOPES)
+        transfer_client = globus_sdk.TransferClient(authorizer=cc_authorizer)
+        logger.info("TransferClient initialized successfully.")
+        return transfer_client, True
+    except Exception as e:
+        logger.error(f"Failed to initialize TransferClient: {str(e)}")
+        return None, False
+
+
+@task
+def check_permissions(transfer_client: globus_sdk.TransferClient, endpoint_id: str) -> bool:
+    """
+    Check and log file permissions and other relevant details of a Globus transfer endpoint.
+
+    Args:
+        transfer_client (globus_sdk.TransferClient): An authenticated TransferClient object.
+        endpoint_id (str): The UUID of the endpoint to check.
+
+    Returns:
+        bool: True if successful, False otherwise.
+    """
+    logger = get_run_logger()
+    try:
+        endpoint = transfer_client.get_endpoint(endpoint_id)
+        logger.info(f"Endpoint ID: {endpoint['id']}")
+        logger.info(f"Endpoint display name: {endpoint['display_name']}")
+        logger.info(f"Endpoint owner: {endpoint['owner_string']}")
+        return True
+    except globus_sdk.GlobusAPIError as err:
+        logger.error(f"Error fetching endpoint information: {err.message}")
+        return False
+
+
+@task
+def list_directory(transfer_client: globus_sdk.TransferClient, endpoint_id: str, path: str = "") -> bool:
+    """
+    List the contents of a specified directory on a Globus endpoint.
+
+    Args:
+        transfer_client (globus_sdk.TransferClient): An authenticated TransferClient object.
+        endpoint_id (str): The UUID of the endpoint to list the directory on.
+        path (str): The path of the directory to list.
+
+    Returns:
+        bool: True if successful, False otherwise.
+    """
+    logger = get_run_logger()
+    start_time = time.time()
+    success = False
+    try:
+        response = transfer_client.operation_ls(endpoint_id, path=path)
+        logger.info(f"Contents of {path} in endpoint {endpoint_id}:")
+        if not response:
+            logger.info(f"No contents found in {path}.")
+        for item in response:
+            logger.info(f"{item['type']} - {item['name']}")
+        success = True
+    except globus_sdk.GlobusAPIError as err:
+        logger.error(f"Error accessing {path} in endpoint {endpoint_id}: {err.message}")
+    finally:
+        elapsed_time = time.time() - start_time
+        logger.info(f"list_directory task took {elapsed_time:.2f} seconds")
+    return success
+
+
+@task
+def create_directory(transfer_client: globus_sdk.TransferClient,
+                     endpoint_id: str,
+                     base_path: str = "",
+                     directory_name: str = "test/") -> bool:
+    """
+    Create a directory on a specified Globus endpoint.
+
+    Args:
+        transfer_client (globus_sdk.TransferClient): An authenticated TransferClient object.
+        endpoint_id (str): The UUID of the endpoint to create the directory on.
+        base_path (str): The base path where the directory will be created.
+        directory_name (str): The name of the directory to create.
+
+    Returns:
+        bool: True if successful, False otherwise.
+    """
+    logger = get_run_logger()
+    start_time = time.time()
+    success = False
+    try:
+        if not base_path.endswith('/'):
+            base_path += '/'
+        if base_path.startswith('/'):
+            base_path = base_path.lstrip('/')
+
+        full_path = base_path + directory_name
+
+        if full_path.startswith('/'):
+            raise ValueError(f"Invalid directory path: {full_path}")
+
+        transfer_client.operation_mkdir(endpoint_id, full_path)
+        logger.info(f"Successfully created directory {full_path} in endpoint {endpoint_id}.")
+        success = True
+    except (ValueError, globus_sdk.GlobusAPIError) as err:
+        logger.error(f"Error creating directory {full_path} in endpoint {endpoint_id}: {str(err)}")
+    finally:
+        elapsed_time = time.time() - start_time
+        logger.info(f"create_directory task took {elapsed_time:.2f} seconds")
+    return success
+
+
+@task
+def remove_directory(transfer_client: globus_sdk.TransferClient, endpoint_id: str, path: str = "test/") -> bool:
+    """
+    Remove a directory on a specified Globus endpoint.
+
+    Args:
+        transfer_client (globus_sdk.TransferClient): An authenticated TransferClient object.
+        endpoint_id (str): The UUID of the endpoint where the directory is located.
+        path (str): The path of the directory to remove.
+
+    Returns:
+        bool: True if successful, False otherwise.
+    """
+    logger = get_run_logger()
+    start_time = time.time()
+    success = False
+    try:
+        delete_data = globus_sdk.DeleteData(transfer_client, endpoint_id, recursive=True)
+        delete_data.add_item(path)
+        transfer_result = transfer_client.submit_delete(delete_data)
+        logger.info(f"Successfully submitted request to remove directory {path} in endpoint {endpoint_id}.")
+        logger.info(f"Task ID: {transfer_result['task_id']}")
+        success = True
+    except globus_sdk.GlobusAPIError as err:
+        logger.error(f"Error removing directory {path} in endpoint {endpoint_id}: {err.message}")
+    finally:
+        elapsed_time = time.time() - start_time
+        logger.info(f"remove_directory task took {elapsed_time:.2f} seconds")
+    return success
+
+
+@flow(name="check-globus-transfer")
+def check_globus_transfer_permissions(endpoint_id: str,
+                                      transfer_client: Optional[globus_sdk.TransferClient],
+                                      list_contents: bool = True,
+                                      create_test_directory: bool = True,
+                                      delete_test_directory: bool = True) -> None:
+    """
+    Check permissions, list directory contents, create a test directory,
+    and remove a directory on a Globus endpoint.
+
+    Args:
+        endpoint_id (str): The UUID of the endpoint to check.
+        transfer_client (Optional[globus_sdk.TransferClient]): An authenticated TransferClient object or None.
+        list_contents (bool, optional): Whether to list directory contents. Default is True.
+        create_test_directory (bool, optional): Whether to create a test directory. Default is True.
+        delete_test_directory (bool, optional): Whether to delete the test directory. Default is True.
+    """
+    logger = get_run_logger()
+    if transfer_client is None:
+        transfer_client, success = initialize_transfer_client()
+        if not success or transfer_client is None:
+            logger.error("Failed to initialize TransferClient. Exiting flow.")
+            return
+
+    success_check_permissions = check_permissions(transfer_client, endpoint_id)
+    logger.info(f"check_permissions successful: {success_check_permissions}")
+
+    if list_contents:
+        logger.info("Listing / directory:")
+        success_list_directory = list_directory(transfer_client, endpoint_id, "")
+        logger.info(f"list_directory successful: {success_list_directory}")
+
+    if create_test_directory:
+        new_directory_name = "test/"
+        success_create_directory = create_directory(transfer_client, endpoint_id, "", new_directory_name)
+        logger.info(f"create_directory successful: {success_create_directory}")
+
+    if delete_test_directory:
+        success_remove_directory = remove_directory(transfer_client, endpoint_id, "test/")
+        logger.info(f"remove_directory successful: {success_remove_directory}")
+
+    if list_contents and create_test_directory:
+        logger.info(f"Listing / directory after creating {new_directory_name}:")
+        success_list_directory_after = list_directory(transfer_client, endpoint_id, new_directory_name)
+        logger.info(f"list_directory (after creating test directory) successful: {success_list_directory_after}")
+
+
+def main() -> None:
+    """
+    Main function to parse command-line arguments and run the check_globus_transfer_permissions flow.
+
+    Run from the command line:
+        python check_globus_transfer.py --endpoint_id "your-endpoint-id"
+
+    Command-line arguments:
+        --endpoint_id (str): The UUID of the endpoint to operate on.
+        --list_contents (bool): Whether to list directory contents. Default is True.
+        --create_test_directory (bool): Whether to create a test directory. Default is True.
+        --delete_test_directory (bool): Whether to delete the test directory. Default is True.
+    """
+    parser = argparse.ArgumentParser(description="Run Globus transfer operations on a specified endpoint.")
+    parser.add_argument('--endpoint_id', type=str, required=True, help="The UUID of the Globus endpoint.")
+    parser.add_argument('--list_contents', action=argparse.BooleanOptionalAction, default=True,
+                        help="Whether to list directory contents.")
+    parser.add_argument('--create_test_directory', action=argparse.BooleanOptionalAction, default=True,
+                        help="Whether to create a test directory.")
+    parser.add_argument('--delete_test_directory', action=argparse.BooleanOptionalAction, default=True,
+                        help="Whether to delete the test directory.")
+    args = parser.parse_args()
+
+    check_globus_transfer_permissions(
+        endpoint_id=args.endpoint_id,
+        transfer_client=None,
+        list_contents=args.list_contents,
+        create_test_directory=args.create_test_directory,
+        delete_test_directory=args.delete_test_directory
+    )
+
+
+if __name__ == "__main__":
+    main()
diff --git a/orchestration/scripts/globus_tomopy_flow_init.py b/orchestration/scripts/globus_tomopy_flow_init.py
index dab85e2..b34a7a5 100644
--- a/orchestration/scripts/globus_tomopy_flow_init.py
+++ b/orchestration/scripts/globus_tomopy_flow_init.py
@@ -1,7 +1,9 @@
-from dotenv import load_dotenv
 from globus_compute_sdk import Client, Executor
-from orchestration.globus.flows import get_flows_client
+from prefect.blocks.system import Secret
+from prefect import task, flow, get_run_logger
 
+from check_globus_compute import check_globus_compute_status
+from orchestration.globus.flows import get_flows_client
 
 """
 init.py only needs to be run once to authenticate the polaris (alcf) endpoint ID on the target machine.
@@ -9,7 +11,19 @@
 """
 
 
-def reconstruction_wrapper(rundir, parametersfile="inputOneSliceOfEach.txt"):
+@task
+def get_polaris_endpoint_id() -> str:
+    """
+    Get the UUID of the Polaris endpoint on ALCF.
+
+    :return: str - UUID of the Polaris endpoint.
+    """
+    compute_endpoint_id = Secret.load("globus-compute-endpoint").get()
+    check_globus_compute_status(compute_endpoint_id)
+    return compute_endpoint_id
+
+
+def reconstruction_wrapper(rundir, h5_file_name, folder_path):
     """
     Python function that wraps around the application call for Tomopy reconstruction on ALCF
 
@@ -21,23 +35,40 @@ def reconstruction_wrapper(rundir, parametersfile="inputOneSliceOfEach.txt"):
         str: confirmation message regarding reconstruction and time to completion
     """
     import os
-    import subprocess
     import time
-    try:
-        start = time.time()
+    import subprocess
+
+    rec_start = time.time()
+
+    # Move to directory where data are located
+    os.chdir(rundir)
+
+    # Run reconstruction.py
+    command = f"python /eagle/IRIBeta/als/example/test_recon.py {h5_file_name} {folder_path}"
+    recon_res = subprocess.run(command.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
-        # Move to directory where data are located
-        os.chdir(rundir)
+    rec_end = time.time()
 
-        # Run reconstruction.py
-        command = f"python /eagle/IRIBeta/als/example/reconstruction.py {parametersfile}"
-        res = subprocess.run(command.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    print(f"Reconstructed data in {folder_path}/{h5_file_name} in {rec_end-rec_start} seconds;\n {recon_res}")
 
-        end = time.time()
-        return f"Reconstructed data in {parametersfile} in {end-start:.2f} seconds;\n {res.stdout.decode()}"
+    start = time.time()
 
-    except Exception as e:
-        return f"Error during reconstruction: {str(e)}"
+    # Convert tiff files to zarr
+    file_name = h5_file_name[:-3] if h5_file_name.endswith('.h5') else h5_file_name
+    command = (
+        f"python /eagle/IRIBeta/als/example/tiff_to_zarr.py "
+        f"/eagle/IRIBeta/als/bl832_test/scratch/{folder_path}/rec{file_name}/"
+    )
+    zarr_res = subprocess.run(command.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+    end = time.time()
+
+    print(f"Converted tiff files to zarr in {end-start} seconds;\n {zarr_res}")
+
+    return (
+        f"Reconstructed data specified in {folder_path} / {h5_file_name} in {rec_end-rec_start} seconds;\n"
+        f"{recon_res} \nConverted tiff files to zarr in {end-start} seconds;\n {zarr_res}"
+    )
 
 
 def create_flow_definition():
@@ -82,23 +113,29 @@ def create_flow_definition():
     return flow_definition
 
 
-if __name__ == "__main__":
+@flow(name="setup-reconstruction-flow")
+def setup_reconstruction_flow() -> None:
+    logger = get_run_logger()
+
+    # Login to Globus Compute Endpoint
     gc = Client()
-    dotenv_file = load_dotenv()
-    polaris_endpoint_id = "UUID"
-    gce = Executor(endpoint_id=polaris_endpoint_id)
-    future = gce.submit(reconstruction_wrapper, "/eagle/IRIBeta/als/example")
-    # print(future.result())
+    gce = Executor(endpoint_id=get_polaris_endpoint_id())
+
     reconstruction_func = gc.register_function(reconstruction_wrapper)
+    logger.info(f"Registered function UUID: {reconstruction_func}")
     print(reconstruction_func)
-    future = gce.submit_to_registered_function(args=["/eagle/IRIBeta/als/example"],
-                                               function_id=reconstruction_func)
-    future.result()
-    flow_definition = create_flow_definition()
-    fc = get_flows_client()
-    flow = fc.create_flow(definition=flow_definition, title="Reconstruction flow", input_schema={})
+
+    flows_client = get_flows_client()
+    flow = flows_client.create_flow(definition=create_flow_definition(),
+                                    title="Reconstruction flow",
+                                    input_schema={})
     flow_id = flow['id']
-    print(flow)
+    logger.info(f"Created flow: {flow}")
+    logger.info(f"Flow UUID: {flow_id}")
+
     flow_scope = flow['globus_auth_scope']
-    print(f'Newly created flow with id:\n{flow_id}\nand scope:\n{flow_scope}')
-    # set_key(dotenv_file, "flow_id", str(flow_id))
+    logger.info(f'Flow scope:\n{flow_scope}')
+
+
+if __name__ == "__main__":
+    setup_reconstruction_flow()
diff --git a/orchestration/scripts/login_to_globus_and_prefect.sh b/orchestration/scripts/login_to_globus_and_prefect.sh
new file mode 100755
index 0000000..8764f80
--- /dev/null
+++ b/orchestration/scripts/login_to_globus_and_prefect.sh
@@ -0,0 +1,18 @@
+#!/bin/bash
+# Run this script to login to Globus and Prefect from env variables
+# Example: source ./login_to_globus_and_prefect.sh
+# Load environment variables from the .env file
+if [ -f .env ]; then
+    source .env
+
+else
+    echo ".env file not found"
+    return 1 2>/dev/null || exit 1
+fi
+
+export GLOBUS_CLIENT_ID="$GLOBUS_CLIENT_ID"
+export GLOBUS_CLIENT_SECRET="$GLOBUS_CLIENT_SECRET"
+export GLOBUS_COMPUTE_CLIENT_ID="$GLOBUS_CLIENT_ID"
+export GLOBUS_COMPUTE_CLIENT_SECRET="$GLOBUS_CLIENT_SECRET"
+export PREFECT_API_KEY="$PREFECT_API_KEY"
+export PREFECT_API_URL="$PREFECT_API_URL"
\ No newline at end of file
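
Usage sketch (illustrative, not part of the diff): after applying the patch, the new helper scripts can be
invoked as described in their own comments and docstrings, for example from orchestration/scripts/ with a
placeholder endpoint UUID:

    source ./login_to_globus_and_prefect.sh
    python check_globus_compute.py --endpoint_id "your-endpoint-uuid"
    python check_globus_transfer.py --endpoint_id "your-endpoint-uuid"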