starting to add robustness to distributed scripts

This commit is contained in:
Stephen McQuay 2011-03-30 16:04:49 -06:00
parent d4aea22e0b
commit 13ff63bb9b
3 changed files with 45 additions and 26 deletions

View File

@ -1,19 +0,0 @@
#!/usr/bin/env python
from multiprocessing.managers import BaseManager
import numpy as np
import interp.bootstrap
class QueueManager(BaseManager): pass
QueueManager.register('get_outqueue')
m = QueueManager(address=('gannon', 50000), authkey='asdf')
m.connect()
if __name__ == '__main__':
outq = m.get_outqueue()
while True:
print outq.get()

View File

@ -45,6 +45,7 @@ if __name__ == '__main__':
g = ggrid(input_file) g = ggrid(input_file)
g.q = np.array([exact(x) for x in g.verts]) g.q = np.array([exact(x) for x in g.verts])
outq.put((myname, "ready"))
while True: while True:
i, o, e, X = inq.get() i, o, e, X = inq.get()

View File

@ -2,24 +2,61 @@
import sys import sys
import time
from multiprocessing.managers import BaseManager from multiprocessing.managers import BaseManager
import numpy as np import numpy as np
import interp.bootstrap import interp.bootstrap
import logging
log = logging.getLogger("interp")
class QueueManager(BaseManager): pass class QueueManager(BaseManager): pass
QueueManager.register('get_inqueue' ) QueueManager.register('get_inqueue' )
QueueManager.register('get_outqueue')
m = QueueManager(address=('gannon', 50000), authkey='asdf') def run_queries(count, order = 3, extra_points = 8):
m.connect() r = []
submit_start = time.time()
for i in xrange(count):
X = np.random.random((1,3))[0]
inq.put((i,order,extra_points,X))
submit_end = time.time()
receive_start = time.time()
for i in xrange(count):
r.append(outq.get())
receive_end = time.time()
return (r, submit_end - submit_start, receive_end - receive_start)
if __name__ == '__main__': if __name__ == '__main__':
start = int(sys.argv[1]) server = 'localhost'
end = int(sys.argv[2]) expected_participants = 6
m = QueueManager(address=(server, 50000), authkey='asdf')
m.connect()
count = int(sys.argv[1])
inq = m.get_inqueue() inq = m.get_inqueue()
outq = m.get_outqueue()
for i in xrange(start, end + 1): # wait for all participants to be loaded up
X = np.random.random((1,3))[0] for i in xrange(expected_participants):
inq.put((i,X)) print outq.get()
# run codes
r, submit, receive = run_queries(count, order=3, extra_points = 64)
for i in xrange(expected_participants):
inq.put([None] * expected_participants)
results = {}
results['submit' ] = float(submit)
results['receive'] = float(receive)
print "submit time: %(submit)0.2f seconds, results time: %(receive)0.2f" % results
log.error(results)