def download_from_url(url):
    """Open ``url`` via a ``tc.ServerThread`` and return the treated result.

    When the settings returned by ``env_server.get_proxy()`` are enabled, a
    global urllib2 opener is installed first; it sends HTTP traffic through
    the proxy, with the login and password embedded in the proxy URL.

    Returns ``False`` when the treated thread reports failure, otherwise its
    ``result`` attribute.
    """
    proxy_settings = env_server.get_proxy()
    if proxy_settings['enabled']:
        # The template below re-adds the scheme, so strip it from the host.
        host = proxy_settings['server'].replace('http://', '')
        proxy_url = 'http://{login}:{pass}@{0}'.format(host, **proxy_settings)
        urllib2.install_opener(
            urllib2.build_opener(
                urllib2.ProxyHandler({'http': proxy_url}),
                urllib2.HTTPBasicAuthHandler(),
                urllib2.HTTPHandler,
            )
        )

    worker = tc.ServerThread(env_inst.ui_main)
    worker.kwargs = dict(url=url, timeout=1)
    worker.routine = urllib2.urlopen
    # run() is invoked directly here (not start()).
    worker.run()

    outcome = tc.treat_result(worker, silent=True)
    if outcome.isFailed():
        return False
    return outcome.result
# python类HTTPBasicAuthHandler()的实例源码 (example source code using the Python class HTTPBasicAuthHandler())
def run(self):
password = getword()
try:
print "-"*12
print "User:",username,"Password:",password
req = urllib2.Request(sys.argv[1])
passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
passman.add_password(None, sys.argv[1], username, password)
authhandler = urllib2.HTTPBasicAuthHandler(passman)
opener = urllib2.build_opener(authhandler)
fd = opener.open(req)
print "\t\n\n[+] Login successful: Username:",username,"Password:",password,"\n"
print "[+] Retrieved", fd.geturl()
info = fd.info()
for key, value in info.items():
print "%s = %s" % (key, value)
sys.exit(2)
except (urllib2.HTTPError,socket.error):
pass
def run(self):
username, password = getword()
try:
print "-"*12
print "User:",username,"Password:",password
req = urllib2.Request(sys.argv[1])
passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
passman.add_password(None, sys.argv[1], username, password)
authhandler = urllib2.HTTPBasicAuthHandler(passman)
opener = urllib2.build_opener(authhandler)
fd = opener.open(req)
print "\t\n\nUsername:",username,"Password:",password,"----- Login successful!!!\n\n"
print "Retrieved", fd.geturl()
info = fd.info()
for key, value in info.items():
print "%s = %s" % (key, value)
sys.exit(2)
except (urllib2.HTTPError, httplib.BadStatusLine,socket.error), msg:
print "An error occurred:", msg
pass
def _update_opener(self):
    '''
    Rebuild the global urllib2 opener from the current settings and install
    it with :func:`urllib2.install_opener`.

    The opener always gets a cookie processor bound to ``self._cj``, a
    basic-auth handler and an HTTP handler (``debuglevel=1`` when
    ``self._http_debug`` is truthy). An HTTP proxy handler for
    ``self._proxy`` is added when that attribute is truthy.
    '''
    debug_level = 1 if self._http_debug else 0
    http_handler = urllib2.HTTPHandler(debuglevel=debug_level)

    handlers = [urllib2.HTTPCookieProcessor(self._cj)]
    if self._proxy:
        handlers.append(urllib2.ProxyHandler({'http': self._proxy}))
    handlers.append(urllib2.HTTPBasicAuthHandler())
    handlers.append(http_handler)

    urllib2.install_opener(urllib2.build_opener(*handlers))
def _update_opener(self):
    '''
    Rebuild the global urllib2 opener from the current settings and install
    it with :func:`urllib2.install_opener`.

    The opener always gets a cookie processor bound to ``self._cj``, a
    basic-auth handler and an HTTP handler (``debuglevel=1`` when
    ``self._http_debug`` is truthy). An HTTP proxy handler for
    ``self._proxy`` is added when that attribute is truthy.
    '''
    debug_level = 1 if self._http_debug else 0
    http_handler = urllib2.HTTPHandler(debuglevel=debug_level)

    handlers = [urllib2.HTTPCookieProcessor(self._cj)]
    if self._proxy:
        handlers.append(urllib2.ProxyHandler({'http': self._proxy}))
    handlers.append(urllib2.HTTPBasicAuthHandler())
    handlers.append(http_handler)

    urllib2.install_opener(urllib2.build_opener(*handlers))
