diff --git a/saucebrush/__init__.py b/saucebrush/__init__.py index 5abf8b5..7df97e4 100644 --- a/saucebrush/__init__.py +++ b/saucebrush/__init__.py @@ -8,8 +8,14 @@ import filters, emitters, sources, utils class Recipe(object): def __init__(self, *filter_args, **kwargs): - self._filter_args = filter_args - self.rejected = [] + self.finished = False + + self.filters = [] + for filter in filter_args: + if hasattr(filter, 'filters'): + self.filters.extend(filter.filters) + else: + self.filters.append(filter) self.error_stream = kwargs.get('error_stream') if self.error_stream and not isinstance(self.error_stream, Recipe): @@ -21,49 +27,46 @@ class Recipe(object): raise ValueError('error_stream must be either a filter' ' or an iterable of filters') - def get_filters(self): - filters = [] - - for filter_ in self._filter_args: - # check to see if this is a filter or a recipe - if hasattr(filter_, 'get_filters'): - # load filters from child recipe - filters.extend(filter_.get_filters()) - else: - filters.append(filter_) - - return filters - def reject_record(self, record, exception): if self.error_stream: self.error_stream.run([{'record': record, 'exception': repr(exception)}]) def run(self, source): - # load filters - filters = self.get_filters() + if self.finished: + raise ValueError('run() called on finished recipe') # connect datapath data = source - for filter_ in filters: + for filter_ in self.filters: data = filter_.attach(data, recipe=self) # actually run the data through (causes iterators to actually be called) for record in data: pass + def done(self): + if self.finished: + raise ValueError('done() called on finished recipe') + + self.finished = True + + if self.error_stream: + self.error_stream.done() + # try and call done() on all filters - for filter_ in filters: + for filter_ in self.filters: try: filter_.done() except AttributeError: pass # don't care if there isn't a done method -def run_recipe(source, *filter_args): +def run_recipe(source, *filter_args, **kwargs): """ Process data, taking it from a source and applying any number of filters """ - r = Recipe(*filter_args) + r = Recipe(*filter_args, **kwargs) r.run(source) + r.done() return r diff --git a/saucebrush/tests/recipes.py b/saucebrush/tests/recipes.py index f996fe2..b40f18d 100644 --- a/saucebrush/tests/recipes.py +++ b/saucebrush/tests/recipes.py @@ -27,6 +27,15 @@ class RecipeTestCase(unittest.TestCase): self.assertEqual(saver.saved[0]['record'], {'a': 1}) self.assertEqual(saver.saved[1]['record'], {'b': 2}) + def test_done(self): + saver = Saver() + recipe = Recipe(saver) + recipe.run([1]) + recipe.done() + + self.assertRaises(ValueError, recipe.run, [2]) + self.assertEqual(saver.saved, [1]) + if __name__ == '__main__': unittest.main()