Source code for schrodinger.job.jobcontrol

"""
Core job control for python.

There are currently four major sections of this module - "Job database,"
"Job launching," "Job backend," and "Job hosts." The job database section
deals with getting info about existing Jobs, the job launching section
deals with starting up a subjob, and the job backend section provides
utilities for a python script running as a job.

Copyright Schrodinger, LLC. All rights reserved.
"""

import base64
import collections
import contextlib
import enum
import functools
import inspect
import logging
import os
import re
import shlex
import subprocess
import sys
import tempfile
import time
import warnings
from datetime import datetime
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union
from urllib.parse import urlparse
from urllib.request import url2pathname

import backoff
import more_itertools

import schrodinger.infra.mm as mm
import schrodinger.infra.mmjob as mmjob
import schrodinger.utils.fileutils as fileutils
import schrodinger.utils.log
import schrodinger.utils.mmutil as mmutil
from schrodinger import get_maestro
from schrodinger import get_mmshare_version
from schrodinger import get_release_name
from schrodinger import gpgpu
from schrodinger.infra.mminit import Initializer
from schrodinger.job import jobcontrol
from schrodinger.Qt import QtCore
from schrodinger.ui.qt.appframework2 import application
from schrodinger.utils import qt_utils
from schrodinger.utils import subprocess as subprocess_utils

from . import resource

_version = "$Revision: 1.105 $"

LOCAL_RUN = os.path.join('${SCHRODINGER}', 'run')

logger = logging.getLogger("schrodinger.jobcontrol")
schrodinger.utils.log.default_logging_config()
# If JobControl debugging is on, also turn on Python debugging for this module.
if os.environ.get('SCHRODINGER_JOB_DEBUG'):
    logger.setLevel(logging.DEBUG)
profiling = os.getenv("JC_PROFILING", 0)

maestro = get_maestro()
mmjob.mmjob_initialize(mm.MMERR_DEFAULT_HANDLER, "")


class DisplayStatus(enum.Enum):
    WAITING = "Waiting"
    RUNNING = "Running"
    CANCELED = "Canceled"
    STOPPED = "Stopped"
    FAILED = "Failed"
    COMPLETED = "Completed"


def timestamp(msg):
    if profiling:
        print("@@", datetime.now().strftime("%H:%M:%S.%f"), "-", msg)


##
# A regular expression to grab a JobId, e.g. out of a launch command's
# stdout.
#
jobid_re = re.compile(r"^JobId:\s+(\S+)", re.IGNORECASE | re.MULTILINE)

##
# The timestamp format used for the Job database in the LaunchTime and
# StartTime fields.
#
timestamp_format = "%Y-%m-%d-%H:%M:%S"

##
# The order of host-entry fields, in which they will be stored
# in the _lines attribute of a Host object
entry_fields = [
    "name", "base", "host", "nodelist", "user", "queue", "qargs", "schrodinger",
    "proxyhost", "proxyport", "proxyexec", "tmpdir", "shareddir", "env",
    "processors", "processors_per_node", "parallel", "ngpu", "gpgpu",
    "maestrocontrols", "walltime", "memory", "accountcodes"
]
field_sortkey = {k: v for (v, k) in enumerate(entry_fields)}

if sys.platform == "darwin":
    INSTALL_ROOT = "/opt/schrodinger"
elif sys.platform == "win32":
    INSTALL_ROOT = "C:\\Program Files\\Schrodinger"
else:
    INSTALL_ROOT = "/opt/schrodinger"

HOSTS_FILE = "schrodinger.hosts"

# Characters that don't require escaping on command-line of any OS:
SAFE_COMMAND_CHARS = set(
    'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.')

# Programs which are stoppable by sending 'halt' message to the backend.
STOPPABLE_PROGRAMS = {
    "Desmond", "WaterMap", "multisim", "Quantum ESPRESSO executable",
    "Periodic DFT"
}

# A regular expression to check if the argument of the command list
# is a shell redirection operator.
shell_redirect_re = re.compile(r'^\d*(<{1,2}|>{1,2})&?\d*$')

qapp = None


class JobcontrolException(Exception):
    pass


class JobLaunchFailure(JobcontrolException, RuntimeError):
    pass


class MissingHostsFileException(JobcontrolException):
    pass


class UnreadableHostsFileException(JobcontrolException):
    pass


##
# Job database stuff
#


