Compare commits


No commits in common. "master" and "0.3.1" have entirely different histories.

24 changed files with 172 additions and 433 deletions

.gitignore

@@ -1,5 +1,3 @@
 celerybeat-schedule
 oyster_settings.py
-oyster.egg-info/
 *.pyc
-.tox

.travis.yml

@@ -1,10 +0,0 @@
-language: python
-python:
-  - "2.6"
-  - "2.7"
-install: pip install scrapelib pymongo nose celery --use-mirrors
-script: nosetests
-services: mongodb
-notifications:
-  email:
-    - jturk@sunlightfoundation.com

README.rst

@@ -1,5 +1,3 @@
-**DEPRECATED** - this project is abandoned & will not be seeing future updates
-
 ======
 oyster
 ======

changelog

@@ -1,22 +1,6 @@
 oyster changelog
 ================
 
-0.4.0-dev
----------
-* S3 storage backend bugfix
-* lots of improvements to signal script
-* oyster.ext cloudsearch, elasticsearch, and superfastmatch
-* use python logging w/ mongo handler
-* add tox/python setup.py test (thanks Marc Abramowitz!)
-
-0.3.2
------
-**2012-03-29**
-* become much more tolerant of duplicates
-* skip S3 test if not prepared
-* use doc_class AWS_PREFIX and AWS_BUCKET if set
-* add DEFAULT_STORAGE_ENGINE setting
-
 0.3.1
 -----
 **2012-03-10**

oyster/__init__.py

@@ -1,4 +1,4 @@
-__version__ = "0.4.0-dev"
+__version__ = "0.3.1"
 
 import os
 os.environ['CELERY_CONFIG_MODULE'] = 'oyster.celeryconfig'

oyster/celeryconfig.py

@@ -1,3 +1,13 @@
 from oyster.conf import settings
 
-CELERY_IMPORTS = ['oyster.tasks'] + list(settings.CELERY_TASK_MODULES)
+CELERY_IMPORTS = ["oyster.tasks"] + list(settings.CELERY_TASK_MODULES)
+
+BROKER_TRANSPORT = 'mongodb'
+BROKER_HOST = settings.MONGO_HOST
+BROKER_PORT = settings.MONGO_PORT
+
+CELERY_RESULT_BACKEND = 'mongodb'
+CELERY_MONGODB_BACKEND_SETTINGS = {
+    'host': settings.MONGO_HOST,
+    'port': settings.MONGO_PORT,
+}

oyster/conf/default_settings.py

@@ -17,5 +17,3 @@ RETRY_ATTEMPTS = 3
 RETRY_WAIT_MINUTES = 60
 
 DOCUMENT_CLASSES = {}
-
-DEFAULT_STORAGE_ENGINE = 'dummy'

oyster/core.py

@@ -1,5 +1,4 @@
 import datetime
-import logging
 import hashlib
 import random
 import sys

@@ -7,9 +6,7 @@ import sys
 import pymongo
 import scrapelib
 
-from .mongolog import MongoHandler
 from .storage import engines
-from celery.execute import send_task
 
 
 class Kernel(object):

@@ -19,28 +16,27 @@ class Kernel(object):
                  mongo_db='oyster', mongo_log_maxsize=100000000,
                  user_agent='oyster', rpm=60, timeout=300,
                  retry_attempts=3, retry_wait_minutes=60,
-                 doc_classes=None, default_storage_engine='dummy',
+                 doc_classes=None,
                  ):
         """
         configurable for ease of testing, only one should be instantiated
         """
 
-        # set up the log
+        # set up a capped log if it doesn't exist
         self.db = pymongo.Connection(mongo_host, mongo_port)[mongo_db]
-        self.log = logging.getLogger('oyster')
-        self.log.setLevel(logging.DEBUG)
-        self.log.addHandler(MongoHandler(mongo_db, host=mongo_host,
-                                         port=mongo_port,
-                                         capped_size=mongo_log_maxsize))
-
+        try:
+            self.db.create_collection('logs', capped=True,
+                                      size=mongo_log_maxsize)
+        except pymongo.errors.CollectionInvalid:
+            # cap collection if not capped?
+            pass
 
         # create status document if it doesn't exist
         if self.db.status.count() == 0:
             self.db.status.insert({'update_queue': 0})
 
         # ensure an index on _random
