From 8654f03ba7211b45b28e87e7e2f1d10b456592b3 Mon Sep 17 00:00:00 2001
From: James Turk
Date: Mon, 27 Oct 2008 19:13:50 +0000
Subject: [PATCH] saucebrush moving from bzr to svn :(

---
 examples/earmarkwatch.py   |  62 ++++++++++
 examples/fec_cobol.py      |  67 +++++++++++
 examples/fec_electronic.py | 112 +++++++++++++++++
 examples/tables.sql        |  56 +++++++++
 examples/test-splitter.py  |  15 +++
 saucebrush.kpf             |   7 ++
 saucebrush/__init__.py     |  18 +++
 saucebrush/emitters.py     | 162 +++++++++++++++++++++++++
 saucebrush/filters.py      | 241 +++++++++++++++++++++++++++++++++++++
 saucebrush/sources.py      | 131 ++++++++++++++++++++
 saucebrush/utils.py        |  64 ++++++++++
 11 files changed, 935 insertions(+)
 create mode 100644 examples/earmarkwatch.py
 create mode 100644 examples/fec_cobol.py
 create mode 100644 examples/fec_electronic.py
 create mode 100644 examples/tables.sql
 create mode 100644 examples/test-splitter.py
 create mode 100644 saucebrush.kpf
 create mode 100644 saucebrush/__init__.py
 create mode 100644 saucebrush/emitters.py
 create mode 100644 saucebrush/filters.py
 create mode 100644 saucebrush/sources.py
 create mode 100644 saucebrush/utils.py

diff --git a/examples/earmarkwatch.py b/examples/earmarkwatch.py
new file mode 100644
index 0000000..88f7454
--- /dev/null
+++ b/examples/earmarkwatch.py
@@ -0,0 +1,62 @@
+from saucebrush.sources import CSVSource
+from saucebrush.emitters import CSVEmitter
+
+def merge_columns(datasource, mapping, merge_func):
+    for rowdata in datasource:
+        for to_col, from_cols in mapping.iteritems():
+            values = [rowdata.pop(col, None) for col in from_cols]
+            rowdata[to_col] = reduce(merge_func, values)
+        yield rowdata
+
+def add_column(datasource, column_name, column_value):
+    if callable(column_value):
+        for rowdata in datasource:
+            rowdata[column_name] = column_value()
+            yield rowdata
+    else:
+        for rowdata in datasource:
+            rowdata[column_name] = column_value
+            yield rowdata
+
+def legislators_to_ids(datasource):
+    for rowdata in datasource:
+        names = rowdata['members'].split('; ')
+        parties = rowdata['parties'].split('; ')
+        states = rowdata['states'].split('; ')
+        if not len(names) == len(parties) == len(states):
+            raise Exception('line %d: len(names)=%d, len(parties)=%d, len(states)=%d' % (rowdata['row'], len(names), len(parties), len(states)))
+        members = zip(names, parties, states)
+        for name, party, state in members:
+            pass
+
+def main():
+    import sys
+    filename = sys.argv[1]
+
+    column_names = ['house_amount', 'senate_amount', 'conference_amount',
+        'budget_request', 'request_letter', 'description', 'benficiary',
+        'address', 'city', 'county', 'state', 'zipcode', 'bill',
+        'bill_section', 'bill_subsection', 'project_heading',
+        'house_members', 'house_parties', 'house_states',
+        'senate_members', 'senate_parties', 'senate_states',
+        'presidential', 'undisclosed', 'intended_recipient', 'notes']
+
+    output_names = ['appropriated', 'budget_request', 'request_letter',
+        'description', 'benficiary',
+        'address', 'city', 'county', 'state', 'zipcode', 'bill',
+        'bill_section', 'bill_subsection', 'project_heading',
+        'members', 'parties', 'states',
+        'presidential', 'undisclosed', 'intended_recipient', 'notes']
+
+    data = CSVSource(open(filename), column_names, 1)
+    data = merge_columns(data, {'appropriated': ['house_amount', 'senate_amount', 'conference_amount'],
+                                'members': ['house_members', 'senate_members'],
+                                'parties': ['house_parties', 'senate_parties'],
+                                'states': ['house_states', 'senate_states']},
+                         lambda x, y: x or y)
+
+    output = CSVEmitter(open('brushed.' + filename, 'w'), output_names)
+
+    for item in data:
+        output.emit_record(item)
+
+if __name__ == '__main__':
+    main()
diff --git a/examples/fec_cobol.py b/examples/fec_cobol.py
new file mode 100644
index 0000000..8330909
--- /dev/null
+++ b/examples/fec_cobol.py
@@ -0,0 +1,67 @@
+from saucebrush.sources import FixedWidthFileSource
+from saucebrush.filters import FieldModifier, FieldRemover
+from saucebrush.emitters import SqliteEmitter, SqlDumpEmitter
+from saucebrush import run_recipe
+
+CM_FIELDS = [('id',9),('name',90),('treasurer',38),('street_1',34),
+             ('street_2',34),('city',18),('state',2),('zipcode',5),
+             ('designation',1),('type',1),('party',3),('filing_frequency',1),
+             ('interest_group_category',1),('connected_org_name',38),
+             ('candidate_id',9)]
+
+# party_1 and party_3 become party and party_2
+CN_FIELDS = [('id',9), ('name',38), ('party',3), ('fillerA',3), ('party_2',3),
+             ('seat_status',1), ('fillerB',1), ('status',1), ('street_1',34),
+             ('street_2',34), ('city',18), ('state',2), ('zipcode',5),
+             ('committee_id',9), ('election_year',2), ('district',2)]
+
+# combines year field (split in data for no apparent reason)
+INDIV_FIELDS = [('filer_id',9), ('amendment',1), ('report_type',3),
+                ('primary_general',1), ('microfilm_loc',11),
+                ('transaction_type',3), ('name',34), ('city',18), ('state',2),
+                ('zipcode',5), ('occupation',35), ('month',2), ('day',2),
+                ('year',4), ('amount',7), ('other_id',9),
+                ('fec_record_number',7)]
+
+# combines year field (split in data for no apparent reason)
+PAS2_FIELDS = [('id',9), ('amendment',1), ('report_type',3),
+               ('primary_general',1), ('microfilm_loc',11),
+               ('transaction_type',3), ('month',2), ('day',2),
+               ('year',4), ('amount',7), ('other_id',9), ('candidate_id',9),
+               ('fec_record_number',7)]
+
+
+def fix_cobol_number(number):
+    mapping = {']':'0', 'j':'1', 'k':'2', 'l':'3', 'm':'4', 'n':'5',
+               'o':'6', 'p':'7', 'q':'8', 'r':'9'}
+    number = number.lstrip('0')
+    if not number:
+        number = '0'
+    elif number[-1] in mapping.keys():
+        number = '-' + number[0:-1] + mapping[number[-1]]
+    return number
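+
+# A quick sanity check of the COBOL "overpunch" handling above (illustrative
+# values, not taken from a real FEC file): a trailing ]/j-r encodes a minus
+# sign plus the final digit.
+#   fix_cobol_number('0000123r')  ->  '-1239'
+#   fix_cobol_number('0001500')   ->  '1500'
+#   fix_cobol_number('0000000')   ->  '0'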
+
+def process_fec_year(year):
+    # committees
+    source = FixedWidthFileSource(open('%s/foiacm.dta' % year), CM_FIELDS)
+    #sqlite = SqliteEmitter('fec%s.sqlite' % year, 'committee', [f[0] for f in CM_FIELDS if f[0] != 'filler'])
+    emit_mysql = SqlDumpEmitter(open('fec%s.sql' % year, 'a'), 'committee', [f[0] for f in CM_FIELDS if f[0] != 'filler'])
+    run_recipe(source, [emit_mysql])
+
+    # candidate
+    source = FixedWidthFileSource(open('%s/foiacn.dta' % year), CN_FIELDS)
+    fieldremover = FieldRemover(('fillerA', 'fillerB'))
+    #sqlite = SqliteEmitter('fec%s.sqlite' % year, 'candidate', [f[0] for f in CN_FIELDS if f[0] != 'filler'])
+    emit_mysql = SqlDumpEmitter(open('fec%s.sql' % year, 'a'), 'candidate', [f[0] for f in CN_FIELDS if not f[0].startswith('filler')])
+    run_recipe(source, [fieldremover, emit_mysql])
+
+    # contributions
+    source = FixedWidthFileSource(open('%s/itcont.dta' % year), INDIV_FIELDS)
+    decobolizer = FieldModifier(('amount',), fix_cobol_number)
+    #sqlite = SqliteEmitter('fec%s.sqlite' % year, 'contribution', [f[0] for f in INDIV_FIELDS if f[0] != 'filler'])
+    emit_mysql = SqlDumpEmitter(open('fec%s.sql' % year, 'a'), 'contribution', [f[0] for f in INDIV_FIELDS if f[0] != 'filler'])
+    run_recipe(source, [decobolizer, emit_mysql])
+
+if __name__ == '__main__':
+    process_fec_year(2008)
+    #for year in [2000, 2002, 2004, 2006, 2008]:
+    #    process_fec_year(year)
diff --git a/examples/fec_electronic.py b/examples/fec_electronic.py
new file mode 100644
index 0000000..6bf1b78
--- /dev/null
+++ b/examples/fec_electronic.py
@@ -0,0 +1,112 @@
+import re
+
+class FECSource(object):
+
+    SPLIT_CHAR = '\x1c'
+    FORM_FIELDS = {
+        'F56': ['form_type', 'committee_id', 'transaction_id', 'entity_type',
+                'contributor_organization', 'contributor_lastname', 'contributor_firstname',
+                'contributor_middlename', 'contributor_prefix', 'contributor_suffix',
+                'contributor_street1', 'contributor_street2', 'contributor_city',
+                'contributor_state', 'contributor_zip', 'contributor_committee_id',
+                'date', 'amount', 'contributor_employer', 'contributor_occupation']
+    }
+
+    # use regexes to map forms to keys in FORM_FIELDS
+    FORM_MAPPING = (
+        ('F1(A|N)', 'F1'),
+        ('F1S', 'F1S'),
+        ('F1M(A|N)', 'F1M'),
+        ('F2(A|N)', 'F2'),
+        ('F2S', 'F2S'),
+        ('F24', 'F24'),
+        ('F3(N|A|T)', 'F3'),
+        ('F3S', 'F3S'),
+        ('F3ZT?', 'F3Z'),
+        ('F3P(N|A|T)', 'F3P'),
+        ('F3PS', 'F3PS'),
+        ('F3P31AL', 'F3P31AL'),
+        ('F3X(N|A|T)', 'F3X'),
+        ('F4(N|A|T)', 'F4'),
+        ('F5(N|A|T)', 'F5'),
+        ('F56', 'F56'),
+        ('F57', 'F57'),
+        ('F6', 'F6'),
+        ('F65', 'F65'),
+        ('F7(N|A|T)', 'F7'),
+        ('F76', 'F76'),
+        ('F8(N|A|T)', 'F8'),
+        ('F82', 'F82'),
+        ('F83', 'F83'),
+        ('F9(A|N)', 'F9'),
+        ('F91', 'F91'),
+        ('F92', 'F92'),
+        ('F93', 'F93'),
+        ('F94', 'F94'),
+        ('F10', 'F10'),
+        ('F105', 'F105'),
+        ('F13(A|N)', 'F13'),
+        ('F132', 'F132'),
+        ('F133', 'F133'),
+        ('F99', 'F99'),
+        ('SA.+', 'SA'),
+        ('SB.+', 'SB'),
+        ('SC/.+', 'SC'),
+        ('SC1/.+', 'SC1'),
+        ('SC2/.+', 'SC2'),
+        ('SD.+', 'SD'),
+        ('SE', 'SE'),
+        ('SF', 'SF'),
+        ('H1', 'H1'),
+        ('H2', 'H2'),
+        ('H3', 'H3'),
+        ('H4', 'H4'),
+        ('H5', 'H5'),
+        ('H6', 'H6'),
+        ('SI', 'SI'),
+        ('SL', 'SL'),
+        ('TEXT', 'TEXT'),
+    )
+
+    # compile regexes with optional quotes, anchored at the end so that
+    # (for example) F10 does not also match F105
+    FORM_MAPPING = dict([(re.compile("(\")?%s(\")?$" % pattern), form)
+                         for pattern, form in FORM_MAPPING])
+
+    def __init__(self, filename):
+        self.filename = filename
+        self.fecfile = open(filename)
+        self.header = self.fecfile.readline().split(self.SPLIT_CHAR)
+        if self.header[0] != "HDR":
+            print self.header
+        #assert self.header[2].startswith("6.2"), self.header
+        self._in_textblock = False
+
+    @staticmethod
+    def get_form_type(rectype):
+        for type_re, type in FECSource.FORM_MAPPING.iteritems():
+            if type_re.match(rectype):
+                return type
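+
+    # For example (illustrative record types, quoted and unquoted):
+    #   FECSource.get_form_type('"F3XN"')  ->  'F3X'
+    #   FECSource.get_form_type('SA11A1')  ->  'SA'
+    #   FECSource.get_form_type('F105')    ->  'F105' (not 'F10', thanks to
+    #                                          the $ anchor above)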
+
+    def process_file(self):
+        begintext = re.compile('\[BEGINTEXT\]', re.IGNORECASE)
+        endtext = re.compile('\[ENDTEXT\]', re.IGNORECASE)
+        in_textblock = False
+
+        for line in self.fecfile:
+
+            # get fields from line
+            fields = line.split(self.SPLIT_CHAR)
+
+            # handle the BEGINTEXT/ENDTEXT blocks
+            if begintext.match(fields[0]):
+                in_textblock = True
+            elif endtext.match(fields[0]):
+                in_textblock = False
+            elif line != '\n' and not in_textblock:
+                type = self.get_form_type(fields[0])
+                if type in self.FORM_FIELDS:
+                    yield dict(zip(self.FORM_FIELDS[type], fields))
+
+    def __iter__(self):
+        return self.process_file()
diff --git a/examples/tables.sql b/examples/tables.sql
new file mode 100644
index 0000000..af9077e
--- /dev/null
+++ b/examples/tables.sql
@@ -0,0 +1,56 @@
+CREATE TABLE `candidate` (
+  `id` char(9) default NULL,
+  `name` varchar(38) default NULL,
+  `party` char(3) default NULL,
+  `party_2` char(3) default NULL,
+  `seat_status` char(1) default NULL,
+  `status` char(1) default NULL,
+  `street_1` varchar(34) default NULL,
+  `street_2` varchar(34) default NULL,
+  `city` varchar(18) default NULL,
+  `state` char(2) default NULL,
+  `zipcode` char(5) default NULL,
+  `committee_id` char(9) default NULL,
+  `election_year` char(2) default NULL,
+  `district` char(2) default NULL
+);
+
+CREATE TABLE `committee` (
+  `id` char(9) default NULL,
+  `name` varchar(90) default NULL,
+  `treasurer` varchar(38) default NULL,
+  `street_1` varchar(34) default NULL,
+  `street_2` varchar(34) default NULL,
+  `city` varchar(18) default NULL,
+  `state` char(2) default NULL,
+  `zipcode` char(5) default NULL,
+  `designation` char(1) default NULL,
+  `type` char(1) default NULL,
+  `party` char(3) default NULL,
+  `filing_frequency` char(1) default NULL,
+  `interest_group_category` char(1) default NULL,
+  `connected_org_name` varchar(38) default NULL,
+  `candidate_id` char(9) default NULL
+);
+
+CREATE TABLE `contribution` (
+  `filer_id` char(9) default NULL,
+  `amendment` char(1) default NULL,
+  `report_type` char(3) default NULL,
+  `primary_general` char(1) default NULL,
+  `microfilm_loc` char(11) default NULL,
+  `transaction_type` char(3) default NULL,
+  `name` varchar(34) default NULL,
+  `city` varchar(18) default NULL,
+  `state` char(2) default NULL,
+  `zipcode` char(5) default NULL,
+  `occupation` varchar(35) default NULL,
+  `month` char(2) default NULL,
+  `day` char(2) default NULL,
+  `year` char(4) default NULL,
+  `amount` char(7) default NULL,
+  `other_id` char(9) default NULL,
+  `fec_record_number` char(7) default NULL
+);
diff --git a/examples/test-splitter.py b/examples/test-splitter.py
new file mode 100644
index 0000000..f338bf3
--- /dev/null
+++ b/examples/test-splitter.py
@@ -0,0 +1,15 @@
+from saucebrush.filters import Splitter, PhoneNumberCleaner, FieldMerger, FieldAdder
+from saucebrush.emitters import DebugEmitter
+import saucebrush
+
+data = [{'person': {'firstname': 'James', 'lastname': 'Turk'},
+         'phones': [{'phone': '222-222-2222'}, {'phone': '(202) 333-3321'}]
+        }]
+
+namemerger = FieldMerger({'name': ('firstname', 'lastname')}, lambda x, y: ' '.join((x, y)))
+phonecleaner = PhoneNumberCleaner(('phone',))
+splitter = Splitter({'person': [namemerger], 'phones': [phonecleaner]})
+ider = FieldAdder('id', [1, 2, 3, 4, 5])
+
+saucebrush.run_recipe(data, [ider, splitter, DebugEmitter()])
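+
+# Expected output from DebugEmitter (dict key order may vary), given
+# PhoneNumberCleaner's default '%s%s%s.%s%s%s.%s%s%s%s' format:
+#   {'id': 1,
+#    'person': {'name': 'James Turk'},
+#    'phones': [{'phone': '222.222.2222'}, {'phone': '202.333.3321'}]}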
diff --git a/saucebrush.kpf b/saucebrush.kpf
new file mode 100644
index 0000000..22be670
--- /dev/null
+++ b/saucebrush.kpf
@@ -0,0 +1,7 @@
+
+
+
+
+    1
+
+
diff --git a/saucebrush/__init__.py b/saucebrush/__init__.py
new file mode 100644
index 0000000..15f9682
--- /dev/null
+++ b/saucebrush/__init__.py
@@ -0,0 +1,18 @@
+import filters, emitters, sources, utils
+
+def run_recipe(source, filters):
+    # connect datapath
+    data = source
+    for filter in filters:
+        data = filter(data)
+
+    # actually run the data through (causes iterators to actually be called)
+    for record in data:
+        pass
+
+    # try to call done() on all filters
+    for filter in filters:
+        try:
+            filter.done()
+        except AttributeError:
+            pass    # don't care if there isn't a done method
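+
+# A minimal end-to-end sketch (hypothetical field names; any iterable of
+# dicts can serve as the source, as examples/test-splitter.py does):
+#
+#   from saucebrush.filters import FieldAdder
+#   from saucebrush.emitters import DebugEmitter
+#   records = [{'name': 'spam'}, {'name': 'eggs'}]
+#   run_recipe(records, [FieldAdder('source', 'menu'), DebugEmitter()])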
+ """ + + def __init__(self): + super(Emitter, self).__init__() + + def process_record(self, record): + self.emit_record(record) + return record + + def emit_record(self, record): + """ Abstract method to be overridden. + + Called with a single record, should "emit" the record unmodified. + """ + raise NotImplementedError('emit_record not defined in ' + + self.__class__.__name__) + + def done(self): + """ No-op Method to be overridden. + + Called when all processing is complete + """ + pass + + +class DebugEmitter(Emitter): + """ Emitter that prints raw records to a file, useful for debugging. + + DebugEmitter() by default prints to stdout. + DebugEmitter(open('test', 'w')) would print to a file named test + """ + def __init__(self, outfile=None): + super(DebugEmitter, self).__init__() + if not outfile: + import sys + self._outfile = sys.stdout + else: + self._outfile = outfile + + def emit_record(self, record): + self._outfile.write(str(record) + '\n') + + +class CSVEmitter(Emitter): + """ Emitter that writes records to a CSV file. + + CSVEmitter(open('output.csv','w'), ('id', 'name', 'phone')) writes all + records to a csvfile with the columns in the order specified. + """ + + def __init__(self, csvfile, fieldnames=None): + super(CSVEmitter, self).__init__() + import csv + self._dictwriter = csv.DictWriter(csvfile, fieldnames) + # write header row + self._dictwriter.writerow(dict(zip(fieldnames, fieldnames))) + + def emit_record(self, record): + self._dictwriter.writerow(record) + + +class SqliteEmitter(Emitter): + """ Emitter that writes records to a SQLite database. + + SqliteEmitter('addressbook.db', 'friend') writes all records to the + friends table in the SQLite database named addressbook.db + + (To have the emitter create the table, the fieldnames should be passed + as a third parameter to SqliteEmitter.) + """ + + def __init__(self, dbname, table_name, fieldnames=None): + self(SqliteEmitter, self).__init__() + import sqlite3 + self._conn = sqlite3.connect(dbname) + self._cursor = self.conn.cursor() + self._table_name = table_name + if fieldnames: + create = "CREATE TABLE IF NOT EXISTS %s (%s)" % (table_name, + ', '.join([' '.join((field, 'TEXT')) for field in fieldnames])) + self._cursor.execute(create) + + def emit_record(self, record): + # input should be escaped with ? if data isn't trusted + qmarks = ','.join(('?',) * len(record)) + insert = 'INSERT INTO %s (%s) VALUES (%s)' % (self._table_name, + ','.join(record.keys()), + qmarks) + self._cursor.execute(insert, record.values()) + + def done(self): + self._conn.commit() + self._conn.close() + + +class SqlDumpEmitter(Emitter): + """ Emitter that writes SQL INSERT statements. + + The output generated by the SqlDumpEmitter is intended to be used to + populate a mySQL database. + + SqlDumpEmitter(open('addresses.sql', 'w'), 'friend', ('name', 'phone')) + writes statements to addresses.sql to insert the data + into the friends table. 
+ """ + + def __init__(self, outfile, table_name, fieldnames): + super(SqlDumpEmitter, self).__init__() + self._fieldnames = fieldnames + if not outfile: + import sys + self._outfile = sys.stderr + else: + self._outfile = outfile + self._insert_str = "INSERT INTO `%s` (`%s`) VALUES (%%s);\n" % (table_name, '`,`'.join(fieldnames)) + + def quote(self, item): + return "'%s'" % item.replace("\\","\\\\").replace("'","\\'").replace(chr(0),'0') + + def emit_record(self, record): + quoted_data = [self.quote(record[field]) for field in self._fieldnames] + self._outfile.write(self._insert_str % ','.join(quoted_data)) + + def done(self): + self._outfile.close() + + +class DjangoModelEmitter(Emitter): + """ Emitter that populates a table corresponding to a django model. + + Takes a django settings file, app label and model name and uses django + to insert the records into the appropriate table. + + DjangoModelOutput('settings.py', 'addressbook', 'friend') writes + records to addressbook.models.friend model using database settings + from settings.py. + """ + def __init__(self, dj_settings, app_label, model_name): + super(DjangoModelEmitter, self).__init__() + from saucebrush.utils import get_django_model + self.dbmodel = get_django_model(dj_settings, app_label, model_name) + + def emit_record(self, record): + self.dbmodel.objects.create(**record) diff --git a/saucebrush/filters.py b/saucebrush/filters.py new file mode 100644 index 0000000..f56e14a --- /dev/null +++ b/saucebrush/filters.py @@ -0,0 +1,241 @@ +""" + Saucebrush Filters are callables that take a Saucebrush source and yield + back filtered records. + + The Filter, YieldFilter, and FieldFilter abstract base types are provided + for convenience. Derived classes only need to implement process_record + (or process_field for FieldFilter). +""" + +from exceptions import NotImplementedError + +###################### +## Abstract Filters ## +###################### + +class Filter(object): + """ ABC for filters that operate on records. + + All derived filters must provide a process_record(self, record) that + takes a single record (python dictionary) and returns a result. + """ + + def __init__(self): + pass + + def process_record(self, record): + """ Abstract method to be overridden. + + Called with a single record, should return modified record. + """ + raise NotImplementedError('process_record not defined in ' + + self.__class__.__name__) + + def __call__(self, source): + for record in source: + yield self.process_record(record) + + +class YieldFilter(Filter): + """ ABC for defining filters where process_record yields. + + If process_record cannot return exactly one result for every record + it is passed, it should yield back as many records as needed and the + filter must derive from YieldFilter. + """ + def __init__(self): + super(YieldFilter, self).__init__() + + def __call__(self, source): + for record in source: + for result in self.process_record(record): + yield result + + +class FieldFilter(Filter): + """ ABC for filters that do a single operation on individual fields. + + All derived filters must provide a process_field(self, item) that + returns a modified item. process_field is called on one or more keys + passed into __init__. + """ + + def __init__(self, keys): + super(FieldFilter, self).__init__() + self._target_keys = keys + + def process_record(self, record): + """ Calls process_field on all keys passed to __init__. 
""" + for key in self._target_keys: + record[key] = self.process_field(record[key]) + return record + + def process_field(self, item): + """ Given a value, return the value that it should be replaced with. """ + raise NotImplementedError('process_field not defined in ' + + self.__class__.__name__) + + def __unicode__(self): + return '%s( %s )' % (self.__class__.__name__, str(self._target_keys)) + + +##################### +## Generic Filters ## +##################### + +class FieldModifier(FieldFilter): + """ Filter that calls a given function on a given set of fields. + + FieldModifier(('spam','eggs'), abs) to call the abs method on the spam + and eggs fields in each record filtered. + """ + + def __init__(self, keys, func): + super(FieldModifier, self).__init__(keys) + self._filter_func = func + + def process_field(self, item): + return self._filter_func(item) + + def __unicode__(self): + return '%s( %s, %s )' % (self.__class__.__name__, str(self._target_keys), + str(self._filter_func)) + + +class FieldRemover(Filter): + """ Filter that removes a given set of fields. + + FieldRemover(('spam', 'eggs')) removes the spam and eggs fields from + every record filtered. + """ + + def __init__(self, keys): + super(FieldRemover, self).__init__() + self._target_keys = keys + + def process_record(self, record): + for key in self._target_keys: + record.pop(key, None) + return record + + def __unicode__(self): + return '%s( %s )' % (self.__class__.__name__, str(self._target_keys)) + + +class FieldMerger(Filter): + """ Filter that merges a given set of fields using a supplied merge_func. + + Takes a mapping (dictionary of new_column:(from_col1,from_col2)) + + FieldMerger({"bacon": ("spam", "eggs")}, operator.add) creates a new + column bacon that is the result of spam+eggs + """ + + def __init__(self, mapping, merge_func): + super(FieldMerger, self).__init__() + self._field_mapping = mapping + self._merge_func = merge_func + + def process_record(self, record): + for to_col,from_cols in self._field_mapping.iteritems(): + values = [record.pop(col, None) for col in from_cols] + record[to_col] = self._merge_func(*values) + return record + + def __unicode__(self): + return '%s( %s, %s )' % (self.__class__.__name__, + str(self._field_mapping), + str(self._merge_func)) + + +class FieldAdder(Filter): + """ Filter that adds a new field. + + Takes a name for the new field and a value, field_value can be an + iterable, a function, or a static value. + + from itertools import count + FieldAdder('id', count) + """ + + def __init__(self, field_name, field_value): + super(FieldAdder, self).__init__() + self._field_name = field_name + try: + self._field_value = iter(field_value).next + except TypeError: + self._field_value = field_value + + def process_record(self, record): + if callable(self._field_value): + record[self._field_name] = self._field_value() + else: + record[self._field_name] = self._field_value + return record + + def __unicode__(self): + return '%s( %s, %s )' % (self.__class__.__name__, self._field_name, + str(self._field_value)) + + +class Splitter(Filter): + """ Filter that splits nested data into different paths. + + Takes a dictionary of keys and a series of filters to run against the + associated dictionaries. 
+
+#####################
+## Generic Filters ##
+#####################
+
+class FieldModifier(FieldFilter):
+    """ Filter that calls a given function on a given set of fields.
+
+        FieldModifier(('spam','eggs'), abs) calls abs on the spam and eggs
+        fields in each record filtered.
+    """
+
+    def __init__(self, keys, func):
+        super(FieldModifier, self).__init__(keys)
+        self._filter_func = func
+
+    def process_field(self, item):
+        return self._filter_func(item)
+
+    def __unicode__(self):
+        return '%s( %s, %s )' % (self.__class__.__name__,
+                                 str(self._target_keys), str(self._filter_func))
+
+
+class FieldRemover(Filter):
+    """ Filter that removes a given set of fields.
+
+        FieldRemover(('spam', 'eggs')) removes the spam and eggs fields from
+        every record filtered.
+    """
+
+    def __init__(self, keys):
+        super(FieldRemover, self).__init__()
+        self._target_keys = keys
+
+    def process_record(self, record):
+        for key in self._target_keys:
+            record.pop(key, None)
+        return record
+
+    def __unicode__(self):
+        return '%s( %s )' % (self.__class__.__name__, str(self._target_keys))
+
+
+class FieldMerger(Filter):
+    """ Filter that merges a given set of fields using a supplied merge_func.
+
+        Takes a mapping (dictionary of new_column: (from_col1, from_col2))
+
+        FieldMerger({"bacon": ("spam", "eggs")}, operator.add) creates a new
+        column bacon that is the result of spam + eggs.
+    """
+
+    def __init__(self, mapping, merge_func):
+        super(FieldMerger, self).__init__()
+        self._field_mapping = mapping
+        self._merge_func = merge_func
+
+    def process_record(self, record):
+        for to_col, from_cols in self._field_mapping.iteritems():
+            values = [record.pop(col, None) for col in from_cols]
+            record[to_col] = self._merge_func(*values)
+        return record
+
+    def __unicode__(self):
+        return '%s( %s, %s )' % (self.__class__.__name__,
+                                 str(self._field_mapping),
+                                 str(self._merge_func))
+
+
+class FieldAdder(Filter):
+    """ Filter that adds a new field.
+
+        Takes a name for the new field and a value; field_value can be an
+        iterable, a function, or a static value.
+
+        from itertools import count
+        FieldAdder('id', count())
+    """
+
+    def __init__(self, field_name, field_value):
+        super(FieldAdder, self).__init__()
+        self._field_name = field_name
+        try:
+            self._field_value = iter(field_value).next
+        except TypeError:
+            self._field_value = field_value
+
+    def process_record(self, record):
+        if callable(self._field_value):
+            record[self._field_name] = self._field_value()
+        else:
+            record[self._field_name] = self._field_value
+        return record
+
+    def __unicode__(self):
+        return '%s( %s, %s )' % (self.__class__.__name__, self._field_name,
+                                 str(self._field_value))
+
+
+class Splitter(Filter):
+    """ Filter that splits nested data into different paths.
+
+        Takes a dictionary of keys and a series of filters to run against the
+        associated subrecords, e.g. on data like:
+
+        {'person': {'firstname': 'James', 'lastname': 'Turk'},
+         'phones': [{'phone': '222-222-2222'}, {'phone': '335-333-3321'}]
+        }
+    """
+
+    def __init__(self, split_mapping):
+        super(Splitter, self).__init__()
+        self._split_mapping = split_mapping
+
+    def process_record(self, record):
+        for key, filters in self._split_mapping.iteritems():
+
+            subrecord = record[key]
+
+            # if a dict, use process_record directly
+            if isinstance(subrecord, dict):
+                for filter in filters:
+                    subrecord = filter.process_record(subrecord)
+
+            # if a list or tuple, use __call__
+            elif isinstance(subrecord, (list, tuple)):
+                for filter in filters:
+                    subrecord = filter(subrecord)
+                subrecord = [r for r in subrecord]    # unchain generators
+
+            # place back from whence it came
+            record[key] = subrecord
+        return record
+
+
+###########################
+## Commonly Used Filters ##
+###########################
+
+class PhoneNumberCleaner(FieldFilter):
+    """ Filter that cleans phone numbers to match a given format.
+
+        Takes a list of target keys and an optional phone number format with
+        10 %s placeholders.
+
+        PhoneNumberCleaner(('phone','fax'), number_format='%s%s%s-%s%s%s-%s%s%s%s')
+        would format the phone & fax columns to 555-123-4567 format.
+    """
+    def __init__(self, keys, number_format='%s%s%s.%s%s%s.%s%s%s%s'):
+        import re
+        super(PhoneNumberCleaner, self).__init__(keys)
+        self._number_format = number_format
+        self._num_re = re.compile('\d')
+
+    def process_field(self, item):
+        nums = self._num_re.findall(item)
+        if len(nums) == 10:
+            item = self._number_format % tuple(nums)
+        return item
diff --git a/saucebrush/sources.py b/saucebrush/sources.py
new file mode 100644
index 0000000..b4bc1a2
--- /dev/null
+++ b/saucebrush/sources.py
@@ -0,0 +1,131 @@
+"""
+    Saucebrush data sources, which convert data in some format into python
+    dicts.
+
+    All sources must implement the iterable interface and return python
+    dictionaries.
+"""
+
+import string
+
+from saucebrush.utils import get_django_model, string_dig
+
+class CSVSource(object):
+    """ Saucebrush source for reading from CSV files.
+
+        Takes an open csvfile, an optional set of fieldnames and an optional
+        number of rows to skip.
+
+        CSVSource(open('test.csv')) will read a csvfile, using the first row
+        as the field names.
+
+        CSVSource(open('test.csv'), ('name', 'phone', 'address'), 1) will read
+        in a CSV file and treat the three columns as name, phone, and address,
+        ignoring the first row (presumed to be column names).
+    """
+
+    def __init__(self, csvfile, fieldnames=None, skiprows=0):
+        import csv
+        self._dictreader = csv.DictReader(csvfile, fieldnames)
+        for _ in xrange(skiprows):
+            self._dictreader.next()
+
+    def __iter__(self):
+        return self._dictreader
+
+
+class FixedWidthFileSource(object):
+    """ Saucebrush source for reading from fixed width field files.
+
+        FixedWidthFileSource expects an open fixed width file and a tuple
+        of fields with their lengths.  There is also an optional fillchars
+        argument giving the filler characters to strip from the end of each
+        field (defaults to whitespace).
+
+        FixedWidthFileSource(open('testfile'), (('name',30), ('phone',12)))
+        will read in a fixed width file where the first 30 characters of each
+        line are part of a name and characters 31-42 are a phone number.
+    """
+
+    def __init__(self, fwfile, fields, fillchars=string.whitespace):
+        self._fwfile = fwfile
+        self._fields_dict = {}
+        self._fillchars = fillchars
+        from_offset = 0
+        to_offset = 0
+        for field, size in fields:
+            to_offset += size
+            self._fields_dict[field] = (from_offset, to_offset)
+            from_offset += size
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        line = self._fwfile.next()
+        record = {}
+        for name, range in self._fields_dict.iteritems():
+            record[name] = line[range[0]:range[1]].rstrip(self._fillchars)
+        return record
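+
+# For example (hypothetical file contents), with fields (('name',30), ('phone',12))
+# a line like:
+#   'James Turk                    555-123-4567\n'
+# yields the record:
+#   {'name': 'James Turk', 'phone': '555-123-4567'}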
+ """ + + def __init__(self, fwfile, fields, fillchars=string.whitespace): + self._fwfile = fwfile + self._fields_dict = {} + self._fillchars = fillchars + from_offset = 0 + to_offset = 0 + for field, size in fields: + to_offset += size + self._fields_dict[field] = (from_offset, to_offset) + from_offset += size + + def __iter__(self): + return self + + def next(self): + line = self._fwfile.next() + record = {} + for name, range in self._fields_dict.iteritems(): + record[name] = line[range[0]:range[1]].rstrip(self._fillchars) + return record + + +class HtmlTableSource(object): + """ Saucebrush source for reading data from an HTML table. + + HtmlTableSource expects an open html file, the id of the table or a + number indicating which table on the page to use, an optional fieldnames + tuple, and an optional number of rows to skip. + + HtmlTableSource(open('test.html'), 0) opens the first HTML table and + uses the first row as the names of the columns. + + HtmlTableSource(open('test.html'), 'people', ('name','phone'), 1) opens + the HTML table with an id of 'people' and names the two columns + name and phone, skipping the first row where alternate names are + stored. + """ + + def __init__(self, htmlfile, id_or_num, fieldnames=None, skiprows=0): + + # extract the table + from BeautifulSoup import BeautifulSoup + soup = BeautifulSoup(htmlfile.read()) + if isinstance(id_or_num, int): + table = soup.findAll('table')[id_or_num] + elif isinstance(id_or_num, str): + table = soup.find('table', id=id_or_num) + + # skip the necessary number of rows + self._rows = table.findAll('tr')[skiprows:] + + # determine the fieldnames + if not fieldnames: + self._fieldnames = [td.string for td in self.rows[0].findAll(('td','th'))] + else: + self._fieldnames = fieldnames + + def process_tr(): + for row in self._rows: + strings = [string_dig(td) for td in row.findAll('td')] + yield dict(zip(self._fieldnames, strings)) + + def __iter__(self): + return self.process_tr() + + +class DjangoModelSource(object): + """ Saucebrush source for reading data from django models. + + DjangoModelSource expects a django settings file, app label, and model + name. The resulting records contain all columns in the table for the + specified model. + + DjangoModelSource('settings.py', 'phonebook', 'friend') would read all + friends from the friend model in the phonebook app described in + settings.py. + """ + def __init__(self, dj_settings, app_label, model_name): + dbmodel = get_django_model(dj_settings, app_label, model_name) + + # only get values defined in model (no extra fields from custom manager) + self._data = dbmodel.objects.values(*[f.name for f in dbmodel._meta.fields]) + + def __iter__(self): + return iter(self._data) diff --git a/saucebrush/utils.py b/saucebrush/utils.py new file mode 100644 index 0000000..c8140ac --- /dev/null +++ b/saucebrush/utils.py @@ -0,0 +1,64 @@ +""" + General utilities used within saucebrush that may be useful elsewhere. +""" + +def get_django_model(dj_settings, app_label, model_name): + """ + Get a django model given a settings file, app label, and model name. 
+ """ + + from django.conf import settings + if not settings.configured: + settings.configure(DATABASE_ENGINE=dj_settings.DATABASE_ENGINE, + DATABASE_NAME=dj_settings.DATABASE_NAME, + DATABASE_USER=dj_settings.DATABASE_USER, + DATABASE_PASSWORD=dj_settings.DATABASE_PASSWORD, + DATABASE_HOST=dj_settings.DATABASE_HOST, + INSTALLED_APPS=dj_settings.INSTALLED_APPS) + from django.db.models import get_model + dbmodel = get_model(app_label, model_name) + + +def string_dig(element, joiner=''): + """ + Dig into BeautifulSoup HTML elements looking for inner strings. + + If element resembled:

testtest

+
+
+def dotted_key_lookup(dict_, dotted_key, default=KeyError, separator='.'):
+    """
+    Do a lookup within dict_ by the various elements of dotted_key.
+
+    Optionally specify a default to return if the key does not exist
+    (similar to the default argument of dict.get).
+
+    >>> d = {'a': {'b': {'c': 3} } }
+    >>> dotted_key_lookup(d, 'a.b.c')
+    3
+    >>> dotted_key_lookup(d, 'a.z', -1)
+    -1
+    >>> dotted_key_lookup(d, 'a|b|c', separator='|')
+    3
+    >>> dotted_key_lookup(d, '
+    """
+    val = dict_
+    try:
+        for key in dotted_key.split(separator):
+            if isinstance(val, dict):
+                val = val[key]
+            elif isinstance(val, (list, tuple)):
+                val = val[int(key)]
+            else:
+                val = getattr(val, key)
+    except (KeyError, IndexError, AttributeError):
+        if default is KeyError:
+            raise
+        val = default
+    return val