python类standard_b64decode()的实例源码

test_base64.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_decode_nonascii_str(self):
        decode_funcs = (base64.b64decode,
                        base64.standard_b64decode,
                        base64.urlsafe_b64decode,
                        base64.b32decode,
                        base64.b16decode,
                        base64.b85decode,
                        base64.a85decode)
        for f in decode_funcs:
            self.assertRaises(ValueError, f, 'with non-ascii \xcb')
test_base64.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_decode_nonascii_str(self):
        decode_funcs = (base64.b64decode,
                        base64.standard_b64decode,
                        base64.urlsafe_b64decode,
                        base64.b32decode,
                        base64.b16decode,
                        base64.b85decode,
                        base64.a85decode)
        for f in decode_funcs:
            self.assertRaises(ValueError, f, 'with non-ascii \xcb')
Base64.py 文件源码 项目:whatcripto 作者: vandalvnl 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def decipher(self):
        """Decode ``self.cipher`` from standard Base64.

        Returns:
            The decoded ``bytes`` on success, or the empty string ``''`` when
            ``self.cipher`` cannot be encoded to bytes or is not valid
            Base64.
        """
        try:
            return base64.standard_b64decode(str.encode(self.cipher))
        except Exception:
            # Deliberate best-effort: any decoding problem yields ''.
            # ``Exception`` (rather than a bare ``except:``) lets
            # KeyboardInterrupt and SystemExit propagate to the caller.
            return ''
string.py 文件源码 项目:filters 作者: eflglobal 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _apply(self, value):
        """Decode a Base64 byte string in the standard or URL-safe dialect.

        Whitespace is stripped, the alphabet is validated, padding is
        normalized, and the value is then decoded with the function that
        matches its dialect.

        :param value: the incoming value; it is first run through a
            ``Type(binary_type)`` filter.
        :return: the decoded bytes; ``None`` if the type filter recorded
            errors; otherwise the result of ``self._invalid_value(...)`` when
            the value is not valid Base64.
        """
        value = self._filter(value, Type(binary_type)) # type: binary_type

        if self._has_errors:
            return None

        # Strip out whitespace.
        # Technically, whitespace is not part of the Base64 alphabet,
        # but virtually every implementation allows it.
        value = self.whitespace_re.sub(b'', value)

        # Check for invalid characters.
        # Note that Python 3's b64decode does this for us, but we also
        # have to support Python 2.
        # https://docs.python.org/3/library/base64.html#base64.b64decode
        if not self.base64_re.match(value):
            return self._invalid_value(
                value   = value,
                reason  = self.CODE_INVALID,
            )

        # Check to see if we are working with a URL-safe dialect.
        # https://en.wikipedia.org/wiki/Base64#URL_applications
        if (b'_' in value) or (b'-' in value):
            # You can't mix dialects, silly!
            if (b'+' in value) or (b'/' in value):
                return self._invalid_value(
                    value   = value,
                    reason  = self.CODE_INVALID,
                )

            url_safe = True
        else:
            url_safe = False

        # Normalize padding.
        # http://stackoverflow.com/a/9807138/
        # ``-len % 4`` yields 0 for an already aligned value, so no
        # redundant '====' is appended in that case.
        value = value.rstrip(b'=')
        value += (b'=' * (-len(value) % 4))

        try:
            return (
                urlsafe_b64decode(value)
                    if url_safe
                    else standard_b64decode(value)
            )
        except (TypeError, ValueError):
            # Python 2 reports a decoding failure as TypeError; Python 3
            # raises binascii.Error, which is a ValueError subclass there.
            return self._invalid_value(value, self.CODE_INVALID, exc_info=True)


