diff --git a/oyster/bin/__init__.py b/oyster/bin/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/oyster/bin/oysterd.py b/oyster/bin/oysterd.py deleted file mode 100644 index 1463164..0000000 --- a/oyster/bin/oysterd.py +++ /dev/null @@ -1,116 +0,0 @@ -import argparse -import multiprocessing -import threading -import time -import urllib - -import flask - -from oyster.client import Client -from oyster.web import app - - -class UpdateProcess(multiprocessing.Process): - - def __init__(self, task_q, client): - super(UpdateProcess, self).__init__() - self.task_q = task_q - self.client = client - - - def run(self): - while True: - task = self.task_q.get() - - # break on 'None' poison pill -- doesn't actually happen currently - if task is None: - self.task_q.task_done() - break - - # update tracked document - self.client.update(task) - - # decrement count for semaphore - self.task_q.task_done() - - -def _client_from_args(args): - return Client(mongo_host=args.mongo_host, - mongo_port=args.mongo_port, - mongo_db=args.mongo_db, - mongo_log_maxsize=args.logsize, - user_agent=args.useragent, - rpm=args.rpm, - timeout=args.timeout, - retry_attempts=args.retry_attempts, - retry_wait_seconds=args.retry_wait) - - -def main(): - - parser = argparse.ArgumentParser(description='oyster daemon') - parser.add_argument('-w', '--workers', type=int, default=0, - help='number of worker processes to use (default: # processors)') - parser.add_argument('-p', '--port', type=int, default=31687, - help='port for HTTP service to run on') - parser.add_argument('--debug', action='store_true', - help='enable debug mode') - # client config options - parser.add_argument('--mongo_host', default='localhost', - help='host or IP for mongodb server') - parser.add_argument('--mongo_port', default=27017, - help='port for mongodb server') - parser.add_argument('--mongo_db', default='oyster', - help='mongodb database name') - parser.add_argument('--logsize', default=100000000, - help='mongodb maximum log size (bytes)') - parser.add_argument('--useragent', default='oyster', - help='user agent to use when fetching pages') - parser.add_argument('--rpm', default=600, - help='maximum requests per minute to make') - parser.add_argument('--timeout', default=None, - help='timeout (seconds) when making requests (default: none)') - parser.add_argument('--retry_attempts', default=0, - help='retries when making requests (default: 0)') - parser.add_argument('--retry_wait', default=5, - help='retry wait period (seconds) when making requests (default: 5)') - args = parser.parse_args() - - work_queue = multiprocessing.JoinableQueue() - client = _client_from_args(args) - - # workers defaults to cpu_count - if not args.workers: - args.workers = multiprocessing.cpu_count() - workers = [UpdateProcess(work_queue, _client_from_args(args)) - for i in xrange(args.workers)] - - # separate process for Flask app - def flask_process(): - app.run(debug=args.debug, port=args.port) - server = multiprocessing.Process(target=flask_process) - - # give flask access to our work_queue - app.work_queue = work_queue - - # start all processes - for worker in workers: - worker.start() - server.start() - - while True: - # get everything overdue and put it into the queue - next_set = client.get_update_queue() - if next_set: - for item in next_set: - work_queue.put(item) - - # do all queued work - work_queue.join() - - # sleep for a minute between attempts to check the queue - time.sleep(60) - - -if __name__ == '__main__': - main() diff --git a/requirements.txt b/requirements.txt index 7836693..0fe3f21 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ scrapelib pymongo>=1.11 flask nose +celery