def message_received(self, unwrapped_message):
    """
    Queue an unwrapped message and release the batch once it is full.

    The message is appended to ``self._batch``. Once the batch holds at
    least ``self.threshold_count`` messages it is detached from the node,
    shuffled, and scheduled for delivery via ``self.batch_send`` after a
    randomly chosen delay drawn with ``self._sys_rand.randint(0,
    self.max_delay)``.

    The scheduled Deferred is tracked in ``self._pending_batch_sends`` until
    it fires (successfully or not), at which point it is removed again.

    :param unwrapped_message: the message to queue. The original docstring
        gives its type as ``UnwrappedMessage``; presumably it carries a
        (destination, sphinx_packet) pair -- confirm against that type.
    :return: None
    """
    self._batch.append(unwrapped_message)
    if len(self._batch) < self.threshold_count:
        # Not enough messages collected yet; keep accumulating.
        return

    delay = self._sys_rand.randint(0, self.max_delay)
    action = start_action(
        action_type=u"send delayed message batch",
        delay=delay,
    )
    with action.context():
        # Detach the full batch first so that messages arriving while the
        # send is pending start a fresh batch.
        released = self._batch
        self._batch = []
        # Shuffle with the same generator used for the delay rather than
        # the module-level ``random`` functions, whose output is
        # predictable; the output order is what hides the arrival order.
        # NOTE(review): assumes self._sys_rand is a random.SystemRandom
        # (or another random.Random subclass) -- confirm in __init__.
        self._sys_rand.shuffle(released)
        d = deferLater(self.reactor, delay, self.batch_send, released)
        # DeferredContext must be created inside the action context so the
        # action is finished when the delayed send completes.
        DeferredContext(d).addActionFinish()
        self._pending_batch_sends.add(d)

        def _remove(res, d=d):
            # Stop tracking the Deferred once it has fired, passing the
            # result (or failure) through unchanged.
            self._pending_batch_sends.remove(d)
            return res

        d.addBoth(_remove)
# (Web page residue, not part of the module: "Comment list" / "Article table of contents".)