def post(current_user, bucket_id):
    """
    Store a new item in one of the current user's Buckets.

    The request must use the ``application/json`` content type and carry a
    JSON object with a non-empty ``name``; ``description`` is optional.
    The name is lower-cased before it is saved.
    :param current_user: User
    :param bucket_id: Bucket Id
    :return: Http Response
    """
    if not request.content_type == 'application/json':
        return response('failed', 'Content-type must be application/json', 401)
    data = request.get_json()
    # get_json() may hand back None or a non-object JSON value (list, number,
    # string); calling .get() on those would raise and surface as a 500, so
    # treat them the same as a payload without a name.
    item_name = data.get('name') if isinstance(data, dict) else None
    if not item_name:
        return response('failed', 'No name or value attribute found', 401)
    # Get the user Bucket
    bucket = get_user_bucket(current_user, bucket_id)
    if bucket is None:
        # str() keeps the message building safe if the Id is not a string.
        return response('failed', 'User has no Bucket with Id ' + str(bucket_id), 202)
    # Save the Bucket Item into the Database
    item = BucketItem(item_name.lower(), data.get('description', None), bucket.id)
    item.save()
    return response_with_bucket_item('success', item, 200)
# Example source code using Python's content_type()
def validate_jpeg_binary(func):
    """Decorator: only call ``func`` when the request body is a JPEG.

    Both the declared content type and the binary payload are checked; an
    ``ErrorResponseJson`` response is returned when either check fails.
    """
    @wraps(func)
    def guarded(*args, **kwargs):
        declared_type = request.content_type
        if declared_type != "image/jpeg":
            error = ErrorResponseJson("invalid content type: {}".format(declared_type))
            return error.make_response()

        # The header can lie, so sniff the payload itself as well.
        detected = imghdr.test_jpeg(request.data, None)
        if detected != "jpeg":
            return ErrorResponseJson("invalid jpeg data").make_response()

        return func(*args, **kwargs)

    return guarded
def builtin_jq():
    """
    Builtin program: `jq`.
    Apply the filter from the ``program`` query argument (default ``.``) to
    the raw request body via `jq` and return the output with the
    ``application/json`` content type. If `InvalidJQFilter` is raised, its
    message is returned as JSON with status 400.
    """
    jq_filter = request.args.get('program', ".")
    payload = request.data
    try:
        output = jq(jq_filter, payload)
        reply = make_response(output)
        reply.content_type = 'application/json'
    except InvalidJQFilter as exception:
        return jsonify(message=str(exception)), 400
    return reply
def reset_password(current_user):
    """
    Change the password of the current user.

    Expects an ``application/json`` request whose body is an object with
    ``oldPassword``, ``newPassword`` and ``passwordConfirmation``. The old
    password is verified against the stored hash, the new password must match
    its confirmation and be longer than four characters.
    :param current_user: User
    :return: Http Response
    """
    if request.content_type != "application/json":
        return response('failed', 'Content type must be json', 400)

    data = request.get_json()
    # get_json() may return None or a non-object JSON value; reading keys from
    # those would raise, so report them as missing attributes instead.
    if not isinstance(data, dict):
        return response('failed', "Missing required attributes", 400)

    old_password = data.get('oldPassword')
    new_password = data.get('newPassword')
    password_confirmation = data.get('passwordConfirmation')
    if not old_password or not new_password or not password_confirmation:
        return response('failed', "Missing required attributes", 400)

    if not bcrypt.check_password_hash(current_user.password, old_password.encode('utf-8')):
        return response('failed', "Incorrect password", 401)

    if not new_password == password_confirmation:
        return response('failed', 'New Passwords do not match', 400)
    if not len(new_password) > 4:
        return response('failed', 'New password should be greater than four characters long', 400)

    current_user.reset_password(new_password)
    return response('success', 'Password reset successfully', 200)
# Register classes as views
def edit_bucket(current_user, bucket_id):
    """
    Validate the bucket Id. Also check for the name attribute in the json payload.
    If the name exists update the bucket with the new name.
    :param current_user: Current User
    :param bucket_id: Bucket Id
    :return: Http Json response
    """
    if request.content_type != 'application/json':
        return response('failed', 'Content-type must be json', 202)

    data = request.get_json()
    # A None or non-object JSON body has no attributes to read; handle it like
    # a payload without a name instead of crashing on .get().
    name = data.get('name') if isinstance(data, dict) else None
    if not name:
        return response('failed', 'No attribute or value was specified, nothing was changed', 400)

    try:
        int(bucket_id)
    except ValueError:
        return response('failed', 'Please provide a valid Bucket Id', 400)

    # Only look among the buckets that belong to the current user.
    user_bucket = User.get_by_id(current_user.id).buckets.filter_by(id=bucket_id).first()
    if user_bucket:
        user_bucket.update(name)
        return response_for_created_bucket(user_bucket, 201)
    # str() keeps the message building safe if the Id is not a string.
    return response('failed', 'The Bucket with Id ' + str(bucket_id) + ' does not exist', 404)
