def _default_is_success(status_code):
"""Returns true if the success status is between [200, 300).
:param response_status: the http response status
:type response_status: int
:returns: True for success status; False otherwise
:rtype: bool
"""
return 200 <= status_code < 300
# Example source code using the Python class Response()
def get_auth_scheme(response):
    """Return authentication scheme and realm requested by server for 'Basic'
    or 'acsjwt' (DCOS acs auth) or 'oauthjwt' (DCOS acs oauth) type

    The 'www-authenticate' header is split on commas and the first piece
    that starts with a supported scheme is used. Both returned values are
    lower-cased. When the selected piece carries no ``=``, the realm is the
    piece itself (historical behaviour, kept for compatibility).

    :param response: requests.response
    :type response: requests.Response
    :returns: auth_scheme, realm; (None, None) when the header is missing
        or contains no supported scheme
    :rtype: (str, str) | (None, None)
    """
    if 'www-authenticate' not in response.headers:
        return None, None

    supported = ("basic", "acsjwt", "oauthjwt")
    # Strip on BOTH sides: pieces after the first comma start with a space,
    # which previously prevented them from ever matching a scheme.
    challenges = (
        piece.strip().lower()
        for piece in response.headers['www-authenticate'].split(',')
    )
    scheme = next(
        (challenge for challenge in challenges
         if challenge.startswith(supported)),
        None)
    if not scheme:
        return None, None

    # Split only on the first '=' so a realm containing '=' stays intact.
    scheme_info = scheme.split("=", 1)
    auth_scheme = scheme_info[0].split(" ")[0]
    realm = scheme_info[-1].strip(' \'\"')
    return auth_scheme, realm
def _get_http_auth(response, url, auth_scheme):
    """Pick the authentication handler matching the server's challenge.

    :param response: requests.response
    :type response: requests.Response
    :param url: parsed request url (its ``hostname``, ``username`` and
        ``password`` attributes are read)
    :type url: parsed URL object
    :param auth_scheme: scheme announced by the server
    :type auth_scheme: str
    :returns: AuthBase
    :rtype: AuthBase
    :raises DCOSException: when the response has no 'www-authenticate'
        header, or the scheme is not 'basic', 'acsjwt' or 'oauthjwt'
    """
    host = url.hostname
    user = url.username
    secret = url.password

    if 'www-authenticate' not in response.headers:
        msg = ("Invalid HTTP response: server returned an HTTP 401 response "
               "with no 'www-authenticate' field")
        raise DCOSException(msg)

    if auth_scheme not in ('basic', 'acsjwt', 'oauthjwt'):
        msg = ("Server responded with an HTTP 'www-authenticate' field of "
               "'{}', DCOS only supports 'Basic'".format(
                   response.headers['www-authenticate']))
        raise DCOSException(msg)

    if auth_scheme != 'basic':
        # dcos auth (acs or oauth)
        return _get_dcos_auth(auth_scheme, user, secret, host)

    # for basic auth if username + password was present,
    # we'd already be authed by python requests module
    user, secret = _get_auth_credentials(user, host)
    return HTTPBasicAuth(user, secret)
def _get_dcos_auth(auth_scheme, username, password, hostname):
    """Build the auth object for dcos acs auth and dcos oauth.

    A token stored under ``core.dcos_acs_token`` is reused when present.
    Otherwise credentials are gathered, posted to the login endpoint, and
    the token from a 200 response is saved to the config.

    :param auth_scheme: authentication_scheme
    :type auth_scheme: str
    :param username: username user for authentication
    :type username: str
    :param password: password for authentication
    :type password: str
    :param hostname: hostname for credentials
    :type hostname: str
    :returns: DCOSAcsAuth
    :rtype: AuthBase
    """
    toml_config = util.get_config()
    token = toml_config.get("core.dcos_acs_token")
    if token is not None:
        return DCOSAcsAuth(token)

    dcos_url = toml_config.get("core.dcos_url")
    creds = (
        _get_dcos_acs_auth_creds(username, password, hostname)
        if auth_scheme == "acsjwt"
        else _get_dcos_oauth_creds(dcos_url)
    )

    verify = _verify_ssl()
    # Silence 'Unverified HTTPS request' and 'SecurityWarning' for bad cert
    if verify is not None:
        silence_requests_warnings()

    login_url = urllib.parse.urljoin(dcos_url, 'acs/api/v1/auth/login')
    # using private method here, so we don't retry on this request
    # error here will be bubbled up to _request_with_auth
    response = _request('post', login_url, json=creds, verify=verify)

    if response.status_code == 200:
        token = response.json()['token']
        config.set_val("core.dcos_acs_token", token)

    # NOTE(review): on a non-200 response that did not raise, token is
    # still None here and DCOSAcsAuth(None) is returned -- confirm intended.
    return DCOSAcsAuth(token)
