smbinterp/bin/master.py

145 lines
4.2 KiB
Python

#!/usr/bin/env python
from collections import defaultdict
from optparse import OptionParser
import os
from progressbar import ProgressBar, Percentage, Bar, ETA
import shelve
import sys
import time
import numpy as np
from interp.cluster import QueueManager, get_qs
def build_parser():
    """Build the command-line parser for the master script.

    Returns:
        An ``optparse.OptionParser`` expecting two positional arguments,
        ``<server>`` and ``<interp count>``, plus the options defined below.
    """
    # optparse expands "%prog" (not "%s") to the script name in usage text.
    parser = OptionParser(
        usage="usage: %prog [options] <server> <interp count>")
    parser.add_option("-l", "--last-time",
                      action="store_true", dest="last", default=False,
                      help="when finished, send shutdown signal to connected "
                           "nodes (default: %default)")
    parser.add_option('-n', '--node-count',
                      type="int", dest="participants", default=None,
                      help="specify how many participants we should wait for "
                           "(default: %default)")
    parser.add_option('-p', '--port',
                      type="int", dest="port", default=6666,
                      help="specify the port to use on the server "
                           "(default: %default)")
    parser.add_option("-o", "--order",
                      type="int", dest="order", default=2,
                      help="order of interpolation (default: %default)")
    parser.add_option("-e", "--extra-points",
                      type="int", dest="extra", default=3,
                      help="number of extra points (default: %default)")
    parser.add_option('-s', '--shelve',
                      type="str", dest="shelvename",
                      default=os.path.expanduser('~/interp.shelve'),
                      help="shelve output file (default: %default)")
    return parser


def _await_workers(masterq, minionsq, requested):
    """Collect worker announcements from ``masterq``.

    Args:
        masterq: queue on which workers announce themselves.
        minionsq: queue used to send control messages to workers.
        requested: number of announcements to block for; when falsy
            (``None`` or 0) every announcement already queued is taken
            without blocking.

    Returns:
        The list of announced workers, in arrival order.

    Raises:
        SystemExit: status 1 if no count was requested and nobody announced.
        Exception: if the same worker announced itself more than once.
    """
    workers = []
    if not requested:
        print("wait on all announced participants")
        while not masterq.empty():
            worker = masterq.get()
            workers.append(worker)
            print("%d: %s is ready" % (len(workers), worker))
        if not workers:
            print("nobody found")
            sys.exit(1)
    else:
        print("wait on %d participants" % requested)
        for i in xrange(requested):
            worker = masterq.get()
            workers.append(worker)
            print("%d of %d: %s is ready" % (i + 1, requested, worker))

    if len(set(workers)) != len(workers):
        # One "slay" message per announcement before bailing out
        # (presumably tells the minions to abort -- confirm in minion code).
        for _ in workers:
            minionsq.put("slay")
        raise Exception("duplicate workers reported")
    return workers


def main():
    """Submit random interpolation tasks to the cluster and shelve the results.

    Connects to the queue server, waits for workers, enqueues
    ``<interp count>`` tasks, tells each worker to start, gathers one result
    per task, and stores timing statistics plus the results in the shelve
    file under a key made from the current timestamp.
    """
    parser = build_parser()
    (options, args) = parser.parse_args()
    if len(args) != 2:
        parser.print_usage()
        sys.exit(1)

    server = args[0]
    try:
        # Going through float() accepts forms such as "1e6".
        count = int(float(args[1]))
    except (ValueError, OverflowError):
        parser.error("<interp count> must be a number, got %r" % (args[1],))

    # NOTE(review): the authkey is hard-coded; anyone who knows it can
    # connect to the queue server.
    m = QueueManager(address=(server, options.port), authkey='asdf')
    m.connect()
    tasksq, resultsq, masterq, minionsq = get_qs(m)

    workers = _await_workers(masterq, minionsq, options.participants)
    participants = len(workers)

    # Queue every task before any worker is told to start.
    widgets = ['submit jobs: ', Percentage(), ' ', Bar(), ' ', ETA()]
    pbar = ProgressBar(widgets=widgets, maxval=count)
    pbar.start()
    submit_start = time.time()
    for i in xrange(count):
        X = np.random.random((1, 3))[0]  # one random 3-component point
        tasksq.put((i, options.order, options.extra, X))
        pbar.update(i + 1)
    submit_end = time.time()
    pbar.finish()

    for i in xrange(participants):
        print("sending worker %d start message" % (i + 1,))
        minionsq.put("start")

    # Exactly one result is expected back per submitted task.
    results = []
    receive_start = time.time()
    widgets = ['interpolate: ', Percentage(), ' ', Bar(), ' ', ETA()]
    pbar = ProgressBar(widgets=widgets, maxval=count)
    pbar.start()
    for i in xrange(count):
        results.append(resultsq.get())
        pbar.update(i + 1)
    receive_end = time.time()
    pbar.finish()

    # shut down all participants (only when --last-time was given)
    if options.last:
        for _ in xrange(participants):
            minionsq.put("teardown")

    # post processing
    stats = {}
    stats['submit'] = float(submit_end - submit_start)
    stats['receive'] = float(receive_end - receive_start)
    stats['count'] = count
    stats['participants'] = participants
    stats['extra'] = options.extra
    stats['order'] = options.order
    print("%s" % stats)

    # Element 1 of each result is the tally key (presumably the worker's
    # identifier -- confirm against the minion code); the other five fields
    # are what gets saved in the results array.
    tasks_accomplished_by = defaultdict(int)
    for result in results:
        tasks_accomplished_by[result[1]] += 1
    stats['tasks'] = tasks_accomplished_by

    npresults = np.array([(r[0], r[2], r[3], r[4], r[5]) for r in results])

    n = str(time.time())
    s = shelve.open(options.shelvename)
    try:
        s[n] = {
            'stats': stats,
            'results': npresults,
        }
    finally:
        # Close the shelve even if pickling the entry fails.
        s.close()


if __name__ == '__main__':
    main()