"""
Convenience functions for dealing with job server
(starting, killing, writing config file).
"""
import datetime
import getpass
import glob
import json
import logging
import os
import posixpath
import shutil
import socket
import sys
import tempfile
import time
from collections import namedtuple
from contextlib import contextmanager
from unittest.mock import patch
import paramiko
import psutil
import yaml
from schrodinger.application.licensing.licadmin import hostname_is_local
from schrodinger.job import jobcontrol
from schrodinger.job.server import jsc
from schrodinger.job import remote_command
from schrodinger.utils import fileutils
from schrodinger.utils import sshconfig
from schrodinger.utils import subprocess
SCHRODINGER_JOBSERVER_CONFIG_FILE = "SCHRODINGER_JOBSERVER_CONFIG_FILE"
SCHRODINGER_LOCALHOST_JOBSERVER_DIRECTORY = "SCHRODINGER_LOCALHOST_JOBSERVER_DIRECTORY"
ServerInfo = namedtuple('ServerInfo', [
'hostname', 'schrodinger', 'job_server_directory', 'username', 'pid',
'job_server_port'
])
logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)
LINUX_PATH = "/"
INI_CONTENTS = """[program:{proc_id}]
command = {cmd}
redirect_stderr = true
stdout_logfile = {log}
# Only give the process a second to start before declaring it failed.
startsecs = 1
# Don't retry if it doesn't start.
startretries = 0
"""


def running_compatibility_tests():
"""
Returns true if we are running backwards compatibility tests.
This function is duplicated in server_management.py for backwards compatibility.
"""
return "JOBSERVER_SCHRODINGER" in os.environ


def get_user(hostname):
    # On Mac build machines the account contains 'buildbot'; normalize to that name.
if "buildbot" in getpass.getuser():
return "buildbot"
return getpass.getuser()


def get_scratch_dir(username):
scr = os.environ.get("SCHRODINGER_TMPDIR", "/scr")
return f"{scr}/{username}"


def supv(username, remote=False):
scr = get_scratch_dir(username)
if remote:
cmd = [f"{scr}/supervisord/venv/bin/supervisorctl"]
else:
cmd = ["supervisorctl"]
cmd.extend(["-c", f"{scr}/supervisord/supervisord.conf"])
return cmd


def get_ini_file(username, proc_id):
return get_scratch_dir(username) + f"/supervisord/conf/{proc_id}.ini"


def get_log_dir(username):
return get_scratch_dir(username) + "/supervisord/logs"


def get_log_location(username, proc_id):
return os.path.join(get_log_dir(username), f"{proc_id}.log")


def get_tls_config(cert_dir):
"""
:param cert_dir: path to wildcard certificates
:type cert_dir: str
:rtype: dict
:returns: dict for webserver tls config
"""
return dict(
certificate_key_file=cert_dir + "/wild.schrodinger.com.key",
certificate_chain_file=cert_dir + "/wild.schrodinger.com.pem")


def get_job_server_directory(basedir, username):
    """
    Return a unique path to a directory in which to store job server data.
    :param basedir: prefix of the path, usually the tmpdir on the remote host
    :type basedir: str
    :param username: username owning the directory
    :type username: str
    :rtype: str
    :return: Path to a directory
    """
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
return basedir + f"/{username}/job_server.{timestamp}"


def job_server_exe(schrodinger):
    # Join with "/" as the path separator because this constructs both local
    # and remote paths.
return LINUX_PATH.join(
[schrodinger, "internal", "bin", "job_server", "jobserverd"])


def job_server_setup_exe(schrodinger):
return LINUX_PATH.join(
[schrodinger, "internal", "bin", "job_server", "jsc_admin"])


def run(schrodinger):
return f"{schrodinger}/run"


def get_job_server_config(job_server_directory):
"""
Return a path for a server-specific job server config in the job server
directory.
"""
return posixpath.join(job_server_directory, "jobserver.test.config")


def get_queue_type(hostname: str) -> str:
"""
Return a queue type in the format for jsc_admin for a given hostname.
"""
hostname = hostname.lower()
if "slurm" in hostname:
return "Slurm"
elif "torque" in hostname:
return "Torque"
elif "lsf" in hostname:
return "LSF"
elif "pbs" in hostname:
return "PBS"
return "UGE"


def setup_host(hostname, schrodinger, job_server_directory, username,
serve_queue_jobs):
"""
Set up authentication in a new directory.
"""
cmd = [
run(schrodinger),
job_server_setup_exe(schrodinger),
]
# Create a new job_server config directory on the host. Includes
# authentication information
setup_cmd = cmd + [
'setup-server', '-host', hostname, "-dir", job_server_directory
]
if serve_queue_jobs:
setup_cmd.extend(["-queue", get_queue_type(hostname)])
else:
setup_cmd.extend(["-queue", "local"])
with get_ssh_client(hostname, username) as ssh:
run_command(ssh, setup_cmd)


def setup_supervisord(hostname, username):
    """Set up supervisord to start the job server on the given hostname."""
scr = get_scratch_dir(username)
with get_ssh_client(hostname, username) as ssh:
venv_dir = f"{scr}/supervisord/venv"
try:
run_command(ssh, ['ls', venv_dir])
except RuntimeError:
try:
run_command(ssh, ["python3", "-mvenv", venv_dir])
except RuntimeError:
run_command(ssh, [
"/utils/bin/python2.7",
"/home/buildbot/scripts/virtualenv.py", venv_dir
])
supervisorctl_path = f"{venv_dir}/bin/supervisorctl"
try:
run_command(ssh, ['ls', supervisorctl_path])
except RuntimeError:
run_command(
ssh, ["bash", "-lc", f"{venv_dir}/bin/pip install supervisor"])
else:
logger.info("supervisorctl script exists - "
f"{supervisorctl_path}; skipping supervisord setup")
return
with ssh.open_sftp() as ftp:
with ftp.file(f"{scr}/supervisord/supervisord.conf", "w") as fh:
fh.write(f"""
[unix_http_server]
file={scr}/supervisord/supervisor.sock
[supervisord]
logfile={scr}/supervisord/supervisord.log
pidfile={scr}/supervisord/supervisord.pid
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[supervisorctl]
serverurl=unix://{scr}/supervisord/supervisor.sock ; use a unix:// URL for a unix socket
[include]
files = conf/*.ini
""")
        # Run via a login shell so that PATH includes the queue submission commands.
run_command(ssh, [
"bash", "-lc",
f"\"{scr}/supervisord/venv/bin/supervisord\" -c \"{scr}/supervisord/supervisord.conf\""
])


def setup_server(hostname,
schrodinger,
job_server_directory,
username,
append=True,
licensing=False,
use_certs=False,
certs_dir=None,
use_shared_supervisors=False,
use_ldap=True,
use_socket_auth=True):
"""
    Set up a new server on arbitrary ports.
:param hostname: name of host to set up jobserver
:type hostname: str
:param schrodinger: path to SCHRODINGER
:type schrodinger: str
:param job_server_directory: base directory for job server
:type job_server_directory: str
:param username: username to use on hostname
:type username: str
    :param append: If True, add the server config to jobserver.config; if
        False, overwrite jobserver.config
:type append: bool
:param licensing: If True, pass licensing check params to queued jobs
:type licensing: bool
    :param use_certs: If True, use wildcard certificates in standard internal
        locations
    :type use_certs: bool
    :param certs_dir: optional directory containing the wildcard certificates;
        defaults to <scratch>/supervisord/cert when use_certs is set
    :type certs_dir: str
:param bool use_shared_supervisors: Use shared supervisor executables
:param bool use_ldap: Enable LDAP authentication
:param bool use_socket_auth: Enable unix socket authentication
"""
    # For a local job server on Linux, "localhost" is not usable; use a fully
    # qualified domain name that maps back to this machine.
if sys.platform.startswith("linux") and hostname == "localhost":
hostname = socket.getfqdn()
serve_queue_jobs = not hostname_is_local(hostname)
if serve_queue_jobs:
setup_host(hostname, schrodinger, job_server_directory, username,
serve_queue_jobs)
setup_supervisord(hostname, username)
if use_certs or certs_dir:
with get_ssh_client(hostname, username) as ssh:
try:
if certs_dir is None:
scr = get_scratch_dir(username)
certs_dir = f"{scr}/supervisord/cert"
run_command(ssh,
["ls", f"{certs_dir}/wild.schrodinger.com.pem"])
except RuntimeError:
raise RuntimeError(
f"You need a wildcard cert on {hostname} before "
"setting up jobserver under supervisord. You can "
"specify the directory with --certs-dir.")
modify_jobserver_yml(hostname, job_server_directory, username,
licensing, use_shared_supervisors, certs_dir,
use_ldap, use_socket_auth)
server = start_server(
hostname,
schrodinger,
job_server_directory,
username,
serve_queue_jobs=serve_queue_jobs)
if serve_queue_jobs:
create_job_server_config(hostname, username, job_server_directory)
return server


def run_command(ssh, command, login=False):
"""
Runs a command.
:param ssh: a paramiko.SSHClient with an established connection to the
remote machine. If ssh is None, the command will be invoked by
subprocess.run
:type ssh: paramiko.SSHClient
:param command: The command to run as a list of string arguments
:type command: list[str]
    :param login: If True, the command requires a login shell over ssh
:type login: bool
    :return: The output of the executed command over ssh; None if the command
        was run locally via subprocess
:rtype: str, or None
This function is duplicated in server_management.py for backward
compatibility.
"""
logger.info(f"Running {command}")
if not ssh:
subprocess.run(command, universal_newlines=True, check=True)
return
env = {"PYTHONIOENCODING": "utf-8"}
command = subprocess.list2cmdline(command)
if login:
command = f"bash --login -c '{command}'"
_, out, err = ssh.exec_command(command, environment=env)
output = out.readlines()
error = err.readlines()
logger.info(f"Stdout: {output}")
logger.info(f"Stderr: {error}")
exit_status = out.channel.recv_exit_status()
if exit_status:
raise RuntimeError(f"{command} exited with {exit_status}")
return '\n'.join(output)


def setup_log_dir(log_dir, ssh=None):
if ssh:
run_command(ssh, ["mkdir", "-p", log_dir])
else:
if not os.path.exists(log_dir):
os.makedirs(log_dir)


def write_ini_file(ini_file, ini_contents, ssh=None):
    """Write the configuration (.ini) file used to manage the job server."""
logger.info(f"Writing {ini_file}")
dirname = os.path.dirname(ini_file)
if ssh:
run_command(ssh, ["mkdir", "-p", dirname])
with ssh.open_sftp() as ftp:
with ftp.file(ini_file, "w") as fh:
fh.write(ini_contents)
else:
if not os.path.exists(dirname):
os.makedirs(dirname)
with open(ini_file, "w") as fh:
fh.write(ini_contents)


def monitor_job_server_with_supervisord(cmd, hostname, job_server_directory,
                                        username):
    """
    Monitor the given job server command under supervisord.
    """
proc_id = os.path.basename(job_server_directory)
log = get_log_location(username, proc_id)
conf = get_ini_file(username, proc_id)
ini_contents = INI_CONTENTS.format(
proc_id=proc_id, cmd=" ".join(cmd), log=log)
with get_ssh_client(hostname, username) as ssh:
setup_log_dir(get_log_dir(username), ssh)
write_ini_file(conf, ini_contents, ssh)
reload_supervisord(ssh, username)
logger.info(f"Show status of newly started program {proc_id}")
try:
run_command(ssh,
supv(username, remote=bool(ssh)) + ["status", proc_id])
except RuntimeError:
logger.info("Supervisord status command failed with an error; "
"attempting to tail the process logs.")
run_command(ssh,
supv(username, remote=bool(ssh)) + ["tail", proc_id])
raise
return get_ports_from_file(job_server_directory, ssh=ssh)


@contextmanager
def _get_file_handle(ssh, log_filename):
"""
:param ssh: open ssh client, or None to read locally
:type ssh: paramiko.SSHClient
:param log_filename: path of file to open
:type log_filename: str
:rtype: yields readable file-like object
"""
if ssh:
with ssh.open_sftp() as ftp:
with ftp.file(log_filename, "r") as fh:
yield fh
else:
with open(log_filename) as fh:
yield fh


def get_ports_from_file(job_server_directory, ssh=None):
    """
    Parse the job server runstate file to get the pid and ports.
    :param job_server_directory: directory containing the server's runstate file
    :type job_server_directory: str
    :param ssh: open ssh client, or None to read locally
    :type ssh: paramiko.SSHClient
    :rtype: dict
    :return: parsed contents of the runstate file
"""
filename = job_server_directory + "/runstate"
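    # The runstate file is JSON written by the job server. This module relies
    # on at least these keys (others may be present):
    #   {"pid": ..., "job_server_port": ..., "user_certificate": ...}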
host = "localhost"
if ssh:
_, out, _ = ssh.exec_command("hostname")
host = out.read().strip()
logger.info(f"Reading {filename} from {host}")
current_time = 0
while True:
try:
with _get_file_handle(ssh, filename) as fh:
return json.loads(fh.read())
except FileNotFoundError:
# wait for successful read for 60s
if current_time < 60:
sleep_time = 2
logger.info(
f"Failed to read ports from {filename} from {host}, "
f"retry in {sleep_time}s")
time.sleep(sleep_time)
current_time += sleep_time
continue
raise


def start_server(hostname,
schrodinger,
job_server_directory,
username,
serve_queue_jobs=False):
"""
Start a job_server in the `schrodinger` directory on the `hostname`
provided, using the `job_server_directory` as its local storage.
"""
if not serve_queue_jobs:
return start_localhost_server(hostname, schrodinger,
job_server_directory)
cmd = [
run(schrodinger),
job_server_exe(schrodinger),
"--dir",
job_server_directory,
"--with-low-performance-db",
]
runstate = monitor_job_server_with_supervisord(
cmd, hostname, job_server_directory, username)
return ServerInfo(
hostname=hostname,
schrodinger=schrodinger,
job_server_directory=job_server_directory,
username=username,
pid=runstate['pid'],
job_server_port=runstate['job_server_port'])


def create_job_server_config(hostname, username, job_server_directory):
"""
Copy the user_authentication that is created automatically at server
setup from the remote server machine to the local launch host and dump
it into a job server config file.
    :param hostname: name of the host where the job server config is located
:type hostname: str
"""
with get_ssh_client(hostname, username) as ssh:
with ssh.open_sftp() as ftp:
runstate_file = posixpath.join(job_server_directory, "runstate")
with ftp.file(runstate_file, "r") as fh:
runstate = json.loads(fh.read())
with ftp.file(runstate["user_certificate"], "r") as fh:
auth = json.loads(
str(fh.read(), encoding="ascii", errors="strict"))
jobport = runstate["job_server_port"]
job_server_config_path = os.environ[SCHRODINGER_JOBSERVER_CONFIG_FILE]
if os.path.exists(job_server_config_path):
if os.path.getsize(job_server_config_path) > 0:
with open(job_server_config_path) as fh:
job_server_config = json.loads(fh.read())
else:
job_server_config = []
else:
job_server_config = []
for server_config in job_server_config[:]:
if hostname == server_config.get("hostname", ""):
job_server_config.remove(server_config)
job_server_config.append({
"hostname": hostname,
"jobport": jobport,
"auth": auth
})
with open(job_server_config_path, "w") as fh:
json.dump(job_server_config, fh)


def modify_jobserver_yml(hostname,
job_server_directory,
username,
licensing,
use_shared_supervisors,
certs_dir=None,
use_ldap=True,
use_socket_auth=True):
"""
Modify the jobserver config to respect licensing and web server
certificates for a multi-user queue server.
    :param hostname: name of the host where the job server config is located
:type hostname: str
    :param job_server_directory: path to the job server directory on hostname
    :type job_server_directory: str
    :param username: username to use on hostname
    :type username: str
:param licensing: If True, enable license checking on job_server
:type licensing: bool
:param certs_dir: If provided, use wildcard certificates from that
directory.
:type certs_dir: str
:param bool use_shared_supervisors: Use shared supervisor executables
:param bool use_ldap: Enable LDAP authentication
:param bool use_socket_auth: Enable unix socket authentication
"""
config_directory = LINUX_PATH.join([job_server_directory, "config"])
jobserver_yml = LINUX_PATH.join([config_directory, "jobserver.yml"])
jobserver_yml_orig = LINUX_PATH.join(
[config_directory, "jobserver.yml.orig"])
with get_ssh_client(hostname, username) as ssh:
ssh.exec_command(f"cp {jobserver_yml} {jobserver_yml_orig}")
with ssh.open_sftp() as ftp:
with ftp.file(jobserver_yml_orig, "r") as fh:
                config = yaml.safe_load(fh.read())
config["server_mode"] = "multi-user"
config["job_server"]["port"] = 0
config["file_store"]["port"] = 0
config["web_server"]["port"] = 0
config["job_server"]["use_local_auth"] = use_socket_auth
if use_ldap:
config["ldap_auth"]["addr"] = "ldap1.schrodinger.com:636"
config["ldap_auth"][
"bind_dn_template"] = "uid={{.User}},ou=people,dc=schrodinger,dc=com"
config["ldap_auth"]["insecure_tls_skip_verify"] = True
else:
# This field is set by default in jsc_admin setup-server for
# internal multi-user deployments so needs to be explicitly disabled.
config["ldap_auth"]["addr"] = ""
config["job_server"]["license_checking"] = bool(licensing)
if certs_dir:
config["web_server"]["tls"] = get_tls_config(certs_dir)
if use_shared_supervisors:
config["job_server"][
"supervisor_directory"] = "/nfs/working/builds/job_server_execs"
if hostname.startswith("pdxgpu"):
config["job_server"][
"schrodinger_hosts_hostname"] = "pdxgpusub1.schrodinger.com"
# allow deployment of test job servers on test systems which don't
# have enough fds
config["file_descriptors"] = 1024
with ftp.file(jobserver_yml, "w") as fh:
fh.write(yaml.dump(config, default_flow_style=False))


def copy_server_log(server, destdir):
    """Copy the jobserverd log files to destdir."""
if hostname_is_local(server.hostname):
log_directory = os.path.join(server.job_server_directory, "logs")
for file in os.listdir(log_directory):
shutil.copy(os.path.join(log_directory, file), destdir)
return
with get_ssh_client(server.hostname, server.username) as sshclient:
with sshclient.open_sftp() as sftp:
log_directory = server.job_server_directory + "/logs"
for filename in sftp.listdir(log_directory):
sftp.get(f'{log_directory}/{filename}', f'{destdir}/{filename}')


def clean_server(ssh, server):
    """Remove the server's supervisord .ini and log files."""
proc_id = os.path.basename(server.job_server_directory)
log_location = get_log_location(server.username, proc_id)
ini_location = get_ini_file(server.username, proc_id)
for filename in (ini_location, log_location):
run_command(ssh, ["rm", "-rf", filename])
return


def clean_localhost_server(server):
""" Clean localhost jobserver """
clean_server(None, server)
return


def clean_remotehost_server(ssh, server):
""" Clean remotehost jobserver """
reload_supervisord(ssh, server.username)
clean_server(ssh, server)
return


def kill_server(server):
"""
Kill the job_server on `hostname` that is using `job_server_directory`
as its local storage.
"""
if hostname_is_local(server.hostname):
stop_localhost_server(server)
if sys.platform != 'win32':
clean_localhost_server(server)
return
with get_ssh_client(server.hostname, server.username) as ssh:
proc_id = os.path.basename(server.job_server_directory)
run_command(ssh,
supv(server.username, remote=bool(ssh)) + ["stop", proc_id])
clean_remotehost_server(ssh, server)
return


def clean_server_dir(server):
"""
Remove `job_server_directory` on `hostname`.
"""
if hostname_is_local(server.hostname):
shutil.rmtree(server.job_server_directory)
return
cmd = ["rm", "-r", server.job_server_directory]
with get_ssh_client(server.hostname, server.username) as ssh:
run_command(ssh, cmd)


def reload_supervisord(ssh, username):
    """
    Re-read configuration files and start any updated services. This is a
    "safe reload" operation and doesn't restart any processes already running
    under supervisord.
    :param ssh: open ssh connection
    :type ssh: paramiko.SSHClient
    :param username: username whose supervisord instance should be reloaded
    :type username: str
    """
run_command(ssh, supv(username, remote=bool(ssh)) + ["reread"])
run_command(ssh, supv(username, remote=bool(ssh)) + ["update"])


def start_localhost_server(hostname, schrodinger, job_server_directory):
run_command(
None,
[jsc(schrodinger), "local-server-start", "-dir", job_server_directory],
)
username = get_username_from_host_entry("localhost")
runstate = get_ports_from_file(job_server_directory, ssh=None)
return ServerInfo(
hostname=hostname,
schrodinger=schrodinger,
job_server_directory=job_server_directory,
username=username,
pid=runstate['pid'],
job_server_port=runstate['job_server_port'],
)


def stop_localhost_server(server):
    """Stop the job server running on localhost."""
run_command(
None,
[
jsc(server.schrodinger), "local-server-stop", "-dir",
server.job_server_directory
],
)
return


@contextmanager
def _get_temp_ssh_key():
if sys.platform != "win32":
yield None
else:
with tempfile.NamedTemporaryFile(delete=False) as fh:
temp_file = fh.name
try:
ppk_file, _ = sshconfig.find_key_pair()
sshconfig._convert_ppk_openssh(ppk_file, temp_file)
yield temp_file
finally:
os.remove(temp_file)


@contextmanager
def get_ssh_client(hostname, username):
    """
    Yield an ssh client for hostname, closing the connection automatically.
    Yields None when the hostname refers to the local machine.
    :param hostname: name of remote host
    :type hostname: str
    :param username: username to connect as
    :type username: str
    """
if hostname_is_local(hostname):
yield
else:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
with _get_temp_ssh_key() as key_filename:
logger.info(f"Connecting to {username}@{hostname}")
ssh.connect(hostname, username=username, key_filename=key_filename)
yield ssh
logger.info(f"Disconnecting from {username}@{hostname}")
ssh.close()


def get_username_from_host_entry(host_entry_name):
    """
    Return the username from host_entry_name to find the correct remote user.
    This username is used for directory creation and remote authentication.
:param host_entry_name: name of host entry (localhost, bolt-gpu)
:type host_entry_name: str
:rtype: str
"""
host_entry = jobcontrol.get_host(host_entry_name)
if host_entry.user:
return host_entry.user
username = getpass.getuser()
if "+" in username: # msys2 username on domain
return username.split("+")[-1]
return username


@contextmanager
def schrodinger_jobserver_config():
"""
Set job server configuration to a temporary location. Cleans up
the file after use.
This function is duplicated in server_management.py for backwards compatibility.
"""
tmpfile, filepath = tempfile.mkstemp(prefix="")
os.close(tmpfile)
with patch.dict(os.environ, {SCHRODINGER_JOBSERVER_CONFIG_FILE: filepath}):
try:
yield
finally:
os.remove(filepath)
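# Illustrative use of schrodinger_jobserver_config (names are placeholders):
#   with schrodinger_jobserver_config():
#       server = setup_server("cluster-head", "/opt/schrodinger", job_dir, "jsmith")
#       ...
#       kill_server(server)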