schrodinger.pipeline.pipeline module

Classes for running pipelines.

The main class is called Pipeline. This class parses the input file, creates appropriate stages, and runs them in their own subdirectories.

The StageJob class represents a pipeline job linked to a specific stage.

The IO (In/Out object) classes (defined in pipeio.py) represent information that is passed from one stage to another, such as a list of files. They are also called Variables.

Input Object Syntax

The Pipeline input file is used to specify which stages to run, how to run them (parameters), what to use for input, and where to send the output. An example input file looks like:

SET MY_INPUT
    VARCLASS Structures
    FILE /home/adzhigir/vsw_testing/20confs.mae

The SET line value (MY_INPUT) specifies the name of the IO object. The VARCLASS value (Structures) specifies the PipeIO class to create; Pipeline uses it to determine which variable object to instantiate. Pipeline first searches the schrodinger.pipeline.pipeio module for the class named on this line. If it is not found there, Pipeline assumes a custom class is specified as a fully qualified <module>.<class> path. (In this case, make sure the custom module is in your PYTHONPATH.)

All lines following VARCLASS define what information to put into this variable; in this case, a Maestro file (20confs.mae).
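As a rough illustration of the lookup order described above, the VARCLASS value could be resolved with the importName() helper documented later in this module. This is a sketch of the behavior, not the actual Pipeline code:

from schrodinger.pipeline.pipeline import importName

varclass = "Structures"   # value of the VARCLASS keyword
try:
    # First look for the class in the standard pipeio module.
    var_class = importName("schrodinger.pipeline.pipeio", varclass)
except ImportError:
    # Otherwise assume a fully qualified <module>.<class> path to a
    # custom class; the module must be importable from PYTHONPATH.
    modulename, classname = varclass.rsplit(".", 1)
    var_class = importName(modulename, classname)
# Pipeline would then construct the variable from the remaining keyword
# lines (FILE, etc.); the constructor arguments depend on the class.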

Stage Syntax

An example stage definition looks like:

STAGE MY_STAGE
    STAGECLASS  macromodel.ConfSearchStage
    INPUT       MY_INPUT
    OUTPUT      MY_OUTPUT
    FFLD        MMFFS

The STAGE line value (MY_STAGE) specifies the name of the stage. The STAGECLASS keyword specifies the <module>.<class> that defines the stage; Pipeline uses it to determine which stage to create. Pipeline also searches the schrodinger.pipeline.stages namespace for the specified module. Please make sure the module is in your PYTHONPATH.

See schrodinger.pipeline.stages.combine for an example on how to write a stage module.

Input variables for the stage are specified via INPUT keywords, and outputs via OUTPUT keywords. The rest of the keywords tell the stage how to run.
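Putting the two examples together, a complete input file might look like the following. The USEROUT line marks MY_OUTPUT as a final user output (see checkUserOutputs() below); the exact keyword syntax shown for it is an assumption:

SET MY_INPUT
    VARCLASS Structures
    FILE /home/adzhigir/vsw_testing/20confs.mae

STAGE MY_STAGE
    STAGECLASS  macromodel.ConfSearchStage
    INPUT       MY_INPUT
    OUTPUT      MY_OUTPUT
    FFLD        MMFFS

USEROUT MY_OUTPUT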

If you wish to run the Pipeline without using the pipeline startup machinery:

p = pipeline.Pipeline([options])
p.readFile(<input file>)
try:
    p.run()
except RuntimeError:
    ...

If restartability is important, specify the restart_file when constructing the Pipeline object.
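For example, using the constructor signature documented below (file names are placeholders):

from schrodinger.pipeline import pipeline

p = pipeline.Pipeline(jobname="myworkflow", restart_file="myworkflow.restart")
p.readFile("myworkflow.inp")
try:
    p.run()
except RuntimeError:
    ...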

To restart Pipeline, do:

p = pipeline.Restart(restart_file [, new options])
try:
    p.run()
except RuntimeError:
    ...

where restart_file is the same file that was passed to the Pipeline constructor when the initial instance was created.

Copyright Schrodinger, LLC. All rights reserved.

schrodinger.pipeline.pipeline.log(text)

Prints the specified text to the log pipe; adds a newline at the end.

schrodinger.pipeline.pipeline.logn(text)

Print the specified text to the log pipe with no newline. This is especially useful when printing progress periods.

schrodinger.pipeline.pipeline.add_host_lists(list1, list2)

Append hosts in list2 to list1.

Example:

    list1  = a:5,b:10
    list2  = a:2,c:12
    output = a:7,b:10,c:12

The order of hosts is retained (first list is given priority).
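A rough Python sketch of the merge described above, assuming each host list is an ordered list of [hostname, ncpus] pairs (the actual data structure used by Pipeline may differ):

def merge_host_lists(list1, list2):
    # Append hosts from list2 to list1, summing CPU counts for hosts that
    # appear in both; list1 is modified in place and keeps its order.
    for host, ncpus in list2:
        for entry in list1:
            if entry[0] == host:
                entry[1] += ncpus
                break
        else:
            list1.append([host, ncpus])

hosts = [["a", 5], ["b", 10]]
merge_host_lists(hosts, [["a", 2], ["c", 12]])
# hosts is now [["a", 7], ["b", 10], ["c", 12]]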

schrodinger.pipeline.pipeline.subtract_host_lists(list1, dict2)

Return available (not used) hosts. This function subtracts the used-host dict (dict2) from the available-host dict (list1).

Parameters
  • list1 (dict) – All available hosts (specified by user), with hostname as key and cpu count as value.

  • dict2 (dict) – All used hosts (used by stages), with hostname as key and cpu count as value.
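A hedged sketch of the subtraction described above, assuming both arguments are {hostname: ncpus} dictionaries; this illustrates the behavior, not the library implementation:

def available_hosts(list1, dict2):
    # Subtract the used-host counts from the available-host counts,
    # keeping only hosts that still have free CPUs.
    free = {}
    for host, ncpus in list1.items():
        remaining = ncpus - dict2.get(host, 0)
        if remaining > 0:
            free[host] = remaining
    return free

available_hosts({"a": 7, "b": 10}, {"a": 3})   # -> {"a": 4, "b": 10}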

schrodinger.pipeline.pipeline.importName(modulename, name)

Import a named object from a module in the context of this function.

For example, if you would like to create an instance of the Foo class from the bar.py module:

foo_class = importName("bar", "Foo")
foo_instance = foo_class()

Raises

ImportError – Raised when the object cannot be imported.
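For reference, an equivalent helper can be written with the standard importlib module. This is a minimal sketch, not necessarily the actual implementation:

import importlib

def import_name(modulename, name):
    # Import the module, then pull the named object out of it.
    try:
        module = importlib.import_module(modulename)
        return getattr(module, name)
    except (ImportError, AttributeError) as err:
        raise ImportError("Cannot import %s from %s: %s" % (name, modulename, err))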

class schrodinger.pipeline.pipeline.StageJob(stageobj, pipeline, module)

Bases: object

A “Job” that is used by Pipeline to run a Stage.

Each StageJob has a Stage object associated with it. This object is periodically dumped to a file in order to support restartability. The process is called “dumping” and the file is the dump file. When Pipeline is restarted, each stage object is recovered from the associated dump file to get the latest state.

__init__(stageobj, pipeline, module)
Parameters
  • pipeline – Reference to the Pipeline object.

  • stageobj (Stage) – Stage to run.

  • module – The module where the above stage is defined.

setUsedHosts(new_host_list)
getUnusedHostList()
updateStageCpus()

Based on current host usage and the number of needed CPUs, determine which hosts this stage should use, and send them to it in a message.

sendHostsToUse()

Send a message to the stage job telling it how many CPUs to use. Gets called periodically in case messages don't go through.

restart(action)

Mark this job to be restarted by the Pipeline.

finish()

Sets the pipe’s stage object to the final finished stage object from the dump file, and parses all of the outputs.

readyToRun(objects)

Return True if this StageJob has all inputs that are required for it to start.

printAction(action)

Call the Pipeline’s printAction method.

died(action)

Mark this stage as failed. The “action” gets saved in the “died_action” attribute, and will be printed out at the end of the workflow.

This gets called every time a StageJob dies by raising a RuntimeError exception.

printFailureMessage()

Print the failure status of the stage and the path to the log file.

updateFromDump(quiet=False)

Update this stage of the pipeline to the latest state from the dump file.

class schrodinger.pipeline.pipeline.Pipeline(jobname='pipeline', prog=None, logfh=None, restart_file=None)

Bases: object

A controller responsible for running the stages in the correct order.

Pipeline parses the input file, creates instances of all IO objects, stage objects, and stage job objects, submits the stages in the appropriate directories, and waits for them to finish. Once a stage finishes, it starts any stages that depend on its output. When all stages are complete, it presents the user with the USER OUTPUT objects - IO output objects that are to be returned by the pipeline.

__init__(jobname='pipeline', prog=None, logfh=None, restart_file=None)

Initialize self. See help(type(self)) for accurate signature.

setOptions(subjob_hosts=None, njobs=None, adjust=None, force=None, cleanup=None, max_retries=None)

Set the options of the pipeline.

Call this function before calling pipeline.run() to set hosts/njobs/etc. When restarting, call this function to modify the options.
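A hedged usage sketch; the keyword names come from the signature above, while the specific values and file names are placeholders:

from schrodinger.pipeline import pipeline

p = pipeline.Pipeline(jobname="vsw")
p.setOptions(njobs=10, max_retries=2)   # set options before run()
p.readFile("vsw.inp")
try:
    p.run()
except RuntimeError:
    ...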

readNewFormatVariable(varname, keywords)
readNewFormatStage(stagename, keywords)
readNewFormat(command_file)
readFile(command_file)

Read a Pipeline input file.

Raises

RuntimeError – Raised if there is a problem with the input file.

checkUserOutputs()

Make sure that all specified user outputs are variable names that are returned by a stage. This is done to fail early on typos in the input file.

Raises

RuntimeError – Raised on invalid USEROUT name.

createStage(stagename, inIOnames, outIOnames, stage_class, keywords)

Create a stage object and add it to the pipeline.

Parameters
  • stagename – Name of the stage.

  • inIOnames – Input pipeio object names.

  • outIOnames – Output pipeio object names.

  • stage_class – module.class defining the stage.

  • keywords (list) – All keywords for the stage, a list of (keyword, value) tuples.

Raises

RuntimeError or ImportError – Raised on input file error.

reportParameters()

Print the parameters of each stage.

inputsForStagesDefined()

Check if the inputs for all stages are either specified in the input variables or are outputs from other stages.

Raises

RuntimeError – Raised if inputs are not defined, as this indicates the input file is invalid.

startReadyStages()

Start all stages that are ready to be run.

When restarting, start WAITING and RESTARTING stages. When NOT restarting, start only WAITING stages. Return the number of stages that were started (not currently used).

setStageOptions(stageobj)

Propagate the pipeline options (hosts, ncpus, etc) to the specified stage.

requestCpus()
updateStagesCpus()

Send messages to stages (if necessary) telling them how many processors to use from each host.

dump()

Dumps the Pipeline instance to a restart file

getStageByJobid(jobid)
handleStageMessages()
distributeCpus()

Called when extra CPUs become available (as given back by the stages using them). Will distribute the freed-up CPUs to other stages.

run(idle_function=None)

Run the Pipeline.

Parameters

idle_function – A routine to call periodically while running the pipeline.

Raises

RuntimeError – Raised if Pipeline failed for any reason.

getUserOutputs()

Return a list of pipeio objects that are to be presented to the user at the end of the Pipeline run.

getStructureOutput()
getUserOutputFiles()

Return a list of files for all user (final) outputs of Pipeline.

printAction(stagename, stagejobid, action)

Print an action for stage stagename.

Parameters
  • stagejobid – The jobid of the stage.

  • action – The latest action of the stage.

cleanupIntermediateFiles(stagejob)

Remove any stage outputs that are no longer needed. Intermediate files are any outputs of a previously completed stage. They are no longer needed if they are not marked as a USEROUT, and they are not needed as input for any yet-to-be-run stage.

getUsedHosts(host_pool)

Return a dictionary of hosts that are CURRENTLY checked out (used) by all stages combined (within specified host_pool).

schrodinger.pipeline.pipeline.Restart(restart_file, restartbeg=False)

Recover a saved Pipeline instance.

Specify new options only if the settings need to change.

Returns a Pipeline instance recovered from the restart_file. You need to call run() on the returned instance in order to get the pipeline running.

Parameters

restartbeg – Whether to start failed stages from the beginning.

Raises

RuntimeError – Raised if a Pipeline can't be loaded from the specified file.
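A hedged usage sketch, based on the signature above (the file name is a placeholder):

from schrodinger.pipeline import pipeline

p = pipeline.Restart("myworkflow.restart", restartbeg=True)
try:
    p.run()
except RuntimeError:
    ...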