def etalab_get_discussions(request):
    """List the discussions visible to the requester in the "etalab" view.

    According to the Etalab API specification, an Instance object must have
    the following fields:

    - url: we send discussion.get_url(). Note: a discussion may have 2 URLs
      (HTTP and HTTPS). For this, see discussion.get_discussion_urls()
    - name: we use discussion.topic
    - adminEmail: email of the discussion creator, who made the API request
      to create a discussion (so in our case the field will not show if the
      discussion has been created by other means)
    - adminName: name of that creator, also provided in the discussion
      creation request (so in our case the field will not show if the
      discussion has been created by other means)
    - createdAt: creation date

    We also send the following optional fields:

    - id: this field is in neither the optional nor the mandatory fields of
      the specification
    - status: "running"
    - metadata: metadata.creation_date => will probably be renamed, see above

    Returns a dict with a single "items" key holding one
    ``generic_json`` result per discussion returned by
    ``discussions_with_access``.

    Raises HTTPUnauthorized when P_READ is not in ``request.permissions``.
    """
    requester = authenticated_userid(request) or Everyone
    granted = request.permissions
    if P_READ not in granted:
        raise HTTPUnauthorized()
    items = []
    for discussion in discussions_with_access(requester):
        items.append(discussion.generic_json("etalab", requester, granted))
    return {"items": items}
# Example source code for the Python class HTTPUnauthorized()
def userid_request_property(request):
    """
    Used for the userid property on weasyl requests.

    Three sources are tried in order: the ``X_WEASYL_API_KEY`` header, the
    ``AUTHORIZATION`` header, and finally ``request.weasyl_session``.
    Returns the resolved userid, or 0 when the session carries none.
    Raises HTTPUnauthorized when a supplied API key or authorization header
    does not resolve to a userid.
    """
    headers = request.headers
    api_token = headers.get('X_WEASYL_API_KEY')
    authorization = headers.get('AUTHORIZATION')

    if api_token is not None:
        # TODO: If reification of userid becomes an issue (e.g. because of userid changing after sign-in) revisit this.
        # It's possible that we don't need to reify the entire property, but just cache the result of this query in a
        # cache on arguments inner function.
        token_owner = d.engine.scalar(
            "SELECT userid FROM api_tokens WHERE token = %(token)s", token=api_token)
        if token_owner:
            return token_owner
        raise HTTPUnauthorized(www_authenticate=('Weasyl-API-Key', 'realm="Weasyl"'))

    if authorization:
        # Imported only when this branch is taken, as in the original code.
        from weasyl.oauth2 import get_userid_from_authorization
        oauth_owner = get_userid_from_authorization(request)
        if oauth_owner:
            return oauth_owner
        # NOTE(review): the challenge parameters below are separated by a
        # space, not a comma -- confirm this is the intended header format.
        raise HTTPUnauthorized(www_authenticate=('Bearer', 'realm="Weasyl" error="invalid_token"'))

    session_userid = request.weasyl_session.userid
    if session_userid is None:
        return 0
    return session_userid
def login(request):
    """the login view

    Reads the ``login`` and ``password`` request parameters, looks up the
    first ``User`` whose login or email equals the submitted login name and
    checks the password against it.

    Returns the response of ``UserViews.get_entity()`` for the matched user,
    with the headers produced by ``remember`` added to it, or an
    ``HTTPUnauthorized`` response (detail ``'Bad Login'``) when no user
    matches or the password check fails.
    """
    logger.debug('login start')

    login_name = request.params.get('login', '')
    password = request.params.get('password', '')

    # get the user again (first got it in validation)
    from stalker import User
    from sqlalchemy import or_

    user = User.query \
        .filter(or_(User.login == login_name, User.email == login_name))\
        .first()

    if user and user.check_password(password):
        logger.debug('Login successful!')
        from pyramid.security import remember
        headers = remember(request, login_name)

        from stalker_pyramid.views.user import UserViews
        user_view = UserViews(request)
        user_view.entity_id = user.id
        response = user_view.get_entity()
        # Add the remember() headers to the response instead of assigning
        # them to ``response.headers``: assignment replaces every header the
        # entity response already carries (content type included).
        response.headerlist.extend(headers)
        return response
    else:
        logger.debug('Bad Login')
        from pyramid.httpexceptions import HTTPUnauthorized
        return HTTPUnauthorized(detail='Bad Login')
