def create(
    self,
    url,
    session=None,
    method=None,
    **params
):
    """Create a new resource

    :param string url:
        The API-specific portion of the URL path
    :param Session session:
        HTTP client session
    :param string method:
        HTTP method (default POST)
    :returns:
        The decoded JSON body of the response, or the response object
        itself when its body cannot be decoded as JSON
    """
    if not method:
        method = 'POST'
    ret = self._request(method, url, session=session, **params)
    # Should this move into _requests()?
    try:
        return ret.json()
    except ValueError:
        # json.JSONDecodeError is a subclass of ValueError, so this still
        # covers the stdlib decode error while also handling JSON backends
        # that raise a plain ValueError (or their own ValueError subclass).
        return ret
# Example source code for the Python class Session()
def get_context():
    """Build a request context from ``OS_*`` environment variables.

    Reads ``OS_USERNAME``, ``OS_PASSWORD``, ``OS_AUTH_URL``,
    ``OS_IDENTITY_API_VERSION`` and ``OS_PROJECT_NAME``. The auth URL is
    built as ``<OS_AUTH_URL>/v<OS_IDENTITY_API_VERSION>``.

    :returns: a ``context.RequestContext`` carrying the token and project id
        obtained from the auth plugin.
    :raises ValueError: if ``OS_AUTH_URL`` or ``OS_IDENTITY_API_VERSION`` is
        not set (previously this surfaced as an opaque ``TypeError`` from
        concatenating ``None`` with a string).
    """
    username = os.getenv('OS_USERNAME')
    password = os.getenv('OS_PASSWORD')
    project_name = os.getenv('OS_PROJECT_NAME')

    # Both of these are concatenated below, so they must be present.
    url_parts = (
        ('OS_AUTH_URL', os.getenv('OS_AUTH_URL')),
        ('OS_IDENTITY_API_VERSION', os.getenv('OS_IDENTITY_API_VERSION')),
    )
    missing = [name for name, value in url_parts if value is None]
    if missing:
        raise ValueError(
            'Missing required environment variable(s): '
            + ', '.join(missing))
    auth_url = url_parts[0][1] + '/v' + url_parts[1][1]

    auth = identity.V3Password(auth_url=auth_url,
                               username=username,
                               password=password,
                               project_name=project_name,
                               user_domain_name="default",
                               project_domain_name="default")
    # NOTE(review): verify=False presumably disables TLS certificate
    # verification for this session -- confirm this is intended.
    sess = session.Session(auth=auth, verify=False)
    return context.RequestContext(auth_token=auth.get_token(sess),
                                  tenant=auth.get_project_id(sess))
def auth_through_token(app: web.Application, handler):
    """Wrap ``handler`` so requests are checked against their auth token.

    The returned coroutine reads the ``X-Auth-Token`` header and the
    ``project_id`` route parameter, then tries to authenticate them through
    a token auth plugin and client. Any exception raised while doing so
    yields a 401 JSON response; otherwise the wrapped handler is awaited.
    """
    async def middleware_handler(request: web.Request):
        token = request.headers.get("X-Auth-Token")
        project = request.match_info.get('project_id')
        cfg = config.Config.config_instance()
        try:
            token_auth = identity.Token(cfg.auth_url,
                                        token=token,
                                        project_id=project)
            token_session = session.Session(auth=token_auth)
            keystone = client.Client(session=token_session,
                                     project_id=project)
            keystone.authenticate(token=token)
        except Exception as ex:
            # Any failure while authenticating is reported as "unauthorized".
            reason = ("Not authorized. Reason: {}"
                      .format(str(ex)))
            body = {"error": {"message": reason}}
            return web.json_response(status=401, data=body)
        return await handler(request)

    return middleware_handler
