def __init__(self, credentials,
             refresh_status_codes=transport.DEFAULT_REFRESH_STATUS_CODES,
             max_refresh_attempts=transport.DEFAULT_MAX_REFRESH_ATTEMPTS,
             refresh_timeout=None,
             **kwargs):
    """Create a session that carries ``credentials`` and refresh settings.

    Args:
        credentials: Stored on ``self.credentials``.
        refresh_status_codes: Stored on ``self._refresh_status_codes``.
        max_refresh_attempts: Stored on ``self._max_refresh_attempts``.
        refresh_timeout: Stored on ``self._refresh_timeout``.
        **kwargs: Forwarded unchanged to the parent initializer.
    """
    super(AuthorizedSession, self).__init__(**kwargs)

    self.credentials = credentials
    self._refresh_status_codes = refresh_status_codes
    self._max_refresh_attempts = max_refresh_attempts
    self._refresh_timeout = refresh_timeout

    # Internal calls (for example, credentials.refresh) go through their
    # own plain session. `self` must not be used as that session, as it
    # can lead to infinite recursion.
    refresh_session = requests.Session()
    # The adapter retries HTTPS requests when a network error occurs and
    # the request seems safely retryable.
    refresh_session.mount(
        "https://", requests.adapters.HTTPAdapter(max_retries=3))
    self._auth_request = Request(refresh_session)
python类adapters()的实例源码
requests.py 文件源码
项目:google-auth-library-python
作者: GoogleCloudPlatform
项目源码
文件源码
阅读 40
收藏 0
点赞 0
评论 0
def init_poolmanager(self, connections, maxsize, block=DEFAULT_POOLBLOCK):
    """Build the ``AsyncPoolManager`` used by this adapter.

    Not intended to be called from user code; it is exposed only for use
    when subclassing
    :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.

    :param connections: The number of urllib3 connection pools to cache.
    :param maxsize: The maximum number of connections to save in the pool.
    :param block: Block when no free connections are available.
    """
    # Keep the settings on the instance; they are needed for pickling and
    # when proxy managers are created later.
    self._pool_connections, self._pool_maxsize, self._pool_block = (
        connections, maxsize, block)

    pool_options = dict(num_pools=connections, maxsize=maxsize, block=block)
    self.poolmanager = AsyncPoolManager(**pool_options)

    # Starts empty; this method only resets the list.
    self.connections = []
def get_connection(self, url, proxies=None):
    """Return a urllib3 connection pool for ``url`` and remember it.

    Not intended to be called from user code; it is exposed only for use
    when subclassing
    :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.

    Each distinct pool object is recorded in ``self.connections`` once.
    Pool managers typically hand back the same pool for the same host, so
    appending on every call would fill the list with duplicates and make
    it grow without bound in a long-lived session.

    :param url: The URL to connect to.
    :param proxies: (optional) A Requests-style dictionary of proxies used
        on this request.
    :returns: The pool returned by the proxy manager (when a proxy is
        configured for the URL's scheme) or by ``self.poolmanager``.
    """
    # NOTE(review): block nesting was reconstructed from a flattened
    # source; the bookkeeping below is assumed to apply to both branches.
    proxies = proxies or {}
    proxy = proxies.get(urlparse(url.lower()).scheme)

    if proxy:
        proxy_headers = self.proxy_headers(proxy)
        # Create the manager for this proxy lazily, on first use.
        if proxy not in self.proxy_manager:
            self.proxy_manager[proxy] = proxy_from_url(
                proxy,
                proxy_headers=proxy_headers,
                num_pools=self._pool_connections,
                maxsize=self._pool_maxsize,
                block=self._pool_block
            )
        conn = self.proxy_manager[proxy].connection_from_url(url)
    else:
        # Only scheme should be lower case
        url = urlparse(url).geturl()
        conn = self.poolmanager.connection_from_url(url)

    # Compare by identity: two pools may compare equal yet be distinct.
    if not any(known is conn for known in self.connections):
        self.connections.append(conn)
    return conn
