Source code for bjec.job

import functools
from typing import Any, Callable, List, Optional, Union

from .master import Runnable, Registerable, Constructible, Artefactor, Dependency, WrapperRun, Master, ResolveKey
from .master import master as global_master
from .utils import listify
from .generator import Generator
from .collector import Collector
from .processor import Processor


class Job(Dependency, Constructible, Artefactor, WrapperRun, Runnable):
    """A runnable unit combining a generator, processor, runnable and collector.

    The four components start out unset (``None``) and are expected to be
    assigned through a :class:`Job.Constructor`. Running the job passes the
    runnable and the generator to the processor and hands the processor's
    result to the collector.
    """

    class Constructor(
        Dependency.ResolveConstructor,
        Artefactor.Constructor,
        Constructible.Constructor,
    ):
        """Configuration facade used to set the components of a ``Job``.

        Each property reads from / writes to the matching private attribute
        of the wrapped job (``self._obj``). Reading a component that has not
        been set yet raises ``Exception('Insufficiently specified')``.
        """

        _obj: 'Job'

        @property
        def generator(self) -> Generator:
            """The job's generator; raises ``Exception`` while unset."""
            return self._obj.generator

        @generator.setter
        def generator(self, generator: Generator) -> None:
            self._obj._generator = generator

        @property
        def processor(self) -> Processor[Any]:
            """The job's processor; raises ``Exception`` while unset."""
            return self._obj.processor

        @processor.setter
        def processor(self, processor: Processor[Any]) -> None:
            self._obj._processor = processor

        @property
        def runnable(self) -> Any:
            """The job's runnable; raises ``Exception`` while unset."""
            return self._obj.runnable

        @runnable.setter
        def runnable(self, runnable: Any) -> None:
            self._obj._runnable = runnable

        @property
        def collector(self) -> Collector[Any]:
            """The job's collector; raises ``Exception`` while unset."""
            return self._obj.collector

        @collector.setter
        def collector(self, collector: Collector[Any]) -> None:
            self._obj._collector = collector

        def after(self, *after_funcs: Callable[['Job'], None]) -> None:
            """Append callables that ``Job.run`` invokes after the job has run.

            Args:
                *after_funcs: Callables taking the job as their only argument.
                    They are called in the order in which they were added.
            """
            self._obj._after_funcs.extend(after_funcs)

    def __init__(
        self,
        constructor_func: Callable[['Job.Constructor'], None],
        depends: Optional[List[ResolveKey]] = None,
    ) -> None:
        """Create a job with all components unset.

        Args:
            constructor_func: Callable receiving a ``Job.Constructor``; stored
                on the instance as ``constructor_func``.
            depends: Keys forwarded to the inherited ``depends()`` method.
                ``None`` (the default) is treated as an empty list.
        """
        super().__init__()

        self._generator: Optional[Generator] = None
        self._processor: Optional[Processor[Any]] = None
        self._runnable: Optional[Any] = None
        self._collector: Optional[Collector[Any]] = None
        self._after_funcs: List[Callable[[Job], None]] = []

        self.constructor_func = constructor_func
        # A ``None`` sentinel replaces the former mutable ``[]`` default.
        # ``depends()`` is still called unconditionally, even with no keys.
        self.depends(*([] if depends is None else depends))

    @property
    def generator(self) -> Generator:
        """The configured generator.

        Raises:
            Exception: If no generator has been set.
        """
        if self._generator is None:
            raise Exception('Insufficiently specified')
        return self._generator

    @property
    def processor(self) -> Processor[Any]:
        """The configured processor.

        Raises:
            Exception: If no processor has been set.
        """
        if self._processor is None:
            raise Exception('Insufficiently specified')
        return self._processor

    @property
    def runnable(self) -> Any:
        """The configured runnable.

        Raises:
            Exception: If no runnable has been set.
        """
        if self._runnable is None:
            raise Exception('Insufficiently specified')
        return self._runnable

    @property
    def collector(self) -> Collector[Any]:
        """The configured collector.

        Raises:
            Exception: If no collector has been set.
        """
        if self._collector is None:
            raise Exception('Insufficiently specified')
        return self._collector

    def run(self) -> None:
        """Run the job via the inherited ``run()``, then call the after functions.

        Each callable registered through ``Constructor.after`` is invoked with
        this job as its argument, in registration order.
        """
        super().run()

        for after_func in self._after_funcs:
            after_func(self)

    def _run(self) -> None:
        """Process the generator's output and collect the results.

        Enters the processor and the collector as context managers, then
        passes ``processor.process(runnable, generator)`` to
        ``collector.collect``.

        Raises:
            Exception: If the generator, processor or collector is unset.
        """
        if self._generator is None:
            raise Exception('Insufficiently specified, generator missing')
        if self._processor is None:
            raise Exception('Insufficiently specified, processor missing')
        if self._collector is None:
            raise Exception('Insufficiently specified, collector missing')

        # NOTE(review): ``_runnable`` is not checked here and is passed on
        # even when ``None`` - presumably intentional; confirm.
        with self._processor as processor, self._collector as collector:
            collector.collect(processor.process(self._runnable, self._generator))
def job(
    depends: Optional[List[ResolveKey]] = None,
    master: Optional[Master] = None,
) -> Callable[[Callable[[Job.Constructor], None]], Callable[[], None]]:
    """Decorator factory turning a constructor function into a registered job.

    The decorated function is used as the ``constructor_func`` of a new
    :class:`Job`. The job and a zero-argument wrapper are registered with
    ``master`` and the wrapper replaces the decorated function.

    Args:
        depends: Keys passed to the ``Job`` as its ``depends`` argument.
            ``None`` (the default) is treated as an empty list.
        master: Master to register with. When ``None``, the module-level
            ``global_master`` is used.

    Returns:
        A decorator which returns a wrapper that runs the job when called.
    """
    def decorator(f: Callable[[Job.Constructor], None]) -> Callable[[], None]:
        # Normalise the ``None`` sentinel here so ``Job`` always gets a list.
        j = Job(f, depends=[] if depends is None else depends)

        # Resolve the master at decoration time without rebinding the
        # enclosing ``master`` argument.
        target_master = global_master if master is None else master

        @functools.wraps(f)
        def wrapper() -> None:
            j.run()

        target_master.register(j, wrapper)
        return wrapper

    return decorator