def update_tags(self, tags, expression):
    """Post ``tags`` to the Moloch ``addTags`` endpoint for ``expression``.

    The request goes to ``self.MOLOCH_URL + 'addTags?'`` with ``date=-1`` and
    ``expression`` in the query string and ``tags`` url-encoded in the body.
    Authentication is HTTP basic when ``self.MOLOCH_AUTH == "basic"`` and
    HTTP digest otherwise.

    Nothing is returned.  Any exception raised while sending the request or
    decoding the reply is caught and reported through ``log.warning``.
    """
    # support cases where we might be doing basic auth through a proxy
    if self.MOLOCH_AUTH == "basic":
        password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
        password_mgr.add_password(None, self.MOLOCH_URL, self.MOLOCH_USER, self.MOLOCH_PASSWORD)
        opener = urllib2.build_opener(urllib2.HTTPBasicAuthHandler(password_mgr))
    else:
        digest_handler = urllib2.HTTPDigestAuthHandler()
        digest_handler.add_password(self.MOLOCH_REALM, self.MOLOCH_URL, self.MOLOCH_USER, self.MOLOCH_PASSWORD)
        opener = urllib2.build_opener(digest_handler)

    data = urllib.urlencode({'tags': tags})
    qstring = urllib.urlencode({'date': "-1", 'expression': expression})
    tag_url = self.MOLOCH_URL + 'addTags?' + qstring

    try:
        response = opener.open(tag_url, data=data)
        try:
            if response.code == 200:
                # The decoded value is not used; the body is parsed only so
                # that a malformed reply ends up in the warning below.
                json.loads(response.read())
        finally:
            # Release the connection whether or not the reply was usable.
            response.close()
    except Exception as e:
        log.warning("Moloch: Unable to update tags %s" % (e))
python类HTTPDigestAuthHandler()的实例源码
def get_digest_authorization(self, path):
    """
    Return the value of a Digest ``Authorization`` header for ``path``.

    The digest handler is created on first use (with the credentials
    registered for ``self.conn.host``) and kept on
    ``self.digest_auth_handler`` for later calls.  The header is computed
    against ``self.digest_challenge``.
    """
    u = self.parsed_url
    if self.digest_auth_handler is None:
        manager = urllib2.HTTPPasswordMgrWithDefaultRealm()
        manager.add_password(None, self.conn.host, self.username, self.password)
        self.digest_auth_handler = urllib2.HTTPDigestAuthHandler(manager)

    # get_authorization() works on a request object, so a throwaway request
    # is assembled here; it is never sent.
    if u.scheme == 'webdavs':
        scheme = 'https'
    else:
        scheme = 'http'
    if u.port:
        netloc = "%s:%s" % (u.hostname, u.port)
    else:
        netloc = u.hostname
    throwaway_url = "%s://%s%s" % (scheme, netloc, path)
    throwaway_req = CustomMethodRequest(self.conn._method, throwaway_url)
    header_value = self.digest_auth_handler.get_authorization(
        throwaway_req, self.digest_challenge)
    return 'Digest %s' % header_value
def __init__(self, username, password,  # pylint: disable=E1002
             verbose=0, use_datetime=False, https_handler=None):
    """Initialize the transport and collect the urllib2 handlers it uses.

    ``username`` / ``password``: when both are truthy, a password manager
    and an ``HTTPDigestAuthHandler`` built on it are created; otherwise
    both attributes are set to ``None``.
    ``verbose`` and ``use_datetime`` are stored on the instance.  The base
    initializer is always called with ``use_datetime=False``.
    ``https_handler``: when given, it is added to ``self._handlers`` and
    ``self._scheme`` becomes ``'https'``; otherwise the scheme is ``'http'``.
    """
    if PY2:
        Transport.__init__(self, use_datetime=False)
    else:
        super().__init__(use_datetime=False)

    self._username = username
    self._password = password
    self._use_datetime = use_datetime
    self.verbose = verbose
    self._handlers = []

    if self._username and self._password:
        self._passmgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
        self._auth_handler = urllib2.HTTPDigestAuthHandler(self._passmgr)
    else:
        self._auth_handler = None
        self._passmgr = None

    # The https handler, when present, is registered before the auth handler.
    if https_handler:
        self._handlers.append(https_handler)
        self._scheme = 'https'
    else:
        self._scheme = 'http'
    if self._auth_handler:
        self._handlers.append(self._auth_handler)
