python类indexbytes()的实例源码

serialization.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _load_ssh_ecdsa_public_key(expected_key_type, decoded_data, backend):
    curve_name, rest = _read_next_string(decoded_data)
    data, rest = _read_next_string(rest)

    if expected_key_type != b"ecdsa-sha2-" + curve_name:
        raise ValueError(
            'Key header and key body contain different key type values.'
        )

    if rest:
        raise ValueError('Key body contains extra bytes.')

    curve = {
        b"nistp256": ec.SECP256R1,
        b"nistp384": ec.SECP384R1,
        b"nistp521": ec.SECP521R1,
    }[curve_name]()

    if six.indexbytes(data, 0) != 4:
        raise NotImplementedError(
            "Compressed elliptic curve points are not supported"
        )

    numbers = ec.EllipticCurvePublicNumbers.from_encoded_point(curve, data)
    return numbers.public_key(backend)
serialization.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _load_ssh_ecdsa_public_key(expected_key_type, decoded_data, backend):
    curve_name, rest = _read_next_string(decoded_data)
    data, rest = _read_next_string(rest)

    if expected_key_type != b"ecdsa-sha2-" + curve_name:
        raise ValueError(
            'Key header and key body contain different key type values.'
        )

    if rest:
        raise ValueError('Key body contains extra bytes.')

    curve = {
        b"nistp256": ec.SECP256R1,
        b"nistp384": ec.SECP384R1,
        b"nistp521": ec.SECP521R1,
    }[curve_name]()

    if six.indexbytes(data, 0) != 4:
        raise NotImplementedError(
            "Compressed elliptic curve points are not supported"
        )

    numbers = ec.EllipticCurvePublicNumbers.from_encoded_point(curve, data)
    return numbers.public_key(backend)
decoder.py 文件源码 项目:deoplete-asm 作者: zchee 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _VarintDecoder(mask, result_type):
  """Return an encoder for a basic varint value (does not include tag).

  Decoded values will be bitwise-anded with the given mask before being
  returned, e.g. to limit them to 32 bits.  The returned decoder does not
  take the usual "end" parameter -- the caller is expected to do bounds checking
  after the fact (often the caller can defer such checking until later).  The
  decoder returns a (value, new_pos) pair.
  """

  def DecodeVarint(buffer, pos):
    result = 0
    shift = 0
    while 1:
      b = six.indexbytes(buffer, pos)
      result |= ((b & 0x7f) << shift)
      pos += 1
      if not (b & 0x80):
        result &= mask
        result = result_type(result)
        return (result, pos)
      shift += 7
      if shift >= 64:
        raise _DecodeError('Too many bytes when decoding varint.')
  return DecodeVarint
decoder.py 文件源码 项目:deoplete-asm 作者: zchee 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def ReadTag(buffer, pos):
  """Read a tag from the buffer, and return a (tag_bytes, new_pos) tuple.

  We return the raw bytes of the tag rather than decoding them.  The raw
  bytes can then be used to look up the proper decoder.  This effectively allows
  us to trade some work that would be done in pure-python (decoding a varint)
  for work that is done in C (searching for a byte string in a hash table).
  In a low-level language it would be much cheaper to decode the varint and
  use that, but not in Python.
  """

  start = pos
  while six.indexbytes(buffer, pos) & 0x80:
    pos += 1
  pos += 1
  return (buffer[start:pos], pos)


# --------------------------------------------------------------------
serialization.py 文件源码 项目:aws-cfn-plex 作者: lordmuffin 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _load_ssh_ecdsa_public_key(expected_key_type, decoded_data, backend):
    curve_name, rest = _ssh_read_next_string(decoded_data)
    data, rest = _ssh_read_next_string(rest)

    if expected_key_type != b"ecdsa-sha2-" + curve_name:
        raise ValueError(
            'Key header and key body contain different key type values.'
        )

    if rest:
        raise ValueError('Key body contains extra bytes.')

    curve = {
        b"nistp256": ec.SECP256R1,
        b"nistp384": ec.SECP384R1,
        b"nistp521": ec.SECP521R1,
    }[curve_name]()

    if six.indexbytes(data, 0) != 4:
        raise NotImplementedError(
            "Compressed elliptic curve points are not supported"
        )

    numbers = ec.EllipticCurvePublicNumbers.from_encoded_point(curve, data)
    return numbers.public_key(backend)
