def process_interstitial(request: Request, choices: t.List[Choice], *args, **kwargs):
"""Check if user pressed any of the buttons on form and the choice accordingly.
For example use case see :py:class:`websauna.system.crud.views.Delete`.
:param args: Passed to choice callback
:param kwargs: Passed to choice callback
:return: HTTP response given by a choice callback
"""
assert request.method == "POST"
# Force CSRF check always
check_csrf_token(request)
for c in choices:
if c.id in request.POST:
return c.callback(*args, **kwargs)
raise HTTPBadRequest("Unknown choice made")
python类HTTPBadRequest()的实例源码
def process_model(request):
mongohost = request.registry.settings['MONGOHOST']
domain = request.matchdict.get('domain').replace(' ', '_')
modelName = domain + '_mdl'
gfsm = GridFSModel(modelName=modelName, host=mongohost, port=27017, logger=logger)
model = gfsm.getModelFromGridFS()
if model is None:
raise exc.HTTPBadRequest(explanation="The domain {} does not have a domain model loaded.".format(domain))
w2vmodel = M(modelName=modelName, model=model, logger=logger, verbose=True)
log.info("Created W2VModel:" + modelName)
terms = request.matchdict.get('terms', -1)
w2vmodel.setTerms(terms)
most_similar_terms = w2vmodel.process()
log.info("Processing Terms :" + str(w2vmodel.getTerms()))
return most_similar_terms
def test_it_rejects_invalid_or_missing_urls(self):
invalid_urls = [None,
# Unsupported protocols.
'ftp://foo.bar',
'doi:10.1.2/345',
'file://foo.bar',
# Malformed URLs.
'http://goo\[g']
for url in invalid_urls:
request = mock_request()
request.GET['url'] = url
with pytest.raises(httpexceptions.HTTPBadRequest):
views.goto_url(request)
def __call__(self):
searchers = []
for param_name, param in self.params.items():
if hasattr(PackagesSearchParams, param_name):
param_method = getattr(PackagesSearchParams, param_name)
if (not hasattr(param_method, '_no_param') and
(len(param) == 0 or param[0] == '')):
raise HTTPBadRequest(
detail=Messages.no_values % param_name)
if hasattr(param_method, '_only_one'):
if len(param) > 1:
raise HTTPBadRequest(
detail=Messages.too_many_values % (1, len(param),))
else:
param = param[0]
search = param_method(param)
searchers.append(search)
else:
raise HTTPBadRequest(
detail=Messages.bad_search_param % param_name)
self.searchers = searchers
return searchers
def delete_plugin(request):
"""
Delete A plugin
"""
plugin_id = request.matchdict['plugin_id']
user = request.user
query = request.db_session.query(LedPlugin).filter(LedPlugin.id == plugin_id)
plugin = query.first()
if plugin is None:
raise exc.HTTPBadRequest('No such plugin')
if user != plugin.user and not user.admin:
raise exc.HTTPForbidden("You don't have access to do that")
request.db_session.query(LedSchedule).filter(LedSchedule.led_plugin_id == plugin_id).delete()
request.db_session.query(LedPluginProposed).filter(LedPluginProposed.led_plugin_id == plugin_id).delete()
query.delete()
log(request, 'Deleted plugin ' + plugin.name)
return exc.HTTPFound(location='/plugin')
def add_group_users(request):
# make sure the users are in the group:
# only a site admin or group admin can do this
gid = request.matchdict['group_id']
can_modify_group(request, gid)
users = request.POST.get('users', None)
# print request.POST
if users is None:
raise exc.HTTPBadRequest('Please specify users to add to the group')
new_users = []
for user in request.POST.values():
group_user = LedGroupUser(led_group_id=gid, led_user_id=user)
try:
request.db_session.add(group_user)
new_users.append(get_user_by_id(request, user).email)
except sql_exc.DatabaseError as e:
print group_user, "already in group"
log(request, 'Added users to <a href="/group/{0}">group {0}</a>: {1}'.format(gid, ', '.join(new_users)))
return exc.HTTPFound(location='/group/' + gid)
def user_delete(request):
"""
Deletes a user and their group memberships
Their plugins will be handed over to the user that deleted them
:param request:
:return:
"""
# make sure the user actually exists
user_id = request.matchdict['user_id']
query = request.db_session.query(LedUser).filter(LedUser.id == user_id)
user = query.first()
if user is None:
return exc.HTTPBadRequest("No such user exists")
logged_in_user = request.user
for plugin in request.db_session.query(LedPlugin).filter(LedPlugin.user == user).all():
plugin.user_id = logged_in_user.id
request.db_session.query(LedGroupUser).filter(LedGroupUser.led_user == user).delete()
query.delete()
log(request, 'Deleted user '+user.email)
return exc.HTTPFound(location='/users')
def check_url_or_none(self, url):
if url == '':
url = None
if url is not None:
from urllib.parse import urlparse
parsed_url = urlparse(url)
from pyramid.httpexceptions import HTTPBadRequest
if not parsed_url.scheme:
raise HTTPBadRequest(
"The homepage url does not have a scheme. Must be either http or https"
)
if parsed_url.scheme not in (u'http', u'https'):
raise HTTPBadRequest(
"The url has an incorrect scheme. Only http and https are accepted for homepage url"
)
return url
def delete_idea(request):
idea_id = request.matchdict['id']
idea = Idea.get_instance(idea_id)
if not idea:
raise HTTPNotFound("Idea with id '%s' not found." % idea_id)
if isinstance(idea, RootIdea):
raise HTTPBadRequest("Cannot delete root idea.")
num_childrens = len(idea.children)
if num_childrens > 0:
raise HTTPBadRequest("Idea cannot be deleted because it still has %d child ideas." % num_childrens)
num_extracts = len(idea.extracts)
if num_extracts > 0:
raise HTTPBadRequest("Idea cannot be deleted because it still has %d extracts." % num_extracts)
for link in idea.source_links:
link.is_tombstone = True
idea.is_tombstone = True
# Maybe return tombstone() ?
request.response.status = HTTPNoContent.code
return HTTPNoContent()
def put_roles(request):
session = Role.default_db
try:
data = json.loads(request.body)
except Exception as e:
raise HTTPBadRequest("Malformed Json")
if not isinstance(data, list):
raise HTTPBadRequest("Not a list")
if data and frozenset((type(x) for x in data)) != frozenset((str,)):
raise HTTPBadRequest("not strings")
data = set(data)
known_roles = session.query(Role).all()
roles_by_name = {r.name: r for r in known_roles}
role_names = set(roles_by_name.keys())
# new roles
for name in data - role_names:
session.add(Role(name=name))
# delete non-system roles.
for name in role_names - data - SYSTEM_ROLES:
session.delete(roles_by_name[name])
return {"added": list(data - role_names),
"removed": list(role_names - data - SYSTEM_ROLES)}
def save_synthesis(request):
synthesis_id = request.matchdict['id']
discussion = request.context
if synthesis_id == 'next_synthesis':
synthesis = discussion.get_next_synthesis()
else:
synthesis = Synthesis.get_instance(synthesis_id)
if not synthesis:
raise HTTPBadRequest("Synthesis with id '%s' not found." % synthesis_id)
synthesis_data = json.loads(request.body)
synthesis.subject = synthesis_data.get('subject')
synthesis.introduction = synthesis_data.get('introduction')
synthesis.conclusion = synthesis_data.get('conclusion')
Synthesis.default_db.add(synthesis)
Synthesis.default_db.flush()
return {'ok': True, 'id': synthesis.uri()}
def authorize_post_(request):
form = request.web_input(credentials='', username='', password='', remember_me='', mobile='', not_me='')
try:
credentials = json.loads(form.credentials)
except ValueError:
raise HTTPBadRequest()
scopes = credentials.pop('scopes')
error = None
if form.not_me and form.username:
userid, error = login.authenticate_bcrypt(form.username, form.password, bool(form.remember_me))
if error:
error = errorcode.login_errors.get(error, 'Unknown error.')
elif not request.userid:
error = "You must specify a username and password."
else:
userid = request.userid
if error:
return Response(render_form(request, scopes, credentials, bool(form.mobile), error,
form.username, form.password, bool(form.remember_me),
bool(form.not_me)))
credentials['userid'] = userid
response_attrs = server.create_authorization_response(
*(extract_params(request) + (scopes, credentials)))
return OAuthResponse(*response_attrs)
def prepare_search_term(request):
from antlr4 import IllegalStateException
from lucenequery.prefixfields import prefixfields
from lucenequery import dialects
search_term = request.params.get('searchTerm', '').strip() or '*'
if search_term == '*':
return search_term
# avoid interpreting slashes as regular expressions
search_term = search_term.replace('/', r'\/')
# elasticsearch uses : as field delimiter, but we use it as namespace designator
# if you need to search fields you have to use @type:field
# if you need to search fields where the field contains ":", you will have to escape it
# yourself
if search_term.find("@type") < 0:
search_term = search_term.replace(':', '\:')
try:
query = prefixfields('embedded.', search_term, dialects.elasticsearch)
except (IllegalStateException):
msg = "Invalid query: {}".format(search_term)
raise HTTPBadRequest(explanation=msg)
else:
return query.getText()
def process_record(self, new, old=None):
new = super(Subscription, self).process_record(new, old)
try:
WebPusher(new)
except WebPushException as e:
raise http_error(HTTPBadRequest(),
errno=ERRORS.INVALID_PARAMETERS,
message='Invalid subscription: %s' % e)
return new
def portier_verify(request):
"""Helper to redirect client towards Portier login form."""
broker_uri = portier_conf(request, 'broker_uri')
token = request.validated['body']['id_token']
# Get the data from the config because the request might only
# have local network information and not the public facing ones.
audience = '{scheme}://{host}'.format(scheme=request.registry.settings['http_scheme'],
host=request.registry.settings['http_host'])
try:
email, stored_redirect = get_verified_email(
broker_url=broker_uri,
token=token,
audience=audience,
issuer=broker_uri,
cache=request.registry.cache)
except ValueError as exc:
error_details = 'Portier token validation failed: %s' % exc
return http_error(httpexceptions.HTTPBadRequest(),
errno=ERRORS.INVALID_AUTH_TOKEN, error='Invalid Auth Token',
message=error_details)
# Generate a random token
user_token = codecs.encode(os.urandom(32), 'hex').decode('utf-8')
# Encrypt the email with the token
encrypted_email = encrypt(email, user_token)
# Generate a user ID from the token
hmac_secret = request.registry.settings['userid_hmac_secret']
userID = utils.hmac_digest(hmac_secret, user_token)
# Store the encrypted user ID with the token
session_ttl = portier_conf(request, 'session_ttl_seconds')
request.registry.cache.set('portier:' + userID, encrypted_email, session_ttl)
location = '%s%s' % (stored_redirect, user_token)
return httpexceptions.HTTPFound(location=location)
def test_add_existing_report_to_db():
"""
This should not add a new report to the database session as the one already exists.
"""
with mock.patch('annotran.languages.models.Language') as language:
language.get_by_public_language_id = MagicMock(return_value=language)
with mock.patch('annotran.pages.models.Page') as page:
page.get_by_uri = MagicMock(return_value=page)
with mock.patch('h.groups.models.Group') as group:
group.get_by_pubid = MagicMock(return_value=group)
with mock.patch('h.models.User') as user:
user.get_by_username = MagicMock(return_value=user)
with mock.patch('annotran.translations.models.Translation') as translation:
translation.get_translation = MagicMock(return_value=translation)
with mock.patch('annotran.reports.models.Report') as report:
report.get_report = MagicMock(return_value=report)
request = _mock_request(authenticated_user=mock.Mock(username="test"),
matchdict={'public_language_id': '12345',
'public_group_id': '12345',
'user_id': '12345',
'page_uri': 'http://www.annotran_test.com/'})
result = views.add_report(request)
assert not request.db.add.called
assert isinstance(result, exc.HTTPBadRequest)
def gettaxonomy_for_domain(request):
mongohost = request.registry.settings['MONGOHOST']
domain = request.matchdict.get('domain')
log.info('Getting Taxonomy for Domain : domain:' + domain )
ma = MA(host=mongohost, port=27017, logger=logger, db='KnowledgeModelTaxonomy', collection=domain.replace(' ', '_'))
record = ma.getRecord({"domain" : domain})
if record is None:
raise exc.HTTPBadRequest(explanation="The domain {} does not have a taxonomy in the database.".format(domain))
return record
def devices_details_view(request):
device_id = request.matchdict["device_id"]
logger.debug("Getting details for device with ID=%s", device_id)
device_list_path = request.registry.settings['devices_path']
device = get_device(device_list_path, device_id)
# Only Philips Hue has device details at the moment
# TODO: In the future, each single Philips Hue light should be returned as a regular
# device. Philips Hue bridges should be flagged with `virtual: True` since they
# are not controlled by the user. However, all its lights should be returned as
# well when requesting `/devices`.
if device['type'] != 'philips_hue':
return {}
homeassistant_data_path = request.registry.settings["homeassistant_data_path"]
phue_bridge_config = os.path.join(homeassistant_data_path, '{}.conf'.format(device["id"]))
config = read_json(phue_bridge_config, {})
username = config.get(device["ip"], {}).get("username")
try:
bridge = PhilipsHueBridgeApiClient(device["ip"], username)
return bridge.get_lights()
# TODO create a tween to handle exceptions for all views
except UnauthenticatedDeviceError as e:
raise HTTPBadRequest(e.message)
except UpstreamError as e:
raise HTTPBadGateway(e.message)
def test_check_list_of_strs(self):
from hel.utils.query import check_list_of_strs
try:
check_list_of_strs(['test', 555], 'test')
except HTTPBadRequest as e:
self.assertEqual(e.detail, 'test')
else:
raise AssertionError()
check_list_of_strs(['test', 'test2'])
def param(name):
def wrap(func):
def f(self):
tests = func(self)
# Zero values: should fail
with self.subTest(test='0 values'):
try:
PackagesSearcher({
name: []
})()
except HTTPBadRequest as e:
if e.detail == Messages.no_values % name:
# Expected exception
pass
else:
raise e
else:
raise AssertionError()
for test_case in tests:
with self.subTest(test=test_case):
values, expected = test_case
searcher = PackagesSearcher({
name: values
})
searcher()
packages = [x for x in self.db['packages'].find({})]
search_result = searcher.search(packages)
for num, doc in enumerate(search_result):
if '_id' in search_result[num]:
del search_result[num]['_id']
self.assertTrue(are_equal(search_result, expected))
return f
return wrap
def test_bad_search_param(self):
try:
PackagesSearcher({'hi': ['test']})()
except HTTPBadRequest as e:
self.assertEqual(e.detail, Messages.bad_search_param % 'hi')
else:
raise AssertionError()
def check_list_of_strs(value, message=None):
check(value, list, message)
for item in value:
if type(item) != str:
raise HTTPBadRequest(detail=message)
return value
def parse_url(url):
try:
matches = rfc3987.parse(url, rule='URI')
except ValueError:
raise HTTPBadRequest(detail=Messages.invalid_uri)
if matches['scheme'] not in ['http', 'https']:
raise HTTPBadRequest(detail=Messages.invalid_uri)
matches['path'] = matches['path'] or '/'
matches['fragment'] = None
return rfc3987.compose(**matches)
def check_path(path):
if path[-1] == '/':
raise HTTPBadRequest(detail=Messages.bad_path)
return True
def check_filename(name):
if name.find('/') != -1:
raise HTTPBadRequest(detail=Messages.bad_path)
return True
def update(request):
'Update an OData entity'
odata_tablename, etag, odata_dict = common.django_to_odata(request)
odata_metadata = request.registry.settings['odata_metadata']
odata_table = odata_metadata.tables[odata_tablename]
ident = odata_dict.pop(etl_utils.primary_key(odata_table), None)
if ident is None:
raise http_exc.HTTPBadRequest('No identifier provided; pass `id` key')
cdms_client = request.registry.settings['cdms_client']
response = cdms_client.update(
odata_tablename, etag, fmt_guid(ident), odata_dict
)
return common.odata_to_django(odata_tablename, response)
def django_to_odata(request):
'Transform request into spec for “onwards” request to OData service'
django_tablename, odata_tablename = request_tablenames(request)
try:
etag, odata_dict = transform.django_to_odata(
django_tablename, request.json_body
)
except json.JSONDecodeError as exc:
raise http_exc.HTTPBadRequest('Invalid JSON')
return odata_tablename, etag, odata_dict
def approve_plugin_update(request):
plugin_id = request.matchdict['plugin_id']
plugin_update_query = request.db_session.query(LedPluginProposed).filter(LedPluginProposed.led_plugin_id == plugin_id)
plugin_update = plugin_update_query.first()
if plugin_update is None:
raise exc.HTTPBadRequest("No such plugin to update")
plugin = request.db_session.query(LedPlugin).filter(LedPlugin.id == plugin_id).first()
plugin.code = plugin_update.code
plugin_update_query.delete()
log(request, 'Approved updates to plugin <a href="/plugin/{}">{}</a>'.format(plugin.id, plugin.name))
return exc.HTTPFound(location='/plugin_approve')
def reject_plugin_update(request):
plugin_id = request.matchdict['plugin_id']
plugin_query = request.db_session.query(LedPluginProposed).filter(LedPluginProposed.led_plugin_id == plugin_id)
plugin_update = plugin_query.first()
if plugin_update is None:
raise exc.HTTPBadRequest("No proposal for this plugin")
plugin = plugin_update.led_plugin
plugin_query.delete()
log(request, 'Rejected updates to plugin <a href="/plugin/{}">{}</a>'.format(plugin.id, plugin.name))
return exc.HTTPFound(location='/plugin_approve')
def update_plugin(request):
# make sure the plugin id exists
plugin_id = request.matchdict['plugin_id']
plugin = request.db_session.query(LedPlugin).filter(LedPlugin.id == plugin_id).first()
if not plugin:
raise exc.HTTPBadRequest("No such plugin")
else:
# only a site admin or plugin owner can edit
# a non-admin plugin owner can suggest changes
user = request.user
if user != plugin.user and not user.admin:
raise exc.HTTPForbidden("You don't have access to do that")
POST = {k: to_null(v) for k, v in request.POST.items()}
if not user.admin:
if 'code' not in POST:
raise exc.HTTPBadRequest("Please provide the code")
plugin_update = request.db_session.query(LedPluginProposed).filter(LedPluginProposed.led_plugin_id == plugin_id).first()
if plugin_update is not None:
plugin_update.code = POST['code']
else:
request.db_session.add(LedPluginProposed(led_plugin_id=plugin_id, code=POST['code']))
log(request, "Proposed changes to plugin <a href='/plugin/{}'>{}</a>".format(plugin.id, plugin.name))
return {
'msg': 'Your update is now awaiting approval'
}
else:
if POST['code']:
plugin.code = POST['code']
if 'name' in POST and POST['name']:
plugin.name = POST['name']
log(request, 'Updated <a href="/plugin/{0}">plugin {1}</a>: '.format(plugin_id, plugin.name, ))
return {
'msg': 'Updated code'
}