logging and flush_every on cloud search
parent b60ee969ca
commit 8510a30175
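This commit adds a flush_every option to CloudSearch (flush the pending batch once it holds more than flush_every documents, in addition to the existing MAX_BYTES size check) and debug logging to the push task and scheduler. A minimal usage sketch of the new knob; the domain and id strings are placeholders, and only add_document/flush as they appear in the diff are assumed:

cs = CloudSearch('example-domain', 'abc123', flush_every=20)
for n in range(100):
    # each call batches one document; the batch is pushed whenever
    # it grows past 20 docs (or the ~5MB byte cap, if sooner)
    cs.add_document('doc_%s' % n, text='...')
cs.flush()  # push the final partial batch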
@@ -11,12 +11,13 @@ class CloudSearch(object):
     # slightly under 5MB
     MAX_BYTES = 5240000
 
-    def __init__(self, name, domainid):
+    def __init__(self, name, domainid, flush_every=None):
         self.doc_url = 'http://doc-{0}-{1}.us-east-1.cloudsearch.amazonaws.com/2011-02-01/documents/batch'.format(name, domainid)
         self.search_url = 'http://search-{0}-{1}.us-east-1.cloudsearch.amazonaws.com/2011-02-01/search'.format(name, domainid)
 
         self._current_batch = []
         self._current_size = 0
+        self._flush_every = flush_every
 
 
     def flush(self):
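flush() itself is outside this hunk; for orientation, a hedged sketch of a body consistent with the state set up above. The real implementation, and the exact payload CloudSearch's 2011-02-01 batch endpoint expects, are assumptions here, not something this diff shows:

import requests

# hypothetical flush(): POST the accumulated JSON docs to doc_url,
# then reset the counters __init__ established
def flush(self):
    if not self._current_batch:
        return
    payload = '[%s]' % ','.join(self._current_batch)
    requests.post(self.doc_url, data=payload,
                  headers={'Content-Type': 'application/json'})
    self._current_batch = []
    self._current_size = 0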
@@ -36,7 +37,9 @@ class CloudSearch(object):
         newjson = json.dumps(newdoc)
         newsize = len(newjson)
 
-        if self._current_size + newsize > self.MAX_BYTES:
+        if ((self._current_size + newsize > self.MAX_BYTES) or
+            (self._flush_every and
+             len(self._current_batch) > self._flush_every)):
             self.flush()
 
         self._current_batch.append(json.dumps(newdoc))
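The flush trigger in isolation, as a self-contained sketch (the function and its names are hypothetical, mirroring the condition above):

MAX_BYTES = 5240000  # the class constant from the hunk above

def should_flush(current_size, new_size, batch_len, flush_every=None):
    # mirrors the condition above: flush on the byte budget, or on
    # document count when flush_every is set
    return bool(current_size + new_size > MAX_BYTES or
                (flush_every and batch_len > flush_every))

assert should_flush(5200000, 100000, 3)         # byte cap wins
assert should_flush(0, 10, 21, flush_every=20)  # doc cap wins
assert not should_flush(0, 10, 21)              # no flush_every set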
@@ -49,6 +52,7 @@ class CloudSearch(object):
         params['bq'] = bq
         return requests.get(self.search_url, params=params).text
 
+cs = CloudSearch(settings.CLOUDSEARCH_DOMAIN, settings.CLOUDSEARCH_ID, 20)
 
 class CloudSearchPush(Task):
     """ task that updates documents """
@@ -57,7 +61,6 @@ class CloudSearchPush(Task):
 
     # a bit under 1MB
     MAX_BYTES = 1048000
-    cs = CloudSearch(settings.CLOUDSEARCH_DOMAIN, settings.CLOUDSEARCH_ID)
 
     def run(self, doc_id):
         doc = kernel.db.tracked.find_one({'_id': doc_id})
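Taken together, the two hunks above move the client off the Task class and onto the module, so every CloudSearchPush run in a worker process shares one batch (and picks up the new flush_every=20). The before/after of that design choice, condensed:

# before: one client hung off the Task class, reached through self
class CloudSearchPush(Task):
    cs = CloudSearch(settings.CLOUDSEARCH_DOMAIN, settings.CLOUDSEARCH_ID)

# after: one module-level client shared by all task runs in the
# worker process, created with the new flush_every=20
cs = CloudSearch(settings.CLOUDSEARCH_DOMAIN, settings.CLOUDSEARCH_ID, 20)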
@@ -65,6 +68,8 @@ class CloudSearchPush(Task):
         pieces = [text[i:i+self.MAX_BYTES] for i in
                   xrange(0, len(text), self.MAX_BYTES)]
 
+        self.get_logger().debug('adding {0} pieces for {1}'.format(
+            len(pieces), doc_id))
         for i, piece in enumerate(pieces):
             cloud_id = '%s_%s' % (doc_id.lower(), i)
-            self.cs.add_document(cloud_id, text=piece, **doc['metadata'])
+            cs.add_document(cloud_id, text=piece, **doc['metadata'])
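The piece-splitting above is a plain fixed-width slice over the document text; a standalone illustration (Python 2, matching the xrange in the hunk):

text = 'x' * 2500
MAX_BYTES = 1000  # stand-in for CloudSearchPush.MAX_BYTES
pieces = [text[i:i + MAX_BYTES]
          for i in xrange(0, len(text), MAX_BYTES)]
assert [len(p) for p in pieces] == [1000, 1000, 500]
# each piece gets its own cloud_id: 'docid_0', 'docid_1', 'docid_2'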
@@ -27,10 +27,18 @@ class UpdateTaskScheduler(PeriodicTask):
         # if the update queue isn't empty, wait to add more
         # (currently the only way we avoid duplicates)
         # alternate option would be to set a _queued flag on documents
-        if kernel.db.status.find_one()['update_queue']:
+        update_queue_size = kernel.db.status.find_one()['update_queue']
+        if update_queue_size:
+            self.get_logger().debug('waiting, update_queue_size={0}'.format(
+                update_queue_size))
             return
 
         next_set = kernel.get_update_queue()
+        if next_set:
+            self.get_logger().debug('repopulating update_queue')
+        else:
+            self.get_logger().debug('kernel.update_queue empty')
+
         for doc in next_set:
             UpdateTask.delay(doc['_id'])
             kernel.db.status.update({}, {'$inc': {'update_queue': 1}})
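The scheduler leans on a single status document whose update_queue field counts queued docs: it is $inc'd here for every task queued, and presumably decremented as each UpdateTask completes, which is what eventually lets the guard at the top pass again. The assumed round-trip (the decrement is not part of this diff):

# enqueue (this file): one increment per queued document
kernel.db.status.update({}, {'$inc': {'update_queue': 1}})

# scheduler guard (this file): skip the run while work is pending
update_queue_size = kernel.db.status.find_one()['update_queue']

# worker side (assumed, not shown here): decrement on completion
kernel.db.status.update({}, {'$inc': {'update_queue': -1}})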