multithreaded/multiprocess approach to filters
This commit is contained in:
parent
163d92d44b
commit
8d71088e9a
@ -12,7 +12,7 @@ def process_sopr_filing(sopr_xml_file):
|
|||||||
from sunlightapi import settings as DJ_SETTINGS
|
from sunlightapi import settings as DJ_SETTINGS
|
||||||
DJ_APPLABEL = 'lobbyists'
|
DJ_APPLABEL = 'lobbyists'
|
||||||
|
|
||||||
saucebrush.run_recipe(lobbyists.parse_filings(sopr_xml_file),
|
saucebrush.run_recipe_multitasking(lobbyists.parse_filings(sopr_xml_file),
|
||||||
FieldRemover(['govt_entities', 'affiliated_orgs']),
|
FieldRemover(['govt_entities', 'affiliated_orgs']),
|
||||||
Flattener(['issues', 'lobbyists']),
|
Flattener(['issues', 'lobbyists']),
|
||||||
FieldCopier({'issues.filing_id': 'filing.id',
|
FieldCopier({'issues.filing_id': 'filing.id',
|
||||||
|
@ -23,3 +23,86 @@ def run_recipe(source, *filter_args):
|
|||||||
filter_.done()
|
filter_.done()
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
pass # don't care if there isn't a done method
|
pass # don't care if there isn't a done method
|
||||||
|
|
||||||
|
|
||||||
|
##### allow selection between threading/processing #####
|
||||||
|
|
||||||
|
#from threading import Thread as TaskType
|
||||||
|
#from Queue import Queue
|
||||||
|
#from threading import activeCount as active_tasks
|
||||||
|
|
||||||
|
from processing import Process as TaskType
|
||||||
|
from processing import Queue
|
||||||
|
def active_tasks():
|
||||||
|
from processing import activeChildren
|
||||||
|
return len(activeChildren())
|
||||||
|
|
||||||
|
########################################################
|
||||||
|
|
||||||
|
class FilterThread(TaskType):
|
||||||
|
__create_count = 0
|
||||||
|
|
||||||
|
def __init__(self, filter, data, queue=True):
|
||||||
|
super(FilterThread, self).__init__(name='saucebrush-%d' %
|
||||||
|
FilterThread.__create_count)
|
||||||
|
FilterThread.__create_count += 1
|
||||||
|
self._filter = filter
|
||||||
|
self._data = data
|
||||||
|
if queue:
|
||||||
|
self._queue = Queue() # threading or processing Queue
|
||||||
|
else:
|
||||||
|
self._queue = None
|
||||||
|
|
||||||
|
def done(self):
|
||||||
|
try:
|
||||||
|
self._filter.done()
|
||||||
|
except AttributeError:
|
||||||
|
pass # don't care if there isn't a done method
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
try:
|
||||||
|
for record in iter(self._data):
|
||||||
|
result = self._filter.process_record(record)
|
||||||
|
if self._queue:
|
||||||
|
self._queue.put(result)
|
||||||
|
if self._queue:
|
||||||
|
self._queue.put(None)
|
||||||
|
except Exception, e:
|
||||||
|
print e
|
||||||
|
if self._queue:
|
||||||
|
self._queue.put(None)
|
||||||
|
print self.getName(), 'exiting.'
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
item = True
|
||||||
|
while item is not None:
|
||||||
|
item = self._queue.get()
|
||||||
|
if item is not None:
|
||||||
|
yield item
|
||||||
|
|
||||||
|
def run_recipe_multitasking(source, *filter_args):
|
||||||
|
max_tasks = 5
|
||||||
|
tasks = []
|
||||||
|
|
||||||
|
data = source
|
||||||
|
for filter_ in filter_args:
|
||||||
|
# this task is the next task's data source
|
||||||
|
data = FilterThread(filter_, data)
|
||||||
|
tasks.append(data)
|
||||||
|
|
||||||
|
tasks[-1]._queue = None
|
||||||
|
|
||||||
|
# start all threads (no more than max_threads at once)
|
||||||
|
tasks_started = 0
|
||||||
|
while tasks_started < len(tasks):
|
||||||
|
if active_tasks() < max_tasks:
|
||||||
|
tasks[tasks_started].start()
|
||||||
|
print 'starting task', tasks_started
|
||||||
|
tasks_started += 1
|
||||||
|
|
||||||
|
# wait for all threads and call done
|
||||||
|
for task in tasks:
|
||||||
|
print 'joining', task
|
||||||
|
task.join()
|
||||||
|
print task, 'joined'
|
||||||
|
task.done()
|
Loading…
Reference in New Issue
Block a user