def download5(url, user_agent='wswp', proxy=None, num_retries=2):
"""Download function with support for proxies"""
print 'Downloading:', url
headers = {'User-agent': user_agent}
request = urllib2.Request(url, headers=headers)
opener = urllib2.build_opener()
if proxy:
proxy_params = {urlparse.urlparse(url).scheme: proxy}
opener.add_handler(urllib2.ProxyHandler(proxy_params))
try:
html = opener.open(request).read()
except urllib2.URLError as e:
print 'Download error:', e.reason
html = None
if num_retries > 0:
if hasattr(e, 'code') and 500 <= e.code < 600:
# retry 5XX HTTP errors
html = download5(url, user_agent, proxy, num_retries-1)
return html
# Example source code for Python's build_opener()
def register(first_name, last_name, email, password, captcha_fn):
    """Fill in and submit the registration form found at ``REGISTER_URL``.

    The page is fetched with a cookie-aware opener, the form is parsed with
    ``parse_form``, and the CAPTCHA image returned by ``extract_image`` is
    handed to ``captcha_fn`` to obtain the answer text.

    Returns True when the URL of the POST response no longer contains
    ``/user/register``, False otherwise.
    """
    cookie_jar = cookielib.CookieJar()
    opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookie_jar))
    page = opener.open(REGISTER_URL).read()

    form = parse_form(page)
    user_fields = (
        ('first_name', first_name),
        ('last_name', last_name),
        ('email', email),
        ('password', password),
        ('password_two', password),
    )
    for field_name, field_value in user_fields:
        form[field_name] = field_value

    # Solve the CAPTCHA shown on the page that was just fetched.
    form['recaptcha_response_field'] = captcha_fn(extract_image(page))

    post_request = urllib2.Request(REGISTER_URL, urllib.urlencode(form))
    response = opener.open(post_request)
    return '/user/register' not in response.geturl()
def download(self, url, headers, proxy, num_retries, data=None):
print 'Downloading:', url
request = urllib2.Request(url, data, headers or {})
opener = self.opener or urllib2.build_opener()
if proxy:
proxy_params = {urlparse.urlparse(url).scheme: proxy}
opener.add_handler(urllib2.ProxyHandler(proxy_params))
try:
response = opener.open(request)
html = response.read()
code = response.code
except Exception as e:
print 'Download error:', str(e)
html = ''
if hasattr(e, 'code'):
code = e.code
if num_retries > 0 and 500 <= code < 600:
# retry 5XX HTTP errors
return self._get(url, headers, proxy, num_retries-1, data)
else:
code = None
return {'html': html, 'code': code}
def __openrequest__(self, req):
# Opens the passed in HTTP request
if self.debug:
print "\n----- REQUEST -----"
handler = urllib2.HTTPSHandler(debuglevel=self.debugLevel)
opener = urllib2.build_opener(handler)
urllib2.install_opener(opener)
print "- API ENDPOINT: "+req.get_full_url()
print "- REQUEST METHOD: "+req.get_method()
print "- AUTHORIZATION HEADER: "+req.get_header("Authorization")
print "\n----- REQUEST DATA -----"
print req.get_data()
res = urllib2.urlopen(req)
out = res.read()
if self.debug:
print "\n----- REQUEST INFO -----"
print res.info()
print "\n----- RESPONSE -----"
print out
return out
def __openrequest__(self, req):
# Opens the passed in HTTP request
if self.debug:
print "\n----- REQUEST -----"
handler = urllib2.HTTPSHandler(debuglevel=self.debugLevel)
opener = urllib2.build_opener(handler)
urllib2.install_opener(opener)
print "- API ENDPOINT: "+req.get_full_url()
print "- REQUEST METHOD: "+req.get_method()
print "- AUTHORIZATION HEADER: "+req.get_header("Authorization")
print "\n----- REQUEST DATA -----"
print req.get_data()
res = urllib2.urlopen(req)
out = res.read()
if self.debug:
print "\n----- REQUEST INFO -----"
print res.info()
print "\n----- RESPONSE -----"
print out
return out
def __openrequest__(self, req):
# Opens the passed in HTTP request
if self.debug:
print "\n----- REQUEST -----"
handler = urllib2.HTTPSHandler(debuglevel=self.debugLevel)
opener = urllib2.build_opener(handler)
urllib2.install_opener(opener)
print "- API ENDPOINT: "+req.get_full_url()
print "- REQUEST METHOD: "+req.get_method()
print "- AUTHORIZATION HEADER: "+req.get_header("Authorization")
print "\n----- REQUEST DATA -----"
print req.get_data()
res = urllib2.urlopen(req)
out = res.read()
if self.debug:
print "\n----- REQUEST INFO -----"
print res.info()
print "\n----- RESPONSE -----"
print out
return out
def __init__(self):
    """Set up empty result state and a cookie-aware URL opener.

    If ``ScholarConf.COOKIE_JAR_FILE`` is set and the file exists, cookies
    are loaded from it; a failed load is logged as a warning and the jar is
    replaced with a fresh, empty one.
    """
    self.articles = []
    self.query = None
    self.cjar = MozillaCookieJar()

    cookie_path = ScholarConf.COOKIE_JAR_FILE
    if cookie_path and os.path.exists(cookie_path):
        try:
            self.cjar.load(cookie_path, ignore_discard=True)
            ScholarUtils.log('info', 'loaded cookies file')
        except Exception as msg:
            ScholarUtils.log('warn', 'could not load cookies file: %s' % msg)
            # Start over with a clean jar rather than a half-loaded one.
            self.cjar = MozillaCookieJar()

    self.opener = build_opener(HTTPCookieProcessor(self.cjar))
    # Most recent settings object, if one has been applied.
    self.settings = None
