def get_session(username, password, cookie_path=COOKIE_PATH):
"""Get session, existing or new."""
class USPSAuth(AuthBase): # pylint: disable=too-few-public-methods
"""USPS authorization storage."""
def __init__(self, username, password, cookie_path):
"""Init."""
self.username = username
self.password = password
self.cookie_path = cookie_path
def __call__(self, r):
"""Call is no-op."""
return r
session = requests.session()
session.auth = USPSAuth(username, password, cookie_path)
session.headers.update({'User-Agent': USER_AGENT})
if os.path.exists(cookie_path):
_LOGGER.debug("cookie found at: %s", cookie_path)
session.cookies = _load_cookies(cookie_path)
else:
_login(session)
return session
python类AuthBase()的实例源码
def _get_http_auth(username: str, password: str, auth_type: str) -> AuthBase:
if auth_type == 'basic':
return HTTPBasicAuth(username, password)
if auth_type == 'digest':
return HTTPDigestAuth(username, password)
raise RetsClientError('unknown auth type %s' % auth_type)
def _get_http_auth(response, url, auth_scheme):
"""Get authentication mechanism required by server
:param response: requests.response
:type response: requests.Response
:param url: parsed request url
:type url: str
:param auth_scheme: str
:type auth_scheme: str
:returns: AuthBase
:rtype: AuthBase
"""
hostname = url.hostname
username = url.username
password = url.password
if 'www-authenticate' in response.headers:
if auth_scheme not in ['basic', 'acsjwt', 'oauthjwt']:
msg = ("Server responded with an HTTP 'www-authenticate' field of "
"'{}', DCOS only supports 'Basic'".format(
response.headers['www-authenticate']))
raise DCOSException(msg)
if auth_scheme == 'basic':
# for basic auth if username + password was present,
# we'd already be authed by python requests module
username, password = _get_auth_credentials(username, hostname)
return HTTPBasicAuth(username, password)
# dcos auth (acs or oauth)
else:
return _get_dcos_auth(auth_scheme, username, password, hostname)
else:
msg = ("Invalid HTTP response: server returned an HTTP 401 response "
"with no 'www-authenticate' field")
raise DCOSException(msg)
def _get_dcos_auth(auth_scheme, username, password, hostname):
"""Get authentication flow for dcos acs auth and dcos oauth
:param auth_scheme: authentication_scheme
:type auth_scheme: str
:param username: username user for authentication
:type username: str
:param password: password for authentication
:type password: str
:param hostname: hostname for credentials
:type hostname: str
:returns: DCOSAcsAuth
:rtype: AuthBase
"""
toml_config = util.get_config()
token = toml_config.get("core.dcos_acs_token")
if token is None:
dcos_url = toml_config.get("core.dcos_url")
if auth_scheme == "acsjwt":
creds = _get_dcos_acs_auth_creds(username, password, hostname)
else:
creds = _get_dcos_oauth_creds(dcos_url)
verify = _verify_ssl()
# Silence 'Unverified HTTPS request' and 'SecurityWarning' for bad cert
if verify is not None:
silence_requests_warnings()
url = urllib.parse.urljoin(dcos_url, 'acs/api/v1/auth/login')
# using private method here, so we don't retry on this request
# error here will be bubbled up to _request_with_auth
response = _request('post', url, json=creds, verify=verify)
if response.status_code == 200:
token = response.json()['token']
config.set_val("core.dcos_acs_token", token)
return DCOSAcsAuth(token)