Example source code for Python's HTTPSHandler() class

def __init__(self, connection_class=UnverifiedHTTPSConnection):
    self.specialized_conn_class = connection_class
    urllib2.HTTPSHandler.__init__(self)
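A handler initialized like this typically overrides https_open so urllib2 routes every HTTPS request through the injected connection class. A minimal sketch of the surrounding subclass, assuming UnverifiedHTTPSConnection is an httplib.HTTPSConnection subclass defined elsewhere (the class name here is invented):

import urllib2

class UnverifiedHTTPSHandler(urllib2.HTTPSHandler):
    # Hypothetical companion to the __init__ above: pass the stored
    # connection class to do_open() for every HTTPS request.
    def __init__(self, connection_class=UnverifiedHTTPSConnection):
        self.specialized_conn_class = connection_class
        urllib2.HTTPSHandler.__init__(self)

    def https_open(self, req):
        return self.do_open(self.specialized_conn_class, req)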
def _additional_handlers(self):
    handlers = []

    if self.session.get('proxy'):
        protocol, host, port = self._get_proxy()

        if protocol and host and port:
            handlers.append(
                sockshandler.SocksiPyHandler(
                    protocol,
                    host,
                    port
                )
            )
        else:
            raise ChannelException(messages.channels.error_proxy_format)

    # Skip certificate checks
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    handlers.append(urllib2.HTTPSHandler(context=ctx))

    return handlers
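These handlers are built to be unpacked into urllib2.build_opener. A minimal sketch of how another method of the same class might consume them (the URL is a placeholder):

    # Sketch only: combine the extra handlers into one opener and fetch a URL.
    opener = urllib2.build_opener(*self._additional_handlers())
    response = opener.open('https://example.com/')
    body = response.read()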
def request(target, httpsproxy=None, useragent=None):
    global contenttype
    if not useragent:
        useragent = "Mozilla/5.0 (X11; Linux x86_64; rv:22.0) Gecko/20100101 Firefox/22.0 Iceweasel/22.0"
    else:
        print "[" + bc.G + "+" + bc.ENDC + "] User-Agent: " + useragent
    if httpsproxy:
        print "[" + bc.G + "+" + bc.ENDC + "] Proxy: " + httpsproxy + "\n"
        opener = urllib2.build_opener(
            urllib2.HTTPHandler(),
            urllib2.HTTPSHandler(),
            urllib2.ProxyHandler({'http': 'http://' + httpsproxy}))
        urllib2.install_opener(opener)
    postdata = [('remoteAddress', target), ('key', '')]
    postdata = urllib.urlencode(postdata)
    request = urllib2.Request(url, postdata)
    request.add_header("Content-type", contenttype)
    request.add_header("User-Agent", useragent)
    try:
        result = urllib2.urlopen(request).read()
    except urllib2.HTTPError, e:
        # e.code is an int; convert before concatenating.
        print "Error: " + str(e.code)
        return None
    except urllib2.URLError, e:
        # e.args is a tuple, not a string.
        print "Error: " + str(e.args)
        return None
    obj = json.loads(result)
    return obj
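request() reads the module-level globals url and contenttype, so both must be set before calling it. A hedged usage sketch with placeholder values:

# Assumed module globals consumed by request() (values are placeholders):
url = "https://api.example.com/lookup"
contenttype = "application/x-www-form-urlencoded"

obj = request("203.0.113.7", httpsproxy="127.0.0.1:8080")
if obj is not None:
    print obj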
def _http(self, _method, _path, **kw):
    data = None
    params = None
    if _method == 'GET' and kw:
        _path = '%s?%s' % (_path, _encode_params(kw))
    if _method in ['POST', 'PATCH', 'PUT']:
        data = bytes(_encode_json(kw), 'utf-8')
    url = '%s%s' % (_URL, _path)
    opener = build_opener(HTTPSHandler)
    request = Request(url, data=data)
    request.get_method = _METHOD_MAP[_method]
    if self._authorization:
        request.add_header('Authorization', self._authorization)
    if _method in ['POST', 'PATCH', 'PUT']:
        request.add_header('Content-Type', 'application/x-www-form-urlencoded')
    try:
        response = opener.open(request, timeout=TIMEOUT)
        is_json = self._process_resp(response.headers)
        if is_json:
            return _parse_json(response.read().decode('utf-8'))
    except HTTPError as e:
        is_json = self._process_resp(e.headers)
        if is_json:
            json = _parse_json(e.read().decode('utf-8'))
        else:
            json = e.read().decode('utf-8')
        req = JsonObject(method=_method, url=url)
        resp = JsonObject(code=e.code, json=json)
        if resp.code == 404:
            raise ApiNotFoundError(url, req, resp)
        raise ApiError(url, req, resp)
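The helpers _encode_params and _encode_json are elided from this listing. Plausible minimal versions, written as assumptions rather than the project's actual code (this snippet is Python 3):

import json
from urllib.parse import quote

def _encode_params(kw):
    # Hypothetical: turn keyword args into a URL-encoded query string.
    return '&'.join('%s=%s' % (k, quote(str(v))) for k, v in kw.items())

def _encode_json(kw):
    # Hypothetical: serialize keyword args as the JSON request body.
    return json.dumps(kw)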
def api_request_native(url, data=None, token=None, https_proxy=None, method=None):
    request = urllib.Request(url)
    # print('API request url:', request.get_full_url())
    if method:
        request.get_method = lambda: method
    token = token if token is not None else token_auth_string()
    request.add_header('Authorization', 'token ' + token)
    request.add_header('Accept', 'application/json')
    request.add_header('Content-Type', 'application/json')
    if data is not None:
        request.add_data(bytes(data.encode('utf8')))
    # print('API request data:', request.get_data())
    # print('API request header:', request.header_items())
    # https_proxy = https_proxy if https_proxy is not None else settings.get('https_proxy')
    # if https_proxy:
    #     opener = urllib.build_opener(urllib.HTTPHandler(), urllib.HTTPSHandler(),
    #                                  urllib.ProxyHandler({'https': https_proxy}))
    #     urllib.install_opener(opener)
    try:
        with contextlib.closing(urllib.urlopen(request)) as response:
            if response.code == 204:  # No Content
                return None
            else:
                return json.loads(response.read().decode('utf8', 'ignore'))
    except urllib.HTTPError as err:
        with contextlib.closing(err):
            raise SimpleHTTPError(err.code, err.read())
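A hedged call sketch. The GitHub gist endpoint and payload shape below are assumptions for illustration, and token_auth_string() is presumed to return stored credentials when no token is passed:

# Sketch: create a gist via the GitHub API (endpoint and payload assumed).
payload = json.dumps({'description': 'demo', 'public': False,
                      'files': {'a.txt': {'content': 'hello'}}})
result = api_request_native('https://api.github.com/gists',
                            data=payload, method='POST')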
def getUrl(self, url, ischunkDownloading=False):
    try:
        post = None
        print 'url', url
        #openner = urllib2.build_opener(urllib2.HTTPHandler, urllib2.HTTPSHandler)
        cookie_handler = urllib2.HTTPCookieProcessor(self.cookieJar)
        openner = urllib2.build_opener(cookie_handler, urllib2.HTTPBasicAuthHandler(), urllib2.HTTPHandler())
        if post:
            req = urllib2.Request(url, post)
        else:
            req = urllib2.Request(url)
        ua_header = False
        if self.clientHeader:
            for n, v in self.clientHeader:
                req.add_header(n, v)
                if n == 'User-Agent':
                    ua_header = True
        if not ua_header:
            req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 6.1; Win64; x64; Trident/7.0; rv:11.0) like Gecko')
        #response = urllib2.urlopen(req)
        if self.proxy and ((not ischunkDownloading) or self.use_proxy_for_chunks):
            req.set_proxy(self.proxy, 'http')
        response = openner.open(req)
        data = response.read()
        return data
    except:
        print 'Error in getUrl'
        traceback.print_exc()
        return None
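Hedged usage, assuming the owning class (the name Downloader is invented here) provides the cookieJar, clientHeader, proxy, and use_proxy_for_chunks attributes the method reads:

# Sketch only: fetch a page through the cookie-aware opener above.
d = Downloader()                                   # hypothetical owner class
d.clientHeader = [('Referer', 'http://example.com/')]
html = d.getUrl('http://example.com/page')
if html is None:
    print 'download failed'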
def __init__(self):
    """Build an HTTPS opener."""
    # Based on pip 1.4.1's URLOpener.
    # This verifies certs only on Python >= 2.7.9.
    self._opener = build_opener(HTTPSHandler())
    # Strip out HTTPHandler to prevent MITM spoof; iterate over a copy so
    # removing entries does not skip any:
    for handler in self._opener.handlers[:]:
        if isinstance(handler, HTTPHandler):
            self._opener.handlers.remove(handler)
def hashed_download(url, temp, digest):
    """Download ``url`` to ``temp``, make sure it has the SHA-256 ``digest``,
    and return its path."""
    # Based on pip 1.4.1's URLOpener but with cert verification removed. Python
    # >=2.7.9 verifies HTTPS certs itself, and, in any case, cert authenticity
    # has only privacy (not arbitrary code execution) implications, since
    # we're checking hashes.
    def opener():
        opener = build_opener(HTTPSHandler())
        # Strip out HTTPHandler to prevent MITM spoof; iterate over a copy so
        # removing entries does not skip any:
        for handler in opener.handlers[:]:
            if isinstance(handler, HTTPHandler):
                opener.handlers.remove(handler)
        return opener

    def read_chunks(response, chunk_size):
        while True:
            chunk = response.read(chunk_size)
            if not chunk:
                break
            yield chunk

    response = opener().open(url)
    path = join(temp, urlparse(url).path.split('/')[-1])
    actual_hash = sha256()
    with open(path, 'wb') as file:
        for chunk in read_chunks(response, 4096):
            file.write(chunk)
            actual_hash.update(chunk)

    actual_digest = actual_hash.hexdigest()
    if actual_digest != digest:
        raise HashError(url, path, actual_digest, digest)
    return path
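A usage sketch; the URL is a placeholder, the digest is the SHA-256 of the empty string used as a stand-in, and HashError is the module's own exception type:

import tempfile

# Sketch: download a file and verify it against a known SHA-256 digest.
temp = tempfile.mkdtemp()
expected = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'  # placeholder
try:
    path = hashed_download('https://example.com/pkg.tar.gz', temp, expected)
except HashError:
    print('digest mismatch; refusing to use the download')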
def debug():
    """ Activate debugging on urllib2 """
    handler = HTTPSHandler(debuglevel=1)
    opener = build_opener(handler)
    install_opener(opener)
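Once debug() has run, every request made through the installed opener echoes its HTTPS traffic to stdout. A quick sketch, assuming urlopen is imported from the same urllib2/urllib.request namespace as the names above:

# Sketch: enable wire-level logging, then make a request.
debug()
urlopen('https://example.com/').read()  # request/response headers printed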
# Store properties for all requests
def __init__(self, auth_file):
    urllib2.HTTPSHandler.__init__(self)
    self.auth_file = auth_file
def _GetHandlers(self):
    return [urllib2.HTTPSHandler()]
def __init__(self, headers={}, debug=True, p=''):
    # timeout
    self.timeout = 10
    # cookie handler
    self.cookie_processor = urllib2.HTTPCookieProcessor(cookielib.LWPCookieJar())
    # debug handler
    self.debug = debug
    if self.debug:
        self.httpHandler = urllib2.HTTPHandler(debuglevel=1)
        self.httpsHandler = urllib2.HTTPSHandler(debuglevel=1)
    else:
        self.httpHandler = urllib2.HTTPHandler(debuglevel=0)
        self.httpsHandler = urllib2.HTTPSHandler(debuglevel=0)
    # proxy handler (http); treat '', None and the strings 'None'/'NULL' as "no proxy"
    if p and p not in ('None', 'NULL'):
        self.proxy_handler = urllib2.ProxyHandler({'http': p})
    else:
        self.proxy_handler = urllib2.ProxyHandler({})
    # opener
    self.opener = urllib2.build_opener(self.cookie_processor, self.proxy_handler,
                                       self.httpHandler, self.httpsHandler)
    self.opener.addheaders = [('User-Agent', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36')]
    # headers: drop any default header the caller overrides, then append theirs
    for key in headers.keys():
        cur = self._replace(key)
        if cur != -1:
            self.opener.addheaders.pop(cur)
        self.opener.addheaders += [(key, headers[key])]
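A hedged construction sketch; the class name Fetcher is invented here, since the listing does not show it:

# Sketch only: debug logging on, one custom header, an HTTP proxy.
f = Fetcher(headers={'Referer': 'http://example.com/'},
            debug=True, p='127.0.0.1:8080')
html = f.opener.open('http://example.com/').read()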
def __init__(self, *args, **kwargs):
    try:
        kwargs['context'] = ssl._create_unverified_context()
    except AttributeError:
        # Python prior to 2.7.9 doesn't have default-enabled certificate
        # verification
        pass
    urllib2.HTTPSHandler.__init__(self, *args, **kwargs)
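The handler only takes effect once installed into an opener. A hedged sketch, with the subclass name NoVerifyHTTPSHandler assumed:

# Sketch: route all urllib2 HTTPS traffic through the unverified handler.
opener = urllib2.build_opener(NoVerifyHTTPSHandler())  # name assumed
urllib2.install_opener(opener)
urllib2.urlopen('https://self-signed.example.com/')    # no cert validation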
def getUrl(self, url, ischunkDownloading=False):
    try:
        post = None
        print 'url', url
        openner = urllib2.build_opener(urllib2.HTTPHandler, urllib2.HTTPSHandler)
        #cookie_handler = urllib2.HTTPCookieProcessor(self.cookieJar)
        #openner = urllib2.build_opener(cookie_handler, urllib2.HTTPBasicAuthHandler(), urllib2.HTTPHandler())
        if post:
            req = urllib2.Request(url, post)
        else:
            req = urllib2.Request(url)
        ua_header = False
        if self.clientHeader:
            for n, v in self.clientHeader:
                req.add_header(n, v)
                if n == 'User-Agent':
                    ua_header = True
        if not ua_header:
            req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:30.0) Gecko/20100101 Firefox/30.0')
        #response = urllib2.urlopen(req)
        if self.proxy and ((not ischunkDownloading) or self.use_proxy_for_chunks):
            req.set_proxy(self.proxy, 'http')
        response = openner.open(req)
        data = response.read()
        return data
    except:
        print 'Error in getUrl'
        traceback.print_exc()
        return None
def openUrl(self, url, ischunkDownloading=False):
    try:
        post = None
        openner = urllib2.build_opener(urllib2.HTTPHandler, urllib2.HTTPSHandler)
        if post:
            req = urllib2.Request(url, post)
        else:
            req = urllib2.Request(url)
        ua_header = False
        if self.clientHeader:
            for n, v in self.clientHeader:
                req.add_header(n, v)
                if n == 'User-Agent':
                    ua_header = True
        if not ua_header:
            req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1750.154 Safari/537.36')
        #response = urllib2.urlopen(req)
        if self.proxy and ((not ischunkDownloading) or self.use_proxy_for_chunks):
            req.set_proxy(self.proxy, 'http')
        response = openner.open(req)
        return response
    except:
        print 'Error in openUrl'
        traceback.print_exc()
        return None
def getUrl(self, url, ischunkDownloading=False):
    try:
        post = None
        openner = urllib2.build_opener(urllib2.HTTPHandler, urllib2.HTTPSHandler)
        if post:
            req = urllib2.Request(url, post)
        else:
            req = urllib2.Request(url)
        ua_header = False
        if self.clientHeader:
            for n, v in self.clientHeader:
                req.add_header(n, v)
                if n == 'User-Agent':
                    ua_header = True
        if not ua_header:
            req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1750.154 Safari/537.36')
        #response = urllib2.urlopen(req)
        if self.proxy and ((not ischunkDownloading) or self.use_proxy_for_chunks):
            req.set_proxy(self.proxy, 'http')
        response = openner.open(req)
        data = response.read()
        return data
    except:
        print 'Error in getUrl'
        traceback.print_exc()
        return None
def __init__(self):
    self.cookies = cookielib.LWPCookieJar()
    self.handlers = (urllib2.HTTPHandler(), urllib2.HTTPSHandler(),
                     urllib2.HTTPCookieProcessor(self.cookies))
    self.opener = urllib2.build_opener(*self.handlers)
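Because all three handlers share one LWPCookieJar, cookies set by one request are replayed on the next. A quick sketch (the owner class name Session is invented):

# Sketch: the shared cookie jar carries session state across calls.
s = Session()                               # hypothetical owner class
s.opener.open('http://example.com/login')   # server sets a cookie here
s.opener.open('http://example.com/account') # cookie is sent back here
print s.cookies                             # inspect the jar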
From SignAndSearch.py, project relational-social-media-search-engine (author: indervirbanipal):
def __init__(self, login, password):
    """ Start up... """
    self.login = login
    self.password = password
    # Simulate browser with cookies enabled
    self.cj = cookielib.MozillaCookieJar(cookie_filename)
    # Proxy settings
    # proxy_handler = urllib2.ProxyHandler({'http': '209.222.25.83:3128'})
    # 216.58.194.113
    # proxy_handler = urllib2.ProxyHandler({'http': '8.8.8.8'})
    proxy_handler = urllib2.ProxyHandler({'http': 'notional-sign-110911.appspot.com'})
    # proxy_auth_handler = urllib2.ProxyBasicAuthHandler()
    if os.access(cookie_filename, os.F_OK):
        self.cj.load()
    self.opener = urllib2.build_opener(
        urllib2.HTTPRedirectHandler(),
        urllib2.HTTPHandler(debuglevel=0),
        urllib2.HTTPSHandler(debuglevel=0),
        proxy_handler,
        urllib2.HTTPCookieProcessor(self.cj)
    )
    self.opener.addheaders = [
        ('User-agent', ('Mozilla/4.0 (compatible; MSIE 6.0; '
                        'Windows NT 5.2; .NET CLR 1.1.4322)'))
    ]
def performFullSearch(self, searchParams, dbHost, dbPort, dbName):
    """ Perform the search and save the gathered information into the DB.
    This method drives almost everything this class was created for. """
    print "inside Perform Search ... "
    try:
        #self.login = login
        #self.password = password
        # Simulate browser with cookies enabled
        self.cj = cookielib.MozillaCookieJar(cookie_filename)
        if os.access(cookie_filename, os.F_OK):
            self.cj.load()
        self.opener = urllib2.build_opener(
            urllib2.HTTPRedirectHandler(),
            urllib2.HTTPHandler(debuglevel=0),
            urllib2.HTTPSHandler(debuglevel=0),
            urllib2.HTTPCookieProcessor(self.cj)
        )
        self.opener.addheaders = [
            ('User-agent', ('Mozilla/4.0 (compatible; MSIE 6.0; '
                            'Windows NT 5.2; .NET CLR 1.1.4322)'))
        ]
        self.checkLogin(url1)
        fName = searchParams['firstName']
        mailId = searchParams['email']
        if fName == 'EMPTY' or mailId == 'EMPTY':
            raise Exception('Info: Search has to be performed from the Search page only, please try again', 'Info')
        fSrchURL = self.formSearchURL(searchParams)
        linkedJSON = self.loadSearch(fSrchURL, fName)
        recordJSON = self.formTrimmedJSON(linkedJSON)
        dbRecord = self.formDBRecord(recordJSON, mailId)
        client = self.connect2DB(dbHost, dbPort)
        print "Client details : " + client.__str__()
        self.store2DB(dbRecord, mailId, client)
        return 'Success'
    except Exception as e:
        # e.args may not hold exactly two values for every exception type,
        # so unpack defensively instead of `x, y = e.args`.
        return e.args[0] if e.args else str(e)