def _update_opener(self):
    '''
    Rebuild the global urllib2 opener from the current settings and install
    it with :func:`urllib2.install_opener`.

    The opener always gets a cookie processor bound to ``self._cj``, a
    basic-auth handler and an HTTP handler (``debuglevel=1`` when
    ``self._http_debug`` is truthy). An HTTP proxy handler for
    ``self._proxy`` is added when that attribute is truthy.
    '''
    debug_level = 1 if self._http_debug else 0
    http_handler = urllib2.HTTPHandler(debuglevel=debug_level)

    handlers = [urllib2.HTTPCookieProcessor(self._cj)]
    if self._proxy:
        handlers.append(urllib2.ProxyHandler({'http': self._proxy}))
    handlers.append(urllib2.HTTPBasicAuthHandler())
    handlers.append(http_handler)

    urllib2.install_opener(urllib2.build_opener(*handlers))
def authenticate(top_level_url=u'https://api.github.com'):
    """Install a global opener that answers HTTP basic-auth challenges.

    The username is read from the ``GH_AUTH_USER`` environment variable and
    the password from ``GH_AUTH_PASS``; whichever is missing is prompted for
    interactively. Pressing Ctrl-C at a prompt exits the process.

    Works with both ``urllib.request`` (Python 3) and ``urllib2`` (Python 2).

    :param top_level_url: URL prefix the credentials are registered for.
    """
    try:
        if 'GH_AUTH_USER' in os.environ:
            username = os.environ['GH_AUTH_USER']
        else:
            try:
                username = raw_input(u'Username: ')
            except NameError:
                # Python 3 has no raw_input.
                username = input(u'Username: ')
        if 'GH_AUTH_PASS' in os.environ:
            # Fixed: this previously read GH_AUTH_USER, so the username was
            # sent as the password whenever GH_AUTH_PASS was set.
            password = os.environ['GH_AUTH_PASS']
        else:
            password = getpass.getpass(u'Password: ')
    except KeyboardInterrupt:
        sys.exit(u'')

    try:
        import urllib.request as urllib_alias
    except ImportError:
        import urllib2 as urllib_alias

    password_mgr = urllib_alias.HTTPPasswordMgrWithDefaultRealm()
    # realm=None registers the credentials as the catch-all for this URL.
    password_mgr.add_password(None, top_level_url, username, password)
    handler = urllib_alias.HTTPBasicAuthHandler(password_mgr)
    opener = urllib_alias.build_opener(handler)
    urllib_alias.install_opener(opener)
def test_basic_auth_with_unquoted_realm(self):
    """Run the shared basic-auth scenario with an unquoted realm in the
    challenge header, under a check for the "unquoted" UserWarning."""
    realm = "ACME Widget Store"
    protected_url = "http://acme.example.com/protected"

    opener = OpenerDirector()
    password_manager = MockPasswordManager()
    auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
    # The realm is deliberately not wrapped in quotes here.
    challenge = 'WWW-Authenticate: Basic realm=%s\r\n\r\n' % realm
    http_handler = MockHTTPHandler(401, challenge)
    for handler in (auth_handler, http_handler):
        opener.add_handler(handler)

    expected_warning = ("Basic Auth Realm was unquoted", UserWarning)
    with test_support.check_warnings(expected_warning):
        self._test_basic_auth(
            opener, auth_handler, "Authorization",
            realm, http_handler, password_manager,
            protected_url, protected_url)
def test_basic_auth_with_unquoted_realm(self):
    """Run the shared basic-auth scenario with an unquoted realm in the
    challenge header, under a check for the "unquoted" UserWarning."""
    realm = "ACME Widget Store"
    protected_url = "http://acme.example.com/protected"

    opener = OpenerDirector()
    password_manager = MockPasswordManager()
    auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
    # The realm is deliberately not wrapped in quotes here.
    challenge = 'WWW-Authenticate: Basic realm=%s\r\n\r\n' % realm
    http_handler = MockHTTPHandler(401, challenge)
    for handler in (auth_handler, http_handler):
        opener.add_handler(handler)

    expected_warning = ("Basic Auth Realm was unquoted", UserWarning)
    with test_support.check_warnings(expected_warning):
        self._test_basic_auth(
            opener, auth_handler, "Authorization",
            realm, http_handler, password_manager,
            protected_url, protected_url)
