"""
Equilibrium molecular dynamics classes/function shared by transport
property calculations using Einstein and Green-Kubo methods.
Copyright Schrodinger, LLC. All rights reserved.
"""
# Contributor: Teng Zhang
import argparse
import glob
import itertools
import math
import os
import numpy
import random
import copy
import shutil
from collections.abc import Iterable
from scipy import constants as scipy_const
from types import SimpleNamespace
from schrodinger.application.desmond import cms
from schrodinger.application.matsci import desmondutils
from schrodinger.application.matsci import jobutils
from schrodinger.application.matsci import parserutils
from schrodinger.application.matsci.nano import xtal
from schrodinger.application.matsci import jaguarworkflows
from schrodinger.application.matsci import textlogger
from schrodinger.job import queue as jobdj
from schrodinger.job import jobcontrol
from schrodinger.utils import fileutils
PROGRAM_NAME_DIFFUSION = 'Diffusion Coefficient'
DEFAULT_JOBNAME_DIFFUSION = 'diffusion_coefficient'
PROGRAM_NAME_VISCOSITY = 'Viscosity'
DEFAULT_JOBNAME_VISCOSITY = 'viscosity'
CUBIC_SPLINE = 'cubic spline'
CUMTRAPZ = 'cumtrapz'
PICOSECOND = 1.e-12
FEMTO2PICO = 0.001
NANO2PICO = 1000.0
BAR2PASCAL = 100000
PAS2CP = 1000.
TAU_START = 'Tau_Start_(ns)'
TAU_END = 'Tau_End_(ns)'
DIFFUSIVITY_UNIT = "m^2/s"
DIFFUSION_UNIT_CONVERT = scipy_const.angstrom**2 / PICOSECOND
MATSCI_DIFFUSION_COEFFICIENT = 'r_matsci_Diffusion_Coefficient'
MATSCI_DIFFUSION_STR_BASE = 's_matsci_Diffusion_Coefficient'
DIFFUSIVITY_TAU_START_PROP = MATSCI_DIFFUSION_COEFFICIENT + f'_%s_{TAU_START}'
DIFFUSIVITY_TAU_END_PROP = MATSCI_DIFFUSION_COEFFICIENT + f'_%s_{TAU_END}'
DIFFUSIVITY_PROP = MATSCI_DIFFUSION_COEFFICIENT + f'_%s_({DIFFUSIVITY_UNIT})'
DIFFUSIVITY_1D_PROP = MATSCI_DIFFUSION_COEFFICIENT + f'_1D_%s_({DIFFUSIVITY_UNIT})'
DIFFUSIVITY_2D_PROP = MATSCI_DIFFUSION_COEFFICIENT + f'_2D_%s_({DIFFUSIVITY_UNIT})'
DIFFUSIVITY_3D_PROP = MATSCI_DIFFUSION_COEFFICIENT + f'_3D_%s_({DIFFUSIVITY_UNIT})'
DIFFUSIVITY_X_PROP = MATSCI_DIFFUSION_COEFFICIENT + f'_X_%s_({DIFFUSIVITY_UNIT})'
DIFFUSIVITY_Y_PROP = MATSCI_DIFFUSION_COEFFICIENT + f'_Y_%s_({DIFFUSIVITY_UNIT})'
DIFFUSIVITY_Z_PROP = MATSCI_DIFFUSION_COEFFICIENT + f'_Z_%s_({DIFFUSIVITY_UNIT})'
DIFFUSIVITY_R2_PROP = MATSCI_DIFFUSION_COEFFICIENT + '_%s_R2'
DIFFUSIVITY_1D_R2_PROP = MATSCI_DIFFUSION_COEFFICIENT + '_1D_%s_R2'
DIFFUSIVITY_2D_R2_PROP = MATSCI_DIFFUSION_COEFFICIENT + '_2D_%s_R2'
DIFFUSIVITY_3D_R2_PROP = MATSCI_DIFFUSION_COEFFICIENT + '_3D_%s_R2'
DIFFUSIVITY_X_R2_PROP = MATSCI_DIFFUSION_COEFFICIENT + '_X_%s_R2'
DIFFUSIVITY_Y_R2_PROP = MATSCI_DIFFUSION_COEFFICIENT + '_Y_%s_R2'
DIFFUSIVITY_Z_R2_PROP = MATSCI_DIFFUSION_COEFFICIENT + '_Z_%s_R2'
VISCOSITY_UNIT = 'cP'
VISCOSITY_UNIT_CONVERT = BAR2PASCAL**2 * PICOSECOND * scipy_const.angstrom**3 * PAS2CP
VISCOSITY_BASE_PROP = 'r_matsci_%s_Viscosity_%s_'
VISCOSITY_PROP = VISCOSITY_BASE_PROP + f'({VISCOSITY_UNIT})'
VISCOSITY_TAU_START_PROP = VISCOSITY_BASE_PROP + TAU_START
VISCOSITY_TAU_END_PROP = VISCOSITY_BASE_PROP + TAU_END
MDSIM = 'mdsim'
PLUGIN = 'plugin'
REMOVE_COM_MOTION = 'remove_com_motion'
FIRST = 'first'
START = 'start'
DOT = '_dot_'
TRAJECTORY = 'trajectory'
INTERVAL = 'interval'
OPTIONS = 'options'
TRAJECTORY_INTERVAL = DOT.join([TRAJECTORY, INTERVAL])
MDSIM_PLUGIN_REMOVE_COM_MOTION = f"{MDSIM}.{PLUGIN}.{REMOVE_COM_MOTION}"
MDSIM_PLUGIN_REMOVE_COM_MOTION_FIRST = f"{MDSIM_PLUGIN_REMOVE_COM_MOTION}.{FIRST}"
MDSIM_PLUGIN_REMOVE_COM_MOTION_INTERVAL = f"{MDSIM_PLUGIN_REMOVE_COM_MOTION}.{INTERVAL}"
INPUT_CMS = 'input_cms'
TIME = 'time'
TIMESTEP = 'timestep'
ENSEMBLE = 'ensemble'
TEMPERATURE = 'temperature'
PRESSURE = 'pressure'
RANDOM_SEED = 'random_seed'
PROCS = 'procs'
GPU = 'gpu'
MD_OPTIONS_TYPES = {
INPUT_CMS: str,
TIME: float,
TIMESTEP: float,
ENSEMBLE: str,
TEMPERATURE: float,
PRESSURE: float,
TRAJECTORY_INTERVAL: float,
RANDOM_SEED: float,
PROCS: int,
GPU: bool
}
ENERGY_GROUPS = 'energy_groups'
ENEGRP_OPTIONS_DEFAULTS = {INTERVAL: None, START: None, ENERGY_GROUPS: None}
NONE_TYPE = type(None)
ENEGRP_OPTIONS_TYPES = {
INTERVAL: (float, NONE_TYPE),
START: (float, NONE_TYPE),
ENERGY_GROUPS: (list, NONE_TYPE)
}
MSJ = 'msj'
ENE = 'ene'
MULTISIM_LOG = 'multisim_log'
OCMS = 'ocms'
LOG = 'log'
TRJ = 'trj'
CFG = 'cfg'
WRITE_VELOCITY = 'write_velocity'
ENEGRP = 'enegrp'
LAST = 'last'
VISCOSITY_FILE = 'viscosity_file'
AUTOCORRELATION_FILE = 'autocorrelation_file'
MSD_FILE = 'MSD_file'
ZIP_OUTPUTS = 'zip_outputs'
SAVE_OPTIONS_DEFAULTS = {
MSJ: True,
ENE: True,
MULTISIM_LOG: True,
LOG: True,
TRJ: False,
WRITE_VELOCITY: False,
OCMS: True,
VISCOSITY_FILE: False,
ZIP_OUTPUTS: False,
AUTOCORRELATION_FILE: False,
LAST: True,
CFG: True
}
TRJ_EXT = TRJ
INPUT_CMS = 'input_cms'
FLAG_ASL = '-asl'
FLAG_SMILES = '-smiles_pattern'
FLAG_ENSEMBLE = jobutils.FLAG_MD_ENSEMBLE
NVE = desmondutils.ENSEMBLE_NVE
NVT = desmondutils.ENSEMBLE_NVT
NPT = desmondutils.ENSEMBLE_NPT
ENSEMBLES = [NVE, NVT, NPT]
FLAG_MD_TEMP = jobutils.FLAG_MD_TEMP
FLAG_MD_PRESSURE = jobutils.FLAG_MD_PRESS
FLAG_MD_TIME = jobutils.FLAG_MD_TIME
FLAG_MD_TIMESTEP = jobutils.FLAG_MD_TIMESTEP
FLAG_MD_SAVE_TRJ = '-md_save_trj'
FLAG_RUN_TYPE = '-run_type'
FULL_RUN = 'full'
POST_RUN = 'post'
MULTI_RUN = 'multi'
FIT_ONLY = 'fit'
ALL_RUN_TYPE = [FULL_RUN, POST_RUN, MULTI_RUN, FIT_ONLY]
FLAG_BASE_FILENAME = '-base_filename'
FLAG_DATA_LAST_FRAC = '-data_last_frac'
FLAG_DATA_START = '-data_start'
FLAG_TRJ_FRAME_NUM = '-trj_frame_num'
FLAG_TAU_START = '-tau_start'
FLAG_TAU_END = '-tau_end'
FLAG_TRJ_FOLDER = '-trj_folder'
BASE_FILENAME = 'BASE_FILENAME'
FLAG_ENEGRP_DAT = '-enegrp_dat'
FLAG_ENEGRP_START = '-md_enegrp_start'
FLAG_MD_SAVE_ENEGRP = '-md_save_enegrp'
FLAG_GPU = jobutils.FLAG_GPU
FLAG_BLOCK_NUM = '-block_num'
FLAG_VARIATION = '-variation'
STD_FILE_EXT = '_block_std'
FLAG_MD_SIM_NUM = '-md_sim_num'
FLAG_INPUT_SEARCH = '-search_inputs'
MIN_TIME_NS = 0.01
MAX_TIMESTEP_FS = 100
MIN_TIMESTEP_FS = 0.01
MAX_TRJ_FRAME_START = 1000000
ARG_DEFAULTS = {
FLAG_MD_PRESSURE: 1.01325,
FLAG_MD_TEMP: 300.0,
FLAG_MD_TIME: 1.0,
FLAG_MD_TIMESTEP: 1.0,
jobutils.FLAG_MD_TRJ_INT: 1.0,
FLAG_DATA_START: 10,
FLAG_DATA_LAST_FRAC: 0.8,
FLAG_TAU_START: 0.01,
FLAG_TAU_END: 0.99,
FLAG_ASL: 'all',
FLAG_BLOCK_NUM: 1,
FLAG_VARIATION: 0.10,
jobutils.FLAG_MD_ENEGRP_INT: 0.01,
FLAG_ENEGRP_START: 0.0,
FLAG_MD_SIM_NUM: 10
}
DATA_LAST_PCT_RANGE = [0.001, 1.]
ALL_MD_FLAGS = [
FLAG_ENSEMBLE, FLAG_MD_TEMP, FLAG_MD_PRESSURE, FLAG_MD_TIME,
FLAG_MD_TIMESTEP, jobutils.FLAG_MD_TRJ_INT, FLAG_MD_SAVE_TRJ,
jobutils.FLAG_RANDOM_SEED
]
MAX_TIME_NS = 10000.
MIN_DATA_POINT_NUM = 50
# Stub left to avoid removing a public function from a public module
memory_usage_psutil = jobutils.memory_usage_psutil
[docs]def get_core_num():
"""
Return the number of the cores specified by users. None means not specified.
:return: the number of cores specified by users.
:rtype: int or None
"""
toplvl_host_args = os.environ.get('TOPLEVEL_HOST_ARGS')
try:
host_args = toplvl_host_args.split('-HOST')[1]
except (IndexError, AttributeError):
return None
try:
# Use all CPUs for one multisim job (see PANEL-7891 comment)
return int(host_args.split(":")[-1])
except (IndexError, ValueError):
# Users pass -HOST localhost instead of -HOST localhost:1
return None
[docs]def get_reserves_cores_state(options):
"""
Return the state of reserves cores so that some python standard library
functions can have access to all CPU resources.
:param check_runtype: If true, the FLAG_RUN_TYPE in the options is checked
:type check_runtype: bool
:return: True, if reserves_cores_state should be set as True to allow some
python function to utilize multiple CPUs.
:rtype: bool
"""
if jobutils.get_option(options, FLAG_RUN_TYPE) == FULL_RUN:
return False
# If core num is specified by users explicitly, reserves_cores of True allows
# users to access all CPUs in some python standard library functions
return bool(get_core_num())
[docs]def clearProps(cms_model, str_in_key='Diffusion'):
"""
Clear the diffusion related properties and should be called before new
calculations.
:param cms_model: A cms model to clear the properties
:type cms_model: 'schrodinger.application.desmond.cms.Cms'
"""
props = [x for x in cms_model.property.keys() if str_in_key in x]
for prop in props:
desmondutils.delete_cms_property(cms_model, prop)
[docs]def get_files(filename, ext):
"""
Get all the files with same extension, if ext is provided.
:type filename: str
:param filename: file to be located and returned
:type ext: str or None
:param ext: extension to search files of the same type
:rtype: list of str
:return: list of filenames with path
"""
if ext:
file_path = os.path.dirname(filename)
return glob.glob(os.path.join(file_path, '*' + ext))
else:
return [filename]
[docs]def flatten_list(list_to_flatten):
"""
Flatten [[a1, a2], [b1,]..] to [a1, a2, b1, ..].
:type list_to_flatten: list
:param list_to_flatten: [[a1, a2], [b1],...]; if [a1, a2, b1,...] is the
input list, a copy will be returned.
:rtype: list
:return: [a1, a2.., b1, b2.., ..]
"""
if list_to_flatten and isinstance(list_to_flatten[0], Iterable):
flat_list = []
for sublist in list_to_flatten:
flat_list += sublist
return flat_list
else:
return list_to_flatten[:]
[docs]def check_finite_molecules(struct, atom_ids=None):
"""
Check whether the molecules in structure are finite.
:param struct: the structure whose molecules will be checked
:type struct: `schrodinger.structure.Structure`
:param atom_ids: molecules sharing atoms with this set will be checked. If
None, all the molecules are checked.
:type atom_ids: set of int, list of int, list of list, or None
:return: message if any molecules are infinite
:rtype: string or None
"""
if atom_ids and not isinstance(atom_ids, set):
atom_ids = set(flatten_list(atom_ids))
for mol in struct.molecule:
atom_indices = mol.getAtomIndices()
shared_indexes = atom_indices if atom_ids is None else atom_ids.intersection(
atom_indices)
if shared_indexes:
if xtal.is_infinite2(struct, atom_indices=atom_indices):
example_ids = sorted(shared_indexes)[:3]
msg = (
f"The selected atoms {', '.join(map(str, example_ids))}... are "
f"in an infinite molecule.")
return msg
if atom_ids:
atom_ids.difference_update(shared_indexes)
if not atom_ids:
return
[docs]def get_md_results(basename,
options,
log,
log_error,
save_opts=None,
smart_distribution=True):
"""
From parsed options, setup and run md simulation.
:param basename: basename for md simulation job
:type basename: str
:param options: The parsed command line option
:type options: Named tuple
:param log: log information
:type log: function
:param log_error: log error information and exit
:type log_error: function
:param smart_distribution: turn on / off the smart_distribution
:type smart_distribution: bool
:rtype: str, str, str
:return: output cms, trj folder, and enegrp_dat
"""
host = jobutils.get_jobhost_name()
jdj = jobdj.JobDJ(
verbosity='normal', max_failures=jobdj.NOLIMIT, hosts=[(host, 1)])
if not smart_distribution:
jdj.disableSmartDistribution()
md_opts = {
INPUT_CMS: options.input_cms,
TIME: options.md_time,
TIMESTEP: options.md_timestep,
ENSEMBLE: options.md_ensemble,
TEMPERATURE: options.md_temp,
PRESSURE: options.md_press,
TRAJECTORY_INTERVAL: options.md_trj_int,
RANDOM_SEED: options.seed,
PROCS: 1,
GPU: True
}
md_opts = SimpleNamespace(**md_opts)
md_job_info = setup_md_job(basename, md_opts, save_opts=save_opts)
jdj.addJob(md_job_info.cmd)
log("Molecular dynamics simulation is running...", pad=True)
jdj.run()
if jdj.total_failed:
log_error("Molecular dynamics simulation failed.")
else:
log("Molecular dynamics simulation finished.")
return md_job_info.ocms_name, md_job_info.trj_folder, md_job_info.enegrp_dat
[docs]def setup_md_job(name_base,
md_opts,
enegrp_opts=None,
save_opts=None,
command_dir=None):
"""
Write the .msj file, form the command, figure out the output filenames, and
register files to backend for molecular dynamics simulations.
Note: if command_dir is not None, please call this method inside the command_dir
as the returned ocms_name doesn't have command_dir in the file path.
:param name_base: str
:type name_base: base name for the md job
:param md_opts: 'type.SimpleNamespace'
:type icms: this data struct has attributes: icms for input cms filename,
time for simulation time in ps, timestep for md timestep in ps,
ensemble for md ensemble (NVE, NVT, etc.), temperature for md temperature
in K, pressure for md pressure in bar, trajectory_dot_interval for time interval
in ps, random_seed for the velocity thermal randomization, procs for the
total number of processors, gpu for the request of GPU resource.
:param enegrp_opts: 'type.SimpleNamespace' or None
:type enegrp_opts: this data struct has attributes: energy_groups for the
list of keywords for desmond energy_groups, interval for output time
interval in ps, start for the output starting time in ps. If None, no
enegrp file is generated.
:param save_opts: 'type.SimpleNamespace' or None
:type save_opts: this data struct has boolean attributes to save files likes:
msj, ene, multisim_log, ocms, log, trj, cfg, write_velocity, enegrp, last,
autocorrelation_file, zip_outputs, viscosity_file. If None, use default
settings to decide the files to be copied back.
:param command_dir: str
:type command_dir: this is the path from the job start dir to the subjob dir.
The setup function is expected to run in the subjob setup stage, and thus
the subjob dir should be passed. If there is only one master job (no sub
dir), current dir is used. In addition, the returned file paths are the
the basename, and then users need to form the full file path when they
decide to run in subfolder or master folder directly.
:rtype: 'type.SimpleNamespace'
:return: data struct with cmd command to submit md, output cms, trj folder,
enegrp file, jobname
"""
# Validation of the input params
assert isinstance(md_opts, SimpleNamespace)
for md_key, md_value_type in MD_OPTIONS_TYPES.items():
if not hasattr(md_opts, md_key):
raise ValueError(f"md_opts doesn't have {md_key} as an attribute.")
md_value = getattr(md_opts, md_key)
isinstance(md_value, md_value_type)
if enegrp_opts is None:
if save_opts is None:
save_opts = SimpleNamespace()
setattr(save_opts, ENEGRP, False)
else:
assert isinstance(enegrp_opts, SimpleNamespace)
for enegrp_key, enegrp_value in ENEGRP_OPTIONS_DEFAULTS.items():
if not hasattr(enegrp_opts, enegrp_key):
setattr(enegrp_opts, enegrp_key, enegrp_value)
for enegrp_key, enegrp_value_type in ENEGRP_OPTIONS_TYPES.items():
enegrp_value = getattr(enegrp_opts, enegrp_key)
if not isinstance(enegrp_value, enegrp_value_type):
raise TypeError(
f"{enegrp_value} as {enegrp_key} is not of {enegrp_value_type}."
)
if enegrp_value is None:
# None means auto-setting: set it according to MD trj settings
trj_attr = DOT.join([TRAJECTORY, enegrp_key])
md_value = getattr(md_opts, trj_attr, None)
if md_value is not None:
setattr(enegrp_opts, enegrp_key, md_value)
if save_opts is not None:
assert isinstance(save_opts, SimpleNamespace)
# Additional unused attributes for certain driver can be added here.
for save_key, save_value in SAVE_OPTIONS_DEFAULTS.items():
if not hasattr(save_opts, save_key):
setattr(save_opts, save_key, save_value)
save_value = getattr(save_opts, save_key)
assert isinstance(save_value, bool)
# Filenames are based on jobnames
jobname = name_base + '_md'
msj_name = jobname + '.msj'
multisim_log_file = jobname + '_multisim.log'
ocms_name = jobname + '-out.cms'
enegrp_dat = jobname + '_enegrp.dat'
trj_folder = jobname + '_trj'
log_file = jobname + '.log'
ene_file = jobname + '.ene'
cfg_file = jobname + '-out.cfg'
backend = jobcontrol.get_backend()
if backend:
# Register output files
outfiles_status = {
msj_name: save_opts.msj,
multisim_log_file: save_opts.multisim_log,
ocms_name: save_opts.ocms,
log_file: save_opts.log,
ene_file: save_opts.ene,
cfg_file: save_opts.cfg,
trj_folder: save_opts.trj,
enegrp_dat: save_opts.enegrp
}
for output_file, to_save in outfiles_status.items():
if not to_save:
continue
if command_dir:
output_file = os.path.join(command_dir, output_file)
backend.addOutputFile(output_file)
early_step = [md_opts.timestep, md_opts.timestep, md_opts.timestep * 3]
# Every timestep, zero momentum
backend_plugin = "{"
backend_plugin += f" {MDSIM_PLUGIN_REMOVE_COM_MOTION_FIRST} = 0"
backend_plugin += f" {MDSIM_PLUGIN_REMOVE_COM_MOTION_INTERVAL} = 0"
if enegrp_opts is not None:
if enegrp_opts.start:
backend_plugin += f" {MDSIM}.{PLUGIN}.{ENERGY_GROUPS}.{FIRST} = %s" % enegrp_opts.start
if enegrp_opts.interval:
backend_plugin += (
f" {MDSIM}.{PLUGIN}.{ENERGY_GROUPS}.{INTERVAL} = %s" %
enegrp_opts.interval)
if enegrp_opts.energy_groups:
backend_plugin += (
f" {MDSIM}.{PLUGIN}.{ENERGY_GROUPS}.{OPTIONS} = [%s]" %
' '.join(enegrp_opts.energy_groups))
backend_plugin += "}"
stage = desmondutils.MDMSJStringer(
time=md_opts.time,
temp=md_opts.temperature,
pressure=md_opts.pressure,
timestep=early_step,
ensemble=md_opts.ensemble,
random_seed=md_opts.random_seed,
backend=backend_plugin,
trajectory_dot_interval=md_opts.trajectory_dot_interval,
trajectory_dot_write_velocity='true' \
if save_opts.write_velocity else 'false',
energy_group='true' if enegrp_opts else 'false',
last=save_opts.last)
system = cms.Cms(md_opts.input_cms)
desmondutils.create_msj([stage], filename=msj_name, check_cg=system)
cmd = desmondutils.get_multisim_command(
md_opts.input_cms, ocms_name, msj_name=msj_name, job_name=jobname)
job_info = SimpleNamespace(
cmd=cmd,
ocms_name=ocms_name,
trj_folder=trj_folder,
enegrp_dat=enegrp_dat,
jobname=jobname)
return job_info
[docs]def trim_parser_skip_md(options, extra=None):
"""
If md is skipped, delete any options for setting up molecular dynamics simulation.
:param options: Named tuple
:type options: Parsed command options
:param extra: None or list of str
:type extra: extra flags to be deleted
:rtype: options: Named tuple
:return: Parsed command options with no md flags
"""
if extra is None:
extra = []
for key in ALL_MD_FLAGS + extra:
delattr(options, key[1:])
return options
[docs]def add_md_basic_argument(parser):
"""
:param parser: The parser to add additional options
:type parser: `argparse.ArgumentParser`
:rtype: 'argparse._ArgumentGroup'
:return: argparse ArgumentGroup with MD setting options
Add necessary parser arguments for setting up a MD simulation.
"""
md_basic = parser.add_argument_group('MD Basic Setting')
md_basic.add_argument(
INPUT_CMS,
metavar='DESMOND_CMS_FILE',
type=parserutils.type_cms_file,
help='Specify the Desmond output *cms file on which to '
'run molecular dynamics simulations.')
md_basic.add_argument(
FLAG_ENSEMBLE,
metavar='MD_ENSEMBLE',
default=NVE,
choices=ENSEMBLES,
help="Molecular dynamics ensemble (%s) for data collection." %
", ".join(ENSEMBLES))
md_basic.add_argument(
FLAG_MD_TEMP,
metavar='KELVIN',
type=parserutils.type_positive_float,
default=ARG_DEFAULTS[FLAG_MD_TEMP],
help="Temperature (K) of molecular dynamics simulations.")
md_basic.add_argument(
FLAG_MD_PRESSURE,
metavar='BAR',
type=float,
default=ARG_DEFAULTS[FLAG_MD_PRESSURE],
help="Pressure (bar) of NPT molecular dynamics simulations.")
md_basic.add_argument(
FLAG_MD_TIME,
metavar='NANOSECOND',
type=parserutils.type_positive_float,
default=ARG_DEFAULTS[FLAG_MD_TIME],
help="Time (ns) of molecular dynamics simulations.")
md_basic.add_argument(
FLAG_MD_TIMESTEP,
metavar='MD_TIMESTEP',
type=parserutils.type_positive_float,
default=ARG_DEFAULTS[FLAG_MD_TIMESTEP],
help="Timestep (fs) of molecular dynamics simulations.")
# MATSCI - 9064: remove cpu desmond support
md_basic.add_argument(
FLAG_GPU, action="store_true", default=True, help=argparse.SUPPRESS)
jobutils.add_random_seed_parser_argument(md_basic)
return md_basic
[docs]def add_md_output_argument(parser, extra_options):
"""
Add parser arguments to record output information for MD simulations.
:param parser: The parser to add additional options
:type parser: `argparse.ArgumentParser`
:param extra_options: list of str
:type extra_options: extra options can be added according to these keywords
:rtype: 'argparse._ArgumentGroup'
:return: argparse ArgumentGroup with MD output recording information
"""
md_output = parser.add_argument_group('MD Output Setting')
md_output.add_argument(
jobutils.FLAG_MD_TRJ_INT,
metavar='PICOSECONDS',
type=parserutils.type_nonnegative_float,
default=ARG_DEFAULTS[jobutils.FLAG_MD_TRJ_INT],
help="Trajectories will be recorded every FLAG_MD_TRJ_INT (ps) "
"during the molecular dynamics simulations.")
if FLAG_MD_SIM_NUM in extra_options:
# Replica runs for ensemble average may skip the saving of *-out.cms
jobutils.add_save_desmond_files_parser_argument(md_output)
else:
md_output.add_argument(
FLAG_MD_SAVE_TRJ,
action='store_true',
help="Save the trajectory from the molecular dynamics simulation.")
# EnergyGroup-based analysis
if FLAG_ENEGRP_DAT in extra_options:
md_output.add_argument(
FLAG_ENEGRP_DAT,
metavar='DESMOND_ENEGRP_DAT',
help=(
"Please provide energy group file with pressure tensor recorded "
"if molecular dynamics simulation is skipped."))
md_output.add_argument(
jobutils.FLAG_MD_ENEGRP_INT,
metavar='PICOSECONDS',
type=parserutils.type_nonnegative_float,
default=ARG_DEFAULTS[jobutils.FLAG_MD_ENEGRP_INT],
help="Pressure tensor will be recorded every FLAG_MD_ENEGRP_INT (ps) "
"during the molecular dynamics simulations.")
md_output.add_argument(
FLAG_MD_SAVE_ENEGRP,
action="store_true",
help="Pressure tensor outputs (*_enegrp.dat) from molecular dynamics "
"simulations will be saved ")
if FLAG_MD_SIM_NUM in extra_options:
# The molecular simulation before this time is used to reach
# equilibrium state (no enegrp output)
md_output.add_argument(
FLAG_ENEGRP_START,
metavar='PICOSECONDS',
type=parserutils.type_nonnegative_float,
default=ARG_DEFAULTS[FLAG_ENEGRP_START],
help="Start to record pressure tensor when simulation time "
"reaches ENEGRP_START (ps).")
# FLAG_TRJ_FOLDER provides trj input, indicating this is a trajectory analysis
if FLAG_TRJ_FOLDER in extra_options:
md_output.add_argument(
FLAG_TRJ_FOLDER,
metavar='TRJ_FOLDER',
help="Analyze this trajectory, if %s %s" % (FLAG_RUN_TYPE,
POST_RUN))
return md_output
[docs]def add_analysis_argument(parser, extra_options):
"""
Add parser arguments for analysis.
:param parser: The parser to add additional options
:type parser: `argparse.ArgumentParser`
:param extra_options: list of str
:type extra_options: extra options can be added according to these keywords
:rtype: 'argparse._ArgumentGroup'
:return: argparse ArgumentGroup with analysis options
"""
analysis_parameter = parser.add_argument_group('Analysis Parameter')
if FLAG_DATA_LAST_FRAC in extra_options:
analysis_parameter.add_argument(
FLAG_DATA_LAST_FRAC,
action="store",
metavar='FRACTION',
default=ARG_DEFAULTS[FLAG_DATA_LAST_FRAC],
type=lambda x: parserutils.type_ranged_num(x, top=DATA_LAST_PCT_RANGE[1],
bottom=DATA_LAST_PCT_RANGE[0],
top_allowed=True,
bottom_allowed=False),
help="The last this fraction of the molecular dynamics data will be"
" used for analysis.")
else:
analysis_parameter.add_argument(
FLAG_DATA_START,
action="store",
metavar='DATA_NUMBER',
default=int(ARG_DEFAULTS[FLAG_DATA_START]),
type=parserutils.type_nonnegative_int,
help=
"DATA_NUMBER at the beginning of a molecular dynamics data will be"
" excluded from analysis.")
# ASL/SMILE selection based analysis
if FLAG_ASL in extra_options:
analysis_parameter.add_argument(
FLAG_ASL,
action="store",
metavar='ASL',
default=ARG_DEFAULTS[FLAG_ASL],
help="ASL string to define atom selection.")
if FLAG_SMILES in extra_options:
analysis_parameter.add_argument(
FLAG_SMILES,
action="store",
metavar='SMILES',
help="Only analyze molecules matching this SMILES string")
# FLAG_TRJ_FOLDER indicate this is a trajectory analysis
if FLAG_TRJ_FOLDER in extra_options:
analysis_parameter.add_argument(
FLAG_TRJ_FRAME_NUM,
action="store",
metavar='TRJ_NUMBER',
type=parserutils.type_positive_int,
help="Read this number of trajectory frames or until the end.")
# The following should be deprecated
if FLAG_BLOCK_NUM in extra_options:
analysis_parameter.add_argument(
FLAG_BLOCK_NUM,
action="store",
metavar='BLOCK_NUM',
default=ARG_DEFAULTS[FLAG_BLOCK_NUM],
type=parserutils.type_positive_int,
help=
"Data from original single simulation will be divided into BLOCK_NUM"
" short successive blocks, and each block will be analyzed independently"
" to calculate standard deviation for coefficient of variation check"
)
if FLAG_BLOCK_NUM in extra_options or FLAG_MD_SIM_NUM in extra_options:
analysis_parameter.add_argument(
FLAG_VARIATION,
action="store",
metavar='VARIATION',
type=parserutils.type_positive_float,
help="If provided, the tau_end is adjusted according to VARIATION "
"(standard deviation over mean) so that Tau region has variation "
"less than VARIATION.")
return analysis_parameter
[docs]def add_parser_opts(parser,
final_prop,
mid_prop,
use_tau=True,
extra_options=None):
"""
Add all necessary parser arguments for setting up a MD simulation
and optional arguments for certain green-kubo property calculation.
:param parser: The parser for error information
:type parser: `argparse.ArgumentParser`
:param final_prop: str
:type final_prop: the final property needs to be calculated
:param mid_prop: str
:type mid_prop: the property needs to be calculated before the final prop
:param use_tau: bool
:type use_tau: Tau limits are added, if True
:param extra_options: None or list of str
:type extra_options: extra options can be added according to these keywords
:rtype: 'argparse._ArgumentGroup', 'argparse._ArgumentGroup', 'argparse._ArgumentGroup'
:return: argparse ArgumentGroups with setting options
"""
if extra_options is None:
extra_options = []
md_basic = add_md_basic_argument(parser)
md_output = add_md_output_argument(parser, extra_options)
analysis_parameter = add_analysis_argument(parser, extra_options)
if FLAG_MD_SIM_NUM in extra_options:
analysis_indep_run = parser.add_argument_group('Independent Run')
parser.add_argument(
FLAG_RUN_TYPE,
action='store',
metavar='RUN_TYPE',
default=FULL_RUN,
help=
("Define the type of this calculation. %s: run molecular dynamics "
"simulations and post analysis; %s: skip the molecular dynamics "
"simulations and perform post-analysis; %s: post-analysis only includes "
"multi-run average and curve fitting; %s: only perform curve fitting on "
"the averaged data." % (FULL_RUN, POST_RUN, MULTI_RUN, FIT_ONLY)))
# Correlation-based analysis
if use_tau:
analysis_parameter.add_argument(
FLAG_TAU_START,
action="store",
metavar='NANOSECOND',
default=ARG_DEFAULTS[FLAG_TAU_START],
type=parserutils.type_nonnegative_float,
help=
"%s data before MD_TAU_START (ns) are excluded for %s calculation."
% (mid_prop.capitalize(), final_prop))
analysis_parameter.add_argument(
FLAG_TAU_END,
action="store",
metavar='NANOSECOND',
type=parserutils.type_positive_float,
default=MAX_TIME_NS,
help="%s data after MD_TAU_END (ns) are excluded for %s calculation."
% (mid_prop.capitalize(), final_prop))
# Independent runs for ensemble average
if FLAG_MD_SIM_NUM in extra_options:
analysis_indep_run.add_argument(
FLAG_MD_SIM_NUM,
action='store',
metavar='MD_SIM_NUM',
type=parserutils.type_positive_int,
default=ARG_DEFAULTS[FLAG_MD_SIM_NUM],
help=
"MD_SIM_NUM numbers of independent molecular dynamics simulations each"
" followed by post-analysis are fired off under jobcontrol.")
analysis_indep_run.add_argument(
FLAG_INPUT_SEARCH,
action='store_true',
help=
"When %s %s or %s %s, this flag allows users to search the folder "
"containing the user-defined file so that multiple files with the "
"same extension from previous molecular dynamics simulations "
"or post-analyses are used as inputs for this calculation." %
(FLAG_RUN_TYPE, POST_RUN, FLAG_RUN_TYPE, MULTI_RUN))
return md_basic, md_output, analysis_parameter
[docs]def check_options(options,
parser,
option_flag,
min_data_point,
check_tau=True,
check_gpu=False):
"""
Validate the options.
:type options: Named tuple
:param options: The parsed command line options
:type parser: `argparser.ArgumentParser`
:param parser: parser to log error information
:type option_flag: str
:param option_flag: time interval is defined by this flag
:type min_data_point: int
:param min_data_point: minimum required data point
:type check_tau: bool
:param check_tau: validate tau range against available time range
:type check_gpu: bool
:param check_gpu: check GPU/CPU resource availability. For a remote host,
-gpu flag requires a GPU host, and no -gpu flag requires a CPU HOST.
For localhost and no job control, only check whether GPU resource is
available when -gpu flag exists.
"""
input_cms = fileutils.get_existing_filepath(options.input_cms)
if not input_cms:
parser.error(
f"The input cms file does not exist. ({options.input_cms})")
options.input_cms = input_cms
if options.run_type not in ALL_RUN_TYPE:
parser.error("%s must be one of the following %s." %
(FLAG_RUN_TYPE, ', '.join(ALL_RUN_TYPE)))
if check_tau:
options.tau_start *= NANO2PICO
options.tau_end *= NANO2PICO
if options.run_type == FULL_RUN:
# in driver time unit is ps, compatible with Desmond
options.md_time *= NANO2PICO
options.md_timestep *= FEMTO2PICO
if check_tau and options.run_type == FULL_RUN:
check_tau_options(options, parser, option_flag, min_data_point)
if options.run_type == POST_RUN:
if option_flag == jobutils.FLAG_MD_TRJ_INT:
if not options.trj_folder:
import schrodinger.application.desmond.packages.topo as topo
msys_model, cms_model = topo.read_cms(options.input_cms)
options.trj_folder = topo.find_traj_path(
cms_model, base_dir=os.path.dirname(options.input_cms))
if not options.trj_folder:
parser.error(
"Please provide the trajectory folder path. (-trj_folder)")
options.trj_folder = fileutils.get_existing_filepath(
options.trj_folder)
if not options.trj_folder:
parser.error(
"The specified trajectory folder path does not exist. (-trj_folder)"
)
elif option_flag == jobutils.FLAG_MD_ENEGRP_INT:
if not options.enegrp_dat:
parser.error(
"Please provide the enegrp_dat file via %s flag to run "
"post analysis (%s %s)." % (FLAG_ENEGRP_DAT, FLAG_RUN_TYPE,
POST_RUN))
options.enegrp_dat = fileutils.get_existing_filepath(
options.enegrp_dat)
if not options.enegrp_dat:
parser.error(
"The specified enegrp file does not exist. (-enegrp_dat)")
if check_gpu:
hostname = jobutils.get_jobhost_name()
is_gpu_avail = jobutils.is_jobhost_gpu_available()
if hostname in [None, 'localhost']:
if options.gpu and not is_gpu_avail:
parser.error(f"Please remove {jobutils.FLAG_GPU}, or use a "
f"GPU host via -HOST.")
elif jobutils.is_hostname_known(
hostname) and options.gpu != is_gpu_avail:
parser.error(f"Please use {jobutils.FLAG_GPU} for a remote "
"GPU host, or remove it for a remote CPU host.")
[docs]def check_tau_options(options, parser, option_flag, min_data_point):
"""
Validate the Tau options.
:type options: Named tuple
:param options: The parsed command line options
:type parser: `argparser.ArgumentParser`
:param parser: parser to log error information
:type option_flag: str
:param option_flag: time interval is defined by this flag
:type min_data_point: int
:param min_data_point: minimum required data point
"""
production_time = options.md_time
# FLAG_ENEGRP_START defines when Desmond starts to output enegrp data
if option_flag == jobutils.FLAG_MD_ENEGRP_INT:
enegrp_start = jobutils.get_option(options, FLAG_ENEGRP_START)
if enegrp_start:
production_time = options.md_time - enegrp_start
time_interval = jobutils.get_option(options, option_flag)
available_data_num = math.floor(production_time / time_interval)
# Either data_start or data_last_frac. See add_analysis_argument()
# Skip data by frame number
data_start = jobutils.get_option(options, FLAG_DATA_START)
if data_start:
available_data_num -= data_start
# Skip data using the last percentage
data_last_frac = jobutils.get_option(options, FLAG_DATA_LAST_FRAC)
if data_last_frac:
available_data_num = math.floor(available_data_num * data_last_frac)
tau_start_frame = math.ceil(options.tau_start / time_interval)
tau_end_frame = math.floor(options.tau_end / time_interval)
tau_data_point_num = min(available_data_num - tau_start_frame,
tau_end_frame - tau_start_frame)
if tau_data_point_num < min_data_point:
msg = (f"Only {int(tau_data_point_num)} Tau data points are found, "
f"but a minimum of {min_data_point} are needed. Please "
f"increase simulation time ({FLAG_MD_TIME}), increase Tau "
f"end ({FLAG_TAU_END}), decrease data record interval "
f"({option_flag}), skip less simulation data "
f"({FLAG_DATA_START} / {FLAG_DATA_LAST_FRAC}), or decrease "
f"Tau start ({FLAG_TAU_START}).")
if option_flag == jobutils.FLAG_MD_ENEGRP_INT and enegrp_start:
msg = msg[:-1] + (
f', or decrease enegrp_start ({FLAG_ENEGRP_START}).')
parser.error(msg)
[docs]class EquilibriumMdBase(object):
"""
This is a base class for equilibrium MD subclass,
and should be not be instantiated directly.
"""
[docs] def __init__(self, input_cms, log=None, log_error=None):
"""
:param input_cms: str
:type input_cms: cms input file
:param log: callable func
:type log: func to log info
:param log_error: callable func
:type log_error: func to log error and exit
"""
import schrodinger.application.desmond.packages.topo as topo
self.input_cms = input_cms
self.log = log
self.log_error = log_error
# The following dictionary is saved in the cms model by savePropAndCms()
self.prop2save = {} # 'r_matsci_xxx' as key and a float as value
(self.msys_model, self.cms_model) = topo.read_cms(self.input_cms)
[docs] def checkSetTimeInterval(self, time_intervals=None):
"""
Time interval must be the same for all trajectery.
:type time_intervals: list or None
:param time_intervals: time intervals between every two frames.
"""
if not time_intervals:
time_intervals = []
pre_time = self.time_list[0]
for cur_time in self.time_list[1:]:
time_intervals.append(cur_time - pre_time)
pre_time = cur_time
time_intervals = numpy.matrix(time_intervals)
self.time_interval = numpy.average(time_intervals)
time_equal_crit = self.time_interval * 0.00001
time_intervals_diff = (
time_intervals - self.time_interval) / time_equal_crit
if time_intervals_diff.round().any():
self.log_error('Different time intervals are found. Aborting...')
[docs] def checkConstantVol(self, allow_npt=True):
"""
Calculate volume and check whether it changes.
:type allow_npt: bool
:param allow_npt: allow volume to change, if True
"""
volumes = numpy.matrix(self.vol_list)
self.volume = numpy.average(volumes)
if not allow_npt:
volume_equal_crit = self.volume * 0.00001
volume_diff = (volumes - self.volume) / volume_equal_crit
if volume_diff.round().any():
self.log_error(
'The volume of the system changes, but constant volume'
'is required for viscosity calculation. Aborting...')
[docs] def calDensity(self):
"""
Caluate the average system density.
"""
# unit kg/m^3
self.density = self.cms_model.total_weight * scipy_const.value(
'atomic mass constant') / (self.volume * scipy_const.angstrom**3)
[docs] def savePropAndCms(self, cms_out=None, wam_type=None):
"""
Save properties to cms model, delete trj information if needed,
and write out cms file.
:param str cms_out: name of the output cms.
:type wam_type: int or None
:param wam_type: One of the enums defined in workflow_action_menu.h if
the results should have a Workflow Action Menu in Maestro
"""
jobutils.set_source_path(self.cms_model)
for key, value in self.prop2save.items():
try:
value = float('%.5g' % value)
except TypeError:
pass
desmondutils.add_cms_property(self.cms_model, key, value)
if not self.save_trj_data:
for prop in [cms.Cms.PROP_CMS, cms.Cms.PROP_TRJ]:
desmondutils.delete_cms_property(self.cms_model, prop)
if cms_out is None:
cms_out = self.outfile
if wam_type:
jobutils.write_cms_with_wam(self.cms_model, cms_out, wam_type)
else:
self.cms_model.write(cms_out)
[docs]class TrjBase(EquilibriumMdBase):
"""
This is a base class for equilibrium MD subclass that uses trajectory,
and should be not be instantiated directly.
"""
[docs] def readTrj(self, make_whole=False):
"""
Read trj frames from trj folder.
:type make_whole: bool
:param make_whole: if True, bond across PBCs are fixed so that two bonded
particles won't be on opposite sides of the system
"""
import schrodinger.application.desmond.packages.traj as traj
import schrodinger.application.desmond.packages.topo as topo
# MATSCI-5668: Create a low memory trajectory frame iterator
traj.Source.CACHE_LIMIT_BYTES = 0
self.log('Reading trajectory...', pad=True)
try:
all_trjs = traj.read_traj(str(self.trj_folder))
except (IOError, TypeError) as msg:
self.log_error(
"Trajectory reading failed, error was: %s" % str(msg))
if self.log:
self.log(f"Total trajectory frame number: {len(all_trjs)}")
if hasattr(self, 'trj_frame_num') and self.trj_frame_num:
self.trjs = all_trjs[self.data_start:
self.trj_frame_num + self.data_start]
else:
# Last frame may have different time interval
self.trjs = all_trjs[self.data_start:-1]
if not self.trjs:
# If only one trajectory frame exists, use it instead of skipping
self.trjs = all_trjs[self.data_start:]
if not self.trjs:
self.log_error(
"{0} trajectory frames are found, but after excluding first {1} frames, "
"no trajectory frames are left for further analysis.".format(
len(all_trjs), self.data_start))
self.total_trj_frame_num = len(self.trjs)
if make_whole:
# Fix bonds across PBCs
try:
topo.make_whole(self.msys_model, self.trjs)
except ValueError:
# MATSCI-9443: Equilibrium method doesn't work with dynamic atom selection
self.log_error(
"Please provide a classical molecular dynamics trajectory."
" (the hybrid GCMC/MD simulation is not supported)")
self.log(
"Available trajectory frame number: %i " % self.total_trj_frame_num)
[docs] def trajFrames(self, make_whole=False):
"""
Fix pbc bonds (if needed), yield the trajectory frame, and garbage-collect
to release the occupied memory.
:type make_whole: bool
:param make_whole: if True, bond across PBCs are fixed so that two bonded
particles won't be on opposite sides of the system
:rtype: `schrodinger.application.desmond.packages.traj.frame`
:return: a trajectory frame
"""
import schrodinger.application.desmond.packages.topo as topo
for trj in self.trjs:
if make_whole:
# Fix bonds across PBCs
topo.make_whole(self.msys_model, [trj])
yield trj
# garbage-collect a used trajectory frame
trj._source.clear_cache()
trj._object = None
[docs] def checkTau(self, total_trj_frame_num, min_tau_data_point):
"""
Check whether trj frame number is large enough.
:param total_trj_frame_num: int
:type total_trj_frame_num: number of total data point
:param min_tau_data_point: int
:type min_tau_data_point: minimum requested data
"""
# Find the trj_interval and calulate tau trj frame limit
time = []
top2_frame = itertools.islice(self.trjs, 2)
for frame in top2_frame:
time.append(frame.time)
trj_interval = time[1] - time[0]
self.log("Trajectory time interval: %.5f (ps)" % trj_interval)
self.tau_frame_end = int(math.floor(self.tau_end / trj_interval))
self.tau_frame_start = int(math.ceil(self.tau_start / trj_interval))
trajectory_max_time = total_trj_frame_num * trj_interval
# Set tau_end to be <= max tau, and validate tau_start
# Trj frame for backend, time (ns) for frontend
if self.tau_frame_end > total_trj_frame_num:
self.log(
"-tau_end ({0} ns) exceeds the available trajectory range. "
"Setting -tau_end to the largest possible value {1} ns.".format(
self.tau_end / NANO2PICO, trajectory_max_time / NANO2PICO),
pad=True,
pad_below=True)
self.tau_frame_end = total_trj_frame_num
if self.tau_frame_start >= total_trj_frame_num:
self.log_error(
"-tau_start ({0} ns) is larger than total trajectory time length ({1} ns). "
"Aborting...".format(self.tau_start,
trajectory_max_time / NANO2PICO))
elif self.tau_frame_start >= self.tau_frame_end:
self.log_error(
"-tau_start ({0} ns) is larger than -tau_end ({1} ns). Aborting..."
.format(self.tau_start / NANO2PICO, self.tau_end / NANO2PICO))
elif self.tau_frame_start + min_tau_data_point > self.tau_frame_end:
self.log_error(
"Only {0} trajectory frames available for Tau calculation, "
"but the analysis needs at least {1} Tau data points."
"Please extend the MD simulation, enlarge the Tau range, or "
"increase the trajectory output frequency.".format(
self.tau_frame_end - self.tau_frame_start,
min_tau_data_point))
[docs] def checkVel(self):
"""
Check whether each frame contains velocity information.
"""
for trj in self.trjs:
if trj.vel() is None:
self.log_error(
"Please make sure trajectory folder has "
"velocity information in every trajectory frame.")
[docs]class MixInBlockStd(object):
"""
This is a base class for dividing long simulation into blocks
to get standard deviation, and should be not be instantiated directly.
"""
[docs] def checkBlockTau(self, total_step_num, min_tau_data_point):
"""
Chech whether there are minimum data points between Tau start and end.
:param total_step_num: int
:type total_step_num: number of total data point
:param min_tau_data_point: int
:type min_tau_data_point: minimum requested data
"""
self.log("Time interval: %.5f (ps) \n" % self.time_interval)
self.tau_frame_end = int(math.floor(self.tau_end / self.time_interval))
self.tau_frame_start = int(
math.ceil(self.tau_start / self.time_interval))
total_tau = total_step_num * self.time_interval
step_num = total_step_num // self.block_num
tau_available = step_num * self.time_interval
note = ("(note: %s blocks reduce the total %s ns Tau range to 1/%s)." %
(self.block_num, total_tau / NANO2PICO, self.block_num))
if self.tau_frame_start > step_num:
msg_error = (
"The Tau start (%s ns) is larger than the Tau range (%s ns) for one block."
% (self.tau_start / NANO2PICO,
step_num * self.time_interval / NANO2PICO))
msg_error += note
self.log_error(msg_error)
if self.tau_frame_end > step_num:
msg = (
"Setting the original Tau end (%s ns) to the largest possibile value"
" (%s ns)." % (self.tau_end / NANO2PICO,
tau_available / NANO2PICO))
self.tau_end = tau_available
self.tau_frame_end = step_num
msg += note
self.log(msg, pad=True)
if self.tau_frame_end - self.tau_frame_start < min_tau_data_point:
msg_error = (
"Between Tau start (%s ns) and Tau end (%s ns) only %s data points"
" are available, but a minimum of %s are requested." %
(self.tau_start / NANO2PICO, self.tau_end / NANO2PICO,
self.tau_frame_end - self.tau_frame_start, min_tau_data_point))
self.log_error(msg_error)
[docs]class SingleMDJob(jaguarworkflows.RobustSubmissionJob):
"""
Job class to submit a molecular dynamics simulation.
"""
[docs] def __init__(self,
jobname,
md_opts,
enegrp_opts,
save_opts,
command_dir=os.curdir):
"""
:type jobname: str
:param jobname: The basename for the job
:type md_opts: 'SimpleNamespace'
:param md_opts: molecular dynamics options
:type enegrp_opts: 'SimpleNamespace'
:param enegrp_opts: enegrp options
:type save_opts: 'SimpleNamespace'
:param save_opts: files to save
:type command_dir: str
:param command_dir: the job launch dir
"""
self.jobname = jobname
self.md_opts = md_opts
self.enegrp_opts = enegrp_opts
self.save_opts = save_opts
self.command_dir = command_dir
# _command is set in setup method based on above options
self._command = None
self.md_job_info = None
jaguarworkflows.RobustSubmissionJob.__init__(
self, self._command, name=self.jobname, command_dir=command_dir)
[docs] def setup(self):
"""
Over write parent class method.
"""
shutil.copy(
os.path.join(os.pardir, os.path.basename(self.md_opts.input_cms)),
os.curdir)
# addOutputFile() needs command_dir as the backend points to the parent
# job start dir
self.md_job_info = setup_md_job(
self.jobname,
self.md_opts,
enegrp_opts=self.enegrp_opts,
save_opts=self.save_opts,
command_dir=self.command_dir)
self._command = self.md_job_info.cmd
[docs]class MultiDriver(object):
"""
Class to fire off multiple subjobs.
"""
[docs] def __init__(self, options, jobname_base, logger=None, smart_jdj=False):
"""
:type options: Named tuple
:param options: The parsed command line options
:type jobname_base: str
:param jobname_base: base name of this job
:type logger: `logging.Logger`
:param logger: The logger for this builder
:type smart_jdj: bool
:param smart_jdj: whether to use smart distribution of jobs
"""
self.options = options
self.jobname_base = jobname_base
self.logger = logger
self.smart_jdj = smart_jdj
self.jdj = None
self.input_cms = self.options.input_cms
self.backend = jobcontrol.get_backend()
self.out_cms = self.jobname_base + '-out.cms'
jobutils.add_outfile_to_backend(self.out_cms, set_structure_output=True)
[docs] def run(self):
"""
Run multiple calculation and perform ensemble average.
"""
if self.options.run_type == FULL_RUN:
# Run MD in parallel
self.setUpQue()
self.initiateAndAddMdJobs()
self.runSubjobs()
# Record the output file from each subjob
pass
elif self.options.run_type == POST_RUN:
# Read MD outputs from previous runs
pass
elif self.options.run_type == MULTI_RUN:
# Read outfiles from previous calculations
pass
if self.options.run_type != FIT_ONLY:
# FULL_RUN, POST_RUN, and MULTI_RUN need
self.convertMultiToOne()
else:
self.loadDataFromAllInOneFile()
# Final curve fitting
pass
[docs] def convertMultiToOne(self):
"""
Convert data from files from multiple subjobs into one single file.
"""
raise NotImplementedError('This method combines all the final '
'outfiles from subjobs into one.')
[docs] def loadDataFromAllInOneFile(self):
"""
Set the all-in-one file based on user input.
"""
raise NotImplementedError(
'This method loads the data for curve fitting.')
[docs] def setUpQue(self):
"""
Setup job queue.
"""
self.jdj = jobdj.JobDJ(verbosity='normal', max_failures=jobdj.NOLIMIT)
if not self.smart_jdj:
self.jdj.disableSmartDistribution()
[docs] def initiateAndAddMdJobs(self,
SingleMDJob=SingleMDJob,
has_enegrp=False,
additional_save_info=None,
**kwargs):
"""
Construct options, and initiate SingleMDJob jobs, and added them to jobdj.
:param SingleMDJob: instantiate this class to an object that can be added
to jobdj
:type SingleMDJob: 'queue.JobControlJob'
:param has_enegrp: if False, no enegrp file is generated.
:type has_enegrp: bool
:param additional_save_info: additional information on what files to be
copied back.
:type additional_save_info: 'types.SimpleNamespace'
"""
if jobutils.get_option(self.options, jobutils.SAVE_FLAG):
# jobutils.SAVE_TRJ controls the saving states of the subjobs
save_ocms = self.options.save_trj_data in [
jobutils.SAVE_CMS, jobutils.SAVE_TRJ
]
save_trj = self.options.save_trj_data == jobutils.SAVE_TRJ
else:
# FLAG_MD_SAVE_TRJ controls the saving states of the master jobs,
# and that of the subjob if there is only one subjob
assert hasattr(self.options, FLAG_MD_SAVE_TRJ[1:])
save_ocms = True
save_trj = self.options.md_save_trj
md_opts = {
INPUT_CMS: self.options.input_cms,
TIME: self.options.md_time,
TIMESTEP: self.options.md_timestep,
ENSEMBLE: self.options.md_ensemble,
TEMPERATURE: self.options.md_temp,
PRESSURE: self.options.md_press,
TRAJECTORY_INTERVAL: self.options.md_trj_int,
RANDOM_SEED: self.options.seed,
PROCS: 1,
GPU: True
}
md_opts = SimpleNamespace(**md_opts)
enegrp_opts = None
if has_enegrp:
enegrp_opts = {
INTERVAL: self.options.md_enegrp_int,
START: self.options.md_enegrp_start,
ENERGY_GROUPS: ['pressure_tensor']
}
enegrp_opts = SimpleNamespace(**enegrp_opts)
save_opts = {
OCMS: save_ocms,
TRJ: save_trj,
}
if additional_save_info:
save_opts.update(additional_save_info)
save_opts = SimpleNamespace(**save_opts)
if self.options.seed and self.options.md_sim_num > 1:
jobutils.seed_random_number_generator(self.options, self.log)
for md_run_idx in range(1, self.options.md_sim_num + 1):
jobname_base = '_'.join([self.jobname_base, str(md_run_idx)])
new_md_opts = copy.deepcopy(md_opts)
if self.options.md_sim_num > 1:
random_seed = random.randint(parserutils.RANDOM_SEED_MIN,
parserutils.RANDOM_SEED_MAX)
new_md_opts.random_seed = random_seed
command_dir = jobname_base
job = SingleMDJob(
jobname_base,
new_md_opts,
enegrp_opts,
save_opts,
command_dir=command_dir,
**kwargs)
self.jdj.addJob(job)
[docs] def log(self, msg, **kwargs):
"""
Add a message to the log file
:type msg: str
:param msg: The message to log
Additional keyword arguments are passed to the textlogger.log_msg function
"""
kwargs['logger'] = self.logger
textlogger.log_msg(msg, **kwargs)
[docs] def log_error(self, msg):
"""
Add a message to the log file and exit with an error code
:type msg: str
:param msg: The message to log
"""
textlogger.log_error(msg, logger=self.logger)
[docs] def runSubjobs(self):
"""
Run molecular dynamics jobs and Summarize the status of the runs.
"""
self.log(
f'{self.jdj.total_added} independent molecular dynamics simulations are running...'
)
self.jdj.run()
if not self.jdj.done_jobs:
self.log_error('None of the molecular dynamics simulations are '
'successful. Aborting...')
fail_jobnames = [
job.name for job in self.jdj.failed_jobs if job.name.endswith('_md')
]
if fail_jobnames:
self.log(
f"{self.jobname_base} calculations will not include the failed MD runs: {', '.join(fail_jobnames)}."
)
else:
self.log(
'All independent simulations are successful.', pad_below=True)