def authentication_required(f):
    """Decorator that authenticates a request from its XML POST body.

    The wrapped view ``f`` is only called when the request body is XML
    containing ``Authentication/username`` and ``Authentication/token``
    elements whose text is accepted by ``authenticate``.

    Every other case is logged and answered with ``abort(403)``: an empty
    body, a body that is not valid UTF-8, a body that cannot be parsed as
    XML, missing credential elements, or rejected credentials.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # Undecodable bytes are a client error, not a server error.
        try:
            postdata = request.data.decode('utf-8')
        except UnicodeDecodeError:
            app.logger.error('Authentication: post data is not valid UTF-8')
            return abort(403)

        if not postdata:
            app.logger.error('Authentication: No xml post data in request')
            return abort(403)

        # xml.etree's ParseError (malformed XML) derives from SyntaxError.
        # Assumes ETdefused is defusedxml.ElementTree, whose "forbidden
        # construct" errors derive from ValueError -- TODO confirm.
        try:
            root = ETdefused.fromstring(postdata)
        except (SyntaxError, ValueError):
            app.logger.error('Authentication: Invalid XML, post data could not be parsed')
            return abort(403)

        user_data = root.find("./Authentication/username")
        pass_data = root.find("./Authentication/token")
        if user_data is None or pass_data is None:
            app.logger.error('Authentication: Invalid XML, token not present or empty')
            return abort(403)

        username = user_data.text
        password = pass_data.text
        if not authenticate(username, password):
            app.logger.error("Authentication failure for user %s", username)
            return abort(403)

        return f(*args, **kwargs)
    return decorated_function
python类data()的实例源码
def checkCommunityUser():
    """Check whether the request was sent with the community credentials.

    Reads ``Authentication/username`` and ``Authentication/token`` from the
    XML POST body of the current request.

    Returns:
        True when the credentials equal ``app.config['COMMUNITYUSER']`` and
        ``app.config['COMMUNITYTOKEN']``; False when they are other
        credentials accepted by ``authenticate``.

    Every other case is logged and answered with ``abort(403)``: an empty
    body, a body that is not valid UTF-8, a body that cannot be parsed as
    XML, missing credential elements, or rejected credentials.
    """
    # Undecodable bytes are a client error, not a server error.
    try:
        postdata = request.data.decode('utf-8')
    except UnicodeDecodeError:
        app.logger.error('Invalid XML: post data is not valid UTF-8')
        return abort(403)

    if not postdata:
        app.logger.error('no xml post data in request')
        return abort(403)

    # xml.etree's ParseError (malformed XML) derives from SyntaxError.
    # Assumes ETdefused is defusedxml.ElementTree, whose "forbidden
    # construct" errors derive from ValueError -- TODO confirm.
    try:
        root = ETdefused.fromstring(postdata)
    except (SyntaxError, ValueError):
        app.logger.error('Invalid XML: post data could not be parsed')
        return abort(403)

    user_data = root.find("./Authentication/username")
    pass_data = root.find("./Authentication/token")
    if user_data is None or pass_data is None:
        app.logger.error('Invalid XML: token not present or empty')
        return abort(403)

    username = user_data.text
    password = pass_data.text

    # Community credentials are checked first and bypass authenticate().
    if username == app.config['COMMUNITYUSER'] and password == app.config['COMMUNITYTOKEN']:
        return True

    if not authenticate(username, password):
        app.logger.error("simplePostMessage-Authentication failure for user %s", username)
        return abort(403)

    return False
def querySingleIP():
    """Retrieve attack data from the index about a single IP.

    The result is looked up in the cache under the request URL first; a
    ``getCache`` result of ``False`` is treated as a miss. On a miss the
    ``ip`` query argument is passed to ``queryForSingleIP``, the formatted
    result is stored with ``setCache`` and then returned.

    Returns:
        A ``Response`` with mimetype ``text/xml`` on both the cached and the
        freshly queried path, so clients see the same Content-Type either way.
    """
    # get result from cache
    cached = getCache(request.url, "url")
    if cached is not False:
        app.logger.debug('Returning /querySingleIP from Cache for %s', request.remote_addr)
        # Same mimetype as the fresh path below: the cached value is the
        # payload that setCache stored there.
        return Response(cached, mimetype='text/xml')

    # query ES
    returnResult = formatSingleIP(
        queryForSingleIP(
            app.config['MAXALERTS'],
            request.args.get('ip'),
            checkCommunityIndex(request),
        )
    )
    # 60 is presumably the cache lifetime in seconds -- verify against setCache.
    setCache(request.url, returnResult, 60, "url")
    app.logger.debug('Returning /querySingleIP from ES for %s', request.remote_addr)
    return Response(returnResult, mimetype='text/xml')
# Routes with both XML and JSON output
def postSimpleMessage():
    """Hand posted alert data to ``putservice`` and acknowledge it.

    When the request has a body and ``putservice.checkPostData`` returns a
    truthy tree for it, the tree is passed to ``putservice.handleAlerts``
    together with the result of ``checkCommunityUser()`` and an XML "OK"
    result is returned. In every other case
    ``app.config['DEFAULTRESPONSE']`` is returned.
    """
    # No body at all: nothing to check, fall back to the default answer.
    if not request.data:
        return app.config['DEFAULTRESPONSE']

    parsed = putservice.checkPostData(request.data)
    if not parsed:
        return app.config['DEFAULTRESPONSE']

    putservice.handleAlerts(parsed, checkCommunityUser(), es, cache)
    return Response(
        "<Result><StatusCode>OK</StatusCode><Text></Text></Result>",
        mimetype='text/xml',
    )
###############
### Main
###############
def get_method(self):
    """Return the HTTP request method as a string.

    An explicitly set ``self.method`` (anything other than ``None``) is
    returned as is. Otherwise the method follows from the payload: "POST"
    when ``self.data`` is set, "GET" when it is ``None``.
    """
    explicit = self.method
    if explicit is not None:
        return explicit
    # No explicit method: the presence of a payload decides.
    return "GET" if self.data is None else "POST"
def add_data(self, data):
    """Set ``self.data`` to ``data`` (deprecated).

    Emits a ``DeprecationWarning`` on every call and then stores ``data``
    on the instance.
    """
    msg = "Request.add_data method is deprecated."
    # stacklevel=2 attributes the warning to the code calling add_data
    # rather than to this line, so the report points at the place that
    # needs changing.
    warnings.warn(msg, DeprecationWarning, stacklevel=2)
    self.data = data