def __init__(self, timeout=60, cache=False, max_retries=None, retry_interval=None):
    """The constructor.

    Args:
        timeout (float): The default global timeout(seconds).
        cache (bool): When true, the session is wrapped in ``CacheControl``.
        max_retries (int): Total retry count handed to ``Retry``. A falsy
            value leaves the session's default adapters untouched.
        retry_interval (float): Used as the ``Retry`` backoff factor. When
            omitted, a backoff factor of 0 is used.
    """
    self.timeout = timeout
    self.session = requests.Session()
    # A retry policy used to be installed only when *both* values were
    # truthy, so ``max_retries=3`` on its own was silently ignored.
    if max_retries:
        retries = Retry(total=max_retries, backoff_factor=retry_interval or 0)
        for prefix in ('http://', 'https://'):
            self.session.mount(prefix, HTTPAdapter(max_retries=retries))
    if cache:
        self.session = CacheControl(self.session)
# Example source code using the Python class Retry()
def retry_session():
    """Return a new ``requests.Session`` with a retrying adapter.

    The same ``HTTPAdapter`` is mounted for ``http://`` and ``https://``.
    Its ``Retry`` policy allows 24 retries in total (24 connect, 5 read),
    a backoff factor of 0.3, forced retries on 500/502/504, and covers
    GET and POST requests.
    """
    # Original author's estimate of the total wait time in minutes:
    # >>> sum([min((0.3 * (2 ** (i - 1))), 120) / 60 for i in range(24)])
    # >>> 30.5575
    # It takes, per attempt, the smaller of the growing backoff time and
    # the maximum backoff time (assumed to default to 120 seconds).
    # NOTE(review): estimate not re-verified against the installed urllib3.
    retry_options = dict(
        total=24,
        read=5,
        connect=24,
        backoff_factor=0.3,
        status_forcelist=(500, 502, 504),
    )
    methods = ('GET', 'POST')
    try:
        # urllib3 >= 1.26 names this option ``allowed_methods``; the old
        # ``method_whitelist`` keyword was removed in urllib3 2.0.
        retry = Retry(allowed_methods=methods, **retry_options)
    except TypeError:
        # Older urllib3 releases only know ``method_whitelist``.
        retry = Retry(method_whitelist=methods, **retry_options)

    session = requests.Session()
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session
def requests_retry_session(
    retries=3,
    backoff_factor=0.3,
    status_forcelist=(500, 502, 504),
    session=None,
):
    """Attach a retrying adapter to a session and return the session.

    Args:
        retries: Used as the total, read and connect retry counts.
        backoff_factor: Passed to ``Retry`` unchanged.
        status_forcelist: Passed to ``Retry`` unchanged.
        session: Session to configure; a new ``requests.Session`` is
            created when this is falsy.

    Returns:
        The session, with one shared adapter mounted for both
        ``http://`` and ``https://``.
    """
    if not session:
        session = requests.Session()
    policy = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    shared_adapter = HTTPAdapter(max_retries=policy)
    for prefix in ('http://', 'https://'):
        session.mount(prefix, shared_adapter)
    return session
def get_http():
    """Get the http object.

    Ensures ``CLUSTERFUZZ_TESTCASES_DIR`` exists, then builds a
    ``requests_cache.CachedSession`` backed by an sqlite cache named
    ``http_cache`` inside that directory. GET and POST responses with
    status 200 are cacheable and expire after ``HTTP_CACHE_TTL``. A
    retrying adapter is mounted for ``https://``.
    """
    ensure_dir(CLUSTERFUZZ_TESTCASES_DIR)
    http = requests_cache.CachedSession(
        cache_name=os.path.join(CLUSTERFUZZ_TESTCASES_DIR, 'http_cache'),
        backend='sqlite',
        allowable_methods=('GET', 'POST'),
        allowable_codes=[200],
        expire_after=HTTP_CACHE_TTL)
    # NOTE(review): the original comment said that with a backoff_factor
    # of 0.5 the max wait time is 16s -- not re-verified here.
    retries = retry.Retry(
        total=5, backoff_factor=0.5,
        status_forcelist=[500, 502, 503, 504])
    # The policy must be passed by keyword: HTTPAdapter's first positional
    # parameter is ``pool_connections``, so the previous positional call
    # never installed any retry behaviour.
    http.mount('https://', adapters.HTTPAdapter(max_retries=retries))
    return http
