python类b64decode()的实例源码

auth.py 文件源码 项目:django-heartbeat 作者: pbs 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def auth(func):
    @wraps(func)
    def _decorator(request, *args, **kwargs):
        auth = get_auth()
        if auth.get('disable', False) is True:
            return func(request, *args, **kwargs)
        if 'authorized_ips' in auth:
            ip = get_client_ip(request)
            if is_authorized(ip, auth['authorized_ips']):
                return func(request, *args, **kwargs)
        prepare_credentials(auth)
        if request.META.get('HTTP_AUTHORIZATION'):
            authmeth, auth = request.META['HTTP_AUTHORIZATION'].split(' ')
            if authmeth.lower() == 'basic':
                auth = base64.b64decode(auth).decode('utf-8')
                username, password = auth.split(':')
                if (username == HEARTBEAT['auth']['username'] and
                        password == HEARTBEAT['auth']['password']):
                    return func(request, *args, **kwargs)

        response = HttpResponse(
            "Authentication failed", status=401)
        response['WWW-Authenticate'] = 'Basic realm="Welcome to 1337"'
        return response
    return _decorator
payload_code.py 文件源码 项目:Stitch 作者: nathanlopez 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def get_encryption():
    return '''
import base64
from Crypto import Random
from Crypto.Cipher import AES

abbrev = '{2}'
{0} = base64.b64decode('{1}')

def encrypt(raw):
    iv = Random.new().read( AES.block_size )
    cipher = AES.new({0}, AES.MODE_CFB, iv )
    return (base64.b64encode( iv + cipher.encrypt( raw ) ) )

def decrypt(enc):
    enc = base64.b64decode(enc)
    iv = enc[:16]
    cipher = AES.new({0}, AES.MODE_CFB, iv )
    return cipher.decrypt( enc[16:] )
'''.format(st_obf[0],aes_encoded,aes_abbrev)

################################################################################
#                       st_protocol.py stitch_gen variables                    #
################################################################################
cookie_tool.py 文件源码 项目:docklet 作者: unias 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def parse_cookie(cookie, securekey):
    logger.info (">> parse cookie : %s" % cookie)
    parts = cookie.split('.')
    part1 = parts[0]
    part2 = '' if len(parts) < 2 else parts[1]
    try:
        text = str(base64.b64decode(part1.encode('ascii')), encoding='utf-8')
    except:
        logger.info ("decode cookie failed")
        return None
    logger.info ("cookie content : %s" % text)
    thatpart2 = hashlib.md5((text+securekey).encode('ascii')).hexdigest()
    logger.info ("hash from part1 : %s" % thatpart2)
    logger.info ("hash from part2 : %s" % part2)
    if part2 == thatpart2:
        result = json.loads(text)['name']
    else:
        result = None
    logger.info ("parse from cookie : %s" % result)
    return result
