def retrieve_channel_information(request):
    """Return the registration and push counters for one channel.

    The channel id is taken from ``request.matchdict['channel_id']`` and the
    registrations stored under ``/channels/<channel_id>`` are fetched from
    ``request.registry.storage``.

    :param request: the current request.
    :returns: ``{"data": {"registrations": <count>, "push": 0}}`` where
        ``<count>`` is the second value returned by ``storage.get_all``.
    :raises httpexceptions.HTTPForbidden: when no fetched registration has
        an ``id`` equal to ``request.prefixed_userid``.
    """
    channel_parent = '/channels/{}'.format(request.matchdict['channel_id'])
    records, total = request.registry.storage.get_all(
        collection_id=REGISTRATION_COLLECTION_ID,
        parent_id=channel_parent)

    # Only users that are themselves registered on the channel may read it.
    own_registrations = sum(
        1 for record in records
        if record['id'] == request.prefixed_userid)
    if own_registrations == 0:
        raise httpexceptions.HTTPForbidden()

    payload = {
        "registrations": total,
        "push": 0
    }
    return {"data": payload}
# Channel Registration views
# Example source code using Python's HTTPForbidden() class
def shell2(request):
    """Launch a notebook for the logged in user with the "shell2" context.

    The notebook context is seeded with a greeting, then extended through
    ``startup.make_startup`` (using the config file path found in the
    registry settings), ``startup.add_script`` and ``startup.add_greeting``.

    :param request: the current request.
    :returns: whatever ``launch_notebook`` returns.
    :raises httpexceptions.HTTPForbidden: when no user is authenticated.
    """
    policy = request.registry.queryUtility(IAuthenticationPolicy)
    username = policy.authenticated_userid(request)
    if not username:
        # Per the original note: this triggers the HTTP Basic Auth dialog
        # through the basic_challenge handler defined elsewhere.
        raise httpexceptions.HTTPForbidden("You need to be logged in. Hint: user / password")

    notebook_context = {"greeting": "**Executing shell2 context**\n"}
    global_config = request.registry.settings["global_config"]
    startup.make_startup(notebook_context, global_config["__file__"])
    startup.add_script(notebook_context, SCRIPT)
    startup.add_greeting(notebook_context, GREETING)

    return launch_notebook(request, username, notebook_context=notebook_context)
def delete_plugin(request):
    """Delete a plugin along with its schedule entries and proposed changes.

    :param request: the current request; ``matchdict['plugin_id']`` names
        the plugin to remove.
    :returns: an ``HTTPFound`` redirect to ``/plugin``.
    :raises exc.HTTPBadRequest: when no plugin has the given id.
    :raises exc.HTTPForbidden: when the user is neither the plugin's owner
        nor an admin.
    """
    db = request.db_session
    plugin_id = request.matchdict['plugin_id']
    current_user = request.user

    plugin_query = db.query(LedPlugin).filter(LedPlugin.id == plugin_id)
    plugin = plugin_query.first()
    if plugin is None:
        raise exc.HTTPBadRequest('No such plugin')

    # Owners may always delete; anybody else has to be an admin.
    if current_user != plugin.user:
        if not current_user.admin:
            raise exc.HTTPForbidden("You don't have access to do that")

    # Remove the rows that reference the plugin before the plugin itself.
    for dependent in (LedSchedule, LedPluginProposed):
        db.query(dependent).filter(dependent.led_plugin_id == plugin_id).delete()
    plugin_query.delete()

    log(request, 'Deleted plugin ' + plugin.name)
    return exc.HTTPFound(location='/plugin')
def show_group(request):
    """Collect the data needed to display a single group.

    Only site admins and members of the group may view it.

    :param request: the current request; ``matchdict['group_id']`` names
        the group to show.
    :returns: a dict with the keys ``group``, ``users`` (the group's
        ``LedGroupUser`` rows), ``schedule`` (ordered by position),
        ``other_users`` (query of users not in the group),
        ``other_plugins`` (query of all plugins) and ``group_admin``
        (whether the current user may modify the group).
    :raises exc.HTTPForbidden: when the user is neither an admin nor a
        member of the group.
    :raises exc.HTTPNotFound: when the group does not exist.
    """
    user = request.user
    db = request.db_session

    group = db.query(LedGroup).filter(LedGroup.id == request.matchdict['group_id']).first()
    users = db.query(LedGroupUser).filter(LedGroupUser.led_group == group).all()
    if not (user.admin or user.id in pluck(users, 'led_user_id')):
        raise exc.HTTPForbidden("Only site admins or group members can view this")

    # Checked after the permission test so non-members keep getting 403 and
    # cannot probe which group ids exist. Without this guard an admin asking
    # for an unknown group crashed with AttributeError on ``group.id`` below.
    if group is None:
        raise exc.HTTPNotFound("No such group")

    schedule = (
        db.query(LedSchedule)
        .filter(LedSchedule.led_group == group)
        .order_by(LedSchedule.position.asc())
        .all()
    )

    member_ids = db.query(LedGroupUser.led_user_id).filter(LedGroupUser.led_group == group)
    other_users = db.query(LedUser).filter(~LedUser.id.in_(member_ids))

    # Every plugin is offered, including ones already scheduled for the group.
    other_plugins = db.query(LedPlugin)

    return {
        'group': group,
        'users': users,
        'schedule': schedule,
        'other_users': other_users,
        'other_plugins': other_plugins,
        'group_admin': can_modify_group(request, group.id, False),
    }
