def _should_use_fr_error_handler(self):
    """Determine if error should be handled with FR or default Flask.

    The goal is to return Flask error handlers for non-FR-related routes,
    and FR errors (with the correct media type) for FR endpoints. This
    method currently handles 404 and 405 errors.

    The current request is re-matched against the application's URL map:

    * on ``MethodNotAllowed`` the URL is matched again with the first valid
      method, and the result is whether this Api owns the matched endpoint;
    * on ``NotFound`` the result is ``self.catch_all_404s``;
    * in every other case (successful match, or any other exception raised
      by the matcher) the result is ``False``.

    :return: bool
    """
    adapter = current_app.create_url_adapter(request)
    try:
        adapter.match()
    except MethodNotAllowed as e:
        # Check if the other HTTP methods at this url would hit the Api
        valid_route_method = e.valid_methods[0]
        rule, _ = adapter.match(method=valid_route_method, return_rule=True)
        return self.owns_endpoint(rule.endpoint)
    except NotFound:
        return self.catch_all_404s
    except Exception:
        # Werkzeug throws other kinds of exceptions, such as Redirect.
        # Only Exception subclasses are swallowed, so SystemExit and
        # KeyboardInterrupt still propagate.
        return False
    # The request matched a rule normally: nothing special to handle here.
    return False
python类MethodNotAllowed()的实例源码
def _get_twilio_arguments(self, request):
if request.method == 'GET':
from_number = request.args.get('From')
text = request.args.get('Body')
if from_number is None or text is None:
raise BadRequest('Both `From` and `Body` '
'parameters are mandatory.')
return from_number, text
elif request.method == 'POST':
from_number = request.form.get('From', None)
text = request.form.get('Body', None)
if from_number is None or text is None:
raise BadRequest('Both `From` and `Body` '
'parameters are mandatory.')
return from_number, text
else:
raise MethodNotAllowed('Only `GET` and `POST` methods are allowed.')
def _get_plivo_arguments(self, request):
if request.method == 'GET':
from_number = request.args.get('From')
text = request.args.get('Text')
if from_number is None or text is None:
raise BadRequest('Both `From` and `Text` '
'parameters are mandatory.')
return from_number, text
elif request.method == 'POST':
from_number = request.form.get('From', None)
text = request.form.get('Text', None)
if from_number is None or text is None:
raise BadRequest('Both `From` and `Text` '
'parameters are mandatory.')
return from_number, text
else:
raise MethodNotAllowed('Only `GET` and `POST` methods are allowed.')
def register_error_handlers(self):
    """Register this server's error handlers on top of the parent's.

    After delegating to the parent class, ``self.pillar_error_handler`` is
    registered for a fixed set of HTTP status codes, and a dedicated
    ``handle_sdk_*`` method is registered for each ``pillarsdk`` exception
    class listed below.
    """
    super(PillarServer, self).register_error_handlers()

    # Handlers keyed by HTTP status code.
    for status_code in (403, 404, 412, 500):
        self.register_error_handler(status_code, self.pillar_error_handler)

    # Handlers keyed by exception class; pillarsdk is imported locally,
    # exactly where the original code imported it.
    from pillarsdk import exceptions as sdk_exceptions

    exception_handlers = (
        (sdk_exceptions.UnauthorizedAccess, self.handle_sdk_unauth),
        (sdk_exceptions.ForbiddenAccess, self.handle_sdk_forbidden),
        (sdk_exceptions.ResourceNotFound, self.handle_sdk_resource_not_found),
        (sdk_exceptions.ResourceInvalid, self.handle_sdk_resource_invalid),
        (sdk_exceptions.MethodNotAllowed, self.handle_sdk_method_not_allowed),
        (sdk_exceptions.PreconditionFailed, self.handle_sdk_precondition_failed),
    )
    for exception_class, handler in exception_handlers:
        self.register_error_handler(exception_class, handler)
