def download_vcpython27(self):
    """
    Download vcpython27 since some Windows 7 boxes have it and some don't.

    The payload is written to ``vcpython27.msi`` inside ``DOWNLOADS_DIR``.
    When ``self.PROXY`` is set, an opener that routes HTTP and HTTPS through
    it is installed as urllib2's default opener before the request is made.

    :return: None
    """
    import shutil  # local import: only needed to stream the payload

    self._prepare_for_download()
    logger.info('Beginning download of vcpython27... this may take a few minutes...')
    with open(os.path.join(DOWNLOADS_DIR, 'vcpython27.msi'), 'wb') as f:
        if self.PROXY is not None:
            opener = urllib2.build_opener(
                urllib2.HTTPHandler(),
                urllib2.HTTPSHandler(),
                urllib2.ProxyHandler({'http': self.PROXY, 'https': self.PROXY})
            )
            urllib2.install_opener(opener)
        response = urllib2.urlopen(self.VCPYTHON27_DOWNLOAD_URL, timeout=self.DOWNLOAD_TIMEOUT)
        try:
            # Stream in chunks rather than holding the whole installer in
            # memory with a single read().
            shutil.copyfileobj(response, f)
        finally:
            # Release the connection even if the copy fails part-way.
            response.close()
    logger.debug('Download of vcpython27 complete')
# Example source code for the Python class HTTPSHandler()
def download_python(self):
    """
    Download Python

    The payload is written to ``python-installer.msi`` inside
    ``DOWNLOADS_DIR``. When ``self.PROXY`` is set, an opener that routes HTTP
    and HTTPS through it is installed as urllib2's default opener before the
    request is made.

    :return: None
    """
    import shutil  # local import: only needed to stream the payload

    self._prepare_for_download()
    logger.info('Beginning download of python')
    with open(os.path.join(DOWNLOADS_DIR, 'python-installer.msi'), 'wb') as f:
        if self.PROXY is not None:
            opener = urllib2.build_opener(
                urllib2.HTTPHandler(),
                urllib2.HTTPSHandler(),
                urllib2.ProxyHandler({'http': self.PROXY, 'https': self.PROXY})
            )
            urllib2.install_opener(opener)
        response = urllib2.urlopen(self.PYTHON_DOWNLOAD_URL, timeout=self.DOWNLOAD_TIMEOUT)
        try:
            # Stream in chunks rather than holding the whole installer in
            # memory with a single read().
            shutil.copyfileobj(response, f)
        finally:
            # Release the connection even if the copy fails part-way.
            response.close()
    logger.debug('Download of python complete')
def __init__(self, timeout=None, proxy=None, cacert=None, sessions=False):
    """Set up a urllib2-based transport.

    :param timeout: timeout stored on the instance as ``_timeout``; rejected
        when ``self.supports_feature('timeout')`` is false.
    :param proxy: unsupported here; any truthy value is rejected.
    :param cacert: unsupported here; any truthy value is rejected.
    :param sessions: when truthy, a cookie processor backed by a fresh
        ``CookieJar`` is added to the opener.
    :raises RuntimeError: if an unsupported option is requested.
    """
    if (timeout is not None) and not self.supports_feature('timeout'):
        raise RuntimeError('timeout is not supported with urllib2 transport')
    if proxy:
        raise RuntimeError('proxy is not supported with urllib2 transport')
    if cacert:
        raise RuntimeError('cacert is not support with urllib2 transport')

    handlers = []

    # Feature-detect instead of comparing version numbers:
    # ssl.create_default_context() exists from 2.7.9 and 3.4, so the previous
    # ">= 3.2" gate raised AttributeError on Python 3.2 and 3.3.
    if hasattr(ssl, 'create_default_context'):
        context = ssl.create_default_context()
        # NOTE(review): hostname checking and certificate verification are
        # switched off for every HTTPS request made through this transport.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        handlers.append(urllib2.HTTPSHandler(context=context))

    if sessions:
        handlers.append(urllib2.HTTPCookieProcessor(CookieJar()))

    opener = urllib2.build_opener(*handlers)
    self.request_opener = opener.open
    self._timeout = timeout
