python类Retry()的实例源码

s3.py 文件源码 项目:chrome-prerender 作者: bosondata 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self) -> None:
        http_client = urllib3.PoolManager(
            timeout=urllib3.Timeout.DEFAULT_TIMEOUT,
            cert_reqs='CERT_REQUIRED',
            ca_certs=certifi.where(),
            retries=urllib3.Retry(
                total=5,
                backoff_factor=0.2,
                status_forcelist=[500, 502, 503, 504]
            ),
            maxsize=20
        )
        self.client = minio.Minio(
            S3_SERVER,
            access_key=S3_ACCESS_KEY,
            secret_key=S3_SECRET_KEY,
            region=S3_REGION,
            secure=S3_SERVER == 's3.amazonaws.com',
            http_client=http_client
        )
test_request.py 文件源码 项目:drf-reverse-proxy 作者: danpoland 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_custom_retries(self):
        RETRIES = Retry(20, backoff_factor=0.1)
        options = {'path': 'test/', 'retries': RETRIES}

        self.factory_custom_proxy_view(**options)
        url = 'http://www.example.com/test/'
        headers = {'Cookie': ''}
        self.urlopen.assert_called_with('GET', url, redirect=False,
                                        retries=RETRIES,
                                        preload_content=False,
                                        decode_content=False,
                                        headers=headers, body=b'')
__init__.py 文件源码 项目:httplib2shim 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _conn_request(self, conn, request_uri, method, body, headers):
        # Reconstruct the full uri from the connection object.
        if isinstance(conn, httplib2.HTTPSConnectionWithTimeout):
            scheme = 'https'
        else:
            scheme = 'http'

        host = conn.host

        # Reformat IPv6 hosts.
        if _is_ipv6(host):
            host = '[{}]'.format(host)

        full_uri = '{}://{}:{}{}'.format(
            scheme, host, conn.port, request_uri)

        decode = True if method != 'HEAD' else False

        try:
            urllib3_response = self.pool.request(
                method,
                full_uri,
                body=body,
                headers=headers,
                redirect=False,
                retries=urllib3.Retry(total=False, redirect=0),
                timeout=urllib3.Timeout(total=self.timeout),
                decode_content=decode)

            response = _map_response(urllib3_response, decode=decode)
            content = urllib3_response.data

        except Exception as e:
            raise _map_exception(e)

        return response, content
telegram.py 文件源码 项目:telegram-bot-framework 作者: alvarogzp 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __create_session(self):
        session = requests.session()
        # Retry one time on read errors, as the connection could have been closed
        # by the remote side and its close notification might have been lost,
        # blocked by firewall or dropped by NAT.
        # In that case, the session will try to reuse the connection thinking it is
        # still alive just to fail with a read error when the remote sends the reset.
        #
        # So, by retrying in that case we avoid this error and do not lose that api
        # call, which could be translated in a message lost.
        # That way we try to get the same behavior as we had when not using sessions.
        #
        # The drawback is that we could also be masquerading some *real* read errors
        # and retrying a request already processed by the server, repeating some
        # action (eg. a duplicate message).
        # But, during a year of production use we never had any read error of this kind.
        #
        # Update:
        # Due to a duplicate message case during a telegram bot api partial failure,
        # we are disabling the retry policy.
        # That way, if the edge described above happens a message won't be sent but
        # we will get an error. With the retry policy, a duplicate message could be
        # sent silently on poor network conditions.
        # So, it is preferable a message lost with the error being logged than a
        # duplicate message without noticing it.

        # retry = Retry(total=1, connect=0, read=1, status=0, respect_retry_after_header=False)
        # passing prefix lowered to work-around https://github.com/requests/requests/pull/4349
        # session.mount(self.base_url.lower(), HTTPAdapter(max_retries=retry))
        return session


问题


面经


文章

微信
公众号

扫码关注公众号