saucebrush/saucebrush/filters.py

476 lines
16 KiB
Python
Raw Normal View History

2008-10-27 19:13:50 +00:00
"""
Saucebrush Filters are callables that take a Saucebrush source and yield
back filtered records.
The Filter, YieldFilter, and FieldFilter abstract base types are provided
for convenience. Derived classes only need to implement process_record
(or process_field for FieldFilter).
"""
2008-11-04 17:20:10 +00:00
from saucebrush import utils
import re
2008-11-13 21:31:42 +00:00
import time
2008-10-27 19:13:50 +00:00
######################
## Abstract Filters ##
######################
class Filter(object):
    """ ABC for filters that operate on records.

        All derived filters must provide a process_record(self, record) that
        takes a single record (python dictionary) and returns a result.
    """

    def __init__(self):
        pass

    def process_record(self, record):
        """ Abstract method to be overridden.

            Called with a single record, should return modified record.
            A return value of None causes __call__ to drop the record.
        """
        raise NotImplementedError('process_record not defined in ' +
                                  self.__class__.__name__)

    def reject_record(self, record, message):
        """ Append (record, message) to the attached recipe's rejected list.

            Does nothing if no _recipe attribute has been set on the filter.
        """
        if hasattr(self, '_recipe'):
            self._recipe.rejected.append((record, message))

    def __call__(self, recipe, source):
        """ Remember recipe and yield the processed records of source.

            This is a generator: the recipe is stored and records are
            processed only once iteration starts.
        """
        self._recipe = recipe
        for record in source:
            result = self.process_record(record)
            # None means "no output for this record"
            if result is not None:
                yield result
2008-10-27 19:13:50 +00:00
class YieldFilter(Filter):
    """ ABC for filters whose process_record yields its results.

        Use this base when a single input record may produce zero, one or
        many output records: process_record must then be a generator (or
        return an iterable) and every item it produces is passed on.
    """

    def __init__(self):
        super(YieldFilter, self).__init__()

    def __call__(self, recipe, source):
        """ Remember recipe and flatten process_record output over source. """
        self._recipe = recipe
        for incoming in source:
            produced = self.process_record(incoming)
            for outgoing in produced:
                yield outgoing
class FieldFilter(Filter):
    """ ABC for filters that do a single operation on individual fields.

        All derived filters must provide a process_field(self, item) that
        returns a modified item.  process_field is called on one or more keys
        passed into __init__.
    """

    def __init__(self, keys):
        super(FieldFilter, self).__init__()
        self._target_keys = utils.str_or_list(keys)

    def process_record(self, record):
        """ Calls process_field on all keys passed to __init__.

            Keys that are absent from the record are skipped.  Exceptions
            raised by process_field itself (including KeyError) propagate
            to the caller instead of being mistaken for a missing field.
        """
        for key in self._target_keys:
            # keep the try limited to the lookup so that a KeyError coming
            # out of process_field is not silently swallowed
            try:
                item = record[key]
            except KeyError:
                # probably want to have a boolean to flag missing fields
                continue
            record[key] = self.process_field(item)
        return record

    def process_field(self, item):
        """ Given a value, return the value that it should be replaced with. """
        raise NotImplementedError('process_field not defined in ' +
                                  self.__class__.__name__)

    def __unicode__(self):
        return '%s( %s )' % (self.__class__.__name__, str(self._target_keys))
class ConditionalFilter(YieldFilter):
    """ ABC for filters that let a record through only if it passes a test.

        Derived filters implement test_record(self, record); a true result
        passes the record on, a false result withholds it.
    """

    def __init__(self):
        super(ConditionalFilter, self).__init__()

    def process_record(self, record):
        """ Yield the record when self.test_record accepts it. """
        accepted = self.test_record(record)
        if accepted:
            yield record

    def test_record(self, record):
        """ Abstract predicate: return True iff record should be passed on. """
        owner = self.__class__.__name__
        raise NotImplementedError('test_record not defined in ' + owner)
2008-10-27 19:13:50 +00:00
#####################
## Generic Filters ##
#####################
class FieldModifier(FieldFilter):
    """ Filter that applies a supplied function to a set of fields.

        FieldModifier(('spam','eggs'), abs) replaces the spam and eggs
        fields of each record with abs(value).
    """

    def __init__(self, keys, func):
        super(FieldModifier, self).__init__(keys)
        self._filter_func = func

    def process_field(self, item):
        """ Return the result of the configured function for item. """
        modified = self._filter_func(item)
        return modified

    def __unicode__(self):
        parts = (self.__class__.__name__,
                 str(self._target_keys),
                 str(self._filter_func))
        return '%s( %s, %s )' % parts