def get_opener():
    """Return a urllib2 opener that routes HTTP and HTTPS via 127.0.0.1:8087.

    When the ``ssl`` module provides ``create_default_context``, an HTTPS
    handler is added whose context is built with ``gae_proxy/CA.crt`` under
    ``data_root`` as ``cafile`` (or ``cafile=None`` if that file is absent).
    Otherwise only the proxy handler is used.
    """
    import ssl

    local_proxy = '127.0.0.1:8087'
    make_context = getattr(ssl, "create_default_context", None)

    if not make_context:
        # Older ssl module: no custom context can be supplied.
        proxy_handler = urllib2.ProxyHandler({'http': local_proxy, 'https': local_proxy})
        return urllib2.build_opener(proxy_handler)

    ca_path = os.path.join(data_root, "gae_proxy", "CA.crt")
    cafile = ca_path if os.path.isfile(ca_path) else None
    ssl_context = make_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=cafile)
    tls_handler = urllib2.HTTPSHandler(context=ssl_context)
    proxy_handler = urllib2.ProxyHandler({'http': local_proxy, 'https': local_proxy})
    return urllib2.build_opener(proxy_handler, tls_handler)
def get_opener():
    """Return a urllib2 opener that routes HTTP and HTTPS via 127.0.0.1:8087.

    When the ``ssl`` module provides ``create_default_context``, an HTTPS
    handler is added whose context is built with ``gae_proxy/CA.crt`` under
    ``data_root`` as ``cafile`` (or ``cafile=None`` if that file is absent).
    Otherwise only the proxy handler is used.
    """
    import ssl

    local_proxy = '127.0.0.1:8087'
    make_context = getattr(ssl, "create_default_context", None)

    if not make_context:
        # Older ssl module: no custom context can be supplied.
        proxy_handler = urllib2.ProxyHandler({'http': local_proxy, 'https': local_proxy})
        return urllib2.build_opener(proxy_handler)

    ca_path = os.path.join(data_root, "gae_proxy", "CA.crt")
    cafile = ca_path if os.path.isfile(ca_path) else None
    ssl_context = make_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=cafile)
    tls_handler = urllib2.HTTPSHandler(context=ssl_context)
    proxy_handler = urllib2.ProxyHandler({'http': local_proxy, 'https': local_proxy})
    return urllib2.build_opener(proxy_handler, tls_handler)
def __openrequest__(self, req):
    """Send ``req`` through urllib2 and return the raw response body.

    When ``self.debug`` is truthy, request and response details are printed,
    and an HTTPS handler using ``self.debugLevel`` is installed as urllib2's
    default opener before the request is sent.

    :param req: request object exposing ``get_full_url``, ``get_method``,
        ``get_header`` and ``get_data`` (all used for the debug output).
    :return: whatever ``read()`` returns for the response.
    """
    # Single-argument, parenthesised print produces the same output as the
    # print statement on Python 2 and is also valid Python 3 syntax.
    if self.debug:
        print("\n----- REQUEST -----")
        handler = urllib2.HTTPSHandler(debuglevel=self.debugLevel)
        opener = urllib2.build_opener(handler)
        urllib2.install_opener(opener)
        print("- API ENDPOINT: " + req.get_full_url())
        print("- REQUEST METHOD: " + req.get_method())
        # get_header() yields None when the header is missing; format it
        # instead of concatenating so debug output cannot raise TypeError.
        print("- AUTHORIZATION HEADER: %s" % (req.get_header("Authorization"),))
        print("\n----- REQUEST DATA -----")
        print(req.get_data())

    res = urllib2.urlopen(req)
    try:
        out = res.read()

        if self.debug:
            print("\n----- REQUEST INFO -----")
            print(res.info())
            print("\n----- RESPONSE -----")
            print(out)
    finally:
        # Always release the connection, including when read() fails.
        res.close()

    return out
