def setup_http_session(self):
    """Create (or recreate) the shared HTTP session on ``self.http_session``.

    Any existing session is closed first.  The new session carries the
    module's User-Agent and a retry-enabled transport adapter mounted for
    both HTTP and HTTPS.
    """
    if self.http_session:
        self.http_session.close()
    self.http_session = Session()
    self.http_session.headers['User-Agent'] = USER_AGENT
    # One retry policy / adapter pair covers both schemes; the original
    # built two identical copies for no benefit.
    http_retry = Retry(total=5, status_forcelist=[500, 503],
                       backoff_factor=.5)
    http_adapter = HTTPAdapter(max_retries=http_retry)
    self.http_session.mount('http://', http_adapter)
    self.http_session.mount('https://', http_adapter)
# Example source code using instances of the Python HTTPAdapter() class.
def http_session(self):
    """Returns a :class:`requests.Session` object. A new session is
    created if it doesn't already exist.

    The session is cached on ``self._http_session``; it carries the
    module's User-Agent and a retry-enabled adapter for both schemes.
    """
    http_session = getattr(self, '_http_session', None)
    if http_session is None:
        requests.packages.urllib3.disable_warnings()
        session = requests.Session()
        session.headers['User-Agent'] = USER_AGENT
        # One retry policy / adapter pair is enough for both schemes;
        # the original constructed two identical copies.
        http_retry = Retry(total=5, status_forcelist=[500, 503],
                           backoff_factor=.5)
        http_adapter = HTTPAdapter(max_retries=http_retry)
        session.mount('http://', http_adapter)
        session.mount('https://', http_adapter)
        self._http_session = session
    return self._http_session
def __init__(self, timeout=60, cache=False, max_retries=None, retry_interval=None):
    """The constructor.

    Args:
        timeout (float): The default global timeout (seconds).
        cache (bool): Wrap the session with ``CacheControl`` when true.
        max_retries (int): Total retry attempts; retries are enabled
            only when both this and ``retry_interval`` are given.
        retry_interval (float): Backoff factor applied between retries.
    """
    self.timeout = timeout
    self.session = requests.session()
    if max_retries and retry_interval:
        retry_policy = Retry(total=max_retries, backoff_factor=retry_interval)
        for prefix in ('http://', 'https://'):
            self.session.mount(prefix, HTTPAdapter(max_retries=retry_policy))
    if cache:
        self.session = CacheControl(self.session)
def __init__(self, endpoint=None, application_key=None,
             application_secret=None, consumer_key=None, timeout=TIMEOUT):
    """Build an API client bound to *endpoint* with the given credentials."""
    from requests import Session
    from requests.adapters import HTTPAdapter

    self._endpoint = ENDPOINTS[endpoint]
    self._application_key = application_key
    self._application_secret = application_secret
    self._consumer_key = consumer_key

    # Time delta with the server is resolved lazily on first use.
    self._time_delta = None

    try:
        # Some older versions of requests do not have the urllib3
        # vendorized package.
        from requests.packages.urllib3.util.retry import Retry
    except ImportError:
        retries = 5
    else:
        retries = Retry(
            total=5,
            backoff_factor=0.2,
            status_forcelist=[422, 500, 502, 503, 504]
        )

    # Use a requests session so connections are reused between requests.
    self._session = Session()
    for scheme in ('https://', 'http://'):
        self._session.mount(scheme, HTTPAdapter(max_retries=retries))

    # Override default timeout.
    self._timeout = timeout
def requests_retry_session(
    retries=3,
    backoff_factor=0.3,
    status_forcelist=(500, 502, 504),
):
    """Opinionated wrapper that creates a requests session with a
    HTTPAdapter whose Retry policy also covers connection errors.

    A naive ``HTTPAdapter(max_retries=3)`` raises immediately on any
    connection error; per
    http://docs.python-requests.org/en/master/api/?highlight=retries#requests.adapters.HTTPAdapter
    "By default, Requests does not retry failed connections."  Retrying
    on connection errors guards better on unpredictable networks.

    The backoff_factor is documented at
    https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.retry.Retry
    The defaults (retries=3, backoff_factor=0.3) sleep roughly
    [0.3, 0.6, 1.2] between attempts.
    """  # noqa
    retry_policy = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    adapter = HTTPAdapter(max_retries=retry_policy)
    session = requests.Session()
    for prefix in ('http://', 'https://'):
        session.mount(prefix, adapter)
    return session
