added FileSource and JSONSource

This commit is contained in:
Michael Stephens 2010-06-17 17:06:28 -04:00
parent 172af19014
commit ce55ad0fcf

View File

@ -136,7 +136,7 @@ class DjangoModelSource(object):
class MongoDBSource(object): class MongoDBSource(object):
""" Source for reading from a MongoDB database. """ Source for reading from a MongoDB database.
The record dict is populated with records matching the spec The record dict is populated with records matching the spec
from the specified database and collection. from the specified database and collection.
""" """
@ -146,10 +146,10 @@ class MongoDBSource(object):
conn = Connection(host, port) conn = Connection(host, port)
self.collection = conn[database][collection] self.collection = conn[database][collection]
self.spec = spec self.spec = spec
def __iter__(self): def __iter__(self):
return self._find_spec() return self._find_spec()
def _find_spec(self): def _find_spec(self):
for doc in self.collection.find(self.spec): for doc in self.collection.find(self.spec):
yield dict(doc) yield dict(doc)
@ -166,18 +166,18 @@ class SqliteSource(object):
The record dict is populated with the results from the The record dict is populated with the results from the
query argument. If given, args will be passed to the query query argument. If given, args will be passed to the query
when executed. when executed.
""" """
def __init__(self, dbpath, query, args=None, conn_params=None): def __init__(self, dbpath, query, args=None, conn_params=None):
import sqlite3 import sqlite3
self._dbpath = dbpath self._dbpath = dbpath
self._query = query self._query = query
self._args = args or [] self._args = args or []
self._conn_params = conn_params or [] self._conn_params = conn_params or []
# setup connection # setup connection
self._conn = sqlite3.connect(self._dbpath) self._conn = sqlite3.connect(self._dbpath)
self._conn.row_factory = dict_factory self._conn.row_factory = dict_factory
@ -186,7 +186,7 @@ class SqliteSource(object):
setattr(self._conn, param, value) setattr(self._conn, param, value)
def _process_query(self): def _process_query(self):
cursor = self._conn.cursor() cursor = self._conn.cursor()
for row in cursor.execute(self._query, self._args): for row in cursor.execute(self._query, self._args):
@ -196,6 +196,64 @@ class SqliteSource(object):
def __iter__(self): def __iter__(self):
return self._process_query() return self._process_query()
def done(self): def done(self):
self._conn.close() self._conn.close()
class FileSource(object):
""" Base class for sources which read from one or more files.
Takes as input a file-like, a file path, a list of file-likes,
or a list of file paths.
"""
def __init__(self, input):
self._input = input
def __iter__(self):
# This method would be a lot cleaner with the proposed
# 'yield from' expression (PEP 380)
if hasattr(self._input, '__read__'):
for record in self._process_file(input):
yield record
elif isinstance(self._input, basestring):
with open(self._input) as f:
for record in self._process_file(input):
yield record
elif hasattr(self._input, '__iter__'):
for el in self._input:
if isinstance(el, basestring):
with open(el) as f:
for record in self._process_file(f):
yield record
elif hasattr(el, '__read__'):
for record in self._process_file(f):
yield record
def _process_file(self, file):
raise NotImplementedError('Descendants of FileSource should implement'
' a custom _process_file method.')
class JSONSource(FileSource):
""" Source for reading from JSON files.
When processing JSON files, if the top-level object is a list, will
yield each member separately. Otherwise, yields the top-level
object.
"""
def _process_file(self, file):
import json
obj = json.load(file)
# If the top-level JSON object in the file is a list
# then yield each element separately; otherwise, yield
# the top-level object.
if isinstance(obj, list):
for record in obj:
yield record
else:
yield obj