python类standard_b64decode()的实例源码

cb-powershell-decode.py 文件源码 项目:cbapi-examples 作者: cbcommunity 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def main(cb, args):
    """Print the decoded form of base64-encoded PowerShell command lines.

    Searches ``cb`` for ``powershell.exe`` processes, extracts the argument
    that follows an ``-enc``/``-EncodedCommand`` style switch, base64-decodes
    it and prints it below a link to the process.

    :param cb: object exposing ``process_search_iter(query)``; every result
        is indexed with ``'cmdline'`` and ``'id'``.
    :param args: mapping that provides ``'server_url'`` for the printed link.
    """
    # Raw strings keep the regex escapes away from Python's own string
    # escape processing; the patterns themselves are unchanged.
    switch_re = re.compile(r'\-[eE][nN][cC][oOdDeEcCmMaAnN]*\s([A-Za-z0-9\+/=]+)')
    padded_re = re.compile(r'[a-zA-Z0-9\+/]+={1,2}$')

    for s in cb.process_search_iter('process_name:powershell.exe'):
        if not s['cmdline']:
            continue
        encoded = switch_re.search(s['cmdline'])
        if encoded is None:
            continue

        i = encoded.group(1)
        if not padded_re.search(i):
            # Re-append the '=' padding when the command line omitted it.
            trailingBytes = len(i) % 4
            if trailingBytes == 3:
                i = i + '='
            elif trailingBytes == 2:
                i = i + '=='
        decodedCommand = base64.standard_b64decode(i)

        try:
            # A strict ASCII decode raises UnicodeDecodeError on any byte
            # >= 0x80, which selects the fallback branch exactly like the
            # implicit decode the old ``str.encode('ascii', ...)`` call did on
            # Python 2 -- but this form also works on Python 3 bytes.
            a = decodedCommand.decode('ascii')
        except UnicodeError:
            print("Powershell Decoded Command\n%s/#analyze/%s/1\nNon-ASCII decoding, encoded form printed to assist more research\n%s\n" % (
                args['server_url'], s['id'], s['cmdline']))
        else:
            # NUL bytes are stripped before printing.
            print("Powershell Decoded Command\n%s/#analyze/%s/1\n%s\n\n" % (
                args['server_url'], s['id'], a.replace('\0', "")))
rpc_api.py 文件源码 项目:pogom-linux 作者: PokeHunterProject 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def check_authentication(self, response_dict):
    """Store a newer session ticket carried by ``response_dict``, if any.

    Nothing happens unless ``response_dict`` is a dict whose ``'auth_ticket'``
    entry has an ``'expire_timestamp_ms'`` that the auth provider reports as
    new. Otherwise the ticket (expiry plus base64-decoded ``'start'`` and
    ``'end'``) is handed to the provider and a debug line is logged.
    """
    if not isinstance(response_dict, dict):
        return
    if 'auth_ticket' not in response_dict:
        return
    ticket = response_dict['auth_ticket']
    if 'expire_timestamp_ms' not in ticket:
        return
    if not self._auth_provider.is_new_ticket(ticket['expire_timestamp_ms']):
        return

    # Must be sampled before set_ticket() so the log line can tell a first
    # ticket from a replacement.
    replacing = self._auth_provider.has_ticket()

    start = base64.standard_b64decode(ticket['start'])
    end = base64.standard_b64decode(ticket['end'])
    self._auth_provider.set_ticket([ticket['expire_timestamp_ms'], start, end])

    now_ms = get_time(ms=True)
    h, m, s = get_format_time_diff(now_ms, ticket['expire_timestamp_ms'], True)

    if replacing:
        template = 'Replacing old Session Ticket with new one valid for %02d:%02d:%02d hours (%s < %s)'
    else:
        template = 'Received Session Ticket valid for %02d:%02d:%02d hours (%s < %s)'
    self.log.debug(template, h, m, s, now_ms, ticket['expire_timestamp_ms'])
UserContributedManager.py 文件源码 项目:PyDoc 作者: shaun-h 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __getDownloadedUserContributed(self):
    """Return a UserContributed object for every installed 'usercontributed' row.

    Each row returned by ``InstalledDocsetsByType('usercontributed')`` is read
    by position: 0 -> id, 1 -> name, 2 -> path (joined onto the current
    directory), 4 -> base64 icon data, 5 -> version, 6 -> author name.
    """
    rows = DBManager.DBManager().InstalledDocsetsByType('usercontributed')
    docsets = []
    for row in rows:
        entry = UserContributed()
        entry.name = row[1]
        entry.id = row[0]
        entry.path = os.path.join(os.path.abspath('.'), row[2])
        encoded_icon = str(row[4])
        if encoded_icon != '':
            entry.image = ui.Image.from_data(base64.standard_b64decode(encoded_icon))
        else:
            # No stored icon data: use the icon named 'Other'.
            entry.image = self.__getIconWithName('Other')
        entry.authorName = row[6]
        entry.version = row[5]
        docsets.append(entry)
    return docsets
