schrodinger.job.queue module
A class for running multiple concurrent jobs.

This module provides the class JobDJ. The name JobDJ refers to an old implementation of “Job-control for Distributed-Job drivers,” and the module name “queue” reflects the fact that added jobs are queued until they are ready to run.

Like the original JobDJ, its main purpose is to distribute jobs across multiple hosts. It keeps the number of active jobs on each host at or under the number of processors specified for that host. It can also restart failed jobs (see the documentation of max_retries in the JobDJ.__init__ method).
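As a rough illustration of the per-host limit described above, the sketch below places each new job on the first host in list order that still has a free slot, and leaves it queued when every host is full. This is a simplified model under stated assumptions, not JobDJ's actual scheduling code; the function name pick_host and the dict of active counts are hypothetical.

```python
from typing import Dict, List, Optional, Tuple


def pick_host(hosts: List[Tuple[str, int]], active: Dict[str, int]) -> Optional[str]:
    """Return the first host whose active job count is below its limit.

    hosts: (hostname, max_jobs) pairs, in priority order (hypothetical model)
    active: number of jobs currently running on each host
    """
    for name, max_jobs in hosts:
        if active.get(name, 0) < max_jobs:
            return name
    return None  # every host is full, so the job stays queued


# Place four jobs on a host list with three slots in total.
hosts = [("localhost", 1), ("cluster", 2)]
active: Dict[str, int] = {}
placements = []
for _ in range(4):
    host = pick_host(hosts, active)
    placements.append(host)
    if host is not None:
        active[host] = active.get(host, 0) + 1

print(placements)  # ['localhost', 'cluster', 'cluster', None]
```

Scanning the list in order means hosts listed first fill up first, which is one plausible reading of "list position" in the host list; the real module may weigh hosts differently.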

Additional functionality provided in this implementation includes the following:
- The ability to create dependencies between jobs so that JobDJ can avoid starting a job until its prerequisites are met. See the BaseJob.addPrereq method.
- The ability to run jobs locally (i.e. not under job control). These jobs can also have dependencies.
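The dependency rule (a job may not start until all of its prerequisites are met) can be modelled as a topological sort. The sketch below uses Kahn's algorithm on a plain dict of job names to compute one valid start order; it serializes what JobDJ would run concurrently and is only an illustration. The function start_order and the job names are hypothetical.

```python
from collections import deque
from typing import Dict, List, Set


def start_order(prereqs: Dict[str, Set[str]]) -> List[str]:
    """Return an order in which jobs may start so that no job starts before
    all of its prerequisites have finished (Kahn's topological sort).

    prereqs maps every job name to the set of names it depends on; every
    prerequisite must itself appear as a key.
    """
    remaining = {job: set(deps) for job, deps in prereqs.items()}
    dependents: Dict[str, List[str]] = {job: [] for job in prereqs}
    for job, deps in prereqs.items():
        for dep in deps:
            dependents[dep].append(job)

    # Jobs with no prerequisites can start immediately.
    ready = deque(sorted(job for job, deps in remaining.items() if not deps))
    order: List[str] = []
    while ready:
        job = ready.popleft()
        order.append(job)  # treat the job as started and finished
        for child in dependents[job]:
            remaining[child].discard(job)
            if not remaining[child]:
                ready.append(child)  # its last prerequisite just finished

    if len(order) != len(prereqs):
        raise ValueError("dependency cycle: some jobs can never start")
    return order


order = start_order({
    "setup": set(),
    "dock_a": {"setup"},
    "dock_b": {"setup"},
    "merge": {"dock_a", "dock_b"},
})
print(order)  # ['setup', 'dock_a', 'dock_b', 'merge']
```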

Step-by-step instructions for basic use:

1. Create a JobDJ instance. You can optionally specify the hosts to run the jobs on and the maximum number of jobs for each host by passing a list of (host, max_jobs) tuples. By default, the host list is read from the SCHRODINGER_NODEFILE, which jlaunch creates from the -HOST host list. For example:

    from schrodinger.job import queue
    job_dj = queue.JobDJ([('localhost', 1), ('cluster', 20)])

2. Add jobs to the JobDJ instance by calling the JobDJ.addJob method, passing an instance of a BaseJob subclass (such as JobControlJob or SubprocessJob). For example:

    job_dj.addJob(queue.JobControlJob(["impact", "input1.inp"]))
    # Passing job control commands as a list of strings also works:
    job_dj.addJob(["impact", "input2.inp"])

3. Run all jobs with the JobDJ.run method. This is simple:

    job_dj.run()

Alternatively, you can use the updatedJobs generator:

    for job in job_dj.updatedJobs():
        job_dj.dump(filename)

This yields a job every time its status changes. One reason to do things this way is to allow restarting from a pickle. (A JobDJ instance that is restored from a pickle can be started with the run method or the updatedJobs generator.)

Because an added job can be a BaseJob instance, additional functionality can be gained by subclassing. For example, a job can retrieve the JobDJ instance it has been added to with the getJobDJ method, and can then add new jobs dynamically at any point during its normal life cycle.
Copyright Schrodinger, LLC. All rights reserved.

class schrodinger.job.queue.BaseJob(command_dir=None)
Bases: object

A base job class for jobs run under JobDJ.

The main methods to be implemented in subclasses are:
- doCommand - The method that does the real work of the job, either running a simple local calculation or submitting a job to job control.
- update - A method called periodically while a job is running to update its current state.
- _getState - The getter used by the state property, which JobDJ uses to determine the job’s current state.

A few additional methods only need to be implemented in special situations:
- finalize - Override this method if you want custom behavior when the job is finalized.
- cancelSubmitted - If the job can run under a queue, implementing this method allows jobs that are waiting in the submitted state to be restarted immediately on a newly available non-queue host.
- getStatusStrings - If you want to use the JobDJ print summary, override this method to provide more useful information.

The execution point for all jobs is the BaseJob.run method, which calls preCommand, doCommand and postCommand in order.

For jobs that are run locally, all main computation should be done in the doCommand method. Note that doCommand blocks until completion, so no additional work (e.g. job updates or submissions) is done until it returns. For this reason, only short jobs should be run locally without job control.

addFinalizer(function, run_dir=None)
Add a function to be invoked when the job completes successfully. See also the add_multi_job_finalizer function.

addGroupPrereq(job)
Make all jobs connected to the given job prerequisites of all jobs connected to this job.

addPrereq(job)
Add a job that is an immediate prerequisite for this one.

doCommand(*args, **kwargs)
Execute the command associated with this job.

finalize()
Clean up after a job successfully runs.

genAllJobs(seen=None)
A generator that yields all jobs connected to this one.

genAllPrereqs(seen=None)
A generator that yields all jobs that are prerequisites of this one.
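The seen=None parameter in genAllJobs and genAllPrereqs suggests a graph walk that carries a set of visited jobs so that shared prerequisites are yielded only once. The sketch below shows that pattern on a hypothetical ToyJob class; whether the real genAllJobs includes the starting job itself, and how BaseJob stores its edges, are assumptions here.

```python
from typing import Iterator, Optional, Set


class ToyJob:
    """Hypothetical stand-in for BaseJob's dependency bookkeeping."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._prereqs: Set["ToyJob"] = set()
        self._dependents: Set["ToyJob"] = set()

    def addPrereq(self, job: "ToyJob") -> None:
        # Record the edge in both directions so connected jobs can be found.
        self._prereqs.add(job)
        job._dependents.add(self)

    def getPrereqs(self) -> Set["ToyJob"]:
        return set(self._prereqs)

    def genAllPrereqs(self, seen: Optional[Set["ToyJob"]] = None) -> Iterator["ToyJob"]:
        # Depth-first walk over prerequisites; `seen` prevents a job shared
        # by several branches (a diamond) from being yielded twice.
        if seen is None:
            seen = set()
        for job in self._prereqs:
            if job not in seen:
                seen.add(job)
                yield job
                yield from job.genAllPrereqs(seen)

    def genAllJobs(self, seen: Optional[Set["ToyJob"]] = None) -> Iterator["ToyJob"]:
        # Follow edges in both directions to reach the whole connected group;
        # in this sketch the starting job is included.
        if seen is None:
            seen = set()
        if self in seen:
            return
        seen.add(self)
        yield self
        for job in self._prereqs | self._dependents:
            yield from job.genAllJobs(seen)


# Diamond: d needs b and c, which both need a.
a, b, c, d = (ToyJob(n) for n in "abcd")
b.addPrereq(a)
c.addPrereq(a)
d.addPrereq(b)
d.addPrereq(c)

print(sorted(job.name for job in d.genAllPrereqs()))  # ['a', 'b', 'c']
print(sorted(job.name for job in a.genAllJobs()))     # ['a', 'b', 'c', 'd']
```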

getCommandDir()
Return the launch/command directory name. If None is returned, the job will be launched in the current directory.

getJobDJ()
Return the JobDJ instance that this job has been added to.

getPrereqs()
Return a set of all immediate prerequisites for this job.

getStatusStrings()
Return a tuple of status strings for printing by JobDJ. The strings returned are (status, jobid, name, host).

hasStarted()
Return True if this job has started (i.e. is no longer waiting).

init_count = 0

isComplete()
Return True if this job finished successfully.

maxFailuresReached(msg)
This method is called after the job has failed and the maximum number of failures per JobDJ run has been reached. After invoking this method, JobDJ will raise a RuntimeError and the process will exit.

postCommand()
A method to restore things to the pre-command state.

preCommand()
A method to make pre-command changes, like cd’ing to the correct directory to run the command in.

run(*args, **kwargs)
Run the job. The steps taken are as follows:
- Execute the preCommand method for things like changing the working directory.
- Call the doCommand method to do the actual work of computation or job launching.
- Call the postCommand method to undo the changes from preCommand that need to be undone.
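The run sequence above is a template method: run fixes the order, and subclasses fill in the steps. The sketch below models it with a hypothetical SketchJob class whose preCommand changes into getCommandDir() (None meaning the current directory) and whose postCommand changes back. Calling setup between preCommand and doCommand follows the setup entry of this page; wrapping the later steps in try/finally is a design choice of this sketch, not documented behavior of BaseJob.

```python
import os
import tempfile
from typing import List, Optional


class SketchJob:
    """Hypothetical model of the BaseJob.run sequence; not the real class."""

    def __init__(self, command_dir: Optional[str] = None) -> None:
        self._command_dir = command_dir
        self._saved_cwd: Optional[str] = None
        self.ran_in: Optional[str] = None
        self.calls: List[str] = []

    def getCommandDir(self) -> Optional[str]:
        return self._command_dir  # None means "use the current directory"

    def preCommand(self) -> None:
        self.calls.append("preCommand")
        self._saved_cwd = os.getcwd()
        if self.getCommandDir() is not None:
            os.chdir(self.getCommandDir())

    def setup(self) -> None:
        self.calls.append("setup")

    def doCommand(self) -> None:
        # Real work would go here; this sketch only records where it ran.
        self.calls.append("doCommand")
        self.ran_in = os.getcwd()

    def postCommand(self) -> None:
        self.calls.append("postCommand")
        if self._saved_cwd is not None:
            os.chdir(self._saved_cwd)

    def run(self) -> None:
        self.preCommand()
        try:
            self.setup()
            self.doCommand()
        finally:
            # Restore the original directory even if doCommand raises.
            self.postCommand()


start = os.getcwd()
with tempfile.TemporaryDirectory() as workdir:
    job = SketchJob(command_dir=workdir)
    job.run()
    same_dir = os.path.realpath(job.ran_in) == os.path.realpath(workdir)
restored = os.getcwd() == start

print(job.calls)           # ['preCommand', 'setup', 'doCommand', 'postCommand']
print(same_dir, restored)  # True True
```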

runsLocally()
Return True if the job runs on the JobDJ control host, False if not. Jobs that run locally don’t need hosts. There is no limit on the number of locally run jobs.

setup()
A method to do initial setup; executed after preCommand, just before doCommand.

state
Return the current state of the job.

Note that this property (through its _getState getter) can be overridden by subclasses that wish to provide for restartability at a higher level than unpickling BaseJob instances. For example, by examining some external condition (e.g. the presence of output files), the state DONE could be returned immediately and the job would not run.
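A minimal sketch of the restartability idea above, assuming a state property that delegates to an overridable _getState getter: the subclass reports DONE as soon as its output file exists. The classes and the WAITING/DONE string labels are hypothetical placeholders; the real module defines its own state constants.

```python
import os
import tempfile

# Hypothetical state labels; the real module defines its own constants.
WAITING = "waiting"
DONE = "done"


class SketchBaseJob:
    """Stand-in base class whose state property delegates to _getState."""

    def __init__(self) -> None:
        self._state = WAITING

    def _getState(self) -> str:
        return self._state

    @property
    def state(self) -> str:
        # Looking the getter up on self lets subclasses override it.
        return self._getState()


class OutputCheckingJob(SketchBaseJob):
    """Reports DONE as soon as its output file exists."""

    def __init__(self, output_file: str) -> None:
        super().__init__()
        self.output_file = output_file

    def _getState(self) -> str:
        # An existing output file is taken as evidence of an earlier
        # successful run, so a restarted driver would not run the job again.
        if os.path.exists(self.output_file):
            return DONE
        return super()._getState()


with tempfile.TemporaryDirectory() as tmp:
    job = OutputCheckingJob(os.path.join(tmp, "result.out"))
    before = job.state
    with open(job.output_file, "w") as fh:
        fh.write("finished\n")
    after = job.state

print(before, after)  # waiting done
```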

class schrodinger.job.queue.JobControlJob(command, command_dir=None, name=None, max_retries=None, timeout=None, launch_timeout=None, launch_env_variables=None, **kwargs)
Bases: schrodinger.job.queue.BaseJob

This class defines a job control job to be run under JobDJ.

cancelSubmitted()
If the job is still in the ‘submitted’ state, cancel it, purge the job record and set the job handle to None. Return True if this was successful, False otherwise.

doCommand(host, local)
Launch the job on the specified host using jobcontrol.launch_job(). The -LOCAL flag is added to the job invocation command if local is True.
Parameters:
- host (str) – Host on which the job will be executed.
- local (bool) – Should “-LOCAL” be appended to the command?

getCommand()
A hook method that allows the command to be generated at run time.

getJob()
Return the job record as a schrodinger.job.jobcontrol.Job instance. Returns None if the job hasn’t been launched.

getStatusStrings()

kill()
Send a kill request to the job control managed job.

maxFailuresReached(msg)
Print an error summary, including the last 20 lines from each log file in the LogFiles list of the job record.

retryFailure(max_retries=0)
This method is called when the job has failed and JobDJ needs to know whether the job should be retried. JobDJ’s value for the max_retries parameter is passed in, to be used when the job doesn’t have its own max_retries value. Return True if this job should be retried, otherwise False.
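The fallback described for retryFailure (use the job's own max_retries if it has one, otherwise the value JobDJ passes in) can be sketched as follows. The RetryPolicy class and its failure counter are hypothetical; how JobControlJob actually counts failures is not documented here.

```python
from typing import Optional


class RetryPolicy:
    """Hypothetical model of the retryFailure decision; not the real code."""

    def __init__(self, max_retries: Optional[int] = None) -> None:
        self.max_retries = max_retries  # None: defer to JobDJ's value
        self.failures = 0

    def retryFailure(self, max_retries: int = 0) -> bool:
        # Count this failure, then compare against the job's own limit if it
        # has one, or against the value JobDJ passed in otherwise.
        self.failures += 1
        limit = self.max_retries if self.max_retries is not None else max_retries
        return self.failures <= limit


shared_limit = RetryPolicy()            # no per-job limit
decisions = [shared_limit.retryFailure(max_retries=2) for _ in range(3)]
print(decisions)  # [True, True, False]

own_limit = RetryPolicy(max_retries=0)  # the per-job limit wins
print(own_limit.retryFailure(max_retries=5))  # False
```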

runsLocally()

update()
Check for changes in job status and update the object appropriately (mark it for restart, etc.). Raises a RuntimeError if an unknown Job Status or ExitStatus is encountered.

usesJobServer()
Detect, by looking at the jobId, whether this job uses a job server. Since the jobId is only set once, the answer is cached (in _uses_job_server) once it is established.

class schrodinger.job.queue.JobDJ(hosts=None, local=False, max_retries=None, default_max_retries=0, max_failures=None, verbosity='quiet', job_class=<class 'schrodinger.job.queue.JobControlJob'>, update_delay=None)
Bases: object

Class for running commands/jobs in parallel under job control.

Create an instance of this class, add commands to run with .addJob(), and then call run(). Alternatively, use the updatedJobs() iterator to get access to each job as it changes state.

active_jobs

addJob(job, add_connected=True, **kwargs)
Add a job to run. If job is not a BaseJob instance, a BaseJob instance is constructed with job as the first argument. The default BaseJob class for the JobDJ instance can be specified in the constructor (the job_class parameter). Additional keyword arguments are passed on to the Job constructor.

All job prerequisites and dependencies need to be specified before adding a job to JobDJ.

Parameters: add_connected (bool) – If True, for jobs with dependencies only one job per connected group should be added, and all connected jobs will be discovered and added automatically. If False, it is the user’s responsibility to make sure that any prerequisites of a job are also added.
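The wrapping rule in addJob (anything that is not already a job object becomes the first argument of the configured job class, with extra keyword arguments passed along) can be sketched with hypothetical classes. Job, CommandJob and MiniDJ are illustrative stand-ins for BaseJob, the job_class default and JobDJ; the add_connected discovery step is omitted.

```python
from typing import Any, List, Type


class Job:
    """Hypothetical stand-in for BaseJob."""


class CommandJob(Job):
    """Hypothetical default job class, playing the role of job_class."""

    def __init__(self, command: List[str], **kwargs: Any) -> None:
        self.command = command
        self.options = kwargs


class MiniDJ:
    def __init__(self, job_class: Type[CommandJob] = CommandJob) -> None:
        self.job_class = job_class
        self.jobs: List[Job] = []

    def addJob(self, job: Any, **kwargs: Any) -> None:
        # Anything that is not already a Job becomes the first argument of the
        # configured job class; extra keyword arguments are passed along.
        if not isinstance(job, Job):
            job = self.job_class(job, **kwargs)
        self.jobs.append(job)


dj = MiniDJ()
dj.addJob(CommandJob(["impact", "input1.inp"]))     # already a job: added as is
dj.addJob(["impact", "input2.inp"], max_retries=1)  # wrapped in CommandJob
print([type(j).__name__ for j in dj.jobs])  # ['CommandJob', 'CommandJob']
print(dj.jobs[1].options)                   # {'max_retries': 1}
```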

all_jobs

disableSmartDistribution()
Disable smart distribution of jobs.

Smart distribution allows subjobs to run on the machine that JobDJ is running on when JobDJ itself is running under a queuing system. This is usually desirable, since the JobDJ process doesn’t generally consume significant computational resources and you don’t want to leave a queue slot mostly idle.

done_jobs
Successfully completed jobs, sorted in the order in which JobDJ marked them as completed.

failed_jobs

getActiveProcCounts()
Return a dictionary containing the number of active jobs on each host.

hasStarted()
Return True if JobDJ has already started.

isComplete()
Return True if JobDJ has completed, False otherwise.

killJobs()
Kill all active jobs.

markForRestart(job, action)
Mark a job as dead, but make sure that it gets restarted.
Parameters: action – Describes the reason the job is being restarted.

printStatus(job=None, action=None)
Print the status of JobDJ and the action/status for the job. If no job is specified, print the status header. If no action is specified, the status_string attribute of the job is used.

run(status_change_callback=None, periodic_callback=None, callback_interval=300, restart_failed=True)
Call this method to run all jobs that have been added. The method returns control when all jobs have completed.
Parameters:
- status_change_callback – A command to call every time the status changes. An alternative approach is to use the updatedJobs generator, which makes it easy to determine which job changed its state.
- periodic_callback (callable) – A command to call periodically, regardless of whether job status has changed or not. The function will be called without any arguments.
- callback_interval (int) – The interval at which the periodic callback will be called. This interval is only approximately enforced and depends on the timing delay settings (e.g. MONITOR_DELAY).
- restart_failed (bool) – True (default) if previously failed jobs should be restarted, False if not.
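Why callback_interval is only approximate can be seen in a simple polling loop: the periodic callback can fire only between polls, so its timing is quantized by the poll delay. The sketch below is a hypothetical model, not JobDJ.run; poll_loop, is_finished and monitor_delay are illustrative names, and this sketch measures intervals in seconds.

```python
import time
from typing import Callable, List, Optional


def poll_loop(
    is_finished: Callable[[], bool],
    periodic_callback: Optional[Callable[[], None]] = None,
    callback_interval: float = 300,
    monitor_delay: float = 1.0,
) -> None:
    """Poll until is_finished() is true, calling periodic_callback roughly
    every callback_interval seconds (hypothetical model of run())."""
    last_call = time.monotonic()
    while not is_finished():
        time.sleep(monitor_delay)  # the callback can only fire between polls
        now = time.monotonic()
        if periodic_callback is not None and now - last_call >= callback_interval:
            periodic_callback()  # called without arguments, as documented
            last_call = now


polls = {"count": 0}


def finished_after_five_polls() -> bool:
    polls["count"] += 1
    return polls["count"] > 5


ticks: List[float] = []
poll_loop(finished_after_five_polls, lambda: ticks.append(time.monotonic()),
          callback_interval=0.0, monitor_delay=0.001)
print(len(ticks))  # 5
```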

setHostList(host_list)
Tell JobDJ to use the specified host list. Repeated hostnames combine their available CPUs and keep the list position of the first occurrence. Active jobs are not affected by a change in the host list.
Parameters: host_list (a list of (<hostname>, <available cpus>) tuples, where <hostname> is a string and <available cpus> is an integer) – A host list specification.
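The merging rule for repeated hostnames can be expressed in a few lines, because Python dicts preserve insertion order: the first occurrence of a host fixes its position, and later occurrences only add to its CPU count. merge_host_list is a hypothetical helper illustrating the documented rule, not the module's code.

```python
from typing import Dict, List, Tuple


def merge_host_list(host_list: List[Tuple[str, int]]) -> List[Tuple[str, int]]:
    """Combine repeated hostnames, summing their CPU counts and keeping the
    position of each host's first occurrence."""
    totals: Dict[str, int] = {}
    for host, cpus in host_list:
        # Insertion order is preserved, so the first occurrence fixes the slot.
        totals[host] = totals.get(host, 0) + cpus
    return list(totals.items())


print(merge_host_list([("hostA", 2), ("hostB", 4), ("hostA", 3)]))
# [('hostA', 5), ('hostB', 4)]
```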

total_active
The number of jobs currently running.

total_added
The number of individual jobs that have been added to the JobDJ instance.

total_failed
The number of jobs that have failed.

total_finished
The number of jobs that have finished successfully.

updatedJobs(periodic_callback=None, callback_interval=300, restart_failed=True)
A generator that starts job distribution and yields each job as its status changes. A status change occurs when a job is started, when it finishes, or when it fails. The state property of the yielded job can be examined to determine its current state. Use as:

    for job in jobdj.updatedJobs():
        if job.state == DONE and isinstance(job, JobControlJob):
            print("%s is done." % job.getJob().JobId)

Parameters: restart_failed (bool) – True (default) if previously failed jobs should be restarted, False if not.

waiting_jobs
Jobs waiting to be started.

class schrodinger.job.queue.LinkedList
Bases: object

A doubly linked list, providing constant-time addition, size, and truth checks. It provides constant-time removal if you have the node object in hand, and linear-time iteration without copying while allowing removals or additions to the list during iteration.

add(value)
Add a node to the list.

remove(node)
Remove a node from the list.

reverse_iter()
Iterate from tail to head over the list, yielding a (node, value) tuple for each element.

class schrodinger.job.queue.LinkedListNode(value, prev=None)
Bases: object

A node for the LinkedList class, holding a value and references to the previous and next nodes in the list.
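The properties claimed for LinkedList (constant-time add and remove given the node, and iteration that tolerates removals and additions) follow from a standard doubly linked list in which a removed node keeps its own pointers. The sketch below is a hypothetical illustration: returning the node from add and yielding (node, value) from forward iteration are assumptions modelled on reverse_iter, not confirmed details of the real class.

```python
from typing import Any, Iterator, Optional, Tuple


class Node:
    """Hypothetical node: a value plus links to its neighbours."""

    def __init__(self, value: Any, prev: Optional["Node"] = None) -> None:
        self.value = value
        self.prev = prev
        self.next: Optional["Node"] = None


class DoublyLinkedList:
    """Minimal sketch of the behaviour described for LinkedList."""

    def __init__(self) -> None:
        self.head: Optional[Node] = None
        self.tail: Optional[Node] = None
        self._size = 0

    def add(self, value: Any) -> Node:
        # Append at the tail in O(1); the node is returned for O(1) removal.
        node = Node(value, prev=self.tail)
        if self.tail is None:
            self.head = node
        else:
            self.tail.next = node
        self.tail = node
        self._size += 1
        return node

    def remove(self, node: Node) -> None:
        # Unlink in O(1). The removed node keeps its own prev/next pointers,
        # so an iterator currently positioned on it can still move on.
        if node.prev is None:
            self.head = node.next
        else:
            node.prev.next = node.next
        if node.next is None:
            self.tail = node.prev
        else:
            node.next.prev = node.prev
        self._size -= 1

    def __len__(self) -> int:
        return self._size

    def __bool__(self) -> bool:
        return self._size > 0

    def __iter__(self) -> Iterator[Tuple[Node, Any]]:
        node = self.head
        while node is not None:
            yield node, node.value
            # Read .next after yielding: removing the current node is safe,
            # and nodes appended during iteration are still visited.
            node = node.next

    def reverse_iter(self) -> Iterator[Tuple[Node, Any]]:
        node = self.tail
        while node is not None:
            yield node, node.value
            node = node.prev


lst = DoublyLinkedList()
for letter in "abcd":
    lst.add(letter)

# Remove "b" while iterating; iteration continues with "c".
visited = []
for node, value in lst:
    visited.append(value)
    if value == "b":
        lst.remove(node)

print(visited)                             # ['a', 'b', 'c', 'd']
print([v for _, v in lst])                 # ['a', 'c', 'd']
print([v for _, v in lst.reverse_iter()])  # ['d', 'c', 'a']
```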

class schrodinger.job.queue.LocalhostJob
Bases: schrodinger.job.queue.RequirementsJob

This class should be used as a mixin on a job type that requires jobs to be run on localhost. Only one of these jobs will be run at a time.

getRequirements(jobdj)

init_count = 0

requirement = 'localhost'

class schrodinger.job.queue.PriorityQueue
Bases: object

A general-purpose priority queue.

pop()
Get the highest-priority item, removing it from the heap.

push(item)
Add an item to the heap. The item must define a __lt__ method, as required by the heapq module.
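The heapq requirement mentioned for push means items are compared with __lt__, and heapq always pops the smallest item first, so in a heapq-based queue "highest priority" corresponds to whatever the items' __lt__ orders first. The sketch below shows that with a hypothetical SimplePriorityQueue and a Task dataclass whose generated __lt__ compares only the priority field; it is not the module's own PriorityQueue.

```python
import heapq
from dataclasses import dataclass, field
from typing import Any, List


class SimplePriorityQueue:
    """Sketch of a heapq-backed priority queue (not the module's own class)."""

    def __init__(self) -> None:
        self._heap: List[Any] = []

    def push(self, item: Any) -> None:
        heapq.heappush(self._heap, item)  # items are ordered with __lt__

    def pop(self) -> Any:
        # heapq is a min-heap: the item that sorts first under __lt__ comes
        # out first, so a lower number means a higher priority here.
        return heapq.heappop(self._heap)

    def __len__(self) -> int:
        return len(self._heap)


@dataclass(order=True)
class Task:
    priority: int
    name: str = field(compare=False)  # excluded from the generated __lt__


q = SimplePriorityQueue()
for task in (Task(3, "write report"), Task(1, "fix build"), Task(2, "review")):
    q.push(task)
print([q.pop().name for _ in range(len(q))])  # ['fix build', 'review', 'write report']
```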

class schrodinger.job.queue.RequirementsJob
Bases: object

This is a base class indicating that a job has special requirements (e.g. runs only on localhost) and shouldn’t be run on the general pool of hosts.

getRequirements(jobdj)
Start a job if the requirements are available. If the job was started, return True; if not, return False.

requirement = None
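One way to picture how RequirementsJob and LocalhostJob fit together: each requirement name maps to a number of free slots, getRequirements starts the job only when a slot is free, and a single 'localhost' slot yields the documented "only one at a time" behavior. Every class here (ToyDJ, ToyRequirementsJob, ToyLocalhostJob) and the finish method are hypothetical; the real bookkeeping inside JobDJ is not described on this page.

```python
from typing import Dict, List, Optional


class ToyDJ:
    """Hypothetical driver tracking free slots per requirement."""

    def __init__(self, slots: Dict[str, int]) -> None:
        self.free = dict(slots)  # e.g. {"localhost": 1}
        self.started: List[str] = []


class ToyRequirementsJob:
    requirement: Optional[str] = None  # subclasses name the resource they need

    def __init__(self, name: str) -> None:
        self.name = name

    def getRequirements(self, jobdj: ToyDJ) -> bool:
        # Start the job only if a slot for its requirement is free.
        if jobdj.free.get(self.requirement, 0) > 0:
            jobdj.free[self.requirement] -= 1
            jobdj.started.append(self.name)
            return True
        return False

    def finish(self, jobdj: ToyDJ) -> None:
        jobdj.free[self.requirement] += 1  # hypothetical: release the slot


class ToyLocalhostJob(ToyRequirementsJob):
    requirement = "localhost"  # one slot means one such job at a time


dj = ToyDJ({"localhost": 1})
first, second = ToyLocalhostJob("first"), ToyLocalhostJob("second")
print(first.getRequirements(dj))   # True
print(second.getRequirements(dj))  # False: the localhost slot is taken
first.finish(dj)
print(second.getRequirements(dj))  # True
```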

class schrodinger.job.queue.RunningJobs
Bases: schrodinger.job.queue.LinkedList

A LinkedList subclass that tracks running jobs and keeps a tally of jobs running on each machine.

add(job)
Add a running job.

jobsCount()
Return a dict telling how many jobs are running on each host.

remove(node)
Remove a linked list node.

class schrodinger.job.queue.SubprocessJob(command, command_dir=None, timeout=None, stdout=-1, stderr=-1)
Bases: schrodinger.job.queue.BaseJob, schrodinger.job.queue.LocalhostJob

A job for running an external process. By default, stdout and stderr are collected and made available as the ‘stdout’ and ‘stderr’ attributes when the job is completed.

doCommand(*args, **kwargs)
Execute the command associated with this job via subprocess.
Return type: None

kill()
Send a termination request to the subprocess managed job.
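The defaults stdout=-1 and stderr=-1 match the value of subprocess.PIPE in CPython, which fits the statement that output is collected by default. The sketch below runs a command with the standard subprocess module and stores the captured text on the object; MiniSubprocessJob and its returncode attribute are hypothetical, and killing the process on timeout is a choice of this sketch rather than documented SubprocessJob behavior.

```python
import subprocess
import sys
from typing import List, Optional


class MiniSubprocessJob:
    """Hypothetical sketch: run a command and keep its output on the object."""

    def __init__(self, command: List[str], timeout: Optional[float] = None,
                 stdout: int = subprocess.PIPE, stderr: int = subprocess.PIPE) -> None:
        self.command = command
        self.timeout = timeout
        self._stdout_arg = stdout
        self._stderr_arg = stderr
        self.stdout: Optional[str] = None
        self.stderr: Optional[str] = None
        self.returncode: Optional[int] = None

    def doCommand(self) -> None:
        # Blocks until the process exits (or the timeout expires).
        proc = subprocess.Popen(self.command, stdout=self._stdout_arg,
                                stderr=self._stderr_arg, text=True)
        try:
            self.stdout, self.stderr = proc.communicate(timeout=self.timeout)
        except subprocess.TimeoutExpired:
            proc.kill()  # stop the process, then collect whatever it wrote
            self.stdout, self.stderr = proc.communicate()
        self.returncode = proc.returncode


job = MiniSubprocessJob([sys.executable, "-c", "print('hello')"])
job.doCommand()
print(repr(job.stdout), job.returncode)  # 'hello\n' 0
```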

schrodinger.job.queue.add_multi_job_finalizer(function, jobs, run_dir=None)
Create a finalizer function that will be called when all jobs in the jobs iterator are complete.
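A multi-job finalizer can be built from per-job finalizers: register the same callback on every job, track which jobs are still pending, and fire the real function once when the pending set becomes empty. The sketch below assumes, hypothetically, that per-job finalizers receive the completed job as their argument; FinalizableJob, complete and multi_job_finalizer are illustrative names, and run_dir handling is omitted.

```python
from typing import Callable, Iterable, List


class FinalizableJob:
    """Hypothetical job exposing addFinalizer and a way to complete it."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._finalizers: List[Callable[["FinalizableJob"], None]] = []

    def addFinalizer(self, function: Callable[["FinalizableJob"], None]) -> None:
        self._finalizers.append(function)

    def complete(self) -> None:
        for function in self._finalizers:
            function(self)


def multi_job_finalizer(function: Callable[[], None],
                        jobs: Iterable[FinalizableJob]) -> None:
    """Call function() once, after every job in jobs has completed."""
    jobs = list(jobs)
    pending = {id(job) for job in jobs}
    fired = False

    def on_complete(job: FinalizableJob) -> None:
        nonlocal fired
        pending.discard(id(job))
        if not pending and not fired:
            fired = True  # guard against a job completing more than once
            function()

    for job in jobs:
        job.addFinalizer(on_complete)


calls: List[str] = []
jobs = [FinalizableJob(name) for name in "abc"]
multi_job_finalizer(lambda: calls.append("all done"), jobs)
jobs[0].complete()
jobs[1].complete()
after_two = list(calls)
jobs[2].complete()
jobs[2].complete()  # a repeat completion does not fire the finalizer again
print(after_two, calls)  # [] ['all done']
```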

schrodinger.job.queue.backup_file(log_file, copy)
Back up the file log_file by copying it to .1, .2, etc.
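The one-line description of backup_file leaves the details open. The sketch below assumes one plausible reading: the backup goes to the first unused numbered name (log_file.1, log_file.2, ...), and copy chooses between copying (original kept) and renaming (original moved). Both the naming scheme and the meaning of copy are assumptions, and backup_file_sketch is a hypothetical name.

```python
import os
import shutil
import tempfile


def backup_file_sketch(log_file: str, copy: bool = True) -> str:
    """Back up log_file to the first unused name log_file.1, log_file.2, ...

    Assumed semantics: with copy=True the original stays in place; with
    copy=False it is renamed. Returns the backup path.
    """
    n = 1
    while os.path.exists(f"{log_file}.{n}"):
        n += 1
    backup = f"{log_file}.{n}"
    if copy:
        shutil.copy2(log_file, backup)  # keeps metadata such as mtime
    else:
        os.rename(log_file, backup)
    return backup


with tempfile.TemporaryDirectory() as tmp:
    log = os.path.join(tmp, "job.log")
    with open(log, "w") as fh:
        fh.write("first run\n")
    first = backup_file_sketch(log, copy=True)    # job.log stays in place
    second = backup_file_sketch(log, copy=False)  # job.log is moved away
    names = sorted(os.listdir(tmp))
    with open(second) as fh:
        moved_text = fh.read()

print(names)  # ['job.log.1', 'job.log.2']
```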

schrodinger.job.queue.get_command(base_command, host='localhost', procs=1, local=False)
Given a base command and additional launching specifications, return a modified command that is ready to pass to jobcontrol.launch_job.

schrodinger.job.queue.get_update_delay()
Return the delay to use for jobdb reads.

schrodinger.job.queue.use_jc_message()
Check whether job control messages need to be used for subjob status.