def builtin_echo():
    """
    Builtin program: `echo`.
    Send the raw request body back, using the request's own content type.
    """
    echoed = make_response(request.data)
    echoed.content_type = request.content_type
    return echoed
def report(node):
# Handle a provisioning report for `node` and answer with the text 'ok'.
# NOTE(review): this file has lost its indentation, so how many of the
# statements below belong to the maintenance-mode check cannot be read from
# the code here - confirm against the original source before restructuring.
if not node.maintenance_mode:
try:
# The "version" query argument must parse as an integer; a missing or
# malformed value ends the request with a 400.
node.active_config_version = int(request.args.get('version'))
except (ValueError, TypeError):
return abort(400)
# Any content type other than application/json is rejected with a 400.
if request.content_type != 'application/json':
return abort(400)
# Build a Provision record for the node with the reported config version and
# the raw request body as its ignition config.
provision = models.Provision()
provision.node = node
provision.config_version = node.active_config_version
provision.ignition_config = request.data
# The iPXE config is rendered only when the reported version equals the
# node's target version.
if node.target_config_version == node.active_config_version:
provision.ipxe_config = config_renderer.ipxe.render(node, request.url_root)
models.db.session.add(provision)
models.db.session.add(node)
# Set wipe_next_boot to False through node.disks.update().
node.disks.update({
models.Disk.wipe_next_boot: False
})
# When the cluster reports its etcd nodes as configured, set
# assert_etcd_cluster_exists on the cluster.
if node.cluster.are_etcd_nodes_configured:
node.cluster.assert_etcd_cluster_exists = True
models.db.session.add(node.cluster)
models.db.session.commit()
# The body is the literal text 'ok', sent with the application/json mimetype.
return Response('ok', mimetype='application/json')
def image():
    """
    Pass the raw request body to `process_image` and return its result.
    The request content type and the body length are printed first.
    """
    print('contentType', request.content_type)
    body = request.data
    print('data length', len(body))
    return process_image(body)
def default_after_request(param):
    """
    Log a summary of the request stored on ``g`` together with the response.

    The response time is stamped on ``g``, the elapsed time since
    ``g.request_time`` is computed, and the collected record is logged at
    error level for status codes of 400 and above, otherwise at debug level.
    :param param: response-like object exposing charset, content_length,
        content_type, content_encoding, mimetype, status and status_code
    :return: ``param`` itself
    """
    response_snapshot = {
        'charset': param.charset,
        'content_length': param.content_length,
        'content_type': param.content_type,
        'content_encoding': param.content_encoding,
        'mimetype': param.mimetype,
        'response': getattr(g, 'result', None),
        'status': param.status,
        'status_code': param.status_code,
    }

    g.response_time = datetime.now()
    elapsed = str(g.response_time - g.request_time)

    record = {
        'api_method': g.get('api_method'),
        'api_version': g.get('api_version'),
        'request_param': g.get('request_param'),
        'request_form': g.get('request_form'),
        'querystring': g.get('request_param')['query_string'],
        'request_json': g.get('request_json'),
        'response_param': response_snapshot,
        'request_raw_data': g.request_raw_data,
        'request_time': g.get('request_time').strftime(current_app.config['APIZEN_DATETIME_FMT']),
        'response_time': g.get('response_time').strftime(current_app.config['APIZEN_DATETIME_FMT']),
        'time_consuming': elapsed,
    }

    # Failed responses are logged at error level, everything else at debug.
    emit = current_app.logger.error if param.status_code >= 400 else current_app.logger.debug
    emit(record)
    return param