constrained_client.py 文件源码 项目:creator-system-test-framework 作者: CreatorDev 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def Decode(value, type):
    """Decode a base64-encoded ``value`` into the requested Python type.

    :param value: base64 text holding the raw bytes of the value.
    :param type: target type object; ``int`` unpacks a native-order 64-bit
        signed integer (``"=q"``), ``float`` unpacks a double (``"=d"``) and
        ``str`` returns ``str()`` of the decoded bytes.
    :return: the decoded value.
    :raises EncodeException: if ``type`` is none of ``int``, ``float``, ``str``.
    """
    data = base64.standard_b64decode(value)

    print("Decoding value %s, type %s" % (str(value), str(type)))
    sys.stdout.flush()

    if type is int:
        result = struct.unpack("=q", data)[0]
    elif type is float:
        result = struct.unpack("=d", data)[0]
    elif type is str:
        result = str(data)
    else:
        # ``type`` is the parameter here, not the builtin: the old
        # ``type(value)`` *called* the unsupported type object (or raised
        # TypeError when it was not callable) instead of reporting it.
        raise EncodeException("Unknown type %s with value %s" % (str(type), str(value),))

    return result
canister.py 文件源码 项目:canister 作者: dagnelies 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _buildAuthJWT(config):
    """Build a JWT validation callable from ``config``.

    Reads ``canister.auth_jwt_client_id``, ``canister.auth_jwt_secret`` and
    ``canister.auth_jwt_encoding`` (default ``'clear'``, compared
    case-insensitively). Returns ``None`` when the client id or the secret is
    missing; otherwise returns ``validate(token)``, which calls
    ``jwt.decode(token, secret, audience=client_id)`` and returns its result.

    Raises ``Exception`` for an unknown encoding name.
    """
    client_id = config.get('canister.auth_jwt_client_id', None)
    secret = config.get('canister.auth_jwt_secret', None)
    # One of: clear, base64std, base64url
    encoding = config.get('canister.auth_jwt_encoding', 'clear').lower()

    if not (client_id and secret):
        return None

    import jwt

    secret_decoders = {
        'base64std': base64.standard_b64decode,  # alphabet with + and /
        'base64url': base64.urlsafe_b64decode,   # alphabet with - and _
        'clear': None,                           # secret is used as given
    }
    if encoding not in secret_decoders:
        raise Exception('Invalid auth_jwt_encoding in config: "%s" (should be "clear", "base64std" or "base64url")' % encoding)

    decode_secret = secret_decoders[encoding]
    if decode_secret is not None:
        secret = decode_secret(secret)

    def validate(token):
        return jwt.decode(token, secret, audience=client_id)

    return validate
MSL.py 文件源码 项目:plugin.video.netflix 作者: asciidisco 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def __parse_crypto_keys(self, headerdata):
    """Install the master token and session keys found in ``headerdata``.

    The ``'encryptionkey'`` and ``'hmackey'`` entries of
    ``headerdata['keyresponsedata']['keydata']`` are base64-decoded and
    decrypted with PKCS1_OAEP under ``self.rsa_key``. Each decrypted blob is
    parsed as JSON and its ``'k'`` entry is passed through
    ``base64key_decode``. Afterwards the MSL data is saved and the handshake
    is flagged as performed.
    """
    self.__set_master_token(headerdata['keyresponsedata']['mastertoken'])

    # Init decryption
    key_data = headerdata['keyresponsedata']['keydata']
    enc_key_b64 = key_data['encryptionkey']
    hmac_key_b64 = key_data['hmackey']
    wrapped_encryption_key = base64.standard_b64decode(enc_key_b64)
    wrapped_sign_key = base64.standard_b64decode(hmac_key_b64)
    rsa_cipher = PKCS1_OAEP.new(self.rsa_key)

    def unwrap(wrapped_key):
        # RSA-decrypt, parse the JSON document, decode its 'k' member.
        key_info = json.JSONDecoder().decode(rsa_cipher.decrypt(wrapped_key))
        return base64key_decode(key_info['k'])

    self.encryption_key = unwrap(wrapped_encryption_key)
    self.sign_key = unwrap(wrapped_sign_key)

    self.__save_msl_data()
    self.handshake_performed = True
MSL.py 文件源码 项目:plugin.video.netflix 作者: asciidisco 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def __load_msl_data(self):
    """Load persisted MSL data, renewing the keys when the token is near expiry.

    Reads ``msl_data.json`` through ``self.load_file``, decodes the stored
    master token and looks at its ``'expiration'`` (seconds since the epoch).
    When fewer than 10 hours remain (or it already expired) the RSA keys are
    loaded and a new key handshake is performed; otherwise the stored master
    token, encryption key and sign key are installed on ``self``.
    """
    import time  # local import: only this method needs the wall clock

    raw_msl_data = self.load_file(
        msl_data_path=self.kodi_helper.msl_data_path,
        filename='msl_data.json')
    msl_data = json.loads(raw_msl_data)

    # Check expire date of the token. json.loads (unlike
    # JSONDecoder().decode) also accepts the bytes that base64 decoding
    # returns on Python 3.
    raw_token = msl_data['tokens']['mastertoken']['tokendata']
    master_token = json.loads(base64.standard_b64decode(raw_token))
    exp = int(master_token['expiration'])

    # Compare epoch seconds directly. The previous code subtracted a naive
    # *local* datetime.now() from a naive *UTC* utcfromtimestamp(), which
    # skewed the remaining time by the machine's UTC offset.
    hours_left = (exp - time.time()) / 60.0 / 60.0

    # If the token expires in less than 10 hours or is expired, renew it
    if hours_left < 10:
        self.__load_rsa_keys()
        self.__perform_key_handshake()
        return

    self.__set_master_token(msl_data['tokens']['mastertoken'])
    enc_key = msl_data['encryption_key']
    self.encryption_key = base64.standard_b64decode(enc_key)
    self.sign_key = base64.standard_b64decode(msl_data['sign_key'])
