Source code for schrodinger.protein.tasks.kinase
import csv
import itertools
import os
from schrodinger import structure
from schrodinger.models import parameters
from schrodinger.protein import residue
from schrodinger.protein import sequence
from schrodinger.protein.annotation import KinaseConservation
from schrodinger.protein.annotation import KinaseFeature
from schrodinger.protein.annotation import KinaseFeatureLabel
from schrodinger.structutils import analyze
from schrodinger.tasks import jobtasks
from schrodinger.tasks import tasks
from schrodinger.utils import csv_unicode
try:
from schrodinger.application.prime.packages import kinase_annotation
except ImportError:
kinase_annotation = None
class KinaseFeatureTask(tasks.ComboSubprocessTask):
    """
    Task that annotates a protein sequence with kinase features.

    `mainFunction` strips the gaps from the input sequence, upper-cases it
    and hands it to the kinase annotation backend, storing the returned
    single-letter annotation in `output.annotation`. `getKinaseFeatures`
    then translates that annotation into `KinaseFeature` ranges whose
    indices refer to the original, gapped sequence.
    """
    DEFAULT_TASKDIR_SETTING = tasks.TEMP_TASKDIR

    # NOTE(review): the methods below read `self.input.seq`, but no `Input`
    # param class is visible in this excerpt - confirm where it is declared.

    class Output(parameters.CompoundParam):
        # Single-letter backend codes (see the mapping below). Only assigned
        # by `mainFunction` when the backend call succeeds.
        annotation: list

    # Translation from the backend's one-letter codes to feature labels.
    BACKEND_SINGLE_LETTER_CODE_TO_KINASE_FEATURE = {
        'G': KinaseFeatureLabel.GLYCINE_RICH_LOOP,
        'A': KinaseFeatureLabel.ALPHA_C,
        'K': KinaseFeatureLabel.GATE_KEEPER,
        'H': KinaseFeatureLabel.HINGE,
        'L': KinaseFeatureLabel.LINKER,
        'R': KinaseFeatureLabel.HRD,
        'C': KinaseFeatureLabel.CATALYTIC_LOOP,
        'D': KinaseFeatureLabel.DFG,
        'T': KinaseFeatureLabel.ACTIVATION_LOOP,
        '~': KinaseFeatureLabel.NO_FEATURE,
    }

    def mainFunction(self):
        """
        Run the kinase annotation backend on the input sequence.

        Gap characters are removed and the sequence is upper-cased before it
        is passed to the backend. On success the backend annotation is
        stored in `self.output.annotation`. If the backend raises
        `RuntimeError`, the output is left untouched.

        :raises ImportError: if the kinase annotation backend module could
            not be imported when this module was loaded.
        """
        # The module-level import falls back to None when the backend is
        # unavailable. Fail with an explicit message instead of an opaque
        # AttributeError on NoneType. This check is deliberately outside the
        # try block below so it cannot be mistaken for a non-kinase result.
        if kinase_annotation is None:
            raise ImportError(
                "schrodinger.application.prime.packages.kinase_annotation "
                "is not available; kinase features cannot be computed")
        seq = self.input.seq
        seq_str = str(seq).replace(seq.gap_char, "").upper()
        try:
            ret = kinase_annotation.KinaseAnnotation.process_annotation(
                seq_str)
        except RuntimeError:
            # Backend raises RuntimeError for non-kinase sequences
            return
        _gapless_sequence, backend_annotation, _annotation_head = ret
        self.output.annotation = backend_annotation

    def getKinaseFeatures(self):
        """
        Convert the backend annotation into a list of kinase features.

        Each maximal run of identical feature labels, other than
        `KinaseFeatureLabel.NO_FEATURE`, becomes one
        `KinaseFeature(label, start, end)`. `start` and `end` are inclusive
        indices into the gapped input sequence.

        :return: The features found, in sequence order; an empty list when
            there is no annotation.
        :rtype: list[KinaseFeature]
        """
        backend_annotation = self.output.annotation
        if not backend_annotation:
            return []

        # Translate the backend codes, then re-insert a NO_FEATURE entry at
        # every gap position so indices line up with the original sequence.
        # Inserting in ascending index order keeps later indices valid.
        code_to_label = self.BACKEND_SINGLE_LETTER_CODE_TO_KINASE_FEATURE
        labels = [code_to_label[code] for code in backend_annotation]
        for pos, res in enumerate(self.input.seq):
            if res.is_gap:
                labels.insert(pos, KinaseFeatureLabel.NO_FEATURE)

        # Walk the labels and emit one feature per contiguous run. A run
        # ends at the last position or wherever the next label differs.
        kinase_features = []
        run_start = 0
        last_idx = len(labels) - 1
        for idx, label in enumerate(labels):
            run_ends_here = idx == last_idx or labels[idx + 1] is not label
            if not run_ends_here:
                continue
            if label is not KinaseFeatureLabel.NO_FEATURE:
                kinase_features.append(KinaseFeature(label, run_start, idx))
            run_start = idx + 1
        return kinase_features