def __init__(self, endpoint=None, application_key=None,
             application_secret=None, consumer_key=None, timeout=TIMEOUT):
    """Store the API credentials and prepare a retrying HTTP session.

    :param endpoint: Key looked up in ``ENDPOINTS``; the result is stored
        on ``self._endpoint``.
    :param application_key: Stored on ``self._application_key``.
    :param application_secret: Stored on ``self._application_secret``.
    :param consumer_key: Stored on ``self._consumer_key``.
    :param timeout: Stored on ``self._timeout``, overriding the default.
    """
    from requests import Session
    from requests.adapters import HTTPAdapter

    self._endpoint = ENDPOINTS[endpoint]
    self._application_key = application_key
    self._application_secret = application_secret
    self._consumer_key = consumer_key

    # The time delta is loaded lazily, so it starts out unset.
    self._time_delta = None

    try:
        # Some older versions of requests do not have the vendored
        # urllib3 package.
        from requests.packages.urllib3.util.retry import Retry
    except ImportError:
        # Without Retry, fall back to a plain retry count.
        retry_policy = 5
    else:
        retry_policy = Retry(
            total=5,
            backoff_factor=0.2,
            status_forcelist=[422, 500, 502, 503, 504]
        )

    # A session is used so connections are reused between requests; each
    # scheme gets its own adapter carrying the same retry policy.
    self._session = http_session = Session()
    for scheme in ('https://', 'http://'):
        http_session.mount(scheme, HTTPAdapter(max_retries=retry_policy))

    # Override default timeout
    self._timeout = timeout
def requests_retry_session(
    retries=3,
    backoff_factor=0.3,
    status_forcelist=(500, 502, 504),
    session=None,
):
    """Return a requests session whose adapters follow a Retry policy.

    The same ``HTTPAdapter`` is mounted for both ``http://`` and
    ``https://``. Its ``Retry`` policy uses ``retries`` as the total, read
    and connect limits, sleeps between attempts according to
    ``backoff_factor`` and also retries on the response status codes in
    ``status_forcelist``.

    Passing a plain number instead, e.g.::

        adapter = HTTPAdapter(max_retries=3)

    gives no back-off between attempts and no status-based retries, which
    is why an explicit ``Retry`` object is built here. Note that, per
    http://docs.python-requests.org/en/master/api/?highlight=retries#requests.adapters.HTTPAdapter
    "By default, Requests does not retry failed connections."

    The backoff_factor is documented here:
    https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.retry.Retry
    The delay grows exponentially with each consecutive failure; see that
    page for the exact schedule of the installed urllib3 version.

    :param retries: Limit used for ``total``, ``read`` and ``connect``.
    :param backoff_factor: Back-off factor handed to ``Retry``.
    :param status_forcelist: Status codes that force a retry.
    :param session: (optional) An existing session to configure. A new
        ``requests.Session`` is created when omitted.
    :returns: The configured session.
    """  # noqa
    # `is None` rather than truthiness, so any caller-supplied session
    # object is honoured.
    if session is None:
        session = requests.Session()
    retry = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session
def __init__(self):
    """Initialise the fetcher and give it an HTTP session.

    The base ``spider.Fetcher`` is initialised with ``max_repeat=3`` and
    ``sleep_time=0``. HTTPS traffic on the new session goes through an
    adapter with 100 pool connections and a pool size of 100, and
    ``self.clear_session()`` is called last.
    """
    spider.Fetcher.__init__(self, max_repeat=3, sleep_time=0)

    self.session = requests.Session()
    large_pool_adapter = requests.adapters.HTTPAdapter(
        pool_connections=100, pool_maxsize=100)
    self.session.mount('https://', large_pool_adapter)

    self.clear_session()