# ??????
def edit_item(current_user, bucket_id, item_id):
    """
    Edit an item with a valid Id. The request content-type must be json and also the Bucket
    in which the item belongs must be among the user's Buckets.
    The name of the item must be present in the payload but the description is optional.
    :param current_user: User
    :param bucket_id: Bucket Id
    :param item_id: Item Id
    :return: Response of Edit Item
    """
    if not request.content_type == 'application/json':
        return response('failed', 'Content-type must be application/json', 401)

    try:
        int(item_id)
    except ValueError:
        return response('failed', 'Provide a valid item Id', 202)

    # Get the user Bucket
    bucket = get_user_bucket(current_user, bucket_id)
    if bucket is None:
        # str() keeps the message building safe if the Id is not a string.
        return response('failed', 'User has no Bucket with Id ' + str(bucket_id), 202)

    # Get the item
    item = bucket.items.filter_by(id=item_id).first()
    if not item:
        abort(404)

    # Check for Json data. The payload is validated BEFORE any key is read
    # from it: a None, empty or non-object body would otherwise raise on
    # .get() and never reach this error response.
    request_json_data = request.get_json()
    if not request_json_data or not isinstance(request_json_data, dict):
        return response('failed', 'No attributes specified in the request', 401)

    item_new_name = request_json_data.get('name')
    if not item_new_name:
        return response('failed', 'No name or value attribute found', 401)
    item_new_description = request_json_data.get('description', None)

    # Update the item record
    item.update(item_new_name, item_new_description)
    return response_with_bucket_item('success', item, 200)
def client_pushed():
    """
    Handle a scan request pushed by a client.

    The payload is read from the JSON body when the content type is
    ``application/json`` and from the form data otherwise. The keys present
    in the payload decide which kind of request it is: a test event, a
    manual scan, or a movie / series notification in either the older
    capitalised or the newer lower-case key layout. Empty and unrecognised
    payloads are rejected with ``abort(400)``.
    :return: a status message for manual scans, otherwise ``"OK"``
    """
    if request.content_type == 'application/json':
        data = request.get_json(silent=True)
    else:
        data = request.form.to_dict()

    if not data:
        logger.error("Invalid scan request from: %r", request.remote_addr)
        abort(400)
    logger.debug("Client %r request dump:\n%s", request.remote_addr, json.dumps(data, indent=4, sort_keys=True))

    if ('eventType' in data and data['eventType'] == 'Test') or ('EventType' in data and data['EventType'] == 'Test'):
        logger.info("Client %r made a test request, event: '%s'", request.remote_addr, 'Test')
    elif 'eventType' in data and data['eventType'] == 'Manual':
        logger.info("Client %r made a manual scan request for: '%s'", request.remote_addr, data['filepath'])
        final_path = utils.map_pushed_path(conf.configs, data['filepath'])
        # ignore this request?
        ignore, ignore_match = utils.should_ignore(final_path, conf.configs)
        if ignore:
            logger.info("Ignored scan request for '%s' because '%s' was matched from SERVER_IGNORE_LIST",
                        final_path, ignore_match)
            return "Ignoring scan request because %s was matched from your SERVER_IGNORE_LIST" % ignore_match
        if start_scan(final_path, 'manual', 'Manual'):
            return "'%s' was added to scan backlog." % final_path
        else:
            return "Error adding '%s' to scan backlog." % data['filepath']
    elif 'Movie' in data:
        logger.info("Client %r scan request for movie: '%s', event: '%s'", request.remote_addr,
                    data['Movie']['FilePath'], data['EventType'])
        final_path = utils.map_pushed_path(conf.configs, data['Movie']['FilePath'])
        start_scan(final_path, 'radarr', data['EventType'])
    elif 'movie' in data:
        # new radarr webhook
        path = os.path.join(data['movie']['folderPath'], data['movieFile']['relativePath'])
        event = "Upgrade" if data['isUpgrade'] else data['eventType']
        logger.info("Client %r scan request for movie: '%s', event: '%s'", request.remote_addr, path, event)
        final_path = utils.map_pushed_path(conf.configs, path)
        start_scan(final_path, 'radarr', event)
    elif 'Series' in data:
        logger.info("Client %r scan request for series: '%s', event: '%s'", request.remote_addr,
                    data['Series']['Path'], data['EventType'])
        final_path = utils.map_pushed_path(conf.configs, data['Series']['Path'])
        start_scan(final_path, 'sonarr', data['EventType'])
    elif 'series' in data and 'episodeFile' in data:
        # new sonarr webhook
        # Both keys are tested explicitly: `'series' and 'episodeFile' in data`
        # only checks 'episodeFile', because the non-empty literal 'series' is
        # always truthy.
        path = os.path.join(data['series']['path'], data['episodeFile']['relativePath'])
        event = "Upgrade" if data['isUpgrade'] else data['eventType']
        logger.info("Client %r scan request for series: '%s', event: '%s'", request.remote_addr, path, event)
        final_path = utils.map_pushed_path(conf.configs, path)
        start_scan(final_path, 'sonarr_dev', event)
    else:
        logger.error("Unknown scan request from: %r", request.remote_addr)
        abort(400)

    return "OK"
############################################################
# MAIN
############################################################