class Job:
    """
    A Job instance is always a snapshot of the job state at a specific
    point in time. It is only updated when the `readAgain` method is
    explicitly invoked.
    """

    def __init__(self, job_id: str, cpp_job: mmjob.Job = None):
        """
        Initialize a read-only Job object.

        :param job_id: Unique identifier for a job
        :param cpp_job: provide a c++ job object in memory, used for
            constructing objects in wrapper objects from c++ APIs, rather than
            direct construction.

        """
        _launch_qapp()

        self._case_insensitive_properties = self._get_case_insensitive_lookup()
        if cpp_job:
            self._cpp_job = cpp_job
            self.job_id = self.JobId
        else:
            self.job_id = job_id
            self.readAgain()

    def _get_case_insensitive_lookup(self) -> Dict[str, str]:
        """
        Construct a map of lowercase properties to valid attribute members
        """
        d = {}
        for name, _ in inspect.getmembers(Job, inspect.isdatadescriptor):
            if name.startswith("_"):
                continue
            d[name.lower()] = name
        return d

    def readAgain(self):
        """
        Reread the database. Calling this routine is necessary to get fresh
        values.
        """
        backend = get_backend()
        if backend and backend.job_id == self.job_id:
            self._cpp_job = mmjob.get_backend_job()
            return
        self._cpp_job = mmjob.Job(self.job_id)

    def isComplete(self) -> bool:
        """
        Returns True if the job is complete.

        Note that this does not necessarily mean the output files have been
        downloaded.
        """
        return self._cpp_job.isComplete()

    def isQueued(self) -> bool:
        """
        Returns True if the job runs on a HPC queueing system.
        """

        return self._cpp_job.isQueueJob()

    def succeeded(self) -> bool:
        """
        Returns False if the job was killed, died or fizzled. Returns True
        if ExitStatus is finished.

        :raises RuntimeError: if the job isn't completed, so use isComplete()
            before calling.
        """
        # Check ExitStatus before isComplete. There is a race condition in
        # legacy jobcontrol where ExitStatus may be "finished" before
        # isComplete is true, but the ExitStatus property will block if this
        # is the case."
        if self.ExitStatus == "finished":
            return True
        if not self.isComplete():
            raise RuntimeError(
                f"Job '{self.job_id}' is not complete. Current status is: {self.Status}"
            )
        return False

    def wait_before_kill(self):
        # Experiments show that attempting to kill a (non-job-server) job too
        # quickly fails, so wait at least 5 seconds.
        seconds_since_start = time.time() - time.mktime(
            time.strptime(self.LaunchTime, timestamp_format))
        if seconds_since_start < 5:
            time.sleep(5 - seconds_since_start)

    def stop(self):
        """
        Kill the job while collecting output files.
        """
        self._cpp_job.stopJob()

    def kill(self):
        """
        Kill the job if it is running. This cancels a running job and does not
        return output files.
        """
        self._cpp_job.killJob()

    def cancel(self):
        """
        Cancel a running job and do not return output files.
        This method will eventually deprecate job.kill
        """
        self._cpp_job.killJob()

    def kill_for_smart_distribution(self):
        """
        Kill the job for smart distribution if it is running.

        """
        if self.isComplete():
            return

        if not mmjob.mmjob_is_job_server_job(self.job_id):
            self.wait_before_kill()

        self._cpp_job.killJobForSmartDistribution()

    def _wait(self, max_interval: int):
        """
        Wait for the job to complete.

        :param max_interval: maximum interval to sleep between checking for
            completion.
        """
        if mmjob.mmjob_is_job_server_job(self.job_id):
            self.download()
            return
        # The python implementation differs from the job server C++
        # implementation in that it reads the job record again if it fails.
        interval = 2
        while 1:
            self.readAgain()
            if self.isComplete():
                return
            else:
                time.sleep(interval)
                # exponential fallback
                if interval * 2 > max_interval:
                    interval = max_interval
                else:
                    interval = interval * 2

    def wait(self, max_interval: int = 60, throw_on_failure: bool = False):
        """
        Wait for the job to complete; sleeping up to 'max_interval' seconds
        between each database check. (Interval increase gradually from 2 sec up
        to the maximum.)

        NOTE: Do not use if your program is running in Maestro, as this
            will make Maestro unresponsive while the job is running.

        :param throw_on_failure: whether to raise an exception if not succeeded
        :type throw_on_failure: bool

        :raises RuntimeError: if the job did not succeed. The error message
            will contain the last 20 lines of the job's logfile (if available).
        """
        self._wait(max_interval)
        if throw_on_failure:
            if not self.succeeded():
                msg = f"Job '{self.job_id}' did not succeed."

                if not self.LogFiles or not os.path.exists(self.LogFiles[0]):
                    msg += "\nNo log file available."

                else:
                    logfile = self.LogFiles[0]
                    num_lines = 20
                    with open(logfile) as fh:
                        last_lines = "".join(more_itertools.tail(num_lines, fh))
                    msg += (f"\nLast {num_lines} lines of {logfile}:\n"
                            f"{schrodinger.utils.log.SINGLE_LINE}\n"
                            f"{last_lines}")
                raise RuntimeError(msg)

    def download(self):
        """
        Download the output of the job into job's launch directory.
        No-op in legacy jobcontrol.
        """
        if not mmjob.mmjob_is_job_server_job(self.job_id):
            return
        mmjob.mmjob_wait_and_download(self.job_id)
        self.readAgain()

    def __repr__(self):
        """
        Returns the formal string representation of the Job object.
        """
        return "Job(\"%s\")" % self.job_id

    def get(self, attr, default=None):
        """
        This function will always raise an error, but is provided to guide
        users to a new syntax.
        """
        if attr.lower() not in self._case_insensitive_properties:
            raise TypeError(f"{attr} is not an attribute of Job")
        canonical_name = self._case_insensitive_properties[attr.lower()]
        raise TypeError(
            f'Update syntax: Job.{canonical_name} replaces Job.get("{attr}") '
            'to reflect that these attributes are always available')

    def __getattr__(self, name):
        if name in self._case_insensitive_properties.values():
            return super().__getattribute__(name)
        if name.lower() not in self._case_insensitive_properties:
            return super().__getattribute__(name)
        canonical_name = self._case_insensitive_properties[name.lower()]
        warnings.warn(
            f"Job.{canonical_name} is the appropriate way to call Job.{name}",
            SyntaxWarning,
            stacklevel=2)
        return getattr(self, canonical_name)

    def summary(self) -> str:
        """
        Return a string summarizing all current Job attributes.
        """
        return self._cpp_job.getSummary()

    def getDuration(self) -> Optional[int]:
        """
        Returns the wallclock running time of the job if it is complete. This
        does not include time is submission status. Returns time in seconds.
        If the job is not complete, returns None.
        """
        if not self.StartTime or not self.StopTime:
            return None
        start = time.mktime(time.strptime(self.StartTime, timestamp_format))
        stop = time.mktime(time.strptime(self.StopTime, timestamp_format))
        return int(stop - start)

    def isDownloaded(self):
        """
        Check if output files were downloaded.
        For legacy job control, identical to `isComplete()`.

        :return: Whether the job files were downloaded.
        :rtype: bool
        """
        is_complete = self.isComplete()
        if not mmjob.mmjob_is_job_server_job(self.job_id):
            # With JOB_SERVER off, job gets downloaded when its completed.
            return is_complete
        return is_complete and not self._cpp_job.hasDownloadableFiles()

    @property
    def BatchId(self) -> Optional[str]:
        """
        Return the batch id, if running on an HPC queueing system. Otherwise
        return None.
        """
        batchid = self._cpp_job.getBatchId()
        if not batchid:
            return None
        return batchid

    @property
    def Dir(self) -> str:
        """
        Return the absolute path of the launch directory.
        """
        return self._cpp_job.getLaunchDir()

    @property
    def ExitCode(self) -> Union[int, str]:
        """
        Returns the exit code of a process. If the job is still running, or it
        was killed without collecting the exit code, return a string
        indicating unknown status.
        """
        try:
            return self._cpp_job.getExitCode()
        except RuntimeError:
            return "Exit code unknown"

    @property
    def Host(self) -> str:
        """
        Return the hostname of the host which launched this job.
        """
        return self._cpp_job.getString("Host")

    @property
    def HostEntry(self) -> str:
        """
        Return the name of the host entry this job was launched to.
        """
        return self._cpp_job.getHostEntry()

    @property
    def LaunchTime(self) -> str:
        """
        Return a string timestamp for the time that the job was launched.
        This will before the job starts running, as soon as it is registered
        with jobcontrol as a job to be run.
        """
        return self._cpp_job.getLaunchTime()

    @property
    def JobId(self) -> str:
        """
        Return an identifier for a job.
        """
        return self._cpp_job.getJobId()

    @property
    def Name(self) -> str:
        """
        Returns a string representing -JOBNAME that was specified on launch.
        This may be an empty string.
        """
        return self._cpp_job.getJobName()

    @property
    def ParentJobId(self) -> Optional[str]:
        """
        Return the jobid of a parent job. If the job does not have a parent,
        return None.
        """
        parent_jobid = self._cpp_job.getParentJobId()
        if not parent_jobid:
            return None
        return parent_jobid

    @property
    def Processors(self) -> int:
        """
        For a batch job, returns the number of queue slots attached to this
        job. For a local job, return the number of CPU cores allowed to be
        used.
        """
        return self._cpp_job.getProcessorCount()

    @property
    def Program(self) -> str:
        """
        Return descriptive text for the name of the program running this job,
        e.g. Jaguar. This field is optional and may return an empty string.
        """
        return self._cpp_job.getProgram()

    @property
    def Version(self) -> str:
        """
        Return the build number.
        """
        mmshare_version = str(get_mmshare_version())
        return "{release} build {version}".format(
            release=get_release_name(), version=mmshare_version[-3:])

    @property
    def Project(self) -> str:
        """
        Return the job's project name field. This will be an empty string if no
        project is set.
        """
        return self._cpp_job.getProject()

    @property
    def QueueHost(self) -> str:
        """
        Return the hostname of the submission node of a HPC queueing system. If
        not an HPC host, this will be an empty string.
        """
        return self._cpp_job.getQueueHost()

    @property
    def StructureOutputFile(self) -> str:
        """
        Return the name of the file returned by the job that will get
        incorporated into a project of Maestro. Returns an empty string if
        no file is specified.
        """

        return self._cpp_job.getStructureOutputFile()

    @property
    def DisplayStatus(self) -> Optional[DisplayStatus]:
        """
        Return a user-focused status that indicates the current state of the
        job.

        Returns None in the case of non JOB_SERVER jobs.
        """
        # The opposite of "user-focused" in this case is
        # implementation-dependent.
        if mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER):
            return DisplayStatus(self._cpp_job.getDisplayStatus())
        else:
            return None

    @property
    def StatusChangeReason(self) -> str:
        """
        Returns a human-readable reason that a job entered its current state,
        such as "job canceled by the user." If the reason was not recorded or
        is not particularly interesting (e.g. normal transition from waiting to
        running) it may be the empty string.
        """
        if mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER):
            return self._cpp_job.getStatusChangeReason()
        else:
            return ""

    @property
    def Status(self) -> str:
        """
        Get the Status of the job. This is used by legacy jobcontrol API,
        but is superseded by DisplayStatus for JOB_SERVER jobs.
        """
        return self._cpp_job.getString("Status")

    @property
    def StartTime(self) -> str:
        """
        Return a string for the starting time of the job. Returns an empty
        string if the job is not yet started, for example, enqueued in an HPC
        environment.
        """
        return self._cpp_job.getStartTime()

    @property
    def StopTime(self) -> str:
        """
        Return a string for the completion time of the job. Returns an empty
        string if the job is not yet completed.
        """
        return self._cpp_job.getStopTime()

    @property
    def StatusTime(self) -> str:
        """
        Return a string for the time when the job was last updated.
        """
        return self._cpp_job.getStatusTime()

    @property
    def Viewname(self) -> str:
        """
        Return a representation of name used to filter jobs in maestro.
        May be empty.
        """
        return self._cpp_job.getViewName()

    def _wait_for_complete(self, interval: int = 1, max_time: int = 30):
        """
        Wait for a job to transition from finished to completed. This is
        only meaningful with legacy jobcontrol.

        :param interval: seconds to wait between reread of job record
        :param max_interval: maximum time to wait for job to be completed
        """

        @backoff.on_predicate(
            backoff.constant,
            lambda job: not job.isComplete(),
            interval=interval,
            max_time=max_time)
        def _reread():
            self.readAgain()
            return self

        _reread()

    @property
    def ExitStatus(self) -> str:
        """
        Get the ExitStatus of the job. This is a string representation of a
        job. Consider using DisplayStatus instead.

        :raises: RuntimeError if the job is not yet complete.
        """

        def get_status():
            return self._cpp_job.getString("ExitStatus")

        exitstatus = get_status()
        if exitstatus:
            if self.isComplete():
                return exitstatus
            elif not mmjob.mmjob_is_job_server_job(
                    self.job_id) and exitstatus == "finished":
                # There seems to sometimes a wait between "finished" and
                # "completed", especially in jobdj. Do a 30s retry to wait for
                # the specific finished->completed status change. This comes
                # from the fact that a message can happen before job record
                # is updated.
                self._wait_for_complete()
                exitstatus = get_status()
                if self.isComplete():
                    return exitstatus
        raise RuntimeError(
            f"ExitStatus is not valid until the job {self.JobId} has "
            f"completed; it is currently '{self.Status}:{exitstatus}'. "
            f"Full job record: {self.summary()}")

    @property
    def JobDir(self) -> str:
        """
        Return the directory where the job is run. This will be an empty string
        if the job has not yet started.
        """
        return self._cpp_job.getJobDir()

    @property
    def JobHost(self) -> str:
        """
        Return the hostname where the job is run. This will be an empty string
        if the job has not yet started.
        """
        return self._cpp_job.getJobHost()

    @property
    def JobSchrodinger(self) -> str:
        """
        Return the directory of Schrodinger installation where the job is
        running.
        """
        job_schrodinger = self._cpp_job.getString("JobSchrodinger")
        if job_schrodinger:
            return job_schrodinger
        jobMMshareExec = self._cpp_job.getString("JobMMshareExec")
        return os.path.dirname(os.path.dirname(os.path.dirname(jobMMshareExec)))

    @property
    def Envs(self) -> List[str]:
        """
        Return a list of environment varaibles that are set by job, in
        addition to a default environment on a machine. The format is
        ["SPECIAL_VAR=0", "SPECIAL_VAR2=yes"]

        """
        return list(self._cpp_job.getEnvironment())

    @property
    def Errors(self) -> List[str]:
        """
        Return possible error messages associated with a job. This will only
        return values in legacy jobcontrol.
        """
        return list(self._cpp_job.getErrors())

    @property
    def LogFiles(self) -> List[str]:
        """
        Get list of log files associated with a log. May be an empty list.
        """
        return list(self._cpp_job.getFiles(mmjob.JobFilesType_LOG_FILES))

    @property
    def SubJobs(self) -> List[str]:
        """
        Return list of subjob job ids.
        """
        return list(self._cpp_job.getSubjobIds())

    @property
    def Commandline(self) -> str:
        """
        Return the command used to launch the job.

        Note that this may not be accurate when the job is called directly from
        a jobspec. In that case it will instead return the commandline of
        the parent process.
        """
        return self._cpp_job.getString("Commandline")

    @property
    def User(self) -> str:
        """
        Return the username of user who launched the job.
        """
        return self._cpp_job.getUser()

    def getApplicationHeaderFields(self, default=None) -> Dict[str, str]:
        """
        Returns a dictionary of commonly used jobcontrol keyword:value pairs
        used to standardize application log files.

        :param default: Value assigned to a keyword if the corresponding
            attribute is not defined.
        :type default: any

        """

        fields = {}
        keywords = [
            'JobId', 'Name', 'Program', 'Version', 'Host', 'Dir', 'HostEntry',
            'Queue', 'JobHost', 'JobDir', 'JobSchrodinger', 'Commandline',
            'StartTime'
        ]
        for item in keywords:
            fields[item] = getattr(self, item, default)
        return fields

    def getApplicationHeaderString(self, field_sep: str = ' : ') -> str:
        """
        Returns a formatted string, suitable for printing at the top of a log
        file printing helpful information about the state of the job.

        :param field_sep: String that delimits the keyword and value.

        Example::

            backend = schrodinger.job.jobcontrol.get_backend()
            if backend:
                print backend.getJob().getApplicationHeaderString()

        """

        undefined_field = None
        fields = self.getApplicationHeaderFields(default=undefined_field)
        max_length = max([len(key) for key in fields])
        kw_args = []
        for key in fields:
            # Only show Queue field if it is defined.
            if key == 'Queue' and fields[key] == undefined_field:
                continue
            kw_args.append(
                field_sep.join([key.ljust(max_length),
                                str(fields[key])]))
        section_delim = 80 * '-'
        kw_args.insert(0, section_delim)
        kw_args.append(section_delim)
        header = '\n'.join(kw_args)
        return header

    def getInputFiles(self) -> List[str]:
        return self.InputFiles

    @property
    def InputFiles(self) -> List[str]:
        """
        Return list of files that will be transferred to the job directory
        on launch.
        """
        return list(self._cpp_job.getFiles(mmjob.JobFilesType_INPUT_FILES))

    @property
    def JobDB(self) -> str:
        """
        Path to the Job Database in legacy jobcontrol. This is an empty str
        for JOB_SERVER jobs.
        """
        return self._cpp_job.getString("JobDB")

    @property
    def OrigLaunchDir(self) -> str:
        """
        Return the launch directory of the oldest ancestor of this job.
        """
        return self._cpp_job.getOriginalLaunchDir()

    @property
    def OrigLaunchHost(self) -> str:
        """
        Return the hostname of the oldest ancestor of this job.
        """
        return self._cpp_job.getOriginalLaunchHost()

    def getOutputFiles(self) -> List[str]:
        return self.OutputFiles

    @property
    def OutputFiles(self) -> List[str]:
        """
        Return a list of output filenames which will be copied back, if
        existing, at the end of a job.

        Note that this list can grow while the backend is running, since output
        files can be registered by the backend.
        """
        return list(self._cpp_job.getFiles(mmjob.JobFilesType_OUTPUT_FILES))

    def getProgressAsPercentage(self) -> float:
        """
        Get the value of backend job progress in terms of percentage (values
        from 0.0 - 100.0)

        Return 0.0 when a job is not yet in running state.
        """
        return self._cpp_job.getProgress().percentage_completed

    def getProgressAsSteps(self) -> Tuple[int, int]:
        """
        Get the value of backend job progress in terms of steps and totalsteps.
        Return (0,1) when a job is not yet in 'running' state.
        """
        progress = self._cpp_job.getProgress()
        return (progress.completed_steps, progress.total_steps)

    def getProgressAsString(self) -> str:
        """
        Get the value of backend job progress in terms of descriptive text.
        Return "The job has not yet started." when a job is not yet in
        running state.
        """
        return self._cpp_job.getProgress().description

    def purgeRecord(self):
        """
        Purge the job record for the job from the database.
        """
        return self._cpp_job.removeRecord()


