rename client to connection

This commit is contained in:
James Turk 2012-02-13 14:34:45 -05:00
parent 7526280333
commit 3539e50c9d
5 changed files with 228 additions and 228 deletions

View File

@ -9,21 +9,21 @@ import gridfs
import scrapelib import scrapelib
def get_configured_client(): def get_configured_connection():
""" helper factory, gets a client configured with oyster.conf.settings """ """ factory, gets a connection configured with oyster.conf.settings """
from oyster.conf import settings from oyster.conf import settings
return Client(mongo_host=settings.MONGO_HOST, return Connection(mongo_host=settings.MONGO_HOST,
mongo_port=settings.MONGO_PORT, mongo_port=settings.MONGO_PORT,
mongo_db=settings.MONGO_DATABASE, mongo_db=settings.MONGO_DATABASE,
mongo_log_maxsize=settings.MONGO_LOG_MAXSIZE, mongo_log_maxsize=settings.MONGO_LOG_MAXSIZE,
user_agent=settings.USER_AGENT, user_agent=settings.USER_AGENT,
rpm=settings.REQUESTS_PER_MINUTE, rpm=settings.REQUESTS_PER_MINUTE,
timeout=settings.REQUEST_TIMEOUT, timeout=settings.REQUEST_TIMEOUT,
retry_attempts=settings.RETRY_ATTEMPTS, retry_attempts=settings.RETRY_ATTEMPTS,
retry_wait_minutes=settings.RETRY_WAIT_MINUTES) retry_wait_minutes=settings.RETRY_WAIT_MINUTES)
class Client(object): class Connection(object):
""" oyster's workhorse, handles tracking """ """ oyster's workhorse, handles tracking """
def __init__(self, mongo_host='localhost', mongo_port=27017, def __init__(self, mongo_host='localhost', mongo_port=27017,

View File

@ -4,7 +4,7 @@ from celery.execute import send_task
from pymongo.objectid import ObjectId from pymongo.objectid import ObjectId
from oyster.conf import settings from oyster.conf import settings
from oyster.client import get_configured_client from oyster.connection import get_configured_connection
class UpdateTask(Task): class UpdateTask(Task):
@ -13,16 +13,16 @@ class UpdateTask(Task):
ignore_result = True ignore_result = True
def __init__(self): def __init__(self):
# one client per process # one connection per process
self.client = get_configured_client() self.conn = get_configured_connection()
def run(self, doc_id): def run(self, doc_id):
doc = self.client.db.tracked.find_one({'_id': doc_id}) doc = self.conn.db.tracked.find_one({'_id': doc_id})
self.client.update(doc) self.conn.update(doc)
for hook in doc.get('post_update_hooks', []): for hook in doc.get('post_update_hooks', []):
send_task(hook, (doc_id,)) send_task(hook, (doc_id,))
self.client.db.status.update({}, {'$inc': {'update_queue': -1}}) self.conn.db.status.update({}, {'$inc': {'update_queue': -1}})
class UpdateTaskScheduler(PeriodicTask): class UpdateTaskScheduler(PeriodicTask):
@ -30,19 +30,19 @@ class UpdateTaskScheduler(PeriodicTask):
# 60s tick # 60s tick
run_every = 60 run_every = 60
client = get_configured_client() conn = get_configured_connection()
def run(self): def run(self):
# if the update queue isn't empty, wait to add more # if the update queue isn't empty, wait to add more
# (currently the only way we avoid duplicates) # (currently the only way we avoid duplicates)
# alternate option would be to set a _queued flag on documents # alternate option would be to set a _queued flag on documents
if self.client.db.status.find_one()['update_queue']: if self.conn.db.status.find_one()['update_queue']:
return return
next_set = self.client.get_update_queue() next_set = self.conn.get_update_queue()
for doc in next_set: for doc in next_set:
UpdateTask.delay(doc['_id']) UpdateTask.delay(doc['_id'])
self.client.db.status.update({}, {'$inc': {'update_queue': 1}}) self.conn.db.status.update({}, {'$inc': {'update_queue': 1}})
class ExternalStoreTask(Task): class ExternalStoreTask(Task):
@ -61,20 +61,20 @@ class ExternalStoreTask(Task):
abstract = True abstract = True
def __init__(self): def __init__(self):
# one client per process # one connection per process
self.client = get_configured_client() self.conn = get_configured_connection()
def run(self, doc_id, extract_text=lambda x: x): def run(self, doc_id, extract_text=lambda x: x):
# get the document # get the document
doc = self.client.db.tracked.find_one({'_id': ObjectId(doc_id)}) doc = self.conn.db.tracked.find_one({'_id': ObjectId(doc_id)})
filedata = self.client.get_version(doc['url']).read() filedata = self.conn.get_version(doc['url']).read()
text = extract_text(filedata, doc['metadata']) text = extract_text(filedata, doc['metadata'])
# put the document into the data store # put the document into the data store
result = self.upload_document(doc_id, text, doc['metadata']) result = self.upload_document(doc_id, text, doc['metadata'])
doc[self.external_store + '_id'] = result doc[self.external_store + '_id'] = result
self.client.db.tracked.save(doc, safe=True) self.conn.db.tracked.save(doc, safe=True)
def upload_document(self, doc_id, filedata, metadata): def upload_document(self, doc_id, filedata, metadata):

View File

@ -1,190 +0,0 @@
import time
import datetime
from unittest import TestCase
from nose.tools import assert_raises
import pymongo
from oyster.client import Client
class ClientTests(TestCase):
def setUp(self):
self.client = Client(mongo_db='oyster_test', retry_wait_minutes=1/60.)
self.client._wipe()
def test_constructor(self):
c = Client('127.0.0.1', 27017, 'testdb', mongo_log_maxsize=5000,
user_agent='test-ua', rpm=30, timeout=60,
retry_attempts=7, retry_wait_minutes=8)
assert c.db.connection.host == '127.0.0.1'
assert c.db.connection.port == 27017
assert c.db.logs.options()['capped'] == True
assert c.db.logs.options()['size'] == 5000
assert c.retry_wait_minutes == 8
# TODO: test retry_attempts
assert c.scraper.user_agent == 'test-ua'
assert c.scraper.requests_per_minute == 30
assert c.scraper.timeout == 60
def test_log(self):
self.client.log('action1', 'http://example.com')
self.client.log('action2', 'http://test.com', error=True, pi=3)
assert self.client.db.logs.count() == 2
x = self.client.db.logs.find_one({'error': True})
assert x['action'] == 'action2'
assert x['url'] == 'http://test.com'
assert x['pi'] == 3
def test_track_url(self):
# basic insert
id1 = self.client.track_url('http://example.com', update_mins=30, pi=3)
obj = self.client.db.tracked.find_one()
assert '_random' in obj
assert obj['update_mins'] == 30
assert obj['metadata'] == {'pi': 3}
# logging
log = self.client.db.logs.find_one()
assert log['action'] == 'track'
assert log['url'] == 'http://example.com'
# track same url again with same metadata returns id
id2 = self.client.track_url('http://example.com', update_mins=30, pi=3)
assert id1 == id2
# can't track same URL twice with different metadata
assert_raises(ValueError, self.client.track_url, 'http://example.com')
# logged error
assert self.client.db.logs.find_one({'error': 'tracking conflict'})
def test_md5_versioning(self):
doc = {'url': 'hello.txt'}
self.client.fs.put('hello!', filename='hello.txt')
assert not self.client.md5_versioning(doc, 'hello!')
assert self.client.md5_versioning(doc, 'hey!')
def test_update(self):
# get a single document tracked
self.client.track_url('http://example.com', update_mins=60, pi=3)
obj = self.client.db.tracked.find_one()
self.client.update(obj)
# check that metadata has been updated
newobj = self.client.db.tracked.find_one()
assert (newobj['last_update'] +
datetime.timedelta(minutes=newobj['update_mins']) ==
newobj['next_update'])
first_update = newobj['last_update']
assert newobj['consecutive_errors'] == 0
# check that document exists in database
doc = self.client.fs.get_last_version()
assert doc.filename == 'http://example.com'
assert doc.content_type.startswith('text/html')
assert doc.pi == 3
# check logs
assert self.client.db.logs.find({'action': 'update'}).count() == 1
# and do an update..
self.client.update(obj)
# hopefully example.com hasn't changed, this tests that md5 worked
assert self.client.db.fs.files.count() == 1
# check that appropriate metadata updated
newobj = self.client.db.tracked.find_one()
assert first_update < newobj['last_update']
# check that logs updated
assert self.client.db.logs.find({'action': 'update'}).count() == 2
def test_update_failure(self):
# track a non-existent URL
self.client.track_url('http://not_a_url')
obj = self.client.db.tracked.find_one()
self.client.update(obj)
obj = self.client.db.tracked.find_one()
assert obj['consecutive_errors'] == 1
# we should have logged an error too
assert self.client.db.logs.find({'action': 'update',
'error': {'$ne': False}}).count() == 1
# update again
self.client.update(obj)
obj = self.client.db.tracked.find_one()
assert obj['consecutive_errors'] == 2
def test_all_versions(self):
random_url = 'http://en.wikipedia.org/wiki/Special:Random'
self.client.track_url(random_url)
obj = self.client.db.tracked.find_one()
self.client.update(obj)
versions = self.client.get_all_versions(random_url)
assert versions[0].filename == random_url
self.client.update(obj)
assert len(self.client.get_all_versions(random_url)) == 2
def test_get_update_queue(self):
self.client.track_url('never-updates', update_mins=0.01)
self.client.track_url('bad-uri', update_mins=0.01)
self.client.track_url('http://example.com', update_mins=0.01)
never = self.client.db.tracked.find_one(dict(url='never-updates'))
bad = self.client.db.tracked.find_one(dict(url='bad-uri'))
good = self.client.db.tracked.find_one(dict(url='http://example.com'))
# 3 in queue, ordered by random
queue = self.client.get_update_queue()
assert len(queue) == 3
assert queue[0]['_random'] < queue[1]['_random'] < queue[2]['_random']
# try and update bad & good
self.client.update(bad)
self.client.update(good)
# queue should only have never in it
queue = self.client.get_update_queue()
assert queue[0]['_id'] == never['_id']
# wait for time to pass so queue should be full
time.sleep(1)
queue = self.client.get_update_queue()
assert len(queue) == 3
def test_get_update_queue_size(self):
self.client.track_url('a', update_mins=0.01)
self.client.track_url('b', update_mins=0.01)
self.client.track_url('c', update_mins=0.01)
a = self.client.db.tracked.find_one(dict(url='a'))
b = self.client.db.tracked.find_one(dict(url='b'))
c = self.client.db.tracked.find_one(dict(url='c'))
# size should start at 3
assert self.client.get_update_queue_size() == 3
# goes down one
self.client.update(a)
assert self.client.get_update_queue_size() == 2
# wait for it to go back to 3
time.sleep(1)
assert self.client.get_update_queue_size() == 3

View File

@ -0,0 +1,190 @@
import time
import datetime
from unittest import TestCase
from nose.tools import assert_raises
import pymongo
from oyster.connection import Connection
class ConnectionTests(TestCase):
def setUp(self):
self.conn = Connection(mongo_db='oyster_test', retry_wait_minutes=1/60.)
self.conn._wipe()
def test_constructor(self):
c = Connection('127.0.0.1', 27017, 'testdb', mongo_log_maxsize=5000,
user_agent='test-ua', rpm=30, timeout=60,
retry_attempts=7, retry_wait_minutes=8)
assert c.db.connection.host == '127.0.0.1'
assert c.db.connection.port == 27017
assert c.db.logs.options()['capped'] == True
assert c.db.logs.options()['size'] == 5000
assert c.retry_wait_minutes == 8
# TODO: test retry_attempts
assert c.scraper.user_agent == 'test-ua'
assert c.scraper.requests_per_minute == 30
assert c.scraper.timeout == 60
def test_log(self):
self.conn.log('action1', 'http://example.com')
self.conn.log('action2', 'http://test.com', error=True, pi=3)
assert self.conn.db.logs.count() == 2
x = self.conn.db.logs.find_one({'error': True})
assert x['action'] == 'action2'
assert x['url'] == 'http://test.com'
assert x['pi'] == 3
def test_track_url(self):
# basic insert
id1 = self.conn.track_url('http://example.com', update_mins=30, pi=3)
obj = self.conn.db.tracked.find_one()
assert '_random' in obj
assert obj['update_mins'] == 30
assert obj['metadata'] == {'pi': 3}
# logging
log = self.conn.db.logs.find_one()
assert log['action'] == 'track'
assert log['url'] == 'http://example.com'
# track same url again with same metadata returns id
id2 = self.conn.track_url('http://example.com', update_mins=30, pi=3)
assert id1 == id2
# can't track same URL twice with different metadata
assert_raises(ValueError, self.conn.track_url, 'http://example.com')
# logged error
assert self.conn.db.logs.find_one({'error': 'tracking conflict'})
def test_md5_versioning(self):
doc = {'url': 'hello.txt'}
self.conn.fs.put('hello!', filename='hello.txt')
assert not self.conn.md5_versioning(doc, 'hello!')
assert self.conn.md5_versioning(doc, 'hey!')
def test_update(self):
# get a single document tracked
self.conn.track_url('http://example.com', update_mins=60, pi=3)
obj = self.conn.db.tracked.find_one()
self.conn.update(obj)
# check that metadata has been updated
newobj = self.conn.db.tracked.find_one()
assert (newobj['last_update'] +
datetime.timedelta(minutes=newobj['update_mins']) ==
newobj['next_update'])
first_update = newobj['last_update']
assert newobj['consecutive_errors'] == 0
# check that document exists in database
doc = self.conn.fs.get_last_version()
assert doc.filename == 'http://example.com'
assert doc.content_type.startswith('text/html')
assert doc.pi == 3
# check logs
assert self.conn.db.logs.find({'action': 'update'}).count() == 1
# and do an update..
self.conn.update(obj)
# hopefully example.com hasn't changed, this tests that md5 worked
assert self.conn.db.fs.files.count() == 1
# check that appropriate metadata updated
newobj = self.conn.db.tracked.find_one()
assert first_update < newobj['last_update']
# check that logs updated
assert self.conn.db.logs.find({'action': 'update'}).count() == 2
def test_update_failure(self):
# track a non-existent URL
self.conn.track_url('http://not_a_url')
obj = self.conn.db.tracked.find_one()
self.conn.update(obj)
obj = self.conn.db.tracked.find_one()
assert obj['consecutive_errors'] == 1
# we should have logged an error too
assert self.conn.db.logs.find({'action': 'update',
'error': {'$ne': False}}).count() == 1
# update again
self.conn.update(obj)
obj = self.conn.db.tracked.find_one()
assert obj['consecutive_errors'] == 2
def test_all_versions(self):
random_url = 'http://en.wikipedia.org/wiki/Special:Random'
self.conn.track_url(random_url)
obj = self.conn.db.tracked.find_one()
self.conn.update(obj)
versions = self.conn.get_all_versions(random_url)
assert versions[0].filename == random_url
self.conn.update(obj)
assert len(self.conn.get_all_versions(random_url)) == 2
def test_get_update_queue(self):
self.conn.track_url('never-updates', update_mins=0.01)
self.conn.track_url('bad-uri', update_mins=0.01)
self.conn.track_url('http://example.com', update_mins=0.01)
never = self.conn.db.tracked.find_one(dict(url='never-updates'))
bad = self.conn.db.tracked.find_one(dict(url='bad-uri'))
good = self.conn.db.tracked.find_one(dict(url='http://example.com'))
# 3 in queue, ordered by random
queue = self.conn.get_update_queue()
assert len(queue) == 3
assert queue[0]['_random'] < queue[1]['_random'] < queue[2]['_random']
# try and update bad & good
self.conn.update(bad)
self.conn.update(good)
# queue should only have never in it
queue = self.conn.get_update_queue()
assert queue[0]['_id'] == never['_id']
# wait for time to pass so queue should be full
time.sleep(1)
queue = self.conn.get_update_queue()
assert len(queue) == 3
def test_get_update_queue_size(self):
self.conn.track_url('a', update_mins=0.01)
self.conn.track_url('b', update_mins=0.01)
self.conn.track_url('c', update_mins=0.01)
a = self.conn.db.tracked.find_one(dict(url='a'))
b = self.conn.db.tracked.find_one(dict(url='b'))
c = self.conn.db.tracked.find_one(dict(url='c'))
# size should start at 3
assert self.conn.get_update_queue_size() == 3
# goes down one
self.conn.update(a)
assert self.conn.get_update_queue_size() == 2
# wait for it to go back to 3
time.sleep(1)
assert self.conn.get_update_queue_size() == 3

View File

@ -7,7 +7,7 @@ import flask
import pymongo.objectid import pymongo.objectid
from oyster.conf import settings from oyster.conf import settings
from oyster.client import get_configured_client from oyster.connection import get_configured_connection
class JSONEncoder(json.JSONEncoder): class JSONEncoder(json.JSONEncoder):
@ -43,16 +43,16 @@ def api_wrapper(template=None):
app = flask.Flask('oyster') app = flask.Flask('oyster')
client = get_configured_client() conn = get_configured_connection()
@app.route('/') @app.route('/')
@api_wrapper('index.html') @api_wrapper('index.html')
def index(): def index():
status = { status = {
'tracking': client.db.tracked.count(), 'tracking': conn.db.tracked.count(),
'need_update': client.get_update_queue_size(), 'need_update': conn.get_update_queue_size(),
'logs': list(client.db.logs.find().sort('$natural', -1).limit(20)), 'logs': list(conn.db.logs.find().sort('$natural', -1).limit(20)),
'mongo_host': settings.MONGO_HOST, 'mongo_host': settings.MONGO_HOST,
} }
return status return status
@ -62,8 +62,8 @@ def index():
@api_wrapper() @api_wrapper()
def doc_list(): def doc_list():
status = { status = {
'tracking': client.db.tracked.count(), 'tracking': conn.db.tracked.count(),
'need_update': client.get_update_queue_size(), 'need_update': conn.get_update_queue_size(),
} }
return status return status
@ -75,7 +75,7 @@ def log_view():
size = 100 size = 100
prev_offset = max(offset - size, 0) prev_offset = max(offset - size, 0)
next_offset = offset + size next_offset = offset + size
logs = client.db.logs.find().sort('$natural', -1).skip(offset).limit(size) logs = conn.db.logs.find().sort('$natural', -1).skip(offset).limit(size)
return dict(logs=list(logs), prev_offset=prev_offset, return dict(logs=list(logs), prev_offset=prev_offset,
next_offset=next_offset, offset=offset) next_offset=next_offset, offset=offset)
@ -83,14 +83,14 @@ def log_view():
@app.route('/tracked/') @app.route('/tracked/')
@api_wrapper() @api_wrapper()
def tracked(): def tracked():
tracked = list(client.db.tracked.find()) tracked = list(conn.db.tracked.find())
return json.dumps(tracked, cls=JSONEncoder) return json.dumps(tracked, cls=JSONEncoder)
@app.route('/tracked/<path:url>') @app.route('/tracked/<path:url>')
def tracked_view(url): def tracked_view(url):
url = _path_fixer(url) url = _path_fixer(url)
doc = client.db.tracked.find_one({'url': url}) doc = conn.db.tracked.find_one({'url': url})
return json.dumps(doc, cls=JSONEncoder) return json.dumps(doc, cls=JSONEncoder)
@ -99,7 +99,7 @@ def show_doc(url, version):
url = _path_fixer(url) url = _path_fixer(url)
if version == 'latest': if version == 'latest':
version = -1 version = -1
doc = client.get_version(url, version) doc = conn.get_version(url, version)
resp = flask.make_response(doc.read()) resp = flask.make_response(doc.read())
resp.headers['content-type'] = doc.content_type resp.headers['content-type'] = doc.content_type
return resp return resp