def _on_request(self, request, **kwargs):
    """Serve *request* from a registered match, or raise ConnectionError."""
    match = self._find_match(request)
    # TODO(dcramer): find the correct class for this
    if match is None:
        err = ConnectionError('Connection refused: {0}'.format(request.url))
        self._calls.add(request, err)
        raise err

    headers = {'Content-Type': match['content_type']}
    extra_headers = match['adding_headers']
    if extra_headers:
        headers.update(extra_headers)

    raw = HTTPResponse(
        status=match['status'],
        body=BufferIO(match['body']),
        headers=headers,
        preload_content=False,
    )
    response = HTTPAdapter().build_response(request, raw)
    if not match['stream']:
        response.content  # NOQA
    self._calls.add(request, response)
    return response
def _on_request(self, request, **kwargs):
    """Build and record a fake response for *request* from the match table."""
    match = self._find_match(request)
    # TODO(dcramer): find the correct class for this
    if match is None:
        refused = ConnectionError('Connection refused: {0}'.format(request.url))
        self._calls.add(request, refused)
        raise refused

    response_headers = {
        'Content-Type': match['content_type'],
    }
    if match['adding_headers']:
        response_headers.update(match['adding_headers'])

    urllib3_response = HTTPResponse(
        status=match['status'],
        body=BufferIO(match['body']),
        headers=response_headers,
        preload_content=False,
    )
    adapter = HTTPAdapter()
    response = adapter.build_response(request, urllib3_response)
    if not match['stream']:
        # Force the body to be consumed so non-streaming callers get content.
        response.content  # NOQA
    self._calls.add(request, response)
    return response
def __init__(self, base_url, queries=None, **kwargs):
    """Constructor

    Args:
        base_url (str): the server's url
        queries (Optional[Query]): the queries
        **kwargs: optional overrides — ``timeout``, ``max_retries``,
            ``max_workers``, ``user_agent`` and ``x_forwarded_for``.
    """
    # Apply kwargs overrides BEFORE building the session: the original
    # read MAX_RETRIES / MAX_WORKERS first, so passing 'max_retries' or
    # 'max_workers' had no effect on the session or retry policy.
    if kwargs:
        if 'timeout' in kwargs:
            self.TIMEOUT = kwargs['timeout']
        if 'max_retries' in kwargs:
            self.MAX_RETRIES = kwargs['max_retries']
        if 'max_workers' in kwargs:
            self.MAX_WORKERS = kwargs['max_workers']
        if 'user_agent' in kwargs:
            self.USER_AGENT = kwargs['user_agent']
        if 'x_forwarded_for' in kwargs:
            self.X_FORWARDED_FOR = utils.get_x_fwded_for_str(kwargs['x_forwarded_for'])
    self.session = FuturesSession(max_workers=self.MAX_WORKERS)
    # self.MAX_RETRIES / self.STATUS_FORCELIST resolve to the instance
    # override when set, else the class attribute (was Connection.*).
    retries = Retry(total=self.MAX_RETRIES, backoff_factor=1,
                    status_forcelist=self.STATUS_FORCELIST)
    self.session.mount(base_url, HTTPAdapter(max_retries=retries))
    self.results = []
    self.queries = queries
    self.exec_queries()
def __init__(self, base_url=BASE_URL, login_url=LOGIN_URL, session=None):
    """Set up URLs and a retry-capable session (reusing *session* if given)."""
    self.base_url = base_url
    self.login_url = login_url
    self.url = base_url.rstrip('/') + '/mods'
    self.session = session or requests.session()
    retry_adapter = HTTPAdapter(max_retries=Retry(status_forcelist=[500, 503]))
    for prefix in ('https://', 'http://'):
        self.session.mount(prefix, retry_adapter)
def new_connection(self, cluster_api, provider):
    """Build a configured session for *provider* from the cluster config."""
    config = cluster_api.nsxlib_config
    session = TimeoutSession(config.http_timeout,
                             config.http_read_timeout)

    # Client-certificate auth takes precedence over basic auth.
    if config.client_cert_provider:
        session.cert_provider = config.client_cert_provider
    else:
        session.auth = (provider.username, provider.password)

    # NSX v3 doesn't use redirects
    session.max_redirects = 0

    session.verify = not config.insecure
    if session.verify and provider.ca_file:
        # verify using the said ca bundle path
        session.verify = provider.ca_file

    # we are pooling with eventlet in the cluster class
    pooled_adapter = adapters.HTTPAdapter(
        pool_connections=1, pool_maxsize=1,
        max_retries=config.retries,
        pool_block=False)
    for prefix in ('http://', 'https://'):
        session.mount(prefix, pooled_adapter)

    self.get_default_headers(session, provider,
                             config.allow_overwrite_header)
    return session
