#!/usr/bin/env python import sys import os import time import shelve from collections import defaultdict from optparse import OptionParser import logging log = logging.getLogger("interp") import numpy as np from interp.cluster import QueueManager, get_qs from progressbar import * if __name__ == '__main__': parser = OptionParser(usage = "usage: %s [options] ") parser.add_option("-l", "--last-time", action="store_true", dest="last", default=False, help="when finished, send shutdown signal to connected nodes (default: %default)") parser.add_option('-n', '--node-count', type="int", dest="participants", default=None, help="specify how many participants we should wait for (default: %default)") parser.add_option('-p', '--port', type="int", dest="port", default=6666, help="specify the port to use on the server (default: %default)") parser.add_option("-o", "--order", type="int", dest="order", default=2, help="order of interpolation (default: %default)") parser.add_option("-e", "--extra-points", type="int", dest="extra", default=3, help="number of extra points (default: %default)") parser.add_option('-s', '--shelve', type="str", dest="shelvename", default=os.path.expanduser('~/interp.shelve'), help="shelve output file (default: %default)") (options, args) = parser.parse_args() if len(args) != 2: parser.print_usage() sys.exit(1) server = args[0] count = int(float(args[1])) m = QueueManager(address=(server, options.port), authkey='asdf') m.connect() tasksq, resultsq, masterq, minionsq = get_qs(m) workers = [] if not options.participants: print "wait on all announced participants" participants = 0 while not masterq.empty(): participants += 1 worker = masterq.get() workers.append(worker) print "%d: %s is ready" % (participants, worker) if participants == 0: print "nobody found" sys.exit(1) else: participants = options.participants print "wait on %d participants" % participants for i in xrange(participants): worker = masterq.get() workers.append(worker) print "%d of %d: %s is ready" % (i+1, participants, worker) if len(set(workers)) != len(workers): for i in workers: minionsq.put("slay") raise Exception("duplicate workers reported") results = [] widgets = ['submit jobs: ', Percentage(), ' ', Bar(), ' ', ETA()] pbar = ProgressBar(widgets = widgets, maxval = count) pbar.start() submit_start = time.time() for i in xrange(count): X = np.random.random((1,3))[0] tasksq.put((i, options.order, options.extra, X)) pbar.update(i+1) submit_end = time.time() pbar.finish() for i in xrange(participants): print "sending worker %d start message" % (i+1,) minionsq.put("start") receive_start = time.time() widgets = ['interpolate: ', Percentage(), ' ', Bar(), ' ', ETA()] pbar = ProgressBar(widgets = widgets, maxval = count) pbar.start() for i in xrange(count): cur_result = resultsq.get() results.append(cur_result) pbar.update(i+1) receive_end = time.time() pbar.finish() submit = submit_end - submit_start receive = receive_end - receive_start # shut down all participants for i in xrange(participants): if options.last: minionsq.put("teardown") # post processing stats = {} stats['submit' ] = float(submit) stats['receive' ] = float(receive) stats['count' ] = count stats['participants'] = participants stats['extra' ] = options.extra stats['order' ] = options.order print "%s" % stats log.error("stats: %s", stats) tasks_accomplished_by = defaultdict(int) for i in results: tasks_accomplished_by[i[1]] += 1 stats['tasks'] = tasks_accomplished_by # npresults = np.array([(i[0],i[2],i[3],i[4], i[5]) for i in results]) n = str(time.time()) s = shelve.open(options.shelvename) s[n] = { 'stats' : stats, # 'results' : npresults, } s.close()