def __init__(self):
    """Set up empty result state and a cookie-aware URL opener.

    If ``ScholarConf.COOKIE_JAR_FILE`` is set and the file exists, cookies
    are loaded from it; a failed load is logged as a warning and the jar is
    replaced with a fresh, empty one.
    """
    self.articles = []
    self.query = None
    self.cjar = MozillaCookieJar()

    cookie_path = ScholarConf.COOKIE_JAR_FILE
    if cookie_path and os.path.exists(cookie_path):
        try:
            self.cjar.load(cookie_path, ignore_discard=True)
            ScholarUtils.log('info', 'loaded cookies file')
        except Exception as msg:
            ScholarUtils.log('warn', 'could not load cookies file: %s' % msg)
            # Start over with a clean jar rather than a half-loaded one.
            self.cjar = MozillaCookieJar()

    self.opener = build_opener(HTTPCookieProcessor(self.cjar))
    # Most recent settings object, if one has been applied.
    self.settings = None
def get_access_token(self, code, state=None):
    '''
    Exchange the ``code`` (and optional ``state``) received on the OAuth
    callback URL, e.g. http://host/callback?code=123&state=xyz, for an
    access token.

    Returns the access token as a string.  Raises ApiAuthError when the
    JSON reply contains an ``error`` key or when the request fails with
    an HTTPError.
    '''
    params = {
        'client_id': self._client_id,
        'client_secret': self._client_secret,
        'code': code,
    }
    if self._redirect_uri:
        params['redirect_uri'] = self._redirect_uri
    if state:
        params['state'] = state

    opener = build_opener(HTTPSHandler)
    request = Request('https://github.com/login/oauth/access_token',
                      data=_encode_params(params))
    request.get_method = _METHOD_MAP['POST']
    request.add_header('Accept', 'application/json')
    try:
        response = opener.open(request, timeout=TIMEOUT)
        r = _parse_json(response.read())
        if 'error' in r:
            raise ApiAuthError(str(r.error))
        return str(r.access_token)
    except HTTPError:
        raise ApiAuthError('HTTPError when get access token')