test_frame.py 文件源码 项目:dataplicity-lomond 作者: wildfoundry 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_binary_output_length_larger_than_127(frame_factory, monkeypatch):
    # even shorter test, as we don't want to repeat ourselves.
    monkeypatch.setattr(
        'lomond.frame.make_masking_key', lambda: b'\x00\x00\x00\x00')
    large_length = 1 << 16
    frame = frame_factory(Opcode.BINARY, payload=b'\x01' * large_length)
    frame_binary = frame.to_bytes()
    expected_length = 1 + 1 + 8 + 4 + large_length
    #                 ^   ^   ^   ^    ^
    #                 |   |   |   |    +-- payload length     (128 bytes)
    #                 |   |   |   +------- mask length        (4 bytes  )
    #                 |   |   +----------- length as uint64_t (4 bytes  )
    #                 |   +--------------- masking byte
    #                 +------------------- opcode byte
    assert len(frame_binary) == expected_length
    assert six.indexbytes(frame_binary, 0) == 0b10000010
    _length = six.indexbytes(frame_binary, 1) & 0b01111111
    assert _length == 127
    # we can decode the real length as well:
    decoded_length = unpack('!Q', frame_binary[2:10])
    assert len(decoded_length) == 1
    assert decoded_length[0] == large_length
    assert frame_binary[10:14] == b'\x00\x00\x00\x00'
    assert frame_binary[14:] == frame.payload
reader_binary.py 文件源码 项目:ion-python 作者: amzn 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _int_factory(sign, data):
    def parse_int():
        value = 0
        length = len(data)
        while length >= 8:
            segment = _rslice(data, length, 8)
            value <<= 64
            value |= unpack('>Q', segment)[0]
            length -= 8
        if length >= 4:
            segment = _rslice(data, length, 4)
            value <<= 32
            value |= unpack('>I', segment)[0]
            length -= 4
        if length >= 2:
            segment = _rslice(data, length, 2)
            value <<= 16
            value |= unpack('>H', segment)[0]
            length -= 2
        if length == 1:
            value <<= 8
            value |= six.indexbytes(data, -length)
        return sign * value
    return parse_int
reader.py 文件源码 项目:ion-python 作者: amzn 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def read_byte(self):
        if self.__size < 1:
            raise IndexError('Buffer queue is empty')
        segments = self.__segments
        segment = segments[0]
        segment_len = len(segment)
        offset = self.__offset
        if BufferQueue.is_eof(segment):
            octet = _EOF
        else:
            octet = self.__ord(six.indexbytes(segment, offset))
        offset += 1
        if offset == segment_len:
            offset = 0
            segments.popleft()
        self.__offset = offset
        self.__size -= 1
        self.position += 1
        return octet
serialization.py 文件源码 项目:Intranet-Penetration 作者: yuxiaokui 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _load_ssh_ecdsa_public_key(expected_key_type, decoded_data, backend):
    curve_name, rest = _read_next_string(decoded_data)
    data, rest = _read_next_string(rest)

    if expected_key_type != b"ecdsa-sha2-" + curve_name:
        raise ValueError(
            'Key header and key body contain different key type values.'
        )

    if rest:
        raise ValueError('Key body contains extra bytes.')

    curve = {
        b"nistp256": ec.SECP256R1,
        b"nistp384": ec.SECP384R1,
        b"nistp521": ec.SECP521R1,
    }[curve_name]()

    if six.indexbytes(data, 0) != 4:
        raise NotImplementedError(
            "Compressed elliptic curve points are not supported"
        )

    numbers = ec.EllipticCurvePublicNumbers.from_encoded_point(curve, data)
    return numbers.public_key(backend)
serialization.py 文件源码 项目:Intranet-Penetration 作者: yuxiaokui 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _load_ssh_ecdsa_public_key(expected_key_type, decoded_data, backend):
    curve_name, rest = _read_next_string(decoded_data)
    data, rest = _read_next_string(rest)

    if expected_key_type != b"ecdsa-sha2-" + curve_name:
        raise ValueError(
            'Key header and key body contain different key type values.'
        )

    if rest:
        raise ValueError('Key body contains extra bytes.')

    curve = {
        b"nistp256": ec.SECP256R1,
        b"nistp384": ec.SECP384R1,
        b"nistp521": ec.SECP521R1,
    }[curve_name]()

    if six.indexbytes(data, 0) != 4:
        raise NotImplementedError(
            "Compressed elliptic curve points are not supported"
        )

    numbers = ec.EllipticCurvePublicNumbers.from_encoded_point(curve, data)
    return numbers.public_key(backend)