def __fake_request_url_rule(self, method: str, url_path: str):
    """Tries to force-set the request URL rule.

    This is required by Eve (since 0.70) to be able to construct a
    Location HTTP header that points to the resource item.

    See post_internal, put_internal and patch_internal.

    This is a generator that yields the test request context once, with
    ``request.url_rule`` set inside it. NOTE(review): presumably wrapped by
    a context-manager decorator that is not visible here -- confirm.

    :param method: HTTP method used for the fake request and first match.
    :param url_path: URL path used for the fake request and the matches.
    """
    import werkzeug.exceptions as wz_exceptions

    unmatched = (wz_exceptions.MethodNotAllowed, wz_exceptions.NotFound)

    with self.test_request_context(method=method, path=url_path) as ctx:
        try:
            matched_rule, _ = ctx.url_adapter.match(
                url_path, method=method, return_rule=True)
        except unmatched:
            # We're POSTing things that we haven't told Eve are POSTable.
            # Try again using the GET method.
            matched_rule, _ = ctx.url_adapter.match(
                url_path, method='GET', return_rule=True)

        # Set the rule on the real request object behind the proxy.
        request._get_current_object().url_rule = matched_rule
        yield ctx
def _should_use_fr_error_handler(self):
    """Determine if error should be handled with FR or default Flask.

    The goal is to return Flask error handlers for non-FR-related routes,
    and FR errors (with the correct media type) for FR endpoints. This
    method currently handles 404 and 405 errors.

    The current request is re-matched against the application's URL map:

    * on ``MethodNotAllowed`` the URL is matched again with the first valid
      method, and the result is whether this Api owns the matched endpoint;
    * on ``NotFound`` the result is ``self.catch_all_404s``;
    * in every other case (successful match, or any other exception raised
      by the matcher) the result is ``False``.

    :return: bool
    """
    adapter = current_app.create_url_adapter(request)
    try:
        adapter.match()
    except MethodNotAllowed as e:
        # Check if the other HTTP methods at this url would hit the Api
        valid_route_method = e.valid_methods[0]
        rule, _ = adapter.match(method=valid_route_method, return_rule=True)
        return self.owns_endpoint(rule.endpoint)
    except NotFound:
        return self.catch_all_404s
    except Exception:
        # Werkzeug throws other kinds of exceptions, such as Redirect.
        # Only Exception subclasses are swallowed, so SystemExit and
        # KeyboardInterrupt still propagate.
        return False
    # The request matched a rule normally: nothing special to handle here.
    return False
def _should_use_fr_error_handler(self):
    """Determine if error should be handled with FR or default Flask.

    The goal is to return Flask error handlers for non-FR-related routes,
    and FR errors (with the correct media type) for FR endpoints. This
    method currently handles 404 and 405 errors.

    The current request is re-matched against the application's URL map:

    * on ``MethodNotAllowed`` the URL is matched again with the first valid
      method, and the result is whether this Api owns the matched endpoint;
    * on ``NotFound`` the result is ``self.catch_all_404s``;
    * in every other case (successful match, or any other exception raised
      by the matcher) the result is ``False``.

    :return: bool
    """
    adapter = current_app.create_url_adapter(request)
    try:
        adapter.match()
    except MethodNotAllowed as e:
        # Check if the other HTTP methods at this url would hit the Api
        valid_route_method = e.valid_methods[0]
        rule, _ = adapter.match(method=valid_route_method, return_rule=True)
        return self.owns_endpoint(rule.endpoint)
    except NotFound:
        return self.catch_all_404s
    except Exception:
        # Werkzeug throws other kinds of exceptions, such as Redirect.
        # Only Exception subclasses are swallowed, so SystemExit and
        # KeyboardInterrupt still propagate.
        return False
    # The request matched a rule normally: nothing special to handle here.
    return False
def post(self):
    """Reject POST by raising ``MethodNotAllowed`` with GET as the only valid method."""
    permitted = ['GET']
    raise MethodNotAllowed(valid_methods=permitted)
