python类utils()的实例源码

auth.py 文件源码 项目:alfred-ec2 作者: SoMuchToGrok 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def add_auth(self, http_request, **kwargs):
        """
        Sign ``http_request`` in place using query-string signing.

        The access key, signature version and a fresh timestamp are written
        into the request parameters, ``self._calc_signature`` produces the
        encoded query string and its signature, and the signed query string
        is placed in the body (POST) or appended to the path (any other
        method).

        :param http_request: The request to sign; its ``params``,
            ``headers``, ``body`` and ``path`` are modified.
        :param kwargs: Accepted but not used by this method.
        """
        request_headers = http_request.headers
        query_params = http_request.params
        query_params['AWSAccessKeyId'] = self._provider.access_key
        query_params['SignatureVersion'] = self.SignatureVersion
        query_params['Timestamp'] = boto.utils.get_ts()
        encoded_qs, sig = self._calc_signature(
            http_request.params, http_request.method,
            http_request.auth_path, http_request.host)
        boto.log.debug('query_string: %s Signature: %s' % (encoded_qs, sig))
        signed_qs = encoded_qs + '&Signature=' + urllib.parse.quote_plus(sig)
        if http_request.method != 'POST':
            http_request.body = ''
            # A retried request still carries the query string from the
            # previous attempt; strip it before appending the new one.
            http_request.path = http_request.path.split('?')[0]
            http_request.path = http_request.path + '?' + signed_qs
            return
        # POST: the signed parameters travel in a form-encoded body.
        request_headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'
        http_request.body = signed_qs
        http_request.headers['Content-Length'] = str(len(http_request.body))
auth.py 文件源码 项目:alfred-ec2 作者: SoMuchToGrok 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _calc_signature(self, params, verb, path, server_name):
        """
        Build the encoded query string for ``params`` and sign it.

        ``params`` is modified in place: ``SignatureMethod`` is always set,
        and ``SecurityToken`` is added when the provider has a security
        token.

        :param params: Mapping of request parameters.
        :param verb: HTTP method, used as the first line of the string to
            sign.
        :param path: Request path, used as the third line.
        :param server_name: Host name; lower-cased for the second line.
        :return: ``(qs, b64)`` -- the encoded query string and the
            base64-encoded HMAC digest of the string to sign.
        """
        boto.log.debug('using _calc_signature_2')
        to_sign = '%s\n%s\n%s\n' % (verb, server_name.lower(), path)
        signer = self._get_hmac()
        params['SignatureMethod'] = self.algorithm()
        if self._provider.security_token:
            params['SecurityToken'] = self._provider.security_token

        def encode_pair(name):
            # One "name=value" element of the query string.
            raw = boto.utils.get_utf8_value(params[name])
            return (urllib.parse.quote(name, safe='') + '=' +
                    urllib.parse.quote(raw, safe='-_~'))

        # Parameters are emitted in sorted key order.
        qs = '&'.join(encode_pair(name) for name in sorted(params.keys()))
        boto.log.debug('query string: %s' % qs)
        to_sign += qs
        boto.log.debug('string_to_sign: %s' % to_sign)
        signer.update(to_sign.encode('utf-8'))
        digest_b64 = base64.b64encode(signer.digest())
        boto.log.debug('len(b64)=%d' % len(digest_b64))
        boto.log.debug('base64 encoded digest: %s' % digest_b64)
        return (qs, digest_b64)