padding.py 文件源码 项目:Intranet-Penetration 作者: yuxiaokui 项目源码 文件源码 阅读 54 收藏 0 点赞 0 评论 0
def finalize(self):
        if self._buffer is None:
            raise AlreadyFinalized("Context was already finalized.")

        if len(self._buffer) != self.block_size // 8:
            raise ValueError("Invalid padding bytes.")

        valid = _lib.Cryptography_check_pkcs7_padding(
            self._buffer, self.block_size // 8
        )

        if not valid:
            raise ValueError("Invalid padding bytes.")

        pad_size = six.indexbytes(self._buffer, -1)
        res = self._buffer[:-pad_size]
        self._buffer = None
        return res
serialization.py 文件源码 项目:MKFQ 作者: maojingios 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _load_ssh_ecdsa_public_key(expected_key_type, decoded_data, backend):
    curve_name, rest = _read_next_string(decoded_data)
    data, rest = _read_next_string(rest)

    if expected_key_type != b"ecdsa-sha2-" + curve_name:
        raise ValueError(
            'Key header and key body contain different key type values.'
        )

    if rest:
        raise ValueError('Key body contains extra bytes.')

    curve = {
        b"nistp256": ec.SECP256R1,
        b"nistp384": ec.SECP384R1,
        b"nistp521": ec.SECP521R1,
    }[curve_name]()

    if six.indexbytes(data, 0) != 4:
        raise NotImplementedError(
            "Compressed elliptic curve points are not supported"
        )

    numbers = ec.EllipticCurvePublicNumbers.from_encoded_point(curve, data)
    return numbers.public_key(backend)
serialization.py 文件源码 项目:MKFQ 作者: maojingios 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def _load_ssh_ecdsa_public_key(expected_key_type, decoded_data, backend):
    curve_name, rest = _read_next_string(decoded_data)
    data, rest = _read_next_string(rest)

    if expected_key_type != b"ecdsa-sha2-" + curve_name:
        raise ValueError(
            'Key header and key body contain different key type values.'
        )

    if rest:
        raise ValueError('Key body contains extra bytes.')

    curve = {
        b"nistp256": ec.SECP256R1,
        b"nistp384": ec.SECP384R1,
        b"nistp521": ec.SECP521R1,
    }[curve_name]()

    if six.indexbytes(data, 0) != 4:
        raise NotImplementedError(
            "Compressed elliptic curve points are not supported"
        )

    numbers = ec.EllipticCurvePublicNumbers.from_encoded_point(curve, data)
    return numbers.public_key(backend)
padding.py 文件源码 项目:MKFQ 作者: maojingios 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def finalize(self):
        if self._buffer is None:
            raise AlreadyFinalized("Context was already finalized.")

        if len(self._buffer) != self.block_size // 8:
            raise ValueError("Invalid padding bytes.")

        valid = _lib.Cryptography_check_pkcs7_padding(
            self._buffer, self.block_size // 8
        )

        if not valid:
            raise ValueError("Invalid padding bytes.")

        pad_size = six.indexbytes(self._buffer, -1)
        res = self._buffer[:-pad_size]
        self._buffer = None
        return res
framing.py 文件源码 项目:py-bson-rpc 作者: seprich 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def extract_message(cls, raw_bytes):
        if b':' not in raw_bytes:
            if len(raw_bytes) > 10:
                raise FramingError(
                    'Length information missing: %s' % raw_bytes)
            return None, raw_bytes
        msg_len, rest = raw_bytes.split(b':', 1)
        try:
            msg_len = int(msg_len)
        except ValueError:
            raise FramingError('Invalid length: %s' % raw_bytes)
        if msg_len < 0:
            raise FramingError('Negative length: %s' % raw_bytes)
        if len(rest) < msg_len + 1:
            return None, raw_bytes
        else:
            if six.indexbytes(rest, msg_len) != 44:
                raise FramingError(
                    'Missing correct end marker: %s' % raw_bytes)
            return rest[:msg_len], rest[(msg_len + 1):]