def __init__(self, username, password,  # pylint: disable=E1002
             verbose=0, use_datetime=False, https_handler=None):
    """Set up credentials, scheme and the list of urllib2 handlers.

    A digest auth handler (backed by a fresh
    ``HTTPPasswordMgrWithDefaultRealm``) is created only when both
    ``username`` and ``password`` are truthy.  Passing ``https_handler``
    selects the ``'https'`` scheme and registers that handler first.
    ``use_datetime`` is kept on ``self._use_datetime``; the base class is
    always initialized with ``use_datetime=False``.
    """
    if PY2:
        Transport.__init__(self, use_datetime=False)
    else:
        super().__init__(use_datetime=False)

    self._username = username
    self._password = password
    self._use_datetime = use_datetime
    self.verbose = verbose
    self._handlers = []

    if self._username and self._password:
        self._passmgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
        self._auth_handler = urllib2.HTTPDigestAuthHandler(self._passmgr)
    else:
        self._auth_handler = None
        self._passmgr = None

    if https_handler:
        self._handlers.append(https_handler)
        self._scheme = 'https'
    else:
        self._scheme = 'http'

    # Appended last so it follows the https handler in the list.
    if self._auth_handler:
        self._handlers.append(self._auth_handler)
def __init__(self, username, password,  # pylint: disable=E1002
             verbose=0, use_datetime=False, https_handler=None):
    """Store the connection settings and prepare the handler list.

    Attributes set here: ``_username``, ``_password``, ``_use_datetime``,
    ``verbose``, ``_handlers``, ``_passmgr``, ``_auth_handler`` and
    ``_scheme``.  ``_auth_handler`` / ``_passmgr`` are ``None`` unless both
    credentials are truthy.  ``_scheme`` is ``'https'`` exactly when an
    ``https_handler`` was supplied.  The base initializer receives
    ``use_datetime=False`` regardless of the argument.
    """
    if PY2:
        Transport.__init__(self, use_datetime=False)
    else:
        super().__init__(use_datetime=False)

    self._username = username
    self._password = password
    self._use_datetime = use_datetime
    self.verbose = verbose
    self._handlers = []

    if self._username and self._password:
        self._passmgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
        self._auth_handler = urllib2.HTTPDigestAuthHandler(self._passmgr)
    else:
        self._auth_handler = None
        self._passmgr = None

    if https_handler:
        self._handlers.append(https_handler)
        self._scheme = 'https'
    else:
        self._scheme = 'http'

    if self._auth_handler:
        self._handlers.append(self._auth_handler)
def __setHTTPAuthentication():
    """
    Check and set the HTTP authentication method (Basic or Digest),
    username and password to perform HTTP requests with.

    Does nothing when neither ``conf.aType`` nor ``conf.aCred`` is set.
    Otherwise the module-level ``authHandler`` is bound to a urllib2 basic
    or digest handler whose password manager holds the credentials for
    ``<conf.scheme>://<conf.hostname>``.

    Raises sqlmapSyntaxException when only one of type/credentials is
    given, when the type is neither "basic" nor "digest" (case-insensitive),
    or when the credentials are not in ``username:password`` form.
    """
    global authHandler

    if not conf.aType and not conf.aCred:
        return

    elif conf.aType and not conf.aCred:
        errMsg = "you specified the HTTP Authentication type, but "
        errMsg += "did not provide the credentials"
        raise sqlmapSyntaxException(errMsg)

    elif not conf.aType and conf.aCred:
        errMsg = "you specified the HTTP Authentication credentials, "
        errMsg += "but did not provide the type"
        raise sqlmapSyntaxException(errMsg)

    parseTargetUrl()

    debugMsg = "setting the HTTP Authentication type and credentials"
    logger.debug(debugMsg)

    aTypeLower = conf.aType.lower()

    if aTypeLower not in ("basic", "digest"):
        errMsg = "HTTP Authentication type value must be "
        errMsg += "Basic or Digest"
        raise sqlmapSyntaxException(errMsg)

    # Raw string: the pattern text is unchanged, but "\:" is no longer an
    # invalid escape sequence in the string literal.  The lazy first group
    # stops at the first colon, so the password may itself contain colons.
    aCredRegExp = re.search(r"^(.*?)\:(.*?)$", conf.aCred)

    if not aCredRegExp:
        errMsg = "HTTP Authentication credentials value must be "
        errMsg += "in format username:password"
        raise sqlmapSyntaxException(errMsg)

    authUsername = aCredRegExp.group(1)
    authPassword = aCredRegExp.group(2)

    passwordMgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
    passwordMgr.add_password(None, "%s://%s" % (conf.scheme, conf.hostname), authUsername, authPassword)

    if aTypeLower == "basic":
        authHandler = urllib2.HTTPBasicAuthHandler(passwordMgr)
    elif aTypeLower == "digest":
        authHandler = urllib2.HTTPDigestAuthHandler(passwordMgr)
