diff --git a/saucebrush/__init__.py b/saucebrush/__init__.py index 6c1ae84..b1d4040 100644 --- a/saucebrush/__init__.py +++ b/saucebrush/__init__.py @@ -4,25 +4,38 @@ import filters, emitters, sources, utils +class Recipe(filters.Filter): + + def __init__(self, *filter_args): + self._filter_args = filter_args + self._rejected = [] + + def process_record(self, record): + self.run(source=iter((record,))) + + def run(self, source): + + # connect datapath + data = source + for filter_ in self._filter_args: + data = filter_(self, data) + + # actually run the data through (causes iterators to actually be called) + for record in data: + pass + + # try and call done() on all filters + for filter_ in self._filter_args: + try: + filter_.done() + except AttributeError: + pass # don't care if there isn't a done method + def run_recipe(source, *filter_args): """ Process data, taking it from a source and applying any number of filters """ - # connect datapath - data = source - for filter_ in filter_args: - data = filter_(data) - - # actually run the data through (causes iterators to actually be called) - for record in data: - pass - - # try and call done() on all filters - for filter_ in filter_args: - try: - filter_.done() - except AttributeError: - pass # don't care if there isn't a done method + Recipe(*filter_args).run(source) # experiment with threading - do not use diff --git a/saucebrush/filters.py b/saucebrush/filters.py index 26d8cff..fa6dd7f 100644 --- a/saucebrush/filters.py +++ b/saucebrush/filters.py @@ -33,9 +33,11 @@ class Filter(object): raise NotImplementedError('process_record not defined in ' + self.__class__.__name__) - def __call__(self, source): + def __call__(self, recipe, source): for record in source: - yield self.process_record(record) + result = self.process_record(record) + if not result is None: + yield result class YieldFilter(Filter):