def __init__(self,
             con_pool_size=1,
             proxy_url=None,
             urllib3_proxy_kwargs=None,
             connect_timeout=5.,
             read_timeout=5.):
    """Create the urllib3 connection manager and store it on the instance.

    Args:
        con_pool_size: Passed to the manager as ``maxsize``.
        proxy_url: Proxy to use. When falsy, the ``HTTPS_PROXY`` and then
            the ``https_proxy`` environment variables are consulted.
        urllib3_proxy_kwargs: Extra keyword arguments merged into the
            manager arguments, only when a proxy is in use.
        connect_timeout: Connect timeout handed to ``urllib3.Timeout``;
            also kept on the instance as ``_connect_timeout``.
        read_timeout: Read timeout handed to ``urllib3.Timeout``.

    Raises:
        RuntimeError: If the proxy URL starts with ``socks`` but
            ``urllib3.contrib.socks`` cannot be imported.
    """
    proxy_extras = {} if urllib3_proxy_kwargs is None else urllib3_proxy_kwargs
    self._connect_timeout = connect_timeout

    # Arguments shared by every manager flavour: certificate checking
    # against the certifi bundle, SO_KEEPALIVE on top of urllib3's default
    # socket options, and the connect/read timeouts.
    pool_kwargs = {
        'maxsize': con_pool_size,
        'cert_reqs': 'CERT_REQUIRED',
        'ca_certs': certifi.where(),
        'socket_options': HTTPConnection.default_socket_options + [
            (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
        ],
        'timeout': urllib3.Timeout(
            connect=self._connect_timeout, read=read_timeout, total=None),
    }

    # Proxy resolution order:
    #   1. the explicit proxy_url argument (+ urllib3_proxy_kwargs)
    #   2. the `HTTPS_PROXY` environment variable
    #   3. the `https_proxy` environment variable
    #   4. no proxy at all
    effective_proxy = proxy_url
    if not effective_proxy:
        effective_proxy = (os.environ.get('HTTPS_PROXY')
                           or os.environ.get('https_proxy'))

    if effective_proxy:
        pool_kwargs.update(proxy_extras)
        if effective_proxy.startswith('socks'):
            # The SOCKS manager is imported lazily; a failed import is
            # reported as a RuntimeError.
            try:
                from urllib3.contrib.socks import SOCKSProxyManager
            except ImportError:
                raise RuntimeError('PySocks is missing')
            pool = SOCKSProxyManager(effective_proxy, **pool_kwargs)
        else:
            pool = urllib3.proxy_from_url(effective_proxy, **pool_kwargs)
            credentials = pool.proxy.auth
            if credentials:
                # TODO: what about other auth types?
                basic_auth_headers = urllib3.make_headers(
                    proxy_basic_auth=credentials)
                pool.proxy_headers.update(basic_auth_headers)
    elif urllib3.contrib.appengine.is_appengine_sandbox():
        # Use URLFetch service if running in App Engine
        pool = urllib3.contrib.appengine.AppEngineManager()
    else:
        pool = urllib3.PoolManager(**pool_kwargs)

    self._con_pool = pool
# 评论列表 (comment list)
# 文章目录 (table of contents)