def test_executes_events(self):
    """Check that queued events reach the function registered for their queue.

    Puts ``event_count`` events on the queue, starts the handler, and waits
    until the registered mock has been called ``event_count`` times.  It then
    checks that at least ``thread_count + 1`` threads are alive while the
    handler is running, where ``thread_count`` is read from the config.
    """
    queue = Queue()
    first_queue_mock = Mock()
    queue_name = "queue"
    multi_thread_handler = MultiThreadHandler(queue)
    multi_thread_handler.add_function(queue_name, first_queue_mock)
    event_count = 10
    # Drive the loop from event_count so the number of queued events and the
    # expected call count cannot drift apart.
    for _ in range(event_count):
        queue.put_nowait(Event(queue_name, "some data"))
    try:
        multi_thread_handler.start()
        # NOTE(review): wait_until_success presumably retries the assertion
        # until it passes or a timeout elapses -- confirm against the helper.
        wait_until_success(
            lambda: self.assertEqual(first_queue_mock.call_count, event_count)
        )
        thread_count = get_config()["handlers"]["multi_thread"]["thread_count"]
        # NOTE(review): the extra 1 presumably accounts for the main thread --
        # confirm.  assertGreaterEqual reports both values on failure.
        self.assertGreaterEqual(threading.active_count(), thread_count + 1)
    finally:
        # Stop the handler even when start() or an assertion above fails.
        multi_thread_handler.stop()
评论列表
文章目录