# noinspection SpellCheckingInspection
MSL.py 文件源码 项目:plugin.video.netflix 作者: asciidisco 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __perform_key_handshake(self):
        """Send a key handshake request and hand the reply to the key parser.

        Builds a request around a freshly generated header, POSTs it as JSON
        to ``self.endpoints['manifest']`` and, on an HTTP 200 reply without
        ``errordata``, passes the decoded ``headerdata`` to
        ``self.__parse_crypto_keys``.

        Returns:
            ``False`` when the reply carries ``errordata``; ``None``
            otherwise (both on success and when the request itself failed).
        """
        header = self.__generate_msl_header(
            is_key_request=True,
            is_handshake=True,
            compressionalgo='',
            encrypt=False)
        esn = self.kodi_helper.get_esn()

        request = {
            'entityauthdata': {
                'scheme': 'NONE',
                'authdata': {
                    'identity': esn
                }
            },
            'headerdata': base64.standard_b64encode(header),
            'signature': '',
        }
        self.kodi_helper.log(msg='Key Handshake Request:')
        self.kodi_helper.log(msg=json.dumps(request))

        try:
            resp = self.session.post(
                url=self.endpoints['manifest'],
                data=json.dumps(request, sort_keys=True))
        except Exception:
            # Best-effort: a failed POST is logged and treated as "no reply".
            resp = None
            exc = sys.exc_info()
            self.kodi_helper.log(
                msg='[MSL][POST] Error {} {}'.format(exc[0], exc[1]))

        if resp and resp.status_code == 200:
            resp = resp.json()
            if 'errordata' in resp:
                self.kodi_helper.log(msg='Key Exchange failed')
                self.kodi_helper.log(
                    msg=base64.standard_b64decode(resp['errordata']))
                return False
            base_head = base64.standard_b64decode(resp['headerdata'])
            self.__parse_crypto_keys(
                headerdata=json.JSONDecoder().decode(base_head))
        else:
            self.kodi_helper.log(msg='Key Exchange failed')
            # ``resp`` is None when the POST raised; there is no body to log
            # then, and touching ``resp.text`` would raise AttributeError.
            if resp is not None:
                self.kodi_helper.log(msg=resp.text)
cnl_app.py 文件源码 项目:download-manager 作者: thispc 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def addcrypted2():
    """Decrypt a posted list of links and add it as a package.

    Reads three form fields from ``request.forms``: ``source`` (optional
    package name), ``crypted`` (Base64 text of the AES-CBC ciphertext) and
    ``jk`` (a JavaScript snippet that yields the hex-encoded key).  The key
    is obtained by evaluating ``jk`` with the ``JS`` engine when one is
    available, otherwise by pattern-matching a few known snippet shapes.
    The key doubles as the IV.

    Returns:
        ``"failed"`` when no usable key could be derived,
        ``"failed can't add"`` when adding the package raised, and the text
        ``success`` followed by CR LF otherwise.
    """
    no_key_msg = "Could not decrypt key, please install py-spidermonkey or ossp-js"

    package = request.forms.get("source", None)
    crypted = request.forms["crypted"]
    jk = request.forms["jk"]

    # A '+' in the posted value may arrive as a space; restore it before
    # unquoting and decoding.
    crypted = standard_b64decode(unquote(crypted.replace(" ", "+")))
    if JS:
        # NOTE(security): ``jk`` comes straight from the request and is
        # executed by the JS engine -- review whether that is acceptable.
        jk = "%s f()" % jk
        jk = JS.eval(jk)

    else:
        try:
            jk = re.findall(r"return ('|\")(.+)('|\")", jk)[0][1]
        except Exception:
            # Test for some known js functions to decode
            org_matches = []
            if jk.find("dec") > -1 and jk.find("org") > -1:
                org_matches = re.findall(r"var org = ('|\")([^\"']+)", jk)

            if org_matches:
                # The key is stored reversed in the ``org`` variable.
                jk = org_matches[0][1][::-1]
            else:
                # ``jk`` is left untouched; unhexlify below rejects it.
                print(no_key_msg)

    try:
        Key = unhexlify(jk)
    except Exception:
        print(no_key_msg)
        return "failed"

    IV = Key

    obj = AES.new(Key, AES.MODE_CBC, IV)
    decrypted = obj.decrypt(crypted).replace("\x00", "").replace("\r","")

    # One link per line; drop the empty lines.
    result = [line for line in decrypted.split("\n") if line != ""]

    try:
        if package:
            PYLOAD.addPackage(package, result, 0)
        else:
            PYLOAD.generateAndAddPackages(result, 0)
    except Exception:
        return "failed can't add"
    else:
        return "success\r\n"
gen-universe.py 文件源码 项目:linkerUniverse 作者: LinkerNetworks 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def write_package_in_zip(zip_file, path, package):
    """Write packages files in the zip file

    The ``package`` argument is never modified: the function works on a
    copy of the dictionary and of its ``resource`` entry.

    :param zip_file: zip file where the files will get created
    :type zip_file: zipfile.ZipFile
    :param path: path for the package directory. E.g.
                 universe/repo/packages/M/marathon/0
    :type path: pathlib.Path
    :param package: package information dictionary
    :type package: dict
    :rtype: None
    :raises KeyError: if ``package`` has no ``releaseVersion`` entry
    """

    package = package.copy()
    package.pop('releaseVersion')
    package.pop('minDcosReleaseVersion', None)
    package['packagingVersion'] = "2.0"

    resource = package.pop('resource', None)
    if resource:
        # ``package.copy()`` above is shallow, so ``resource`` is still the
        # caller's object; copy it before stripping the CLI entry.
        resource = resource.copy()
        cli = resource.pop('cli', None)
        if cli and 'command' not in package:
            print(('WARNING: Removing binary CLI from ({}, {}) without a '
                  'Python CLI').format(package['name'], package['version']))
        zip_file.writestr(
            str(path / 'resource.json'),
            json.dumps(resource))

    marathon_template = package.pop(
        'marathon',
        {}
    ).get(
        'v2AppMustacheTemplate'
    )
    if marathon_template:
        # The template is stored Base64-encoded; the archive gets the
        # decoded bytes.
        zip_file.writestr(
            str(path / 'marathon.json.mustache'),
            base64.standard_b64decode(marathon_template))

    # Optional sections that each become their own JSON file.
    for section in ('config', 'command'):
        content = package.pop(section, None)
        if content:
            zip_file.writestr(
                str(path / (section + '.json')),
                json.dumps(content))

    # Whatever is left of the package description goes into package.json.
    zip_file.writestr(
        str(path / 'package.json'),
        json.dumps(package))
