"""
Utilities for reaction workflows.
Copyright Schrodinger, LLC. All rights reserved.
"""
import argparse
import copy
import csv
import glob
import itertools
import json
import os
import re
import shutil
import warnings
from collections import OrderedDict
from collections import namedtuple
from types import SimpleNamespace
import networkx as nx
import numpy
import yaml
from scipy import constants
from schrodinger import structure
from schrodinger.application.jaguar.input import JaguarInput
from schrodinger.application.jaguar.output import JaguarOutput
from schrodinger.application.macromodel import utils as mmodutils
from schrodinger.application.matsci import anharmonic
from schrodinger.application.matsci import desmondutils
from schrodinger.application.matsci import etarotamers
from schrodinger.application.matsci import \
jaguar_multistage_workflow_utils as jmswfu
from schrodinger.application.matsci import jaguar_restart
from schrodinger.application.matsci import jaguarworkflows
from schrodinger.application.matsci import jobutils
from schrodinger.application.matsci import parserutils
from schrodinger.application.matsci import property_names as pnames
from schrodinger.application.matsci.genetic_optimization import \
genetic_optimization as go
from schrodinger.infra import mm
from schrodinger.job import jobcontrol
from schrodinger.structutils import analyze
from schrodinger.structutils import rmsd
from schrodinger.test.stu.common import zip_directory
from schrodinger.utils import csv_unicode
from schrodinger.utils import fileutils
from schrodinger.utils import subprocess
from schrodinger.utils import units
from schrodinger.forcefield import OPLS_NAME_F16
FLAG_N_CONFORMERS = '-n_conformers'
FLAG_PP_REL_ENERGY_THRESH = '-pp_rel_energy_thresh'
FLAG_RETURN_CSEARCH_FILES = '-return_csearch_files'
FLAG_JAGUAR = '-jaguar'
FLAG_JAGUAR_KEYWORDS = '-jaguar_keywords'
FLAG_TEMP_START = '-temp_start'
FLAG_TEMP_STEP = '-temp_step'
FLAG_TEMP_N = '-temp_n'
FLAG_PRESS_START = '-press_start'
FLAG_PRESS_STEP = '-press_step'
FLAG_PRESS_N = '-press_n'
FLAG_MAX_I_FREQ = anharmonic.FLAG_MAX_I_FREQ
FLAG_TPP = jobutils.FLAG_TPP
FLAG_DESCRIPTORS = '-descriptors'
FLAG_LIGFILTER = '-ligfilter'
FLAG_CANVAS = '-canvas'
FLAG_MOLDESCRIPTORS = '-moldescriptors'
FLAG_INCLUDE_VECTORIZED = '-include_vectorized'
FLAG_FINGERPRINTS = '-fingerprints'
FLAG_SAVEFILES = '-savefiles'
FLAG_INPUT_FILE = '-input_file'
DEFAULT_JOB_NAME = 'reaction_workflow'
DEFAULT_N_CONFORMERS = 5
# keys and values should be strings
DEFAULT_JAGUAR_KEYWORDS = OrderedDict([('dftname', 'B3LYP'), ('iuhf', '1'),
('basis', 'LACVP**')])
DEFAULT_TPP = 1
# OPLS_2005 does not allow ZOB to sp3 carbon while OPLS3 does
DEFAULT_FORCE_FIELD_NAME = mm.OPLS_NAME_F16 # OPLS3
RESERVED_JAGUAR_KEYS = set([
'molchg', 'multip', 'igeopt', 'ifreq', 'itrvec', 'nhesref', 'ntemp',
'tmpini', 'tmpstp', 'npress', 'press', 'press_step'
])
REACTION_WF_TAG = '_rxnwf'
REACTION_WF_STRUCTURE_KEY = 'b_matsci_Reaction_Workflow_Structure'
CONFORMERS_GROUP_KEY = 's_matsci_Reaction_Workflow_Conformers_Group'
SIBLING_GROUP_KEY = 's_matsci_Reaction_Workflow_Sibling_Group'
# for the following keep the original key for backwards compatibility
PARENT_SIBLING_GROUPS_KEY = 's_matsci_Reaction_Workflow_Parent_Conformer_Groups'
CHARGE_KEY = 'i_m_Molecular_charge'
MULTIPLICITY_KEY = 'i_m_Spin_multiplicity'
KEEP_ATOM_KEY = 'b_matsci_Reaction_Workflow_Keep_Atom'
SUPERPOSITION_ATOM_KEY = 'i_matsci_Reaction_Workflow_Superposition_Atom'
RESTRAINED_ATOM_KEY = 'b_matsci_Reaction_Workflow_Restrained_Atom'
RESTRAINED_DISTANCES_KEY = 's_matsci_Reaction_Workflow_Restrained_Distances'
RESTRAINED_ANGLES_KEY = 's_matsci_Reaction_Workflow_Restrained_Angles'
RESTRAINED_DIHEDRALS_KEY = 's_matsci_Reaction_Workflow_Restrained_Dihedrals'
TRANSITION_STATE_STRUCTURE_KEY = \
'b_matsci_Reaction_Workflow_Transition_State_Structure'
INDEX_SEPARATOR = ','
PARENT_SEPARATOR = ','
SEPARATOR = ';'
# must use str-ed values here rather than mm.<function>
PROP_TYPE_GET_DICT = {
'r': 'mmct_atom_property_get_real',
'b': 'mmct_atom_property_get_bool',
'i': 'mmct_atom_property_get_int',
's': 'mmct_atom_property_get_string'
}
ENTRY_ID_KEY = pnames.ENTRY_ID_PROP
TempData = namedtuple('TempData', ['temp_start', 'temp_step', 'temp_n'])
PressData = namedtuple('PressData', ['press_start', 'press_step', 'press_n'])
HARTREE_UNITS = ['au']
KCAL_PER_MOL_UNITS = ['kcal/mol']
EV_UNITS = ['eV']
KJOULES_PER_MOL_UNITS = ['kJ/mol']
# kcal/mol/K
IDEAL_GAS_CONSTANT = (constants.R / constants.calorie) / constants.kilo
STAGE_SEPARATOR = '_stage_'
REL_REACTANTS_W_X_EXT = '_conf_avg_rel_reactants.csv'
REL_REACTANTS_WO_X_EXT = '_conf_avg_wo_x_rel_reactants.csv'
REL_REACTANTS_LOWEST_EXT = '_lowest_conf_rel_reactants.csv'
REL_PARENTS_W_X_EXT = '_conf_avg_rel_parents.json'
REL_PARENTS_WO_X_EXT = '_conf_avg_wo_x_rel_parents.json'
REL_PARENTS_LOWEST_EXT = '_lowest_conf_rel_parents.json'
REL_REACTANTS_EXTS = [
REL_REACTANTS_W_X_EXT, REL_REACTANTS_WO_X_EXT, REL_REACTANTS_LOWEST_EXT
]
REL_PARENTS_EXTS = [
REL_PARENTS_W_X_EXT, REL_PARENTS_WO_X_EXT, REL_PARENTS_LOWEST_EXT
]
REL_EXT_TO_KWARG_DICT = {
REL_REACTANTS_W_X_EXT: 'edict_w_x',
REL_REACTANTS_WO_X_EXT: 'edict_wo_x',
REL_REACTANTS_LOWEST_EXT: 'edict_lowest',
REL_PARENTS_W_X_EXT: 'edict_w_x',
REL_PARENTS_WO_X_EXT: 'edict_wo_x',
REL_PARENTS_LOWEST_EXT: 'edict_lowest'
}
ANHARMONIC_TAG = '_anharmonic'
RATE_CONSTANTS_W_X_EXT = '_conf_avg_rate_constants.csv'
# energies in kJ/mol, MacroModel output uses a '-S-OPLS' extension
MMOD_ENERGY_KEY = f'r_mmod_Potential_Energy-{OPLS_NAME_F16}'
MMOD_REL_ENERGY_KEY = f'r_mmod_Relative_Potential_Energy-{OPLS_NAME_F16}'
FFLD_ENERGY_GLOBAL_TAG = '_global'
# kJ/mol
CSEARCH_REL_ENERGY_THRESH = 50.
# Ang.
CSEARCH_RMSD_THRESH = 0.1
JMSWF_LAUNCH_DIR = 'jaguar_multistage_workflow'
ANHARM_LAUNCH_DIR = 'anharmonic'
# the variable name pp_rel_energy_thresh uses the prefix pp to indicate
# that this is the relative energy used in postprocessing MacroModel results
# and is different from the variable name rel_energy_thresh, used elsewhere in this
# code, which is the relative energy used in the MacroModel csearch
PP_CSEARCH_REL_ENERGY_THRESH = None # kJ/mol or None
DESCRIPTORS_LAUNCH_DIR_HEADER = 'descriptors_'
[docs]def get_idxs_marked_atoms(st, prop):
"""
Return a list of indices of atoms in the given
structure that have the given property defined.
:type st: schrodinger.structure.Structure
:param st: the structure
:type prop: str
:param prop: the property that marks the atoms
:rtype: list
:return: contains indices of atoms
"""
afunc = getattr(mm, PROP_TYPE_GET_DICT[prop[0]])
try:
# if no exception then atom index 1 has the property
# defined
afunc(st, prop, 1)
except mm.MmException as mmexc:
# either is not available as an atom property
# or is available but no atom defines it
if mmexc.rc == mm.MMCT_ATOM_PROPERTY_NOT_DEFINED_IN_CT:
gather = False
# atom index other than 1 has the property defined
elif mmexc.rc == mm.MMCT_ATOM_PROPERTY_UNDEFINED_ATOM:
gather = True
else:
raise
else:
gather = True
idxs = []
if gather:
for atom in st.atom:
if atom.property.get(prop):
idxs.append(atom.index)
return idxs
[docs]def get_restrain_atom_idxs(st):
"""
Return a list of indices of restrain atoms
in the given structure.
:type st: schrodinger.structure.Structure
:param st: the structure
:rtype: list
:return: contains indices of restrain atoms
"""
return get_idxs_marked_atoms(st, RESTRAINED_ATOM_KEY)
[docs]def get_idx_groups(text):
"""
Get index groups from the given string.
:type text: str
:param text: the string
:raise: InvalidInput if there is a formatting issue
:rtype: list
:return: contains list of indices
"""
# valid patterns are like '(i,j,...);(k,l,...);...'
idxs = []
for token in text.split(SEPARATOR):
if not token:
continue
try:
obj = eval(token)
except (SyntaxError, NameError):
raise InvalidInput
if not isinstance(obj, tuple):
raise InvalidInput
if not all(isinstance(i, int) for i in obj):
raise InvalidInput
idxs.append(list(obj))
return idxs
def _get_restrain_group_idxs(st, prop):
"""
Return a list of lists of indices of a restrain group
in the given structure.
:type st: schrodinger.structure.Structure
:param st: the structure
:type prop: str
:param prop: the restrain property
:rtype: list
:return: contains lists of restrain indices
"""
idxs = st.property.get(prop, [])
if idxs:
idxs = get_idx_groups(idxs)
return idxs
[docs]def get_restrain_distance_idxs(st):
"""
Return a list of lists of indices of restrain distances
in the given structure.
:type st: schrodinger.structure.Structure
:param st: the structure
:rtype: list
:return: contains lists of restrain distances
"""
return _get_restrain_group_idxs(st, RESTRAINED_DISTANCES_KEY)
[docs]def get_restrain_angle_idxs(st):
"""
Return a list of lists of indices of restrain angles
in the given structure.
:type st: schrodinger.structure.Structure
:param st: the structure
:rtype: list
:return: contains lists of restrain angles
"""
return _get_restrain_group_idxs(st, RESTRAINED_ANGLES_KEY)
[docs]def get_restrain_dihedral_idxs(st):
"""
Return a list of lists of indices of restrain dihedrals
in the given structure.
:type st: schrodinger.structure.Structure
:param st: the structure
:rtype: list
:return: contains lists of restrain dihedrals
"""
return _get_restrain_group_idxs(st, RESTRAINED_DIHEDRALS_KEY)
[docs]def get_idx_groups_str(idx_groups):
"""
Get a string representation of the given index groups.
:type idx_groups: list
:param idx_groups: contains lists of indices
:rtype: str
:return: the string
"""
strs = []
for idx_group in idx_groups:
astr = '(' + INDEX_SEPARATOR.join([str(i) for i in idx_group]) + ')'
strs.append(astr)
return SEPARATOR.join(strs)
[docs]def get_jaguar_keywords_list(jaguar_keywords_dict):
"""
Return the Jaguar keywords list from the given dict.
:type jaguar_keywords_dict: OrderedDict
:param jaguar_keywords_dict: the Jaguar keywords dict
:rtype: list
:return: the Jaguar keywords list
"""
return ['='.join(item) for item in jaguar_keywords_dict.items()]
[docs]def type_cast_seed(seed):
"""
Type cast the seed.
:type seed: str or unicode
:param seed: seed for random number generator
:rtype: int
:return: the seed
"""
return parserutils.type_random_seed(
seed,
seed_min=go.CONF_SEARCH_SEED_RANGE[0],
seed_max=go.CONF_SEARCH_SEED_RANGE[1])
[docs]def type_cast_jaguar_keywords(jaguar_keywords,
reserved_keys=RESERVED_JAGUAR_KEYS,
exception_type=argparse.ArgumentTypeError):
"""
Type cast the Jaguar keywords.
:type jaguar_keywords: str or unicode or list
:param jaguar_keywords: the Jaguar keywords, a whitespace
delimited string of '<key>=<value>' tokens or a list
of such tokens
:type reserved_keys: set
:param reserved_keys: contains reserved Jaguar keys
:type exception_type: type
:param exception_type: the exception type to raise if invalid
:rtype: OrderedDict
:return: the Jaguar keywords OrderedDict
"""
if not jaguar_keywords:
msg = ('Jaguar keywords must be provided.')
raise exception_type(msg)
if isinstance(jaguar_keywords, list):
jaguar_keywords = ' '.join(jaguar_keywords)
jaguar_keywords = str(jaguar_keywords)
try:
jaguarworkflows.keyword_string_to_dict(jaguar_keywords)
except ValueError as err:
msg = ('The following issue has been found with the '
'specified Jaguar keywords: {err}. Such options '
'must be specified using whitespace separated '
'<key>=<value> pairs.').format(err=str(err))
raise exception_type(msg)
adict = OrderedDict([token.split('=') for token in jaguar_keywords.split()])
if reserved_keys.intersection(set(adict.keys())):
msg = ('The following Jaguar keys are reserved for this workflow: '
'{keys}.').format(keys=sorted(reserved_keys))
raise exception_type(msg)
return adict
[docs]def check_ff_assignment(sts, ffld_name=DEFAULT_FORCE_FIELD_NAME):
"""
Check the assignment of the given force field to the given
structures.
:type sts: list
:param sts: contains schrodinger.structure.Structure
:type ffld_name: str
:param ffld_name: the force field name
:raise ValueError: if invalid
"""
# currently the following does not support an mmlewis_apply
# clean up layer (which can change the incoming atom types),
# see minimize.Minimizer.setStructure for more details
ffld_version = mm.opls_name_to_version(ffld_name)
invalid_sts = desmondutils.find_forcefield_invalid_structures(
sts, ffld_version)
invalid_idxs = [str(sts.index(st) + 1) for st in invalid_sts]
if invalid_idxs:
idxs = ','.join(invalid_idxs)
msg = ('The {name} force field could not be assigned to '
'the following structures: {idxs}.').format(
name=ffld_name, idxs=idxs)
raise ValueError(msg)
[docs]def get_molecular_weight(st, idxs=None):
"""
Return the molecular weight (amu) taken over the given atom
indices in the given structure.
:type st: schrodinger.structure.Structure
:param st: the structure
:type idxs: list
:param idxs: the atom indices
:rtype: float
:return: the molecular weight (amu)
"""
# both st.total_weight and atom.atomic_weight use implicit hydrogens
# so do it by hand
if not idxs:
idxs = range(1, st.atom_total + 1)
weight = 0
for idx in idxs:
weight += mm.mmelement_get_atomic_weight_by_atomic_number(
st.atom[idx].atomic_number)
return weight
def _get_molecular_weight(st, idxs=None):
"""
Return the molecular weight (amu) taken over the given atom
indices in the given structure.
:type st: schrodinger.structure.Structure
:param st: the structure
:type idxs: list
:param idxs: the atom indices
:rtype: float
:return: the molecular weight (amu)
"""
# see MATSCI-6764 where a precision issue was detected
return round(get_molecular_weight(st, idxs=idxs), 3)
[docs]def check_reaction_wf_structures(rxn_sts,
ffld_name=None,
mass_conserved=False,
keep_atoms_only=False):
"""
Check the given reaction workflow structues.
:type rxn_sts: str or list
:param rxn_sts: the reaction workflow structures, a file
name or list of schrodinger.structure.Structure
:type ffld_name: str
:param ffld_name: the force field name to use when optionally
checking its assignment to the given structures
:type mass_conserved: bool
:param mass_conserved: check that mass is conserved (see also
keep_atoms_only kwarg)
:type keep_atoms_only: bool
:param keep_atoms_only: specifies that only keep atoms be considered
when checking if mass is conserved (see also mass_conserved kwarg)
:raise ValueError: if invalid
"""
if isinstance(rxn_sts, str):
if not fileutils.is_maestro_file(rxn_sts):
msg = ('The file {afile} is not a Maestro file.'
).format(afile=rxn_sts)
raise ValueError(msg)
if not fileutils.get_basename(rxn_sts).endswith(REACTION_WF_TAG):
msg = ('The file {afile} is not a reaction workflow file '
'as its base name does not end in {tag}.').format(
afile=rxn_sts, tag=REACTION_WF_TAG)
raise ValueError(msg)
rxn_sts = [st for st in structure.StructureReader(rxn_sts)]
titles = set()
for idx, st in enumerate(rxn_sts, 1):
if not st.property.get(REACTION_WF_STRUCTURE_KEY):
msg = ('Structure number {idx} does not belong to a '
'reaction workflow.').format(idx=idx)
raise ValueError(msg)
if not st.property.get(CONFORMERS_GROUP_KEY):
msg = ('Structure number {idx} does not belong to a '
'conformer group.').format(idx=idx)
raise ValueError(msg)
titles.add(st.title)
if len(titles) < len(rxn_sts):
msg = ('Structures must have unique titles.')
raise ValueError(msg)
sibling_conformers_dict = bin_structures_by_property(
rxn_sts, key=SIBLING_GROUP_KEY, inner_key=CONFORMERS_GROUP_KEY)
getters = [
get_restrain_distance_idxs, get_restrain_angle_idxs,
get_restrain_dihedral_idxs
]
found_parents = True
masses = set()
for sibling_group, conformers_dict in sibling_conformers_dict.items():
num_tss = 0
parents = set()
mass = 0
for conformer_group, conformers in conformers_dict.items():
# check conformers
check_conformers(conformers, conformer_group)
first_conformer = conformers[0]
num_tss += first_conformer.property.get(
TRANSITION_STATE_STRUCTURE_KEY, 0)
parents.add(first_conformer.property.get(PARENT_SIBLING_GROUPS_KEY))
# check restraint idxs
keep_idxs = get_idxs_marked_atoms(first_conformer, KEEP_ATOM_KEY)
if keep_idxs:
all_restrain_idxs = set()
for getter in getters:
for restrain_idxs in getter(first_conformer):
all_restrain_idxs.update(restrain_idxs)
if not all_restrain_idxs.issubset(keep_idxs):
msg = (
f'The structures in conformer group {conformer_group} '
'have atoms involved in distance, angle, and/or dihedral '
'restraints that are not among the found keep atoms.')
raise ValueError(msg)
if keep_idxs and keep_atoms_only:
mass += _get_molecular_weight(first_conformer, idxs=keep_idxs)
else:
mass += _get_molecular_weight(first_conformer)
masses.add(mass)
# check transition states
if num_tss > 1:
msg = (
f'The structures in sibling group {sibling_group} can not have '
'more than a single transition state.')
raise ValueError(msg)
# check parent equivalency
if len(parents) > 1:
msg = (
f'The structures in sibling group {sibling_group} do not have '
'identical parents.')
raise ValueError(msg)
# check parent existence
parents_tmp = list(parents)[0]
if parents_tmp:
for parent in parents_tmp.split(PARENT_SEPARATOR):
if parent not in sibling_conformers_dict.keys():
msg = (
f'Sibling group {sibling_group} specifies a non-existent parent '
f'sibling group {parent}.')
raise ValueError(msg)
# check reactant siblings
if None in parents:
if not found_parents:
msg = (f'Structures in sibling group {sibling_group} '
'have no parents specified. All groups '
'other than reactant siblings must '
'have parent groups defined.')
raise ValueError(msg)
found_parents = False
if found_parents:
msg = ('No reactants were found.')
raise ValueError(msg)
if mass_conserved and len(masses) > 1:
msg = ('Mass is not conserved along the reaction path.')
raise ValueError(msg)
if ffld_name:
check_ff_assignment(rxn_sts, ffld_name=ffld_name)
[docs]def bin_structures_by_property(sts, key=CONFORMERS_GROUP_KEY, inner_key=None):
"""
Return a dictionary of structures binned by a property with
the given key. If inner_key is provided then return a dictionary
of dictionaries of structures with the inner dictionaries keyed
by inner_key and outer dictionaries keyed by key.
:type sts: list
:param sts: the structures
:type key: str
:param key: the key for the property by which to bin
:type inner_key: str
:param inner_key: additionally bin by this inner_key
:rtype: dict or dict of dict
:return: dictionary where keys are properties and values
are lists of structures or dictionary of dictionaries
where the outer dictionary is keyed by key and inner
dictionary is keyed by inner_key and values of the inner
dictionary are lists of structures
"""
# for backwards compatibility in the data structure if the inner
# property exists but the outer property does not then for the
# outer property use the inner property and update the structure
adict = {}
for st in sts:
aprop = st.property.get(key)
if inner_key:
inner_aprop = st.property.get(inner_key)
if aprop is None and inner_aprop:
aprop = inner_aprop
st.property[key] = aprop
adict.setdefault(aprop, {}).setdefault(inner_aprop, []).append(st)
else:
adict.setdefault(aprop, []).append(st)
return adict
def _check_structures_for_property_equivalence(sts,
key=RESTRAINED_ATOM_KEY,
is_atom_key=True):
"""
Check that the given structures have equivalent property values
for the given structure or atom key. If it is an atom key then
the key must be a boolean key as this function only ensures that
the groups of atoms for which the given property exists are
equivalent as opposed to checking equivalence among the property
values.
:type sts: list
:param sts: the structures
:type key: str
:param key: the key for the property for which to check
equivalence
:type is_atom_key: bool
:param is_atom_key: True if the given key is an atom key in which
case only property existence is checked as opposed to property
values
:raise ValueError: if invalid
"""
aprops = set()
for st in sts:
if is_atom_key:
aprop = tuple(get_idxs_marked_atoms(st, key))
if not aprop:
aprop = None
else:
aprop = st.property.get(key)
aprops.add(aprop)
if len(aprops) > 1:
msg = ('The given structures have inequivalent {key} properties.'
).format(key=key)
raise ValueError(msg)
[docs]def postprocess_conformational_search(conformer, out_mae_path):
"""
Postprocess a MacroModel conformational search. Rewrite the given MacroModel
out *mae file so that the conformers in it have properly updated properties.
:type conformer: schrodinger.structure.Structure
:param conformer: a representative conformer that seeded the
search being postprocessed
:type out_mae_path: str
:param out_mae_path: the file path to the MacroModel out *mae file
"""
# see MATSCI-5508 where it was seen that charge and multiplicity
# properties sometimes do not propagate
charge = conformer.property.get(CHARGE_KEY)
multiplicity = conformer.property.get(MULTIPLICITY_KEY)
# see MATSCI-7495 since using MacroModel's "serial" search relative energies
# need updating, energies in kJ/mol
min_energy = min(st.property[MMOD_ENERGY_KEY]
for st in structure.StructureReader(out_mae_path))
ff_rel_energy_key_g = f'{MMOD_REL_ENERGY_KEY}{FFLD_ENERGY_GLOBAL_TAG}'
conformers_out = []
for st in structure.StructureReader(out_mae_path):
if charge is not None:
st.property[CHARGE_KEY] = charge
if multiplicity is not None:
st.property[MULTIPLICITY_KEY] = multiplicity
st.property[
ff_rel_energy_key_g] = st.property[MMOD_ENERGY_KEY] - min_energy
conformers_out.append(st)
structure.write_cts(conformers_out, out_mae_path)
[docs]def get_int_tuples_from_str_property(st, key, separator=SEPARATOR):
"""
Return a list of tuples of integers from the given string structure
property.
:type st: schrodinger.structure.Structure
:param st: the structure
:type key: str
:param key: the property key
:type separator: str
:param separator: the tuple separator used for the given property
:rtype: list
:return: contains tuples of integers
"""
aproperty = st.property.get(key)
if not aproperty:
return []
else:
return [eval(token) for token in aproperty.split(separator)]
[docs]def update_index_properties(st, old_to_new):
"""
Update the index properties of the given structure.
:type st: `structure.Structure`
:param st: the structure
:type old_to_new: dict
:param old_to_new: a map of old-to-new atom indices
"""
keys = (RESTRAINED_DISTANCES_KEY, RESTRAINED_ANGLES_KEY,
RESTRAINED_DIHEDRALS_KEY)
for key in keys:
idxs = _get_new_restrain_group_idxs(st, key, old_to_new)
text = get_idx_groups_str(idxs)
if text:
st.property[key] = text
[docs]def get_core_idxs(st):
"""
Return a set of atom indices for the core of the given structure.
:type st: schrodinger.structure.Structure
:param st: the structure
:rtype: set
:return: core atom indices
"""
# keep atoms are not to be considered as core atoms
atom_keys = [SUPERPOSITION_ATOM_KEY, RESTRAINED_ATOM_KEY]
st_keys = [
RESTRAINED_DISTANCES_KEY, RESTRAINED_ANGLES_KEY,
RESTRAINED_DIHEDRALS_KEY
]
idxs = set()
for atom in st.atom:
for key in atom_keys:
if atom.property.get(key):
idxs.add(atom.index)
for key in st_keys:
for jdxs in _get_restrain_group_idxs(st, key):
idxs.update(jdxs)
return idxs
def _get_new_restrain_group_idxs(st, key, old_to_new, offset=0):
"""
Return new restrain group indices.
:type st: schrodinger.structure.Structure
:param st: the structure
:type key: str
:param key: determines the type of restrain coordinate
:type old_to_new: dict
:param old_to_new: the old-to-new atom index map
:type offset: int
:param offset: an offset added to all new restrain
group indices, useful when assembling structures
from parts of other structures
:rtype: list
:return: contains new restrain group indices
"""
# skip the entire group of old indices if any of them either (1)
# are not present in the map or (2) present but map to None
new_restrain_idxs = []
for old_list in _get_restrain_group_idxs(st, key):
try:
new_list = [old_to_new[idx] for idx in old_list]
except KeyError:
continue
if None in new_list:
continue
new_restrain_idxs.append([idx + offset for idx in new_list])
return new_restrain_idxs
[docs]class ReactionWorkflowFile(RepresentativeConformersMixin):
"""
Manage a reaction workflow file.
"""
[docs] def __init__(self, rxnwf_file):
"""
Create an instance.
:type rxnwf_file: str
:param rxnwf_file: the reaction workflow file
"""
sts = [st for st in structure.StructureReader(rxnwf_file)]
self.sibling_conformers_dict = bin_structures_by_property(
sts, key=SIBLING_GROUP_KEY, inner_key=CONFORMERS_GROUP_KEY)
[docs] def getReactantsSiblingGroupName(self):
"""
Return the reactants sibling group name.
:rtype: str
:return: the reactants sibling group name
"""
for sibling_group, conformer_group, conformer in self.representativeConformers(
):
parents = conformer.property.get(PARENT_SIBLING_GROUPS_KEY)
if not parents:
return sibling_group
[docs]class ReactionWorkflowEnergyAnalysis(ReactionWorkflowFile):
"""
Manage a reaction workflow energy analysis.
"""
[docs] def __init__(self, rxnwf_file, energy_keys):
"""
Create an instance.
:type rxnwf_file: str
:param rxnwf_file: the reaction workflow file
:type energy_keys: list
:param energy_keys: structure property energy keys to consider, if it
is temperature dependent then include the temperature (K) as a number
followed by 'K' in the key and the corresponding energy must be in
supported units (au, kcal/mol, eV, kJ/mol) and must be present in the
key as '(<units>)'
"""
super().__init__(rxnwf_file)
self.energy_keys = energy_keys
[docs] @staticmethod
def getTemperature(energy_key):
"""
Return the temperature (K) for the given energy key.
:type energy_key: str
:param energy_key: structure property energy key
:rtype: float, None
:return: the temperature (K) if there is one
"""
return jaguarworkflows.get_temperature(energy_key)
[docs] @staticmethod
def getUnits(energy_key):
"""
Return the units for the given energy key.
:type energy_key: str
:param energy_key: structure property energy key
:rtype: str, None
:return: the units if there is one
"""
# units like 'kcal/mol' in '(kcal/mol)'
match = re.search(r'\((\D+)\)', energy_key)
if match:
return match.group(1)
base_energy_key = energy_key.split(STAGE_SEPARATOR)[0]
_units = jmswfu.JAGUAR_PROP_UNITS_DICT.get(base_energy_key)
if _units:
return _units[1:-1]
[docs] @staticmethod
def getKcalPerMolConversion(energy_key):
"""
Return the kcal/mol conversion factor for the given energy key.
:type energy_key: str
:param energy_key: structure property energy key
:rtype: float, None
:return: the kcal/mol conversion factor if there is one
"""
_units = ReactionWorkflowEnergyAnalysis.getUnits(energy_key)
if _units in HARTREE_UNITS:
conversion = units.KCAL_PER_MOL_PER_HARTREE
elif _units in KCAL_PER_MOL_UNITS:
conversion = 1
elif _units in EV_UNITS:
conversion = units.KCAL_PER_MOL_PER_EV
elif _units in KJOULES_PER_MOL_UNITS:
conversion = units.KCAL_PER_MOL_PER_EV
conversion /= units.KJOULES_PER_MOL_PER_EV
else:
conversion = None
return conversion
def _getTotalEnergyEnsemble(self,
conformers_dict,
energy_key,
conversion,
temp,
do_boltzmann,
include_x_terms=True,
only_lowest_energy=False):
"""
Return an ensemble of total energies for the given conformers
dictionary of siblings and given energy key.
:type conformers_dict: dict
:param conformers_dict: keys are conformer group names, values are
lists of `structure.Structure`
:type energy_key: str
:param energy_key: the relevant energy key
:type conversion: float
:param conversion: the energy conversion factor to kcal/mol
:type temp: float
:param temp: the temperature in K
:type do_boltzmann: bool
:param do_boltzmann: if True perform a Boltzmann average, otherwise an
algebraic average
:type include_x_terms: bool
:param include_x_terms: whether to include cross terms in the
conformational averaging
:type only_lowest_energy: bool
:param only_lowest_energy: use only the lowest energy conformer
rather than averaging over conformers
:rtype: list
:return: ensemble of total energies
"""
if include_x_terms and only_lowest_energy:
msg = ('Simultaneous use of include_x_terms and '
'only_lowest_energy is not supported.')
raise ReactionWorkflowException(msg)
# the following getter takes a list that contains a sublist of conformers
# for each reactant, if including cross terms then return the iterator
# for all possible collections of mixed conformers of reactants else return
# the original list
getter = lambda x: itertools.product(*x) if include_x_terms else x
ensemble = []
for sample in getter(conformers_dict.values()):
energies = [st.property.get(energy_key) for st in sample]
if None in energies:
return []
if include_x_terms:
ensemble.append(sum(energies))
else:
min_energy = min(energies)
if only_lowest_energy:
ensemble.append(min_energy)
else:
if do_boltzmann:
deltas = conversion * (
numpy.array(energies) - min_energy)
factors = numpy.exp(-1 * deltas /
(IDEAL_GAS_CONSTANT * temp))
avg_delta = sum([
delta * factor
for delta, factor in zip(deltas, factors)
])
partition_f = sum(factors)
avg_delta /= partition_f
avg_delta /= conversion # handle conversion later
energy = min_energy + avg_delta # return total energy
ensemble.append(energy)
else:
ensemble.append(numpy.mean(energies))
if not include_x_terms and ensemble:
ensemble = [sum(ensemble)]
return ensemble
[docs] def getConfAvgRelEnergies(self,
include_x_terms=True,
only_lowest_energy=False):
"""
Return the conformationally averaged energies relative to that
of the reactants.
:type include_x_terms: bool
:param include_x_terms: whether to include cross terms in the
conformational averaging
:type only_lowest_energy: bool
:param only_lowest_energy: use only the lowest energy conformer
rather than averaging over conformers
:raise ReactionWorkflowException: if there is an issue
:rtype: dict
:return: keys are sibling group names, values are dicts with
energy keys as keys and energy values as values
"""
if include_x_terms and only_lowest_energy:
msg = ('Simultaneous use of include_x_terms and '
'only_lowest_energy is not supported.')
raise ReactionWorkflowException(msg)
# with cross terms in the conformational averaging all possible
# collections of conformers within a sibling group as well
# between sibling groups are included in the averaging which
# is done last, for example a high energy conformer of A is
# allowed to react with a low energy conformer of B, etc. and that
# relative to some collection of conformers from another sibling
# group both (high, low), (low, high), etc. collections of (A, B)
# are included, without cross terms the averaging is done first
# over each molecule in the sibling group thereby reducing the
# energy of the group down to a single number
# canonical ensemble
# relevant energies reported relative to reactants
reactants_sibling_group = self.getReactantsSiblingGroupName()
reactants_conformers_dict = self.getReactantsConformersDict()
# for each energy key for both reactants and non-reactants
# obtain a conformeric ensemble of energies summed over siblings
# and collect energy differences for all pairs then perform a
# Boltzmann average if the energy is both temperature dependent
# and of known units and including cross terms, otherwise perform
# an algebraic average
edict = {}
for energy_key in self.energy_keys:
conversion = self.getKcalPerMolConversion(energy_key)
temp = self.getTemperature(energy_key)
do_boltzmann = conversion and temp
conversion = conversion or 1
reactants_energies = self._getTotalEnergyEnsemble(
reactants_conformers_dict,
energy_key,
conversion,
temp,
do_boltzmann,
include_x_terms=include_x_terms,
only_lowest_energy=only_lowest_energy)
if not reactants_energies:
# this means that at least a single reactant doesn't
# have the given energy_key defined, so skip the energy_key
continue
header = self.getHeader(energy_key)
edict.setdefault(reactants_sibling_group, {})[header] = 0.
for sibling_group, conformers_dict in self.sibling_conformers_dict.items(
):
if sibling_group == reactants_sibling_group:
continue
energies = self._getTotalEnergyEnsemble(
conformers_dict,
energy_key,
conversion,
temp,
do_boltzmann,
include_x_terms=include_x_terms,
only_lowest_energy=only_lowest_energy)
if not energies:
# this means that at least a single non-reactant child doesn't
# have the given energy_key defined, so first undefine it for
# all sibling groups and then skip the energy_key by breaking
# out of the non-reactant child loop
for _edict in edict.values():
_edict.pop(header, None)
break
deltas = numpy.array([
conversion * (enr - er)
for enr, er in itertools.product(energies,
reactants_energies)
])
if do_boltzmann and include_x_terms:
factors = numpy.exp(-1 * deltas /
(IDEAL_GAS_CONSTANT * temp))
avg_delta = sum([
delta * factor
for delta, factor in zip(deltas, factors)
])
partition_f = sum(factors)
# see MATSCI-7153 - where the denominator can be problematic
with warnings.catch_warnings():
warnings.filterwarnings('error')
try:
avg_delta /= partition_f
except RuntimeWarning:
msg = (
'Large energy differences with respect to reactants '
'have been detected. This is typically a sign of a severely '
'imbalanced chemical reaction in the reaction workflow input '
'file.')
raise ReactionWorkflowException(msg)
else:
avg_delta = numpy.mean(deltas)
edict.setdefault(sibling_group, {})[header] = avg_delta
return edict
[docs] def getGraph(self):
"""
Return a NetworkX directed graph of the reaction workflow
energy level diagram.
:rtype: networkx.DiGraph
:return: nodes are sibling group names and have energy
dictionary kwargs, edges are directed (parent,
child) pairs
"""
edict_w_x = self.getConfAvgRelEnergies(
include_x_terms=True, only_lowest_energy=False)
edict_wo_x = self.getConfAvgRelEnergies(
include_x_terms=False, only_lowest_energy=False)
edict_lowest = self.getConfAvgRelEnergies(
include_x_terms=False, only_lowest_energy=True)
graph = nx.DiGraph()
for sibling_group, conformers_dict in self.sibling_conformers_dict.items(
):
graph.add_node(
sibling_group,
edict_w_x=edict_w_x[sibling_group],
edict_wo_x=edict_wo_x[sibling_group],
edict_lowest=edict_lowest[sibling_group])
# parents are equivalent just grab an arbitrary first item
parents = next(iter(conformers_dict.values()))[0].property.get(
PARENT_SIBLING_GROUPS_KEY)
if not parents:
continue
for parent_sibling_group in parents.split(PARENT_SEPARATOR):
graph.add_edge(parent_sibling_group, sibling_group)
return graph
[docs] @staticmethod
def getOrderedNodeNames(graph):
"""
Return an ordered list of node names from the given graph.
:type graph: networkx.DiGraph
:param graph: nodes are sibling group names and have energy
dictionary kwargs, edges are directed (parent,
child) pairs
:rtype: list
:return: contains ordered node names
"""
# for example, all lines point right, up-right, or down-right
# (just a non-physical example):
#
# I2 -- TS2
# / \ / \
# TS1 I3 P
# / \ /
# R I1
#
# is ordered as R,TS1,I1,I2,I3,TS2,P
# find the reactant node
for node in graph.nodes():
if not list(graph.predecessors(node)):
reactant_node = node
break
# start with the reactant node
nodes = [reactant_node]
current_nodes = [reactant_node]
# collect nodes in order by adding for the current nodes
# all successor nodes that have no predecessor nodes that
# have not already been collected, currently successor nodes
# are sorted by name rather than energy, avoid adding the
# same node twice which can happen for loops in the graph,
# the current nodes to search are updated until all nodes
# have been collected
while len(nodes) < graph.number_of_nodes():
tmp = []
for current_node in current_nodes:
for successor_node in sorted(graph.successors(current_node)):
if not set(graph.predecessors(successor_node)).difference(
nodes):
if successor_node not in nodes:
nodes.append(successor_node)
tmp.append(successor_node)
current_nodes = list(tmp)
return nodes
[docs] def writeDataFiles(self):
"""
Write data files.
"""
# get ordered nodes
graph = self.getGraph()
node_names = self.getOrderedNodeNames(graph)
base_name = jobutils.get_jobname(DEFAULT_JOB_NAME)
out_files = []
# handle the conformationally averaged energies relative to reactants
for ext in REL_REACTANTS_EXTS:
conf_avg_rel_reactants_file = base_name + ext
kwarg = REL_EXT_TO_KWARG_DICT[ext]
with csv_unicode.writer_open(conf_avg_rel_reactants_file) as afile:
writer = csv.writer(afile)
for idx, name in enumerate(node_names):
edict = graph.node[name][kwarg]
if not idx:
headers = sorted(edict.keys())
writer.writerow(['Sibling_Group_Name'] + headers)
values = [name] + [edict[header] for header in headers]
writer.writerow(values)
out_files.append(conf_avg_rel_reactants_file)
# handle the conformationally averaged energies relative to parents
if len(node_names) > 1:
for ext in REL_PARENTS_EXTS:
conf_avg_rel_parents_file = base_name + ext
kwarg = REL_EXT_TO_KWARG_DICT[ext]
final_edict = {}
for child_name in node_names:
child_edict = graph.node[child_name][kwarg]
parents_edict = {}
for parent_name in graph.predecessors(child_name):
parent_edict = graph.node[parent_name][kwarg]
parents_edict[parent_name] = dict(
[(key, child_edict[key] - parent_edict[key])
for key in child_edict.keys()])
if parents_edict:
final_edict[child_name] = parents_edict
with open(conf_avg_rel_parents_file, 'w') as afile:
json.dump(
final_edict,
afile,
sort_keys=True,
indent=4,
separators=(',', ':'))
out_files.append(conf_avg_rel_parents_file)
backend = None
for out_file in out_files:
jobutils.add_outfile_to_backend(out_file, backend)
[docs]def get_stage_idx(astr):
"""
Return the stage index from the given string.
:type astr: str
:param astr: the string
:rtype: int
:return: the stage index
"""
# input can be a file
astr = fileutils.get_basename(astr)
head, stage_hash = astr.split(STAGE_SEPARATOR)
stage_idx = stage_hash.split('_')[0]
return int(stage_idx)
[docs]def check_TS_vetting(out_file):
"""
Return False if TS vetting failed in the given Jaguar
out file.
:type out_file: str
:param out_file: Jaguar out file
:rtype: bool
:return: False if TS vetting failed
"""
with open(out_file, 'r') as afile:
for line in afile:
if line.startswith('Unable to find any valid transition vectors.'):
return False
return True
[docs]def get_sub_host_str(obj, sub_host_attr, n_procs_attr):
"""
Return the command line -HOST argument for using a subhost.
:type obj: object
:param obj: the object, possibly having the given attributes
defined
:type sub_host_attr: str
:param sub_host_attr: the attribute for the subhost
:type n_procs_attr: str
:param n_procs_attr: the attribute for the number of processors
"""
try:
host = f'{getattr(obj, sub_host_attr)}:{getattr(obj, n_procs_attr)}'
except AttributeError:
host = jaguarworkflows.AUTOHOST
return host
[docs]class JMSWFException(Exception):
pass
[docs]class JMSWFMixin(RepresentativeConformersMixin):
"""
Manage a Jaguar multistage workflow.
"""
# various structure and atom properties dictate the behavior of this class
def _overwriteConformerEIDS(self):
"""
Overwrite the Maestro entry ID properties for the conformers.
"""
# needed for Jaguar multistage workflow
idx = 0
for sibling_group, conformers_dict in self.sibling_conformers_dict.items(
):
for conformer_group, conformers in conformers_dict.items():
for conformer in conformers:
idx += 1
eid = conformer.property[ENTRY_ID_KEY]
self.eids_dict.setdefault(eid, []).append(str(idx))
conformer.property[ENTRY_ID_KEY] = str(idx)
def _getJMSWFGenResKeywords(self, st, include_temp_press_data=True):
"""
Return the general reserved Jaguar multistage workflow keywords
for the given structure.
:type st: schrodinger.structure.Structure
:param st: the structure
:type include_temp_press_data: bool
:param include_temp_press_data: whether to include the temperature
and pressure data
:rtype: OrderedDict
:return: the keywords
"""
adict = OrderedDict([('molchg', str(st.property.get(CHARGE_KEY, 0))),
('multip', str(
st.property.get(MULTIPLICITY_KEY, 1)))])
temp_data = getattr(self, 'temp_data', None)
press_data = getattr(self, 'press_data', None)
if include_temp_press_data and temp_data and press_data:
adict['tmpini'] = str(temp_data.temp_start)
adict['tmpstp'] = str(temp_data.temp_step)
adict['ntemp'] = str(temp_data.temp_n)
adict['press'] = str(press_data.press_start)
adict['press_step'] = str(press_data.press_step)
adict['npress'] = str(press_data.press_n)
return adict
def _getJMSWFOptKeywords(self, st, set_freq=True):
"""
Return the Jaguar multistage workflow optimization keywords
for the given structure.
:type st: schrodinger.structure.Structure
:param st: the structure
:type set_freq: bool
:param set_freq: if True then set 'ifreq=1'
:rtype: str
:return: the string of keywords
"""
reserved_jaguar_keywords = self._getJMSWFGenResKeywords(
st, include_temp_press_data=set_freq)
reserved_jaguar_keywords['igeopt'] = '1'
if set_freq:
reserved_jaguar_keywords['ifreq'] = '1'
jaguar_keywords = self.jaguar_keywords.copy()
jaguar_keywords.update(reserved_jaguar_keywords)
return ' '.join(get_jaguar_keywords_list(jaguar_keywords))
@staticmethod
def _getJMSWFCoords(st, are_restraints=True):
"""
Return the Jaguar multistage workflow coords for the given structure.
:type st: schrodinger.structure.Structure
:param st: the structure
:type are_restraints: bool
:param are_restraints: True if the coords are restraint coords, as
opposed to active coords
:rtype: list
:return: contains strings defining the coords
"""
atype_key_pairs = [(4, RESTRAINED_DISTANCES_KEY),
(5, RESTRAINED_ANGLES_KEY),
(6, RESTRAINED_DIHEDRALS_KEY)]
fields = ['{atype}', '{idxs}']
if are_restraints:
fields = [jmswfu.NONE] + fields
aformat = jmswfu.DELIM.join(fields)
coords = []
for atype, key in atype_key_pairs:
params = get_int_tuples_from_str_property(st, key)
for param in params:
idxs = jmswfu.DELIM.join(str(idx) for idx in param)
coord = aformat.format(atype=atype, idxs=idxs)
coords.append(coord)
return coords
def _getJMSWFTSKeywords(self, st):
"""
Return the Jaguar multistage workflow transition state search
keywords for the given structure.
:type st: schrodinger.structure.Structure
:param st: the structure
:rtype: str
:return: the string of keywords
"""
reserved_jaguar_keywords = self._getJMSWFGenResKeywords(
st, include_temp_press_data=True)
reserved_jaguar_keywords['igeopt'] = '2'
reserved_jaguar_keywords['ifreq'] = '1'
reserved_jaguar_keywords['itrvec'] = '-6'
reserved_jaguar_keywords['nhesref'] = '-1'
# Jaguar TS vetting options, intentionally use the default
# for ts_vet_olap_cut, for ts_vet use the value +3 which considers
# both the eigenvector overlap with active atoms and distances
# among active atoms, despite the Jaguar documentation using +3
# as opposed to -3 does not cause a job failure so check
# afterwards
reserved_jaguar_keywords['ts_vet'] = '3'
jaguar_keywords = self.jaguar_keywords.copy()
jaguar_keywords.update(reserved_jaguar_keywords)
return ' '.join(get_jaguar_keywords_list(jaguar_keywords))
def _getJMSWFStages(self):
"""
Return the Jaguar multistage workflow stages.
:rtype: list
:return: contains the jmswfu.StageData for the Jaguar
multistage workflow
"""
has_ts = getattr(self, 'has_ts', False)
# if there is a TS stage then prevent divergence in the stage
# indices by breaking up the work needed for non-TS stages
# into two stages
aformat = jmswfu.DELIM.join(['{eid}', '{rest}'])
opt_jaguar_keywords_lines = []
opt_restraints_lines = []
ts_jaguar_keywords_lines = []
ts_restraints_lines = []
ts_actives_lines = []
for sibling_group, conformers_dict in self.sibling_conformers_dict.items(
):
for conformer_group, conformers in conformers_dict.items():
for conformer in conformers:
eid = conformer.property[ENTRY_ID_KEY]
all_opt_restraints = self._getJMSWFCoords(
conformer, are_restraints=True)
set_freq = not has_ts and not bool(all_opt_restraints)
opt_jaguar_keywords = self._getJMSWFOptKeywords(
conformer, set_freq=set_freq)
opt_jaguar_keywords_lines.append(
aformat.format(eid=eid, rest=opt_jaguar_keywords))
for opt_restraints in all_opt_restraints:
opt_restraints_lines.append(
aformat.format(eid=eid, rest=opt_restraints))
is_ts = conformer.property.get(
TRANSITION_STATE_STRUCTURE_KEY)
if has_ts or is_ts:
if not is_ts:
set_freq = not bool(all_opt_restraints)
ts_jaguar_keywords = self._getJMSWFOptKeywords(
conformer, set_freq=set_freq)
for opt_restraints in all_opt_restraints:
ts_restraints_lines.append(
aformat.format(
eid=eid, rest=opt_restraints))
else:
ts_jaguar_keywords = self._getJMSWFTSKeywords(
conformer)
for ts_actives in self._getJMSWFCoords(
conformer, are_restraints=False):
ts_actives_lines.append(
aformat.format(eid=eid, rest=ts_actives))
ts_jaguar_keywords_lines.append(
aformat.format(eid=eid, rest=ts_jaguar_keywords))
# the final hessian from a restrained optimization may not be a good
# guess for a transition state search so do not inherit it from the
# parent
in_txt_file = os.path.join(self._jmswf_launch_dir,
self._jmswf_in_txt_file)
with open(in_txt_file, 'w') as afile:
afile.write(jmswfu.NEW_STAGE + '\n')
afile.write(jmswfu.KEYWORDS + '\n')
for opt_jaguar_keywords_line in opt_jaguar_keywords_lines:
afile.write(opt_jaguar_keywords_line + '\n')
if opt_restraints_lines:
afile.write(jmswfu.GEOM_CONSTRAINTS + '\n')
for opt_restraints_line in opt_restraints_lines:
afile.write(opt_restraints_line + '\n')
if ts_jaguar_keywords_lines:
afile.write(jmswfu.NEW_STAGE + '\n')
afile.write(jmswfu.PARENT + '\n')
afile.write(
jmswfu.DELIM.join(['1', jmswfu.WAVEFUNCTION]) + '\n')
afile.write(jmswfu.KEYWORDS + '\n')
for ts_jaguar_keywords_line in ts_jaguar_keywords_lines:
afile.write(ts_jaguar_keywords_line + '\n')
if ts_restraints_lines:
afile.write(jmswfu.GEOM_CONSTRAINTS + '\n')
for ts_restraints_line in ts_restraints_lines:
afile.write(ts_restraints_line + '\n')
if ts_actives_lines:
afile.write(jmswfu.ACTIVE_COORDINATES + '\n')
for ts_actives_line in ts_actives_lines:
afile.write(ts_actives_line + '\n')
stages = jmswfu.read_stage_datafile(in_txt_file)
fileutils.force_remove(in_txt_file)
return stages
def _updateJMSWFStages(self, stages):
"""
Update the Jaguar multistage workflow stages.
:type stages: list
:param stages: contains the jmswfu.StageData for the Jaguar
multistage workflow
:raise JMSWFException: if there is an issue
:rtype: list
:return: contains the jmswfu.StageData for the Jaguar
multistage workflow
"""
# here we need to read in the extra stages file and modify
# the returned jmswfu.StageData objects so that they can
# be seamlessly appended to the given stages, the first
# of these extra stages is skipped so as to allow analysis
# to potentially be the first extra stage
extra_stages = jmswfu.read_stage_datafile(self.extra_stages_file)[1:]
n_prev_stages = len(stages) # will be either 1 or 2
for index, extra_stage in enumerate(extra_stages, n_prev_stages + 1):
# update the index of this stage
extra_stage.index = index
# update any parent indices
for stage in extra_stage.parent_data:
stage.stage += n_prev_stages - 1
# update the parenting indices for any analysis stages
for stage in extra_stage.analyze_data:
stage.parent_st_idx += n_prev_stages - 1
stage.parent_idxs = [
i + n_prev_stages - 1 for i in stage.parent_idxs
]
# update entry IDs, the original entry IDs have been reset to start at
# 1 and now include values for any generated conformers, see
# ._overwriteConformerEIDS() and .eids_dict which for example
# might contain something like eids_dict[17] = [4,5,6,7] if for the
# original entry ID 17 there were 4 conformers generated which were
# assigned entry IDs 4, 5, 6, and 7, all entry_data data needs
# updated entry IDs
for stage_subsection, eid_dict in extra_stage.entry_data.items():
# see MATSCI-6429 - since the main interface and all known use cases
# force equivalent Jaguar keyword data for all structure entries in an
# extra stage we will overwrite stale entry IDs by making this assumption
# and choosing an arbitrary mapping of stale entry IDs to current entry
# IDs
uncommon = set(eid_dict.keys()).difference(
self.eids_dict.keys())
if len(eid_dict) == len(self.eids_dict) and uncommon:
eid_dict = dict(
zip(self.eids_dict.keys(), eid_dict.values()))
new_eid_dict = {}
for eid, datas in eid_dict.items():
for new_eid in self.eids_dict.get(eid, []):
new_datas = []
for data in datas:
new_data = copy.deepcopy(data)
new_data.eid = new_eid
new_datas.append(new_data)
new_eid_dict[new_eid] = new_datas
if not new_eid_dict:
msg = (
f'The extra stages file {self.extra_stages_file} contains '
'at least a single stage that does not define stage data '
'for any of the entries in the input file.')
raise JMSWFException(msg)
extra_stage.entry_data[stage_subsection] = new_eid_dict
return stages + extra_stages
def _setUpJMSWF(self):
"""
Set up the Jaguar multistage workflow.
:rtype: list
:return: contains the jmswfu.StageData for the Jaguar
multistage workflow
"""
extra_stages_file = getattr(self, 'extra_stages_file', None)
os.makedirs(self._jmswf_launch_dir, exist_ok=True)
with structure.StructureWriter(
os.path.join(self._jmswf_launch_dir,
self._jmswf_in_mae_file)) as writer:
for sibling_group, conformers_dict in self.sibling_conformers_dict.items(
):
for conformer_group, conformers in conformers_dict.items():
for conformer in conformers:
writer.append(conformer)
# jmswfu manages the global smap file which if there aren't any
# frequency jobs will be removed at the end
base_name = os.path.join(self._jmswf_launch_dir,
self._jmswf_base_name) + '-out'
jmswfu.create_smap(base_name, self._jmswf_out_mae_file)
stages = self._getJMSWFStages()
if extra_stages_file:
stages = self._updateJMSWFStages(stages)
jmswfu.write_stages_file(stages,
os.path.join(self._jmswf_launch_dir,
self._jmswf_in_txt_file))
return stages
[docs] def runJMSWF(self):
"""
Run the Jaguar multistage workflow.
:raise JMSWFException: if there is an issue
"""
stages = self._setUpJMSWF()
options = argparse.Namespace(
TPP=self.tpp,
name=self._jmswf_base_name,
input_file=self._jmswf_in_mae_file)
host = get_sub_host_str(self, 'subhost', 'n_jmswf_subjobs')
job_dj = jaguarworkflows.create_queue(options=options, host=host)
# uses jaguarworkflows.RobustSubmissionJob by default
with fileutils.chdir(self._jmswf_launch_dir):
workflows = jmswfu.create_workflows(
options,
job_dj,
stages,
self._jmswf_out_smap_file,
hierarchical=False)
with structure.StructureWriter(self._jmswf_out_mae_file) as writer:
jaguarworkflows.run_workflows(job_dj, workflows, writer)
backend = None
jmswfu.finalize_smap(self._jmswf_out_smap_file, backend)
# for restarts it is possible that nothing was done
all_failed = job_dj.total_added and len(
job_dj._failed) == job_dj.total_added
# handle partial failures in self.setRepresentatives
if self.return_jaguar_files or all_failed:
jobutils.add_zipfile_to_backend(self._jmswf_launch_dir)
if all_failed:
msg = ('All Jaguar multistage workflow jobs have failed. '
'All output files will be returned.')
raise JMSWFException(msg)
[docs] def prepareJMSWFOutput(self):
"""
Prepare Jaguar multistage workflow output.
"""
backend = None
exts = ['-out.maegz', '-out.smap'] + jmswfu.SMAP_ELIGIBLE_EXTENSIONS
for ext in exts:
pattern = os.path.join(self._jmswf_launch_dir, '*' + ext)
for apath in glob.glob(pattern):
afile = os.path.split(apath)[1]
shutil.copy(apath, afile)
jobutils.add_outfile_to_backend(afile, backend)
[docs] def checkJMSWFOutputs(self, out_files):
"""
Return False if any of the given Jaguar out files
should be treated as a failure.
:type out_files: list
:param out_files: contains Jaguar output files
:rtype: bool
:return: False if any of the given Jaguar out files
should be treated as a failure
"""
# for transition state structures vetting is used however a bad
# result doesn't cause a Jaguar failure, nor does multiple imaginary
# frequencies for either optimizations or transition states
for out_file in out_files:
# Jaguar failures have already been handled in the calling code
# so missing Jaguar *out files means that the stage is an analysis
# stage
out_path = os.path.join(self._jmswf_launch_dir, out_file)
if not os.path.exists(out_path):
continue
# skip non-frequency jobs
jag_out = JaguarOutput(out_path)
normal_modes = anharmonic.get_normal_modes(jag_out)
if not normal_modes:
continue
# check TS, when asking for vetting there are three outcomes:
# (1) vetting performed and a vetted vector found
# (2) vetting performed and a vetted vector not found
# (3) vetting not performed (see MATSCI-5145)
#
# see MATSCI-10176 - jag_out.getStructure() does NOT include input
# Maestro structure properties
mae_path = os.path.join(self._jmswf_launch_dir,
f'{jag_out.restart}.mae')
st = structure.Structure.read(mae_path)
if st.property.get(TRANSITION_STATE_STRUCTURE_KEY):
if not check_TS_vetting(out_path):
# the found transition state does not involve the requested
# active atoms
return False
normal_mode = jag_out.vetted_ts_vector
if normal_mode and normal_mode.frequency != min(
[x.frequency for idx, x in normal_modes]):
# the normal mode involving the active atoms is not the lowest
# frequency mode
return False
# check imaginary frequencies
in_path = os.path.join(self._jmswf_launch_dir,
f'{jag_out.restart}.in')
if not os.path.exists(in_path):
return False
jag_in = JaguarInput(input=in_path)
try:
anharmonic.check_imaginary_frequencies(
jag_out, jag_in, max_i_freq=self.max_i_freq)
except anharmonic.AnharmonicException:
return False
return True
@staticmethod
def _setProperties(st, key, stage_idx, value):
"""
Set properties on the given structure.
:type st: schrodinger.structure.Structure
:param st: the structure
:type key: str
:param key: the property key
:type stage_idx: int
:param stage_idx: the stage index for the property
:type value: float
:param value: the property value
:rtype: set
:return: the created property keys
"""
rep_key = key + STAGE_SEPARATOR + str(stage_idx)
adict = {rep_key: value}
st.property.update(adict)
return set(adict.keys())
def _updateSiblingConformersDict(self):
"""
Update the sibling conformers dict.
"""
# currently the self.sibling_conformers_dict contains conformers with
# redundant structure titles if the conformers came from MacroModel, while
# the conformers collected in the representatives coming from the Jaguar
# multistage workflow job have been uniqueified using a '-<index>' appended
# to the title, here we need to update the self.sibling_conformers_dict to
# have unique titles, additionally the current self.sibling_conformers_dict
# contains conformers considered as failures in the calling code and so also
# remove any bad conformers
rep_titles = set(rep_st.title for rep_st in self.rep_sts)
for sibling_group, conformers_dict in list(
self.sibling_conformers_dict.items()):
for conformer_group, conformers in list(conformers_dict.items()):
prev_title = None
good_conformers = []
for idx, conformer in enumerate(conformers, 1):
title = conformer.title
if title == prev_title and idx > 1:
title += f'-{idx}'
else:
prev_title = title
if title in rep_titles:
conformer.title = title
good_conformers.append(conformer)
self.sibling_conformers_dict[sibling_group][
conformer_group] = good_conformers
[docs] def setRepresentatives(self):
"""
Associated with each structure is output data from potentially multiple
Jaguar multistage workflow stages. Pick representative structures to
carry the data for all stages.
:raise JMSWFException: if there is an issue
"""
has_ts = getattr(self, 'has_ts', False)
# bin stage structures by original structure title,
# titles are like 'reactant_stage_1', 'transition_state-2_stage_4_analysis_2',
# etc.
title_stage_sts_dict = {}
for st in structure.StructureReader(self._jmswf_out_mae_file):
title, stage_hash = st.title.split(STAGE_SEPARATOR)
title_stage_sts_dict.setdefault(title, []).append((st, stage_hash))
stages = jmswfu.read_stage_datafile(
os.path.join(self._jmswf_launch_dir, self._jmswf_in_txt_file))
# choose the most recent structure from the original workflow
# as a representative structure to carry the properties from all
# extra stages, property keys are modified to end as '_stage_<idx>'
if has_ts:
rep_stage_idx = 2
else:
rep_stage_idx = 1
warned = False
copied = self.return_jaguar_files
for title, pairs in title_stage_sts_dict.items():
skip_msg = False
# the IndexError indicates that the required original workflow
# Jaguar calculation has failed for this conformer, continue as there
# may be other successful conformers
try:
rep_st = pairs[rep_stage_idx - 1][0].copy()
except IndexError:
skip_msg = ('Some Jaguar multistage workflow jobs have failed.')
if not skip_msg:
out_files = [f'{pair[0].title}.out' for pair in pairs]
if not self.checkJMSWFOutputs(out_files):
skip_msg = (
'Postprocessing of at least a single Jaguar multistage workflow '
'job has failed.')
if skip_msg:
if not warned:
if self.logger:
skip_msg += (' All output files will be returned.')
self.logger.warning(skip_msg)
warned = True
if not copied:
jobutils.add_zipfile_to_backend(self._jmswf_launch_dir)
copied = True
continue
rep_st.title = title
for st, stage_hash in pairs[rep_stage_idx - 1:]:
stage_idx = int(stage_hash.split('_')[0])
stage = stages[stage_idx - 1]
for key in stage.getPropertyKeys(st=st):
rep_st.property.pop(key, None)
value = st.property.get(key)
if value is not None:
_rep_keys = self._setProperties(rep_st, key, stage_idx,
value)
self.rep_keys.update(_rep_keys)
self.rep_sts.append(rep_st)
if not self.rep_sts:
msg = (
'Postprocessing of all Jaguar multistage workflow jobs has failed.'
)
raise JMSWFException(msg)
self._updateSiblingConformersDict()
# validate the output workflow file
sibling_conformers_dict = bin_structures_by_property(
self.rep_sts, key=SIBLING_GROUP_KEY, inner_key=CONFORMERS_GROUP_KEY)
for sibling_group, conformer_group, conformer in self.representativeConformers(
):
conformers_dict = sibling_conformers_dict.get(sibling_group)
if not conformers_dict:
msg = (
'All Jaguar multistage workflow calculations for sibling '
f'group {sibling_group} have failed.')
raise JMSWFException(msg)
if not conformers_dict.get(conformer_group):
msg = (
'All Jaguar multistage workflow calculations for conformer '
f'group {conformer_group} have failed.')
raise JMSWFException(msg)
[docs] def finalizeJMSWFOutput(self):
"""
Finalize the Jaguar multistage workflow output.
:rtype: str
:return: the Jaguar multistage workflow output file
"""
jmswf_tag = getattr(self, 'jmswf_tag', '')
# create a valid non-redundant output file from the
# Jaguar multistage workflow output file, currently do not link any smap
# file as there can only be a single one linked and they are all available
# in the Jaguar multistage workflow output file
base_name = jobutils.get_jobname(DEFAULT_JOB_NAME) + '-out' + jmswf_tag
out_mae_file = base_name + '.mae'
structure.write_cts(self.rep_sts, out_mae_file)
smap_dict = {}
for idx, rep_st in enumerate(self.rep_sts, 1):
out_vib_file_format = rep_st.title + STAGE_SEPARATOR + '%s.vib'
out_vib_file = out_vib_file_format % '2'
if not os.path.exists(out_vib_file):
out_vib_file = out_vib_file_format % '1'
if not os.path.exists(out_vib_file):
continue
smap_dict[out_vib_file] = idx
if smap_dict:
out_smap_file = jmswfu.create_smap(
base_name, out_mae_file, smap_dict=smap_dict)
else:
out_smap_file = None
backend = None
if out_smap_file:
jobutils.add_outfile_to_backend(out_smap_file, backend)
jobutils.add_outfile_to_backend(
out_mae_file, backend, set_structure_output=True)
return out_mae_file
[docs]class DescriptorsMixin(object):
"""
Manage running descriptors.
"""
DEFAULT_JOB_NAME = 'automatic_reaction_workflow'
[docs] def runDescriptors(self, files):
"""
Run descriptors.
:type files: list
:param files: the files on which to run descriptors
"""
# handle restarts
job_name = jobutils.get_jobname(self.DEFAULT_JOB_NAME)
zip_files = jaguar_restart.get_restart_zip_files(
base_name_patterns=[f'{DESCRIPTORS_LAUNCH_DIR_HEADER}{job_name}*'])
jaguar_restart.prepare_restart_dirs(zip_files)
host = get_sub_host_str(self, 'jmswf_host', 'n_rxnwf_subjobs')
# keep Jaguar &gen section to a minimum for now
keys = ['dftname', 'basis']
jaguar_keywords = {key: self.jaguar_keywords[key] for key in keys}
jaguar_keywords = get_jaguar_keywords_list(jaguar_keywords)
jaguar_keywords = ' '.join(jaguar_keywords)
options = argparse.Namespace(TPP=self.tpp)
job_dj = jaguarworkflows.create_queue(options=options, host=host)
base_cmd = ['complex_descriptors.py']
base_cmd += [FLAG_JAGUAR]
base_cmd += [FLAG_JAGUAR_KEYWORDS, jaguar_keywords]
base_cmd += [FLAG_TPP, str(self.tpp)]
base_cmd += [FLAG_LIGFILTER]
base_cmd += [FLAG_CANVAS]
base_cmd += [FLAG_MOLDESCRIPTORS]
base_cmd += [FLAG_INCLUDE_VECTORIZED]
base_cmd += [FLAG_FINGERPRINTS]
if self.return_descriptor_files:
base_cmd += [FLAG_SAVEFILES]
if host != jaguarworkflows.AUTOHOST:
base_cmd += ['-HOST', self.jmswf_host]
# collect jobs, the complex descriptors driver does more than
# just running Jaguar jobs so currently restart each job even
# if all Jaguar *out files are available, for such cases the Jaguar
# jobs are not restarted
launch_dirs = []
for afile in files:
job_name = fileutils.get_basename(afile)
launch_dir = job_name = f'{DESCRIPTORS_LAUNCH_DIR_HEADER}{job_name}'
cmd = list(base_cmd)
cmd += ['-JOBNAME', job_name]
cmd += [FLAG_INPUT_FILE, afile]
os.makedirs(launch_dir, exist_ok=True)
shutil.copy(afile, os.path.join(launch_dir, afile))
job = jaguarworkflows.RobustSubmissionJob(
cmd, command_dir=launch_dir)
job_dj.addJob(job)
launch_dirs.append(launch_dir)
if self.logger:
self.logger.info('Running descriptors on all files.')
self.logger.info('')
job_dj.run()
job_be = jobcontrol.get_backend()
for launch_dir in launch_dirs:
zip_file = launch_dir + '.zip'
zip_directory(launch_dir, fileobj=zip_file)
if job_be:
job_be.copyOutputFile(zip_file)
[docs]class ReactionWorkflowException(Exception):
pass
[docs]class ReactionWorkflow(ConformationalSearchMixin, JMSWFMixin):
"""
Manage a reaction workflow.
"""
[docs] def __init__(self,
reaction_wf_input_sts,
n_conformers=DEFAULT_N_CONFORMERS,
pp_rel_energy_thresh=PP_CSEARCH_REL_ENERGY_THRESH,
seed=parserutils.RANDOM_SEED_DEFAULT,
only_rings=True,
return_csearch_files=False,
jaguar_keywords=DEFAULT_JAGUAR_KEYWORDS,
temp_data=None,
press_data=None,
return_jaguar_files=False,
anharm=False,
return_anharm_files=False,
anharm_max_freq=anharmonic.DEFAULT_MAX_FREQ,
anharm_factor_data=None,
rate_constants=False,
return_rate_constant_files=False,
wigner_tunnel_corr=False,
extra_stages_file=None,
max_i_freq=anharmonic.DEFAULT_MAX_I_FREQ,
n_jmswf_subjobs=1,
subhost=None,
tpp=DEFAULT_TPP,
logger=None):
"""
Create an instance.
:type reaction_wf_input_sts: list
:param reaction_wf_input_sts: reaction workflow input structures
:type n_conformers: int
:param n_conformers: number of conformers to search for
:type pp_rel_energy_thresh: None or float
:param pp_rel_energy_thresh: relative energy threshold in kJ/mol, if None then
only the n_conformers lowest energy conformers are returned
:type seed: int
:param seed: seed for random number generator
:type only_rings: bool
:param only_rings: if True then only allow rotation of eta-bound rings,
if False then also allow rotation of ligands where the eta-bound
motif is acyclic, for example ethene, etc.
:type return_csearch_files: bool
:param return_csearch_files: whether to return all output files
from the conformational search subjobs
:type jaguar_keywords: OrderedDict
:param jaguar_keywords: Jaguar keywords
:type temp_data: TempData
:param temp_data: the temperature data for thermochemical properties
:type press_data: PressData
:param press_data: the pressure data for thermochemical properties
:type return_jaguar_files: bool
:param return_jaguar_files: whether to return all output files
from the Jaguar subjobs
:type anharm: bool
:param anharm: whether to run the anharmonic workflow
:type return_anharm_files: bool
:param return_anharm_files: whether to return all output files
from the anharmonic workflow
:type anharm_max_freq: float
:param anharm_max_freq: anharmonic potentials are created for normal modes with
harmonic frequencies less than this value in wavenumbers (cm^-1)
:type anharm_factor_data: anharmonic.SeqData or None
:param anharm_factor_data: unitless factor data for factors that multiply
a normal mode displacement, if None then the defaults are used,
the number of points is in the positive direction only, excluding
zero and the negative direction, for example using a value
of 4 in turn means 2 * 4 + 1 = 9 points total
:type rate_constants: bool
:param rate_constants: whether to report rate constant(s) for the rate
determining step of the reaction using canonical transition state
theory
:type return_rate_constant_files: bool
:param return_rate_constant_files: whether to return all output files
from the rate constant subjobs
:type wigner_tunnel_corr: bool
:param wigner_tunnel_corr: whether to include the Wigner tunneling correction
when computinig rate constant(s)
:type extra_stages_file: str
:param extra_stages_file: the name of a file containing extra
stages for a Jaguar Multistage Workflow subjob that will
be performed on all output structures from the reaction workflow,
the first of these extra stages is always skipped so as to allow
analysis to potentially be the first extra stage
:type max_i_freq: float
:param max_i_freq: tolerate small imaginary frequencies less than this value in
wavenumbers (cm^-1)
:type n_jmswf_subjobs: int
:param n_jmswf_subjobs: the maximum number of simultaneous Jaguar
multistage workflow subjobs
:type subhost: str
:param subhost: the host to use for subjobs
:type tpp: int
:param tpp: the number of threads to use for Jaguar
subjobs, i.e. -TPP (threads-per-process)
:type logger: logging.Logger or None
:param logger: output logger or None if there isn't one
"""
self.reaction_wf_input_sts = reaction_wf_input_sts
self.n_conformers = n_conformers
self.pp_rel_energy_thresh = pp_rel_energy_thresh
self.seed = seed
self.only_rings = only_rings
self.return_csearch_files = return_csearch_files
self.jaguar_keywords = jaguar_keywords
self.temp_data = temp_data or TempData(
temp_start=jmswfu.DEFAULT_TEMP_START,
temp_step=jmswfu.DEFAULT_TEMP_STEP,
temp_n=jmswfu.DEFAULT_TEMP_N)
self.press_data = press_data or PressData(
press_start=jmswfu.DEFAULT_PRESS_START,
press_step=jmswfu.DEFAULT_PRESS_STEP,
press_n=jmswfu.DEFAULT_PRESS_N)
self.return_jaguar_files = return_jaguar_files
self.anharm = anharm
self.return_anharm_files = return_anharm_files
self.anharm_max_freq = anharm_max_freq
self.anharm_factor_data = anharm_factor_data or anharmonic.SeqData(
start=anharmonic.DEFAULT_F_START,
step=anharmonic.DEFAULT_F_STEP,
n_points=anharmonic.DEFAULT_F_N_POINTS)
self.rate_constants = rate_constants
self.return_rate_constant_files = return_rate_constant_files
self.wigner_tunnel_corr = wigner_tunnel_corr
self.extra_stages_file = extra_stages_file
self.max_i_freq = max_i_freq
self.n_jmswf_subjobs = n_jmswf_subjobs
self.subhost = subhost
self.tpp = tpp
self.logger = logger
self.sibling_conformers_dict = None
self.eids_dict = {}
self._jmswf_launch_dir = JMSWF_LAUNCH_DIR
self._jmswf_base_name = 'jaguar_multistage_workflow'
self._jmswf_in_mae_file = self._jmswf_base_name + '.maegz'
self._jmswf_in_txt_file = self._jmswf_base_name + '.txt'
self._jmswf_out_mae_file = self._jmswf_base_name + '-out.maegz'
self._jmswf_out_smap_file = self._jmswf_base_name + '-out.smap'
self.has_ts = False
self.rdss = None
self._anharm_dir = ANHARM_LAUNCH_DIR
self._ctst_dir = 'CTST_rate_constants'
self.rep_keys = set()
self.rep_sts = []
self.jmswf_tag = REACTION_WF_TAG
[docs] def validateRateConstants(self):
"""
Validate rate constants.
:raise ReactionWorkflowException: if there is an issue
"""
sibling_conformers_dict = bin_structures_by_property(
self.reaction_wf_input_sts,
key=SIBLING_GROUP_KEY,
inner_key=CONFORMERS_GROUP_KEY)
eligible_for_rate_constants = False
for sibling_group, conformer_group, first_conformer in self.representativeConformers(
sibling_conformers_dict=sibling_conformers_dict):
if first_conformer.property.get(TRANSITION_STATE_STRUCTURE_KEY):
self.has_ts = True
if first_conformer.property.get(PARENT_SIBLING_GROUPS_KEY):
eligible_for_rate_constants = True
break
if self.rate_constants and not eligible_for_rate_constants:
msg = ('Rate constant calculations require child transition state '
'structures.')
raise ReactionWorkflowException(msg)
[docs] def validateAnharmonic(self):
"""
Validate anharmonic.
:raise ReactionWorkflowException: if there is an issue
"""
sibling_conformers_dict = bin_structures_by_property(
self.reaction_wf_input_sts,
key=SIBLING_GROUP_KEY,
inner_key=CONFORMERS_GROUP_KEY)
if not self.has_ts and self.anharm:
getters = [
get_restrain_distance_idxs, get_restrain_angle_idxs,
get_restrain_dihedral_idxs
]
only_restrained_opts = True
for sibling_group, conformer_group, first_conformer in self.representativeConformers(
sibling_conformers_dict=sibling_conformers_dict):
for getter in getters:
if getter(first_conformer):
break
else:
only_restrained_opts = False
if not only_restrained_opts:
break
if only_restrained_opts:
msg = (
'The anharmonic workflow requires frequencies but '
'all Jaguar multistage jobs are restrained optimizations.')
raise ReactionWorkflowException(msg)
[docs] def validateSubhost(self):
"""
Validate subhost.
:raise ReactionWorkflowException: if there is an issue
"""
host = 'localhost'
if jobcontrol.get_backend():
host_list = jobcontrol.get_backend_host_list()
if host_list:
host = host_list[0][0]
if host != 'localhost' and self.subhost == 'localhost':
msg = ('Jobs requesting a remote driver host and local '
'subjob host are not supported.')
raise ReactionWorkflowException(msg)
if not self.subhost:
self.subhost = host
[docs] def validate(self):
"""
Validate.
:raise ReactionWorkflowException: if there is an issue
"""
self.validateRateConstants()
self.validateAnharmonic()
self.validateSubhost()
def _setUpAnharmonic(self):
"""
Set up anharmonic workflow calculations.
:raise ReactionWorkflowException: if there is an issue
"""
# determine which JMSWF stages calculate frequencies, currently allow
# for the possibility of an extra stage calculating frequencies (so
# consider not only the first one or two reaction workflow stages),
# for each stage collect all Jaguar *.out and *.01.in files
in_files_by_stage = {}
pattern = os.path.join(self._jmswf_launch_dir, '*.out')
for out_path in glob.glob(pattern):
jag_out = JaguarOutput(out_path)
if not anharmonic.get_normal_modes(jag_out):
continue
out_files = [os.path.split(out_path)[1]]
if not self.checkJMSWFOutputs(out_files):
continue
rin_path = os.path.splitext(out_path)[0] + '.01.in'
if not os.path.exists(rin_path):
msg = (f'The Jaguar restart input file {rin_path} can '
'not be found.')
raise ReactionWorkflowException(msg)
out_file = os.path.split(out_path)[1]
rin_file = os.path.split(rin_path)[1]
stage_idx = get_stage_idx(out_file)
in_files_by_stage.setdefault(stage_idx, []).append((out_file,
rin_file))
# ensure that for each of these stages each conformer in the reaction
# workflow has an anharmonic input deck
for idx, all_files in in_files_by_stage.items():
if len(all_files) != len(self.rep_sts):
msg = (
'Some Jaguar output files from Jaguar Multistage Workflow '
f'stage {idx} are missing frequency data.')
raise ReactionWorkflowException(msg)
# prepare anharmonic subjobs in a separate directory
os.makedirs(self._anharm_dir, exist_ok=True)
for all_files in in_files_by_stage.values():
for files in all_files:
for afile in files:
apath = os.path.join(self._jmswf_launch_dir, afile)
bpath = os.path.join(self._anharm_dir, afile)
shutil.copy(apath, bpath)
[docs] def runAnharmonic(self):
"""
Run the anharmonic workflow.
"""
self._setUpAnharmonic()
# allow multiple simultaneous anharmonic subjobs
options = argparse.Namespace(TPP=self.tpp)
host = f'{self.subhost}:{self.n_jmswf_subjobs}'
job_dj = jaguarworkflows.create_queue(options=options, host=host)
# allow multiple simultaneous Jaguar single point subjobs for each
# anharmonic subjob, continue processing even if there are zero
# normal modes to be treated anharmonically
cmd = ['anharmonic_workflow_driver.py']
cmd += [anharmonic.FLAG_MAX_FREQ, str(self.anharm_max_freq)]
cmd += [
anharmonic.FLAG_F_DATA,
str(self.anharm_factor_data.start),
str(self.anharm_factor_data.step),
str(self.anharm_factor_data.n_points)
]
cmd += [FLAG_MAX_I_FREQ, str(self.max_i_freq)]
cmd += [anharmonic.FLAG_PROCESS_NO_ANHARMONICITIES]
cmd += [FLAG_TPP, str(self.tpp)]
cmd += ['-HOST', host]
# collect jobs, restart *.out files are marked with ANHARMONIC_TAG
# and should be skipped here, in contrast to JMSWF which only runs
# Jaguar jobs the anharmonic driver does more than just running
# Jaguar jobs so currently restart each job even if all Jaguar *out
# files are available, for such cases the Jaguar jobs are not restarted
pattern = os.path.join(self._anharm_dir, '*.out')
for out_path in glob.glob(pattern):
if ANHARMONIC_TAG in out_path:
continue
out_file = os.path.split(out_path)[1]
base_name, ext = os.path.splitext(out_file)
rin_file = base_name + '.01.in'
job_name = base_name + ANHARMONIC_TAG
_cmd = list(cmd)
_cmd += [anharmonic.FLAG_JAG_OUT_FILE, out_file]
_cmd += [anharmonic.FLAG_JAG_RIN_FILE, rin_file]
_cmd += ['-JOBNAME', job_name]
jc_job = jaguarworkflows.RobustSubmissionJob(
_cmd, command_dir=self._anharm_dir, name=job_name)
job_dj.addJob(jc_job)
# run jobs
job_dj.run()
[docs] def processAnharmonic(self):
"""
Process the anharmonic workflow.
:raise ReactionWorkflowException: if there is an issue
"""
# check output, for now require success for each conformer
pattern = os.path.join(self._anharm_dir, f'*{anharmonic.OUT_EXT}')
mae_paths = glob.glob(pattern)
stage_idxs = set(get_stage_idx(mae_path) for mae_path in mae_paths)
force_return_anharm_files = len(mae_paths) != len(stage_idxs) * len(
self.rep_sts)
# copy output
if self.return_anharm_files or force_return_anharm_files:
jobutils.add_zipfile_to_backend(self._anharm_dir)
if force_return_anharm_files:
msg = ('At least a single anharmonic workflow subjob has failed.')
raise ReactionWorkflowException(msg)
# update representative structures with anharmonic properties for relevant
# stages, update keys, and store energy keys
for rep_st in self.rep_sts:
for stage_idx in stage_idxs:
mae_path = os.path.join(self._anharm_dir,
f'{rep_st.title}{STAGE_SEPARATOR}{stage_idx}{ANHARMONIC_TAG}' \
f'{anharmonic.OUT_EXT}')
st = structure.Structure.read(mae_path)
for key, value in st.property.items():
if key.startswith(anharmonic.KEY_STARTER):
_rep_keys = self._setProperties(rep_st, key, stage_idx,
value)
if 'Total' in key:
self.rep_keys.update(_rep_keys)
def _setRateDeterminingSteps(self):
"""
Set a list of rate determining steps.
:raise ReactionWorkflowException: if there is an issue
"""
rxnwfea = ReactionWorkflowEnergyAnalysis
# use conformationally averaged energies relative to parents
# including cross terms to determine the rate determining step
# of the reaction
base_name = jobutils.get_jobname(DEFAULT_JOB_NAME)
rel_parents_file = base_name + REL_PARENTS_W_X_EXT
with open(rel_parents_file, 'r') as afile:
rel_parents_dict = json.load(afile)
# bin (child - parent) energy differences by relevant stages to
# determine the rate determining step for each stage, the energy
# to use is U_0 = E_SCF + E_ZPE which is temperature independent
# and thus previously algebraically averaged over conformers and
# is now linearly summable, currently harmonic ZPE is used even
# if modeling anharmonicities
scf_energy_h = rxnwfea.getHeader(jaguarworkflows.GAS_PHASE_ENERGY_PROP)
zpe_energy_h = rxnwfea.getHeader(jaguarworkflows.ZERO_POINT_ENERGY_PROP)
stage_child_parent_energies_dict = {}
for child_name, parent_dict in rel_parents_dict.items():
for parent_name, edict in parent_dict.items():
stage_energies_dict = {}
for h, energy in edict.items():
if h.startswith(scf_energy_h) or h.startswith(zpe_energy_h):
stage_idx = get_stage_idx(h)
stage_energies_dict.setdefault(stage_idx,
[]).append(energy)
for stage_idx, energies in stage_energies_dict.items():
if len(energies) == 2:
stage_child_parent_energies_dict.setdefault(
stage_idx, []).append((child_name, parent_name,
sum(energies)))
if not stage_child_parent_energies_dict:
msg = ('Rate constant calulations require at least a single stage '
'in which Jaguar frequencies are calculated.')
raise ReactionWorkflowException(msg)
# collect rate determining steps by sorting by energy and finding the
# highest energy transition state
rdss = []
for stage_idx, values in stage_child_parent_energies_dict.items():
rds = None
for child_name, parent_name, energy in sorted(
values, key=lambda x: x[2], reverse=True):
for sibling_group, conformer_group, rep_child_conf in self.representativeConformers(
specific_sibling_group=child_name):
if rep_child_conf.property.get(
TRANSITION_STATE_STRUCTURE_KEY):
rds = SimpleNamespace(
stage_idx=stage_idx,
child_name=child_name,
parent_name=parent_name)
break
if rds:
rdss.append(rds)
break
if not rdss:
msg = ('Rate constant calulations require at least a single child '
'structure to be marked as a transition state structure.')
raise ReactionWorkflowException(msg)
self.rdss = rdss
def _getJaguarOutputPathsChildAndParent(self, rds):
"""
Return paths to the Jaguar output files for child and
parent conformers for the given rate determining step.
:type rds: SimpleNamespace
:param rds: the rate determining step
:rtype: list, list
:return: the child and parent conformer paths the latter of
which is a list of lists of paths for each parent structure
"""
# among the child sibling group collect spectator masses and
# for the found TS collect conformer titles
child_spectator_masses = []
child_ts_conformer_titles = []
for conformer_group, conformers in self.sibling_conformers_dict[
rds.child_name].items():
first_conformer = conformers[0]
if first_conformer.property.get(TRANSITION_STATE_STRUCTURE_KEY):
for conformer in conformers:
child_ts_conformer_titles.append(conformer.title)
else:
child_spectator_masses.append(
_get_molecular_weight(first_conformer))
# isolate spectators and determine which parent siblings
# made the TS
binned_parent_conformer_titles = {}
for conformer_group, conformers in self.sibling_conformers_dict[
rds.parent_name].items():
first_conformer = conformers[0]
parent_mass = _get_molecular_weight(first_conformer)
if parent_mass in child_spectator_masses:
child_spectator_masses.remove(parent_mass)
else:
for conformer in conformers:
binned_parent_conformer_titles.setdefault(
conformer_group, []).append(conformer.title)
binned_parent_conformer_titles = \
list(binned_parent_conformer_titles.values())
get_paths = lambda x: [os.path.join(self._jmswf_launch_dir,
title + STAGE_SEPARATOR + str(rds.stage_idx) + '.out')
for title in x]
child_paths = get_paths(child_ts_conformer_titles)
binned_parent_paths = [
get_paths(titles) for titles in binned_parent_conformer_titles
]
return child_paths, binned_parent_paths
def _setUpCTST(self):
"""
Set up canonical transition state theory calculations.
"""
self._setRateDeterminingSteps()
# the Wigner tunneling correction is computed from the imaginary
# frequency of the transition state and ultimately multiples its
# partition function to alter the rate constant
wigner_tunnel_corr = 'yes' if self.wigner_tunnel_corr else 'no'
temps = ' '.join([
str(self.temp_data.temp_start + i * self.temp_data.temp_step)
for i in range(0, self.temp_data.temp_n)
])
# collect all required CTST jobs
get_conf_title = lambda x: fileutils.get_basename(x).split(STAGE_SEPARATOR)[0]
for rds in self.rdss:
child_paths, binned_parent_paths = self._getJaguarOutputPathsChildAndParent(
rds)
rds.jobs = []
for child_path in child_paths:
child_title = get_conf_title(child_path)
for parent_paths in itertools.product(*binned_parent_paths):
parent_title = '_'.join(
get_conf_title(parent_path)
for parent_path in parent_paths)
job_name = (f'tst{STAGE_SEPARATOR}{rds.stage_idx}_'
f'{parent_title}_{child_title}')
cmd = [os.path.join(os.environ['SCHRODINGER'], 'run')]
cmd += ['tst_rate_startup.py']
cmd += ['-wigner_tunnel_corr', wigner_tunnel_corr]
cmd += [FLAG_MAX_I_FREQ, str(self.max_i_freq)]
cmd += ['-temperature', temps]
cmd += ['-COMPLEX_OUTPUT', child_path]
cmd += ['-COMPLEX_TRV', 'trv']
if self.anharm:
base_name = fileutils.get_basename(child_path)
mae_path = os.path.join(
self._anharm_dir,
f'{base_name}{ANHARMONIC_TAG}{anharmonic.OUT_EXT}')
cmd += ['-COMPLEX_ANHARM_OUTPUT_MAE', mae_path]
for idx, parent_path in enumerate(parent_paths):
cmd += [f'-REAGENT_OUTPUT_{idx}', parent_path]
cmd += [f'-REAGENT_TRV_{idx}', 'trv']
if self.anharm:
base_name = fileutils.get_basename(parent_path)
mae_path = os.path.join(
self._anharm_dir,
f'{base_name}{ANHARMONIC_TAG}{anharmonic.OUT_EXT}'
)
cmd += [
f'-REAGENT_ANHARM_OUTPUT_MAE_{idx}', mae_path
]
cmd += ['-DEBUG'] # enable verbose logging
cmd += [job_name]
rds.jobs.append(cmd)
[docs] def runCTST(self):
"""
Run canonical transition state theory calculations to determine rate
constant(s) for the rate determining step of the reaction.
"""
self._setUpCTST()
# see MATSCI-6611 - originally jobs were launched simultaneously on
# self.subhost using a JobDJ filled with SubprocessJob types but it
# was found that starting the first of such jobs was hanging
# indefinitely on Windows and so it was switched to use subprocess
# (so running on the original -HOST) in serial
for rds in self.rdss:
rds.stdouts = []
for job in rds.jobs:
try:
stdout = subprocess.check_output(
job, stderr=subprocess.STDOUT, universal_newlines=True)
except subprocess.CalledProcessError:
stdout = ''
rds.stdouts.append(stdout)
[docs] def prepareCTSTOutput(self):
"""
Prepare CTST output.
:raise ReactionWorkflowException: if there is an issue
"""
# process raw output data
failed_stage_idx = None
for rds in self.rdss:
temp_data_dict = {}
for stdout in rds.stdouts:
for line in stdout.splitlines():
if line.startswith('{'):
data = yaml.safe_load(line.strip())
temperature = float(data['Temperature'][0])
energy = float(data['Energy Barrier'][0])
r_partition_f = float(
data['Reagents Partition Function'][0])
ts_partition_f = float(
data['Complex Partition Function'][0])
mantissa, order_of_magnitude = data[
'TST Rate Constant'][0].split(' x ')
rate_constant = float(mantissa) * eval(
order_of_magnitude.replace('^', '**'))
rate_constant_units = data['TST Rate Constant'][1]
data = (energy, r_partition_f, ts_partition_f,
rate_constant, rate_constant_units)
temp_data_dict.setdefault(temperature, []).append(data)
if not temp_data_dict:
failed_stage_idx = rds.stage_idx
break
for temp, data in temp_data_dict.items():
energies, r_partition_fs, ts_partition_fs, rate_constants, \
rate_constant_units = [numpy.array(i) for i in zip(*data)]
# energies are temperature independent so perform an algebraic
# average
avg_energy = numpy.mean(energies)
# Boltzmann average the temperature dependent partition functions
# and rate constants
factors = numpy.exp(-1 * energies / (IDEAL_GAS_CONSTANT * temp))
partition_f = sum(factors)
avg_r_partition_f = numpy.dot(r_partition_fs,
factors) / partition_f
avg_ts_partition_f = numpy.dot(ts_partition_fs,
factors) / partition_f
avg_rate_constant = numpy.dot(rate_constants,
factors) / partition_f
temp_data_dict[temp] = (avg_energy, avg_r_partition_f,
avg_ts_partition_f, avg_rate_constant,
rate_constant_units[0])
rds.temp_data_dict = temp_data_dict
# process raw output files
if failed_stage_idx is not None or self.return_rate_constant_files:
os.makedirs(self._ctst_dir, exist_ok=True)
for rds in self.rdss:
for job, stdout in zip(rds.jobs, rds.stdouts):
log_file = os.path.join(self._ctst_dir, job[-1] + '.log')
with open(log_file, 'w') as afile:
afile.write(stdout)
jobutils.add_zipfile_to_backend(self._ctst_dir)
if failed_stage_idx:
msg = (
'All rate constant calculations for stage '
f'{failed_stage_idx} have failed for all child and parent '
'conformers.')
raise ReactionWorkflowException(msg)
# process summary output file
base_name = jobutils.get_jobname(DEFAULT_JOB_NAME)
rate_constants_file = base_name + RATE_CONSTANTS_W_X_EXT
with csv_unicode.writer_open(rate_constants_file) as afile:
writer = csv.writer(afile)
writer.writerow([
'Stage_Index', 'Parent_Sibling_Group_Name',
'Child_Sibling_Group_Name', 'Temperature/K',
'Conf_Avg_SCF+ZPE_Energy_Barrier/kcal/mol',
'Conf_Avg_Reactants_Partition_F', 'Conf_Avg_TS_Partition_F',
'Conf_Avg_CTST_Rate_Constant', 'CTST_Rate_Constant_Units'
])
for rds in self.rdss:
for temp, data in rds.temp_data_dict.items():
writer.writerow(
[rds.stage_idx, rds.parent_name, rds.child_name, temp] +
list(data))
backend = None
jobutils.add_outfile_to_backend(rate_constants_file, backend)
[docs] def run(self):
"""
Run the reaction workflow.
:raise ReactionWorkflowException: if there is an issue
"""
zip_files = jaguar_restart.get_restart_zip_files(
base_names=[JMSWF_LAUNCH_DIR, ANHARM_LAUNCH_DIR])
jaguar_restart.prepare_restart_dirs(zip_files)
self.validate()
try:
self.createConformers(self.reaction_wf_input_sts)
except ConformationalSearchException as err:
raise ReactionWorkflowException(str(err))
try:
self._overwriteConformerEIDS()
self.runJMSWF()
self.prepareJMSWFOutput()
self.setRepresentatives()
except JMSWFException as err:
raise ReactionWorkflowException(str(err))
if self.anharm:
self.runAnharmonic()
self.processAnharmonic()
out_mae_file = self.finalizeJMSWFOutput()
ReactionWorkflowEnergyAnalysis(out_mae_file,
self.rep_keys).writeDataFiles()
if self.rate_constants:
self.runCTST()
self.prepareCTSTOutput()