auth.py 文件源码 项目:alfred-ec2 作者: SoMuchToGrok 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def add_auth(self, req, **kwargs):
        """
        Sign ``req`` in place, always carrying the signature in the URL.

        The access key, signature version and timestamp are added to the
        request parameters and signed via ``self._calc_signature``. For a
        POST the existing body is kept and only its headers are filled in;
        for any other method the body is cleared. In both cases the signed
        query string is appended to the path.

        :param req: The request to sign; ``params``, ``headers``, ``body``
            and ``path`` may be modified.
        :param kwargs: Accepted but not used by this method.
        """
        query_params = req.params
        query_params['AWSAccessKeyId'] = self._provider.access_key
        query_params['SignatureVersion'] = self.SignatureVersion
        query_params['Timestamp'] = boto.utils.get_ts()
        encoded_qs, sig = self._calc_signature(req.params, req.method,
                                               req.auth_path, req.host)
        boto.log.debug('query_string: %s Signature: %s' % (encoded_qs, sig))
        if req.method != 'POST':
            req.body = ''
        else:
            hdrs = req.headers
            hdrs['Content-Length'] = str(len(req.body))
            # Keep a caller-supplied content type; default to text/plain.
            hdrs['Content-Type'] = hdrs.get('Content-Type', 'text/plain')
        # A retried request still has the previous attempt's query string
        # on its path, so drop it before appending the new one.
        req.path = req.path.split('?')[0]
        signature_arg = '&Signature=' + urllib.parse.quote_plus(sig)
        req.path = req.path + '?' + encoded_qs + signature_arg
provider.py 文件源码 项目:depot_tools 作者: webrtc-uwp 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _populate_keys_from_metadata_server(self):
        """
        Load temporary credentials from the instance metadata service.

        Reads the metadata with the timeout configured under
        ``Boto/metadata_service_timeout`` (default 1.0) and a single retry.
        When the metadata contains an ``iam`` section, the access key,
        secret key, security token and credential expiry time on this
        object are set from the first security-credentials entry. When no
        metadata or no ``iam`` section is available, nothing is changed.
        """
        boto.log.debug("Retrieving credentials from metadata server.")
        # get_instance_metadata is imported here because of a circular
        # dependency.
        from boto.utils import get_instance_metadata
        timeout = config.getfloat('Boto', 'metadata_service_timeout', 1.0)
        metadata = get_instance_metadata(timeout=timeout, num_retries=1)
        if not metadata or 'iam' not in metadata:
            return
        # I'm assuming there's only one role on the instance profile.
        # list() keeps the indexing valid when values() returns a view
        # rather than a list.
        credentials = metadata['iam']['security-credentials']
        security = list(credentials.values())[0]
        self._access_key = security['AccessKeyId']
        self._secret_key = self._convert_key_to_str(security['SecretAccessKey'])
        self._security_token = security['Token']
        expires_at = security['Expiration']
        self._credential_expiry_time = datetime.strptime(
            expires_at, "%Y-%m-%dT%H:%M:%SZ")
        # The expiry string ends in "Z" (UTC) and is parsed as a naive
        # datetime, so the remaining time must be measured against naive
        # UTC "now", not local time.
        boto.log.debug("Retrieved credentials will expire in %s at: %s",
                       self._credential_expiry_time - datetime.utcnow(),
                       expires_at)
bucket.py 文件源码 项目:depot_tools 作者: webrtc-uwp 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def set_cors_xml(self, cors_xml, headers=None):
        """
        Upload a CORS (Cross-Origin Resource Sharing) document for a bucket.

        :type cors_xml: str
        :param cors_xml: The XML document describing the desired CORS
            configuration.  See the S3 documentation for the exact syntax.

        :type headers: dict
        :param headers: Optional request headers.  When given, this dict is
            updated in place with ``Content-MD5`` and ``Content-Type``.

        :return: True when the service answers with status 200; any other
            status raises the provider's storage response error.
        """
        if headers is None:
            headers = {}
        xml_buffer = StringIO.StringIO(cors_xml)
        digests = boto.utils.compute_md5(xml_buffer)
        # Element 1 of the compute_md5 result is used as the Content-MD5.
        headers['Content-MD5'] = digests[1]
        headers['Content-Type'] = 'text/xml'
        response = self.connection.make_request(
            'PUT', self.name,
            data=xml_buffer.getvalue(),
            query_args='cors',
            headers=headers)
        body = response.read()
        if response.status != 200:
            raise self.connection.provider.storage_response_error(
                response.status, response.reason, body)
        return True
