# Source code for schrodinger.application.matsci.jobutils

"""
Copyright Schrodinger, LLC. All rights reserved.
"""

import argparse
import contextlib
import os
import os.path
import random
import re
import shutil
import sys
import tarfile
import zipfile
from collections import OrderedDict
from collections import defaultdict

import psutil

import schrodinger
from schrodinger import gpgpu
from schrodinger import structure
from schrodinger.application.desmond import cms
from schrodinger.application.desmond import constants as dconst
from schrodinger.application.matsci import coarsegrain
from schrodinger.application.matsci import desmondutils
from schrodinger.application.matsci import jaguarworkflows
from schrodinger.application.matsci import msutils
from schrodinger.application.matsci import parserutils
from schrodinger.application.matsci import property_names
from schrodinger.application.matsci import textlogger
from schrodinger.infra import mm
from schrodinger.job import jobcontrol
from schrodinger.job import launchapi
from schrodinger.job import launcher
from schrodinger.job import launchparams
from schrodinger.job import queue
from schrodinger.structure import workflow_action_menu as wam
from schrodinger.structutils import minimize
from schrodinger.test.stu.common import zip_directory
from schrodinger.utils import fileutils
from schrodinger.utils import license

# Same string that add_keyword_parser_argument registers by default
FLAG_KEYWORDS = '-keywords'

# SAVE_FLAG selects which intermediate Desmond files are kept with the job
# output: none, the .cms files, or the .cms files plus trajectories
SAVE_FLAG = "-save_trj_data"
SAVE_NONE = "none"
SAVE_CMS = "cms"
SAVE_TRJ = "trj"
SAVE_FLAG_OPTS = [SAVE_NONE, SAVE_CMS, SAVE_TRJ]
FLAG_COMBINE_TRJ = '-combine_trj'

AMCELL_NO_SYSTEM_OUT = '-amcell.maegz'
FLAG_MONOMER_CHARGE = '-monomer_ff_q'
FLAG_MD_TIME = '-md_time'  # In ns or ps
FLAG_MD_TIMESTEP = '-md_timestep'  # In fs
FLAG_MD_TRJ_INT = '-md_trj_int'  # In ps
FLAG_MD_ENEGRP_INT = '-md_enegrp_int'  # In ps
FLAG_MD_ENESEQ_INT = '-md_eneseq_int'  # In ps
FLAG_MD_TEMP = '-md_temp'  # In K
FLAG_MD_PRESS = '-md_press'  # In bar
FLAG_MD_ENSEMBLE = '-md_ensemble'
FLAG_MD_ISOTROPY = '-md_isotropy'
FLAG_GPU = '-gpu'
SPLIT_COMPONENTS_FLAG = '-split_components'
FLAG_FORCEFIELD = '-forcefield'
FLAG_PTENSOR_AVG = '-ptensor_avg'

FLAG_RANDOM_SEED = '-seed'

# Subjob compression
FLAG_COMPRESS_SUBJOBS = '-compress_subjobs'

WATER_FF_TYPE_FLAG = '-water_fftype'

# Default values keyed by flag string. add_desmond_parser_arguments overlays
# any caller-supplied defaults on a copy of this dict.
MD_FLAGS_DEFAULTS = {
    FLAG_MD_TEMP: 300.0,
    FLAG_MD_PRESS: 1.01325,
    FLAG_MD_TIME: 100.0,
    FLAG_MD_TIMESTEP: 2.0,
    FLAG_MD_TRJ_INT: 10.0,
    FLAG_MD_ENEGRP_INT: 10.0,
    FLAG_MD_ENESEQ_INT: 10.0,
    FLAG_MD_ENSEMBLE: desmondutils.ENSEMBLE_NVT,
    FLAG_MD_ISOTROPY: dconst.IsotropyPolicy.ISOTROPIC,
    WATER_FF_TYPE_FLAG: desmondutils.SPC,
    SPLIT_COMPONENTS_FLAG: False,
    FLAG_RANDOM_SEED: parserutils.RANDOM_SEED_DEFAULT,
    SAVE_FLAG: SAVE_NONE,
    FLAG_FORCEFIELD: mm.OPLS_NAME_F14
}

# Keys of the dict returned by parse_restart_parameters_file
DRIVER = 'driver'
ARGS = 'args'

# Restart support: the parameters file name and the command line flags
# registered by add_restart_parser_arguments
RESTART_PARAMETERS_FILENAME = 'parameters.cmd'
FLAG_USEZIPDATA = '-use_zip_data'
FLAG_RESTART_PROJ = '-restart_proj'
FLAG_RESTART_DISP = '-restart_disp'
FLAG_RESTART_VIEWNAME = '-restart_viewname'
FLAG_RESTART_HOST = '-restart_host'
FLAG_RESTART_JOBNAME = '-restart_jobname'
FLAG_CONFIG_YAML = '-config_yaml'

RESTART_PROGRAM_NAME = 'RestartWorkflow'
RESTART_DEFAULT_JOBNAME = 'restart_workflow'

CLEAN_AND_UNIQUIFY_MAX_LEN = 100

# Matches a '$' preceded by any character other than a backslash; group 1
# captures that preceding character
DOLLARS = re.compile(r'([^\\])\$')

SOURCE_PATH_PROP = property_names.SOURCE_PATH_PROP

DEFAULT_SEED_HELP = "Seed for the random number generator used in simulations."

# tar format handed to tarfile.open by extract_job_data and archive_job_data
TGZ_FORMAT = tarfile.PAX_FORMAT

# Special `path` value recognized by add_subjob_files_to_backend
FROM_SUBJOB = 'from_subjob'
# Default name of the job attribute read by get_logging_tag
LOG_TAG = 'log_tag'

CHECKED_OUT_MATSCI_MAIN = None

WAM_TYPES = wam.WorkflowType

# TPP
FLAG_TPP = '-TPP'
DEFAULT_TPP = 1


def get_logging_tag(job, tag=LOG_TAG):
    """
    Return the logging tag stored on a job object.

    :param job: the job object to inspect
    :type job: `schrodinger.job.queue.JobControlJob`

    :param tag: name of the job attribute that holds the tag
    :type tag: str

    :return: the value of that attribute, or an empty string if the job has
        no such attribute
    :rtype: str
    """
    try:
        return getattr(job, tag)
    except AttributeError:
        return ""
def add_outfile_to_backend(file_fn,
                           backend=None,
                           set_structure_output=False,
                           stream=False):
    """
    Register a file with the job control backend.

    :type file_fn: str
    :param file_fn: Name of the file to register

    :type backend: `schrodinger.job._Backend` or None
    :param backend: Backend handle. When None, the current backend is looked
        up; if there is none, the function does nothing.

    :type set_structure_output: bool
    :param set_structure_output: If True, also mark the file as the structure
        output file

    :type stream: bool
    :param stream: If True, register the file as a log file (streamed to the
        submission host) rather than as an output file
    """
    backend = backend or jobcontrol.get_backend()
    if not backend:
        return

    register = backend.addLogFile if stream else backend.addOutputFile
    register(file_fn)

    if set_structure_output:
        backend.setStructureOutputFile(file_fn)
def log_structures_found(qjob,
                         structure_files,
                         log,
                         jobstates_for_logging=None):
    """
    Write one log line describing the structure files found for a subjob.

    :type qjob: `schrodinger.job.queue.JobControlJob`
    :param qjob: The subjob to report on

    :type structure_files: dict
    :param structure_files: Keys are subjobs, values are sets of structure
        file names

    :type log: callable
    :param log: function(msg) writes msg to log file

    :type jobstates_for_logging: None or collection of job states
    :param jobstates_for_logging: Only subjobs whose state is in this
        collection are reported. Defaults to the DONE and FAILED states.
    """
    if jobstates_for_logging is None:
        jobstates_for_logging = {queue.JobState.DONE, queue.JobState.FAILED}
    if qjob.state not in jobstates_for_logging:
        return

    found = structure_files[qjob]
    # If the job object carries an attribute named by the module constant
    # LOG_TAG, its value prefixes the message so that log lines can be
    # attributed to specific jobs.
    prefix = get_logging_tag(qjob)
    if found:
        body = f"Found completed structures: {', '.join(found)}"
    else:
        jobname = qjob.name or ' '.join(qjob._command)
        body = f'No output structure found for {jobname}'
    log(prefix + body)
