From 8d71088e9a3b547a3b687b02f96d11c52628c0c2 Mon Sep 17 00:00:00 2001 From: James Turk Date: Fri, 14 Nov 2008 23:17:43 +0000 Subject: [PATCH] multithreaded/multiprocess approach to filters --- examples/sopr_lobbyists.py | 2 +- saucebrush/__init__.py | 83 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+), 1 deletion(-) diff --git a/examples/sopr_lobbyists.py b/examples/sopr_lobbyists.py index 609855c..b6d1461 100644 --- a/examples/sopr_lobbyists.py +++ b/examples/sopr_lobbyists.py @@ -12,7 +12,7 @@ def process_sopr_filing(sopr_xml_file): from sunlightapi import settings as DJ_SETTINGS DJ_APPLABEL = 'lobbyists' - saucebrush.run_recipe(lobbyists.parse_filings(sopr_xml_file), + saucebrush.run_recipe_multitasking(lobbyists.parse_filings(sopr_xml_file), FieldRemover(['govt_entities', 'affiliated_orgs']), Flattener(['issues', 'lobbyists']), FieldCopier({'issues.filing_id': 'filing.id', diff --git a/saucebrush/__init__.py b/saucebrush/__init__.py index 62f3b14..cef6e9e 100644 --- a/saucebrush/__init__.py +++ b/saucebrush/__init__.py @@ -23,3 +23,86 @@ def run_recipe(source, *filter_args): filter_.done() except AttributeError: pass # don't care if there isn't a done method + + +##### allow selection between threading/processing ##### + +#from threading import Thread as TaskType +#from Queue import Queue +#from threading import activeCount as active_tasks + +from processing import Process as TaskType +from processing import Queue +def active_tasks(): + from processing import activeChildren + return len(activeChildren()) + +######################################################## + +class FilterThread(TaskType): + __create_count = 0 + + def __init__(self, filter, data, queue=True): + super(FilterThread, self).__init__(name='saucebrush-%d' % + FilterThread.__create_count) + FilterThread.__create_count += 1 + self._filter = filter + self._data = data + if queue: + self._queue = Queue() # threading or processing Queue + else: + self._queue = None + + def done(self): + try: + self._filter.done() + except AttributeError: + pass # don't care if there isn't a done method + + def run(self): + try: + for record in iter(self._data): + result = self._filter.process_record(record) + if self._queue: + self._queue.put(result) + if self._queue: + self._queue.put(None) + except Exception, e: + print e + if self._queue: + self._queue.put(None) + print self.getName(), 'exiting.' + + def __iter__(self): + item = True + while item is not None: + item = self._queue.get() + if item is not None: + yield item + +def run_recipe_multitasking(source, *filter_args): + max_tasks = 5 + tasks = [] + + data = source + for filter_ in filter_args: + # this task is the next task's data source + data = FilterThread(filter_, data) + tasks.append(data) + + tasks[-1]._queue = None + + # start all threads (no more than max_threads at once) + tasks_started = 0 + while tasks_started < len(tasks): + if active_tasks() < max_tasks: + tasks[tasks_started].start() + print 'starting task', tasks_started + tasks_started += 1 + + # wait for all threads and call done + for task in tasks: + print 'joining', task + task.join() + print task, 'joined' + task.done() \ No newline at end of file