import atexit
import gc
import itertools
import os
import pickle
import random
import sys
import tempfile
import uuid
from optparse import OptionParser
from past.utils import old_div
from schrodinger import structure
from schrodinger.application.canvas import r_group
from schrodinger.infra import mm
from schrodinger.structutils import build
from schrodinger.structutils.rmsd import superimpose
from schrodinger.utils import fileutils
from schrodinger.utils import log
from . import constants
logger = log.get_output_logger(name="fep_mapping")
#logger.setLevel(log.DEBUG)
[docs]def cleanup(topdir=None):
"""
Cleanup function to be run atexit
"""
#Garbage collect objects in case we hit the cancel button and htere are
#any open Structure{Reader,Writer} objects, which will run fh.close() in
#the __del__ method.
gc.collect()
#See if we are running inside of maestro
if topdir:
fileutils.force_rmtree(topdir)
schrod_tmp = fileutils.get_directory_path(fileutils.TEMP)
if not os.path.exists(schrod_tmp):
os.makedirs(schrod_tmp)
TEMP_DIR = tempfile.mkdtemp(prefix="fep-mapping-tmp", dir=schrod_tmp)
atexit.register(cleanup, topdir=TEMP_DIR)
FRAGMENT_SIZE_THRESHOLD = 10
HEAVY_ATOM_NUM_THERESHOLD = 7
FEP_MAPPING_ORIGINAL_INDEX = 'i_fep_mapping_origial_index'
FEP_MAPPING_BRIDGE = 'i_fep_mapping_bridge'
DEFAULT_ATOM_TYPE = 11
[docs]class AtomMappingData:
"""
Class to store intermediary and final data structures for atom mapping
"""
[docs] def __init__(self, source_ct, dest_ct):
# corresponding matched atoms in two-element tuple. Each element is
# consist of a list of atom indices
self._core_atoms = ([], [])
#List of attachment point tuples. Indexed on attachment point.
# [(source_anchor_atom_index, source_bridge_atom_index,
# dest_anchor_atom_index, dest_bridge_atom_index)
# ]
# *anchor_atom_index refers to atom index of core atom that belongs to
# specific attachment, which *bridge_atom_index refers to atom index of
# non-core atom connected to anchor_atom
self._attachment_points = []
self._attached_fragments = []
self.source_ct = source_ct
self._initAtomProperty(self.source_ct)
self.dest_ct = dest_ct
self._initAtomProperty(self.dest_ct)
[docs] def addAttachmentPoint(self, attachment_point):
"""
Adding attachment point.
:type attachment_point: two-element tuple.
:param attachment_point:
"""
if attachment_point not in self._attachment_points:
self._attachment_points.append(attachment_point)
sa_index, sb_index, da_index, db_index = attachment_point
if sb_index:
self.source_ct.atom[sb_index].property[FEP_MAPPING_BRIDGE] += 1
if db_index:
self.dest_ct.atom[db_index].property[FEP_MAPPING_BRIDGE] += 1
[docs] def getAttachmentPoints(self):
return self._attachment_points
[docs] def getBridgeAtoms(self):
return [(sb, db) for sa, sb, da, db in self._attachment_points]
[docs] def getAnchorAtoms(self):
return [(sa, da) for sa, sb, da, db in self._attachment_points]
[docs] def extendCoreAtoms(self, core_atoms):
"""
Adding core atoms.
Note that, adding new core atoms can resulting smaller attached fragments
"""
source_core_atoms, dest_core_atoms = core_atoms
self._core_atoms[0].extend(source_core_atoms)
self._core_atoms[1].extend(dest_core_atoms)
#for i, source, dest in enumerate(zip(source_core_atoms, #dest_core_atoms)):
# if source not in self._core_atoms[0] and \
# dest not in self._core_atoms[1]:
# self._core_atoms[0].append(source)
# self._core_atoms[1].append(dest)
#FIXME
[docs] def getCoreAtoms(self):
return self._core_atoms
def _initAtomProperty(self, ct):
"""
Initialized Atom Properties.
Recording i_fep_mapping_origial_index. Current implementation of MCS has one limitation in which all of the atoms belong to MCS must be connected.
This will be a problem if we would are trying to map linker molecules
like:
R1-O-R2 ==> R1-C-R2
The workaround is to extract the attached fragment and call canvasMCS
to do another searching. The fragment atoms have different indices in
the newly extracted ct and original ct. We have to maintain the mapping.
Recording the original index is one way to solve this problem.
"""
for i, atom in enumerate(ct.atom):
atom.property[FEP_MAPPING_ORIGINAL_INDEX] = atom.index
atom.property[FEP_MAPPING_BRIDGE] = 0
[docs]def map_core_hydrogen(source_ct, dest_ct, core_map, s_remaining_indices,
d_remaining_indices):
"""
Mapping hydrogen atoms connected to the core. The number of hydrogen atoms
connected to the corresponding atoms should be equal except the core atoms
which have attachment.
"""
def find_hydrogen(atom, remaining_indices):
hydrogens = []
for bonded_atom in atom.bonded_atoms:
if bonded_atom.element == 'H' and bonded_atom.index in remaining_indices:
hydrogens.append(bonded_atom.index)
return hydrogens
hydrogen_map = []
for source_atom_index, dest_atom_index in core_map:
source_atom = source_ct.atom[source_atom_index]
dest_atom = dest_ct.atom[dest_atom_index]
source_hydrogen = find_hydrogen(source_atom, s_remaining_indices)
dest_hydrogen = find_hydrogen(dest_atom, d_remaining_indices)
hydrogen_map.extend(
itertools.zip_longest(source_hydrogen, dest_hydrogen))
return hydrogen_map
[docs]def find_fragment_by_cond(bridge_atom, cond):
fragment = []
stack = []
stack.append(bridge_atom)
visited = set()
while len(stack) > 0:
atom = stack.pop()
visited.add(atom.index)
if cond(atom.index):
fragment.append(atom)
for bonded in atom.bonded_atoms:
if cond(bonded.index) and bonded.index not in visited:
stack.append(bonded)
return fragment
[docs]def find_noncore_fragment(bridge_atom, core_atom_indices):
"""
Finding non-core fragment from bridged
:type bridge_atom: _StructureAtom
:param bridge_atom: atom bonded to the attachment core atom.
:type core_atoms: set of int
:param core_atoms: a set of core atom indices.
:return a list of atoms.
"""
def is_noncore_atom(index):
return index not in core_atom_indices
return find_fragment_by_cond(bridge_atom, is_noncore_atom)
[docs]def find_noncore_subfragment(bridge_atom, core_atom_indices, frag_atom_indices):
"""
Finding non-core fragment from bridged
:type bridge_atom: _StructureAtom
:param bridge_atom: atom bonded to the attachment core atom.
:type core_atom_indices: set of int
:param core_atom_indices: a set of core atom indices.
:return a list of atoms.
"""
def is_noncore_subfragment_atom(index):
return index not in core_atom_indices and index in frag_atom_indices
return find_fragment_by_cond(bridge_atom, is_noncore_subfragment_atom)
[docs]def find_all_fragment_pairs(source_ct, dest_ct, attachment_points,
core_atom_indices):
fragment_pairs = []
source_core_atom_indices, dest_core_atom_indices = core_atom_indices
source_frag_atom_visited = set([None])
dest_frag_atom_visited = set([None])
for fsa_index, fsb_index, fda_index, fdb_index in attachment_points:
fsb = None
if fsb_index:
fsb = source_ct.atom[fsb_index]
else:
# handle trivial source fragment
if fdb_index not in dest_frag_atom_visited:
df = find_noncore_fragment(dest_ct.atom[fdb_index],
dest_core_atom_indices)
fragment_pairs.append(([], df))
dest_frag_atom_visited.update([atom.index for atom in df])
fdb = None
if fdb_index:
fdb = dest_ct.atom[fdb_index]
else:
# handle trivial dest fragment
if fsb_index not in source_frag_atom_visited:
sf = find_noncore_fragment(source_ct.atom[fsb_index],
source_core_atom_indices)
fragment_pairs.append((sf, []))
source_frag_atom_visited.update([atom.index for atom in sf])
if fsb_index not in source_frag_atom_visited and \
fdb_index not in dest_frag_atom_visited:
#
sf = find_noncore_fragment(fsb, source_core_atom_indices)
df = find_noncore_fragment(fdb, dest_core_atom_indices)
fragment_pairs.append((sf, df))
source_frag_atom_visited.update([atom.index for atom in sf])
dest_frag_atom_visited.update([atom.index for atom in df])
return fragment_pairs
[docs]def get_atom_mapping(data):
"""
mapping atoms based on the result from RGA
:type data: AtomMappingData
:param data: intermediary and finals data structures from RGA
"""
atom_map = []
# mapping core atoms
core_atom_indices = data.getCoreAtoms()
source_core_atoms = core_atom_indices[0]
dest_core_atoms = core_atom_indices[1]
core_map = [(source_atom, dest_atom) for source_atom, dest_atom in \
zip(source_core_atoms, dest_core_atoms)]
atom_map.extend(core_map)
#print "core map", core_map
# mapping non-core atoms
#
attachment_points = data.getAttachmentPoints()
# @todo: refactor this data structure to a class
s2d_bridge_atom = {}
d2s_bridge_atom = {}
for sa_index, sb_index, da_index, db_index in attachment_points:
s2d_bridge_atom[sb_index] = db_index
d2s_bridge_atom[db_index] = sb_index
# matching bridge atoms
s_subfrag_indices = []
d_subfrag_indices = []
s_bridge_indices_visited = set([None])
d_bridge_indices_visited = set([None])
bridge_pair_visited = set()
tmp_list = []
for x in data.source_ct.find_rings():
tmp_list.extend(x)
source_ring_atoms = set(tmp_list)
tmp_list = []
for x in data.dest_ct.find_rings():
tmp_list.extend(x)
dest_ring_atoms = set(tmp_list)
#print attachment_points
for sa_index, sb_index, da_index, db_index in attachment_points:
if (sb_index, db_index) in bridge_pair_visited:
continue
bridge_pair_visited.add((sb_index, db_index))
s_core_atom_indices = set(core_atom_indices[0])
s_natom = data.source_ct.atom_total
s_frag_atom_indices = set(range(1, s_natom + 1)) - s_core_atom_indices
s_found_frag_indices = []
if sb_index:
s_found_frag = find_noncore_subfragment(
data.source_ct.atom[sb_index], s_core_atom_indices,
s_frag_atom_indices)
s_found_frag_indices = [atom.index for atom in s_found_frag]
s_subfrag_indices.extend(s_found_frag_indices)
d_core_atom_indices = set(core_atom_indices[1])
d_natom = data.dest_ct.atom_total
d_frag_atom_indices = set(range(1, d_natom + 1)) - d_core_atom_indices
d_found_frag_indices = []
if db_index:
d_found_frag = find_noncore_subfragment(data.dest_ct.atom[db_index],
d_core_atom_indices,
d_frag_atom_indices)
d_found_frag_indices = [atom.index for atom in d_found_frag]
d_subfrag_indices.extend(d_found_frag_indices)
# in case like R1-C-S-R2 ==> R1-S-R2, the S atom in the source ct
# can be both core atom and bridge atom. In this case, we should
# skip mapping it. Another mapping will be map C-S-R2 to dummy and
# dummy to S-R2. We may need more logic here to do a better mapping
if sb_index in s_core_atom_indices or db_index in d_core_atom_indices:
if sb_index in s_core_atom_indices and db_index in d_core_atom_indices:
raise RuntimeError(
"core atom pair (%d, %d) can not be attachment point" %
(sb_index, db_index))
elif sb_index in s_core_atom_indices:
atom_map.append((None, db_index))
elif db_index in d_core_atom_indices:
atom_map.append((sb_index, None))
else:
# @FIXME: may want to check element type etc to determine if it is
# a reasonable mapping between bridge atoms
if sb_index not in s_bridge_indices_visited and \
db_index not in d_bridge_indices_visited:
#atom_map.append((sb_index, db_index))
# Handle ring open and close case
if is_ring_open_or_closed(s_found_frag_indices,
d_found_frag_indices, s2d_bridge_atom,
d2s_bridge_atom):
#print "handling ring open/closed mapping"
atom_map.append((sb_index, None))
atom_map.append((None, db_index))
elif data.source_ct.atom[sb_index].property[FEP_MAPPING_BRIDGE] > 1 or \
data.dest_ct.atom[db_index].property[FEP_MAPPING_BRIDGE] > 1:
atom_map.append((sb_index, db_index))
# if sb_index in source_ring_atoms and db_index in dest_ring_atoms:
# atom_map.append((sb_index, db_index))
# else:
# atom_map.append((sb_index, None))
# atom_map.append((None, db_index))
else:
atom_map.append((sb_index, None))
atom_map.append((None, db_index))
#atom_map.append((sb_index, db_index))
elif sb_index in s_bridge_indices_visited:
atom_map.append((None, db_index))
elif db_index in d_bridge_indices_visited:
atom_map.append((sb_index, None))
else:
raise RuntimeError("Unknown error.")
s_bridge_indices_visited.add(sb_index)
d_bridge_indices_visited.add(db_index)
s_subfrag_indices = set(s_subfrag_indices)
# remove bridge atoms
s_subfrag_indices.difference_update(s_bridge_indices_visited)
d_subfrag_indices = set(d_subfrag_indices)
# remove bridge atoms
d_subfrag_indices.difference_update(d_bridge_indices_visited)
atom_map.extend([(index, None) for index in s_subfrag_indices])
atom_map.extend([(None, index) for index in d_subfrag_indices])
# mapping remaining hydrogens attached to core
s_mapped_indices = set()
d_mapped_indices = set()
#print "heavy atom map: ", atom_map
for si, di in atom_map:
# si or di could be None because the fragments have been mapped
if si:
s_mapped_indices.add(si)
if di:
d_mapped_indices.add(di)
s_natom = data.source_ct.atom_total
s_remaining_indices = set(range(1, s_natom + 1)) - s_mapped_indices
d_natom = data.dest_ct.atom_total
d_remaining_indices = set(range(1, d_natom + 1)) - d_mapped_indices
# If we have other heavy atoms left at this point, something is screwed up
for index in s_remaining_indices:
atom = data.source_ct.atom[index]
if atom.element != 'H':
raise RuntimeError("Unexpected heavy atom (%s:%d)" % (atom.element,
index))
for index in d_remaining_indices:
atom = data.dest_ct.atom[index]
if atom.element != 'H':
raise RuntimeError("Unexpected heavy atom (%s:%d)" % (atom.element,
index))
core_hydrogen_map = map_core_hydrogen(data.source_ct, data.dest_ct,
core_map, s_remaining_indices,
d_remaining_indices)
atom_map.extend(core_hydrogen_map)
atom_map = unique_list(atom_map)
if not check_mapping(atom_map, data.source_ct.atom_total,
data.dest_ct.atom_total):
raise RuntimeError(
"get_atom_mapping generates corrupted mapping: %s\n" % atom_map)
return (atom_map, core_hydrogen_map)
[docs]def get_fepio_atom_mapping(atom_mapping):
"""
converting to atom mapping format required by mmfepio library.
Below is an example:
1 1
2 2
3 -1
4 -1
5 3
-6 4
-7 5
"""
fep_mapping = []
source_dummy_index = 0
s_natom = len(
[source for source, dest in atom_mapping if source is not None])
s_dummy_begin_index = -(s_natom + 1)
for s_index, d_index in atom_mapping:
if s_index is None:
s_index = s_dummy_begin_index
s_dummy_begin_index -= 1
if d_index is None:
d_index = -1
fep_mapping.append((s_index, d_index))
return fep_mapping
[docs]def fragment_size_cond(frag1, frag2, threshold=FRAGMENT_SIZE_THRESHOLD):
if len(frag1) > threshold and len(frag2) > threshold:
return True
return False
[docs]def heavy_atom_num_cond(frag1, frag2, threshold=HEAVY_ATOM_NUM_THERESHOLD):
def count_heavy_atom(frag):
return len([atom for atom in frag if atom.element != 'H'])
if count_heavy_atom(frag1) > threshold and \
count_heavy_atom(frag2) > threshold:
return True
return False
[docs]def get_heavy_fragment(frag):
return [atom.index for atom in frag if atom.element != 'H']
[docs]def find_fragment_MCS(data,
cond=heavy_atom_num_cond,
atomtype=DEFAULT_ATOM_TYPE):
"""
Finding MCS in fragment if the fragment pair satisfied specified conditions.
@FIXME: need to convert it to a recursive algorithm
"""
frag_pair_stack = []
core_atoms = data.getCoreAtoms()
core_atom_indices = list(map(set, core_atoms))
attachment_points = data.getAttachmentPoints()
fragment_pairs = find_all_fragment_pairs(
data.source_ct, data.dest_ct, attachment_points, core_atom_indices)
frag_pair_stack.extend(fragment_pairs)
ap_visited = set(attachment_points)
# n_files = 0
while len(frag_pair_stack) > 0:
source_frag, dest_frag = frag_pair_stack.pop()
if cond(source_frag, dest_frag):
# Extract fragments
source_frag_indices = [atom.index for atom in source_frag]
source_frag_ct = data.source_ct.extract(
source_frag_indices, copy_props=True)
dest_frag_indices = [atom.index for atom in dest_frag]
dest_frag_ct = data.dest_ct.extract(
dest_frag_indices, copy_props=True)
#print "source_frag: ",[atom.index for atom in source_frag_ct.atom \
#if atom.property[FEP_MAPPING_BRIDGE] > 0]
#print "dest_frag: ", [atom.index for atom in dest_frag_ct.atom \
#if atom.property[FEP_MAPPING_BRIDGE] > 0]
#print "source_frag: ",[atom.index for atom in source_frag_ct.atom ]
#print "dest_frag: ", [atom.index for atom in dest_frag_ct.atom ]
# @todo: We need to use different approaches for different types
# of fragment. For instance, for medium and large size fragment,
# we call RGA again. For smaller fragment, we need different
# algorithm. For instance, if one fragment is ring fragment and the
# other is non-ring fragment, we can consider break the ring and do
# a search
# Run RGA
# FIXME: move TEMP_DIR to setting class
input_file = os.path.join(TEMP_DIR, "%s.mae" % uuid.uuid4())
writer = structure.StructureWriter(input_file)
for ct in [source_frag_ct, dest_frag_ct]:
writer.append(ct)
# shutil.copyfile(input_file, "linker_rec%d.mae" % n_files)
# n_files += 1
writer.close()
try:
frag_data = run_rga(input_file, atomtype=atomtype)
logger.error("Warning: recursive call to rga:\n ")
except RuntimeError as e:
logger.error(
"Warning: fail to run RGA when trying to match:\n ")
logger.error("%s ==> %s" % (get_heavy_fragment(source_frag),
get_heavy_fragment(dest_frag)))
logger.error(e)
# for now, we just skip the matching between these two fragments.
# in other words, the fragment atoms will be mapped to dummy.
continue
if not frag_data:
continue
# Map atom index in fragment RGA to original atom index
fs_map = [atom.property[FEP_MAPPING_ORIGINAL_INDEX] \
for atom in source_frag_ct.atom]
fd_map = [atom.property[FEP_MAPPING_ORIGINAL_INDEX] \
for atom in dest_frag_ct.atom]
frag_attachment_points = frag_data.getAttachmentPoints()
if len(frag_attachment_points) > 0:
frag_source_core_atoms, frag_dest_core_atoms = frag_data.getCoreAtoms(
)
new_source_core_atoms = [
fs_map[atid - 1] for atid in frag_source_core_atoms
]
new_destcore_atoms = [
fd_map[atid - 1] for atid in frag_dest_core_atoms
]
data.extendCoreAtoms((new_source_core_atoms,
new_destcore_atoms))
s_core_atom_indices, d_core_atom_indices = list(
map(set, data.getCoreAtoms()))
fs_atom_indices = [atom.index for atom in source_frag]
fd_atom_indices = [atom.index for atom in dest_frag]
for fs_anchor_index, fs_bridge_index, \
fd_anchor_index, fd_bridge_index \
in frag_data.getAttachmentPoints():
fsa_index = None
fsb_index = None
fda_index = None
fdb_index = None
if fs_anchor_index:
fsa_index = fs_map[fs_anchor_index - 1]
if fs_bridge_index:
fsb_index = fs_map[fs_bridge_index - 1]
if fd_anchor_index:
fda_index = fd_map[fd_anchor_index - 1]
if fd_bridge_index:
fdb_index = fd_map[fd_bridge_index - 1]
new_ap = (fsa_index, fsb_index, fda_index, fdb_index)
if new_ap not in ap_visited:
ap_visited.add(new_ap)
data.addAttachmentPoint(new_ap)
fsb = None
fdb = None
if fsb_index and fdb_index:
fsb = data.source_ct.atom[fsb_index]
fdb = data.dest_ct.atom[fdb_index]
s_subfrag = find_noncore_subfragment(
fsb, s_core_atom_indices, fs_atom_indices)
d_subfrag = find_noncore_subfragment(
fdb, d_core_atom_indices, fd_atom_indices)
frag_pair_stack.append((s_subfrag, d_subfrag))
elif fsb_index:
fsb = data.source_ct.atom[fsb_index]
s_subfrag = find_noncore_subfragment(
fsb, s_core_atom_indices, fs_atom_indices)
frag_pair_stack.append((s_subfrag, []))
elif fdb_index:
fdb = data.dest_ct.atom[fdb_index]
d_subfrag = find_noncore_subfragment(
fdb, d_core_atom_indices, fd_atom_indices)
frag_pair_stack.append(([], d_subfrag))
else:
logger.info(
"All bridge atoms in fragment pairs are dummy")
[docs]def convert_rga_data(rga_data, input_file):
"""
Converting RGA internal data to AtomMappingData object
:type rga_data: r_group_analysis.Data
:param rga_data:
:type input_file: string
:param input_file: input file
"""
# read cts from input file based on offsets from data object
with structure.MaestroReader(input_file) as reader:
cts = []
for position in rga_data.get_ct_offsets():
cts.append(reader.read(position=position))
if len(cts) != 2:
raise RuntimeError("The number of cts is not equal to 2")
source_ct = cts[0]
dest_ct = cts[1]
source_core_atoms = rga_data.core_atoms[0][0]
dest_core_atoms = rga_data.core_atoms[1][0]
attachment_points = rga_data.get_attachment_points()
source_attachments = attachment_points[0]
dest_attachments = attachment_points[1]
mapping_data = convert_data(source_ct, dest_ct, source_core_atoms,
dest_core_atoms, source_attachments,
dest_attachments)
return mapping_data
[docs]def convert_data(source_ct, dest_ct, source_core_atoms, dest_core_atoms,
source_attachments, dest_attachments):
"""
source_ct:
dest_ct:
source_core_atoms: list of source core atoms
dest_core_atoms: list of dest core atoms
source_attachments: list of source attachment points
dest_attachments: list of dest attachment points
"""
mapping_data = AtomMappingData(source_ct, dest_ct)
# adding core atoms
mapping_data.extendCoreAtoms((source_core_atoms, dest_core_atoms))
print("core atoms", [(source_core_atoms[x], dest_core_atoms[x])
for x in range(0, len(source_core_atoms))])
# adding attachment point
source_core_atoms_set = set(source_core_atoms)
dest_core_atoms_set = set(dest_core_atoms)
def dot_product(a, b):
return sum(x * y for (x, y) in zip(a, b))
# print "source att: ", [ (bridge.index, anchor.index) for (bridge, anchor, order) in source_attachments]
# print "dest att: ", [ (bridge.index, anchor.index) for (bridge, anchor, order) in dest_attachments]
# find equivalent attachment point
index = 0
source_switched = set([])
dest_switched = set([])
for source_attachment, dest_attachment in zip(source_attachments,
dest_attachments):
source_anchor, source_bridge, source_order = source_attachment
dest_anchor, dest_bridge, dest_order = dest_attachment
if not source_bridge or \
not dest_bridge:
continue
source_diff = [
source_bridge.x - source_anchor.x,
source_bridge.y - source_anchor.y, source_bridge.z - source_anchor.z
]
dest_diff = [
dest_bridge.x - dest_anchor.x, dest_bridge.y - dest_anchor.y,
dest_bridge.z - dest_anchor.z
]
dot = dot_product(source_diff, dest_diff)
if (dest_bridge, dest_anchor) in dest_switched:
continue
for sa_bonded in source_anchor.bonded_atoms:
#test bond vector
dot1 = dot_product([
sa_bonded.x - source_anchor.x, sa_bonded.y - source_anchor.y,
sa_bonded.z - source_anchor.z
], dest_diff)
# and source_bridge.element == "H" \
if dot1 > 0 and dot1 > dot and sa_bonded.element == source_bridge.element \
and sa_bonded.index not in source_core_atoms_set and (source_bridge, source_anchor) not in source_switched:
switched = False
#find another attchment point
for i in range(len(source_attachments)):
if (source_attachments[i][0],
source_attachments[i][1]) == (source_anchor,
sa_bonded):
#switch
logger.debug(
"switch with another attachment point %i to %i" %
(source_bridge.index, sa_bonded.index))
source_attachments[i][1] = source_attachments[index][1]
source_attachments[index][1] = sa_bonded
switched = True
# logger.debug("source att: %" % [ (bridge.index, anchor.index) for (bridge, anchor, order) in source_attachments])
source_switched.add((source_bridge, source_anchor))
break
if not switched:
#alternative core atom for anchor
logger.debug("source switch %i to %i, %s to %s" %
(source_bridge.index, sa_bonded.index,
source_bridge.element, sa_bonded.element))
source_attachments[index][1] = sa_bonded
if (source_bridge, source_anchor) in source_switched:
continue
for da_bonded in dest_anchor.bonded_atoms:
#test bond vector
dot1 = dot_product([
da_bonded.x - dest_anchor.x, da_bonded.y - dest_anchor.y,
da_bonded.z - dest_anchor.z
], source_diff)
# and dest_bridge.element == "H" \
if dot1 > 0 and dot1 > dot and da_bonded.element == dest_bridge.element \
and da_bonded.index not in dest_core_atoms_set and (dest_bridge, dest_anchor) not in dest_switched:
switched = False
#find another attchment point
for i in range(len(dest_attachments)):
if (dest_attachments[i][0],
dest_attachments[i][1]) == (dest_anchor, da_bonded):
#switch
logger.debug(
"switch with another attachment point %i to %i" %
(dest_bridge.index, da_bonded.index))
dest_attachments[i][1] = dest_attachments[index][1]
dest_attachments[index][1] = da_bonded
switched = True
# logger.debug("dest att: %" % [ (bridge.index, anchor.index) for (bridge, anchor, order) in dest_attachments])
dest_switched.add((dest_bridge, dest_anchor))
break
if not switched:
#alternative core atom for anchor
logger.debug("dest switch %i to %i, %s to %s" %
(dest_bridge.index, da_bonded.index,
dest_bridge.element, da_bonded.element))
dest_attachments[index][1] = da_bonded
index += 1
for source_attachment, dest_attachment in zip(source_attachments,
dest_attachments):
source_anchor, source_bridge, source_order = source_attachment
dest_anchor, dest_bridge, dest_order = dest_attachment
if source_anchor.index > source_ct.atom_total:
continue
if source_bridge:
if source_bridge.index > source_ct.atom_total:
logger.warning("skipping attachment point: (%d, %d, %d, %d)" %
(source_anchor.index, source_bridge.index,
dest_anchor.index, dest_bridge.index))
return None
# In case we extract fragment, RGA may add hydrogen to the extracted
# fragment, in that case, we may get an out of index error. We probably
# can find a way to map in back. Just ignore this kind of case for now.
if dest_anchor.index > dest_ct.atom_total:
continue
if dest_bridge:
if dest_bridge.index > dest_ct.atom_total:
logger.warning("skipping attachment point: (%d, %d, %d, %d)" %
(source_anchor.index, source_bridge.index,
dest_anchor.index, dest_bridge.index))
return None
source_b_index = None
if source_bridge:
source_b_index = source_bridge.index
dest_b_index = None
if dest_bridge:
dest_b_index = dest_bridge.index
mapping_data.addAttachmentPoint((source_anchor.index, source_b_index,
dest_anchor.index, dest_b_index))
return mapping_data
[docs]def get_atom_mapping_data(input_file, atomtype=DEFAULT_ATOM_TYPE):
data = run_rga(input_file, atomtype=atomtype)
find_fragment_MCS(data, atomtype=atomtype)
return data
[docs]def run_rga(input_file, atomtype=11):
if not os.path.exists(input_file):
raise IOError("%s does not exist" % input_file)
rga_data, message = r_group.get_rgroup_MCS(input_file, atomtype)
if rga_data is None:
raise RuntimeError(message)
data = convert_rga_data(rga_data, input_file)
return data
[docs]def unique_list(seq, idfun=None):
"""
A fast function to remove duplicate element from sequence by preserving
the order at the same time.
"""
# order preserving
if idfun is None:
def idfun(x):
return x
seen = {}
result = []
for item in seq:
marker = idfun(item)
if marker in seen:
continue
seen[marker] = 1
result.append(item)
return result
[docs]def get_atom_marking(data, output_file):
pass
[docs]def get_bridge_atoms(data):
bridge_atoms = [(sb, db) for sa, sb, da, db in data.getAttachmentPoints()]
return list(set(bridge_atoms))
[docs]def check_mapping(data, n1, n2):
a1_set = set()
a2_set = set()
for a1, a2 in data:
if a1:
if a1 in a1_set:
logger.warning(
"first element (%d) shows up more than once in %s" % (a1,
data))
return False
a1_set.add(a1)
if a2:
if a2 in a2_set:
logger.warning(
"second element (%d) shows up more than once in %s" %
(a2, data))
return False
a2_set.add(a2)
if a1_set != set(range(1, n1 + 1)) or a2_set != set(range(1, n2 + 1)):
logger.warning("missing indices")
return False
return True
[docs]def reorder_st(st, index_map):
"""
:type ct: Structure
:param ct: Structure object
:type index_map: list of tuple
:param index_map: index mapping between old and new index
[(new_index, old_index), ...]
"""
new_to_old_map = [0] * (len(st.atom) + 1)
for old_index, new_index in index_map:
new_to_old_map[new_index] = old_index
new_st = st.copy()
# The ordering array must have natom + 1 elements for the C code
mm.mmct_ct_reorder(new_st, st, new_to_old_map)
return new_st
[docs]def is_ring_open_or_closed(s_frag_indices, d_frag_indices, s2d_bridge_atom,
d2s_bridge_atom):
s_brdige_atoms = set([index for index in s_frag_indices \
if index in s2d_bridge_atom])
d_brdige_atoms = set([index for index in d_frag_indices \
if index in d2s_bridge_atom])
d_mapped_bridge_atoms = set(
[s2d_bridge_atom[index] for index in s_brdige_atoms])
s_mapped_bridge_atoms = set(
[d2s_bridge_atom[index] for index in d_brdige_atoms])
return s_brdige_atoms != s_mapped_bridge_atoms or \
d_brdige_atoms != d_mapped_bridge_atoms
[docs]def write_fepsubst_to_file(data, filename, overwrite=True):
#bridge atoms marked by 99, 199, 299, etc incrementing with fragment number
#linker/ring atoms markde by 97, 197, 297, etc incrementing with fragment number
source_ct = data.source_ct.copy()
dest_ct = data.dest_ct.copy()
attachment_points = data.getAttachmentPoints()
# sort the attachment points so that the subst-codes generated are the same
attachment_points.sort(key=lambda tup: tup[0])
bridge_atoms = data.getBridgeAtoms()
core_atoms = data.getCoreAtoms()
logger.debug("att points: %s" % attachment_points)
(atom_mapping, core_h_map) = get_atom_mapping(data)
core_h_map.sort(key=lambda tup: tup[0])
tmp_list = []
for x in source_ct.find_rings():
tmp_list.extend(x)
source_ring_atoms = set(tmp_list)
tmp_list = []
for x in dest_ct.find_rings():
tmp_list.extend(x)
dest_ring_atoms = set(tmp_list)
source_ct.property[constants.FEP_FRAGNAME] = "none"
dest_ct.property[constants.FEP_FRAGNAME] = "product"
for atom in source_ct.atom:
atom.property[constants.FEP_SUBST] = 1
for atom in dest_ct.atom:
atom.property[constants.FEP_SUBST] = 1
for atom in dest_ct.atom:
atom.property[constants.FEP_MAPPING] = 0
for (s, d) in zip(core_atoms[0], core_atoms[1]):
dest_ct.atom[d].property[constants.FEP_MAPPING] = s
h_offset = 0
for (hs, hd) in core_h_map:
logger.debug("core hydrogen: %s %s" % (hs, hd))
if hd and hs:
dest_ct.atom[hd].property[constants.FEP_MAPPING] = hs
elif hd:
dest_ct.atom[hd].property[constants.FEP_MAPPING] = 0
d_anchor = [x for x in dest_ct.atom[hd].bonded_atoms]
s_anchor = 0
s_core_atoms, d_core_atoms = core_atoms
for (i, d_core) in enumerate(d_core_atoms):
if d_core == d_anchor[0].index:
s_anchor = s_core_atoms[i]
break
source_ct.atom[s_anchor].property[
constants.FEP_SUBST] = 98 + h_offset
dest_ct.atom[d_anchor[0].index].property[
constants.FEP_SUBST] = 98 + h_offset
dest_ct.atom[hd].property[constants.FEP_SUBST] = 99 + h_offset
h_offset += 100
elif hs:
# dest_ct.atom[hd].property[constants.FEP_MAPPING] = 0
s_anchor = [x for x in source_ct.atom[hs].bonded_atoms]
d_anchor = 0
s_core_atoms, d_core_atoms = core_atoms
for (i, s_core) in enumerate(s_core_atoms):
if s_core == s_anchor[0].index:
d_anchor = d_core_atoms[i]
break
source_ct.atom[hs].property[constants.FEP_SUBST] = 99 + h_offset
source_ct.atom[s_anchor[0].index].property[
constants.FEP_SUBST] = 98 + h_offset
dest_ct.atom[d_anchor].property[constants.FEP_SUBST] = 98 + h_offset
h_offset += 100
fragment_pairs = find_all_fragment_pairs(source_ct, dest_ct,
attachment_points, core_atoms)
fragment_pairs = find_all_fragment_pairs(source_ct, dest_ct,
attachment_points, core_atoms)
i = 0
for (s_frag, d_frag) in fragment_pairs:
logger.debug("pair %d" % i)
logger.debug([x.index for x in s_frag])
logger.debug([x.index for x in d_frag])
i = i + 1
# group the fragment pairs in terms of anchor atoms
def get_fragment_index(fragment_pairs, s_bridge, d_bridge):
for (s_frag, d_frag) in fragment_pairs:
s_frag_atoms = []
d_frag_atoms = []
for x in s_frag:
s_frag_atoms.append(x.index)
for x in d_frag:
d_frag_atoms.append(x.index)
if len(s_frag_atoms) > 0:
if s_bridge in s_frag_atoms:
return (s_frag_atoms, d_frag_atoms)
if len(d_frag_atoms) > 0:
if d_bridge in d_frag_atoms:
return (s_frag_atoms, d_frag_atoms)
raise RuntimeError(
"Cannot find bridge atoms in fragments, bridge atoms: %d %d" %
(s_bridge, d_bridge))
fragment_groups = {}
for (s_anchor, s_bridge, d_anchor, d_bridge) in attachment_points:
(s_frag_atoms, d_frag_atoms) = get_fragment_index(
fragment_pairs, s_bridge, d_bridge)
if (s_anchor, d_anchor) in fragment_groups:
(s_frags, d_frags) = fragment_groups[(s_anchor, d_anchor)]
s_frags.append(s_frag_atoms)
d_frags.append(d_frag_atoms)
else:
fragment_groups[(s_anchor, d_anchor)] = ([s_frag_atoms],
[d_frag_atoms])
frag_offset = h_offset
for (s_anchor, s_bridge, d_anchor, d_bridge) in attachment_points:
(s_frags, d_frags) = fragment_groups[(s_anchor, d_anchor)]
if s_bridge and d_bridge:
#map linker/ring bridge atoms to each other
if source_ct.atom[s_bridge].property["i_fep_mapping_bridge"] > 1 and dest_ct.atom[d_bridge].property["i_fep_mapping_bridge"] > 1:
dest_ct.atom[d_bridge].property[
constants.FEP_MAPPING] = s_bridge
if source_ct.atom[s_bridge].property[constants.FEP_SUBST] > 1 and \
dest_ct.atom[d_bridge].property[constants.FEP_SUBST] > 1:
#already assigned, must be a linker or ring fragment
source_ct.atom[s_anchor].property[
constants.FEP_SUBST] = 98 + int(
old_div(source_ct.atom[s_bridge]
.property[constants.FEP_SUBST], 100)) * 100
dest_ct.atom[d_anchor].property[constants.FEP_SUBST] = 98 + int(
old_div(dest_ct.atom[d_bridge]
.property[constants.FEP_SUBST], 100)) * 100
continue
elif source_ct.atom[s_bridge].property[constants.
FEP_SUBST] == 1 and dest_ct.atom[d_bridge].property[constants.
FEP_SUBST] == 1:
source_ct.atom[s_anchor].property[
constants.FEP_SUBST] = 98 + frag_offset
dest_ct.atom[d_anchor].property[
constants.FEP_SUBST] = 98 + frag_offset
else:
#ring closure would cause this
raise RuntimeError("Invalid fepsubst code for bridge atoms s_anchor:%d d_anchor:%d sb:%d %d db:%d %d" \
% (s_anchor, d_anchor, s_bridge, source_ct.atom[s_bridge].property[constants.FEP_SUBST], d_bridge, dest_ct.atom[d_bridge].property[constants.FEP_SUBST]))
elif s_bridge:
if source_ct.atom[s_bridge].property[constants.FEP_SUBST] > 1:
#already assigned, must be a linker or ring fragment
source_ct.atom[s_anchor].property[
constants.FEP_SUBST] = 98 + int(
old_div(source_ct.atom[s_bridge]
.property[constants.FEP_SUBST], 100)) * 100
dest_ct.atom[d_anchor].property[constants.FEP_SUBST] = 98 + int(
old_div(source_ct.atom[s_bridge]
.property[constants.FEP_SUBST], 100)) * 100
continue
elif source_ct.atom[s_bridge].property[constants.FEP_SUBST] == 1:
source_ct.atom[s_anchor].property[
constants.FEP_SUBST] = 98 + frag_offset
dest_ct.atom[d_anchor].property[
constants.FEP_SUBST] = 98 + frag_offset
elif d_bridge:
if dest_ct.atom[d_bridge].property[constants.FEP_SUBST] > 1:
#already assigned, must be a linker or ring fragment
source_ct.atom[s_anchor].property[
constants.FEP_SUBST] = 98 + int(
old_div(dest_ct.atom[d_bridge]
.property[constants.FEP_SUBST], 100)) * 100
dest_ct.atom[d_anchor].property[constants.FEP_SUBST] = 98 + int(
old_div(dest_ct.atom[d_bridge]
.property[constants.FEP_SUBST], 100)) * 100
continue
elif dest_ct.atom[d_bridge].property[constants.FEP_SUBST] == 1:
source_ct.atom[s_anchor].property[
constants.FEP_SUBST] = 98 + frag_offset
dest_ct.atom[d_anchor].property[
constants.FEP_SUBST] = 98 + frag_offset
else:
raise RuntimeError("all bridge atoms are dummy")
att_offset = 0
for s_frag in s_frags:
s_frag_bg_atoms = []
for x in s_frag:
source_ct.atom[x].property[constants.FEP_SUBST] = 2
logger.debug("s_frag bg %s" % x)
if source_ct.atom[x].property["i_fep_mapping_bridge"] > 0:
s_frag_bg_atoms.append(x)
if len(s_frag_bg_atoms) == 1:
#single attachment bond
if source_ct.atom[s_frag_bg_atoms[0]].property[
"i_fep_mapping_bridge"] == 1:
source_ct.atom[s_frag_bg_atoms[0]].property[
constants.FEP_SUBST] = 99 + frag_offset - att_offset
else:
if s_frag_bg_atoms[0] in source_ring_atoms:
source_ct.atom[s_frag_bg_atoms[0]].property[
constants.FEP_SUBST] = 97 + frag_offset
else:
source_ct.atom[s_frag_bg_atoms[0]].property[
constants.FEP_SUBST] = 97 + frag_offset
elif len(s_frag_bg_atoms) > 1:
#linker/ring fragment
for index in s_frag_bg_atoms:
print("source: ", index, source_ring_atoms)
if index in source_ring_atoms:
source_ct.atom[index].property[
constants.FEP_SUBST] = 97 + frag_offset
else:
source_ct.atom[index].property[
constants.FEP_SUBST] = 97 + frag_offset
att_offset = att_offset + 10
att_offset = 0
for d_frag in d_frags:
d_frag_bg_atoms = []
for x in d_frag:
dest_ct.atom[x].property[constants.FEP_SUBST] = 2
logger.debug("d_frag bg %s" % x)
if dest_ct.atom[x].property["i_fep_mapping_bridge"] > 0:
d_frag_bg_atoms.append(x)
if len(d_frag_bg_atoms) == 1:
#single attachment bond
if dest_ct.atom[d_frag_bg_atoms[0]].property[
"i_fep_mapping_bridge"] == 1:
dest_ct.atom[d_frag_bg_atoms[0]].property[
constants.FEP_SUBST] = 99 + frag_offset - att_offset
else:
if d_frag_bg_atoms[0] in dest_ring_atoms:
dest_ct.atom[d_frag_bg_atoms[0]].property[
constants.FEP_SUBST] = 97 + frag_offset
else:
dest_ct.atom[d_frag_bg_atoms[0]].property[
constants.FEP_SUBST] = 97 + frag_offset
elif len(d_frag_bg_atoms) > 1:
#linker/ring fragment
for index in d_frag_bg_atoms:
print("dest: ", index, dest_ring_atoms)
if index in dest_ring_atoms:
dest_ct.atom[index].property[
constants.FEP_SUBST] = 97 + frag_offset
else:
dest_ct.atom[index].property[
constants.FEP_SUBST] = 97 + frag_offset
att_offset = att_offset + 10
frag_offset = frag_offset + 100
writer = structure.StructureWriter(filename, overwrite=overwrite)
for ct in [source_ct, dest_ct]:
writer.append(ct)
writer.close()
return (source_ct, dest_ct)
[docs]def reorder_atoms(data, source_ct, dest_ct):
source_index_map = []
dest_index_map = []
(atom_map, core_h_map) = get_atom_mapping(data)
# handle mapping physical atoms.
for source_index, dest_index in atom_map:
if source_index and dest_index:
source_index_map.append((len(source_index_map) + 1, source_index))
dest_index_map.append((len(dest_index_map) + 1, dest_index))
# handle mapping between physical atom and dummy atom.
for source_index, dest_index in atom_map:
if not source_index:
dest_index_map.append((len(dest_index_map) + 1, dest_index))
if not dest_index:
source_index_map.append((len(source_index_map) + 1, source_index))
source_ct = reorder_st(source_ct, source_index_map)
dest_ct = reorder_st(dest_ct, dest_index_map)
# print out atom mapping in newly created maestro file
source_old_to_new = [None] * (source_ct.atom_total + 1)
for new, old in source_index_map:
source_old_to_new[old] = new
dest_old_to_new = [None] * (dest_ct.atom_total + 1)
for new, old in dest_index_map:
dest_old_to_new[old] = new
new_atom_map = []
for source_index, dest_index in atom_map:
if source_index:
new_source_index = source_old_to_new[source_index]
else:
new_source_index = None
if dest_index:
new_dest_index = dest_old_to_new[dest_index]
else:
new_dest_index = None
new_atom_map.append((new_source_index, new_dest_index))
new_bridge_atoms = []
for source_index, dest_index in get_bridge_atoms(data):
new_bridge_atoms.append((source_old_to_new[source_index],
dest_old_to_new[dest_index]))
return (new_atom_map, new_bridge_atoms)
[docs]def check1_subst_code(source_ct, dest_ct):
"""
take input cts and check single ring-atom/attachment subst_code, return true if pass
"""
def find_marked_atoms(ct):
atoms_98 = []
atoms_99 = []
atoms_97 = []
for atom in ct.atom:
if atom.property[constants.FEP_SUBST] == 98:
atoms_98.append(atom)
if atom.property[constants.FEP_SUBST] == 97:
atoms_97.append(atom)
if atom.property[constants.FEP_SUBST] == 99:
atoms_99.append(atom)
return (atoms_98, atoms_97, atoms_99)
(source_98_atoms, source_97_atoms,
source_99_atoms) = find_marked_atoms(source_ct)
(dest_98_atoms, dest_97_atoms, dest_99_atoms) = find_marked_atoms(dest_ct)
# print source_98_atoms, source_97_atoms, source_99_atoms
if len(source_98_atoms) == 0 or len(dest_98_atoms) == 0:
logger.error("No anchor atoms found")
return False
if len(source_99_atoms) == 0 and len(dest_99_atoms) == 0 and len(
source_97_atoms) == 0 and len(dest_97_atoms) == 0:
logger.error("No bridge atoms found")
return False
if len(source_97_atoms) != len(dest_97_atoms):
logger.error(
"Number of linker/ring atoms changed not the same in product and reatant"
)
return False
if (len(source_99_atoms) > 1 or len(dest_99_atoms) > 1
) and (len(source_98_atoms) > 1 or len(dest_98_atoms)) > 1:
logger.error("Too many attachment point detected")
return False
if len(source_97_atoms) == 1 or len(dest_97_atoms) == 1:
if len(source_98_atoms) < 2 or len(dest_98_atoms) < 2:
logger.error("Not enough connection for ring/linker atom")
return False
for atoms_98 in source_98_atoms:
if atoms_98 not in source_97_atoms[0].bonded_atoms:
logger.error(
"anchor atom not bonded to ring/linker atom in original molecule"
)
return False
for atoms_98 in dest_98_atoms:
if atoms_98 not in dest_97_atoms[0].bonded_atoms:
logger.error(
"anchor atom not bonded to ring/linker atom in mutated molecule"
)
return False
return True
[docs]def random_shuffle(data, atomtype):
'''
generate atommap with random shuffled atom order, return atommaps in original order
'''
source_ct = data.source_ct.copy()
dest_ct = data.dest_ct.copy()
source_permut = list(range(1, source_ct.atom_total + 1))
dest_permut = list(range(1, dest_ct.atom_total + 1))
random.shuffle(source_permut)
random.shuffle(dest_permut)
source_map = {}
source_map[None] = None
source_index_map = []
for (i, index) in enumerate(source_permut):
source_map[index] = i + 1
source_index_map.append((i + 1, index))
dest_map = {}
dest_map[None] = None
dest_index_map = []
for (i, index) in enumerate(dest_permut):
dest_map[index] = i + 1
dest_index_map.append((i + 1, index))
source_ct = reorder_st(source_ct, source_index_map)
dest_ct = reorder_st(dest_ct, dest_index_map)
input_file = os.path.join(TEMP_DIR, "%s.mae" % uuid.uuid4())
writer = structure.StructureWriter(input_file)
for ct in [source_ct, dest_ct]:
writer.append(ct)
writer.close()
perm_data = get_atom_mapping_data(input_file, atomtype=atomtype)
(perm_atom_map, perm_core_h_map) = get_atom_mapping(perm_data)
atom_map = []
for (atom_s, atom_d) in perm_atom_map:
atom_map.append((source_map[atom_s], dest_map[atom_d]))
return atom_map
def __raw_fepsubst(mol, noncore, cor, non):
"""
Assigns raw fep_subst code to the given molecule. We assign only the following values: 1, 2, 97, 98, and 99, and here we
ignore the other forms of values (i.e., 197, 297, 198, 298, 89, 79, etc.).
:type mol: `structure.Structure`
:param mol: The molecule that we will set the "i_fep_subst" atom property
:type noncore: `set` of `int`
:param noncore: All noncore atoms' indices
:type cor: `list` of `int`
:param cor: All core atoms' indices
:type non: `list` of `int`
:param non: Noncore-but-matched atoms' indices
"""
for e in mol.atom:
e.property[constants.FEP_SUBST] = 1
cor = set(cor)
non = set(non)
for i in noncore:
a = mol.atom[i]
cor_neighbor = set([int(e) for e in a.bonded_atoms]) & cor
if (i in non):
a.property[constants.FEP_SUBST] = 97
else:
a.property[constants.FEP_SUBST] = 99 if (len(cor_neighbor) >
0) else 2
for j in cor_neighbor:
mol.atom[j].property[constants.FEP_SUBST] = 98
def __is_dummy(atom):
"""
Returns true if this atom can become a dummy atom in an alchemical FEP mutation, or false if not.
"""
fep_subst = atom.property[constants.FEP_SUBST]
return (2 == fep_subst or 99 == fep_subst % 100)
def __fix_97atoms(mol):
"""
We loop through all <97> atoms and check if a <97> atom is connected to more than 1 physical-physical hybrid atoms. If so,
we leave the atom's fep_subst code to be 97, otherwise we change it to 2 (so it becomes a physical-dummy hybrid atom).
We repeat this process until there is no change of the fep_subst code.
"""
should_loop = True
while (should_loop):
should_loop = False
for a in mol.atom:
if (a.property[constants.FEP_SUBST] == 97):
nondummy = [b for b in a.bonded_atoms if (not __is_dummy(b))]
if (len(nondummy) == 1):
a.property[constants.FEP_SUBST] = 99 if (
nondummy[0].property[constants.FEP_SUBST] == 98) else 2
should_loop = True
def __fix_99atoms(atom98, code98, atom99_set):
"""
This function reassigns the 'fep_subst' code for <99> atoms that are bonded to the given <98> atom. Here I use the
<99> nomenclature to mean 99, 199, 299, and so on. -YW
This reassignment is for two purposes:
1. Ensure the code consistency between the two molecules.
2. Adjust some 99 codes to be 89 or 79 or 69.
This reassignment has to be done after that we obtain an overall knowledge of which atoms are <98>, <99>, and <97>.
The assigned 'fep_subst' code is based the given <98> code: if it's 98, <99> will be 99 (or 89, 79, etc.); if it's
198, <99> will be 199 (or 189, 179, etc.), and so on.
:type atom98: `Structure.Atom`
:param atom98: The <98> atom which we will reassign the <99> and <97> atoms that are bonded to
:type code98: `int`
:param code98: The <98> fep_subst code. The value will be one of 98, 198, 298, ... and so on.
:type atom99_set: `set` of `Structure.Atom` objects
:param atom99_set: A set that we use to keep track of the <99> atoms that have already been reassigned. For each atom,
we reassign at most once.
"""
code99 = code98 + 1
atom99 = [
e for e in atom98.bonded_atoms
if (e.property[constants.FEP_SUBST] % 100 == 99)
]
atom99.sort(key=lambda x: x.index)
for e in atom99:
if (e not in atom99_set):
e.property[constants.FEP_SUBST] = code99
atom99_set.add(e)
code99 -= 10
[docs]def get_subst_from_mapping(mol0, mol1):
"""
Given two CTs: `ct0` and `ct1`, with `i_fep_mapping` atom properties properly set for all atoms of `ct1`, sets
the `i_fep_subst` atom properties for both CTs.
"""
# Gets core and non-core matchings.
cor0, cor1 = [], [] # Matched core atoms. List of `int's.
non0, non1 = [], [] # Matched noncore atoms. List of `int's.
for a1 in mol1.atom:
i = a1.property[constants.FEP_MAPPING]
if (i == 0):
continue
a0 = mol0.atom[i]
if (a0.element == a1.element and a0.bond_total == a1.bond_total):
cor0.append(int(a0))
cor1.append(int(a1))
else:
non0.append(int(a0))
non1.append(int(a1))
# Gets core and non-core atoms. Note that some non-core atoms (i.e., the <97> atoms) are matched.
core0 = set(cor0)
core1 = set(cor1)
c1_to_c0 = {c1: c0 for (c0, c1) in zip(cor0, cor1)}
noncore0 = set(range(1, mol0.atom_total + 1))
noncore1 = set(range(1, mol1.atom_total + 1))
noncore0 -= core0
noncore1 -= core1
__raw_fepsubst(mol0, noncore0, cor0, non0)
__raw_fepsubst(mol1, noncore1, cor1, non1)
__fix_97atoms(mol0)
__fix_97atoms(mol1)
atom98 = [(
e.property[constants.FEP_SUBST],
int(e),
) for e in mol1.atom if (e.property[constants.FEP_SUBST] == 98)]
code98 = 98
atom99_set = set()
for junk, i in atom98:
j = c1_to_c0[i]
a1 = mol1.atom[i]
a0 = mol0.atom[j]
a1.property[constants.FEP_SUBST] = code98
a0.property[constants.FEP_SUBST] = code98
__fix_99atoms(a0, code98, atom99_set)
__fix_99atoms(a1, code98, atom99_set)
code98 += 100
# DESMOND-3621
atom97 = [
int(e) for e in mol1.atom if (e.property[constants.FEP_SUBST] == 97)
]
code97 = 97
for i in atom97:
a1 = mol1.atom[i]
j = a1.property[constants.FEP_MAPPING]
a0 = mol0.atom[j]
a1.property[constants.FEP_SUBST] = code97
a0.property[constants.FEP_SUBST] = code97
code97 += 100
# Assigns the i_fep_mapping code.
complete_a1_to_a0 = {a1: a0 for (a0, a1) in zip(cor0 + non0, cor1 + non1)}
for a in mol1.atom:
a.property[constants.FEP_MAPPING] = complete_a1_to_a0[int(a)] if (
a.property[constants.FEP_SUBST] % 100 in [
97,
98,
1,
]) else 0
if __name__ == '__main__':
usage = "$SCHRODINGER/run desmond_fep_mapping.py input.mae "
parser = OptionParser(usage)
parser.add_option(
"-a",
"--atomtype",
type='int',
action="store",
dest="atomtype",
default=DEFAULT_ATOM_TYPE,
help="""
Atom Typing Schemes
-------------------
1 - All atoms equivalent; all bonds equivalent.
2 - Atoms distinguished by HB acceptor/donor; all bonds equivalent.
3 - Atoms distinguished by hybridization state; all bonds equivalent.
4 - Atoms distinguished by functional type: {H}, {C}, {F,Cl}, {Br,I}, {N,0},
{S}, {other}; bonds by hybridization.
5 - Mol2 atom types; all bonds equivalent.
6 - Atoms distinguished by whether terminal, halogen, HB acceptor/donor;
bonds distinguished by bond order.
7 - Atomic number and bond order.
8 - Atoms distinguished by ring size, aromaticity, HB acceptor/donor,
ionization potential, whether terminal, whether halogen; bonds
distinguished by bond order.
9 - Carhart atom types (atom-pairs approach); all bonds equivalent.
10 - Daylight invariant atom types; bonds distinguished by bond order.
11 - Same as 7, but distinguishing aromatic from non-aromatic.
12 - Same as 11, but distinguishing aliphatic atoms by ring/acyclic.
13 - Same as 12, but distinguishing rings by size.""")
parser.add_option(
"-o",
"--output",
type='string',
action="store",
dest="output",
default=None,
help="Writing out new structure file")
parser.add_option(
"-s",
"--superimpose",
action="store_true",
dest="superimpose",
default=False,
help="Superimpose based on the core atoms")
parser.add_option(
"-r",
"--renumber",
action="store_true",
dest="renumber",
default=False,
help="Renumber atom index")
parser.add_option(
"-f",
"--fepio",
action="store_true",
dest="fepio",
default=False,
help="print out fepio mapping")
parser.add_option(
"-p",
"--permute",
action="store",
dest="permute",
default=0,
type="int",
help="generate atom mapping by randomly permute atom order")
opt, args = parser.parse_args(sys.argv[1:])
if len(args) != 1:
print(usage)
sys.exit(1)
input_file = args[0]
logger.info("Running RGA with atomtype (%d) ..." % opt.atomtype)
data = get_atom_mapping_data(input_file, atomtype=opt.atomtype)
(atom_map, core_h_map) = get_atom_mapping(data)
logger.info("atom mapping in %s" % input_file)
logger.info("bridge atoms: %s" % get_bridge_atoms(data))
logger.info("mapping: %s" % atom_map)
logger.info("core hydrogen map: %s" % core_h_map)
write_fepsubst_to_file(data, "%s.mae" % input_file)
def get_atom_pair(source_index, atom_map):
'''
return atom mapping pair given soucrce_index
'''
for (s, d) in atom_map:
if s == source_index:
return (s, d)
for i in range(opt.permute):
permute_atommap = random_shuffle(data, opt.atomtype)
if len(atom_map) != len(permute_atommap):
raise RuntimeError(
"permuted atommap has a different length, permutation %d" % i)
else:
if set(atom_map) != set(permute_atommap):
#check equivalent hydrogen
in_org = set(atom_map).difference(set(permute_atommap))
in_permute = set(permute_atommap).difference(set(atom_map))
dest_ct = data.dest_ct.copy()
source_ct = data.source_ct.copy()
for (source_index, dest_index) in in_org:
if source_index is None or dest_index is None:
logger.info("map to dummy (%s, %s)" % (source_index,
dest_index))
continue
d_atom = dest_ct.atom[dest_index]
s_atom = source_ct.atom[source_index]
if d_atom.element != 'H' and s_atom.element != 'H':
raise RuntimeError("heavy atom mapping error (%d, %d)" %
(source_index, dest_index))
d_hydrogen = []
d_core_atoms = [atom for atom in d_atom.bonded_atoms]
for atom in d_core_atoms[0].bonded_atoms:
if atom.element == 'H':
d_hydrogen.append(atom.index)
(s_p, d_p) = get_atom_pair(source_index, in_permute)
logger.info(
"alternative atom mapping: (%s, %s) == (%s, %s)" %
(source_index, dest_index, s_p, d_p))
logger.info("equivalent hydorgens: %s" % d_hydrogen)
if d_p not in d_hydrogen:
raise RuntimeError(
"permuted atommap is different, permutation %s" %
permute_atommap)
pickle.dump(atom_map, open("tmp.map", 'w'))
if opt.fepio:
logger.info("fepio mapping: %s" % (get_fepio_atom_mapping(atom_map)))
if opt.output:
logger.info("Writing %s ..." % opt.output)
writer = structure.StructureWriter(opt.output)
source_ct = data.source_ct.copy()
dest_ct = data.dest_ct.copy()
if opt.superimpose:
logger.info("Superimposing structures ...")
natom = 15 # number of atoms to be used as reference in superimposition.
# @todo: use better algorithm to determine which set of atoms to be used
# as reference for superimposition.
s_core_indices, d_core_indices = data.getCoreAtoms()
superimpose(source_ct, s_core_indices[:natom], dest_ct,
d_core_indices[:natom])
if opt.renumber:
logger.info("Renumbering structures ...")
(new_atom_map, new_bridge_atoms) = reorder_atoms(
data, source_ct, dest_ct)
fepio_mapping = get_fepio_atom_mapping(new_atom_map)
new_atom_map.sort(
) # just sort it and make easy to follow the mapping
logger.info("atom mapping in %s" % opt.output)
logger.info("bridge atoms: %s" % new_bridge_atoms)
logger.info("mapping: %s" % new_atom_map)
if opt.fepio:
logger.info("fepio mapping: %s" % (fepio_mapping))
writer.append(source_ct)
writer.append(dest_ct)
writer.close()