Source code for schrodinger.application.livedesign.export_models

import collections
import enum
import os.path
from typing import List
from typing import Set

from schrodinger import structure
from schrodinger.models import parameters
from schrodinger.utils.fileutils import force_remove

from . import constants
from . import data_classes
from . import entry_types as ets
from . import ld_utils
from . import login

# Module-level aliases for structure property names defined in `constants`, so
# that code in this module can refer to them without the `constants.` prefix.
PROPNAME_COMPOUND_ID = constants.PROPNAME_COMPOUND_ID
PROPNAME_CORP_ID = constants.PROPNAME_CORP_ID
PROPNAME_IMPORT_ENTITY_ID = constants.PROPNAME_IMPORT_ENTITY_ID

# Shared `LDData` instance built from the 3D data user/family names in
# `constants`; it is flagged with `requires_3d=True`.
LD_DATA_3D = data_classes.LDData(
    user_name=constants.USERNAME_3D_DATA,
    family_name=constants.FAMNAME_3D_DATA,
    requires_3d=True)

# Possible outcomes of `LDClientModelMixin.refreshLDClient()`: `none`,
# `success`, or `failure`.
RefreshResult = enum.Enum('RefreshResult', ('none', 'success', 'failure'))


class MatchCompoundsBy(enum.Enum):
    """
    Criteria by which compounds can be matched. The value of each member is a
    human-readable label, which is also what `str()` returns for the member.
    """

    structure = 'Structure or Imported Corporate ID'
    corp_id = 'Corporate ID'

    def __str__(self):
        """
        :return: the label stored as this member's value
        :rtype: str
        """

        label = self.value
        return label


class LDDestination(parameters.CompoundParam):
    """
    Parameters specifying the destination of the exported data, both LiveDesign
    server and live report.
    """

    # Server portion of the destination.
    # NOTE(review): presumably the LiveDesign server address - confirm the
    # expected format against callers.
    host: str
    # Project portion of the destination.
    # NOTE(review): presumably the LiveDesign project identifier and its
    # display name - confirm.
    proj_id: str
    proj_name: str
    # Live report portion of the destination ("lr" presumably abbreviates
    # "live report", per the class docstring - confirm).
    lr_id: str
    lr_name: str


class DataExportSpec(parameters.CompoundParam):
    """
    Abstract specification for uploading data to a LiveDesign server.

    Concrete subclasses override `addDataToExportTask()` to copy their data
    onto an export task.
    """

    # Name of the data being exported; `PropertyExportSpec.getName()` treats it
    # as a structure property data name.
    data_name: str
    # LiveDesign model and endpoint. `FFCExportSpec._getColumnName()` joins
    # them into a column name, and `PropertyExportSpec` passes them to
    # `make_prop_dict()`.
    ld_model: str
    ld_endpoint: str
    # Units passed to `make_prop_dict()` by `PropertyExportSpec`.
    units: str
    # NOTE(review): `option` is not used by the code visible in this module -
    # confirm its purpose against callers.
    option: object

    def addDataToExportTask(self, task):
        """
        Update the provided task with data from this specification.

        Must be overridden in concrete subclasses.

        :param task: an export task
        :type task: tasks.ThreadFunctionTask
        :raise NotImplementedError: always, in this base implementation
        """

        raise NotImplementedError


class PropertyExportSpec(DataExportSpec):
    """
    Export specification describing a single structure property.
    """

    def addDataToExportTask(self, task):
        """
        Build the property dictionary for this specification and add it to the
        `prop_dicts` attribute of the supplied task.

        :param task: an export task
        :type task: tasks.ThreadFunctionTask
        """

        new_entry = make_prop_dict(
            units=self.units,
            name=self.getName(),
            endpoint=self.ld_endpoint,
            model=self.ld_model)
        # The attribute is rebound with `+=` (not mutated through a method
        # call), exactly as callers have always seen it
        task.prop_dicts += [new_entry]

    def getName(self):
        """
        :return: the value to place in the "name" field of the property
                dictionary exported to LiveDesign for this property
        :rtype: str
        """

        data_name = self.data_name
        if not ld_utils.is_sd_dataname(data_name):
            return data_name

        # SD data is not stored under the standard structure property data
        # name format, so it is exported under its user name instead, e.g.
        # "All IDs" rather than "s_sd_All_IDs"
        return structure.PropertyName(dataname=data_name).userName()


