def test_decode_nonascii_str(self):
    """Check that every base64-family decoder rejects a non-ASCII ``str``.

    Each decoder is called with a text string containing the character
    ``\\xcb`` and is expected to raise ``ValueError``.
    """
    decoder_names = (
        'b64decode',
        'standard_b64decode',
        'urlsafe_b64decode',
        'b32decode',
        'b16decode',
        'b85decode',
        'a85decode',
    )
    # Resolve every decoder up front so a missing attribute fails before
    # any assertion runs.
    decoders = [getattr(base64, name) for name in decoder_names]
    for decoder in decoders:
        self.assertRaises(ValueError, decoder, 'with non-ascii \xcb')
python类standard_b64decode()的实例源码
def test_decode_nonascii_str(self):
    """Decoding a ``str`` with a non-ASCII character must raise ValueError.

    The same input is fed to each of the module's decode functions in turn.
    """
    bad_input = 'with non-ascii \xcb'
    decoders = [
        base64.b64decode,
        base64.standard_b64decode,
        base64.urlsafe_b64decode,
        base64.b32decode,
        base64.b16decode,
        base64.b85decode,
        base64.a85decode,
    ]
    for decode in decoders:
        with self.assertRaises(ValueError):
            decode(bad_input)
def decipher(self):
try:
return base64.standard_b64decode(str.encode(self.cipher))
except:
return ''
def _apply(self, value):
    """Decode a Base64 byte string (standard or URL-safe dialect).

    The value is first passed through ``self._filter`` with
    ``Type(binary_type)``; whitespace is then stripped, the alphabet is
    validated against ``self.base64_re``, padding is normalized and the
    result is decoded.

    :param value: the incoming value to decode.
    :return: the decoded bytes; ``None`` if ``self._has_errors`` is set
        after the type filter; otherwise whatever ``self._invalid_value``
        returns when the value is not valid Base64.
    """
    value = self._filter(value, Type(binary_type))  # type: binary_type

    if self._has_errors:
        return None

    # Strip out whitespace.
    # Technically, whitespace is not part of the Base64 alphabet,
    # but virtually every implementation allows it.
    value = self.whitespace_re.sub(b'', value)

    # Check for invalid characters.
    # Note that Python 3's b64decode does this for us, but we also
    # have to support Python 2.
    # https://docs.python.org/3/library/base64.html#base64.b64decode
    if not self.base64_re.match(value):
        return self._invalid_value(
            value=value,
            reason=self.CODE_INVALID,
        )

    # Check to see if we are working with a URL-safe dialect.
    # https://en.wikipedia.org/wiki/Base64#URL_applications
    if (b'_' in value) or (b'-' in value):
        # You can't mix dialects, silly!
        if (b'+' in value) or (b'/' in value):
            return self._invalid_value(
                value=value,
                reason=self.CODE_INVALID,
            )

        url_safe = True
    else:
        url_safe = False

    # Normalize padding.
    # http://stackoverflow.com/a/9807138/
    value = value.rstrip(b'=')
    value += (b'=' * (4 - (len(value) % 4)))

    try:
        return (
            urlsafe_b64decode(value)
            if url_safe
            else standard_b64decode(value)
        )
    except (TypeError, ValueError):
        # Python 2 reports malformed input as TypeError; Python 3 raises
        # binascii.Error, which is a ValueError subclass.  Catch both so
        # bad input is reported as an invalid value on either version.
        return self._invalid_value(value, self.CODE_INVALID, exc_info=True)
# noinspection SpellCheckingInspection
def __perform_key_handshake(self):
    """Send the key-handshake request and hand the reply to the key parser.

    Builds a request from the generated MSL header and the ESN reported by
    ``self.kodi_helper``, POSTs it to ``self.endpoints['manifest']`` and,
    on an HTTP 200 reply without ``errordata``, passes the decoded
    ``headerdata`` to ``self.__parse_crypto_keys``.

    :return: ``False`` when the reply contains ``errordata``; ``None`` in
        every other case (including a failed POST or non-200 status, which
        are only logged).
    """
    header = self.__generate_msl_header(
        is_key_request=True,
        is_handshake=True,
        compressionalgo='',
        encrypt=False)
    esn = self.kodi_helper.get_esn()
    request = {
        'entityauthdata': {
            'scheme': 'NONE',
            'authdata': {
                'identity': esn
            }
        },
        'headerdata': base64.standard_b64encode(header),
        'signature': '',
    }
    self.kodi_helper.log(msg='Key Handshake Request:')
    self.kodi_helper.log(msg=json.dumps(request))

    try:
        resp = self.session.post(
            url=self.endpoints['manifest'],
            data=json.dumps(request, sort_keys=True))
    except Exception:
        # Best effort: a failed POST is logged and treated as "no reply".
        resp = None
        exc = sys.exc_info()
        self.kodi_helper.log(
            msg='[MSL][POST] Error {} {}'.format(exc[0], exc[1]))

    if resp and resp.status_code == 200:
        resp = resp.json()
        if 'errordata' in resp:
            self.kodi_helper.log(msg='Key Exchange failed')
            self.kodi_helper.log(
                msg=base64.standard_b64decode(resp['errordata']))
            return False
        base_head = base64.standard_b64decode(resp['headerdata'])
        self.__parse_crypto_keys(
            headerdata=json.JSONDecoder().decode(base_head))
    else:
        self.kodi_helper.log(msg='Key Exchange failed')
        # resp is None when the POST itself raised; there is no body to
        # log in that case (the error was already logged above).
        if resp is not None:
            self.kodi_helper.log(msg=resp.text)
def addcrypted2():
    """Handle a request carrying AES-encrypted links and add them as a package.

    Reads ``source`` (optional package name), ``crypted`` (Base64 payload)
    and ``jk`` (JavaScript yielding the hex key) from ``request.forms``,
    recovers the key, decrypts the payload with AES-CBC (the key doubles
    as the IV) and hands the resulting non-empty lines to ``PYLOAD``.

    :return: ``"success\\r\\n"`` when the package was added, ``"failed"``
        when the key could not be recovered, ``"failed can't add"`` when
        ``PYLOAD`` raised while adding.
    """
    package = request.forms.get("source", None)
    crypted = request.forms["crypted"]
    jk = request.forms["jk"]

    crypted = standard_b64decode(unquote(crypted.replace(" ", "+")))
    if JS:
        # SECURITY NOTE(review): this evaluates request-supplied JavaScript
        # in the JS engine; confirm the endpoint is only reachable by
        # trusted clients.
        jk = "%s f()" % jk
        jk = JS.eval(jk)
    else:
        returned_literal = re.findall(r"return ('|\")(.+)('|\")", jk)
        if returned_literal:
            jk = returned_literal[0][1]
        elif jk.find("dec") > -1 and jk.find("org") > -1:
            ## Test for some known js functions to decode
            org_literal = re.findall(r"var org = ('|\")([^\"']+)", jk)
            if org_literal:
                # The known obfuscation stores the key reversed.
                jk = org_literal[0][1][::-1]
            # No match: jk is left as-is and rejected by unhexlify below
            # instead of raising an unhandled IndexError.
        else:
            print("Could not decrypt key, please install py-spidermonkey or ossp-js")

    try:
        Key = unhexlify(jk)
    except (TypeError, ValueError):
        # Python 2 raises TypeError for bad hex; Python 3 raises
        # binascii.Error (a ValueError subclass).
        print("Could not decrypt key, please install py-spidermonkey or ossp-js")
        return "failed"

    IV = Key

    obj = AES.new(Key, AES.MODE_CBC, IV)
    result = obj.decrypt(crypted).replace("\x00", "").replace("\r", "").split("\n")

    result = [line for line in result if line != ""]

    try:
        if package:
            PYLOAD.addPackage(package, result, 0)
        else:
            PYLOAD.generateAndAddPackages(result, 0)
    except Exception:
        return "failed can't add"
    else:
        return "success\r\n"
def write_package_in_zip(zip_file, path, package):
    """Write packages files in the zip file

    The ``package`` dictionary passed in is not modified, including its
    nested ``resource`` dictionary.

    :param zip_file: zip file where the files will get created
    :type zip_file: zipfile.ZipFile
    :param path: path for the package directory. E.g.
                 universe/repo/packages/M/marathon/0
    :type path: pathlib.Path
    :param package: package information dictionary
    :type package: dict
    :rtype: None
    """
    package = package.copy()
    package.pop('releaseVersion')
    package.pop('minDcosReleaseVersion', None)
    package['packagingVersion'] = "2.0"

    resource = package.pop('resource', None)
    if resource:
        # ``package.copy()`` is shallow: copy the nested dict before
        # popping from it so the caller's data keeps its 'cli' entry.
        resource = resource.copy()
        cli = resource.pop('cli', None)
        if cli and 'command' not in package:
            print(('WARNING: Removing binary CLI from ({}, {}) without a '
                   'Python CLI').format(package['name'], package['version']))
        zip_file.writestr(
            str(path / 'resource.json'),
            json.dumps(resource))

    marathon_template = package.pop(
        'marathon',
        {}
    ).get(
        'v2AppMustacheTemplate'
    )
    if marathon_template:
        # The template is stored Base64-encoded; write the decoded bytes.
        zip_file.writestr(
            str(path / 'marathon.json.mustache'),
            base64.standard_b64decode(marathon_template))

    config = package.pop('config', None)
    if config:
        zip_file.writestr(
            str(path / 'config.json'),
            json.dumps(config))

    command = package.pop('command', None)
    if command:
        zip_file.writestr(
            str(path / 'command.json'),
            json.dumps(command))

    # Whatever is left after the pops above becomes package.json.
    zip_file.writestr(
        str(path / 'package.json'),
        json.dumps(package))
encryption_util.py 文件源码
项目:snowflake-connector-python
作者: snowflakedb
项目源码
文件源码
阅读 44
收藏 0
点赞 0
评论 0
def encrypt_file(encryption_material, in_filename,
                 chunk_size=AES.block_size * 4 * 1024, tmp_dir=None):
    """
    Encrypts a file into a newly created temporary file.

    A random file key and IV are generated; the file is encrypted with
    AES-CBC under the file key, and the file key itself is encrypted with
    AES-ECB under the Base64-decoded ``query_stage_master_key``.

    :param encryption_material: encryption material (provides
        ``query_stage_master_key``, ``smk_id`` and ``query_id``)
    :param in_filename: input file name
    :param chunk_size: read chunk size
    :param tmp_dir: temporary directory, optional
    :return: a tuple of (encryption metadata, encrypted temp file name)
    """
    logger = getLogger(__name__)
    block_size = AES.block_size

    master_key = base64.standard_b64decode(
        encryption_material.query_stage_master_key)
    key_size = len(master_key)
    logger.debug(u'key_size = %s', key_size)

    # Generate key for data encryption
    iv_data = SnowflakeEncryptionUtil.get_secure_random(block_size)
    file_key = SnowflakeEncryptionUtil.get_secure_random(key_size)
    data_cipher = AES.new(key=file_key, mode=AES.MODE_CBC, IV=iv_data)

    temp_output_fd, temp_output_file = tempfile.mkstemp(
        text=False, dir=tmp_dir,
        prefix=os.path.basename(in_filename) + "#")
    logger.debug(u'unencrypted file: %s, temp file: %s, tmp_dir: %s',
                 in_filename, temp_output_file, tmp_dir)

    padded = False
    with open(in_filename, u'rb') as infile, \
            os.fdopen(temp_output_fd, u'wb') as outfile:
        # Read until the first empty chunk (end of file).
        for chunk in iter(lambda: infile.read(chunk_size), b''):
            if len(chunk) % block_size != 0:
                chunk = PKCS5_PAD(chunk, block_size)
                padded = True
            outfile.write(data_cipher.encrypt(chunk))
        if not padded:
            # No chunk needed padding, so append one whole block of
            # padding bytes.
            full_pad_block = block_size * chr(block_size).encode(UTF8)
            outfile.write(data_cipher.encrypt(full_pad_block))

    # encrypt key with QRMK
    key_cipher = AES.new(key=master_key, mode=AES.MODE_ECB)
    enc_kek = key_cipher.encrypt(PKCS5_PAD(file_key, block_size))

    mat_desc = MaterialDescriptor(
        smk_id=encryption_material.smk_id,
        query_id=encryption_material.query_id,
        key_size=key_size * 8)
    metadata = EncryptionMetadata(
        key=base64.b64encode(enc_kek).decode('utf-8'),
        iv=base64.b64encode(iv_data).decode('utf-8'),
        matdesc=matdesc_to_unicode(mat_desc),
    )
    return (metadata, temp_output_file)
encryption_util.py 文件源码
项目:snowflake-connector-python
作者: snowflakedb
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def decrypt_file(metadata, encryption_material, in_filename,
                 chunk_size=AES.block_size * 4 * 1024, tmp_dir=None):
    """
    Decrypts a file and stores the output in a temporary file.

    The file key in ``metadata.key`` is decrypted with AES-ECB under the
    Base64-decoded ``query_stage_master_key``; the file is then decrypted
    with AES-CBC using that key and the IV in ``metadata.iv``, and the
    output is truncated by the amount ``PKCS5_OFFSET`` reports for the
    last decrypted chunk.

    :param metadata: metadata input (provides Base64 ``key`` and ``iv``)
    :param encryption_material: encryption material
    :param in_filename: input file name
    :param chunk_size: read chunk size
    :param tmp_dir: temporary directory, optional
    :return: a decrypted file name
    """
    logger = getLogger(__name__)

    encoded_key, encoded_iv = metadata.key, metadata.iv
    master_key = base64.standard_b64decode(
        encryption_material.query_stage_master_key)
    wrapped_key = base64.standard_b64decode(encoded_key)
    iv_bytes = base64.standard_b64decode(encoded_iv)

    key_cipher = AES.new(key=master_key, mode=AES.MODE_ECB)
    file_key = PKCS5_UNPAD(key_cipher.decrypt(wrapped_key))
    data_cipher = AES.new(key=file_key, mode=AES.MODE_CBC, IV=iv_bytes)

    temp_output_fd, temp_output_file = tempfile.mkstemp(
        text=False, dir=tmp_dir,
        prefix=os.path.basename(in_filename) + "#")
    logger.info(u'encrypted file: %s, tmp file: %s',
                in_filename, temp_output_file)

    output_size = 0
    last_plain = None
    with open(in_filename, u'rb') as infile, \
            os.fdopen(temp_output_fd, u'wb') as outfile:
        # Read until the first empty chunk (end of file).
        for chunk in iter(lambda: infile.read(chunk_size), b''):
            output_size += len(chunk)
            last_plain = data_cipher.decrypt(chunk)
            outfile.write(last_plain)
        if last_plain is not None:
            # Drop the trailing padding reported for the final chunk.
            output_size -= PKCS5_OFFSET(last_plain)
        outfile.truncate(output_size)
    return temp_output_file
def _unpack_stub_universe_json(self, scratchdir, stub_universe_file):
    """Unpack a single-package stub universe JSON into per-file form.

    Creates ``<scratchdir>/stub-universe-<pkg name>`` and writes
    ``command.json``, ``config.json`` and ``resource.json`` (for whichever
    sections are present), ``marathon.json.mustache`` (Base64-decoded
    template, if present) and ``package.json`` (the remaining fields,
    minus ``releaseVersion``).

    :param scratchdir: directory under which the package dir is created
    :param stub_universe_file: file-like object whose ``read()`` returns
        the UTF-8 encoded stub universe JSON
    :return: path of the created package directory
    :raises Exception: if the JSON has no ``packages`` key or does not
        contain exactly one package
    """
    raw_text = stub_universe_file.read().decode('utf-8')
    universe = json.loads(raw_text, object_pairs_hook=collections.OrderedDict)

    # put package files into a subdir of scratchdir: avoids conflicts with reuse of scratchdir elsewhere
    pkgdir = os.path.join(scratchdir, 'stub-universe-{}'.format(self._pkg_name))
    os.makedirs(pkgdir)

    if 'packages' not in universe:
        raise Exception('Expected "packages" key in root of stub universe JSON: {}'.format(
            self._stub_universe_url))
    if len(universe['packages']) != 1:
        raise Exception('Expected single "packages" entry in stub universe JSON: {}'.format(
            self._stub_universe_url))

    # note: sections are removed from package_json as they are unpacked,
    # because package_json itself is written to a file at the end
    package_json = universe['packages'][0]

    for section in ('command', 'config', 'resource'):
        section_body = package_json.get(section)
        if section_body is None:
            continue
        del package_json[section]
        with open(os.path.join(pkgdir, section + '.json'), 'w') as section_file:
            text = json.dumps(section_body, indent=2)
            # ensure that the file has a trailing newline (json.dumps() doesn't add one)
            if not text.endswith('\n'):
                text += '\n'
            section_file.write(text)

    template_b64 = package_json.get('marathon', {}).get('v2AppMustacheTemplate')
    if template_b64 is not None:
        del package_json['marathon']
        with open(os.path.join(pkgdir, 'marathon.json.mustache'), 'w') as marathon_file:
            marathon_file.write(base64.standard_b64decode(template_b64).decode())

    package_json.pop('releaseVersion', None)

    with open(os.path.join(pkgdir, 'package.json'), 'w') as package_file:
        json.dump(package_json, package_file, indent=2)

    return pkgdir