def submission_media_(request):
    """Answer with an ``X-Accel-Redirect`` to a submission's media file.

    :param request: the current request; ``matchdict`` supplies
        ``linktype`` and ``submitid``.
    :returns: a ``Response`` whose headers carry the file URL of the first
        media item of the requested link type, with caching disabled.
    :raises httpexceptions.HTTPForbidden: when the submission does not
        exist, is hidden, or is friends-only.
    :raises httpexceptions.HTTPNotFound: when the submission has no media
        of the requested link type.
    """
    requested_type = request.matchdict['linktype']
    submitid = int(request.matchdict['submitid'])
    # The plural form is accepted as an alias of the singular one.
    link_type = "submission" if requested_type == "submissions" else requested_type

    submission = Submission.query.get(submitid)
    if submission is None or submission.is_hidden or submission.is_friends_only:
        raise httpexceptions.HTTPForbidden()

    media_items = media.get_submission_media(submitid)
    if not media_items.get(link_type):
        raise httpexceptions.HTTPNotFound()

    file_url = str(media_items[link_type][0]['file_url'])
    headers = [
        ('X-Accel-Redirect', file_url,),
        ('Cache-Control', 'max-age=0',),
    ]
    return Response(headerlist=headers)
def get_logged_in_user(cls, request):
    """Return the logged in user as a ``User`` instance.

    The user is looked up by the login returned from
    ``authenticated_userid(request)``; autoflush is disabled for the query.

    :param request: Request object
    :returns: the matching ``User``.
    :raises HTTPForbidden: when no user matches the authenticated login.
    """
    from stalker import User
    from stalker.db.session import DBSession

    with DBSession.no_autoflush:
        user = User.query \
            .filter_by(login=authenticated_userid(request)).first()

    if not user:
        # The first positional argument of HTTPForbidden is the ``detail``
        # message; passing the request object there would put the request's
        # string form into the error response, so raise without a detail.
        raise HTTPForbidden()
    return user
def reset_password_page(request):
    """Validate a password-reset link and return the data for the reset form.

    :param request: the current request; ``params`` supplies ``guid`` and
        ``u`` (the user id).
    :returns: ``{"guid": guid, "user_id": user_id}``.
    :raises HTTPForbidden: when the user has no pending reset, the guid
        does not validate, or the reset is more than 24 hours old.
    """
    session = DBSession()
    guid = request.params.get('guid')
    user_id = request.params.get('u')

    reset = session.query(PasswordReset).filter(PasswordReset.user_id == user_id).first()
    # ``first()`` yields None when the user has no pending reset; treat that
    # like an invalid guid instead of failing with AttributeError.
    if reset is None or not reset.validate_guid(guid):
        raise HTTPForbidden()

    # Link is over 24 hours old
    if reset.time + datetime.timedelta(days=1) < datetime.datetime.now():
        raise HTTPForbidden()

    return {"guid": guid, "user_id": user_id}