app.py 文件源码 项目:IotCenter 作者: panjanek 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def handleDeviceCall(self, deviceId, payload):
        deviceIdHex = binascii.hexlify(deviceId)
        self.logger.debug("Handling request from device {0} with payload {1}".format(deviceIdHex, payload))          
        payloadDict = json.loads(payload)
        session = self.service.sessions[deviceId]        
        model = DeviceModel().loadFromSession(session, self.deviceConfig, payloadDict)
        if "image" in payloadDict:
            imageType = payloadDict.get("type", "jpg")
            imageData = base64.b64decode(payloadDict["image"])
            fn = "{0}.{1}".format(datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S'), imageType)
            imageFile = os.path.join(self.getDeviceFolder(deviceIdHex, "images"), fn)
            self.logger.info("Received {0} bytes of image data from device {1} - saving to {2}".format(len(imageData), deviceIdHex, imageFile))
            with open(imageFile, 'wb+') as f:
                f.write(imageData)             
            model.newImageUrl = "/upload/{0}/images/{1}".format(deviceIdHex, fn)
        if "values" in payloadDict:
            for variable, value in payloadDict["values"].items():
                self.database.save(deviceIdHex, variable, session.protocol, session.clientAddr[0], session.lastUpdateTime, value)
        with session.lock:
            model.saveTrends(self.trends)
            model.computeTrends(self.trends)
        self.webServer.websocketSend(model.toJSON())
aws_auth.py 文件源码 项目:okta-awscli 作者: jmhale 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def choose_aws_role(assertion):
        """ Choose AWS role from SAML assertion """
        aws_attribute_role = 'https://aws.amazon.com/SAML/Attributes/Role'
        attribute_value_urn = '{urn:oasis:names:tc:SAML:2.0:assertion}AttributeValue'
        roles = []
        role_tuple = namedtuple("RoleTuple", ["principal_arn", "role_arn"])
        root = ET.fromstring(base64.b64decode(assertion))
        for saml2attribute in root.iter('{urn:oasis:names:tc:SAML:2.0:assertion}Attribute'):
            if saml2attribute.get('Name') == aws_attribute_role:
                for saml2attributevalue in saml2attribute.iter(attribute_value_urn):
                    roles.append(role_tuple(*saml2attributevalue.text.split(',')))

        for index, role in enumerate(roles):
            role_name = role.role_arn.split('/')[1]
            print("%d: %s" % (index+1, role_name))
        role_choice = input('Please select the AWS role: ')-1
        return roles[role_choice]
test_cli.py 文件源码 项目:Travis-Encrypt 作者: mandeep 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_password_empty_file():
    """Test the encrypt module's CLI function with an empty YAML file."""
    runner = CliRunner()
    with runner.isolated_filesystem():
        initial_data = {'language': 'python'}
        with open('file.yml', 'w') as file:
            ordered_dump(initial_data, file)

        result = runner.invoke(cli, ['mandeep', 'Travis-Encrypt', 'file.yml'],
                               'SUPER_SECURE_PASSWORD')
        assert not result.exception

        with open('file.yml') as file:
            config = ordered_load(file)

            assert 'password' in config
            assert 'secure' in config['password']
            assert config['language'] == 'python'
            assert base64.b64decode(config['password']['secure'])
test_cli.py 文件源码 项目:Travis-Encrypt 作者: mandeep 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_password_nonempty_file():
    """Test the encrypt module's CLI function with a nonempty YAML file.

    The YAML file includes information that needs to be overwritten."""
    runner = CliRunner()
    with runner.isolated_filesystem():

        initial_data = OrderedDict([('language', 'python'), ('dist', 'trusty'),
                                    ('password', {'secure': 'SUPER_INSECURE_PASSWORD'})])

        with open('file.yml', 'w') as file:
            ordered_dump(initial_data, file)

        result = runner.invoke(cli, ['mandeep', 'Travis-Encrypt', 'file.yml'],
                               'SUPER_SECURE_PASSWORD')
        assert not result.exception

        with open('file.yml') as file:
            config = ordered_load(file)

        assert config['language'] == 'python'
        assert config['dist'] == 'trusty'
        assert base64.b64decode(config['password']['secure'])
        assert ['language', 'dist', 'password'] == [key for key in config.keys()]
test_cli.py 文件源码 项目:Travis-Encrypt 作者: mandeep 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_deploy_empty_file():
    """Test the encrypt module's CLI function with the --deploy flag and an empty YAML file."""
    runner = CliRunner()
    with runner.isolated_filesystem():
        initial_data = {'language': 'python'}
        with open('file.yml', 'w') as file:
            ordered_dump(initial_data, file)

        result = runner.invoke(cli, ['--deploy', 'mandeep', 'Travis-Encrypt', 'file.yml'],
                               'SUPER_SECURE_PASSWORD')
        assert not result.exception

        with open('file.yml') as file:
            config = ordered_load(file)

        assert config['language'] == 'python'
        assert base64.b64decode(config['deploy']['password']['secure'])
test_cli.py 文件源码 项目:Travis-Encrypt 作者: mandeep 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_deploy_nonempty_file():
    """Test the encrypt module's CLI function with the --deploy flag and a nonempty YAML file.

    The YAML file includes information that needs to be overwritten."""
    runner = CliRunner()
    with runner.isolated_filesystem():

        initial_data = OrderedDict([('language', 'python'), ('dist', 'trusty'),
                                    ('deploy', {'password': {'secure': 'SUPER_INSECURE_PASSWORD'}})])

        with open('file.yml', 'w') as file:
            ordered_dump(initial_data, file)

        result = runner.invoke(cli, ['--deploy', 'mandeep', 'Travis-Encrypt', 'file.yml'],
                               'SUPER_SECURE_PASSWORD')

        assert not result.exception

        with open('file.yml') as file:
            config = ordered_load(file)

        assert config['language'] == 'python'
        assert config['dist'] == 'trusty'
        assert base64.b64decode(config['deploy']['password']['secure'])
        assert ['language', 'dist', 'deploy'] == [key for key in config.keys()]
test_cli.py 文件源码 项目:Travis-Encrypt 作者: mandeep 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_environment_variable_nonempty_file():
    """Test the encrypt module's CLI function with the --env flag and a nonempty YAML file.

    The YAML file includes information that needs to be overwritten."""
    runner = CliRunner()
    with runner.isolated_filesystem():

        initial_data = OrderedDict([('language', 'python'), ('dist', 'trusty'),
                                    ('env', {'global': {'secure': 'API_KEY="SUPER_INSECURE_KEY"'}})])

        with open('file.yml', 'w') as file:
            ordered_dump(initial_data, file)

        result = runner.invoke(cli, ['--env', 'mandeep', 'Travis-Encrypt', 'file.yml'],
                               'SUPER_SECURE_API_KEY')

        assert not result.exception

        with open('file.yml') as file:
            config = ordered_load(file)

        assert config['language'] == 'python'
        assert config['dist'] == 'trusty'
        assert base64.b64decode(config['env']['global']['secure'])
        assert ['language', 'dist', 'env'] == [key for key in config.keys()]
local.py 文件源码 项目:py-secretcrypt 作者: Zemanta 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _key():
    global __key
    if __key:
        return __key

    data_dir = _key_dir()
    key_file = os.path.join(data_dir, 'key')

    if os.path.isfile(key_file):
        with open(key_file, 'rb') as f:
            __key = base64.b64decode(f.read())
            return __key

    __key = base64.b64encode(os.urandom(16))
    try:
        os.makedirs(data_dir)
    except OSError as e:
        # errno17 == dir exists
        if e.errno != 17:
            raise
    with open(key_file, 'wb') as f:
        f.write(__key)
    return __key
client.py 文件源码 项目:plugin.video.exodus 作者: lastship 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def get(self, result):
        try:
            s = re.compile("S\s*=\s*'([^']+)").findall(result)[0]
            s = base64.b64decode(s)
            s = s.replace(' ', '')
            s = re.sub('String\.fromCharCode\(([^)]+)\)', r'chr(\1)', s)
            s = re.sub('\.slice\((\d+),(\d+)\)', r'[\1:\2]', s)
            s = re.sub('\.charAt\(([^)]+)\)', r'[\1]', s)
            s = re.sub('\.substr\((\d+),(\d+)\)', r'[\1:\1+\2]', s)
            s = re.sub(';location.reload\(\);', '', s)
            s = re.sub(r'\n', '', s)
            s = re.sub(r'document\.cookie', 'cookie', s)

            cookie = '' ; exec(s)
            self.cookie = re.compile('([^=]+)=(.*)').findall(cookie)[0]
            self.cookie = '%s=%s' % (self.cookie[0], self.cookie[1])

            return self.cookie
        except:
            pass
directdl.py 文件源码 项目:plugin.video.exodus 作者: lastship 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def resolve(self, url):
        try:
            b = urlparse.urlparse(url).netloc
            b = re.compile('([\w]+[.][\w]+)$').findall(b)[0]

            if not b in base64.b64decode(self.b_link): return url

            u, p, h = url.split('|')
            r = urlparse.parse_qs(h)['Referer'][0]
            #u += '&app_id=Exodus'

            c = self.request(r, output='cookie', close=False)
            result = self.request(u, post=p, referer=r, cookie=c)

            url = result.split('url=')
            url = [urllib.unquote_plus(i.strip()) for i in url]
            url = [i for i in url if i.startswith('http')]
            url = url[-1]

            return url
        except:
            return
caservice.py 文件源码 项目:fabric-sdk-py 作者: hyperledger 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def get_cainfo(self):
        """Query the ca service information.

        Args:

        Returns: The base64 encoded CA PEM file content for the caname

        """
        if self._ca_name != "":
            body_data = {"caname": self._ca_name}
        else:
            body_data = {}
        response = self._send_ca_post(path="cainfo", json=body_data)
        _logger.debug("Raw response json {0}".format(response))
        if (response['success'] and response['result']['CAName']
                == self._ca_name):
            return base64.b64decode(response['result']['CAChain'])
        else:
            raise ValueError("get_cainfo failed with errors {0}"
                             .format(response['errors']))
test_agent.py 文件源码 项目:os-xenapi 作者: openstack 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def test_inject_file_with_old_agent(self):
        tmp_arg_dict = FAKE_ARG_DICT
        request_id = tmp_arg_dict["id"]
        b64_path = tmp_arg_dict["b64_path"]
        b64_file = tmp_arg_dict["b64_contents"]
        raw_path = base64.b64decode(b64_path)
        raw_file = base64.b64decode(b64_file)
        new_b64 = base64.b64encode("%s,%s" % (raw_path, raw_file))
        self.mock_patch_object(self.agent,
                               '_get_agent_features',
                               'injectfile')
        tmp_arg_dict["value"] = json.dumps({"name": "injectfile",
                                            "value": new_b64})
        tmp_arg_dict["path"] = "data/host/%s" % request_id
        self.agent.inject_file(self.agent, FAKE_ARG_DICT)

        self.agent._wait_for_agent.assert_called_once()
        self.agent.xenstore.write_record.assert_called_with(self.agent,
                                                            tmp_arg_dict)
        self.agent._get_agent_features.assert_called_once()
test_cfpp.py 文件源码 项目:cfpp 作者: dcoker 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_kms(self):
        if "CFPP_RUN_KMS_TESTS" not in os.environ:
            return

        import boto3
        import botocore

        output = subprocess.check_output(["cfpp", "-s", "tests",
                                          "tests/kms_test.template"])
        parsed = json.loads(output)["Parameters"]
        without_context = parsed["EncryptedValue"]["Default"]
        with_context = parsed["EncryptedValueWithContext"]["Default"]

        kms = boto3.client('kms')
        kms.decrypt(CiphertextBlob=base64.b64decode(without_context))
        try:
            kms.decrypt(CiphertextBlob=with_context)
            self.fail("expected KMS to fail due to lack of context")
        except botocore.exceptions.ClientError:
            pass

        kms.decrypt(CiphertextBlob=base64.b64decode(with_context),
                    EncryptionContext={"ContextKey": "ContextValue"})
securecookie.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def unquote(cls, value):
        """Unquote the value for the cookie.  If unquoting does not work a
        :exc:`UnquoteError` is raised.

        :param value: the value to unquote.
        """
        try:
            if cls.quote_base64:
                value = base64.b64decode(value)
            if cls.serialization_method is not None:
                value = cls.serialization_method.loads(value)
            return value
        except Exception:
            # unfortunately pickle and other serialization modules can
            # cause pretty every error here.  if we get one we catch it
            # and convert it into an UnquoteError
            raise UnquoteError()
sessions.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def loads(self, value):
        def object_hook(obj):
            if len(obj) != 1:
                return obj
            the_key, the_value = next(iteritems(obj))
            if the_key == ' t':
                return tuple(the_value)
            elif the_key == ' u':
                return uuid.UUID(the_value)
            elif the_key == ' b':
                return b64decode(the_value)
            elif the_key == ' m':
                return Markup(the_value)
            elif the_key == ' d':
                return parse_date(the_value)
            return obj
        return json.loads(value, object_hook=object_hook)
api.py 文件源码 项目:v2ex-tornado-2 作者: coderyy 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def post(self):
        authenticated = False
        if 'Authorization' in self.request.headers:
            auth = self.request.headers['Authorization']
            decoded = base64.b64decode(auth[6:])
            authenticated = True
        if authenticated:
            self.response.out.write('OK')
        else:    
            site = GetSite()
            template_values = {}
            template_values['site'] = site
            template_values['message'] = "Authentication required"
            path = os.path.join(os.path.dirname(__file__), 'tpl', 'api')
            t = self.get_template(path,'error.json')
            output = t.render(template_values)
            self.set_status(401, 'Unauthorized')
            self.set_header('Content-type', 'application/json')
            self.set_header('WWW-Authenticate', 'Basic realm="' + site.domain + '"')
            self.write(output)

# Replies
# /api/replies/show.json
protocol.py 文件源码 项目:privcount 作者: privcount 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def handshake_cookie_verify(b64_cookie):
        '''
        If b64_cookie matches the expected format for a base-64 encoded
        privcount cookie, return the decoded cookie.
        Otherwise, return False.
        Raises an exception if the cookie is not correctly padded base64.
        '''
        if len(b64_cookie) != PrivCountProtocol.COOKIE_B64_BYTES:
            logging.warning("Invalid cookie: wrong encoded length {} expected {}"
                            .format(len(b64_cookie),
                                    PrivCountProtocol.COOKIE_B64_BYTES))
            return False
        cookie = b64decode(b64_cookie)
        if len(cookie) != PrivCountProtocol.COOKIE_BYTES:
            logging.warning("Invalid cookie: wrong decoded length {} expected {}"
                            .format(len(cookie),
                                    PrivCountProtocol.COOKIE_BYTES))
            return False
        return cookie
__init__.py 文件源码 项目:binja_dynamics 作者: nccgroup 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def decode(self, encoded):
        """ Takes the input from the text box and decodes it using the parameter
        set in the combobox """
        mode = self._encodings[self._decoder.currentIndex()]
        if mode == 'raw':
            return str(encoded)
        try:
            if mode == 'hex':
                return encoded.decode('hex')
            if mode == 'b64':
                return b64decode(encoded)
            if mode == 'py2':
                return eval(encoded)
        except:
            log_alert("Failed to decode input")
            return None
securecookie.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def unquote(cls, value):
        """Unquote the value for the cookie.  If unquoting does not work a
        :exc:`UnquoteError` is raised.

        :param value: the value to unquote.
        """
        try:
            if cls.quote_base64:
                value = base64.b64decode(value)
            if cls.serialization_method is not None:
                value = cls.serialization_method.loads(value)
            return value
        except Exception:
            # unfortunately pickle and other serialization modules can
            # cause pretty every error here.  if we get one we catch it
            # and convert it into an UnquoteError
            raise UnquoteError()
sessions.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def loads(self, value):
        def object_hook(obj):
            if len(obj) != 1:
                return obj
            the_key, the_value = next(iteritems(obj))
            if the_key == ' t':
                return tuple(the_value)
            elif the_key == ' u':
                return uuid.UUID(the_value)
            elif the_key == ' b':
                return b64decode(the_value)
            elif the_key == ' m':
                return Markup(the_value)
            elif the_key == ' d':
                return parse_date(the_value)
            return obj
        return json.loads(value, object_hook=object_hook)
client.py 文件源码 项目:sndlatr 作者: Schibum 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _generate_assertion(self):
      """Generate the assertion that will be used in the request."""
      now = long(time.time())
      payload = {
          'aud': self.token_uri,
          'scope': self.scope,
          'iat': now,
          'exp': now + SignedJwtAssertionCredentials.MAX_TOKEN_LIFETIME_SECS,
          'iss': self.service_account_name
      }
      payload.update(self.kwargs)
      logger.debug(str(payload))

      private_key = base64.b64decode(self.private_key)
      return crypt.make_signed_jwt(crypt.Signer.from_string(
          private_key, self.private_key_password), payload)

  # Only used in verify_id_token(), which is always calling to the same URI
  # for the certs.
proof.py 文件源码 项目:PTE 作者: pwn2winctf 项目源码 文件源码 阅读 47 收藏 0 点赞 0 评论 0
def proof_open(team, proof):
    assert isinstance(proof, bytes)
    proof = b64decode(proof)

    claimed_chall_id = proof[2*64:].decode('utf-8')
    claimed_chall = Challenge(claimed_chall_id)

    chall_pk = claimed_chall['pk']
    team_pk = team['sign_pk']

    membership_proof = pysodium.crypto_sign_open(proof, chall_pk)
    chall_id = pysodium.crypto_sign_open(membership_proof,
                                         team_pk).decode('utf-8')

    if claimed_chall_id != chall_id:
        raise ValueError('invalid proof')

    return claimed_chall
lambda_function.py 文件源码 项目:PTE 作者: pwn2winctf 项目源码 文件源码 阅读 46 收藏 0 点赞 0 评论 0
def setup_environment():
    root = os.getenv('LAMBDA_TASK_ROOT')
    bin_dir = os.path.join(root, 'bin')
    os.environ['PATH'] += ':' + bin_dir
    os.environ['GIT_EXEC_PATH'] = bin_dir

    ssh_dir = tempfile.mkdtemp()

    ssh_identity = os.path.join(ssh_dir, 'identity')
    with os.fdopen(os.open(ssh_identity, os.O_WRONLY | os.O_CREAT, 0o600),
                   'w') as f:
        f.write(base64.b64decode(os.getenv('SSH_IDENTITY')))

    ssh_config = os.path.join(ssh_dir, 'config')
    with open(ssh_config, 'w') as f:
        f.write('CheckHostIP no\n'
                'StrictHostKeyChecking yes\n'
                'IdentityFile %s\n'
                'UserKnownHostsFile %s\n' %
                (ssh_identity, os.path.join(root, 'known_hosts')))

    os.environ['GIT_SSH_COMMAND'] = 'ssh -F %s' % ssh_config
synthetics.py 文件源码 项目:newrelic-cli 作者: NativeInstruments 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_monitor_script(self, name):
        # we will need monitor ID. So get the monitor object first
        monitor = self.get_monitor_by_name(name)
        if not monitor:
            raise ItemNotFoundError('Monitor {} not found'.format(name))

        url = '{}/v3/monitors/{}/script'.format(self.base_url, monitor['id'])
        try:
            r = self._get(
                url,
                headers=self.default_headers,
                timeout=self.timeout
            )
        # Make error message more specific
        except ItemNotFoundError:
            raise ItemNotFoundError(
                'Script for monitor {} not found'
                .format(name)
            )
        script_base64 = r.json()['scriptText']
        script = base64.b64decode(script_base64)
        return script
cloud_verifier_common.py 文件源码 项目:python-keylime 作者: mit-ll 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def prepare_v(instance):
    # be very careful printing K, U, or V as they leak in logs stored on unprotected disks
    if common.DEVELOP_IN_ECLIPSE:
        logger.debug("b64_V (non encrypted): " + instance['v'])

    if instance.get('b64_encrypted_V',"") !="":
        b64_encrypted_V = instance['b64_encrypted_V']
        logger.debug("Re-using cached encrypted V")
    else:
        # encrypt V with the public key
        b64_encrypted_V = base64.b64encode(crypto.rsa_encrypt(crypto.rsa_import_pubkey(instance['public_key']),str(base64.b64decode(instance['v']))))
        instance['b64_encrypted_V'] = b64_encrypted_V

    logger.debug("b64_encrypted_V:" + b64_encrypted_V)
    post_data = {
              'encrypted_key': b64_encrypted_V
            }
    v_json_message = json.dumps(post_data)
    return v_json_message
tenant_webapp.py 文件源码 项目:python-keylime 作者: mit-ll 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def parse_data_uri(data_uri):
    if data_uri is None:
        return None

    data = []

    dataset_uris = data_uri.split("\n")
    for uri in dataset_uris:
        fpos = uri.find(",")
        if fpos == -1: 
            return None

        try:
            data.append(base64.b64decode(uri[fpos:]))
        except Exception as e:
            # skip bad data
            continue

    return data
vtpm_manager.py 文件源码 项目:python-keylime 作者: mit-ll 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def activate_group(uuid,keyblob):
    if common.STUB_TPM:
        return common.TEST_AES_REG_KEY

    group_id = get_group_num(uuid)
    priv_ca = base64.b64decode(keyblob)
    assert len(priv_ca) == 256
    logger.debug('Activating group number %d', group_id)
    body = vtpm_cmd(VTPM_ORD_GROUP_ACTIVATE,
                    struct.pack('>II', group_id, 256) + priv_ca)
    (algId, encScheme, size), body = unpack('>IHH', body)
    logger.info('Received Key. AlgID: 0x%x, encScheme: 0x%x, size: 0x%x',
                algId, encScheme, size)
    logger.info('Key: %r', body)
    assert size == len(body)
    return body


问题


面经


文章

微信
公众号

扫码关注公众号