python类HTTPAdapter()的实例源码

__init__.py 文件源码 项目:open-wob-api 作者: openstate 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def setup_http_session(self):
        """Replace ``self.http_session`` with a freshly configured session.

        A truthy existing session is closed first. The new session sends
        ``USER_AGENT`` as its User-Agent header and gets its own retrying
        adapter (``total=5``, forced on HTTP 500/503, backoff factor 0.5)
        for each of the ``http://`` and ``https://`` prefixes.
        """
        if self.http_session:
            self.http_session.close()

        self.http_session = Session()
        self.http_session.headers['User-Agent'] = USER_AGENT

        # Each scheme gets an independent Retry policy and adapter.
        for scheme in ('http://', 'https://'):
            retry_policy = Retry(total=5, status_forcelist=[500, 503],
                                 backoff_factor=.5)
            self.http_session.mount(scheme,
                                    HTTPAdapter(max_retries=retry_policy))
__init__.py 文件源码 项目:open-wob-api 作者: openstate 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def http_session(self):
        """Return the cached :class:`requests.Session`, building it on
        first use.

        When ``self._http_session`` is missing or falsy, urllib3 warnings
        are disabled and a new session is created with ``USER_AGENT`` as its
        User-Agent header and one retrying adapter (``total=5``, forced on
        HTTP 500/503, backoff factor 0.5) per scheme. The session is then
        stored on ``self._http_session``.
        """
        if not getattr(self, '_http_session', None):
            requests.packages.urllib3.disable_warnings()
            new_session = requests.Session()
            new_session.headers['User-Agent'] = USER_AGENT

            # Each scheme gets an independent Retry policy and adapter.
            for scheme in ('http://', 'https://'):
                retry_policy = Retry(total=5, status_forcelist=[500, 503],
                                     backoff_factor=.5)
                new_session.mount(scheme,
                                  HTTPAdapter(max_retries=retry_policy))

            self._http_session = new_session

        return self._http_session
requests_client.py 文件源码 项目:MundiAPI-PYTHON 作者: mundipagg 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, timeout=60, cache=False, max_retries=None, retry_interval=None):
        """Create the client and its underlying requests session.

        Args:
            timeout (float): The default global timeout (seconds).
            cache: When truthy, the session is wrapped in ``CacheControl``.
            max_retries: Passed to ``Retry`` as ``total``. Retries are only
                configured when both this and ``retry_interval`` are truthy.
            retry_interval: Passed to ``Retry`` as ``backoff_factor``.

        """
        self.timeout = timeout
        self.session = requests.session()

        if max_retries and retry_interval:
            # One Retry policy, shared by a separate adapter per scheme.
            retry_policy = Retry(total=max_retries,
                                 backoff_factor=retry_interval)
            for scheme in ('http://', 'https://'):
                self.session.mount(scheme,
                                   HTTPAdapter(max_retries=retry_policy))

        if cache:
            self.session = CacheControl(self.session)
ovh_shinken.py 文件源码 项目:sauna 作者: NicolasLM 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, endpoint=None, application_key=None,
                 application_secret=None, consumer_key=None, timeout=TIMEOUT):
        """Store the API credentials and prepare a retrying HTTP session.

        ``endpoint`` is looked up in ``ENDPOINTS``; an unknown key makes that
        lookup fail. The session retries through a ``Retry`` policy when the
        vendored urllib3 is importable, and falls back to a plain retry count
        of 5 otherwise.
        """
        from requests import Session
        from requests.adapters import HTTPAdapter

        self._endpoint = ENDPOINTS[endpoint]
        self._application_key = application_key
        self._application_secret = application_secret
        self._consumer_key = consumer_key

        # Filled in lazily on first use.
        self._time_delta = None

        try:
            # Some older versions of requests do not ship the vendored
            # urllib3 package.
            from requests.packages.urllib3.util.retry import Retry
        except ImportError:
            retry_policy = 5
        else:
            retry_policy = Retry(
                total=5,
                backoff_factor=0.2,
                status_forcelist=[422, 500, 502, 503, 504],
            )

        # A session lets connections be reused between requests.
        self._session = Session()
        for scheme in ('https://', 'http://'):
            self._session.mount(scheme, HTTPAdapter(max_retries=retry_policy))

        # Override default timeout
        self._timeout = timeout
