def test_flood(self):
# every node subscribes and should receive the event
topic = "test_flood_{0}".format(binascii.hexlify(os.urandom(32)))
for peer in self.swarm:
peer.pubsub_subscribe(topic)
# wait until subscriptions propagate
time.sleep(SLEEP_TIME)
# send event
peer = random.choice(self.swarm)
event = binascii.hexlify(os.urandom(32))
peer.pubsub_publish(topic, event)
# wait until event propagates
time.sleep(SLEEP_TIME)
# check all peers received the event
for peer in self.swarm:
events = peer.pubsub_events(topic)
self.assertEqual(events, [event])
评论列表
文章目录