Source code for bjec.runner

import subprocess
import shlex
import tempfile

from .params import evaluate


try:
	subprocess.run
except AttributeError:
	from .subprocess_run import run
	subprocess.run = run


[docs]class Runner(object): """docstring for Runner"""
[docs] def start(self): """start is called before the Runner is used for the first time. **May** be implemented by inheriting classes (but must be defined). If starting is an asynchronous process, start() must return after this process completed. """ pass
[docs] def run(self, params): """run is called to have the Runner execute a task. **Must** be implemented by inheriting classes. The parameter params consists of the task's parameters, its type will depend on the Runner's configuration. run() must return only after processing completed. Its return type will be depend on the Runner's configuration. """ raise NotImplementedError
[docs] def stop(self): """stop is called when the Runner is no longer needed. **May** be implemented by inheriting classes (but must be defined). If stopping is an asynchronous process, stop() must return after this process completed. """ pass
[docs] @classmethod def factory(cls, *args, **kwargs): """Creates a factory for properly set-up ``Runner`` objects. Here, a factory is a function taking no parameters and returning a new instance of a ``Runner`` (subclass). **May** be implemented by inheriting classes. The default implementation will create a new object of the current class with the exact same parameters as passed into the ``factory`` method. Returns: function: Calling this function will return a new ``Runner`` instance with the parameters passed into the ``factory`` method. """ def factory(): return cls(*args, **kwargs) return factory
[docs]class SubprocessRunner(Runner): """docstring for SubprocessRunner"""
[docs] class Wrapper(object): """Wrapper is a helper class used by both input and output methods. Wrapper is used as a context manager, the ``subprocess.run()`` is wrapped in it. Input and output methods can therefore use its ``__enter__`` methods to modify the ``args`` and ``kwargs`` passed to the ``subprocess.run()`` call. For details also check out the ``InputMethod`` and ``OutputMethod`` classes as well as the concrete implementations of the both. Attributes: obj (object): Arbitrary object, meant to contain the instance of of the Wrapper's method class, thus enabling access to its members. params (dict): The parameter set serving as the input of the current run / task. args (list): The args list passed to ``subprocess.run()``. May be modified. kwargs (dict): The kwargs list passed to ``subprocess.run()``. May be modified. """ def __init__(self, obj, params, args, kwargs): super(SubprocessRunner.Wrapper, self).__init__() self.obj = obj self.params = params self.args = args self.kwargs = kwargs def __enter__(self): pass def __exit__(self, type, value, traceback): pass
def __init__(self, *args, input=None, output=None, **kwargs): super(SubprocessRunner, self).__init__() if kwargs.get("shell", False): self.args = args[0] else: self.args = args self.input = input or InputMethod() self.output = output or OutputMethod() self.kwargs = kwargs
[docs] def run(self, params): args = list(self.args) kwargs = dict(self.kwargs) in_w = self.input.wrapper(params, args, kwargs) out_w = self.output.wrapper(params, args, kwargs) with in_w, out_w: subprocess.run(args, **kwargs) return out_w.output()
[docs]class InputMethod(object):
[docs] class Wrapper(SubprocessRunner.Wrapper): pass
[docs] def wrapper(self, params, args, kwargs): return self.Wrapper(self, params, args, kwargs)
[docs]class ProcessArgs(InputMethod): """docstring for ProcessArgs Args: *args (str, ParamsEvaluable): Arguments to execute the subprocess with. Supports ParamsEvaluable arguments. """
[docs] class Wrapper(InputMethod.Wrapper): def __enter__(self): l = map(str, evaluate(self.obj.args, self.params)) if isinstance(self.args, str): self.args += " " + " ".join(map(shlex.quote, l)) else: self.args += l
def __init__(self, *args): super(ProcessArgs, self).__init__() self.args = args
[docs]class OutputMethod(object):
[docs] class Wrapper(SubprocessRunner.Wrapper):
[docs] def output(self): """Returns the output of the subprocess. **Must** be implemented by inheriting classes. Will be called by SubprocessRunner after the ``subprocess.run()`` call has finished. """ pass
[docs] def wrapper(self, params, args, kwargs): return self.Wrapper(self, params, args, kwargs)
[docs]class Stdout(OutputMethod): """docstring for Stdout Parameters: spool (int, default 0): If spool is greater 0, the stdout will be stored in a spooled file in memory until its size exceeds ``spool``. named (bool, default False): If True, the output file will be located on the file system with its path in the its ``name`` attribute. Implies ``spool = 0`` if set to True. stdout (bool, default True): If True, the stdout of the subprocess will be included in the output file. stderr (bool, default False): If True, the stderr of the subprocess will be included in the output file """
[docs] class Wrapper(OutputMethod.Wrapper): def __init__(self, *args, **kwargs): super(Stdout.Wrapper, self).__init__(*args, **kwargs) if self.obj.named: self.file = tempfile.NamedTemporaryFile() elif self.obj.spool > 0: self.file = tempfile.SpooledTemporaryFile( max_size=self.obj.spool, ) else: self.file = tempfile.TemporaryFile() def __enter__(self): if self.obj.stdout: if self.obj.stderr: self.kwargs["stderr"] = subprocess.STDOUT else: self.kwargs["stderr"] = subprocess.DEVNULL self.kwargs["stdout"] = self.file else: if self.obj.stderr: self.kwargs["stderr"] = self.file else: self.kwargs["stderr"] = subprocess.DEVNULL self.kwargs["stdout"] = subprocess.DEVNULL def __exit__(self, type, value, traceback): if type is not None: self.file.close() else: self.file.seek(0)
[docs] def output(self): return self.file
def __init__(self, spool=0, named=False, stdout=True, stderr=False): super(Stdout, self).__init__() self.spool = spool self.named = named self.stdout = stdout self.stderr = stderr