def __init__(self, username, password=None):
    """Store the database URL and build the auth/cookie helper objects.

    Args:
        username: User name registered with the password manager for the
            database URL.
        password: Password for ``username``. When ``None`` it is obtained
            by calling ``getpass()``.

    The opener and both handlers are only constructed here; this method
    does not attach the handlers to the opener. ``cookie_jar`` is not
    defined in this method and is looked up from an enclosing scope.
    """
    # Fall back to an interactive prompt only when no password was passed.
    secret = getpass() if password is None else password

    # Address of the database the credentials are registered for.
    self.db_url = "http://galaxy-catalogue.dur.ac.uk:8080/Eagle"

    # Credentials are stored under the default realm (realm=None).
    self.password_mgr = manager = urllib2.HTTPPasswordMgrWithDefaultRealm()
    manager.add_password(None, self.db_url, username, secret)

    # Objects used for authenticated, cookie-aware requests.
    self.opener = urllib2.OpenerDirector()
    self.auth_handler = urllib2.HTTPBasicAuthHandler(manager)
    self.cookie_handler = urllib2.HTTPCookieProcessor(cookie_jar)
## This function executes an SQL query on the database and returns the result as a record array.
python类HTTPBasicAuthHandler()的实例源码
def __init__(self, username, password=None):
    """Store the database URL and build the auth/cookie helper objects.

    Args:
        username: User name registered with the password manager for the
            database URL.
        password: Password for ``username``. When ``None`` it is obtained
            by calling ``getpass()``.

    The opener and both handlers are only constructed here; this method
    does not attach the handlers to the opener. ``cookie_jar`` is not
    defined in this method and is looked up from an enclosing scope.
    """
    # Fall back to an interactive prompt only when no password was passed.
    secret = getpass() if password is None else password

    # Address of the database the credentials are registered for.
    self.db_url = "http://galaxy-catalogue.dur.ac.uk:8080/Eagle"

    # Credentials are stored under the default realm (realm=None).
    self.password_mgr = manager = urllib2.HTTPPasswordMgrWithDefaultRealm()
    manager.add_password(None, self.db_url, username, secret)

    # Objects used for authenticated, cookie-aware requests.
    self.opener = urllib2.OpenerDirector()
    self.auth_handler = urllib2.HTTPBasicAuthHandler(manager)
    self.cookie_handler = urllib2.HTTPCookieProcessor(cookie_jar)
## This function executes an SQL query on the database and returns the result as a record array.
def __init__(self, *args, **kwargs):
    """Initialise the base basic-auth handler plus retry bookkeeping.

    All positional and keyword arguments are forwarded unchanged to
    ``urllib2.HTTPBasicAuthHandler.__init__``.
    """
    urllib2.HTTPBasicAuthHandler.__init__(self, *args, **kwargs)
    # Retry state: the set of request hashes already seen, and a counter
    # of retries that starts at zero.
    self.retried_req, self.retried_count = set(), 0
def http_error_auth_reqed(self, auth_header, host, req, headers):
    """Cap the number of basic-auth retries before delegating to the base class.

    The first time a request (identified by ``hash(req)``) is seen, the
    retry counter is reset to zero. Each further call for an already-seen
    request increments the counter; once the counter is greater than 5 an
    ``urllib2.HTTPError`` with code 401 is raised instead of retrying.

    Note that ``retried_count`` is a single counter on the handler, not one
    counter per request.

    Returns:
        Whatever ``urllib2.HTTPBasicAuthHandler.http_error_auth_reqed``
        returns for the same arguments.

    Raises:
        urllib2.HTTPError: When an already-seen request is challenged again
            after the counter has exceeded 5.
    """
    request_key = hash(req)
    if request_key in self.retried_req:
        # Seen before: give up once the retry budget is exhausted.
        if self.retried_count > 5:
            raise urllib2.HTTPError(req.get_full_url(), 401,
                                    "basic auth failed", headers, None)
        self.retried_count += 1
    else:
        # New request: remember it and start counting from zero.
        self.retried_req.add(request_key)
        self.retried_count = 0

    base_handler = urllib2.HTTPBasicAuthHandler
    return base_handler.http_error_auth_reqed(
        self, auth_header, host, req, headers)
