def set_alpn_protos(self, protos):
    """
    Set the ALPN protocols that this context offers as a client.

    The protocols are serialised as a sequence of length-prefixed byte
    strings and handed to ``SSL_CTX_set_alpn_protos``.

    :param protos: The protocols to offer to the server, as bytestrings,
        e.g. ``[b'http/1.1', b'spdy/2']``.
    """
    # Each protocol is emitted as a single length byte followed by the
    # protocol name itself.
    pieces = []
    for proto in protos:
        pieces.append(int2byte(len(proto)))
        pieces.append(proto)
    wire_protos = b''.join(pieces)

    # A temporary C buffer is enough: OpenSSL copies the data out
    # immediately, so the buffer does not need to be kept alive.
    c_protos = _ffi.new("unsigned char[]", wire_protos)
    c_protos_len = _ffi.cast("unsigned", len(wire_protos))
    _lib.SSL_CTX_set_alpn_protos(self._context, c_protos, c_protos_len)
# Example source code using Python's int2byte()
def set_alpn_protos(self, protos):
    """
    Set the ALPN protocols that this connection offers as a client.

    The protocols are serialised as a sequence of length-prefixed byte
    strings and handed to ``SSL_set_alpn_protos``.

    :param protos: The protocols to offer to the server, as bytestrings,
        e.g. ``[b'http/1.1', b'spdy/2']``.
    """
    # Each protocol is emitted as a single length byte followed by the
    # protocol name itself.
    segments = []
    for name in protos:
        segments.append(int2byte(len(name)))
        segments.append(name)
    encoded = b''.join(segments)

    # A temporary C buffer is enough: OpenSSL copies the data out
    # immediately, so the buffer does not need to be kept alive.
    c_buffer = _ffi.new("unsigned char[]", encoded)
    c_buffer_len = _ffi.cast("unsigned", len(encoded))
    _lib.SSL_set_alpn_protos(self._ssl, c_buffer, c_buffer_len)
def _truncate_digest(digest, order_bits):
digest_len = len(digest)
if 8 * digest_len > order_bits:
digest_len = (order_bits + 7) // 8
digest = digest[:digest_len]
if 8 * digest_len > order_bits:
rshift = 8 - (order_bits & 0x7)
assert 0 < rshift < 8
mask = 0xFF >> rshift << rshift
# Set the bottom rshift bits to 0
digest = digest[:-1] + six.int2byte(six.indexbytes(digest, -1) & mask)
return digest
def set_alpn_protos(self, protos):
    """
    Set the ALPN protocols that this context offers as a client.

    The protocols are serialised as a sequence of length-prefixed byte
    strings and handed to ``SSL_CTX_set_alpn_protos``.

    :param protos: The protocols to offer to the server, as bytestrings,
        e.g. ``[b'http/1.1', b'spdy/2']``.
    """
    # Each protocol is emitted as a single length byte followed by the
    # protocol name itself.
    pieces = []
    for proto in protos:
        pieces.append(int2byte(len(proto)))
        pieces.append(proto)
    wire_protos = b''.join(pieces)

    # A temporary C buffer is enough: OpenSSL copies the data out
    # immediately, so the buffer does not need to be kept alive.
    c_protos = _ffi.new("unsigned char[]", wire_protos)
    c_protos_len = _ffi.cast("unsigned", len(wire_protos))
    _lib.SSL_CTX_set_alpn_protos(self._context, c_protos, c_protos_len)
def set_alpn_protos(self, protos):
    """
    Set the ALPN protocols that this connection offers as a client.

    The protocols are serialised as a sequence of length-prefixed byte
    strings and handed to ``SSL_set_alpn_protos``.

    :param protos: The protocols to offer to the server, as bytestrings,
        e.g. ``[b'http/1.1', b'spdy/2']``.
    """
    # Each protocol is emitted as a single length byte followed by the
    # protocol name itself.
    segments = []
    for name in protos:
        segments.append(int2byte(len(name)))
        segments.append(name)
    encoded = b''.join(segments)

    # A temporary C buffer is enough: OpenSSL copies the data out
    # immediately, so the buffer does not need to be kept alive.
    c_buffer = _ffi.new("unsigned char[]", encoded)
    c_buffer_len = _ffi.cast("unsigned", len(encoded))
    _lib.SSL_set_alpn_protos(self._ssl, c_buffer, c_buffer_len)
