From db718d3190ed2971d5bbca6b2b787a3bedd5cda7 Mon Sep 17 00:00:00 2001 From: James Turk Date: Fri, 29 Jul 2011 13:04:44 -0400 Subject: [PATCH] oysterd --- oyster/bin/daemon.py | 59 ----------------- oyster/bin/oysterd.py | 103 +++++++++++++++++++++++++++++ oyster/bin/{oyster.py => shell.py} | 0 3 files changed, 103 insertions(+), 59 deletions(-) delete mode 100644 oyster/bin/daemon.py create mode 100644 oyster/bin/oysterd.py rename oyster/bin/{oyster.py => shell.py} (100%) diff --git a/oyster/bin/daemon.py b/oyster/bin/daemon.py deleted file mode 100644 index 05728af..0000000 --- a/oyster/bin/daemon.py +++ /dev/null @@ -1,59 +0,0 @@ -import multiprocessing -import time -import urllib - -from ..client import Client - -class UpdateProcess(multiprocessing.Process): - - def __init__(self, task_q): - super(UpdateProcess, self).__init__() - self.task_q = task_q - self.client = Client() - - - def run(self): - while True: - task = self.task_q.get() - - # break on 'None' poison pill - if task is None: - self.task_q.task_done() - break - - # update tracked document - self.client.update(task) - - # decrement count for semaphore - self.task_q.task_done() - - -def main(): - num_processes = 4 - sleep_upon_exhaustion = 60 - - c = Client() - work_queue = multiprocessing.JoinableQueue() - workers = [UpdateProcess(work_queue) for i in xrange(num_processes)] - for w in workers: - w.start() - - # go forever - while True: - - # get everything overdue and put it into the queue - next_set = c.get_update_queue() - if next_set: - for item in next_set: - work_queue.put(item) - - # do all queued work - work_queue.join() - - else: - # allow for a quiet period if queue is exhausted - time.sleep(sleep_upon_exhaustion) - - -if __name__ == '__main__': - main() diff --git a/oyster/bin/oysterd.py b/oyster/bin/oysterd.py new file mode 100644 index 0000000..02e3505 --- /dev/null +++ b/oyster/bin/oysterd.py @@ -0,0 +1,103 @@ +import multiprocessing +import signal +import threading +import time +import urllib + +from oyster.client import Client +from oyster.web import app + + +class UpdateProcess(multiprocessing.Process): + + def __init__(self, task_q): + super(UpdateProcess, self).__init__() + self.task_q = task_q + self.client = Client() + + + def run(self): + while True: + task = self.task_q.get() + + # break on 'None' poison pill + if task is None: + self.task_q.task_done() + break + + # update tracked document + self.client.update(task) + + # decrement count for semaphore + self.task_q.task_done() + + +class Producer(threading.Thread): + def __init__(self, queue, sleep_length): + super(Producer, self).__init__() + self.queue = queue + self.sleep_length = sleep_length + self.client = Client() + self._stop = threading.Event() + + def run(self): + # go forever + while not self.stopped(): + + # get everything overdue and put it into the queue + next_set = self.client.get_update_queue() + if next_set: + for item in next_set: + self.queue.put(item) + + # do all queued work + self.queue.join() + + else: + # allow for a quiet period if queue is exhausted + time.sleep(self.sleep_length) + + def stop(self): + self._stop.set() + + def stopped(self): + return self._stop.is_set() + + +def main_process(): + num_processes = 4 + + work_queue = multiprocessing.JoinableQueue() + producer = Producer(work_queue, 60) + workers = [UpdateProcess(work_queue) for i in xrange(num_processes)] + for w in workers: + w.start() + producer.start() + + +def flask_process(): + app.run(debug=True) + + +def main(): + num_processes = 4 + debug = True + + work_queue = multiprocessing.JoinableQueue() + producer = Producer(work_queue, 60) + workers = [UpdateProcess(work_queue) for i in xrange(num_processes)] + server = multiprocessing.Process(target=flask_process) + + def cleanup(signal, frame): + for worker in workers: + worker.terminate() + producer.stop() + server.terminate() + + for worker in workers: + worker.start() + producer.start() + server.start() + +if __name__ == '__main__': + main() diff --git a/oyster/bin/oyster.py b/oyster/bin/shell.py similarity index 100% rename from oyster/bin/oyster.py rename to oyster/bin/shell.py