def upload_video():
    """Accept an uploaded video, store it and hand it to the classifier.

    Reads the ``video_file`` upload plus the ``use_rgb`` and ``use_flow`` form
    fields from the current request.  Every rejection is reported as a JSON
    ``error`` payload sent with HTTP status 200.

    Returns:
        Whatever ``run_classification`` returns for the stored file, or a
        ``(json, 200, headers)`` tuple describing why the upload was rejected.
    """
    json_headers = {'ContentType': 'application/json'}

    if 'video_file' not in request.files:
        return jsonify(error='upload not found'), 200, json_headers

    video = request.files['video_file']
    if video.filename == '':
        return jsonify(error='the file has no name'), 200, json_headers

    # Both flags are looked up before the file-name check, so a request that
    # lacks one of them fails here rather than with the "not allowed" answer.
    rgb_flag = request.form['use_rgb']
    flow_flag = request.form['use_flow']

    if not (video and allowed_file(video.filename)):
        return jsonify(error='empty or not allowed file'), 200, json_headers

    # Persist the upload under a sanitised name, then classify it.
    stored_path = os.path.join(
        app.config['UPLOAD_FOLDER'], secure_filename(video.filename))
    video.save(stored_path)
    return run_classification(stored_path, rgb_flag, flow_flag)
# Example source code using Python's secure_filename()
def to_os_path(self, _path, _case, _run_id):
    """Translate a '/'-separated path into a run directory and a file name.

    NOTE(review): the nesting below was reconstructed from a listing that had
    lost its indentation.  It assumes only the leading components are joined
    onto the directory -- confirm against the original file.

    :param _path: '/'-separated path; converted with ``str()`` before splitting
    :param _case: case identifier forwarded to ``self.run_path``
    :param _run_id: run identifier, passed to ``self.run_path`` as a string
    :return: tuple ``(path_out, file_out)``
    """
    path_out = self.run_path(_case, str(_run_id))
    _path = str(_path).split('/')
    file_out = None
    counter = 1
    for res in _path:
        if counter < len(_path):
            # Every component except the last is sanitised and appended to
            # the directory.
            res = secure_filename(res)
            path_out = os.path.join(path_out, res)
        if counter == len(_path):
            # The last component is returned unchanged; it is not passed
            # through secure_filename.
            file_out = res
        counter += 1
    return path_out, file_out
## Function to read file content
# @param _file_name file name <string>
def update_user_image(user_id):
    """Replace the stored image of the user identified by ``user_id``.

    The upload is expected in the ``image_file`` part of the current request.

    :param user_id: key looked up with ``models.Users.query.get``
    :return: ``(json, status)`` tuple: 404 for an unknown user or a request
        without an ``image_file`` part, 400 for an empty or disallowed file
        name or when ``user.save_image`` reports failure, 200 with the
        re-serialized user on success
    """
    user = models.Users.query.get(user_id)
    # Reject unknown ids before touching the serializer, which would
    # otherwise be handed ``[None]``.
    if not user:
        return jsonify(result='invalid user id'), 404
    if 'image_file' not in request.files:
        return jsonify(result='No file part'), 404
    image_file = request.files['image_file']
    if image_file.filename == '':
        return jsonify(result='No selected file'), 400
    if not image_file or not allowed_file(image_file.filename):
        return jsonify(result='File with invalid format'), 400
    # Snapshot taken before the save attempt; a failed save reports it.
    user_before_save = serializer.UsersSerializer().serialize([user])
    if not user.save_image(image_file):
        return jsonify(user=user_before_save), 400
    db.session.commit()
    # Serialize again so the response reflects the freshly saved image.
    user_serialized = serializer.UsersSerializer().serialize([user])
    return jsonify(user=user_serialized), 200