def rsess():
    """Return a requests session with retries and our User-Agent set."""
    session = requests.Session()
    # Just so one random 500 doesn't break an uptime; two consecutive
    # errors are worrying though.
    for prefix in ('http://', 'https://'):
        session.mount(prefix, HTTPAdapter(max_retries=MAX_RETRIES))
    session.headers['User-Agent'] = USER_AGENT
    return session
def get_async_requests_session(num_retries, backoff_factor, pool_size,
                               status_forcelist=None):
    """Build a FuturesSession with urllib3 auto-retry on both schemes.

    If the backoff_factor is 0.1, then sleep() will sleep for [0.1s, 0.2s,
    0.4s, ...] between retries. It will also force a retry if the status
    code returned is in status_forcelist.

    Args:
        num_retries (int): total retry attempts.
        backoff_factor (float): urllib3 backoff factor between retries.
        pool_size (int): worker count and connection-pool size.
        status_forcelist: statuses that force a retry; defaults to
            [500, 502, 503, 504].  (Was a mutable default argument,
            shared across calls — now built fresh per call.)
    """
    if status_forcelist is None:
        status_forcelist = [500, 502, 503, 504]
    session = FuturesSession(max_workers=pool_size)
    # If any regular response is generated, no retry is done. Without using
    # the status_forcelist, even a response with status 500 will not be
    # retried.
    retries = Retry(total=num_retries, backoff_factor=backoff_factor,
                    status_forcelist=status_forcelist)
    # Mount handler on both HTTP & HTTPS.
    adapter = HTTPAdapter(max_retries=retries,
                          pool_connections=pool_size,
                          pool_maxsize=pool_size)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session
# Evaluates the status of PTC and Niantic request futures, and returns the
# result (optionally with an error).
# Warning: blocking! Can only get status code if request has finished.
def download_webpage(target_url, proxy=None, timeout=5):
    """Fetch *target_url* with retries, spoofed headers and optional proxy.

    Returns the response body bytes on HTTP 200, else None.
    """
    s = requests.Session()
    retries = Retry(total=3,
                    backoff_factor=0.5,
                    status_forcelist=[500, 502, 503, 504])
    adapter = HTTPAdapter(max_retries=retries)
    # Bug fix: the adapter was mounted only on 'http://', so https URLs
    # (which the proxies dict below clearly intends to support) got no
    # retry behavior at all.
    s.mount('http://', adapter)
    s.mount('https://', adapter)
    headers = {
        'User-Agent': ('Mozilla/5.0 (Windows NT 6.1; WOW64; rv:54.0) ' +
                       'Gecko/20100101 Firefox/54.0'),
        'Referer': 'http://google.com'
    }
    r = s.get(target_url,
              proxies={'http': proxy, 'https': proxy},
              timeout=timeout,
              headers=headers)
    if r.status_code == 200:
        return r.content
    return None
# Sockslist.net uses JavaScript to obfuscate each proxy's port number.
# Builds a dictionary with decoded values for each variable.
# Dictionary = {'var': intValue, ...}
def test_transport_adapter_ordering(self):
    """Adapters must be kept sorted longest-prefix-first by the session."""
    session = requests.Session()
    assert list(session.adapters) == ['https://', 'http://']

    for prefix in ('http://git', 'http://github', 'http://github.com',
                   'http://github.com/about/'):
        session.mount(prefix, HTTPAdapter())
    expected = [
        'http://github.com/about/',
        'http://github.com',
        'http://github',
        'http://git',
        'https://',
        'http://',
    ]
    assert list(session.adapters) == expected

    for prefix in ('http://gittip', 'http://gittip.com',
                   'http://gittip.com/about/'):
        session.mount(prefix, HTTPAdapter())
    expected = [
        'http://github.com/about/',
        'http://gittip.com/about/',
        'http://github.com',
        'http://gittip.com',
        'http://github',
        'http://gittip',
        'http://git',
        'https://',
        'http://',
    ]
    assert list(session.adapters) == expected

    # Replacing the adapters dict wholesale must still allow mounting.
    other = requests.Session()
    other.adapters = {'http://': HTTPAdapter()}
    other.mount('https://', HTTPAdapter())
    assert 'http://' in other.adapters
    assert 'https://' in other.adapters