def delete(self, oid=None):
    """Reject DELETE by raising ``MethodNotAllowed`` with GET as the only valid method.

    :param oid: accepted for interface compatibility; not used.
    """
    permitted = ['GET']
    raise MethodNotAllowed(valid_methods=permitted)
def put(self, oid=None):
    """Reject PUT by raising ``MethodNotAllowed`` with GET as the only valid method.

    :param oid: accepted for interface compatibility; not used.
    """
    permitted = ['GET']
    raise MethodNotAllowed(valid_methods=permitted)
def post(self):
    """Answer POST with a formatted ``MethodNotAllowed`` error.

    The exception is raised and caught immediately so that the object
    handed to ``error.format_exception`` is a genuinely raised exception.

    :return: whatever ``error.format_exception`` returns for the exception,
        with this class's lower-cased name as target and ``'POST'`` as action.
    """
    resource_name = self.__class__.__name__.lower()
    try:
        raise MethodNotAllowed
    except MethodNotAllowed as not_allowed:
        return error.format_exception(
            not_allowed, target=resource_name, action='POST')
def delete(self, oid=None):
    """Answer DELETE with a formatted ``MethodNotAllowed`` error.

    The exception is raised and caught immediately so that the object
    handed to ``error.format_exception`` is a genuinely raised exception.

    :param oid: accepted for interface compatibility; not used.
    :return: whatever ``error.format_exception`` returns for the exception,
        with this class's lower-cased name as target and ``'DEL'`` as action.
    """
    resource_name = self.__class__.__name__.lower()
    try:
        raise MethodNotAllowed
    except MethodNotAllowed as not_allowed:
        return error.format_exception(
            not_allowed, target=resource_name, action='DEL')
def test_not_allowed_methods(self):
    """Test POST, DELETE, PUT methods are not allowed for resource token"""
    token_api_instance = TokenAPI()

    # For each verb, in the same order as before: the HTTP request must be
    # answered with 405, and calling the resource method directly must raise.
    for verb in ('post', 'delete', 'put'):
        response = getattr(self.app, verb)('/api/token')
        assert response.status_code == 405, response.status_code
        assert_raises(MethodNotAllowed, getattr(token_api_instance, verb))
def allowed_methods(self, path_info=None):
    """Returns the valid methods that match for a given path.

    .. versionadded:: 0.7

    :param path_info: path to match; passed through to ``self.match``.
    :return: the ``valid_methods`` of the ``MethodNotAllowed`` raised by the
        match, or an empty list if the match succeeds or fails with any
        other ``HTTPException``.
    """
    methods = []
    try:
        # '--' is presumably a placeholder no rule accepts, so a path that
        # exists answers with MethodNotAllowed listing its real methods.
        self.match(path_info, method='--')
    except MethodNotAllowed as not_allowed:
        methods = not_allowed.valid_methods
    except HTTPException:
        pass
    return methods
def allowed_methods(self, path_info=None):
    """Returns the valid methods that match for a given path.

    .. versionadded:: 0.7

    :param path_info: path to match; passed through to ``self.match``.
    :return: the ``valid_methods`` of the ``MethodNotAllowed`` raised by the
        match, or an empty list if the match succeeds or fails with any
        other ``HTTPException``.
    """
    methods = []
    try:
        # '--' is presumably a placeholder no rule accepts, so a path that
        # exists answers with MethodNotAllowed listing its real methods.
        self.match(path_info, method='--')
    except MethodNotAllowed as not_allowed:
        methods = not_allowed.valid_methods
    except HTTPException:
        pass
    return methods
def allowed_methods(self, path_info=None):
    """Returns the valid methods that match for a given path.

    .. versionadded:: 0.7

    :param path_info: path to match; passed through to ``self.match``.
    :return: the ``valid_methods`` of the ``MethodNotAllowed`` raised by the
        match, or an empty list if the match succeeds or fails with any
        other ``HTTPException``.
    """
    methods = []
    try:
        # '--' is presumably a placeholder no rule accepts, so a path that
        # exists answers with MethodNotAllowed listing its real methods.
        self.match(path_info, method='--')
    except MethodNotAllowed as not_allowed:
        methods = not_allowed.valid_methods
    except HTTPException:
        pass
    return methods
