# Source code for schrodinger.tasks.taskmanager

import os

from schrodinger.models import mappers
from schrodinger.models import parameters
from schrodinger.Qt import QtCore
from schrodinger.tasks import tasks
from schrodinger.ui.qt.appframework2 import jobnames
from schrodinger.utils import qt_utils
from schrodinger.utils import scollections


class _TaskNamer(parameters.CompoundParam):
    """
    Naming component used by TaskManager.

    Holds the `base_name` from which task names are derived and publishes
    the currently proposed task name in `name`. Names are constructed from
    the base name so that standard names automatically increment. The
    proposed name is recomputed whenever `base_name` or `taken_names`
    changes, or when `updateName` is called explicitly.
    """
    base_name = parameters.StringParam()
    name = parameters.StringParam()
    taken_names = parameters.ListParam()

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Default to naming against `taken_names` rather than the CWD.
        self.enableDirBasedNaming(False)
        # Any change to the naming inputs refreshes the proposed name.
        for changed_signal in (self.base_nameChanged,
                               self.taken_namesChanged):
            changed_signal.connect(self.updateName)

    def enableDirBasedNaming(self, name_by_dir):
        """
        Choose the source of already-used names.

        :param name_by_dir: if truthy, the entries of the current working
            directory are treated as taken names; otherwise
            `self.taken_names` is used. The current `name` is not
            recomputed by this call.
        """
        self._name_by_dir = name_by_dir

    def updateName(self):
        """
        Recompute `self.name` from the base name and the taken names.
        """
        self.name = self._getUniqueName()

    def _getUniqueName(self):
        """
        Return the name produced by `jobnames.update_jobname` for the
        current base name, given the currently taken names.
        """
        base = self.base_name
        return jobnames.update_jobname(
            base, base, name_list=self._getTakenNames())

    def _getTakenNames(self):
        """
        Return the collection of names considered already in use.

        Kept as a separate method so that subclasses can customize where
        taken names come from. With directory-based naming enabled, the
        taken names are the entries of the current working directory.
        """
        if not self._name_by_dir:
            return self.taken_names
        return os.listdir('.')


