def js_dynamic(path):
    """Serve a JavaScript resource, rendering it as a template unless static.

    Sets ``Expires``, ``Cache-control`` and ``Content-Type`` on the global
    ``response`` and then either renders ``js/<path>`` through ``env`` or
    delegates to ``static_file``.

    :param path: requested path below the JS media directory.
    :return: the rendered template, the ``static_file`` result, or an
        ``HTTPError(404)`` if rendering/serving raised an exception.
    """
    # Expiry is two days from now, formatted as an HTTP date in GMT.
    expires_at = time.gmtime(time.time() + 60 * 60 * 24 * 2)
    response.headers['Expires'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT",
                                                expires_at)
    response.headers['Cache-control'] = "public"
    response.headers['Content-Type'] = "text/javascript; charset=UTF-8"

    try:
        # static files are not rendered
        if "static" not in path and "mootools" not in path:
            template = env.get_template("js/%s" % path)
            return template.render()
        return static_file(path, root=join(PROJECT_DIR, "media", "js"))
    except Exception:
        # Was a bare ``except:``; that also swallowed SystemExit and
        # KeyboardInterrupt, turning a shutdown request into a 404.
        return HTTPError(404, "Not Found")
python类HTTPError()的实例源码
def test_chunked_not_terminated(self):
    # A single one-byte chunk with nothing following it.
    body = '1\r\nx\r\n'
    self._test_chunked(body, HTTPError)
def test_chunked_wrong_size(self):
    # The size line says 2 but only one character of data follows.
    body = '2\r\nx\r\n'
    self._test_chunked(body, HTTPError)
def test_chunked_illegal_size(self):
    # The size line is the letter 'x' rather than a number.
    body = 'x\r\nx\r\n'
    self._test_chunked(body, HTTPError)
def test_chunked_not_chunked_at_all(self):
    # Plain text with no size lines or CRLF framing at all.
    body = 'abcdef'
    self._test_chunked(body, HTTPError)
def test_json_tobig(self):
    """ Environ: Request.json property with huge body. """
    # One value alone is MEMFILE_MAX characters long, so the serialised
    # document is longer than MEMFILE_MAX.
    payload = json_dumps(dict(a=5, tobig='x' * bottle.BaseRequest.MEMFILE_MAX))

    environ = {'CONTENT_TYPE': 'application/json'}
    wsgiref.util.setup_testing_defaults(environ)

    # Fill the request body stream and rewind it for reading.
    stream = environ['wsgi.input']
    stream.write(tob(payload))
    stream.seek(0)
    environ['CONTENT_LENGTH'] = str(len(payload))

    self.assertRaises(HTTPError, lambda: BaseRequest(environ).json)
def testBasic(self):
    # Each entry: (rule, path to match, expected keyword arguments).
    cases = [
        ('/static', '/static', {}),
        ('/\\:its/:#.+#/:test/:name#[a-z]+#/',
         '/:its/a/cruel/world/',
         {'test': 'cruel', 'name': 'world'}),
        ('/:test', '/test', {'test': 'test'}),    # No tail
        (':test/', 'test/', {'test': 'test'}),    # No head
        ('/:test/', '/test/', {'test': 'test'}),  # Middle
        (':test', 'test', {'test': 'test'}),      # Full wildcard
        ('/:#anon#/match', '/anon/match', {}),    # Anon wildcards
    ]
    for rule, target, expected in cases:
        self.assertMatches(rule, target, **expected)

    self.assertRaises(bottle.HTTPError, self.match, '//no/m/at/ch/')
def testNewSyntax(self):
self.assertMatches('/static', '/static')
self.assertMatches('/\\<its>/<:re:.+>/<test>/<name:re:[a-z]+>/',
'/<its>/a/cruel/world/',
test='cruel', name='world')
self.assertMatches('/<test>', '/test', test='test') # No tail
self.assertMatches('<test>/', 'test/', test='test') # No head
self.assertMatches('/<test>/', '/test/', test='test') # Middle
self.assertMatches('<test>', 'test', test='test') # Full wildcard
self.assertMatches('/<:re:anon>/match', '/anon/match') # Anon wildcards
self.assertRaises(bottle.HTTPError, self.match, '//no/m/at/ch/')
def testValueErrorInFilter(self):
    def int_filter(conf):
        # Match anything ('.*') and use int() for both conversions, so a
        # non-numeric segment makes int() raise ValueError.
        return '.*', int, int

    self.r.add_filter('test', int_filter)
    self.assertMatches('/int/<i:test>', '/int/5', i=5)  # No tail
    self.assertRaises(bottle.HTTPError, self.match, '/int/noint')
