def uploadFile(current_user):
    """Store an uploaded file, or a file fetched from a submitted URL.

    The request's ``file`` part is preferred.  When it is missing or its name
    is rejected by ``allowed_file``, the ``url`` form field is tried instead.
    Stored names are ``<UTC timestamp>_<current_user>_<original name>`` passed
    through ``secure_filename``.

    :param current_user: value stringified into the stored file name
    :returns: ``(file_uploaded, filename)``; ``(False, None)`` when nothing
        was stored
    """
    timestamp = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
    prefix = timestamp + '_' + str(current_user) + '_'
    # .get() yields None for a missing field, so no blanket ``except`` is needed.
    upload = request.files.get('file')
    url = request.form.get('url')

    if upload and allowed_file(upload.filename):
        filename = secure_filename(prefix + upload.filename)
        upload.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
        return True, filename

    if url:
        filename = secure_filename(prefix + url.split('/')[-1])
        # Check the name before fetching.  A rejected URL now reports
        # (False, None) instead of leaving ``file_uploaded`` unassigned.
        if allowed_file(filename):
            remote = urllib.urlopen(url)
            try:
                payload = remote.read()
            finally:
                remote.close()
            destination = os.path.join(app.config['UPLOAD_FOLDER'], filename)
            with open(destination, 'wb') as target:
                target.write(payload)
            return True, filename

    return False, None
# Example source code using Python's secure_filename()
def secure_path(path):
    """Return a sanitised, length-limited version of ``path``.

    The directory part and the file name are each run through ``slugify``
    (ASCII only) and ``secure_filename``.  An empty base name becomes
    ``'unnamed'``, and a file name longer than 200 characters is shortened to
    its first and last 99 characters joined by ``'__'``.

    :param path: path to sanitise
    :returns: the sanitised file name, prefixed by the sanitised directory
        when one remains
    """
    dirname, basename = os.path.split(path)
    # Split the extension off the file name only; splitting the whole path
    # leaked the directory into the base name.
    file_base, file_ext = os.path.splitext(basename)
    dirname = secure_filename(slugify(dirname, only_ascii=True))
    file_base = secure_filename(slugify(file_base, only_ascii=True)) or 'unnamed'
    file_ext = secure_filename(slugify(file_ext, only_ascii=True))
    filename = '.'.join([file_base, file_ext]) if file_ext else file_base
    if len(filename) > 200:
        filename = '%s__%s' % (filename[:99], filename[-99:])
    return os.path.join(dirname, filename) if dirname else filename
def _upload_file_to_rackspace(self, file, container, attempt=0):
    """Upload ``file`` to ``container`` through ``self.cf``, retrying on error.

    :param file: object with a ``filename`` attribute; passed to
        ``pyrax.utils.get_checksum`` and ``self.cf.upload_file``
    :param container: forwarded to ``self.cf.upload_file``
    :param attempt: number of attempts already made
    :returns: ``True`` once an upload succeeds
    :raises Exception: the last error, once ``attempt`` reaches 3
    """
    while True:
        try:
            checksum = pyrax.utils.get_checksum(file)
            self.cf.upload_file(container,
                                file,
                                obj_name=secure_filename(file.filename),
                                etag=checksum)
            return True
        except Exception:
            print("Uploader exception")  # TODO: Log this!
            traceback.print_exc()
            attempt += 1
            if attempt >= 3:
                print("Tried to upload two times. Failed!")  # TODO: Log this!
                raise
            time.sleep(1)  # Wait one second and try again
def create(uploaded_files=None, uploaded_url=None):
    """Save the accepted uploads and start an analysis on them.

    :param uploaded_files: iterable of uploaded file objects (default: none)
    :param uploaded_url: URLs forwarded to ``Input`` (default: empty list)
    :returns: the id returned by ``Input(...).analyze()``
    """
    # Fresh lists per call: the former ``[]`` defaults were shared between
    # calls, and ``uploaded_url`` is handed on to Input.
    uploaded_files = [] if uploaded_files is None else uploaded_files
    uploaded_url = [] if uploaded_url is None else uploaded_url

    upload_dir = os.path.join(app.config['UPLOAD_FOLDER'])
    paths = []
    for upload in uploaded_files:
        if not (upload and allowed_file(upload.filename)):
            continue
        filename = secure_filename(upload.filename)
        # TODO check path before save
        if not os.path.exists(upload_dir):
            os.makedirs(upload_dir)
        path_filename = os.path.join(upload_dir, filename)
        upload.save(path_filename)
        paths.append(path_filename)

    inp = Input(paths, urls=uploaded_url)
    analysis_id = inp.analyze()
    logging.info("Analysis {} : {}".format(analysis_id, paths))
    return analysis_id
def dated_path(obj, file_data):
    """Build a ``<prefix>/<year>/<month>/<name>_<rand><ext>`` path.

    :param obj: object whose ``model_name`` attribute is the prefix;
        ``"undefined"`` is used when reading it fails
    :param file_data: object with a ``filename`` attribute
    :returns: the path as a unicode string
    """
    try:
        prefix = obj.model_name
    except BaseException:
        prefix = "undefined"

    stem, extension = op.splitext(file_data.filename)
    # 16 random bits in the name, to make collisions less likely.
    token = random.getrandbits(16)
    raw_name = u"{name}_{rand}{ext}".format(rand=token, name=stem, ext=extension)
    safe_name = secure_filename(raw_name)

    today = date.today()
    return u"{prefix}/{t.year}/{t.month}/{filename}".format(
        prefix=prefix, t=today, filename=safe_name
    )
def save_image(file_, extension, message, name, email, branch='master'):
    """
    Save image to github as a commit

    The stored file name is a fresh UUID4 followed by ``os.extsep`` and
    ``extension``, passed through ``secure_filename``.

    :param file_: Open file object containing image
    :param extension: Extension to use for saved filename
    :param message: Commit message to save image with
    :param name: Name of user committing image
    :param email: Email address of user committing image
    :param branch: Branch to save image to
    :returns: Public URL to image or None if not successfully saved
    """
    unique_stem = str(uuid.uuid4())
    file_name = secure_filename('%s%s%s' % (unique_stem, os.extsep, extension))
    path = os.path.join(main_image_path(), file_name)

    committed = commit_image_to_github(path, message, file_, name, email,
                                       branch=branch)
    if committed is None:
        return None
    return github_url_from_upload_path(path, file_name, branch=branch)
def upload_files(target, sessionID, datafiles):
    """Upload the files to the server directory

    Each file's metadata is registered through ``update_file_metadata``
    before the file itself is written.

    Keyword arguments:
    target - The target directory to upload the files to
    sessionID - The user session ID
    datafiles - The list of the files to be uploaded

    Returns:
    List of the sanitised file names, in upload order
    """
    saved_names = []
    for upload in datafiles:
        safe_name = secure_filename(upload.filename)
        filename = safe_name.rsplit("/")[0]
        update_file_metadata(sessionID, filename)
        saved_names.append(filename)

        destination = os.path.join(target, filename)
        app.logger.info("Accepting incoming file: %s" % filename)
        app.logger.info("Saving it to %s" % destination)
        upload.save(destination)
    return saved_names
def upload():
    """Save the uploaded ``file``, show it with OpenCV, then redirect to it.

    Returns ``None`` when no file was sent or ``allowed_file`` rejects its
    name; otherwise a redirect to the ``uploaded_file`` route.
    """
    incoming = request.files['file']
    # Only accept the allowed types/extensions.
    if not (incoming and allowed_file(incoming.filename)):
        return None

    # Strip unsupported characters, then store the file in the upload folder.
    filename = secure_filename(incoming.filename)
    saved_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    incoming.save(saved_path)

    # Load the saved image (flag -1, presumably cv2.IMREAD_UNCHANGED) and
    # show it in an OpenCV window.
    img_np = cv2.imread(saved_path, -1)
    cv2.imshow("Image", img_np)

    # Send the browser to the route that serves the uploaded file.
    return redirect(url_for('uploaded_file', filename=filename))
# This route expects a parameter containing the name of a file.
# It locates that file in the upload directory and shows it in
# the browser, so if the user uploads an image, that image is
# shown after the upload.
def upload():
    """Render the upload page; on POST, save the file and hand it to ``move_file``.

    A failure while saving renders ``error_code/404.html``.  Otherwise the
    outcome is reported through ``flash`` and the upload page is rendered.
    """
    if request.method != "POST":
        return render_template('upload/upload.html')

    typ = request.form.get('type')
    incoming = request.files.get('file')
    result = False
    if incoming and allowed_file(incoming.filename):
        filename = secure_filename(incoming.filename)
        save_as = os.path.join(app.config['UPLOAD_DIR'], filename)
        try:
            incoming.save(save_as)
        except Exception as e:
            return render_template('error_code/404.html', msg='Failed to save file...[Err:{0}]'.format(e))
        result = move_file(typ, save_as, filename, backup=True, linkit=True)

    if result:
        flash('info:File uploaded Successfully!')
    else:
        flash('error:Failed To Upload file..., Try again...')
    return render_template('upload/upload.html')
def _rename_temp_file(self, tmp_path, name=None, ext=None):
    """Move a temporary file into its storage folder and return its URL path.

    The file is stored under ``<self.root>/<self.sub_dir>/<name[:3]>/``.

    :param tmp_path: path of the temporary file
    :param name: base name; defaults to ``self.get_file_md5(tmp_path)``
    :param ext: extension appended to ``name`` (``None`` means no extension)
    :returns: ``/<self.sub_dir>/<name[:3]>/<filename>``
    :raises ValueError: if the file is smaller than 200 bytes (it is deleted)
    """
    name = name if name else self.get_file_md5(tmp_path)
    # Reject, and delete, files smaller than 200 bytes.
    if os.stat(tmp_path).st_size < 200:
        os.remove(tmp_path)
        raise ValueError('file is too small')

    # ``ext`` defaults to None; formatting it directly produced file names
    # ending in the literal text "None".
    filename = secure_filename('%s%s' % (name, ext or ''))
    shard = name[:3]
    folder = os.path.join(self.root, self.sub_dir, shard)
    if not os.path.exists(folder):
        os.makedirs(folder)

    new_path = os.path.join(folder, filename)
    if os.path.exists(new_path):
        # A file with this name is already stored: drop the temporary copy.
        os.remove(tmp_path)
    else:
        os.rename(tmp_path, new_path)
    return os.path.join('/', self.sub_dir, shard, filename)
def upload_midi():
    """Save an uploaded ``.mid`` file and flash its prediction.

    The outcome (prediction, prediction error, or "Invalid File") is flashed
    and the user is always redirected to ``index``.
    """
    midi = request.files['file']
    if not (midi and allowed_file(midi.filename, set(['mid']))):
        flash("Invalid File", 'error')
        return redirect(url_for('index'))

    # Strip unsupported characters, then store the file in the MIDI folder.
    filename = secure_filename(midi.filename)
    filepath = os.path.join(app.config['UPLOAD_FOLDER_MIDI'], filename)
    midi.save(filepath)

    # Run the predictor on the stored file; any failure is shown to the user.
    try:
        confidence_score = midi_predictor.predict_song(filepath)
        flash(confidence_score, 'mood')
        return redirect(url_for('index'))
    except Exception as e:
        flash(str(e), 'error')
        return redirect(url_for('index'))
def upload_lyrics():
    """Save an uploaded ``.txt`` lyrics file and flash its predicted mood.

    The outcome (prediction or "Invalid File") is flashed and the user is
    redirected to ``index``.
    """
    file = request.files['file']
    print("%s File" % file.filename)
    if file and allowed_file(file.filename, set(['txt'])):
        # Make the filename safe, remove unsupported chars
        filename = secure_filename(file.filename)
        # Move the file from the temporary folder to the upload folder
        saved_path = os.path.join(app.config['UPLOAD_FOLDER_LYRICS'], filename)
        file.save(saved_path)
        # save() consumes the upload stream, so calling file.read() afterwards
        # returned nothing.  Read the lyrics back from the stored copy.
        with open(saved_path, 'rb') as saved:
            lyrics = saved.read()
        # Flash the predictor's result for the lyrics
        lyric_confidence_score = lyrics_predictor.predict_lyrics(lyrics)
        flash(lyric_confidence_score, 'mood')
        return redirect(url_for('index'))
    else:
        flash("Invalid File", 'error')
        return redirect(url_for('index'))
def store_form(form):
    """Persist a submitted form as a ``FormEntry`` with one ``FieldEntry`` per field.

    Form fields with no matching ``Field`` row for the current page are
    skipped.  For ``file_input`` fields the upload is written to
    ``FORM_UPLOADS_PATH`` and the stored file name becomes the field value.

    :param form: iterable of form fields exposing ``name`` and ``data``
    """
    entry = FormEntry(form_id=g.page.id)
    for f in form:
        field = Field.query.filter_by(form_id=g.page.id).filter_by(name=f.name).one_or_none()
        if field is None:
            continue
        field_entry = FieldEntry(field_id=field.id)
        data = f.data
        if field.type == 'file_input':
            file_data = request.files[field.name]
            # splitext() keeps the leading dot; strip it so the template
            # below does not produce "name..ext".
            extension = os.path.splitext(file_data.filename)[-1].lstrip('.')
            filename = secure_filename('%s-%s-%s.%s' % (
                field.name,
                date_stamp(),
                str(time.time()).replace('.', ''),
                extension,
            ))
            file_data.save(os.path.join(app.config['FORM_UPLOADS_PATH'], filename))
            # Record the sanitised name, i.e. the one actually written to disk.
            data = filename
        field_entry.value = data
        db.session.add(field_entry)
        entry.fields.append(field_entry)
    db.session.add(entry)
    db.session.commit()
def upload_avatar():
    """Handle the avatar upload form.

    Renders the form until it validates.  A valid submission with a jpg, png,
    jpeg or gif extension is saved to ``UPLOAD_FOLDER`` and recorded on
    ``current_user``.  Either way the user is redirected to their profile.
    """
    form = UploadAvatarForm()
    if not form.validate_on_submit():
        return render_template('upload_avatar.html', form=form)

    avatar = request.files['avatar']
    filename = secure_filename(avatar.filename)
    upload_folder = current_app.config['UPLOAD_FOLDER']
    allowed_extensions = set(['jpg', 'png', 'jpeg', 'gif'])

    # Text after the last dot; ``dot`` is empty when the name has no dot.
    _, dot, extension = filename.rpartition('.')
    if not (dot and extension in allowed_extensions):
        flash('Error file types.')
        return redirect(url_for('.user', username=current_user.username))

    destination = os.path.join(upload_folder, filename)
    avatar.save(destination)
    current_user.user_avatar = destination
    db.session.add(current_user)
    flash('Your avatar has been updated.')
    return redirect(url_for('.user', username=current_user.username))
def upload_file_kmeans():
    """Save the uploaded training file and describe its columns as JSON.

    Returns a JSON array ``[column_names, column_types]`` where a trailing
    ``'cluster'`` column of type ``'float'`` is appended.  ``object`` columns
    are reported as ``'string'``, everything else as ``'float'``.
    """
    # NOTE(review): the source's indentation was lost; this assumes everything
    # below ran only for accepted files -- confirm against the original.
    train_upload = request.files['trainfile']
    if not (train_upload and allowed_file(train_upload.filename)):
        return None

    safe_name = secure_filename(train_upload.filename)
    train_upload.save(os.path.join(app.config['UPLOAD_FOLDER'], safe_name))
    # The file is parsed from the literal 'uploads/' directory, not from
    # app.config['UPLOAD_FOLDER'].
    cluster_df = parse_file('uploads/' + safe_name)

    column_names = list(cluster_df.columns.values)
    column_names.append('cluster')

    column_types = [
        'string' if dtype == 'object' else 'float'
        for dtype in list(cluster_df.dtypes)
    ]
    column_types.append('float')
    return (json.dumps([column_names, column_types]))
def create_image_filename(image_filename, filename_append_list=()):
    '''
    .. function:: create_image_filename(image_filename, filename_append_list=())

    Generate a hashed filename with optional strings appended before hashing.

    The sanitised name, followed by ``_<item>`` for every appended item, is
    passed to ``hash_string``; the sanitised name's extension is kept.

    :param image_filename: original file name
    :type image_filename: String
    :param filename_append_list: strings to append to the name before hashing
    :type filename_append_list: List
    :rtype: String
    '''
    # Import from werkzeug.utils: the top-level ``werkzeug.secure_filename``
    # re-export was removed in Werkzeug 1.0.
    from werkzeug.utils import secure_filename

    new_filename = secure_filename(image_filename)
    extension = os.path.splitext(new_filename)[1]
    # Optionally append strings, question id etc
    for append in filename_append_list:
        new_filename += '_' + append
    app.logger.debug("new_filename + appendics = %s", new_filename)
    return hash_string(new_filename) + extension
def upload(name):
    """Store the first posted file in ``./data/<name>/dataset/``.

    :param name: service name; validated with
        ``projectmgr.ValidateServiceExists`` as a machine-learning service
    :returns: JSON ``{"statuscode", "message"}``: 200/"Success", 1002 when no
        file was posted, or 500 with the error text
    """
    # Import from werkzeug.utils: the top-level ``werkzeug.secure_filename``
    # attribute was removed in Werkzeug 1.0.
    from werkzeug.utils import secure_filename

    message = "Success"
    code = 200
    try:
        dataset_folder = "./data/" + name + "/dataset/"
        projectmgr.ValidateServiceExists(name, constants.ServiceTypes.MachineLearning)
        if not os.path.exists(dataset_folder):
            os.makedirs(dataset_folder)
        if not request.files:
            code = 1002
            message = "No file found"
            return jsonify({"statuscode": code, "message": message})
        # First posted file.  ``items(0)[0][1]`` only worked where items()
        # returns a list (Python 2); values() can be iterated on both.
        postedfile = next(iter(request.files.values()))
        postedfile.save(os.path.join(dataset_folder, secure_filename(postedfile.filename)))
    except Exception as e:
        code = 500
        message = str(e)
    return jsonify({"statuscode": code, "message": message})
def course_summary():
    """Render a course summary from an uploaded results PDF.

    Any request that is not a POST carrying an allowed ``course_results``
    file is redirected to ``index``.  An unreadable PDF renders
    ``failure.html``.
    """
    has_upload = request.method == 'POST' and 'course_results' in request.files
    if not has_upload:
        return redirect(url_for('index'))

    pdf_file = request.files['course_results']
    if not (pdf_file and allowed_file(pdf_file.filename)):
        return redirect(url_for('index'))

    original_filename = secure_filename(pdf_file.filename)
    unique_filename = create_unique_filename(app.config['UPLOAD_FOLDER'],
                                             original_filename)
    pdf_file.save(os.path.join(app.config['UPLOAD_FOLDER'], unique_filename))
    pdf_abs_path = os.path.abspath(
        os.path.join(app.config['UPLOAD_FOLDER'], unique_filename))

    try:
        summary = course_statistics.get_course_statistics(pdf_abs_path)
    except course_statistics.ReadPDFException:
        return render_template('failure.html',
                               title='Failure',
                               redirect=True,
                               message='It seems the file you provided cound not be read as a PDF.')

    # The template is chosen by the summary's language.
    return render_template('student_summary_%s.html' % summary.language,
                           title='Summary',
                           student_summary=summary)
def bcresrv():
    """Upload a file and run ``mydocker --bcreatesrv`` on it in the background.

    Renders ``bcresrv.html`` until the form validates.  A missing file or an
    extension outside ``ALLOWED_EXTENSIONS`` flashes a message and reloads
    this view; success redirects to ``index``.
    """
    form = Upload()
    if not form.validate_on_submit():
        return render_template('bcresrv.html', form=form)

    session['filename'] = secure_filename(form.file.data.filename)
    filename = session.get('filename')
    if not filename:
        flash('Looks like you do not choose any file!')
        return redirect(url_for('bcresrv'))
    if filename.split('.')[-1] not in ALLOWED_EXTENSIONS:
        flash('Looks like you do not choose a excel file!')
        return redirect(url_for('bcresrv'))

    form.file.data.save('uploads/' + filename)
    # Runs detached; output is discarded by the shell redirection.
    cmd = "mydocker --bcreatesrv ./uploads/%s > /dev/null &" % filename
    os.system(cmd)
    flash('The Server Containers is being created!')
    return redirect(url_for('index'))
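# NOTE: the original comment was lost to an encoding error; the view below
# uploads a file and runs `mydocker --bcreatecli` on it.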
def bcrecli():
    """Upload a file and run ``mydocker --bcreatecli`` on it in the background.

    Renders ``bcrecli.html`` until the form validates.  A missing file or an
    extension outside ``ALLOWED_EXTENSIONS`` flashes a message and reloads
    this view; success redirects to ``index``.
    """
    form = Upload()
    if not form.validate_on_submit():
        return render_template('bcrecli.html', form=form)

    session['filename'] = secure_filename(form.file.data.filename)
    filename = session.get('filename')
    if not filename:
        flash('Looks like you do not choose any file!')
        return redirect(url_for('bcrecli'))
    if filename.split('.')[-1] not in ALLOWED_EXTENSIONS:
        flash('Looks like you do not choose a excel file!')
        return redirect(url_for('bcrecli'))

    form.file.data.save('uploads/' + filename)
    # Runs detached; output is discarded by the shell redirection.
    cmd = "mydocker --bcreatecli ./uploads/%s > /dev/null &" % filename
    os.system(cmd)
    flash('The Client Containers is being created!')
    return redirect(url_for('index'))
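# NOTE: the original comment was lost to an encoding error; the view below
# uploads a file and runs `mydocker --bdelsrv` on it.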
def bdelsrv():
    """Upload a file and run ``mydocker --bdelsrv`` on it in the background.

    Renders ``bdelsrv.html`` until the form validates.  A missing file or an
    extension outside ``ALLOWED_EXTENSIONS`` flashes a message and reloads
    this view; success redirects to ``index``.
    """
    form = Upload()
    if not form.validate_on_submit():
        return render_template('bdelsrv.html', form=form)

    session['filename'] = secure_filename(form.file.data.filename)
    filename = session.get('filename')
    if not filename:
        flash('Looks like you do not choose any file!')
        return redirect(url_for('bdelsrv'))
    if filename.split('.')[-1] not in ALLOWED_EXTENSIONS:
        flash('Looks like you do not choose a excel file!')
        return redirect(url_for('bdelsrv'))

    form.file.data.save('uploads/' + filename)
    # Runs detached; output is discarded by the shell redirection.
    cmd = "mydocker --bdelsrv ./uploads/%s > /dev/null &" % filename
    os.system(cmd)
    flash('The Server Containers is being deleted!')
    return redirect(url_for('index'))
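# NOTE: the original comment was lost to an encoding error; the view below
# uploads a file and runs `mydocker --bdelcli` on it.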
def bdelcli():
    """Upload a file and run ``mydocker --bdelcli`` on it in the background.

    Renders ``bdelcli.html`` until the form validates.  A missing file or an
    extension outside ``ALLOWED_EXTENSIONS`` flashes a message and reloads
    this view; success redirects to ``index``.
    """
    form = Upload()
    if not form.validate_on_submit():
        return render_template('bdelcli.html', form=form)

    session['filename'] = secure_filename(form.file.data.filename)
    filename = session.get('filename')
    if not filename:
        flash('Looks like you do not choose any file!')
        return redirect(url_for('bdelcli'))
    if filename.split('.')[-1] not in ALLOWED_EXTENSIONS:
        flash('Looks like you do not choose a excel file!')
        return redirect(url_for('bdelcli'))

    form.file.data.save('uploads/' + filename)
    # Runs detached; output is discarded by the shell redirection.
    cmd = "mydocker --bdelcli ./uploads/%s > /dev/null &" % filename
    os.system(cmd)
    flash('The Client Containers is being deleted!')
    return redirect(url_for('index'))
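# NOTE: the original comment was lost to an encoding error; the view below
# sets, downloads, lists and uploads data files.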
def data(command):
    """Set, download, list or upload data files.

    GET commands:
      * ``set`` -- point ``app._cr`` and ``app._ar`` at the chosen CSV file
      * ``download`` -- send the chosen file as an attachment
      * ``upload`` -- render the upload form
      * ``list`` -- list the allowed files in the upload folder
    The chosen file is the ``file_name`` query argument inside the upload
    folder, or the current ``app._cr.csv_file`` when that argument is absent.

    POST stores the ``data_file`` upload when its extension is allowed.
    Any other combination returns ``None``.
    """
    def allowed_file(filename):
        # Extension (text after the last dot) must be in ALLOWED_EXTENSIONS.
        return '.' in filename and filename.rsplit('.', 1)[1] in ALLOWED_EXTENSIONS

    def handle_file():
        # Resolve (directory, file name) for the set/download commands.
        requested = request.args.get('file_name')
        folder = app.config['UPLOAD_FOLDER']
        if requested:
            return folder, requested
        return os.path.split(app._cr.csv_file)

    def _set_data_file(path, f_name):
        # Both app._cr and app._ar are pointed at the same CSV path.
        app._cr.csv_file = app._ar.csv_file = os.path.join(path, f_name)

    if request.method == 'GET':
        if command == 'set':
            path, f_name = handle_file()
            _set_data_file(path, f_name)
            return 'data file set to %s\n' % f_name
        if command == 'download':
            path, f_name = handle_file()
            return send_from_directory(path, f_name, as_attachment=True)
        if command == 'upload':
            return render_template('upload_file.html')
        if command == 'list':
            listing = [
                entry for entry in os.listdir(app.config['UPLOAD_FOLDER'])
                if allowed_file(entry)
            ]
            return render_template('file_list.html', file_list=listing)

    if request.method == 'POST':
        incoming = request.files['data_file']
        if incoming and allowed_file(incoming.filename):
            safe_name = secure_filename(incoming.filename)
            incoming.save(os.path.join(app.config['UPLOAD_FOLDER'], safe_name))
            return "File Saved!\n"
def event_upload(request, event, form):
    """Attach an uploaded photo to ``event`` and to the current user.

    Invalid forms, anonymous users and users not registered for the event are
    sent back to the event detail page with a flash message.  Otherwise the
    photo is saved under ``luminance/static/photos/`` as
    ``<username>_<safe name>``, recorded in the database, and the user is
    redirected to the event list (also when saving fails).

    :param request: unused by this function
    :param event: event receiving the photo
    :param form: upload form exposing ``validate()`` and ``photo.data``
    """
    def back_to_event():
        return redirect(url_for('events.event_detail', event_id=event.id))

    if not form.validate():
        print('upload failed')
        flash('Upload failed.')
        return back_to_event()
    if current_user.is_anonymous:
        print('user not logged in')
        flash('You must be logged in to do this.')
        return back_to_event()
    if current_user not in event.users:
        flash('You must be registered for this event to do this.')
        return back_to_event()

    photo = form.photo.data
    safe_name = secure_filename(photo.filename)
    filename = current_user.username + '_' + safe_name
    try:
        abs_filename = getcwd() + '/luminance/static/photos/' + filename
        photo.save(abs_filename)
        p = Photo(url='/static/photos/' + filename)
        current_user.photos.append(p)
        current_user.exp += 1
        event.photos.append(p)
        for record in (p, current_user, event):
            db_session.add(record)
        db_session.commit()
    except Exception:
        print_exc()
        flash('Upload failed.')
        return redirect(url_for('events.event_list'))

    flash('Upload successful!')
    return redirect(url_for('events.event_list'))
def _upload_file(self, file, container):
    """Upload a file into a container/folder.

    The file is written to ``<self.upload_folder>/<container>/`` under its
    sanitised name; the folder is created when missing.

    :returns: ``True`` on success, ``False`` if anything raised
    """
    try:
        print("coming here")
        filename = secure_filename(file.filename)
        target_dir = os.path.join(self.upload_folder, container)
        if not os.path.isdir(target_dir):
            os.makedirs(target_dir)
        # NOTE(review): source indentation was lost; this trace line is
        # assumed to run unconditionally before the save -- confirm.
        print("not saved")
        file.save(os.path.join(self.upload_folder, container, filename))
        print("saved")
    except Exception:
        return False
    return True
def parse_upload_file(self, field_name, multi=False):
    """ Parse uploaded file from request.files

    :param field_name: name of the form field holding the upload(s)
    :param multi: when false, only the first uploaded file is returned
    :returns: list of ``{'name', 'datas'}`` dicts, ``datas`` being the
        base64-encoded file content
    """
    # TODO: Set allowed extensions in the backend and compare
    # NOTE(review): source indentation was lost; the early return for
    # ``multi=False`` is assumed to sit inside the loop -- confirm.
    attachments = []
    for upload in request.httprequest.files.getlist(field_name):
        attachments.append({
            'name': secure_filename(upload.filename),
            'datas': base64.b64encode(upload.stream.read())
        })
        if not multi:
            break
    return attachments
def ui_sample_upload():
    """
    Sample creation from binary file.

    On a valid submission every uploaded file is passed to
    ``api.dispatch_sample_creation`` and one message is flashed per created
    sample, or an error when none was created.  Always redirects to
    ``index``.
    """
    upload_form = UploadSampleForm()
    # Choice 0 means "no family".
    upload_form.family.choices = [(0, "None")] + [
        (f.id, f.name) for f in Family.query.order_by('name')
    ]

    if upload_form.validate_on_submit():
        family_id = upload_form.family.data
        zipflag = upload_form.zipflag.data
        family = api.get_elem_by_type("family", family_id) if family_id != 0 else None

        for mfile in upload_form.files.raw_data:
            file_data = mfile.stream
            file_name = secure_filename(mfile.filename)
            samples = api.dispatch_sample_creation(
                file_data,
                file_name,
                g.user,
                upload_form.level.data,
                family,
                zipflag)
            if len(samples) == 0:
                flash("Error during sample creation", "error")
                continue
            for sample in samples:
                flash("Created sample " + str(sample.id), "success")
    return redirect(url_for('index'))
def classify_url():
    """Classify an image given by URL or by file upload and return JSON.

    With a ``url`` query argument the image is downloaded.  Otherwise the
    ``file`` upload is saved to ``UPLOAD_FOLDER`` and opened from there.
    If the image cannot be loaded, the result has empty ``accuracy`` and
    ``specificity`` lists.
    """
    failure = json.dumps({ 'accuracy': [], 'specificity': [] })

    if 'url' in flask.request.args:
        imageurl = flask.request.args.get('url', '')
        try:
            raw = urllib.urlopen(imageurl).read()
            image = caffe.io.load_image(StringIO.StringIO(raw))
        except Exception as err:
            # Any failure while reading the image ends the request here.
            logging.info('URL Image open error: %s', err)
            return failure
        logging.info('Image: %s', imageurl)
    else:
        try:
            # The file is saved to disk for possible data collection.
            imagefile = flask.request.files['file']
            stamp = str(datetime.datetime.now()).replace(' ', '_')
            filename_ = stamp + werkzeug.secure_filename(imagefile.filename)
            filename = os.path.join(UPLOAD_FOLDER, filename_)
            imagefile.save(filename)
            logging.info('Saving to %s.', filename)
            image = exifutil.open_oriented_im(filename)
        except Exception as err:
            logging.info('Uploaded image open error: %s', err)
            return failure

    result = app.clf.classify_image(image)
    accuracy = [{ 'label': label, 'score': score } for label, score in result[2]]
    # 'specificity' carries the same list as 'accuracy'.
    return json.dumps({ 'image': embed_image_html(image), 'accuracy': accuracy, 'specificity': accuracy })
def upload():
    """Replace the upload folder's contents with the posted files and run logo detection.

    Every existing entry in ``UPLOAD_FOLDER`` is removed first.  Each allowed
    ``file[]`` upload is then saved, ``identify_logos`` is run on the folder
    and its results are rendered with ``upload.html``.
    """
    upload_dir = app.config['UPLOAD_FOLDER']

    # Remove existing files
    for stale in os.listdir(upload_dir):
        os.remove(os.path.join(upload_dir, stale))

    filenames = []
    for incoming in request.files.getlist("file[]"):
        # Skip anything that is not one of the allowed types/extensions
        if not (incoming and allowed_file(incoming.filename)):
            continue
        # Strip unsupported characters, then store in the upload folder
        safe_name = secure_filename(incoming.filename)
        incoming.save(os.path.join(upload_dir, safe_name))
        filenames.append(safe_name)

    results = identify_logos(upload_dir)
    return render_template('upload.html', results=results, length=len(results))
# This route expects a parameter containing the name of a file.
# It locates that file in the upload directory and shows it in
# the browser, so if the user uploads an image, that image is
# shown after the upload.
def upload_file_handler(file):
    """Store ``file`` as the current user's single upload.

    The file is saved as ``<user id>_<safe name>`` in ``UPLOAD_FOLDER`` and
    recorded on ``current_user.file``.  Any other file in the folder carrying
    the same user-id prefix is then deleted.

    :param file: uploaded file object with ``filename`` and ``save``
    """
    # Save unique file per user
    filename = str(current_user.id) + "_" + secure_filename(file.filename)
    file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
    current_user.file = filename

    # Delete old file
    owner_id = int(current_user.id)
    path = os.path.abspath(app.config['UPLOAD_FOLDER'])
    for item in os.listdir(path):
        try:
            item_owner = int(item.split('_')[0])
        except ValueError:
            # Not an "<id>_<name>" upload (e.g. a placeholder file); a stray
            # entry used to crash the handler here.
            continue
        if item_owner == owner_id and item != filename:
            os.remove(os.path.join(path, item))