"""
Desmond workup techniques, all based on actual numerical results. Intended for
import and use as a test workup.
Running directly allows execution of any of the workup methods
Methods for comparing the following Desmond-style information:
- `Metadynamics FES files<desmond_compare_FES>`
- `FEP energies from FEP 'result' files<desmond_FEP_energies>`
- Surface area of membrane systems
- .ene files:
- `similarity to a reference ene file<desmond_compare_ene>`
- log files: `Calculation speed (ns/day)<desmond_speed>`
- `st2 files<desmond_st_stats>`: comparison of median, mean, and standard
deviation to reference values
- `deltaE.txt files<desmond_compare_deltaE>`: compare two deltaE.txt files
Classes and helper functions have 'private' names, as only the workup methods
are intended to be imported and used as test workups.
$Revision 0.1 $
@TODO: improve speed test to print avg ns/day.
@TODO: improve speed test to be dependent on cpu^0.8
@copyright: (c) Schrodinger, Inc. All rights reserved.
"""
import glob
import os
import re
import sys
import tarfile
import gzip
from collections import Counter
from collections import defaultdict
from functools import partial
from past.utils import old_div
from typing import TYPE_CHECKING
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
import numpy
import pytest
from more_itertools import pairwise
from schrodinger import structure
from schrodinger.application.desmond import cmj
from schrodinger.application.desmond import constants
from schrodinger.application.desmond import stage
from schrodinger.utils import mmutil
from schrodinger.utils import sea
from . import standard_workups
if TYPE_CHECKING:
from schrodinger.application.scisol.packages.fep import graph
_version = "$Revision 0.1 $"
REPLICA = re.compile(r"\[T(\d+), *T(\d+)\] *: *\[(.+), *(.+)\]")
# *****************************************************************************
# METADYNAMICS
def _FES(filename):
"""Returns a numpy array based on a FES format file."""
fes1 = []
with open(filename) as f:
for line in f:
line = line.strip()
if "#" in line:
continue
elif line:
fes1.append([float(x) for x in line.split()])
return numpy.array(fes1)
[docs]def desmond_compare_FES(file1, file2, tolerance=0.1):
"""
Compare metadynamics FES files.
Tolerance is optional, and defaults to 0.1
"""
fes1 = _FES(file1)
fes2 = _FES(file2)
try:
rmsd = fes2 - fes1
rmsd = rmsd * rmsd
rmsd = old_div(numpy.sum(rmsd.flat), len(fes1.flat))
rmsd = numpy.sqrt(rmsd)
if rmsd > tolerance:
msg = "FES were different, RMSD=%.2f" % rmsd
raise AssertionError(msg)
return True
except ValueError:
msg = "FES shapes were different!"
raise AssertionError(msg)
#THIS CODE WILL GENERATE A FES:
#from schrodinger.application.desmond.meta import MetaDynamicsAnalysis
#meta_data = MetaDynamicsAnalysis(, inp_fname=opts.input)
#FES = meta_data.computeFES(outputname)
#
#alternately, just add an analysis stage to a metadynamics job!
# *****************************************************************************
# FEP
[docs]def desmond_FEP_energies(filename, reference, tolerance=0.5):
"""
Compares the deltaF in an FEP result file to a standard.
"""
with open(filename) as result:
for line in result:
if "deltaF" in line:
delta_f = float(line.split()[3])
if abs(delta_f - reference) > tolerance:
msg = " Different deltaF: {} vs {} ".format(
delta_f, reference)
raise AssertionError(msg)
else:
return True
msg = " deltaF not found in file %s" % filename
raise AssertionError(msg)
# *****************************************************************************
# Membrane Systems
[docs]def desmond_check_surface_area(filename,
reference,
tolerance=0.5,
side1="r_chorus_box_ax",
side2="r_chorus_box_bx"):
"""
Compares surface area to a standard with optional tolerance and optional
choice of box sides. Default sides are a and b.
example usage: desmond_check_surface_area('filename', 7.5, tolerance=0.1)
"""
struct = next(structure.MaestroReader(filename))
side1 = struct.property[side1]
side2 = struct.property[side2]
area = side1 * side2
if old_div(abs(area - reference), reference) > tolerance:
msg = f" Areas were different: {area} vs {reference}"
raise AssertionError(msg)
return True
[docs]def desmond_tgz_file_list(tgz_fname, *fnames_to_check):
import tarfile
with tarfile.open(tgz_fname) as tf:
am = tf.getmembers()
mn = [m.name for m in am]
fnames_to_check = list(fnames_to_check)
fnames_to_check.sort()
mn.sort()
if mn == fnames_to_check:
return True
msg = ' '.join((mn, fnames_to_check))
raise AssertionError(msg)
# *****************************************************************************
# .ene files
class _EneFile:
"""
Class to find and contain the information in a Desmond .ene file. Also
does comparisons based on:
- compare initial and final energies to reference
- compare temperature profiles (useful for SAmd)
- number of time points and their separation
"""
def __init__(self, filename=None, tol=10.0):
self.eneData = {}
self.eneUnits = {}
self.tol = tol
if filename:
self.load_system(filename)
else:
self._filename = None
def filename(self, filename=None):
"""
Gets and sets the filename. Prevents having a loaded system which
does not match the file name.
"""
if filename:
self.load_system(filename)
else:
return self._filename
def load_system(self, filename):
"""Loads the information from file filename into the EneFile object."""
self._filename = filename
name_rows = [] #list describes the property name in each row
with open(self._filename) as ene_file:
for line in ene_file:
if "#" == line[0]:
array = line[1:].split()
if '0' == array[0][0]:
for elem in array:
if ":" in elem:
name = elem.split(':')[-1]
name_rows.append(name)
self.eneData[name] = []
elif "(" == elem[0] and ")" == elem[-1]:
#Add the name of the unit based on the most
#recent quantity name found
self.eneUnits[name_rows[-1]] = elem[1:-1]
else:
raise Exception(
"unknown row type in row keys list")
elif name_rows:
for i, datum in enumerate(line.split()):
self.eneData[name_rows[i]].append(float(datum))
def quantity_average(self, quantity):
"""
Finds the average of a quantity.
"""
return numpy.average(self.eneData[quantity])
def compare_units(self, other):
"""
Compares two EneFile objects based on whether the quantities are
measured in the same units in both files.
"""
for value in set(list(self.eneUnits) + list(other.eneUnits)):
if self.eneUnits.get(value, "") != other.eneUnits.get(value, ""):
msg = (" Units did not match for %s (%s vs %s)" %
(value, self.eneUnits.get(value, ""),
other.eneUnits.get(value, "")))
raise AssertionError(msg)
return True
def compare_quantity(self, other, quantity):
"""
Compares two EneFile objects based on the initial and final and average value of a
named quantity.
"""
max_diff = 0.1**self.tol
if abs(self.eneData[quantity][0] - other.eneData[quantity][0]
) > max_diff:
msg = " Initial values for {} differ: {} vs {}".format(
quantity, self.eneData[quantity][0], other.eneData[quantity][0])
raise AssertionError(msg)
if abs(self.eneData[quantity][-1] - other.eneData[quantity][-1]
) > max_diff:
msg = " Final values for {} differ: {} vs {}".format(
quantity, self.eneData[quantity][-1],
other.eneData[quantity][-1])
raise AssertionError(msg)
if abs(
self.quantity_average(quantity) -
other.quantity_average(quantity)) > max_diff:
msg = " Average values for {} differ: {} vs {}".format(
quantity, self.quantity_average(quantity),
other.quantity_average(quantity))
raise AssertionError(msg)
return True
def compare_times(self, other):
"""
Compares two EneFile objects based on the number of points in each
simulation as well as the final time point.
"""
r_value = (len(self.eneData['time']) == len(other.eneData['time']) and
self.eneData['time'][-1] == other.eneData['time'][-1])
if not r_value:
raise AssertionError(" Time profiles did not match")
return True
def compare_temperature_profile(self, other):
"""
Compares two EneFile objects based on the temperatures at each
timepoint.
"""
temps_self = numpy.array(self.eneData["T"])
temps_other = numpy.array(other.eneData["T"])
diffs = numpy.abs(temps_self - temps_other)
if max(diffs) > 40:
msg = " The maximum difference in Temperature at any given time"\
" point was large: {}".format(max(diffs))
raise AssertionError(msg)
return True
[docs]def desmond_compare_ene(file1, file2, tolerance, *quantities):
"""
Compares two Desmond ene files.
Checks that any quantities requested from the ene file match
within some tolerance number of decimal points.
usage desmond_compare_ene('file1.ene', 'file2.ene', 2, 'P', 'E_c')
"""
ene1 = _EneFile(file1, tolerance)
ene2 = _EneFile(file2, tolerance)
successes = list(map(partial(ene1.compare_quantity, ene2), quantities))
return all(successes)
[docs]def desmond_compare_eneseqs(file1,
file2,
*,
columns=None,
min_length=10,
atol=1e-2,
rtol=1e-5,
mean_atol=1e-4,
mean_rtol=1e-5):
"""
Compare two columns between two Desmond ene files at matching time points.
:param columns: Column names to compare. Use all by default.
:type columns: sequence of str
:param min_length: Require at least that many matching time points.
:type min_length: int
:param atol: Absolute tolerance (see `numpy.isclose`).
:type atol: float
:param rtol: Relative tolerance (see `numpy.isclose`).
:type rtol: float
:param mean_atol: Absolute tolerance for mean values (see `numpy.isclose`).
:type mean_atol: float
:param mean_rtol: Relative tolerance for mean values (see `numpy.isclose`).
:type mean_rtol: float
"""
TIME_COLUMN = 'time'
ene1 = _EneFile(file1)
ene2 = _EneFile(file2)
if columns is None:
assert ene1.eneData.keys() == ene2.eneData.keys(), \
" Column names do not match"
columns = ene1.eneData.keys() - {TIME_COLUMN}
else:
assert set(columns) <= ene1.eneData.keys(), \
" Some of the columns are missing in file1"
assert set(columns) <= ene2.eneData.keys(), \
" Some of the columns are missing in file2"
for (label, sequence) in (('file1', ene1.eneData[TIME_COLUMN]),
('file2', ene1.eneData[TIME_COLUMN])):
assert all(a < b for a, b in pairwise(sequence)), \
f" Non-monotonic time values in {label}"
common_times, idx1, idx2 = numpy.intersect1d(
ene1.eneData[TIME_COLUMN],
ene2.eneData[TIME_COLUMN],
assume_unique=True,
return_indices=True)
num_common = len(idx1)
assert min_length > 0
assert num_common >= min_length, \
f" Too few matching time points: {num_common} < {min_length}"
for c in sorted(columns):
x1 = numpy.asarray(ene1.eneData[c])[idx1]
x2 = numpy.asarray(ene2.eneData[c])[idx2]
close = numpy.isclose(x1, x2, atol=atol, rtol=rtol)
if not close.all():
not_close = [arr[~close] for arr in (common_times, x1, x2)]
print(f"Divergent values of '{c}' at:")
for t_, x1_, x2_ in zip(*not_close):
print(f"t={t_}: {x1_} vs {x2_}")
assert close.all(), \
f" Divergent values of '{c}', see stdout for details"
mean1, mean2 = x1.mean(), x2.mean()
assert numpy.isclose(mean1, mean2, atol=mean_atol, rtol=mean_rtol), \
f" Divergent values of mean('{c}'): {mean1} vs {mean2}"
# *****************************************************************************
# log files
[docs]def desmond_speed(filename, reference_rate, tolerance=0.05):
"""
Compares the average rate per step in ns/day against a standard.
example usage: desmond_speed('file1.log', 7.5, tolerance=0.1)
:param reference_rate: Value to be compared against in ns/day
:param tolerance: Tolerance. Default is 5%.
"""
with open(filename) as logfile:
for line in logfile:
if "Total rate per step" in line:
rate = float(line.split()[-2])
if old_div(abs(rate - reference_rate),
reference_rate) > tolerance:
msg = (" Rates: new: %.1f, reference: %.1f" %
(rate, reference_rate))
raise AssertionError(msg)
else:
return True
msg = " Total rate per step not found."
raise AssertionError(msg)
[docs]def desmond_stage_time(log_file, stage_num, max_time):
"""
:param log_file : Name of log file to be analyzed.
:type log_file : str
:param stage_num : Stage number
:type stage_num : int
:param max_time : Maximum acceptable time for the stage in seconds
:type max_time : float
:return : `True` if the stage completed successfully within `max_time` seconds, or `False` if not.
"""
search_str = 'Stage %d duration:' % stage_num
try:
with open(log_file) as file_data:
stage_duration = [
line.strip(search_str)
for line in file_data
if re.search(search_str, line)
][-1]
except IndexError:
return False
with open(log_file) as fh:
multisim_complete = fh.read().find(
"\nStage %d completed successfully." % stage_num) != -1
h, m, s = [e[:-1] for e in stage_duration.split()]
duration = float(h) * 3600 + float(m) * 60 + float(s)
return ((duration < max_time) and multisim_complete)
# *****************************************************************************
# st2 files.
#compare averages and ranges.
[docs]def desmond_st_stats(filename, *options):
"""
Compares the mean, median and/or standard deviation of a desmond st2 file to
reference values.
usage: desmond_st_stats('file.st2', 'mean=1.2', 'median=2.3', 'stddev=1.2', 'tol=0.2')
"""
with open(filename) as fh:
cfg = sea.Map(fh.read())
meanRef, medianRef, stddevRef = None, None, None
tolerance = 0.4
for a in options:
key, val = a.split("=")
val = float(val)
if key.lower() == "mean":
meanRef = val
elif key.lower() == "median":
medianRef = val
elif key.lower() == "stddev":
stddevRef = val
elif key.lower()[:3] == "tol":
tolerance = val
try:
rvalue = True
values = []
for st in cfg.Keywords:
for key in st:
if key == "Angle" or key == "Torsion":
values = [x.val for x in st[key]["Result"]]
values = _normalize_angular_values(values)
else:
values = [x.val for x in st[key]["Result"]]
errors = []
if meanRef:
_compare_stats(values, meanRef, tolerance, numpy.mean, "Mean",
errors)
if medianRef:
_compare_stats(values, medianRef, tolerance, numpy.median, "Median",
errors)
if stddevRef:
_compare_stats(values, stddevRef, tolerance, numpy.std,
"Standard deviation", errors)
if errors:
raise AssertionError('\n'.join(errors))
except Exception as e:
raise AssertionError(str(e))
def _compare_stats(values, reference, tolerance, function, name, errors):
value = function(values)
if abs(reference - value) > tolerance:
msg = f" {name} differs: {value} vs. {reference}"
errors.append(msg)
def _normalize_angular_values(array):
"""
Centers an array of angles in the unit of degree to minimize its standard
deviation.
:param array : Array to be (possibly) modified.
:type array : list
:return : Values, some of which (possibly) offset (by 360 deg) to minimize std dev.
:rtype : list
"""
good_array = array
stddev = numpy.std(good_array)
for shift in numpy.arange(0, 360, 2.7):
tmp = list(((elem + shift) % 360 - shift) for elem in array)
if numpy.std(tmp) < stddev:
good_array = tmp
stddev = numpy.std(good_array)
good_array -= (numpy.mean(good_array) // 360) * 360
return good_array.tolist()
[docs]def read_dE_file(fname):
"""
Read a dE file and return a dict from time to energy_dict;
energy_dict is a dict from (lambda_win, lambda_win) to energy value
"""
dic, energy = {}, {}
with open(fname) as fh:
for line in fh:
line = line.strip()
if 'Time' == line[:4]:
time = line[6:]
dic = energy[time] = {}
elif '[' == line[0]:
match = REPLICA.match(line)
if match:
r0, r1, ef, er = match.group(1, 2, 3, 4)
r0 = int(r0)
r1 = int(r1)
dic[(r0, r1)] = ef
dic[(r1, r0)] = er
return energy
[docs]def desmond_compare_deltaE(file1, file2):
"""
Compares two Desmond deltaE.txt files.
Checks time and energy. No tolerance (exact sameness).
usage: desmond_compare_deltaE('file1', 'file2')
"""
energy1 = read_dE_file(file1)
energy2 = read_dE_file(file2)
return energy1 == energy2
"""
if len(energy1) != len(energy2):
return False
for k, d1 in energy1.iteritems():
try:
d2 = energy2[k]
except KeyError:
return False
if len(d1) != len(d2):
return False
for l, e1 in d1.iteritems():
try:
e2 = d2[l]
except KeyError:
return False
if e1 != e2:
return False
return True
"""
[docs]def desmond_compare_energy_group(file1, file2, tolerance=0):
"""
Compares two Desmond enegrp.dat files.
Checks all values upto tolerance value.
usage: desmond_compare_energy_group('file1', 'file2', tolerance=0)
"""
def compare_value_line(l1, l2):
k1 = l1[:l1.index('(')].strip()
k2 = l2[:l2.index('(')].strip()
v1 = l1[l1.index(')') + 1:].strip()
v2 = l2[l2.index(')') + 1:].strip()
return k1 == k2 and my_compare(v1, v2)
def compare_time_line(l1, l2):
d1 = read_time_line(l1)
d2 = read_time_line(l2)
if len(d1) != len(d2):
return False
for k, v1 in d1.items():
try:
v2 = d2[k]
except KeyError:
return False
if not my_compare(v1, v2):
return False
return True
def read_time_line(l):
d = {}
for s in l.split():
k, v = s.split('=')
d[k] = v
return d
def my_compare(v1, v2):
v1 = float(v1)
v2 = float(v2)
return abs(v1 - v2) <= tolerance
with open(file1) as fh1,\
open(file2) as fh2:
for i, (line1, line2) in enumerate(zip(fh1, fh2)):
if '(' in line1:
if not compare_value_line(line1, line2):
msg = f'line {i} {line1} differ from {line2}'
raise AssertionError(msg)
else:
if not compare_time_line(line1, line2):
msg = f'line {i} {line1} differ from {line2}'
raise AssertionError(msg)
return True
[docs]def desmond_test_mean_num_waters(tarname, limits):
"""
Check that the mean number of waters in the simulation is within the given
limits.
:param tarname: the input tar file name
:type tarname: str
:param limits: a 2-tuple containing the min and max values
:type limits: tuple
"""
with tarfile.open(tarname) as tarball:
for name in tarball.getnames():
if name.endswith('.log'):
break
else:
raise ValueError("No logfile found")
# Read the average number of waters at the end of each GCMC plugin
# iteration from the logfile of the GCMC simulation.
nwaters = []
# Format: Total solvent in simulation box: <N> Average: <x>
check_phrase = 'Total solvent in simulation box: '
# compatibility with quiet format:
# GCMC Moves accepted ... Total solvent molecules: <N>
check_phrase_quiet = 'GCMC moves accepted: '
with tarball.extractfile(name) as f:
for l in f:
# extractfile doesn't give decoded output
decoded = l.decode('utf-8')
if decoded.startswith(check_phrase):
# Extract the average number of particles at the end of
# a given GCMC plugin iteration
nwaters.append(float(decoded.split()[-1]))
elif decoded.startswith(check_phrase_quiet):
# Extract the instantaneous number of particles
# (not the average) at regular intervals
nwaters.append(int(decoded.split()[-1]))
# Average over possibly many GCMC plugin iterations
mean_num_waters = numpy.mean(nwaters)
nmin, nmax = limits
return nmin < mean_num_waters < nmax
[docs]def desmond_check_fep_master_msj(master_msj_fname: str,
fep_type: constants.FEP_TYPES):
"""
Basic checks for the master msj.
:param master_msj_fname: The input master msj file name.
:param fep_type: Type of fep used to run the job.
"""
from schrodinger.application.scisol.packages.fep.graph import Legtype
legs = (Legtype.COMPLEX.value, Legtype.SOLVENT.value)
job_name = os.path.splitext(os.path.basename(master_msj_fname))[0]
msj_as_sea = cmj.msj2sea(master_msj_fname)
found_mapper = False
found_launcher = False
found_fep_cleanup = False
found_protein_generator = False
mapper_name = {
constants.FEP_TYPES.SMALL_MOLECULE: stage.FepMapper.NAME,
constants.FEP_TYPES.METALLOPROTEIN: stage.FepMapper.NAME,
constants.FEP_TYPES.COVALENT_LIGAND: stage.CovalentFepMapper.NAME,
constants.FEP_TYPES.PROTEIN_STABILITY: stage.ProteinFepMapper.NAME,
constants.FEP_TYPES.PROTEIN_SELECTIVITY: stage.ProteinFepMapper.NAME,
constants.FEP_TYPES.LIGAND_SELECTIVITY: stage.ProteinFepMapper.NAME,
}
has_fragment_linking = fep_type == constants.FEP_TYPES.SMALL_MOLECULE
for s in msj_as_sea.stage:
msg_base = f"{master_msj_fname}: {s.__NAME__}"
if s.__NAME__ == mapper_name[fep_type]:
found_mapper = True
# Check mapper args
if fep_type == constants.FEP_TYPES.SMALL_MOLECULE:
pass
elif fep_type == constants.FEP_TYPES.COVALENT_LIGAND:
assert len(s.graph_file.val), \
f"{msg_base}.graph_file is not set."
elif fep_type == constants.FEP_TYPES.METALLOPROTEIN:
assert len(s.mp.val), \
f"{msg_base}.mp is not set."
assert sum(['i_fep_' in v for v in s.mp.val]), \
f"{msg_base}.mp is not set correctly, missing 'i_fep_' in name."
elif fep_type in constants.PROTEIN_FEP_TYPES:
assert s.fep_type.val == fep_type, \
f"{msg_base}.fep_type is not set correctly. Found {s.fep_type.val}, expected {fep_type}."
if fep_type in [
constants.FEP_TYPES.PROTEIN_SELECTIVITY,
constants.FEP_TYPES.LIGAND_SELECTIVITY
]:
assert s.asl.val, \
f"{msg_base}.asl is empty for selectivity calculation."
else:
assert False, f"Unknown fep_type: {fep_type}"
elif s.__NAME__ == stage.FepLauncher.NAME:
found_launcher = True
for ileg, leg in enumerate(legs):
expected_msj = {
constants.SIMULATION_PROTOCOL.CHARGED: f'{job_name}_chg_{leg}.msj',
constants.SIMULATION_PROTOCOL.FORMALCHARGED: f'{job_name}_chg_{leg}.msj',
constants.SIMULATION_PROTOCOL.COREHOPPING: f'{job_name}_corehopping_{leg}.msj',
constants.SIMULATION_PROTOCOL.MACROCYCLE_COREHOPPING: f'{job_name}_corehopping_{leg}.msj',
constants.SIMULATION_PROTOCOL.DEFAULT: f'{job_name}_{leg}.msj',
}
if has_fragment_linking:
expected_msj[
constants.SIMULATION_PROTOCOL.
FRAGMENT_LINKING] = f'{job_name}_fragment_linking_{leg}.msj'
for name, msj_fname in expected_msj.items():
assert name in s.dispatch.val, f"Missing {name} dispatcher."
assert msj_fname in s.dispatch[name].val[
ileg], f"Missing {msj_fname} in {name} dispatcher."
assert s.dispatch.val.keys() == expected_msj.keys(), \
f"Additional dispatchers found: " \
f"{set(s.dispatch.val.keys()) - set(expected_msj.keys())}, " \
"please update desmond_workups and the stu tests."
if has_fragment_linking:
for ileg, leg in enumerate(constants.FRAGMENT_LINKING_JOBS.VAL):
msj_fname = f'{job_name}_fragment_linking_{leg}.msj'
assert msj_fname in s.dispatch[name].val[
ileg], f"Missing {msj_fname} in {name} dispatcher."
elif s.__NAME__ == stage.ProteinMutationGenerator.NAME:
assert fep_type in constants.PROTEIN_FEP_TYPES, f"{msg_base}: not a protein fep calculation!"
assert s.mutation_file.val, f"{msg_base}.mutation_file is empty."
found_protein_generator = True
found_fep_cleanup = found_fep_cleanup or s.__NAME__ == stage.FepMapperCleanup.NAME
assert found_mapper, "Mapper stage not found"
assert found_launcher, "Launcher stage not found"
assert found_fep_cleanup, "Cleanup stage not found"
if fep_type in constants.PROTEIN_FEP_TYPES:
assert found_protein_generator, "Protein mutation generator stage not found"
return True
_EXPECTED_LAMBDAS = {
constants.SIMULATION_PROTOCOL.CHARGED: 'charge:24',
constants.SIMULATION_PROTOCOL.FORMALCHARGED: 'charge:24',
constants.SIMULATION_PROTOCOL.COREHOPPING: 'flexible:16',
constants.SIMULATION_PROTOCOL.MACROCYCLE_COREHOPPING: 'flexible:16',
constants.SIMULATION_PROTOCOL.DEFAULT: 'default:12',
constants.SIMULATION_PROTOCOL.FRAGMENT_LINKING: 'flexible:16',
constants.FRAGMENT_LINKING_JOBS.VAL.FRAGMENT_HYDRATION: 'default:24'
}
def _check_edge_sid(edge: "graph.Edge", exp_num_lambdas_complex: int,
exp_num_lambdas_solvent: int):
"""
Check the edge's sid report to check for the appropriate number of
replicas for both the complex and solvent legs
Related cases: DESMOND-8836, DESMOND-8811, DESMOND-8805,
DESMOND-8674, DESMOND-8243, DESMOND-8224, DESMOND-8208.
:param edge: graph.Edge object to check
"""
from schrodinger.application.scisol.packages.fep.graph import Legtype
fep_type = edge.graph.fep_type
legs = (Legtype.COMPLEX.value, Legtype.SOLVENT.value)
sids = [edge.complex_sid, edge.solvent_sid]
expected_num_replicas = [exp_num_lambdas_complex, exp_num_lambdas_solvent]
for leg, sid, exp_num in zip(legs, sids, expected_num_replicas):
assert sid and len(
sid) > 0, f"Edge {edge.short_id} is missing {leg}_sid."
sid_obj = sea.Map(sid)
checked_fep_simulation = False
checked_num_replicas = False
for k in sid_obj['Keywords']:
if 'FEPSimulation' in k:
assert k['FEPSimulation']['PerturbationType'].val == fep_type, \
f'Could not find "{fep_type}" in {leg} sid perturbation_type for {edge.short_id}.'
checked_fep_simulation = True
if 'Replica' in k:
# DESMOND-8811
assert len(k['Replica'].val) == exp_num, \
f"Job ran with wrong number of lambdas. Found {len(k['Replica'].val)}, " \
f"expected {exp_num}."
checked_num_replicas = True
assert checked_fep_simulation, "Did not find FEPSimulation in {edge.short_id} sid."
assert checked_num_replicas, "Did not find Replica in {edge.short_id} sid."
def _check_edge_parched(edge: "graph.Edge", min_frames=21):
"""
Check the parched trajectories and representative structures
for a given edge.
Related cases: DESMOND-8839, DESMOND-8819, DESMOND-8812, DESMOND-8596,
DESMOND-8499, DESMOND-8279, DESMOND-8198.
:param edge: graph.Edge object to check
:param min_frames: Minimum number of frames in the parched trajectories.
Default is 21 for the number of frames in a
standard 5 ns FEP job.
"""
from schrodinger.application.desmond.packages import traj_util
from schrodinger.application.scisol.packages.fep.graph import Legtype
legs = (Legtype.COMPLEX, Legtype.SOLVENT)
assert edge.graph.fmpdb is not None, "graph has no fmpdb file."
if edge.graph.fep_type == constants.FEP_TYPES.ABSOLUTE_BINDING:
lambdas = [0]
else:
lambdas = [0, 1]
edge.graph.fmpdb.open('.')
try:
fmpdb_fname = edge.graph.fmpdb.filename
for leg in legs:
trajs = edge.trajectories(leg)
assert trajs, f"{fmpdb_fname} is missing the trajectory for {edge.short_id} {leg.value}."
for ilambda in lambdas:
lambda_msys, lambda_cms, lambda_traj = traj_util.read_cms_and_traj(
trajs[ilambda][0])
try:
assert len(lambda_traj) >= min_frames, \
f"Trajectory for lambda {ilambda}, edge {edge.short_id}, leg {leg.value} has {len(lambda_traj)} frames, expected at least {min_frames} frames."
for (ct, _) in traj_util.extract_atoms(
lambda_cms,
f'water and not (atom.{constants.ALCHEMICAL_ION} or atom.{constants.CT_TYPE} "{constants.CT_TYPE.VAL.ALCHEMICAL_WATER}")',
lambda_traj):
assert len(ct.molecule) == 200, \
f"Trajectory for lambda {ilambda}, edge {edge.short_id}, leg {leg.value} "\
f" has wrong number of waters. Found {len(ct.molecule)}, expected 200."
except AssertionError:
raise
finally:
if lambda_traj:
# Close the reader to close the filehandle,
# otherwise fmpdb.close() can fail.
# DESMOND-8868
lambda_traj[0].source().close()
# Check representative structure (complex leg only).
leg = Legtype.COMPLEX
repr_fnames = edge.representative_strucs(leg)
assert repr_fnames is not None, f"{fmpdb_fname} is missing the representative struc for {edge.short_id} {leg.value}."
for ilambda in lambdas:
lambda_repr_ct = structure.Structure.read(repr_fnames[ilambda])
assert lambda_repr_ct.atom_total > 0, \
f"Representative structure for lambda {ilambda}, edge {edge.short_id}, leg {leg.value}, has no atoms."
except AssertionError:
raise
finally:
edge.graph.fmpdb.close()
def _check_graph_environment(g: "graph.Graph", membrane=False):
"""
Check the graph environment structures.
:param g: The graph object to check.
:param membrane: Set to True if this is a membrane fep run.
Default is False.
"""
from schrodinger.application.scisol.packages.fep import fepmae
if g.fep_type == constants.FEP_TYPES.SMALL_MOLECULE:
environments = ["receptor"]
elif g.fep_type == constants.FEP_TYPES.METALLOPROTEIN:
environments = ["receptor"]
elif g.fep_type == constants.FEP_TYPES.COVALENT_LIGAND:
environments = []
elif g.fep_type == constants.FEP_TYPES.PROTEIN_STABILITY:
environments = []
elif g.fep_type == constants.FEP_TYPES.PROTEIN_SELECTIVITY:
environments = ["receptor"]
elif g.fep_type == constants.FEP_TYPES.LIGAND_SELECTIVITY:
environments = ["receptor"]
elif g.fep_type == constants.FEP_TYPES.ABSOLUTE_BINDING:
environments = ["receptor"]
else:
raise ValueError(f'unknown fep_type: {g.fep_type}')
if membrane:
environments.extend(["membrane", "solvent"])
for environment in environments:
env_ct = getattr(g, f'{environment}_struc')
assert env_ct is not None, f"graph {environment}_struc is missing."
results = fepmae.filter_receptors_and_ligands([env_ct])
results_dict = {
'receptor': results[0],
'solvent': results[1],
'membrane': results[2],
'ligands': results[3]
}
# For ligand selectivity, the ligand is stored in the receptor_struc
if environment == "receptor" and g.fep_type == constants.FEP_TYPES.LIGAND_SELECTIVITY:
assert len(results_dict['ligands']) == 1, \
f"{g.fep_type} should have one ligand as the receptor. Found {len(results_dict['ligands'])} ligand(s). {results_dict}"
results_dict['receptor'] = results_dict['ligands'][0]
results_dict['ligands'] = []
for k, v in results_dict.items():
if k == environment:
found_ct = results_dict[environment]
found_title = found_ct.title if hasattr(found_ct,
'title') else found_ct
env_title = env_ct.title if hasattr(env_ct, 'title') else env_ct
assert env_ct is v, \
f"graph {environment}_struc is not correct. Found {found_title}, expected {env_title}."
else:
assert not v, \
f"graph {environment}_struc is missing or has wrong type."
found_env_count = len([ct for ct in g.environment_struc if ct is not None])
exp_env_count = len(environments)
assert found_env_count == exp_env_count, \
f'graph environment_struc is not correct. Found {found_env_count}, expected {exp_env_count} ct(s).'
def _check_graph_nodes(g: "graph.Graph", node_ids: List[str]):
"""
Check the node structures for a graph.
:param g: The graph object to check.
:param node_ids: List of expected node ids.
"""
assert g.number_of_nodes() > 0, "No nodes found in graph"
found_node_ids = [node.short_id for node in g.nodes_iter()]
extra_node_ids = set(found_node_ids) - set(node_ids)
missing_node_ids = set(node_ids) - set(found_node_ids)
assert len(extra_node_ids) == 0, \
f'Extra nodes found in graph: {extra_node_ids}'
assert len(missing_node_ids) == 0, \
f'Missing nodes in graph: {missing_node_ids}'
if g.fep_type in [
constants.FEP_TYPES.SMALL_MOLECULE,
constants.FEP_TYPES.METALLOPROTEIN,
constants.FEP_TYPES.PROTEIN_SELECTIVITY,
constants.FEP_TYPES.LIGAND_SELECTIVITY,
constants.FEP_TYPES.ABSOLUTE_BINDING,
]:
for node in g.nodes_iter():
assert node.struc is not None, \
f"graph node {node.short_id} has empty struc."
assert node.protein_struc is None, \
f"graph node {node.short_id} has non-empty protein_struc."
elif g.fep_type in [
constants.FEP_TYPES.COVALENT_LIGAND,
constants.FEP_TYPES.PROTEIN_STABILITY
]:
for node in g.nodes_iter():
assert node.struc is not None, \
f"graph node {node.short_id} has empty struc."
assert node.protein_struc is not None, \
f"graph node {node.short_id} has empty protein_struc."
def _check_fep_backend_log(log_contents: str):
"""
Check the Desmond backend log for expected contents.
:param log_contents: Desmond backend log file contents.
"""
assert 'GPU Desmond' in log_contents, \
'GPU Desmond is not detected.'
assert '32 FEP_GPGPU' in log_contents or '64 FEP_GPGPU' in log_contents, \
'Desmond is missing FEP_GPGPU license checkout in backend log file.'
def _check_fep_cms(cms_fname: str,
cts: List[structure.Structure],
leg: str,
fep_type: constants.FEP_TYPES,
membrane=False):
"""
Check input and output cms files for missing components.
Related cases: DESMOND-7328, DESMOND-8602, DESMOND-8894.
:param cms_fname: Filename of the cms.
:param cts: List of structures from the cms.
:param leg: Name of the leg to check.
:param fep_type: Type of fep used to run the job.
:param membrane: Set to True if this is a membrane fep run.
Default is False.
"""
from schrodinger.application.scisol.packages.fep.graph import Legtype
ffio_types = Counter()
for ct in cts:
ffio_types[ct.property[constants.CT_TYPE]] += 1
if fep_type in [constants.FEP_TYPES.SMALL_MOLECULE]:
expected_ffio_types = {
constants.CT_TYPE.VAL.FULL_SYSTEM: 1,
constants.CT_TYPE.VAL.SOLUTE: 3
if leg == Legtype.COMPLEX.value else 2,
constants.CT_TYPE.VAL.SOLVENT: 1,
}
if leg in constants.FRAGMENT_LINKING_HYDRATION_JOBS:
expected_ffio_types[constants.CT_TYPE.VAL.SOLUTE] = 1
if membrane and leg == Legtype.COMPLEX.value:
expected_ffio_types[constants.CT_TYPE.VAL.MEMBRANE] = 1
elif fep_type in [
constants.FEP_TYPES.PROTEIN_SELECTIVITY,
constants.FEP_TYPES.LIGAND_SELECTIVITY
]:
expected_ffio_types = {
constants.CT_TYPE.VAL.FULL_SYSTEM: 1,
constants.CT_TYPE.VAL.SOLUTE: 3
if leg == Legtype.COMPLEX.value else 2,
constants.CT_TYPE.VAL.SOLVENT: 1,
}
if membrane:
expected_ffio_types[constants.CT_TYPE.VAL.MEMBRANE] = 1
elif fep_type in [
constants.FEP_TYPES.METALLOPROTEIN,
constants.FEP_TYPES.COVALENT_LIGAND,
constants.FEP_TYPES.PROTEIN_STABILITY
]:
expected_ffio_types = {
constants.CT_TYPE.VAL.FULL_SYSTEM: 1,
constants.CT_TYPE.VAL.SOLUTE: 2,
constants.CT_TYPE.VAL.SOLVENT: 1,
}
if membrane and leg == Legtype.COMPLEX.value:
expected_ffio_types[constants.CT_TYPE.VAL.MEMBRANE] = 1
else:
assert False, f"Unknown fep_type: {fep_type}"
for ffio_type, count in expected_ffio_types.items():
assert ffio_types[ffio_type] == count, \
f'cms file {cms_fname} does not have {count} {ffio_type} ct(s), found {ffio_types[ffio_type]} ct(s). ' \
f'{ffio_types}'
def _check_fep_in_cfg(cfg_contents: str,
edge: "graph.Edge",
leg: str,
expected_lambdas=_EXPECTED_LAMBDAS):
"""
Check the in.cfg file given an graph edge.
:param cfg_contents: in.cfg file contents
:param edge: Corresponding edge from the graph.
:param leg: Name of the leg to check.
:param expected_lambdas: Expected number of lambdas for each supported FEP type.
If not given, use the default for each type.
"""
cfg = sea.Map(cfg_contents)
expected_lambda = expected_lambdas[edge.simulation_protocol]
if edge.is_fragment_linking and leg in constants.FRAGMENT_LINKING_HYDRATION_JOBS:
expected_lambda = expected_lambdas[
constants.FRAGMENT_LINKING_JOBS.VAL.FRAGMENT_HYDRATION]
assert cfg.fep['lambda'].val == expected_lambda, \
f'Wrong lambda schedule in in.cfg. " \
"Found {cfg.fep["lambda"].val}, expected: {expected_lambda}'
# Make sure macrocycle core-hopping has correct soft bond alpha
if edge.simulation_protocol == constants.SIMULATION_PROTOCOL.MACROCYCLE_COREHOPPING:
assert pytest.approx(
cfg.backend['soft_bond_alpha']
.val) == 0.5, "Macrocycle missing soft bond alpha in in.cfg."
def _check_deltaE(deltaE_contents: str, threshold=300):
"""
Check the deltaE file contents to make sure all of the energy
Related cases: DESMOND-9425, DESMOND-9429.
:param deltaE_contents: Contents of the deltaE.txt file.
:param threshold: Maximum absolute value for the energy in kcal/mol.
"""
vals = []
for line in deltaE_contents.split('\n'):
line = line.strip()
match = REPLICA.match(line)
if match:
ef = float(match.group(3))
er = float(match.group(4))
vals.extend([ef, er])
vals = numpy.array(vals)
msg = "deltaE values larger than threshold"
assert numpy.abs(
vals.min()) < threshold, f"{msg}: |{vals.min()}| > {threshold}"
assert numpy.abs(
vals.max()) < threshold, f"{msg}: |{vals.max()}| > {threshold}"
def _check_fep_launcher(launcher_path: str,
g: "graph.Graph",
membrane=False,
deltaE_threshold=None,
expected_lambdas=_EXPECTED_LAMBDAS):
"""
Check the output files generated by the FepLauncher stage.
Related cases: DESMOND-7831, DESMOND-6582.
:param launcher_path: Path to the output files of the fep launcher stage.
:param g: Graph object to use for checking the output files.
:param membrane: Set to True if this is a membrane fep run.
Default is False.
:param deltaE_threshold: If not None, maximum absolute value for the deltaE energy in
kcal/mol.
:param expected_lambdas: Expected number of lambdas for each supported FEP type.
If not given, use the default for each type.
"""
from schrodinger.application.scisol.packages.fep.graph import Legtype
job_name = '_'.join(os.path.basename(launcher_path).split('_')[:-1])
for e in g.edges_iter():
from_id, to_id = e.short_id
fep_type = g.fep_type
legs = [Legtype.COMPLEX.value, Legtype.SOLVENT.value]
if e.is_fragment_linking:
legs += list(constants.FRAGMENT_LINKING_HYDRATION_JOBS)
for leg in legs:
subjob_name = f'{job_name}_{from_id}_{to_id}_{leg}'
# Check that the job completed
multisim_log_fname = f'{os.path.join(launcher_path, subjob_name)}_multisim.log'
standard_workups.parse_log_file(multisim_log_fname,
'Multisim completed')
with open(multisim_log_fname) as f:
for line in f:
m = re.match(r'.*?stage (\d+) - lambda_hopping.*?', line)
if m:
lambda_hopping_idx = int(m.group(1))
break
# Check the lambda hopping out.tgz
lambda_hopping_tgz = os.path.join(
launcher_path, f'{subjob_name}_{lambda_hopping_idx}-out.tgz')
# /some/path/<JOBNAME>_8-out.tgz -> <JOBNAME>_8
lambda_hopping_stage = os.path.basename(
lambda_hopping_tgz.replace('-out.tgz', ''))
with tarfile.open(lambda_hopping_tgz) as tar:
tar_path = os.path.join(f'{subjob_name}_fep1',
lambda_hopping_stage)
if leg in constants.FRAGMENT_LINKING_HYDRATION_JOBS:
tar_path = lambda_hopping_stage
basename = os.path.join(tar_path, lambda_hopping_stage)
log_contents = tar.extractfile(basename + '.log').read().decode(
'latin-1')
_check_fep_backend_log(log_contents)
# Check the -in.cfg file
# DESMOND-8811
cfg_contents = tar.extractfile(
f'{basename}-in.cfg').read().decode('latin-1')
_check_fep_in_cfg(cfg_contents, e, leg, expected_lambdas)
# Check the deltaE.txt file
# DESMOND-9445
if deltaE_threshold is not None:
deltaE = tar.extractfile(
f'{tar_path}/deltaE.txt').read().decode('latin-1')
_check_deltaE(deltaE, deltaE_threshold)
# Check cms files
def is_cms(fname) -> bool:
return fname.endswith('.cms') or fname.endswith(
'.cmsgz') or fname.endswith('.cms.gz')
assert len([m.name for m in tar.getmembers() if is_cms(m.name)]), \
f'No cms files found in {lambda_hopping_tgz}.'
for m in tar.getmembers():
if is_cms(m.name):
cms_fname = m.name
cms_contents_fobj = tar.extractfile(cms_fname)
if m.name.endswith('gz'):
cms_contents_fobj = gzip.GzipFile(
mode='r', fileobj=cms_contents_fobj)
replica_cts = list(
structure.StructureReader.fromString(
cms_contents_fobj.read().decode('latin-1')))
_check_fep_cms(
cms_fname,
replica_cts,
leg,
fep_type,
membrane=membrane)
def _check_edge_values(edge: "graph.Edge", values: Dict[str, object]):
"""
Check values associated with the graph edge.
:param edge: Edge to check
:param values: Dictionary with the keys 'simulation_protocol',
'ddg', 'ddg_err'. The values in the edge are compared
with the corresponding values here.
"""
# Check the simulation protocol
if values.get('simulation_protocol'):
assert edge.simulation_protocol == values['simulation_protocol'], \
f"Edge {edge.short_id} has incorrect simulation protocol. " \
f"Found {edge.simulation_protocol}, expected {values['simulation_protocol']}"
# Check free energy values.
if values.get('ddg'):
assert pytest.approx(edge.bennett_ddg.val, abs=0.5) == values['ddg'], \
f'{edge.short_id} bennett_ddg.val different. ' \
f'Found {edge.bennett_ddg.val}, expected {values["ddg"]}.'
assert pytest.approx(edge.bennett_ddg.unc, abs=0.5) == values['ddg_err'], \
f'{edge.short_id} bennett_ddg.unc different. ' \
f'Found {edge.bennett_ddg.unc}, expected {values["ddg_err"]}.'
assert edge.bennett_ddg is not None, f'{edge.short_id} bennett_ddg is None'
assert edge.ccc_ddg is not None, f'{edge.short_id} ccc_ddg is None'
[docs]def desmond_check_fep_results(
fmp_fname: str,
launcher_path: str,
fep_type: constants.FEP_TYPES,
expected: Dict[Tuple[str, str], Dict[str, object]],
membrane: bool = False,
min_frames: int = 21,
skip_parched: bool = False,
skip_sid: bool = False,
edges_to_skip_parched_check: Optional[List[Tuple[str, str]]] = None,
deltaE_threshold: Optional[float] = None,
expected_lambdas=_EXPECTED_LAMBDAS):
"""
Check the fep results given the fep type and an `expected`
a dictionary mapping expected edge short ids to their
corresponding attributes.
:param fmp_fname: Name of the output fmp to check.
:param launcher_path: Path containing the output of the FepLauncher stage.
:param fep_type: Type of fep used to run the job.
:param expected: Dictionary mapping edge short ids to
a dictionary of attributes to check.
The attributes are the simulation_protocol,
with the values `constants.SIMULATION_PROTOCOL`,
'ddg', 'ddg_err' with the values corresponding to
the free energy. If the attributes are not present,
the corresponding checks are skipped.
:param membrane: Set to True if this is a membrane fep run.
Default is False.
:param min_frames: Minimum number of frames in the parched trajectories.
Default is 21 for the number of frames in a
standard 5 ns FEP job.
:param skip_parched: If True, skip checking the parched trajectories.
Default is False.
:param skip_sid: If True, skip checking for sid analysis. Default is False.
:param edges_to_skip_parched_check: A list of short edge ids for which to
skip checking for parched trajectory
when skip_parched is False. If
skip_parched is True then all edges
will be skipped.
:param deltaE_threshold: If not None, maximum absolute value for the deltaE energy in
kcal/mol.
:param expected_lambdas: Expected number of lambdas for each supported FEP type.
If not given, use the default for each type.
"""
from schrodinger.application.scisol.packages.fep.graph import Graph
g = Graph.deserialize(fmp_fname)
id2edge = {e.short_id: e for e in g.edges_iter()}
edge_ids = set(expected.keys())
node_ids = {a[0] for a in edge_ids}.union({a[1] for a in edge_ids})
missing_in_graph = edge_ids - set(id2edge.keys())
additional_in_graph = set(id2edge.keys()) - edge_ids
assert len(missing_in_graph) == 0, \
f"Missing edges in graph: {missing_in_graph}"
assert len(additional_in_graph) == 0, \
f"Additional edges in graph: {additional_in_graph}"
# Check edges.
for edge_id, values in expected.items():
e = id2edge.get(edge_id)
# Check that edge is in the graph.
assert e is not None, f"{edge_id} not in {fmp_fname}."
_check_edge_values(e, values)
if not skip_sid:
expected_num_replicas = int(
expected_lambdas[e.simulation_protocol].split(':')[-1])
_check_edge_sid(e, expected_num_replicas, expected_num_replicas)
edges_to_skip_parched_check = edges_to_skip_parched_check or []
if not skip_parched and edge_id not in edges_to_skip_parched_check:
_check_edge_parched(e, min_frames)
_check_graph_environment(g, membrane)
_check_graph_nodes(g, node_ids)
_check_fep_launcher(launcher_path, g, membrane, deltaE_threshold,
expected_lambdas)
return True
[docs]def desmond_check_ab_fep_results(
fmp_fname: str,
expected_values: Dict[Tuple[str, str], Dict[str, object]],
min_frames: int,
expected_lambdas_complex: int,
expected_lambdas_solvent: int,
membrane: bool = False,
):
from schrodinger.application.scisol.packages.fep.graph import Graph
g = Graph.deserialize(fmp_fname)
id2edge = {e.short_id: e for e in g.edges_iter()}
edge_ids = set(expected_values.keys())
node_ids = {a[0] for a in edge_ids}.union({a[1] for a in edge_ids})
missing_in_graph = edge_ids - set(id2edge.keys())
additional_in_graph = set(id2edge.keys()) - edge_ids
assert len(missing_in_graph) == 0, \
f"Missing edges in graph: {missing_in_graph}"
assert len(additional_in_graph) == 0, \
f"Additional edges in graph: {additional_in_graph}"
# Check edges.
for edge_id, values in expected_values.items():
e = id2edge.get(edge_id)
# Check that edge is in the graph.
assert e is not None, f"{edge_id} not in {fmp_fname}."
_check_edge_values(e, values)
_check_edge_sid(e, expected_lambdas_complex, expected_lambdas_solvent)
_check_edge_parched(e, min_frames)
_check_graph_environment(g, membrane)
_check_graph_nodes(g, node_ids)
return True
[docs]def desmond_check_memory_usage(logfile: str,
cpu_limits: List[float],
gpu_limits: Optional[List[float]] = None):
"""
Get statistics on memory usage printed to desmond logfile and compare
to specified inputs.
:param logfile: The logfile containing the memory usages
:param cpu_limits: The maximum acceptable value for the mean and maximum CPU
memory usage in kB
:param gpu_limits: The maximum acceptable value for the mean and maximum GPU
memory usage in kB [optional]
"""
types_to_limits = {'CPU': cpu_limits}
if gpu_limits:
types_to_limits['GPU'] = gpu_limits
memory_usages = defaultdict(list)
with open(logfile) as f:
for line in f.readlines():
match = re.search(r'using (\d+) kB of (.*?) memory', line)
if match is not None:
usage = match.group(1)
memory_usages[match.group(2)].append(int(usage))
def check_limits(usages, limits, memtype):
if not usages:
raise ValueError(
f"No {memtype} memory usage statements found in log "
f"file")
usages_arr = numpy.array(usages)
mean_usage = usages_arr.mean()
max_usage = usages_arr.max()
mean_limit, max_limit = limits
print(f"{memtype} usage mean: {mean_usage}, max {max_usage}")
return mean_usage < mean_limit and max_usage < max_limit
return all([
check_limits(memory_usages[memtype], limits, memtype)
for memtype, limits in types_to_limits.items()
])
[docs]def custom_charge_ct_count(input_mae_fname: str) -> int:
"""
Return the count of cts that have a custom charge block.
"""
count = 0
with open(input_mae_fname, 'r') as f:
for l in f.readlines():
if 'mmffld_custom_prop' in l:
count += 1
return count
if __name__ == "__main__":
def extract_summary(string):
summary = ""
for line in string.split("\n"):
line = line.strip()
if line:
summary += " " + line
elif summary:
return summary.strip()
if '-h' in sys.argv and len(sys.argv) > 2:
for arg in sys.argv[2:]:
if arg in dir():
print(arg + ":")
print(eval(arg).__doc__)
elif len(sys.argv) > 2 and sys.argv[1] in dir():
if eval(sys.argv[1])(sys.argv[2:]):
print(" Success!")
else:
print(" Failure.")
else:
print("usage: %s <workup> <args>\n" % os.path.split(__file__)[-1])
print(extract_summary(__doc__), "\n")
print("available workups:")
for method in dir():
if method.startswith("desmond"):
method = eval(method)
description = method.__doc__
summary = extract_summary(description)
print(f" {method.__name__}: {summary}")