def run_jobdj_and_add_files(jobdj,
                            log,
                            expect_exts=None,
                            exclude_exts=None,
                            jobdj_dir=os.curdir):
    """
    Run the subjobs queued in the jobdj, register their files with the
    current backend so they are copied back to the job directory, and collect
    the structure output files of each subjob.

    :type jobdj: `schrodinger.job.queue.JobDJ`
    :param jobdj: The JobDJ with queued up jobs to run

    :type log: callable
    :param log: function(msg) writes msg to log file

    :type expect_exts: None or list of str
    :param expect_exts: The expected extensions of the output files. Defaults
        to .cms, .mae and .zip.

    :type exclude_exts: None or list of str
    :param exclude_exts: Output files ending with one of these extensions are
        neither copied back to the original job directory nor logged

    :type jobdj_dir: str
    :param jobdj_dir: The relative path from where the backend is created to
        where the jobdj is created. Using /scr/user/jobname/subdir1/subdir2 as
        an example: normally the backend and the jobdj are both created in
        /scr/user/jobname/, hence the os.curdir default. If the jobdj is
        created inside /scr/user/jobname/subdir1/, jobdj_dir is subdir1, and a
        subjob may run in /scr/user/jobname/subdir1/subdir2
        (subjob.getCommandDir() gives subdir2).

    :rtype: list
    :return: A list of structure output file names, sorted alphabetically
    """
    backend = jobcontrol.get_backend()
    structure_files = defaultdict(set)
    completed_jobs = set()
    if expect_exts is None:
        expect_exts = ['.cms', '.mae', '.zip']
    if exclude_exts is None:
        exclude_exts = []

    def _finalize(qjob, force=False):
        """
        Handle a status change of a JobDJ job and log results once the job
        counts as complete.

        :param qjob: JobDJ's snapshot of job state at status change
        :type qjob: schrodinger.job.queue.BaseJob

        :param bool force: Whether to consider jobs complete even if
            job.isComplete() returns False
        """
        # Only job control jobs are handled, and the status change may
        # simply be a transition to RUNNING
        if not isinstance(qjob, queue.JobControlJob) or not qjob.hasExited():
            return
        # Finalize on every exit state (retryable failure, failure, done) so
        # that files are copied back even if the subjob fails or the parent
        # job is killed before all subjobs finish
        finished = finalize_subjob(
            qjob,
            backend,
            structure_files,
            expect_exts,
            log,
            exclude_exts=exclude_exts,
            jobdj_dir=jobdj_dir)
        if not (finished or force):
            return
        completed_jobs.add(qjob)
        # Report which results were and were not found
        log_structures_found(qjob, structure_files, log)

    # The callback fires on every job status change, e.g. RUNNING->DONE
    jobdj.run(status_change_callback=_finalize)

    # If multiple jobs complete simultaneously, some may be missed by the
    # status change callback, and a job may be reported before isComplete()
    # returns True. This pass catches all of those cases.
    for qjob in jobdj.all_jobs:
        if qjob not in completed_jobs:
            _finalize(qjob, force=True)

    # Sorting ensures that structures for similar systems appear near each
    # other in the PT after incorporation
    return sorted(
        os.path.basename(cmd_dir_filename)
        for subjob_outfiles in structure_files.values()
        for cmd_dir_filename in subjob_outfiles)
def finalize_subjob(subjob,
                    backend,
                    structure_files,
                    structure_extensions,
                    log,
                    exclude_exts=None,
                    jobdj_dir=os.curdir):
    """
    Mark subjob output and log files for copying back to the job directory,
    and find the structure output file if there is one

    :type subjob: `schrodinger.job.queue.JobControlJob`
    :param subjob: The subjob to mark files from

    :type backend: `schrodinger.job.jobcontrol._Backend` or None
    :param backend: The current backend or None if there isn't one

    :type structure_files: dict
    :param structure_files: If an output structure file is found, it will be
        added to this dict. Keys are subjobs, values are sets of structure
        file names

    :type structure_extensions: list of str
    :param structure_extensions: The expected extensions of the structure
        files. A file matches when its extension starts with one of them.

    :type log: function
    :param log: function(msg) writes msg to log file

    :type exclude_exts: None or list of str
    :param exclude_exts: The output files found with the excluded extensions
        are not copied back to the original job directory or documented into
        logger

    :type jobdj_dir: str
    :param jobdj_dir: jobdj_dir is the relative path from where the backend
        is created to where the jobdj is created. Using
        /scr/user/jobname/subdir1/subdir2 as an example, normally backend and
        jobdj are created in /scr/user/jobname/, and thus jobdj_dir by default
        is os.curdir. If the jobdj is created inside
        /scr/user/jobname/subdir1/, the jobdj_dir is subdir1 as backend is
        created in /scr/user/jobname/. In this example, the backend is in
        /scr/user/jobname/, jobdj is in /scr/user/jobname/subdir1, and the
        subjob can run in /scr/user/jobname/subdir1/subdir2.
        (subjob.getCommandDir() gives subdir2)

    :rtype: bool
    :return: True if the job has completed, False if not (including when the
        job has not been submitted yet)
    """
    ajob = subjob.getJob()
    if not ajob:
        # Job has not been submitted yet, or is just in that process
        return False

    if exclude_exts is None:
        exclude_exts = []

    # Work on a copy: this function can run several times for the same
    # subjob, and extending the list obtained from the job record in place
    # could modify that record and accumulate duplicate entries.
    outfiles = list(ajob.OutputFiles)
    if hasattr(subjob, 'outfiles'):
        # Permittivity workflow may set outfiles outside the subjob and this
        # combines the standard ajob.OutputFiles with the customized
        # job.outfiles
        outfiles.extend(subjob.outfiles)

    sub_dir = subjob.getCommandDir()
    if jobdj_dir == os.curdir:
        path = sub_dir
    else:
        path = os.path.join(jobdj_dir, sub_dir)
    add_subjob_files_to_backend(
        ajob, path=path, backend=backend, exclude_exts=exclude_exts)

    excluded = tuple(exclude_exts)
    expected = tuple(structure_extensions)
    for filename in outfiles:
        if sub_dir:
            filename = os.path.join(sub_dir, filename)
        if not os.path.exists(filename):
            continue
        if filename.endswith(excluded):
            continue
        extension = fileutils.splitext(filename)[1]
        if extension.startswith(expected):
            structure_files[subjob].add(filename)

    # Completed means finished running. It does not mean success or failure.
    if not ajob.isComplete():
        return False

    # If the job object carries an attribute named by the module constant
    # LOG_TAG, its value prefixes the message so that log lines can be
    # attributed to specific jobs.
    log(get_logging_tag(subjob) + f'Job {ajob.Name} completed')
    return True
def add_subjob_files_to_backend(subjob,
                                path=None,
                                backend=None,
                                exclude_exts=None):
    """
    Add all the output and log files from a subjob to the backend of this job
    so that they get copied back to the original job directory.

    :note: subjob log files are added as output files instead of log files.
        They will not be streamed back to the original job directory but
        instead copied back at the end of this job like a normal output file.

    :type subjob: `schrodinger.job.jobcontrol.Job` or
        `schrodinger.job.queue.JobControlJob`
    :param subjob: The subjob to add files from.

    :type path: str
    :param path: The path to the subjob directory from where the backend is
        created if it was not run in the same directory as this job. Use
        `FROM_SUBJOB` to get the subjob directory from a JobControlJob object
        - this will be ignored if subjob is a Job object.

    :type backend: `schrodinger.job.jobcontrol._Backend`
    :param backend: The backend if one exists

    :type exclude_exts: None or list of str
    :param exclude_exts: The output files found with the excluded extensions
        are not copied back to the original job directory or documented into
        logger
    """
    if not backend:
        backend = jobcontrol.get_backend()
        if not backend:
            return

    if isinstance(subjob, queue.JobControlJob):
        if path == FROM_SUBJOB:
            path = subjob.getCommandDir()
        subjob = subjob.getJob()
        if not subjob:
            return
    elif path == FROM_SUBJOB:
        # A plain Job object has no command directory to query. As
        # documented, the sentinel is ignored instead of being joined onto
        # the file names as a literal directory.
        path = None

    excluded = tuple(exclude_exts or ())

    # Subjob log files may have already been registered with this backend as
    # a log (streaming) file so that the subjob log gets streamed back
    # into the original job directory. It is an error to register a file
    # with the same backend as both a streaming and output file. We'll check
    # against the list of streaming log files to avoid this error.
    this_job_logfiles = set(backend.getJob().LogFiles)

    for filename in subjob.OutputFiles + subjob.LogFiles:
        if filename.endswith(excluded):
            continue
        if path:
            filename = os.path.join(path, filename)
        # Trying to register a log file as an out file both raises a traceback
        # and prints messages to the log file, so we do a pre-check rather
        # than a try/except to avoid the log file messages
        if filename not in this_job_logfiles:
            backend.addOutputFile(filename)
def determine_source_path(backend=None, job=None):
    """
    Work out the original job directory from the job control Job object of
    this process.

    :type backend: `schrodinger.job.jobcontrol._Backend`
    :param backend: The job control backend. Used to obtain the job object if
        no job is supplied. If neither backend nor job are supplied, the
        backend is obtained from job control (if one exists).

    :type job: `schrodinger.job.jobcontrol.Job`
    :param job: The job control job for this process. If not supplied, it is
        obtained from the backend (if one exists).

    :rtype: str
    :return: The OrigLaunchDir property of the job, the current working
        directory if not running under job control, or an empty string if
        the job object has no OrigLaunchDir attribute.
    """
    if not job:
        backend = backend or jobcontrol.get_backend()
        if not backend:
            # Not running under job control, so running in the local directory
            return os.getcwd()
        job = backend.getJob()

    # A missing attribute is not expected to happen, but just being safe
    return getattr(job, 'OrigLaunchDir', "")
