def jsonize(data):
    """Serialize a data dict as indented JSON and flag the response as JSON.

    The content type of the module-level ``response`` object is set before
    serialization takes place.

    @param data: data dict
    @return: JSON formatted data
    """
    response.content_type = "application/json; charset=UTF-8"
    serialized = json.dumps(data, indent=4, sort_keys=False)
    return serialized
# Python content_type() example source code
def files_get(sha256):
    """Return the raw contents of a stored binary.

    The file is looked up as CUCKOO_ROOT/storage/binaries/<sha256>.

    @param sha256: name of the file inside the binaries directory
    @return: the file contents, or HTTPError(404) when the file does not exist
    """
    file_path = os.path.join(CUCKOO_ROOT, "storage", "binaries", sha256)
    if not os.path.exists(file_path):
        return HTTPError(404, "File not found")

    response.content_type = "application/octet-stream; charset=UTF-8"
    # A context manager closes the handle deterministically instead of
    # leaving it open until the garbage collector gets to it.
    with open(file_path, "rb") as handle:
        return handle.read()
def pcap_get(task_id):
    """Return the raw PCAP dump recorded for an analysis task.

    The dump is looked up as
    CUCKOO_ROOT/storage/analyses/<task_id>/dump.pcap.

    @param task_id: numeric task ID (formatted with "%d")
    @return: the PCAP contents, HTTPError(404) when the dump does not exist,
             or HTTPError(500) when it exists but cannot be read
    """
    file_path = os.path.join(CUCKOO_ROOT, "storage", "analyses",
                             "%d" % task_id, "dump.pcap")
    if not os.path.exists(file_path):
        return HTTPError(404, "File not found")

    response.content_type = "application/octet-stream; charset=UTF-8"
    try:
        # The context manager guarantees the handle is closed even when the
        # read itself fails.
        with open(file_path, "rb") as handle:
            return handle.read()
    except (IOError, OSError):
        # Only I/O failures are turned into a 500; interpreter-level signals
        # such as KeyboardInterrupt are no longer swallowed.
        return HTTPError(500, "An error occurred while reading PCAP")
def get_pcap(task_id):
    """Serve the PCAP dump of an analysis task as a file download.

    @param task_id: task ID as a string; must consist of digits only
    @return: the PCAP contents, or HTTPError(404) when the ID is not numeric
             or no dump exists for it
    """
    # Rejecting non-digit IDs also keeps path separators out of the path
    # built below.
    if not task_id.isdigit():
        return HTTPError(code=404, output="The specified ID is invalid")

    pcap_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", task_id, "dump.pcap")
    if not os.path.exists(pcap_path):
        return HTTPError(code=404, output="PCAP not found")

    response.content_type = "application/vnd.tcpdump.pcap"
    response.set_header("Content-Disposition", "attachment; filename=cuckoo_task_{0}.pcap".format(task_id))

    # Close the handle deterministically rather than leaking it.
    with open(pcap_path, "rb") as handle:
        return handle.read()
def status(self):
    """
    Builds a JSON report on the status of this translation server.

    The report contains the server status, its models, the version of the
    installed ``nematus`` distribution and the service name.
    """
    response_data = {}
    response_data['status'] = self._status
    response_data['models'] = self._models
    distribution = pkg_resources.require("nematus")[0]
    response_data['version'] = distribution.version
    response_data['service'] = 'nematus'

    response.content_type = "application/json"
    return json.dumps(response_data)
def create_routes(host, port):
    """
    Register the registry routes.

    ``host`` and ``port`` are used to build the websocket address that the
    registered handler advertises.
    """
    @route("/runtimes/", method=['OPTIONS', 'GET'])
    def get_registry():
        """
        Describe the rill runtime as a one-element JSON list.

        OPTIONS requests receive the CORS headers and the allowed methods
        only.
        """
        from rill.runtime import Runtime

        cors_headers = (
            ('Access-Control-Allow-Origin', '*'),
            ('Access-Control-Allow-Methods', 'GET, OPTIONS'),
            ('Allow', 'GET, OPTIONS'),
            ('Access-Control-Allow-Headers', 'Content-Type, Authorization'),
        )
        for header_name, header_value in cors_headers:
            response.set_header(header_name, header_value)

        if request.method == 'OPTIONS':
            return 'GET,OPTIONS'

        response.content_type = 'application/json'

        runtime_meta = Runtime().get_runtime_meta()
        address = 'ws://{}:{}'.format(host, port)
        runtime_meta['address'] = address
        runtime_meta['protocol'] = 'websocket'
        runtime_meta['id'] = 'rill_' + urlparse(address).netloc
        runtime_meta['seen'] = str(datetime.now())

        return json.dumps([runtime_meta])
def json_api(func):
    """Decorator that returns the wrapped handler's result as JSON if possible.

    The handler's return value is serialized with ``json.dumps``; on success
    the response content type is set to JSON and the serialized string is
    returned. When serialization fails, the handler's original return value
    is passed through unchanged.

    :param func: the handler to wrap
    :return: the wrapping function, carrying ``func``'s name and docstring
    """
    # Imported locally so the decorator does not depend on a module-level
    # import that may be missing.
    from functools import wraps

    @wraps(func)
    def func_wrapper(*args, **kwargs):
        rs = func(*args, **kwargs)
        try:
            ds = json.dumps(rs)
            response.content_type = 'application/json; charset=utf-8'
            return ds
        except Exception:
            # Best effort: anything that cannot be serialized is returned
            # as-is. KeyboardInterrupt/SystemExit are no longer swallowed.
            return rs
    return func_wrapper