def _launch_qapp():
    """
    Launch QCoreApplication so mmjob can run async calls under the hood.
    If we are calling from maestro or other PyQt GUI, this should already
    be instantiated, so don't put this somewhere that will get called on
    import.
    """
    global qapp
    if not qapp:
        qapp = QtCore.QCoreApplication.instance()
        if not qapp:
            qapp = QtCore.QCoreApplication([])


#
# Job launching stuff
#


def _get_viewname_from_launch_cmd(cmd: List[str]) -> str:
    """
    Returns an empty viewname if not found.
    """
    is_viewname = False
    for arg in cmd:
        if is_viewname:
            return arg
        elif arg == "-VIEWNAME":
            is_viewname = True
    return ""


@application.require_application(create=True, use_qtcore_app=True)
def launch_job(cmd: List[str],
               print_output: bool = False,
               expandvars: bool = True,
               launch_dir: Optional[str] = None,
               timeout: Optional[int] = None,
               env: Optional[Dict[str, str]] = None,
               show_failure_dialog: bool = True,
               _debug_delay=None) -> jobcontrol.Job:
    """
    Run a process under job control and return a Job object. For a process to
    be under job control, it must print a valid JobId: line to stdout. If
    such a line isn't printed, a RuntimeError will be raised.

    The cmd argument should be a list of command arguments (including the
    executable) as expected by the subprocess module.

    If the executable is present in $SCHRODINGER or $SCHRODINGER/utilities,
    an absolute path does not need to be specified.

    NOTE: UI events will be processed while the job is launching.

    :param print_output: Determines if the output from job launch is printed
       to the terminal or not. Output will be logged (to stderr by default) if
       Python or JobControl debugging is turned on or if there is a launch
       failure, even if 'print_output' is False.

    :param expandvars: If True, any environment variables of the form `$var`
        or `${var`} will be expanded with their values by the
        `os.path.expandvars` function.

    :param launch_dir: Launch the job from the specified directory. If
        unspecified use current working directory.

    :param timeout: Timeout (in seconds) to be applied while waiting for the
        job control launch process to start or finish. The launch process will
        be terminated after this time.  If None, the launch process will run
        without a timeout.

    :param env: This dictionary will replace the environment for the launch
        process. If env is None, use the current environment for the launch
        process.

    :param show_failure_dialog: If True, show failure dialog if we detect we
        are using a graphical application and the job launch fails.

    :raise RuntimeError: If there is a problem launching the job (e.g., no
        JobId gets printed). If running within Maestro, an error dialog
        will first be shown to the user.

    :raise FileNotFoundError: If launch_dir doesn't exist.
    """
    from schrodinger.infra import jobhub

    if timeout is not None and timeout < 1:
        # JobCommand only deals with timeouts of integer seconds
        raise RuntimeError(
            f"Timeout {timeout} will be truncated to the latest second and "
            "will fail if set to a number less than 1 sec")

    cmd = fix_cmd(cmd, expandvars)
    timestamp('execute perl jlaunch.pl')

    event_loop = QtCore.QEventLoop()
    job = None
    err_message = None

    job_cmd = jobhub.JobCommand(cmd, _get_viewname_from_launch_cmd(cmd),
                                launch_dir)

    if env:
        qprocess_env = QtCore.QProcessEnvironment()
        for key, value in env.items():
            qprocess_env.insert(key, value)
        job_cmd.setEnvironment(qprocess_env)

    if timeout is not None:
        job_cmd.setTimeout(int(timeout))
    else:
        # NOTE: The default timeout needs to be unlimited for the launch_job
        # timeout to be honored when the job command passed is a startup script
        # (a script that contains get_job_spec_from_args or uses launcher)
        # because running these scripts will also call launch_job in the newly
        # launched process, but with timeout=None.
        # See JOBCON-7177 for details.
        job_cmd.setTimeout(-1)  # unlimited

    @qt_utils.exit_event_loop_on_exception
    def job_started(launched_job: jobcontrol.Job, *, event_loop=None):
        nonlocal job
        job = launched_job
        if _debug_delay is None:
            event_loop.quit()
        else:
            QtCore.QTimer.singleShot(_debug_delay * 1000, event_loop.quit)

    @qt_utils.exit_event_loop_on_exception
    def job_launch_failed(launch_err: str, *, event_loop=None):
        nonlocal err_message
        nonlocal err_message
        err_message = launch_err
        event_loop.quit()

    job_launcher = jobhub.JobLauncher(job_cmd)
    job_launcher.jobStarted.connect(
        functools.partial(job_started, event_loop=event_loop))
    job_launcher.jobLaunchFailed.connect(
        functools.partial(job_launch_failed, event_loop=event_loop))
    job_launcher.launch()
    # Confirm that neither signal was emitted yet
    assert job is None
    assert err_message is None

    event_loop.exec()
    exception = qt_utils.get_last_exception()
    if exception:
        raise exception
    if not err_message and not job:
        err_message = "Job launch failed"

    if err_message:
        if show_failure_dialog and maestro:
            # Import QtWidgets only if running within Maestro:
            # Note similarities with utils.JobLaunchFailureDialog
            from schrodinger.Qt import QtWidgets
            dialog = QtWidgets.QMessageBox()
            dialog.setIcon(QtWidgets.QMessageBox.Warning)
            dialog.setText(f"Job launch failed: {cmd}")
            dialog.setDetailedText(err_message)
            dialog.exec_()
        raise jobcontrol.JobLaunchFailure(err_message)

    if print_output:
        print(f"JobId: {job.JobId}")

    return job


