From 454130f5eada971bc24008f4fb23f2637eeb671d Mon Sep 17 00:00:00 2001 From: James Turk Date: Thu, 28 Jul 2011 16:11:32 -0400 Subject: [PATCH] move update logic into client, will allow update strategies --- oyster/bin/daemon.py | 7 ++++-- oyster/client.py | 54 +++++++++++++++++++++++++++----------------- 2 files changed, 38 insertions(+), 23 deletions(-) diff --git a/oyster/bin/daemon.py b/oyster/bin/daemon.py index d67e7ed..05728af 100644 --- a/oyster/bin/daemon.py +++ b/oyster/bin/daemon.py @@ -15,13 +15,16 @@ class UpdateProcess(multiprocessing.Process): def run(self): while True: task = self.task_q.get() + + # break on 'None' poison pill if task is None: self.task_q.task_done() break # update tracked document - data = urllib.urlopen(task['url']).read() - self.client.add_version(task, data) + self.client.update(task) + + # decrement count for semaphore self.task_q.task_done() diff --git a/oyster/client.py b/oyster/client.py index badb767..20c4b29 100644 --- a/oyster/client.py +++ b/oyster/client.py @@ -5,23 +5,30 @@ import sys import pymongo import gridfs +import scrapelib class Client(object): + def __init__(self, host='localhost', port=27017, database='oyster', collection='fs'): self.db = pymongo.Connection(host, port)[database] self.fs = gridfs.GridFS(self.db, collection) + self._collection_name = collection + # TODO: add some scrapelib config options + self.scraper = scrapelib.Scraper() def _wipe(self): + """ exists primarily for debug use, wipes entire db """ self.db.drop_collection('tracked') - self.db.fs.chunks.drop() - self.db.fs.files.drop() + self.db.drop_collection('%s.chunks' % self._collection_name) + self.db.drop_collection('%s.files' % self._collection_name) - def track_url(self, url, versioning='md5', update_mins=60*24, **kwargs): + def track_url(self, url, versioning='md5', update_mins=60*24, + **kwargs): """ Add a URL to the set of tracked URLs, accessible via a given filename. 
@@ -37,36 +44,42 @@ class Client(object): metadata=kwargs)) - def add_version(self, doc, data): - """ - Add a version - """ + def md5_versioning(self, doc, data): + """ return True if md5 changed or if file is new """ + try: + old_md5 = self.fs.get_last_version(filename=doc['url']).md5 + new_md5 = hashlib.md5(data).hexdigest() + return (old_md5 != new_md5) + except gridfs.NoFile: + return True - metadata = doc['metadata'] + + def update(self, doc): + # assume we're going to do the put do_put = True - # add if md5 has changed - if doc['versioning'] == 'md5': - try: - old_md5 = self.fs.get_last_version(filename=doc['url']).md5 - new_md5 = hashlib.md5(data).hexdigest() - do_put = (old_md5 != new_md5) - except gridfs.NoFile: - pass + # update strategies could be implemented here as well + data = self.scraper.urlopen(doc['url']) + # versioning is a concept for future use, but here's how it can work: + # versioning functions take doc & data, and return True if data is + # different, since they have access to doc, they can also modify + # certain attributes as needed + if doc['versioning'] == 'md5': + do_put = self.md5_versioning(doc, data) else: raise ValueError('unknown versioning strategy "%s"' % doc['versioning']) + if do_put: + self.fs.put(data, filename=doc['url'], **doc['metadata']) + + # _last_update/_next_update are separate from question of versioning doc['_last_update'] = datetime.datetime.utcnow() doc['_next_update'] = (doc['_last_update'] + datetime.timedelta(minutes=doc['update_mins'])) self.db.tracked.save(doc, safe=True) - # true unless md5s were the same - if do_put: - self.fs.put(data, filename=doc['url'], **metadata) - def get_all_versions(self, url): versions = [] @@ -99,7 +112,6 @@ class Client(object): # pull the rest from those for which _next_update is in the past next = self.db.tracked.find({'_next_update': {'$lt': datetime.datetime.utcnow()}}).sort('_random') - if max: max -= len(queue) next = next.limit(max)