an2linuxserver.py 文件源码

python
阅读 24 收藏 0 点赞 0 评论 0

项目:an2linuxserver 作者: rootkiwi 项目源码 文件源码
def handle_notification_connection(self):
        self.incoming = ssl.MemoryBIO()
        self.outgoing = ssl.MemoryBIO()
        try:
            if bluetooth_support_kitkat:
                notif_tls_ctx_kitkat_bt.load_verify_locations(cadata=parse_authorized_certs())
                self.tls_bio = notif_tls_ctx_kitkat_bt.wrap_bio(incoming=self.incoming, outgoing=self.outgoing,
                                                                server_side=True)
            else:
                notif_tls_ctx.load_verify_locations(cadata=parse_authorized_certs())
                self.tls_bio = notif_tls_ctx.wrap_bio(incoming=self.incoming, outgoing=self.outgoing, server_side=True)
            self.do_handshake()
        except Exception as e:
            print_with_timestamp('(Bluetooth) Failed TLS handshake notif_conn: {}'.format(e))
            return

        # one recv should not take longer than 10 sec
        self.socket.settimeout(10)

        notification_flags_size = struct.unpack('>I', recvall(self.socket, 4))[0]
        notification_flags_encrypted = recvall(self.socket, notification_flags_size)
        notification_flags = struct.unpack('>B', self.tls_decrypt(notification_flags_encrypted))[0]

        include_title   = chkflags(notification_flags, FLAG_INCLUDE_TITLE)
        include_message = chkflags(notification_flags, FLAG_INCLUDE_MESSAGE)
        include_icon    = chkflags(notification_flags, FLAG_INCLUDE_ICON)

        title = ''
        message = ''

        if include_title or include_message:
            title_and_or_message_size = struct.unpack('>I', recvall(self.socket, 4))[0]
            title_and_or_message_encrypted = recvall(self.socket, title_and_or_message_size)
            title_and_or_message = self.tls_decrypt(title_and_or_message_encrypted).decode()
            if include_title:
                title = title_and_or_message.split('|||')[0]
            if include_message:
                message = title_and_or_message.split('|||')[1]

        if include_icon:
            icon_tmp_file = tempfile.NamedTemporaryFile(buffering=0, dir=TMP_DIR_PATH)
            icon_size = struct.unpack('>I', recvall(self.socket, 4))[0]
            icon_encrypted = recvall(self.socket, icon_size)
            icon = self.tls_decrypt(icon_encrypted)
            try:
                icon_tmp_file.write(icon)
                Notification(title, message, hashlib.sha256(title.encode() + message.encode() + icon).digest(),
                             icon_tmp_file).show()
            except Exception:
                Notification(title, message, hashlib.sha256(title.encode() + message.encode()).digest()).show()
        else:
            Notification(title, message, hashlib.sha256(title.encode() + message.encode()).digest()).show()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号