smbinterp/bin/master.py

125 lines
3.3 KiB
Python

#!/usr/bin/env python
import sys
import os
import time
import shelve
from progressbar import *
from collections import defaultdict
from optparse import OptionParser
from multiprocessing.managers import BaseManager
import numpy as np
import interp.bootstrap
import logging
log = logging.getLogger("interp")
class QueueManager(BaseManager):
    """Manager used to reach the shared work queues.

    No behaviour is added on top of ``BaseManager``; the subclass only
    exists so the queue accessors can be registered on it.
    """


# Only the type ids are registered (no callable is supplied), which gives
# the class ``get_inqueue()`` / ``get_outqueue()`` accessor methods.
for _typeid in ('get_inqueue', 'get_outqueue'):
    QueueManager.register(_typeid)
del _typeid
def run_queries(count, order=3, extra_points=8, verbose=False,
                in_queue=None, out_queue=None):
    """Submit ``count`` random queries and collect one reply for each.

    Every query is the tuple ``(i, order, extra_points, X)`` where ``i`` is
    the query index and ``X`` is a length-3 array drawn from
    ``np.random.random``.  All queries are submitted first; afterwards
    exactly ``count`` items are read back from the output queue.

    Parameters:
        count        -- number of queries to submit / replies to wait for
        order        -- forwarded unchanged in every query tuple
        extra_points -- forwarded unchanged in every query tuple
        verbose      -- when true, show a progress bar while receiving
        in_queue     -- object with ``put()`` that receives the queries;
                        defaults to the module-level ``inq``
        out_queue    -- object with ``get()`` that yields the replies;
                        defaults to the module-level ``outq``

    Returns:
        ``(replies, submit_seconds, receive_seconds)`` -- the replies in the
        order they were received, plus the wall-clock time spent in the
        submit phase and in the receive phase.
    """
    # Fall back to the globals created by the ``__main__`` driver so that
    # existing callers keep working without passing the queues explicitly.
    if in_queue is None:
        in_queue = inq
    if out_queue is None:
        out_queue = outq

    # Keep the lazy ``xrange`` where it exists (Python 2) so no index list
    # is allocated; use ``range`` where it does not (Python 3).
    try:
        _range = xrange
    except NameError:
        _range = range

    submit_start = time.time()
    for i in _range(count):
        X = np.random.random((1, 3))[0]
        in_queue.put((i, order, extra_points, X))
    submit_end = time.time()

    receive_start = time.time()
    # The progress bar is only built when it will actually be displayed, so
    # a quiet run spends no receive-phase time constructing widgets.
    pbar = None
    if verbose:
        widgets = ['Progress: ', Percentage(), ' ', Bar(), ' ', ETA()]
        pbar = ProgressBar(widgets=widgets, maxval=count)
        pbar.start()

    r = []
    for i in _range(count):
        r.append(out_queue.get())
        if pbar is not None:
            pbar.update(i + 1)
    receive_end = time.time()

    if pbar is not None:
        pbar.finish()

    return (r, submit_end - submit_start, receive_end - receive_start)
if __name__ == '__main__':
    # Driver: connect to the queue server, wait for the workers to report
    # in, run the queries, tell the workers to stop, then store the timing
    # statistics and the results in a shelve file.

    # ``%prog`` is the placeholder optparse replaces with the script name;
    # a plain ``%s`` is never substituted and would be printed literally.
    parser = OptionParser(usage="usage: %prog [options] <server> <expected participants> <interp count>")

    parser.add_option("-v", "--verbose",
                      action="store_true", dest="verbose", default=False,
                      help="verbose flag (display progress bar: %default)")

    parser.add_option('-p', '--port',
                      type="int", dest="port", default=6666,
                      help="specify the port to use on the server (default: %default)")

    parser.add_option("-o", "--order",
                      type="int", dest="order", default=2,
                      help="order of interpolation (default: %default)")

    parser.add_option("-e", "--extra-points",
                      type="int", dest="extra", default=3,
                      help="number of extra points (default: %default)")

    parser.add_option('-s', '--shelve',
                      type="str", dest="shelvename", default=os.path.expanduser('~/interp.shelve'),
                      help="shelve output file (default: %default)")

    (options, args) = parser.parse_args()

    if len(args) != 3:
        parser.print_usage()
        sys.exit(1)

    server = args[0]
    try:
        expected_participants, count = (int(i) for i in args[1:])
    except ValueError:
        # Report bad numbers through optparse (usage + message, then exit)
        # instead of letting a raw traceback reach the user.
        parser.error("<expected participants> and <interp count> must be integers")

    # NOTE(review): the authkey is hard-coded; it has to match whatever the
    # server was started with.  Written as a bytes literal, which is the
    # same value on Python 2 and the type Python 3 requires.
    m = QueueManager(address=(server, options.port), authkey=b'asdf')
    m.connect()

    # These two names must stay module-level: run_queries() reads ``inq``
    # and ``outq`` as globals.
    inq = m.get_inqueue()
    outq = m.get_outqueue()

    # wait for all participants to be loaded up
    # (one message is read from outq per expected participant)
    for _ in range(expected_participants):
        worker = outq.get()
        if options.verbose:
            print(worker)

    # run codes
    results, submit, receive = run_queries(count,
                                           order=options.order,
                                           extra_points=options.extra,
                                           verbose=options.verbose)

    # shut down all participants
    # (one four-element all-None message per participant)
    for _ in range(expected_participants):
        inq.put([None] * 4)

    # post processing
    stats = {}
    stats['submit'] = float(submit)
    stats['receive'] = float(receive)
    stats['count'] = count
    stats['expected_participants'] = expected_participants

    if options.verbose:
        print("%s" % stats)
    # NOTE(review): the source indentation was lost; the log call is assumed
    # to be unconditional (outside the verbose branch) -- confirm.
    log.error("stats: %s", stats)

    # Count replies per value of field 1 of each result tuple
    # (presumably the id of the worker that produced it -- confirm).
    tasks_accomplished_by = defaultdict(int)
    for res in results:
        tasks_accomplished_by[res[1]] += 1
    stats['tasks'] = tasks_accomplished_by

    # Keep every field of each result except field 1 (counted above).
    npresults = np.array([(res[0], res[2], res[3], res[4], res[5]) for res in results])

    # Each run is stored under its own key: the current timestamp as a string.
    s = shelve.open(options.shelvename)
    try:
        n = str(time.time())
        s[n] = {
            'stats': stats,
            'results': npresults,
        }
    finally:
        # Always release the shelve file, even if storing the entry fails.
        s.close()