test_base64.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def test_b64decode_invalid_chars(self):
    """Characters outside the alphabet are discarded by the base64 decoders."""
    # issue 1466065: Test some invalid characters.
    decoders = (base64.b64decode,
                base64.standard_b64decode,
                base64.urlsafe_b64decode)
    cases = (
        (b'%3d==', b'\xdd'),
        (b'$3d==', b'\xdd'),
        (b'[==', b''),
        (b'YW]3=', b'am'),
        (b'3{d==', b'\xdd'),
        (b'3d}==', b'\xdd'),
        (b'@@', b''),
        (b'!', b''),
        (b'YWJj\nYWI=', b'abcab'),
    )
    for encoded, expected in cases:
        for decode in decoders:
            self.assertEqual(decode(encoded), expected)

    # Normal alphabet characters not discarded when alternative given
    expected = b'\xFB\xEF\xBE\xFF\xFF\xFF'
    self.assertEqual(base64.b64decode(b'++[[//]]', b'[]'), expected)
    self.assertEqual(base64.urlsafe_b64decode(b'++--//__'), expected)
test_base64.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def test_b64decode_invalid_chars(self):
    """Check that each decoder skips bytes that are not in its alphabet."""
    # issue 1466065: Test some invalid characters.
    samples = [
        (b'%3d==', b'\xdd'),
        (b'$3d==', b'\xdd'),
        (b'[==', b''),
        (b'YW]3=', b'am'),
        (b'3{d==', b'\xdd'),
        (b'3d}==', b'\xdd'),
        (b'@@', b''),
        (b'!', b''),
        (b'YWJj\nYWI=', b'abcab'),
    ]
    for raw, wanted in samples:
        for func_name in ('b64decode', 'standard_b64decode', 'urlsafe_b64decode'):
            self.assertEqual(getattr(base64, func_name)(raw), wanted)

    # Normal alphabet characters not discarded when alternative given
    wanted = b'\xFB\xEF\xBE\xFF\xFF\xFF'
    self.assertEqual(base64.b64decode(b'++[[//]]', b'[]'), wanted)
    self.assertEqual(base64.urlsafe_b64decode(b'++--//__'), wanted)
ClickAndLoadBackend.py 文件源码 项目:download-manager 作者: thispc 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def addcrypted2(self):
    """Decrypt the posted ``crypted`` link list and add it as a package.

    POST fields read: ``source`` (package name, default
    "ClickAndLoad Package"), ``crypted`` (base64 ciphertext) and ``jk``
    (JavaScript source; evaluating it followed by ``f()`` yields the
    hex-encoded key).
    """
    package = self.get_post("source", "ClickAndLoad Package")
    crypted = self.get_post("crypted")
    jk = self.get_post("jk")

    # Spaces are turned back into '+' before unquoting and base64-decoding.
    ciphertext = standard_b64decode(unquote(crypted.replace(" ", "+")))

    # NOTE(review): this evaluates JavaScript taken from the request --
    # confirm the endpoint is only reachable by trusted clients.
    key = unhexlify(js.eval("%s f()" % jk))

    # The key is passed as the CBC IV as well.
    cipher = AES.new(key, AES.MODE_CBC, key)
    plaintext = cipher.decrypt(ciphertext).replace("\x00", "").replace("\r", "")

    # Keep the non-empty lines only.
    links = [line for line in plaintext.split("\n") if line != ""]

    self.add_package(package, links, 0)
CNLServer.py 文件源码 项目:download-manager 作者: thispc 项目源码 文件源码 阅读 48 收藏 0 点赞 0 评论 0
def addcrypted2(self):
    """Handle a posted, AES-encrypted link list and queue it as a package.

    Reads the ``source`` (default "ClickAndLoad Package"), ``crypted`` and
    ``jk`` POST fields. The hex key is obtained by evaluating the ``jk``
    JavaScript followed by ``f()``; the decrypted text is split into lines
    and the non-empty ones are passed to ``add_package``.
    """
    package_name = self.get_post("source", "ClickAndLoad Package")
    crypted_field = self.get_post("crypted")
    jk_source = self.get_post("jk")

    # ' ' is restored to '+' first, then the value is unquoted and decoded.
    encrypted = standard_b64decode(unquote(crypted_field.replace(" ", "+")))

    # NOTE(review): JavaScript from the request is evaluated here -- confirm
    # that callers are trusted.
    hex_key = js.eval("%s f()" % jk_source)
    aes_key = unhexlify(hex_key)

    # Same value serves as key and as IV.
    decryptor = AES.new(aes_key, AES.MODE_CBC, aes_key)
    text = decryptor.decrypt(encrypted)
    text = text.replace("\x00", "")
    text = text.replace("\r", "")

    urls = []
    for candidate in text.split("\n"):
        if candidate != "":
            urls.append(candidate)

    self.add_package(package_name, urls, 0)