def reset_password(request):
    """Set a new password for a user holding a valid password-reset guid.

    :param request: the current request; ``params`` supplies ``guid``,
        ``user_id``, ``new_password`` and ``confirm_new_password``.
    :returns: an ``HTTPFound`` redirect: to ``viewAccount`` when the two
        passwords differ, to ``login`` with the validation result when the
        password is rejected by ``check_invalid_password``, or to ``login``
        with a success message once the password has been changed.
    :raises HTTPForbidden: when the user has no pending reset, the guid
        does not validate, or the reset is more than 24 hours old.
    """
    import datetime

    session = DBSession()
    guid = request.params.get('guid')
    user_id = request.params.get('user_id')
    new_password = request.params.get('new_password')
    confirm_new_password = request.params.get('confirm_new_password')

    reset = session.query(PasswordReset).filter(PasswordReset.user_id == user_id).first()
    # ``first()`` yields None when the user has no pending reset; treat that
    # like an invalid guid instead of failing with AttributeError.
    if reset is None or not reset.validate_guid(guid):
        raise HTTPForbidden()
    # Enforce the same 24 hour lifetime as the reset page, so an expired
    # link cannot be used by submitting the form directly.
    if reset.time + datetime.timedelta(days=1) < datetime.datetime.now():
        raise HTTPForbidden()

    if confirm_new_password != new_password:
        params = {"message": "Passwords did not match",
                  "message_type": "change_password"}
        return HTTPFound(location=request.route_url('viewAccount', _query=params))

    pword_invalid_check = check_invalid_password(new_password, confirm_new_password)
    if pword_invalid_check:
        return HTTPFound(location=request.route_url('login', _query=pword_invalid_check))

    session.query(User).filter(User.id == user_id).update({User.password: bcrypt.encrypt(new_password)})
    # The reset is single-use: drop it once the password has been changed.
    session.query(PasswordReset).filter(PasswordReset.user_id == user_id).delete()

    params = {"message": "Congratulations! Password successfully changed",
              "message_type": "success"}
    return HTTPFound(location=request.route_url('login', _query=params))
def permission_denied(self, request, message):
    """Reject the request by raising ``HTTPForbidden`` with ``message``.

    :param request: the current request (not used here).
    :param message: text passed as the exception's ``detail``.
    :raises HTTPForbidden: always.
    """
    # TODO: figure out how to tell an authorization error from an
    # authentication error.
    denial = HTTPForbidden(detail=message)
    raise denial
def notebook_proxy(request):
    """Hand the request to ``_notebook_proxy`` for the logged in user.

    :param request: the current request.
    :returns: whatever ``_notebook_proxy`` returns.
    :raises httpexceptions.HTTPForbidden: when no user is authenticated.
    """
    policy = request.registry.queryUtility(IAuthenticationPolicy)
    username = policy.authenticated_userid(request)
    if username:
        return _notebook_proxy(request, username)

    # Per the original note: this triggers the HTTP Basic Auth dialog
    # through the basic_challenge handler defined elsewhere.
    raise httpexceptions.HTTPForbidden("You need to be logged in. Hint: user / password")
def shell1(request):
    """Launch a notebook for the logged in user with the "shell1" context.

    The notebook context is seeded with a greeting and then extended through
    ``startup.make_startup`` using the config file path found in the
    registry settings.

    :param request: the current request.
    :returns: whatever ``launch_notebook`` returns.
    :raises httpexceptions.HTTPForbidden: when no user is authenticated.
    """
    policy = request.registry.queryUtility(IAuthenticationPolicy)
    username = policy.authenticated_userid(request)
    if not username:
        # Per the original note: this triggers the HTTP Basic Auth dialog
        # through the basic_challenge handler defined elsewhere.
        raise httpexceptions.HTTPForbidden("You need to be logged in. Hint: user / password")

    notebook_context = {"greeting": "**Executing shell1 context**\n"}
    global_config = request.registry.settings["global_config"]
    startup.make_startup(notebook_context, global_config["__file__"])

    return launch_notebook(request, username, notebook_context=notebook_context)
def shutdown_notebook(request):
    """Shut down the logged in user's notebook and redirect to ``home``.

    :param request: the current request.
    :returns: an ``HTTPFound`` redirect to the ``home`` route.
    :raises httpexceptions.HTTPForbidden: when no user is authenticated.
    """
    policy = request.registry.queryUtility(IAuthenticationPolicy)
    username = policy.authenticated_userid(request)
    if not username:
        # Per the original note: this triggers the HTTP Basic Auth dialog
        # through the basic_challenge handler defined elsewhere.
        raise httpexceptions.HTTPForbidden("You need to be logged in. Hint: user / password")

    _shutdown_notebook(request, username)
    home_url = request.route_url("home")
    return HTTPFound(home_url)
def forbidden_view(self):
    """Answer a forbidden request.

    :returns: ``HTTPForbidden`` for an authenticated user; otherwise an
        ``HTTPFound`` redirect to the ``login`` route carrying the current
        path in the ``next`` query parameter.
    """
    request = self.request
    if not authenticated_userid(request):
        # Anonymous visitors are sent to log in and can come back afterwards.
        login_url = request.route_url(
            'login', _query=(('next', request.path),))
        return HTTPFound(login_url)
    return HTTPForbidden()