def call_api(self, path):
    """Call the REST API and return the response decoded from JSON.

    The request URL is built from ``self.protocol``, ``self.host_name``,
    ``self.port`` and ``path``; ``self.user_name`` / ``self.password`` are
    registered for that URL and sent via HTTP basic authentication.

    Args:
        path: API path appended after ``/api/`` in the request URL.

    Returns:
        The object produced by ``json.loads`` on the response body.
    """
    url = '{0}://{1}:{2}/api/{3}'.format(self.protocol, self.host_name,
                                         self.port, path)
    password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
    password_mgr.add_password(None, url, self.user_name, self.password)
    opener = urllib2.build_opener(urllib2.HTTPBasicAuthHandler(password_mgr))
    # Lazy %-formatting: no string is built unless debug logging is enabled,
    # and a non-string path no longer raises TypeError here.
    logging.debug('Issue a rabbit API call to get data on %s', path)
    response = opener.open(url)
    try:
        return json.loads(response.read())
    finally:
        # Always release the connection, even if reading or parsing fails.
        response.close()
def test_urlopen():
    """Call the URL-opening entry points of urllib, urllib2, urllib.request and six.

    Every call targets the same ``file://`` URL; the retrieve-to-path calls
    use the same destination path. Statements run in a fixed order, grouped
    by the library being exercised.
    """
    target = 'file:///bin/ls'
    copy_path = '/bin/ls2'

    # urllib
    url = urllib.quote(target)
    urllib.urlopen(url, 'blah', 32)
    urllib.urlretrieve(target, copy_path)
    plain_opener = urllib.URLopener()
    plain_opener.open(target)
    plain_opener.retrieve(target)
    fancy_opener = urllib.FancyURLopener()
    fancy_opener.open(target)
    fancy_opener.retrieve(target)

    # urllib2
    handler = urllib2.HTTPBasicAuthHandler()
    handler.add_password(realm='test',
                         uri='http://mysite.com',
                         user='bob')
    auth_opener = urllib2.build_opener(handler)
    urllib2.install_opener(auth_opener)
    urllib2.urlopen(target)
    urllib2.Request(target)

    # Python 3
    urllib.request.urlopen(target)
    urllib.request.urlretrieve(target, copy_path)
    plain_opener3 = urllib.request.URLopener()
    plain_opener3.open(target)
    plain_opener3.retrieve(target)
    fancy_opener3 = urllib.request.FancyURLopener()
    fancy_opener3.open(target)
    fancy_opener3.retrieve(target)

    # Six
    six.moves.urllib.request.urlopen(target)
    six.moves.urllib.request.urlretrieve(target, copy_path)
    six_plain_opener = six.moves.urllib.request.URLopener()
    six_plain_opener.open(target)
    six_plain_opener.retrieve(target)
    six_fancy_opener = six.moves.urllib.request.FancyURLopener()
    six_fancy_opener.open(target)
    six_fancy_opener.retrieve(target)
def getUrl(self, url, ischunkDownloading=False):
    """Fetch ``url`` and return the raw response body, or None on failure.

    Headers from ``self.clientHeader`` (an iterable of name/value pairs)
    are added to the request; a default Firefox ``User-Agent`` is added
    when none of them is named ``User-Agent``. The request is routed via
    ``self.proxy`` when it is set and either this is not a chunk download
    or ``self.use_proxy_for_chunks`` is truthy.

    Args:
        url: Address to download.
        ischunkDownloading: True when the request fetches a chunk; used
            only to decide whether the proxy applies.

    Returns:
        The response body, or ``None`` if any ``Exception`` occurred (the
        traceback is printed).
    """
    try:
        print('url %s' % (url,))
        opener = urllib2.build_opener(urllib2.HTTPHandler, urllib2.HTTPSHandler)
        req = urllib2.Request(url)

        ua_header = False
        if self.clientHeader:
            for n, v in self.clientHeader:
                req.add_header(n, v)
                if n == 'User-Agent':
                    ua_header = True
        if not ua_header:
            req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:30.0) Gecko/20100101 Firefox/30.0')

        if self.proxy and ((not ischunkDownloading) or self.use_proxy_for_chunks):
            req.set_proxy(self.proxy, 'http')

        response = opener.open(req)
        try:
            return response.read()
        finally:
            # Release the connection even when read() fails.
            response.close()
    except Exception:
        # Best-effort download: report and signal failure with None.
        # KeyboardInterrupt/SystemExit are deliberately not swallowed.
        print('Error in getUrl')
        traceback.print_exc()
        return None
