"""
Copyright Schrodinger, LLC. All rights reserved.
"""
import argparse
import contextlib
import os
import os.path
import random
import re
import shutil
import sys
import tarfile
import zipfile
from collections import OrderedDict
from collections import defaultdict
import psutil
import schrodinger
from schrodinger import gpgpu
from schrodinger import structure
from schrodinger.application.desmond import cms
from schrodinger.application.desmond import constants as dconst
from schrodinger.application.matsci import coarsegrain
from schrodinger.application.matsci import desmondutils
from schrodinger.application.matsci import jaguarworkflows
from schrodinger.application.matsci import msutils
from schrodinger.application.matsci import parserutils
from schrodinger.application.matsci import property_names
from schrodinger.application.matsci import textlogger
from schrodinger.infra import mm
from schrodinger.job import jobcontrol
from schrodinger.job import launchapi
from schrodinger.job import launcher
from schrodinger.job import launchparams
from schrodinger.job import queue
from schrodinger.structure import workflow_action_menu as wam
from schrodinger.structutils import minimize
from schrodinger.test.stu.common import zip_directory
from schrodinger.utils import fileutils
from schrodinger.utils import license
FLAG_KEYWORDS = '-keywords'
SAVE_FLAG = "-save_trj_data"
SAVE_NONE = "none"
SAVE_CMS = "cms"
SAVE_TRJ = "trj"
SAVE_FLAG_OPTS = [SAVE_NONE, SAVE_CMS, SAVE_TRJ]
FLAG_COMBINE_TRJ = '-combine_trj'
AMCELL_NO_SYSTEM_OUT = '-amcell.maegz'
FLAG_MONOMER_CHARGE = '-monomer_ff_q'
FLAG_MD_TIME = '-md_time' # In ns or ps
FLAG_MD_TIMESTEP = '-md_timestep' # In fs
FLAG_MD_TRJ_INT = '-md_trj_int' # In ps
FLAG_MD_ENEGRP_INT = '-md_enegrp_int' # In ps
FLAG_MD_ENESEQ_INT = '-md_eneseq_int' # In ps
FLAG_MD_TEMP = '-md_temp' # In K
FLAG_MD_PRESS = '-md_press' # In bar
FLAG_MD_ENSEMBLE = '-md_ensemble'
FLAG_MD_ISOTROPY = '-md_isotropy'
FLAG_GPU = '-gpu'
SPLIT_COMPONENTS_FLAG = '-split_components'
FLAG_FORCEFIELD = '-forcefield'
FLAG_PTENSOR_AVG = '-ptensor_avg'
FLAG_RANDOM_SEED = '-seed'
# Subjob compression
FLAG_COMPRESS_SUBJOBS = '-compress_subjobs'
WATER_FF_TYPE_FLAG = '-water_fftype'
MD_FLAGS_DEFAULTS = {
FLAG_MD_TEMP: 300.0,
FLAG_MD_PRESS: 1.01325,
FLAG_MD_TIME: 100.0,
FLAG_MD_TIMESTEP: 2.0,
FLAG_MD_TRJ_INT: 10.0,
FLAG_MD_ENEGRP_INT: 10.0,
FLAG_MD_ENESEQ_INT: 10.0,
FLAG_MD_ENSEMBLE: desmondutils.ENSEMBLE_NVT,
FLAG_MD_ISOTROPY: dconst.IsotropyPolicy.ISOTROPIC,
WATER_FF_TYPE_FLAG: desmondutils.SPC,
SPLIT_COMPONENTS_FLAG: False,
FLAG_RANDOM_SEED: parserutils.RANDOM_SEED_DEFAULT,
SAVE_FLAG: SAVE_NONE,
FLAG_FORCEFIELD: mm.OPLS_NAME_F14
}
DRIVER = 'driver'
ARGS = 'args'
RESTART_PARAMETERS_FILENAME = 'parameters.cmd'
FLAG_USEZIPDATA = '-use_zip_data'
FLAG_RESTART_PROJ = '-restart_proj'
FLAG_RESTART_DISP = '-restart_disp'
FLAG_RESTART_VIEWNAME = '-restart_viewname'
FLAG_RESTART_HOST = '-restart_host'
FLAG_RESTART_JOBNAME = '-restart_jobname'
FLAG_CONFIG_YAML = '-config_yaml'
RESTART_PROGRAM_NAME = 'RestartWorkflow'
RESTART_DEFAULT_JOBNAME = 'restart_workflow'
CLEAN_AND_UNIQUIFY_MAX_LEN = 100
DOLLARS = re.compile(r'([^\\])\$')
SOURCE_PATH_PROP = property_names.SOURCE_PATH_PROP
DEFAULT_SEED_HELP = "Seed for the random number generator used in simulations."
TGZ_FORMAT = tarfile.PAX_FORMAT
FROM_SUBJOB = 'from_subjob'
LOG_TAG = 'log_tag'
CHECKED_OUT_MATSCI_MAIN = None
WAM_TYPES = wam.WorkflowType
# TPP
FLAG_TPP = '-TPP'
DEFAULT_TPP = 1
def get_logging_tag(job, tag=LOG_TAG):
"""
Get the logging tag of a job.
:param job: a job object
:type job: `schrodinger.job.queue.JobControlJob`
:param tag: the job attribute containing the info.
:type tag: str
:return: the logging tag
:rtype: str
"""
return getattr(job, tag, "")
def add_outfile_to_backend(file_fn,
backend=None,
set_structure_output=False,
stream=False):
"""
Add output file to the backend.
:type file_fn: str
:param file_fn: Filename
:type backend: `schrodinger.job._Backend` or None
:param backend: Backend handle. If None, a backend will be checked for. If
no backend is found, nothing will be done.
:type set_structure_output: bool
:param set_structure_output: If True, set this structure as output
:type stream: bool
:param stream: If True, stream the file to the submission host
"""
if not backend:
backend = jobcontrol.get_backend()
if not backend:
return
if stream:
backend.addLogFile(file_fn)
else:
backend.addOutputFile(file_fn)
if set_structure_output:
backend.setStructureOutputFile(file_fn)
def log_structures_found(qjob, structure_files, log,
jobstates_for_logging=None):
"""
Log structures info for a job.
:type qjob: `schrodinger.job.queue.JobControlJob`
:param qjob: The subjob to find structures
:type structure_files: dict
:param structure_files: Keys are subjobs, values are sets of structure
file names
:type log: callable
:param log: function(msg) writes msg to log file
:type jobstates_for_logging: None or container of `queue.JobState` states
:param jobstates_for_logging: Log info for subjobs in these states. Defaults
to DONE and FAILED.
"""
if jobstates_for_logging is None:
jobstates_for_logging = {queue.JobState.DONE, queue.JobState.FAILED}
if qjob.state not in jobstates_for_logging:
return
sfiles = structure_files[qjob]
# When logging information about a job object, if the object has an attribute
# named the module constant LOG_TAG, the value of that attribute will precede
# the rest of the log message. This enables log file lines to be attributed
# to specific jobs.
msg = get_logging_tag(qjob)
if sfiles:
msg += f"Found completed structures: {', '.join(sfiles)}"
else:
jobname = qjob.name if qjob.name else ' '.join(qjob._command)
msg += f'No output structure found for {jobname}'
log(msg)
def run_jobdj_and_add_files(jobdj,
log,
expect_exts=None,
exclude_exts=None,
jobdj_dir=os.curdir):
"""
Run the subjobs currently queued up in the jobdj, adding their files to the
current backend for copying back to the job directory and locating the
expected structure output file for each job.
:type jobdj: `schrodinger.job.queue.JobDJ`
:param jobdj: The JobDJ with queued up jobs to run
:type log: callable
:param log: function(msg) writes msg to log file
:type expect_exts: None or list of str
:param expect_exts: The expected extensions of the output files
:type exclude_exts: None or list of str
:param exclude_exts: The output files found with the excluded extensions are
not copied back to the original job directory or documented into logger
:type jobdj_dir: str
:param jobdj_dir: jobdj_dir is the relative path from where the backend is
created to where the jobdj is created. Using /scr/user/jobname/subdir1/subdir2
as an example, normally backend and jobdj are created in /scr/user/jobname/,
and thus jobdj_dir by default is os.curdir. If the jobdj is created inside
/scr/user/jobname/subdir1/, the jobdj_dir is subdir1 as backend is created
in /scr/user/jobname/. In this example, the backend is in /scr/user/jobname/,
jobdj is in /scr/user/jobname/subdir1, and the subjob can run in
/scr/user/jobname/subdir1/subdir2. (subjob.getCommandDir() gives subdir2)
:rtype: list
:return: A list of structure output file names, sorted alphabetically
"""
# Run all jobs and make sure we grab their output
backend = jobcontrol.get_backend()
structure_files = defaultdict(set)
completed_jobs = set()
if expect_exts is None:
expect_exts = ['.cms', '.mae', '.zip']
if exclude_exts is None:
exclude_exts = []
def _finalize(qjob, force=False):
"""
Process status changes for a JobDJ job. Log relevant information for
completed jobs.
:param qjob: JobDJ's snapshot of job state at status change
:type qjob: schrodinger.job.queue.BaseJob
:param bool force: Whether to consider jobs complete even if
job.isComplete() returns False
"""
if not isinstance(qjob, queue.JobControlJob):
return
# The status change could be to RUNNING
if not qjob.hasExited():
return
# We attempt to finalize each job as it is retryable failed, failed
# or done so that its files are copied back even if the subjob fails
# or the parent job gets killed before all subjobs finish
if finalize_subjob(
qjob,
backend,
structure_files,
expect_exts,
log,
exclude_exts=exclude_exts,
jobdj_dir=jobdj_dir) or force:
completed_jobs.add(qjob)
# Log information about results found and not found for completed
# Done and Failed subjobs
log_structures_found(qjob, structure_files, log)
# status_change_callback is called every time a job status changes. For
# example, RUNNING->DONE.
jobdj.run(status_change_callback=_finalize)
for qjob in jobdj.all_jobs:
if qjob in completed_jobs:
continue
# If multiple jobs complete simultaneously, some jobs may be missed by
# the status_change_callback. Also, jobs might be returned before
# isComplete returns True. This pass should catch all these cases.
_finalize(qjob, force=True)
# Sorting ensures that structures for similar systems appear near each
# other in the PT after incorporation
output_st_files = []
for subjob_outfiles in structure_files.values():
output_st_files += [
os.path.basename(cmd_dir_filename)
for cmd_dir_filename in subjob_outfiles
]
output_st_files.sort()
return output_st_files
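# A minimal usage sketch (commands and file names are hypothetical): the
# driver queues JobControlJob objects, then harvests the structure outputs
# once all subjobs have exited:
#
#     jobdj = queue.JobDJ(max_failures=queue.NOLIMIT)
#     for cmd in subjob_commands:  # e.g. ['run', 'subjob_driver.py', 'in.maegz']
#         jobdj.addJob(queue.JobControlJob(cmd))
#     st_files = run_jobdj_and_add_files(jobdj, log, expect_exts=['.cms'])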
def finalize_subjob(subjob,
backend,
structure_files,
structure_extensions,
log,
exclude_exts=None,
jobdj_dir=os.curdir):
"""
Mark subjob output and log files for copying back to the job directory,
and find the structure output file if there is one
:type subjob: `schrodinger.job.queue.JobControlJob` or None
:param subjob: The subjob to mark files from
:type backend: `schrodinger.job.jobcontrol._Backend` or None
:param backend: The current backend or None if there isn't one
:type structure_files: dict
:param structure_files: If an output structure file is found, it will
be added to this dict. Keys are subjobs, values are sets of structure
file names
:type structure_extensions: list of str
:param structure_extensions: The expected extension of the structure files
:type log: function
:param log: function(msg) writes msg to log file
:type exclude_exts: None or list of str
:param exclude_exts: The output files found with the excluded extensions are
not copied back to the original job directory or documented into logger
:type jobdj_dir: str
:param jobdj_dir: jobdj_dir is the relative path from where the backend is
created to where the jobdj is created. Using /scr/user/jobname/subdir1/subdir2
as an example, normally backend and jobdj are created in /scr/user/jobname/,
and thus jobdj_dir by default is os.curdir. If the jobdj is created inside
/scr/user/jobname/subdir1/, the jobdj_dir is subdir1 as backend is created
in /scr/user/jobname/. In this example, the backend is in /scr/user/jobname/,
jobdj is in /scr/user/jobname/subdir1, and the subjob can run in
/scr/user/jobname/subdir1/subdir2. (subjob.getCommandDir() gives subdir2)
:rtype: bool
:return: True if the job has completed, False if not
"""
ajob = subjob.getJob()
if not ajob:
# Job has not been submitted yet, or is in the process of being submitted
return False
if exclude_exts is None:
exclude_exts = []
outfiles = ajob.OutputFiles
if hasattr(subjob, 'outfiles'):
# Permittivity workflow may set outfiles outside the subjob and this
# combines the standard ajob.OutputFiles with the customized job.outfiles
outfiles += subjob.outfiles
sub_dir = subjob.getCommandDir()
path = sub_dir if jobdj_dir == os.curdir else os.path.join(
jobdj_dir, sub_dir)
add_subjob_files_to_backend(
ajob, path=path, backend=backend, exclude_exts=exclude_exts)
for filename in outfiles:
if sub_dir:
filename = os.path.join(sub_dir, filename)
if not os.path.exists(filename):
continue
if any(filename.endswith(x) for x in exclude_exts):
continue
extension = fileutils.splitext(filename)[1]
for structure_extension in structure_extensions:
if extension.startswith(structure_extension):
structure_files[subjob].add(filename)
# Completed means finished running. It does not mean success or failure.
# When logging information about a job object, if the object has an attribute
# named the module constant LOG_TAG, the value of that attribute will precede
# the rest of the log message. This enables log file lines to be attributed
# to specific jobs.
if ajob.isComplete():
msg = get_logging_tag(subjob)
msg += f'Job {ajob.Name} completed'
log(msg)
return True
else:
return False
def add_subjob_files_to_backend(subjob,
path=None,
backend=None,
exclude_exts=None):
"""
Add all the output and log files from a subjob to the backend of this job so
that they get copied back to the original job directory.
:note: subjob log files are added as output files instead of log files. They
will not be streamed back to the original job directory but instead
copied back at the end of this job like a normal output file.
:type subjob: `schrodinger.job.jobcontrol.Job` or
`schrodinger.job.queue.JobControlJob`
:param subjob: The subjob to add files from.
:type path: str
:param path: The path to the subjob directory from where the backend is created
if it was not run in the same directory as this job. Use `FROM_SUBJOB` to
get the subjob directory from a JobControlJob object - this will be ignored
if subjob is a Job object.
:type backend: `schrodinger.job.jobcontrol._Backend`
:param backend: The backend if one exists
:type exclude_exts: None or list of str
:param exclude_exts: The output files found with the excluded extensions are
not copied back to the original job directory or documented into logger
"""
if not backend:
backend = jobcontrol.get_backend()
if not backend:
return
if isinstance(subjob, queue.JobControlJob):
if path == FROM_SUBJOB:
path = subjob.getCommandDir()
subjob = subjob.getJob()
if not subjob:
return
if exclude_exts is None:
exclude_exts = []
# Subjob log files may have already been registered with this backend as
# a log (streaming) file so that the subjob log gets streamed back
# into the original job directory. It is an error to register a file
# with the same backend as both a streaming and output file. We'll check
# against the list of streaming log files to avoid this error.
this_job_logfiles = set(backend.getJob().LogFiles)
for filename in subjob.OutputFiles + subjob.LogFiles:
if any(filename.endswith(x) for x in exclude_exts):
continue
if path:
filename = os.path.join(path, filename)
# Trying to register a log file as an out file both raises a traceback
# and prints messages to the log file, so we do a pre-check rather than
# a try/except to avoid the log file messages
if filename not in this_job_logfiles:
backend.addOutputFile(filename)
def determine_source_path(backend=None, job=None):
"""
Determine the original job directory. This is obtained from the job control
Job object for this process. If no Job object is found, the current
directory is used.
:type backend: `schrodinger.job.jobcontrol._Backend`
:param backend: The job control backend. Will be used to obtain the job
object if no job is supplied. If neither backend or job are supplied,
the backend will be obtained from job control (if one exists).
:type job: `schrodinger.job.jobcontrol.Job`
:param job: The job control job for this process. If not supplied, will be
obtained from the backend (if one exists).
:rtype: str
:return: The directory that is the source path. Will be either the
OrigLaunchDir property of the job or the current directory if not
running under job control.
"""
if not job:
if not backend:
backend = jobcontrol.get_backend()
if backend:
job = backend.getJob()
else:
# Not running under job control, so running in the local directory
return os.getcwd()
try:
sourcedir = job.OrigLaunchDir
except AttributeError:
# We don't know that this could ever happen, but just being safe
sourcedir = ""
return sourcedir
def set_source_path(struct, backend=None, job=None, path=None):
"""
Set the source path property to the original job directory. This is obtained
from the job control Job object for this process. If no Job object is found,
the current directory is used.
:type struct: `schrodinger.structure.Structure`
:param struct: The structure to set the property on. Note that property
setting works properly on both Structure and Cms objects.
:type backend: `schrodinger.job.jobcontrol._Backend`
:param backend: The job control backend. Will be used to obtain the job
object if no job is supplied. If neither backend or job are supplied,
the backend will be obtained from job control (if one exists).
:type job: `schrodinger.job.jobcontrol.Job`
:param job: The job control job for this process. If not supplied, will be
obtained from the backend (if one exists).
:param str path: Manually set this path to the source directory, overriding
all other options
:rtype: str
:return: The directory set as the source path. Will be either the
OrigLaunchDir property of the job or the current directory if not
running under job control.
"""
if path:
sourcedir = path
else:
sourcedir = determine_source_path(backend=backend, job=job)
# Note - deleting the property first is a workaround for SHARED-6890 just in
# case we are working in Maestro
if isinstance(struct, cms.Cms):
desmondutils.delete_cms_property(struct, SOURCE_PATH_PROP)
desmondutils.add_cms_property(struct, SOURCE_PATH_PROP, sourcedir)
else:
struct.property.pop(SOURCE_PATH_PROP, None)
struct.property[SOURCE_PATH_PROP] = sourcedir
return sourcedir
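# A minimal sketch of the set/get round trip, assuming a plain Structure
# (Cms objects follow the same call pattern):
#
#     st = structure.create_new_structure()
#     set_source_path(st)         # records the launch (or current) directory
#     path = get_source_path(st)  # '' if the stored path no longer exists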
def get_source_path(source, existence_check=True):
"""
Get the source path to the original job directory
:type source: `schrodinger.structure.Structure` or
`schrodinger.project.ProjectRow`
:param source: Either the ProjectRow or the structure to obtain the
source information from. If a structure, can be either a Structure or a Cms
object.
:type existence_check: bool
:param existence_check: If True (default), a blank string will be returned
if the source path does not exist. If False, the path is returned regardless
of whether it exists or not.
:rtype: str
:return: The original job directory or a blank string if none is found
"""
try:
path = source.property.get(SOURCE_PATH_PROP, "")
except (AttributeError, TypeError):
# This is neither a Structure nor a ProjectRow
raise TypeError('source must be a Structure or ProjectRow')
if path and (not existence_check or os.path.exists(path)):
# Add on any job subdirectory for this structure
subdir = source.property.get(property_names.SUBDIRECTORY_PROP)
if subdir:
subpath = os.path.join(path, subdir)
if (not existence_check or os.path.exists(subpath)):
path = subpath
return path
return ""
def get_file_path(struct, prop):
"""
Get the path of the file defined by source path and property name
:param struct: The structure whose property is checked
:type struct: `schrodinger.structure.Structure`
:param prop: property pointing to a file
:type prop: str
:return: path of the file if found
:rtype: str or None
"""
filename = struct.property.get(prop)
if not filename:
return
source_path = get_source_path(struct)
filepath = os.path.join(source_path, filename)
if not os.path.isfile(filepath):
return
return filepath
def fix_jaguar_file_links(struct, subdir=None):
"""
Change the Jaguar file link properties to point to the original job
directory instead of the subjob directory
:param schrodinger.structure.Structure struct: The structure to fix
:param str subdir: The subdirectory for these files in the main job
directory
"""
backend = jobcontrol.get_backend()
if not backend:
return
orig_dir = determine_source_path(backend=backend)
if subdir:
orig_dir = '/'.join((orig_dir, subdir))
for prop, val in struct.property.items():
if jaguarworkflows.is_jaguar_file_property(prop):
fname = os.path.basename(val)
# We use '/'.join rather than os.path.join in order to create paths
# that work on any system
struct.property[prop] = '/'.join((orig_dir, fname))
def prepare_job_spec_builder(argv,
program_name,
default_jobname,
input_fn=None,
set_driver_reserves_cores=False,
schrodinger_product=None):
"""
Prepare generic job specification builder.
If set_driver_reserves_cores is True, the script (driver) is expected to
use all the cores (CPUs), similar to umbrella mode in multisim. See the
stress-strain driver for an example. For all other cases (such as
opto/hopping/amorphous), keep set_driver_reserves_cores False.
:type argv: list
:param argv: The list of command line arguments, including the script name
at [0], similar to that returned by sys.argv
:type program_name: str
:param program_name: Program name
:type default_jobname: str
:param default_jobname: Default job name
:type input_fn: str
:param input_fn: Input filename
:type set_driver_reserves_cores: bool
:param set_driver_reserves_cores: If True, enable
launchapi.setDriverReservesCores
:type schrodinger_product: str
:param schrodinger_product: A product directory to search for the
script/executable. This should be the name of a directory under
SCHRODINGER without the trailing version (i.e. the "-v*" part).
:rtype: `launchapi.JobSpecificationArgsBuilder`
:return: Job specification builder object
"""
job_builder = launchapi.JobSpecificationArgsBuilder(
argv,
use_jobname_log=True,
schrodinger_product=schrodinger_product,
program_name=program_name,
default_jobname=default_jobname)
if input_fn and os.path.isfile(input_fn):
job_builder.setInputFile(input_fn)
if set_driver_reserves_cores:
job_builder.setDriverReservesCores(True)
return job_builder
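# A minimal sketch of a driver's job specification entry point using the
# builder (program and file names are hypothetical):
#
#     def get_job_spec_from_args(argv):
#         builder = prepare_job_spec_builder(argv, 'My Workflow',
#                                            'my_workflow', input_fn='in.maegz')
#         return builder.getJobSpec()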
def add_folder_to_job_builder(job_builder, folder_path):
"""
Add folder (trajectory folder) to job directory.
:type job_builder: `launchapi.JobSpecificationArgsBuilder`
:param job_builder: Job specification builder object.
:type folder_path: str
:param folder_path: Full path to the folder that needs to be copied.
"""
file_paths = fileutils.get_files_from_folder(folder_path)
for (abs_pathname, runtime_path) in file_paths:
job_builder.setInputFile(abs_pathname, runtime_path=runtime_path)
def get_stripped_defaults(adict):
"""
Remove leading dashes from the keys. Used with ArgumentParser.
:type adict: dict{string:any}
:param adict: Dictionary of defaults
:rtype: dict{string:any}
:return: Dictionary of defaults with dashes stripped from the keys
"""
ret = {}
for key, val in adict.items():
if key.startswith('-'):
key = key[1:]
ret[key] = val
return ret
def argparser_set_defaults(parser, defaults):
"""
Set default values for the arguments already present in the parser. Note
that parser.set_defaults contaminates the namespace with arguments that
may be absent from the parser.
:type parser: `argparse.ArgumentParser`
:param parser: The parser to set defaults
:type defaults: dict{string:any}
:param defaults: Dictionary of the defaults
"""
defaults = get_stripped_defaults(defaults)
# See cpython/Lib/argparse.py :: set_defaults
for action in parser._actions:
if action.dest in defaults:
parser._defaults[action.dest] = defaults[action.dest]
action.default = defaults[action.dest]
def parse_restart_parameters_file(path):
"""
Parse the restart parameters file. Format of the file:
the first line is the driver's filename;
the second line is the original arguments passed to the driver.
:type path: str
:param path: Path to the file with original arguments
:rtype: dict
:return: Dictionary with parsed values
"""
params = {}
with open(path, 'r') as pfile:
params[DRIVER] = pfile.readline().strip()
params[ARGS] = pfile.readline().strip().split()
return params
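# For example, a parameters file written by write_restart_parameters_file
# (below) might contain (hypothetical contents):
#
#     driver.py
#     -md_time 100.0 -seed 1234 input.maegz
#
# for which this function returns {DRIVER: 'driver.py',
# ARGS: ['-md_time', '100.0', '-seed', '1234', 'input.maegz']}.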
def write_restart_parameters_file(driver, args, outpath):
"""
Write out original arguments to parameters file and add the file as an
output file to any existing jobcontrol backend.
:type driver: str
:param driver: Driver's filename
:type args: list
:param args: Original arguments passed to driver
:type outpath: str
:param outpath: Path to the parameters file to write original arguments
"""
args_str = ' '.join(args)
# Remove all occurrences of -HOST/-host in args
args_str = re.sub(r'(?i)-HOST [^\s]+', '', args_str).strip()
# Remove all occurrences of -TPP in args
args_str = re.sub(r'(?i)%s [^\s]+' % FLAG_TPP, '', args_str).strip()
# For now, remove all occurrences of FLAG_USEZIPDATA in args
args_str = re.sub(r'(?i)%s [^\s]+' % FLAG_USEZIPDATA, '', args_str).strip()
with open(outpath, 'w') as outfile:
outfile.write('%s\n' % driver)
outfile.write(args_str + '\n')
backend = jobcontrol.get_backend()
if backend:
backend.addOutputFile(outpath)
def get_restart_id_filename(jobname):
"""
For the given restart jobname, return the name of the file containing the
job id.
:type jobname: str
:param jobname: The restart job name
:rtype: str
:return: The name of the restart jobid file
"""
return jobname + '.jobid'
def archive_job_data(path, files_path):
"""
Create a gzipped tar archive in the current directory from the provided list
of file paths. All error handling is left to the caller.
:type path: str
:param path: Path to the new archive to be created
:type files_path: list
:param files_path: List of files to be archived
"""
with tarfile.open(name=path, mode='w:gz', format=TGZ_FORMAT) as tar:
for file_path in set(files_path):
tar.add(file_path)
def create_restart_launcher(script, prog, input_name, output_name, zip_name,
options, args):
"""
Create a launcher.Launcher for the restart job and register the restart
archive plus input/output files with it.
:type script: str
:param script: Path to the script to launch
:type prog: str
:param prog: Program name
:type input_name: str
:param input_name: Input filename
:type output_name: str
:param output_name: Output structure filename
:type zip_name: str
:param zip_name: Filename of the archive with all restart files
:type options: `argparse.Namespace`
:param options: The object holding all the option values
:type args: list
:param args: List of the arguments passed to the original run
:rtype: `launcher.Launcher`
:return: The configured launcher
"""
(viewname, disp, proj, host, jobname) = get_restart_options(options)
scriptlauncher = launcher.Launcher(
script=script,
jobname=jobname,
viewname=viewname,
disp=disp,
proj=proj,
prog=prog,
copyscript=False,
runtoplevel=True)
args = ['-use_zip_data', zip_name] + args
args = ['-HOST', host] + args
scriptlauncher.addScriptArgs(args)
scriptlauncher.addInputFile(input_name)
scriptlauncher.addInputFile(zip_name)
scriptlauncher.addOutputFile(output_name)
scriptlauncher.setStructureOutputFile(output_name)
# No need to update the parameters file from the restart
return scriptlauncher
def create_restart_jobcmd(driver_path, zip_fn, restart_options, args,
default_jobname):
"""
Generate command-line list for the job restart.
:type driver_path: str
:param driver_path: Path to the driver
:type zip_fn: str
:param zip_fn: Filename of the archive with all restart files
:type restart_options: `argparse.Namespace`
:param restart_options: The object holding all the option values
:type args: list
:param args: List of the arguments passed to the original run
:type default_jobname: str
:param default_jobname: Default job name
:rtype: list
:return: List of parameters ready for job submission
"""
(viewname, disp, proj, host, jobname) = get_restart_options(restart_options)
# Prepending
args[:0] = [FLAG_USEZIPDATA, zip_fn]
if host:
args[:0] = ['-HOST', host]
if proj:
args[:0] = ['-PROJ', proj]
if disp:
args[:0] = ['-DISP', disp]
if viewname:
args[:0] = ['-VIEWNAME', viewname]
if jobname:
args[:0] = ['-JOBNAME', jobname]
else:
args[:0] = ['-JOBNAME', default_jobname]
args = ['$SCHRODINGER/run', driver_path] + args
return args
def write_idfile(jobobj):
"""
Store the job id in a file as a signal to the GUI that a new job has been
launched.
:type jobobj: `schrodinger.job.jobcontrol.Job`
:param jobobj: The job whose jobid should be written to the file
"""
filename = get_restart_id_filename(jobobj.name)
with open(filename, 'w') as idfile:
    idfile.write(jobobj.jobid)
def get_option(options, flag):
"""
Return the option value associated with flag
:type options: `argparse.Namespace`
:param options: The object holding all the option values
:type flag: str
:param flag: The flag for the desired option.
:rtype: any
:return: The value associated with flag, or None if flag (minus any leading
dashes) is not found as a property on options
"""
if flag.startswith('-'):
flag = flag[1:]
try:
return getattr(options, flag)
except AttributeError:
return None
def set_option(options, flag, value):
"""
Set the option value associated with flag
:type options: `argparse.Namespace`
:param options: The object holding all the option values
:type flag: str
:param flag: The flag for the desired option. If the string starts with a
'-', the '-' is removed.
:type value: any
:param value: The value to set the option.flag value to
"""
if flag.startswith('-'):
flag = flag[1:]
setattr(options, flag, value)
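# A minimal sketch of the get/set pair on a parsed namespace (values are
# hypothetical):
#
#     options = argparse.Namespace(md_time=100.0)
#     get_option(options, FLAG_MD_TIME)        # -> 100.0
#     set_option(options, FLAG_MD_TIME, 50.0)  # options.md_time is now 50.0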
def add_restart_parser_arguments(parser):
"""
Add restart arguments to the parser
:type parser: `argparse.ArgumentParser`
:param parser: The parser to add arguments to
"""
rsopts = parser.add_argument_group('Restart options')
rsopts.add_argument(
FLAG_USEZIPDATA,
metavar='PATH',
help='Extract the archive before running. Any completed files '
'in the archive will be used instead of submitting new jobs.')
rsopts.add_argument(
FLAG_RESTART_PROJ,
metavar='PROJECT_NAME',
help='Project name the restart job is associated with.')
pde = launchparams.ProjectDisposition
disp_choices = [pde.APPEND.value, pde.IGNORE.value]
disp_str = ', '.join(disp_choices)
rsopts.add_argument(
FLAG_RESTART_DISP,
metavar='INCORPORATION',
choices=disp_choices,
help='Incorporation state for the restart job. Should be one of: '
'%s.' % disp_str)
rsopts.add_argument(
FLAG_RESTART_VIEWNAME,
metavar='VIEWNAME',
help='Viewname for the panel submitting the restart job. Leave '
'blank if running from the command line.')
rsopts.add_argument(
FLAG_RESTART_HOST,
metavar='HOSTNAME:PROCESSORS',
help='Host for restart in the form of "hostname:cpus".')
rsopts.add_argument(
FLAG_RESTART_JOBNAME, metavar='JOBNAME', help='Jobname for restart.')
def get_restart_options(options):
"""
Get the command line options from the -restart_x flags
:type options: `argparse.Namespace`
:param options: The object holding all the option values
:rtype: tuple
:return: tuple of strings (viewname, incorporation, project, host:cpu,
jobname)
"""
restart_viewname = get_option(options, FLAG_RESTART_VIEWNAME)
restart_disp = get_option(options, FLAG_RESTART_DISP)
restart_proj = get_option(options, FLAG_RESTART_PROJ)
restart_host = get_option(options, FLAG_RESTART_HOST)
restart_jobname = get_option(options, FLAG_RESTART_JOBNAME)
return (restart_viewname, restart_disp, restart_proj, restart_host,
restart_jobname)
def add_keyword_parser_argument(parser,
keyword_flags=None,
default='',
arghelp=None,
action='store'):
"""
Add a keyword argument to the parser
:type parser: `argparse.ArgumentParser`
:param parser: The parser to add this argument to
:type keyword_flags: None or list of str
:param keyword_flags: the customized flags for this argument
:type default: str or None
:param default: The customized default values for this argument
:type arghelp: str or None
:param arghelp: The customized help message for this argument
:type action: `argparse.Action` or str
:param action: The customized action for this argument or the standard ones:
'store', 'store_true', 'store_false', 'append' and so on.
"""
if keyword_flags is None:
keyword_flags = ['-keywords', '-k']
if not arghelp:
arghelp = ('Jaguar keywords to include in calculations. Can be given '
'multiple keywords, ex. -k dftname=b3lyp basis=midix... If '
'this is the last flag before the input/output file names, '
'use -- to terminate the list of keywords.')
parser.add_argument(
*keyword_flags,
default=default,
action=action,
metavar='KEYWORD=VALUE KEYWORD=VALUE',
nargs='+',
help=arghelp)
def parse_keyword_list(options, exit_on_error=True, logger=None,
validate=False):
"""
Adds keystring and keydict properties to the argparse options object that
are the list of keyword arguments turned into, respectively, a string or a
dictionary. Valid keyword syntax is checked for.
Typical usage:
jobutils.add_keyword_parser_argument(parser)
options = parser.parse_args(args)
jobutils.parse_keyword_list(options)
print(options.keystring, options.keydict)
:type options: `argparse.Namespace`
:param options: The parser Namespace object with a keywords attribute
:type exit_on_error: bool
:param exit_on_error: True if the ValueError for keyword syntax should
result in a message being printed and sys.exit() being called, False if the
error should just be raised.
:type logger: `logging.Logger`
:param logger: The logger to use for recording error messages, otherwise
they'll be printed.
:type validate: bool
:param validate: Whether keywords/values not recognized by Jaguar should be
considered an error.
:raises ValueError: If an invalid keyword pair syntax is detected and
exit_on_error is False
:raises `schrodinger.application.jaguar.validation.JaguarKeywordException`:
if validate=True, exit_on_error=False and an invalid Jaguar keyword/value
has been used.
"""
if options.keywords:
options.keystring = ' '.join(options.keywords)
else:
options.keystring = ""
options.keystring = options.keystring.lower()
try:
options.keydict = jaguarworkflows.keyword_string_to_dict(
options.keystring)
except ValueError as msg:
if exit_on_error:
err_msg = 'Keyword parsing failed - stopping:\n%s' % str(msg)
textlogger.log(logger, err_msg)
sys.exit(1)
else:
raise
if validate:
# MATSCI-5669
import warnings
msg = ('Jaguar keyword validation is no longer performed')
warnings.warn(msg, DeprecationWarning, stacklevel=2)
def add_save_desmond_files_parser_argument(parser):
"""
Add a `SAVE_FLAG` argument to the parser
:type parser: `argparse.ArgumentParser`
:param parser: The parser to add this argument to
"""
parser.add_argument(
SAVE_FLAG,
choices=SAVE_FLAG_OPTS,
default=SAVE_NONE,
help="Specify whether intermediate CMS or trajectory "
"files be included with the job output. Choices "
"are {0} (save no files), {1} (save .cms files) and {2} "
"(save .cms and trajectory files). Default is "
"{3}.".format(SAVE_NONE, SAVE_CMS, SAVE_TRJ, SAVE_NONE))
def add_compress_subjobs_parser_argument(parser, help=None):
"""
Add a `FLAG_COMPRESS_SUBJOBS` argument to the parser
:param `argparse.ArgumentParser` parser: The parser to add this argument to
:param str help: The help message
"""
if help is None:
help = 'Compress files from successful subjobs'
parser.add_argument(FLAG_COMPRESS_SUBJOBS, action='store_true', help=help)
def add_desmond_parser_arguments(parser, args, defaults=None):
"""
Add desmond related arguments to the parser
:type parser: `argparse.ArgumentParser`
:param parser: The parser to add this argument to
:type args: list
:param args: List of arguments to add to the parser
:type defaults: dict or None
:param defaults: Default values for the arguments
"""
if defaults is None:
defaults = MD_FLAGS_DEFAULTS
else:
user_defaults = defaults
defaults = MD_FLAGS_DEFAULTS.copy()
defaults.update(user_defaults)
for arg in args:
if arg == SPLIT_COMPONENTS_FLAG:
parser.add_argument(
SPLIT_COMPONENTS_FLAG,
action='store_true',
help='Split the system into components when the Desmond system is '
'built. This can speed up the force field assignment for systems '
'that contain multiple identical molecules.')
elif arg == FLAG_MD_TEMP:
parser.add_argument(
FLAG_MD_TEMP,
default=defaults[FLAG_MD_TEMP],
type=parserutils.type_positive_float,
help='Temperature (in K) of the simulations.')
elif arg == FLAG_MD_PRESS:
parser.add_argument(
FLAG_MD_PRESS,
default=defaults[FLAG_MD_PRESS],
type=parserutils.type_positive_float,
help='Pressure (in bar) of the simulations.')
elif arg == FLAG_MD_TIME:
parser.add_argument(
FLAG_MD_TIME,
default=defaults[FLAG_MD_TIME],
type=parserutils.type_positive_float,
help='MD time (in ps) of the simulations.')
elif arg == FLAG_MD_TIMESTEP:
parser.add_argument(
FLAG_MD_TIMESTEP,
default=defaults[FLAG_MD_TIMESTEP],
type=parserutils.type_positive_float,
help='MD time step (in fs) of the simulations.')
elif arg == FLAG_MD_TRJ_INT:
parser.add_argument(
FLAG_MD_TRJ_INT,
default=defaults[FLAG_MD_TRJ_INT],
type=parserutils.type_positive_float,
help='MD trajectory recording interval (in ps).')
elif arg == FLAG_MD_ENEGRP_INT:
parser.add_argument(
FLAG_MD_ENEGRP_INT,
default=defaults[FLAG_MD_ENEGRP_INT],
type=parserutils.type_positive_float,
help='MD energy group recording interval (in ps).')
elif arg == FLAG_MD_ENESEQ_INT:
parser.add_argument(
FLAG_MD_ENESEQ_INT,
default=defaults[FLAG_MD_ENESEQ_INT],
type=parserutils.type_positive_float,
help='MD energy recording interval (in ps).')
elif arg == FLAG_MD_ENSEMBLE:
parser.add_argument(
FLAG_MD_ENSEMBLE,
default=defaults[FLAG_MD_ENSEMBLE],
choices=desmondutils.ENSEMBLES,
metavar='ENSEMBLE',
help='Desmond ensemble. Known values are: %s' % ', '.join(
desmondutils.ENSEMBLES))
elif arg == FLAG_MD_ISOTROPY:
parser.add_argument(
FLAG_MD_ISOTROPY,
default=defaults[FLAG_MD_ISOTROPY],
choices=dconst.IsotropyPolicy,
metavar='ISOTROPY_POLICY',
help='Desmond barostat isotropy policy. Known values are: '
'%s' % ', '.join(dconst.IsotropyPolicy))
elif arg == FLAG_GPU:
# MATSCI-9064: remove CPU Desmond support
parser.add_argument(
FLAG_GPU,
action='store_true',
default=True,
help=argparse.SUPPRESS)
elif arg == WATER_FF_TYPE_FLAG:
parser.add_argument(
WATER_FF_TYPE_FLAG,
action='store',
metavar='WATER_TYPE',
default=defaults[WATER_FF_TYPE_FLAG],
choices=desmondutils.WATER_FFTYPES.keys(),
help='Force field type for water molecules. Must be used '
f'with {SPLIT_COMPONENTS_FLAG}. If using "{desmondutils.NONE}"'
', the Residue Name atom property is used instead.')
elif arg == FLAG_RANDOM_SEED:
add_random_seed_parser_argument(parser)
elif arg == SAVE_FLAG:
add_save_desmond_files_parser_argument(parser)
elif arg == FLAG_FORCEFIELD:
parser.add_argument(
FLAG_FORCEFIELD,
default=defaults[FLAG_FORCEFIELD],
type=parserutils.type_forcefield,
metavar='FORCE_FIELD',
help='Force field to use. %s' %
parserutils.valid_forcefield_info())
elif arg == FLAG_MONOMER_CHARGE:
parser.add_argument(
FLAG_MONOMER_CHARGE,
action='store_true',
help=
'Use forcefield charges computed for the monomer for the polymer. '
f'This option is particularly useful for {mm.OPLS_NAME_F16}, which '
f'uses {mm.OPLS_NAME_F14} charges by default for molecules larger '
'than 100 heavy atoms.')
elif arg == FLAG_COMBINE_TRJ:
parser.add_argument(
FLAG_COMBINE_TRJ,
action='store_true',
help='Merge all trajectories into a single one.')
else:
raise TypeError('Unrecognized parser argument: ' + str(arg))
def add_traj_analysis_parser_arguments(parser):
"""
Add trajectory analysis parser arguments.
:type parser: `argparse.ArgumentParser`
:param parser: The parser to add this argument to
"""
parser.add_argument(
parserutils.FLAG_CMS_PATH,
help='Input cms file',
required=True,
type=parserutils.type_cms_file)
parser.add_argument(
parserutils.FLAG_TRJ_PATH,
required=True,
help='Directory of the trajectory')
parser.add_argument(
parserutils.FLAG_TRJ_MIN,
help='Minimum trajectory frame index to be analyzed.',
type=parserutils.type_nonnegative_int)
parser.add_argument(
parserutils.FLAG_TRJ_MAX,
help='Maximum trajectory frame index to be analyzed.',
type=parserutils.type_nonnegative_int)
def add_random_seed_parser_argument(parser,
additional_args=(),
help=DEFAULT_SEED_HELP):
"""
Add the command line flag for the random number seed
:type parser: `argparse.ArgumentParser`
:param parser: The parser to add this argument to
:param tuple additional_args: Arguments that should also be accepted
in addition to FLAG_RANDOM_SEED
:param str help: The help string to show for random seed. The default
string will be used if nothing is supplied.
"""
args = [FLAG_RANDOM_SEED]
args.extend(additional_args)
parser.add_argument(
*args,
type=parserutils.type_random_seed,
default=parserutils.RANDOM_SEED_DEFAULT,
help=help)
def seed_random_number_generator(options, log=None):
"""
Seed the random number generator based on the command line options. If there
is no seed in the command line options, a random value is used.
:type options: `argparse.Namespace`
:param options: The command line options from argparse. Note that passing in
None for options will have the same effect as if the seed flag does not
exist on options (i.e. a random value will be generated).
:type log: function
:param log: A function to log the seed value. Should take a single str
argument.
:rtype: int
:return: The seed used for the generator
"""
seed = get_option(options, FLAG_RANDOM_SEED)
if seed is None:
seed = random.randint(parserutils.RANDOM_SEED_MIN,
parserutils.RANDOM_SEED_MAX)
random.seed(seed)
if log:
log(f'Random number generator seeded with {seed}')
return seed
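# A minimal sketch, assuming options came from a parser configured with
# add_random_seed_parser_argument (parser and logger are hypothetical):
#
#     options = parser.parse_args(argv[1:])
#     seed = seed_random_number_generator(options, log=logger.info)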
def check_license(panel=None,
token=license.MATERIALSCIENCE_MAIN,
name="",
as_validator=False):
"""
Check if a valid token exists. If called from Maestro, also check out and
hold a MATERIALSCIENCE_MAIN token.
:type panel: schrodinger.ui.qt.appframework.AppFramework
:param panel: panel to use to put up an error dialog if no license
:type token: `schrodinger.utils.license` constant
:param token: A token type from the schrodinger.utils.license module, such
as MATERIALSCIENCE_MAIN or MATERIALSCIENCE_GA
:type name: str
:param name: The user-identifiable name for the token - used for error
messages. If not provided, the string used in the license module for this
token (if one exists) will be used.
:type as_validator: bool
:param as_validator: If True, this function will work as an AF2 validation
method. Instead of posting a dialog or printing a message for a failed
license check, it will return (False, error_message).
:rtype: bool or (bool, str)
:return: True if valid license exists. If no valid license exists, False
will be returned by default, but (False, msg) will be returned if
as_validator=True. Note that (False, msg) evaluates to True, so it must be
handled by the calling routine as a tuple rather than a boolean if
as_validator=True.
"""
global CHECKED_OUT_MATSCI_MAIN
if not name:
try:
name = license.LICENSE_NAMES[token]
except KeyError:
pass
msg = ''
# Check out and hold a MATERIALSCIENCE_MAIN license if calling from Maestro
if schrodinger.get_maestro():
if not CHECKED_OUT_MATSCI_MAIN or not CHECKED_OUT_MATSCI_MAIN.isValid():
# Tampering with licensing is a violation of the license agreement
CHECKED_OUT_MATSCI_MAIN = license.License(
license.MATERIALSCIENCE_MAIN)
if not CHECKED_OUT_MATSCI_MAIN.isValid():
msg = ('There are no remaining MATERIALSCIENCE_MAIN license '
'tokens. Materials Science features cannot be used in '
'Maestro.')
# Tampering with licensing is a violation of the license agreement
if not msg and not license.is_license_available(token):
msg = (f'No {name} license token is available. '
f'No calculation can be run.')
if msg:
if as_validator:
return (False, msg)
elif panel:
panel.error(msg)
else:
print(msg)
return False
return True
def create_run_dir(panel, jobname):
"""
Create a subdirectory to run a job in, asking the user and removing existing
directories if needed.
:type panel: schrodinger.ui.qt.appframework.AppFramework
:param panel: panel to use to ask the user about removing an existing
directory
:type jobname: str
:param jobname: The name of the job. The directory will be jobname + _dir
:rtype: str or None
:return: The path to the directory or None if an existing directory was
found and the user elected not to remove it
"""
outdir = os.path.join(os.getcwd(), jobname + '_dir')
if os.path.exists(outdir):
if not panel.question('The job directory, %s, already exists.\nWould'
' you like to delete its contents and '
'continue?' % os.path.basename(outdir)):
return None
def force_remove(func, path, excinfo):
# Try to remove any file that resists removal
if func in (os.rmdir, os.remove):
fileutils.force_remove(path)
else:
raise
shutil.rmtree(outdir, onerror=force_remove)
os.mkdir(outdir)
return outdir
def string_to_value(string):
"""
Change a text string from a file to a value. Converts string values of
special Python tokens such as True, False or None to the Python tokens.
Converts numbers to int or float if possible.
:type string: str
:param string: The string to convert
:return: string converted to, in order of preference: [True|False|None],
int, float, or input type
"""
literals = {'True': True, 'False': False, 'None': None}
if string in literals:
# Special words
value = literals[string]
else:
# Try to convert to a number if possible
try:
value = int(string)
except ValueError:
try:
value = float(string)
except ValueError:
value = string
return value
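# For example:
#
#     string_to_value('True')  # -> True
#     string_to_value('42')    # -> 42 (int)
#     string_to_value('4.2')   # -> 4.2 (float)
#     string_to_value('spam')  # -> 'spam' (unchanged)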
@contextlib.contextmanager
def working_directory(path):
"""
A context manager which changes the working directory to the given
path, and then changes it back to its previous value on exit.
"""
prev_cwd = os.getcwd()
os.chdir(path)
try:
yield
finally:
os.chdir(prev_cwd)
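# For example, to create files inside a subdirectory and return to the
# original directory afterwards (directory name is hypothetical):
#
#     with working_directory('subjob_1'):
#         ...  # relative paths here resolve inside subjob_1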
class StringCleaner(object):
"""
Manages the cleaning of strings.
"""
def __init__(self, extra_replacement_pairs=None, separator='-'):
"""
Populate an instance with some defaults. The replacement
dictionary needs to be set such that the most specific
replacements occur last. This is because the replacements
should be done in a certain order, for example ('C:\\', '')
should be done before (':', '') and ('\\', ''), and because
people tend to append to an iterable rather than prepend we
will traverse the iterable backwards.
:type extra_replacement_pairs: list of tuples
:param extra_replacement_pairs: each tuple in this list contains
a single replacement pair, i.e. a single substring to be replaced
and a single substring to replace it.
:type separator: str
:param separator: in the case of non-unique strings this is the
string that separates the non-unique part from the number of times
used, which is the unique part.
"""
def pair(from_str, to_str=''):
return (from_str, to_str)
BASE_PAIRS = [
pair('\\'),
pair('/'),
pair(r'\\'),
pair('?'),
pair('%'),
pair('*'),
pair(':'),
pair('|'),
pair('"'),
pair('>'),
pair('<'),
pair('('),
pair(')'),
pair('+'),
pair(',')
]
COMBIGLD_PAIRS = [
pair(' ', '_'),
pair(';'),
pair(']'),
pair('['),
pair('][', '-'),
pair('[]')
]
ALL_PAIRS = BASE_PAIRS + COMBIGLD_PAIRS
if extra_replacement_pairs:
ALL_PAIRS += extra_replacement_pairs
self.replacement_dict = OrderedDict(ALL_PAIRS)
self.separator = separator
self.prev_names = {}
def cleanAndUniquify(self,
input_str,
clear_prev=False,
max_len=CLEAN_AND_UNIQUIFY_MAX_LEN):
"""
Shorten if necessary, replace certain characters in an input string
and then uniquify the string by comparing with a dictionary of
previous names and number of times used.
:type input_str: str
:param input_str: the input string we want cleaned and uniquified
:type clear_prev: bool
:param clear_prev: specify if the dictionary of previous names
should first be cleared
:type max_len: int
:param max_len: maximum length of the input_str allowed, otherwise
it will be shortened to the max_len value
:rtype: str
:return: the input string now cleaned and uniquified
"""
# Shorten string if necessary
output_str = input_str[:max_len]
for from_substr in list(self.replacement_dict)[::-1]:
to_substr = self.replacement_dict[from_substr]
output_str = output_str.replace(from_substr, to_substr)
if clear_prev:
self.prev_names.clear()
if not self.prev_names.get(output_str):
self.prev_names[output_str] = 1
else:
self.prev_names[output_str] += 1
output_str += self.separator + str(self.prev_names[output_str])
return output_str
def clean_string(string, default='title'):
"""
Cleans the given string by removing special characters to make it
acceptable for a file name. If the string is blank, it will be replaced by
the value of default.
:type string: str
:param string: The string to clean.
:type default: str
:param default: The name to use if string is blank
:rtype: str
:return: A string usable as a filename
"""
cleaner = StringCleaner()
unclean = string or default
return cleaner.cleanAndUniquify(unclean)
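# For example (each call uses a fresh StringCleaner, so no uniquifying
# suffix is added here):
#
#     clean_string('my:file*name?')  # -> 'myfilename'
#     clean_string('')               # -> 'title' (the default)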
def zip_and_set_incorporation(zipname, filelist):
"""
Zip up all the requested files and set the resulting archive as the job
control backend structure output file (if running under job control).
:type zipname: str
:param zipname: The name of the archive to create
:type filelist: list
:param filelist: Each item in filelist is the name of a file to add to the
archive zipname
"""
with zipfile.ZipFile(zipname, 'w') as zipper:
    for filename in filelist:
        zipper.write(filename)
backend = jobcontrol.get_backend()
if backend:
backend.addOutputFile(zipname)
backend.setStructureOutputFile(zipname)
def escape_dollars_for_cmdline(pattern):
r"""
Make sure $ characters are escaped in the given pattern for command lines
:type pattern: str
:param pattern: A string to modify
:rtype: str
:return: pattern with a \ in front of $
"""
return re.sub(DOLLARS, r'\1\\$', pattern)
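# For example:
#
#     escape_dollars_for_cmdline('run$JOB')    # -> 'run\\$JOB' (i.e. run\$JOB)
#     escape_dollars_for_cmdline('run\\$JOB')  # already escaped; unchanged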
class CellRunInfo(object):
"""
Holds the information for the run for a single cell
"""
def __init__(self,
structs,
basename,
replicate,
multiple_cells,
component=None,
repeat_unit_info=None):
"""
Create a CellRunInfo object
:type structs: list
:param structs: The list of structures to include in the cell
:type basename: str
:param basename: The generic basename for job files. This will be
modified based on the value of replicate, component and multiple_cells
to form the base name for files for this specific cell.
:type replicate: int
:param replicate: Which replicate this is for - 1-based
:type multiple_cells: bool
:param multiple_cells: Whether there will be multiple replicates of this
cell
:type component: int or None
:param component: The structure number this cell is for, or None if this
is a mixed structure cell
:type repeat_unit_info: tuple or None
:param repeat_unit_info: A two-item tuple. The first item is the sequence
of monomer one-letter codes that gives the repeat unit sequence. The second
item is a tag to be added to the polymer name for that sequence (used for
enumerated sequences)
"""
self.structs = structs
self.basename = basename
self.replicate = replicate
self.cru = None
if component:
self.basename = self.basename + '_component%d' % component
else:
self.basename = self.basename + '_all_components'
if repeat_unit_info and repeat_unit_info[1]:
self.cru = repeat_unit_info[0]
self.basename = self.basename + '_%s' % repeat_unit_info[1]
if multiple_cells:
self.basename = self.basename + '_%d' % replicate
self.is_cg = all(coarsegrain.is_coarse_grain(x) for x in structs)
class MultijobDriver(object):
"""
Resubmit the driver as subjobs
"""
COMPOSITION_FLAG = '-composition'
COMPOSITION_FLAG_SHORT = '-c'
def __init__(self,
runlist,
options,
args,
log,
remote_script,
default_jobname,
basename=None):
"""
Create multiple cells in parallel running a subjob for each cell. Zip up the
resulting cms files into a jobname.zip file and set it as the structure
output file to be incorporated.
:type runlist: list of `CellRunInfo`
:param runlist: Each item of runlist will generate a subjob and a single
cell
:type options: `argparse.Namespace`
:param options: The command line options
:type args: iterable
:param args: The command line arguments as passed in by sys
:type log: function
:param log: function(msg) writes msg to log file
:type remote_script: string
:param remote_script: the dir and name for driver to resubmit
:type default_jobname: string
:param default_jobname: Default job name
:type basename: str or None
:param basename: The generic basename defined from inputfile.
"""
if hasattr(options, 'output_basename'):
basename = options.output_basename
elif not basename:
basename = default_jobname
self.multijob_driver(runlist, options, args, basename, log,
remote_script)
def remove_flag(self, args, flag, and_value=False):
"""
Remove a flag from the command line flags
:type args: list
:param args: The list of command line arguments
:type flag: str
:param flag: The flag to remove
:type and_value: bool
:param and_value: Also remove the value associated with the flag - it is
assumed that this is the following list item
"""
try:
index = args.index(flag)
except ValueError:
# Flag was not used, so we don't need to do anything
return
if and_value:
del args[index:index + 2]
else:
del args[index]
def replace_value(self, args, old_value, new_value):
"""
Replace the list item with value=old_value with the new value
:type args: list
:param args: The list of command line arguments
:type old_value: str
:param old_value: The value to replace
:type new_value: str
:param new_value: The value to replace old_value with
"""
try:
index = args.index(old_value)
except ValueError:
return
args[index] = new_value
def multijob_driver(self, runlist, options, args, basename, log,
remote_script):
"""
Create multiple cells in parallel running a subjob for each cell. Zip
up the resulting cms files into a jobname.zip file and set it as the
structure output file to be incorporated.
:type runlist: list of `CellRunInfo`
:param runlist: Each item of runlist will generate a subjob and a single
cell
:type options: `argparse.Namespace`
:param options: The command line options
:type args: iterable
:param args: The command line arguments as passed in by sys
:type basename: str
:param basename: The base name used for the zipped .zip or .maegz output
file for all job files.
:type log: function
:param log: function(msg) writes msg to log file
:type remote_script: string
:param remote_script: the dir and name for driver to resubmit
"""
log('Setting up job queue')
sub_args = list(args)
jobdj = queue.JobDJ(
verbosity='normal', max_failures=queue.NOLIMIT, max_retries=3)
for host, procs in jobdj._hosts.items():
log('Host: %s, processors:%d' % (host, procs))
# Remove flags we don't want for subjobs
for flag in ['-homogeneous', '-no_disordered_cell']:
self.remove_flag(sub_args, flag)
self.remove_flag(sub_args, '-ncells', and_value=True)
# homogeneous cells should not have composition flags as they only have
# one component
homo_args = sub_args[:]
self.remove_flag(homo_args, self.COMPOSITION_FLAG, and_value=True)
# User might have used the short form (-c)
self.remove_flag(homo_args, self.COMPOSITION_FLAG_SHORT, and_value=True)
# Create a subjob for each cell to create
for runinfo in runlist:
if len(runinfo.structs) == 1:
cmdargs = homo_args[:]
else:
cmdargs = sub_args[:]
if options.seed:
seed = options.seed * runinfo.replicate
self.replace_value(cmdargs, str(options.seed), str(seed))
if runinfo.cru:
self.replace_value(cmdargs, options.repeat_unit, runinfo.cru)
# Set up the input and output file names
if hasattr(options, 'output_basename'):
self.replace_value(cmdargs, options.output_basename,
runinfo.basename)
if hasattr(options, 'title'):
self.replace_value(cmdargs, options.title, runinfo.basename)
subinput = runinfo.basename + '.maegz'
fileutils.force_remove(subinput)
for struct in runinfo.structs:
struct.append(subinput)
self.replace_value(cmdargs, options.input_file, subinput)
cmd = ['run', remote_script] + cmdargs
rjob = jaguarworkflows.RobustSubmissionJob(cmd)
jobdj.addJob(rjob)
log('Number of jobs to run: %d' % jobdj.total_added)
# Run all jobs and make sure we grab their output
expect_exts = ['.maegz' if options.no_system else '.cms']
st_files = run_jobdj_and_add_files(jobdj, log, expect_exts=expect_exts)
backend = jobcontrol.get_backend()
if options.no_system:
# Read all the output structures and compile them into one file
outname = basename + AMCELL_NO_SYSTEM_OUT
writer = structure.StructureWriter(outname)
struct_titles = {}
for struct in structure.MultiFileStructureReader(st_files):
# same component of different structures needs different titles
# in PT
if struct.title not in struct_titles:
struct_titles[struct.title] = 0
else:
struct_titles[struct.title] += 1
struct.title = struct.title + '_' + str(
struct_titles[struct.title])
writer.append(struct)
writer.close()
if backend:
backend.addOutputFile(outname)
backend.setStructureOutputFile(outname)
else:
# Desmond CMS files need to be incorporated into a zip file.
# Zip up all the cms files and incorporate them all
if backend:
jobname = backend.getJob().Name
else:
jobname = basename
outname = jobname + '.zip'
zip_and_set_incorporation(outname, st_files)
log('Output compiled in %s' % outname)
def get_jobname(default_jobname):
"""
Return the jobname from the backend, the command line (-JOBNAME flag or
environment), or default_jobname, in that order of preference.
:type default_jobname: str
:param default_jobname: default_jobname of the current module
:rtype: string
:return: Jobname
"""
assert default_jobname
return jobcontrol.get_jobname(default_jobname) or default_jobname
def get_procs():
"""
Get number of processors from backend or command-line arguments.
:rtype: int
:return: Number of processors
"""
backend = jobcontrol.get_backend()
if backend:
if jobcontrol.get_backend_host_list():
return jobcontrol.get_backend_host_list()[0][1] or 1
else:
return 1
else:
if jobcontrol.get_command_line_host_list():
return jobcontrol.get_command_line_host_list()[0][1] or 1
else:
return 1
def memory_usage_psutil():
"""
Return the memory usage in MB
:rtype: float
:return: memory usage in MB
"""
process = psutil.Process(os.getpid())
mem = process.memory_info()[0] / float(2**20)
return mem
def get_size_of(an_object, size_unit='megabytes'):
"""
Return the size of an object in size_unit. The object can be any type of
object. All built-in objects will return correct results, but this does not
have to hold true for third-party extensions as it is implementation specific.
:param an_object: The object whose size should be measured
:type an_object: any type of python object
:param size_unit: One of 'bytes', 'kilobytes', 'megabytes' or 'gigabytes'
:type size_unit: str
:return: the size of an_object in size_unit
:rtype: float
"""
unit_to_power = {'bytes': 0, 'kilobytes': 1, 'megabytes': 2, 'gigabytes': 3}
return sys.getsizeof(an_object) / 1024**unit_to_power[size_unit]
def get_jobhosts():
"""
Return the job hosts from backend or command line.
:rtype: None or list of tuple
:return: the hosts or None
"""
hosts = jobcontrol.get_backend_host_list()
if not hosts:
hosts = jobcontrol.get_command_line_host_list()
return hosts
def get_jobhost():
"""
Return the first job host from backend or command line.
:rtype: list or None
:return: the first host or None
"""
hosts = get_jobhosts()
if not hosts:
return
host = hosts[0]
if host[1] is not None:
return list(host)
return [host[0], 1]
def get_jobhost_name():
"""
Return the job host name from backend or command line.
:rtype: str or None
:return: the host name
"""
hosts = get_jobhost()
return hosts[0] if hosts else None
def is_hostname_known(hostname):
"""
Check whether hostname is defined in the host file.
:type hostname: str
:param hostname: the hostname to check against
:rtype: bool
:return: True, if the hostname is defined in the host file
"""
hosts_list = jobcontrol.get_hosts()
host_names = {h.name for h in hosts_list}
return hostname in host_names
def is_jobhost_gpu_available():
"""
Check whether a GPU is available on the host. First check SUBHOST, if
defined. Then check HOST, if defined. Finally, check localhost.
:rtype: bool
:return: True if a GPU is available
"""
hostname = get_jobhost_name()
if hostname in [None, 'localhost']:
# The calculation runs locally
return gpgpu.is_any_gpu_available()
# This checks the gpu availability for SUBHOST, if defined.
# This checks the gpu availability for HOST, if SUBHOST not defined
# and HOST defined.
host = jobcontrol.get_host(hostname)
# Define the GPU availability based on gpgpu (SUPPORT-128375)
return bool(host.gpgpu)
def add_zipfile_to_backend(adir):
"""
Add a zip file of the given directory to the job backend.
:type adir: str
:param adir: the directory
"""
zip_file = f'{adir}.zip'
zip_directory(adir, fileobj=zip_file)
add_outfile_to_backend(zip_file)
def get_backend_first_host():
"""
Get backend first host.
:rtype: str, int
:return: host, number of processors
"""
assert jobcontrol.get_backend()
hostlist = jobcontrol.get_backend_host_list()
if hostlist:
return hostlist[0]
else:
return ('localhost', 1)
def write_cms_with_wam(cms_model, filename, wam_type):
"""
Write the cms model to a file with the provided WAM property
:param `cms.Cms` cms_model: The cms model to write to file
:param str filename: The cms path
:param int wam_type: One of the enums defined in workflow_action_menu.h
"""
with wam.WorkflowMenuMaestroWriter(filename, wam_type) as writer:
writer.append(cms_model._raw_fsys_ct)
for ct in cms_model.comp_ct:
ct.append(filename, "CMS")
def add_wam_to_cms(filename, wam_type):
"""
Rewrite the cms with the WAM type added
:param str filename: The cms path
:param int wam_type: One of the enums defined in workflow_action_menu.h
"""
cms_model = cms.Cms(filename)
write_cms_with_wam(cms_model, filename, wam_type)
def write_mae_with_wam(struct, filename, wam_type=None):
"""
Write the structure to a Maestro file with the provided WAM property
:param `structure.Structure` struct: The structure to write to file
:param str filename: The mae path
:param int wam_type: One of the enums defined in workflow_action_menu.h
:raise ValueError: If the file path is not for a Maestro file
"""
if fileutils.get_structure_file_format(filename) != structure.MAESTRO:
raise ValueError(f"{filename} is not a Maestro file path.")
with wam.WorkflowMenuMaestroWriter(filename, wam_type) as writer:
writer.append(struct)
def set_structure_wam(struct, wam_type):
"""
Sets the WAM property for the passed structure
:param `structure.Structure` struct: The structure to set the WAM for
:param int wam_type: One of the enums defined in workflow_action_menu.h
"""
wam.set_workflow_action_menu(struct, wam_type)
def set_smart_distribution_from_environ(jobq):
"""
Set smart distribution of the queue to on/off based on the environment
variable.
:type jobq: `schrodinger.job.queue.JobDJ`
:param jobq: The JobDJ object
"""
enable_sd = os.environ.get('SCHRODINGER_MATSCI_REUSE_DRIVER_HOST')
if enable_sd is None:
return
try:
state = msutils.setting_to_bool(str(enable_sd))
except ValueError: # Invalid string
pass
else:
jobq.smart_distribution = state
def add_tpp_parser_argument(parser):
"""
Add a `FLAG_TPP` argument to the parser
:type parser: `argparse.ArgumentParser`
:param parser: The parser to add this argument to
"""
parser.add_argument(
FLAG_TPP,
type=parserutils.type_positive_int,
default=DEFAULT_TPP,
help='Specify the number '
'of threads to use for parallelizing each subjob that supports '
'threading.')