def close(self):
    """Close the producer and always tear down its worker pool.

    Order of operations:
      1. Close the parent producer via ``super().close()``.
      2. Check that ``message_buffer_size`` is zero afterwards.
      3. Close the pool.
      4. Regardless of the outcome of steps 1-3, terminate and then join
         the pool.

    Any exception raised in steps 1-3 is logged and re-raised unchanged
    once the pool has been terminated and joined.
    """
    try:
        try:
            logger.debug("Starting to close pooled producer")
            super(PooledKafkaProducer, self).close()
            # Invariant check: nothing should remain buffered once the
            # parent producer has been closed.
            assert self.message_buffer_size == 0
            logger.debug("Closing the pool")
            self.pool.close()
            logger.debug("Pool is closed.")
        except:
            # Intentionally bare: log whatever went wrong, then let the
            # very same exception propagate to the caller.
            logger.error("Exception occurred when closing pooled producer.")
            raise
    finally:
        # Pool processes must be cleaned up on both the success and the
        # failure path.
        #
        # Per the original author's note, joining a pool can be flaky in
        # CPython 2.6, so the pool is terminated first to make sure that
        # join() returns. On the success path the buffer was just checked
        # to be empty, so terminating does not discard pending messages.
        self.pool.terminate()
        self.pool.join()
评论列表
文章目录