def _encode_payload(schema_cache, topic, batch):
value_schema = avro.schema.make_avsc_object(schema_cache[topic]['value'], avro.schema.Names())
value_serializer = AvroJsonSerializer(value_schema)
if schema_cache[topic].get('key') is not None:
key_schema = avro.schema.make_avsc_object(schema_cache[topic]['key'], avro.schema.Names())
key_serializer = AvroJsonSerializer(key_schema)
body = {'records': [{'value': value_serializer.to_ordered_dict(message.value),
'key': key_serializer.to_ordered_dict(message.key) if message.key is not None else None,
'partition': message.partition}
for message in batch]}
# The REST proxy's API requires us to double-encode the schemas.
# Don't ask why, because I have no idea.
if schema_cache[topic].get('value-id') is None:
body['value_schema'] = json_encode(schema_cache[topic]['value'])
else:
body['value_schema_id'] = schema_cache[topic]['value-id']
if schema_cache[topic].get('key') is not None:
if schema_cache[topic].get('key-id') is None:
body['key_schema'] = json_encode(schema_cache[topic]['key'])
else:
body['key_schema_id'] = schema_cache[topic]['key-id']
return json_encode(body)
评论列表
文章目录