def set_source_path(struct, backend=None, job=None, path=None):
    """
    Store the original job directory in the source path property of a
    structure.

    :type struct: `schrodinger.structure.Structure`
    :param struct: The structure to set the property on. Both Structure and
        Cms objects are handled.

    :type backend: `schrodinger.job.jobcontrol._Backend`
    :param backend: The job control backend, passed on to
        `determine_source_path` when no explicit path is given.

    :type job: `schrodinger.job.jobcontrol.Job`
    :param job: The job control job for this process, passed on to
        `determine_source_path` when no explicit path is given.

    :param str path: Manually set this path to the source directory,
        overriding all other options

    :rtype: str
    :return: The directory set as the source path: either `path` or the value
        returned by `determine_source_path`.
    """
    sourcedir = path or determine_source_path(backend=backend, job=job)

    # Note - deleting the property first is a workaround for SHARED-6890 just
    # in case we are working in Maestro
    if isinstance(struct, cms.Cms):
        desmondutils.delete_cms_property(struct, SOURCE_PATH_PROP)
        desmondutils.add_cms_property(struct, SOURCE_PATH_PROP, sourcedir)
        return sourcedir

    struct.property.pop(SOURCE_PATH_PROP, None)
    struct.property[SOURCE_PATH_PROP] = sourcedir
    return sourcedir
def get_source_path(source, existence_check=True):
    """
    Read the original job directory recorded on a structure or project row.

    :type source: `schrodinger.structure.Structure` or
        `schrodinger.project.ProjectRow`
    :param source: Either the ProjectRow or the structure to obtain the source
        information from. If a structure, can be either a Structure or a Cms
        object.

    :type existence_check: bool
    :param existence_check: If True (default), a blank string is returned if
        the source path does not exist. If False, the path is returned
        regardless of whether it exists or not.

    :rtype: str
    :return: The original job directory, extended by the structure's job
        subdirectory when one is recorded (and exists, if checking), or a
        blank string if none is found

    :raise TypeError: If source does not provide a usable `property` mapping
    """
    try:
        path = source.property.get(SOURCE_PATH_PROP, "")
    except (AttributeError, TypeError):
        # This is neither a Structure nor a ProjectRow
        raise TypeError('source must be a Structure or ProjectRow')

    def _acceptable(candidate):
        """Return True if candidate passes the optional existence check"""
        return not existence_check or os.path.exists(candidate)

    if not path or not _acceptable(path):
        return ""

    # Add on any job subdirectory for this structure
    subdir = source.property.get(property_names.SUBDIRECTORY_PROP)
    if subdir:
        subpath = os.path.join(path, subdir)
        if _acceptable(subpath):
            return subpath
    return path
def get_file_path(struct, prop):
    """
    Build the path of the file named by a structure property, relative to the
    structure's source path.

    :param struct: The structure whose property is checked
    :type struct: `schrodinger.structure.Structure`

    :param prop: property pointing to a file
    :type prop: str

    :return: path of the file if the property is set and the file exists,
        otherwise None
    :rtype: str or None
    """
    filename = struct.property.get(prop)
    if not filename:
        return None

    filepath = os.path.join(get_source_path(struct), filename)
    return filepath if os.path.isfile(filepath) else None
def prepare_job_spec_builder(argv,
                             program_name,
                             default_jobname,
                             input_fn=None,
                             set_driver_reserves_cores=False,
                             schrodinger_product=None):
    """
    Create a generic job specification builder.

    If set_driver_reserves_cores is True, the script (driver) is expected to
    use all the cores (cpus), similar to umbrella mode in multisim. For an
    example see the stress-strain driver. For all other cases (such as in
    opto/hopping/amorphous) keep set_driver_reserves_cores False.

    :type argv: list
    :param argv: The list of command line arguments, including the script name
        at [0], similar to that returned by sys.argv

    :type program_name: str
    :param program_name: Program name

    :type default_jobname: str
    :param default_jobname: Default job name

    :type input_fn: str
    :param input_fn: Input filename. Only registered with the builder if it
        names an existing file.

    :type set_driver_reserves_cores: bool
    :param set_driver_reserves_cores: If True, enable
        launchapi.setDriverReservesCores

    :type schrodinger_product: str
    :param schrodinger_product: A product directory to search for the
        script/executable. This should be the name of a directory under
        SCHRODINGER without the trailing version (i.e. the "-v*" part).

    :rtype: `launchapi.JobSpecificationArgsBuilder`
    :return: Job specification builder object
    """
    builder = launchapi.JobSpecificationArgsBuilder(
        argv,
        program_name=program_name,
        default_jobname=default_jobname,
        schrodinger_product=schrodinger_product,
        use_jobname_log=True)

    if input_fn and os.path.isfile(input_fn):
        builder.setInputFile(input_fn)

    if set_driver_reserves_cores:
        builder.setDriverReservesCores(True)

    return builder
def add_folder_to_job_builder(job_builder, folder_path):
    """
    Register every file of a folder (e.g. a trajectory folder) as an input
    file of the job.

    :type job_builder: `launchapi.JobSpecificationArgsBuilder`
    :param job_builder: Job specification builder object.

    :type folder_path: str
    :param folder_path: Full path to the folder that needs to be copied.
    """
    folder_files = fileutils.get_files_from_folder(folder_path)
    for abs_pathname, runtime_path in folder_files:
        job_builder.setInputFile(abs_pathname, runtime_path=runtime_path)
def get_stripped_defaults(adict):
    """
    Return a copy of the dictionary in which a single leading dash has been
    removed from every key that starts with one. Used with ArgumentParser.

    :type adict: dict{string:any}
    :param adict: Dictionary of defaults

    :rtype: dict{string:any}
    :return: Dictionary of defaults with the leading dash stripped from the
        keys
    """
    return {
        (key[1:] if key.startswith('-') else key): val
        for key, val in adict.items()
    }
def argparser_set_defaults(parser, defaults):
    """
    Apply default values to the arguments that are already present in the
    parser.

    Unlike parser.set_defaults, this does not contaminate the namespace with
    arguments that are absent from the parser.

    :type parser: `argparse.ArgumentParser`
    :param parser: The parser to set defaults

    :type defaults: dict{string:any}
    :param defaults: Dictionary of the defaults, keyed by flag (with or
        without a leading dash)
    """
    stripped = get_stripped_defaults(defaults)
    # See cpython/Lib/argparse.py :: set_defaults
    for action in parser._actions:
        dest = action.dest
        if dest not in stripped:
            continue
        parser._defaults[dest] = stripped[dest]
        action.default = stripped[dest]
def parse_restart_parameters_file(path):
    """
    Read a restart parameters file.

    Format of the file:
    1st line is the driver's filename
    2nd line is the original arguments passed to the driver

    :type path: str
    :param path: Path to the file with original arguments

    :rtype: dict
    :return: Dictionary with the driver filename under the DRIVER key and the
        whitespace-split argument list under the ARGS key
    """
    with open(path, 'r') as pfile:
        driver_line = pfile.readline()
        args_line = pfile.readline()
    return {DRIVER: driver_line.strip(), ARGS: args_line.strip().split()}
def write_restart_parameters_file(driver, args, outpath):
    """
    Write out original arguments to parameters file and add the file as an
    output file to any existing jobcontrol backend.

    The -HOST, -TPP and `FLAG_USEZIPDATA` flags (matched case-insensitively)
    are removed together with their values before writing.

    :type driver: str
    :param driver: Driver's filename

    :type args: list
    :param args: Original arguments passed to driver

    :type outpath: str
    :param outpath: Path to the parameters file to write original arguments
    """
    args_str = ' '.join(args)

    # For now, all occurrences of these flags are dropped from the arguments
    for flag in ('-HOST', FLAG_TPP, FLAG_USEZIPDATA):
        # The lookbehind anchors the flag at the start of a token so that an
        # argument which merely ends with the flag text (e.g. "run-host") is
        # not truncated along with the argument that follows it.
        pattern = r'(?i)(?<!\S)%s [^\s]+' % re.escape(flag)
        args_str = re.sub(pattern, '', args_str).strip()

    with open(outpath, 'w') as outfile:
        outfile.write('%s\n' % driver)
        outfile.write(args_str + '\n')

    backend = jobcontrol.get_backend()
    if backend:
        backend.addOutputFile(outpath)
def get_restart_id_filename(jobname):
    """
    Build the name of the file that holds the job id of a restart job.

    :type jobname: str
    :param jobname: The restart job name

    :rtype: str
    :return: The name of the restart jobid file
    """
    suffix = '.jobid'
    return jobname + suffix
