def pack(self, message, ascii_encoded=False):
"""Packs a message for transport as described in y/cep342.
Use :func:`unpack` to decode the packed message.
Args:
message (data_pipeline.message.Message): The message to pack
ascii_decoded (Optional[bool]): Set to True if message is not valid ASCII
Returns:
bytes: Avro byte string prepended by magic envelope version byte
The initial "magic byte" is meant to specify the envelope schema version.
See y/cep342 for details. In other words, the version number of the current
schema is the null byte. In the event we need to add additional envelope
versions, we'll use this byte to identify it.
In addition, the "magic byte" is used as a protocol to encode the serialized
message in base64. See DATAPIPE-1350 for more detail. This option has been
added because as of now, yelp_clog only supports sending valid ASCII strings.
Producer/Consumer registration will make use of this to instead send base64
encoded strings.
"""
msg = bytes(0) + self._avro_string_writer.encode(message.avro_repr)
if ascii_encoded:
return self.ASCII_MAGIC_BYTE + base64.urlsafe_b64encode(msg)
else:
return msg
评论列表
文章目录