Messager.py 文件源码

python
阅读 24 收藏 0 点赞 0 评论 0

项目:RPi-distributed-ML 作者: revan 项目源码 文件源码
def __init__(self):
        """Register this node in ZooKeeper and open a PAIR socket to every peer.

        Steps, in order:

        1. Load the topology (``self._loadTopology()``) and create the ZeroMQ
           context and the Kazoo client.
        2. Publish this node's address at ``/addr/<own name>``.
        3. Block until every other numeric-named node in ``self.topo`` has
           published its address, collecting them in ``self.addresses``.
        4. Create one ``zmq.PAIR`` socket per peer, stored in
           ``self._allNodes``; peers listed in ``self.topo[<own name>]`` are
           also stored in ``self.neighbors``.

        Blocks indefinitely in step 3 if a peer never checks in.
        """
        self.loop = ioloop.IOLoop.current()
        # load topology from file
        self._loadTopology()

        self.context = zmq.Context()

        self.zk = KazooClient()
        self.zk.start(timeout=1000)

        # send own address to zookeeper
        # NOTE(review): create() raises if the node already exists (e.g. left
        # over from a previous run) -- confirm whether that is intended.
        self.zk.ensure_path("/addr")
        self.zk.create(("/addr/%s" % self.getOwnName()), bytes(self.getOwnAddr(), "UTF-8"))

        # get IP addresses from zookeeper
        own_name = self.getOwnName()
        all_names = {k for k in self.topo.keys() if k.isnumeric() and k != own_name}
        self.addresses = {}
        for name in all_names:
            path = "/addr/%s" % name
            checked_in = threading.Event()

            # Bind this iteration's Event as a default argument so a watch
            # that fires late cannot touch a later peer's Event, and never
            # take a lock on Kazoo's callback thread.
            def wakeup_watch(event, checked_in=checked_in):
                checked_in.set()

            # Re-check after every wakeup: the watch fires on any change to
            # the path, so a wakeup alone does not prove the node exists.
            while not self.zk.exists(path, wakeup_watch):
                checked_in.wait()
                checked_in.clear()

            (addr, _) = self.zk.get(path)
            self.addresses[name] = addr.decode("UTF-8")

        print('All nodes checked in to Zookeeper.')

        # create PAIR connections for each network link
        self.neighbors = {}
        self._allNodes = {}
        own_id = int(own_name)
        own_links = self.topo[own_name]
        for name in all_names:
            # lower device establishes connection to avoid duplicate
            sock = self.context.socket(zmq.PAIR)
            if int(name) > own_id:
                sock.connect(self.getAddr(name))
            else:
                sock.bind('tcp://*:%d' % self._findPortFor(name))

            self._allNodes[name] = sock
            if name in own_links:
                self.neighbors[name] = sock

        self.resetSyncInbox()
        self.sync_cv = threading.Condition()

        self.streams = {}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号