def allowed_methods(self, path_info=None):
    """Returns the valid methods that match for a given path.

    .. versionadded:: 0.7

    :param path_info: path to match; passed through to ``self.match``.
    :return: the ``valid_methods`` of the ``MethodNotAllowed`` raised by the
        match, or an empty list if the match succeeds or fails with any
        other ``HTTPException``.
    """
    methods = []
    try:
        # '--' is presumably a placeholder no rule accepts, so a path that
        # exists answers with MethodNotAllowed listing its real methods.
        self.match(path_info, method='--')
    except MethodNotAllowed as not_allowed:
        methods = not_allowed.valid_methods
    except HTTPException:
        pass
    return methods
def allowed_methods(self, path_info=None):
    """Returns the valid methods that match for a given path.

    .. versionadded:: 0.7

    :param path_info: path to match; passed through to ``self.match``.
    :return: the ``valid_methods`` of the ``MethodNotAllowed`` raised by the
        match, or an empty list if the match succeeds or fails with any
        other ``HTTPException``.
    """
    methods = []
    try:
        # '--' is presumably a placeholder no rule accepts, so a path that
        # exists answers with MethodNotAllowed listing its real methods.
        self.match(path_info, method='--')
    except MethodNotAllowed as not_allowed:
        methods = not_allowed.valid_methods
    except HTTPException:
        pass
    return methods
def test_aborter(self):
    """Check that ``abort`` and custom ``Aborter`` objects raise the mapped exceptions."""
    abort = exceptions.abort

    # (exception class name, status code, extra positional args for abort)
    default_cases = (
        ('BadRequest', 400, ()),
        ('Unauthorized', 401, ()),
        ('Forbidden', 403, ()),
        ('NotFound', 404, ()),
        ('MethodNotAllowed', 405, (['GET', 'HEAD'],)),
        ('NotAcceptable', 406, ()),
        ('RequestTimeout', 408, ()),
        ('Gone', 410, ()),
        ('LengthRequired', 411, ()),
        ('PreconditionFailed', 412, ()),
        ('RequestEntityTooLarge', 413, ()),
        ('RequestURITooLarge', 414, ()),
        ('UnsupportedMediaType', 415, ()),
        ('UnprocessableEntity', 422, ()),
        ('InternalServerError', 500, ()),
        ('NotImplemented', 501, ()),
        ('BadGateway', 502, ()),
        ('ServiceUnavailable', 503, ()),
    )
    for class_name, status_code, extra_args in default_cases:
        expected = getattr(exceptions, class_name)
        self.assert_raises(expected, abort, status_code, *extra_args)

    # A mapping passed positionally is asserted to replace the defaults:
    # 404 then fails with LookupError while the custom code 1 works.
    replacing_abort = exceptions.Aborter({1: exceptions.NotFound})
    self.assert_raises(LookupError, replacing_abort, 404)
    self.assert_raises(exceptions.NotFound, replacing_abort, 1)

    # A mapping passed as ``extra`` is asserted to keep 404 working
    # alongside the custom code 1.
    extending_abort = exceptions.Aborter(extra={1: exceptions.NotFound})
    self.assert_raises(exceptions.NotFound, extending_abort, 404)
    self.assert_raises(exceptions.NotFound, extending_abort, 1)
def test_special_exceptions(self):
    """Check the ``Allow`` header and description produced by ``MethodNotAllowed``."""
    valid_methods = ['GET', 'HEAD', 'POST']
    exc = exceptions.MethodNotAllowed(valid_methods)

    headers = dict(exc.get_headers({}))
    self.assert_equal(headers['Allow'], 'GET, HEAD, POST')

    description = exc.get_description()
    self.assert_true('The method is not allowed' in description)
