def schema_request(media_type_name):
def wrapper(fn):
@wraps(fn)
def decorated(*args, **kwargs):
if isinstance(media_type_name, basestring):
schema = get_schema_for_media_type(media_type_name)
else:
schema = media_type_name
json_object = request.get_json()
try:
validate(json_object, schema)
except ValidationError as e:
report = generate_validation_error_report(e, json_object)
abort(400, description=report)
return fn(*args, **kwargs)
return decorated
return wrapper
python类abort()的实例源码
def put(self, message_id):
"""Edit an existing message.
Argument:
message_id (int): --
"""
text = get_valid_json(self.SCHEMA)['text']
result = db.session.query(models.Message).get(message_id)
# we don't want people editing posts which don't
# belong to them...
if result.user.username == auth.username():
result.text = text
db.session.commit()
return self.get(message_id)
else:
message = "You are not this message's author."
flask_restful.abort(400, message=message)
def delete(self, message_id):
"""Delete a specific message.
Argument:
message_id (int): --
"""
result = db.session.query(models.Message).get(message_id)
if result is None:
flask_restful.abort(404, message="No post by id %d" % message_id)
if result.user.username == auth.username():
db.session.delete(result)
db.session.commit()
return {}
else:
message = "You're not the author of message %d" % message_id
flask_restful.abort(401, message=message)
def post(self, args):
email = args['email']
password = args['password']
# Check the info!
user_from_db = users.User.query.filter_by(email=email).first()
if user_from_db is None:
# FIXME: bad message
flask_restful.abort(401, message={'error': 'failed login'})
elif flask_security.utils.verify_password(password,
user_from_db.password):
return {
'access_token': create_access_token(identity=email),
}
else:
# FIXME: terrible message.
flask_restful.abort(401, message={'error': 'incorrect email/pass'})
def post(self):
"""
Register a new domain name on this resolver.
Returns a serialization which includes a JSON Web Token which can be used to authorize
updates to this domain mapping in future.
"""
args = new_domain_parser.parse_args()
domain_name = "{}.{}".format(args.domain_name["label"], args.domain_name["zone"])
try:
domain = Domain.create(domain_name=domain_name,
zone=args.domain_name["zone"],
public=args.public)
except exc.IntegrityError:
return abort(422,
message={'domain_name': "Domain {} already exists".format(domain_name)})
return domain
def post(self, name):
if not current_user.is_authenticated:
return abort(401, message='permission denied')
for topic in current_user.following_topics:
if topic.name == name:
return topic
topic = topics_service.get(name)
if topic is None:
return abort(400, message='topic not found')
try:
current_user.following_topics.append(topic)
topic.followers_count += 1
print topic.followers_count
db.session.commit()
except:
db.session.rollback()
return abort(500, message='operation failed')
return topic
def delete(self, name):
topic = None
if not current_user.is_authenticated:
return abort(401, message='permission denied')
for topic in current_user.following_topics:
if topic.name == name:
break
else:
return abort(400, message='topic not found')
try:
current_user.following_topics.remove(topic)
if topic.followers_count > 0:
topic.followers_count -= 1
db.session.commit()
except:
db.session.rollback()
return abort(500, message='operation failed')
def handle_validation_error(self, error, bundle_errors):
"""Called when an error is raised while parsing. Aborts the request
with a 400 status and an error message
:param error: the error that was raised
:param bundle_errors: do not abort when first error occurs, return a
dict with the name of the argument and the error message to be
bundled
"""
error_str = six.text_type(error)
error_msg = self.help.format(error_msg=error_str) if self.help else error_str
msg = {self.name: error_msg}
if current_app.config.get("BUNDLE_ERRORS", False) or bundle_errors:
return error, msg
flask_restful.abort(400, message=msg)
def put(group_id):
try:
args = put_parser.parse_args()
except BadRequest:
return abort(400, message="Invalid arguments")
with db:
with db.cursor() as cursor:
user_id = id_from_token(args['token'])
if user_id is None:
return abort(403, message="Token is not allowed to view this group")
cursor.execute("SELECT id FROM groups_rel WHERE user_id=%s AND groups_rel.group_id=%s",
[user_id, group_id])
group = cursor.fetchone()
if group is None:
return abort(403, message="Token is not allowed to view this group")
cursor.execute("INSERT INTO group_recipe_vote_log (grecipe_id, user_id, action) VALUES "
"((SELECT id FROM group_recipes WHERE group_id=%s AND group_recipes.recipe_id=%s),"
" %s, %s)",
[group_id, args['recipe_id'], user_id, args['action']])
return None, 200
def get(ean):
with db:
with db.cursor() as cursor:
cursor.execute("SELECT name, type FROM products"
" WHERE ean=%s", (ean,))
product = cursor.fetchone()
if product is not None:
return {
"ean": ean,
"name": product[0],
"type": product[1]
}
else:
result = request_product(ean)
if result is not None:
Product.add_product(result['ean'], result['name'], result['type'])
return result
abort(404, message="Product with EAN {} does not exist.".format(ean))
def put(self, ean):
try:
args = parser.parse_args()
args['type'] = int(args['type'])
except Exception:
return abort(400, message="Invalid arguments")
if args['name'] is None or len(args['name']) == 0:
args['name'] = "Unknown"
with db:
with db.cursor() as cursor:
cursor.execute("SELECT id FROM product_types WHERE id=%s", [args['type']])
if cursor.fetchone() is None:
abort(400, message="Invalid arguments, product type does not exist.")
cursor.execute("SELECT name, type FROM products WHERE ean=%s", [ean, ])
if cursor.fetchone() is None:
self.add_product(ean, args['name'], args['type'])
return None, 201
else:
cursor.execute("UPDATE products SET name=%s, type=%s WHERE ean=%s",
(args['name'], args['type'], ean))
db.commit()
return None, 200
def put(self):
try:
args = parser.parse_args()
except BadRequest:
return abort(400, message="Invalid arguments")
with db:
with db.cursor() as cursor:
user_id = get_or_create_id(args['token'])
cursor.execute(u"UPDATE users SET firebase_token=%s WHERE id=%s",
[args['firebase_token'], user_id[0]])
if user_id[1]:
return None, 201
else:
return None, 200
def get(self):
try:
args = parser.parse_args()
except Exception:
return abort(400, message="Invalid arguments")
with db:
with db.cursor() as cursor:
user_id = get_or_create_id(args['token'])
cursor.execute(
"SELECT fridge_items.ean, products.name, products.type FROM fridge_items"
" JOIN products ON products.ean=fridge_items.ean"
" WHERE fridge_items.user_id=%s",
[user_id[0]])
response = []
for item in cursor.fetchall():
response.append({'ean': item[0],
'name': item[1],
'type': item[2]})
return response, 200
def put(self, ean):
try:
args = parser.parse_args()
except Exception:
return abort(400, message="Invalid arguments")
with db:
with db.cursor() as cursor:
user_id = get_or_create_id(args['token'])
cursor.execute(
"SELECT products.name, products.type FROM fridge_items"
" JOIN products ON products.ean=fridge_items.ean"
" WHERE fridge_items.user_id=%s AND fridge_items.ean=%s",
[user_id[0], ean])
query = cursor.fetchone()
if query is None:
p = Product.get(ean) # Produces 404 if not exists
cursor.execute("INSERT INTO fridge_items (ean, user_id) VALUES (%s, %s)", [ean, user_id[0]])
return p, 201
else:
return {
"name": query[0],
"type": query[1]
}, 200
def put(self, group_id):
try:
args = putParser.parse_args()
except BadRequest:
return abort(400, message="Invalid arguments")
with db:
with db.cursor() as cursor:
user_id = id_from_token(args['token'])
if user_id is None:
return abort(403, message="Token is not allowed to view this group")
cursor.execute("SELECT id FROM groups_rel WHERE user_id=%s AND groups_rel.group_id=%s",
[user_id, group_id])
rel = cursor.fetchone()
if rel is None:
return abort(404, message="Group not found")
cursor.execute("UPDATE groups_rel SET accepted=%s WHERE id=%s", [args['accept'], rel[0]])
return 200
def get(self):
try:
args = parser.parse_args()
except BadRequest:
return abort(400, message="Invalid arguments")
with db:
with db.cursor() as cursor:
cursor.execute("SELECT name FROM users WHERE id=%s", [id_from_token(args['token'])])
query = cursor.fetchone()
if query is None:
return None, 404
else:
return {
"name": query[0]
}
def put(self):
try:
args = parser.parse_args()
except BadRequest:
return abort(400, message="Invalid arguments")
if 'resend_all' not in args:
return abort(200, message="")
if args['resend_all']:
with db:
with db.cursor() as cursor:
user_id = id_from_token(args['token'])
if user_id is None:
return abort(400, message="Invalid arguments")
cursor.execute("UPDATE groups_rel SET invited=FALSE WHERE id = (SELECT groups_rel.id FROM groups_rel "
"JOIN groups ON groups_rel.group_id = groups.id "
"WHERE groups_rel.user_id=%s AND groups.day = CURRENT_DATE)",
[user_id])
send_invitations()
return 200
def handle_validation_error(self, error, bundle_errors):
"""Called when an error is raised while parsing. Aborts the request
with a 400 status and an error message
:param error: the error that was raised
:param bundle_errors: do not abort when first error occurs, return a
dict with the name of the argument and the error message to be
bundled
"""
error_str = six.text_type(error)
error_msg = self.help.format(error_msg=error_str) if self.help else error_str
msg = {self.name: "{0}".format(error_msg)}
if current_app.config.get("BUNDLE_ERRORS", False) or bundle_errors:
return error, msg
flask_restful.abort(400, message=msg)
def handle_validation_error(self, error, bundle_errors):
"""Called when an error is raised while parsing. Aborts the request
with a 400 status and an error message
:param error: the error that was raised
:param bundle_errors: do not abort when first error occurs, return a
dict with the name of the argument and the error message to be
bundled
"""
error_str = six.text_type(error)
error_msg = self.help.format(error_msg=error_str) if self.help else error_str
msg = {self.name: "{0}".format(error_msg)}
if current_app.config.get("BUNDLE_ERRORS", False) or bundle_errors:
return error, msg
flask_restful.abort(400, message=msg)
def check_schema(json_object, schema, title=None):
"""Do json schema check on object and abort with 400 error if it fails."""
try:
validate(json_object, schema)
except ValidationError as e:
report = generate_validation_error_report(e, json_object)
if title:
report = "Schema check failed: %s\n%s" % (title, report)
log.info("Schema validation failure\n%s", report)
abort(400, description=report)
def post(self):
"""Create a new user.
If a user already exists with the provided
username, an HTTP 400 is sent.
Returns:
dict: If the user was successfully created,
return a dictionary describing that
created user.
None: If aborted.
"""
json_data = get_valid_json(self.SCHEMA_POST)
bio = json_data.get('bio')
username = json_data['username']
password = json_data['password']
new_user = models.User(username, password, bio=bio)
db.session.add(new_user)
try:
db.session.commit()
except sqlalchemy.exc.IntegrityError:
message = "A user already exists with username: %s" % username
flask_restful.abort(400, message=message)
else:
# will happen only if successful/no
# error raised
return new_user.to_dict()
def get(self):
"""Get a range of messages using a "limit"
and an "offset."
Returns:
list: list of dictionaries describing
messages, if successful.
None: If aborted.
"""
json_data = get_valid_json(self.SCHEMA_GET)
offset = int(json_data['offset'])
limit = int(json_data['limit'])
# Just make sure not requesting too many at once!
if limit > config.LIMITS_MESSAGES_GET_LIMIT:
message = ("You may only request %d messages at once."
% config.LIMITS_MESSAGES_GET_LIMIT)
flask_restful.abort(400, message=message)
# Now we're sure we have the right data to
# make a query, actually do that query and
# return said data or 404 if nothing matches.
query = db.session.query(models.Message)
results = query.limit(limit).offset(offset).all()
if results == []:
message = ("No messages found at offset %d limit %d"
% (offset, limit))
flask_restful.abort(404, message=message)
else:
return [r.to_dict() for r in results]
def auth_error():
flask_restful.abort(401, message="Unathorized")
def get_valid_json(schema):
json_data = flask.request.get_json(force=True)
try:
jsonschema.validate(json_data, schema)
except jsonschema.ValidationError as e:
flask_restful.abort(400, message=e.message)
else:
return json_data
def handle_arg_schema_error(error):
"""RESTful error regarding webarg schema mismatch (invalidation).
Arguments:
error (???): An error object from webargs, passed to this
function when a request's arguments are not valid according to
the specified webargs schema (a dict, see: use_args decorator).
Returns:
json "error"...
"""
flask_restful.abort(400, validationError=error.messages)
def get(self):
return abort(501)
def get(self, storageid):
return abort(501)
def get(self, nodeid):
return abort(http_client.NOT_IMPLEMENTED)
def post(self):
"""
Add a new proxy to the proxy list
"""
args = new_proxy_parser.parse_args()
ip_address = args.ip_address
try:
proxy = Proxy.create(ip_address=str(ip_address),
ip_type=str(ip_address.version))
except exc.IntegrityError:
return abort(422, message="Proxy {} already exists".format(str(ip_address)))
return proxy
def post(self, domain_name):
"""
Create a new DNS record on this domain
The request must be validated with the domain token.
"""
probable_onion_mapping = None
args = new_record_parser.parse_args()
num_records = Record.query.filter_by(domain=g.domain).count()
if num_records >= current_app.config["MAX_RECORDS"]:
return abort(403, message="This domain has had reach the DNS record limit. You cannot "
"add more without removing some existing records")
# Record address on domain if this looks like an onion mapping
if is_onion_record(args.value) and args.type == "TXT":
probable_onion_mapping = args.value.split("=")[1]
try:
record = Record.create(domain=g.domain,
label=args.label,
ttl=args.ttl or current_app.config["TXT_RECORD_TTL"],
record_type=args.type,
value=args.value,
is_onion_mapping=probable_onion_mapping)
except exc.IntegrityError:
return abort(422, message="An unknown error occurred when trying to create "
"this record")
# Guess that the user is updating their onion address if its a valid
# onion=theonionaddress.onion type TXT record
if probable_onion_mapping:
# Update the convenience onion_address wrapper on the domain
g.domain.update(onion_address=probable_onion_mapping,
date_updated=datetime.datetime.utcnow(),
service_online=True,
updated_since_synced=True)
return record