Other Modules

Pipelined Workflows

A Python pipeline workflow consists of multiple stages that are run serially, in parallel, or in a mixture of both. Each stage performs one specific operation on one or more input sets and returns one or more output sets. Each set is file-based and has one of the following types:

  1. Structures - a set of one or more structure file paths.
  2. Grid - a path to a grid file.
  3. Text - a set of one or more text file paths.
  4. PhaseDB - a path to a Phase database.

You can also define custom set types by subclassing the PipeIO class.

Examples of operations that can be performed by a stage:

  1. Filtering of ligands.
  2. Running a program (like LigPrep or Glide) on ligands.
  3. Modifying a Phase database.
  4. Converting ligands from one file format to another.

Each stage can optionally accept keywords. Keywords are based on the ConfigObj format, but differ in that in the pipeline input file no equals sign is used to separate a keyword from its value. Values can be of type integer, float, boolean, or string; more complicated value types are also supported. See existing stages for examples, and the ConfigObj validation functions for details.
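For illustration, here is a hypothetical specs string covering the four basic value types (the keyword names are invented for this sketch). Note that the validation spec itself uses ConfigObj's equals-sign syntax; it is only in the pipeline input file that keyword and value are separated by whitespace instead:

specs = """
MAX_STS     = integer(default=100)    # an integer keyword
CUTOFF      = float(default=2.5)      # a float keyword
NEUTRALIZE  = boolean(default=True)   # a boolean keyword
FORCEFIELD  = string(default="OPLS")  # a string keyword
"""

# In the pipeline input file, these keywords would be given as, e.g.:
#     MAX_STS     50
#     CUTOFF      3.0
#     NEUTRALIZE  False
#     FORCEFIELD  MMFF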

There are many stages already written for tasks that are run by VSW, QPLD, CovalentDocking, and GeneratePhaseDB workflows. These can be found in the API documentation for the Pipeline stages package.

You can also write your own stage. The best way to get started is to read this manual and to look at some existing stages.

A stage can optionally have these features:

  1. The ability to run simultaneous subjobs. This will make your stage more complex, and requires the use of JobDJ.
  2. Restartability. Adding this feature will allow the stage to restart from the middle in case the user’s job fails.

However, we don’t recommend adding these features until the stage is functioning properly without parallelization and restartability.

Here is an example of a basic stage class:

 1  from schrodinger.pipeline import pipeio, stage
 2  from schrodinger import structure       # for StructureWriter class
 3  from schrodinger.structutils import io  # for MultiFileReader class
 4
 5  class ExampleStage(stage.Stage):
 6      def __init__(self, *args, **kwargs):
 7          # Keyword specifications, in ConfigObj format:
 8          specs = """
 9          KEEP_STS = integer(default=10)
10          """
11          stage.Stage.__init__(self, specs=specs, *args, **kwargs)
12
13          self.addExpectedInput(1, "structures", required=True)
14          self.addExpectedOutput(1, "structures", always=True)
15
16      def operate(self):
17          keep_sts = self['KEEP_STS']
18
19          # Read the input set:
20          input_files = self.getInput(1).getFiles()
21
22          # Prepare the output writer:
23          out_file = self.getOutputName(1) + ".maegz"
24          writer = structure.StructureWriter(out_file)
25
26          st_num = 0
27          for st in io.MultiFileReader(input_files):
28              writer.append(st)
29              st_num += 1
30
31              if st_num == keep_sts:
32                  break
33
34          writer.close()  # close the structure writer
35
36          self.info("Total {} structures kept".format(st_num))
37
38          self.setOutput(1, pipeio.Structures([out_file], st_num))

This stage keeps the first KEEP_STS structures it reads, passing them on to the next stage or output file.

Going through this example by line number:

  • Lines 1-3: General imports for all stage modules.
  • Lines 7-11: Define the keywords for the stage, following the ConfigObj specification, and pass them to the base-class constructor.
  • Line 13: Define the only supported input set. This is input set number 1, of type “structures”, and is required.
  • Line 14: Define the only output set. This is output set number 1, of type “structures”, and is always generated.
  • Line 16: operate() is the entry point into the stage’s code and is a required method for all stages.
  • Line 17: Access the value of the keyword, as defined in the pipeline input file.
  • Line 20: Get the list of file paths from the input set. These files are either specified in the pipeline input file or are passed from a preceding stage.
  • Line 23: Determine what the user would like to call the output files (from the pipeline input file).
  • Line 24: Open a structure writer for writing the output file.
  • Line 27: Read the input files (there may be more than one).
  • Line 36: Each stage has self.info(), self.warning(), self.error(), and self.debug() methods for printing messages to the log files.
  • Line 38: Set the output set to a list of output files (in this case, only one file). Optionally, the number of items in the set can be specified (here, the number of structures).

Here is a sample input file that shows how the stage can be used:

[ SET:ORIGINAL_LIGANDS ]
    VARCLASS    Structures
    FILES       /Users/adzhigir/50ligs.maegz

[ STAGE:EXAMPLE_STAGE ]
    STAGECLASS  example.ExampleStage
    INPUTS      ORIGINAL_LIGANDS
    OUTPUTS     FILTERED_LIGANDS
    KEEP_STS    20

[ USEROUTS ]
    USEROUTS    FILTERED_LIGANDS
    STRUCTOUT   FILTERED_LIGANDS

See the Python Pipeline Manual for more info on the workflow input file format.
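Assuming the stage class above is saved as example.py on the Python module path (matching the STAGECLASS value example.ExampleStage) and the input file is saved as test-example.inp, the workflow could then be launched with the generic pipeline driver. The launcher invocation below is an assumption and may differ between releases; consult the Python Pipeline Manual for the exact command:

$SCHRODINGER/pipeline test-example.inp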

Here is a sample log file from this stage:

Stage test-example-EXAMPLE_STAGE initializing...

Running stage: test-example-EXAMPLE_STAGE
SCHRODINGER: /software/schro2012
PYTHONPATH: None
Job ID: pc-16-21-111-0-4ed6bf94
Time: Wed Nov 30 15:43:17 2011
Stage started successfully

Python version: 0
Total 20 structures kept

STAGE COMPLETED. Elapsed time: 0 seconds

    Output #1 (test-example-FILTERED_LIGANDS) [structures(20)]:
       test-example-EXAMPLE_STAGE/test-example-FILTERED_LIGANDS.maegz

Job Control

The jobcontrol module contains four major sections:

  1. Job data interaction - Deals with getting information about existing jobs.
  2. Job launching - Deals with starting a subjob.
  3. Job backend interaction - Provides utilities for a Python script running as a job (see the sketch below).
  4. Job hosts - Deals with information about the hosts available for running jobs.
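As an illustration of backend interaction, a script running as a job can obtain a handle to its backend and register additional output files. This is a minimal sketch; the file name is made up, and get_backend() returns None when the script is not running under job control:

import schrodinger.job.jobcontrol as jc

backend = jc.get_backend()  # None when not running under job control
if backend:
    # Ask job control to copy this extra file back to the launch directory:
    backend.addOutputFile("results.csv")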

To run a LigPrep job to convert a SMILES input file to 3D:

import os
import schrodinger.job.jobcontrol as jc

# Example file names for the SMILES input and Maestro output:
input_smiles_file = "ligands.smi"
output_3d_mae_file = "ligands.mae"

run_ligprep = [os.path.join(os.environ['SCHRODINGER'], 'ligprep')]
options = ["-i", input_smiles_file, "-o", output_3d_mae_file]
command = run_ligprep + options

job = jc.launch_job(command)
print("LigPrep job status: {}".format(job.Status))
job.wait()  # block until the job finishes
print("LigPrep job status: {}".format(job.Status))

Introduction to JobDJ

The JobDJ class is used to write driver scripts for “distributed jobs”, in which one or more subjobs independently carry out different parts of a larger computation in parallel. JobDJ can submit individual jobs to a queueing system (like LSF or SGE) or run them on an explicit list of compute machines.

JobDJ is primarily used to run programs under job control, but can also be used to manage simple local tasks, executed as subprocesses of the driver. It can be asked to retry failed jobs, which is often necessary to get large distributed jobs to complete successfully. It also provides a mechanism for enforcing dependencies between jobs.
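For example, a dependency between two subjobs can be declared with the addPrereq() method of a job object. In this sketch (the commands are made up), the docking job starts only after the preparation job finishes successfully:

import schrodinger.job.queue as jobqueue

jobdj = jobqueue.JobDJ()
prep = jobqueue.JobControlJob(["ligprep", "-i", "ligs.smi", "-o", "ligs.mae"])
dock = jobqueue.JobControlJob(["glide", "dock.in"])
dock.addPrereq(prep)  # dock will not start until prep completes
jobdj.addJob(prep)
jobdj.addJob(dock)
jobdj.run()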

This document will only describe the most common use case for JobDJ, which is to run a number of independent subjobs under job control.

Usage

Logically, there are three steps to running a distributed job with JobDJ:

  1. specify the list of hosts on which subjobs will be run (normally handled automatically),
  2. define the subjobs, by specifying the command to use to start each one, and,
  3. let JobDJ run the jobs.

The Host List

The host list is defined when you create a JobDJ instance. If your script is running under job control, JobDJ will automatically use the host list specified on the command line, unless you override it with an explicit host list, specified as a list of (host, max_jobs) tuples, where max_jobs is the maximum number of jobs to run at a time on that host.

The Subjobs

Add jobs to the JobDJ instance using the addJob() method. Jobs are defined as instances of a BaseJob subclass, such as JobControlJob or SubprocessJob. If you’re running job control jobs, you can just specify the command used to start the job, and a JobControlJob instance will be created for you.

Running the Subjobs

Run all jobs defined on a JobDJ instance using its run() method. This blocks until all subjobs have completed. If you want to take some action whenever a subjob’s status changes, loop over the updatedJobs() generator instead; it yields a job every time its status changes. A minimal sketch of that pattern follows.
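In this sketch, commands is assumed to be a list of command lists, and the printed attributes are illustrative:

import schrodinger.job.queue as jobqueue

jobdj = jobqueue.JobDJ()
for cmd in commands:  # commands: a list of command lists (assumption)
    jobdj.addJob(cmd)

# Iterating over updatedJobs() runs the jobs, yielding each job
# whenever its status changes:
for job in jobdj.updatedJobs():
    print("{} changed state to {}".format(job, job.state))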

Examples

In all of the following examples, the schrodinger.job.jobcontrol and schrodinger.job.queue modules are assumed to have been imported under the aliases jobcontrol and jobqueue:

import schrodinger.job.jobcontrol as jobcontrol
import schrodinger.job.queue as jobqueue

Basic Usage

In the simplest case, a driver script running under job control just needs to define one or more subjobs and call the JobDJ object’s run() method. JobDJ will run the jobs on the hosts specified on the command line. For example, you might write a driver script driver.py for running a set of Jaguar jobs like this:

jobdj = jobqueue.JobDJ()
for infile in inputfiles:
    cmd = ['jaguar', 'run', infile]
    jobdj.addJob(cmd)
jobdj.run()

If you run this script with the command:

$SCHRODINGER/run -HOST host1,host2 driver.py

… JobDJ will automatically pick up the host list “host1, host2” through the job control system and run your jobs on those machines (one job at a time on each machine). If the -HOST argument specifies a queueing system (like LSF or SGE), then your jobs will all be submitted to that queueing system.

Explicit Host List

If you want JobDJ to run your subjobs on some particular set of hosts, you can specify that list when you create the JobDJ instance. For example:

hostlist = [('host1', 1), ('host2', 1)]
jobdj = jobqueue.JobDJ(hostlist)
...
jobdj.run()

You can get the host list that was passed on the command line from the get_backend_host_list() function in the jobcontrol module. Using this, the first example above could have been written:

hostlist = jobcontrol.get_backend_host_list()
jobdj = jobqueue.JobDJ(hostlist)
...
jobdj.run()

Retrying Failed Jobs

If you don’t do anything special, jobs run under JobDJ will not be retried if they fail. If you’d like failed subjobs to be run again, up to a fixed number of times, use the max_retries parameter of the JobControlJob constructor. For example:

jobdj = jobqueue.JobDJ(hostlist)
...
job = jobqueue.JobControlJob(cmd, max_retries=3)
jobdj.addJob(job)
...
jobdj.run()

The older method for setting a nonzero number of retries was to specify a max_retries argument in the JobDJ constructor; this still works, but is now deprecated.

Retrying Failed Jobs Selectively

If you’d like launch failures to be retried, but not failures of your computational code, you need to override the JobControlJob.retryFailure() method:

class MyJob(jobqueue.JobControlJob):
    def retryFailure(self, max_retries=0):
        obj = self.getJob()
        if obj and obj.ExitStatus == "died":
            return False  # don't retry the job
        return super(MyJob, self).retryFailure(max_retries)
...
jobdj = jobqueue.JobDJ(hostlist)
...
job = MyJob(cmd, max_retries=3)
jobdj.addJob(job)
...
jobdj.run()

This code relies on the fact that an ExitStatus of “died” signals that the backend exited with a nonzero exit status. The retryFailure() method is passed a single argument (max_retries) by JobDJ; this value is specified when constructing the JobDJ object and defaults to zero. As the example shows, the number of retries can also be specified on a per-job basis.