utils.py 文件源码 项目:tecken 作者: mozilla-services 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def requests_retry_session(
    retries=3,
    backoff_factor=0.3,
    status_forcelist=(500, 502, 504),
):
    """Build a requests session whose adapter retries via a Retry policy.

    ``retries`` is used for the ``total``, ``read`` and ``connect`` budgets
    of the ``Retry`` object, so connection failures are retried as well as
    the status codes in ``status_forcelist``. ``backoff_factor`` is handed
    to ``Retry`` unchanged; see the urllib3 ``Retry`` documentation for how
    the sleep between attempts is derived from it.

    A single adapter carrying that policy is mounted for both ``http://``
    and ``https://``.

    Returns:
        The configured ``requests.Session``.
    """  # noqa
    retry_policy = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    shared_adapter = HTTPAdapter(max_retries=retry_policy)

    session = requests.Session()
    for scheme in ('http://', 'https://'):
        session.mount(scheme, shared_adapter)
    return session
responses.py 文件源码 项目:harbour-sailfinder 作者: DylanVanAssche 项目源码 文件源码 阅读 62 收藏 0 点赞 0 评论 0
def _on_request(self, request, **kwargs):
        """Answer *request* from the registered matches.

        When ``self._find_match`` returns ``None``, a ``ConnectionError`` is
        recorded in ``self._calls`` and raised. Otherwise a response is
        built from the match's status, body and headers, recorded in
        ``self._calls`` and returned.
        """
        matched = self._find_match(request)

        # TODO(dcramer): find the correct class for this
        if matched is None:
            failure = ConnectionError(
                'Connection refused: {0}'.format(request.url))
            self._calls.add(request, failure)
            raise failure

        response_headers = {'Content-Type': matched['content_type']}
        extra_headers = matched['adding_headers']
        if extra_headers:
            response_headers.update(extra_headers)

        raw_response = HTTPResponse(
            status=matched['status'],
            body=BufferIO(matched['body']),
            headers=response_headers,
            preload_content=False,
        )

        built = HTTPAdapter().build_response(request, raw_response)
        if not matched['stream']:
            # Touch .content so the body is loaded for non-streaming matches.
            built.content  # NOQA

        self._calls.add(request, built)
        return built
responses.py 文件源码 项目:harbour-sailfinder 作者: DylanVanAssche 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _on_request(self, request, **kwargs):
        """Answer *request* from the registered matches.

        When ``self._find_match`` returns ``None``, a ``ConnectionError`` is
        recorded in ``self._calls`` and raised. Otherwise a response is
        built from the match's status, body and headers, recorded in
        ``self._calls`` and returned.
        """
        matched = self._find_match(request)

        # TODO(dcramer): find the correct class for this
        if matched is None:
            failure = ConnectionError(
                'Connection refused: {0}'.format(request.url))
            self._calls.add(request, failure)
            raise failure

        response_headers = {'Content-Type': matched['content_type']}
        extra_headers = matched['adding_headers']
        if extra_headers:
            response_headers.update(extra_headers)

        raw_response = HTTPResponse(
            status=matched['status'],
            body=BufferIO(matched['body']),
            headers=response_headers,
            preload_content=False,
        )

        built = HTTPAdapter().build_response(request, raw_response)
        if not matched['stream']:
            # Touch .content so the body is loaded for non-streaming matches.
            built.content  # NOQA

        self._calls.add(request, built)
        return built
connection.py 文件源码 项目:libmozdata 作者: mozilla 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, base_url, queries=None, **kwargs):
        """Constructor

        Args:
            base_url (str): the server's url
            queries (Optional[Query]): the queries
            **kwargs: optional per-instance overrides. Recognised keys are
                ``timeout``, ``max_retries``, ``max_workers``,
                ``user_agent`` and ``x_forwarded_for``; each one replaces
                the corresponding upper-case attribute on this instance.
        """

        # Apply the overrides *before* building the session: the worker
        # count and the retry budget are read while the session is set up,
        # so applying them afterwards would silently ignore them.
        if 'timeout' in kwargs:
            self.TIMEOUT = kwargs['timeout']
        if 'max_retries' in kwargs:
            self.MAX_RETRIES = kwargs['max_retries']
        if 'max_workers' in kwargs:
            self.MAX_WORKERS = kwargs['max_workers']
        if 'user_agent' in kwargs:
            self.USER_AGENT = kwargs['user_agent']
        if 'x_forwarded_for' in kwargs:
            self.X_FORWARDED_FOR = utils.get_x_fwded_for_str(kwargs['x_forwarded_for'])

        self.session = FuturesSession(max_workers=self.MAX_WORKERS)
        # Read MAX_RETRIES through self so a per-instance override is used.
        retries = Retry(total=self.MAX_RETRIES, backoff_factor=1, status_forcelist=Connection.STATUS_FORCELIST)
        self.session.mount(base_url, HTTPAdapter(max_retries=retries))
        self.results = []
        self.queries = queries

        self.exec_queries()
