From dc15bd17bb435aaf6855f6e96476f5b9b74774f0 Mon Sep 17 00:00:00 2001 From: Stephen Mardson McQuay Date: Wed, 18 May 2011 11:06:35 -0600 Subject: [PATCH] added framework for running parametric test on marylou --- bin/sheep.py | 100 ++++++++++++++++++++++++++++++++++++ bin/shepherd.py | 133 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 233 insertions(+) create mode 100644 bin/sheep.py create mode 100644 bin/shepherd.py diff --git a/bin/sheep.py b/bin/sheep.py new file mode 100644 index 0000000..82ff33c --- /dev/null +++ b/bin/sheep.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python + +import sys +import os +import time + +from multiprocessing.managers import BaseManager + +from optparse import OptionParser +import datetime + +import numpy as np + +import interp.bootstrap +from interp.grid.gmsh import ggrid +from interp.tools import baker_exact_3D as exact + +from interp.cluster import QueueManager, get_qs + +if __name__ == '__main__': + parser = OptionParser(usage = "usage: %s [options] ") + + parser.add_option("-v", "--verbose", + action="store_true", dest="verbose", default=False, + help="verbose flag (default: %default)") + + parser.add_option('-p', '--port', + type="int", dest="port", default=6666, + help="specify the port to use on the server (default: %default)") + + (options, args) = parser.parse_args() + + if len(args) != 2: + parser.print_usage() + sys.exit(1) + + server, input_file = args + + myname = "%s-%d" % (os.uname()[1], os.getpid()) + if options.verbose: + print "%s: started" % myname + + + m = QueueManager(address=(server, options.port), authkey='asdf') + m.connect() + + tasksq, resultsq, masterq, minionsq = get_qs(m) + + if options.verbose: + print "%s: starting parse input file" % myname + g = ggrid(input_file) + g.q = np.array([exact(x) for x in g.verts]) + if options.verbose: + print "%s: done parsing input file" % myname + + + while True: + if options.verbose: + print "%s: letting master know that I am ready" % myname + masterq.put(myname) + + if options.verbose: + print "%s: waiting for master to tell me to start" % myname + action = minionsq.get() + if options.verbose: + print "%s: master said go!!" % myname + + if action in ('teardown', 'slay'): + break + + while not tasksq.empty(): + i, order, extra, X = tasksq.get() + try: + s = time.time() + a = g.run_baker(X, order = order, extra_points = extra) + cur_qlin = a['qlin' ] + cur_error = a['error'] + cur_final = a['final'] + cur_exact = exact(X) + e = time.time() + duration = e-s + except Exception as e: + print >>sys.stderr, X, e + cur_qlin = 0.0 + cur_error = 0.0 + cur_final = 0.0 + cur_exact = 0.0 + duration = 0 + + resultsq.put(( + i, + cur_qlin, + cur_error, + cur_final, + cur_exact, + duration, + )) + + if options.verbose: + print "%s: exiting" % myname diff --git a/bin/shepherd.py b/bin/shepherd.py new file mode 100644 index 0000000..c9c5889 --- /dev/null +++ b/bin/shepherd.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python + +import sys +import os + +import time + +import shelve +from progressbar import * +from collections import defaultdict +from optparse import OptionParser +import pickle + +import numpy as np + +import interp.bootstrap + +import logging +log = logging.getLogger("interp") + +from interp.cluster import QueueManager, get_qs + +if __name__ == '__main__': + parser = OptionParser(usage = "%prog [options] ") + + parser.add_option("-v", "--verbose", + action="store_true", dest="verbose", default=False, + help="verbose flag") + + parser.add_option('-p', '--port', + type="int", dest="port", default=6666, + help="specify the port to use on the server (default: %default)") + + parser.add_option('-n', '--node-count', + type="int", dest="participants", default=None, + help="specify how many participants we should wait for (default: %default)") + + (options, args) = parser.parse_args() + if len(args) != 2: + parser.print_usage() + sys.exit(1) + + server, dest_verts = args + + dest_verts = pickle.load(open(dest_verts, 'r')) + + orders = [2,3,4,5] + extras = [4,6,8,12,16,20,32,48,64,96,128,192,256] + + + count = len(orders) * len(extras) * sum((len(i) for i in dest_verts)) + + results = [None] * count + # results = [id, order, extra, X, qlin, err, final, exact, time] + + m = QueueManager(address=(server, options.port), authkey='asdf') + m.connect() + + tasksq, resultsq, masterq, minionsq = get_qs(m) + + if not options.participants: + print "wait on all announced participants" + participants = 0 + while not masterq.empty(): + participants += 1 + worker = masterq.get() + print "%s (%d)" % (worker,participants) + if participants == 0: + print "nobody found" + sys.exit(1) + else: + participants = options.participants + print "wait on %d participants" % participants + for i in xrange(participants): + worker = masterq.get() + print "%s (%d/%d)" % (worker, i+1, participants) + + print "Submitting %d pieces of workload" % count + + widgets = ['submit jobs: ', Percentage(), ' ', Bar(), ' ', ETA()] + pbar = ProgressBar(widgets = widgets, maxval = count) + pbar.start() + + submit_start = time.time() + i = 0 + for order in orders: + for extra in extras: + for d in dest_verts: + for X in d: + cur_job = (i, order, extra, X) + results[i] = [ order, extra, X] + # tasksq.put(cur_job) + i+=1 + pbar.update(i) + submit_end = time.time() + pbar.finish() + + print "it took %0.2f seconds to submit the workload" % (submit_end - submit_start,) + + print "len(results)", len(results) + + for i in xrange(participants): + print "sending worker %d start message" % (i+1,) + minionsq.put("start") + + receive_start = time.time() + widgets = ['interpolate: ', Percentage(), ' ', Bar(), ' ', ETA()] + pbar = ProgressBar(widgets = widgets, maxval = count) + pbar.start() + for i in xrange(count): + rid, qlin, err, final, exact, duration = resultsq.get() + results[rid].extend((qlin, err, final, exact, duration)) + pbar.update(i+1) + receive_end = time.time() + pbar.finish() + + pickle.dump(results, open("results.p")) + + submit = submit_end - submit_start + receive = receive_end - receive_start + + # shut down all participants + for i in xrange(participants): + minionsq.put("teardown") + + # post processing + stats = {} + stats['submit' ] = float(submit) + stats['receive' ] = float(receive) + stats['count' ] = count + stats['participants'] = participants + + print "%s" % stats