def get_cors(self, headers=None):
    """
    Fetch the bucket's CORS configuration and return it as an object.

    :param headers: Passed through unchanged to ``get_cors_xml``.

    :rtype: :class:`boto.s3.cors.CORSConfiguration`
    :returns: A CORSConfiguration built by parsing the XML document
        that ``get_cors_xml`` returns for this bucket.
    """
    cors_xml = self.get_cors_xml(headers)
    cors_config = CORSConfiguration()
    # The SAX handler is given cors_config as its target object; the
    # parsed result is returned through that same object.
    sax_handler = handler.XmlHandler(cors_config, self)
    xml.sax.parseString(cors_xml, sax_handler)
    return cors_config
# Example source code for instances of the Python class s3()
def key_check(module, s3, bucket, obj, version=None):
    """
    Report whether ``obj`` (optionally a specific version) exists.

    :param module: Object exposing ``fail_json``; used to report errors.
    :param s3: Connection exposing ``lookup`` and
        ``provider.storage_response_error``.
    :param bucket: Bucket name handed to ``s3.lookup``.
    :param obj: Key name handed to ``get_key``.
    :param version: Optional version id forwarded as ``version_id``.
    :returns: True when ``get_key`` yields a truthy key, otherwise False.
    """
    try:
        found = s3.lookup(bucket).get_key(obj, version_id=version)
    except s3.provider.storage_response_error as err:
        # A 400 response for an explicitly requested version means that
        # version does not exist; every other error is reported.
        if version is None or err.status != 400:
            module.fail_json(msg=str(err))
        else:
            found = None
    return bool(found)
def keysum(module, s3, bucket, obj, version=None):
    """
    Return the remote checksum string derived from a key's ETag.

    :param module: Object exposing ``fail_json``; used when the ETag
        contains a ``-`` (multipart upload).
    :param s3: Connection exposing ``lookup``.
    :param bucket: Bucket name handed to ``s3.lookup``.
    :param obj: Key name handed to ``get_key``.
    :param version: Optional version id forwarded as ``version_id``.
    :returns: The ETag without its first and last character, or None
        when the key lookup yields nothing.
    """
    remote_key = s3.lookup(bucket).get_key(obj, version_id=version)
    if not remote_key:
        return None
    # Drop the first and last characters of the ETag (presumably the
    # surrounding quotes).
    remote_md5 = remote_key.etag[1:-1]
    if '-' in remote_md5:
        # Check for multipart, etag is not md5
        module.fail_json(msg="Files uploaded with multipart of s3 are not supported with checksum, unable to compute checksum.")
    return remote_md5
def bucket_check(module, s3, bucket):
    """
    Report whether ``s3.lookup`` finds the named bucket.

    :param module: Object exposing ``fail_json``; used to report errors.
    :param s3: Connection exposing ``lookup`` and
        ``provider.storage_response_error``.
    :param bucket: Bucket name to look up.
    :returns: True when the lookup result is truthy, otherwise False.
    """
    try:
        found = s3.lookup(bucket)
    except s3.provider.storage_response_error as err:
        module.fail_json(msg= str(err))
    return bool(found)
def create_bucket(module, s3, bucket, location=None):
    """
    Create a bucket and apply every ACL listed in the module parameters.

    :param module: Object exposing ``params`` (read for ``'permission'``)
        and ``fail_json``.
    :param s3: Connection exposing ``create_bucket`` and
        ``provider.storage_response_error``.
    :param bucket: Name of the bucket to create.
    :param location: Bucket location; ``Location.DEFAULT`` when None.
    :returns: True when the created bucket is truthy; otherwise the
        function falls through and returns None.
    """
    region = Location.DEFAULT if location is None else location
    try:
        bucket = s3.create_bucket(bucket, location=region)
        for acl in module.params.get('permission'):
            bucket.set_acl(acl)
    except s3.provider.storage_response_error as err:
        module.fail_json(msg= str(err))
    return True if bucket else None
def get_bucket(module, s3, bucket):
    """
    Return whatever ``s3.lookup`` yields for the named bucket.

    :param module: Object exposing ``fail_json``; used to report errors.
    :param s3: Connection exposing ``lookup`` and
        ``provider.storage_response_error``.
    :param bucket: Bucket name to look up.
    :returns: The lookup result; None if a storage response error was
        reported and ``fail_json`` returned.
    """
    try:
        found = s3.lookup(bucket)
    except s3.provider.storage_response_error as err:
        module.fail_json(msg= str(err))
    else:
        return found