api.py 文件源码 项目:fac 作者: mickael9 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, base_url=BASE_URL, login_url=LOGIN_URL, session=None):
        """Remember the API URLs and attach a retrying adapter to the session.

        ``self.url`` is ``base_url`` without trailing slashes plus ``/mods``.
        The given ``session`` is used when truthy, otherwise a new requests
        session is created. Either way one adapter retrying on HTTP 500/503
        is mounted on it for both schemes.
        """
        self.base_url = base_url
        self.login_url = login_url
        self.url = base_url.rstrip('/') + '/mods'
        self.session = session or requests.session()

        retry_policy = Retry(status_forcelist=[500, 503])
        shared_adapter = HTTPAdapter(max_retries=retry_policy)
        for scheme in ('https://', 'http://'):
            self.session.mount(scheme, shared_adapter)
cluster.py 文件源码 项目:vmware-nsxlib 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def new_connection(self, cluster_api, provider):
        """Create and configure a session for talking to *provider*.

        Authentication uses the configured client certificate provider when
        there is one, otherwise the provider's username and password.
        Redirects are disabled, TLS verification follows the ``insecure``
        setting (using the provider's CA file when set), a single-connection
        adapter is mounted for both schemes, and default headers are applied
        via ``self.get_default_headers``.

        Returns:
            The configured ``TimeoutSession``.
        """
        config = cluster_api.nsxlib_config
        session = TimeoutSession(config.http_timeout,
                                 config.http_read_timeout)

        if config.client_cert_provider:
            session.cert_provider = config.client_cert_provider
        else:
            session.auth = (provider.username, provider.password)

        # NSX v3 doesn't use redirects
        session.max_redirects = 0

        verify = not config.insecure
        if verify and provider.ca_file:
            # Verify against the given CA bundle path instead of the default.
            verify = provider.ca_file
        session.verify = verify

        # Pooling is done with eventlet in the cluster class, so each
        # session keeps a single connection.
        single_conn_adapter = adapters.HTTPAdapter(
            pool_connections=1, pool_maxsize=1,
            max_retries=config.retries,
            pool_block=False)
        for scheme in ('http://', 'https://'):
            session.mount(scheme, single_conn_adapter)

        self.get_default_headers(session, provider,
                                 config.allow_overwrite_header)

        return session
tasks.py 文件源码 项目:minstances 作者: 0xa 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def rsess():
    """Return a requests session with retries and the project User-Agent.

    A separate adapter with ``max_retries=MAX_RETRIES`` is mounted for each
    scheme, and ``USER_AGENT`` is set as the User-Agent header.
    """
    session = requests.Session()

    # The retry budget is meant to keep one random failure from breaking an
    # uptime check; two consecutive errors are worrying though.
    for scheme in ('http://', 'https://'):
        session.mount(scheme, HTTPAdapter(max_retries=MAX_RETRIES))

    session.headers.update({'User-Agent': USER_AGENT})
    return session
proxy_tester.py 文件源码 项目:PoGo-Proxies 作者: neskk 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_async_requests_session(num_retries, backoff_factor, pool_size,
                               status_forcelist=(500, 502, 503, 504)):
    """Build a ``FuturesSession`` that retries failed requests.

    Args:
        num_retries: Passed to ``Retry`` as ``total``.
        backoff_factor: Passed to ``Retry`` as ``backoff_factor``; it
            controls the growing sleep between retries.
        pool_size: Used as the worker count of the session and as both the
            ``pool_connections`` and ``pool_maxsize`` of each adapter.
        status_forcelist: Status codes that force a retry. The default is
            an immutable tuple so it cannot be modified between calls.

    Returns:
        The configured ``FuturesSession``.
    """
    session = FuturesSession(max_workers=pool_size)

    # If any regular response is generated, no retry is done. Without using
    # the status_forcelist, even a response with status 500 will not be
    # retried.
    retries = Retry(total=num_retries, backoff_factor=backoff_factor,
                    status_forcelist=status_forcelist)

    # Mount a separate handler on both HTTP & HTTPS.
    for scheme in ('http://', 'https://'):
        session.mount(scheme, HTTPAdapter(max_retries=retries,
                                          pool_connections=pool_size,
                                          pool_maxsize=pool_size))

    return session


