def __init__(self):
self.loop = ioloop.IOLoop.current()
# load topography from file
self._loadTopology()
self.context = zmq.Context()
self.zk = KazooClient()
self.zk.start(timeout=1000)
# send own address to zookeeper
self.zk.ensure_path("/addr")
self.zk.create(("/addr/%s" % self.getOwnName()), bytes(self.getOwnAddr(), "UTF-8"))
# get IP addresses from zookeeper
all_names = {k for k in self.topo.keys() if k.isnumeric() and k != self.getOwnName()}
self.addresses = {}
for name in all_names:
cv = threading.Condition()
cv.acquire()
def wakeup_watch(stat):
cv.acquire()
cv.notify()
cv.release()
ex = self.zk.exists(("/addr/%s" % name), wakeup_watch)
if not ex:
cv.wait()
(addr, _) = self.zk.get("/addr/%s" % name)
self.addresses[name] = addr.decode("UTF-8")
print('All nodes checked in to Zookeeper.')
# create PAIR connections for each network link
self.neighbors = {}
self._allNodes = {}
for name in all_names:
# lower device establishes connection to avoid duplicate
socket = self.context.socket(zmq.PAIR)
if int(name) > int(self.getOwnName()):
socket.connect(self.getAddr(name))
else:
socket.bind('tcp://*:%d' % self._findPortFor(name))
self._allNodes[name] = socket
if name in self.topo[self.getOwnName()]:
self.neighbors[name] = socket
self.resetSyncInbox()
self.sync_cv = threading.Condition()
self.streams = {}
评论列表
文章目录