def delete_key(module, s3, bucket, obj):
    """
    Delete ``obj`` from the named bucket and report the outcome.

    :param module: Object exposing ``exit_json`` (success report) and
        ``fail_json`` (error report).
    :param s3: Connection exposing ``lookup`` and
        ``provider.storage_response_error``.
    :param bucket: Name of the bucket holding the object.
    :param obj: Key name to delete.
    """
    try:
        # Keep the looked-up object in its own variable so the success
        # message reports the bucket *name*, not the bucket object's repr.
        bucket_obj = s3.lookup(bucket)
        bucket_obj.delete_key(obj)
        module.exit_json(msg="Object deleted from bucket %s" % bucket, changed=True)
    except s3.provider.storage_response_error as e:
        module.fail_json(msg=str(e))
def create_dirkey(module, s3, bucket, obj):
    """
    Create an empty key named ``obj`` to act as a virtual directory.

    :param module: Object exposing ``exit_json`` (success report) and
        ``fail_json`` (error report).
    :param s3: Connection exposing ``lookup`` and
        ``provider.storage_response_error``.
    :param bucket: Name of the bucket to create the key in.
    :param obj: Key name of the virtual directory.
    """
    try:
        target = s3.lookup(bucket)
        # The key is written with empty contents.
        target.new_key(obj).set_contents_from_string('')
        summary = "Virtual directory %s created in bucket %s" % (obj, target.name)
        module.exit_json(msg=summary, changed=True)
    except s3.provider.storage_response_error as err:
        module.fail_json(msg= str(err))
def upload_s3file(module, s3, bucket, obj, src, expiry, metadata, encrypt, headers):
    """
    Upload a local file to ``obj`` in the named bucket and report its URL.

    :param module: Object exposing ``params`` (read for ``'permission'``),
        ``exit_json`` and ``fail_json``.
    :param s3: Connection exposing ``lookup`` and ``provider``.
    :param bucket: Name of the destination bucket.
    :param obj: Destination key name.
    :param src: Path of the local file to upload.
    :param expiry: Value passed to ``key.generate_url``.
    :param metadata: Optional mapping of metadata names to values.
    :param encrypt: Forwarded as ``encrypt_key``.
    :param headers: Forwarded as ``headers`` to the upload call.
    """
    try:
        bucket = s3.lookup(bucket)
        key = bucket.new_key(obj)
        if metadata:
            for meta_key, meta_value in metadata.items():
                key.set_metadata(meta_key, meta_value)
        key.set_contents_from_filename(src, encrypt_key=encrypt, headers=headers)
        for acl in module.params.get('permission'):
            key.set_acl(acl)
        url = key.generate_url(expiry)
        module.exit_json(msg="PUT operation complete", url=url, changed=True)
    except (s3.provider.storage_copy_error,
            s3.provider.storage_response_error) as e:
        # The sibling helpers report storage_response_error; catching only
        # the copy error here let upload failures escape fail_json.
        module.fail_json(msg=str(e))
def download_s3str(module, s3, bucket, obj, version=None):
    """
    Read ``obj`` from the named bucket and report its contents.

    :param module: Object exposing ``exit_json`` (success report) and
        ``fail_json`` (error report).
    :param s3: Connection exposing ``lookup`` and ``provider``.
    :param bucket: Name of the bucket holding the object.
    :param obj: Key name to read.
    :param version: Optional version id forwarded as ``version_id``.
    """
    try:
        bucket = s3.lookup(bucket)
        key = bucket.get_key(obj, version_id=version)
        contents = key.get_contents_as_string()
        module.exit_json(msg="GET operation complete", contents=contents, changed=True)
    except (s3.provider.storage_copy_error,
            s3.provider.storage_response_error) as e:
        # The sibling helpers report storage_response_error; catching only
        # the copy error here let download failures escape fail_json.
        module.fail_json(msg=str(e))
