Source code for schrodinger.application.desmond.queue

from schrodinger.job import queue as jobcontrol_queue
from schrodinger.job import jobcontrol
from typing import List, Tuple, Optional
import time
import functools
import os
from pathlib import Path
import schrodinger.utils.sea as sea

# File names for automatic checkpointing
CHECKPOINT_REQUESTED_FILENAME = 'CHECKPOINT_REQUESTED'
CHECKPOINT_WITH_RESTART_REQUESTED_FILENAME = 'CHECKPOINT_WITH_RESTART_REQUESTED'

Host = Tuple[str, Optional[int]]
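# A Host tuple pairs a host name with the maximum number of simultaneous
# subjobs to run on it; for example, ("localhost", 4) requests up to four at a
# time, while a bare host string with no count maps to ("localhost", None).
# (Hedged illustration; the exact values come from
# jobcontrol.host_str_to_list and the local host configuration.)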


class Queue:

    def __init__(self,
                 hosts: str,
                 max_job: int,
                 max_retries: int,
                 periodic_callback=None):
        """
        :param hosts: String passed to -HOST.
        :param max_job: Maximum number of jobs to run simultaneously.
        :param max_retries: Maximum number of times to retry a failed job.
        :param periodic_callback: Function to call periodically as the jobs
            run. This can be used to handle the halt message for stopping a
            running workflow.
        """
        self.hosts = _parse_hosts(hosts, max_job)
        self.max_retries = max_retries
        self.jobdj = None
        self.periodic_callback = periodic_callback
        self._queued_jobs = []
        self._max_stop_time = 3600

    def run(self):
        """
        Run jobs for all multisim stages.

        Starts a separate JobDJ for each multisim stage::

            queue.push(jobs)
            queue.run()
              while jobs:  <----------------.
                  jobdj.run()               |
                  multisim_jobs.finish()    |
                  stage.capture()           |
                  next_stage.push()         |
                  next_stage.release()      |
                  queue.push(next_jobs) ----'
        """
        from schrodinger.application.desmond.cmj import JobStatus
        while self._queued_jobs:
            self._run_stage(self._queued_jobs)

    def stop(self) -> int:
        """
        Attempt to stop the subjobs, but kill them if they do not stop in
        time.

        :return: Number of subjobs killed due to a failure to stop.
        """
        stop_time = time.time()
        num_subjobs_killed = 0
        # New jobs may be launched, so loop until no more jobs are running.
        stopped_jobs = set()
        while self.running_jobs:
            for job in self.running_jobs:
                jctrl = job.getJob()
                if time.time() - stop_time > self._max_stop_time:
                    num_subjobs_killed += 1
                    jctrl.kill()
                elif job not in stopped_jobs:
                    stopped_jobs.add(job)
                    jctrl.stop()
        # Process the finished jobs
        self._finish_stage()
        return num_subjobs_killed

    def push(self, jobs: List["cmj.Job"]):  # noqa: F821
        self._queued_jobs.extend(
            filter(lambda j: j.jlaunch_cmd is not None, jobs))

    @property
    def running_jobs(self) -> List["JobAdapter"]:
        if self.jobdj is None:
            return []
        return [
            j for j in self.jobdj.active_jobs
            if j.getJob() and not j.getJob().isComplete()
        ]

    def _run_stage(self, jobs: List["JobAdapter"]):
        """
        Launch JobDJ for a given set of jobs.
        """
        # TODO: max_failures=jobcontrol_queue.NOLIMIT matches the current
        # behavior, but in some cases it would be better to just exit on the
        # first failure.
        self.jobdj = jobcontrol_queue.JobDJ(
            hosts=self.hosts,
            verbosity="normal",
            job_class=JobAdapter,
            max_failures=jobcontrol_queue.NOLIMIT,
            max_retries=self.max_retries)
        # The host running this often does not have a GPU,
        # so smart distribution should be disabled.
        self.jobdj.disableSmartDistribution()
        # Run all jobs for this stage
        while jobs:
            # Add the jobs, linking them to the multisim jobs
            for job in jobs:
                self.jobdj.addJob(
                    job.jlaunch_cmd, multisim_job=job, command_dir=job.dir)
            # Clear the queue; new jobs will be added if the current jobs
            # are requeued.
            self._queued_jobs = []
            try:
                self.jobdj.run(
                    status_change_callback=self._status_change_callback,
                    periodic_callback=self.periodic_callback,
                    callback_interval=60)
            except RuntimeError:
                # Use multisim error handling for failed jobs
                pass
            # Run any requeued jobs
            jobs = self._queued_jobs
        # Finish the stage and add new jobs (if any) to self._queued_jobs
        self._finish_stage()

    def _finish_stage(self):
        from schrodinger.application.desmond.cmj import JobStatus
        for job in self.jobdj.all_jobs:
            jctrl = job.getJob()
            if jctrl is None:
                # Check for jobs that never ran
                job.multisim_job.status.set(JobStatus.LAUNCH_FAILURE)
                continue
            # Skip intermediate jobs that were checkpointed
            if job.is_checkpointed:
                continue
            job.multisim_job.process_completed_job(jctrl)
            job.multisim_job.finish()

    def _status_change_callback(self, job: "JobAdapter"):
        """
        Process the job on a status change.
        """
        from schrodinger.application.desmond.cmj import JobStatus
        if job.state == jobcontrol_queue.JobState.WAITING:
            job.multisim_job.status.set(JobStatus.WAITING)
        elif job.state == jobcontrol_queue.JobState.ACTIVE:
            job.multisim_job.status.set(JobStatus.RUNNING)
        elif job.state == jobcontrol_queue.JobState.FAILED_RETRYABLE:
            job.multisim_job.status.set(JobStatus.RETRIABLE_FAILURE)
        elif job.state in (jobcontrol_queue.JobState.DONE,
                           jobcontrol_queue.JobState.FAILED):
            if job.state == jobcontrol_queue.JobState.DONE:
                jctrl = job.getJob()
                for out_fname in jctrl.getOutputFiles():
                    out_name = Path(out_fname).name
                    if out_name == CHECKPOINT_WITH_RESTART_REQUESTED_FILENAME:
                        # The subjob wrote a checkpoint and asked to be
                        # restarted, so requeue it.
                        job.multisim_job.process_completed_job(
                            jctrl, restart_requested=True)
                        job.multisim_job.requeue(jctrl)
                        job.is_checkpointed = True
                        self.push([job.multisim_job])
                        return
                    elif out_name == CHECKPOINT_REQUESTED_FILENAME:
                        # The subjob wrote a checkpoint and is done.
                        job.multisim_job.process_completed_job(
                            jctrl, checkpoint_requested=True)
                        job.multisim_job.finish()
                        job.is_checkpointed = True
                        return
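

# Illustrative sketch (not part of this module's API): roughly how a multisim
# driver could use Queue. The `stage_jobs` argument is assumed to be a list of
# cmj.Job-like objects with `jlaunch_cmd` and `dir` already set; the host name
# and numeric values below are placeholders, not recommended settings.
def _example_run_stage(stage_jobs, periodic_callback=None):
    queue = Queue(
        hosts="localhost",  # string passed to -HOST
        max_job=4,  # run at most four subjobs at a time
        max_retries=2,  # retry each failed subjob up to two times
        periodic_callback=periodic_callback)
    # push() silently drops jobs whose jlaunch_cmd is None.
    queue.push(stage_jobs)
    # run() launches one JobDJ per multisim stage until no queued jobs remain,
    # including jobs re-queued by the checkpoint/restart mechanism.
    queue.run()
    return queue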


class JobAdapter(jobcontrol_queue.JobControlJob):

    def __init__(self, *args, multisim_job=None, **kwargs):
        self.multisim_job = multisim_job
        # Set to True if this job is checkpointed by the auto-restart
        # mechanism.
        self.is_checkpointed = False
        if launch_timeout := os.getenv('SCHRODINGER_MULTISIM_LAUNCH_TIMEOUT'):
            kwargs['launch_timeout'] = int(launch_timeout)
        else:
            kwargs['launch_timeout'] = 1800
        super().__init__(*args, **kwargs)

    def getCommand(self) -> List[str]:
        # The restart command has priority over the original command.
        return self.multisim_job.jlaunch_cmd

    def maxFailuresReached(self, *args, **kwargs):
        # Use multisim failure reporting for now.
        pass
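

# A note on JobAdapter's launch timeout: SCHRODINGER_MULTISIM_LAUNCH_TIMEOUT
# is read from the environment when each JobAdapter is constructed and is
# converted to an integer (presumably seconds, matching JobControlJob's
# launch_timeout), with 1800 used as the fallback. A hedged example of
# overriding it from a driver script before the queue starts (the value is
# arbitrary):
#
#     import os
#     os.environ['SCHRODINGER_MULTISIM_LAUNCH_TIMEOUT'] = '3600'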


def _parse_hosts(hosts: str, max_job: int) -> List[Host]:
    """
    Parse the hosts string while also respecting the given max_job.
    See `Queue` for the meaning of the arguments.

    :return: List of Host tuples.
    """
    # Handle multiple hosts, e.g. "localhost localhost"
    split_hosts = hosts.strip().split()
    if len(split_hosts) > 1:
        # Only running on the same host is supported
        if len(set(split_hosts)) == 1:
            hosts = f'{split_hosts[0]}:{len(split_hosts)}'
        else:
            raise ValueError("Different hosts are no longer supported. "
                             "All jobs must be run on the same host.")
    split_host = hosts.split(':')
    host = split_host[0]
    if max_job:
        # max_job takes priority if specified
        hosts = f'{host}:{max_job}'
    else:
        # For non-queue hosts, if neither max_job nor the number of cpus is
        # specified, set it to the number of processors. This is different
        # from JobDJ, which sets it to 1.
        host_entry = jobcontrol.get_host(host)
        if len(split_host) == 1 and not host_entry.isQueue():
            hosts = f'{host}:{host_entry.processors}'
    return jobcontrol.host_str_to_list(hosts)
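

# Hedged examples of the mapping performed by _parse_hosts, assuming
# "localhost" resolves to a non-queue host entry with 8 processors (the
# actual results depend on the local job-control configuration):
#
#     _parse_hosts("localhost:4", 0)          -> [("localhost", 4)]
#     _parse_hosts("localhost", 2)            -> [("localhost", 2)]
#     _parse_hosts("localhost", 0)            -> [("localhost", 8)]
#     _parse_hosts("localhost localhost", 0)  -> [("localhost", 2)]
#     _parse_hosts("hostA hostB", 0)          -> raises ValueError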