def allowed_methods(self, path_info=None):
    """Returns the valid methods that match for a given path.

    .. versionadded:: 0.7

    :param path_info: path to match; passed through to ``self.match``.
    :return: the ``valid_methods`` of the ``MethodNotAllowed`` raised by the
        match, or an empty list if the match succeeds or fails with any
        other ``HTTPException``.
    """
    methods = []
    try:
        # '--' is presumably a placeholder no rule accepts, so a path that
        # exists answers with MethodNotAllowed listing its real methods.
        self.match(path_info, method='--')
    except MethodNotAllowed as not_allowed:
        methods = not_allowed.valid_methods
    except HTTPException:
        pass
    return methods
def allowed_methods(self, path_info=None):
    """Returns the valid methods that match for a given path.

    .. versionadded:: 0.7

    :param path_info: path to match; passed through to ``self.match``.
    :return: the ``valid_methods`` of the ``MethodNotAllowed`` raised by the
        match, or an empty list if the match succeeds or fails with any
        other ``HTTPException``.
    """
    methods = []
    try:
        # '--' is presumably a placeholder no rule accepts, so a path that
        # exists answers with MethodNotAllowed listing its real methods.
        self.match(path_info, method='--')
    except MethodNotAllowed as not_allowed:
        methods = not_allowed.valid_methods
    except HTTPException:
        pass
    return methods
def get_headers(self, environ=None):
    """Return the JSON mixin's headers followed by ``MethodNotAllowed``'s headers.

    :param environ: passed unchanged to both parent ``get_headers`` calls.
    :return: the object returned by ``DecapodJSONMixin.get_headers``,
        extended in place with the headers from
        ``exceptions.MethodNotAllowed.get_headers``.
    """
    combined = DecapodJSONMixin.get_headers(self, environ)
    method_headers = exceptions.MethodNotAllowed.get_headers(self, environ)
    combined.extend(method_headers)
    return combined
def allowed_methods(self, path_info=None):
    """Returns the valid methods that match for a given path.

    .. versionadded:: 0.7

    :param path_info: path to match; passed through to ``self.match``.
    :return: the ``valid_methods`` of the ``MethodNotAllowed`` raised by the
        match, or an empty list if the match succeeds or fails with any
        other ``HTTPException``.
    """
    methods = []
    try:
        # '--' is presumably a placeholder no rule accepts, so a path that
        # exists answers with MethodNotAllowed listing its real methods.
        self.match(path_info, method='--')
    except MethodNotAllowed as not_allowed:
        methods = not_allowed.valid_methods
    except HTTPException:
        pass
    return methods
def allowed_methods(self, path_info=None):
    """Returns the valid methods that match for a given path.

    .. versionadded:: 0.7

    :param path_info: path to match; passed through to ``self.match``.
    :return: the ``valid_methods`` of the ``MethodNotAllowed`` raised by the
        match, or an empty list if the match succeeds or fails with any
        other ``HTTPException``.
    """
    methods = []
    try:
        # '--' is presumably a placeholder no rule accepts, so a path that
        # exists answers with MethodNotAllowed listing its real methods.
        self.match(path_info, method='--')
    except MethodNotAllowed as not_allowed:
        methods = not_allowed.valid_methods
    except HTTPException:
        pass
    return methods