def query(query, authenticate=False, json=False, full_output=False, **kwargs):
    """
    Execute a synchronous TAP query to the ESA Gaia database.

    :param query:
        The TAP query to execute.

    :param authenticate: [optional]
        Authenticate with the username and password information stored in the
        config.

    :param json: [optional]
        Return the data in JSON format. If set to False, then the data will be
        returned as an `astropy.table.Table`.

    :param full_output: [optional]
        Return a two-length tuple containing the data and the corresponding
        `requests.response` object.

    :param kwargs: [optional]
        Extra request parameters; they are merged into (and may override)
        the default REQUEST/LANG/FORMAT/query parameters.

    :returns:
        The data returned - either as an astropy table or a dictionary (JSON) -
        and optionally, the `requests.response` object used.

    :raises TAPQueryException:
        If the server response is not OK.
    """
    # Named `response_format` rather than `format` to avoid shadowing the
    # builtin.
    response_format = "json" if json else "votable"
    params = dict(REQUEST="doQuery", LANG="ADQL", FORMAT=response_format,
                  query=query)
    params.update(kwargs)

    # Scope the session so its pooled connections are released even when
    # login or the request raises. The body is read eagerly (no streaming),
    # so the response stays usable after the session is closed.
    with requests.Session() as session:
        if authenticate:
            utils.login(session)
        response = session.get("{}/tap/sync".format(config.url),
                               params=params)

    if not response.ok:
        raise TAPQueryException(response)

    if json:
        data = response.json()
    else:
        # Take the table contents and return an astropy table.
        data = Table.read(StringIO(response.text), format="votable")

    return (data, response) if full_output else data
def __init__(self, endpoint, ok_statuses=None, to_none_statuses=None,
             empty_to_none=True, close_slash=True,
             logger=None, name=None, keep_blank_values=True):
    """Create a client

    :param endpoint: str, ex. http://localhost:5000 or http://localhost:5000/api/
    :param ok_statuses: status codes for "ok"; stored on the instance only
        when given (not None)
    :param to_none_statuses: statuses for which the response becomes None;
        stored on the instance only when given (not None)
    :param empty_to_none: boolean, default - True, if True - an empty
        response (empty str, empty list, empty dict) yields None
    :param close_slash: boolean, url += '/', if url.endswith != '/', default - True
    :param logger: logger instance; when None one is obtained through
        get_logger(__name__)
    :param name: name for client; defaults to '<client: ENDPOINT>'
    :type name: str
    :param keep_blank_values: forwarded to parse_qs when the endpoint's
        query string is parsed
    """
    # The default name uses the endpoint exactly as passed in.
    if name is None:
        name = '<client: {}>'.format(endpoint)

    base_logger = get_logger(__name__) if logger is None else logger
    self.logger = InstanceLogger(self, base_logger)

    # Drop a single trailing slash before parsing.
    trimmed = endpoint[:-1] if endpoint.endswith('/') else endpoint

    if ok_statuses is not None:
        self.ok_statuses = ok_statuses
    if to_none_statuses is not None:
        self.to_none_statuses = to_none_statuses
    self.empty_to_none = empty_to_none
    self.close_slash = close_slash

    parts = urlparse.urlparse(trimmed)
    resolved_endpoint = self.get_endpoint_from_parsed_url(parts)

    self.keep_blank_values = keep_blank_values
    self.endpoint = resolved_endpoint
    self.path = parts.path
    self.query = urlparse.parse_qs(
        parts.query, keep_blank_values=self.keep_blank_values)
    self.fragment = parts.fragment
    self.params = parts.params
    self.name = name

    self.logger.debug(
        'Client built, endpoint: "%s", path: "%s", query: %s, params: %s, fragment: %s',
        self.endpoint, self.path,
        self.query, self.params, self.fragment)
def request(self, method, url, max_price=None, mock_requests=False, **kwargs):
    """Make a 402 request for a resource.

    This is the BitRequests public method that should be used to complete a
    402 request using the desired payment method (as constructed by a class
    implementing BitRequests)

    Args:
        method (string): HTTP method for completing the request in lower-
            case letters. Examples: 'get', 'post', 'put'
        url (string): URL of the requested resource.
        max_price (int): maximum allowed price for a request (in satoshi).
        mock_requests (bool): when true, no network request is made and an
            empty response with status 200 is returned.
        **kwargs: forwarded to `requests.request`. If 'headers' is given
            it must be a dict; it is copied, never modified in place.

    Returns:
        response (requests.response):
            response from paying for the requested resource, or the
            initial response when its status is not 402.

    Raises:
        ValueError: if 'headers' is supplied but is not a dict.
    """
    if mock_requests:
        fake_response = requests.models.Response()
        fake_response.status_code = 200
        fake_response._content = b''
        return fake_response

    # Make the initial request for the resource
    response = requests.request(method, url, **kwargs)

    # Return if we receive a status code other than 402: payment required
    if response.status_code != requests.codes.payment_required:
        return response

    # Pass the response to the main method for handling payment.
    # `.get` keeps a missing price header from failing inside a log call;
    # validating the payment headers is make_402_payment's job.
    logger.debug('[BitRequests] 402 payment required: {} satoshi.'.format(
        response.headers.get('price')))
    payment_headers = self.make_402_payment(response, max_price)

    # Reset the position of any files that have been used
    self._reset_file_positions(kwargs.get('files'), kwargs.get('data'))

    # Add any user-provided headers to the payment headers dict
    if 'headers' in kwargs:
        if not isinstance(kwargs['headers'], dict):
            raise ValueError('argument \'headers\' must be a dict.')
        # Merge into a copy so the caller's dict does not end up carrying
        # the signed payment headers after this call returns.
        merged_headers = dict(kwargs['headers'])
        merged_headers.update(payment_headers)
        kwargs['headers'] = merged_headers
    else:
        kwargs['headers'] = payment_headers

    paid_response = requests.request(method, url, **kwargs)
    setattr(paid_response, 'amount_paid', int(response.headers['price']))

    if paid_response.status_code == requests.codes.ok:
        logger.debug('[BitRequests] Successfully purchased resource.')
    else:
        logger.debug('[BitRequests] Could not purchase resource.')

    return paid_response
