python类indexbytes()的实例源码

serialization.py 文件源码 项目:quickstart-git2s3 作者: aws-quickstart 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _load_ssh_ecdsa_public_key(expected_key_type, decoded_data, backend):
    curve_name, rest = _ssh_read_next_string(decoded_data)
    data, rest = _ssh_read_next_string(rest)

    if expected_key_type != b"ecdsa-sha2-" + curve_name:
        raise ValueError(
            'Key header and key body contain different key type values.'
        )

    if rest:
        raise ValueError('Key body contains extra bytes.')

    curve = {
        b"nistp256": ec.SECP256R1,
        b"nistp384": ec.SECP384R1,
        b"nistp521": ec.SECP521R1,
    }[curve_name]()

    if six.indexbytes(data, 0) != 4:
        raise NotImplementedError(
            "Compressed elliptic curve points are not supported"
        )

    numbers = ec.EllipticCurvePublicNumbers.from_encoded_point(curve, data)
    return numbers.public_key(backend)
decoder.py 文件源码 项目:go2mapillary 作者: enricofer 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _VarintDecoder(mask, result_type):
  """Return an encoder for a basic varint value (does not include tag).

  Decoded values will be bitwise-anded with the given mask before being
  returned, e.g. to limit them to 32 bits.  The returned decoder does not
  take the usual "end" parameter -- the caller is expected to do bounds checking
  after the fact (often the caller can defer such checking until later).  The
  decoder returns a (value, new_pos) pair.
  """

  def DecodeVarint(buffer, pos):
    result = 0
    shift = 0
    while 1:
      b = six.indexbytes(buffer, pos)
      result |= ((b & 0x7f) << shift)
      pos += 1
      if not (b & 0x80):
        result &= mask
        result = result_type(result)
        return (result, pos)
      shift += 7
      if shift >= 64:
        raise _DecodeError('Too many bytes when decoding varint.')
  return DecodeVarint
decoder.py 文件源码 项目:go2mapillary 作者: enricofer 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def ReadTag(buffer, pos):
  """Read a tag from the buffer, and return a (tag_bytes, new_pos) tuple.

  We return the raw bytes of the tag rather than decoding them.  The raw
  bytes can then be used to look up the proper decoder.  This effectively allows
  us to trade some work that would be done in pure-python (decoding a varint)
  for work that is done in C (searching for a byte string in a hash table).
  In a low-level language it would be much cheaper to decode the varint and
  use that, but not in Python.
  """

  start = pos
  while six.indexbytes(buffer, pos) & 0x80:
    pos += 1
  pos += 1
  return (buffer[start:pos], pos)


# --------------------------------------------------------------------
serialization.py 文件源码 项目:Docker-XX-Net 作者: kuanghy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _load_ssh_ecdsa_public_key(expected_key_type, decoded_data, backend):
    curve_name, rest = _read_next_string(decoded_data)
    data, rest = _read_next_string(rest)

    if expected_key_type != b"ecdsa-sha2-" + curve_name:
        raise ValueError(
            'Key header and key body contain different key type values.'
        )

    if rest:
        raise ValueError('Key body contains extra bytes.')

    curve = {
        b"nistp256": ec.SECP256R1,
        b"nistp384": ec.SECP384R1,
        b"nistp521": ec.SECP521R1,
    }[curve_name]()

    if six.indexbytes(data, 0) != 4:
        raise NotImplementedError(
            "Compressed elliptic curve points are not supported"
        )

    numbers = ec.EllipticCurvePublicNumbers.from_encoded_point(curve, data)
    return numbers.public_key(backend)
padding.py 文件源码 项目:Docker-XX-Net 作者: kuanghy 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def finalize(self):
        if self._buffer is None:
            raise AlreadyFinalized("Context was already finalized.")

        if len(self._buffer) != self.block_size // 8:
            raise ValueError("Invalid padding bytes.")

        valid = _lib.Cryptography_check_pkcs7_padding(
            self._buffer, self.block_size // 8
        )

        if not valid:
            raise ValueError("Invalid padding bytes.")

        pad_size = six.indexbytes(self._buffer, -1)
        res = self._buffer[:-pad_size]
        self._buffer = None
        return res
