def test_file_to_file_cycle_target_through_symlink(tmpdir):
    """Encrypt then decrypt a file when the ciphertext path goes through a symlink.

    A real ``output`` directory is created together with an ``output_link``
    symlink pointing at it.  The ciphertext is addressed via the symlink, and
    the decrypted result must match the original plaintext byte for byte.
    """
    def _run_cli(command_line):
        # Split the way the CLI entry point expects on the current platform.
        return aws_encryption_sdk_cli.cli(shlex.split(command_line, posix=not is_windows()))

    source_file = tmpdir.join('source_plaintext')
    real_output = tmpdir.mkdir('output')
    os.symlink(str(real_output), str(tmpdir.join('output_link')))
    encrypted_file = tmpdir.join('output_link', 'ciphertext')
    roundtrip_file = tmpdir.join('decrypted')

    with open(str(source_file), 'wb') as handle:
        handle.write(os.urandom(1024))

    encrypt_command = encrypt_args_template().format(
        source=str(source_file),
        target=str(encrypted_file),
    )
    decrypt_command = decrypt_args_template().format(
        source=str(encrypted_file),
        target=str(roundtrip_file),
    )

    _run_cli(encrypt_command)
    _run_cli(decrypt_command)

    assert filecmp.cmp(str(source_file), str(roundtrip_file))
python类urandom()的实例源码
test_i_aws_encryption_sdk_cli.py 文件源码
项目:aws-encryption-sdk-cli
作者: awslabs
项目源码
文件源码
阅读 33
收藏 0
点赞 0
评论 0
test_i_aws_encryption_sdk_cli.py 文件源码
项目:aws-encryption-sdk-cli
作者: awslabs
项目源码
文件源码
阅读 30
收藏 0
点赞 0
评论 0
def test_file_overwrite_source_file_to_file_custom_empty_prefix(tmpdir):
    """Encrypting a file onto itself must be refused and leave the file intact.

    The same path is used as source and target and a bare ``--suffix`` flag is
    appended to the command line (presumably selecting an empty suffix --
    confirm against the CLI argument parser).
    """
    original_bytes = os.urandom(2014)
    source_path = str(tmpdir.join('source_plaintext'))
    with open(source_path, 'wb') as handle:
        handle.write(original_bytes)

    base_command = encrypt_args_template().format(
        source=source_path,
        target=source_path,
    )
    argv = shlex.split(base_command + ' --suffix', posix=not is_windows())
    outcome = aws_encryption_sdk_cli.cli(argv)

    assert outcome == 'Destination and source cannot be the same'
    # The source file must not have been touched by the rejected operation.
    with open(source_path, 'rb') as handle:
        assert handle.read() == original_bytes
test_i_aws_encryption_sdk_cli.py 文件源码
项目:aws-encryption-sdk-cli
作者: awslabs
项目源码
文件源码
阅读 33
收藏 0
点赞 0
评论 0
def test_file_overwrite_source_dir_to_dir_custom_empty_prefix(tmpdir):
    """Encrypting a directory onto itself must be refused and leave its file intact.

    The temporary directory is passed as both source and target with a bare
    ``--suffix`` flag appended (presumably an empty suffix -- confirm against
    the CLI argument parser).
    """
    original_bytes = os.urandom(2014)
    source_path = str(tmpdir.join('source_plaintext'))
    with open(source_path, 'wb') as handle:
        handle.write(original_bytes)

    directory = str(tmpdir)
    base_command = encrypt_args_template().format(
        source=directory,
        target=directory,
    )
    argv = shlex.split(base_command + ' --suffix', posix=not is_windows())
    outcome = aws_encryption_sdk_cli.cli(argv)

    assert outcome == 'Destination and source cannot be the same'
    # The file inside the directory must be unchanged.
    with open(source_path, 'rb') as handle:
        assert handle.read() == original_bytes
test_i_aws_encryption_sdk_cli.py 文件源码
项目:aws-encryption-sdk-cli
作者: awslabs
项目源码
文件源码
阅读 32
收藏 0
点赞 0
评论 0
def test_file_overwrite_source_file_to_dir_custom_empty_prefix(tmpdir):
    """Encrypting a file into its own directory must not clobber the source.

    The source file's parent directory is used as the target and a bare
    ``--suffix`` flag is appended (presumably an empty suffix -- confirm
    against the CLI argument parser).  The CLI is expected to return ``None``
    and the source bytes must be unchanged afterwards.
    """
    original_bytes = os.urandom(2014)
    source_path = str(tmpdir.join('source_plaintext'))
    with open(source_path, 'wb') as handle:
        handle.write(original_bytes)

    base_command = encrypt_args_template().format(
        source=source_path,
        target=str(tmpdir),
    )
    argv = shlex.split(base_command + ' --suffix', posix=not is_windows())
    outcome = aws_encryption_sdk_cli.cli(argv)

    assert outcome is None
    with open(source_path, 'rb') as handle:
        assert handle.read() == original_bytes
