import datetime
import hashlib
import random
import sys

import pymongo
import gridfs


class Client(object):
    """
    Tracks URLs in a MongoDB collection ('tracked') and stores versioned
    snapshots of their content in GridFS.
    """

    def __init__(self, host='localhost', port=27017,
                 database='oyster', collection='fs'):
        self.db = pymongo.Connection(host, port)[database]
        self.fs = gridfs.GridFS(self.db, collection)


    def _wipe(self):
        """Drop all tracked documents and stored file data (testing aid)."""
        self.db.drop_collection('tracked')
        self.db.fs.chunks.drop()
        self.db.fs.files.drop()


    def track_url(self, url, versioning='md5', update_mins=60*24, **kwargs):
        """
        Add a URL to the set of tracked URLs.

        url
            URL to start tracking
        versioning
            versioning strategy; only 'md5' is currently supported
        update_mins
            minutes between update attempts (default: one day)
        kwargs
            arbitrary metadata stored alongside the tracked document

        Raises ValueError if the URL is already tracked.
        """
        if self.db.tracked.find_one({'url': url}):
            raise ValueError('%s is already tracked' % url)

        # _random is used to randomize queue ordering so that concurrent
        # workers don't all pile onto the same server's URLs
        self.db.tracked.insert(dict(url=url, versioning=versioning,
                                    update_mins=update_mins,
                                    _random=random.randint(0, sys.maxint),
                                    metadata=kwargs))


    def add_version(self, doc, data):
        """
        Store a new version of a tracked document's content, and stamp the
        document with _last_update/_next_update times.

        doc
            a document from the 'tracked' collection
        data
            the raw bytes fetched from doc['url']

        Raises ValueError for an unknown versioning strategy.
        """
        metadata = doc['metadata']
        do_put = True

        if doc['versioning'] == 'md5':
            # only store a new GridFS version if the md5 changed; if there is
            # no prior version (gridfs.NoFile) fall through and store it
            try:
                old_md5 = self.fs.get_last_version(filename=doc['url']).md5
                new_md5 = hashlib.md5(data).hexdigest()
                do_put = (old_md5 != new_md5)
            except gridfs.NoFile:
                pass
        else:
            raise ValueError('unknown versioning strategy "%s"' %
                             doc['versioning'])

        # update timestamps even when content is unchanged, so the document
        # isn't immediately re-queued
        doc['_last_update'] = datetime.datetime.utcnow()
        doc['_next_update'] = (doc['_last_update'] +
                               datetime.timedelta(minutes=doc['update_mins']))
        self.db.tracked.save(doc, safe=True)

        if do_put:
            self.fs.put(data, filename=doc['url'], **metadata)


    def get_all_versions(self, url):
        """Return every stored GridFS version of *url*, oldest first."""
        versions = []
        n = 0
        while True:
            try:
                versions.append(self.fs.get_version(url, n))
                n += 1
            except gridfs.NoFile:
                break
        return versions


    def get_version(self, url, n=-1):
        """Return the nth stored version of *url* (default: most recent)."""
        return self.fs.get_version(url, n)


    def get_update_queue(self, max=0):
        """
        Return tracked documents due for an update, never-fetched ones first.

        max
            if nonzero, cap the number of results

        Results are sorted by the _random field to avoid many workers piling
        onto a single server's URLs.
        """
        # first, anything that has never been retrieved at all
        new = self.db.tracked.find({'_next_update':
                                    {'$exists': False}}).sort('_random')
        if max:
            new = new.limit(max)

        queue = list(new)

        # then anything whose _next_update is in the past
        overdue = self.db.tracked.find(
            {'_next_update': {'$lt': datetime.datetime.utcnow()}}
        ).sort('_random')

        if max:
            remaining = max - len(queue)
            # BUG FIX: the original passed `remaining` straight to limit();
            # when the never-fetched results already filled the cap,
            # remaining was 0 (or negative) and cursor.limit(0) means
            # *no limit* in pymongo, so the caller got unbounded results.
            if remaining <= 0:
                return queue
            overdue = overdue.limit(remaining)

        queue.extend(overdue)

        return queue


    def get_update_queue_size(self):
        """Return the number of documents currently due for an update."""
        return len(self.get_update_queue())
b/oyster/daemon.py new file mode 100644 index 0000000..0417c72 --- /dev/null +++ b/oyster/daemon.py @@ -0,0 +1,56 @@ +import multiprocessing +import time +import urllib + +from client import Client + +class UpdateProcess(multiprocessing.Process): + + def __init__(self, task_q): + super(UpdateProcess, self).__init__() + self.task_q = task_q + self.client = Client() + + + def run(self): + while True: + task = self.task_q.get() + if task is None: + self.task_q.task_done() + break + + # update tracked document + data = urllib.urlopen(task['url']).read() + self.client.add_version(task, data) + self.task_q.task_done() + + +def main(): + num_processes = 4 + sleep_upon_exhaustion = 60 + + c = Client() + work_queue = multiprocessing.JoinableQueue() + workers = [UpdateProcess(work_queue) for i in xrange(num_processes)] + for w in workers: + w.start() + + # go forever + while True: + + # get everything overdue and put it into the queue + next_set = c.get_update_queue() + if next_set: + for item in next_set: + work_queue.put(item) + + # do all queued work + work_queue.join() + + else: + # allow for a quiet period if queue is exhausted + time.sleep(sleep_upon_exhaustion) + + +if __name__ == '__main__': + main() diff --git a/oyster/status.py b/oyster/status.py new file mode 100644 index 0000000..3ecb14a --- /dev/null +++ b/oyster/status.py @@ -0,0 +1,7 @@ +from client import Client + +if __name__ == '__main__': + c = Client() + + print 'Tracking:', c.db.tracked.count() + print 'Waiting In Queue:', c.get_update_queue_size()