def feed_string(
        cls,  # type: Any
        string,  # type: Union[bytes, bytearray, str]
        sizeless=False,  # type: bool
        compressions=None  # type: Union[None, Iterable[int]]
):  # type: (...) -> InternalMessage
    """Constructs a :py:class:`~py2p.messages.InternalMessage` from a string
    or :py:class:`bytes` object.

    The input is sanitized, decompressed, and then split into a 32-byte
    id followed by the serialized payload. The id must equal the SHA-256
    digest of the payload; only then is the payload unpacked.

    Args:
        string:       The string you wish to parse
        sizeless:     A :py:class:`bool` which describes whether this
                          string has its size header (default: it does)
        compressions: An iterable containing the standardized compression
                          methods this message might be under
                          (default: ``None``)

    Returns:
        A :py:class:`~py2p.messages.InternalMessage` from the given string

    Raises:
        AttributeError: Fed a non-string, non-:py:class:`bytes` argument
                            (presumably raised by the sanitizing step)
        AssertionError: The leading 32-byte id does not match the SHA-256
                            digest of the payload, OR (presumably from the
                            sanitizing step) the initial size header is
                            incorrect
        ValueError:     Unrecognized compression method fed in compressions
                            (presumably raised by the decompression step)
        IndexError:     Packet headers are incorrect (fewer than three
                            packets were unpacked) OR unrecognized
                            compression
    """
    # First section checks size header
    sanitized = cls.__sanitize_string(string, sizeless)
    # Then we attempt to decompress
    decompressed, compression_fail = cls.__decompress_string(
        sanitized, compressions)

    # Wire layout after decompression: 32-byte SHA-256 id, then the payload
    id_ = decompressed[:32]
    serialized = decompressed[32:]
    checksum = sha256(serialized).digest()
    # Raise explicitly rather than via ``assert`` so that the integrity
    # check still runs under ``python -O``. The exception type and message
    # are unchanged for callers that already catch AssertionError.
    if id_ != checksum:
        raise AssertionError(
            "Checksum failed: {} != {}".format(id_, checksum))

    # Packet order: [0] and [1] are the first two constructor arguments,
    # [2] is the timestamp, everything after is forwarded as the rest.
    packets = unpackb(serialized)
    msg = cls(
        packets[0], packets[1], packets[3:], compression=compressions)
    msg.time = packets[2]
    msg.compression_fail = compression_fail
    # Seed the private caches directly with the values verified above
    msg._InternalMessage__id = checksum
    msg._InternalMessage__string = serialized
    return msg
评论列表
文章目录