# Evaluates the status of PTC and Niantic request futures, and returns the
# result (optionally with an error).
# Warning: blocking! Can only get status code if request has finished.
proxy_scrapper.py 文件源码 项目:PoGo-Proxies 作者: neskk 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def download_webpage(target_url, proxy=None, timeout=5):
    """Download *target_url* and return its body.

    Args:
        target_url: URL to fetch.
        proxy: Proxy used for both ``http`` and ``https`` requests
            (``None`` for no proxy).
        timeout: Timeout passed to the request, in seconds.

    Returns:
        The response body as bytes when the status code is 200, otherwise
        ``None``.

    Exceptions raised by ``requests`` while fetching are not caught here.
    """
    retries = Retry(total=3,
                    backoff_factor=0.5,
                    status_forcelist=[500, 502, 503, 504])

    headers = {
        'User-Agent': ('Mozilla/5.0 (Windows NT 6.1; WOW64; rv:54.0) ' +
                       'Gecko/20100101 Firefox/54.0'),
        'Referer': 'http://google.com'
    }

    # The context manager closes the session (and its connection pools)
    # on every exit path, including when the request raises.
    with requests.Session() as s:
        # Retry on both schemes: the target URL may be HTTPS as well.
        s.mount('http://', HTTPAdapter(max_retries=retries))
        s.mount('https://', HTTPAdapter(max_retries=retries))

        r = s.get(target_url,
                  proxies={'http': proxy, 'https': proxy},
                  timeout=timeout,
                  headers=headers)

        if r.status_code == 200:
            return r.content

    return None


# Sockslist.net uses javascript to obfuscate proxies port number.
# Builds a dictionary with decoded values for each variable.
# Dictionary = {'var': intValue, ...})
test_requests.py 文件源码 项目:ruffruffs 作者: di 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_transport_adapter_ordering(self):
        """Check the order of ``Session.adapters`` after mounting prefixes.

        The expected lists put longer prefixes first. The final part checks
        that ``mount`` still works after ``adapters`` has been replaced by a
        plain dict.
        """
        session = requests.Session()
        assert ['https://', 'http://'] == list(session.adapters)

        github_prefixes = (
            'http://git',
            'http://github',
            'http://github.com',
            'http://github.com/about/',
        )
        for prefix in github_prefixes:
            session.mount(prefix, HTTPAdapter())
        expected = [
            'http://github.com/about/',
            'http://github.com',
            'http://github',
            'http://git',
            'https://',
            'http://',
        ]
        assert expected == list(session.adapters)

        gittip_prefixes = (
            'http://gittip',
            'http://gittip.com',
            'http://gittip.com/about/',
        )
        for prefix in gittip_prefixes:
            session.mount(prefix, HTTPAdapter())
        expected = [
            'http://github.com/about/',
            'http://gittip.com/about/',
            'http://github.com',
            'http://gittip.com',
            'http://github',
            'http://gittip',
            'http://git',
            'https://',
            'http://',
        ]
        assert expected == list(session.adapters)

        # Mounting must also work when adapters is a plain dict.
        plain_session = requests.Session()
        plain_session.adapters = {'http://': HTTPAdapter()}
        plain_session.mount('https://', HTTPAdapter())
        assert 'http://' in plain_session.adapters
        assert 'https://' in plain_session.adapters
test_requests.py 文件源码 项目:ruffruffs 作者: di 项目源码 文件源码 阅读 86 收藏 0 点赞 0 评论 0
def test_urllib3_retries(httpbin):
    """A forced retry on HTTP 500 that keeps failing raises RetryError."""
    from requests.packages.urllib3.util import Retry

    session = requests.Session()
    retry_policy = Retry(total=2, status_forcelist=[500])
    session.mount('http://', HTTPAdapter(max_retries=retry_policy))

    with pytest.raises(RetryError):
        session.get(httpbin('status/500'))
test_requests.py 文件源码 项目:ruffruffs 作者: di 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_urllib3_pool_connection_closed(httpbin):
    """Check the error message when a request hits a zero-sized pool.

    Note that the test only inspects the message if a ``ConnectionError``
    is raised; it does not fail when the request succeeds.
    """
    session = requests.Session()
    empty_pool_adapter = HTTPAdapter(pool_connections=0, pool_maxsize=0)
    session.mount('http://', empty_pool_adapter)

    try:
        session.get(httpbin('status/200'))
    except ConnectionError as err:
        assert u"Pool is closed." in str(err)