def __init__(self, configuration):
    """Initialise from ``configuration`` and install a proxy opener if set.

    ``self.setup(configuration)`` runs first. ``self.echo`` is taken from
    the ``"ECHO"`` entry when present, else ``None``. If proxy scheme, host
    and port are all set, a global urllib2 opener routing that scheme
    through the proxy is installed. When both username and password are
    set they are embedded in the proxy URL and a basic-auth handler is
    added.
    """
    self.setup(configuration)
    self.echo = configuration['ECHO'] if "ECHO" in configuration else None

    # Nothing more to do unless the proxy is fully specified.
    if self.proxy_scheme is None or self.proxy_host is None or \
            self.proxy_port is None:
        return

    credentials = ""
    if self.proxy_username is not None and self.proxy_password is not None:
        credentials = self.proxy_username + ":" + self.proxy_password + "@"

    authority = credentials + self.proxy_host + ":" + self.proxy_port
    proxy_handler = urllib2.ProxyHandler(
        {self.proxy_scheme: self.proxy_scheme + "://" + authority})

    if credentials:
        opener = urllib2.build_opener(
            proxy_handler, urllib2.HTTPBasicAuthHandler(), urllib2.HTTPHandler)
    else:
        opener = urllib2.build_opener(proxy_handler)
    urllib2.install_opener(opener)
def test_basic_auth_with_unquoted_realm(self):
    """Run the shared basic-auth scenario with an unquoted realm in the
    challenge header, under a check for the "unquoted" UserWarning."""
    realm = "ACME Widget Store"
    protected_url = "http://acme.example.com/protected"

    opener = OpenerDirector()
    password_manager = MockPasswordManager()
    auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
    # The realm is deliberately not wrapped in quotes here.
    challenge = 'WWW-Authenticate: Basic realm=%s\r\n\r\n' % realm
    http_handler = MockHTTPHandler(401, challenge)
    for handler in (auth_handler, http_handler):
        opener.add_handler(handler)

    expected_warning = ("Basic Auth Realm was unquoted", UserWarning)
    with test_support.check_warnings(expected_warning):
        self._test_basic_auth(
            opener, auth_handler, "Authorization",
            realm, http_handler, password_manager,
            protected_url, protected_url)
def update_tags(self, tags, expression):
    """POST ``tags`` to the ``addTags`` endpoint under ``self.MOLOCH_URL``.

    The query string carries ``date=-1`` and the given ``expression``.
    Basic auth is used when ``self.MOLOCH_AUTH == "basic"``, otherwise
    digest auth with ``self.MOLOCH_REALM``. Any exception is logged as a
    warning and swallowed; nothing is returned.
    """
    # support cases where we might be doing basic auth through a proxy
    if self.MOLOCH_AUTH == "basic":
        password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
        password_mgr.add_password(None, self.MOLOCH_URL, self.MOLOCH_USER, self.MOLOCH_PASSWORD)
        auth_handler = urllib2.HTTPBasicAuthHandler(password_mgr)
    else:
        auth_handler = urllib2.HTTPDigestAuthHandler()
        auth_handler.add_password(self.MOLOCH_REALM, self.MOLOCH_URL, self.MOLOCH_USER, self.MOLOCH_PASSWORD)
    opener = urllib2.build_opener(auth_handler)

    post_body = urllib.urlencode({'tags' : tags})
    query = urllib.urlencode({'date' : "-1",'expression' : expression})
    tag_url = self.MOLOCH_URL + 'addTags?' + query
    try:
        response = opener.open(tag_url, data=post_body)
        if response.code == 200:
            # The body is parsed but the result is not used; a parse
            # failure ends up in the warning below.
            json.loads(response.read())
    except Exception as e:
        log.warning("Moloch: Unable to update tags %s" % (e))
def test_basic_auth_with_unquoted_realm(self):
    """Run the shared basic-auth scenario with an unquoted realm in the
    challenge header, under a check for the "unquoted" UserWarning."""
    realm = "ACME Widget Store"
    protected_url = "http://acme.example.com/protected"

    opener = OpenerDirector()
    password_manager = MockPasswordManager()
    auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
    # The realm is deliberately not wrapped in quotes here.
    challenge = 'WWW-Authenticate: Basic realm=%s\r\n\r\n' % realm
    http_handler = MockHTTPHandler(401, challenge)
    for handler in (auth_handler, http_handler):
        opener.add_handler(handler)

    expected_warning = ("Basic Auth Realm was unquoted", UserWarning)
    with test_support.check_warnings(expected_warning):
        self._test_basic_auth(
            opener, auth_handler, "Authorization",
            realm, http_handler, password_manager,
            protected_url, protected_url)