def prepend_schrodinger_run(cmd: List[str]) -> List[str]:
    """
    Check if a command executes a Python script and prepend
    $SCHRODINGER/run to the command if it does not already begin
    with it.

    :param cmd: Command to prepend $SCHRODINGER/run to.
    """
    if len(cmd) == 0:
        raise ValueError('Empty command list specified')
    if cmd[0].endswith('.py') or cmd[0].endswith('.pyc'):
        cmd = [LOCAL_RUN] + cmd
    return cmd


def fix_cmd(cmd: List[str], expandvars: bool = True) -> List[str]:
    """
    A function to clean up the command passed to launch_job.

    :param cmd: A list of strings for command line launching.

    :param expandvars: If True, any environment variables of the form `$var` or
        `${var`} will be expanded with their values by the `os.path.expandvars`
        function.

    :return: The command to be launched

    """
    cmd = prepend_schrodinger_run(cmd)
    # This function exists as a separate entity from launch_job mostly so it
    # can be tested.

    try:
        cmd + []
    except TypeError:
        raise TypeError("String commands are not accepted. "
                        "Please use lists of arguments.")

    # cmd is a list
    logger.debug("launch_job command: %s" % " ".join(cmd))
    if expandvars:
        cmd = [os.path.expandvars(arg) for arg in cmd]
        logger.debug(" expandvars? command: %s" % " ".join(cmd))
    cmd = _fix_program_path(cmd[0]) + cmd[1:]
    logger.debug(" modified? command: %s" % " ".join(cmd))
    return cmd


def _fix_program_path(prog: str) -> List[str]:
    """
    If the program executable doesn't exist as a file and isn't an absolute
    path, prepend the SCHRODINGER directory if it exists there.

    Return a list of command line arguments so this function can be used
    from launch_job when 'cmd' is a list.

    """
    # Shortcut the search if the program is python; this prevents an extra
    # layer of $SCHRODINGER/utilities/python for python scripts launched
    # from python. This is an issue because of toplevel.py.
    if prog == "python":
        if sys.platform == "win32":
            return ["python.exe"]
        else:
            return ["python"]

    prog = subprocess_utils.abs_schrodinger_path(prog)
    return [prog]


def list2jmonitorcmdline(cmdlist: List[str]) -> str:
    """
    Turn a command in list form to a single string that can be executed
    by jmonitor.

    """
    cmd = []
    for arg in cmdlist:
        if not mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER):
            arg = subprocess.list2cmdline((arg,))
            # Quote all args since they end up in a string. Make an exception
            # for strings that contain only "safe" characters or shell redirect.
            if not (arg.startswith('"') and arg.endswith('"')) \
                and not shell_redirect_re.match(arg):
                if not set(arg).issubset(SAFE_COMMAND_CHARS):
                    # If argument contains "unsafe" characters, quote it:
                    arg = '\"' + arg + '\"'
        else:
            if not shell_redirect_re.match(arg):
                arg = shlex.quote(arg)
        cmd.append(arg)

    return ' '.join(cmd)


