# Provenance (from the original patch header):
#   commit 7b096d76d079ceab6fdef5a7991b10367ff8e00f
#   From: James Turk, Fri, 13 Apr 2012 13:14:57 -0400
#   Subject: [PATCH] add oyster.ext.cloudsearch
#   The same patch also creates an empty oyster/ext/__init__.py.
"""oyster.ext.cloudsearch: push tracked documents to Amazon CloudSearch.

Contains a small batching client (``CloudSearch``) and a celery task
(``CloudSearchPush``) that splits a document's extracted text into pieces
and queues each piece for upload.
"""
import json

import requests
from celery.task.base import Task

from ..core import kernel


class CloudSearch(object):
    """Batching client for one CloudSearch domain (2011-02-01 API).

    Documents are queued with ``add_document`` and sent as a single JSON
    array by ``flush``.  ``add_document`` flushes automatically before the
    queued payload would grow past ``MAX_BYTES``.
    """

    # slightly under 5MB
    MAX_BYTES = 5240000

    def __init__(self, name, domainid):
        """Build the document and search endpoint URLs for a domain.

        :param name: domain name, interpolated into the endpoint host name
        :param domainid: domain id, interpolated into the endpoint host name
        """
        self.doc_url = (
            'http://doc-{0}-{1}.us-east-1.cloudsearch.amazonaws.com'
            '/2011-02-01/documents/batch'
        ).format(name, domainid)
        self.search_url = (
            'http://search-{0}-{1}.us-east-1.cloudsearch.amazonaws.com'
            '/2011-02-01/search'
        ).format(name, domainid)

        # JSON-encoded documents waiting to be sent, and their accumulated
        # size (each entry counted with one separator character).
        self._current_batch = []
        self._current_size = 0

    def flush(self):
        """POST all queued documents as one batch and empty the queue.

        Does nothing when the queue is empty.  The queue is cleared after
        the request returns regardless of the HTTP status; a status of 400
        or above is only reported on stdout, so a rejected batch is dropped.
        """
        if not self._current_batch:
            return

        payload = '[{0}]'.format(','.join(self._current_batch))
        resp = requests.post(self.doc_url, payload,
                             headers={'content-type': 'application/json'})
        self._current_batch = []
        self._current_size = 0
        if resp.status_code >= 400:
            # http://docs.amazonwebservices.com/cloudsearch/latest/developerguide/DocumentsBatch.html
            # Single-argument form prints "<status> <text>" on Python 2 and 3.
            print('%s %s' % (resp.status_code, resp.text))

    def add_document(self, id, **kwargs):
        """Queue an ``add`` operation for one document.

        :param id: document id sent to CloudSearch
        :param kwargs: field names and values stored under ``fields``

        If adding the document would push the queued payload past
        ``MAX_BYTES`` the current queue is flushed first.  A single document
        larger than ``MAX_BYTES`` is still queued on its own.
        """
        newdoc = {'type': 'add', 'version': 1, 'lang': 'en',
                  'id': id, 'fields': kwargs}
        newjson = json.dumps(newdoc)
        # Count one extra character per document for the ',' that joins
        # it to its neighbours in the payload built by flush().
        newsize = len(newjson) + 1

        # Payload length is sum(len(doc)) + (n - 1) commas + 2 brackets,
        # i.e. the accumulated size plus one.
        if self._current_size + newsize + 1 > self.MAX_BYTES:
            self.flush()

        # Reuse the string serialized above instead of encoding twice.
        self._current_batch.append(newjson)
        self._current_size += newsize

    def search_by_expr(self, q, bq=None, size=10, start=0):
        """Run a search request and return the raw response text.

        http://docs.amazonwebservices.com/cloudsearch/latest/developerguide/SearchAPI.html

        :param q: value of the ``q`` query parameter
        :param bq: value of the ``bq`` query parameter; omitted when falsy
        :param size: value of the ``size`` query parameter
        :param start: value of the ``start`` query parameter
        :returns: ``.text`` of the HTTP response, not parsed
        """
        params = {'q': q, 'size': size, 'start': start}
        if bq:
            params['bq'] = bq
        return requests.get(self.search_url, params=params).text


class CloudSearchPush(Task):
    """ task that updates documents """
    # results go straight to database
    ignore_result = True

    # a bit under 1MB
    MAX_BYTES = 1048000
    # One client shared by every instance of the task (class attribute).
    cs = CloudSearch('openstates-billtext', '57xota2lexdoaymh5l56sh3vdm')

    def run(self, doc_id):
        """Split a tracked document's text and queue the pieces for upload.

        :param doc_id: ``_id`` of the document in ``kernel.db.tracked``

        Each piece is queued under the id ``<lowercased doc_id>_<index>``
        with the document's ``metadata`` entries passed as extra fields.
        """
        doc = kernel.db.tracked.find_one({'_id': doc_id})
        text = kernel.extract_text(doc)

        # MAX_BYTES and cs are class attributes, so they must be reached
        # through self; bare names would raise NameError at call time.
        step = self.MAX_BYTES
        # range() rather than xrange(): the offset list is tiny and the
        # name exists on both Python 2 and 3.
        pieces = [text[i:i + step] for i in range(0, len(text), step)]

        for i, piece in enumerate(pieces):
            cloud_id = '%s_%s' % (doc_id.lower(), i)
            self.cs.add_document(cloud_id, text=piece, **doc['metadata'])
        # NOTE(review): nothing here calls self.cs.flush(), so queued pieces
        # are only sent once the shared batch nears CloudSearch.MAX_BYTES.
        # Presumably this is intentional cross-task batching - confirm, since
        # a trailing partial batch would otherwise never be uploaded.