def _requests_retry_session(
    self,
    retries=3,
    backoff_factor=0.3,
    method_whitelist=('HEAD', 'GET', 'POST', 'PUT', 'DELETE', 'OPTIONS', 'TRACE'),
    status_forcelist=(500, 502, 503, 504, 520, 524),
    session=None,
):
    """Attach a retrying adapter to a session and return the session.

    Args:
        retries: Used as the total, read, connect and status retry counts.
        backoff_factor: Passed to ``Retry`` unchanged.
        method_whitelist: HTTP methods eligible for retry. The parameter
            keeps its historical name; the value is forwarded under
            whichever keyword the installed urllib3 accepts.
        status_forcelist: Passed to ``Retry`` unchanged.
        session: Session to configure; a new ``requests.Session`` is
            created when this is falsy.

    Returns:
        The session, with one shared adapter mounted for both
        ``http://`` and ``https://``.
    """
    session = session or requests.Session()
    retry_options = dict(
        total=retries,
        read=retries,
        connect=retries,
        status=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    try:
        # urllib3 >= 1.26 names this option ``allowed_methods``; the old
        # ``method_whitelist`` keyword was removed in urllib3 2.0.
        retry = Retry(allowed_methods=method_whitelist, **retry_options)
    except TypeError:
        # Older urllib3 releases only know ``method_whitelist``.
        retry = Retry(method_whitelist=method_whitelist, **retry_options)
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session
def get_soup(url, num_retries = 10):
    """
    Takes in a url and returns the parsed BeautifulSoup code for that url with
    handling capabilities if the request 'bounces'.

    The request goes through a temporary session whose ``Retry`` policy
    uses ``num_retries`` as the total, a backoff factor of 0.1 and forces
    retries on 500/502/503/504. The policy is mounted for both ``http://``
    and ``https://`` (it used to cover plain http only), and the session
    is closed before returning.
    """
    with requests.Session() as s:
        retries = Retry(
            total = num_retries,
            backoff_factor = 0.1,
            status_forcelist = [500, 502, 503, 504]
        )
        for prefix in ('http://', 'https://'):
            s.mount(prefix, HTTPAdapter(max_retries = retries))
        return BeautifulSoup(s.get(url).text, 'html.parser')
def demo(base_url):
    """Login through a third-party OAuth handler and print some stats.

    Opens the login page in a web browser, asks for the URL returned
    after authentication, requests it with ``complete=no`` replaced by
    ``complete=yes`` and prints the content of the ``me`` resource.

    Parameters
    ----------
    base_url : str
        Base URL of the CMS server.

    Raises
    ------
    AssertionError
        If the authenticated request does not return status 200.
    """
    # Join URL parts with '/' explicitly: os.path.join would use the
    # platform's path separator, which is wrong for URLs on Windows.
    root = base_url if base_url.endswith('/') else base_url + '/'
    login_url = root + "login?complete=no"

    with requests.Session() as session:
        adapter = HTTPAdapter(max_retries=Retry(total=3, backoff_factor=0.02))
        session.mount('{}://'.format(urlparse(base_url).scheme), adapter)
        wb = webbrowser.get()
        session.get(login_url)
        wb.open(login_url)
        auth_url = input("Enter the URL returned after authentication:")
        response = session.get(auth_url.replace("complete=no", 'complete=yes'))
        # Raised explicitly so the check survives ``python -O``; the
        # exception type is the one the former ``assert`` produced.
        if response.status_code != 200:
            raise AssertionError(
                'unexpected status code {}'.format(response.status_code))
        print(session.get(root + 'me').content)
def __init__(self, url, user, password, database, settings=None, stacktrace=False, timeout=10.0,
             timeout_retry=0, timeout_retry_delay=0.0):
    """Store the connection parameters and prepare a retrying session.

    Args:
        url: Stored as ``self.url``.
        user: Stored as ``self.user``.
        password: Stored as ``self.password``; a falsy value becomes ``''``.
        database: Stored as ``self.database``.
        settings: Stored as ``self.settings``; a falsy value becomes ``{}``.
        stacktrace: Stored as ``self.stacktrace``.
        timeout: Stored as ``self.timeout``.
        timeout_retry: Passed to ``Retry`` as the connect retry count.
        timeout_retry_delay: Passed to ``Retry`` as the backoff factor.
    """
    self.url = url
    self.user = user
    self.password = password or ''
    self.database = database
    self.settings = settings or {}
    self.cli_settings = {}
    self.stacktrace = stacktrace
    self.timeout = timeout
    self.session = requests.Session()
    retries = Retry(
        connect=timeout_retry,
        # method_whitelist={'GET', 'POST'},  # enabling retries for POST may be a bad idea
        backoff_factor=timeout_retry_delay
    )
    # Mount for both schemes: with only 'http://' mounted, the retry
    # settings were silently ignored for https URLs.
    for prefix in ('http://', 'https://'):
        self.session.mount(prefix, requests.adapters.HTTPAdapter(max_retries=retries))
def list_folders(csrftoken, sessionid):
    """Fetch the webmail folder list and print each folder name.

    Args:
        csrftoken: Value placed in the ``csrftoken`` cookie.
        sessionid: Value placed in the ``sessionid`` cookie.

    The request URL and the decoded JSON response are printed as well.
    The session is closed when the function finishes, including when
    the request or the JSON decoding raises.
    """
    print("Getting list of folders")
    # Context manager guarantees the session's connections are released.
    with requests.Session() as s:
        retries = Retry(total=10, backoff_factor=1,
                        status_forcelist=[500, 502, 504])
        s.mount('https://', HTTPAdapter(max_retries=retries))
        s.headers.update({'Cookie': 'csrftoken={0};'
                                    'sessionid={1}'.format(csrftoken, sessionid)})
        mdatareq = 'https://app.openmailbox.org/requests/webmail?action=folderlist'
        print(mdatareq)
        metadata = json.loads(s.get(mdatareq).text)
    print(metadata)
    print('\nFolder names:')
    for line in metadata['folders']:
        print(line['name'])
def requests_retry_session(
    retries=3,
    backoff_factor=0.3,
    status_forcelist=(502, 504),
    session=None,
):
    """Return a session whose http/https traffic uses a ``Retry`` policy.

    ``retries`` is used as the total, read and connect retry counts.
    ``backoff_factor`` and ``status_forcelist`` are forwarded to ``Retry``
    as given. If ``session`` is falsy a new ``requests.Session`` is
    created; otherwise the given session is configured and returned.
    """
    target = session or requests.Session()
    retry_adapter = HTTPAdapter(
        max_retries=Retry(
            total=retries,
            read=retries,
            connect=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
        )
    )
    target.mount('http://', retry_adapter)
    target.mount('https://', retry_adapter)
    return target
def get_url(url):
    """
    Get the url

    The request uses a 5 second timeout and a ``Retry`` policy (10 total
    retries, backoff factor 0.1) mounted for both ``http://`` and
    ``https://``. The session is closed before returning.

    :param url: given url
    :return: page (the response object), or None when the request fails
        with an HTTP error status, a connection error or a timeout
    """
    with requests.Session() as session:
        retries = Retry(total=10, backoff_factor=.1)
        for prefix in ('http://', 'https://'):
            session.mount(prefix, HTTPAdapter(max_retries=retries))
        try:
            response = session.get(url, timeout=5)
            response.raise_for_status()
        except (requests.exceptions.HTTPError,
                requests.exceptions.ConnectionError,
                # Read timeouts are not ConnectionErrors; without this they
                # escaped although every other failure returned None.
                requests.exceptions.Timeout):
            return None
        return response
def _endpoint_premium_get(self, url):
    """Send an authenticated GET and return the decoded JSON body.

    The request authenticates with ``(self.username, self.api_key)`` and
    goes through a temporary session that retries on status 429 (5 total
    retries, backoff factor 0.1) for both ``http://`` and ``https://``.

    Returns:
        The decoded JSON when it is truthy; ``None`` when the status is
        404 or 422 or the decoded JSON is empty.

    Raises:
        ConnectionError: When requests reports a connection error.
        BadRequest: When the status is 400.
    """
    # The context manager closes the session on the error path too;
    # the old explicit close() was skipped when the request raised.
    with requests.Session() as s:
        retries = Retry(total=5,
                        backoff_factor=0.1,
                        status_forcelist=[429])
        # Mounted for https as well: with only 'http://' the 429 retry
        # policy never applied to https URLs.
        for prefix in ('http://', 'https://'):
            s.mount(prefix, HTTPAdapter(max_retries=retries))
        try:
            r = s.get(url, auth=(self.username, self.api_key))
        except requests.exceptions.ConnectionError as e:
            raise ConnectionError(repr(e))
    if r.status_code in [404, 422]:
        return None
    if r.status_code == 400:
        raise BadRequest('Bad Request for url {}'.format(url))
    return r.json() or None
def _endpoint_premium_delete(self, url):
    """Send an authenticated DELETE and report whether it succeeded.

    The request authenticates with ``(self.username, self.api_key)`` and
    goes through a temporary session that retries on status 429 (5 total
    retries, backoff factor 0.1) for both ``http://`` and ``https://``.

    Returns:
        ``True`` when the status is 200; ``None`` for 404 and for any
        other status that does not raise.

    Raises:
        ConnectionError: When requests reports a connection error.
        BadRequest: When the status is 400.
    """
    # The context manager closes the session on the error path too;
    # the old explicit close() was skipped when the request raised.
    with requests.Session() as s:
        retries = Retry(total=5,
                        backoff_factor=0.1,
                        status_forcelist=[429])
        # Mounted for https as well: with only 'http://' the 429 retry
        # policy never applied to https URLs.
        for prefix in ('http://', 'https://'):
            s.mount(prefix, HTTPAdapter(max_retries=retries))
        try:
            r = s.delete(url, auth=(self.username, self.api_key))
        except requests.exceptions.ConnectionError as e:
            raise ConnectionError(repr(e))
    if r.status_code == 400:
        raise BadRequest('Bad Request for url {}'.format(url))
    if r.status_code == 200:
        return True
    # 404 and every remaining status yield None, now stated explicitly.
    return None
def _endpoint_premium_put(self, url, payload=None):
    """Send an authenticated PUT and report whether it succeeded.

    ``payload`` is sent as the request data. The request authenticates
    with ``(self.username, self.api_key)`` and goes through a temporary
    session that retries on status 429 (5 total retries, backoff factor
    0.1) for both ``http://`` and ``https://``.

    Returns:
        ``True`` when the status is 200; ``None`` for 404, 422 and any
        other status that does not raise.

    Raises:
        ConnectionError: When requests reports a connection error.
        BadRequest: When the status is 400.
    """
    # The context manager closes the session on the error path too;
    # the old explicit close() was skipped when the request raised.
    with requests.Session() as s:
        retries = Retry(total=5,
                        backoff_factor=0.1,
                        status_forcelist=[429])
        # Mounted for https as well: with only 'http://' the 429 retry
        # policy never applied to https URLs.
        for prefix in ('http://', 'https://'):
            s.mount(prefix, HTTPAdapter(max_retries=retries))
        try:
            r = s.put(url, data=payload, auth=(self.username, self.api_key))
        except requests.exceptions.ConnectionError as e:
            raise ConnectionError(repr(e))
    if r.status_code == 400:
        raise BadRequest('Bad Request for url {}'.format(url))
    if r.status_code == 200:
        return True
    # 404, 422 and every remaining status yield None, now stated explicitly.
    return None
# Get Show object
def setup_http_session(self):
    """Replace ``self.http_session`` with a freshly configured session.

    An existing truthy session is closed first. The new session gets
    ``USER_AGENT`` as its User-Agent header, and a separate adapter is
    mounted for each of ``http://`` and ``https://``, each with its own
    ``Retry`` (5 total retries, forced retries on 500/503, backoff 0.5).
    """
    if self.http_session:
        self.http_session.close()

    self.http_session = Session()
    self.http_session.headers['User-Agent'] = USER_AGENT

    # One independent Retry/HTTPAdapter pair per scheme, as before.
    for prefix in ('http://', 'https://'):
        policy = Retry(total=5, status_forcelist=[500, 503],
                       backoff_factor=.5)
        self.http_session.mount(prefix, HTTPAdapter(max_retries=policy))
def http_session(self):
    """Return the cached :class:`requests.Session`, building it on first use.

    When ``self._http_session`` is missing or falsy, urllib3 warnings are
    disabled and a new session is created with ``USER_AGENT`` as its
    User-Agent header and a separate retrying adapter per scheme
    (5 total retries, forced retries on 500/503, backoff 0.5). The new
    session is stored on ``self._http_session``.
    """
    if getattr(self, '_http_session', None):
        return self._http_session

    requests.packages.urllib3.disable_warnings()
    new_session = requests.Session()
    new_session.headers['User-Agent'] = USER_AGENT
    # One independent Retry/HTTPAdapter pair per scheme, as before.
    for prefix in ('http://', 'https://'):
        policy = Retry(total=5, status_forcelist=[500, 503],
                       backoff_factor=.5)
        new_session.mount(prefix, HTTPAdapter(max_retries=policy))
    self._http_session = new_session
    return self._http_session
def __init__(self, timeout=60, cache=False, max_retries=None, retry_interval=None):
    """The constructor.

    Args:
        timeout (float): The default global timeout(seconds).
        cache (bool): When true, the session is wrapped in ``CacheControl``.
        max_retries (int): Total retry count handed to ``Retry``. A falsy
            value leaves the session's default adapters untouched.
        retry_interval (float): Used as the ``Retry`` backoff factor. When
            omitted, a backoff factor of 0 is used.
    """
    self.timeout = timeout
    self.session = requests.Session()
    # A retry policy used to be installed only when *both* values were
    # truthy, so ``max_retries=3`` on its own was silently ignored.
    if max_retries:
        retries = Retry(total=max_retries, backoff_factor=retry_interval or 0)
        for prefix in ('http://', 'https://'):
            self.session.mount(prefix, HTTPAdapter(max_retries=retries))
    if cache:
        self.session = CacheControl(self.session)
def _connect(self):
    """Open a streaming GET to ``self.url`` through a retrying session.

    A new session is stored on ``self._session``. An ``HTTPAdapter``
    whose ``max_retries`` is an ``HttpRetry`` built from the class
    constants ``READ_MAX_RETRIES``, ``CONN_MAX_RETRIES`` and
    ``BACKOFF_FACTOR`` is mounted under ``str(self.url)``. The response
    is requested with ``stream=True`` and a (connect, read) timeout
    pair, and stored on ``self.__conn``.
    """
    retry_policy = HttpRetry(
        read=self.READ_MAX_RETRIES,
        connect=self.CONN_MAX_RETRIES,
        backoff_factor=self.BACKOFF_FACTOR)
    adapter = requests.adapters.HTTPAdapter()
    # Assigned after construction, exactly as the original did.
    adapter.max_retries = retry_policy

    self._session = requests.Session()
    self._session.mount(str(self.url), adapter)

    timeouts = (self.CONN_TIMEOUT, self.READ_TIMEOUT)
    self.__conn = self._session.get(self.url, stream=True, timeout=timeouts)
def __init__(self, endpoint=None, application_key=None,
             application_secret=None, consumer_key=None, timeout=TIMEOUT):
    """Store the credentials and build a retrying session.

    ``endpoint`` is looked up in ``ENDPOINTS``. The three key/secret
    arguments and ``timeout`` are stored on the instance unchanged. The
    session mounts a separate ``HTTPAdapter`` for ``https://`` and
    ``http://``. Both use a ``Retry`` policy (5 total retries, backoff
    0.2, forced retries on 422/500/502/503/504) when ``Retry`` can be
    imported, and the plain integer 5 otherwise.
    """
    from requests import Session
    from requests.adapters import HTTPAdapter

    self._endpoint = ENDPOINTS[endpoint]
    self._application_key = application_key
    self._application_secret = application_secret
    self._consumer_key = consumer_key

    # lazy load time delta
    self._time_delta = None

    try:
        # Some older versions of requests do not have the urllib3
        # vendorized package
        from requests.packages.urllib3.util.retry import Retry
    except ImportError:
        retry_policy = 5
    else:
        retry_policy = Retry(
            total=5,
            backoff_factor=0.2,
            status_forcelist=[422, 500, 502, 503, 504]
        )

    # use a requests session to reuse connections between requests
    self._session = Session()
    for prefix in ('https://', 'http://'):
        self._session.mount(prefix, HTTPAdapter(max_retries=retry_policy))

    # Override default timeout
    self._timeout = timeout
def __init__(self, *args, **kwargs):
    """Initialise the parent adapter, then install a fixed retry policy.

    All arguments are forwarded to the parent constructor. Afterwards
    ``max_retries`` is replaced by a ``Retry`` built from ``RETRY_COUNT``
    and ``BACKOFF_FACTOR``.
    """
    super(RetryAdapter, self).__init__(*args, **kwargs)
    policy = Retry(total=RETRY_COUNT, backoff_factor=BACKOFF_FACTOR)
    self.max_retries = policy
def requests_retry_session(
    retries=3,
    backoff_factor=0.3,
    status_forcelist=(500, 502, 504),
):
    """Create a requests session whose adapter carries a Retry policy.

    ``retries`` is used as the total, read and connect retry counts;
    ``backoff_factor`` and ``status_forcelist`` are forwarded to
    ``Retry`` unchanged. One adapter is shared between ``http://`` and
    ``https://``.

    Background from the original author: passing a plain number, e.g.::

        adapter = HTTPAdapter(max_retries=3)

    is described as raising immediately on connection errors, whereas an
    explicit ``Retry`` also retries failed connections, which guards
    better on unpredictable networks. See
    http://docs.python-requests.org/en/master/api/?highlight=retries#requests.adapters.HTTPAdapter
    and, for ``backoff_factor``,
    https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.retry.Retry

    NOTE(review): the original docstring gave the sleep sequence for the
    defaults as ``[0.3, 0.6, 1.2]``; confirm against the installed
    urllib3 before relying on it.
    """  # noqa
    policy = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    shared_adapter = HTTPAdapter(max_retries=policy)
    new_session = requests.Session()
    for prefix in ('http://', 'https://'):
        new_session.mount(prefix, shared_adapter)
    return new_session
def __init__(self, base_url, queries=None, **kwargs):
    """Constructor

    Applies the optional overrides, builds the session and then runs
    ``exec_queries()``.

    Args:
        base_url (str): the server's url; the retrying adapter is
            mounted under this prefix
        queries (Optional[Query]): the queries
        **kwargs: optional overrides; recognised keys are ``timeout``,
            ``max_retries``, ``max_workers``, ``user_agent`` and
            ``x_forwarded_for``
    """
    # Apply the overrides first. The session and Retry policy used to be
    # built before this point, so ``max_workers`` and ``max_retries``
    # never had any effect on them.
    if 'timeout' in kwargs:
        self.TIMEOUT = kwargs['timeout']
    if 'max_retries' in kwargs:
        self.MAX_RETRIES = kwargs['max_retries']
    if 'max_workers' in kwargs:
        self.MAX_WORKERS = kwargs['max_workers']
    if 'user_agent' in kwargs:
        self.USER_AGENT = kwargs['user_agent']
    if 'x_forwarded_for' in kwargs:
        self.X_FORWARDED_FOR = utils.get_x_fwded_for_str(kwargs['x_forwarded_for'])

    self.session = FuturesSession(max_workers=self.MAX_WORKERS)
    retries = Retry(total=self.MAX_RETRIES, backoff_factor=1,
                    status_forcelist=Connection.STATUS_FORCELIST)
    self.session.mount(base_url, HTTPAdapter(max_retries=retries))
    self.results = []
    self.queries = queries
    self.exec_queries()
def get_async_requests_session(num_retries, backoff_factor, pool_size,
                               status_forcelist=[500, 502, 503, 504]):
    """Build a ``FuturesSession`` with auto-retrying adapters.

    Args:
        num_retries: Total retry count passed to ``Retry``.
        backoff_factor: Passed to ``Retry`` unchanged.
        pool_size: Used as the session's ``max_workers`` and as both
            ``pool_connections`` and ``pool_maxsize`` of each adapter.
        status_forcelist: Status codes passed to ``Retry`` as
            ``status_forcelist``.

    Returns:
        The session, with a separate adapter mounted for ``http://``
        and ``https://``.
    """
    # Per the original author's notes: status_forcelist is what makes a
    # regular response (even a 500) eligible for retry, and the sleep
    # between retries grows with backoff_factor.
    policy = Retry(total=num_retries, backoff_factor=backoff_factor,
                   status_forcelist=status_forcelist)

    async_session = FuturesSession(max_workers=pool_size)
    # Mount handler on both HTTP & HTTPS, one adapter instance per scheme.
    for prefix in ('http://', 'https://'):
        async_session.mount(prefix, HTTPAdapter(max_retries=policy,
                                                pool_connections=pool_size,
                                                pool_maxsize=pool_size))
    return async_session
# Evaluates the status of PTC and Niantic request futures, and returns the
# result (optionally with an error).
# Warning: blocking! Can only get status code if request has finished.
def download_webpage(target_url, proxy=None, timeout=5):
    """Download a page and return its raw content.

    Args:
        target_url: URL to fetch.
        proxy: Proxy used for both the ``http`` and ``https`` entries of
            the request's ``proxies`` mapping.
        timeout: Timeout passed to the request.

    Returns:
        The response content when the status code is 200, else ``None``.

    The request sends fixed User-Agent and Referer headers through a
    temporary session that retries on 500/502/503/504 (3 total retries,
    backoff factor 0.5). The session is closed before returning.
    """
    headers = {
        'User-Agent': ('Mozilla/5.0 (Windows NT 6.1; WOW64; rv:54.0) ' +
                       'Gecko/20100101 Firefox/54.0'),
        'Referer': 'http://google.com'
    }
    with requests.Session() as s:
        retries = Retry(total=3,
                        backoff_factor=0.5,
                        status_forcelist=[500, 502, 503, 504])
        # Mounted for https as well: with only 'http://' the retry
        # policy never applied to https targets.
        for prefix in ('http://', 'https://'):
            s.mount(prefix, HTTPAdapter(max_retries=retries))
        r = s.get(target_url,
                  proxies={'http': proxy, 'https': proxy},
                  timeout=timeout,
                  headers=headers)
    if r.status_code == 200:
        return r.content
    return None
# Sockslist.net uses javascript to obfuscate proxies port number.
# Builds a dictionary with decoded values for each variable.
# Dictionary = {'var': intValue, ...})
def _default_session(self):
    """Return a new session that retries https requests.

    The adapter mounted for ``https://`` uses a ``Retry`` with 5 total
    retries, a backoff factor of 2 and forced retries on 502/503/504.
    """
    policy = Retry(
        total=5,
        backoff_factor=2,
        status_forcelist=[502, 503, 504]
    )
    https_adapter = HTTPAdapter(max_retries=policy)
    new_session = requests.Session()
    new_session.mount('https://', https_adapter)
    return new_session
def get_session(retries=5):
    """Return a new session with retrying adapters for http and https.

    ``retries`` is the total retry count. The policy uses a backoff
    factor of 0.1 and forces retries on 500/502/503/504. Each scheme
    gets its own adapter built from the same ``Retry`` object.
    """
    policy = Retry(total=retries, backoff_factor=0.1,
                   status_forcelist=[500, 502, 503, 504])
    new_session = requests.Session()
    for prefix in ('http://', 'https://'):
        new_session.mount(prefix, HTTPAdapter(max_retries=policy))
    return new_session
def initializeRequestSession():
    """(Re)create the module-level ``requestSession``.

    The global is rebound to a new ``requests.Session`` with a separate
    retrying adapter mounted for ``http://`` and ``https://`` (5 total
    retries, backoff factor 0.1, forced retries on 500/502/503/504).
    """
    global requestSession
    requestSession = requests.Session()
    policy = Retry(total=5,
                   backoff_factor=0.1,
                   status_forcelist=[500, 502, 503, 504])
    for prefix in ('http://', 'https://'):
        requestSession.mount(prefix, HTTPAdapter(max_retries=policy))
def __init__(self, workers=8):
    """Create the retrying https session and the worker pool.

    ``self.session`` is a ``requests.Session`` whose ``https://`` adapter
    uses a ``Retry`` with ``self.max_retries`` total retries, a backoff
    factor of 0.1 and forced retries on 500/502/503/504.
    ``self.pool`` is a ``Pool`` created with ``workers``.
    """
    policy = Retry(
        total=self.max_retries,
        backoff_factor=0.1,
        status_forcelist=[500, 502, 503, 504])
    https_adapter = HTTPAdapter(max_retries=policy)

    self.session = requests.Session()
    self.session.mount('https://', https_adapter)
    self.pool = Pool(workers)
def _create_s3_session():
    """
    Build a session that automatically retries https requests on
    500, 502, 503 and 504 responses (3 total retries, backoff factor 0.5).
    """
    policy = Retry(total=3,
                   backoff_factor=.5,
                   status_forcelist=[500, 502, 503, 504])
    https_adapter = HTTPAdapter(max_retries=policy)
    s3_session = requests.Session()
    s3_session.mount('https://', https_adapter)
    return s3_session
def get_session_retry(retries=3, backoff_factor=0.2, status_forcelist=(404, 500, 502, 504),
                      session=None):
    """Set HTTP Adapter with retries to session.

    Args:
        retries: Used as the total, read and connect retry counts.
        backoff_factor: Passed to ``Retry`` unchanged.
        status_forcelist: Passed to ``Retry`` unchanged.
        session: Session to configure; a new ``requests.Session`` is
            created when this is falsy.

    Returns:
        The session, with one shared adapter mounted for both
        ``http://`` and ``https://``.
    """
    session = session or requests.Session()
    retry = Retry(total=retries, read=retries, connect=retries,
                  backoff_factor=backoff_factor, status_forcelist=status_forcelist)
    adapter = HTTPAdapter(max_retries=retry)
    # Mounted for https as well: with only 'http://' the retry policy
    # never applied to https URLs.
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session