class Base3DExportSpec(DataExportSpec):
    """
    Abstract specification for 3D structure data.

    Concrete subclasses must implement `_prepare3DExportItems()`.
    """

    def addDataToExportTask(self, task, rl_map, corp_id_match_prop=None):
        """
        Update the provided task `three_d_export_items` attribute with data
        from this specification.

        :param task: an export task
        :type task: tasks.ThreadFunctionTask
        :param rl_map: a receptor-ligand map
        :type rl_map: data_classes.ReceptorLigandMap
        :param corp_id_match_prop: optionally, a property that stores the
            corporate ID that should be used to store these 3D items on LD, if
            any
        :type corp_id_match_prop: str or NoneType
        """

        items = self._prepare3DExportItems(
            rl_map, corp_id_match_prop=corp_id_match_prop)
        task.input.three_d_export_items.extend(items)

    def _prepare3DExportItems(self, rl_map, corp_id_match_prop=None):
        """
        Generate a list of 3D export items according to this specification.

        Must be overridden in concrete subclasses.

        :param rl_map: a receptor-ligand map
        :type rl_map: data_classes.ReceptorLigandMap
        :param corp_id_match_prop: optionally, a property that stores the
            corporate ID that should be used to store these 3D items on LD, if
            any
        :type corp_id_match_prop: str or NoneType
        :return: a list of 3D items for export
        :rtype: list[ThreeDExportItem]
        :raise NotImplementedError: always, in this base implementation
        """

        raise NotImplementedError

    def _getExportKey(self, ligand, rl_group, corp_id_match_prop=None):
        """
        If possible, determine the key for the specified ligand.

        The key is resolved in this order:

        1. the primary ligand of the group, if `ligand` is not that ligand
        2. the value of `corp_id_match_prop` on the primary ligand, as a
           string, if that property was requested and is present
        3. the import entity ID of the primary ligand, if the ligand passes
           the round-trip hash check
        4. `None` otherwise, including when the group has no primary ligand

        :param ligand: the ligand to be exported
        :type ligand: structure.Structure
        :param rl_group: the receptor-ligand group to which the ligand belongs
        :type rl_group: data_classes.ReceptorLigandGroup
        :param corp_id_match_prop: optionally, a structure property data name
            that specifies the source of the corporate ID for each compound
        :type corp_id_match_prop: str or NoneType
        :return: the appropriate key to use for this ligand, if any
        :rtype: str or structure.Structure or NoneType
        """

        primary_ligand = rl_group.ligand
        if primary_ligand and ligand != primary_ligand:
            # If the ligand is not the primary ligand for this RL group, use the
            # primary ligand as the key so that it gets grouped with the
            # primary ligand
            return primary_ligand
        if primary_ligand is None:
            # The check above tolerates a group without a primary ligand, but
            # the property lookups below would fail on `None`; no key can be
            # determined in that case
            return None
        key = None
        if corp_id_match_prop:
            # If the user specifies a property from which to obtain the
            # corporate ID, get the corporate ID from that property
            key = primary_ligand.property.get(corp_id_match_prop)
            # Corporate ID when available should only be a string value.
            key = None if key is None else str(key)
        if key is None and ld_utils.st_matches_round_trip_hash(primary_ligand):
            # If no corporate ID has been specified but the ligand structure was
            # downloaded from LD, attempt to match the ligand with its
            # source compound on LD
            key = primary_ligand.property.get(PROPNAME_IMPORT_ENTITY_ID)
        return key