encryption_util.py 文件源码 项目:snowflake-connector-python 作者: snowflakedb 项目源码 文件源码 阅读 44 收藏 0 点赞 0 评论 0
def encrypt_file(encryption_material, in_filename,
                     chunk_size=AES.block_size * 4 * 1024, tmp_dir=None):
        """
        Encrypts a file into a newly created temporary file.

        The content is encrypted with AES in CBC mode under a freshly
        generated random file key and IV.  The file key itself is then
        encrypted with AES in ECB mode under the Base64-decoded
        ``query_stage_master_key`` of the encryption material.

        :param encryption_material: encryption material; its
            ``query_stage_master_key``, ``smk_id`` and ``query_id`` are read
        :param in_filename: input file name
        :param chunk_size: read chunk size
        :param tmp_dir: directory for the temporary file, optional
        :return: a tuple of (encryption metadata, encrypted file name)
        """
        log = getLogger(__name__)
        master_key = base64.standard_b64decode(
            encryption_material.query_stage_master_key)
        key_size = len(master_key)
        log.debug(u'key_size = %s', key_size)

        # Random IV and file key; the file key is as long as the master key.
        iv_data = SnowflakeEncryptionUtil.get_secure_random(AES.block_size)
        file_key = SnowflakeEncryptionUtil.get_secure_random(key_size)
        data_cipher = AES.new(key=file_key, mode=AES.MODE_CBC, IV=iv_data)

        out_fd, out_filename = tempfile.mkstemp(
            prefix=os.path.basename(in_filename) + "#",
            dir=tmp_dir,
            text=False)
        log.debug(u'unencrypted file: %s, temp file: %s, tmp_dir: %s',
                  in_filename, out_filename, tmp_dir)

        needs_padding_block = True
        with open(in_filename, u'rb') as source, \
                os.fdopen(out_fd, u'wb') as sink:
            # Read until the file is exhausted (an empty read).
            for block in iter(lambda: source.read(chunk_size), b''):
                if len(block) % AES.block_size != 0:
                    # A chunk that is not block-aligned gets padded here.
                    block = PKCS5_PAD(block, AES.block_size)
                    needs_padding_block = False
                sink.write(data_cipher.encrypt(block))
            if needs_padding_block:
                # No chunk was padded: append one whole block of padding
                # bytes, each carrying the block size as its value.
                sink.write(data_cipher.encrypt(
                    AES.block_size * chr(AES.block_size).encode(UTF8)))

        # encrypt the file key with the master key
        wrapped_file_key = AES.new(
            key=master_key, mode=AES.MODE_ECB
        ).encrypt(PKCS5_PAD(file_key, AES.block_size))

        descriptor = MaterialDescriptor(
            smk_id=encryption_material.smk_id,
            query_id=encryption_material.query_id,
            key_size=key_size * 8)
        metadata = EncryptionMetadata(
            key=base64.b64encode(wrapped_file_key).decode('utf-8'),
            iv=base64.b64encode(iv_data).decode('utf-8'),
            matdesc=matdesc_to_unicode(descriptor),
        )
        return (metadata, out_filename)
