Compare commits

...

137 Commits

Author SHA1 Message Date
James Turk
c6e93fdd48 mark as deprecated 2012-12-05 11:14:32 -05:00
James Turk
0da1b7b0a9 travis mongodb service 2012-09-12 13:30:01 -04:00
James Turk
e84d139df0 simplify settings 2012-07-11 16:52:33 -04:00
James Turk
dd5e94cf86 elasticsearch log fix 2012-07-11 14:58:00 -04:00
James Turk
ebba8174ee pin celery 2012-06-22 14:16:24 -04:00
James Turk
2677ed18b7 bringing back basic web interface 2012-05-16 15:51:55 -04:00
James Turk
6153bdaf2a return if there's no text 2012-05-16 15:12:16 -04:00
James Turk
dfb556a49f sometimes extract_text can't get text 2012-05-16 14:36:01 -04:00
James Turk
1d441a8f86 0.4.0-dev 2012-05-16 13:47:57 -04:00
James Turk
31435df760 use python logging 2012-05-16 13:38:36 -04:00
James Turk
2846b044d2 oops, that was wrong 2012-05-16 00:19:30 -04:00
James Turk
901283ecfa python 2.6 fix 2012-05-16 00:15:34 -04:00
James Turk
2f729bfdbc add mongolog 2012-05-15 23:47:45 -04:00
James Turk
4e8f1430c4 syntax error 2012-05-15 17:42:22 -04:00
James Turk
7387aab273 fix offset 2012-05-15 17:41:55 -04:00
James Turk
4172b43c0f random sampling 2012-05-15 17:40:46 -04:00
James Turk
b4de2ee0f9 print summary 2012-05-15 17:05:19 -04:00
James Turk
f14fc1cd2b --immediate 2012-05-15 16:58:50 -04:00
James Turk
c4b7597772 throw=True 2012-05-15 16:57:04 -04:00
James Turk
9182b966e3 just call task directly 2012-05-15 16:55:49 -04:00
James Turk
aba52c28a3 __import__ task for --sample 2012-05-15 16:54:33 -04:00
James Turk
66acbde1d8 add --sample to signal.py 2012-05-15 16:38:14 -04:00
James Turk
76e172da0f use right bucket for doc class 2012-05-15 15:50:36 -04:00
James Turk
cec4bdc333 new logging 2012-05-14 16:40:13 -04:00
James Turk
5e668b3b21 fix empty versions getting tracked 2012-05-10 17:11:54 -04:00
James Turk
ec9b19d77f es fixes 2012-05-10 17:10:01 -04:00
James Turk
0cddf437fb fix log message 2012-05-10 17:07:21 -04:00
James Turk
d5d77fd79b need latest scrapelib 2012-05-10 11:38:27 -04:00
James Turk
fd8e3706bc elasticsearch v1 2012-05-10 11:38:02 -04:00
James Turk
eb3c6919ac 0.3.3-dev 2012-05-10 11:37:52 -04:00
James Turk
43d4979913 ObjectId moved to bson 2012-05-10 11:33:21 -04:00
James Turk
e6ae2ad634 resolved conflict the wrong way 2012-05-10 11:16:35 -04:00
James Turk
4206539aed fix merge conflict 2012-05-10 11:13:18 -04:00
James Turk
ebc6444bea remove cloudsearch from default apps 2012-05-10 11:08:04 -04:00
James Turk
68b3fafb59 Merge pull request #3 from msabramo/setup.py-test
Enable `python setup.py test`
2012-05-10 08:06:57 -07:00
Marc Abramowitz
cfeb6dd5ac Add .tox to .gitignore 2012-05-10 07:48:05 -07:00
Marc Abramowitz
cee1103e21 Add a tox.ini 2012-05-09 23:46:30 -07:00
Marc Abramowitz
4a4dff4e96 Enable python setup.py test 2012-05-09 22:17:34 -07:00
James Turk
2804b95c4a log.url isn't a link 2012-05-10 00:46:03 -04:00
James Turk
7537f344f8 fix tracked_view 2012-05-10 00:42:15 -04:00
James Turk
8571042b05 Merge branch 'master' of github.com:sunlightlabs/oyster 2012-05-01 16:34:00 -04:00
James Turk
75dadb2579 use ids now 2012-05-01 16:33:51 -04:00
James Turk
a4c7733618 fix superfastmatch defer syntax 2012-04-18 15:23:00 -04:00
James Turk
b697641e13 import CloudSearch 2012-04-17 18:09:58 -04:00
James Turk
9fb39db5cd don't do lookup on url if they passed an id to track_url 2012-04-17 18:09:11 -04:00
James Turk
b23b830419 index on url 2012-04-17 17:57:58 -04:00
James Turk
0d5e01f051 defer analysis in sfm Push 2012-04-17 15:53:28 -04:00
James Turk
f5cd19ba94 absolute imports 2012-04-16 17:30:20 -04:00
James Turk
85d88c1c94 superfastmatch 2012-04-16 17:08:25 -04:00
James Turk
97e164e6e6 some improvements to the cloud client 2012-04-14 00:07:39 -04:00
James Turk
574d1da843 fix auto-flush behavior 2012-04-13 23:44:36 -04:00
James Turk
95a92d124c flush_every typo 2012-04-13 23:42:50 -04:00
James Turk
8510a30175 logging and flush_every on cloud search 2012-04-13 23:41:49 -04:00
James Turk
b60ee969ca change create_bucket to get, try and avoid these 409s" 2012-04-13 22:40:07 -04:00
James Turk
2fca8a23ac fix travis 2012-04-13 17:43:29 -04:00
James Turk
b164134c2a fix storage test 2012-04-13 17:41:14 -04:00
James Turk
7922ac2da7 change how onchanged events fire 2012-04-13 17:41:07 -04:00
James Turk
232cf76a60 changelog/gitignore 2012-04-13 17:23:06 -04:00
James Turk
249b4babde cloudsearch fixes 2012-04-13 16:32:45 -04:00
James Turk
2259824019 fix signal x2 2012-04-13 16:10:13 -04:00
James Turk
c9fd748534 fix signal 2012-04-13 16:09:48 -04:00
James Turk
b1f4bb1a82 signal now send_task's 2012-04-13 14:25:33 -04:00
James Turk
d118d36e6e cloudsearch settings 2012-04-13 13:41:17 -04:00
James Turk
028a145505 change how post_update_tasks fire 2012-04-13 13:37:45 -04:00
James Turk
7b096d76d0 add oyster.ext.cloudsearch 2012-04-13 13:14:57 -04:00
James Turk
24806e6ae0 kernel.extract_text 2012-04-13 13:04:36 -04:00
James Turk
081533d647 no timeout on signal find() 2012-04-11 15:57:35 -04:00
James Turk
8a98d81801 slightly better errors on signal 2012-04-10 17:48:01 -04:00
James Turk
92cf12905b save documents after process in signal script 2012-04-10 17:36:22 -04:00
James Turk
42126e46a9 bugfix for S3 storage backend 2012-03-29 23:18:32 -04:00
James Turk
d8288c7647 0.3.2 release 2012-03-29 23:10:02 -04:00
James Turk
5472e0de60 0.3.2 2012-03-29 23:05:36 -04:00
James Turk
04a8c0123c default storage engine 2012-03-29 22:59:45 -04:00
James Turk
006a50b8e3 use doc_class AWS_* settings if present 2012-03-29 22:59:30 -04:00
James Turk
7ba0d10049 Merge branch 'master' of github.com:sunlightlabs/oyster 2012-03-21 13:19:27 -04:00
James Turk
4aaae6d6e6 changelog 2012-03-21 13:00:21 -04:00
James Turk
3a4d2d968e travis ci 2012-03-21 12:50:22 -04:00
James Turk
b077e679af skip S3 test sometimes 2012-03-21 12:49:00 -04:00
James Turk
5124fe4eab change how duplicates work 2012-03-21 12:44:32 -04:00
James Turk
cca54b6447 0.3.1 2012-03-10 14:33:32 -08:00
James Turk
22800601df changelog 2012-03-10 14:32:51 -08:00
James Turk
981e2cc88f flake8 fixes: mostly whitespace, a few real bugs 2012-03-10 11:21:43 -08:00
James Turk
45a93fcc68 fix syntax error 2012-03-07 11:08:27 -05:00
James Turk
8e4c020f8f better error on duplicate 2012-03-07 11:07:30 -05:00
James Turk
8c589c1ccd add support for AWS_PREFIX 2012-03-06 18:35:43 -05:00
James Turk
05975d878a call end_request in long running threads 2012-02-27 11:54:54 -05:00
James Turk
f11b29dea3 scripts dir 2012-02-21 19:24:58 -05:00
James Turk
cdb7fda3be add get_last_version 2012-02-21 19:24:10 -05:00
James Turk
cab149e241 change onchanged to take doc and newdata 2012-02-21 18:39:04 -05:00
James Turk
cedc71bf64 one time updates 2012-02-21 17:31:39 -05:00
James Turk
2f3825d3e2 add validation of doc_class on insert 2012-02-21 17:00:21 -05:00
James Turk
211e40e211 s3 storage, stop creating bucket repeatedly 2012-02-21 16:25:51 -05:00
James Turk
9ef3a82d75 allow id to be set manually 2012-02-21 15:36:04 -05:00
James Turk
bbcd7a3018 bump version 2012-02-15 18:32:14 -05:00
James Turk
0dbdff5374 onchanged hooks 2012-02-15 18:31:37 -05:00
James Turk
5b01b088d3 delete obsolete ExternalStoreTask 2012-02-15 17:29:50 -05:00
James Turk
86ea27c62b improve how storage is handled 2012-02-15 17:09:43 -05:00
James Turk
39e812c891 dummy storage & storage tests 2012-02-15 15:01:50 -05:00
James Turk
ac5bf809da S3 storage 2012-02-14 17:24:55 -05:00
James Turk
d66ebdd74a major core refactor, shift to how storage is done 2012-02-14 16:42:41 -05:00
James Turk
d1101a2949 fix kernel update_queue task 2012-02-13 18:48:54 -05:00
James Turk
87462beaf5 requirements 2012-02-13 18:39:03 -05:00
James Turk
c65cde6bbd design braindump 2012-02-13 18:35:58 -05:00
James Turk
e73ec6ab6b switch to Kernel 2012-02-13 18:33:41 -05:00
James Turk
f6a85c119b clarify newdata in update 2012-02-13 17:43:24 -05:00
James Turk
3539e50c9d rename client to connection 2012-02-13 14:34:45 -05:00
James Turk
7526280333 tag as 0.2.5 2012-02-12 18:17:08 -05:00
James Turk
00c3fd59f4 set CELERY_CONFIG_MODULE 2011-10-06 10:32:14 -04:00
James Turk
979775ac72 pass metadata to extract_text 2011-10-05 15:57:32 -04:00
James Turk
27a338abf5 fix for extract_text usage 2011-10-05 15:55:48 -04:00
James Turk
5b71fa4aea ExternalStoreTask takes an extract_text argument 2011-10-05 15:52:16 -04:00
James Turk
e6c9cd6a06 pass a string as filedata in ExternalStoreTask 2011-10-04 12:46:32 -04:00
James Turk
d3ed585a8c CELERY_TASK_MODULES oyster setting 2011-10-03 16:01:29 -04:00
James Turk
f6282c8040 oyster_settings to .gitignore 2011-10-03 15:47:28 -04:00
James Turk
8ec1f12610 ExternalStore and S3 tasks 2011-10-03 15:47:04 -04:00
James Turk
f8f66c82f9 update ever 2s 2011-09-30 17:19:41 -04:00
James Turk
7c8363056d fix json request to use script root 2011-09-30 17:18:49 -04:00
James Turk
5247350524 front page updates as it runs 2011-09-28 17:50:03 -04:00
James Turk
51480fafcc track_url with same parameters returns original id 2011-09-28 15:50:32 -04:00
James Turk
da1a3385e1 fix failing tests that didn't work with error code 2011-09-28 15:43:53 -04:00
James Turk
7b8eda5143 Revert "restore limit arg"
This reverts commit d7de2f762c.
2011-09-27 18:11:46 -04:00
James Turk
d7de2f762c restore limit arg 2011-09-27 18:09:29 -04:00
James Turk
d1df737474 use MONGO_ settings for broker 2011-09-27 17:24:47 -04:00
James Turk
cad6ceb4f7 Revert "update tracked view to take id instead of URL"
This reverts commit f2c2e2d6f2.
2011-09-22 13:28:57 -04:00
James Turk
eefc74815e Revert "mongo wants ObjectIds"
This reverts commit 5b85dfa4e5.
2011-09-22 13:28:50 -04:00
James Turk
5b85dfa4e5 mongo wants ObjectIds 2011-09-22 12:57:18 -04:00
James Turk
f2c2e2d6f2 update tracked view to take id instead of URL 2011-09-22 12:52:01 -04:00
James Turk
4174a25595 script_root to absolute URLs 2011-09-22 10:53:33 -04:00
James Turk
84c9c105fd syntax error due to laziness 2011-09-21 18:51:19 -04:00
James Turk
d87a702bfe mongo_host on index 2011-09-21 18:49:11 -04:00
James Turk
4e860f7b44 add an index on tracked._random 2011-09-21 18:27:35 -04:00
James Turk
1047ca8927 default timeout and return tracked _id 2011-09-21 17:00:06 -04:00
James Turk
28450782ee import re first 2011-09-21 15:34:53 -04:00
James Turk
d5aac2323f _path_fixer for :// collapsing to :/ 2011-09-21 15:33:30 -04:00
James Turk
0f0a7b838f non-absolute path for media 2011-09-21 12:05:23 -04:00
James Turk
31d308f0e1 bump to 0.2.1 2011-09-20 17:15:36 -04:00
James Turk
448dee151b change description 2011-09-20 17:15:15 -04:00
35 changed files with 1127 additions and 472 deletions

.gitignore (3 changes)

@ -1,2 +1,5 @@
celerybeat-schedule
oyster_settings.py
oyster.egg-info/
*.pyc
.tox

.travis.yml (new file, 10 lines)

@ -0,0 +1,10 @@
language: python
python:
- "2.6"
- "2.7"
install: pip install scrapelib pymongo nose celery --use-mirrors
script: nosetests
services: mongodb
notifications:
email:
- jturk@sunlightfoundation.com


@ -1,27 +1,31 @@
**DEPRECATED** - this project is abandoned & will not be seeing future updates
======
oyster
======
oyster is a service for tracking regularly-accessed pages, a sort of proactive cache.
It features a daemon, a command line client for interacting with the tracking list, and a web frontend for viewing the status.
Oyster intends to provide a command line client for interacting with the list of tracked documents and a web frontend for viewing the status and retrieving data. Behind the scenes it uses a celery queue to manage the documents it is tasked with keeping up to date.
oyster was created by James Turk for `Sunlight Labs <http://sunlightlabs.com>`_.
Source is available via `GitHub <http://github.com/sunlightlabs/oyster/>`_
## ADD PyPI link after release
Installation
============
oyster is available on PyPI: `oyster <http://pypi.python.org/pypi/oyster>`_.
The recommended way to install oyster is to simply ``pip install oyster``
Requirements
------------
* python 2.7
* mongodb 1.8
* pymongo 1.11
* scrapelib 0.5.5
* mongodb 2.0
* pymongo 2.0
* scrapelib 0.5+
Usage
=====
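A minimal usage sketch, based on the Kernel API introduced in oyster/core.py later in this diff (the doc_class name, database name, and metadata below are illustrative; it assumes a local MongoDB and network access):

    from oyster.core import Kernel

    # mirror how the test suite constructs a Kernel: every doc_class must
    # declare update_mins and onchanged, per the validation in Kernel.__init__
    kernel = Kernel(mongo_db='oyster_example',
                    doc_classes={'example': {'update_mins': 60,
                                             'onchanged': [],
                                             'storage_engine': 'gridfs'}})

    # returns the tracked document's _id; extra kwargs become metadata
    doc_id = kernel.track_url('http://example.com', 'example', note='landing page')

    # fetch the tracked record and force an immediate update
    doc = kernel.db.tracked.find_one({'_id': doc_id})
    kernel.update(doc)

    # raw contents of the most recently stored version
    print(kernel.get_last_version(doc))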

changelog.rst (new file, 59 lines)

@ -0,0 +1,59 @@
oyster changelog
================
0.4.0-dev
---------
* S3 storage backend bugfix
* lots of improvements to signal script
* oyster.ext cloudsearch, elasticsearch, and superfastmatch
* use python logging w/ mongo handler
* add tox/python setup.py test (thanks Marc Abramowitz!)
0.3.2
-----
**2012-03-29**
* become much more tolerant of duplicates
* skip S3 test if not prepared
* use doc_class AWS_PREFIX and AWS_BUCKET if set
* add DEFAULT_STORAGE_ENGINE setting
0.3.1
-----
**2012-03-10**
* add validation of doc_class
* add ability to do one-time updates
* change how hooks work
* introduce concept of scripts
* call pymongo's end_request in long running threads
* better error messages for duplicate URLs
* lots of flake8-inspired fixes
* S3 backend: add support for AWS_PREFIX
0.3.0
-----
**2012-02-21**
* switch Connection to Kernel
* add concept of doc_class
* make storage pluggable instead of GridFS
* add S3 backend
* add Dummy backend
* delete obsolete ExternalStoreTask
* addition of onchanged hook
* allow id to be set manually
0.2.5
-----
**2011-10-06**
* lots of fixes to web frontend
* ExternalStoreTask
0.2.0
-----
**2011-09-20**
* major refactor: oysterd replaced by celery
* fix retries
0.1.0
-----
**2011-08-05**
* initial release, basic document tracking

design.txt (new file, 34 lines)

@ -0,0 +1,34 @@
Oyster is designed as a 'proactive document cache', meaning that it takes URLs
and keeps a local copy up to date depending on user-specified criteria.
Data Model
==========
tracked - metadata for tracked resources
_id : internal id
_random : a random integer used for sorting
url : url of resource
doc_class : string indicating the document class, allows for different
settings/hooks for some documents
metadata : dictionary of extra user-specified attributes
versions : list of dictionaries with the following keys:
timestamp : UTC timestamp
<storage_key> : storage_id
(may be s3_url, gridfs_id, etc.)
logs - capped log collection
action : log entry
url : url that action was related to
error : boolean error flag
timestamp : UTC timestamp for log entry
status - internal state
update_queue : size of update queue
Storage Interface
=================
storage_key : key to store on versions
put(tracked_doc, data, content_type) -> id
get(id) -> file type object
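This contract maps directly onto the engines added under oyster/storage/ below. A sketch of the round trip every engine must support, mirroring oyster/tests/test_storage.py (the document values are illustrative and a local MongoDB is assumed):

    from oyster.core import Kernel
    from oyster.storage.dummy import DummyStorage

    kernel = Kernel(mongo_db='oyster_example')
    kernel.doc_classes['default'] = {}      # minimal doc_class, as the tests do

    storage = DummyStorage(kernel)          # any engine exposing put()/get() works
    doc = {'_id': 'aabbccddeeff', 'url': 'http://localhost:8000/#test',
           'doc_class': 'default', 'metadata': {}}

    # put() returns the storage_key recorded on the version;
    # get() returns the stored data for that key
    storage_key = storage.put(doc, 'hello oyster', 'text/plain')
    assert storage.get(storage_key) == 'hello oyster'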


@ -1 +1,4 @@
__version__ = "0.2.0"
__version__ = "0.4.0-dev"
import os
os.environ['CELERY_CONFIG_MODULE'] = 'oyster.celeryconfig'


@ -1,11 +1,3 @@
from oyster.conf import settings
CELERY_IMPORTS = ("oyster.tasks",)
BROKER_TRANSPORT = 'mongodb'
CELERY_RESULT_BACKEND = 'mongodb'
CELERY_MONGODB_BACKEND_SETTINGS = {
'host': settings.MONGO_HOST,
'port': settings.MONGO_PORT,
}
CELERY_IMPORTS = ['oyster.tasks'] + list(settings.CELERY_TASK_MODULES)


@ -1,191 +0,0 @@
import datetime
import hashlib
import random
import sys
import urllib
import pymongo
import gridfs
import scrapelib
def get_configured_client():
""" helper factory, gets a client configured with oyster.conf.settings """
from oyster.conf import settings
return Client(mongo_host=settings.MONGO_HOST,
mongo_port=settings.MONGO_PORT,
mongo_db=settings.MONGO_DATABASE,
mongo_log_maxsize=settings.MONGO_LOG_MAXSIZE,
user_agent=settings.USER_AGENT,
rpm=settings.REQUESTS_PER_MINUTE,
timeout=settings.REQUEST_TIMEOUT,
retry_attempts=settings.RETRY_ATTEMPTS,
retry_wait_minutes=settings.RETRY_WAIT_MINUTES)
class Client(object):
""" oyster's workhorse, handles tracking """
def __init__(self, mongo_host='localhost', mongo_port=27017,
mongo_db='oyster', mongo_log_maxsize=100000000,
user_agent='oyster', rpm=600, timeout=None,
retry_attempts=100, retry_wait_minutes=60):
# set up a capped log if it doesn't exist
self.db = pymongo.Connection(mongo_host, mongo_port)[mongo_db]
try:
self.db.create_collection('logs', capped=True,
size=mongo_log_maxsize)
except pymongo.errors.CollectionInvalid:
pass
# create status document if it doesn't exist
if self.db.status.count() == 0:
self.db.status.insert({'update_queue': 0})
self._collection_name = 'fs'
self.fs = gridfs.GridFS(self.db, self._collection_name)
self.scraper = scrapelib.Scraper(user_agent=user_agent,
requests_per_minute=rpm,
follow_robots=False,
raise_errors=True,
timeout=timeout)
self.retry_attempts = retry_attempts
self.retry_wait_minutes = retry_wait_minutes
def _wipe(self):
""" exists primarily for debug use, wipes entire db """
self.db.drop_collection('tracked')
self.db.drop_collection('%s.chunks' % self._collection_name)
self.db.drop_collection('%s.files' % self._collection_name)
self.db.drop_collection('logs')
self.db.drop_collection('status')
def log(self, action, url, error=False, **kwargs):
""" add an entry to the oyster log """
kwargs['action'] = action
kwargs['url'] = url
kwargs['error'] = error
kwargs['timestamp'] = datetime.datetime.utcnow()
self.db.logs.insert(kwargs)
def track_url(self, url, versioning='md5', update_mins=60*24,
**kwargs):
"""
Add a URL to the set of tracked URLs, accessible via a given filename.
url
URL to start tracking
"""
if self.db.tracked.find_one({'url': url}):
self.log('track', url=url, error='already tracked')
raise ValueError('%s is already tracked' % url)
self.log('track', url=url)
self.db.tracked.insert(dict(url=url, versioning=versioning,
update_mins=update_mins,
_random=random.randint(0, sys.maxint),
metadata=kwargs))
def md5_versioning(self, doc, data):
""" return True if md5 changed or if file is new """
try:
old_md5 = self.fs.get_last_version(filename=doc['url']).md5
new_md5 = hashlib.md5(data).hexdigest()
return (old_md5 != new_md5)
except gridfs.NoFile:
return True
def update(self, doc):
do_put = True
error = False
# update strategies could be implemented here as well
try:
url = doc['url'].replace(' ', '%20')
data = self.scraper.urlopen(url)
content_type = data.response.headers['content-type']
except Exception as e:
do_put = False
error = str(e)
# versioning is a concept for future use, but here's how it can work:
# versioning functions take doc & data, and return True if data is
# different, since they have access to doc, they can also modify
# certain attributes as needed
if do_put:
if doc['versioning'] == 'md5':
do_put = self.md5_versioning(doc, data)
else:
raise ValueError('unknown versioning strategy "%s"' %
doc['versioning'])
if do_put:
self.fs.put(data, filename=doc['url'], content_type=content_type,
**doc['metadata'])
if error:
c_errors = doc.get('consecutive_errors', 0)
doc['consecutive_errors'] = c_errors + 1
if c_errors <= self.retry_attempts:
update_mins = self.retry_wait_minutes * (2**c_errors)
else:
update_mins = doc['update_mins']
else:
doc['consecutive_errors'] = 0
update_mins = doc['update_mins']
# last_update/next_update are separate from question of versioning
doc['last_update'] = datetime.datetime.utcnow()
doc['next_update'] = (doc['last_update'] +
datetime.timedelta(minutes=update_mins))
self.log('update', url=url, new_doc=do_put, error=error)
self.db.tracked.save(doc, safe=True)
def get_all_versions(self, url):
versions = []
n = 0
while True:
try:
versions.append(self.fs.get_version(url, n))
n += 1
except gridfs.NoFile:
break
return versions
def get_version(self, url, n=-1):
return self.fs.get_version(url, n)
def get_update_queue(self):
# results are always sorted by random to avoid piling on single server
# first we try to update anything that we've never retrieved
new = self.db.tracked.find({'next_update':
{'$exists': False}}).sort('_random')
queue = list(new)
# pull the rest from those for which next_update is in the past
next = self.db.tracked.find({'next_update':
{'$lt': datetime.datetime.utcnow()}}).sort('_random')
queue.extend(next)
return queue
def get_update_queue_size(self):
new = self.db.tracked.find({'next_update': {'$exists': False}}).count()
next = self.db.tracked.find({'next_update':
{'$lt': datetime.datetime.utcnow()}}).count()
return new+next


@ -1,5 +1,6 @@
from oyster.conf import default_settings
class Settings(object):
def __init__(self):
pass


@ -4,11 +4,18 @@ MONGO_PORT = 27017
MONGO_DATABASE = 'oyster'
MONGO_LOG_MAXSIZE = 100000000
# extra celery modules
CELERY_TASK_MODULES = []
# scrapelib
USER_AGENT = 'oyster'
REQUESTS_PER_MINUTE = 300
REQUESTS_PER_MINUTE = 60
REQUEST_TIMEOUT = 300
# other
RETRY_ATTEMPTS = 3
RETRY_WAIT_MINUTES = 60
DOCUMENT_CLASSES = {}
DEFAULT_STORAGE_ENGINE = 'dummy'
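Kernel.__init__ (oyster/core.py below) validates every DOCUMENT_CLASSES entry: update_mins and onchanged are required, and storage_engine falls back to DEFAULT_STORAGE_ENGINE when omitted; onchanged entries are handed to celery's send_task, so they are task names. A hedged example of overriding these defaults in a local settings module (presumably the oyster_settings.py that .gitignore references; the class names and task path are illustrative, not from the source):

    # illustrative override, not from the source
    DOCUMENT_CLASSES = {
        'press_release': {
            'update_mins': 60 * 24,     # re-check daily
            'onchanged': ['oyster.ext.elasticsearch.ElasticSearchPush'],
            'storage_engine': 's3',     # store versions in S3
        },
        'one_time_report': {
            'update_mins': None,        # fetch once, never re-queue
            'onchanged': [],
            # storage_engine omitted -> DEFAULT_STORAGE_ENGINE applies
        },
    }
    DEFAULT_STORAGE_ENGINE = 'gridfs'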

oyster/core.py (new file, 285 lines)

@ -0,0 +1,285 @@
import datetime
import logging
import hashlib
import random
import sys
import pymongo
import scrapelib
from .mongolog import MongoHandler
from .storage import engines
from celery.execute import send_task
class Kernel(object):
""" oyster's workhorse, handles tracking """
def __init__(self, mongo_host='localhost', mongo_port=27017,
mongo_db='oyster', mongo_log_maxsize=100000000,
user_agent='oyster', rpm=60, timeout=300,
retry_attempts=3, retry_wait_minutes=60,
doc_classes=None, default_storage_engine='dummy',
):
"""
configurable for ease of testing, only one should be instantiated
"""
# set up the log
self.db = pymongo.Connection(mongo_host, mongo_port)[mongo_db]
self.log = logging.getLogger('oyster')
self.log.setLevel(logging.DEBUG)
self.log.addHandler(MongoHandler(mongo_db, host=mongo_host,
port=mongo_port,
capped_size=mongo_log_maxsize))
# create status document if it doesn't exist
if self.db.status.count() == 0:
self.db.status.insert({'update_queue': 0})
# ensure an index on _random
self.db.tracked.ensure_index('_random')
self.db.tracked.ensure_index('url')
self.scraper = scrapelib.Scraper(user_agent=user_agent,
requests_per_minute=rpm,
follow_robots=False,
raise_errors=True,
timeout=timeout)
self.retry_attempts = retry_attempts
self.retry_wait_minutes = retry_wait_minutes
# load engines
self.storage = {}
for name, StorageCls in engines.iteritems():
self.storage[name] = StorageCls(self)
# set document classes
_doc_class_fields = ('update_mins', 'onchanged')
self.doc_classes = doc_classes or {}
for dc_name, dc_props in self.doc_classes.iteritems():
for key in _doc_class_fields:
if key not in dc_props:
raise ValueError('doc_class %s missing key %s' %
(dc_name, key))
# set a default storage engine
if 'storage_engine' not in dc_props:
dc_props['storage_engine'] = default_storage_engine
def _wipe(self):
""" exists primarily for debug use, wipes entire db """
self.db.drop_collection('tracked')
self.db.drop_collection('logs')
self.db.drop_collection('status')
def _add_doc_class(self, doc_class, **properties):
self.doc_classes[doc_class] = properties
def track_url(self, url, doc_class, id=None, **kwargs):
"""
Add a URL to the set of tracked URLs, accessible via a given filename.
url
URL to start tracking
doc_class
document type, can be any arbitrary string
**kwargs
any keyword args will be added to the document's metadata
"""
if doc_class not in self.doc_classes:
error = 'error tracking %s, unregistered doc_class %s'
self.log.error(error, url, doc_class)
raise ValueError(error % (url, doc_class))
# try and find an existing version of this document
tracked = None
if id:
tracked = self.db.tracked.find_one({'_id': id})
else:
tracked = self.db.tracked.find_one({'url': url})
# if id exists, ensure that URL and doc_class haven't changed
# then return existing data (possibly with refreshed metadata)
if tracked:
if (tracked['url'] == url and
tracked['doc_class'] == doc_class):
if kwargs != tracked['metadata']:
tracked['metadata'] = kwargs
self.db.tracked.save(tracked, safe=True)
return tracked['_id']
else:
# id existed but with different URL
message = ('%s already exists with different data (tracked: '
'%s, %s) (new: %s, %s)')
args = (tracked['_id'], tracked['url'], tracked['doc_class'],
url, doc_class)
self.log.error(message, *args)
raise ValueError(message % args)
self.log.info('tracked %s [%s]', url, id)
newdoc = dict(url=url, doc_class=doc_class,
_random=random.randint(0, sys.maxint),
versions=[], metadata=kwargs)
if id:
newdoc['_id'] = id
return self.db.tracked.insert(newdoc, safe=True)
def md5_versioning(self, olddata, newdata):
""" return True if md5 changed or if file is new """
old_md5 = hashlib.md5(olddata).hexdigest()
new_md5 = hashlib.md5(newdata).hexdigest()
return old_md5 != new_md5
def update(self, doc):
"""
perform update upon a given document
:param:`doc` must be a document from the `tracked` collection
* download latest document
* check if document has changed using versioning func
* if a change has occurred save the file
* if error occured, log & keep track of how many errors in a row
* update last_update/next_update timestamp
"""
new_version = True
error = False
now = datetime.datetime.utcnow()
try:
doc_class = self.doc_classes[doc['doc_class']]
except KeyError:
raise ValueError('unregistered doc_class %s' % doc['doc_class'])
update_mins = doc_class['update_mins']
storage = self.storage[doc_class['storage_engine']]
# fetch strategies could be implemented here as well
try:
url = doc['url'].replace(' ', '%20')
newdata = self.scraper.urlopen(url)
content_type = newdata.response.headers['content-type']
except Exception as e:
new_version = False
error = str(e)
# only do versioning check if at least one version exists
if new_version and doc['versions']:
# room here for different versioning schemes
olddata = storage.get(doc['versions'][-1]['storage_key'])
new_version = self.md5_versioning(olddata, newdata)
if new_version:
storage_id = storage.put(doc, newdata, content_type)
doc['versions'].append({'timestamp': now,
'storage_key': storage_id,
'storage_type': storage.storage_type,
})
# fire off onchanged functions
for onchanged in doc_class.get('onchanged', []):
send_task(onchanged, (doc['_id'],))
if error:
# if there's been an error, increment the consecutive_errors count
# and back off a bit until we've reached our retry limit
c_errors = doc.get('consecutive_errors', 0)
doc['consecutive_errors'] = c_errors + 1
if c_errors <= self.retry_attempts:
update_mins = self.retry_wait_minutes * (2 ** c_errors)
else:
# reset error count if all was ok
doc['consecutive_errors'] = 0
# last_update/next_update are separate from question of versioning
doc['last_update'] = now
if update_mins:
doc['next_update'] = now + datetime.timedelta(minutes=update_mins)
else:
doc['next_update'] = None
if error:
self.log.warning('error updating %s [%s]', url, doc['_id'])
else:
new_version = ' (new)'
self.log.info('updated %s [%s]%s', url, doc['_id'], new_version)
self.db.tracked.save(doc, safe=True)
def get_update_queue(self):
"""
Get a list of what needs to be updated.
Documents that have never been updated take priority, followed by
documents that are simply stale. Within these two categories results
are sorted in semirandom order to decrease odds of piling on one
server.
"""
# results are always sorted by random to avoid piling on single server
# first we try to update anything that we've never retrieved
new = self.db.tracked.find({'next_update':
{'$exists': False}}).sort('_random')
queue = list(new)
# pull the rest from those for which next_update is in the past
next = self.db.tracked.find({'$and': [
{'next_update': {'$ne': None}},
{'next_update': {'$lt': datetime.datetime.utcnow()}},
]}).sort('_random')
queue.extend(next)
return queue
def get_update_queue_size(self):
"""
Get the size of the update queue, this should match
``len(self.get_update_queue())``, but is computed more efficiently.
"""
new = self.db.tracked.find({'next_update': {'$exists': False}}).count()
next = self.db.tracked.find({'$and': [
{'next_update': {'$ne': None}},
{'next_update': {'$lt': datetime.datetime.utcnow()}},
]}).count()
return new + next
def get_last_version(self, doc):
try:
doc_class = self.doc_classes[doc['doc_class']]
except KeyError:
raise ValueError('unregistered doc_class %s' % doc['doc_class'])
storage = self.storage[doc_class['storage_engine']]
return storage.get(doc['versions'][-1]['storage_key'])
def extract_text(self, doc):
version = self.get_last_version(doc)
doc_class = self.doc_classes[doc['doc_class']]
try:
extract_text = doc_class['extract_text']
except KeyError:
raise ValueError('doc_class %s missing extract_text' %
doc['doc_class'])
return extract_text(doc, version)
def _get_configured_kernel():
""" factory, gets a connection configured with oyster.conf.settings """
from oyster.conf import settings
return Kernel(mongo_host=settings.MONGO_HOST,
mongo_port=settings.MONGO_PORT,
mongo_db=settings.MONGO_DATABASE,
mongo_log_maxsize=settings.MONGO_LOG_MAXSIZE,
user_agent=settings.USER_AGENT,
rpm=settings.REQUESTS_PER_MINUTE,
timeout=settings.REQUEST_TIMEOUT,
retry_attempts=settings.RETRY_ATTEMPTS,
retry_wait_minutes=settings.RETRY_WAIT_MINUTES,
doc_classes=settings.DOCUMENT_CLASSES,
default_storage_engine=settings.DEFAULT_STORAGE_ENGINE,
)
kernel = _get_configured_kernel()

oyster/ext/__init__.py (new empty file)

oyster/ext/cloudsearch.py (new file, 32 lines)

@ -0,0 +1,32 @@
# needed so we can import cloudsearch
from __future__ import absolute_import
from celery.task.base import Task
from ..core import kernel
from ..conf import settings
from cloudsearch import CloudSearch
cs = CloudSearch(settings.CLOUDSEARCH_DOMAIN, settings.CLOUDSEARCH_ID, 20)
class CloudSearchPush(Task):
""" task that updates documents """
# results go straight to database
ignore_result = True
# a bit under 1MB
MAX_BYTES = 1048000
def run(self, doc_id):
doc = kernel.db.tracked.find_one({'_id': doc_id})
text = kernel.extract_text(doc)
pieces = [text[i:i+self.MAX_BYTES] for i in
xrange(0, len(text), self.MAX_BYTES)]
self.get_logger().debug('adding {0} pieces for {1}'.format(
len(pieces), doc_id))
for i, piece in enumerate(pieces):
cloud_id = '%s_%s' % (doc_id.lower(), i)
cs.add_document(cloud_id, text=piece, **doc['metadata'])


@ -0,0 +1,36 @@
import logging
from celery.task.base import Task
from ..core import kernel
from ..conf import settings
from pyes import ES
es = ES(settings.ELASTICSEARCH_HOST)
log = logging.getLogger('oyster.ext.elasticsearch')
class ElasticSearchPush(Task):
# results go straight to elasticsearch
ignore_result = True
def run(self, doc_id):
doc = kernel.db.tracked.find_one({'_id': doc_id})
try:
text = kernel.extract_text(doc)
if not text:
log.info('no text for %s', doc_id,
extra={'doc_class':doc['doc_class']})
return
log.info('tracked %s', doc_id,
extra={'doc_class':doc['doc_class']})
es.index(dict(doc['metadata'], text=text),
settings.ELASTICSEARCH_INDEX,
settings.ELASTICSEARCH_DOC_TYPE,
id=doc_id)
except Exception as e:
log.warning('error tracking %s', doc_id,
extra={'doc_class':doc['doc_class']}, exc_info=True)
raise


@ -0,0 +1,23 @@
# needed so we can import superfastmatch.Client
from __future__ import absolute_import
from celery.task.base import Task
from ..core import kernel
from ..conf import settings
from superfastmatch import Client
sfm = Client(settings.SUPERFASTMATCH_URL)
class SuperFastMatchPush(Task):
""" task that pushes documents to SFM """
# results go straight to database
ignore_result = True
def run(self, doc_id):
doc = kernel.db.tracked.find_one({'_id': doc_id})
text = kernel.extract_text(doc)
doctype, docid = settings.SUPERFASTMATCH_ID_FUNC(doc_id)
sfm.add(doctype, docid, text, defer=True, **doc['metadata'])

oyster/mongolog.py (new file, 54 lines)

@ -0,0 +1,54 @@
"""
MongoDB handler for Python Logging
inspired by https://github.com/andreisavu/mongodb-log
"""
import logging
import datetime
import socket
import pymongo
class MongoFormatter(logging.Formatter):
def format(self, record):
""" turn a LogRecord into something mongo can store """
data = record.__dict__.copy()
data.update(
# format message
message=record.getMessage(),
# overwrite created (float) w/ a mongo-compatible datetime
created=datetime.datetime.utcnow(),
host=socket.gethostname(),
args=tuple(unicode(arg) for arg in record.args)
)
data.pop('msecs') # not needed, stored in created
# TODO: ensure everything in 'extra' is MongoDB-ready
exc_info = data.get('exc_info')
if exc_info:
data['exc_info'] = self.formatException(exc_info)
return data
class MongoHandler(logging.Handler):
def __init__(self, db, collection='logs', host='localhost', port=None,
capped_size=100000000, level=logging.NOTSET, async=True):
db = pymongo.connection.Connection(host, port)[db]
# try and create the capped log collection
if capped_size:
try:
db.create_collection(collection, capped=True, size=capped_size)
except pymongo.errors.CollectionInvalid:
pass
self.collection = db[collection]
self.async = async
logging.Handler.__init__(self, level)
self.formatter = MongoFormatter()
def emit(self, record):
# explicitly set safe=False to get async insert
# TODO: what to do if an error occurs? not safe to log-- ignore?
self.collection.save(self.format(record), safe=not self.async)
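A minimal sketch of attaching this handler, mirroring what Kernel.__init__ in oyster/core.py above does (the database name and log message are illustrative; a local MongoDB is assumed):

    import logging
    from oyster.mongolog import MongoHandler

    log = logging.getLogger('oyster')
    log.setLevel(logging.DEBUG)
    # records land in the capped 'logs' collection of the given database
    log.addHandler(MongoHandler('oyster_example', host='localhost', port=27017,
                                capped_size=100000000))
    log.info('tracked %s [%s]', 'http://example.com', 'some-id',
             extra={'doc_class': 'default'})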


oyster/scripts/signal.py (new file, 53 lines)

@ -0,0 +1,53 @@
#!/usr/bin/env python
import argparse
import traceback
import random
from celery.execute import send_task
from celery import current_app
from oyster.core import kernel
def main():
parser = argparse.ArgumentParser(
description='do a task for all documents in a doc_class',
)
parser.add_argument('task', type=str, help='task name to apply')
parser.add_argument('doc_class', type=str,
help='doc_class to apply function to')
parser.add_argument('--sample', action='store_true')
parser.add_argument('--immediate', action='store_true')
args = parser.parse_args()
docs = kernel.db.tracked.find({'doc_class': args.doc_class,
'versions': {'$ne': []}
}, timeout=False)
total = docs.count()
print '{0} docs in {1}'.format(total, args.doc_class)
if args.sample:
limit = 100
print 'sampling {0} documents'.format(limit)
docs = docs.limit(limit).skip(random.randint(0, total-limit))
args.immediate = True
errors = 0
if args.immediate:
module, name = args.task.rsplit('.', 1)
task = getattr(__import__(module, fromlist=[name]), name)
for doc in docs:
try:
task.apply((doc['_id'],), throw=True)
except Exception:
errors += 1
traceback.print_exc()
print '{0} errors in {1} documents'.format(errors, total)
else:
for doc in docs:
send_task(args.task, (doc['_id'], ))
if __name__ == '__main__':
main()


@ -0,0 +1,19 @@
engines = {}
try:
from .dummy import DummyStorage
engines['dummy'] = DummyStorage
except ImportError:
pass
try:
from .s3 import S3Storage
engines['s3'] = S3Storage
except ImportError:
pass
try:
from .gridfs import GridFSStorage
engines['gridfs'] = GridFSStorage
except ImportError:
pass

oyster/storage/dummy.py (new file, 15 lines)

@ -0,0 +1,15 @@
class DummyStorage(object):
""" should NOT be used outside of testing """
storage_type = 'dummy'
def __init__(self, kernel):
self._storage = {}
def put(self, tracked_doc, data, content_type):
""" store the document in local dict """
self._storage[tracked_doc['_id']] = data
return tracked_doc['_id']
def get(self, id):
return self._storage[id]

oyster/storage/gridfs.py (new file, 20 lines)

@ -0,0 +1,20 @@
from __future__ import absolute_import
import gridfs
class GridFSStorage(object):
storage_type = 'gridfs'
def __init__(self, kernel):
self.db = kernel.db
self._collection_name = 'fs'
self.fs = gridfs.GridFS(self.db, self._collection_name)
def put(self, tracked_doc, data, content_type):
return self.fs.put(data, filename=tracked_doc['url'],
content_type=content_type,
**tracked_doc['metadata'])
def get(self, id):
return self.fs.get(id).read()

oyster/storage/s3.py (new file, 42 lines)

@ -0,0 +1,42 @@
import urllib
import boto
from oyster.conf import settings
class S3Storage(object):
storage_type = 's3'
def __init__(self, kernel):
self.kernel = kernel
self.s3conn = boto.connect_s3(settings.AWS_KEY, settings.AWS_SECRET)
self._bucket = False
@property
def bucket(self):
if not self._bucket:
self._bucket = self.s3conn.get_bucket(settings.AWS_BUCKET)
return self._bucket
def _get_opt(self, doc_class, setting, default=None):
""" doc_class first, then setting, then default """
return self.kernel.doc_classes[doc_class].get(setting,
getattr(settings, setting, default))
def put(self, tracked_doc, data, content_type):
""" upload the document to S3 """
aws_prefix = self._get_opt(tracked_doc['doc_class'], 'AWS_PREFIX', '')
aws_bucket = self._get_opt(tracked_doc['doc_class'], 'AWS_BUCKET')
k = boto.s3.key.Key(self.bucket)
key_name = aws_prefix + tracked_doc['_id']
k.key = key_name
headers = {'x-amz-acl': 'public-read',
'Content-Type': content_type}
k.set_contents_from_string(data, headers=headers)
# can also set metadata if we want, useful?
url = 'http://%s.s3.amazonaws.com/%s' % (aws_bucket, key_name)
return url
def get(self, id):
# could use get_contents_as_string, any advantages?
return urllib.urlopen(id).read()


@ -1,7 +1,7 @@
from celery.task.base import Task, PeriodicTask
from celery.execute import send_task
from oyster.client import get_configured_client
from oyster.core import kernel
class UpdateTask(Task):
@ -9,17 +9,12 @@ class UpdateTask(Task):
# results go straight to database
ignore_result = True
def __init__(self):
# one client per process
self.client = get_configured_client()
def run(self, doc_id):
doc = self.client.db.tracked.find_one({'_id': doc_id})
self.client.update(doc)
for hook in doc.get('post_update_hooks', []):
send_task(hook, (doc_id,))
self.client.db.status.update({}, {'$inc': {'update_queue': -1}})
doc = kernel.db.tracked.find_one({'_id': doc_id})
kernel.db.status.update({}, {'$inc': {'update_queue': -1}})
kernel.update(doc)
# don't sit on a connection
kernel.db.connection.end_request()
class UpdateTaskScheduler(PeriodicTask):
@ -27,16 +22,26 @@ class UpdateTaskScheduler(PeriodicTask):
# 60s tick
run_every = 60
client = get_configured_client()
ignore_result = True
def run(self):
# if the update queue isn't empty, wait to add more
# (currently the only way we avoid duplicates)
# alternate option would be to set a _queued flag on documents
if self.client.db.status.find_one()['update_queue']:
update_queue_size = kernel.db.status.find_one()['update_queue']
if update_queue_size:
self.get_logger().debug('waiting, update_queue_size={0}'.format(
update_queue_size))
return
next_set = self.client.get_update_queue()
next_set = kernel.get_update_queue()
if next_set:
self.get_logger().debug('repopulating update_queue')
else:
self.get_logger().debug('kernel.update_queue empty')
for doc in next_set:
UpdateTask.delay(doc['_id'])
self.client.db.status.update({}, {'$inc': {'update_queue': 1}})
kernel.db.status.update({}, {'$inc': {'update_queue': 1}})
# don't sit on a connection
kernel.db.connection.end_request()


@ -2,10 +2,10 @@
<html>
<head>
<title>{% block title %} {% endblock %} </title>
<link rel="stylesheet" href="/static/blueprint/screen.css" type="text/css" media="screen, projection">
<link rel="stylesheet" href="/static/blueprint/print.css" type="text/css" media="print">
<link rel="stylesheet" href="{{request.script_root}}/static/blueprint/screen.css" type="text/css" media="screen, projection">
<link rel="stylesheet" href="{{request.script_root}}/static/blueprint/print.css" type="text/css" media="print">
<!--[if lt IE 8]>
<link rel="stylesheet" href="css/blueprint/ie.css" type="text/css" media="screen, projection">
<link rel="stylesheet" href="{request.script_root}}/css/blueprint/ie.css" type="text/css" media="screen, projection">
<![endif]-->
<script src="https://ajax.googleapis.com/ajax/libs/jquery/1.6.2/jquery.min.js"></script>
<style type="text/css">


@ -5,13 +5,45 @@ oyster
{% endblock %}
{% block body %}
<script type="text/javascript">
var REFRESH = 2000;
var MAX_ROWS = 100;
setInterval(function() {
jQuery.getJSON('{{request.script_root}}/?json', function(data) {
jQuery('#tracking_val').text(data.tracking);
jQuery('#need_update_val').text(data.need_update);
var latest_link = jQuery('tr td a')[0]['href'].split('/tracked/')[1];
var new_rows = ''
for(var i=0; i < data.logs.length; ++i) {
if(latest_link == data.logs[i].url) {
break;
}
if(data.logs[i].error) {
new_rows += '<tr class="error">'
} else {
new_rows += '<tr>';
}
new_rows += '<td>' + data.logs[i].action + '</td>\n';
new_rows += '<td><a href="{{request.script_root}}/tracked/' + data.logs[i].url + '">' + data.logs[i].url + '</td>';
new_rows += '<td>' + data.logs[i].timestamp + '</td>';
if(data.logs[i].error) {
new_rows += '<td>' + data.logs[i].error + '</td></tr>';
} else {
new_rows += '<td></td></tr>';
}
}
jQuery('tr:first').after(new_rows);
jQuery('tr:gt(' + MAX_ROWS + ')').empty()
});
}, REFRESH);
</script>
<div class="span-4">
<h2>Stats</h2>
<dl>
<dt>Queue Size</dt><dd>{{queue_size}}</dd>
<dt>Tracking</dt><dd>{{tracking}}</dd>
<dt>Need Update</dt><dd>{{need_update}}</dd>
<dt>Tracking</dt><dd id="tracking_val">{{tracking}}</dd>
<dt>Need Update</dt><dd id="need_update_val">{{need_update}}</dd>
<dt>Mongo Host</dt><dd>{{mongo_host}}</dd>
</dl>
</div>


@ -1,7 +1,6 @@
<tr{% if log.error %} class="error" {% endif %}>
<td>{{log.action}}</td>
<td><a href="/tracked/{{log.url}}">{{log.url}}</td>
<td>{{log.timestamp.strftime("%Y-%m-%d %H:%M:%S")}}</td>
<td>{% if log.error %}{{log.error}}{% endif %}</td>
<tr class="{{log.levelname.lower}}">
<td>{{log.name}}</td>
<td>{{log.message}}</td>
<td>{{log.created.strftime("%Y-%m-%d %H:%M:%S")}}</td>
<td>{% if log.exc_info %}{{log.exc_info}}{% endif %}</td>
</tr>


@ -8,14 +8,14 @@ Oyster Logs
<div class="span-2">
{% if offset %}
<a class="button" href="/log/?offset={{prev_offset}}">&laquo; Prev</a>
<a class="button" href="{{request.script_root}}/log/?offset={{prev_offset}}">&laquo; Prev</a>
{% endif %}
&nbsp;
</div>
<div class="span-2 prepend-14 last">
{% if next_offset %}
<a class="button" href="/log/?offset={{next_offset}}">Next &raquo;</a>
<a class="button" href="{{request.script_root}}/log/?offset={{next_offset}}">Next &raquo;</a>
{% endif %}
</div>


@ -1,196 +0,0 @@
import time
import datetime
from unittest import TestCase
from nose.tools import assert_raises
import pymongo
from oyster.client import Client
class ClientTests(TestCase):
def setUp(self):
self.client = Client(mongo_db='oyster_test', retry_wait_minutes=1/60.)
self.client._wipe()
def test_constructor(self):
c = Client('127.0.0.1', 27017, 'testdb', mongo_log_maxsize=5000,
user_agent='test-ua', rpm=30, timeout=60,
retry_attempts=7, retry_wait_minutes=8)
assert c.db.connection.host == '127.0.0.1'
assert c.db.connection.port == 27017
assert c.db.logs.options()['capped'] == True
assert c.db.logs.options()['size'] == 5000
assert c.retry_wait_minutes == 8
# TODO: test retry_attempts
assert c.scraper.user_agent == 'test-ua'
assert c.scraper.requests_per_minute == 30
assert c.scraper.timeout == 60
def test_log(self):
self.client.log('action1', 'http://example.com')
self.client.log('action2', 'http://test.com', error=True, pi=3)
assert self.client.db.logs.count() == 2
x = self.client.db.logs.find_one({'error': True})
assert x['action'] == 'action2'
assert x['url'] == 'http://test.com'
assert x['pi'] == 3
def test_track_url(self):
# basic insert
self.client.track_url('http://example.com', update_mins=30, pi=3)
obj = self.client.db.tracked.find_one()
assert '_random' in obj
assert obj['update_mins'] == 30
assert obj['metadata'] == {'pi': 3}
# logging
log = self.client.db.logs.find_one()
assert log['action'] == 'track'
assert log['url'] == 'http://example.com'
# can't track same URL twice
assert_raises(ValueError, self.client.track_url, 'http://example.com')
# logged error
assert self.client.db.logs.find_one({'error': 'already tracked'})
def test_md5_versioning(self):
doc = {'url': 'hello.txt'}
self.client.fs.put('hello!', filename='hello.txt')
assert not self.client.md5_versioning(doc, 'hello!')
assert self.client.md5_versioning(doc, 'hey!')
def test_update(self):
# get a single document tracked
self.client.track_url('http://example.com', update_mins=60, pi=3)
obj = self.client.db.tracked.find_one()
self.client.update(obj)
# check that metadata has been updated
newobj = self.client.db.tracked.find_one()
assert (newobj['last_update'] +
datetime.timedelta(minutes=newobj['update_mins']) ==
newobj['next_update'])
first_update = newobj['last_update']
assert newobj['consecutive_errors'] == 0
# check that document exists in database
doc = self.client.fs.get_last_version()
assert doc.filename == 'http://example.com'
assert doc.content_type.startswith('text/html')
assert doc.pi == 3
# check logs
assert self.client.db.logs.find({'action': 'update'}).count() == 1
# and do an update..
self.client.update(obj)
# hopefully example.com hasn't changed, this tests that md5 worked
assert self.client.db.fs.files.count() == 1
# check that appropriate metadata updated
newobj = self.client.db.tracked.find_one()
assert first_update < newobj['last_update']
# check that logs updated
assert self.client.db.logs.find({'action': 'update'}).count() == 2
def test_update_failure(self):
# track a non-existent URL
self.client.track_url('http://not_a_url')
obj = self.client.db.tracked.find_one()
self.client.update(obj)
obj = self.client.db.tracked.find_one()
assert obj['consecutive_errors'] == 1
# we should have logged an error too
assert self.client.db.logs.find({'action': 'update',
'error': {'$ne': False}}).count() == 1
# update again
self.client.update(obj)
obj = self.client.db.tracked.find_one()
assert obj['consecutive_errors'] == 2
def test_all_versions(self):
random_url = 'http://en.wikipedia.org/wiki/Special:Random'
self.client.track_url(random_url)
obj = self.client.db.tracked.find_one()
self.client.update(obj)
versions = self.client.get_all_versions(random_url)
assert versions[0].filename == random_url
self.client.update(obj)
assert len(self.client.get_all_versions(random_url)) == 2
def test_get_update_queue(self):
self.client.track_url('never-updates', update_mins=0.01)
self.client.track_url('fake-1', update_mins=0.01)
self.client.track_url('fake-2', update_mins=0.01)
self.client.track_url('fake-3', update_mins=0.01)
never = self.client.db.tracked.find_one(dict(url='never-updates'))
fake1 = self.client.db.tracked.find_one(dict(url='fake-1'))
fake2 = self.client.db.tracked.find_one(dict(url='fake-2'))
fake3 = self.client.db.tracked.find_one(dict(url='fake-3'))
# 4 in queue, ordered by random
queue = self.client.get_update_queue()
assert len(queue) == 4
assert queue[0]['_random'] < queue[1]['_random'] < queue[2]['_random']
# update a few
self.client.update(fake1)
self.client.update(fake2)
self.client.update(fake3)
# queue should only have never in it
queue = self.client.get_update_queue()
assert len(queue) == 1
# wait for time to pass
time.sleep(1)
# queue should be full, but start with the un-updated one
queue = self.client.get_update_queue()
assert len(queue) == 4
assert queue[0]['_id'] == never['_id']
def test_get_update_queue_size(self):
self.client.track_url('a', update_mins=0.01)
self.client.track_url('b', update_mins=0.01)
self.client.track_url('c', update_mins=0.01)
a = self.client.db.tracked.find_one(dict(url='a'))
b = self.client.db.tracked.find_one(dict(url='b'))
c = self.client.db.tracked.find_one(dict(url='c'))
assert self.client.get_update_queue_size() == 3
self.client.update(a)
assert self.client.get_update_queue_size() == 2
self.client.update(b)
self.client.update(c)
assert self.client.get_update_queue_size() == 0
# wait
time.sleep(1)
assert self.client.get_update_queue_size() == 3

oyster/tests/test_kernel.py (new file, 212 lines)

@ -0,0 +1,212 @@
import time
import datetime
from unittest import TestCase
from nose.tools import assert_raises, assert_equal
from oyster.core import Kernel
def hook_fired(doc, newdata):
doc['hook_fired'] = doc.get('hook_fired', 0) + 1
RANDOM_URL = ('http://www.random.org/integers/?num=1&min=-1000000000&'
'max=1000000000&col=1&base=10&format=plain&rnd=new')
class KernelTests(TestCase):
def setUp(self):
doc_classes = {'default':
# omit doc class, defaults to dummy
{'update_mins': 30, 'onchanged': [] },
'fast-update':
{'update_mins': 1 / 60., 'storage_engine': 'dummy',
'onchanged': []
},
'one-time':
{'update_mins': None, 'storage_engine': 'dummy',
'onchanged': [],
},
'change-hook':
{'update_mins': 30, 'storage_engine': 'dummy',
'onchanged': [hook_fired]
}
}
self.kernel = Kernel(mongo_db='oyster_test',
retry_wait_minutes=1 / 60.,
doc_classes=doc_classes)
self.kernel._wipe()
def test_constructor(self):
c = Kernel('127.0.0.1', 27017, 'testdb', mongo_log_maxsize=5000,
user_agent='test-ua', rpm=30, timeout=60,
retry_attempts=7, retry_wait_minutes=8)
assert c.db.connection.host == '127.0.0.1'
assert c.db.connection.port == 27017
assert c.retry_wait_minutes == 8
# TODO: test retry_attempts
assert c.scraper.user_agent == 'test-ua'
assert c.scraper.requests_per_minute == 30
assert c.scraper.timeout == 60
# ensure that a bad document class raises an error
assert_raises(ValueError, Kernel, doc_classes={'bad-doc': {}})
def test_track_url(self):
# basic insert
id1 = self.kernel.track_url('http://example.com', 'default', pi=3)
obj = self.kernel.db.tracked.find_one()
assert '_random' in obj
assert obj['doc_class'] == 'default'
assert obj['metadata'] == {'pi': 3}
assert obj['versions'] == []
# track same url again with same metadata returns id
id2 = self.kernel.track_url('http://example.com', 'default', pi=3)
assert id1 == id2
# test manually set id
out = self.kernel.track_url('http://example.com/2', 'default',
'fixed-id')
assert out == 'fixed-id'
# can't pass track same id twice with different url
assert_raises(ValueError, self.kernel.track_url,
'http://example.com/3', 'default', 'fixed-id')
# ... or different doc class
assert_raises(ValueError, self.kernel.track_url,
'http://example.com/2', 'change-hook', 'fixed-id')
# different metadata is ok, but it should be updated
self.kernel.track_url('http://example.com/2', 'default', 'fixed-id',
pi=3)
self.kernel.db.tracked.find_one({'_id': 'fixed-id'})['metadata']['pi'] == 3
def test_no_update(self):
# update
self.kernel.track_url('http://example.com', 'one-time')
obj = self.kernel.db.tracked.find_one()
self.kernel.update(obj)
newobj = self.kernel.db.tracked.find_one()
assert newobj['next_update'] == None
assert self.kernel.get_update_queue() == []
assert self.kernel.get_update_queue_size() == 0
def test_md5_versioning(self):
assert not self.kernel.md5_versioning('hello!', 'hello!')
assert self.kernel.md5_versioning('hello!', 'hey!')
def test_update(self):
# get a single document tracked and call update on it
self.kernel.track_url('http://example.com', 'default')
obj = self.kernel.db.tracked.find_one()
self.kernel.update(obj)
# check that the metadata has been updated
newobj = self.kernel.db.tracked.find_one()
assert (newobj['last_update'] + datetime.timedelta(minutes=30) ==
newobj['next_update'])
first_update = newobj['last_update']
assert newobj['consecutive_errors'] == 0
assert len(newobj['versions']) == 1
# and do another update..
self.kernel.update(obj)
# hopefully example.com hasn't changed, this tests that md5 worked
assert len(newobj['versions']) == 1
# check that appropriate metadata updated
newobj = self.kernel.db.tracked.find_one()
assert first_update < newobj['last_update']
def test_update_failure(self):
# track a non-existent URL
self.kernel.track_url('http://not_a_url', 'default')
obj = self.kernel.db.tracked.find_one()
self.kernel.update(obj)
obj = self.kernel.db.tracked.find_one()
assert obj['consecutive_errors'] == 1
# update again
self.kernel.update(obj)
obj = self.kernel.db.tracked.find_one()
assert obj['consecutive_errors'] == 2
#def test_update_onchanged_fire_only_on_change(self):
# self.kernel.track_url('http://example.com', 'change-hook')
# obj = self.kernel.db.tracked.find_one()
# self.kernel.update(obj)
# doc = self.kernel.db.tracked.find_one()
# assert doc['hook_fired'] == 1
# # again, we rely on example.com not updating
# self.kernel.update(obj)
# doc = self.kernel.db.tracked.find_one()
# assert doc['hook_fired'] == 1
#def test_update_onchanged_fire_again_on_change(self):
# self.kernel.track_url(RANDOM_URL, 'change-hook')
# obj = self.kernel.db.tracked.find_one()
# self.kernel.update(obj)
# doc = self.kernel.db.tracked.find_one()
# assert doc['hook_fired'] == 1
# # we rely on this URL updating
# self.kernel.update(obj)
# doc = self.kernel.db.tracked.find_one()
# assert doc['hook_fired'] == 2
def test_get_update_queue(self):
self.kernel.track_url('never-updates', 'fast-update')
self.kernel.track_url('bad-uri', 'fast-update')
self.kernel.track_url('http://example.com', 'fast-update')
never = self.kernel.db.tracked.find_one(dict(url='never-updates'))
bad = self.kernel.db.tracked.find_one(dict(url='bad-uri'))
good = self.kernel.db.tracked.find_one(dict(url='http://example.com'))
# 3 in queue, ordered by random
queue = self.kernel.get_update_queue()
assert len(queue) == 3
assert queue[0]['_random'] < queue[1]['_random'] < queue[2]['_random']
# try and update bad & good
self.kernel.update(bad)
self.kernel.update(good)
# queue should only have never in it
queue = self.kernel.get_update_queue()
assert queue[0]['_id'] == never['_id']
# wait for time to pass so queue should be full
time.sleep(1)
queue = self.kernel.get_update_queue()
assert len(queue) == 3
def test_get_update_queue_size(self):
self.kernel.track_url('a', 'fast-update')
self.kernel.track_url('b', 'fast-update')
self.kernel.track_url('c', 'fast-update')
a = self.kernel.db.tracked.find_one(dict(url='a'))
# size should start at 3
assert self.kernel.get_update_queue_size() == 3
# goes down one
self.kernel.update(a)
assert self.kernel.get_update_queue_size() == 2
# wait for it to go back to 3
time.sleep(1)
assert self.kernel.get_update_queue_size() == 3


@ -0,0 +1,53 @@
import unittest
import logging
import datetime
import pymongo
from ..mongolog import MongoHandler
class TestMongoLog(unittest.TestCase):
DB_NAME = 'oyster_test'
def setUp(self):
pymongo.Connection().drop_database(self.DB_NAME)
self.log = logging.getLogger('mongotest')
self.log.setLevel(logging.DEBUG)
self.logs = pymongo.Connection()[self.DB_NAME]['logs']
# clear handlers upon each setup
self.log.handlers = []
# async = False for testing
self.log.addHandler(MongoHandler(self.DB_NAME, capped_size=4000,
async=False))
def tearDown(self):
pymongo.Connection().drop_database(self.DB_NAME)
def test_basic_write(self):
self.log.debug('test')
self.assertEqual(self.logs.count(), 1)
self.log.debug('test')
self.assertEqual(self.logs.count(), 2)
# capped_size will limit these
self.log.debug('test'*200)
self.log.debug('test'*200)
self.assertEqual(self.logs.count(), 1)
def test_attributes(self):
self.log.debug('pi=%s', 3.14, extra={'pie':'pizza'})
logged = self.logs.find_one()
self.assertEqual(logged['message'], 'pi=3.14')
self.assertTrue(isinstance(logged['created'], datetime.datetime))
self.assertTrue('host' in logged)
self.assertEqual(logged['name'], 'mongotest')
self.assertEqual(logged['levelname'], 'DEBUG')
self.assertEqual(logged['pie'], 'pizza')
# and exc_info
try:
raise Exception('error!')
except:
self.log.warning('oh no!', exc_info=True)
logged = self.logs.find_one(sort=[('$natural', -1)])
self.assertEqual(logged['levelname'], 'WARNING')
self.assertTrue('error!' in logged['exc_info'])


@ -0,0 +1,37 @@
from nose.plugins.skip import SkipTest
from oyster.conf import settings
from oyster.core import Kernel
from oyster.storage.gridfs import GridFSStorage
from oyster.storage.dummy import DummyStorage
def _simple_storage_test(StorageCls):
kernel = Kernel(mongo_db='oyster_test')
kernel.doc_classes['default'] = {}
storage = StorageCls(kernel)
# ensure the class has a storage_type attribute
assert hasattr(storage, 'storage_type')
doc = {'_id': 'aabbccddeeff', 'url': 'http://localhost:8000/#test',
'doc_class': 'default', 'metadata': {} }
storage_id = storage.put(doc, 'hello oyster', 'text/plain')
assert storage_id
assert storage.get(storage_id) == 'hello oyster'
def test_s3():
if not hasattr(settings, 'AWS_BUCKET'):
raise SkipTest('S3 not configured')
from oyster.storage.s3 import S3Storage
_simple_storage_test(S3Storage)
def test_gridfs():
_simple_storage_test(GridFSStorage)
def test_dummy():
_simple_storage_test(DummyStorage)


@ -1,23 +1,33 @@
import re
import json
import datetime
import functools
import flask
import pymongo.objectid
import bson.objectid
from oyster.client import get_configured_client
from oyster.conf import settings
from oyster.core import kernel
class JSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime.datetime):
return obj.isoformat()
elif isinstance(obj, pymongo.objectid.ObjectId):
elif isinstance(obj, bson.objectid.ObjectId):
return str(obj)
else:
return super(JSONEncoder, self).default(obj)
def _path_fixer(url):
""" this exists because werkzeug seems to collapse // into / sometimes
certainly a hack, but given that werkzeug seems to only do the mangling
*sometimes* being a bit aggressive was the only viable option
"""
return re.sub(r'(http|https|ftp):/([^/])', r'\1://\2', url)
def api_wrapper(template=None):
def wrapper(func):
@functools.wraps(func)
@ -33,26 +43,16 @@ def api_wrapper(template=None):
app = flask.Flask('oyster')
client = get_configured_client()
@app.route('/')
@api_wrapper('index.html')
def index():
status = {
'tracking': client.db.tracked.count(),
'need_update': client.get_update_queue_size(),
'logs': client.db.logs.find().sort('$natural', -1).limit(20)
}
return status
@app.route('/status/')
@api_wrapper()
def doc_list():
status = {
'tracking': client.db.tracked.count(),
'need_update': client.get_update_queue_size(),
'tracking': kernel.db.tracked.count(),
'need_update': kernel.get_update_queue_size(),
'logs': list(kernel.db.logs.find().sort('$natural', -1).limit(100)),
'mongo_host': settings.MONGO_HOST,
}
return status
@ -64,7 +64,7 @@ def log_view():
size = 100
prev_offset = max(offset - size, 0)
next_offset = offset + size
logs = client.db.logs.find().sort('$natural', -1).skip(offset).limit(size)
logs = kernel.db.logs.find().sort('$natural', -1).skip(offset).limit(size)
return dict(logs=list(logs), prev_offset=prev_offset,
next_offset=next_offset, offset=offset)
@ -72,25 +72,15 @@ def log_view():
@app.route('/tracked/')
@api_wrapper()
def tracked():
tracked = list(client.db.tracked.find())
tracked = list(kernel.db.tracked.find())
return json.dumps(tracked, cls=JSONEncoder)
@app.route('/tracked/<path:url>')
def tracked_view(url):
doc = client.db.tracked.find_one({'url': url})
@app.route('/tracked/<id>')
def tracked_view(id):
doc = kernel.db.tracked.find_one({'_id': id})
return json.dumps(doc, cls=JSONEncoder)
@app.route('/doc/<path:url>/<version>')
def show_doc(url, version):
if version == 'latest':
version = -1
doc = client.get_version(url, version)
resp = flask.make_response(doc.read())
resp.headers['content-type'] = doc.content_type
return resp
if __name__ == '__main__':
app.run(debug=True)


@ -1,5 +1,5 @@
scrapelib
pymongo>=1.11
pymongo>=2.0
flask
nose
celery
celery==2.5.3


@ -1,18 +1,28 @@
#!/usr/bin/env python
import os
from setuptools import setup
from oyster import __version__
# Hack to prevent stupid "TypeError: 'NoneType' object is not callable" error
# in multiprocessing/util.py _exit_function when running `python
# setup.py test` (see
# http://www.eby-sarna.com/pipermail/peak/2010-May/003357.html)
try:
import multiprocessing
except ImportError:
pass
long_description = open('README.rst').read()
setup(name="oyster",
version=__version__,
version='0.4.0-dev',
py_modules=['oyster'],
author="James Turk",
author_email='jturk@sunlightfoundation.com',
license="BSD",
url="http://github.com/sunlightlabs/oyster/",
long_description=long_description,
description="a library for scraping things",
description="a proactive document cache",
platforms=["any"],
classifiers=["Development Status :: 4 - Beta",
"Intended Audience :: Developers",
@ -21,8 +31,10 @@ setup(name="oyster",
"Operating System :: OS Independent",
"Programming Language :: Python",
],
install_requires=["httplib2 >= 0.6.0", "scrapelib >= 0.5.4",
install_requires=["httplib2 >= 0.6.0", "scrapelib >= 0.7.2",
"pymongo >= 1.11", "flask", "celery"],
tests_require=["nose"],
test_suite='nose.collector',
entry_points="""
[console_scripts]
scrapeshell = scrapelib:scrapeshell

tox.ini (new file, 10 lines)

@ -0,0 +1,10 @@
# Tox (http://codespeak.net/~hpk/tox/) is a tool for running tests
# in multiple virtualenvs. This configuration file will run the
# test suite on all supported python versions. To use it, "pip install tox"
# and then run "tox" from this directory.
[tox]
envlist = py26, py27, pypy
[testenv]
commands = python setup.py test