def check_single_proxy_status(self, proxy_address, domain_check):
    """Request ``domain_check`` through ``proxy_address`` and report the outcome.

    The proxy opener is installed as the module-wide ``urllib2`` opener, so
    later ``urllib2.urlopen`` calls go through the same proxy.

    :param proxy_address: proxy URL; its scheme, hostname and port are used.
    :param domain_check: URL fetched through the proxy.
    :returns: ``'ok'`` on success, the HTTP status code when the request
        fails with ``urllib2.HTTPError``, or ``1`` for any other error.
    """
    try:
        parse = urlparse(proxy_address)
        proxy_scheme = parse.scheme
        proxy = str(parse.hostname) + ':' + str(parse.port)
        proxy_handler = urllib2.ProxyHandler({proxy_scheme: proxy})
        opener = urllib2.build_opener(proxy_handler)
        opener.addheaders = [('User-agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36')]
        urllib2.install_opener(opener)
        req = urllib2.Request(domain_check)
        start_time = time.time()
        sock = urllib2.urlopen(req)
        end_time = time.time()
        # Only the time to open the response is measured; release the
        # connection right away instead of leaking it.
        sock.close()
        diff_time = round(end_time - start_time, 3)
        # The trailing placeholder emits W, which was previously passed to
        # format() but dropped because the template had no slot for it.
        log.console_log(Y + "{}[+] {} OK! Response Time : {}s{}".format(Y, proxy_address, str(diff_time), W))
        return 'ok'
    except urllib2.HTTPError as e:
        print('Error code: ' + str(e.code))
        return e.code
    except Exception as detail:
        print('ERROR ' + str(detail))
        return 1
