smbinterp/bin/shepherd.py

134 lines
3.5 KiB
Python

#!/usr/bin/env python
import sys
import os
import time
import shelve
from progressbar import *
from collections import defaultdict
from optparse import OptionParser
import pickle
import numpy as np
import interp.bootstrap
import logging
log = logging.getLogger("interp")
from interp.cluster import QueueManager, get_qs
if __name__ == '__main__':
parser = OptionParser(usage = "%prog [options] <server> <destination vertexes file (pickle)> <results pickle>")
parser.add_option("-v", "--verbose",
action="store_true", dest="verbose", default=False,
help="verbose flag")
parser.add_option('-p', '--port',
type="int", dest="port", default=6666,
help="specify the port to use on the server (default: %default)")
parser.add_option('-n', '--node-count',
type="int", dest="participants", default=None,
help="specify how many participants we should wait for (default: %default)")
(options, args) = parser.parse_args()
if len(args) != 3:
parser.print_usage()
sys.exit(1)
server, dest_verts, results_p_name = args
dest_verts = pickle.load(open(dest_verts, 'r'))
orders = [2,3,4,5]
extras = [4,6,8,12,16,20,32,48,64,96,128,192,256]
count = len(orders) * len(extras) * sum((len(i) for i in dest_verts))
results = [None] * count
# results = [id, order, extra, X, qlin, err, final, exact, time]
m = QueueManager(address=(server, options.port), authkey='asdf')
m.connect()
tasksq, resultsq, masterq, minionsq = get_qs(m)
if not options.participants:
print "wait on all announced participants"
participants = 0
while not masterq.empty():
participants += 1
worker = masterq.get()
print "%s (%d)" % (worker,participants)
if participants == 0:
print "nobody found"
sys.exit(1)
else:
participants = options.participants
print "wait on %d participants" % participants
for i in xrange(participants):
worker = masterq.get()
print "%s (%d/%d)" % (worker, i+1, participants)
print "Submitting %d pieces of workload" % count
widgets = ['submit jobs: ', Percentage(), ' ', Bar(), ' ', ETA()]
pbar = ProgressBar(widgets = widgets, maxval = count)
pbar.start()
submit_start = time.time()
i = 0
for order in orders:
for extra in extras:
for d in dest_verts:
for X in d:
cur_job = (i, order, extra, X)
results[i] = [ order, extra, X]
tasksq.put(cur_job)
i+=1
pbar.update(i)
submit_end = time.time()
pbar.finish()
print "it took %0.2f seconds to submit the workload" % (submit_end - submit_start,)
print "len(results)", len(results)
for i in xrange(participants):
print "sending worker %d start message" % (i+1,)
minionsq.put("start")
receive_start = time.time()
widgets = ['interpolate: ', Percentage(), ' ', Bar(), ' ', ETA()]
pbar = ProgressBar(widgets = widgets, maxval = count)
pbar.start()
for i in xrange(count):
rid, qlin, err, final, exact, duration = resultsq.get()
results[rid].extend((qlin, err, final, exact, duration))
pbar.update(i+1)
receive_end = time.time()
pbar.finish()
pickle.dump(results, open(results_p_name, 'w'))
submit = submit_end - submit_start
receive = receive_end - receive_start
# shut down all participants
for i in xrange(participants):
minionsq.put("teardown")
# post processing
stats = {}
stats['submit' ] = float(submit)
stats['receive' ] = float(receive)
stats['count' ] = count
stats['participants'] = participants
print "%s" % stats