python类Response()的实例源码

http.py 文件源码 项目:deploy-marathon-bluegreen 作者: softonic 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _default_is_success(status_code):
    """Returns true if the success status is between [200, 300).

    :param response_status: the http response status
    :type response_status: int
    :returns: True for success status; False otherwise
    :rtype: bool
    """

    return 200 <= status_code < 300
http.py 文件源码 项目:deploy-marathon-bluegreen 作者: softonic 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_auth_scheme(response):
    """Return authentication scheme and realm requested by server for 'Basic'
       or 'acsjwt' (DCOS acs auth) or 'oauthjwt' (DCOS acs oauth) type

    :param response: requests.response
    :type response: requests.Response
    :returns: auth_scheme, realm
    :rtype: (str, str)
    """

    if 'www-authenticate' in response.headers:
        auths = response.headers['www-authenticate'].split(',')
        scheme = next((auth_type.rstrip().lower() for auth_type in auths
                       if auth_type.rstrip().lower().startswith("basic") or
                       auth_type.rstrip().lower().startswith("acsjwt") or
                       auth_type.rstrip().lower().startswith("oauthjwt")),
                      None)
        if scheme:
            scheme_info = scheme.split("=")
            auth_scheme = scheme_info[0].split(" ")[0].lower()
            realm = scheme_info[-1].strip(' \'\"').lower()
            return auth_scheme, realm
        else:
            return None, None
    else:
        return None, None
http.py 文件源码 项目:deploy-marathon-bluegreen 作者: softonic 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _get_http_auth(response, url, auth_scheme):
    """Get authentication mechanism required by server

    :param response: requests.response
    :type response: requests.Response
    :param url: parsed request url
    :type url: str
    :param auth_scheme: str
    :type auth_scheme: str
    :returns: AuthBase
    :rtype: AuthBase
    """

    hostname = url.hostname
    username = url.username
    password = url.password

    if 'www-authenticate' in response.headers:
        if auth_scheme not in ['basic', 'acsjwt', 'oauthjwt']:
            msg = ("Server responded with an HTTP 'www-authenticate' field of "
                   "'{}', DCOS only supports 'Basic'".format(
                       response.headers['www-authenticate']))
            raise DCOSException(msg)

        if auth_scheme == 'basic':
            # for basic auth if username + password was present,
            # we'd already be authed by python requests module
            username, password = _get_auth_credentials(username, hostname)
            return HTTPBasicAuth(username, password)
        # dcos auth (acs or oauth)
        else:
            return _get_dcos_auth(auth_scheme, username, password, hostname)
    else:
        msg = ("Invalid HTTP response: server returned an HTTP 401 response "
               "with no 'www-authenticate' field")
        raise DCOSException(msg)
http.py 文件源码 项目:deploy-marathon-bluegreen 作者: softonic 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _get_dcos_auth(auth_scheme, username, password, hostname):
    """Get authentication flow for dcos acs auth and dcos oauth

    :param auth_scheme: authentication_scheme
    :type auth_scheme: str
    :param username: username user for authentication
    :type username: str
    :param password: password for authentication
    :type password: str
    :param hostname: hostname for credentials
    :type hostname: str
    :returns: DCOSAcsAuth
    :rtype: AuthBase
    """

    toml_config = util.get_config()
    token = toml_config.get("core.dcos_acs_token")
    if token is None:
        dcos_url = toml_config.get("core.dcos_url")
        if auth_scheme == "acsjwt":
            creds = _get_dcos_acs_auth_creds(username, password, hostname)
        else:
            creds = _get_dcos_oauth_creds(dcos_url)

        verify = _verify_ssl()
        # Silence 'Unverified HTTPS request' and 'SecurityWarning' for bad cert
        if verify is not None:
            silence_requests_warnings()

        url = urllib.parse.urljoin(dcos_url, 'acs/api/v1/auth/login')
        # using private method here, so we don't retry on this request
        # error here will be bubbled up to _request_with_auth
        response = _request('post', url, json=creds, verify=verify)

        if response.status_code == 200:
            token = response.json()['token']
            config.set_val("core.dcos_acs_token", token)

    return DCOSAcsAuth(token)
