Source code for schrodinger.application.canvas.utils

# -*- coding: utf-8 -*-
"""
Canvas utility functions.

Copyright Schrodinger, LLC. All rights reserved.

"""

import csv
import os
import stat

from schrodinger.application.licensing.licerror import LicenseException
from schrodinger.infra.canvas import ChmLicenseFull
from schrodinger.infra.canvas import ChmLicenseFullOrMain
from schrodinger.infra.canvas import ChmLicenseMain
from schrodinger.infra.canvas import ChmLicenseShared
from schrodinger.utils import csv_unicode

(LICENSE_FULL, LICENSE_SHARED, LICENSE_FULL_OR_MAIN,
 LICENSE_MAIN) = ('LICENSE_FULL', 'LICENSE_SHARED', 'LICENSE_FULL_OR_MAIN',
                  'LICENSE_MAIN')

shared_lic = None
full_or_main_lic = None
full_lic = None


[docs]def strip_r(file_name): inputf = open(file_name, 'rb') original = inputf.read() inputf.close() converted = original.replace(b'\r', b'') os.chmod(file_name, stat.S_IRWXU) outputf = open(file_name, 'wb') outputf.write(converted) outputf.close()
[docs]class chm_excel(csv.excel): delimiter = ',' quotechar = '"' doublequote = True skipinitialspace = False lineterminator = '\n' quoting = csv.QUOTE_MINIMAL
[docs]class chm_bluebird(csv.excel): delimiter = '\t' quotechar = '"' doublequote = True skipinitialspace = False lineterminator = '\t\n' quoting = csv.QUOTE_NONE
[docs]class ChmCSVTools:
[docs] def __init__( self, default_format_in=None, default_format_out=None, case_sensitive=True, ): self.default_format_in = default_format_in self.default_format_out = default_format_out self.case_sensitive = False if not self.default_format_in: self.default_format_in = chm_excel() if not self.default_format_out: self.default_format_out = chm_bluebird()
[docs] def dictValues( self, dict, key_list=[], # noqa: M511 action_remove=False, ): keeplist = [] if action_remove: for (key, value) in dict.items(): cmpkey = (key.lower() if isinstance(key, str) and self.case_sensitive else key) found = False for h in key_list: cmph = (h.lower() if not self.case_sensitive and isinstance(h, str) else h) if cmpkey == cmph: found = True break if not found: keeplist.append(value) else: for (key, value) in dict.items(): cmpkey = (key.lower() if isinstance(key, str) and self.case_sensitive else key) found = False for h in key_list: cmph = (h.lower() if not self.case_sensitive and isinstance(h, str) else h) if cmpkey == cmph: keeplist.append(value) break return keeplist
[docs] def dictKeys( self, dict, key_list=[], # noqa: M511 action_remove=False, ): keeplist = [] if action_remove: for (key, value) in dict.items(): cmpkey = (key.lower() if isinstance(key, str) and self.case_sensitive else key) found = False for h in key_list: cmph = (h.lower() if not self.case_sensitive and isinstance(h, str) else h) if cmpkey == cmph: found = True break if not found: keeplist.append(key) else: for (key, value) in dict.items(): cmpkey = (key.lower() if isinstance(key, str) and self.case_sensitive else key) found = False for h in key_list: cmph = (h.lower() if not self.case_sensitive and isinstance(h, str) else h) if cmpkey == cmph: keeplist.append(key) break return keeplist
[docs] def getHeader( self, source_file, dialect=None, one_based=False, ): header = {} dialect = (self.default_format_in if not dialect else dialect) with csv_unicode.reader_open(source_file) as source: rdr = csv.reader(source, dialect) pos = (0 if not one_based else 1) while True: try: row = next(rdr) for col in row: header[col] = pos pos = pos + 1 break except StopIteration: pass return header
[docs] def rewrite( self, source_file, dest_file, header_substitutions={}, # noqa: M511 dialect=None, ): dialect = (self.default_format_in if not dialect else dialect) with csv_unicode.reader_open(source_file) as source, \ csv_unicode.writer_open(dest_file) as dest: rdr = csv.reader(source, dialect) wtr = csv.writer(dest, dialect) try: row = next(rdr) newrow = [] for col in row: cmpcol = (col.lower() if not self.case_sensitive and isinstance(col, str) else col) found = False for (key, value) in header_substitutions.items(): cmpkey = (key.lower() if not self.case_sensitive and isinstance(key, str) else key) if cmpcol == cmpkey: newrow.append(value) found = True break if not found: newrow.append(col) wtr.writerow(newrow) except StopIteration: pass while True: try: wtr.writerow(next(rdr)) except StopIteration: break
[docs] def rewriteByIndex( self, source_file, dest_file, column_indices=[], # noqa: M511 dialect_in=None, dialect_out=None, one_based=False, action_remove=False, ): dialect_in = \ (self.default_format_in if not dialect_in else dialect_in) dialect_out = \ (self.default_format_out if not dialect_out else dialect_out) offset = (0 if not one_based else 1) with csv_unicode.reader_open(source_file) as source, \ csv_unicode.writer_open(dest_file) as dest: rdr = csv.reader(source, dialect_in) wtr = csv.writer(dest, dialect_out) if action_remove: for row in rdr: newrow = [] for index in range(0, len(row)): if index + offset not in column_indices: newrow.append(row[index]) wtr.writerow(newrow) else: for row in rdr: newrow = [] for index in column_indices: newrow.append(row[index - offset]) wtr.writerow(newrow)
[docs] def mergeByIndex( self, source_file1, source_file2, dest_file, column_indices_file1=[], # noqa: M511 column_indices_file2=[], # noqa: M511 dialect_in1=None, dialect_in2=None, dialect_out=None, one_based=False, action_remove=False, ): dialect_in1 = \ (self.default_format_in if not dialect_in1 else dialect_in1) dialect_in2 = \ (self.default_format_in if not dialect_in2 else dialect_in2) dialect_out = \ (self.default_format_out if not dialect_out else dialect_out) with csv_unicode.reader_open(source_file1) as source1, \ csv_unicode.reader_open(source_file2) as source2, \ csv_unicode.writer_open(dest_file) as dest: rdr1 = csv.reader(source1, dialect_in1) rdr2 = csv.reader(source2, dialect_in2) wtr = csv.writer(dest, dialect_out) offset = (0 if not one_based else 1) while True: try: r1 = next(rdr1) r2 = next(rdr2) if action_remove: newrow = [] for index in range(0, len(r1)): if index + offset not in column_indices_file1: newrow.append(r1[index]) for index in range(0, len(r2)): if index + offset not in column_indices_file2: newrow.append(r2[index]) wtr.writerow(newrow) else: newr1 = [] newr2 = [] for c1 in column_indices_file1: newr1.append(r1[c1 - offset]) for c2 in column_indices_file2: newr2.append(r2[c2 - offset]) wtr.writerow(newr1 + newr2) except StopIteration: break
[docs] def mergeByName( self, source_file1, source_file2, dest_file, column_names1=[], # noqa: M511 column_names2=[], # noqa: M511 dialect_in1=None, dialect_in2=None, dialect_out=None, action_remove=False, ): one_based = False dialect_in1 = \ (self.default_format_in if not dialect_in1 else dialect_in1) dialect_in2 = \ (self.default_format_in if not dialect_in2 else dialect_in2) dialect_out = \ (self.default_format_out if not dialect_out else dialect_out) column_indices_file1 = \ self.dictValues(self.getHeader(source_file1, dialect_in1, one_based), column_names1, not action_remove) column_indices_file2 = \ self.dictValues(self.getHeader(source_file2, dialect_in2, one_based), column_names2, not action_remove) return self.mergeByIndex( source_file1, source_file2, dest_file, column_indices_file1, column_indices_file2, dialect_in1, dialect_in2, dialect_out, one_based, action_remove, )
[docs]def get_license(license_type=LICENSE_FULL): """ Instantiate a valid Canvas license object or raise an Exception. :type license_type: A module-level constant: LICENSE_FULL, LICENSE_SHARED, or LICENSE_FULL_OR_MAIN. Default is LICENSE_FULL. :param license_type: The type of license to request. :raises: Exception when the license_type isn't recognized or the requested license is not valid. """ #Due to CANVAS-4669, we allow all access to canvaslibs via python global shared_lic if not shared_lic: shared_lic = ChmLicenseShared(False) if not shared_lic.isValid(): raise LicenseException( "The requested Canvas license (type '%s') is not present or not valid." % "ChmLicenseShared") return shared_lic