def auth(usr, pwd, dhusAlt=None):
    """Fill the global SITE mapping with the auth header and site URL templates.

    Args:
        usr: User name placed in the Basic authorization value.
        pwd: Password placed in the Basic authorization value.
        dhusAlt: Optional site name; ``None`` selects "SciHub". The name
            "CODE-DE" switches the base URL and the collection filter.

    Sets SITE["NAME"], SITE["BASE"], SITE["AUTH"], SITE["SEARCH"],
    SITE["CHECKSUM"], SITE["SAFEZIP"] and SITE["SAFEROOT"].
    """
    import base64

    SITE["NAME"] = "SciHub" if dhusAlt is None else dhusAlt
    if SITE["NAME"] == "CODE-DE":
        site = "https://code-de.org/"
        collspec = "platformname:Sentinel-2"
    else:
        site = "https://scihub.copernicus.eu/"
        collspec = "producttype:S2MSI%s"
    SITE["BASE"] = site + "dhus/"

    # Installing a global urllib2.HTTPBasicAuthHandler opener does not work
    # transparently in combination with proxy support, so the header value
    # is precomputed here instead (workaround: urlOpen()).
    credentials = "%s:%s" % (usr, pwd)
    SITE["AUTH"] = "Basic " + base64.b64encode(credentials)

    search_query = "search?format=xml&sortedby=beginposition&order=desc&rows=%d&q=%s" % (ROWSSTEP, collspec)
    SITE["SEARCH"] = SITE["BASE"] + search_query

    # The '%s' placeholders are left in place for later substitution.
    product = SITE["BASE"] + "odata/v1/Products('%s')/"
    SITE["CHECKSUM"] = product + "Checksum/Value/$value"
    SITE["SAFEZIP"] = product + "$value"
    SITE["SAFEROOT"] = product + "Nodes('%s.SAFE')/"
oauth20_account.py 文件源码
项目:rekall-agent-server
作者: rekall-innovations
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def __build_url_opener(self, uri):
    """Return a urllib2 opener configured for HTTP Basic Authentication.

    ``self.client_id`` and ``self.client_secret`` are registered as user
    name and password for ``uri`` under the default realm.

    Args:
        uri: The URI the credentials apply to.

    Returns:
        The opener produced by ``urllib2.build_opener`` with a basic-auth
        handler installed.
    """
    credentials = urllib2.HTTPPasswordMgrWithDefaultRealm()
    # realm=None registers the credentials for any realm at this URI.
    credentials.add_password(None, uri, self.client_id, self.client_secret)
    return urllib2.build_opener(urllib2.HTTPBasicAuthHandler(credentials))
def authorize_basic(user=None, password=None, realm=None, uri=None):
    """Install a global urllib2 opener that uses HTTP basic authentication.

    Call before calling download. Nothing is installed unless all four
    arguments are truthy.

    Args:
        user: User name for the protected resource.
        password: Password for ``user``.
        realm: Authentication realm the credentials belong to.
        uri: URI the credentials apply to.
    """
    if not (user and password and realm and uri):
        # Incomplete credentials: leave the current global opener untouched.
        return
    handler = urllib2.HTTPBasicAuthHandler()
    handler.add_password(realm=realm, uri=uri, user=user, passwd=password)
    urllib2.install_opener(urllib2.build_opener(handler))
