Source code for schrodinger.tasks.taskmanager
import os
from schrodinger.models import mappers
from schrodinger.models import parameters
from schrodinger.Qt import QtCore
from schrodinger.tasks import tasks
from schrodinger.ui.qt.appframework2 import jobnames
from schrodinger.utils import qt_utils
from schrodinger.utils import scollections
class _TaskNamer(parameters.CompoundParam):
    """
    Naming component used by TaskManager.

    Picks the name for the next task the manager will run. Names are derived
    from `base_name` and made unique against a collection of already-taken
    names, which is either `taken_names` or the entries of the current
    working directory (see `enableDirBasedNaming`).
    """
    base_name = parameters.StringParam()
    name = parameters.StringParam()
    taken_names = parameters.ListParam()

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Default to list-based naming until told otherwise.
        self.enableDirBasedNaming(False)
        # Recompute the name whenever one of its inputs changes.
        self.base_nameChanged.connect(self.updateName)
        self.taken_namesChanged.connect(self.updateName)

    def enableDirBasedNaming(self, name_by_dir):
        """
        Choose the source of "taken" names.

        :param name_by_dir: If true, names are checked against the files and
            directories in the current working directory; otherwise they are
            checked against `self.taken_names`.
        """
        self._name_by_dir = name_by_dir

    def updateName(self):
        """
        Recompute `self.name` so that it is unique among the taken names.
        """
        self.name = self._getUniqueName()

    def _getUniqueName(self):
        """
        Return a name built from `self.base_name` that does not collide with
        any of the currently taken names.
        """
        base = self.base_name
        taken = self._getTakenNames()
        return jobnames.update_jobname(base, base, name_list=taken)

    def _getTakenNames(self):
        """
        Return the names that must be avoided when generating a new name.

        Kept as a separate method so subclasses can customize where taken
        names come from.
        """
        if not self._name_by_dir:
            return self.taken_names
        # Directory-based naming: anything already present in the CWD is
        # considered taken.
        return os.listdir('.')