query.py 文件源码 项目:gaia-on-tap 作者: andycasey 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def query(query, authenticate=False, json=False, full_output=False, **kwargs):
    """
    Execute a synchronous TAP query to the ESA Gaia database.

    :param query:
        The TAP query to execute.

    :param authenticate: [optional]
        Authenticate with the username and password information stored in the
        config.

    :param json: [optional]
        Return the data in JSON format. If set to False, then the data will be
        returned as an `astropy.table.Table`.

    :param full_output: [optional]
        Return a two-length tuple containing the data and the corresponding
        `requests.response` object.

    :returns:
        The data returned - either as an astropy table or a dictionary (JSON) -
        and optionally, the `requests.response` object used.
    """

    format = "json" if json else "votable"
    params = dict(REQUEST="doQuery", LANG="ADQL", FORMAT=format, query=query)
    params.update(kwargs)

    # Create session.
    session = requests.Session()
    if authenticate:
        utils.login(session)
    response = session.get("{}/tap/sync".format(config.url), params=params)

    if not response.ok:
        raise TAPQueryException(response)

    if json:
        data = response.json()

    else:
        # Take the table contents and return an astropy table.
        data = Table.read(StringIO(response.text), format="votable")

    return (data, response) if full_output else data
client.py 文件源码 项目:microservices 作者: viatoriche 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, endpoint, ok_statuses=None, to_none_statuses=None,
                 empty_to_none=True, close_slash=True,
                 logger=None, name=None, keep_blank_values=True):
        """Create a client

        :param endpoint: str, ex. http://localhost:5000 or http://localhost:5000/api/
        :param ok_statuses: default - (200, 201, 202, ), status codes for "ok"
        :param to_none_statuses: statuses, for generate None as response, default - (404, )
        :param empty_to_none: boolean, default - True, if True - empty response will be generate None response (empty str, empty list, empty dict)
        :param close_slash: boolean, url += '/', if url.endswith != '/', default - True
        :param logger: logger instance
        :param name: name for client
        :type name: str
        """
        if name is None:
            name = '<client: {}>'.format(endpoint)

        if logger is None:
            logger = get_logger(__name__)

        self.logger = InstanceLogger(self, logger)
        if endpoint.endswith('/'):
            endpoint = endpoint[:-1]
        if ok_statuses is not None:
            self.ok_statuses = ok_statuses
        if to_none_statuses is not None:
            self.to_none_statuses = to_none_statuses
        self.empty_to_none = empty_to_none
        self.close_slash = close_slash
        parsed_url = urlparse.urlparse(endpoint)
        endpoint = self.get_endpoint_from_parsed_url(parsed_url)
        self.keep_blank_values = keep_blank_values
        self.endpoint = endpoint
        self.path = parsed_url.path
        self.query = urlparse.parse_qs(parsed_url.query,
                                       keep_blank_values=self.keep_blank_values)
        self.fragment = parsed_url.fragment
        self.params = parsed_url.params
        self.name = name
        self.logger.debug(
            'Client built, endpoint: "%s", path: "%s", query: %s, params: %s, fragment: %s',
            self.endpoint, self.path,
            self.query, self.params, self.fragment)
bitrequests.py 文件源码 项目:two1-python 作者: 21dotco 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def request(self, method, url, max_price=None, mock_requests=False, **kwargs):
        """Make a 402 request for a resource.

        This is the BitRequests public method that should be used to complete a
        402 request using the desired payment method (as constructed by a class
        implementing BitRequests)

        Args:
            method (string): HTTP method for completing the request in lower-
                case letters. Examples: 'get', 'post', 'put'
            url (string): URL of the requested resource.
            data (dict): python dict of parameters to send with the request.
            max_price (int): maximum allowed price for a request (in satoshi).

        Returns:
            response (requests.response):
                response from paying for the requested resource.
        """
        if mock_requests:
            fake_response = requests.models.Response()
            fake_response.status_code = 200
            fake_response._content = b''
            return fake_response

        # Make the initial request for the resource
        response = requests.request(method, url, **kwargs)

        # Return if we receive a status code other than 402: payment required
        if response.status_code != requests.codes.payment_required:
            return response

        # Pass the response to the main method for handling payment
        logger.debug('[BitRequests] 402 payment required: {} satoshi.'.format(
            response.headers['price']))
        payment_headers = self.make_402_payment(response, max_price)

        # Reset the position of any files that have been used
        self._reset_file_positions(kwargs.get('files'), kwargs.get('data'))

        # Add any user-provided headers to the payment headers dict
        if 'headers' in kwargs:
            if isinstance(kwargs['headers'], dict):
                kwargs['headers'].update(payment_headers)
            else:
                raise ValueError('argument \'headers\' must be a dict.')
        else:
            kwargs['headers'] = payment_headers

        paid_response = requests.request(method, url, **kwargs)
        setattr(paid_response, 'amount_paid', int(response.headers['price']))

        if paid_response.status_code == requests.codes.ok:
            logger.debug('[BitRequests] Successfully purchased resource.')
        else:
            logger.debug('[BitRequests] Could not purchase resource.')

        return paid_response
