python类urandom()的实例源码

main0.py 文件源码 项目:PGO-mapscan-opt 作者: seikur0 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def new_session(account):
    """Reset the HTTP session state stored on the ``account`` dict.

    When the account has no session yet, a new ``requests`` session is
    created (certificate verification on, 'Niantic App' user agent, proxies
    taken from ``account['proxy']`` when set) and stored under
    ``account['session']``.  Otherwise the existing session is closed and its
    cookies are cleared.  In both cases the session time, a fresh random
    32-byte session hash, the API URL and the auth ticket are reset.
    """
    existing = account.get('session')
    if existing is not None:
        # Keep the session object, but drop its connections and cookies.
        existing.close()
        existing.cookies.clear()
    else:
        fresh = requests.session()
        fresh.verify = True
        fresh.headers.update({'User-Agent': 'Niantic App'})
        proxy = account['proxy']
        if proxy is not None:
            fresh.proxies.update(proxy)
        account['session'] = fresh

    account['session_time'] = get_time()
    account['session_hash'] = os.urandom(32)
    account['api_url'] = API_URL
    account['auth_ticket'] = None
test_copy.py 文件源码 项目:transfert 作者: rbernand 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_copy(tmpdir, storages):
    """Copy a resource along the chain http -> ftp -> sftp -> file.

    The three destinations are removed first; after every hop both ends of
    that hop must exist.
    """
    chunk_size = 1024 * 40

    local_file = tmpdir.join('alpha')
    local_file.write_binary(os.urandom(chunk_size))

    f_http = Resource(storages['http']('index.html'))
    f_file = Resource(storages['file'](local_file.strpath))
    f_sftp = Resource(storages['sftp']('/gamma'))
    f_ftp = Resource(storages['ftp']('/beta'))
    assert f_http.exists()

    # Start from a clean slate: none of the destinations may exist yet.
    delete_files(f_file, f_sftp, f_ftp)
    assert not (f_file.exists() or f_sftp.exists() or f_ftp.exists())

    transfert.actions.copy(f_http, f_ftp, size=chunk_size)
    assert f_ftp.exists() and f_http.exists()

    transfert.actions.copy(f_ftp, f_sftp, size=chunk_size)
    assert f_ftp.exists() and f_sftp.exists()

    transfert.actions.copy(f_sftp, f_file, size=chunk_size)
    assert f_sftp.exists() and f_file.exists()
rsa.py 文件源码 项目:BitBot 作者: crack00r 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def encrypt(self, data, offset=None, length=None):
        """Encrypt a slice of ``data`` with the current key.

        The plaintext block is the SHA-1 digest of the slice, followed by the
        slice itself, followed by random bytes when the slice is shorter than
        235 bytes.  That block is read as a big-endian integer, raised to
        ``self.e`` modulo ``self.m`` and returned as 256 big-endian bytes.

        ``offset`` defaults to 0 and ``length`` to ``len(data)``.
        """
        start = 0 if offset is None else offset
        count = len(data) if length is None else length
        chunk = data[start:start + count]

        with BinaryWriter() as writer:
            writer.write(sha1(chunk).digest())
            writer.write(chunk)
            if count < 235:
                # Pad short inputs with random bytes.
                writer.write(os.urandom(235 - count))

            plain = int.from_bytes(writer.get_bytes(), byteorder='big')
            cipher = pow(plain, self.e, self.m)

            # A result shorter than 256 bytes is left-padded with zeros by
            # the big-endian conversion, so no extra padding step is needed.
            return cipher.to_bytes(length=256, byteorder='big', signed=False)
random.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def seed(self, a=None):
        """Initialize internal state from hashable object.

        With ``a`` omitted or None, the seed is taken from 16 bytes of
        ``_urandom``; if that source raises NotImplementedError, the current
        time (including fractional seconds) is used instead.

        If a is not None or an int or long, hash(a) is used instead.
        """
        if a is None:
            try:
                entropy = _urandom(16)
            except NotImplementedError:
                import time
                a = long(time.time() * 256)  # use fractional seconds
            else:
                a = long(_hexlify(entropy), 16)

        super(Random, self).seed(a)
        self.gauss_next = None
