diff --git a/.gitignore b/.gitignore
index 4031af450..5251b51f3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -149,3 +149,4 @@ htmlcov/*
 #VSCode files
 .vscode/*
 .vscode/launch.json
+.code-workspace
diff --git a/.vscode/launch.json b/.vscode/launch.json
index 2436e2308..4dc237d60 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -1,17 +1,58 @@
-{
+{
     // Use IntelliSense to learn about possible attributes.
    // Hover to view descriptions of existing attributes.
    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
    "version": "0.2.0",
    "configurations": [
        {
-            "name": "Python: Current File",
+            "name": "Python: RMG_Env",
            "type": "python",
            "request": "launch",
-            "program": "${file}",
-            "env": {"PYDEVD_WARN_EVALUATION_TIMEOUT": "500"},
+            "program": "~/Code/RMG-Py/rmg.py",
            "console": "integratedTerminal",
-            "justMyCode": true
+            "justMyCode": false,
+            "args": ["${input:pickDir}/input.py"]
+        },
+        {
+            "name": "Python: ARC_Env",
+            "type": "python",
+            "request": "launch",
+            "program": "~/Code/ARC/ARC.py",
+            "console": "integratedTerminal",
+            "justMyCode": false,
+            "args": ["${input:pickDir}/input.yml"]
+        },
+        {
+            "name": "Python: Restart_ARC_Env",
+            "type": "python",
+            "request": "launch",
+            "program": "~/Code/ARC/ARC.py",
+            "console": "integratedTerminal",
+            "justMyCode": false,
+            "args": ["${input:pickDir}/restart.yml"]
+        },
+        {
+            "name": "Python: T3_Env",
+            "type": "python",
+            "request": "launch",
+            "program": "~/Code/T3/T3.py",
+            "console": "integratedTerminal",
+            "justMyCode": true,
+            "args": ["${input:pickDir}/input.yml"]
+        }
+    ],
+    "inputs": [
+        {
+            "id": "pickDir",
+            "type": "command",
+            "command": "extension.commandvariable.file.pickFile",
+            "args": {
+                "include": "**/*",
+                "display": "fileName",
+                "description": "Subdirectory to process",
+                "showDirs": true,
+                "fromFolder": {"fixed": "/home/calvin/Code/Runs/"}
+            }
+        }
     ]
-}
\ No newline at end of file
+}
diff --git a/T3.code-workspace b/T3.code-workspace
index 52f20d45b..bd74af2b5 100644
--- a/T3.code-workspace
+++ b/T3.code-workspace
@@ -1,13 +1,10 @@
 {
     "folders": [
        {
-            "path": "."
-        },
-        {
-            "path": "../ARC"
+            "path": "../../../../.t3"
        },
        {
-            "path": "../RMG-Py"
+            "path": "."
        }
    ],
    "settings": {}
diff --git a/requirements.txt b/requirements.txt
deleted file mode 100644
index 72d9378c1..000000000
--- a/requirements.txt
+++ /dev/null
@@ -1,19 +0,0 @@
-# Required python packages
-
-ase >= 3.15.0
-cclib >= 1.6
-coverage
-cython >= 0.25.2
-mako
-matplotlib >= 2.2.2
-mkdocs-material >= 5.1.7
-mkdocs-material-extensions
-numpy == 1.15.4
-paramiko == 2.6.0
-pandas
-py3Dmol == 0.8.0
-pyyaml
-pydantic
-pydas
-pytest
-qcelemental
diff --git a/t3/main.py b/t3/main.py
index ff2341df9..b99170824 100755
--- a/t3/main.py
+++ b/t3/main.py
@@ -52,6 +52,7 @@
 from t3.common import PROJECTS_BASE_PATH, VALID_CHARS, delete_root_rmg_log, get_species_by_label, time_lapse
 from t3.logger import Logger
 from t3.runners.rmg_runner import rmg_runner
+from t3.runners.rmg_adapter import RMGAdapter
 from t3.schema import InputBase
 from t3.simulate.factory import simulate_factory
 from t3.utils.writer import write_pdep_network_file, write_rmg_input_file
@@ -618,29 +619,29 @@ def run_rmg(self, restart_rmg: bool = False):
                     and not os.path.isdir(self.paths['RMG T3 kinetics lib']):
                 self.rmg['database']['kinetics_libraries'].pop(self.rmg['database']['kinetics_libraries'].index(t3_kinetics_lib))
-        write_rmg_input_file(
-            rmg=self.rmg,
-            t3=self.t3,
-            iteration=self.iteration,
-            path=self.paths['RMG input'],
-            walltime=self.t3['options']['max_RMG_walltime'],
-        )
+
+        # Create the RMG adapter. It writes the RMG input file and runs RMG
+        # incore, locally, or on a remote server queue (via SSH), based on the settings.
+        rmg_adapter = RMGAdapter(
+            rmg=self.rmg,
+            t3=self.t3,
+            iteration=self.iteration,
+            paths=self.paths,
+            logger=self.logger,
+            walltime=self.t3['options']['max_RMG_walltime'],
+            max_iterations=self.t3['options']['max_rmg_iterations'],
+            verbose=self.verbose,
+            t3_project_name=self.project,
+            restart_rmg=restart_rmg,
+        )
+        rmg_adapter.run_rmg()
         if not os.path.isfile(self.paths['RMG input']):
             raise ValueError(f"The RMG input file {self.paths['RMG input']} could not be written.")
         tic = datetime.datetime.now()
+
         max_rmg_exceptions_allowed = self.t3['options']['max_RMG_exceptions_allowed']
-        rmg_exception_encountered = rmg_runner(rmg_input_file_path=self.paths['RMG input'],
-                                               job_log_path=self.paths['RMG job log'],
-                                               logger=self.logger,
-                                               memory=self.rmg['memory'] * 1000 if self.rmg['memory'] is not None else None,
-                                               max_iterations=self.t3['options']['max_rmg_iterations'],
-                                               verbose=self.verbose,
-                                               t3_project_name=self.project,
-                                               rmg_execution_type=self.rmg['rmg_execution_type'],
-                                               restart_rmg=restart_rmg,
-                                               )
-        if rmg_exception_encountered:
+
+        if rmg_adapter.rmg_exception_encountered:
             self.rmg_exceptions_counter += 1
             if self.rmg_exceptions_counter > max_rmg_exceptions_allowed:
                 self.logger.error(f'This is the {get_number_with_ordinal_indicator(self.rmg_exceptions_counter)} '
@@ -652,10 +653,14 @@ def run_rmg(self, restart_rmg: bool = False):
                     f'This is the {get_number_with_ordinal_indicator(self.rmg_exceptions_counter)} '
                     f'exception raised by RMG.\n'
                     f'The maximum number of exceptions allowed is {max_rmg_exceptions_allowed}.')
-
         elapsed_time = time_lapse(tic)
         self.logger.info(f'RMG terminated, execution time: {elapsed_time}')
 
     def determine_species_and_reactions_to_calculate(self) -> bool:
         """
         Determine which species and reactions in the executed RMG job should be calculated.
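For reference, the adapter introduced below (t3/runners/rmg_adapter.py) can also be exercised outside of T3.run_rmg(), e.g., in a test. The following is a minimal, hypothetical driver sketch: the rmg, t3, and logger objects are placeholders for a schema-validated RMG dict, a T3 dict, and a t3.logger.Logger instance, and the paths are illustrative; only the keys the adapter actually reads are shown.

# A minimal, hypothetical driver for the new adapter (placeholder objects and paths):
from t3.runners.rmg_adapter import RMGAdapter

paths = {'iteration': '/home/user/runs/iteration_1',              # placeholder
         'RMG': '/home/user/runs/iteration_1/RMG',                # placeholder
         'RMG input': '/home/user/runs/iteration_1/RMG/input.py'}
adapter = RMGAdapter(rmg=rmg,                    # schema-validated RMG dict (placeholder)
                     t3=t3,                      # schema-validated T3 dict (placeholder)
                     iteration=1,
                     paths=paths,
                     logger=logger,              # a t3.logger.Logger instance (placeholder)
                     walltime='01-00:00:00',
                     max_iterations=10,
                     t3_project_name='my_project',
                     restart_rmg=False)
adapter.run_rmg()                                # writes input.py and runs RMG per the execution type
if adapter.rmg_exception_encountered:
    print('RMG did not converge in this iteration.')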
diff --git a/t3/runners/rmg_adapter.py b/t3/runners/rmg_adapter.py
new file mode 100644
index 000000000..c4a7d5c81
--- /dev/null
+++ b/t3/runners/rmg_adapter.py
@@ -0,0 +1,740 @@
+
+import datetime
+import math
+import os
+import shutil
+import time
+from typing import Optional, Tuple
+
+from mako.template import Template
+
+from t3.common import get_rmg_species_from_a_species_dict
+from t3.imports import local_t3_path, settings, submit_scripts
+from t3.utils.generator import generate_radicals
+from t3.utils.ssh import SSHClient
+from t3.utils.writer import to_camel_case
+
+
+METHOD_MAP = {'CSE': 'chemically-significant eigenvalues',
+              'RS': 'reservoir state',
+              'MSC': 'modified strong collision',
+              }
+
+RMG_EXECUTION_TYPE = settings['execution_type']['rmg']
+MAX_RMG_RUNS_PER_ITERATION = 5  # TODO: Why is this hard-coded?
+submit_filenames = settings['submit_filenames']
+rmg_memory = settings['rmg_initial_memory']
+CPUS, MEMORY, CLUSTER_SOFT = None, None, None
+if RMG_EXECUTION_TYPE == 'queue':
+    SERVER = list(settings['servers'].keys())[0]
+    CPUS = settings['servers'][SERVER]['cpus']
+    MEMORY = settings['servers'][SERVER]['memory']
+    CLUSTER_SOFT = settings['servers'][SERVER]['cluster_soft']
+elif RMG_EXECUTION_TYPE == 'local':
+    SERVER = 'local'
+elif RMG_EXECUTION_TYPE == 'incore':
+    SERVER = 'incore'
+else:
+    raise ValueError(f'RMG execution type {RMG_EXECUTION_TYPE} is not supported.')
+
+
+class RMGAdapter(object):
+
+    def __init__(self,
+                 rmg: dict,
+                 t3: dict,
+                 iteration: int,
+                 paths: dict,
+                 logger: 'Logger',
+                 walltime: str = '00:00:00',
+                 cpus: int = 1,
+                 memory: int = 8,  # in GB. TODO: make this configurable
+                 max_iterations: int = 1,
+                 verbose: bool = False,
+                 t3_project_name: Optional[str] = None,
+                 rmg_execution_type: str = 'incore',
+                 restart_rmg: bool = False,
+                 server: Optional[str] = None,
+                 testing: bool = False,
+                 ):
+        self.rmg = rmg.copy()
+        self.t3 = t3.copy()
+        self.iteration = iteration
+        self.paths = paths
+        self.walltime = walltime
+        self.rmg_path = self.paths['RMG']
+        self.rmg_input_file_path = self.paths['RMG input']
+        self.max_cpus = CPUS if CPUS else cpus
+        self.max_memory = MEMORY if MEMORY else memory
+        self.t3_project_name = t3_project_name
+        self.max_iterations = max_iterations
+        self.rmg_execution_type = RMG_EXECUTION_TYPE or rmg_execution_type
+        if self.rmg_execution_type == 'queue':
+            self.max_job_time = settings['servers'][SERVER]['max_job_time']
+        self.server = server or SERVER
+        self.testing = testing
+        self.logger = logger
+        self.previous_job_status = None
+        self.time_running = 0
+        self.restart_rmg = restart_rmg
+
+        if not os.path.isdir(local_t3_path):
+            os.makedirs(local_t3_path)
+
+        self.files_to_upload = list()
+        self.folder_to_download = None
+        self.rmg_errors = list()
+        self.rmg_run_count = 0
+        self.cont_run_rmg = True
+        self.dict_of_custom_libraries = dict()
+        if self.rmg_execution_type == 'queue':
+            self.set_file_paths()
+            # Check whether any entry in the RMG database kinetics/thermo libraries is a path
+            # to a local directory, and if so, map its name to both its local path and its
+            # remote path on the server.
+            for library_key, library_value in self.rmg['database'].items():
+                if isinstance(library_value, list):
+                    for library_item in range(len(library_value)):
+                        lib_path = self.rmg['database'][library_key][library_item]
+                        if os.path.isdir(lib_path):
+                            lib_name = os.path.basename(lib_path)
+                            if lib_name not in self.dict_of_custom_libraries:
+                                self.dict_of_custom_libraries[lib_name] = dict()
+                            self.dict_of_custom_libraries[lib_name]['local'] = lib_path
+                            self.dict_of_custom_libraries[lib_name]['remote'] = os.path.join(self.remote_path, lib_name)
+                            # Re-point the database entry at the library's path on the remote server.
+                            self.rmg['database'][library_key][library_item] = os.path.join(self.remote_path, lib_name)
+
+    def run_rmg(self):
+        """
+        Run RMG incore, locally, or on a server queue, per the execution type.
+        """
+        if self.rmg_execution_type == 'incore':
+            self.execute_incore()
+        elif self.rmg_execution_type in ('local', 'queue'):
+            while self.cont_run_rmg:
+                self.set_cpu_and_mem()
+                self.set_file_paths()
+                self.set_files()
+
+                # TODO: Submission for the 'local' execution type (execute_local) is not wired up here yet.
+                if self.rmg_execution_type == 'queue':
+                    self.rmg_run_count += 1
+                    self.execute_queue()
+                # While the job is running, periodically check its status.
+                while self.job_status == 'running':
+                    # Wait for 5 minutes before checking again.
+                    time.sleep(300)
+                    self.time_running += 300
+                    self.determine_rmg_job_status()
+                    # Log the job status. Logging every 5 minutes would flood the log file with
+                    # the same message, so only log if the status changed or every 30 minutes.
+                    if self.job_status != self.previous_job_status or self.time_running % 1800 == 0:
+                        self.logger.info(f'RMG-{self.iteration}_iteration job status: {self.job_status}')
+                        self.previous_job_status = self.job_status
+
+                # Once the job is done, download the results.
+                if self.job_status == 'done':
+                    self.logger.info(f'RMG-{self.iteration}_iteration job status: {self.job_status}, downloading results...')
+                    self.download_files()
+
+                # Check for convergence or errors.
+                self.check_convergence()
+                self.handle_convergence()
+                # Time-stamp the local err file, if one exists.
+                err_path = os.path.join(self.local_rmg_path, 'err.txt')
+                if os.path.isfile(err_path):
+                    os.rename(err_path, os.path.join(self.local_rmg_path, f'err_{datetime.datetime.now().strftime("%b%d_%Y_%H:%M:%S")}.txt'))
+                self.rmg_errors.append(self.error)
+        else:
+            raise ValueError(f'RMG execution type {self.rmg_execution_type} is not supported.')
+
+        # An exception was encountered if and only if RMG did not converge.
+        self.rmg_exception_encountered = not self.rmg_converged
+
+    def write_rmg_input_file(self):
+        """
+        Write an RMG input file to the given file path.
+        Will create the directory if needed.
+ """ + rmg= self.rmg.copy() + rmg_input = '' + iteration = self.iteration - 1 # iteration is 1-indexed, convert to 0-indexed for list indexing + + # Database + database = rmg['database'] + # The following args types could be either str or list, detect str and format accordingly + if isinstance(database['kinetics_depositories'], str) and database['kinetics_depositories'][0] != "'": + database['kinetics_depositories'] = f"'{database['kinetics_depositories']}'" + if isinstance(database['kinetics_estimator'], str) and database['kinetics_estimator'][0] != "'": + database['kinetics_estimator'] = f"'{database['kinetics_estimator']}'" + if isinstance(database['kinetics_families'], str) and database['kinetics_families'][0] != "'": + database['kinetics_families'] = f"'{database['kinetics_families']}'" + database_template = """database( +thermoLibraries=${thermo_libraries}, +reactionLibraries=${kinetics_libraries}, +transportLibraries=${transport_libraries}, +seedMechanisms=${seed_mechanisms}, +kineticsDepositories=${kinetics_depositories}, +kineticsFamilies=${kinetics_families}, +kineticsEstimator=${kinetics_estimator}, +) +""" + + rmg_input += Template(database_template).render(**database) + # species + species = rmg['species'] + species_template = """ +species( + label='${label}', + reactive=${reactive}, + structure=${structure}, +) +""" + for spc in species: + if spc['adjlist'] is not None: + structure = f'adjacencyList("""' + structure += spc['adjlist'] + structure += '""")' + elif spc['smiles'] is not None: + structure = f"SMILES('{spc['smiles']}')" + elif spc['inchi'] is not None: + structure = f"InChI('{spc['inchi']}')" + else: + raise ValueError(f"A species must have either an adjlist, smiles, or inchi. Species {spc['label']} has none of these.") + rmg_input += Template(species_template).render(label=spc['label'], + reactive=spc['reactive'], + structure=structure) + if spc['seed_all_rads'] is not None: + species_to_process = get_rmg_species_from_a_species_dict(species_dict=spc, raise_error=False) + if species_to_process is not None: + radical_tuples = generate_radicals(species=species_to_process, + types=spc['seed_all_rads'], + ) + for radical_tuple in radical_tuples: + rmg_input += Template(species_template).render(label=radical_tuple[0], + reactive=True, + structure=f"SMILES('{radical_tuple[1]}')") + # reactors + reactors = rmg['reactors'] + gas_batch_constant_t_p_template = """ +simpleReactor( + temperature=${temperature}, + pressure=${pressure}, + initialMoleFractions={${concentrations()} }, + ${termination} + nSims=${conditions_per_iteration},${balance}${constant} +) +<%def name="concentrations()"> +% for spc in species_list: + % if isinstance(spc["concentration"], (int, float)): + '${spc["label"]}': ${spc["concentration"]}, + % endif + % if isinstance(spc["concentration"], (tuple, list)): + '${spc["label"]}': [${spc["concentration"][0]}, ${spc["concentration"][1]}], + % endif +% endfor + +""" + liquid_batch_constant_t_v_template = """ +liquidReactor( + temperature=${temperature}, + initialConcentrations={${concentrations()} }, + ${termination} + nSims=${conditions_per_iteration},${constant} +) +<%def name="concentrations()"> +% for spc in species_list: + % if isinstance(spc["concentration"], (int, float)): + '${spc["label"]}': (${spc["concentration"]}, 'mol/cm^3'), + % endif + % if isinstance(spc["concentration"], (tuple, list)): + '${spc["label"]}': [(${spc["concentration"][0]}, 'mol/cm^3'), (${spc["concentration"][1]}, 'mol/cm^3')], + % endif +% endfor + +""" + for reactor in 
+            if isinstance(reactor['T'], float):
+                temperature = f"({reactor['T']}, 'K')"
+            elif isinstance(reactor['T'], list):
+                temperature = [(t, 'K') for t in reactor['T']]
+            else:
+                raise ValueError(f"The reactor temperature must be a float or a list,\n"
+                                 f"got {reactor['T']} which is a {type(reactor['T'])}.")
+            if 'species_list' in reactor.keys():
+                # This is relevant when a simulate adapter breaks ranged reactors down to individual conditions.
+                species_list = reactor['species_list']
+            else:
+                # This is the base case when T3 generates an RMG input file for model generation.
+                species_list = [{'label': spc['label'], 'concentration': spc['concentration']} for spc in species
+                                if isinstance(spc['concentration'], (list, tuple))
+                                or (isinstance(spc['concentration'], (float, int)) and spc['concentration'] > 0)
+                                or spc['balance'] or not spc['reactive']]
+            species_list.sort(key=lambda spc: spc['concentration'][0] if isinstance(spc['concentration'], (tuple, list))
+                              else spc['concentration'], reverse=True)
+            termination = ''
+            if reactor['termination_conversion'] is not None:
+                termination += f"terminationConversion={reactor['termination_conversion']},"
+            if reactor['termination_time'] is not None:
+                termination += '\n    ' if termination else ''
+                termination += f"terminationTime={reactor['termination_time']},"
+            if reactor['termination_rate_ratio'] is not None:
+                termination += '\n    ' if termination else ''
+                termination += f"terminationRateRatio={reactor['termination_rate_ratio']},"
+            constant = ''
+            for spc in species:
+                if spc['constant']:
+                    if not constant:
+                        constant = '\n    constantSpecies=['
+                    constant += f"'{spc['label']}', "
+            constant += '],' if constant else ''
+            if reactor['type'] == 'gas batch constant T P':
+                if isinstance(reactor['P'], float):
+                    pressure = f"({reactor['P']}, 'bar')"
+                elif isinstance(reactor['P'], list):
+                    pressure = [(p, 'bar') for p in reactor['P']]
+                else:
+                    raise ValueError(f"The reactor pressure must be a float or a list,\n"
+                                     f"got {reactor['P']} which is a {type(reactor['P'])}.")
+                balance = ''
+                for spc in species:
+                    if spc['balance']:
+                        balance = f"\n    balanceSpecies='{spc['label']}',"
+                        break
+                rmg_input += Template(gas_batch_constant_t_p_template).render(
+                    temperature=temperature,
+                    pressure=pressure,
+                    species_list=species_list,
+                    termination=termination,
+                    conditions_per_iteration=reactor['conditions_per_iteration'],
+                    balance=balance,
+                    constant=constant,
+                )
+            elif reactor['type'] == 'liquid batch constant T V':
+                rmg_input += Template(liquid_batch_constant_t_v_template).render(
+                    temperature=temperature,
+                    species_list=species_list,
+                    termination=termination,
+                    conditions_per_iteration=reactor['conditions_per_iteration'],
+                    constant=constant,
+                )
+
+        # Solvent
+        solvent_template = """solvation(solvent='${solvent}')
+
+"""
+        solvent = ''
+        for spc in species:
+            # The schema assures that there is only one species defined as a solvent.
+            # TODO: Assure that the requested solvent actually exists in the RMG database.
+            if spc['solvent']:
+                solvent = spc['label']
+                break
+
+        if solvent:
+            rmg_input += Template(solvent_template).render(solvent=solvent)
+
+        # Model
+        model_input = rmg['model']
+        model_template = """model(
+    toleranceMoveToCore=${tol_move_to_core},
+    toleranceInterruptSimulation=${tolerance_interrupt_simulation},${args}
+)
+"""
+        model = dict()
+        model['tol_move_to_core'] = model_input['core_tolerance'][iteration] \
+            if len(model_input['core_tolerance']) >= iteration + 1 else model_input['core_tolerance'][-1]
+        model['tolerance_interrupt_simulation'] = model_input['tolerance_interrupt_simulation'][iteration] \
+            if len(model_input['tolerance_interrupt_simulation']) >= iteration + 1 \
+            else model_input['tolerance_interrupt_simulation'][-1]
+        model_keys_to_skip = ['core_tolerance', 'tolerance_interrupt_simulation', 'atol', 'rtol', 'sens_atol', 'sens_rtol']
+        args = ''
+        for key, value in model_input.items():
+            if key not in model_keys_to_skip and value is not None:
+                args += f"\n    {to_camel_case(uv=key)}={value},"
+        model['args'] = args
+        rmg_input += Template(model_template).render(**model)
+
+        # Simulator
+        if self.t3['sensitivity'] is not None and self.t3['sensitivity']['adapter'] == 'RMGConstantTP':
+            simulator_template = """\nsimulator(atol=${atol}, rtol=${rtol}, sens_atol=${sens_atol}, sens_rtol=${sens_rtol})\n"""
+            rmg_input += Template(simulator_template).render(atol=model_input['atol'],
+                                                             rtol=model_input['rtol'],
+                                                             sens_atol=self.t3['sensitivity']['atol'],
+                                                             sens_rtol=self.t3['sensitivity']['rtol'],
+                                                             )
+        else:
+            simulator_template = """\nsimulator(atol=${atol}, rtol=${rtol})\n"""
+            rmg_input += Template(simulator_template).render(atol=model_input['atol'],
+                                                             rtol=model_input['rtol'],
+                                                             )
+
+        # PressureDependence
+        if rmg['pdep'] is not None:
+            pdep = rmg['pdep'].copy()
+            pdep_template = """
+pressureDependence(
+    method='${method}',
+    maximumGrainSize=(${max_grain_size}, 'kJ/mol'),
+    minimumNumberOfGrains=${max_number_of_grains},
+    temperatures=(${T_min}, ${T_max}, 'K', ${T_count}),
+    pressures=(${P_min}, ${P_max}, 'bar', ${P_count}),
+    interpolation=${interpolation},
+    maximumAtoms=${max_atoms},
+)
+"""
+            pdep['method'] = METHOD_MAP[pdep['method']] if pdep['method'] not in METHOD_MAP.values() else pdep['method']
+            pdep['T_min'], pdep['T_max'], pdep['T_count'] = pdep['T']
+            pdep['P_min'], pdep['P_max'], pdep['P_count'] = pdep['P']
+            del pdep['T']
+            del pdep['P']
+            if pdep['interpolation'] == 'PDepArrhenius':
+                pdep['interpolation'] = ('PDepArrhenius',)
+            else:
+                pdep['interpolation'] = ('Chebyshev', pdep['T_basis_set'], pdep['P_basis_set'])
+            del pdep['T_basis_set']
+            del pdep['P_basis_set']
+            rmg_input += Template(pdep_template).render(**pdep)
+
+        # Options
+        options = rmg['options']
+        if options is not None:
+            options_template = """
+options(
+    name='${seed_name}',
+    generateSeedEachIteration=${generate_seed_each_iteration},
+    saveSeedToDatabase=${save_seed_to_database},
+    units='${units}',
+    generateOutputHTML=${save_html},
+    generatePlots=${generate_plots},
+    saveSimulationProfiles=${save_simulation_profiles},
+    verboseComments=${verbose_comments},
+    saveEdgeSpecies=${save_edge},
+    keepIrreversible=${keep_irreversible},
+    trimolecularProductReversible=${trimolecular_product_reversible},
+    wallTime='${walltime}',
+    saveSeedModulus=${save_seed_modulus},
+)
+"""
+            options['walltime'] = self.walltime
+            rmg_input += Template(options_template).render(**options)
+
+        # generatedSpeciesConstraints
+        species_constraints = rmg['species_constraints']
+        if species_constraints is not None:
+            species_constraints_template = """
+generatedSpeciesConstraints(
+    allowed=${allowed},
+    maximumCarbonAtoms=${max_C_atoms},
+    maximumOxygenAtoms=${max_O_atoms},
+    maximumNitrogenAtoms=${max_N_atoms},
+    maximumSiliconAtoms=${max_Si_atoms},
+    maximumSulfurAtoms=${max_S_atoms},
+    maximumHeavyAtoms=${max_heavy_atoms},
+    maximumRadicalElectrons=${max_radical_electrons},
+    maximumSingletCarbenes=${max_singlet_carbenes},
+    maximumCarbeneRadicals=${max_carbene_radicals},
+    allowSingletO2=${allow_singlet_O2},
+)
+"""
+            rmg_input += Template(species_constraints_template).render(**species_constraints)
+
+        if not os.path.isdir(os.path.dirname(self.rmg_input_file_path)):
+            os.makedirs(os.path.dirname(self.rmg_input_file_path))
+        with open(self.rmg_input_file_path, 'w') as f:
+            f.writelines(rmg_input)
+
+    def set_cpu_and_mem(self):
+        """
+        Set the CPU count and memory based on the cluster software.
+        This is not an abstract method and should not be overwritten.
+        """
+        if self.max_memory < rmg_memory:
+            self.logger.warning(f'The maximum memory of the server is {self.max_memory} GB, which is less than the '
+                                f'RMG memory {rmg_memory} GB set by the user. The RMG memory will be set to {self.max_memory} GB.')
+            total_submit_script_memory = self.max_memory
+        else:
+            total_submit_script_memory = rmg_memory
+
+        # Determine whether the user provided GB or MB.
+        if self.max_memory > 1E3:
+            self.max_memory /= 1E3
+        # Convert total_submit_script_memory to MB.
+        total_submit_script_memory *= 1E3
+        # Determine the amount of memory in the submit script based on the cluster's job scheduling system.
+        cluster_software = CLUSTER_SOFT.lower() if CLUSTER_SOFT is not None else None
+        if cluster_software in ['oge', 'sge', 'htcondor']:
+            # In SGE, "-l h_vmem=5000M" specifies the memory for all cores to be 5000 MB.
+            self.submit_script_memory = math.ceil(total_submit_script_memory)  # in MB
+        elif cluster_software in ['pbs']:
+            # In PBS, "#PBS -l select=1:ncpus=8:mem=12000000" specifies the memory for all cores to be 12 MB.
+            self.submit_script_memory = math.ceil(total_submit_script_memory) * 1E6  # in Bytes
+        elif cluster_software in ['slurm']:
+            # In Slurm, "#SBATCH --mem=2000" specifies the memory to be 2000 MB.
+            self.submit_script_memory = math.ceil(total_submit_script_memory)  # in MB
+        self.set_input_file_memory()
+
+    def set_files(self) -> None:
+        """
+        Set files to be uploaded and downloaded. Writes the files if needed.
+        Modifies the self.files_to_upload and self.files_to_download attributes.
+
+        self.files_to_download is a list of remote paths.
+
+        self.files_to_upload is a list of dictionaries, each with the following keys:
+        ``'name'``, ``'source'``, ``'make_x'``, ``'local'``, and ``'remote'``.
+        If ``'source'`` = ``'path'``, then the value in ``'local'`` is treated as a file path.
+        Else if ``'source'`` = ``'input_files'``, then the value in ``'local'`` will be taken
+        from the respective entry in inputs.py.
+        If ``'make_x'`` is ``True``, the file will be made executable.
+        """
+        # 1. ** Upload **
+        # 1.1. Submit script
+        if self.rmg_execution_type != 'incore':
+            # We need a submit script (submitted locally or via SSH).
+            self.write_submit_script()
+            self.files_to_upload.append(self.get_file_property_dictionary(
+                file_name=submit_filenames[CLUSTER_SOFT]))
+        # 1.2. RMG input file
+        self.write_rmg_input_file()
+        # 1.3. Custom libraries
+        # Upload the custom libraries if they exist and were not already uploaded.
+        if self.dict_of_custom_libraries:
+            uploaded = [file_dict['local'] for file_dict in self.files_to_upload]
+            for lib_name, lib_paths in self.dict_of_custom_libraries.items():
+                if lib_paths['local'] not in uploaded:
+                    self.files_to_upload.append(self.get_file_property_dictionary(
+                        file_name=lib_name,
+                        local=lib_paths['local'],
+                        remote=lib_paths['remote']))
+
+        # If this is a restart, we need to upload the restart file.
+        if self.restart_rmg:
+            restart_string = "restartFromSeed(path='seed')"
+            with open(self.rmg_input_file_path, 'r') as f:
+                content = f.read()
+            seed_path = os.path.join(self.local_rmg_path, 'seed')
+            if restart_string not in content and os.path.isdir(seed_path) and os.listdir(seed_path):
+                if os.path.isfile(os.path.join(self.local_rmg_path, 'restart_from_seed.py')):
+                    os.rename(src=os.path.join(self.local_rmg_path, 'input.py'),
+                              dst=os.path.join(self.local_rmg_path, 'input.py.old'))
+                    os.rename(src=os.path.join(self.local_rmg_path, 'restart_from_seed.py'),
+                              dst=os.path.join(self.local_rmg_path, 'input.py'))
+                elif os.path.isfile(os.path.join(self.local_rmg_path, 'input.py')):
+                    with open(os.path.join(self.local_rmg_path, 'input.py'), 'r') as f:
+                        content = f.read()
+                    with open(os.path.join(self.local_rmg_path, 'input.py'), 'w') as f:
+                        f.write(restart_string + '\n\n' + content)
+
+        self.files_to_upload.append(self.get_file_property_dictionary(file_name='input.py'))
+
+        # 2. ** Download **
+        # 2.1. RMG output folder
+        self.folder_to_download = self.remote_path
+
+    def set_additional_file_paths(self) -> None:
+        """
+        Set additional file paths to be uploaded and downloaded.
+        Modifies the self.additional_file_paths attribute.
+        """
+        pass
+
+    def set_input_file_memory(self) -> None:
+        """
+        Set the self.input_file_memory attribute.
+        """
+        pass
+
+    def execute_incore(self) -> None:
+        """
+        Execute the job incore.
+        """
+        pass
+
+    def execute_queue(self) -> None:
+        """
+        Execute the job in the queue.
+        """
+        self.upload_files()
+        with SSHClient(SERVER) as ssh:
+            self.job_status, self.job_id = ssh.submit_job(remote_path=self.remote_path)
+        self.logger.info(f'Submitted job {self.job_id} to {SERVER}')
+
+    def execute_local(self) -> None:
+        """
+        Execute the job locally.
+        """
+        if SERVER == 'local':
+            self.job_status, self.job_id = self.submit_job()
+
+    def write_submit_script(self) -> None:
+        """
+        Write the submit script.
+        """
+        if SERVER is None:
+            return
+        if self.max_job_time < self.walltime:
+            self.walltime = self.max_job_time
+
+        try:
+            submit_script = submit_scripts['rmg'][settings['servers'][SERVER]['cluster_soft']].format(
+                name=f'{self.t3_project_name}_RMG_{self.iteration}',
+                max_job_time=self.max_job_time,
+                cpus=self.max_cpus,
+                memory=self.submit_script_memory,
+                max_iterations=f' -i {self.max_iterations}' if self.max_iterations else '',
+            )
+        except KeyError as e:
+            raise KeyError(f'Invalid key in submit script: {e}')
+
+        # Make sure the folder exists.
+        os.makedirs(self.rmg_path, exist_ok=True)
+        # Write the submit script to file.
+        with open(os.path.join(self.rmg_path, submit_filenames[CLUSTER_SOFT]), 'w') as f:
+            f.write(submit_script)
+
+    def set_file_paths(self) -> None:
+        """
+        Set local and remote file paths.
+ """ + + self.local_iteration_path = self.paths['iteration'] + self.local_rmg_path = self.paths['RMG'] + + + if SERVER != 'incore': + path = settings['servers'][SERVER].get('path','').lower() + path = os.path.join(path, settings['servers'][SERVER]['un']) if path else '' + self.remote_path = os.path.join(path, 'runs', 'T3_Projects', self.t3_project_name, f"iteration_{self.iteration}", 'RMG') + + # Get additional file paths - but I think we copy the whole folder of RMG from Remote to Local + self.set_additional_file_paths() + + def get_file_property_dictionary(self, + file_name: str, + local: str='', + remote: str='', + source: str='path', + make_x: bool=False) -> dict: + """ + Get a dictionary that represents a file to be uploaded or downloaded to/from a server via SSH. + + Args: + file_name (str): The file name. + local (str, optional): The full local path. + remote (str, optional): The full remote path. + source (str, optional): Either ``'path'`` to treat the ``'local'`` attribute as a file path, + or ``'input_files'`` to take the respective entry from inputs.py. + make_x (bool, optional): Whether to make the file executable, default: ``False``. + + Returns: + dict: A file representation. + """ + if not file_name: + raise ValueError('file_name must be specified') + if source not in ['path', 'input_files']: + raise ValueError(f'source must be either "path" or "input_files", got {source}') + local = local or os.path.join(self.local_rmg_path, file_name) + remote = remote or os.path.join(self.remote_path, file_name) if self.remote_path else None + return{'file_name': file_name, + 'local': local, + 'remote': remote, + 'source': source, + 'make_x': make_x} + + def upload_files(self) -> None: + if not self.testing: + if self.rmg_execution_type != 'incore' and self.server != 'local': + # If the job execution type is incore, then no need to upload any files. + # Also, even if the job is submitted to the que, no need to upload files if the server is local. + with SSHClient(self.server) as ssh: + for up_file in self.files_to_upload: + self.logger.debug(f"Uploading {up_file['file_name']} source {up_file['source']} to {self.server}") + if up_file['source'] == 'path': + ssh.upload_file(remote_file_path=up_file['remote'], local_file_path=up_file['local']) + elif up_file['source'] == 'input_files': + ssh.upload_file(remote_file_path=up_file['remote'], file_string=up_file['local']) + else: + raise ValueError(f"Unclear file source for {up_file['file_name']}. Should either be 'path' or " + f"'input_files', got: {up_file['source']}") + if up_file['make_x']: + ssh.change_mode(mode='+x', file_name=up_file['file_name'], remote_path=self.remote_path) + else: + # running locally, just copy the check file, if exists, to the job folder + for up_file in self.files_to_upload: + if up_file['file_name'] == 'check.chk': + try: + shutil.copyfile(src=up_file['local'], dst=os.path.join(self.local_path, 'check.chk')) + except shutil.SameFileError: + pass + self.initial_time = datetime.datetime.now() + + def determine_rmg_job_status(self) -> None: + """ + Determine the RMG job status. + """ + if self.rmg_execution_type == 'incore': + self.job_status = 'running' + else: + with SSHClient(self.server) as ssh: + self.job_status = ssh.check_job_status(job_id=self.job_id) + + def download_files(self) -> None: + """ + Download the relevant files. + """ + if self.rmg_execution_type != 'incore' and self.server != 'local': + # If the job execution type is incore, then no need to download any files. 
+            # Also, even if the job is submitted to the queue, no need to download files if the server is local.
+            with SSHClient(self.server) as ssh:
+                ssh.download_folder(remote_folder_path=self.remote_path, local_folder_path=self.local_rmg_path)
+
+    def check_convergence(self) -> Tuple[bool, Optional[str]]:
+        """
+        Check whether RMG converged, and determine the error if it did not.
+        Sets the self.rmg_converged and self.error attributes, and also returns them.
+        """
+        self.rmg_converged, self.error = False, None
+        rmg_log_path = os.path.join(self.local_rmg_path, 'RMG.log')
+        rmg_err_path = os.path.join(self.local_rmg_path, 'err.txt')
+        if os.path.isfile(rmg_log_path):
+            with open(rmg_log_path, 'r') as f:
+                # Read the lines of the log file in reverse order.
+                lines = f.readlines()[::-1]
+            for line in lines:
+                if 'MODEL GENERATION COMPLETED' in line:
+                    self.rmg_converged = True
+                    break
+        if not self.rmg_converged and os.path.isfile(rmg_err_path):
+            with open(rmg_err_path, 'r') as f:
+                lines = f.readlines()[::-1]
+            for line in lines:
+                if 'Error' in line:
+                    self.error = line.strip()
+                    break
+        return self.rmg_converged, self.error
+
+    def handle_convergence(self) -> None:
+        """
+        Decide whether to run RMG again, and whether to restart it from a seed mechanism.
+        """
+        if not self.rmg_converged:
+            if self.error is not None:
+                self.logger.error(f'RMG job {self.job_id} failed with error: {self.error}')
+            # Check if memory was the cause of the failure - TODO: this is not working
+            # new_memory = self.get_new_memory_rmg_run()
+
+        # Run RMG again unless it converged, the maximal number of runs per iteration was reached,
+        # or the same error was raised twice in a row.
+        same_error_twice = len(self.rmg_errors) >= 2 and self.error is not None \
+            and self.error == self.rmg_errors[-2]
+        self.cont_run_rmg = not self.rmg_converged \
+            and self.rmg_run_count < MAX_RMG_RUNS_PER_ITERATION \
+            and not same_error_twice
+        self.restart_rmg = False if self.error is not None and 'Could not find one or more of the required files/directories ' \
+                                                               'for restarting from a seed mechanism' in self.error else True
+
+    def get_new_memory_rmg_run(self) -> None:
+        """
+        Get a new memory for the RMG job.
+        """
+        pass
\ No newline at end of file
diff --git a/t3/runners/rmg_runner.py b/t3/runners/rmg_runner.py
index ca2feac79..287dbdd2c 100644
--- a/t3/runners/rmg_runner.py
+++ b/t3/runners/rmg_runner.py
@@ -248,7 +248,7 @@ def run_rmg_in_local_queue(project_directory: str,
     return job_id
 
-def rmg_runner(rmg_input_file_path: str,
+def rmg_runner(rmg_input_file_path: str,
                job_log_path: str,
                logger: 'Logger',
                memory: Optional[int] = None,
diff --git a/t3/settings/t3_settings.py b/t3/settings/t3_settings.py
index a4f9b4c0a..99003ed1c 100644
--- a/t3/settings/t3_settings.py
+++ b/t3/settings/t3_settings.py
@@ -10,12 +10,12 @@
 # or 'local', i.e., to be submitted to the server queue if running on a server.
 # If running on a local server, ARC's settings for ``local`` will be used.
 execution_type = {
-    'rmg': 'local',
+    'rmg': 'queue',
     'arc': 'incore',
 }
 
 servers = {
-    'local': {
+    'azure': {
         'cluster_soft': 'PBS',
         'cpus': 16,
         'max mem': 40,  # GB
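A note on the `servers` settings: `rmg_adapter.py` reads `settings['servers'][SERVER]['cpus']`, `['memory']`, `['cluster_soft']`, and `['max_job_time']`, and `t3/utils/ssh.py` (added below) additionally reads `['address']`, `['un']`, and `['key']`, while the hunk above only shows `cluster_soft`, `cpus`, and a `'max mem'` key. A fully-populated entry would therefore need to look roughly like the following sketch; all values, and the optional `path` key used by `set_file_paths()`, are illustrative assumptions and not values taken from this change.

servers = {
    'azure': {
        'cluster_soft': 'PBS',                 # queue software; the submit scripts below define 'Slurm' and 'PBS'
        'address': 'cluster.example.com',      # assumed server hostname
        'un': 'azureuser',                     # assumed username
        'key': '/home/user/.ssh/id_rsa',       # assumed path to the SSH private key
        'cpus': 16,
        'memory': 40,                          # GB; note that the hunk above names this key 'max mem'
        'max_job_time': '120:00:00',           # assumed walltime cap
        'path': '/storage',                    # optional; prefix for the remote run directory
    },
}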
diff --git a/t3/settings/t3_submit.py b/t3/settings/t3_submit.py
index 9f17499c7..eade57692 100644
--- a/t3/settings/t3_submit.py
+++ b/t3/settings/t3_submit.py
@@ -3,7 +3,7 @@
 """
 
 # Submission scripts stored as a dictionary with software as the primary key.
-submit_scripts = {
+#submit_scripts = {
 #     'rmg': """#!/bin/bash -l
 # #SBATCH -J {name}
 # #SBATCH -t 05-00:00:00
@@ -57,11 +57,41 @@
 #     touch final_time
 #
 # """,
-    'rmg': """#!/bin/bash -l
+submit_scripts = {
+    'rmg': {
+        'Slurm': """#!/bin/bash -l
+#SBATCH -p hpc
+#SBATCH -J {name}
+#SBATCH -N 1
+#SBATCH --cpus-per-task={cpus}
+#SBATCH --mem={memory}
+#SBATCH -o out.txt
+#SBATCH -e err.txt
+
+# Source
+source /home/azureuser/.bashrc
+source /etc/profile.d/00-aliases.sh
+
+# Export
+export PATH=$PATH:/home/azureuser/Code/RMG-Py
+export PYTHONPATH=$PYTHONPATH:/home/azureuser/Code/RMG-Py
+
+# It appears this is needed to keep the Julia files writable...
+sudo chmod 0777 -R /opt/mambaforge
+
+conda activate rmg_env
+
+touch initial_time
+
+python-jl /home/azureuser/Code/RMG-Py/rmg.py {max_iterations} -n {cpus} input.py
+
+touch final_time
+
+""",
+        'PBS': """#!/bin/bash -l
 #PBS -N {name}
-#PBS -q zeus_long_q
-#PBS -l walltime=168:00:00
+#PBS -q zeus_combined_q
 #PBS -l select=1:ncpus={cpus}
 #PBS -o out.txt
 #PBS -e err.txt
@@ -73,9 +103,10 @@
 
 touch initial_time
 
-python-jl $rmgpy_path/rmg.py -n {cpus} input.py{max_iterations}
+python-jl ~/Code/RMG-Py/rmg.py -n {cpus} input.py {max_iterations}
 
 touch final_time
 
 """,
-}
+    }
+}
\ No newline at end of file
diff --git a/t3/utils/ssh.py b/t3/utils/ssh.py
new file mode 100644
index 000000000..1081cc0db
--- /dev/null
+++ b/t3/utils/ssh.py
@@ -0,0 +1,703 @@
+"""
+A module for SSHing into servers.
+Used for giving commands, uploading, and downloading files.
+
+Todo:
+    * delete scratch files of a failed job: ssh nodeXX; rm scratch/dhdhdhd/job_number
+"""
+
+import datetime
+import logging
+import os
+import stat
+import time
+from typing import Any, Callable, List, Optional, Tuple, Union
+
+import paramiko
+
+from arc.common import get_logger
+from arc.exceptions import InputError, ServerError
+from t3.imports import settings
+
+
+logger = get_logger()
+
+
+check_status_command, delete_command, list_available_nodes_command, servers, submit_command, submit_filenames = \
+    settings['check_status_command'], settings['delete_command'], settings['list_available_nodes_command'], \
+    settings['servers'], settings['submit_command'], settings['submit_filenames']
+
+
+def check_connections(function: Callable[..., Any]) -> Callable[..., Any]:
+    """
+    A decorator designed for ``SSHClient`` to check the SSH connection before
+    calling a method. It first checks if ``self._ssh`` is available in an
+    SSHClient instance, and then sends an ``ls`` command to make sure the
+    connection is still alive. If the connection is bad, this decorator
+    reconnects the SSH channel to avoid connection-related errors when
+    executing the method.
+    """
+    def decorator(*args, **kwargs) -> Any:
+        self = args[0]
+        if self._ssh is None:  # not sure if some status may cause False
+            self._sftp, self._ssh = self._connect()
+        # Test the connection, reference:
+        # https://stackoverflow.com/questions/20147902/how-to-know-if-a-paramiko-ssh-channel-is-disconnected
+        # According to the author, there may be no better way.
+        try:
+            self._ssh.exec_command('ls')
+        except Exception as e:
+            logger.debug(f'The connection is no longer valid. {e}')
+            self.connect()
+        return function(*args, **kwargs)
+    return decorator
+
+
+class SSHClient(object):
+    """
+    This is a class for communicating with remote servers via SSH.
+
+    Args:
+        server (str): The server name as specified in ARC's settings file under ``servers`` as a key.
+
+    Attributes:
+        server (str): The server name as specified in ARC's settings file under ``servers`` as a key.
+        address (str): The server's address.
+        un (str): The username to use on the server.
+        key (str): A path to a file containing the RSA SSH private key to the server.
+        _ssh (paramiko.SSHClient): A high-level representation of a session with an SSH server.
+        _sftp (paramiko.sftp_client.SFTPClient): SFTP client used to perform remote file operations.
+    """
+    def __init__(self, server: str = '') -> None:
+        if server == '':
+            raise ValueError('A server name must be specified')
+        if server not in servers.keys():
+            raise ValueError(f'Server name "{server}" is invalid. Currently defined servers are: {list(servers.keys())}')
+        self.server = server
+        self.address = servers[server]['address']
+        self.un = servers[server]['un']
+        self.key = servers[server]['key']
+        self._sftp = None
+        self._ssh = None
+        logging.getLogger('paramiko').setLevel(logging.WARNING)
+
+    def __enter__(self) -> 'SSHClient':
+        self.connect()
+        return self
+
+    def __exit__(self, exc_type, exc_value, exc_traceback) -> None:
+        self.close()
+
+    @check_connections
+    def _send_command_to_server(self,
+                                command: Union[str, list],
+                                remote_path: str = '',
+                                ) -> Tuple[list, list]:
+        """
+        A wrapper for exec_command in paramiko.SSHClient. Send commands to the server.
+
+        Args:
+            command (Union[str, list]): A string or a list of string commands to send.
+            remote_path (Optional[str]): The directory path at which the command will be executed.
+
+        Returns: Tuple[list, list]:
+            - A list of lines of the standard output stream.
+            - A list of lines of the standard error stream.
+        """
+        if isinstance(command, list):
+            command = '; '.join(command)
+        if remote_path != '':
+            # Execute the command in the remote_path directory.
+            # Check that the remote path exists, otherwise the command is invalid
+            # and could even yield different behaviors.
+            # Make sure to change the directory back after the command is executed.
+            if self._check_dir_exists(remote_path):
+                command = f'cd "{remote_path}"; {command}; cd '
+            else:
+                raise InputError(f'Cannot execute command at the given remote_path ({remote_path})')
+        try:
+            _, stdout, stderr = self._ssh.exec_command(command)
+        except Exception as e:  # SSHException: Timeout opening channel.
+            logger.debug(f'ssh timed out on the first trial. Got: {e}')
+            try:  # Try again.
+                _, stdout, stderr = self._ssh.exec_command(command)
+            except Exception as e:
+                logger.debug(f'ssh timed out after two trials. Got: {e}')
+                return ['', ], ['ssh timed-out after two trials', ]
+        stdout = stdout.readlines()
+        stderr = stderr.readlines()
+        return stdout, stderr
+
+    def upload_file(self,
+                    remote_file_path: str,
+                    local_file_path: str = '',
+                    file_string: str = '',
+                    ) -> None:
+        """
+        Upload a local file or contents from a string to the remote server.
+
+        Args:
+            remote_file_path (str): The path to write into on the remote server.
+            local_file_path (Optional[str]): The local file path to be copied to the remote location.
+            file_string (Optional[str]): The file content to be copied and saved as the remote file.
+
+        Raises:
+            InputError: If neither `local_file_path` nor `file_string` is specified,
+                or if `local_file_path` does not exist.
+            ServerError: If the file cannot be uploaded within the maximal number of trials.
+        """
+        if not local_file_path and not file_string:
+            raise InputError('Cannot upload file to server. Either `file_string` or `local_file_path`'
+                             ' must be specified')
+        if local_file_path and not os.path.isfile(local_file_path) and not os.path.isdir(local_file_path):
+            raise InputError(f'Cannot upload a non-existing file. '
+                             f'Check why the file in path {local_file_path} is missing.')
+
+        # If the directory does not exist, _upload_file cannot create a file based on the given path.
+        if os.path.isdir(local_file_path):
+            remote_dir_path = remote_file_path
+        else:
+            remote_dir_path = os.path.dirname(remote_file_path)
+        if not self._check_dir_exists(remote_dir_path):
+            self._create_dir(remote_dir_path)
+
+        try:
+            if file_string:
+                with self._sftp.open(remote_file_path, 'w') as f_remote:
+                    f_remote.write(file_string)
+            elif os.path.isdir(local_file_path):
+                # Upload every file in the directory (flat, into remote_dir_path).
+                for root, dirs, files in os.walk(local_file_path):
+                    for file in files:
+                        local_fp = os.path.join(root, file)
+                        remote_fp = os.path.join(remote_dir_path, file)
+                        self._sftp.put(localpath=local_fp,
+                                       remotepath=remote_fp)
+            else:
+                self._sftp.put(localpath=local_file_path,
+                               remotepath=remote_file_path)
+        except IOError:
+            logger.debug(f'Could not upload file {local_file_path} to {self.server}!')
+            raise ServerError(f'Could not write file {remote_file_path} on {self.server}. ')
+
+    def download_file(self,
+                      remote_file_path: str,
+                      local_file_path: str,
+                      ) -> None:
+        """
+        Download a file from the server.
+
+        Args:
+            remote_file_path (str): The remote path to be downloaded from.
+            local_file_path (str): The local path to be downloaded to.
+
+        Raises:
+            ServerError: If the file cannot be downloaded within the maximal number of trials.
+        """
+        if not self._check_file_exists(remote_file_path):
+            # Check if the file exists.
+            # This check doesn't have a real impact for now (to avoid screwing up ESS troubleshooting),
+            # but it introduces an opportunity for better troubleshooting.
+            # The current behavior is that if the remote path does not exist,
+            # an empty file will be created at the local path.
+            logger.debug(f'{remote_file_path} does not exist on {self.server}.')
+        try:
+            self._sftp.get(remotepath=remote_file_path,
+                           localpath=local_file_path)
+        except IOError:
+            logger.warning(f'Got an IOError when trying to download file '
+                           f'{remote_file_path} from {self.server}')
+
+    def download_folder(self,
+                        remote_folder_path: str,
+                        local_folder_path: str,
+                        ) -> None:
+        """
+        Recursively download a folder from the server.
+
+        Args:
+            remote_folder_path (str): The remote folder path to be downloaded from.
+            local_folder_path (str): The local folder path to be downloaded to.
+        """
+        if not self._check_dir_exists(remote_folder_path):
+            logger.debug(f'{remote_folder_path} does not exist on {self.server}.')
+        try:
+            self._sftp.chdir(remote_folder_path)
+            items = self._sftp.listdir_attr()
+
+            # Count the number of files to download for progress logging.
+            total_files = sum(1 for item in items if stat.S_ISREG(item.st_mode))
+            downloaded_files = 0
+
+            for item in items:
+                # Get the remote item's name and full path.
+                filename = item.filename
+                remote_filepath = remote_folder_path + '/' + filename
+                local_filepath = os.path.join(local_folder_path, filename)
+
+                # Download files.
+                # stat.S_ISREG(item.st_mode) is True if the item is a file (not a folder).
+                if stat.S_ISREG(item.st_mode):
+                    self._sftp.get(remote_filepath, local_filepath)
+                    downloaded_files += 1
+                    # Log the progress roughly every 10% of the files.
+                    if downloaded_files % max(1, total_files // 10) == 0:
+                        progress = downloaded_files / total_files * 100
+                        logger.info(f'Downloaded {progress:.0f}% of the files from {remote_folder_path}')
+
+                # Recursively download folders.
+                # stat.S_ISDIR(item.st_mode) is True if the item is a folder.
+                elif stat.S_ISDIR(item.st_mode):
+                    # Create the folder in the local path.
+                    os.makedirs(local_filepath, exist_ok=True)
+                    # Recursively download the folder items.
+                    self.download_folder(remote_filepath, local_filepath)
+        except IOError:
+            logger.warning(f'Got an IOError when trying to download folder '
+                           f'{remote_folder_path} from {self.server}')
+
+    @check_connections
+    def read_remote_file(self, remote_file_path: str) -> list:
+        """
+        Read a remote file.
+
+        Args:
+            remote_file_path (str): The remote path to be read.
+
+        Returns: list
+            A list of lines read from the file.
+        """
+        with self._sftp.open(remote_file_path, 'r') as f_remote:
+            content = f_remote.readlines()
+        return content
+
+    def check_job_status(self, job_id: int) -> str:
+        """
+        Check a job's status.
+
+        Args:
+            job_id (int): The job's ID.
+
+        Returns: str
+            Possible statuses: `before_submission`, `running`, `errored on node xx`,
+            `done`, and `errored: ...`
+        """
+        cmd = check_status_command[servers[self.server]['cluster_soft']]
+        stdout, stderr = self._send_command_to_server(cmd)
+        # Status line formats:
+        # OGE: '540420 0.45326 xq1340b user_name r 10/26/2018 11:08:30 long1@node18.cluster'
+        # SLURM: '14428 debug xq1371m2 user_name R 50-04:04:46 1 node06'
+        if stderr:
+            logger.info('\n\n')
+            logger.error(f'Could not check the status of job {job_id} due to {stderr}')
+            return f'errored: {stderr}'
+        return check_job_status_in_stdout(job_id=job_id, stdout=stdout, server=self.server)
+
+    def delete_job(self, job_id: Union[int, str]) -> None:
+        """
+        Delete a running job.
+
+        Args:
+            job_id (Union[int, str]): The job's ID.
+        """
+        cmd = f"{delete_command[servers[self.server]['cluster_soft']]} {job_id}"
+        self._send_command_to_server(cmd)
+
+    def delete_jobs(self,
+                    jobs: Optional[List[Union[str, int]]] = None,
+                    ) -> None:
+        """
+        Delete all of the jobs on a specific server.
+
+        Args:
+            jobs (List[Union[str, int]], optional): Specific ARC job IDs to delete.
+        """
+        jobs_message = f'{len(jobs)}' if jobs is not None else 'all'
+        print(f'\nDeleting {jobs_message} ARC jobs from {self.server}...')
+
+        running_job_ids = self.check_running_jobs_ids()
+        for job_id in running_job_ids:
+            if jobs is None or str(job_id) in jobs:
+                self.delete_job(job_id)
+                print(f'deleted job {job_id}')
+
+    def check_running_jobs_ids(self) -> list:
+        """
+        Check all jobs submitted by the user on a server.
+
+        Returns: list
+            A list of job IDs.
+ """ + if servers[self.server]['cluster_soft'].lower() not in ['slurm', 'oge', 'sge', 'pbs', 'htcondor']: + raise ValueError(f"Server cluster software {servers['local']['cluster_soft']} is not supported.") + running_job_ids = list() + cmd = check_status_command[servers[self.server]['cluster_soft']] + stdout = self._send_command_to_server(cmd)[0] + i_dict = {'slurm': 0, 'oge': 1, 'sge': 1, 'pbs': 4, 'htcondor': -1} + split_by_dict = {'slurm': ' ', 'oge': ' ', 'sge': ' ', 'pbs': '.', 'htcondor': ' '} + cluster_soft = servers[self.server]['cluster_soft'].lower() + for i, status_line in enumerate(stdout): + if i > i_dict[cluster_soft]: + job_id = status_line.lstrip().split(split_by_dict[cluster_soft])[0] + job_id = job_id.split('.')[0] if '.' in job_id else job_id + running_job_ids.append(job_id) + return running_job_ids + + def submit_job(self, remote_path: str, + recursion: bool = False, + ) -> Tuple[Optional[str], Optional[str]]: + """ + Submit a job to the server. + + Args: + remote_path (str): The remote path contains the input file and the submission script. + recursion (bool, optional): Whether this call is within a recursion. + + Returns: Tuple[str, int] + - A string indicate the status of job submission. + Either `errored` or `submitted`. + - The job ID of the submitted job. + """ + job_status = '' + job_id = 0 + cluster_soft = servers[self.server]['cluster_soft'] + cmd = f'{submit_command[cluster_soft]} {submit_filenames[cluster_soft]}' + stdout, stderr = self._send_command_to_server(cmd, remote_path) + if len(stderr) > 0 or len(stdout) == 0: + logger.warning(f'Got stderr when submitting job:\n{stderr}') + job_status = 'errored' + for line in stderr: + if 'Requested node configuration is not available' in line: + logger.warning('User may be requesting more resources than are available. Please check server ' + 'settings, such as cpus and memory, in ARC/arc/settings/settings.py') + if cluster_soft.lower() == 'slurm' and 'AssocMaxSubmitJobLimit' in line: + logger.warning(f'Max number of submitted jobs was reached, sleeping...') + time.sleep(5 * 60) + self.submit_job(remote_path=remote_path, recursion=True) + if recursion: + return None, None + elif cluster_soft.lower() in ['oge', 'sge'] and stdout and 'submitted' in stdout[0].lower(): + job_id = stdout[0].split()[2] + elif cluster_soft.lower() == 'slurm' and stdout and 'submitted' in stdout[0].lower(): + job_id = stdout[0].split()[3] + elif cluster_soft.lower() == 'pbs': + job_id = stdout[0].split('.')[0] + elif cluster_soft.lower() == 'htcondor' and stdout and 'submitting' in stdout[0].lower(): + # Submitting job(s). + # 1 job(s) submitted to cluster 443069. + if len(stdout) and len(stdout[1].split()) and len(stdout[1].split()[-1].split('.')): + job_id = stdout[1].split()[-1][:-1] + else: + raise ValueError(f'Unrecognized cluster software: {cluster_soft}') + job_status = 'running' if job_id else job_status + return job_status, job_id + + def connect(self) -> None: + """ + A modulator function for _connect(). Connect to the server. + + Raises: + ServerError: Cannot connect to the server with maximum times to try + """ + times_tried = 0 + max_times_to_try = 1440 # continue trying for 24 hrs (24 hr * 60 min/hr)... + interval = 60 # wait 60 sec between trials + while times_tried < max_times_to_try: + times_tried += 1 + try: + self._sftp, self._ssh = self._connect() + except Exception as e: + if not times_tried % 10: + logger.info(f'Tried connecting to {self.server} {times_tried} times with no success...' 
+                                f'\nGot: {e}')
+                else:
+                    print(f'Tried connecting to {self.server} {times_tried} times with no success...'
+                          f'\nGot: {e}')
+            else:
+                logger.debug(f'Successfully connected to {self.server} on trial {times_tried}.')
+                return
+            time.sleep(interval)
+        raise ServerError(f'Could not connect to server {self.server} even after {times_tried} trials.')
+
+    def _connect(self) -> Tuple[paramiko.sftp_client.SFTPClient, paramiko.SSHClient]:
+        """
+        Connect via paramiko, and open an SSH session as well as an SFTP session.
+
+        Returns: Tuple[paramiko.sftp_client.SFTPClient, paramiko.SSHClient]
+            - An SFTP client used to perform remote file operations.
+            - A high-level representation of a session with an SSH server.
+        """
+        ssh = paramiko.SSHClient()
+        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+        ssh.load_system_host_keys()
+        try:
+            # If the server accepts the connection but the SSH daemon doesn't respond in
+            # 15 seconds (the default in paramiko) due to network congestion, faulty switches,
+            # etc., a common solution is enlarging the timeout variable.
+            ssh.connect(hostname=self.address, username=self.un, banner_timeout=200, key_filename=self.key)
+        except paramiko.ssh_exception.SSHException:
+            # This sometimes gives "SSHException: Error reading SSH protocol banner[Error 104] Connection reset by peer".
+            # Try again:
+            ssh.connect(hostname=self.address, username=self.un, banner_timeout=200, key_filename=self.key)
+        sftp = ssh.open_sftp()
+        return sftp, ssh
+
+    def close(self) -> None:
+        """
+        Close the connections to the paramiko SSHClient and SFTPClient.
+        """
+        if self._sftp is not None:
+            self._sftp.close()
+        if self._ssh is not None:
+            self._ssh.close()
+
+    @check_connections
+    def get_last_modified_time(self,
+                               remote_file_path_1: str,
+                               remote_file_path_2: Optional[str] = None,
+                               ) -> Optional[datetime.datetime]:
+        """
+        Returns the last modified time of ``remote_file_path_1`` if the file exists,
+        else returns the last modified time of ``remote_file_path_2`` if that file exists.
+
+        Args:
+            remote_file_path_1 (str): The remote path to file 1.
+            remote_file_path_2 (str, optional): The remote path to file 2.
+
+        Returns: Optional[datetime.datetime]
+            The last modified time of the file.
+        """
+        timestamp = None
+        try:
+            timestamp = self._sftp.stat(remote_file_path_1).st_mtime
+        except IOError:
+            pass
+        if timestamp is None and remote_file_path_2 is not None:
+            try:
+                timestamp = self._sftp.stat(remote_file_path_2).st_mtime
+            except IOError:
+                return None
+        return datetime.datetime.fromtimestamp(timestamp) if timestamp is not None else None
+
+    def list_dir(self, remote_path: str = '') -> list:
+        """
+        List the directory contents.
+
+        Args:
+            remote_path (str, optional): The directory path at which the command will be executed.
+        """
+        command = 'ls -alF'
+        return self._send_command_to_server(command, remote_path)[0]
+
+    def find_package(self, package_name: str) -> list:
+        """
+        Find the path to a package.
+
+        Args:
+            package_name (str): The name of the package to search for.
+        """
+        command = f'. ~/.bashrc; which {package_name}'
+        return self._send_command_to_server(command)[0]
+
+    def list_available_nodes(self) -> list:
+        """
+        List the available nodes on the server.
+
+        Returns:
+            list: Lines of the node hostnames.
+ """ + cluster_soft = servers[self.server]['cluster_soft'] + if cluster_soft.lower() == 'htcondor': + return list() + cmd = list_available_nodes_command[cluster_soft] + stdout = self._send_command_to_server(command=cmd)[0] + nodes = list() + if cluster_soft.lower() in ['oge', 'sge']: + # Stdout line example: + # long1@node01.cluster BIP 0/0/8 -NA- lx24-amd64 aAdu + nodes = [line.split()[0].split('@')[1] + for line in stdout if '0/0/8' in line] + elif cluster_soft.lower() == 'slurm': + # Stdout line example: + # node01 alloc 1.00 none + nodes = [line.split()[0] for line in stdout + if line.split()[1] in ['mix', 'alloc', 'idle']] + elif cluster_soft.lower() in ['pbs', 'htcondor']: + logger.warning(f'Listing available nodes is not yet implemented for {cluster_soft}.') + return nodes + + def change_mode(self, + mode: str, + file_name: str, + recursive: bool = False, + remote_path: str = '', + ) -> None: + """ + Change the mode of a file or a directory. + + Args: + mode (str): The mode change to be applied, can be either octal or symbolic. + file_name (str): The path to the file or the directory to be changed. + recursive (bool, optional): Whether to recursively change the mode to all files + under a directory.``True`` for recursively change. + remote_path (str, optional): The directory path at which the command will be executed. + """ + if os.path.isfile(remote_path): + remote_path = os.path.dirname(remote_path) + recursive = ' -R' if recursive else '' + command = f'chmod{recursive} {mode} {file_name}' + self._send_command_to_server(command, remote_path) + + def remove_dir(self, remote_path: str) -> None: + """ + Remove a directory on the server. + Args: + remote_path (str): The path to the directory to be removed on the remote server. + """ + command = f'rm -r "{remote_path}"' + _, stderr = self._send_command_to_server(command) + if stderr: + raise ServerError( + f'Cannot remove dir for the given path ({remote_path}).\nGot: {stderr}') + + def _check_file_exists(self, + remote_file_path: str, + ) -> bool: + """ + Check if a file exists on the remote server. + + Args: + remote_file_path (str): The path to the file on the remote server. + + Returs: + bool: Whether the file exists on the remote server. ``True`` if it exists. + """ + command = f'[ -f "{remote_file_path}" ] && echo "File exists"' + stdout, _ = self._send_command_to_server(command, remote_path='') + if len(stdout): + return True + else: + return False + + def _check_dir_exists(self, + remote_dir_path: str, + ) -> bool: + """ + Check if a directory exists on the remote server. + + Args: + remote_dir_path (str): The path to the directory on the remote server. + + Returns: + bool: Whether the directory exists on the remote server. ``True`` if it exists. + """ + command = f'[ -d "{remote_dir_path}" ] && echo "Dir exists"' + stdout, _ = self._send_command_to_server(command) + if len(stdout): + return True + else: + return False + + def _create_dir(self, remote_path: str) -> None: + """ + Create a new directory on the server. + + Args: + remote_path (str): The path to the directory to create on the remote server. + """ + command = f'mkdir -p "{remote_path}"' + _, stderr = self._send_command_to_server(command) + if stderr: + raise ServerError( + f'Cannot create dir for the given path ({remote_path}).\nGot: {stderr}') + + +def check_job_status_in_stdout(job_id: int, + stdout: Union[list, str], + server: str, + ) -> str: + """ + A helper function for checking job status. + + Args: + job_id (int): the job ID recognized by the server. 
+        stdout (Union[list, str]): The output of a queue status check.
+        server (str): The server name.
+
+    Returns:
+        str: The job status on the server ('running', 'done', or 'errored').
+    """
+    if not isinstance(stdout, list):
+        stdout = stdout.splitlines()
+    for status_line in stdout:
+        if str(job_id) in status_line:
+            if servers[server]['cluster_soft'].lower() == 'slurm':
+                status = status_line.split()[4]
+                status_time = status_line.split()[5]
+                # The elapsed time format is 'MM:SS', 'HH:MM:SS', or 'D-HH:MM:SS'.
+                time_parts = [int(t) for t in status_time.replace('-', ':').split(':')]
+                minutes = time_parts[-2] if len(time_parts) >= 2 else 0
+                if len(time_parts) >= 3:
+                    minutes += 60 * time_parts[-3]
+                if len(time_parts) == 4:
+                    minutes += 1440 * time_parts[-4]
+                node_id = status_line.split()[7]
+                # Sometimes a node stops responding during configuration.
+                # Usually a node takes approx. 10 minutes to configure, so wait for 15 minutes.
+                if status.lower() == 'cf' and minutes >= 15:
+                    # Run a command to check whether the node is still responding.
+                    with SSHClient(server) as ssh:
+                        node_stdout, _ = ssh._send_command_to_server(f'scontrol show node {node_id}', remote_path='')
+                    if any('NOT_RESPONDING' in line for line in node_stdout):
+                        return 'errored'
+                if status.lower() in ['r', 'qw', 't', 'cg', 'pd', 'cf']:
+                    return 'running'
+                elif status.lower() in ['bf', 'ca', 'f', 'nf', 'st', 'oom']:
+                    return 'errored'
+            elif servers[server]['cluster_soft'].lower() == 'pbs':
+                status = status_line.split()[-2]
+                if status.lower() in ['r', 'q', 'c', 'e', 'w']:
+                    return 'running'
+                elif status.lower() in ['h', 's']:
+                    return 'errored'
+            elif servers[server]['cluster_soft'].lower() in ['oge', 'sge']:
+                status = status_line.split()[4]
+                if status.lower() in ['r', 'qw', 't']:
+                    return 'running'
+                elif status.lower() in ['e']:
+                    return 'errored'
+            elif servers[server]['cluster_soft'].lower() == 'htcondor':
+                return 'running'
+            else:
+                raise ValueError(f'Unknown cluster software {servers[server]["cluster_soft"]}')
+    return 'done'
+
+
+def delete_all_arc_jobs(server_list: list,
+                        jobs: Optional[List[str]] = None,
+                        ) -> None:
+    """
+    Delete all ARC-spawned jobs (with a job name starting with `a` and a digit) from the servers in ``server_list``
+    (``server_list`` could also be a string of a single server name).
+    Make sure you know what you're doing, so unrelated jobs won't be deleted...
+    Useful when terminating ARC while some (ghost) jobs are still running.
+
+    Args:
+        server_list (list): A list of servers to delete ARC jobs from.
+        jobs (Optional[List[str]]): Specific ARC job IDs to delete.
+    """
+    if isinstance(server_list, str):
+        server_list = [server_list]
+    for server in server_list:
+        with SSHClient(server) as ssh:
+            ssh.delete_jobs(jobs)
+    if server_list:
+        print('\ndone.')
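For completeness, the following is a minimal sketch of the SSHClient workflow that RMGAdapter composes out of execute_queue(), determine_rmg_job_status(), and download_files(): upload, submit, poll, download. The server name 'azure' and all paths are placeholder assumptions; the server must be defined in the settings, and the submit script must also have been uploaded to the remote directory before submit_job() is called.

import time

from t3.utils.ssh import SSHClient

remote_path = '/storage/azureuser/runs/T3_Projects/my_project/iteration_1/RMG'  # placeholder
local_path = '/home/user/runs/iteration_1/RMG'                                  # placeholder

with SSHClient('azure') as ssh:
    # Assumes the submit script was already uploaded alongside the input file.
    ssh.upload_file(remote_file_path=f'{remote_path}/input.py',
                    local_file_path=f'{local_path}/input.py')
    status, job_id = ssh.submit_job(remote_path=remote_path)

while status == 'running':
    time.sleep(300)  # poll every 5 minutes, as RMGAdapter.run_rmg() does
    with SSHClient('azure') as ssh:
        status = ssh.check_job_status(job_id=job_id)

if status == 'done':
    with SSHClient('azure') as ssh:
        ssh.download_folder(remote_folder_path=remote_path, local_folder_path=local_path)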