def _get_dcos_auth(auth_scheme, username, password, hostname):
"""Get authentication flow for dcos acs auth and dcos oauth
:param auth_scheme: authentication_scheme
:type auth_scheme: str
:param username: username user for authentication
:type username: str
:param password: password for authentication
:type password: str
:param hostname: hostname for credentials
:type hostname: str
:returns: DCOSAcsAuth
:rtype: AuthBase
"""
toml_config = util.get_config()
token = toml_config.get("core.dcos_acs_token")
if token is None:
dcos_url = toml_config.get("core.dcos_url")
if auth_scheme == "acsjwt":
creds = _get_dcos_acs_auth_creds(username, password, hostname)
else:
creds = _get_dcos_oauth_creds(dcos_url)
verify = _verify_ssl()
# Silence 'Unverified HTTPS request' and 'SecurityWarning' for bad cert
if verify is not None:
silence_requests_warnings()
url = urllib.parse.urljoin(dcos_url, 'acs/api/v1/auth/login')
# using private method here, so we don't retry on this request
# error here will be bubbled up to _request_with_auth
response = _request('post', url, json=creds, verify=verify)
if response.status_code == 200:
token = response.json()['token']
config.set_val("core.dcos_acs_token", token)
return DCOSAcsAuth(token)
评论列表
文章目录