def __init__(self, host, port=None, scheme='http', token=None,
             marker=None):
    """Set up an HTTP session and the base API URL for a host.

    :param host: host name or address of the API server
    :param port: optional port; when falsy the scheme's default port is
        assumed and the port is left out of ``base_url``
    :param scheme: URL scheme (default ``http``)
    :param token: value sent as the ``X-Auth-Token`` header
    :param marker: value sent as the ``X-Context-Marker`` header
    """
    self.__session = requests.Session()
    self.__session.headers.update({
        'X-Auth-Token': token,
        'X-Context-Marker': marker
    })
    self.host = host
    self.scheme = scheme
    # Always define the attribute so later reads of ``self.port`` cannot
    # raise AttributeError when no port was supplied.
    self.port = port
    if port:
        self.base_url = "%s://%s:%s/api/" % (self.scheme, self.host,
                                             self.port)
    else:
        # assume default port for scheme
        self.base_url = "%s://%s/api/" % (self.scheme, self.host)
    self.token = token
    self.marker = marker
    self.logger = logging.getLogger(__name__)
def get_cinder_client_from_env():
    """Build a client from the ``OS_*`` variables of an openrc environment.

    ``OS_AUTH_URL``, ``OS_USERNAME``, ``OS_PASSWORD`` and
    ``OS_PROJECT_NAME`` are mandatory; a missing one raises ``KeyError``.
    The domain variables are optional and default to ``None``.
    """
    env = os.environ
    # Mandatory settings: callers should catch KeyError and ask the user to
    # source or configure an openrc file.
    credentials = {
        'auth_url': env['OS_AUTH_URL'],
        'username': env['OS_USERNAME'],
        'password': env['OS_PASSWORD'],
        'project_name': env['OS_PROJECT_NAME'],
    }
    # Either the *_domain_name or the *_domain_id form is acceptable.
    credentials['project_domain_name'] = env.get("OS_PROJECT_DOMAIN_NAME")
    credentials['project_domain_id'] = env.get("OS_PROJECT_DOMAIN_ID")
    credentials['user_domain_name'] = env.get("OS_USER_DOMAIN_NAME")
    credentials['user_domain_id'] = env.get("OS_USER_DOMAIN_ID")

    password_auth = identity.Password(**credentials)
    return client.Client(session=ks.Session(auth=password_auth))
def get_manila_client_from_env():
    """Build a version ``'2'`` manila client from ``OS_*`` variables.

    ``OS_AUTH_URL``, ``OS_USERNAME``, ``OS_PASSWORD`` and
    ``OS_PROJECT_NAME`` are mandatory; a missing one raises ``KeyError``.
    The domain variables are optional and default to ``None``.
    """
    env = os.environ
    # Mandatory settings: callers should catch KeyError and ask the user to
    # source or configure an openrc file.
    credentials = {
        'auth_url': env['OS_AUTH_URL'],
        'username': env['OS_USERNAME'],
        'password': env['OS_PASSWORD'],
        'project_name': env['OS_PROJECT_NAME'],
    }
    # Either the *_domain_name or the *_domain_id form is acceptable.
    credentials['project_domain_name'] = env.get("OS_PROJECT_DOMAIN_NAME")
    credentials['project_domain_id'] = env.get("OS_PROJECT_DOMAIN_ID")
    credentials['user_domain_name'] = env.get("OS_USER_DOMAIN_NAME")
    credentials['user_domain_id'] = env.get("OS_USER_DOMAIN_ID")

    password_auth = identity.Password(**credentials)
    keystone_session = ks.Session(auth=password_auth)
    return manila_client.Client(session=keystone_session, client_version='2')
def get_legacy_keystone_session(**kwargs):
    """Create a session from the ``CONF.keystone`` admin settings.

    :param kwargs: overrides merged on top of the configured auth values
        (``auth_url``, ``username``, ``password``, ``tenant_name``,
        ``token``).
    :returns: a ``Session`` whose ``verify`` is ``False`` when
        ``auth_insecure`` is set, otherwise the configured ``auth_ca_cert``.
    """
    ks_conf = CONF.keystone
    auth_options = {
        'auth_url': ks_conf.auth_url,
        'username': ks_conf.admin_user,
        'password': ks_conf.admin_password,
        'tenant_name': ks_conf.admin_tenant_name,
        'token': ks_conf.admin_token,
    }
    auth_options.update(kwargs)

    verify = False if ks_conf.auth_insecure else ks_conf.auth_ca_cert
    auth_plugin = _openstack_auth_from_config(**auth_options)
    return Session(auth=auth_plugin, verify=verify)