uuid.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def uuid4():
    """Generate a random UUID.

    Uses the system's version-4 generator when one was found at import time,
    otherwise 16 bytes from ``os.urandom``, and as a last resort the
    ``random`` module.
    """

    # When the system provides a version-4 UUID generator, use it.
    if _uuid_generate_random:
        _buffer = ctypes.create_string_buffer(16)
        _uuid_generate_random(_buffer)
        return UUID(bytes=_buffer.raw)

    # Otherwise, get randomness from urandom or the 'random' module.
    try:
        import os
        return UUID(bytes=os.urandom(16), version=4)
    except Exception:
        # Catch Exception rather than everything, so KeyboardInterrupt and
        # SystemExit are no longer swallowed by the fallback.
        import random
        random_bytes = [chr(random.randrange(256)) for _ in range(16)]
        return UUID(bytes=random_bytes, version=4)
local.py 文件源码 项目:py-secretcrypt 作者: Zemanta 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _key():
    """Return the local key as 16 raw bytes, creating it on first use.

    The key is cached in the module-level ``__key``.  It is stored base64
    encoded in the file ``key`` inside ``_key_dir()``; when that file does
    not exist a new random key is generated and written there.

    The raw (decoded) key is returned on every path.  Previously a freshly
    generated key was returned in its base64-encoded form, while later
    processes got the decoded form read back from the file.
    """
    global __key
    if __key:
        return __key

    import errno

    data_dir = _key_dir()
    key_file = os.path.join(data_dir, 'key')

    if os.path.isfile(key_file):
        with open(key_file, 'rb') as f:
            __key = base64.b64decode(f.read())
        return __key

    raw_key = os.urandom(16)
    try:
        os.makedirs(data_dir)
    except OSError as e:
        # An already existing directory is fine; anything else is an error.
        if e.errno != errno.EEXIST:
            raise
    with open(key_file, 'wb') as f:
        f.write(base64.b64encode(raw_key))

    # Cache only after the key has been persisted.
    __key = raw_key
    return __key
directstream.py 文件源码 项目:plugin.video.exodus 作者: lastship 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def yandex(url):
    """Resolve a Yandex page ``url`` to the file URL reported by yadi.sk.

    Fetches the page, extracts its "sk" and "id" values, posts a
    'do-get-resource-url' request with a random client id and returns the
    'file' entry of the first model in the JSON reply.  Returns None when
    any step fails.
    """
    try:
        cookie = client.request(url, output='cookie')

        page = client.request(url, cookie=cookie)
        # Replace every run of non-ASCII characters with a single space.
        page = re.sub(r'[^\x00-\x7F]+', ' ', page)

        sk = re.findall('"sk"\s*:\s*"([^"]+)', page)[0]
        idstring = re.findall('"id"\s*:\s*"([^"]+)', page)[0]
        idclient = binascii.b2a_hex(os.urandom(16))

        post = urllib.urlencode(
            {'idClient': idclient, 'version': '3.9.2', 'sk': sk, '_model.0': 'do-get-resource-url', 'id.0': idstring})

        reply = client.request('https://yadi.sk/models/?_m=do-get-resource-url', post=post, cookie=cookie)
        return json.loads(reply)['models'][0]['data']['file']
    except:
        return
FileUtil.py 文件源码 项目:PyPPSPP 作者: justas- 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def create_file(path, size):
    """Create a file with random contents and a given size.

    The data is written in chunks of DATA_BLOCK bytes followed by one final
    shorter chunk for the remainder.  Progress and elapsed time are printed.
    """
    print('Creating file: {} having a size of {} Bytes'.format(path, size))
    started = time.time()

    with open(path, mode='wb') as out:
        full_blocks = size // DATA_BLOCK
        for _ in range(full_blocks):
            out.write(os.urandom(DATA_BLOCK))

        # Whatever the full-size blocks did not cover (if anything).
        tail = size - max(full_blocks, 0) * DATA_BLOCK
        if tail > 0:
            out.write(os.urandom(tail))

    print('File created in {} sec.'.format(time.time()-started))
checkresolvers.py 文件源码 项目:dnsbrute 作者: XiphosResearch 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def check_for_wildcards(args, server, name, rectype, tries=4):
    """
    Verify that the DNS server doesn't return wildcard results for domains
    which don't exist, it should correctly return NXDOMAIN.

    ``tries`` random labels are prepended to ``name`` and queried.  Any
    successful answer returns False immediately; otherwise the result is True
    only when more than half of the tries ended in NXDOMAIN / NoNameservers.
    """
    resolver = Resolver()
    resolver.timeout = resolver.lifetime = args.timeout
    resolver.nameservers = [server]

    # Build every random probe name up front.
    probes = []
    for _ in range(0, tries):
        label = base64.b32encode(os.urandom(random.randint(8, 10)))
        probes.append(label.strip('=').lower() + name)

    nx_count = 0
    for probe in probes:
        try:
            resolver.query(probe, rectype)
        except (NXDOMAIN, NoNameservers):
            nx_count += 1
        except DNSException:
            continue
        else:
            return False  # Any valid response = immediate fail!
    return nx_count > (tries / 2.0)
