def get_response(url, faker=False):
    logging.debug('get_response: %s' % url)
    # install cookies
    if cookies:
        opener = request.build_opener(request.HTTPCookieProcessor(cookies))
        request.install_opener(opener)

    if faker:
        response = request.urlopen(request.Request(url, headers=fake_headers), None)
    else:
        response = request.urlopen(url)

    data = response.read()
    if response.info().get('Content-Encoding') == 'gzip':
        data = ungzip(data)
    elif response.info().get('Content-Encoding') == 'deflate':
        data = undeflate(data)
    response.data = data
    return response
# DEPRECATED in favor of get_content()
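# get_response() relies on module-level names (cookies, fake_headers,
# ungzip, undeflate) defined elsewhere in its project. A minimal sketch of
# what they might look like, assuming gzip and raw-deflate response bodies;
# the header values here are illustrative, not the project's own:
import gzip
import zlib
from io import BytesIO

cookies = None            # e.g. an http.cookiejar.CookieJar loaded at startup
fake_headers = {          # browser-like headers sent when faker=True
    'User-Agent': 'Mozilla/5.0',
    'Accept': '*/*',
}

def ungzip(data):
    # Decompress a gzip-encoded response body.
    return gzip.GzipFile(fileobj=BytesIO(data)).read()

def undeflate(data):
    # Decompress a deflate-encoded body (negative wbits = raw stream).
    return zlib.decompress(data, -zlib.MAX_WBITS)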
Python install_opener() example source code
def authenticate(top_level_url=u'https://api.github.com'):
    try:
        if 'GH_AUTH_USER' not in os.environ:
            try:
                username = raw_input(u'Username: ')
            except NameError:
                username = input(u'Username: ')
        else:
            username = os.environ['GH_AUTH_USER']

        if 'GH_AUTH_PASS' not in os.environ:
            password = getpass.getpass(u'Password: ')
        else:
            password = os.environ['GH_AUTH_PASS']
    except KeyboardInterrupt:
        sys.exit(u'')

    try:
        import urllib.request as urllib_alias
    except ImportError:
        import urllib2 as urllib_alias

    password_mgr = urllib_alias.HTTPPasswordMgrWithDefaultRealm()
    password_mgr.add_password(None, top_level_url, username, password)
    handler = urllib_alias.HTTPBasicAuthHandler(password_mgr)
    opener = urllib_alias.build_opener(handler)
    urllib_alias.install_opener(opener)
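# Once authenticate() has installed the Basic-auth opener, a plain
# urlopen() call against the API carries the credentials. A usage sketch
# (Python 3 shown; the /user endpoint is just an example):
import json
from urllib.request import urlopen

authenticate()
with urlopen('https://api.github.com/user') as resp:
    print(json.loads(resp.read().decode('utf-8'))['login'])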
def proxyurllib():
    print(COLOR_GREEN + '-' * 30 + COLOR_NONE)
    # TODO proxy
    handler = request.ProxyHandler({'http': 'http://10.112.5.173:49908'})
    '''
    proxy_auth_handler = urllib.request.ProxyBasicAuthHandler()
    proxy_auth_handler.add_password('realm', 'host', 'username', 'password')
    '''
    opener = request.build_opener(handler)
    request.install_opener(opener)
    # After install_opener(), urlopen() opens every URL through this opener;
    # without installing it globally, you could call opener.open(url) directly.
    google = request.urlopen('http://www.google.com')
    print(google.read())
    print("System proxies:", request.getproxies())

#proxyurllib()
# FIXME: handle robots.txt
def index(request):
    if request.method == "GET":
        try:
            ssl._create_default_https_context = ssl._create_unverified_context
            opener = wdf_urllib.build_opener(
                wdf_urllib.HTTPCookieProcessor(CookieJar()))
            wdf_urllib.install_opener(opener)
        except:
            pass
        uuid = getUUID()
        url = 'https://login.weixin.qq.com/qrcode/' + uuid
        params = {
            't': 'webwx',
            '_': int(time.time()),
        }
        # NOTE: this rebinding shadows the Django request argument
        request = getRequest(url=url, data=urlencode(params))
        response = wdf_urllib.urlopen(request)
        context = {
            'uuid': uuid,
            'response': response.read(),
            'delyou': '',
        }
        return render_to_response('index.html', context)
def __init__(self, server, port, username, password):
    """Connection class init call."""
    self.server = server
    self.port = port
    self.username = username
    self.password = password
    self.url = 'https://{0}:{1}'.format(self.server, self.port)
    self.api = '/api/1.1/xml'
    self.authtoken = ''
    self.response = None
    self.sync_id = ''
    # force urllib2 to not use a proxy
    proxy_handler = urllib2.ProxyHandler({})
    opener = urllib2.build_opener(proxy_handler)
    urllib2.install_opener(opener)
    self.login()
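# A hypothetical instantiation; the host, port, and credentials are
# placeholders. Building the Connection disables any proxy and logs in
# immediately:
conn = Connection('appliance.example.com', 443, 'admin', 'secret')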
#Gets called in __init__
def login(self, username, pwd, cookie_file):
    """
    Login with username, password and cookies.
    (1) If cookie file exists then try to load cookies;
    (2) If no cookies found then do login
    """
    # If cookie file exists then try to load cookies
    if os.path.exists(cookie_file):
        try:
            cookie_jar = cookielib.LWPCookieJar(cookie_file)
            cookie_jar.load(ignore_discard=True, ignore_expires=True)
            loaded = 1
        except cookielib.LoadError:
            loaded = 0
            LOG.info('Loading cookies error')

        # install loaded cookies for urllib2
        if loaded:
            cookie_support = urllib2.HTTPCookieProcessor(cookie_jar)
            opener = urllib2.build_opener(cookie_support,
                                          urllib2.HTTPHandler)
            urllib2.install_opener(opener)
            LOG.info('Loading cookies success')
            return 1
        else:
            return self.do_login(username, pwd, cookie_file)
    else:  # If no cookies found
        return self.do_login(username, pwd, cookie_file)
def save_cookie(self, text, cookie_file=CONF.cookie_file):
    cookie_jar2 = cookielib.LWPCookieJar()
    cookie_support2 = urllib2.HTTPCookieProcessor(cookie_jar2)
    opener2 = urllib2.build_opener(cookie_support2, urllib2.HTTPHandler)
    urllib2.install_opener(opener2)
    if six.PY3:
        text = text.decode('gbk')
    # The login response embeds a redirect such as
    # location.replace('http://weibo.com...'); extract that URL. Note the
    # quotes inside the pattern must be escaped as \' for re to match them.
    p = re.compile('location\.replace\(\'(.*?)\'\)')
    try:
        # Search login redirection URL
        login_url = p.search(text).group(1)
        data = urllib2.urlopen(login_url).read()
        # Verify login feedback, check whether result is TRUE
        patt_feedback = 'feedBackUrlCallBack\((.*)\)'
        p = re.compile(patt_feedback, re.MULTILINE)
        feedback = p.search(data).group(1)
        feedback_json = json.loads(feedback)
        if feedback_json['result']:
            cookie_jar2.save(cookie_file,
                             ignore_discard=True,
                             ignore_expires=True)
            return 1
        else:
            return 0
    except:
        return 0
def login(self, username, pwd, cookie_file):
    """
    Login with username, password and cookies.
    (1) If cookie file exists then try to load cookies;
    (2) If no cookies found then do login
    """
    # If cookie file exists then try to load cookies
    if os.path.exists(cookie_file):
        try:
            cookie_jar = cookielib.LWPCookieJar(cookie_file)
            cookie_jar.load(ignore_discard=True, ignore_expires=True)
            loaded = 1
        except cookielib.LoadError:
            loaded = 0
            print('Loading cookies error')

        # install loaded cookies for urllib2
        if loaded:
            cookie_support = urllib2.HTTPCookieProcessor(cookie_jar)
            opener = urllib2.build_opener(cookie_support, urllib2.HTTPHandler)
            urllib2.install_opener(opener)
            print('Loading cookies success')
            return 1
        else:
            return self.do_login(username, pwd, cookie_file)
    else:  # If no cookies found
        return self.do_login(username, pwd, cookie_file)
def build_opener():
    cookie = http.cookiejar.CookieJar()
    cookie_processor = request.HTTPCookieProcessor(cookie)
    opener = request.build_opener(cookie_processor)
    opener.addheaders = [("User-Agent", "Mozilla/5.0 (iPhone; CPU iPhone OS 9_1 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13B143 Safari/601.1"),
                         ("Referer", "https://passport.weibo.cn"),
                         ("Origin", "https://passport.weibo.cn"),
                         ("Host", "passport.weibo.cn")]
    request.install_opener(opener)
def build_opener():
    cookie = http.cookiejar.CookieJar()
    cookie_processor = request.HTTPCookieProcessor(cookie)
    opener = request.build_opener(cookie_processor)
    opener.addheaders = [("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:49.0) Gecko/20100101 Firefox/49.0"),
                         ("Referer", "http://cn.v2ex.com/signin"),
                         ("Origin", "http://cn.v2ex.com"),
                         ("Host", "cn.v2ex.com")]
    request.install_opener(opener)
def build_opener():
    cookie = http.cookiejar.CookieJar()
    cookie_processor = request.HTTPCookieProcessor(cookie)
    opener = request.build_opener(cookie_processor)
    opener.addheaders = [("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.71 Safari/537.36"),
                         ("Referer", "https://wx.qq.com/"),
                         ("Origin", "https://wx.qq.com/"),
                         ("Host", "wx.qq.com")]
    request.install_opener(opener)

# get uuid
def build_opener():
    cookie = http.cookiejar.CookieJar()
    cookie_processor = request.HTTPCookieProcessor(cookie)
    opener = request.build_opener(cookie_processor)
    opener.addheaders = [("User-Agent", "Mozilla/5.0 (iPhone; CPU iPhone OS 9_1 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13B143 Safari/601.1"),
                         ("Referer", "https://www.zhihu.com/"),
                         ("Origin", "https://www.zhihu.com/"),
                         ("Host", "www.zhihu.com")]
    request.install_opener(opener)
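# Each build_opener() variant above installs a fresh CookieJar plus
# site-specific headers globally, so a later bare urlopen() call reuses
# both. An illustrative sequence:
build_opener()
response = request.urlopen('https://www.zhihu.com/')  # carries the headers; cookies persist
html = response.read()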
def __init__(self):
    self.DEBUG = False
    self.appid = 'wx782c26e4c19acffb'
    self.uuid = ''
    self.base_uri = ''
    self.redirect_uri = ''
    self.uin = ''
    self.sid = ''
    self.skey = ''
    self.pass_ticket = ''
    self.deviceId = 'e' + repr(random.random())[2:17]
    self.BaseRequest = {}
    self.synckey = ''
    self.SyncKey = []
    self.User = []
    self.MemberList = []
    self.ContactList = []
    self.GroupList = []
    self.autoReplyMode = False
    self.syncHost = ''

    self._handlers = dict((k, []) for k in self.message_types)
    self._handlers['location'] = []
    self._handlers['all'] = []
    self._filters = dict()

    opener = request.build_opener(request.HTTPCookieProcessor(CookieJar()))
    opener.addheaders = [('User-agent', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.109 Safari/537.36'),
                         ('Referer', 'https://wx2.qq.com/')]
    request.install_opener(opener)
def getFile(cls, getfile, unpack=True):
    if cls.getProxy():
        proxy = req.ProxyHandler({'http': cls.getProxy(), 'https': cls.getProxy()})
        auth = req.HTTPBasicAuthHandler()
        opener = req.build_opener(proxy, auth, req.HTTPHandler)
        req.install_opener(opener)
    try:
        response = req.urlopen(getfile)
    except:
        msg = "[!] Could not fetch file %s" % getfile
        if cls.exitWhenNoSource():
            sys.exit(msg)
        else:
            print(msg)
            return (None, None)  # no response to read; return early
    data = response.read()
    # TODO: if data == text/plain; charset=utf-8, read and decode
    if unpack:
        if 'gzip' in response.info().get('Content-Type'):
            data = gzip.GzipFile(fileobj=BytesIO(data))
        elif 'bzip2' in response.info().get('Content-Type'):
            data = BytesIO(bz2.decompress(data))
        elif 'zip' in response.info().get('Content-Type'):
            fzip = zipfile.ZipFile(BytesIO(data), 'r')
            if len(fzip.namelist()) > 0:
                data = BytesIO(fzip.read(fzip.namelist()[0]))
        # In case the webserver is being generic
        elif 'application/octet-stream' in response.info().get('Content-Type'):
            if data[:4] == b'PK\x03\x04':  # Zip
                fzip = zipfile.ZipFile(BytesIO(data), 'r')
                if len(fzip.namelist()) > 0:
                    data = BytesIO(fzip.read(fzip.namelist()[0]))
    return (data, response)
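# A hypothetical call site for getFile(); the class name and URL are
# placeholders, and data may come back as bytes or a file-like object
# depending on which Content-Type branch was taken above:
data, response = SourceHelper.getFile('https://example.org/feed.json.zip')
if data is not None:
    payload = data.read() if hasattr(data, 'read') else data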
def api_request_native(url, data=None, token=None, https_proxy=None, method=None):
    # 'urllib' is assumed to be the urllib2 / urllib.request alias set up elsewhere
    request = urllib.Request(url)
    # print('API request url:', request.get_full_url())
    if method:
        request.get_method = lambda: method
    token = token if token is not None else token_auth_string()
    request.add_header('Authorization', 'token ' + token)
    request.add_header('Accept', 'application/json')
    request.add_header('Content-Type', 'application/json')
    if data is not None:
        request.add_data(bytes(data.encode('utf8')))
    # print('API request data:', request.get_data())
    # print('API request header:', request.header_items())
    # https_proxy = https_proxy if https_proxy != None else settings.get('https_proxy')
    # if https_proxy:
    #     opener = urllib.build_opener(urllib.HTTPHandler(), urllib.HTTPSHandler(),
    #                                  urllib.ProxyHandler({'https': https_proxy}))
    #     urllib.install_opener(opener)
    try:
        with contextlib.closing(urllib.urlopen(request)) as response:
            if response.code == 204:  # No Content
                return None
            else:
                return json.loads(response.read().decode('utf8', 'ignore'))
    except urllib.HTTPError as err:
        with contextlib.closing(err):
            raise SimpleHTTPError(err.code, err.read())
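# A hypothetical call, assuming token_auth_string() and SimpleHTTPError come
# from the surrounding plugin. For example, creating a private gist via the
# GitHub API:
result = api_request_native(
    'https://api.github.com/gists',
    data=json.dumps({'public': False,
                     'files': {'snippet.txt': {'content': 'hello'}}}),
    method='POST')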
Source file: Spanish_Inspire_Catastral_Downloader.py (project: Spanish_Inspire_Catastral_Downloader, author: sigdeletras)
def set_proxy(self):
    proxy_handler = request.ProxyHandler({
        'http': '%s:%s' % (_proxy, _port),
        'https': '%s:%s' % (_proxy, _port)
    })
    opener = request.build_opener(proxy_handler)
    request.install_opener(opener)
    return

# Unset Proxy
Source file: Spanish_Inspire_Catastral_Downloader.py (project: Spanish_Inspire_Catastral_Downloader, author: sigdeletras)
def unset_proxy(self):
    proxy_handler = request.ProxyHandler({})
    opener = request.build_opener(proxy_handler)
    request.install_opener(opener)
    return

# Encode URL Download
def _downloadFile(self, toDownload):
    '''
    Downloads the file from the url and saves it in the directory folderPath
    with the name fileName.
    '''
    fileName, url = toDownload
    # Opens the web page and creates a file in the folder folderPath with the name fileName
    try:
        #===============================================================================
        # passman = request.HTTPPasswordMgrWithDefaultRealm()
        # passman.add_password(self.realm, url, self.username, self.password)
        #
        # authhandler = request.HTTPBasicAuthHandler(passman)
        # opener = request.build_opener(authhandler)
        # request.install_opener(opener)
        #===============================================================================
        u = request.urlopen(url)
        f = open(fileName, 'wb')
        block_sz = 8192
        while True:
            buffer = u.read(block_sz)
            if not buffer:
                break
            f.write(buffer)
        # Close the file and the connection
        f.close()
        u.close()
        return os.path.getsize(fileName)
    except Exception as ex:
        warnings.warn(str(ex), UserWarning)
        return -1
def __init__(self, bot, config_file):
    super().__init__(bot)
    opener = urllib_request.build_opener()
    opener.addheaders = [("User-Agent", "Mozilla/5.0 (X11; Linux x86_64) Tnybot/1.0 Chrome/55.0")]
    urllib_request.install_opener(opener)

    config = configparser.RawConfigParser()
    config.read(config_file)
    self.base_dir = config["Images"]["dir"]
    self.checksum = config["Images"]["checksum"] == "True"
    if self.checksum:
        print(
            """!!!! Warning! Using checksums to detect duplicate images.
            Processing image hashes may take awhile on older CPUs.
            This will save disk space, but will cause an increase in downloads on restart.
            """
        )
    self.channels = self.get_config_values(config, "Channels")
    self.merged_channels = self.get_config_values(config, "MergedChannels") or []
    self.upload_channels = self.get_config_values(config, "Upload")
    if not self.bot.unit_tests:  # pragma: no cover
        self.bot.loop.create_task(self.background())
        # self.bot.loop.create_task(self.upload())
def __init__(self, bot):
    super().__init__(bot)
    opener = urllib_request.build_opener()
    opener.addheaders = [
        ("User-Agent", "Mozilla/5.0 (X11; Linux x86_64) Tnybot/1.0 Chrome/55.0")]
    urllib_request.install_opener(opener)
def set_proxy(proxy):
    proxy_handler = request.ProxyHandler({
        'http': '%s:%s' % proxy,
        'https': '%s:%s' % proxy,
    })
    opener = request.build_opener(proxy_handler)
    request.install_opener(opener)
def unset_proxy():
    proxy_handler = request.ProxyHandler({})
    opener = request.build_opener(proxy_handler)
    request.install_opener(opener)

# DEPRECATED in favor of set_proxy() and unset_proxy()
def set_http_proxy(proxy):
    if proxy is None:    # Use system default setting
        proxy_support = request.ProxyHandler()
    elif proxy == '':    # Don't use any proxy
        proxy_support = request.ProxyHandler({})
    else:                # Use proxy
        proxy_support = request.ProxyHandler({'http': '%s' % proxy, 'https': '%s' % proxy})
    opener = request.build_opener(proxy_support)
    request.install_opener(opener)
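# Note the two helpers expect different argument shapes: set_proxy() takes
# a (host, port) tuple, while the deprecated set_http_proxy() takes a
# single 'host:port' string. Illustrative addresses:
set_proxy(('127.0.0.1', 8118))    # tuple form
unset_proxy()                     # back to a direct connection
set_http_proxy('127.0.0.1:8118')  # deprecated string form
set_http_proxy('')                # deprecated: explicitly no proxy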
def install_opener():
    if 'http_proxy' in os.environ or 'https_proxy' in os.environ:
        raise RuntimeError(
            'http_proxy or https_proxy set in environment, please unset')
    handlers = [WSGI_HTTPHandler()]
    if WSGI_HTTPSHandler is not None:
        handlers.append(WSGI_HTTPSHandler())
    opener = url_lib.build_opener(*handlers)
    url_lib.install_opener(opener)
    return opener

def uninstall_opener():
    url_lib.install_opener(None)
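# A sketch of the intended round trip, assuming WSGI_HTTPHandler routes
# matching requests to an in-process WSGI app (wsgi-intercept-style
# testing); the URL is illustrative:
opener = install_opener()
try:
    body = url_lib.urlopen('http://example.test/').read()
finally:
    uninstall_opener()  # restore the default opener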
def Proxy_read(proxy_ip_list, user_agent_list):
    proxy_ip = random.choice(proxy_ip_list)
    print('Using proxy IP: %s' % proxy_ip)
    user_agent = random.choice(user_agent_list)
    print('Using User-Agent: %s' % user_agent)
    sleep_time = random.randint(1, 5)
    print('Sleeping for %s seconds' % sleep_time)
    time.sleep(sleep_time)
    print('Starting request')
    headers = {
        'Host': 'www.baidu.com',
        'User-Agent': user_agent,
        'Accept': r'application/json, text/javascript, */*; q=0.01',
        'Referer': r'http://www.cnblogs.com/Lands-ljk/p/5589888.html',
    }
    proxy_support = request.ProxyHandler({'http': proxy_ip})
    opener = request.build_opener(proxy_support)
    request.install_opener(opener)
    req = request.Request(r'http://www.cnblogs.com/mvc/blog/ViewCountCommentCout.aspx?postId=5589888', headers=headers)
    try:
        html = request.urlopen(req).read().decode('utf-8')
    except Exception as e:
        print('Request failed:', e)
    else:
        print('OK!')
def _openURL2(self):
    try:
        if (self._userName and self._userPass):
            password_mgr = urlconnection.HTTPPasswordMgr()
            password_mgr.add_password(self._realm, self._url, self._userName, self._userPass)
            auth_handler = urlconnection.HTTPBasicAuthHandler(password_mgr)
            opener = urlconnection.build_opener(auth_handler)
            urlconnection.install_opener(opener)

        response = urlconnection.urlopen(self._url, timeout=10)
        if (response.getcode() == 200):
            byte_responseData = response.read()
            str_responseData = byte_responseData.decode('UTF-8')
            self._parseStats(str_responseData)
        else:
            #self.dictInterfaceData['status'] = 0
            self.dictInterfaceData['msg'] = 'Response status code from haproxy url is :' + str(response.getcode())
    except HTTPError as e:
        #self.dictInterfaceData['status'] = 0
        self.dictInterfaceData['msg'] = 'Haproxy stats url has HTTP Error ' + str(e.code)
    except URLError as e:
        #self.dictInterfaceData['status'] = 0
        self.dictInterfaceData['msg'] = 'Haproxy stats url has URL Error ' + str(e.reason)
    except InvalidURL as e:
        #self.dictInterfaceData['status'] = 0
        self.dictInterfaceData['msg'] = 'Haproxy stats url is invalid URL'
    except Exception as e:
        #self.dictInterfaceData['status'] = 0
        self.dictInterfaceData['msg'] = 'Haproxy stats URL error : ' + str(e)