# Source code for schrodinger.application.desmond.queue
import functools
import os
import time
from pathlib import Path
from typing import List, Optional, Tuple

import schrodinger.utils.sea as sea
from schrodinger.job import jobcontrol
from schrodinger.job import queue as jobcontrol_queue

# File names for automatic checkpointing
CHECKPOINT_REQUESTED_FILENAME = 'CHECKPOINT_REQUESTED'
CHECKPOINT_WITH_RESTART_REQUESTED_FILENAME = 'CHECKPOINT_WITH_RESTART_REQUESTED'
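# A completed subjob signals a checkpoint by writing one of these files among
# its outputs (see Queue._status_change_callback below): the *_WITH_RESTART
# variant requeues the job, while the plain variant finishes it at the
# checkpoint.
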
Host = Tuple[str, Optional[int]]
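# e.g. ("localhost", 4): a host name paired with a subjob count; the count
# may be None when no limit is specified.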


class Queue:
    def __init__(self,
                 hosts: str,
                 max_job: int,
                 max_retries: int,
                 periodic_callback=None):
"""
        :param hosts: String passed to the -HOST option.
:param max_job: Maximum number of jobs to run simultaneously.
:param max_retries: Maximum number of times to retry a failed job.
:param periodic_callback: Function to call periodically as the jobs run.
This can be used to handle the halt message for stopping
a running workflow.
"""
self.hosts = _parse_hosts(hosts, max_job)
self.max_retries = max_retries
self.jobdj = None
self.periodic_callback = periodic_callback
self._queued_jobs = []
        self._max_stop_time = 3600  # Seconds to wait for subjobs to stop before killing them

    def run(self):
"""
Run jobs for all multisim stages.
Starts a separate JobDJ for each multisim stage.
queue.push(jobs)
queue.run()
while jobs: <---------------|
jobdj.run() |
multisim_jobs.finish() |
stage.capture() |
next_stage.push() |
next_stage.release() |
queue.push(next_jobs) --
"""
while self._queued_jobs:
self._run_stage(self._queued_jobs)

    def stop(self) -> int:
"""
Attempt to stop the subjobs, but kill them if they
do not stop in time.
:return: Number of subjobs killed due to a failure to stop.
"""
stop_time = time.time()
num_subjobs_killed = 0
# New jobs may be launched, so loop until
# no more jobs are running.
stopped_jobs = set()
while self.running_jobs:
for job in self.running_jobs:
jctrl = job.getJob()
if time.time() - stop_time > self._max_stop_time:
num_subjobs_killed += 1
jctrl.kill()
elif job not in stopped_jobs:
stopped_jobs.add(job)
jctrl.stop()
# Process the finished jobs
self._finish_stage()
return num_subjobs_killed

    def push(self, jobs: List["cmj.Job"]):  # noqa: F821
        """
        Queue the given multisim jobs to be run; jobs without
        a launch command are skipped.
        """
self._queued_jobs.extend(
filter(lambda j: j.jlaunch_cmd is not None, jobs))

    @property
    def running_jobs(self) -> List["JobAdapter"]:
        """
        Jobs that have been launched and are not yet complete.
        """
        if self.jobdj is None:
            return []
        return [
            j for j in self.jobdj.active_jobs
            if j.getJob() and not j.getJob().isComplete()
        ]

    def _run_stage(self, jobs: List["cmj.Job"]):  # noqa: F821
"""
Launch JobDJ for a given set of jobs.
"""
# TODO: max_failures=jobcontrol_queue.NOLIMIT matches the current behavior
# but in some cases it would be better to just exit on the first failure.
self.jobdj = jobcontrol_queue.JobDJ(
hosts=self.hosts,
verbosity="normal",
job_class=JobAdapter,
max_failures=jobcontrol_queue.NOLIMIT,
max_retries=self.max_retries)
# The host running this often does not have a GPU
# so smart distribution should be disabled.
self.jobdj.disableSmartDistribution()
# Run all jobs for this stage
while jobs:
# Add the jobs, linking to the multisim jobs
for job in jobs:
self.jobdj.addJob(
job.jlaunch_cmd, multisim_job=job, command_dir=job.dir)
# Clear queue, new jobs will be added if the current jobs
# are requeued.
self._queued_jobs = []
try:
self.jobdj.run(
status_change_callback=self._status_change_callback,
periodic_callback=self.periodic_callback,
callback_interval=60)
except RuntimeError:
# Use multisim error handling for failed jobs
pass
# Run any requeued jobs
jobs = self._queued_jobs
# Finish the stage and add new jobs (if any) to self._queued_jobs
self._finish_stage()

    def _finish_stage(self):
        """
        Record the results of all jobs run by the current JobDJ
        and finish the corresponding multisim jobs.
        """
from schrodinger.application.desmond.cmj import JobStatus
for job in self.jobdj.all_jobs:
jctrl = job.getJob()
if jctrl is None:
# Check for jobs that never ran
job.multisim_job.status.set(JobStatus.LAUNCH_FAILURE)
continue
# Skip intermediate jobs that were checkpointed
if job.is_checkpointed:
continue
job.multisim_job.process_completed_job(jctrl)
job.multisim_job.finish()

    def _status_change_callback(self, job: "JobAdapter"):
"""
Process the job on a status change.
"""
from schrodinger.application.desmond.cmj import JobStatus
if job.state == jobcontrol_queue.JobState.WAITING:
job.multisim_job.status.set(JobStatus.WAITING)
elif job.state == jobcontrol_queue.JobState.ACTIVE:
job.multisim_job.status.set(JobStatus.RUNNING)
elif job.state == jobcontrol_queue.JobState.FAILED_RETRYABLE:
job.multisim_job.status.set(JobStatus.RETRIABLE_FAILURE)
elif job.state in (jobcontrol_queue.JobState.DONE,
jobcontrol_queue.JobState.FAILED):
if job.state == jobcontrol_queue.JobState.DONE:
jctrl = job.getJob()
                for out_fname in jctrl.getOutputFiles():
                    out_name = Path(out_fname).name
                    if out_name == CHECKPOINT_WITH_RESTART_REQUESTED_FILENAME:
                        # Requeue so the job restarts from its checkpoint
                        job.multisim_job.process_completed_job(
                            jctrl, restart_requested=True)
                        job.multisim_job.requeue(jctrl)
                        job.is_checkpointed = True
                        self.push([job.multisim_job])
                        return
                    elif out_name == CHECKPOINT_REQUESTED_FILENAME:
                        # Checkpoint without restart: finish the job here
                        job.multisim_job.process_completed_job(
                            jctrl, checkpoint_requested=True)
                        job.multisim_job.finish()
                        job.is_checkpointed = True
                        return
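
# A minimal usage sketch (illustrative, not part of the original module; the
# multisim driver in cmj normally owns this flow, and ``stage_jobs`` stands in
# for cmj.Job objects with jlaunch_cmd and dir set):
#
#     queue = Queue(hosts="localhost:4", max_job=2, max_retries=3)
#     queue.push(stage_jobs)     # only jobs with a launch command are queued
#     queue.run()                # one JobDJ per stage until no jobs remain
#     num_killed = queue.stop()  # stop subjobs; kill those that do not stop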


class JobAdapter(jobcontrol_queue.JobControlJob):
    def __init__(self, *args, multisim_job=None, **kwargs):
self.multisim_job = multisim_job
# Set to True if this job is checkpointed by the auto restart mechanism
self.is_checkpointed = False
        # Allow the launch timeout (in seconds) to be overridden through the
        # environment; otherwise fall back to 1800 seconds (30 minutes).
        if launch_timeout := os.getenv('SCHRODINGER_MULTISIM_LAUNCH_TIMEOUT'):
kwargs['launch_timeout'] = int(launch_timeout)
else:
kwargs['launch_timeout'] = 1800
super().__init__(*args, **kwargs)

    def getCommand(self) -> List[str]:
# Restart command has priority over the original command
return self.multisim_job.jlaunch_cmd

    def maxFailuresReached(self, *args, **kwargs):
# Use multisim failure reporting for now
pass


def _parse_hosts(hosts: str, max_job: int) -> List[Host]:
"""
Parse the hosts while also respecting the given max_job.
See `Queue` for the meaning of the arguments.
:return: List of Host tuples.
"""
# Handle multiple hosts "localhost localhost"
split_hosts = hosts.strip().split()
if len(split_hosts) > 1:
# Only running on the same host is supported
if len(set(split_hosts)) == 1:
hosts = f'{split_hosts[0]}:{len(split_hosts)}'
else:
raise ValueError("Different hosts are no longer supported. "
"All jobs must be run on the same host.")
split_host = hosts.split(':')
host = split_host[0]
if max_job:
# Max job takes priority if specified
hosts = f'{host}:{max_job}'
else:
        # For non-queue hosts, if neither max_job nor the number
        # of CPUs is specified, set the count to the number of
        # processors. This differs from JobDJ, which defaults to 1.
host_entry = jobcontrol.get_host(host)
if len(split_host) == 1 and not host_entry.isQueue():
hosts = f'{host}:{host_entry.processors}'
return jobcontrol.host_str_to_list(hosts)
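

# Illustrative examples of _parse_hosts behavior (inferred from the logic
# above; the exact count for a bare host depends on the local host entry
# returned by jobcontrol.get_host):
#
#     _parse_hosts("localhost localhost", None)  ->  [("localhost", 2)]
#         (repeated identical hosts collapse into host:count)
#     _parse_hosts("localhost:4", 2)             ->  [("localhost", 2)]
#         (an explicit max_job overrides the :N suffix)
#     _parse_hosts("localhost", None)            ->  [("localhost", <ncpu>)]
#         (non-queue hosts default to the processor count rather than 1)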