Source code for schrodinger.job.jobhandler

import functools
import os
import random
import string
from typing import Callable
from typing import List
from typing import Optional

import schrodinger
from schrodinger.infra import jobhub
from schrodinger.Qt import QtCore
from schrodinger.ui.qt.appframework2 import application
from schrodinger.utils import mmutil
from schrodinger.utils import qt_utils

from . import jobcontrol
from .download import download_job


class _AbstractJobHandler(QtCore.QObject):
    """
    Base class for job handlers.

    A handler is responsible for a single job: it prepares the launch command,
    listens to the job manager for updates about that job and re-emits them
    through its own signals.

    :ivar jobCompleted: Signal emitted when the job is completed and downloaded.
        Under JOB_SERVER, a job can be complete but not downloaded, but this
        signal will only be emitted when a job has finished downloading.
    :ivar jobDownloadFailed: Signal emitted with the job and an error message
        when `_onJobFinishedRunning` fails to download the job. `jobCompleted`
        is still emitted afterwards.
    :ivar jobProgressChanged: Signal emitted with the job, the current step,
        the total number of steps and a progress message whenever the job
        manager reports progress for this handler's job.
    """

    jobCompleted = QtCore.pyqtSignal(jobcontrol.Job)
    jobDownloadFailed = QtCore.pyqtSignal(jobcontrol.Job, str)
    jobProgressChanged = QtCore.pyqtSignal(jobcontrol.Job, int, int, str)

    def __init__(self,
                 cmd: List[str],
                 viewname: Optional[str] = None,
                 launch_dir: Optional[str] = None):
        """
        :param cmd: Launch command for the job. The list itself is not
            modified; the handler stores its own copy with `-VIEWNAME` (and,
            inside maestro, `-PROJ`) appended when they are missing.
        :param viewname: Viewname to give the job. If None, a random string of
            32 uppercase letters and digits is used.
        :param launch_dir: Directory to launch the job from. Defaults to the
            current working directory.
        """
        super().__init__()

        if viewname is None:
            # If no viewname is provided, we just give the job a random string
            # for a viewname
            viewname = "".join(
                random.choices(string.ascii_uppercase + string.digits, k=32))
        self._launch_dir = launch_dir if launch_dir else os.getcwd()
        self.viewname = viewname
        # Set by launchJob() (or by a subclass slot) once the job exists
        self.job = None
        self._setupWaitLoop()
        self._setupJobCmd(cmd)
        for signal, slot in self._getJobManagerSignalsAndSlots():
            signal.connect(slot)
        self._completed = False

    def _setupJobCmd(self, cmd_list: List[str]):
        """
        Construct a job command adding a viewname and project to a job command
        if they are not already present.

        The result is stored in `self._cmd_list`. The list passed in is left
        untouched.

        :param cmd_list: Launch command supplied by the caller.
        """
        # Work on a copy so the caller's list is never extended behind its back
        cmd_list = list(cmd_list)
        if "-VIEWNAME" not in cmd_list:
            cmd_list += ["-VIEWNAME", self.viewname]
        # Calling schrodinger.get_maestro() directly to reduce the risk of
        # a unittest leaking a maestro mock
        maestro = schrodinger.get_maestro()
        if maestro and "-PROJ" not in cmd_list:
            pt = maestro.project_table_get()
            prj_name = pt.project_name
            cmd_list += ["-PROJ", prj_name]
        self._cmd_list = cmd_list

    def _setupWaitLoop(self):
        """
        Create the local event loop used by `wait` and make `jobCompleted`
        quit it.
        """
        self._wait_loop = QtCore.QEventLoop()
        self.jobCompleted.connect(self._wait_loop.quit)

    def _getJobManagerSignalsAndSlots(self):
        """
        Return the job manager signals this handler listens to, paired with
        the slots that handle them.

        The pair used to detect the end of the job depends on the JOB_SERVER
        feature flag and on whether maestro is available.

        :return: List of (signal, slot) tuples.
        """
        jmgr = jobhub.get_job_manager()
        sas = [(jmgr.jobProgressChanged, self._onJobProgressChanged)]
        if not mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER):
            # with JOB_SERVER off, the job is downloaded when it's completed so
            # no special handling is needed
            done_signal = jmgr.jobCompleted
            done_slot = self._onJobDone
        # Calling schrodinger.get_maestro() directly to reduce the risk of
        # a unittest leaking a maestro mock
        elif schrodinger.get_maestro():
            # jobDownloaded is emitted when maestro downloads the job
            done_signal = jmgr.jobDownloaded
            done_slot = self._onJobDone
        else:
            done_signal = jmgr.jobCompleted
            # Outside of maestro, need special handling for completed job
            done_slot = self._onJobFinishedRunning
        sas.append((done_signal, done_slot))
        return sas

    # ==========================================================================
    # _AbstractJobHandler API
    # ==========================================================================

    def launchJob(self) -> jobcontrol.Job:
        """
        Subclasses define how to launch the job.
        """
        raise NotImplementedError

    def wait(self):
        """
        Block until the job is finished by running a local event loop.

        Returns immediately if the job already reports itself as complete.
        Otherwise the loop runs until `jobCompleted` is emitted.

        :raises RuntimeError: if the job has not been launched yet.
        """
        if self.job is None:
            raise RuntimeError("Can't wait until the job has been launched")
        if self.job.isComplete():
            return
        self._wait_loop.exec()

    # ==========================================================================
    # _AbstractJobHandler slots
    # ==========================================================================

    def _onJobFinishedRunning(self, job: jobcontrol.Job):
        """
        Download the results of this handler's job and then mark it as done.
        Only connected outside of maestro with JOB_SERVER enabled.

        Called when job finishes running. Under JOB_SERVER, a job completes
        when the (remote) calculations finish but before the results have been
        downloaded.

        A failed download is reported through `jobDownloadFailed`. The job is
        marked as done whether or not the download succeeded.

        :param job: The job reported by the job manager. Ignored unless it is
            this handler's job.
        """
        if self.job is None or self._completed:
            return
        if job.JobId == self.job.JobId:
            try:
                download_error = download_job(job.JobId)
            except Exception as exc:
                self.jobDownloadFailed.emit(job, str(exc))
            else:
                # A truthy return value is treated as an error message
                if download_error:
                    self.jobDownloadFailed.emit(job, download_error)
            finally:
                self._onJobDone(job)

    def _onJobDone(self, job: jobcontrol.Job):
        """
        Called after job is both finished running and downloaded

        :param job: The job reported by the job manager. Ignored unless it is
            this handler's job.
        """
        if self.job is None or self._completed:
            return
        if job.JobId == self.job.JobId:
            self.job = job
            self._completeJob()

    def _onJobProgressChanged(self, job: jobcontrol.Job, current_step: int,
                              total_steps: int, progress_msg: str):
        """
        Re-emit a progress update from the job manager if it concerns this
        handler's job, replacing `self.job` with the reported job object.

        :param job: The job whose progress changed.
        :param current_step: Current step reported by the job manager.
        :param total_steps: Total number of steps reported by the job manager.
        :param progress_msg: Progress message reported by the job manager.
        """
        if self._completed:
            return
        # Compare on JobId, like the other slots of this class
        if (not self.job or job.JobId != self.job.JobId):
            return
        self.job = job
        self.jobProgressChanged.emit(self.job, current_step, total_steps,
                                     progress_msg)

    # ==========================================================================
    # _AbstractJobHandler implementation methods
    # ==========================================================================

    def _completeJob(self):
        """
        Mark the current job as completed (i.e. downloaded).

        Emits `jobCompleted` and then disconnects from the job manager. Does
        nothing if the job was already marked as completed.
        """
        if self._completed:
            return
        # Set the flag first so that slots triggered by the emit below cannot
        # complete the job a second time
        self._completed = True
        self.jobCompleted.emit(self.job)
        for signal, slot in self._getJobManagerSignalsAndSlots():
            signal.disconnect(slot)

    def __del__(self):
        """
        Quit the wait loop, if one was created, when the handler is destroyed.
        """
        if hasattr(self, "_wait_loop"):
            self._wait_loop.quit()


