From d66ebdd74a375e3cc659b1631acfca4ddd0c2233 Mon Sep 17 00:00:00 2001
From: James Turk
Date: Tue, 14 Feb 2012 16:42:41 -0500
Subject: [PATCH] major core refactor, shift to how storage is done

---
 design.txt                  |   15 ++++-
 oyster/core.py              |  122 +++++++++++++++---------------
 oyster/storage/__init__.py  |    0
 oyster/storage/gridfs.py    |   20 ++++++
 oyster/tests/test_kernel.py |   72 ++++++++++-----------
 5 files changed, 114 insertions(+), 115 deletions(-)
 create mode 100644 oyster/storage/__init__.py
 create mode 100644 oyster/storage/gridfs.py

diff --git a/design.txt b/design.txt
index 5d4be0a..807c027 100644
--- a/design.txt
+++ b/design.txt
@@ -14,10 +14,13 @@ tracked - metadata for tracked resources
     _id : internal id
     _random : a random integer used for sorting
     url : url of resource
-    versioning : string indicating type of versioning to be used for this
-                 resource. currently set to md5 for all documents
-    update_mins : minutes between automatic updates (defaults to 24*60)
+    doc_class : string indicating the document class, allows for different
+                settings/hooks for some documents
     metadata : dictionary of extra user-specified attributes
+    versions : list of dictionaries with the following keys:
+        timestamp : UTC timestamp
+        storage_key : storage_id
+            (may be s3_url, gridfs_id, etc.)

 logs - capped log collection
     action : log entry
@@ -29,3 +32,9 @@ status - internal state
     update_queue : size of update queue

+Storage Interface
+=================
+    storage_key : key to store on versions
+    put(tracked_doc, data, content_type) -> id
+    get(id) -> file data
+
diff --git a/oyster/core.py b/oyster/core.py
index 25a4ba0..43f2698 100644
--- a/oyster/core.py
+++ b/oyster/core.py
@@ -5,9 +5,9 @@ import sys
 import urllib

 import pymongo
-import gridfs
 import scrapelib

+from .storage.gridfs import GridFSStorage

 class Kernel(object):
     """ oyster's workhorse, handles tracking """
@@ -28,6 +28,9 @@ class Kernel(object):
         except pymongo.errors.CollectionInvalid:
             pass

+        # create storage class
+        self.storage = GridFSStorage(self)
+
         # create status document if it doesn't exist
         if self.db.status.count() == 0:
             self.db.status.insert({'update_queue': 0})
@@ -35,8 +38,6 @@ class Kernel(object):
         # ensure an index on _random
         self.db.tracked.ensure_index([('_random', pymongo.ASCENDING)])

-        self._collection_name = 'fs'
-        self.fs = gridfs.GridFS(self.db, self._collection_name)
         self.scraper = scrapelib.Scraper(user_agent=user_agent,
                                          requests_per_minute=rpm,
                                          follow_robots=False,
@@ -46,12 +47,12 @@ class Kernel(object):
         self.retry_attempts = retry_attempts
         self.retry_wait_minutes = retry_wait_minutes

+        self.doc_classes = {}
+

     def _wipe(self):
         """ exists primarily for debug use, wipes entire db """
         self.db.drop_collection('tracked')
-        self.db.drop_collection('%s.chunks' % self._collection_name)
-        self.db.drop_collection('%s.files' % self._collection_name)
         self.db.drop_collection('logs')
         self.db.drop_collection('status')
@@ -65,17 +66,21 @@ class Kernel(object):
         self.db.logs.insert(kwargs)


-    def track_url(self, url, versioning='md5', update_mins=60*24,
-                  **kwargs):
+    def _add_doc_class(self, doc_class, **properties):
+        if doc_class in self.doc_classes:
+            raise ValueError('attempt to re-add doc_class: %s' % doc_class)
+        else:
+            self.doc_classes[doc_class] = properties
+
+
+    def track_url(self, url, doc_class, **kwargs):
         """
         Add a URL to the set of tracked URLs, accessible via a given filename.

         url
             URL to start tracking
-        versioning
-            currently only valid value is "md5"
-        update_mins
-            minutes between automatic updates, default is 1440 (1 day)
+        doc_class
+            document type, can be any arbitrary string
         **kwargs
             any keyword args will be added to the document's metadata
         """
@@ -85,8 +90,7 @@
         # return the original object if tracked
         if tracked:
             if (tracked['metadata'] == kwargs and
-                tracked['versioning'] == versioning and
-                tracked['update_mins'] == update_mins):
+                tracked['doc_class'] == doc_class):
                 return tracked['_id']
             else:
                 self.log('track', url=url, error='tracking conflict')
@@ -94,20 +98,16 @@
                 'metadata' % url)

         self.log('track', url=url)
-        return self.db.tracked.insert(dict(url=url, versioning=versioning,
-                                           update_mins=update_mins,
+        return self.db.tracked.insert(dict(url=url, doc_class=doc_class,
                                            _random=random.randint(0, sys.maxint),
-                                           metadata=kwargs))
+                                           versions=[], metadata=kwargs))


-    def md5_versioning(self, doc, data):
+    def md5_versioning(self, olddata, newdata):
         """ return True if md5 changed or if file is new """
-        try:
-            old_md5 = self.fs.get_last_version(filename=doc['url']).md5
-            new_md5 = hashlib.md5(data).hexdigest()
-            return (old_md5 != new_md5)
-        except gridfs.NoFile:
-            return True
+        old_md5 = hashlib.md5(olddata).hexdigest()
+        new_md5 = hashlib.md5(newdata).hexdigest()
+        return old_md5 != new_md5


     def update(self, doc):
@@ -118,84 +118,62 @@

         * download latest document
         * check if document has changed using versioning func
-        * if a change has occurred save the file to GridFS
+        * if a change has occurred save the file
         * if an error occurred, log & keep track of how many errors in a row
         * update last_update/next_update timestamp
         """
-        do_put = True
+        new_version = True
         error = False
+        now = datetime.datetime.utcnow()

+        # FIXME
+        update_mins = self.doc_classes[doc['doc_class']].get('update_mins', 60)

-        # update strategies could be implemented here as well
+        # fetch strategies could be implemented here as well
        try:
             url = doc['url'].replace(' ', '%20')
             newdata = self.scraper.urlopen(url)
             content_type = newdata.response.headers['content-type']
         except Exception as e:
-            do_put = False
+            new_version = False
             error = str(e)

-        # versioning is a concept for future use, but here's how it can work:
-        # versioning functions take doc & data, and return True if data is
-        # different, since they have access to doc, they can also modify
-        # certain attributes as needed
+        # only do versioning check if at least one version exists
+        if new_version and doc['versions']:
+            # room here for different versioning schemes:
+            # versioning functions take doc & data, and return True if data is
+            # different, since they have access to doc, they can also modify
+            # certain attributes as needed
+            olddata = self.storage.get(doc['versions'][-1]['storage_key'])
+            new_version = self.md5_versioning(olddata, newdata)

-        if do_put:
-            if doc['versioning'] == 'md5':
-                do_put = self.md5_versioning(doc, newdata)
-            else:
-                raise ValueError('unknown versioning strategy "%s"' %
-                                 doc['versioning'])
-
-        if do_put:
-            self.fs.put(newdata, filename=doc['url'],
-                        content_type=content_type, **doc['metadata'])
+        if new_version:
+            storage_id = self.storage.put(doc, newdata, content_type)
+            doc['versions'].append({'timestamp': now,
+                                    'storage_key': storage_id,
+                                    'storage_type': self.storage.storage_type,
+                                    })

         if error:
+            # if there's been an error, increment the consecutive_errors count
+            # and back off a bit until we've reached our retry limit
             c_errors = doc.get('consecutive_errors', 0)
             doc['consecutive_errors'] = c_errors + 1
             if c_errors <= self.retry_attempts:
                 update_mins = self.retry_wait_minutes * (2**c_errors)
-            else:
-                update_mins = doc['update_mins']
         else:
+            # reset error count if all was ok
             doc['consecutive_errors'] = 0
-            update_mins = doc['update_mins']

         # last_update/next_update are separate from question of versioning
-        doc['last_update'] = datetime.datetime.utcnow()
-        doc['next_update'] = (doc['last_update'] +
-                              datetime.timedelta(minutes=update_mins))
+        doc['last_update'] = now
+        doc['next_update'] = now + datetime.timedelta(minutes=update_mins)

-        self.log('update', url=url, new_doc=do_put, error=error)
+        self.log('update', url=url, new_doc=new_version, error=error)

         self.db.tracked.save(doc, safe=True)


-    def get_all_versions(self, url):
-        """
-        get all versions stored for a given URL
-        """
-        versions = []
-        n = 0
-        while True:
-            try:
-                versions.append(self.fs.get_version(url, n))
-                n += 1
-            except gridfs.NoFile:
-                break
-        return versions
-
-
-    def get_version(self, url, n=-1):
-        """
-        get a specific version of a file
-
-        defaults to getting latest version
-        """
-        return self.fs.get_version(url, n)
-
-
     def get_update_queue(self):
         """
         Get a list of what needs to be updated.
diff --git a/oyster/storage/__init__.py b/oyster/storage/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/oyster/storage/gridfs.py b/oyster/storage/gridfs.py
new file mode 100644
index 0000000..976d0bc
--- /dev/null
+++ b/oyster/storage/gridfs.py
@@ -0,0 +1,20 @@
+from __future__ import absolute_import
+
+import gridfs
+
+class GridFSStorage(object):
+
+    storage_type = 'gridfs'
+
+    def __init__(self, kernel):
+        self.db = kernel.db
+        self._collection_name = 'fs'
+        self.fs = gridfs.GridFS(self.db, self._collection_name)
+
+    def put(self, tracked_doc, data, content_type):
+        return self.fs.put(data, filename=tracked_doc['url'],
+                           content_type=content_type,
+                           **tracked_doc['metadata'])
+
+    def get(self, id):
+        return self.fs.get(id).read()
diff --git a/oyster/tests/test_kernel.py b/oyster/tests/test_kernel.py
index 1b63bf7..dd83c3e 100644
--- a/oyster/tests/test_kernel.py
+++ b/oyster/tests/test_kernel.py
@@ -13,6 +13,8 @@ class KernelTests(TestCase):
     def setUp(self):
         self.kernel = Kernel(mongo_db='oyster_test', retry_wait_minutes=1/60.)
         self.kernel._wipe()
+        self.kernel._add_doc_class('default', update_mins=30)
+        self.kernel._add_doc_class('fast-update', update_mins=0.01)


     def test_constructor(self):
@@ -42,11 +44,12 @@

     def test_track_url(self):
         # basic insert
-        id1 = self.kernel.track_url('http://example.com', update_mins=30, pi=3)
+        id1 = self.kernel.track_url('http://example.com', 'default', pi=3)
         obj = self.kernel.db.tracked.find_one()
         assert '_random' in obj
-        assert obj['update_mins'] == 30
+        assert obj['doc_class'] == 'default'
         assert obj['metadata'] == {'pi': 3}
+        assert obj['versions'] == []

         # logging
         log = self.kernel.db.logs.find_one()
@@ -54,51 +57,53 @@
         assert log['url'] == 'http://example.com'

         # track same url again with same metadata returns id
-        id2 = self.kernel.track_url('http://example.com', update_mins=30, pi=3)
+        id2 = self.kernel.track_url('http://example.com', 'default', pi=3)
         assert id1 == id2

         # can't track same URL twice with different metadata
-        assert_raises(ValueError, self.kernel.track_url, 'http://example.com')
+        assert_raises(ValueError, self.kernel.track_url, 'http://example.com',
+                      'default')
+        # logged error
+        assert self.kernel.db.logs.find_one({'error': 'tracking conflict'})

+        assert_raises(ValueError, self.kernel.track_url, 'http://example.com',
+                      'special-doc-class', pi=3)
         # logged error
         assert self.kernel.db.logs.find_one({'error': 'tracking conflict'})


     def test_md5_versioning(self):
-        doc = {'url': 'hello.txt'}
-        self.kernel.fs.put('hello!', filename='hello.txt')
-        assert not self.kernel.md5_versioning(doc, 'hello!')
-        assert self.kernel.md5_versioning(doc, 'hey!')
+        assert not self.kernel.md5_versioning('hello!', 'hello!')
+        assert self.kernel.md5_versioning('hello!', 'hey!')


     def test_update(self):
-        # get a single document tracked
-        self.kernel.track_url('http://example.com', update_mins=60, pi=3)
+        # get a single document tracked and call update on it
+        self.kernel.track_url('http://example.com', 'default')
         obj = self.kernel.db.tracked.find_one()
         self.kernel.update(obj)

-        # check that metadata has been updated
+        # check that the metadata has been updated
         newobj = self.kernel.db.tracked.find_one()
-        assert (newobj['last_update'] +
-                datetime.timedelta(minutes=newobj['update_mins']) ==
+        assert (newobj['last_update'] + datetime.timedelta(minutes=30) ==
                 newobj['next_update'])
         first_update = newobj['last_update']
         assert newobj['consecutive_errors'] == 0

-        # check that document exists in database
-        doc = self.kernel.fs.get_last_version()
-        assert doc.filename == 'http://example.com'
-        assert doc.content_type.startswith('text/html')
-        assert doc.pi == 3
+        assert len(newobj['versions']) == 1
+
+        # check that document exists in storage (TODO: storage test)
+        assert self.kernel.storage.get(newobj['versions'][0]['storage_key'])

         # check logs
         assert self.kernel.db.logs.find({'action': 'update'}).count() == 1

-        # and do an update..
+        # and do another update..
         self.kernel.update(obj)

         # hopefully example.com hasn't changed, this tests that md5 worked
-        assert self.kernel.db.fs.files.count() == 1
+        assert len(obj['versions']) == 1

         # check that appropriate metadata updated
         newobj = self.kernel.db.tracked.find_one()
@@ -110,7 +115,7 @@

     def test_update_failure(self):
         # track a non-existent URL
-        self.kernel.track_url('http://not_a_url')
+        self.kernel.track_url('http://not_a_url', 'default')
         obj = self.kernel.db.tracked.find_one()
         self.kernel.update(obj)

@@ -128,23 +133,10 @@
         assert obj['consecutive_errors'] == 2


-    def test_all_versions(self):
-        random_url = 'http://en.wikipedia.org/wiki/Special:Random'
-        self.kernel.track_url(random_url)
-        obj = self.kernel.db.tracked.find_one()
-        self.kernel.update(obj)
-
-        versions = self.kernel.get_all_versions(random_url)
-        assert versions[0].filename == random_url
-
-        self.kernel.update(obj)
-        assert len(self.kernel.get_all_versions(random_url)) == 2
-
-
     def test_get_update_queue(self):
-        self.kernel.track_url('never-updates', update_mins=0.01)
-        self.kernel.track_url('bad-uri', update_mins=0.01)
-        self.kernel.track_url('http://example.com', update_mins=0.01)
+        self.kernel.track_url('never-updates', 'fast-update')
+        self.kernel.track_url('bad-uri', 'fast-update')
+        self.kernel.track_url('http://example.com', 'fast-update')

         never = self.kernel.db.tracked.find_one(dict(url='never-updates'))
         bad = self.kernel.db.tracked.find_one(dict(url='bad-uri'))
@@ -170,9 +162,9 @@

     def test_get_update_queue_size(self):
-        self.kernel.track_url('a', update_mins=0.01)
-        self.kernel.track_url('b', update_mins=0.01)
-        self.kernel.track_url('c', update_mins=0.01)
+        self.kernel.track_url('a', 'fast-update')
+        self.kernel.track_url('b', 'fast-update')
+        self.kernel.track_url('c', 'fast-update')

         a = self.kernel.db.tracked.find_one(dict(url='a'))
         b = self.kernel.db.tracked.find_one(dict(url='b'))
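
A note on the new storage interface: the Kernel now depends only on the three
members documented in design.txt (a storage_type attribute plus put/get), so
backends other than GridFS can be dropped in without touching core.py. As a
rough sketch of what an alternative backend might look like (the DummyStorage
name and its dict-based store are hypothetical, not part of this patch):

    # hypothetical example, not part of this patch: an in-memory backend
    # satisfying the same interface as GridFSStorage (handy for tests)
    import itertools

    class DummyStorage(object):

        storage_type = 'dummy'

        def __init__(self, kernel):
            # GridFSStorage takes the Kernel to reach kernel.db; this
            # backend just keeps blobs in a plain dict keyed by an int id
            self._blobs = {}
            self._ids = itertools.count()

        def put(self, tracked_doc, data, content_type):
            # store the raw data and hand back an opaque storage_key,
            # which the Kernel records on the tracked doc's versions list
            storage_id = next(self._ids)
            self._blobs[storage_id] = data
            return storage_id

        def get(self, id):
            # return the stored file data, mirroring GridFSStorage.get,
            # which does fs.get(id).read()
            return self._blobs[id]

Swapping the hardcoded self.storage = GridFSStorage(self) in Kernel.__init__
for a backend like this exercises the same put/get path that update() relies
on; making that assignment configurable is presumably the next step this
interface enables.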