def s3_create_new_bucket(self, bucketname, bucket_location="sa-east-1", debug=False):
    """
    Create an S3 bucket and return a result hash describing the outcome.

    :param bucketname: Name of the bucket to create.
    :param bucket_location: Location passed to ``create_bucket``.
    :param debug: Forwarded to ``self.aws_get_keys``.
    :returns: Whatever ``self.build_def_hash`` produces: a "SUCCESS"
        hash when a truthy bucket comes back, otherwise a
        "Display Error" hash carrying the failure message.
    """
    results = self.build_def_hash("Display Error", "Not Run", {})

    try:
        cur_keys = self.aws_get_keys(debug)

        # Imported lazily, inside the try, so that a missing boto install
        # is reported through the error hash like any other failure.
        import boto
        import boto.s3

        conn_s3 = boto.connect_s3(cur_keys["Key"], cur_keys["Secret"])
        bucket = conn_s3.create_bucket(bucketname, location=bucket_location)

        if bucket:
            self.lg("Created Bucket(" + str(bucketname) + ")", 6)
            results = self.build_def_hash("SUCCESS", "", {})
        else:
            results = self.build_def_hash("Display Error", "Failed to Create Bucket(" + str(bucketname) + ")", {})

    except Exception as k:
        err_msg = "Unable to Create new S3 Bucket(" + str(bucketname) + ") with Ex(" + str(k) + ")"
        self.lg("ERROR: " + str(err_msg), 0)
        results = self.build_def_hash("Display Error", err_msg, {})
    # end of try/ex

    return results
# end of s3_create_new_bucket
def s3_calculate_bucket_size(self, bucket_name, debug=False):
    """
    Sum the sizes of all keys in a bucket and return a result hash.

    :param bucket_name: Name of the bucket to measure.
    :param debug: Forwarded to ``self.aws_get_keys``.
    :returns: Whatever ``self.build_def_hash`` produces. On success the
        record holds "Size" (bytes), "SizeMB", "SizeGB" and "Files";
        on failure a "Display Error" hash with the partially filled
        record is returned.
    """
    record = {
        "Size": 0,
        "SizeMB": 0.0,
        "SizeGB": 0.0,
        "Files": 0,
    }
    results = self.build_def_hash("Display Error", "Not Run", record)

    try:
        # Imported lazily, inside the try, so that a missing boto install
        # is reported through the error hash like any other failure.
        import boto
        import boto.s3

        cur_keys = self.aws_get_keys(debug)
        conn_s3 = boto.connect_s3(cur_keys["Key"], cur_keys["Secret"])
        bucket = conn_s3.get_bucket(bucket_name, validate=False)

        for key in bucket:
            record["Size"] += int(key.size)
            record["Files"] += 1
        # end for all keys

        # SizeGB is derived from the already-formatted SizeMB value.
        record["SizeMB"] = float(self.to_float_str(record["Size"] / 1024.0 / 1024.0))
        record["SizeGB"] = float(self.to_float_str(record["SizeMB"] / 1024.0))

        results = self.build_def_hash("SUCCESS", "", record)

    except Exception as w:
        self.lg("Failed to Process S3 Bucket(" + str(bucket_name) + ") Size Ex(" + str(w) + ")", 0)
        # NOTE(review): the failure hash keeps the generic "Not Run" text;
        # the exception detail is only written to the log.
        results = self.build_def_hash("Display Error", "Not Run", record)

    return results
# end of s3_calculate_bucket_size
def set_key_class(self, key_class):
    """
    Choose the class this bucket uses when it builds key objects.

    The docs describe ``boto.s3.key.Key`` as the default. Supplying a
    subclass here means that ``bucket.new_key()`` and key listings
    return instances of that subclass instead.

    :type key_class: class
    :param key_class: A subclass of Key that can be more specific
    """
    self.key_class = key_class
def lookup(self, key_name, headers=None):
    """
    Deprecated alias for :meth:`get_key`; prefer calling that directly.

    :type key_name: string
    :param key_name: The name of the key to retrieve

    :param headers: Forwarded to ``get_key`` as ``headers``.

    :rtype: :class:`boto.s3.key.Key`
    :returns: Whatever ``get_key`` returns for ``key_name``.
    """
    key = self.get_key(key_name, headers=headers)
    return key
