def test_child_processes_do_not_survive_an_exception(self, producer_instance, message):
    """Check that no child processes remain after an exception leaves the producer context.

    The producer is entered as a context manager; ``message`` is published
    and flushed, published a second time with no flush after it, and then
    ``RandomException`` is raised from inside the ``with`` body.  Once the
    exception has propagated out of the producer's context (where
    ``pytest.raises`` absorbs it), the current process must have no live
    child processes.
    """
    with pytest.raises(RandomException):
        with producer_instance as producer:
            producer.publish(message)
            producer.flush()
            # Publish again after the flush, so the exception is raised
            # following a publish that had no explicit flush of its own.
            producer.publish(message)
            raise RandomException()

    # active_children() returns the live child processes of this process.
    surviving_children = multiprocessing.active_children()
    assert not surviving_children
评论列表
文章目录