Source code for bjec.processor

import threading
import queue

from .config import config

[docs]class Processor(object): """docstring for Processor A ``Processor`` is responsible for the task execution pipeline, that is fetching parameter sets from a ``Generator``, handing them to a ``Runner`` and passing the ``Runner``'s output to a ``Collector``. Meanwhile the ``Processor`` has to manage its ``Runners'`` lifecycle. """ def __init__(self): super(Processor, self).__init__() self._generator = None self._runner_factory = None self._collector = None
[docs] def generator(self, generator): self._generator = generator
[docs] def runner_factory(self, runner_factory): self._runner_factory = runner_factory
[docs] def collector(self, collector): self._collector = collector
[docs] def process(self): """Process all parameter sets produced by the generator. **Must** be implemented by inheriting classes. """ raise NotImplementedError
[docs]class Inline(Processor):
[docs] def process(self): runner = self._runner_factory() runner.start() for params in self._generator: output = self._collector.add(params, output) runner.stop()
[docs]class Threading(Processor): """docstring for Threading Args: n (int): Number of threads to be run. If ``<= 0``, the configuration option of the same name is used instead. Configuration Options: * ``n``: Number of threads to run, it is used when `n` passed to the constructor is ``<= 0``. Defaults to 1. """ def __init__(self, n): super(Threading, self).__init__() if n <= 0: self.n = config[Threading].get("n", 1) if self.n <= 0: raise ValueError("Invalid value for n retrieved (<= 0)") else: self.n = n self._queue = queue.Queue(maxsize=n) self._started = threading.Event() self._exhausted = threading.Event()
[docs] def process(self): threads = list() for _ in range(self.n): thread = threading.Thread(target=self._thread_worker) threads.append(thread) thread.start() self._started.set() for params in self._generator: self._queue.put(params) self._exhausted.set() for thread in threads: thread.join()
def _thread_worker(self): runner = self._runner_factory() runner.start() self._started.wait() while True: try: params = self._queue.get(block=False) except queue.Empty: if self._exhausted.is_set(): break else: continue output = self._collector.add(params, output) self._queue.task_done() runner.stop()