def testIntFilter(self):
    # A numeric segment matches and is passed on as an int; a
    # non-numeric one is expected to raise HTTPError.
    rule = '/object/<id:int>'
    self.assertMatches(rule, '/object/567', id=567)
    self.assertRaises(bottle.HTTPError, self.match, '/object/abc')
def testFloatFilter(self):
    rule = '/object/<id:float>'

    # Paths that must match, with the value expected for ``id``.
    accepted = (
        ('/object/1', 1),
        ('/object/1.1', 1.1),
        ('/object/.1', 0.1),
        ('/object/1.', 1),
    )
    for target, value in accepted:
        self.assertMatches(rule, target, id=value)

    # Paths that must be rejected with HTTPError.
    for target in ('/object/abc', '/object/', '/object/.'):
        self.assertRaises(bottle.HTTPError, self.match, target)
def test_401(self):
    """ WSGI: abort(401, '') (HTTP 401) """
    @bottle.route('/')
    def test():
        bottle.abort(401)

    # First check: no custom 401 handler is registered yet.
    self.assertStatus(401, '/')

    # Register a handler that rewrites the status and reports the type
    # of the error object it receives.
    @bottle.error(401)
    def err(e):
        bottle.response.status = 200
        return str(type(e))

    self.assertStatus(200, '/')
    self.assertBody("<class 'bottle.HTTPError'>", '/')
def test_view_error(self):
    """ WSGI: Test if view-decorator reacts on non-dict return values correctly."""
    @bottle.route('/tpl')
    @bottle.view('stpl_t2main')
    def test():
        return bottle.HTTPError(401, 'The cake is a lie!')

    url = '/tpl'
    # Both the custom message and the status text must be in the body.
    for fragment in ('The cake is a lie!', '401 Unauthorized'):
        self.assertInBody(fragment, url)
    self.assertStatus(401, url)
def dispatch(self):
    # Unconditionally raises HTTPError('418 Teapot', [1, 2, 3]).
    teapot = HTTPError('418 Teapot', [1, 2, 3])
    raise teapot
def tasks_create_file():
    """Create analysis task(s) from an uploaded file.

    Reads the upload from ``request.files.file`` and the submission
    options from ``request.forms``, stores the upload via
    ``store_temp_file`` and passes everything to
    ``db.demux_sample_and_add_to_db``.

    :return: ``jsonize({"task_ids": ...})`` on success, or an
        ``HTTPError(500, e)`` when ``CuckooDemuxError`` is raised.
    """
    def _as_flag(raw):
        # Only 'false' (any letter case) and '0' turn the flag off; any
        # other value, including an empty string, turns it on.
        return not (raw.upper() == 'FALSE' or raw == '0')

    upload = request.files.file
    field = request.forms.get

    submission = dict(
        package=field("package", ""),
        timeout=field("timeout", ""),
        priority=field("priority", 1),
        options=field("options", ""),
        machine=field("machine", ""),
        platform=field("platform", ""),
        tags=field("tags", None),
        custom=field("custom", ""),
        memory=_as_flag(field("memory", 'False')),
        clock=field("clock", None),
        shrike_url=field("shrike_url", None),
        shrike_msg=field("shrike_msg", None),
        shrike_sid=field("shrike_sid", None),
        shrike_refer=field("shrike_refer", None),
        enforce_timeout=_as_flag(field("enforce_timeout", 'False')),
    )

    # The upload is written to a temporary file only after every form
    # field has been read.
    submission["file_path"] = store_temp_file(upload.file.read(),
                                              upload.filename)

    try:
        task_ids = db.demux_sample_and_add_to_db(**submission)
    except CuckooDemuxError as e:
        return HTTPError(500, e)

    return jsonize({"task_ids": task_ids})