def _login_BA(self):
    """Fetch ``self.url`` using HTTP basic authentication.

    ``self.username`` / ``self.password`` are registered for ``self.url``
    under the default realm, and the request uses an 8 second timeout.

    Returns:
        The response body, or ``None`` when the request failed for a reason
        other than an HTTP 401.

    Raises:
        Exception: With message 'Login credentials incorrect.' when the
            server answered with HTTP 401.
    """
    try:
        # Creates a PasswordMgr instance
        passmanager = urllib2.HTTPPasswordMgrWithDefaultRealm()
        passmanager.add_password(None, self.url, self.username, self.password)
        # Creates an auth handling object and builds it with opener
        auth = urllib2.HTTPBasicAuthHandler(passmanager)
        opener = urllib2.build_opener(auth)
        response = opener.open(self.url, timeout=8)
        try:
            return response.read()
        finally:
            # Close the connection even if read() raises.
            response.close()
    except Exception as e:
        # Prefer the numeric status when the exception carries one; the
        # message check is kept so previously detected errors still match.
        if getattr(e, 'code', None) == 401 or 'Error 401' in str(e):
            raise Exception('Login credentials incorrect.')
        # Any other failure is deliberately reported as "no data".
        return None
def __init__(self, *args, **kwargs):
    """Initialise the base basic-auth handler plus retry bookkeeping.

    All positional and keyword arguments are forwarded unchanged to
    ``urllib2.HTTPBasicAuthHandler.__init__``.
    """
    urllib2.HTTPBasicAuthHandler.__init__(self, *args, **kwargs)
    # Retry state: the set of request hashes already seen, and a counter
    # of retries that starts at zero.
    self.retried_req, self.retried_count = set(), 0
def http_error_auth_reqed(self, auth_header, host, req, headers):
    """Cap the number of basic-auth retries before delegating to the base class.

    The first time a request (identified by ``hash(req)``) is seen, the
    retry counter is reset to zero. Each further call for an already-seen
    request increments the counter; once the counter is greater than 5 an
    ``urllib2.HTTPError`` with code 401 is raised instead of retrying.

    Note that ``retried_count`` is a single counter on the handler, not one
    counter per request.

    Returns:
        Whatever ``urllib2.HTTPBasicAuthHandler.http_error_auth_reqed``
        returns for the same arguments.

    Raises:
        urllib2.HTTPError: When an already-seen request is challenged again
            after the counter has exceeded 5.
    """
    request_key = hash(req)
    if request_key in self.retried_req:
        # Seen before: give up once the retry budget is exhausted.
        if self.retried_count > 5:
            raise urllib2.HTTPError(req.get_full_url(), 401,
                                    "basic auth failed", headers, None)
        self.retried_count += 1
    else:
        # New request: remember it and start counting from zero.
        self.retried_req.add(request_key)
        self.retried_count = 0

    base_handler = urllib2.HTTPBasicAuthHandler
    return base_handler.http_error_auth_reqed(
        self, auth_header, host, req, headers)
def test_basic_auth_success(self):
    """Opening the server URL with the correct credentials must succeed.

    A basic-auth handler holding ``self.USER`` / ``self.PASSWD`` for
    ``self.REALM`` is installed globally before the URL is opened. An
    ``urllib2.HTTPError`` is reported as a test failure; any other
    exception propagates unchanged with its original traceback.
    """
    ah = urllib2.HTTPBasicAuthHandler()
    ah.add_password(self.REALM, self.server_url, self.USER, self.PASSWD)
    urllib2.install_opener(urllib2.build_opener(ah))
    try:
        self.assertTrue(urllib2.urlopen(self.server_url))
    except urllib2.HTTPError:
        self.fail("Basic Auth Failed for url: %s" % self.server_url)
    # No catch-all handler: re-raising with ``raise e`` added nothing and,
    # on Python 2, replaced the traceback of the original error.
