test_bus.py 文件源码

python
阅读 26 收藏 0 点赞 0 评论 0

项目:watcher 作者: nosmokingbandit 项目源码 文件源码
def test_custom_channels(self):
        b = wspbus.Bus()

        self.responses, expected = [], []

        custom_listeners = ('hugh', 'louis', 'dewey')
        for channel in custom_listeners:
            for index, priority in enumerate([None, 10, 60, 40]):
                b.subscribe(channel,
                            self.get_listener(channel, index), priority)

        for channel in custom_listeners:
            b.publish(channel, 'ah so')
            expected.extend([msg % (i, channel, 'ah so')
                            for i in (1, 3, 0, 2)])
            b.publish(channel)
            expected.extend([msg % (i, channel, None) for i in (1, 3, 0, 2)])

        self.assertEqual(self.responses, expected)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号