use python logging
commit 31435df760 (parent 2846b044d2)
@@ -1,4 +1,5 @@
 import datetime
+import logging
 import hashlib
 import random
 import sys
@@ -6,6 +7,7 @@ import sys
 import pymongo
 import scrapelib
 
+from .mongolog import MongoHandler
 from .storage import engines
 from celery.execute import send_task
 
@@ -23,14 +25,14 @@ class Kernel(object):
         configurable for ease of testing, only one should be instantiated
         """
 
-        # set up a capped log if it doesn't exist
+        # set up the log
         self.db = pymongo.Connection(mongo_host, mongo_port)[mongo_db]
-        try:
-            self.db.create_collection('logs', capped=True,
-                                      size=mongo_log_maxsize)
-        except pymongo.errors.CollectionInvalid:
-            # cap collection if not capped?
-            pass
+        self.log = logging.getLogger('oyster')
+        self.log.setLevel(logging.DEBUG)
+        self.log.addHandler(MongoHandler(mongo_db, host=mongo_host,
+                                         port=mongo_port,
+                                         capped_size=mongo_log_maxsize))
 
         # create status document if it doesn't exist
        if self.db.status.count() == 0:
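The __init__ hunk above wires the 'oyster' logger to a MongoHandler imported from .mongolog, a module that is not included in this diff. As a rough sketch of what such a handler involves — the class name, collection name, and record fields below are assumptions, not the project's actual mongolog code — a capped-collection logging handler looks roughly like this:

    # Sketch only: a logging.Handler that writes records into a capped
    # MongoDB collection, mirroring the constructor call in the hunk above.
    import datetime
    import logging

    import pymongo


    class CappedMongoHandler(logging.Handler):
        def __init__(self, db_name, host='localhost', port=27017,
                     capped_size=100000000, collection='logs'):
            logging.Handler.__init__(self)
            db = pymongo.Connection(host, port)[db_name]
            try:
                # a capped collection keeps the log bounded, as before
                db.create_collection(collection, capped=True, size=capped_size)
            except pymongo.errors.CollectionInvalid:
                pass  # collection already exists
            self.collection = db[collection]

        def emit(self, record):
            self.collection.insert({
                'name': record.name,
                'level': record.levelname,
                'message': record.getMessage(),  # applies %-style arguments
                'timestamp': datetime.datetime.utcnow(),
            })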
@@ -72,14 +74,6 @@ class Kernel(object):
         self.db.drop_collection('logs')
         self.db.drop_collection('status')
 
-    def log(self, action, doc_id, error=False, **kwargs):
-        """ add an entry to the oyster log """
-        kwargs['action'] = action
-        kwargs['doc_id'] = doc_id
-        kwargs['error'] = error
-        kwargs['timestamp'] = datetime.datetime.utcnow()
-        self.db.logs.insert(kwargs)
-
     def _add_doc_class(self, doc_class, **properties):
         self.doc_classes[doc_class] = properties
 
@@ -95,9 +89,9 @@ class Kernel(object):
         any keyword args will be added to the document's metadata
         """
         if doc_class not in self.doc_classes:
-            error = 'unregistered doc_class %s' % doc_class
-            self.log('track', id, url=url, error=error)
-            raise ValueError(error)
+            error = 'error tracking %s, unregistered doc_class %s'
+            self.log.error(error, url, doc_class)
+            raise ValueError(error % (url, doc_class))
 
         # try and find an existing version of this document
         tracked = None
@@ -118,15 +112,14 @@ class Kernel(object):
                 return tracked['_id']
             else:
                 # id existed but with different URL
-                error = ('%s already exists with different data (tracked: '
-                         '%s, %s) (new: %s, %s)' % (tracked['_id'],
-                                                    tracked['url'],
-                                                    tracked['doc_class'],
-                                                    url, doc_class))
-                self.log('track', id, url=url, error=error)
-                raise ValueError(error)
+                message = ('%s already exists with different data (tracked: '
+                           '%s, %s) (new: %s, %s)')
+                args = (tracked['_id'], tracked['url'], tracked['doc_class'],
+                        url, doc_class)
+                self.log.error(message, *args)
+                raise ValueError(message % args)
 
-        self.log('track', id, url=url)
+        self.log.info('tracked %s [%s]', url, id)
 
         newdoc = dict(url=url, doc_class=doc_class,
                       _random=random.randint(0, sys.maxint),
@@ -209,8 +202,11 @@ class Kernel(object):
         else:
             doc['next_update'] = None
 
-        self.log('update', doc['_id'], url=url, new_doc=new_version,
-                 error=error)
+        if error:
+            self.log.warning('error updating %s [%s]', url, doc['_id'])
+        else:
+            new_version = ' (new)'
+            self.log.info('updated %s [%s]%s', url, doc['_id'], new_version)
 
         self.db.tracked.save(doc, safe=True)
 
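Note that the new calls hand format arguments to the logger (self.log.error(message, *args)) rather than pre-building the string, so %-formatting is deferred until a handler actually emits the record, while the ValueError still formats the same template eagerly. A minimal illustration of the two forms, with hypothetical values:

    import logging

    log = logging.getLogger('oyster')

    message = ('%s already exists with different data (tracked: '
               '%s, %s) (new: %s, %s)')
    args = ('some-id', 'http://old.example.com', 'default',   # hypothetical values
            'http://new.example.com', 'change-hook')

    log.error(message, *args)         # %-formatting applied lazily by logging
    raise ValueError(message % args)  # same template formatted eagerly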
@@ -1,3 +1,4 @@
+import logging
 from celery.task.base import Task
 
 from ..core import kernel
@@ -10,7 +11,7 @@ es = ES(settings.ELASTICSEARCH_HOST)
 class ElasticSearchPush(Task):
     # results go straight to elasticsearch
     ignore_result = True
-    action = 'elasticsearch'
+    log = logging.getLogger('oyster.ext.elasticsearch')
 
     def run(self, doc_id):
         doc = kernel.db.tracked.find_one({'_id': doc_id})
@@ -18,12 +19,15 @@ class ElasticSearchPush(Task):
         try:
             text = kernel.extract_text(doc)
 
-            kernel.log(self.action, doc_id, error=False)
+            self.log.info('tracked %s', doc_id,
+                          extra={'doc_class':doc['doc_class']})
 
             es.index(dict(doc['metadata'], text=text),
                      settings.ELASTICSEARCH_INDEX,
                      settings.ELASTICSEARCH_DOC_TYPE,
                      id=doc_id)
         except Exception as e:
-            kernel.log(self.action, doc_id, error=True, exception=str(e))
+            self.log.warning('error tracking %s', doc_id,
+                             extra={'doc_class':doc['doc_class']},
+                             exc_info=True)
             raise
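Instead of writing its own log documents, the task now passes the document class via extra= and the traceback via exc_info=True; both end up as attributes on the LogRecord that any handler can read. A small sketch of how a handler might surface them (the handler class and field names here are illustrative, not part of this commit):

    import logging


    class DocClassHandler(logging.Handler):
        def emit(self, record):
            entry = {
                'message': record.getMessage(),
                # extra={'doc_class': ...} becomes a plain record attribute
                'doc_class': getattr(record, 'doc_class', None),
            }
            if record.exc_info:
                # exc_info=True carries (type, value, traceback)
                entry['traceback'] = logging.Formatter().formatException(record.exc_info)
            print(entry)


    log = logging.getLogger('oyster.ext.elasticsearch')
    log.addHandler(DocClassHandler())
    try:
        raise RuntimeError('boom')  # stand-in for a failed index call
    except RuntimeError:
        log.warning('error tracking %s', 'some-doc-id',
                    extra={'doc_class': 'default'}, exc_info=True)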
@@ -26,7 +26,7 @@ class S3Storage(object):
         """ upload the document to S3 """
         aws_prefix = self._get_opt(tracked_doc['doc_class'], 'AWS_PREFIX', '')
         aws_bucket = self._get_opt(tracked_doc['doc_class'], 'AWS_BUCKET')
-        k = boto.s3.key.Key(aws_bucket)
+        k = boto.s3.key.Key(self.bucket)
         key_name = aws_prefix + tracked_doc['_id']
         k.key = key_name
         headers = {'x-amz-acl': 'public-read',
@@ -2,7 +2,7 @@ import time
 import datetime
 from unittest import TestCase
 
-from nose.tools import assert_raises
+from nose.tools import assert_raises, assert_equal
 
 from oyster.core import Kernel
 
@@ -44,8 +44,6 @@ class KernelTests(TestCase):
                    retry_attempts=7, retry_wait_minutes=8)
         assert c.db.connection.host == '127.0.0.1'
         assert c.db.connection.port == 27017
-        assert c.db.logs.options()['capped'] == True
-        assert c.db.logs.options()['size'] == 5000
         assert c.retry_wait_minutes == 8
         # TODO: test retry_attempts
         assert c.scraper.user_agent == 'test-ua'
@@ -55,15 +53,6 @@ class KernelTests(TestCase):
         # ensure that a bad document class raises an error
         assert_raises(ValueError, Kernel, doc_classes={'bad-doc': {}})
 
-    def test_log(self):
-        self.kernel.log('action1', 'example')
-        self.kernel.log('action2', 'test', error=True, pi=3)
-        assert self.kernel.db.logs.count() == 2
-        x = self.kernel.db.logs.find_one({'error': True})
-        assert x['action'] == 'action2'
-        assert x['doc_id'] == 'test'
-        assert x['pi'] == 3
-
     def test_track_url(self):
         # basic insert
         id1 = self.kernel.track_url('http://example.com', 'default', pi=3)
@@ -73,11 +62,6 @@ class KernelTests(TestCase):
         assert obj['metadata'] == {'pi': 3}
         assert obj['versions'] == []
 
-        # logging
-        log = self.kernel.db.logs.find_one()
-        assert log['action'] == 'track'
-        assert log['url'] == 'http://example.com'
-
         # track same url again with same metadata returns id
         id2 = self.kernel.track_url('http://example.com', 'default', pi=3)
         assert id1 == id2
@@ -88,16 +72,12 @@ class KernelTests(TestCase):
         assert out == 'fixed-id'
 
         # can't pass track same id twice with different url
-        self.kernel.db.logs.drop()
         assert_raises(ValueError, self.kernel.track_url,
                       'http://example.com/3', 'default', 'fixed-id')
-        assert 'already exists' in self.kernel.db.logs.find_one()['error']
 
         # ... or different doc class
-        self.kernel.db.logs.drop()
         assert_raises(ValueError, self.kernel.track_url,
                       'http://example.com/2', 'change-hook', 'fixed-id')
-        assert 'already exists' in self.kernel.db.logs.find_one()['error']
 
         # different metadata is ok, but it should be updated
         self.kernel.track_url('http://example.com/2', 'default', 'fixed-id',
@@ -135,9 +115,6 @@ class KernelTests(TestCase):
 
         assert len(newobj['versions']) == 1
 
-        # check logs
-        assert self.kernel.db.logs.find({'action': 'update'}).count() == 1
-
         # and do another update..
         self.kernel.update(obj)
 
@@ -148,9 +125,6 @@ class KernelTests(TestCase):
         newobj = self.kernel.db.tracked.find_one()
         assert first_update < newobj['last_update']
 
-        # check that logs updated
-        assert self.kernel.db.logs.find({'action': 'update'}).count() == 2
-
     def test_update_failure(self):
         # track a non-existent URL
         self.kernel.track_url('http://not_a_url', 'default')
@@ -160,10 +134,6 @@ class KernelTests(TestCase):
         obj = self.kernel.db.tracked.find_one()
         assert obj['consecutive_errors'] == 1
 
-        # we should have logged an error too
-        assert self.kernel.db.logs.find({'action': 'update',
-                                         'error': {'$ne': False}}).count() == 1
-
         # update again
         self.kernel.update(obj)
 
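The removed assertions checked the old capped logs collection and have no direct replacement in this commit. If the tests ever need to verify log output again, one option — purely a sketch, not code from this repository — is to attach a throwaway handler to the 'oyster' logger and assert against the captured records:

    import logging


    class ListHandler(logging.Handler):
        """Collect records in memory so tests can assert on them."""

        def __init__(self):
            logging.Handler.__init__(self)
            self.records = []

        def emit(self, record):
            self.records.append(record)


    handler = ListHandler()
    logging.getLogger('oyster').addHandler(handler)

    # ... exercise kernel.track_url(...) / kernel.update(...) here ...

    assert any('already exists' in r.getMessage() for r in handler.records)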