"""
PathFinder helper functions for reading and writing files using RDKit Mol
objects.
"""
import copy
import csv
import gzip
import io
import json
import os
import shutil
import tempfile
import zipfile
import more_itertools
from rdkit import Chem
from schrodinger import structure
from schrodinger.structutils import smiles as smiles_mod
from schrodinger.thirdparty import rdkit_adapter
from schrodinger.utils import fileutils
from schrodinger.utils import log
from schrodinger.utils.fileutils import open_maybe_compressed
logger = log.get_output_logger('pathfinder')
# Filename extension for PathFinder reactant files
PFX = '.pfx'
METADATA = 'metadata.json'
STRUCTURES = 'structures.csv'
class MolWriter(structure.StructureWriter):
"""
Write Mol objects to a file using a StructureWriter-like API, optionally
generating 3D coordinates.
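
    A usage sketch (the output filename and input SMILES are hypothetical;
    relies on the context-manager behavior inherited from StructureWriter)::

        with MolWriter('out.maegz', require_stereo=True) as writer:
            writer.append(Chem.MolFromSmiles('C[C@H](N)C(=O)O'))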
"""
    def __init__(self,
                 filename,
                 generate_coordinates=True,
                 require_stereo=False):
super(MolWriter, self).__init__(filename)
self.generate_coordinates = generate_coordinates
self.require_stereo = require_stereo
    def append(self, mol):
st = rdkit_adapter.from_rdkit(mol)
if self.generate_coordinates:
st.generate3dConformation(require_stereo=self.require_stereo)
super(MolWriter, self).append(st)
class CsvMolReader:
"""
Read a SMILES CSV file, returning Mol objects.
This is similar to RDKit's SmilesMolSupplier with delimiter=',', except that
it uses the csv module instead of naively splitting on commas. This makes it
possible to have field values containing commas, as long as they are quoted
following the CSV convention. Note, however, that multi-line records are
still not supported for efficiency reasons.
Also, gzip-compressed files (identified by the filename ending in "gz") are
supported.
A CsvMolReader supports random access, like a list. Upon instantiation, the
file is read in full and kept in memory. For a CSV file having only SMILES
and an ID, this takes about 100 MB per million entries.
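
    A usage sketch (``mols.csv.gz`` is a hypothetical input file with SMILES
    and NAME columns)::

        reader = CsvMolReader('mols.csv.gz')
        print(len(reader))   # number of data rows
        mol = reader[42]     # parsed on access; None if the SMILES is bad
        if mol is not None:
            print(mol.GetProp('_Name'))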
"""
    def __init__(self, filename):
if hasattr(filename, 'read'):
fh = filename
else:
fh = open_maybe_compressed(filename, 'rt')
with fh:
header = fh.readline()
self.lines = fh.readlines()
self.fieldnames = next(csv.reader([header]))
SKIPPED = {'SMILES', 'NAME', ''}
self.propnames = [f for f in self.fieldnames if f not in SKIPPED]
    def __len__(self):
return len(self.lines)
def __getitem__(self, index):
line = self.lines[index]
vals = next(csv.reader([line]))
row = dict(zip(self.fieldnames, vals))
mol = Chem.MolFromSmiles(row['SMILES'])
if mol is None:
return None
for prop in self.propnames:
if row[prop]:
mol.SetProp(prop, row[prop])
for prop in ['NAME', 's_m_title']:
if prop in row:
mol.SetProp('_Name', row[prop])
break
return mol
class CsvMolWriter:
"""
Write a CSV file given Mol objects, using a StructureWriter-like API. The
first two columns are the SMILES and title, and the rest are the properties
of the molecule.
    - We don't use structure.SmilesCsvWriter because all the conversions it
      performs make it too slow (with it, the overall job takes four times as
      long, so writing the output file becomes the bottleneck).
- We don't use Chem.SmilesWriter because even though it can use comma as a
delimiter, it doesn't write proper CSV files because it doesn't know how
to escape the delimiter.
Also, gzip-compressed files (identified by the filename ending in "gz") are
supported.
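
    A usage sketch (``out.csv`` is a hypothetical output file and ``mols``
    an assumed iterable of RDKit Mol objects)::

        with CsvMolWriter('out.csv') as writer:
            for mol in mols:
                writer.append(mol)
        print(writer.written_count)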
"""
    def __init__(self, filename, properties=None, cxsmiles=False):
"""
:param filename: file to write
:type filename: str or file-like object
:param properties: optional, list of names of properties to write to
output file. If None, all the properties are written. (CAVEAT: if
`filename` is a file object rather than an actual filename, only the
properties present in the first molecule are written.)
:type properties: list of str or None
:param cxsmiles: when writing SMILES, use CXSMILES extensions
:type cxsmiles: bool
"""
if hasattr(filename, 'write'):
self.fh = filename
self.unionize_props = False
else:
self.fh = open_maybe_compressed(filename, 'wt', newline='')
self.unionize_props = properties is None
self.filename = filename
_, self.suffix = fileutils.splitext(self.filename)
self._writer = None
self.written_count = 0
self.properties = properties
self.cxsmiles = cxsmiles
self.tmpfiles = []
    def append(self, mol):
"""
Write a molecule to the file. The first time this is called, the header
row is written based on mol's properties or the properties passed to
__init__, if any.
:param mol: molecule
:type mol: rdkit.Chem.rdchem.Mol
"""
props = self._getProps(mol)
props_list = list(props)
if (self._writer and self.unionize_props and
props_list != self.properties):
self.properties = props_list
self._openTmp()
if self._writer is None:
if self.properties is None:
self.properties = list(props)
self._initWriter(self.properties)
props['SMILES'] = self.toSmiles(mol)
props['NAME'] = mol.GetProp('_Name')
self._writer.writerow(props)
self.written_count += 1
    def toSmiles(self, mol):
        if self.cxsmiles:
            # Remove atom properties added by the RDKit reaction module
            # because they are of little use in a CXSMILES and take a lot
            # of space.
new_mol = remove_react_atom_props(mol)
return Chem.MolToCXSmiles(new_mol)
else:
return Chem.MolToSmiles(mol)
def _getProps(self, mol):
"""
Return a dictionary of molecule properties after some munging. Property
names are renamed to follow the Schrodinger convention, and float values
are rounded for cosmetic reasons.
:param mol: molecule
:type mol: rdkit.Chem.rdchem.Mol
:return: molecule properties
:rtype: dict
"""
raw_props = rdkit_adapter.translate_rdkit_props_dict(
mol.GetPropsAsDict())
# We reduce float precision because RDKit produces values such as
# 320.41100000000006 where we would rather see 320.411.
props = {}
for name, val in raw_props.items():
if name.startswith('r_'):
props[name] = round(val, 6)
else:
props[name] = val
return props
def _initWriter(self, props):
"""
Initialize the underlying CSV writer, using 'props' to write the header
row. "SMILES" and "Name" are always added as the first two columns.
:param props: property names
:type props: iterable of str
"""
fields = ['SMILES', 'NAME'] + sorted(props)
self._writer = csv.DictWriter(self.fh, fields, extrasaction='ignore')
self._writer.writeheader()
def _openTmp(self):
logger.debug(f'Extended properties: {self.properties}')
with tempfile.NamedTemporaryFile(
dir='.', suffix=self.suffix, delete=False) as fh:
tmpname = fh.name
self.fh.close()
self._writer = None
self.fh = open_maybe_compressed(tmpname, 'wt', newline='')
self.tmpfiles.append(tmpname)
def __enter__(self):
return self
def __exit__(self, type, value, tb):
self.close()
    def close(self):
self.fh.close()
if self.tmpfiles:
self._mergeTmpfiles()
def _mergeTmpfiles(self):
with tempfile.NamedTemporaryFile(
dir='.', suffix=self.suffix, delete=False) as fh:
tmpname = fh.name
fileutils.force_rename(self.filename, tmpname)
self.tmpfiles.insert(0, tmpname)
logger.debug(f'Merging tmpfiles: {self.tmpfiles}')
csvcat_union(self.tmpfiles, self.filename)
fileutils.force_remove(*self.tmpfiles)
class PfxMolReader:
"""
Reader for PFX (PathFinder reactants) files. These are really zip archives
containing a CSV file and a metadata JSON file.
Like CsvMolReader, PfxMolReader supports random access, like a list. Upon
instantiation, the file is read in full and kept in memory. For a file
having only SMILES and an ID, this takes about 100 MB per million entries.
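
    A usage sketch (``reactants.pfx`` is a hypothetical input file)::

        reader = PfxMolReader('reactants.pfx')
        assert len(reader) == get_pfx_size('reactants.pfx')
        first_mol = reader[0]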
"""
    def __init__(self, filename):
self.zipfh = zipfile.ZipFile(filename, 'r')
fh = io.TextIOWrapper(self.zipfh.open(STRUCTURES))
self.csv_mol_reader = CsvMolReader(fh)
    def __len__(self):
return len(self.csv_mol_reader)
def __getitem__(self, index):
return self.csv_mol_reader[index]
class PfxMolWriter:
"""
Writer for PFX (PathFinder reactants) files. These are really zip archives
containing a CSV file and a metadata JSON file.
"""
    def __init__(self, filename, properties=None):
"""
:param filename: file to write
:type filename: str
:param properties: optional, list of names of properties to write to
output file. If None, all the properties present on the first
structure will be written (the assumption is that all molecules
will have the same properties, or at least that the first
molecule has all the properties that we care about).
:type properties: list of str or None
"""
self.zipfh = zipfile.ZipFile(
filename, 'w', compression=zipfile.ZIP_DEFLATED)
fh = io.TextIOWrapper(self.zipfh.open(STRUCTURES, 'w'), newline='')
self.csv_mol_writer = CsvMolWriter(fh, properties)
    def append(self, mol):
"""
Write a molecule to the file.
:param mol: molecule
:type mol: rdkit.Chem.rdchem.Mol
"""
self.csv_mol_writer.append(mol)
@property
def written_count(self):
return self.csv_mol_writer.written_count
def _writeMetadata(self):
metadata = {'size': self.written_count}
with io.TextIOWrapper(self.zipfh.open(METADATA, 'w')) as fh:
json.dump(metadata, fh)
fh.write('\n')
def __enter__(self):
return self
def __exit__(self, type, value, tb):
self.close()
    def close(self):
self.csv_mol_writer.close()
self._writeMetadata()
self.zipfh.close()
class RdkitMolWriter:
"""
Write Mol objects to a file using the RDKit file-writing classes, but
with a StructureWriter-like API. Supports SMILES and SDF.
"""
    def __init__(self, filename, v3000=False):
        """
        :param filename: filename to write
        :type filename: str
        :param v3000: when writing SD, force the use of the V3000 format
        :type v3000: bool
        """
self.rdkit_writer = None
if fileutils.is_gzipped_structure_file(filename):
            self.fh = gzip.open(filename, 'wt')
else:
self.fh = open(filename, 'w')
if fileutils.is_sd_file(filename):
self.rdkit_writer = Chem.SDWriter(self.fh)
self.rdkit_writer.SetForceV3000(v3000)
elif fileutils.is_smiles_file(filename):
self.rdkit_writer = Chem.SmilesWriter(
self.fh, includeHeader=False, isomericSmiles=True)
else:
raise ValueError("Unsupported output file type.")
def __enter__(self):
return self
def __exit__(self, type, value, tb):
self.close()
@property
def written_count(self):
return self.rdkit_writer.NumMols()
    def append(self, mol):
self.rdkit_writer.write(mol)
    def close(self):
self.rdkit_writer.close()
self.fh.close()
def get_mol_writer(filename,
                   generate_coordinates=True,
                   require_stereo=False,
                   v3000=False,
                   cxsmiles=False):
"""
Return a StructureWriter-like object based on the command-line arguments.
    RDKit is used for non-Maestro formats.
:param filename: filename to write
    :type filename: str
:param generate_coordinates: generate 3D coordinates (non-SMILES formats)
:type generate_coordinates: bool
:param require_stereo: when generating coordinates, fail when there's
unspecified stereochemistry, instead of producing an arbitrary isomer
:type require_stereo: bool
:param v3000: when writing SD, force the use of the V3000 format
    :type v3000: bool
:param cxsmiles: when writing SMILES, use CXSMILES extensions
:type cxsmiles: bool
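
    A sketch of the dispatch (the output filenames are hypothetical)::

        writer = get_mol_writer('out.maegz')   # MolWriter
        writer = get_mol_writer('out.csv.gz')  # CsvMolWriter (compressed)
        writer = get_mol_writer('out.sdf', v3000=True)  # RdkitMolWriter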
"""
if fileutils.is_maestro_file(filename):
return MolWriter(
filename,
generate_coordinates=generate_coordinates,
require_stereo=require_stereo)
elif fileutils.is_csv_file(filename) or is_csvgz(filename):
return CsvMolWriter(filename, cxsmiles=cxsmiles)
else:
return RdkitMolWriter(filename, v3000=v3000)
def get_mol_reader(filename, skip_bad=True, implicitH=True):
"""
Return a Mol reader given a filename or a SMILES string. For .smi and .csv
files, use the RDKit SmilesMolSupplier; for other formats, use
StructureReader but convert Structure to Mol before yielding each molecule.
    Whenever possible, the reader will be a Sequence. This is currently the
    case for .smi and .csv files when skip_bad is False. (And for a SMILES
    string, which returns a list of size 1.)
    :param skip_bad: if True, bad structures are skipped silently, instead
        of being yielded as None (only applies to SMILES and CSV formats).
:type skip_bad: bool
:param implicitH: use implicit hydrogens (only has an effect when reading
Maestro files)
:type implicitH: bool
:rtype: Generator or Sequence of Mol
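
    A usage sketch (``in.smi`` is a hypothetical input file; the SMILES
    string is accepted because it is not the name of an existing file)::

        mols = list(get_mol_reader('in.smi'))           # bad rows skipped
        seq = get_mol_reader('in.smi', skip_bad=False)  # supports len()
        [ethanol] = get_mol_reader('CCO')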
"""
if os.path.isfile(filename):
if is_pfx(filename):
format = PFX
elif is_csvgz(filename):
format = fileutils.SMILESCSV
else:
format = fileutils.get_structure_file_format(filename)
logger.debug("Opening %s", filename)
if format == fileutils.MAESTRO:
reader = structure.StructureReader(filename)
return adapt_st_reader(reader, implicitH)
if format == fileutils.SMILES:
supp = Chem.SmilesMolSupplier(
filename, delimiter=' ', titleLine=False, nameColumn=1)
elif format == fileutils.SMILESCSV:
supp = CsvMolReader(filename)
elif format == PFX:
supp = PfxMolReader(filename)
elif format == fileutils.SD:
supp = Chem.SDMolSupplier(filename)
else:
raise ValueError(f"Unsupported file format: {format}")
if skip_bad:
return (m for m in supp if m is not None) # filter out errors
else:
len(supp) # strangely needed before operations such as list()
return supp
else:
mol = Chem.MolFromSmiles(filename)
if mol is None:
raise IOError(f"'{filename}' must be either a valid filename "
"or a valid SMILES")
return [mol]
def get_mol(target, implicitH=True):
"""
Read a Mol from a file or a SMILES string.
:param target: filename or SMILES
:type target: str
:param implicitH: use implicit hydrogens (only has an effect when reading
Maestro files)
:type implicitH: bool
:rtype: rdkit.Chem.Mol
"""
reader = get_mol_reader(target, skip_bad=False, implicitH=implicitH)
return next(iter(reader))
def adapt_st_reader(reader, implicitH=True):
"""
Generate RDKit Mol objects given a StructureReader-like object.
Structures which cause conversion errors are skipped but a warning is
logged.
:param reader: source of structures to convert
:type reader: iterable of Structure
:param implicitH: use implicit hydrogens
:type implicitH: bool
:return: converted RDKit molecule objects
:rtype: generator of Mol
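
    A usage sketch (``in.maegz`` is a hypothetical input file)::

        with structure.StructureReader('in.maegz') as reader:
            mols = list(adapt_st_reader(reader))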
"""
for st in reader:
try:
yield rdkit_adapter.to_rdkit(
st, implicitH=implicitH, include_coordinates=False)
except (ValueError, RuntimeError) as e:
logger.warning(e)
def combine_output_files(outfiles, out, dedup=True):
"""
Write the final output file.
:param outfiles: subjob output filenames
:type outfiles: list of str
:param out: output filename
    :type out: str
:param dedup: skip duplicate products
:type dedup: bool
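
    A usage sketch (the subjob filenames are hypothetical; missing subjob
    files are tolerated with a warning)::

        combine_output_files(['sub1_out.csv', 'sub2_out.csv'], 'out.csv')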
"""
logger.info("Combining subjob output files into %s." % out)
if dedup:
logger.info(
"Duplicate products will be removed, possibly resulting "
"in a smaller number of products than originally requested.")
missing, existing = map(list,
more_itertools.partition(os.path.isfile, outfiles))
if missing:
logger.warning('Missing output files:')
for fname in missing:
logger.warning(f' {fname}')
n = None
if fileutils.is_csv_file(out) or is_csvgz(out):
dedup_field = 'SMILES' if dedup else None
n = csvcat_union(existing, out, dedup_field)
elif not dedup:
fileutils.cat(existing, out)
elif fileutils.is_smiles_file(out):
n = dedup_smiles_cat(existing, out)
else:
n = dedup_st_cat(existing, out)
if n is not None:
logger.info(f'Wrote {n} structures.')
def dedup_smiles_cat(source_filenames, dest_filename):
"""
Concatenate the given SMILES files, with deduplication.
:param source_filenames: input files
:type source_filenames: sequence of str
:param dest_filename: destination file
:type dest_filename: str
:return: number of structures written
:rtype: int
"""
seen = set()
with open(dest_filename, 'w', newline='') as fout:
for fname in source_filenames:
with open(fname) as fin:
for line in fin:
smiles = line.split()[0]
if smiles not in seen:
seen.add(smiles)
fout.write(line)
return len(seen)
def dedup_st_cat(source_filenames, dest_filename):
"""
Concatenate the given structure files, with deduplication. This uses
StructureReader / StructureWriter so any file format supported by those
classes may be used.
:param source_filenames: input files
:type source_filenames: sequence of str
:param dest_filename: destination file
:type dest_filename: str
:return: number of structures written
:rtype: int
"""
seen = set()
smiles_generator = smiles_mod.SmilesGenerator()
with structure.StructureWriter(dest_filename) as writer:
for fname in source_filenames:
with structure.StructureReader(fname) as reader:
for st in reader:
smiles = smiles_generator.getSmiles(st)
if smiles not in seen:
seen.add(smiles)
writer.append(st)
return len(seen)
def csvcat(source_filenames, dest_filename, dedup):
"""
Concatenate the contents of the CSV source files, writing them to a
destination CSV file, but making sure that the header row only appears once.
Each input file is assumed to have a header row, and the headers must be
consistent or an exception will be raised.
:param source_filenames: input files
:type source_filenames: sequence of str
:param dest_filename: destination file
:type dest_filename: str
:param dedup: skip duplicate products, using first column as key
:type dedup: bool
:return: number of structures written
:rtype: int
:raises: ValueError if headers are inconsistent
"""
# We use the csv module in case a property has a newline
# or something crazy like that.
seen = set()
nwritten = 0
with open_maybe_compressed(dest_filename, 'wt', newline='') as fout:
writer = csv.writer(fout)
for ifile, fname in enumerate(source_filenames):
with open_maybe_compressed(fname, 'rt') as fin:
reader = csv.reader(fin)
for iline, row in enumerate(reader):
if iline > 0:
smiles = row[0]
if not dedup:
writer.writerow(row)
nwritten += 1
elif smiles not in seen:
seen.add(smiles)
nwritten += 1
writer.writerow(row)
elif ifile == 0:
# First header row.
writer.writerow(row)
header = row
else:
if row != header:
msg = "Inconsistent header: {} != {}".format(
fname, source_filenames[0])
raise ValueError(msg)
return nwritten
def csvcat_union(source_filenames, dest_filename, dedup_field=None):
"""
Concatenate the contents of the CSV source files, writing them to a
destination CSV file, but making sure that the header row only appears once.
Each input file is assumed to have a header row. If the headers are
inconsistent, the output file contains the union of the columns from the
input files, with empty values in rows coming from a file which didn't have
a given column.
:param source_filenames: input files
:type source_filenames: sequence of str
:param dest_filename: destination file
:type dest_filename: str
:param dedup_field: skip duplicate products using the named field as key. If
not truthy, no deduplication is performed.
:type dedup_field: str or NoneType
:return: number of structures written
:rtype: int
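
    A usage sketch (hypothetical part files; columns present only in
    ``part2.csv`` come out empty in rows that came from ``part1.csv``)::

        n = csvcat_union(['part1.csv', 'part2.csv'], 'merged.csv',
                         dedup_field='SMILES')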
"""
seen = set()
nwritten = 0
fieldnames = get_fieldnames(source_filenames)
with open_maybe_compressed(dest_filename, 'wt', newline='') as fout:
writer = csv.DictWriter(fout, fieldnames)
writer.writeheader()
for fname in source_filenames:
with open_maybe_compressed(fname, 'rt') as fin:
reader = csv.DictReader(fin)
for row in reader:
if not dedup_field:
writer.writerow(row)
nwritten += 1
else:
smiles = row[dedup_field]
if smiles not in seen:
seen.add(smiles)
nwritten += 1
writer.writerow(row)
return nwritten
def get_fieldnames(filenames):
"""
Return a list with the union of the field names from all the given CSV
files. The field names are listed in the order in which they were first
seen. (First all the fields from file #1, then the "new" field names from
file #2, etc.)
:param filenames: list of CSV files
:type filenames: [str]
:return: list of field names
:rtype: [str]
"""
fieldnames = {}
for fname in filenames:
with open_maybe_compressed(fname, 'rt') as fin:
reader = csv.reader(fin)
try:
row = next(reader)
except StopIteration:
row = []
fieldnames.update({name: None for name in row})
return list(fieldnames.keys())
def is_csvgz(filename):
lcfname = filename.lower()
return (lcfname.endswith('.csv.gz') or lcfname.endswith('.csvgz'))
def is_pfx(filename):
return filename.lower().endswith(PFX)
def get_pfx_size(filename):
"""
Return the size from the metadata header of a .pfx file.
"""
with zipfile.ZipFile(filename) as zipfh:
jsonstr = zipfh.read(METADATA)
metadata = json.loads(jsonstr)
return metadata['size']
def remove_react_atom_props(mol):
"""
Return a copy of `mol` where atom properties added by the RDKit reaction
module have been stripped out.
:param mol: input molecule; not modified
:type mol: rdkit.Chem.Mol
:return: modified molecule
:rtype: rdkit.Chem.Mol
"""
new_mol = copy.copy(mol)
react_props = ['react_atom_idx', 'old_mapno']
for atom in new_mol.GetAtoms():
for prop in react_props:
atom.ClearProp(prop)
return new_mol
def copy_csv_file(input_file, output_file):
"""
Copy compressed or uncompressed input .csv file to another .csv file.
Output file can also be compressed or uncompressed.
:param input_file: input file name
:type input_file: str
:param output_file: output file name
:type output_file: str
"""
with open_maybe_compressed(input_file, 'rb') as f_in:
with open_maybe_compressed(output_file, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)