Source code for schrodinger.stepper.sideinputs

"""
Utility steps for creating side inputs in stepper workflows.

For example, in a workflow with steps A, B, C, and D, a ForkStep and JoinStep
can be set up so that all outputs from A are passed along to D. This allows
outputs from A to get to D even if B or C would normally filter those inputs.

Example::

    class MyWorkflow(stepper.Chain):
        def buildChain(self):
            a = A()
            self.addStep(a)
            fork = ForkStep(step=a)
            self.addStep(fork)
            self.addStep(B())
            self.addStep(C())
            self.addStep(JoinStep(fork=fork))
            self.addStep(D())

"""
import uuid

from schrodinger.tasks import stepper


class ForkStep(stepper.UnbatchedReduceStep):
    """
    Pass-through step that also records every item it sees in a side file.

    Each item flowing through the step is serialized onto its own line of a
    uniquely named, dot-prefixed file, and is then yielded downstream
    unchanged. A `JoinStep` built from this fork reads that file back.
    """

    def __init__(self, step):
        """
        :param step: The step whose outputs should be forked. Its `Output`
            type becomes both the `Input` and `Output` of this step, and its
            output serializer is used to write the side file.
        """
        forked_type = step.Output
        # Input is assigned before Output, and both before the base
        # initializer runs, exactly as the single chained assignment did.
        # NOTE(review): presumably the base __init__ relies on these being
        # set already - confirm against stepper.UnbatchedReduceStep.
        self.Input = forked_type
        self.Output = forked_type
        self._serializer = step.getOutputSerializer()
        # Relative, dot-prefixed name; a random UUID keeps forks distinct.
        unique_token = str(uuid.uuid4())
        self._pipe_fname = f'.{unique_token}.forkfile'
        super().__init__()

    def reduceFunction(self, inps):
        """
        Yield every item of `inps` unchanged while writing a serialized copy
        of each one, one per line, to the fork file.

        The file is opened only once iteration starts and is closed when
        `inps` is exhausted (or the generator is closed).

        :param inps: Iterable of items to pass through.
        """
        to_line = self._serializer.toString
        with open(self._pipe_fname, 'w') as fork_file:
            for item in inps:
                serialized = to_line(item)
                fork_file.write(f"{serialized}\n")
                yield item

    def getSerializer(self):
        """
        Return the serializer used to write items to the fork file.
        """
        return self._serializer

    def getPipeFilename(self):
        """
        Return the name of the file that forked items are written to.
        """
        return self._pipe_fname

    def report(self, prefix=''):
        """
        Log a one-line summary consisting of `prefix` and this step's id.

        :param prefix: Text placed at the start of the logged line.
        """
        step_id = self.getStepId()
        stepper.logger.info(f'{prefix} - {step_id}')
class JoinStep(stepper.UnbatchedReduceStep):
    """
    Step that re-injects the items recorded by a `ForkStep`.

    Everything arriving from upstream is passed through first; afterwards
    the items stored in the fork's side file are deserialized and yielded as
    well.
    """

    def __init__(self, fork):
        """
        :param fork: The `ForkStep` whose recorded items should be joined
            back into the stream. Its serializer and file name are captured
            here.
        """
        self._fork = fork
        self._serializer = fork.getSerializer()
        self._in_fname = fork.getPipeFilename()
        super().__init__()

    @property
    def Input(self):
        """
        The input type, taken from the associated fork.
        """
        return self._fork.Input

    @property
    def Output(self):
        """
        The output type, taken from the associated fork.
        """
        return self._fork.Output

    def reduceFunction(self, inps):
        """
        Yield all of `inps`, followed by every item read back from the fork
        file.

        :param inps: Iterable of items arriving from the preceding step.
        """
        yield from inps
        # The fork file is handed to the serializer only after the regular
        # inputs are exhausted, never up front.
        side_inputs = self._serializer.deserialize(self._in_fname)
        yield from side_inputs

    def report(self, prefix=''):
        """
        Log a one-line summary showing this step's id and the id of the
        fork it reads from.

        :param prefix: Text placed at the start of the logged line.
        """
        own_id = self.getStepId()
        fork_id = self._fork.getStepId()
        stepper.logger.info(f'{prefix} - {own_id} <- {fork_id}')