Source code for schrodinger.application.desmond.starter.generator.abfep

"""
Absolute binding FEP generator

Copyright Schrodinger, LLC. All rights reserved.
"""
import os
import subprocess
import sys
from typing import List
from typing import Dict
from typing import Tuple
from typing import TYPE_CHECKING
from pathlib import Path

from schrodinger import structure
from schrodinger.application.desmond import cmdline
from schrodinger.application.desmond import cmj
from schrodinger.application.desmond import launch_utils
from schrodinger.application.desmond import mapper_msj_generator
from schrodinger.application.desmond import stage
from schrodinger.application.desmond.stage.app.absolute_binding import restraint
from schrodinger.application.desmond import util
from schrodinger.application.desmond.starter import ui

from . import common

if TYPE_CHECKING:
    from schrodinger.application.scisol.packages.fep import graph


def _get_restraint_params(ligand_restraint: bool) -> Dict:
    """
    Return the set of ligand restraint parameters.

    :param ligand_restraint: Set to True if the ligand restraint
        is enabled, False otherwise.
    """
    if ligand_restraint:
        return {
            'enable': ligand_restraint,
            'name': 'soft',
            'sigma': 0.0,
            'alpha': 1.0,
            'fc': 40.0,
        }
    else:
        return {'enable': False}


def _prepare_msj(args: ui.abfep.Args, has_membrane: bool,
                 fmp_fname: str) -> str:
    """
    Generate the msj files using the arguments.
    If an existing master msj is given, no msj file is written. The
    given msj is only read and validated: it must contain the absolute
    binding analysis stage, that stage's `input_graph_file` must equal
    `fmp_fname`, and `fmp_fname` must exist on disk.

    :param args: Command-line arguments.
    :param has_membrane: Set to True if the input structures contains
        a membrane, False otherwise.
    :param fmp_fname: Filename of the fmp file. It is checked against
        the user-supplied msj, or passed to the msj generator as the
        graph file when the msj files are generated here.

    :return: The filename of the master msj.

    :raise SystemExit: If the user-supplied msj fails validation.
    """
    if args.msj:
        master_msj_fname = args.msj
        # Validate msj
        msj = cmj.msj2sea_full(args.msj)
        # Pass a default so that a missing analysis stage is reported as a
        # readable error instead of an uncaught StopIteration.
        analysis_stage = next(
            mapper_msj_generator._find_stage(
                msj.stage, stage.FepAbsoluteBindingAnalysis.NAME), None)
        if analysis_stage is None:
            sys.exit(f"ERROR: {args.msj} does not contain a "
                     f"{stage.FepAbsoluteBindingAnalysis.NAME} stage")
        if analysis_stage['input_graph_file'].val != fmp_fname or not Path(
                fmp_fname).exists():
            sys.exit(
                f"ERROR: {args.msj} contains a setting "
                f"input_graph_file={analysis_stage['input_graph_file']} which does "
                f"not match provided input file")
    else:
        cd_params = {
            "processors_per_replica": 1,
            "cpus": args.ppj,
            "mps_factor": args.mps_factor,
        }
        generator_args = [args.JOBNAME, cd_params]
        generator_kwargs = dict(
            forcefield=args.forcefield,
            rand_seed=args.seed,
            md_sim_time=args.md_sim_time,
            sim_time=args.fep_sim_time,
            custom_charge_mode=args.custom_charge_mode,
            ligand_restraint=_get_restraint_params(args.ligand_restraint),
            adaptive_ligand_restraint=_get_restraint_params(
                args.adaptive_ligand_restraint),
            use_representative_structure=not args.use_final_frame,
            ensemble=args.ensemble,
            concatenate=False,
            membrane=has_membrane,
            graph_file=fmp_fname,
            max_walltime=args.max_walltime,
        )
        generator = mapper_msj_generator.AbsoluteBindingMsjGenerator(
            *generator_args, **generator_kwargs)
        # Write the md, complex and solvent msj files, then the master msj;
        # only the master msj filename is returned.
        generator.write_md_msj()
        generator.write_complex_msj()
        generator.write_solvent_msj()
        master_msj_fname = generator.write_master_msj()
    return master_msj_fname


def _cmd_for_extend_restart_job(args: ui.abfep.Args) -> List[str]:
    """
    Build the command for launching an extend or restart multisim job.

    The ligand file named by `args.extend` (if any) is parsed, a
    checkpoint file is looked up when `args.checkpoint` is not set, and
    `args.inp_file` is reset to None before the restart/extend command
    is prepared. The process exits with the error text when the ligand
    file is rejected with a `ValueError` or when preparing the command
    raises `common.RestartException`.

    :param args: Command line arguments. `args.checkpoint` and
        `args.inp_file` may be modified in place.

    :return: The launch command.
    """
    if args.extend:
        try:
            ligands = util.parse_ligand_file(args.extend)
        except ValueError as err:
            # If the file is invalid
            sys.exit(str(err))
    else:
        ligands = None

    if not args.checkpoint:
        args.checkpoint = launch_utils.find_checkpoint_file()

    # inp file will be collected by stage code
    args.inp_file = None

    launcher_name = stage.FepAbsoluteBindingFepLauncher.NAME
    try:
        prepared = common.prepare_files_and_command_for_fep_restart_extend(
            args, ligands, launcher_stage_name=launcher_name)
    except common.RestartException as err:
        sys.exit(str(err))
    cmd, stage_data_fnames = prepared

    # Carry over the previous fmpdb file when one is found.
    previous_fmpdb = common.find_fmpdb_file(args)
    if previous_fmpdb:
        stage_data_fnames.append(previous_fmpdb)

    # No forcefield is passed on for extend/restart jobs.
    extra_args = launch_utils.additional_command_arguments(
        stage_data_fnames, args.RETRIES, args.WAIT, args.LOCAL, args.DEBUG,
        args.TMPDIR, None, args.OPLSDIR, args.NICE, args.SAVE)
    cmd += extra_args

    return cmd