decoder.py 文件源码 项目:aws_lambda_ortools 作者: tunamonster 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _VarintDecoder(mask, result_type):
  """Return an encoder for a basic varint value (does not include tag).

  Decoded values will be bitwise-anded with the given mask before being
  returned, e.g. to limit them to 32 bits.  The returned decoder does not
  take the usual "end" parameter -- the caller is expected to do bounds checking
  after the fact (often the caller can defer such checking until later).  The
  decoder returns a (value, new_pos) pair.
  """

  def DecodeVarint(buffer, pos):
    result = 0
    shift = 0
    while 1:
      b = six.indexbytes(buffer, pos)
      result |= ((b & 0x7f) << shift)
      pos += 1
      if not (b & 0x80):
        result &= mask
        result = result_type(result)
        return (result, pos)
      shift += 7
      if shift >= 64:
        raise _DecodeError('Too many bytes when decoding varint.')
  return DecodeVarint
decoder.py 文件源码 项目:aws_lambda_ortools 作者: tunamonster 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def ReadTag(buffer, pos):
  """Read a tag from the buffer, and return a (tag_bytes, new_pos) tuple.

  We return the raw bytes of the tag rather than decoding them.  The raw
  bytes can then be used to look up the proper decoder.  This effectively allows
  us to trade some work that would be done in pure-python (decoding a varint)
  for work that is done in C (searching for a byte string in a hash table).
  In a low-level language it would be much cheaper to decode the varint and
  use that, but not in Python.
  """

  start = pos
  while six.indexbytes(buffer, pos) & 0x80:
    pos += 1
  pos += 1
  return (buffer[start:pos], pos)


# --------------------------------------------------------------------
serialization.py 文件源码 项目:PyQYT 作者: collinsctk 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _load_ssh_ecdsa_public_key(expected_key_type, decoded_data, backend):
    curve_name, rest = _read_next_string(decoded_data)
    data, rest = _read_next_string(rest)

    if expected_key_type != b"ecdsa-sha2-" + curve_name:
        raise ValueError(
            'Key header and key body contain different key type values.'
        )

    if rest:
        raise ValueError('Key body contains extra bytes.')

    curve = {
        b"nistp256": ec.SECP256R1,
        b"nistp384": ec.SECP384R1,
        b"nistp521": ec.SECP521R1,
    }[curve_name]()

    if six.indexbytes(data, 0) != 4:
        raise NotImplementedError(
            "Compressed elliptic curve points are not supported"
        )

    numbers = ec.EllipticCurvePublicNumbers.from_encoded_point(curve, data)
    return numbers.public_key(backend)
decoder.py 文件源码 项目:rpcDemo 作者: Tangxinwei 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _VarintDecoder(mask, result_type):
  """Return an encoder for a basic varint value (does not include tag).

  Decoded values will be bitwise-anded with the given mask before being
  returned, e.g. to limit them to 32 bits.  The returned decoder does not
  take the usual "end" parameter -- the caller is expected to do bounds checking
  after the fact (often the caller can defer such checking until later).  The
  decoder returns a (value, new_pos) pair.
  """

  def DecodeVarint(buffer, pos):
    result = 0
    shift = 0
    while 1:
      b = six.indexbytes(buffer, pos)
      result |= ((b & 0x7f) << shift)
      pos += 1
      if not (b & 0x80):
        result &= mask
        result = result_type(result)
        return (result, pos)
      shift += 7
      if shift >= 64:
        raise _DecodeError('Too many bytes when decoding varint.')
  return DecodeVarint
