def _do_ssl_handshake(self):
    """Run a client-side TLS handshake carried inside TDS_PRELOGIN packets.

    Handshake bytes produced by the SSL object are sent with
    ``self._send_message(TDS_PRELOGIN, ...)``; each reply read with
    ``self._read_response_packet()`` is fed back into the SSL object until
    ``do_handshake()`` no longer asks for more input.

    Returns:
        A ``(sslobj, incoming, outgoing)`` tuple: the SSL object plus the
        two ``ssl.MemoryBIO`` buffers it reads from and writes to.

    Raises:
        AssertionError: if a reply packet is not tagged ``TDS_PRELOGIN``.
    """
    incoming = ssl.MemoryBIO()
    outgoing = ssl.MemoryBIO()
    # Third positional argument is server_side: this end is the TLS client.
    sslobj = ssl.SSLContext().wrap_bio(incoming, outgoing, False)
    while True:
        try:
            sslobj.do_handshake()
        except ssl.SSLWantReadError:
            # The SSL object needs peer data: flush what it has queued and
            # hand the peer's answer back to it.
            self._send_message(TDS_PRELOGIN, outgoing.read())
            tag, _, _, buf = self._read_response_packet()
            # Explicit check instead of ``assert`` so the validation is not
            # stripped under ``python -O``; the exception type is kept so
            # existing callers see the same error.
            if tag != TDS_PRELOGIN:
                raise AssertionError(
                    "expected a TDS_PRELOGIN packet during the TLS "
                    "handshake, got tag {!r}".format(tag)
                )
            incoming.write(buf)
        else:
            break
    # NOTE(review): bytes still queued in ``outgoing`` once the handshake
    # completes are not sent here -- confirm the caller flushes them.
    return sslobj, incoming, outgoing
python类MemoryBIO()的实例源码
def __init__(self, context, server_side, server_hostname=None):
    """
    Store the TLS configuration and create empty in-memory BIO buffers.

    *context* is the ssl.SSLContext to use.  *server_side* tells whether
    this is the server or the client side of the transport.

    *server_hostname* is optional and names the host being connected to.
    It may only be given if the _ssl module supports Server Name
    Indication (SNI).
    """
    # Caller-supplied configuration.
    self._context, self._server_side = context, server_side
    self._server_hostname = server_hostname
    # Nothing is wrapped yet: no SSL object, just two empty buffers.
    self._state = _UNWRAPPED
    self._incoming, self._outgoing = ssl.MemoryBIO(), ssl.MemoryBIO()
    self._sslobj = None
    self._need_ssldata = False
    # No handshake or shutdown callback is registered initially.
    self._handshake_cb = self._shutdown_cb = None
def __init__(self, context, server_side, server_hostname=None):
    """
    Store the TLS configuration and create empty in-memory BIO buffers.

    *context* is the ssl.SSLContext to use.  *server_side* tells whether
    this is the server or the client side of the transport.

    *server_hostname* is optional and names the host being connected to.
    It may only be given if the _ssl module supports Server Name
    Indication (SNI).
    """
    # Caller-supplied configuration.
    self._context, self._server_side = context, server_side
    self._server_hostname = server_hostname
    # Nothing is wrapped yet: no SSL object, just two empty buffers.
    self._state = _UNWRAPPED
    self._incoming, self._outgoing = ssl.MemoryBIO(), ssl.MemoryBIO()
    self._sslobj = None
    self._need_ssldata = False
    # No handshake or shutdown callback is registered initially.
    self._handshake_cb = self._shutdown_cb = None
def __init__(self, context, server_side, server_hostname=None):
    """
    Store the TLS configuration and create empty in-memory BIO buffers.

    *context* is the ssl.SSLContext to use.  *server_side* tells whether
    this is the server or the client side of the transport.

    *server_hostname* is optional and names the host being connected to.
    It may only be given if the _ssl module supports Server Name
    Indication (SNI).
    """
    # Caller-supplied configuration.
    self._context, self._server_side = context, server_side
    self._server_hostname = server_hostname
    # Nothing is wrapped yet: no SSL object, just two empty buffers.
    self._state = _UNWRAPPED
    self._incoming, self._outgoing = ssl.MemoryBIO(), ssl.MemoryBIO()
    self._sslobj = None
    self._need_ssldata = False
    # No handshake or shutdown callback is registered initially.
    self._handshake_cb = self._shutdown_cb = None
