Source code for schrodinger.tasks.queue

import enum
import functools

from schrodinger.job import queue
from schrodinger.job import jobcontrol
from schrodinger.job.queue import NOLIMIT
from schrodinger.Qt import QtCore
from schrodinger.tasks import tasks
from schrodinger.tasks import jobtasks
from schrodinger.utils import scollections
from schrodinger.utils import qt_utils

#===============================================================================
# Task Queue
#===============================================================================


class TaskQueue(tasks.SignalTask):
    """
    A task that runs a queue of tasks. The TaskQueue is done when all its
    added tasks have completed, regardless of whether they completed
    successfully or failed.

    To use, add tasks with addTask and then start the task queue.
    """
    queuedTaskFinished = QtCore.pyqtSignal(object)
    queueDone = QtCore.pyqtSignal()
    max_running_tasks: int = 4

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.taskDone.connect(self.queueDone)
        self._running_tasks = scollections.IdSet()
        self._waiting_tasks = []
        self._tasks = []
        self._stop_on_empty_queue = True
        self._done_adding = False

    def addTask(self, task):
        self._tasks.append(task)
        self._waiting_tasks.append(task)
        if self.isRunning():
            self._startMoreTasks()

    def getTasks(self):
        """
        Return all tasks in the queue.

        :rtype: tuple[tasks.AbstractTask]
        """
        return tuple(self._tasks)

    def setUpMain(self):
        self._running_tasks = scollections.IdSet()
        self._waiting_tasks = list(self._tasks)
        self._startMoreTasks()

    @tasks.SignalTask.guard_method
    def _onTaskFinished(self):
        task = self.sender()
        self._running_tasks.remove(task)
        self.queuedTaskFinished.emit(task)
        self._startMoreTasks()

    @tasks.SignalTask.guard_method
    def _startMoreTasks(self):
        if not self._running_tasks and not self._waiting_tasks:
            if self._stop_on_empty_queue:
                self.mainDone.emit()
        while len(self._running_tasks) < self.max_running_tasks:
            try:
                task = self._waiting_tasks.pop(0)
            except IndexError:
                break
            task.taskDone.connect(self._onTaskFinished)
            task.taskFailed.connect(self._onTaskFinished)
            self._running_tasks.add(task)
            QtCore.QTimer.singleShot(0, task.start)
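
# Illustrative usage sketch (not part of the original module). The
# example_tasks argument is assumed to be a list of already-configured
# tasks.AbstractTask instances, and a Qt application event loop is assumed
# to be running. The queue starts up to max_running_tasks of them at a time
# and emits queueDone once every task has finished, successfully or not.
def _example_task_queue_usage(example_tasks):
    task_queue = TaskQueue()
    task_queue.max_running_tasks = 2  # throttle concurrency for this queue
    for task in example_tasks:
        task_queue.addTask(task)
    task_queue.queuedTaskFinished.connect(
        lambda task: print(f'{task.name} finished as {task.status}'))
    task_queue.queueDone.connect(lambda: print('queue done'))
    task_queue.start()
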

#===============================================================================
# TaskDJ
#===============================================================================


class _TaskJob(queue.BaseJob):
    TASK_JOB_STATUS_MAP = {
        tasks.Status.WAITING: queue.JobState.WAITING,
        tasks.Status.RUNNING: queue.JobState.ACTIVE,
        tasks.Status.DONE: queue.JobState.DONE,
        tasks.Status.FAILED: queue.JobState.FAILED_RETRYABLE
    }

    def __init__(self, task):
        super(_TaskJob, self).__init__()
        self._task = task
        self.name = task.name

    def doCommand(self):
        self._task.start()

    def update(self):
        self.state = self.TASK_JOB_STATUS_MAP[self._task.status]

    def retryFailure(self, max_retries=0):
        """
        Called by JobDJ to determine whether we should retry this failed
        task.

        :param max_retries: the max number of retries allowed by the TaskDJ

        :return: whether to retry
        """
        return self.num_failures <= max_retries

class TaskDJ(queue.JobDJ):
    """
    A subclass of JobDJ that supports running tasks.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._updated_jobs = []
        self._event_loop = QtCore.QEventLoop()
        self._job_dj_complete = False
        self._completed_tasks = scollections.IdSet()

    def addTask(self, task, limit_cpus=False):
        """
        For 2021-1 release only:

        limit_cpus: If False, TaskDJ will submit the task as soon as it
            starts running. If True (and the task is a job task), it will
            be submitted only as CPUs become available.
        """
        task_job = _TaskJob(task)
        if limit_cpus and jobtasks.is_jobtask(task):
            # Setting host ensures that TaskDJ doesn't use more than the
            # requested number of CPUs. This fixes PPW2 issue PPREP-1674.
            task_job.host = task.job_config.host_settings.host.name
        self.addJob(task_job)

    def _startJobDJ(self):

        @qt_utils.exit_event_loop_on_exception
        def callback(job, *, event_loop=None):
            self._updated_jobs.append(job)
            event_loop.quit()

        callback = functools.partial(callback, event_loop=self._event_loop)

        @qt_utils.exit_event_loop_on_exception
        def run_jobdj(*, event_loop=None):
            try:
                self.run(status_change_callback=callback)
            finally:
                self._job_dj_complete = True
                event_loop.quit()

        run_jobdj = functools.partial(run_jobdj, event_loop=self._event_loop)
        self._job_dj_complete = False
        QtCore.QTimer.singleShot(0, run_jobdj)

    def updatedTasks(self):
        self._startJobDJ()
        while not self._job_dj_complete:
            yield from self._processUpdatedTasks()
            self._event_loop.exec()
        yield from self._processUpdatedTasks()
        exc = qt_utils.get_last_exception()
        if exc:
            raise exc

    def _processUpdatedTasks(self):
        """
        Yield the tasks from the jobs in `_updated_jobs`. Each task is
        yielded once when it starts and once when it completes. Calling
        this method empties out `_updated_jobs`.
        """
        for _ in range(len(self._updated_jobs)):
            job = self._updated_jobs.pop()
            if job._task.status is job._task.DONE:
                # Only yield a task if it hasn't already been yielded as
                # DONE before
                if job._task in self._completed_tasks:
                    continue
                self._completed_tasks.add(job._task)
            yield job._task
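
# Illustrative usage sketch (not part of the original module). The
# example_tasks argument is assumed to be a list of already-configured task
# instances, and a Qt application is assumed to exist since TaskDJ spins a
# QEventLoop internally. updatedTasks() drives the TaskDJ and yields each
# task as it starts and again as it completes, so progress can be reported
# while the queue runs.
def _example_taskdj_usage(example_tasks):
    dj = TaskDJ()
    for task in example_tasks:
        dj.addTask(task)
    for task in dj.updatedTasks():
        print(f'{task.name}: {task.status}')
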

#===============================================================================
# Utility functions
#===============================================================================


class AutoFileMode(enum.IntEnum):
    FAILED_LOG = enum.auto()  # Only the log file from failed tasks
    FAILED_ALL = enum.auto()  # The entire taskdir for failed tasks
    NONE = enum.auto()  # No files
    LOG = enum.auto()  # Only the log file from all tasks
    ALL = enum.auto()  # The entire taskdir for all tasks

def run_tasks_on_dj(task_list,
                    dj=None,
                    autoname=True,
                    basename=None,
                    auto_file_mode=AutoFileMode.FAILED_LOG):
    """
    This function provides a convenient way of launching multiple tasks in
    parallel while taking care of boilerplate and preventing common
    mistakes. It will typically be used when launching subjobs from a
    driver job.

    By default, this function will:

    - Give each task a unique name
    - Give each task its own directory
    - Create a new TaskDJ
    - Add all tasks to the TaskDJ
    - Run the TaskDJ
    - Register the log files of any failed tasks with jobserver

    :param task_list: the tasks to run
    :type task_list: list of tasks.AbstractTask

    :param dj: a TaskDJ instance to use instead of creating a new one. Use
        this option to provide a TaskDJ with non-standard options
    :type dj: TaskDJ

    :param autoname: whether to automatically give each task a unique name
    :type autoname: bool

    :param basename: basename to be used in autonaming each task. Has no
        effect if autoname is set to False
    :type basename: str

    :param auto_file_mode: which subtask files to register with the parent
        job to be copied back by jobcontrol. Only has an effect if this
        function is called from inside a jobcontrol backend.
    :type auto_file_mode: AutoFileMode

    :return: the list of tasks that completed successfully. This return
        value is useful for downstream processing, so the caller doesn't
        need to filter out failed tasks
    :rtype: list of tasks.AbstractTask
    """
    if task_list == []:
        return []
    dj = setup_dj_tasks(task_list, dj=dj, autoname=autoname, basename=basename)
    dj.run()
    succeeded_tasks = []
    for task in task_list:
        if task.status is task.DONE:
            succeeded_tasks.append(task)
    auto_register_files(task_list, auto_file_mode=auto_file_mode)
    return succeeded_tasks

def setup_dj_tasks(task_list, dj=None, autoname=True, basename=None):
    """
    Sets up a TaskDJ with a collection of tasks. See run_tasks_on_dj for
    details.
    """
    if autoname:
        autoname_tasks(task_list, basename=basename)
    if dj is None:
        dj = TaskDJ(max_failures=NOLIMIT)
    for task in task_list:
        task.specifyTaskDir(tasks.AUTO_TASKDIR)
        dj.addTask(task)
    return dj

def autoname_tasks(task_list, basename=None):
    """
    Sets unique names on all the tasks provided with an incrementing
    integer suffix. See run_tasks_on_dj for details.
    """
    if basename is None:
        basename = task_list[0].name
    for i, task in enumerate(task_list):
        task.name = f'{basename}_{i}'

def auto_register_files(task_list, auto_file_mode=AutoFileMode.FAILED_LOG):
    """
    Automatically registers files from the subtasks with the parent job
    (if applicable).
    """
    backend = jobcontrol.get_backend()
    if backend is not None and auto_file_mode is not AutoFileMode.NONE:
        for task in task_list:
            logfile = task.getTaskFilename(f'{task.name}.log')
            taskdir = task.getTaskDir()
            if auto_file_mode is AutoFileMode.LOG:
                backend.addOutputFile(logfile)
            elif auto_file_mode is AutoFileMode.ALL:
                backend.addOutputFile(taskdir)
            if task.status is task.FAILED:
                if auto_file_mode is AutoFileMode.FAILED_LOG:
                    backend.addOutputFile(logfile)
                if auto_file_mode is AutoFileMode.FAILED_ALL:
                    backend.addOutputFile(taskdir)
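
# Illustrative usage sketch (not part of the original module): launching
# subjobs from a driver with run_tasks_on_dj. The example_tasks argument is
# assumed to be a list of already-configured task instances. With the
# default auto_file_mode of AutoFileMode.FAILED_LOG, the log file of each
# failed task is registered with the parent job for copy-back.
def _example_run_tasks_on_dj(example_tasks):
    succeeded = run_tasks_on_dj(example_tasks, basename='subjob')
    print(f'{len(succeeded)} of {len(example_tasks)} tasks succeeded')
    return succeeded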