def make_402_payment(self, response, max_price):
    """Build the signed bit-transfer headers that pay for a 402 response.

    Args:
        response: the 402 response; its headers supply the price, payee
            address and payee username, and its url is the description.
        max_price (int): price ceiling; a falsy value disables the check.

    Returns:
        dict: 'Bitcoin-Transfer' (the JSON transfer) and 'Authorization'
            (the wallet signature over that JSON).

    Raises:
        UnsupportedPaymentMethodError: a required payment header is absent.
        InsufficientBalanceError: price exceeds the buffer balance.
        ResourcePriceGreaterThanMaxPriceError: price exceeds max_price.
        TypeError: the serialized transfer is not a string.
    """
    # Retrieve payment headers
    server_headers = response.headers
    price = server_headers.get(BitTransferRequests.HTTP_BITCOIN_PRICE)
    payee_address = server_headers.get(
        BitTransferRequests.HTTP_BITCOIN_ADDRESS)
    payee_username = server_headers.get(
        BitTransferRequests.HTTP_BITCOIN_USERNAME)

    # Verify that the payment method is supported
    required = (price, payee_address, payee_username)
    if any(value is None for value in required):
        raise UnsupportedPaymentMethodError(
            'Resource does not support that payment method.')

    # Convert string headers into correct data types
    price = int(price)

    # verify that we have the money to purchase the resource
    available = self.client.get_earnings()["total_earnings"]
    if price > available:
        raise InsufficientBalanceError(
            'Resource price ({}) exceeds buffer balance ({}).'.format(
                price, available))

    # Verify resource cost against our budget
    if max_price and price > max_price:
        raise ResourcePriceGreaterThanMaxPriceError(
            'Resource price ({}) exceeds max price ({}).'.format(
                price, max_price))

    # Get the signing public key
    public_key = self.wallet.get_public_key()
    encoded_key = codecs.encode(
        public_key.compressed_bytes, 'base64').decode()

    # Create and sign BitTranfer
    transfer_fields = {
        'payer': self.username,
        'payer_pubkey': encoded_key,
        'payee_address': payee_address,
        'payee_username': payee_username,
        'amount': price,
        'timestamp': time.time(),
        'description': response.url
    }
    bittransfer = json.dumps(transfer_fields)
    if not isinstance(bittransfer, str):
        raise TypeError("Serialized bittransfer must be a string")
    signature = self.wallet.sign_message(bittransfer)

    logger.debug('[BitTransferRequests] Signature: {}'.format(signature))
    logger.debug('[BitTransferRequests] BitTransfer: {}'.format(bittransfer))

    return {
        'Bitcoin-Transfer': bittransfer,
        'Authorization': signature
    }
def request(method,
            url,
            is_success=_default_is_success,
            timeout=None,
            verify=None,
            **kwargs):
    """Send an HTTP request, retrying with authentication on a 401.

    When the first response is a 401 the call is handed to
    `_request_with_auth`, which owns the credential and retry handling.

    :param method: method for the new Request object
    :type method: str
    :param url: URL for the new Request object
    :type url: str
    :param is_success: Defines successful status codes for the request
    :type is_success: Function from int to bool
    :param timeout: request timeout
    :type timeout: int
    :param verify: whether to verify SSL certs or path to cert(s)
    :type verify: bool | str
    :param kwargs: Additional arguments to requests.request
        (see http://docs.python-requests.org/en/latest/api/#requests.request)
    :type kwargs: dict
    :rtype: Response
    :raises DCOSAuthorizationException: final status is 403 and is_success
        rejects it
    :raises DCOSHTTPException: any other status rejected by is_success
    """
    # Default to asking for JSON when the caller supplied no headers at all.
    kwargs.setdefault('headers', {'Accept': 'application/json'})

    verify = _verify_ssl(verify)

    # Silence 'Unverified HTTPS request' and 'SecurityWarning' for bad certs
    if verify is not None:
        silence_requests_warnings()

    response = _request(method, url, is_success, timeout,
                        verify=verify, **kwargs)

    if response.status_code == 401:
        response = _request_with_auth(response, method, url, is_success,
                                      timeout, verify, **kwargs)

    if is_success(response.status_code):
        return response
    if response.status_code == 403:
        raise DCOSAuthorizationException(response)
    raise DCOSHTTPException(response)