rpc_api.py 文件源码 项目:pogom-linux 作者: PokeHunterProject 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, auth_provider, device_info=None):
        """Set up per-instance RPC state and, once, the class-wide counters.

        ``RpcApi.START_TIME`` and ``RpcApi.RPC_ID`` are only initialised
        while they are still 0.
        """
        logger = logging.getLogger(__name__)
        self.log = logger
        self._auth_provider = auth_provider

        # mystical unknown6 - resolved by PokemonGoDev
        self._signal_agglom_gen = False
        self._signature_lib = None

        if RpcApi.START_TIME == 0:
            RpcApi.START_TIME = get_time(ms=True)

        if RpcApi.RPC_ID == 0:
            request_id = int(random.random() * 10 ** 18)
            RpcApi.RPC_ID = request_id
            logger.debug('Generated new random RPC Request id: %s', RpcApi.RPC_ID)

        # data fields for unknown6
        self.session_hash = os.urandom(32)
        self.token2 = random.randint(1,59)
        self.course = random.uniform(0, 360)

        self.device_info = device_info
randnum.py 文件源码 项目:oscars2016 作者: 0x0ece 项目源码 文件源码 阅读 87 收藏 0 点赞 0 评论 0
def read_random_bits(nbits):
    '''Reads 'nbits' random bits.

    If nbits isn't a whole number of bytes, one extra byte holding only the
    remaining low bits is placed in front of the whole random bytes.
    '''
    whole_bytes, leftover_bits = divmod(nbits, 8)

    chunk = os.urandom(whole_bytes)
    if leftover_bits <= 0:
        return chunk

    # Keep only 'leftover_bits' bits of one more random byte.
    partial = ord(os.urandom(1)) >> (8 - leftover_bits)
    return byte(partial) + chunk
views.py 文件源码 项目:oscars2016 作者: 0x0ece 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _make_flow(request, scopes, return_url=None):
    """Create a Web Server Flow and remember it in the session.

    A random CSRF token is stored in the session under ``_CSRF_KEY`` and
    embedded, with ``return_url``, in the flow's JSON ``state``.  The pickled
    flow is stored in the session under a key derived from that token.
    """
    # Generate a CSRF token to prevent malicious requests.
    token = hashlib.sha256(os.urandom(1024)).hexdigest()
    request.session[_CSRF_KEY] = token

    state = json.dumps({
        'csrf_token': token,
        'return_url': return_url,
    })

    settings = django_util.oauth2_settings
    flow = client.OAuth2WebServerFlow(
        client_id=settings.client_id,
        client_secret=settings.client_secret,
        scope=scopes,
        state=state,
        redirect_uri=request.build_absolute_uri(
            urlresolvers.reverse("google_oauth:callback")))

    request.session[_FLOW_KEY.format(token)] = pickle.dumps(flow)
    return flow
anonymine_engine.py 文件源码 项目:anonymine 作者: oskar-skog 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def init_field(self, startpoint):
        '''Place the mines and reveal the starting point.

        Internal details:
            It will wrap in `init_field2` in guessless mode.
            It will place the mines by itself when not in guessless
            mode: `self.n_mines` cells are picked uniformly at random
            from all cells except `startpoint` and its neighbours.
        '''
        import random

        if self.guessless:
            # Wrap in the best version.
            self.init_field2(startpoint)
            return

        safe = self.field.get_neighbours(startpoint) + [startpoint]
        cells = [cell for cell in self.field.all_cells() if cell not in safe]

        # Sorting on a one-byte random key gives only 256 distinct keys, and
        # the stable sort keeps tied cells in their original order, which
        # biases the selection.  An OS-entropy shuffle is uniform.
        random.SystemRandom().shuffle(cells)
        mines = cells[:self.n_mines]

        self.field.clear()
        self.field.fill(mines)
        self.field.reveal(startpoint)