-        self.db.tracked.ensure_index('_random')
-        self.db.tracked.ensure_index('url')
+        self.db.tracked.ensure_index([('_random', pymongo.ASCENDING)])
 
         self.scraper = scrapelib.Scraper(user_agent=user_agent,
                                          requests_per_minute=rpm,

@@ -57,16 +53,14 @@ class Kernel(object):
             self.storage[name] = StorageCls(self)
 
         # set document classes
-        _doc_class_fields = ('update_mins', 'onchanged')
+        _doc_class_fields = ('update_mins', 'storage_engine',
+                             'onchanged')
         self.doc_classes = doc_classes or {}
         for dc_name, dc_props in self.doc_classes.iteritems():
             for key in _doc_class_fields:
                 if key not in dc_props:
                     raise ValueError('doc_class %s missing key %s' %
                                      (dc_name, key))
-            # set a default storage engine
-            if 'storage_engine' not in dc_props:
-                dc_props['storage_engine'] = default_storage_engine
 
     def _wipe(self):
        """ exists primarily for debug use, wipes entire db """

@@ -74,6 +68,14 @@ class Kernel(object):
         self.db.drop_collection('logs')
         self.db.drop_collection('status')
 
+    def log(self, action, url, error=False, **kwargs):
+        """ add an entry to the oyster log """
+        kwargs['action'] = action
+        kwargs['url'] = url
+        kwargs['error'] = error
+        kwargs['timestamp'] = datetime.datetime.utcnow()
+        self.db.logs.insert(kwargs)
+
     def _add_doc_class(self, doc_class, **properties):
         self.doc_classes[doc_class] = properties

@@ -89,44 +91,33 @@ class Kernel(object):
         any keyword args will be added to the document's metadata
         """
 
         if doc_class not in self.doc_classes:
-            error = 'error tracking %s, unregistered doc_class %s'
-            self.log.error(error, url, doc_class)
-            raise ValueError(error % (url, doc_class))
+            raise ValueError('unregistered doc_class %s' % doc_class)
 
-        # try and find an existing version of this document
-        tracked = None
-        if id:
-            tracked = self.db.tracked.find_one({'_id': id})
-        else:
-            tracked = self.db.tracked.find_one({'url': url})
+        tracked = self.db.tracked.find_one({'url': url})
 
-        # if id exists, ensure that URL and doc_class haven't changed
-        # then return existing data (possibly with refreshed metadata)
-
+        # if data is already tracked and this is just a duplicate call
+        # return the original object
         if tracked:
-            if (tracked['url'] == url and
-                tracked['doc_class'] == doc_class):
-                if kwargs != tracked['metadata']:
-                    tracked['metadata'] = kwargs
-                    self.db.tracked.save(tracked, safe=True)
+            # only check id if id was passed in
+            id_matches = (tracked['_id'] == id) if id else True
+            if (tracked['metadata'] == kwargs and
+                tracked['doc_class'] == doc_class and
+                id_matches):
                 return tracked['_id']
             else:
-                # id existed but with different URL
-                message = ('%s already exists with different data (tracked: '
-                           '%s, %s) (new: %s, %s)')
-                args = (tracked['_id'], tracked['url'], tracked['doc_class'],
-                        url, doc_class)
-                self.log.error(message, *args)
-                raise ValueError(message % args)
+                self.log('track', url=url, error='tracking conflict')
+                raise ValueError('%s is already tracked with different '
+                                 'metadata: (tracked: %r) (new: %r)' %
+                                 (url, tracked['metadata'], kwargs))
 
-        self.log.info('tracked %s [%s]', url, id)
+        self.log('track', url=url)
         newdoc = dict(url=url, doc_class=doc_class,
                       _random=random.randint(0, sys.maxint),
                       versions=[], metadata=kwargs)
         if id:
             newdoc['_id'] = id
-        return self.db.tracked.insert(newdoc, safe=True)
+        return self.db.tracked.insert(newdoc)
 
     def md5_versioning(self, olddata, newdata):
         """ return True if md5 changed or if file is new """

@@ -181,8 +172,8 @@ class Kernel(object):
                 'storage_type': storage.storage_type,
             })
             # fire off onchanged functions
-            for onchanged in doc_class.get('onchanged', []):
-                send_task(onchanged, (doc['_id'],))
+            for onchanged in doc_class['onchanged']:
+                onchanged(doc, newdata)
 
         if error:
             # if there's been an error, increment the consecutive_errors count

@@ -202,11 +193,7 @@ class Kernel(object):
         else:
             doc['next_update'] = None
 
-        if error:
-            self.log.warning('error updating %s [%s]', url, doc['_id'])
-        else:
-            new_version = ' (new)'
-            self.log.info('updated %s [%s]%s', url, doc['_id'], new_version)
+        self.log('update', url=url, new_doc=new_version, error=error)
 
         self.db.tracked.save(doc, safe=True)

@@ -255,16 +242,6 @@ class Kernel(object):
         storage = self.storage[doc_class['storage_engine']]
         return storage.get(doc['versions'][-1]['storage_key'])
 
-    def extract_text(self, doc):
-        version = self.get_last_version(doc)
-        doc_class = self.doc_classes[doc['doc_class']]
-        try:
-            extract_text = doc_class['extract_text']
-        except KeyError:
-            raise ValueError('doc_class %s missing extract_text' %
-                             doc['doc_class'])
-        return extract_text(doc, version)
-
 
 def _get_configured_kernel():
     """ factory, gets a connection configured with oyster.conf.settings """

@@ -279,7 +256,6 @@ def _get_configured_kernel():
                     retry_attempts=settings.RETRY_ATTEMPTS,
                     retry_wait_minutes=settings.RETRY_WAIT_MINUTES,
                     doc_classes=settings.DOCUMENT_CLASSES,
-                    default_storage_engine=settings.DEFAULT_STORAGE_ENGINE,
                     )
 
 kernel = _get_configured_kernel()

oyster/ext/__init__.py

oyster/ext/cloudsearch.py

@@ -1,32 +0,0 @@
-# needed so we can import cloudsearch
-from __future__ import absolute_import
-
-from celery.task.base import Task
-
-from ..core import kernel
-from ..conf import settings
-
-from cloudsearch import CloudSearch
-
-cs = CloudSearch(settings.CLOUDSEARCH_DOMAIN, settings.CLOUDSEARCH_ID, 20)
-
-
-class CloudSearchPush(Task):
-    """ task that updates documents """
-    # results go straight to database
-    ignore_result = True
-
-    # a bit under 1MB
-    MAX_BYTES = 1048000
-
-    def run(self, doc_id):
-        doc = kernel.db.tracked.find_one({'_id': doc_id})
-        text = kernel.extract_text(doc)
-        pieces = [text[i:i+self.MAX_BYTES] for i in
-                  xrange(0, len(text), self.MAX_BYTES)]
-
-        self.get_logger().debug('adding {0} pieces for {1}'.format(
-            len(pieces), doc_id))
-        for i, piece in enumerate(pieces):
-            cloud_id = '%s_%s' % (doc_id.lower(), i)
-            cs.add_document(cloud_id, text=piece, **doc['metadata'])

oyster/ext/elasticsearch.py

@@ -1,36 +0,0 @@
-import logging
-
-from celery.task.base import Task
-
-from ..core import kernel
-from ..conf import settings
-
-from pyes import ES
-
-es = ES(settings.ELASTICSEARCH_HOST)
-log = logging.getLogger('oyster.ext.elasticsearch')
-
-
-class ElasticSearchPush(Task):
-    # results go straight to elasticsearch
-    ignore_result = True
-
-    def run(self, doc_id):
-        doc = kernel.db.tracked.find_one({'_id': doc_id})
-        try:
-            text = kernel.extract_text(doc)
-            if not text:
-                log.info('no text for %s', doc_id,
-                         extra={'doc_class':doc['doc_class']})
-                return
-
-            log.info('tracked %s', doc_id,
-                     extra={'doc_class':doc['doc_class']})
-            es.index(dict(doc['metadata'], text=text),
-                     settings.ELASTICSEARCH_INDEX,
-                     settings.ELASTICSEARCH_DOC_TYPE,
-                     id=doc_id)
-        except Exception as e:
-            log.warning('error tracking %s', doc_id,
-                        extra={'doc_class':doc['doc_class']}, exc_info=True)
-            raise

oyster/ext/superfastmatch.py

@@ -1,23 +0,0 @@
-# needed so we can import superfastmatch.Client
-from __future__ import absolute_import
-
-from celery.task.base import Task
-
-from ..core import kernel
-from ..conf import settings
-
-from superfastmatch import Client
-
-sfm = Client(settings.SUPERFASTMATCH_URL)
-
-
-class SuperFastMatchPush(Task):
-    """ task that pushes documents to SFM """
-    # results go straight to database
-    ignore_result = True
-
-    def run(self, doc_id):
-        doc = kernel.db.tracked.find_one({'_id': doc_id})
-        text = kernel.extract_text(doc)
-        doctype, docid = settings.SUPERFASTMATCH_ID_FUNC(doc_id)
-        sfm.add(doctype, docid, text, defer=True, **doc['metadata'])

oyster/mongolog.py

@@ -1,54 +0,0 @@
-"""
-    MongoDB handler for Python Logging
-
-    inspired by https://github.com/andreisavu/mongodb-log
-"""
-
-import logging
-import datetime
-import socket
-
-import pymongo
-
-
-class MongoFormatter(logging.Formatter):
-    def format(self, record):
-        """ turn a LogRecord into something mongo can store """
-        data = record.__dict__.copy()
-
-        data.update(
-            # format message
-            message=record.getMessage(),
-            # overwrite created (float) w/ a mongo-compatible datetime
-            created=datetime.datetime.utcnow(),
-            host=socket.gethostname(),
-            args=tuple(unicode(arg) for arg in record.args)
-        )
-        data.pop('msecs')   # not needed, stored in created
-
-        # TODO: ensure everything in 'extra' is MongoDB-ready
-        exc_info = data.get('exc_info')
-        if exc_info:
-            data['exc_info'] = self.formatException(exc_info)
-        return data
-
-
-class MongoHandler(logging.Handler):
-    def __init__(self, db, collection='logs', host='localhost', port=None,
-                 capped_size=100000000, level=logging.NOTSET, async=True):
-        db = pymongo.connection.Connection(host, port)[db]
-        # try and create the capped log collection
-        if capped_size:
-            try:
-                db.create_collection(collection, capped=True, size=capped_size)
-            except pymongo.errors.CollectionInvalid:
-                pass
-        self.collection = db[collection]
-        self.async = async
-        logging.Handler.__init__(self, level)
-        self.formatter = MongoFormatter()
-
-    def emit(self, record):
-        # explicitly set safe=False to get async insert
-        # TODO: what to do if an error occurs? not safe to log-- ignore?
-        self.collection.save(self.format(record), safe=not self.async)

oyster/scripts/signal.py

@@ -1,53 +1,30 @@
 #!/usr/bin/env python
 import argparse
-import traceback
-import random
 
-from celery.execute import send_task
-from celery import current_app
-
 from oyster.core import kernel
 
 
 def main():
     parser = argparse.ArgumentParser(
         description='do a task for all documents in a doc_class',
     )
 
-    parser.add_argument('task', type=str, help='task name to apply')
+    parser.add_argument('function', type=str, help='path to function to apply')
     parser.add_argument('doc_class', type=str,
                         help='doc_class to apply function to')
-    parser.add_argument('--sample', action='store_true')
-    parser.add_argument('--immediate', action='store_true')
     args = parser.parse_args()
 
-    docs = kernel.db.tracked.find({'doc_class': args.doc_class,
-                                   'versions': {'$ne': []}
-                                   }, timeout=False)
-    total = docs.count()
-    print '{0} docs in {1}'.format(total, args.doc_class)
+    docs = kernel.db.tracked.find({'doc_class': args.doc_class})
+    print '%s docs in %s' % (docs.count(), args.doc_class)
 
-    if args.sample:
-        limit = 100
-        print 'sampling {0} documents'.format(limit)
-        docs = docs.limit(limit).skip(random.randint(0, total-limit))
-        args.immediate = True
+    path, func = args.function.rsplit('.', 1)
+    mod = __import__(path, fromlist=[func])
+    func = getattr(mod, func)
 
-    errors = 0
-    if args.immediate:
-        module, name = args.task.rsplit('.', 1)
-        task = getattr(__import__(module, fromlist=[name]), name)
-        for doc in docs:
-            try:
-                task.apply((doc['_id'],), throw=True)
-            except Exception:
-                errors += 1
-                traceback.print_exc()
-        print '{0} errors in {1} documents'.format(errors, total)
-    else:
-        for doc in docs:
-            send_task(args.task, (doc['_id'], ))
+    for doc in docs:
+        func(doc, kernel.get_last_version(doc))
+        # optionally save doc?
 
 
 if __name__ == '__main__':
     main()

oyster/storage/s3.py

@@ -7,34 +7,20 @@ class S3Storage(object):
     storage_type = 's3'
 
     def __init__(self, kernel):
-        self.kernel = kernel
         self.s3conn = boto.connect_s3(settings.AWS_KEY, settings.AWS_SECRET)
-        self._bucket = False
+        self.bucket = self.s3conn.create_bucket(settings.AWS_BUCKET)
 
-    @property
-    def bucket(self):
-        if not self._bucket:
-            self._bucket = self.s3conn.get_bucket(settings.AWS_BUCKET)
-        return self._bucket
-
-    def _get_opt(self, doc_class, setting, default=None):
-        """ doc_class first, then setting, then default """
-        return self.kernel.doc_classes[doc_class].get(setting,
-                                 getattr(settings, setting, default))
-
     def put(self, tracked_doc, data, content_type):
         """ upload the document to S3 """
-        aws_prefix = self._get_opt(tracked_doc['doc_class'], 'AWS_PREFIX', '')
-        aws_bucket = self._get_opt(tracked_doc['doc_class'], 'AWS_BUCKET')
         k = boto.s3.key.Key(self.bucket)
-        key_name = aws_prefix + tracked_doc['_id']
+        key_name = getattr(settings, 'AWS_PREFIX', '') + tracked_doc['_id']
         k.key = key_name
         headers = {'x-amz-acl': 'public-read',
                    'Content-Type': content_type}
         k.set_contents_from_string(data, headers=headers)
 
         # can also set metadata if we want, useful?
-        url = 'http://%s.s3.amazonaws.com/%s' % (aws_bucket, key_name)
+        url = 'http://%s.s3.amazonaws.com/%s' % (settings.AWS_BUCKET, key_name)
         return url
 
     def get(self, id):

oyster/tasks.py

@@ -11,8 +11,10 @@ class UpdateTask(Task):
     def run(self, doc_id):
         doc = kernel.db.tracked.find_one({'_id': doc_id})
+        kernel.db.status.update({}, {'$inc': {'update_queue': -1}})
         kernel.update(doc)
-        for task in doc.get('post_update_tasks', []):
-            send_task(task, (doc_id,))
-        kernel.db.status.update({}, {'$inc': {'update_queue': -1}})
+
         # don't sit on a connection
         kernel.db.connection.end_request()

@@ -22,24 +24,15 @@ class UpdateTaskScheduler(PeriodicTask):
     # 60s tick
     run_every = 60
-    ignore_result = True
 
     def run(self):
         # if the update queue isn't empty, wait to add more
         # (currently the only way we avoid duplicates)
         # alternate option would be to set a _queued flag on documents
-        update_queue_size = kernel.db.status.find_one()['update_queue']
-        if update_queue_size:
-            self.get_logger().debug('waiting, update_queue_size={0}'.format(
-                update_queue_size))
+        if kernel.db.status.find_one()['update_queue']:
             return
 
         next_set = kernel.get_update_queue()
-        if next_set:
-            self.get_logger().debug('repopulating update_queue')
-        else:
-            self.get_logger().debug('kernel.update_queue empty')
-
 
         for doc in next_set:
             UpdateTask.delay(doc['_id'])
             kernel.db.status.update({}, {'$inc': {'update_queue': 1}})

oyster/templates/… (log table row partial)

@@ -1,6 +1,6 @@
-<tr class="{{log.levelname.lower}}">
-    <td>{{log.name}}</td>
-    <td>{{log.message}}</td>
-    <td>{{log.created.strftime("%Y-%m-%d %H:%M:%S")}}</td>
-    <td>{% if log.exc_info %}{{log.exc_info}}{% endif %}</td>
+<tr{% if log.error %} class="error" {% endif %}>
+    <td>{{log.action}}</td>
+    <td><a href="{{request.script_root}}/tracked/{{log.url}}">{{log.url}}</td>
+    <td>{{log.timestamp.strftime("%Y-%m-%d %H:%M:%S")}}</td>
+    <td>{% if log.error %}{{log.error}}{% endif %}</td>
 </tr>

oyster/tests/test_kernel.py

@@ -2,7 +2,7 @@ import time
 import datetime
 
 from unittest import TestCase
-from nose.tools import assert_raises, assert_equal
+from nose.tools import assert_raises
 
 from oyster.core import Kernel

@@ -18,8 +18,9 @@ class KernelTests(TestCase):
     def setUp(self):
         doc_classes = {'default':
-                       # omit doc class, defaults to dummy
-                       {'update_mins': 30, 'onchanged': [] },
+                       {'update_mins': 30, 'storage_engine': 'dummy',
+                        'onchanged': []
+                       },
                        'fast-update':
                        {'update_mins': 1 / 60., 'storage_engine': 'dummy',
                         'onchanged': []

@@ -44,6 +45,8 @@ class KernelTests(TestCase):
                    retry_attempts=7, retry_wait_minutes=8)
         assert c.db.connection.host == '127.0.0.1'
         assert c.db.connection.port == 27017
+        assert c.db.logs.options()['capped'] == True
+        assert c.db.logs.options()['size'] == 5000
         assert c.retry_wait_minutes == 8
         # TODO: test retry_attempts
         assert c.scraper.user_agent == 'test-ua'

@@ -53,6 +56,15 @@ class KernelTests(TestCase):
         # ensure that a bad document class raises an error
         assert_raises(ValueError, Kernel, doc_classes={'bad-doc': {}})
 
+    def test_log(self):
+        self.kernel.log('action1', 'http://example.com')
+        self.kernel.log('action2', 'http://test.com', error=True, pi=3)
+        assert self.kernel.db.logs.count() == 2
+        x = self.kernel.db.logs.find_one({'error': True})
+        assert x['action'] == 'action2'
+        assert x['url'] == 'http://test.com'
+        assert x['pi'] == 3
+
     def test_track_url(self):
         # basic insert
         id1 = self.kernel.track_url('http://example.com', 'default', pi=3)

@@ -62,27 +74,37 @@ class KernelTests(TestCase):
         assert obj['metadata'] == {'pi': 3}
         assert obj['versions'] == []
 
+        # logging
+        log = self.kernel.db.logs.find_one()
+        assert log['action'] == 'track'
+        assert log['url'] == 'http://example.com'
+
         # track same url again with same metadata returns id
         id2 = self.kernel.track_url('http://example.com', 'default', pi=3)
         assert id1 == id2
 
-        # test manually set id
+        # test setting id
         out = self.kernel.track_url('http://example.com/2', 'default',
                                     'fixed-id')
         assert out == 'fixed-id'
 
-        # can't pass track same id twice with different url
-        assert_raises(ValueError, self.kernel.track_url,
-                      'http://example.com/3', 'default', 'fixed-id')
+        # can't track same URL twice with different id
+        assert_raises(ValueError, self.kernel.track_url, 'http://example.com',
+                      'default', 'hard-coded-id')
+        # logged error
+        assert self.kernel.db.logs.find_one({'error': 'tracking conflict'})
 
-        # ... or different doc class
-        assert_raises(ValueError, self.kernel.track_url,
-                      'http://example.com/2', 'change-hook', 'fixed-id')
+        # ... with different metadata
+        assert_raises(ValueError, self.kernel.track_url, 'http://example.com',
+                      'default')
+        # logged error
+        assert self.kernel.db.logs.find_one({'error': 'tracking conflict'})
 
-        # different metadata is ok, but it should be updated
-        self.kernel.track_url('http://example.com/2', 'default', 'fixed-id',
-                              pi=3)
-        self.kernel.db.tracked.find_one({'_id': 'fixed-id'})['metadata']['pi'] == 3
+        # ... different doc class
+        assert_raises(ValueError, self.kernel.track_url, 'http://example.com',
+                      'special-doc-class', pi=3)
+        # logged error
+        assert self.kernel.db.logs.find_one({'error': 'tracking conflict'})
 
     def test_no_update(self):
         # update

@@ -115,6 +137,9 @@ class KernelTests(TestCase):
         assert len(newobj['versions']) == 1
 
+        # check logs
+        assert self.kernel.db.logs.find({'action': 'update'}).count() == 1
+
         # and do another update..
         self.kernel.update(obj)

@@ -125,6 +150,9 @@ class KernelTests(TestCase):
         newobj = self.kernel.db.tracked.find_one()
         assert first_update < newobj['last_update']
 
+        # check that logs updated
+        assert self.kernel.db.logs.find({'action': 'update'}).count() == 2
+
     def test_update_failure(self):
         # track a non-existent URL
         self.kernel.track_url('http://not_a_url', 'default')

@@ -134,37 +162,41 @@ class KernelTests(TestCase):
         obj = self.kernel.db.tracked.find_one()
         assert obj['consecutive_errors'] == 1
 
+        # we should have logged an error too
+        assert self.kernel.db.logs.find({'action': 'update',
+                                         'error': {'$ne': False}}).count() == 1
+
         # update again
         self.kernel.update(obj)
         obj = self.kernel.db.tracked.find_one()
         assert obj['consecutive_errors'] == 2
 
-    #def test_update_onchanged_fire_only_on_change(self):
-    #    self.kernel.track_url('http://example.com', 'change-hook')
-    #    obj = self.kernel.db.tracked.find_one()
-    #    self.kernel.update(obj)
-    #    doc = self.kernel.db.tracked.find_one()
-    #    assert doc['hook_fired'] == 1
-    #    # again, we rely on example.com not updating
-    #    self.kernel.update(obj)
-    #    doc = self.kernel.db.tracked.find_one()
-    #    assert doc['hook_fired'] == 1
+    def test_update_onchanged_fire_only_on_change(self):
+        self.kernel.track_url('http://example.com', 'change-hook')
+        obj = self.kernel.db.tracked.find_one()
+        self.kernel.update(obj)
+        doc = self.kernel.db.tracked.find_one()
+        assert doc['hook_fired'] == 1
 
-    #def test_update_onchanged_fire_again_on_change(self):
-    #    self.kernel.track_url(RANDOM_URL, 'change-hook')
-    #    obj = self.kernel.db.tracked.find_one()
-    #    self.kernel.update(obj)
-    #    doc = self.kernel.db.tracked.find_one()
-    #    assert doc['hook_fired'] == 1
-    #    # we rely on this URL updating
-    #    self.kernel.update(obj)
-    #    doc = self.kernel.db.tracked.find_one()
-    #    assert doc['hook_fired'] == 2
+        # again, we rely on example.com not updating
+        self.kernel.update(obj)
+        doc = self.kernel.db.tracked.find_one()
+        assert doc['hook_fired'] == 1
+
+    def test_update_onchanged_fire_again_on_change(self):
+        self.kernel.track_url(RANDOM_URL, 'change-hook')
+        obj = self.kernel.db.tracked.find_one()
+        self.kernel.update(obj)
+        doc = self.kernel.db.tracked.find_one()
+        assert doc['hook_fired'] == 1
+
+        # we rely on this URL updating
+        self.kernel.update(obj)
+        doc = self.kernel.db.tracked.find_one()
+        assert doc['hook_fired'] == 2
 
     def test_get_update_queue(self):
         self.kernel.track_url('never-updates', 'fast-update')

oyster/tests/test_mongolog.py

@@ -1,53 +0,0 @@
-import unittest
-import logging
-import datetime
-
-import pymongo
-
-from ..mongolog import MongoHandler
-
-
-class TestMongoLog(unittest.TestCase):
-
-    DB_NAME = 'oyster_test'
-
-    def setUp(self):
-        pymongo.Connection().drop_database(self.DB_NAME)
-        self.log = logging.getLogger('mongotest')
-        self.log.setLevel(logging.DEBUG)
-        self.logs = pymongo.Connection()[self.DB_NAME]['logs']
-        # clear handlers upon each setup
-        self.log.handlers = []
-        # async = False for testing
-        self.log.addHandler(MongoHandler(self.DB_NAME, capped_size=4000,
-                                         async=False))
-
-    def tearDown(self):
-        pymongo.Connection().drop_database(self.DB_NAME)
-
-    def test_basic_write(self):
-        self.log.debug('test')
-        self.assertEqual(self.logs.count(), 1)
-        self.log.debug('test')
-        self.assertEqual(self.logs.count(), 2)
-        # capped_size will limit these
-        self.log.debug('test'*200)
-        self.log.debug('test'*200)
-        self.assertEqual(self.logs.count(), 1)
-
-    def test_attributes(self):
-        self.log.debug('pi=%s', 3.14, extra={'pie':'pizza'})
-        logged = self.logs.find_one()
-        self.assertEqual(logged['message'], 'pi=3.14')
-        self.assertTrue(isinstance(logged['created'], datetime.datetime))
-        self.assertTrue('host' in logged)
-        self.assertEqual(logged['name'], 'mongotest')
-        self.assertEqual(logged['levelname'], 'DEBUG')
-        self.assertEqual(logged['pie'], 'pizza')
-
-        # and exc_info
-        try:
-            raise Exception('error!')
-        except:
-            self.log.warning('oh no!', exc_info=True)
-        logged = self.logs.find_one(sort=[('$natural', -1)])
-        self.assertEqual(logged['levelname'], 'WARNING')
-        self.assertTrue('error!' in logged['exc_info'])

oyster/tests/test_storage.py

@@ -1,21 +1,19 @@
-from nose.plugins.skip import SkipTest
-
-from oyster.conf import settings
 from oyster.core import Kernel
+from oyster.storage.s3 import S3Storage
 from oyster.storage.gridfs import GridFSStorage
 from oyster.storage.dummy import DummyStorage
 
 
 def _simple_storage_test(StorageCls):
     kernel = Kernel(mongo_db='oyster_test')
-    kernel.doc_classes['default'] = {}
     storage = StorageCls(kernel)
 
     # ensure the class has a storage_type attribute
     assert hasattr(storage, 'storage_type')
 
     doc = {'_id': 'aabbccddeeff', 'url': 'http://localhost:8000/#test',
-           'doc_class': 'default', 'metadata': {} }
+           'metadata': {}
+          }
     storage_id = storage.put(doc, 'hello oyster', 'text/plain')
     assert storage_id

@@ -23,9 +21,6 @@ def _simple_storage_test(StorageCls):
 def test_s3():
-    if not hasattr(settings, 'AWS_BUCKET'):
-        raise SkipTest('S3 not configured')
-    from oyster.storage.s3 import S3Storage
     _simple_storage_test(S3Storage)

oyster/web.py

@@ -4,7 +4,7 @@ import datetime
 import functools
 
 import flask
-import bson.objectid
+import pymongo.objectid
 
 from oyster.conf import settings
 from oyster.core import kernel

@@ -14,7 +14,7 @@ class JSONEncoder(json.JSONEncoder):
     def default(self, obj):
         if isinstance(obj, datetime.datetime):
             return obj.isoformat()
-        elif isinstance(obj, bson.objectid.ObjectId):
+        elif isinstance(obj, pymongo.objectid.ObjectId):
             return str(obj)
         else:
             return super(JSONEncoder, self).default(obj)

@@ -51,12 +51,22 @@ def index():
     status = {
         'tracking': kernel.db.tracked.count(),
         'need_update': kernel.get_update_queue_size(),
-        'logs': list(kernel.db.logs.find().sort('$natural', -1).limit(100)),
+        'logs': list(kernel.db.logs.find().sort('$natural', -1).limit(20)),
         'mongo_host': settings.MONGO_HOST,
     }
     return status
 
 
+@app.route('/status/')
+@api_wrapper()
+def doc_list():
+    status = {
+        'tracking': kernel.db.tracked.count(),
+        'need_update': kernel.get_update_queue_size(),
+    }
+    return status
+
+
 @app.route('/log/')
 @api_wrapper('logs.html')
 def log_view():

@@ -76,11 +86,23 @@ def tracked():
     return json.dumps(tracked, cls=JSONEncoder)
 
 
-@app.route('/tracked/<id>')
-def tracked_view(id):
-    doc = kernel.db.tracked.find_one({'_id': id})
+@app.route('/tracked/<path:url>')
+def tracked_view(url):
+    url = _path_fixer(url)
+    doc = kernel.db.tracked.find_one({'url': url})
     return json.dumps(doc, cls=JSONEncoder)
 
 
+@app.route('/doc/<path:url>/<version>')
+def show_doc(url, version):
+    url = _path_fixer(url)
+    if version == 'latest':
+        version = -1
+    doc = kernel.get_version(url, version)
+    resp = flask.make_response(doc.read())
+    resp.headers['content-type'] = doc.content_type
+    return resp
+
+
 if __name__ == '__main__':
     app.run(debug=True)

requirements.txt

@@ -2,4 +2,4 @@ scrapelib
 pymongo>=2.0
 flask
 nose
-celery==2.5.3
+celery

setup.py

@@ -1,21 +1,11 @@
 #!/usr/bin/env python
-import os
 from setuptools import setup
+from oyster import __version__
 
-# Hack to prevent stupid "TypeError: 'NoneType' object is not callable" error
-# in multiprocessing/util.py _exit_function when running `python
-# setup.py test` (see
-# http://www.eby-sarna.com/pipermail/peak/2010-May/003357.html)
-try:
-    import multiprocessing
-except ImportError:
-    pass
-
 long_description = open('README.rst').read()
 
 setup(name="oyster",
-      version='0.4.0-dev',
+      version=__version__,
       py_modules=['oyster'],
       author="James Turk",
       author_email='jturk@sunlightfoundation.com',

@@ -31,10 +21,8 @@ setup(name="oyster",
                    "Operating System :: OS Independent",
                    "Programming Language :: Python",
                    ],
-      install_requires=["httplib2 >= 0.6.0", "scrapelib >= 0.7.2",
+      install_requires=["httplib2 >= 0.6.0", "scrapelib >= 0.5.4",
                         "pymongo >= 1.11", "flask", "celery"],
-      tests_require=["nose"],
-      test_suite='nose.collector',
       entry_points="""
 [console_scripts]
 scrapeshell = scrapelib:scrapeshell

tox.ini

@@ -1,10 +0,0 @@
-# Tox (http://codespeak.net/~hpk/tox/) is a tool for running tests
-# in multiple virtualenvs. This configuration file will run the
-# test suite on all supported python versions. To use it, "pip install tox"
-# and then run "tox" from this directory.
-
-[tox]
-envlist = py26, py27, pypy
-
-[testenv]
-commands = python setup.py test