vm.py 文件源码 项目:webkvmmgr 作者: crazw 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def read_vm_file(self, path, uri="qemu:///system"):
    """Read a file from inside the guest through ``self.EXE`` commands.

    Issues ``guest-file-open``, a single ``guest-file-read`` of at most
    1024000 bytes, and ``guest-file-close``.

    :param path: path of the file inside the guest.
    :param uri: not used by this method; kept so existing callers still work.
    :return: the base64-decoded content of the read, or ``None`` when
        opening, reading or decoding failed (the error is logged).
    """
    import json  # local import: only needed to escape ``path``

    # ``path`` is inserted as a JSON string literal via json.dumps so that
    # quotes and backslashes (e.g. Windows guest paths) cannot produce an
    # invalid or altered command.
    FILE_OPEN_READ = """{"execute":"guest-file-open", "arguments":{"path":%s,"mode":"r"}}"""
    FILE_READ = """{"execute":"guest-file-read", "arguments":{"handle":%s,"count":%d}}"""
    FILE_CLOSE = """{"execute":"guest-file-close", "arguments":{"handle":%s}}"""

    file_handle = None
    try:
        file_handle = self.EXE(FILE_OPEN_READ % json.dumps(path))["return"]
        file_content = self.EXE(FILE_READ % (file_handle, 1024000))["return"]["buf-b64"]
        return base64.standard_b64decode(file_content)
    except Exception as e:
        logger.exception(e)
        return None
    finally:
        # Only close a handle that was actually opened, and never let a
        # failing close replace the result (or the None) being returned.
        if file_handle is not None:
            try:
                self.EXE(FILE_CLOSE % file_handle)
            except Exception as e:
                logger.exception(e)
matcher.py 文件源码 项目:cti-pattern-matcher 作者: oasis-open 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _process_prop_suffix(prop_name, value):
    """
    Convert a JSON value according to the type suffix of its property name.

    A name ending in ``_hex`` marks a binary value written as hex; a name
    ending in ``_bin`` marks a binary value written as base64.  Both are
    turned into the decoded binary data so later property tests compare the
    right thing.

    :param prop_name: The JSON property name
    :param value: The JSON property value
    :return: The decoded binary data for ``_hex``/``_bin`` properties;
        otherwise ``value`` unchanged.
    """
    if prop_name.endswith(u"_hex"):
        return binascii.a2b_hex(value)

    if prop_name.endswith(u"_bin"):
        return base64.standard_b64decode(value)

    return value
rpc_api.py 文件源码 项目:pogom 作者: favll 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def check_authentication(self, response_dict):
    """Pick up a fresh session ticket from an RPC response.

    The response is ignored unless it is a dict with an ``'auth_ticket'``
    whose ``'expire_timestamp_ms'`` is accepted by
    ``self._auth_provider.is_new_ticket``. An accepted ticket is stored as
    ``[expire_timestamp_ms, start, end]`` with ``start``/``end``
    base64-decoded, and its remaining validity is logged at debug level.
    """
    has_new_ticket = (
        isinstance(response_dict, dict)
        and 'auth_ticket' in response_dict
        and 'expire_timestamp_ms' in response_dict['auth_ticket']
        and self._auth_provider.is_new_ticket(
            response_dict['auth_ticket']['expire_timestamp_ms'])
    )
    if not has_new_ticket:
        return

    # Ask before storing so we know whether this is a replacement.
    had_ticket = self._auth_provider.has_ticket()

    auth_ticket = response_dict['auth_ticket']
    expires_at = auth_ticket['expire_timestamp_ms']
    decoded_start = base64.standard_b64decode(auth_ticket['start'])
    decoded_end = base64.standard_b64decode(auth_ticket['end'])
    self._auth_provider.set_ticket([expires_at, decoded_start, decoded_end])

    now_ms = get_time(ms=True)
    h, m, s = get_format_time_diff(now_ms, auth_ticket['expire_timestamp_ms'], True)

    message = (
        'Replacing old Session Ticket with new one valid for %02d:%02d:%02d hours (%s < %s)'
        if had_ticket else
        'Received Session Ticket valid for %02d:%02d:%02d hours (%s < %s)'
    )
    self.log.debug(message, h, m, s, now_ms, auth_ticket['expire_timestamp_ms'])
rpc_api.py 文件源码 项目:pokescan 作者: joaoanes 项目源码 文件源码 阅读 48 收藏 0 点赞 0 评论 0
def check_authentication(self, response_dict):
    """Update the stored session ticket when the response brings a new one.

    Returns without doing anything when ``response_dict`` is not a dict, has
    no ``'auth_ticket'``, the ticket lacks ``'expire_timestamp_ms'``, or the
    auth provider does not consider that expiry new.
    """
    if not isinstance(response_dict, dict) or 'auth_ticket' not in response_dict:
        return
    if 'expire_timestamp_ms' not in response_dict['auth_ticket']:
        return
    provider = self._auth_provider
    if not provider.is_new_ticket(response_dict['auth_ticket']['expire_timestamp_ms']):
        return

    # Recorded before set_ticket() so the log message can distinguish a
    # replacement from a first ticket.
    already_had_one = provider.has_ticket()

    new_ticket = response_dict['auth_ticket']
    provider.set_ticket([
        new_ticket['expire_timestamp_ms'],
        base64.standard_b64decode(new_ticket['start']),
        base64.standard_b64decode(new_ticket['end']),
    ])

    now_ms = get_time(ms=True)
    h, m, s = get_format_time_diff(now_ms, new_ticket['expire_timestamp_ms'], True)
    log_args = (h, m, s, now_ms, new_ticket['expire_timestamp_ms'])

    if already_had_one:
        self.log.debug('Replacing old Session Ticket with new one valid for %02d:%02d:%02d hours (%s < %s)', *log_args)
    else:
        self.log.debug('Received Session Ticket valid for %02d:%02d:%02d hours (%s < %s)', *log_args)