lcdb.py 文件源码 项目:astrobase 作者: waqasbhatti 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def newcursor(self, dictcursor=False):
        '''
        This creates a DB cursor for the current DB connection using a
        randomly generated handle. Returns a tuple with cursor and handle.

        dictcursor = True -> use a cursor where each returned row can be
                             addressed as a dictionary by column name

        The cursor is also stored in self.cursors under the handle.
        '''

        handle = hashlib.sha256(os.urandom(12)).hexdigest()

        if dictcursor:
            self.cursors[handle] = self.connection.cursor(
                cursor_factory=psycopg2.extras.DictCursor
                )
        else:
            self.cursors[handle] = self.connection.cursor()

        # Return for both cursor kinds; the return used to sit inside the
        # else branch, so dictcursor=True yielded None.
        return (self.cursors[handle], handle)
populate.py 文件源码 项目:DeepSea 作者: SUSE 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def cluster_config(self):
        """
        Provide the default configuration for a cluster

        Does nothing when self.cluster is unset.  Otherwise the cluster
        directory under config/stack/default is created if missing and a
        cluster.yml with fsid, public/cluster networks and the available
        roles is handed to self.writer.
        """
        if not self.cluster:
            return

        cluster_dir = "{}/config/stack/default/{}".format(self.root_dir, self.cluster)
        if not os.path.isdir(cluster_dir):
            _create_dirs(cluster_dir, self.root_dir)

        contents = {
            'fsid': str(uuid.uuid3(uuid.NAMESPACE_DNS, os.urandom(32))),
            'public_network': ", ".join(str(net) for net in self.public_networks),
            'cluster_network': ", ".join(str(net) for net in self.cluster_networks),
            'available_roles': self.available_roles,
        }

        self.writer.write("{}/cluster.yml".format(cluster_dir), contents)
test.py 文件源码 项目:storjspec 作者: StorjRND 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_send_receive(self):
        """Each sender delivers one message to its paired receiver.

        The shuffled swarm is split in two halves that are paired up; the
        receiver's queue must be empty before the send, contain exactly the
        sent message afterwards, and be empty again once it has been listed.
        """
        random.shuffle(self.swarm)
        # Floor division keeps the slice index an int on Python 3 as well
        # ('/' would produce a float there and the slice would raise).
        half = len(self.swarm) // 2
        senders = self.swarm[:half]
        receivers = self.swarm[half:]
        for sender, receiver in zip(senders, receivers):
            message = binascii.hexlify(os.urandom(64))

            # check queue previously empty
            self.assertFalse(bool(receiver.message_list()))

            # send message
            self.assertTrue(sender.message_send(receiver.dht_id(), message))

            # check received
            received = receiver.message_list()
            self.assertTrue(sender.dht_id() in received)
            messages = received[sender.dht_id()]
            self.assertTrue(len(messages) == 1)
            self.assertEqual(messages[0], message)

            # check queue empty after call to message_list
            self.assertFalse(bool(receiver.message_list()))
test.py 文件源码 项目:storjspec 作者: StorjRND 项目源码 文件源码 阅读 74 收藏 0 点赞 0 评论 0
def test_ordering(self):
        """Three messages from one sender are listed in the order sent."""
        random.shuffle(self.swarm)
        sender, receiver = self.swarm[0], self.swarm[-1]

        # send messages
        sent = [binascii.hexlify(os.urandom(64)) for _ in range(3)]
        for message in sent:
            self.assertTrue(sender.message_send(receiver.dht_id(), message))

        # check received in order
        received = receiver.message_list()
        self.assertTrue(sender.dht_id() in received)
        messages = received[sender.dht_id()]
        for position, expected in enumerate(sent):
            self.assertEqual(messages[position], expected)
test.py 文件源码 项目:storjspec 作者: StorjRND 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_flood(self):
        """An event published by one random peer reaches every subscriber."""
        # every node subscribes and should receive the event
        topic = "test_flood_{0}".format(binascii.hexlify(os.urandom(32)))
        for subscriber in self.swarm:
            subscriber.pubsub_subscribe(topic)

        # wait until subscriptions propagate
        time.sleep(SLEEP_TIME)

        # send event
        publisher = random.choice(self.swarm)
        event = binascii.hexlify(os.urandom(32))
        publisher.pubsub_publish(topic, event)

        # wait until event propagates
        time.sleep(SLEEP_TIME)

        # check all peers received the event
        for node in self.swarm:
            self.assertEqual(node.pubsub_events(topic), [event])
