test_multi_thread_handler.py 文件源码

python
阅读 30 收藏 0 点赞 0 评论 0

项目:wqueue 作者: waltsu 项目源码 文件源码
def test_executes_events(self):
        queue = Queue()

        first_queue_mock = Mock()
        queue_name = "queue"

        multi_thread_handler = MultiThreadHandler(queue)
        multi_thread_handler.add_function(queue_name, first_queue_mock)

        event_count = 10
        for i in range(0, 10):
            queue.put_nowait(Event(queue_name, "some data"))

        try:
            multi_thread_handler.start()
            wait_until_success(lambda: self.assertEqual(first_queue_mock.call_count, event_count))

            thread_count = get_config()["handlers"]["multi_thread"]["thread_count"]
            self.assertTrue(threading.active_count() >= thread_count + 1)
        finally:
            multi_thread_handler.stop()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号