class Standard3DExportSpec(Base3DExportSpec):
    """
    Concrete 3D export specification used for standard 3D export.
    """

    def _prepare3DExportItems(self, rl_map, corp_id_match_prop=None):
        """
        Create one 3D export item for each receptor-ligand group in the map.

        :param rl_map: a receptor-ligand map
        :type rl_map: data_classes.ReceptorLigandMap
        :param corp_id_match_prop: optionally, a property that stores the
            corporate ID that should be used to store these 3D items on LD, if
            any
        :type corp_id_match_prop: str or NoneType
        :return: a list of 3D items for export
        :rtype: list[ThreeDExportItem]
        """

        def build_item(group):
            export_item = ThreeDExportItem()
            # The alternate ligand, when the group has one, is exported in
            # place of the primary ligand
            export_item.ligand = group.alt_ligand or group.ligand
            export_item.receptor = group.receptor
            export_key = self._getExportKey(export_item.ligand, group,
                                            corp_id_match_prop)
            export_item.setItemKey(export_key)
            export_item.three_d_specs = [self]
            return export_item

        return [build_item(group) for group in rl_map.rl_groups]


class FFCExportSpec(DataExportSpec):
    """
    Abstract base for specifications that export an FFC attachment.

    Concrete subclasses are expected to supply `DESCRIPTION` and to implement
    `_getRemoteFileName()`, `_getAttachmentFile()`, and `getAttachmentData()`.
    """

    DESCRIPTION = NotImplemented

    def _getColumnName(self):
        """
        :return: the name of the column that should receive this data; the
            model name, followed by a space and the endpoint when an endpoint
            is set
        :rtype: str
        """

        model_name = self.ld_model
        if not self.ld_endpoint:
            return model_name
        return model_name + f' {self.ld_endpoint}'

    def _getRemoteFileName(self):
        """
        Provide the name under which the attachment is stored on the LD
        server.

        Must be overridden in concrete subclasses.

        :return: the remote file name
        :rtype: str
        """

        raise NotImplementedError

    def _getAttachmentFile(self):
        """
        Provide the path of the local file that is uploaded to LD as an
        attachment.

        Must be overridden in concrete subclasses.

        :return: the attachment file path
        :rtype: str
        """

        raise NotImplementedError

    def getAttachmentData(self, panel_model):
        """
        Retrieve attachment data from the panel model and store it.

        Must be overridden in concrete subclasses.

        :param panel_model: the model for the LD Export panel
        :type panel_model: ld_export2.ExportModel
        """

        raise NotImplementedError

    def addDataToExportTask(self, task):
        """
        Register this specification's attachment in the `attachment_data_map`
        attribute of the supplied task, keyed by column name.

        :param task: an export task
        :type task: tasks.ThreadFunctionTask
        """

        attachment = AttachmentData()
        attachment.remote_file_name = self._getRemoteFileName()
        attachment.file_path = self._getAttachmentFile()
        attachment.description = self.DESCRIPTION
        column = self._getColumnName()
        task.attachment_data_map[column] = attachment

    def removeLocalFile(self):
        """
        Delete the local attachment file of this spec, provided that it is
        still present as a regular file.
        """

        local_path = self._getAttachmentFile()
        if not os.path.isfile(local_path):
            return
        force_remove(local_path)


