Source code for schrodinger.protein.tasks.clustal

# Runs ClustalW alignment jobs

import glob
import os
import shutil
import tempfile

from schrodinger.application.msv import seqio
from schrodinger.Qt import QtCore
from schrodinger.utils import subprocess


def get_clustal_path():
    """
    Returns a path to ClustalW executable.

    This function attempts to find clustalw2 excutable in following locations:

    1) Maestro bin directory based on MAESTRO_EXEC env var.
    2) Maestro bin directory based on $SCHRODINGER/maestro-v*/bin/* path.
    3) User-defined location (CLUSTALW2 env var).

    :rtype: str
    :return: path to ClustalW executable file, or None if the executable could
        not be located.
    """
    clustal_exec = None

    maestro_path = os.getenv("MAESTRO_EXEC")
    if maestro_path:
        clustal_exec = os.path.join(maestro_path, "clustalw2")
        if os.name == "nt":
            clustal_exec += ".exe"

    if not clustal_exec or not os.path.isfile(clustal_exec):
        schrodinger = os.getenv("SCHRODINGER")
        pattern = os.path.join(schrodinger, "maestro-v*", "bin", "*",
                               "clustalw2*")
        results = glob.glob(pattern)
        if results:
            clustal_exec = results[0]

    if not clustal_exec:
        clustal_exec = os.getenv("CLUSTALW2")

    return clustal_exec


class AbstractAlignmentJob(QtCore.QObject):
    """
    Abstract class for defining common alignment job behavior

    :cvar: progressMade: A signal emitted with the number of lines output by
        the clustal job.
    :vartype progressMade: QtCore.pyqtSignal
    """
    progressMade = QtCore.pyqtSignal(int)

    def __init__(self, aln, second_aln=None):
        """
        :param aln: Input sequence alignment.
        :type aln: ProteinAlignment

        :param second_aln: Second alignment for profile-profile and profile-sequence
            alignment.
        :type second_aln: ProteinAlignment
        """
        super().__init__()
        self.aln = aln
        self.second_aln = second_aln
        self.gapopen = None
        self.gapext = None
        self.proc = None
        self.canceled = False
        # Estimate progress as the number of output lines from clustal.
        # Use a little more than the number of possible pairwise alignments
        # as a guess for the maximum.
        self.maximum_progress = (len(aln)**2) / 1.9

    def setGapPenalties(self, gapopen, gapext):
        self.gapopen = gapopen
        self.gapext = gapext

    def run(self):
        """
        Should be implemented by subclasses.
        """
        return NotImplementedError

    def _run(self, command, output_fname=None):
        """
        Runs the specified command and emits progress based on output length

        :param command: Command to be run
        :type command: list(str)

        :param output_fname: Output log filename
        :type output_fname: str
        """
        p = subprocess.Popen(
            command, stdout=subprocess.PIPE, universal_newlines=True)
        self.proc = p
        output = []
        with p.stdout:
            while p.poll() is None and not self.canceled:
                line = p.stdout.readline()
                output.append(line)
                self.progressMade.emit(len(output))
            if self.canceled:
                return
            if output_fname:
                output.append(p.stdout.read())  # Read any remaining stdout
                with open(output_fname, 'w') as outfile:
                    outfile.write(''.join(output))

    def cancel(self):
        if self.proc and self.proc.poll() is None:
            self.proc.kill()
        self.canceled = True