cfs.py 文件源码 项目:collection 作者: skywind3000 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def sign_extract(signature):
    """Split a base64-encoded ``user:timestamp:verify`` signature.

    :param signature: base64 text of ``"<user>:<timestamp>:<verify>"``.
    :return: ``(user, ts, verify)`` where ``user`` and ``verify`` have
        surrounding whitespace stripped, ``verify`` is lower-cased and ``ts``
        is an integer; ``(None, None, None)`` when the signature is empty,
        is not valid base64 text, does not have exactly three ``:``-separated
        parts, or has a non-integer timestamp.
    """
    invalid = (None, None, None)
    if not signature:
        return invalid
    try:
        text = base64.standard_b64decode(signature)
        # On Python 3 the decoded value is bytes; turn it into text so the
        # str-based split/strip below works. On Python 2 it is already str.
        if not isinstance(text, str):
            text = text.decode('utf-8')
    except (TypeError, ValueError):
        # TypeError: bad input (Python 2 base64 errors included);
        # ValueError: binascii.Error / UnicodeDecodeError on Python 3.
        return invalid
    part = text.split(':')
    if len(part) != 3:
        return invalid
    user = part[0].strip('\r\n\t ')
    try:
        # int() replaces the Python 2-only long(); on Python 2 it still
        # promotes to long automatically for large values.
        ts = int(part[1])
    except ValueError:
        return invalid
    verify = part[2].strip('\r\n\t ').lower()
    return user, ts, verify

# ?????
aws_srp.py 文件源码 项目:warrant 作者: capless 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def process_challenge(self, challenge_parameters):
    """Build the response dict for a password-verifier challenge.

    Reads ``USER_ID_FOR_SRP``, ``SALT``, ``SRP_B`` and ``SECRET_BLOCK`` from
    ``challenge_parameters`` and signs (HMAC-SHA256, keyed with the derived
    password authentication key) the second ``_``-separated part of the pool
    id, the user id, the decoded secret block and the timestamp.

    :return: dict with ``TIMESTAMP``, ``USERNAME``,
        ``PASSWORD_CLAIM_SECRET_BLOCK`` and ``PASSWORD_CLAIM_SIGNATURE``,
        plus ``SECRET_HASH`` when ``self.client_secret`` is not None.
    """
    user_id_for_srp = challenge_parameters['USER_ID_FOR_SRP']
    salt_hex = challenge_parameters['SALT']
    srp_b_hex = challenge_parameters['SRP_B']
    secret_block_b64 = challenge_parameters['SECRET_BLOCK']

    # re strips leading zero from a day number (required by AWS Cognito)
    formatted_now = datetime.datetime.utcnow().strftime("%a %b %d %H:%M:%S UTC %Y")
    timestamp = re.sub(r" 0(\d) ", r" \1 ", formatted_now)

    hkdf = self.get_password_authentication_key(
        user_id_for_srp, self.password, hex_to_long(srp_b_hex), salt_hex)
    secret_block_bytes = base64.standard_b64decode(secret_block_b64)

    # Message layout: pool id suffix | user id | secret block | timestamp
    msg = bytearray(self.pool_id.split('_')[1], 'utf-8')
    msg += bytearray(user_id_for_srp, 'utf-8')
    msg += bytearray(secret_block_bytes)
    msg += bytearray(timestamp, 'utf-8')

    digest = hmac.new(hkdf, msg, digestmod=hashlib.sha256).digest()
    signature_string = base64.standard_b64encode(digest)

    response = {
        'TIMESTAMP': timestamp,
        'USERNAME': user_id_for_srp,
        'PASSWORD_CLAIM_SECRET_BLOCK': secret_block_b64,
        'PASSWORD_CLAIM_SIGNATURE': signature_string.decode('utf-8'),
    }
    if self.client_secret is not None:
        response["SECRET_HASH"] = self.get_secret_hash(
            self.username, self.client_id, self.client_secret)
    return response
