# Imports assumed by these excerpts (inferred from the calls they make;
# the snippets below are methods and helpers gathered from several
# modules, so each original file's full import header is not reproduced):
import re
import tempfile
import urllib.request

import boto
from boto import storage_uri
from boto.exception import InvalidUriError


def test_provider_uri(self):
    for prov in ('gs', 's3'):
        uri_str = '%s://' % prov
        uri = boto.storage_uri(uri_str, validate=False,
                               suppress_consec_slashes=False)
        self.assertEqual(prov, uri.scheme)
        self.assertEqual(uri_str, uri.uri)
        self.assertFalse(hasattr(uri, 'versionless_uri'))
        self.assertEqual('', uri.bucket_name)
        self.assertEqual('', uri.object_name)
        self.assertEqual(None, uri.version_id)
        self.assertEqual(None, uri.generation)
        self.assertEqual(uri.names_provider(), True)
        self.assertEqual(uri.names_container(), True)
        self.assertEqual(uri.names_bucket(), False)
        self.assertEqual(uri.names_object(), False)
        self.assertEqual(uri.names_directory(), False)
        self.assertEqual(uri.names_file(), False)
        self.assertEqual(uri.is_stream(), False)
        self.assertEqual(uri.is_version_specific, False)

def test_bucket_uri_no_trailing_slash(self):
    for prov in ('gs', 's3'):
        uri_str = '%s://bucket' % prov
        uri = boto.storage_uri(uri_str, validate=False,
                               suppress_consec_slashes=False)
        self.assertEqual(prov, uri.scheme)
        self.assertEqual('%s/' % uri_str, uri.uri)
        self.assertFalse(hasattr(uri, 'versionless_uri'))
        self.assertEqual('bucket', uri.bucket_name)
        self.assertEqual('', uri.object_name)
        self.assertEqual(None, uri.version_id)
        self.assertEqual(None, uri.generation)
        self.assertEqual(uri.names_provider(), False)
        self.assertEqual(uri.names_container(), True)
        self.assertEqual(uri.names_bucket(), True)
        self.assertEqual(uri.names_object(), False)
        self.assertEqual(uri.names_directory(), False)
        self.assertEqual(uri.names_file(), False)
        self.assertEqual(uri.is_stream(), False)
        self.assertEqual(uri.is_version_specific, False)

def test_bucket_uri_with_trailing_slash(self):
    for prov in ('gs', 's3'):
        uri_str = '%s://bucket/' % prov
        uri = boto.storage_uri(uri_str, validate=False,
                               suppress_consec_slashes=False)
        self.assertEqual(prov, uri.scheme)
        self.assertEqual(uri_str, uri.uri)
        self.assertFalse(hasattr(uri, 'versionless_uri'))
        self.assertEqual('bucket', uri.bucket_name)
        self.assertEqual('', uri.object_name)
        self.assertEqual(None, uri.version_id)
        self.assertEqual(None, uri.generation)
        self.assertEqual(uri.names_provider(), False)
        self.assertEqual(uri.names_container(), True)
        self.assertEqual(uri.names_bucket(), True)
        self.assertEqual(uri.names_object(), False)
        self.assertEqual(uri.names_directory(), False)
        self.assertEqual(uri.names_file(), False)
        self.assertEqual(uri.is_stream(), False)
        self.assertEqual(uri.is_version_specific, False)

def test_non_versioned_object_uri(self):
    for prov in ('gs', 's3'):
        uri_str = '%s://bucket/obj/a/b' % prov
        uri = boto.storage_uri(uri_str, validate=False,
                               suppress_consec_slashes=False)
        self.assertEqual(prov, uri.scheme)
        self.assertEqual(uri_str, uri.uri)
        self.assertEqual(uri_str, uri.versionless_uri)
        self.assertEqual('bucket', uri.bucket_name)
        self.assertEqual('obj/a/b', uri.object_name)
        self.assertEqual(None, uri.version_id)
        self.assertEqual(None, uri.generation)
        self.assertEqual(uri.names_provider(), False)
        self.assertEqual(uri.names_container(), False)
        self.assertEqual(uri.names_bucket(), False)
        self.assertEqual(uri.names_object(), True)
        self.assertEqual(uri.names_directory(), False)
        self.assertEqual(uri.names_file(), False)
        self.assertEqual(uri.is_stream(), False)
        self.assertEqual(uri.is_version_specific, False)

def test_versioned_gs_object_uri(self):
    uri_str = 'gs://bucket/obj/a/b#1359908801674000'
    uri = boto.storage_uri(uri_str, validate=False,
                           suppress_consec_slashes=False)
    self.assertEqual('gs', uri.scheme)
    self.assertEqual(uri_str, uri.uri)
    self.assertEqual('gs://bucket/obj/a/b', uri.versionless_uri)
    self.assertEqual('bucket', uri.bucket_name)
    self.assertEqual('obj/a/b', uri.object_name)
    self.assertEqual(None, uri.version_id)
    self.assertEqual(1359908801674000, uri.generation)
    self.assertEqual(uri.names_provider(), False)
    self.assertEqual(uri.names_container(), False)
    self.assertEqual(uri.names_bucket(), False)
    self.assertEqual(uri.names_object(), True)
    self.assertEqual(uri.names_directory(), False)
    self.assertEqual(uri.names_file(), False)
    self.assertEqual(uri.is_stream(), False)
    self.assertEqual(uri.is_version_specific, True)

def test_versioned_s3_object_uri(self):
    uri_str = 's3://bucket/obj/a/b#eMuM0J15HkJ9QHlktfNP5MfA.oYR2q6S'
    uri = boto.storage_uri(uri_str, validate=False,
                           suppress_consec_slashes=False)
    self.assertEqual('s3', uri.scheme)
    self.assertEqual(uri_str, uri.uri)
    self.assertEqual('s3://bucket/obj/a/b', uri.versionless_uri)
    self.assertEqual('bucket', uri.bucket_name)
    self.assertEqual('obj/a/b', uri.object_name)
    self.assertEqual('eMuM0J15HkJ9QHlktfNP5MfA.oYR2q6S', uri.version_id)
    self.assertEqual(None, uri.generation)
    self.assertEqual(uri.names_provider(), False)
    self.assertEqual(uri.names_container(), False)
    self.assertEqual(uri.names_bucket(), False)
    self.assertEqual(uri.names_object(), True)
    self.assertEqual(uri.names_directory(), False)
    self.assertEqual(uri.names_file(), False)
    self.assertEqual(uri.is_stream(), False)
    self.assertEqual(uri.is_version_specific, True)

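# For reference, the two version-addressing syntaxes exercised by the two
# tests above (values taken straight from those tests): a gs '#' suffix
# that parses as a decimal integer becomes uri.generation, while an s3
# '#' suffix becomes the opaque string uri.version_id.
#
#   gs://bucket/obj/a/b#1359908801674000                  -> generation (int)
#   s3://bucket/obj/a/b#eMuM0J15HkJ9QHlktfNP5MfA.oYR2q6S  -> version_id (str)
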
def test_explicit_file_uri(self):
    tmp_dir = tempfile.tempdir or ''
    uri_str = 'file://%s' % urllib.request.pathname2url(tmp_dir)
    uri = boto.storage_uri(uri_str, validate=False,
                           suppress_consec_slashes=False)
    self.assertEqual('file', uri.scheme)
    self.assertEqual(uri_str, uri.uri)
    self.assertFalse(hasattr(uri, 'versionless_uri'))
    self.assertEqual('', uri.bucket_name)
    self.assertEqual(tmp_dir, uri.object_name)
    self.assertFalse(hasattr(uri, 'version_id'))
    self.assertFalse(hasattr(uri, 'generation'))
    self.assertFalse(hasattr(uri, 'is_version_specific'))
    self.assertEqual(uri.names_provider(), False)
    self.assertEqual(uri.names_bucket(), False)
    # Don't check uri.names_container(), uri.names_directory(),
    # uri.names_file(), or uri.names_object(), because for file URIs these
    # functions look at the file system and apparently unit tests run
    # chroot'd.
    self.assertEqual(uri.is_stream(), False)

def test_implicit_file_uri(self):
    tmp_dir = tempfile.tempdir or ''
    uri_str = '%s' % urllib.request.pathname2url(tmp_dir)
    uri = boto.storage_uri(uri_str, validate=False,
                           suppress_consec_slashes=False)
    self.assertEqual('file', uri.scheme)
    self.assertEqual('file://%s' % tmp_dir, uri.uri)
    self.assertFalse(hasattr(uri, 'versionless_uri'))
    self.assertEqual('', uri.bucket_name)
    self.assertEqual(tmp_dir, uri.object_name)
    self.assertFalse(hasattr(uri, 'version_id'))
    self.assertFalse(hasattr(uri, 'generation'))
    self.assertFalse(hasattr(uri, 'is_version_specific'))
    self.assertEqual(uri.names_provider(), False)
    self.assertEqual(uri.names_bucket(), False)
    # Don't check uri.names_container(), uri.names_directory(),
    # uri.names_file(), or uri.names_object(), because for file URIs these
    # functions look at the file system and apparently unit tests run
    # chroot'd.
    self.assertEqual(uri.is_stream(), False)

def test_gs_object_uri_contains_sharp_not_matching_version_syntax(self):
    uri_str = 'gs://bucket/obj#13a990880167400'
    uri = boto.storage_uri(uri_str, validate=False,
                           suppress_consec_slashes=False)
    self.assertEqual('gs', uri.scheme)
    self.assertEqual(uri_str, uri.uri)
    self.assertEqual('gs://bucket/obj#13a990880167400',
                     uri.versionless_uri)
    self.assertEqual('bucket', uri.bucket_name)
    self.assertEqual('obj#13a990880167400', uri.object_name)
    self.assertEqual(None, uri.version_id)
    self.assertEqual(None, uri.generation)
    self.assertEqual(uri.names_provider(), False)
    self.assertEqual(uri.names_container(), False)
    self.assertEqual(uri.names_bucket(), False)
    self.assertEqual(uri.names_object(), True)
    self.assertEqual(uri.names_directory(), False)
    self.assertEqual(uri.names_file(), False)
    self.assertEqual(uri.is_stream(), False)
    self.assertEqual(uri.is_version_specific, False)

def testHasVersion(self):
    uri = storage_uri("gs://bucket/obj")
    self.assertFalse(uri.has_version())
    uri.version_id = "versionid"
    self.assertTrue(uri.has_version())

    uri = storage_uri("gs://bucket/obj")
    # Generation triggers versioning.
    uri.generation = 12345
    self.assertTrue(uri.has_version())
    uri.generation = None
    self.assertFalse(uri.has_version())

    # Zero-generation counts as a version.
    uri = storage_uri("gs://bucket/obj")
    uri.generation = 0
    self.assertTrue(uri.has_version())

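# Note on the zero-generation case above: the assertions imply has_version()
# must compare against None rather than test truthiness, or generation=0
# would be missed. A sketch inferred from the assertions (not the actual
# boto source):
#
#   def has_version(self):
#       return (self.version_id is not None) or (self.generation is not None)
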
def testPropertiesUpdated(self):
    b = self._MakeBucket()
    bucket_uri = storage_uri("gs://%s" % b.name)
    key_uri = bucket_uri.clone_replace_name("obj")
    key_uri.set_contents_from_string("data1")

    self.assertRegexpMatches(str(key_uri.generation), r"[0-9]+")
    k = b.get_key("obj")
    self.assertEqual(k.generation, key_uri.generation)
    self.assertEqual(k.get_contents_as_string(), "data1")

    key_uri.set_contents_from_stream(StringIO.StringIO("data2"))
    self.assertRegexpMatches(str(key_uri.generation), r"[0-9]+")
    self.assertGreater(key_uri.generation, k.generation)
    k = b.get_key("obj")
    self.assertEqual(k.generation, key_uri.generation)
    self.assertEqual(k.get_contents_as_string(), "data2")

    key_uri.set_contents_from_file(StringIO.StringIO("data3"))
    self.assertRegexpMatches(str(key_uri.generation), r"[0-9]+")
    self.assertGreater(key_uri.generation, k.generation)
    k = b.get_key("obj")
    self.assertEqual(k.generation, key_uri.generation)
    self.assertEqual(k.get_contents_as_string(), "data3")

def test_cors_xml_storage_uri(self):
    """Test setting and getting of CORS XML documents with storage_uri."""
    # Create a new bucket.
    bucket = self._MakeBucket()
    bucket_name = bucket.name
    uri = storage_uri('gs://' + bucket_name)
    # Get the new bucket's CORS config and make sure it's empty.
    cors = re.sub(r'\s', '', uri.get_cors().to_xml())
    self.assertEqual(cors, CORS_EMPTY)
    # Set a CORS document on the new bucket.
    cors_obj = Cors()
    h = handler.XmlHandler(cors_obj, None)
    xml.sax.parseString(CORS_DOC, h)
    uri.set_cors(cors_obj)
    cors = re.sub(r'\s', '', uri.get_cors().to_xml())
    self.assertEqual(cors, CORS_DOC)

def test_storage_uri_regionless(self):
    # First, create a bucket in a different region.
    conn = S3Connection(
        host='s3-us-west-2.amazonaws.com'
    )
    bucket_name = 'keytest-%d' % int(time.time())
    bucket = conn.create_bucket(bucket_name, location=Location.USWest2)
    self.addCleanup(self.nuke_bucket, bucket)

    # Now use ``storage_uri`` to try to make a new key; without regionless
    # support this would throw a 301 exception.
    suri = boto.storage_uri('s3://%s/test' % bucket_name)
    the_key = suri.new_key()
    the_key.key = 'Test301'
    the_key.set_contents_from_string(
        'This should store in a different region.'
    )

    # Check it a different way: fetch the key over a connection pinned
    # explicitly to the us-west-2 endpoint.
    alt_conn = boto.connect_s3(host='s3-us-west-2.amazonaws.com')
    alt_bucket = alt_conn.get_bucket(bucket_name)
    alt_key = alt_bucket.get_key('Test301')
    self.assertIsNotNone(alt_key)

def resetConnection(self):
    import boto
    if getattr(self, 'conn', False):
        self.conn.close()
    self.bucket = None
    self.conn = None
    self.storage_uri = None
    del self.conn
    del self.storage_uri
    self.storage_uri = boto.storage_uri(self.boto_uri_str)
    self.conn = get_connection(self.scheme, self.parsed_url, self.storage_uri)
    if not self.conn.lookup(self.bucket_name):
        self.bucket = self.conn.create_bucket(self.bucket_name,
                                              location=self.my_location)
    else:
        self.bucket = self.conn.get_bucket(self.bucket_name)

def get_uri(self):
    return boto.storage_uri(self.path, 'gs')

def test_roundtrip_versioned_gs_object_uri_parsed(self):
    uri_str = 'gs://bucket/obj#1359908801674000'
    uri = boto.storage_uri(uri_str, validate=False,
                           suppress_consec_slashes=False)
    roundtrip_uri = boto.storage_uri(uri.uri, validate=False,
                                     suppress_consec_slashes=False)
    self.assertEqual(uri.uri, roundtrip_uri.uri)
    self.assertEqual(uri.is_version_specific, True)

def test_invalid_scheme(self):
    uri_str = 'mars://bucket/object'
    try:
        boto.storage_uri(uri_str, validate=False,
                         suppress_consec_slashes=False)
        self.fail('Expected InvalidUriError for unrecognized scheme')
    except InvalidUriError as e:
        self.assertIn('Unrecognized scheme', e.message)

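# A tighter, equivalent way to express the same expectation, using
# unittest's assertRaises context manager (a sketch, assuming the same
# InvalidUriError import):
#
#   with self.assertRaises(InvalidUriError):
#       boto.storage_uri('mars://bucket/object', validate=False,
#                        suppress_consec_slashes=False)
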
def testCloneReplaceKey(self):
    b = self._MakeBucket()
    k = b.new_key("obj")
    k.set_contents_from_string("stringdata")
    orig_uri = storage_uri("gs://%s/" % b.name)
    uri = orig_uri.clone_replace_key(k)
    self.assertTrue(uri.has_version())
    self.assertRegexpMatches(str(uri.generation), r"[0-9]+")

def testSetAclXml(self):
    """Ensures that calls to the set_xml_acl functions succeed."""
    b = self._MakeBucket()
    k = b.new_key("obj")
    k.set_contents_from_string("stringdata")
    bucket_uri = storage_uri("gs://%s/" % b.name)

    # Get a valid ACL for an object.
    bucket_uri.object_name = "obj"
    bucket_acl = bucket_uri.get_acl()
    bucket_uri.object_name = None

    # Add a permission to the ACL.
    all_users_read_permission = ("<Entry><Scope type='AllUsers'/>"
                                 "<Permission>READ</Permission></Entry>")
    acl_string = re.sub(r"</Entries>",
                        all_users_read_permission + "</Entries>",
                        bucket_acl.to_xml())

    # Test-generated owner IDs are not currently valid for buckets.
    acl_no_owner_string = re.sub(r"<Owner>.*</Owner>", "", acl_string)

    # Set ACL on an object.
    bucket_uri.set_xml_acl(acl_string, "obj")
    # Set ACL on a bucket.
    bucket_uri.set_xml_acl(acl_no_owner_string)
    # Set the default ACL for a bucket.
    bucket_uri.set_def_xml_acl(acl_no_owner_string)

    # Verify all the ACLs were successfully applied.
    new_obj_acl_string = k.get_acl().to_xml()
    new_bucket_acl_string = bucket_uri.get_acl().to_xml()
    new_bucket_def_acl_string = bucket_uri.get_def_acl().to_xml()
    self.assertRegexpMatches(new_obj_acl_string, r"AllUsers")
    self.assertRegexpMatches(new_bucket_acl_string, r"AllUsers")
    self.assertRegexpMatches(new_bucket_def_acl_string, r"AllUsers")

def test_default_object_acls_storage_uri(self):
    """Test default object ACLs using storage_uri."""
    # Create a new bucket.
    bucket = self._MakeBucket()
    bucket_name = bucket.name
    uri = storage_uri('gs://' + bucket_name)

    # Get the default ACL and make sure it's project-private.
    acl = uri.get_def_acl()
    self.assertIsNotNone(re.search(PROJECT_PRIVATE_RE, acl.to_xml()))

    # Set the default ACL to a canned ACL and verify it gets set.
    uri.set_def_acl('public-read')
    acl = uri.get_def_acl()
    # Save the public-read ACL for a later check.
    public_read_acl = acl
    self.assertEqual(acl.to_xml(), ('<AccessControlList><Entries><Entry>'
                                    '<Scope type="AllUsers"></Scope>'
                                    '<Permission>READ</Permission>'
                                    '</Entry></Entries></AccessControlList>'))

    # Back to the private ACL.
    uri.set_def_acl('private')
    acl = uri.get_def_acl()
    self.assertEqual(acl.to_xml(),
                     '<AccessControlList></AccessControlList>')

    # Set the default ACL to an XML ACL and verify it gets set.
    uri.set_def_acl(public_read_acl)
    acl = uri.get_def_acl()
    self.assertEqual(acl.to_xml(), ('<AccessControlList><Entries><Entry>'
                                    '<Scope type="AllUsers"></Scope>'
                                    '<Permission>READ</Permission>'
                                    '</Entry></Entries></AccessControlList>'))

    # Back to the private ACL.
    uri.set_def_acl('private')
    acl = uri.get_def_acl()
    self.assertEqual(acl.to_xml(),
                     '<AccessControlList></AccessControlList>')

def test_lifecycle_config_storage_uri(self):
    """Test setting and getting of lifecycle config with storage_uri."""
    # Create a new bucket.
    bucket = self._MakeBucket()
    bucket_name = bucket.name
    uri = storage_uri('gs://' + bucket_name)
    # Get the lifecycle config and make sure it's empty.
    xml = uri.get_lifecycle_config().to_xml()
    self.assertEqual(xml, LIFECYCLE_EMPTY)
    # Set a lifecycle config.
    lifecycle_config = LifecycleConfig()
    lifecycle_config.add_rule('Delete', None, LIFECYCLE_CONDITIONS)
    uri.configure_lifecycle(lifecycle_config)
    xml = uri.get_lifecycle_config().to_xml()
    self.assertEqual(xml, LIFECYCLE_DOC)

def get_bucket_list(bucket_name):
    uri = boto.storage_uri(bucket_name, 'gs')
    for obj in uri.get_bucket():
        yield obj.name

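# Hypothetical usage of the generator above (the bucket name is
# illustrative, not from the original code):
#
#   for name in get_bucket_list('my-photos-bucket'):
#       print(name)
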
def add_thumbnail(file_name):
    '''Add a thumbnail for the given file.'''
    src_uri = boto.storage_uri('{}/{}'.format(conf.photos_bucket_name,
                                              file_name),
                               'gs')
    dest_uri = boto.storage_uri('{}/{}'.format(conf.thumbnails_bucket_name,
                                               file_name),
                                'gs')
    try:
        new_key = dest_uri.new_key()
    except boto.exception.NoAuthHandlerFound as e:
        logging.error(e)
        return None

    # Create a file-like object for holding the photo contents.
    photo = StringIO.StringIO()
    src_uri.get_key().get_file(photo)
    thumbnail = StringIO.StringIO()
    im = Image.open(photo)
    im.thumbnail((260, 260))
    im.save(thumbnail, 'JPEG')
    thumbnail.seek(0)

    # Save the thumbnail.
    try:
        new_key.set_contents_from_file(thumbnail)
        new_key.make_public()
    except boto.exception.GSResponseError as e:
        logging.error(e)
        # Do we have the credentials file set up?
        boto_cred_file = os.path.expanduser('~') + '/.boto'
        if not os.path.exists(boto_cred_file):
            logging.error('Credentials file {} was not found.'.format(
                boto_cred_file))

def upload_file(file_obj, bucket, file_oid, object_md, make_public=False):
    '''
    Upload the file object to a bucket using credentials and object metadata.
    The object name is part of its metadata.
    '''
    if getpass.getuser() == 'bhs':
        boto_cred_file = '/home/bhs/.boto'
    else:
        boto_cred_file = os.path.expanduser('~') + '/.boto'
    fn = str(file_oid)
    dest_uri = boto.storage_uri(bucket + '/' + fn, 'gs')
    try:
        new_key = dest_uri.new_key()
    except boto.exception.NoAuthHandlerFound as e:
        print(e.message)
        return None
    new_key.update_metadata(object_md)
    try:
        new_key.set_contents_from_file(file_obj)
        if make_public:
            new_key.make_public()
    except boto.exception.GSResponseError as e:
        # Do we have the credentials file set up?
        if not os.path.exists(boto_cred_file):
            print('Credentials file {} was not found.'.format(boto_cred_file))
        return None
    return str(dest_uri)

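# Hypothetical call site for upload_file (the bucket name, object id, and
# metadata dict are illustrative, not from the original code):
#
#   with open('photo.jpg', 'rb') as fh:
#       url = upload_file(fh, 'my-uploads-bucket', some_object_id,
#                         {'name': 'photo.jpg'}, make_public=True)
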
def save(self, data, name):
    ''' google storage code
    uri = boto.storage_uri(self.dest_bucket_name+name+'.json', 'gs')
    uri.new_key().set_contents_from_string(json.dumps(data))
    '''
    data['id'] = name
    data['tree_num'] = self.meta['num']
    data['tree_size'] = self.meta['persons']
    data['tree_file_id'] = self.meta['file_id']
    self.onsave(data)

def __init__(self, parsed_url):
    import boto
    from boto.s3.connection import Location
    duplicity.backend.Backend.__init__(self, parsed_url)
    assert boto.Version >= BOTO_MIN_VERSION

    # This folds the null prefix and all null parts, which means that:
    #   //MyBucket/ and //MyBucket are equivalent.
    #   //MyBucket//My///My/Prefix/ and //MyBucket/My/Prefix are equivalent.
    self.url_parts = [x for x in parsed_url.path.split('/') if x != '']

    if self.url_parts:
        self.bucket_name = self.url_parts.pop(0)
    else:
        # Duplicity hangs if boto gets a null bucket name.
        # HC: Caught a socket error, trying to recover
        raise BackendException('Boto requires a bucket name.')

    self.scheme = parsed_url.scheme

    if self.url_parts:
        self.key_prefix = '%s/' % '/'.join(self.url_parts)
    else:
        self.key_prefix = ''

    self.straight_url = duplicity.backend.strip_auth_from_url(parsed_url)
    self.parsed_url = parsed_url

    # duplicity and boto.storage_uri() have different URI formats.
    # boto uses scheme://bucket[/name] and specifies hostname on connect()
    self.boto_uri_str = '://'.join((parsed_url.scheme[:2],
                                    parsed_url.path.lstrip('/')))
    if globals.s3_european_buckets:
        self.my_location = Location.EU
    else:
        self.my_location = ''
    self.resetConnection()
    self._listed_keys = {}

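# For example (illustrative, based on the slicing above): a duplicity URL
# whose path is /mybucket/backups yields a boto_uri_str of
# '<scheme[:2]>://mybucket/backups'; the [:2] slice maps scheme variants
# like 's3+http' down to boto's plain 's3'.
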
def _close(self):
    del self._listed_keys
    self._listed_keys = {}
    self.bucket = None
    self.conn = None
    self.storage_uri = None
    del self.conn
    del self.storage_uri

def __init__(self, config):
    self.__uri = boto.storage_uri(config.url)
    self.__key = config.key
    self.__secret = config.secret
    self.__bucket = None

def handle_uploaded_file(f, user):
    gcs_bucket = settings.GCS_BUCKET
    user_key = md5.md5(str(user.username)).hexdigest()
    # Sometimes things have paths in them? Windows used to do this.
    name = f.name.split("/")[-1]
    file_key = "%s/%s" % (user_key, name)
    uri = boto.storage_uri("%s/%s" % (gcs_bucket, file_key), "gs")
    uri.set_contents_from_string(f.read())
    return "http://%s/%s" % (gcs_bucket, file_key)
