def start(self):
'''
Startup the zmq consumer.
'''
zmq_uri = '{protocol}://{address}:{port}'.format(
protocol=self.protocol,
address=self.address,
port=self.port
) if self.port else\
'{protocol}://{address}'.format( # noqa
protocol=self.protocol,
address=self.address
)
log.debug('ZMQ URI: %s', zmq_uri)
self.ctx = zmq.Context()
if hasattr(zmq, self.type):
skt_type = getattr(zmq, self.type)
else:
skt_type = zmq.PULL
self.sub = self.ctx.socket(skt_type)
self.sub.connect(zmq_uri)
if self.hwm is not None:
try:
self.sub.setsockopt(zmq.HWM, self.hwm)
except AttributeError:
self.sub.setsockopt(zmq.RCVHWM, self.hwm)
if self.recvtimeout is not None:
log.debug('Setting RCVTIMEO to %d', self.recvtimeout)
self.sub.setsockopt(zmq.RCVTIMEO, self.recvtimeout)
if self.keepalive is not None:
log.debug('Setting TCP_KEEPALIVE to %d', self.keepalive)
self.sub.setsockopt(zmq.TCP_KEEPALIVE, self.keepalive)
if self.keepalive_idle is not None:
log.debug('Setting TCP_KEEPALIVE_IDLE to %d', self.keepalive_idle)
self.sub.setsockopt(zmq.TCP_KEEPALIVE_IDLE, self.keepalive_idle)
if self.keepalive_interval is not None:
log.debug('Setting TCP_KEEPALIVE_INTVL to %d', self.keepalive_interval)
self.sub.setsockopt(zmq.TCP_KEEPALIVE_INTVL, self.keepalive_interval)
评论列表
文章目录