From 3539e50c9dfc95f9c6c3c71b526262b97fc8939e Mon Sep 17 00:00:00 2001 From: James Turk Date: Mon, 13 Feb 2012 14:34:45 -0500 Subject: [PATCH] rename client to connection --- oyster/{client.py => connection.py} | 24 ++-- oyster/tasks.py | 30 ++--- oyster/tests/test_client.py | 190 ---------------------------- oyster/tests/test_connection.py | 190 ++++++++++++++++++++++++++++ oyster/web.py | 22 ++-- 5 files changed, 228 insertions(+), 228 deletions(-) rename oyster/{client.py => connection.py} (91%) delete mode 100644 oyster/tests/test_client.py create mode 100644 oyster/tests/test_connection.py diff --git a/oyster/client.py b/oyster/connection.py similarity index 91% rename from oyster/client.py rename to oyster/connection.py index 6e8ada9..b154502 100644 --- a/oyster/client.py +++ b/oyster/connection.py @@ -9,21 +9,21 @@ import gridfs import scrapelib -def get_configured_client(): - """ helper factory, gets a client configured with oyster.conf.settings """ +def get_configured_connection(): + """ factory, gets a connection configured with oyster.conf.settings """ from oyster.conf import settings - return Client(mongo_host=settings.MONGO_HOST, - mongo_port=settings.MONGO_PORT, - mongo_db=settings.MONGO_DATABASE, - mongo_log_maxsize=settings.MONGO_LOG_MAXSIZE, - user_agent=settings.USER_AGENT, - rpm=settings.REQUESTS_PER_MINUTE, - timeout=settings.REQUEST_TIMEOUT, - retry_attempts=settings.RETRY_ATTEMPTS, - retry_wait_minutes=settings.RETRY_WAIT_MINUTES) + return Connection(mongo_host=settings.MONGO_HOST, + mongo_port=settings.MONGO_PORT, + mongo_db=settings.MONGO_DATABASE, + mongo_log_maxsize=settings.MONGO_LOG_MAXSIZE, + user_agent=settings.USER_AGENT, + rpm=settings.REQUESTS_PER_MINUTE, + timeout=settings.REQUEST_TIMEOUT, + retry_attempts=settings.RETRY_ATTEMPTS, + retry_wait_minutes=settings.RETRY_WAIT_MINUTES) -class Client(object): +class Connection(object): """ oyster's workhorse, handles tracking """ def __init__(self, mongo_host='localhost', mongo_port=27017, diff --git a/oyster/tasks.py b/oyster/tasks.py index 6ec09ca..162c126 100644 --- a/oyster/tasks.py +++ b/oyster/tasks.py @@ -4,7 +4,7 @@ from celery.execute import send_task from pymongo.objectid import ObjectId from oyster.conf import settings -from oyster.client import get_configured_client +from oyster.connection import get_configured_connection class UpdateTask(Task): @@ -13,16 +13,16 @@ class UpdateTask(Task): ignore_result = True def __init__(self): - # one client per process - self.client = get_configured_client() + # one connection per process + self.conn = get_configured_connection() def run(self, doc_id): - doc = self.client.db.tracked.find_one({'_id': doc_id}) - self.client.update(doc) + doc = self.conn.db.tracked.find_one({'_id': doc_id}) + self.conn.update(doc) for hook in doc.get('post_update_hooks', []): send_task(hook, (doc_id,)) - self.client.db.status.update({}, {'$inc': {'update_queue': -1}}) + self.conn.db.status.update({}, {'$inc': {'update_queue': -1}}) class UpdateTaskScheduler(PeriodicTask): @@ -30,19 +30,19 @@ class UpdateTaskScheduler(PeriodicTask): # 60s tick run_every = 60 - client = get_configured_client() + conn = get_configured_connection() def run(self): # if the update queue isn't empty, wait to add more # (currently the only way we avoid duplicates) # alternate option would be to set a _queued flag on documents - if self.client.db.status.find_one()['update_queue']: + if self.conn.db.status.find_one()['update_queue']: return - next_set = self.client.get_update_queue() + next_set = 
self.conn.get_update_queue() for doc in next_set: UpdateTask.delay(doc['_id']) - self.client.db.status.update({}, {'$inc': {'update_queue': 1}}) + self.conn.db.status.update({}, {'$inc': {'update_queue': 1}}) class ExternalStoreTask(Task): @@ -61,20 +61,20 @@ class ExternalStoreTask(Task): abstract = True def __init__(self): - # one client per process - self.client = get_configured_client() + # one connection per process + self.conn = get_configured_connection() def run(self, doc_id, extract_text=lambda x: x): # get the document - doc = self.client.db.tracked.find_one({'_id': ObjectId(doc_id)}) - filedata = self.client.get_version(doc['url']).read() + doc = self.conn.db.tracked.find_one({'_id': ObjectId(doc_id)}) + filedata = self.conn.get_version(doc['url']).read() text = extract_text(filedata, doc['metadata']) # put the document into the data store result = self.upload_document(doc_id, text, doc['metadata']) doc[self.external_store + '_id'] = result - self.client.db.tracked.save(doc, safe=True) + self.conn.db.tracked.save(doc, safe=True) def upload_document(self, doc_id, filedata, metadata): diff --git a/oyster/tests/test_client.py b/oyster/tests/test_client.py deleted file mode 100644 index d425f80..0000000 --- a/oyster/tests/test_client.py +++ /dev/null @@ -1,190 +0,0 @@ -import time -import datetime -from unittest import TestCase - -from nose.tools import assert_raises -import pymongo - -from oyster.client import Client - - -class ClientTests(TestCase): - - def setUp(self): - self.client = Client(mongo_db='oyster_test', retry_wait_minutes=1/60.) - self.client._wipe() - - - def test_constructor(self): - c = Client('127.0.0.1', 27017, 'testdb', mongo_log_maxsize=5000, - user_agent='test-ua', rpm=30, timeout=60, - retry_attempts=7, retry_wait_minutes=8) - assert c.db.connection.host == '127.0.0.1' - assert c.db.connection.port == 27017 - assert c.db.logs.options()['capped'] == True - assert c.db.logs.options()['size'] == 5000 - assert c.retry_wait_minutes == 8 - # TODO: test retry_attempts - assert c.scraper.user_agent == 'test-ua' - assert c.scraper.requests_per_minute == 30 - assert c.scraper.timeout == 60 - - - def test_log(self): - self.client.log('action1', 'http://example.com') - self.client.log('action2', 'http://test.com', error=True, pi=3) - assert self.client.db.logs.count() == 2 - x = self.client.db.logs.find_one({'error': True}) - assert x['action'] == 'action2' - assert x['url'] == 'http://test.com' - assert x['pi'] == 3 - - - def test_track_url(self): - # basic insert - id1 = self.client.track_url('http://example.com', update_mins=30, pi=3) - obj = self.client.db.tracked.find_one() - assert '_random' in obj - assert obj['update_mins'] == 30 - assert obj['metadata'] == {'pi': 3} - - # logging - log = self.client.db.logs.find_one() - assert log['action'] == 'track' - assert log['url'] == 'http://example.com' - - # track same url again with same metadata returns id - id2 = self.client.track_url('http://example.com', update_mins=30, pi=3) - assert id1 == id2 - - # can't track same URL twice with different metadata - assert_raises(ValueError, self.client.track_url, 'http://example.com') - - # logged error - assert self.client.db.logs.find_one({'error': 'tracking conflict'}) - - - def test_md5_versioning(self): - doc = {'url': 'hello.txt'} - self.client.fs.put('hello!', filename='hello.txt') - assert not self.client.md5_versioning(doc, 'hello!') - assert self.client.md5_versioning(doc, 'hey!') - - - def test_update(self): - # get a single document tracked - 
self.client.track_url('http://example.com', update_mins=60, pi=3) - obj = self.client.db.tracked.find_one() - self.client.update(obj) - - # check that metadata has been updated - newobj = self.client.db.tracked.find_one() - assert (newobj['last_update'] + - datetime.timedelta(minutes=newobj['update_mins']) == - newobj['next_update']) - first_update = newobj['last_update'] - assert newobj['consecutive_errors'] == 0 - - # check that document exists in database - doc = self.client.fs.get_last_version() - assert doc.filename == 'http://example.com' - assert doc.content_type.startswith('text/html') - assert doc.pi == 3 - - # check logs - assert self.client.db.logs.find({'action': 'update'}).count() == 1 - - # and do an update.. - self.client.update(obj) - - # hopefully example.com hasn't changed, this tests that md5 worked - assert self.client.db.fs.files.count() == 1 - - # check that appropriate metadata updated - newobj = self.client.db.tracked.find_one() - assert first_update < newobj['last_update'] - - # check that logs updated - assert self.client.db.logs.find({'action': 'update'}).count() == 2 - - - def test_update_failure(self): - # track a non-existent URL - self.client.track_url('http://not_a_url') - obj = self.client.db.tracked.find_one() - self.client.update(obj) - - obj = self.client.db.tracked.find_one() - assert obj['consecutive_errors'] == 1 - - # we should have logged an error too - assert self.client.db.logs.find({'action': 'update', - 'error': {'$ne': False}}).count() == 1 - - # update again - self.client.update(obj) - - obj = self.client.db.tracked.find_one() - assert obj['consecutive_errors'] == 2 - - - def test_all_versions(self): - random_url = 'http://en.wikipedia.org/wiki/Special:Random' - self.client.track_url(random_url) - obj = self.client.db.tracked.find_one() - self.client.update(obj) - - versions = self.client.get_all_versions(random_url) - assert versions[0].filename == random_url - - self.client.update(obj) - assert len(self.client.get_all_versions(random_url)) == 2 - - - def test_get_update_queue(self): - self.client.track_url('never-updates', update_mins=0.01) - self.client.track_url('bad-uri', update_mins=0.01) - self.client.track_url('http://example.com', update_mins=0.01) - - never = self.client.db.tracked.find_one(dict(url='never-updates')) - bad = self.client.db.tracked.find_one(dict(url='bad-uri')) - good = self.client.db.tracked.find_one(dict(url='http://example.com')) - - # 3 in queue, ordered by random - queue = self.client.get_update_queue() - assert len(queue) == 3 - assert queue[0]['_random'] < queue[1]['_random'] < queue[2]['_random'] - - # try and update bad & good - self.client.update(bad) - self.client.update(good) - - # queue should only have never in it - queue = self.client.get_update_queue() - assert queue[0]['_id'] == never['_id'] - - # wait for time to pass so queue should be full - time.sleep(1) - queue = self.client.get_update_queue() - assert len(queue) == 3 - - - def test_get_update_queue_size(self): - self.client.track_url('a', update_mins=0.01) - self.client.track_url('b', update_mins=0.01) - self.client.track_url('c', update_mins=0.01) - - a = self.client.db.tracked.find_one(dict(url='a')) - b = self.client.db.tracked.find_one(dict(url='b')) - c = self.client.db.tracked.find_one(dict(url='c')) - - # size should start at 3 - assert self.client.get_update_queue_size() == 3 - - # goes down one - self.client.update(a) - assert self.client.get_update_queue_size() == 2 - - # wait for it to go back to 3 - time.sleep(1) - assert 
self.client.get_update_queue_size() == 3 diff --git a/oyster/tests/test_connection.py b/oyster/tests/test_connection.py new file mode 100644 index 0000000..1fe392f --- /dev/null +++ b/oyster/tests/test_connection.py @@ -0,0 +1,190 @@ +import time +import datetime +from unittest import TestCase + +from nose.tools import assert_raises +import pymongo + +from oyster.connection import Connection + + +class ConnectionTests(TestCase): + + def setUp(self): + self.conn = Connection(mongo_db='oyster_test', retry_wait_minutes=1/60.) + self.conn._wipe() + + + def test_constructor(self): + c = Connection('127.0.0.1', 27017, 'testdb', mongo_log_maxsize=5000, + user_agent='test-ua', rpm=30, timeout=60, + retry_attempts=7, retry_wait_minutes=8) + assert c.db.connection.host == '127.0.0.1' + assert c.db.connection.port == 27017 + assert c.db.logs.options()['capped'] == True + assert c.db.logs.options()['size'] == 5000 + assert c.retry_wait_minutes == 8 + # TODO: test retry_attempts + assert c.scraper.user_agent == 'test-ua' + assert c.scraper.requests_per_minute == 30 + assert c.scraper.timeout == 60 + + + def test_log(self): + self.conn.log('action1', 'http://example.com') + self.conn.log('action2', 'http://test.com', error=True, pi=3) + assert self.conn.db.logs.count() == 2 + x = self.conn.db.logs.find_one({'error': True}) + assert x['action'] == 'action2' + assert x['url'] == 'http://test.com' + assert x['pi'] == 3 + + + def test_track_url(self): + # basic insert + id1 = self.conn.track_url('http://example.com', update_mins=30, pi=3) + obj = self.conn.db.tracked.find_one() + assert '_random' in obj + assert obj['update_mins'] == 30 + assert obj['metadata'] == {'pi': 3} + + # logging + log = self.conn.db.logs.find_one() + assert log['action'] == 'track' + assert log['url'] == 'http://example.com' + + # track same url again with same metadata returns id + id2 = self.conn.track_url('http://example.com', update_mins=30, pi=3) + assert id1 == id2 + + # can't track same URL twice with different metadata + assert_raises(ValueError, self.conn.track_url, 'http://example.com') + + # logged error + assert self.conn.db.logs.find_one({'error': 'tracking conflict'}) + + + def test_md5_versioning(self): + doc = {'url': 'hello.txt'} + self.conn.fs.put('hello!', filename='hello.txt') + assert not self.conn.md5_versioning(doc, 'hello!') + assert self.conn.md5_versioning(doc, 'hey!') + + + def test_update(self): + # get a single document tracked + self.conn.track_url('http://example.com', update_mins=60, pi=3) + obj = self.conn.db.tracked.find_one() + self.conn.update(obj) + + # check that metadata has been updated + newobj = self.conn.db.tracked.find_one() + assert (newobj['last_update'] + + datetime.timedelta(minutes=newobj['update_mins']) == + newobj['next_update']) + first_update = newobj['last_update'] + assert newobj['consecutive_errors'] == 0 + + # check that document exists in database + doc = self.conn.fs.get_last_version() + assert doc.filename == 'http://example.com' + assert doc.content_type.startswith('text/html') + assert doc.pi == 3 + + # check logs + assert self.conn.db.logs.find({'action': 'update'}).count() == 1 + + # and do an update.. 
+ self.conn.update(obj) + + # hopefully example.com hasn't changed, this tests that md5 worked + assert self.conn.db.fs.files.count() == 1 + + # check that appropriate metadata updated + newobj = self.conn.db.tracked.find_one() + assert first_update < newobj['last_update'] + + # check that logs updated + assert self.conn.db.logs.find({'action': 'update'}).count() == 2 + + + def test_update_failure(self): + # track a non-existent URL + self.conn.track_url('http://not_a_url') + obj = self.conn.db.tracked.find_one() + self.conn.update(obj) + + obj = self.conn.db.tracked.find_one() + assert obj['consecutive_errors'] == 1 + + # we should have logged an error too + assert self.conn.db.logs.find({'action': 'update', + 'error': {'$ne': False}}).count() == 1 + + # update again + self.conn.update(obj) + + obj = self.conn.db.tracked.find_one() + assert obj['consecutive_errors'] == 2 + + + def test_all_versions(self): + random_url = 'http://en.wikipedia.org/wiki/Special:Random' + self.conn.track_url(random_url) + obj = self.conn.db.tracked.find_one() + self.conn.update(obj) + + versions = self.conn.get_all_versions(random_url) + assert versions[0].filename == random_url + + self.conn.update(obj) + assert len(self.conn.get_all_versions(random_url)) == 2 + + + def test_get_update_queue(self): + self.conn.track_url('never-updates', update_mins=0.01) + self.conn.track_url('bad-uri', update_mins=0.01) + self.conn.track_url('http://example.com', update_mins=0.01) + + never = self.conn.db.tracked.find_one(dict(url='never-updates')) + bad = self.conn.db.tracked.find_one(dict(url='bad-uri')) + good = self.conn.db.tracked.find_one(dict(url='http://example.com')) + + # 3 in queue, ordered by random + queue = self.conn.get_update_queue() + assert len(queue) == 3 + assert queue[0]['_random'] < queue[1]['_random'] < queue[2]['_random'] + + # try and update bad & good + self.conn.update(bad) + self.conn.update(good) + + # queue should only have never in it + queue = self.conn.get_update_queue() + assert queue[0]['_id'] == never['_id'] + + # wait for time to pass so queue should be full + time.sleep(1) + queue = self.conn.get_update_queue() + assert len(queue) == 3 + + + def test_get_update_queue_size(self): + self.conn.track_url('a', update_mins=0.01) + self.conn.track_url('b', update_mins=0.01) + self.conn.track_url('c', update_mins=0.01) + + a = self.conn.db.tracked.find_one(dict(url='a')) + b = self.conn.db.tracked.find_one(dict(url='b')) + c = self.conn.db.tracked.find_one(dict(url='c')) + + # size should start at 3 + assert self.conn.get_update_queue_size() == 3 + + # goes down one + self.conn.update(a) + assert self.conn.get_update_queue_size() == 2 + + # wait for it to go back to 3 + time.sleep(1) + assert self.conn.get_update_queue_size() == 3 diff --git a/oyster/web.py b/oyster/web.py index b41136e..9b986d3 100644 --- a/oyster/web.py +++ b/oyster/web.py @@ -7,7 +7,7 @@ import flask import pymongo.objectid from oyster.conf import settings -from oyster.client import get_configured_client +from oyster.connection import get_configured_connection class JSONEncoder(json.JSONEncoder): @@ -43,16 +43,16 @@ def api_wrapper(template=None): app = flask.Flask('oyster') -client = get_configured_client() +conn = get_configured_connection() @app.route('/') @api_wrapper('index.html') def index(): status = { - 'tracking': client.db.tracked.count(), - 'need_update': client.get_update_queue_size(), - 'logs': list(client.db.logs.find().sort('$natural', -1).limit(20)), + 'tracking': conn.db.tracked.count(), + 'need_update': 
conn.get_update_queue_size(), + 'logs': list(conn.db.logs.find().sort('$natural', -1).limit(20)), 'mongo_host': settings.MONGO_HOST, } return status @@ -62,8 +62,8 @@ def index(): @api_wrapper() def doc_list(): status = { - 'tracking': client.db.tracked.count(), - 'need_update': client.get_update_queue_size(), + 'tracking': conn.db.tracked.count(), + 'need_update': conn.get_update_queue_size(), } return status @@ -75,7 +75,7 @@ def log_view(): size = 100 prev_offset = max(offset - size, 0) next_offset = offset + size - logs = client.db.logs.find().sort('$natural', -1).skip(offset).limit(size) + logs = conn.db.logs.find().sort('$natural', -1).skip(offset).limit(size) return dict(logs=list(logs), prev_offset=prev_offset, next_offset=next_offset, offset=offset) @@ -83,14 +83,14 @@ def log_view(): @app.route('/tracked/') @api_wrapper() def tracked(): - tracked = list(client.db.tracked.find()) + tracked = list(conn.db.tracked.find()) return json.dumps(tracked, cls=JSONEncoder) @app.route('/tracked/') def tracked_view(url): url = _path_fixer(url) - doc = client.db.tracked.find_one({'url': url}) + doc = conn.db.tracked.find_one({'url': url}) return json.dumps(doc, cls=JSONEncoder) @@ -99,7 +99,7 @@ def show_doc(url, version): url = _path_fixer(url) if version == 'latest': version = -1 - doc = client.get_version(url, version) + doc = conn.get_version(url, version) resp = flask.make_response(doc.read()) resp.headers['content-type'] = doc.content_type return resp
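
Reviewer note (commentary on the patch above, not part of the diff): this commit is a pure mechanical rename -- Client becomes Connection, oyster.client becomes oyster.connection, the client/self.client references become conn/self.conn, and the test module moves from test_client.py to test_connection.py with the same identifier swap -- so there is no behavior change. One consequence worth flagging: the rename removes oyster/client.py, so any downstream code still doing "from oyster.client import get_configured_client" will raise ImportError once this lands. If a deprecation period is wanted, a shim could be reintroduced at the old path. The sketch below is hypothetical (this patch adds no such file) and assumes downstream code only uses the Client class and the get_configured_client() factory:

    # oyster/client.py -- hypothetical backwards-compatibility shim, NOT part of this patch
    import warnings

    from oyster.connection import Connection, get_configured_connection

    # re-export the class under its old name so
    # `from oyster.client import Client` keeps working
    Client = Connection


    def get_configured_client():
        """Deprecated alias for get_configured_connection()."""
        warnings.warn('oyster.client is deprecated; import from oyster.connection',
                      DeprecationWarning, stacklevel=2)
        return get_configured_connection()

Dropping such a shim after one release cycle would keep this commit an honest rename while giving integrators time to update their imports.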