Compare commits
No commits in common. "master" and "0.2.0" have entirely different histories.
.gitignore (vendored): 3 changed lines

@@ -1,5 +1,2 @@
 celerybeat-schedule
-oyster_settings.py
-oyster.egg-info/
 *.pyc
-.tox

.travis.yml: 10 lines, only on master

language: python
python:
  - "2.6"
  - "2.7"
install: pip install scrapelib pymongo nose celery --use-mirrors
script: nosetests
services: mongodb
notifications:
  email:
    - jturk@sunlightfoundation.com

README.rst: 16 changed lines

@@ -1,31 +1,27 @@
-**DEPRECATED** - this project is abandoned & will not be seeing future updates
-
 ======
 oyster
 ======

 oyster is a service for tracking regularly-accessed pages, a sort of proactive cache.

-Oyster intends to provide a command line client for interacting with the list of tracked documents and web frontend for viewing the status and retrieving data. Behind the scenes it uses a celery queue to manage the documents it is tasked with keeping up to date.
+It features a daemon, a command line client for interacting with the tracking list, and a web frontend for viewing the status.

 oyster was created by James Turk for `Sunlight Labs <http://sunlightlabs.com>`_.

 Source is available via `GitHub <http://github.com/sunlightlabs/oyster/>`_

+## ADD PyPI link after release

 Installation
 ============

-oyster is available on PyPI: `oyster <http://pypi.python.org/pypi/oyster>`_.
-
-The recommended way to install oyster is to simply ``pip install oyster``

 Requirements
 ------------

 * python 2.7
-* mongodb 2.0
-* pymongo 2.0
-* scrapelib 0.5+
+* mongodb 1.8
+* pymongo 1.11
+* scrapelib 0.5.5

 Usage
 =====

changelog: 59 lines, only on master

oyster changelog
================

0.4.0-dev
---------
* S3 storage backend bugfix
* lots of improvements to signal script
* oyster.ext cloudsearch, elasticsearch, and superfastmatch
* use python logging w/ mongo handler
* add tox/python setup.py test (thanks Marc Abramowitz!)

0.3.2
-----
**2012-03-29**
* become much more tolerant of duplicates
* skip S3 test if not prepared
* use doc_class AWS_PREFIX and AWS_BUCKET if set
* add DEFAULT_STORAGE_ENGINE setting

0.3.1
-----
**2012-03-10**
* add validation of doc_class
* add ability to do one-time updates
* change how hooks work
* introduce concept of scripts
* call pymongo's end_request in long running threads
* better error messages for duplicate URLs
* lots of flake8-inspired fixes
* S3 backend: add support for AWS_PREFIX

0.3.0
-----
**2012-02-21**
* switch Connection to Kernel
* add concept of doc_class
* make storage pluggable instead of GridFS
* add S3 backend
* add Dummy backend
* delete obsolete ExternalStoreTask
* addition of onchanged hook
* allow id to be set manually

0.2.5
-----
**2011-10-06**
* lots of fixes to web frontend
* ExternalStoreTask

0.2.0
-----
**2011-09-20**
* major refactor: oysterd replaced by celery
* fix retries

0.1.0
-----
**2011-08-05**
* initial release, basic document tracking

design.txt: 34 lines, only on master

Oyster is designed as a 'proactive document cache', meaning that it takes URLs
and keeps a local copy up to date depending on user-specified criteria.

Data Model
==========

tracked - metadata for tracked resources
    _id : internal id
    _random : a random integer used for sorting
    url : url of resource
    doc_class : string indicating the document class, allows for different
                settings/hooks for some documents
    metadata : dictionary of extra user-specified attributes
    versions : list of dictionaries with the following keys:
        timestamp : UTC timestamp
        <storage_key> : storage_id
            (may be s3_url, gridfs_id, etc.)

logs - capped log collection
    action : log entry
    url : url that action was related to
    error : boolean error flag
    timestamp : UTC timestamp for log entry

status - internal state
    update_queue : size of update queue


Storage Interface
=================
    storage_key : key to store on versions
    put(tracked_doc, data, content_type) -> id
    get(id) -> file type object

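To make the data model above concrete, here is a short illustrative sketch (not part of the repository) of one entry in the `tracked` collection shaped the way design.txt describes; every value is invented, and the storage_key shown happens to be an S3 URL but could equally be a GridFS id, depending on which storage engine produced it.

import datetime

# Hypothetical example of a single tracked document.
example_tracked_doc = {
    '_id': 'example-internal-id',           # internal id (may be set manually)
    '_random': 123456789,                   # random integer used for sorting
    'url': 'http://example.com/report.pdf',
    'doc_class': 'default',                 # selects settings/hooks for the doc
    'metadata': {'source': 'example'},      # extra user-specified attributes
    'versions': [
        {'timestamp': datetime.datetime(2012, 3, 29, 12, 0, 0),   # UTC
         'storage_key': 'http://example-bucket.s3.amazonaws.com/example-internal-id',
         'storage_type': 's3'},
    ],
}
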
oyster/__init__.py

@@ -1,4 +1 @@
-__version__ = "0.4.0-dev"
-
-import os
-os.environ['CELERY_CONFIG_MODULE'] = 'oyster.celeryconfig'
+__version__ = "0.2.0"

oyster/celeryconfig.py

@@ -1,3 +1,11 @@
 from oyster.conf import settings

-CELERY_IMPORTS = ['oyster.tasks'] + list(settings.CELERY_TASK_MODULES)
+CELERY_IMPORTS = ("oyster.tasks",)
+
+BROKER_TRANSPORT = 'mongodb'
+CELERY_RESULT_BACKEND = 'mongodb'
+
+CELERY_MONGODB_BACKEND_SETTINGS = {
+    'host': settings.MONGO_HOST,
+    'port': settings.MONGO_PORT,
+}

oyster/client.py: 191 lines, only in 0.2.0

import datetime
import hashlib
import random
import sys
import urllib

import pymongo
import gridfs
import scrapelib


def get_configured_client():
    """ helper factory, gets a client configured with oyster.conf.settings """
    from oyster.conf import settings
    return Client(mongo_host=settings.MONGO_HOST,
                  mongo_port=settings.MONGO_PORT,
                  mongo_db=settings.MONGO_DATABASE,
                  mongo_log_maxsize=settings.MONGO_LOG_MAXSIZE,
                  user_agent=settings.USER_AGENT,
                  rpm=settings.REQUESTS_PER_MINUTE,
                  timeout=settings.REQUEST_TIMEOUT,
                  retry_attempts=settings.RETRY_ATTEMPTS,
                  retry_wait_minutes=settings.RETRY_WAIT_MINUTES)


class Client(object):
    """ oyster's workhorse, handles tracking """

    def __init__(self, mongo_host='localhost', mongo_port=27017,
                 mongo_db='oyster', mongo_log_maxsize=100000000,
                 user_agent='oyster', rpm=600, timeout=None,
                 retry_attempts=100, retry_wait_minutes=60):

        # set up a capped log if it doesn't exist
        self.db = pymongo.Connection(mongo_host, mongo_port)[mongo_db]
        try:
            self.db.create_collection('logs', capped=True,
                                      size=mongo_log_maxsize)
        except pymongo.errors.CollectionInvalid:
            pass

        # create status document if it doesn't exist
        if self.db.status.count() == 0:
            self.db.status.insert({'update_queue': 0})

        self._collection_name = 'fs'
        self.fs = gridfs.GridFS(self.db, self._collection_name)
        self.scraper = scrapelib.Scraper(user_agent=user_agent,
                                         requests_per_minute=rpm,
                                         follow_robots=False,
                                         raise_errors=True,
                                         timeout=timeout)

        self.retry_attempts = retry_attempts
        self.retry_wait_minutes = retry_wait_minutes

    def _wipe(self):
        """ exists primarily for debug use, wipes entire db """
        self.db.drop_collection('tracked')
        self.db.drop_collection('%s.chunks' % self._collection_name)
        self.db.drop_collection('%s.files' % self._collection_name)
        self.db.drop_collection('logs')
        self.db.drop_collection('status')

    def log(self, action, url, error=False, **kwargs):
        """ add an entry to the oyster log """
        kwargs['action'] = action
        kwargs['url'] = url
        kwargs['error'] = error
        kwargs['timestamp'] = datetime.datetime.utcnow()
        self.db.logs.insert(kwargs)

    def track_url(self, url, versioning='md5', update_mins=60*24,
                  **kwargs):
        """
        Add a URL to the set of tracked URLs, accessible via a given filename.

        url
            URL to start tracking
        """
        if self.db.tracked.find_one({'url': url}):
            self.log('track', url=url, error='already tracked')
            raise ValueError('%s is already tracked' % url)

        self.log('track', url=url)
        self.db.tracked.insert(dict(url=url, versioning=versioning,
                                    update_mins=update_mins,
                                    _random=random.randint(0, sys.maxint),
                                    metadata=kwargs))

    def md5_versioning(self, doc, data):
        """ return True if md5 changed or if file is new """
        try:
            old_md5 = self.fs.get_last_version(filename=doc['url']).md5
            new_md5 = hashlib.md5(data).hexdigest()
            return (old_md5 != new_md5)
        except gridfs.NoFile:
            return True

    def update(self, doc):
        do_put = True
        error = False

        # update strategies could be implemented here as well
        try:
            url = doc['url'].replace(' ', '%20')
            data = self.scraper.urlopen(url)
            content_type = data.response.headers['content-type']
        except Exception as e:
            do_put = False
            error = str(e)

        # versioning is a concept for future use, but here's how it can work:
        # versioning functions take doc & data, and return True if data is
        # different, since they have access to doc, they can also modify
        # certain attributes as needed

        if do_put:
            if doc['versioning'] == 'md5':
                do_put = self.md5_versioning(doc, data)
            else:
                raise ValueError('unknown versioning strategy "%s"' %
                                 doc['versioning'])

        if do_put:
            self.fs.put(data, filename=doc['url'], content_type=content_type,
                        **doc['metadata'])

        if error:
            c_errors = doc.get('consecutive_errors', 0)
            doc['consecutive_errors'] = c_errors + 1
            if c_errors <= self.retry_attempts:
                update_mins = self.retry_wait_minutes * (2**c_errors)
            else:
                update_mins = doc['update_mins']
        else:
            doc['consecutive_errors'] = 0
            update_mins = doc['update_mins']

        # last_update/next_update are separate from question of versioning
        doc['last_update'] = datetime.datetime.utcnow()
        doc['next_update'] = (doc['last_update'] +
                              datetime.timedelta(minutes=update_mins))

        self.log('update', url=url, new_doc=do_put, error=error)

        self.db.tracked.save(doc, safe=True)

    def get_all_versions(self, url):
        versions = []
        n = 0
        while True:
            try:
                versions.append(self.fs.get_version(url, n))
                n += 1
            except gridfs.NoFile:
                break
        return versions

    def get_version(self, url, n=-1):
        return self.fs.get_version(url, n)

    def get_update_queue(self):
        # results are always sorted by random to avoid piling on single server

        # first we try to update anything that we've never retrieved
        new = self.db.tracked.find({'next_update':
                                    {'$exists': False}}).sort('_random')
        queue = list(new)

        # pull the rest from those for which next_update is in the past
        next = self.db.tracked.find({'next_update':
                                     {'$lt': datetime.datetime.utcnow()}}).sort('_random')
        queue.extend(next)

        return queue

    def get_update_queue_size(self):
        new = self.db.tracked.find({'next_update': {'$exists': False}}).count()
        next = self.db.tracked.find({'next_update':
                                     {'$lt': datetime.datetime.utcnow()}}).count()
        return new+next

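Since the compare shows the whole 0.2.0 client, a brief usage sketch may help; it is illustrative only (not part of the repository) and assumes a MongoDB instance on localhost plus outbound network access.

from oyster.client import Client

# Illustrative: a throwaway database name; any running MongoDB will do.
client = Client(mongo_db='oyster_example')

# Start tracking a URL; extra keyword args land in the document's metadata.
client.track_url('http://example.com', update_mins=60 * 24, source='demo')

# Fetch anything new or stale; changed content is written to GridFS.
for doc in client.get_update_queue():
    client.update(doc)

# Read back the most recently stored version.
latest = client.get_version('http://example.com')
print latest.read()
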
oyster/conf/__init__.py

@@ -1,6 +1,5 @@
 from oyster.conf import default_settings

-
 class Settings(object):
     def __init__(self):
         pass

oyster/conf/default_settings.py

@@ -4,18 +4,11 @@ MONGO_PORT = 27017
 MONGO_DATABASE = 'oyster'
 MONGO_LOG_MAXSIZE = 100000000

-# extra celery modules
-CELERY_TASK_MODULES = []
-
 # scrapelib
 USER_AGENT = 'oyster'
-REQUESTS_PER_MINUTE = 60
+REQUESTS_PER_MINUTE = 300
 REQUEST_TIMEOUT = 300

 # other
 RETRY_ATTEMPTS = 3
 RETRY_WAIT_MINUTES = 60
-
-DOCUMENT_CLASSES = {}
-
-DEFAULT_STORAGE_ENGINE = 'dummy'

oyster/core.py: 285 lines, only on master

import datetime
import logging
import hashlib
import random
import sys

import pymongo
import scrapelib

from .mongolog import MongoHandler
from .storage import engines
from celery.execute import send_task


class Kernel(object):
    """ oyster's workhorse, handles tracking """

    def __init__(self, mongo_host='localhost', mongo_port=27017,
                 mongo_db='oyster', mongo_log_maxsize=100000000,
                 user_agent='oyster', rpm=60, timeout=300,
                 retry_attempts=3, retry_wait_minutes=60,
                 doc_classes=None, default_storage_engine='dummy',
                ):
        """
        configurable for ease of testing, only one should be instantiated
        """

        # set up the log
        self.db = pymongo.Connection(mongo_host, mongo_port)[mongo_db]

        self.log = logging.getLogger('oyster')
        self.log.setLevel(logging.DEBUG)
        self.log.addHandler(MongoHandler(mongo_db, host=mongo_host,
                                         port=mongo_port,
                                         capped_size=mongo_log_maxsize))

        # create status document if it doesn't exist
        if self.db.status.count() == 0:
            self.db.status.insert({'update_queue': 0})

        # ensure an index on _random
        self.db.tracked.ensure_index('_random')
        self.db.tracked.ensure_index('url')

        self.scraper = scrapelib.Scraper(user_agent=user_agent,
                                         requests_per_minute=rpm,
                                         follow_robots=False,
                                         raise_errors=True,
                                         timeout=timeout)

        self.retry_attempts = retry_attempts
        self.retry_wait_minutes = retry_wait_minutes

        # load engines
        self.storage = {}
        for name, StorageCls in engines.iteritems():
            self.storage[name] = StorageCls(self)

        # set document classes
        _doc_class_fields = ('update_mins', 'onchanged')
        self.doc_classes = doc_classes or {}
        for dc_name, dc_props in self.doc_classes.iteritems():
            for key in _doc_class_fields:
                if key not in dc_props:
                    raise ValueError('doc_class %s missing key %s' %
                                     (dc_name, key))
            # set a default storage engine
            if 'storage_engine' not in dc_props:
                dc_props['storage_engine'] = default_storage_engine

    def _wipe(self):
        """ exists primarily for debug use, wipes entire db """
        self.db.drop_collection('tracked')
        self.db.drop_collection('logs')
        self.db.drop_collection('status')

    def _add_doc_class(self, doc_class, **properties):
        self.doc_classes[doc_class] = properties

    def track_url(self, url, doc_class, id=None, **kwargs):
        """
        Add a URL to the set of tracked URLs, accessible via a given filename.

        url
            URL to start tracking
        doc_class
            document type, can be any arbitrary string
        **kwargs
            any keyword args will be added to the document's metadata
        """
        if doc_class not in self.doc_classes:
            error = 'error tracking %s, unregistered doc_class %s'
            self.log.error(error, url, doc_class)
            raise ValueError(error % (url, doc_class))

        # try and find an existing version of this document
        tracked = None

        if id:
            tracked = self.db.tracked.find_one({'_id': id})
        else:
            tracked = self.db.tracked.find_one({'url': url})

        # if id exists, ensure that URL and doc_class haven't changed
        # then return existing data (possibly with refreshed metadata)
        if tracked:
            if (tracked['url'] == url and
                tracked['doc_class'] == doc_class):
                if kwargs != tracked['metadata']:
                    tracked['metadata'] = kwargs
                    self.db.tracked.save(tracked, safe=True)
                return tracked['_id']
            else:
                # id existed but with different URL
                message = ('%s already exists with different data (tracked: '
                           '%s, %s) (new: %s, %s)')
                args = (tracked['_id'], tracked['url'], tracked['doc_class'],
                        url, doc_class)
                self.log.error(message, *args)
                raise ValueError(message % args)

        self.log.info('tracked %s [%s]', url, id)

        newdoc = dict(url=url, doc_class=doc_class,
                      _random=random.randint(0, sys.maxint),
                      versions=[], metadata=kwargs)
        if id:
            newdoc['_id'] = id
        return self.db.tracked.insert(newdoc, safe=True)

    def md5_versioning(self, olddata, newdata):
        """ return True if md5 changed or if file is new """
        old_md5 = hashlib.md5(olddata).hexdigest()
        new_md5 = hashlib.md5(newdata).hexdigest()
        return old_md5 != new_md5

    def update(self, doc):
        """
        perform update upon a given document

        :param:`doc` must be a document from the `tracked` collection

        * download latest document
        * check if document has changed using versioning func
        * if a change has occurred save the file
        * if error occured, log & keep track of how many errors in a row
        * update last_update/next_update timestamp
        """

        new_version = True
        error = False
        now = datetime.datetime.utcnow()

        try:
            doc_class = self.doc_classes[doc['doc_class']]
        except KeyError:
            raise ValueError('unregistered doc_class %s' % doc['doc_class'])

        update_mins = doc_class['update_mins']
        storage = self.storage[doc_class['storage_engine']]

        # fetch strategies could be implemented here as well
        try:
            url = doc['url'].replace(' ', '%20')
            newdata = self.scraper.urlopen(url)
            content_type = newdata.response.headers['content-type']
        except Exception as e:
            new_version = False
            error = str(e)

        # only do versioning check if at least one version exists
        if new_version and doc['versions']:
            # room here for different versioning schemes
            olddata = storage.get(doc['versions'][-1]['storage_key'])
            new_version = self.md5_versioning(olddata, newdata)

        if new_version:
            storage_id = storage.put(doc, newdata, content_type)
            doc['versions'].append({'timestamp': now,
                                    'storage_key': storage_id,
                                    'storage_type': storage.storage_type,
                                   })
            # fire off onchanged functions
            for onchanged in doc_class.get('onchanged', []):
                send_task(onchanged, (doc['_id'],))

        if error:
            # if there's been an error, increment the consecutive_errors count
            # and back off a bit until we've reached our retry limit
            c_errors = doc.get('consecutive_errors', 0)
            doc['consecutive_errors'] = c_errors + 1
            if c_errors <= self.retry_attempts:
                update_mins = self.retry_wait_minutes * (2 ** c_errors)
        else:
            # reset error count if all was ok
            doc['consecutive_errors'] = 0

        # last_update/next_update are separate from question of versioning
        doc['last_update'] = now
        if update_mins:
            doc['next_update'] = now + datetime.timedelta(minutes=update_mins)
        else:
            doc['next_update'] = None

        if error:
            self.log.warning('error updating %s [%s]', url, doc['_id'])
        else:
            new_version = ' (new)'
            self.log.info('updated %s [%s]%s', url, doc['_id'], new_version)

        self.db.tracked.save(doc, safe=True)

    def get_update_queue(self):
        """
        Get a list of what needs to be updated.

        Documents that have never been updated take priority, followed by
        documents that are simply stale. Within these two categories results
        are sorted in semirandom order to decrease odds of piling on one
        server.
        """
        # results are always sorted by random to avoid piling on single server

        # first we try to update anything that we've never retrieved
        new = self.db.tracked.find({'next_update':
                                    {'$exists': False}}).sort('_random')
        queue = list(new)

        # pull the rest from those for which next_update is in the past
        next = self.db.tracked.find({'$and': [
            {'next_update': {'$ne': None}},
            {'next_update': {'$lt': datetime.datetime.utcnow()}},
        ]}).sort('_random')
        queue.extend(next)

        return queue

    def get_update_queue_size(self):
        """
        Get the size of the update queue, this should match
        ``len(self.get_update_queue())``, but is computed more efficiently.
        """
        new = self.db.tracked.find({'next_update': {'$exists': False}}).count()
        next = self.db.tracked.find({'$and': [
            {'next_update': {'$ne': None}},
            {'next_update': {'$lt': datetime.datetime.utcnow()}},
        ]}).count()
        return new + next

    def get_last_version(self, doc):
        try:
            doc_class = self.doc_classes[doc['doc_class']]
        except KeyError:
            raise ValueError('unregistered doc_class %s' % doc['doc_class'])
        storage = self.storage[doc_class['storage_engine']]
        return storage.get(doc['versions'][-1]['storage_key'])

    def extract_text(self, doc):
        version = self.get_last_version(doc)
        doc_class = self.doc_classes[doc['doc_class']]
        try:
            extract_text = doc_class['extract_text']
        except KeyError:
            raise ValueError('doc_class %s missing extract_text' %
                             doc['doc_class'])
        return extract_text(doc, version)


def _get_configured_kernel():
    """ factory, gets a connection configured with oyster.conf.settings """
    from oyster.conf import settings
    return Kernel(mongo_host=settings.MONGO_HOST,
                  mongo_port=settings.MONGO_PORT,
                  mongo_db=settings.MONGO_DATABASE,
                  mongo_log_maxsize=settings.MONGO_LOG_MAXSIZE,
                  user_agent=settings.USER_AGENT,
                  rpm=settings.REQUESTS_PER_MINUTE,
                  timeout=settings.REQUEST_TIMEOUT,
                  retry_attempts=settings.RETRY_ATTEMPTS,
                  retry_wait_minutes=settings.RETRY_WAIT_MINUTES,
                  doc_classes=settings.DOCUMENT_CLASSES,
                  default_storage_engine=settings.DEFAULT_STORAGE_ENGINE,
                 )

kernel = _get_configured_kernel()

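For reference, a hedged sketch of settings that would pass the Kernel's doc_class validation above: every doc_class needs update_mins and onchanged, storage_engine falls back to DEFAULT_STORAGE_ENGINE, and onchanged entries are celery task names dispatched via send_task. The class names and values here are invented; only the keys come from the code above (and the file name is a guess suggested by the oyster_settings.py entry in master's .gitignore).

# Hypothetical oyster_settings.py for the master branch.
DEFAULT_STORAGE_ENGINE = 's3'

DOCUMENT_CLASSES = {
    'press-release': {
        'update_mins': 60 * 24,                    # re-fetch roughly daily
        'onchanged': ['myproject.tasks.Reindex'],  # celery task names fired on change
        'storage_engine': 'gridfs',                # per-class override
    },
    'one-off': {
        'update_mins': None,                       # fetched once, never re-queued
        'onchanged': [],
    },
}
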
oyster/ext/cloudsearch.py: 32 lines, only on master

# needed so we can import cloudsearch
from __future__ import absolute_import

from celery.task.base import Task

from ..core import kernel
from ..conf import settings

from cloudsearch import CloudSearch

cs = CloudSearch(settings.CLOUDSEARCH_DOMAIN, settings.CLOUDSEARCH_ID, 20)


class CloudSearchPush(Task):
    """ task that updates documents """
    # results go straight to database
    ignore_result = True

    # a bit under 1MB
    MAX_BYTES = 1048000

    def run(self, doc_id):
        doc = kernel.db.tracked.find_one({'_id': doc_id})
        text = kernel.extract_text(doc)
        pieces = [text[i:i+self.MAX_BYTES] for i in
                  xrange(0, len(text), self.MAX_BYTES)]

        self.get_logger().debug('adding {0} pieces for {1}'.format(
            len(pieces), doc_id))
        for i, piece in enumerate(pieces):
            cloud_id = '%s_%s' % (doc_id.lower(), i)
            cs.add_document(cloud_id, text=piece, **doc['metadata'])

oyster/ext/elasticsearch.py: 36 lines, only on master

import logging
from celery.task.base import Task

from ..core import kernel
from ..conf import settings

from pyes import ES

es = ES(settings.ELASTICSEARCH_HOST)
log = logging.getLogger('oyster.ext.elasticsearch')


class ElasticSearchPush(Task):
    # results go straight to elasticsearch
    ignore_result = True

    def run(self, doc_id):
        doc = kernel.db.tracked.find_one({'_id': doc_id})

        try:
            text = kernel.extract_text(doc)
            if not text:
                log.info('no text for %s', doc_id,
                         extra={'doc_class':doc['doc_class']})
                return

            log.info('tracked %s', doc_id,
                     extra={'doc_class':doc['doc_class']})

            es.index(dict(doc['metadata'], text=text),
                     settings.ELASTICSEARCH_INDEX,
                     settings.ELASTICSEARCH_DOC_TYPE,
                     id=doc_id)
        except Exception as e:
            log.warning('error tracking %s', doc_id,
                        extra={'doc_class':doc['doc_class']}, exc_info=True)
            raise

oyster/ext/superfastmatch.py: 23 lines, only on master

# needed so we can import superfastmatch.Client
from __future__ import absolute_import
from celery.task.base import Task

from ..core import kernel
from ..conf import settings

from superfastmatch import Client

sfm = Client(settings.SUPERFASTMATCH_URL)


class SuperFastMatchPush(Task):
    """ task that pushes documents to SFM """

    # results go straight to database
    ignore_result = True

    def run(self, doc_id):
        doc = kernel.db.tracked.find_one({'_id': doc_id})
        text = kernel.extract_text(doc)
        doctype, docid = settings.SUPERFASTMATCH_ID_FUNC(doc_id)
        sfm.add(doctype, docid, text, defer=True, **doc['metadata'])

oyster/mongolog.py: 54 lines, only on master

"""
    MongoDB handler for Python Logging

    inspired by https://github.com/andreisavu/mongodb-log
"""

import logging
import datetime
import socket
import pymongo


class MongoFormatter(logging.Formatter):

    def format(self, record):
        """ turn a LogRecord into something mongo can store """
        data = record.__dict__.copy()

        data.update(
            # format message
            message=record.getMessage(),
            # overwrite created (float) w/ a mongo-compatible datetime
            created=datetime.datetime.utcnow(),
            host=socket.gethostname(),
            args=tuple(unicode(arg) for arg in record.args)
        )
        data.pop('msecs')   # not needed, stored in created

        # TODO: ensure everything in 'extra' is MongoDB-ready
        exc_info = data.get('exc_info')
        if exc_info:
            data['exc_info'] = self.formatException(exc_info)
        return data


class MongoHandler(logging.Handler):
    def __init__(self, db, collection='logs', host='localhost', port=None,
                 capped_size=100000000, level=logging.NOTSET, async=True):
        db = pymongo.connection.Connection(host, port)[db]
        # try and create the capped log collection
        if capped_size:
            try:
                db.create_collection(collection, capped=True, size=capped_size)
            except pymongo.errors.CollectionInvalid:
                pass
        self.collection = db[collection]
        self.async = async
        logging.Handler.__init__(self, level)
        self.formatter = MongoFormatter()

    def emit(self, record):
        # explicitly set safe=False to get async insert
        # TODO: what to do if an error occurs? not safe to log-- ignore?
        self.collection.save(self.format(record), safe=not self.async)

signal script: 53 lines, only on master

#!/usr/bin/env python
import argparse
import traceback
import random
from celery.execute import send_task
from celery import current_app

from oyster.core import kernel


def main():
    parser = argparse.ArgumentParser(
        description='do a task for all documents in a doc_class',
    )

    parser.add_argument('task', type=str, help='task name to apply')
    parser.add_argument('doc_class', type=str,
                        help='doc_class to apply function to')
    parser.add_argument('--sample', action='store_true')
    parser.add_argument('--immediate', action='store_true')

    args = parser.parse_args()

    docs = kernel.db.tracked.find({'doc_class': args.doc_class,
                                   'versions': {'$ne': []}
                                  }, timeout=False)
    total = docs.count()
    print '{0} docs in {1}'.format(total, args.doc_class)

    if args.sample:
        limit = 100
        print 'sampling {0} documents'.format(limit)
        docs = docs.limit(limit).skip(random.randint(0, total-limit))
        args.immediate = True

    errors = 0

    if args.immediate:
        module, name = args.task.rsplit('.', 1)
        task = getattr(__import__(module, fromlist=[name]), name)
        for doc in docs:
            try:
                task.apply((doc['_id'],), throw=True)
            except Exception:
                errors += 1
                traceback.print_exc()
        print '{0} errors in {1} documents'.format(errors, total)

    else:
        for doc in docs:
            send_task(args.task, (doc['_id'], ))

if __name__ == '__main__':
    main()

oyster/storage/__init__.py: 19 lines, only on master

engines = {}
try:
    from .dummy import DummyStorage
    engines['dummy'] = DummyStorage
except ImportError:
    pass

try:
    from .s3 import S3Storage
    engines['s3'] = S3Storage
except ImportError:
    pass

try:
    from .gridfs import GridFSStorage
    engines['gridfs'] = GridFSStorage
except ImportError:
    pass

oyster/storage/dummy.py: 15 lines, only on master

class DummyStorage(object):
    """ should NOT be used outside of testing """

    storage_type = 'dummy'

    def __init__(self, kernel):
        self._storage = {}

    def put(self, tracked_doc, data, content_type):
        """ store the document in local dict """
        self._storage[tracked_doc['_id']] = data
        return tracked_doc['_id']

    def get(self, id):
        return self._storage[id]

oyster/storage/gridfs.py: 20 lines, only on master

from __future__ import absolute_import

import gridfs


class GridFSStorage(object):
    storage_type = 'gridfs'

    def __init__(self, kernel):
        self.db = kernel.db
        self._collection_name = 'fs'
        self.fs = gridfs.GridFS(self.db, self._collection_name)

    def put(self, tracked_doc, data, content_type):
        return self.fs.put(data, filename=tracked_doc['url'],
                           content_type=content_type,
                           **tracked_doc['metadata'])

    def get(self, id):
        return self.fs.get(id).read()

oyster/storage/s3.py: 42 lines, only on master

import urllib
import boto
from oyster.conf import settings


class S3Storage(object):
    storage_type = 's3'

    def __init__(self, kernel):
        self.kernel = kernel
        self.s3conn = boto.connect_s3(settings.AWS_KEY, settings.AWS_SECRET)
        self._bucket = False

    @property
    def bucket(self):
        if not self._bucket:
            self._bucket = self.s3conn.get_bucket(settings.AWS_BUCKET)
        return self._bucket

    def _get_opt(self, doc_class, setting, default=None):
        """ doc_class first, then setting, then default """
        return self.kernel.doc_classes[doc_class].get(setting,
                getattr(settings, setting, default))

    def put(self, tracked_doc, data, content_type):
        """ upload the document to S3 """
        aws_prefix = self._get_opt(tracked_doc['doc_class'], 'AWS_PREFIX', '')
        aws_bucket = self._get_opt(tracked_doc['doc_class'], 'AWS_BUCKET')
        k = boto.s3.key.Key(self.bucket)
        key_name = aws_prefix + tracked_doc['_id']
        k.key = key_name
        headers = {'x-amz-acl': 'public-read',
                   'Content-Type': content_type}
        k.set_contents_from_string(data, headers=headers)
        # can also set metadata if we want, useful?

        url = 'http://%s.s3.amazonaws.com/%s' % (aws_bucket, key_name)
        return url

    def get(self, id):
        # could use get_contents_as_string, any advantages?
        return urllib.urlopen(id).read()

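The three engines above share an informal interface: a storage_type label, put(tracked_doc, data, content_type) returning a storage key, and get(key) returning the stored bytes. As a hedged illustration only (not part of the repository), a hypothetical engine writing to the local filesystem could look like the sketch below; wiring it in would mean registering it in the engines dict in oyster/storage/__init__.py alongside the existing ones.

import os


class LocalFileStorage(object):
    """ illustrative engine, follows the same put()/get() contract """

    storage_type = 'localfile'

    def __init__(self, kernel, directory='/tmp/oyster'):
        self.kernel = kernel
        self.directory = directory
        if not os.path.isdir(directory):
            os.makedirs(directory)

    def put(self, tracked_doc, data, content_type):
        # write the fetched bytes to disk, keyed by the tracked doc's id;
        # the returned path becomes the version's storage_key
        path = os.path.join(self.directory, str(tracked_doc['_id']))
        with open(path, 'wb') as f:
            f.write(data)
        return path

    def get(self, id):
        return open(id, 'rb').read()
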
oyster/tasks.py

@@ -1,7 +1,7 @@
 from celery.task.base import Task, PeriodicTask
 from celery.execute import send_task

-from oyster.core import kernel
+from oyster.client import get_configured_client


 class UpdateTask(Task):
@@ -9,12 +9,17 @@ class UpdateTask(Task):
     # results go straight to database
     ignore_result = True

+    def __init__(self):
+        # one client per process
+        self.client = get_configured_client()
+
+
     def run(self, doc_id):
-        doc = kernel.db.tracked.find_one({'_id': doc_id})
-        kernel.db.status.update({}, {'$inc': {'update_queue': -1}})
-        kernel.update(doc)
-        # don't sit on a connection
-        kernel.db.connection.end_request()
+        doc = self.client.db.tracked.find_one({'_id': doc_id})
+        self.client.update(doc)
+        for hook in doc.get('post_update_hooks', []):
+            send_task(hook, (doc_id,))
+        self.client.db.status.update({}, {'$inc': {'update_queue': -1}})


 class UpdateTaskScheduler(PeriodicTask):
@@ -22,26 +27,16 @@ class UpdateTaskScheduler(PeriodicTask):

     # 60s tick
     run_every = 60
-    ignore_result = True
+    client = get_configured_client()

     def run(self):
         # if the update queue isn't empty, wait to add more
         # (currently the only way we avoid duplicates)
         # alternate option would be to set a _queued flag on documents
-        update_queue_size = kernel.db.status.find_one()['update_queue']
-        if update_queue_size:
-            self.get_logger().debug('waiting, update_queue_size={0}'.format(
-                update_queue_size))
+        if self.client.db.status.find_one()['update_queue']:
             return

-        next_set = kernel.get_update_queue()
-        if next_set:
-            self.get_logger().debug('repopulating update_queue')
-        else:
-            self.get_logger().debug('kernel.update_queue empty')
-
+        next_set = self.client.get_update_queue()
         for doc in next_set:
             UpdateTask.delay(doc['_id'])
-            kernel.db.status.update({}, {'$inc': {'update_queue': 1}})
-            # don't sit on a connection
-            kernel.db.connection.end_request()
+            self.client.db.status.update({}, {'$inc': {'update_queue': 1}})

base template

@@ -2,10 +2,10 @@
 <html>
 <head>
 <title>{% block title %} {% endblock %} </title>
-<link rel="stylesheet" href="{{request.script_root}}/static/blueprint/screen.css" type="text/css" media="screen, projection">
-<link rel="stylesheet" href="{{request.script_root}}/static/blueprint/print.css" type="text/css" media="print">
+<link rel="stylesheet" href="/static/blueprint/screen.css" type="text/css" media="screen, projection">
+<link rel="stylesheet" href="/static/blueprint/print.css" type="text/css" media="print">
 <!--[if lt IE 8]>
-<link rel="stylesheet" href="{request.script_root}}/css/blueprint/ie.css" type="text/css" media="screen, projection">
+<link rel="stylesheet" href="css/blueprint/ie.css" type="text/css" media="screen, projection">
 <![endif]-->
 <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.6.2/jquery.min.js"></script>
 <style type="text/css">

index template

@@ -5,45 +5,13 @@ oyster
 {% endblock %}

 {% block body %}
-<script type="text/javascript">
-  var REFRESH = 2000;
-  var MAX_ROWS = 100;
-  setInterval(function() {
-    jQuery.getJSON('{{request.script_root}}/?json', function(data) {
-      jQuery('#tracking_val').text(data.tracking);
-      jQuery('#need_update_val').text(data.need_update);
-      var latest_link = jQuery('tr td a')[0]['href'].split('/tracked/')[1];
-      var new_rows = ''
-      for(var i=0; i < data.logs.length; ++i) {
-        if(latest_link == data.logs[i].url) {
-          break;
-        }
-        if(data.logs[i].error) {
-          new_rows += '<tr class="error">'
-        } else {
-          new_rows += '<tr>';
-        }
-        new_rows += '<td>' + data.logs[i].action + '</td>\n';
-        new_rows += '<td><a href="{{request.script_root}}/tracked/' + data.logs[i].url + '">' + data.logs[i].url + '</td>';
-        new_rows += '<td>' + data.logs[i].timestamp + '</td>';
-        if(data.logs[i].error) {
-          new_rows += '<td>' + data.logs[i].error + '</td></tr>';
-        } else {
-          new_rows += '<td></td></tr>';
-        }
-      }
-      jQuery('tr:first').after(new_rows);
-      jQuery('tr:gt(' + MAX_ROWS + ')').empty()
-    });
-  }, REFRESH);
-</script>
-
 <div class="span-4">
 <h2>Stats</h2>
 <dl>
-  <dt>Tracking</dt><dd id="tracking_val">{{tracking}}</dd>
-  <dt>Need Update</dt><dd id="need_update_val">{{need_update}}</dd>
-  <dt>Mongo Host</dt><dd>{{mongo_host}}</dd>
+  <dt>Queue Size</dt><dd>{{queue_size}}</dd>
+  <dt>Tracking</dt><dd>{{tracking}}</dd>
+  <dt>Need Update</dt><dd>{{need_update}}</dd>
 </dl>
 </div>

log table row template

@@ -1,6 +1,7 @@
-<tr class="{{log.levelname.lower}}">
-    <td>{{log.name}}</td>
-    <td>{{log.message}}</td>
-    <td>{{log.created.strftime("%Y-%m-%d %H:%M:%S")}}</td>
-    <td>{% if log.exc_info %}{{log.exc_info}}{% endif %}</td>
+<tr{% if log.error %} class="error" {% endif %}>
+    <td>{{log.action}}</td>
+    <td><a href="/tracked/{{log.url}}">{{log.url}}</td>
+    <td>{{log.timestamp.strftime("%Y-%m-%d %H:%M:%S")}}</td>
+    <td>{% if log.error %}{{log.error}}{% endif %}</td>
 </tr>

log listing template

@@ -8,14 +8,14 @@ Oyster Logs

 <div class="span-2">
 {% if offset %}
-  <a class="button" href="{{request.script_root}}/log/?offset={{prev_offset}}">« Prev</a>
+  <a class="button" href="/log/?offset={{prev_offset}}">« Prev</a>
 {% endif %}

 </div>

 <div class="span-2 prepend-14 last">
 {% if next_offset %}
-  <a class="button" href="{{request.script_root}}/log/?offset={{next_offset}}">Next »</a>
+  <a class="button" href="/log/?offset={{next_offset}}">Next »</a>
 {% endif %}
 </div>

oyster/tests/test_client.py: 196 lines, only in 0.2.0

import time
import datetime
from unittest import TestCase

from nose.tools import assert_raises
import pymongo

from oyster.client import Client


class ClientTests(TestCase):

    def setUp(self):
        self.client = Client(mongo_db='oyster_test', retry_wait_minutes=1/60.)
        self.client._wipe()

    def test_constructor(self):
        c = Client('127.0.0.1', 27017, 'testdb', mongo_log_maxsize=5000,
                   user_agent='test-ua', rpm=30, timeout=60,
                   retry_attempts=7, retry_wait_minutes=8)
        assert c.db.connection.host == '127.0.0.1'
        assert c.db.connection.port == 27017
        assert c.db.logs.options()['capped'] == True
        assert c.db.logs.options()['size'] == 5000
        assert c.retry_wait_minutes == 8
        # TODO: test retry_attempts
        assert c.scraper.user_agent == 'test-ua'
        assert c.scraper.requests_per_minute == 30
        assert c.scraper.timeout == 60

    def test_log(self):
        self.client.log('action1', 'http://example.com')
        self.client.log('action2', 'http://test.com', error=True, pi=3)
        assert self.client.db.logs.count() == 2
        x = self.client.db.logs.find_one({'error': True})
        assert x['action'] == 'action2'
        assert x['url'] == 'http://test.com'
        assert x['pi'] == 3

    def test_track_url(self):
        # basic insert
        self.client.track_url('http://example.com', update_mins=30, pi=3)
        obj = self.client.db.tracked.find_one()
        assert '_random' in obj
        assert obj['update_mins'] == 30
        assert obj['metadata'] == {'pi': 3}

        # logging
        log = self.client.db.logs.find_one()
        assert log['action'] == 'track'
        assert log['url'] == 'http://example.com'

        # can't track same URL twice
        assert_raises(ValueError, self.client.track_url, 'http://example.com')

        # logged error
        assert self.client.db.logs.find_one({'error': 'already tracked'})

    def test_md5_versioning(self):
        doc = {'url': 'hello.txt'}
        self.client.fs.put('hello!', filename='hello.txt')
        assert not self.client.md5_versioning(doc, 'hello!')
        assert self.client.md5_versioning(doc, 'hey!')

    def test_update(self):
        # get a single document tracked
        self.client.track_url('http://example.com', update_mins=60, pi=3)
        obj = self.client.db.tracked.find_one()
        self.client.update(obj)

        # check that metadata has been updated
        newobj = self.client.db.tracked.find_one()
        assert (newobj['last_update'] +
                datetime.timedelta(minutes=newobj['update_mins']) ==
                newobj['next_update'])
        first_update = newobj['last_update']
        assert newobj['consecutive_errors'] == 0

        # check that document exists in database
        doc = self.client.fs.get_last_version()
        assert doc.filename == 'http://example.com'
        assert doc.content_type.startswith('text/html')
        assert doc.pi == 3

        # check logs
        assert self.client.db.logs.find({'action': 'update'}).count() == 1

        # and do an update..
        self.client.update(obj)

        # hopefully example.com hasn't changed, this tests that md5 worked
        assert self.client.db.fs.files.count() == 1

        # check that appropriate metadata updated
        newobj = self.client.db.tracked.find_one()
        assert first_update < newobj['last_update']

        # check that logs updated
        assert self.client.db.logs.find({'action': 'update'}).count() == 2

    def test_update_failure(self):
        # track a non-existent URL
        self.client.track_url('http://not_a_url')
        obj = self.client.db.tracked.find_one()
        self.client.update(obj)

        obj = self.client.db.tracked.find_one()
        assert obj['consecutive_errors'] == 1

        # we should have logged an error too
        assert self.client.db.logs.find({'action': 'update',
                                         'error': {'$ne': False}}).count() == 1

        # update again
        self.client.update(obj)

        obj = self.client.db.tracked.find_one()
        assert obj['consecutive_errors'] == 2

    def test_all_versions(self):
        random_url = 'http://en.wikipedia.org/wiki/Special:Random'
        self.client.track_url(random_url)
        obj = self.client.db.tracked.find_one()
        self.client.update(obj)

        versions = self.client.get_all_versions(random_url)
        assert versions[0].filename == random_url

        self.client.update(obj)
        assert len(self.client.get_all_versions(random_url)) == 2

    def test_get_update_queue(self):
        self.client.track_url('never-updates', update_mins=0.01)
        self.client.track_url('fake-1', update_mins=0.01)
        self.client.track_url('fake-2', update_mins=0.01)
        self.client.track_url('fake-3', update_mins=0.01)

        never = self.client.db.tracked.find_one(dict(url='never-updates'))
        fake1 = self.client.db.tracked.find_one(dict(url='fake-1'))
        fake2 = self.client.db.tracked.find_one(dict(url='fake-2'))
        fake3 = self.client.db.tracked.find_one(dict(url='fake-3'))

        # 4 in queue, ordered by random
        queue = self.client.get_update_queue()
        assert len(queue) == 4
        assert queue[0]['_random'] < queue[1]['_random'] < queue[2]['_random']

        # update a few
        self.client.update(fake1)
        self.client.update(fake2)
        self.client.update(fake3)

        # queue should only have never in it
        queue = self.client.get_update_queue()
        assert len(queue) == 1

        # wait for time to pass
        time.sleep(1)

        # queue should be full, but start with the un-updated one
        queue = self.client.get_update_queue()
        assert len(queue) == 4
        assert queue[0]['_id'] == never['_id']

    def test_get_update_queue_size(self):
        self.client.track_url('a', update_mins=0.01)
        self.client.track_url('b', update_mins=0.01)
        self.client.track_url('c', update_mins=0.01)

        a = self.client.db.tracked.find_one(dict(url='a'))
        b = self.client.db.tracked.find_one(dict(url='b'))
        c = self.client.db.tracked.find_one(dict(url='c'))

        assert self.client.get_update_queue_size() == 3

        self.client.update(a)

        assert self.client.get_update_queue_size() == 2

        self.client.update(b)
        self.client.update(c)

        assert self.client.get_update_queue_size() == 0

        # wait
        time.sleep(1)

        assert self.client.get_update_queue_size() == 3

oyster/tests/test_kernel.py: 212 lines, only on master (listing below is truncated)

import time
import datetime
from unittest import TestCase

from nose.tools import assert_raises, assert_equal

from oyster.core import Kernel


def hook_fired(doc, newdata):
    doc['hook_fired'] = doc.get('hook_fired', 0) + 1

RANDOM_URL = ('http://www.random.org/integers/?num=1&min=-1000000000&'
              'max=1000000000&col=1&base=10&format=plain&rnd=new')


class KernelTests(TestCase):

    def setUp(self):
        doc_classes = {'default':
                       # omit doc class, defaults to dummy
                       {'update_mins': 30, 'onchanged': [] },
                       'fast-update':
                       {'update_mins': 1 / 60., 'storage_engine': 'dummy',
                        'onchanged': []
                       },
                       'one-time':
                       {'update_mins': None, 'storage_engine': 'dummy',
                        'onchanged': [],
                       },
                       'change-hook':
                       {'update_mins': 30, 'storage_engine': 'dummy',
                        'onchanged': [hook_fired]
                       }
                      }
        self.kernel = Kernel(mongo_db='oyster_test',
                             retry_wait_minutes=1 / 60.,
                             doc_classes=doc_classes)
        self.kernel._wipe()

    def test_constructor(self):
        c = Kernel('127.0.0.1', 27017, 'testdb', mongo_log_maxsize=5000,
                   user_agent='test-ua', rpm=30, timeout=60,
                   retry_attempts=7, retry_wait_minutes=8)
        assert c.db.connection.host == '127.0.0.1'
        assert c.db.connection.port == 27017
        assert c.retry_wait_minutes == 8
        # TODO: test retry_attempts
        assert c.scraper.user_agent == 'test-ua'
        assert c.scraper.requests_per_minute == 30
        assert c.scraper.timeout == 60

        # ensure that a bad document class raises an error
        assert_raises(ValueError, Kernel, doc_classes={'bad-doc': {}})

    def test_track_url(self):
        # basic insert
        id1 = self.kernel.track_url('http://example.com', 'default', pi=3)
        obj = self.kernel.db.tracked.find_one()
        assert '_random' in obj
        assert obj['doc_class'] == 'default'
        assert obj['metadata'] == {'pi': 3}
        assert obj['versions'] == []

        # track same url again with same metadata returns id
        id2 = self.kernel.track_url('http://example.com', 'default', pi=3)
        assert id1 == id2

        # test manually set id
        out = self.kernel.track_url('http://example.com/2', 'default',
                                    'fixed-id')
        assert out == 'fixed-id'

        # can't pass track same id twice with different url
        assert_raises(ValueError, self.kernel.track_url,
                      'http://example.com/3', 'default', 'fixed-id')

        # ... or different doc class
        assert_raises(ValueError, self.kernel.track_url,
                      'http://example.com/2', 'change-hook', 'fixed-id')

        # different metadata is ok, but it should be updated
        self.kernel.track_url('http://example.com/2', 'default', 'fixed-id',
                              pi=3)
        self.kernel.db.tracked.find_one({'_id': 'fixed-id'})['metadata']['pi'] == 3

    def test_no_update(self):
        # update
        self.kernel.track_url('http://example.com', 'one-time')
        obj = self.kernel.db.tracked.find_one()
        self.kernel.update(obj)

        newobj = self.kernel.db.tracked.find_one()
        assert newobj['next_update'] == None

        assert self.kernel.get_update_queue() == []
        assert self.kernel.get_update_queue_size() == 0

    def test_md5_versioning(self):
        assert not self.kernel.md5_versioning('hello!', 'hello!')
        assert self.kernel.md5_versioning('hello!', 'hey!')

    def test_update(self):
        # get a single document tracked and call update on it
        self.kernel.track_url('http://example.com', 'default')
        obj = self.kernel.db.tracked.find_one()
        self.kernel.update(obj)

        # check that the metadata has been updated
        newobj = self.kernel.db.tracked.find_one()
        assert (newobj['last_update'] + datetime.timedelta(minutes=30) ==
                newobj['next_update'])
        first_update = newobj['last_update']
        assert newobj['consecutive_errors'] == 0

        assert len(newobj['versions']) == 1

        # and do another update..
        self.kernel.update(obj)

        # hopefully example.com hasn't changed, this tests that md5 worked
        assert len(newobj['versions']) == 1

        # check that appropriate metadata updated
        newobj = self.kernel.db.tracked.find_one()
        assert first_update < newobj['last_update']

    def test_update_failure(self):
        # track a non-existent URL
        self.kernel.track_url('http://not_a_url', 'default')
        obj = self.kernel.db.tracked.find_one()
        self.kernel.update(obj)

        obj = self.kernel.db.tracked.find_one()
        assert obj['consecutive_errors'] == 1

        # update again
        self.kernel.update(obj)

        obj = self.kernel.db.tracked.find_one()
        assert obj['consecutive_errors'] == 2

    #def test_update_onchanged_fire_only_on_change(self):
    #    self.kernel.track_url('http://example.com', 'change-hook')
    #    obj = self.kernel.db.tracked.find_one()
    #    self.kernel.update(obj)

    #    doc = self.kernel.db.tracked.find_one()
    #    assert doc['hook_fired'] == 1

    #    # again, we rely on example.com not updating
    #    self.kernel.update(obj)
    #    doc = self.kernel.db.tracked.find_one()
    #    assert doc['hook_fired'] == 1

    #def test_update_onchanged_fire_again_on_change(self):
    #    self.kernel.track_url(RANDOM_URL, 'change-hook')
    #    obj = self.kernel.db.tracked.find_one()
    #    self.kernel.update(obj)

    #    doc = self.kernel.db.tracked.find_one()
    #    assert doc['hook_fired'] == 1

    #    # we rely on this URL updating
    #    self.kernel.update(obj)
    #    doc = self.kernel.db.tracked.find_one()
    #    assert doc['hook_fired'] == 2

    def test_get_update_queue(self):
        self.kernel.track_url('never-updates', 'fast-update')
        self.kernel.track_url('bad-uri', 'fast-update')
        self.kernel.track_url('http://example.com', 'fast-update')

        never = self.kernel.db.tracked.find_one(dict(url='never-updates'))
        bad = self.kernel.db.tracked.find_one(dict(url='bad-uri'))
        good = self.kernel.db.tracked.find_one(dict(url='http://example.com'))

        # 3 in queue, ordered by random
        queue = self.kernel.get_update_queue()
        assert len(queue) == 3
        assert queue[0]['_random'] < queue[1]['_random'] < queue[2]['_random']

        # try and update bad & good
        self.kernel.update(bad)
        self.kernel.update(good)

        # queue should only have never in it
        queue = self.kernel.get_update_queue()
        assert queue[0]['_id'] == never['_id']

        # wait for time to pass so queue should be full
        time.sleep(1)
        queue = self.kernel.get_update_queue()
        assert len(queue) == 3

    def test_get_update_queue_size(self):
        self.kernel.track_url('a', 'fast-update')
        self.kernel.track_url('b', 'fast-update')
        self.kernel.track_url('c', 'fast-update')

        a = self.kernel.db.tracked.find_one(dict(url='a'))

        # size should start at 3
        assert self.kernel.get_update_queue_size() == 3

        # goes down one
        self.kernel.update(a)
        assert self.kernel.get_update_queue_size() == 2

        # wait for it to go back to 3
|
|
||||||
time.sleep(1)
|
|
||||||
assert self.kernel.get_update_queue_size() == 3
|
|
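
For orientation, the workflow these tests exercise looks roughly like the sketch below. It is not lifted from the repository: the doc_class keys (update_mins, storage_engine) are assumptions about what a valid class contains (the suite only shows that an empty class is rejected), and the host, port, and database values are placeholders for a local MongoDB.

    from oyster.core import Kernel

    # hypothetical doc_class configuration -- the exact required keys are an
    # assumption; the tests above only show that an empty class ({}) fails
    doc_classes = {'default': {'update_mins': 30, 'storage_engine': 'dummy'}}

    kernel = Kernel('127.0.0.1', 27017, 'oyster_example',
                    user_agent='oyster-example', rpm=30, timeout=60,
                    doc_classes=doc_classes)

    # track a URL (extra kwargs become metadata), then run one update pass on it
    kernel.track_url('http://example.com', 'default', note='demo')
    tracked = kernel.db.tracked.find_one({'url': 'http://example.com'})
    kernel.update(tracked)

    print(kernel.get_update_queue_size())  # documents currently due for an update
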
@ -1,53 +0,0 @@
import unittest
import logging
import datetime

import pymongo
from ..mongolog import MongoHandler


class TestMongoLog(unittest.TestCase):

    DB_NAME = 'oyster_test'

    def setUp(self):
        pymongo.Connection().drop_database(self.DB_NAME)
        self.log = logging.getLogger('mongotest')
        self.log.setLevel(logging.DEBUG)
        self.logs = pymongo.Connection()[self.DB_NAME]['logs']
        # clear handlers upon each setup
        self.log.handlers = []
        # async = False for testing
        self.log.addHandler(MongoHandler(self.DB_NAME, capped_size=4000,
                                         async=False))

    def tearDown(self):
        pymongo.Connection().drop_database(self.DB_NAME)

    def test_basic_write(self):
        self.log.debug('test')
        self.assertEqual(self.logs.count(), 1)
        self.log.debug('test')
        self.assertEqual(self.logs.count(), 2)
        # capped_size will limit these
        self.log.debug('test'*200)
        self.log.debug('test'*200)
        self.assertEqual(self.logs.count(), 1)

    def test_attributes(self):
        self.log.debug('pi=%s', 3.14, extra={'pie': 'pizza'})
        logged = self.logs.find_one()
        self.assertEqual(logged['message'], 'pi=3.14')
        self.assertTrue(isinstance(logged['created'], datetime.datetime))
        self.assertTrue('host' in logged)
        self.assertEqual(logged['name'], 'mongotest')
        self.assertEqual(logged['levelname'], 'DEBUG')
        self.assertEqual(logged['pie'], 'pizza')

        # and exc_info
        try:
            raise Exception('error!')
        except:
            self.log.warning('oh no!', exc_info=True)
        logged = self.logs.find_one(sort=[('$natural', -1)])
        self.assertEqual(logged['levelname'], 'WARNING')
        self.assertTrue('error!' in logged['exc_info'])
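
Outside the test suite the handler is attached the same way; a minimal sketch, assuming a local MongoDB and an arbitrary database name (the async keyword argument also makes this Python 2 only, matching the project's supported versions):

    import logging
    from oyster.mongolog import MongoHandler

    log = logging.getLogger('oyster.example')
    log.setLevel(logging.DEBUG)
    # capped_size bounds the log collection; async=False writes synchronously,
    # exactly as the tests above do
    log.addHandler(MongoHandler('oyster_example', capped_size=4000, async=False))

    # extra fields are stored as attributes on the resulting log document
    log.warning('fetch failed for %s', 'http://example.com', extra={'attempts': 2})
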
@ -1,37 +0,0 @@
from nose.plugins.skip import SkipTest

from oyster.conf import settings
from oyster.core import Kernel
from oyster.storage.gridfs import GridFSStorage
from oyster.storage.dummy import DummyStorage


def _simple_storage_test(StorageCls):
    kernel = Kernel(mongo_db='oyster_test')
    kernel.doc_classes['default'] = {}
    storage = StorageCls(kernel)

    # ensure the class has a storage_type attribute
    assert hasattr(storage, 'storage_type')

    doc = {'_id': 'aabbccddeeff', 'url': 'http://localhost:8000/#test',
           'doc_class': 'default', 'metadata': {}}
    storage_id = storage.put(doc, 'hello oyster', 'text/plain')
    assert storage_id

    assert storage.get(storage_id) == 'hello oyster'


def test_s3():
    if not hasattr(settings, 'AWS_BUCKET'):
        raise SkipTest('S3 not configured')
    from oyster.storage.s3 import S3Storage
    _simple_storage_test(S3Storage)


def test_gridfs():
    _simple_storage_test(GridFSStorage)


def test_dummy():
    _simple_storage_test(DummyStorage)
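
Every backend goes through the same put/get interface used by the helper above; a sketch of calling one directly (DummyStorage needs no external service, and the _id and url values are placeholders):

    from oyster.core import Kernel
    from oyster.storage.dummy import DummyStorage

    kernel = Kernel(mongo_db='oyster_example')
    kernel.doc_classes['default'] = {}        # registered as in the helper above
    storage = DummyStorage(kernel)

    doc = {'_id': 'aabbccddeeff', 'url': 'http://example.com/doc',
           'doc_class': 'default', 'metadata': {}}

    # put() returns a backend-specific id that get() accepts back
    storage_id = storage.put(doc, 'hello oyster', 'text/plain')
    assert storage.get(storage_id) == 'hello oyster'
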
@ -1,33 +1,23 @@
import re
import json
import datetime
import functools

import flask
import bson.objectid
import pymongo.objectid

from oyster.conf import settings
from oyster.client import get_configured_client
from oyster.core import kernel


class JSONEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        elif isinstance(obj, bson.objectid.ObjectId):
        elif isinstance(obj, pymongo.objectid.ObjectId):
            return str(obj)
        else:
            return super(JSONEncoder, self).default(obj)


def _path_fixer(url):
    """ this exists because werkzeug seems to collapse // into / sometimes;
    certainly a hack, but given that werkzeug seems to only do the mangling
    *sometimes*, being a bit aggressive was the only viable option
    """
    return re.sub(r'(http|https|ftp):/([^/])', r'\1://\2', url)


def api_wrapper(template=None):
    def wrapper(func):
        @functools.wraps(func)
@ -43,16 +33,26 @@ def api_wrapper(template=None):


app = flask.Flask('oyster')
client = get_configured_client()


@app.route('/')
@api_wrapper('index.html')
def index():
    status = {
        'tracking': kernel.db.tracked.count(),
        'tracking': client.db.tracked.count(),
        'need_update': kernel.get_update_queue_size(),
        'need_update': client.get_update_queue_size(),
        'logs': list(kernel.db.logs.find().sort('$natural', -1).limit(100)),
        'logs': client.db.logs.find().sort('$natural', -1).limit(20)
        'mongo_host': settings.MONGO_HOST,
    }
    return status


@app.route('/status/')
@api_wrapper()
def doc_list():
    status = {
        'tracking': client.db.tracked.count(),
        'need_update': client.get_update_queue_size(),
    }
    return status

@ -64,7 +64,7 @@ def log_view():
    size = 100
    prev_offset = max(offset - size, 0)
    next_offset = offset + size
    logs = kernel.db.logs.find().sort('$natural', -1).skip(offset).limit(size)
    logs = client.db.logs.find().sort('$natural', -1).skip(offset).limit(size)
    return dict(logs=list(logs), prev_offset=prev_offset,
                next_offset=next_offset, offset=offset)

@ -72,15 +72,25 @@ def log_view():
@app.route('/tracked/')
@api_wrapper()
def tracked():
    tracked = list(kernel.db.tracked.find())
    tracked = list(client.db.tracked.find())
    return json.dumps(tracked, cls=JSONEncoder)


@app.route('/tracked/<id>')
@app.route('/tracked/<path:url>')
def tracked_view(id):
def tracked_view(url):
    doc = kernel.db.tracked.find_one({'_id': id})
    doc = client.db.tracked.find_one({'url': url})
    return json.dumps(doc, cls=JSONEncoder)


@app.route('/doc/<path:url>/<version>')
def show_doc(url, version):
    if version == 'latest':
        version = -1
    doc = client.get_version(url, version)
    resp = flask.make_response(doc.read())
    resp.headers['content-type'] = doc.content_type
    return resp


if __name__ == '__main__':
    app.run(debug=True)
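
Either version of the frontend serves tracked data as JSON over HTTP; below is a hypothetical client-side sketch against the master-branch routes (the host and port are placeholders; the field names are the ones visible in the tests above):

    import json
    import urllib2  # the project targets Python 2.7

    BASE = 'http://localhost:5000'  # flask's default development server port

    # /tracked/ returns every tracked document, serialized by JSONEncoder above
    tracked = json.loads(urllib2.urlopen(BASE + '/tracked/').read())

    # on master an individual document is addressed by id rather than by url
    if tracked:
        doc = json.loads(
            urllib2.urlopen(BASE + '/tracked/%s' % tracked[0]['_id']).read())
        print(doc['url'])
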
@ -1,5 +1,5 @@
scrapelib
pymongo>=2.0
pymongo>=1.11
flask
nose
celery==2.5.3
celery
20 setup.py
@ -1,28 +1,18 @@
#!/usr/bin/env python

import os
from setuptools import setup
from oyster import __version__

# Hack to prevent stupid "TypeError: 'NoneType' object is not callable" error
# in multiprocessing/util.py _exit_function when running `python
# setup.py test` (see
# http://www.eby-sarna.com/pipermail/peak/2010-May/003357.html)
try:
    import multiprocessing
except ImportError:
    pass

long_description = open('README.rst').read()

setup(name="oyster",
      version='0.4.0-dev',
      version=__version__,
      py_modules=['oyster'],
      author="James Turk",
      author_email='jturk@sunlightfoundation.com',
      license="BSD",
      url="http://github.com/sunlightlabs/oyster/",
      long_description=long_description,
      description="a proactive document cache",
      description="a library for scraping things",
      platforms=["any"],
      classifiers=["Development Status :: 4 - Beta",
                   "Intended Audience :: Developers",
@ -31,10 +21,8 @@ setup(name="oyster",
                   "Operating System :: OS Independent",
                   "Programming Language :: Python",
                   ],
      install_requires=["httplib2 >= 0.6.0", "scrapelib >= 0.7.2",
      install_requires=["httplib2 >= 0.6.0", "scrapelib >= 0.5.4",
                        "pymongo >= 1.11", "flask", "celery"],
      tests_require=["nose"],
      test_suite='nose.collector',
      entry_points="""
      [console_scripts]
      scrapeshell = scrapelib:scrapeshell
10 tox.ini
@ -1,10 +0,0 @@
# Tox (http://codespeak.net/~hpk/tox/) is a tool for running tests
# in multiple virtualenvs. This configuration file will run the
# test suite on all supported python versions. To use it, "pip install tox"
# and then run "tox" from this directory.

[tox]
envlist = py26, py27, pypy

[testenv]
commands = python setup.py test