Source code for schrodinger.application.prepwizard2.tasks

# Note: this file must not import any gui-dependent modules (QtGui, QtWidgets,
# maestro_ui) so the tasks can run without display dependency
import inflect
import subprocess
from collections import defaultdict
from typing import List

import os
import enum
from schrodinger.infra.util import enum_speedup
from schrodinger import structure
from schrodinger.infra import mm
from schrodinger.models import parameters
from schrodinger.protein import annotation, captermini
from schrodinger.structutils import assignbondorders, analyze, measure
from schrodinger.structutils import build
from schrodinger.structutils import structalign
from schrodinger.structutils import color
from schrodinger.tasks import jobtasks
from schrodinger.tasks import tasks
from schrodinger.tasks import queue
from schrodinger.utils import log
from schrodinger.job import jobcontrol

from . import prepare

import schrodinger
maestro = schrodinger.get_maestro()

try:
    from schrodinger.application.prime.packages import antibody
except ImportError:
    antibody = None

DEFAULT_PH = 7.4
PREPROCESS_PH_PROP = 'r_ppw_preprocess_pH'
OVERLAP_CUTOFF_DIST = 0.8
MIN_EPIK_STATES = 10

# This divides the task into conceptual steps so clients don't need to know
# the details of the subtasks:
Step = enum_speedup(enum.Enum("Step", [
    "Preprocess",
    "Optimize",
    "Cleanup",
    # TODO add a Review step for the auto workflow.
]))  # yapf: disable


DONE_MESSAGE = "✔"


