add support for chaining and nesting of recipes
This commit is contained in:
parent
6f969ac3d4
commit
68cbbdcb54
@ -8,16 +8,25 @@ class Recipe(filters.Filter):
|
|||||||
|
|
||||||
def __init__(self, *filter_args):
|
def __init__(self, *filter_args):
|
||||||
self._filter_args = filter_args
|
self._filter_args = filter_args
|
||||||
self._rejected = []
|
self.rejected = []
|
||||||
|
|
||||||
def process_record(self, record):
|
def get_filters(self):
|
||||||
self.run(source=iter((record,)))
|
filters = []
|
||||||
|
for filter_ in self._filter_args:
|
||||||
|
if hasattr(filter_, 'get_filters'):
|
||||||
|
filters.extend(filter_.get_filters())
|
||||||
|
else:
|
||||||
|
filters.append(filter_)
|
||||||
|
return filters
|
||||||
|
|
||||||
def run(self, source):
|
def run(self, source):
|
||||||
|
|
||||||
|
# load filters
|
||||||
|
filters = self.get_filters()
|
||||||
|
|
||||||
# connect datapath
|
# connect datapath
|
||||||
data = source
|
data = source
|
||||||
for filter_ in self._filter_args:
|
for filter_ in filters:
|
||||||
data = filter_(self, data)
|
data = filter_(self, data)
|
||||||
|
|
||||||
# actually run the data through (causes iterators to actually be called)
|
# actually run the data through (causes iterators to actually be called)
|
||||||
@ -25,17 +34,19 @@ class Recipe(filters.Filter):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
# try and call done() on all filters
|
# try and call done() on all filters
|
||||||
for filter_ in self._filter_args:
|
for filter_ in filters:
|
||||||
try:
|
try:
|
||||||
filter_.done()
|
filter_.done()
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
pass # don't care if there isn't a done method
|
pass # don't care if there isn't a done method
|
||||||
|
|
||||||
|
return self
|
||||||
|
|
||||||
def run_recipe(source, *filter_args):
|
def run_recipe(source, *filter_args):
|
||||||
""" Process data, taking it from a source and applying any number of filters
|
""" Process data, taking it from a source and applying any number of filters
|
||||||
"""
|
"""
|
||||||
|
|
||||||
Recipe(*filter_args).run(source)
|
return Recipe(*filter_args).run(source)
|
||||||
|
|
||||||
|
|
||||||
# experiment with threading - do not use
|
# experiment with threading - do not use
|
||||||
|
@ -33,7 +33,12 @@ class Filter(object):
|
|||||||
raise NotImplementedError('process_record not defined in ' +
|
raise NotImplementedError('process_record not defined in ' +
|
||||||
self.__class__.__name__)
|
self.__class__.__name__)
|
||||||
|
|
||||||
|
def reject_record(self, record, message):
|
||||||
|
if hasattr(self, '_recipe'):
|
||||||
|
self._recipe.rejected.append((record, message))
|
||||||
|
|
||||||
def __call__(self, recipe, source):
|
def __call__(self, recipe, source):
|
||||||
|
self._recipe = recipe
|
||||||
for record in source:
|
for record in source:
|
||||||
result = self.process_record(record)
|
result = self.process_record(record)
|
||||||
if not result is None:
|
if not result is None:
|
||||||
|
Loading…
Reference in New Issue
Block a user