def handle_notification_connection(self):
    """Receive one TLS-protected notification from ``self.socket`` and show it.

    A server-side TLS session is set up over two in-memory BIOs (stored on
    ``self.incoming``, ``self.outgoing`` and ``self.tls_bio``) and
    ``self.do_handshake()`` is run.  If anything in that setup raises, the
    failure is logged via ``print_with_timestamp`` and the method returns
    without reading any notification data.

    After the handshake, up to three frames are read.  Each frame is a
    4-byte big-endian unsigned length followed by that many bytes, which
    are passed through ``self.tls_decrypt``:

    1. flags: decrypts to one unsigned byte, checked against
       ``FLAG_INCLUDE_TITLE``, ``FLAG_INCLUDE_MESSAGE`` and
       ``FLAG_INCLUDE_ICON``;
    2. text: read only when the title or message flag is set; decoded
       text holding the title and the message separated by ``'|||'``;
    3. icon: read only when the icon flag is set; raw bytes written to a
       temporary file inside ``TMP_DIR_PATH``.

    The notification is identified by a SHA-256 digest over the encoded
    title and message (plus the icon bytes when an icon is shown).

    Exceptions raised while receiving, unpacking or decrypting the frames
    are not caught here and propagate to the caller.
    """
    self.incoming = ssl.MemoryBIO()
    self.outgoing = ssl.MemoryBIO()
    try:
        # Both variants are configured identically; only the context differs.
        tls_ctx = notif_tls_ctx_kitkat_bt if bluetooth_support_kitkat else notif_tls_ctx
        tls_ctx.load_verify_locations(cadata=parse_authorized_certs())
        self.tls_bio = tls_ctx.wrap_bio(incoming=self.incoming, outgoing=self.outgoing,
                                        server_side=True)
        self.do_handshake()
    except Exception as e:
        print_with_timestamp('(Bluetooth) Failed TLS handshake notif_conn: {}'.format(e))
        return

    # one recv should not take longer than 10 sec
    self.socket.settimeout(10)

    flags_size = struct.unpack('>I', recvall(self.socket, 4))[0]
    flags_encrypted = recvall(self.socket, flags_size)
    notification_flags = struct.unpack('>B', self.tls_decrypt(flags_encrypted))[0]
    include_title = chkflags(notification_flags, FLAG_INCLUDE_TITLE)
    include_message = chkflags(notification_flags, FLAG_INCLUDE_MESSAGE)
    include_icon = chkflags(notification_flags, FLAG_INCLUDE_ICON)

    title = ''
    message = ''
    if include_title or include_message:
        text_size = struct.unpack('>I', recvall(self.socket, 4))[0]
        text_encrypted = recvall(self.socket, text_size)
        text = self.tls_decrypt(text_encrypted).decode()
        # Split on the FIRST separator only: a message that itself contains
        # '|||' is kept whole, and a payload without any separator yields an
        # empty message instead of raising IndexError.
        title_part, _, message_part = text.partition('|||')
        if include_title:
            title = title_part
        if include_message:
            message = message_part

    # Bytes shared by every digest variant below.
    text_bytes = title.encode() + message.encode()

    if not include_icon:
        Notification(title, message, hashlib.sha256(text_bytes).digest()).show()
        return

    icon_size = struct.unpack('>I', recvall(self.socket, 4))[0]
    icon_encrypted = recvall(self.socket, icon_size)
    icon = self.tls_decrypt(icon_encrypted)
    # Created only once the icon bytes have arrived, so a failed receive does
    # not leave an open temp file behind.  The file is deliberately not
    # closed here: it is handed to Notification, and closing a
    # NamedTemporaryFile deletes it by default.
    icon_tmp_file = tempfile.NamedTemporaryFile(buffering=0, dir=TMP_DIR_PATH)
    try:
        icon_tmp_file.write(icon)
        Notification(title, message, hashlib.sha256(text_bytes + icon).digest(),
                     icon_tmp_file).show()
    except Exception:
        # Best effort: if the icon cannot be written or shown, still show the
        # notification without it.
        Notification(title, message, hashlib.sha256(text_bytes).digest()).show()
评论列表
文章目录