def extract_job_data(options):
    """
    Extract the restart archive named by the `FLAG_USEZIPDATA` option into
    the current directory. Does nothing if the option is not set. All the
    error handling is on the caller function.

    :type options: `argparse.Namespace`
    :param options: The object holding all the option values
    """
    archive = get_option(options, FLAG_USEZIPDATA)
    if not archive:
        return

    # NOTE(review): extractall() is called without restricting member paths;
    # confirm that the archive always comes from a trusted source.
    with tarfile.open(name=archive, mode='r:gz', format=TGZ_FORMAT) as tar:
        tar.extractall()
def archive_job_data(path, files_path):
    """
    Create a gzipped tar archive from the provided list of file paths. All
    the error handling is on the caller function.

    :type path: str
    :param path: Path to the new archive to be created

    :type files_path: list
    :param files_path: List of files to be archived. Duplicate entries are
        added only once.
    """
    with tarfile.open(name=path, mode='w:gz', format=TGZ_FORMAT) as tar:
        # dict.fromkeys drops duplicates like set() does, but keeps the
        # caller's order so that the archive layout is reproducible
        for file_path in dict.fromkeys(files_path):
            tar.add(file_path)
def create_restart_launcher(script, prog, input_name, output_name, zip_name,
                            options, args):
    """
    Create a launcher for restarting a workflow from archived job data.

    :type script: str
    :param script: The script the launcher runs

    :type prog: str
    :param prog: Program name passed to the launcher

    :type input_name: str
    :param input_name: Input file to register with the launcher

    :type output_name: str
    :param output_name: Output file to register with the launcher; also set
        as the structure output file

    :type zip_name: str
    :param zip_name: Archive with the restart data. It is registered as an
        input file and passed to the script via `FLAG_USEZIPDATA`.

    :type options: `argparse.Namespace`
    :param options: The object holding the -restart_x option values

    :type args: list
    :param args: Arguments passed to the script. The list is not modified.

    :rtype: `schrodinger.job.launcher.Launcher`
    :return: The configured launcher
    """
    (viewname, disp, proj, host, jobname) = get_restart_options(options)

    scriptlauncher = launcher.Launcher(
        script=script,
        jobname=jobname,
        viewname=viewname,
        disp=disp,
        proj=proj,
        prog=prog,
        copyscript=False,
        runtoplevel=True)

    script_args = [FLAG_USEZIPDATA, zip_name] + list(args)
    if host:
        # Only forward a host that was actually requested, as
        # create_restart_jobcmd does, instead of passing "-HOST None"
        script_args = ['-HOST', host] + script_args
    scriptlauncher.addScriptArgs(script_args)

    scriptlauncher.addInputFile(input_name)
    scriptlauncher.addInputFile(zip_name)
    scriptlauncher.addOutputFile(output_name)
    scriptlauncher.setStructureOutputFile(output_name)

    # No need to update the parameters file from the restart
    return scriptlauncher
def create_restart_jobcmd(driver_path, zip_fn, restart_options, args,
                          default_jobname):
    """
    Generate command-line list for the job restart.

    :type driver_path: str
    :param driver_path: Path to the driver

    :type zip_fn: str
    :param zip_fn: Filename of the archive with all restart files

    :type restart_options: `argparse.Namespace`
    :param restart_options: The object holding all the option values

    :type args: list
    :param args: List of the arguments passed to the original run. The list
        is not modified.

    :type default_jobname: str
    :param default_jobname: Job name used when no restart job name was given

    :rtype: list
    :return: List of parameters ready for job submission
    """
    (viewname, disp, proj, host, jobname) = get_restart_options(
        restart_options)

    # Build a fresh list rather than prepending to `args` in place, so the
    # caller's list is left untouched
    cmd = [
        '$SCHRODINGER/run', driver_path, '-JOBNAME', jobname or default_jobname
    ]
    optional_flags = (('-VIEWNAME', viewname), ('-DISP', disp),
                      ('-PROJ', proj), ('-HOST', host))
    for flag, value in optional_flags:
        if value:
            cmd.extend([flag, value])
    cmd.extend([FLAG_USEZIPDATA, zip_fn])
    cmd.extend(args)
    return cmd
def write_idfile(jobobj):
    """
    Store the job id in a file as a signal to the GUI that a new job has been
    launched.

    :type jobobj: `schrodinger.job.jobcontrol.Job`
    :param jobobj: The launched job. Its `name` determines the file name (see
        `get_restart_id_filename`) and its `jobid` is the file content.
    """
    filename = get_restart_id_filename(jobobj.name)
    # The context manager closes the file even if the write fails
    with open(filename, 'w') as idfile:
        idfile.write(jobobj.jobid)
def get_option(options, flag):
    """
    Look up the option value that belongs to a flag.

    :type options: `argparse.Namespace`
    :param options: The object holding all the option values

    :type flag: str
    :param flag: The flag for the desired option. A single leading '-' is
        removed before the lookup.

    :rtype: any
    :return: The value associated with flag, or None if flag (minus the
        leading dash) is not found as an attribute on options
    """
    name = flag[1:] if flag.startswith('-') else flag
    return getattr(options, name, None)
def set_option(options, flag, value):
    """
    Assign the option value that belongs to a flag.

    :type options: `argparse.Namespace`
    :param options: The object holding all the option values

    :type flag: str
    :param flag: The flag for the desired option. If the string starts with a
        '-', that one '-' is removed.

    :type value: any
    :param value: The value to store under the (stripped) flag name
    """
    name = flag[1:] if flag.startswith('-') else flag
    setattr(options, name, value)
def add_restart_parser_arguments(parser):
    """
    Register the restart flags with the parser, in a 'Restart options'
    argument group.

    :type parser: `argparse.ArgumentParser`
    :param parser: The parser to add arguments to
    """
    group = parser.add_argument_group('Restart options')

    group.add_argument(
        FLAG_USEZIPDATA,
        metavar='PATH',
        help='Extract the archive before running. Any completed files '
        'in the archive will be used instead of submitting new jobs.')

    group.add_argument(
        FLAG_RESTART_PROJ,
        metavar='PROJECT_NAME',
        help='Project name the restart job is associated with.')

    # Only the append and ignore dispositions are offered for restarts
    disposition = launchparams.ProjectDisposition
    disp_choices = [disposition.APPEND.value, disposition.IGNORE.value]
    disp_str = ', '.join(disp_choices)
    group.add_argument(
        FLAG_RESTART_DISP,
        metavar='INCORPORATION',
        choices=disp_choices,
        help='Incorporation state for the restart job. Should be one of: '
        '%s.' % disp_str)

    group.add_argument(
        FLAG_RESTART_VIEWNAME,
        metavar='VIEWNAME',
        help='Viewname for the panel submitting the restart job. Leave '
        'blank if running from the command line.')

    group.add_argument(
        FLAG_RESTART_HOST,
        metavar='HOSTNAME:PROCESSORS',
        help='Host for restart in the form of "hostname:cpus".')

    group.add_argument(
        FLAG_RESTART_JOBNAME, metavar='JOBNAME', help='Jobname for restart.')
def get_restart_options(options):
    """
    Collect the values of the -restart_x command line flags.

    :type options: `argparse.Namespace`
    :param options: The object holding all the option values

    :rtype: tuple
    :return: tuple (viewname, incorporation, project, host:cpu, jobname). An
        entry is None when the corresponding option is absent.
    """
    restart_flags = (FLAG_RESTART_VIEWNAME, FLAG_RESTART_DISP,
                     FLAG_RESTART_PROJ, FLAG_RESTART_HOST,
                     FLAG_RESTART_JOBNAME)
    return tuple(get_option(options, flag) for flag in restart_flags)
def add_keyword_parser_argument(parser,
                                keyword_flags=None,
                                default='',
                                arghelp=None,
                                action='store'):
    """
    Register a keyword argument (one or more KEYWORD=VALUE tokens) with the
    parser.

    :type parser: `argparse.ArgumentParser`
    :param parser: The parser to add this argument to

    :type keyword_flags: None or list of str
    :param keyword_flags: the customized flags for this argument. Defaults to
        -keywords and -k.

    :type default: str or None
    :param default: The customized default value for this argument

    :type arghelp: str or None
    :param arghelp: The customized help message for this argument

    :type action: `argparse.Action` or str
    :param action: The customized action for this argument or one of the
        standard ones. The argument is always registered with nargs='+', so
        the action has to accept that.
    """
    flags = ['-keywords', '-k'] if keyword_flags is None else keyword_flags

    if not arghelp:
        arghelp = ('Jaguar keywords to include in calculations. Can be given '
                   'multiple keywords, ex. -k dftname=b3lyp basis=midix... If '
                   'this is the last flag before the input/output file names, '
                   'use -- to terminate the list of keywords.')

    parser.add_argument(
        *flags,
        nargs='+',
        metavar='KEYWORD=VALUE KEYWORD=VALUE',
        action=action,
        default=default,
        help=arghelp)