def patch_node(node_id):
    """Dispatch a PATCH request on a node to the handler for its node type.

    The node's ``node_type`` is looked up in the ``nodes`` collection and
    used as the key into ``custom.patch_handlers``; the handler found there
    is called with the node ID and the request's JSON payload.

    :param node_id: node identifier; converted with ``str2id`` before use.
    :return: whatever the node type's patch handler returns.
    :raises werkzeug.exceptions.InternalServerError: if the node document
        has no ``node_type`` property.
    :raises werkzeug.exceptions.MethodNotAllowed: if no patch handler is
        registered for the node's type.
    """
    # Parse the request
    node_id = str2id(node_id)
    patch = request.get_json()

    # Find the node type.
    node = mongo.find_one_or_404('nodes', node_id,
                                 projection={'node_type': 1})
    try:
        node_type = node['node_type']
    except KeyError:
        msg = 'Node %s has no node_type property' % node_id
        log.warning(msg)
        raise wz_exceptions.InternalServerError(msg)

    log.debug('User %s wants to PATCH %s node %s',
              authentication.current_user_id(), node_type, node_id)

    # Find the PATCH handler for the node type.
    try:
        patch_handler = custom.patch_handlers[node_type]
    except KeyError:
        log.info('No patch handler for node type %r', node_type)
        # The first positional parameter of MethodNotAllowed is
        # `valid_methods`; the message must be passed as `description`,
        # otherwise the `Allow` header is built from the message's characters.
        raise wz_exceptions.MethodNotAllowed(
            description='PATCH on node type %r not allowed' % node_type)

    # Let the PATCH handler do its thing.
    return patch_handler(node_id, patch)
def handle_sdk_method_not_allowed(self, error):
    """Forwards 405 Method Not Allowed to the client.

    This is actually not fair, as a 405 between Pillar and Pillar-Web
    doesn't imply that the request the client did on Pillar-Web is not
    allowed. However, it does allow us to debug this if it happens, by
    watching for 405s in the browser.

    :param error: the exception being handled; only logged.
    :return: ``(body, status)`` tuple with a fixed message and status 405.
    """
    from flask import request

    self.log.info('Forwarding MethodNotAllowed exception to client: %s', error, exc_info=True)
    self.log.info('HTTP Referer is %r', request.referrer)

    # Raising a Werkzeug 405 exception doesn't work, as Flask turns it into
    # a 500, so the response is returned as a plain (body, status) pair.
    body = 'The requested HTTP method is not allowed on this URL.'
    status_code = 405
    return body, status_code
def run(self, url, order):
    """Print the application's URL rules as an aligned text table.

    With a truthy *url*, that URL is matched against the application's URL
    map and the single matching rule is printed with its endpoint and
    arguments; if matching fails with ``NotFound`` or ``MethodNotAllowed``
    the exception text is printed instead. Without *url*, every rule is
    printed, sorted by the rule attribute named by *order*.

    :param url: URL path to match, or a falsy value to list all rules.
    :param order: name of the rule attribute used as the sort key.
    """
    from flask import current_app
    from werkzeug.exceptions import NotFound, MethodNotAllowed

    column_headers = ('Rule', 'Endpoint', 'Arguments')
    rows = []
    column_length = 0

    if url:
        try:
            rule, arguments = current_app.url_map \
                .bind('localhost') \
                .match(url, return_rule=True)
            rows.append((rule.rule, rule.endpoint, arguments))
            column_length = 3
        except (NotFound, MethodNotAllowed) as e:
            rows.append(("<%s>" % e, None, None))
            column_length = 1
    else:
        rules = sorted(current_app.url_map.iter_rules(),
                       key=lambda rule: getattr(rule, order))
        for rule in rules:
            rows.append((rule.rule, rule.endpoint, None))
        column_length = 2

    headers = column_headers[:column_length]

    # Each column is as wide as its longest cell, and never narrower than
    # its header (4, 8 and 9 characters). Seeding max() with the header
    # length keeps it safe when there are no rows at all.
    widths = [
        max([len(header)] + [len(str(row[index])) for row in rows])
        for index, header in enumerate(headers)
    ]

    # One separator feeds both the row template and the rule width, so the
    # dashed line always spans exactly the printed columns.
    separator = '  '
    str_template = separator.join('%-' + str(width) + 's' for width in widths)
    table_width = sum(widths) + len(separator) * max(len(widths) - 1, 0)

    print(str_template % headers)
    print('-' * table_width)
    for row in rows:
        print(str_template % row[:column_length])
