added framework for running parametric test on marylou

This commit is contained in:
Stephen McQuay 2011-05-18 11:06:35 -06:00
parent a1bd6a7257
commit dc15bd17bb
2 changed files with 233 additions and 0 deletions

100
bin/sheep.py Normal file
View File

@ -0,0 +1,100 @@
#!/usr/bin/env python
import sys
import os
import time
from multiprocessing.managers import BaseManager
from optparse import OptionParser
import datetime
import numpy as np
import interp.bootstrap
from interp.grid.gmsh import ggrid
from interp.tools import baker_exact_3D as exact
from interp.cluster import QueueManager, get_qs
if __name__ == '__main__':
parser = OptionParser(usage = "usage: %s [options] <server> <gmsh file>")
parser.add_option("-v", "--verbose",
action="store_true", dest="verbose", default=False,
help="verbose flag (default: %default)")
parser.add_option('-p', '--port',
type="int", dest="port", default=6666,
help="specify the port to use on the server (default: %default)")
(options, args) = parser.parse_args()
if len(args) != 2:
parser.print_usage()
sys.exit(1)
server, input_file = args
myname = "%s-%d" % (os.uname()[1], os.getpid())
if options.verbose:
print "%s: started" % myname
m = QueueManager(address=(server, options.port), authkey='asdf')
m.connect()
tasksq, resultsq, masterq, minionsq = get_qs(m)
if options.verbose:
print "%s: starting parse input file" % myname
g = ggrid(input_file)
g.q = np.array([exact(x) for x in g.verts])
if options.verbose:
print "%s: done parsing input file" % myname
while True:
if options.verbose:
print "%s: letting master know that I am ready" % myname
masterq.put(myname)
if options.verbose:
print "%s: waiting for master to tell me to start" % myname
action = minionsq.get()
if options.verbose:
print "%s: master said go!!" % myname
if action in ('teardown', 'slay'):
break
while not tasksq.empty():
i, order, extra, X = tasksq.get()
try:
s = time.time()
a = g.run_baker(X, order = order, extra_points = extra)
cur_qlin = a['qlin' ]
cur_error = a['error']
cur_final = a['final']
cur_exact = exact(X)
e = time.time()
duration = e-s
except Exception as e:
print >>sys.stderr, X, e
cur_qlin = 0.0
cur_error = 0.0
cur_final = 0.0
cur_exact = 0.0
duration = 0
resultsq.put((
i,
cur_qlin,
cur_error,
cur_final,
cur_exact,
duration,
))
if options.verbose:
print "%s: exiting" % myname

133
bin/shepherd.py Normal file
View File

@ -0,0 +1,133 @@
#!/usr/bin/env python
import sys
import os
import time
import shelve
from progressbar import *
from collections import defaultdict
from optparse import OptionParser
import pickle
import numpy as np
import interp.bootstrap
import logging
log = logging.getLogger("interp")
from interp.cluster import QueueManager, get_qs
if __name__ == '__main__':
parser = OptionParser(usage = "%prog [options] <server> <destination vertexes file (pickle)>")
parser.add_option("-v", "--verbose",
action="store_true", dest="verbose", default=False,
help="verbose flag")
parser.add_option('-p', '--port',
type="int", dest="port", default=6666,
help="specify the port to use on the server (default: %default)")
parser.add_option('-n', '--node-count',
type="int", dest="participants", default=None,
help="specify how many participants we should wait for (default: %default)")
(options, args) = parser.parse_args()
if len(args) != 2:
parser.print_usage()
sys.exit(1)
server, dest_verts = args
dest_verts = pickle.load(open(dest_verts, 'r'))
orders = [2,3,4,5]
extras = [4,6,8,12,16,20,32,48,64,96,128,192,256]
count = len(orders) * len(extras) * sum((len(i) for i in dest_verts))
results = [None] * count
# results = [id, order, extra, X, qlin, err, final, exact, time]
m = QueueManager(address=(server, options.port), authkey='asdf')
m.connect()
tasksq, resultsq, masterq, minionsq = get_qs(m)
if not options.participants:
print "wait on all announced participants"
participants = 0
while not masterq.empty():
participants += 1
worker = masterq.get()
print "%s (%d)" % (worker,participants)
if participants == 0:
print "nobody found"
sys.exit(1)
else:
participants = options.participants
print "wait on %d participants" % participants
for i in xrange(participants):
worker = masterq.get()
print "%s (%d/%d)" % (worker, i+1, participants)
print "Submitting %d pieces of workload" % count
widgets = ['submit jobs: ', Percentage(), ' ', Bar(), ' ', ETA()]
pbar = ProgressBar(widgets = widgets, maxval = count)
pbar.start()
submit_start = time.time()
i = 0
for order in orders:
for extra in extras:
for d in dest_verts:
for X in d:
cur_job = (i, order, extra, X)
results[i] = [ order, extra, X]
# tasksq.put(cur_job)
i+=1
pbar.update(i)
submit_end = time.time()
pbar.finish()
print "it took %0.2f seconds to submit the workload" % (submit_end - submit_start,)
print "len(results)", len(results)
for i in xrange(participants):
print "sending worker %d start message" % (i+1,)
minionsq.put("start")
receive_start = time.time()
widgets = ['interpolate: ', Percentage(), ' ', Bar(), ' ', ETA()]
pbar = ProgressBar(widgets = widgets, maxval = count)
pbar.start()
for i in xrange(count):
rid, qlin, err, final, exact, duration = resultsq.get()
results[rid].extend((qlin, err, final, exact, duration))
pbar.update(i+1)
receive_end = time.time()
pbar.finish()
pickle.dump(results, open("results.p"))
submit = submit_end - submit_start
receive = receive_end - receive_start
# shut down all participants
for i in xrange(participants):
minionsq.put("teardown")
# post processing
stats = {}
stats['submit' ] = float(submit)
stats['receive' ] = float(receive)
stats['count' ] = count
stats['participants'] = participants
print "%s" % stats