def getAppInfo():
    """Store the uploaded APK and read its package and launcher activity.

    Saves the ``file`` upload into ``UPLOAD_FOLDER``, runs ``aapt d badging``
    on it and publishes the results through the module-level globals ``apk``,
    ``packageName`` and ``main_activity``.

    :return: redirect to the ``index`` endpoint
    :raises ValueError: if the aapt output lacks the launchable-activity or
        package entry
    """
    global packageName, main_activity, apk

    def badging_value(badging, keyword):
        # Same selection the former `findstr <keyword>` pipe performed: keep
        # the lines mentioning the keyword, then take the first name='...'.
        matching = "\n".join(
            line for line in badging.splitlines() if keyword in line)
        _, marker, rest = matching.partition("name='")
        if not marker:
            raise ValueError("aapt output has no %r entry" % keyword)
        return rest.split("'")[0]

    f = request.files['file']
    fname = secure_filename(f.filename)
    apk = os.path.join(UPLOAD_FOLDER, fname)
    f.save(apk)

    # An argument list keeps the path away from the shell, and one aapt run
    # serves both lookups.  communicate() waits for the process to exit, so
    # nothing is left to kill afterwards.
    process = Popen(["aapt", "d", "badging", apk], stdout=PIPE)
    output, _ = process.communicate()
    badging = output.decode()

    main_activity = badging_value(badging, "launchable-activity")
    packageName = badging_value(badging, "package")
    return redirect(url_for("index"))
def editor_pic(self):
    """Handle an image upload coming from the editor.md widget.

    The file from the ``editormd-image-file`` field is saved locally under a
    dated, randomized name, handed to ``ting_pic`` and ``get_link``, and the
    resulting link is reported back as JSON.

    :return: JSON string with ``success`` and ``message`` (plus ``url`` on
        success); a plain text message when the file is missing or its name
        is rejected by ``allowed_photo``
    """
    image_file = request.files['editormd-image-file']
    if not (image_file and allowed_photo(image_file.filename)):
        return u"??????????????"

    try:
        filename = secure_filename(image_file.filename)
        filename = str(date.today()) + '-' + random_str() + '-' + filename
        file_path = os.path.join(bpdir, 'static/editor.md/photoupdate/', filename)
        qiniu_path = os.path.join(bpdir, 'static/blog/qiniu_pic/', filename)
        image_file.save(file_path)
        ting_pic(file_path, qiniu_path)
        qiniu_link = get_link(qiniu_path, filename)
        data = {
            'success': 1,
            'message': 'image of editor.md',
            'url': qiniu_link,
        }
        return json.dumps(data)
    except Exception:
        # Boundary of the view: record the traceback and still answer the
        # editor, instead of falling off the end and returning None.
        current_app.logger.exception('editor.md image upload failed')
        return json.dumps({'success': 0, 'message': 'image upload failed'})
def upload_recipe(request):
    """Store every accepted upload from the ``recipes`` field of ``request``.

    Each file whose name passes ``filter_by_extensions`` is written below
    ``recipes/`` through ``handle_uploaded_file`` and remembered in the
    session under ``recipe_file``; the last accepted file wins.

    :param request: incoming request providing ``FILES`` and ``session``
    :return: redirect to ``validate`` when at least one file was accepted,
        otherwise to ``index``
    """
    def isAllowed(filename):
        return len(filter_by_extensions(filename)) > 0

    redirect_url = "index"
    print("request: {}".format(request.FILES.getlist("recipes")))
    for upload in request.FILES.getlist("recipes"):
        print("file: {}".format(upload.name))
        if isAllowed(upload.name):
            filename = path.join("recipes", secure_filename(upload.name))
            print("filename: {}".format(filename))
            handle_uploaded_file(upload, filename)
            redirect_url = "validate"
            request.session["recipe_file"] = filename
        else:
            # Every other access in this function reads ``name``; the former
            # ``filename`` lookup here did not match them.
            print("Invalid BeerXML file <%s>." % upload.name)
    return redirect(redirect_url)
