s3_util.py 文件源码

python
阅读 20 收藏 0 点赞 0 评论 0

项目:snowflake-connector-python 作者: snowflakedb 项目源码 文件源码
def get_file_header(meta, filename):
        """
        Fetches header information for a file stored on S3.

        The S3 object is resolved via SnowflakeS3Util._get_s3_object and
        loaded with an HTTP HEAD request. The outcome is always recorded in
        meta[u'result_status'].

        :param meta: file meta object; its u'result_status' entry is updated
        :param filename: file name passed to SnowflakeS3Util._get_s3_object
        :return: FileHeader built from the object's metadata on success
        (status UPLOADED); a FileHeader with all fields set to None when the
        object is not found (status NOT_FOUND_FILE); None when the token
        must be renewed (status RENEW_TOKEN) or on any other client error
        (status ERROR).
        """
        logger = getLogger(__name__)
        s3_object = SnowflakeS3Util._get_s3_object(meta, filename)

        try:
            # load() performs the HTTP HEAD request
            s3_object.load()
        except botocore.exceptions.ClientError as err:
            error_code = err.response[u'Error'][u'Code']

            if error_code == EXPIRED_TOKEN:
                logger.debug(u"AWS Token expired. Renew and retry")
                meta[u'result_status'] = ResultStatus.RENEW_TOKEN
                return None

            if error_code == u'404':
                # A missing file still yields a FileHeader, just an empty one
                logger.debug(
                    u'not found. bucket: %s, path: %s',
                    s3_object.bucket_name, s3_object.key)
                meta[u'result_status'] = ResultStatus.NOT_FOUND_FILE
                return FileHeader(
                    digest=None,
                    content_length=None,
                    encryption_metadata=None,
                )

            if error_code == u'400':
                # A bad request is handled the same way as an expired token
                logger.debug(
                    u'Bad request, token needs to be renewed: %s. '
                    u'bucket: %s, path: %s',
                    err.response[u'Error'][u'Message'],
                    s3_object.bucket_name, s3_object.key)
                meta[u'result_status'] = ResultStatus.RENEW_TOKEN
                return None

            # Any other client error code
            logger.debug(
                u"Failed to get metadata for %s, %s: %s",
                s3_object.bucket_name, s3_object.key, err)
            meta[u'result_status'] = ResultStatus.ERROR
            return None

        meta[u'result_status'] = ResultStatus.UPLOADED

        object_metadata = s3_object.metadata
        # Encryption metadata is only built when an encryption key is present
        encryption_metadata = None
        if object_metadata.get(AMZ_KEY):
            encryption_metadata = EncryptionMetadata(
                key=object_metadata.get(AMZ_KEY),
                iv=object_metadata.get(AMZ_IV),
                matdesc=object_metadata.get(AMZ_MATDESC),
            )

        return FileHeader(
            digest=object_metadata.get(SFC_DIGEST),
            content_length=s3_object.content_length,
            encryption_metadata=encryption_metadata
        )
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号