def getAPICCookie(ip_addr, authheader, username, password):
url = 'http://'+ip_addr+'/api/aaaLogin.xml'
    # create 'opener' (OpenerDirector instance) with the default handlers
    opener = urllib2.build_opener()
# Install the opener.
# Now all calls to urllib2.urlopen use our opener.
urllib2.install_opener(opener)
http_header["Host"]=ip_addr
xml_string = "<aaaUser name='%s' pwd='%s'/>" % (username, password)
req = urllib2.Request(url=url, data=xml_string, headers=http_header)
try:
response = urllib2.urlopen(req)
except urllib2.URLError, e:
print 'Failed to obtain auth cookie: %s' % (e)
return 0
else:
rawcookie=response.info().getheaders('Set-Cookie')
return rawcookie[0]
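# --- Hedged usage sketch (added for illustration): obtain a cookie with
# getAPICCookie() above and reuse it on a follow-up query. Assumes the
# module-level http_header dict used above; the address, credentials, and
# query URL are placeholders.
def _demo_getAPICCookie():
    cookie = getAPICCookie('192.0.2.1', None, 'admin', 'secret')
    if cookie:
        req = urllib2.Request('http://192.0.2.1/api/class/fvTenant.xml',
                              headers={'Cookie': cookie})
        print urllib2.urlopen(req).read()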
def get(self, url, proxy=None):
if proxy:
proxy = urllib2.ProxyHandler({'http': proxy})
opener = urllib2.build_opener(proxy)
urllib2.install_opener(opener)
try:
response = urllib2.urlopen(url)
except HTTPError, e:
resp = e.read()
self.status_code = e.code
    except URLError, e:
        # URLError carries no HTTP body or status code, only a reason
        resp = str(e.reason)
        self.status_code = None
else:
self.status_code = response.code
resp = response.read()
return resp
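# --- Hedged usage sketch (added): the same install-a-proxy-opener pattern as
# get() above, shown standalone. The proxy endpoint and URL are placeholders.
def _demo_proxied_fetch():
    opener = urllib2.build_opener(urllib2.ProxyHandler({'http': 'http://127.0.0.1:8080'}))
    urllib2.install_opener(opener)
    return urllib2.urlopen('http://example.com/').read()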
def download_from_url(url):
proxy = env_server.get_proxy()
if proxy['enabled']:
server = proxy['server'].replace('http://', '')
proxy_dict = {
'http': 'http://{login}:{pass}@{0}'.format(server, **proxy)
}
proxy_handler = urllib2.ProxyHandler(proxy_dict)
auth = urllib2.HTTPBasicAuthHandler()
opener = urllib2.build_opener(proxy_handler, auth, urllib2.HTTPHandler)
urllib2.install_opener(opener)
run_thread = tc.ServerThread(env_inst.ui_main)
run_thread.kwargs = dict(url=url, timeout=1)
run_thread.routine = urllib2.urlopen
run_thread.run()
result_thread = tc.treat_result(run_thread, silent=True)
if result_thread.isFailed():
return False
else:
return result_thread.result
def ipcheck(proxy):
try:
pxhandle = urllib2.ProxyHandler({"http": proxy})
opener = urllib2.build_opener(pxhandle)
urllib2.install_opener(opener)
myip = urllib2.urlopen('http://www.whatismyip.com/automation/n09230945.asp').read()
        xs = re.findall(r'(?:\d{1,3}\.){3}\d{1,3}', StripTags(myip))
        if not xs:
            return
        if xs[0] == myipadress or myipadress == myip:
            trans_list.append(proxy)
            print proxy[:-1], "\t- ALIVE -", timer(), "- TRANSPARENT"
        else:
            anon_list.append(proxy)
            print proxy[:-1], "\t- ALIVE -", timer(), "- EXT-iP :", xs[0]
except KeyboardInterrupt:
print "\n\nCTRL+C - check temporary proxylist file\n\n"
sys.exit(0)
    except Exception:
        # any other failure (timeout, connection refused, parse error): skip this proxy
        pass
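# --- Hedged helper sketch (added): an anchored version of the IP pattern used
# in ipcheck(), with the dots escaped so digits from neighbouring octets
# cannot run together across the separators.
IP_RE = re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b')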
def _api_call(url, opener):
"""
Makes a REST call against the Couchbase API.
    Args:
        url (str): The URL to get, including endpoint
        opener (urllib2.OpenerDirector): opener to install before the call
    Returns:
        The parsed JSON response, or None on error
"""
try:
urllib2.install_opener(opener)
resp = urllib2.urlopen(url, timeout=http_timeout)
except (urllib2.HTTPError, urllib2.URLError) as e:
collectd.error("Error making API call (%s) %s" % (e, url))
return None
try:
return json.load(resp)
except ValueError, e:
collectd.error("Error parsing JSON for API call (%s) %s" % (e, url))
return None
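# --- Hedged usage sketch (added): building the basic-auth opener a collectd
# plugin might hand to _api_call(). Assumes the module globals (collectd,
# http_timeout) used above; host, port, and credentials are placeholders.
def _demo_couchbase_stats():
    mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
    mgr.add_password(None, 'http://127.0.0.1:8091', 'Administrator', 'password')
    opener = urllib2.build_opener(urllib2.HTTPBasicAuthHandler(mgr))
    stats = _api_call('http://127.0.0.1:8091/pools/default', opener)
    if stats is not None:
        print stats.keys()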
def download_vcpython27(self):
"""
Download vcpython27 since some Windows 7 boxes have it and some don't.
:return: None
"""
self._prepare_for_download()
logger.info('Beginning download of vcpython27... this may take a few minutes...')
with open(os.path.join(DOWNLOADS_DIR, 'vcpython27.msi'), 'wb') as f:
if self.PROXY is not None:
opener = urllib2.build_opener(
urllib2.HTTPHandler(),
urllib2.HTTPSHandler(),
urllib2.ProxyHandler({'http': self.PROXY, 'https': self.PROXY})
)
urllib2.install_opener(opener)
f.write(urllib2.urlopen(self.VCPYTHON27_DOWNLOAD_URL, timeout=self.DOWNLOAD_TIMEOUT).read())
logger.debug('Download of vcpython27 complete')
def download_python(self):
"""
Download Python
:return: None
"""
self._prepare_for_download()
logger.info('Beginning download of python')
with open(os.path.join(DOWNLOADS_DIR, 'python-installer.msi'), 'wb') as f:
if self.PROXY is not None:
opener = urllib2.build_opener(
urllib2.HTTPHandler(),
urllib2.HTTPSHandler(),
urllib2.ProxyHandler({'http': self.PROXY, 'https': self.PROXY})
)
urllib2.install_opener(opener)
f.write(urllib2.urlopen(self.PYTHON_DOWNLOAD_URL, timeout=self.DOWNLOAD_TIMEOUT).read())
logger.debug('Download of python complete')
def _install_socks_proxy_opener(proxytype, proxyaddr, proxyport=None):
""" Install a socks proxy handler so that all urllib2 requests are routed through the socks proxy. """
try:
import socks
from sockshandler import SocksiPyHandler
except ImportError:
warn('WARNING: Failed to load PySocks module. Try installing it with `pip install PySocks`.')
return
if proxytype == 4:
proxytype = socks.SOCKS4
elif proxytype == 5:
proxytype = socks.SOCKS5
else:
abort("Unknown Socks Proxy type {0}".format(proxytype))
opener = urllib2.build_opener(SocksiPyHandler(proxytype, proxyaddr, proxyport))
urllib2.install_opener(opener)
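# --- Hedged usage sketch (added): routing all later urllib2 calls through a
# local SOCKS5 proxy, e.g. an `ssh -D 1080` tunnel. Address, port, and URL
# are placeholders.
def _demo_socks_fetch():
    _install_socks_proxy_opener(5, '127.0.0.1', 1080)
    return urllib2.urlopen('http://example.com/').read()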
def __init__(self, url, proxy, cafile):
self.url = url
self.proxy = proxy
if proxy:
logging.info("Using HTTPS proxy: " + proxy)
proxy_handler = urllib2.ProxyHandler({'https': proxy})
opener = urllib2.build_opener(proxy_handler)
urllib2.install_opener(opener)
self.kwargs = {}
if cafile and hasattr(ssl, "create_default_context"):
logging.info("Using CA file: " + cafile)
ctx = ssl.create_default_context()
ctx.load_verify_locations(cafile = cafile)
self.kwargs['context'] = ctx
# given an infoMap returned by the local node, call up the home server
def __openrequest__(self, req):
# Opens the passed in HTTP request
if self.debug:
print "\n----- REQUEST -----"
handler = urllib2.HTTPSHandler(debuglevel=self.debugLevel)
opener = urllib2.build_opener(handler)
urllib2.install_opener(opener)
print "- API ENDPOINT: "+req.get_full_url()
print "- REQUEST METHOD: "+req.get_method()
print "- AUTHORIZATION HEADER: "+req.get_header("Authorization")
print "\n----- REQUEST DATA -----"
print req.get_data()
res = urllib2.urlopen(req)
out = res.read()
if self.debug:
print "\n----- REQUEST INFO -----"
print res.info()
print "\n----- RESPONSE -----"
print out
return out
def add_proxy(self, addr, proxy_type='all',
user=None, password=None):
"""Add proxy"""
if proxy_type == 'all':
self.proxies = {
'http': addr,
'https': addr,
'ftp': addr
}
else:
self.proxies[proxy_type] = addr
proxy_handler = urllib2.ProxyHandler(self.proxies)
self.__build_opener()
self.opener.add_handler(proxy_handler)
if user and password:
pwd_manager = urllib2.HTTPPasswordMgrWithDefaultRealm()
pwd_manager.add_password(None, addr, user, password)
proxy_auth_handler = urllib2.ProxyBasicAuthHandler(pwd_manager)
self.opener.add_handler(proxy_auth_handler)
urllib2.install_opener(self.opener)
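# --- Hedged usage sketch (added): `client` stands for an instance of the
# wrapper class that defines add_proxy() above; the endpoint and credentials
# are placeholders.
def _demo_add_proxy(client):
    client.add_proxy('http://192.0.2.10:3128', user='me', password='secret')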
def check_single_proxy_status(self, proxy_address, domain_check):
try:
parse = urlparse(proxy_address)
proxy_scheme = parse.scheme
proxy = str(parse.hostname) + ':' + str(parse.port)
proxy_handler = urllib2.ProxyHandler({ proxy_scheme: proxy})
opener = urllib2.build_opener(proxy_handler)
opener.addheaders = [('User-agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36')]
urllib2.install_opener(opener)
req = urllib2.Request(domain_check)
start_time = time.time()
sock = urllib2.urlopen(req)
end_time = time.time()
diff_time = round(end_time - start_time, 3)
        log.console_log("{}[+] {} OK! Response Time : {}s{}".format(Y, proxy_address, diff_time, W))
return 'ok'
except urllib2.HTTPError, e:
print('Error code: ' + str(e.code))
return e.code
except Exception, detail:
print('ERROR ' + str(detail))
return 1
def _update_opener(self):
'''
Builds and installs a new opener to be used by all future calls to
:func:`urllib2.urlopen`.
'''
if self._http_debug:
http = urllib2.HTTPHandler(debuglevel=1)
else:
http = urllib2.HTTPHandler()
if self._proxy:
opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(self._cj),
urllib2.ProxyHandler({'http':
self._proxy}),
urllib2.HTTPBasicAuthHandler(),
http)
else:
opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(self._cj),
urllib2.HTTPBasicAuthHandler(),
http)
urllib2.install_opener(opener)
def authenticate(top_level_url=u'https://api.github.com'):
try:
if 'GH_AUTH_USER' not in os.environ:
try:
username = raw_input(u'Username: ')
except NameError:
username = input(u'Username: ')
else:
username = os.environ['GH_AUTH_USER']
if 'GH_AUTH_PASS' not in os.environ:
password = getpass.getpass(u'Password: ')
else:
            password = os.environ['GH_AUTH_PASS']
except KeyboardInterrupt:
sys.exit(u'')
try:
import urllib.request as urllib_alias
except ImportError:
import urllib2 as urllib_alias
password_mgr = urllib_alias.HTTPPasswordMgrWithDefaultRealm()
password_mgr.add_password(None, top_level_url, username, password)
handler = urllib_alias.HTTPBasicAuthHandler(password_mgr)
opener = urllib_alias.build_opener(handler)
urllib_alias.install_opener(opener)
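# --- Hedged usage sketch (added): once authenticate() has installed the
# basic-auth opener, a plain urlopen carries the credentials. urllib2 is used
# here, matching the Python 2 path above.
def _demo_github_whoami():
    authenticate()
    print urllib2.urlopen('https://api.github.com/user').read()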
def open(aurl,post='',Referer=''):
#proxy = 'http://127.0.0.1:8088'
#opener = urllib2.build_opener( urllib2.ProxyHandler({'http':proxy}) )
#urllib2.install_opener(opener)
if post!='':
test_data_urlencode = urllib.urlencode(post)
req = urllib2.Request(url=aurl,data = test_data_urlencode)
else:
req = urllib2.Request(url=aurl)
if Referer!='':
req.add_header('Referer',Referer)
if aspxsession!="":
req.add_header('Cookie',aspxsession)
res_data = urllib2.urlopen(req)
return res_data
def error_handler(url):
global HANDLE_ERRORS
orig = HANDLE_ERRORS
keepalive_handler = HTTPHandler()
opener = urllib2.build_opener(keepalive_handler)
urllib2.install_opener(opener)
pos = {0: 'off', 1: 'on'}
for i in (0, 1):
print " fancy error handling %s (HANDLE_ERRORS = %i)" % (pos[i], i)
HANDLE_ERRORS = i
try:
fo = urllib2.urlopen(url)
foo = fo.read()
fo.close()
try: status, reason = fo.status, fo.reason
except AttributeError: status, reason = None, None
except IOError, e:
print " EXCEPTION: %s" % e
raise
else:
print " status = %s, reason = %s" % (status, reason)
HANDLE_ERRORS = orig
hosts = keepalive_handler.open_connections()
print "open connections:", hosts
keepalive_handler.close_all()
def comp(N, url):
print ' making %i connections to:\n %s' % (N, url)
sys.stdout.write(' first using the normal urllib handlers')
# first use normal opener
opener = urllib2.build_opener()
urllib2.install_opener(opener)
t1 = fetch(N, url)
print ' TIME: %.3f s' % t1
sys.stdout.write(' now using the keepalive handler ')
# now install the keepalive handler and try again
opener = urllib2.build_opener(HTTPHandler())
urllib2.install_opener(opener)
t2 = fetch(N, url)
print ' TIME: %.3f s' % t2
print ' improvement factor: %.2f' % (t1/t2, )
def set_cookie(cookie_file):
"""
@brief Load cookie from file
    @param cookie_file path to the LWP cookie file
    @return the loaded cookielib.LWPCookieJar (also installs a cookie-aware opener)
"""
cookie = cookielib.LWPCookieJar(cookie_file)
try:
cookie.load(ignore_discard=True)
except:
Log.error(traceback.format_exc())
opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookie))
opener.addheaders = Constant.HTTP_HEADER_USERAGENT
urllib2.install_opener(opener)
return cookie
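# --- Hedged usage sketch (added): load a saved jar, fetch a page through the
# cookie-aware opener installed by set_cookie(), then persist any new cookies.
# The file path and URL are placeholders.
def _demo_cookie_fetch():
    cookie = set_cookie('/tmp/cookies.lwp')
    body = urllib2.urlopen('http://example.com/').read()
    cookie.save(ignore_discard=True)
    return body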
def adminLogin(username, password):
print '--- Initializing ---'
cj = cookielib.CookieJar()
opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
urllib2.install_opener(opener)
    print '--- Getting Cookie ---'
    link = urllib2.urlopen('http://www.baidu.com/')
    print '--- Getting Token ---'
token = eval(urllib2.urlopen('https://passport.baidu.com/v2/api/?getapi&tpl=pp&apiver=v3&class=login').read())['data']['token']
print "Token: " + token
print '--- Sign In Posting ---'
postdata = {
'token' : token,
'tpl' : 'pp',
'username' : username,
'password' : password,
}
sendRequest('https://passport.baidu.com/v2/api/?login', postdata)
link.close()
return
def cache_resource(self, url):
if self.proxy_url is not None:
proxy = urllib2.ProxyHandler({'http': self.proxy_url})
opener = urllib2.build_opener(proxy)
urllib2.install_opener(opener)
request = urllib2.Request(url)
user_agent = 'Mozilla/5.0 (X11; Linux i686) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.835.35 Safari/535.1'
request.add_header('User-Agent', user_agent)
handler = urllib2.urlopen(request, timeout=self.http_timeout)
try:
resource_type = MIME_TYPES[handler.headers.get('Content-Type')]
if not resource_type:
raise UnsupportedResourceFormat("Resource format not found")
except KeyError:
raise UnsupportedResourceFormat("Resource format not supported")
etag = handler.headers.get('ETag')
last_modified = handler.headers.get('Last-Modified')
resource_key = self.get_resource_key(url)
stream = handler.read()
self.update_resource_params(resource_key, resource_type, etag, last_modified, stream)
return stream, resource_type
def _urllib2_fetch(self, uri, params, method=None):
# install error processor to handle HTTP 201 response correctly
    if self.opener is None:
self.opener = urllib2.build_opener(HTTPErrorProcessor)
urllib2.install_opener(self.opener)
if method and method == 'GET':
uri = self._build_get_uri(uri, params)
req = PlivoUrlRequest(uri)
else:
req = PlivoUrlRequest(uri, urllib.urlencode(params))
if method and (method == 'DELETE' or method == 'PUT'):
req.http_method = method
authstring = base64.encodestring('%s:%s' % (self.auth_id, self.auth_token))
authstring = authstring.replace('\n', '')
req.add_header("Authorization", "Basic %s" % authstring)
response = urllib2.urlopen(req)
return response.read()
def members(limit=2000):
url='https://graph.facebook.com/v2.7/'+FACEBOOK_GROUP+'/members?fields=picture,name&limit=%s&access_token=%s' % (limit,TOKEN)
user_agent = 'Mozilla/5.0 (Windows NT 6.1; Win64; x64)'
values = {}
headers = { 'Authorization': BEAR}
h = MyHTTPRedirectHandler()
opener = urllib2.build_opener(h)
urllib2.install_opener(opener)
data = urllib.urlencode(values)
json_data = ""
    try:
        req = urllib2.Request(url)
        response = urllib2.urlopen(req)
the_page = response.read()
json_data = json.loads(the_page)
    except Exception:
        print("Error reading data members")
return json_data
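# --- Hedged usage sketch (added): iterating the member list returned above;
# the 'data' key follows the Graph API list layout for the fields=picture,name
# query made in members(). The limit value is a placeholder.
def _demo_print_members():
    data = members(limit=100)
    if data:
        for m in data.get('data', []):
            print m.get('name')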