def GetObjectStorage(container, filename):
if 'VCAP_SERVICES' not in os.environ:
return MakeJSONMsgResponse({"message":"cannot authenticate"}, 500)
try:
ibmobjectstoreconn = swiftclient.Connection(key=password, authurl=authurl, auth_version='3', os_options={"project_id": projectId,"user_id": userId,"region_name": region})
obj = ibmobjectstoreconn.get_object(container, filename)
if filename.endswith('.txt'):
return Response(obj[1], mimetype='text/plain', status=200)
elif filename.endswith('.csv'):
return Response(obj[1], mimetype='text/csv', status=200)
elif filename.endswith('.json'):
return Response(obj[1], mimetype='application/json', status=200)
else:
return Response(obj[1], mimetype='application/binary', status=200)
except ClientException as ce:
return MakeJSONMsgResponse({"message":ce.msg,"containername":container,"filename":filename}, 404)
python类Connection()的实例源码
def _get_object_storage_client(self, username, password, tenant_name):
auth_url = CONF.identity.uri
# add current tenant to swift operator role group.
keystone_admin = self._get_identity_client(
CONF.identity.admin_username,
CONF.identity.admin_password,
CONF.identity.admin_tenant_name)
# enable test user to operate swift by adding operator role to him.
roles = keystone_admin.roles.list()
operator_role = CONF.object_storage.operator_role
member_role = [role for role in roles if role.name == operator_role][0]
# NOTE(maurosr): This is surrounded in the try-except block cause
# neutron tests doesn't have tenant isolation.
try:
keystone_admin.roles.add_user_role(self.identity_client.user_id,
member_role.id,
self.identity_client.tenant_id)
except keystoneclient.apiclient.exceptions.Conflict:
pass
return swiftclient.Connection(auth_url, username, password,
tenant_name=tenant_name,
auth_version='2')
def authenticate_swift_user(self, keystone, user, password, tenant):
"""Authenticates a regular user with swift api."""
self.log.debug('Authenticating swift user ({})...'.format(user))
ep = keystone.service_catalog.url_for(service_type='identity',
endpoint_type='publicURL')
return swiftclient.Connection(authurl=ep,
user=user,
key=password,
tenant_name=tenant,
auth_version='2.0')
def authenticate_swift_user(self, keystone, user, password, tenant):
"""Authenticates a regular user with swift api."""
self.log.debug('Authenticating swift user ({})...'.format(user))
ep = keystone.service_catalog.url_for(service_type='identity',
interface='publicURL')
if keystone.session:
return swiftclient.Connection(session=keystone.session)
else:
return swiftclient.Connection(authurl=ep,
user=user,
key=password,
tenant_name=tenant,
auth_version='2.0')
def authenticate_swift_user(self, keystone, user, password, tenant):
"""Authenticates a regular user with swift api."""
self.log.debug('Authenticating swift user ({})...'.format(user))
ep = keystone.service_catalog.url_for(service_type='identity',
interface='publicURL')
if keystone.session:
return swiftclient.Connection(session=keystone.session)
else:
return swiftclient.Connection(authurl=ep,
user=user,
key=password,
tenant_name=tenant,
auth_version='2.0')
def authenticate_swift_user(self, keystone, user, password, tenant):
"""Authenticates a regular user with swift api."""
self.log.debug('Authenticating swift user ({})...'.format(user))
ep = keystone.service_catalog.url_for(service_type='identity',
interface='publicURL')
if keystone.session:
return swiftclient.Connection(session=keystone.session)
else:
return swiftclient.Connection(authurl=ep,
user=user,
key=password,
tenant_name=tenant,
auth_version='2.0')
def authenticate_swift_user(self, keystone, user, password, tenant):
"""Authenticates a regular user with swift api."""
self.log.debug('Authenticating swift user ({})...'.format(user))
ep = keystone.service_catalog.url_for(service_type='identity',
interface='publicURL')
if keystone.session:
return swiftclient.Connection(session=keystone.session)
else:
return swiftclient.Connection(authurl=ep,
user=user,
key=password,
tenant_name=tenant,
auth_version='2.0')
def authenticate_swift_user(self, keystone, user, password, tenant):
"""Authenticates a regular user with swift api."""
self.log.debug('Authenticating swift user ({})...'.format(user))
ep = keystone.service_catalog.url_for(service_type='identity',
interface='publicURL')
if keystone.session:
return swiftclient.Connection(session=keystone.session)
else:
return swiftclient.Connection(authurl=ep,
user=user,
key=password,
tenant_name=tenant,
auth_version='2.0')
def authenticate_swift_user(self, keystone, user, password, tenant):
"""Authenticates a regular user with swift api."""
self.log.debug('Authenticating swift user ({})...'.format(user))
ep = keystone.service_catalog.url_for(service_type='identity',
interface='publicURL')
if keystone.session:
return swiftclient.Connection(session=keystone.session)
else:
return swiftclient.Connection(authurl=ep,
user=user,
key=password,
tenant_name=tenant,
auth_version='2.0')
def authenticate_swift_user(self, keystone, user, password, tenant):
"""Authenticates a regular user with swift api."""
self.log.debug('Authenticating swift user ({})...'.format(user))
ep = keystone.service_catalog.url_for(service_type='identity',
interface='publicURL')
if keystone.session:
return swiftclient.Connection(session=keystone.session)
else:
return swiftclient.Connection(authurl=ep,
user=user,
key=password,
tenant_name=tenant,
auth_version='2.0')
def authenticate_swift_user(self, keystone, user, password, tenant):
"""Authenticates a regular user with swift api."""
self.log.debug('Authenticating swift user ({})...'.format(user))
ep = keystone.service_catalog.url_for(service_type='identity',
interface='publicURL')
if keystone.session:
return swiftclient.Connection(session=keystone.session)
else:
return swiftclient.Connection(authurl=ep,
user=user,
key=password,
tenant_name=tenant,
auth_version='2.0')
def authenticate_swift_user(self, keystone, user, password, tenant):
"""Authenticates a regular user with swift api."""
self.log.debug('Authenticating swift user ({})...'.format(user))
ep = keystone.service_catalog.url_for(service_type='identity',
interface='publicURL')
if keystone.session:
return swiftclient.Connection(session=keystone.session)
else:
return swiftclient.Connection(authurl=ep,
user=user,
key=password,
tenant_name=tenant,
auth_version='2.0')
def authenticate_swift_user(self, keystone, user, password, tenant):
"""Authenticates a regular user with swift api."""
self.log.debug('Authenticating swift user ({})...'.format(user))
ep = keystone.service_catalog.url_for(service_type='identity',
interface='publicURL')
if keystone.session:
return swiftclient.Connection(session=keystone.session)
else:
return swiftclient.Connection(authurl=ep,
user=user,
key=password,
tenant_name=tenant,
auth_version='2.0')
def GetObjStoreInfo():
try:
ibmobjectstoreconn = swiftclient.Connection(key=password, authurl=authurl, auth_version='3', os_options={"project_id": projectId,"user_id": userId,"region_name": region})
conns = []
for container in ibmobjectstoreconn.get_account()[1]:
container['accessURL'] = thehost + "/" + container['name']
container['objects'] = container['count']
del container['count']
conns.append(container)
jconns = '{"containers":'+json.dumps(conns)+'}'
return Response(jconns, mimetype='application/json', status=200)
except ClientException as ce:
return MakeJSONMsgResponse({"message":ce.msg,"containername":container,"filename":filename}, 404)
def GetObjStoContainerInfo(container):
try:
ibmobjectstoreconn = swiftclient.Connection(key=password, authurl=authurl, auth_version='3', os_options={"project_id": projectId,"user_id": userId,"region_name": region})
objs = []
for data in ibmobjectstoreconn.get_container(container)[1]:
del data['hash']
data['downloadURL'] = thehost + "/" + container + "/" + data['name']
objs.append(data)
jobjs = '{"objects":'+json.dumps(objs)+'}'
return Response(jobjs, mimetype='application/json', status=200)
except ClientException as ce:
return MakeJSONMsgResponse({"message":ce.msg+". Bad container name?","containername":container}, 404)
def authenticate_swift_user(self, keystone, user, password, tenant):
"""Authenticates a regular user with swift api."""
self.log.debug('Authenticating swift user ({})...'.format(user))
ep = keystone.service_catalog.url_for(service_type='identity',
interface='publicURL')
if keystone.session:
return swiftclient.Connection(session=keystone.session)
else:
return swiftclient.Connection(authurl=ep,
user=user,
key=password,
tenant_name=tenant,
auth_version='2.0')
def authenticate_swift_user(self, keystone, user, password, tenant):
"""Authenticates a regular user with swift api."""
self.log.debug('Authenticating swift user ({})...'.format(user))
ep = keystone.service_catalog.url_for(service_type='identity',
interface='publicURL')
if keystone.session:
return swiftclient.Connection(session=keystone.session)
else:
return swiftclient.Connection(authurl=ep,
user=user,
key=password,
tenant_name=tenant,
auth_version='2.0')
def authenticate_swift_user(self, keystone, user, password, tenant):
"""Authenticates a regular user with swift api."""
self.log.debug('Authenticating swift user ({})...'.format(user))
ep = keystone.service_catalog.url_for(service_type='identity',
interface='publicURL')
if keystone.session:
return swiftclient.Connection(session=keystone.session)
else:
return swiftclient.Connection(authurl=ep,
user=user,
key=password,
tenant_name=tenant,
auth_version='2.0')
def authenticate_swift_user(self, keystone, user, password, tenant):
"""Authenticates a regular user with swift api."""
self.log.debug('Authenticating swift user ({})...'.format(user))
ep = keystone.service_catalog.url_for(service_type='identity',
interface='publicURL')
if keystone.session:
return swiftclient.Connection(session=keystone.session)
else:
return swiftclient.Connection(authurl=ep,
user=user,
key=password,
tenant_name=tenant,
auth_version='2.0')
def authenticate_swift_user(self, keystone, user, password, tenant):
"""Authenticates a regular user with swift api."""
self.log.debug('Authenticating swift user ({})...'.format(user))
ep = keystone.service_catalog.url_for(service_type='identity',
interface='publicURL')
if keystone.session:
return swiftclient.Connection(session=keystone.session)
else:
return swiftclient.Connection(authurl=ep,
user=user,
key=password,
tenant_name=tenant,
auth_version='2.0')
def authenticate_swift_user(self, keystone, user, password, tenant):
"""Authenticates a regular user with swift api."""
self.log.debug('Authenticating swift user ({})...'.format(user))
ep = keystone.service_catalog.url_for(service_type='identity',
interface='publicURL')
if keystone.session:
return swiftclient.Connection(session=keystone.session)
else:
return swiftclient.Connection(authurl=ep,
user=user,
key=password,
tenant_name=tenant,
auth_version='2.0')
def authenticate_swift_user(self, keystone, user, password, tenant):
"""Authenticates a regular user with swift api."""
self.log.debug('Authenticating swift user ({})...'.format(user))
ep = keystone.service_catalog.url_for(service_type='identity',
interface='publicURL')
if keystone.session:
return swiftclient.Connection(session=keystone.session)
else:
return swiftclient.Connection(authurl=ep,
user=user,
key=password,
tenant_name=tenant,
auth_version='2.0')
def authenticate_swift_user(self, keystone, user, password, tenant):
"""Authenticates a regular user with swift api."""
self.log.debug('Authenticating swift user ({})...'.format(user))
ep = keystone.service_catalog.url_for(service_type='identity',
interface='publicURL')
if keystone.session:
return swiftclient.Connection(session=keystone.session)
else:
return swiftclient.Connection(authurl=ep,
user=user,
key=password,
tenant_name=tenant,
auth_version='2.0')
def authenticate_swift_user(self, keystone, user, password, tenant):
"""Authenticates a regular user with swift api."""
self.log.debug('Authenticating swift user ({})...'.format(user))
ep = keystone.service_catalog.url_for(service_type='identity',
interface='publicURL')
if keystone.session:
return swiftclient.Connection(session=keystone.session)
else:
return swiftclient.Connection(authurl=ep,
user=user,
key=password,
tenant_name=tenant,
auth_version='2.0')
def authenticate_swift_user(self, keystone, user, password, tenant):
"""Authenticates a regular user with swift api."""
self.log.debug('Authenticating swift user ({})...'.format(user))
ep = keystone.service_catalog.url_for(service_type='identity',
interface='publicURL')
if keystone.session:
return swiftclient.Connection(session=keystone.session)
else:
return swiftclient.Connection(authurl=ep,
user=user,
key=password,
tenant_name=tenant,
auth_version='2.0')
def get_swiftclient():
session = _get_user_keystone_session()
conn = swiftclient.Connection(session=session)
return conn
def main(context, container, object):
conn = swiftclient.Connection(
session=context['os_session'],
os_options={'region_name': 'RegionOne'},
)
new_container = '%s_thumb' % container
# Download original photo
image_path = '/%s' % object
_, obj_contents = conn.get_object(container, object)
with open(image_path, 'w') as local:
local.write(obj_contents)
print('Downloaded object %s from container %s' % (object, container))
thumb_path = '/thumb_%s' % object
resize_image(image_path, thumb_path)
print('Resized.')
# Upload thumb photo
with open(thumb_path, 'r') as new_local:
conn.put_object(
new_container,
object,
contents=new_local,
content_type='text/plain'
)
os.remove(image_path)
os.remove(thumb_path)
print('Uploaded object %s to container %s' % (object, new_container))
def stat_object(context, container, object):
conn = swiftclient.Connection(
session=context['os_session'],
os_options={'region_name': 'RegionOne'},
)
obj_header = conn.head_object(container, object)
return obj_header
def authenticate_swift_user(self, keystone, user, password, tenant):
"""Authenticates a regular user with swift api."""
self.log.debug('Authenticating swift user ({})...'.format(user))
ep = keystone.service_catalog.url_for(service_type='identity',
interface='publicURL')
if keystone.session:
return swiftclient.Connection(session=keystone.session)
else:
return swiftclient.Connection(authurl=ep,
user=user,
key=password,
tenant_name=tenant,
auth_version='2.0')
def authenticate_swift_user(self, keystone, user, password, tenant):
"""Authenticates a regular user with swift api."""
self.log.debug('Authenticating swift user ({})...'.format(user))
ep = keystone.service_catalog.url_for(service_type='identity',
interface='publicURL')
if keystone.session:
return swiftclient.Connection(session=keystone.session)
else:
return swiftclient.Connection(authurl=ep,
user=user,
key=password,
tenant_name=tenant,
auth_version='2.0')