Package schrodinger :: Package job :: Module queue :: Class JobDJ

Class JobDJ

Class for running commands/jobs in parallel under jobcontrol.

Create an instance of this class, add commands to run with addJob(), and then call run(). Alternatively, use the updatedJobs() iterator to get access to each job as it changes state.
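
As a rough illustration of the workflow described above, the sketch below queues a few commands and runs them. It takes the JobDJ class as an argument so that the snippet does not depend on a Schrodinger installation. The helper name run_commands is hypothetical, and the exact command format accepted by addJob() is an assumption.

```python
def run_commands(jobdj_class, commands, hosts=None):
    """Queue every command on a new JobDJ-like object and run them all.

    jobdj_class is expected to behave like the JobDJ class documented
    here; this helper is a hypothetical wrapper, not part of the API.
    """
    jobdj = jobdj_class(hosts=hosts, verbosity="normal")
    for command in commands:
        # A non-BaseJob argument is wrapped by the default job class.
        jobdj.addJob(command)
    # run() returns control once all added jobs have completed.
    jobdj.run()
    return jobdj
```

In real use the first argument would presumably be the JobDJ class itself, with hosts given as a list of (hostname, cpus) tuples.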

Instance Methods
 
__init__(self, hosts=None, local=False, max_retries=None, default_max_retries=0, max_failures=None, verbosity="quiet", job_class=JobControlJob, update_delay=None)
Constructor.
 
hasStarted(self)
Returns True if JobDJ has already started.
 
isComplete(self)
Returns True if JobDJ has completed, False otherwise.
 
markForRestart(self, job, action)
Mark a job as dead, but make sure that it gets restarted.
 
_checkMessaging(self)
Check to see if use of job control messaging is possible.
 
__setstate__(self, state_dict)
Reset state from a pickle.
 
_getWaitingJobs(self)
 
_getDoneJobs(self)
 
_getActiveJobs(self)
 
active_jobs(self)
 
_getFailedJobs(self)
 
failed_jobs(self)
 
_getAllJobs(self)
 
all_jobs(self)
 
killJobs(self)
Kill all active jobs.
 
total_added(self)
The number of individual jobs that have been added to the JobDJ instance.
 
addJob(self, job, add_connected=True, **kwargs)
Add a job to run.
 
_queueJob(self, job)
Add a job to a queue heap.
 
dump(self, filename)
Pickle the JobDJ instance to the specified file name.
 
disableSmartDistribution(self)
Disable smart distribution of jobs.
 
_checkSmartDistribution(self)
Check to see if we should use smart distribution.
 
_availableHost(self, required_procs=1)
Return the next available host name from the host list.
 
getActiveProcCounts(self)
Return a dictionary containing the number of active jobs on each host.
 
setHostList(self, host_list)
Tell JobDJ to use the specified host list.
 
_host_list(self)
 
printStatus(self, job=None, action=None)
Prints the status of JobDJ and the action/status for the job.
 
_start(self)
Perform startup activities for JobDJ, including printing headers and marking jobs for restart if JobDJ is restarting.
 
run(self, status_change_callback=None, periodic_callback=None, callback_interval=300)
Call this method to run all jobs that have been added.
 
updatedJobs(self, periodic_callback=None, callback_interval=300)
A generator that starts job distribution and yields each job as its status changes.
 
_checkSubmitted(self, last_job)
Cancel a job in the 'submitted' state if possible and move it back into the job queue to be restarted.
 
_updateRunningJobs(self)
A generator to monitor the running jobs and update their state.
 
_monitorJobs(self, max_starts=5)
A private generator that implements the guts of the updatedJobs generator.
 
_getMessages(self)
Update any messages from jmonitor.
 
_jobFailed(self, job)
A method to keep track of completely failed jobs and die if max_failures is exceeded.
 
_jobFinished(self, job)
Take some standard steps on successful completion of a job.
 
_startJobs(self, max_starts=None)
A generator to start as many jobs as possible, given the current open hosts.
Class Variables
  waiting_jobs = property(_getWaitingJobs, doc= "Jobs waiting to...
  done_jobs = property(_getDoneJobs, doc= "Successfully complete...
Method Details

__init__(self, hosts=None, local=False, max_retries=None, default_max_retries=0, max_failures=None, verbosity="quiet", job_class=JobControlJob, update_delay=None)
(Constructor)

 

Constructor.

Parameters:
  • hosts (A list of (<hostname>, <available cpus>) tuples, where <hostname> is a string and <available cpus> is an integer.) - A host list specification. To convert a string representation from -HOST to a list, use the jobcontrol.host_str_to_list() function. If no hosts argument is specified, the hosts will be read from the job control SCHRODINGER_NODEFILE environment variable.
  • local (bool) - If True, then all jobs will run with -LOCAL. Default is False.
  • max_retries (int) - Number of allowed retries per subjob. If this is set, it is never overridden by the SCHRODINGER_MAX_RETRIES environment variable. If it is not set, the value in default_max_retries is used, and SCHRODINGER_MAX_RETRIES is allowed to override. If you wish to disable restarting altogether, set this value to zero.
  • default_max_retries (int) - Number of allowed retries per subjob. This value can always be overridden by the SCHRODINGER_MAX_RETRIES environment variable. Default is zero.
  • max_failures (int) - Total number of allowed subjob failures before JobDJ exits. If it is not defined, a default of zero will be used (exit on any failure after attempting to restart), but this can be overridden with the SCHRODINGER_MAX_FAILURES environment variable. To allow an unlimited number of subjob failures, set max_failures to the module level NOLIMIT constant.
  • verbosity (str) - There are three allowed verbosity levels: "quiet" - only warnings and errors are printed; "normal" - JobDJ progress is printed; and "verbose" - additional debugging info is printed. Default is "quiet".
  • job_class (BaseJob subclass) - The class to use as the default job constructor when the addJob argument is not a BaseJob instance.
  • update_delay (int) - The number of seconds to wait between job control database reads for JobControlJob jobs. (This delay is for an individual job, not for any job database read.) Default is None, which causes the module level constant UPDATE_DELAY to be used.
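
The precedence between max_retries, default_max_retries, and the SCHRODINGER_MAX_RETRIES environment variable can be sketched as follows. This is only an illustration of the rules stated above, not the library's implementation; the function name resolve_max_retries and its environ parameter are hypothetical.

```python
import os


def resolve_max_retries(max_retries=None, default_max_retries=0, environ=None):
    """Return the effective retry limit under the documented precedence."""
    environ = os.environ if environ is None else environ
    # An explicit max_retries is never overridden by the environment.
    if max_retries is not None:
        return max_retries
    # Otherwise the environment variable may override the default.
    env_value = environ.get("SCHRODINGER_MAX_RETRIES")
    if env_value is not None:
        return int(env_value)
    return default_max_retries
```

Passing max_retries=0 therefore disables restarting even when the environment variable is set.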

markForRestart(self, job, action)

 

Mark a job as dead, but make sure that it gets restarted.

Parameters:
  • action - Describes the reason the job is being restarted.

_checkMessaging(self)

 

Check to see if use of job control messaging is possible. It is necessary to get a backend object to communicate via jmonitor messages.

Returns:
True if messaging is possible, False if not.

total_added(self)

 

The number of individual jobs that have been added to the JobDJ instance.

Decorators:
  • @property

addJob(self, job, add_connected=True, **kwargs)

 

Add a job to run. If job is not a BaseJob instance, a BaseJob instance is constructed with job as the first argument. The default BaseJob class for the JobDJ instance can be specified in the constructor.

Additional keyword arguments are passed on to the Job constructor.

All job prerequisites and dependencies need to be specified before adding a job to JobDJ.

Parameters:
  • add_connected (bool) - If True, for jobs with dependencies only one job per connected group should be added and all connected jobs will be discovered and added automatically. If False, it is the user's responsibility to make sure that any prerequisites of a job are also added.
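
One way to picture what add_connected=True does is a traversal over prerequisite links starting from a single job. The sketch below uses a hypothetical SketchJob class and connected_jobs function; it illustrates the idea of discovering a connected group and does not reflect how JobDJ or BaseJob actually store dependencies.

```python
class SketchJob:
    """Hypothetical stand-in for a job with prerequisite links."""

    def __init__(self, name):
        self.name = name
        self.prereqs = set()
        self.dependents = set()

    def add_prereq(self, other):
        # Record the link in both directions so traversal can go either way.
        self.prereqs.add(other)
        other.dependents.add(self)


def connected_jobs(job):
    """Return every job reachable from job through prerequisite links."""
    seen = {job}
    stack = [job]
    while stack:
        current = stack.pop()
        for neighbor in current.prereqs | current.dependents:
            if neighbor not in seen:
                seen.add(neighbor)
                stack.append(neighbor)
    return seen
```

With a traversal like this, adding any one job of a group is enough to find the rest, which is why only one job per connected group needs to be added.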

disableSmartDistribution(self)

 

Disable smart distribution of jobs.

Smart distribution allows subjobs to run on the machine that JobDJ is running on when JobDJ itself is running under a queuing system. This is usually desirable since the JobDJ process doesn't generally consume significant computational resources and you don't want to leave a queue slot mostly idle.

_checkSmartDistribution(self)

 

Check to see if we should use smart distribution. Smart distribution runs jobs on the localhost when a host is consumed by the JobDJ driver.

Smart distribution is used if:

  1. smart distribution is enabled (the default),
  2. JobDJ is running under Job Control itself,
  3. the JobDJ job is running on a queue, and
  4. "localhost" is not in the JobDJ host list already.

In this situation, the preference is to always keep a single job running on localhost.

If the JobDJ job has a HostEntry that is present in the host list, the maximum number of jobs for that host is decreased by one.

This ensures a "smart" job distributor that runs jobs on the same host that the JobDJ driver is on, rather than leaving it mostly idle.
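
The four conditions listed above amount to a simple conjunction. The following sketch expresses them as a hypothetical function; the argument names are illustrative, and how JobDJ actually detects each condition is not shown here.

```python
def use_smart_distribution(enabled, under_job_control, on_queue, host_list):
    """Return True only when all four documented conditions hold."""
    host_names = [hostname for hostname, _cpus in host_list]
    return bool(
        enabled
        and under_job_control
        and on_queue
        and "localhost" not in host_names
    )
```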

_availableHost(self, required_procs=1)

 

Return the next available host name from the host list. Return None if all hosts are saturated.

Parameters:
  • required_procs (int) - Number of processors that this subjob will use.
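
A minimal sketch of this kind of host selection, assuming the host list is a list of (hostname, cpus) tuples and that active processor counts are available as a dictionary such as the one returned by getActiveProcCounts(). The function available_host is hypothetical and is not the private method itself.

```python
def available_host(host_list, active_counts, required_procs=1):
    """Return the first host with enough free processors, or None."""
    for hostname, cpus in host_list:
        free = cpus - active_counts.get(hostname, 0)
        if free >= required_procs:
            return hostname
    # Every host is saturated for this request.
    return None
```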

setHostList(self, host_list)

 

Tell JobDJ to use the specified host list.

Repeated hostnames will combine the available cpus and keep the list position of the first occurrence.

Active jobs are not affected by a change in the host list.

Parameters:
  • host_list (A list of (<hostname>, <available cpus>) tuples, where <hostname> is a string and <available cpus> is an integer.) - A host list specification.
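
The rule for repeated hostnames can be illustrated with a short sketch. The function merge_host_list is hypothetical; it only demonstrates combining cpu counts while keeping the position of the first occurrence, and relies on dictionaries preserving insertion order (Python 3.7 and later).

```python
def merge_host_list(host_list):
    """Combine repeated hostnames, keeping first-occurrence order."""
    merged = {}
    for hostname, cpus in host_list:
        merged[hostname] = merged.get(hostname, 0) + cpus
    return list(merged.items())
```

For example, [("a", 2), ("b", 1), ("a", 3)] would collapse to [("a", 5), ("b", 1)].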

_host_list(self)

 
Decorators:
  • @property

printStatus(self, job=None, action=None)

 

Prints the status of JobDJ and the action/status for the job.

If no job is specified, prints the status header.

If no action is specified, the status_string attribute of the job is used.

_start(self)

 

Perform startup activities for JobDJ, including printing headers and marking jobs for restart if JobDJ is restarting.

Raises:
  • RuntimeError - If no jobs have been added to JobDJ.

run(self, status_change_callback=None, periodic_callback=None, callback_interval=300)

 

Call this method to run all jobs that have been added. The method will return control when all jobs have completed.

Parameters:
  • status_change_callback - A command to call every time the status changes. An alternative approach is to use the updatedJobs generator, which makes it easy to determine which job changed its state.
  • periodic_callback (callable) - A command to call periodically, regardless of whether job status has changed or not. The function will be called without any arguments.
  • callback_interval (int) - The interval at which the periodic callback will be called. This time is only approximately enforced and will depend on the timing delay settings (e.g. MONITOR_DELAY).
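
As a hedged example of the periodic callback, the sketch below passes a zero-argument function to run() and counts how often it fires. The helper run_with_heartbeat is hypothetical, and jobdj is assumed to be any object whose run() accepts the keyword arguments documented above.

```python
def run_with_heartbeat(jobdj, interval=60):
    """Run all jobs and return how many times the heartbeat was called."""
    calls = []

    def heartbeat():
        # Invoked with no arguments, as documented for periodic_callback.
        calls.append(len(calls) + 1)

    jobdj.run(periodic_callback=heartbeat, callback_interval=interval)
    return len(calls)
```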

updatedJobs(self, periodic_callback=None, callback_interval=300)

 

A generator that starts job distribution and yields each job as its status changes. A status change occurs when a job starts, finishes, or fails. The state property of the yielded job can be examined to determine its current state.

Use as:

   # DONE and JobControlJob are assumed to come from this module.
   for job in jobdj.updatedJobs():
       if job.state == DONE and isinstance(job, JobControlJob):
           print("%s is done." % job.getJob().JobId)

_checkSubmitted(self, last_job)

 

Cancel a job in the 'submitted' state if possible and move it back into the job queue to be restarted.

This method is only useful to call when a known non-queue host is available.

_updateRunningJobs(self)

 

A generator to monitor the running jobs and update their state. Yields a job every time it finishes or fails.

_monitorJobs(self, max_starts=5)

 

A private generator that implements the guts of the updatedJobs generator. It yields a job every time its status changes, and also yields None at the end of each cycle so that the caller can gain control if desired.

(Despite being private, the pipeline.stage module calls this generator directly. So, we can't assume that this function has been called from updatedJobs.)

The JobDJ._start method must be called before invoking this method.

Parameters:
  • max_starts (int) - The maximum number of jobs to start before pausing to check for job status updates.
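
The yield pattern described above, a job for every status change plus None at the end of each cycle, can be sketched with plain generators. Both functions below are hypothetical illustrations of the pattern and do not reproduce the private generator.

```python
def monitor_cycles(cycles):
    """Yield each changed job, then None at the end of every cycle."""
    for changed_jobs in cycles:
        for job in changed_jobs:
            yield job
        # The None marker hands control back to the caller once per cycle.
        yield None


def changed_only(monitor):
    """Collect the yielded jobs, dropping the end-of-cycle markers."""
    return [job for job in monitor if job is not None]
```

A caller that only cares about status changes would filter out the None values in this way, while a caller that needs to do periodic work can act on them.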

_jobFinished(self, job)

 

Take some standard steps on successful completion of a job.

Add the completed job to the finished list, call its finalize method, and look for new jobs from completed dependencies.

_startJobs(self, max_starts=None)

 

A generator to start as many jobs as possible, given the current open hosts. Each job that is successfully started (or restarted) is yielded. The optional max_starts argument can be used to limit the number of jobs that will be started before this generator terminates.

Parameters:
  • max_starts (int) - The maximum number of jobs to start.
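
The effect of max_starts can be illustrated with a small generator. This sketch assumes a simple list of waiting jobs and a count of open slots; start_jobs is a hypothetical function, not the private method.

```python
def start_jobs(waiting, open_slots, max_starts=None):
    """Yield jobs while slots remain and the start limit is not reached."""
    queue = list(waiting)  # Work on a copy so the caller's list is untouched.
    started = 0
    while queue and open_slots > 0:
        if max_starts is not None and started >= max_starts:
            return
        job = queue.pop(0)
        open_slots -= 1
        started += 1
        yield job
```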

Class Variable Details

waiting_jobs

Value:
property(_getWaitingJobs, doc= "Jobs waiting to be started.")

done_jobs

Value:
property(_getDoneJobs, doc="Successfully completed jobs, sorted into the order they were marked as completed by JobDJ.")