major core refactor, shift to how storage is done

This commit is contained in:
James Turk 2012-02-14 16:42:41 -05:00
parent d1101a2949
commit d66ebdd74a
5 changed files with 114 additions and 115 deletions

View File

@ -14,10 +14,13 @@ tracked - metadata for tracked resources
_id : internal id
_random : a random integer used for sorting
url : url of resource
versioning : string indicating type of versioning to be used for this
resource. currently set to md5 for all documents
update_mins : minutes between automatic updates (defaults to 24*60)
doc_class : string indicating the document class, allows for different
settings/hooks for some documents
metadata : dictionary of extra user-specified attributes
versions : list of dictionaries with the following keys:
timestamp : UTC timestamp
<storage_key> : storage_id
(may be s3_url, gridfs_id, etc.)
logs - capped log collection
action : log entry
@ -29,3 +32,9 @@ status - internal state
update_queue : size of update queue
Storage Interface
=================
storage_key : key to store on versions
put(tracked_doc, data, content_type) -> id
get(id) -> file type object

View File

@ -5,9 +5,9 @@ import sys
import urllib
import pymongo
import gridfs
import scrapelib
from .storage.gridfs import GridFSStorage
class Kernel(object):
""" oyster's workhorse, handles tracking """
@ -28,6 +28,9 @@ class Kernel(object):
except pymongo.errors.CollectionInvalid:
pass
# create storage class
self.storage = GridFSStorage(self)
# create status document if it doesn't exist
if self.db.status.count() == 0:
self.db.status.insert({'update_queue': 0})
@ -35,8 +38,6 @@ class Kernel(object):
# ensure an index on _random
self.db.tracked.ensure_index([('_random', pymongo.ASCENDING)])
self._collection_name = 'fs'
self.fs = gridfs.GridFS(self.db, self._collection_name)
self.scraper = scrapelib.Scraper(user_agent=user_agent,
requests_per_minute=rpm,
follow_robots=False,
@ -46,12 +47,12 @@ class Kernel(object):
self.retry_attempts = retry_attempts
self.retry_wait_minutes = retry_wait_minutes
self.doc_classes = {}
def _wipe(self):
""" exists primarily for debug use, wipes entire db """
self.db.drop_collection('tracked')
self.db.drop_collection('%s.chunks' % self._collection_name)
self.db.drop_collection('%s.files' % self._collection_name)
self.db.drop_collection('logs')
self.db.drop_collection('status')
@ -65,17 +66,21 @@ class Kernel(object):
self.db.logs.insert(kwargs)
def track_url(self, url, versioning='md5', update_mins=60*24,
**kwargs):
def _add_doc_class(self, doc_class, **properties):
if doc_class in self.doc_classes:
raise ValueError('attempt to re-add doc_class: %s' % doc_class)
else:
self.doc_classes[doc_class] = properties
def track_url(self, url, doc_class, **kwargs):
"""
Add a URL to the set of tracked URLs, accessible via a given filename.
url
URL to start tracking
versioning
currently only valid value is "md5"
update_mins
minutes between automatic updates, default is 1440 (1 day)
doc_class
document type, can be any arbitrary string
**kwargs
any keyword args will be added to the document's metadata
"""
@ -85,8 +90,7 @@ class Kernel(object):
# return the original object
if tracked:
if (tracked['metadata'] == kwargs and
tracked['versioning'] == versioning and
tracked['update_mins'] == update_mins):
tracked['doc_class'] == doc_class):
return tracked['_id']
else:
self.log('track', url=url, error='tracking conflict')
@ -94,20 +98,16 @@ class Kernel(object):
'metadata' % url)
self.log('track', url=url)
return self.db.tracked.insert(dict(url=url, versioning=versioning,
update_mins=update_mins,
return self.db.tracked.insert(dict(url=url, doc_class=doc_class,
_random=random.randint(0, sys.maxint),
metadata=kwargs))
versions=[], metadata=kwargs))
def md5_versioning(self, doc, data):
def md5_versioning(self, olddata, newdata):
""" return True if md5 changed or if file is new """
try:
old_md5 = self.fs.get_last_version(filename=doc['url']).md5
new_md5 = hashlib.md5(data).hexdigest()
return (old_md5 != new_md5)
except gridfs.NoFile:
return True
old_md5 = hashlib.md5(olddata).hexdigest()
new_md5 = hashlib.md5(newdata).hexdigest()
return old_md5 != new_md5
def update(self, doc):
@ -118,84 +118,62 @@ class Kernel(object):
* download latest document
* check if document has changed using versioning func
* if a change has occurred save the file to GridFS
* if a change has occurred save the file
* if error occured, log & keep track of how many errors in a row
* update last_update/next_update timestamp
"""
do_put = True
new_version = True
error = False
now = datetime.datetime.utcnow()
# FIXME
update_mins = self.doc_classes[doc['doc_class']].get('update_mins', 60)
# update strategies could be implemented here as well
# fetch strategies could be implemented here as well
try:
url = doc['url'].replace(' ', '%20')
newdata = self.scraper.urlopen(url)
content_type = newdata.response.headers['content-type']
except Exception as e:
do_put = False
new_version = False
error = str(e)
# versioning is a concept for future use, but here's how it can work:
# only do versioning check if at least one version exists
if new_version and doc['versions']:
# room here for different versioning schemes:
# versioning functions take doc & data, and return True if data is
# different, since they have access to doc, they can also modify
# certain attributes as needed
olddata = self.storage.get(doc['versions'][-1]['storage_key'])
new_version = self.md5_versioning(olddata, newdata)
if do_put:
if doc['versioning'] == 'md5':
do_put = self.md5_versioning(doc, newdata)
else:
raise ValueError('unknown versioning strategy "%s"' %
doc['versioning'])
if do_put:
self.fs.put(newdata, filename=doc['url'],
content_type=content_type, **doc['metadata'])
if new_version:
storage_id = self.storage.put(doc, newdata, content_type)
doc['versions'].append({'timestamp': now,
'storage_key': storage_id,
'storage_type': self.storage.storage_type,
})
if error:
# if there's been an error, increment the consecutive_errors count
# and back off a bit until we've reached our retry limit
c_errors = doc.get('consecutive_errors', 0)
doc['consecutive_errors'] = c_errors + 1
if c_errors <= self.retry_attempts:
update_mins = self.retry_wait_minutes * (2**c_errors)
else:
update_mins = doc['update_mins']
else:
# reset error count if all was ok
doc['consecutive_errors'] = 0
update_mins = doc['update_mins']
# last_update/next_update are separate from question of versioning
doc['last_update'] = datetime.datetime.utcnow()
doc['next_update'] = (doc['last_update'] +
datetime.timedelta(minutes=update_mins))
doc['last_update'] = now
doc['next_update'] = now + datetime.timedelta(minutes=update_mins)
self.log('update', url=url, new_doc=do_put, error=error)
self.log('update', url=url, new_doc=new_version, error=error)
self.db.tracked.save(doc, safe=True)
def get_all_versions(self, url):
"""
get all versions stored for a given URL
"""
versions = []
n = 0
while True:
try:
versions.append(self.fs.get_version(url, n))
n += 1
except gridfs.NoFile:
break
return versions
def get_version(self, url, n=-1):
"""
get a specific version of a file
defaults to getting latest version
"""
return self.fs.get_version(url, n)
def get_update_queue(self):
"""
Get a list of what needs to be updated.