def __init__(self):
    """
    Set up a new storage queue backed by an empty ssl.MemoryBIO.
    """
    buffer = ssl.MemoryBIO()
    self.memorybio = buffer
def _is_sslproto_available():
    """Return True when the ssl module exposes a ``MemoryBIO`` attribute."""
    try:
        ssl.MemoryBIO
    except AttributeError:
        return False
    return True
# States of an _SSLPipe.
def _make_legacy_ssl_transport(self, rawsock, protocol, sslcontext,
                               waiter, *,
                               server_side=False, server_hostname=None,
                               extra=None, server=None):
    """Build a ``_SelectorSslTransport`` for the given socket and protocol.

    All arguments are forwarded positionally, with ``self`` passed first.
    """
    # This is the legacy API path (SSL_write, SSL_read, etc.), used on
    # Python 3.4 and older, when ssl.MemoryBIO is not available.
    transport_args = (
        self, rawsock, protocol, sslcontext, waiter,
        server_side, server_hostname, extra, server,
    )
    return _SelectorSslTransport(*transport_args)
def test_create_ssl_connection(self):
    # Always reported as skipped: the scenario depends on ssl.MemoryBIO.
    reason = "need python 3.5 (ssl.MemoryBIO)"
    raise unittest.SkipTest(reason)
def test_create_server_ssl(self):
    # Always reported as skipped: the scenario depends on ssl.MemoryBIO.
    reason = "need python 3.5 (ssl.MemoryBIO)"
    raise unittest.SkipTest(reason)
def test_create_server_ssl_match_failed(self):
    # Always reported as skipped: the scenario depends on ssl.MemoryBIO.
    reason = "need python 3.5 (ssl.MemoryBIO)"
    raise unittest.SkipTest(reason)
def test_create_server_ssl_verified(self):
    # Always reported as skipped: the scenario depends on ssl.MemoryBIO.
    reason = "need python 3.5 (ssl.MemoryBIO)"
    raise unittest.SkipTest(reason)
def __init__(self, ctx, sock, **kwargs):
    """Pair *sock* with an SSL object that works on in-memory buffers.

    *ctx* must provide ``wrap_bio``; it is called with the two new
    ``ssl.MemoryBIO`` buffers, and any extra keyword arguments are
    forwarded to it unchanged.
    """
    bio_in = ssl.MemoryBIO()
    bio_out = ssl.MemoryBIO()
    self.incoming = bio_in
    self.outgoing = bio_out
    self.obj = ctx.wrap_bio(bio_in, bio_out, **kwargs)
    self.sock = sock
def __init__(
    self,
    transport_stream,
    ssl_context,
    *,
    server_hostname=None,
    server_side=False,
    https_compatible=False,
    max_refill_bytes=_default_max_refill_bytes
):
    """Store the transport stream and build the SSL object that sits on it.

    *ssl_context* must provide ``wrap_bio``; it is called with two fresh
    memory BIOs plus *server_side* and *server_hostname*.
    *https_compatible* and *max_refill_bytes* are only recorded here.
    """
    self.transport_stream = transport_stream
    self._state = _State.OK
    self._max_refill_bytes = max_refill_bytes
    self._https_compatible = https_compatible

    outgoing_bio = _stdlib_ssl.MemoryBIO()
    incoming_bio = _stdlib_ssl.MemoryBIO()
    self._outgoing = outgoing_bio
    self._incoming = incoming_bio
    self._ssl_object = ssl_context.wrap_bio(
        incoming_bio,
        outgoing_bio,
        server_side=server_side,
        server_hostname=server_hostname,
    )

    # Remembers whether the initial handshake has already been performed.
    self._handshook = _Once(self._do_handshake)

    # Synchronization state guarding access to self.transport_stream.
    self._inner_send_lock = _sync.StrictFIFOLock()
    self._inner_recv_count = 0
    self._inner_recv_lock = _sync.Lock()

    # Guards against the caller making multiple concurrent calls to
    # send_all/wait_send_all_might_not_block, or to receive_some.
    send_conflict_msg = "another task is currently sending data on this SSLStream"
    recv_conflict_msg = "another task is currently receiving data on this SSLStream"
    self._outer_send_conflict_detector = ConflictDetector(send_conflict_msg)
    self._outer_recv_conflict_detector = ConflictDetector(recv_conflict_msg)