framing.py 文件源码 项目:py-bson-rpc 作者: seprich 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def extract_message(cls, raw_bytes):
        if b':' not in raw_bytes:
            if len(raw_bytes) > 10:
                raise FramingError(
                    'Length information missing: %s' % raw_bytes)
            return None, raw_bytes
        msg_len, rest = raw_bytes.split(b':', 1)
        try:
            msg_len = int(msg_len)
        except ValueError:
            raise FramingError('Invalid length: %s' % raw_bytes)
        if msg_len < 0:
            raise FramingError('Negative length: %s' % raw_bytes)
        if len(rest) < msg_len + 1:
            return None, raw_bytes
        else:
            if six.indexbytes(rest, msg_len) != 44:
                raise FramingError(
                    'Missing correct end marker: %s' % raw_bytes)
            return rest[:msg_len], rest[(msg_len + 1):]
decoder.py 文件源码 项目:Vector-Tiles-Reader-QGIS-Plugin 作者: geometalab 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _VarintDecoder(mask, result_type):
  """Return an encoder for a basic varint value (does not include tag).

  Decoded values will be bitwise-anded with the given mask before being
  returned, e.g. to limit them to 32 bits.  The returned decoder does not
  take the usual "end" parameter -- the caller is expected to do bounds checking
  after the fact (often the caller can defer such checking until later).  The
  decoder returns a (value, new_pos) pair.
  """

  def DecodeVarint(buffer, pos):
    result = 0
    shift = 0
    while 1:
      b = six.indexbytes(buffer, pos)
      result |= ((b & 0x7f) << shift)
      pos += 1
      if not (b & 0x80):
        result &= mask
        result = result_type(result)
        return (result, pos)
      shift += 7
      if shift >= 64:
        raise _DecodeError('Too many bytes when decoding varint.')
  return DecodeVarint
decoder.py 文件源码 项目:Vector-Tiles-Reader-QGIS-Plugin 作者: geometalab 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def ReadTag(buffer, pos):
  """Read a tag from the buffer, and return a (tag_bytes, new_pos) tuple.

  We return the raw bytes of the tag rather than decoding them.  The raw
  bytes can then be used to look up the proper decoder.  This effectively allows
  us to trade some work that would be done in pure-python (decoding a varint)
  for work that is done in C (searching for a byte string in a hash table).
  In a low-level language it would be much cheaper to decode the varint and
  use that, but not in Python.
  """

  start = pos
  while six.indexbytes(buffer, pos) & 0x80:
    pos += 1
  pos += 1
  return (buffer[start:pos], pos)


# --------------------------------------------------------------------
decoder.py 文件源码 项目:krpcScripts 作者: jwvanderbeck 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _VarintDecoder(mask, result_type):
  """Return an encoder for a basic varint value (does not include tag).

  Decoded values will be bitwise-anded with the given mask before being
  returned, e.g. to limit them to 32 bits.  The returned decoder does not
  take the usual "end" parameter -- the caller is expected to do bounds checking
  after the fact (often the caller can defer such checking until later).  The
  decoder returns a (value, new_pos) pair.
  """

  def DecodeVarint(buffer, pos):
    result = 0
    shift = 0
    while 1:
      b = six.indexbytes(buffer, pos)
      result |= ((b & 0x7f) << shift)
      pos += 1
      if not (b & 0x80):
        result &= mask
        result = result_type(result)
        return (result, pos)
      shift += 7
      if shift >= 64:
        raise _DecodeError('Too many bytes when decoding varint.')
  return DecodeVarint
decoder.py 文件源码 项目:krpcScripts 作者: jwvanderbeck 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def ReadTag(buffer, pos):
  """Read a tag from the buffer, and return a (tag_bytes, new_pos) tuple.

  We return the raw bytes of the tag rather than decoding them.  The raw
  bytes can then be used to look up the proper decoder.  This effectively allows
  us to trade some work that would be done in pure-python (decoding a varint)
  for work that is done in C (searching for a byte string in a hash table).
  In a low-level language it would be much cheaper to decode the varint and
  use that, but not in Python.
  """

  start = pos
  while six.indexbytes(buffer, pos) & 0x80:
    pos += 1
  pos += 1
  return (buffer[start:pos], pos)