def __init__(self, base_url, exc_class=None, logger=None):
    """
    Creates an HTTP(S) client to connect to the Cloudera Manager API.
    @param base_url: The base url to the API; trailing slashes are stripped.
    @param exc_class: An exception class to handle non-200 results.
                      Falls back to RestException when not given.
    @param logger: Logger to store on the client; falls back to LOG.
    """
    self._base_url = base_url.rstrip('/')
    self._exc_class = RestException if not exc_class else exc_class
    self._logger = LOG if not logger else logger
    self._headers = { }

    # The password manager starts empty; credentials are set on it later.
    self._passmgr = urllib2.HTTPPasswordMgrWithDefaultRealm()

    handlers = [
        HTTPErrorProcessor(),
        urllib2.HTTPCookieProcessor(cookielib.CookieJar()),
        urllib2.HTTPBasicAuthHandler(self._passmgr),
    ]
    self._opener = urllib2.build_opener(*handlers)
def httpConnection(url, proxy):
    """Open ``url``, going through ``proxy.url`` when one is configured.

    When ``proxy.url`` is truthy, a global urllib2 opener is installed with
    an HTTP proxy handler and an auth handler holding ``proxy.user`` /
    ``proxy.password``. The auth handler is NTLM when
    ``proxy.auth == "ntlm"``, otherwise basic. With no proxy URL the
    currently installed opener is used unchanged.

    :param url: URL to open.
    :param proxy: object exposing ``url``, ``user``, ``password`` and ``auth``.
    :returns: the response object from ``urllib2.urlopen``.
    """
    # TODO: enable NTLM authentication
    if proxy.url:
        # Credentials are registered under realm None. Only the default-realm
        # manager treats None as a catch-all; a plain HTTPPasswordMgr would
        # never match them against the realm sent in a challenge.
        passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
        passman.add_password(None, proxy.url, proxy.user, proxy.password)
        if proxy.auth == "ntlm":
            auth = HTTPNtlmAuthHandler.HTTPNtlmAuthHandler(passman)
        else:
            auth = urllib2.HTTPBasicAuthHandler(passman)

        # Use a separate name: the original rebound ``proxy`` to the handler
        # and then passed ``proxy.url`` (an AttributeError) to build_opener.
        proxy_handler = urllib2.ProxyHandler({'http': proxy.url})
        opener = urllib2.build_opener(proxy_handler, auth, urllib2.HTTPHandler)
        urllib2.install_opener(opener)
    return urllib2.urlopen(url)
def threader(site):
username, password = getword()
global logins
try:
print "-"*12
print "User:",username,"Password:",password
req = urllib2.Request(site)
passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
passman.add_password(None, site, username, password)
authhandler = urllib2.HTTPBasicAuthHandler(passman)
opener = urllib2.build_opener(authhandler)
fd = opener.open(req)
site = urllib2.urlopen(fd.geturl()).read()
print "\n[+] Checking the authenticity of the login...\n"
if not re.search(('denied'), site.lower()):
print "\t\n\n[+] Username:",username,"Password:",password,"----- Login successful!!!\n\n"
print "[+] Writing Successful Login:",sys.argv[5],"\n"
logins +=1
file = open(sys.argv[5], "a")
file.writelines("Site: "+site+" Username: "+username+ " Password: "+password+"\n")
file.close()
print "Retrieved", fd.geturl()
info = fd.info()
for key, value in info.items():
print "%s = %s" % (key, value)
else:
print "- Redirection"
except (urllib2.HTTPError, httplib.BadStatusLine,socket.error), msg:
print "An error occurred:", msg
pass
def run(self):
username, password = getword()
try:
print "-"*12
print "User:",username,"Password:",password
auth_handler = urllib2.HTTPBasicAuthHandler()
auth_handler.add_password("cPanel", server, base64encodestring(username)[:-1], base64encodestring(password)[:-1])
opener = urllib2.build_opener(auth_handler)
urllib2.install_opener(opener)
urllib2.urlopen(server)
print "\t\n\nUsername:",username,"Password:",password,"----- Login successful!!!\n\n"
except (urllib2.HTTPError, httplib.BadStatusLine), msg:
#print "An error occurred:", msg
pass
def __init__(self, api_key, url=GCM_URL, proxy=None):
    """ api_key : google api key
        url: url of gcm service.
        proxy: can be string "http://host:port" or dict {'https':'host:port'}

        When a proxy is given, a global urllib2 opener using it is
        installed; a string proxy is keyed by the scheme of ``url``.
    """
    self.api_key = api_key
    self.url = url
    if not proxy:
        return

    if isinstance(proxy, basestring):
        # Key the proxy by the text before the first ':' of the service URL.
        scheme = url.partition(':')[0]
        proxy = {scheme: proxy}

    basic_auth = urllib2.HTTPBasicAuthHandler()
    proxy_handler = urllib2.ProxyHandler(proxy)
    urllib2.install_opener(
        urllib2.build_opener(proxy_handler, basic_auth, urllib2.HTTPHandler))
