Source code for schrodinger.application.desmond.fep_mapping

import atexit
import gc
import itertools
import os
import pickle
import random
import sys
import tempfile
import uuid
from optparse import OptionParser
from past.utils import old_div

from schrodinger import structure
from schrodinger.application.canvas import r_group
from schrodinger.infra import mm
from schrodinger.structutils import build
from schrodinger.structutils.rmsd import superimpose
from schrodinger.utils import fileutils
from schrodinger.utils import log

from . import constants

logger = log.get_output_logger(name="fep_mapping")

#logger.setLevel(log.DEBUG)


[docs]def cleanup(topdir=None): """ Cleanup function to be run atexit """ #Garbage collect objects in case we hit the cancel button and htere are #any open Structure{Reader,Writer} objects, which will run fh.close() in #the __del__ method. gc.collect() #See if we are running inside of maestro if topdir: fileutils.force_rmtree(topdir)
schrod_tmp = fileutils.get_directory_path(fileutils.TEMP) if not os.path.exists(schrod_tmp): os.makedirs(schrod_tmp) TEMP_DIR = tempfile.mkdtemp(prefix="fep-mapping-tmp", dir=schrod_tmp) atexit.register(cleanup, topdir=TEMP_DIR) FRAGMENT_SIZE_THRESHOLD = 10 HEAVY_ATOM_NUM_THERESHOLD = 7 FEP_MAPPING_ORIGINAL_INDEX = 'i_fep_mapping_origial_index' FEP_MAPPING_BRIDGE = 'i_fep_mapping_bridge' DEFAULT_ATOM_TYPE = 11
[docs]class AtomMappingData: """ Class to store intermediary and final data structures for atom mapping """
[docs] def __init__(self, source_ct, dest_ct): # corresponding matched atoms in two-element tuple. Each element is # consist of a list of atom indices self._core_atoms = ([], []) #List of attachment point tuples. Indexed on attachment point. # [(source_anchor_atom_index, source_bridge_atom_index, # dest_anchor_atom_index, dest_bridge_atom_index) # ] # *anchor_atom_index refers to atom index of core atom that belongs to # specific attachment, which *bridge_atom_index refers to atom index of # non-core atom connected to anchor_atom self._attachment_points = [] self._attached_fragments = [] self.source_ct = source_ct self._initAtomProperty(self.source_ct) self.dest_ct = dest_ct self._initAtomProperty(self.dest_ct)
[docs] def addAttachmentPoint(self, attachment_point): """ Adding attachment point. :type attachment_point: two-element tuple. :param attachment_point: """ if attachment_point not in self._attachment_points: self._attachment_points.append(attachment_point) sa_index, sb_index, da_index, db_index = attachment_point if sb_index: self.source_ct.atom[sb_index].property[FEP_MAPPING_BRIDGE] += 1 if db_index: self.dest_ct.atom[db_index].property[FEP_MAPPING_BRIDGE] += 1
[docs] def getAttachmentPoints(self): return self._attachment_points
[docs] def getBridgeAtoms(self): return [(sb, db) for sa, sb, da, db in self._attachment_points]
[docs] def getAnchorAtoms(self): return [(sa, da) for sa, sb, da, db in self._attachment_points]
[docs] def extendCoreAtoms(self, core_atoms): """ Adding core atoms. Note that, adding new core atoms can resulting smaller attached fragments """ source_core_atoms, dest_core_atoms = core_atoms self._core_atoms[0].extend(source_core_atoms) self._core_atoms[1].extend(dest_core_atoms)
#for i, source, dest in enumerate(zip(source_core_atoms, #dest_core_atoms)): # if source not in self._core_atoms[0] and \ # dest not in self._core_atoms[1]: # self._core_atoms[0].append(source) # self._core_atoms[1].append(dest) #FIXME
[docs] def getCoreAtoms(self): return self._core_atoms
def _initAtomProperty(self, ct): """ Initialized Atom Properties. Recording i_fep_mapping_origial_index. Current implementation of MCS has one limitation in which all of the atoms belong to MCS must be connected. This will be a problem if we would are trying to map linker molecules like: R1-O-R2 ==> R1-C-R2 The workaround is to extract the attached fragment and call canvasMCS to do another searching. The fragment atoms have different indices in the newly extracted ct and original ct. We have to maintain the mapping. Recording the original index is one way to solve this problem. """ for i, atom in enumerate(ct.atom): atom.property[FEP_MAPPING_ORIGINAL_INDEX] = atom.index atom.property[FEP_MAPPING_BRIDGE] = 0
[docs]def map_core_hydrogen(source_ct, dest_ct, core_map, s_remaining_indices, d_remaining_indices): """ Mapping hydrogen atoms connected to the core. The number of hydrogen atoms connected to the corresponding atoms should be equal except the core atoms which have attachment. """ def find_hydrogen(atom, remaining_indices): hydrogens = [] for bonded_atom in atom.bonded_atoms: if bonded_atom.element == 'H' and bonded_atom.index in remaining_indices: hydrogens.append(bonded_atom.index) return hydrogens hydrogen_map = [] for source_atom_index, dest_atom_index in core_map: source_atom = source_ct.atom[source_atom_index] dest_atom = dest_ct.atom[dest_atom_index] source_hydrogen = find_hydrogen(source_atom, s_remaining_indices) dest_hydrogen = find_hydrogen(dest_atom, d_remaining_indices) hydrogen_map.extend( itertools.zip_longest(source_hydrogen, dest_hydrogen)) return hydrogen_map
[docs]def find_fragment_by_cond(bridge_atom, cond): fragment = [] stack = [] stack.append(bridge_atom) visited = set() while len(stack) > 0: atom = stack.pop() visited.add(atom.index) if cond(atom.index): fragment.append(atom) for bonded in atom.bonded_atoms: if cond(bonded.index) and bonded.index not in visited: stack.append(bonded) return fragment
[docs]def find_noncore_fragment(bridge_atom, core_atom_indices): """ Finding non-core fragment from bridged :type bridge_atom: _StructureAtom :param bridge_atom: atom bonded to the attachment core atom. :type core_atoms: set of int :param core_atoms: a set of core atom indices. :return a list of atoms. """ def is_noncore_atom(index): return index not in core_atom_indices return find_fragment_by_cond(bridge_atom, is_noncore_atom)
[docs]def find_noncore_subfragment(bridge_atom, core_atom_indices, frag_atom_indices): """ Finding non-core fragment from bridged :type bridge_atom: _StructureAtom :param bridge_atom: atom bonded to the attachment core atom. :type core_atom_indices: set of int :param core_atom_indices: a set of core atom indices. :return a list of atoms. """ def is_noncore_subfragment_atom(index): return index not in core_atom_indices and index in frag_atom_indices return find_fragment_by_cond(bridge_atom, is_noncore_subfragment_atom)
[docs]def find_all_fragment_pairs(source_ct, dest_ct, attachment_points, core_atom_indices): fragment_pairs = [] source_core_atom_indices, dest_core_atom_indices = core_atom_indices source_frag_atom_visited = set([None]) dest_frag_atom_visited = set([None]) for fsa_index, fsb_index, fda_index, fdb_index in attachment_points: fsb = None if fsb_index: fsb = source_ct.atom[fsb_index] else: # handle trivial source fragment if fdb_index not in dest_frag_atom_visited: df = find_noncore_fragment(dest_ct.atom[fdb_index], dest_core_atom_indices) fragment_pairs.append(([], df)) dest_frag_atom_visited.update([atom.index for atom in df]) fdb = None if fdb_index: fdb = dest_ct.atom[fdb_index] else: # handle trivial dest fragment if fsb_index not in source_frag_atom_visited: sf = find_noncore_fragment(source_ct.atom[fsb_index], source_core_atom_indices) fragment_pairs.append((sf, [])) source_frag_atom_visited.update([atom.index for atom in sf]) if fsb_index not in source_frag_atom_visited and \ fdb_index not in dest_frag_atom_visited: # sf = find_noncore_fragment(fsb, source_core_atom_indices) df = find_noncore_fragment(fdb, dest_core_atom_indices) fragment_pairs.append((sf, df)) source_frag_atom_visited.update([atom.index for atom in sf]) dest_frag_atom_visited.update([atom.index for atom in df]) return fragment_pairs
[docs]def get_atom_mapping(data): """ mapping atoms based on the result from RGA :type data: AtomMappingData :param data: intermediary and finals data structures from RGA """ atom_map = [] # mapping core atoms core_atom_indices = data.getCoreAtoms() source_core_atoms = core_atom_indices[0] dest_core_atoms = core_atom_indices[1] core_map = [(source_atom, dest_atom) for source_atom, dest_atom in \ zip(source_core_atoms, dest_core_atoms)] atom_map.extend(core_map) #print "core map", core_map # mapping non-core atoms # attachment_points = data.getAttachmentPoints() # @todo: refactor this data structure to a class s2d_bridge_atom = {} d2s_bridge_atom = {} for sa_index, sb_index, da_index, db_index in attachment_points: s2d_bridge_atom[sb_index] = db_index d2s_bridge_atom[db_index] = sb_index # matching bridge atoms s_subfrag_indices = [] d_subfrag_indices = [] s_bridge_indices_visited = set([None]) d_bridge_indices_visited = set([None]) bridge_pair_visited = set() tmp_list = [] for x in data.source_ct.find_rings(): tmp_list.extend(x) source_ring_atoms = set(tmp_list) tmp_list = [] for x in data.dest_ct.find_rings(): tmp_list.extend(x) dest_ring_atoms = set(tmp_list) #print attachment_points for sa_index, sb_index, da_index, db_index in attachment_points: if (sb_index, db_index) in bridge_pair_visited: continue bridge_pair_visited.add((sb_index, db_index)) s_core_atom_indices = set(core_atom_indices[0]) s_natom = data.source_ct.atom_total s_frag_atom_indices = set(range(1, s_natom + 1)) - s_core_atom_indices s_found_frag_indices = [] if sb_index: s_found_frag = find_noncore_subfragment( data.source_ct.atom[sb_index], s_core_atom_indices, s_frag_atom_indices) s_found_frag_indices = [atom.index for atom in s_found_frag] s_subfrag_indices.extend(s_found_frag_indices) d_core_atom_indices = set(core_atom_indices[1]) d_natom = data.dest_ct.atom_total d_frag_atom_indices = set(range(1, d_natom + 1)) - d_core_atom_indices d_found_frag_indices = [] if db_index: d_found_frag = find_noncore_subfragment(data.dest_ct.atom[db_index], d_core_atom_indices, d_frag_atom_indices) d_found_frag_indices = [atom.index for atom in d_found_frag] d_subfrag_indices.extend(d_found_frag_indices) # in case like R1-C-S-R2 ==> R1-S-R2, the S atom in the source ct # can be both core atom and bridge atom. In this case, we should # skip mapping it. Another mapping will be map C-S-R2 to dummy and # dummy to S-R2. We may need more logic here to do a better mapping if sb_index in s_core_atom_indices or db_index in d_core_atom_indices: if sb_index in s_core_atom_indices and db_index in d_core_atom_indices: raise RuntimeError( "core atom pair (%d, %d) can not be attachment point" % (sb_index, db_index)) elif sb_index in s_core_atom_indices: atom_map.append((None, db_index)) elif db_index in d_core_atom_indices: atom_map.append((sb_index, None)) else: # @FIXME: may want to check element type etc to determine if it is # a reasonable mapping between bridge atoms if sb_index not in s_bridge_indices_visited and \ db_index not in d_bridge_indices_visited: #atom_map.append((sb_index, db_index)) # Handle ring open and close case if is_ring_open_or_closed(s_found_frag_indices, d_found_frag_indices, s2d_bridge_atom, d2s_bridge_atom): #print "handling ring open/closed mapping" atom_map.append((sb_index, None)) atom_map.append((None, db_index)) elif data.source_ct.atom[sb_index].property[FEP_MAPPING_BRIDGE] > 1 or \ data.dest_ct.atom[db_index].property[FEP_MAPPING_BRIDGE] > 1: atom_map.append((sb_index, db_index)) # if sb_index in source_ring_atoms and db_index in dest_ring_atoms: # atom_map.append((sb_index, db_index)) # else: # atom_map.append((sb_index, None)) # atom_map.append((None, db_index)) else: atom_map.append((sb_index, None)) atom_map.append((None, db_index)) #atom_map.append((sb_index, db_index)) elif sb_index in s_bridge_indices_visited: atom_map.append((None, db_index)) elif db_index in d_bridge_indices_visited: atom_map.append((sb_index, None)) else: raise RuntimeError("Unknown error.") s_bridge_indices_visited.add(sb_index) d_bridge_indices_visited.add(db_index) s_subfrag_indices = set(s_subfrag_indices) # remove bridge atoms s_subfrag_indices.difference_update(s_bridge_indices_visited) d_subfrag_indices = set(d_subfrag_indices) # remove bridge atoms d_subfrag_indices.difference_update(d_bridge_indices_visited) atom_map.extend([(index, None) for index in s_subfrag_indices]) atom_map.extend([(None, index) for index in d_subfrag_indices]) # mapping remaining hydrogens attached to core s_mapped_indices = set() d_mapped_indices = set() #print "heavy atom map: ", atom_map for si, di in atom_map: # si or di could be None because the fragments have been mapped if si: s_mapped_indices.add(si) if di: d_mapped_indices.add(di) s_natom = data.source_ct.atom_total s_remaining_indices = set(range(1, s_natom + 1)) - s_mapped_indices d_natom = data.dest_ct.atom_total d_remaining_indices = set(range(1, d_natom + 1)) - d_mapped_indices # If we have other heavy atoms left at this point, something is screwed up for index in s_remaining_indices: atom = data.source_ct.atom[index] if atom.element != 'H': raise RuntimeError("Unexpected heavy atom (%s:%d)" % (atom.element, index)) for index in d_remaining_indices: atom = data.dest_ct.atom[index] if atom.element != 'H': raise RuntimeError("Unexpected heavy atom (%s:%d)" % (atom.element, index)) core_hydrogen_map = map_core_hydrogen(data.source_ct, data.dest_ct, core_map, s_remaining_indices, d_remaining_indices) atom_map.extend(core_hydrogen_map) atom_map = unique_list(atom_map) if not check_mapping(atom_map, data.source_ct.atom_total, data.dest_ct.atom_total): raise RuntimeError( "get_atom_mapping generates corrupted mapping: %s\n" % atom_map) return (atom_map, core_hydrogen_map)
[docs]def get_fepio_atom_mapping(atom_mapping): """ converting to atom mapping format required by mmfepio library. Below is an example: 1 1 2 2 3 -1 4 -1 5 3 -6 4 -7 5 """ fep_mapping = [] source_dummy_index = 0 s_natom = len( [source for source, dest in atom_mapping if source is not None]) s_dummy_begin_index = -(s_natom + 1) for s_index, d_index in atom_mapping: if s_index is None: s_index = s_dummy_begin_index s_dummy_begin_index -= 1 if d_index is None: d_index = -1 fep_mapping.append((s_index, d_index)) return fep_mapping
[docs]def fragment_size_cond(frag1, frag2, threshold=FRAGMENT_SIZE_THRESHOLD): if len(frag1) > threshold and len(frag2) > threshold: return True return False
[docs]def heavy_atom_num_cond(frag1, frag2, threshold=HEAVY_ATOM_NUM_THERESHOLD): def count_heavy_atom(frag): return len([atom for atom in frag if atom.element != 'H']) if count_heavy_atom(frag1) > threshold and \ count_heavy_atom(frag2) > threshold: return True return False
[docs]def get_heavy_fragment(frag): return [atom.index for atom in frag if atom.element != 'H']
[docs]def find_fragment_MCS(data, cond=heavy_atom_num_cond, atomtype=DEFAULT_ATOM_TYPE): """ Finding MCS in fragment if the fragment pair satisfied specified conditions. @FIXME: need to convert it to a recursive algorithm """ frag_pair_stack = [] core_atoms = data.getCoreAtoms() core_atom_indices = list(map(set, core_atoms)) attachment_points = data.getAttachmentPoints() fragment_pairs = find_all_fragment_pairs( data.source_ct, data.dest_ct, attachment_points, core_atom_indices) frag_pair_stack.extend(fragment_pairs) ap_visited = set(attachment_points) # n_files = 0 while len(frag_pair_stack) > 0: source_frag, dest_frag = frag_pair_stack.pop() if cond(source_frag, dest_frag): # Extract fragments source_frag_indices = [atom.index for atom in source_frag] source_frag_ct = data.source_ct.extract( source_frag_indices, copy_props=True) dest_frag_indices = [atom.index for atom in dest_frag] dest_frag_ct = data.dest_ct.extract( dest_frag_indices, copy_props=True) #print "source_frag: ",[atom.index for atom in source_frag_ct.atom \ #if atom.property[FEP_MAPPING_BRIDGE] > 0] #print "dest_frag: ", [atom.index for atom in dest_frag_ct.atom \ #if atom.property[FEP_MAPPING_BRIDGE] > 0] #print "source_frag: ",[atom.index for atom in source_frag_ct.atom ] #print "dest_frag: ", [atom.index for atom in dest_frag_ct.atom ] # @todo: We need to use different approaches for different types # of fragment. For instance, for medium and large size fragment, # we call RGA again. For smaller fragment, we need different # algorithm. For instance, if one fragment is ring fragment and the # other is non-ring fragment, we can consider break the ring and do # a search # Run RGA # FIXME: move TEMP_DIR to setting class input_file = os.path.join(TEMP_DIR, "%s.mae" % uuid.uuid4()) writer = structure.StructureWriter(input_file) for ct in [source_frag_ct, dest_frag_ct]: writer.append(ct) # shutil.copyfile(input_file, "linker_rec%d.mae" % n_files) # n_files += 1 writer.close() try: frag_data = run_rga(input_file, atomtype=atomtype) logger.error("Warning: recursive call to rga:\n ") except RuntimeError as e: logger.error( "Warning: fail to run RGA when trying to match:\n ") logger.error("%s ==> %s" % (get_heavy_fragment(source_frag), get_heavy_fragment(dest_frag))) logger.error(e) # for now, we just skip the matching between these two fragments. # in other words, the fragment atoms will be mapped to dummy. continue if not frag_data: continue # Map atom index in fragment RGA to original atom index fs_map = [atom.property[FEP_MAPPING_ORIGINAL_INDEX] \ for atom in source_frag_ct.atom] fd_map = [atom.property[FEP_MAPPING_ORIGINAL_INDEX] \ for atom in dest_frag_ct.atom] frag_attachment_points = frag_data.getAttachmentPoints() if len(frag_attachment_points) > 0: frag_source_core_atoms, frag_dest_core_atoms = frag_data.getCoreAtoms( ) new_source_core_atoms = [ fs_map[atid - 1] for atid in frag_source_core_atoms ] new_destcore_atoms = [ fd_map[atid - 1] for atid in frag_dest_core_atoms ] data.extendCoreAtoms((new_source_core_atoms, new_destcore_atoms)) s_core_atom_indices, d_core_atom_indices = list( map(set, data.getCoreAtoms())) fs_atom_indices = [atom.index for atom in source_frag] fd_atom_indices = [atom.index for atom in dest_frag] for fs_anchor_index, fs_bridge_index, \ fd_anchor_index, fd_bridge_index \ in frag_data.getAttachmentPoints(): fsa_index = None fsb_index = None fda_index = None fdb_index = None if fs_anchor_index: fsa_index = fs_map[fs_anchor_index - 1] if fs_bridge_index: fsb_index = fs_map[fs_bridge_index - 1] if fd_anchor_index: fda_index = fd_map[fd_anchor_index - 1] if fd_bridge_index: fdb_index = fd_map[fd_bridge_index - 1] new_ap = (fsa_index, fsb_index, fda_index, fdb_index) if new_ap not in ap_visited: ap_visited.add(new_ap) data.addAttachmentPoint(new_ap) fsb = None fdb = None if fsb_index and fdb_index: fsb = data.source_ct.atom[fsb_index] fdb = data.dest_ct.atom[fdb_index] s_subfrag = find_noncore_subfragment( fsb, s_core_atom_indices, fs_atom_indices) d_subfrag = find_noncore_subfragment( fdb, d_core_atom_indices, fd_atom_indices) frag_pair_stack.append((s_subfrag, d_subfrag)) elif fsb_index: fsb = data.source_ct.atom[fsb_index] s_subfrag = find_noncore_subfragment( fsb, s_core_atom_indices, fs_atom_indices) frag_pair_stack.append((s_subfrag, [])) elif fdb_index: fdb = data.dest_ct.atom[fdb_index] d_subfrag = find_noncore_subfragment( fdb, d_core_atom_indices, fd_atom_indices) frag_pair_stack.append(([], d_subfrag)) else: logger.info( "All bridge atoms in fragment pairs are dummy")
[docs]def convert_rga_data(rga_data, input_file): """ Converting RGA internal data to AtomMappingData object :type rga_data: r_group_analysis.Data :param rga_data: :type input_file: string :param input_file: input file """ # read cts from input file based on offsets from data object with structure.MaestroReader(input_file) as reader: cts = [] for position in rga_data.get_ct_offsets(): cts.append(reader.read(position=position)) if len(cts) != 2: raise RuntimeError("The number of cts is not equal to 2") source_ct = cts[0] dest_ct = cts[1] source_core_atoms = rga_data.core_atoms[0][0] dest_core_atoms = rga_data.core_atoms[1][0] attachment_points = rga_data.get_attachment_points() source_attachments = attachment_points[0] dest_attachments = attachment_points[1] mapping_data = convert_data(source_ct, dest_ct, source_core_atoms, dest_core_atoms, source_attachments, dest_attachments) return mapping_data
[docs]def convert_data(source_ct, dest_ct, source_core_atoms, dest_core_atoms, source_attachments, dest_attachments): """ source_ct: dest_ct: source_core_atoms: list of source core atoms dest_core_atoms: list of dest core atoms source_attachments: list of source attachment points dest_attachments: list of dest attachment points """ mapping_data = AtomMappingData(source_ct, dest_ct) # adding core atoms mapping_data.extendCoreAtoms((source_core_atoms, dest_core_atoms)) print("core atoms", [(source_core_atoms[x], dest_core_atoms[x]) for x in range(0, len(source_core_atoms))]) # adding attachment point source_core_atoms_set = set(source_core_atoms) dest_core_atoms_set = set(dest_core_atoms) def dot_product(a, b): return sum(x * y for (x, y) in zip(a, b)) # print "source att: ", [ (bridge.index, anchor.index) for (bridge, anchor, order) in source_attachments] # print "dest att: ", [ (bridge.index, anchor.index) for (bridge, anchor, order) in dest_attachments] # find equivalent attachment point index = 0 source_switched = set([]) dest_switched = set([]) for source_attachment, dest_attachment in zip(source_attachments, dest_attachments): source_anchor, source_bridge, source_order = source_attachment dest_anchor, dest_bridge, dest_order = dest_attachment if not source_bridge or \ not dest_bridge: continue source_diff = [ source_bridge.x - source_anchor.x, source_bridge.y - source_anchor.y, source_bridge.z - source_anchor.z ] dest_diff = [ dest_bridge.x - dest_anchor.x, dest_bridge.y - dest_anchor.y, dest_bridge.z - dest_anchor.z ] dot = dot_product(source_diff, dest_diff) if (dest_bridge, dest_anchor) in dest_switched: continue for sa_bonded in source_anchor.bonded_atoms: #test bond vector dot1 = dot_product([ sa_bonded.x - source_anchor.x, sa_bonded.y - source_anchor.y, sa_bonded.z - source_anchor.z ], dest_diff) # and source_bridge.element == "H" \ if dot1 > 0 and dot1 > dot and sa_bonded.element == source_bridge.element \ and sa_bonded.index not in source_core_atoms_set and (source_bridge, source_anchor) not in source_switched: switched = False #find another attchment point for i in range(len(source_attachments)): if (source_attachments[i][0], source_attachments[i][1]) == (source_anchor, sa_bonded): #switch logger.debug( "switch with another attachment point %i to %i" % (source_bridge.index, sa_bonded.index)) source_attachments[i][1] = source_attachments[index][1] source_attachments[index][1] = sa_bonded switched = True # logger.debug("source att: %" % [ (bridge.index, anchor.index) for (bridge, anchor, order) in source_attachments]) source_switched.add((source_bridge, source_anchor)) break if not switched: #alternative core atom for anchor logger.debug("source switch %i to %i, %s to %s" % (source_bridge.index, sa_bonded.index, source_bridge.element, sa_bonded.element)) source_attachments[index][1] = sa_bonded if (source_bridge, source_anchor) in source_switched: continue for da_bonded in dest_anchor.bonded_atoms: #test bond vector dot1 = dot_product([ da_bonded.x - dest_anchor.x, da_bonded.y - dest_anchor.y, da_bonded.z - dest_anchor.z ], source_diff) # and dest_bridge.element == "H" \ if dot1 > 0 and dot1 > dot and da_bonded.element == dest_bridge.element \ and da_bonded.index not in dest_core_atoms_set and (dest_bridge, dest_anchor) not in dest_switched: switched = False #find another attchment point for i in range(len(dest_attachments)): if (dest_attachments[i][0], dest_attachments[i][1]) == (dest_anchor, da_bonded): #switch logger.debug( "switch with another attachment point %i to %i" % (dest_bridge.index, da_bonded.index)) dest_attachments[i][1] = dest_attachments[index][1] dest_attachments[index][1] = da_bonded switched = True # logger.debug("dest att: %" % [ (bridge.index, anchor.index) for (bridge, anchor, order) in dest_attachments]) dest_switched.add((dest_bridge, dest_anchor)) break if not switched: #alternative core atom for anchor logger.debug("dest switch %i to %i, %s to %s" % (dest_bridge.index, da_bonded.index, dest_bridge.element, da_bonded.element)) dest_attachments[index][1] = da_bonded index += 1 for source_attachment, dest_attachment in zip(source_attachments, dest_attachments): source_anchor, source_bridge, source_order = source_attachment dest_anchor, dest_bridge, dest_order = dest_attachment if source_anchor.index > source_ct.atom_total: continue if source_bridge: if source_bridge.index > source_ct.atom_total: logger.warning("skipping attachment point: (%d, %d, %d, %d)" % (source_anchor.index, source_bridge.index, dest_anchor.index, dest_bridge.index)) return None # In case we extract fragment, RGA may add hydrogen to the extracted # fragment, in that case, we may get an out of index error. We probably # can find a way to map in back. Just ignore this kind of case for now. if dest_anchor.index > dest_ct.atom_total: continue if dest_bridge: if dest_bridge.index > dest_ct.atom_total: logger.warning("skipping attachment point: (%d, %d, %d, %d)" % (source_anchor.index, source_bridge.index, dest_anchor.index, dest_bridge.index)) return None source_b_index = None if source_bridge: source_b_index = source_bridge.index dest_b_index = None if dest_bridge: dest_b_index = dest_bridge.index mapping_data.addAttachmentPoint((source_anchor.index, source_b_index, dest_anchor.index, dest_b_index)) return mapping_data
[docs]def get_atom_mapping_data(input_file, atomtype=DEFAULT_ATOM_TYPE): data = run_rga(input_file, atomtype=atomtype) find_fragment_MCS(data, atomtype=atomtype) return data
[docs]def run_rga(input_file, atomtype=11): if not os.path.exists(input_file): raise IOError("%s does not exist" % input_file) rga_data, message = r_group.get_rgroup_MCS(input_file, atomtype) if rga_data is None: raise RuntimeError(message) data = convert_rga_data(rga_data, input_file) return data
[docs]def unique_list(seq, idfun=None): """ A fast function to remove duplicate element from sequence by preserving the order at the same time. """ # order preserving if idfun is None: def idfun(x): return x seen = {} result = [] for item in seq: marker = idfun(item) if marker in seen: continue seen[marker] = 1 result.append(item) return result
[docs]def get_atom_marking(data, output_file): pass
[docs]def get_bridge_atoms(data): bridge_atoms = [(sb, db) for sa, sb, da, db in data.getAttachmentPoints()] return list(set(bridge_atoms))
[docs]def check_mapping(data, n1, n2): a1_set = set() a2_set = set() for a1, a2 in data: if a1: if a1 in a1_set: logger.warning( "first element (%d) shows up more than once in %s" % (a1, data)) return False a1_set.add(a1) if a2: if a2 in a2_set: logger.warning( "second element (%d) shows up more than once in %s" % (a2, data)) return False a2_set.add(a2) if a1_set != set(range(1, n1 + 1)) or a2_set != set(range(1, n2 + 1)): logger.warning("missing indices") return False return True
[docs]def reorder_st(st, index_map): """ :type ct: Structure :param ct: Structure object :type index_map: list of tuple :param index_map: index mapping between old and new index [(new_index, old_index), ...] """ new_to_old_map = [0] * (len(st.atom) + 1) for old_index, new_index in index_map: new_to_old_map[new_index] = old_index new_st = st.copy() # The ordering array must have natom + 1 elements for the C code mm.mmct_ct_reorder(new_st, st, new_to_old_map) return new_st
[docs]def is_ring_open_or_closed(s_frag_indices, d_frag_indices, s2d_bridge_atom, d2s_bridge_atom): s_brdige_atoms = set([index for index in s_frag_indices \ if index in s2d_bridge_atom]) d_brdige_atoms = set([index for index in d_frag_indices \ if index in d2s_bridge_atom]) d_mapped_bridge_atoms = set( [s2d_bridge_atom[index] for index in s_brdige_atoms]) s_mapped_bridge_atoms = set( [d2s_bridge_atom[index] for index in d_brdige_atoms]) return s_brdige_atoms != s_mapped_bridge_atoms or \ d_brdige_atoms != d_mapped_bridge_atoms
[docs]def write_fepsubst_to_file(data, filename, overwrite=True): #bridge atoms marked by 99, 199, 299, etc incrementing with fragment number #linker/ring atoms markde by 97, 197, 297, etc incrementing with fragment number source_ct = data.source_ct.copy() dest_ct = data.dest_ct.copy() attachment_points = data.getAttachmentPoints() # sort the attachment points so that the subst-codes generated are the same attachment_points.sort(key=lambda tup: tup[0]) bridge_atoms = data.getBridgeAtoms() core_atoms = data.getCoreAtoms() logger.debug("att points: %s" % attachment_points) (atom_mapping, core_h_map) = get_atom_mapping(data) core_h_map.sort(key=lambda tup: tup[0]) tmp_list = [] for x in source_ct.find_rings(): tmp_list.extend(x) source_ring_atoms = set(tmp_list) tmp_list = [] for x in dest_ct.find_rings(): tmp_list.extend(x) dest_ring_atoms = set(tmp_list) source_ct.property[constants.FEP_FRAGNAME] = "none" dest_ct.property[constants.FEP_FRAGNAME] = "product" for atom in source_ct.atom: atom.property[constants.FEP_SUBST] = 1 for atom in dest_ct.atom: atom.property[constants.FEP_SUBST] = 1 for atom in dest_ct.atom: atom.property[constants.FEP_MAPPING] = 0 for (s, d) in zip(core_atoms[0], core_atoms[1]): dest_ct.atom[d].property[constants.FEP_MAPPING] = s h_offset = 0 for (hs, hd) in core_h_map: logger.debug("core hydrogen: %s %s" % (hs, hd)) if hd and hs: dest_ct.atom[hd].property[constants.FEP_MAPPING] = hs elif hd: dest_ct.atom[hd].property[constants.FEP_MAPPING] = 0 d_anchor = [x for x in dest_ct.atom[hd].bonded_atoms] s_anchor = 0 s_core_atoms, d_core_atoms = core_atoms for (i, d_core) in enumerate(d_core_atoms): if d_core == d_anchor[0].index: s_anchor = s_core_atoms[i] break source_ct.atom[s_anchor].property[ constants.FEP_SUBST] = 98 + h_offset dest_ct.atom[d_anchor[0].index].property[ constants.FEP_SUBST] = 98 + h_offset dest_ct.atom[hd].property[constants.FEP_SUBST] = 99 + h_offset h_offset += 100 elif hs: # dest_ct.atom[hd].property[constants.FEP_MAPPING] = 0 s_anchor = [x for x in source_ct.atom[hs].bonded_atoms] d_anchor = 0 s_core_atoms, d_core_atoms = core_atoms for (i, s_core) in enumerate(s_core_atoms): if s_core == s_anchor[0].index: d_anchor = d_core_atoms[i] break source_ct.atom[hs].property[constants.FEP_SUBST] = 99 + h_offset source_ct.atom[s_anchor[0].index].property[ constants.FEP_SUBST] = 98 + h_offset dest_ct.atom[d_anchor].property[constants.FEP_SUBST] = 98 + h_offset h_offset += 100 fragment_pairs = find_all_fragment_pairs(source_ct, dest_ct, attachment_points, core_atoms) fragment_pairs = find_all_fragment_pairs(source_ct, dest_ct, attachment_points, core_atoms) i = 0 for (s_frag, d_frag) in fragment_pairs: logger.debug("pair %d" % i) logger.debug([x.index for x in s_frag]) logger.debug([x.index for x in d_frag]) i = i + 1 # group the fragment pairs in terms of anchor atoms def get_fragment_index(fragment_pairs, s_bridge, d_bridge): for (s_frag, d_frag) in fragment_pairs: s_frag_atoms = [] d_frag_atoms = [] for x in s_frag: s_frag_atoms.append(x.index) for x in d_frag: d_frag_atoms.append(x.index) if len(s_frag_atoms) > 0: if s_bridge in s_frag_atoms: return (s_frag_atoms, d_frag_atoms) if len(d_frag_atoms) > 0: if d_bridge in d_frag_atoms: return (s_frag_atoms, d_frag_atoms) raise RuntimeError( "Cannot find bridge atoms in fragments, bridge atoms: %d %d" % (s_bridge, d_bridge)) fragment_groups = {} for (s_anchor, s_bridge, d_anchor, d_bridge) in attachment_points: (s_frag_atoms, d_frag_atoms) = get_fragment_index( fragment_pairs, s_bridge, d_bridge) if (s_anchor, d_anchor) in fragment_groups: (s_frags, d_frags) = fragment_groups[(s_anchor, d_anchor)] s_frags.append(s_frag_atoms) d_frags.append(d_frag_atoms) else: fragment_groups[(s_anchor, d_anchor)] = ([s_frag_atoms], [d_frag_atoms]) frag_offset = h_offset for (s_anchor, s_bridge, d_anchor, d_bridge) in attachment_points: (s_frags, d_frags) = fragment_groups[(s_anchor, d_anchor)] if s_bridge and d_bridge: #map linker/ring bridge atoms to each other if source_ct.atom[s_bridge].property["i_fep_mapping_bridge"] > 1 and dest_ct.atom[d_bridge].property["i_fep_mapping_bridge"] > 1: dest_ct.atom[d_bridge].property[ constants.FEP_MAPPING] = s_bridge if source_ct.atom[s_bridge].property[constants.FEP_SUBST] > 1 and \ dest_ct.atom[d_bridge].property[constants.FEP_SUBST] > 1: #already assigned, must be a linker or ring fragment source_ct.atom[s_anchor].property[ constants.FEP_SUBST] = 98 + int( old_div(source_ct.atom[s_bridge] .property[constants.FEP_SUBST], 100)) * 100 dest_ct.atom[d_anchor].property[constants.FEP_SUBST] = 98 + int( old_div(dest_ct.atom[d_bridge] .property[constants.FEP_SUBST], 100)) * 100 continue elif source_ct.atom[s_bridge].property[constants. FEP_SUBST] == 1 and dest_ct.atom[d_bridge].property[constants. FEP_SUBST] == 1: source_ct.atom[s_anchor].property[ constants.FEP_SUBST] = 98 + frag_offset dest_ct.atom[d_anchor].property[ constants.FEP_SUBST] = 98 + frag_offset else: #ring closure would cause this raise RuntimeError("Invalid fepsubst code for bridge atoms s_anchor:%d d_anchor:%d sb:%d %d db:%d %d" \ % (s_anchor, d_anchor, s_bridge, source_ct.atom[s_bridge].property[constants.FEP_SUBST], d_bridge, dest_ct.atom[d_bridge].property[constants.FEP_SUBST])) elif s_bridge: if source_ct.atom[s_bridge].property[constants.FEP_SUBST] > 1: #already assigned, must be a linker or ring fragment source_ct.atom[s_anchor].property[ constants.FEP_SUBST] = 98 + int( old_div(source_ct.atom[s_bridge] .property[constants.FEP_SUBST], 100)) * 100 dest_ct.atom[d_anchor].property[constants.FEP_SUBST] = 98 + int( old_div(source_ct.atom[s_bridge] .property[constants.FEP_SUBST], 100)) * 100 continue elif source_ct.atom[s_bridge].property[constants.FEP_SUBST] == 1: source_ct.atom[s_anchor].property[ constants.FEP_SUBST] = 98 + frag_offset dest_ct.atom[d_anchor].property[ constants.FEP_SUBST] = 98 + frag_offset elif d_bridge: if dest_ct.atom[d_bridge].property[constants.FEP_SUBST] > 1: #already assigned, must be a linker or ring fragment source_ct.atom[s_anchor].property[ constants.FEP_SUBST] = 98 + int( old_div(dest_ct.atom[d_bridge] .property[constants.FEP_SUBST], 100)) * 100 dest_ct.atom[d_anchor].property[constants.FEP_SUBST] = 98 + int( old_div(dest_ct.atom[d_bridge] .property[constants.FEP_SUBST], 100)) * 100 continue elif dest_ct.atom[d_bridge].property[constants.FEP_SUBST] == 1: source_ct.atom[s_anchor].property[ constants.FEP_SUBST] = 98 + frag_offset dest_ct.atom[d_anchor].property[ constants.FEP_SUBST] = 98 + frag_offset else: raise RuntimeError("all bridge atoms are dummy") att_offset = 0 for s_frag in s_frags: s_frag_bg_atoms = [] for x in s_frag: source_ct.atom[x].property[constants.FEP_SUBST] = 2 logger.debug("s_frag bg %s" % x) if source_ct.atom[x].property["i_fep_mapping_bridge"] > 0: s_frag_bg_atoms.append(x) if len(s_frag_bg_atoms) == 1: #single attachment bond if source_ct.atom[s_frag_bg_atoms[0]].property[ "i_fep_mapping_bridge"] == 1: source_ct.atom[s_frag_bg_atoms[0]].property[ constants.FEP_SUBST] = 99 + frag_offset - att_offset else: if s_frag_bg_atoms[0] in source_ring_atoms: source_ct.atom[s_frag_bg_atoms[0]].property[ constants.FEP_SUBST] = 97 + frag_offset else: source_ct.atom[s_frag_bg_atoms[0]].property[ constants.FEP_SUBST] = 97 + frag_offset elif len(s_frag_bg_atoms) > 1: #linker/ring fragment for index in s_frag_bg_atoms: print("source: ", index, source_ring_atoms) if index in source_ring_atoms: source_ct.atom[index].property[ constants.FEP_SUBST] = 97 + frag_offset else: source_ct.atom[index].property[ constants.FEP_SUBST] = 97 + frag_offset att_offset = att_offset + 10 att_offset = 0 for d_frag in d_frags: d_frag_bg_atoms = [] for x in d_frag: dest_ct.atom[x].property[constants.FEP_SUBST] = 2 logger.debug("d_frag bg %s" % x) if dest_ct.atom[x].property["i_fep_mapping_bridge"] > 0: d_frag_bg_atoms.append(x) if len(d_frag_bg_atoms) == 1: #single attachment bond if dest_ct.atom[d_frag_bg_atoms[0]].property[ "i_fep_mapping_bridge"] == 1: dest_ct.atom[d_frag_bg_atoms[0]].property[ constants.FEP_SUBST] = 99 + frag_offset - att_offset else: if d_frag_bg_atoms[0] in dest_ring_atoms: dest_ct.atom[d_frag_bg_atoms[0]].property[ constants.FEP_SUBST] = 97 + frag_offset else: dest_ct.atom[d_frag_bg_atoms[0]].property[ constants.FEP_SUBST] = 97 + frag_offset elif len(d_frag_bg_atoms) > 1: #linker/ring fragment for index in d_frag_bg_atoms: print("dest: ", index, dest_ring_atoms) if index in dest_ring_atoms: dest_ct.atom[index].property[ constants.FEP_SUBST] = 97 + frag_offset else: dest_ct.atom[index].property[ constants.FEP_SUBST] = 97 + frag_offset att_offset = att_offset + 10 frag_offset = frag_offset + 100 writer = structure.StructureWriter(filename, overwrite=overwrite) for ct in [source_ct, dest_ct]: writer.append(ct) writer.close() return (source_ct, dest_ct)
[docs]def reorder_atoms(data, source_ct, dest_ct): source_index_map = [] dest_index_map = [] (atom_map, core_h_map) = get_atom_mapping(data) # handle mapping physical atoms. for source_index, dest_index in atom_map: if source_index and dest_index: source_index_map.append((len(source_index_map) + 1, source_index)) dest_index_map.append((len(dest_index_map) + 1, dest_index)) # handle mapping between physical atom and dummy atom. for source_index, dest_index in atom_map: if not source_index: dest_index_map.append((len(dest_index_map) + 1, dest_index)) if not dest_index: source_index_map.append((len(source_index_map) + 1, source_index)) source_ct = reorder_st(source_ct, source_index_map) dest_ct = reorder_st(dest_ct, dest_index_map) # print out atom mapping in newly created maestro file source_old_to_new = [None] * (source_ct.atom_total + 1) for new, old in source_index_map: source_old_to_new[old] = new dest_old_to_new = [None] * (dest_ct.atom_total + 1) for new, old in dest_index_map: dest_old_to_new[old] = new new_atom_map = [] for source_index, dest_index in atom_map: if source_index: new_source_index = source_old_to_new[source_index] else: new_source_index = None if dest_index: new_dest_index = dest_old_to_new[dest_index] else: new_dest_index = None new_atom_map.append((new_source_index, new_dest_index)) new_bridge_atoms = [] for source_index, dest_index in get_bridge_atoms(data): new_bridge_atoms.append((source_old_to_new[source_index], dest_old_to_new[dest_index])) return (new_atom_map, new_bridge_atoms)
[docs]def check1_subst_code(source_ct, dest_ct): """ take input cts and check single ring-atom/attachment subst_code, return true if pass """ def find_marked_atoms(ct): atoms_98 = [] atoms_99 = [] atoms_97 = [] for atom in ct.atom: if atom.property[constants.FEP_SUBST] == 98: atoms_98.append(atom) if atom.property[constants.FEP_SUBST] == 97: atoms_97.append(atom) if atom.property[constants.FEP_SUBST] == 99: atoms_99.append(atom) return (atoms_98, atoms_97, atoms_99) (source_98_atoms, source_97_atoms, source_99_atoms) = find_marked_atoms(source_ct) (dest_98_atoms, dest_97_atoms, dest_99_atoms) = find_marked_atoms(dest_ct) # print source_98_atoms, source_97_atoms, source_99_atoms if len(source_98_atoms) == 0 or len(dest_98_atoms) == 0: logger.error("No anchor atoms found") return False if len(source_99_atoms) == 0 and len(dest_99_atoms) == 0 and len( source_97_atoms) == 0 and len(dest_97_atoms) == 0: logger.error("No bridge atoms found") return False if len(source_97_atoms) != len(dest_97_atoms): logger.error( "Number of linker/ring atoms changed not the same in product and reatant" ) return False if (len(source_99_atoms) > 1 or len(dest_99_atoms) > 1 ) and (len(source_98_atoms) > 1 or len(dest_98_atoms)) > 1: logger.error("Too many attachment point detected") return False if len(source_97_atoms) == 1 or len(dest_97_atoms) == 1: if len(source_98_atoms) < 2 or len(dest_98_atoms) < 2: logger.error("Not enough connection for ring/linker atom") return False for atoms_98 in source_98_atoms: if atoms_98 not in source_97_atoms[0].bonded_atoms: logger.error( "anchor atom not bonded to ring/linker atom in original molecule" ) return False for atoms_98 in dest_98_atoms: if atoms_98 not in dest_97_atoms[0].bonded_atoms: logger.error( "anchor atom not bonded to ring/linker atom in mutated molecule" ) return False return True
[docs]def random_shuffle(data, atomtype): ''' generate atommap with random shuffled atom order, return atommaps in original order ''' source_ct = data.source_ct.copy() dest_ct = data.dest_ct.copy() source_permut = list(range(1, source_ct.atom_total + 1)) dest_permut = list(range(1, dest_ct.atom_total + 1)) random.shuffle(source_permut) random.shuffle(dest_permut) source_map = {} source_map[None] = None source_index_map = [] for (i, index) in enumerate(source_permut): source_map[index] = i + 1 source_index_map.append((i + 1, index)) dest_map = {} dest_map[None] = None dest_index_map = [] for (i, index) in enumerate(dest_permut): dest_map[index] = i + 1 dest_index_map.append((i + 1, index)) source_ct = reorder_st(source_ct, source_index_map) dest_ct = reorder_st(dest_ct, dest_index_map) input_file = os.path.join(TEMP_DIR, "%s.mae" % uuid.uuid4()) writer = structure.StructureWriter(input_file) for ct in [source_ct, dest_ct]: writer.append(ct) writer.close() perm_data = get_atom_mapping_data(input_file, atomtype=atomtype) (perm_atom_map, perm_core_h_map) = get_atom_mapping(perm_data) atom_map = [] for (atom_s, atom_d) in perm_atom_map: atom_map.append((source_map[atom_s], dest_map[atom_d])) return atom_map
def __raw_fepsubst(mol, noncore, cor, non): """ Assigns raw fep_subst code to the given molecule. We assign only the following values: 1, 2, 97, 98, and 99, and here we ignore the other forms of values (i.e., 197, 297, 198, 298, 89, 79, etc.). :type mol: `structure.Structure` :param mol: The molecule that we will set the "i_fep_subst" atom property :type noncore: `set` of `int` :param noncore: All noncore atoms' indices :type cor: `list` of `int` :param cor: All core atoms' indices :type non: `list` of `int` :param non: Noncore-but-matched atoms' indices """ for e in mol.atom: e.property[constants.FEP_SUBST] = 1 cor = set(cor) non = set(non) for i in noncore: a = mol.atom[i] cor_neighbor = set([int(e) for e in a.bonded_atoms]) & cor if (i in non): a.property[constants.FEP_SUBST] = 97 else: a.property[constants.FEP_SUBST] = 99 if (len(cor_neighbor) > 0) else 2 for j in cor_neighbor: mol.atom[j].property[constants.FEP_SUBST] = 98 def __is_dummy(atom): """ Returns true if this atom can become a dummy atom in an alchemical FEP mutation, or false if not. """ fep_subst = atom.property[constants.FEP_SUBST] return (2 == fep_subst or 99 == fep_subst % 100) def __fix_97atoms(mol): """ We loop through all <97> atoms and check if a <97> atom is connected to more than 1 physical-physical hybrid atoms. If so, we leave the atom's fep_subst code to be 97, otherwise we change it to 2 (so it becomes a physical-dummy hybrid atom). We repeat this process until there is no change of the fep_subst code. """ should_loop = True while (should_loop): should_loop = False for a in mol.atom: if (a.property[constants.FEP_SUBST] == 97): nondummy = [b for b in a.bonded_atoms if (not __is_dummy(b))] if (len(nondummy) == 1): a.property[constants.FEP_SUBST] = 99 if ( nondummy[0].property[constants.FEP_SUBST] == 98) else 2 should_loop = True def __fix_99atoms(atom98, code98, atom99_set): """ This function reassigns the 'fep_subst' code for <99> atoms that are bonded to the given <98> atom. Here I use the <99> nomenclature to mean 99, 199, 299, and so on. -YW This reassignment is for two purposes: 1. Ensure the code consistency between the two molecules. 2. Adjust some 99 codes to be 89 or 79 or 69. This reassignment has to be done after that we obtain an overall knowledge of which atoms are <98>, <99>, and <97>. The assigned 'fep_subst' code is based the given <98> code: if it's 98, <99> will be 99 (or 89, 79, etc.); if it's 198, <99> will be 199 (or 189, 179, etc.), and so on. :type atom98: `Structure.Atom` :param atom98: The <98> atom which we will reassign the <99> and <97> atoms that are bonded to :type code98: `int` :param code98: The <98> fep_subst code. The value will be one of 98, 198, 298, ... and so on. :type atom99_set: `set` of `Structure.Atom` objects :param atom99_set: A set that we use to keep track of the <99> atoms that have already been reassigned. For each atom, we reassign at most once. """ code99 = code98 + 1 atom99 = [ e for e in atom98.bonded_atoms if (e.property[constants.FEP_SUBST] % 100 == 99) ] atom99.sort(key=lambda x: x.index) for e in atom99: if (e not in atom99_set): e.property[constants.FEP_SUBST] = code99 atom99_set.add(e) code99 -= 10
[docs]def get_subst_from_mapping(mol0, mol1): """ Given two CTs: `ct0` and `ct1`, with `i_fep_mapping` atom properties properly set for all atoms of `ct1`, sets the `i_fep_subst` atom properties for both CTs. """ # Gets core and non-core matchings. cor0, cor1 = [], [] # Matched core atoms. List of `int's. non0, non1 = [], [] # Matched noncore atoms. List of `int's. for a1 in mol1.atom: i = a1.property[constants.FEP_MAPPING] if (i == 0): continue a0 = mol0.atom[i] if (a0.element == a1.element and a0.bond_total == a1.bond_total): cor0.append(int(a0)) cor1.append(int(a1)) else: non0.append(int(a0)) non1.append(int(a1)) # Gets core and non-core atoms. Note that some non-core atoms (i.e., the <97> atoms) are matched. core0 = set(cor0) core1 = set(cor1) c1_to_c0 = {c1: c0 for (c0, c1) in zip(cor0, cor1)} noncore0 = set(range(1, mol0.atom_total + 1)) noncore1 = set(range(1, mol1.atom_total + 1)) noncore0 -= core0 noncore1 -= core1 __raw_fepsubst(mol0, noncore0, cor0, non0) __raw_fepsubst(mol1, noncore1, cor1, non1) __fix_97atoms(mol0) __fix_97atoms(mol1) atom98 = [( e.property[constants.FEP_SUBST], int(e), ) for e in mol1.atom if (e.property[constants.FEP_SUBST] == 98)] code98 = 98 atom99_set = set() for junk, i in atom98: j = c1_to_c0[i] a1 = mol1.atom[i] a0 = mol0.atom[j] a1.property[constants.FEP_SUBST] = code98 a0.property[constants.FEP_SUBST] = code98 __fix_99atoms(a0, code98, atom99_set) __fix_99atoms(a1, code98, atom99_set) code98 += 100 # DESMOND-3621 atom97 = [ int(e) for e in mol1.atom if (e.property[constants.FEP_SUBST] == 97) ] code97 = 97 for i in atom97: a1 = mol1.atom[i] j = a1.property[constants.FEP_MAPPING] a0 = mol0.atom[j] a1.property[constants.FEP_SUBST] = code97 a0.property[constants.FEP_SUBST] = code97 code97 += 100 # Assigns the i_fep_mapping code. complete_a1_to_a0 = {a1: a0 for (a0, a1) in zip(cor0 + non0, cor1 + non1)} for a in mol1.atom: a.property[constants.FEP_MAPPING] = complete_a1_to_a0[int(a)] if ( a.property[constants.FEP_SUBST] % 100 in [ 97, 98, 1, ]) else 0
if __name__ == '__main__': usage = "$SCHRODINGER/run desmond_fep_mapping.py input.mae " parser = OptionParser(usage) parser.add_option( "-a", "--atomtype", type='int', action="store", dest="atomtype", default=DEFAULT_ATOM_TYPE, help=""" Atom Typing Schemes ------------------- 1 - All atoms equivalent; all bonds equivalent. 2 - Atoms distinguished by HB acceptor/donor; all bonds equivalent. 3 - Atoms distinguished by hybridization state; all bonds equivalent. 4 - Atoms distinguished by functional type: {H}, {C}, {F,Cl}, {Br,I}, {N,0}, {S}, {other}; bonds by hybridization. 5 - Mol2 atom types; all bonds equivalent. 6 - Atoms distinguished by whether terminal, halogen, HB acceptor/donor; bonds distinguished by bond order. 7 - Atomic number and bond order. 8 - Atoms distinguished by ring size, aromaticity, HB acceptor/donor, ionization potential, whether terminal, whether halogen; bonds distinguished by bond order. 9 - Carhart atom types (atom-pairs approach); all bonds equivalent. 10 - Daylight invariant atom types; bonds distinguished by bond order. 11 - Same as 7, but distinguishing aromatic from non-aromatic. 12 - Same as 11, but distinguishing aliphatic atoms by ring/acyclic. 13 - Same as 12, but distinguishing rings by size.""") parser.add_option( "-o", "--output", type='string', action="store", dest="output", default=None, help="Writing out new structure file") parser.add_option( "-s", "--superimpose", action="store_true", dest="superimpose", default=False, help="Superimpose based on the core atoms") parser.add_option( "-r", "--renumber", action="store_true", dest="renumber", default=False, help="Renumber atom index") parser.add_option( "-f", "--fepio", action="store_true", dest="fepio", default=False, help="print out fepio mapping") parser.add_option( "-p", "--permute", action="store", dest="permute", default=0, type="int", help="generate atom mapping by randomly permute atom order") opt, args = parser.parse_args(sys.argv[1:]) if len(args) != 1: print(usage) sys.exit(1) input_file = args[0] logger.info("Running RGA with atomtype (%d) ..." % opt.atomtype) data = get_atom_mapping_data(input_file, atomtype=opt.atomtype) (atom_map, core_h_map) = get_atom_mapping(data) logger.info("atom mapping in %s" % input_file) logger.info("bridge atoms: %s" % get_bridge_atoms(data)) logger.info("mapping: %s" % atom_map) logger.info("core hydrogen map: %s" % core_h_map) write_fepsubst_to_file(data, "%s.mae" % input_file) def get_atom_pair(source_index, atom_map): ''' return atom mapping pair given soucrce_index ''' for (s, d) in atom_map: if s == source_index: return (s, d) for i in range(opt.permute): permute_atommap = random_shuffle(data, opt.atomtype) if len(atom_map) != len(permute_atommap): raise RuntimeError( "permuted atommap has a different length, permutation %d" % i) else: if set(atom_map) != set(permute_atommap): #check equivalent hydrogen in_org = set(atom_map).difference(set(permute_atommap)) in_permute = set(permute_atommap).difference(set(atom_map)) dest_ct = data.dest_ct.copy() source_ct = data.source_ct.copy() for (source_index, dest_index) in in_org: if source_index is None or dest_index is None: logger.info("map to dummy (%s, %s)" % (source_index, dest_index)) continue d_atom = dest_ct.atom[dest_index] s_atom = source_ct.atom[source_index] if d_atom.element != 'H' and s_atom.element != 'H': raise RuntimeError("heavy atom mapping error (%d, %d)" % (source_index, dest_index)) d_hydrogen = [] d_core_atoms = [atom for atom in d_atom.bonded_atoms] for atom in d_core_atoms[0].bonded_atoms: if atom.element == 'H': d_hydrogen.append(atom.index) (s_p, d_p) = get_atom_pair(source_index, in_permute) logger.info( "alternative atom mapping: (%s, %s) == (%s, %s)" % (source_index, dest_index, s_p, d_p)) logger.info("equivalent hydorgens: %s" % d_hydrogen) if d_p not in d_hydrogen: raise RuntimeError( "permuted atommap is different, permutation %s" % permute_atommap) pickle.dump(atom_map, open("tmp.map", 'w')) if opt.fepio: logger.info("fepio mapping: %s" % (get_fepio_atom_mapping(atom_map))) if opt.output: logger.info("Writing %s ..." % opt.output) writer = structure.StructureWriter(opt.output) source_ct = data.source_ct.copy() dest_ct = data.dest_ct.copy() if opt.superimpose: logger.info("Superimposing structures ...") natom = 15 # number of atoms to be used as reference in superimposition. # @todo: use better algorithm to determine which set of atoms to be used # as reference for superimposition. s_core_indices, d_core_indices = data.getCoreAtoms() superimpose(source_ct, s_core_indices[:natom], dest_ct, d_core_indices[:natom]) if opt.renumber: logger.info("Renumbering structures ...") (new_atom_map, new_bridge_atoms) = reorder_atoms( data, source_ct, dest_ct) fepio_mapping = get_fepio_atom_mapping(new_atom_map) new_atom_map.sort( ) # just sort it and make easy to follow the mapping logger.info("atom mapping in %s" % opt.output) logger.info("bridge atoms: %s" % new_bridge_atoms) logger.info("mapping: %s" % new_atom_map) if opt.fepio: logger.info("fepio mapping: %s" % (fepio_mapping)) writer.append(source_ct) writer.append(dest_ct) writer.close()