[docs]def hide_non_polar_hydrogens(st): """ Hide all non-polar hydrogens in the given structure; except those that overlap with other atoms. """ non_polar_hydrogens = set( analyze.evaluate_asl(st, 'atom.ele H and /C0-H0/')) overlapping_atoms = { anum for pair in measure.get_close_atoms(st, OVERLAP_CUTOFF_DIST) for anum in pair } for anum in non_polar_hydrogens - overlapping_atoms: st.atom[anum].visible = False
[docs]class LoggerMixin: """ A Mixin for a task that provides a thin wrapper for a logger """ logger = parameters.NonParamAttribute()
[docs] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.logger = log.get_output_logger('prepwizard.py')
[docs] def info(self, text): """ Log the text as informational. :param text: the info text to log :type text: str """ self.logger.info(text)
[docs] def warning(self, text): """ Log the text as a warning. :param text: the warning text to log :type text: str """ self.logger.warning(text)
[docs]class StructureInput(parameters.CompoundParam): struct: structure.Structure = None
[docs]class GenerateStatesSettings(parameters.CompoundParam): epik_pH: float = DEFAULT_PH epik_pHt: float = 2.0 max_states: int = 1 process_detected_ligands: bool = True process_metals_and_ions: bool = True process_non_water_solvents: bool = False process_other_hets: bool = True
[docs] def getHetTypesToProcess(self): """ Return a list of HetType for het types that should be processed. """ types_map = { prepare.HetType.METAL_OR_ION: self.process_metals_and_ions, prepare.HetType.NON_WATER_SOLVENT: self.process_non_water_solvents, prepare.HetType.DETECTED_LIGAND: self.process_detected_ligands, prepare.HetType.OTHER: self.process_other_hets, } return [htype for htype, do_process in types_map.items() if do_process]
[docs]class GenerateStatesInput(GenerateStatesSettings, StructureInput): pass
[docs]class PreprocessInput(StructureInput): reference_struct: structure.Structure = None # NOTE: far waters can be deleted during pre-processing, but also during # cleanup. preprocess_delete_far_waters: bool = False preprocess_watdist: float = 8.0 treat_metals: bool = True assign_bond_orders: bool = True use_ccd: bool = True add_hydrogens: bool = False readd_hydrogens: bool = True add_terminal_oxygens: bool = False treat_disulfides: bool = True treat_glycosylation: bool = False treat_palmitoylation: bool = False annotate_antibodies: bool = True antibody_cdr_scheme: annotation.AntibodyCDRScheme = annotation.DEFAULT_ANTIBODY_SCHEME selenomethionines: bool = False fillsidechains: bool = True fillloops: bool = False custom_fasta_file: bool = None cap_termini: bool = False # GenerateStatesTaskSettings == runEpik run_epik: bool = True idealize_hydrogen_tf: bool = True generate_states_settings: GenerateStatesSettings
[docs] def isDefault(self): # Needed because by default, isDefault() can't handle Structure params default_settings = PreprocessInput.defaultValue().toDict() default_settings.pop("struct") default_settings.pop("reference_struct") current_settings = self.toDict() current_settings.pop("struct") # Since user controls the pH value from the global settings pop-up # and not the preprocessing options, we need to ignore it when # analyzing whether the model is at defaults or not: default_settings['generate_states_settings'].pop("epik_pH") current_settings['generate_states_settings'].pop("epik_pH") ref_struct = current_settings.pop("reference_struct") return current_settings == default_settings and ref_struct is None
[docs] def reset(self): """ Reset all settings except pH, which is part of the global settings. """ bu_pH = self.generate_states_settings.epik_pH super().reset() self.generate_states_settings.epik_pH = bu_pH
[docs] def getNumSelected(self): """ Return the number of pre-processing steps that are enabled. """ # annotate_antibodies is skipped because it is always enabled in the GUI steps = [ self.preprocess_delete_far_waters, self.treat_metals, self.assign_bond_orders, self.add_hydrogens, self.readd_hydrogens, self.treat_disulfides, self.treat_glycosylation, self.treat_palmitoylation, self.selenomethionines, self.fillsidechains, self.fillloops, self.cap_termini, self.run_epik, ] return sum(1 for enabled in steps if enabled)
[docs]class PreprocessTask(LoggerMixin, tasks.ComboSubprocessTask): input: PreprocessInput output: List[structure.Structure] message: str
[docs] @tasks.preprocessor(order=tasks.BEFORE_TASKDIR) def validateInput(self): if not self.input.struct: return False, 'No input structure defined'
[docs] @tasks.preprocessor(order=tasks.BEFORE_TASKDIR) def setInitialStatus(self): self.message = 'Pre-processing...'
[docs] @tasks.postprocessor def postProcessor(self): self.message = DONE_MESSAGE
[docs] def mainFunction(self): st = self.input.struct self._applyByElementColorScheme(st) if self.input.reference_struct: self._alignToReference(st, self.input.reference_struct) # Lets Maestro know that this protein was prepared: prepare.add_prepared_props(st) self._fixCommonStructureMistakes(st) if self.input.treat_metals: self._preTreatMetals(st) if self.input.assign_bond_orders: self._assignBondOrders(st) if self.input.readd_hydrogens: self._reAddHydrogens(st) elif self.input.add_hydrogens: self._addHydrogens(st) hide_non_polar_hydrogens(st) if self.input.treat_metals: self._postTreatMetals(st) if self.input.treat_disulfides: self._treatDisulfides(st) if self.input.treat_glycosylation: self._treatGlycosylation(st) if self.input.treat_palmitoylation: self._treatPalmitoylation(st) if self.input.annotate_antibodies: antibody.annotate_ab_regions( st, scheme=self.input.antibody_cdr_scheme.name) if self.input.selenomethionines: self._treatSelenomethionines(st) if self.input.preprocess_delete_far_waters: self.info("Deleting far waters...") prepare.delete_far_waters(st, self.input.preprocess_watdist) if self.input.fillloops: st = self._fillLoops(st, self.input.custom_fasta_file) if self.input.fillsidechains: self._fillSideChains(st) if self.input.add_terminal_oxygens: self._addTerminalOxygens(st) if self.input.cap_termini: st = self._capTermini(st) # Whether to generate states if self.input.run_epik: expanded_sts = self._generateStates(st) else: expanded_sts = [st] if self.input.idealize_hydrogen_tf: self._idealizeHydrogenTemperatureFactor(expanded_sts) self.info('Pre-processing has completed') self.output = expanded_sts
def _alignToReference(self, st, reference_st): """ Use StructAlign to align `st` to `reference_st`. """ self.info("Aligning to reference structure...") sa = structalign.StructAlign() try: sa.align(reference_st, [st]) except Exception as err: self.warning(f"Failed to align structure to reference.\n{err}") raise def _fixCommonStructureMistakes(self, st): """ Fix some element types, formal charges and some sulfurs. Note: will reset the atom types. :type st: structure.Structure """ self.info("Fixing common structure mistakes...") corrections_list = prepare.fix_common_structure_mistakes(st) for correction_str in corrections_list: self.info(" " + correction_str) if not corrections_list: self.info(" No mistakes were found") def _preTreatMetals(self, st): """ Pretreatment of metals. :type st: structure.Structure """ self.info("Pre-treating metals...") mm.mmlewis_initialize(mm.error_handler) mm.mmlewis_pre_add_zobs(st) self.info('Done treating metals') def _postTreatMetals(self, st): """ Optional final treatment of metals and zero-order bonded sulfurs. :type st: structure.Structure """ self.info("Treating metals...") mm.mmlewis_add_zobs(st) mm.mmlewis_terminate() prepare.fix_sulfur_charges(st) def _assignBondOrders(self, st): """ Assignment bond orders. :type st: structure.Structure """ self.info("Assigning bond orders...") self.info(" Using CCD: %s" % self.input.use_ccd) assigned_bonds = assignbondorders.assign_st( st, problem_only=True, use_ccd=self.input.use_ccd, _logger=self.logger) if assigned_bonds: self.info(" Assigned the following bonds (format: atom1," " atom2, order): %s" % assigned_bonds) else: self.info(" No bond orders were changed") def _reAddHydrogens(self, st): """ Remove and add hydrogens. :type st: structure.Structure """ self.info("Removing and re-adding hydrogens...") build.delete_hydrogens(st) build.add_hydrogens(st) def _addHydrogens(self, st): """ Add hydrogens. :type st: structure.Structure """ self.info("Adding hydrogens...") build.add_hydrogens(st) def _treatDisulfides(self, st): """ Create di-sulfur bonds. :type st: structure.Structure """ self.info("Creating di-sulfur bonds...") prepare.create_disulfide_bonds(st, verbose=True) st.property['b_ppw_created_disulfur'] = True def _treatGlycosylation(self, st): """ Add glycosylation bonds. :type st: structure.Structure """ self.info("Adding glycosylation bonds...") prepare.create_glycosylation_bonds(st, verbose=True) st.property['b_ppw_created_glycosylation'] = True def _treatPalmitoylation(self, st): """ Add palmitoylation bonds. :type st: structure.Structure """ self.info("Adding palmitoylation bonds...") prepare.create_palmitoylation_bonds(st, verbose=True) st.property['b_ppw_created_palmitoylation'] = True def _treatSelenomethionines(self, st): """ Convert selenomethionines to methionines. :type st: structure.Structure """ self.info("Converting Selenomethionines...") converted_residues = prepare.convert_selenomethionines(st) if converted_residues: self.info(" The following residues were converted from" " selenomethionines to methionines: %s" % ", ".join(converted_residues)) st.property['b_ppw_converted_selenomethionines'] = True def _fillLoops(self, st, fasta_file=None): """ Fill missing loops using prime. :type st: structure.Structure :param fasta_file: the fasta file to use for building missing loops :type fasta_file: str or None :rtype: structure.Structure """ self.info("Filling missing loops using Prime...") if fasta_file: self.info(" Using custom fasta file: %s" % fasta_file) from schrodinger.application.prime.packages import missing_loop return missing_loop.build_missing_loops( st, fasta_file=fasta_file, attempt_knowledge_based=True, build_tails=False) def _fillSideChains(self, st): """ Fill missing side chains using prime. :type st: structure.Structure """ self.info("Detecting missing residue atoms...") missing_res_present = prepare.do_any_residues_have_missing_side_chains( st) if missing_res_present: self.info("Filling missing side chains using Prime...") from schrodinger.application.prime.packages import missing_loop missing_loop.add_missing_sidechains(st) def _addTerminalOxygens(self, st): """ Add terminal OXT oxygen by transforming HXT to OXT :type st: structure.Structure """ self.info("Adding terminal oxygens...") capped_residues = captermini.add_terminal_oxygens(st) msg = " Added a terminal oxygen to the following residues: " msg += ", ".join([str(r) for r in capped_residues]) self.info(msg) st.property['b_ppw_added_terminal_oxygens'] = True def _capTermini(self, st): """ Cap the protein termini. :type st: structure.Structure :rtype: structure.Structure """ self.info("Capping protein termini...") capter = captermini.CapTermini(st) st = capter.outputStructure() capped_residues = capter.cappedResidues() msg = " The following terminal residues have been capped: " msg += ", ".join(capped_residues) self.info(msg) return st def _generateStates(self, st): """ Generate states for hetero atom groups and metals. :type st: structure.structure :rtype: List[structure.structure] """ task = GenerateStatesTask() update_params(task.input, self.input.generate_states_settings) st.property[ PREPROCESS_PH_PROP] = self.input.generate_states_settings.epik_pH task.input.struct = st task.logger = self.logger task.name = self.name + '-genstates' task.start() task.wait() # OK: subprocess self.info(task.getLogAsString().strip()) if task.status == task.FAILED: raise RuntimeError('Failed to generate het states') return task.output def _idealizeHydrogenTemperatureFactor(self, sts): """ Sets the temperature factors of added hydrogens to its neighboring value for all structures in `sts`. :type sts: List[structure.Structure] """ self.info("Idealizing hydrogen temperature factors...") for st in sts: prepare.idealize_hydrogen_temp_factor(st) def _applyByElementColorScheme(self, st): """ Apply the element color scheme to the given structure, but only if it's currently colored by the PDB conversion status. """ oxygens = analyze.evaluate_asl(st, 'protein AND atom.ele O') if not oxygens: return if st.atom[oxygens[0]].color.name in ('user4', 'gray'): # If oxygens are gray color.apply_color_scheme(st, "Element (Green Ligand)")
[docs]class OptimizeHBondInput(StructureInput): """ Model for the "Optimize H-bond Assignments" step of the workflow tab, i.e., OptimizeHBondTask.input """ samplewater: bool = True xtal: bool = False simplified_pH: str = 'neutral' use_propka: bool = True propka_pH: float = DEFAULT_PH label_pkas: bool = False force_list: list = [] minimize_adj_h: bool = False protassign_number_sequential_cycles: int = None protassign_max_cluster_size: int = None idealize_hydrogen_tf: bool = True
[docs]class OptimizeHBondTask(LoggerMixin, tasks.SubprocessCmdTask): """ A task optimized H-Bonds, i.e., to run the protein assignment (protassign) """ input: OptimizeHBondInput output: structure.Structure = None message: str
[docs] @tasks.preprocessor(order=tasks.BEFORE_TASKDIR) def validateInput(self): if not self.input.struct: return False, 'No input structure defined'
[docs] @tasks.preprocessor(order=tasks.BEFORE_TASKDIR) def setInitialStatus(self): self.message = 'Optimizing H-bonds...'
[docs] @tasks.preprocessor(order=tasks.AFTER_TASKDIR) def prepareInput(self): self._in_file = self.getTaskFilename(self.name + '.maegz') self._out_file = self.name + '-out.maegz' self.input.struct.write(self._in_file)
[docs] def makeCmd(self): """ Create the command to run impref not under job control (-NOJOBID) """ cmd = [ prepare.protassign_exe, self._in_file, self._out_file, '-NOJOBID' ] if self.input.samplewater: self.info(' will sample waters') else: cmd += ['-nowater'] if self.input.xtal: cmd += ['-xtal'] self.info(' will use crystal symmetry') if self.input.minimize_adj_h: cmd += ['-minimize'] self.info(' Uses -minimize option in ProtAssign') if self.input.protassign_number_sequential_cycles: opt_str = str(self.input.protassign_number_sequential_cycles) cmd += ['-number_sequential_cycles', opt_str] self.info( f' Running {opt_str} sequential cycles for large clusters') if self.input.protassign_max_cluster_size: opt_str = str(self.input.protassign_max_cluster_size) cmd += ['-max_cluster_size', opt_str] self.info(f' Limiting cluster size to {opt_str} residues') if self.input.use_propka: self.info(" Using PROPKA with pH: %s" % self.input.propka_pH) cmd += ['-propka_pH', str(self.input.propka_pH)] if self.input.label_pkas: cmd.append('-label_pkas') self.info(' (Labeling pKas)') if self.input.force_list: label = ' Forcing residue state' for res_str, state_str in self.input.force_list: cmd += ['-f', res_str, state_str] self.info(f'{label}: {res_str} {state_str}') else: self.info( f' Using simplified rules with pH: {self.input.simplified_pH}') cmd += ['-nopropka', '-pH', self.input.simplified_pH] return cmd
[docs] def runCmd(self, cmd): self.info(f' RUNNING: {subprocess.list2cmdline(cmd)}') super().runCmd(cmd)
[docs] @tasks.postprocessor def processOutput(self): # Post processor runs in launch dir instead of task dir st = structure.Structure.read(self.getTaskFilename(self._out_file)) if self.input.idealize_hydrogen_tf: self.info("Idealizing hydrogen temperature factors...") prepare.idealize_hydrogen_temp_factor(st) if 'b_pa_PROPKA_failed' in st.property: self.warning('Optimize H-bond job completed (ProPKA failed') else: # Some hydrogens that were previously overlapping, are likely # no longer overlapping and can now be hidden: hide_non_polar_hydrogens(st) self.info('Optimize H-bond job completed') self.output = st
[docs]class CleanupInput(StructureInput): """ Model for the "Clean Up" step of the workflow tab """ run_impref: bool = True # ProteinRefinementTask.input force_field: str rmsd: float = 0.3 fixheavy: bool = False # Options for far water deletion: delete_far_waters: bool = True watdist: float = 5.0 # Options for non-bridging water deletion: delete_nonbridging_waters: bool = False delwater_hbond_cutoff: int = 3
[docs]class DeleteWatersTask(LoggerMixin, tasks.BlockingFunctionTask): """ A task that may run water deletion and restrained minimization on a protein """ input: CleanupInput output: structure.Structure = None
[docs] @tasks.preprocessor(order=tasks.BEFORE_TASKDIR) def validateInput(self): assert self.input.delete_nonbridging_waters or self.input.delete_far_waters if not self.input.struct: return False, 'No input structure defined'
[docs] def mainFunction(self): st = self.input.struct if self.input.delete_nonbridging_waters: assert self.input.delwater_hbond_cutoff > 0 self.info("Deleting non-bridging waters...") del_waters = prepare.delete_non_bridging_waters( st, self.input.delwater_hbond_cutoff) if del_waters: self.info(' Deleted waters: %s' % ', '.join(del_waters)) else: self.info(' Deleted waters: None') if self.input.delete_far_waters: self.info("Deleting far waters...") prepare.delete_far_waters(st, self.input.watdist) self.info('done deleting waters') self.output = st
[docs]class CleanupTask(LoggerMixin, tasks.BlockingFunctionTask): """ A task that may run water deletion and restrained minimization on a protein """ input: CleanupInput output: structure.Structure = None # Whether Impref was skipped due to valence violations: valence_error: bool = False
[docs] @tasks.preprocessor(order=tasks.BEFORE_TASKDIR) def validateInput(self): if not self.input.struct: return False, 'No input structure defined'
[docs] def mainFunction(self): st = self.input.struct if self.input.run_impref: if analyze.hydrogens_present(st): self.info("Running restrained minimization...") impref_task = ProteinRefinementTask() impref_task.logger = self.logger impref_task.name = self.name + '-impref' impref_task.input.struct = st impref_task.input.force_field = self.input.force_field impref_task.input.rmsd = self.input.rmsd impref_task.input.fixheavy = self.input.fixheavy impref_task.specifyTaskDir(self.getTaskDir()) impref_task.start() impref_task.wait() # OK: job if impref_task.status == impref_task.FAILED: raise tasks.TaskFailure("Protein refinement has failed") st = impref_task.output else: # PPWorkflowTask will read the valence_error option self.valence_error = True if self.input.delete_far_waters or self.input.delete_nonbridging_waters: self.warning( "WARNING: Skipping Impref minimization due to valence violations; removing waters only." ) # Delete waters even if Impref failed: if self.input.delete_far_waters or self.input.delete_nonbridging_waters: self.info("Deleting waters...") delete_waters_task = DeleteWatersTask() delete_waters_task.logger = self.logger delete_waters_task.name = self.name + 'delete-waters' update_params(delete_waters_task.input, self.input) delete_waters_task.input.struct = st delete_waters_task.specifyTaskDir(self.getTaskDir()) delete_waters_task.start() delete_waters_task.wait() # OK: job st = delete_waters_task.output if delete_waters_task.status == delete_waters_task.FAILED: raise tasks.TaskFailure( f'Task {delete_waters_task.name} failed: {delete_waters_task.failure_info}' ) self.output = st
[docs]class PPWorkflowSettings(parameters.CompoundParam): """ Settings for PPW workflow auto task. """ preprocess: PreprocessInput hbond: OptimizeHBondInput cleanup: CleanupInput do_preprocess: bool = True do_hbond: bool = True do_cleanup: bool = True
[docs]class PPWorkflowInput(StructureInput, PPWorkflowSettings): """ Settings and structure input for PPW workflow tasks. """ pass
[docs]class PPWorkflowOutput(parameters.CompoundParam): structs: List[structure.Structure] postprocess_valence_error: bool = False
[docs]class PPWorkflowTask(LoggerMixin, jobtasks.ComboJobTask): """ Runs the entire workflow. Intended to be created by PPBatchWorkflowTask. """ input: PPWorkflowInput output: PPWorkflowOutput
[docs] def backendMain(self): input_st = self.input.struct if self.input.do_preprocess: # TODO propogate stdout from PreprocessTask into the log file from # the main PPWorkflowTask. task = PreprocessTask(name='Preprocess') task.input.setValue(self.input.preprocess) task.input.struct = input_st task.specifyTaskDir(self.getTaskDir()) task.start() task.wait() # OK: job if task.status == task.FAILED: raise tasks.TaskFailure( f'Task {task.name} failed: {task.failure_info}') self.output.structs = task.output else: self.output.structs = [input_st] if not self.input.do_hbond and not self.input.do_cleanup: return # Optionally run ProtAssign and/or Impref on each preprocessed CT: if self.input.do_hbond: task = OptimizeHBondTask(name='Optimize_H-bond_Assignments') task.input.setValue(self.input.hbond) self.output.structs = self._runTask(task, self.output.structs) if self.input.do_cleanup: task = CleanupTask(name='Clean_Up') task.input.setValue(self.input.cleanup) # Will run the task on each structure in self.output: self.output.structs = self._runTask(task, self.output.structs) # Impref is skipped if valence violations are found if task.valence_error: self.output.postprocess_valence_error = True
def _runTask(self, task, sts): """ Runs task using the structures in `sts` and returning output structures as a list. :param task: the task to run. This task should have a single structure as its output param. :type task: tasks.AbstractTask :param sts: List of input structures :type sts: list(structure.Structure) """ task.specifyTaskDir(self.getTaskDir()) output = [] for st in sts: task.input.struct = st task.start() task.wait() # OK: job if task.status == task.FAILED: raise tasks.TaskFailure( f'Task {task.name} failed: {task.failure_info}') output.append(task.output) return output
[docs]class PPBatchWorkflowInput(parameters.CompoundParam): struct_file: jobtasks.TaskFile settings: PPWorkflowSettings
[docs]class PPBatchWorkflowTask(LoggerMixin, jobtasks.ComboJobTask): """ Task for the auto PPW2 workflow, both single structure and batch. """ input: PPBatchWorkflowInput output: str # Output file path
[docs] class JobConfig(jobtasks.JobConfig): incorporation = jobtasks.IncorporationParam( jobtasks.IncorporationMode.APPEND, allowed_modes=list(jobtasks.IncorporationMode)) host_settings: jobtasks.HostSettings = jobtasks.HostSettings( allowed_host_types=jobtasks.AllowedHostTypes.CPU_ONLY, num_subjobs=4)
# TODO: default num_subjobs should depend on the host. Currently # panels have to hard-code a positive int here in order for config # dialog to show the #cpus option. DEFAULT_TASKDIR_SETTING = tasks.AUTO_TASKDIR # Add 100 to make sure this is the last preprocessor that is run:
[docs] @tasks.preprocessor(order=tasks.AFTER_TASKDIR + 100) def setFiles(self): self.addInputFile(self.input.struct_file) self.output = self.name + '-out.maegz' self.addOutputFile(self.output) self.incorporation_file = self.output
[docs] def backendMain(self): # Do a lazy import, to avoid circular import, as utils.py module # imports this tasks.py module. from prepwizard2_gui_dir import utils as pwutils # NOTE: TaskDJ will submit all jobs at same time, due to PANEL-18401 input_file = self.input.struct_file output_file = self.output dj = queue.TaskDJ(max_failures=queue.NOLIMIT) self.info('TaskDJ hosts: %s' % dj._hosts) taskdir = self.getTaskDir() subtasks = [] reader = structure.StructureReader(input_file) backend = jobcontrol.get_backend() for st_num, st in enumerate(reader, start=1): st_task = PPWorkflowTask() st_task.input.setValue(self.input.settings) st_task.input.struct = st # Use same task dir as parent job: st_task.specifyTaskDir(taskdir) st_task.name = f'{self.name}-{st_num}' # Make sure log file gets copied to the launch dir: # NOTE: Since structures and not file paths are saved to inputs and # outputs, we can't copy back any structure files. if backend: backend.addLogFile(st_task._getLogFilename()) dj.addTask(st_task, limit_cpus=True) subtasks.append(st_task) num_tasks = len(subtasks) self.info('Starting preparation. Number of subjobs: %i' % num_tasks) dj.run() self.info('Subjobs completed.') num_failed_tasks = 0 failed_titles = [] with structure.StructureWriter(output_file) as writer: for st_task in subtasks: if st_task.status != st_task.DONE: num_failed_tasks += 1 failed_titles.append(st_task.input.struct.title) continue prepared_sts = st_task.output.structs self.info('done with preparation') struct_text = inflect.engine().no("structure", len(prepared_sts)) self.info(f'Writing {struct_text} to file...') # Annotate the output structures: if st_task.output.postprocess_valence_error: annotation = BATCH_NOT_MINIMIZED else: annotation = BATCH_PREPARED for st in prepared_sts: annotate_entry_title(st, annotation) writer.append(st) if num_failed_tasks: self.warning('ERROR: Following structures failed to prepare:') for title in failed_titles: self.warning(' ' + title) self.warning( 'Total number of failed structures: %i' % num_failed_tasks) if num_failed_tasks == num_tasks: raise tasks.TaskFailure( "All batch preparations have failed; exiting...") self.info('Number of successful preparations: %i' % (num_tasks - num_failed_tasks)) # Each preparation can have multiple output structures self.info('Number of output structures: %i' % writer.written_count) self.info('Output file: %s' % output_file)
[docs]class GenerateStatesTask(LoggerMixin, tasks.ComboSubprocessTask): """ A task that will run epik to generate states """ input: GenerateStatesInput output: List[structure.Structure] ms: int = 1
[docs] @tasks.preprocessor(order=tasks.BEFORE_TASKDIR) def validateInput(self): if not self.input.struct: return False, 'No input structure defined'
[docs] def mainFunction(self): complex_st = self.input.struct self.info("Finding het groups (for Epik)...") types_to_process = self.input.getHetTypesToProcess() sts_to_run_epik_on, sts_to_not_run_epik_on = prepare.prepare_for_epik( complex_st, types_to_process) if not sts_to_run_epik_on and not sts_to_not_run_epik_on: # No hets present; return input complex without modifications. self.info('Epik was not run, as there are no preparable groups') self.output = [complex_st] return if sts_to_run_epik_on: # At least one het needs Epik, run Epik: epik_out_sts = self._runEpikJob(sts_to_run_epik_on) outsts = epik_out_sts + sts_to_not_run_epik_on else: self.info('Epik was not run, as there are no preparable groups') outsts = sts_to_not_run_epik_on # Generate output complexes, with best het states applied: self._applyStates(complex_st, outsts)
def _makeEpikCmd(self, infile, outfile): """ Create the command to run epik not under job control (-NO_REDIRECT) """ # Have Epik always generate at least MIN_EPIK_STATES per het, so that # we can choose best ones by more criteria than just the Epik state # penalty. If user requested more than this number of states per # protein, then we will generate more Epik states per het. self.ms = max(MIN_EPIK_STATES, self.input.max_states) cmd = [ prepare.epik_exe, '-ms', str(self.ms), '-ph', str(self.input.epik_pH), '-pht', str(self.input.epik_pHt), '-cg', '0.3', '0.3', '-imae', infile, '-omae', outfile, '-NO_REDIRECT', '-metal_binding' ] # yapf: disable return cmd def _runEpikJob(self, sts): """ Run and process the output for an Epik job. :param sts: the structures to run :type sts: List[structure.Structure] :return: the processed output structures from the Epik job :rtype: List[structure.Structure] """ in_file = self.getTaskFilename(self.name + '.maegz') out_file = self.name + '-out.maegz' self.info(" Using Epik with pH: %s" % self.input.epik_pH) self.info(" and pH Range: %s" % self.input.epik_pHt) self.info(" Max het states per protein: %s" % self.input.max_states) prepare.tag_st_het_num_prop(sts) structure.write_cts(sts, in_file) cmd = self._makeEpikCmd(in_file, out_file) ret = subprocess.call(cmd) if ret != 0: raise RuntimeError('Epik job failed') # Read Epik output file, and filter out metal binding states of hets # that are not within 5A of a metal: epik_out_sts = prepare.filter_undesired_states( self.input.struct, structure.StructureReader(out_file)) return epik_out_sts def _applyStates(self, st, out_sts): # Dict: key: hetnum, value: list of Structures: states_by_het = defaultdict(list) for state_st in out_sts: nhet = state_st.property['i_ppw_hetnum'] states_by_het[nhet].append(state_st) # Save original het states as JSON property: state_objs_by_het = {} for nhet, state_sts in states_by_het.items(): state_objs_by_het[nhet] = [ prepare.HetState.initFromEpikOutput(nhet, state_st) for state_st in state_sts ] prepare.serialize_het_states(state_objs_by_het, st) # List of complexes, and sum of het scores of all het states applied: applied_states = [(0.0, st)] # Apply states: # For each het (and Epik output associated with it): for nhet, state_sts in states_by_het.items(): # Generate states for phosphate and sulfate groups, because Epik # does not account 3D symmetry. TODO add support for more # symmetrical groups. state_sts = prepare.generate_phosphate_and_sulfur_states(state_sts) state_sts = prepare.generate_metal_and_ion_states(state_sts) applied_states = [(total_score + score, out_st) for total_score, in_complex_st in applied_states for score, out_st in prepare.apply_het_states( in_complex_st, state_sts, self.logger)] # Limit the number of complexes generated so far, as generating # all combintation can use up too much memory: applied_states.sort(key=lambda items: -items[0]) applied_states = applied_states[:self.input.max_states] output_sts = [st for _, st in applied_states] # Append an integer to each output entry title: if len(output_sts) > 1: for i, st in enumerate(output_sts, start=1): st.title += f'_{i}' self.output = output_sts
[docs]class ProteinRefinementInput(StructureInput): force_field: str rmsd: float = 0.3 fixheavy: bool = False
[docs]class ProteinRefinementTask(LoggerMixin, tasks.SubprocessCmdTask): """ A task to run the protein refinement (restrained minimization, i.e., impref) """ input: ProteinRefinementInput output: structure.Structure = None
[docs] @tasks.preprocessor(order=tasks.BEFORE_TASKDIR) def validateInput(self): if not self.input.struct: return False, 'No input structure defined'
[docs] @tasks.preprocessor(order=tasks.AFTER_TASKDIR) def prepareInput(self): self._in_file = self.getTaskFilename(self.name + '.maegz') self._out_file = self.name + '-out.maegz' self.input.struct.write(self._in_file) self.info(' RMSD: %s' % self.input.rmsd) self.info(' Force field: %s' % self.input.force_field)
[docs] def makeCmd(self): """ Create the command to run impref not under job control (-NOJOBID) """ cmd = [prepare.impref_exe, '-r', str(self.input.rmsd), '-f', self.input.force_field, self._in_file, '-op', self._out_file, '-NOJOBID' ] # yapf: disable if self.input.fixheavy: # minimize hydrogens only: self.info(' hydrogens only') cmd += ['-fix'] else: self.info(' all atoms') return cmd
[docs] @tasks.postprocessor def processOutput(self): out_file = self.getTaskFilename(self._out_file) if self.exit_code != 0 or not os.path.isfile(out_file): self.warning('ERROR: Impref exited with failure:') self.warning(self.stderr) return False # Exits the task with failure # Post processor runs in launch dir instead of task dir self.output = structure.Structure.read(out_file) # Some hydrogens that were previously overlapping, are likely # no longer overlapping and can now be hidden: hide_non_polar_hydrogens(self.output)
[docs]def update_params(to_params, from_params): """ Update the to_params with the values in from_params. :param to_params: the parameters to update :type to_params: parameters.CompoundParam :param from_params: the parameters to get the values from :type from_params: parameters.CompoundParam """ to_dict = to_params.toDict() from_dict = from_params.toDict() update_dict = {k: v for k, v in from_dict.items() if k in to_dict} to_params.setValue(update_dict)
# Note: annotation code at the end because it relies on Task classes SEPARATOR = ' - ' SIDE_CHAINS = '3-side-chains' DELETION_STEP_3 = '3-substructures-deleted' DELETION_INACTIVE = 'with-deletions' BATCH_PREPARED = 'prepared' BATCH_NOT_MINIMIZED = 'NOT_MINIMIZED' # Mapping between completed tasks and the title suffixes. TASK_ANNOTATION_MAP = { PreprocessTask: '2-preprocessed', OptimizeHBondTask: '4-hbond-opt', ProteinRefinementTask: '5-minimized', DeleteWatersTask: '5-removed waters', } # additional annotation(s) not from running actual Tasks PPWTASK_ANNOTATIONS = { SIDE_CHAINS, DELETION_STEP_3, DELETION_INACTIVE, BATCH_PREPARED, BATCH_NOT_MINIMIZED, } ANNOTATIONS = set(TASK_ANNOTATION_MAP.values()).union(PPWTASK_ANNOTATIONS) # NOTE: annotation due to substructure extraction is not on this list, as # we don't want those suffixes to be cleared in later steps of preparation.
[docs]def generate_annotation(task): """ Return the annotation text to use for a finished task. :param task: the task :type task: schrodinger.tasks.tasks.AbstractTask :return: the note to use :rtype: str """ return TASK_ANNOTATION_MAP.get(type(task), '')
[docs]def annotate_entry_title(st, annotation): """ Annotates the structure title by putting a note on the structure title, only if it is the `ANNOTATIONS`. Will replace the existing note at the end of the title, if one is found. If no exact match is found at the end, the new note is appended. :param st: the new structure which needs annotated title :type st: `structure.Structure` :param annotation: one of the known annotation strings :type annotation: str """ if annotation not in ANNOTATIONS: return title = st.title tokens = title.split(SEPARATOR) if tokens and tokens[-1] in ANNOTATIONS: suffix_len = len(SEPARATOR) + len(tokens[-1]) title = title[:-suffix_len] st.title = title + SEPARATOR + annotation