class ThreeDExportItem(parameters.CompoundParam):
    """
    Parameters describing one 3D structure item to export.

    :ivar key: the identification key for this 3D item. It should either be
        1. a structure, if its corporate ID should be the same as that assigned
           to a structure that has not yet been exported
        2. a string, if its corporate ID is known
        3. `None`, if its corporate ID should be automatically assigned by LD
    :vartype key: structure.Structure or str or NoneType
    """

    key: object
    ligand: structure.Structure = None
    receptor: structure.Structure = None
    three_d_specs: List[Base3DExportSpec]

    def setItemKey(self, key):
        """
        Store the key that identifies where this item belongs on a LiveDesign
        server, and synchronize the ligand corporate ID with it.

        :param key: the identification key for this item
        :type key: structure.Structure or str or NoneType
        """

        self.key = key
        # Only a string key is also a corporate ID; any other key clears the
        # corporate ID stored on the ligand
        self.setLigandCorpID(key if isinstance(key, str) else None)

    def setLigandCorpID(self, corp_id):
        """
        Write the corporate ID onto the ligand structure, or remove any stored
        corporate ID when `corp_id` is `None`.

        :param corp_id: the corporate ID, if any
        :type corp_id: str or NoneType
        """

        if corp_id is not None:
            self.ligand.property[PROPNAME_CORP_ID] = str(corp_id)
        elif PROPNAME_CORP_ID in self.ligand.property:
            del self.ligand.property[PROPNAME_CORP_ID]

    def getLigandCorpID(self):
        """
        :return: the corporate ID stored on the ligand of this item
        :rtype: str or NoneType
        """

        ligand_props = self.ligand.property
        return ligand_props.get(PROPNAME_CORP_ID)

    def getLigandCompoundID(self):
        """
        :return: the compound ID stored on the ligand of this item
        :rtype: str or NoneType
        """

        ligand_props = self.ligand.property
        return ligand_props.get(PROPNAME_COMPOUND_ID)


class SummaryModel(parameters.CompoundParam):
    """
    The model for the summary panel shown to the user prior to export.
    """

    # Destination of the export (see `LDDestination`)
    ld_destination: LDDestination
    # Structures and 3D items included in the export
    structures_for_2d_export: List[structure.Structure]
    three_d_export_items: List[ThreeDExportItem]
    # Criterion used to match compounds (see `MatchCompoundsBy`)
    match_compounds_by: MatchCompoundsBy
    # Export specifications, grouped by kind
    property_export_specs: List[PropertyExportSpec]
    ffc_export_specs: List[FFCExportSpec]
    export_specs: List[DataExportSpec]
    # NOTE(review): presumably whether the data is exported globally rather
    # than to a single project - confirm against the export task
    export_global: bool


class LDClientModelMixin:
    """
    Mixin for models that hold a `LDClient` instance in an attribute named
    `ld_client`.
    """

    def refreshLDClient(self):
        """
        Make sure that the stored `LDClient` instance is connected to
        LiveDesign. When it is missing or disconnected, a new instance is
        requested, and it replaces the stored one only if it is connected.

        :return: an enum describing the status of the connection;
                - `none` if no refresh was required
                - `success` if the `LDClient` instance was replaced
                - `failure` if even the new `LDClient` instance was disconnected
        :rtype: `RefreshResult`
        """

        current_client = self.ld_client
        if current_client is not None and ld_utils.is_connected(current_client):
            return RefreshResult.none

        # Only the client (the second element) of the returned triple is needed
        _, new_client, _ = login.get_ld_client_and_models()
        if not (new_client and ld_utils.is_connected(new_client)):
            return RefreshResult.failure

        self.ld_client = new_client
        return RefreshResult.success


class TaskInput(LDClientModelMixin, parameters.CompoundParam):
    """
    Input model for export tasks.

    Inherits `refreshLDClient()` from `LDClientModelMixin`, which checks and
    replaces the `ld_client` attribute declared here.
    """

    # Client object checked and replaced by `refreshLDClient()`
    ld_client: object
    # NOTE(review): presumably the LD models object obtained together with the
    # client at login - confirm
    ld_models: object
    # Destination of the export (see `LDDestination`)
    ld_destination: LDDestination
    # Structures and 3D items included in the export
    structures_for_2d_export: List[structure.Structure]
    three_d_export_items: List[ThreeDExportItem]
    # Export specifications, grouped by kind
    property_export_specs: List[PropertyExportSpec]
    ffc_export_specs: List[FFCExportSpec]
    # NOTE(review): the exact meaning of the flags and names below is not
    # shown in this module - confirm against the export task
    export_global: bool
    entry_type_name: str
    export_3d: bool
    # Custom pose name components (custom text and optional structure property)
    pose_name_custom_text: str
    pose_name_propname: structure.PropertyName = None


