added two more queues

This commit is contained in:
Stephen McQuay 2011-04-02 00:10:29 -06:00
parent 314142b11a
commit 3e21904583
5 changed files with 87 additions and 103 deletions

View File

@ -8,3 +8,14 @@ m.connect()
m.get_outqueue() m.get_outqueue()
oq = m.get_outqueue() oq = m.get_outqueue()
iq = m.get_inqueue() iq = m.get_inqueue()
import time
import numpy as np
s = time.time()
for i in xrange(int(1e6)):
a = i
X = np.random.random((1,3))[0]
iq.put((i,5,128,X))
e = time.time()

View File

@ -1,13 +1,13 @@
import interp.bootstrap import interp.bootstrap
from multiprocessing.managers import BaseManager from interp.cluster import QueueManager, get_qs
class QueueManager(BaseManager): pass
QueueManager.register('get_inqueue' )
QueueManager.register('get_outqueue')
m = QueueManager(address=('localhost', 6666), authkey='asdf') m = QueueManager(address=('localhost', 6666), authkey='asdf')
m.connect() m.connect()
m.get_outqueue()
oq = m.get_outqueue() tq,rq,mq,sq = get_qs(m)
iq = m.get_inqueue()
print "interp queue status:" print "interp queue status:"
print " oq: %d, iq: %d" % (oq.qsize(), iq.qsize()) print " tasksq : %d" % tq.qsize()
print " resultsq : %d" % rq.qsize()
print " masterq : %d" % mq.qsize()
print " slavesq : %d" % sq.qsize()

View File

@ -10,8 +10,6 @@ from progressbar import *
from collections import defaultdict from collections import defaultdict
from optparse import OptionParser from optparse import OptionParser
from multiprocessing.managers import BaseManager
import numpy as np import numpy as np
import interp.bootstrap import interp.bootstrap
@ -19,50 +17,11 @@ import interp.bootstrap
import logging import logging
log = logging.getLogger("interp") log = logging.getLogger("interp")
class QueueManager(BaseManager): pass from interp.cluster import QueueManager, get_qs
QueueManager.register('get_inqueue' )
QueueManager.register('get_outqueue')
def run_queries(count, order = 3, extra_points = 8, verbose = False):
r = []
submit_start = time.time()
for i in xrange(count):
X = np.random.random((1,3))[0]
inq.put((i,order,extra_points,X))
submit_end = time.time()
receive_start = time.time()
widgets = ['Progress: ', Percentage(), ' ', Bar(), ' ', ETA()]
pbar = ProgressBar(widgets = widgets, maxval = count)
if verbose:
pbar.start()
for i in xrange(count):
r.append(outq.get())
if verbose:
pbar.update(i+1)
receive_end = time.time()
if verbose:
pbar.finish()
return (r, submit_end - submit_start, receive_end - receive_start)
def clean_up(inq, expected_participants, shutdown = False):
"""
I replace the normal first argument to the inq with None, and then do some
code branching based on the second paramter, which is shutdown
"""
for i in xrange(expected_participants):
inq.put([None, shutdown, None, None])
if __name__ == '__main__': if __name__ == '__main__':
parser = OptionParser(usage = "usage: %s [options] <server> <expected participants> <interp count>") parser = OptionParser(usage = "usage: %s [options] <server> <expected participants> <interp count>")
parser.add_option("-v", "--verbose",
action="store_true", dest="verbose", default=False,
help="verbose flag (display progress bar: %default)")
parser.add_option("-l", "--last-time", parser.add_option("-l", "--last-time",
action="store_true", dest="last", default=False, action="store_true", dest="last", default=False,
help="when finished, send shutdown signal to <expected participants> nodes (default: %default)") help="when finished, send shutdown signal to <expected participants> nodes (default: %default)")
@ -94,20 +53,46 @@ if __name__ == '__main__':
m = QueueManager(address=(server, options.port), authkey='asdf') m = QueueManager(address=(server, options.port), authkey='asdf')
m.connect() m.connect()
inq = m.get_inqueue() tasksq, resultsq, masterq, slavesq = get_qs(m)
outq = m.get_outqueue()
# wait for all participants to be loaded up print "wait on all participants"
for i in xrange(expected_participants): for i in xrange(expected_participants):
worker = outq.get() worker = masterq.get()
if options.verbose: print "%d of %d : %s is ready" % (i, expected_participants - 1, worker)
print "%d of %d : %s" % (i, expected_participants, worker)
# run codes print "everyone ready!"
results, submit, receive = run_queries(count, order = options.order, extra_points = options.extra, verbose = options.verbose)
results = []
submit_start = time.time()
for i in xrange(count):
X = np.random.random((1,3))[0]
tasksq.put((i, options.order, options.extra, X))
submit_end = time.time()
for i in xrange(expected_participants):
print "sending %d th start message" % i
slavesq.put("start")
receive_start = time.time()
widgets = ['Progress: ', Percentage(), ' ', Bar(), ' ', ETA()]
pbar = ProgressBar(widgets = widgets, maxval = count)
pbar.start()
for i in xrange(count):
results.append(resultsq.get())
pbar.update(i+1)
receive_end = time.time()
pbar.finish()
submit = submit_end - submit_start
receive = receive_end - receive_start
# shut down all participants # shut down all participants
clean_up(inq, expected_participants) for i in xrange(expected_participants):
if options.last:
slavesq.put("teardown")
else:
slavesq.put("prepare for more")
# post processing # post processing
stats = {} stats = {}
@ -116,8 +101,7 @@ if __name__ == '__main__':
stats['count' ] = count stats['count' ] = count
stats['expected_participants'] = expected_participants stats['expected_participants'] = expected_participants
if options.verbose: print "%s" % stats
print "%s" % stats
log.error("stats: %s", stats) log.error("stats: %s", stats)
tasks_accomplished_by = defaultdict(int) tasks_accomplished_by = defaultdict(int)
@ -134,6 +118,3 @@ if __name__ == '__main__':
'results' : npresults, 'results' : npresults,
} }
s.close() s.close()
if options.last:
clean_up(inq, expected_participants, shutdown = True)