rpc_api.py 文件源码 项目:pogom-updated 作者: PokeHunterProject 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def check_authentication(self, response_dict):
    """Store the session ticket from ``response_dict`` if it is a new one.

    A ticket is only taken over when ``response_dict`` is a dict containing
    ``'auth_ticket'`` with an ``'expire_timestamp_ms'`` for which
    ``self._auth_provider.is_new_ticket`` returns a true value.
    """
    try_ticket = isinstance(response_dict, dict) and ('auth_ticket' in response_dict)
    if try_ticket:
        try_ticket = 'expire_timestamp_ms' in response_dict['auth_ticket']
    if try_ticket:
        try_ticket = self._auth_provider.is_new_ticket(
            response_dict['auth_ticket']['expire_timestamp_ms'])
    if not try_ticket:
        return

    # Whether a ticket existed must be read before the new one is stored.
    had_ticket = self._auth_provider.has_ticket()

    auth_ticket = response_dict['auth_ticket']
    ticket_fields = [auth_ticket['expire_timestamp_ms']]
    for bound in ('start', 'end'):
        ticket_fields.append(base64.standard_b64decode(auth_ticket[bound]))
    self._auth_provider.set_ticket(ticket_fields)

    now_ms = get_time(ms=True)
    h, m, s = get_format_time_diff(now_ms, auth_ticket['expire_timestamp_ms'], True)

    if not had_ticket:
        self.log.debug('Received Session Ticket valid for %02d:%02d:%02d hours (%s < %s)', h, m, s, now_ms, auth_ticket['expire_timestamp_ms'])
        return
    self.log.debug('Replacing old Session Ticket with new one valid for %02d:%02d:%02d hours (%s < %s)', h, m, s, now_ms, auth_ticket['expire_timestamp_ms'])
recipe-501148.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 47 收藏 0 点赞 0 评论 0
def authenticate_client(self):
    """Authenticate the request from its ``Authorization`` or ``UUID`` header.

    With an ``Authorization`` header, the second token is base64-decoded and
    split at the first ``:`` into an e-mail id and a machine name; the
    request is valid when ``authenticate(machine_name)`` returns that same
    e-mail, and a ``UUID`` cookie is set when a UUID string came back.
    Otherwise a ``UUID`` header is passed to ``authenticate`` and the request
    is valid when a UUID string is returned.

    :return: True when the client was authenticated, False otherwise
        (malformed headers count as a failure instead of raising).
    """
    validuser = False

    if 'Authorization' in self.headers:
        # handle Basic authentication
        try:
            (enctype, encstr) = self.headers.get('Authorization').split()
            decoded = base64.standard_b64decode(encstr)
            if not isinstance(decoded, str):
                # Python 3 returns bytes from base64 decoding.
                decoded = decoded.decode('utf-8')
            # Split once only, so a ':' in the second field is preserved.
            (emailid, machine_name) = decoded.split(':', 1)
        except (TypeError, ValueError):
            # Wrong number of tokens, invalid base64/UTF-8, or no ':'.
            emailid = machine_name = None

        if machine_name is not None:
            (auth_machine, auth_uuidstr, auth_email) = authenticate(machine_name)

            if emailid == auth_email:
                print("Authenticated")
                # set authentication cookies on client machines
                validuser = True
                if auth_uuidstr:
                    self.setCookie('UUID', auth_uuidstr)

    elif 'UUID' in self.headers:
        # handle cookie based authentication
        uuid_header = self.headers.get('UUID')
        (auth_machine, auth_uuidstr, auth_email) = authenticate(uuid_header)

        if auth_uuidstr:
            print("Authenticated")
            validuser = True

    if not validuser:
        # Reported for every failure, not only when both headers are missing.
        print('Authentication failed')

    return validuser
message_conversion.py 文件源码 项目:Cloudroid 作者: cyberdb 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _to_binary_inst(msg):
    """Convert ``msg`` to binary data on a best-effort basis.

    If the exact type of ``msg`` is listed in ``string_types`` it is treated
    as base64 text and decoded; any other value is passed through
    ``bytearray`` and converted to a byte string. Whenever the conversion
    fails, ``msg`` is returned unchanged.
    """
    if type(msg) in string_types:
        try:
            return standard_b64decode(msg)
        except Exception:
            # Not valid base64: hand the value back untouched. Catching
            # Exception (not a bare except) lets KeyboardInterrupt and
            # SystemExit propagate.
            return msg

    try:
        # bytes() is str on Python 2 (same result as before) and yields real
        # bytes on Python 3, where str(bytearray(...)) would give the repr.
        return bytes(bytearray(msg))
    except Exception:
        return msg
lightstep_binary_propagator.py 文件源码 项目:lightstep-tracer-python 作者: lightstep 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def extract(self, carrier):
    """Build a SpanContext from a base64-encoded binary carrier.

    :param carrier: must be exactly a ``bytearray`` (subclasses are rejected)
        holding the base64 form of a serialized ``BinaryCarrier``.
    :raises InvalidCarrierException: if ``carrier`` is not a ``bytearray``.
    :return: a ``SpanContext`` carrying the span id, trace id, baggage and
        sampled flag found in the carrier's ``basic_ctx``.
    """
    if type(carrier) is not bytearray:
        raise InvalidCarrierException()

    serialized = standard_b64decode(carrier)
    state = BinaryCarrier()
    state.ParseFromString(bytes(serialized))

    ctx = state.basic_ctx
    # Copy the baggage items into a plain dict.
    baggage = {key: ctx.baggage_items[key] for key in ctx.baggage_items}

    return SpanContext(
        span_id=ctx.span_id,
        trace_id=ctx.trace_id,
        baggage=baggage,
        sampled=ctx.sampled)