def __build_url_opener(self, uri):
    """
    Build the url opener for managing HTTP Basic Authentication.

    ``self.client_id`` / ``self.client_secret`` are registered for ``uri``
    under the default (``None``) realm, and the resulting opener is
    returned without being installed globally.
    """
    credentials = urllib2.HTTPPasswordMgrWithDefaultRealm()
    credentials.add_password(None, uri, self.client_id, self.client_secret)
    return urllib2.build_opener(urllib2.HTTPBasicAuthHandler(credentials))
def __build_url_opener(self, uri):
    """
    Build the url opener for managing HTTP Basic Authentication.

    ``self.client_id`` / ``self.client_secret`` are registered for ``uri``
    under the default (``None``) realm, and the resulting opener is
    returned without being installed globally.
    """
    credentials = urllib2.HTTPPasswordMgrWithDefaultRealm()
    credentials.add_password(None, uri, self.client_id, self.client_secret)
    return urllib2.build_opener(urllib2.HTTPBasicAuthHandler(credentials))
def __build_url_opener(self, uri):
    """
    Build the url opener for managing HTTP Basic Authentication.

    ``self.client_id`` / ``self.client_secret`` are registered for ``uri``
    under the default (``None``) realm, and the resulting opener is
    returned without being installed globally.
    """
    credentials = urllib2.HTTPPasswordMgrWithDefaultRealm()
    credentials.add_password(None, uri, self.client_id, self.client_secret)
    return urllib2.build_opener(urllib2.HTTPBasicAuthHandler(credentials))
def getUrl(self,url, ischunkDownloading=False):
try:
post=None
print 'url',url
#openner = urllib2.build_opener(urllib2.HTTPHandler, urllib2.HTTPSHandler)
cookie_handler = urllib2.HTTPCookieProcessor(self.cookieJar)
openner = urllib2.build_opener(cookie_handler, urllib2.HTTPBasicAuthHandler(), urllib2.HTTPHandler())
if post:
req = urllib2.Request(url, post)
else:
req = urllib2.Request(url)
ua_header=False
if self.clientHeader:
for n,v in self.clientHeader:
req.add_header(n,v)
if n=='User-Agent':
ua_header=True
if not ua_header:
req.add_header('User-Agent','Mozilla/5.0 (Windows NT 6.1; Win64; x64; Trident/7.0; rv:11.0) like Gecko')
#response = urllib2.urlopen(req)
if self.proxy and ( (not ischunkDownloading) or self.use_proxy_for_chunks ):
req.set_proxy(self.proxy, 'http')
response = openner.open(req)
data=response.read()
return data
except:
print 'Error in getUrl'
traceback.print_exc()
return None
def getUrl(self,url, ischunkDownloading=False):
try:
post=None
print 'url',url
#openner = urllib2.build_opener(urllib2.HTTPHandler, urllib2.HTTPSHandler)
cookie_handler = urllib2.HTTPCookieProcessor(self.cookieJar)
openner = urllib2.build_opener(cookie_handler, urllib2.HTTPBasicAuthHandler(), urllib2.HTTPHandler())
if post:
req = urllib2.Request(url, post)
else:
req = urllib2.Request(url)
ua_header=False
if self.clientHeader:
for n,v in self.clientHeader:
req.add_header(n,v)
if n=='User-Agent':
ua_header=True
if not ua_header:
req.add_header('User-Agent','Mozilla/5.0 (Windows NT 6.1; Win64; x64; Trident/7.0; rv:11.0) like Gecko')
#response = urllib2.urlopen(req)
if self.proxy and ( (not ischunkDownloading) or self.use_proxy_for_chunks ):
req.set_proxy(self.proxy, 'http')
response = openner.open(req)
data=response.read()
return data
except:
print 'Error in getUrl'
traceback.print_exc()
return None
def __build_url_opener(self, uri):
    """
    Build the url opener for managing HTTP Basic Authentication.

    ``self.client_id`` / ``self.client_secret`` are registered for ``uri``
    under the default (``None``) realm, and the resulting opener is
    returned without being installed globally.
    """
    credentials = urllib2.HTTPPasswordMgrWithDefaultRealm()
    credentials.add_password(None, uri, self.client_id, self.client_secret)
    return urllib2.build_opener(urllib2.HTTPBasicAuthHandler(credentials))
