added basic error stream support
This commit is contained in:
parent
c5770668e0
commit
54a99d4d05
@ -4,20 +4,28 @@
|
||||
|
||||
import filters, emitters, sources, utils
|
||||
|
||||
class Recipe(filters.Filter):
|
||||
|
||||
def __init__(self, *filter_args):
|
||||
class Recipe(object):
|
||||
|
||||
def __init__(self, *filter_args, **kwargs):
|
||||
self._filter_args = filter_args
|
||||
self.rejected = []
|
||||
|
||||
def get_filters(self):
|
||||
self.error_stream = kwargs.get('error_stream')
|
||||
if self.error_stream and not isinstance(self.error_stream, Recipe):
|
||||
if isinstance(self.error_stream, filters.Filter):
|
||||
self.error_stream = Recipe(self.error_stream)
|
||||
elif hasattr(self.error_stream, '__iter__'):
|
||||
self.error_stream = Recipe(*self.error_stream)
|
||||
else:
|
||||
raise ValueError('error_stream must be either a filter'
|
||||
' or an iterable of filters')
|
||||
|
||||
def get_filters(self):
|
||||
filters = []
|
||||
|
||||
for filter_ in self._filter_args:
|
||||
|
||||
# check to see if this is a filter or a recipe
|
||||
|
||||
if hasattr(filter_, 'get_filters'):
|
||||
# load filters from child recipe
|
||||
filters.extend(filter_.get_filters())
|
||||
@ -26,11 +34,11 @@ class Recipe(filters.Filter):
|
||||
|
||||
return filters
|
||||
|
||||
def reject_record(self):
|
||||
self.rejected.append((record, message))
|
||||
def reject_record(self, record, message):
|
||||
if self.error_stream:
|
||||
self.error_stream.run([record])
|
||||
|
||||
def run(self, source):
|
||||
|
||||
# load filters
|
||||
filters = self.get_filters()
|
||||
|
||||
@ -50,6 +58,7 @@ class Recipe(filters.Filter):
|
||||
except AttributeError:
|
||||
pass # don't care if there isn't a done method
|
||||
|
||||
|
||||
def run_recipe(source, *filter_args):
|
||||
""" Process data, taking it from a source and applying any number of filters
|
||||
"""
|
||||
|
@ -38,9 +38,12 @@ class Filter(object):
|
||||
def attach(self, source, recipe=None):
|
||||
self._recipe = recipe
|
||||
for record in source:
|
||||
result = self.process_record(record)
|
||||
if result is not None:
|
||||
yield result
|
||||
try:
|
||||
result = self.process_record(record)
|
||||
if result is not None:
|
||||
yield result
|
||||
except Exception as e:
|
||||
self.reject_record(record, unicode(e))
|
||||
|
||||
|
||||
class YieldFilter(Filter):
|
||||
@ -54,8 +57,11 @@ class YieldFilter(Filter):
|
||||
def attach(self, source, recipe=None):
|
||||
self._recipe = recipe
|
||||
for record in source:
|
||||
for result in self.process_record(record):
|
||||
yield result
|
||||
try:
|
||||
for result in self.process_record(record):
|
||||
yield result
|
||||
except Exception as e:
|
||||
self.reject_record(record, unicode(e))
|
||||
|
||||
|
||||
class FieldFilter(Filter):
|
||||
|
@ -2,10 +2,12 @@ import unittest
|
||||
from saucebrush.tests.filters import FilterTestCase
|
||||
from saucebrush.tests.sources import SourceTestCase
|
||||
from saucebrush.tests.emitters import EmitterTestCase
|
||||
from saucebrush.tests.recipes import RecipeTestCase
|
||||
|
||||
filter_suite = unittest.TestLoader().loadTestsFromTestCase(FilterTestCase)
|
||||
source_suite = unittest.TestLoader().loadTestsFromTestCase(SourceTestCase)
|
||||
emitter_suite = unittest.TestLoader().loadTestsFromTestCase(EmitterTestCase)
|
||||
recipe_suite = unittest.TestLoader().loadTestsFromTestCase(RecipeTestCase)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
30
saucebrush/tests/recipes.py
Normal file
30
saucebrush/tests/recipes.py
Normal file
@ -0,0 +1,30 @@
|
||||
import doctest
|
||||
import unittest
|
||||
from saucebrush import Recipe
|
||||
from saucebrush.filters import Filter
|
||||
|
||||
|
||||
class Raiser(Filter):
|
||||
def process_record(self, record):
|
||||
raise Exception("bad record")
|
||||
|
||||
|
||||
class Saver(Filter):
|
||||
def __init__(self):
|
||||
self.saved = []
|
||||
|
||||
def process_record(self, record):
|
||||
self.saved.append(record)
|
||||
return record
|
||||
|
||||
|
||||
class RecipeTestCase(unittest.TestCase):
|
||||
def test_error_stream(self):
|
||||
saver = Saver()
|
||||
recipe = Recipe(Raiser(), error_stream=saver)
|
||||
recipe.run([{'a': 1}, {'b': 2}])
|
||||
self.assertEqual(saver.saved, [{'a': 1}, {'b': 2}])
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Loading…
Reference in New Issue
Block a user