Source code for schrodinger.application.matsci.guibase

"""
GUI classes/function shared by multiple Materials Science panels.

Copyright Schrodinger, LLC. All rights reserved.
"""

import os
import sys

import schrodinger
from schrodinger import structure
from schrodinger.application.desmond import cms
from schrodinger.application.desmond import fep_dialog
from schrodinger.application.desmond import gui as desmond_gui
from schrodinger.application.desmond import util as demond_util
from schrodinger.application.matsci import jobutils
from schrodinger.application.matsci import equilibrium_md as emd
from schrodinger.job import jobwriter
from schrodinger.job import jobcontrol
from schrodinger.job import launchparams
from schrodinger.project import utils as proj_utils
from schrodinger.Qt import QtCore
from schrodinger.Qt import QtGui
from schrodinger.Qt import QtWidgets
from schrodinger.ui.qt import config_dialog
from schrodinger.ui.qt import forcefield
from schrodinger.ui.qt import swidgets
from schrodinger.ui.qt import utils as qt_utils
from schrodinger.ui.qt.appframework2 import af2
from schrodinger.ui.qt.appframework2 import baseapp
from schrodinger.ui.qt.appframework2 import jobs
from schrodinger.ui.qt.appframework2 import validation
from schrodinger.utils import fileutils

maestro = schrodinger.get_maestro()

DARK_YELLOW = QtGui.QColor(QtCore.Qt.darkYellow)

PER_STRUC_CPU_LABEL = 'processors per structure'
PER_STRUC_GPU_LABEL = 'GPUs per structure'
CMS_EXT_FILTER = 'Chemical model system files (*cms *cms.gz *cmsgz)'