bitrequests.py 文件源码 项目:two1-python 作者: 21dotco 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def make_402_payment(self, response, max_price):
        """Make a bit-transfer payment to the payment-handling service."""
        # Retrieve payment headers
        headers = response.headers
        price = headers.get(BitTransferRequests.HTTP_BITCOIN_PRICE)
        payee_address = headers.get(BitTransferRequests.HTTP_BITCOIN_ADDRESS)
        payee_username = headers.get(BitTransferRequests.HTTP_BITCOIN_USERNAME)

        # Verify that the payment method is supported
        if price is None or payee_address is None or payee_username is None:
            raise UnsupportedPaymentMethodError(
                'Resource does not support that payment method.')

        # Convert string headers into correct data types
        price = int(price)

        # verify that we have the money to purchase the resource
        buffer_balance = self.client.get_earnings()["total_earnings"]
        if price > buffer_balance:
            insuff_funds_err = 'Resource price ({}) exceeds buffer balance ({}).'
            raise InsufficientBalanceError(insuff_funds_err.format(price, buffer_balance))

        # Verify resource cost against our budget
        if max_price and price > max_price:
            max_price_err = 'Resource price ({}) exceeds max price ({}).'
            raise ResourcePriceGreaterThanMaxPriceError(max_price_err.format(price, max_price))

        # Get the signing public key
        pubkey = self.wallet.get_public_key()
        compressed_pubkey = codecs.encode(pubkey.compressed_bytes, 'base64').decode()

        # Create and sign BitTranfer
        bittransfer = json.dumps({
            'payer': self.username,
            'payer_pubkey': compressed_pubkey,
            'payee_address': payee_address,
            'payee_username': payee_username,
            'amount': price,
            'timestamp': time.time(),
            'description': response.url
        })
        if not isinstance(bittransfer, str):
            raise TypeError("Serialized bittransfer must be a string")
        signature = self.wallet.sign_message(bittransfer)
        logger.debug('[BitTransferRequests] Signature: {}'.format(signature))
        logger.debug('[BitTransferRequests] BitTransfer: {}'.format(bittransfer))
        return {
            'Bitcoin-Transfer': bittransfer,
            'Authorization': signature
        }
http.py 文件源码 项目:deploy-marathon-bluegreen 作者: softonic 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def request(method,
            url,
            is_success=_default_is_success,
            timeout=None,
            verify=None,
            **kwargs):
    """Sends an HTTP request. If the server responds with a 401, ask the
    user for their credentials, and try request again (up to 3 times).

    :param method: method for the new Request object
    :type method: str
    :param url: URL for the new Request object
    :type url: str
    :param is_success: Defines successful status codes for the request
    :type is_success: Function from int to bool
    :param timeout: request timeout
    :type timeout: int
    :param verify: whether to verify SSL certs or path to cert(s)
    :type verify: bool | str
    :param kwargs: Additional arguments to requests.request
        (see http://docs.python-requests.org/en/latest/api/#requests.request)
    :type kwargs: dict
    :rtype: Response
    """

    if 'headers' not in kwargs:
        kwargs['headers'] = {'Accept': 'application/json'}

    verify = _verify_ssl(verify)

    # Silence 'Unverified HTTPS request' and 'SecurityWarning' for bad certs
    if verify is not None:
        silence_requests_warnings()

    response = _request(method, url, is_success, timeout,
                        verify=verify, **kwargs)

    if response.status_code == 401:
        response = _request_with_auth(response, method, url, is_success,
                                      timeout, verify, **kwargs)

    if is_success(response.status_code):
        return response
    elif response.status_code == 403:
        raise DCOSAuthorizationException(response)
    else:
        raise DCOSHTTPException(response)


问题


面经


文章

微信
公众号

扫码关注公众号