def openWebIF(self, part = None, reader = None):
    """Fetch ``oscamapi.html`` with HTTP digest authentication.

    part:   value for the ``part`` query parameter; ``None`` requests
            ``part=status``.
    reader: when given together with ``part``, sent as the ``label``
            query parameter.

    Connection data comes from ``self.getUserData()`` (host 127.0.0.1) when
    ``config.oscaminfo.userdatafromconf`` is set, otherwise from the
    ``config.oscaminfo`` ip/port/username/password entries.  A port that
    starts with ``+`` selects https.

    Returns ``(True, body)`` on success and ``(False, message)`` on failure.
    Side effects: sets ``self.proto``, ``self.ip``, ``self.url`` (and the
    credentials/port attributes) and installs a global urllib2 opener.
    """
    self.proto = "http"
    if config.oscaminfo.userdatafromconf.value:
        self.ip = "127.0.0.1"
        udata = self.getUserData()
        if isinstance(udata, str):
            # NOTE(review): these two branches blank a single credential and
            # do not assign self.port -- presumably it was set by an earlier
            # call; confirm against getUserData().
            if "httpuser" in udata:
                self.username = ""
            elif "httppwd" in udata:
                self.password = ""
            else:
                return False, udata
        else:
            self.port = udata[2]
            self.username = udata[0]
            self.password = udata[1]
    else:
        self.ip = ".".join("%d" % d for d in config.oscaminfo.ip.value)
        self.port = str(config.oscaminfo.port.value)
        self.username = str(config.oscaminfo.username.value)
        self.password = str(config.oscaminfo.password.value)

    if self.port.startswith('+'):
        self.proto = "https"
        # str.replace returns a new string; the result must be stored or the
        # '+' marker would stay in the port and end up in the URL.
        self.port = self.port.replace("+", "")

    if part is None:
        self.url = "%s://%s:%s/oscamapi.html?part=status" % (self.proto, self.ip, self.port)
    elif reader is None:
        self.url = "%s://%s:%s/oscamapi.html?part=%s" % (self.proto, self.ip, self.port, part)
    else:
        self.url = "%s://%s:%s/oscamapi.html?part=%s&label=%s" % (self.proto, self.ip, self.port, part, reader)

    pwman = urllib2.HTTPPasswordMgrWithDefaultRealm()
    pwman.add_password(None, self.url, self.username, self.password)
    handlers = urllib2.HTTPDigestAuthHandler(pwman)
    opener = urllib2.build_opener(urllib2.HTTPHandler, handlers)
    urllib2.install_opener(opener)
    request = urllib2.Request(self.url)
    err = False
    try:
        data = urllib2.urlopen(request).read()
    except urllib2.URLError as e:
        if hasattr(e, "reason"):
            err = str(e.reason)
        elif hasattr(e, "code"):
            err = str(e.code)
        else:
            # Never report success after an exception: 'data' is unbound here.
            err = str(e)
    if err is not False:
        print("[openWebIF] Fehler: %s" % err)
        return False, err
    else:
        return True, data
