test.py 文件源码

python
阅读 22 收藏 0 点赞 0 评论 0

项目:storjspec 作者: StorjRND 项目源码 文件源码
def test_flood(self):
        # Flood test: all peers in self.swarm subscribe to one topic, a
        # randomly picked peer publishes a single event, and afterwards each
        # peer's event list for that topic must be exactly [that event].

        # topic name carries a random suffix (32 bytes from os.urandom, hex encoded)
        topic_suffix = binascii.hexlify(os.urandom(32))
        topic = "test_flood_{0}".format(topic_suffix)

        # subscribe every node in the swarm
        for node in self.swarm:
            node.pubsub_subscribe(topic)

        # give the subscriptions time to spread before publishing
        time.sleep(SLEEP_TIME)

        # one randomly chosen node publishes a random hex payload
        publisher = random.choice(self.swarm)
        payload = binascii.hexlify(os.urandom(32))
        expected = [payload]
        publisher.pubsub_publish(topic, payload)

        # give the event time to spread before checking
        time.sleep(SLEEP_TIME)

        # each node, the publisher included, must report exactly the one event
        for node in self.swarm:
            self.assertEqual(node.pubsub_events(topic), expected)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号