def get_file_header(meta, filename):
    """
    Fetches the header information of a file stored in S3.

    Resolves the S3 object for ``filename`` through
    ``SnowflakeS3Util._get_s3_object`` and issues an HTTP HEAD request
    for it. The outcome is always recorded in ``meta[u'result_status']``.

    :param meta: file meta object; ``meta[u'result_status']`` is
        updated in place with the outcome of the request.
    :param filename: name of the file, passed together with ``meta``
        to ``SnowflakeS3Util._get_s3_object``.
    :return: a ``FileHeader`` built from the object's metadata on
        success (status ``UPLOADED``); a ``FileHeader`` whose fields are
        all None when the object is not found (status
        ``NOT_FOUND_FILE``); None when the token has to be renewed
        (status ``RENEW_TOKEN``) or on any other client error
        (status ``ERROR``).
    """
    logger = getLogger(__name__)
    akey = SnowflakeS3Util._get_s3_object(meta, filename)

    try:
        # HTTP HEAD request
        akey.load()
    except botocore.exceptions.ClientError as e:
        error_code = e.response[u'Error'][u'Code']

        if error_code == EXPIRED_TOKEN:
            logger.debug(u"AWS Token expired. Renew and retry")
            meta[u'result_status'] = ResultStatus.RENEW_TOKEN
            return None

        if error_code == u'404':
            logger.debug(u'not found. bucket: %s, path: %s',
                         akey.bucket_name, akey.key)
            meta[u'result_status'] = ResultStatus.NOT_FOUND_FILE
            # A missing file still yields a header object, just an empty one.
            return FileHeader(
                digest=None,
                content_length=None,
                encryption_metadata=None,
            )

        if error_code == u'400':
            logger.debug(u'Bad request, token needs to be renewed: %s. '
                         u'bucket: %s, path: %s',
                         e.response[u'Error'][u'Message'],
                         akey.bucket_name, akey.key)
            meta[u'result_status'] = ResultStatus.RENEW_TOKEN
            return None

        # Any other client error is reported as a generic failure.
        logger.debug(
            u"Failed to get metadata for %s, %s: %s",
            akey.bucket_name, akey.key, e)
        meta[u'result_status'] = ResultStatus.ERROR
        return None

    meta[u'result_status'] = ResultStatus.UPLOADED

    # Encryption metadata is only built when the object carries a key entry.
    if akey.metadata.get(AMZ_KEY):
        encryption_metadata = EncryptionMetadata(
            key=akey.metadata.get(AMZ_KEY),
            iv=akey.metadata.get(AMZ_IV),
            matdesc=akey.metadata.get(AMZ_MATDESC),
        )
    else:
        encryption_metadata = None

    return FileHeader(
        digest=akey.metadata.get(SFC_DIGEST),
        content_length=akey.content_length,
        encryption_metadata=encryption_metadata,
    )
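# Page navigation text carried over from the source web page
# ("Comment list", "Table of contents"); not part of the code.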