def new_key(self, key_name=None):
    """
    Build a key object for this bucket using ``self.key_class``.

    :type key_name: string
    :param key_name: The name of the key to create

    :rtype: :class:`boto.s3.key.Key` or subclass
    :returns: ``self.key_class(self, key_name)``

    :raises ValueError: if ``key_name`` is empty or None.
    """
    if key_name:
        return self.key_class(self, key_name)
    raise ValueError('Empty key names are not allowed')
def delete_key(self, key_name, headers=None, version_id=None,
               mfa_token=None):
    """
    Remove a key from the bucket, or only one version of it when
    ``version_id`` is given.

    :type key_name: string
    :param key_name: The key name to delete

    :param headers: Forwarded to ``_delete_key_internal``.

    :type version_id: string
    :param version_id: The version ID (optional)

    :type mfa_token: tuple or list of strings
    :param mfa_token: A tuple or list consisting of the serial
        number from the MFA device and the current value of the
        six-digit token associated with the device. This value is
        required anytime you are deleting versioned objects from a
        bucket that has the MFADelete option on the bucket.

    :rtype: :class:`boto.s3.key.Key` or subclass
    :returns: A key object holding information on what was
        deleted. The caller can see if a delete_marker was
        created or removed and what version_id the delete created
        or removed.

    :raises ValueError: if ``key_name`` is empty or None.
    """
    if key_name:
        delete_args = dict(headers=headers,
                           version_id=version_id,
                           mfa_token=mfa_token,
                           query_args_l=None)
        return self._delete_key_internal(key_name, **delete_args)
    raise ValueError('Empty key names are not allowed')
def configure_lifecycle(self, lifecycle_config, headers=None):
    """
    Upload a lifecycle configuration for this bucket.

    :type lifecycle_config: :class:`boto.s3.lifecycle.Lifecycle`
    :param lifecycle_config: The lifecycle configuration you want
        to configure for this bucket.

    :param headers: Optional request headers. ``Content-MD5`` and
        ``Content-Type`` are written into this mapping before the
        request is sent.

    :returns: True when the response status is 200.
    :raises: the provider's ``storage_response_error`` for any other
        response status.
    """
    lifecycle_xml = lifecycle_config.to_xml()
    buf = StringIO(lifecycle_xml)
    digest = boto.utils.compute_md5(buf)
    if headers is None:
        headers = {}
    # Element 1 of compute_md5's result is sent as Content-MD5
    # (presumably the base64-encoded digest).
    headers['Content-MD5'] = digest[1]
    headers['Content-Type'] = 'text/xml'
    response = self.connection.make_request(
        'PUT', self.name,
        data=buf.getvalue(),
        query_args='lifecycle',
        headers=headers)
    body = response.read()
    if response.status != 200:
        raise self.connection.provider.storage_response_error(
            response.status, response.reason, body)
    return True
def set_website_configuration(self, config, headers=None):
    """
    Serialize a website configuration object and apply it via
    ``set_website_configuration_xml``.

    :type config: boto.s3.website.WebsiteConfiguration
    :param config: Configuration data

    :param headers: Forwarded to ``set_website_configuration_xml``.

    :returns: Whatever ``set_website_configuration_xml`` returns.
    """
    config_xml = config.to_xml()
    return self.set_website_configuration_xml(config_xml, headers=headers)
def get_website_configuration_obj(self, headers=None):
    """Fetch the website configuration XML and parse it into a
    :class:`boto.s3.website.WebsiteConfiguration` object.

    :param headers: Forwarded to ``get_website_configuration_xml``.
    :returns: The WebsiteConfiguration object that was handed to the
        SAX handler during parsing.
    """
    raw_xml = self.get_website_configuration_xml(headers=headers)
    website_config = website.WebsiteConfiguration()
    sax_handler = handler.XmlHandler(website_config, self)
    xml.sax.parseString(raw_xml, sax_handler)
    return website_config
def set_cors(self, cors_config, headers=None):
    """
    Set the CORS for this bucket given a boto CORSConfiguration
    object.

    :type cors_config: :class:`boto.s3.cors.CORSConfiguration`
    :param cors_config: The CORS configuration you want
        to configure for this bucket.

    :param headers: Optional request headers, forwarded to
        ``set_cors_xml``.

    :returns: Whatever ``set_cors_xml`` returns.
    """
    # Forward ``headers``; previously the argument was accepted but
    # silently dropped.
    return self.set_cors_xml(cors_config.to_xml(), headers=headers)