2008-10-27 19:13:50 +00:00
class FieldRemover(Filter):
    """ Filter that drops a given set of fields from each record.

        FieldRemover(('spam', 'eggs')) deletes the spam and eggs fields
        from every record filtered; fields that are absent are ignored.
    """

    def __init__(self, keys):
        super(FieldRemover, self).__init__()
        self._target_keys = utils.str_or_list(keys)

    def process_record(self, record):
        """ Remove every target key from record (in place) and return it. """
        for unwanted in self._target_keys:
            # the None default keeps pop from raising on a missing key
            record.pop(unwanted, None)
        return record

    def __unicode__(self):
        parts = (self.__class__.__name__, str(self._target_keys))
        return '%s( %s )' % parts
class FieldMerger(Filter):
    """ Filter that merges a given set of fields using a supplied merge_func.

        Takes a mapping (dictionary of new_column:(from_col1,from_col2))

        FieldMerger({"bacon": ("spam", "eggs")}, operator.add) creates a new
        column bacon that is the result of spam+eggs
    """

    def __init__(self, mapping, merge_func):
        super(FieldMerger, self).__init__()
        self._field_mapping = mapping
        self._merge_func = merge_func

    def process_record(self, record):
        """ Replace each group of source columns with its merged column.

            The source columns are removed from the record; a missing source
            column is passed to merge_func as None.
        """
        # items() rather than iteritems() so this runs on Python 2 and 3
        for to_col, from_cols in self._field_mapping.items():
            values = [record.pop(col, None) for col in from_cols]
            record[to_col] = self._merge_func(*values)
        return record

    def __unicode__(self):
        return '%s( %s, %s )' % (self.__class__.__name__,
                                 str(self._field_mapping),
                                 str(self._merge_func))
class FieldAdder(Filter):
    """ Filter that adds a new field.

        Takes a name for the new field and a value, field_value can be an
        iterator, a function, or a static value.

        from itertools import count
        FieldAdder('id', count())

        would yield a new column named id that uses the itertools count
        iterator to create sequentially numbered ids.

        The field is only added to records that do not already contain it.
    """

    def __init__(self, field_name, field_value):
        super(FieldAdder, self).__init__()
        self._field_name = field_name
        self._field_value = field_value

    def _next_value(self):
        """ Produce the value to store in the next record. """
        value = self._field_value
        # iterators (Python 3 __next__ / Python 2 next) are advanced by one
        # step per record so that e.g. count() gives sequential ids
        step = (getattr(value, '__next__', None) or
                getattr(value, 'next', None))
        if callable(step):
            return step()
        # any other callable is invoked afresh for each record
        if callable(value):
            return value()
        # everything else is used as a static value
        return value

    def process_record(self, record):
        """ Add the configured field to record unless it is already set. """
        if self._field_name not in record:
            record[self._field_name] = self._next_value()
        return record

    def __unicode__(self):
        return '%s( %s, %s )' % (self.__class__.__name__, self._field_name,
                                 str(self._field_value))
class FieldCopier(Filter):
    """ Filter that copies one field to another.

        Takes a dictionary mapping destination keys to source keys.
    """

    def __init__(self, copy_mapping):
        super(FieldCopier, self).__init__()
        self._copy_mapping = copy_mapping

    def process_record(self, record):
        """ Copy every source value to its destination key and return record.

            Lookups and assignments go through utils.dotted_key_lookup and
            utils.dotted_key_set.
        """
        # mapping is dest:source
        # items() rather than iteritems() so this runs on Python 2 and 3
        for dest, source in self._copy_mapping.items():
            srcval = utils.dotted_key_lookup(record, source)
            utils.dotted_key_set(record, dest, srcval)
        return record
2008-11-13 21:31:42 +00:00
class FieldRenamer(Filter):
    """ Filter that renames one field to another.

        Takes a dictionary mapping destination keys to source keys.
    """

    def __init__(self, rename_mapping):
        super(FieldRenamer, self).__init__()
        self._rename_mapping = rename_mapping

    def process_record(self, record):
        """ Move every source value to its destination key and return record.

            A KeyError raised while popping or setting a key is ignored, so
            a missing source field leaves the record unchanged.
        """
        # mapping is dest:source
        # items() rather than iteritems() so this runs on Python 2 and 3
        for dest, source in self._rename_mapping.items():
            try:
                srcval = utils.dotted_key_pop(record, source)
                utils.dotted_key_set(record, dest, srcval)
            except KeyError:
                # silently pass if source key didn't exist
                pass
        return record
