From 415dc44af225dac6564f4fe68e761c48892ce614 Mon Sep 17 00:00:00 2001 From: James Turk Date: Fri, 5 Aug 2011 15:32:44 -0400 Subject: [PATCH] celery tasks replacing oysterd --- oyster/tasks.py | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 oyster/tasks.py diff --git a/oyster/tasks.py b/oyster/tasks.py new file mode 100644 index 0000000..79b1197 --- /dev/null +++ b/oyster/tasks.py @@ -0,0 +1,39 @@ +from celery.task import task +from celery.task.base import PeriodicTask +from celery.task.schedules import schedule +from celery.execute import send_task + +from oyster.client import Client + +client = Client() +client.db.status.drop() +client.db.status.insert({'update_queue': 0}) + + +@task(ignore_result=True) +def update_task(doc_id): + # maybe fetch doc instead? + doc = client.db.tracked.find_one({'_id': doc_id}) + client.update(doc) + for hook in doc.get('post_update_hooks', []): + send_task(hook, (doc_id,)) + client.db.status.update({}, {'$inc': {'update_queue': -1}}) + + +class UpdateTaskScheduler(PeriodicTask): + + # 60s tick + run_every = 60 + + def run(self): + + # if the update queue isn't empty, wait to add more + # (currently the only way we avoid duplicates) + # alternate option would be to set a _queued flag on documents + if client.db.status.find_one()['update_queue']: + pass + + next_set = client.get_update_queue() + for doc in next_set: + update_task.delay(doc['_id']) + client.db.status.update({}, {'$inc': {'update_queue': 1}})