134 lines
4.1 KiB
Python
134 lines
4.1 KiB
Python
#!/usr/bin/env python
|
|
|
|
from optparse import OptionParser
|
|
import pickle
|
|
import sys
|
|
import time
|
|
|
|
from progressbar import ProgressBar, ETA, Percentage, Bar
|
|
|
|
from interp.cluster import QueueManager, get_qs
|
|
|
|
# Interpolation orders and "extra" values swept for every destination vertex.
ORDERS = [2, 3, 4, 5]
EXTRAS = [4, 6, 8, 12, 16, 20, 32, 48, 64, 96, 128, 192, 256]

# Vertex dimensions the results writer knows how to format.
SUPPORTED_DIMENSIONS = (2, 3)


def format_result_row(row, dimension):
    """Return one line of the results file for a completed job.

    `row` is laid out as
    [order, extra, X, qlin, err, final, exact, duration]: the first three
    entries are stored at submission time, the last five are appended from
    the tuple received on the results queue.  The first `dimension`
    components of X are written out individually.

    Raises ValueError if `dimension` is not in SUPPORTED_DIMENSIONS, and
    IndexError if the row or the vertex is shorter than expected.
    """
    if dimension not in SUPPORTED_DIMENSIONS:
        raise ValueError("unsupported dimension: %r" % (dimension,))

    order, extra, vertex = row[0], row[1], row[2]
    # Index explicitly (rather than slice) so a short vertex or an
    # incomplete row raises instead of silently producing a short line.
    coords = [vertex[axis] for axis in range(dimension)]
    measures = [row[k] for k in range(3, 8)]

    fields = ["%d" % order, "%d" % extra]
    fields.extend("%e" % (value,) for value in coords + measures)
    return " ".join(fields) + "\n"


def parse_command_line():
    """Parse sys.argv; return (options, server, verts_path, out_path, dim).

    Prints usage and exits with status 1 when the positional arguments are
    missing or when the dimension is not a supported integer.
    """
    parser = OptionParser(usage="%prog [options] <server>"
                                " <destination vertexes file (pickle)>"
                                " <results output file> <dimension>")

    parser.add_option("-v", "--verbose",
                      action="store_true", dest="verbose", default=False,
                      help="verbose flag")

    parser.add_option('-p', '--port',
                      type="int", dest="port", default=6666,
                      help="specify the port to use on the server (default: %default)")

    parser.add_option('-n', '--node-count',
                      type="int", dest="participants", default=None,
                      help="specify how many participants we should"
                           " wait for (default: %default)")

    (options, args) = parser.parse_args()
    if len(args) != 4:
        parser.print_usage()
        sys.exit(1)

    server, verts_path, out_path, dimension = args

    # Validate before any cluster work is done: an unsupported dimension
    # would otherwise run the whole workload and then write an empty file.
    try:
        dimension = int(dimension)
    except ValueError:
        dimension = None
    if dimension not in SUPPORTED_DIMENSIONS:
        parser.print_usage()
        sys.stderr.write("dimension must be one of %s\n"
                         % (SUPPORTED_DIMENSIONS,))
        sys.exit(1)

    return options, server, verts_path, out_path, dimension


def wait_for_participants(masterq, expected):
    """Consume worker announcements from `masterq`; return the worker count.

    With a falsy `expected`, every announcement currently queued is taken
    and the script exits with status 1 if there is none.  Otherwise exactly
    `expected` announcements are read, blocking until they arrive.
    """
    if not expected:
        print("wait on all announced participants")
        participants = 0
        while not masterq.empty():
            participants += 1
            worker = masterq.get()
            print("%s (%d)" % (worker, participants))
        if participants == 0:
            print("nobody found")
            sys.exit(1)
        return participants

    print("wait on %d participants" % expected)
    for idx in range(expected):
        worker = masterq.get()
        print("%s (%d/%d)" % (worker, idx + 1, expected))
    return expected


def submit_jobs(tasksq, dest_verts, results):
    """Queue one job per (order, extra, vertex); return the seconds taken.

    Each job is the tuple (job id, order, extra, X).  `results[job id]` is
    seeded with [order, extra, X] so the answer can be appended later.
    """
    widgets = ['submit jobs: ', Percentage(), ' ', Bar(), ' ', ETA()]
    pbar = ProgressBar(widgets=widgets, maxval=len(results))
    pbar.start()

    started = time.time()
    job_id = 0
    for order in ORDERS:
        for extra in EXTRAS:
            for vertex in dest_verts:
                results[job_id] = [order, extra, vertex]
                tasksq.put((job_id, order, extra, vertex))
                job_id += 1
                pbar.update(job_id)
    finished = time.time()
    pbar.finish()

    return finished - started


def collect_results(resultsq, results):
    """Read one answer per submitted job; return the seconds taken.

    Every answer carries the job id it belongs to, so answers may arrive in
    any order; the remaining five values are appended to that job's row.
    """
    count = len(results)

    started = time.time()
    widgets = ['interpolate: ', Percentage(), ' ', Bar(), ' ', ETA()]
    pbar = ProgressBar(widgets=widgets, maxval=count)
    pbar.start()
    received = 0
    while received < count:
        rid, qlin, err, final, exact, duration = resultsq.get()
        results[rid].extend((qlin, err, final, exact, duration))
        received += 1
        pbar.update(received)
    finished = time.time()
    pbar.finish()

    return finished - started


def main():
    """Drive one interpolation sweep across the connected workers."""
    options, server, verts_path, results_p_name, dimension = \
        parse_command_line()

    # NOTE: unpickling executes whatever the file tells it to; only feed
    # this script vertex files from a trusted source.  Binary mode is what
    # the pickle documentation recommends, and the context manager closes
    # the handle the original left open.
    with open(verts_path, 'rb') as verts_file:
        dest_verts = pickle.load(verts_file)

    count = len(ORDERS) * len(EXTRAS) * len(dest_verts)

    # one row per job: [order, extra, X, qlin, err, final, exact, duration]
    results = [None] * count

    # NOTE(review): the authkey is hard-coded; it has to match whatever the
    # server side was started with.
    m = QueueManager(address=(server, options.port), authkey='asdf')
    m.connect()

    tasksq, resultsq, masterq, minionsq = get_qs(m)

    participants = wait_for_participants(masterq, options.participants)

    print("Submitting %d pieces of workload" % count)
    submit = submit_jobs(tasksq, dest_verts, results)

    print("it took %0.2f seconds to submit"
          " the workload" % (submit,))

    print("len(results) %d" % len(results))

    for idx in range(participants):
        print("sending worker %d start message" % (idx + 1,))
        minionsq.put("start")

    receive = collect_results(resultsq, results)

    # The context manager guarantees the results are flushed and the file
    # is closed even if formatting a row fails part-way through.
    with open(results_p_name, 'w') as out:
        for row in results:
            out.write(format_result_row(row, dimension))

    # shut down all participants
    for _ in range(participants):
        minionsq.put("teardown")

    # post processing
    stats = {
        'submit': float(submit),
        'receive': float(receive),
        'count': count,
        'participants': participants,
    }

    print("%s" % stats)


if __name__ == '__main__':
    main()