def _truncate_digest(digest, order_bits):
digest_len = len(digest)
if 8 * digest_len > order_bits:
digest_len = (order_bits + 7) // 8
digest = digest[:digest_len]
if 8 * digest_len > order_bits:
rshift = 8 - (order_bits & 0x7)
assert 0 < rshift < 8
mask = 0xFF >> rshift << rshift
# Set the bottom rshift bits to 0
digest = digest[:-1] + six.int2byte(six.indexbytes(digest, -1) & mask)
return digest
def _SignedVarintEncoder():
"""Return an encoder for a basic signed varint value (does not include
tag)."""
def EncodeSignedVarint(write, value):
if value < 0:
value += (1 << 64)
bits = value & 0x7f
value >>= 7
while value:
write(six.int2byte(0x80|bits))
bits = value & 0x7f
value >>= 7
return write(six.int2byte(bits))
return EncodeSignedVarint
def check(self, fmt, value):
    """
    Compare ``pack_into`` against ``struct.pack_into`` for one format.

    Two identical 256-byte buffers filled with random bytes are packed at
    the same offset, one by ``pack_into`` and one by ``struct.pack_into``,
    and must end up equal. This checks that the expected bytes were written
    and that nothing outside the target range was touched. The method then
    checks that packing one byte past the last valid offset raises
    ``IndexError``.

    :param fmt: A single-character ``struct`` format code.
    :param value: The value to pack.
    """
    from random import randrange

    # Build a buffer that is comfortably larger than anything we pack.
    pattern = b''.join([six.int2byte(randrange(256)) for _ in range(256)])
    buf = bytearray(pattern)
    buf2 = bytearray(pattern)
    offset = 16
    pack_into(ord(fmt), buf, offset, value)
    struct.pack_into(fmt, buf2, offset, value)
    assert buf == buf2

    # First offset at which the packed value no longer fits in the buffer.
    out_of_bound = 256 - struct.calcsize(fmt) + 1
    # Use the context-manager form: passing a code string to pytest.raises
    # is not supported by modern pytest.
    with pytest.raises(IndexError):
        pack_into(ord(fmt), buf, out_of_bound, value)
def set_alpn_protos(self, protos):
    """
    Set the ALPN protocols that this context offers as a client.

    The protocols are serialised as a sequence of length-prefixed byte
    strings and handed to ``SSL_CTX_set_alpn_protos``.

    :param protos: The protocols to offer to the server, as bytestrings,
        e.g. ``[b'http/1.1', b'spdy/2']``.
    """
    # Each protocol is emitted as a single length byte followed by the
    # protocol name itself.
    pieces = []
    for proto in protos:
        pieces.append(int2byte(len(proto)))
        pieces.append(proto)
    wire_protos = b''.join(pieces)

    # A temporary C buffer is enough: OpenSSL copies the data out
    # immediately, so the buffer does not need to be kept alive.
    c_protos = _ffi.new("unsigned char[]", wire_protos)
    c_protos_len = _ffi.cast("unsigned", len(wire_protos))
    _lib.SSL_CTX_set_alpn_protos(self._context, c_protos, c_protos_len)
def set_alpn_protos(self, protos):
    """
    Set the ALPN protocols that this connection offers as a client.

    The protocols are serialised as a sequence of length-prefixed byte
    strings and handed to ``SSL_set_alpn_protos``.

    :param protos: The protocols to offer to the server, as bytestrings,
        e.g. ``[b'http/1.1', b'spdy/2']``.
    """
    # Each protocol is emitted as a single length byte followed by the
    # protocol name itself.
    segments = []
    for name in protos:
        segments.append(int2byte(len(name)))
        segments.append(name)
    encoded = b''.join(segments)

    # A temporary C buffer is enough: OpenSSL copies the data out
    # immediately, so the buffer does not need to be kept alive.
    c_buffer = _ffi.new("unsigned char[]", encoded)
    c_buffer_len = _ffi.cast("unsigned", len(encoded))
    _lib.SSL_set_alpn_protos(self._ssl, c_buffer, c_buffer_len)