def __openrequest__(self, req):
    """Send ``req`` through urllib2 and return the raw response body.

    When ``self.debug`` is truthy, request and response details are printed,
    and an HTTPS handler using ``self.debugLevel`` is installed as urllib2's
    default opener before the request is sent.

    :param req: request object exposing ``get_full_url``, ``get_method``,
        ``get_header`` and ``get_data`` (all used for the debug output).
    :return: whatever ``read()`` returns for the response.
    """
    # Single-argument, parenthesised print produces the same output as the
    # print statement on Python 2 and is also valid Python 3 syntax.
    if self.debug:
        print("\n----- REQUEST -----")
        handler = urllib2.HTTPSHandler(debuglevel=self.debugLevel)
        opener = urllib2.build_opener(handler)
        urllib2.install_opener(opener)
        print("- API ENDPOINT: " + req.get_full_url())
        print("- REQUEST METHOD: " + req.get_method())
        # get_header() yields None when the header is missing; format it
        # instead of concatenating so debug output cannot raise TypeError.
        print("- AUTHORIZATION HEADER: %s" % (req.get_header("Authorization"),))
        print("\n----- REQUEST DATA -----")
        print(req.get_data())

    res = urllib2.urlopen(req)
    try:
        out = res.read()

        if self.debug:
            print("\n----- REQUEST INFO -----")
            print(res.info())
            print("\n----- RESPONSE -----")
            print(out)
    finally:
        # Always release the connection, including when read() fails.
        res.close()

    return out
def __openrequest__(self, req):
    """Send ``req`` through urllib2 and return the raw response body.

    When ``self.debug`` is truthy, request and response details are printed,
    and an HTTPS handler using ``self.debugLevel`` is installed as urllib2's
    default opener before the request is sent.

    :param req: request object exposing ``get_full_url``, ``get_method``,
        ``get_header`` and ``get_data`` (all used for the debug output).
    :return: whatever ``read()`` returns for the response.
    """
    # Single-argument, parenthesised print produces the same output as the
    # print statement on Python 2 and is also valid Python 3 syntax.
    if self.debug:
        print("\n----- REQUEST -----")
        handler = urllib2.HTTPSHandler(debuglevel=self.debugLevel)
        opener = urllib2.build_opener(handler)
        urllib2.install_opener(opener)
        print("- API ENDPOINT: " + req.get_full_url())
        print("- REQUEST METHOD: " + req.get_method())
        # get_header() yields None when the header is missing; format it
        # instead of concatenating so debug output cannot raise TypeError.
        print("- AUTHORIZATION HEADER: %s" % (req.get_header("Authorization"),))
        print("\n----- REQUEST DATA -----")
        print(req.get_data())

    res = urllib2.urlopen(req)
    try:
        out = res.read()

        if self.debug:
            print("\n----- REQUEST INFO -----")
            print(res.info())
            print("\n----- RESPONSE -----")
            print(out)
    finally:
        # Always release the connection, including when read() fails.
        res.close()

    return out
def get_access_token(self, code, state=None):
    '''
    Exchange the ``code`` (and optional ``state``) received on the callback
    URL, e.g. http://host/callback?code=123&state=xyz, for an access token.

    Returns the access token as a string. Raises ApiAuthError when the
    response contains an 'error' entry or the request fails with HTTPError.
    '''
    params = dict(client_id=self._client_id, client_secret=self._client_secret, code=code)
    redirect_uri = self._redirect_uri
    if redirect_uri:
        params['redirect_uri'] = redirect_uri
    if state:
        params['state'] = state

    https_opener = build_opener(HTTPSHandler)
    token_request = Request(
        'https://github.com/login/oauth/access_token',
        data=_encode_params(params))
    token_request.get_method = _METHOD_MAP['POST']
    token_request.add_header('Accept', 'application/json')

    try:
        reply = https_opener.open(token_request, timeout=TIMEOUT)
        parsed = _parse_json(reply.read())
        if 'error' not in parsed:
            return str(parsed.access_token)
        raise ApiAuthError(str(parsed.error))
    except HTTPError:
        raise ApiAuthError('HTTPError when get access token')
