import enum
import os
import typing
from schrodinger.job import jobwriter
from schrodinger.job import jobcontrol
from schrodinger.job import jobhandler
from schrodinger.job import launchapi
from schrodinger.job import launchparams
from schrodinger.models import jsonable
from schrodinger.models import parameters
from schrodinger.models import json
from schrodinger.tasks import cmdline
from schrodinger.tasks import hosts
from schrodinger.tasks import tasks
from schrodinger.tasks.hosts import strip_gpu_from_localhost
# Imported so other modules can access tasks features here.
from schrodinger.tasks.tasks import AFTER_TASKDIR
from schrodinger.tasks.tasks import AUTO_TASKDIR
from schrodinger.tasks.tasks import BEFORE_TASKDIR
from schrodinger.tasks.tasks import FINISHED_STATUSES
from schrodinger.tasks.tasks import TEMP_TASKDIR
from schrodinger.tasks.tasks import TaskFailure
from schrodinger.tasks.tasks import TaskFile
from schrodinger.tasks.tasks import TaskFolder
from schrodinger.tasks.tasks import TaskKilled
from schrodinger.tasks.tasks import postprocessor
from schrodinger.tasks.tasks import preprocessor
import schrodinger
maestro = schrodinger.get_maestro()
def is_jobtask(task):
"""
Utility function to check if an object is a jobtask.
"""
return isinstance(task, _AbstractJobMixin)
#===============================================================================
# Job Config - Hosts
#===============================================================================
class AllowedHostTypes(jsonable.JsonableEnum):
CPU_ONLY = enum.auto()
GPU_ONLY = enum.auto()
CPU_AND_GPU = enum.auto()
class HostParam(parameters.Param):
DataClass = hosts.Host
    def __init__(self,
default_value='localhost',
allowed_types=AllowedHostTypes.CPU_ONLY,
*args,
**kwargs):
self.allowed_types = allowed_types
super().__init__(default_value, *args, **kwargs)
def get_hosts():
return hosts.get_hosts(excludeGPGPUs=False)
def _get_jobhost_name():
"""
    If called from within a jobcontrol backend, return the name of the host
    this job is running on (i.e. the jobhost). Otherwise, fall back to the
    JOBHOST environment variable.
"""
if jobcontrol.get_backend():
        # Use a distinct name so we don't shadow the imported `hosts` module:
        host_list = jobcontrol.get_backend_host_list()
        return host_list[0][0] if host_list else None
else:
return os.environ.get('JOBHOST')
def get_default_host(allowed_host_types=AllowedHostTypes.CPU_AND_GPU):
"""
    Gets the default host for a job to run on, which will be the jobhost if
    this function is called from within a jobcontrol backend, or the localhost
    otherwise. If the specified host type is GPU_ONLY and the default host has
    no GPUs, the returned host will be the first GPU-enabled host returned
    from `get_hosts()`.
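
    Example (illustrative)::

        # Pick a host for a GPU-only job:
        host = get_default_host(AllowedHostTypes.GPU_ONLY)
        if host is not None:
            print(host.name)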
"""
default_hostname = _get_jobhost_name() or 'localhost'
default_host = hosts.get_host_by_name(default_hostname)
if allowed_host_types is AllowedHostTypes.GPU_ONLY and not default_host.num_gpus:
default_host = next((host for host in get_hosts() if host.num_gpus),
None)
return default_host
"""
NOT_SUPPORTED is a sentinel that can be used to signal that a host or
subjob are not supported for a particular host settings. For example::
class MyJobConfig(JobConfig):
host_settings = HostSettings(allowed_host_types=NOT_SUPPORTED)
or:
class MyJobConfig(JobConfig):
host_settings = HostSettings(num_subjobs=NOT_SUPPORTED)
"""
NOT_SUPPORTED = None
class HostSettings(parameters.CompoundParam):
host = HostParam()
num_subjobs: int = None
allowed_host_types: AllowedHostTypes = AllowedHostTypes.CPU_ONLY
    def toCmdArg(self):
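        """
        Format these settings as the value of a jobcontrol -HOST argument,
        e.g. "hostname" or "hostname:<num_subjobs>". Returns an empty string
        if no host is set.
        """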
if self.host is None:
return ''
host_arg = self.host.name
if self.num_subjobs is not None:
host_arg += ':' + str(self.num_subjobs)
return strip_gpu_from_localhost(host_arg)
    def initializeValue(self):
if self.allowed_host_types is not None:
self.host = get_default_host(self.allowed_host_types)
#===============================================================================
# Job Config - Incorporation
#===============================================================================
class IncorporationMode(jsonable.JsonableEnum):
APPEND = 'append'
APPENDINPLACE = 'appendinplace'
REPLACE = 'replace'
IGNORE = 'ignore'
APPENDNEW = 'appendnew'
class IncorporationParam(parameters.EnumParam):
DEFAULT_ALLOWED_MODES = (
IncorporationMode.APPEND,
IncorporationMode.APPENDINPLACE,
IncorporationMode.IGNORE,
IncorporationMode.APPENDNEW,
)
    def __init__(self, *args, allowed_modes=DEFAULT_ALLOWED_MODES, **kwargs):
super().__init__(IncorporationMode, *args, **kwargs)
self.allowed_modes = allowed_modes
INCORPORATION_MODE_MAP = {
    IncorporationMode.APPEND: launchparams.ProjectDisposition.APPEND,
    # FIXME - No direct mapping for APPENDINPLACE, see PANEL-17514
    IncorporationMode.APPENDINPLACE: launchparams.ProjectDisposition.APPEND,
    IncorporationMode.APPENDNEW: launchparams.ProjectDisposition.APPEND,
    IncorporationMode.IGNORE: launchparams.ProjectDisposition.IGNORE,
}
#===============================================================================
# Job Config
#===============================================================================
class JobConfig(parameters.CompoundParam):
"""
Subclass JobConfig to customize what job settings are available for a given
jobtask. To disable an option, set an ordinary (non-param) class variable
with value None for that option.
Subclasses may add any arbitrary options as desired; it is the
responsibility of the task to handle those options.
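
    Example (a sketch; `save_logs` is a hypothetical custom option)::

        class MyJobConfig(JobConfig):
            # Enable incorporation with a default mode:
            incorporation = IncorporationParam(IncorporationMode.APPEND)
            # Custom option; the task itself is responsible for acting on it:
            save_logs: bool = False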
"""
viewname: str = None # A default value will be assigned by the task
jobname: str
host_settings: HostSettings
incorporation = None # Override with an IncorporationParam to enable
@json.adapter(version=52085)
def _adaptHost(self, json_dict):
"""
In 52085, we switched from defining host on the toplevel of jobconfig
to inside a HostSettings param.
"""
json_dict['host_settings'] = {'host': json_dict.pop('host')}
return json_dict
def job_config_factory(allowed_host_types=AllowedHostTypes.CPU_ONLY,
default_incorp_mode=None,
supports_subjobs=False,
viewname=None):
"""
Generate JobConfig objects with typical options.
:param allowed_host_types: Whether this job accepts cpu hosts, gpu hosts,
or both. Pass None to disable remote hosts (always run on localhost)
:type allowed_host_types: AllowedHostTypes or None
:param default_incorp_mode: The default disposition. Pass None for jobs that
do not incorporate at all.
:type default_incorp_mode: IncorporationMode or None
:param supports_subjobs: whether this job can be split into subjobs
:type supports_subjobs: bool
:param viewname: what viewname should be used for this type of job
:type viewname: str or None
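
    Example (a sketch, assuming a `CmdJobTask` subclass)::

        class MyTask(CmdJobTask):
            job_config = job_config_factory(
                allowed_host_types=AllowedHostTypes.CPU_AND_GPU,
                supports_subjobs=True)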
"""
    # Have to give `viewname` a different variable name before assigning it
    # inside of `NewJobConfig`: class bodies cannot see enclosing function
    # locals, so `viewname: str = viewname` would raise a NameError.
default_viewname = viewname
default_allowed_host_types = allowed_host_types
class NewJobConfig(JobConfig):
if default_incorp_mode is not None:
incorporation = IncorporationParam(default_incorp_mode)
viewname: str = default_viewname
host_settings = HostSettings(
allowed_host_types=default_allowed_host_types)
def initializeValue(self):
super().initializeValue()
if not allowed_host_types:
self.host_settings.host = None
if supports_subjobs:
self.host_settings.num_subjobs = 0
return NewJobConfig()
#===============================================================================
# Job Task Execution Mixins
#===============================================================================
class _AbstractJobMixin(parameters.CompoundParamMixin):
"""
Base class for running tasks via job control.
Child classes must be mixed in with `AbstractCmdTask`.
"""
# If PROGRAM_NAME is not overridden, it will default to the class name.
# Used for specifying the program name field of the job visible in the
# job monitor.
PROGRAM_NAME = None
job_config: JobConfig
input: parameters.CompoundParam
job_id = parameters.NonParamAttribute()
_use_async_jobhandler: bool = False
@json.adapter(50002)
def configToJobConfigAdapter(cls, json_dict):
json_dict['job_config'] = json_dict.pop('config')
return json_dict
@classmethod
def configureParam(cls):
"""
@overrides: parameters.CompoundParam
"""
super().configureParam()
cls.setReference(cls.name, cls.job_config.jobname)
def initializeValue(self):
"""
        @overrides: parameters.CompoundParam
"""
super().initializeValue()
self._write_mode = False
if self.job_config.viewname is None:
self.job_config.viewname = type(self).__name__
def inWriteMode(self):
return self._write_mode
@classmethod
def runFromCmdLine(cls):
"""
@overrides: tasks.AbstractTask
"""
return cmdline.run_jobtask_from_cmdline(cls)
@classmethod
def _populateClassParams(cls):
"""
@overrides: tasks.AbstractTask
"""
cls._convertNestedClassToDescriptor('JobConfig', 'job_config')
super()._populateClassParams()
def runCmd(self, cmd):
"""
@overrides: tasks.AbstractCmdTask
"""
wrapped_cmd = self._wrapCmd(cmd)
self._launchCmd(wrapped_cmd)
def runToCmd(self, skip_preprocessing=False):
"""
        Does the same thing as `start` except that it doesn't actually launch
        the job; instead, it returns the final job cmd. Intended for running
        jobtasks via JobDJ, which requires a job cmd rather than a task.
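
        Example (a sketch; assumes `task` is a fully configured jobtask)::

            from schrodinger.job import queue

            jobdj = queue.JobDJ()
            jobdj.addJob(task.runToCmd())
            jobdj.run()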
"""
if not skip_preprocessing:
self.runPreprocessing()
return self._wrapCmd(self.makeCmd())
def write(self, skip_preprocessing=False):
self._write_mode = True
try:
cmd = self.runToCmd(skip_preprocessing=skip_preprocessing)
sh_fname = self.getTaskFilename(self.name + '.sh')
jobwriter.write_job_cmd(cmd, sh_fname, self.getTaskDir())
finally:
self._write_mode = False
def replicate(self):
"""
@overrides: tasks.AbstractTask
"""
old_task = self
new_task = super().replicate()
new_task.job_config.setValue(old_task.job_config)
return new_task
@property
def PROGRAM_NAME(self):
return type(self).__name__
def _launchCmd(self, cmd):
if self._use_async_jobhandler:
JobHandlerClass = jobhandler.AsyncJobHandler
else:
JobHandlerClass = jobhandler.JobHandler
jhandler = JobHandlerClass(
cmd,
launch_dir=self.getTaskDir(),
viewname=self.job_config.viewname)
        # Need to keep a reference so that the jobCompleted signal gets emitted:
self._jhandler = jhandler
jhandler.jobDownloadFailed.connect(self._onJobDownloadFailed)
jhandler.jobCompleted.connect(self.__onJobCompletion)
jhandler.jobProgressChanged.connect(self._onJobProgressChanged)
if self._use_async_jobhandler:
jhandler.jobStarted.connect(self._onJobStarted)
jhandler.jobLaunchFailed.connect(self._onJobLaunchFailed)
jhandler.launchJob()
if not self._use_async_jobhandler:
self._onJobStarted(jhandler.job)
def _onJobStarted(self, job):
self.job_id = job.job_id
def _getWrappedCmd(self):
cmd = self.makeCmd()
return self._wrapCmd(cmd)
def _wrapCmd(self, cmd):
return cmd
def _onJobProgressChanged(self, job, steps, total_steps, message):
self.progress = steps
self.max_progress = total_steps
self.progress_string = message
def writeStuZipFile(self):
# TODO
raise NotImplementedError
def _onJobDownloadFailed(self, job, error: str):
"""
Mark the task as failed if the job fails to download
"""
self._recordFailure(RuntimeError(error))
@typing.final
def __onJobCompletion(self):
with self.guard():
self._onJobCompletion()
self._finish()
def _onJobCompletion(self):
"""
Hook for subclasses to customize behavior on job completion
"""
pass
def _onJobLaunchFailed(self, exception):
with self.guard():
raise exception
if self.failure_info is not None:
self.status = self.FAILED
def kill(self):
"""
@overrides: tasks.AbstractTask
"""
if self.status is not self.RUNNING:
raise RuntimeError("Can't kill a task that's not running.")
self._jhandler.job.kill()
self._recordFailure(TaskKilled())
self._finish()
class JobBackendCmdMixin(_AbstractJobMixin):
"""
Base class for running backends that already support job control. Combine
with an AbstractCmdTask. To use, override `makeCmd`.
"""
    def __init__(self, *args, cmd_list=None, **kwargs):
        super().__init__(*args, **kwargs)
        if cmd_list is not None:
            # If this turns out to be too restrictive, feel free to remove it.
            assert self.__class__ is CmdJobTask
        # Assign unconditionally so makeCmd can safely check for None:
        self._cmd_list = cmd_list
    def makeCmd(self):
"""
@overrides: tasks.AbstractCmdTask
        Child classes must override unless a `cmd_list` was passed to the
        constructor.
"""
if self._cmd_list is not None:
return self._cmd_list
else:
return []
def _wrapCmd(self, cmd):
"""
        @overrides: _AbstractJobMixin
"""
cmd.extend(['-JOBNAME', self.name])
if self.job_config.host_settings.host is not None:
cmd.extend(['-HOST', self.job_config.host_settings.toCmdArg()])
if self.job_config.incorporation is not None and not self.inWriteMode():
cmd.extend(['-DISP', self.job_config.incorporation.value])
return cmd
def _onJobCompletion(self):
"""
@overrides: _AbstractJobMixin
"""
if not self._jhandler.job.succeeded():
self._recordFailure(RuntimeError("Job returned nonzero exit code."))
super()._onJobCompletion()
class _LaunchAPIMixin(_AbstractJobMixin):
"""
Base class for running python code under job control by wrapping with
launchapi. Combine with an AbstractCmdTask. To use, override `makeCmd`.
"""
input_files: list
output_files: list
incorporation_file: str
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._license_reservations = {}
def addLicenseReservation(self, license, num_tokens=1):
"""
Add a license reservation for this job. This information is used by job
control to ensure the job is only started once the required licenses
become available.
In a preprocessor, (i.e. before launching the backend), a reservation
should be added for each license that will be checked out directly by
that backend. Example::
            class GlideTask(ComboJobTask):

                @preprocessor
                def _reserveGlideLicense(self):
                    # Reserve a Glide license.
                    self.addLicenseReservation(license.GLIDE_MAIN)

                def mainFunction(self):
                    # Check out the Glide license
                    lic = license.License(license.GLIDE_MAIN)
                    # ... Do computations requiring Glide ...
                    lic.checkin()
Licenses that will be checked out by subjobs of this job do not need
reservations added here; subjobs are responsible for their own license
reservations.
:param license: a license that will be used by the backend
:type license: module-constant from schrodinger.utils.license (e.g.
license.AUTODESIGNER)
:param num_tokens: number of tokens for this license reservations
:type num_tokens: int
"""
self._license_reservations[license] = num_tokens
def addInputFile(self, filename):
"""
Register the given file with job control as an input file, so that
it gets copied to the job directory when the task starts.
:param filename: Input file path.
:type filename: str
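
        Example (a sketch; `grid_file` is a hypothetical input param)::

            @preprocessor
            def _registerInputFiles(self):
                self.addInputFile(self.input.grid_file)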
"""
self.input_files.append(filename)
def addOutputFile(self, filename):
"""
Register the given file with job control as an output file, so that
        it gets copied back to the launch directory after the task completes.

        :param filename: Output file path.
:type filename: str
"""
self.output_files.append(filename)
def initConcrete(self):
super().initConcrete()
self._input_dirs = []
def _legacyCmd(self, cmd):
extra_args = cmd[1:]
cmd = cmd[0:1]
cmd.extend(['-JOBNAME', self.name])
if self.job_config.host_settings.host is not None:
cmd.extend(['-HOST', self.job_config.host_settings.host.name])
cmd.extend(extra_args)
return cmd
def addInputDirectory(self, directory):
"""
Add an input directory to be copied over with the job.
"""
self._input_dirs.append(directory)
def wrapCmdInLaunchApi(self, cmd):
cmd = self._legacyCmd(cmd)
job_spec = self.makeJobSpecFromCmd(cmd)
launch_parameters = self.makeLaunchParams()
full_cmd = jobcontrol._get_job_spec_launch_command(
job_spec, launch_parameters)
return full_cmd
def makeJobSpecFromCmd(self, cmd):
cmd = list(map(str, cmd))
job_builder = launchapi.JobSpecificationArgsBuilder(cmd)
for license, num_tokens in self._license_reservations.items():
job_builder.addLicense(license, num_tokens)
logfilename = self._getLogFilename()
job_builder.setStderr(logfilename, stream=True)
job_builder.setStdout(logfilename, stream=True)
for filename in self.input_files:
job_builder.setInputFile(filename)
        # Avoid shadowing the `dir` builtin:
        for directory in self._input_dirs:
            job_builder.setInputDirectory(directory)
self.output_files = list(set(self.output_files))
incorp_mode = self.job_config.incorporation
for filename in self.output_files:
incorp = (filename == self.incorporation_file)
# While regular output files can be specified with absolute paths,
# StructureOutputFile (incorporate=True) must be local path:
filename = os.path.basename(filename)
job_builder.setOutputFile(filename, incorporate=incorp)
job_builder.setProgramName(self.PROGRAM_NAME)
return job_builder.getJobSpec()
def makeLaunchParams(self):
launch_parameters = launchparams.LaunchParameters()
if self.job_config.host_settings.host is not None:
launch_parameters.setHostname(
self.job_config.host_settings.host.name)
launch_parameters.setJobname(self.name)
if self.job_config.host_settings.num_subjobs is not None:
launch_parameters.setNumberOfSubjobs(
self.job_config.host_settings.num_subjobs)
        if (maestro and self.job_config.incorporation is not None and
                not self.inWriteMode()):
proj_disp = INCORPORATION_MODE_MAP[self.job_config.incorporation]
launch_parameters.setMaestroProjectDisposition(proj_disp)
pt = maestro.project_table_get()
launch_parameters.setMaestroProjectName(pt.project_name)
return launch_parameters
def _wrapCmd(self, cmd):
"""
@overrides: _AbstractJobMixin
"""
return self.wrapCmdInLaunchApi(cmd)
def _getLogFilename(self):
return self.name + '.log'
def getLogAsString(self):
with open(self.getTaskFilename(self._getLogFilename())) as log_file:
return log_file.read()
class ComboJobMixin(_LaunchAPIMixin):
"""
Base class for running python code using the "combo" task pattern. Combine
with AbstractComboTask. To use, define:
mainFunction (or, equivalently backendMain): the python function that
will be executed in the backend process under job control.
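
    Example (a minimal sketch)::

        class AddTask(ComboJobTask):

            class Input(parameters.CompoundParam):
                a: int
                b: int

            class Output(parameters.CompoundParam):
                result: int

            def mainFunction(self):
                # Runs in the backend process under job control.
                self.output.result = self.input.a + self.input.b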
"""
    def write(self, skip_preprocessing=False):
try:
super().write(skip_preprocessing)
finally:
self._regenerateComboId()
def _writeFrontendJsonFile(self):
"""
@overrides: AbstractComboTask
"""
self.addInputFile(self.json_filename)
self.addOutputFile(os.path.basename(self.json_out_filename))
super()._writeFrontendJsonFile()
def _copyScriptToBackend(self):
script_filename = super()._copyScriptToBackend()
self.addInputFile(script_filename)
return script_filename
    def makeCmd(self):
"""
@overrides: tasks.AbstractCmdTask
Child classes must not override this method.
"""
cmd = super().makeCmd()
assert cmd[0] == 'run'
cmd.pop(0)
return cmd
def _wrapCmd(self, cmd):
"""
        @overrides: _LaunchAPIMixin
"""
cmd.extend(['--task_json', self.json_filename])
wrapped_cmd = self.wrapCmdInLaunchApi(cmd)
return wrapped_cmd
    def isBackendMode(self):
        return self._run_as_backend
    def getTaskDir(self):
taskdir = super().getTaskDir()
if jobcontrol.get_backend():
# workaround for JOBCON-6136
return os.path.relpath(taskdir)
return taskdir
    def getTaskFilename(self, fname):
task_fname = super().getTaskFilename(fname)
if jobcontrol.get_backend():
# workaround for JOBCON-6136
return os.path.relpath(task_fname)
return task_fname
def _onJobCompletion(self):
"""
@overrides: _AbstractJobMixin
"""
self._processBackend()
super()._onJobCompletion()
    def runBackend(self):
"""
@overrides: AbstractComboTask
"""
super().runBackend()
self._registerBackendOutputFiles()
def _onBackendProgressChanged(self):
backend = jobcontrol.get_backend()
backend.setJobProgress(
self.progress, self.max_progress, description=self.progress_string)
def _registerBackendOutputFiles(self):
"""
Called by the backend to make sure any dynamically added output files
get registered with job control to be copied back to the launch dir.
"""
backend = jobcontrol.get_backend()
if backend is None:
# Running in-process, usually as a test or debugging.
return
for file in self.output_files:
backend.addOutputFile(file)
#===============================================================================
# Prepackaged Job Task Classes
#===============================================================================
class CmdJobTask(JobBackendCmdMixin, tasks.AbstractCmdTask):
"""
Class for running backends that already support jobcontrol.
CmdJobTask can either be subclassed to implement custom input/output params
and other behavior, or can be instantiated and run directly by supplying the
    optional cmd_list constructor argument. For example::

        task = jobtasks.CmdJobTask(cmd_list=['testapp', '-t', '1'])
        task.start()

Note that specifying cmd_list will bypass some custom functionality and
should not be used with CmdJobTask subclasses.
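
    Example of a subclass (a sketch; the input param is illustrative)::

        class TestappTask(CmdJobTask):

            class Input(parameters.CompoundParam):
                duration: int = 1

            def makeCmd(self):
                return ['testapp', '-t', str(self.input.duration)]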
"""
# Path separators used by the ComboJobTask path helpers below to normalize
# task file paths across platforms (module-level so they resolve as bare
# names in those methods):
WINDOWS_SEP = '\\'
POSIX_SEP = '/'
class ComboJobTask(ComboJobMixin, tasks.AbstractComboTask):
def _setUpTaskFile(self, filename):
filename = filename.replace(WINDOWS_SEP, os.path.sep)
filename = filename.replace(POSIX_SEP, os.path.sep)
if not self.isBackendMode():
self.addInputFile(filename)
else:
self.addOutputFile(os.path.relpath(filename))
if os.path.isabs(filename) or filename.startswith('..'):
return os.path.basename(filename)
else: # filename is relative path in launch directory
return filename
def _setUpTaskFolder(self, folderpath):
folderpath = folderpath.replace(WINDOWS_SEP, os.path.sep)
folderpath = folderpath.replace(POSIX_SEP, os.path.sep)
if not self.isBackendMode():
dummy_file = os.path.join(folderpath, ".dummy_file")
with open(dummy_file, 'w'):
pass
self.addInputDirectory(folderpath)
else:
dummy_file = os.path.join(
os.path.relpath(folderpath), ".dummy_file")
with open(dummy_file, 'w'):
pass
self.output_files.append(os.path.relpath(folderpath))
if os.path.isabs(folderpath) or folderpath.startswith('..'):
return os.path.basename(folderpath)
        else:  # folderpath is a relative path in the launch directory
return folderpath
    def runBackend(self):
        # Specify the task dir as the cwd since we've already chdir'd into
        # the directory with all the task files.
        self.specifyTaskDir(None)
return super().runBackend()
    def getTaskDir(self):
if self.isBackendMode():
return ''
return super().getTaskDir()
def _updateFromBackend(self, rehydrated_backend):
super()._updateFromBackend(rehydrated_backend)
self.output_files = rehydrated_backend.output_files