View File

@ -1,15 +1,6 @@
#!/usr/bin/env python #!/usr/bin/env python
from multiprocessing.managers import BaseManager from interp.cluster import QueueManager
import Queue
inqueue = Queue.Queue()
outqueue = Queue.Queue()
class QueueManager(BaseManager):
pass
QueueManager.register('get_inqueue', callable=lambda:inqueue)
QueueManager.register('get_outqueue', callable=lambda:outqueue)
m = QueueManager(address=('', 6666), authkey='asdf') m = QueueManager(address=('', 6666), authkey='asdf')
s = m.get_server() s = m.get_server()

View File

@ -15,33 +15,21 @@ import interp.bootstrap
from interp.grid.gmsh import ggrid from interp.grid.gmsh import ggrid
from interp.tools import baker_exact_3D as exact from interp.tools import baker_exact_3D as exact
class QueueManager(BaseManager): pass from interp.cluster import QueueManager, get_qs
QueueManager.register('get_inqueue' )
QueueManager.register('get_outqueue') def keep_working(controls):
pass
def work(inq, outq, g, myname): def work(inq, outq, g, myname):
# print "%s about to send my name: %s" % (datetime.datetime.now(), myname) pass
outq.put((myname, "ready"))
while True:
i, o, e, X = inq.get()
if i == None:
shutdown = o
return shutdown
try:
a = g.run_baker(X, order = o, extra_points = e)
outq.put((i, myname, a['qlin'], a['error'], a['final'], exact(X)))
except Exception as e:
print X, e
outq.put((i, myname, 0.0, 0.0, 0.0, 0.0))
if __name__ == '__main__': if __name__ == '__main__':
parser = OptionParser(usage = "usage: %s [options] <server> <gmsh file>") parser = OptionParser(usage = "usage: %s [options] <server> <gmsh file>")
parser.add_option('-l', '--label', parser.add_option("-v", "--verbose",
type="str", dest="label", default='jane', action="store_true", dest="verbose", default=False,
help="specify this slave's response label (default: %default)") help="verbose flag (display progress bar: %default)")
parser.add_option('-p', '--port', parser.add_option('-p', '--port',
type="int", dest="port", default=6666, type="int", dest="port", default=6666,
@ -58,19 +46,32 @@ if __name__ == '__main__':
m = QueueManager(address=(server, options.port), authkey='asdf') m = QueueManager(address=(server, options.port), authkey='asdf')
m.connect() m.connect()
inq = m.get_inqueue() tasksq, resultsq, masterq, slavesq = get_qs(m)
outq = m.get_outqueue()
g = ggrid(input_file) g = ggrid(input_file)
g.q = np.array([exact(x) for x in g.verts]) g.q = np.array([exact(x) for x in g.verts])
myname = "%s-%d" % (os.uname()[1], os.getpid()) myname = "%s-%d" % (os.uname()[1], os.getpid())
if options.verbose:
print myname
shutdown = False
while not shutdown: while True:
# this sleep prevents a race condition in work(): someone putting # indicate that I am loaded up, and ready for workload
# themselves in as ready, pulling someone elses's shutdown signal, then masterq.put(myname)
# putting their name in again. # wait for master's start signal
time.sleep(5) action = slavesq.get()
if action == "teardown":
break
shutdown = work(inq, outq, g, myname) while not tasksq.empty():
i, o, e, X = tasksq.get()
try:
a = g.run_baker(X, order = o, extra_points = e)
resultsq.put((i, myname, a['qlin'], a['error'], a['final'], exact(X)))
except Exception as e:
print X, e
resultsq.put((i, myname, 0.0, 0.0, 0.0, 0.0))
if options.verbose:
print "exiting"