def _get_file_path(url: str) -> str:
    """
    From a job spec url, return a filename.
    """
    if not url.startswith("file://"):
        raise RuntimeError(
            "File URI is %s but job_control_launch doesn't know how "
            "to deal with non-file URIs at this time." % url)
    parsed_url = urlparse(url)
    netloc = parsed_url.netloc
    # for UNC paths
    if netloc:
        netloc = r"\\" + netloc
    return url2pathname(netloc + parsed_url.path)


def _get_job_spec_launch_command(job_spec,
                                 launch_parameters,
                                 write_output=False):
    """
    Get the launch command for a job based on its specification.

    :param job_spec: Data defining the job.
    :type job_spec: schrodinger.job.launchapi.JobSpecification

    :param launch_parameters: launch parameters for the job
    :type launch_parmeters: schrodinger.job.launchparams.LaunchParameters

    :param write_output: If true, construct a launch command that can be run
        from any SCHRODINGER. Used by appframework2 to write a shell script that
        can run the job later.
    :type write_output: bool

    :return: Command to execute as a list of strings.

    """

    # Set the launch_parameters jobname in the job_spec so it can sub out
    # any <JOBNAME> variables when calling getCommand().
    lp_jobname = launch_parameters.getJobname()
    if lp_jobname:
        job_spec.setJobname(lp_jobname)

    job_spec.validate()

    cmd = job_spec.getCommand()
    stdout_file = job_spec.getStdout()
    if not stdout_file:
        stdout_file = job_spec.getDefaultStdout()
    cmd = list2jmonitorcmdline(cmd)
    cmd += f' > "{stdout_file}" '

    stderr_file = job_spec.getStderr()
    if not stderr_file:
        stderr_file = job_spec.getDefaultStderr()

    cmd = f" {cmd} "
    if stdout_file == stderr_file:
        cmd += "2>&1 "
    else:
        cmd += f"2> {stderr_file}"

    if write_output:
        launch_command = ["$SCHRODINGER/run", "jlaunch.pl"]
    else:
        launch_command = [
            "perl",
            os.path.join(os.environ['MMSHARE_EXEC'], "jlaunch.pl")
        ]

    # Use the -usetmpdir option so that missing input files in the spec will
    # cause the job to fail.
    launch_command.extend([
        "-usetmpdir", "-cmd",
        "BASE64 " + base64.b64encode(cmd.encode('ascii')).decode('ascii')
    ])
    if job_spec.jobUsesTPP():
        tpp = launch_parameters.getTPP()
        if tpp:
            launch_parameters.setNumberOfProcessorsOneNode(tpp)

    launch_command.extend(launch_parameters.convertToJLaunchOptions())

    if not lp_jobname and job_spec.getJobname():
        launch_command.extend(["-name", job_spec.getJobname()])

    if job_spec.getProgramName():
        launch_command.extend(["-prog", job_spec.getProgramName()])

    if mmutil.feature_flag_is_enabled(
            mmutil.JOB_SERVER) and job_spec.isStoppable():
        launch_command.extend(["-stoppable"])

    # In general, the number of queue slots is dictated by the number
    # of threads needed. (i.e. each job requests N queue slots. For some
    # jobs that mix parallelism, this is N * M jobs. In special case where
    # the driver jobs launches all the jobs, we reserve by number of processors
    # (only) as the number of queue slots
    nprocs = launch_parameters.getNumberOfQueueSlots()
    if job_spec.driverReservesCores():
        if nprocs != 1:
            raise RuntimeError(
                f"Setting number of slots ({nprocs}) and processors consumed "
                "by driver are incompatible")
        nprocs = launch_parameters.getNumberOfSubjobs()
        if not nprocs:
            raise RuntimeError(
                "For the driver to reserve cores, please set the number of "
                "subjobs explicitly (:X) as -HOST host:X")

    if nprocs:
        launch_command.extend(["-NPROC", str(nprocs)])

    licenses = job_spec.getLicenses()
    if licenses:
        for license_name, token_count in licenses:
            launch_command.extend(["-lic", f"{license_name}:{token_count}"])

    input_file_args = input_file_arguments(job_spec, launch_parameters,
                                           write_output)
    cmdline_file_args = file_arguments_for_launch_command(input_file_args)
    launch_command.extend(cmdline_file_args)

    for file_ in job_spec.getOutputFiles(stream=False, incorporate=False):
        launch_command.extend(["-out", file_])

    for file_ in job_spec.getOutputFiles(stream=True, incorporate=False):
        launch_command.extend(["-log", file_])

    for file_ in job_spec.getOutputFiles(incorporate=True):
        launch_command.extend(['-structout', file_])

    logger.debug("launch_from_job_spec launch command %s" % launch_command)
    logger.debug("launch_from_job_spec job spec %s" % job_spec.asJSON(
        indent=4, sort_keys=True))
    logger.debug("launch_from_job_spec task command %s" % cmd)
    return launch_command


def input_file_arguments(job_spec, launch_parameters, write_output):
    """
    Return a set of file arguments (a list of (option, value) tuples)
    corresponding to the input files of a given job.
    If any of the input files are missing, raises an error.
    """
    file_args = []
    missing_input_files = []
    for (url, runtime_path) in job_spec.getInputFiles():
        source = _get_file_path(url)
        if not os.path.exists(source):
            missing_input_files.append(source)
        else:
            if write_output:
                launch_dir = launch_parameters.getLaunchDirectory(
                ) or os.getcwd()
                if fileutils.on_same_drive_letter(source, launch_dir):
                    source = os.path.relpath(source, launch_dir)
                if sys.platform == "win32":
                    source = source.replace("\\", "/")
            file_args.append(("-f", f"'{source}' > '{runtime_path}'"))

    if missing_input_files:
        s = "s" if len(missing_input_files) > 1 else ""
        raise RuntimeError("Input file%s could not be found: %s" %
                           (s, ", ".join(missing_input_files)))

    return file_args


def file_arguments_for_launch_command(file_args):
    """
    Given a set of "raw" file arguments, return the set of those
    to be used on an actual command line.
    If the given set is too long, the arguments will be written to an argfile.
    (It is the responsibility of the caller to remove that file after use.)
    """

    # Command-line length limit is normally about 8192 or higher.
    # Choose the maximum permissible length of all file arguments
    # so as to make sure not to surpass it.
    if total_file_arguments_length(file_args) < 5000:
        args = []
        for (option, value) in file_args:
            args.extend([option, value])
        return args
    argfile = write_argfile(file_args)
    return ['-argfile', argfile]


def total_file_arguments_length(args):
    """
    Determine the total length of the given set of file arguments
    (which is a list of 2-tuples) as they would be represented on the command line.
    """

    # The total length of all tuple elements, plus spaces between them
    return sum([sum([len(a) for a in tuple]) for tuple in args]) + 2 * len(args)


def write_argfile(file_args):
    """
    Write a set of file arguments to a temporary "argfile" (one option-value pair per line)
    and return the name of that file. (The caller is responsible for removing it.)

    :param file_args: A list of (option, value) tuples
    """
    tfd, tfpath = tempfile.mkstemp(text=True)
    with os.fdopen(tfd, 'w') as tfh:
        for (option, value) in file_args:
            tfh.write('{} "{}"\n'.format(option,
                                         value))  # Protect spaces in filenames
    return tfpath


def launch_from_job_spec(job_spec, launch_parameters, display_commandline=None):
    """
    Launch a job based on its specification.

    :param job_spec: Data defining the job.
    :type job_spec: schrodinger.job.launchapi.JobSpecification

    :param launch_parameters: Data defining how the job is run
    :type launch_parameters: schrodinger.job.launchparams.LaunchParameters

    :param display_commandline: commandline attribute of resulting job. Most
                                cases will require this value to be specified,
                                optional value to make it easier to refactor
                                out in the future.
    :type display_commandline: str

    :return: A schrodinger.job.jobcontrol.Job object.

    """
    launch_command = _get_job_spec_launch_command(job_spec, launch_parameters)
    env = os.environ.copy()
    if display_commandline:
        env["SCHRODINGER_COMMANDLINE"] = display_commandline
    return launch_job(
        launch_command,
        launch_dir=launch_parameters.getLaunchDirectory(),
        env=env)


#
# Job control back end stuff
#

_backend_singleton = None