class JobHandler(_AbstractJobHandler):
    """
    A job handler that launches a job and lets the caller wait on it.

    Initialize it with the list of strings that you would otherwise pass to
    JobViewFilter.launchJob, then connect `my_jobhandler.jobCompleted` to
    every slot that has to run once the job is finished.

    The handler also offers a `wait` method that pauses execution of the
    current event until the job is finished. Other ui events continue to be
    processed during the wait.
    """

    # ==========================================================================
    # JobHandler API
    # ==========================================================================

    def launchJob(self) -> jobcontrol.Job:
        """
        Launch the job. An event loop is executed while job is being launched.

        NOTE: unlike jobcontrol.launch_job(), no dialog is shown on failure,
        so calling code is responsible for informing the user of the failure.

        :return: Job object for the started job.
        :rtype: jobcontrol.Job

        :raises RuntimeError: if this handler has already launched a job.
        :raises JobLaunchFailure: if the job failed to start.
        """
        already_launched = self.job is not None
        if already_launched:
            raise RuntimeError("Job has already been launched")

        started_job = jobcontrol.launch_job(self._cmd_list,
                                            launch_dir=self._launch_dir,
                                            print_output=True)
        self.job = started_job
        return started_job
class AsyncJobHandler(_AbstractJobHandler):
    """
    A jobhandler that launches jobs asynchronously (i.e. launchJob doesn't wait
    for the job to actually start before returning).

    :ivar jobStarted: Signal emitted with the job once the job launcher
        reports that it has started.
    :ivar jobLaunchFailed: Signal emitted with a `jobcontrol.JobLaunchFailure`
        when the job launcher reports a launch error.
    """

    jobStarted = QtCore.pyqtSignal(jobcontrol.Job)
    jobLaunchFailed = QtCore.pyqtSignal(Exception)

    def __init__(self, *args, **kwargs):
        """
        See _AbstractJobHandler for arguments.
        """
        super().__init__(*args, **kwargs)
        # Error reported by the most recent failed launch, None otherwise
        self.err_message = None

    # ==========================================================================
    # AsyncJobHandler API
    # ==========================================================================

    def launchJob(self) -> None:
        """
        Start launching the job and return without waiting for it to start.

        Nothing is returned. The outcome of the launch is reported through
        signals: `jobStarted` carries the started job (also stored in
        `self.job`), and `jobLaunchFailed` carries a
        `jobcontrol.JobLaunchFailure` (its message is also stored in
        `self.err_message`). No dialog is shown on failure, so calling code is
        responsible for informing the user.

        :raises RuntimeError: if a job has already been started by this
            handler.
        """
        if self.job is not None:
            raise RuntimeError("Job has already been launched")
        self._launchJob(self._cmd_list, self._launch_dir)

    # ==========================================================================
    # AsyncJobHandler slots
    # ==========================================================================

    def _onJobStarted(self, launched_job: jobcontrol.Job):
        """
        Store the started job and announce it through `jobStarted`.

        If the job already reports itself as complete, it is marked as
        completed before `jobStarted` is emitted.

        :param launched_job: Job reported by the job launcher.
        :raises JobLaunchFailure: if the launcher reported no job.
        """
        job = launched_job
        if not job:
            raise jobcontrol.JobLaunchFailure(
                'Launch failed (event loop killed)')
        self.job = job
        self.err_message = None
        if self.job.isComplete():
            self._completeJob()
        self.jobStarted.emit(job)

    def _onJobLaunchFailed(self, launch_err):
        """
        Record a launch error and announce it through `jobLaunchFailed`.

        :param launch_err: Error reported by the job launcher (presumably a
            message string - confirm against jobhub.JobLauncher). Nothing is
            emitted when it is empty.
        """
        err_message = launch_err
        # Update the handler's state before emitting, as _onJobStarted does,
        # so that connected slots can already read err_message
        self.err_message = err_message
        if err_message:
            self.jobLaunchFailed.emit(jobcontrol.JobLaunchFailure(err_message))

    # ==========================================================================
    # AsyncJobHandler implementation methods
    # ==========================================================================

    def _launchJob(self, job_cmd: List[str], launch_dir: str):
        """
        Launch a job asynchronously. Returns before command is started.

        :param job_cmd: launch command for the job.
        :param launch_dir: current working directory on job launch
        """
        viewname = ""  # this parameter is unused MAE-45178
        job_launcher = jobhub.JobLauncher(
            jobhub.JobCommand(job_cmd, viewname, launch_dir))
        job_launcher.jobStarted.connect(self._onJobStarted)
        job_launcher.jobLaunchFailed.connect(self._onJobLaunchFailed)
        # Keep the launcher on the handler for as long as the handler lives
        self.job_launcher = job_launcher
        job_launcher.launch()
        # Confirm that neither signal was emitted yet
        assert self.job is None
        assert self.err_message is None