def forbidden_view(request):
    """Build an HTTPUnauthorized response carrying the ``forget`` headers.

    The headers returned by ``forget(request)`` are merged into the headers
    of a fresh ``HTTPUnauthorized`` instance, which is then returned.
    """
    unauthorized = HTTPUnauthorized()
    forget_headers = forget(request)
    unauthorized.headers.update(forget_headers)
    return unauthorized
# Support the silly auth schemes that Subsonic has
def basic_challenge(request):
    """Return an HTTPUnauthorized response updated with ``forget(request)``.

    A new ``HTTPUnauthorized`` is created, its headers are updated with
    whatever ``forget`` yields for this request, and it is handed back to
    the caller (it is returned, not raised).
    """
    challenge = HTTPUnauthorized()
    headers_to_add = forget(request)
    challenge.headers.update(headers_to_add)
    return challenge
def __setitem__(self, key, value):
    """Store ``value`` under ``key`` after permission and validity checks.

    The key must belong to ``Preferences.preference_data_key_set``. The
    preference data for the key must name a permission under
    ``self.ALLOW_OVERRIDE``, and ``user_has_permission`` must accept it for
    ``self.user_id`` on the target (``self.target.id``, or None when there
    is no target). The value is then validated by ``self.dprefs`` and stored
    through the parent class.

    Raises:
        KeyError: ``key`` is not a known preference key.
        HTTPUnauthorized: no override permission is configured for the key,
            or the user does not hold it.
    """
    if key not in Preferences.preference_data_key_set:
        raise KeyError("Unknown property")

    key_data = self.dprefs.get_preference_data().get(key, {})
    needed_permission = key_data.get(self.ALLOW_OVERRIDE, False)

    if needed_permission:
        target_id = self.target.id if self.target else None
        permitted = user_has_permission(
            target_id, self.user_id, needed_permission)
    else:
        # No override permission configured: nobody may edit this key here.
        permitted = False
    if not permitted:
        raise HTTPUnauthorized("Cannot edit")

    self.dprefs.validate(key, value)
    super(UserPreferenceCollection, self).__setitem__(key, value)
def safe_del(self, key, permissions=(P_READ,)):
    """Delete ``key`` from this collection when ``can_edit`` allows it.

    Raises HTTPUnauthorized ("Cannot delete " followed by the key) when
    ``self.can_edit(key, permissions)`` is falsy.
    """
    editable = self.can_edit(key, permissions)
    if editable:
        del self[key]
        return
    raise HTTPUnauthorized("Cannot delete "+key)
def safe_set(self, key, value, permissions=(P_READ,)):
    """Assign ``value`` to ``key`` when ``can_edit`` allows it.

    Raises HTTPUnauthorized ("Cannot edit " followed by the key) when
    ``self.can_edit(key, permissions)`` is falsy.
    """
    editable = self.can_edit(key, permissions)
    if editable:
        self[key] = value
        return
    raise HTTPUnauthorized("Cannot edit "+key)
def delete_post_instance(request):
    """Delete (actually tombstone) the Post instance of the request context.

    Users who are allowed to delete a Post instance:

    - the user who is the author of the Post instance and who has the
      P_DELETE_MY_POST permission in this discussion
    - a user who has the P_DELETE_POST permission in this discussion

    Every Extract whose ``content_id`` is the post's id is deleted first.
    The post is then deleted with cause DELETED_BY_USER (author path) or
    DELETED_BY_ADMIN (P_DELETE_POST path); the author path wins when both
    apply.

    Returns a dict with a "result" message and the "removed_extracts" count.
    Raises HTTPUnauthorized when neither rule applies.
    """
    ctx = request.context
    user_id = authenticated_userid(request) or Everyone
    permissions = ctx.get_permissions()
    instance = ctx._instance

    deletes_own_post = (
        user_id == instance.creator_id and P_DELETE_MY_POST in permissions)
    may_delete_any_post = P_DELETE_POST in permissions
    if not (deletes_own_post or may_delete_any_post):
        raise HTTPUnauthorized()

    # Remove extracts associated to this post
    extract_query = instance.db.query(Extract).filter(
        Extract.content_id == instance.id)
    extracts_to_remove = extract_query.all()
    for extract in extracts_to_remove:
        extract.delete()

    # One of the two rules holds at this point, so the cause is always set.
    if deletes_own_post:
        cause = PublicationStates.DELETED_BY_USER
    else:
        cause = PublicationStates.DELETED_BY_ADMIN
    instance.delete_post(cause)

    return {
        "result": "Post has been successfully deleted.",
        "removed_extracts": len(extracts_to_remove)
    }
