diff --git a/bin/interactq.py b/bin/interactq.py index a448799..92facb1 100644 --- a/bin/interactq.py +++ b/bin/interactq.py @@ -8,3 +8,14 @@ m.connect() m.get_outqueue() oq = m.get_outqueue() iq = m.get_inqueue() + +import time +import numpy as np +s = time.time() + +for i in xrange(int(1e6)): + a = i + X = np.random.random((1,3))[0] + iq.put((i,5,128,X)) + +e = time.time() diff --git a/bin/iqstat.py b/bin/iqstat.py index 0c196c6..dd5eb71 100644 --- a/bin/iqstat.py +++ b/bin/iqstat.py @@ -1,13 +1,13 @@ import interp.bootstrap -from multiprocessing.managers import BaseManager -class QueueManager(BaseManager): pass -QueueManager.register('get_inqueue' ) -QueueManager.register('get_outqueue') +from interp.cluster import QueueManager, get_qs + m = QueueManager(address=('localhost', 6666), authkey='asdf') m.connect() -m.get_outqueue() -oq = m.get_outqueue() -iq = m.get_inqueue() + +tq,rq,mq,sq = get_qs(m) print "interp queue status:" -print " oq: %d, iq: %d" % (oq.qsize(), iq.qsize()) +print " tasksq : %d" % tq.qsize() +print " resultsq : %d" % rq.qsize() +print " masterq : %d" % mq.qsize() +print " slavesq : %d" % sq.qsize() diff --git a/bin/master.py b/bin/master.py index 9b2947d..9b4ae49 100644 --- a/bin/master.py +++ b/bin/master.py @@ -10,8 +10,6 @@ from progressbar import * from collections import defaultdict from optparse import OptionParser -from multiprocessing.managers import BaseManager - import numpy as np import interp.bootstrap @@ -19,50 +17,11 @@ import interp.bootstrap import logging log = logging.getLogger("interp") -class QueueManager(BaseManager): pass -QueueManager.register('get_inqueue' ) -QueueManager.register('get_outqueue') - -def run_queries(count, order = 3, extra_points = 8, verbose = False): - r = [] - submit_start = time.time() - for i in xrange(count): - X = np.random.random((1,3))[0] - inq.put((i,order,extra_points,X)) - submit_end = time.time() - - receive_start = time.time() - - widgets = ['Progress: ', Percentage(), ' ', Bar(), ' ', ETA()] - pbar = ProgressBar(widgets = widgets, maxval = count) - if verbose: - pbar.start() - for i in xrange(count): - r.append(outq.get()) - if verbose: - pbar.update(i+1) - receive_end = time.time() - if verbose: - pbar.finish() - - return (r, submit_end - submit_start, receive_end - receive_start) - -def clean_up(inq, expected_participants, shutdown = False): - """ - I replace the normal first argument to the inq with None, and then do some - code branching based on the second paramter, which is shutdown - """ - for i in xrange(expected_participants): - inq.put([None, shutdown, None, None]) - +from interp.cluster import QueueManager, get_qs if __name__ == '__main__': parser = OptionParser(usage = "usage: %s [options] ") - parser.add_option("-v", "--verbose", - action="store_true", dest="verbose", default=False, - help="verbose flag (display progress bar: %default)") - parser.add_option("-l", "--last-time", action="store_true", dest="last", default=False, help="when finished, send shutdown signal to nodes (default: %default)") @@ -94,20 +53,46 @@ if __name__ == '__main__': m = QueueManager(address=(server, options.port), authkey='asdf') m.connect() - inq = m.get_inqueue() - outq = m.get_outqueue() + tasksq, resultsq, masterq, slavesq = get_qs(m) - # wait for all participants to be loaded up + print "wait on all participants" for i in xrange(expected_participants): - worker = outq.get() - if options.verbose: - print "%d of %d : %s" % (i, expected_participants, worker) + worker = masterq.get() + print "%d of %d : %s is ready" % (i, expected_participants - 1, worker) - # run codes - results, submit, receive = run_queries(count, order = options.order, extra_points = options.extra, verbose = options.verbose) + print "everyone ready!" + + results = [] + + submit_start = time.time() + for i in xrange(count): + X = np.random.random((1,3))[0] + tasksq.put((i, options.order, options.extra, X)) + submit_end = time.time() + + for i in xrange(expected_participants): + print "sending %d th start message" % i + slavesq.put("start") + + receive_start = time.time() + widgets = ['Progress: ', Percentage(), ' ', Bar(), ' ', ETA()] + pbar = ProgressBar(widgets = widgets, maxval = count) + pbar.start() + for i in xrange(count): + results.append(resultsq.get()) + pbar.update(i+1) + receive_end = time.time() + pbar.finish() + + submit = submit_end - submit_start + receive = receive_end - receive_start # shut down all participants - clean_up(inq, expected_participants) + for i in xrange(expected_participants): + if options.last: + slavesq.put("teardown") + else: + slavesq.put("prepare for more") # post processing stats = {} @@ -116,8 +101,7 @@ if __name__ == '__main__': stats['count' ] = count stats['expected_participants'] = expected_participants - if options.verbose: - print "%s" % stats + print "%s" % stats log.error("stats: %s", stats) tasks_accomplished_by = defaultdict(int) @@ -134,6 +118,3 @@ if __name__ == '__main__': 'results' : npresults, } s.close() - - if options.last: - clean_up(inq, expected_participants, shutdown = True) diff --git a/bin/server.py b/bin/server.py index 34daba5..06da8ab 100644 --- a/bin/server.py +++ b/bin/server.py @@ -1,15 +1,6 @@ #!/usr/bin/env python -from multiprocessing.managers import BaseManager -import Queue - -inqueue = Queue.Queue() -outqueue = Queue.Queue() -class QueueManager(BaseManager): - pass -QueueManager.register('get_inqueue', callable=lambda:inqueue) -QueueManager.register('get_outqueue', callable=lambda:outqueue) - +from interp.cluster import QueueManager m = QueueManager(address=('', 6666), authkey='asdf') s = m.get_server() diff --git a/bin/slave.py b/bin/slave.py index 3215570..182823f 100644 --- a/bin/slave.py +++ b/bin/slave.py @@ -15,33 +15,21 @@ import interp.bootstrap from interp.grid.gmsh import ggrid from interp.tools import baker_exact_3D as exact -class QueueManager(BaseManager): pass -QueueManager.register('get_inqueue' ) -QueueManager.register('get_outqueue') +from interp.cluster import QueueManager, get_qs + +def keep_working(controls): + pass def work(inq, outq, g, myname): - # print "%s about to send my name: %s" % (datetime.datetime.now(), myname) - outq.put((myname, "ready")) - - while True: - i, o, e, X = inq.get() - if i == None: - shutdown = o - return shutdown - try: - a = g.run_baker(X, order = o, extra_points = e) - outq.put((i, myname, a['qlin'], a['error'], a['final'], exact(X))) - except Exception as e: - print X, e - outq.put((i, myname, 0.0, 0.0, 0.0, 0.0)) + pass if __name__ == '__main__': parser = OptionParser(usage = "usage: %s [options] ") - parser.add_option('-l', '--label', - type="str", dest="label", default='jane', - help="specify this slave's response label (default: %default)") + parser.add_option("-v", "--verbose", + action="store_true", dest="verbose", default=False, + help="verbose flag (display progress bar: %default)") parser.add_option('-p', '--port', type="int", dest="port", default=6666, @@ -58,19 +46,32 @@ if __name__ == '__main__': m = QueueManager(address=(server, options.port), authkey='asdf') m.connect() - inq = m.get_inqueue() - outq = m.get_outqueue() + tasksq, resultsq, masterq, slavesq = get_qs(m) g = ggrid(input_file) g.q = np.array([exact(x) for x in g.verts]) myname = "%s-%d" % (os.uname()[1], os.getpid()) + if options.verbose: + print myname - shutdown = False - while not shutdown: - # this sleep prevents a race condition in work(): someone putting - # themselves in as ready, pulling someone elses's shutdown signal, then - # putting their name in again. - time.sleep(5) + + while True: + # indicate that I am loaded up, and ready for workload + masterq.put(myname) + # wait for master's start signal + action = slavesq.get() + if action == "teardown": + break - shutdown = work(inq, outq, g, myname) + while not tasksq.empty(): + i, o, e, X = tasksq.get() + try: + a = g.run_baker(X, order = o, extra_points = e) + resultsq.put((i, myname, a['qlin'], a['error'], a['final'], exact(X))) + except Exception as e: + print X, e + resultsq.put((i, myname, 0.0, 0.0, 0.0, 0.0)) + + if options.verbose: + print "exiting"