def admin_only(func):
    """Decorator that restricts a callable to admin users.

    The wrapped callable must receive the request as its second positional
    argument; the user is read from that argument's ``user`` attribute.

    :param func: the callable to protect.
    :returns: a wrapper that calls ``func`` with the original arguments.
    :raises exc.HTTPForbidden: (from the wrapper) when the request has no
        user or the user is not an admin.
    """
    import functools

    @functools.wraps(func)
    def _admin_only(*args, **kwargs):
        user = args[1].user
        if user is None or not user.admin:
            # Same output as the former print statement, but valid syntax
            # on both Python 2 and Python 3.
            print("%s is forbidden" % (user,))
            raise exc.HTTPForbidden('You do not have sufficient permissions to view this page')
        # Unpack the positional arguments; passing ``args`` itself would hand
        # the whole tuple to ``func`` as a single argument.
        return func(*args, **kwargs)

    return _admin_only
def update_plugin(request):
    """Update a plugin, or record a proposed change to it.

    A site admin updates the plugin's code and/or name directly. A
    non-admin owner can only propose new code, which is stored in
    ``LedPluginProposed`` (replacing any earlier proposal for the plugin).

    :param request: the current request; ``matchdict['plugin_id']`` names
        the plugin and ``POST`` carries ``code`` and optionally ``name``.
    :returns: a dict with a ``msg`` key describing the outcome.
    :raises exc.HTTPBadRequest: when the plugin does not exist, or a
        non-admin request has no ``code`` field.
    :raises exc.HTTPForbidden: when the user is neither the plugin's owner
        nor an admin.
    """
    db = request.db_session

    # make sure the plugin id exists
    plugin_id = request.matchdict['plugin_id']
    plugin = db.query(LedPlugin).filter(LedPlugin.id == plugin_id).first()
    if not plugin:
        raise exc.HTTPBadRequest("No such plugin")

    # only a site admin or plugin owner can edit
    # a non-admin plugin owner can suggest changes
    user = request.user
    if user != plugin.user and not user.admin:
        raise exc.HTTPForbidden("You don't have access to do that")

    POST = {k: to_null(v) for k, v in request.POST.items()}

    if not user.admin:
        if 'code' not in POST:
            raise exc.HTTPBadRequest("Please provide the code")
        plugin_update = db.query(LedPluginProposed).filter(LedPluginProposed.led_plugin_id == plugin_id).first()
        if plugin_update is not None:
            plugin_update.code = POST['code']
        else:
            db.add(LedPluginProposed(led_plugin_id=plugin_id, code=POST['code']))
        # NOTE(review): plugin.name is interpolated into HTML unescaped;
        # confirm whether log() or its consumers escape it.
        log(request, "Proposed changes to plugin <a href='/plugin/{}'>{}</a>".format(plugin.id, plugin.name))
        return {
            'msg': 'Your update is now awaiting approval'
        }

    # ``.get`` so an admin request without a ``code`` field (for example a
    # rename only) does not fail with KeyError.
    if POST.get('code'):
        plugin.code = POST['code']
    if POST.get('name'):
        plugin.name = POST['name']
    log(request, 'Updated <a href="/plugin/{0}">plugin {1}</a>: '.format(plugin_id, plugin.name, ))
    return {
        'msg': 'Updated code'
    }
def can_modify_group(request, gid, raise_exc=True):
    """Tell whether the current user may modify the group ``gid``.

    Admins always may. Any other user needs a ``LedGroupUser`` row for the
    group whose ``access_level`` equals 2.

    :param request: the current request.
    :param gid: id of the group to check.
    :param raise_exc: when true, a denied check raises instead of
        returning ``False``.
    :returns: ``True`` when allowed, ``False`` when denied and
        ``raise_exc`` is false.
    :raises exc.HTTPForbidden: when denied and ``raise_exc`` is true.
    """
    current_user = request.user
    if not current_user.admin:
        membership = request.db_session.query(LedGroupUser).filter(and_(
            LedGroupUser.led_group_id == gid,
            LedGroupUser.led_user == current_user)).first()
        denied = membership is None or membership.access_level != 2
        if denied and raise_exc:
            raise exc.HTTPForbidden("You can't modify this group")
        if denied:
            return False
    return True
def cached_view_embedded(context, request):
    """Return the cached ``embedded`` representation of ``context``.

    :param context: object whose ``model.source`` holds the cached data.
    :param request: the current request.
    :returns: ``source['embedded']``.
    :raises HTTPForbidden: when none of the request's effective principals
        is among the principals allowed to ``view``.
    """
    cached = context.model.source
    viewers = set(cached['principals_allowed']['view'])
    if not viewers.intersection(request.effective_principals):
        raise HTTPForbidden()
    return cached['embedded']