test_i_aws_encryption_sdk_cli.py 文件源码
项目:aws-encryption-sdk-cli
作者: awslabs
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def test_file_to_dir_cycle(tmpdir):
    """Encrypt a file into a directory, then decrypt it back to a file.

    The encrypt step is given a directory as its target; the test expects the
    output to appear there as ``source_plaintext.encrypted`` and the decrypted
    copy to match the original plaintext.
    """
    def _run_cli(command_line):
        # Split the way the CLI entry point expects on the current platform.
        return aws_encryption_sdk_cli.cli(shlex.split(command_line, posix=not is_windows()))

    target_dir = tmpdir.mkdir('inner')
    source_file = tmpdir.join('source_plaintext')
    expected_ciphertext = target_dir.join('source_plaintext.encrypted')
    roundtrip_file = tmpdir.join('decrypted')

    with open(str(source_file), 'wb') as handle:
        handle.write(os.urandom(1024))

    encrypt_command = encrypt_args_template().format(
        source=str(source_file),
        target=str(target_dir),
    )
    decrypt_command = decrypt_args_template().format(
        source=str(expected_ciphertext),
        target=str(roundtrip_file),
    )

    _run_cli(encrypt_command)
    assert os.path.isfile(str(expected_ciphertext))

    _run_cli(decrypt_command)
    assert filecmp.cmp(str(source_file), str(roundtrip_file))
test_i_aws_encryption_sdk_cli.py 文件源码
项目:aws-encryption-sdk-cli
作者: awslabs
项目源码
文件源码
阅读 32
收藏 0
点赞 0
评论 0
def test_stdin_to_file_to_stdout_cycle(tmpdir):
    """Round-trip data through the installed CLI using stdin and stdout.

    Base64-encoded plaintext is piped into an ``aws-encryption-cli`` encrypt
    subprocess that writes to a file; a second subprocess decrypts that file
    to stdout, and the base64-decoded output must equal the original bytes.
    """
    encrypted_path = tmpdir.join('ciphertext')
    original_bytes = os.urandom(1024)

    encrypt_command = 'aws-encryption-cli ' + encrypt_args_template(decode=True).format(
        source='-',
        target=str(encrypted_path),
    )
    decrypt_command = 'aws-encryption-cli ' + decrypt_args_template(encode=True).format(
        source=str(encrypted_path),
        target='-',
    )

    # Encrypt: feed the base64 text on stdin; the captured output is not inspected.
    encrypt_argv = shlex.split(encrypt_command, posix=not is_windows())
    encrypt_proc = Popen(encrypt_argv, stdout=PIPE, stdin=PIPE, stderr=PIPE)
    encrypt_proc.communicate(input=base64.b64encode(original_bytes))

    # Decrypt: nothing is sent on stdin; the result is read from stdout.
    decrypt_argv = shlex.split(decrypt_command, posix=not is_windows())
    decrypt_proc = Popen(decrypt_argv, stdout=PIPE, stdin=PIPE, stderr=PIPE)
    emitted, _unused_stderr = decrypt_proc.communicate()

    assert base64.b64decode(emitted) == original_bytes
def encryptPassword(plaintext):
    """Obfuscate ``plaintext`` with ARC4 under a random salt and return base64 text.

    The payload that gets encrypted is the length of ``plaintext`` formatted
    with ``"%3d"``, followed by ``plaintext`` itself and eight random padding
    characters.  The 8-byte salt is used as the ARC4 key and is prepended to
    the ciphertext before base64 encoding.

    Returns the base64 string, or ``""`` if anything goes wrong (best-effort
    contract kept from the original implementation).

    NOTE(review): ARC4 and the ``random`` module are not suitable for
    protecting real secrets -- confirm this is only used for obfuscation.
    """
    # Padding alphabet, reproduced verbatim.  The run of '?' looks like
    # mis-encoded non-ASCII letters -- TODO confirm against the original file.
    letters = 'qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM1234567890????????????????????????????????????????????????????????????????'
    SALT_SIZE = 8
    try:
        salt = urandom(SALT_SIZE)
        randomText = ''.join(random.choice(letters) for _ in range(8))
        framed = ("%3d" % len(plaintext) + plaintext + randomText).encode('utf-8')
        crypted = ARC4.new(salt).encrypt(framed)
        return b64encode(salt + crypted).decode('ascii')
    except Exception:
        # Deliberately best-effort, but no longer swallows KeyboardInterrupt
        # or SystemExit the way a bare ``except:`` did.
        return ""
