Source code for schrodinger.protein.tasks.kinase

import csv
import itertools
import os

from schrodinger import structure
from schrodinger.models import parameters
from schrodinger.protein import residue
from schrodinger.protein import sequence
from schrodinger.protein.annotation import KinaseConservation
from schrodinger.protein.annotation import KinaseFeature
from schrodinger.protein.annotation import KinaseFeatureLabel
from schrodinger.structutils import analyze
from schrodinger.tasks import jobtasks
from schrodinger.tasks import tasks
from schrodinger.utils import csv_unicode

try:
    from schrodinger.application.prime.packages import kinase_annotation
except ImportError:
    kinase_annotation = None


class KinaseFeatureTask(tasks.ComboSubprocessTask):
    """
    Task that sends a protein sequence to the kinase annotation backend and
    exposes the per-residue result as a list of `KinaseFeature` ranges.
    """
    DEFAULT_TASKDIR_SETTING = tasks.TEMP_TASKDIR

    class Input(parameters.CompoundParam):
        # Sequence to annotate; gap characters are stripped before it is
        # handed to the backend.
        seq: sequence.ProteinSequence = None

    class Output(parameters.CompoundParam):
        # Backend annotation: one single-letter code per gapless residue.
        annotation: list

    # Translation table from the backend's single-letter codes to labels.
    BACKEND_SINGLE_LETTER_CODE_TO_KINASE_FEATURE = {
        'G': KinaseFeatureLabel.GLYCINE_RICH_LOOP,
        'A': KinaseFeatureLabel.ALPHA_C,
        'K': KinaseFeatureLabel.GATE_KEEPER,
        'H': KinaseFeatureLabel.HINGE,
        'L': KinaseFeatureLabel.LINKER,
        'R': KinaseFeatureLabel.HRD,
        'C': KinaseFeatureLabel.CATALYTIC_LOOP,
        'D': KinaseFeatureLabel.DFG,
        'T': KinaseFeatureLabel.ACTIVATION_LOOP,
        '~': KinaseFeatureLabel.NO_FEATURE,
    }

    def mainFunction(self):
        """
        Run the backend annotation on the gapless, upper-cased input
        sequence and store the resulting codes in `self.output.annotation`.

        The output is left untouched when the backend rejects the sequence.
        """
        target_seq = self.input.seq
        ungapped = str(target_seq).replace(target_seq.gap_char, "").upper()
        annotator = kinase_annotation.KinaseAnnotation
        try:
            backend_result = annotator.process_annotation(ungapped)
        except RuntimeError:
            # Backend raises RuntimeError for non-kinase sequences
            return
        # The backend answers with (gapless sequence, annotation, header);
        # only the annotation is kept.
        _, codes, _ = backend_result
        self.output.annotation = codes

    def getKinaseFeatures(self):
        """
        Convert the stored backend annotation into `KinaseFeature` objects.

        Indices refer to positions in the gapped input sequence. Gap
        positions are labelled `KinaseFeatureLabel.NO_FEATURE`.

        :return: One `KinaseFeature(label, start, end)` per contiguous run
            of a label other than `NO_FEATURE`, in sequence order. Empty if
            there is no annotation.
        :rtype: list
        """
        codes = self.output.annotation
        if not codes:
            return []

        no_feature = KinaseFeatureLabel.NO_FEATURE
        code_to_label = self.BACKEND_SINGLE_LETTER_CODE_TO_KINASE_FEATURE
        labels = [code_to_label[code] for code in codes]

        # Re-introduce the gaps of the original sequence so that label
        # positions line up with positions in the gapped sequence.
        for position, res in enumerate(self.input.seq):
            if res.is_gap:
                labels.insert(position, no_feature)

        features = []

        def close_run(label, first, last):
            # Record a finished run unless it is an unannotated stretch.
            if label is not no_feature:
                features.append(KinaseFeature(label, first, last))

        run_start = 0
        run_label = labels[0]
        for position, label in enumerate(labels):
            if label is run_label:
                continue
            close_run(run_label, run_start, position - 1)
            run_start, run_label = position, label

        # The final run has no following label to trigger its closure.
        close_run(run_label, run_start, len(labels) - 1)
        return features