def parse_keyword_list(options,
                       exit_on_error=True,
                       logger=None,
                       validate=False):
    """
    Add keystring and keydict attributes to the argparse options object. They
    hold the list of keyword arguments as, respectively, a lower-cased string
    and a dictionary. Valid keyword syntax is checked for.

    Typical usage:

        jobutils.add_keyword_parser_argument(parser)
        options = parser.parse_args(args)
        jobutils.parse_keyword_list(options)
        print(options.keystring, options.keydict)

    :type options: `argparse.Namespace`
    :param options: The parser Namespace object with a keywords attribute

    :type exit_on_error: bool
    :param exit_on_error: True if the ValueError for keyword syntax should
        result in a message being logged and sys.exit(1) being called, False
        if the error should just be raised.

    :type logger: `logging.Logger`
    :param logger: The logger to use for recording error messages; passed to
        textlogger.log.

    :type validate: bool
    :param validate: No longer has any effect other than emitting a
        DeprecationWarning (MATSCI-5669).

    :raises ValueError: If an invalid keyword pair syntax is detected and
        exit_on_error is False
    """
    joined = ' '.join(options.keywords) if options.keywords else ""
    options.keystring = joined.lower()

    try:
        options.keydict = jaguarworkflows.keyword_string_to_dict(
            options.keystring)
    except ValueError as msg:
        if not exit_on_error:
            raise
        err_msg = 'Keyword parsing failed - stopping:\n%s' % str(msg)
        textlogger.log(logger, err_msg)
        sys.exit(1)

    if validate:
        # MATSCI-5669
        import warnings
        msg = ('Jaguar keyword validation is no longer performed')
        warnings.warn(msg, DeprecationWarning, stacklevel=2)
def add_save_desmond_files_parser_argument(parser):
    """
    Register the `SAVE_FLAG` option, which selects the intermediate Desmond
    files kept with the job output.

    :type parser: `argparse.ArgumentParser`
    :param parser: The parser to add this argument to
    """
    help_template = ("Specify whether intermediate CMS or trajectory "
                     "files be included with the job output. Choices "
                     "are {0} (save no files), {1} (save .cms files) and {2} "
                     "(save .cms and trajectory files). Default is "
                     "{3}.")
    parser.add_argument(
        SAVE_FLAG,
        choices=SAVE_FLAG_OPTS,
        default=SAVE_NONE,
        help=help_template.format(SAVE_NONE, SAVE_CMS, SAVE_TRJ, SAVE_NONE))
def add_compress_subjobs_parser_argument(parser, help=None):
    """
    Register the `FLAG_COMPRESS_SUBJOBS` boolean switch on the parser.

    :param `argparse.ArgumentParser` parser: The parser to add this argument
        to

    :param str help: The help message; a default message is used when None
    """
    help_text = 'Compress files from successful subjobs' if help is None else help
    parser.add_argument(
        FLAG_COMPRESS_SUBJOBS, action='store_true', help=help_text)
def add_desmond_parser_arguments(parser, args, defaults=None):
    """
    Register the requested Desmond-related options on the parser.

    :type parser: `argparse.ArgumentParser`
    :param parser: The parser to add the arguments to

    :type args: list
    :param args: The flags to register; each must be one of the flags
        handled below

    :type defaults: dict or None
    :param defaults: Default values keyed by flag. Entries given here
        override the corresponding entries of `MD_FLAGS_DEFAULTS`.

    :raises TypeError: If an item of args is not a recognized flag
    """
    flag_defaults = MD_FLAGS_DEFAULTS
    if defaults is not None:
        # Overlay the caller's values on a copy so the module-level
        # dictionary is never modified
        flag_defaults = MD_FLAGS_DEFAULTS.copy()
        flag_defaults.update(defaults)

    # Options that differ only in flag, default and help text: all of them
    # are validated as positive floats
    positive_float_flags = (
        (FLAG_MD_TEMP, 'Temperature (in K) of the simulations.'),
        (FLAG_MD_PRESS, 'Pressure (in bar) of the simulations.'),
        (FLAG_MD_TIME, 'MD time (in ps) of the simulations.'),
        (FLAG_MD_TIMESTEP, 'MD time step (in fs) of the simulations.'),
        (FLAG_MD_TRJ_INT, 'MD trajectory recording interval (in ps).'),
        (FLAG_MD_ENEGRP_INT, 'MD energy group recording interval (in ps).'),
        (FLAG_MD_ENESEQ_INT, 'MD energy recording interval (in ps).'),
    )

    for arg in args:
        if arg == SPLIT_COMPONENTS_FLAG:
            parser.add_argument(
                SPLIT_COMPONENTS_FLAG,
                action='store_true',
                help='Split system in components when Desmond system is built. '
                'This can speed up the the force field assignment for systems '
                'that contain multiple identical molecules.')
            continue

        float_entry = next(
            (entry for entry in positive_float_flags if arg == entry[0]), None)
        if float_entry is not None:
            float_flag, float_help = float_entry
            parser.add_argument(
                float_flag,
                default=flag_defaults[float_flag],
                type=parserutils.type_positive_float,
                help=float_help)
            continue

        if arg == FLAG_MD_ENSEMBLE:
            parser.add_argument(
                FLAG_MD_ENSEMBLE,
                default=flag_defaults[FLAG_MD_ENSEMBLE],
                choices=desmondutils.ENSEMBLES,
                metavar='ENSEMBLE',
                help='Desmond ensemble. Known values are: %s' % ', '.join(
                    desmondutils.ENSEMBLES))
        elif arg == FLAG_MD_ISOTROPY:
            parser.add_argument(
                FLAG_MD_ISOTROPY,
                default=flag_defaults[FLAG_MD_ISOTROPY],
                choices=dconst.IsotropyPolicy,
                metavar='ISOTROPY_POLICY',
                help='Desmond barostat isotropy policy. Known values are: '
                '%s' % ', '.join(dconst.IsotropyPolicy))
        elif arg == FLAG_GPU:
            # MATSCI - 9064: remove cpu desmond support
            parser.add_argument(
                FLAG_GPU,
                action='store_true',
                default=True,
                help=argparse.SUPPRESS)
        elif arg == WATER_FF_TYPE_FLAG:
            parser.add_argument(
                WATER_FF_TYPE_FLAG,
                action='store',
                metavar='WATER_TYPE',
                default=flag_defaults[WATER_FF_TYPE_FLAG],
                choices=desmondutils.WATER_FFTYPES.keys(),
                help='Force field type for water molecules. Must be used '
                f'with {SPLIT_COMPONENTS_FLAG}. If using "{desmondutils.NONE}"'
                ', the Residue Name atom property is used instead.')
        elif arg == FLAG_RANDOM_SEED:
            add_random_seed_parser_argument(parser)
        elif arg == SAVE_FLAG:
            add_save_desmond_files_parser_argument(parser)
        elif arg == FLAG_FORCEFIELD:
            parser.add_argument(
                FLAG_FORCEFIELD,
                default=flag_defaults[FLAG_FORCEFIELD],
                type=parserutils.type_forcefield,
                metavar='FORCE_FIELD',
                help='Force field to use. %s' %
                parserutils.valid_forcefield_info())
        elif arg == FLAG_MONOMER_CHARGE:
            parser.add_argument(
                FLAG_MONOMER_CHARGE,
                action='store_true',
                help=
                'Use forcefield charges computed for the monomer for the polymer. '
                f'This option is particularly useful for {mm.OPLS_NAME_F16}, which '
                f'uses {mm.OPLS_NAME_F14} charges by default for molecules larger '
                'than 100 heavy atoms.')
        elif arg == FLAG_COMBINE_TRJ:
            parser.add_argument(
                FLAG_COMBINE_TRJ,
                action='store_true',
                help='Merge all trajectories into a single one.')
        else:
            raise TypeError('Unrecognized parser argument: ' + str(arg))
def add_traj_analysis_parser_arguments(parser):
    """
    Register the trajectory analysis options: the required cms file and
    trajectory directory, plus the optional first and last frame to analyse.

    :type parser: `argparse.ArgumentParser`
    :param parser: The parser to add the arguments to
    """
    # (flag, add_argument keyword arguments), in registration order
    specs = (
        (parserutils.FLAG_CMS_PATH,
         dict(
             help='Input cms file',
             required=True,
             type=parserutils.type_cms_file)),
        (parserutils.FLAG_TRJ_PATH,
         dict(required=True, help='Directory of the trajectory')),
        (parserutils.FLAG_TRJ_MIN,
         dict(
             help='Minimum of trajectory frame to be analysed.',
             type=parserutils.type_nonnegative_int)),
        (parserutils.FLAG_TRJ_MAX,
         dict(
             help='Maximum of trajectory frame to be analysed.',
             type=parserutils.type_nonnegative_int)),
    )
    for flag, kwargs in specs:
        parser.add_argument(flag, **kwargs)