2008-10-27 19:13:50 +00:00
class Splitter(Filter):
    """ Filter that splits nested data into different paths.

        Takes a dictionary of keys and a series of filters to run against the
        associated dictionaries.

        {'person': {'firstname': 'James', 'lastname': 'Turk'},
         'phones': [{'phone': '222-222-2222'}, {'phone': '335-333-3321'}]
        }
    """

    def __init__(self, split_mapping):
        super(Splitter, self).__init__()
        self._split_mapping = split_mapping

    def process_record(self, record):
        """ Run the configured filters over each nested value of record.

            A dict value is passed through each filter's process_record; a
            list or tuple value is passed through each filter's __call__ and
            stored back as a list.  Missing keys and values of any other
            type are left untouched.
        """
        # Filter.__call__ takes (recipe, source); hand the sub-filters the
        # recipe this splitter is attached to (None when unattached)
        recipe = getattr(self, '_recipe', None)

        # items() rather than iteritems() so this runs on Python 2 and 3
        for key, filters in self._split_mapping.items():

            # if the key doesn't exist -- move on to next key
            try:
                subrecord = record[key]
            except KeyError:
                continue

            # if a dict, use process_record directly
            if isinstance(subrecord, dict):
                for filter_ in filters:
                    subrecord = filter_.process_record(subrecord)

            # if a list or tuple, use __call__
            elif isinstance(subrecord, (list, tuple)):
                for filter_ in filters:
                    subrecord = filter_(recipe, subrecord)
                subrecord = list(subrecord)  # unchain generators

            # place back from whence it came
            record[key] = subrecord
        return record
class Flattener(FieldFilter):
    """ Collapse a list of single-level wrapper dictionaries.

        For every target key, each element of the list stored there is
        replaced by one dictionary holding the merged contents of that
        element's values:

        addresses = [{'addresses': [{'address': {'state':'NC', 'street':'146 shirley drive'}},
                                    {'address': {'state':'NY', 'street':'3000 Winton Rd'}}]}]
        flattener = Flattener(['addresses'])

        would yield:

        {'addresses': [{'state': 'NC', 'street': '146 shirley drive'},
                       {'state': 'NY', 'street': '3000 Winton Rd'}]}
    """

    def __init__(self, keys):
        super(Flattener, self).__init__(keys)

    def process_field(self, item):
        """ Return a list with one merged dictionary per element of item. """
        flattened = []
        for wrapper in item:
            merged = {}
            for inner in wrapper.values():
                merged.update(inner)
            flattened.append(merged)
        return flattened
2008-11-17 20:53:04 +00:00
class DictFlattener(Filter):
    """ Filter that flattens records by delegating to utils.flatten.

        Takes the keys to flatten and the separator (default '_') that is
        handed to utils.flatten.
    """

    def __init__(self, keys, separator='_'):
        super(DictFlattener, self).__init__()
        self._keys = utils.str_or_list(keys)
        self._separator = separator

    def process_record(self, record):
        """ Return utils.flatten(record) for the configured keys/separator. """
        flattened = utils.flatten(record,
                                  keys=self._keys,
                                  separator=self._separator)
        return flattened
2008-11-17 20:53:04 +00:00
class Unique(ConditionalFilter):
    """ Filter that lets each distinct record through only once.

        Records are compared by the hash of their repr().
    """

    def __init__(self):
        super(Unique, self).__init__()
        self._seen = set()

    def test_record(self, record):
        """ Return True the first time a record's fingerprint is seen. """
        fingerprint = hash(repr(record))
        if fingerprint in self._seen:
            return False
        self._seen.add(fingerprint)
        return True
2008-10-27 19:13:50 +00:00
2009-07-02 22:01:09 +00:00
class UnicodeFilter(Filter):
    """ Convert all str elements in the record to Unicode.

        Relies on the Python 2 ``unicode`` builtin.  Values that are
        already unicode, and values of any other type, are left untouched.
    """

    def __init__(self, encoding='utf-8', errors='ignore'):
        super(UnicodeFilter, self).__init__()
        self._encoding = encoding
        self._errors = errors

    def process_record(self, record):
        """ Decode every str value of record in place and return record. """
        for key, value in record.iteritems():
            # Only byte strings need decoding.  Calling .decode() on a value
            # that is already unicode makes Python 2 encode it to ASCII
            # first, which raises UnicodeEncodeError for non-ASCII text.
            if isinstance(value, str):
                record[key] = unicode(value, self._encoding, self._errors)
        return record