smart_contracts.py 文件源码 项目:dragonchain 作者: dragonchain 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _sc_provisioning_helper(self, pl, sc_class, sc_key):
    """
    insert sc code into appropriate dictionary

    The base64-encoded source found at ``pl['smart_contract'][sc_class]`` is
    executed and whatever it binds to the name ``func`` (``None`` if nothing)
    is stored in ``self.sc_container[sc_class][sc_key]``.

    :param pl: transaction payload to extract sc from
    :param sc_class: type of sc being dealt with (e.g. tsc, ssc, etc.)
    :param sc_key: transaction type
    :return: True when the contract was stored; False when the payload has no
        smart contract, the code is empty, or any error occurred (logged).
    """
    try:
        if 'smart_contract' not in pl:
            return False

        sc_impl = pl['smart_contract'][sc_class]
        if not sc_impl:
            logger().warning("No smart contract code provided...")
            return False

        try:
            sc_impl = base64.standard_b64decode(sc_impl)
        except (TypeError, ValueError):
            # Python 2 reports bad base64 as TypeError, Python 3 as
            # binascii.Error (a ValueError subclass).
            raise Exception("The Smart Contract implementation for " + str(sc_key) +
                            " must be base64 encoded.")

        # SECURITY NOTE(review): this executes code taken from the
        # transaction payload -- confirm payloads are trusted or sandboxed.
        #
        # The code runs against an explicit namespace: on Python 3 a bare
        # exec() inside a function cannot rebind the local ``func``, so the
        # stored value would always have been None.
        namespace = {}
        exec(sc_impl, globals(), namespace)
        # store sc function for this txn type (sc_key)
        self.sc_container[sc_class][sc_key] = namespace.get('func')
    except Exception as ex:
        template = "An exception of type {0} occurred. Arguments:\n{1!r}"
        message = template.format(type(ex).__name__, ex.args)
        logger().warning(message)
        return False
    return True
UserContributedManager.py 文件源码 项目:PyDoc 作者: shaun-h 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def __getUserContributed(self):
    """Fetch the online docset listing and return it sorted by name.

    Downloads ``self.jsonServerLocation`` from the download server's URL,
    parses the response text with ``ast.literal_eval`` and builds one
    ``UserContributed`` (status ``'online'``) per entry of its ``'docsets'``
    mapping. Entries without an ``'icon'`` get the icon named 'Other'.

    :return: list of ``UserContributed`` sorted case-insensitively by name.
    """
    server = self.serverManager.getDownloadServer(self.localServer)
    listing_url = server.url
    if listing_url[-1] != '/':
        listing_url += '/'
    listing_url += self.jsonServerLocation

    listing = ast.literal_eval(requests.get(listing_url).text)
    fallback_icon = self.__getIconWithName('Other')

    def build(online_id, info):
        # Translate one listing entry into a UserContributed object.
        item = UserContributed()
        item.name = info['name']
        if 'aliases' in info:
            item.aliases = info['aliases']
        item.version = info['version']
        item.archive = info['archive']
        item.authorName = info['author']['name']
        if 'icon' in info:
            item.image = ui.Image.from_data(base64.standard_b64decode(info['icon']))
            item.imageData = info['icon']
        else:
            item.image = fallback_icon
        item.onlineid = online_id
        item.status = 'online'
        return item

    found = [build(key, entry) for key, entry in listing['docsets'].items()]
    found.sort(key=lambda item: item.name.lower())
    return found
newBitcoinECC.py 文件源码 项目:zeronet-debian 作者: bashrc 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def VerifyMessageFromAddress(self,addr,message,sig):
        """Return True if ``sig`` is a signature of ``message`` by ``addr``.

        ``sig`` is base64 text; after decoding, byte 0 is a header value,
        bytes 1..32 are read as ``r`` and bytes 33..64 as ``s``. A candidate
        point ``Q`` is computed for both roots ``y`` / ``p - y`` of the curve
        equation at ``x = r`` and the address derived from it is compared
        with ``addr``.

        :param addr: address the signature is checked against
        :param message: message that was signed
        :param sig: base64-encoded signature (header byte + r + s)
        :return: True when one candidate key maps to ``addr``; False when the
            header byte is outside 27..34 or no candidate matches.
        """
        #Check a signature (r,s) for the message m signed by the Bitcoin 
        # address "addr".

        sign=base64.standard_b64decode(sig)
        (r,s)=(Byte2Int(sign[1:33]),Byte2Int(sign[33:65]))

        # z: integer form of the double SHA256 hash of MsgMagic(message).
        z=Byte2Int(Hash(Hash(MsgMagic(message),"SHA256"),"SHA256"))        

        # NOTE(review): ord() on sign[0] assumes indexing yields a 1-char
        # string (Python 2 str); on Python 3, indexing bytes gives an int --
        # confirm the targeted Python version.
        val=ord(sign[0])
        if val<27 or val>=35:
            return False

        # Header values 31..34 select the compressed form of the address
        # (uncompressed=False), 27..30 the uncompressed form. ``val`` is not
        # read again after this adjustment; both y candidates are tried below.
        if val>=31:
            uncompressed=False
            val-=4
        else:
            uncompressed=True

        # Right-hand side of the curve equation y^2 = x^3 + a*x + b (mod p).
        x=r
        y2=(pow(x,3,self.p) + self.a*x + self.b) % self.p
        # presumably Cipolla() returns a modular square root of y2 mod p --
        # confirm against its definition.
        y=Cipolla(y2,self.p)

        for _ in range(2):
            # [x,y,1] presumably denotes the point in projective
            # coordinates -- confirm against EllipticCurvePoint.
            kG=EllipticCurvePoint([x,y,1],self.a,self.b,self.p,self.n)  
            mzG=self*((-z)%self.n)
            # Candidate key: (s*kG + (-z mod n)*self) * InvMod(r, n).
            Q=(kG*s+mzG)*InvMod(r,self.n)

            if self.AddressFromPublicKey(Q,uncompressed)==addr:
                return True

            # Second pass uses the other root of y^2.
            y=self.p-y

        return False
