storage_uri.py 文件源码

python
阅读 15 收藏 0 点赞 0 评论 0

项目:depot_tools 作者: webrtc-uwp 项目源码 文件源码
def connect(self, access_key_id=None, secret_access_key=None, **kwargs):
        """
        Opens a connection to appropriate provider, depending on provider
        portion of URI. Requires Credentials defined in boto config file (see
        boto/pyami/config.py).
        @type storage_uri: StorageUri
        @param storage_uri: StorageUri specifying a bucket or a bucket+object
        @rtype: L{AWSAuthConnection<boto.gs.connection.AWSAuthConnection>}
        @return: A connection to storage service provider of the given URI.
        """
        connection_args = dict(self.connection_args or ())
        # Use OrdinaryCallingFormat instead of boto-default
        # SubdomainCallingFormat because the latter changes the hostname
        # that's checked during cert validation for HTTPS connections,
        # which will fail cert validation (when cert validation is enabled).
        # Note: the following import can't be moved up to the start of
        # this file else it causes a config import failure when run from
        # the resumable upload/download tests.
        from boto.s3.connection import OrdinaryCallingFormat
        connection_args['calling_format'] = OrdinaryCallingFormat()
        if (hasattr(self, 'suppress_consec_slashes') and
            'suppress_consec_slashes' not in connection_args):
            connection_args['suppress_consec_slashes'] = (
                self.suppress_consec_slashes)
        connection_args.update(kwargs)
        if not self.connection:
            if self.scheme in self.provider_pool:
                self.connection = self.provider_pool[self.scheme]
            elif self.scheme == 's3':
                from boto.s3.connection import S3Connection
                self.connection = S3Connection(access_key_id,
                                               secret_access_key,
                                               **connection_args)
                self.provider_pool[self.scheme] = self.connection
            elif self.scheme == 'gs':
                from boto.gs.connection import GSConnection
                self.connection = GSConnection(access_key_id,
                                               secret_access_key,
                                               **connection_args)
                self.provider_pool[self.scheme] = self.connection
            elif self.scheme == 'file':
                from boto.file.connection import FileConnection
                self.connection = FileConnection(self)
            else:
                raise InvalidUriError('Unrecognized scheme "%s"' %
                                      self.scheme)
        self.connection.debug = self.debug
        return self.connection
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号