class PoseNameEditModel(parameters.CompoundParam):
    """
    Model for the Pose Name Edit Panel.

    :ivar custom_text: the text of the custom text line edit; this value is
        stored temporarily while the panel is open, and will be copied to
        `custom_text_final` if the user accepts the panel
    :ivar include_property: the check state of the "include property" checkbox
    :ivar property_name: the structure property (if any) selected as part of
        the custom pose name; this value is stored temporarily while the panel
        is open, and will be copied to `property_name_final` if the user
        accepts the panel
    :ivar property_user_name: the text of the structure property label
    :ivar example_prop_string: the text of the example property
    :ivar example_name: the text of the example pose name label
    :ivar entry_data: the system entry data for the panel
    :ivar custom_text_final: the custom text accepted by the user
    :ivar property_name_final: the structure property (if any) accepted by the
        user
    """

    # Temporary state, held while the panel is open
    custom_text: str
    include_property: bool
    property_name: structure.PropertyName = None
    # Texts displayed by the panel
    property_user_name: str = '(not defined)'
    example_prop_string: str = None
    example_name: str
    # System entry data for the panel
    entry_data: ets.BaseEntryData = None
    # Values accepted by the user
    custom_text_final: str
    property_name_final: structure.PropertyName = None


class AttachmentTaskInput(TaskInput):
    """
    Input model for FFC attachment export task.

    Extends `TaskInput` with a set of corporate IDs.
    """

    # NOTE(review): presumably the corporate IDs of the compounds that receive
    # the attachments - confirm against the attachment task implementation
    corp_ids: Set[str]


class AttachmentTaskOutput(parameters.CompoundParam):
    """
    Output model for FFC attachment export task.
    """

    # Success and failure counts reported by the task.
    # NOTE(review): presumably counted per attachment upload - confirm
    # against the attachment task implementation
    num_success: int
    num_failure: int


class FileBatch(parameters.CompoundParam):
    """
    Data class for storing file paths necessary for standard LD export (v8.6-).
    """

    # Each path defaults to `None`.
    # NOTE(review): presumably these are the mapping file, the SD file, and
    # the 3D data file written for an export - confirm against the export task
    map_file_path: str = None
    sdf_file_path: str = None
    three_d_file_path: str = None


class AttachmentData(parameters.CompoundParam):
    """
    Data class for storing FFC attachment information.

    Instances are populated by `FFCExportSpec.addDataToExportTask()`.
    """

    # Name under which the attachment is stored on the LD server
    remote_file_name: str = None
    # Path to the local file to upload as the attachment
    file_path: str = None
    # Description of the attachment, taken from the spec's `DESCRIPTION`
    description: str = None


def make_prop_dict(units='', name='', endpoint='', model=''):
    """
    Build the dictionary that represents a structure property for export. The
    layout should match what the `properties` argument of
    `LDClient.start_export_assay_and_pose_data()` requires.

    :param units: unit system used by this property
    :type units: str
    :param name: name for this property; either the data name or user name
    :type name: str
    :param endpoint: the LiveDesign endpoint
    :type endpoint: str
    :param model: the user-specified portion of the name for the column under
        which this property will be stored after export to LiveDesign
    :type model: str
    :return: the property dictionary, with the units, name, endpoint, and
        model entries in that order
    :rtype: collections.OrderedDict
    """
    # SD files do not keep the title under 's_m_title' in the property
    # dictionary, so the SD title name is exported in its place.
    if name == constants.PROPNAME_TITLE:
        export_name = constants.PROPNAME_SD_TITLE
    else:
        export_name = name

    return collections.OrderedDict([
        (constants.LD_PROP_UNITS, units),
        (constants.LD_PROP_NAME, export_name),
        (constants.LD_PROP_ENDPOINT, endpoint),
        (constants.LD_PROP_MODEL, model),
    ])