def _is_sslproto_available():
    """Return True when the ssl module exposes a ``MemoryBIO`` attribute."""
    try:
        ssl.MemoryBIO
    except AttributeError:
        return False
    return True
# States of an _SSLPipe.
def _make_legacy_ssl_transport(self, rawsock, protocol, sslcontext,
                               waiter, *,
                               server_side=False, server_hostname=None,
                               extra=None, server=None):
    """Build a ``_SelectorSslTransport`` for the given socket and protocol.

    All arguments are forwarded positionally, with ``self`` passed first.
    """
    # This is the legacy API path (SSL_write, SSL_read, etc.), used on
    # Python 3.4 and older, when ssl.MemoryBIO is not available.
    transport_args = (
        self, rawsock, protocol, sslcontext, waiter,
        server_side, server_hostname, extra, server,
    )
    return _SelectorSslTransport(*transport_args)
def _is_sslproto_available():
    """Return True when the ssl module exposes a ``MemoryBIO`` attribute."""
    try:
        ssl.MemoryBIO
    except AttributeError:
        return False
    return True
# States of an _SSLPipe.
def _make_legacy_ssl_transport(self, rawsock, protocol, sslcontext,
                               waiter, *,
                               server_side=False, server_hostname=None,
                               extra=None, server=None):
    """Build a ``_SelectorSslTransport`` for the given socket and protocol.

    All arguments are forwarded positionally, with ``self`` passed first.
    """
    # This is the legacy API path (SSL_write, SSL_read, etc.), used on
    # Python 3.4 and older, when ssl.MemoryBIO is not available.
    transport_args = (
        self, rawsock, protocol, sslcontext, waiter,
        server_side, server_hostname, extra, server,
    )
    return _SelectorSslTransport(*transport_args)
def test_create_ssl_connection(self):
    # Always reported as skipped: the scenario depends on ssl.MemoryBIO.
    reason = "need python 3.5 (ssl.MemoryBIO)"
    raise unittest.SkipTest(reason)
def test_create_server_ssl(self):
    # Always reported as skipped: the scenario depends on ssl.MemoryBIO.
    reason = "need python 3.5 (ssl.MemoryBIO)"
    raise unittest.SkipTest(reason)
def test_create_server_ssl_match_failed(self):
    # Always reported as skipped: the scenario depends on ssl.MemoryBIO.
    reason = "need python 3.5 (ssl.MemoryBIO)"
    raise unittest.SkipTest(reason)
def test_create_server_ssl_verified(self):
    # Always reported as skipped: the scenario depends on ssl.MemoryBIO.
    reason = "need python 3.5 (ssl.MemoryBIO)"
    raise unittest.SkipTest(reason)
def init_tls(self):
    """Create the TLS context, its two memory BIOs and the SSL object.

    Sets ``self.tls_ctx``, ``self.tls_in``, ``self.tls_out`` and
    ``self.tls``; the SSL object is created with ``server_side=False`` and
    no server hostname.
    """
    self.log.debug("initializing TLS context...")
    # NOTE(review): PROTOCOL_TLSv1 selects TLS 1.0 only -- confirm the peer
    # really requires it before keeping this pin.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    bio_in = ssl.MemoryBIO()
    bio_out = ssl.MemoryBIO()
    self.tls_ctx = context
    self.tls_in = bio_in
    self.tls_out = bio_out
    self.tls = context.wrap_bio(bio_in, bio_out, False, None)
