diff --git a/bin/results.py b/bin/results.py deleted file mode 100644 index a647756..0000000 --- a/bin/results.py +++ /dev/null @@ -1,19 +0,0 @@ -#!/usr/bin/env python - -from multiprocessing.managers import BaseManager - -import numpy as np - -import interp.bootstrap - -class QueueManager(BaseManager): pass -QueueManager.register('get_outqueue') - -m = QueueManager(address=('gannon', 50000), authkey='asdf') -m.connect() - -if __name__ == '__main__': - outq = m.get_outqueue() - - while True: - print outq.get() diff --git a/bin/slave.py b/bin/slave.py index 12aad3f..171a115 100644 --- a/bin/slave.py +++ b/bin/slave.py @@ -45,6 +45,7 @@ if __name__ == '__main__': g = ggrid(input_file) g.q = np.array([exact(x) for x in g.verts]) + outq.put((myname, "ready")) while True: i, o, e, X = inq.get() diff --git a/bin/submit.py b/bin/submit.py index 2b20b93..e386e53 100644 --- a/bin/submit.py +++ b/bin/submit.py @@ -2,24 +2,61 @@ import sys +import time + from multiprocessing.managers import BaseManager import numpy as np import interp.bootstrap +import logging +log = logging.getLogger("interp") + class QueueManager(BaseManager): pass QueueManager.register('get_inqueue' ) +QueueManager.register('get_outqueue') -m = QueueManager(address=('gannon', 50000), authkey='asdf') -m.connect() +def run_queries(count, order = 3, extra_points = 8): + r = [] + submit_start = time.time() + for i in xrange(count): + X = np.random.random((1,3))[0] + inq.put((i,order,extra_points,X)) + submit_end = time.time() + + receive_start = time.time() + for i in xrange(count): + r.append(outq.get()) + receive_end = time.time() + + return (r, submit_end - submit_start, receive_end - receive_start) if __name__ == '__main__': - start = int(sys.argv[1]) - end = int(sys.argv[2]) + server = 'localhost' + expected_participants = 6 + + m = QueueManager(address=(server, 50000), authkey='asdf') + m.connect() + + count = int(sys.argv[1]) inq = m.get_inqueue() + outq = m.get_outqueue() - for i in xrange(start, end + 1): - X = np.random.random((1,3))[0] - inq.put((i,X)) + # wait for all participants to be loaded up + for i in xrange(expected_participants): + print outq.get() + + # run codes + r, submit, receive = run_queries(count, order=3, extra_points = 64) + + for i in xrange(expected_participants): + inq.put([None] * expected_participants) + + results = {} + results['submit' ] = float(submit) + results['receive'] = float(receive) + + print "submit time: %(submit)0.2f seconds, results time: %(receive)0.2f" % results + log.error(results)