Source code for schrodinger.job.server

"""
Interact with a Job Server.
"""
import json
import os
from typing import NamedTuple
from typing import Union
from schrodinger.utils import subprocess
from schrodinger.Qt import QtCore
from schrodinger.infra import mmjob
import sys


class JobServersHealthCheck(QtCore.QThread):
    """
    Thread that runs a timer-driven health check of the configured
    jobservers and emits a signal whenever a server's status changes.
    """

    # Signal emitted when a jobserver's status differs from the one last
    # reported for it.
    serverStatusChanged = QtCore.pyqtSignal(mmjob.JobServerOnlineInfo)

    def __init__(self, interval: int, connect_deadline: int):
        """
        Initializes the jobservers health check monitor.

        :param interval: Interval in seconds to wait to trigger the health
            check of jobservers.
        :param connect_deadline: Deadline to wait in seconds for jobserver
            channel to setup if needed and query status.
        """
        super().__init__()
        self._interval = interval
        self._connect_deadline = connect_deadline
        # Last reported server object, keyed by the server's getHostPort()
        # value.
        self._status = {}
        # The timer is handed over to this thread before its timeout signal
        # is wired to the health check; it is only started from run().
        timer = self.health_check_timer = QtCore.QTimer()
        timer.moveToThread(self)
        timer.timeout.connect(self.runHealthCheck)

    def run(self):
        """
        Start the health check timer and run an event loop.
        """
        # The interval is held in seconds; it is scaled by 1000 for the timer.
        interval_ms = self._interval * 1000
        self.health_check_timer.start(interval_ms)
        loop = QtCore.QEventLoop()
        loop.exec()

    def runHealthCheck(self):
        """
        Check the health of configured jobservers and report its information.

        A server is reported through `serverStatusChanged` the first time it
        is seen and afterwards only when its status differs from the one
        recorded for it.
        """
        for server in mmjob.run_health_check(self._connect_deadline):
            host_port = server.getHostPort()
            previous = self._status.get(host_port)
            unchanged = (previous is not None and
                         previous.getStatus() == server.getStatus())
            if unchanged:
                continue
            self._status[host_port] = server
            self.serverStatusChanged.emit(server)
def ensure_localhost_server_running():
    """
    Makes sure there is a localhost jobserver running to do devtests. This
    server is configured in the default location.

    Runs ``jsc local-server-start`` using the ``jsc`` found under the
    directory named by the SCHRODINGER environment variable.

    :raises KeyError: if SCHRODINGER is not set in the environment.
    :raises RuntimeError: if the start command exits with a non-zero status.
    """
    start_command = [jsc(os.environ["SCHRODINGER"]), "local-server-start"]
    # stderr is merged into stdout so the failure message carries both.
    result = subprocess.run(start_command,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)
    if result.returncode == 0:
        return
    raise RuntimeError(
        f"localhost job server failed to start with error {result.stdout}")
def jsc(schrodinger):
    """
    Build the path of the ``jsc`` executable under the given directory.

    :param schrodinger: Directory that holds ``jsc`` (callers in this module
        pass the SCHRODINGER location).
    :return: ``schrodinger`` followed by ``/jsc``.
    """
    return "{}/jsc".format(schrodinger)
class ServerInfo(NamedTuple):
    """
    Immutable record of the fields describing a job server.
    """
    webServerSecret: str
    hasLicenseChecking: bool
    hasServerAuth: bool
    hasSocketAuth: bool
    authSocketPath: str
    versionString: str
    APIVersion: str
    hostname: str

    @classmethod
    def from_dict(cls, data: dict) -> "ServerInfo":
        """
        Convert a dict to a ServerInfo.

        Only the keys named after the tuple's fields are read; any other
        keys in ``data`` are ignored.

        :param data: Mapping that contains a value for every field.
        :return: An instance of the class this method is called on.
        :raises KeyError: if ``data`` lacks one of the fields.
        """
        # Build through `cls` rather than the hard-coded base class so that
        # subclasses get instances of their own type.
        return cls(**{field: data[field] for field in cls._fields})

    def has_authenticator(self) -> bool:
        """
        Report whether either server auth or socket auth is available.
        """
        return self.hasServerAuth or self.hasSocketAuth
def get_server_info(schrodinger: str, address: str) -> ServerInfo:
    """
    Subprocess '$SCHRODINGER/jsc server-info' to get server information.

    :param schrodinger: Directory containing the ``jsc`` executable.
    :param address: Job server address passed to ``jsc server-info``.
    :return: The server information decoded from the command's JSON output.
    :raises RuntimeError: if the command exits with a non-zero status.
    """
    server_info_command = [jsc(schrodinger), "server-info", "--json", address]
    # stderr is merged into stdout so a failure report carries both streams.
    proc = subprocess.run(server_info_command,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.STDOUT)
    if proc.returncode:
        # Adjacent f-string pieces are concatenated verbatim, so the
        # separating space has to be written explicitly.
        raise RuntimeError(
            f"Could not get server info from job server: {address}.\n"
            f"Ran command: '{server_info_command}' with output: "
            f"'{proc.stdout!r}' with exit code: '{proc.returncode}'.")
    return decode_to_server_info(proc.stdout)
def decode_to_server_info(data: Union[str, bytes]) -> ServerInfo:
    """
    Parse a JSON document and build a ServerInfo from the decoded object.

    :param data: JSON text, as ``str`` or ``bytes``.
    :return: The ServerInfo built by ``ServerInfo.from_dict``.
    """
    return ServerInfo.from_dict(json.loads(data))