# --------------------------------------------------------------------
decoder.py 文件源码 项目:coremltools 作者: apple 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _VarintDecoder(mask, result_type):
  """Return an encoder for a basic varint value (does not include tag).

  Decoded values will be bitwise-anded with the given mask before being
  returned, e.g. to limit them to 32 bits.  The returned decoder does not
  take the usual "end" parameter -- the caller is expected to do bounds checking
  after the fact (often the caller can defer such checking until later).  The
  decoder returns a (value, new_pos) pair.
  """

  def DecodeVarint(buffer, pos):
    result = 0
    shift = 0
    while 1:
      b = six.indexbytes(buffer, pos)
      result |= ((b & 0x7f) << shift)
      pos += 1
      if not (b & 0x80):
        result &= mask
        result = result_type(result)
        return (result, pos)
      shift += 7
      if shift >= 64:
        raise _DecodeError('Too many bytes when decoding varint.')
  return DecodeVarint
decoder.py 文件源码 项目:coremltools 作者: apple 项目源码 文件源码 阅读 275 收藏 0 点赞 0 评论 0
def ReadTag(buffer, pos):
  """Read a tag from the buffer, and return a (tag_bytes, new_pos) tuple.

  We return the raw bytes of the tag rather than decoding them.  The raw
  bytes can then be used to look up the proper decoder.  This effectively allows
  us to trade some work that would be done in pure-python (decoding a varint)
  for work that is done in C (searching for a byte string in a hash table).
  In a low-level language it would be much cheaper to decode the varint and
  use that, but not in Python.
  """

  start = pos
  while six.indexbytes(buffer, pos) & 0x80:
    pos += 1
  pos += 1
  return (buffer[start:pos], pos)


# --------------------------------------------------------------------
padding.py 文件源码 项目:OneClickDTU 作者: satwikkansal 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def finalize(self):
        if self._buffer is None:
            raise AlreadyFinalized("Context was already finalized.")

        if len(self._buffer) != self.block_size // 8:
            raise ValueError("Invalid padding bytes.")

        valid = lib.Cryptography_check_pkcs7_padding(
            self._buffer, self.block_size // 8
        )

        if not valid:
            raise ValueError("Invalid padding bytes.")

        pad_size = six.indexbytes(self._buffer, -1)
        res = self._buffer[:-pad_size]
        self._buffer = None
        return res
bgp.py 文件源码 项目:ryu-lagopus-ext 作者: lagopus 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def serialize(self):
        # fixup
        byte_length = (self.length + 7) // 8
        bin_addr = self._to_bin(self.addr)
        if (self.length % 8) == 0:
            bin_addr = bin_addr[:byte_length]
        else:
            # clear trailing bits in the last octet.
            # rfc doesn't require this.
            mask = 0xff00 >> (self.length % 8)
            last_byte = six.int2byte(
                six.indexbytes(bin_addr, byte_length - 1) & mask)
            bin_addr = bin_addr[:byte_length - 1] + last_byte
        self.addr = self._from_bin(bin_addr)

        buf = bytearray()
        msg_pack_into(self._PACK_STR, buf, 0, self.length)
        return buf + bytes(bin_addr)
bfd.py 文件源码 项目:ryu-lagopus-ext 作者: lagopus 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def parser(cls, buf):
        (diag, flags, detect_mult, length, my_discr, your_discr,
         desired_min_tx_interval, required_min_rx_interval,
         required_min_echo_rx_interval) = \
            struct.unpack_from(cls._PACK_STR, buf[:cls._PACK_STR_LEN])

        ver = diag >> 5
        diag = diag & 0x1f
        state = flags >> 6
        flags = flags & 0x3f

        if flags & BFD_FLAG_AUTH_PRESENT:
            auth_type = six.indexbytes(buf, cls._PACK_STR_LEN)
            auth_cls = cls._auth_parsers[auth_type].\
                parser(buf[cls._PACK_STR_LEN:])[0]
        else:
            auth_cls = None

        msg = cls(ver, diag, state, flags, detect_mult,
                  my_discr, your_discr, desired_min_tx_interval,
                  required_min_rx_interval, required_min_echo_rx_interval,
                  auth_cls)

        return msg, None, None