def get(url):
# Build and open the URL
opener = urllib2.build_opener()
opener.addheaders = [('User-Agent', 'Mozilla/5.0')]
response = opener.open(url)
# HLTV redicrects to a .rar or .zip file
final_url = response.geturl()
# Gets the filename (everything after the last trailing /)
filename = final_url.rsplit('/', 1)[-1]
# Gets the Content-Length from the metadata from final_url
filesize = (int(urllib.urlopen(final_url).info().getheaders("Content-Length")[0])/1024)/1024
# Tell user we are downloading filesize
print "Starting %s: %s MB." % (filename, filesize)
# Downloads the file to the directory the user enters
urllib.urlretrieve(final_url, directory+"/"+filename)
# Tell user the current status and file information
print "Completed %s: %s MB." % (filename, filesize)
return filesize
def getRankList(params):
url = 'http://www.newrank.cn/xdnphb/list/month/rank'
datas = getsign('/xdnphb/list/month/rank',params)
time.sleep(np.random.rand() * 5)
try:
data = urllib.urlencode(datas)
request = urllib2.Request(url)
opener = urllib2.build_opener(urllib2.HTTPCookieProcessor())
result = opener.open(request,data).read()
#print params['rank_name']+'?????????'+str(result)
if result:
return json.loads(result)
else:
return {'value':[]}
except (urllib2.HTTPError, urllib2.URLError), e:
print e
return {'value':[]}
#??firefox
def _update_opener(self):
'''
Builds and installs a new opener to be used by all future calls to
:func:`urllib2.urlopen`.
'''
if self._http_debug:
http = urllib2.HTTPHandler(debuglevel=1)
else:
http = urllib2.HTTPHandler()
if self._proxy:
opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(self._cj),
urllib2.ProxyHandler({'http':
self._proxy}),
urllib2.HTTPBasicAuthHandler(),
http)
else:
opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(self._cj),
urllib2.HTTPBasicAuthHandler(),
http)
urllib2.install_opener(opener)
def _update_opener(self):
'''
Builds and installs a new opener to be used by all future calls to
:func:`urllib2.urlopen`.
'''
if self._http_debug:
http = urllib2.HTTPHandler(debuglevel=1)
else:
http = urllib2.HTTPHandler()
if self._proxy:
opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(self._cj),
urllib2.ProxyHandler({'http':
self._proxy}),
urllib2.HTTPBasicAuthHandler(),
http)
else:
opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(self._cj),
urllib2.HTTPBasicAuthHandler(),
http)
urllib2.install_opener(opener)
def _update_opener(self):
'''
Builds and installs a new opener to be used by all future calls to
:func:`urllib2.urlopen`.
'''
if self._http_debug:
http = urllib2.HTTPHandler(debuglevel=1)
else:
http = urllib2.HTTPHandler()
if self._proxy:
opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(self._cj),
urllib2.ProxyHandler({'http':
self._proxy}),
urllib2.HTTPBasicAuthHandler(),
http)
else:
opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(self._cj),
urllib2.HTTPBasicAuthHandler(),
http)
urllib2.install_opener(opener)
def download_file_chunk(url, buf):
    """Stream ``url`` into ``buf``, yielding progress after every read.

    This is a generator.  It first yields ``(0, True)``, then one
    ``(fraction_done, True)`` tuple per chunk written to ``buf``.  When a
    read fails with ``EAGAIN`` it yields ``(fraction_done, False)`` and
    tries again.  Iteration ends once the number of bytes announced by the
    ``content-length`` header has been received.

    :param url: address to download.
    :param buf: object with a ``write`` method that receives each chunk.
    :raises IOError: if the stream ends before ``content-length`` bytes
        have arrived (previously this case looped forever).
    """
    opener = urllib2.build_opener()
    opener.addheaders = [('User-Agent', "DropboxLinuxDownloader/1.6.2")]

    # Enter the context manager before parsing headers so the socket is
    # closed even when content-length is missing or malformed.
    with closing(opener.open(url)) as f:
        size = int(f.info()['content-length'])
        bufsize = max(size // 200, 4096)
        progress = 0

        yield (0, True)
        while True:
            try:
                chunk = f.read(bufsize)
                if not chunk and progress != size:
                    # An empty read means EOF; without this check the loop
                    # would spin forever on a truncated download.
                    raise IOError("connection closed after %d of %d bytes"
                                  % (progress, size))
                progress += len(chunk)
                buf.write(chunk)
                # A zero-length body is complete by definition; avoid 0/0.
                yield (float(progress) / size if size else 1.0, True)
                if progress == size:
                    break
            except OSError as e:
                if hasattr(e, 'errno') and e.errno == errno.EAGAIN:
                    # nothing left to read
                    yield (float(progress) / size if size else 1.0, False)
                else:
                    raise
def check_gn_proxy(proxy, protocal_type='HTTP'):
    """Check that requests through ``proxy`` appear to come from the proxy.

    Fetches icanhazip.com (over HTTPS when ``protocal_type`` is ``'HTTPS'``,
    plain HTTP otherwise) through the proxy with a 3 second timeout.

    Returns True only when the reply has status 200 and its body equals the
    host part of ``proxy`` (the text before the first ``:``); any exception
    during the request yields False.
    """
    if protocal_type == 'HTTPS':
        url = 'https://icanhazip.com'
    else:
        url = 'http://icanhazip.com'

    proxy_map = {
        'http': 'http://' + proxy,
        'https': 'https://' + proxy,
    }
    opener = urllib2.build_opener(urllib2.ProxyHandler(proxy_map),
                                  urllib2.HTTPHandler)
    try:
        response = opener.open(url, timeout=3)
        reported_ip = response.read().strip()
        if not response.code == 200:
            return False
        return reported_ip == proxy.split(':')[0]
    except Exception:
        return False
def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
    """Open ``url`` with the module-wide opener, creating it on first use.

    The shared opener lives in the module global ``_opener``; when that is
    still ``None`` a default one is built with ``build_opener()`` and stored
    for later calls.  ``data`` and ``timeout`` are passed on unchanged.
    """
    global _opener
    shared = _opener
    if shared is None:
        shared = _opener = build_opener()
    return shared.open(url, data, timeout)
def build_opener(*handlers):
    """Return an OpenerDirector populated from ``handlers`` plus defaults.

    Each argument may be a handler instance or a handler class (classes are
    instantiated with no arguments).  The default handlers cover proxies,
    unknown schemes, HTTP, HTTP error handling, redirects, FTP, files and,
    when ``httplib`` provides HTTPS, HTTPS as well.

    A default handler class is left out whenever one of the arguments is a
    subclass or an instance of it.
    """
    import types

    def isclass(obj):
        return isinstance(obj, (types.ClassType, type))

    def is_overridden(default):
        # True when a caller-supplied handler stands in for this default.
        for candidate in handlers:
            if isclass(candidate):
                if issubclass(candidate, default):
                    return True
            elif isinstance(candidate, default):
                return True
        return False

    default_classes = [ProxyHandler, UnknownHandler, HTTPHandler,
                       HTTPDefaultErrorHandler, HTTPRedirectHandler,
                       FTPHandler, FileHandler, HTTPErrorProcessor]
    if hasattr(httplib, 'HTTPS'):
        default_classes.append(HTTPSHandler)

    opener = OpenerDirector()
    # Defaults go in first, in their declared order.
    for klass in [k for k in default_classes if not is_overridden(k)]:
        opener.add_handler(klass())

    # Caller-supplied handlers follow; bare classes are instantiated here.
    for supplied in handlers:
        opener.add_handler(supplied() if isclass(supplied) else supplied)
    return opener
def _create_opener(self):
import urllib2
return urllib2.build_opener()