def _truncate_digest(digest, order_bits):
digest_len = len(digest)
if 8 * digest_len > order_bits:
digest_len = (order_bits + 7) // 8
digest = digest[:digest_len]
if 8 * digest_len > order_bits:
rshift = 8 - (order_bits & 0x7)
assert 0 < rshift < 8
mask = 0xFF >> rshift << rshift
# Set the bottom rshift bits to 0
digest = digest[:-1] + six.int2byte(six.indexbytes(digest, -1) & mask)
return digest
def _bytes_text(code_point_iter, quote, prefix=b'', suffix=b''):
    """
    Render a sequence of code points as a quoted, escaped byte string.

    The output is ``prefix``, the opening ``quote``, the encoded code
    points, the closing ``quote`` and finally ``suffix``. Occurrences of
    the quote character and of the backslash are backslash-escaped, code
    points accepted by ``_is_printable_ascii`` are written as a single
    byte, and everything else is passed through ``_escape``.

    :param code_point_iter: An iterable of integer code points.
    :param quote: The quote character, as a byte string.
    :param prefix: Bytes written before the opening quote.
    :param suffix: Bytes written after the closing quote.
    :return: The assembled byte string.
    """
    quote_value = six.byte2int(quote)
    backslash_value = six.byte2int(b'\\')

    out = BytesIO()
    emit = out.write
    emit(prefix)
    emit(quote)
    for cp in code_point_iter:
        if cp == quote_value:
            chunk = b'\\' + quote
        elif cp == backslash_value:
            chunk = b'\\\\'
        elif _is_printable_ascii(cp):
            chunk = six.int2byte(cp)
        else:
            chunk = _escape(cp)
        emit(chunk)
    emit(quote)
    emit(suffix)
    return out.getvalue()
def set_alpn_protos(self, protos):
    """
    Set the ALPN protocols that this context offers as a client.

    The protocols are serialised as a sequence of length-prefixed byte
    strings and handed to ``SSL_CTX_set_alpn_protos``.

    :param protos: The protocols to offer to the server, as bytestrings,
        e.g. ``[b'http/1.1', b'spdy/2']``.
    """
    # Each protocol is emitted as a single length byte followed by the
    # protocol name itself.
    pieces = []
    for proto in protos:
        pieces.append(int2byte(len(proto)))
        pieces.append(proto)
    wire_protos = b''.join(pieces)

    # A temporary C buffer is enough: OpenSSL copies the data out
    # immediately, so the buffer does not need to be kept alive.
    c_protos = _ffi.new("unsigned char[]", wire_protos)
    c_protos_len = _ffi.cast("unsigned", len(wire_protos))
    _lib.SSL_CTX_set_alpn_protos(self._context, c_protos, c_protos_len)
def set_alpn_protos(self, protos):
    """
    Set the ALPN protocols that this connection offers as a client.

    The protocols are serialised as a sequence of length-prefixed byte
    strings and handed to ``SSL_set_alpn_protos``.

    :param protos: The protocols to offer to the server, as bytestrings,
        e.g. ``[b'http/1.1', b'spdy/2']``.
    """
    # Each protocol is emitted as a single length byte followed by the
    # protocol name itself.
    segments = []
    for name in protos:
        segments.append(int2byte(len(name)))
        segments.append(name)
    encoded = b''.join(segments)

    # A temporary C buffer is enough: OpenSSL copies the data out
    # immediately, so the buffer does not need to be kept alive.
    c_buffer = _ffi.new("unsigned char[]", encoded)
    c_buffer_len = _ffi.cast("unsigned", len(encoded))
    _lib.SSL_set_alpn_protos(self._ssl, c_buffer, c_buffer_len)