def raise_if_cannot_moderate(request):
    """Raise HTTPUnauthorized unless the requester may moderate.

    Two conditions must hold: ``authenticated_userid(request)`` is truthy,
    and P_MODERATE is among the permissions of the request context.
    Returns None when both hold.
    """
    ctx = request.context
    if not authenticated_userid(request):
        raise HTTPUnauthorized()
    granted = ctx.get_permissions()
    if P_MODERATE in granted:
        return
    raise HTTPUnauthorized()
def as_collection(self, parent_instance):
    """Return a ``UserNsDict`` for ``parent_instance`` and the current user.

    The user id is taken from ``unauthenticated_userid`` of the current
    Pyramid request.

    Raises:
        RuntimeError: there is no current request.
        HTTPUnauthorized: the current request carries no user id.
    """
    from pyramid.threadlocal import get_current_request
    from pyramid.httpexceptions import HTTPUnauthorized
    from assembl.models.user_key_values import UserNsDict

    current_request = get_current_request()
    if current_request is None:
        raise RuntimeError()

    # Check again downstream for real userid
    claimed_user_id = current_request.unauthenticated_userid
    if claimed_user_id is None:
        raise HTTPUnauthorized()
    return UserNsDict(parent_instance, claimed_user_id)
def api_login_required(view_callable):
    """
    Like decorators.login_required, but returning json on an error.

    Wraps ``view_callable`` so that a request whose ``userid`` equals 0 is
    rejected with HTTPUnauthorized (json body ``_ERROR_UNSIGNED``,
    ``_STANDARD_WWW_AUTHENTICATE`` challenge); any other request is passed
    to ``view_callable`` and its result is returned unchanged.

    The wrapper keeps the wrapped view's ``__name__``, ``__doc__`` and other
    metadata (via ``functools.wraps``).
    """
    from functools import wraps

    # TODO: If we replace the regular @login_required checks on POSTs with a tween, what do about this?
    @wraps(view_callable)
    def inner(request):
        if request.userid == 0:
            raise HTTPUnauthorized(
                json=_ERROR_UNSIGNED,
                www_authenticate=_STANDARD_WWW_AUTHENTICATE,
            )
        return view_callable(request)
    return inner