def __init__(self, timeout=None, proxy=None, cacert=None, sessions=False):
    """Set up a urllib2-based transport.

    :param timeout: timeout stored on the instance as ``_timeout``; rejected
        when ``self.supports_feature('timeout')`` is false.
    :param proxy: unsupported here; any truthy value is rejected.
    :param cacert: unsupported here; any truthy value is rejected.
    :param sessions: when truthy, a cookie processor backed by a fresh
        ``CookieJar`` is added to the opener.
    :raises RuntimeError: if an unsupported option is requested.
    """
    if (timeout is not None) and not self.supports_feature('timeout'):
        raise RuntimeError('timeout is not supported with urllib2 transport')
    if proxy:
        raise RuntimeError('proxy is not supported with urllib2 transport')
    if cacert:
        raise RuntimeError('cacert is not support with urllib2 transport')

    handlers = []

    # Feature-detect instead of comparing version numbers:
    # ssl.create_default_context() exists from 2.7.9 and 3.4, so the previous
    # ">= 3.2" gate raised AttributeError on Python 3.2 and 3.3.
    if hasattr(ssl, 'create_default_context'):
        context = ssl.create_default_context()
        # NOTE(review): hostname checking and certificate verification are
        # switched off for every HTTPS request made through this transport.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        handlers.append(urllib2.HTTPSHandler(context=context))

    if sessions:
        handlers.append(urllib2.HTTPCookieProcessor(CookieJar()))

    opener = urllib2.build_opener(*handlers)
    self.request_opener = opener.open
    self._timeout = timeout
def make_request(*args):
    """Open a URL with a randomised User-agent and return the stripped body.

    On Windows the opener uses an HTTPS handler with a TLS 1.2 context; on
    other platforms the default ``build_opener()`` is used.

    :param args: positional arguments forwarded unchanged to ``opener.open``.
    :return: the response body with leading/trailing whitespace stripped.
    :raises Exception: on any failure. The payload is the stripped body of
        the error response when the original exception exposes ``read()``,
        otherwise the original exception itself.
    """
    if platform.system() == "Windows":  # pragma: no cover
        sctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        sh = urllib2.HTTPSHandler(debuglevel=0, context=sctx)
        opener = urllib2.build_opener(sh)
    else:
        opener = build_opener()
    opener.addheaders = [('User-agent',
                          'Mozilla/5.0' + str(random.randrange(1000000)))]
    try:
        return opener.open(*args).read().strip()
    except Exception as e:
        try:
            p = e.read().strip()
        except Exception:
            # Narrowed from a bare ``except:`` so KeyboardInterrupt and
            # SystemExit are no longer swallowed while reading the error body.
            p = e
        raise Exception(p)
def get_opener():
    """Return a urllib2 opener that routes HTTP and HTTPS via 127.0.0.1:8087.

    When the ``ssl`` module provides ``create_default_context``, an HTTPS
    handler is added whose context is built with ``gae_proxy/CA.crt`` under
    ``data_root`` as ``cafile`` (or ``cafile=None`` if that file is absent).
    Otherwise only the proxy handler is used.
    """
    import ssl

    local_proxy = '127.0.0.1:8087'
    make_context = getattr(ssl, "create_default_context", None)

    if not make_context:
        # Older ssl module: no custom context can be supplied.
        proxy_handler = urllib2.ProxyHandler({'http': local_proxy, 'https': local_proxy})
        return urllib2.build_opener(proxy_handler)

    ca_path = os.path.join(data_root, "gae_proxy", "CA.crt")
    cafile = ca_path if os.path.isfile(ca_path) else None
    ssl_context = make_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=cafile)
    tls_handler = urllib2.HTTPSHandler(context=ssl_context)
    proxy_handler = urllib2.ProxyHandler({'http': local_proxy, 'https': local_proxy})
    return urllib2.build_opener(proxy_handler, tls_handler)
