improve how storage is handled
This commit is contained in:
parent
39e812c891
commit
86ea27c62b
@ -15,3 +15,5 @@ REQUEST_TIMEOUT = 300
|
||||
# other
|
||||
RETRY_ATTEMPTS = 3
|
||||
RETRY_WAIT_MINUTES = 60
|
||||
|
||||
DOCUMENT_CLASSES = {}
|
||||
|
@ -7,9 +7,7 @@ import urllib
|
||||
import pymongo
|
||||
import scrapelib
|
||||
|
||||
from .storage.gridfs import GridFSStorage
|
||||
from .storage.s3 import S3Storage
|
||||
|
||||
from .storage import engines
|
||||
|
||||
class Kernel(object):
|
||||
""" oyster's workhorse, handles tracking """
|
||||
@ -17,7 +15,9 @@ class Kernel(object):
|
||||
def __init__(self, mongo_host='localhost', mongo_port=27017,
|
||||
mongo_db='oyster', mongo_log_maxsize=100000000,
|
||||
user_agent='oyster', rpm=60, timeout=300,
|
||||
retry_attempts=3, retry_wait_minutes=60):
|
||||
retry_attempts=3, retry_wait_minutes=60,
|
||||
doc_classes=None,
|
||||
):
|
||||
"""
|
||||
configurable for ease of testing, only one should be instantiated
|
||||
"""
|
||||
@ -28,11 +28,9 @@ class Kernel(object):
|
||||
self.db.create_collection('logs', capped=True,
|
||||
size=mongo_log_maxsize)
|
||||
except pymongo.errors.CollectionInvalid:
|
||||
# cap collection if not capped?
|
||||
pass
|
||||
|
||||
# create storage class
|
||||
self.storage = S3Storage(self)
|
||||
|
||||
# create status document if it doesn't exist
|
||||
if self.db.status.count() == 0:
|
||||
self.db.status.insert({'update_queue': 0})
|
||||
@ -49,7 +47,19 @@ class Kernel(object):
|
||||
self.retry_attempts = retry_attempts
|
||||
self.retry_wait_minutes = retry_wait_minutes
|
||||
|
||||
self.doc_classes = {}
|
||||
# load engines
|
||||
self.storage = {}
|
||||
for name, StorageCls in engines.iteritems():
|
||||
self.storage[name] = StorageCls(self)
|
||||
|
||||
# set document classes
|
||||
_doc_class_fields = ('update_mins', 'storage_engine')
|
||||
self.doc_classes = doc_classes or {}
|
||||
for dc_name, dc_props in self.doc_classes.iteritems():
|
||||
for key in _doc_class_fields:
|
||||
if key not in dc_props:
|
||||
raise ValueError('doc_class %s missing key %s' %
|
||||
(dc_name, key))
|
||||
|
||||
|
||||
def _wipe(self):
|
||||
@ -69,10 +79,7 @@ class Kernel(object):
|
||||
|
||||
|
||||
def _add_doc_class(self, doc_class, **properties):
|
||||
if doc_class in self.doc_classes:
|
||||
raise ValueError('attempt to re-add doc_class: %s' % doc_class)
|
||||
else:
|
||||
self.doc_classes[doc_class] = properties
|
||||
self.doc_classes[doc_class] = properties
|
||||
|
||||
|
||||
def track_url(self, url, doc_class, **kwargs):
|
||||
@ -128,8 +135,14 @@ class Kernel(object):
|
||||
new_version = True
|
||||
error = False
|
||||
now = datetime.datetime.utcnow()
|
||||
# FIXME
|
||||
update_mins = self.doc_classes[doc['doc_class']].get('update_mins', 60)
|
||||
|
||||
try:
|
||||
doc_class = self.doc_classes[doc['doc_class']]
|
||||
except KeyError:
|
||||
raise ValueError('unregistered doc_class %s' % doc['doc_class'])
|
||||
|
||||
update_mins = doc_class['update_mins']
|
||||
storage = self.storage[doc_class['storage_engine']]
|
||||
|
||||
# fetch strategies could be implemented here as well
|
||||
try:
|
||||
@ -142,18 +155,15 @@ class Kernel(object):
|
||||
|
||||
# only do versioning check if at least one version exists
|
||||
if new_version and doc['versions']:
|
||||
# room here for different versioning schemes:
|
||||
# versioning functions take doc & data, and return True if data is
|
||||
# different, since they have access to doc, they can also modify
|
||||
# certain attributes as needed
|
||||
olddata = self.storage.get(doc['versions'][-1]['storage_key'])
|
||||
# room here for different versioning schemes
|
||||
olddata = storage.get(doc['versions'][-1]['storage_key'])
|
||||
new_version = self.md5_versioning(olddata, newdata)
|
||||
|
||||
if new_version:
|
||||
storage_id = self.storage.put(doc, newdata, content_type)
|
||||
storage_id = storage.put(doc, newdata, content_type)
|
||||
doc['versions'].append({'timestamp': now,
|
||||
'storage_key': storage_id,
|
||||
'storage_type': self.storage.storage_type,
|
||||
'storage_type': storage.storage_type,
|
||||
})
|
||||
|
||||
if error:
|
||||
@ -223,6 +233,8 @@ def _get_configured_kernel():
|
||||
rpm=settings.REQUESTS_PER_MINUTE,
|
||||
timeout=settings.REQUEST_TIMEOUT,
|
||||
retry_attempts=settings.RETRY_ATTEMPTS,
|
||||
retry_wait_minutes=settings.RETRY_WAIT_MINUTES)
|
||||
retry_wait_minutes=settings.RETRY_WAIT_MINUTES,
|
||||
doc_classes=settings.DOCUMENT_CLASSES,
|
||||
)
|
||||
|
||||
kernel = _get_configured_kernel()
|
||||
|
@ -0,0 +1,19 @@
|
||||
|
||||
# Registry of the storage backends that could be imported, keyed by the
# short name a document class names in its 'storage_engine' setting.
#
# Every backend is loaded on a best-effort basis: if importing its module
# raises ImportError (presumably because a dependency of that backend is not
# installed -- confirm per backend), the engine is simply left out of the
# registry instead of breaking the import of this package.  The reason is
# logged at debug level so that a later lookup failure for a missing engine
# can be traced back to the import that failed.
import logging