def nova_live_migrate(node):
    """Disable nova-compute on ``node`` and live-migrate its instances.

    Credentials come from the ``OS_*`` environment variables (all required;
    a missing one raises ``KeyError``). After requesting a block live
    migration for every server listed on the host, a watcher thread is
    started with ``live_migration_watcher_thread``.

    :param node: name of the compute host to evacuate
    """
    password_loader = loading.get_plugin_loader('password')
    env = os.environ
    credentials = password_loader.load_from_options(
        auth_url=env["OS_AUTH_URL"],
        username=env["OS_USERNAME"],
        password=env["OS_PASSWORD"],
        user_domain_name=env["OS_USER_DOMAIN_NAME"],
        project_domain_name=env["OS_PROJECT_DOMAIN_NAME"],
        project_name=env["OS_PROJECT_NAME"])
    compute_api_version = "2"
    nova = client.Client(compute_api_version,
                         session=session.Session(auth=credentials))

    LOG.info("Disabling nova-compute service on: %s", node)
    nova.services.disable(node, "nova-compute")

    instances = nova.servers.list(search_opts={'host': node})
    for instance in instances:
        LOG.info("Live-migrating instance: %s from node: %s", instance.name,
                 node)
        instance.live_migrate(block_migration=True)

    thread.start_new_thread(live_migration_watcher_thread, (nova, node))
def get_neutron_client_from_env():
    """Build a client from the ``OS_*`` variables of an openrc environment.

    ``OS_AUTH_URL``, ``OS_USERNAME``, ``OS_PASSWORD`` and
    ``OS_PROJECT_NAME`` are mandatory; a missing one raises ``KeyError``.
    The domain variables are optional and default to ``None``.
    """
    env = os.environ
    # Mandatory settings: callers should catch KeyError and ask the user to
    # source or configure an openrc file.
    credentials = {
        'auth_url': env['OS_AUTH_URL'],
        'username': env['OS_USERNAME'],
        'password': env['OS_PASSWORD'],
        'project_name': env['OS_PROJECT_NAME'],
    }
    # Either the *_domain_name or the *_domain_id form is acceptable.
    credentials['project_domain_name'] = env.get("OS_PROJECT_DOMAIN_NAME")
    credentials['project_domain_id'] = env.get("OS_PROJECT_DOMAIN_ID")
    credentials['user_domain_name'] = env.get("OS_USER_DOMAIN_NAME")
    credentials['user_domain_id'] = env.get("OS_USER_DOMAIN_ID")

    password_auth = identity.Password(**credentials)
    return client.Client(session=ks.Session(auth=password_auth))
def get_local_auth(user_token):
    """Return a Keystone session for the local cluster.

    The token is validated first; when validation raises ``http.NotFound``
    the request is aborted with status 401. The session is scoped to the
    project id found in the validated token data.
    """
    # NOTE(review): this writes the raw token to the debug log.
    LOG.debug("Getting session for %s" % user_token)
    token_manager = v3.tokens.TokenManager(get_client())
    try:
        validated = token_manager.validate(token=user_token,
                                           include_catalog=False)
    except http.NotFound:
        abort(401)

    scoped_project = validated['project']['id']
    token_auth = identity.v3.Token(auth_url=CONF.auth.auth_url,
                                   token=user_token,
                                   project_id=scoped_project)
    return session.Session(auth=token_auth)
def get_sp_auth(service_provider, user_token, remote_project_id):
    """Perform K2K auth, and return a session for a remote cluster.

    :param service_provider: key used to look up the provider's settings
    :param user_token: token used to build the local auth plugin
    :param remote_project_id: project the remote session is scoped to
    """
    sp_conf = config.service_providers.get(CONF, service_provider)
    local_plugin = get_local_auth(user_token).auth
    # NOTE(review): this writes the raw token to the debug log.
    LOG.debug("Getting session for (%s, %s, %s)" % (service_provider,
                                                    user_token,
                                                    remote_project_id))
    k2k_plugin = identity.v3.Keystone2Keystone(local_plugin,
                                               sp_conf.sp_name,
                                               project_id=remote_project_id)
    return session.Session(auth=k2k_plugin)
