def consume_messages(cls, **kwargs):
    """Relay messages polled from a Kafka consumer to a Redis channel.

    Settings are looked up in ``kwargs`` under the keys ``KAFKA_BROKER``,
    ``KAFKA_OUTPUT_TOPIC``, ``REDIS_CHANNEL``, ``REDIS_HOST`` and
    ``REDIS_PORT``. A missing or falsy value falls back to the matching
    ``DEFAULT_*`` constant. ``cls`` is not used in the body.

    The function builds the consumer and the Redis client, registers an
    ``atexit`` hook that calls ``shut_down()`` on the consumer, then
    creates a ``Scheduler`` for the relay job and calls its ``run()``.
    """

    def job(source, sink, channel):
        # The payload is decoded with ujson only for the log line; the
        # raw value is what gets published.
        for record in source.poll():
            payload = record.value
            logger.info(ujson.loads(payload))
            sink.publish(channel, payload)

    def shut_down(source):
        source.shut_down()

    # Kafka side: broker and topic, each with its default fallback.
    consumer = Consumer(
        kwargs.get(KAFKA_BROKER) or DEFAULT_KAFKA_BROKER,
        kwargs.get(KAFKA_OUTPUT_TOPIC) or DEFAULT_KAFKA_OUTPUT_TOPIC
    )

    # Redis side: target channel plus the client used to publish to it.
    redis_channel = kwargs.get(REDIS_CHANNEL) or DEFAULT_REDIS_CHANNEL
    redis_client = redis.StrictRedis(
        host=kwargs.get(REDIS_HOST) or DEFAULT_REDIS_HOST,
        port=kwargs.get(REDIS_PORT) or DEFAULT_REDIS_PORT
    )

    # Register cleanup before starting the scheduler.
    atexit.register(shut_down, consumer)

    # NOTE(review): the leading 1 is presumably the scheduling interval;
    # confirm against the Scheduler definition.
    Scheduler(1, job, consumer, redis_client, redis_channel).run()
评论列表
文章目录