def _init_urllib(self, secure, debuglevel=0):
    """Build a urllib2 opener and install it as the default opener.

    The opener uses an empty proxy map, a fresh cookie jar, an HTTPS handler
    and the multipart POST handler, and sends ``API_USER_AGENT`` as its
    User-agent header.

    :param secure: when falsy, a message is logged and the HTTPS handler gets
        a context with hostname checking and certificate verification off.
    :param debuglevel: passed through to the HTTPS handler.
    """
    if secure:
        ssl_context = None
    else:
        self._logger.info('[WARNING] Skip certificate verification.')
        ssl_context = ssl.create_default_context()
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE

    handlers = [
        urllib2.ProxyHandler({}),
        urllib2.HTTPCookieProcessor(cookielib.CookieJar()),
        urllib2.HTTPSHandler(debuglevel=debuglevel, context=ssl_context),
        MultipartPostHandler.MultipartPostHandler,
    ]
    configured_opener = urllib2.build_opener(*handlers)
    configured_opener.addheaders = [('User-agent', API_USER_AGENT)]
    urllib2.install_opener(configured_opener)
def __init__(self, access_token_key, access_token_secret, consumer_key, consumer_secret):
    """Store the four OAuth credentials and build the helper objects.

    Creates the OAuth token and consumer, an HMAC-SHA1 signature method, and
    HTTP/HTTPS handlers with debug level 0.
    """
    debug_level = 0

    # Raw credentials.
    self.access_token_key = access_token_key
    self.access_token_secret = access_token_secret
    self.consumer_key = consumer_key
    self.consumer_secret = consumer_secret

    # OAuth objects derived from the credentials just stored.
    self.oauth_token = oauth.Token(
        key=self.access_token_key,
        secret=self.access_token_secret)
    self.oauth_consumer = oauth.Consumer(
        key=self.consumer_key,
        secret=self.consumer_secret)
    self.signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()

    # Transport handlers.
    self.http_handler = urllib.HTTPHandler(debuglevel=debug_level)
    self.https_handler = urllib.HTTPSHandler(debuglevel=debug_level)
def get_opener():
    """Return a urllib2 opener that routes HTTP and HTTPS via 127.0.0.1:8087.

    When the ``ssl`` module provides ``create_default_context``, an HTTPS
    handler is added whose context is built with ``gae_proxy/CA.crt`` under
    ``data_root`` as ``cafile`` (or ``cafile=None`` if that file is absent).
    Otherwise only the proxy handler is used.
    """
    import ssl

    local_proxy = '127.0.0.1:8087'
    make_context = getattr(ssl, "create_default_context", None)

    if not make_context:
        # Older ssl module: no custom context can be supplied.
        proxy_handler = urllib2.ProxyHandler({'http': local_proxy, 'https': local_proxy})
        return urllib2.build_opener(proxy_handler)

    ca_path = os.path.join(data_root, "gae_proxy", "CA.crt")
    cafile = ca_path if os.path.isfile(ca_path) else None
    ssl_context = make_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=cafile)
    tls_handler = urllib2.HTTPSHandler(context=ssl_context)
    proxy_handler = urllib2.ProxyHandler({'http': local_proxy, 'https': local_proxy})
    return urllib2.build_opener(proxy_handler, tls_handler)
def __init__(self, proxy=None, debuglevel=0):
    """Remember the proxy to use and initialise the base HTTPS handler.

    :param proxy: stored as ``self.proxy``; ``None`` means no proxy.
    :param debuglevel: forwarded to ``urllib2.HTTPSHandler.__init__``.
    """
    self.proxy = proxy
    base_init = urllib2.HTTPSHandler.__init__
    base_init(self, debuglevel)
def do_open(self, http_class, req):
    """Open ``req`` using ``ProxyHTTPSConnection`` as the connection class.

    The ``http_class`` argument is accepted but not used. When a proxy was
    configured, it is set on the request for the "https" scheme first.
    """
    configured_proxy = self.proxy
    if configured_proxy is not None:
        req.set_proxy(configured_proxy, "https")
    base_do_open = urllib2.HTTPSHandler.do_open
    return base_do_open(self, ProxyHTTPSConnection, req)
