From 8510a3017540128578b3d6cfc88839a8fb1c326d Mon Sep 17 00:00:00 2001
From: James Turk
Date: Fri, 13 Apr 2012 23:41:49 -0400
Subject: [PATCH] logging and flush_every on cloud search

---
 oyster/ext/cloudsearch.py | 13 +++++++++----
 oyster/tasks.py           | 10 +++++++++-
 2 files changed, 18 insertions(+), 5 deletions(-)

diff --git a/oyster/ext/cloudsearch.py b/oyster/ext/cloudsearch.py
index 1a9459c..afa7720 100644
--- a/oyster/ext/cloudsearch.py
+++ b/oyster/ext/cloudsearch.py
@@ -11,12 +11,13 @@ class CloudSearch(object):
 
     # slightly under 5MB
     MAX_BYTES = 5240000
 
-    def __init__(self, name, domainid):
+    def __init__(self, name, domainid, flush_every=None):
         self.doc_url = 'http://doc-{0}-{1}.us-east-1.cloudsearch.amazonaws.com/2011-02-01/documents/batch'.format(name, domainid)
         self.search_url = 'http://search-{0}-{1}.us-east-1.cloudsearch.amazonaws.com/2011-02-01/search'.format(name, domainid)
         self._current_batch = []
         self._current_size = 0
+        self._flush_every = flush_every
 
 
     def flush(self):
@@ -36,7 +37,9 @@
         newjson = json.dumps(newdoc)
         newsize = len(newjson)
 
-        if self._current_size + newsize > self.MAX_BYTES:
+        if ((self._current_size + newsize > self.MAX_BYTES) or
+            (self._flush_every and len(self._current_batch) > self._flush_every
+            )):
             self.flush()
 
         self._current_batch.append(json.dumps(newdoc))
@@ -49,6 +52,7 @@
             params['bq'] = bq
         return requests.get(self.search_url, params=params).text
 
+cs = CloudSearch(settings.CLOUDSEARCH_DOMAIN, settings.CLOUDSEARCH_ID, 20)
 
 class CloudSearchPush(Task):
     """ task that updates documents """
@@ -57,7 +61,6 @@
 
     # a bit under 1MB
     MAX_BYTES = 1048000
-    cs = CloudSearch(settings.CLOUDSEARCH_DOMAIN, settings.CLOUDSEARCH_ID)
 
     def run(self, doc_id):
         doc = kernel.db.tracked.find_one({'_id': doc_id})
@@ -65,6 +68,8 @@
 
         pieces = [text[i:i+self.MAX_BYTES] for i in
                   xrange(0, len(text), self.MAX_BYTES)]
+        self.get_logger().debug('adding {0} pieces for {1}'.format(
+            len(pieces), doc_id))
         for i, piece in enumerate(pieces):
             cloud_id = '%s_%s' % (doc_id.lower(), i)
-            self.cs.add_document(cloud_id, text=piece, **doc['metadata'])
+            cs.add_document(cloud_id, text=piece, **doc['metadata'])
diff --git a/oyster/tasks.py b/oyster/tasks.py
index c13f90e..873f05e 100644
--- a/oyster/tasks.py
+++ b/oyster/tasks.py
@@ -27,10 +27,18 @@
         # if the update queue isn't empty, wait to add more
         # (currently the only way we avoid duplicates)
         # alternate option would be to set a _queued flag on documents
-        if kernel.db.status.find_one()['update_queue']:
+        update_queue_size = kernel.db.status.find_one()['update_queue']
+        if update_queue_size:
+            self.get_logger().debug('waiting, update_queue_size={0}'.format(
+                update_queue_size))
             return
 
         next_set = kernel.get_update_queue()
+        if next_set:
+            self.get_logger().debug('repopulating update_queue')
+        else:
+            self.get_logger().debug('kernel.update_queue empty')
+
         for doc in next_set:
             UpdateTask.delay(doc['_id'])
             kernel.db.status.update({}, {'$inc': {'update_queue': 1}})