View File

20
oyster/storage/gridfs.py Normal file
View File

@ -0,0 +1,20 @@
from __future__ import absolute_import
import gridfs
class GridFSStorage(object):
storage_type = 'gridfs'
def __init__(self, kernel):
self.db = kernel.db
self._collection_name = 'fs'
self.fs = gridfs.GridFS(self.db, self._collection_name)
def put(self, tracked_doc, data, content_type):
return self.fs.put(data, filename=tracked_doc['url'],
content_type=content_type,
**tracked_doc['metadata'])
def get(self, id):
return self.fs.get(id).read()

View File

@ -13,6 +13,8 @@ class KernelTests(TestCase):
def setUp(self):
self.kernel = Kernel(mongo_db='oyster_test', retry_wait_minutes=1/60.)
self.kernel._wipe()
self.kernel._add_doc_class('default', update_mins=30)
self.kernel._add_doc_class('fast-update', update_mins=0.01)
def test_constructor(self):
@ -42,11 +44,12 @@ class KernelTests(TestCase):
def test_track_url(self):
# basic insert
id1 = self.kernel.track_url('http://example.com', update_mins=30, pi=3)
id1 = self.kernel.track_url('http://example.com', 'default', pi=3)
obj = self.kernel.db.tracked.find_one()
assert '_random' in obj
assert obj['update_mins'] == 30
assert obj['doc_class'] == 'default'
assert obj['metadata'] == {'pi': 3}
assert obj['versions'] == []
# logging
log = self.kernel.db.logs.find_one()
@ -54,51 +57,53 @@ class KernelTests(TestCase):
assert log['url'] == 'http://example.com'
# track same url again with same metadata returns id
id2 = self.kernel.track_url('http://example.com', update_mins=30, pi=3)
id2 = self.kernel.track_url('http://example.com', 'default', pi=3)
assert id1 == id2
# can't track same URL twice with different metadata
assert_raises(ValueError, self.kernel.track_url, 'http://example.com')
assert_raises(ValueError, self.kernel.track_url, 'http://example.com',
'default')
# logged error
assert self.kernel.db.logs.find_one({'error': 'tracking conflict'})
assert_raises(ValueError, self.kernel.track_url, 'http://example.com',
'special-doc-class', pi=3)
# logged error
assert self.kernel.db.logs.find_one({'error': 'tracking conflict'})
def test_md5_versioning(self):
doc = {'url': 'hello.txt'}
self.kernel.fs.put('hello!', filename='hello.txt')
assert not self.kernel.md5_versioning(doc, 'hello!')
assert self.kernel.md5_versioning(doc, 'hey!')
assert not self.kernel.md5_versioning('hello!', 'hello!')
assert self.kernel.md5_versioning('hello!', 'hey!')
def test_update(self):
# get a single document tracked
self.kernel.track_url('http://example.com', update_mins=60, pi=3)
# get a single document tracked and call update on it
self.kernel.track_url('http://example.com', 'default')
obj = self.kernel.db.tracked.find_one()
self.kernel.update(obj)
# check that metadata has been updated
# check that the metadata has been updated
newobj = self.kernel.db.tracked.find_one()
assert (newobj['last_update'] +
datetime.timedelta(minutes=newobj['update_mins']) ==
assert (newobj['last_update'] + datetime.timedelta(minutes=30) ==
newobj['next_update'])
first_update = newobj['last_update']
assert newobj['consecutive_errors'] == 0
# check that document exists in database
doc = self.kernel.fs.get_last_version()
assert doc.filename == 'http://example.com'
assert doc.content_type.startswith('text/html')
assert doc.pi == 3
assert len(newobj['versions']) == 1
# check that document exists in storage (TODO: storage test)
assert self.kernel.storage.get(newobj['versions'][0]['storage_key'])
# check logs
assert self.kernel.db.logs.find({'action': 'update'}).count() == 1
# and do an update..
# and do another update..
self.kernel.update(obj)
# hopefully example.com hasn't changed, this tests that md5 worked
assert self.kernel.db.fs.files.count() == 1
assert len(newobj['versions']) == 1
# check that appropriate metadata updated
newobj = self.kernel.db.tracked.find_one()
@ -110,7 +115,7 @@ class KernelTests(TestCase):
def test_update_failure(self):
# track a non-existent URL
self.kernel.track_url('http://not_a_url')
self.kernel.track_url('http://not_a_url', 'default')
obj = self.kernel.db.tracked.find_one()
self.kernel.update(obj)
@ -128,23 +133,10 @@ class KernelTests(TestCase):
assert obj['consecutive_errors'] == 2
def test_all_versions(self):
random_url = 'http://en.wikipedia.org/wiki/Special:Random'
self.kernel.track_url(random_url)
obj = self.kernel.db.tracked.find_one()
self.kernel.update(obj)
versions = self.kernel.get_all_versions(random_url)
assert versions[0].filename == random_url
self.kernel.update(obj)
assert len(self.kernel.get_all_versions(random_url)) == 2
def test_get_update_queue(self):
self.kernel.track_url('never-updates', update_mins=0.01)
self.kernel.track_url('bad-uri', update_mins=0.01)
self.kernel.track_url('http://example.com', update_mins=0.01)
self.kernel.track_url('never-updates', 'fast-update')
self.kernel.track_url('bad-uri', 'fast-update')
self.kernel.track_url('http://example.com', 'fast-update')
never = self.kernel.db.tracked.find_one(dict(url='never-updates'))
bad = self.kernel.db.tracked.find_one(dict(url='bad-uri'))
@ -170,9 +162,9 @@ class KernelTests(TestCase):
def test_get_update_queue_size(self):
self.kernel.track_url('a', update_mins=0.01)
self.kernel.track_url('b', update_mins=0.01)
self.kernel.track_url('c', update_mins=0.01)
self.kernel.track_url('a', 'fast-update')
self.kernel.track_url('b', 'fast-update')
self.kernel.track_url('c', 'fast-update')
a = self.kernel.db.tracked.find_one(dict(url='a'))
b = self.kernel.db.tracked.find_one(dict(url='b'))