serialization.py 文件源码 项目:xxNet 作者: drzorm 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _load_ssh_ecdsa_public_key(expected_key_type, decoded_data, backend):
    curve_name, rest = _read_next_string(decoded_data)
    data, rest = _read_next_string(rest)

    if expected_key_type != b"ecdsa-sha2-" + curve_name:
        raise ValueError(
            'Key header and key body contain different key type values.'
        )

    if rest:
        raise ValueError('Key body contains extra bytes.')

    curve = {
        b"nistp256": ec.SECP256R1,
        b"nistp384": ec.SECP384R1,
        b"nistp521": ec.SECP521R1,
    }[curve_name]()

    if six.indexbytes(data, 0) != 4:
        raise NotImplementedError(
            "Compressed elliptic curve points are not supported"
        )

    numbers = ec.EllipticCurvePublicNumbers.from_encoded_point(curve, data)
    return numbers.public_key(backend)
serialization.py 文件源码 项目:xxNet 作者: drzorm 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _load_ssh_ecdsa_public_key(expected_key_type, decoded_data, backend):
    curve_name, rest = _read_next_string(decoded_data)
    data, rest = _read_next_string(rest)

    if expected_key_type != b"ecdsa-sha2-" + curve_name:
        raise ValueError(
            'Key header and key body contain different key type values.'
        )

    if rest:
        raise ValueError('Key body contains extra bytes.')

    curve = {
        b"nistp256": ec.SECP256R1,
        b"nistp384": ec.SECP384R1,
        b"nistp521": ec.SECP521R1,
    }[curve_name]()

    if six.indexbytes(data, 0) != 4:
        raise NotImplementedError(
            "Compressed elliptic curve points are not supported"
        )

    numbers = ec.EllipticCurvePublicNumbers.from_encoded_point(curve, data)
    return numbers.public_key(backend)
padding.py 文件源码 项目:xxNet 作者: drzorm 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def finalize(self):
        if self._buffer is None:
            raise AlreadyFinalized("Context was already finalized.")

        if len(self._buffer) != self.block_size // 8:
            raise ValueError("Invalid padding bytes.")

        valid = _lib.Cryptography_check_pkcs7_padding(
            self._buffer, self.block_size // 8
        )

        if not valid:
            raise ValueError("Invalid padding bytes.")

        pad_size = six.indexbytes(self._buffer, -1)
        res = self._buffer[:-pad_size]
        self._buffer = None
        return res
serialization.py 文件源码 项目:slack_scholar 作者: xLeitix 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _load_ssh_ecdsa_public_key(expected_key_type, decoded_data, backend):
    curve_name, rest = _ssh_read_next_string(decoded_data)
    data, rest = _ssh_read_next_string(rest)

    if expected_key_type != b"ecdsa-sha2-" + curve_name:
        raise ValueError(
            'Key header and key body contain different key type values.'
        )

    if rest:
        raise ValueError('Key body contains extra bytes.')

    curve = {
        b"nistp256": ec.SECP256R1,
        b"nistp384": ec.SECP384R1,
        b"nistp521": ec.SECP521R1,
    }[curve_name]()

    if six.indexbytes(data, 0) != 4:
        raise NotImplementedError(
            "Compressed elliptic curve points are not supported"
        )

    numbers = ec.EllipticCurvePublicNumbers.from_encoded_point(curve, data)
    return numbers.public_key(backend)
serialization.py 文件源码 项目:RemoteTree 作者: deNULL 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _load_ssh_ecdsa_public_key(expected_key_type, decoded_data, backend):
    curve_name, rest = _ssh_read_next_string(decoded_data)
    data, rest = _ssh_read_next_string(rest)

    if expected_key_type != b"ecdsa-sha2-" + curve_name:
        raise ValueError(
            'Key header and key body contain different key type values.'
        )

    if rest:
        raise ValueError('Key body contains extra bytes.')

    curve = {
        b"nistp256": ec.SECP256R1,
        b"nistp384": ec.SECP384R1,
        b"nistp521": ec.SECP521R1,
    }[curve_name]()

    if six.indexbytes(data, 0) != 4:
        raise NotImplementedError(
            "Compressed elliptic curve points are not supported"
        )

    numbers = ec.EllipticCurvePublicNumbers.from_encoded_point(curve, data)
    return numbers.public_key(backend)


问题


面经


文章

微信
公众号

扫码关注公众号