Source code for schrodinger.application.matsci.reaction_workflow_utils

"""
Utilities for reaction workflows.

Copyright Schrodinger, LLC. All rights reserved.
"""

import argparse
import copy
import csv
import glob
import itertools
import json
import os
import re
import shutil
import warnings
from collections import OrderedDict
from collections import namedtuple
from types import SimpleNamespace

import networkx as nx
import numpy
import yaml
from scipy import constants

from schrodinger import structure
from schrodinger.application.jaguar.input import JaguarInput
from schrodinger.application.jaguar.output import JaguarOutput
from schrodinger.application.macromodel import utils as mmodutils
from schrodinger.application.matsci import anharmonic
from schrodinger.application.matsci import desmondutils
from schrodinger.application.matsci import etarotamers
from schrodinger.application.matsci import \
    jaguar_multistage_workflow_utils as jmswfu
from schrodinger.application.matsci import jaguar_restart
from schrodinger.application.matsci import jaguarworkflows
from schrodinger.application.matsci import jobutils
from schrodinger.application.matsci import parserutils
from schrodinger.application.matsci import property_names as pnames
from schrodinger.application.matsci.genetic_optimization import \
    genetic_optimization as go
from schrodinger.infra import mm
from schrodinger.job import jobcontrol
from schrodinger.structutils import analyze
from schrodinger.structutils import rmsd
from schrodinger.test.stu.common import zip_directory
from schrodinger.utils import csv_unicode
from schrodinger.utils import fileutils
from schrodinger.utils import subprocess
from schrodinger.utils import units
from schrodinger.forcefield import OPLS_NAME_F16

FLAG_N_CONFORMERS = '-n_conformers'
FLAG_PP_REL_ENERGY_THRESH = '-pp_rel_energy_thresh'
FLAG_RETURN_CSEARCH_FILES = '-return_csearch_files'

FLAG_JAGUAR = '-jaguar'
FLAG_JAGUAR_KEYWORDS = '-jaguar_keywords'
FLAG_TEMP_START = '-temp_start'
FLAG_TEMP_STEP = '-temp_step'
FLAG_TEMP_N = '-temp_n'
FLAG_PRESS_START = '-press_start'
FLAG_PRESS_STEP = '-press_step'
FLAG_PRESS_N = '-press_n'
FLAG_MAX_I_FREQ = anharmonic.FLAG_MAX_I_FREQ
FLAG_TPP = jobutils.FLAG_TPP

FLAG_DESCRIPTORS = '-descriptors'
FLAG_LIGFILTER = '-ligfilter'
FLAG_CANVAS = '-canvas'
FLAG_MOLDESCRIPTORS = '-moldescriptors'
FLAG_INCLUDE_VECTORIZED = '-include_vectorized'
FLAG_FINGERPRINTS = '-fingerprints'
FLAG_SAVEFILES = '-savefiles'

FLAG_INPUT_FILE = '-input_file'

DEFAULT_JOB_NAME = 'reaction_workflow'

DEFAULT_N_CONFORMERS = 5
# keys and values should be strings
DEFAULT_JAGUAR_KEYWORDS = OrderedDict([('dftname', 'B3LYP'), ('iuhf', '1'),
                                       ('basis', 'LACVP**')])
DEFAULT_TPP = 1
# OPLS_2005 does not allow ZOB to sp3 carbon while OPLS3 does
DEFAULT_FORCE_FIELD_NAME = mm.OPLS_NAME_F16  # OPLS3

RESERVED_JAGUAR_KEYS = set([
    'molchg', 'multip', 'igeopt', 'ifreq', 'itrvec', 'nhesref', 'ntemp',
    'tmpini', 'tmpstp', 'npress', 'press', 'press_step'
])

REACTION_WF_TAG = '_rxnwf'

REACTION_WF_STRUCTURE_KEY = 'b_matsci_Reaction_Workflow_Structure'
CONFORMERS_GROUP_KEY = 's_matsci_Reaction_Workflow_Conformers_Group'
SIBLING_GROUP_KEY = 's_matsci_Reaction_Workflow_Sibling_Group'
# for the following keep the original key for backwards compatibility
PARENT_SIBLING_GROUPS_KEY = 's_matsci_Reaction_Workflow_Parent_Conformer_Groups'
CHARGE_KEY = 'i_m_Molecular_charge'
MULTIPLICITY_KEY = 'i_m_Spin_multiplicity'
KEEP_ATOM_KEY = 'b_matsci_Reaction_Workflow_Keep_Atom'
SUPERPOSITION_ATOM_KEY = 'i_matsci_Reaction_Workflow_Superposition_Atom'
RESTRAINED_ATOM_KEY = 'b_matsci_Reaction_Workflow_Restrained_Atom'
RESTRAINED_DISTANCES_KEY = 's_matsci_Reaction_Workflow_Restrained_Distances'
RESTRAINED_ANGLES_KEY = 's_matsci_Reaction_Workflow_Restrained_Angles'
RESTRAINED_DIHEDRALS_KEY = 's_matsci_Reaction_Workflow_Restrained_Dihedrals'
TRANSITION_STATE_STRUCTURE_KEY = \
    'b_matsci_Reaction_Workflow_Transition_State_Structure'

INDEX_SEPARATOR = ','
PARENT_SEPARATOR = ','
SEPARATOR = ';'

# must use str-ed values here rather than mm.<function>
PROP_TYPE_GET_DICT = {
    'r': 'mmct_atom_property_get_real',
    'b': 'mmct_atom_property_get_bool',
    'i': 'mmct_atom_property_get_int',
    's': 'mmct_atom_property_get_string'
}

ENTRY_ID_KEY = pnames.ENTRY_ID_PROP

TempData = namedtuple('TempData', ['temp_start', 'temp_step', 'temp_n'])
PressData = namedtuple('PressData', ['press_start', 'press_step', 'press_n'])

HARTREE_UNITS = ['au']
KCAL_PER_MOL_UNITS = ['kcal/mol']
EV_UNITS = ['eV']
KJOULES_PER_MOL_UNITS = ['kJ/mol']

# kcal/mol/K
IDEAL_GAS_CONSTANT = (constants.R / constants.calorie) / constants.kilo

STAGE_SEPARATOR = '_stage_'

REL_REACTANTS_W_X_EXT = '_conf_avg_rel_reactants.csv'
REL_REACTANTS_WO_X_EXT = '_conf_avg_wo_x_rel_reactants.csv'
REL_REACTANTS_LOWEST_EXT = '_lowest_conf_rel_reactants.csv'
REL_PARENTS_W_X_EXT = '_conf_avg_rel_parents.json'
REL_PARENTS_WO_X_EXT = '_conf_avg_wo_x_rel_parents.json'
REL_PARENTS_LOWEST_EXT = '_lowest_conf_rel_parents.json'

REL_REACTANTS_EXTS = [
    REL_REACTANTS_W_X_EXT, REL_REACTANTS_WO_X_EXT, REL_REACTANTS_LOWEST_EXT
]
REL_PARENTS_EXTS = [
    REL_PARENTS_W_X_EXT, REL_PARENTS_WO_X_EXT, REL_PARENTS_LOWEST_EXT
]

REL_EXT_TO_KWARG_DICT = {
    REL_REACTANTS_W_X_EXT: 'edict_w_x',
    REL_REACTANTS_WO_X_EXT: 'edict_wo_x',
    REL_REACTANTS_LOWEST_EXT: 'edict_lowest',
    REL_PARENTS_W_X_EXT: 'edict_w_x',
    REL_PARENTS_WO_X_EXT: 'edict_wo_x',
    REL_PARENTS_LOWEST_EXT: 'edict_lowest'
}

ANHARMONIC_TAG = '_anharmonic'

RATE_CONSTANTS_W_X_EXT = '_conf_avg_rate_constants.csv'

# energies in kJ/mol, MacroModel output uses a '-S-OPLS' extension
MMOD_ENERGY_KEY = f'r_mmod_Potential_Energy-{OPLS_NAME_F16}'
MMOD_REL_ENERGY_KEY = f'r_mmod_Relative_Potential_Energy-{OPLS_NAME_F16}'

FFLD_ENERGY_GLOBAL_TAG = '_global'

# kJ/mol
CSEARCH_REL_ENERGY_THRESH = 50.

# Ang.
CSEARCH_RMSD_THRESH = 0.1

JMSWF_LAUNCH_DIR = 'jaguar_multistage_workflow'
ANHARM_LAUNCH_DIR = 'anharmonic'

# the variable name pp_rel_energy_thresh uses the prefix pp to indicate
# that this is the relative energy used in postprocessing MacroModel results
# and is different from the variable name rel_energy_thresh, used elsewhere in this
# code, which is the relative energy used in the MacroModel csearch
PP_CSEARCH_REL_ENERGY_THRESH = None  # kJ/mol or None

DESCRIPTORS_LAUNCH_DIR_HEADER = 'descriptors_'