def openWebIF(self, part = None, reader = None):
    """Fetch ``ncamapi.html`` with HTTP digest authentication.

    part:   value for the ``part`` query parameter; ``None`` requests
            ``part=status``.
    reader: when given together with ``part``, sent as the ``label``
            query parameter.

    Connection data comes from ``self.getUserData()`` (host 127.0.0.1) when
    ``config.ncaminfo.userdatafromconf`` is set, otherwise from the
    ``config.ncaminfo`` ip/port/username/password entries.  A port that
    starts with ``+`` selects https.

    Returns ``(True, body)`` on success and ``(False, message)`` on failure.
    Side effects: sets ``self.proto``, ``self.ip``, ``self.url`` (and the
    credentials/port attributes) and installs a global urllib2 opener.
    """
    self.proto = "http"
    if config.ncaminfo.userdatafromconf.value:
        self.ip = "127.0.0.1"
        udata = self.getUserData()
        if isinstance(udata, str):
            # NOTE(review): these two branches blank a single credential and
            # do not assign self.port -- presumably it was set by an earlier
            # call; confirm against getUserData().
            if "httpuser" in udata:
                self.username = ""
            elif "httppwd" in udata:
                self.password = ""
            else:
                return False, udata
        else:
            self.port = udata[2]
            self.username = udata[0]
            self.password = udata[1]
    else:
        self.ip = ".".join("%d" % d for d in config.ncaminfo.ip.value)
        self.port = str(config.ncaminfo.port.value)
        self.username = str(config.ncaminfo.username.value)
        self.password = str(config.ncaminfo.password.value)

    if self.port.startswith('+'):
        self.proto = "https"
        # str.replace returns a new string; the result must be stored or the
        # '+' marker would stay in the port and end up in the URL.
        self.port = self.port.replace("+", "")

    if part is None:
        self.url = "%s://%s:%s/ncamapi.html?part=status" % (self.proto, self.ip, self.port)
    elif reader is None:
        self.url = "%s://%s:%s/ncamapi.html?part=%s" % (self.proto, self.ip, self.port, part)
    else:
        self.url = "%s://%s:%s/ncamapi.html?part=%s&label=%s" % (self.proto, self.ip, self.port, part, reader)

    pwman = urllib2.HTTPPasswordMgrWithDefaultRealm()
    pwman.add_password(None, self.url, self.username, self.password)
    handlers = urllib2.HTTPDigestAuthHandler(pwman)
    opener = urllib2.build_opener(urllib2.HTTPHandler, handlers)
    urllib2.install_opener(opener)
    request = urllib2.Request(self.url)
    err = False
    try:
        data = urllib2.urlopen(request).read()
    except urllib2.URLError as e:
        if hasattr(e, "reason"):
            err = str(e.reason)
        elif hasattr(e, "code"):
            err = str(e.code)
        else:
            # Never report success after an exception: 'data' is unbound here.
            err = str(e)
    if err is not False:
        print("[openWebIF] Fehler: %s" % err)
        return False, err
    else:
        return True, data
def openWebIF(self, part = None, reader = None):
    """Request ``oscamapi.html`` over http using digest authentication.

    ``part`` selects the ``part`` query parameter (``status`` when omitted);
    ``reader`` is added as ``label`` only when ``part`` is given as well.
    Host, port and credentials are taken from ``self.getUserData()`` or from
    ``config.oscaminfo``, depending on ``userdatafromconf``.

    Returns ``(True, body)`` when the request succeeds, ``(False, message)``
    otherwise.  The chosen URL is printed and stored on ``self.url``, and a
    global urllib2 opener is installed.
    """
    settings = config.oscaminfo
    if settings.userdatafromconf.getValue():
        self.ip = "127.0.0.1"
        udata = self.getUserData()
        if not isinstance(udata, str):
            self.port = udata[2]
            self.username = udata[0]
            self.password = udata[1]
        elif "httpuser" in udata:
            self.username = ""
        elif "httppwd" in udata:
            self.password = ""
        else:
            return (False, udata)
    else:
        self.ip = ".".join("%d" % octet for octet in settings.ip.getValue())
        self.port = settings.port.getValue()
        self.username = settings.username.getValue()
        self.password = settings.password.getValue()

    # A reader label is only honoured together with an explicit part.
    if part is None:
        self.url = "http://%s:%s/oscamapi.html?part=status" % (self.ip, self.port)
    elif reader is None:
        self.url = "http://%s:%s/oscamapi.html?part=%s" % (self.ip, self.port, part)
    else:
        self.url = "http://%s:%s/oscamapi.html?part=%s&label=%s" % (self.ip, self.port, part, reader)
    print("URL=%s" % self.url)

    password_store = urllib2.HTTPPasswordMgrWithDefaultRealm()
    password_store.add_password(None, self.url, self.username, self.password)
    digest_handler = urllib2.HTTPDigestAuthHandler(password_store)
    urllib2.install_opener(urllib2.build_opener(urllib2.HTTPHandler, digest_handler))

    request = urllib2.Request(self.url)
    err = False
    try:
        data = urllib2.urlopen(request).read()
    except urllib2.URLError as e:
        if hasattr(e, "reason"):
            err = str(e.reason)
        elif hasattr(e, "code"):
            err = str(e.code)

    if err is False:
        return (True, data)
    print("[openWebIF] Fehler: %s" % err)
    return (False, err)
