Jobcontrol

Jobcontrol is a tool for running tasks asynchronously and (optionally) on different machines. Schrödinger wrote Jobcontrol instead of using off-the-shelf tools for distributed computation to handle unusual requirements of our software, especially the size of the binaries required for our computations.

You might use Jobcontrol to launch a task from a laptop (running Maestro) to a compute node, so that the task runs on several cores. Jobcontrol takes care of transferring input files from your machine to the cluster, monitoring progress of the job, and collecting results and log files once the job is complete.

Jobcontrol is also used to write more complicated multistage workflows that treat the outputs of some jobs as inputs for later stages. This is discussed below.

Running code under jobcontrol

Running a Job

Jobs are usually launched by running a command with -HOST <host entry argument>. Any executable that supports this option can be run under Jobcontrol.

Host entries are defined in schrodinger.hosts files that live at the root of your installation. Most executables run the job on localhost if -HOST is not specified; others may require an explicit -HOST localhost argument.

Example:

$SCHRODINGER/ligprep -imae in.mae -omae out.mae

This will run ligprep locally on in.mae and produce an out.mae output file.

Adding -HOST bolt_cpu would submit the job to the bolt host defined in the hosts file.
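
When a job is launched from another Python program rather than from a shell, the same command can be assembled as an argument list. The sketch below uses a hypothetical helper, build_command, and a placeholder installation path; neither comes from the Schrödinger API. It only illustrates that -HOST and its host entry are ordinary trailing arguments:

```python
import os


def build_command(schrodinger_root, program, args, host=None):
    """Return the argument list for a program under the installation root.

    Hypothetical helper for illustration. ``host`` is the name of a host
    entry from schrodinger.hosts; when it is None, no -HOST option is added
    and the program falls back to its own default.
    """
    cmd = [os.path.join(schrodinger_root, program), *args]
    if host is not None:
        cmd += ["-HOST", host]
    return cmd


# "/opt/schrodinger" is a placeholder for the value of $SCHRODINGER.
ligprep_args = ["-imae", "in.mae", "-omae", "out.mae"]
local_cmd = build_command("/opt/schrodinger", "ligprep", ligprep_args)
remote_cmd = build_command("/opt/schrodinger", "ligprep", ligprep_args,
                           host="bolt_cpu")
print(" ".join(remote_cmd))
```

The resulting list can be handed to subprocess.run; leaving host as None keeps the program's default behavior.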

Ordinary scripts

It is possible to add support for Jobcontrol for simple Python scripts. This section will show how to add such support to a simple script.

In this example, myscript.py will simply print out the hostname that the script is running on to show that our script will have different outputs on different machines.

import socket

def main():
    print(socket.gethostname())

if __name__ == "__main__":
    main()

$SCHRODINGER/run myscript.py will print out your local hostname.

(In this case, jobcontrol is not involved at all, and the value of $SCHRODINGER/run is just that it allows the use of Schrödinger’s Python API.)

Add jobcontrol API

If we want to execute our script under jobcontrol, locally or remotely, we need to add a function at the top level that jobcontrol can use to understand the requirements of the job. This function must be called get_job_spec_from_args and it must return a launchapi.JobSpecification. Note that the script must be importable as a module for Jobcontrol to be able to run the script’s function.

An important requirement for running our script under jobcontrol is to set the script itself as an input file since it is not already in the distribution. We do this in the example below using setInputFile("myscript.py"). Note that this is necessary even when we run with -HOST localhost since even in that case the job will run under a temporary working directory.

In the example below, in addition to adding the script itself as an input file, we also direct stderr and stdout to a file named “myscript.log”. This file can be streamed as the job progresses (using jsc tail-file) and will be downloaded to the current working directory as output when the job completes:

import socket
from schrodinger.job import launchapi

def get_job_spec_from_args(argv):
    """
    Return a JobSpecification necessary to run this script on a remote
    machine (e.g. under job control with the launch.py script).

    :type argv: list(str)
    :param argv: The list of command line arguments, including the script name
    at [0], matching $SCHRODINGER/run __file__ sys.argv
    """
    job_builder = launchapi.JobSpecificationArgsBuilder(argv)
    job_builder.setStderr("myscript.log")
    job_builder.setStdout("myscript.log")
    job_builder.setInputFile("myscript.py")
    return job_builder.getJobSpec()

def main():
    print(socket.gethostname())

if __name__ == "__main__":
    main()

Note that we specify input and logging file names via relative path and not absolute path.

$SCHRODINGER/run myscript.py will run the script without Jobcontrol, and will print out your local hostname.

$SCHRODINGER/run myscript.py -HOST localhost will run the script under Jobcontrol on your local machine, and will log your local hostname to myscript.log

$SCHRODINGER/run myscript.py -HOST bolt_cpu will log the hostname of bolt compute node to myscript.log

Register input and output files

As mentioned above, files that are transferred from the launch machine to the compute machine need to be registered by job control, including the script itself if it is not within the SCHRODINGER installation on the compute machine.

Input files are registered with the setInputFile method when building a job specification. Within this document, we use the invocation setInputFile("myscript.py") to register the current jobcontrol script as an input file, intending the script file to be saved and run as myscript.py. Other input files for the job can be added in the same way.

Similarly, output files can be registered with the setOutputFile method when building a job specification.

In the following example, in addition to the script itself, we register an input Maestro file and an output Maestro file:

import os
import sys
from schrodinger import structure
from schrodinger.job import launchapi

def get_job_spec_from_args(argv):
    job_builder = launchapi.JobSpecificationArgsBuilder(argv)
    input_file = argv[1]
    output_mae_file = os.path.basename(input_file) + "processed.mae"
    job_builder.setInputFile("myscript.py")
    job_builder.setInputFile(input_file)
    job_builder.setOutputFile(output_mae_file)
    job_builder.setStderr("myscript.log")
    job_builder.setStdout("myscript.log")
    return job_builder.getJobSpec()

def main():
    input_file = sys.argv[1]
    output_file = os.path.basename(input_file) + "processed.mae"
    with structure.StructureReader(input_file) as reader:
        with structure.StructureWriter(output_file) as writer:
            for ct in reader:
                ct.title = ct.title + "processed"
                writer.append(ct)

if __name__ == "__main__":
    main()

Execute using: $SCHRODINGER/run myscript.py foo.mae -HOST localhost

Output files will be downloaded to the current working directory.
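
In the example above, get_job_spec_from_args and main each compute the output file name, so the two expressions must stay identical or the registered output will not match the file that is actually written. One way to avoid drift is a single shared helper. The sketch below is an assumption, not part of the Schrödinger API, and its naming scheme (input stem plus "_processed.mae") deliberately differs from the one used in the example:

```python
import os


def processed_name(input_file):
    """Return the output file name for ``input_file``.

    Hypothetical naming scheme: strip the directory and extension, then
    append "_processed.mae". The result is a relative name, as required
    for files registered with the job specification.
    """
    stem, _ext = os.path.splitext(os.path.basename(input_file))
    return stem + "_processed.mae"


print(processed_name("data/foo.mae"))
```

Both get_job_spec_from_args (for setOutputFile) and main (for the StructureWriter) could then call processed_name with the same input path.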

Using a jobname

Any job can be given a jobname via the -JOBNAME option, and many scripts use it to determine the names of the log files and output files. If the -JOBNAME option is not specified, some scripts derive the job name from the name of the main input file, while others use a default job name.
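
The fallback order just described can be sketched in plain Python. This is an illustration only: pick_jobname and the default "myjob" are hypothetical, and the scripts in this document rely on JobSpecificationArgsBuilder for the job name rather than scanning argv by hand:

```python
import os


def pick_jobname(argv, input_file=None, default="myjob"):
    """Choose a job name for a script (hypothetical helper).

    Order of preference: an explicit -JOBNAME value, then the stem of the
    main input file, then a fixed default.
    """
    if "-JOBNAME" in argv:
        idx = argv.index("-JOBNAME")
        if idx + 1 < len(argv):
            return argv[idx + 1]
    if input_file:
        return os.path.splitext(os.path.basename(input_file))[0]
    return default


explicit = pick_jobname(["myscript.py", "-JOBNAME", "foo", "-HOST", "localhost"])
from_input = pick_jobname(["myscript.py", "in.mae"], input_file="in.mae")
fallback = pick_jobname(["myscript.py"])
print(explicit, from_input, fallback)
```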

Above, we explicitly called setStderr and setStdout in get_job_spec_from_args. In the example below, we instead pass use_jobname_log=True to the constructor of launchapi.JobSpecificationArgsBuilder:

import socket
from schrodinger.job import launchapi

def get_job_spec_from_args(argv):
    job_builder = launchapi.JobSpecificationArgsBuilder(argv, use_jobname_log=True)
    job_builder.setInputFile("myscript.py")
    return job_builder.getJobSpec()

def main():
    print(socket.gethostname())

if __name__ == "__main__":
    main()

Execute using: $SCHRODINGER/run myscript.py -JOBNAME foo -HOST localhost

Because we passed use_jobname_log=True to the JobSpecificationArgsBuilder constructor, our log file gets a suitable name without our having to call setStderr and setStdout as we did in the earlier examples.

Note that use_jobname_log requires us to specify a jobname in some way. This can be done via the command line (as we just did above), by passing default_jobname into the constructor of JobSpecificationArgsBuilder, or by calling setJobname on an instance of JobSpecificationArgsBuilder.

Maestro Incorporation

Maestro files from a job can be marked for incorporation into Maestro, meaning that their structures will show up in the project table.

def get_job_spec_from_args(argv):
    job_builder = launchapi.JobSpecificationArgsBuilder(argv)
    job_builder.setOutputFile("foo.mae", incorporate=True)
    return job_builder.getJobSpec()

Note that it is possible to set multiple output files to be incorporated as separate entries. Only Maestro-formatted files are supported for incorporation.

Using $SCHRODINGER/run -FROM <product>

Some scripts require $SCHRODINGER/run -FROM <product> to run (when they need to load libraries from one of the product directories). In this case, we mark this when we create a JobSpecification:

def get_job_spec_from_args(argv):
    job_builder = launchapi.JobSpecificationArgsBuilder(
        argv, schrodinger_product="scisol")
    return job_builder.getJobSpec()

Integration with an Argument Parser

An argument parser is useful when we want to document, validate, and access command line arguments within a script. It is easy to integrate an argument parser into a script that uses jobcontrol. Note also that in this example the default job name is derived from the name of the input file by using the fileutils.get_jobname() function.

import argparse
import os
import sys

from schrodinger import structure
from schrodinger.job import launchapi
from schrodinger.utils import fileutils

def parse_args(argv):
    parser = argparse.ArgumentParser()
    parser.add_argument("inputfile", help="Input Maestro file")
    args = parser.parse_args(argv)
    return args

def get_job_spec_from_args(argv):
    # first argument is this script
    args_namespace = parse_args(argv[1:])
    job_builder = launchapi.JobSpecificationArgsBuilder(argv, use_jobname_log=True)
    job_builder.setInputFile(__file__)
    job_builder.setInputFile(args_namespace.inputfile)
    if not job_builder.getJobname():
        job_builder.setJobname(fileutils.get_jobname(args_namespace.inputfile))
    return job_builder.getJobSpec()

def main(argv):
    args = parse_args(argv)
    with structure.StructureReader(args.inputfile) as reader:
        for ct in reader:
            print(f"ct title={ct.title}")

if __name__ == '__main__':
    main(sys.argv[1:])
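
The two entry points in the example above see slightly different argument lists: get_job_spec_from_args receives the script name at index 0, while main receives sys.argv[1:]. The standalone sketch below repeats only the parser from the example and uses a made-up argument list to show that both paths end up parsing the same values once the script name is stripped:

```python
import argparse


def parse_args(argv):
    parser = argparse.ArgumentParser()
    parser.add_argument("inputfile", help="Input Maestro file")
    return parser.parse_args(argv)


# Shape of the list passed to get_job_spec_from_args: script name first.
launch_argv = ["myscript.py", "foo.mae"]
# Shape of the list passed to main in the example: script name removed.
main_argv = launch_argv[1:]

spec_args = parse_args(launch_argv[1:])  # strip argv[0] before parsing
main_args = parse_args(main_argv)
print(spec_args.inputfile, main_args.inputfile)
```

Sharing one parse_args function between the two entry points keeps validation and help text in a single place.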

JobDJ

Introduction to JobDJ

The foregoing has shown us how to run a single job, with options, on a specified host. But often jobs are knit together into larger workflows. For this we use JobDJ, a workflow tool that makes it possible to run multiple, potentially simultaneous jobs, and provides a mechanism for enforcing dependencies between jobs representing different stages of a computation.

A JobDJ instance can submit individual jobs to a queueing system (like SLURM or UGE) or an explicit list of compute machines.

This document will cover a few types of more complex distributed workflows.

Basic Usage

In the simplest case, a driver script running under jobcontrol just needs to define one or more subjobs and call the JobDJ object’s run() method. JobDJ will run the jobs on the hosts specified on the command line. For example, you might write a driver script driver.py for running a set of Jaguar jobs like this:

import argparse
import sys

from schrodinger.job import launchapi
from schrodinger.job import queue


def parse_args(argv):
    parser = argparse.ArgumentParser()
    parser.add_argument("inputfiles", nargs='+', help="Jaguar input files")
    args = parser.parse_args(argv)
    return args


def get_job_spec_from_args(argv):
    args_namespace = parse_args(argv[1:])
    job_builder = launchapi.JobSpecificationArgsBuilder(
        argv, default_jobname="test_jaguar_driver", use_jobname_log=True)
    job_builder.setInputFile("driver.py")
    for infile in args_namespace.inputfiles:
        job_builder.setInputFile(infile)
    return job_builder.getJobSpec()


def main(argv):
    args = parse_args(argv)
    jobdj = queue.JobDJ()
    for infile in args.inputfiles:
        cmd = ['jaguar', 'run', infile]
        jobdj.addJob(cmd)
    jobdj.run()


if __name__ == '__main__':
    main(sys.argv[1:])

Note that jobdj.run() will block until all subjobs have completed, which is why driver scripts themselves should be run under jobcontrol.

Workflow Basics

Complex job workflows are organized by a driver script that is typically run under job control using a remote host. Running the driver on a remote host avoids the network disruptions between the driver script and the cluster that can complicate job launches from, e.g., a personal laptop.

When any stage of a distributed computation completes, its output files will be returned to the current working directory of the driver script. In the event of network glitches, jobcontrol will attempt to return files for some time (currently about 30 minutes) and then the job is considered failed.

A common pattern in driver scripts for collecting output files for a job into a single location is to change the current working directory of the driver for the duration of the job to a directory that uniquely identifies that job (using, e.g., the job_id). This prevents later uses of the same workflow from accidentally overwriting the outputs of previous ones.
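
A minimal sketch of this pattern, using only the standard library, is a context manager that switches into a per-job directory and always switches back. The name working_directory and the directory name "run_0001" are assumptions made for illustration; a real driver would derive the directory name from something unique to the job, such as its job id:

```python
import contextlib
import os
import tempfile


@contextlib.contextmanager
def working_directory(path):
    """Temporarily change the current working directory to ``path``."""
    os.makedirs(path, exist_ok=True)
    previous = os.getcwd()
    os.chdir(path)
    try:
        yield path
    finally:
        # Restore the original directory even if the body raises.
        os.chdir(previous)


# Demonstration in a scratch directory so nothing real is touched.
base = tempfile.mkdtemp()
start = os.getcwd()
with working_directory(os.path.join(base, "run_0001")) as job_dir:
    inside = os.getcwd()
    # The code that launches and waits for the job would go here, so its
    # output files land in this directory.
after = os.getcwd()
print(inside)
```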

When a job completes successfully or is cancelled by a user, the job directory is cleaned up. Files not set as output files are not returned; they are simply removed. If a job fails, we leave the output files in the directory to aid investigation.

Jobcontrol currently has no built-in support for restartability, but drivers are free to implement their own solutions.

Working With The Workflow Graph

Jobs are added to a workflow by calling addJob on a JobDJ object. In the simplest case, a job command given as a list of strings can be passed to addJob and these are turned into a Job object internally. A Job is almost always an instance of JobControlJob. You can think of a job as a node in the workflow graph.

Running addJob in this way will create isolated job nodes in the workflow graph with no dependencies on each other. Ultimately, this means that calling jobdj.run() will run all added jobs concurrently because the order in which they run doesn’t matter.

In more complex cases, we can create a Job object and pass it to addJob. Relationships between stages of a computation are specified with the addPrereq method on a Job object, and a job’s immediate prerequisites can be inspected by calling getPrereqs on it:

>>> from schrodinger.job import queue
>>> jobdj = queue.JobDJ()
>>> # A job that will produce a "producer.out" output file.
>>> producer_job = queue.JobControlJob(["testapp", "-t", "1", "-j", "producer"])
>>> # A job that will consume a "producer.out" and create a "consumer.out" output file.
>>> consumer_job = queue.JobControlJob(["testapp", "-t", "1", "-j", "consumer", "-i", "producer.out"])
>>> # Because consumer_job requires the output of producer_job as input, add
>>> # producer_job as a prerequisite.
>>> consumer_job.addPrereq(producer_job)
>>> consumer_job.getPrereqs() == {producer_job}
True
>>> # addJob has a kwarg `add_connected=True` which will automatically add
>>> # all prerequisite jobs to the jobdj instance, so we only need to add
>>> # consumer_job.
>>> jobdj.addJob(consumer_job)
>>> sorted(jobdj.all_jobs) == sorted([producer_job, consumer_job])
True
>>> # Calling jobdj.run() will now run producer_job to completion, download its output,
>>> # and only then run consumer_job.
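
The ordering that prerequisites impose can be illustrated without JobDJ at all. The sketch below uses Python's standard graphlib module (Python 3.9 or later) on a toy graph. The job names, including the extra "report" and "standalone" nodes, are made up, and this is not a description of how JobDJ is implemented. Jobs in the same wave have no dependencies on each other, so a scheduler is free to run them concurrently:

```python
from graphlib import TopologicalSorter

# Map each job name to the set of jobs it depends on (its prerequisites).
prereqs = {
    "producer": set(),
    "consumer": {"producer"},
    "report": {"consumer"},
    "standalone": set(),
}

sorter = TopologicalSorter(prereqs)
sorter.prepare()

waves = []
while sorter.is_active():
    # Jobs whose prerequisites have all been marked done.
    ready = sorted(sorter.get_ready())
    waves.append(ready)
    # Mark the whole wave as finished, which unlocks dependent jobs.
    sorter.done(*ready)

print(waves)
# [['producer', 'standalone'], ['consumer'], ['report']]
```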

The JobDJ.run() method accepts two optional arguments that allow a caller to register callbacks in connection with the workflow.

The first, status_change_callback, will be called whenever any Job’s status changes. This callback will be called with the schrodinger.job.queue.BaseJob whose status has changed.

For periodic actions while the workflow is in progress, the periodic_callback can be used. This callback will be called with no arguments. The periodic_callback is run approximately every callback_interval seconds; the default interval is 300 seconds.

Note that JobDJ calls status_change_callback and periodic_callback in a blocking fashion: job updates stop being processed until your callable returns.
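
One way to keep such callbacks short is to have them hand work to a background thread and return immediately. The sketch below shows that hand-off with the standard library only. The plain strings stand in for the job objects that JobDJ would pass, and the names on_status_change, worker, and processed are assumptions made for illustration:

```python
import queue as stdlib_queue  # renamed to avoid confusion with schrodinger.job.queue
import threading

events = stdlib_queue.Queue()
processed = []


def on_status_change(job):
    """Cheap callback: record the job and return right away."""
    events.put(job)


def worker():
    """Drain the queue in the background; slow work belongs here."""
    while True:
        job = events.get()
        if job is None:  # sentinel value: stop the worker
            break
        processed.append(job)


thread = threading.Thread(target=worker, daemon=True)
thread.start()

# Stand-ins for status changes reported during a workflow.
for fake_job in ("job-1", "job-2"):
    on_status_change(fake_job)

events.put(None)  # ask the worker to finish
thread.join()
print(processed)
```

With a real workflow, on_status_change would be passed as the status_change_callback argument of JobDJ.run().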

Intermediate actions can also be added by calling the addFinalizer method on the job object and passing in a callable. The callable will be invoked when the job completes successfully. For example, if a job outputs a file named “foo.txt” but a job in the next stage expects this file as an input file named “bar.txt”, you can add a finalizer to the initial job that will rename “foo.txt” to “bar.txt”:

import shutil
from schrodinger.job import queue

job = queue.JobControlJob(["myscript.py"])
def renameFile(job: queue.BaseJob):
    shutil.move("foo.txt", "bar.txt")
job.addFinalizer(renameFile)

Like the JobDJ.run() callbacks, finalizers are blocking calls and should ideally be short operations. For longer operations, it may be preferable to add a separate JobControlJob that performs the work and names the job to be finalized as a prerequisite via addPrereq.

Failure Tolerance

Jobs can fail for a variety of reasons such as incorrect scientific assumptions or network issues. Failures can be acceptable in some workflows but detrimental in others. JobDJ allows for flexibility in failure handling and job retrying through some simple configuration parameters.

When constructing a JobDJ instance, you can use the max_retries, default_max_retries, and max_failures parameters to control failure and retry behavior. The default behavior is that no subjobs will be retried, and no failures tolerated.

  • The max_retries parameter is the number of allowed retries per subjob. If you wish to disable retries, set this value to zero.

  • The default_max_retries parameter is the same as the max_retries parameter, except that it allows the SCHRODINGER_MAX_RETRIES environment variable to override this value.

  • The max_failures parameter is the total number of allowed independent subjob failures before JobDJ exits. Subjob failures that were retried successfully do not add to this count. To allow an unlimited number of subjob failures, use the module level NOLIMIT constant (schrodinger.job.queue.NOLIMIT).

For example, if you wish to create a JobDJ that always allows 3 retries per subjob and allows any number of total failures, use the invocation:

from schrodinger.job import queue
jobdj = queue.JobDJ(max_retries=3, max_failures=queue.NOLIMIT)

The max_retries parameter is also available when creating an instance of a JobControlJob, and will override the value set in JobDJ.
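
The interaction between retries and the failure limit can be illustrated with a small, sequential toy model. It is not JobDJ's scheduler: run_with_retries and make_flaky are hypothetical names, tasks are plain callables rather than jobs, and None stands in for NOLIMIT. It shows only the counting rule described above, namely that a task that eventually succeeds on a retry does not count as a failure:

```python
def run_with_retries(tasks, max_retries=0, max_failures=0):
    """Run (name, callable) pairs, retrying each up to max_retries times.

    Returns (succeeded, failed) lists of names. Stops early once the number
    of failed tasks exceeds max_failures; None means no limit.
    """
    succeeded, failed = [], []
    for name, task in tasks:
        for _attempt in range(max_retries + 1):
            try:
                task()
            except RuntimeError:
                continue  # try again if attempts remain
            succeeded.append(name)
            break
        else:
            # Every attempt failed; only now does the task count as failed.
            failed.append(name)
            if max_failures is not None and len(failed) > max_failures:
                break  # failure budget exhausted: stop the whole run
    return succeeded, failed


def make_flaky(failures_before_success):
    """Return a task that raises the given number of times, then succeeds."""
    state = {"left": failures_before_success}

    def task():
        if state["left"] > 0:
            state["left"] -= 1
            raise RuntimeError("transient failure")

    return task


tasks = [("a", make_flaky(0)), ("b", make_flaky(2)), ("c", make_flaky(5))]
ok, bad = run_with_retries(tasks, max_retries=3, max_failures=None)
print(ok, bad)
# ['a', 'b'] ['c']
```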