#!/usr/bin/env python from collections import defaultdict from optparse import OptionParser import os from progressbar import ProgressBar, Percentage, Bar, ETA import shelve import sys import time import numpy as np from interp.cluster import QueueManager, get_qs if __name__ == '__main__': parser = OptionParser(usage="usage: %s [options] ") parser.add_option("-l", "--last-time", action="store_true", dest="last", default=False, help="when finished, send shutdown signal to connected nodes" "(default: %default)") parser.add_option('-n', '--node-count', type="int", dest="participants", default=None, help="specify how many participants we should wait for" "(default: %default)") parser.add_option('-p', '--port', type="int", dest="port", default=6666, help="specify the port to use on the server (default: %default)") parser.add_option("-o", "--order", type="int", dest="order", default=2, help="order of interpolation (default: %default)") parser.add_option("-e", "--extra-points", type="int", dest="extra", default=3, help="number of extra points (default: %default)") parser.add_option('-s', '--shelve', type="str", dest="shelvename", default=os.path.expanduser('~/interp.shelve'), help="shelve output file (default: %default)") (options, args) = parser.parse_args() if len(args) != 2: parser.print_usage() sys.exit(1) server = args[0] count = int(float(args[1])) m = QueueManager(address=(server, options.port), authkey='asdf') m.connect() tasksq, resultsq, masterq, minionsq = get_qs(m) workers = [] if not options.participants: print "wait on all announced participants" participants = 0 while not masterq.empty(): participants += 1 worker = masterq.get() workers.append(worker) print "%d: %s is ready" % (participants, worker) if participants == 0: print "nobody found" sys.exit(1) else: participants = options.participants print "wait on %d participants" % participants for i in xrange(participants): worker = masterq.get() workers.append(worker) print "%d of %d: %s is ready" % (i + 1, participants, worker) if len(set(workers)) != len(workers): for i in workers: minionsq.put("slay") raise Exception("duplicate workers reported") results = [] widgets = ['submit jobs: ', Percentage(), ' ', Bar(), ' ', ETA()] pbar = ProgressBar(widgets=widgets, maxval=count) pbar.start() submit_start = time.time() for i in xrange(count): X = np.random.random((1, 3))[0] tasksq.put((i, options.order, options.extra, X)) pbar.update(i + 1) submit_end = time.time() pbar.finish() for i in xrange(participants): print "sending worker %d start message" % (i + 1,) minionsq.put("start") receive_start = time.time() widgets = ['interpolate: ', Percentage(), ' ', Bar(), ' ', ETA()] pbar = ProgressBar(widgets=widgets, maxval=count) pbar.start() for i in xrange(count): cur_result = resultsq.get() results.append(cur_result) pbar.update(i + 1) receive_end = time.time() pbar.finish() submit = submit_end - submit_start receive = receive_end - receive_start # shut down all participants for i in xrange(participants): if options.last: minionsq.put("teardown") # post processing stats = {} stats['submit'] = float(submit) stats['receive'] = float(receive) stats['count'] = count stats['participants'] = participants stats['extra'] = options.extra stats['order'] = options.order print "%s" % stats tasks_accomplished_by = defaultdict(int) for i in results: tasks_accomplished_by[i[1]] += 1 stats['tasks'] = tasks_accomplished_by npresults = np.array([(i[0], i[2], i[3], i[4], i[5]) for i in results]) n = str(time.time()) s = shelve.open(options.shelvename) s[n] = { 'stats': stats, 'results': npresults, } s.close()