def add_random_seed_parser_argument(parser,
                                    additional_args=(),
                                    help=DEFAULT_SEED_HELP):
    """
    Register the random number seed option on the parser.

    :type parser: `argparse.ArgumentParser`
    :param parser: The parser to add this argument to

    :param tuple additional_args: Extra flag names accepted as aliases of
        FLAG_RANDOM_SEED

    :param str help: The help string shown for the seed option. The default
        string is used if nothing is supplied.
    """
    flags = [FLAG_RANDOM_SEED, *additional_args]
    parser.add_argument(
        *flags,
        type=parserutils.type_random_seed,
        default=parserutils.RANDOM_SEED_DEFAULT,
        help=help)
def seed_random_number_generator(options, log=None):
    """
    Seed Python's `random` module from the command line options.

    The seed is looked up with `get_option` under `FLAG_RANDOM_SEED`. When
    that lookup yields None, a seed is drawn at random from the range
    [`parserutils.RANDOM_SEED_MIN`, `parserutils.RANDOM_SEED_MAX`].

    :type options: `argparse.Namespace`
    :param options: The command line options from argparse

    :type log: function
    :param log: Optional callable taking a single str; it is called with a
        message reporting the seed that was used

    :rtype: int
    :return: The seed used for the generator
    """
    chosen_seed = get_option(options, FLAG_RANDOM_SEED)
    if chosen_seed is None:
        lowest = parserutils.RANDOM_SEED_MIN
        highest = parserutils.RANDOM_SEED_MAX
        chosen_seed = random.randint(lowest, highest)
    random.seed(chosen_seed)
    if log:
        log(f'Random number generator seeded with {chosen_seed}')
    return chosen_seed
def check_license(panel=None,
                  token=license.MATERIALSCIENCE_MAIN,
                  name="",
                  as_validator=False):
    """
    Check that a license token is available. When running inside Maestro, a
    MATERIALSCIENCE_MAIN license is also checked out and kept in the
    module-level CHECKED_OUT_MATSCI_MAIN.

    :type panel: schrodinger.ui.qt.appframework.AppFramework
    :param panel: If given, its error method is used to report a failed
        check; otherwise the message is printed

    :type token: `schrodinger.utils.license` constant
    :param token: A token type from the schrodinger.utils.license module,
        such as MATERIALSCIENCE_MAIN or MATERIALSCIENCE_GA

    :type name: str
    :param name: The user-facing name of the token, used in error messages.
        If empty, the name registered for the token in
        license.LICENSE_NAMES (if any) is used.

    :type as_validator: bool
    :param as_validator: If True, a failed check is not reported through the
        panel or printed; (False, error_message) is returned instead.

    :rtype: bool or (bool, str)
    :return: True if the check passes. On failure, False, or (False, msg)
        when as_validator is True. Note that the tuple (False, msg) is
        itself truthy, so callers using as_validator=True must not treat the
        result as a plain boolean.
    """
    global CHECKED_OUT_MATSCI_MAIN

    if not name:
        try:
            name = license.LICENSE_NAMES[token]
        except KeyError:
            pass

    msg = ''

    # Check out and hold a MATERIALSCIENCE_MAIN license if calling from Maestro
    if schrodinger.get_maestro():
        holds_valid = (CHECKED_OUT_MATSCI_MAIN and
                       CHECKED_OUT_MATSCI_MAIN.isValid())
        if not holds_valid:
            # Tampering with licensing is a violation of the license agreement
            CHECKED_OUT_MATSCI_MAIN = license.License(
                license.MATERIALSCIENCE_MAIN)
            if not CHECKED_OUT_MATSCI_MAIN.isValid():
                msg = ('There are no remaining MATERIALSCIENCE_MAIN license '
                       'tokens. Materials Science features cannot be used in '
                       'Maestro.')

    # Tampering with licensing is a violation of the license agreement
    if not msg and not license.is_license_available(token):
        msg = (f'No {name} license token is available. '
               f'No calculation can be run.')

    if not msg:
        return True

    # The check failed: report it in the way the caller asked for
    if as_validator:
        return (False, msg)
    if panel:
        panel.error(msg)
    else:
        print(msg)
    return False
def create_run_dir(panel, jobname):
    """
    Create a subdirectory of the current working directory to run a job in.
    If the directory already exists the user is asked whether its contents
    may be deleted; if so, it is removed and recreated.

    :type panel: schrodinger.ui.qt.appframework.AppFramework
    :param panel: The panel whose question method is used to ask the user
        whether an existing directory may be removed

    :type jobname: str
    :param jobname: The name of the job. The directory will be
        jobname + _dir

    :rtype: str or None
    :return: The path to the directory or None if an existing directory was
        found and the user elected not to remove it
    """
    outdir = os.path.join(os.getcwd(), jobname + '_dir')
    if os.path.exists(outdir):
        if not panel.question('The job directory, %s, already exists.\nWould'
                              ' you like to delete its contents and '
                              'continue?' % os.path.basename(outdir)):
            return None

        def force_remove(func, path, excinfo):
            # Try to remove any difficult to rm file. shutil.rmtree reports
            # failed file removals with os.unlink, which is a different
            # function object than os.remove, so both must be accepted.
            if func in (os.rmdir, os.remove, os.unlink):
                fileutils.force_remove(path)
            else:
                # Not a removal failure (e.g. listing the directory failed):
                # propagate the error rmtree reported
                raise excinfo[1]

        shutil.rmtree(outdir, onerror=force_remove)
    os.mkdir(outdir)
    return outdir
def string_to_value(string):
    """
    Convert text read from a file into a Python value.

    The exact strings 'True', 'False' and 'None' become the corresponding
    Python objects. Anything else is converted to an int if possible, then
    to a float if possible, and otherwise returned unchanged.

    :type string: str
    :param string: The string to convert

    :return: string converted to, in order of preference:
        [True|False|None], int, float, or the input itself
    """
    special_tokens = {'True': True, 'False': False, 'None': None}
    if string in special_tokens:
        return special_tokens[string]

    # Numeric conversions, most specific first
    for convert in (int, float):
        try:
            return convert(string)
        except ValueError:
            continue
    return string
@contextlib.contextmanager
def working_directory(path):
    """
    Context manager that makes `path` the current working directory for the
    duration of the with-block and restores the previous working directory
    afterwards, even if the block raises.

    :type path: str
    :param path: The directory to change into
    """
    original_dir = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        # Always go back to where we started
        os.chdir(original_dir)
class StringCleaner(object):
    """
    Cleans strings by substring replacement and makes repeated results
    unique by appending a usage count.
    """

    def __init__(self, extra_replacement_pairs=None, separator='-'):
        """
        Set up the replacement table and the record of names already handed
        out.

        Replacements are applied in the reverse of the order in which they
        are listed, so the most specific replacements must be listed last.
        For example, a pair that strips a Windows drive prefix has to be
        applied before the pairs that strip the colon and the backslash,
        and since callers tend to append rather than prepend pairs, the
        table is walked backwards.

        :type extra_replacement_pairs: list of tuples
        :param extra_replacement_pairs: each tuple holds a single
            replacement pair, i.e. the substring to be replaced and the
            substring to replace it with.

        :type separator: str
        :param separator: for a string that has been seen before, this is
            placed between the string and the number of times it has been
            used.
        """
        # Substrings that are simply dropped
        dropped = [
            '\\', '/', r'\\', '?', '%', '*', ':', '|', '"', '>', '<', '(',
            ')', '+', ','
        ]
        pairs = [(substring, '') for substring in dropped]
        # COMBIGLD-specific replacements
        pairs += [(' ', '_'), (';', ''), (']', ''), ('[', ''), ('][', '-'),
                  ('[]', '')]
        if extra_replacement_pairs:
            pairs.extend(extra_replacement_pairs)

        self.replacement_dict = OrderedDict(pairs)
        self.separator = separator
        self.prev_names = {}

    def cleanAndUniquify(self,
                         input_str,
                         clear_prev=False,
                         max_len=CLEAN_AND_UNIQUIFY_MAX_LEN):
        """
        Truncate the input string, apply the replacement table and make the
        result unique with respect to the strings produced so far.

        The first time a cleaned string is produced it is returned as is;
        on later occurrences the separator and the running count are
        appended.

        :type input_str: str
        :param input_str: the input string we want cleaned and uniquified

        :type clear_prev: bool
        :param clear_prev: specify if the record of previous names should
            first be cleared

        :type max_len: int
        :param max_len: maximum length of input_str allowed; a longer string
            is truncated to this length before any replacement is made

        :rtype: str
        :return: the input string now cleaned and uniquified
        """
        cleaned = input_str[:max_len]
        # Walk the table backwards so that later (more specific) pairs are
        # applied first
        for old, new in reversed(list(self.replacement_dict.items())):
            cleaned = cleaned.replace(old, new)

        if clear_prev:
            self.prev_names.clear()

        times_used = self.prev_names.get(cleaned)
        if not times_used:
            self.prev_names[cleaned] = 1
            return cleaned

        times_used += 1
        self.prev_names[cleaned] = times_used
        return cleaned + self.separator + str(times_used)
