Source code for schrodinger.test.jobserver

"""
Convenience functions for dealing with job server
    (starting, killing, writing config file).
"""
import datetime
import getpass
import glob
import json
import logging
import os
import posixpath
import shutil
import socket
import sys
import tempfile
import time
from collections import namedtuple
from contextlib import contextmanager
from unittest.mock import patch

import paramiko
import psutil
import yaml

from schrodinger.application.licensing.licadmin import hostname_is_local
from schrodinger.job import jobcontrol
from schrodinger.job.server import jsc
from schrodinger.job import remote_command
from schrodinger.utils import fileutils
from schrodinger.utils import sshconfig
from schrodinger.utils import subprocess

SCHRODINGER_JOBSERVER_CONFIG_FILE = "SCHRODINGER_JOBSERVER_CONFIG_FILE"
SCHRODINGER_LOCALHOST_JOBSERVER_DIRECTORY = "SCHRODINGER_LOCALHOST_JOBSERVER_DIRECTORY"

ServerInfo = namedtuple('ServerInfo', [
    'hostname', 'schrodinger', 'job_server_directory', 'username', 'pid',
    'job_server_port'
])

logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)

LINUX_PATH = "/"

INI_CONTENTS = """[program:{proc_id}]
command = {cmd}
redirect_stderr = true
stdout_logfile = {log}
# Only give the process a second to start before declaring it failed.
startsecs = 1
# Don't retry if it doesn't start.
startretries = 0
"""


def running_compatibility_tests():
    """
    Returns true if we are running backwards compatibility tests.

    This function is duplicated in server_management.py for backwards
    compatibility.
    """
    return "JOBSERVER_SCHRODINGER" in os.environ


def get_user(hostname):
    # Special case for Mac machines, which use the 'buildbot' account.
    if "buildbot" in getpass.getuser():
        return "buildbot"
    return getpass.getuser()


def get_scratch_dir(username):
    scr = os.environ.get("SCHRODINGER_TMPDIR", "/scr")
    return f"{scr}/{username}"


def supv(username, remote=False):
    scr = get_scratch_dir(username)
    if remote:
        cmd = [f"{scr}/supervisord/venv/bin/supervisorctl"]
    else:
        cmd = ["supervisorctl"]
    cmd.extend(["-c", f"{scr}/supervisord/supervisord.conf"])
    return cmd


def get_ini_file(username, proc_id):
    return get_scratch_dir(username) + f"/supervisord/conf/{proc_id}.ini"


def get_log_dir(username):
    return get_scratch_dir(username) + "/supervisord/logs"


def get_log_location(username, proc_id):
    return os.path.join(get_log_dir(username), f"{proc_id}.log")


def get_tls_config(cert_dir):
    """
    :param cert_dir: path to wildcard certificates
    :type cert_dir: str

    :rtype: dict
    :returns: dict for webserver tls config
    """
    return dict(
        certificate_key_file=cert_dir + "/wild.schrodinger.com.key",
        certificate_chain_file=cert_dir + "/wild.schrodinger.com.pem")


def get_job_server_directory(basedir, username):
    """
    Returns a unique path to a directory to store job server data in.

    :param basedir: prefix of path, usually tmpdir on remote
    :type basedir: str

    :rtype: str
    :return: Path to a directory
    """
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    return basedir + f"/{username}/job_server.{timestamp}"


def job_server_exe(schrodinger):
    # Use / as pathsep because this constructs local and remote paths
    return LINUX_PATH.join(
        [schrodinger, "internal", "bin", "job_server", "jobserverd"])


def job_server_setup_exe(schrodinger):
    return LINUX_PATH.join(
        [schrodinger, "internal", "bin", "job_server", "jsc_admin"])


def run(schrodinger):
    return f"{schrodinger}/run"


def get_job_server_config(job_server_directory):
    """
    Return a path for a server-specific job server config in the job server
    directory.
    """
    return posixpath.join(job_server_directory, "jobserver.test.config")


def get_queue_type(hostname: str) -> str:
    """
    Return a queue type in the format for jsc_admin for a given hostname.
    """
    hostname = hostname.lower()
    if "slurm" in hostname:
        return "Slurm"
    elif "torque" in hostname:
        return "Torque"
    elif "lsf" in hostname:
        return "LSF"
    elif "pbs" in hostname:
        return "PBS"
    return "UGE"


def setup_host(hostname, schrodinger, job_server_directory, username,
               serve_queue_jobs):
    """
    Set up authentication in a new directory.
    """
    cmd = [
        run(schrodinger),
        job_server_setup_exe(schrodinger),
    ]
    # Create a new job_server config directory on the host. Includes
    # authentication information.
    setup_cmd = cmd + [
        'setup-server', '-host', hostname, "-dir", job_server_directory
    ]
    if serve_queue_jobs:
        setup_cmd.extend(["-queue", get_queue_type(hostname)])
    else:
        setup_cmd.extend(["-queue", "local"])
    with get_ssh_client(hostname, username) as ssh:
        run_command(ssh, setup_cmd)


def setup_supervisord(hostname, username):
    """
    Set up supervisord to start the job server on the given hostname.
    """
    scr = get_scratch_dir(username)
    with get_ssh_client(hostname, username) as ssh:
        venv_dir = f"{scr}/supervisord/venv"
        try:
            run_command(ssh, ['ls', venv_dir])
        except RuntimeError:
            try:
                run_command(ssh, ["python3", "-mvenv", venv_dir])
            except RuntimeError:
                run_command(ssh, [
                    "/utils/bin/python2.7",
                    "/home/buildbot/scripts/virtualenv.py", venv_dir
                ])
        supervisorctl_path = f"{venv_dir}/bin/supervisorctl"
        try:
            run_command(ssh, ['ls', supervisorctl_path])
        except RuntimeError:
            run_command(
                ssh,
                ["bash", "-lc", f"{venv_dir}/bin/pip install supervisor"])
        else:
            logger.info("supervisorctl script exists - "
                        f"{supervisorctl_path}; skipping supervisord setup")
            return
        with ssh.open_sftp() as ftp:
            with ftp.file(f"{scr}/supervisord/supervisord.conf", "w") as fh:
                fh.write(f"""
[unix_http_server]
file={scr}/supervisord/supervisor.sock

[supervisord]
logfile={scr}/supervisord/supervisord.log
pidfile={scr}/supervisord/supervisord.pid

[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface

[supervisorctl]
serverurl=unix://{scr}/supervisord/supervisor.sock ; use a unix:// URL for a unix socket

[include]
files = conf/*.ini
""")
        # Use a login shell so supervisord picks up the PATH needed for
        # queue commands.
        run_command(ssh, [
            "bash", "-lc",
            f"\"{scr}/supervisord/venv/bin/supervisord\" -c \"{scr}/supervisord/supervisord.conf\""
        ])


def setup_server(hostname,
                 schrodinger,
                 job_server_directory,
                 username,
                 append=True,
                 licensing=False,
                 use_certs=False,
                 certs_dir=None,
                 use_shared_supervisors=False,
                 use_ldap=True,
                 use_socket_auth=True):
    """
    Set up a new server on arbitrary ports.

    :param hostname: name of host to set up jobserver
    :type hostname: str

    :param schrodinger: path to SCHRODINGER
    :type schrodinger: str

    :param job_server_directory: base directory for job server
    :type job_server_directory: str

    :param username: username to use on hostname
    :type username: str

    :param append: If True, add the server config to jobserver.config; if
        False, overwrite jobserver.config
    :type append: bool

    :param licensing: If True, pass licensing check params to queued jobs
    :type licensing: bool

    :param use_certs: If True, use wildcard certificates in standard internal
        locations
    :type use_certs: bool

    :param bool use_shared_supervisors: Use shared supervisor executables

    :param bool use_ldap: Enable LDAP authentication

    :param bool use_socket_auth: Enable unix socket authentication
    """
    # For a local jobserver on Linux, the hostname cannot be "localhost"; it
    # needs to be a proper address that maps to the machine.
    if sys.platform.startswith("linux") and hostname == "localhost":
        hostname = socket.getfqdn()
    serve_queue_jobs = not hostname_is_local(hostname)
    if serve_queue_jobs:
        setup_host(hostname, schrodinger, job_server_directory, username,
                   serve_queue_jobs)
        setup_supervisord(hostname, username)
        if use_certs or certs_dir:
            with get_ssh_client(hostname, username) as ssh:
                try:
                    if certs_dir is None:
                        scr = get_scratch_dir(username)
                        certs_dir = f"{scr}/supervisord/cert"
                    run_command(
                        ssh, ["ls", f"{certs_dir}/wild.schrodinger.com.pem"])
                except RuntimeError:
                    raise RuntimeError(
                        f"You need a wildcard cert on {hostname} before "
                        "setting up jobserver under supervisord. You can "
                        "specify the directory with --certs-dir.")
        modify_jobserver_yml(hostname, job_server_directory, username,
                             licensing, use_shared_supervisors, certs_dir,
                             use_ldap, use_socket_auth)
    server = start_server(
        hostname,
        schrodinger,
        job_server_directory,
        username,
        serve_queue_jobs=serve_queue_jobs)
    if serve_queue_jobs:
        create_job_server_config(hostname, username, job_server_directory)
    return server


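# Typical lifecycle sketch for a test (host name and paths are illustrative;
# assumes SCHRODINGER_JOBSERVER_CONFIG_FILE is set, e.g. via
# schrodinger_jobserver_config() defined later in this module):
#
#   server = setup_server("example-queue-host", "/path/to/SCHRODINGER",
#                         get_job_server_directory("/tmp", "buildbot"),
#                         "buildbot")
#   try:
#       ...  # launch jobs against server.hostname / server.job_server_port
#   finally:
#       kill_server(server)
#       clean_server_dir(server)

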
def run_command(ssh, command, login=False):
    """
    Runs a command.

    :param ssh: a paramiko.SSHClient with an established connection to the
        remote machine. If ssh is None, the command will be invoked by
        subprocess.run
    :type ssh: paramiko.SSHClient

    :param command: The command to run as a list of string arguments
    :type command: list[str]

    :param login: If True, the command requires a login shell for ssh
    :type login: bool

    :return: The output of the executed command over ssh; None if run locally
        using subprocess
    :rtype: str, or None

    This function is duplicated in server_management.py for backward
    compatibility.
    """
    logger.info(f"Running {command}")
    if not ssh:
        subprocess.run(command, universal_newlines=True, check=True)
        return
    env = {"PYTHONIOENCODING": "utf-8"}
    command = subprocess.list2cmdline(command)
    if login:
        command = f"bash --login -c '{command}'"
    _, out, err = ssh.exec_command(command, environment=env)
    output = out.readlines()
    error = err.readlines()
    logger.info(f"Stdout: {output}")
    logger.info(f"Stderr: {error}")
    exit_status = out.channel.recv_exit_status()
    if exit_status:
        raise RuntimeError(f"{command} exited with {exit_status}")
    return '\n'.join(output)


def setup_log_dir(log_dir, ssh=None):
    if ssh:
        run_command(ssh, ["mkdir", "-p", log_dir])
    else:
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)


def write_ini_file(ini_file, ini_contents, ssh=None):
    """
    Write the configuration file to manage jobserver
    """
    logger.info(f"Writing {ini_file}")
    dirname = os.path.dirname(ini_file)
    if ssh:
        run_command(ssh, ["mkdir", "-p", dirname])
        with ssh.open_sftp() as ftp:
            with ftp.file(ini_file, "w") as fh:
                fh.write(ini_contents)
    else:
        if not os.path.exists(dirname):
            os.makedirs(dirname)
        with open(ini_file, "w") as fh:
            fh.write(ini_contents)


def monitor_job_server_with_supervisord(cmd, hostname, job_server_directory,
                                        username):
    """
    Monitor the given job server command under supervisord.
    """
    proc_id = os.path.basename(job_server_directory)
    log = get_log_location(username, proc_id)
    conf = get_ini_file(username, proc_id)
    ssh = None
    ini_contents = INI_CONTENTS.format(
        proc_id=proc_id, cmd=" ".join(cmd), log=log)
    with get_ssh_client(hostname, username) as ssh:
        setup_log_dir(get_log_dir(username), ssh)
        write_ini_file(conf, ini_contents, ssh)
        reload_supervisord(ssh, username)
        logger.info(f"Show status of newly started program {proc_id}")
        try:
            run_command(
                ssh, supv(username, remote=bool(ssh)) + ["status", proc_id])
        except RuntimeError:
            logger.info("Supervisord status command failed with an error; "
                        "attempting to tail the process logs.")
            run_command(
                ssh, supv(username, remote=bool(ssh)) + ["tail", proc_id])
            raise
        return get_ports_from_file(job_server_directory, ssh=ssh)


@contextmanager
def _get_file_handle(ssh, log_filename):
    """
    :param ssh: open ssh client, or None to read locally
    :type ssh: paramiko.SSHClient

    :param log_filename: path of file to open
    :type log_filename: str

    :rtype: yields readable file-like object
    """
    if ssh:
        with ssh.open_sftp() as ftp:
            with ftp.file(log_filename, "r") as fh:
                yield fh
    else:
        with open(log_filename) as fh:
            yield fh


def get_ports_from_file(job_server_directory, ssh=None):
    """
    Parse the output of jobserver to get the pid and ports.

    :param job_server_directory: directory that contains the runstate file
    :type job_server_directory: str

    :param ssh: open ssh client, or None to read locally
    :type ssh: paramiko.SSHClient

    :rtype: dict
    """
    filename = job_server_directory + "/runstate"
    host = "localhost"
    if ssh:
        _, out, _ = ssh.exec_command("hostname")
        host = out.read().strip()
    logger.info(f"Reading {filename} from {host}")
    current_time = 0
    while True:
        try:
            with _get_file_handle(ssh, filename) as fh:
                return json.loads(fh.read())
        except FileNotFoundError:
            # wait up to 60s for a successful read
            if current_time < 60:
                sleep_time = 2
                logger.info(
                    f"Failed to read ports from {filename} from {host}, "
                    f"retry in {sleep_time}s")
                time.sleep(sleep_time)
                current_time += sleep_time
                continue
            raise


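# The runstate file is JSON produced when the job server starts. The keys
# relied on elsewhere in this module are 'pid', 'job_server_port', and
# 'user_certificate'; other keys may also be present. Illustrative values:
#
#   {"pid": 12345, "job_server_port": 45001,
#    "user_certificate": "/scr/user/job_server.20240101_120000/user_auth"}

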
def start_server(hostname,
                 schrodinger,
                 job_server_directory,
                 username,
                 serve_queue_jobs=False):
    """
    Start a job_server in the `schrodinger` directory on the `hostname`
    provided, using the `job_server_directory` as its local storage.
    """
    if not serve_queue_jobs:
        return start_localhost_server(hostname, schrodinger,
                                      job_server_directory)
    cmd = [
        run(schrodinger),
        job_server_exe(schrodinger),
        "--dir",
        job_server_directory,
        "--with-low-performance-db",
    ]
    runstate = monitor_job_server_with_supervisord(
        cmd, hostname, job_server_directory, username)
    return ServerInfo(
        hostname=hostname,
        schrodinger=schrodinger,
        job_server_directory=job_server_directory,
        username=username,
        pid=runstate['pid'],
        job_server_port=runstate['job_server_port'])


def create_job_server_config(hostname, username, job_server_directory):
    """
    Copy the user_authentication that is created automatically at server setup
    from the remote server machine to the local launch host and dump it into a
    job server config file.

    :param hostname: name of hostname where job server config is located
    :type hostname: str
    """
    with get_ssh_client(hostname, username) as ssh:
        with ssh.open_sftp() as ftp:
            runstate_file = posixpath.join(job_server_directory, "runstate")
            with ftp.file(runstate_file, "r") as fh:
                runstate = json.loads(fh.read())
            with ftp.file(runstate["user_certificate"], "r") as fh:
                auth = json.loads(
                    str(fh.read(), encoding="ascii", errors="strict"))
            jobport = runstate["job_server_port"]
    job_server_config_path = os.environ[SCHRODINGER_JOBSERVER_CONFIG_FILE]
    if os.path.exists(job_server_config_path):
        if os.path.getsize(job_server_config_path) > 0:
            with open(job_server_config_path) as fh:
                job_server_config = json.loads(fh.read())
        else:
            job_server_config = []
    else:
        job_server_config = []
    for server_config in job_server_config[:]:
        if hostname == server_config.get("hostname", ""):
            job_server_config.remove(server_config)
    job_server_config.append({
        "hostname": hostname,
        "jobport": jobport,
        "auth": auth
    })
    with open(job_server_config_path, "w") as fh:
        json.dump(job_server_config, fh)


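# After this call, the file pointed to by SCHRODINGER_JOBSERVER_CONFIG_FILE
# holds a JSON list with one entry per host; for example (illustrative values):
#
#   [{"hostname": "example-queue-host", "jobport": 45001, "auth": {...}}]

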
def modify_jobserver_yml(hostname,
                         job_server_directory,
                         username,
                         licensing,
                         use_shared_supervisors,
                         certs_dir=None,
                         use_ldap=True,
                         use_socket_auth=True):
    """
    Modify the jobserver config to respect licensing and web server
    certificates for a multi-user queue server.

    :param hostname: name of hostname where job server config is located
    :type hostname: str

    :param job_server_directory: path to job server on hostname
    :type job_server_directory: str

    :param licensing: If True, enable license checking on job_server
    :type licensing: bool

    :param certs_dir: If provided, use wildcard certificates from that
        directory.
    :type certs_dir: str

    :param bool use_shared_supervisors: Use shared supervisor executables

    :param bool use_ldap: Enable LDAP authentication

    :param bool use_socket_auth: Enable unix socket authentication
    """
    config_directory = LINUX_PATH.join([job_server_directory, "config"])
    jobserver_yml = LINUX_PATH.join([config_directory, "jobserver.yml"])
    jobserver_yml_orig = LINUX_PATH.join(
        [config_directory, "jobserver.yml.orig"])
    with get_ssh_client(hostname, username) as ssh:
        ssh.exec_command(f"cp {jobserver_yml} {jobserver_yml_orig}")
        with ssh.open_sftp() as ftp:
            with ftp.file(jobserver_yml_orig, "r") as fh:
                config = yaml.safe_load(fh.read())
            config["server_mode"] = "multi-user"
            config["job_server"]["port"] = 0
            config["file_store"]["port"] = 0
            config["web_server"]["port"] = 0
            config["job_server"]["use_local_auth"] = use_socket_auth
            if use_ldap:
                config["ldap_auth"]["addr"] = "ldap1.schrodinger.com:636"
                config["ldap_auth"][
                    "bind_dn_template"] = "uid={{.User}},ou=people,dc=schrodinger,dc=com"
                config["ldap_auth"]["insecure_tls_skip_verify"] = True
            else:
                # This field is set by default in jsc_admin setup-server for
                # internal multi-user deployments, so it needs to be
                # explicitly disabled.
                config["ldap_auth"]["addr"] = ""
            config["job_server"]["license_checking"] = bool(licensing)
            if certs_dir:
                config["web_server"]["tls"] = get_tls_config(certs_dir)
            if use_shared_supervisors:
                config["job_server"][
                    "supervisor_directory"] = "/nfs/working/builds/job_server_execs"
            if hostname.startswith("pdxgpu"):
                config["job_server"][
                    "schrodinger_hosts_hostname"] = "pdxgpusub1.schrodinger.com"
            # Allow deployment of test job servers on test systems which don't
            # have enough fds.
            config["file_descriptors"] = 1024
            with ftp.file(jobserver_yml, "w") as fh:
                fh.write(yaml.dump(config, default_flow_style=False))


def copy_server_log(server, destdir):
    """
    Copy jobserverd log file
    """
    if hostname_is_local(server.hostname):
        log_directory = os.path.join(server.job_server_directory, "logs")
        for file in os.listdir(log_directory):
            shutil.copy(os.path.join(log_directory, file), destdir)
        return
    with get_ssh_client(server.hostname, server.username) as sshclient:
        with sshclient.open_sftp() as sftp:
            log_directory = server.job_server_directory + "/logs"
            for filename in sftp.listdir(log_directory):
                sftp.get(f'{log_directory}/{filename}',
                         f'{destdir}/{filename}')


def clean_server(ssh, server):
    """
    Clean server files in supervisord
    """
    proc_id = os.path.basename(server.job_server_directory)
    log_location = get_log_location(server.username, proc_id)
    ini_location = get_ini_file(server.username, proc_id)
    for filename in (ini_location, log_location):
        run_command(ssh, ["rm", "-rf", filename])
    return


def clean_localhost_server(server):
    """
    Clean localhost jobserver
    """
    clean_server(None, server)
    return


def clean_remotehost_server(ssh, server):
    """
    Clean remotehost jobserver
    """
    reload_supervisord(ssh, server.username)
    clean_server(ssh, server)
    return


def kill_server(server):
    """
    Kill the job_server on `hostname` that is using `job_server_directory`
    as its local storage.
    """
    if hostname_is_local(server.hostname):
        stop_localhost_server(server)
        if sys.platform != 'win32':
            clean_localhost_server(server)
        return
    with get_ssh_client(server.hostname, server.username) as ssh:
        proc_id = os.path.basename(server.job_server_directory)
        run_command(
            ssh, supv(server.username, remote=bool(ssh)) + ["stop", proc_id])
        clean_remotehost_server(ssh, server)
    return


def clean_server_dir(server):
    """
    Remove `job_server_directory` on `hostname`.
    """
    if hostname_is_local(server.hostname):
        shutil.rmtree(server.job_server_directory)
        return
    cmd = ["rm", "-r", server.job_server_directory]
    with get_ssh_client(server.hostname, server.username) as ssh:
        run_command(ssh, cmd)


def reload_supervisord(ssh, username):
    """
    Re-read configuration files and start any updated services. This is a
    "safe reload" operation and doesn't restart any processes already running
    under supervisord.

    :param ssh: open ssh connection
    :type ssh: paramiko.SSHClient
    """
    run_command(ssh, supv(username, remote=bool(ssh)) + ["reread"])
    run_command(ssh, supv(username, remote=bool(ssh)) + ["update"])


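# For reference, on a remote host this runs commands equivalent to the
# following (the scratch path is illustrative, built by supv() above):
#
#   /scr/user/supervisord/venv/bin/supervisorctl -c /scr/user/supervisord/supervisord.conf reread
#   /scr/user/supervisord/venv/bin/supervisorctl -c /scr/user/supervisord/supervisord.conf update

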
def start_localhost_server(hostname, schrodinger, job_server_directory):
    run_command(
        None,
        [jsc(schrodinger), "local-server-start", "-dir", job_server_directory],
    )
    username = get_username_from_host_entry("localhost")
    runstate = get_ports_from_file(job_server_directory, ssh=None)
    return ServerInfo(
        hostname=hostname,
        schrodinger=schrodinger,
        job_server_directory=job_server_directory,
        username=username,
        pid=runstate['pid'],
        job_server_port=runstate['job_server_port'],
    )


def stop_localhost_server(server):
    """
    Stop the jobserver running in localhost
    """
    run_command(
        None,
        [
            jsc(server.schrodinger), "local-server-stop", "-dir",
            server.job_server_directory
        ],
    )
    return


@contextmanager
def _get_temp_ssh_key():
    if sys.platform != "win32":
        yield None
    else:
        with tempfile.NamedTemporaryFile(delete=False) as fh:
            temp_file = fh.name
        try:
            ppk_file, _ = sshconfig.find_key_pair()
            sshconfig._convert_ppk_openssh(ppk_file, temp_file)
            yield temp_file
        finally:
            os.remove(temp_file)


@contextmanager
def get_ssh_client(hostname, username):
    """
    Return ssh client for hostname. Closes ssh connection automatically.

    :param hostname: name of remote host
    :type hostname: str
    """
    if hostname_is_local(hostname):
        yield
    else:
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        with _get_temp_ssh_key() as key_filename:
            logger.info(f"Connecting to {username}@{hostname}")
            ssh.connect(
                hostname, username=username, key_filename=key_filename)
            yield ssh
            logger.info(f"Disconnecting from {username}@{hostname}")
            ssh.close()


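# Usage sketch (host name is illustrative): for a remote host this yields a
# connected paramiko.SSHClient; for a local host it yields None, in which case
# run_command() falls back to subprocess:
#
#   with get_ssh_client("example-remote-host", "buildbot") as ssh:
#       run_command(ssh, ["mkdir", "-p", "/tmp/job_server_test"])

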
def get_username_from_host_entry(host_entry_name):
    """
    Return username from host_entry_name to find the correct remote user.
    This username is useful for directory creation and remote authentication.

    :param host_entry_name: name of host entry (localhost, bolt-gpu)
    :type host_entry_name: str

    :rtype: str
    """
    host_entry = jobcontrol.get_host(host_entry_name)
    if host_entry.user:
        return host_entry.user
    username = getpass.getuser()
    if "+" in username:
        # msys2 username on domain
        return username.split("+")[-1]
    return username


@contextmanager
def schrodinger_jobserver_config():
    """
    Set job server configuration to a temporary location. Cleans up the file
    after use.

    This function is duplicated in server_management.py for backwards
    compatibility.
    """
    tmpfile, filepath = tempfile.mkstemp(prefix="")
    os.close(tmpfile)
    with patch.dict(os.environ, {SCHRODINGER_JOBSERVER_CONFIG_FILE: filepath}):
        try:
            yield
        finally:
            os.remove(filepath)
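

# Usage sketch: wrap test code that starts servers so the config file written
# by create_job_server_config() lives in a temporary location and is removed
# afterwards:
#
#   with schrodinger_jobserver_config():
#       server = setup_server(...)
#       ...
#       kill_server(server)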