def urls(url, order):
    """Display all of the url matching routes for the project.

    Borrowed from Flask-Script, converted to use Click.

    With a truthy *url*, that URL is matched against the application's URL
    map and the single matching rule is echoed with its endpoint and
    arguments; if matching fails with ``NotFound`` or ``MethodNotAllowed``
    the exception text is echoed instead. Without *url*, every rule is
    echoed, sorted by the rule attribute named by *order*.

    :param url: URL path to match, or a falsy value to list all rules.
    :param order: name of the rule attribute used as the sort key.
    """
    column_headers = ('Rule', 'Endpoint', 'Arguments')
    rows = []
    column_length = 0

    if url:
        try:
            rule, arguments = (
                current_app.url_map
                .bind('localhost')
                .match(url, return_rule=True))
            rows.append((rule.rule, rule.endpoint, arguments))
            column_length = 3
        except (NotFound, MethodNotAllowed) as e:
            rows.append(('<{}>'.format(e), None, None))
            column_length = 1
    else:
        rules = sorted(
            current_app.url_map.iter_rules(),
            key=lambda rule: getattr(rule, order))
        for rule in rules:
            rows.append((rule.rule, rule.endpoint, None))
        column_length = 2

    headers = column_headers[:column_length]

    # Each column is as wide as its longest cell, and never narrower than
    # its header (4, 8 and 9 characters). Seeding max() with the header
    # length keeps it safe when there are no rows at all.
    widths = [
        max([len(header)] + [len(str(row[index])) for row in rows])
        for index, header in enumerate(headers)
    ]

    # One separator feeds both the row template and the rule width, so the
    # dashed line always spans exactly the printed columns.
    separator = '  '
    str_template = separator.join(
        '{:' + str(width) + '}' for width in widths)
    table_width = sum(widths) + len(separator) * max(len(widths) - 1, 0)

    click.echo(str_template.format(*headers))
    click.echo('-' * table_width)
    for row in rows:
        # Cells are converted to str first: a width spec such as '{:9}'
        # raises TypeError for the arguments dict (and for None).
        cells = [str(cell) for cell in row[:column_length]]
        click.echo(str_template.format(*cells))
def urls(url, order):
    """Display all of the url matching routes for the project.

    With a truthy *url*, that URL is matched against the application's URL
    map and the single matching rule is echoed with its endpoint and
    arguments; if matching fails with ``NotFound`` or ``MethodNotAllowed``
    the exception text is echoed instead. Without *url*, every rule is
    echoed, sorted by the rule attribute named by *order*.

    :param url: URL path to match, or a falsy value to list all rules.
    :param order: name of the rule attribute used as the sort key.
    """
    column_headers = ('Rule', 'Endpoint', 'Arguments')
    rows = []
    column_length = 0

    if url:
        try:
            rule, arguments = (
                current_app.url_map
                .bind('localhost')
                .match(url, return_rule=True))
            rows.append((rule.rule, rule.endpoint, arguments))
            column_length = 3
        except (NotFound, MethodNotAllowed) as e:
            rows.append((f'<{e}>', None, None))
            column_length = 1
    else:
        rules = sorted(
            current_app.url_map.iter_rules(),
            key=lambda rule: getattr(rule, order))
        for rule in rules:
            rows.append((rule.rule, rule.endpoint, None))
        column_length = 2

    headers = column_headers[:column_length]

    # Each column is as wide as its longest cell, and never narrower than
    # its header (4, 8 and 9 characters). Seeding max() with the header
    # length keeps it safe when there are no rows at all.
    widths = [
        max([len(header)] + [len(str(row[index])) for row in rows])
        for index, header in enumerate(headers)
    ]

    # One separator feeds both the row template and the rule width, so the
    # dashed line always spans exactly the printed columns.
    separator = '  '
    str_template = separator.join(
        '{:' + str(width) + '}' for width in widths)
    table_width = sum(widths) + len(separator) * max(len(widths) - 1, 0)

    click.echo(str_template.format(*headers))
    click.echo('-' * table_width)
    for row in rows:
        # Cells are converted to str first: a width spec such as '{:9}'
        # raises TypeError for the arguments dict (and for None).
        cells = [str(cell) for cell in row[:column_length]]
        click.echo(str_template.format(*cells))