def test_transport_adapter_ordering(self):
    """Check the key order of ``Session.adapters`` as prefixes are mounted.

    The expected lists place longer prefixes ahead of shorter ones. The
    last part checks that ``mount`` still works after ``adapters`` has
    been replaced by a plain dict.
    """
    session = requests.Session()
    expected = ['https://', 'http://']
    assert expected == list(session.adapters)

    # First batch of prefixes, mounted shortest first.
    for prefix in ('http://git',
                   'http://github',
                   'http://github.com',
                   'http://github.com/about/'):
        session.mount(prefix, HTTPAdapter())
    expected = [
        'http://github.com/about/',
        'http://github.com',
        'http://github',
        'http://git',
        'https://',
        'http://',
    ]
    assert expected == list(session.adapters)

    # Second batch interleaves with the first by length.
    for prefix in ('http://gittip',
                   'http://gittip.com',
                   'http://gittip.com/about/'):
        session.mount(prefix, HTTPAdapter())
    expected = [
        'http://github.com/about/',
        'http://gittip.com/about/',
        'http://github.com',
        'http://gittip.com',
        'http://github',
        'http://gittip',
        'http://git',
        'https://',
        'http://',
    ]
    assert expected == list(session.adapters)

    plain_dict_session = requests.Session()
    plain_dict_session.adapters = {'http://': HTTPAdapter()}
    plain_dict_session.mount('https://', HTTPAdapter())
    for prefix in ('http://', 'https://'):
        assert prefix in plain_dict_session.adapters
def test_session_close_proxy_clear(self, mocker):
    """Closing a session calls ``clear()`` once on each proxy manager.

    Two mock managers are patched into the ``http://`` adapter's
    ``proxy_manager`` mapping before the session is closed.
    """
    fake_managers = {name: mocker.Mock() for name in ('one', 'two')}

    session = requests.Session()
    http_adapter = session.adapters['http://']
    mocker.patch.dict(http_adapter.proxy_manager, fake_managers)

    session.close()

    for name in ('one', 'two'):
        fake_managers[name].clear.assert_called_once_with()
def test_transport_adapter_ordering(self):
    """Check the key order of ``Session.adapters`` as prefixes are mounted.

    The expected lists place longer prefixes ahead of shorter ones. The
    last part checks that ``mount`` still works after ``adapters`` has
    been replaced by a plain dict.
    """
    session = requests.Session()
    expected = ['https://', 'http://']
    assert expected == list(session.adapters)

    # First batch of prefixes, mounted shortest first.
    for prefix in ('http://git',
                   'http://github',
                   'http://github.com',
                   'http://github.com/about/'):
        session.mount(prefix, HTTPAdapter())
    expected = [
        'http://github.com/about/',
        'http://github.com',
        'http://github',
        'http://git',
        'https://',
        'http://',
    ]
    assert expected == list(session.adapters)

    # Second batch interleaves with the first by length.
    for prefix in ('http://gittip',
                   'http://gittip.com',
                   'http://gittip.com/about/'):
        session.mount(prefix, HTTPAdapter())
    expected = [
        'http://github.com/about/',
        'http://gittip.com/about/',
        'http://github.com',
        'http://gittip.com',
        'http://github',
        'http://gittip',
        'http://git',
        'https://',
        'http://',
    ]
    assert expected == list(session.adapters)

    plain_dict_session = requests.Session()
    plain_dict_session.adapters = {'http://': HTTPAdapter()}
    plain_dict_session.mount('https://', HTTPAdapter())
    for prefix in ('http://', 'https://'):
        assert prefix in plain_dict_session.adapters
def test_session_close_proxy_clear(self, mocker):
    """Closing a session calls ``clear()`` once on each proxy manager.

    Two mock managers are patched into the ``http://`` adapter's
    ``proxy_manager`` mapping before the session is closed.
    """
    fake_managers = {name: mocker.Mock() for name in ('one', 'two')}

    session = requests.Session()
    http_adapter = session.adapters['http://']
    mocker.patch.dict(http_adapter.proxy_manager, fake_managers)

    session.close()

    for name in ('one', 'two'):
        fake_managers[name].clear.assert_called_once_with()
def test_session_close_proxy_clear(self, mocker):
    """Closing a session calls ``clear()`` once on each proxy manager.

    Two mock managers are patched into the ``http://`` adapter's
    ``proxy_manager`` mapping before the session is closed.
    """
    fake_managers = {name: mocker.Mock() for name in ('one', 'two')}

    session = requests.Session()
    http_adapter = session.adapters['http://']
    mocker.patch.dict(http_adapter.proxy_manager, fake_managers)

    session.close()

    for name in ('one', 'two'):
        fake_managers[name].clear.assert_called_once_with()