def _cmd_for_new_job(args: ui.abfep.Args) -> List[str]:
    """
    Build the command for launching a new multisim job.

    Prepares the fmp file and graph from `args.inp_file`, writes the
    `<JOBNAME>.ligand` file, prepares the master msj, and assembles the
    launch command from these.

    :param args: Command line arguments.

    :return: The launch command.
    """
    from schrodinger.application.scisol.packages.fep import abfep_utils

    inp_path = Path(args.inp_file)
    # Either kind of ligand restraint counts as "restraint enabled".
    restrained = bool(args.ligand_restraint or args.adaptive_ligand_restraint)
    fmp_path, inp_graph = prepare_inputs(inp_path, restrained,
                                         Path(args.JOBNAME))

    # Collect the ligand structure of every edge in the graph.
    ligand_sts = []
    for edge in inp_graph.edges_iter():
        ligand_sts.append(abfep_utils.get_ligand_node(edge).struc)

    # Write ligand file for extend
    util.write_ligand_file(f'{args.JOBNAME}.ligand', ligand_sts)

    master_msj_fname = _prepare_msj(
        args,
        has_membrane=inp_graph.membrane_struc is not None,
        fmp_fname=str(fmp_path))

    cmd = launch_utils.prepare_command_for_launch(
        args.HOST,
        args.SUBHOST,
        args.JOBNAME,
        master_msj_fname,
        args.maxjob,
        input_fname=str(args.inp_file))

    # A new job passes no stage data files and no forcefield here.
    extra_args = launch_utils.additional_command_arguments(
        [], args.RETRIES, args.WAIT, args.LOCAL, args.DEBUG, args.TMPDIR,
        None, args.OPLSDIR, args.NICE, args.SAVE)
    cmd += extra_args

    return cmd


def prepare_inputs(inp_path: Path, has_ligand_restraint: bool,
                   jobname: Path) -> Tuple[Path, "graph.Graph"]:
    """
    Given the user defined input file, produce the fmp file and graph
    for use in the ab fep workflow.

    An input whose suffix is `.fmp` is deserialized into a graph. Any
    other input is read as structures, which are split into receptor,
    solvent, membrane and ligands and turned into an absolute binding
    graph.

    :param inp_path: The input file path, either an fmp or mae
    :param has_ligand_restraint: whether the ligand restraint option was used
    :param jobname: The job name. For a non-fmp input, the fmp file is
        named after the final path component of `jobname` with its
        suffix replaced by `.fmp`.

    :return: The fmp file path and the graph.
    """
    from schrodinger.application.scisol.packages.fep import fepmae
    from schrodinger.application.scisol.packages.fep import graph
    from schrodinger.application.scisol.packages.fep import graph_generator

    if inp_path.suffix != ".fmp":
        # Only the file name is kept, so the path is relative.
        fmp_path = Path(jobname.with_suffix('.fmp').name)
        input_sts = list(structure.StructureReader(inp_path))
        (receptor_st, solvent_st, membrane_st,
         ligand_sts) = fepmae.filter_receptors_and_ligands(input_sts)
        environment_sts = [receptor_st, solvent_st, membrane_st]
        fep_graph = graph_generator.gen_graph_absolute(ligand_sts,
                                                       environment_sts)
    else:
        fmp_path = inp_path
        fep_graph = graph.Graph.deserialize(inp_path)

    # make the fmp consistent with hot atoms in fep_absolute_binding_fep_primer
    # NOTE(review): the indentation of the next two statements was not
    # recoverable from the source; they are placed after the branch, so they
    # run for both input kinds. Confirm against the upstream file.
    restraint.overwrite_hotatoms(fep_graph, has_ligand_restraint)
    fep_graph.write(fmp_path)

    return fmp_path, fep_graph


def generate(args: ui.abfep.Args) -> List[str]:
    """
    Generate the files and a command to run multisim for the
    absolute binding FEP workflow.

    An extend/restart command is built when `args.extend` or
    `args.RESTART` is set, otherwise a new-job command. The output
    structure option is then appended, the command is printed with the
    value of the SCHRODINGER environment variable replaced by the
    literal `$SCHRODINGER`, and the encoded job description is appended
    last so that it is not part of the printed command.

    :param args: Command line arguments

    :return: Command to launch a multisim job
    """
    builder = (_cmd_for_extend_restart_job
               if args.extend or args.RESTART else _cmd_for_new_job)
    cmd = builder(args)

    # Adds extra options.
    cmd += ['-o', args.JOBNAME + "-out.mae"]

    printable_cmd = subprocess.list2cmdline(cmd)
    printable_cmd = printable_cmd.replace(os.environ["SCHRODINGER"],
                                          "$SCHRODINGER")
    print("Launch command:", printable_cmd)

    encoded_description = cmdline.get_b64encoded_str(
        cmdline.get_job_command_in_startup())
    cmd.extend(["-encoded_description", encoded_description])

    return cmd