def tasks_reschedule(task_id):
    """Reschedule the task identified by ``task_id``.

    :param task_id: identifier passed to ``db.view_task`` and
        ``db.reschedule``.
    :return: ``jsonize({"status": "OK"})`` on success, ``HTTPError(404)``
        when ``db.view_task`` returns a falsy value, or ``HTTPError(500)``
        when ``db.reschedule`` returns a falsy value.
    """
    if not db.view_task(task_id):
        return HTTPError(404, "There is no analysis with the specified ID")

    if not db.reschedule(task_id):
        return HTTPError(500, "An error occurred while trying to "
                              "reschedule the task")

    return jsonize({"status": "OK"})
def tasks_delete(task_id):
    """Delete a task, its analysis folder and (optionally) its stored results.

    :param task_id: integer task identifier (it is formatted with ``%d``).
    :return: ``jsonize({"status": "OK"})`` on success; ``HTTPError(404)``
        when the task does not exist; ``HTTPError(500)`` when the task is
        running or ``db.delete_task`` reports failure.
    """
    task = db.view_task(task_id)
    if not task:
        return HTTPError(404, "Task not found")

    if task.status == TASK_RUNNING:
        return HTTPError(500, "The task is currently being "
                              "processed, cannot delete")

    if not db.delete_task(task_id):
        return HTTPError(500, "An error occurred while trying to "
                              "delete the task")

    delete_folder(os.path.join(CUCKOO_ROOT, "storage",
                               "analyses", "%d" % task_id))

    if FULL_DB:
        analysis = results_db.analysis.find_one({"info.id": task_id})
        # find_one() yields None when no results document exists for the
        # task; the original dereferenced it unconditionally and raised
        # AttributeError after the task row and folder were already gone.
        if analysis is not None:
            for process in analysis.get("behavior", {}).get("processes", []):
                for call in process.get("calls", []):
                    results_db.calls.remove(call)
            results_db.analysis.remove({"info.id": task_id})

    return jsonize({"status": "OK"})
def files_get(sha256):
    """Return the raw content of a stored binary.

    :param sha256: file name looked up under ``storage/binaries``.
    :return: the file's bytes (with an octet-stream content type set on
        ``response``), or ``HTTPError(404)`` when the path does not exist.
    """
    # NOTE(review): ``sha256`` is joined into the path exactly as received;
    # confirm the route pattern restricts it to a plain hash value.
    file_path = os.path.join(CUCKOO_ROOT, "storage", "binaries", sha256)
    if not os.path.exists(file_path):
        return HTTPError(404, "File not found")

    response.content_type = "application/octet-stream; charset=UTF-8"
    # Close the handle deterministically instead of leaving it to the
    # garbage collector as the bare open(...).read() did.
    with open(file_path, "rb") as binary:
        return binary.read()
def pcap_get(task_id):
    """Return the raw ``dump.pcap`` of an analysis.

    :param task_id: integer task identifier (it is formatted with ``%d``).
    :return: the PCAP bytes (with an octet-stream content type set on
        ``response``), ``HTTPError(404)`` when the file does not exist, or
        ``HTTPError(500)`` when reading it fails.
    """
    file_path = os.path.join(CUCKOO_ROOT, "storage", "analyses",
                             "%d" % task_id, "dump.pcap")
    if not os.path.exists(file_path):
        return HTTPError(404, "File not found")

    response.content_type = "application/octet-stream; charset=UTF-8"
    try:
        # ``with`` closes the handle even when read() fails.
        with open(file_path, "rb") as pcap:
            return pcap.read()
    except Exception:
        # Was a bare ``except:``, which also intercepted SystemExit and
        # KeyboardInterrupt.
        return HTTPError(500, "An error occurred while reading PCAP")
def machines_view(name=None):
    """Return details of a single machine.

    :param name: machine name forwarded to ``db.view_machine``.
    :return: ``jsonize({"machine": machine.to_dict()})``, or
        ``HTTPError(404)`` when ``db.view_machine`` returns a falsy value.
    """
    machine = db.view_machine(name=name)
    if not machine:
        return HTTPError(404, "Machine not found")

    return jsonize({"machine": machine.to_dict()})