def __init__(self, api_key, url=GCM_URL, proxy=None):
    """api_key: Google API key.
    url: URL of the GCM service.
    proxy: either a string "http://host:port" or a dict {'https': 'host:port'}.
    """
    self.api_key = api_key
    self.url = url
    if proxy:
        if isinstance(proxy, basestring):
            # A bare proxy string applies to the scheme of the service URL.
            protocol = url.split(':')[0]
            proxy = {protocol: proxy}
        auth = urllib2.HTTPBasicAuthHandler()
        opener = urllib2.build_opener(urllib2.ProxyHandler(proxy), auth,
                                      urllib2.HTTPHandler)
        urllib2.install_opener(opener)
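A minimal usage sketch for the constructor above, assuming the enclosing class is named GCM (the class name is not shown in the snippet) and GCM_URL is its default endpoint; both proxy forms accepted by the docstring are shown:

# Hypothetical usage; 'GCM' and the proxy addresses are placeholders.
sender = GCM('my-api-key', proxy='http://proxy.example.com:3128')      # string form
sender = GCM('my-api-key', proxy={'https': 'proxy.example.com:3128'})  # dict form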
def __init__(self, *args, **kwargs):
    self.args = args
    self.kw = kwargs
    urllib2.HTTPHandler.__init__(self)
def login(self, username, pwd, cookie_file):
    """Log in with username, password and cookies.
    (1) If the cookie file exists, try to load cookies;
    (2) if no valid cookies are found, do a fresh login.
    """
    # If the cookie file exists, try to load cookies.
    if os.path.exists(cookie_file):
        try:
            cookie_jar = cookielib.LWPCookieJar(cookie_file)
            cookie_jar.load(ignore_discard=True, ignore_expires=True)
            loaded = 1
        except cookielib.LoadError:
            loaded = 0
            LOG.info('Loading cookies error')
        # Install the loaded cookies for urllib2.
        if loaded:
            cookie_support = urllib2.HTTPCookieProcessor(cookie_jar)
            opener = urllib2.build_opener(cookie_support,
                                          urllib2.HTTPHandler)
            urllib2.install_opener(opener)
            LOG.info('Loading cookies success')
            return 1
        else:
            return self.do_login(username, pwd, cookie_file)
    else:  # No cookie file found.
        return self.do_login(username, pwd, cookie_file)
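A short usage sketch for the method above; 'client' stands in for an instance of the (unnamed) class that defines login() and do_login(), and the credentials and path are placeholders:

# Hypothetical usage.
if client.login('user@example.com', 'secret', '/tmp/weibo.cookies'):
    print('login OK: cookies loaded from file or fresh login succeeded')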
def save_cookie(self, text, cookie_file=CONF.cookie_file):
    cookie_jar2 = cookielib.LWPCookieJar()
    cookie_support2 = urllib2.HTTPCookieProcessor(cookie_jar2)
    opener2 = urllib2.build_opener(cookie_support2, urllib2.HTTPHandler)
    urllib2.install_opener(opener2)
    if six.PY3:
        text = text.decode('gbk')
    # The login response contains location.replace('<redirect URL>');
    # extract that URL (note the escaped quotes and parentheses in the regex).
    p = re.compile(r'location\.replace\(\'(.*?)\'\)')
    try:
        # Search for the login redirection URL and follow it.
        login_url = p.search(text).group(1)
        data = urllib2.urlopen(login_url).read()
        # Verify the login feedback: check whether result is true.
        patt_feedback = r'feedBackUrlCallBack\((.*)\)'
        p = re.compile(patt_feedback, re.MULTILINE)
        feedback = p.search(data).group(1)
        feedback_json = json.loads(feedback)
        if feedback_json['result']:
            cookie_jar2.save(cookie_file,
                             ignore_discard=True,
                             ignore_expires=True)
            return 1
        else:
            return 0
    except:
        return 0
def login(self, username, pwd, cookie_file):
    """Log in with username, password and cookies.
    (1) If the cookie file exists, try to load cookies;
    (2) if no valid cookies are found, do a fresh login.
    """
    # If the cookie file exists, try to load cookies.
    if os.path.exists(cookie_file):
        try:
            cookie_jar = cookielib.LWPCookieJar(cookie_file)
            cookie_jar.load(ignore_discard=True, ignore_expires=True)
            loaded = 1
        except cookielib.LoadError:
            loaded = 0
            print 'Loading cookies error'
        # Install the loaded cookies for urllib2.
        if loaded:
            cookie_support = urllib2.HTTPCookieProcessor(cookie_jar)
            opener = urllib2.build_opener(cookie_support, urllib2.HTTPHandler)
            urllib2.install_opener(opener)
            print 'Loading cookies success'
            return 1
        else:
            return self.do_login(username, pwd, cookie_file)
    else:  # No cookie file found.
        return self.do_login(username, pwd, cookie_file)
def login(self, username, pwd, cookie_file):
    """Log in with username, password and cookies.
    (1) If the cookie file exists, try to load cookies;
    (2) if no valid cookies are found, do a fresh login.
    """
    # If the cookie file exists, try to load cookies.
    if os.path.exists(cookie_file):
        try:
            cookie_jar = cookielib.LWPCookieJar(cookie_file)
            cookie_jar.load(ignore_discard=True, ignore_expires=True)
            loaded = 1
        except cookielib.LoadError:
            loaded = 0
            print('Loading cookies error')
        # Install the loaded cookies for urllib2.
        if loaded:
            cookie_support = urllib2.HTTPCookieProcessor(cookie_jar)
            opener = urllib2.build_opener(cookie_support, urllib2.HTTPHandler)
            urllib2.install_opener(opener)
            print('Loading cookies success')
            return 1
        else:
            return self.do_login(username, pwd, cookie_file)
    else:  # No cookie file found.
        return self.do_login(username, pwd, cookie_file)
def _request(self, url, method='GET', data=None):
    url = self._auth.endpoint + url
    headers = self._auth.headers
    if data is not None:
        data = urlencode(data)
        if method in ['GET', 'DELETE']:
            url = url + '?' + data
            data = None
        else:
            headers.update({'Content-Type': POST_CONTENT_TYPE})
            if sys.version_info > (3,):  # Python 3
                data = data.encode('utf-8')
    log.debug(method + ' ' + url)
    log.debug(data)
    try:
        opener = build_opener(HTTPHandler)
        request = Request(url, data=data, headers=headers)
        request.get_method = lambda: method
        response = opener.open(request).read()
        data = self._parse_response(response)
    except HTTPError as e:
        log.error(e)
        data = self._parse_response(e.read())
        raise ApiHandlerError('Invalid server response', data)
    except ValueError as e:
        log.error(e)
        raise ApiHandlerError('Invalid server response')
    return data
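A hedged usage sketch for the wrapper above, called from code that owns an instance of the (unnamed) API class; 'client' and the '/devices' endpoint are placeholders:

# GET: data is urlencoded into the query string.
devices = client._request('/devices', method='GET', data={'limit': 10})
# POST: data is sent as a form-encoded body.
created = client._request('/devices', method='POST', data={'name': 'sensor-1'})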
def check(proxy):
    import urllib2
    url = 'http://www.baidu.com/js/bdsug.js?v=1.0.3.0'
    proxy_handler = urllib2.ProxyHandler({'http': 'http://' + proxy})
    opener = urllib2.build_opener(proxy_handler, urllib2.HTTPHandler)
    try:
        response = opener.open(url, timeout=3)
        # The proxy works if the probe URL loads without redirection.
        return response.code == 200 and response.url == url
    except Exception:
        return False
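A sketch of how check() might be used to filter a candidate proxy list; the addresses are placeholders:

candidates = ['10.0.0.1:8080', '10.0.0.2:3128', '10.0.0.3:80']
working = [p for p in candidates if check(p)]
print(working)  # Only proxies that fetched the probe URL within 3 seconds.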
def __init__(self):
    """Build an HTTPS opener."""
    # Based on pip 1.4.1's URLOpener.
    # Cert verification happens only on Python >= 2.7.9.
    self._opener = build_opener(HTTPSHandler())
    # Strip out HTTPHandler to prevent MITM spoofing; iterate over a copy
    # because the list is mutated while scanning it.
    for handler in list(self._opener.handlers):
        if isinstance(handler, HTTPHandler):
            self._opener.handlers.remove(handler)
def hashed_download(url, temp, digest):
    """Download ``url`` to ``temp``, make sure it has the SHA-256 ``digest``,
    and return its path."""
    # Based on pip 1.4.1's URLOpener but with cert verification removed. Python
    # >= 2.7.9 verifies HTTPS certs itself and, in any case, cert authenticity
    # has only privacy (not arbitrary code execution) implications here, since
    # we're checking hashes.
    def opener():
        opener = build_opener(HTTPSHandler())
        # Strip out HTTPHandler to prevent MITM spoofing; iterate over a copy
        # because the list is mutated while scanning it.
        for handler in list(opener.handlers):
            if isinstance(handler, HTTPHandler):
                opener.handlers.remove(handler)
        return opener

    def read_chunks(response, chunk_size):
        while True:
            chunk = response.read(chunk_size)
            if not chunk:
                break
            yield chunk

    response = opener().open(url)
    path = join(temp, urlparse(url).path.split('/')[-1])
    actual_hash = sha256()
    with open(path, 'wb') as file:
        for chunk in read_chunks(response, 4096):
            file.write(chunk)
            actual_hash.update(chunk)
    actual_digest = actual_hash.hexdigest()
    if actual_digest != digest:
        raise HashError(url, path, actual_digest, digest)
    return path
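A usage sketch for hashed_download(); the URL and digest are placeholders, not a real package or checksum:

# Hypothetical call; a mismatched digest raises HashError instead of returning a path.
path = hashed_download('https://example.com/pkg-1.0.tar.gz',
                       temp='/tmp',
                       digest='0' * 64)  # Expected SHA-256 hex digest.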
weibologin.py (project: SinaMicroblog_Creeper-Spider_VerificationCode, author: somethingx64)
def EnableCookie(self, enableProxy):
    """Enable cookie support and, if needed, a proxy."""
    cookiejar = cookielib.LWPCookieJar()  # Construct the cookie jar.
    cookie_support = urllib2.HTTPCookieProcessor(cookiejar)
    if enableProxy:
        proxy_support = urllib2.ProxyHandler({'http': 'http://xxxxx.pac'})  # Use a proxy.
        opener = urllib2.build_opener(proxy_support, cookie_support, urllib2.HTTPHandler)
        print("Proxy enabled")
    else:
        opener = urllib2.build_opener(cookie_support, urllib2.HTTPHandler)
    urllib2.install_opener(opener)  # Install the cookie-aware opener globally.
def send_response(event, context, response_status, reason=None, response_data={}):
    response_body = {
        "Status": response_status,
        "PhysicalResourceId": context.log_stream_name,
        "StackId": event["StackId"],
        "RequestId": event["RequestId"],
        "LogicalResourceId": event["LogicalResourceId"],
    }
    if reason:
        response_body["Reason"] = reason
    if response_data:
        response_body["Data"] = response_data
    response_body = json.dumps(response_body)
    opener = build_opener(HTTPHandler)
    request = Request(event["ResponseURL"], data=response_body)
    request.add_header("Content-Type", "")
    request.add_header("Content-Length", len(response_body))
    request.get_method = lambda: "PUT"
    try:
        response = opener.open(request)
        print("Status code: {}".format(response.getcode()))
        print("Status message: {}".format(response.msg))
        return True
    except HTTPError as exc:
        print("Failed executing HTTP request: {}".format(exc.code))
        return False
def send(event, context, response_status, reason=None, response_data=None, physical_resource_id=None):
    response_data = response_data or {}
    response_body = json.dumps(
        {
            'Status': response_status,
            'Reason': reason or "See the details in CloudWatch Log Stream: " + context.log_stream_name,
            'PhysicalResourceId': physical_resource_id or context.log_stream_name,
            'StackId': event['StackId'],
            'RequestId': event['RequestId'],
            'LogicalResourceId': event['LogicalResourceId'],
            'Data': response_data
        }
    )
    opener = build_opener(HTTPHandler)
    request = Request(event['ResponseURL'], data=response_body)
    request.add_header('Content-Type', '')
    request.add_header('Content-Length', len(response_body))
    request.get_method = lambda: 'PUT'
    try:
        response = opener.open(request)
        print("Status code: {}".format(response.getcode()))
        print("Status message: {}".format(response.msg))
        return True
    except HTTPError as exc:
        print("Failed executing HTTP request: {}".format(exc.code))
        return False
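A minimal Lambda handler sketch showing how a CloudFormation custom resource would typically call send(); the resource logic is a placeholder:

def lambda_handler(event, context):
    try:
        # ... create/update/delete the actual resource here ...
        send(event, context, 'SUCCESS', response_data={'Message': 'done'})
    except Exception as exc:
        send(event, context, 'FAILED', reason=str(exc))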
def http_request(self, req):
    """Handle an HTTP request. Make sure that Content-Length is specified
    if we're using an iterable value."""
    # If the request body is an iterable object (file-like or a generator),
    # its size is unknown, so an explicit Content-Length header is required.
    if req.has_data():
        data = req.get_data()
        if hasattr(data, 'read') or hasattr(data, 'next'):
            if not req.has_header('Content-length'):
                raise ValueError(
                    "No Content-Length specified for iterable body")
    return urllib2.HTTPHandler.do_request_(self, req)
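A sketch assuming the method above lives in a urllib2.HTTPHandler subclass, here hypothetically named StreamingHTTPHandler; a file-like body passes the check only when Content-Length is set explicitly:

# 'StreamingHTTPHandler', the file name, and the URL are placeholders.
opener = urllib2.build_opener(StreamingHTTPHandler)
body = open('payload.bin', 'rb')
req = urllib2.Request('http://example.com/upload', data=body)
req.add_header('Content-Length', str(os.path.getsize('payload.bin')))
opener.open(req)  # Would raise ValueError without the header above.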
def __build_opener(self):
    """Build the opener."""
    self.opener = urllib2.build_opener(self.cookie_processor, urllib2.HTTPHandler)
def request(target, httpsproxy=None, useragent=None):
    global contenttype
    if not useragent:
        useragent = "Mozilla/5.0 (X11; Linux x86_64; rv:22.0) Gecko/20100101 Firefox/22.0 Iceweasel/22.0"
    else:
        print "[" + bc.G + "+" + bc.ENDC + "] User-Agent: " + useragent
    if httpsproxy:
        print "[" + bc.G + "+" + bc.ENDC + "] Proxy: " + httpsproxy + "\n"
        opener = urllib2.build_opener(
            urllib2.HTTPHandler(),
            urllib2.HTTPSHandler(),
            urllib2.ProxyHandler({'http': 'http://' + httpsproxy}))
        urllib2.install_opener(opener)
    postdata = [('remoteAddress', target), ('key', '')]
    postdata = urllib.urlencode(postdata)
    request = urllib2.Request(url, postdata)
    request.add_header("Content-type", contenttype)
    request.add_header("User-Agent", useragent)
    try:
        result = urllib2.urlopen(request).read()
    except urllib2.HTTPError, e:
        print "Error: " + str(e.code)
        return None
    except urllib2.URLError, e:
        print "Error: " + str(e.args)
        return None
    obj = json.loads(result)
    return obj
def getUrl(self, url, ischunkDownloading=False):
    try:
        post = None
        print 'url', url
        # openner = urllib2.build_opener(urllib2.HTTPHandler, urllib2.HTTPSHandler)
        cookie_handler = urllib2.HTTPCookieProcessor(self.cookieJar)
        openner = urllib2.build_opener(cookie_handler, urllib2.HTTPBasicAuthHandler(), urllib2.HTTPHandler())
        if post:
            req = urllib2.Request(url, post)
        else:
            req = urllib2.Request(url)
        # Apply client headers, remembering whether a User-Agent was supplied.
        ua_header = False
        if self.clientHeader:
            for n, v in self.clientHeader:
                req.add_header(n, v)
                if n == 'User-Agent':
                    ua_header = True
        if not ua_header:
            req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 6.1; Win64; x64; Trident/7.0; rv:11.0) like Gecko')
        # response = urllib2.urlopen(req)
        if self.proxy and ((not ischunkDownloading) or self.use_proxy_for_chunks):
            req.set_proxy(self.proxy, 'http')
        response = openner.open(req)
        data = response.read()
        return data
    except:
        print 'Error in getUrl'
        traceback.print_exc()
        return None
def init(self, proxy=None):
    cj = cookielib.LWPCookieJar()
    cookie_support = urllib2.HTTPCookieProcessor(cj)
    if proxy:
        proxy_support = urllib2.ProxyHandler({'http': proxy})
        opener = urllib2.build_opener(proxy_support, cookie_support, urllib2.HTTPHandler)
    else:
        opener = urllib2.build_opener(cookie_support, urllib2.HTTPHandler)
    urllib2.install_opener(opener)
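A usage sketch for init(); it installs a global opener, so plain urllib2.urlopen() calls afterwards carry the cookie jar (the proxy address is a placeholder):

# Hypothetical usage; 'spider' is an instance of the class defining init().
spider.init(proxy='http://127.0.0.1:8087')
html = urllib2.urlopen('http://example.com').read()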