"""
Functions that are used in one or more modules. reduces danger of circular
dependencies.
@copyright: Schrodinger, Inc. All rights reserved.
"""
import datetime
import errno
import itertools
import os
import re
import stat
import sys
import zipfile
from schrodinger.utils import fileutils
from schrodinger.utils import log
logger = log.get_output_logger('stu_backend')
_api_key = None
"""
Module level cache for API key. Only accessible by one user, so this is OK.
"""
BASE_URL = 'https://stu.schrodinger.com'
"""Address of STU server"""
DATE_RE = r'^\d\d\d\d-\d\d-\d\d$'
"""The date format that our NBs use."""
BUILD_ID_RE = re.compile(r'build(?:\d{2}\b|-\d{3}\b)')
"""Updated build_id naming scheme."""
JOBID_RE = r'-[a-f0-9]{8}$|-[a-f0-9]{8}\.'
ZIP_MODE = zipfile.ZIP_DEFLATED
[docs]class ZipError(Exception):
"""Error while zipping up files."""
[docs]def str2list(string):
"""
Takes a string which can be a comma- or space-separated collection of
positive integers and ranges and returns an ordered list of numbers.
:param input_string: A comma-separated string of positive integers and
ranges
:type input_string: str
:return: An ordered list of integers
:rtype: list(int)
"""
output_set = set()
# treat any number of commas and spaces between digits as a delimiter
for part in re.split(r'(?<=\d)[,\s]+(?=\d)', string):
x = re.split(' *- *', part)
if len(x) > 2:
raise TypeError('Problem converting "%s" to integer list' % string)
try:
output_set.update(list(range(int(x[0]), int(x[-1]) + 1)))
except ValueError:
raise TypeError('Problem converting "%s" to integer list' % string)
return sorted(output_set)
[docs]def str2strlist(string):
"""
Split a string into a list of strings. Used in parser.
"""
if not string:
return []
return string.split(',')
[docs]def get_api_key():
"""
Get the user's API key. Uses caching. Also ensures that the user's API
key is only readable by self - raises RuntimeError if anyone else has read
permission.
:rtype: str
:return: User's API key from disk.
"""
global _api_key
if _api_key:
return _api_key
if os.getenv('STU_APIKEY_PATH'):
path = os.getenv('STU_APIKEY_PATH')
else:
paths_to_search = [
os.path.expanduser("~"),
fileutils.get_directory_path(fileutils.APPDATA)
]
if sys.platform == "win32":
msys_home = r"c:\msys64\home\{}".format(os.environ.get("USERNAME"))
paths_to_search.insert(0, msys_home)
paths_to_search.insert(0, msys_home + ".schrodinger")
for dir_path, ext, prefix in itertools.product(paths_to_search, (
".txt",
"",
), (
".",
"",
)):
path = os.path.join(dir_path, prefix + 'stu_apikey' + ext)
if os.path.isfile(path):
break
if not os.path.isfile(path):
msg = ('API Key not available at {path}{apikey_path_msg}. '
'See the STU docs for more information: '
'"{stu_url}/doc/quickstart#stu_apikey".')
if 'STU_APIKEY_PATH' not in os.environ:
apikey_path_msg = ' (and STU_APIKEY_PATH not set)'
else:
apikey_path_msg = ''
raise OSError(
msg.format(
path=path, stu_url=BASE_URL, apikey_path_msg=apikey_path_msg))
if hasattr(os, 'uname'):
permissions = os.stat(path).st_mode
if (permissions & stat.S_IRGRP or permissions & stat.S_IROTH or
permissions & stat.S_IWGRP or permissions & stat.S_IWOTH):
msg = ('API Key ({path}) is accessible by others. Please remove '
'permission for other users to view this file. '
'(e.g. `chmod 600 {path}`). See the STU docs for more '
'information: "{stu_url}/doc/quickstart#stu_apikey"')
raise RuntimeError(msg.format(path=path, stu_url=BASE_URL))
with open(path) as fh:
_api_key = fh.read().strip()
logger.debug(f'Got STU API key from {path}')
return _api_key
[docs]def assert_no_x():
xstub = "/usr/X11/bin/xstub"
if not sys.platform.startswith("darwin"):
return
if os.path.exists(xstub) and not os.path.islink(xstub):
raise AssertionError(
"{} is incompatible with running STU on OS X. Remove file or symlink to /usr/bin/true".
format(xstub))
[docs]def assert_build_id_matches(buildtype, build_id):
"""Check that the build_id is appropriate for the buildtype"""
if buildtype and not build_id:
msg = f'Build ID is required when buildtype is {buildtype}'
raise AssertionError(msg)
if buildtype == 'OB' and not BUILD_ID_RE.match(build_id):
msg = (
'For Official Builds (OB), build_id must be of the format '
'buildXX or build-XXX, where XX/XXX are the last two/three digits of the mmshare '
'version.')
raise AssertionError(msg)
elif buildtype == 'NB' and not (re.match(DATE_RE, build_id) or
BUILD_ID_RE.match(build_id)):
msg = ('For Nightly Builds (NB), build_id must be of the format '
'YYYY-MM-DD or build-XXX.')
raise AssertionError(msg)
[docs]def verify_zip(fileobj):
"""
Attempt to open fileobj as a zipfile.ZipFile and check it for errors using
ZipFile.testzip() (file headers and CRC32 check for all files).
:raise ZipError: If ZipFile.testzip() retuns a non-None value, indicating a
corrupted file.
"""
with zipfile.ZipFile(fileobj, 'r') as zf:
corrupted_file = zf.testzip()
if corrupted_file:
raise ZipError('Generated zip file is invalid: '
f'{corrupted_file} is corrupted')
[docs]def zip_files(fileobj, relative_to, filenames):
"""
Zip a list of files into an archive.
Relative paths are relative to `relative_to`.
"""
zf = zipfile.ZipFile(fileobj, 'w', ZIP_MODE)
start_dir = os.getcwd()
bad_symlinks = []
try:
os.chdir(relative_to)
for f in filenames:
_add_file_to_zip(zf, f, f, bad_symlinks)
finally:
os.chdir(start_dir)
if bad_symlinks:
missing = 'Symlinks with missing destinations: %s' % ', '.join(
bad_symlinks)
raise ZipError(missing)
def _add_file_to_zip(archive, absolute_path, relative_path, bad_symlinks):
if os.path.islink(absolute_path):
if not os.path.exists(absolute_path):
bad_symlinks.append('{} -> {}'.format(relative_path,
os.readlink(absolute_path)))
zfi = zipfile.ZipInfo(relative_path)
# symlink magic numbers, from:
# http://www.mail-archive.com/python-list@python.org/msg34223.html
zfi.create_system = 3
zfi.external_attr = 2716663808
archive.writestr(zfi, os.readlink(absolute_path))
else:
try:
_add_regular_file_to_zip(archive, absolute_path, relative_path)
except ValueError as e:
logger.exception(
f"Could not write {absolute_path} to {relative_path}")
raise
def _add_regular_file_to_zip(archive, absolute_path, relative_path):
with open(absolute_path, 'rb') as fh:
zfi = zipfile.ZipInfo.from_file(absolute_path, arcname=relative_path)
with archive.open(zfi, 'w') as zh:
while True:
chunk = fh.read(1024 * 1024)
if not chunk:
break
zh.write(chunk)
[docs]def zip_directory(dirname, fileobj=None, skipped_files=None):
"""
Zip the contents of a directory. File names will be relative to the
directory name. Preserves symlinks
:type dirname: str
:param dirname: Directory to be zipped
:type fileobj: file-like object or path to file
:param fileobj: Directory will be zipped to this filename or into this
file-like object.
:param set skipped_files: Names of files that should be excluded from
the zip archive.
"""
bad_symlinks = []
missing_files = []
other_errors = []
if fileobj is None:
fileobj = os.path.basename(dirname) + '.zip'
with zipfile.ZipFile(fileobj, 'w', ZIP_MODE) as zf:
for root, dirs, files in os.walk(dirname):
for filename in files:
absname = os.path.join(root, filename)
relname = os.path.relpath(absname, dirname)
if skipped_files and relname in skipped_files:
continue
try:
_add_file_to_zip(zf, absname, relname, bad_symlinks)
except OSError as err:
if err.errno == errno.ENOENT:
# It's OK if temporary jobcontrol files are removed
# during processing
if not re.search(JOBID_RE, filename):
missing_files.append(relname)
elif err.errno in {errno.ENXIO, errno.EOPNOTSUPP}:
# we just avoid adding unix sockets
#raise #continue
continue
else:
other_errors.append(f'{relname}:{err}')
#Allow saving empty directories.
for one_dir in dirs:
absname = os.path.join(root, one_dir)
relname = os.path.relpath(absname, dirname)
zfi = zipfile.ZipInfo(relname + '/')
# give read and write access to the directory
zfi.external_attr = 0o750 << 16
last_modified = os.path.getmtime(absname)
last_modified = datetime.datetime.fromtimestamp(last_modified)
zfi.date_time = last_modified.timetuple()
zf.writestr(zfi, '')
if bad_symlinks:
missing = 'Symlinks with missing destinations: %s' % ', '.join(
bad_symlinks)
raise ZipError(missing)
if missing_files:
missing = 'Files that disappeared while being zipped up : %s' % ', '.join(
missing_files)
raise ZipError(missing)
if other_errors:
missing = 'Files with other problems during zip: \n %s' % '\n '.join(
other_errors)
raise ZipError(missing)