def test_basic_and_digest_auth_handlers(self):
    # HTTPDigestAuthHandler raised an exception if it couldn't handle a 40*
    # response (http://python.org/sf/1479302), where it should instead
    # return None to allow another handler (especially
    # HTTPBasicAuthHandler) to handle the response.

    # Also (http://python.org/sf/14797027, RFC 2617 section 1.2), we must
    # try digest first (since it's the strongest auth scheme), so the order
    # of the http_error_401 calls is recorded on the opener and checked at
    # the end.
    class RecordingOpenerDirector(OpenerDirector):
        def __init__(self):
            OpenerDirector.__init__(self)
            self.recorded = []

        def record(self, label):
            self.recorded.append(label)

    class TestDigestAuthHandler(urllib2.HTTPDigestAuthHandler):
        def http_error_401(self, *args, **kwargs):
            self.parent.record("digest")
            urllib2.HTTPDigestAuthHandler.http_error_401(self, *args, **kwargs)

    class TestBasicAuthHandler(urllib2.HTTPBasicAuthHandler):
        def http_error_401(self, *args, **kwargs):
            self.parent.record("basic")
            urllib2.HTTPBasicAuthHandler.http_error_401(self, *args, **kwargs)

    director = RecordingOpenerDirector()
    passwords = MockPasswordManager()
    digest = TestDigestAuthHandler(passwords)
    basic = TestBasicAuthHandler(passwords)
    realm = "ACME Networks"
    transport = MockHTTPHandler(
        401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)

    # Basic is registered ahead of digest on purpose: the assertion at the
    # end checks that digest is nevertheless consulted first.
    for handler in (basic, digest, transport):
        director.add_handler(handler)

    # check basic auth isn't blocked by digest handler failing
    protected_url = "http://acme.example.com/protected"
    self._test_basic_auth(director, basic, "Authorization",
                          realm, transport, passwords,
                          protected_url,
                          protected_url,
                          )

    # check digest was tried before basic (twice, because
    # _test_basic_auth called .open() twice)
    expected_calls = ["digest", "basic"] * 2
    self.assertEqual(director.recorded, expected_calls)
