all created clients in oysterd should obey settings
This commit is contained in:
parent
f5e463e366
commit
c14363ba38
@ -12,17 +12,17 @@ from oyster.web import app
|
|||||||
|
|
||||||
class UpdateProcess(multiprocessing.Process):
|
class UpdateProcess(multiprocessing.Process):
|
||||||
|
|
||||||
def __init__(self, task_q):
|
def __init__(self, task_q, client):
|
||||||
super(UpdateProcess, self).__init__()
|
super(UpdateProcess, self).__init__()
|
||||||
self.task_q = task_q
|
self.task_q = task_q
|
||||||
self.client = Client()
|
self.client = client
|
||||||
|
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
while True:
|
while True:
|
||||||
task = self.task_q.get()
|
task = self.task_q.get()
|
||||||
|
|
||||||
# break on 'None' poison pill
|
# break on 'None' poison pill -- doesn't actually happen currently
|
||||||
if task is None:
|
if task is None:
|
||||||
self.task_q.task_done()
|
self.task_q.task_done()
|
||||||
break
|
break
|
||||||
@ -34,6 +34,16 @@ class UpdateProcess(multiprocessing.Process):
|
|||||||
self.task_q.task_done()
|
self.task_q.task_done()
|
||||||
|
|
||||||
|
|
||||||
|
def _client_from_args(args):
|
||||||
|
return Client(mongo_host=args.mongo_host,
|
||||||
|
mongo_port=args.mongo_port,
|
||||||
|
mongo_db=args.mongo_db,
|
||||||
|
mongo_log_maxsize=args.logsize,
|
||||||
|
user_agent=args.useragent,
|
||||||
|
rpm=args.rpm,
|
||||||
|
timeout=args.timeout,
|
||||||
|
retry_attempts=args.retry_attempts,
|
||||||
|
retry_wait_seconds=args.retry_wait)
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
@ -67,11 +77,13 @@ def main():
|
|||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
work_queue = multiprocessing.JoinableQueue()
|
work_queue = multiprocessing.JoinableQueue()
|
||||||
|
client = _client_from_args(args)
|
||||||
|
|
||||||
# workers defaults to cpu_count
|
# workers defaults to cpu_count
|
||||||
if not args.workers:
|
if not args.workers:
|
||||||
args.workers = multiprocessing.cpu_count()
|
args.workers = multiprocessing.cpu_count()
|
||||||
workers = [UpdateProcess(work_queue) for i in xrange(args.workers)]
|
workers = [UpdateProcess(work_queue, _client_from_args(args))
|
||||||
|
for i in xrange(args.workers)]
|
||||||
|
|
||||||
# separate process for Flask app
|
# separate process for Flask app
|
||||||
def flask_process():
|
def flask_process():
|
||||||
@ -86,16 +98,6 @@ def main():
|
|||||||
worker.start()
|
worker.start()
|
||||||
server.start()
|
server.start()
|
||||||
|
|
||||||
client = Client(mongo_host=args.mongo_host,
|
|
||||||
mongo_port=args.mongo_port,
|
|
||||||
mongo_db=args.mongo_db,
|
|
||||||
mongo_log_maxsize=args.logsize,
|
|
||||||
user_agent=args.useragent,
|
|
||||||
rpm=args.rpm,
|
|
||||||
timeout=args.timeout,
|
|
||||||
retry_attempts=args.retry_attempts,
|
|
||||||
retry_wait_seconds=args.retry_wait_seconds)
|
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
# get everything overdue and put it into the queue
|
# get everything overdue and put it into the queue
|
||||||
next_set = client.get_update_queue()
|
next_set = client.get_update_queue()
|
||||||
|
Loading…
Reference in New Issue
Block a user