class StringFilter(Filter):
    """ Encode all unicode elements in the record to str.

        Relies on the Python 2 ``unicode`` builtin.  Values of any other
        type are left untouched.
    """

    def __init__(self, encoding='utf-8', errors='ignore'):
        # super() must name this class; naming UnicodeFilter raises
        # TypeError because a StringFilter is not a UnicodeFilter instance
        super(StringFilter, self).__init__()
        self._encoding = encoding
        self._errors = errors

    def process_record(self, record):
        """ Encode every unicode value of record in place and return record. """
        for key, value in record.iteritems():
            if isinstance(value, unicode):
                record[key] = value.encode(self._encoding, self._errors)
        return record
2009-07-02 22:01:09 +00:00
2008-10-27 19:13:50 +00:00
###########################
## Commonly Used Filters ##
###########################
class PhoneNumberCleaner(FieldFilter):
    """ Filter that cleans phone numbers to match a given format.

        Takes a list of target keys and an optional phone # format that has
        10 %s placeholders.

        PhoneNumberCleaner( ('phone','fax'), number_format='%s%s%s-%s%s%s-%s%s%s%s')
        would format the phone & fax columns to 555-123-4567 format.
    """

    def __init__(self, keys, number_format='%s%s%s.%s%s%s.%s%s%s%s'):
        super(PhoneNumberCleaner, self).__init__(keys)
        self._number_format = number_format
        # raw string: '\d' in a plain literal is an invalid escape sequence
        self._num_re = re.compile(r'\d')

    def process_field(self, item):
        """ Reformat item if it contains exactly ten digits.

            Any other value is returned unchanged.
        """
        nums = self._num_re.findall(item)
        if len(nums) == 10:
            item = self._number_format % tuple(nums)
        return item
2008-11-13 21:31:42 +00:00
class DateCleaner(FieldFilter):
    """ Filter that rewrites date strings from one format to another.

        Takes a list of target keys plus the source and destination formats,
        both expressed in strftime/strptime notation.
    """

    def __init__(self, keys, from_format, to_format):
        super(DateCleaner, self).__init__(keys)
        self._from_format = from_format
        self._to_format = to_format

    def process_field(self, item):
        """ Parse item with from_format and render it with to_format. """
        parsed = time.strptime(item, self._from_format)
        return time.strftime(self._to_format, parsed)
class NameCleaner(Filter):
    """ Filter that splits names into a first, last, and middle name field.

        Takes a list of target keys.

        NameCleaner( ('name', ), nomatch_name='raw_name')
        would attempt to split 'name' into firstname, middlename, lastname,
        and suffix columns, and if it did not fit would place it in raw_name

        prefix is prepended to the names of the generated columns and
        formats may supply a custom list of compiled patterns with named
        groups (default: FIRST_LAST, then LAST_FIRST).
    """

    # first middle? last suffix?
    FIRST_LAST = re.compile(r'''^\s*(?:(?P<firstname>\w+)(?:\.?)
                                \s+(?:(?P<middlename>\w+)\.?\s+)?
                                (?P<lastname>[A-Za-z'-]+))
                                (?:\s+(?P<suffix>JR\.?|II|III|IV))?
                                \s*$''', re.VERBOSE | re.IGNORECASE)

    # last, first middle? suffix?
    LAST_FIRST = re.compile(r'''^\s*(?:(?P<lastname>[A-Za-z'-]+),
                                \s+(?P<firstname>\w+)(?:\.?)
                                (?:\s+(?P<middlename>\w+)\.?)?)
                                (?:\s+(?P<suffix>JR\.?|II|III|IV))?
                                \s*$''', re.VERBOSE | re.IGNORECASE)

    def __init__(self, keys, prefix='', formats=None, nomatch_name=None):
        super(NameCleaner, self).__init__()
        self._keys = utils.str_or_list(keys)
        self._name_prefix = prefix
        self._nomatch_name = nomatch_name

        if formats:
            self._name_formats = formats
        else:
            self._name_formats = [self.FIRST_LAST, self.LAST_FIRST]

    def process_record(self, record):
        """ Split each target name field of record into its components.

            The first pattern that matches wins: the original field is
            removed and one field per named group is added (groups that did
            not participate in the match are stored as None).  When nothing
            matches and nomatch_name is set, the value is moved to that
            field; otherwise the record is left as it was.

            Raises KeyError if a target key is absent from record.
        """
        # run for each key (not using a FieldFilter due to multi-field output)
        for key in self._keys:
            name = record[key]

            # check if key matches any formats
            for name_format in self._name_formats:
                match = name_format.match(name)

                # if there is a match, remove original name and add pieces
                if match:
                    record.pop(key)
                    # items() rather than iteritems(): Python 2 and 3
                    for k, v in match.groupdict().items():
                        record[self._name_prefix + k] = v
                    break

            # if there is no match, move name into nomatch_name
            else:
                if self._nomatch_name:
                    record.pop(key)
                    record[self._nomatch_name] = name

        return record