smbinterp/bin/master.py

148 lines
4.0 KiB
Python

#!/usr/bin/env python
import sys
import os
import time
import shelve
from collections import defaultdict
from optparse import OptionParser
import logging
log = logging.getLogger("interp")
import numpy as np
from interp.cluster import QueueManager, get_qs
from progressbar import *
def _build_parser():
    """Return the command-line parser for the master script.

    Positional arguments are ``<server> <interp count>``; each option below
    carries a default that ``--help`` reports via ``%default``.
    """
    # optparse expands %prog (not %s) to the program name in the usage line.
    parser = OptionParser(usage="usage: %prog [options] <server> <interp count>")
    parser.add_option("-l", "--last-time",
                      action="store_true", dest="last", default=False,
                      help="when finished, send shutdown signal to connected nodes (default: %default)")
    parser.add_option('-n', '--node-count',
                      type="int", dest="participants", default=None,
                      help="specify how many participants we should wait for (default: %default)")
    parser.add_option('-p', '--port',
                      type="int", dest="port", default=6666,
                      help="specify the port to use on the server (default: %default)")
    # The key used to be hard-coded; the default keeps old invocations working.
    parser.add_option('-a', '--authkey',
                      type="str", dest="authkey", default='asdf',
                      help="authentication key for the queue server connection")
    parser.add_option("-o", "--order",
                      type="int", dest="order", default=2,
                      help="order of interpolation (default: %default)")
    parser.add_option("-e", "--extra-points",
                      type="int", dest="extra", default=3,
                      help="number of extra points (default: %default)")
    parser.add_option('-s', '--shelve',
                      type="str", dest="shelvename",
                      default=os.path.expanduser('~/interp.shelve'),
                      help="shelve output file (default: %default)")
    return parser


def _parse_count(parser, text):
    """Return the ``<interp count>`` argument as a positive int.

    Float notation such as ``1e4`` is accepted and truncated by ``int()``.
    Exits through ``parser.error`` when the value is not a finite number or
    is below 1.
    """
    try:
        count = int(float(text))
    except (ValueError, OverflowError):
        parser.error("<interp count> must be a number, got %r" % (text,))
    if count < 1:
        parser.error("<interp count> must be at least 1, got %d" % count)
    return count


def _collect_workers(masterq, expected):
    """Read worker announcements from ``masterq`` and return them in order.

    If ``expected`` is falsy (None or 0), read announcements only while
    ``masterq.empty()`` reports the queue as non-empty.  Otherwise call
    ``masterq.get()`` exactly ``expected`` times.
    """
    workers = []
    if not expected:
        print("wait on all announced participants")
        # empty() is a snapshot: workers announcing after the queue drains
        # are not picked up.
        while not masterq.empty():
            worker = masterq.get()
            workers.append(worker)
            print("%d: %s is ready" % (len(workers), worker))
    else:
        print("wait on %d participants" % expected)
        for i in xrange(expected):
            worker = masterq.get()
            workers.append(worker)
            print("%d of %d: %s is ready" % (i + 1, expected, worker))
    return workers


def _submit_tasks(tasksq, count, order, extra):
    """Put ``count`` tasks on ``tasksq`` and return the seconds spent.

    Each task is the tuple ``(index, order, extra, X)``, where ``X`` is a
    length-3 array of uniform random values in ``[0, 1)``.
    """
    pbar = ProgressBar(widgets=['submit jobs: ', Percentage(), ' ', Bar(), ' ', ETA()],
                       maxval=count)
    pbar.start()
    start = time.time()
    for i in xrange(count):
        X = np.random.random(3)
        tasksq.put((i, order, extra, X))
        pbar.update(i + 1)
    elapsed = time.time() - start
    pbar.finish()
    return elapsed


def _collect_results(resultsq, count):
    """Take ``count`` items from ``resultsq``; return ``(results, seconds)``."""
    start = time.time()
    pbar = ProgressBar(widgets=['interpolate: ', Percentage(), ' ', Bar(), ' ', ETA()],
                       maxval=count)
    pbar.start()
    results = []
    for i in xrange(count):
        results.append(resultsq.get())
        pbar.update(i + 1)
    elapsed = time.time() - start
    pbar.finish()
    return results, elapsed


def _tasks_per_worker(results):
    """Return a ``defaultdict(int)`` counting results by ``result[1]``.

    NOTE(review): result[1] presumably identifies the worker that produced
    the result -- confirm against the worker side.
    """
    counts = defaultdict(int)
    for result in results:
        counts[result[1]] += 1
    return counts


def _save_stats(shelvename, stats):
    """Store ``{'stats': stats}`` in the shelve file ``shelvename``.

    The entry key is ``str(time.time())`` taken at save time, so each run is
    stored under its own key instead of replacing earlier entries.
    """
    key = str(time.time())
    db = shelve.open(shelvename)
    try:
        # Only aggregate statistics are stored; raw per-task results are not.
        db[key] = {'stats': stats}
    finally:
        # Close even if storing fails so the database file is not left open.
        db.close()


def main():
    """Run one distributed interpolation benchmark and record its timings.

    Connects to the queue server, gathers worker announcements, submits
    ``<interp count>`` tasks, sends each worker a "start" message, collects
    every result, optionally sends "teardown" to each worker, then prints,
    logs and shelves the timing statistics.
    """
    parser = _build_parser()
    options, args = parser.parse_args()
    if len(args) != 2:
        parser.print_usage()
        sys.exit(1)
    server = args[0]
    # Validate all input before connecting so no worker is claimed for a run
    # that is about to abort.
    count = _parse_count(parser, args[1])
    if options.participants is not None and options.participants < 0:
        parser.error("--node-count must not be negative")

    manager = QueueManager(address=(server, options.port),
                           authkey=options.authkey)
    manager.connect()
    tasksq, resultsq, masterq, minionsq = get_qs(manager)

    workers = _collect_workers(masterq, options.participants)
    participants = len(workers)
    if participants == 0:
        print("nobody found")
        sys.exit(1)
    if len(set(workers)) != participants:
        # One "slay" per announcement, sent before aborting the run.
        for _ in workers:
            minionsq.put("slay")
        raise RuntimeError("duplicate workers reported")

    submit = _submit_tasks(tasksq, count, options.order, options.extra)
    # Release the workers only once every task has been queued.
    for i in xrange(participants):
        print("sending worker %d start message" % (i + 1,))
        minionsq.put("start")
    results, receive = _collect_results(resultsq, count)

    if options.last:
        # Shut down all participants: one message each.
        for _ in xrange(participants):
            minionsq.put("teardown")

    stats = {
        'submit': float(submit),
        'receive': float(receive),
        'count': count,
        'participants': participants,
        'extra': options.extra,
        'order': options.order,
    }
    print("%s" % stats)
    # NOTE(review): logged at ERROR level, presumably so it is emitted under
    # the default logging threshold -- confirm this is intentional.
    log.error("stats: %s", stats)
    # Per-worker task counts are shelved but not printed or logged.
    stats['tasks'] = _tasks_per_worker(results)
    _save_stats(options.shelvename, stats)


if __name__ == '__main__':
    main()