def get_keystone_session(
        auth_url=get_env_param('OS_AUTH_URL'),
        project_name=os.environ.get('OS_PROJECT_NAME'),
        tenant_name=os.environ.get('OS_TENANT_NAME'),
        project_domain_name=os.environ.get('OS_PROJECT_DOMAIN_NAME',
                                           'default'),  # noqa
        username=get_env_param('OS_USERNAME'),
        user_domain_name=os.environ.get('OS_USER_DOMAIN_NAME', 'default'),
        password=get_env_param('OS_PASSWORD')):
    """Create a session using the 'password' auth plugin loader.

    Every parameter defaults to a value read from the environment when this
    function is defined. ``tenant_name`` is used as the project name when
    ``project_name`` is empty.

    :raises Exception: if neither a project name nor a tenant name is given.
    """
    # Fall back to the tenant name when no project name was supplied.
    project_name = project_name or tenant_name
    if not project_name:
        raise Exception("Either OS_PROJECT_NAME or OS_TENANT_NAME is "
                        "required.")

    options = dict(
        auth_url=auth_url,
        project_name=project_name,
        project_domain_name=project_domain_name,
        username=username,
        user_domain_name=user_domain_name,
        password=password,
    )
    plugin = loading.get_plugin_loader('password').load_from_options(
        **options)
    return session.Session(auth=plugin)
def get_keystone_session(
        auth_url=get_env_param('OS_AUTH_URL'),
        project_name=os.environ.get('OS_PROJECT_NAME'),
        tenant_name=os.environ.get('OS_TENANT_NAME'),
        project_domain_name=os.environ.get('OS_PROJECT_DOMAIN_NAME',
                                           'default'),  # noqa
        username=get_env_param('OS_USERNAME'),
        user_domain_name=os.environ.get('OS_USER_DOMAIN_NAME', 'default'),
        password=get_env_param('OS_PASSWORD')):
    """Create a session using the 'password' auth plugin loader.

    Every parameter defaults to a value read from the environment when this
    function is defined. ``tenant_name`` is used as the project name when
    ``project_name`` is empty.

    :raises Exception: if neither a project name nor a tenant name is given.
    """
    # Fall back to the tenant name when no project name was supplied.
    project_name = project_name or tenant_name
    if not project_name:
        raise Exception(
            "Either OS_PROJECT_NAME or OS_TENANT_NAME is required.")

    options = dict(
        auth_url=auth_url,
        project_name=project_name,
        project_domain_name=project_domain_name,
        username=username,
        user_domain_name=user_domain_name,
        password=password,
    )
    plugin = loading.get_plugin_loader('password').load_from_options(
        **options)
    return session.Session(auth=plugin)
def __init__(self):
    """Create a v3 password session from ``OS_*`` environment variables.

    ``OS_AUTH_URL`` is rewritten to a ``v3`` URL (``v2.0`` is replaced and
    ``/v3`` appended when absent). The glance endpoint is derived from it by
    replacing ``keystone/v3`` with ``glance`` and is written to stdout.
    """
    keystone_url = os.getenv('OS_AUTH_URL').replace('v2.0', 'v3')
    if not keystone_url.endswith('v3'):
        keystone_url = keystone_url + '/v3'

    credentials = {
        'auth_url': keystone_url,
        'username': os.getenv('OS_USERNAME'),
        'password': os.getenv('OS_PASSWORD'),
        'project_name': os.getenv('OS_TENANT_NAME'),
        'project_domain_name': 'default',
        'user_domain_name': 'default',
    }

    self.glance_endpoint = keystone_url.replace('keystone/v3', 'glance')
    sys.stdout.write('Using glance endpoint: ' + self.glance_endpoint)

    password_auth = v3.Password(**credentials)
    # NOTE(review): verify=False presumably disables TLS certificate
    # verification; the original comment suggests verify=True was intended.
    self.sess = session.Session(auth=password_auth, verify=False)
def __init__(
    self,
    session=None,
    endpoint=None,
    **kwargs
):
    """Store the default session and endpoint shared by API objects.

    :param Session session:
        Default session used when making the HTTP API calls.
    :param string endpoint:
        Service Catalog URL that serves as the base for requests on this
        API.

    Extra keyword arguments are accepted and ignored.
    """
    super(KeystoneSession, self).__init__()

    # ``session`` is expected to offer a requests.Session-style interface
    self.session, self.endpoint = session, endpoint