def test_basic_and_digest_auth_handlers(self):
    # HTTPDigestAuthHandler raised an exception if it couldn't handle a 40*
    # response (http://python.org/sf/1479302), where it should instead
    # return None to allow another handler (especially
    # HTTPBasicAuthHandler) to handle the response.

    # Also (http://python.org/sf/14797027, RFC 2617 section 1.2), we must
    # try digest first (since it's the strongest auth scheme), so the order
    # of the http_error_401 calls is recorded on the opener and checked at
    # the end.
    class RecordingOpenerDirector(OpenerDirector):
        def __init__(self):
            OpenerDirector.__init__(self)
            self.recorded = []

        def record(self, label):
            self.recorded.append(label)

    class TestDigestAuthHandler(urllib2.HTTPDigestAuthHandler):
        def http_error_401(self, *args, **kwargs):
            self.parent.record("digest")
            urllib2.HTTPDigestAuthHandler.http_error_401(self, *args, **kwargs)

    class TestBasicAuthHandler(urllib2.HTTPBasicAuthHandler):
        def http_error_401(self, *args, **kwargs):
            self.parent.record("basic")
            urllib2.HTTPBasicAuthHandler.http_error_401(self, *args, **kwargs)

    director = RecordingOpenerDirector()
    passwords = MockPasswordManager()
    digest = TestDigestAuthHandler(passwords)
    basic = TestBasicAuthHandler(passwords)
    realm = "ACME Networks"
    transport = MockHTTPHandler(
        401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)

    # Basic is registered ahead of digest on purpose: the assertion at the
    # end checks that digest is nevertheless consulted first.
    for handler in (basic, digest, transport):
        director.add_handler(handler)

    # check basic auth isn't blocked by digest handler failing
    protected_url = "http://acme.example.com/protected"
    self._test_basic_auth(director, basic, "Authorization",
                          realm, transport, passwords,
                          protected_url,
                          protected_url,
                          )

    # check digest was tried before basic (twice, because
    # _test_basic_auth called .open() twice)
    expected_calls = ["digest", "basic"] * 2
    self.assertEqual(director.recorded, expected_calls)
def openWebIF(self, part = None, reader = None):
    """Fetch ``oscamapi.html`` with HTTP digest authentication.

    part:   value for the ``part`` query parameter; ``None`` requests
            ``part=status``.
    reader: when given together with ``part``, sent as the ``label``
            query parameter.

    Connection data comes from ``self.getUserData()`` (host 127.0.0.1) when
    ``config.oscaminfo.userdatafromconf`` is set, otherwise from the
    ``config.oscaminfo`` ip/port/username/password entries.  A port that
    starts with ``+`` selects https.

    Returns ``(True, body)`` on success and ``(False, message)`` on failure.
    Side effects: sets ``self.proto``, ``self.ip``, ``self.url`` (and the
    credentials/port attributes) and installs a global urllib2 opener.
    """
    self.proto = "http"
    if config.oscaminfo.userdatafromconf.value:
        self.ip = "127.0.0.1"
        udata = self.getUserData()
        if isinstance(udata, str):
            # NOTE(review): these two branches blank a single credential and
            # do not assign self.port -- presumably it was set by an earlier
            # call; confirm against getUserData().
            if "httpuser" in udata:
                self.username = ""
            elif "httppwd" in udata:
                self.password = ""
            else:
                return False, udata
        else:
            self.port = udata[2]
            self.username = udata[0]
            self.password = udata[1]
    else:
        self.ip = ".".join("%d" % d for d in config.oscaminfo.ip.value)
        self.port = str(config.oscaminfo.port.value)
        self.username = str(config.oscaminfo.username.value)
        self.password = str(config.oscaminfo.password.value)

    if self.port.startswith('+'):
        self.proto = "https"
        # str.replace returns a new string; the result must be stored or the
        # '+' marker would stay in the port and end up in the URL.
        self.port = self.port.replace("+", "")

    if part is None:
        self.url = "%s://%s:%s/oscamapi.html?part=status" % (self.proto, self.ip, self.port)
    elif reader is None:
        self.url = "%s://%s:%s/oscamapi.html?part=%s" % (self.proto, self.ip, self.port, part)
    else:
        self.url = "%s://%s:%s/oscamapi.html?part=%s&label=%s" % (self.proto, self.ip, self.port, part, reader)

    pwman = urllib2.HTTPPasswordMgrWithDefaultRealm()
    pwman.add_password(None, self.url, self.username, self.password)
    handlers = urllib2.HTTPDigestAuthHandler(pwman)
    opener = urllib2.build_opener(urllib2.HTTPHandler, handlers)
    urllib2.install_opener(opener)
    request = urllib2.Request(self.url)
    err = False
    try:
        data = urllib2.urlopen(request).read()
    except urllib2.URLError as e:
        if hasattr(e, "reason"):
            err = str(e.reason)
        elif hasattr(e, "code"):
            err = str(e.code)
        else:
            # Never report success after an exception: 'data' is unbound here.
            err = str(e)
    if err is not False:
        print("[OScaminfo] Fehler: %s" % err)
        return False, err
    else:
        return True, data
