Compare commits
91 Commits
| SHA1 |
|---|
| c6e93fdd48 |
| 0da1b7b0a9 |
| e84d139df0 |
| dd5e94cf86 |
| ebba8174ee |
| 2677ed18b7 |
| 6153bdaf2a |
| dfb556a49f |
| 1d441a8f86 |
| 31435df760 |
| 2846b044d2 |
| 901283ecfa |
| 2f729bfdbc |
| 4e8f1430c4 |
| 7387aab273 |
| 4172b43c0f |
| b4de2ee0f9 |
| f14fc1cd2b |
| c4b7597772 |
| 9182b966e3 |
| aba52c28a3 |
| 66acbde1d8 |
| 76e172da0f |
| cec4bdc333 |
| 5e668b3b21 |
| ec9b19d77f |
| 0cddf437fb |
| d5d77fd79b |
| fd8e3706bc |
| eb3c6919ac |
| 43d4979913 |
| e6ae2ad634 |
| 4206539aed |
| ebc6444bea |
| 68b3fafb59 |
| cfeb6dd5ac |
| cee1103e21 |
| 4a4dff4e96 |
| 2804b95c4a |
| 7537f344f8 |
| 8571042b05 |
| 75dadb2579 |
| a4c7733618 |
| b697641e13 |
| 9fb39db5cd |
| b23b830419 |
| 0d5e01f051 |
| f5cd19ba94 |
| 85d88c1c94 |
| 97e164e6e6 |
| 574d1da843 |
| 95a92d124c |
| 8510a30175 |
| b60ee969ca |
| 2fca8a23ac |
| b164134c2a |
| 7922ac2da7 |
| 232cf76a60 |
| 249b4babde |
| 2259824019 |
| c9fd748534 |
| b1f4bb1a82 |
| d118d36e6e |
| 028a145505 |
| 7b096d76d0 |
| 24806e6ae0 |
| 081533d647 |
| 8a98d81801 |
| 92cf12905b |
| 42126e46a9 |
| d8288c7647 |
| 5472e0de60 |
| 04a8c0123c |
| 006a50b8e3 |
| 7ba0d10049 |
| 4aaae6d6e6 |
| 3a4d2d968e |
| b077e679af |
| 5124fe4eab |
| cca54b6447 |
| 22800601df |
| 981e2cc88f |
| 45a93fcc68 |
| 8e4c020f8f |
| 8c589c1ccd |
| 05975d878a |
| f11b29dea3 |
| cdb7fda3be |
| cab149e241 |
| cedc71bf64 |
| 2f3825d3e2 |

`.gitignore` (vendored, 2 lines changed)

```
@@ -1,3 +1,5 @@
celerybeat-schedule
oyster_settings.py
oyster.egg-info/
*.pyc
.tox
```

`.travis.yml` (new file, 10 lines)

```
@@ -0,0 +1,10 @@
language: python
python:
  - "2.6"
  - "2.7"
install: pip install scrapelib pymongo nose celery --use-mirrors
script: nosetests
services: mongodb
notifications:
  email:
    - jturk@sunlightfoundation.com
```

```
@@ -1,3 +1,5 @@
**DEPRECATED** - this project is abandoned & will not be seeing future updates

======
oyster
======
```

`changelog.rst` (new file, 59 lines)

```
@@ -0,0 +1,59 @@
oyster changelog
================

0.4.0-dev
---------
* S3 storage backend bugfix
* lots of improvements to signal script
* oyster.ext cloudsearch, elasticsearch, and superfastmatch
* use python logging w/ mongo handler
* add tox/python setup.py test (thanks Marc Abramowitz!)

0.3.2
-----
**2012-03-29**
* become much more tolerant of duplicates
* skip S3 test if not prepared
* use doc_class AWS_PREFIX and AWS_BUCKET if set
* add DEFAULT_STORAGE_ENGINE setting

0.3.1
-----
**2012-03-10**
* add validation of doc_class
* add ability to do one-time updates
* change how hooks work
* introduce concept of scripts
* call pymongo's end_request in long running threads
* better error messages for duplicate URLs
* lots of flake8-inspired fixes
* S3 backend: add support for AWS_PREFIX

0.3.0
-----
**2012-02-21**
* switch Connection to Kernel
* add concept of doc_class
* make storage pluggable instead of GridFS
* add S3 backend
* add Dummy backend
* delete obsolete ExternalStoreTask
* addition of onchanged hook
* allow id to be set manually

0.2.5
-----
**2011-10-06**
* lots of fixes to web frontend
* ExternalStoreTask

0.2.0
-----
**2011-09-20**
* major refactor: oysterd replaced by celery
* fix retries

0.1.0
-----
**2011-08-05**
* initial release, basic document tracking
```

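The 0.4.0-dev notes above mention switching to Python logging with a Mongo handler. As a rough illustration (not part of the diff), this is how the `MongoHandler` added in `oyster/mongolog.py` further down could be attached to a logger, mirroring what `Kernel.__init__` does in `oyster/core.py`; the host, port, and database name here are placeholder values.

```python
# Sketch only: wire the new MongoHandler (oyster/mongolog.py) into a logger,
# the same way Kernel.__init__ does in oyster/core.py. Values are placeholders.
import logging

from oyster.mongolog import MongoHandler

log = logging.getLogger('oyster')
log.setLevel(logging.DEBUG)
log.addHandler(MongoHandler('oyster', host='localhost', port=27017,
                            capped_size=100000000))

# records now land in the capped 'logs' collection of the 'oyster' database
log.info('tracked %s [%s]', 'http://example.com', None)
```
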
```
@@ -1,4 +1,4 @@
__version__ = "0.3.0"
__version__ = "0.4.0-dev"

import os
os.environ['CELERY_CONFIG_MODULE'] = 'oyster.celeryconfig'
```

```
@@ -1,13 +1,3 @@
from oyster.conf import settings

CELERY_IMPORTS = ["oyster.tasks"] + list(settings.CELERY_TASK_MODULES)

BROKER_TRANSPORT = 'mongodb'
BROKER_HOST = settings.MONGO_HOST
BROKER_PORT = settings.MONGO_PORT

CELERY_RESULT_BACKEND = 'mongodb'
CELERY_MONGODB_BACKEND_SETTINGS = {
    'host': settings.MONGO_HOST,
    'port': settings.MONGO_PORT,
}
CELERY_IMPORTS = ['oyster.tasks'] + list(settings.CELERY_TASK_MODULES)
```

```
@@ -1,5 +1,6 @@
from oyster.conf import default_settings


class Settings(object):
    def __init__(self):
        pass
```

```
@@ -17,3 +17,5 @@ RETRY_ATTEMPTS = 3
RETRY_WAIT_MINUTES = 60

DOCUMENT_CLASSES = {}

DEFAULT_STORAGE_ENGINE = 'dummy'
```

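For orientation, a hedged sketch (not part of the diff) of what a `DOCUMENT_CLASSES` entry might look like under these new settings: `Kernel.__init__` in `oyster/core.py` below requires `update_mins` and `onchanged` for every doc_class and falls back to `DEFAULT_STORAGE_ENGINE` when `storage_engine` is omitted. The `'example'` name is made up.

```python
# Illustrative settings sketch. The 'example' doc_class name is hypothetical;
# Kernel.__init__ (oyster/core.py below) validates 'update_mins'/'onchanged'
# and fills in 'storage_engine' from DEFAULT_STORAGE_ENGINE when absent.
DOCUMENT_CLASSES = {
    'example': {
        'update_mins': 30,         # re-check the URL every 30 minutes
        'onchanged': [],           # task names passed to send_task() on change
        # 'storage_engine': 's3',  # optional; defaults to DEFAULT_STORAGE_ENGINE
    },
}

DEFAULT_STORAGE_ENGINE = 'dummy'
```
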
`oyster/core.py` (136 lines changed)

```
@@ -1,13 +1,16 @@
import datetime
import logging
import hashlib
import random
import sys
import urllib

import pymongo
import scrapelib

from .mongolog import MongoHandler
from .storage import engines
from celery.execute import send_task


class Kernel(object):
    """ oyster's workhorse, handles tracking """

@@ -16,27 +19,28 @@ class Kernel(object):
                 mongo_db='oyster', mongo_log_maxsize=100000000,
                 user_agent='oyster', rpm=60, timeout=300,
                 retry_attempts=3, retry_wait_minutes=60,
                 doc_classes=None,
                 doc_classes=None, default_storage_engine='dummy',
                 ):
        """
        configurable for ease of testing, only one should be instantiated
        """

        # set up a capped log if it doesn't exist
        # set up the log
        self.db = pymongo.Connection(mongo_host, mongo_port)[mongo_db]
        try:
            self.db.create_collection('logs', capped=True,
                                      size=mongo_log_maxsize)
        except pymongo.errors.CollectionInvalid:
            # cap collection if not capped?
            pass

        self.log = logging.getLogger('oyster')
        self.log.setLevel(logging.DEBUG)
        self.log.addHandler(MongoHandler(mongo_db, host=mongo_host,
                                         port=mongo_port,
                                         capped_size=mongo_log_maxsize))

        # create status document if it doesn't exist
        if self.db.status.count() == 0:
            self.db.status.insert({'update_queue': 0})

        # ensure an index on _random
        self.db.tracked.ensure_index([('_random', pymongo.ASCENDING)])
        self.db.tracked.ensure_index('_random')
        self.db.tracked.ensure_index('url')

        self.scraper = scrapelib.Scraper(user_agent=user_agent,
                                         requests_per_minute=rpm,

@@ -53,15 +57,16 @@ class Kernel(object):
            self.storage[name] = StorageCls(self)

        # set document classes
        _doc_class_fields = ('update_mins', 'storage_engine',
                             'onchanged')
        _doc_class_fields = ('update_mins', 'onchanged')
        self.doc_classes = doc_classes or {}
        for dc_name, dc_props in self.doc_classes.iteritems():
            for key in _doc_class_fields:
                if key not in dc_props:
                    raise ValueError('doc_class %s missing key %s' %
                                     (dc_name, key))

            # set a default storage engine
            if 'storage_engine' not in dc_props:
                dc_props['storage_engine'] = default_storage_engine

    def _wipe(self):
        """ exists primarily for debug use, wipes entire db """

@@ -69,20 +74,9 @@ class Kernel(object):
        self.db.drop_collection('logs')
        self.db.drop_collection('status')


    def log(self, action, url, error=False, **kwargs):
        """ add an entry to the oyster log """
        kwargs['action'] = action
        kwargs['url'] = url
        kwargs['error'] = error
        kwargs['timestamp'] = datetime.datetime.utcnow()
        self.db.logs.insert(kwargs)


    def _add_doc_class(self, doc_class, **properties):
        self.doc_classes[doc_class] = properties


    def track_url(self, url, doc_class, id=None, **kwargs):
        """
        Add a URL to the set of tracked URLs, accessible via a given filename.

@@ -94,31 +88,45 @@ class Kernel(object):
        **kwargs
            any keyword args will be added to the document's metadata
        """
        tracked = self.db.tracked.find_one({'url': url})
        if doc_class not in self.doc_classes:
            error = 'error tracking %s, unregistered doc_class %s'
            self.log.error(error, url, doc_class)
            raise ValueError(error % (url, doc_class))

        # if data is already tracked and this is just a duplicate call
        # return the original object
        # try and find an existing version of this document
        tracked = None

        if id:
            tracked = self.db.tracked.find_one({'_id': id})
        else:
            tracked = self.db.tracked.find_one({'url': url})

        # if id exists, ensure that URL and doc_class haven't changed
        # then return existing data (possibly with refreshed metadata)
        if tracked:
            # only check id if id was passed in
            id_matches = (tracked['_id'] == id) if id else True
            if (tracked['metadata'] == kwargs and
                tracked['doc_class'] == doc_class and
                id_matches):
            if (tracked['url'] == url and
                tracked['doc_class'] == doc_class):
                if kwargs != tracked['metadata']:
                    tracked['metadata'] = kwargs
                    self.db.tracked.save(tracked, safe=True)
                return tracked['_id']
            else:
                self.log('track', url=url, error='tracking conflict')
                raise ValueError('%s is already tracked with different '
                                 'metadata' % url)
            # id existed but with different URL
            message = ('%s already exists with different data (tracked: '
                       '%s, %s) (new: %s, %s)')
            args = (tracked['_id'], tracked['url'], tracked['doc_class'],
                    url, doc_class)
            self.log.error(message, *args)
            raise ValueError(message % args)

        self.log('track', url=url)
        self.log.info('tracked %s [%s]', url, id)

        newdoc = dict(url=url, doc_class=doc_class,
                      _random=random.randint(0, sys.maxint),
                      versions=[], metadata=kwargs)
        if id:
            newdoc['_id'] = id
        return self.db.tracked.insert(newdoc)

        return self.db.tracked.insert(newdoc, safe=True)

    def md5_versioning(self, olddata, newdata):
        """ return True if md5 changed or if file is new """

@@ -126,7 +134,6 @@ class Kernel(object):
        new_md5 = hashlib.md5(newdata).hexdigest()
        return old_md5 != new_md5


    def update(self, doc):
        """
        perform update upon a given document

@@ -174,8 +181,8 @@ class Kernel(object):
                'storage_type': storage.storage_type,
            })
            # fire off onchanged functions
            for onchanged in doc_class['onchanged']:
                onchanged(doc)
            for onchanged in doc_class.get('onchanged', []):
                send_task(onchanged, (doc['_id'],))

        if error:
            # if there's been an error, increment the consecutive_errors count

@@ -183,20 +190,26 @@ class Kernel(object):
            c_errors = doc.get('consecutive_errors', 0)
            doc['consecutive_errors'] = c_errors + 1
            if c_errors <= self.retry_attempts:
                update_mins = self.retry_wait_minutes * (2**c_errors)
                update_mins = self.retry_wait_minutes * (2 ** c_errors)
        else:
            # reset error count if all was ok
            doc['consecutive_errors'] = 0

        # last_update/next_update are separate from question of versioning
        doc['last_update'] = now
        doc['next_update'] = now + datetime.timedelta(minutes=update_mins)
        if update_mins:
            doc['next_update'] = now + datetime.timedelta(minutes=update_mins)
        else:
            doc['next_update'] = None

        self.log('update', url=url, new_doc=new_version, error=error)
        if error:
            self.log.warning('error updating %s [%s]', url, doc['_id'])
        else:
            new_version = ' (new)'
            self.log.info('updated %s [%s]%s', url, doc['_id'], new_version)

        self.db.tracked.save(doc, safe=True)


    def get_update_queue(self):
        """
        Get a list of what needs to be updated.

@@ -214,23 +227,43 @@ class Kernel(object):
        queue = list(new)

        # pull the rest from those for which next_update is in the past
        next = self.db.tracked.find({'next_update':
            {'$lt': datetime.datetime.utcnow()}}).sort('_random')
        next = self.db.tracked.find({'$and': [
            {'next_update': {'$ne': None}},
            {'next_update': {'$lt': datetime.datetime.utcnow()}},
        ]}).sort('_random')
        queue.extend(next)

        return queue


    def get_update_queue_size(self):
        """
        Get the size of the update queue, this should match
        ``len(self.get_update_queue())``, but is computed more efficiently.
        """
        new = self.db.tracked.find({'next_update': {'$exists': False}}).count()
        next = self.db.tracked.find({'next_update':
            {'$lt': datetime.datetime.utcnow()}}).count()
        return new+next
        next = self.db.tracked.find({'$and': [
            {'next_update': {'$ne': None}},
            {'next_update': {'$lt': datetime.datetime.utcnow()}},
        ]}).count()
        return new + next

    def get_last_version(self, doc):
        try:
            doc_class = self.doc_classes[doc['doc_class']]
        except KeyError:
            raise ValueError('unregistered doc_class %s' % doc['doc_class'])
        storage = self.storage[doc_class['storage_engine']]
        return storage.get(doc['versions'][-1]['storage_key'])

    def extract_text(self, doc):
        version = self.get_last_version(doc)
        doc_class = self.doc_classes[doc['doc_class']]
        try:
            extract_text = doc_class['extract_text']
        except KeyError:
            raise ValueError('doc_class %s missing extract_text' %
                             doc['doc_class'])
        return extract_text(doc, version)


def _get_configured_kernel():

@@ -246,6 +279,7 @@ def _get_configured_kernel():
                    retry_attempts=settings.RETRY_ATTEMPTS,
                    retry_wait_minutes=settings.RETRY_WAIT_MINUTES,
                    doc_classes=settings.DOCUMENT_CLASSES,
                    default_storage_engine=settings.DEFAULT_STORAGE_ENGINE,
                    )

kernel = _get_configured_kernel()
```

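Taken together, the `track_url`/`update` changes above give roughly the following usage pattern. This is a sketch based on the Kernel API and the tests further down, not code from the diff; the URL, id, and doc_class name are illustrative, and the doc_class would have to be registered in `DOCUMENT_CLASSES`.

```python
# Usage sketch only. 'example' and the URL/id are hypothetical values; the
# doc_class must exist in settings.DOCUMENT_CLASSES for track_url to accept it.
from oyster.core import kernel

doc_id = kernel.track_url('http://example.com/report.pdf', 'example',
                          id='example-report', category='reports')

doc = kernel.db.tracked.find_one({'_id': doc_id})
kernel.update(doc)   # fetches the URL and stores a new version if it changed

# documents currently due for an update
print kernel.get_update_queue_size()
```
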
`oyster/ext/__init__.py` (new empty file)

`oyster/ext/cloudsearch.py` (new file, 32 lines)

```
@@ -0,0 +1,32 @@
# needed so we can import cloudsearch
from __future__ import absolute_import

from celery.task.base import Task

from ..core import kernel
from ..conf import settings

from cloudsearch import CloudSearch

cs = CloudSearch(settings.CLOUDSEARCH_DOMAIN, settings.CLOUDSEARCH_ID, 20)


class CloudSearchPush(Task):
    """ task that updates documents """
    # results go straight to database
    ignore_result = True

    # a bit under 1MB
    MAX_BYTES = 1048000

    def run(self, doc_id):
        doc = kernel.db.tracked.find_one({'_id': doc_id})
        text = kernel.extract_text(doc)
        pieces = [text[i:i+self.MAX_BYTES] for i in
                  xrange(0, len(text), self.MAX_BYTES)]

        self.get_logger().debug('adding {0} pieces for {1}'.format(
            len(pieces), doc_id))
        for i, piece in enumerate(pieces):
            cloud_id = '%s_%s' % (doc_id.lower(), i)
            cs.add_document(cloud_id, text=piece, **doc['metadata'])
```

`oyster/ext/elasticsearch.py` (new file, 36 lines)

```
@@ -0,0 +1,36 @@
import logging
from celery.task.base import Task

from ..core import kernel
from ..conf import settings

from pyes import ES

es = ES(settings.ELASTICSEARCH_HOST)
log = logging.getLogger('oyster.ext.elasticsearch')

class ElasticSearchPush(Task):
    # results go straight to elasticsearch
    ignore_result = True

    def run(self, doc_id):
        doc = kernel.db.tracked.find_one({'_id': doc_id})

        try:
            text = kernel.extract_text(doc)
            if not text:
                log.info('no text for %s', doc_id,
                         extra={'doc_class':doc['doc_class']})
                return

            log.info('tracked %s', doc_id,
                     extra={'doc_class':doc['doc_class']})

            es.index(dict(doc['metadata'], text=text),
                     settings.ELASTICSEARCH_INDEX,
                     settings.ELASTICSEARCH_DOC_TYPE,
                     id=doc_id)
        except Exception as e:
            log.warning('error tracking %s', doc_id,
                        extra={'doc_class':doc['doc_class']}, exc_info=True)
            raise
```

`oyster/ext/superfastmatch.py` (new file, 23 lines)

```
@@ -0,0 +1,23 @@
# needed so we can import superfastmatch.Client
from __future__ import absolute_import
from celery.task.base import Task

from ..core import kernel
from ..conf import settings

from superfastmatch import Client

sfm = Client(settings.SUPERFASTMATCH_URL)


class SuperFastMatchPush(Task):
    """ task that pushes documents to SFM """

    # results go straight to database
    ignore_result = True

    def run(self, doc_id):
        doc = kernel.db.tracked.find_one({'_id': doc_id})
        text = kernel.extract_text(doc)
        doctype, docid = settings.SUPERFASTMATCH_ID_FUNC(doc_id)
        sfm.add(doctype, docid, text, defer=True, **doc['metadata'])
```

`oyster/mongolog.py` (new file, 54 lines)

```
@@ -0,0 +1,54 @@
"""
MongoDB handler for Python Logging

inspired by https://github.com/andreisavu/mongodb-log
"""

import logging
import datetime
import socket
import pymongo


class MongoFormatter(logging.Formatter):

    def format(self, record):
        """ turn a LogRecord into something mongo can store """
        data = record.__dict__.copy()

        data.update(
            # format message
            message=record.getMessage(),
            # overwrite created (float) w/ a mongo-compatible datetime
            created=datetime.datetime.utcnow(),
            host=socket.gethostname(),
            args=tuple(unicode(arg) for arg in record.args)
        )
        data.pop('msecs')   # not needed, stored in created

        # TODO: ensure everything in 'extra' is MongoDB-ready
        exc_info = data.get('exc_info')
        if exc_info:
            data['exc_info'] = self.formatException(exc_info)
        return data


class MongoHandler(logging.Handler):
    def __init__(self, db, collection='logs', host='localhost', port=None,
                 capped_size=100000000, level=logging.NOTSET, async=True):
        db = pymongo.connection.Connection(host, port)[db]
        # try and create the capped log collection
        if capped_size:
            try:
                db.create_collection(collection, capped=True, size=capped_size)
            except pymongo.errors.CollectionInvalid:
                pass
        self.collection = db[collection]
        self.async = async
        logging.Handler.__init__(self, level)
        self.formatter = MongoFormatter()

    def emit(self, record):
        # explicitly set safe=False to get async insert
        # TODO: what to do if an error occurs? not safe to log-- ignore?
        self.collection.save(self.format(record), safe=not self.async)
```

`oyster/scripts/__init__.py` (new empty file)

`oyster/scripts/signal.py` (new file, 53 lines)

```
@@ -0,0 +1,53 @@
#!/usr/bin/env python
import argparse
import traceback
import random
from celery.execute import send_task
from celery import current_app

from oyster.core import kernel

def main():
    parser = argparse.ArgumentParser(
        description='do a task for all documents in a doc_class',
    )

    parser.add_argument('task', type=str, help='task name to apply')
    parser.add_argument('doc_class', type=str,
                        help='doc_class to apply function to')
    parser.add_argument('--sample', action='store_true')
    parser.add_argument('--immediate', action='store_true')

    args = parser.parse_args()

    docs = kernel.db.tracked.find({'doc_class': args.doc_class,
                                   'versions': {'$ne': []}
                                   }, timeout=False)
    total = docs.count()
    print '{0} docs in {1}'.format(total, args.doc_class)

    if args.sample:
        limit = 100
        print 'sampling {0} documents'.format(limit)
        docs = docs.limit(limit).skip(random.randint(0, total-limit))
        args.immediate = True

    errors = 0

    if args.immediate:
        module, name = args.task.rsplit('.', 1)
        task = getattr(__import__(module, fromlist=[name]), name)
        for doc in docs:
            try:
                task.apply((doc['_id'],), throw=True)
            except Exception:
                errors += 1
                traceback.print_exc()
        print '{0} errors in {1} documents'.format(errors, total)

    else:
        for doc in docs:
            send_task(args.task, (doc['_id'], ))

if __name__ == '__main__':
    main()
```

```
@@ -1,8 +1,3 @@
import urllib
import boto
from oyster.conf import settings


class DummyStorage(object):
    """ should NOT be used outside of testing """
```

```
@@ -2,8 +2,8 @@ from __future__ import absolute_import

import gridfs

class GridFSStorage(object):

class GridFSStorage(object):
    storage_type = 'gridfs'

    def __init__(self, kernel):
```

```
@@ -7,20 +7,34 @@ class S3Storage(object):
    storage_type = 's3'

    def __init__(self, kernel):
        self.kernel = kernel
        self.s3conn = boto.connect_s3(settings.AWS_KEY, settings.AWS_SECRET)
        self.bucket = self.s3conn.create_bucket(settings.AWS_BUCKET)
        self._bucket = False

    @property
    def bucket(self):
        if not self._bucket:
            self._bucket = self.s3conn.get_bucket(settings.AWS_BUCKET)
        return self._bucket

    def _get_opt(self, doc_class, setting, default=None):
        """ doc_class first, then setting, then default """
        return self.kernel.doc_classes[doc_class].get(setting,
            getattr(settings, setting, default))

    def put(self, tracked_doc, data, content_type):
        """ upload the document to S3 """
        aws_prefix = self._get_opt(tracked_doc['doc_class'], 'AWS_PREFIX', '')
        aws_bucket = self._get_opt(tracked_doc['doc_class'], 'AWS_BUCKET')
        k = boto.s3.key.Key(self.bucket)
        k.key = tracked_doc['_id']
        key_name = aws_prefix + tracked_doc['_id']
        k.key = key_name
        headers = {'x-amz-acl': 'public-read',
                   'Content-Type': content_type}
        k.set_contents_from_string(data, headers=headers)
        # can also set metadata if we want, useful?

        url = 'http://%s.s3.amazonaws.com/%s' % (settings.AWS_BUCKET,
                                                 tracked_doc['_id'])
        url = 'http://%s.s3.amazonaws.com/%s' % (aws_bucket, key_name)
        return url

    def get(self, id):
```

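Per the changelog entry "use doc_class AWS_PREFIX and AWS_BUCKET if set" and the `_get_opt` helper above, a doc_class entry could plausibly carry its own S3 options. A hypothetical sketch, not taken from the diff; the doc_class name, prefix, and bucket are made up.

```python
# Illustrative only: _get_opt() looks in the doc_class dict first, then in
# settings. 'reports' and the bucket/prefix values are hypothetical.
DOCUMENT_CLASSES = {
    'reports': {
        'update_mins': 60 * 24,
        'onchanged': [],
        'storage_engine': 's3',
        'AWS_PREFIX': 'reports/',         # keys become 'reports/<doc id>'
        'AWS_BUCKET': 'example-reports',  # consulted via _get_opt in put()
    },
}
```
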
```
@@ -1,8 +1,6 @@
from celery.task.base import Task, PeriodicTask
from celery.execute import send_task

from pymongo.objectid import ObjectId

from oyster.core import kernel

@@ -13,10 +11,10 @@ class UpdateTask(Task):

    def run(self, doc_id):
        doc = kernel.db.tracked.find_one({'_id': doc_id})
        kernel.update(doc)
        for task in doc.get('post_update_tasks', []):
            send_task(hook, (doc_id,))
        kernel.db.status.update({}, {'$inc': {'update_queue': -1}})
        kernel.update(doc)
        # don't sit on a connection
        kernel.db.connection.end_request()


class UpdateTaskScheduler(PeriodicTask):

@@ -24,15 +22,26 @@ class UpdateTaskScheduler(PeriodicTask):

    # 60s tick
    run_every = 60
    ignore_result = True

    def run(self):
        # if the update queue isn't empty, wait to add more
        # (currently the only way we avoid duplicates)
        # alternate option would be to set a _queued flag on documents
        if kernel.db.status.find_one()['update_queue']:
        update_queue_size = kernel.db.status.find_one()['update_queue']
        if update_queue_size:
            self.get_logger().debug('waiting, update_queue_size={0}'.format(
                update_queue_size))
            return

        next_set = kernel.get_update_queue()
        if next_set:
            self.get_logger().debug('repopulating update_queue')
        else:
            self.get_logger().debug('kernel.update_queue empty')

        for doc in next_set:
            UpdateTask.delay(doc['_id'])
            kernel.db.status.update({}, {'$inc': {'update_queue': 1}})
        # don't sit on a connection
        kernel.db.connection.end_request()
```

```
@@ -1,6 +1,6 @@
<tr{% if log.error %} class="error" {% endif %}>
    <td>{{log.action}}</td>
    <td><a href="{{request.script_root}}/tracked/{{log.url}}">{{log.url}}</td>
    <td>{{log.timestamp.strftime("%Y-%m-%d %H:%M:%S")}}</td>
    <td>{% if log.error %}{{log.error}}{% endif %}</td>
<tr class="{{log.levelname.lower}}">
    <td>{{log.name}}</td>
    <td>{{log.message}}</td>
    <td>{{log.created.strftime("%Y-%m-%d %H:%M:%S")}}</td>
    <td>{% if log.exc_info %}{{log.exc_info}}{% endif %}</td>
</tr>
```

```
@@ -2,33 +2,39 @@ import time
import datetime
from unittest import TestCase

from nose.tools import assert_raises
import pymongo
from nose.tools import assert_raises, assert_equal

from oyster.core import Kernel

def hook_fired(doc):

def hook_fired(doc, newdata):
    doc['hook_fired'] = doc.get('hook_fired', 0) + 1

RANDOM_URL = 'http://www.random.org/integers/?num=1&min=-1000000000&max=1000000000&col=1&base=10&format=plain&rnd=new'
RANDOM_URL = ('http://www.random.org/integers/?num=1&min=-1000000000&'
              'max=1000000000&col=1&base=10&format=plain&rnd=new')


class KernelTests(TestCase):

    def setUp(self):
        doc_classes = {'default':
                       {'update_mins': 30, 'storage_engine': 'dummy',
                       # omit doc class, defaults to dummy
                       {'update_mins': 30, 'onchanged': [] },
                       'fast-update':
                       {'update_mins': 1 / 60., 'storage_engine': 'dummy',
                        'onchanged': []
                       },
                       'fast-update':
                       {'update_mins': 1/60., 'storage_engine': 'dummy',
                        'onchanged': []
                       'one-time':
                       {'update_mins': None, 'storage_engine': 'dummy',
                        'onchanged': [],
                       },
                       'change-hook':
                       {'update_mins': 30, 'storage_engine': 'dummy',
                        'onchanged': [hook_fired]
                       }
                      }
        self.kernel = Kernel(mongo_db='oyster_test', retry_wait_minutes=1/60.,
        self.kernel = Kernel(mongo_db='oyster_test',
                             retry_wait_minutes=1 / 60.,
                             doc_classes=doc_classes)
        self.kernel._wipe()

@@ -38,8 +44,6 @@ class KernelTests(TestCase):
                   retry_attempts=7, retry_wait_minutes=8)
        assert c.db.connection.host == '127.0.0.1'
        assert c.db.connection.port == 27017
        assert c.db.logs.options()['capped'] == True
        assert c.db.logs.options()['size'] == 5000
        assert c.retry_wait_minutes == 8
        # TODO: test retry_attempts
        assert c.scraper.user_agent == 'test-ua'

@@ -49,17 +53,6 @@ class KernelTests(TestCase):
        # ensure that a bad document class raises an error
        assert_raises(ValueError, Kernel, doc_classes={'bad-doc': {}})


    def test_log(self):
        self.kernel.log('action1', 'http://example.com')
        self.kernel.log('action2', 'http://test.com', error=True, pi=3)
        assert self.kernel.db.logs.count() == 2
        x = self.kernel.db.logs.find_one({'error': True})
        assert x['action'] == 'action2'
        assert x['url'] == 'http://test.com'
        assert x['pi'] == 3


    def test_track_url(self):
        # basic insert
        id1 = self.kernel.track_url('http://example.com', 'default', pi=3)

@@ -69,45 +62,44 @@ class KernelTests(TestCase):
        assert obj['metadata'] == {'pi': 3}
        assert obj['versions'] == []

        # logging
        log = self.kernel.db.logs.find_one()
        assert log['action'] == 'track'
        assert log['url'] == 'http://example.com'

        # track same url again with same metadata returns id
        id2 = self.kernel.track_url('http://example.com', 'default', pi=3)
        assert id1 == id2

        # test setting id
        # test manually set id
        out = self.kernel.track_url('http://example.com/2', 'default',
                                    'fixed-id')
        assert out == 'fixed-id'

        # can't track same URL twice with different id
        assert_raises(ValueError, self.kernel.track_url, 'http://example.com',
                      'default', 'hard-coded-id')
        # logged error
        assert self.kernel.db.logs.find_one({'error': 'tracking conflict'})
        # can't pass track same id twice with different url
        assert_raises(ValueError, self.kernel.track_url,
                      'http://example.com/3', 'default', 'fixed-id')

        # ... with different metadata
        assert_raises(ValueError, self.kernel.track_url, 'http://example.com',
                      'default')
        # logged error
        assert self.kernel.db.logs.find_one({'error': 'tracking conflict'})
        # ... or different doc class
        assert_raises(ValueError, self.kernel.track_url,
                      'http://example.com/2', 'change-hook', 'fixed-id')

        # ... different doc class
        assert_raises(ValueError, self.kernel.track_url, 'http://example.com',
                      'special-doc-class', pi=3)
        # logged error
        assert self.kernel.db.logs.find_one({'error': 'tracking conflict'})
        # different metadata is ok, but it should be updated
        self.kernel.track_url('http://example.com/2', 'default', 'fixed-id',
                              pi=3)
        self.kernel.db.tracked.find_one({'_id': 'fixed-id'})['metadata']['pi'] == 3

    def test_no_update(self):
        # update
        self.kernel.track_url('http://example.com', 'one-time')
        obj = self.kernel.db.tracked.find_one()
        self.kernel.update(obj)

        newobj = self.kernel.db.tracked.find_one()
        assert newobj['next_update'] == None

        assert self.kernel.get_update_queue() == []
        assert self.kernel.get_update_queue_size() == 0

    def test_md5_versioning(self):
        assert not self.kernel.md5_versioning('hello!', 'hello!')
        assert self.kernel.md5_versioning('hello!', 'hey!')


    def test_update(self):
        # get a single document tracked and call update on it
        self.kernel.track_url('http://example.com', 'default')

@@ -123,9 +115,6 @@ class KernelTests(TestCase):

        assert len(newobj['versions']) == 1

        # check logs
        assert self.kernel.db.logs.find({'action': 'update'}).count() == 1

        # and do another update..
        self.kernel.update(obj)

@@ -136,10 +125,6 @@ class KernelTests(TestCase):
        newobj = self.kernel.db.tracked.find_one()
        assert first_update < newobj['last_update']

        # check that logs updated
        assert self.kernel.db.logs.find({'action': 'update'}).count() == 2


    def test_update_failure(self):
        # track a non-existent URL
        self.kernel.track_url('http://not_a_url', 'default')

@@ -149,43 +134,37 @@ class KernelTests(TestCase):
        obj = self.kernel.db.tracked.find_one()
        assert obj['consecutive_errors'] == 1

        # we should have logged an error too
        assert self.kernel.db.logs.find({'action': 'update',
                                         'error': {'$ne': False}}).count() == 1

        # update again
        self.kernel.update(obj)

        obj = self.kernel.db.tracked.find_one()
        assert obj['consecutive_errors'] == 2

    #def test_update_onchanged_fire_only_on_change(self):
    #    self.kernel.track_url('http://example.com', 'change-hook')
    #    obj = self.kernel.db.tracked.find_one()
    #    self.kernel.update(obj)

    def test_update_onchanged_fire_only_on_change(self):
        self.kernel.track_url('http://example.com', 'change-hook')
        obj = self.kernel.db.tracked.find_one()
        self.kernel.update(obj)
    #    doc = self.kernel.db.tracked.find_one()
    #    assert doc['hook_fired'] == 1

        doc = self.kernel.db.tracked.find_one()
        assert doc['hook_fired'] == 1
    #    # again, we rely on example.com not updating
    #    self.kernel.update(obj)
    #    doc = self.kernel.db.tracked.find_one()
    #    assert doc['hook_fired'] == 1

        # again, we rely on example.com not updating
        self.kernel.update(obj)
        doc = self.kernel.db.tracked.find_one()
        assert doc['hook_fired'] == 1
    #def test_update_onchanged_fire_again_on_change(self):
    #    self.kernel.track_url(RANDOM_URL, 'change-hook')
    #    obj = self.kernel.db.tracked.find_one()
    #    self.kernel.update(obj)

    def test_update_onchanged_fire_again_on_change(self):
        self.kernel.track_url(RANDOM_URL, 'change-hook')
        obj = self.kernel.db.tracked.find_one()
        self.kernel.update(obj)

        doc = self.kernel.db.tracked.find_one()
        assert doc['hook_fired'] == 1

        # we rely on this URL updating
        self.kernel.update(obj)
        doc = self.kernel.db.tracked.find_one()
        assert doc['hook_fired'] == 2
    #    doc = self.kernel.db.tracked.find_one()
    #    assert doc['hook_fired'] == 1

    #    # we rely on this URL updating
    #    self.kernel.update(obj)
    #    doc = self.kernel.db.tracked.find_one()
    #    assert doc['hook_fired'] == 2

    def test_get_update_queue(self):
        self.kernel.track_url('never-updates', 'fast-update')

@@ -214,15 +193,12 @@ class KernelTests(TestCase):
        queue = self.kernel.get_update_queue()
        assert len(queue) == 3


    def test_get_update_queue_size(self):
        self.kernel.track_url('a', 'fast-update')
        self.kernel.track_url('b', 'fast-update')
        self.kernel.track_url('c', 'fast-update')

        a = self.kernel.db.tracked.find_one(dict(url='a'))
        b = self.kernel.db.tracked.find_one(dict(url='b'))
        c = self.kernel.db.tracked.find_one(dict(url='c'))

        # size should start at 3
        assert self.kernel.get_update_queue_size() == 3
```

`oyster/tests/test_mongolog.py` (new file, 53 lines)

```
@@ -0,0 +1,53 @@
import unittest
import logging
import datetime

import pymongo
from ..mongolog import MongoHandler

class TestMongoLog(unittest.TestCase):

    DB_NAME = 'oyster_test'

    def setUp(self):
        pymongo.Connection().drop_database(self.DB_NAME)
        self.log = logging.getLogger('mongotest')
        self.log.setLevel(logging.DEBUG)
        self.logs = pymongo.Connection()[self.DB_NAME]['logs']
        # clear handlers upon each setup
        self.log.handlers = []
        # async = False for testing
        self.log.addHandler(MongoHandler(self.DB_NAME, capped_size=4000,
                                         async=False))

    def tearDown(self):
        pymongo.Connection().drop_database(self.DB_NAME)

    def test_basic_write(self):
        self.log.debug('test')
        self.assertEqual(self.logs.count(), 1)
        self.log.debug('test')
        self.assertEqual(self.logs.count(), 2)
        # capped_size will limit these
        self.log.debug('test'*200)
        self.log.debug('test'*200)
        self.assertEqual(self.logs.count(), 1)

    def test_attributes(self):
        self.log.debug('pi=%s', 3.14, extra={'pie':'pizza'})
        logged = self.logs.find_one()
        self.assertEqual(logged['message'], 'pi=3.14')
        self.assertTrue(isinstance(logged['created'], datetime.datetime))
        self.assertTrue('host' in logged)
        self.assertEqual(logged['name'], 'mongotest')
        self.assertEqual(logged['levelname'], 'DEBUG')
        self.assertEqual(logged['pie'], 'pizza')

        # and exc_info
        try:
            raise Exception('error!')
        except:
            self.log.warning('oh no!', exc_info=True)
        logged = self.logs.find_one(sort=[('$natural', -1)])
        self.assertEqual(logged['levelname'], 'WARNING')
        self.assertTrue('error!' in logged['exc_info'])
```

```
@@ -1,18 +1,21 @@
from nose.plugins.skip import SkipTest

from oyster.conf import settings
from oyster.core import Kernel
from oyster.storage.s3 import S3Storage
from oyster.storage.gridfs import GridFSStorage
from oyster.storage.dummy import DummyStorage


def _simple_storage_test(StorageCls):
    kernel = Kernel(mongo_db='oyster_test')
    kernel.doc_classes['default'] = {}
    storage = StorageCls(kernel)

    # ensure the class has a storage_type attribute
    assert hasattr(storage, 'storage_type')

    doc = {'_id': 'aabbccddeeff', 'url': 'http://localhost:8000/#test',
           'metadata': {}
          }
           'doc_class': 'default', 'metadata': {} }
    storage_id = storage.put(doc, 'hello oyster', 'text/plain')
    assert storage_id

@@ -20,6 +23,9 @@ def _simple_storage_test(StorageCls):


def test_s3():
    if not hasattr(settings, 'AWS_BUCKET'):
        raise SkipTest('S3 not configured')
    from oyster.storage.s3 import S3Storage
    _simple_storage_test(S3Storage)
```

```
@@ -4,7 +4,7 @@ import datetime
import functools

import flask
import pymongo.objectid
import bson.objectid

from oyster.conf import settings
from oyster.core import kernel

@@ -14,7 +14,7 @@ class JSONEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        elif isinstance(obj, pymongo.objectid.ObjectId):
        elif isinstance(obj, bson.objectid.ObjectId):
            return str(obj)
        else:
            return super(JSONEncoder, self).default(obj)

@@ -44,28 +44,19 @@ def api_wrapper(template=None):

app = flask.Flask('oyster')


@app.route('/')
@api_wrapper('index.html')
def index():
    status = {
        'tracking': kernel.db.tracked.count(),
        'need_update': kernel.get_update_queue_size(),
        'logs': list(kernel.db.logs.find().sort('$natural', -1).limit(20)),
        'logs': list(kernel.db.logs.find().sort('$natural', -1).limit(100)),
        'mongo_host': settings.MONGO_HOST,
    }
    return status


@app.route('/status/')
@api_wrapper()
def doc_list():
    status = {
        'tracking': kernel.db.tracked.count(),
        'need_update': kernel.get_update_queue_size(),
    }
    return status


@app.route('/log/')
@api_wrapper('logs.html')
def log_view():

@@ -85,23 +76,11 @@ def tracked():
    return json.dumps(tracked, cls=JSONEncoder)


@app.route('/tracked/<path:url>')
def tracked_view(url):
    url = _path_fixer(url)
    doc = kernel.db.tracked.find_one({'url': url})
@app.route('/tracked/<id>')
def tracked_view(id):
    doc = kernel.db.tracked.find_one({'_id': id})
    return json.dumps(doc, cls=JSONEncoder)


@app.route('/doc/<path:url>/<version>')
def show_doc(url, version):
    url = _path_fixer(url)
    if version == 'latest':
        version = -1
    doc = kernel.get_version(url, version)
    resp = flask.make_response(doc.read())
    resp.headers['content-type'] = doc.content_type
    return resp


if __name__ == '__main__':
    app.run(debug=True)
```

```
@@ -2,4 +2,4 @@ scrapelib
pymongo>=2.0
flask
nose
celery
celery==2.5.3
```

`setup.py` (18 lines changed)

```
@@ -1,11 +1,21 @@
#!/usr/bin/env python

import os
from setuptools import setup
from oyster import __version__

# Hack to prevent stupid "TypeError: 'NoneType' object is not callable" error
# in multiprocessing/util.py _exit_function when running `python
# setup.py test` (see
# http://www.eby-sarna.com/pipermail/peak/2010-May/003357.html)
try:
    import multiprocessing
except ImportError:
    pass

long_description = open('README.rst').read()

setup(name="oyster",
      version=__version__,
      version='0.4.0-dev',
      py_modules=['oyster'],
      author="James Turk",
      author_email='jturk@sunlightfoundation.com',

@@ -21,8 +31,10 @@ setup(name="oyster",
          "Operating System :: OS Independent",
          "Programming Language :: Python",
      ],
      install_requires=["httplib2 >= 0.6.0", "scrapelib >= 0.5.4",
      install_requires=["httplib2 >= 0.6.0", "scrapelib >= 0.7.2",
                        "pymongo >= 1.11", "flask", "celery"],
      tests_require=["nose"],
      test_suite='nose.collector',
      entry_points="""
      [console_scripts]
      scrapeshell = scrapelib:scrapeshell
```

`tox.ini` (new file, 10 lines)

```
@@ -0,0 +1,10 @@
# Tox (http://codespeak.net/~hpk/tox/) is a tool for running tests
# in multiple virtualenvs. This configuration file will run the
# test suite on all supported python versions. To use it, "pip install tox"
# and then run "tox" from this directory.

[tox]
envlist = py26, py27, pypy

[testenv]
commands = python setup.py test
```