def test_urllib3_retries(httpbin):
    """Exhausting a status-based Retry policy must surface as RetryError."""
    from requests.packages.urllib3.util import Retry

    session = requests.Session()
    retry_adapter = HTTPAdapter(
        max_retries=Retry(total=2, status_forcelist=[500]))
    session.mount('http://', retry_adapter)

    with pytest.raises(RetryError):
        session.get(httpbin('status/500'))
def test_urllib3_pool_connection_closed(httpbin):
    """A zero-sized connection pool must refuse to serve any request."""
    s = requests.Session()
    s.mount('http://', HTTPAdapter(pool_connections=0, pool_maxsize=0))
    # Bug fix: the original try/except silently PASSED when no exception
    # was raised at all; pytest.raises makes a missing ConnectionError a
    # test failure instead.
    with pytest.raises(ConnectionError) as excinfo:
        s.get(httpbin('status/200'))
    assert u"Pool is closed." in str(excinfo.value)
def start(self):
    """Begin the download: build a retrying session, open the stream."""
    assert not self.started, "File download has already been started, and cannot be started again"

    # initialize the requests session, with backoff-retries enabled
    # NOTE(review): `retries` comes from an enclosing/module scope not
    # visible here — TODO confirm where it is defined.
    self.session = requests.Session()
    for prefix in ('http://', 'https://'):
        self.session.mount(prefix, HTTPAdapter(max_retries=retries))

    # initiate the download, check for status errors, and calculate
    # download size
    self.response = self.session.get(
        self.source, stream=True, timeout=self.timeout)
    self.response.raise_for_status()
    self.total_size = int(self.response.headers['content-length'])
    self.started = True
def __init__(self, config):
    """Create a client session for *config*'s protocol with retries."""
    self.config = config
    self.client = Session()

    retry_policy = Retry(
        total=self.config.connection_retries,
        backoff_factor=1,
        status_forcelist=[500, 502, 503, 504]
    )
    prefix = self.config.protocol + "://"
    self.client.mount(prefix, HTTPAdapter(max_retries=retry_policy))

    # Bake the configured timeout into every send() call, when one is set.
    timeout = getattr(self.config, "timeout", None)
    if timeout:
        self.client.send = partial(self.client.send, timeout=timeout)
def __init__(self, server_http_url='', *args, **kwargs):
    """Create a JSON API session pointed at *server_http_url*."""
    self.server_http_url = server_http_url
    self.session = requests.session()
    self.session.headers = {"Accept": "application/json",
                            "Content-type": "application/json"}
    # NOTE(review): TLS certificate verification is deliberately disabled.
    self.session.verify = False
    # Increase the number of pool connections so we can test large numbers
    # of connections to the api.
    big_pool = adapters.HTTPAdapter(pool_connections=2000, pool_maxsize=2000)
    self.session.mount('http://', big_pool)
def __init__(self, timeout=None, retries=None):
    """Session with a default timeout and configurable retry policy.

    Args:
        timeout: default timeout stored on the session for later use.
        retries: retry configuration — None for DEFAULT_RETRY_ARGS, an
            int for a total-retry count (other defaults preserved), a
            dict of Retry kwargs, or a ready-made Retry instance.

    Raises:
        TypeError: if *retries* is of an unsupported type.  (The
            original fell through to an obscure NameError on `retry`.)
    """
    super(Session, self).__init__()
    self.timeout = timeout
    if retries is None:
        retry = Retry(**DEFAULT_RETRY_ARGS)
    elif isinstance(retries, Retry):
        # Generalization: accept a pre-built Retry object as-is.
        retry = retries
    elif isinstance(retries, int):
        args = DEFAULT_RETRY_ARGS.copy()
        args.pop('total', None)
        retry = Retry(total=retries, **args)
    elif isinstance(retries, dict):
        retry = Retry(**retries)
    else:
        raise TypeError('retries must be None, an int, a dict or a Retry')
    self.mount('http://', HTTPAdapter(max_retries=retry))
    self.mount('https://', HTTPAdapter(max_retries=retry))