class ClustalJob(AbstractAlignmentJob):
    """
    Class to run a clustal job:
    """

    def __init__(self,
                 aln,
                 second_aln=None,
                 profile_mode='profile',
                 matrix=None,
                 gapopen=None,
                 gapext=None,
                 quicktree=True,
                 output_fname=None,
                 clustering=None):
        """
        This class can use one of three available alignment modes:
            - regular multiple sequence alignment,
            - profile-profile alignment where two alignments are aligned to each
              other, but both alignments remain unchanged,
            - profile-sequence alignment where several sequences are iteratively
              aligned to existing alignment.

        :param aln: Input sequence alignment.
        :type aln: ProteinAlignment

        :param second_aln: Second alignment for profile-profile and profile-sequence
            alignment.
        :type second_aln: ProteinAlignment

        :param profile_mode: Determines profile alignment mode. Can be "profile" for
            profile-profile alignment, or "sequences" for profile-sequence alignment.
        :type profile_mode: str

        :param matrix: substitution matrix family ("BLOSUM", "PAM", "GONNET", "ID")
            If None, default matrix (GONNET) is used.
        :type matrix: str or None

        :param gapopen: Gap opening penalty. If None, default value is used.
        :type gapopen: float or None

        :param gapext: Gap extension penalty. If None, default value is used.
        :type gapext: float or None

        :param quicktree: Use fast algorithm for building guide tree.
        :type quicktree: bool

        :param output_fname: Path of file to save clustalw2 std output to. If None,
            output is not saved.
        :type output_fname: str or None

        """
        super().__init__(aln, second_aln)
        self.profile_mode = profile_mode
        self.matrix = matrix
        self.setGapPenalties(gapopen, gapext)
        self.quicktree = quicktree
        self.name_seq_mapping = {}
        self.dnd_string = ''
        self.output_fname = output_fname
        self.clustering = clustering

    def run(self):
        """
        Run the clustal job.

        :raises: RuntimeError if no clustal executable can be found

        :return: Output alignment. The sequences are output in the
            same order as input. Sequence attributes are preserved. The tree is in
            Newick format. This function returns None if the job is
            canceled.
        :rtype: `ProteinAlignment` or None
        """
        clustalw_exe = get_clustal_path()
        if not clustalw_exe:
            raise RuntimeError("Could not find Clustalw2 executable!")

        aln = self.aln
        tmpdir = tempfile.mkdtemp()
        inp_fname = os.path.join(tmpdir, 'input.aln')
        inp_fname2 = None
        out_fname = os.path.join(tmpdir, 'output.aln')

        # write the alignment using unique names
        self.name_seq_mapping = seqio.ClustalAlignmentWriter.write(
            aln, inp_fname, use_unique_names=True)

        if self.second_aln and self.profile_mode in ["profile", "sequences"]:
            # create second input file
            inp_fname2 = os.path.join(tmpdir, 'input2.aln')
            # write the alignment using unique names
            seqio.ClustalAlignmentWriter.write(
                self.second_aln, inp_fname2, use_unique_names=True)

            dnd_fname = os.path.join(tmpdir, 'input2.dnd')
        else:
            dnd_fname = os.path.join(tmpdir, 'input.dnd')

        command = self._makeCommand(
            clustalw_exe=clustalw_exe,
            inp_fname=inp_fname,
            out_fname=out_fname,
            inp_fname2=inp_fname2)

        self._run(command, output_fname=self.output_fname)
        if self.canceled:
            return None
        # read the aligned sequences
        new_alignment = seqio.ClustalAlignmentReader.read(out_fname)

        # read tree file
        with open(dnd_fname, "r") as f:
            self.dnd_string = f.read()

        # clean temporary files
        shutil.rmtree(tmpdir)

        return new_alignment

    def _makeCommand(self, clustalw_exe, inp_fname, out_fname, inp_fname2=None):
        command = [clustalw_exe, "-outfile=" + out_fname, "-outorder=input"]

        if self.quicktree:
            command.append("-quicktree")
        if self.matrix:
            command.append("-matrix=" + self.matrix)
        if self.gapopen:
            command.append("-gapopen=" + str(self.gapopen))
        if self.gapext:
            command.append("-gapext=" + str(self.gapext))

        if inp_fname2 is None:
            command.append("-infile=" + inp_fname)
        else:
            # Perform profile alignment.
            command.append("-" + self.profile_mode)
            command.append("-profile1=" + inp_fname)
            command.append("-profile2=" + inp_fname2)

        if self.clustering:
            command.append(f'-clustering={self.clustering}')
        return command