def download(self, url, retry_count=3, headers=None, proxy=None, data=None):
    if url is None:
        return None
    try:
        req = request.Request(url, headers=headers or {}, data=data)
        cookie = cookiejar.CookieJar()
        cookie_process = request.HTTPCookieProcessor(cookie)
        opener = request.build_opener(cookie_process)
        if proxy:
            proxies = {urlparse(url).scheme: proxy}
            opener.add_handler(request.ProxyHandler(proxies))
        content = opener.open(req).read()
    except error.URLError as e:
        print('HtmlDownLoader download error:', e.reason)
        content = None
        if retry_count > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                # An HTTPError with a 5XX status code means the failure is on
                # the server side, so retry the download.
                return self.download(url, retry_count - 1, headers, proxy, data)
    return content
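For context, a minimal standalone sketch of the per-scheme proxy selection the method performs (the URL and proxy address below are placeholders):

from urllib import request
from urllib.parse import urlparse

url = 'http://example.com/'
proxy = 'http://127.0.0.1:8080'  # placeholder proxy address
# Map only the URL's own scheme to the proxy, exactly as download() does.
opener = request.build_opener(request.ProxyHandler({urlparse(url).scheme: proxy}))
# content = opener.open(url).read()  # network call left commented out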
def crawl_feed(self, feed_url: str = None) -> List[str]:
    urls = []
    if not feed_url:
        feed_url = constants.rss_url
    feed = feedparser.parse(
        feed_url,
        # feedparser expects a list of handler instances; a bare
        # ProxyHandler() falls back to the environment's proxy settings.
        handlers=[ProxyHandler()],
        request_headers=self.settings.requests_headers
    )
    for item in feed['items']:
        if any(item['title'].startswith(category) for category in self.own_settings.accepted_rss_categories):
            urls.append(item['link'])
    return urls
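A standalone sketch of the same idea, since feedparser threads urllib handlers into the request it makes (the feed URL and proxy address are placeholders):

import feedparser
from urllib.request import ProxyHandler

feed = feedparser.parse(
    'http://example.com/rss',  # placeholder feed URL
    handlers=[ProxyHandler({'http': 'http://127.0.0.1:8080'})],  # placeholder proxy
)
titles = [item['title'] for item in feed['items']]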
def proxyurllib():
    print(COLOR_GREEN + '-' * 30 + COLOR_NONE)
    #TODO proxy
    handler = request.ProxyHandler({'http': 'http://10.112.5.173:49908'})
    '''
    proxy_auth_handler = urllib.request.ProxyBasicAuthHandler()
    proxy_auth_handler.add_password('realm', 'host', 'username', 'password')
    '''
    opener = request.build_opener(handler)
    request.install_opener(opener)
    # Once an opener is installed, urlopen() uses it for every subsequent
    # request; without install_opener(), call opener.open(url) directly.
    google = request.urlopen('http://www.google.com')
    print(google.read())
    print("System proxies:", request.getproxies())
#proxyurllib()
#FIXME: honor robots.txt
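The commented-out block above hints at proxy authentication; a hedged sketch of wiring both handlers together (every credential below is a placeholder):

from urllib import request

proxy_handler = request.ProxyHandler({'http': 'http://10.112.5.173:49908'})
auth_handler = request.ProxyBasicAuthHandler()
auth_handler.add_password('realm', 'host', 'username', 'password')  # placeholders
opener = request.build_opener(proxy_handler, auth_handler)
request.install_opener(opener)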
def __init__(self, server, port, username, password):
    """ Connection Class init call """
    self.server = server
    self.port = port
    self.username = username
    self.password = password
    self.url = 'https://{0}:{1}'.format(self.server, self.port)
    self.api = '/api/1.1/xml'
    self.authtoken = ''
    self.response = None
    self.sync_id = ''
    # Force urllib2 to not use a proxy
    proxy_handler = urllib2.ProxyHandler({})
    opener = urllib2.build_opener(proxy_handler)
    urllib2.install_opener(opener)
    self.login()  # gets called in __init__
def default_urllib2_opener(config):
    if config is not None:
        proxy_server = config.get("http", "proxy")
    else:
        proxy_server = None
    handlers = []
    if proxy_server is not None:
        handlers.append(urllib2.ProxyHandler({"http": proxy_server}))
    opener = urllib2.build_opener(*handlers)
    if config is not None:
        user_agent = config.get("http", "useragent")
    else:
        user_agent = None
    if user_agent is None:
        user_agent = default_user_agent_string()
    opener.addheaders = [('User-agent', user_agent)]
    return opener
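A hedged usage sketch; SimpleConfig is a stand-in for the real config object, which only needs a get(section, name) accessor returning a string or None:

# Minimal stub standing in for the real config object (hypothetical).
class SimpleConfig:
    def __init__(self, values):
        self._values = values
    def get(self, section, name):
        return self._values.get((section, name))

config = SimpleConfig({("http", "proxy"): "http://127.0.0.1:8080",      # placeholder
                       ("http", "useragent"): "example-agent/1.0"})     # placeholder
opener = default_urllib2_opener(config)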
def __get_handlers(self, tls_proto=None):
    """
    Internal method to handle redirection and use TLS protocol.
    """
    # tls_handler implements a fallback mechanism for servers that
    # do not support TLS 1.1/1.2
    tls_handler = TLS1Handler if tls_proto == "tlsv1" else TLSHandler
    handlers = [SmartRedirectHandler, tls_handler]
    if self.__proxy:
        proxy_handler = urllib2.ProxyHandler(
            {'http': self.__proxy, 'https': self.__proxy})
        handlers.append(proxy_handler)
    return handlers
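Note that urllib2.build_opener accepts handler classes as well as instances and instantiates the classes itself, which is why the list above can mix bare classes with a configured ProxyHandler. A hedged sketch of the same behavior:

import urllib2  # Python 2 module; use urllib.request on Python 3

# build_opener instantiates bare handler classes on its own, so classes
# and configured instances can be mixed freely in one call.
opener = urllib2.build_opener(
    urllib2.HTTPRedirectHandler,                               # a class
    urllib2.ProxyHandler({'http': 'http://127.0.0.1:8080'}),   # an instance (placeholder proxy)
)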
def getFile(cls, getfile, unpack=True):
    if cls.getProxy():
        proxy = req.ProxyHandler({'http': cls.getProxy(), 'https': cls.getProxy()})
        auth = req.HTTPBasicAuthHandler()
        opener = req.build_opener(proxy, auth, req.HTTPHandler)
        req.install_opener(opener)
    try:
        response = req.urlopen(getfile)
    except Exception:
        msg = "[!] Could not fetch file %s" % getfile
        if cls.exitWhenNoSource():
            sys.exit(msg)
        print(msg)
        return (None, None)  # nothing fetched
    data = response.read()
    # TODO: if data == text/plain; charset=utf-8, read and decode
    if unpack:
        content_type = response.info().get('Content-Type') or ''
        if 'gzip' in content_type:
            data = gzip.GzipFile(fileobj=BytesIO(data))
        elif 'bzip2' in content_type:
            data = BytesIO(bz2.decompress(data))
        elif 'zip' in content_type:
            fzip = zipfile.ZipFile(BytesIO(data), 'r')
            if len(fzip.namelist()) > 0:
                data = BytesIO(fzip.read(fzip.namelist()[0]))
        # In case the webserver is being generic
        elif 'application/octet-stream' in content_type:
            if data[:4] == b'PK\x03\x04':  # Zip
                fzip = zipfile.ZipFile(BytesIO(data), 'r')
                if len(fzip.namelist()) > 0:
                    data = BytesIO(fzip.read(fzip.namelist()[0]))
    return (data, response)
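The content-type sniffing lends itself to a small standalone helper; a sketch under that assumption (unpack_payload is a hypothetical name, not part of the original class):

import bz2
import gzip
import zipfile
from io import BytesIO

def unpack_payload(data, content_type):
    """Hypothetical helper mirroring the unpack branch above."""
    if 'gzip' in content_type:
        return gzip.GzipFile(fileobj=BytesIO(data))
    if 'bzip2' in content_type:
        return BytesIO(bz2.decompress(data))
    # Treat explicit zip types and generic octet-streams with a PK header alike.
    if 'zip' in content_type or data[:4] == b'PK\x03\x04':
        fzip = zipfile.ZipFile(BytesIO(data), 'r')
        if fzip.namelist():
            return BytesIO(fzip.read(fzip.namelist()[0]))
    return data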
def api_request_native(url, data=None, token=None, https_proxy=None, method=None):
    request = urllib.Request(url)
    # print('API request url:', request.get_full_url())
    if method:
        request.get_method = lambda: method
    token = token if token is not None else token_auth_string()
    request.add_header('Authorization', 'token ' + token)
    request.add_header('Accept', 'application/json')
    request.add_header('Content-Type', 'application/json')
    if data is not None:
        request.add_data(bytes(data.encode('utf8')))
    # print('API request data:', request.get_data())
    # print('API request header:', request.header_items())
    # https_proxy = https_proxy if https_proxy is not None else settings.get('https_proxy')
    # if https_proxy:
    #     opener = urllib.build_opener(urllib.HTTPHandler(), urllib.HTTPSHandler(),
    #                                  urllib.ProxyHandler({'https': https_proxy}))
    #     urllib.install_opener(opener)
    try:
        with contextlib.closing(urllib.urlopen(request)) as response:
            if response.code == 204:  # No Content
                return None
            else:
                return json.loads(response.read().decode('utf8', 'ignore'))
    except urllib.HTTPError as err:
        with contextlib.closing(err):
            raise SimpleHTTPError(err.code, err.read())
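A hedged call sketch; the endpoint, payload, and token below are placeholders for whatever the surrounding plugin supplies:

# Hypothetical invocation; every literal here is a placeholder.
result = api_request_native(
    'https://api.github.com/gists',
    data='{"description": "demo", "public": false, "files": {"a.txt": {"content": "hi"}}}',
    token='<personal-access-token>',
    method='POST',
)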
# Source: Spanish_Inspire_Catastral_Downloader.py (project: Spanish_Inspire_Catastral_Downloader, author: sigdeletras)
def set_proxy(self):
    # _proxy and _port are module-level settings
    proxy_handler = request.ProxyHandler({
        'http': '%s:%s' % (_proxy, _port),
        'https': '%s:%s' % (_proxy, _port)
    })
    opener = request.build_opener(proxy_handler)
    request.install_opener(opener)
    return

#Unset Proxy
def unset_proxy(self):
    proxy_handler = request.ProxyHandler({})
    opener = request.build_opener(proxy_handler)
    request.install_opener(opener)
    return

#Encode URL Download
def get_html_by_urllib(url, code='utf-8', headers={}, proxies={}):
    html = None
    if not url.endswith('.exe') and not url.endswith('.EXE'):
        page = None
        is_timeout = False
        try:
            def timeout_handler(response):
                nonlocal is_timeout  # without this, the assignment would create a local
                is_timeout = True
                if response:
                    response.close()
            if proxies:
                proxy_support = request.ProxyHandler(proxies)
                opener = request.build_opener(proxy_support)
                page = opener.open(quote(url, safe='/:?=&'), timeout=TIME_OUT)
            else:
                page = request.urlopen(quote(url, safe='/:?=&'), timeout=TIME_OUT)
            # Start a watchdog timer so a stalled read() gets cut off.
            t = Timer(TIMER_TIME, timeout_handler, [page])
            t.start()
            # charset = chardet.detect(page.read())['encoding']
            html = page.read().decode(code, 'ignore')
            t.cancel()
        except Exception as e:
            log.error(e)
        finally:
            # page and page.close()
            if page and not is_timeout:
                page.close()
    # Discard empty or oversized (>1 MB) pages.
    return html if html and len(html) < 1024 * 1024 else None
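A hedged call sketch; the URL and proxy mapping are placeholders, and TIME_OUT/TIMER_TIME are assumed to be module-level constants:

# Hypothetical call; the URL and proxy address are placeholders.
html = get_html_by_urllib(
    'http://example.com/search?q=test',
    code='utf-8',
    proxies={'http': 'http://127.0.0.1:8080'},
)
if html:
    print(html[:200])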
def __get_handlers(self):
    """
    Internal method to handle redirection and use TLS protocol.
    """
    handlers = [TLS1Handler]
    if self.__proxy:
        proxy_handler = urllib2.ProxyHandler(
            {'http': self.__proxy, 'https': self.__proxy})
        handlers.append(proxy_handler)
    return handlers
def set_proxy(proxy):
    proxy_handler = request.ProxyHandler({
        'http': '%s:%s' % proxy,   # proxy is a (host, port) tuple
        'https': '%s:%s' % proxy,
    })
    opener = request.build_opener(proxy_handler)
    request.install_opener(opener)

def unset_proxy():
    proxy_handler = request.ProxyHandler({})
    opener = request.build_opener(proxy_handler)
    request.install_opener(opener)
# DEPRECATED in favor of set_proxy() and unset_proxy()
def set_http_proxy(proxy):
    if proxy is None:    # Use system default setting
        proxy_support = request.ProxyHandler()
    elif proxy == '':    # Don't use any proxy
        proxy_support = request.ProxyHandler({})
    else:                # Use the given proxy for both http and https
        proxy_support = request.ProxyHandler({'http': proxy, 'https': proxy})
    opener = request.build_opener(proxy_support)
    request.install_opener(opener)
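The three branches above map to three call patterns; a short sketch (the proxy address is a placeholder):

set_http_proxy(None)                     # fall back to the system proxy settings
set_http_proxy('')                       # disable proxying entirely
set_http_proxy('http://127.0.0.1:8080')  # placeholder; route http/https through it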
def _query(self, path, before=None, after=None):
    url = '%s/lookup/%s' % (self.server, path)
    params = {}
    if self.limit:
        params['limit'] = self.limit
    if before and after:
        params['time_first_after'] = after
        params['time_last_before'] = before
    else:
        if before:
            params['time_first_before'] = before
        if after:
            params['time_last_after'] = after
    if params:
        url += '?{0}'.format(urlencode(params))
    req = Request(url)
    req.add_header('Accept', 'application/json')
    req.add_header('X-Api-Key', self.apikey)
    proxy_args = {}
    if self.http_proxy:
        proxy_args['http'] = self.http_proxy
    if self.https_proxy:
        proxy_args['https'] = self.https_proxy
    proxy_handler = ProxyHandler(proxy_args)
    opener = build_opener(proxy_handler)
    try:
        http = opener.open(req)
        while True:
            line = http.readline()
            if not line:
                break
            yield json.loads(line.decode('ascii'))
    except (HTTPError, URLError) as e:
        # sys.exc_traceback is Python 2 only; sys.exc_info()[2] is portable
        raise QueryError(str(e), sys.exc_info()[2])
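Since _query is a generator, results stream one decoded JSON record per response line; a hedged consumption sketch (client stands in for an instance of the surrounding class, and the lookup path is a placeholder):

# Hypothetical usage; 'client' is an instance of the surrounding class.
for record in client._query('rrset/name/www.example.com'):
    print(record)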
def Proxy_read(proxy_ip_list, user_agent_list):
    proxy_ip = random.choice(proxy_ip_list)
    print('Chosen proxy IP: %s' % proxy_ip)
    user_agent = random.choice(user_agent_list)
    print('Chosen user agent: %s' % user_agent)
    sleep_time = random.randint(1, 5)
    print('Sleeping for %s seconds' % sleep_time)
    time.sleep(sleep_time)
    print('Starting request')
    headers = {
        'Host': 'www.baidu.com',
        'User-Agent': user_agent,
        'Accept': r'application/json, text/javascript, */*; q=0.01',
        'Referer': r'http://www.cnblogs.com/Lands-ljk/p/5589888.html',
    }
    proxy_support = request.ProxyHandler({'http': proxy_ip})
    opener = request.build_opener(proxy_support)
    request.install_opener(opener)
    req = request.Request(r'http://www.cnblogs.com/mvc/blog/ViewCountCommentCout.aspx?postId=5589888', headers=headers)
    try:
        html = request.urlopen(req).read().decode('utf-8')
    except Exception as e:
        print('Request failed')
    else:
        print('OK!')
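A hedged driver for Proxy_read; both pools below are placeholders:

# Placeholder pools; real lists would come from a proxy provider or a file.
proxy_ip_list = ['127.0.0.1:8080', '127.0.0.1:8081']
user_agent_list = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
    'Mozilla/5.0 (X11; Linux x86_64)',
]
Proxy_read(proxy_ip_list, user_agent_list)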