move update logic into client, will allow update strategies

James Turk 2011-07-28 16:11:32 -04:00
parent 4752e0a7ce
commit 454130f5ea
2 changed files with 38 additions and 23 deletions
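
With this change the worker no longer fetches documents itself; the client owns fetching, versioning, and scheduling, which is what makes pluggable update strategies possible. In caller terms, the old two-step sequence

    data = urllib.urlopen(task['url']).read()
    client.add_version(task, data)

becomes a single call:

    client.update(task)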

@@ -15,13 +15,16 @@ class UpdateProcess(multiprocessing.Process):
     def run(self):
         while True:
             task = self.task_q.get()
+
+            # break on 'None' poison pill
             if task is None:
                 self.task_q.task_done()
                 break
             # update tracked document
-            data = urllib.urlopen(task['url']).read()
-            self.client.add_version(task, data)
+            self.client.update(task)
+
+            # decrement count for semaphore
             self.task_q.task_done()
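
For context on the comments added above: the loop uses the standard multiprocessing poison-pill pattern. The producer enqueues one None per worker to shut the pool down, and each task_done() call decrements the JoinableQueue counter so a join() on the queue blocks until all work is finished. A minimal driver sketch; the UpdateProcess(task_q, client) constructor signature and the worker count are assumptions for illustration, not part of this commit:

    import multiprocessing

    def run_update_pool(client, tasks, num_workers=4):
        # JoinableQueue tracks outstanding tasks via task_done()/join()
        task_q = multiprocessing.JoinableQueue()

        # UpdateProcess(task_q, client) is an assumed constructor signature
        workers = [UpdateProcess(task_q, client) for _ in xrange(num_workers)]
        for w in workers:
            w.start()

        for task in tasks:
            task_q.put(task)

        # one 'None' poison pill per worker so every run() loop breaks
        for w in workers:
            task_q.put(None)

        # returns once task_done() has been called for every put()
        task_q.join()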

@@ -5,23 +5,30 @@ import sys
 import pymongo
 import gridfs
+import scrapelib


 class Client(object):

     def __init__(self, host='localhost', port=27017,
                  database='oyster', collection='fs'):
         self.db = pymongo.Connection(host, port)[database]
         self.fs = gridfs.GridFS(self.db, collection)
+        self._collection_name = collection
+
+        # TODO: add some scrapelib config options
+        self.scraper = scrapelib.Scraper()

     def _wipe(self):
+        """ exists primarily for debug use, wipes entire db """
         self.db.drop_collection('tracked')
-        self.db.fs.chunks.drop()
-        self.db.fs.files.drop()
+        self.db.drop_collection('%s.chunks' % self._collection_name)
+        self.db.drop_collection('%s.files' % self._collection_name)

-    def track_url(self, url, versioning='md5', update_mins=60*24, **kwargs):
+    def track_url(self, url, versioning='md5', update_mins=60*24,
+                  **kwargs):
         """
         Add a URL to the set of tracked URLs, accessible via a given filename.
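
The _wipe() fix is needed because GridFS stores each prefix as a pair of MongoDB collections, <prefix>.files and <prefix>.chunks; the old code hard-coded the default fs prefix and would miss a Client created with a custom collection name. A quick illustration (the 'docs' prefix is an arbitrary example):

    import pymongo
    import gridfs

    db = pymongo.Connection('localhost', 27017)['oyster']
    fs = gridfs.GridFS(db, 'docs')    # backed by docs.files and docs.chunks
    fs.put('hello world', filename='greeting')
    print db.collection_names()       # includes u'docs.files', u'docs.chunks'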
@@ -37,36 +44,42 @@ class Client(object):
                                           metadata=kwargs))

-    def add_version(self, doc, data):
-        """
-            Add a version
-        """
-        metadata = doc['metadata']
-        do_put = True
-
-        # add if md5 has changed
-        if doc['versioning'] == 'md5':
-            try:
-                old_md5 = self.fs.get_last_version(filename=doc['url']).md5
-                new_md5 = hashlib.md5(data).hexdigest()
-                do_put = (old_md5 != new_md5)
-            except gridfs.NoFile:
-                pass
+    def md5_versioning(self, doc, data):
+        """ return True if md5 changed or if file is new """
+        try:
+            old_md5 = self.fs.get_last_version(filename=doc['url']).md5
+            new_md5 = hashlib.md5(data).hexdigest()
+            return (old_md5 != new_md5)
+        except gridfs.NoFile:
+            return True
+
+    def update(self, doc):
+        # assume we're going to do the put
+        do_put = True
+
+        # update strategies could be implemented here as well
+        data = self.scraper.urlopen(doc['url'])
+
+        # versioning is a concept for future use, but here's how it can work:
+        #   versioning functions take doc & data, and return True if data is
+        #   different; since they have access to doc, they can also modify
+        #   certain attributes as needed
+        if doc['versioning'] == 'md5':
+            do_put = self.md5_versioning(doc, data)
         else:
             raise ValueError('unknown versioning strategy "%s"' %
                              doc['versioning'])

+        if do_put:
+            self.fs.put(data, filename=doc['url'], **doc['metadata'])
+
+        # _last_update/_next_update are separate from question of versioning
         doc['_last_update'] = datetime.datetime.utcnow()
         doc['_next_update'] = (doc['_last_update'] +
                                datetime.timedelta(minutes=doc['update_mins']))
         self.db.tracked.save(doc, safe=True)

-        # true unless md5s were the same
-        if do_put:
-            self.fs.put(data, filename=doc['url'], **metadata)
-
     def get_all_versions(self, url):
         versions = []
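
The comment block in update() spells out the strategy contract: a versioning function receives (doc, data) and returns True when the new data should be stored. Any alternative strategy then only needs one more branch in update()'s dispatch. A hypothetical example, not part of this commit, that versions on content length instead of md5:

    def length_versioning(self, doc, data):
        """ hypothetical strategy: store only when the size changes """
        try:
            old = self.fs.get_last_version(filename=doc['url'])
            return len(data) != old.length
        except gridfs.NoFile:
            return True

wired in with an extra branch such as: elif doc['versioning'] == 'length': do_put = self.length_versioning(doc, data).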
@@ -99,7 +112,6 @@ class Client(object):
         # pull the rest from those for which _next_update is in the past
         next = self.db.tracked.find({'_next_update':
             {'$lt': datetime.datetime.utcnow()}}).sort('_random')
-
         if max:
             max -= len(queue)
             next = next.limit(max)
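
Putting the pieces together, a minimal end-to-end use of the reworked client might look like the following; the URL, the source metadata kwarg, and the find_one lookup for the tracked document are illustrative assumptions:

    client = Client()
    client.track_url('http://example.com/data.csv', update_mins=60,
                     source='example')    # extra kwargs are stored as metadata

    # fetch the tracked doc and run one update cycle
    doc = client.db.tracked.find_one({'url': 'http://example.com/data.csv'})
    client.update(doc)    # fetches, versions, stores, sets _next_update

    for version in client.get_all_versions('http://example.com/data.csv'):
        print version.upload_date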