[docs]class MultiJobApp(af2.JobApp): """ Subclass of AF2 JobApp that allows launching multiple simultanious jobs. Every panel subclassing this should re-implement getJobCount() and getJobSpec() methods. A start method should NOT be implemented. """ def _startOrWrite(self): """ Over-riden method from JobApp class. Launcher/writes single or multiple jobs. The value of self.start_mode determines which to do. """ # TODO: Either move this functionality into AF2, or factor out # duplication between this method and base JobApp._startOrWrite(). if not self._preLaunchValidate(): return njobs = self.getJobCount() # Create the outer/master job directory: if not jobs.CHDIR_MUTEX.tryLock(): self.warning(jobs.CHDIR_LOCKED_TEXT) return if self.createJobDir() is False: # User has cancelled the job start/write; we don't chdir into jobdir jobs.CHDIR_MUTEX.unlock() return s_if_multijob = 's' if njobs > 1 else '' if self.start_mode == af2.FULL_START: msg = 'Submitting job%s...' % s_if_multijob elif self.start_mode == af2.ONLY_WRITE: msg = 'Writing job%s...' % s_if_multijob self.orig_dir = os.getcwd() self.status_bar.showMessage(msg) start_bn = self.bottom_bar.start_bn start_bn.setEnabled(False) settings_bn = self.bottom_bar.settings_bn settings_bn.setEnabled(False) # Force some QT event processing to ensure these state changes show up # in the GUI - PANEL-7556 self.application.processEvents(QtCore.QEventLoop.ExcludeUserInputEvents) os.chdir(self.jobDir()) try: ret_list = self.multiJobStart() except: self.showLaunchStatus(0, njobs) raise finally: os.chdir(self.orig_dir) jobs.CHDIR_MUTEX.unlock() start_bn.setEnabled(True) settings_bn.setEnabled(True) self.orig_dir = '' num_succeeded = sum([1 for ret in ret_list if ret]) self.showLaunchStatus(num_succeeded, njobs) if self.start_mode == af2.ONLY_WRITE and num_succeeded == 0: fileutils.force_rmtree(self.jobDir()) self.status_bar.showMessage('Failed to write job') return self.updateJobname() if self.start_mode == af2.FULL_START: job_list = [job for job in ret_list if job] if not job_list or len(job_list) > 1: # Job tracking is not supported with multiple jobs; stop # any previous tracking: self.last_job = None else: # If a single job was launched; track it: self.last_job = job_list[0] if maestro: # Monitor started jobs: for job in job_list: maestro.job_started(job.JobID) @af2.validator(-1000) def validateJobCount(self): # Validate job counts: njobs = self.getJobCount() if njobs < 1: self.error("At least one job must be requested") return False if njobs > 100: msg = ("This panel supports at most 100 jobs; while " "%i jobs were requested" % njobs) return (False, msg) if njobs > 10: msg = ("Launching over 10 simultaneous jobs with this panel " "is not recommened; you requested %i." % njobs) return (True, msg) return True def _preLaunchValidate(self): """ Run validation before starting/writing jobs. :return: True if validation passed, False if operation should be cancelled. :rtype: bool """ if not fileutils.is_valid_jobname(self.jobname()): msg = fileutils.INVALID_JOBNAME_ERR % self.jobname() self.warning(msg) return False if not self.runValidation(stop_on_fail=True): return False if self.config_dlg: if not self.config_dlg.validate(): return False return True
[docs] def showLaunchStatus(self, num_succeeded, njobs): """ Show a label at the bottom of the panel with status of the launch. """ s_if_multijob = 's' if njobs > 1 else '' if num_succeeded == 0: # All jobs failed if self.start_mode == af2.FULL_START: msg = 'Failed to start job%s' % s_if_multijob else: msg = 'Failed to write job%s' % s_if_multijob self.status_bar.showMessage(msg) elif num_succeeded == njobs: # All jobs were launched/written successfully if self.start_mode == af2.FULL_START: if njobs == 1: msg = "Job started" else: msg = "%i jobs started" % njobs else: if njobs == 1: msg = "Job written" else: msg = "%i jobs written" % njobs self.status_bar.showMessage(msg, 3000, af2.DARK_GREEN) else: # Some of the job launched, some failed if self.start_mode == af2.FULL_START: template = "%i jobs started; %i failed to start" else: template = "%i jobs written; %i failed to write" msg = template % (num_succeeded, njobs - num_succeeded) self.status_bar.showMessage(msg, 3000, DARK_YELLOW)
[docs] def getJobCount(self): """ Over-ride in the subclass to return the number of jobs that the user would like to start. Used for validation. :return the number of jobs that the user requests. :rtype: int """ raise NotImplementedError()
# start() decorator is required for AF2 to show the Run button.
[docs] @af2.appmethods.start() def multiJobStart(self): """ Custom "start" method. For each job, creates a job sub-directory, cd's into it, and launches the job there. :return list of return values from self.launchFromJobSpec() calls. Each value is a Job object or False, in start mode, or True/False in write mode. """ njobs = self.getJobCount() master_jobname = self.jobname() if njobs == 1: ret = self.launchFromJobSpec(master_jobname, 1) return [ret] ret_list = [] for job_num in range(1, njobs + 1): sub_jobname = '%s-%03d' % (master_jobname, job_num) os.mkdir(sub_jobname) with fileutils.chdir(sub_jobname): ret = self.launchFromJobSpec(sub_jobname, job_num) ret_list.append(ret) return ret_list
[docs] def getValidatedOPLSDir(self, oplsdir=None): """ Validate the existing oplsdir or get a requested oplsdir from the user's selection in the force field selector widget :param str oplsdir: The currently requested oplsdir :rtype: False, None or str :return: False if no valid oplsdir can be found, None if there is no custom dir requested, or the path to the custom directory """ # PANEL-8401 has been filed to improve the AF2 infrastructure for using # the FF Selector. That case may eventually result in changes here. # Detect any forcefield selector if requested sanitized_opls_dir = False if oplsdir is None: child = self.findChild(forcefield.ForceFieldSelector) if child: if not child.sanitizeCustomOPLSDir(): return False sanitized_opls_dir = True oplsdir = child.getCustomOPLSDIR() # verify the oplsdir method argument's validity and allow using default if oplsdir and not sanitized_opls_dir: opls_dir_result = self.validateOPLSDir(oplsdir) if opls_dir_result == forcefield.OPLSDirResult.ABORT: return False elif opls_dir_result == forcefield.OPLSDirResult.DEFAULT: oplsdir = None return oplsdir
[docs] def launchFromJobSpec(self, sub_jobname, job_num, oplsdir=None): """ Re-implemented from JobApp; options and behavior is somewhat differnt. Starts or write the command for the given subjob. :return: Job object on successful start; True on successful write, False on failure. """ jobdir = os.getcwd() # TODO: Remove duplication between this method and JobApp method. try: job_spec = self.getJobSpec(sub_jobname, job_num) except SystemExit as e: self.error('Error launching job {}'.format(e)) return False launch_params = launchparams.LaunchParameters() launch_params.setJobname(sub_jobname) cd_params = self.configDialogSettings() host = None if 'host' in cd_params: host = cd_params['host'] launch_params.setHostname(host) if 'openmpcpus' in cd_params: threads = cd_params['threads'] cpus = cd_params['openmpcpus'] if threads: launch_params.setNumberOfSubjobs(cd_params['openmpsubjobs']) if job_spec.jobUsesTPP(): launch_params.setNumberOfProcessorsOneNode(threads) # NOTE: If the driver is not using the TPP option, but passing # to subjobs, this needs to go as part of command in getJobSpec # (use _addJaguarOptions) else: # NOTE: this is the right thing to do for matsci GUIs but # maybe be the wrong thing to do for jaguar GUIs, since # they may want ONLY the -PARALLEL N option and not also # -HOST foo:N as well launch_params.setNumberOfSubjobs(cpus) elif 'cpus' in cd_params: launch_params.setNumberOfSubjobs(cd_params['cpus']) if self.runMode() == baseapp.MODE_MAESTRO: if 'proj' in cd_params: launch_params.setMaestroProjectName(cd_params['proj']) # Setting disposition is only valid if we have a project if 'disp' in cd_params: launch_params.setMaestroProjectDisposition( cd_params['disp']) launch_params.setMaestroViewname(self.viewname) if maestro and maestro.get_command_option( "prefer", "enablejobdebugoutput") == "True": launch_params.setDebugLevel(2) oplsdir = self.getValidatedOPLSDir(oplsdir=oplsdir) if oplsdir is False: return False elif oplsdir: launch_params.setOPLSDir(oplsdir) launch_params.setDeleteAfterIncorporation(True) launch_params.setLaunchDirectory(jobdir) # Call private function here because there's not guaranteed a great analog # for cmdline launching. cmdlist = jobcontrol._get_job_spec_launch_command( job_spec, launch_params, write_output=True) if self.start_mode == af2.FULL_START: try: job = jobcontrol.launch_from_job_spec( job_spec, launch_params, display_commandline=jobwriter.cmdlist_to_cmd(cmdlist)) except jobcontrol.JobLaunchFailure: # NOTE: jobcontrol.launch_job() already showed an error dialog # to the user by this point. return False # NOTE: AF2 implementaiton calls registerJob(job) here. Not needed # here as when multiple jobs are launched, job tracking is not # possible. Instead we call maestro.job_started() at a later time, # in order for the job to show up in monitor panels. return job elif self.start_mode == af2.ONLY_WRITE: self.writeJobCmd(cmdlist) return True
def _getSHFilename(self): """ Return the name of the *.sh file that should be written. Assumes that the CWD is the job directory for the (sub)job. """ jobdir = os.getcwd() jobname = os.path.basename(jobdir) return os.path.join(jobdir, jobname + '.sh')
[docs]class MultiDesmondJobApp(MultiJobApp): """ Class with functionality for setting up, validating and running multiple Desmond jobs. """
[docs] def setup(self): MultiJobApp.setup(self) self._models = []
[docs] def setDefaults(self): MultiJobApp.setDefaults(self) self._models = []
[docs] def getModel(self, index): """ Return the model system at the specified index. :param index: Index of the system to return :type index: int :return: Model system at the specified job index :rtype: `cms.Cms` """ return self._models[index]
[docs] def getJobCount(self): """ Return the number of jobs that the user would like to run. :return: Number of jobs to run :rtype: int """ # Note that subclasses may choose to reimplement this function as needed; # this general implementation should work for most cases. if not maestro or not hasattr(self, 'input_selector'): return len(self._models) pt = maestro.project_table_get() input_state = self.input_selector.inputState() if input_state == self.input_selector.SELECTED_ENTRIES: return len(pt.selected_rows) elif input_state == self.input_selector.FILE: return 1 else: return len(pt.included_rows)
[docs] def getStructFromPtEntry(self): """ Get the first included entry in the Workspace if that entry is one of the chosen entries, or the first selected entry if no included entry is chosen. :rtype: (`schrodinger.structure.Structure`, string) or (None, None) :return: one structure from selected or included entries, the structure entry id """ val_st = None val_eid = None if not maestro: return val_st, val_eid # File is not supported here, so assert right away assert self.input_selector.options['file'] is False ptable = maestro.project_table_get() included_eids = maestro.get_included_entry_ids() input_state = self.input_selector.inputState() val_row = None if input_state == self.input_selector.SELECTED_ENTRIES: # Give priority to the the selected row that is also included in # the Workspace for eid in included_eids: row = ptable.getRow(eid) if row.is_selected: val_row = row break if not val_row and ptable.selected_rows: # None of the selected rows were included; use the first one: val_row = next(iter(ptable.selected_rows)) else: if included_eids: val_row = ptable.getRow(included_eids.pop()) if val_row: val_st = val_row.getStructure() val_eid = val_row.entry_id return val_st, val_eid
[docs] def customOPLSDirForModel(self, job_num): """ Get the custom OPLS directory (if any) for the model corresponding to job_num :param int job_num: The index of the job number :rtype: str or None :return: The opls directory to use, or None if this model does not use it """ struct = self._models[job_num - 1].fsys_ct # MATSCI-8027 / DESMOND-10004 if demond_util.use_custom_oplsdir(struct): oplsdir = forcefield.get_custom_opls_dir() else: oplsdir = None return oplsdir
[docs] def launchFromJobSpec(self, sub_jobname, job_num, oplsdir=None): """ See parent class for documentation. Here mainly OPLS directory is obtained from structure properties. """ if not oplsdir: oplsdir = self.customOPLSDirForModel(job_num) return super().launchFromJobSpec(sub_jobname, job_num, oplsdir=oplsdir)
@af2.validator(-999) def validateModelLoaded(self): """ At runtime we check the user's input selection and attempt to load the input, ensuring that all specified inputs are valid models. """ if not hasattr(self, 'input_selector'): # Does not use input_selector for specifying input. return True try: self._models = list(self.input_selector.cmsModels()) except TypeError as err: return False, str(err) if not self._models: return False, "No valid model systems have been specified." return True
[docs]class MultiCmdJobApp(MultiDesmondJobApp): """ Class with functionality for setting up, validating and running multiple Desmond jobs with GPU subhosts via commands. Note: multiJobStart() fires off multiple jobs by launchJobCmd(), which calls setupJobCmd() to add HOST and SUBHOST flags. """
[docs] @af2.appmethods.start() def multiJobStart(self): """ Custom "start" method. For each job, creates a job sub-directory, cd's into it, and launches the job there. :return list of return values from self.launchFromJobSpec() calls. Each value is a Job object or False, in start mode, or True/False in write mode. """ njobs = self.getJobCount() master_jobname = self.jobname() if njobs == 1: cmd = self.getJobCmd(master_jobname, 1) ret = self.launchJobCmd(cmd) if self.start_mode == af2.ONLY_WRITE and ret is None: # launchJobCmd returns None on ONLY_WRITE mode. Convert it to True return [True] return [ret] ret_list = [] for job_num in range(1, njobs + 1): sub_jobname = '%s-%03d' % (master_jobname, job_num) os.mkdir(sub_jobname) with fileutils.chdir(sub_jobname): cmd = self.getJobCmd(sub_jobname, job_num) ret = self.launchJobCmd(cmd, jobdir=os.getcwd()) ret_list.append(ret) if self.start_mode == af2.ONLY_WRITE: # launchJobCmd returns None on ONLY_WRITE mode. Convert it to True ret_list = [True if x is None else x for x in ret_list] return ret_list
[docs] def getJobCmd(self, jobname, job_number, cmd=None): """ Must be inherited by subclasses to yield a command list :param str jobname: The job name :param int job_number: the job number :param list cmd: The list of command args to this point :rtype: list of str :return: a list of the command args for job submission """ if not cmd: raise NotImplementedError("Subclasses must implement this method") cmd += ['-JOBNAME', jobname] # Add a custom OPLSDIR if requested model_oplsdir = self.customOPLSDirForModel(job_number) oplsdir = self.getValidatedOPLSDir(oplsdir=model_oplsdir) if oplsdir: cmd += ['-OPLSDIR', oplsdir] return cmd
[docs] def setupJobCmd(self, cmdlist, auto_add_host=True, auto_add_subhost=True, **kwargs): """ Adds arguments such as HOST, SUBHOST, and GPU flags to cmdlist beyond the parent class method if they are set in the config dialog. Settings pre-existing in the cmdlist take precedence over the config dialog settings. :param cmdlist: the command list :type cmdlist: list :param auto_add_host: Whether or not to automatically add -HOST flat to command when it is not already included. :type auto_add_host: bool :param auto_add_subhost: Whether or not to automatically add -SUBHOST flat to command when it is not already included. :type auto_add_subhost: bool """ # MultiCmdJobApp has been tested with PerStructDesmondSubhostConfigDialog # Please only remove this block after confirming other config dialog working if self.config_dlg: assert isinstance(self.config_dlg, PerStructDesmondSubhostConfigDialog) cd_params = self.configDialogSettings() if cd_params.get('gpus'): is_full = True try: rtype_idx = cmdlist.index(emd.FLAG_RUN_TYPE) except ValueError: pass else: # Post analysis, line fitting, and replica averaging don't # request GPU resources is_full = cmdlist[rtype_idx + 1] == emd.FULL_RUN if is_full: cmdlist += [jobutils.FLAG_GPU] maxjobs = cd_params.get('maxjobs') subjob_host = cd_params.get('subjob_host') if subjob_host and '-SUBHOST' not in cmdlist and auto_add_subhost: if maxjobs: cmdlist.extend(['-SUBHOST', '%s:%s' % (subjob_host, maxjobs)]) else: cmdlist.extend(['-SUBHOST', subjob_host]) if cd_params.get('host') and 'HOST' not in cmdlist and auto_add_host: host = cd_params['host'] if maxjobs and not cd_params.get('subjob_host'): cmdlist.extend(['-HOST', '%s:%s' % (host, maxjobs)]) else: cmdlist.extend(['-HOST', host]) return super().setupJobCmd( cmdlist, auto_add_host=auto_add_host, **kwargs)
[docs]class PerStrucConfigDialog(desmond_gui.DesmondGuiConfigDialog): """ Dialog for configuring jobs that can have CPUs/GPUs per input structure specified. """ CPU_UNIT_LABEL = PER_STRUC_CPU_LABEL GPU_UNIT_LABEL = PER_STRUC_GPU_LABEL
Super = desmond_gui.SingleGpuDesmondGuiConfigDialog
[docs]class PerStrucSingleGpuConfigDialog(Super): """ Class to configure jobs that can use a single GPU per input structure. """ CPU_UNIT_LABEL = PER_STRUC_CPU_LABEL GPU_UNIT_LABEL = PER_STRUC_GPU_LABEL
[docs]class PerStructDesmondSubhostConfigDialog(fep_dialog.FEPConfigDialog): """ Subclass fep_dialog.FEPConfigDialog dialog, and customize it for panels that run multiple desmond subjobs. """
[docs] def __init__(self, *arg, gpu_num=None, cpu_num=None, sim_jobnum=None, sim_job_sb=None, allow_cpu=False, has_subjobs_func=None, **kwargs): """ See parent class for additional documentation string. :type gpu_num: int :param gpu_num: Fix the gpu processor number per subjob, if provided. :type cpu_num: int :param cpu_num: Fix the cpu processor number per subjob, if provided. :type sim_jobnum: int :param sim_jobnum: The default simultaneous subjob number. :type sim_job_sb: str :param sim_job_sb: the attribute name of the parent to define the number of simultaneous subjob. :type allow_cpu: bool :param allow_cpu: whether to allow subjob submission to cpu master host. :type: callable :param has_subjobs_func: Function that takes no arguments and returns a boolean indicating whether subjobs will be run. If not supplied, validation will always assume subjobs are run. """ self.allow_cpu = allow_cpu self.sim_job_sb = sim_job_sb self.has_subjobs_func = has_subjobs_func super().__init__(*arg, **kwargs) gpu_sb = self.num_cpus_sw.widget(fep_dialog.GPU_LAYOUT) cpu_sb = self.num_cpus_sw.widget(fep_dialog.CPU_LAYOUT) for p_num_sb, p_num_value in zip([gpu_sb, cpu_sb], [gpu_num, cpu_num]): if p_num_value: p_num_sb.setValue(p_num_value) p_num_sb.setEnabled(False) subjob_host = self.currentHost(self.subhost_menu) if not subjob_host: # Disable the subhost menu if no subhost is available self.subhost_menu.setEnabled(False) if sim_jobnum: # Default simultaneous job number is the number of independent runs self.maxjobs_ef.setText(str(sim_jobnum))
[docs] def updateMaxjobsDefault(self): """ Overwrite the parent class method to set the upper limit of max simultaneous subjobs. """ host = self.currentHost() if host is None: return if self.sim_job_sb and hasattr(self.parent, self.sim_job_sb): md_num = getattr(self.parent, self.sim_job_sb).value() max_sim_jobs = min([md_num, host.processors]) else: max_sim_jobs = host.processors # bolt_gpu_short prints (10000, 8) as host.queue # localhost could print (8, 2) as host.queue # processors are 10000 and 8 usually a large number compared to the total # gpu number on a single host. if self.maxjobs_ef.text() and int( self.maxjobs_ef.text()) > max_sim_jobs: self.maxjobs_ef.setText(str(max_sim_jobs)) non_negative_int_val = swidgets.SNonNegativeIntValidator( top=max_sim_jobs) self.maxjobs_ef.setValidator(non_negative_int_val)
[docs] def validateSubHost(self): """ Overwritten the parent method. """ if self.has_subjobs_func is not None and not self.has_subjobs_func(): # No subjobs are being run (MATSCI-9978) return True subjob_host = self.currentHost(self.subhost_menu) if subjob_host is None and not self.allow_cpu: msg = ('No GPU host available. This workflow is only supported ' 'on GPUs.') # Temporary workaround for PANEL-15456 as 'pdxgpu-base' fails gpu jobs elif subjob_host and 'pdxgpu-base' in subjob_host.label(): msg = (f"{subjob_host.label()} doesn't allow GPU jobs. Please " "choose another GPU host.") else: return True self.warning(msg) return False
[docs] def validate(self): """ Overwritten the parent method to allow cpu hosts for master jobs. """ if not self.validateSubjobs(): return False if not self.validateSubHost(): return False return config_dialog.ConfigDialog.validate(self)
[docs]class ConfigDialogWithPlatformValidation(af2.ConfigDialog): """ Adds a platform check to ConfigDialog for localhost jobs. """
[docs] def __init__(self, *args, incompatible_platforms=(), platform_warning="", **kwargs): """ :type incompatible_platforms: tuple :param incompatible_platforms: Incompatible platforms for the job. :type platform_warning: str :param platform_warning: The warning to be displayed if the platform is incompatible. """ self.incompatible_platforms = incompatible_platforms self.platform_warning = platform_warning super().__init__(*args, **kwargs)
[docs] def validateHost(self, host): """ Validates whether or not a launch host is valid for the job. Currently only validates localhost. :param host: Launch host to validate :type host: `schrodinger.job.jobcontrol.Host` object :rtype: bool :return: True if the selected host is valid for the job, False otherwise. """ return not (host.name == 'localhost' and sys.platform in self.incompatible_platforms)
[docs] def validate(self): """ Checks the panel to make sure settings are valid. Return False if any validation test fails, otherwise return True. :rtype: bool :return: True if all checks pass, False otherwise """ host = self.currentHost() if not self.validateHost(host): self.warning(self.platform_warning) return False return super().validate()
[docs]def validate_child_widgets(child_widgets): """ Runs each widget's validators while respecting the validation_order across all other widgets. Rather than running validators one widget at a time, ensures that the validator with smallest validation_order across widgets is run first, and so on. Validation methods that call this function and return its value should be decorated with @validation.multi_validator() :param list child_widgets: The widgets to run validations for :rtype: validation.ValidationResults :return: The results of validations, or the results up to and including the first failed validation. """ validators = [] for widget in child_widgets: validators.extend(validation.find_validators(widget)) validators.sort(key=lambda method: method.validation_order) results = validation.ValidationResults() for validate_method in validators: result = validate_method() results.add(result) if not result: break return results
[docs]class ProcessPtStructuresApp(af2.App): """ Base class for panels that process pt structures and either replace them or creates new entries """ REPLACE_ENTRIES = 'Replace current entries' NEW_ENTRIES = 'Create new entries' RUN_BUTTON_TEXT = 'Run' # Can be overwritten for custom button name TEMP_DIR = fileutils.get_directory_path(fileutils.TEMP) # Used for unittests. Should be overwritten in derived classes. DEFAULT_GROUP_NAME = 'new_structures'
[docs] def setPanelOptions(self): """ Override the generic parent class to set panel options """ super().setPanelOptions() self.input_selector_options = { 'file': False, 'selected_entries': True, 'included_entries': True, 'included_entry': False, 'workspace': False }
[docs] def layOut(self): """ Lay out the widgets for the panel """ super().layOut() layout = self.main_layout # Any widgets for subclasses should be added to self.top_main_layout self.top_main_layout = swidgets.SVBoxLayout(layout=layout) output_gb = swidgets.SGroupBox("Output", parent_layout=layout) self.output_rbg = swidgets.SRadioButtonGroup( labels=[self.REPLACE_ENTRIES, self.NEW_ENTRIES], layout=output_gb.layout, command=self.outputTypeChanged, nocall=True) hlayout = swidgets.SHBoxLayout(layout=output_gb.layout, indent=True) dator = swidgets.FileBaseNameValidator() self.group_name_le = swidgets.SLabeledEdit( "Group name: ", edit_text=self.DEFAULT_GROUP_NAME, validator=dator, always_valid=True, layout=hlayout) self.outputTypeChanged() layout.addStretch() self.status_bar.showProgress() self.app = QtWidgets.QApplication.instance() # MATSCI-8244 size_hint = self.sizeHint() size_hint.setWidth(410) self.resize(size_hint) # Set custom run button name if subclass defines it self.bottom_bar.start_bn.setText(self.RUN_BUTTON_TEXT)
[docs] def outputTypeChanged(self): """ React to a change in output type """ self.group_name_le.setEnabled( self.output_rbg.checkedText() == self.NEW_ENTRIES)
[docs] @af2.appmethods.start() def myStartMethod(self): """ Process the selected or included rows' structures """ # Get rows ptable = maestro.project_table_get() input_state = self.input_selector.inputState() if input_state == self.input_selector.INCLUDED_ENTRIES: rows = [row for row in ptable.included_rows] elif input_state == self.input_selector.SELECTED_ENTRIES: rows = [row for row in ptable.selected_rows] # Prepare progress bar nouser = QtCore.QEventLoop.ExcludeUserInputEvents num_structs = len(rows) self.progress_bar.setValue(0) self.progress_bar.setMaximum(num_structs) self.app.processEvents(nouser) structs_per_interval = max(1, num_structs // 20) # Initialize output means if self.output_rbg.checkedText() == self.REPLACE_ENTRIES: modify = True writer = None else: modify = False file_path = os.path.join(self.TEMP_DIR, self.group_name_le.text() + ".mae") writer = structure.StructureWriter(file_path) self.setUp() # Process the structures with qt_utils.wait_cursor: for index, row in enumerate(rows, start=1): with proj_utils.ProjectStructure(row=row, modify=modify) as \ struct: try: passed = self.processStructure(struct) if passed and writer: writer.append(struct) except WorkflowError: break if not index % structs_per_interval: self.progress_bar.setValue(index) self.app.processEvents(nouser) # Import file if applicable if writer: writer.close() if os.path.exists(file_path): # No file will exist if all sts fail ptable = maestro.project_table_get() ptable.importStructureFile( file_path, wsreplace=True, creategroups='all') fileutils.force_remove(file_path) # Show 100%. Needed when num_structs is large and not a multiple of 20 self.progress_bar.setValue(num_structs) self.app.processEvents(nouser) # Panel-specific wrap up self.wrapUp()
[docs] def setUp(self): """ Make any preparations required for processing structures """ pass
[docs] def processStructure(self, struct): """ Process each structure. Should be implemented in derived classes. :param `structure.Structure` struct: The structure to process """ raise NotImplementedError
[docs] def wrapUp(self): """ Wrap up processing the structures """ pass
[docs] def reset(self): """ Reset the panel """ self.group_name_le.reset() self.output_rbg.reset() self.outputTypeChanged() self.progress_bar._bar.reset( ) # af2.ProgressFrame doesn't have a reset method
[docs]class WorkflowError(ValueError): """ Custom exception for when the workflow should be stopped """ pass
[docs]def format_tooltip(tip, min_width=None): """ Return a formatted tooltip. :type tip: str :param tip: the tip text :type min_width: int or None :param min_width: the minimum tip width, if None a default is used :rtype: str :return: the formatted tool tip """ tip = tip.strip() dummy = 'I think this seems like a good tool tip length.' min_width = min_width or len(dummy) if len(tip) <= min_width: return tip start, rest = tip[:min_width], tip[min_width:] if start[-1] == ' ': return f'<nobr>{start}</nobr>{rest}' if rest[0] == ' ': return f'<nobr>{start} </nobr>{rest[1:]}' words = rest.split() start += words[0] rest = ' '.join(words[1:]) if rest: return f'<nobr>{start} </nobr>{rest}' else: return start
[docs]def load_entries_for_panel(*entry_ids, panel=None, load_func=None, included_entry=False, selected_entries=False, load_job=False): """ :param tuple entry_ids: Entry ids to load into the panel :param `af2.App` panel: The panel to populate :param callable load_func: The panel method to call to load the entries :param bool included_entry: Whether the group's entries should be included for the panel :param bool selected_entries: Whether the group's entries should be selected for the panel :param bool load_job: Whether the entry's job should be loaded into the panel :raise RuntimeError: If neither included nor selected entries is being used """ if not entry_ids: return if load_job: ptable = maestro.project_table_get() row = ptable.getRow(entry_ids[0]) job_dir = jobutils.get_source_path(row, existence_check=False) if job_dir: if os.path.exists(job_dir): load_func(job_dir) else: panel.error(f'Could not find job directory: {job_dir}') else: panel.error('Could not get the job directory path from entry.') return elif included_entry: command = 'entrywsincludeonly entry ' + str(entry_ids[0]) input_state = af2.input_selector.InputSelector.INCLUDED_ENTRY elif selected_entries: command = 'entryselectonly entry ' + ' '.join(map(str, entry_ids)) input_state = af2.input_selector.InputSelector.SELECTED_ENTRIES else: raise RuntimeError("One of load keywords should be True.") if hasattr(panel, 'input_selector') and panel.input_selector is not None: panel.input_selector.setInputState(input_state) elif hasattr(panel, '_if') and panel._if is not None: panel._if.setInputState(input_state) maestro.command(command) if load_func: load_func()
[docs]class ThreadOnlyConfigDialog(af2.ConfigDialog): """ Manage a config dialog that only exposes the number of threads and locks the number of simultaneous subjobs at one. """ def _setupOpenMPWidgets(self): """ See parent class for documentation. """ super()._setupOpenMPWidgets() self.open_mp_ui.mp_cpus_rb.setVisible(False) self.open_mp_ui.mp_cpus_grouping.setVisible(False) self.open_mp_ui.mp_open_mp_rb.setChecked(True) self.open_mp_ui.mp_open_mp_rb.setVisible(False) self.open_mp_ui.mp_max_subjobs_sb.setValue(1) self.open_mp_ui.mp_max_subjobs_sb.setEnabled(False) self.updateOpenMPInfo()