fix for celery tasks to use configured client
This commit is contained in:
parent
c93a8d7b3b
commit
67efb9b681
@ -22,3 +22,8 @@ Requirements
|
|||||||
* mongodb 1.8
|
* mongodb 1.8
|
||||||
* pymongo 1.11
|
* pymongo 1.11
|
||||||
* scrapelib
|
* scrapelib
|
||||||
|
|
||||||
|
Usage
|
||||||
|
=====
|
||||||
|
|
||||||
|
* Run celeryd with beat ``celeryd -B``
|
||||||
|
8
oyster/celeryconfig.py
Normal file
8
oyster/celeryconfig.py
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
CELERY_IMPORTS = ("oyster.tasks",)
|
||||||
|
|
||||||
|
BROKER_TRANSPORT = 'mongodb'
|
||||||
|
CELERY_RESULT_BACKEND = 'mongodb'
|
||||||
|
CELERY_MONGODB_BACKEND_SETTINGS = {
|
||||||
|
'host': 'localhost',
|
||||||
|
'port': 27017,
|
||||||
|
}
|
@ -10,5 +10,3 @@ REQUESTS_PER_MINUTE = 300
|
|||||||
REQUEST_TIMEOUT = 0
|
REQUEST_TIMEOUT = 0
|
||||||
RETRY_ATTEMPTS = 0
|
RETRY_ATTEMPTS = 0
|
||||||
RETRY_WAIT_SECONDS = 5
|
RETRY_WAIT_SECONDS = 5
|
||||||
|
|
||||||
|
|
||||||
|
@ -10,16 +10,16 @@ class UpdateTask(Task):
|
|||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
# one client per process
|
# one client per process
|
||||||
client = get_configured_client()
|
self.client = get_configured_client()
|
||||||
|
|
||||||
|
|
||||||
def run(self, doc_id):
|
def run(self, doc_id):
|
||||||
# maybe fetch doc instead?
|
# maybe fetch doc instead?
|
||||||
doc = client.db.tracked.find_one({'_id': doc_id})
|
doc = self.client.db.tracked.find_one({'_id': doc_id})
|
||||||
client.update(doc)
|
self.client.update(doc)
|
||||||
for hook in doc.get('post_update_hooks', []):
|
for hook in doc.get('post_update_hooks', []):
|
||||||
send_task(hook, (doc_id,))
|
send_task(hook, (doc_id,))
|
||||||
client.db.status.update({}, {'$inc': {'update_queue': -1}})
|
self.client.db.status.update({}, {'$inc': {'update_queue': -1}})
|
||||||
|
|
||||||
|
|
||||||
class UpdateTaskScheduler(PeriodicTask):
|
class UpdateTaskScheduler(PeriodicTask):
|
||||||
@ -27,15 +27,18 @@ class UpdateTaskScheduler(PeriodicTask):
|
|||||||
# 60s tick
|
# 60s tick
|
||||||
run_every = 60
|
run_every = 60
|
||||||
|
|
||||||
def run(self):
|
def __init__(self):
|
||||||
|
self.client = get_configured_client()
|
||||||
|
|
||||||
|
|
||||||
|
def run(self):
|
||||||
# if the update queue isn't empty, wait to add more
|
# if the update queue isn't empty, wait to add more
|
||||||
# (currently the only way we avoid duplicates)
|
# (currently the only way we avoid duplicates)
|
||||||
# alternate option would be to set a _queued flag on documents
|
# alternate option would be to set a _queued flag on documents
|
||||||
if client.db.status.find_one()['update_queue']:
|
if self.client.db.status.find_one()['update_queue']:
|
||||||
return
|
return
|
||||||
|
|
||||||
next_set = client.get_update_queue()
|
next_set = self.client.get_update_queue()
|
||||||
for doc in next_set:
|
for doc in next_set:
|
||||||
update_task.delay(doc['_id'])
|
update_task.delay(doc['_id'])
|
||||||
client.db.status.update({}, {'$inc': {'update_queue': 1}})
|
self.client.db.status.update({}, {'$inc': {'update_queue': 1}})
|
||||||
|
Loading…
Reference in New Issue
Block a user