def _do_update_from_json(
        self, json, parse_def, ctx,
        duplicate_handling=None, object_importer=None):
    """Update this instance from a JSON payload and return the result.

    Keys read from ``json``: 'user', 'discussion_id', 'discussion', '@type',
    'creation_origin', 'parent_subscription' and 'status'.

    - User: when ``self.user_id`` is already set it cannot be changed; acting
      on an instance owned by someone other than the context's target user
      requires P_ADMIN_DISC. When it is not set, it is taken from
      ``json['user']`` (P_ADMIN_DISC required if that is not the requesting
      user) or defaults to the context's target user.
    - Discussion: cannot be changed once set; otherwise taken from
      ``json['discussion']`` or ``ctx.get_discussion_id()``.
    - Type: when '@type' differs from ``self.external_typename()``, the
      instance is converted with ``change_class`` and the update is re-run
      on the converted instance.

    Returns ``self``, or the result of the recursive call on the converted
    instance.

    Raises:
        HTTPUnauthorized: a required P_ADMIN_DISC permission is missing.
        HTTPBadRequest: the payload tries to change the user or discussion,
            no discussion can be determined, or '@type' is unknown.
    """
    from ..auth.util import user_has_permission
    user_id = ctx.get_user_id()
    # The target user is the User found in the context, if any; otherwise
    # the requesting user.
    target_user_id = user_id
    user = ctx.get_instance_of_class(User)
    if user:
        target_user_id = user.id
    if self.user_id:
        if target_user_id != self.user_id:
            if not user_has_permission(self.discussion_id, user_id, P_ADMIN_DISC):
                raise HTTPUnauthorized()
        # For now, do not allow changing user, it's way too complicated.
        if 'user' in json and User.get_database_id(json['user']) != self.user_id:
            raise HTTPBadRequest()
    else:
        json_user_id = json.get('user', None)
        if json_user_id is None:
            json_user_id = target_user_id
        else:
            json_user_id = User.get_database_id(json_user_id)
            # NOTE(review): this check is placed on the explicit-user path
            # only; confirm the nesting against the upstream source.
            if json_user_id != user_id and not user_has_permission(self.discussion_id, user_id, P_ADMIN_DISC):
                raise HTTPUnauthorized()
        self.user_id = json_user_id
    if self.discussion_id:
        if 'discussion_id' in json and Discussion.get_database_id(json['discussion_id']) != self.discussion_id:
            raise HTTPBadRequest()
    else:
        discussion_id = json.get('discussion', None) or ctx.get_discussion_id()
        if discussion_id is None:
            raise HTTPBadRequest()
        self.discussion_id = Discussion.get_database_id(discussion_id)
    new_type = json.get('@type', self.type)
    if self.external_typename() != new_type:
        # polymorphic_map maps each polymorphic identity to its Mapper, which
        # is what the membership test and the ``.class_`` lookup below need.
        # (polymorphic_identity is only this class's own identity value.)
        polymap = inspect(self.__class__).polymorphic_map
        if new_type not in polymap:
            raise HTTPBadRequest()
        new_type = polymap[new_type].class_
        new_instance = self.change_class(new_type)
        return new_instance._do_update_from_json(
            json, parse_def, ctx,
            DuplicateHandling.USE_ORIGINAL, object_importer)
    creation_origin = json.get('creation_origin', "USER_REQUESTED")
    if creation_origin is not None:
        self.creation_origin = NotificationCreationOrigin.from_string(creation_origin)
    if json.get('parent_subscription', None) is not None:
        self.parent_subscription_id = self.get_database_id(json['parent_subscription'])
    status = json.get('status', None)
    if status:
        status = NotificationSubscriptionStatus.from_string(status)
        # Only stamp the change date when the status really changes.
        if status != self.status:
            self.status = status
            self.last_status_change_date = datetime.utcnow()
    return self
