Compare commits


79 Commits

Author SHA1 Message Date
James Turk
c6e93fdd48 mark as deprecated 2012-12-05 11:14:32 -05:00
James Turk
0da1b7b0a9 travis mongodb service 2012-09-12 13:30:01 -04:00
James Turk
e84d139df0 simplify settings 2012-07-11 16:52:33 -04:00
James Turk
dd5e94cf86 elasticsearch log fix 2012-07-11 14:58:00 -04:00
James Turk
ebba8174ee pin celery 2012-06-22 14:16:24 -04:00
James Turk
2677ed18b7 bringing back basic web interface 2012-05-16 15:51:55 -04:00
James Turk
6153bdaf2a return if there's no text 2012-05-16 15:12:16 -04:00
James Turk
dfb556a49f sometimes extract_text can't get text 2012-05-16 14:36:01 -04:00
James Turk
1d441a8f86 0.4.0-dev 2012-05-16 13:47:57 -04:00
James Turk
31435df760 use python logging 2012-05-16 13:38:36 -04:00
James Turk
2846b044d2 oops, that was wrong 2012-05-16 00:19:30 -04:00
James Turk
901283ecfa python 2.6 fix 2012-05-16 00:15:34 -04:00
James Turk
2f729bfdbc add mongolog 2012-05-15 23:47:45 -04:00
James Turk
4e8f1430c4 syntax error 2012-05-15 17:42:22 -04:00
James Turk
7387aab273 fix offset 2012-05-15 17:41:55 -04:00
James Turk
4172b43c0f random sampling 2012-05-15 17:40:46 -04:00
James Turk
b4de2ee0f9 print summary 2012-05-15 17:05:19 -04:00
James Turk
f14fc1cd2b --immediate 2012-05-15 16:58:50 -04:00
James Turk
c4b7597772 throw=True 2012-05-15 16:57:04 -04:00
James Turk
9182b966e3 just call task directly 2012-05-15 16:55:49 -04:00
James Turk
aba52c28a3 __import__ task for --sample 2012-05-15 16:54:33 -04:00
James Turk
66acbde1d8 add --sample to signal.py 2012-05-15 16:38:14 -04:00
James Turk
76e172da0f use right bucket for doc class 2012-05-15 15:50:36 -04:00
James Turk
cec4bdc333 new logging 2012-05-14 16:40:13 -04:00
James Turk
5e668b3b21 fix empty versions getting tracked 2012-05-10 17:11:54 -04:00
James Turk
ec9b19d77f es fixes 2012-05-10 17:10:01 -04:00
James Turk
0cddf437fb fix log message 2012-05-10 17:07:21 -04:00
James Turk
d5d77fd79b need latest scrapelib 2012-05-10 11:38:27 -04:00
James Turk
fd8e3706bc elasticsearch v1 2012-05-10 11:38:02 -04:00
James Turk
eb3c6919ac 0.3.3-dev 2012-05-10 11:37:52 -04:00
James Turk
43d4979913 ObjectId moved to bson 2012-05-10 11:33:21 -04:00
James Turk
e6ae2ad634 resolved conflict the wrong way 2012-05-10 11:16:35 -04:00
James Turk
4206539aed fix merge conflict 2012-05-10 11:13:18 -04:00
James Turk
ebc6444bea remove cloudsearch from default apps 2012-05-10 11:08:04 -04:00
James Turk
68b3fafb59 Merge pull request #3 from msabramo/setup.py-test
Enable `python setup.py test`
2012-05-10 08:06:57 -07:00
Marc Abramowitz
cfeb6dd5ac Add .tox to .gitignore 2012-05-10 07:48:05 -07:00
Marc Abramowitz
cee1103e21 Add a tox.ini 2012-05-09 23:46:30 -07:00
Marc Abramowitz
4a4dff4e96 Enable python setup.py test 2012-05-09 22:17:34 -07:00
James Turk
2804b95c4a log.url isn't a link 2012-05-10 00:46:03 -04:00
James Turk
7537f344f8 fix tracked_view 2012-05-10 00:42:15 -04:00
James Turk
8571042b05 Merge branch 'master' of github.com:sunlightlabs/oyster 2012-05-01 16:34:00 -04:00
James Turk
75dadb2579 use ids now 2012-05-01 16:33:51 -04:00
James Turk
a4c7733618 fix superfastmatch defer syntax 2012-04-18 15:23:00 -04:00
James Turk
b697641e13 import CloudSearch 2012-04-17 18:09:58 -04:00
James Turk
9fb39db5cd don't do lookup on url if they passed an id to track_url 2012-04-17 18:09:11 -04:00
James Turk
b23b830419 index on url 2012-04-17 17:57:58 -04:00
James Turk
0d5e01f051 defer analysis in sfm Push 2012-04-17 15:53:28 -04:00
James Turk
f5cd19ba94 absolute imports 2012-04-16 17:30:20 -04:00
James Turk
85d88c1c94 superfastmatch 2012-04-16 17:08:25 -04:00
James Turk
97e164e6e6 some improvements to the cloud client 2012-04-14 00:07:39 -04:00
James Turk
574d1da843 fix auto-flush behavior 2012-04-13 23:44:36 -04:00
James Turk
95a92d124c flush_every typo 2012-04-13 23:42:50 -04:00
James Turk
8510a30175 logging and flush_every on cloud search 2012-04-13 23:41:49 -04:00
James Turk
b60ee969ca change create_bucket to get, try and avoid these 409s" 2012-04-13 22:40:07 -04:00
James Turk
2fca8a23ac fix travis 2012-04-13 17:43:29 -04:00
James Turk
b164134c2a fix storage test 2012-04-13 17:41:14 -04:00
James Turk
7922ac2da7 change how onchanged events fire 2012-04-13 17:41:07 -04:00
James Turk
232cf76a60 changelog/gitignore 2012-04-13 17:23:06 -04:00
James Turk
249b4babde cloudsearch fixes 2012-04-13 16:32:45 -04:00
James Turk
2259824019 fix signal x2 2012-04-13 16:10:13 -04:00
James Turk
c9fd748534 fix signal 2012-04-13 16:09:48 -04:00
James Turk
b1f4bb1a82 signal now send_task's 2012-04-13 14:25:33 -04:00
James Turk
d118d36e6e cloudsearch settings 2012-04-13 13:41:17 -04:00
James Turk
028a145505 change how post_update_tasks fire 2012-04-13 13:37:45 -04:00
James Turk
7b096d76d0 add oyster.ext.cloudsearch 2012-04-13 13:14:57 -04:00
James Turk
24806e6ae0 kernel.extract_text 2012-04-13 13:04:36 -04:00
James Turk
081533d647 no timeout on signal find() 2012-04-11 15:57:35 -04:00
James Turk
8a98d81801 slightly better errors on signal 2012-04-10 17:48:01 -04:00
James Turk
92cf12905b save documents after process in signal script 2012-04-10 17:36:22 -04:00
James Turk
42126e46a9 bugfix for S3 storage backend 2012-03-29 23:18:32 -04:00
James Turk
d8288c7647 0.3.2 release 2012-03-29 23:10:02 -04:00
James Turk
5472e0de60 0.3.2 2012-03-29 23:05:36 -04:00
James Turk
04a8c0123c default storage engine 2012-03-29 22:59:45 -04:00
James Turk
006a50b8e3 use doc_class AWS_* settings if present 2012-03-29 22:59:30 -04:00
James Turk
7ba0d10049 Merge branch 'master' of github.com:sunlightlabs/oyster 2012-03-21 13:19:27 -04:00
James Turk
4aaae6d6e6 changelog 2012-03-21 13:00:21 -04:00
James Turk
3a4d2d968e travis ci 2012-03-21 12:50:22 -04:00
James Turk
b077e679af skip S3 test sometimes 2012-03-21 12:49:00 -04:00
James Turk
5124fe4eab change how duplicates work 2012-03-21 12:44:32 -04:00
24 changed files with 433 additions and 172 deletions

2
.gitignore vendored
View File

@ -1,3 +1,5 @@
celerybeat-schedule
oyster_settings.py
oyster.egg-info/
*.pyc
.tox

10
.travis.yml Normal file
View File

@ -0,0 +1,10 @@
language: python
python:
- "2.6"
- "2.7"
install: pip install scrapelib pymongo nose celery --use-mirrors
script: nosetests
services: mongodb
notifications:
email:
- jturk@sunlightfoundation.com

View File

@ -1,3 +1,5 @@
**DEPRECATED** - this project is abandoned & will not be seeing future updates
======
oyster
======

View File

@ -1,6 +1,22 @@
oyster changelog
================
0.4.0-dev
---------
* S3 storage backend bugfix
* lots of improvements to signal script
* oyster.ext cloudsearch, elasticsearch, and superfastmatch
* use python logging w/ mongo handler
* add tox/python setup.py test (thanks Marc Abramowitz!)
0.3.2
-----
**2012-03-29**
* become much more tolerant of duplicates
* skip S3 test if not prepared
* use doc_class AWS_PREFIX and AWS_BUCKET if set
* add DEFAULT_STORAGE_ENGINE setting
0.3.1
-----
**2012-03-10**

View File

@ -1,4 +1,4 @@
__version__ = "0.3.1"
__version__ = "0.4.0-dev"
import os
os.environ['CELERY_CONFIG_MODULE'] = 'oyster.celeryconfig'

View File

@ -1,13 +1,3 @@
from oyster.conf import settings
CELERY_IMPORTS = ["oyster.tasks"] + list(settings.CELERY_TASK_MODULES)
BROKER_TRANSPORT = 'mongodb'
BROKER_HOST = settings.MONGO_HOST
BROKER_PORT = settings.MONGO_PORT
CELERY_RESULT_BACKEND = 'mongodb'
CELERY_MONGODB_BACKEND_SETTINGS = {
'host': settings.MONGO_HOST,
'port': settings.MONGO_PORT,
}
CELERY_IMPORTS = ['oyster.tasks'] + list(settings.CELERY_TASK_MODULES)
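The simplified celeryconfig drops the explicit broker and result-backend settings and only extends CELERY_IMPORTS with settings.CELERY_TASK_MODULES. A minimal sketch of the oyster_settings.py entries that would enable the new ext tasks follows; only the setting names come from celeryconfig.py and the oyster.ext modules further down, the hosts and names are placeholder assumptions.

# hedged sketch of oyster_settings.py entries; values are placeholders
CELERY_TASK_MODULES = ['oyster.ext.elasticsearch', 'oyster.ext.superfastmatch']

ELASTICSEARCH_HOST = '127.0.0.1:9200'
ELASTICSEARCH_INDEX = 'oyster'
ELASTICSEARCH_DOC_TYPE = 'tracked'
SUPERFASTMATCH_URL = 'http://127.0.0.1:8080'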

View File

@ -17,3 +17,5 @@ RETRY_ATTEMPTS = 3
RETRY_WAIT_MINUTES = 60
DOCUMENT_CLASSES = {}
DEFAULT_STORAGE_ENGINE = 'dummy'

View File

@ -1,4 +1,5 @@
import datetime
import logging
import hashlib
import random
import sys
@ -6,7 +7,9 @@ import sys
import pymongo
import scrapelib
from .mongolog import MongoHandler
from .storage import engines
from celery.execute import send_task
class Kernel(object):
@ -16,27 +19,28 @@ class Kernel(object):
mongo_db='oyster', mongo_log_maxsize=100000000,
user_agent='oyster', rpm=60, timeout=300,
retry_attempts=3, retry_wait_minutes=60,
doc_classes=None,
doc_classes=None, default_storage_engine='dummy',
):
"""
configurable for ease of testing, only one should be instantiated
"""
# set up a capped log if it doesn't exist
# set up the log
self.db = pymongo.Connection(mongo_host, mongo_port)[mongo_db]
try:
self.db.create_collection('logs', capped=True,
size=mongo_log_maxsize)
except pymongo.errors.CollectionInvalid:
# cap collection if not capped?
pass
self.log = logging.getLogger('oyster')
self.log.setLevel(logging.DEBUG)
self.log.addHandler(MongoHandler(mongo_db, host=mongo_host,
port=mongo_port,
capped_size=mongo_log_maxsize))
# create status document if it doesn't exist
if self.db.status.count() == 0:
self.db.status.insert({'update_queue': 0})
# ensure an index on _random
self.db.tracked.ensure_index([('_random', pymongo.ASCENDING)])
self.db.tracked.ensure_index('_random')
self.db.tracked.ensure_index('url')
self.scraper = scrapelib.Scraper(user_agent=user_agent,
requests_per_minute=rpm,
@ -53,14 +57,16 @@ class Kernel(object):
self.storage[name] = StorageCls(self)
# set document classes
_doc_class_fields = ('update_mins', 'storage_engine',
'onchanged')
_doc_class_fields = ('update_mins', 'onchanged')
self.doc_classes = doc_classes or {}
for dc_name, dc_props in self.doc_classes.iteritems():
for key in _doc_class_fields:
if key not in dc_props:
raise ValueError('doc_class %s missing key %s' %
(dc_name, key))
# set a default storage engine
if 'storage_engine' not in dc_props:
dc_props['storage_engine'] = default_storage_engine
def _wipe(self):
""" exists primarily for debug use, wipes entire db """
@ -68,14 +74,6 @@ class Kernel(object):
self.db.drop_collection('logs')
self.db.drop_collection('status')
def log(self, action, url, error=False, **kwargs):
""" add an entry to the oyster log """
kwargs['action'] = action
kwargs['url'] = url
kwargs['error'] = error
kwargs['timestamp'] = datetime.datetime.utcnow()
self.db.logs.insert(kwargs)
def _add_doc_class(self, doc_class, **properties):
self.doc_classes[doc_class] = properties
@ -91,33 +89,44 @@ class Kernel(object):
any keyword args will be added to the document's metadata
"""
if doc_class not in self.doc_classes:
raise ValueError('unregistered doc_class %s' % doc_class)
error = 'error tracking %s, unregistered doc_class %s'
self.log.error(error, url, doc_class)
raise ValueError(error % (url, doc_class))
tracked = self.db.tracked.find_one({'url': url})
# try and find an existing version of this document
tracked = None
# if data is already tracked and this is just a duplicate call
# return the original object
if id:
tracked = self.db.tracked.find_one({'_id': id})
else:
tracked = self.db.tracked.find_one({'url': url})
# if id exists, ensure that URL and doc_class haven't changed
# then return existing data (possibly with refreshed metadata)
if tracked:
# only check id if id was passed in
id_matches = (tracked['_id'] == id) if id else True
if (tracked['metadata'] == kwargs and
tracked['doc_class'] == doc_class and
id_matches):
if (tracked['url'] == url and
tracked['doc_class'] == doc_class):
if kwargs != tracked['metadata']:
tracked['metadata'] = kwargs
self.db.tracked.save(tracked, safe=True)
return tracked['_id']
else:
self.log('track', url=url, error='tracking conflict')
raise ValueError('%s is already tracked with different '
'metadata: (tracked: %r) (new: %r)' %
(url, tracked['metadata'], kwargs))
# id existed but with different URL
message = ('%s already exists with different data (tracked: '
'%s, %s) (new: %s, %s)')
args = (tracked['_id'], tracked['url'], tracked['doc_class'],
url, doc_class)
self.log.error(message, *args)
raise ValueError(message % args)
self.log('track', url=url)
self.log.info('tracked %s [%s]', url, id)
newdoc = dict(url=url, doc_class=doc_class,
_random=random.randint(0, sys.maxint),
versions=[], metadata=kwargs)
if id:
newdoc['_id'] = id
return self.db.tracked.insert(newdoc)
return self.db.tracked.insert(newdoc, safe=True)
def md5_versioning(self, olddata, newdata):
""" return True if md5 changed or if file is new """
@ -172,8 +181,8 @@ class Kernel(object):
'storage_type': storage.storage_type,
})
# fire off onchanged functions
for onchanged in doc_class['onchanged']:
onchanged(doc, newdata)
for onchanged in doc_class.get('onchanged', []):
send_task(onchanged, (doc['_id'],))
if error:
# if there's been an error, increment the consecutive_errors count
@ -193,7 +202,11 @@ class Kernel(object):
else:
doc['next_update'] = None
self.log('update', url=url, new_doc=new_version, error=error)
if error:
self.log.warning('error updating %s [%s]', url, doc['_id'])
else:
new_version = ' (new)'
self.log.info('updated %s [%s]%s', url, doc['_id'], new_version)
self.db.tracked.save(doc, safe=True)
@ -242,6 +255,16 @@ class Kernel(object):
storage = self.storage[doc_class['storage_engine']]
return storage.get(doc['versions'][-1]['storage_key'])
def extract_text(self, doc):
version = self.get_last_version(doc)
doc_class = self.doc_classes[doc['doc_class']]
try:
extract_text = doc_class['extract_text']
except KeyError:
raise ValueError('doc_class %s missing extract_text' %
doc['doc_class'])
return extract_text(doc, version)
def _get_configured_kernel():
""" factory, gets a connection configured with oyster.conf.settings """
@ -256,6 +279,7 @@ def _get_configured_kernel():
retry_attempts=settings.RETRY_ATTEMPTS,
retry_wait_minutes=settings.RETRY_WAIT_MINUTES,
doc_classes=settings.DOCUMENT_CLASSES,
default_storage_engine=settings.DEFAULT_STORAGE_ENGINE,
)
kernel = _get_configured_kernel()
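With these changes, onchanged entries in a doc_class are Celery task names dispatched via send_task rather than callables, storage_engine may be omitted in favour of the default storage engine, and extract_text is looked up per doc_class by Kernel.extract_text. A hedged sketch of a DOCUMENT_CLASSES entry under the new rules; the doc_class name and text helper are hypothetical, and the dotted task name assumes Celery's default naming for the class-based tasks in oyster.ext.

def _plain_text(doc, version_data):
    # called by Kernel.extract_text as extract_text(doc, version); this
    # assumes the stored version data is already plain text
    return version_data

DOCUMENT_CLASSES = {
    'example-reports': {                      # hypothetical doc_class
        'update_mins': 60 * 24,
        # storage_engine omitted: falls back to default_storage_engine
        # ('dummy' unless DEFAULT_STORAGE_ENGINE says otherwise)
        'onchanged': ['oyster.ext.elasticsearch.ElasticSearchPush'],
        'extract_text': _plain_text,
    },
}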

0
oyster/ext/__init__.py Normal file
View File

32
oyster/ext/cloudsearch.py Normal file
View File

@ -0,0 +1,32 @@
# needed so we can import cloudsearch
from __future__ import absolute_import
from celery.task.base import Task
from ..core import kernel
from ..conf import settings
from cloudsearch import CloudSearch
cs = CloudSearch(settings.CLOUDSEARCH_DOMAIN, settings.CLOUDSEARCH_ID, 20)
class CloudSearchPush(Task):
""" task that updates documents """
# results go straight to database
ignore_result = True
# a bit under 1MB
MAX_BYTES = 1048000
def run(self, doc_id):
doc = kernel.db.tracked.find_one({'_id': doc_id})
text = kernel.extract_text(doc)
pieces = [text[i:i+self.MAX_BYTES] for i in
xrange(0, len(text), self.MAX_BYTES)]
self.get_logger().debug('adding {0} pieces for {1}'.format(
len(pieces), doc_id))
for i, piece in enumerate(pieces):
cloud_id = '%s_%s' % (doc_id.lower(), i)
cs.add_document(cloud_id, text=piece, **doc['metadata'])

View File

@ -0,0 +1,36 @@
import logging
from celery.task.base import Task
from ..core import kernel
from ..conf import settings
from pyes import ES
es = ES(settings.ELASTICSEARCH_HOST)
log = logging.getLogger('oyster.ext.elasticsearch')
class ElasticSearchPush(Task):
# results go straight to elasticsearch
ignore_result = True
def run(self, doc_id):
doc = kernel.db.tracked.find_one({'_id': doc_id})
try:
text = kernel.extract_text(doc)
if not text:
log.info('no text for %s', doc_id,
extra={'doc_class':doc['doc_class']})
return
log.info('tracked %s', doc_id,
extra={'doc_class':doc['doc_class']})
es.index(dict(doc['metadata'], text=text),
settings.ELASTICSEARCH_INDEX,
settings.ELASTICSEARCH_DOC_TYPE,
id=doc_id)
except Exception as e:
log.warning('error tracking %s', doc_id,
extra={'doc_class':doc['doc_class']}, exc_info=True)
raise

View File

@ -0,0 +1,23 @@
# needed so we can import superfastmatch.Client
from __future__ import absolute_import
from celery.task.base import Task
from ..core import kernel
from ..conf import settings
from superfastmatch import Client
sfm = Client(settings.SUPERFASTMATCH_URL)
class SuperFastMatchPush(Task):
""" task that pushes documents to SFM """
# results go straight to database
ignore_result = True
def run(self, doc_id):
doc = kernel.db.tracked.find_one({'_id': doc_id})
text = kernel.extract_text(doc)
doctype, docid = settings.SUPERFASTMATCH_ID_FUNC(doc_id)
sfm.add(doctype, docid, text, defer=True, **doc['metadata'])

54
oyster/mongolog.py Normal file
View File

@ -0,0 +1,54 @@
"""
MongoDB handler for Python Logging
inspired by https://github.com/andreisavu/mongodb-log
"""
import logging
import datetime
import socket
import pymongo
class MongoFormatter(logging.Formatter):
def format(self, record):
""" turn a LogRecord into something mongo can store """
data = record.__dict__.copy()
data.update(
# format message
message=record.getMessage(),
# overwrite created (float) w/ a mongo-compatible datetime
created=datetime.datetime.utcnow(),
host=socket.gethostname(),
args=tuple(unicode(arg) for arg in record.args)
)
data.pop('msecs') # not needed, stored in created
# TODO: ensure everything in 'extra' is MongoDB-ready
exc_info = data.get('exc_info')
if exc_info:
data['exc_info'] = self.formatException(exc_info)
return data
class MongoHandler(logging.Handler):
def __init__(self, db, collection='logs', host='localhost', port=None,
capped_size=100000000, level=logging.NOTSET, async=True):
db = pymongo.connection.Connection(host, port)[db]
# try and create the capped log collection
if capped_size:
try:
db.create_collection(collection, capped=True, size=capped_size)
except pymongo.errors.CollectionInvalid:
pass
self.collection = db[collection]
self.async = async
logging.Handler.__init__(self, level)
self.formatter = MongoFormatter()
def emit(self, record):
# explicitly set safe=False to get async insert
# TODO: what to do if an error occurs? not safe to log-- ignore?
self.collection.save(self.format(record), safe=not self.async)
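A short usage sketch for the new handler, mirroring how core.py and the tests attach it; the logger and database names are arbitrary.

import logging
from oyster.mongolog import MongoHandler

log = logging.getLogger('example')
log.setLevel(logging.DEBUG)
# records land in a capped 'logs' collection; async=False forces synchronous
# (safe) inserts, which the tests below depend on
log.addHandler(MongoHandler('oyster_example', capped_size=100000, async=False))
log.info('tracked %s', 'http://example.com', extra={'doc_class': 'default'})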

View File

@ -1,30 +1,53 @@
#!/usr/bin/env python
import argparse
import traceback
import random
from celery.execute import send_task
from celery import current_app
from oyster.core import kernel
def main():
parser = argparse.ArgumentParser(
description='do a task for all documents in a doc_class',
)
parser.add_argument('function', type=str, help='path to function to apply')
parser.add_argument('task', type=str, help='task name to apply')
parser.add_argument('doc_class', type=str,
help='doc_class to apply function to')
parser.add_argument('--sample', action='store_true')
parser.add_argument('--immediate', action='store_true')
args = parser.parse_args()
docs = kernel.db.tracked.find({'doc_class': args.doc_class})
print '%s docs in %s' % (docs.count(), args.doc_class)
docs = kernel.db.tracked.find({'doc_class': args.doc_class,
'versions': {'$ne': []}
}, timeout=False)
total = docs.count()
print '{0} docs in {1}'.format(total, args.doc_class)
path, func = args.function.rsplit('.', 1)
mod = __import__(path, fromlist=[func])
func = getattr(mod, func)
if args.sample:
limit = 100
print 'sampling {0} documents'.format(limit)
docs = docs.limit(limit).skip(random.randint(0, total-limit))
args.immediate = True
for doc in docs:
func(doc, kernel.get_last_version(doc))
# optionally save doc?
errors = 0
if args.immediate:
module, name = args.task.rsplit('.', 1)
task = getattr(__import__(module, fromlist=[name]), name)
for doc in docs:
try:
task.apply((doc['_id'],), throw=True)
except Exception:
errors += 1
traceback.print_exc()
print '{0} errors in {1} documents'.format(errors, total)
else:
for doc in docs:
send_task(args.task, (doc['_id'], ))
if __name__ == '__main__':
main()
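The script now takes a Celery task name rather than a function path: by default it queues one send_task call per matching document, --immediate imports the task and runs it inline (counting errors and printing tracebacks), and --sample restricts the run to 100 random documents and implies --immediate. Assuming the hypothetical 'example-reports' doc_class and running from the directory containing signal.py, invocations might look like:

python signal.py oyster.ext.elasticsearch.ElasticSearchPush example-reports
python signal.py oyster.ext.elasticsearch.ElasticSearchPush example-reports --sample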

View File

@ -7,20 +7,34 @@ class S3Storage(object):
storage_type = 's3'
def __init__(self, kernel):
self.kernel = kernel
self.s3conn = boto.connect_s3(settings.AWS_KEY, settings.AWS_SECRET)
self.bucket = self.s3conn.create_bucket(settings.AWS_BUCKET)
self._bucket = False
@property
def bucket(self):
if not self._bucket:
self._bucket = self.s3conn.get_bucket(settings.AWS_BUCKET)
return self._bucket
def _get_opt(self, doc_class, setting, default=None):
""" doc_class first, then setting, then default """
return self.kernel.doc_classes[doc_class].get(setting,
getattr(settings, setting, default))
def put(self, tracked_doc, data, content_type):
""" upload the document to S3 """
aws_prefix = self._get_opt(tracked_doc['doc_class'], 'AWS_PREFIX', '')
aws_bucket = self._get_opt(tracked_doc['doc_class'], 'AWS_BUCKET')
k = boto.s3.key.Key(self.bucket)
key_name = getattr(settings, 'AWS_PREFIX', '') + tracked_doc['_id']
key_name = aws_prefix + tracked_doc['_id']
k.key = key_name
headers = {'x-amz-acl': 'public-read',
'Content-Type': content_type}
k.set_contents_from_string(data, headers=headers)
# can also set metadata if we want, useful?
url = 'http://%s.s3.amazonaws.com/%s' % (settings.AWS_BUCKET, key_name)
url = 'http://%s.s3.amazonaws.com/%s' % (aws_bucket, key_name)
return url
def get(self, id):
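S3 options are now resolved per doc_class first, then from global settings, via _get_opt, and the bucket is fetched lazily with get_bucket rather than create_bucket (avoiding the 409s noted in the commit messages). A hedged settings sketch with placeholder bucket and prefix names:

DOCUMENT_CLASSES = {
    'example-reports': {                         # hypothetical doc_class
        'update_mins': 60 * 24,
        'onchanged': [],
        'AWS_BUCKET': 'example-reports-archive',  # checked first by _get_opt
        'AWS_PREFIX': 'reports/',
    },
}
AWS_BUCKET = 'example-default-archive'   # global fallback
AWS_PREFIX = ''                          # global fallback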

View File

@ -11,10 +11,8 @@ class UpdateTask(Task):
def run(self, doc_id):
doc = kernel.db.tracked.find_one({'_id': doc_id})
kernel.update(doc)
for task in doc.get('post_update_tasks', []):
send_task(task, (doc_id,))
kernel.db.status.update({}, {'$inc': {'update_queue': -1}})
kernel.update(doc)
# don't sit on a connection
kernel.db.connection.end_request()
@ -24,15 +22,24 @@ class UpdateTaskScheduler(PeriodicTask):
# 60s tick
run_every = 60
ignore_result = True
def run(self):
# if the update queue isn't empty, wait to add more
# (currently the only way we avoid duplicates)
# alternate option would be to set a _queued flag on documents
if kernel.db.status.find_one()['update_queue']:
update_queue_size = kernel.db.status.find_one()['update_queue']
if update_queue_size:
self.get_logger().debug('waiting, update_queue_size={0}'.format(
update_queue_size))
return
next_set = kernel.get_update_queue()
if next_set:
self.get_logger().debug('repopulating update_queue')
else:
self.get_logger().debug('kernel.update_queue empty')
for doc in next_set:
UpdateTask.delay(doc['_id'])
kernel.db.status.update({}, {'$inc': {'update_queue': 1}})

View File

@ -1,6 +1,6 @@
<tr{% if log.error %} class="error" {% endif %}>
<td>{{log.action}}</td>
<td><a href="{{request.script_root}}/tracked/{{log.url}}">{{log.url}}</td>
<td>{{log.timestamp.strftime("%Y-%m-%d %H:%M:%S")}}</td>
<td>{% if log.error %}{{log.error}}{% endif %}</td>
<tr class="{{log.levelname.lower}}">
<td>{{log.name}}</td>
<td>{{log.message}}</td>
<td>{{log.created.strftime("%Y-%m-%d %H:%M:%S")}}</td>
<td>{% if log.exc_info %}{{log.exc_info}}{% endif %}</td>
</tr>

View File

@ -2,7 +2,7 @@ import time
import datetime
from unittest import TestCase
from nose.tools import assert_raises
from nose.tools import assert_raises, assert_equal
from oyster.core import Kernel
@ -18,9 +18,8 @@ class KernelTests(TestCase):
def setUp(self):
doc_classes = {'default':
{'update_mins': 30, 'storage_engine': 'dummy',
'onchanged': []
},
# omit doc class, defaults to dummy
{'update_mins': 30, 'onchanged': [] },
'fast-update':
{'update_mins': 1 / 60., 'storage_engine': 'dummy',
'onchanged': []
@ -45,8 +44,6 @@ class KernelTests(TestCase):
retry_attempts=7, retry_wait_minutes=8)
assert c.db.connection.host == '127.0.0.1'
assert c.db.connection.port == 27017
assert c.db.logs.options()['capped'] == True
assert c.db.logs.options()['size'] == 5000
assert c.retry_wait_minutes == 8
# TODO: test retry_attempts
assert c.scraper.user_agent == 'test-ua'
@ -56,15 +53,6 @@ class KernelTests(TestCase):
# ensure that a bad document class raises an error
assert_raises(ValueError, Kernel, doc_classes={'bad-doc': {}})
def test_log(self):
self.kernel.log('action1', 'http://example.com')
self.kernel.log('action2', 'http://test.com', error=True, pi=3)
assert self.kernel.db.logs.count() == 2
x = self.kernel.db.logs.find_one({'error': True})
assert x['action'] == 'action2'
assert x['url'] == 'http://test.com'
assert x['pi'] == 3
def test_track_url(self):
# basic insert
id1 = self.kernel.track_url('http://example.com', 'default', pi=3)
@ -74,37 +62,27 @@ class KernelTests(TestCase):
assert obj['metadata'] == {'pi': 3}
assert obj['versions'] == []
# logging
log = self.kernel.db.logs.find_one()
assert log['action'] == 'track'
assert log['url'] == 'http://example.com'
# track same url again with same metadata returns id
id2 = self.kernel.track_url('http://example.com', 'default', pi=3)
assert id1 == id2
# test setting id
# test manually set id
out = self.kernel.track_url('http://example.com/2', 'default',
'fixed-id')
assert out == 'fixed-id'
# can't track same URL twice with different id
assert_raises(ValueError, self.kernel.track_url, 'http://example.com',
'default', 'hard-coded-id')
# logged error
assert self.kernel.db.logs.find_one({'error': 'tracking conflict'})
# can't pass track same id twice with different url
assert_raises(ValueError, self.kernel.track_url,
'http://example.com/3', 'default', 'fixed-id')
# ... with different metadata
assert_raises(ValueError, self.kernel.track_url, 'http://example.com',
'default')
# logged error
assert self.kernel.db.logs.find_one({'error': 'tracking conflict'})
# ... or different doc class
assert_raises(ValueError, self.kernel.track_url,
'http://example.com/2', 'change-hook', 'fixed-id')
# ... different doc class
assert_raises(ValueError, self.kernel.track_url, 'http://example.com',
'special-doc-class', pi=3)
# logged error
assert self.kernel.db.logs.find_one({'error': 'tracking conflict'})
# different metadata is ok, but it should be updated
self.kernel.track_url('http://example.com/2', 'default', 'fixed-id',
pi=3)
self.kernel.db.tracked.find_one({'_id': 'fixed-id'})['metadata']['pi'] == 3
def test_no_update(self):
# update
@ -137,9 +115,6 @@ class KernelTests(TestCase):
assert len(newobj['versions']) == 1
# check logs
assert self.kernel.db.logs.find({'action': 'update'}).count() == 1
# and do another update..
self.kernel.update(obj)
@ -150,9 +125,6 @@ class KernelTests(TestCase):
newobj = self.kernel.db.tracked.find_one()
assert first_update < newobj['last_update']
# check that logs updated
assert self.kernel.db.logs.find({'action': 'update'}).count() == 2
def test_update_failure(self):
# track a non-existent URL
self.kernel.track_url('http://not_a_url', 'default')
@ -162,41 +134,37 @@ class KernelTests(TestCase):
obj = self.kernel.db.tracked.find_one()
assert obj['consecutive_errors'] == 1
# we should have logged an error too
assert self.kernel.db.logs.find({'action': 'update',
'error': {'$ne': False}}).count() == 1
# update again
self.kernel.update(obj)
obj = self.kernel.db.tracked.find_one()
assert obj['consecutive_errors'] == 2
def test_update_onchanged_fire_only_on_change(self):
self.kernel.track_url('http://example.com', 'change-hook')
obj = self.kernel.db.tracked.find_one()
self.kernel.update(obj)
#def test_update_onchanged_fire_only_on_change(self):
# self.kernel.track_url('http://example.com', 'change-hook')
# obj = self.kernel.db.tracked.find_one()
# self.kernel.update(obj)
doc = self.kernel.db.tracked.find_one()
assert doc['hook_fired'] == 1
# doc = self.kernel.db.tracked.find_one()
# assert doc['hook_fired'] == 1
# again, we rely on example.com not updating
self.kernel.update(obj)
doc = self.kernel.db.tracked.find_one()
assert doc['hook_fired'] == 1
# # again, we rely on example.com not updating
# self.kernel.update(obj)
# doc = self.kernel.db.tracked.find_one()
# assert doc['hook_fired'] == 1
def test_update_onchanged_fire_again_on_change(self):
self.kernel.track_url(RANDOM_URL, 'change-hook')
obj = self.kernel.db.tracked.find_one()
self.kernel.update(obj)
#def test_update_onchanged_fire_again_on_change(self):
# self.kernel.track_url(RANDOM_URL, 'change-hook')
# obj = self.kernel.db.tracked.find_one()
# self.kernel.update(obj)
doc = self.kernel.db.tracked.find_one()
assert doc['hook_fired'] == 1
# doc = self.kernel.db.tracked.find_one()
# assert doc['hook_fired'] == 1
# we rely on this URL updating
self.kernel.update(obj)
doc = self.kernel.db.tracked.find_one()
assert doc['hook_fired'] == 2
# # we rely on this URL updating
# self.kernel.update(obj)
# doc = self.kernel.db.tracked.find_one()
# assert doc['hook_fired'] == 2
def test_get_update_queue(self):
self.kernel.track_url('never-updates', 'fast-update')

View File

@ -0,0 +1,53 @@
import unittest
import logging
import datetime
import pymongo
from ..mongolog import MongoHandler
class TestMongoLog(unittest.TestCase):
DB_NAME = 'oyster_test'
def setUp(self):
pymongo.Connection().drop_database(self.DB_NAME)
self.log = logging.getLogger('mongotest')
self.log.setLevel(logging.DEBUG)
self.logs = pymongo.Connection()[self.DB_NAME]['logs']
# clear handlers upon each setup
self.log.handlers = []
# async = False for testing
self.log.addHandler(MongoHandler(self.DB_NAME, capped_size=4000,
async=False))
def tearDown(self):
pymongo.Connection().drop_database(self.DB_NAME)
def test_basic_write(self):
self.log.debug('test')
self.assertEqual(self.logs.count(), 1)
self.log.debug('test')
self.assertEqual(self.logs.count(), 2)
# capped_size will limit these
self.log.debug('test'*200)
self.log.debug('test'*200)
self.assertEqual(self.logs.count(), 1)
def test_attributes(self):
self.log.debug('pi=%s', 3.14, extra={'pie':'pizza'})
logged = self.logs.find_one()
self.assertEqual(logged['message'], 'pi=3.14')
self.assertTrue(isinstance(logged['created'], datetime.datetime))
self.assertTrue('host' in logged)
self.assertEqual(logged['name'], 'mongotest')
self.assertEqual(logged['levelname'], 'DEBUG')
self.assertEqual(logged['pie'], 'pizza')
# and exc_info
try:
raise Exception('error!')
except:
self.log.warning('oh no!', exc_info=True)
logged = self.logs.find_one(sort=[('$natural', -1)])
self.assertEqual(logged['levelname'], 'WARNING')
self.assertTrue('error!' in logged['exc_info'])

View File

@ -1,19 +1,21 @@
from nose.plugins.skip import SkipTest
from oyster.conf import settings
from oyster.core import Kernel
from oyster.storage.s3 import S3Storage
from oyster.storage.gridfs import GridFSStorage
from oyster.storage.dummy import DummyStorage
def _simple_storage_test(StorageCls):
kernel = Kernel(mongo_db='oyster_test')
kernel.doc_classes['default'] = {}
storage = StorageCls(kernel)
# ensure the class has a storage_type attribute
assert hasattr(storage, 'storage_type')
doc = {'_id': 'aabbccddeeff', 'url': 'http://localhost:8000/#test',
'metadata': {}
}
'doc_class': 'default', 'metadata': {} }
storage_id = storage.put(doc, 'hello oyster', 'text/plain')
assert storage_id
@ -21,6 +23,9 @@ def _simple_storage_test(StorageCls):
def test_s3():
if not hasattr(settings, 'AWS_BUCKET'):
raise SkipTest('S3 not configured')
from oyster.storage.s3 import S3Storage
_simple_storage_test(S3Storage)

View File

@ -4,7 +4,7 @@ import datetime
import functools
import flask
import pymongo.objectid
import bson.objectid
from oyster.conf import settings
from oyster.core import kernel
@ -14,7 +14,7 @@ class JSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime.datetime):
return obj.isoformat()
elif isinstance(obj, pymongo.objectid.ObjectId):
elif isinstance(obj, bson.objectid.ObjectId):
return str(obj)
else:
return super(JSONEncoder, self).default(obj)
@ -51,22 +51,12 @@ def index():
status = {
'tracking': kernel.db.tracked.count(),
'need_update': kernel.get_update_queue_size(),
'logs': list(kernel.db.logs.find().sort('$natural', -1).limit(20)),
'logs': list(kernel.db.logs.find().sort('$natural', -1).limit(100)),
'mongo_host': settings.MONGO_HOST,
}
return status
@app.route('/status/')
@api_wrapper()
def doc_list():
status = {
'tracking': kernel.db.tracked.count(),
'need_update': kernel.get_update_queue_size(),
}
return status
@app.route('/log/')
@api_wrapper('logs.html')
def log_view():
@ -86,23 +76,11 @@ def tracked():
return json.dumps(tracked, cls=JSONEncoder)
@app.route('/tracked/<path:url>')
def tracked_view(url):
url = _path_fixer(url)
doc = kernel.db.tracked.find_one({'url': url})
@app.route('/tracked/<id>')
def tracked_view(id):
doc = kernel.db.tracked.find_one({'_id': id})
return json.dumps(doc, cls=JSONEncoder)
@app.route('/doc/<path:url>/<version>')
def show_doc(url, version):
url = _path_fixer(url)
if version == 'latest':
version = -1
doc = kernel.get_version(url, version)
resp = flask.make_response(doc.read())
resp.headers['content-type'] = doc.content_type
return resp
if __name__ == '__main__':
app.run(debug=True)

View File

@ -2,4 +2,4 @@ scrapelib
pymongo>=2.0
flask
nose
celery
celery==2.5.3

View File

@ -1,11 +1,21 @@
#!/usr/bin/env python
import os
from setuptools import setup
from oyster import __version__
# Hack to prevent stupid "TypeError: 'NoneType' object is not callable" error
# in multiprocessing/util.py _exit_function when running `python
# setup.py test` (see
# http://www.eby-sarna.com/pipermail/peak/2010-May/003357.html)
try:
import multiprocessing
except ImportError:
pass
long_description = open('README.rst').read()
setup(name="oyster",
version=__version__,
version='0.4.0-dev',
py_modules=['oyster'],
author="James Turk",
author_email='jturk@sunlightfoundation.com',
@ -21,8 +31,10 @@ setup(name="oyster",
"Operating System :: OS Independent",
"Programming Language :: Python",
],
install_requires=["httplib2 >= 0.6.0", "scrapelib >= 0.5.4",
install_requires=["httplib2 >= 0.6.0", "scrapelib >= 0.7.2",
"pymongo >= 1.11", "flask", "celery"],
tests_require=["nose"],
test_suite='nose.collector',
entry_points="""
[console_scripts]
scrapeshell = scrapelib:scrapeshell

10
tox.ini Normal file
View File

@ -0,0 +1,10 @@
# Tox (http://codespeak.net/~hpk/tox/) is a tool for running tests
# in multiple virtualenvs. This configuration file will run the
# test suite on all supported python versions. To use it, "pip install tox"
# and then run "tox" from this directory.
[tox]
envlist = py26, py27, pypy
[testenv]
commands = python setup.py test