def _truncate_digest(digest, order_bits):
digest_len = len(digest)
if 8 * digest_len > order_bits:
digest_len = (order_bits + 7) // 8
digest = digest[:digest_len]
if 8 * digest_len > order_bits:
rshift = 8 - (order_bits & 0x7)
assert 0 < rshift < 8
mask = 0xFF >> rshift << rshift
# Set the bottom rshift bits to 0
digest = digest[:-1] + six.int2byte(six.indexbytes(digest, -1) & mask)
return digest
def _truncate_digest(digest, order_bits):
digest_len = len(digest)
if 8 * digest_len > order_bits:
digest_len = (order_bits + 7) // 8
digest = digest[:digest_len]
if 8 * digest_len > order_bits:
rshift = 8 - (order_bits & 0x7)
assert 0 < rshift < 8
mask = 0xFF >> rshift << rshift
# Set the bottom rshift bits to 0
digest = digest[:-1] + six.int2byte(six.indexbytes(digest, -1) & mask)
return digest
def set_alpn_protos(self, protos):
    """
    Set the ALPN protocols that this context offers as a client.

    The protocols are serialised as a sequence of length-prefixed byte
    strings and handed to ``SSL_CTX_set_alpn_protos``.

    :param protos: The protocols to offer to the server, as bytestrings,
        e.g. ``[b'http/1.1', b'spdy/2']``.
    """
    # Each protocol is emitted as a single length byte followed by the
    # protocol name itself.
    pieces = []
    for proto in protos:
        pieces.append(int2byte(len(proto)))
        pieces.append(proto)
    wire_protos = b''.join(pieces)

    # A temporary C buffer is enough: OpenSSL copies the data out
    # immediately, so the buffer does not need to be kept alive.
    c_protos = _ffi.new("unsigned char[]", wire_protos)
    c_protos_len = _ffi.cast("unsigned", len(wire_protos))
    _lib.SSL_CTX_set_alpn_protos(self._context, c_protos, c_protos_len)
def set_alpn_protos(self, protos):
    """
    Set the ALPN protocols that this connection offers as a client.

    The protocols are serialised as a sequence of length-prefixed byte
    strings and handed to ``SSL_set_alpn_protos``.

    :param protos: The protocols to offer to the server, as bytestrings,
        e.g. ``[b'http/1.1', b'spdy/2']``.
    """
    # Each protocol is emitted as a single length byte followed by the
    # protocol name itself.
    segments = []
    for name in protos:
        segments.append(int2byte(len(name)))
        segments.append(name)
    encoded = b''.join(segments)

    # A temporary C buffer is enough: OpenSSL copies the data out
    # immediately, so the buffer does not need to be kept alive.
    c_buffer = _ffi.new("unsigned char[]", encoded)
    c_buffer_len = _ffi.cast("unsigned", len(encoded))
    _lib.SSL_set_alpn_protos(self._ssl, c_buffer, c_buffer_len)
def _truncate_digest(digest, order_bits):
digest_len = len(digest)
if 8 * digest_len > order_bits:
digest_len = (order_bits + 7) // 8
digest = digest[:digest_len]
if 8 * digest_len > order_bits:
rshift = 8 - (order_bits & 0x7)
assert 0 < rshift < 8
mask = 0xFF >> rshift << rshift
# Set the bottom rshift bits to 0
digest = digest[:-1] + six.int2byte(six.indexbytes(digest, -1) & mask)
return digest
def _truncate_digest(digest, order_bits):
digest_len = len(digest)
if 8 * digest_len > order_bits:
digest_len = (order_bits + 7) // 8
digest = digest[:digest_len]
if 8 * digest_len > order_bits:
rshift = 8 - (order_bits & 0x7)
assert 0 < rshift < 8
mask = 0xFF >> rshift << rshift
# Set the bottom rshift bits to 0
digest = digest[:-1] + six.int2byte(six.indexbytes(digest, -1) & mask)
return digest
def encode(self, b64=False, always_bytes=True):
    """
    Encode the packet for transmission.

    The packet type comes first, as a single raw byte for binary packets
    sent without base64 and as text otherwise (with a leading ``'b'`` for
    base64-encoded binary packets). The data follows: raw or base64 for
    binary packets, unchanged for strings, compact JSON for dicts and
    lists, and ``str()`` of anything else that is not ``None``.

    :param b64: Whether binary packets are base64-encoded.
    :param always_bytes: Whether a text result is UTF-8 encoded to bytes
        before being returned.
    :return: The encoded packet.
    """
    # Header: the packet type, in the representation the transport needs.
    if self.binary and not b64:
        encoded_packet = six.int2byte(self.packet_type)
    else:
        encoded_packet = six.text_type(self.packet_type)
        if self.binary and b64:
            encoded_packet = 'b' + encoded_packet

    # Body: depends on the packet kind and on the type of the data.
    data = self.data
    if self.binary:
        if b64:
            encoded_packet += base64.b64encode(data).decode('utf-8')
        else:
            encoded_packet += data
    elif isinstance(data, six.string_types):
        encoded_packet += data
    elif isinstance(data, (dict, list)):
        encoded_packet += self.json.dumps(data, separators=(',', ':'))
    elif data is not None:
        encoded_packet += str(data)

    if always_bytes and not isinstance(encoded_packet, binary_types):
        encoded_packet = encoded_packet.encode('utf-8')
    return encoded_packet
