def __init__(self, address, base_port):
    """Create the broker's ZeroMQ sockets, streams, counters and logger.

    For each of the spiders, sw and db channels an XPUB ("in") socket and
    an XSUB ("out") socket are created and bound to the endpoints
    returned by ``SocketConfig(address, base_port)``. Every socket is
    wrapped in a ``ZMQStream`` stored on the instance under the channel
    name, and the matching ``handle_<channel>_recv`` method is registered
    as its receive callback.

    :param address: forwarded unchanged to ``SocketConfig``.
    :param base_port: forwarded unchanged to ``SocketConfig``.
    """
    self.ctx = zmq.Context()
    self.loop = IOLoop.instance()

    # Start timestamp first, then one zeroed receive counter per stream.
    self.stats = {'started': time()}
    for counter in ('spiders_out_recvd', 'spiders_in_recvd',
                    'db_in_recvd', 'db_out_recvd',
                    'sw_in_recvd', 'sw_out_recvd'):
        self.stats[counter] = 0

    socket_config = SocketConfig(address, base_port)
    if socket_config.is_ipv6:
        # Applied to the context before any socket is created from it.
        self.ctx.setsockopt(zmq.IPV6, True)

    # Channel name -> socket type; the name doubles as the SocketConfig
    # endpoint method and as the instance attribute holding the stream.
    channels = (
        ('spiders_in', zmq.XPUB),
        ('spiders_out', zmq.XSUB),
        ('sw_in', zmq.XPUB),
        ('sw_out', zmq.XSUB),
        ('db_in', zmq.XPUB),
        ('db_out', zmq.XSUB),
    )

    # The phases below are kept separate on purpose: all sockets are
    # created, then all are bound, then all are wrapped in streams.
    sockets = [(channel, self.ctx.socket(sock_type))
               for channel, sock_type in channels]

    for channel, sock in sockets:
        sock.bind(getattr(socket_config, channel)())

    for channel, sock in sockets:
        setattr(self, channel, ZMQStream(sock))

    # Receive callbacks are attached to the "out" streams first, then to
    # the "in" streams.
    for channel in ('spiders_out', 'sw_out', 'db_out',
                    'sw_in', 'db_in', 'spiders_in'):
        stream = getattr(self, channel)
        stream.on_recv(getattr(self, 'handle_%s_recv' % channel))

    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s %(message)s",
                        datefmt="%Y-%m-%d %H:%M:%S")
    self.logger = logging.getLogger("distributed_frontera.messagebus"
                                    ".zeromq.broker.Server")
    banner = "Using socket: {}:{}".format(socket_config.ip_addr,
                                          socket_config.base_port)
    self.logger.info(banner)
评论列表
文章目录