def _maybe_seed(self):
    """Seed the pool once, the first time it is needed.

    Does nothing when ``self.seeded`` is already true.  Otherwise 16 bytes are
    taken from ``os.urandom``; if that fails, ``/dev/urandom`` is read
    directly; if that also fails, the current time is used as a last resort.
    The seed is passed to ``self.stir(seed, True)`` as a ``bytearray``.
    """
    if self.seeded:
        return
    try:
        seed = os.urandom(16)
    except Exception:
        try:
            # Unbuffered read straight from the device.
            with open('/dev/urandom', 'rb', 0) as r:
                seed = r.read(16)
        except Exception:
            # Encode explicitly: bytearray() rejects an unencoded text string
            # on Python 3, which made this fallback crash.
            seed = str(time.time()).encode('ascii')
    self.seeded = True
    self.stir(bytearray(seed), True)
def seed(self, a=None):
    """Initialize internal state from ``a``.

    With no argument (or ``None``) the seed is drawn from the operating
    system's randomness source; when that source raises
    ``NotImplementedError`` the current time is used instead.  Any other
    value is passed unchanged to the base-class ``seed()`` (the original
    documentation states that ``hash(a)`` is used for values that are not
    int or long).  The cached ``gauss_next`` value is cleared afterwards.
    """
    if a is None:
        try:
            # 2500 bytes is enough to span the 19937-bit state space of the
            # Mersenne Twister.
            entropy = _urandom(2500)
            a = long(_hexlify(entropy), 16)
        except NotImplementedError:
            import time
            # Multiply by 256 so fractional seconds contribute to the seed.
            a = long(time.time() * 256)

    super(Random, self).seed(a)
    self.gauss_next = None
def touch(self, path, size=None, random=False, perm=None, time=None):
    """Create a file if it is missing and update its timestamps.

    Args:
        path: File to create or update; converted to an absolute path.
        size: Desired size of a newly created file, converted with
            ``strsize_to_bytes`` (presumably accepts strings such as "1K" --
            confirm against that helper).  Ignored, for safety reasons, when
            the file already exists.
        random: When true, a new file is filled with ``urandom(size)`` bytes.
            This is much slower but necessary for certain tests.  When false,
            a new file of a positive integer size is produced by seeking to
            the last byte and writing a single NUL.
        perm: Optional mode passed to ``chmod`` after the file is in place.
        time: Passed straight to ``utime`` as the times argument (``None``
            means "now").

    Returns:
        Always ``True``.
    """
    path = abspath(path)
    parent = dirname(path)
    if not isdir(parent):
        # 0o700 is valid on Python 2.6+ and 3; the old 0700 spelling is a
        # syntax error on Python 3.
        mkdir(parent, 0o700)

    if not exists(path):
        size = strsize_to_bytes(size)
        # A single context manager guarantees the handle is closed even if a
        # write fails.
        with open(path, 'wb') as f:
            if random:
                # Fill our file with randomly generated content
                f.write(urandom(size))
            elif isinstance(size, int) and size > 0:
                f.seek(size - 1)
                # Must be bytes: the file is opened in binary mode.
                f.write(b"\0")

    # Update the file's timestamps
    utime(path, time)

    if perm is not None:
        # Adjust permissions
        chmod(path, perm)

    return True
def read_segments(segments, metadata):
    """Yield one ``manifest_structure.Segment`` per entry in ``segments``.

    Each record carries the backup-set id and incremental value read from
    ``metadata``, the segment itself, two zero fields, and 20 freshly
    generated random bytes as its final field.
    """
    for entry in segments:
        backupset = metadata.backupset_id
        incremental = metadata.incremental
        # New random bytes for every record.
        filler = os.urandom(20)
        yield manifest_structure.Segment(backupset, incremental, entry, 0, 0, filler)
def initialize(self):
    """Create (or truncate) ``self.manifest_file`` and write 20 random bytes to it."""
    with open(self.manifest_file, 'wb', 4096) as manifest:
        header = os.urandom(20)
        manifest.write(header)