def first_test():
    """Step manually through the start of a TLS handshake with google.com:443.

    An SSL object working on two memory BIOs produces the first handshake
    bytes, which are written to a plain TCP socket; one reply is read back
    and fed to the SSL object before ``do_handshake()`` is attempted.  The
    pending byte counts of the SSL object and both BIOs are printed after
    each step.

    Changes from the earlier version: the socket is closed on exit, the
    outgoing bytes are sent with ``sendall`` so none are dropped, and the
    host name is passed to ``wrap_bio`` so SNI and hostname checking apply.
    """
    host = 'google.com'

    with socket.create_connection((host, 443)) as sock:
        incoming = ssl.MemoryBIO()
        outgoing = ssl.MemoryBIO()
        ctx = ssl.create_default_context()
        ssl_obj = ctx.wrap_bio(incoming, outgoing, server_hostname=host)

        def report_pending():
            # Show how much data each layer currently holds.
            print('ssl_obj', ssl_obj.pending())
            print('incoming', incoming.pending)
            print('outgoing', outgoing.pending)

        # Writing before the handshake is done makes the SSL object queue
        # its first handshake bytes and ask for peer data.
        try:
            ssl_obj.write(b'')
        except ssl.SSLWantReadError as err:
            print("err", err)
        report_pending()

        # sendall: a plain send() may transmit only part of the buffer.
        sock.sendall(outgoing.read())
        report_pending()

        got = sock.recv(10240)
        print('sock.recv got', len(got))
        incoming.write(got)

        try:
            ssl_obj.do_handshake()
        except ssl.SSLWantReadError as err:
            print("err", err)
        report_pending()
def handle_notification_connection(self):
    """Receive one TLS-protected notification from the socket and show it.

    After a server-side TLS handshake over two memory BIOs, the method
    reads a flags byte, then (depending on the flags) a combined
    title/message field and an icon.  Each field arrives as a 4-byte
    big-endian length followed by that many bytes, which are passed
    through ``self.tls_decrypt``.  A handshake failure is reported with
    ``print_with_timestamp`` and ends the method early.
    """
    self.incoming = ssl.MemoryBIO()
    self.outgoing = ssl.MemoryBIO()
    try:
        # Both contexts get the same setup; only the choice differs.
        tls_ctx = notif_tls_ctx_kitkat_bt if bluetooth_support_kitkat else notif_tls_ctx
        tls_ctx.load_verify_locations(cadata=parse_authorized_certs())
        self.tls_bio = tls_ctx.wrap_bio(incoming=self.incoming, outgoing=self.outgoing,
                                        server_side=True)
        self.do_handshake()
    except Exception as e:
        print_with_timestamp('(Bluetooth) Failed TLS handshake notif_conn: {}'.format(e))
        return

    # one recv should not take longer than 10 sec
    self.socket.settimeout(10)

    def read_decrypted_field():
        # Length prefix first, then the encrypted payload of that length.
        field_size = struct.unpack('>I', recvall(self.socket, 4))[0]
        return self.tls_decrypt(recvall(self.socket, field_size))

    notification_flags = struct.unpack('>B', read_decrypted_field())[0]
    include_title = chkflags(notification_flags, FLAG_INCLUDE_TITLE)
    include_message = chkflags(notification_flags, FLAG_INCLUDE_MESSAGE)
    include_icon = chkflags(notification_flags, FLAG_INCLUDE_ICON)

    title = message = ''
    if include_title or include_message:
        # Title and message share one field, separated by '|||'.
        text_parts = read_decrypted_field().decode().split('|||')
        if include_title:
            title = text_parts[0]
        if include_message:
            message = text_parts[1]

    if include_icon:
        # The temporary file is created before the icon bytes are read.
        icon_tmp_file = tempfile.NamedTemporaryFile(buffering=0, dir=TMP_DIR_PATH)
        icon = read_decrypted_field()
        try:
            icon_tmp_file.write(icon)
            icon_digest = hashlib.sha256(title.encode() + message.encode() + icon).digest()
            Notification(title, message, icon_digest, icon_tmp_file).show()
        except Exception:
            # Fall back to a notification without the icon.
            plain_digest = hashlib.sha256(title.encode() + message.encode()).digest()
            Notification(title, message, plain_digest).show()
    else:
        plain_digest = hashlib.sha256(title.encode() + message.encode()).digest()
        Notification(title, message, plain_digest).show()