diff --git a/oyster/core.py b/oyster/core.py index dbd20d0..99b1c2a 100644 --- a/oyster/core.py +++ b/oyster/core.py @@ -1,4 +1,5 @@ import datetime +import logging import hashlib import random import sys @@ -6,6 +7,7 @@ import sys import pymongo import scrapelib +from .mongolog import MongoHandler from .storage import engines from celery.execute import send_task @@ -23,14 +25,14 @@ class Kernel(object): configurable for ease of testing, only one should be instantiated """ - # set up a capped log if it doesn't exist + # set up the log self.db = pymongo.Connection(mongo_host, mongo_port)[mongo_db] - try: - self.db.create_collection('logs', capped=True, - size=mongo_log_maxsize) - except pymongo.errors.CollectionInvalid: - # cap collection if not capped? - pass + + self.log = logging.getLogger('oyster') + self.log.setLevel(logging.DEBUG) + self.log.addHandler(MongoHandler(mongo_db, host=mongo_host, + port=mongo_port, + capped_size=mongo_log_maxsize)) # create status document if it doesn't exist if self.db.status.count() == 0: @@ -72,14 +74,6 @@ class Kernel(object): self.db.drop_collection('logs') self.db.drop_collection('status') - def log(self, action, doc_id, error=False, **kwargs): - """ add an entry to the oyster log """ - kwargs['action'] = action - kwargs['doc_id'] = doc_id - kwargs['error'] = error - kwargs['timestamp'] = datetime.datetime.utcnow() - self.db.logs.insert(kwargs) - def _add_doc_class(self, doc_class, **properties): self.doc_classes[doc_class] = properties @@ -95,9 +89,9 @@ class Kernel(object): any keyword args will be added to the document's metadata """ if doc_class not in self.doc_classes: - error = 'unregistered doc_class %s' % doc_class - self.log('track', id, url=url, error=error) - raise ValueError(error) + error = 'error tracking %s, unregistered doc_class %s' + self.log.error(error, url, doc_class) + raise ValueError(error % (url, doc_class)) # try and find an existing version of this document tracked = None @@ -118,15 +112,14 @@ class Kernel(object): return tracked['_id'] else: # id existed but with different URL - error = ('%s already exists with different data (tracked: ' - '%s, %s) (new: %s, %s)' % (tracked['_id'], - tracked['url'], - tracked['doc_class'], - url, doc_class)) - self.log('track', id, url=url, error=error) - raise ValueError(error) + message = ('%s already exists with different data (tracked: ' + '%s, %s) (new: %s, %s)') + args = (tracked['_id'], tracked['url'], tracked['doc_class'], + url, doc_class) + self.log.error(message, *args) + raise ValueError(message % args) - self.log('track', id, url=url) + self.log.info('tracked %s [%s]', url, id) newdoc = dict(url=url, doc_class=doc_class, _random=random.randint(0, sys.maxint), @@ -209,8 +202,11 @@ class Kernel(object): else: doc['next_update'] = None - self.log('update', doc['_id'], url=url, new_doc=new_version, - error=error) + if error: + self.log.warning('error updating %s [%s]', url, doc['_id']) + else: + new_version = ' (new)' + self.log.info('updated %s [%s]%s', url, doc['_id'], new_version) self.db.tracked.save(doc, safe=True) diff --git a/oyster/ext/elasticsearch.py b/oyster/ext/elasticsearch.py index 44a7b87..ede9924 100644 --- a/oyster/ext/elasticsearch.py +++ b/oyster/ext/elasticsearch.py @@ -1,3 +1,4 @@ +import logging from celery.task.base import Task from ..core import kernel @@ -10,7 +11,7 @@ es = ES(settings.ELASTICSEARCH_HOST) class ElasticSearchPush(Task): # results go straight to elasticsearch ignore_result = True - action = 'elasticsearch' + log = logging.getLogger('oyster.ext.elasticsearch') def run(self, doc_id): doc = kernel.db.tracked.find_one({'_id': doc_id}) @@ -18,12 +19,15 @@ class ElasticSearchPush(Task): try: text = kernel.extract_text(doc) - kernel.log(self.action, doc_id, error=False) + self.log.info('tracked %s', doc_id, + extra={'doc_class':doc['doc_class']}) es.index(dict(doc['metadata'], text=text), settings.ELASTICSEARCH_INDEX, settings.ELASTICSEARCH_DOC_TYPE, id=doc_id) except Exception as e: - kernel.log(self.action, doc_id, error=True, exception=str(e)) + self.log.warning('error tracking %s', doc_id, + extra={'doc_class':doc['doc_class']}, + exc_info=True) raise diff --git a/oyster/storage/s3.py b/oyster/storage/s3.py index f114b45..414bac7 100644 --- a/oyster/storage/s3.py +++ b/oyster/storage/s3.py @@ -26,7 +26,7 @@ class S3Storage(object): """ upload the document to S3 """ aws_prefix = self._get_opt(tracked_doc['doc_class'], 'AWS_PREFIX', '') aws_bucket = self._get_opt(tracked_doc['doc_class'], 'AWS_BUCKET') - k = boto.s3.key.Key(aws_bucket) + k = boto.s3.key.Key(self.bucket) key_name = aws_prefix + tracked_doc['_id'] k.key = key_name headers = {'x-amz-acl': 'public-read', diff --git a/oyster/tests/test_kernel.py b/oyster/tests/test_kernel.py index ab5a385..631946c 100644 --- a/oyster/tests/test_kernel.py +++ b/oyster/tests/test_kernel.py @@ -2,7 +2,7 @@ import time import datetime from unittest import TestCase -from nose.tools import assert_raises +from nose.tools import assert_raises, assert_equal from oyster.core import Kernel @@ -44,8 +44,6 @@ class KernelTests(TestCase): retry_attempts=7, retry_wait_minutes=8) assert c.db.connection.host == '127.0.0.1' assert c.db.connection.port == 27017 - assert c.db.logs.options()['capped'] == True - assert c.db.logs.options()['size'] == 5000 assert c.retry_wait_minutes == 8 # TODO: test retry_attempts assert c.scraper.user_agent == 'test-ua' @@ -55,15 +53,6 @@ class KernelTests(TestCase): # ensure that a bad document class raises an error assert_raises(ValueError, Kernel, doc_classes={'bad-doc': {}}) - def test_log(self): - self.kernel.log('action1', 'example') - self.kernel.log('action2', 'test', error=True, pi=3) - assert self.kernel.db.logs.count() == 2 - x = self.kernel.db.logs.find_one({'error': True}) - assert x['action'] == 'action2' - assert x['doc_id'] == 'test' - assert x['pi'] == 3 - def test_track_url(self): # basic insert id1 = self.kernel.track_url('http://example.com', 'default', pi=3) @@ -73,11 +62,6 @@ class KernelTests(TestCase): assert obj['metadata'] == {'pi': 3} assert obj['versions'] == [] - # logging - log = self.kernel.db.logs.find_one() - assert log['action'] == 'track' - assert log['url'] == 'http://example.com' - # track same url again with same metadata returns id id2 = self.kernel.track_url('http://example.com', 'default', pi=3) assert id1 == id2 @@ -88,16 +72,12 @@ class KernelTests(TestCase): assert out == 'fixed-id' # can't pass track same id twice with different url - self.kernel.db.logs.drop() assert_raises(ValueError, self.kernel.track_url, 'http://example.com/3', 'default', 'fixed-id') - assert 'already exists' in self.kernel.db.logs.find_one()['error'] # ... or different doc class - self.kernel.db.logs.drop() assert_raises(ValueError, self.kernel.track_url, 'http://example.com/2', 'change-hook', 'fixed-id') - assert 'already exists' in self.kernel.db.logs.find_one()['error'] # different metadata is ok, but it should be updated self.kernel.track_url('http://example.com/2', 'default', 'fixed-id', @@ -135,9 +115,6 @@ class KernelTests(TestCase): assert len(newobj['versions']) == 1 - # check logs - assert self.kernel.db.logs.find({'action': 'update'}).count() == 1 - # and do another update.. self.kernel.update(obj) @@ -148,9 +125,6 @@ class KernelTests(TestCase): newobj = self.kernel.db.tracked.find_one() assert first_update < newobj['last_update'] - # check that logs updated - assert self.kernel.db.logs.find({'action': 'update'}).count() == 2 - def test_update_failure(self): # track a non-existent URL self.kernel.track_url('http://not_a_url', 'default') @@ -160,10 +134,6 @@ class KernelTests(TestCase): obj = self.kernel.db.tracked.find_one() assert obj['consecutive_errors'] == 1 - # we should have logged an error too - assert self.kernel.db.logs.find({'action': 'update', - 'error': {'$ne': False}}).count() == 1 - # update again self.kernel.update(obj)