def __init__(
    self,
    session=None,
    service_type=None,
    endpoint=None,
    **kwargs
):
    """Initialise the parent with session/endpoint and keep the API name.

    :param Session session:
        Default session used when making the HTTP API calls.
    :param string service_type:
        Name of the API, e.g. ``identity`` or ``compute``.
    :param string endpoint:
        Service Catalog URL that serves as the base for requests on this
        API.

    Extra keyword arguments are accepted and ignored.
    """
    parent = super(BaseAPI, self)
    parent.__init__(session=session, endpoint=endpoint)

    self.service_type = service_type
# The basic action methods all take a Session and return dict/lists
def create(
    self,
    url,
    session=None,
    method=None,
    **params
):
    """Create a new resource

    :param string url:
        The API-specific portion of the URL path
    :param Session session:
        HTTP client session
    :param string method:
        HTTP method (default POST)
    :returns:
        The decoded JSON body of the response, or the response object
        itself when its body cannot be decoded as JSON
    """
    if not method:
        method = 'POST'
    ret = self._request(method, url, session=session, **params)
    # Should this move into _requests()?
    try:
        return ret.json()
    except ValueError:
        # json.JSONDecodeError is a subclass of ValueError, so this still
        # covers the stdlib decode error while also handling JSON backends
        # that raise a plain ValueError (or their own ValueError subclass).
        return ret
def test_paginate_stops_with_first_empty_list(self):
    """Check that Session#paginate stops after an empty first page."""
    items_url = 'http://example.com/v1/items'
    empty_page = self.create_response(
        [], 'http://example.com/v1/items?limit=30&marker=foo'
    )
    transport = mock.Mock()
    transport.request.return_value = empty_page
    craton_session = session.Session(session=transport)

    pages = craton_session.paginate(
        url=items_url,
        items_key='items',
        autopaginate=True,
    )
    collected = list(pages)

    # Exactly one page is yielded and exactly one request is issued.
    self.assertListEqual([(empty_page, [])], collected)
    transport.request.assert_called_once_with(
        method='GET',
        url=items_url,
        endpoint_filter={'service_type': 'fleet_management'},
    )
def test_keystone_auth_configures_craton_session(self):
    """Check the session object produced by ``auth.keystone_auth``."""
    credentials = dict(
        auth_url='https://identity.openstack.org/v3',
        username='admin',
        password='adminPassword',
        project_id=PROJECT_ID,
        project_domain_name='Default',
        user_domain_name='Default',
    )
    craton_session = auth.keystone_auth(**credentials)
    self.assertIsInstance(craton_session, session.Session)

    # The wrapped transport must be a keystone session using password auth.
    wrapped = craton_session._session
    self.assertIsInstance(wrapped, ksa_session.Session)
    self.assertIsInstance(wrapped.auth, ksa_password.Password)
def create_session_with(auth_plugin, verify):
    """Wrap a new ``ksa_session.Session`` in a cratonclient Session.

    :param auth_plugin:
        Authentication plugin handed to the underlying session as ``auth``.
    :type auth_plugin:
        keystoneauth1.plugin.BaseAuthPlugin
    :param bool verify:
        Whether or not to verify HTTPS certificates provided by the server.
    :returns:
        Configured cratonclient session.
    :rtype:
        cratonclient.session.Session
    """
    # Imported inside the function, as in the original code.
    from cratonclient import session

    transport_options = {'auth': auth_plugin, 'verify': verify}
    transport = ksa_session.Session(**transport_options)
    return session.Session(session=transport)
def __init__(self, session=None, username=None, token=None,
             project_id=None):
    """Set up the underlying HTTP transport for this Session.

    :param session:
        Session instance used as the HTTP transport. When it is ``None`` a
        ``ksa_session.Session`` is created with a ``CratonAuth`` plugin
        built from the remaining arguments.
    :param str username:
        The username of the person authenticating against the API.
    :param str token:
        The authentication token of the user authenticating.
    :param str project_id:
        The user's project id in Craton.
    """
    transport = session
    if transport is None:
        craton_auth = auth.CratonAuth(
            username=username,
            project_id=project_id,
            token=token,
        )
        transport = ksa_session.Session(auth=craton_auth)

    self._session = transport
    # Identify this client library in the transport's user agent.
    agent = 'python-cratonclient/{0}'.format(cratonclient.__version__)
    self._session.user_agent = agent
