Source code for schrodinger.test.stu.common

"""
Functions that are used in one or more modules. Keeping them here reduces the
danger of circular dependencies.

@copyright: Schrodinger, Inc. All rights reserved.
"""

import datetime
import errno
import itertools
import os
import re
import stat
import sys
import zipfile

from schrodinger.utils import fileutils
from schrodinger.utils import log

# Output logger for this module, obtained under the name 'stu_backend'.
logger = log.get_output_logger('stu_backend')

_api_key = None
"""
Module level cache for API key.  Only accessible by one user, so this is OK.
"""

BASE_URL = 'https://stu.schrodinger.com'
"""Address of STU server"""

# Uncompiled pattern (used with re.match): exactly four digits, a dash, two
# digits, a dash and two digits, e.g. "2020-01-31".
DATE_RE = r'^\d\d\d\d-\d\d-\d\d$'
"""The date format that our NBs use."""

# Compiled pattern: "build" followed by either two digits or a dash and three
# digits, each ending on a word boundary (e.g. "build12" or "build-123").
BUILD_ID_RE = re.compile(r'build(?:\d{2}\b|-\d{3}\b)')
"""Updated build_id naming scheme."""
# Uncompiled pattern: a dash followed by eight lowercase hex digits, either at
# the end of the name or immediately before a ".".  zip_directory uses it to
# recognise files whose disappearance while zipping is tolerated.
JOBID_RE = r'-[a-f0-9]{8}$|-[a-f0-9]{8}\.'

# Compression method passed to zipfile.ZipFile when this module creates
# archives.
ZIP_MODE = zipfile.ZIP_DEFLATED


class ZipError(Exception):
    """
    Raised when a zip archive could not be produced or checked correctly.
    """
def str2list(string):
    """
    Convert a textual collection of positive integers and ranges into an
    ordered list of numbers.

    Items may be separated by any mix of commas and whitespace.  A range is
    written ``start-end`` (inclusive; spaces around the dash are allowed).
    Duplicates are collapsed, so ``"1,3-5 4"`` gives ``[1, 3, 4, 5]``.  A
    range whose end is smaller than its start contributes no numbers.

    :param string: Comma- and/or space-separated positive integers and ranges
    :type string: str

    :return: An ordered list of integers without duplicates
    :rtype: list(int)

    :raise TypeError: If an item is neither an integer nor a two-ended range
        of integers.  This includes the empty string.
    """
    message = 'Problem converting "%s" to integer list'
    numbers = set()
    # treat any number of commas and spaces between digits as a delimiter
    for part in re.split(r'(?<=\d)[,\s]+(?=\d)', string):
        bounds = re.split(' *- *', part)
        if len(bounds) > 2:
            raise TypeError(message % string)
        try:
            # A lone number is a range that starts and ends on itself.
            first, last = int(bounds[0]), int(bounds[-1])
        except ValueError as err:
            raise TypeError(message % string) from err
        numbers.update(range(first, last + 1))
    return sorted(numbers)
def str2strlist(string):
    """
    Break a comma-separated string into a list of its pieces.  Used in parser.

    :param string: Comma-separated text; may be empty or None
    :return: The pieces found between commas, or an empty list when the input
        is falsy
    :rtype: list(str)
    """
    return string.split(',') if string else []