# @frontend.route("/validate")
def avatar():
    """Show the avatar form and, on POST, store a 50x50 thumbnail.

    The thumbnail is written to ``UPLOAD_IMG_FOLDER`` (relative to this
    module) as ``<username>.jpg`` and the base64 of that file name is stored
    as the user's ``token``.

    :return: redirect to ``main.index`` after a successful upload, otherwise
        the rendered ``auth/change_avatar.html`` form
    """
    basedir = os.path.abspath(os.path.dirname(__file__))
    file_dir = os.path.join(basedir, current_app.config['UPLOAD_IMG_FOLDER'])
    username = request.values.get('username')
    form = AvatarForm()
    if not os.path.exists(file_dir):
        os.makedirs(file_dir)
    if request.method == 'POST':
        file = request.files['file']
        if file and allowed_file(file.filename):
            # Resolve the account first so that nothing is written to disk
            # for a missing username or one that matches no user.
            user = User.query.filter_by(username=username).first()
            if user is not None:
                im = Image.open(file)
                im.thumbnail((50, 50))
                newname = username + '.jpg'
                im.save(os.path.join(file_dir, newname))
                # NOTE(review): b64encode needs bytes on Python 3 -- confirm
                # which interpreter this module targets.
                user.token = base64.b64encode(newname)
                db.session.add(user)
                db.session.commit()
                return redirect(url_for('main.index'))
    return render_template('auth/change_avatar.html', form=form)
def download_name(self, project, ty, _format):
    """Build the bare file name (no directory part) of a project export zip.

    The name is only computed here; nothing checks that such a file exists.
    """
    # TODO: Check if ty is valid
    latin_name = self._project_name_latin_encoded(project)
    # Example: 123_feynman_tasks_json.zip
    parts = (str(project.id), latin_name, ty, _format)
    return secure_filename('%s_%s_%s_%s.zip' % parts)
def test_export_task_json_support_non_latin1_project_names(self):
    """Task JSON export puts the transliterated project name in Content-Disposition."""
    # Cyrillic text ("Izmeni Kiev!") written with escapes so the literal
    # survives any source encoding.  An all-'?' name sanitises to an empty
    # string, which would make the assertion below pass for every response.
    name = u'\u0418\u0437\u043c\u0435\u043d\u0438 \u041a\u0438\u0435\u0432!'
    project = ProjectFactory.create(name=name, short_name=name)
    self.clear_temp_container(project.owner_id)
    res = self.app.get('project/%s/tasks/export?type=task&format=json' % project.short_name,
                       follow_redirects=True)
    filename = secure_filename(unidecode(name))
    assert filename in res.headers.get('Content-Disposition'), res.headers
def test_export_taskrun_json_support_non_latin1_project_names(self):
    """Task-run JSON export puts the transliterated project name in Content-Disposition."""
    # Cyrillic text ("Izmeni Kiev!") written with escapes so the literal
    # survives any source encoding.  An all-'?' name sanitises to an empty
    # string, which would make the assertion below pass for every response.
    name = u'\u0418\u0437\u043c\u0435\u043d\u0438 \u041a\u0438\u0435\u0432!'
    project = ProjectFactory.create(name=name, short_name=name)
    res = self.app.get('project/%s/tasks/export?type=task_run&format=json' % project.short_name,
                       follow_redirects=True)
    filename = secure_filename(unidecode(name))
    assert filename in res.headers.get('Content-Disposition'), res.headers
def test_export_taskrun_csv_support_non_latin1_project_names(self):
    """Task-run CSV export puts the transliterated project name in Content-Disposition."""
    # Cyrillic text ("Izmeni Kiev!") written with escapes so the literal
    # survives any source encoding.  An all-'?' name sanitises to an empty
    # string, which would make the assertion below pass for every response.
    name = u'\u0418\u0437\u043c\u0435\u043d\u0438 \u041a\u0438\u0435\u0432!'
    project = ProjectFactory.create(name=name, short_name=name)
    task = TaskFactory.create(project=project)
    TaskRunFactory.create(task=task)
    res = self.app.get('/project/%s/tasks/export?type=task_run&format=csv' % project.short_name,
                       follow_redirects=True)
    filename = secure_filename(unidecode(name))
    assert filename in res.headers.get('Content-Disposition'), res.headers
