def view_corpus(request):
page = int(request.matchdict['page'])
corpus = get_corpus(request)
if corpus is None or page > (corpus.n_docs - 1):
raise exc.HTTPNotFound()
nextp = page + 1
if nextp >= corpus.n_docs:
nextp = None
prevp = page - 1
if prevp < 0:
prevp = None
return {
'corpus': corpus,
'doc': corpus[page],
'nextp': nextp,
'prevp': prevp,
'page': page
}
python类HTTPNotFound()的实例源码
def add_language(request):
"""
This view adds a language
:param request: a request object
:return: None
"""
if request.authenticated_userid is None:
raise exc.HTTPNotFound()
name = request.matchdict["language"]
public_group_id = request.matchdict["public_group_id"]
language = models.Language.get_by_name(name)
if not language:
language = models.Language(name=name)
request.db.add(language)
# We need to flush the db session here so that language.id will be generated.
request.db.flush()
url = request.route_url('translation_read', public_language_id=language.pubid, public_group_id=public_group_id)
return exc.HTTPSeeOther(url)
def add_page(request):
"""
Add a page to the database
:param request: a request object
:return: a redirect to the translation_read URL
"""
if request.authenticated_userid is None:
raise exc.HTTPNotFound()
name = request.matchdict["language_name"]
page_id = urllib.unquote(urllib.unquote(request.matchdict["page_url"]))
public_group_id = request.matchdict["public_group_id"]
language = annotran.languages.models.Language.get_by_name(name)
page = annotran.pages.models.Page.get_by_uri(page_id)
if not page:
page = annotran.pages.models.Page(uri=page_id)
request.db.add(page)
request.db.flush()
url = request.route_url('translation_read', public_language_id=language.pubid, public_group_id=public_group_id)
return exc.HTTPSeeOther(url)
def activate_by_email(self, activation_code: str, location=None) -> Response:
"""Active a user after user after the activation email.
* User clicks link in the activation email
* User enters the activation code on the form by hand
"""
request = self.request
settings = request.registry.settings
user_registry = get_user_registry(request)
after_activate_url = request.route_url(settings.get('websauna.activate_redirect', 'index'))
login_after_activation = asbool(settings.get('websauna.login_after_activation', False))
user = user_registry.activate_user_by_email_token(activation_code)
if not user:
raise HTTPNotFound("Activation code not found")
if login_after_activation:
login_service = get_login_service(self.request.registry)
return login_service.authenticate(self.request, user)
else:
self.request.registry.notify(RegistrationActivatedEvent(self.request, user, None))
return HTTPFound(location=location or after_activate_url)
def reset_password(self, activation_code: str, password: str, location=None) -> Response:
"""Perform actual password reset operations.
User has following password reset link (GET) or enters the code on a form.
"""
request = self.request
user_registry = get_user_registry(request)
user = user_registry.get_user_by_password_reset_token(activation_code)
if not user:
return HTTPNotFound("Activation code not found")
user_registry.reset_password(user, password)
messages.add(request, msg="The password reset complete. Please sign in with your new password.", kind='success', msg_id="msg-password-reset-complete")
request.registry.notify(PasswordResetEvent(self.request, user, password))
request.registry.notify(UserAuthSensitiveOperation(self.request, user, "password_reset"))
location = location or get_config_route(request, 'websauna.reset_password_redirect')
return HTTPFound(location=location)
def discussion_attitude(request, for_api=False, api_data=None):
"""
View configuration for discussion step, where we will ask the user for her attitude towards a statement.
:param request: request of the web server
:param for_api: Boolean
:param api_data:
:return: dictionary
"""
# '/discuss/{slug}/attitude/{statement_id}'
logger('Views', 'discussion_attitude', 'request.matchdict: {}'.format(request.matchdict))
logger('Views', 'discussion_attitude', 'request.params: {}'.format(request.params))
prepared_discussion = __call_from_discussion_step(request, discussion.attitude, for_api, api_data)
if not prepared_discussion:
raise HTTPNotFound()
return prepared_discussion
# justify page
def discussion_justify(request, for_api=False, api_data=None):
"""
View configuration for discussion step, where we will ask the user for her a justification of her opinion/interest.
:param request: request of the web server
:param for_api: Boolean
:param api_data:
:return: dictionary
"""
# '/discuss/{slug}/justify/{statement_or_arg_id}/{mode}*relation'
logger('views', 'discussion_justify', 'request.matchdict: {}'.format(request.matchdict))
logger('views', 'discussion_justify', 'request.params: {}'.format(request.params))
prepared_discussion = __call_from_discussion_step(request, discussion.justify, for_api, api_data)
if not prepared_discussion:
raise HTTPNotFound()
return prepared_discussion
# reaction page
def discussion_reaction(request, for_api=False, api_data=None):
"""
View configuration for discussion step, where we will ask the user for her reaction (support, undercut, rebut)...
:param request: request of the web server
:param for_api: Boolean
:param api_data:
:return: dictionary
"""
# '/discuss/{slug}/reaction/{arg_id_user}/{mode}*arg_id_sys'
logger('views', 'discussion_reaction', 'request.matchdict: {}'.format(request.matchdict))
logger('views', 'discussion_reaction', 'request.params: {}'.format(request.params))
prepared_discussion = __call_from_discussion_step(request, discussion.reaction, for_api, api_data)
if not prepared_discussion:
raise HTTPNotFound()
return prepared_discussion
# support page
def discussion_support(request, for_api=False, api_data=None):
"""
View configuration for discussion step, where we will present another supportive argument.
:param request: request of the web server
:param for_api: Boolean
:param api_data:
:return: dictionary
"""
# '/discuss/{slug}/jump/{arg_id}'
logger('views', 'discussion_support', 'request.matchdict: {}'.format(request.matchdict))
logger('views', 'discussion_support', 'request.params: {}'.format(request.params))
prepared_discussion = __call_from_discussion_step(request, discussion.support, for_api, api_data)
if not prepared_discussion:
raise HTTPNotFound()
return prepared_discussion
# finish page
def discussion_choose(request, for_api=False, api_data=None):
"""
View configuration for discussion step, where the user has to choose between given statements.
:param request: request of the web server
:param for_api: Boolean
:param api_data:
:return: dictionary
"""
# '/discuss/{slug}/choose/{is_argument}/{supportive}/{id}*pgroup_ids'
match_dict = request.matchdict
params = request.params
logger('discussion_choose', 'def', 'request.matchdict: {}'.format(match_dict))
logger('discussion_choose', 'def', 'request.params: {}'.format(params))
prepared_discussion = __call_from_discussion_step(request, discussion.choose, for_api, api_data)
if not prepared_discussion:
raise HTTPNotFound()
return prepared_discussion
# jump page
def get_nuimo_component_view(request):
component_id = request.matchdict['component_id']
mac_address = request.matchdict['mac_address'].replace('-', ':')
with open(request.registry.settings['nuimo_app_config_path'], 'r') as f:
config = yaml.load(f)
try:
nuimo = config['nuimos'][mac_address]
except (KeyError, TypeError):
return HTTPNotFound("No Nuimo with such ID")
components = nuimo['components']
try:
return next(c for c in components if c['id'] == component_id)
except StopIteration:
raise HTTPNotFound
def delete_nuimo_component_view(request):
component_id = request.matchdict['component_id']
mac_address = request.matchdict['mac_address'].replace('-', ':')
with open(request.registry.settings['nuimo_app_config_path'], 'r+') as f:
config = yaml.load(f)
try:
nuimo = config['nuimos'][mac_address]
except (KeyError, TypeError):
return HTTPNotFound("No Nuimo with such ID")
components = nuimo['components']
try:
component = next(c for c in components if c['id'] == component_id)
except StopIteration:
raise HTTPNotFound
components.remove(component)
f.seek(0) # We want to overwrite the config file with the new configuration
f.truncate()
yaml.dump(config, f, default_flow_style=False)
def root_factory(request, user_id=None):
"""The factory function for the root context"""
# OK, this is the old code... I need to do better, but fix first.
from ..models import Discussion
if request.matchdict and 'discussion_id' in request.matchdict:
discussion_id = int(request.matchdict['discussion_id'])
discussion = Discussion.default_db.query(Discussion).get(discussion_id)
if not discussion:
raise HTTPNotFound("No discussion ID %d" % (discussion_id,))
return discussion
elif request.matchdict and 'discussion_slug' in request.matchdict:
discussion_slug = request.matchdict['discussion_slug']
discussion = Discussion.default_db.query(Discussion).filter_by(
slug=discussion_slug).first()
if not discussion:
raise HTTPNotFound("No discussion named %s" % (discussion_slug,))
return discussion
return app_root_factory(request, user_id)
def delete_idea(request):
idea_id = request.matchdict['id']
idea = Idea.get_instance(idea_id)
if not idea:
raise HTTPNotFound("Idea with id '%s' not found." % idea_id)
if isinstance(idea, RootIdea):
raise HTTPBadRequest("Cannot delete root idea.")
num_childrens = len(idea.children)
if num_childrens > 0:
raise HTTPBadRequest("Idea cannot be deleted because it still has %d child ideas." % num_childrens)
num_extracts = len(idea.extracts)
if num_extracts > 0:
raise HTTPBadRequest("Idea cannot be deleted because it still has %d extracts." % num_extracts)
for link in idea.source_links:
link.is_tombstone = True
idea.is_tombstone = True
# Maybe return tombstone() ?
request.response.status = HTTPNoContent.code
return HTTPNoContent()
def get_idea_extracts(request):
discussion = request.context
idea_id = request.matchdict['id']
idea = Idea.get_instance(idea_id)
view_def = request.GET.get('view') or 'default'
user_id = authenticated_userid(request) or Everyone
permissions = request.permissions
if not idea:
raise HTTPNotFound("Idea with id '%s' not found." % idea_id)
extracts = Extract.default_db.query(Extract).filter(
Extract.idea_id == idea.id
).order_by(Extract.order.desc())
return [extract.generic_json(view_def, user_id, permissions)
for extract in extracts]
def get_agent(request):
view_def = request.GET.get('view') or 'default'
agent_id = request.matchdict['id']
agent = AgentProfile.get_instance(agent_id)
if not agent:
raise HTTPNotFound("Agent with id '%s' not found." % agent_id)
discussion = request.context
user_id = authenticated_userid(request) or Everyone
permissions = request.permissions
agent_json = agent.generic_json(view_def, user_id, permissions)
if user_id == agent.id:
# We probably should add all profile info here.
agent_json['preferred_email'] = agent.get_preferred_email()
return agent_json
def get_object(request):
classname = request.matchdict['cls']
id = request.matchdict['id']
view = request.matchdict['view'] or '/default'
view = view[1:]
cls = getattr(assembl.models, classname, None)
if not cls:
raise HTTPNotFound("Class '%s' not found." % classname)
obj = cls.get(id)
if not obj:
raise HTTPNotFound("Id %s of class '%s' not found." % (id, classname))
if not get_view_def(view):
raise HTTPNotFound("View '%s' not found." % view)
user_id = authenticated_userid(request) or Everyone
permissions = request.permissions
return obj.generic_json(view, user_id, permissions)
def get_synthesis(request):
synthesis_id = request.matchdict['id']
discussion = request.context
if synthesis_id == 'next_synthesis':
synthesis = discussion.get_next_synthesis()
else:
synthesis = Synthesis.get_instance(synthesis_id)
if not synthesis:
raise HTTPNotFound("Synthesis with id '%s' not found." % synthesis_id)
view_def = request.GET.get('view') or 'default'
user_id = authenticated_userid(request) or Everyone
return synthesis.generic_json(view_def, user_id, request.permissions)
# Update
def register(request):
"""Set the login route and view."""
if request.method == "GET":
return {}
if request.method == "POST":
username = request.POST['username']
password = request.POST['password']
password_check = request.POST['password-check']
check_username = request.dbsession.query(User.username).filter(User.username == username).one_or_none()
if not username or not password:
return {'error': 'Please provide a username and password.'}
if check_username is None:
if password == password_check:
new_user = User(
username=username,
password=hash_password(password)
)
request.dbsession.add(new_user)
return HTTPFound(
location=request.route_url('login'),
detail='Registration successful!'
)
else:
return HTTPNotFound({'error': 'Passwords do not match.'})
return {'error': 'Username already in use.'}
def submission_media_(request):
link_type = request.matchdict['linktype']
submitid = int(request.matchdict['submitid'])
if link_type == "submissions":
link_type = "submission"
submission = Submission.query.get(submitid)
if submission is None:
raise httpexceptions.HTTPForbidden()
elif submission.is_hidden or submission.is_friends_only:
raise httpexceptions.HTTPForbidden()
media_items = media.get_submission_media(submitid)
if not media_items.get(link_type):
raise httpexceptions.HTTPNotFound()
return Response(headerlist=[
('X-Accel-Redirect', str(media_items[link_type][0]['file_url']),),
('Cache-Control', 'max-age=0',),
])
def item_view(context, request):
frame = request.params.get('frame', 'page')
if getattr(request, '__parent__', None) is None:
# We need the response headers from non subrequests
try:
response = render_view_to_response(context, request, name=frame)
except PredicateMismatch:
# Avoid this view emitting PredicateMismatch
exc_class, exc, tb = sys.exc_info()
exc.__class__ = HTTPNotFound
raise_with_traceback(exc, tb)
else:
if response is None:
raise HTTPNotFound('?frame=' + frame)
return response
path = request.resource_path(context, '@@' + frame)
if request.query_string:
path += '?' + request.query_string
return request.embed(path, as_user=True)
def corpus(self):
""" Return a corpus based on environment.
It will try to return it from cache, otherwise load it from disk.
If corpus hasn't been extracted from the document, it will redirect to
a corpus creation tool.
"""
corpus = get_corpus(self.request)
if corpus is None:
raise exc.HTTPNotFound()
return corpus
def corpus(self):
""" Return a corpus based on environment.
It will try to return it from cache, otherwise load it from disk.
"""
corpus = get_corpus(self.request)
if corpus is None:
raise exc.HTTPNotFound()
return corpus
def notfound(context, request):
with JSONAPIResponse(request.response) as resp:
_in = u'Failed'
code, status = JSONAPIResponse.NOT_FOUND
request.response.status_int = code
message = 'Resource not found'
if isinstance(context, HTTPNotFound):
if context.content_type == 'application/json':
return context
elif context.detail:
message = context.detail
return resp.to_json(
_in, code=code,
status=status, message=message)
def test_invalid_page(self):
request = testing.DummyRequest()
request.params['page'] = 'invalid'
request.current_route_url = mock.Mock(side_effect=self.get_current_url)
self.assertRaises(HTTPNotFound, self.paginate_queryset, request)
def test_invalid_page(self):
request = testing.DummyRequest()
request.params['page'] = 'invalid'
request.current_route_url = mock.Mock(side_effect=self.get_current_url)
with pytest.raises(HTTPNotFound):
self.paginate_queryset(request)
def test_invalid_page(self):
request = testing.DummyRequest()
request.params['page'] = 'invalid'
request.current_route_url = mock.Mock(side_effect=self.get_current_url)
with pytest.raises(HTTPNotFound):
self.paginate_queryset(request)
def test_get_object_not_found(self):
view = UserAPIView()
view.request = self.request
view.lookup_url_kwargs = {'id': 3}
self.assertRaises(HTTPNotFound, view.get_object)
def get_object(self):
query = self.filter_query(self.get_query())
# If query joins more than one table and you need to base the lookup on something besides
# an id field on the self.model, you can provide an alternative lookup as tuple of the model class
# and a string of the column name.
if isinstance(self.lookup_field, str):
lookup_col = getattr(self.model, self.lookup_field)
lookup_val = self.lookup_url_kwargs[self.lookup_field]
else:
assert isinstance(self.lookup_field, tuple), (
"'{}' `lookup_field` attribute should be a string or a tuple of (<model class>, `column`) "
.format(self.__class__.__name__)
)
lookup_col = getattr(self.lookup_field[0], self.lookup_field[1])
lookup_val = self.lookup_url_kwargs[self.lookup_field[1]]
try:
instance = query.filter(lookup_col == lookup_val).one()
except NoResultFound:
raise HTTPNotFound()
# May raise HTTPForbidden
self.check_object_permissions(self.request, instance)
return instance
def test_add_report_to_db_pg_auth_rep_group_lang_none():
"""
This should raise HTTPNotFound as all page, author, reporter, group, and language is None.
"""
request = _mock_request(authenticated_user=mock.Mock(username="test"),
matchdict={'public_language_id': '12345',
'public_group_id': '12345',
'user_id': '12345',
'page_uri': 'http://www.annotran_test.com/'})
with pytest.raises(exc.HTTPNotFound):
views.add_report(request)