def dsa_test():
    """Sign a random message with a fresh key and return the verification result.

    The message is the UTF-8 decimal text of a random integer between ``q``
    and ``2 * q``; the secret key is 32 bytes from ``os.urandom``.
    """
    import os
    message = str(random.randint(q, q + q)).encode('utf-8')
    secret_key = os.urandom(32)
    public_key = publickey(secret_key)
    signed = signature(message, secret_key, public_key)
    return checkvalid(signed, message, public_key)
def crypto_sign_keypair(seed=None):
    """Return a ``Keypair`` of (verifying key, secret key) built from ``seed``.

    When ``seed`` is ``None`` it is drawn from ``os.urandom``.  Supplying a
    seed emits a ``RuntimeWarning``.  The secret key in the result is the
    seed followed by the verifying key.

    Raises:
        ValueError: if the seed is not exactly 32 bytes long.
    """
    if seed is not None:
        warnings.warn("ed25519ll should choose random seed.",
                      RuntimeWarning)
    else:
        # The length comes from PUBLICKEYBYTES (defined elsewhere); it must be
        # 32 for the check below to pass.
        seed = os.urandom(PUBLICKEYBYTES)

    if len(seed) != 32:
        raise ValueError("seed must be 32 random bytes or None.")

    signing_seed = seed
    verifying = djbec.publickey(signing_seed)
    return Keypair(verifying, signing_seed + verifying)
def random(self):
    """Get the next random number in the range [0.0, 1.0).

    Seven bytes are read from ``_urandom`` and the low three bits dropped,
    leaving a 53-bit integer that is scaled by ``RECIP_BPF`` (defined
    elsewhere; presumably 2**-53 -- confirm).
    """
    raw = _urandom(7)
    mantissa = int.from_bytes(raw, 'big') >> 3
    return mantissa * RECIP_BPF
def getrandbits(self, k):
    """getrandbits(k) -> x.  Generates an int with k random bits.

    Raises:
        ValueError: if ``k`` is zero or negative.
        TypeError: if ``k`` is not a whole number.
    """
    if k <= 0:
        raise ValueError('number of bits must be greater than zero')
    if k != int(k):
        raise TypeError('number of bits should be an integer')

    # Round the bit count up to whole bytes, then shift away the surplus.
    byte_count = (k + 7) // 8
    surplus_bits = byte_count * 8 - k
    raw = _urandom(byte_count)
    return int.from_bytes(raw, 'big') >> surplus_bits
def generate_signature(signature_plain, signature_lib):
    """Encrypt ``signature_plain`` with ``signature_lib`` and return the bytes.

    ``signature_lib.encrypt`` is called twice with a fresh 32-byte random IV:
    first with no output buffer so it can report the required size through
    the size argument (presumably a size query -- confirm against the native
    library), then again with a buffer of that size to receive the result.
    """
    iv = os.urandom(32)
    needed = ctypes.c_size_t()

    # First call: no output buffer, only the size is filled in.
    signature_lib.encrypt(
        signature_plain, len(signature_plain), iv, 32, None, ctypes.byref(needed)
    )

    # Second call: same inputs, this time with a buffer of the reported size.
    buf = (ctypes.c_ubyte * needed.value)()
    signature_lib.encrypt(
        signature_plain, len(signature_plain), iv, 32, ctypes.byref(buf), ctypes.byref(needed)
    )

    # Convert the unsigned-byte array into a single byte string.
    return b''.join(six.int2byte(octet) for octet in buf)