test.py 文件源码 项目:storjspec 作者: StorjRND 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_multihop(self):
        """An event published by a sender reaches its paired subscriber.

        The shuffled swarm is split in two halves that are paired up; each
        receiver subscribes to a fresh topic and must see exactly the one
        event its sender publishes.
        """
        random.shuffle(self.swarm)
        # Floor division keeps the slice index an int on Python 3 as well
        # ('/' would produce a float there and the slice would raise).
        half = len(self.swarm) // 2
        senders = self.swarm[:half]
        receivers = self.swarm[half:]
        for sender, receiver in zip(senders, receivers):

            # receiver subscribes to topic
            topic = "test_miltihop_{0}".format(binascii.hexlify(os.urandom(32)))
            receiver.pubsub_subscribe(topic)

            # wait until subscriptions propagate
            time.sleep(SLEEP_TIME)

            # send event
            event = binascii.hexlify(os.urandom(32))
            sender.pubsub_publish(topic, event)

            # wait until event propagates
            time.sleep(SLEEP_TIME)

            # check the receiver got the event
            events = receiver.pubsub_events(topic)
            self.assertEqual(events, [event])
auth_chain.py 文件源码 项目:shadowsocksR-b 作者: hao35954514 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def server_udp_pre_encrypt(self, buf, uid):
        """Wrap an outgoing UDP payload for the client.

        The payload is RC4-encrypted with a key built from the user key and
        an HMAC of 7 random auth bytes, followed by random filler, the auth
        bytes and the first byte of an HMAC over everything before it.
        """
        info = self.server_info
        if uid in info.users:
            user_key = info.users[uid]
        elif not info.users:
            # No users configured at all.
            user_key = info.key
        else:
            user_key = info.recv_iv

        authdata = os.urandom(7)
        md5data = hmac.new(info.key, authdata, self.hashfunc).digest()
        rand_len = self.udp_rnd_data_len(md5data, self.random_server)

        rc4_key = to_bytes(base64.b64encode(user_key)) + to_bytes(base64.b64encode(md5data))
        encryptor = encrypt.Encryptor(rc4_key, 'rc4')

        packet = encryptor.encrypt(buf) + os.urandom(rand_len) + authdata
        return packet + hmac.new(user_key, packet, self.hashfunc).digest()[:1]
auth.py 文件源码 项目:shadowsocksR-b 作者: hao35954514 项目源码 文件源码 阅读 70 收藏 0 点赞 0 评论 0
def __init__(self, method, hashfunc):
        """Initialise protocol state; salt and method name follow hashfunc.

        With ``hashlib.md5`` the md5 variants are used, with any other hash
        function the sha1 variants.
        """
        super(auth_aes128_sha1, self).__init__(method)
        uses_md5 = hashfunc == hashlib.md5

        self.hashfunc = hashfunc
        self.recv_buf = b''
        self.unit_len = 8100
        self.raw_trans = False
        self.has_sent_header = False
        self.has_recv_header = False
        self.client_id = 0
        self.connection_id = 0
        self.max_time_dif = 60 * 60 * 24  # time dif (second) setting
        self.salt = b"auth_aes128_md5" if uses_md5 else b"auth_aes128_sha1"
        self.no_compatible_method = "auth_aes128_md5" if uses_md5 else 'auth_aes128_sha1'

        # Random value in [0, 1024) taken from two random bytes.
        (random_word,) = struct.unpack('>H', os.urandom(2))
        self.extra_wait_size = random_word % 1024

        self.pack_id = 1
        self.recv_id = 1
        self.user_id = None
        self.user_key = None
        self.last_rnd_len = 0
        self.overhead = 9
protocol.py 文件源码 项目:privcount 作者: privcount 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def writeConfiguredCookieFile(self, cookie_string = None):
        '''
        Write a cookie value to the configured cookie file.
        When cookie_string is None, a random value of
        TorControlProtocol.SAFECOOKIE_LENGTH bytes is generated.
        Return the value written to the file, or None if there is no cookie
        file, or if writing the file fails.
        '''
        cookie_file = self.getConfiguredCookieFile()
        if cookie_file is None:
            return None

        if cookie_string is None:
            cookie_string = urandom(TorControlProtocol.SAFECOOKIE_LENGTH)

        try:
            with open(cookie_file, 'w') as f:
                f.write(cookie_string)
        except IOError as e:
            logging.warning("Disabling SAFECOOKIE authentication, writing cookie file '{}' failed with error: {}"
                            .format(cookie_file, e))
            return None

        # sanity check: this will fail in write-only environments
        written = TorControlProtocol.readCookieFile(cookie_file)
        assert cookie_string == written
        return cookie_string