def test_basic_and_digest_auth_handlers(self):
    # HTTPDigestAuthHandler raised an exception if it couldn't handle a 40*
    # response (http://python.org/sf/1479302), where it should instead
    # return None to allow another handler (especially
    # HTTPBasicAuthHandler) to handle the response.

    # Also (http://python.org/sf/14797027, RFC 2617 section 1.2), we must
    # try digest first (since it's the strongest auth scheme), so the order
    # of the http_error_401 calls is recorded on the opener and checked at
    # the end.
    class RecordingOpenerDirector(OpenerDirector):
        def __init__(self):
            OpenerDirector.__init__(self)
            self.recorded = []

        def record(self, label):
            self.recorded.append(label)

    class TestDigestAuthHandler(urllib2.HTTPDigestAuthHandler):
        def http_error_401(self, *args, **kwargs):
            self.parent.record("digest")
            urllib2.HTTPDigestAuthHandler.http_error_401(self, *args, **kwargs)

    class TestBasicAuthHandler(urllib2.HTTPBasicAuthHandler):
        def http_error_401(self, *args, **kwargs):
            self.parent.record("basic")
            urllib2.HTTPBasicAuthHandler.http_error_401(self, *args, **kwargs)

    director = RecordingOpenerDirector()
    passwords = MockPasswordManager()
    digest = TestDigestAuthHandler(passwords)
    basic = TestBasicAuthHandler(passwords)
    realm = "ACME Networks"
    transport = MockHTTPHandler(
        401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)

    # Basic is registered ahead of digest on purpose: the assertion at the
    # end checks that digest is nevertheless consulted first.
    for handler in (basic, digest, transport):
        director.add_handler(handler)

    # check basic auth isn't blocked by digest handler failing
    protected_url = "http://acme.example.com/protected"
    self._test_basic_auth(director, basic, "Authorization",
                          realm, transport, passwords,
                          protected_url,
                          protected_url,
                          )

    # check digest was tried before basic (twice, because
    # _test_basic_auth called .open() twice)
    expected_calls = ["digest", "basic"] * 2
    self.assertEqual(director.recorded, expected_calls)
def test_basic_and_digest_auth_handlers(self):
    # HTTPDigestAuthHandler raised an exception if it couldn't handle a 40*
    # response (http://python.org/sf/1479302), where it should instead
    # return None to allow another handler (especially
    # HTTPBasicAuthHandler) to handle the response.

    # Also (http://python.org/sf/14797027, RFC 2617 section 1.2), we must
    # try digest first (since it's the strongest auth scheme), so the order
    # of the http_error_401 calls is recorded on the opener and checked at
    # the end.
    class RecordingOpenerDirector(OpenerDirector):
        def __init__(self):
            OpenerDirector.__init__(self)
            self.recorded = []

        def record(self, label):
            self.recorded.append(label)

    class TestDigestAuthHandler(urllib2.HTTPDigestAuthHandler):
        def http_error_401(self, *args, **kwargs):
            self.parent.record("digest")
            urllib2.HTTPDigestAuthHandler.http_error_401(self, *args, **kwargs)

    class TestBasicAuthHandler(urllib2.HTTPBasicAuthHandler):
        def http_error_401(self, *args, **kwargs):
            self.parent.record("basic")
            urllib2.HTTPBasicAuthHandler.http_error_401(self, *args, **kwargs)

    director = RecordingOpenerDirector()
    passwords = MockPasswordManager()
    digest = TestDigestAuthHandler(passwords)
    basic = TestBasicAuthHandler(passwords)
    realm = "ACME Networks"
    transport = MockHTTPHandler(
        401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)

    # Basic is registered ahead of digest on purpose: the assertion at the
    # end checks that digest is nevertheless consulted first.
    for handler in (basic, digest, transport):
        director.add_handler(handler)

    # check basic auth isn't blocked by digest handler failing
    protected_url = "http://acme.example.com/protected"
    self._test_basic_auth(director, basic, "Authorization",
                          realm, transport, passwords,
                          protected_url,
                          protected_url,
                          )

    # check digest was tried before basic (twice, because
    # _test_basic_auth called .open() twice)
    expected_calls = ["digest", "basic"] * 2
    self.assertEqual(director.recorded, expected_calls)