transfer.py 文件源码 项目:kolibri 作者: learningequality 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def start(self):
        """Open the download stream and record its total size.

        Asserts that the transfer has not been started yet, creates a
        session with retrying adapters for both schemes, issues a streaming
        GET for ``self.source``, raises on an HTTP error status, and reads
        the size from the ``content-length`` response header.
        """
        assert not self.started, "File download has already been started, and cannot be started again"

        # Set up the requests session with backoff-retries on both schemes.
        self.session = requests.Session()
        for scheme in ('http://', 'https://'):
            self.session.mount(scheme, HTTPAdapter(max_retries=retries))

        # Start the download and fail early on an error status.
        self.response = self.session.get(
            self.source, stream=True, timeout=self.timeout)
        self.response.raise_for_status()

        # The download size comes from the content-length header.
        content_length = self.response.headers['content-length']
        self.total_size = int(content_length)

        self.started = True
qingstor.py 文件源码 项目:qingstor-sdk-python 作者: yunify 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, config):
        """Create the HTTP client session described by *config*.

        A retrying adapter (``total=config.connection_retries``, backoff
        factor 1, forced on HTTP 500/502/503/504) is mounted for the
        configured protocol only. When the config has a truthy ``timeout``,
        the session's ``send`` is wrapped so that timeout is passed on
        every call.
        """
        self.config = config
        self.client = Session()

        retry_policy = Retry(
            total=self.config.connection_retries,
            backoff_factor=1,
            status_forcelist=[500, 502, 503, 504]
        )
        scheme_prefix = self.config.protocol + "://"
        self.client.mount(scheme_prefix, HTTPAdapter(max_retries=retry_policy))

        default_timeout = getattr(self.config, "timeout", None)
        if default_timeout:
            self.client.send = partial(
                self.client.send, timeout=default_timeout
            )
http_requests.py 文件源码 项目:intel-manager-for-lustre 作者: intel-hpdd 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, server_http_url='', *args, **kwargs):
        """Create a JSON API session with a large connection pool.

        The session's headers are replaced with JSON ``Accept`` and
        ``Content-type`` headers, certificate verification is turned off,
        and an adapter with 2000 pool connections is mounted for
        ``http://``.
        """
        self.server_http_url = server_http_url
        self.session = requests.session()

        json_headers = {"Accept": "application/json",
                        "Content-type": "application/json"}
        self.session.headers = json_headers
        self.session.verify = False

        # A big pool lets tests open large numbers of connections to the api.
        large_pool_adapter = adapters.HTTPAdapter(pool_connections=2000,
                                                  pool_maxsize=2000)
        self.session.mount('http://', large_pool_adapter)
requests.py 文件源码 项目:fleece 作者: racker 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def __init__(self, timeout=None, retries=None):
        """Create a session with a default timeout and a retry policy.

        Args:
            timeout: Stored on ``self.timeout``.
            retries: Retry configuration. ``None`` uses
                ``DEFAULT_RETRY_ARGS``; an ``int`` uses those defaults with
                ``total`` replaced by the given number; a ``dict`` is
                passed to ``Retry`` as keyword arguments; a ``Retry``
                instance is used as-is.

        Raises:
            TypeError: if ``retries`` is of any other type. Previously this
                case failed with an unbound-local error on ``retry``.
        """
        super(Session, self).__init__()
        self.timeout = timeout
        if retries is None:
            retry = Retry(**DEFAULT_RETRY_ARGS)
        elif isinstance(retries, Retry):
            retry = retries
        elif isinstance(retries, int):
            args = DEFAULT_RETRY_ARGS.copy()
            # Drop the default total so it does not clash with the explicit
            # keyword below.
            args.pop('total', None)
            retry = Retry(total=retries, **args)
        elif isinstance(retries, dict):
            retry = Retry(**retries)
        else:
            raise TypeError(
                'retries must be None, an int, a dict or a Retry instance, '
                'not {0}'.format(type(retries).__name__))
        self.mount('http://', HTTPAdapter(max_retries=retry))
        self.mount('https://', HTTPAdapter(max_retries=retry))


问题


面经


文章

微信
公众号

扫码关注公众号