test_cache_system.py 文件源码 项目:zmirror 作者: aploium 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_io_and_many_files(self):
        """Store and read back 10000 random 16 KiB objects, then delete the cache.

        After the cache object is deleted none of its item files may remain.
        """
        import os
        from time import time

        item_count = 10000

        io_started = time()
        for key in range(item_count):
            blob = os.urandom(16 * 1024)
            self.cache.put_obj(key, blob, info_dict={"bin": blob})
        for key in range(item_count):
            info = self.cache.get_info(key)
            stored = self.cache.get_obj(key)
            self.assertEqual(info['bin'], stored)
        print("test_io_and_many_files IO total time:", time() - io_started)

        # test clean delete
        cache_paths = [entry[0] for entry in self.cache.items_dict.values()]
        delete_started = time()
        del self.cache
        print("test_io_and_many_files DELETE ALL total time:", time() - delete_started)
        for path in cache_paths:
            self.assertFalse(os.path.exists(path), msg=path)
test_encoding.py 文件源码 项目:aws-encryption-sdk-cli 作者: awslabs 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_base64io_encode_file(tmpdir):
    """Streaming a file through Base64IO line by line matches b64encode."""
    raw = os.urandom(1024 * 1024)
    expected = base64.b64encode(raw)
    plaintext_path = str(tmpdir.join('plaintext'))
    encoded_path = str(tmpdir.join('base64_plaintext'))

    with open(plaintext_path, 'wb') as writer:
        writer.write(raw)

    with open(plaintext_path, 'rb') as source, open(encoded_path, 'wb') as target:
        with Base64IO(target) as encoder:
            for chunk in source:
                encoder.write(chunk)

    with open(encoded_path, 'rb') as reader:
        encoded = reader.read()

    assert encoded == expected
test_i_aws_encryption_sdk_cli.py 文件源码 项目:aws-encryption-sdk-cli 作者: awslabs 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_encrypt_with_metadata_output_write_to_file(tmpdir):
    """Encrypting with --metadata-output <file> writes the expected JSON."""
    source_file = tmpdir.join('source_plaintext')
    source_file.write_binary(os.urandom(1024))
    target_file = tmpdir.join('ciphertext')
    metadata_file = tmpdir.join('metadata')

    command = encrypt_args_template(metadata=True).format(
        source=str(source_file),
        target=str(target_file),
        metadata='--metadata-output ' + str(metadata_file)
    )
    aws_encryption_sdk_cli.cli(shlex.split(command, posix=not is_windows()))

    output_metadata = json.loads(metadata_file.read())
    for key, value in (('a', 'b'), ('c', 'd')):
        assert output_metadata['header']['encryption_context'][key] == value
    assert output_metadata['mode'] == 'encrypt'
    assert output_metadata['input'] == str(source_file)
    assert output_metadata['output'] == str(target_file)
test_i_aws_encryption_sdk_cli.py 文件源码 项目:aws-encryption-sdk-cli 作者: awslabs 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_encrypt_with_metadata_full_file_path(tmpdir):
    """Metadata reports full paths even when the CLI got relative names."""
    source_name = 'source_plaintext'
    target_name = 'ciphertext'

    source_file = tmpdir.join(source_name)
    source_file.write_binary(os.urandom(1024))
    target_file = tmpdir.join(target_name)
    metadata_file = tmpdir.join('metadata')

    expected_input = str(source_file)
    expected_output = str(target_file)

    command = encrypt_args_template(metadata=True).format(
        source=source_name,
        target=target_name,
        metadata='--metadata-output ' + str(metadata_file)
    )

    # Run from inside tmpdir so the bare file names resolve there.
    with tmpdir.as_cwd():
        aws_encryption_sdk_cli.cli(shlex.split(command, posix=not is_windows()))

    output_metadata = json.loads(metadata_file.read())
    assert output_metadata['input'] == expected_input
    assert output_metadata['output'] == expected_output