# decorator for admin APIs
def default_error_handler(resp: BaseResponse):
    """Render an error response as an ``application/problem+json`` body.

    The response body (together with the causing exception, when one is
    attached) is logged at error level. For status codes of 500 and above
    the attached traceback, if any, is logged at debug level.

    :param resp: the error response being handled
    :return: JSON string with ``title`` and ``status`` members
    """
    response.content_type = 'application/problem+json'

    if getattr(resp, 'exception', None):
        msg = "%s, caused by: %s" % (resp.body, resp.exception)
    else:
        msg = resp.body
    LOG.error(msg)

    if resp.status_code >= 500:
        if getattr(resp, 'traceback', None):
            LOG.debug(resp.traceback)

    problem = {'title': resp.body, 'status': resp.status_code}
    return json.dumps(problem)
def web_inbound_call(button=None):
    """
    Handles an inbound phone call

    This function is called from twilio cloud back-end

    The button token is decoded, the button context is loaded, and the first
    'say' entry found in the 'call' section of its twilio action is spoken.
    When there is no such entry a default sentence is used instead.

    :param button: button token; the server default is used when None
    :return: the TwiML document as a string, or 'Invalid request' with
             status 400 when the call cannot be handled
    """
    if button is None:
        button = settings['server']['default']

    logging.info("Receiving inbound call for button '{}'".format(button))

    try:
        button = decode_token(settings, button, action='call')
        context = load_button(settings, button)
        # Only the twilio part of the push details is needed here.
        _, twilio_action = get_push_details(context)

        response.content_type = 'text/xml'
        behaviour = twilio.twiml.Response()

        say = None
        if 'call' in twilio_action:
            for line in twilio_action['call']:
                # keys() is a non-subscriptable view on Python 3, so it is
                # materialized first; this behaves the same on Python 2.
                if list(line.keys())[0] == 'say':
                    say = line['say']
                    break

        if say is None:
            say = "What's up Doc?"

        behaviour.say(say)
        return str(behaviour)

    except Exception as feedback:
        # In debug mode the exception is re-raised so the full traceback is
        # visible; otherwise the caller only gets a generic 400.
        if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
            logging.error("Unable to handle inbound call for '{}'".format(button))
            raise
        else:
            logging.error(str(feedback))
            response.status = 400
            return 'Invalid request'
#
# the collection of buttons that we manage
#
def tasks_report(task_id, report_format="json"):
    """Return a report, or an archive of analysis artifacts, for a task.

    ``report_format`` is matched case-insensitively. It either names a single
    report file inside the task's "reports" directory, or one of the archive
    presets, in which case a tar archive of selected entries of the analysis
    directory is built in memory. The archive compression is chosen with the
    "tar" query parameter (bz2, gz or tar) and defaults to bz2.

    @param task_id: numeric task ID (formatted with "%d")
    @param report_format: report or archive preset name
    @return: the report or archive contents, HTTPError(400) for an unknown
             format, or HTTPError(404) when the report file does not exist
    """
    formats = {
        "json": "report.json",
        "html": "report.html",
        "htmlsumary": "summary-report.html",
        "pdf": "report.pdf",
        "maec": "report.maec-4.1.xml",
        "metadata": "report.metadata.xml",
    }

    # Archive presets: type "-" packs everything EXCEPT the listed entries,
    # type "+" packs ONLY the listed entries.
    bz_formats = {
        "all": {"type": "-", "files": ["memory.dmp"]},
        "dropped": {"type": "+", "files": ["files"]},
        "dist" : {"type": "+", "files": ["shots", "reports"]},
        "dist2": {"type": "-", "files": ["shots", "reports", "binary"]},
    }

    tar_formats = {
        "bz2": "w:bz2",
        "gz": "w:gz",
        "tar": "w",
    }

    # Normalize once instead of lowering the name at every comparison.
    fmt = report_format.lower()

    if fmt in formats:
        report_path = os.path.join(CUCKOO_ROOT, "storage", "analyses",
                                   "%d" % task_id, "reports",
                                   formats[fmt])
    elif fmt in bz_formats:
        bzf = bz_formats[fmt]
        srcdir = os.path.join(CUCKOO_ROOT, "storage",
                              "analyses", "%d" % task_id)
        s = StringIO()

        # By default go for bz2 encoded tar files (for legacy reasons.)
        tarmode = tar_formats.get(request.GET.get("tar"), "w:bz2")

        tar = tarfile.open(fileobj=s, mode=tarmode)
        for filedir in os.listdir(srcdir):
            if bzf["type"] == "-" and filedir not in bzf["files"]:
                tar.add(os.path.join(srcdir, filedir), arcname=filedir)
            if bzf["type"] == "+" and filedir in bzf["files"]:
                tar.add(os.path.join(srcdir, filedir), arcname=filedir)

        # The "dist" preset additionally embeds the task's database document.
        if fmt == "dist" and FULL_DB:
            buf = results_db.analysis.find_one({"info.id": task_id})
            tarinfo = tarfile.TarInfo("mongo.json")
            buf_dumped = json_util.dumps(buf)
            tarinfo.size = len(buf_dumped)
            buf = StringIO(buf_dumped)
            tar.addfile(tarinfo, buf)

        tar.close()
        response.content_type = "application/x-tar; charset=UTF-8"
        return s.getvalue()
    else:
        return HTTPError(400, "Invalid report format")

    if os.path.exists(report_path):
        # Close the handle deterministically rather than leaking it.
        with open(report_path, "rb") as handle:
            return handle.read()
    else:
        return HTTPError(404, "Report not found")