From 2c13ec8c4c0b0770c9de8ed96da81e82ac82927e Mon Sep 17 00:00:00 2001 From: Stephen Mardson McQuay Date: Wed, 30 Mar 2011 21:12:16 -0600 Subject: [PATCH] major: added progressbar, options, shelving, etc --- bin/master.py | 75 +++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 67 insertions(+), 8 deletions(-) diff --git a/bin/master.py b/bin/master.py index 1ba55f7..690df03 100644 --- a/bin/master.py +++ b/bin/master.py @@ -5,6 +5,11 @@ import os import time +import shelve +from progressbar import * +from collections import defaultdict +from optparse import OptionParser + from multiprocessing.managers import BaseManager import numpy as np @@ -18,7 +23,7 @@ class QueueManager(BaseManager): pass QueueManager.register('get_inqueue' ) QueueManager.register('get_outqueue') -def run_queries(count, order = 3, extra_points = 8): +def run_queries(count, order = 3, extra_points = 8, verbose = False): r = [] submit_start = time.time() for i in xrange(count): @@ -27,39 +32,93 @@ def run_queries(count, order = 3, extra_points = 8): submit_end = time.time() receive_start = time.time() + + widgets = ['Progress: ', Percentage(), ' ', Bar(), ' ', ETA()] + pbar = ProgressBar(widgets = widgets, maxval = count) + if verbose: + pbar.start() for i in xrange(count): r.append(outq.get()) + if verbose: + pbar.update(i+1) receive_end = time.time() + if verbose: + pbar.finish() return (r, submit_end - submit_start, receive_end - receive_start) if __name__ == '__main__': - server = 'localhost' + parser = OptionParser(usage = "usage: %s [options] ") - m = QueueManager(address=(server, 50000), authkey='asdf') + parser.add_option("-v", "--verbose", + action="store_true", dest="verbose", default=False, + help="verbose flag (display progress bar: %default)") + + parser.add_option('-p', '--port', + type="int", dest="port", default=6666, + help="specify the port to use on the server (default: %default)") + + parser.add_option("-o", "--order", + type="int", dest="order", default=2, + help="order of interpolation (default: %default)") + + parser.add_option("-e", "--extra-points", + type="int", dest="extra", default=3, + help="number of extra points (default: %default)") + + parser.add_option('-s', '--shelve', + type="str", dest="shelvename", default=os.path.expanduser('~/interp.shelve'), + help="shelve output file (default: %default)") + + (options, args) = parser.parse_args() + if len(args) != 3: + parser.print_usage() + sys.exit(1) + + server = args[0] + expected_participants, count = (int(i) for i in args[1:]) + + m = QueueManager(address=(server, options.port), authkey='asdf') m.connect() - expected_participants, count = (int(i) for i in sys.argv[1:]) - inq = m.get_inqueue() outq = m.get_outqueue() # wait for all participants to be loaded up for i in xrange(expected_participants): - print outq.get() + worker = outq.get() + if options.verbose: + print worker # run codes - results, submit, receive = run_queries(count, order=3, extra_points = 64) + results, submit, receive = run_queries(count, order = options.order, extra_points = options.extra, verbose = options.verbose) # shut down all participants for i in xrange(expected_participants): inq.put([None] * 4) + # post processing stats = {} stats['submit' ] = float(submit) stats['receive' ] = float(receive) stats['count' ] = count stats['expected_participants'] = expected_participants - print "submit time for %(count)s interps, %(expected_participants)d participants: %(submit)0.2f seconds, results time: %(receive)0.2f" % stats + if options.verbose: + print "%s" % stats log.error("stats: %s", stats) + + tasks_accomplished_by = defaultdict(int) + for i in results: + tasks_accomplished_by[i[1]] += 1 + stats['tasks'] = tasks_accomplished_by + + npresults = np.array([(i[0],i[2],i[3],i[4], i[5]) for i in results]) + + s = shelve.open(options.shelvename) + n = str(time.time()) + s[n] = { + 'stats' : stats, + 'results' : npresults, + } + s.close()