@qt_utils.add_enums_as_attributes(tasks.Status)
class TaskManager(mappers.TargetMixin, parameters.CompoundParam):
    """
    Creates, names and tracks tasks of a single task class.

    The manager always holds one "next" task (see `nextTask`). The first
    time a connected task reports a status other than WAITING, it is
    appended to `task_list`, its name is recorded as taken by the namer,
    and a fresh next task is loaded to replace it. `status` is an
    aggregate over the statuses of all tasks in `task_list`.

    Signals:

    - `taskStarted(task)`: a task was seen leaving WAITING for the first
      time.
    - `taskEnded(task)`: a task reported one of `tasks.FINISHED_STATUSES`.
    - `newTaskLoaded(task)`: a new next task was installed.
    - `taskStatusChanged(task, status)`: any connected task changed status.
    """
    # Aggregate status over `task_list`; see `_getAggregateStatus`.
    status = parameters.EnumParam(tasks.Status, tasks.Status.WAITING)
    # Component that proposes the name for the next task.
    namer = _TaskNamer()
    taskStarted = QtCore.pyqtSignal(tasks.AbstractTask)
    taskEnded = QtCore.pyqtSignal(tasks.AbstractTask)
    newTaskLoaded = QtCore.pyqtSignal(tasks.AbstractTask)
    # Tasks that have been started through this manager, in start order.
    task_list = parameters.ParamListParam(tasks.AbstractTask)
    taskStatusChanged = QtCore.pyqtSignal(tasks.AbstractTask, tasks.Status)
    NAME_NOT_UNIQUE_MSG = 'Task name is not unique.'
    # The class instantiated (with no arguments) to create the first task.
    TaskClass = parameters.Param()

    def __init__(self, TaskClass, directory_management=False, **kwargs):
        """
        :param TaskClass: the task class managed by this instance. It is
            called with no arguments to create the first next task, and its
            `__name__` is used as the standard base name.
        :param directory_management: if truthy, task names are chosen
            against the contents of the current working directory and each
            connected task is told to use `tasks.AUTO_TASKDIR`.
        :param kwargs: passed through to the base class initializer.
        """
        super().__init__(TaskClass=TaskClass, **kwargs)
        self._next_task = None
        self._directory_management = directory_management
        if directory_management:
            self.namer.enableDirBasedNaming(True)
        self.setStandardBaseName(self.TaskClass.__name__)
        self.resetBaseName()
        # Identity-based set of tasks already reported as started.
        self._started_tasks = scollections.IdSet()
        self.loadNewTask()

    def resetBaseName(self):
        """
        Restore the namer's base name to the standard base name.
        """
        self.namer.base_name = self._standard_base_name

    def setStandardBaseName(self, new_base_name):
        """
        Set the standard base name for tasks. This is the base name that
        will be used whenever `resetBaseName` is called. The namer's base
        name is reset to the new value immediately.
        """
        self._standard_base_name = new_base_name
        self.resetBaseName()

    def setCustomBaseName(self, new_base_name):
        """
        Set a custom base name to use for tasks. The standard base name is
        left untouched, so `resetBaseName` undoes this.
        """
        self.namer.base_name = new_base_name

    def uniquifiyTaskName(self):
        """
        Update the next task's name to a unique name. A unique name is
        defined as a name that hasn't been previously used by this
        taskmanager and one that doesn't already have a directory with the
        same name in the current directory.

        The (misspelled) method name is kept for backward compatibility.
        """
        self.namer.updateName()
        self.nextTask().name = self.namer.name

    def wait(self, timeout=None):
        """
        Delegate to the module-level `tasks._wait` for this manager.

        :param timeout: forwarded unchanged to `tasks._wait`.
        :return: whatever `tasks._wait` returns.
        """
        return tasks._wait(self, timeout=timeout)

    def _onTaskStatusChanged(self, status):
        """
        Slot for the `statusChanged` signal of every connected task.

        Re-emits the change as `taskStatusChanged`, emits `taskEnded` for
        finished statuses, performs first-start bookkeeping and refreshes
        the aggregate `status`.

        :param status: the new status reported by the emitting task.
        """
        # The emitting task is recovered from Qt rather than passed in.
        task = self.sender()
        self.taskStatusChanged.emit(task, status)
        if status in tasks.FINISHED_STATUSES:
            self.taskEnded.emit(task)
        if task not in self._started_tasks and status != tasks.Status.WAITING:
            # First non-WAITING status seen for this task: record it as
            # started, reserve its name, and prepare a replacement task.
            self._started_tasks.add(task)
            self.task_list.append(task)
            self.taskStarted.emit(task)
            self.namer.taken_names.append(task.name)
            self.loadNewTask()
        self.status = self._getAggregateStatus()

    def removeTask(self, task):
        """
        Remove `task` from `task_list` and stop listening to its status
        changes.
        """
        self.task_list.remove(task)
        task.statusChanged.disconnect(self._onTaskStatusChanged)

    def _getAggregateStatus(self):
        """
        Compute a single status summarizing all tasks in `task_list`.

        WAITING if every task is waiting (including when there are no
        tasks); otherwise the first of RUNNING, FAILED, DONE that any task
        has.

        :raises AssertionError: if none of the above applies.
        """
        #TODO: consider moving status logic into a component class
        statuses = [task.status for task in self.task_list]
        if all(status is tasks.Status.WAITING for status in statuses):
            return tasks.Status.WAITING
        # Precedence order matters: a single running task wins over any
        # failed task, which in turn wins over any finished task.
        precedence = (tasks.Status.RUNNING, tasks.Status.FAILED,
                      tasks.Status.DONE)
        for candidate in precedence:
            if any(status is candidate for status in statuses):
                return candidate
        # Raised explicitly (rather than `assert False`) so the invariant
        # is still enforced when Python runs with assertions disabled.
        raise AssertionError('This shouldnt be reachable: %r' % (statuses,))

    def loadNewTask(self):
        """
        Install a fresh next task and announce it via `newTaskLoaded`.

        The first task is created by calling `TaskClass()`; subsequent
        tasks are created by calling `replicate()` on the previous next
        task. If the previous task is owned by a `CompoundParam`, the
        owner's attribute holding it is pointed at the new task.
        """
        old_task = self._next_task
        if old_task is None:
            new_task = self.TaskClass()
        else:
            new_task = old_task.replicate()
        self._next_task = new_task
        self._connectTask(new_task)
        if old_task is not None:
            owner = old_task.owner()
            if isinstance(owner, parameters.CompoundParam):
                setattr(owner, old_task.paramName(), new_task)
        self.uniquifiyTaskName()
        self.newTaskLoaded.emit(new_task)

    def nextTask(self):
        """
        Return the task that will be started next, loading one if needed.

        :raises RuntimeError: if no task class has been set.
        """
        if self.TaskClass is None:
            raise RuntimeError("Can't get next task before a task class has "
                               "been set.")
        if self._next_task is None:
            self.loadNewTask()
        return self._next_task

    def isStartable(self):
        """
        Return whether the next task may be started. Always True here.
        """
        return True

    def startNextTask(self):
        """
        Start the current next task.
        """
        self.nextTask().start()

    def _connectTask(self, task):
        """
        Hook `task` up to this manager.

        With directory management enabled the task is told to use
        `tasks.AUTO_TASKDIR`; in all cases its `statusChanged` signal is
        connected to `_onTaskStatusChanged`.
        """
        if self._directory_management:
            task.specifyTaskDir(tasks.AUTO_TASKDIR)
        task.statusChanged.connect(self._onTaskStatusChanged)

    def targetGetValue(self):
        """
        Return the current next task without loading a new one.

        NOTE(review): presumably implements the `mappers.TargetMixin`
        getter hook - confirm against that mixin.
        """
        return self._next_task

    def targetSetValue(self, value):
        """
        Make `value` the next task unless it already is.

        NOTE(review): presumably implements the `mappers.TargetMixin`
        setter hook - confirm against that mixin.
        """
        if self._next_task is value:
            return
        self.setNextTask(value)

    def setNextTask(self, task):
        """
        Replace the next task with `task`. Does nothing if `task` already
        is the next task.

        :param task: the task to install; must be an instance of
            `TaskClass`.
        :raises ValueError: if `task` is not an instance of `TaskClass`.
        """
        if task is self._next_task:
            return
        if not isinstance(task, self.TaskClass):
            raise ValueError('Next task must be an instance of %r; got %r.' %
                             (self.TaskClass, task))
        self._next_task = task
        self._connectTask(task)
        self.newTaskLoaded.emit(task)
        self.uniquifiyTaskName()

    def __len__(self):
        """
        Return the number of tasks in `task_list`.
        """
        return len(self.task_list)

    def __repr__(self):
        # `TaskClass` may be None (see `nextTask`); repr must not raise.
        task_class_name = getattr(self.TaskClass, '__name__', None)
        return '<%s: %s>' % (self.__class__.__name__, task_class_name)