test_i_aws_encryption_sdk_cli.py 文件源码 项目:aws-encryption-sdk-cli 作者: awslabs 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def test_encrypt_with_metadata_output_write_to_stdout(tmpdir, capsys):
    """Encrypting with --metadata-output - prints the expected JSON to stdout."""
    source_file = tmpdir.join('source_plaintext')
    source_file.write_binary(os.urandom(1024))
    target_file = tmpdir.join('ciphertext')

    command = encrypt_args_template(metadata=True).format(
        source=str(source_file),
        target=str(target_file),
        metadata='--metadata-output -'
    )
    aws_encryption_sdk_cli.cli(shlex.split(command, posix=not is_windows()))

    captured_out, _captured_err = capsys.readouterr()
    output_metadata = json.loads(captured_out)
    for key, value in (('a', 'b'), ('c', 'd')):
        assert output_metadata['header']['encryption_context'][key] == value
    assert output_metadata['mode'] == 'encrypt'
    assert output_metadata['input'] == str(source_file)
    assert output_metadata['output'] == str(target_file)
test_i_aws_encryption_sdk_cli.py 文件源码 项目:aws-encryption-sdk-cli 作者: awslabs 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def test_file_to_file_decrypt_required_encryption_context_success(tmpdir, required_encryption_context):
    """Decrypting with the given --encryption-context restores the plaintext."""
    source_file = tmpdir.join('source_plaintext')
    encrypted_file = tmpdir.join('ciphertext')
    restored_file = tmpdir.join('decrypted')
    with open(str(source_file), 'wb') as handle:
        handle.write(os.urandom(1024))

    encrypt_command = encrypt_args_template().format(
        source=str(source_file),
        target=str(encrypted_file)
    )
    decrypt_command = decrypt_args_template().format(
        source=str(encrypted_file),
        target=str(restored_file)
    )
    decrypt_command += ' --encryption-context ' + required_encryption_context

    for command in (encrypt_command, decrypt_command):
        aws_encryption_sdk_cli.cli(shlex.split(command, posix=not is_windows()))

    assert filecmp.cmp(str(source_file), str(restored_file))
test_i_aws_encryption_sdk_cli.py 文件源码 项目:aws-encryption-sdk-cli 作者: awslabs 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_file_to_file_decrypt_required_encryption_context_fail(tmpdir, required_encryption_context):
    """Decrypting with the given --encryption-context is skipped.

    No decrypted file may be produced, and the metadata output must mark the
    operation as skipped with the missing-context reason.
    """
    source_file = tmpdir.join('source_plaintext')
    source_file.write_binary(os.urandom(1024))
    encrypted_file = tmpdir.join('ciphertext')
    metadata_file = tmpdir.join('metadata')
    restored_file = tmpdir.join('decrypted')

    encrypt_command = encrypt_args_template().format(
        source=str(source_file),
        target=str(encrypted_file)
    )
    decrypt_command = decrypt_args_template(metadata=True).format(
        source=str(encrypted_file),
        target=str(restored_file),
        metadata=' --metadata-output ' + str(metadata_file)
    )
    decrypt_command += ' --encryption-context ' + required_encryption_context

    for command in (encrypt_command, decrypt_command):
        aws_encryption_sdk_cli.cli(shlex.split(command, posix=not is_windows()))

    assert not restored_file.isfile()
    parsed_metadata = json.loads(metadata_file.read())
    assert parsed_metadata['skipped']
    assert parsed_metadata['reason'] == 'Missing encryption context key or value'
test_i_aws_encryption_sdk_cli.py 文件源码 项目:aws-encryption-sdk-cli 作者: awslabs 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_file_to_file_cycle(tmpdir):
    """Encrypting a file and decrypting the result restores the plaintext."""
    source_file = tmpdir.join('source_plaintext')
    encrypted_file = tmpdir.join('ciphertext')
    restored_file = tmpdir.join('decrypted')
    with open(str(source_file), 'wb') as handle:
        handle.write(os.urandom(1024))

    encrypt_command = encrypt_args_template().format(
        source=str(source_file),
        target=str(encrypted_file)
    )
    decrypt_command = decrypt_args_template().format(
        source=str(encrypted_file),
        target=str(restored_file)
    )

    for command in (encrypt_command, decrypt_command):
        aws_encryption_sdk_cli.cli(shlex.split(command, posix=not is_windows()))

    assert filecmp.cmp(str(source_file), str(restored_file))


问题


面经


文章

微信
公众号

扫码关注公众号