class KinaseConservationTask(jobtasks.CmdJobTask):
    """
    Job task that runs the kinase binding-site conservation analysis for a
    receptor sequence and a ligand.
    """
    DEFAULT_TASKDIR_SETTING = tasks.TEMP_TASKDIR

    # File the receptor structure is written to inside the task directory.
    _receptor_fname = "receptor.maegz"
    # File the analysis script is expected to produce.
    _out_fname = "receptor_conservation.csv"
    # Column names read from the output CSV.
    _sres_id_key = 'ResidueID_in_ct(ChainID:ResidueNumberInscode)'
    _conservation_key = 'Conservation_category'

    @tasks.preprocessor(order=tasks.BEFORE_TASKDIR)
    def _checkInputs(self):
        """
        Preprocessor that validates the receptor sequence and ligand ASL.

        :return: `(False, message)` for the first failed check; `None` when
            every check passes.
        """
        inputs = self.input
        receptor_seq = inputs.receptor_seq
        if receptor_seq is None:
            return False, "Receptor sequence must be set"
        if not receptor_seq.hasStructure():
            return False, "Receptor sequence must have structure"
        ligand_asl = inputs.ligand_asl
        if not ligand_asl:
            return False, "Ligand ASL must be set"
        # The ASL has to match something in the receptor structure.
        matches = analyze.evaluate_asl(receptor_seq.getStructure(),
                                       ligand_asl)
        if not matches:
            return False, f"Ligand {ligand_asl} was not found"

    @tasks.preprocessor(order=tasks.AFTER_TASKDIR)
    def _writeInputFile(self):
        """
        Preprocessor that writes the receptor structure to the task
        directory so the analysis script can read it.
        """
        self.input.receptor_seq.getStructure().write(
            self.getTaskFilename(self._receptor_fname))

    def makeCmd(self):
        """
        Assemble the command line for the conservation analysis script.

        :return: The command as a list of arguments.
        :rtype: list
        """
        cmd = ['$SCHRODINGER/run', '-FROM', 'psp']
        cmd.append('kinase_conservation_analysis.py')
        cmd.append(self.getTaskFilename(self._receptor_fname))
        cmd.extend(['-receptor_chain',
                    self.input.receptor_seq.structure_chain])
        cmd.extend(['-ligand_asl', self.input.ligand_asl])
        return cmd

    def _getOutputFile(self):
        """
        :return: Path of the expected output CSV, as given by
            `getTaskFilename`.
        """
        return self.getTaskFilename(self._out_fname)

    @tasks.postprocessor
    def _checkOutputFile(self):
        """
        Postprocessor that reports a failure if the output CSV is missing.

        :return: `(False, message)` when the file does not exist; `None`
            otherwise.
        """
        csv_path = self._getOutputFile()
        if os.path.isfile(csv_path):
            return None
        return False, f"Output file not found: {csv_path}"

    def parseOutput(self):
        """
        Parse the output CSV and map each sequence residue to its
        conservation category.

        Rows are skipped when the residue ID cannot be found in the receptor
        structure, or when it has no counterpart in the receptor sequence.

        :rtype: dict[residue.Residue, annotation.KinaseConservation]
        """
        csv_path = self._getOutputFile()
        receptor_seq = self.input.receptor_seq
        receptor_st = receptor_seq.getStructure()
        conservation_by_res = {}
        with csv_unicode.reader_open(csv_path) as csv_fh:
            # Skip the 4 decorative lines that precede the CSV header
            for _ in range(4):
                next(csv_fh)
            for row in csv.DictReader(csv_fh):
                try:
                    st_res = receptor_st.findResidue(row[self._sres_id_key])
                except ValueError:
                    # Residue ID was not found in the structure
                    continue
                key = residue.get_structure_residue_key(
                    st_res, receptor_seq.entry_id)
                seq_res = receptor_seq.getResByKey(key)
                if seq_res is not None:
                    conservation_by_res[seq_res] = KinaseConservation(
                        row[self._conservation_key])
        return conservation_by_res