Merge branch 'master' of github.com:sunlightlabs/oyster

commit 8571042b05
@@ -3,9 +3,10 @@ oyster changelog
 0.3.3
 -----
-
+**2012-04-16**
+
 * S3 storage backend bugfix
 * improvements to signal script
-* oyster.ext additions
+* oyster.ext cloudsearch and superfastmatch
 
 0.3.2
 -----
@@ -37,7 +37,8 @@ class Kernel(object):
         self.db.status.insert({'update_queue': 0})
 
         # ensure an index on _random
-        self.db.tracked.ensure_index([('_random', pymongo.ASCENDING)])
+        self.db.tracked.ensure_index('_random')
+        self.db.tracked.ensure_index('url')
 
         self.scraper = scrapelib.Scraper(user_agent=user_agent,
                                          requests_per_minute=rpm,
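A note on the index change above: in pymongo, passing a bare string to ensure_index is shorthand for a single-key ascending index, so the new call builds the same index as the old list-of-tuples form. A minimal sketch under that assumption (pymongo 2.x-era API, as this codebase uses; ensure_index was later superseded by create_index):

    import pymongo

    db = pymongo.Connection()['oyster']   # 2.x-era connection; assumes a local mongod

    # equivalent single-key ascending indexes: a bare key name is shorthand
    # for [(key, pymongo.ASCENDING)]
    db.tracked.ensure_index('_random')
    db.tracked.ensure_index([('_random', pymongo.ASCENDING)])

    # the list form is only required for compound or descending indexes
    db.tracked.ensure_index([('url', pymongo.ASCENDING),
                             ('doc_class', pymongo.DESCENDING)])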
@@ -103,8 +104,7 @@ class Kernel(object):
 
         if id:
             tracked = self.db.tracked.find_one({'_id': id})
-
-        if not tracked:
+        else:
             tracked = self.db.tracked.find_one({'url': url})
 
         # if id exists, ensure that URL and doc_class haven't changed
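This hunk is a behavioral tightening, not just cleanup: before, an _id lookup that found nothing fell through to a second lookup by URL; after, the URL lookup runs only when no id is given at all. A standalone sketch of the resulting control flow (the helper name is invented; the queries match the hunk):

    def find_tracked(db, id=None, url=None):
        # id and url lookups are now mutually exclusive: a stale or wrong id
        # returns None instead of silently matching a document by URL
        if id:
            return db.tracked.find_one({'_id': id})
        else:
            return db.tracked.find_one({'url': url})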
@@ -1,60 +1,16 @@
-import json
-import requests
+# needed so we can import cloudsearch
+from __future__ import absolute_import
 
 from celery.task.base import Task
 
 from ..core import kernel
 from ..conf import settings
 
-class CloudSearch(object):
-
-    # slightly under 5MB
-    MAX_BYTES = 5240000
-
-    def __init__(self, name, domainid, flush_every=None):
-        self.doc_url = 'http://doc-{0}-{1}.us-east-1.cloudsearch.amazonaws.com/2011-02-01/documents/batch'.format(name, domainid)
-        self.search_url = 'http://search-{0}-{1}.us-east-1.cloudsearch.amazonaws.com/2011-02-01/search'.format(name, domainid)
-
-        self._current_batch = []
-        self._current_size = 0
-        self._flush_every = flush_every
-
-
-    def flush(self):
-        print 'flushing!'
-        if self._current_batch:
-            payload = '[{0}]'.format(','.join(self._current_batch))
-            resp = requests.post(self.doc_url, payload,
-                                 headers={'content-type': 'application/json'})
-            self._current_batch = []
-            self._current_size = 0
-            if resp.status_code >= 400:
-                # http://docs.amazonwebservices.com/cloudsearch/latest/developerguide/DocumentsBatch.html
-                raise Exception('{0}: {1}'.format(resp.status_code, resp.text))
-
-    def add_document(self, id, **kwargs):
-        newdoc = {'type': 'add', 'version': 1, 'lang': 'en',
-                  'id': id, 'fields': kwargs}
-        newjson = json.dumps(newdoc)
-        newsize = len(newjson)
-
-        if (self._current_size + newsize > self.MAX_BYTES or
-            (self._flush_every and
-             len(self._current_batch) > self._flush_every)):
-            self.flush()
-
-        self._current_batch.append(json.dumps(newdoc))
-        self._current_size += newsize
-
-    def search_by_expr(self, q, bq=None, size=10, start=0):
-        " http://docs.amazonwebservices.com/cloudsearch/latest/developerguide/SearchAPI.html "
-        params = {'q': q, 'size': size, 'start': start}
-        if bq:
-            params['bq'] = bq
-        return requests.get(self.search_url, params=params).text
-
+from cloudsearch import CloudSearch
 
 cs = CloudSearch(settings.CLOUDSEARCH_DOMAIN, settings.CLOUDSEARCH_ID, 20)
 
 
 class CloudSearchPush(Task):
     """ task that updates documents """
     # results go straight to database
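The rewrite drops the inline client in favor of the standalone cloudsearch package; the `from __future__ import absolute_import` line is what lets a module named oyster/ext/cloudsearch.py import that top-level package rather than importing itself. For reference, a hypothetical use of the removed batching client (the domain name and id are made up; the methods are the ones deleted above):

    # documents accumulate in _current_batch until the payload nears the
    # ~5MB CloudSearch batch limit (MAX_BYTES) or flush_every docs are
    # queued, then get POSTed to the documents/batch endpoint as one request
    cs = CloudSearch('docs', 'abc123', flush_every=20)
    cs.add_document('doc-1', text='full text of first document')
    cs.add_document('doc-2', text='full text of second document')
    cs.flush()   # push whatever remains in the current batch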
oyster/ext/superfastmatch.py (new file, 23 lines)
@@ -0,0 +1,23 @@
+# needed so we can import superfastmatch.Client
+from __future__ import absolute_import
+from celery.task.base import Task
+
+from ..core import kernel
+from ..conf import settings
+
+from superfastmatch import Client
+
+sfm = Client(settings.SUPERFASTMATCH_URL)
+
+
+class SuperFastMatchPush(Task):
+    """ task that pushes documents to SFM """
+
+    # results go straight to database
+    ignore_result = True
+
+    def run(self, doc_id):
+        doc = kernel.db.tracked.find_one({'_id': doc_id})
+        text = kernel.extract_text(doc)
+        doctype, docid = settings.SUPERFASTMATCH_ID_FUNC(doc_id)
+        sfm.add(doctype, docid, text, defer=True, **doc['metadata'])
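A hypothetical dispatch of this task (Celery 2.x-era API, matching the celery.task.base import above; the id value is invented):

    from oyster.ext.superfastmatch import SuperFastMatchPush

    # queues the task; a Celery worker then runs run(doc_id), which extracts
    # the tracked document's text and pushes it to the configured
    # Superfastmatch server along with the document's metadata
    SuperFastMatchPush.delay('example-doc-id')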