auth.py 文件源码 项目:depot_tools 作者: webrtc-uwp 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def add_auth(self, http_request, **kwargs):
        """
        Add an ``Authorization`` header to ``http_request``.

        A ``Date`` header is added when missing, the provider's security
        token header is set when a token is present, and the canonical
        string built by ``boto.utils.canonical_string`` is signed with
        ``self.sign_string``.

        :param http_request: The request to sign; only its ``headers`` are
            modified.
        :param kwargs: Accepted but not used by this method.
        """
        request_headers = http_request.headers
        verb = http_request.method
        resource_path = http_request.auth_path
        if 'Date' not in request_headers:
            request_headers['Date'] = formatdate(usegmt=True)

        provider = self._provider
        if provider.security_token:
            token_header = provider.security_token_header
            request_headers[token_header] = provider.security_token
        to_sign = boto.utils.canonical_string(verb, resource_path,
                                              request_headers, None,
                                              provider)
        boto.log.debug('StringToSign:\n%s' % to_sign)
        signature = self.sign_string(to_sign)
        scheme = provider.auth_header
        # Header value has the form "<scheme> <access key>:<signature>".
        credentials = "%s %s:%s" % (scheme, provider.access_key, signature)
        request_headers['Authorization'] = credentials
auth.py 文件源码 项目:depot_tools 作者: webrtc-uwp 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def add_auth(self, http_request, **kwargs):
        """
        Sign ``http_request`` in place using query-string signing.

        Adds the access key, signature version and timestamp to the request
        parameters, signs them through ``self._calc_signature`` and then
        stores the signed query string in the body (POST) or on the path
        (every other method).

        :param http_request: The request to sign; its ``params``,
            ``headers``, ``body`` and ``path`` are modified.
        :param kwargs: Accepted but not used by this method.
        """
        hdrs = http_request.headers
        request_params = http_request.params
        request_params['AWSAccessKeyId'] = self._provider.access_key
        request_params['SignatureVersion'] = self.SignatureVersion
        request_params['Timestamp'] = boto.utils.get_ts()
        query, sig = self._calc_signature(
            http_request.params, http_request.method,
            http_request.auth_path, http_request.host)
        boto.log.debug('query_string: %s Signature: %s' % (query, sig))
        sending_form = http_request.method == 'POST'
        if sending_form:
            hdrs['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'
            form_body = query + '&Signature=' + urllib.quote_plus(sig)
            http_request.body = form_body
            http_request.headers['Content-Length'] = str(len(http_request.body))
        else:
            http_request.body = ''
            # On a retry the path still ends with the query string of the
            # earlier attempt; cut it off and build a fresh one.
            http_request.path = http_request.path.split('?')[0]
            signature_arg = '&Signature=' + urllib.quote_plus(sig)
            http_request.path = http_request.path + '?' + query + signature_arg
auth.py 文件源码 项目:depot_tools 作者: webrtc-uwp 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def add_auth(self, req, **kwargs):
        """
        Sign ``req`` in place, always carrying the signature in the URL.

        Adds the access key, signature version and timestamp to the request
        parameters and signs them through ``self._calc_signature``. A POST
        keeps its body and gets ``Content-Length`` and a default
        ``Content-Type``; every other method has its body cleared. The
        signed query string is then appended to the path.

        :param req: The request to sign; ``params``, ``headers``, ``body``
            and ``path`` may be modified.
        :param kwargs: Accepted but not used by this method.
        """
        request_params = req.params
        request_params['AWSAccessKeyId'] = self._provider.access_key
        request_params['SignatureVersion'] = self.SignatureVersion
        request_params['Timestamp'] = boto.utils.get_ts()
        query, sig = self._calc_signature(req.params, req.method,
                                          req.auth_path, req.host)
        boto.log.debug('query_string: %s Signature: %s' % (query, sig))
        if req.method != 'POST':
            req.body = ''
        else:
            hdrs = req.headers
            hdrs['Content-Length'] = str(len(req.body))
            # Respect an existing content type; otherwise use text/plain.
            hdrs['Content-Type'] = hdrs.get('Content-Type', 'text/plain')
        # On a retry the path still ends with the query string of the
        # earlier attempt; cut it off and build a fresh one.
        req.path = req.path.split('?')[0]
        signature_arg = '&Signature=' + urllib.quote_plus(sig)
        req.path = req.path + '?' + query + signature_arg
aws.py 文件源码 项目:kuberdock-platform 作者: cloudlinux 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_aws_instance_meta(utils):
    """
    Collect the id, region and availability zone of the current instance.

    :param utils: Object providing ``get_instance_identity()`` and
        ``get_instance_metadata()`` (the caller in this file passes
        ``boto.utils``).
    :return: dict with the keys ``instance-id``, ``region`` and
        ``av-zone``.
    """
    identity_doc = utils.get_instance_identity()
    instance_meta = utils.get_instance_metadata()
    instance_id = instance_meta['instance-id']
    region = identity_doc['document']['region']
    zone = instance_meta['placement']['availability-zone']
    return {'instance-id': instance_id, 'region': region, 'av-zone': zone}
aws.py 文件源码 项目:kuberdock-platform 作者: cloudlinux 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def detach_ebs(aws_access_key_id, aws_secret_access_key, devices):
    """
    Detach the volumes of this instance whose mapping key is in ``devices``.

    The region and instance id are taken from the instance metadata, an
    EC2 connection to that region is opened with the given credentials,
    and ``detach_volume`` is called for every block-device mapping entry
    whose key appears in ``devices``.

    :param aws_access_key_id: Access key used for the EC2 connection.
    :param aws_secret_access_key: Secret key used for the EC2 connection.
    :param devices: Container of mapping keys to detach (presumably device
        names -- confirm against get_aws_block_device_mapping).
    """
    import boto
    import boto.ec2
    # Imported explicitly instead of relying on another boto module having
    # loaded boto.utils as a side effect.
    import boto.utils
    meta = get_aws_instance_meta(boto.utils)
    connection = boto.ec2.connect_to_region(
        meta['region'],
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key
    )
    bd_mapping = get_aws_block_device_mapping(
        connection, meta['instance-id']
    )
    # items() exists on both Python 2 and 3; iteritems() is Python 2 only.
    for key, bd in bd_mapping.items():
        if key in devices:
            connection.detach_volume(bd.volume_id)
test_password.py 文件源码 项目:cuny-bdif 作者: aristotle-tek 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_model(self,hashfunc=None):
        """
        Build and return a Model subclass with one ``password`` property.

        :param hashfunc: Passed straight through to ``PasswordProperty``;
            defaults to None.
        :return: The locally defined ``MyModel`` class (not an instance).
        """
        # NOTE(review): Password and hashlib are imported but not referenced
        # in this body.
        from boto.utils import Password
        from boto.sdb.db.model import Model
        from boto.sdb.db.property import PasswordProperty
        import hashlib
        class MyModel(Model):
            password=PasswordProperty(hashfunc=hashfunc)
        return MyModel
test_password.py 文件源码 项目:cuny-bdif 作者: aristotle-tek 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_custom_password_class(self):
        """
        Round-trip a model whose password property uses a custom Password
        subclass hashing with MD5.

        Checks that the password compares equal to the plain text both
        before saving and after reloading the object by id, and that its
        string form is the MD5 hex digest of the plain text.
        """
        from boto.utils import Password
        from boto.sdb.db.model import Model
        from boto.sdb.db.property import PasswordProperty
        import hashlib

        myhashfunc = hashlib.md5

        # Define a new Password class
        class MyPassword(Password):
            hashfunc = myhashfunc

        # Define a custom password property using the new Password class
        class MyPasswordProperty(PasswordProperty):
            data_type = MyPassword
            type_name = MyPassword.__name__

        # Define a model using the new password property
        class MyModel(Model):
            password = MyPasswordProperty()

        obj = MyModel()
        obj.password = 'bar'
        # hashlib needs bytes on Python 3; b'bar' is the same str on Python 2.
        expected = myhashfunc(b'bar').hexdigest()
        log.debug("\npassword=%s\nexpected=%s" % (obj.password, expected))
        self.assertEqual(obj.password, 'bar')
        obj.save()
        obj_id = obj.id
        # NOTE(review): presumably waits for the saved object to become
        # readable from the backing store -- confirm.
        time.sleep(5)
        obj = MyModel.get_by_id(obj_id)
        self.assertEqual(obj.password, 'bar')
        self.assertEqual(str(obj.password), expected)
connection.py 文件源码 项目:cuny-bdif 作者: aristotle-tek 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def build_base_http_request(self, method, path, auth_path,
                                params=None, headers=None, data='', host=None):
        """
        Assemble an ``HTTPRequest`` for this connection.

        ``params`` and ``headers`` are copied, so the caller's dicts are
        never modified.  A ``host`` header is added from
        ``self.host_header`` when one is configured and the headers do not
        already contain a matching entry.  When a proxy is in use the path
        is rewritten through ``prefix_proxy_to_path`` and the original path
        is kept as ``auth_path`` if none was supplied.

        :return: The new ``HTTPRequest``.
        """
        path = self.get_path(path)
        if auth_path is not None:
            auth_path = self.get_path(auth_path)
        params = {} if params is None else params.copy()
        headers = {} if headers is None else headers.copy()
        if self.host_header:
            if not boto.utils.find_matching_headers('host', headers):
                headers['host'] = self.host_header
        if not host:
            host = self.host
        if self.use_proxy:
            # Keep the un-prefixed path as auth_path when none was given.
            auth_path = auth_path or path
            path = self.prefix_proxy_to_path(path, host)
            has_proxy_credentials = self.proxy_user and self.proxy_pass
            # If is_secure, the proxy authentication header is not set
            # here; that was done in the CONNECT to the proxy.
            if has_proxy_credentials and not self.is_secure:
                headers.update(self.get_proxy_auth_header())
        return HTTPRequest(method, self.protocol, host, self.port,
                           path, auth_path, params, headers, data)
connection.py 文件源码 项目:cuny-bdif 作者: aristotle-tek 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_utf8_value(self, value):
        """Return ``boto.utils.get_utf8_value(value)``."""
        converted = boto.utils.get_utf8_value(value)
        return converted
bucket.py 文件源码 项目:cuny-bdif 作者: aristotle-tek 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _get_key_internal(self, key_name, headers, query_args_l):
        """
        Issue a HEAD request for ``key_name`` and build a key from the
        response headers.

        :param key_name: Name of the key to look up.
        :param headers: Headers passed on to ``make_request``.
        :param query_args_l: List of query arguments; joined with ``&``
            (None is sent when the list is empty).
        :return: ``(key, response)`` for any 2xx status, or
            ``(None, response)`` for a 404.
        :raises: the provider's storage response error for every other
            status.
        """
        query_args = '&'.join(query_args_l) or None
        response = self.connection.make_request('HEAD', self.name, key_name,
                                                headers=headers,
                                                query_args=query_args)
        response.read()
        # Allow any success status (2xx) - for example this lets us
        # support Range gets, which return status 206.  Floor division is
        # required: with true division 206 / 100 is 2.06, not 2.
        if response.status // 100 == 2:
            k = self.key_class(self)
            provider = self.connection.provider
            k.metadata = boto.utils.get_aws_metadata(response.msg, provider)
            for field in Key.base_fields:
                k.__dict__[field.lower().replace('-', '_')] = \
                    response.getheader(field)
            # the following machinations are a workaround to the fact that
            # apache/fastcgi omits the content-length header on HEAD
            # requests when the content-length is zero.
            # See http://goo.gl/0Tdax for more details.
            clen = response.getheader('content-length')
            k.size = int(clen) if clen else 0
            k.name = key_name
            k.handle_version_headers(response)
            k.handle_encryption_headers(response)
            k.handle_restore_headers(response)
            k.handle_addl_headers(response.getheaders())
            return k, response
        if response.status == 404:
            return None, response
        raise self.connection.provider.storage_response_error(
            response.status, response.reason, '')
bucket.py 文件源码 项目:cuny-bdif 作者: aristotle-tek 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def configure_lifecycle(self, lifecycle_config, headers=None):
        """
        Upload a lifecycle configuration for this bucket.

        :type lifecycle_config: :class:`boto.s3.lifecycle.Lifecycle`
        :param lifecycle_config: The lifecycle configuration to apply; it
            is serialised with its ``to_xml()`` method.

        :type headers: dict
        :param headers: Optional request headers.  When given, this dict is
            updated in place with ``Content-MD5`` and ``Content-Type``.

        :return: True when the service answers with status 200; any other
            status raises the provider's storage response error.
        """
        xml_buffer = StringIO(lifecycle_config.to_xml())
        digests = boto.utils.compute_md5(xml_buffer)
        if headers is None:
            headers = {}
        # Element 1 of the compute_md5 result is used as the Content-MD5.
        headers['Content-MD5'] = digests[1]
        headers['Content-Type'] = 'text/xml'
        response = self.connection.make_request(
            'PUT', self.name,
            data=xml_buffer.getvalue(),
            query_args='lifecycle',
            headers=headers)
        body = response.read()
        if response.status != 200:
            raise self.connection.provider.storage_response_error(
                response.status, response.reason, body)
        return True
auth.py 文件源码 项目:cuny-bdif 作者: aristotle-tek 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def query_string(self, http_request):
        """
        Return the request parameters as an encoded query string.

        Parameters are emitted in sorted name order as ``name=value`` pairs
        joined with ``&``; names are quoted with no safe characters and
        values with ``-_~`` as safe characters.
        """
        request_params = http_request.params
        # Lazily pair each name with its UTF-8 value so the per-parameter
        # order of work matches a plain loop.
        named_values = (
            (name, boto.utils.get_utf8_value(request_params[name]))
            for name in sorted(request_params.keys())
        )
        return '&'.join(
            urllib.parse.quote(name, safe='') + '=' +
            urllib.parse.quote(raw, safe='-_~')
            for name, raw in named_values)
auth.py 文件源码 项目:cuny-bdif 作者: aristotle-tek 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def canonical_query_string(self, http_request):
        """
        Return the canonical query string for ``http_request``.

        POST requests pass their parameters through the request body, so
        an empty string is returned for them.  Otherwise the parameters are
        emitted in sorted order as ``name=value`` pairs joined with ``&``,
        with ``-_.~`` as the safe characters for both parts.
        """
        if http_request.method == 'POST':
            return ""

        def quoted(text):
            return urllib.parse.quote(text, safe='-_.~')

        def render(name):
            # The value is converted before either part is quoted.
            raw = boto.utils.get_utf8_value(http_request.params[name])
            return '%s=%s' % (quoted(name), quoted(raw))

        return '&'.join(render(name) for name in sorted(http_request.params))
auth.py 文件源码 项目:cuny-bdif 作者: aristotle-tek 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def payload(self, http_request):
        """
        Return the SHA-256 hex digest of the request body.

        A body that has both ``seek`` and ``read`` is hashed through
        ``boto.utils.compute_hash``, which avoids reading the entire body
        into memory.  Any other body that is not ``bytes`` is encoded as
        UTF-8 before hashing.
        """
        content = http_request.body
        is_file_like = hasattr(content, 'seek') and hasattr(content, 'read')
        if is_file_like:
            hashes = boto.utils.compute_hash(content, hash_algorithm=sha256)
            return hashes[0]
        raw = content if isinstance(content, bytes) else content.encode('utf-8')
        return sha256(raw).hexdigest()
auth.py 文件源码 项目:cuny-bdif 作者: aristotle-tek 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def canonical_query_string(self, http_request):
        """
        Return the canonical query string for ``http_request``.

        Unlike a variant that returns an empty string for POST, this one
        always includes the request parameters, so query strings in the URL
        are part of the canonical query string.  Parameters are emitted in
        sorted order as ``name=value`` pairs joined with ``&``, with
        ``-_.~`` as the safe characters for both parts.
        """
        safe_chars = '-_.~'
        request_params = http_request.params
        # Convert each value before quoting, one parameter at a time.
        named_values = (
            (name, boto.utils.get_utf8_value(request_params[name]))
            for name in sorted(request_params)
        )
        return '&'.join(
            '%s=%s' % (urllib.parse.quote(name, safe=safe_chars),
                       urllib.parse.quote(raw, safe=safe_chars))
            for name, raw in named_values)


问题


面经


文章

微信
公众号

扫码关注公众号