def get_backend() -> Optional["_Backend"]:
    """
    A convenience function to see if we're running under job control. If so,
    return a _Backend object. Otherwise, return None.

    """

    global _backend_singleton
    if _backend_singleton is not None:
        return _backend_singleton

    backend = _Backend()
    if backend.job_id:
        _backend_singleton = backend
        return backend
    return None


def get_runtime_path(pathname: str) -> str:
    """
    Return the runtime path for the input file 'pathname'.

    If the pathname is of a type that job control will not copy to the job
    directory or no runtime file can be found, returns the original path
    name.

    """
    if not pathname:
        raise TypeError("Pathname has to be non empty string")
    # We don't care about the return code, so just ignore it.
    try:
        runtime_path = mmjob.mmjobbe_path_runtime(pathname)
    except mm.MmException as e:
        if e.rc == mmjob.MMJOBBE_ERROR:
            runtime_path = pathname
        else:
            raise
    return runtime_path


def under_job_control() -> bool:
    """
    Returns True if this process is running under job control; False otherwise.
    """
    return bool(mmjob.mmjobbe_has_jobcontrol())


class _Backend:
    """
    An interface to mmjobbe. Because this class does nothing useful if
    not already running under jobcontrol, all methods should be no-ops
    if there is no job_id attribute.

    """

    def __init__(self):
        """
        Get the job_id and jobdir pathname of the current Job Control
        process (only meaningful if running under Job Control).
        Using the get_backend() function should be preferred to
        initializing a _Backend object explicitly.

        """
        self.job_id = None
        self.job_dir = None
        # Store mmjobbe_terminate as an attribute so it will be available
        # even if the mm module is garbage collected before the _Backend
        # instance is.
        self.mmjobbe_terminate = mmjob.mmjobbe_terminate

        if not under_job_control():
            return

        try:
            mmjob.mmjobbe_initialize(mm.error_handler)

            self.job_id = mmjob.mmjobbe_jobid()
            self.job_dir = mmjob.mmjobbe_job_directory()
            logger.debug("_Backend jobid is %s" % self.job_id)
        except mm.MmException as e:
            if e.rc != mmjob.MMJOBBE_NO_JOBID:
                logger.warn("_Backend.__init__ exception: %s", e)
                raise
        except Exception as e:
            logger.warn("_Backend.__init__ exception: %s", e)
            raise

    def getJob(self) -> Job:
        """
        Retrieve a read-only instance of a job record (as an instance of the
        Job class).

        Changes made to the job record after this method is called (e.g. via
        setStructureOutputFile) will not be visible in the record returned.
        It should be called again to get any updates.
        """
        return Job(self.job_id)

    def __getstate__(self):
        raise Exception("_Backend objects cannot be pickled.")

    def __del__(self):
        """
        Clean up library initializations.
        """
        if self.job_id:
            self.mmjobbe_terminate()
            self.job_id = None

    def setStructureOutputFile(self, filename: str):
        """
        Set the path for a file that will be the output file which will be
        marked for incorporation by maestro.
        """
        if self.job_id:
            mmjob.mmjobbe_set_structure_output_file(filename)

    def addOutputFile(self, filename: str):
        """
        Add an output file for job control to copy back to the launch host.
        Job control will silently skip this file if it does not exist.
        """
        if self.job_id:
            mmjob.mmjobbe_add_outputfile(filename)

    def addRequiredOutputFile(self, filename: str):
        """
        Add a required output file for job control to copy back to the launch
        host. If this file does not exist at the end of the job, then job
        control will mark the job as "died".
        """
        if self.job_id:
            mmjob.mmjobbe_add_required_outputfile(filename)

    def addLogFile(self, filename: str):
        """
        Add a log file for this job. A log file is continuously updated as the
        log file appends. Log files can only be append-only text data.
        """
        if self.job_id:
            mmjob.mmjobbe_add_logfile(filename)

    def addMonitorFile(self, filename: str):
        """
        This function has no effect.
        """
        if self.job_id:
            mmjob.mmjobbe_add_monitorfile(filename)

    def setStructureMonitorFile(self, filename: str):
        """
        This function has no effect.
        """
        if self.job_id:
            mmjob.mmjobbe_set_structure_monitor_file(filename)

    def copyOutputFile(self, path: str):
        """
        Copy a completed output file or directory back to the launch directory.

        :param path: The path to the file or directory that should be copied
        """
        if self.job_id:
            mmjob.mmjobbe_copy_outputfile(path)

    def archiveFiles(self, filenames: List[str], archive_file: str):
        """
        Archive the given files into a given archive file
        and add the latter as an output file.
        """
        if self.job_id:
            mmjob.mmjobbe_archive_files(filenames, archive_file)

    def sendMessageToParent(self, message: str):
        """
        Send message <message> (string) to the parent of this job.
        If this job does not have a parent, nothing will be done.

        Normally it should not be necessary to use this method.
        """
        if self.job_id:
            mmjob.mmjobbe_send_parent(message)

    def addMessageName(self, msgname: str):
        """
        Add message type to be queued for the backend to read using
        nextMessage(). Only messages whose first word is in this list
        will be returned by nextMessage().
        """
        if self.job_id:
            mmjob.mmjobbe_add_message_name(msgname)

    def nextMessage(self) -> Optional[str]:
        """
        Return next unread message from the queue if there is one or None
        if there are no messages. The types of messages to return must
        first be specified via addMessageName().
        """
        if self.job_id:
            try:
                return mmjob.mmjobbe_next_message()
            except mm.MmException as e:
                if e.rc == mmjob.MMJOBBE_EMPTY_LIST:
                    return None  # No message
                else:
                    raise  # Any other exception
        return None

    def setJobProgress(self,
                       steps: int = 0,
                       totalsteps: int = 0,
                       description: str = ""):
        """
        Update the progress of a job.

        :param steps: number of steps completed
        :param totalsteps: total number of steps
        :param description: text description of progress
        """
        mmjob.mmjobbe_set_jobprogress(steps, totalsteps, description)

    def deleteSubJob(self, jobid: str):
        """
        Tell jmonitor of the backend to delete the subjob field from
        the parent job record.
        """
        if self.job_id:
            mmjob.mmjobbe_delete_subjob(jobid)


#
# Job hosts stuff
#


class Host:
    """
    A class to encapsulate host info from the schrodinger.hosts file.

    Use the module level functions get_host or get_hosts to create Host
    instances.

    :ivar name: Label for the Host.
    :ivar user: Username by which to run jobs.
    :ivar processors: Number of processors for the host/cluster.
    :ivar tmpdir: Temporary/scratch directory to use for jobs.  List
    :ivar schrodinger: $SCHRODINGER installation to use for jobs.
    :ivar env: Variables to set in the job environment.  List.
    :ivar gpgpu: GPGPU entries.  List.
    :ivar queue: Queue entries only.  Queue type (e.g., SGE, PBS).
    :ivar qargs: Queue entries only.  Optional arguments passed to the queue
                 submission command.

    """

    def __init__(self, name: str):
        """
        Create a named Host object.  The various host attributes must be set
        after object instatiation.

        Only host-entry fields can be public attributes of a Host object.
        Attributes introduced to capture other information about the entry
        must be private (named with a leading underscore.)

        :param name: name of the host entry.
        """
        self.name = name
        self.server_address = None
        self._host = None
        self.user = None
        self.processors = 1
        self.tmpdir = []
        self.schrodinger = None
        self.env = []
        self.gpgpu = []
        self.queue = None
        self.qargs = None
        self._is_queue = False
        self._lines = []

    def to_hostentry(self) -> str:
        """
        Return a string representation of the Host object suitable for
        including in a hosts file.
        """
        lines = []
        for key in entry_fields:
            if hasattr(self, key):
                value = getattr(self, key)
                if value and value != "0":
                    if type(value) is list:
                        if key == "gpgpu":
                            for item in value:
                                lines.append("{}: {}, {}".format(
                                    key, item[0], item[1]))
                        else:
                            for item in value:
                                lines.append("{}: {}".format(key, str(item)))
                    else:
                        lines.append("{}: {}".format(key, str(value)))

        return "\n".join(lines)

    def getHost(self) -> str:
        """
        Return the name of the host, which defaults to 'name' if a
        separate 'host' attribute wasn't specified.

        """
        if self._host:
            return self._host
        else:
            return self.name

    def setHost(self, host: str):
        """
        Store host as _host to allow us to use a property for the 'host'
        attr.

        """
        self._host = host

    host = property(getHost, setHost)

    def isQueue(self) -> bool:
        """
        Check to see whether the host represents a batch queue. Returns True
        if the host is a HPC queueing system.
        """
        return self._is_queue

    def __str__(self):
        """
        Return the informal string representation of the Host -- a
        comma-separated list of "Key: value" attribute pairs.
        """
        return ", ".join(self._hostLines())

    def _hostLines(self):
        """
        Return an array of lines that if joined with newlines would create
        a schrodinger.hosts entry.

        """
        return self._lines

    def __repr__(self):
        """
        Return the formal string representation of the Host.
        """
        return "Host(%s)" % self.name

    def matchesRequirement(
            self, resource_requirement: resource.ComputeRequirement) -> bool:
        """
        Return True if this host can meet the resource requirements provided.

        All hosts will meet CPU resource requirements.
        """
        if resource_requirement.compute_type == resource.ComputeType.GPU:
            if self.gpgpu:
                return True
            elif self.name == "localhost":
                return gpgpu.is_any_gpu_available()
            return False
        # resource type CPU, all hosts allow cpu jobs at this time
        return True


