Package schrodinger :: Package pipeline :: Module pipeline :: Class Pipeline

Class Pipeline

A controller responsible for running the stages in the correct order.

Pipeline parses the input file, creates instances of all IO objects, stage objects, and stage job objects, submits the stages in the appropriate directories, and waits for them to finish. Once a stage finishes, Pipeline starts any stages that depend on its output. When all stages are complete, it presents the user with the USER OUTPUT objects, which are the IO output objects that the pipeline is to return.
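The dependency-driven ordering described above can be illustrated with a small standalone sketch. It does not use the Pipeline class or any schrodinger module; the function name run_order, the stage names, and the variable names are all hypothetical, and the real class runs stages as jobs rather than computing an order up front.

```python
def run_order(stages, initial_vars):
    """Return stage names in an order where every input is available first.

    stages: dict mapping stage name -> (input names, output names).
    initial_vars: names of variables defined directly in the input file.
    Both structures are assumptions made for this sketch.
    """
    available = set(initial_vars)
    pending = dict(stages)
    order = []
    while pending:
        # A stage is ready once all of its inputs have been produced.
        ready = [name for name, (ins, _outs) in pending.items()
                 if set(ins) <= available]
        if not ready:
            raise RuntimeError("Stages with unmet inputs: %s" % sorted(pending))
        for name in ready:
            _ins, outs = pending.pop(name)
            available.update(outs)  # finishing a stage unlocks its dependents
            order.append(name)
    return order


# Hypothetical three-stage workflow, deliberately listed out of order.
stages = {
    "DOCK": (["PREPPED"], ["POSES"]),
    "PREP": (["LIGANDS"], ["PREPPED"]),
    "FILTER": (["POSES"], ["HITS"]),
}
order = run_order(stages, ["LIGANDS"])  # ["PREP", "DOCK", "FILTER"]
```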

Instance Methods
 
__init__(self, jobname='pipeline', prog=None, logfh=None, restart_file=None)
 
setOptions(self, subjob_hosts=None, subjob_local=None, njobs=None, adjust=None, force=None, cleanup=None, max_retries=None)
Set the options of the pipeline.
 
readNewFormatVariable(self, varname, keywords)
 
readNewFormatStage(self, stagename, keywords)
 
_addUserOut(self, value)
 
_setStructOut(self, value)
 
readNewFormat(self, command_file)
 
readFile(self, command_file)
Read a Pipeline input file.
 
checkUserOutputs(self)
Make sure that all specified user outputs are variable names that are returned by a stage.
 
createStage(self, stagename, inIOnames, outIOnames, stage_class, keywords)
Create a stage object and add it to the pipeline.
 
_addVariable(self, var_obj, varname)
Add an initiation variable to the pipeline.
 
reportParameters(self)
Print the parameters of each stage.
 
inputsForStagesDefined(self)
Check if the inputs for all stages are either specified in the input variables or are outputs from other stages.
 
startReadyStages(self)
Start all stages that are ready to be run.
 
setStageOptions(self, stageobj)
Propagate the pipeline options (hosts, ncpus, etc) to the specified stage.
 
requestCpus(self)
 
updateStagesCpus(self)
Send messages to stages (if necessary) telling them how many processors to use from each host.
 
_submitStage(self, stagejob)
Start the specified stage.
 
_updateStageStatusRestarting(self)
Update the status of each stage job.
 
_updateStageStatus(self)
Update the status of each RUNNING stage job object.
 
__getstate__(self)
 
dump(self)
Dump the Pipeline instance to a restart file.
 
_getMaxRetries(self)
Unimplemented.
 
getStageByJobid(self, jobid)
 
handleStageMessages(self)
 
distributeCpus(self)
Called when extra CPUs become available (as given back by the stages using them).
 
run(self, idle_function=None)
Run the Pipeline.
 
getUserOutputs(self)
Return a list of pipeio objects that are to be presented to the user at the end of the Pipeline run.
 
getStructureOutput(self)
 
getUserOutputFiles(self)
Return a list of files for all user (final) outputs of Pipeline.
 
printAction(self, stagename, stagejobid, action)
Print an action for stage stagename.
 
cleanupIntermediateFiles(self, stagejob)
Remove any stage outputs that are no longer needed.
 
getUsedHosts(self, host_pool)
Return a dictionary of hosts that are CURRENTLY checked out (used) by all stages combined (within specified host_pool).
Method Details

setOptions(self, subjob_hosts=None, subjob_local=None, njobs=None, adjust=None, force=None, cleanup=None, max_retries=None)

 

Set the options of the pipeline.

Call this function before calling pipeline.run() to set hosts/njobs/etc. When restarting, call this function to modify the options.
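A minimal sketch of the call order this implies, using only method names and keyword arguments listed on this page. The helper configure_and_run is hypothetical, the pipeline argument is assumed to be an already constructed Pipeline instance, and calling readFile before setOptions is an assumption. The expected value formats for subjob_hosts and njobs are not documented here, so the sketch passes them through untouched.

```python
def configure_and_run(pipeline, command_file, subjob_hosts=None, njobs=None):
    """Read the input file, set options, run, and return the output files.

    pipeline is assumed to expose readFile, setOptions, run, and
    getUserOutputFiles with the signatures shown on this page.
    """
    pipeline.readFile(command_file)
    # Options must be set before run() is called.
    pipeline.setOptions(subjob_hosts=subjob_hosts, njobs=njobs)
    pipeline.run()
    return pipeline.getUserOutputFiles()
```

Because run and readFile are documented to raise RuntimeError, a caller would typically wrap this helper in a try/except block.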

readFile(self, command_file)

 

Read a Pipeline input file.

Raises:
  • RuntimeError - Raised if there is a problem with the input file.

checkUserOutputs(self)

 

Make sure that all specified user outputs are variable names that are returned by a stage. This is done so that typos in the input file cause an early failure.

Raises:
  • RuntimeError - Raised on invalid USEROUT name.
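The check described above might look roughly like the following standalone sketch. It is not the library's implementation; the function name check_user_outputs and the dictionary layout are hypothetical.

```python
def check_user_outputs(user_outputs, stage_outputs):
    """Raise RuntimeError if a user output is not produced by any stage.

    user_outputs: list of USEROUT variable names.
    stage_outputs: dict mapping stage name -> list of output variable names.
    Both structures are assumptions made for this sketch.
    """
    produced = set()
    for outs in stage_outputs.values():
        produced.update(outs)
    unknown = [name for name in user_outputs if name not in produced]
    if unknown:
        raise RuntimeError("Invalid USEROUT name(s): %s" % ", ".join(unknown))


# A misspelled name such as "HITZ" would raise here instead of being
# discovered only after every stage has run.
check_user_outputs(["HITS"], {"FILTER": ["HITS"], "DOCK": ["POSES"]})
```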

createStage(self, stagename, inIOnames, outIOnames, stage_class, keywords)

 

Create a stage object and add it to the pipeline.

Parameters:
  • stagename - Name of the stage.
  • inIOnames - Input pipeio object names.
  • outIOnames - Output pipeio object names.
  • stage_class - module.class defining the stage.
  • keywords (list) - All keywords for the stage, a list of (keyword, value) tuples.

_addVariable(self, var_obj, varname)

 

Add an initiation variable to the pipeline.

Raises:
  • RuntimeError - Raised if there is a problem with the variable name or if the variable name is already used.

inputsForStagesDefined(self)

 

Check if the inputs for all stages are either specified in the input variables or are outputs from other stages.

Raises:
  • RuntimeError - Raised if inputs are not defined, as this indicates the input file is invalid.
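As an illustration only, the same validation can be written as a standalone function. The name inputs_for_stages_defined and the data layout are hypothetical, and the sketch checks only that every input name is defined somewhere, not that the stages can be ordered.

```python
def inputs_for_stages_defined(stages, input_vars):
    """Raise RuntimeError if any stage input is neither an input variable
    nor an output of some stage.

    stages: dict mapping stage name -> (input names, output names).
    input_vars: names of variables defined in the input file.
    Both structures are assumptions made for this sketch.
    """
    defined = set(input_vars)
    for _ins, outs in stages.values():
        defined.update(outs)
    for name, (ins, _outs) in stages.items():
        missing = [var for var in ins if var not in defined]
        if missing:
            raise RuntimeError(
                "Stage %s has undefined input(s): %s" % (name, ", ".join(missing)))
    return True
```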

startReadyStages(self)

 

Start all stages that are ready to be run.

When restarting, start WAITING and RESTARTING stages. When NOT restarting, start only WAITING stages. Returns the number of stages that were started (the return value is not currently used).
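The status rule can be sketched as follows. This is a simplified standalone model: the status strings other than WAITING and RESTARTING, the function name, and the dictionary layout are assumptions, and the sketch skips the check that a stage's inputs are actually ready.

```python
WAITING = "WAITING"
RESTARTING = "RESTARTING"
RUNNING = "RUNNING"      # assumed status name
COMPLETED = "COMPLETED"  # assumed status name


def start_ready_stages(statuses, restarting):
    """Mark startable stages as RUNNING and return how many were started.

    statuses: dict mapping stage name -> status string (modified in place).
    restarting: True when the pipeline is being restarted.
    """
    startable = {WAITING, RESTARTING} if restarting else {WAITING}
    started = 0
    for name in statuses:
        if statuses[name] in startable:
            statuses[name] = RUNNING
            started += 1
    return started


statuses = {"PREP": COMPLETED, "DOCK": RESTARTING, "FILTER": WAITING}
n_fresh = start_ready_stages(dict(statuses), restarting=False)   # 1
n_restart = start_ready_stages(dict(statuses), restarting=True)  # 2
```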

_updateStageStatusRestarting(self)

 

Update the status of each stage job.

This runs when restarting a pipeline. If a job has failed for any reason, it marks it to be restarted. If a job is still running or submitted, it does nothing.

_updateStageStatus(self)

 

Update the status of each RUNNING stage job object.

This is periodically run by the pipeline. It checks the status of RUNNING jobs and updates them if they have completed or failed.

distributeCpus(self)

 

Called when extra CPUs become available (as given back by the stages using them). Will distribute the freed-up CPUs to other stages.

run(self, idle_function=None)

 

Run the Pipeline.

Parameters:
  • idle_function - A routine to call periodically while running the pipeline.
Raises:
  • RuntimeError - Raised if Pipeline failed for any reason.

printAction(self, stagename, stagejobid, action)

 

Print an action for stage stagename.

Parameters:
  • stagename - Name of the stage.
  • stagejobid - The jobid of the stage.
  • action - The latest action of the stage.

cleanupIntermediateFiles(self, stagejob)

 

Remove any stage outputs that are no longer needed. Intermediate files are any outputs of a previously completed stage. They are no longer needed if they are not marked as a USEROUT, and they are not needed as input for any yet-to-be-run stage.
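The removal rule stated above can be expressed as a small standalone sketch. It only decides which outputs are removable and deletes nothing; the function name removable_outputs and the data layout are hypothetical.

```python
def removable_outputs(completed_outputs, user_outputs, pending_stage_inputs):
    """Return the completed-stage outputs that are no longer needed.

    completed_outputs: output names of stages that have already finished.
    user_outputs: names marked as USEROUT.
    pending_stage_inputs: dict mapping each yet-to-be-run stage -> input names.
    All three structures are assumptions made for this sketch.
    """
    still_needed = set(user_outputs)
    for ins in pending_stage_inputs.values():
        still_needed.update(ins)
    # Keep the original order so the result is predictable.
    return [name for name in completed_outputs if name not in still_needed]


to_remove = removable_outputs(
    completed_outputs=["PREPPED", "POSES", "HITS"],
    user_outputs=["HITS"],
    pending_stage_inputs={"RESCORE": ["POSES"]},
)  # ["PREPPED"]
```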