def encode(self, b64=False):
    """
    Encode the payload for transmission.

    Every packet in ``self.packets`` is encoded and framed, and the frames
    are concatenated.

    * With ``b64`` set, a frame is the decimal packet length as ASCII text,
      a colon, and the encoded packet.
    * Otherwise a frame is a marker byte (``0`` for text packets, ``1`` for
      binary ones), the packet length as one byte per decimal digit, a
      ``0xff`` terminator, and the encoded packet.

    :param b64: Whether packets are encoded (and framed) in base64 mode.
    :return: The encoded payload as a byte string.
    """
    # Collect frames and join once; repeated ``+=`` on bytes is quadratic.
    frames = []
    for pkt in self.packets:
        encoded_packet = pkt.encode(b64=b64)
        packet_len = len(encoded_packet)
        if b64:
            frames.append(
                str(packet_len).encode('utf-8') + b':' + encoded_packet)
            continue

        # One byte per decimal digit, most significant digit first.
        # divmod keeps the arithmetic in integers, so it is exact for any
        # length (float division is not).
        digits = bytearray()
        while packet_len != 0:
            packet_len, digit = divmod(packet_len, 10)
            digits.insert(0, digit)

        marker = b'\1' if pkt.binary else b'\0'
        frames.append(marker + bytes(digits) + b'\xff' + encoded_packet)
    return b''.join(frames)
def _initialize_telnet(connection):
    """
    Send the initial telnet option negotiation over ``connection``.

    Five messages are sent in order via ``connection.send``: DO LINEMODE,
    WILL SUPPRESS_GO_AHEAD, a LINEMODE MODE sub-negotiation with mode byte
    0, WILL ECHO and DO NAWS.

    :param connection: An object with a ``send`` method accepting the
        negotiation bytes.
    """
    logger.info('Initializing telnet connection')

    negotiation = (
        # Iac Do Linemode
        IAC + DO + LINEMODE,
        # Suppress Go Ahead. (This seems important for Putty to do correct
        # echoing.) This will allow bi-directional operation.
        IAC + WILL + SUPPRESS_GO_AHEAD,
        # Iac sb
        IAC + SB + LINEMODE + MODE + int2byte(0) + IAC + SE,
        # IAC Will Echo
        IAC + WILL + ECHO,
        # Negotiate window size
        IAC + DO + NAWS,
    )
    for message in negotiation:
        connection.send(message)
def _initialize_telnet(connection):
    """
    Send the initial telnet option negotiation over ``connection``.

    Five messages are sent in order via ``connection.send``: DO LINEMODE,
    WILL SUPPRESS_GO_AHEAD, a LINEMODE MODE sub-negotiation with mode byte
    0, WILL ECHO and DO NAWS.

    :param connection: An object with a ``send`` method accepting the
        negotiation bytes.
    """
    logger.info('Initializing telnet connection')

    negotiation = (
        # Iac Do Linemode
        IAC + DO + LINEMODE,
        # Suppress Go Ahead. (This seems important for Putty to do correct
        # echoing.) This will allow bi-directional operation.
        IAC + WILL + SUPPRESS_GO_AHEAD,
        # Iac sb
        IAC + SB + LINEMODE + MODE + int2byte(0) + IAC + SE,
        # IAC Will Echo
        IAC + WILL + ECHO,
        # Negotiate window size
        IAC + DO + NAWS,
    )
    for message in negotiation:
        connection.send(message)
