diff --git a/README.rst b/README.rst
index 0997e92..11d92b9 100644
--- a/README.rst
+++ b/README.rst
@@ -22,3 +22,8 @@ Requirements
 * mongodb 1.8
 * pymongo 1.11
 * scrapelib
+
+Usage
+=====
+
+* Run celeryd with beat ``celeryd -B``
diff --git a/oyster/celeryconfig.py b/oyster/celeryconfig.py
new file mode 100644
index 0000000..f64f8c9
--- /dev/null
+++ b/oyster/celeryconfig.py
@@ -0,0 +1,8 @@
+CELERY_IMPORTS = ("oyster.tasks",)
+
+BROKER_TRANSPORT = 'mongodb'
+CELERY_RESULT_BACKEND = 'mongodb'
+CELERY_MONGODB_BACKEND_SETTINGS = {
+    'host': 'localhost',
+    'port': 27017,
+}
diff --git a/oyster/conf/default_settings.py b/oyster/conf/default_settings.py
index ddd7b77..0bc138f 100644
--- a/oyster/conf/default_settings.py
+++ b/oyster/conf/default_settings.py
@@ -10,5 +10,3 @@ REQUESTS_PER_MINUTE = 300
 REQUEST_TIMEOUT = 0
 RETRY_ATTEMPTS = 0
 RETRY_WAIT_SECONDS = 5
-
-
diff --git a/oyster/tasks.py b/oyster/tasks.py
index e483639..4758e12 100644
--- a/oyster/tasks.py
+++ b/oyster/tasks.py
@@ -10,16 +10,16 @@ class UpdateTask(Task):
 
     def __init__(self):
         # one client per process
-        client = get_configured_client()
+        self.client = get_configured_client()
 
     def run(self, doc_id):
         # maybe fetch doc instead?
-        doc = client.db.tracked.find_one({'_id': doc_id})
-        client.update(doc)
+        doc = self.client.db.tracked.find_one({'_id': doc_id})
+        self.client.update(doc)
 
         for hook in doc.get('post_update_hooks', []):
             send_task(hook, (doc_id,))
 
-        client.db.status.update({}, {'$inc': {'update_queue': -1}})
+        self.client.db.status.update({}, {'$inc': {'update_queue': -1}})
 
 class UpdateTaskScheduler(PeriodicTask):
@@ -27,15 +27,18 @@
     # 60s tick
     run_every = 60
 
-    def run(self):
+    def __init__(self):
+        self.client = get_configured_client()
+
+    def run(self):
         # if the update queue isn't empty, wait to add more
         # (currently the only way we avoid duplicates)
         # alternate option would be to set a _queued flag on documents
-        if client.db.status.find_one()['update_queue']:
+        if self.client.db.status.find_one()['update_queue']:
             return
 
-        next_set = client.get_update_queue()
+        next_set = self.client.get_update_queue()
         for doc in next_set:
             update_task.delay(doc['_id'])
 
-        client.db.status.update({}, {'$inc': {'update_queue': 1}})
+        self.client.db.status.update({}, {'$inc': {'update_queue': 1}})