diff --git a/oyster/connection.py b/oyster/core.py
similarity index 77%
rename from oyster/connection.py
rename to oyster/core.py
index 9d08b4d..25a4ba0 100644
--- a/oyster/connection.py
+++ b/oyster/core.py
@@ -9,27 +9,16 @@
 import gridfs
 import scrapelib
 
-def get_configured_connection():
-    """ factory, gets a connection configured with oyster.conf.settings """
-    from oyster.conf import settings
-    return Connection(mongo_host=settings.MONGO_HOST,
-                      mongo_port=settings.MONGO_PORT,
-                      mongo_db=settings.MONGO_DATABASE,
-                      mongo_log_maxsize=settings.MONGO_LOG_MAXSIZE,
-                      user_agent=settings.USER_AGENT,
-                      rpm=settings.REQUESTS_PER_MINUTE,
-                      timeout=settings.REQUEST_TIMEOUT,
-                      retry_attempts=settings.RETRY_ATTEMPTS,
-                      retry_wait_minutes=settings.RETRY_WAIT_MINUTES)
-
-
-class Connection(object):
+class Kernel(object):
     """ oyster's workhorse, handles tracking """
 
     def __init__(self, mongo_host='localhost', mongo_port=27017,
                  mongo_db='oyster', mongo_log_maxsize=100000000,
                  user_agent='oyster', rpm=60, timeout=300,
                  retry_attempts=3, retry_wait_minutes=60):
+        """
+        configurable for ease of testing; only one should be instantiated
+        """
 
         # set up a capped log if it doesn't exist
         self.db = pymongo.Connection(mongo_host, mongo_port)[mongo_db]
@@ -83,6 +72,12 @@ class Connection(object):
 
         url
            URL to start tracking
+        versioning
+            currently the only valid value is "md5"
+        update_mins
+            minutes between automatic updates, default is 1440 (1 day)
+        **kwargs
+            any keyword args will be added to the document's metadata
         """
         tracked = self.db.tracked.find_one({'url': url})
 
@@ -116,6 +111,18 @@ def update(self, doc):
+        """
+        perform an update on a given document
+
+        ``doc`` must be a document from the ``tracked`` collection
+
+        * download the latest copy of the document
+        * check if the document has changed using the versioning func
+        * if a change has occurred, save the file to GridFS
+        * if an error occurred, log it & keep track of how many errors in a row
+        * update the last_update/next_update timestamps
+        """
+
         do_put = True
         error = False
 
@@ -166,6 +173,9 @@ def get_all_versions(self, url):
+        """
+        get all versions stored for a given URL
+        """
         versions = []
         n = 0
         while True:
@@ -178,10 +188,23 @@ def get_version(self, url, n=-1):
+        """
+        get a specific version of a file
+
+        defaults to getting the latest version
+        """
         return self.fs.get_version(url, n)
 
     def get_update_queue(self):
+        """
+        Get a list of what needs to be updated.
+
+        Documents that have never been updated take priority, followed by
+        documents that are simply stale.  Within these two categories results
+        are sorted in semirandom order to decrease the odds of piling on a
+        single server.
+        """
         # results are always sorted by random to avoid piling on single server
 
         # first we try to update anything that we've never retrieved
@@ -198,7 +221,28 @@ def get_update_queue_size(self):
+        """
+        Get the size of the update queue.  This should match
+        ``len(self.get_update_queue())``, but is computed more efficiently.
+ """ new = self.db.tracked.find({'next_update': {'$exists': False}}).count() next = self.db.tracked.find({'next_update': {'$lt': datetime.datetime.utcnow()}}).count() return new+next + + + +def _get_configured_kernel(): + """ factory, gets a connection configured with oyster.conf.settings """ + from oyster.conf import settings + return Kernel(mongo_host=settings.MONGO_HOST, + mongo_port=settings.MONGO_PORT, + mongo_db=settings.MONGO_DATABASE, + mongo_log_maxsize=settings.MONGO_LOG_MAXSIZE, + user_agent=settings.USER_AGENT, + rpm=settings.REQUESTS_PER_MINUTE, + timeout=settings.REQUEST_TIMEOUT, + retry_attempts=settings.RETRY_ATTEMPTS, + retry_wait_minutes=settings.RETRY_WAIT_MINUTES) + +kernel = _get_configured_kernel() diff --git a/oyster/tasks.py b/oyster/tasks.py index 162c126..9766ed7 100644 --- a/oyster/tasks.py +++ b/oyster/tasks.py @@ -4,7 +4,7 @@ from celery.execute import send_task from pymongo.objectid import ObjectId from oyster.conf import settings -from oyster.connection import get_configured_connection +from oyster.core import kernel class UpdateTask(Task): @@ -12,17 +12,12 @@ class UpdateTask(Task): # results go straight to database ignore_result = True - def __init__(self): - # one connection per process - self.conn = get_configured_connection() - - def run(self, doc_id): - doc = self.conn.db.tracked.find_one({'_id': doc_id}) - self.conn.update(doc) + doc = kernel.db.tracked.find_one({'_id': doc_id}) + kernel.update(doc) for hook in doc.get('post_update_hooks', []): send_task(hook, (doc_id,)) - self.conn.db.status.update({}, {'$inc': {'update_queue': -1}}) + kernel.db.status.update({}, {'$inc': {'update_queue': -1}}) class UpdateTaskScheduler(PeriodicTask): @@ -30,19 +25,18 @@ class UpdateTaskScheduler(PeriodicTask): # 60s tick run_every = 60 - conn = get_configured_connection() def run(self): # if the update queue isn't empty, wait to add more # (currently the only way we avoid duplicates) # alternate option would be to set a _queued flag on documents - if self.conn.db.status.find_one()['update_queue']: + if kernel.db.status.find_one()['update_queue']: return - next_set = self.conn.get_update_queue() + next_set = kernel.get_update_queue() for doc in next_set: UpdateTask.delay(doc['_id']) - self.conn.db.status.update({}, {'$inc': {'update_queue': 1}}) + kernel.status.update({}, {'$inc': {'update_queue': 1}}) class ExternalStoreTask(Task): @@ -60,21 +54,17 @@ class ExternalStoreTask(Task): # used as a base class abstract = True - def __init__(self): - # one connection per process - self.conn = get_configured_connection() - def run(self, doc_id, extract_text=lambda x: x): # get the document - doc = self.conn.db.tracked.find_one({'_id': ObjectId(doc_id)}) - filedata = self.conn.get_version(doc['url']).read() + doc = kernel.db.tracked.find_one({'_id': ObjectId(doc_id)}) + filedata = kernel.get_version(doc['url']).read() text = extract_text(filedata, doc['metadata']) # put the document into the data store result = self.upload_document(doc_id, text, doc['metadata']) doc[self.external_store + '_id'] = result - self.conn.db.tracked.save(doc, safe=True) + kernel.db.tracked.save(doc, safe=True) def upload_document(self, doc_id, filedata, metadata): diff --git a/oyster/tests/test_connection.py b/oyster/tests/test_connection.py deleted file mode 100644 index 1fe392f..0000000 --- a/oyster/tests/test_connection.py +++ /dev/null @@ -1,190 +0,0 @@ -import time -import datetime -from unittest import TestCase - -from nose.tools import assert_raises -import pymongo - -from oyster.connection 
-
-
-class ConnectionTests(TestCase):
-
-    def setUp(self):
-        self.conn = Connection(mongo_db='oyster_test', retry_wait_minutes=1/60.)
-        self.conn._wipe()
-
-
-    def test_constructor(self):
-        c = Connection('127.0.0.1', 27017, 'testdb', mongo_log_maxsize=5000,
-                       user_agent='test-ua', rpm=30, timeout=60,
-                       retry_attempts=7, retry_wait_minutes=8)
-        assert c.db.connection.host == '127.0.0.1'
-        assert c.db.connection.port == 27017
-        assert c.db.logs.options()['capped'] == True
-        assert c.db.logs.options()['size'] == 5000
-        assert c.retry_wait_minutes == 8
-        # TODO: test retry_attempts
-        assert c.scraper.user_agent == 'test-ua'
-        assert c.scraper.requests_per_minute == 30
-        assert c.scraper.timeout == 60
-
-
-    def test_log(self):
-        self.conn.log('action1', 'http://example.com')
-        self.conn.log('action2', 'http://test.com', error=True, pi=3)
-        assert self.conn.db.logs.count() == 2
-        x = self.conn.db.logs.find_one({'error': True})
-        assert x['action'] == 'action2'
-        assert x['url'] == 'http://test.com'
-        assert x['pi'] == 3
-
-
-    def test_track_url(self):
-        # basic insert
-        id1 = self.conn.track_url('http://example.com', update_mins=30, pi=3)
-        obj = self.conn.db.tracked.find_one()
-        assert '_random' in obj
-        assert obj['update_mins'] == 30
-        assert obj['metadata'] == {'pi': 3}
-
-        # logging
-        log = self.conn.db.logs.find_one()
-        assert log['action'] == 'track'
-        assert log['url'] == 'http://example.com'
-
-        # track same url again with same metadata returns id
-        id2 = self.conn.track_url('http://example.com', update_mins=30, pi=3)
-        assert id1 == id2
-
-        # can't track same URL twice with different metadata
-        assert_raises(ValueError, self.conn.track_url, 'http://example.com')
-
-        # logged error
-        assert self.conn.db.logs.find_one({'error': 'tracking conflict'})
-
-
-    def test_md5_versioning(self):
-        doc = {'url': 'hello.txt'}
-        self.conn.fs.put('hello!', filename='hello.txt')
-        assert not self.conn.md5_versioning(doc, 'hello!')
-        assert self.conn.md5_versioning(doc, 'hey!')
-
-
-    def test_update(self):
-        # get a single document tracked
-        self.conn.track_url('http://example.com', update_mins=60, pi=3)
-        obj = self.conn.db.tracked.find_one()
-        self.conn.update(obj)
-
-        # check that metadata has been updated
-        newobj = self.conn.db.tracked.find_one()
-        assert (newobj['last_update'] +
-                datetime.timedelta(minutes=newobj['update_mins']) ==
-                newobj['next_update'])
-        first_update = newobj['last_update']
-        assert newobj['consecutive_errors'] == 0
-
-        # check that document exists in database
-        doc = self.conn.fs.get_last_version()
-        assert doc.filename == 'http://example.com'
-        assert doc.content_type.startswith('text/html')
-        assert doc.pi == 3
-
-        # check logs
-        assert self.conn.db.logs.find({'action': 'update'}).count() == 1
-
-        # and do an update..
-        self.conn.update(obj)
-
-        # hopefully example.com hasn't changed, this tests that md5 worked
-        assert self.conn.db.fs.files.count() == 1
-
-        # check that appropriate metadata updated
-        newobj = self.conn.db.tracked.find_one()
-        assert first_update < newobj['last_update']
-
-        # check that logs updated
-        assert self.conn.db.logs.find({'action': 'update'}).count() == 2
-
-
-    def test_update_failure(self):
-        # track a non-existent URL
-        self.conn.track_url('http://not_a_url')
-        obj = self.conn.db.tracked.find_one()
-        self.conn.update(obj)
-
-        obj = self.conn.db.tracked.find_one()
-        assert obj['consecutive_errors'] == 1
-
-        # we should have logged an error too
-        assert self.conn.db.logs.find({'action': 'update',
-                                       'error': {'$ne': False}}).count() == 1
-
-        # update again
-        self.conn.update(obj)
-
-        obj = self.conn.db.tracked.find_one()
-        assert obj['consecutive_errors'] == 2
-
-
-    def test_all_versions(self):
-        random_url = 'http://en.wikipedia.org/wiki/Special:Random'
-        self.conn.track_url(random_url)
-        obj = self.conn.db.tracked.find_one()
-        self.conn.update(obj)
-
-        versions = self.conn.get_all_versions(random_url)
-        assert versions[0].filename == random_url
-
-        self.conn.update(obj)
-        assert len(self.conn.get_all_versions(random_url)) == 2
-
-
-    def test_get_update_queue(self):
-        self.conn.track_url('never-updates', update_mins=0.01)
-        self.conn.track_url('bad-uri', update_mins=0.01)
-        self.conn.track_url('http://example.com', update_mins=0.01)
-
-        never = self.conn.db.tracked.find_one(dict(url='never-updates'))
-        bad = self.conn.db.tracked.find_one(dict(url='bad-uri'))
-        good = self.conn.db.tracked.find_one(dict(url='http://example.com'))
-
-        # 3 in queue, ordered by random
-        queue = self.conn.get_update_queue()
-        assert len(queue) == 3
-        assert queue[0]['_random'] < queue[1]['_random'] < queue[2]['_random']
-
-        # try and update bad & good
-        self.conn.update(bad)
-        self.conn.update(good)
-
-        # queue should only have never in it
-        queue = self.conn.get_update_queue()
-        assert queue[0]['_id'] == never['_id']
-
-        # wait for time to pass so queue should be full
-        time.sleep(1)
-        queue = self.conn.get_update_queue()
-        assert len(queue) == 3
-
-
-    def test_get_update_queue_size(self):
-        self.conn.track_url('a', update_mins=0.01)
-        self.conn.track_url('b', update_mins=0.01)
-        self.conn.track_url('c', update_mins=0.01)
-
-        a = self.conn.db.tracked.find_one(dict(url='a'))
-        b = self.conn.db.tracked.find_one(dict(url='b'))
-        c = self.conn.db.tracked.find_one(dict(url='c'))
-
-        # size should start at 3
-        assert self.conn.get_update_queue_size() == 3
-
-        # goes down one
-        self.conn.update(a)
-        assert self.conn.get_update_queue_size() == 2
-
-        # wait for it to go back to 3
-        time.sleep(1)
-        assert self.conn.get_update_queue_size() == 3
diff --git a/oyster/tests/test_kernel.py b/oyster/tests/test_kernel.py
new file mode 100644
index 0000000..1b63bf7
--- /dev/null
+++ b/oyster/tests/test_kernel.py
@@ -0,0 +1,190 @@
+import time
+import datetime
+from unittest import TestCase
+
+from nose.tools import assert_raises
+import pymongo
+
+from oyster.core import Kernel
+
+
+class KernelTests(TestCase):
+
+    def setUp(self):
+        self.kernel = Kernel(mongo_db='oyster_test', retry_wait_minutes=1/60.)
+        self.kernel._wipe()
+
+
+    def test_constructor(self):
+        c = Kernel('127.0.0.1', 27017, 'testdb', mongo_log_maxsize=5000,
+                   user_agent='test-ua', rpm=30, timeout=60,
+                   retry_attempts=7, retry_wait_minutes=8)
+        assert c.db.connection.host == '127.0.0.1'
+        assert c.db.connection.port == 27017
+        assert c.db.logs.options()['capped'] == True
+        assert c.db.logs.options()['size'] == 5000
+        assert c.retry_wait_minutes == 8
+        # TODO: test retry_attempts
+        assert c.scraper.user_agent == 'test-ua'
+        assert c.scraper.requests_per_minute == 30
+        assert c.scraper.timeout == 60
+
+
+    def test_log(self):
+        self.kernel.log('action1', 'http://example.com')
+        self.kernel.log('action2', 'http://test.com', error=True, pi=3)
+        assert self.kernel.db.logs.count() == 2
+        x = self.kernel.db.logs.find_one({'error': True})
+        assert x['action'] == 'action2'
+        assert x['url'] == 'http://test.com'
+        assert x['pi'] == 3
+
+
+    def test_track_url(self):
+        # basic insert
+        id1 = self.kernel.track_url('http://example.com', update_mins=30, pi=3)
+        obj = self.kernel.db.tracked.find_one()
+        assert '_random' in obj
+        assert obj['update_mins'] == 30
+        assert obj['metadata'] == {'pi': 3}
+
+        # logging
+        log = self.kernel.db.logs.find_one()
+        assert log['action'] == 'track'
+        assert log['url'] == 'http://example.com'
+
+        # track same url again with same metadata returns id
+        id2 = self.kernel.track_url('http://example.com', update_mins=30, pi=3)
+        assert id1 == id2
+
+        # can't track same URL twice with different metadata
+        assert_raises(ValueError, self.kernel.track_url, 'http://example.com')
+
+        # logged error
+        assert self.kernel.db.logs.find_one({'error': 'tracking conflict'})
+
+
+    def test_md5_versioning(self):
+        doc = {'url': 'hello.txt'}
+        self.kernel.fs.put('hello!', filename='hello.txt')
+        assert not self.kernel.md5_versioning(doc, 'hello!')
+        assert self.kernel.md5_versioning(doc, 'hey!')
+
+
+    def test_update(self):
+        # get a single document tracked
+        self.kernel.track_url('http://example.com', update_mins=60, pi=3)
+        obj = self.kernel.db.tracked.find_one()
+        self.kernel.update(obj)
+
+        # check that metadata has been updated
+        newobj = self.kernel.db.tracked.find_one()
+        assert (newobj['last_update'] +
+                datetime.timedelta(minutes=newobj['update_mins']) ==
+                newobj['next_update'])
+        first_update = newobj['last_update']
+        assert newobj['consecutive_errors'] == 0
+
+        # check that document exists in database
+        doc = self.kernel.fs.get_last_version()
+        assert doc.filename == 'http://example.com'
+        assert doc.content_type.startswith('text/html')
+        assert doc.pi == 3
+
+        # check logs
+        assert self.kernel.db.logs.find({'action': 'update'}).count() == 1
+
+        # and do an update..
+        self.kernel.update(obj)
+
+        # hopefully example.com hasn't changed; this tests that md5 worked
+        assert self.kernel.db.fs.files.count() == 1
+
+        # check that appropriate metadata updated
+        newobj = self.kernel.db.tracked.find_one()
+        assert first_update < newobj['last_update']
+
+        # check that logs updated
+        assert self.kernel.db.logs.find({'action': 'update'}).count() == 2
+
+
+    def test_update_failure(self):
+        # track a non-existent URL
+        self.kernel.track_url('http://not_a_url')
+        obj = self.kernel.db.tracked.find_one()
+        self.kernel.update(obj)
+
+        obj = self.kernel.db.tracked.find_one()
+        assert obj['consecutive_errors'] == 1
+
+        # we should have logged an error too
+        assert self.kernel.db.logs.find({'action': 'update',
+                                         'error': {'$ne': False}}).count() == 1
+
+        # update again
+        self.kernel.update(obj)
+
+        obj = self.kernel.db.tracked.find_one()
+        assert obj['consecutive_errors'] == 2
+
+
+    def test_all_versions(self):
+        random_url = 'http://en.wikipedia.org/wiki/Special:Random'
+        self.kernel.track_url(random_url)
+        obj = self.kernel.db.tracked.find_one()
+        self.kernel.update(obj)
+
+        versions = self.kernel.get_all_versions(random_url)
+        assert versions[0].filename == random_url
+
+        self.kernel.update(obj)
+        assert len(self.kernel.get_all_versions(random_url)) == 2
+
+
+    def test_get_update_queue(self):
+        self.kernel.track_url('never-updates', update_mins=0.01)
+        self.kernel.track_url('bad-uri', update_mins=0.01)
+        self.kernel.track_url('http://example.com', update_mins=0.01)
+
+        never = self.kernel.db.tracked.find_one(dict(url='never-updates'))
+        bad = self.kernel.db.tracked.find_one(dict(url='bad-uri'))
+        good = self.kernel.db.tracked.find_one(dict(url='http://example.com'))
+
+        # 3 in queue, ordered by random
+        queue = self.kernel.get_update_queue()
+        assert len(queue) == 3
+        assert queue[0]['_random'] < queue[1]['_random'] < queue[2]['_random']
+
+        # try to update bad & good
+        self.kernel.update(bad)
+        self.kernel.update(good)
+
+        # queue should only have never in it
+        queue = self.kernel.get_update_queue()
+        assert queue[0]['_id'] == never['_id']
+
+        # wait for time to pass so queue should be full
+        time.sleep(1)
+        queue = self.kernel.get_update_queue()
+        assert len(queue) == 3
+
+
+    def test_get_update_queue_size(self):
+        self.kernel.track_url('a', update_mins=0.01)
+        self.kernel.track_url('b', update_mins=0.01)
+        self.kernel.track_url('c', update_mins=0.01)
+
+        a = self.kernel.db.tracked.find_one(dict(url='a'))
+        b = self.kernel.db.tracked.find_one(dict(url='b'))
+        c = self.kernel.db.tracked.find_one(dict(url='c'))
+
+        # size should start at 3
+        assert self.kernel.get_update_queue_size() == 3
+
+        # goes down one
+        self.kernel.update(a)
+        assert self.kernel.get_update_queue_size() == 2
+
+        # wait for it to go back to 3
+        time.sleep(1)
+        assert self.kernel.get_update_queue_size() == 3
diff --git a/oyster/web.py b/oyster/web.py
index 9b986d3..9f4cef8 100644
--- a/oyster/web.py
+++ b/oyster/web.py
@@ -7,7 +7,7 @@ import flask
 import pymongo.objectid
 
 from oyster.conf import settings
-from oyster.connection import get_configured_connection
+from oyster.core import kernel
 
 
 class JSONEncoder(json.JSONEncoder):
@@ -43,16 +43,14 @@ def api_wrapper(template=None):
 
 app = flask.Flask('oyster')
-conn = get_configured_connection()
-
 
 @app.route('/')
 @api_wrapper('index.html')
 def index():
     status = {
-        'tracking': conn.db.tracked.count(),
-        'need_update': conn.get_update_queue_size(),
-        'logs': list(conn.db.logs.find().sort('$natural', -1).limit(20)),
+        'tracking': kernel.db.tracked.count(),
+        'need_update': kernel.get_update_queue_size(),
+        'logs': list(kernel.db.logs.find().sort('$natural', -1).limit(20)),
         'mongo_host': settings.MONGO_HOST,
     }
     return status
@@ -62,8 +60,8 @@ def index():
 @api_wrapper()
 def doc_list():
     status = {
-        'tracking': conn.db.tracked.count(),
-        'need_update': conn.get_update_queue_size(),
+        'tracking': kernel.db.tracked.count(),
+        'need_update': kernel.get_update_queue_size(),
     }
     return status
@@ -75,7 +73,7 @@ def log_view():
     size = 100
     prev_offset = max(offset - size, 0)
     next_offset = offset + size
-    logs = conn.db.logs.find().sort('$natural', -1).skip(offset).limit(size)
+    logs = kernel.db.logs.find().sort('$natural', -1).skip(offset).limit(size)
     return dict(logs=list(logs), prev_offset=prev_offset,
                 next_offset=next_offset, offset=offset)
@@ -83,14 +81,14 @@
 @app.route('/tracked/')
 @api_wrapper()
 def tracked():
-    tracked = list(conn.db.tracked.find())
+    tracked = list(kernel.db.tracked.find())
     return json.dumps(tracked, cls=JSONEncoder)
 
 
 @app.route('/tracked/<path:url>')
 def tracked_view(url):
     url = _path_fixer(url)
-    doc = conn.db.tracked.find_one({'url': url})
+    doc = kernel.db.tracked.find_one({'url': url})
     return json.dumps(doc, cls=JSONEncoder)
 
 
@@ -99,7 +97,7 @@ def show_doc(url, version):
     url = _path_fixer(url)
     if version == 'latest':
         version = -1
-    doc = conn.get_version(url, version)
+    doc = kernel.get_version(url, version)
     resp = flask.make_response(doc.read())
     resp.headers['content-type'] = doc.content_type
     return resp
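
Usage sketch: with this change, callers import the module-level `kernel` singleton instead of building a `Connection` per process. A minimal example of the new entry points, assuming MongoDB is reachable with the defaults from oyster.conf.settings; the URL and the `category` metadata kwarg are illustrative only:

    # importing oyster.core constructs the shared Kernel from settings
    from oyster.core import kernel

    # begin tracking a URL; extra kwargs land in the document's metadata
    kernel.track_url('http://example.com', update_mins=60, category='demo')

    # drain everything currently due for an update
    for doc in kernel.get_update_queue():
        kernel.update(doc)

    # read back the latest stored version of the file from GridFS
    data = kernel.get_version('http://example.com').read()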