def __init__(self, cert=None, verify=True):
    """Initialise the handler with client-certificate and verification settings.

    :param cert: ``None`` (no client certificate), a single string used as
        both certificate file and key file, or a two-item sequence
        ``(certfile, keyfile)``.
    :param verify: ``True`` / ``False`` to require or not require a
        certificate, or a string that is stored as ``ca_certs`` (and implies
        a certificate is required).
    :raises TypeError: if ``verify`` is neither a boolean nor a string.
    """
    urllib2.HTTPSHandler.__init__(self)

    # Normalise ``cert`` into a (certfile, keyfile) pair.
    if cert is None:
        cert_pair = (None, None)
    elif isinstance(cert, basestring):
        cert_pair = (cert, cert)
    else:
        cert_pair = cert
    certfile, keyfile = cert_pair

    # Normalise ``verify`` into (require_cert, ca_certs).
    if isinstance(verify, basestring):
        require_cert, ca_certs = True, verify
    elif verify is True or verify is False:
        require_cert, ca_certs = verify, None
    else:
        raise TypeError('"verify" parameter must be a boolean or a string')

    self._certfile = certfile
    self._keyfile = keyfile
    self._require_cert = require_cert
    self._ca_certs = ca_certs
def __init__(self, ssl_config):  # pylint: disable=E1002
    """Run the base-class initialiser and keep ``ssl_config`` on the instance."""
    if not PY2:
        super().__init__()  # pylint: disable=W0104
    else:
        # Explicit base-class call on the Python 2 path.
        urllib2.HTTPSHandler.__init__(self)
    self._ssl_config = ssl_config
def __init__(self):
    """Initialise the handler by delegating to ``urllib2.HTTPSHandler``."""
    base_init = urllib2.HTTPSHandler.__init__
    base_init(self)
def __init__(self):
    """Build an HTTPS opener.

    The opener is stored as ``self._opener`` with every ``HTTPHandler``
    instance removed from its ``handlers`` list.
    """
    # Based on pip 1.4.1's URLOpener
    # This verifies certs on only Python >=2.7.9.
    self._opener = build_opener(HTTPSHandler())
    # Strip out HTTPHandler to prevent MITM spoof.
    # Rebuild the list in place rather than calling remove() while iterating
    # over it, which can skip the element following each removal.
    # NOTE(review): the opener may also index handlers by protocol
    # internally; confirm that pruning ``handlers`` alone is enough to block
    # plain-HTTP requests.
    self._opener.handlers[:] = [
        handler for handler in self._opener.handlers
        if not isinstance(handler, HTTPHandler)
    ]
def hashed_download(url, temp, digest):
    """Download ``url`` to ``temp``, make sure it has the SHA-256 ``digest``,
    and return its path.

    The file is named after the last path segment of ``url``. ``HashError``
    is raised when the downloaded content's hex digest differs from
    ``digest``; the downloaded file is left in place in that case.
    """
    # Based on pip 1.4.1's URLOpener but with cert verification removed. Python
    # >=2.7.9 verifies HTTPS certs itself, and, in any case, the cert
    # authenticity has only privacy (not arbitrary code execution)
    # implications, since we're checking hashes.
    def opener():
        opener = build_opener(HTTPSHandler())
        # Strip out HTTPHandler to prevent MITM spoof.
        # Rebuild the list in place rather than calling remove() while
        # iterating over it, which can skip the element after each removal.
        opener.handlers[:] = [
            handler for handler in opener.handlers
            if not isinstance(handler, HTTPHandler)
        ]
        return opener

    def read_chunks(response, chunk_size):
        # Yield successive chunks until read() returns an empty value.
        while True:
            chunk = response.read(chunk_size)
            if not chunk:
                break
            yield chunk

    response = opener().open(url)
    try:
        path = join(temp, urlparse(url).path.split('/')[-1])
        actual_hash = sha256()
        with open(path, 'wb') as out_file:
            for chunk in read_chunks(response, 4096):
                out_file.write(chunk)
                actual_hash.update(chunk)
    finally:
        # Release the connection whether or not the download completed.
        response.close()

    actual_digest = actual_hash.hexdigest()
    if actual_digest != digest:
        raise HashError(url, path, actual_digest, digest)
    return path
def https_request(self, req):
    """Validate a streamed request body, then apply the base pre-processing.

    If the request carries data that has a ``read`` or ``next`` attribute,
    a Content-Length header must already be set.

    :raises ValueError: if such a body is present without Content-Length.
    :return: the result of ``urllib2.HTTPSHandler.do_request_``.
    """
    if req.has_data():
        body = req.get_data()
        # File-like or iterator bodies cannot be measured here, so the
        # caller has to declare the length up front.
        is_streamed = hasattr(body, 'read') or hasattr(body, 'next')
        if is_streamed and not req.has_header('Content-length'):
            raise ValueError(
                "No Content-Length specified for iterable body")
    return urllib2.HTTPSHandler.do_request_(self, req)