def clean_string(string, default='title'):
    """
    Clean a string with a fresh `StringCleaner` so it is acceptable as a file
    name. If the string is blank, the value of default is cleaned instead.

    :type string: str
    :param string: The string to clean.

    :type default: str
    :param default: The name to use if string is blank

    :rtype: str
    :return: A string usable as a filename
    """
    return StringCleaner().cleanAndUniquify(string or default)
def zip_and_set_incorporation(zipname, filelist):
    """
    Zip up all the requested files and, if running under job control,
    register the archive as an output file and as the structure output file
    of the job control backend.

    :type zipname: str
    :param zipname: The name of the archive to create

    :type filelist: list
    :param filelist: Each item in filelist is the name of a file to add to
        file zipname
    """
    # The with-block guarantees the archive is closed even if adding one of
    # the files fails
    with zipfile.ZipFile(zipname, 'w') as zipper:
        for filename in filelist:
            zipper.write(filename)

    backend = jobcontrol.get_backend()
    if backend:
        backend.addOutputFile(zipname)
        backend.setStructureOutputFile(zipname)
def escape_dollars_for_cmdline(pattern):
    r"""
    Make sure $ are escaped in the given pattern for command lines

    Every match of the module-level `DOLLARS` expression is replaced by its
    first captured group followed by ``\$``.

    :type pattern: str
    :param pattern: A string to modify

    :rtype: str
    :return: pattern with a \ in front of $
    """
    escaped_dollar = r'\1\\$'
    return re.sub(DOLLARS, escaped_dollar, pattern)
class CellRunInfo(object):
    """
    Holds the information for the run for a single cell
    """

    def __init__(self,
                 structs,
                 basename,
                 replicate,
                 multiple_cells,
                 component=None,
                 repeat_unit_info=None):
        """
        Store the cell information and derive the cell-specific base name.

        The base name is built from the generic basename by appending, each
        preceded by an underscore: the component tag, the repeat unit tag
        (if any) and the replicate number (if multiple_cells).

        :type structs: list
        :param structs: The list of structures to include in the cell

        :type basename: str
        :param basename: The generic basename for job files

        :type replicate: int
        :param replicate: Which replicate this is for - 1-based

        :type multiple_cells: bool
        :param multiple_cells: Whether there will be multiple replicates of
            this cell

        :type component: int or None
        :param component: The structure number this cell is for, or None if
            this is a mixed structure cell

        :type repeat_unit_info: sequence or None
        :param repeat_unit_info: The first item is the sequence of monomer
            one-letter codes that gives the repeat unit sequence. The second
            item is a tag to be added to the polymer name for that sequence
            (used for enumerated sequences). The info is ignored when the
            tag is empty.
        """
        self.structs = structs
        self.replicate = replicate
        self.cru = None

        name_parts = [basename]
        if component:
            name_parts.append('component%d' % component)
        else:
            name_parts.append('all_components')
        if repeat_unit_info and repeat_unit_info[1]:
            self.cru = repeat_unit_info[0]
            name_parts.append('%s' % repeat_unit_info[1])
        if multiple_cells:
            name_parts.append('%d' % replicate)
        self.basename = '_'.join(name_parts)

        # True only if every structure in the cell is coarse-grained
        self.is_cg = all(coarsegrain.is_coarse_grain(x) for x in structs)
class MultijobDriver(object):
    """
    Resubmit the driver as subjobs
    """

    COMPOSITION_FLAG = '-composition'
    COMPOSITION_FLAG_SHORT = '-c'

    def __init__(self,
                 runlist,
                 options,
                 args,
                 log,
                 remote_script,
                 default_jobname,
                 basename=None):
        """
        Pick the output base name and immediately run `multijob_driver`.

        The base name is options.output_basename if options has that
        attribute; otherwise the basename argument if it is not empty;
        otherwise default_jobname.

        :type runlist: list of `CellRunInfo`
        :param runlist: Each item of runlist will generate a subjob and a
            single cell

        :type options: `argparse.Namespace`
        :param options: The command line options

        :type args: iterable
        :param args: The command line arguments as passed in by sys

        :type log: function
        :param log: function(msg) writes msg to log file

        :type remote_script: string
        :param remote_script: the dir and name for driver to resubmit

        :type default_jobname: string
        :param default_jobname: Default job name

        :type basename: str or None
        :param basename: The generic basename defined from inputfile.
        """
        out_basename = basename
        if hasattr(options, 'output_basename'):
            out_basename = options.output_basename
        elif not out_basename:
            out_basename = default_jobname
        self.multijob_driver(runlist, options, args, out_basename, log,
                             remote_script)

    def remove_flag(self, args, flag, and_value=False):
        """
        Remove the first occurrence of a flag from the command line
        arguments, in place. Nothing happens if the flag is not present.

        :type args: list
        :param args: The list of command line arguments

        :type flag: str
        :param flag: The flag to remove

        :type and_value: bool
        :param and_value: Also remove the value associated with the flag - it
            is assumed that this is the following list item
        """
        try:
            position = args.index(flag)
        except ValueError:
            # Flag was not used, so we don't need to do anything
            return
        span = 2 if and_value else 1
        del args[position:position + span]

    def replace_value(self, args, old_value, new_value):
        """
        Replace, in place, the first list item equal to old_value with
        new_value. Nothing happens if old_value is not present.

        :type args: list
        :param args: The list of command line arguments

        :type old_value: str
        :param old_value: The value to replace

        :type new_value: str
        :param new_value: The value to replace old_value with
        """
        try:
            position = args.index(old_value)
        except ValueError:
            return
        else:
            args[position] = new_value

    def multijob_driver(self, runlist, options, args, basename, log,
                        remote_script):
        """
        Run one subjob per cell and gather the outputs.

        If options.no_system is set, the output structures of all subjobs
        are compiled into a single structure file. Otherwise the subjob cms
        files are zipped into an archive. In both cases the result is set as
        the structure output file when running under job control.

        :type runlist: list of `CellRunInfo`
        :param runlist: Each item of runlist will generate a subjob and a
            single cell

        :type options: `argparse.Namespace`
        :param options: The command line options

        :type args: iterable
        :param args: The command line arguments as passed in by sys

        :type basename: str
        :param basename: The base name of the compiled output file. It is
            not used for the zip archive when running under job control;
            the job name is used instead.

        :type log: function
        :param log: function(msg) writes msg to log file

        :type remote_script: string
        :param remote_script: the dir and name for driver to resubmit
        """
        log('Setting up job queue')
        mixed_args = list(args)
        jobdj = queue.JobDJ(
            verbosity='normal', max_failures=queue.NOLIMIT, max_retries=3)
        for host, procs in jobdj._hosts.items():
            log('Host: %s, processors:%d' % (host, procs))

        # Remove flags we don't want for subjobs
        unwanted = (('-homogeneous', False), ('-no_disordered_cell', False),
                    ('-ncells', True))
        for flag, takes_value in unwanted:
            self.remove_flag(mixed_args, flag, and_value=takes_value)

        # homogeneous cells should not have composition flags as they only
        # have one component. User might have used the short form (-c)
        single_args = list(mixed_args)
        for flag in (self.COMPOSITION_FLAG, self.COMPOSITION_FLAG_SHORT):
            self.remove_flag(single_args, flag, and_value=True)

        # Create a subjob for each cell to create
        for runinfo in runlist:
            is_single = len(runinfo.structs) == 1
            cmdargs = list(single_args if is_single else mixed_args)

            if options.seed:
                # Give each replicate its own seed
                cell_seed = options.seed * runinfo.replicate
                self.replace_value(cmdargs, str(options.seed), str(cell_seed))
            if runinfo.cru:
                self.replace_value(cmdargs, options.repeat_unit, runinfo.cru)

            # Set up the input and output file names
            for attr in ('output_basename', 'title'):
                if hasattr(options, attr):
                    self.replace_value(cmdargs, getattr(options, attr),
                                       runinfo.basename)
            subinput = runinfo.basename + '.maegz'
            fileutils.force_remove(subinput)
            for struct in runinfo.structs:
                struct.append(subinput)
            self.replace_value(cmdargs, options.input_file, subinput)

            cmd = ['run', remote_script] + cmdargs
            jobdj.addJob(jaguarworkflows.RobustSubmissionJob(cmd))
        log('Number of jobs to run: %d' % jobdj.total_added)

        # Run all jobs and make sure we grab their output
        wanted_ext = '.maegz' if options.no_system else '.cms'
        st_files = run_jobdj_and_add_files(
            jobdj, log, expect_exts=[wanted_ext])
        backend = jobcontrol.get_backend()

        if options.no_system:
            # Read all the output structures and compile them into one file
            outname = basename + AMCELL_NO_SYSTEM_OUT
            writer = structure.StructureWriter(outname)
            title_counts = {}
            for struct in structure.MultiFileStructureReader(st_files):
                # same component of different structures needs different
                # titles in PT
                title = struct.title
                if title in title_counts:
                    title_counts[title] += 1
                    struct.title = title + '_' + str(title_counts[title])
                else:
                    title_counts[title] = 0
                writer.append(struct)
            writer.close()
            if backend:
                backend.addOutputFile(outname)
                backend.setStructureOutputFile(outname)
        else:
            # Desmond CMS files need to be incorporated into a zip file.
            # Zip up all the cms files and incorporate them all
            jobname = backend.getJob().Name if backend else basename
            outname = jobname + '.zip'
            zip_and_set_incorporation(outname, st_files)
        log('Output compiled in %s' % outname)