_log = logging.getLogger(__name__)

engines = {}

try:
    from .dummy import DummyStorage
    engines['dummy'] = DummyStorage
except ImportError as exc:
    _log.debug('storage engine %r unavailable: %s', 'dummy', exc)

try:
    from .s3 import S3Storage
    engines['s3'] = S3Storage
except ImportError as exc:
    _log.debug('storage engine %r unavailable: %s', 's3', exc)

try:
    from .gridfs import GridFSStorage
    engines['gridfs'] = GridFSStorage
except ImportError as exc:
    _log.debug('storage engine %r unavailable: %s', 'gridfs', exc)
|
@ -11,11 +11,14 @@ from oyster.core import Kernel
|
||||
class KernelTests(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.kernel = Kernel(mongo_db='oyster_test', retry_wait_minutes=1/60.)
|
||||
doc_classes = {'default':
|
||||
{'update_mins': 30, 'storage_engine': 'dummy'},
|
||||
'fast-update':
|
||||
{'update_mins': 1/60., 'storage_engine': 'dummy'},
|
||||
}
|
||||
self.kernel = Kernel(mongo_db='oyster_test', retry_wait_minutes=1/60.,
|
||||
doc_classes=doc_classes)
|
||||
self.kernel._wipe()
|
||||
self.kernel._add_doc_class('default', update_mins=30)
|
||||
self.kernel._add_doc_class('fast-update', update_mins=0.01)
|
||||
|
||||
|
||||
def test_constructor(self):
|
||||
c = Kernel('127.0.0.1', 27017, 'testdb', mongo_log_maxsize=5000,
|
||||
@ -31,6 +34,9 @@ class KernelTests(TestCase):
|
||||
assert c.scraper.requests_per_minute == 30
|
||||
assert c.scraper.timeout == 60
|
||||
|
||||
# ensure that a bad document class raises an error
|
||||
assert_raises(ValueError, Kernel, doc_classes={'bad-doc': {}})
|
||||
|
||||
|
||||
def test_log(self):
|
||||
self.kernel.log('action1', 'http://example.com')
|
||||
|
Loading…
Reference in New Issue
Block a user