def login_required(perm=None):
    """Build a decorator that restricts a view to authenticated sessions.

    Args:
        perm: Optional permission name. When given, the session must also
            hold that permission according to ``parse_permissions``.

    Returns:
        A decorator. The decorated view answers AJAX requests (sent with
        ``X-Requested-With: XMLHttpRequest``) with ``HTTPError(403)`` and
        redirects other requests to ``/login`` when the session is not
        authenticated, or to ``/nopermission`` when ``perm`` is missing.
    """
    # Local import: the file's import block is not visible from this block.
    import functools

    def _is_ajax():
        return request.headers.get('X-Requested-With') == 'XMLHttpRequest'

    def _dec(func):
        # Keep the wrapped view's name and docstring for routing/debugging.
        @functools.wraps(func)
        def _view(*args, **kwargs):
            s = request.environ.get('beaker.session')
            # A missing session object is treated as "not logged in"
            # rather than failing with AttributeError on ``None.get``.
            logged_in = (s is not None
                         and s.get("name", None)
                         and s.get("authenticated", False))
            if not logged_in:
                if _is_ajax():
                    return HTTPError(403, "Forbidden")
                return redirect("/login")

            if perm:
                perms = parse_permissions(s)
                if perm not in perms or not perms[perm]:
                    if _is_ajax():
                        return HTTPError(403, "Forbidden")
                    return redirect("/nopermission")

            return func(*args, **kwargs)
        return _view
    return _dec
# Example source code using the Python class HTTPError()
def enforce_api_rate_limit(action, limit_in_window):
    """Abort the current request when the caller exceeds the API rate limit.

    Requests carrying ``X-Load-Test: yes`` are exempt while
    ``FLAGS.enable_load_test_hacks`` is set, and so are users whose record
    has a truthy ``'organizer'`` entry.

    Args:
        action: Action name.
        limit_in_window: Maximum number of requests of this action in a window.

    Raises:
        bottle.HTTPError: If the rate limit is exceeded.
    """
    is_load_test = (
        FLAGS.enable_load_test_hacks and
        bottle.request.headers.get('X-Load-Test', '') == 'yes')
    # Short-circuit keeps the user lookup from running for load-test requests.
    if is_load_test or get_current_user()['organizer']:
        return

    username = get_current_username()

    last_access_result = model.record_last_api_access_time(username)
    if last_access_result < FLAGS.api_rate_limit_request_interval:
        bottle.abort(429, 'Rate limit exceeded (per-second limit).')

    if model.increment_api_rate_limit_counter(username, action) > limit_in_window:
        bottle.abort(429, 'Rate limit exceeded (per-hour limit).')
# http://flask.pocoo.org/snippets/44/
def addcrypted():
    """Store a posted container file under ``DL_ROOT`` and add it as a package.

    Reads the ``referer`` form field (default ``'ClickAndLoad Package'``) as
    the package name and the ``crypted`` form field as the file content.

    Returns:
        The success line when ``PYLOAD.addPackage`` succeeds, otherwise an
        ``HTTPError``.
    """
    package = request.forms.get('referer', 'ClickAndLoad Package')
    # Presumably undoes form decoding that turned "+" into spaces - confirm.
    dlc = request.forms['crypted'].replace(" ", "+")

    # Drop path separators and colons before using the name as a file name.
    file_stem = package.replace("/", "").replace("\\", "").replace(":", "")
    dlc_path = join(DL_ROOT, file_stem + ".dlc")

    # The context manager closes the handle even when the write fails.
    with open(dlc_path, "wb") as dlc_file:
        dlc_file.write(dlc)

    try:
        PYLOAD.addPackage(package, [dlc_path], 0)
    except Exception:
        # Narrowed from a bare ``except:`` so KeyboardInterrupt/SystemExit
        # are no longer swallowed.
        return HTTPError()
    return "success\r\n"