def _do_create_from_json(
cls, json, parse_def, context,
duplicate_handling=None, object_importer=None):
# Build an instance of ``cls`` from ``json`` (or resolve it to another object
# through handle_duplication) and return an instance context for the result.
#
# json: the payload; '@id' is read at the end to register the result with
#   ``object_importer``.
# parse_def, object_importer: passed through to _do_update_from_json and
#   handle_duplication.
# context: supplies the user id and permissions used for the checks below.
# duplicate_handling: falls back to cls.default_duplicate_handling when falsy.
#
# Raises HTTPUnauthorized when the user may not create a ``cls`` object, or
# may not update the object that handle_duplication returned instead.
#
# NOTE(review): indentation was lost in this copy of the file, so the nesting
# of the final statements (in particular whether ``cls.default_db.add(result)``
# is conditional on ``result is not inst``) cannot be read off this text --
# confirm against the upstream source before restructuring.
user_id = context.get_user_id()
permissions = context.get_permissions()
duplicate_handling = \
duplicate_handling or cls.default_duplicate_handling
can_create = cls.user_can_cls(
user_id, CrudPermissions.CREATE, permissions)
# With DuplicateHandling.ERROR, refuse up front when creation is not allowed.
if duplicate_handling == DuplicateHandling.ERROR and not can_create:
raise HTTPUnauthorized(
"User id <%s> cannot create a <%s> object" % (
user_id, cls.__name__))
# creating an object can be a weird way to find an object by attributes
inst = cls()
i_context = inst.get_instance_context(context)
result = inst._do_update_from_json(
json, parse_def, i_context,
duplicate_handling, object_importer)
# Now look for missing relationships
result.populate_from_context(context)
# handle_duplication may hand back an object other than ``inst``; the checks
# below tell the two cases apart with ``result is inst``.
result = result.handle_duplication(
json, parse_def, context, duplicate_handling, object_importer)
# ``result is inst``: a new object is being created, which needs CREATE.
if result is inst and not can_create:
raise HTTPUnauthorized(
"User id <%s> cannot create a <%s> object" % (
user_id, cls.__name__))
# Otherwise another object was returned: UPDATE permission is required only
# when cls.default_db.is_modified reports that object as modified.
elif result is not inst and \
not result.user_can(
user_id, CrudPermissions.UPDATE, permissions
) and cls.default_db.is_modified(result, False):
raise HTTPUnauthorized(
"User id <%s> cannot modify a <%s> object" % (
user_id, cls.__name__))
# The returned context must describe ``result``, not the discarded ``inst``.
if result is not inst:
i_context = result.get_instance_context(context)
cls.default_db.add(result)
# Let the importer associate the payload's '@id' with the result when the
# two identifiers differ.
if '@id' in json and json['@id'] != result.uri() and object_importer:
object_importer.associate(json['@id'], result)
return i_context
def _assign_subobject_list(self, instances, accessor):
    """Make the collection behind ``accessor`` on ``self`` match ``instances``.

    Handling depends on the kind of ``accessor``:

    - RelationshipProperty without ``back_populates``: the attribute is
      simply assigned.
    - RelationshipProperty with ``back_populates``: every given instance is
      expected to be in the current collection already (tombstones aside).
      Current members absent from ``instances`` are detached by nulling the
      remote column when it is nullable; otherwise they are tombstoned or
      deleted, subject to a DELETE permission check.
    - property: the property's setter is called.
    - Column: rejected, a column cannot hold several values.
    - AssociationProxy: members are added/removed one by one.

    Raises:
        HTTPUnauthorized: an extra instance may not be deleted by the user.
        HTTPBadRequest: ``accessor`` is a Column.
        AssertionError: an internal expectation does not hold.
    """
    # only known case yet is Langstring.entries
    if isinstance(accessor, RelationshipProperty):
        if not accessor.back_populates:
            # Try the brutal approach
            setattr(self, accessor.key, instances)
        else:
            from ..lib.history_mixin import TombstonableMixin
            current_instances = getattr(self, accessor.key)
            missing = set(instances) - set(current_instances)
            if missing:
                # Maybe tombstones
                # A list (not a lazy filter object) so that the emptiness
                # test below is meaningful on Python 3 as well.
                missing = [
                    a for a in missing
                    if not isinstance(a, TombstonableMixin) or
                    not a.is_tombstone]
            assert not missing, "what's wrong with back_populates?"
            extra = set(current_instances) - set(instances)
            if extra:
                remote_columns = list(accessor.remote_side)
                if len(accessor.remote_side) > 1:
                    if issubclass(accessor.mapper.class_, TombstonableMixin):
                        # Kept as a list: it is measured and indexed below.
                        remote_columns = [
                            c for c in remote_columns
                            if c.name != 'tombstone_date']
                assert len(remote_columns) == 1
                remote = remote_columns[0]
                if remote.nullable:
                    # TODO: check update permissions on that object.
                    # Detach the instances that are no longer wanted;
                    # ``missing`` is necessarily empty at this point.
                    for inst in extra:
                        setattr(inst, remote.key, None)
                else:
                    for inst in extra:
                        # NOTE(review): ``user_id`` and ``permissions`` are
                        # not defined in this function; they must resolve
                        # from an enclosing scope -- confirm.
                        if not inst.user_can(
                                user_id, CrudPermissions.DELETE,
                                permissions):
                            raise HTTPUnauthorized(
                                "Cannot delete object %s" % (inst.uri(),))
                        if isinstance(inst, TombstonableMixin):
                            inst.is_tombstone = True
                        else:
                            self.db.delete(inst)
    elif isinstance(accessor, property):
        # Note: Does not happen yet.
        # Call the setter of this accessor (not of the builtin class).
        accessor.fset(self, instances)
    elif isinstance(accessor, Column):
        raise HTTPBadRequest(
            "%s cannot have multiple values" % (accessor.key, ))
    elif isinstance(accessor, AssociationProxy):
        # Also never happens
        # NOTE(review): ``add``/``remove`` are called on the proxy itself,
        # not on the collection it returns -- confirm before relying on it.
        current_instances = accessor.__get__(self, self.__class__)
        missing = set(instances) - set(current_instances)
        extra = set(current_instances) - set(instances)
        for inst in missing:
            accessor.add(inst)
        for inst in extra:
            accessor.remove(inst)
    else:
        assert False, "we should not get here"