def _get_client(self):
    """Get a nova client

    Credentials, the client version and the region are all read from
    ``CONF.openstack_credential``.

    :returns: the client built by ``client.Client`` for the configured
        ``nova_client_version`` and ``region_name``.
    """
    credential = CONF.openstack_credential
    auth = identity.Password(
        auth_url=credential.auth_url,
        username=credential.username,
        password=credential.password,
        user_domain_id=credential.user_domain_id,
        # The configured value is a domain *id*, so pass it as an id; it
        # was previously handed over as ``project_domain_name``.
        project_domain_id=credential.project_domain_id,
        project_name=credential.project_name)
    # NOTE(review): verify=False presumably disables TLS certificate
    # verification -- confirm this is intended.
    sess = session.Session(auth=auth, verify=False)
    nova_client_version = credential.nova_client_version
    n_client = client.Client(
        nova_client_version,
        session=sess,
        region_name=credential.region_name)
    return n_client
def __init__(self, user, os_identity_api_version="3",
             os_network_api_version="2", os_volume_api_version="2",
             os_compute_api_version="2", os_image_api_version="2"):
    """Create instance of `ClientFactory` class

    The ``user`` mapping is expanded into ``v3.Password`` keyword
    arguments; the resulting auth plugin backs a new session. The API
    version strings are stored unchanged on the instance.

    @param user: User for client factory instance
    @type user: `dict`
    """
    self.user = user

    password_auth = v3.Password(**user)
    self.auth = password_auth
    self.session = session.Session(auth=password_auth)

    # Per-service API versions, kept for later use.
    (self.os_identity_api_version,
     self.os_network_api_version,
     self.os_volume_api_version,
     self.os_compute_api_version,
     self.os_image_api_version) = (os_identity_api_version,
                                   os_network_api_version,
                                   os_volume_api_version,
                                   os_compute_api_version,
                                   os_image_api_version)
def __init__(self, controller_ip, user='admin', passwd='admin',
             tenant='admin'):
    """Create API client for manila service

    Builds a v2 password auth plugin and a keystone session whose TLS
    options depend on the ``DISABLE_SSL`` and ``VERIFY_SSL`` flags.
    """
    super(ManilaActions, self).__init__(controller_ip,
                                        user, passwd,
                                        tenant)

    auth_url, cert_path = self.__make_auth_url(controller_ip)
    auth = v2.Password(auth_url=auth_url, username=user,
                       password=passwd, tenant_name=tenant)

    # Choose the TLS-related session arguments from the two flags.
    if DISABLE_SSL:
        tls_options = {}
    elif VERIFY_SSL:
        # NOTE(review): confirm KeystoneSession accepts ``ca_cert``.
        tls_options = {'ca_cert': cert_path}
    else:
        tls_options = {'verify': False}

    self.__keystone_ses = KeystoneSession(auth=auth, **tls_options)
def _connect_keystone(self):
    """Get an OpenStack Keystone (identity) client object."""
    if self._keystone_version != 3:
        # Wow, the internal keystoneV2 implementation is terribly buggy. It
        # needs both a separate Session object and the username, password
        # again for things to work correctly. Plus, a manual call to
        # authenticate() is also required if the service catalog needs
        # to be queried.
        v2_options = dict(
            session=self._keystone_session,
            auth_url=self.auth_url,
            username=self.username,
            password=self.password,
            project_name=self.project_name,
            region_name=self.region_name,
        )
        v2_client = keystone_client.Client(**v2_options)
        v2_client.authenticate()
        return v2_client

    # Version 3 only needs the session and the auth URL.
    return keystone_client.Client(session=self._keystone_session,
                                  auth_url=self.auth_url)
def uncleanable():
    """Session fixture returning the registry of resources to keep.

    Tests clean up the resources they create, but some resources must
    survive across several tests and therefore be skipped by that cleanup.
    The returned structure holds one empty set of ids per resource kind.
    """
    id_sets = (
        'backup_ids',
        'image_ids',
        'keypair_ids',
        'server_ids',
        'nodes_ids',
        'chassis_ids',
        'snapshot_ids',
        'transfer_ids',
        'volume_ids',
        'network_ids',
        'router_ids',
    )
    registry = attrdict.AttrDict()
    for attribute in id_sets:
        # Each attribute gets its own, independent set.
        setattr(registry, attribute, set())
    return registry