def get_jobname(default_jobname):
    """
    Return the job name reported by `jobcontrol.get_jobname`, falling back to
    default_jobname when that call returns nothing.

    :type default_jobname: str
    :param default_jobname: default job name of the current module; must not
        be empty

    :rtype: string
    :return: Jobname
    """
    assert default_jobname
    jobname = jobcontrol.get_jobname(default_jobname)
    return jobname if jobname else default_jobname
def get_procs():
    """
    Get the number of processors of the first host in the host list. The
    backend host list is used when running under a job control backend, and
    the command line host list otherwise.

    :rtype: int
    :return: Number of processors; 1 if there is no host list or the first
        host has no processor count
    """
    if jobcontrol.get_backend():
        hosts = jobcontrol.get_backend_host_list()
    else:
        hosts = jobcontrol.get_command_line_host_list()
    if not hosts:
        return 1
    # Each host entry is indexed as (name, processors); the processor count
    # may be empty
    return hosts[0][1] or 1
def memory_usage_psutil():
    """
    Return the memory usage of the current process in MB, taken from the
    first field of psutil's memory_info for this process.

    :rtype: float
    :return: memory usage in MB
    """
    this_process = psutil.Process(os.getpid())
    bytes_used = this_process.memory_info()[0]
    return bytes_used / float(2**20)
def get_size_of(an_object, size_unit='megabytes'):
    """
    Return the size of an object, as reported by `sys.getsizeof`, in
    size_unit.

    The object can be any type of object. All built-in objects will return
    correct results, but this does not have to hold true for third-party
    extensions as it is implementation specific.

    :type an_object: any type of python object
    :param an_object: The object to measure

    :type size_unit: str
    :param size_unit: One of 'bytes', 'kilobytes', 'megabytes' or
        'gigabytes'

    :rtype: float
    :return: the size of an object in size_unit

    :raises KeyError: If size_unit is not one of the known units
    """
    n_bytes = sys.getsizeof(an_object)
    # Power of 1024 that converts bytes into each unit
    exponents = {'bytes': 0, 'kilobytes': 1, 'megabytes': 2, 'gigabytes': 3}
    return n_bytes / 1024**exponents[size_unit]
def get_jobhosts():
    """
    Return the job hosts from the backend host list, or from the command
    line host list when the backend one is empty.

    :rtype: None or list of tuple
    :return: the hosts or None
    """
    return (jobcontrol.get_backend_host_list() or
            jobcontrol.get_command_line_host_list())
def get_jobhost():
    """
    Return the first job host from backend or command line.

    :rtype: list or None
    :return: the first host as a list, with a missing processor count
        replaced by 1, or None if there are no hosts
    """
    all_hosts = get_jobhosts()
    if not all_hosts:
        return None
    first = all_hosts[0]
    if first[1] is None:
        # No processor count given for this host: assume one
        return [first[0], 1]
    return list(first)
def get_jobhost_name():
    """
    Return the name of the first job host from backend or command line.

    :rtype: str or None
    :return: the host name, or None if there is no job host
    """
    jobhost = get_jobhost()
    if not jobhost:
        return None
    return jobhost[0]
def is_hostname_known(hostname):
    """
    Check whether hostname is the name of one of the hosts returned by
    `jobcontrol.get_hosts`.

    :type hostname: str
    :param hostname: the hostname to check against

    :rtype: bool
    :return: True, if the hostname is among the known hosts
    """
    known_names = set()
    for host in jobcontrol.get_hosts():
        known_names.add(host.name)
    return hostname in known_names
def is_jobhost_gpu_available():
    """
    Check whether a gpu is available on the job host.

    When there is no job host, or the job host is localhost, the local
    machine is queried through `gpgpu`. Otherwise the host entry for the job
    host name is looked up and its gpgpu setting decides.

    :rtype: bool
    :return: True means gpu is available.
    """
    hostname = get_jobhost_name()
    runs_locally = hostname is None or hostname == 'localhost'
    if runs_locally:
        # The calculation runs on local
        return gpgpu.is_any_gpu_available()

    # This checks the gpu availability for SUBHOST, if defined.
    # This checks the gpu availability for HOST, if SUBHOST not defined
    # and HOST defined.
    host_entry = jobcontrol.get_host(hostname)
    # Define the GPU availability based on gpgpu (SUPPORT-128375)
    return bool(host_entry.gpgpu)
def add_zipfile_to_backend(adir):
    """
    Zip the given directory into ``<adir>.zip`` and hand the archive to
    `add_outfile_to_backend`.

    :type adir: str
    :param adir: the directory
    """
    archive = f'{adir}.zip'
    zip_directory(adir, fileobj=archive)
    # No backend is supplied here; None is passed on explicitly
    add_outfile_to_backend(archive, None)
def get_backend_first_host():
    """
    Get the first host of the backend host list. Must be called while
    running under a job control backend.

    :rtype: str, int
    :return: host, number of processors; ('localhost', 1) if the backend
        host list is empty
    """
    assert jobcontrol.get_backend()
    backend_hosts = jobcontrol.get_backend_host_list()
    return backend_hosts[0] if backend_hosts else ('localhost', 1)
def write_cms_with_wam(cms_model, filename, wam_type):
    """
    Write the cms model to a file with the provided WAM property

    :param `cms.Cms` cms_model: The cms model to write to file

    :param str filename: The cms path

    :param int wam_type: One of the enums defined in workflow_action_menu.h
    """
    with wam.WorkflowMenuMaestroWriter(filename, wam_type) as writer:
        # Only the full system CT goes through the WAM writer
        writer.append(cms_model._raw_fsys_ct)
        # The component CTs are appended to the same file directly, in CMS
        # format
        for ct in cms_model.comp_ct:
            ct.append(filename, "CMS")
def add_wam_to_cms(filename, wam_type):
    """
    Load the cms file and write it back to the same path with the WAM type
    added

    :param str filename: The cms path

    :param int wam_type: One of the enums defined in workflow_action_menu.h
    """
    write_cms_with_wam(cms.Cms(filename), filename, wam_type)
def write_mae_with_wam(struct, filename, wam_type=None):
    """
    Write a structure to a Maestro file through the WAM-aware writer

    :param `structure.Structure` struct: The structure to write to file

    :param str filename: The mae path

    :param int wam_type: One of the enums defined in workflow_action_menu.h

    :raise ValueError: If the file path is not for a Maestro file
    """
    file_format = fileutils.get_structure_file_format(filename)
    if file_format != structure.MAESTRO:
        raise ValueError(f"{filename} is not a Maestro file path.")

    with wam.WorkflowMenuMaestroWriter(filename, wam_type) as mae_writer:
        mae_writer.append(struct)
def set_structure_wam(struct, wam_type):
    """
    Set the WAM property on the passed structure by delegating to
    `wam.set_workflow_action_menu`

    :param `structure.Structure` struct: The structure to set the WAM for

    :param int wam_type: One of the enums defined in workflow_action_menu.h
    """
    wam.set_workflow_action_menu(struct, wam_type)
def set_smart_distribution_from_environ(jobq):
    """
    Turn smart distribution of the queue on or off according to the
    SCHRODINGER_MATSCI_REUSE_DRIVER_HOST environment variable. The queue is
    left untouched if the variable is unset or its value cannot be
    interpreted as a boolean.

    :type jobq: `schrodinger.job.queue.JobDJ`
    :param jobq: The JobDJ object
    """
    setting = os.environ.get('SCHRODINGER_MATSCI_REUSE_DRIVER_HOST')
    if setting is None:
        return

    try:
        enabled = msutils.setting_to_bool(str(setting))
    except ValueError:
        # Invalid string
        return
    jobq.smart_distribution = enabled
def add_tpp_parser_argument(parser):
    """
    Register the `FLAG_TPP` option (a positive integer number of threads per
    subjob, `DEFAULT_TPP` by default) on the parser

    :type parser: `argparse.ArgumentParser`
    :param parser: The parser to add this argument to
    """
    tpp_help = ('Specify the number '
                'of threads to use for parallelizing each subjob that supports '
                'threading.')
    parser.add_argument(
        FLAG_TPP,
        type=parserutils.type_positive_int,
        default=DEFAULT_TPP,
        help=tpp_help)