def get_hostfile() -> str:
    """
    Return the name of the schrodinger.hosts file last used by get_hosts().
    The file is found using the standard search path ($SCHRODINGER_HOSTS,
    local dir, $HOME/.schrodinger, $SCHRODINGER).
    """

    return mmjob.mmjob_hostfile_path()


def hostfile_is_empty(host_filepath: str) -> bool:
    """
    Return if the given host_filepath host is empty, meaning it contains only
    the localhost entry. If the host_filepath str is empty or invalid, then this
    function will raise an invalid path exception - IOError.

    :param host_filepath: schrodinger.hosts file to use.
    :type host_filepath: str

    """
    if not os.path.isfile(host_filepath):
        raise OSError("Host file not found in path: " + host_filepath)

    try:
        with _temp_hostfile(host_filepath) as num_hosts:
            hosts = num_hosts
    except (mm.MmException, MissingHostsFileException):
        hosts = 0
    # check if host only contains the localhost
    return not hosts or len(hosts) == 1 and hosts[0].name == 'localhost'


def get_installed_hostfiles(root_dir="") -> List[str]:
    """
    Return the pathname for the schrodinger.hosts file installed in the most
    recent previous installation directory we can find.

    If a root pathname is passed in, previous installations are searched for
    there.  Otherwise, we look in the standard install locations.
    """
    if not root_dir:
        root_dir = INSTALL_ROOT
    if not os.path.isdir(root_dir):
        return []
    schrod_quarterly_re = re.compile(r"[a-z](20\d\d)-(\d)", re.IGNORECASE)
    schrod_yearly_re = re.compile(r"[a-z](20\d\d)", re.IGNORECASE)
    hostfiles = []
    for dirname in os.listdir(root_dir):
        hostsfile = os.path.join(root_dir, dirname, HOSTS_FILE)
        if os.path.exists(hostsfile):
            m = schrod_quarterly_re.search(dirname)
            if m:
                hostfiles.append((m.group(1), m.group(2), hostsfile))
                continue
            m = schrod_yearly_re.search(dirname)
            if m:
                hostfiles.append((m.group(1), "0", hostsfile))
                continue
    hostfiles.sort(reverse=True)
    return [h[-1] for h in hostfiles]


def get_hosts() -> List[Host]:
    """
    Return a list of all Hosts in the schrodinger.hosts file.  After
    this is called, get_hostfile() will return the pathname for the
    schrodinger.hosts file that was used.
    Raises UnreadableHostsFileException or MissingHostsFileException on error.
    """
    hosts = []

    # PANEL-3412
    try:
        mmjob.mmjob_hosts_reload()
    except mm.MmException as e:
        if e.rc == mmjob.MMJOB_HOSTS_MISSING:
            raise MissingHostsFileException(
                "Hosts file could not be loaded. (It could be missing.)")
    # NOTE: If the schrodinger.hosts file is present but is empty, the host
    # list will contain one host: localhost.

    nhosts = _get_num_hosts()
    for i in range(1, nhosts + 1):
        name = mmjob.mmjob_host_name(i)
        hosts.append(_get_host(name))
    return hosts


@contextlib.contextmanager
def _temp_hostfile(fname: str):
    try:
        orig_hostfile = os.environ.get("SCHRODINGER_HOSTS", None)
        os.environ["SCHRODINGER_HOSTS"] = fname
        # The only way to check for validity is to actually set it and see what
        # happens
        hosts = get_hosts()
        yield hosts
    finally:
        # now we restore the original value
        if orig_hostfile is None:
            del os.environ["SCHRODINGER_HOSTS"]
        else:
            os.environ["SCHRODINGER_HOSTS"] = orig_hostfile
        # clear the bad host after resetting the environmental variable
        get_hosts()


def hostfile_is_valid(fname: str) -> Tuple[bool, str]:
    """
    :param fname: The full path of the host file to validate

    :return: a (bool, str) tuple indicating whether the host file is valid
    """
    is_valid = False
    msg = ""
    try:
        with _temp_hostfile(fname):
            is_valid = True
    except (mm.MmException, MissingHostsFileException) as e:
        msg = str(e)
    return (is_valid, msg)


def get_host(name: str) -> int:
    """
    Return a Host object for the named host. If the host is not
    found, we return a Host object with the provided name and details
    that match localhost. This matches behavior that jobcontrol uses.
    Raises UnreadableHostsFileException or MissingHostsFileException on error.
    """
    return _get_host(name)


def _get_num_hosts() -> int:
    """
    Returns the number of hosts in the schrodinger.hosts file. Will raise
    UnreadableHostsFileException or MissingHostsFileException on error.
    """
    try:
        num_hosts = mmjob.mmjob_hosts_length()
    except mm.MmException as err:
        if err.rc == mmjob.MMJOB_INCLUDE_MISSING:
            raise UnreadableHostsFileException(
                "The Schrodinger hosts file is invalid.")
        raise
    if num_hosts == 0:
        raise MissingHostsFileException(
            "Hosts file could not be loaded. (It could be missing.)")
    return num_hosts


def _get_host(name) -> Host:
    """
    Return a Host object for 'name', with all attributes read in from
    the corresponding entry in the schrodinger.hosts file.  Requires that mmjob
    be initialized.
    Raises UnreadableHostsFileException or MissingHostsFileException on error.
    """
    original_host_name = None
    h = Host(name)
    try:
        num_keys = mmjob.mmjob_host_keys_length(name)
    except mm.MmException as e:
        _get_num_hosts()  # To verify validity of the hosts file. Will raise
        # a more useful exception.
        if e.rc == mmjob.MMJOB_NO_SUCH_HOST:
            original_host_name = name
            name = "localhost"
            num_keys = mmjob.mmjob_host_keys_length(name)
        else:
            raise

    setattr(h, "server_address", mmjob.mmjob_host_server(name))

    h._lines = ["name: %s" % h.name]
    # Go through each schrodinger.hosts entry for this host:
    for i in range(1, num_keys + 1):
        # Lowercase the names for consistency, since mmjob_host_get_int etc.
        # don't care about the case and our style guide precscribes
        # lowercase attribute names.
        attr = mmjob.mmjob_host_keys_item(name, i).lower()
        if mmjob.mmjob_host_is_list(name, attr) and attr != "gpgpu":
            if not hasattr(h, attr):
                setattr(h, attr, [])
            for j in range(1, mmjob.mmjob_host_list_length(name, attr) + 1):
                value = mmjob.mmjob_host_list_item(name, attr, j)
                getattr(h, attr).append(value)
                h._lines.append(f"{attr}: {value}")
        elif attr == "gpgpu":
            if not hasattr(h, attr):
                setattr(h, attr, [])
            for j in range(1, mmjob.mmjob_host_gpgpu_length(name) + 1):
                value = mmjob.mmjob_host_list_item(name, attr, j)
                (index, description) = get_gpgpu_params(value)
                getattr(h, attr).append((index, description))
                h._lines.append(f"{attr}: {index} {description}")
        elif attr == 'processors':
            value = mmjob.mmjob_host_get_int(name, attr)
            setattr(h, attr, value)
            if value > 1:
                h._lines.append("processors: %s" % h.processors)
        elif attr == 'processors_per_node':
            value = mmjob.mmjob_host_get_str(name, attr)
            # convert the string that mmjob gives us to an integer
            setattr(h, attr, int(value))
            if h.processors_per_node > 1:
                h._lines.append(
                    "processors_per_node: %s" % h.processors_per_node)
        else:
            value = mmjob.mmjob_host_get_str(name, attr)
            setattr(h, attr, value)
            h._lines.append(f"{attr}: {value}")

    # sort lines in the predefined order of fields
    h._lines.sort(key=lambda line: field_sortkey.get(line.split(':')[0], 999))

    if mmjob.mmjob_host_is_queue(name):
        h._is_queue = True
    else:
        h._is_queue = False

    if original_host_name:
        h._host = original_host_name

    return h


