def httpConnection(url, proxy):
#TODO: habilitar autenticacion ntlm
if (proxy.auth == "ntlm"):
passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
passman.add_password(None, proxy.url, proxy.user, proxy.password)
auth = HTTPNtlmAuthHandler.HTTPNtlmAuthHandler(passman)
else:
passman = urllib2.HTTPPasswordMgr()
passman.add_password(None, proxy.url, proxy.user, proxy.password)
auth = urllib2.HTTPBasicAuthHandler(passman)
if (proxy.url):
proxy = urllib2.ProxyHandler({'http': proxy.url})
opener = urllib2.build_opener(proxy.url, auth, urllib2.HTTPHandler)
urllib2.install_opener(opener)
return urllib2.urlopen(url)
python类HTTPPasswordMgr()的实例源码
def __init__(self, password_mgr=None, debuglevel=0):
"""Initialize an instance of a AbstractNtlmAuthHandler.
Verify operation with all default arguments.
>>> abstrct = AbstractNtlmAuthHandler()
Verify "normal" operation.
>>> abstrct = AbstractNtlmAuthHandler(urllib2.HTTPPasswordMgrWithDefaultRealm())
"""
if password_mgr is None:
password_mgr = urllib2.HTTPPasswordMgr()
self.passwd = password_mgr
self.add_password = self.passwd.add_password
self._debuglevel = debuglevel
def __init__(self, password_mgr=None, debuglevel=0):
"""Initialize an instance of a AbstractNtlmAuthHandler.
Verify operation with all default arguments.
>>> abstrct = AbstractNtlmAuthHandler()
Verify "normal" operation.
>>> abstrct = AbstractNtlmAuthHandler(urllib2.HTTPPasswordMgrWithDefaultRealm())
"""
if password_mgr is None:
password_mgr = urllib2.HTTPPasswordMgr()
self.passwd = password_mgr
self.add_password = self.passwd.add_password
self._debuglevel = debuglevel
def getData(self):
try:
### Create Authentication Handler for the HTTP Request
pwdmgr = urllib2.HTTPPasswordMgr()
pwdmgr.add_password(None, URL, USERNAME, PASSWORD)
auth = urllib2.HTTPBasicAuthHandler(pwdmgr)
### Create Proxy Handler for the HTTP Request
proxy = urllib2.ProxyHandler({}) # Uses NO Proxy
### Create a HTTP Request with the authentication and proxy handlers
opener = urllib2.build_opener(proxy, auth)
urllib2.install_opener(opener)
### Get HTTP Response
response = urllib2.urlopen(URL, timeout=10)
### Parse the response data
if response.getcode() == 200:
bytedata = response.read()
output = bytedata.decode()
self.data = self.parseClusterData(output)
else:
self.data['msg'] = str(response.getcode())
self.status = 0
except Exception as e:
self.data['msg'] = str(e)
self.status = 0
return self.data
def getData(self):
try:
### Create Authentication Handler for the HTTP Request
pwdmgr = urllib2.HTTPPasswordMgr()
pwdmgr.add_password(None, URL, USERNAME, PASSWORD)
auth = urllib2.HTTPBasicAuthHandler(pwdmgr)
### Create Proxy Handler for the HTTP Request
proxy = urllib2.ProxyHandler({}) # Uses NO Proxy
### Create a HTTP Request with the authentication and proxy handlers
opener = urllib2.build_opener(proxy, auth)
urllib2.install_opener(opener)
### Get HTTP Response
response = urllib2.urlopen(URL, timeout=10)
### Parse the response data
if response.getcode() == 200:
bytedata = response.read()
output = bytedata.decode('UTF-8')
self.data = self.parseData(output)
self.data['units'] = UNITS
else:
self.data['msg'] = str(response.getcode())
except Exception as e:
self.data['msg'] = str(e)
return self.data
### Parse Lighttp Server Stats Data
def _openURL2(self):
try:
if (self._userName and self._userPass):
password_mgr = urlconnection.HTTPPasswordMgr()
password_mgr.add_password(self._realm, self._url, self._userName, self._userPass)
auth_handler = urlconnection.HTTPBasicAuthHandler(password_mgr)
opener = urlconnection.build_opener(auth_handler)
urlconnection.install_opener(opener)
response = urlconnection.urlopen(self._url, timeout=10)
if (response.getcode() == 200):
byte_responseData = response.read()
str_responseData = byte_responseData.decode('UTF-8')
self._parseStats(str_responseData)
else:
#self.dictInterfaceData['status'] = 0
self.dictInterfaceData['msg'] = 'Response status code from haproxy url is :' + str(response.getcode())
except HTTPError as e:
#self.dictInterfaceData['status'] = 0
self.dictInterfaceData['msg'] ='Haproxy stats url has HTTP Error '+str(e.code)
except URLError as e:
#self.dictInterfaceData['status'] = 0
self.dictInterfaceData['msg'] = 'Haproxy stats url has URL Error '+str(e.reason)
except InvalidURL as e:
#self.dictInterfaceData['status'] = 0
self.dictInterfaceData['msg'] = 'Haproxy stats url is invalid URL'
except Exception as e:
#self.dictInterfaceData['status'] = 0
self.dictInterfaceData['msg'] = 'Haproxy stats URL error : ' + str(e)
def _openURL3(self):
try:
if (self._userName and self._userPass):
password_mgr = urlconnection.HTTPPasswordMgr()
password_mgr.add_password(self._realm, self._url, self._userName, self._userPass)
auth_handler = urlconnection.HTTPBasicAuthHandler(password_mgr)
opener = urlconnection.build_opener(auth_handler)
urlconnection.install_opener(opener)
response = urlconnection.urlopen(self._url, timeout=10)
if (response.status == 200):
byte_responseData = response.read()
str_responseData = byte_responseData.decode('UTF-8')
self._parseStats(str_responseData)
else:
#self.dictInterfaceData['status'] = 0
self.dictInterfaceData['msg'] = 'Response status code from haproxy url is :' + str(response.status)
except HTTPError as e:
#self.dictInterfaceData['status'] = 0
self.dictInterfaceData['msg'] ='Haproxy stats url has HTTP Error '+str(e.code)
except URLError as e:
#self.dictInterfaceData['status'] = 0
self.dictInterfaceData['msg'] = 'Haproxy stats url has URL Error '+str(e.reason)
except InvalidURL as e:
#self.dictInterfaceData['status'] = 0
self.dictInterfaceData['msg'] = 'Haproxy stats url is invalid URL'
except Exception as e:
#self.dictInterfaceData['status'] = 0
self.dictInterfaceData['msg'] = 'Haproxy stats URL error : ' + str(e)
def getData(self):
try:
### Create Authentication Handler for the HTTP Request
pwdmgr = urllib2.HTTPPasswordMgr()
pwdmgr.add_password(None, self.url, USERNAME, PASSWORD)
auth = urllib2.HTTPBasicAuthHandler(pwdmgr)
### Create Proxy Handler for the HTTP Request
proxy = urllib2.ProxyHandler({}) # Uses NO Proxy
### Create a HTTP Request with the authentication and proxy handlers
opener = urllib2.build_opener(proxy, auth)
urllib2.install_opener(opener)
### Get HTTP Response
response = urllib2.urlopen(self.url, timeout=10)
### Parse the response data
if response.getcode() == 200:
bytedata = response.read()
output = bytedata.decode('UTF-8')
self.data = self.parseData(output)
self.data['units'] = UNITS
else:
self.data['msg'] = str(response.getcode())
except Exception as e:
print e
self.data['msg'] = str(e)
return self.data
### Parse Glassfish Server Memory Data
def metricCollector3(self):
try:
if (self._userName and self._userPass):
password_mgr = urlconnection.HTTPPasswordMgr()
password_mgr.add_password(self._realm, self._url, self._userName, self._userPass)
auth_handler = urlconnection.HTTPBasicAuthHandler(password_mgr)
opener = urlconnection.build_opener(auth_handler)
urlconnection.install_opener(opener)
response = urlconnection.urlopen(self._url, timeout=10)
if response.status == 200:
byte_responseData = response.read()
str_responseData = byte_responseData.decode('UTF-8')
self._parseStats(str_responseData)
else:
self.dictApacheData['status'] = 0
self.dictApacheData['msg'] = 'Error_code' + str(response.status)
except HTTPError as e:
self.dictApacheData['status'] = 0
self.dictApacheData['msg'] = 'Error_code : HTTP Error ' + str(e.code)
except URLError as e:
self.dictApacheData['status'] = 0
self.dictApacheData['msg'] = 'Error_code : URL Error ' + str(e.reason)
except InvalidURL as e:
self.dictApacheData['status'] = 0
self.dictApacheData['msg'] = 'Error_code : Invalid URL'
except Exception as e:
self.dictApacheData['status'] = 0
self.dictApacheData['msg'] = 'Exception occured in collecting data : ' + str(e)
def getData(self):
try:
### Create Authentication Handler for the HTTP Request
pwdmgr = urllib2.HTTPPasswordMgr()
pwdmgr.add_password(None, self.url, USERNAME, PASSWORD)
auth = urllib2.HTTPBasicAuthHandler(pwdmgr)
### Create Proxy Handler for the HTTP Request
proxy = urllib2.ProxyHandler({}) # Uses NO Proxy
### Create a HTTP Request with the authentication and proxy handlers
opener = urllib2.build_opener(proxy, auth)
urllib2.install_opener(opener)
### Get HTTP Response
response = urllib2.urlopen(self.url, timeout=10)
### Parse the response data
if response.getcode() == 200:
bytedata = response.read()
output = bytedata.decode('UTF-8')
self.data = self.parseData(output)
self.data['units'] = UNITS
else:
self.data['msg'] = str(response.getcode())
except Exception as e:
self.data['msg'] = str(e)
return self.data
### Parse Lighttp Server Stats Data
def getData(self):
try:
### Create Authentication Handler for the HTTP Request
pwdmgr = urllib2.HTTPPasswordMgr()
pwdmgr.add_password(None, CLUSTERURL, USERNAME, PASSWORD)
auth = urllib2.HTTPBasicAuthHandler(pwdmgr)
### Create Proxy Handler for the HTTP Request
proxy = urllib2.ProxyHandler({}) # Uses NO Proxy
### Create a HTTP Request with the authentication and proxy handlers
opener = urllib2.build_opener(proxy, auth)
urllib2.install_opener(opener)
### Get HTTP Response
response = urllib2.urlopen(CLUSTERURL, timeout=10)
### Parse the response data
if response.getcode() == 200:
bytedata = response.read()
output = bytedata.decode()
self.data = self.parseClusterData(output)
else:
self.data['msg'] = str(response.getcode())
self.status = 0
except Exception as e:
self.data['msg'] = str(e)
self.status = 0
return self.data
def _openURL2(self,str_URLsuffix):
str_responseData = None
url = None
try:
url = self._url + str_URLsuffix
password_mgr = urlconnection.HTTPPasswordMgr()
password_mgr.add_password(None, url, self._userName, self._userPass)
auth_handler = urlconnection.HTTPBasicAuthHandler(password_mgr)
proxy = urlconnection.ProxyHandler({}) # Uses NO Proxy
opener = urlconnection.build_opener(proxy, auth_handler)
urlconnection.install_opener(opener)
response = urlconnection.urlopen(url, timeout = 5)
if response.getcode() == 200:
byte_responseData = response.read()
str_responseData = byte_responseData.decode('UTF-8')
else:
self.dictEsPluginData['status'] = '0'
self.dictEsPluginData['msg'] = 'Invalid response after opening URL : ' + str(response.getcode())
except HTTPError as e:
self.dictEsPluginData['status'] = '0'
self.dictEsPluginData['msg'] ='HTTP Error '+str(e.code)
except URLError as e:
self.dictEsPluginData['status'] = '0'
self.dictEsPluginData['msg'] = 'URL Error '+str(e.reason)
except InvalidURL as e:
self.dictEsPluginData['status'] = '0'
self.dictEsPluginData['msg'] = 'Invalid URL'
except Exception as e:
self.dictEsPluginData['status'] = '0'
self.dictEsPluginData['msg'] = 'Exception while opening stats url in python 2 : ' + str(e)
finally:
return str_responseData
def getData(self):
try:
### Create Authentication Handler for the HTTP Request
pwdmgr = urllib2.HTTPPasswordMgr()
pwdmgr.add_password(None, self.url, USERNAME, PASSWORD)
auth = urllib2.HTTPBasicAuthHandler(pwdmgr)
### Create Proxy Handler for the HTTP Request
proxy = urllib2.ProxyHandler({}) # Uses NO Proxy
### Create a HTTP Request with the authentication and proxy handlers
opener = urllib2.build_opener(proxy, auth)
urllib2.install_opener(opener)
### Get HTTP Response
response = urllib2.urlopen(self.url, timeout=10)
### Parse the response data
if response.getcode() == 200:
bytedata = response.read()
output = bytedata.decode('UTF-8')
self.data = self.parseData(output)
self.data['units'] = UNITS
else:
self.data['msg'] = str(response.getcode())
except Exception as e:
self.data['msg'] = str(e)
return self.data
### Parse Glassfish Server Memory Data
def test_password_manager_default_port(self):
"""
>>> mgr = urllib2.HTTPPasswordMgr()
>>> add = mgr.add_password
The point to note here is that we can't guess the default port if there's
no scheme. This applies to both add_password and find_user_password.
>>> add("f", "http://g.example.com:80", "10", "j")
>>> add("g", "http://h.example.com", "11", "k")
>>> add("h", "i.example.com:80", "12", "l")
>>> add("i", "j.example.com", "13", "m")
>>> mgr.find_user_password("f", "g.example.com:100")
(None, None)
>>> mgr.find_user_password("f", "g.example.com:80")
('10', 'j')
>>> mgr.find_user_password("f", "g.example.com")
(None, None)
>>> mgr.find_user_password("f", "http://g.example.com:100")
(None, None)
>>> mgr.find_user_password("f", "http://g.example.com:80")
('10', 'j')
>>> mgr.find_user_password("f", "http://g.example.com")
('10', 'j')
>>> mgr.find_user_password("g", "h.example.com")
('11', 'k')
>>> mgr.find_user_password("g", "h.example.com:80")
('11', 'k')
>>> mgr.find_user_password("g", "http://h.example.com:80")
('11', 'k')
>>> mgr.find_user_password("h", "i.example.com")
(None, None)
>>> mgr.find_user_password("h", "i.example.com:80")
('12', 'l')
>>> mgr.find_user_password("h", "http://i.example.com:80")
('12', 'l')
>>> mgr.find_user_password("i", "j.example.com")
('13', 'm')
>>> mgr.find_user_password("i", "j.example.com:80")
(None, None)
>>> mgr.find_user_password("i", "http://j.example.com")
('13', 'm')
>>> mgr.find_user_password("i", "http://j.example.com:80")
(None, None)
"""
def test_password_manager_default_port(self):
"""
>>> mgr = urllib2.HTTPPasswordMgr()
>>> add = mgr.add_password
The point to note here is that we can't guess the default port if there's
no scheme. This applies to both add_password and find_user_password.
>>> add("f", "http://g.example.com:80", "10", "j")
>>> add("g", "http://h.example.com", "11", "k")
>>> add("h", "i.example.com:80", "12", "l")
>>> add("i", "j.example.com", "13", "m")
>>> mgr.find_user_password("f", "g.example.com:100")
(None, None)
>>> mgr.find_user_password("f", "g.example.com:80")
('10', 'j')
>>> mgr.find_user_password("f", "g.example.com")
(None, None)
>>> mgr.find_user_password("f", "http://g.example.com:100")
(None, None)
>>> mgr.find_user_password("f", "http://g.example.com:80")
('10', 'j')
>>> mgr.find_user_password("f", "http://g.example.com")
('10', 'j')
>>> mgr.find_user_password("g", "h.example.com")
('11', 'k')
>>> mgr.find_user_password("g", "h.example.com:80")
('11', 'k')
>>> mgr.find_user_password("g", "http://h.example.com:80")
('11', 'k')
>>> mgr.find_user_password("h", "i.example.com")
(None, None)
>>> mgr.find_user_password("h", "i.example.com:80")
('12', 'l')
>>> mgr.find_user_password("h", "http://i.example.com:80")
('12', 'l')
>>> mgr.find_user_password("i", "j.example.com")
('13', 'm')
>>> mgr.find_user_password("i", "j.example.com:80")
(None, None)
>>> mgr.find_user_password("i", "http://j.example.com")
('13', 'm')
>>> mgr.find_user_password("i", "http://j.example.com:80")
(None, None)
"""
def test_password_manager_default_port(self):
"""
>>> mgr = urllib2.HTTPPasswordMgr()
>>> add = mgr.add_password
The point to note here is that we can't guess the default port if there's
no scheme. This applies to both add_password and find_user_password.
>>> add("f", "http://g.example.com:80", "10", "j")
>>> add("g", "http://h.example.com", "11", "k")
>>> add("h", "i.example.com:80", "12", "l")
>>> add("i", "j.example.com", "13", "m")
>>> mgr.find_user_password("f", "g.example.com:100")
(None, None)
>>> mgr.find_user_password("f", "g.example.com:80")
('10', 'j')
>>> mgr.find_user_password("f", "g.example.com")
(None, None)
>>> mgr.find_user_password("f", "http://g.example.com:100")
(None, None)
>>> mgr.find_user_password("f", "http://g.example.com:80")
('10', 'j')
>>> mgr.find_user_password("f", "http://g.example.com")
('10', 'j')
>>> mgr.find_user_password("g", "h.example.com")
('11', 'k')
>>> mgr.find_user_password("g", "h.example.com:80")
('11', 'k')
>>> mgr.find_user_password("g", "http://h.example.com:80")
('11', 'k')
>>> mgr.find_user_password("h", "i.example.com")
(None, None)
>>> mgr.find_user_password("h", "i.example.com:80")
('12', 'l')
>>> mgr.find_user_password("h", "http://i.example.com:80")
('12', 'l')
>>> mgr.find_user_password("i", "j.example.com")
('13', 'm')
>>> mgr.find_user_password("i", "j.example.com:80")
(None, None)
>>> mgr.find_user_password("i", "http://j.example.com")
('13', 'm')
>>> mgr.find_user_password("i", "http://j.example.com:80")
(None, None)
"""
def test_password_manager_default_port(self):
"""
>>> mgr = urllib2.HTTPPasswordMgr()
>>> add = mgr.add_password
The point to note here is that we can't guess the default port if there's
no scheme. This applies to both add_password and find_user_password.
>>> add("f", "http://g.example.com:80", "10", "j")
>>> add("g", "http://h.example.com", "11", "k")
>>> add("h", "i.example.com:80", "12", "l")
>>> add("i", "j.example.com", "13", "m")
>>> mgr.find_user_password("f", "g.example.com:100")
(None, None)
>>> mgr.find_user_password("f", "g.example.com:80")
('10', 'j')
>>> mgr.find_user_password("f", "g.example.com")
(None, None)
>>> mgr.find_user_password("f", "http://g.example.com:100")
(None, None)
>>> mgr.find_user_password("f", "http://g.example.com:80")
('10', 'j')
>>> mgr.find_user_password("f", "http://g.example.com")
('10', 'j')
>>> mgr.find_user_password("g", "h.example.com")
('11', 'k')
>>> mgr.find_user_password("g", "h.example.com:80")
('11', 'k')
>>> mgr.find_user_password("g", "http://h.example.com:80")
('11', 'k')
>>> mgr.find_user_password("h", "i.example.com")
(None, None)
>>> mgr.find_user_password("h", "i.example.com:80")
('12', 'l')
>>> mgr.find_user_password("h", "http://i.example.com:80")
('12', 'l')
>>> mgr.find_user_password("i", "j.example.com")
('13', 'm')
>>> mgr.find_user_password("i", "j.example.com:80")
(None, None)
>>> mgr.find_user_password("i", "http://j.example.com")
('13', 'm')
>>> mgr.find_user_password("i", "http://j.example.com:80")
(None, None)
"""