def test_basic_auth_success(self):
    """Opening the server URL with the correct credentials must succeed."""
    basic_handler = urllib2.HTTPBasicAuthHandler()
    basic_handler.add_password(
        self.REALM, self.server_url, self.USER, self.PASSWD)
    opener = urllib2.build_opener(basic_handler)
    urllib2.install_opener(opener)
    try:
        response = urllib2.urlopen(self.server_url)
        self.assertTrue(response)
    except urllib2.HTTPError:
        self.fail("Basic Auth Failed for url: %s" % self.server_url)
    except Exception as e:
        # Anything else is passed on unchanged.
        raise e
def test_basic_auth_httperror(self):
    """Opening the server URL with a wrong password must raise HTTPError."""
    basic_handler = urllib2.HTTPBasicAuthHandler()
    basic_handler.add_password(
        self.REALM, self.server_url, self.USER, self.INCORRECT_PASSWD)
    opener = urllib2.build_opener(basic_handler)
    urllib2.install_opener(opener)
    self.assertRaises(urllib2.HTTPError, urllib2.urlopen, self.server_url)
def test_basic_auth(self, quote_char='"'):
    """Run the shared basic-auth scenario with the realm wrapped in
    ``quote_char`` in the challenge header."""
    realm = "ACME Widget Store"
    protected_url = "http://acme.example.com/protected"

    opener = OpenerDirector()
    password_manager = MockPasswordManager()
    auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
    challenge = ('WWW-Authenticate: Basic realm=%s%s%s\r\n\r\n' %
                 (quote_char, realm, quote_char))
    http_handler = MockHTTPHandler(401, challenge)
    for handler in (auth_handler, http_handler):
        opener.add_handler(handler)

    self._test_basic_auth(
        opener, auth_handler, "Authorization",
        realm, http_handler, password_manager,
        protected_url, protected_url)
def test_basic_auth_success(self):
    """Opening the server URL with the correct credentials must succeed."""
    basic_handler = urllib2.HTTPBasicAuthHandler()
    basic_handler.add_password(
        self.REALM, self.server_url, self.USER, self.PASSWD)
    opener = urllib2.build_opener(basic_handler)
    urllib2.install_opener(opener)
    try:
        response = urllib2.urlopen(self.server_url)
        self.assertTrue(response)
    except urllib2.HTTPError:
        self.fail("Basic Auth Failed for url: %s" % self.server_url)
    except Exception as e:
        # Anything else is passed on unchanged.
        raise e
def test_basic_auth_httperror(self):
    """Opening the server URL with a wrong password must raise HTTPError."""
    basic_handler = urllib2.HTTPBasicAuthHandler()
    basic_handler.add_password(
        self.REALM, self.server_url, self.USER, self.INCORRECT_PASSWD)
    opener = urllib2.build_opener(basic_handler)
    urllib2.install_opener(opener)
    self.assertRaises(urllib2.HTTPError, urllib2.urlopen, self.server_url)
def test_basic_auth(self, quote_char='"'):
    """Run the shared basic-auth scenario with the realm wrapped in
    ``quote_char`` in the challenge header."""
    realm = "ACME Widget Store"
    protected_url = "http://acme.example.com/protected"

    opener = OpenerDirector()
    password_manager = MockPasswordManager()
    auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
    challenge = ('WWW-Authenticate: Basic realm=%s%s%s\r\n\r\n' %
                 (quote_char, realm, quote_char))
    http_handler = MockHTTPHandler(401, challenge)
    for handler in (auth_handler, http_handler):
        opener.add_handler(handler)

    self._test_basic_auth(
        opener, auth_handler, "Authorization",
        realm, http_handler, password_manager,
        protected_url, protected_url)