encryption_util.py 文件源码 项目:snowflake-connector-python 作者: snowflakedb 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def decrypt_file(metadata, encryption_material, in_filename,
                     chunk_size=AES.block_size * 4 * 1024, tmp_dir=None):
        """
        Decrypts a file into a newly created temporary file.

        The file key stored in the metadata is first decrypted with AES in
        ECB mode under the Base64-decoded ``query_stage_master_key``; the
        content is then decrypted with AES in CBC mode using that file key
        and the IV from the metadata.

        :param metadata: metadata input; its ``key`` and ``iv`` are read
        :param encryption_material: encryption material
        :param in_filename: input file name
        :param chunk_size: read chunk size
        :param tmp_dir: temporary directory, optional
        :return: a decrypted file name
        """
        log = getLogger(__name__)
        wrapped_key_b64 = metadata.key
        iv_b64 = metadata.iv
        master_key = base64.standard_b64decode(
            encryption_material.query_stage_master_key)
        wrapped_key = base64.standard_b64decode(wrapped_key_b64)
        iv_bytes = base64.standard_b64decode(iv_b64)

        # Recover the file key, then set up the cipher for the content.
        file_key = PKCS5_UNPAD(
            AES.new(key=master_key, mode=AES.MODE_ECB).decrypt(wrapped_key))
        data_cipher = AES.new(key=file_key, mode=AES.MODE_CBC, IV=iv_bytes)

        out_fd, out_filename = tempfile.mkstemp(
            prefix=os.path.basename(in_filename) + "#",
            dir=tmp_dir,
            text=False)
        log.info(u'encrypted file: %s, tmp file: %s',
                 in_filename, out_filename)

        bytes_seen = 0
        last_plain = None
        with open(in_filename, u'rb') as source, \
                os.fdopen(out_fd, u'wb') as sink:
            # Read until the file is exhausted (an empty read).
            for block in iter(lambda: source.read(chunk_size), b''):
                bytes_seen += len(block)
                last_plain = data_cipher.decrypt(block)
                sink.write(last_plain)
            if last_plain is not None:
                # Shrink the size by the offset computed from the last
                # decrypted chunk, then cut the output to that length.
                bytes_seen -= PKCS5_OFFSET(last_plain)
            sink.truncate(bytes_seen)
        return out_filename
release_builder.py 文件源码 项目:dcos-rabbitmq 作者: sheepkiller 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _unpack_stub_universe_json(self, scratchdir, stub_universe_file):
        """Unpack a single-package stub universe JSON into individual files.

        Creates ``<scratchdir>/stub-universe-<pkg name>`` and writes into it
        ``command.json``, ``config.json`` and ``resource.json`` (each only
        when present), ``marathon.json.mustache`` (the Base64-decoded
        template, when present) and ``package.json`` (everything left over,
        minus ``releaseVersion``).

        :param scratchdir: existing directory to create the package dir in
        :param stub_universe_file: binary file-like object holding the
            UTF-8 encoded stub universe JSON
        :return: path of the created package directory
        :raises Exception: if the JSON has no ``packages`` key or does not
            hold exactly one package; nothing is created on disk then
        """
        stub_universe_json = json.loads(
            stub_universe_file.read().decode('utf-8'),
            object_pairs_hook=collections.OrderedDict)

        # Validate before touching the filesystem, so that invalid input
        # does not leave an empty package directory behind (which would make
        # a later retry fail in makedirs).
        if 'packages' not in stub_universe_json:
            raise Exception('Expected "packages" key in root of stub universe JSON: {}'.format(
                self._stub_universe_url))
        if len(stub_universe_json['packages']) != 1:
            raise Exception('Expected single "packages" entry in stub universe JSON: {}'.format(
                self._stub_universe_url))

        # put package files into a subdir of scratchdir: avoids conflicts with reuse of scratchdir elsewhere
        pkgdir = os.path.join(scratchdir, 'stub-universe-{}'.format(self._pkg_name))
        os.makedirs(pkgdir)

        # note: we delete elements from package_json as they're unpacked, as package_json is itself written to a file
        package_json = stub_universe_json['packages'][0]

        def extract_json_file(package_dict, name):
            """Move ``package_dict[name]`` into ``<pkgdir>/<name>.json``."""
            file_dict = package_dict.get(name)
            if file_dict is None:
                return
            del package_dict[name]
            # ensure that the file has a trailing newline (json.dump() doesn't!)
            with open(os.path.join(pkgdir, name + '.json'), 'w') as fileref:
                content = json.dumps(file_dict, indent=2)
                fileref.write(content)
                if not content.endswith('\n'):
                    fileref.write('\n')

        for section in ('command', 'config', 'resource'):
            extract_json_file(package_json, section)

        marathon_json = package_json.get('marathon', {}).get('v2AppMustacheTemplate')
        if marathon_json is not None:
            del package_json['marathon']
            with open(os.path.join(pkgdir, 'marathon.json.mustache'), 'w') as marathon_file:
                marathon_file.write(base64.standard_b64decode(marathon_json).decode())

        if 'releaseVersion' in package_json:
            del package_json['releaseVersion']

        with open(os.path.join(pkgdir, 'package.json'), 'w') as package_file:
            json.dump(package_json, package_file, indent=2)

        return pkgdir


问题


面经


文章

微信
公众号

扫码关注公众号