some cleanups discovered via test-splitter.py failure
* recipe is now a kwarg to Filter.__call__ * recipe.reject_record * Splitter works with iterables
This commit is contained in:
parent
293c7a0a78
commit
5421c12acd
@ -1,6 +1,7 @@
|
||||
from saucebrush.filters import Splitter, PhoneNumberCleaner, FieldMerger, FieldAdder
|
||||
from saucebrush.emitters import DebugEmitter
|
||||
import operator
|
||||
from itertools import count
|
||||
import saucebrush
|
||||
|
||||
data = [{'person': {'firstname': 'James', 'lastname': 'Turk'},
|
||||
@ -10,6 +11,6 @@ data = [{'person': {'firstname': 'James', 'lastname': 'Turk'},
|
||||
namemerger = FieldMerger({'name': ('firstname', 'lastname')}, lambda x,y: ' '.join((x,y)))
|
||||
phonecleaner = PhoneNumberCleaner(('phone',))
|
||||
splitter = Splitter({'person':[namemerger], 'phones':[phonecleaner]})
|
||||
ider = FieldAdder('id', [1,2,3,4,5])
|
||||
ider = FieldAdder('id', count())
|
||||
|
||||
saucebrush.run_recipe(data, [ider, splitter, DebugEmitter()])
|
||||
saucebrush.run_recipe(data, ider, splitter, DebugEmitter())
|
||||
|
@ -5,37 +5,39 @@
|
||||
import filters, emitters, sources, utils
|
||||
|
||||
class Recipe(filters.Filter):
|
||||
|
||||
|
||||
def __init__(self, *filter_args):
|
||||
self._filter_args = filter_args
|
||||
self.rejected = []
|
||||
|
||||
|
||||
def get_filters(self):
|
||||
|
||||
|
||||
filters = []
|
||||
|
||||
|
||||
for filter_ in self._filter_args:
|
||||
|
||||
|
||||
# check to see if this is a filter or a recipe
|
||||
|
||||
|
||||
if hasattr(filter_, 'get_filters'):
|
||||
# load filters from child recipe
|
||||
filters.extend(filter_.get_filters())
|
||||
|
||||
else:
|
||||
filters.append(filter_)
|
||||
|
||||
|
||||
return filters
|
||||
|
||||
|
||||
def reject_record(self):
|
||||
self.rejected.append((record, message))
|
||||
|
||||
def run(self, source):
|
||||
|
||||
|
||||
# load filters
|
||||
filters = self.get_filters()
|
||||
|
||||
|
||||
# connect datapath
|
||||
data = source
|
||||
for filter_ in filters:
|
||||
data = filter_(self, data)
|
||||
data = filter_(data, recipe=self)
|
||||
|
||||
# actually run the data through (causes iterators to actually be called)
|
||||
for record in data:
|
||||
@ -51,7 +53,7 @@ class Recipe(filters.Filter):
|
||||
def run_recipe(source, *filter_args):
|
||||
""" Process data, taking it from a source and applying any number of filters
|
||||
"""
|
||||
|
||||
|
||||
r = Recipe(*filter_args)
|
||||
r.run(source)
|
||||
return r
|
||||
|
@ -22,9 +22,6 @@ class Filter(object):
|
||||
takes a single record (python dictionary) and returns a result.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def process_record(self, record):
|
||||
""" Abstract method to be overridden.
|
||||
|
||||
@ -34,14 +31,15 @@ class Filter(object):
|
||||
self.__class__.__name__)
|
||||
|
||||
def reject_record(self, record, message):
|
||||
if hasattr(self, '_recipe'):
|
||||
self._recipe.rejected.append((record, message))
|
||||
recipe = getattr(self, '_recipe')
|
||||
if recipe:
|
||||
recipe.reject_record(record, message)
|
||||
|
||||
def __call__(self, recipe, source):
|
||||
def __call__(self, source, recipe=None):
|
||||
self._recipe = recipe
|
||||
for record in source:
|
||||
result = self.process_record(record)
|
||||
if not result is None:
|
||||
if result is not None:
|
||||
yield result
|
||||
|
||||
|
||||
@ -52,10 +50,8 @@ class YieldFilter(Filter):
|
||||
it is passed, it should yield back as many records as needed and the
|
||||
filter must derive from YieldFilter.
|
||||
"""
|
||||
def __init__(self):
|
||||
super(YieldFilter, self).__init__()
|
||||
|
||||
def __call__(self, recipe, source):
|
||||
def __call__(self, source, recipe=None):
|
||||
self._recipe = recipe
|
||||
for record in source:
|
||||
for result in self.process_record(record):
|
||||
@ -207,6 +203,8 @@ class FieldAdder(Filter):
|
||||
super(FieldAdder, self).__init__()
|
||||
self._field_name = field_name
|
||||
self._field_value = field_value
|
||||
if hasattr(self._field_value, '__iter__'):
|
||||
self._field_value = iter(self._field_value).next
|
||||
self._replace = replace
|
||||
|
||||
def process_record(self, record):
|
||||
@ -291,7 +289,7 @@ class Splitter(Filter):
|
||||
# if a list or tuple, use __call__
|
||||
elif isinstance(subrecord, (list, tuple)):
|
||||
for filter_ in filters:
|
||||
subrecord = filter_(subrecord)
|
||||
subrecord = filter_(subrecord, recipe=self._recipe)
|
||||
subrecord = [r for r in subrecord] # unchain generators
|
||||
|
||||
# place back from whence it came
|
||||
|
Loading…
Reference in New Issue
Block a user