def encrypt():
    """Encode or decode an uploaded picture and show the result.

    On POST, reads the ``up_pic`` upload together with the ``degree``,
    ``pwd`` and ``submit`` form fields, stores the picture in
    ``UPLOAD_FOLDER`` and runs ``enc.encode`` (when ``submit`` is ``"enc"``)
    or ``dec.decode`` on it.

    :return: the rendered ``display.html`` page on success, otherwise a plain
        text message describing what was wrong with the request
    """
    # NOTE(review): requests other than POST fall through without a response;
    # confirm the route only accepts POST.
    if request.method == 'POST':
        # check if the post request has the file part
        if 'up_pic' not in request.files:
            return "No file selected"
        file = request.files['up_pic']
        try:
            degree = int(request.form.get('degree'))
        except (TypeError, ValueError):
            # Field absent (None) or not an integer.
            return "Invalid degree"
        pwd = request.form.get('pwd')
        # if user does not select file, browser also
        # submit a empty part without filename
        if file.filename == '':
            return "No file selected"
        if not (file and allowed_file(file.filename)):
            return "File type not allowed"

        filename = secure_filename(file.filename)
        stored_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        file.save(stored_path)
        links = [str(stored_path)]
        # Only the output path of the three-part result is used here.
        if request.form['submit'] == "enc":
            _, _, path = enc.encode(links[0], degree, pwd)
        else:
            _, _, path = dec.decode(links[0], degree, pwd)
        links.append(path)
        return render_template("display.html", link=links)
def admin_files(chalid):
    """List, delete or upload the files attached to challenge ``chalid``.

    GET answers with a JSON listing of ``id``/``file`` pairs.  POST dispatches
    on the ``method`` form field: ``"delete"`` removes the file named by the
    ``file`` field and answers ``"1"``; ``"upload"`` stores every ``files[]``
    part in its own randomly named directory and redirects to
    ``/admin/chals``.
    """
    if request.method == 'GET':
        listing = [
            {'id': entry.id, 'file': entry.location}
            for entry in Files.query.filter_by(chal=chalid).all()
        ]
        return jsonify({'files': listing})

    if request.method == 'POST':
        action = request.form['method']
        if action == "delete":
            record = Files.query.filter_by(id=request.form['file']).first_or_404()
            stored_path = os.path.join(app.static_folder, 'uploads', record.location)
            # Some kind of os.path.isfile issue on Windows...
            if os.path.exists(stored_path):
                os.unlink(stored_path)
            db.session.delete(record)
            db.session.commit()
            db.session.close()
            return "1"
        elif action == "upload":
            uploads_root = os.path.join(os.path.normpath(app.static_folder), 'uploads')
            for upload in request.files.getlist('files[]'):
                safe_name = secure_filename(upload.filename)
                if not safe_name:
                    continue
                # A random directory per file keeps equal names apart.
                md5hash = hashlib.md5(os.urandom(64)).hexdigest()
                target_dir = os.path.join(uploads_root, md5hash)
                if not os.path.exists(target_dir):
                    os.makedirs(target_dir)
                upload.save(os.path.join(target_dir, safe_name))
                location = os.path.join('static', 'uploads', md5hash, safe_name)
                db.session.add(Files(chalid, location))
            db.session.commit()
            db.session.close()
            return redirect('/admin/chals')