def __init__(self):
    """Initialise the handler by delegating to ``urllib2.HTTPSHandler``."""
    base_init = urllib2.HTTPSHandler.__init__
    base_init(self)
def __init__(self):
    """Initialise the handler by delegating to ``urllib2.HTTPSHandler``."""
    base_init = urllib2.HTTPSHandler.__init__
    base_init(self)
def get_opener():
    """Return a urllib2 opener that routes HTTP and HTTPS via the local proxy.

    The proxy address is ``127.0.0.1`` on ``config.LISTEN_PORT``. When the
    ``ssl`` module provides ``create_default_context``, an HTTPS handler is
    added whose context is built with ``gae_proxy/CA.crt`` under
    ``data_root`` as ``cafile``. Otherwise only the proxy handler is used.
    """
    import ssl

    local_proxy = '127.0.0.1:%s' % config.LISTEN_PORT
    make_context = getattr(ssl, "create_default_context", None)

    if not make_context:
        # Older ssl module: no custom context can be supplied.
        proxy_handler = urllib2.ProxyHandler({'http': local_proxy, 'https': local_proxy})
        return urllib2.build_opener(proxy_handler)

    ca_path = os.path.join(data_root, "gae_proxy", "CA.crt")
    ssl_context = make_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
    tls_handler = urllib2.HTTPSHandler(context=ssl_context)
    proxy_handler = urllib2.ProxyHandler({'http': local_proxy, 'https': local_proxy})
    return urllib2.build_opener(proxy_handler, tls_handler)
def __init__(self, ssl_config):  # pylint: disable=E1002
    """Run the base-class initialiser and keep ``ssl_config`` on the instance."""
    if not PY2:
        super().__init__()  # pylint: disable=W0104
    else:
        # Explicit base-class call on the Python 2 path.
        urllib2.HTTPSHandler.__init__(self)
    self._ssl_config = ssl_config
def get_opener():
    """Return a urllib2 opener that routes HTTP and HTTPS via the local proxy.

    The proxy address is ``127.0.0.1`` on ``config.LISTEN_PORT``. When the
    ``ssl`` module provides ``create_default_context``, an HTTPS handler is
    added whose context is built with ``gae_proxy/CA.crt`` under
    ``data_root`` as ``cafile``. Otherwise only the proxy handler is used.
    """
    import ssl

    local_proxy = '127.0.0.1:%s' % config.LISTEN_PORT
    make_context = getattr(ssl, "create_default_context", None)

    if not make_context:
        # Older ssl module: no custom context can be supplied.
        proxy_handler = urllib2.ProxyHandler({'http': local_proxy, 'https': local_proxy})
        return urllib2.build_opener(proxy_handler)

    ca_path = os.path.join(data_root, "gae_proxy", "CA.crt")
    ssl_context = make_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
    tls_handler = urllib2.HTTPSHandler(context=ssl_context)
    proxy_handler = urllib2.ProxyHandler({'http': local_proxy, 'https': local_proxy})
    return urllib2.build_opener(proxy_handler, tls_handler)
def __init__(self, connection_class=VerifiedHTTPSConnection):
    """Record the connection class to use and initialise the base handler.

    :param connection_class: stored as ``self.specialized_conn_class``;
        defaults to ``VerifiedHTTPSConnection``.
    """
    self.specialized_conn_class = connection_class
    base_init = urllib2.HTTPSHandler.__init__
    base_init(self)
def __init__(self, connection_class=VerifiedHTTPSConnection):
    """Record the connection class to use and initialise the base handler.

    :param connection_class: stored as ``self.specialized_conn_class``;
        defaults to ``VerifiedHTTPSConnection``.
    """
    self.specialized_conn_class = connection_class
    base_init = urllib2.HTTPSHandler.__init__
    base_init(self)