def get_api_key():
    """
    Get the user's API key. Uses caching.

    The key file is the one named by the STU_APIKEY_PATH environment variable
    when that is set to a non-empty value.  Otherwise the first existing file
    called ``.stu_apikey.txt``, ``stu_apikey.txt``, ``.stu_apikey`` or
    ``stu_apikey`` is used, looking in the user's home directory and then in
    the directory returned by ``fileutils.get_directory_path`` for
    ``fileutils.APPDATA``.  On Windows, ``.schrodinger`` inside the msys64
    home directory and the msys64 home directory itself are searched first.

    Where ``os.uname`` exists, also ensures that the key file is private to
    the user: group and other must have neither read nor write permission.

    :rtype: str
    :return: User's API key from disk.

    :raise OSError: If no key file exists.
    :raise RuntimeError: If the key file is readable or writable by group or
        other.
    """
    global _api_key
    if _api_key:
        return _api_key

    path = os.getenv('STU_APIKEY_PATH')
    if not path:
        search_dirs = [
            os.path.expanduser("~"),
            fileutils.get_directory_path(fileutils.APPDATA),
        ]
        if sys.platform == "win32":
            msys_home = r"c:\msys64\home\{}".format(os.environ.get("USERNAME"))
            # Join with a path separator: plain string concatenation yielded
            # "<home>.schrodinger", a sibling of the home directory, so the
            # .schrodinger directory inside it was never searched.
            search_dirs[:0] = [
                os.path.join(msys_home, ".schrodinger"),
                msys_home,
            ]
        # If nothing matches, path is left at the last candidate so the error
        # below can still name a location.
        for dir_path, ext, prefix in itertools.product(
                search_dirs, (".txt", ""), (".", "")):
            path = os.path.join(dir_path, prefix + 'stu_apikey' + ext)
            if os.path.isfile(path):
                break

    if not os.path.isfile(path):
        msg = ('API Key not available at {path}{apikey_path_msg}. '
               'See the STU docs for more information: '
               '"{stu_url}/doc/quickstart#stu_apikey".')
        if 'STU_APIKEY_PATH' not in os.environ:
            apikey_path_msg = ' (and STU_APIKEY_PATH not set)'
        else:
            apikey_path_msg = ''
        raise OSError(
            msg.format(
                path=path, stu_url=BASE_URL, apikey_path_msg=apikey_path_msg))

    # The permission bits are only checked on platforms providing os.uname.
    if hasattr(os, 'uname'):
        shared_bits = (stat.S_IRGRP | stat.S_IROTH | stat.S_IWGRP |
                       stat.S_IWOTH)
        if os.stat(path).st_mode & shared_bits:
            msg = ('API Key ({path}) is accessible by others. Please remove '
                   'permission for other users to view this file. '
                   '(e.g. `chmod 600 {path}`). See the STU docs for more '
                   'information: "{stu_url}/doc/quickstart#stu_apikey"')
            raise RuntimeError(msg.format(path=path, stu_url=BASE_URL))

    with open(path) as fh:
        _api_key = fh.read().strip()
    logger.debug(f'Got STU API key from {path}')
    return _api_key
def assert_no_x():
    """
    On macOS, check that /usr/X11/bin/xstub is either absent or a symlink.
    Does nothing on other platforms.

    :raise AssertionError: If running on darwin and /usr/X11/bin/xstub exists
        and is not a symlink.
    """
    if sys.platform.startswith("darwin"):
        xstub = "/usr/X11/bin/xstub"
        is_real_file = os.path.exists(xstub) and not os.path.islink(xstub)
        if is_real_file:
            raise AssertionError(
                f"{xstub} is incompatible with running STU on OS X. "
                "Remove file or symlink to /usr/bin/true")
def assert_build_id_matches(buildtype, build_id):
    """
    Check that the build_id is appropriate for the buildtype.

    Nothing is checked when buildtype is empty.  Any other buildtype needs a
    non-empty build_id.  'OB' additionally requires build_id to match
    BUILD_ID_RE, and 'NB' requires it to match DATE_RE or BUILD_ID_RE.

    :param buildtype: Type of build, e.g. 'OB' or 'NB'; may be empty
    :param build_id: Identifier of the build

    :raise AssertionError: If build_id is missing or does not have the format
        required for buildtype.
    """
    if not buildtype:
        return

    if not build_id:
        raise AssertionError(
            f'Build ID is required when buildtype is {buildtype}')

    if buildtype == 'OB':
        if not BUILD_ID_RE.match(build_id):
            raise AssertionError(
                'For Official Builds (OB), build_id must be of the format '
                'buildXX or build-XXX, where XX/XXX are the last two/three '
                'digits of the mmshare '
                'version.')
    elif buildtype == 'NB':
        acceptable = (re.match(DATE_RE, build_id) or
                      BUILD_ID_RE.match(build_id))
        if not acceptable:
            raise AssertionError(
                'For Nightly Builds (NB), build_id must be of the format '
                'YYYY-MM-DD or build-XXX.')