def test_basic_auth_httperror(self):
    """Opening the server URL with a wrong password must raise HTTPError.

    A basic-auth handler holding ``self.USER`` with
    ``self.INCORRECT_PASSWD`` is installed globally before the check.
    """
    handler = urllib2.HTTPBasicAuthHandler()
    handler.add_password(
        self.REALM, self.server_url, self.USER, self.INCORRECT_PASSWD)
    opener = urllib2.build_opener(handler)
    urllib2.install_opener(opener)
    self.assertRaises(urllib2.HTTPError, urllib2.urlopen, self.server_url)
def test_basic_auth(self, quote_char='"'):
    """Run the shared basic-auth check against a mocked 401 challenge.

    Args:
        quote_char: Character placed on both sides of the realm in the
            ``WWW-Authenticate`` header sent by the mock HTTP handler.
    """
    opener = OpenerDirector()
    password_manager = MockPasswordManager()
    auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
    realm = "ACME Widget Store"
    # Header block the mock server answers with (status 401).
    challenge = 'WWW-Authenticate: Basic realm=%s%s%s\r\n\r\n' % (
        quote_char, realm, quote_char)
    http_handler = MockHTTPHandler(401, challenge)
    for handler in (auth_handler, http_handler):
        opener.add_handler(handler)
    # The same URL is passed for both URL arguments of the helper.
    protected_url = "http://acme.example.com/protected"
    self._test_basic_auth(opener, auth_handler, "Authorization",
                          realm, http_handler, password_manager,
                          protected_url, protected_url)
def _open_url(self, url):
    """Open ``url`` with HTTP basic authentication and return the response.

    ``self._api_key`` is registered as the password, with an empty user
    name, for ``self._routes.host`` under the default realm.

    Args:
        url: Address to open.

    Returns:
        The object returned by the opener's ``open`` call.
    """
    credentials = urllib2.HTTPPasswordMgrWithDefaultRealm()
    credentials.add_password(None, self._routes.host, '', self._api_key)
    basic_auth = urllib2.HTTPBasicAuthHandler(credentials)
    return urllib2.build_opener(basic_auth).open(url)
def __build_url_opener(self, uri):
    """Return a urllib2 opener configured for HTTP Basic Authentication.

    ``self.client_id`` and ``self.client_secret`` are registered as user
    name and password for ``uri`` under the default realm.

    Args:
        uri: The URI the credentials apply to.

    Returns:
        The opener produced by ``urllib2.build_opener`` with a basic-auth
        handler installed.
    """
    credentials = urllib2.HTTPPasswordMgrWithDefaultRealm()
    # realm=None registers the credentials for any realm at this URI.
    credentials.add_password(None, uri, self.client_id, self.client_secret)
    return urllib2.build_opener(urllib2.HTTPBasicAuthHandler(credentials))
def test_basic_auth(self, quote_char='"'):
    """Run the shared basic-auth check against a mocked 401 challenge.

    Args:
        quote_char: Character placed on both sides of the realm in the
            ``WWW-Authenticate`` header sent by the mock HTTP handler.
    """
    opener = OpenerDirector()
    password_manager = MockPasswordManager()
    auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
    realm = "ACME Widget Store"
    # Header block the mock server answers with (status 401).
    challenge = 'WWW-Authenticate: Basic realm=%s%s%s\r\n\r\n' % (
        quote_char, realm, quote_char)
    http_handler = MockHTTPHandler(401, challenge)
    for handler in (auth_handler, http_handler):
        opener.add_handler(handler)
    # The same URL is passed for both URL arguments of the helper.
    protected_url = "http://acme.example.com/protected"
    self._test_basic_auth(opener, auth_handler, "Authorization",
                          realm, http_handler, password_manager,
                          protected_url, protected_url)
def __init__(self, *args, **kwargs):
    """Initialise the base basic-auth handler plus retry bookkeeping.

    All positional and keyword arguments are forwarded unchanged to
    ``urllib2.HTTPBasicAuthHandler.__init__``.
    """
    urllib2.HTTPBasicAuthHandler.__init__(self, *args, **kwargs)
    # Retry state: the set of request hashes already seen, and a counter
    # of retries that starts at zero.
    self.retried_req, self.retried_count = set(), 0