check for run() call on finished recipe
This commit is contained in:
parent
e201df8eda
commit
636e17d89c
@ -8,8 +8,14 @@ import filters, emitters, sources, utils
|
||||
class Recipe(object):
|
||||
|
||||
def __init__(self, *filter_args, **kwargs):
|
||||
self._filter_args = filter_args
|
||||
self.rejected = []
|
||||
self.finished = False
|
||||
|
||||
self.filters = []
|
||||
for filter in filter_args:
|
||||
if hasattr(filter, 'filters'):
|
||||
self.filters.extend(filter.filters)
|
||||
else:
|
||||
self.filters.append(filter)
|
||||
|
||||
self.error_stream = kwargs.get('error_stream')
|
||||
if self.error_stream and not isinstance(self.error_stream, Recipe):
|
||||
@ -21,49 +27,46 @@ class Recipe(object):
|
||||
raise ValueError('error_stream must be either a filter'
|
||||
' or an iterable of filters')
|
||||
|
||||
def get_filters(self):
|
||||
filters = []
|
||||
|
||||
for filter_ in self._filter_args:
|
||||
# check to see if this is a filter or a recipe
|
||||
if hasattr(filter_, 'get_filters'):
|
||||
# load filters from child recipe
|
||||
filters.extend(filter_.get_filters())
|
||||
else:
|
||||
filters.append(filter_)
|
||||
|
||||
return filters
|
||||
|
||||
def reject_record(self, record, exception):
|
||||
if self.error_stream:
|
||||
self.error_stream.run([{'record': record,
|
||||
'exception': repr(exception)}])
|
||||
|
||||
def run(self, source):
|
||||
# load filters
|
||||
filters = self.get_filters()
|
||||
if self.finished:
|
||||
raise ValueError('run() called on finished recipe')
|
||||
|
||||
# connect datapath
|
||||
data = source
|
||||
for filter_ in filters:
|
||||
for filter_ in self.filters:
|
||||
data = filter_.attach(data, recipe=self)
|
||||
|
||||
# actually run the data through (causes iterators to actually be called)
|
||||
for record in data:
|
||||
pass
|
||||
|
||||
def done(self):
|
||||
if self.finished:
|
||||
raise ValueError('done() called on finished recipe')
|
||||
|
||||
self.finished = True
|
||||
|
||||
if self.error_stream:
|
||||
self.error_stream.done()
|
||||
|
||||
# try and call done() on all filters
|
||||
for filter_ in filters:
|
||||
for filter_ in self.filters:
|
||||
try:
|
||||
filter_.done()
|
||||
except AttributeError:
|
||||
pass # don't care if there isn't a done method
|
||||
|
||||
|
||||
def run_recipe(source, *filter_args):
|
||||
def run_recipe(source, *filter_args, **kwargs):
|
||||
""" Process data, taking it from a source and applying any number of filters
|
||||
"""
|
||||
|
||||
r = Recipe(*filter_args)
|
||||
r = Recipe(*filter_args, **kwargs)
|
||||
r.run(source)
|
||||
r.done()
|
||||
return r
|
||||
|
@ -27,6 +27,15 @@ class RecipeTestCase(unittest.TestCase):
|
||||
self.assertEqual(saver.saved[0]['record'], {'a': 1})
|
||||
self.assertEqual(saver.saved[1]['record'], {'b': 2})
|
||||
|
||||
def test_done(self):
|
||||
saver = Saver()
|
||||
recipe = Recipe(saver)
|
||||
recipe.run([1])
|
||||
recipe.done()
|
||||
|
||||
self.assertRaises(ValueError, recipe.run, [2])
|
||||
self.assertEqual(saver.saved, [1])
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
Loading…
Reference in New Issue
Block a user