diff --git a/examples/sfapi_NerscClient_example.ipynb b/examples/sfapi_NerscClient_example.ipynb new file mode 100644 index 0000000..a4a129c --- /dev/null +++ b/examples/sfapi_NerscClient_example.ipynb @@ -0,0 +1,153 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "from orchestration.nersc import NerscClient" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "client_id_path = \"../clientid.txt\"\n", + "sfapi_key_path = \"../sfapi_training.json\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "client = NerscClient(client_id_path, sfapi_key_path)\n", + "\n", + "user = client.user()\n", + "user" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "home_path = f\"/global/homes/{user.name[0]}/{user.name}\"\n", + "scratch_path = f\"/pscratch/sd/{user.name[0]}/{user.name}\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### The job script below will run a simple python program to generate random numbers\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "N = 15\n", + "\n", + "job_script = f\"\"\"#!/bin/bash\n", + "\n", + "#SBATCH -q debug\n", + "#SBATCH -A als\n", + "#SBATCH -N 1\n", + "#SBATCH -C cpu\n", + "#SBATCH -t 00:10:00\n", + "#SBATCH -J sfapi-demo\n", + "#SBATCH --exclusive\n", + "#SBATCH --output={scratch_path}/nerscClient-test/sfapi-demo-%j.out\n", + "#SBATCH --error={scratch_path}/nerscClient-test/sfapi-demo-%j.error\n", + "\n", + "module load python\n", + "# Prints N random numbers to form a normal disrobution\n", + "python -c \"import numpy as np; numbers = np.random.normal(size={N}); [print(n) for n in numbers]\"\n", + "\"\"\" " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(job_script)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "### Make sure our output folder is there for our data to go to\n", + "client.perlmutter.run(f\"mkdir -p {scratch_path}/nerscClient-test\")\n", + "# We can run ls on the directory to see that it was created\n", + "[output_dir] = client.perlmutter.ls(f\"{scratch_path}/nerscClient-demo\", directory=True)\n", + "\n", + "\n", + "# Check that the directory is there\n", + "output_dir.is_dir()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "### Submit the job and wait for the job to complete\n", + "job = client.perlmutter.submit_job(job_script)\n", + "print(job)\n", + "# Let's save the job id to use later \n", + "job_id = job.jobid\n", + "\n", + "print(f\"Waiting for job {job_id} to finish!\")\n", + "# Wait for the job to finish\n", + "job.complete()\n", + "print(\"Done!\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "job.state\n", + "print(job.state)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "env", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.6" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/orchestration/flows/bl7012/config.py b/orchestration/flows/bl7012/config.py index 7fdace0..df44163 100644 --- a/orchestration/flows/bl7012/config.py +++ b/orchestration/flows/bl7012/config.py @@ -1,7 +1,7 @@ from globus_sdk import TransferClient from orchestration import globus -from orchestration.nersc import NerscClient +from ptycho_nersc import NerscPtychoClient class Config7012: @@ -14,7 +14,7 @@ def __init__( self.endpoints = globus.build_endpoints(config) self.apps = globus.build_apps(config) self.tc: TransferClient = globus.init_transfer_client(self.apps["als_transfer"]) - self.nersc = NerscClient( + self.nersc = NerscPtychoClient( path_client_id, path_priv_key, ) diff --git a/orchestration/flows/bl7012/ptycho_nersc.py b/orchestration/flows/bl7012/ptycho_nersc.py new file mode 100644 index 0000000..c2aaf04 --- /dev/null +++ b/orchestration/flows/bl7012/ptycho_nersc.py @@ -0,0 +1,48 @@ +import json +import logging +import time + +from authlib.integrations.requests_client import OAuth2Session +from authlib.oauth2.rfc7523 import PrivateKeyJWT + +from orchestration.ptycho_jobscript import ( + get_job_script, + cdtool_args_string, + ptychocam_args_string, + cdtools_parms, + ptychocam_parms, +) + +from orchestration.nersc import NerscClient + +class NerscPtychoClient(NerscClient): + def __init__( + self, + path_client_id, + path_priv_key, + logger=None, + ): + super().__init__(path_client_id, path_priv_key, logger) + + def cdtools(self, cxiname, path_job_script, path_cdtools_nersc, n_gpu, **kwargs): + args_string = cdtool_args_string( + cxiname, path_cdtools_nersc, cdtools_parms, **kwargs + ) + job_script = get_job_script(path_job_script, n_gpu, args_string) + self.logger.info(f"Job script: {job_script}") + + self.submit_job(job_script) + self.task_wait() + + def ptychocam( + self, cxiname, path_job_script, path_ptychocam_nersc, n_gpu, **kwargs + ): + args_string = ptychocam_args_string( + cxiname, path_ptychocam_nersc, ptychocam_parms, **kwargs + ) + job_script = get_job_script(path_job_script, n_gpu, args_string) + + self.logger.info(f"Job script: {job_script}") + + self.submit_job(job_script) + self.task_wait() diff --git a/orchestration/nersc.py b/orchestration/nersc.py index c3b5256..27e1808 100644 --- a/orchestration/nersc.py +++ b/orchestration/nersc.py @@ -1,20 +1,23 @@ import json import logging +from pathlib import Path import time from authlib.integrations.requests_client import OAuth2Session from authlib.oauth2.rfc7523 import PrivateKeyJWT +from authlib.jose import JsonWebKey -from orchestration.ptycho_jobscript import ( - get_job_script, - cdtool_args_string, - ptychocam_args_string, - cdtools_parms, - ptychocam_parms, -) +from sfapi_client import Client +from sfapi_client._sync.client import SFAPI_BASE_URL, SFAPI_TOKEN_URL +from sfapi_client.compute import Machine +# Temporary patch till the sfapi_client is updated +from sfapi_client.jobs import JobSacct +from sfapi_client.compute import Compute +JobSacct.model_rebuild() -class NerscClient: + +class NerscClient(Client): def __init__( self, path_client_id, @@ -28,10 +31,20 @@ def __init__( logging.basicConfig(level=logging.INFO) if logger is None else logger ) + # Reading the client_id and private key from the files self.client_id = None self.pri_key = None - self.session = None - self.init_session() + #self.session = None + self.init_client_info() + + + super().__init__(self.client_id, self.pri_key) + + # NERSC specific directory paths initialization + self.home_path = None + self.scratch_path = None + self.init_directory_paths() + self.task = None self.task_id = None self.job = None @@ -39,6 +52,7 @@ def __init__( self.job_state = None self.job_script_string = None self.has_ran = False + self.perlmutter = self.compute(Machine.perlmutter) def get_client_id(self): with open(self.path_client_id, "r") as f: @@ -46,132 +60,55 @@ def get_client_id(self): def get_private_key(self): with open(self.path_private_key, "r") as f: - self.private_key = f.read() + self.pri_key = JsonWebKey.import_key(json.loads(f.read())) - def init_session( - self, - token_url="https://oidc.nersc.gov/c2id/token", - grant_type="client_credentials", + def get_machine_status(self): + return self.perlmutter.status + + def init_client_info( + self ): self.get_client_id() self.get_private_key() - self.session = OAuth2Session( - self.client_id, - self.private_key, - PrivateKeyJWT(token_url), - grant_type=grant_type, - token_endpoint=token_url, - ) - self.session.fetch_token() + + def init_directory_paths(self): + self.home_path = f"/global/homes/{self.user().name[0]}/{self.user().name}" + self.scratch_path = f"/pscratch/sd/{self.user().name[0]}/{self.user().name}" def request_task_status(self): - r = self.session.get( - "https://api.nersc.gov/api/v1.2/tasks/{}".format(self.task_id) - ) - self.task = r.json() + """Could need session variable in class, if this function + is to be used, due to information access requirements.""" + pass def request_job_status(self): - r = self.session.get( - "https://api.nersc.gov/api/v1.2/compute/jobs/perlmutter/{}".format( - self.jobid - ) - ) - self.job = r.json() - - def update_task_id(self): - self.task_id = self.task["task_id"] + self.job = self.perlmutter.job(jobid=self.jobid) def update_job_id(self): - task_result = self.task["result"] - if task_result is None: - self.logger.info(f"Waiting for job submission with task_id {self.task_id}") - elif isinstance(task_result, str): - r = json.loads(task_result) - self.jobid = r["jobid"] + if self.job is None: + self.logger.info(f"No job found") + else: + self.jobid = self.job.jobid def update_job_state(self): self.request_job_status() - job_output = self.job["output"] - if len(job_output) > 0: - self.job_state = job_output[0]["state"] - if all([not self.has_ran, self.job_state == "RUNNING"]): + self.job_state = self.job.state + + if self.job_state == "RUNNING": self.has_ran = True - elif self.has_ran & (len(job_output) == 0): - self.job_state = "COMPLETE" + elif self.job_state == "COMPLETE": self.logger.info(f"Job {self.jobid} with COMPLETE status") - def submit_job(self, script_string): + def submit_job(self, job_script): self.task = None self.job = None self.jobid = None self.task_id = None self.has_ran = False - job_api_url = "https://api.nersc.gov/api/v1.2/compute/jobs/perlmutter" - data = {"job": script_string, "isPath": False} - r = self.session.post(job_api_url, data=data) - self.task = r.json() - self.update_task_id() - self.logger.info(f"Submitted task id: {self.task_id}") - - def cdtools(self, cxiname, path_job_script, path_cdtools_nersc, n_gpu, **kwargs): - args_string = cdtool_args_string( - cxiname, path_cdtools_nersc, cdtools_parms, **kwargs - ) - job_script = get_job_script(path_job_script, n_gpu, args_string) - self.logger.info(f"Job script: {job_script}") + self.job_script_string = job_script + #self.logger.info(f"Submitting job with script: {job_script}") + self.job = self.perlmutter.submit_job(job_script) + self.update_job_id() + #self.update_job_state() + #self.logger.info(f"Submitted job id: {self.jobid}") - self.submit_job(job_script) - self.task_wait() - - def ptychocam( - self, cxiname, path_job_script, path_ptychocam_nersc, n_gpu, **kwargs - ): - args_string = ptychocam_args_string( - cxiname, path_ptychocam_nersc, ptychocam_parms, **kwargs - ) - job_script = get_job_script(path_job_script, n_gpu, args_string) - - self.logger.info(f"Job script: {job_script}") - - self.submit_job(job_script) - self.task_wait() - - def task_wait(self, max_wait_seconds=600, sleep_time=2): - start = time.time() - self.job_state = "QUEUE" - done = False - self.update_task_id() - - while not done: - self.request_task_status() - self.logger.info(f"Task status: {self.task}") - elapsed = time.time() - start - - if elapsed > max_wait_seconds: - self.logger.info("Waiting for completion of task but time out ") - self.logger.info( - f"Configured to wait {max_wait_seconds}, elapsed is {elapsed} " - ) - self.logger.info("Job may complete in background.") - - if "id" not in list(self.task.keys()): - self.logger.info( - f"Task {self.task_id} not found and may have been completed" - ) - done = True - - elif (self.task["status"] == "completed") | (self.task["status"] == "new"): - if self.jobid is None: - self.update_job_id() - else: - self.update_job_state() - - if self.job_state == "COMPLETE": - done = True - self.logger.info(f"Job {self.jobid} has been completed") - self.logger.info(f"Job{self.jobid} with {self.job_state} status") - - time.sleep(sleep_time) - - return True diff --git a/requirements.txt b/requirements.txt index 27d4f1b..9aa7013 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,4 +7,5 @@ python-dotenv prefect==2.14.3 pyscicat @ git+https://github.com/dylanmcreynolds/pyscicat@fix_auth pyyaml -authlib \ No newline at end of file +authlib +sfapi_client \ No newline at end of file