MSL.py 文件源码 项目:plugin.video.netflix 作者: asciidisco 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def __decrypt_payload_chunks(self, payloadchunks):
    """Decrypt, decompress and join the payload chunks of a response.

    Every chunk is a JSON document whose ``'payload'`` is the base64 form of
    a JSON encryption envelope (``'iv'``, ``'ciphertext'``). The ciphertext
    is AES-CBC decrypted with ``self.encryption_key``, unpadded (block size
    16) and parsed as JSON; its ``'data'`` is base64-decoded and, when
    ``'compressionalgo'`` is ``'GZIP'``, decompressed.

    :return: the parsed JSON found (base64-encoded) at
        ``[1]['payload']['data']`` of the joined chunk data.
    """
    parse_json = json.JSONDecoder().decode
    pieces = []
    for chunk in payloadchunks:
        envelope_b64 = parse_json(chunk).get('payload')
        encryption_envelope = parse_json(base64.standard_b64decode(envelope_b64))

        # Decrypt the text
        iv = base64.standard_b64decode(encryption_envelope['iv'])
        cipher = AES.new(self.encryption_key, AES.MODE_CBC, iv)
        ciphertext = base64.standard_b64decode(encryption_envelope.get('ciphertext'))
        padded_plaintext = cipher.decrypt(ciphertext)

        # unpad the plaintext
        plaintext = parse_json(Padding.unpad(padded_plaintext, 16))

        # the data is base64 in either case; uncompress it if compressed
        data = base64.standard_b64decode(plaintext.get('data'))
        if plaintext.get('compressionalgo') == 'GZIP':
            data = zlib.decompress(data, 16 + zlib.MAX_WBITS)
        pieces.append(data)

    joined = ''.join(pieces)
    inner_b64 = parse_json(joined)[1]['payload']['data']
    return parse_json(base64.standard_b64decode(inner_b64))
MSL.py 文件源码 项目:plugin.video.netflix 作者: asciidisco 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def __set_master_token(self, master_token):
    """Remember ``master_token`` and the sequence number stored inside it.

    :param master_token: mapping whose ``'tokendata'`` is the base64 form of
        a JSON document; its ``'sequencenumber'`` (``None`` when absent) is
        stored as ``self.sequence_number``.
    """
    self.mastertoken = master_token
    raw_token = master_token['tokendata']
    base_token = base64.standard_b64decode(raw_token)
    # json.loads accepts the bytes that base64 decoding returns on Python 3;
    # JSONDecoder().decode() raises TypeError for bytes input there.
    decoded_token = json.loads(base_token)
    self.sequence_number = decoded_token.get('sequencenumber')
test_base64.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def test_decode_nonascii_str(self):
    """Every decode function must reject a str holding a non-ASCII character."""
    decoder_names = ('b64decode',
                     'standard_b64decode',
                     'urlsafe_b64decode',
                     'b32decode',
                     'b16decode')
    decoders = tuple(getattr(base64, name) for name in decoder_names)
    for decode in decoders:
        self.assertRaises(ValueError, decode, 'with non-ascii \xcb')
payloads.py 文件源码 项目:fluiddb 作者: fluidinfo 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def base64PayloadDecoder(data):
    """Return ``data`` decoded from the standard base64 alphabet."""
    # b64decode() without an alternative alphabet is what
    # standard_b64decode() delegates to.
    decoded = base64.b64decode(data)
    return decoded
utils.py 文件源码 项目:django-oscar-api-checkout 作者: thelabnyc 项目源码 文件源码 阅读 44 收藏 0 点赞 0 评论 0
def _session_unpickle(utfed):
    """Rebuild the object stored as base64-encoded pickle text.

    ``utfed`` is encoded to UTF-8 bytes, base64-decoded and unpickled.
    """
    # NOTE(review): pickle.loads can run arbitrary code while loading --
    # confirm this is only ever fed data the application wrote itself.
    return pickle.loads(base64.standard_b64decode(utfed.encode('utf8')))
utils.py 文件源码 项目:TigerHost 作者: naphatkrit 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def wsse_digest(secret, b64_encoded_nonce, timestamp):
    """Return base64(SHA-256(nonce + timestamp + secret)).

    The nonce is base64-decoded before hashing; ``timestamp`` and ``secret``
    are fed to the hash as given.
    """
    hasher = hashlib.sha256()
    parts = (base64.standard_b64decode(b64_encoded_nonce), timestamp, secret)
    for part in parts:
        hasher.update(part)
    return base64.standard_b64encode(hasher.digest())


问题


面经


文章

微信
公众号

扫码关注公众号