@qt_utils.add_enums_as_attributes(tasks.Status)
class TaskManager(mappers.TargetMixin, parameters.CompoundParam):
    """
    Creates and tracks tasks of a single task class.

    The manager always holds a "next task" that has not been started yet.
    When that task's status changes to anything other than WAITING, the task
    is recorded in `task_list`, its name is added to the namer's taken names,
    and a fresh next task is loaded. `status` is an aggregate of the statuses
    of all tasks in `task_list`.
    """
    status = parameters.EnumParam(tasks.Status, tasks.Status.WAITING)
    namer = _TaskNamer()
    taskStarted = QtCore.pyqtSignal(tasks.AbstractTask)
    taskEnded = QtCore.pyqtSignal(tasks.AbstractTask)
    newTaskLoaded = QtCore.pyqtSignal(tasks.AbstractTask)
    task_list = parameters.ParamListParam(tasks.AbstractTask)
    taskStatusChanged = QtCore.pyqtSignal(tasks.AbstractTask, tasks.Status)
    NAME_NOT_UNIQUE_MSG = 'Task name is not unique.'
    TaskClass = parameters.Param()

    def __init__(self, TaskClass, directory_management=False, **kwargs):
        """
        :param TaskClass: The class used to create new tasks. Its `__name__`
            becomes the standard base name for task names.
        :param directory_management: If true, task names are made unique
            against the contents of the current working directory and every
            task connected to this manager is given `tasks.AUTO_TASKDIR` as
            its task directory.
        :param kwargs: Forwarded to the parent class initializer.
        """
        super().__init__(TaskClass=TaskClass, **kwargs)
        self._next_task = None
        self._directory_management = directory_management
        if directory_management:
            self.namer.enableDirBasedNaming(True)
        self.setStandardBaseName(self.TaskClass.__name__)
        # NOTE(review): setStandardBaseName() already calls resetBaseName();
        # this second call looks redundant but is kept to preserve behavior.
        self.resetBaseName()
        # Identity-based set of tasks that have already left WAITING.
        self._started_tasks = scollections.IdSet()
        self.loadNewTask()

    def resetBaseName(self):
        """
        Restore the namer's base name to the standard base name.
        """
        self.namer.base_name = self._standard_base_name

    def setStandardBaseName(self, new_base_name):
        """
        Set the standard base name for tasks. This is the base name that
        will be used whenever `resetBaseName` is called. The new standard
        base name is applied immediately.
        """
        self._standard_base_name = new_base_name
        self.resetBaseName()

    def setCustomBaseName(self, new_base_name):
        """
        Set a custom base name to use for tasks. The standard base name is
        left untouched, so `resetBaseName` will revert this.
        """
        self.namer.base_name = new_base_name

    def uniquifiyTaskName(self):
        """
        Update the next task's name to a unique name. A unique name is
        defined as a name that hasn't been previously used by this
        taskmanager and one that doesn't already have a directory with the
        same name in the current directory.
        """
        self.namer.updateName()
        self.nextTask().name = self.namer.name

    def wait(self, timeout=None):
        """
        Wait on this task manager by delegating to `tasks._wait`.

        :param timeout: Passed through to `tasks._wait`.
        :return: Whatever `tasks._wait` returns.
        """
        # Call the module-level wait function
        return tasks._wait(self, timeout=timeout)

    def _onTaskStatusChanged(self, status):
        """
        Slot connected to each managed task's `statusChanged` signal.

        Re-emits the change as `taskStatusChanged`, emits `taskEnded` for
        finished statuses, registers a task the first time it is seen with a
        non-WAITING status, and refreshes the aggregate `status`.
        """
        task = self.sender()
        self.taskStatusChanged.emit(task, status)
        # NOTE(review): for a task first seen with a finished status,
        # `taskEnded` is emitted before `taskStarted`. Order kept as-is.
        if status in tasks.FINISHED_STATUSES:
            self.taskEnded.emit(task)
        if task not in self._started_tasks and status != tasks.Status.WAITING:
            self._started_tasks.add(task)
            self.task_list.append(task)
            self.taskStarted.emit(task)
            # Reserve the name so the next task gets a different one.
            self.namer.taken_names.append(task.name)
            self.loadNewTask()
        self.status = self._getAggregateStatus()

    def removeTask(self, task):
        """
        Remove `task` from `task_list` and stop listening to its status
        changes.
        """
        self.task_list.remove(task)
        task.statusChanged.disconnect(self._onTaskStatusChanged)

    def _getAggregateStatus(self):
        """
        Compute a single status summarizing every task in `task_list`.

        Precedence: WAITING if all tasks are waiting (including an empty
        list), then RUNNING, FAILED and DONE if any task has that status.

        :raises AssertionError: If none of the above applies.
        """
        #TODO: consider moving status logic into a component class
        statuses = [task.status for task in self.task_list]
        if all(status is tasks.Status.WAITING for status in statuses):
            return tasks.Status.WAITING
        elif any(status is tasks.Status.RUNNING for status in statuses):
            return tasks.Status.RUNNING
        elif any(status is tasks.Status.FAILED for status in statuses):
            return tasks.Status.FAILED
        elif any(status is tasks.Status.DONE for status in statuses):
            return tasks.Status.DONE
        # Raise explicitly rather than `assert False`: assert statements are
        # stripped under `python -O`, which would let this method fall
        # through and return None.
        raise AssertionError(
            'This shouldnt be reachable; task statuses: %r' % (statuses,))

    def loadNewTask(self):
        """
        Create a new next task, connect it and announce it.

        The first task is a fresh `TaskClass()` instance; subsequent tasks
        are created with `replicate()` on the previous next task. If the
        previous task is owned by a `CompoundParam`, the owner's attribute
        is repointed at the new task.
        """
        old_task = self._next_task
        if old_task is None:
            new_task = self.TaskClass()
        else:
            new_task = old_task.replicate()
        self._next_task = new_task
        self._connectTask(new_task)
        if old_task is not None:
            if isinstance(old_task.owner(), parameters.CompoundParam):
                setattr(old_task.owner(), old_task.paramName(), new_task)
        self.uniquifiyTaskName()
        self.newTaskLoaded.emit(new_task)

    def nextTask(self):
        """
        Return the task that will be started next, creating it if needed.

        :raises RuntimeError: If no task class has been set.
        """
        if self.TaskClass is None:
            raise RuntimeError("Can't get next task before a task class has "
                               "been set.")
        if self._next_task is None:
            self.loadNewTask()
        return self._next_task

    def isStartable(self):
        """
        Return whether the next task may be started. Always True here.
        """
        return True

    def startNextTask(self):
        """
        Start the next task.
        """
        self.nextTask().start()

    def _connectTask(self, task):
        """
        Hook `task` up to this manager: optionally assign an automatic task
        directory, then listen for its status changes.
        """
        if self._directory_management:
            task.specifyTaskDir(tasks.AUTO_TASKDIR)
        task.statusChanged.connect(self._onTaskStatusChanged)

    def targetGetValue(self):
        """
        Return the current next task (may be None; no task is created).
        """
        return self._next_task

    def targetSetValue(self, value):
        """
        Make `value` the next task unless it already is.
        """
        if self._next_task is value:
            return
        self.setNextTask(value)

    def setNextTask(self, task):
        """
        Replace the next task with `task`.

        Does nothing if `task` is already the next task. Otherwise the task
        is connected, `newTaskLoaded` is emitted and the task is given a
        unique name.

        :raises ValueError: If `task` is not an instance of `TaskClass`.
        """
        if task is not self._next_task:
            if not isinstance(task, self.TaskClass):
                raise ValueError(
                    'Expected an instance of %s, got %s.' %
                    (self.TaskClass.__name__, type(task).__name__))
            self._next_task = task
            self._connectTask(task)
            self.newTaskLoaded.emit(task)
            self.uniquifiyTaskName()

    def __len__(self):
        """
        Return the number of tasks in `task_list`.
        """
        return len(self.task_list)

    def __repr__(self):
        return '<%s: %s>' % (self.__class__.__name__, self.TaskClass.__name__)