def links():
    """Return the current downloads with a display ``info`` string per link.

    Returns:
        ``{'links': [...], 'ids': [...]}`` where ``links`` are the dicts built
        from ``PYLOAD.statusDownloads()`` and ``ids`` their ``'fid'`` values,
        or an ``HTTPError`` when anything raises.
    """
    try:
        downloads = [toDict(x) for x in PYLOAD.statusDownloads()]
        ids = [link['fid'] for link in downloads]

        for link in downloads:
            status = link['status']
            if status == 12:
                link['info'] = "%s @ %s/s" % (link['format_eta'], formatSize(link['speed']))
            elif status == 5:
                # Progress fields are zeroed for this status.
                link['percent'] = 0
                link['size'] = 0
                link['bleft'] = 0
                link['info'] = _("waiting %s") % link['format_wait']
            else:
                link['info'] = ""

        return {'links': downloads, 'ids': ids}
    except Exception:
        # ``except Exception, e`` was Python-2-only syntax and ``e`` was unused.
        print_exc()
        return HTTPError()
def test_dispatch_error(self, app, view_render_errors):
    """Check the response of ``/error`` for both ``render_errors`` settings.

    The status is expected to be ``418 Teapot`` either way; the body and
    content type depend on whether the view renders its own errors.
    """
    @app.routecv
    class ExampleView(ExampleErrorView):
        render_errors = view_render_errors
        renderer_classes = [JSONRenderer]

    resp = app.webtest.get('/error', expect_errors=True)

    # Both variants answer with the same status line.
    assert resp.status == '418 Teapot'

    if not view_render_errors:
        assert b'DOCTYPE HTML PUBLIC' in resp.body
        assert resp.headers['Content-Type'] == 'text/html; charset=UTF-8'
        return

    assert resp.body == b'[1, 2, 3]'
    assert resp.headers['Content-Type'] == 'application/json'
def tasks_view(task_id):
    """Return the details of one task, serialised with ``jsonize``.

    Args:
        task_id: Identifier passed to ``db.view_task``.

    Returns:
        ``jsonize({"task": entry})`` where ``entry`` is the task dict extended
        with ``guest``, ``errors`` and ``sample`` entries, or
        ``HTTPError(404)`` when no task is found.
    """
    task = db.view_task(task_id, details=True)
    if not task:
        return HTTPError(404, "Task not found")

    entry = task.to_dict()
    # Each related section falls back to an empty container when absent.
    entry["guest"] = task.guest.to_dict() if task.guest else {}
    entry["errors"] = [error.message for error in task.errors]
    entry["sample"] = {}
    if task.sample_id:
        entry["sample"] = db.view_sample(task.sample_id).to_dict()

    return jsonize({"task": entry})
def task_screenshots(task=0, screenshot=None):
    """Return one screenshot of a task, or all of them as a zip archive.

    Args:
        task: Task identifier; used as a directory name under the analyses
            storage folder.
        screenshot: Optional screenshot name without the ``.jpg`` extension.
            When omitted, every file in the shots folder is zipped.

    Returns:
        The JPEG bytes, the zip archive content, or ``HTTPError(404)`` when
        the folder or the requested screenshot does not exist.
    """
    folder_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", str(task), "shots")
    if not os.path.exists(folder_path):
        return HTTPError(404, folder_path)

    if screenshot:
        screenshot_name = "{0}.jpg".format(screenshot)
        screenshot_path = os.path.join(folder_path, screenshot_name)
        if not os.path.exists(screenshot_path):
            return HTTPError(404, screenshot_path)

        # TODO: Add content disposition.
        response.content_type = "image/jpeg"
        # Close the handle deterministically instead of leaking it.
        with open(screenshot_path, "rb") as shot_file:
            return shot_file.read()

    zip_data = StringIO()
    with ZipFile(zip_data, "w", ZIP_STORED) as zip_file:
        for shot_name in os.listdir(folder_path):
            zip_file.write(os.path.join(folder_path, shot_name), shot_name)

    # TODO: Add content disposition.
    response.content_type = "application/zip"
    return zip_data.getvalue()