def cancel(self):
    """Close, then cancel, every adapter held in ``self.adapters``."""
    for mounted_adapter in self.adapters.values():
        # Order matters: each adapter is closed before it is cancelled.
        mounted_adapter.close()
        mounted_adapter.cancel()
def run(self):
    """Drive ``perform_requests()`` and report progress on the queue.

    Creates the thread pool executor and an HTTP session whose pools are
    sized to ``self.concurrency``, then consumes ``perform_requests()``.
    A ``NETWORK_PROGRESS`` message is queued whenever more than
    ``REPORT_INTERVAL`` has passed since the last report, and one
    ``NETWORK_DONE`` message, carrying the last yielded value as
    ``"ret"``, is queued at the end.
    """
    # NOTE(review): block nesting was reconstructed from a flattened
    # source; confirm the extent of the `if` bodies inside the loop.
    self._executor = ThreadPoolExecutor(self.concurrency)

    self.session = requests.Session()
    pooled_adapter = requests.adapters.HTTPAdapter(
        pool_connections=self.concurrency, pool_maxsize=self.concurrency)
    for scheme in ('http://', 'https://'):
        self.session.mount(scheme, pooled_adapter)
    self.session.verify = self.verify_ssl

    def counters():
        # Fresh snapshot of the counters sent with every queue message.
        return {
            "processed": self.n_requests,
            "retried": self.n_retried,
            "consumed": self.n_consumed,
            "rusage": get_rusage(),
        }

    started = time()
    last_report = time()
    sent = 0
    outcome = None  # stays None when perform_requests() yields nothing
    for outcome in self.perform_requests():
        if outcome is not True:
            sent += 1
            self.ui.info('{} responses sent | time elapsed {}s'
                         .format(sent, time() - started))

        if time() - last_report > REPORT_INTERVAL:
            self.progress_queue.put(
                (ProgressQueueMsg.NETWORK_PROGRESS, counters()))
            last_report = time()

    final_payload = {"ret": outcome}
    final_payload.update(counters())
    self.progress_queue.put((ProgressQueueMsg.NETWORK_DONE, final_payload))
def test_transport_adapter_ordering(self):
    """Check the key order of ``Session.adapters`` as prefixes are mounted.

    The expected lists place longer prefixes ahead of shorter ones. The
    last part checks that ``mount`` still works after ``adapters`` has
    been replaced by a plain dict.
    """
    session = requests.Session()
    expected = ['https://', 'http://']
    assert expected == list(session.adapters)

    # First batch of prefixes, mounted shortest first.
    for prefix in ('http://git',
                   'http://github',
                   'http://github.com',
                   'http://github.com/about/'):
        session.mount(prefix, HTTPAdapter())
    expected = [
        'http://github.com/about/',
        'http://github.com',
        'http://github',
        'http://git',
        'https://',
        'http://',
    ]
    assert expected == list(session.adapters)

    # Second batch interleaves with the first by length.
    for prefix in ('http://gittip',
                   'http://gittip.com',
                   'http://gittip.com/about/'):
        session.mount(prefix, HTTPAdapter())
    expected = [
        'http://github.com/about/',
        'http://gittip.com/about/',
        'http://github.com',
        'http://gittip.com',
        'http://github',
        'http://gittip',
        'http://git',
        'https://',
        'http://',
    ]
    assert expected == list(session.adapters)

    plain_dict_session = requests.Session()
    plain_dict_session.adapters = {'http://': HTTPAdapter()}
    plain_dict_session.mount('https://', HTTPAdapter())
    for prefix in ('http://', 'https://'):
        assert prefix in plain_dict_session.adapters
