add recipe object
This commit is contained in:
parent
a0b206e711
commit
6f969ac3d4
@ -4,26 +4,39 @@
|
||||
|
||||
import filters, emitters, sources, utils
|
||||
|
||||
def run_recipe(source, *filter_args):
|
||||
""" Process data, taking it from a source and applying any number of filters
|
||||
"""
|
||||
class Recipe(filters.Filter):
|
||||
|
||||
def __init__(self, *filter_args):
|
||||
self._filter_args = filter_args
|
||||
self._rejected = []
|
||||
|
||||
def process_record(self, record):
|
||||
self.run(source=iter((record,)))
|
||||
|
||||
def run(self, source):
|
||||
|
||||
# connect datapath
|
||||
data = source
|
||||
for filter_ in filter_args:
|
||||
data = filter_(data)
|
||||
for filter_ in self._filter_args:
|
||||
data = filter_(self, data)
|
||||
|
||||
# actually run the data through (causes iterators to actually be called)
|
||||
for record in data:
|
||||
pass
|
||||
|
||||
# try and call done() on all filters
|
||||
for filter_ in filter_args:
|
||||
for filter_ in self._filter_args:
|
||||
try:
|
||||
filter_.done()
|
||||
except AttributeError:
|
||||
pass # don't care if there isn't a done method
|
||||
|
||||
def run_recipe(source, *filter_args):
|
||||
""" Process data, taking it from a source and applying any number of filters
|
||||
"""
|
||||
|
||||
Recipe(*filter_args).run(source)
|
||||
|
||||
|
||||
# experiment with threading - do not use
|
||||
|
||||
|
@ -33,9 +33,11 @@ class Filter(object):
|
||||
raise NotImplementedError('process_record not defined in ' +
|
||||
self.__class__.__name__)
|
||||
|
||||
def __call__(self, source):
|
||||
def __call__(self, recipe, source):
|
||||
for record in source:
|
||||
yield self.process_record(record)
|
||||
result = self.process_record(record)
|
||||
if not result is None:
|
||||
yield result
|
||||
|
||||
|
||||
class YieldFilter(Filter):
|
||||
|
Loading…
Reference in New Issue
Block a user