def test_async_produce_thread_exception(self):
"""Ensure that an exception on a worker thread is raised to the main thread"""
consumer = self._get_consumer()
with self.assertRaises(AttributeError):
with self._get_producer(min_queued_messages=1) as producer:
# get some dummy data into the queue that will cause a crash
# when flushed:
msg = Message("stuff", partition_id=0)
del msg.value
producer._produce(msg)
while consumer.consume() is not None:
time.sleep(.05)
评论列表
文章目录