decoder.py 文件源码 项目:rpcDemo 作者: Tangxinwei 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def ReadTag(buffer, pos):
  """Read a tag from the buffer, and return a (tag_bytes, new_pos) tuple.

  We return the raw bytes of the tag rather than decoding them.  The raw
  bytes can then be used to look up the proper decoder.  This effectively allows
  us to trade some work that would be done in pure-python (decoding a varint)
  for work that is done in C (searching for a byte string in a hash table).
  In a low-level language it would be much cheaper to decode the varint and
  use that, but not in Python.
  """

  start = pos
  while six.indexbytes(buffer, pos) & 0x80:
    pos += 1
  pos += 1
  return (buffer[start:pos], pos)


# --------------------------------------------------------------------
bgp.py 文件源码 项目:deb-ryu 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def serialize(self):
        # fixup
        byte_length = (self.length + 7) // 8
        bin_addr = self._to_bin(self.addr)
        if (self.length % 8) == 0:
            bin_addr = bin_addr[:byte_length]
        else:
            # clear trailing bits in the last octet.
            # rfc doesn't require this.
            mask = 0xff00 >> (self.length % 8)
            last_byte = six.int2byte(
                six.indexbytes(bin_addr, byte_length - 1) & mask)
            bin_addr = bin_addr[:byte_length - 1] + last_byte
        self.addr = self._from_bin(bin_addr)

        buf = bytearray()
        msg_pack_into(self._PACK_STR, buf, 0, self.length)
        return buf + bytes(bin_addr)
bfd.py 文件源码 项目:deb-ryu 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def parser(cls, buf):
        (diag, flags, detect_mult, length, my_discr, your_discr,
         desired_min_tx_interval, required_min_rx_interval,
         required_min_echo_rx_interval) = \
            struct.unpack_from(cls._PACK_STR, buf[:cls._PACK_STR_LEN])

        ver = diag >> 5
        diag = diag & 0x1f
        state = flags >> 6
        flags = flags & 0x3f

        if flags & BFD_FLAG_AUTH_PRESENT:
            auth_type = six.indexbytes(buf, cls._PACK_STR_LEN)
            auth_cls = cls._auth_parsers[auth_type].\
                parser(buf[cls._PACK_STR_LEN:])[0]
        else:
            auth_cls = None

        msg = cls(ver, diag, state, flags, detect_mult,
                  my_discr, your_discr, desired_min_tx_interval,
                  required_min_rx_interval, required_min_echo_rx_interval,
                  auth_cls)

        return msg, None, None
decoder.py 文件源码 项目:googleURLParser 作者: randomaccess3 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _VarintDecoder(mask, result_type):
  """Return an encoder for a basic varint value (does not include tag).

  Decoded values will be bitwise-anded with the given mask before being
  returned, e.g. to limit them to 32 bits.  The returned decoder does not
  take the usual "end" parameter -- the caller is expected to do bounds checking
  after the fact (often the caller can defer such checking until later).  The
  decoder returns a (value, new_pos) pair.
  """

  def DecodeVarint(buffer, pos):
    result = 0
    shift = 0
    while 1:
      b = six.indexbytes(buffer, pos)
      result |= ((b & 0x7f) << shift)
      pos += 1
      if not (b & 0x80):
        result &= mask
        result = result_type(result)
        return (result, pos)
      shift += 7
      if shift >= 64:
        raise _DecodeError('Too many bytes when decoding varint.')
  return DecodeVarint
decoder.py 文件源码 项目:googleURLParser 作者: randomaccess3 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def ReadTag(buffer, pos):
  """Read a tag from the buffer, and return a (tag_bytes, new_pos) tuple.

  We return the raw bytes of the tag rather than decoding them.  The raw
  bytes can then be used to look up the proper decoder.  This effectively allows
  us to trade some work that would be done in pure-python (decoding a varint)
  for work that is done in C (searching for a byte string in a hash table).
  In a low-level language it would be much cheaper to decode the varint and
  use that, but not in Python.
  """

  start = pos
  while six.indexbytes(buffer, pos) & 0x80:
    pos += 1
  pos += 1
  return (buffer[start:pos], pos)


