def kafka(TOPIC=None):
# Lazy init of the Kafka producer
#
global PRODUCER
if PRODUCER is None:
PRODUCER = KafkaProducer(
bootstrap_servers=KAFKA_BOOSTRAP_SERVERS,
sasl_mechanism=KAFKA_SASL_MECHANISM,
sasl_plain_username=KAFKA_USER,
sasl_plain_password=KAFKA_PASSWORD)
try:
future = PRODUCER.send(TOPIC, request.get_data())
future.get(timeout=60)
return "OK", 200, None
except KafkaTimeoutError:
return "Internal Server Error", 500, None
评论列表
文章目录