def __init__(self, access_key=None, secret_key=None, https_proxy=None,
             insecure=False, endpoint=None, search_endpoint=None):
    """Initializes a new API connection.

    Args:
        access_key (str, optional): The user's Matchlight Public
            API access key. If not passed as an argument this value
            must be set using the ``MATCHLIGHT_ACCESS_KEY``
            environment variable.
        secret_key (str, optional): The user's Matchlight Public
            API secret key. If not passed as an argument this value
            must be set using the ``MATCHLIGHT_SECRET_KEY``
            environment variable.
        https_proxy (str): A string defining the HTTPS proxy to
            use. Defaults to None.
        insecure (bool, optional): Stored on ``self.insecure``;
            presumably disables certificate verification when
            ``True`` (the flag is not used in this method).
            Defaults to ``False`` (certificates will be verified).
        endpoint (str, optional): Base URL for requests. Defaults
            to ``MATCHLIGHT_API_URL_V2``.
        search_endpoint (str, optional): Base URL for all search
            API requests. Defaults to ``MATCHLIGHT_API_URL_V2``.

    Raises:
        matchlight.error.SDKError: If either key is still missing after
            the environment has been consulted.
    """
    # Fall back to the environment for any key not passed explicitly.
    if access_key is None:
        access_key = os.environ.get('MATCHLIGHT_ACCESS_KEY', None)
    if secret_key is None:
        secret_key = os.environ.get('MATCHLIGHT_SECRET_KEY', None)
    if access_key is None or secret_key is None:
        raise matchlight.error.SDKError(
            'The APIConnection object requires your Matchlight '
            'API access_key and secret_key either be passed as input '
            'parameters or set in the MATCHLIGHT_ACCESS_KEY and '
            'MATCHLIGHT_SECRET_KEY environment variables.')

    if endpoint is None:
        endpoint = MATCHLIGHT_API_URL_V2
    if search_endpoint is None:
        search_endpoint = MATCHLIGHT_API_URL_V2

    self.access_key = access_key
    self.secret_key = secret_key
    self.proxy = {'https': https_proxy}
    self.insecure = insecure
    self.endpoint = endpoint
    self.search_endpoint = search_endpoint

    self.session = requests.Session()
    retry_adapter = requests.adapters.HTTPAdapter(
        max_retries=requests_urllib3.util.Retry(
            total=5, status_forcelist=[500, 502, 503, 504]))
    # Mount the retry policy on both base URLs. Mounting it on `endpoint`
    # alone would leave a distinct `search_endpoint` without retries.
    # Mounting the same prefix twice simply re-registers the adapter.
    for base_url in (self.endpoint, self.search_endpoint):
        self.session.mount(base_url, retry_adapter)
def test_transport_adapter_ordering(self):
    """Check the key order of ``Session.adapters`` as prefixes are mounted.

    The expected lists place longer prefixes ahead of shorter ones. The
    last part checks that ``mount`` still works after ``adapters`` has
    been replaced by a plain dict.
    """
    session = requests.Session()
    expected = ['https://', 'http://']
    assert expected == list(session.adapters)

    # First batch of prefixes, mounted shortest first.
    for prefix in ('http://git',
                   'http://github',
                   'http://github.com',
                   'http://github.com/about/'):
        session.mount(prefix, HTTPAdapter())
    expected = [
        'http://github.com/about/',
        'http://github.com',
        'http://github',
        'http://git',
        'https://',
        'http://',
    ]
    assert expected == list(session.adapters)

    # Second batch interleaves with the first by length.
    for prefix in ('http://gittip',
                   'http://gittip.com',
                   'http://gittip.com/about/'):
        session.mount(prefix, HTTPAdapter())
    expected = [
        'http://github.com/about/',
        'http://gittip.com/about/',
        'http://github.com',
        'http://gittip.com',
        'http://github',
        'http://gittip',
        'http://git',
        'https://',
        'http://',
    ]
    assert expected == list(session.adapters)

    plain_dict_session = requests.Session()
    plain_dict_session.adapters = {'http://': HTTPAdapter()}
    plain_dict_session.mount('https://', HTTPAdapter())
    for prefix in ('http://', 'https://'):
        assert prefix in plain_dict_session.adapters