auto calculate participants

This commit is contained in:
Stephen McQuay 2011-04-02 00:34:54 -06:00
parent 1fa69670c3
commit 36ba080d14
1 changed files with 14 additions and 11 deletions

View File

@ -20,11 +20,11 @@ log = logging.getLogger("interp")
from interp.cluster import QueueManager, get_qs from interp.cluster import QueueManager, get_qs
if __name__ == '__main__': if __name__ == '__main__':
parser = OptionParser(usage = "usage: %s [options] <server> <expected participants> <interp count>") parser = OptionParser(usage = "usage: %s [options] <server> <interp count>")
parser.add_option("-l", "--last-time", parser.add_option("-l", "--last-time",
action="store_true", dest="last", default=False, action="store_true", dest="last", default=False,
help="when finished, send shutdown signal to <expected participants> nodes (default: %default)") help="when finished, send shutdown signal to connected nodes (default: %default)")
parser.add_option('-p', '--port', parser.add_option('-p', '--port',
type="int", dest="port", default=6666, type="int", dest="port", default=6666,
@ -48,7 +48,7 @@ if __name__ == '__main__':
sys.exit(1) sys.exit(1)
server = args[0] server = args[0]
expected_participants, count = (int(i) for i in args[1:]) count = int(args[1])
m = QueueManager(address=(server, options.port), authkey='asdf') m = QueueManager(address=(server, options.port), authkey='asdf')
m.connect() m.connect()
@ -56,9 +56,12 @@ if __name__ == '__main__':
tasksq, resultsq, masterq, slavesq = get_qs(m) tasksq, resultsq, masterq, slavesq = get_qs(m)
print "wait on all participants" print "wait on all participants"
for i in xrange(expected_participants): for i in xrange(participants):
participants = 0
while not masterq.empty():
participants += 1:
worker = masterq.get() worker = masterq.get()
print "%d of %d : %s is ready" % (i, expected_participants - 1, worker) print "%d: %s is ready" % (participants - 1, worker)
print "everyone ready!" print "everyone ready!"
@ -70,7 +73,7 @@ if __name__ == '__main__':
tasksq.put((i, options.order, options.extra, X)) tasksq.put((i, options.order, options.extra, X))
submit_end = time.time() submit_end = time.time()
for i in xrange(expected_participants): for i in xrange(participants):
print "sending %d th start message" % i print "sending %d th start message" % i
slavesq.put("start") slavesq.put("start")
@ -88,7 +91,7 @@ if __name__ == '__main__':
receive = receive_end - receive_start receive = receive_end - receive_start
# shut down all participants # shut down all participants
for i in xrange(expected_participants): for i in xrange(participants):
if options.last: if options.last:
slavesq.put("teardown") slavesq.put("teardown")
else: else:
@ -96,10 +99,10 @@ if __name__ == '__main__':
# post processing # post processing
stats = {} stats = {}
stats['submit' ] = float(submit) stats['submit' ] = float(submit)
stats['receive' ] = float(receive) stats['receive' ] = float(receive)
stats['count' ] = count stats['count' ] = count
stats['expected_participants'] = expected_participants stats['participants'] = participants
print "%s" % stats print "%s" % stats
log.error("stats: %s", stats) log.error("stats: %s", stats)