356 lines
10 KiB
Python
356 lines
10 KiB
Python
import unittest
|
|
import types
|
|
from saucebrush.filters import (
|
|
Filter,
|
|
YieldFilter,
|
|
FieldFilter,
|
|
SubrecordFilter,
|
|
ConditionalPathFilter,
|
|
ConditionalFilter,
|
|
FieldModifier,
|
|
FieldKeeper,
|
|
FieldRemover,
|
|
FieldMerger,
|
|
FieldAdder,
|
|
FieldCopier,
|
|
FieldRenamer,
|
|
Unique,
|
|
)
|
|
|
|
|
|
class DummyRecipe:
|
|
rejected_record = None
|
|
rejected_msg = None
|
|
|
|
def reject_record(self, record, msg):
|
|
self.rejected_record = record
|
|
self.rejected_msg = msg
|
|
|
|
|
|
class Doubler(Filter):
|
|
def process_record(self, record):
|
|
return record * 2
|
|
|
|
|
|
class OddRemover(Filter):
|
|
def process_record(self, record):
|
|
if record % 2 == 0:
|
|
return record
|
|
else:
|
|
return None # explicitly return None
|
|
|
|
|
|
class ListFlattener(YieldFilter):
|
|
def process_record(self, record):
|
|
for item in record:
|
|
yield item
|
|
|
|
|
|
class FieldDoubler(FieldFilter):
|
|
def process_field(self, item):
|
|
return item * 2
|
|
|
|
|
|
class NonModifyingFieldDoubler(Filter):
|
|
def __init__(self, key):
|
|
self.key = key
|
|
|
|
def process_record(self, record):
|
|
record = dict(record)
|
|
record[self.key] *= 2
|
|
return record
|
|
|
|
|
|
class ConditionalOddRemover(ConditionalFilter):
|
|
def test_record(self, record):
|
|
# return True for even values
|
|
return record % 2 == 0
|
|
|
|
|
|
class FilterTestCase(unittest.TestCase):
|
|
def _simple_data(self):
|
|
return [
|
|
{"a": 1, "b": 2, "c": 3},
|
|
{"a": 5, "b": 5, "c": 5},
|
|
{"a": 1, "b": 10, "c": 100},
|
|
]
|
|
|
|
def assert_filter_result(self, filter_obj, expected_data):
|
|
result = filter_obj.attach(self._simple_data())
|
|
self.assertEqual(list(result), expected_data)
|
|
|
|
def test_reject_record(self):
|
|
recipe = DummyRecipe()
|
|
f = Doubler()
|
|
result = f.attach([1, 2, 3], recipe=recipe)
|
|
# next has to be called for attach to take effect
|
|
next(result)
|
|
f.reject_record("bad", "this one was bad")
|
|
|
|
# ensure that the rejection propagated to the recipe
|
|
self.assertEqual("bad", recipe.rejected_record)
|
|
self.assertEqual("this one was bad", recipe.rejected_msg)
|
|
|
|
def test_simple_filter(self):
|
|
df = Doubler()
|
|
result = df.attach([1, 2, 3])
|
|
|
|
# ensure we got a generator that yields 2,4,6
|
|
self.assertEqual(type(result), types.GeneratorType)
|
|
self.assertEqual(list(result), [2, 4, 6])
|
|
|
|
def test_simple_filter_return_none(self):
|
|
cf = OddRemover()
|
|
result = cf.attach(range(10))
|
|
|
|
# ensure only even numbers remain
|
|
self.assertEqual(list(result), [0, 2, 4, 6, 8])
|
|
|
|
def test_simple_yield_filter(self):
|
|
lf = ListFlattener()
|
|
result = lf.attach([[1], [2, 3], [4, 5, 6]])
|
|
|
|
# ensure we got a generator that yields 1,2,3,4,5,6
|
|
self.assertEqual(type(result), types.GeneratorType)
|
|
self.assertEqual(list(result), [1, 2, 3, 4, 5, 6])
|
|
|
|
def test_simple_field_filter(self):
|
|
ff = FieldDoubler(["a", "c"])
|
|
|
|
# check against expected data
|
|
expected_data = [
|
|
{"a": 2, "b": 2, "c": 6},
|
|
{"a": 10, "b": 5, "c": 10},
|
|
{"a": 2, "b": 10, "c": 200},
|
|
]
|
|
self.assert_filter_result(ff, expected_data)
|
|
|
|
def test_conditional_filter(self):
|
|
cf = ConditionalOddRemover()
|
|
result = cf.attach(range(10))
|
|
|
|
# ensure only even numbers remain
|
|
self.assertEqual(list(result), [0, 2, 4, 6, 8])
|
|
|
|
# Tests for Subrecord
|
|
|
|
def test_subrecord_filter_list(self):
|
|
data = [
|
|
{"a": [{"b": 2}, {"b": 4}]},
|
|
{"a": [{"b": 5}]},
|
|
{"a": [{"b": 8}, {"b": 2}, {"b": 1}]},
|
|
]
|
|
|
|
expected = [
|
|
{"a": [{"b": 4}, {"b": 8}]},
|
|
{"a": [{"b": 10}]},
|
|
{"a": [{"b": 16}, {"b": 4}, {"b": 2}]},
|
|
]
|
|
|
|
sf = SubrecordFilter("a", NonModifyingFieldDoubler("b"))
|
|
result = sf.attach(data)
|
|
|
|
self.assertEqual(list(result), expected)
|
|
|
|
def test_subrecord_filter_deep(self):
|
|
data = [
|
|
{"a": {"d": [{"b": 2}, {"b": 4}]}},
|
|
{"a": {"d": [{"b": 5}]}},
|
|
{"a": {"d": [{"b": 8}, {"b": 2}, {"b": 1}]}},
|
|
]
|
|
|
|
expected = [
|
|
{"a": {"d": [{"b": 4}, {"b": 8}]}},
|
|
{"a": {"d": [{"b": 10}]}},
|
|
{"a": {"d": [{"b": 16}, {"b": 4}, {"b": 2}]}},
|
|
]
|
|
|
|
sf = SubrecordFilter("a.d", NonModifyingFieldDoubler("b"))
|
|
result = sf.attach(data)
|
|
|
|
self.assertEqual(list(result), expected)
|
|
|
|
def test_subrecord_filter_nonlist(self):
|
|
data = [
|
|
{"a": {"b": {"c": 1}}},
|
|
{"a": {"b": {"c": 2}}},
|
|
{"a": {"b": {"c": 3}}},
|
|
]
|
|
|
|
expected = [
|
|
{"a": {"b": {"c": 2}}},
|
|
{"a": {"b": {"c": 4}}},
|
|
{"a": {"b": {"c": 6}}},
|
|
]
|
|
|
|
sf = SubrecordFilter("a.b", NonModifyingFieldDoubler("c"))
|
|
result = sf.attach(data)
|
|
|
|
self.assertEqual(list(result), expected)
|
|
|
|
def test_subrecord_filter_list_in_path(self):
|
|
data = [
|
|
{"a": [{"b": {"c": 5}}, {"b": {"c": 6}}]},
|
|
{"a": [{"b": {"c": 1}}, {"b": {"c": 2}}, {"b": {"c": 3}}]},
|
|
{"a": [{"b": {"c": 2}}]},
|
|
]
|
|
|
|
expected = [
|
|
{"a": [{"b": {"c": 10}}, {"b": {"c": 12}}]},
|
|
{"a": [{"b": {"c": 2}}, {"b": {"c": 4}}, {"b": {"c": 6}}]},
|
|
{"a": [{"b": {"c": 4}}]},
|
|
]
|
|
|
|
sf = SubrecordFilter("a.b", NonModifyingFieldDoubler("c"))
|
|
result = sf.attach(data)
|
|
|
|
self.assertEqual(list(result), expected)
|
|
|
|
def test_conditional_path(self):
|
|
|
|
predicate = lambda r: r["a"] == 1 # noqa
|
|
|
|
# double b if a == 1, otherwise double c
|
|
cpf = ConditionalPathFilter(predicate, FieldDoubler("b"), FieldDoubler("c"))
|
|
expected_data = [
|
|
{"a": 1, "b": 4, "c": 3},
|
|
{"a": 5, "b": 5, "c": 10},
|
|
{"a": 1, "b": 20, "c": 100},
|
|
]
|
|
|
|
self.assert_filter_result(cpf, expected_data)
|
|
|
|
# Tests for Generic Filters
|
|
|
|
def test_field_modifier(self):
|
|
# another version of FieldDoubler
|
|
fm = FieldModifier(["a", "c"], lambda x: x * 2)
|
|
|
|
# check against expected data
|
|
expected_data = [
|
|
{"a": 2, "b": 2, "c": 6},
|
|
{"a": 10, "b": 5, "c": 10},
|
|
{"a": 2, "b": 10, "c": 200},
|
|
]
|
|
self.assert_filter_result(fm, expected_data)
|
|
|
|
def test_field_keeper(self):
|
|
fk = FieldKeeper(["c"])
|
|
|
|
# check against expected results
|
|
expected_data = [{"c": 3}, {"c": 5}, {"c": 100}]
|
|
self.assert_filter_result(fk, expected_data)
|
|
|
|
def test_field_remover(self):
|
|
fr = FieldRemover(["a", "b"])
|
|
|
|
# check against expected results
|
|
expected_data = [{"c": 3}, {"c": 5}, {"c": 100}]
|
|
self.assert_filter_result(fr, expected_data)
|
|
|
|
def test_field_merger(self):
|
|
fm = FieldMerger({"sum": ("a", "b", "c")}, lambda x, y, z: x + y + z)
|
|
|
|
# check against expected results
|
|
expected_data = [{"sum": 6}, {"sum": 15}, {"sum": 111}]
|
|
self.assert_filter_result(fm, expected_data)
|
|
|
|
def test_field_merger_keep_fields(self):
|
|
fm = FieldMerger(
|
|
{"sum": ("a", "b", "c")}, lambda x, y, z: x + y + z, keep_fields=True
|
|
)
|
|
|
|
# check against expected results
|
|
expected_data = [
|
|
{"a": 1, "b": 2, "c": 3, "sum": 6},
|
|
{"a": 5, "b": 5, "c": 5, "sum": 15},
|
|
{"a": 1, "b": 10, "c": 100, "sum": 111},
|
|
]
|
|
self.assert_filter_result(fm, expected_data)
|
|
|
|
def test_field_adder_scalar(self):
|
|
fa = FieldAdder("x", 7)
|
|
|
|
expected_data = [
|
|
{"a": 1, "b": 2, "c": 3, "x": 7},
|
|
{"a": 5, "b": 5, "c": 5, "x": 7},
|
|
{"a": 1, "b": 10, "c": 100, "x": 7},
|
|
]
|
|
self.assert_filter_result(fa, expected_data)
|
|
|
|
def test_field_adder_callable(self):
|
|
fa = FieldAdder("x", lambda: 7)
|
|
|
|
expected_data = [
|
|
{"a": 1, "b": 2, "c": 3, "x": 7},
|
|
{"a": 5, "b": 5, "c": 5, "x": 7},
|
|
{"a": 1, "b": 10, "c": 100, "x": 7},
|
|
]
|
|
self.assert_filter_result(fa, expected_data)
|
|
|
|
def test_field_adder_iterable(self):
|
|
fa = FieldAdder("x", [1, 2, 3])
|
|
|
|
expected_data = [
|
|
{"a": 1, "b": 2, "c": 3, "x": 1},
|
|
{"a": 5, "b": 5, "c": 5, "x": 2},
|
|
{"a": 1, "b": 10, "c": 100, "x": 3},
|
|
]
|
|
self.assert_filter_result(fa, expected_data)
|
|
|
|
def test_field_adder_replace(self):
|
|
fa = FieldAdder("b", lambda: 7)
|
|
|
|
expected_data = [
|
|
{"a": 1, "b": 7, "c": 3},
|
|
{"a": 5, "b": 7, "c": 5},
|
|
{"a": 1, "b": 7, "c": 100},
|
|
]
|
|
self.assert_filter_result(fa, expected_data)
|
|
|
|
def test_field_adder_no_replace(self):
|
|
fa = FieldAdder("b", lambda: 7, replace=False)
|
|
|
|
expected_data = [
|
|
{"a": 1, "b": 2, "c": 3},
|
|
{"a": 5, "b": 5, "c": 5},
|
|
{"a": 1, "b": 10, "c": 100},
|
|
]
|
|
self.assert_filter_result(fa, expected_data)
|
|
|
|
def test_field_copier(self):
|
|
fc = FieldCopier({"a2": "a", "b2": "b"})
|
|
|
|
expected_data = [
|
|
{"a": 1, "b": 2, "c": 3, "a2": 1, "b2": 2},
|
|
{"a": 5, "b": 5, "c": 5, "a2": 5, "b2": 5},
|
|
{"a": 1, "b": 10, "c": 100, "a2": 1, "b2": 10},
|
|
]
|
|
self.assert_filter_result(fc, expected_data)
|
|
|
|
def test_field_renamer(self):
|
|
fr = FieldRenamer({"x": "a", "y": "b"})
|
|
|
|
expected_data = [
|
|
{"x": 1, "y": 2, "c": 3},
|
|
{"x": 5, "y": 5, "c": 5},
|
|
{"x": 1, "y": 10, "c": 100},
|
|
]
|
|
self.assert_filter_result(fr, expected_data)
|
|
|
|
# TODO: splitter & flattner tests?
|
|
|
|
def test_unique_filter(self):
|
|
u = Unique()
|
|
in_data = [{"a": 77}, {"a": 33}, {"a": 77}]
|
|
expected_data = [{"a": 77}, {"a": 33}]
|
|
result = u.attach(in_data)
|
|
|
|
self.assertEqual(list(result), expected_data)
|
|
|
|
# TODO: unicode & string filter tests
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|