def job_incorporated(job_id: str, first_entry_id: int, last_entry_id: int):
    """
    Forward a successful maestro job incorporation to the callback registered
    for the job's viewname.

    Maestro calls this function only when the job output is incorporated
    through maestro job incorporation. If an individual panel has registered
    its own incorporation handler via
    maestro.job_incorporation_function_add(), and that panel is currently open
    and has handled the incorporation, Maestro does not call this function.

    Nothing happens when the job record is missing, when the job has no
    viewname, or when no callback is registered for the viewname.

    :param job_id: The id of the incorporated job

    :param first_entry_id: The id of the first entry imported in the project
        from the output structure file associated with the given job.

    :param last_entry_id: The id of the last entry imported in the project
        from the output structure file associated with the given job.
    """
    try:
        incorporated_job = jobcontrol.Job(job_id)
    except RuntimeError:
        # If job record is missing
        return

    view = incorporated_job.Viewname
    if view:
        from schrodinger.maestro import maestro_job_callbacks
        registry = maestro_job_callbacks.maestro_job_incorporated_callbacks
        incorporation_callback = registry.get(view)
        if incorporation_callback is not None:
            incorporation_callback(job_id, first_entry_id, last_entry_id)
# =============================================================================== # Convenience functions # =============================================================================== _active_handlers = []
def launch_job_with_callback(
        cmd: List[str],
        callback: Callable[[jobcontrol.Job], None],
        launch_dir: Optional[str] = None) -> jobcontrol.Job:
    """
    Launch the given job, and call the specified callback when the job
    completes (either successfully or with a failure).

    The handler created for the job is kept in `_active_handlers` until the
    job completes. If the launch itself fails, the handler is discarded again
    and the callback is never called.

    :param cmd: Command list
    :param callback: Function to call when the job completes (either
        successfully or with a failure), with Job object as the only parameter.
    :param launch_dir: Directory to launch job under

    :return: Job object for the started job.

    :raises RuntimeError: if the job fails to start.
    """
    jhandler = JobHandler(cmd, launch_dir=launch_dir)
    _active_handlers.append(jhandler)

    def _forget_handler():
        # Drop the module-level reference to the handler, if still present
        if jhandler in _active_handlers:
            _active_handlers.remove(jhandler)

    def job_completed(job: jobcontrol.Job):
        _forget_handler()
        callback(jhandler.job)

    jhandler.jobCompleted.connect(job_completed)
    try:
        jhandler.launchJob()
    except Exception:
        # The handler has no job, so jobCompleted will never be emitted for
        # it; without this it would stay in _active_handlers forever
        _forget_handler()
        raise
    return jhandler.job