def verify_zip(fileobj):
    """
    Open fileobj as a zipfile.ZipFile and check it for errors with
    ZipFile.testzip() (file headers and CRC32 check for all files).

    :param fileobj: Path or file-like object holding the archive to check

    :raise ZipError: If ZipFile.testzip() returns the name of a file,
        indicating that this file is corrupted.
    """
    with zipfile.ZipFile(fileobj, 'r') as archive:
        first_bad_member = archive.testzip()
    if not first_bad_member:
        return
    raise ZipError('Generated zip file is invalid: '
                   f'{first_bad_member} is corrupted')
def zip_files(fileobj, relative_to, filenames):
    """
    Zip a list of files into an archive.  Relative paths are relative to
    `relative_to`, and each file is stored in the archive under the name it
    was given in `filenames`.

    The working directory of the process is changed to `relative_to` while
    the files are added and restored afterwards, even on error.

    :type fileobj: file-like object or path to file
    :param fileobj: Destination of the archive

    :type relative_to: str
    :param relative_to: Directory against which relative names are resolved

    :type filenames: iterable of str
    :param filenames: Names of the files to add

    :raise ZipError: If any of the files is a symlink whose destination does
        not exist.  Raised after the archive has been written and closed.
    """
    start_dir = os.getcwd()
    bad_symlinks = []
    # The context manager guarantees that the central directory is written
    # and the archive closed here, instead of whenever the ZipFile object
    # happens to be garbage collected.
    with zipfile.ZipFile(fileobj, 'w', ZIP_MODE) as zf:
        try:
            os.chdir(relative_to)
            for name in filenames:
                _add_file_to_zip(zf, name, name, bad_symlinks)
        finally:
            os.chdir(start_dir)
    if bad_symlinks:
        missing = 'Symlinks with missing destinations: %s' % ', '.join(
            bad_symlinks)
        raise ZipError(missing)
def _add_file_to_zip(archive, absolute_path, relative_path, bad_symlinks):
    """
    Add one file or symlink to an open archive.

    A symlink is stored as a symlink entry whose content is the link target.
    If the target does not exist, the link is still stored and a
    "name -> target" description is appended to `bad_symlinks`.  Anything
    else is added as a regular file.

    :type archive: zipfile.ZipFile
    :param archive: Archive opened for writing

    :param str absolute_path: Location of the file on disk
    :param str relative_path: Name to give the entry inside the archive
    :param list bad_symlinks: Collects descriptions of dangling symlinks;
        modified in place

    :raise ValueError: Re-raised, after being logged, if writing a regular
        file fails with ValueError.
    """
    if os.path.islink(absolute_path):
        link_target = os.readlink(absolute_path)
        if not os.path.exists(absolute_path):
            bad_symlinks.append('{} -> {}'.format(relative_path, link_target))
        zfi = zipfile.ZipInfo(relative_path)
        # symlink magic numbers, from:
        # http://www.mail-archive.com/python-list@python.org/msg34223.html
        # create_system 3 marks the entry as created on Unix; the upper 16
        # bits of external_attr hold the mode 0o120755 (symlink, rwxr-xr-x).
        zfi.create_system = 3
        zfi.external_attr = 2716663808
        archive.writestr(zfi, link_target)
        return

    try:
        _add_regular_file_to_zip(archive, absolute_path, relative_path)
    except ValueError:
        logger.exception(
            f"Could not write {absolute_path} to {relative_path}")
        raise