def admin_create_chal():
    """Create a challenge from the submitted form and store its files.

    The challenge is committed first so that its ``id`` can be attached to
    each ``files[]`` upload.  Every upload goes into its own randomly named
    directory below the static ``uploads`` folder.

    :return: redirect to ``/admin/chals``
    """
    uploads = request.files.getlist('files[]')

    ## TODO: Expand to support multiple flags
    flag = {'flag': request.form['key'], 'type': int(request.form['key_type[0]'])}

    # Create challenge
    form = request.form
    chal = Challenges(form['name'], form['desc'], form['value'], form['category'], [flag])
    db.session.add(chal)
    db.session.commit()

    uploads_root = os.path.join(os.path.normpath(app.static_folder), 'uploads')
    for upload in uploads:
        safe_name = secure_filename(upload.filename)
        if not safe_name:
            continue
        md5hash = hashlib.md5(os.urandom(64)).hexdigest()
        target_dir = os.path.join(uploads_root, md5hash)
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)
        upload.save(os.path.join(target_dir, safe_name))
        location = os.path.join('static', 'uploads', md5hash, safe_name)
        db.session.add(Files(chal.id, location))

    db.session.commit()
    db.session.close()
    return redirect('/admin/chals')
def post(self):
    """Convert an uploaded SVG with ``convert`` and store both versions.

    Parsed arguments: ``file`` (required), ``name`` (defaults to the upload's
    file name without extension) and ``xres``/``yres`` (must be given
    together).  The raw SVG and the JSON-encoded conversion result are
    written to ``storage_dir()`` under a timestamped, sanitised base name.

    :return: dict with ``status``, ``name`` and ``id`` (the stored base name)
    """
    args = self.parser.parse_args()
    if not args.file:
        abort(400, message={'file': 'parameter is required'})
    if (args.xres is None) != (args.yres is None):
        a, b = ('xres', 'yres') if args.yres is None else ('yres', 'xres')
        abort(400, message={a: "can not stand alone without {} being set".format(b)})
    if not args.name:
        args.name = os.path.basename(os.path.splitext(args.file.filename)[0])
    inputdim = (args.xres, args.yres) if args.xres else None
    svgdata = args.file.read()
    try:
        data = convert(io.BytesIO(svgdata), inputdim=inputdim, name=args.name)
        data = json.dumps(data, indent=2, sort_keys=True)
    except Exception as exc:
        # Any conversion failure is reported to the client as a bad upload.
        abort(400, message={'file': str(exc)})
    # Save the shape.
    dirname = storage_dir()
    if not os.path.isdir(dirname):
        os.makedirs(dirname)
    # Format only the timestamp: a '%' inside the submitted name must not be
    # interpreted as a strftime directive.
    stamp = datetime.now().strftime('%Y%m%d%H%M%S_')
    basename = secure_filename(stamp + args.name)
    filename = os.path.join(dirname, basename)
    with open(filename + '.svg', 'wb') as fp:
        fp.write(svgdata)
    with open(filename + '.json', 'w') as fp:
        fp.write(data)
    return {'status': 'ok', 'name': args.name, 'id': basename}
def upload_file(self, folder, param):
    """Save an uploaded ``.py`` file from the current POST request.

    :param folder: directory the file is written to
    :param param: name of the request field carrying the upload
    :return: the sanitised file name that was written, or ``None`` when the
        request is not a POST, the field is absent or empty, or the file
        name does not end in ``.py``
    """
    if request.method != 'POST':
        return None
    # check if the post request has the file part
    if param not in request.files:
        return None
    upload = request.files[param]
    # if user does not select file, browser also submit a empty part without filename
    if not upload or len(upload.filename) == 0:
        return None
    _, dot, extension = upload.filename.rpartition('.')
    if not dot or extension != 'py':
        return None
    safe_name = secure_filename(upload.filename)
    upload.save(os.path.join(folder, safe_name))
    return safe_name
def store(self, filename, file_data):
    """Put ``file_data`` into the bucket behind ``self.s3`` under a sanitised key.

    :param filename: requested name; sanitised and, when ``self.snake_case``
        is set, converted to snake case
    :param file_data: object passed to ``put_object`` as ``Body``
    :return: the key the data was stored under
    :raises StorageExists: if ``self._exists`` reports the key as taken
    :raises StorageNotAllowed: if the extension is not in ``self.allowed``
        and ``self.all_allowed`` is not set
    """
    key = secure_filename(filename)
    if self.snake_case:
        key = convert_to_snake_case(key)

    if self._exists(key):
        raise StorageExists()

    permitted = self.all_allowed or any(
        key.endswith('.' + extension) for extension in self.allowed)
    if not permitted:
        raise StorageNotAllowed()

    self.s3.put_object(
        Bucket=self.bucket_name,
        Key=key,
        Body=file_data,
        ACL=self.acl,
    )
    return key