def get_files(task_id):
    """Zip the ``files`` folder of a task and return the archive content.

    Args:
        task_id: Task identifier as a string; must consist of digits only.

    Returns:
        The bytes of ``files.zip`` (also written next to the ``files``
        folder), or ``HTTPError(404)`` when the ID is not numeric or the
        ``files`` folder does not exist.
    """
    if not task_id.isdigit():
        return HTTPError(code=404, output="The specified ID is invalid")

    analysis_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", task_id)
    files_path = os.path.join(analysis_path, "files")
    zip_file = os.path.join(analysis_path, "files.zip")

    # Check before archiving: previously the archive was written first, which
    # failed or left an empty files.zip behind when the folder was missing.
    if not os.path.exists(files_path):
        return HTTPError(code=404, output="Files not found")

    root_len = len(os.path.abspath(files_path))
    with zipfile.ZipFile(zip_file, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        for root, _dirs, files in os.walk(files_path):
            # Path of this directory relative to files_path.
            archive_root = os.path.abspath(root)[root_len:]
            for f in files:
                fullpath = os.path.join(root, f)
                archive_name = os.path.join(archive_root, f)
                archive.write(fullpath, archive_name, zipfile.ZIP_DEFLATED)

    response.content_type = "application/zip"
    response.set_header("Content-Disposition", "attachment; filename=cuckoo_task_%s(not_encrypted).zip" % (task_id))
    # Close the handle deterministically instead of leaking it.
    with open(zip_file, "rb") as archive_file:
        return archive_file.read()
def audio_random(db):
    """Return one randomly chosen page of type 2 as a JSON object string.

    Args:
        db: Object exposing ``query``; looks like a SQLAlchemy session -
            TODO confirm.

    Returns:
        A JSON string describing the chosen page, or ``HTTPError(404)`` when
        the query yields no rows.
    """
    candidates = list(db.query(Page).filter_by(type=2).limit(10))
    # random.choice raises IndexError on an empty sequence, which previously
    # made the 404 branch unreachable for an empty result.
    audio = random.choice(candidates) if candidates else None
    if not audio:
        return HTTPError(404, 'id not found.')

    return json.dumps({
        'id': audio.id,
        'title': audio.title,
        'cover': audio.cover,
        'para': audio.para,
        'type': audio.type,
        'tag': audio.tag,
        'author': {
            'id': audio.author.id,
            'name': audio.author.name
        },
        'time': audio.time
    })
def native_latest_news(db):
    """Return up to ten pages of type 10, highest id first, as a JSON array.

    Args:
        db: Object exposing ``query``; looks like a SQLAlchemy session -
            TODO confirm.

    Returns:
        A JSON string with one object per page, or ``HTTPError(404)`` when
        the query object is falsy.
    """
    pages = db.query(Page).filter_by(type=10).order_by(desc(Page.id)).limit(10)
    # NOTE(review): this tests the query object itself, not whether it yields
    # rows - confirm the 404 branch can actually be reached.
    if not pages:
        return HTTPError(404, 'id not found.')

    items = []
    for page in pages:
        items.append({
            'id': page.id,
            'key': page.id,
            'title': page.title,
            'cover': page.cover,
            'type': page.type,
            'tag': page.tag,
            'author': {
                'id': page.author.id,
                'name': page.author.name
            },
            'time': page.time
        })
    return json.dumps(items)
def latest_news(db):
    """Return up to ten pages of type 0, highest id first, as a JSON array.

    Args:
        db: Object exposing ``query``; looks like a SQLAlchemy session -
            TODO confirm.

    Returns:
        A JSON string with one object per page, or ``HTTPError(404)`` when
        the query object is falsy.
    """
    pages = db.query(Page).filter_by(type=0).order_by(desc(Page.id)).limit(10)
    # NOTE(review): this tests the query object itself, not whether it yields
    # rows - confirm the 404 branch can actually be reached.
    if not pages:
        return HTTPError(404, 'id not found.')

    items = []
    for page in pages:
        items.append({
            'id': page.id,
            'key': page.id,
            'title': page.title,
            'cover': page.cover,
            'type': page.type,
            'tag': page.tag,
            'author': {
                'id': page.author.id,
                'name': page.author.name
            },
            'time': page.time
        })
    return json.dumps(items)
def anchor_news(anchor, db):
    """Return up to ten pages of type 0 with an id below ``anchor``.

    Pages are ordered by descending id and serialised as a JSON array.

    Args:
        anchor: Upper bound (exclusive) for ``Page.id``.
        db: Object exposing ``query``; looks like a SQLAlchemy session -
            TODO confirm.

    Returns:
        A JSON string with one object per page, or ``HTTPError(404)`` when
        the query object is falsy.
    """
    pages = db.query(Page).filter_by(type=0).order_by(desc(Page.id)).filter(Page.id < anchor).limit(10)
    # NOTE(review): this tests the query object itself, not whether it yields
    # rows - confirm the 404 branch can actually be reached.
    if not pages:
        return HTTPError(404, 'id not found.')

    items = []
    for page in pages:
        items.append({
            'id': page.id,
            'key': page.id,
            'title': page.title,
            'cover': page.cover,
            'type': page.type,
            'tag': page.tag,
            'author': {
                'id': page.author.id,
                'name': page.author.name
            },
            'time': page.time
        })
    return json.dumps(items)
def ensure_admin():
    """Require that the client is an admin.

    Raises:
        HTTPError: A 401 response carrying a ``WWW-Authenticate`` basic-auth
            challenge, when ``is_admin()`` is false.
    """
    if is_admin():
        return

    unauthorized = bottle.HTTPError(401, 'Unauthorized')
    unauthorized.headers['WWW-Authenticate'] = 'Basic realm="admin only" domain="/"'
    raise unauthorized
def json_api_handler(handler):
    """Bottle handler decorator for JSON REST API handlers.

    The wrapped handler always returns a ``bottle.HTTPResponse`` whose body is
    a JSON object: the handler's dict plus ``'ok': True`` on success, or
    ``{'ok': False, 'error': <body>}`` when the handler returned or raised an
    ``HTTPResponse``. Any other exception is logged and reported as a 500.

    Args:
        handler: A Bottle handler function.

    Returns:
        A wrapped Bottle handler function.
    """
    @functools.wraps(handler)
    def wrapped_handler(*args, **kwargs):
        try:
            handler_result = handler(*args, **kwargs)
        except bottle.HTTPResponse as http_response:
            # Rebind explicitly: Python 3 deletes the ``as`` target when the
            # except block ends, which would leave handler_result unbound.
            handler_result = http_response
        except Exception:
            logging.exception('Uncaught exception')
            handler_result = bottle.HTTPError(500, 'Internal Server Error')

        if isinstance(handler_result, bottle.HTTPResponse):
            # For now, we do not support raising successful HTTPResponse.
            assert handler_result.status_code // 100 != 2
            # Forcibly convert HTTPError to HTTPResponse to avoid formatting.
            response = handler_result.copy(cls=bottle.HTTPResponse)
            response_data = {'ok': False, 'error': handler_result.body}
        else:
            assert isinstance(handler_result, dict)
            response = bottle.response.copy(cls=bottle.HTTPResponse)
            response_data = handler_result
            response_data['ok'] = True

        response.body = ujson.dumps(response_data, double_precision=6)
        response.content_type = 'application/json'
        return response
    return wrapped_handler
def secret():
    """Greet the logged-in user or challenge the client for basic auth.

    Returns:
        A welcome HTML string when ``session.user`` is truthy, otherwise a
        401 ``bottle.HTTPError`` with a ``WWW-Authenticate`` header.
    """
    if session.user:
        return 'Welcome %s! <a href="/">Go back</a> (note that there is no reliable way to logout using basic auth)' % session.user

    challenge = bottle.HTTPError(401, "Login required")
    challenge.add_header('WWW-Authenticate', 'Basic realm="%s"' % 'private')
    return challenge
def secret():
    """Report the authenticated user or challenge the client for basic auth.

    Returns:
        A plain-text confirmation when ``session.user`` is truthy, otherwise
        a 401 ``bottle.HTTPError`` with a ``WWW-Authenticate`` header.
    """
    if session.user:
        return 'You are authenticated as ' + str(session.user)

    challenge = bottle.HTTPError(401, "Login required")
    challenge.add_header('WWW-Authenticate', 'Basic realm="%s"' % 'private')
    return challenge
def index():
    """Greet the logged-in user or challenge the client for basic auth.

    Returns:
        A greeting string when ``can.session.user`` is truthy, otherwise a
        401 ``bottle.HTTPError`` with a ``WWW-Authenticate`` header.
    """
    if not can.session.user:
        challenge = bottle.HTTPError(401, "Login required")
        challenge.add_header('WWW-Authenticate', 'Basic realm="%s"' % 'private')
        return challenge

    return "Hi " + str(can.session.user) + "!"
def get_certificate():
    """Generate a certificate and return the OpenVPN config as a download.

    Reads ``username``, ``password`` and ``days`` through ``post_get``.

    Returns:
        A 200 ``bottle.HTTPResponse`` with the OpenVPN configuration as an
        attachment named after the username.

    Raises:
        bottle.HTTPError: When ``username`` is missing, or when ``days`` is
            missing or not an integer.
    """
    username = post_get('username')
    if not username:
        raise bottle.HTTPError(500, "Username missing")

    password = post_get('password', default=None)

    # int(None) / int("abc") used to escape as a bare TypeError/ValueError;
    # report it the same way as the missing username instead.
    try:
        days = int(post_get('days', default=None))
    except (TypeError, ValueError):
        raise bottle.HTTPError(500, "Days missing or invalid")

    cert_gen = CertificateGenerator()
    cert_gen.generate(password, username, days)
    openvpn_config = cert_gen.get_openvpn_config()

    headers = {
        'Content-Type': 'text/plain;charset=UTF-8',
        'Content-Disposition': 'attachment; filename="softfire-vpn_%s.ovpn"' % username,
        # NOTE(review): len() counts characters if the config is text rather
        # than bytes - confirm it matches the encoded body length.
        "Content-Length": len(openvpn_config)
    }
    return bottle.HTTPResponse(openvpn_config, 200, **headers)
def server_static(filename):
    """Serve a CSS or other static file from the configured view path.

    Args:
        filename: Requested path below the ``static`` directory.

    Returns:
        The ``bottle.static_file`` response, or ``HTTPError`` with status 403
        when ``filename`` contains ``..``.
    """
    if ".." not in filename:
        view_path = get_config('api', 'view-path', '/etc/softfire/views')
        return bottle.static_file(filename, root='%s/static' % view_path)
    return HTTPError(status=403)
#########
# Utils #
#########
def call_api(func, args=""):
    """Authorize the session and dispatch an API call by name.

    Args:
        func: Name of the API function to call.
        args: Slash-separated positional arguments; everything before the
            first ``/`` is dropped.

    Returns:
        The result of ``callApi`` on success, ``HTTPError(403)`` when the
        session is missing or unauthenticated, ``HTTPError(401)`` when the
        session may not call ``func``, or ``HTTPError(500)`` with a JSON
        error description when the call raises.
    """
    response.headers.replace("Content-type", "application/json")
    response.headers.append("Cache-Control", "no-cache, must-revalidate")

    s = request.environ.get('beaker.session')
    # A session id posted in the form overrides the cookie session.
    if 'session' in request.POST:
        s = s.get_by_id(request.POST['session'])

    if not s or not s.get("authenticated", False):
        return HTTPError(403, json.dumps("Forbidden"))

    if not PYLOAD.isAuthorized(func, {"role": s["role"], "permission": s["perms"]}):
        return HTTPError(401, json.dumps("Unauthorized"))

    positional = args.split("/")[1:]
    # ``items()`` replaces the Python-2-only ``iteritems()``.
    kwargs = {}
    for key, value in chain(request.GET.items(), request.POST.items()):
        if key == "session":
            continue
        kwargs[key] = unquote(value)

    try:
        return callApi(func, *positional, **kwargs)
    except Exception as e:
        print_exc()
        # ``e.message`` only exists on Python 2 exceptions; fall back to str().
        message = e.message if hasattr(e, "message") else str(e)
        # NOTE(review): the full traceback is sent to the client - confirm
        # this is intended for this API.
        return HTTPError(500, json.dumps({"error": message, "traceback": format_exc()}))
def callApi(func, *args, **kwargs):
    """Call a public API function by name and return its result as JSON.

    Args:
        func: Name of the function; must exist on ``PYLOAD.EXTERNAL`` and
            must not start with an underscore.
        *args: Positional arguments as strings, parsed with ``literal_eval``.
        **kwargs: Keyword arguments as strings, parsed with ``literal_eval``.

    Returns:
        The JSON-encoded result (``None`` is reported as ``true``), or
        ``HTTPError(404)`` when ``func`` is not a valid API name.
    """
    if not hasattr(PYLOAD.EXTERNAL, func) or func.startswith("_"):
        # Single formatted argument prints the same text on Python 2 and 3.
        print("Invalid API call %s" % func)
        return HTTPError(404, json.dumps("Not Found"))

    parsed_args = [literal_eval(x) for x in args]
    parsed_kwargs = {key: literal_eval(value) for key, value in kwargs.items()}
    result = getattr(PYLOAD, func)(*parsed_args, **parsed_kwargs)

    # null is invalid json response
    if result is None:
        result = True

    return json.dumps(result, cls=TBaseEncoder)
#post -> username, password
def local_check(function):
    """Decorate a view so it only runs for requests that look local.

    A request passes when ``REMOTE_ADDR`` is ``127.0.0.1`` or ``localhost``,
    or when ``HTTP_HOST`` equals ``127.0.0.1:9666``. Everything else gets
    ``HTTPError(403)``.

    Args:
        function: The view to protect.

    Returns:
        The wrapped view.
    """
    # Local import: the file's import block is not visible from this block.
    import functools

    # Keep the wrapped view's name and docstring for routing/debugging.
    @functools.wraps(function)
    def _view(*args, **kwargs):
        environ = request.environ
        from_loopback = environ.get('REMOTE_ADDR', "0") in ('127.0.0.1', 'localhost')
        # NOTE(review): HTTP_HOST comes from the client's Host header, so a
        # remote client can presumably satisfy this branch - confirm whether
        # it should stay.
        if from_loopback or environ.get('HTTP_HOST', '0') == '127.0.0.1:9666':
            return function(*args, **kwargs)
        return HTTPError(403, "Forbidden")
    return _view
def flashgot():
    """Add the posted URLs as a package, for requests from the flashgot page.

    Reads the form fields ``urls`` (newline separated), ``package``
    (optional name) and ``autostart`` (default ``0``).

    Returns:
        An empty string on success, or ``HTTPError`` when the Referer header
        is missing or is not one of the two local flashgot URLs.
    """
    allowed_referers = ("http://localhost:9666/flashgot",
                        "http://127.0.0.1:9666/flashgot")
    # ``.get`` makes a request without a Referer header fail the check
    # instead of raising KeyError.
    if request.environ.get('HTTP_REFERER') not in allowed_referers:
        return HTTPError()

    autostart = int(request.forms.get('autostart', 0))
    package = request.forms.get('package', None)
    # Drop empty lines.
    urls = [url for url in request.forms['urls'].split("\n") if url != ""]

    if package:
        PYLOAD.addPackage(package, urls, autostart)
    else:
        PYLOAD.generateAndAddPackages(urls, autostart)

    return ""
def packages():
    """Return the queue with the file info of every package attached.

    Returns:
        The value of ``PYLOAD.getQueue()`` where each package has a
        ``'links'`` list filled from ``PYLOAD.get_file_info``, or an
        ``HTTPError`` when anything raises.
    """
    # Single-argument form prints the same text on Python 2 and 3.
    print("/json/packages")

    try:
        data = PYLOAD.getQueue()

        for pack in data:
            pack['links'] = [PYLOAD.get_file_info(entry)
                             for entry in PYLOAD.get_package_files(pack['id'])]

        return data
    except Exception:
        # Narrowed from a bare ``except:`` so KeyboardInterrupt/SystemExit
        # are no longer swallowed.
        return HTTPError()
# Icon file per link status; any status not listed here gets
# "status_downloading.png".
_PACKAGE_STATUS_ICONS = {
    0: "status_finished.png",
    2: "status_queue.png",
    3: "status_queue.png",
    9: "status_offline.png",
    1: "status_offline.png",
    5: "status_waiting.png",
    8: "status_failed.png",
    4: "arrow_right.png",
    11: "status_proc.png",
    13: "status_proc.png",
}


def package(id):
    """Return one package with its links sorted and annotated with an icon.

    Args:
        id: Package identifier passed to ``PYLOAD.getPackageData``.

    Returns:
        The package data as a dict whose ``"links"`` are dicts carrying an
        ``"icon"`` entry and sorted with ``get_sort_key``, or an
        ``HTTPError`` when anything raises.
    """
    try:
        data = toDict(PYLOAD.getPackageData(id))
        data["links"] = [toDict(x) for x in data["links"]]

        for pyfile in data["links"]:
            pyfile["icon"] = _PACKAGE_STATUS_ICONS.get(
                pyfile["status"], "status_downloading.png")

        data["links"].sort(key=get_sort_key)
        return data
    except Exception:
        # Narrowed from a bare ``except:`` so KeyboardInterrupt/SystemExit
        # are no longer swallowed.
        print_exc()
        return HTTPError()
def package_order(ids):
    """Move a package to a new position.

    Args:
        ids: String of the form ``"<package id>|<position>"``.

    Returns:
        ``{"response": "success"}``, or an ``HTTPError`` when ``ids`` cannot
        be parsed or the call raises.
    """
    try:
        pid, pos = ids.split("|")
        PYLOAD.orderPackage(int(pid), int(pos))
        return {"response": "success"}
    except Exception:
        # Narrowed from a bare ``except:`` so KeyboardInterrupt/SystemExit
        # are no longer swallowed.
        return HTTPError()
def abort_link(id):
    """Stop the download with the given id.

    Args:
        id: Identifier passed to ``PYLOAD.stopDownloads`` as a one-item list.

    Returns:
        ``{"response": "success"}``, or an ``HTTPError`` when the call raises.
    """
    try:
        PYLOAD.stopDownloads([id])
        return {"response": "success"}
    except Exception:
        # Narrowed from a bare ``except:`` so KeyboardInterrupt/SystemExit
        # are no longer swallowed.
        return HTTPError()
def move_package(dest, id):
    """Move a package to another destination.

    Args:
        dest: Destination passed through to ``PYLOAD.movePackage``.
        id: Package identifier.

    Returns:
        ``{"response": "success"}``, or an ``HTTPError`` when the call raises.
    """
    try:
        PYLOAD.movePackage(dest, id)
        return {"response": "success"}
    except Exception:
        # Narrowed from a bare ``except:`` so KeyboardInterrupt/SystemExit
        # are no longer swallowed.
        return HTTPError()
def edit_package():
    """Update name, folder and password of a package from the posted form.

    Reads the form fields ``pack_id``, ``pack_name``, ``pack_folder`` and
    ``pack_pws``.

    Returns:
        ``{"response": "success"}``, or an ``HTTPError`` when a field is
        missing or invalid or the call raises.
    """
    try:
        # Renamed from ``id`` so the builtin is not shadowed.
        pack_id = int(request.forms.get("pack_id"))
        data = {"name": request.forms.get("pack_name").decode("utf8", "ignore"),
                "folder": request.forms.get("pack_folder").decode("utf8", "ignore"),
                "password": request.forms.get("pack_pws").decode("utf8", "ignore")}

        PYLOAD.setPackageData(pack_id, data)
        return {"response": "success"}
    except Exception:
        # Narrowed from a bare ``except:`` so KeyboardInterrupt/SystemExit
        # are no longer swallowed.
        return HTTPError()
def change_password():
    """Change a user's password from the posted form.

    Reads the POST fields ``user_login``, ``login_current_password`` and
    ``login_new_password``.

    Returns:
        An ``HTTPError`` when ``PYLOAD.changePassword`` reports failure;
        otherwise nothing is returned explicitly.
    """
    form = request.POST
    user = form["user_login"]
    current_password = form["login_current_password"]
    new_password = form["login_new_password"]

    changed = PYLOAD.changePassword(user, current_password, new_password)
    if not changed:
        print("Wrong password")
        return HTTPError()