rest_proxy.py source file

python

Project: kafka-rest  Author: gamechanger
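import avro.schema
from avro_json_serializer import AvroJsonSerializer  # assumed source of AvroJsonSerializer
from tornado.escape import json_encode  # assumed source of json_encode


def _encode_payload(schema_cache, topic, batch):
    # Build Avro-JSON serializers from the schemas cached for this topic.
    value_schema = avro.schema.make_avsc_object(schema_cache[topic]['value'], avro.schema.Names())
    value_serializer = AvroJsonSerializer(value_schema)
    # The key schema is optional; without one, every key is sent as None.
    key_serializer = None
    if schema_cache[topic].get('key') is not None:
        key_schema = avro.schema.make_avsc_object(schema_cache[topic]['key'], avro.schema.Names())
        key_serializer = AvroJsonSerializer(key_schema)

    body = {'records': [{'value': value_serializer.to_ordered_dict(message.value),
                         'key': (key_serializer.to_ordered_dict(message.key)
                                 if key_serializer is not None and message.key is not None else None),
                         'partition': message.partition}
                        for message in batch]}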

    # The REST proxy's API expects each schema as a JSON string field inside
    # the JSON body, so a schema is encoded once here and again with the body.
    if schema_cache[topic].get('value-id') is None:
        body['value_schema'] = json_encode(schema_cache[topic]['value'])
    else:
        body['value_schema_id'] = schema_cache[topic]['value-id']
    if schema_cache[topic].get('key') is not None:
        if schema_cache[topic].get('key-id') is None:
            body['key_schema'] = json_encode(schema_cache[topic]['key'])
        else:
            body['key_schema_id'] = schema_cache[topic]['key-id']

    return json_encode(body)
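
The double encoding can be seen in isolation with a minimal sketch that uses only the standard library. `build_body` is a hypothetical helper, not part of kafka-rest, and `json.dumps` stands in for `json_encode`; the schema and record values are made up for illustration.

```python
import json


def build_body(records, value_schema, value_schema_id=None):
    """Hypothetical helper mirroring the schema-or-id branch of _encode_payload."""
    body = {'records': records}
    if value_schema_id is None:
        # First encoding: the schema dict becomes a JSON string field.
        body['value_schema'] = json.dumps(value_schema)
    else:
        # A known schema id replaces the inline schema entirely.
        body['value_schema_id'] = value_schema_id
    # Second encoding: the whole body, including that string, is serialized.
    return json.dumps(body)


# Illustrative schema and record (assumed values, not from the project).
schema = {'type': 'record', 'name': 'User',
          'fields': [{'name': 'name', 'type': 'string'}]}
records = [{'value': {'name': 'alice'}, 'key': None, 'partition': None}]

payload = build_body(records, schema)
decoded = json.loads(payload)
# After one decode the schema is still a string; a second decode recovers it.
schema_roundtrip = json.loads(decoded['value_schema'])

payload_with_id = build_body(records, schema, value_schema_id=7)
decoded_with_id = json.loads(payload_with_id)
```

A consumer of this payload therefore has to decode twice to get the schema back as an object, while the records themselves need only one decode.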