handler.py 文件源码

python
阅读 27 收藏 0 点赞 0 评论 0

项目:Stock-Visualizer 作者: saguo 项目源码 文件源码
def consume_messages(cls, **kwargs):
    """Forward messages polled from a Kafka consumer to a Redis channel.

    Connection settings are looked up in ``kwargs`` under the keys named by
    the ``KAFKA_BROKER``, ``KAFKA_OUTPUT_TOPIC``, ``REDIS_CHANNEL``,
    ``REDIS_HOST`` and ``REDIS_PORT`` constants. A missing or falsy value
    falls back to the matching ``DEFAULT_*`` constant.

    The consumer's ``shut_down`` method is registered with ``atexit`` so it
    is invoked when the interpreter exits. The forwarding job is then handed
    to a ``Scheduler`` and started with ``run()``.
    """
    def job(consumer_, redis_client_, redis_channel_):
        # Log the decoded JSON, but publish the payload exactly as received.
        for record in consumer_.poll():
            payload = record.value
            logger.info(ujson.loads(payload))
            redis_client_.publish(redis_channel_, payload)

    def shut_down(consumer_):
        consumer_.shut_down()

    # Kafka side: build the consumer from the configured broker and topic.
    consumer = Consumer(
        kwargs.get(KAFKA_BROKER) or DEFAULT_KAFKA_BROKER,
        kwargs.get(KAFKA_OUTPUT_TOPIC) or DEFAULT_KAFKA_OUTPUT_TOPIC,
    )

    # Redis side: resolve the target channel, then build the client.
    redis_channel = kwargs.get(REDIS_CHANNEL) or DEFAULT_REDIS_CHANNEL
    redis_client = redis.StrictRedis(
        host=kwargs.get(REDIS_HOST) or DEFAULT_REDIS_HOST,
        port=kwargs.get(REDIS_PORT) or DEFAULT_REDIS_PORT,
    )

    # Make sure the consumer is shut down when the process exits.
    atexit.register(shut_down, consumer)

    # NOTE(review): the leading 1 is presumably the scheduling interval;
    # confirm against the Scheduler definition.
    Scheduler(1, job, consumer, redis_client, redis_channel).run()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号