diff --git a/changelog.rst b/changelog.rst
index 9d1ff17..c641bd9 100644
--- a/changelog.rst
+++ b/changelog.rst
@@ -3,9 +3,10 @@ oyster changelog
 
 0.3.3
 -----
+**2012-04-16**
     * S3 storage backend bugfix
     * improvements to signal script
-    * oyster.ext additions
+    * oyster.ext cloudsearch and superfastmatch
 
 0.3.2
 -----
diff --git a/oyster/core.py b/oyster/core.py
index 4a7dfb3..3ad4ba1 100644
--- a/oyster/core.py
+++ b/oyster/core.py
@@ -37,7 +37,8 @@ class Kernel(object):
         self.db.status.insert({'update_queue': 0})
 
         # ensure an index on _random
-        self.db.tracked.ensure_index([('_random', pymongo.ASCENDING)])
+        self.db.tracked.ensure_index('_random')
+        self.db.tracked.ensure_index('url')
 
         self.scraper = scrapelib.Scraper(user_agent=user_agent,
                                          requests_per_minute=rpm,
@@ -103,8 +104,7 @@ class Kernel(object):
 
         if id:
             tracked = self.db.tracked.find_one({'_id': id})
-
-        if not tracked:
+        else:
             tracked = self.db.tracked.find_one({'url': url})
 
         # if id exists, ensure that URL and doc_class haven't changed
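
Note on the core.py hunks above: pymongo treats a bare string key as a single
ascending index, so ensure_index('_random') is equivalent to the
[('_random', pymongo.ASCENDING)] form it replaces, and the new 'url' index
backs the URL lookup in the second hunk. That hunk also tightens the control
flow: a document is now looked up by URL only when no id is supplied, rather
than whenever an id lookup comes up empty. A minimal sketch of the equivalent
index calls, assuming a pymongo 2.x database handle named db:

    import pymongo

    # a bare key name defaults to a single ascending index
    db.tracked.ensure_index('_random')  # same as [('_random', pymongo.ASCENDING)]
    db.tracked.ensure_index('url')      # supports find_one({'url': url})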
diff --git a/oyster/ext/cloudsearch.py b/oyster/ext/cloudsearch.py
index 5d3f3d7..e9bf1ce 100644
--- a/oyster/ext/cloudsearch.py
+++ b/oyster/ext/cloudsearch.py
@@ -1,60 +1,16 @@
-import json
-import requests
+# needed so we can import cloudsearch
+from __future__ import absolute_import
+
 from celery.task.base import Task
 
 from ..core import kernel
 from ..conf import settings
 
-
-class CloudSearch(object):
-
-    # slightly under 5MB
-    MAX_BYTES = 5240000
-
-    def __init__(self, name, domainid, flush_every=None):
-        self.doc_url = 'http://doc-{0}-{1}.us-east-1.cloudsearch.amazonaws.com/2011-02-01/documents/batch'.format(name, domainid)
-        self.search_url = 'http://search-{0}-{1}.us-east-1.cloudsearch.amazonaws.com/2011-02-01/search'.format(name, domainid)
-
-        self._current_batch = []
-        self._current_size = 0
-        self._flush_every = flush_every
-
-
-    def flush(self):
-        print 'flushing!'
-        if self._current_batch:
-            payload = '[{0}]'.format(','.join(self._current_batch))
-            resp = requests.post(self.doc_url, payload,
-                                 headers={'content-type': 'application/json'})
-            self._current_batch = []
-            self._current_size = 0
-            if resp.status_code >= 400:
-                # http://docs.amazonwebservices.com/cloudsearch/latest/developerguide/DocumentsBatch.html
-                raise Exception('{0}: {1}'.format(resp.status_code, resp.text))
-
-    def add_document(self, id, **kwargs):
-        newdoc = {'type': 'add', 'version': 1, 'lang': 'en',
-                  'id': id, 'fields': kwargs}
-        newjson = json.dumps(newdoc)
-        newsize = len(newjson)
-
-        if (self._current_size + newsize > self.MAX_BYTES or
-            (self._flush_every and
-             len(self._current_batch) > self._flush_every)):
-            self.flush()
-
-        self._current_batch.append(json.dumps(newdoc))
-        self._current_size += newsize
-
-    def search_by_expr(self, q, bq=None, size=10, start=0):
-        " http://docs.amazonwebservices.com/cloudsearch/latest/developerguide/SearchAPI.html "
-        params = {'q': q, 'size': size, 'start': start}
-        if bq:
-            params['bq'] = bq
-        return requests.get(self.search_url, params=params).text
+from cloudsearch import CloudSearch
 
 cs = CloudSearch(settings.CLOUDSEARCH_DOMAIN, settings.CLOUDSEARCH_ID, 20)
 
+
 class CloudSearchPush(Task):
     """ task that updates documents """
     # results go straight to database
diff --git a/oyster/ext/superfastmatch.py b/oyster/ext/superfastmatch.py
new file mode 100644
index 0000000..b36a21e
--- /dev/null
+++ b/oyster/ext/superfastmatch.py
@@ -0,0 +1,23 @@
+# needed so we can import superfastmatch.Client
+from __future__ import absolute_import
+from celery.task.base import Task
+
+from ..core import kernel
+from ..conf import settings
+
+from superfastmatch import Client
+
+sfm = Client(settings.SUPERFASTMATCH_URL)
+
+
+class SuperFastMatchPush(Task):
+    """ task that pushes documents to SFM """
+
+    # results go straight to database
+    ignore_result = True
+
+    def run(self, doc_id):
+        doc = kernel.db.tracked.find_one({'_id': doc_id})
+        text = kernel.extract_text(doc)
+        doctype, docid = settings.SUPERFASTMATCH_ID_FUNC(doc_id)
+        sfm.add(doctype, docid, text, defer=True, **doc['metadata'])
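
For reference, a minimal usage sketch for the new superfastmatch extension,
assuming celery is already configured for oyster. SUPERFASTMATCH_URL and
SUPERFASTMATCH_ID_FUNC are the settings names the module reads; the values
below are illustrative placeholders, not defaults, and the id-mapping scheme
is deployment-specific:

    # in your oyster settings module -- assumed values for illustration
    import zlib

    SUPERFASTMATCH_URL = 'http://localhost:8080'

    def SUPERFASTMATCH_ID_FUNC(doc_id):
        # map a tracked document's id to the (doctype, docid) integer pair
        # superfastmatch expects; a real deployment needs a stable scheme
        return 1, zlib.crc32(str(doc_id)) & 0x7fffffff

    # queue a push for one tracked document (doc_id is an _id from
    # kernel.db.tracked)
    from oyster.ext.superfastmatch import SuperFastMatchPush
    SuperFastMatchPush.delay(doc_id)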