diff --git a/saucebrush/__init__.py b/saucebrush/__init__.py index b1d4040..a00b15e 100644 --- a/saucebrush/__init__.py +++ b/saucebrush/__init__.py @@ -8,16 +8,25 @@ class Recipe(filters.Filter): def __init__(self, *filter_args): self._filter_args = filter_args - self._rejected = [] + self.rejected = [] - def process_record(self, record): - self.run(source=iter((record,))) + def get_filters(self): + filters = [] + for filter_ in self._filter_args: + if hasattr(filter_, 'get_filters'): + filters.extend(filter_.get_filters()) + else: + filters.append(filter_) + return filters def run(self, source): + # load filters + filters = self.get_filters() + # connect datapath data = source - for filter_ in self._filter_args: + for filter_ in filters: data = filter_(self, data) # actually run the data through (causes iterators to actually be called) @@ -25,17 +34,19 @@ class Recipe(filters.Filter): pass # try and call done() on all filters - for filter_ in self._filter_args: + for filter_ in filters: try: filter_.done() except AttributeError: pass # don't care if there isn't a done method + + return self def run_recipe(source, *filter_args): """ Process data, taking it from a source and applying any number of filters """ - Recipe(*filter_args).run(source) + return Recipe(*filter_args).run(source) # experiment with threading - do not use diff --git a/saucebrush/filters.py b/saucebrush/filters.py index fa6dd7f..ab7051e 100644 --- a/saucebrush/filters.py +++ b/saucebrush/filters.py @@ -33,7 +33,12 @@ class Filter(object): raise NotImplementedError('process_record not defined in ' + self.__class__.__name__) + def reject_record(self, record, message): + if hasattr(self, '_recipe'): + self._recipe.rejected.append((record, message)) + def __call__(self, recipe, source): + self._recipe = recipe for record in source: result = self.process_record(record) if not result is None: