def test_decrypt_different_key_set(self):
    """
    Verify that a record cannot be read once the fernet key set is replaced.

    The overridden key set does not contain the key the value was encrypted with, so fetching the
    record is expected to raise InvalidToken.
    """
    original_keys = ['test-ferent-key']
    self.assertEqual(settings.FERNET_KEYS, original_keys)
    replacement_keys = ['new-fernet-key']

    # Drop the cached fernet properties so that the latest keys are used.
    invalidate_fernet_cached_properties(TranscriptCredentials, ['api_key', 'api_secret'])

    with override_settings(FERNET_KEYS=replacement_keys):
        self.assertEqual(settings.FERNET_KEYS, replacement_keys)
        with self.assertRaises(InvalidToken):
            credentials = self.credentials_data
            TranscriptCredentials.objects.get(
                org=credentials['org'],
                provider=credentials['provider'],
            )
python类InvalidToken()的实例源码
re_encrypt_transcript_credentials.py 文件源码
项目:edx-video-pipeline
作者: edx
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def handle(self, *args, **options):
    """
    Re-save every transcript credentials record so its fernet fields are re-encrypted.

    Logs when the process starts and completes. If a record cannot be decrypted (InvalidToken),
    the exception is logged and the process stops.
    """
    LOGGER.info('[Transcript credentials re-encryption] Process started.')

    # Drop the cached fernet properties so that the latest keys are used.
    invalidate_fernet_cached_properties(TranscriptCredentials, ['api_key', 'api_secret'])

    try:
        with transaction.atomic():
            # Saving a credentials record is what lets re-encryption be performed on its fernet fields.
            for record in TranscriptCredentials.objects.all():
                record.save()
    except InvalidToken:
        LOGGER.exception(
            '[Transcript credentials re-encryption] No valid fernet key present to decrypt. Process halted.'
        )
    else:
        LOGGER.info('[Transcript credentials re-encryption] Process completed.')
def get_credential(self, hostname):
    """
    Return the stored credentials for a particular hostname.

    The hostname is first normalised with ``cut_hostname``. When ``self.KEY`` is set, the stored
    password (without its first two characters) is decrypted with Fernet; otherwise the stored
    password is returned as-is.

    :param hostname: hostname
    :type hostname: str
    :return: ``(username, password)`` tuple, or None when a lookup raises KeyError
    :raises ContainerException: if decryption fails with InvalidToken
    """
    hostname = self.cut_hostname(hostname)
    try:
        if not self.KEY:
            # no key set: return plain information
            username = self.CREDENTIALS[hostname]["username"]
            password = self.CREDENTIALS[hostname]["password"]
            return (username, password)
        crypto = Fernet(self.KEY)
        username = self.CREDENTIALS[hostname]["username"]
        # skip the two leading characters of the stored value before decrypting
        token = self.CREDENTIALS[hostname]["password"][2:].encode()
        return (username, crypto.decrypt(token))
    except InvalidToken:
        raise ContainerException("Invalid password specified!")
    except KeyError:
        # missing entry: deliberately report nothing
        return None
def test__assures_data_integrity(self):
    """A token with one randomly flipped bit must fail to decrypt with InvalidToken."""
    self.write_secret()
    plaintext = factory.make_bytes(size=10)
    token = fernet_encrypt_psk(plaintext)
    corrupted = bytearray(token)
    # Flip a single bit so we can ensure a corrupted token won't decrypt.
    # Subtract 4 to stay away from the end of the token; that portion is
    # just padding, and isn't covered by the HMAC.
    index = randint(0, len(corrupted) - 4)
    mask = 1 << randint(0, 7)
    corrupted[index] ^= mask
    # Record exactly what was flipped so a failure can be reproduced.
    test_description = "token=%s; token[%d] ^= 0x%02x" % (
        token.decode("utf-8"), index, mask)
    with ExpectedException(InvalidToken, msg=test_description):
        fernet_decrypt_psk(bytes(corrupted))
def parse_config_file(
    cipher_suite, config_file="./config/foxha_config.ini"
):
    """Read the repository settings from ``config_file`` and decrypt the password.

    Returns a ``(host, port, database, user, decrypted_password)`` tuple.
    Prints an error and exits with status 99 when the config file lacks a
    section or option, or when the password token is invalid; exits with
    status 3 on any other error raised while decrypting.
    """
    try:
        config_values = Utils.get_config_values_from_config_file(config_file)
        (repo_host, repo_port, repo_database, repo_user,
         encrypted_repo_pass) = config_values
    except (ConfigParser.NoSectionError, ConfigParser.NoOptionError) as err:
        print_error("Config file error: {}".format(err))
        exit(99)

    try:
        decrypted_repo_pass = cipher_suite.decrypt(encrypted_repo_pass)
    except InvalidToken:
        print_error("ERROR: InvalidToken")
        exit(99)
    except Exception as error:
        print_error("ERROR: %s" % error)
        exit(3)
    else:
        return (repo_host, repo_port, repo_database,
                repo_user, decrypted_repo_pass)
def _load_secrets():
    """Load and decrypt the JSON secrets file.

    Returns an empty dict when the secrets file does not exist, the decoded
    JSON content when decryption succeeds, and None (after shutting down
    ``async_handler``) when the token is invalid.
    """
    secrets_path = _get_secrets_path()
    if not path.isfile(secrets_path):
        logging.getLogger(__name__).debug("No secrets file found")
        return {}

    # NOTE(review): `key` is not defined in this function; presumably a
    # module-level value -- confirm against the rest of the file.
    cipher = Fernet(key)
    with open(secrets_path, 'rb') as secrets_file:
        try:
            token = secrets_file.read()
            plaintext = cipher.decrypt(token)
            return json.loads(plaintext.decode())
        except InvalidToken:
            logging.getLogger(__name__).critical("You entered the wrong password")
            async_handler.shutdown()
    return None
def encrypt(self, data):
    """
    Symmetrically encrypt ``data`` with this object's fernet instance.

    Returns whatever ``self._fernet.encrypt`` returns; raises CryptoException
    when InvalidSignature or InvalidToken is raised underneath.
    """
    try:
        return self._fernet.encrypt(data)
    except (InvalidSignature, InvalidToken):
        raise CryptoException('unable to encrypt data')
def decrypt(self, data):
    """
    Symmetrically decrypt ``data`` with this object's fernet instance.

    Returns whatever ``self._fernet.decrypt`` returns; raises CryptoException
    when InvalidSignature or InvalidToken is raised underneath.
    """
    try:
        decrypted = self._fernet.decrypt(data)
    except (InvalidSignature, InvalidToken):
        raise CryptoException('unable to decrypt data')
    return decrypted
test_re_encrypt_transcript_credentials.py 文件源码
项目:edx-video-pipeline
作者: edx
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def verify_access_credentials(self):
    """
    Fetch the credentials record for the test org/provider.

    Fetching a record whose encrypted data cannot be decrypted raises InvalidToken,
    so a clean return means the data is accessible.
    """
    manager = TranscriptCredentials.objects
    credentials = self.credentials_data
    manager.get(org=credentials['org'], provider=credentials['provider'])
test_re_encrypt_transcript_credentials.py 文件源码
项目:edx-video-pipeline
作者: edx
项目源码
文件源码
阅读 33
收藏 0
点赞 0
评论 0
def test_reencrypt_transcript_credentials_invalid_keys(self, mock_logger):
    """
    Test that transcript credentials are not re-encrypted when the key the data was
    encrypted with is missing from the key set.
    """
    # Starting point: the old key list is active and the record is accessible.
    self.assertEqual(settings.FERNET_KEYS, OLD_FERNET_KEYS_LIST)
    self.verify_access_credentials()

    # Replace the key set with one that lacks the old key, i.e. no decryption
    # key is provided for the existing data.
    replacement_keys = ['new-fernet-key']
    with override_settings(FERNET_KEYS=replacement_keys):
        self.assertEqual(settings.FERNET_KEYS, replacement_keys)

        # Run the re-encryption process.
        call_command('re_encrypt_transcript_credentials')

        # Verify logging: the start message and the failure message.
        mock_logger.info.assert_called_with('[Transcript credentials re-encryption] Process started.')
        mock_logger.exception.assert_called_with(
            '[Transcript credentials re-encryption] No valid fernet key present to decrypt. Process halted.'
        )

        # The record must now be inaccessible because the decryption key is not present.
        with self.assertRaises(InvalidToken):
            self.verify_access_credentials()
def _decrypt(self, token):
    """Decrypt ``token`` with SecureToken; log a warning and return '' when it is invalid."""
    from cryptography.fernet import InvalidToken

    try:
        decrypted = SecureToken.decrypt(token)
    except InvalidToken:
        logger.warning('Invalid secure token: %s', token)
        decrypted = ''
    return decrypted
async def decrypt(cmd, message, args):
    """Decrypt a Fernet token passed as command arguments and send the result.

    The key is read from the bot preferences (``key_to_my_heart``). The
    arguments are joined without separators to form the token. A trailing
    ``:t`` argument requests a plain-text reply instead of an embed.

    Must be a coroutine because it awaits ``message.channel.send``.

    :param cmd: command object giving access to ``cmd.bot.cfg.pref.raw``
    :param message: message whose channel receives the reply
    :param args: list of argument strings
    """
    key = cmd.bot.cfg.pref.raw.get('key_to_my_heart')
    text = False
    if not key:
        response = discord.Embed(color=0xBE1931, title='? You don\'t posses a key.')
    elif not args:
        response = discord.Embed(color=0xBE1931, title='? Nothing to decrypt.')
    else:
        if args[-1] == ':t':
            text = True
            crypt_text = ''.join(args[:-1]).encode('utf-8')
        else:
            crypt_text = ''.join(args).encode('utf-8')
        cipher = Fernet(key.encode('utf-8'))
        try:
            ciphered = cipher.decrypt(crypt_text).decode('utf-8')
        except (InvalidToken, InvalidSignature):
            ciphered = None
        if not ciphered:
            # Failure notices are embeds; never send them through the
            # plain-text path even when ':t' was requested.
            text = False
            response = discord.Embed(color=0xBE1931, title='? The token or key are incorrect.')
        elif text:
            response = ciphered
        else:
            response = discord.Embed(color=0xe75a70)
            response.add_field(name='?? Token Decrypted', value=ciphered)
    if text:
        await message.channel.send(response)
    else:
        await message.channel.send(embed=response)
async def encrypt(cmd, message, args):
    """Encrypt the command arguments with Fernet and send the resulting token.

    The key is read from the bot preferences (``key_to_my_heart``). The
    arguments are joined with single spaces to form the plaintext. A trailing
    ``:t`` argument requests a plain-text reply instead of an embed.

    Must be a coroutine because it awaits ``message.channel.send``.

    :param cmd: command object giving access to ``cmd.bot.cfg.pref.raw``
    :param message: message whose channel receives the reply
    :param args: list of argument strings
    """
    key = cmd.bot.cfg.pref.raw.get('key_to_my_heart')
    text = False
    if not key:
        response = discord.Embed(color=0xBE1931, title='? You don\'t posses a key.')
    elif not args:
        # This is the encrypt command, so report the matching action.
        response = discord.Embed(color=0xBE1931, title='? Nothing to encrypt.')
    else:
        if args[-1] == ':t':
            text = True
            crypt_text = ' '.join(args[:-1]).encode('utf-8')
        else:
            crypt_text = ' '.join(args).encode('utf-8')
        cipher = Fernet(key.encode('utf-8'))
        try:
            ciphered = cipher.encrypt(crypt_text).decode('utf-8')
        except (InvalidToken, InvalidSignature):
            ciphered = None
        if not ciphered:
            # Failure notices are embeds; never send them through the
            # plain-text path even when ':t' was requested.
            text = False
            response = discord.Embed(color=0xBE1931, title='? The token or key are incorrect.')
        elif text:
            response = ciphered
        else:
            response = discord.Embed(color=0xe75a70)
            response.add_field(name='?? Text Encrypted', value=ciphered)
    if text:
        await message.channel.send(response)
    else:
        await message.channel.send(embed=response)
def _assert_valid_stash(self):
    """Raise GhostError unless the stash is initialized and the passphrase decrypts it.

    When a ``stored_passphrase`` entry exists, its value is decrypted as a
    check; an InvalidToken from that decryption is reported as a GhostError.
    """
    if not self._storage.is_initialized:
        raise GhostError(
            'Stash not initialized. Please initialize it and try again')

    try:
        stored = self._storage.get('stored_passphrase')
        if stored:
            self._decrypt(stored['value'])
    except InvalidToken:
        raise GhostError(
            'The passphrase provided is invalid for this stash. '
            'Please provide the correct passphrase')
async def decrypt(cmd, message, args):
    """Decrypt a Fernet token passed as command arguments and send the result.

    The key is read from the bot preferences (``key_to_my_heart``). The
    arguments are joined without separators to form the token. A trailing
    ``:t`` argument requests a plain-text reply instead of an embed.

    Must be a coroutine because it awaits ``message.channel.send``.

    :param cmd: command object giving access to ``cmd.bot.cfg.pref.raw``
    :param message: message whose channel receives the reply
    :param args: list of argument strings
    """
    key = cmd.bot.cfg.pref.raw.get('key_to_my_heart')
    text = False
    if not key:
        response = discord.Embed(color=0xBE1931, title='? You don\'t posses a key.')
    elif not args:
        response = discord.Embed(color=0xBE1931, title='? Nothing to decrypt.')
    else:
        if args[-1] == ':t':
            text = True
            crypt_text = ''.join(args[:-1]).encode('utf-8')
        else:
            crypt_text = ''.join(args).encode('utf-8')
        cipher = Fernet(key.encode('utf-8'))
        try:
            ciphered = cipher.decrypt(crypt_text).decode('utf-8')
        except (InvalidToken, InvalidSignature):
            ciphered = None
        if not ciphered:
            # Failure notices are embeds; never send them through the
            # plain-text path even when ':t' was requested.
            text = False
            response = discord.Embed(color=0xBE1931, title='? The token or key are incorrect.')
        elif text:
            response = ciphered
        else:
            response = discord.Embed(color=0xe75a70)
            response.add_field(name='?? Token Decrypted', value=ciphered)
    if text:
        await message.channel.send(response)
    else:
        await message.channel.send(embed=response)
async def encrypt(cmd, message, args):
    """Encrypt the command arguments with Fernet and send the resulting token.

    The key is read from the bot preferences (``key_to_my_heart``). The
    arguments are joined with single spaces to form the plaintext. A trailing
    ``:t`` argument requests a plain-text reply instead of an embed.

    Must be a coroutine because it awaits ``message.channel.send``.

    :param cmd: command object giving access to ``cmd.bot.cfg.pref.raw``
    :param message: message whose channel receives the reply
    :param args: list of argument strings
    """
    key = cmd.bot.cfg.pref.raw.get('key_to_my_heart')
    text = False
    if not key:
        response = discord.Embed(color=0xBE1931, title='? You don\'t posses a key.')
    elif not args:
        # This is the encrypt command, so report the matching action.
        response = discord.Embed(color=0xBE1931, title='? Nothing to encrypt.')
    else:
        if args[-1] == ':t':
            text = True
            crypt_text = ' '.join(args[:-1]).encode('utf-8')
        else:
            crypt_text = ' '.join(args).encode('utf-8')
        cipher = Fernet(key.encode('utf-8'))
        try:
            ciphered = cipher.encrypt(crypt_text).decode('utf-8')
        except (InvalidToken, InvalidSignature):
            ciphered = None
        if not ciphered:
            # Failure notices are embeds; never send them through the
            # plain-text path even when ':t' was requested.
            text = False
            response = discord.Embed(color=0xBE1931, title='? The token or key are incorrect.')
        elif text:
            response = ciphered
        else:
            response = discord.Embed(color=0xe75a70)
            response.add_field(name='?? Text Encrypted', value=ciphered)
    if text:
        await message.channel.send(response)
    else:
        await message.channel.send(embed=response)
def read(self):
    """
    Return the unpickled cookie if it exists and decrypts cleanly, None otherwise.

    When decryption or unpickling fails with InvalidToken or TypeError the
    cookie is reset.
    """
    if not self.exists():
        return None

    with open(config.cdms_cookie_path, 'rb') as cookie_file:
        try:
            decrypted = self.crypto.decrypt(cookie_file.read())
            # NOTE(review): pickle.loads runs on the decrypted file content;
            # confirm the cookie file can only be produced by this application.
            return pickle.loads(decrypted)
        except (InvalidToken, TypeError):
            self.reset()
    return None
def test__messages_from_the_past_exceeding_ttl_rejected(self):
    """A token stamped two seconds before the patched "now" must be rejected with ttl=1."""
    self.write_secret()
    plaintext = factory.make_bytes()
    current = time.time()
    clock = self.patch(time, "time")
    # Successive time.time() calls return these values in order --
    # presumably one during encryption and one during decryption.
    clock.side_effect = [current - 2, current]
    token = fernet_encrypt_psk(plaintext)
    with ExpectedException(InvalidToken):
        fernet_decrypt_psk(token, ttl=1)
def test__messages_from_future_exceeding_clock_skew_limit_rejected(self):
    """A token stamped 61 seconds after the patched "now" must be rejected."""
    self.write_secret()
    plaintext = factory.make_bytes()
    current = time.time()
    clock = self.patch(time, "time")
    # Successive time.time() calls return these values in order --
    # presumably one during encryption and one during decryption.
    clock.side_effect = [current + 61, current]
    token = fernet_encrypt_psk(plaintext)
    with ExpectedException(InvalidToken):
        fernet_decrypt_psk(token, ttl=1)
def crypt_pass(cipher_suite, password):
    """Encrypt ``password`` with ``cipher_suite`` and print the result.

    Prints an error and exits with status 99 if InvalidToken is raised.

    :param cipher_suite: object providing ``encrypt(password)``
    :param password: value to encrypt
    """
    try:
        cipher_text = cipher_suite.encrypt(password)
        # Call form of print: required on Python 3, and with a single
        # argument it prints the same thing under the Python 2 statement.
        print(cipher_text)
    except InvalidToken:
        print_error("ERROR: InvalidToken")
        exit(99)
def decrypt_pass(cipher_suite, password):
    """Decrypt ``password`` with ``cipher_suite`` and print the result.

    Prints an error and exits with status 99 if InvalidToken is raised.

    :param cipher_suite: object providing ``decrypt(password)``
    :param password: encrypted value to decrypt
    """
    try:
        plain_text = cipher_suite.decrypt(password)
        # Call form of print: required on Python 3, and with a single
        # argument it prints the same thing under the Python 2 statement.
        print(plain_text)
    except InvalidToken:
        print_error("ERROR: InvalidToken")
        exit(99)
def test_read_enc_wrong_key_raises_InvalidToken(self):
    """SecureJson.from_key with the wrong key string and the test JSON file must raise InvalidToken."""
    with self.assertRaises(InvalidToken):
        SecureJson.from_key(TEST_KEYSTRING_WRONG, filepath=TEST_JSON_OUTFILE)
def test_bad_key_raises_InvalidToken(self):
    """Construct a CryptKeeper from the bad test key, tolerating InvalidToken."""
    # NOTE(review): this passes both when InvalidToken is raised and when
    # nothing is raised, so it does not actually assert that a bad key is
    # rejected -- confirm the intended behaviour and tighten with assertRaises.
    try:
        CryptKeeper(TEST_BAD_KEY)
    except InvalidToken:
        pass
def test_wrong_key_raises_InvalidToken(self):
    """Decrypting a string with a keeper built from the wrong key must raise InvalidToken."""
    enctxt = encrypt_string(TEST_KEYSTRING, 'test string')
    # assertRaises replaces the deprecated failUnlessRaises alias, which
    # newer Python versions no longer provide.
    self.assertRaises(InvalidToken, self.string_ck_wrong.decrypt, enctxt)
def test_wrong_ck_raises_InvalidToken(self):
    """Reading an encrypted option through a parser with the wrong keeper must raise InvalidToken."""
    scfg = SecureConfigParser(ck=self.ck_wrong)
    scfg.read(TEST_INI_OUTFILE)
    # Pass the callable and its arguments separately: calling scfg.get(...)
    # inline would raise before assertRaises gets a chance to catch it.
    self.assertRaises(
        InvalidToken, scfg.get, testd['section'], testd['enc']['key'])
def decrypt(password):
    """Return ``password`` decrypted, or unchanged when encrypted credentials are disabled.

    Encryption is considered enabled only when the ``Security`` /
    ``encrypted_credentials`` config value equals ``'y'``. On InvalidToken the
    function prints a message and exits with status -1.
    """
    if config().get('Security', 'encrypted_credentials') != 'y':
        return password

    try:
        cipher = Fernet(_get_key())
        return cipher.decrypt(password)
    except InvalidToken:
        print("Invalid master password")
        sys.exit(-1)
async def dokidoki(cmd, message, args):
    """Send a markov-generated quote for a character, decrypted with the stored key.

    The first letter of the first argument selects the character when it is a
    key of ``files``; otherwise a random one is picked. A last argument
    starting with ``:g`` forces "glitch" mode; without it, glitch mode is
    chosen at random (one in six). In glitch mode a single sentence is
    produced and re-encrypted before sending; otherwise three sentences are
    sent as-is.

    Must be a coroutine because it awaits ``message.channel.send``.

    :param cmd: command object giving access to ``cmd.bot.cfg.pref.raw``
    :param message: message whose channel receives the reply
    :param args: list of argument strings
    """
    char = None
    glitch = False
    if args:
        if args[0][0].lower() in files:
            char = args[0][0].lower()
        if args[-1].startswith(':g'):
            glitch = True
    if not char:
        char = secrets.choice(list(files))
    char_file = files[char]
    with open(f'doki/{char_file}.luci', 'rb') as quote_file:
        quotes = quote_file.read()
    key = cmd.bot.cfg.pref.raw.get('key_to_my_heart')
    if key:
        cipher = Fernet(key.encode('utf-8'))
        try:
            ciphered = cipher.decrypt(quotes).decode('utf-8')
        except InvalidToken:
            ciphered = None
        if ciphered:
            if not glitch:
                # one-in-six chance of glitching when not explicitly requested
                glitch = secrets.randbelow(6) == 0
            if glitch:
                line_count = 1
                thumbnail = chars_glitch[char]
                title = titles_glitch[char]
            else:
                line_count = 3
                thumbnail = secrets.choice(chars[char])
                title = titles[char]
            # Build the markov model once; it does not change between lines.
            model = markovify.Text(ciphered)
            lines = []
            for _ in range(line_count):
                output = model.make_short_sentence(500, tries=100)
                output = clean(output, message.author)
                if glitch:
                    output = cipher.encrypt(output.encode('utf-8')).decode('utf-8')
                lines.append(output)
            output_final = ' '.join(lines)
            response = discord.Embed(color=0xe75a70)
            response.add_field(name=f'?? {title}', value=output_final)
            response.set_thumbnail(url=thumbnail)
        else:
            response = discord.Embed(color=0xe75a70, title='?? Sorry but that key is incorrect!')
    else:
        response = discord.Embed(color=0xe75a70, title='?? You are missing the key to my heart!')
    await message.channel.send(embed=response)
async def dokidoki(cmd, message, args):
    """Send a markov-generated quote for a character, decrypted with the stored key.

    The first letter of the first argument selects the character when it is
    one of ``m``, ``n``, ``y`` or ``s``; otherwise a random one is picked. A
    last argument starting with ``:g`` forces "glitch" mode; without it,
    glitch mode is chosen at random (one in six). In glitch mode a single
    sentence is produced and re-encrypted before sending; otherwise three
    sentences are sent as-is.

    Must be a coroutine because it awaits ``message.channel.send``.

    :param cmd: command object giving access to ``cmd.bot.cfg.pref.raw``
    :param message: message whose channel receives the reply
    :param args: list of argument strings
    """
    char_letters = ['m', 'n', 'y', 's']
    char = None
    glitch = False
    if args:
        if args[0][0].lower() in char_letters:
            char = args[0][0].lower()
        if args[-1].startswith(':g'):
            glitch = True
    if not char:
        char = secrets.choice(char_letters)
    char_file = files[char]
    with open(f'doki/{char_file}.luci', 'rb') as quote_file:
        quotes = quote_file.read()
    key = cmd.bot.cfg.pref.raw.get('key_to_my_heart')
    if key:
        cipher = Fernet(key.encode('utf-8'))
        try:
            ciphered = cipher.decrypt(quotes).decode('utf-8')
        except InvalidToken:
            ciphered = None
        if ciphered:
            if not glitch:
                # one-in-six chance of glitching when not explicitly requested
                glitch = secrets.randbelow(6) == 0
            if glitch:
                line_count = 1
                thumbnail = chars_glitch[char]
                title = titles_glitch[char]
            else:
                line_count = 3
                thumbnail = secrets.choice(chars[char])
                title = titles[char]
            # Build the markov model once; it does not change between lines.
            model = markovify.Text(ciphered)
            lines = []
            for _ in range(line_count):
                output = model.make_short_sentence(500, tries=100)
                output = clean(output, message.author)
                if glitch:
                    output = cipher.encrypt(output.encode('utf-8')).decode('utf-8')
                lines.append(output)
            output_final = ' '.join(lines)
            response = discord.Embed(color=0xe75a70)
            response.add_field(name=f'?? {title}', value=output_final)
            response.set_thumbnail(url=thumbnail)
        else:
            response = discord.Embed(color=0xe75a70, title='?? Sorry but that key is incorrect!')
    else:
        response = discord.Embed(color=0xe75a70, title='?? You are missing the key to my heart!')
    await message.channel.send(embed=response)
def __manage_credentials(self, hostname, username, password,
                         remove_entry=False):
    """
    This function adds or removes credentials to/from the authentication
    container.
    Adding credentials requires a hostname, username and corresponding
    password. Removing credentials only requires a hostname.
    There are two alias functions for credentials management:
    add_credentials() and remove_credentials()

    When ``self.KEY`` is set, the password is stored as ``s/<fernet token>``;
    otherwise it is stored in plain form. Removing a hostname that has no
    entry is silently ignored.

    :param hostname: hostname
    :type hostname: str
    :param username: username
    :type username: str
    :param password: corresponding password
    :type password: str
    :param remove_entry: setting True will remove an entry
    :type remove_entry: bool
    :raises ContainerException: if InvalidToken is raised while encrypting
    """
    hostname = self.cut_hostname(hostname)
    try:
        if remove_entry:
            # remove entry
            del self.CREDENTIALS[hostname]
        else:
            # build the entry completely before storing it, so a failure
            # while encrypting does not leave a half-filled record behind
            entry = {"username": username}
            if self.KEY:
                crypto = Fernet(self.KEY)
                token = crypto.encrypt(password.encode())
                # The token is bytes; decode it so Python 3 stores the token
                # text rather than its "b'...'" representation.
                entry["password"] = "s/{0}".format(token.decode())
            else:
                entry["password"] = password
            self.CREDENTIALS[hostname] = entry
    except InvalidToken:
        raise ContainerException("Invalid password specified!")
    except KeyError:
        # nothing stored for this hostname; removal is best-effort
        pass
#aliases
def read_beacon_payload(beacon_bytes):
    """Returns a BeaconPayload namedtuple representing the given beacon bytes.

    Unpacks the fixed-size header (version, type code, payload length), then,
    for version 1 packets with a non-empty payload, decrypts, decompresses
    and BSON-decodes the inner payload. When a payload was decoded, its
    "type" value replaces the type code from the header.

    :param beacon_bytes: beacon payload (bytes).
    :return: BeaconPayload namedtuple
    :raises InvalidBeaconingPacket: if the packet is too short, the payload
        is shorter than the header announces, the version is unknown, or the
        inner payload cannot be decrypted, decompressed or decoded.
    """
    header_length = BEACON_HEADER_LENGTH_V1
    if len(beacon_bytes) < header_length:
        raise InvalidBeaconingPacket(
            "Beaconing packet must be at least %d bytes." % header_length)

    version, beacon_type_code, expected_payload_length = struct.unpack(
        BEACON_HEADER_FORMAT_V1, beacon_bytes[:header_length])

    actual_payload_length = len(beacon_bytes) - header_length
    if actual_payload_length < expected_payload_length:
        raise InvalidBeaconingPacket(
            "Invalid payload length: expected %d bytes, got %d bytes." % (
                expected_payload_length, actual_payload_length))

    # Only the announced number of payload bytes is used; extra bytes are ignored.
    payload_bytes = beacon_bytes[
        header_length:header_length + expected_payload_length]

    if version != 1:
        raise InvalidBeaconingPacket(
            "Unknown beacon version: %d" % version)

    payload = None
    # An empty payload means there is no encrypted inner payload to process.
    if len(payload_bytes) != 0:
        try:
            decrypted_data = fernet_decrypt_psk(
                payload_bytes, ttl=60, raw=True)
        except InvalidToken:
            raise InvalidBeaconingPacket(
                "Failed to decrypt inner payload: check MAAS secret key.")
        try:
            decompressed_data = decompress(decrypted_data)
        except OSError:
            raise InvalidBeaconingPacket(
                "Failed to decompress inner payload: %r" % decrypted_data)
        try:
            payload = BSON.decode(decompressed_data)
        except BSONError:
            raise InvalidBeaconingPacket(
                "Inner beacon payload is not BSON: %r" % decompressed_data)

    if payload:
        beacon_type_code = payload["type"]
    return BeaconPayload(
        beacon_bytes, version, BEACON_TYPE_VALUES[beacon_type_code], payload)