class KinaseConservationTask(jobtasks.CmdJobTask):
    """
    Task to run kinase binding site conservation analysis on the specified
    sequence and ligand.
    """
    DEFAULT_TASKDIR_SETTING = tasks.TEMP_TASKDIR

    class Input(parameters.CompoundParam):
        # Receptor sequence; must carry a structure (see `_checkInputs`).
        receptor_seq: sequence.ProteinSequence = None
        # ASL expression selecting the ligand in the receptor structure.
        ligand_asl: str

    # File names used inside the task directory.
    _receptor_fname = "receptor.maegz"
    _out_fname = "receptor_conservation.csv"
    # Column headers read from the output CSV.
    _sres_id_key = 'ResidueID_in_ct(ChainID:ResidueNumberInscode)'
    _conservation_key = 'Conservation_category'

    @tasks.preprocessor(order=tasks.BEFORE_TASKDIR)
    def _checkInputs(self):
        """
        Validate the inputs.

        :return: `(False, message)` for the first problem found, otherwise
            None.
        """
        receptor = self.input.receptor_seq
        if receptor is None:
            return False, "Receptor sequence must be set"
        if not receptor.hasStructure():
            return False, "Receptor sequence must have structure"

        ligand_asl = self.input.ligand_asl
        if not ligand_asl:
            return False, "Ligand ASL must be set"

        # The ASL has to match something in the receptor structure.
        matches = analyze.evaluate_asl(receptor.getStructure(), ligand_asl)
        if not matches:
            return False, f"Ligand {ligand_asl} was not found"

    @tasks.preprocessor(order=tasks.AFTER_TASKDIR)
    def _writeInputFile(self):
        """
        Write the receptor structure to the receptor file in the task
        directory.
        """
        receptor_path = self.getTaskFilename(self._receptor_fname)
        self.input.receptor_seq.getStructure().write(receptor_path)

    def makeCmd(self):
        """
        Build the command line for the conservation analysis script.

        :rtype: list
        """
        cmd = [
            '$SCHRODINGER/run',
            '-FROM',
            'psp',
            'kinase_conservation_analysis.py',
            self.getTaskFilename(self._receptor_fname),
        ]
        task_input = self.input
        cmd += ['-receptor_chain', task_input.receptor_seq.structure_chain]
        cmd += ['-ligand_asl', task_input.ligand_asl]
        return cmd

    def _getOutputFile(self):
        """
        Return the path of the conservation CSV in the task directory.
        """
        return self.getTaskFilename(self._out_fname)

    @tasks.postprocessor
    def _checkOutputFile(self):
        """
        Check that the output CSV exists.

        :return: `(False, message)` if the file is missing, otherwise None.
        """
        csv_path = self._getOutputFile()
        if os.path.isfile(csv_path):
            return
        return False, f"Output file not found: {csv_path}"

    def parseOutput(self):
        """
        Read the output file and associate the output with the correct
        residues.

        Rows whose residue ID cannot be found in the structure, or whose
        residue is not part of the receptor sequence, are skipped.

        :rtype: dict[residue.Residue, annotation.KinaseConservation]
        """
        csv_path = self._getOutputFile()
        receptor = self.input.receptor_seq
        receptor_st = receptor.getStructure()
        n_decorative_lines = 4

        conservation_by_res = {}
        with csv_unicode.reader_open(csv_path) as fh:
            # The first 4 lines are decorative
            for _ in range(n_decorative_lines):
                next(fh)
            for row in csv.DictReader(fh):
                sres_id = row[self._sres_id_key]
                try:
                    struct_res = receptor_st.findResidue(sres_id)
                except ValueError:
                    continue
                key = residue.get_structure_residue_key(struct_res,
                                                        receptor.entry_id)
                seq_res = receptor.getResByKey(key)
                if seq_res is not None:
                    category = row[self._conservation_key]
                    conservation_by_res[seq_res] = KinaseConservation(
                        category)
        return conservation_by_res