def _SignedVarintEncoder():
"""Return an encoder for a basic signed varint value (does not include
tag)."""
def EncodeSignedVarint(write, value):
if value < 0:
value += (1 << 64)
bits = value & 0x7f
value >>= 7
while value:
write(six.int2byte(0x80|bits))
bits = value & 0x7f
value >>= 7
return write(six.int2byte(bits))
return EncodeSignedVarint
def _SignedVarintEncoder():
"""Return an encoder for a basic signed varint value (does not include
tag)."""
def EncodeSignedVarint(write, value):
if value < 0:
value += (1 << 64)
bits = value & 0x7f
value >>= 7
while value:
write(six.int2byte(0x80|bits))
bits = value & 0x7f
value >>= 7
return write(six.int2byte(bits))
return EncodeSignedVarint
def _SignedVarintEncoder():
"""Return an encoder for a basic signed varint value (does not include
tag)."""
def EncodeSignedVarint(write, value):
if value < 0:
value += (1 << 64)
bits = value & 0x7f
value >>= 7
while value:
write(six.int2byte(0x80|bits))
bits = value & 0x7f
value >>= 7
return write(six.int2byte(bits))
return EncodeSignedVarint
def _initialize_telnet(connection):
    """
    Send the initial telnet option negotiation over ``connection``.

    Five messages are sent in order via ``connection.send``: DO LINEMODE,
    WILL SUPPRESS_GO_AHEAD, a LINEMODE MODE sub-negotiation with mode byte
    0, WILL ECHO and DO NAWS.

    :param connection: An object with a ``send`` method accepting the
        negotiation bytes.
    """
    logger.info('Initializing telnet connection')

    negotiation = (
        # Iac Do Linemode
        IAC + DO + LINEMODE,
        # Suppress Go Ahead. (This seems important for Putty to do correct
        # echoing.) This will allow bi-directional operation.
        IAC + WILL + SUPPRESS_GO_AHEAD,
        # Iac sb
        IAC + SB + LINEMODE + MODE + int2byte(0) + IAC + SE,
        # IAC Will Echo
        IAC + WILL + ECHO,
        # Negotiate window size
        IAC + DO + NAWS,
    )
    for message in negotiation:
        connection.send(message)
def test_getbytes(self):
    """
    Exercise ``getbytes`` on a file, a missing path and a directory.

    A file containing every byte value 0-255 is written and read back; the
    result must be a ``bytes`` instance equal to what was written. Reading
    a missing path must raise ``errors.ResourceNotFound`` and reading a
    directory must raise ``errors.FileExpected``.
    """
    payload = b''.join(map(six.int2byte, range(256)))
    with self.fs.open('foo', 'wb') as handle:
        handle.write(payload)

    # Round trip: the stored bytes come back unchanged and as ``bytes``.
    self.assertEqual(self.fs.getbytes('foo'), payload)
    fetched = self.fs.getbytes('foo')
    self.assertIsInstance(fetched, bytes)
    self.assertEqual(fetched, payload)

    # A path that does not exist.
    with self.assertRaises(errors.ResourceNotFound):
        self.fs.getbytes('foo/bar')

    # A path that is a directory rather than a file.
    self.fs.makedir('baz')
    with self.assertRaises(errors.FileExpected):
        self.fs.getbytes('baz')
def _send_command(self, command, parameters):
    """
    Frame a command, write it to the serial port and read the reply.

    The frame is ``STX``, the command, a one-byte parameter length, the
    parameters, the CRC of those three parts, and ``ETX``. When debugging
    is enabled the frame contents are printed in hex, and when a log file
    is configured the sent frame is appended to it and synced to disk.

    :param command: The command code, as bytes.
    :param parameters: The command parameters, as bytes.
    :return: Whatever ``self._read_command()`` returns.
    :raises SIReaderException: If the input buffer is not empty, or if a
        ``SerialException`` or ``OSError`` occurs while sending.
    """
    try:
        if self._serial.inWaiting():
            raise SIReaderException(
                'Input buffer must be empty before sending command. Currently %s bytes in the input buffer.' % self._serial.inWaiting())

        body = command + int2byte(len(parameters)) + parameters
        crc = SIReader._crc(body)
        cmd = SIReader.STX + body + crc + SIReader.ETX

        if self._debug:
            command_hex = hexlify(command).decode('ascii')
            params_hex = ' '.join(
                hexlify(int2byte(c)).decode('ascii') for c in parameters)
            crc_hex = hexlify(crc).decode('ascii')
            print("==>> command '%s', parameters %s, crc %s"
                  % (command_hex, params_hex, crc_hex))

        self._serial.write(cmd)
    except (SerialException, OSError) as msg:
        raise SIReaderException('Could not send command: %s' % msg)

    if self._logfile:
        self._logfile.write('s %s %s\n' % (datetime.now(), cmd))
        self._logfile.flush()
        os.fsync(self._logfile)
    return self._read_command()