# --------------------------------------------------------------------
padding.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _byte_unpadding_check(buffer_, block_size, checkfn):
    if buffer_ is None:
        raise AlreadyFinalized("Context was already finalized.")

    if len(buffer_) != block_size // 8:
        raise ValueError("Invalid padding bytes.")

    valid = checkfn(buffer_, block_size // 8)

    if not valid:
        raise ValueError("Invalid padding bytes.")

    pad_size = six.indexbytes(buffer_, -1)
    return buffer_[:-pad_size]
padding.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _byte_unpadding_check(buffer_, block_size, checkfn):
    if buffer_ is None:
        raise AlreadyFinalized("Context was already finalized.")

    if len(buffer_) != block_size // 8:
        raise ValueError("Invalid padding bytes.")

    valid = checkfn(buffer_, block_size // 8)

    if not valid:
        raise ValueError("Invalid padding bytes.")

    pad_size = six.indexbytes(buffer_, -1)
    return buffer_[:-pad_size]
decoder.py 文件源码 项目:deoplete-asm 作者: zchee 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _SignedVarintDecoder(bits, result_type):
  """Like _VarintDecoder() but decodes signed values."""

  signbit = 1 << (bits - 1)
  mask = (1 << bits) - 1

  def DecodeVarint(buffer, pos):
    result = 0
    shift = 0
    while 1:
      b = six.indexbytes(buffer, pos)
      result |= ((b & 0x7f) << shift)
      pos += 1
      if not (b & 0x80):
        result &= mask
        result = (result ^ signbit) - signbit
        result = result_type(result)
        return (result, pos)
      shift += 7
      if shift >= 64:
        raise _DecodeError('Too many bytes when decoding varint.')
  return DecodeVarint

# We force 32-bit values to int and 64-bit values to long to make
# alternate implementations where the distinction is more significant
# (e.g. the C++ implementation) simpler.
serialization.py 文件源码 项目:aws-cfn-plex 作者: lordmuffin 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _ssh_write_mpint(value):
    data = utils.int_to_bytes(value)
    if six.indexbytes(data, 0) & 0x80:
        data = b"\x00" + data
    return _ssh_write_string(data)
hotp.py 文件源码 项目:aws-cfn-plex 作者: lordmuffin 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _dynamic_truncate(self, counter):
        ctx = hmac.HMAC(self._key, self._algorithm, self._backend)
        ctx.update(struct.pack(">Q", counter))
        hmac_value = ctx.finalize()

        offset = six.indexbytes(hmac_value, len(hmac_value) - 1) & 0b1111
        p = hmac_value[offset:offset + 4]
        return struct.unpack(">I", p)[0] & 0x7fffffff
test_frame.py 文件源码 项目:dataplicity-lomond 作者: wildfoundry 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_binary_output_length_smaller_or_equal_127(frame_factory, monkeypatch):
    # this unit test is very similar to previous one, the difference being of
    # course the length of the packet in question
    monkeypatch.setattr(
        'lomond.frame.make_masking_key', lambda: b'\x00\x00\x00\x00')
    frame = frame_factory(Opcode.BINARY, payload=b'\x01' * 127)
    frame_binary = frame.to_bytes()

    expected_length = 1 + 1 + 2 + 4 + 127
    #                 ^   ^   ^   ^    ^
    #                 |   |   |   |    +-- payload length     (127 bytes)
    #                 |   |   |   +------- mask length        (4 bytes  )
    #                 |   |   +----------- length as uint16_t (2 bytes  )
    #                 |   +--------------- masking byte
    #                 +------------------- opcode byte
    assert len(frame_binary) == expected_length

    # the first byte doesn't change at all, so please look inside the previous
    # function for in-depth explanation
    assert six.indexbytes(frame_binary, 0) == 0b10000010
    # in the second byte of the header, length field should be set to 126 to
    # indicate a payload of length which should be encoded as uint16_t
    # just for the fun of it, we can decode the actual length value:
    _length = six.indexbytes(frame_binary, 1) & 0b01111111
    assert _length == 126
    assert frame_binary[4:8] == b'\x00\x00\x00\x00'
    assert frame_binary[8:] == frame.payload


问题


面经


文章

微信
公众号

扫码关注公众号