def start(self):
    """Start the HTTPS and plain-HTTP tornado servers and run the IO loop.

    The HTTPS application serves the full handler set with a cookie secret
    generated at start-up; the HTTP application serves only the RSS feed, a
    redirector for ``/`` and static files from ``plain``.  This call blocks
    in ``IOLoop.current().start()``.
    """
    self.logger.info("starting web server listening at https port {0}".format(self.httpsPort))
    base_dir = os.path.dirname(os.path.realpath(__file__))
    shared_kwargs = {'iotManager': self.iotManager}

    # Route order is significant: the first matching pattern wins.
    https_routes = [
        (r'/(favicon.ico)', tornado.web.StaticFileHandler, {'path': base_dir + '/img'}),
        (r'/static/(.*)', tornado.web.StaticFileHandler, {'path': base_dir + '/static'}),
        (r'/upload/(.*)', handlers.AuthFileHandler, {'path': self.uploadDir}),
        (r'/img/(.*)', tornado.web.StaticFileHandler, {'path': base_dir + '/img'}),
        (r'/login', handlers.LoginWebHandler, {'adminPasswordHash': self.adminPasswordHash}),
        (r'/logout', handlers.LogoutWebHandler),
        (r'/ws', handlers.WSHandler, shared_kwargs),
        (r'/device/(.*)', handlers.DeviceWebHandler, shared_kwargs),
        (r'/rss', handlers.RssWebHandler, shared_kwargs),
        (r'/history', handlers.HistoryWebHandler, shared_kwargs),
        (r'/devices', handlers.DevicesWebHandler, shared_kwargs),
        (r'/video', handlers.VideoWebHandler, {'localVideoPort': self.localVideoPort}),
        (r'/', handlers.HomeWebHandler, shared_kwargs),
    ]

    self.logger.info("starting web server listening at http {0} (plain)".format(self.httpPort))

    # A new cookie secret is generated every time start() runs.
    self.httpsApp = tornado.web.Application(
        https_routes,
        cookie_secret=os.urandom(32),
        compiled_template_cache=True,
    )

    # NOTE(review): PROTOCOL_TLSv1 pins an old TLS version -- consider
    # revisiting; left unchanged here to preserve behaviour.
    tls_settings = {
        "certfile": self.httpsCertFile,
        "keyfile": self.httpsKeyFile,
        "ssl_version": ssl.PROTOCOL_TLSv1,
    }
    if self.httpsChainFile:
        # A chain file, when configured, replaces the plain certificate.
        tls_settings["certfile"] = self.httpsChainFile
    # NOTE(review): placed outside the ``if`` on the assumption that the
    # original logged the effective certificate unconditionally -- confirm.
    self.logger.info("Using certificate file at {0}".format(tls_settings["certfile"]))

    self.httpsServer = tornado.httpserver.HTTPServer(self.httpsApp, ssl_options=tls_settings)
    self.httpsServer.listen(self.httpsPort)

    http_routes = [
        (r'/rss', handlers.RssWebHandler, shared_kwargs),
        (r'/', handlers.RedirectorHandler, {'manager': self}),
        (r'/(.*)', tornado.web.StaticFileHandler, {'path': base_dir + '/plain'}),
    ]
    self.httpApp = tornado.web.Application(http_routes)
    self.httpServer = tornado.httpserver.HTTPServer(self.httpApp)
    self.httpServer.listen(self.httpPort)

    tornado.ioloop.IOLoop.current().start()
def _temporary_keychain():
    """
    This function creates a temporary Mac keychain that we can use to work with
    credentials. This keychain uses a one-time password and a temporary file to
    store the data. We expect to have one keychain per socket. The returned
    SecKeychainRef must be freed by the caller, including calling
    SecKeychainDelete.

    Returns a tuple of the SecKeychainRef and the path to the temporary
    directory that contains it.
    """
    # Unfortunately, SecKeychainCreate requires a path to a keychain. This
    # means we cannot use mkstemp to use a generic temporary file. Instead,
    # we're going to create a temporary directory and a filename to use there.
    # This filename will be 8 random bytes expanded into base16. Base16 is
    # used rather than base64 because the base64 alphabet contains '/', which
    # would turn the filename into a path inside a non-existent subdirectory.
    # We also need some random bytes to password-protect the keychain we're
    # creating, so we ask for 40 random bytes in total.
    random_bytes = os.urandom(40)
    filename = base64.b16encode(random_bytes[:8]).decode('utf-8')
    password = base64.b64encode(random_bytes[8:])  # Must be valid UTF-8
    tempdirectory = tempfile.mkdtemp()

    keychain_path = os.path.join(tempdirectory, filename).encode('utf-8')

    # We now want to create the keychain itself.
    keychain = Security.SecKeychainRef()
    status = Security.SecKeychainCreate(
        keychain_path,
        len(password),
        password,
        False,
        None,
        ctypes.byref(keychain)
    )
    _assert_no_error(status)

    # Having created the keychain, we want to pass it off to the caller.
    return keychain, tempdirectory
def dsa_test():
    """Run one sign-and-verify round trip and return the verification result.

    A random integer between ``q`` and ``2 * q`` is rendered as UTF-8 decimal
    text and signed with a key derived from 32 bytes of ``os.urandom``.
    """
    import os
    message = str(random.randint(q, q + q)).encode('utf-8')
    secret_key = os.urandom(32)
    public_key = publickey(secret_key)
    return checkvalid(signature(message, secret_key, public_key), message, public_key)