def store(self, filename, file_data):
    """Save ``file_data`` into ``self.abs_img_folder`` under a sanitised name.

    :param filename: requested name; sanitised and, when ``self.snake_case``
        is set, converted to snake case
    :param file_data: object whose ``save(path)`` method writes the file
    :return: the name the file was stored under
    :raises StorageExists: if the name is among ``self.get_existing_files()``
    :raises StorageNotAllowed: if the extension is not in ``self.allowed``
        and ``self.all_allowed`` is not set
    """
    stored_name = secure_filename(filename)
    if self.snake_case:
        stored_name = convert_to_snake_case(stored_name)

    if stored_name in self.get_existing_files():
        raise StorageExists()

    permitted = self.all_allowed or any(
        stored_name.endswith('.' + extension) for extension in self.allowed)
    if not permitted:
        raise StorageNotAllowed()

    file_data.save(os.path.join(self.abs_img_folder, stored_name))
    return stored_name
def create_new_attachment(paste_id, file_name, file_size, mime_type, file_data):
    """
    Record an attachment for a paste and store its contents.

    The attachment file is written through ``_store_attachment_file`` before
    the database row is added and committed.

    :param paste_id: Paste ID to associate with this attachment
    :param file_name: Raw name of the file; sanitised before it is recorded
    :param file_size: Size of the file in bytes
    :param mime_type: MIME type of the file
    :param file_data: Binary, base64-encoded file data
    :return: The committed models.Attachment instance
    :raises PasteDoesNotExistException: If the associated paste does not exist
        (NOTE(review): not raised in this function itself -- presumably by a
        callee; confirm)
    """
    # Describe the file for the database
    fields = dict(
        paste_id=paste_id,
        file_name=secure_filename(file_name),
        file_size=file_size,
        mime_type=mime_type,
    )
    attachment = models.Attachment(**fields)

    # The contents are stored under the attachment's hash name.
    _store_attachment_file(paste_id, file_data, attachment.hash_name)

    session.add(attachment)
    session.commit()
    return attachment
def upload():
    """Accept a sample upload and queue it unless its hash is already known.

    The file is first written to ``<SAMPLE_REPOSITORY>/TempFiles`` so that it
    can be hashed.  A duplicate is deleted again and reported as a form
    error; a new sample is queued and the client is redirected to the index.

    :return: redirect to ``.index`` after queueing, otherwise the template
        context (``form``, ``accept``, ``upload_size`` in MiB)
    """
    from run import config

    form = UploadForm()
    if form.validate_on_submit():
        # Process uploaded file
        sample = request.files[form.file.name]
        if sample:
            temp_dir = os.path.join(config.get('SAMPLE_REPOSITORY', ''), 'TempFiles')
            temp_path = os.path.join(temp_dir, secure_filename(sample.filename))
            # Save to temporary location
            sample.save(temp_path)
            # Get hash and check if it's already been submitted
            file_hash = create_hash_for_sample(temp_path)
            if not sample_already_uploaded(file_hash):
                add_sample_to_queue(file_hash, temp_path, g.user.id, g.db)
                return redirect(url_for('.index'))
            # Duplicate: drop the temporary copy and tell the user
            os.remove(temp_path)
            form.errors['file'] = [
                'Sample with same hash already uploaded or queued']

    size_limit = config.get('MAX_CONTENT_LENGTH', 0) / (1024 * 1024)
    return {
        'form': form,
        'accept': form.accept,
        'upload_size': size_limit,
    }