[docs]def get_idxs_marked_atoms(st, prop): """ Return a list of indices of atoms in the given structure that have the given property defined. :type st: schrodinger.structure.Structure :param st: the structure :type prop: str :param prop: the property that marks the atoms :rtype: list :return: contains indices of atoms """ afunc = getattr(mm, PROP_TYPE_GET_DICT[prop[0]]) try: # if no exception then atom index 1 has the property # defined afunc(st, prop, 1) except mm.MmException as mmexc: # either is not available as an atom property # or is available but no atom defines it if mmexc.rc == mm.MMCT_ATOM_PROPERTY_NOT_DEFINED_IN_CT: gather = False # atom index other than 1 has the property defined elif mmexc.rc == mm.MMCT_ATOM_PROPERTY_UNDEFINED_ATOM: gather = True else: raise else: gather = True idxs = [] if gather: for atom in st.atom: if atom.property.get(prop): idxs.append(atom.index) return idxs
[docs]def get_restrain_atom_idxs(st): """ Return a list of indices of restrain atoms in the given structure. :type st: schrodinger.structure.Structure :param st: the structure :rtype: list :return: contains indices of restrain atoms """ return get_idxs_marked_atoms(st, RESTRAINED_ATOM_KEY)
[docs]class InvalidInput(Exception): pass
[docs]def get_idx_groups(text): """ Get index groups from the given string. :type text: str :param text: the string :raise: InvalidInput if there is a formatting issue :rtype: list :return: contains list of indices """ # valid patterns are like '(i,j,...);(k,l,...);...' idxs = [] for token in text.split(SEPARATOR): if not token: continue try: obj = eval(token) except (SyntaxError, NameError): raise InvalidInput if not isinstance(obj, tuple): raise InvalidInput if not all(isinstance(i, int) for i in obj): raise InvalidInput idxs.append(list(obj)) return idxs
def _get_restrain_group_idxs(st, prop): """ Return a list of lists of indices of a restrain group in the given structure. :type st: schrodinger.structure.Structure :param st: the structure :type prop: str :param prop: the restrain property :rtype: list :return: contains lists of restrain indices """ idxs = st.property.get(prop, []) if idxs: idxs = get_idx_groups(idxs) return idxs
[docs]def get_restrain_distance_idxs(st): """ Return a list of lists of indices of restrain distances in the given structure. :type st: schrodinger.structure.Structure :param st: the structure :rtype: list :return: contains lists of restrain distances """ return _get_restrain_group_idxs(st, RESTRAINED_DISTANCES_KEY)
[docs]def get_restrain_angle_idxs(st): """ Return a list of lists of indices of restrain angles in the given structure. :type st: schrodinger.structure.Structure :param st: the structure :rtype: list :return: contains lists of restrain angles """ return _get_restrain_group_idxs(st, RESTRAINED_ANGLES_KEY)
[docs]def get_restrain_dihedral_idxs(st): """ Return a list of lists of indices of restrain dihedrals in the given structure. :type st: schrodinger.structure.Structure :param st: the structure :rtype: list :return: contains lists of restrain dihedrals """ return _get_restrain_group_idxs(st, RESTRAINED_DIHEDRALS_KEY)
[docs]def get_idx_groups_str(idx_groups): """ Get a string representation of the given index groups. :type idx_groups: list :param idx_groups: contains lists of indices :rtype: str :return: the string """ strs = [] for idx_group in idx_groups: astr = '(' + INDEX_SEPARATOR.join([str(i) for i in idx_group]) + ')' strs.append(astr) return SEPARATOR.join(strs)
[docs]def get_jaguar_keywords_list(jaguar_keywords_dict): """ Return the Jaguar keywords list from the given dict. :type jaguar_keywords_dict: OrderedDict :param jaguar_keywords_dict: the Jaguar keywords dict :rtype: list :return: the Jaguar keywords list """ return ['='.join(item) for item in jaguar_keywords_dict.items()]
[docs]def type_cast_seed(seed): """ Type cast the seed. :type seed: str or unicode :param seed: seed for random number generator :rtype: int :return: the seed """ return parserutils.type_random_seed( seed, seed_min=go.CONF_SEARCH_SEED_RANGE[0], seed_max=go.CONF_SEARCH_SEED_RANGE[1])
[docs]def type_cast_jaguar_keywords(jaguar_keywords, reserved_keys=RESERVED_JAGUAR_KEYS, exception_type=argparse.ArgumentTypeError): """ Type cast the Jaguar keywords. :type jaguar_keywords: str or unicode or list :param jaguar_keywords: the Jaguar keywords, a whitespace delimited string of '<key>=<value>' tokens or a list of such tokens :type reserved_keys: set :param reserved_keys: contains reserved Jaguar keys :type exception_type: type :param exception_type: the exception type to raise if invalid :rtype: OrderedDict :return: the Jaguar keywords OrderedDict """ if not jaguar_keywords: msg = ('Jaguar keywords must be provided.') raise exception_type(msg) if isinstance(jaguar_keywords, list): jaguar_keywords = ' '.join(jaguar_keywords) jaguar_keywords = str(jaguar_keywords) try: jaguarworkflows.keyword_string_to_dict(jaguar_keywords) except ValueError as err: msg = ('The following issue has been found with the ' 'specified Jaguar keywords: {err}. Such options ' 'must be specified using whitespace separated ' '<key>=<value> pairs.').format(err=str(err)) raise exception_type(msg) adict = OrderedDict([token.split('=') for token in jaguar_keywords.split()]) if reserved_keys.intersection(set(adict.keys())): msg = ('The following Jaguar keys are reserved for this workflow: ' '{keys}.').format(keys=sorted(reserved_keys)) raise exception_type(msg) return adict
[docs]def check_ff_assignment(sts, ffld_name=DEFAULT_FORCE_FIELD_NAME): """ Check the assignment of the given force field to the given structures. :type sts: list :param sts: contains schrodinger.structure.Structure :type ffld_name: str :param ffld_name: the force field name :raise ValueError: if invalid """ # currently the following does not support an mmlewis_apply # clean up layer (which can change the incoming atom types), # see minimize.Minimizer.setStructure for more details ffld_version = mm.opls_name_to_version(ffld_name) invalid_sts = desmondutils.find_forcefield_invalid_structures( sts, ffld_version) invalid_idxs = [str(sts.index(st) + 1) for st in invalid_sts] if invalid_idxs: idxs = ','.join(invalid_idxs) msg = ('The {name} force field could not be assigned to ' 'the following structures: {idxs}.').format( name=ffld_name, idxs=idxs) raise ValueError(msg)
[docs]def get_molecular_weight(st, idxs=None): """ Return the molecular weight (amu) taken over the given atom indices in the given structure. :type st: schrodinger.structure.Structure :param st: the structure :type idxs: list :param idxs: the atom indices :rtype: float :return: the molecular weight (amu) """ # both st.total_weight and atom.atomic_weight use implicit hydrogens # so do it by hand if not idxs: idxs = range(1, st.atom_total + 1) weight = 0 for idx in idxs: weight += mm.mmelement_get_atomic_weight_by_atomic_number( st.atom[idx].atomic_number) return weight
def _get_molecular_weight(st, idxs=None): """ Return the molecular weight (amu) taken over the given atom indices in the given structure. :type st: schrodinger.structure.Structure :param st: the structure :type idxs: list :param idxs: the atom indices :rtype: float :return: the molecular weight (amu) """ # see MATSCI-6764 where a precision issue was detected return round(get_molecular_weight(st, idxs=idxs), 3)
[docs]def check_conformers(conformers, conformers_group_hash): """ Check conformers. If the given structures are conformers then their atom numberings are all changed in place so that they are equivalent to that of the first of the given conformers. :type conformers: list :param conformers: contains schrodinger.structure.Structure of conformers :type conformers_group_hash: str :param conformers_group_hash: a group hash :raise ValueError: if invalid """ smiles, masses = set(), set() for st in conformers: smiles.add(analyze.generate_smiles(st)) masses.add(_get_molecular_weight(st)) if len(smiles) > 1 or len(masses) > 1: msg = ('At least a single structure in conformer group ' '{group} does not belong because it is not a ' 'conformer.').format(group=conformers_group_hash) raise ValueError(msg) # see MATSCI-5538 - where it was found that MacroModel conformational # search requires input conformers to have identical atom ordering, # force all conformers to be numbered like the first conformer, this # is also needed for the following validation # see MATSCI-8580 - where it was found that a water molecule was # renumbered even though it appears that it didn't need to be, # use_symmetry=False prevents it first_conformer = conformers[0] for idx, conformer in enumerate(conformers[1:], 1): conformers[idx] = rmsd.renumber_conformer( first_conformer, conformer, use_symmetry=False, in_place=True, has_hydrogens=True) # yapf: disable check_key_is_atom_key_pairs = [ (REACTION_WF_STRUCTURE_KEY, False), (CONFORMERS_GROUP_KEY, False), (SIBLING_GROUP_KEY, False), (PARENT_SIBLING_GROUPS_KEY, False), (CHARGE_KEY, False), (MULTIPLICITY_KEY, False), (KEEP_ATOM_KEY, True), (RESTRAINED_ATOM_KEY, True), (RESTRAINED_DISTANCES_KEY, False), (RESTRAINED_ANGLES_KEY, False), (RESTRAINED_DIHEDRALS_KEY, False), (TRANSITION_STATE_STRUCTURE_KEY, False) ] # yapf: enable for key, is_atom_key in check_key_is_atom_key_pairs: try: _check_structures_for_property_equivalence( conformers, key=key, is_atom_key=is_atom_key) except ValueError as err: msg = ('The structures in conformer group {group} are ' 'invalid. {err}').format( group=conformers_group_hash, err=str(err)) raise ValueError(msg) # check superposition differently because of the way the data # is stored in order to honor ordering, sets of indices need # to be sorted because rmsd.renumber_conformers can return # inequivalent atom orderings, perhaps because it doesn't # consider what is considered here to actually be conformers # (see for example unit test # reaction_workflow_utils_test.FunctionTester.test_check_conformers # when run without the following sort) all_idxs = set() for st in conformers: idxs = [] for atom in st.atom: idx = atom.property.get(SUPERPOSITION_ATOM_KEY) if idx: idxs.append(idx) all_idxs.add(tuple(sorted(idxs))) if len(all_idxs) > 1: msg = ('The structures in conformer group {group} are ' 'invalid. The given structures have inequivalent {key} ' 'properties.').format( group=conformers_group_hash, key=SUPERPOSITION_ATOM_KEY) raise ValueError(msg)
[docs]def check_reaction_wf_structures(rxn_sts, ffld_name=None, mass_conserved=False, keep_atoms_only=False): """ Check the given reaction workflow structues. :type rxn_sts: str or list :param rxn_sts: the reaction workflow structures, a file name or list of schrodinger.structure.Structure :type ffld_name: str :param ffld_name: the force field name to use when optionally checking its assignment to the given structures :type mass_conserved: bool :param mass_conserved: check that mass is conserved (see also keep_atoms_only kwarg) :type keep_atoms_only: bool :param keep_atoms_only: specifies that only keep atoms be considered when checking if mass is conserved (see also mass_conserved kwarg) :raise ValueError: if invalid """ if isinstance(rxn_sts, str): if not fileutils.is_maestro_file(rxn_sts): msg = ('The file {afile} is not a Maestro file.' ).format(afile=rxn_sts) raise ValueError(msg) if not fileutils.get_basename(rxn_sts).endswith(REACTION_WF_TAG): msg = ('The file {afile} is not a reaction workflow file ' 'as its base name does not end in {tag}.').format( afile=rxn_sts, tag=REACTION_WF_TAG) raise ValueError(msg) rxn_sts = [st for st in structure.StructureReader(rxn_sts)] titles = set() for idx, st in enumerate(rxn_sts, 1): if not st.property.get(REACTION_WF_STRUCTURE_KEY): msg = ('Structure number {idx} does not belong to a ' 'reaction workflow.').format(idx=idx) raise ValueError(msg) if not st.property.get(CONFORMERS_GROUP_KEY): msg = ('Structure number {idx} does not belong to a ' 'conformer group.').format(idx=idx) raise ValueError(msg) titles.add(st.title) if len(titles) < len(rxn_sts): msg = ('Structures must have unique titles.') raise ValueError(msg) sibling_conformers_dict = bin_structures_by_property( rxn_sts, key=SIBLING_GROUP_KEY, inner_key=CONFORMERS_GROUP_KEY) getters = [ get_restrain_distance_idxs, get_restrain_angle_idxs, get_restrain_dihedral_idxs ] found_parents = True masses = set() for sibling_group, conformers_dict in sibling_conformers_dict.items(): num_tss = 0 parents = set() mass = 0 for conformer_group, conformers in conformers_dict.items(): # check conformers check_conformers(conformers, conformer_group) first_conformer = conformers[0] num_tss += first_conformer.property.get( TRANSITION_STATE_STRUCTURE_KEY, 0) parents.add(first_conformer.property.get(PARENT_SIBLING_GROUPS_KEY)) # check restraint idxs keep_idxs = get_idxs_marked_atoms(first_conformer, KEEP_ATOM_KEY) if keep_idxs: all_restrain_idxs = set() for getter in getters: for restrain_idxs in getter(first_conformer): all_restrain_idxs.update(restrain_idxs) if not all_restrain_idxs.issubset(keep_idxs): msg = ( f'The structures in conformer group {conformer_group} ' 'have atoms involved in distance, angle, and/or dihedral ' 'restraints that are not among the found keep atoms.') raise ValueError(msg) if keep_idxs and keep_atoms_only: mass += _get_molecular_weight(first_conformer, idxs=keep_idxs) else: mass += _get_molecular_weight(first_conformer) masses.add(mass) # check transition states if num_tss > 1: msg = ( f'The structures in sibling group {sibling_group} can not have ' 'more than a single transition state.') raise ValueError(msg) # check parent equivalency if len(parents) > 1: msg = ( f'The structures in sibling group {sibling_group} do not have ' 'identical parents.') raise ValueError(msg) # check parent existence parents_tmp = list(parents)[0] if parents_tmp: for parent in parents_tmp.split(PARENT_SEPARATOR): if parent not in sibling_conformers_dict.keys(): msg = ( f'Sibling group {sibling_group} specifies a non-existent parent ' f'sibling group {parent}.') raise ValueError(msg) # check reactant siblings if None in parents: if not found_parents: msg = (f'Structures in sibling group {sibling_group} ' 'have no parents specified. All groups ' 'other than reactant siblings must ' 'have parent groups defined.') raise ValueError(msg) found_parents = False if found_parents: msg = ('No reactants were found.') raise ValueError(msg) if mass_conserved and len(masses) > 1: msg = ('Mass is not conserved along the reaction path.') raise ValueError(msg) if ffld_name: check_ff_assignment(rxn_sts, ffld_name=ffld_name)
[docs]def type_cast_reaction_wf_input(reaction_wf_input, exception_type=argparse.ArgumentTypeError, mass_conserved=False): """ Type cast the reaction workflow input. :type reaction_wf_input: str or unicode or list :param reaction_wf_input: the reaction workflow input, a file name or list of schrodinger.structure.Structure :type exception_type: type :param exception_type: the exception type to raise if invalid :type mass_conserved: bool :param mass_conserved: check that mass is conserved :rtype: str or list :return: the reaction workflow input, a file name or list of schrodinger.structure.Structure """ if not isinstance(reaction_wf_input, list): reaction_wf_input = str(reaction_wf_input) if not os.path.exists(reaction_wf_input): msg = ('The file {afile} does not exist.' ).format(afile=reaction_wf_input) raise exception_type(msg) if not fileutils.is_maestro_file(reaction_wf_input): msg = ('The file {afile} is not a Maestro file.' ).format(afile=reaction_wf_input) raise exception_type(msg) if not fileutils.get_basename(reaction_wf_input).endswith( REACTION_WF_TAG): msg = ('The file {afile} is not a reaction workflow file ' 'as its base name does not end in {tag}.').format( afile=reaction_wf_input, tag=REACTION_WF_TAG) raise exception_type(msg) reaction_wf_input_sts = [ st for st in structure.StructureReader(reaction_wf_input) ] else: reaction_wf_input_sts = list(reaction_wf_input) if not reaction_wf_input_sts: msg = ('No structures have been provided.') raise exception_type(msg) try: check_reaction_wf_structures( reaction_wf_input_sts, ffld_name=None, mass_conserved=mass_conserved) except ValueError as err: msg = str(err) + (' A valid reaction workflow input file must ' 'first be prepared.') raise exception_type(msg) return reaction_wf_input
[docs]def bin_structures_by_property(sts, key=CONFORMERS_GROUP_KEY, inner_key=None): """ Return a dictionary of structures binned by a property with the given key. If inner_key is provided then return a dictionary of dictionaries of structures with the inner dictionaries keyed by inner_key and outer dictionaries keyed by key. :type sts: list :param sts: the structures :type key: str :param key: the key for the property by which to bin :type inner_key: str :param inner_key: additionally bin by this inner_key :rtype: dict or dict of dict :return: dictionary where keys are properties and values are lists of structures or dictionary of dictionaries where the outer dictionary is keyed by key and inner dictionary is keyed by inner_key and values of the inner dictionary are lists of structures """ # for backwards compatibility in the data structure if the inner # property exists but the outer property does not then for the # outer property use the inner property and update the structure adict = {} for st in sts: aprop = st.property.get(key) if inner_key: inner_aprop = st.property.get(inner_key) if aprop is None and inner_aprop: aprop = inner_aprop st.property[key] = aprop adict.setdefault(aprop, {}).setdefault(inner_aprop, []).append(st) else: adict.setdefault(aprop, []).append(st) return adict
def _check_structures_for_property_equivalence(sts, key=RESTRAINED_ATOM_KEY, is_atom_key=True): """ Check that the given structures have equivalent property values for the given structure or atom key. If it is an atom key then the key must be a boolean key as this function only ensures that the groups of atoms for which the given property exists are equivalent as opposed to checking equivalence among the property values. :type sts: list :param sts: the structures :type key: str :param key: the key for the property for which to check equivalence :type is_atom_key: bool :param is_atom_key: True if the given key is an atom key in which case only property existence is checked as opposed to property values :raise ValueError: if invalid """ aprops = set() for st in sts: if is_atom_key: aprop = tuple(get_idxs_marked_atoms(st, key)) if not aprop: aprop = None else: aprop = st.property.get(key) aprops.add(aprop) if len(aprops) > 1: msg = ('The given structures have inequivalent {key} properties.' ).format(key=key) raise ValueError(msg)
[docs]def append_unique_conformers(sts, unique_sts, rmsd_thresh=CSEARCH_RMSD_THRESH, n_conformers=None): """ Append any unique conformers found in the given structures to the given unique structures. :type sts: list :param sts: schrodinger.structure.Structure candidate conformers :type unique_sts: list :param unique_sts: schrodinger.structure.Structure unique conformers :type rmsd_thresh: float :param rmsd_thresh: the maximum allowable RMSD (Ang.) between two structures before they can be considered different conformers :type n_conformers: int or None :param n_conformers: number of sought conformers or None if there isn't one """ # heavy atoms only idxs = [atom.index for atom in sts[0].atom if atom.atomic_number != 1] # consider how many unique conformers are given if not unique_sts: unique_sts.append(sts[0]) start_idx = 1 else: start_idx = 0 # early exit if there are sufficient conformers if n_conformers and len(unique_sts) == n_conformers: return for st in sts[start_idx:]: for unique_st in unique_sts: armsd = rmsd.superimpose( st, idxs, unique_st, idxs, use_symmetry=False, move_which=rmsd.CT) if armsd < rmsd_thresh: # non-unique conformer found break else: # unique conformer found unique_sts.append(st) # early exit if there are sufficient conformers if n_conformers and len(unique_sts) == n_conformers: return
[docs]def get_conformers(sts, n_conformers, pp_rel_energy_thresh=PP_CSEARCH_REL_ENERGY_THRESH, rmsd_thresh=CSEARCH_RMSD_THRESH): """ Return either (1) at most the given number of conformers or (2) all conformers with relative energies less than the given value. If (2) then an attempt is made to return at least the given number of conformers even if that means having relative energies larger than the given value. :type sts: list :param sts: schrodinger.structure.Structure conformers :type n_conformers: int :param n_conformers: either the maximum number of conformers if pp_rel_energy_thresh is None or a target minimum number of conformers if pp_rel_energy_thresh is given :type pp_rel_energy_thresh: None or float :param pp_rel_energy_thresh: relative energy threshold in kJ/mol, if None then only the n_conformers lowest energy conformers are returned :type rmsd_thresh: float :param rmsd_thresh: the maximum allowable RMSD (Ang.) between two structures before they can be considered different conformers :rtype: list :return: schrodinger.structure.Structure conformers """ # first sort by total energy sorted_sts = sorted(sts, key=lambda x: x.property[MMOD_ENERGY_KEY]) if pp_rel_energy_thresh is None: key = MMOD_ENERGY_KEY else: # second sort by relative energy key = MMOD_REL_ENERGY_KEY sorted_sts = sorted(sorted_sts, key=lambda x: x.property[key]) # get unique (in the sense of RMSD) conformers unique_sts = [] if pp_rel_energy_thresh is None: append_unique_conformers( sorted_sts, unique_sts, rmsd_thresh=rmsd_thresh, n_conformers=n_conformers) else: # collect conformers with the proper relative energies sts = [] for idx, st in enumerate(sorted_sts): if st.property[key] > pp_rel_energy_thresh: break sts.append(st) # uniqueify them append_unique_conformers( sts, unique_sts, rmsd_thresh=rmsd_thresh, n_conformers=None) # if there is an insufficient number of conformers then try # to obtain at least n_conformers if n_conformers > len(unique_sts): append_unique_conformers( sorted_sts[idx:], unique_sts, rmsd_thresh=rmsd_thresh, n_conformers=n_conformers) return unique_sts
[docs]def get_int_tuples_from_str_property(st, key, separator=SEPARATOR): """ Return a list of tuples of integers from the given string structure property. :type st: schrodinger.structure.Structure :param st: the structure :type key: str :param key: the property key :type separator: str :param separator: the tuple separator used for the given property :rtype: list :return: contains tuples of integers """ aproperty = st.property.get(key) if not aproperty: return [] else: return [eval(token) for token in aproperty.split(separator)]
[docs]def update_index_properties(st, old_to_new): """ Update the index properties of the given structure. :type st: `structure.Structure` :param st: the structure :type old_to_new: dict :param old_to_new: a map of old-to-new atom indices """ keys = (RESTRAINED_DISTANCES_KEY, RESTRAINED_ANGLES_KEY, RESTRAINED_DIHEDRALS_KEY) for key in keys: idxs = _get_new_restrain_group_idxs(st, key, old_to_new) text = get_idx_groups_str(idxs) if text: st.property[key] = text
[docs]def get_core_idxs(st): """ Return a set of atom indices for the core of the given structure. :type st: schrodinger.structure.Structure :param st: the structure :rtype: set :return: core atom indices """ # keep atoms are not to be considered as core atoms atom_keys = [SUPERPOSITION_ATOM_KEY, RESTRAINED_ATOM_KEY] st_keys = [ RESTRAINED_DISTANCES_KEY, RESTRAINED_ANGLES_KEY, RESTRAINED_DIHEDRALS_KEY ] idxs = set() for atom in st.atom: for key in atom_keys: if atom.property.get(key): idxs.add(atom.index) for key in st_keys: for jdxs in _get_restrain_group_idxs(st, key): idxs.update(jdxs) return idxs
def _get_new_restrain_group_idxs(st, key, old_to_new, offset=0): """ Return new restrain group indices. :type st: schrodinger.structure.Structure :param st: the structure :type key: str :param key: determines the type of restrain coordinate :type old_to_new: dict :param old_to_new: the old-to-new atom index map :type offset: int :param offset: an offset added to all new restrain group indices, useful when assembling structures from parts of other structures :rtype: list :return: contains new restrain group indices """ # skip the entire group of old indices if any of them either (1) # are not present in the map or (2) present but map to None new_restrain_idxs = [] for old_list in _get_restrain_group_idxs(st, key): try: new_list = [old_to_new[idx] for idx in old_list] except KeyError: continue if None in new_list: continue new_restrain_idxs.append([idx + offset for idx in new_list]) return new_restrain_idxs
[docs]def representative_conformers(sibling_conformers_dict, specific_sibling_group=None): """ Generator over representative conformers. :type sibling_conformers_dict: dict :param sibling_conformers_dict: dictionary of dictionaries where the outer dictionary is keyed by sibling and inner dictionary is keyed by conformer and values of the inner dictionary are lists of structures :type specific_sibling_group: str or None :param specific_sibling_group: if not None then restrict conformers to be generated only over this sibling group :rtype: tuple :return: the sibling and conformer group names and the representative conformer or None if one doesn't exist """ # all conformers in each list of structures are identical (having identical # properties, etc.) and so choose the first as the representative for sibling_group, conformers_dict in sibling_conformers_dict.items(): if specific_sibling_group and specific_sibling_group != sibling_group: continue for conformer_group, conformers in conformers_dict.items(): if conformers: conformer = conformers[0] else: conformer = None yield tuple([sibling_group, conformer_group, conformer])
[docs]class RepresentativeConformersMixin(object):
[docs] def representativeConformers(self, sibling_conformers_dict=None, specific_sibling_group=None): """ Generator over representative conformers. :type sibling_conformers_dict: dict or None :param sibling_conformers_dict: dictionary of dictionaries where the outer dictionary is keyed by sibling and inner dictionary is keyed by conformer and values of the inner dictionary are lists of structures, if None then the class attr is used :type specific_sibling_group: str or None :param specific_sibling_group: if not None then restrict conformers to be generated only over this sibling group :rtype: tuple :return: the sibling and conformer group names and the representative conformer """ if sibling_conformers_dict is None: sibling_conformers_dict = self.sibling_conformers_dict return representative_conformers( sibling_conformers_dict, specific_sibling_group=specific_sibling_group)
[docs]class ReactionWorkflowFile(RepresentativeConformersMixin): """ Manage a reaction workflow file. """
[docs] def __init__(self, rxnwf_file): """ Create an instance. :type rxnwf_file: str :param rxnwf_file: the reaction workflow file """ sts = [st for st in structure.StructureReader(rxnwf_file)] self.sibling_conformers_dict = bin_structures_by_property( sts, key=SIBLING_GROUP_KEY, inner_key=CONFORMERS_GROUP_KEY)
[docs] def getReactantsSiblingGroupName(self): """ Return the reactants sibling group name. :rtype: str :return: the reactants sibling group name """ for sibling_group, conformer_group, conformer in self.representativeConformers( ): parents = conformer.property.get(PARENT_SIBLING_GROUPS_KEY) if not parents: return sibling_group
[docs] def getReactantsConformersDict(self): """ Return the reactants conformers dictionary. :rtype: dict :return: keys are conformer group names, values are lists of `structure.Structure` """ sibling_group = self.getReactantsSiblingGroupName() return self.sibling_conformers_dict[sibling_group]
[docs]class ReactionWorkflowEnergyAnalysis(ReactionWorkflowFile): """ Manage a reaction workflow energy analysis. """
[docs] def __init__(self, rxnwf_file, energy_keys): """ Create an instance. :type rxnwf_file: str :param rxnwf_file: the reaction workflow file :type energy_keys: list :param energy_keys: structure property energy keys to consider, if it is temperature dependent then include the temperature (K) as a number followed by 'K' in the key and the corresponding energy must be in supported units (au, kcal/mol, eV, kJ/mol) and must be present in the key as '(<units>)' """ super().__init__(rxnwf_file) self.energy_keys = energy_keys
[docs] @staticmethod def getHeader(energy_key): """ Return a header for the given energy key. :type energy_key: str :param energy_key: structure property energy key :rtype: str :return: the header """ # handle prefix in r_<product>_<header> header = '_'.join(energy_key.split('_')[2:]) conversion = ReactionWorkflowEnergyAnalysis.getKcalPerMolConversion( energy_key) if conversion: _units = ReactionWorkflowEnergyAnalysis.getUnits(energy_key) if _units in header: header = header.replace(f'({_units})', '(kcal/mol)') else: if STAGE_SEPARATOR in header: head, tail = header.split(STAGE_SEPARATOR) header = head + '_(kcal/mol)' + STAGE_SEPARATOR + tail else: header += '_(kcal/mol)' return header
[docs] @staticmethod def getTemperature(energy_key): """ Return the temperature (K) for the given energy key. :type energy_key: str :param energy_key: structure property energy key :rtype: float, None :return: the temperature (K) if there is one """ return jaguarworkflows.get_temperature(energy_key)
[docs] @staticmethod def getUnits(energy_key): """ Return the units for the given energy key. :type energy_key: str :param energy_key: structure property energy key :rtype: str, None :return: the units if there is one """ # units like 'kcal/mol' in '(kcal/mol)' match = re.search(r'\((\D+)\)', energy_key) if match: return match.group(1) base_energy_key = energy_key.split(STAGE_SEPARATOR)[0] _units = jmswfu.JAGUAR_PROP_UNITS_DICT.get(base_energy_key) if _units: return _units[1:-1]
[docs] @staticmethod def getKcalPerMolConversion(energy_key): """ Return the kcal/mol conversion factor for the given energy key. :type energy_key: str :param energy_key: structure property energy key :rtype: float, None :return: the kcal/mol conversion factor if there is one """ _units = ReactionWorkflowEnergyAnalysis.getUnits(energy_key) if _units in HARTREE_UNITS: conversion = units.KCAL_PER_MOL_PER_HARTREE elif _units in KCAL_PER_MOL_UNITS: conversion = 1 elif _units in EV_UNITS: conversion = units.KCAL_PER_MOL_PER_EV elif _units in KJOULES_PER_MOL_UNITS: conversion = units.KCAL_PER_MOL_PER_EV conversion /= units.KJOULES_PER_MOL_PER_EV else: conversion = None return conversion
def _getTotalEnergyEnsemble(self, conformers_dict, energy_key, conversion, temp, do_boltzmann, include_x_terms=True, only_lowest_energy=False): """ Return an ensemble of total energies for the given conformers dictionary of siblings and given energy key. :type conformers_dict: dict :param conformers_dict: keys are conformer group names, values are lists of `structure.Structure` :type energy_key: str :param energy_key: the relevant energy key :type conversion: float :param conversion: the energy conversion factor to kcal/mol :type temp: float :param temp: the temperature in K :type do_boltzmann: bool :param do_boltzmann: if True perform a Boltzmann average, otherwise an algebraic average :type include_x_terms: bool :param include_x_terms: whether to include cross terms in the conformational averaging :type only_lowest_energy: bool :param only_lowest_energy: use only the lowest energy conformer rather than averaging over conformers :rtype: list :return: ensemble of total energies """ if include_x_terms and only_lowest_energy: msg = ('Simultaneous use of include_x_terms and ' 'only_lowest_energy is not supported.') raise ReactionWorkflowException(msg) # the following getter takes a list that contains a sublist of conformers # for each reactant, if including cross terms then return the iterator # for all possible collections of mixed conformers of reactants else return # the original list getter = lambda x: itertools.product(*x) if include_x_terms else x ensemble = [] for sample in getter(conformers_dict.values()): energies = [st.property.get(energy_key) for st in sample] if None in energies: return [] if include_x_terms: ensemble.append(sum(energies)) else: min_energy = min(energies) if only_lowest_energy: ensemble.append(min_energy) else: if do_boltzmann: deltas = conversion * ( numpy.array(energies) - min_energy) factors = numpy.exp(-1 * deltas / (IDEAL_GAS_CONSTANT * temp)) avg_delta = sum([ delta * factor for delta, factor in zip(deltas, factors) ]) partition_f = sum(factors) avg_delta /= partition_f avg_delta /= conversion # handle conversion later energy = min_energy + avg_delta # return total energy ensemble.append(energy) else: ensemble.append(numpy.mean(energies)) if not include_x_terms and ensemble: ensemble = [sum(ensemble)] return ensemble
[docs] def getConfAvgRelEnergies(self, include_x_terms=True, only_lowest_energy=False): """ Return the conformationally averaged energies relative to that of the reactants. :type include_x_terms: bool :param include_x_terms: whether to include cross terms in the conformational averaging :type only_lowest_energy: bool :param only_lowest_energy: use only the lowest energy conformer rather than averaging over conformers :raise ReactionWorkflowException: if there is an issue :rtype: dict :return: keys are sibling group names, values are dicts with energy keys as keys and energy values as values """ if include_x_terms and only_lowest_energy: msg = ('Simultaneous use of include_x_terms and ' 'only_lowest_energy is not supported.') raise ReactionWorkflowException(msg) # with cross terms in the conformational averaging all possible # collections of conformers within a sibling group as well # between sibling groups are included in the averaging which # is done last, for example a high energy conformer of A is # allowed to react with a low energy conformer of B, etc. and that # relative to some collection of conformers from another sibling # group both (high, low), (low, high), etc. collections of (A, B) # are included, without cross terms the averaging is done first # over each molecule in the sibling group thereby reducing the # energy of the group down to a single number # canonical ensemble # relevant energies reported relative to reactants reactants_sibling_group = self.getReactantsSiblingGroupName() reactants_conformers_dict = self.getReactantsConformersDict() # for each energy key for both reactants and non-reactants # obtain a conformeric ensemble of energies summed over siblings # and collect energy differences for all pairs then perform a # Boltzmann average if the energy is both temperature dependent # and of known units and including cross terms, otherwise perform # an algebraic average edict = {} for energy_key in self.energy_keys: conversion = self.getKcalPerMolConversion(energy_key) temp = self.getTemperature(energy_key) do_boltzmann = conversion and temp conversion = conversion or 1 reactants_energies = self._getTotalEnergyEnsemble( reactants_conformers_dict, energy_key, conversion, temp, do_boltzmann, include_x_terms=include_x_terms, only_lowest_energy=only_lowest_energy) if not reactants_energies: # this means that at least a single reactant doesn't # have the given energy_key defined, so skip the energy_key continue header = self.getHeader(energy_key) edict.setdefault(reactants_sibling_group, {})[header] = 0. for sibling_group, conformers_dict in self.sibling_conformers_dict.items( ): if sibling_group == reactants_sibling_group: continue energies = self._getTotalEnergyEnsemble( conformers_dict, energy_key, conversion, temp, do_boltzmann, include_x_terms=include_x_terms, only_lowest_energy=only_lowest_energy) if not energies: # this means that at least a single non-reactant child doesn't # have the given energy_key defined, so first undefine it for # all sibling groups and then skip the energy_key by breaking # out of the non-reactant child loop for _edict in edict.values(): _edict.pop(header, None) break deltas = numpy.array([ conversion * (enr - er) for enr, er in itertools.product(energies, reactants_energies) ]) if do_boltzmann and include_x_terms: factors = numpy.exp(-1 * deltas / (IDEAL_GAS_CONSTANT * temp)) avg_delta = sum([ delta * factor for delta, factor in zip(deltas, factors) ]) partition_f = sum(factors) # see MATSCI-7153 - where the denominator can be problematic with warnings.catch_warnings(): warnings.filterwarnings('error') try: avg_delta /= partition_f except RuntimeWarning: msg = ( 'Large energy differences with respect to reactants ' 'have been detected. This is typically a sign of a severely ' 'imbalanced chemical reaction in the reaction workflow input ' 'file.') raise ReactionWorkflowException(msg) else: avg_delta = numpy.mean(deltas) edict.setdefault(sibling_group, {})[header] = avg_delta return edict
[docs] def getGraph(self): """ Return a NetworkX directed graph of the reaction workflow energy level diagram. :rtype: networkx.DiGraph :return: nodes are sibling group names and have energy dictionary kwargs, edges are directed (parent, child) pairs """ edict_w_x = self.getConfAvgRelEnergies( include_x_terms=True, only_lowest_energy=False) edict_wo_x = self.getConfAvgRelEnergies( include_x_terms=False, only_lowest_energy=False) edict_lowest = self.getConfAvgRelEnergies( include_x_terms=False, only_lowest_energy=True) graph = nx.DiGraph() for sibling_group, conformers_dict in self.sibling_conformers_dict.items( ): graph.add_node( sibling_group, edict_w_x=edict_w_x[sibling_group], edict_wo_x=edict_wo_x[sibling_group], edict_lowest=edict_lowest[sibling_group]) # parents are equivalent just grab an arbitrary first item parents = next(iter(conformers_dict.values()))[0].property.get( PARENT_SIBLING_GROUPS_KEY) if not parents: continue for parent_sibling_group in parents.split(PARENT_SEPARATOR): graph.add_edge(parent_sibling_group, sibling_group) return graph
[docs] @staticmethod def getOrderedNodeNames(graph): """ Return an ordered list of node names from the given graph. :type graph: networkx.DiGraph :param graph: nodes are sibling group names and have energy dictionary kwargs, edges are directed (parent, child) pairs :rtype: list :return: contains ordered node names """ # for example, all lines point right, up-right, or down-right # (just a non-physical example): # # I2 -- TS2 # / \ / \ # TS1 I3 P # / \ / # R I1 # # is ordered as R,TS1,I1,I2,I3,TS2,P # find the reactant node for node in graph.nodes(): if not list(graph.predecessors(node)): reactant_node = node break # start with the reactant node nodes = [reactant_node] current_nodes = [reactant_node] # collect nodes in order by adding for the current nodes # all successor nodes that have no predecessor nodes that # have not already been collected, currently successor nodes # are sorted by name rather than energy, avoid adding the # same node twice which can happen for loops in the graph, # the current nodes to search are updated until all nodes # have been collected while len(nodes) < graph.number_of_nodes(): tmp = [] for current_node in current_nodes: for successor_node in sorted(graph.successors(current_node)): if not set(graph.predecessors(successor_node)).difference( nodes): if successor_node not in nodes: nodes.append(successor_node) tmp.append(successor_node) current_nodes = list(tmp) return nodes
[docs] def writeDataFiles(self): """ Write data files. """ # get ordered nodes graph = self.getGraph() node_names = self.getOrderedNodeNames(graph) base_name = jobutils.get_jobname(DEFAULT_JOB_NAME) out_files = [] # handle the conformationally averaged energies relative to reactants for ext in REL_REACTANTS_EXTS: conf_avg_rel_reactants_file = base_name + ext kwarg = REL_EXT_TO_KWARG_DICT[ext] with csv_unicode.writer_open(conf_avg_rel_reactants_file) as afile: writer = csv.writer(afile) for idx, name in enumerate(node_names): edict = graph.node[name][kwarg] if not idx: headers = sorted(edict.keys()) writer.writerow(['Sibling_Group_Name'] + headers) values = [name] + [edict[header] for header in headers] writer.writerow(values) out_files.append(conf_avg_rel_reactants_file) # handle the conformationally averaged energies relative to parents if len(node_names) > 1: for ext in REL_PARENTS_EXTS: conf_avg_rel_parents_file = base_name + ext kwarg = REL_EXT_TO_KWARG_DICT[ext] final_edict = {} for child_name in node_names: child_edict = graph.node[child_name][kwarg] parents_edict = {} for parent_name in graph.predecessors(child_name): parent_edict = graph.node[parent_name][kwarg] parents_edict[parent_name] = dict( [(key, child_edict[key] - parent_edict[key]) for key in child_edict.keys()]) if parents_edict: final_edict[child_name] = parents_edict with open(conf_avg_rel_parents_file, 'w') as afile: json.dump( final_edict, afile, sort_keys=True, indent=4, separators=(',', ':')) out_files.append(conf_avg_rel_parents_file) backend = None for out_file in out_files: jobutils.add_outfile_to_backend(out_file, backend)
[docs]def get_stage_idx(astr): """ Return the stage index from the given string. :type astr: str :param astr: the string :rtype: int :return: the stage index """ # input can be a file astr = fileutils.get_basename(astr) head, stage_hash = astr.split(STAGE_SEPARATOR) stage_idx = stage_hash.split('_')[0] return int(stage_idx)
[docs]def check_TS_vetting(out_file): """ Return False if TS vetting failed in the given Jaguar out file. :type out_file: str :param out_file: Jaguar out file :rtype: bool :return: False if TS vetting failed """ with open(out_file, 'r') as afile: for line in afile: if line.startswith('Unable to find any valid transition vectors.'): return False return True
[docs]def get_sub_host_str(obj, sub_host_attr, n_procs_attr): """ Return the command line -HOST argument for using a subhost. :type obj: object :param obj: the object, possibly having the given attributes defined :type sub_host_attr: str :param sub_host_attr: the attribute for the subhost :type n_procs_attr: str :param n_procs_attr: the attribute for the number of processors """ try: host = f'{getattr(obj, sub_host_attr)}:{getattr(obj, n_procs_attr)}' except AttributeError: host = jaguarworkflows.AUTOHOST return host
[docs]class ConformationalSearchException(Exception): pass
[docs]class ConformationalSearchMixin(object): """ Manage a MacroModel conformational search. """
[docs] @staticmethod def genEtaRotamers(sibling_conformers_dict, only_rings=True): """ Generate eta-rotamers. :type sibling_conformers_dict: dict :param sibling_conformers_dict: dictionary of dictionaries where the outer dictionary is keyed by sibling and inner dictionary is keyed by conformer and values of the inner dictionary are lists of structures :type only_rings: bool :param only_rings: if True then only allow rotation of eta-bound rings, if False then also allow rotation of ligands where the eta-bound motif is acyclic, for example ethene, etc. :rtype: dict :return: dictionary of dictionaries where the outer dictionary is keyed by sibling and inner dictionary is keyed by conformer and values of the inner dictionary are lists of structures, for eta-complexes incoming conformers have been replaced with all possible rotamers """ new_sibling_conformers_dict = {} for sibling_group, conformers_dict in sibling_conformers_dict.items(): new_conformers_dict = {} for conformer_group, conformers in conformers_dict.items(): new_conformers = [] for conformer in conformers: try: rotamers = etarotamers.get_rotamers( conformer, only_rings=only_rings) except etarotamers.EtaRotamersException: # this means that the conformers are not eta-complexes, # no rotamers needed, use original conformers new_conformers = conformers break else: new_conformers.extend(rotamers) new_conformers_dict[conformer_group] = new_conformers new_sibling_conformers_dict[sibling_group] = new_conformers_dict return new_sibling_conformers_dict
[docs] def createConformers(self, sts): """ Create the conformers. :type sts: list :param sts: contains schrodinger.structure.Structure, the structures for which to create conformers, each unique type of structure should have a unique conformer group structure property keyed by CONFORMERS_GROUP_KEY, structures sharing the same CONFORMERS_GROUP_KEY should be conformers of the same structure and are used to seed the conformational search, an additional optional SIBLING_GROUP_KEY can be used to distinguish related groups of conformers, atoms marked with the property RESTRAINED_ATOM_KEY will be restrained :raise ConformationalSearchException: if there is an issue """ sibling_conformers_in_dict = bin_structures_by_property( sts, key=SIBLING_GROUP_KEY, inner_key=CONFORMERS_GROUP_KEY) if not self.n_conformers: self.sibling_conformers_dict = sibling_conformers_in_dict return t_sibling_conformers_in_dict = self.genEtaRotamers( sibling_conformers_in_dict, only_rings=self.only_rings) host = get_sub_host_str(self, 'subhost', 'n_jmswf_subjobs') # assume equivalence of restrain indices among the structures in # each bin, in this case run_conformational_search adds jobs to the # given job_dj launch_dir = 'csearch' job_dj = jaguarworkflows.create_queue(host=host) for sibling_group, conformers_dict in t_sibling_conformers_in_dict.items( ): for conformer_group, conformers in conformers_dict.items(): first_conformer = conformers[0] restrain_idxs = get_restrain_atom_idxs(first_conformer) run_conformational_search( conformers, n_conformers=self.n_conformers, restrain_idxs=restrain_idxs, seed=self.seed, launch_dir=launch_dir, base_name=conformer_group, job_dj=job_dj) # MacroModel information like errors, etc. is sent to stdout which # is the JobControl log file, so the following run was originally # silenced but then allowed so that the JobControl progress can # be seen job_dj.run() all_failed = not job_dj.isComplete() force_return_csearch_files = all_failed if not all_failed: pp_rel_energy_thresh = getattr(self, 'pp_rel_energy_thresh', None) failed_hashes = [] sibling_conformers_out_dict = {} for sibling_group, conformers_dict in t_sibling_conformers_in_dict.items( ): sibling_conformers_out_dict[sibling_group] = {} for conformer_group, conformers in conformers_dict.items(): out_mae_path = os.path.join(launch_dir, conformer_group + '-out.mae') # see MATSCI-1974 for the reasoning behind the second check if os.path.exists( out_mae_path) and structure.count_structures( out_mae_path): postprocess_conformational_search( conformers[0], out_mae_path) conformers_out = [ st for st in structure.StructureReader(out_mae_path) ] conformers_out = get_conformers( conformers_out, self.n_conformers, pp_rel_energy_thresh=pp_rel_energy_thresh) else: failed_hashes.append(conformer_group) conformers_out = list(sibling_conformers_in_dict[ sibling_group][conformer_group]) sibling_conformers_out_dict[sibling_group][ conformer_group] = conformers_out if failed_hashes: force_return_csearch_files = True n_total_conformers = sum( len(conformers_dict) for conformers_dict in t_sibling_conformers_in_dict.values()) all_failed = len(failed_hashes) == n_total_conformers if not all_failed and self.logger: msg = ( 'MacroModel conformational search jobs for the ' 'following conformer groups have failed: {groups}. ' 'Proceeding with original conformers for these groups. ' 'All output files will be returned.' ).format(groups=','.join(failed_hashes)) self.logger.warning(msg) if not all_failed: self.sibling_conformers_dict = sibling_conformers_out_dict if self.return_csearch_files or force_return_csearch_files: jobutils.add_zipfile_to_backend(launch_dir) if all_failed: msg = ('MacroModel conformational search jobs have failed.') raise ConformationalSearchException(msg)
[docs]class JMSWFException(Exception): pass
[docs]class JMSWFMixin(RepresentativeConformersMixin): """ Manage a Jaguar multistage workflow. """ # various structure and atom properties dictate the behavior of this class def _overwriteConformerEIDS(self): """ Overwrite the Maestro entry ID properties for the conformers. """ # needed for Jaguar multistage workflow idx = 0 for sibling_group, conformers_dict in self.sibling_conformers_dict.items( ): for conformer_group, conformers in conformers_dict.items(): for conformer in conformers: idx += 1 eid = conformer.property[ENTRY_ID_KEY] self.eids_dict.setdefault(eid, []).append(str(idx)) conformer.property[ENTRY_ID_KEY] = str(idx) def _getJMSWFGenResKeywords(self, st, include_temp_press_data=True): """ Return the general reserved Jaguar multistage workflow keywords for the given structure. :type st: schrodinger.structure.Structure :param st: the structure :type include_temp_press_data: bool :param include_temp_press_data: whether to include the temperature and pressure data :rtype: OrderedDict :return: the keywords """ adict = OrderedDict([('molchg', str(st.property.get(CHARGE_KEY, 0))), ('multip', str( st.property.get(MULTIPLICITY_KEY, 1)))]) temp_data = getattr(self, 'temp_data', None) press_data = getattr(self, 'press_data', None) if include_temp_press_data and temp_data and press_data: adict['tmpini'] = str(temp_data.temp_start) adict['tmpstp'] = str(temp_data.temp_step) adict['ntemp'] = str(temp_data.temp_n) adict['press'] = str(press_data.press_start) adict['press_step'] = str(press_data.press_step) adict['npress'] = str(press_data.press_n) return adict def _getJMSWFOptKeywords(self, st, set_freq=True): """ Return the Jaguar multistage workflow optimization keywords for the given structure. :type st: schrodinger.structure.Structure :param st: the structure :type set_freq: bool :param set_freq: if True then set 'ifreq=1' :rtype: str :return: the string of keywords """ reserved_jaguar_keywords = self._getJMSWFGenResKeywords( st, include_temp_press_data=set_freq) reserved_jaguar_keywords['igeopt'] = '1' if set_freq: reserved_jaguar_keywords['ifreq'] = '1' jaguar_keywords = self.jaguar_keywords.copy() jaguar_keywords.update(reserved_jaguar_keywords) return ' '.join(get_jaguar_keywords_list(jaguar_keywords)) @staticmethod def _getJMSWFCoords(st, are_restraints=True): """ Return the Jaguar multistage workflow coords for the given structure. :type st: schrodinger.structure.Structure :param st: the structure :type are_restraints: bool :param are_restraints: True if the coords are restraint coords, as opposed to active coords :rtype: list :return: contains strings defining the coords """ atype_key_pairs = [(4, RESTRAINED_DISTANCES_KEY), (5, RESTRAINED_ANGLES_KEY), (6, RESTRAINED_DIHEDRALS_KEY)] fields = ['{atype}', '{idxs}'] if are_restraints: fields = [jmswfu.NONE] + fields aformat = jmswfu.DELIM.join(fields) coords = [] for atype, key in atype_key_pairs: params = get_int_tuples_from_str_property(st, key) for param in params: idxs = jmswfu.DELIM.join(str(idx) for idx in param) coord = aformat.format(atype=atype, idxs=idxs) coords.append(coord) return coords def _getJMSWFTSKeywords(self, st): """ Return the Jaguar multistage workflow transition state search keywords for the given structure. :type st: schrodinger.structure.Structure :param st: the structure :rtype: str :return: the string of keywords """ reserved_jaguar_keywords = self._getJMSWFGenResKeywords( st, include_temp_press_data=True) reserved_jaguar_keywords['igeopt'] = '2' reserved_jaguar_keywords['ifreq'] = '1' reserved_jaguar_keywords['itrvec'] = '-6' reserved_jaguar_keywords['nhesref'] = '-1' # Jaguar TS vetting options, intentionally use the default # for ts_vet_olap_cut, for ts_vet use the value +3 which considers # both the eigenvector overlap with active atoms and distances # among active atoms, despite the Jaguar documentation using +3 # as opposed to -3 does not cause a job failure so check # afterwards reserved_jaguar_keywords['ts_vet'] = '3' jaguar_keywords = self.jaguar_keywords.copy() jaguar_keywords.update(reserved_jaguar_keywords) return ' '.join(get_jaguar_keywords_list(jaguar_keywords)) def _getJMSWFStages(self): """ Return the Jaguar multistage workflow stages. :rtype: list :return: contains the jmswfu.StageData for the Jaguar multistage workflow """ has_ts = getattr(self, 'has_ts', False) # if there is a TS stage then prevent divergence in the stage # indices by breaking up the work needed for non-TS stages # into two stages aformat = jmswfu.DELIM.join(['{eid}', '{rest}']) opt_jaguar_keywords_lines = [] opt_restraints_lines = [] ts_jaguar_keywords_lines = [] ts_restraints_lines = [] ts_actives_lines = [] for sibling_group, conformers_dict in self.sibling_conformers_dict.items( ): for conformer_group, conformers in conformers_dict.items(): for conformer in conformers: eid = conformer.property[ENTRY_ID_KEY] all_opt_restraints = self._getJMSWFCoords( conformer, are_restraints=True) set_freq = not has_ts and not bool(all_opt_restraints) opt_jaguar_keywords = self._getJMSWFOptKeywords( conformer, set_freq=set_freq) opt_jaguar_keywords_lines.append( aformat.format(eid=eid, rest=opt_jaguar_keywords)) for opt_restraints in all_opt_restraints: opt_restraints_lines.append( aformat.format(eid=eid, rest=opt_restraints)) is_ts = conformer.property.get( TRANSITION_STATE_STRUCTURE_KEY) if has_ts or is_ts: if not is_ts: set_freq = not bool(all_opt_restraints) ts_jaguar_keywords = self._getJMSWFOptKeywords( conformer, set_freq=set_freq) for opt_restraints in all_opt_restraints: ts_restraints_lines.append( aformat.format( eid=eid, rest=opt_restraints)) else: ts_jaguar_keywords = self._getJMSWFTSKeywords( conformer) for ts_actives in self._getJMSWFCoords( conformer, are_restraints=False): ts_actives_lines.append( aformat.format(eid=eid, rest=ts_actives)) ts_jaguar_keywords_lines.append( aformat.format(eid=eid, rest=ts_jaguar_keywords)) # the final hessian from a restrained optimization may not be a good # guess for a transition state search so do not inherit it from the # parent in_txt_file = os.path.join(self._jmswf_launch_dir, self._jmswf_in_txt_file) with open(in_txt_file, 'w') as afile: afile.write(jmswfu.NEW_STAGE + '\n') afile.write(jmswfu.KEYWORDS + '\n') for opt_jaguar_keywords_line in opt_jaguar_keywords_lines: afile.write(opt_jaguar_keywords_line + '\n') if opt_restraints_lines: afile.write(jmswfu.GEOM_CONSTRAINTS + '\n') for opt_restraints_line in opt_restraints_lines: afile.write(opt_restraints_line + '\n') if ts_jaguar_keywords_lines: afile.write(jmswfu.NEW_STAGE + '\n') afile.write(jmswfu.PARENT + '\n') afile.write( jmswfu.DELIM.join(['1', jmswfu.WAVEFUNCTION]) + '\n') afile.write(jmswfu.KEYWORDS + '\n') for ts_jaguar_keywords_line in ts_jaguar_keywords_lines: afile.write(ts_jaguar_keywords_line + '\n') if ts_restraints_lines: afile.write(jmswfu.GEOM_CONSTRAINTS + '\n') for ts_restraints_line in ts_restraints_lines: afile.write(ts_restraints_line + '\n') if ts_actives_lines: afile.write(jmswfu.ACTIVE_COORDINATES + '\n') for ts_actives_line in ts_actives_lines: afile.write(ts_actives_line + '\n') stages = jmswfu.read_stage_datafile(in_txt_file) fileutils.force_remove(in_txt_file) return stages def _updateJMSWFStages(self, stages): """ Update the Jaguar multistage workflow stages. :type stages: list :param stages: contains the jmswfu.StageData for the Jaguar multistage workflow :raise JMSWFException: if there is an issue :rtype: list :return: contains the jmswfu.StageData for the Jaguar multistage workflow """ # here we need to read in the extra stages file and modify # the returned jmswfu.StageData objects so that they can # be seamlessly appended to the given stages, the first # of these extra stages is skipped so as to allow analysis # to potentially be the first extra stage extra_stages = jmswfu.read_stage_datafile(self.extra_stages_file)[1:] n_prev_stages = len(stages) # will be either 1 or 2 for index, extra_stage in enumerate(extra_stages, n_prev_stages + 1): # update the index of this stage extra_stage.index = index # update any parent indices for stage in extra_stage.parent_data: stage.stage += n_prev_stages - 1 # update the parenting indices for any analysis stages for stage in extra_stage.analyze_data: stage.parent_st_idx += n_prev_stages - 1 stage.parent_idxs = [ i + n_prev_stages - 1 for i in stage.parent_idxs ] # update entry IDs, the original entry IDs have been reset to start at # 1 and now include values for any generated conformers, see # ._overwriteConformerEIDS() and .eids_dict which for example # might contain something like eids_dict[17] = [4,5,6,7] if for the # original entry ID 17 there were 4 conformers generated which were # assigned entry IDs 4, 5, 6, and 7, all entry_data data needs # updated entry IDs for stage_subsection, eid_dict in extra_stage.entry_data.items(): # see MATSCI-6429 - since the main interface and all known use cases # force equivalent Jaguar keyword data for all structure entries in an # extra stage we will overwrite stale entry IDs by making this assumption # and choosing an arbitrary mapping of stale entry IDs to current entry # IDs uncommon = set(eid_dict.keys()).difference( self.eids_dict.keys()) if len(eid_dict) == len(self.eids_dict) and uncommon: eid_dict = dict( zip(self.eids_dict.keys(), eid_dict.values())) new_eid_dict = {} for eid, datas in eid_dict.items(): for new_eid in self.eids_dict.get(eid, []): new_datas = [] for data in datas: new_data = copy.deepcopy(data) new_data.eid = new_eid new_datas.append(new_data) new_eid_dict[new_eid] = new_datas if not new_eid_dict: msg = ( f'The extra stages file {self.extra_stages_file} contains ' 'at least a single stage that does not define stage data ' 'for any of the entries in the input file.') raise JMSWFException(msg) extra_stage.entry_data[stage_subsection] = new_eid_dict return stages + extra_stages def _setUpJMSWF(self): """ Set up the Jaguar multistage workflow. :rtype: list :return: contains the jmswfu.StageData for the Jaguar multistage workflow """ extra_stages_file = getattr(self, 'extra_stages_file', None) os.makedirs(self._jmswf_launch_dir, exist_ok=True) with structure.StructureWriter( os.path.join(self._jmswf_launch_dir, self._jmswf_in_mae_file)) as writer: for sibling_group, conformers_dict in self.sibling_conformers_dict.items( ): for conformer_group, conformers in conformers_dict.items(): for conformer in conformers: writer.append(conformer) # jmswfu manages the global smap file which if there aren't any # frequency jobs will be removed at the end base_name = os.path.join(self._jmswf_launch_dir, self._jmswf_base_name) + '-out' jmswfu.create_smap(base_name, self._jmswf_out_mae_file) stages = self._getJMSWFStages() if extra_stages_file: stages = self._updateJMSWFStages(stages) jmswfu.write_stages_file(stages, os.path.join(self._jmswf_launch_dir, self._jmswf_in_txt_file)) return stages
[docs] def runJMSWF(self): """ Run the Jaguar multistage workflow. :raise JMSWFException: if there is an issue """ stages = self._setUpJMSWF() options = argparse.Namespace( TPP=self.tpp, name=self._jmswf_base_name, input_file=self._jmswf_in_mae_file) host = get_sub_host_str(self, 'subhost', 'n_jmswf_subjobs') job_dj = jaguarworkflows.create_queue(options=options, host=host) # uses jaguarworkflows.RobustSubmissionJob by default with fileutils.chdir(self._jmswf_launch_dir): workflows = jmswfu.create_workflows( options, job_dj, stages, self._jmswf_out_smap_file, hierarchical=False) with structure.StructureWriter(self._jmswf_out_mae_file) as writer: jaguarworkflows.run_workflows(job_dj, workflows, writer) backend = None jmswfu.finalize_smap(self._jmswf_out_smap_file, backend) # for restarts it is possible that nothing was done all_failed = job_dj.total_added and len( job_dj._failed) == job_dj.total_added # handle partial failures in self.setRepresentatives if self.return_jaguar_files or all_failed: jobutils.add_zipfile_to_backend(self._jmswf_launch_dir) if all_failed: msg = ('All Jaguar multistage workflow jobs have failed. ' 'All output files will be returned.') raise JMSWFException(msg)
[docs] def prepareJMSWFOutput(self): """ Prepare Jaguar multistage workflow output. """ backend = None exts = ['-out.maegz', '-out.smap'] + jmswfu.SMAP_ELIGIBLE_EXTENSIONS for ext in exts: pattern = os.path.join(self._jmswf_launch_dir, '*' + ext) for apath in glob.glob(pattern): afile = os.path.split(apath)[1] shutil.copy(apath, afile) jobutils.add_outfile_to_backend(afile, backend)
[docs] def checkJMSWFOutputs(self, out_files): """ Return False if any of the given Jaguar out files should be treated as a failure. :type out_files: list :param out_files: contains Jaguar output files :rtype: bool :return: False if any of the given Jaguar out files should be treated as a failure """ # for transition state structures vetting is used however a bad # result doesn't cause a Jaguar failure, nor does multiple imaginary # frequencies for either optimizations or transition states for out_file in out_files: # Jaguar failures have already been handled in the calling code # so missing Jaguar *out files means that the stage is an analysis # stage out_path = os.path.join(self._jmswf_launch_dir, out_file) if not os.path.exists(out_path): continue # skip non-frequency jobs jag_out = JaguarOutput(out_path) normal_modes = anharmonic.get_normal_modes(jag_out) if not normal_modes: continue # check TS, when asking for vetting there are three outcomes: # (1) vetting performed and a vetted vector found # (2) vetting performed and a vetted vector not found # (3) vetting not performed (see MATSCI-5145) # # see MATSCI-10176 - jag_out.getStructure() does NOT include input # Maestro structure properties mae_path = os.path.join(self._jmswf_launch_dir, f'{jag_out.restart}.mae') st = structure.Structure.read(mae_path) if st.property.get(TRANSITION_STATE_STRUCTURE_KEY): if not check_TS_vetting(out_path): # the found transition state does not involve the requested # active atoms return False normal_mode = jag_out.vetted_ts_vector if normal_mode and normal_mode.frequency != min( [x.frequency for idx, x in normal_modes]): # the normal mode involving the active atoms is not the lowest # frequency mode return False # check imaginary frequencies in_path = os.path.join(self._jmswf_launch_dir, f'{jag_out.restart}.in') if not os.path.exists(in_path): return False jag_in = JaguarInput(input=in_path) try: anharmonic.check_imaginary_frequencies( jag_out, jag_in, max_i_freq=self.max_i_freq) except anharmonic.AnharmonicException: return False return True
@staticmethod def _setProperties(st, key, stage_idx, value): """ Set properties on the given structure. :type st: schrodinger.structure.Structure :param st: the structure :type key: str :param key: the property key :type stage_idx: int :param stage_idx: the stage index for the property :type value: float :param value: the property value :rtype: set :return: the created property keys """ rep_key = key + STAGE_SEPARATOR + str(stage_idx) adict = {rep_key: value} st.property.update(adict) return set(adict.keys()) def _updateSiblingConformersDict(self): """ Update the sibling conformers dict. """ # currently the self.sibling_conformers_dict contains conformers with # redundant structure titles if the conformers came from MacroModel, while # the conformers collected in the representatives coming from the Jaguar # multistage workflow job have been uniqueified using a '-<index>' appended # to the title, here we need to update the self.sibling_conformers_dict to # have unique titles, additionally the current self.sibling_conformers_dict # contains conformers considered as failures in the calling code and so also # remove any bad conformers rep_titles = set(rep_st.title for rep_st in self.rep_sts) for sibling_group, conformers_dict in list( self.sibling_conformers_dict.items()): for conformer_group, conformers in list(conformers_dict.items()): prev_title = None good_conformers = [] for idx, conformer in enumerate(conformers, 1): title = conformer.title if title == prev_title and idx > 1: title += f'-{idx}' else: prev_title = title if title in rep_titles: conformer.title = title good_conformers.append(conformer) self.sibling_conformers_dict[sibling_group][ conformer_group] = good_conformers
[docs] def setRepresentatives(self): """ Associated with each structure is output data from potentially multiple Jaguar multistage workflow stages. Pick representative structures to carry the data for all stages. :raise JMSWFException: if there is an issue """ has_ts = getattr(self, 'has_ts', False) # bin stage structures by original structure title, # titles are like 'reactant_stage_1', 'transition_state-2_stage_4_analysis_2', # etc. title_stage_sts_dict = {} for st in structure.StructureReader(self._jmswf_out_mae_file): title, stage_hash = st.title.split(STAGE_SEPARATOR) title_stage_sts_dict.setdefault(title, []).append((st, stage_hash)) stages = jmswfu.read_stage_datafile( os.path.join(self._jmswf_launch_dir, self._jmswf_in_txt_file)) # choose the most recent structure from the original workflow # as a representative structure to carry the properties from all # extra stages, property keys are modified to end as '_stage_<idx>' if has_ts: rep_stage_idx = 2 else: rep_stage_idx = 1 warned = False copied = self.return_jaguar_files for title, pairs in title_stage_sts_dict.items(): skip_msg = False # the IndexError indicates that the required original workflow # Jaguar calculation has failed for this conformer, continue as there # may be other successful conformers try: rep_st = pairs[rep_stage_idx - 1][0].copy() except IndexError: skip_msg = ('Some Jaguar multistage workflow jobs have failed.') if not skip_msg: out_files = [f'{pair[0].title}.out' for pair in pairs] if not self.checkJMSWFOutputs(out_files): skip_msg = ( 'Postprocessing of at least a single Jaguar multistage workflow ' 'job has failed.') if skip_msg: if not warned: if self.logger: skip_msg += (' All output files will be returned.') self.logger.warning(skip_msg) warned = True if not copied: jobutils.add_zipfile_to_backend(self._jmswf_launch_dir) copied = True continue rep_st.title = title for st, stage_hash in pairs[rep_stage_idx - 1:]: stage_idx = int(stage_hash.split('_')[0]) stage = stages[stage_idx - 1] for key in stage.getPropertyKeys(st=st): rep_st.property.pop(key, None) value = st.property.get(key) if value is not None: _rep_keys = self._setProperties(rep_st, key, stage_idx, value) self.rep_keys.update(_rep_keys) self.rep_sts.append(rep_st) if not self.rep_sts: msg = ( 'Postprocessing of all Jaguar multistage workflow jobs has failed.' ) raise JMSWFException(msg) self._updateSiblingConformersDict() # validate the output workflow file sibling_conformers_dict = bin_structures_by_property( self.rep_sts, key=SIBLING_GROUP_KEY, inner_key=CONFORMERS_GROUP_KEY) for sibling_group, conformer_group, conformer in self.representativeConformers( ): conformers_dict = sibling_conformers_dict.get(sibling_group) if not conformers_dict: msg = ( 'All Jaguar multistage workflow calculations for sibling ' f'group {sibling_group} have failed.') raise JMSWFException(msg) if not conformers_dict.get(conformer_group): msg = ( 'All Jaguar multistage workflow calculations for conformer ' f'group {conformer_group} have failed.') raise JMSWFException(msg)
[docs] def finalizeJMSWFOutput(self): """ Finalize the Jaguar multistage workflow output. :rtype: str :return: the Jaguar multistage workflow output file """ jmswf_tag = getattr(self, 'jmswf_tag', '') # create a valid non-redundant output file from the # Jaguar multistage workflow output file, currently do not link any smap # file as there can only be a single one linked and they are all available # in the Jaguar multistage workflow output file base_name = jobutils.get_jobname(DEFAULT_JOB_NAME) + '-out' + jmswf_tag out_mae_file = base_name + '.mae' structure.write_cts(self.rep_sts, out_mae_file) smap_dict = {} for idx, rep_st in enumerate(self.rep_sts, 1): out_vib_file_format = rep_st.title + STAGE_SEPARATOR + '%s.vib' out_vib_file = out_vib_file_format % '2' if not os.path.exists(out_vib_file): out_vib_file = out_vib_file_format % '1' if not os.path.exists(out_vib_file): continue smap_dict[out_vib_file] = idx if smap_dict: out_smap_file = jmswfu.create_smap( base_name, out_mae_file, smap_dict=smap_dict) else: out_smap_file = None backend = None if out_smap_file: jobutils.add_outfile_to_backend(out_smap_file, backend) jobutils.add_outfile_to_backend( out_mae_file, backend, set_structure_output=True) return out_mae_file
[docs]class DescriptorsMixin(object): """ Manage running descriptors. """ DEFAULT_JOB_NAME = 'automatic_reaction_workflow'
[docs] def runDescriptors(self, files): """ Run descriptors. :type files: list :param files: the files on which to run descriptors """ # handle restarts job_name = jobutils.get_jobname(self.DEFAULT_JOB_NAME) zip_files = jaguar_restart.get_restart_zip_files( base_name_patterns=[f'{DESCRIPTORS_LAUNCH_DIR_HEADER}{job_name}*']) jaguar_restart.prepare_restart_dirs(zip_files) host = get_sub_host_str(self, 'jmswf_host', 'n_rxnwf_subjobs') # keep Jaguar &gen section to a minimum for now keys = ['dftname', 'basis'] jaguar_keywords = {key: self.jaguar_keywords[key] for key in keys} jaguar_keywords = get_jaguar_keywords_list(jaguar_keywords) jaguar_keywords = ' '.join(jaguar_keywords) options = argparse.Namespace(TPP=self.tpp) job_dj = jaguarworkflows.create_queue(options=options, host=host) base_cmd = ['complex_descriptors.py'] base_cmd += [FLAG_JAGUAR] base_cmd += [FLAG_JAGUAR_KEYWORDS, jaguar_keywords] base_cmd += [FLAG_TPP, str(self.tpp)] base_cmd += [FLAG_LIGFILTER] base_cmd += [FLAG_CANVAS] base_cmd += [FLAG_MOLDESCRIPTORS] base_cmd += [FLAG_INCLUDE_VECTORIZED] base_cmd += [FLAG_FINGERPRINTS] if self.return_descriptor_files: base_cmd += [FLAG_SAVEFILES] if host != jaguarworkflows.AUTOHOST: base_cmd += ['-HOST', self.jmswf_host] # collect jobs, the complex descriptors driver does more than # just running Jaguar jobs so currently restart each job even # if all Jaguar *out files are available, for such cases the Jaguar # jobs are not restarted launch_dirs = [] for afile in files: job_name = fileutils.get_basename(afile) launch_dir = job_name = f'{DESCRIPTORS_LAUNCH_DIR_HEADER}{job_name}' cmd = list(base_cmd) cmd += ['-JOBNAME', job_name] cmd += [FLAG_INPUT_FILE, afile] os.makedirs(launch_dir, exist_ok=True) shutil.copy(afile, os.path.join(launch_dir, afile)) job = jaguarworkflows.RobustSubmissionJob( cmd, command_dir=launch_dir) job_dj.addJob(job) launch_dirs.append(launch_dir) if self.logger: self.logger.info('Running descriptors on all files.') self.logger.info('') job_dj.run() job_be = jobcontrol.get_backend() for launch_dir in launch_dirs: zip_file = launch_dir + '.zip' zip_directory(launch_dir, fileobj=zip_file) if job_be: job_be.copyOutputFile(zip_file)
[docs]class ReactionWorkflowException(Exception): pass
[docs]class ReactionWorkflow(ConformationalSearchMixin, JMSWFMixin): """ Manage a reaction workflow. """
[docs] def __init__(self, reaction_wf_input_sts, n_conformers=DEFAULT_N_CONFORMERS, pp_rel_energy_thresh=PP_CSEARCH_REL_ENERGY_THRESH, seed=parserutils.RANDOM_SEED_DEFAULT, only_rings=True, return_csearch_files=False, jaguar_keywords=DEFAULT_JAGUAR_KEYWORDS, temp_data=None, press_data=None, return_jaguar_files=False, anharm=False, return_anharm_files=False, anharm_max_freq=anharmonic.DEFAULT_MAX_FREQ, anharm_factor_data=None, rate_constants=False, return_rate_constant_files=False, wigner_tunnel_corr=False, extra_stages_file=None, max_i_freq=anharmonic.DEFAULT_MAX_I_FREQ, n_jmswf_subjobs=1, subhost=None, tpp=DEFAULT_TPP, logger=None): """ Create an instance. :type reaction_wf_input_sts: list :param reaction_wf_input_sts: reaction workflow input structures :type n_conformers: int :param n_conformers: number of conformers to search for :type pp_rel_energy_thresh: None or float :param pp_rel_energy_thresh: relative energy threshold in kJ/mol, if None then only the n_conformers lowest energy conformers are returned :type seed: int :param seed: seed for random number generator :type only_rings: bool :param only_rings: if True then only allow rotation of eta-bound rings, if False then also allow rotation of ligands where the eta-bound motif is acyclic, for example ethene, etc. :type return_csearch_files: bool :param return_csearch_files: whether to return all output files from the conformational search subjobs :type jaguar_keywords: OrderedDict :param jaguar_keywords: Jaguar keywords :type temp_data: TempData :param temp_data: the temperature data for thermochemical properties :type press_data: PressData :param press_data: the pressure data for thermochemical properties :type return_jaguar_files: bool :param return_jaguar_files: whether to return all output files from the Jaguar subjobs :type anharm: bool :param anharm: whether to run the anharmonic workflow :type return_anharm_files: bool :param return_anharm_files: whether to return all output files from the anharmonic workflow :type anharm_max_freq: float :param anharm_max_freq: anharmonic potentials are created for normal modes with harmonic frequencies less than this value in wavenumbers (cm^-1) :type anharm_factor_data: anharmonic.SeqData or None :param anharm_factor_data: unitless factor data for factors that multiply a normal mode displacement, if None then the defaults are used, the number of points is in the positive direction only, excluding zero and the negative direction, for example using a value of 4 in turn means 2 * 4 + 1 = 9 points total :type rate_constants: bool :param rate_constants: whether to report rate constant(s) for the rate determining step of the reaction using canonical transition state theory :type return_rate_constant_files: bool :param return_rate_constant_files: whether to return all output files from the rate constant subjobs :type wigner_tunnel_corr: bool :param wigner_tunnel_corr: whether to include the Wigner tunneling correction when computinig rate constant(s) :type extra_stages_file: str :param extra_stages_file: the name of a file containing extra stages for a Jaguar Multistage Workflow subjob that will be performed on all output structures from the reaction workflow, the first of these extra stages is always skipped so as to allow analysis to potentially be the first extra stage :type max_i_freq: float :param max_i_freq: tolerate small imaginary frequencies less than this value in wavenumbers (cm^-1) :type n_jmswf_subjobs: int :param n_jmswf_subjobs: the maximum number of simultaneous Jaguar multistage workflow subjobs :type subhost: str :param subhost: the host to use for subjobs :type tpp: int :param tpp: the number of threads to use for Jaguar subjobs, i.e. -TPP (threads-per-process) :type logger: logging.Logger or None :param logger: output logger or None if there isn't one """ self.reaction_wf_input_sts = reaction_wf_input_sts self.n_conformers = n_conformers self.pp_rel_energy_thresh = pp_rel_energy_thresh self.seed = seed self.only_rings = only_rings self.return_csearch_files = return_csearch_files self.jaguar_keywords = jaguar_keywords self.temp_data = temp_data or TempData( temp_start=jmswfu.DEFAULT_TEMP_START, temp_step=jmswfu.DEFAULT_TEMP_STEP, temp_n=jmswfu.DEFAULT_TEMP_N) self.press_data = press_data or PressData( press_start=jmswfu.DEFAULT_PRESS_START, press_step=jmswfu.DEFAULT_PRESS_STEP, press_n=jmswfu.DEFAULT_PRESS_N) self.return_jaguar_files = return_jaguar_files self.anharm = anharm self.return_anharm_files = return_anharm_files self.anharm_max_freq = anharm_max_freq self.anharm_factor_data = anharm_factor_data or anharmonic.SeqData( start=anharmonic.DEFAULT_F_START, step=anharmonic.DEFAULT_F_STEP, n_points=anharmonic.DEFAULT_F_N_POINTS) self.rate_constants = rate_constants self.return_rate_constant_files = return_rate_constant_files self.wigner_tunnel_corr = wigner_tunnel_corr self.extra_stages_file = extra_stages_file self.max_i_freq = max_i_freq self.n_jmswf_subjobs = n_jmswf_subjobs self.subhost = subhost self.tpp = tpp self.logger = logger self.sibling_conformers_dict = None self.eids_dict = {} self._jmswf_launch_dir = JMSWF_LAUNCH_DIR self._jmswf_base_name = 'jaguar_multistage_workflow' self._jmswf_in_mae_file = self._jmswf_base_name + '.maegz' self._jmswf_in_txt_file = self._jmswf_base_name + '.txt' self._jmswf_out_mae_file = self._jmswf_base_name + '-out.maegz' self._jmswf_out_smap_file = self._jmswf_base_name + '-out.smap' self.has_ts = False self.rdss = None self._anharm_dir = ANHARM_LAUNCH_DIR self._ctst_dir = 'CTST_rate_constants' self.rep_keys = set() self.rep_sts = [] self.jmswf_tag = REACTION_WF_TAG
[docs] def validateRateConstants(self): """ Validate rate constants. :raise ReactionWorkflowException: if there is an issue """ sibling_conformers_dict = bin_structures_by_property( self.reaction_wf_input_sts, key=SIBLING_GROUP_KEY, inner_key=CONFORMERS_GROUP_KEY) eligible_for_rate_constants = False for sibling_group, conformer_group, first_conformer in self.representativeConformers( sibling_conformers_dict=sibling_conformers_dict): if first_conformer.property.get(TRANSITION_STATE_STRUCTURE_KEY): self.has_ts = True if first_conformer.property.get(PARENT_SIBLING_GROUPS_KEY): eligible_for_rate_constants = True break if self.rate_constants and not eligible_for_rate_constants: msg = ('Rate constant calculations require child transition state ' 'structures.') raise ReactionWorkflowException(msg)
[docs] def validateAnharmonic(self): """ Validate anharmonic. :raise ReactionWorkflowException: if there is an issue """ sibling_conformers_dict = bin_structures_by_property( self.reaction_wf_input_sts, key=SIBLING_GROUP_KEY, inner_key=CONFORMERS_GROUP_KEY) if not self.has_ts and self.anharm: getters = [ get_restrain_distance_idxs, get_restrain_angle_idxs, get_restrain_dihedral_idxs ] only_restrained_opts = True for sibling_group, conformer_group, first_conformer in self.representativeConformers( sibling_conformers_dict=sibling_conformers_dict): for getter in getters: if getter(first_conformer): break else: only_restrained_opts = False if not only_restrained_opts: break if only_restrained_opts: msg = ( 'The anharmonic workflow requires frequencies but ' 'all Jaguar multistage jobs are restrained optimizations.') raise ReactionWorkflowException(msg)
[docs] def validateSubhost(self): """ Validate subhost. :raise ReactionWorkflowException: if there is an issue """ host = 'localhost' if jobcontrol.get_backend(): host_list = jobcontrol.get_backend_host_list() if host_list: host = host_list[0][0] if host != 'localhost' and self.subhost == 'localhost': msg = ('Jobs requesting a remote driver host and local ' 'subjob host are not supported.') raise ReactionWorkflowException(msg) if not self.subhost: self.subhost = host
[docs] def validate(self): """ Validate. :raise ReactionWorkflowException: if there is an issue """ self.validateRateConstants() self.validateAnharmonic() self.validateSubhost()
def _setUpAnharmonic(self): """ Set up anharmonic workflow calculations. :raise ReactionWorkflowException: if there is an issue """ # determine which JMSWF stages calculate frequencies, currently allow # for the possibility of an extra stage calculating frequencies (so # consider not only the first one or two reaction workflow stages), # for each stage collect all Jaguar *.out and *.01.in files in_files_by_stage = {} pattern = os.path.join(self._jmswf_launch_dir, '*.out') for out_path in glob.glob(pattern): jag_out = JaguarOutput(out_path) if not anharmonic.get_normal_modes(jag_out): continue out_files = [os.path.split(out_path)[1]] if not self.checkJMSWFOutputs(out_files): continue rin_path = os.path.splitext(out_path)[0] + '.01.in' if not os.path.exists(rin_path): msg = (f'The Jaguar restart input file {rin_path} can ' 'not be found.') raise ReactionWorkflowException(msg) out_file = os.path.split(out_path)[1] rin_file = os.path.split(rin_path)[1] stage_idx = get_stage_idx(out_file) in_files_by_stage.setdefault(stage_idx, []).append((out_file, rin_file)) # ensure that for each of these stages each conformer in the reaction # workflow has an anharmonic input deck for idx, all_files in in_files_by_stage.items(): if len(all_files) != len(self.rep_sts): msg = ( 'Some Jaguar output files from Jaguar Multistage Workflow ' f'stage {idx} are missing frequency data.') raise ReactionWorkflowException(msg) # prepare anharmonic subjobs in a separate directory os.makedirs(self._anharm_dir, exist_ok=True) for all_files in in_files_by_stage.values(): for files in all_files: for afile in files: apath = os.path.join(self._jmswf_launch_dir, afile) bpath = os.path.join(self._anharm_dir, afile) shutil.copy(apath, bpath)
[docs] def runAnharmonic(self): """ Run the anharmonic workflow. """ self._setUpAnharmonic() # allow multiple simultaneous anharmonic subjobs options = argparse.Namespace(TPP=self.tpp) host = f'{self.subhost}:{self.n_jmswf_subjobs}' job_dj = jaguarworkflows.create_queue(options=options, host=host) # allow multiple simultaneous Jaguar single point subjobs for each # anharmonic subjob, continue processing even if there are zero # normal modes to be treated anharmonically cmd = ['anharmonic_workflow_driver.py'] cmd += [anharmonic.FLAG_MAX_FREQ, str(self.anharm_max_freq)] cmd += [ anharmonic.FLAG_F_DATA, str(self.anharm_factor_data.start), str(self.anharm_factor_data.step), str(self.anharm_factor_data.n_points) ] cmd += [FLAG_MAX_I_FREQ, str(self.max_i_freq)] cmd += [anharmonic.FLAG_PROCESS_NO_ANHARMONICITIES] cmd += [FLAG_TPP, str(self.tpp)] cmd += ['-HOST', host] # collect jobs, restart *.out files are marked with ANHARMONIC_TAG # and should be skipped here, in contrast to JMSWF which only runs # Jaguar jobs the anharmonic driver does more than just running # Jaguar jobs so currently restart each job even if all Jaguar *out # files are available, for such cases the Jaguar jobs are not restarted pattern = os.path.join(self._anharm_dir, '*.out') for out_path in glob.glob(pattern): if ANHARMONIC_TAG in out_path: continue out_file = os.path.split(out_path)[1] base_name, ext = os.path.splitext(out_file) rin_file = base_name + '.01.in' job_name = base_name + ANHARMONIC_TAG _cmd = list(cmd) _cmd += [anharmonic.FLAG_JAG_OUT_FILE, out_file] _cmd += [anharmonic.FLAG_JAG_RIN_FILE, rin_file] _cmd += ['-JOBNAME', job_name] jc_job = jaguarworkflows.RobustSubmissionJob( _cmd, command_dir=self._anharm_dir, name=job_name) job_dj.addJob(jc_job) # run jobs job_dj.run()
[docs] def processAnharmonic(self): """ Process the anharmonic workflow. :raise ReactionWorkflowException: if there is an issue """ # check output, for now require success for each conformer pattern = os.path.join(self._anharm_dir, f'*{anharmonic.OUT_EXT}') mae_paths = glob.glob(pattern) stage_idxs = set(get_stage_idx(mae_path) for mae_path in mae_paths) force_return_anharm_files = len(mae_paths) != len(stage_idxs) * len( self.rep_sts) # copy output if self.return_anharm_files or force_return_anharm_files: jobutils.add_zipfile_to_backend(self._anharm_dir) if force_return_anharm_files: msg = ('At least a single anharmonic workflow subjob has failed.') raise ReactionWorkflowException(msg) # update representative structures with anharmonic properties for relevant # stages, update keys, and store energy keys for rep_st in self.rep_sts: for stage_idx in stage_idxs: mae_path = os.path.join(self._anharm_dir, f'{rep_st.title}{STAGE_SEPARATOR}{stage_idx}{ANHARMONIC_TAG}' \ f'{anharmonic.OUT_EXT}') st = structure.Structure.read(mae_path) for key, value in st.property.items(): if key.startswith(anharmonic.KEY_STARTER): _rep_keys = self._setProperties(rep_st, key, stage_idx, value) if 'Total' in key: self.rep_keys.update(_rep_keys)
def _setRateDeterminingSteps(self): """ Set a list of rate determining steps. :raise ReactionWorkflowException: if there is an issue """ rxnwfea = ReactionWorkflowEnergyAnalysis # use conformationally averaged energies relative to parents # including cross terms to determine the rate determining step # of the reaction base_name = jobutils.get_jobname(DEFAULT_JOB_NAME) rel_parents_file = base_name + REL_PARENTS_W_X_EXT with open(rel_parents_file, 'r') as afile: rel_parents_dict = json.load(afile) # bin (child - parent) energy differences by relevant stages to # determine the rate determining step for each stage, the energy # to use is U_0 = E_SCF + E_ZPE which is temperature independent # and thus previously algebraically averaged over conformers and # is now linearly summable, currently harmonic ZPE is used even # if modeling anharmonicities scf_energy_h = rxnwfea.getHeader(jaguarworkflows.GAS_PHASE_ENERGY_PROP) zpe_energy_h = rxnwfea.getHeader(jaguarworkflows.ZERO_POINT_ENERGY_PROP) stage_child_parent_energies_dict = {} for child_name, parent_dict in rel_parents_dict.items(): for parent_name, edict in parent_dict.items(): stage_energies_dict = {} for h, energy in edict.items(): if h.startswith(scf_energy_h) or h.startswith(zpe_energy_h): stage_idx = get_stage_idx(h) stage_energies_dict.setdefault(stage_idx, []).append(energy) for stage_idx, energies in stage_energies_dict.items(): if len(energies) == 2: stage_child_parent_energies_dict.setdefault( stage_idx, []).append((child_name, parent_name, sum(energies))) if not stage_child_parent_energies_dict: msg = ('Rate constant calulations require at least a single stage ' 'in which Jaguar frequencies are calculated.') raise ReactionWorkflowException(msg) # collect rate determining steps by sorting by energy and finding the # highest energy transition state rdss = [] for stage_idx, values in stage_child_parent_energies_dict.items(): rds = None for child_name, parent_name, energy in sorted( values, key=lambda x: x[2], reverse=True): for sibling_group, conformer_group, rep_child_conf in self.representativeConformers( specific_sibling_group=child_name): if rep_child_conf.property.get( TRANSITION_STATE_STRUCTURE_KEY): rds = SimpleNamespace( stage_idx=stage_idx, child_name=child_name, parent_name=parent_name) break if rds: rdss.append(rds) break if not rdss: msg = ('Rate constant calulations require at least a single child ' 'structure to be marked as a transition state structure.') raise ReactionWorkflowException(msg) self.rdss = rdss def _getJaguarOutputPathsChildAndParent(self, rds): """ Return paths to the Jaguar output files for child and parent conformers for the given rate determining step. :type rds: SimpleNamespace :param rds: the rate determining step :rtype: list, list :return: the child and parent conformer paths the latter of which is a list of lists of paths for each parent structure """ # among the child sibling group collect spectator masses and # for the found TS collect conformer titles child_spectator_masses = [] child_ts_conformer_titles = [] for conformer_group, conformers in self.sibling_conformers_dict[ rds.child_name].items(): first_conformer = conformers[0] if first_conformer.property.get(TRANSITION_STATE_STRUCTURE_KEY): for conformer in conformers: child_ts_conformer_titles.append(conformer.title) else: child_spectator_masses.append( _get_molecular_weight(first_conformer)) # isolate spectators and determine which parent siblings # made the TS binned_parent_conformer_titles = {} for conformer_group, conformers in self.sibling_conformers_dict[ rds.parent_name].items(): first_conformer = conformers[0] parent_mass = _get_molecular_weight(first_conformer) if parent_mass in child_spectator_masses: child_spectator_masses.remove(parent_mass) else: for conformer in conformers: binned_parent_conformer_titles.setdefault( conformer_group, []).append(conformer.title) binned_parent_conformer_titles = \ list(binned_parent_conformer_titles.values()) get_paths = lambda x: [os.path.join(self._jmswf_launch_dir, title + STAGE_SEPARATOR + str(rds.stage_idx) + '.out') for title in x] child_paths = get_paths(child_ts_conformer_titles) binned_parent_paths = [ get_paths(titles) for titles in binned_parent_conformer_titles ] return child_paths, binned_parent_paths def _setUpCTST(self): """ Set up canonical transition state theory calculations. """ self._setRateDeterminingSteps() # the Wigner tunneling correction is computed from the imaginary # frequency of the transition state and ultimately multiples its # partition function to alter the rate constant wigner_tunnel_corr = 'yes' if self.wigner_tunnel_corr else 'no' temps = ' '.join([ str(self.temp_data.temp_start + i * self.temp_data.temp_step) for i in range(0, self.temp_data.temp_n) ]) # collect all required CTST jobs get_conf_title = lambda x: fileutils.get_basename(x).split(STAGE_SEPARATOR)[0] for rds in self.rdss: child_paths, binned_parent_paths = self._getJaguarOutputPathsChildAndParent( rds) rds.jobs = [] for child_path in child_paths: child_title = get_conf_title(child_path) for parent_paths in itertools.product(*binned_parent_paths): parent_title = '_'.join( get_conf_title(parent_path) for parent_path in parent_paths) job_name = (f'tst{STAGE_SEPARATOR}{rds.stage_idx}_' f'{parent_title}_{child_title}') cmd = [os.path.join(os.environ['SCHRODINGER'], 'run')] cmd += ['tst_rate_startup.py'] cmd += ['-wigner_tunnel_corr', wigner_tunnel_corr] cmd += [FLAG_MAX_I_FREQ, str(self.max_i_freq)] cmd += ['-temperature', temps] cmd += ['-COMPLEX_OUTPUT', child_path] cmd += ['-COMPLEX_TRV', 'trv'] if self.anharm: base_name = fileutils.get_basename(child_path) mae_path = os.path.join( self._anharm_dir, f'{base_name}{ANHARMONIC_TAG}{anharmonic.OUT_EXT}') cmd += ['-COMPLEX_ANHARM_OUTPUT_MAE', mae_path] for idx, parent_path in enumerate(parent_paths): cmd += [f'-REAGENT_OUTPUT_{idx}', parent_path] cmd += [f'-REAGENT_TRV_{idx}', 'trv'] if self.anharm: base_name = fileutils.get_basename(parent_path) mae_path = os.path.join( self._anharm_dir, f'{base_name}{ANHARMONIC_TAG}{anharmonic.OUT_EXT}' ) cmd += [ f'-REAGENT_ANHARM_OUTPUT_MAE_{idx}', mae_path ] cmd += ['-DEBUG'] # enable verbose logging cmd += [job_name] rds.jobs.append(cmd)
[docs] def runCTST(self): """ Run canonical transition state theory calculations to determine rate constant(s) for the rate determining step of the reaction. """ self._setUpCTST() # see MATSCI-6611 - originally jobs were launched simultaneously on # self.subhost using a JobDJ filled with SubprocessJob types but it # was found that starting the first of such jobs was hanging # indefinitely on Windows and so it was switched to use subprocess # (so running on the original -HOST) in serial for rds in self.rdss: rds.stdouts = [] for job in rds.jobs: try: stdout = subprocess.check_output( job, stderr=subprocess.STDOUT, universal_newlines=True) except subprocess.CalledProcessError: stdout = '' rds.stdouts.append(stdout)
[docs] def prepareCTSTOutput(self): """ Prepare CTST output. :raise ReactionWorkflowException: if there is an issue """ # process raw output data failed_stage_idx = None for rds in self.rdss: temp_data_dict = {} for stdout in rds.stdouts: for line in stdout.splitlines(): if line.startswith('{'): data = yaml.safe_load(line.strip()) temperature = float(data['Temperature'][0]) energy = float(data['Energy Barrier'][0]) r_partition_f = float( data['Reagents Partition Function'][0]) ts_partition_f = float( data['Complex Partition Function'][0]) mantissa, order_of_magnitude = data[ 'TST Rate Constant'][0].split(' x ') rate_constant = float(mantissa) * eval( order_of_magnitude.replace('^', '**')) rate_constant_units = data['TST Rate Constant'][1] data = (energy, r_partition_f, ts_partition_f, rate_constant, rate_constant_units) temp_data_dict.setdefault(temperature, []).append(data) if not temp_data_dict: failed_stage_idx = rds.stage_idx break for temp, data in temp_data_dict.items(): energies, r_partition_fs, ts_partition_fs, rate_constants, \ rate_constant_units = [numpy.array(i) for i in zip(*data)] # energies are temperature independent so perform an algebraic # average avg_energy = numpy.mean(energies) # Boltzmann average the temperature dependent partition functions # and rate constants factors = numpy.exp(-1 * energies / (IDEAL_GAS_CONSTANT * temp)) partition_f = sum(factors) avg_r_partition_f = numpy.dot(r_partition_fs, factors) / partition_f avg_ts_partition_f = numpy.dot(ts_partition_fs, factors) / partition_f avg_rate_constant = numpy.dot(rate_constants, factors) / partition_f temp_data_dict[temp] = (avg_energy, avg_r_partition_f, avg_ts_partition_f, avg_rate_constant, rate_constant_units[0]) rds.temp_data_dict = temp_data_dict # process raw output files if failed_stage_idx is not None or self.return_rate_constant_files: os.makedirs(self._ctst_dir, exist_ok=True) for rds in self.rdss: for job, stdout in zip(rds.jobs, rds.stdouts): log_file = os.path.join(self._ctst_dir, job[-1] + '.log') with open(log_file, 'w') as afile: afile.write(stdout) jobutils.add_zipfile_to_backend(self._ctst_dir) if failed_stage_idx: msg = ( 'All rate constant calculations for stage ' f'{failed_stage_idx} have failed for all child and parent ' 'conformers.') raise ReactionWorkflowException(msg) # process summary output file base_name = jobutils.get_jobname(DEFAULT_JOB_NAME) rate_constants_file = base_name + RATE_CONSTANTS_W_X_EXT with csv_unicode.writer_open(rate_constants_file) as afile: writer = csv.writer(afile) writer.writerow([ 'Stage_Index', 'Parent_Sibling_Group_Name', 'Child_Sibling_Group_Name', 'Temperature/K', 'Conf_Avg_SCF+ZPE_Energy_Barrier/kcal/mol', 'Conf_Avg_Reactants_Partition_F', 'Conf_Avg_TS_Partition_F', 'Conf_Avg_CTST_Rate_Constant', 'CTST_Rate_Constant_Units' ]) for rds in self.rdss: for temp, data in rds.temp_data_dict.items(): writer.writerow( [rds.stage_idx, rds.parent_name, rds.child_name, temp] + list(data)) backend = None jobutils.add_outfile_to_backend(rate_constants_file, backend)
[docs] def run(self): """ Run the reaction workflow. :raise ReactionWorkflowException: if there is an issue """ zip_files = jaguar_restart.get_restart_zip_files( base_names=[JMSWF_LAUNCH_DIR, ANHARM_LAUNCH_DIR]) jaguar_restart.prepare_restart_dirs(zip_files) self.validate() try: self.createConformers(self.reaction_wf_input_sts) except ConformationalSearchException as err: raise ReactionWorkflowException(str(err)) try: self._overwriteConformerEIDS() self.runJMSWF() self.prepareJMSWFOutput() self.setRepresentatives() except JMSWFException as err: raise ReactionWorkflowException(str(err)) if self.anharm: self.runAnharmonic() self.processAnharmonic() out_mae_file = self.finalizeJMSWFOutput() ReactionWorkflowEnergyAnalysis(out_mae_file, self.rep_keys).writeDataFiles() if self.rate_constants: self.runCTST() self.prepareCTSTOutput()