def cached_view_object(context, request):
    """Return the cached ``object`` representation of ``context``.

    :param context: object whose ``model.source`` holds the cached data.
    :param request: the current request.
    :returns: ``source['object']``.
    :raises HTTPForbidden: when none of the request's effective principals
        is among the principals allowed to ``view``.
    """
    cached = context.model.source
    viewers = set(cached['principals_allowed']['view'])
    if not viewers.intersection(request.effective_principals):
        raise HTTPForbidden()
    return cached['object']
def cached_view_audit(context, request):
    """Return the cached audit data of ``context``.

    :param context: object whose ``model.source`` holds the cached data.
    :param request: the current request.
    :returns: a dict with the object's ``@id`` and the full ``audit`` entry
        of the cached source.
    :raises HTTPForbidden: when none of the request's effective principals
        is among the principals allowed to ``audit``.
    """
    cached = context.model.source
    auditors = set(cached['principals_allowed']['audit'])
    if not auditors.intersection(request.effective_principals):
        raise HTTPForbidden()
    object_id = cached['object']['@id']
    return {
        '@id': object_id,
        'audit': cached['audit'],
    }
def cached_view_audit_self(context, request):
    """Return only the cached audits that concern ``context`` itself.

    :param context: object whose ``model.source`` holds the cached data.
    :param request: the current request.
    :returns: a dict with the object's ``@id`` and a flat list of the audit
        items (from every group in ``source['audit']``) whose ``path``
        equals that ``@id``.
    :raises HTTPForbidden: when none of the request's effective principals
        is among the principals allowed to ``audit``.
    """
    cached = context.model.source
    auditors = set(cached['principals_allowed']['audit'])
    if not auditors.intersection(request.effective_principals):
        raise HTTPForbidden()

    path = cached['object']['@id']
    own_audits = []
    for audit_group in cached['audit'].values():
        for audit in audit_group:
            if audit['path'] == path:
                own_audits.append(audit)
    return {
        '@id': path,
        'audit': own_audits,
    }
def get_changes(self):
    """Serve the changelog entry for the serial named in the request.

    :returns: a 200 ``Response`` whose body is the raw entry (empty for the
        serial ``nop``) and whose ``X-DEVPI-SERIAL`` header carries the
        current keyfs serial.
    :raises HTTPForbidden: when this server is not a master.
    :raises HTTPBadRequest: when the expected-master-id header is missing,
        or is non-empty and differs from this master's uuid.
    :raises HTTPNotFound: when the serial is neither ``nop`` nor an integer.
    """
    # this method is called from all replica servers
    # and either returns changelog entry content for {serial} or,
    # if it points to the "next" serial, will block and wait
    # until that serial is committed. However, after
    # MAX_REPLICA_BLOCK_TIME, we return 202 Accepted to indicate
    # the replica should try again. The latter has two benefits:
    # - nginx' timeout would otherwise return 504 (Gateway Timeout)
    # - if the replica is not waiting anymore we would otherwise
    #   never time out here, leading to more and more threads
    #   if no commits happen.
    if not self.xom.is_master():
        raise HTTPForbidden("Replication protocol disabled")

    expected_uuid = self.request.headers.get(H_EXPECTED_MASTER_ID, None)
    master_uuid = self.xom.config.get_master_uuid()

    # we require the header but it is allowed to be empty
    # (during initialization)
    if expected_uuid is None:
        missing_header = "replica sent no %s header" % H_EXPECTED_MASTER_ID
        threadlog.error(missing_header)
        raise HTTPBadRequest(missing_header)

    if expected_uuid and expected_uuid != master_uuid:
        threadlog.error("expected %r as master_uuid, replica sent %r", master_uuid,
                        expected_uuid)
        raise HTTPBadRequest("expected %s as master_uuid, replica sent %s" %
                             (master_uuid, expected_uuid))

    serial = self.request.matchdict["serial"]
    with self.update_replica_status(serial):
        keyfs = self.xom.keyfs
        if serial.lower() != "nop":
            try:
                serial = int(serial)
            except ValueError:
                raise HTTPNotFound("serial needs to be int")
            raw_entry = self._wait_for_entry(serial)
        else:
            raw_entry = b""

        current_serial = keyfs.get_current_serial()
        response_headers = {
            str("Content-Type"): str("application/octet-stream"),
            str("X-DEVPI-SERIAL"): str(current_serial),
        }
        return Response(body=raw_entry, status=200, headers=response_headers)