def get_gpgpu_params(gpgpu_str: str) -> Tuple[str, str]:
    """
    Convert a gpgpu string (ex. "0,V100") to a tuple (index, description).
    Raise an exception if the string is invalid.

    :param gpugpu_str: gpgpu line from schrodinger.hosts (ex. "0,V100")
    :type gpgpu_str: str

    :rtype: tuple(str, str)
    :raises: ValueError if the input is invalid
    """
    fields = re.split("[, ]+", gpgpu_str, 1)
    if len(fields) < 2:
        raise ValueError("ERROR: Invalid gpgpu value: '" + gpgpu_str + "'")
    (index, description) = fields
    return (index, description)


def host_str_to_list(hosts_str: str) -> List[Tuple[str, int]]:
    """
    Convert a hosts string (Ex: "galina:1 monica:4") to a list of tuples.
    First value of each tuple is the host, second value is # of cpus.
    """
    # Implemented because of # EV 55179

    host_list = []
    # Use both white spaces and commas as host entry separators (Ev:56146):
    for spacesplit in hosts_str.split():
        for hostentry in spacesplit.split(','):
            if hostentry:  # not empty string
                s = hostentry.split(':')
                if len(s) == 1:
                    ncpus = None
                else:
                    try:
                        ncpus = int(s[1])
                    except ValueError:  # ncpus is not an integer
                        raise ValueError('ERROR: Could not parse host string: '
                                         f"{hosts_str}")
                host_list.append((s[0], ncpus))
    return host_list


def host_list_to_str(host_list: List[Tuple[str, int]]) -> str:
    """
    Converts a hosts list [('host1',1), ('host2', 10)] to a string.
    Output example: "host1:1,host2:10"
    """
    host_strings = []
    for hostname, ncpus in host_list:
        if ncpus is not None:
            host_strings.append("%s:%i" % (hostname, ncpus))
        else:  # No #cpus specified; means unlimited CPUs:
            host_strings.append(hostname)
    # Return comma-separated host list string:
    return ",".join(host_strings)


def get_command_line_host_list() -> Optional[List[Tuple[str, int]]]:
    """
    Return a list of (host, ncpu) tuples corresponding to the host list that
    is specified on the command line.

    This function is meant to be called by scripts that are running under a
    toplevel job control script but are not running under jlaunch.

    The host list is determined from the following sources:
      1. SCHRODINGER_NODELIST
      2. JOBHOST (if only a single host is specified)
      3. "localhost" (if no host is specified)

    If no SCHRODINGER_NODELIST is present in the environment, None is
    returned.

    """

    hosts = None
    if 'SCHRODINGER_NODELIST' in os.environ:
        nodelist_str = os.environ['SCHRODINGER_NODELIST']
        # Ev:63971 SCHRODINGER_NODELIST is "" for single host entries
        if nodelist_str == "":  # Only one host specified via -HOST:
            nodelist_str = os.environ['JOBHOST']
            if nodelist_str == "":  # localhost
                nodelist_str = 'localhost'
        hosts = host_str_to_list(nodelist_str)

    return hosts


_backend_hosts = []


def get_backend_host_list() -> Optional[List[Tuple[str, int]]]:
    """
    Return a list of (host, ncpu) tuples corresponding to the host list as
    determined from the SCHRODINGER_NODEFILE.

    This function is meant to be called from scripts that are running under
    jlaunch (i.e. backend scripts).

    Returns None if SCHRODINGER_NODEFILE is not present in the environment.

    """
    global _backend_hosts

    # PYTHON-2012: when something uses the contents of SCHRODINGER_NODEFILE,
    # its value is not supposed to be passed on to subjobs. This is
    # accomplished by unsetting it here.
    #
    # I'm somewhat worried that people are calling this function more than
    # once in the same process, however, so am caching the results in the
    # global variable so that its behavior is consistent within a process at
    # least.
    if _backend_hosts:
        return _backend_hosts

    if 'SCHRODINGER_NODEFILE' in os.environ:
        _hosts = collections.OrderedDict()
        nodefilename = os.environ['SCHRODINGER_NODEFILE']
        with open(nodefilename) as fh:
            for line in fh:
                host = line.strip()
                if host:
                    hostpieces = host.split(":")
                    if len(hostpieces) == 2:
                        ncpu = int(hostpieces[1])
                    else:
                        ncpu = 1
                    if hostpieces[0] in _hosts:
                        _hosts[hostpieces[0]] += ncpu
                    else:
                        _hosts[hostpieces[0]] = ncpu
        del os.environ['SCHRODINGER_NODEFILE']

        _backend_hosts = list(_hosts.items())

        # Return a copy of the global variable to ensure the return value
        # remains consistent.
        return _backend_hosts[:]

    else:
        return None


def calculate_njobs(host_list: Union[str, List[Tuple[str, int]]] = None) -> int:
    """
    Derive the number of jobs from the specified host list.
    This function is useful to determine number of subjobs if user didn't
    specified the '-NJOBS' option.

    :param host_list: String of hosts along with optional number of subjobs
                      -HOST my_cluster:20 or list of tuples of hosts, typically
                      one element [(my_cluster, 20)]

    If host list is not specified then it uses get_command_line_host_list() to
    determine njobs, else uses the user provided host list.

    """
    if host_list:
        if isinstance(host_list, str):
            host_list = host_str_to_list(host_list)
    else:
        host_list = get_command_line_host_list()
    ncpus_sum = 0
    for (hostname, ncpus) in host_list:
        if ncpus is None:
            ncpus_sum += 1  # Assume 1 (even for queued hosts)
        else:
            ncpus_sum += ncpus
    return ncpus_sum


def is_valid_hostname(hostname: str) -> bool:
    """
    Checks if the hostname is valid.

    :param hostname: host name
    """
    return mmjob.mmjob_is_valid_hostname(hostname)


def get_jobname(filename: Optional[str] = None) -> Optional[str]:
    """
    Figure out the jobname from the first available source: 1) the
    SCHRODINGER_JOBNAME environment variable (comes from -JOBNAME during
    startup); 2) the job control backend; 3) the basename of a given filename.

    :param filename: if provided, and the jobname can't otherwise be determined,
                     (e.g., running outside job control with no -FILENAME
                     argument), construct a jobname from its basename.

    :return: jobname (may be None if filename was not provided)
    """
    env_jobname = os.environ.get('SCHRODINGER_JOBNAME')
    if env_jobname:
        return os.environ['SCHRODINGER_JOBNAME']

    backend = get_backend()
    if backend:
        return backend.getJob().Name

    if filename:
        return fileutils.get_jobname(filename)
    else:
        return None


def register_job_output(job: Job):
    """
    Registers the output and log files associated with the given job to the
    backend if running under jobcontrol.

    :param job: job from which to extract output/log files
    """
    backend = get_backend()
    if backend:
        for filename in job.OutputFiles + job.LogFiles:
            backend.addOutputFile(filename)