broker.py 文件源码

python
阅读 28 收藏 0 点赞 0 评论 0

项目:frontera-docs-zh_CN 作者: xsren 项目源码 文件源码
def __init__(self, address, base_port):
        self.ctx = zmq.Context()
        self.loop = IOLoop.instance()
        self.stats = {
            'started': time(),
            'spiders_out_recvd': 0,
            'spiders_in_recvd': 0,
            'db_in_recvd': 0,
            'db_out_recvd': 0,
            'sw_in_recvd': 0,
            'sw_out_recvd': 0
        }

        socket_config = SocketConfig(address, base_port)

        if socket_config.is_ipv6:
            self.ctx.setsockopt(zmq.IPV6, True)

        spiders_in_s = self.ctx.socket(zmq.XPUB)
        spiders_out_s = self.ctx.socket(zmq.XSUB)
        sw_in_s = self.ctx.socket(zmq.XPUB)
        sw_out_s = self.ctx.socket(zmq.XSUB)
        db_in_s = self.ctx.socket(zmq.XPUB)
        db_out_s = self.ctx.socket(zmq.XSUB)

        spiders_in_s.bind(socket_config.spiders_in())
        spiders_out_s.bind(socket_config.spiders_out())
        sw_in_s.bind(socket_config.sw_in())
        sw_out_s.bind(socket_config.sw_out())
        db_in_s.bind(socket_config.db_in())
        db_out_s.bind(socket_config.db_out())

        self.spiders_in = ZMQStream(spiders_in_s)
        self.spiders_out = ZMQStream(spiders_out_s)
        self.sw_in = ZMQStream(sw_in_s)
        self.sw_out = ZMQStream(sw_out_s)
        self.db_in = ZMQStream(db_in_s)
        self.db_out = ZMQStream(db_out_s)

        self.spiders_out.on_recv(self.handle_spiders_out_recv)
        self.sw_out.on_recv(self.handle_sw_out_recv)
        self.db_out.on_recv(self.handle_db_out_recv)

        self.sw_in.on_recv(self.handle_sw_in_recv)
        self.db_in.on_recv(self.handle_db_in_recv)
        self.spiders_in.on_recv(self.handle_spiders_in_recv)
        logging.basicConfig(format="%(asctime)s %(message)s",
                            datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
        self.logger = logging.getLogger("distributed_frontera.messagebus"
                                        ".zeromq.broker.Server")
        self.logger.info("Using socket: {}:{}".format(socket_config.ip_addr,
                                                      socket_config.base_port))
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号