def _add_regular_file_to_zip(archive, absolute_path, relative_path):
    """
    Copy a regular file into an open archive in 1 MiB chunks, so the whole
    file is never held in memory at once.

    :type archive: zipfile.ZipFile
    :param archive: Archive opened for writing

    :param str absolute_path: Location of the file on disk
    :param str relative_path: Name to give the entry inside the archive
    """
    chunk_size = 1024 * 1024
    with open(absolute_path, 'rb') as fh:
        zfi = zipfile.ZipInfo.from_file(absolute_path, arcname=relative_path)
        # A fresh ZipInfo defaults to ZIP_STORED, and ZipFile.open() uses the
        # ZipInfo it is given as-is.  Copy the archive's compression method so
        # the file is actually compressed the way the archive was opened.
        zfi.compress_type = archive.compression
        with archive.open(zfi, 'w') as zh:
            for chunk in iter(lambda: fh.read(chunk_size), b''):
                zh.write(chunk)
def zip_directory(dirname, fileobj=None, skipped_files=None):
    """
    Zip the contents of a directory. File names will be relative to the
    directory name. Preserves symlinks to files.  Sub-directories are stored
    as entries of their own, so empty directories are kept.

    Files that cannot be opened with ENXIO or EOPNOTSUPP (unix sockets) are
    left out silently.  So are files matching JOBID_RE that vanish while the
    directory is being zipped.

    NOTE(review): os.walk lists symlinks to directories among the
    sub-directories, so they are stored as empty directory entries rather
    than as links - confirm this is intended.

    :type dirname: str
    :param dirname: Directory to be zipped

    :type fileobj: file-like object or path to file
    :param fileobj: Directory will be zipped to this filename or into this
        file-like object.  Defaults to the directory's base name plus ".zip",
        relative to the current working directory.

    :param set skipped_files: Names of files, relative to `dirname`, that
        should be excluded from the zip archive.

    :raise ZipError: After the archive has been written, if there were
        symlinks with missing destinations, files that disappeared, or files
        that failed with another OSError.  Only the first of these categories
        that applies is reported.
    """
    bad_symlinks = []
    missing_files = []
    other_errors = []
    if fileobj is None:
        # normpath drops a trailing separator, so "results/" is archived as
        # "results.zip" rather than as ".zip".
        fileobj = os.path.basename(os.path.normpath(dirname)) + '.zip'
    unix_socket_errnos = {errno.ENXIO, errno.EOPNOTSUPP}

    with zipfile.ZipFile(fileobj, 'w', ZIP_MODE) as zf:
        for root, dirs, files in os.walk(dirname):
            for filename in files:
                absname = os.path.join(root, filename)
                relname = os.path.relpath(absname, dirname)
                if skipped_files and relname in skipped_files:
                    continue
                try:
                    _add_file_to_zip(zf, absname, relname, bad_symlinks)
                except OSError as err:
                    if err.errno == errno.ENOENT:
                        # It's OK if temporary jobcontrol files are removed
                        # during processing
                        if not re.search(JOBID_RE, filename):
                            missing_files.append(relname)
                    elif err.errno in unix_socket_errnos:
                        # we just avoid adding unix sockets
                        continue
                    else:
                        other_errors.append(f'{relname}:{err}')

            # Allow saving empty directories.
            for one_dir in dirs:
                absname = os.path.join(root, one_dir)
                relname = os.path.relpath(absname, dirname)
                zfi = zipfile.ZipInfo(relname + '/')
                # give read and write access to the directory
                zfi.external_attr = 0o750 << 16
                last_modified = datetime.datetime.fromtimestamp(
                    os.path.getmtime(absname))
                # ZipInfo.date_time is a 6-tuple (year, month, day, hour,
                # minute, second); drop the extra struct_time fields.
                zfi.date_time = last_modified.timetuple()[:6]
                zf.writestr(zfi, '')

    if bad_symlinks:
        missing = 'Symlinks with missing destinations: %s' % ', '.join(
            bad_symlinks)
        raise ZipError(missing)
    if missing_files:
        missing = 'Files that disappeared while being zipped up : %s' % ', '.join(
            missing_files)
        raise ZipError(missing)
    if other_errors:
        missing = 'Files with other problems during zip: \n %s' % '\n '.join(
            other_errors)
        raise ZipError(missing)