From 54a99d4d05776a60f6a446cbf305da181ca5e653 Mon Sep 17 00:00:00 2001 From: Michael Stephens Date: Wed, 30 Jun 2010 14:13:31 -0400 Subject: [PATCH] added basic error stream support --- saucebrush/__init__.py | 25 +++++++++++++++++-------- saucebrush/filters.py | 16 +++++++++++----- saucebrush/tests/__init__.py | 2 ++ saucebrush/tests/recipes.py | 30 ++++++++++++++++++++++++++++++ 4 files changed, 60 insertions(+), 13 deletions(-) create mode 100644 saucebrush/tests/recipes.py diff --git a/saucebrush/__init__.py b/saucebrush/__init__.py index 767766d..87bfd56 100644 --- a/saucebrush/__init__.py +++ b/saucebrush/__init__.py @@ -4,20 +4,28 @@ import filters, emitters, sources, utils -class Recipe(filters.Filter): - def __init__(self, *filter_args): +class Recipe(object): + + def __init__(self, *filter_args, **kwargs): self._filter_args = filter_args self.rejected = [] - def get_filters(self): + self.error_stream = kwargs.get('error_stream') + if self.error_stream and not isinstance(self.error_stream, Recipe): + if isinstance(self.error_stream, filters.Filter): + self.error_stream = Recipe(self.error_stream) + elif hasattr(self.error_stream, '__iter__'): + self.error_stream = Recipe(*self.error_stream) + else: + raise ValueError('error_stream must be either a filter' + ' or an iterable of filters') + def get_filters(self): filters = [] for filter_ in self._filter_args: - # check to see if this is a filter or a recipe - if hasattr(filter_, 'get_filters'): # load filters from child recipe filters.extend(filter_.get_filters()) @@ -26,11 +34,11 @@ class Recipe(filters.Filter): return filters - def reject_record(self): - self.rejected.append((record, message)) + def reject_record(self, record, message): + if self.error_stream: + self.error_stream.run([record]) def run(self, source): - # load filters filters = self.get_filters() @@ -50,6 +58,7 @@ class Recipe(filters.Filter): except AttributeError: pass # don't care if there isn't a done method + def run_recipe(source, *filter_args): """ Process data, taking it from a source and applying any number of filters """ diff --git a/saucebrush/filters.py b/saucebrush/filters.py index d7e3b2a..143c41b 100644 --- a/saucebrush/filters.py +++ b/saucebrush/filters.py @@ -38,9 +38,12 @@ class Filter(object): def attach(self, source, recipe=None): self._recipe = recipe for record in source: - result = self.process_record(record) - if result is not None: - yield result + try: + result = self.process_record(record) + if result is not None: + yield result + except Exception as e: + self.reject_record(record, unicode(e)) class YieldFilter(Filter): @@ -54,8 +57,11 @@ class YieldFilter(Filter): def attach(self, source, recipe=None): self._recipe = recipe for record in source: - for result in self.process_record(record): - yield result + try: + for result in self.process_record(record): + yield result + except Exception as e: + self.reject_record(record, unicode(e)) class FieldFilter(Filter): diff --git a/saucebrush/tests/__init__.py b/saucebrush/tests/__init__.py index 1a891c9..9b8c511 100644 --- a/saucebrush/tests/__init__.py +++ b/saucebrush/tests/__init__.py @@ -2,10 +2,12 @@ import unittest from saucebrush.tests.filters import FilterTestCase from saucebrush.tests.sources import SourceTestCase from saucebrush.tests.emitters import EmitterTestCase +from saucebrush.tests.recipes import RecipeTestCase filter_suite = unittest.TestLoader().loadTestsFromTestCase(FilterTestCase) source_suite = unittest.TestLoader().loadTestsFromTestCase(SourceTestCase) emitter_suite = unittest.TestLoader().loadTestsFromTestCase(EmitterTestCase) +recipe_suite = unittest.TestLoader().loadTestsFromTestCase(RecipeTestCase) if __name__ == '__main__': unittest.main() diff --git a/saucebrush/tests/recipes.py b/saucebrush/tests/recipes.py new file mode 100644 index 0000000..8b9ecea --- /dev/null +++ b/saucebrush/tests/recipes.py @@ -0,0 +1,30 @@ +import doctest +import unittest +from saucebrush import Recipe +from saucebrush.filters import Filter + + +class Raiser(Filter): + def process_record(self, record): + raise Exception("bad record") + + +class Saver(Filter): + def __init__(self): + self.saved = [] + + def process_record(self, record): + self.saved.append(record) + return record + + +class RecipeTestCase(unittest.TestCase): + def test_error_stream(self): + saver = Saver() + recipe = Recipe(Raiser(), error_stream=saver) + recipe.run([{'a': 1}, {'b': 2}]) + self.assertEqual(saver.saved, [{'a': 1}, {'b': 2}]) + + +if __name__ == '__main__': + unittest.main()