def __init__(self, context, location, partition_id, identity, seq_warnings=False, hwm=1000):
    """Create a SUB socket subscribed to ``identity`` (optionally one partition).

    :param context: object exposing ``zeromq`` (used to create the socket)
        and ``stats`` (a mapping in which this consumer's counter is reset).
    :param location: endpoint string handed to ``connect``.
    :param partition_id: partition to subscribe to. It is packed as a single
        unsigned byte (``'>B'``), so it has to fit in 0..255. ``None`` means
        subscribe using ``identity`` alone and enables ``count_global``.
    :param identity: subscription prefix; it is concatenated with the packed
        partition byte, so it must be of a type that supports that.
    :param seq_warnings: stored as ``self.seq_warnings``; not used here.
    :param hwm: receive high-water mark for the socket.
    """
    self.subscriber = context.zeromq.socket(zmq.SUB)
    # The high-water mark must be configured before connecting: libzmq only
    # applies HWM options to connections made after the option is set.
    self.subscriber.set(zmq.RCVHWM, hwm)
    self.subscriber.connect(location)

    # Subscription prefix: identity alone, or identity + one partition byte.
    if partition_id is not None:
        topic = identity + pack('>B', partition_id)
    else:
        topic = identity
    self.subscriber.setsockopt(zmq.SUBSCRIBE, topic)

    self.counter = 0
    self.count_global = partition_id is None
    self.logger = getLogger("distributed_frontera.messagebus.zeromq.Consumer(%s-%s)" % (identity, partition_id))
    self.seq_warnings = seq_warnings

    # Reset this consumer's entry in the shared stats mapping.
    self.stats = context.stats
    self.stat_key = "consumer-%s" % identity
    self.stats[self.stat_key] = 0
评论列表
文章目录