def get(self, oid):
"""Get an object.
Returns an item from the DB with the request.data JSON object or all
the items if oid == None
:arg self: The class of the object to be retrieved
:arg integer oid: the ID of the object in the DB
:returns: The JSON item/s stored in the DB
"""
try:
ensure_authorized_to('read', self.__class__)
query = self._db_query(oid)
json_response = self._create_json_response(query, oid)
return Response(json_response, mimetype='application/json')
except Exception as e:
return error.format_exception(
e,
target=self.__class__.__name__.lower(),
action='GET')
python类Response()的实例源码
def export_to_csv(self, ids):
qs = json.loads(self.model.objects(id__in=ids).to_json())
def generate():
yield ','.join(list(max(qs, key=lambda x: len(x)).keys())) + '\n'
for item in qs:
yield ','.join([str(i) for i in list(item.values())]) + '\n'
return Response(
generate(),
mimetype="text/csv",
headers={
"Content-Disposition":
"attachment;filename=%s.csv" % self.model.__name__.lower()
}
)
def enqueue_mwoffliner():
token = UserJWT.from_request_header(request)
# only admins can enqueue task
if not token.is_admin:
raise exception.NotEnoughPrivilege()
def check_task(config: dict):
# check config is a dict
if not isinstance(config, dict):
raise exception.InvalidRequest()
# check config has mandatory data
if config.get('mwUrl') is None or config.get('adminEmail') is None:
raise exception.InvalidRequest()
def enqueue_task(config: dict):
task_name = 'mwoffliner'
celery_task = celery.send_task(task_name, kwargs={
'token': TaskJWT.new(task_name),
'config': config
})
TasksCollection().insert_one({
'_id': celery_task.id,
'celery_task_name': task_name,
'status': 'PENDING',
'time_stamp': {'created': datetime.utcnow(), 'started': None, 'ended': None},
'options': config,
'steps': []
})
task_configs = request.get_json()
if not isinstance(task_configs, list):
raise exception.InvalidRequest()
for task_config in task_configs:
check_task(task_config)
for task_config in task_configs:
enqueue_task(task_config)
return Response(status=202)
def invalid_request(_):
return Response(status=400)
def retrieveAlertsCyber():
""" Retrieve Alerts from ElasticSearch and return formatted
XML with limited alert content
"""
# get result from cache
getCacheResult = getCache(request.url, "url")
if getCacheResult is not False:
app.logger.debug('Returning /retrieveAlertsCyber from Cache for %s' % str(request.remote_addr))
return Response(getCacheResult)
# query ES
else:
returnResult = formatAlertsXml(queryAlerts(app.config['MAXALERTS'], checkCommunityIndex(request)))
setCache(request.url, returnResult, 1, "url")
app.logger.debug('Returning /retrieveAlertsCyber from ES for %s' % str(request.remote_addr))
return Response(returnResult, mimetype='text/xml')
def querySingleIP():
""" Retrieve Attack data from index about a single IP
"""
# get result from cache
getCacheResult = getCache(request.url, "url")
if getCacheResult is not False:
app.logger.debug('Returning /querySingleIP from Cache for %s' % str(request.remote_addr))
return Response(getCacheResult)
# query ES
else:
returnResult = formatSingleIP(queryForSingleIP(app.config['MAXALERTS'], request.args.get('ip'), checkCommunityIndex(request)))
setCache(request.url, returnResult, 60, "url")
app.logger.debug('Returning /querySingleIP from ES for %s' % str(request.remote_addr))
return Response(returnResult, mimetype='text/xml')
# Routes with both XML and JSON output
def retrieveAlertsCount():
""" Retrieve number of alerts in timeframe (GET-Parameter time as decimal or "day") """
# Retrieve Number of Alerts from ElasticSearch and return as xml / json
if not request.args.get('time'):
app.logger.error('No time GET-parameter supplied in retrieveAlertsCount. Must be decimal number (in minutes) or string "day"')
return app.config['DEFAULTRESPONSE']
else:
if request.args.get('out') and request.args.get('out') == 'json':
# get result from cache
getCacheResult = getCache(request.url, "url")
if getCacheResult is not False:
return jsonify(getCacheResult)
else:
returnResult = formatAlertsCount(queryAlertsCount(request.args.get('time'), checkCommunityIndex(request)), 'json')
setCache(request.url, returnResult, 60, "url")
return jsonify(returnResult)
else:
# get result from cache
getCacheResult = getCache(request.url, "url")
if getCacheResult is not False:
return Response(getCacheResult, mimetype='text/xml')
else:
returnResult = formatAlertsCount(queryAlertsCount(request.args.get('time'), checkCommunityIndex(request)), 'xml')
setCache(request.url, returnResult, 60, "url")
return Response(returnResult, mimetype='text/xml')
def retrieveIPs():
""" Retrieve IPs from ElasticSearch and return formatted XML or JSON with IPs """
if request.args.get('out') and request.args.get('out') == 'json':
getCacheResult = getCache(request.url, "url")
if getCacheResult is not False:
return jsonify(getCacheResult)
else:
returnResult = formatBadIP(
queryBadIPs(app.config['BADIPTIMESPAN'], checkCommunityIndex(request)), 'json')
setCache(request.url, returnResult, 60, "url")
return jsonify(returnResult)
else:
getCacheResult = getCache(request.url, "url")
if getCacheResult is not False:
return Response(getCacheResult, mimetype='text/xml')
else:
returnResult = formatBadIP(
queryBadIPs(app.config['BADIPTIMESPAN'], checkCommunityIndex(request)), 'xml')
setCache(request.url, returnResult, 60, "url")
return Response(returnResult, mimetype='text/xml')
# Routes with JSON output
def getSimpleMessage():
return Response("POST is required for this action.", mimetype='text/html', status=500)
def authFailure():
""" Authentication failure --> 401 """
return Response("Authentication failed!", 401, {'WWW-Authenticate': 'Basic realm="Login Required"'})
def authed(func: Callable[[], str]) -> Callable[[], Union[Response, str]]:
""" Given a function returns one that requires basic auth """
@wraps(func)
def decorator():
auth = request.authorization
if auth and validAuth(auth.username, auth.password):
return func()
return authFailure()
return decorator
def cache(self, seconds=60):
def inner_decorator(f):
@wraps(f)
def wrapper(*args, **kwds):
resp = f(*args, **kwds)
if not isinstance(resp, flask.Response):
resp = flask.make_response(resp)
resp.headers['Cache-Control'] = 'public, max-age={}'.format(seconds)
resp.headers["Expires"] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(time.time() + seconds))
resp.add_etag()
return resp
return wrapper
return inner_decorator
def require_api_token(self, f):
@wraps(f)
def wrapper(*args, **kwds):
token = flask.request.headers.get('X-Status-API-Token', flask.request.headers.get('X-Live-Status-API-Token'))
if token != self.settings['api_token']:
resp = flask.make_response(flask.jsonify({}), 403)
else:
resp = f(*args, **kwds)
if not isinstance(resp, flask.Response):
resp = flask.make_response(resp)
resp.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'
resp.headers['Pragma'] = 'no-cache'
resp.headers['Expires'] = '0'
return resp
return wrapper
def _chat():
data = request.args
question = data.get('question')
session = data.get('session')
lang = data.get('lang', 'en')
query = data.get('query', 'false')
query = query.lower() == 'true'
request_id = request.headers.get('X-Request-Id')
marker = data.get('marker', 'default')
response, ret = ask(
question, lang, session, query, request_id=request_id, marker=marker)
return Response(json_encode({'ret': ret, 'response': response}),
mimetype="application/json")
def _batch_chat():
auth = request.form.get('Auth')
if not auth or not check_auth(auth):
return authenticate()
questions = request.form.get('questions')
questions = json.loads(questions)
session = request.form.get('session')
lang = request.form.get('lang', 'en')
responses = []
for idx, question in questions:
response, ret = ask(str(question), lang, session)
responses.append((idx, response, ret))
return Response(json_encode({'ret': 0, 'response': responses}),
mimetype="application/json")
def _said():
data = request.args
session = data.get('session')
message = data.get('message')
ret, response = said(session, message)
return Response(json_encode({'ret': ret, 'response': response}),
mimetype="application/json")
def _rate():
data = request.args
response = ''
try:
ret = rate_answer(data.get('session'), int(
data.get('index')), data.get('rate'))
except Exception as ex:
response = ex.message
return Response(json_encode({'ret': ret, 'response': response}),
mimetype="application/json")
def _chatbots():
data = request.args
lang = data.get('lang', None)
session = data.get('session')
characters = list_character(lang, session)
return Response(json_encode({'ret': 0, 'response': characters}),
mimetype="application/json")
def _start_session():
botname = request.args.get('botname')
user = request.args.get('user')
test = request.args.get('test', 'false')
refresh = request.args.get('refresh', 'false')
test = test.lower() == 'true'
refresh = refresh.lower() == 'true'
sid = session_manager.start_session(
user=user, key=botname, test=test, refresh=refresh)
sess = session_manager.get_session(sid)
sess.sdata.botname = botname
sess.sdata.user = user
return Response(json_encode({'ret': 0, 'sid': str(sid)}),
mimetype="application/json")
def _sessions():
sessions = session_manager.list_sessions()
return Response(json_encode({'ret': 0, 'response': sessions}),
mimetype="application/json")
def _set_weights():
data = request.args
lang = data.get('lang', None)
param = data.get('param')
sid = data.get('session')
ret, response = set_weights(param, lang, sid)
if ret:
sess = session_manager.get_session(sid)
if sess and hasattr(sess.sdata, 'weights'):
logger.info("Set weights {} successfully".format(sess.sdata.weights))
else:
logger.info("Set weights failed.")
return Response(json_encode({'ret': ret, 'response': response}),
mimetype="application/json")
def _set_context():
data = request.args
context_str = data.get('context')
context = {}
for tok in context_str.split(','):
k, v = tok.split('=')
context[k.strip()] = v.strip()
sid = data.get('session')
ret, response = set_context(context, sid)
return Response(json_encode({'ret': ret, 'response': response}),
mimetype="application/json")
def _remove_context():
data = request.args
keys = data.get('keys')
keys = keys.split(',')
sid = data.get('session')
ret, response = remove_context(keys, sid)
return Response(json_encode({'ret': ret, 'response': response}),
mimetype="application/json")
def _update_config():
data = request.args.to_dict()
for k, v in data.iteritems():
if v.lower() == 'true':
data[k]=True
elif v.lower() == 'false':
data[k]=False
elif re.match(r'[0-9]+', v):
data[k]=int(v)
elif re.match(r'[0-9]+\.[0-9]+', v):
data[k]=float(v)
else:
data[k]=str(v)
ret, response = update_config(**data)
return Response(json_encode({'ret': ret, 'response': response}),
mimetype="application/json")
def _log():
def generate():
with open(LOG_CONFIG_FILE) as f:
for row in f:
yield row
return Response(generate(), mimetype='text/plain')
def _reset_session():
data = request.args
sid = data.get('session')
if session_manager.has_session(sid):
session_manager.reset_session(sid)
ret, response = True, "Session reset"
else:
ret, response = False, "No such session"
return Response(json_encode({
'ret': ret,
'response': response
}),
mimetype="application/json")
def _dump_history():
try:
dump_history()
ret, response = True, "Success"
except Exception:
ret, response = False, "Failure"
return Response(json_encode({
'ret': ret,
'response': response
}),
mimetype="application/json")
def _stats():
try:
data = request.args
days = int(data.get('lookback', 7))
dump_history()
response = history_stats(HISTORY_DIR, days)
ret = True
except Exception as ex:
ret, response = False, {'err_msg': str(ex)}
logger.error(ex)
return Response(json_encode({'ret': ret, 'response': response}),
mimetype="application/json")
def authenticate():
return Response(json_encode({'ret': 401, 'response': {'text': 'Could not verify your access'}}),
mimetype="application/json")
def add_virtual_tn():
"""
The VirtualTN resource endpoint for adding VirtualTN's from the pool.
"""
body = request.json
try:
value = str(body['value'])
assert len(value) <= 18
except (AssertionError, KeyError):
raise InvalidAPIUsage(
"Required argument: 'value' (str, length <= 18)",
payload={'reason':
'invalidAPIUsage'})
virtual_tn = VirtualTN(value)
try:
db_session.add(virtual_tn)
db_session.commit()
except IntegrityError:
db_session.rollback()
msg = ("Did not add virtual TN {} to the pool "
"-- already exists").format(value)
log.info({"message": msg})
raise InvalidAPIUsage(
"Virtual TN already exists",
payload={'reason':
'duplicate virtual TN'})
return Response(
json.dumps(
{"message": "Successfully added TN to pool",
"value": value}),
content_type="application/json")