def _get_keystone_session(config, project=None):
    """Create a keystone session from a configuration mapping.

    Arguments:
        config (dict): session configuration; the keys ``auth_url``, ``username`` and ``password`` are read,
            falling back to ``http://localhost:5000``, ``username`` and ``password`` respectively.
        project (str, optional): a project to scope the session to.

    Returns:
        keystoneauth1.session.Session: the Keystone session scoped for the project if specified.
    """
    base_url = config.get('auth_url', 'http://localhost:5000')
    credentials = {
        # The identity v3 path is appended to the configured base URL.
        'auth_url': '{auth_url}/v3'.format(auth_url=base_url),
        'username': config.get('username', 'username'),
        'password': config.get('password', 'password'),
        'project_name': project,
        'user_domain_id': 'default',
        'project_domain_id': 'default',
    }
    password_auth = keystone_identity.Password(**credentials)
    return keystone_session.Session(auth=password_auth)
def get_auth_session():
    """Return the auth session shared by all clients.

    The session is built from ``settings.KEYSTONE`` the first time it is
    needed and cached in the module-level ``client_auth_session``.
    """
    global client_auth_session
    if client_auth_session:
        return client_auth_session

    keystone_settings = settings.KEYSTONE
    password_auth = v3.Password(
        username=keystone_settings['username'],
        password=keystone_settings['password'],
        project_name=keystone_settings['project_name'],
        auth_url=keystone_settings['auth_url'],
        # The same optional 'domain_id' setting feeds both domain ids.
        user_domain_id=keystone_settings.get('domain_id', "default"),
        project_domain_id=keystone_settings.get('domain_id', "default"),
    )
    client_auth_session = session.Session(auth=password_auth)
    return client_auth_session
def get_session(self):
    """Build a session from the ``rc_*`` credentials on this object.

    A v3 password plugin is used when ``rc_identity_api_version`` equals 3,
    otherwise a v2 one. ``rc_cacert`` is passed as the session's ``verify``.
    """
    credentials = dict(
        username=self.rc_username,
        password=self.rc_password,
        auth_url=self.rc_auth_url,
    )
    if self.rc_identity_api_version == 3:
        credentials['project_name'] = self.rc_project_name
        credentials['project_domain_name'] = self.rc_project_domain_name
        credentials['user_domain_name'] = self.rc_user_domain_name
        plugin = v3.Password(**credentials)
    else:
        credentials['tenant_name'] = self.rc_tenant_name
        plugin = v2.Password(**credentials)
    return session.Session(auth=plugin, verify=self.rc_cacert)
def authenticate_keystone(self, keystone_ip, username, password,
                          api_version=False, admin_port=False,
                          user_domain_name=None, domain_name=None,
                          project_domain_name=None, project_name=None):
    """Authenticate with Keystone

    Port 35357 is used when ``admin_port`` is truthy, otherwise 5000. A v2
    password plugin and client are used when ``api_version`` is falsy or 2;
    any other value selects the v3 plugin and client.

    :returns: the keystone client, with ``auth_ref`` set from the plugin's
        access information.
    """
    self.log.debug('Authenticating with keystone...')
    port = 35357 if admin_port else 5000
    # keystone_ip must offer .decode() after .strip(), as the code calls it.
    host = keystone_ip.strip().decode('utf-8')
    base_ep = "http://{}:{}".format(host, port)

    use_v2 = not api_version or api_version == 2
    if use_v2:
        auth = v2.Password(
            username=username,
            password=password,
            tenant_name=project_name,
            auth_url=base_ep + "/v2.0"
        )
    else:
        auth = v3.Password(
            user_domain_name=user_domain_name,
            username=username,
            password=password,
            domain_name=domain_name,
            project_domain_name=project_domain_name,
            project_name=project_name,
            auth_url=base_ep + "/v3"
        )

    sess = keystone_session.Session(auth=auth)
    client_module = keystone_client if use_v2 else keystone_client_v3
    client = client_module.Client(session=sess)
    # This populates the client.service_catalog
    client.auth_ref = auth.get_access(sess)
    return client