def temporary_upload(name, fileobj):
    """
    Save an uploaded file into a fresh temporary directory and yield it.

    Flask will not load sufficiently large files into memory, so it
    makes sense to always load files into a temporary directory.

    This is a generator: it yields exactly one
    ``(name, filepath, original_filename)`` tuple and removes the temporary
    directory when the generator is resumed or closed.

    :param name: identifier handed back unchanged to the consumer.
    :param fileobj: upload object exposing ``filename`` and ``save(path)``.
    """
    tempdir = mkdtemp()
    try:
        # The save happens inside the ``try`` so that a failing save does not
        # leave the freshly created directory behind.
        filepath = join(tempdir, secure_filename(fileobj.filename))
        fileobj.save(filepath)
        yield name, filepath, fileobj.filename
    finally:
        rmtree(tempdir)
python类secure_filename()的实例源码
def get(*args, **kwargs):
    """
    Return the content of a log file to an administrator.

    Expected keyword arguments:

    * ``user_group`` -- group of the caller; only ``'admin'`` and ``'root'``
      are allowed to read logs.
    * ``filename`` -- name of the log file; it is sanitized with
      ``secure_filename`` and appended to ``logsPath``.

    :returns: a dict with ``'success'`` set to ``'true'`` and the text under
        ``'result'``, or ``'success'`` set to ``'false'`` and a ``'reason'``.
    """
    if 'user_group' not in kwargs:
        return {"success": 'false', "reason": "Cannot get user_group"}
    if kwargs['user_group'] not in ('admin', 'root'):
        return {"success": 'false', "reason": 'Unauthorized Action'}
    filepath = logsPath + secure_filename(kwargs['filename'])
    try:
        if not os.path.exists(filepath):
            return {"success": 'false', "reason": 'file not exist'}
        # ``with`` guarantees the handle is closed even if the read fails.
        with open(filepath, 'r') as logfile:
            logtext = logfile.read()
        return {'success': 'true', 'result': logtext}
    except Exception:
        # Any read/decoding problem is reported to the caller as a soft
        # failure; interpreter-level exits are no longer swallowed.
        return {'success': 'false', 'reason': 'file read error'}
def _make_zip(self, project, ty):
    """Zip the JSON export of ``project`` and pass it to the uploader.

    The export returned by ``self._respond_json`` is dumped to a temporary
    file, that file is added to a temporary zip archive, and the archive is
    uploaded into the container of the project owner. Nothing happens when
    the export is ``None``.

    :param project: project whose ``id`` and ``owner_id`` are read.
    :param ty: export type, forwarded to ``_respond_json`` and used in the
        archive member name.
    """
    name = self._project_name_latin_encoded(project)
    json_task_generator = self._respond_json(ty, project.id)
    if json_task_generator is None:
        return
    # Both temporary files are closed when their ``with`` block ends.
    with tempfile.NamedTemporaryFile() as datafile:
        datafile.write(json.dumps(json_task_generator))
        datafile.flush()
        with tempfile.NamedTemporaryFile() as zipped_datafile:
            archive = self._zip_factory(zipped_datafile.name)
            member_name = secure_filename('%s_%s.json' % (name, ty))
            archive.write(datafile.name, member_name)
            archive.close()
            container = "user_%d" % project.owner_id
            storage = FileStorage(
                filename=self.download_name(project, ty),
                stream=zipped_datafile)
            uploader.upload_file(storage, container=container)
def _make_zip(self, project, ty):
    """Zip the CSV export of ``project`` and pass it to the uploader.

    Every line produced by ``self._respond_csv`` is written to a temporary
    file, that file is added to a temporary zip archive, and the archive is
    uploaded into the container of the project owner. Nothing happens when
    the export is ``None``.

    :param project: project whose ``id`` and ``owner_id`` are read.
    :param ty: export type, forwarded to ``_respond_csv`` and used in the
        archive member name.
    """
    name = self._project_name_latin_encoded(project)
    csv_task_generator = self._respond_csv(ty, project.id)
    if csv_task_generator is None:
        return
    # TODO: use temp file from csv generation directly
    with tempfile.NamedTemporaryFile() as datafile:
        for row in csv_task_generator:
            datafile.write(str(row))
        datafile.flush()
        csv_task_generator.close()  # delete temp csv file
        with tempfile.NamedTemporaryFile() as zipped_datafile:
            archive = self._zip_factory(zipped_datafile.name)
            member_name = secure_filename('%s_%s.csv' % (name, ty))
            archive.write(datafile.name, member_name)
            archive.close()
            container = "user_%d" % project.owner_id
            storage = FileStorage(
                filename=self.download_name(project, ty),
                stream=zipped_datafile)
            uploader.upload_file(storage, container=container)
def predict():
    """
    Classify the uploaded image and return the prediction as JSON.

    Requests without a ``file`` part, or with an empty filename, flash a
    message and redirect back to the request URL. Otherwise the response is
    a JSON object with ``name``, ``description`` and ``msg``; on failure
    ``name`` and ``description`` are ``None`` and ``msg`` says why.
    """
    # The interactive ``ipdb.set_trace`` breakpoint was removed: it blocks
    # every request while waiting for a debugger on the server console.
    if 'file' not in request.files:
        flash('No file part')
        return redirect(request.url)
    file = request.files['file']
    if file.filename == '':
        flash('No selected file')
        return redirect(request.url)

    pokemon_name = None
    pokemon_desc = None
    if file and allowed_file(file.filename):
        try:
            pokemon_name = predict_mlp(file).capitalize()
            pokemon_desc = pokemon_entries.get(pokemon_name)
            msg = ""
        except Exception as e:
            # Report prediction failures to the client instead of a 500.
            pokemon_name = None
            pokemon_desc = None
            msg = str(e)
    else:
        # Previously this path left the result variables undefined.
        msg = "File type not allowed"
    return jsonify({'name': pokemon_name, 'description': pokemon_desc, "msg": msg})
def upload_file():
    """
    Serve the upload form and store files posted through it.

    A non-POST request returns the inline HTML form. A POST redirects to
    ``response_page`` with ``res`` set to ``no_file``, ``succeeded`` or
    ``failed`` depending on whether a file was sent and whether
    ``allowed_file`` accepts its name.
    """
    if request.method != 'POST':
        return '''
<!doctype html>
<title>Upload new File</title>
<h1>Upload new File</h1>
<form action="" method=post enctype=multipart/form-data>
<p><input type=file name=file>
<input type=submit value=Upload>
</form>
'''

    upload = request.files.get('file')
    if not upload or upload.filename == '':
        # return 'No file uploaded'
        return redirect(url_for('response_page')+'?res=no_file')
    if not allowed_file(upload.filename):
        return redirect(url_for('response_page')+'?res=failed')

    safe_name = secure_filename(upload.filename)
    upload.save(os.path.join(app.config['UPLOAD_FOLDER'], safe_name))
    return redirect(url_for('response_page')+'?res=succeeded')
def _save_analysis_file(self, id, path):
    """
    Store the uploaded ``file`` part under ``<path>/<analysis id>/``.

    :param id: id of an analysis belonging to the current user (404 otherwise).
    :param path: base directory under which the per-analysis directory lives.
    :returns: full path of the written file.
    :raises OSError: when the per-analysis directory cannot be created.
    """
    file = request.files['file']
    analysis = Analysis(get_or_404(current_user.analyses, _id=id))
    dirpath = os.path.join(path, str(analysis['_id']))
    filepath = os.path.join(dirpath, secure_filename(file.filename))
    # Create parent dirs if they don't exist
    try:
        os.makedirs(dirpath)
    except OSError:
        # An already existing directory is the expected case; any other
        # failure (permissions, a regular file in the way) is a real error
        # and is reported here rather than by the ``open`` below.
        if not os.path.isdir(dirpath):
            raise
    with open(filepath, "wb") as fd:
        copyfileobj(file.stream, fd)
    return filepath
def download_support_file(self, id, module, filename):
    """Download a support file.

    .. :quickref: Analysis; Download a support file.

    Responds with 404 when the file exists in neither the per-module
    location nor the older per-analysis location.

    :param id: id of the analysis.
    :param module: name of the module.
    :param filename: name of the file to download.
    """
    analysis = get_or_404(current_user.analyses, _id=id)
    analysis_id = str(analysis['_id'])
    safe_name = secure_filename(filename)
    candidates = (
        os.path.join(fame_config.storage_path, 'support_files', module, analysis_id, safe_name),
        # This code is here for compatibility
        # with older analyses
        os.path.join(fame_config.storage_path, 'support_files', analysis_id, safe_name),
    )
    for candidate in candidates:
        if os.path.isfile(candidate):
            return file_download(candidate)
    abort(404)
def new_analysis():
    """
    Queue a new analysis from the submitted form and render the index page.

    Reads the optional ``title`` and ``ruleset`` form fields and the ``pcap``
    file part. Without a pcap the index page is rendered and nothing is
    queued. Otherwise the capture is stored under a generated
    ``<timestamp>_<sha256 of the original name>.pcap`` name in
    ``storage_folder`` and handed to ``analysis_pool``.
    """
    title = request.form["title"] if "title" in request.form else None
    upload = request.files["pcap"] if "pcap" in request.files else None
    ruleset = request.form["ruleset"] if "ruleset" in request.form else None

    if upload is None:
        return render_template("index.html", analyses = db_handler.search_analyses())

    stamp = str(int(time.time()))
    digest = hashlib.sha256(upload.filename).hexdigest()
    stored_name = secure_filename(stamp+"_"+digest+".pcap")
    stored_path = os.path.join(storage_folder, stored_name)
    upload.save(stored_path)

    job = analysis_handler(pcap_file=stored_path, ruleset=ruleset, pcap_name=stored_name, title=title)
    analysis_pool.add_analysis(job)
    return render_template("index.html", analyses=db_handler.search_analyses())
def api_new_analysis():
    """
    Queue a new analysis from the submitted form and return its id as JSON.

    Reads the optional ``title`` and ``ruleset`` form fields and the ``pcap``
    file part. The capture is stored under a generated
    ``<timestamp>_<sha256 of the original name>.pcap`` name in
    ``storage_folder`` and handed to ``analysis_pool``.

    :returns: a JSON object ``{"id": <analysis id>}``; the id is ``0`` when
        no pcap was submitted.
    """
    analysis_pcap = None
    analysis_title = None
    analysis_ruleset = None
    if "title" in request.form:
        analysis_title = request.form["title"]
    if "pcap" in request.files:
        analysis_pcap = request.files["pcap"]
    if "ruleset" in request.form:
        analysis_ruleset = request.form["ruleset"]
    if analysis_pcap is None:
        # Serialized with json.dumps like the success path; the previous
        # hand-written "{'id':0}" used single quotes and was not valid JSON.
        return json.dumps({"id": 0})
    pcap_name = analysis_pcap.filename
    pcap_name = secure_filename(str(int(time.time()))+"_"+hashlib.sha256(pcap_name).hexdigest()+".pcap")
    pcap_path = os.path.join(storage_folder, pcap_name)
    analysis_pcap.save(pcap_path)
    x = analysis_handler(pcap_file=pcap_path, ruleset=analysis_ruleset, pcap_name=pcap_name, title=analysis_title)
    analysis_pool.add_analysis(x)
    return json.dumps({"id":x.analysis_id})
def update(year, month):
    """
    Refresh the local data for the given month and redirect to the index.

    When the bank adapter fetches from a file, the request must carry a
    ``file`` part (400 otherwise). The upload is kept under
    ``config['imports_dir']`` with today's ISO date as prefix when that
    setting is present; otherwise it goes to a temporary file that is
    removed once the update has run.

    :param year: year of the month to update.
    :param month: month number to update.
    """
    date = datetime.date(year, month, 1)
    filename = None
    delete_file = False
    upload = None
    if bank_adapter.fetch_type == 'file':
        if 'file' not in request.files:
            abort(400)
        upload = request.files['file']
        if config.get('imports_dir'):
            filename = os.path.join(config['imports_dir'],
                                    '%s-%s' % (datetime.date.today().isoformat(), secure_filename(upload.filename)))
            if not os.path.exists(os.path.dirname(filename)):
                os.makedirs(os.path.dirname(filename))
        else:
            # Only the name is needed; the upload is written to it below.
            temp_file = NamedTemporaryFile(delete=False)
            filename = temp_file.name
            temp_file.close()
            delete_file = True
    try:
        if upload is not None:
            upload.save(filename)
        update_local_data(config, date=date, filename=filename, storage=storage)
    finally:
        # Remove the temporary file even when saving or updating fails.
        if delete_file:
            os.unlink(filename)
    return redirect(url_for('index', year=year, month=month))
api.py 文件源码
项目:the-magical-csv-merge-machine
作者: entrepreneur-interet-general
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def _parse_request():
    '''
    Split the JSON body of the request into data and module parameters.

    Every value under ``data_params`` is passed through ``secure_filename``
    (in place) so it is safe to use as a path component. Both results are
    ``None`` when the request has no JSON body or the key is missing.
    '''
    data_params = None
    module_params = None
    # Parse json request
    if not request.json:
        return data_params, module_params

    req = request.json
    assert isinstance(req, dict)
    print(req)

    if 'data_params' in req:
        data_params = req['data_params']
        # Make paths secure
        for field in data_params:
            data_params[field] = secure_filename(data_params[field])

    if 'module_params' in req:
        module_params = req['module_params']
    return data_params, module_params
api.py 文件源码
项目:the-magical-csv-merge-machine
作者: entrepreneur-interet-general
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def _parse_linking_request():
    '''
    Split the JSON body of a linking request into data and module parameters.

    ``data_params`` is expected to hold a ``ref`` and a ``source`` mapping;
    every value in both is passed through ``secure_filename`` (in place).
    Both results are ``None`` when the request has no JSON body or the key
    is missing.
    '''
    data_params = None
    module_params = None
    if not request.json:
        return data_params, module_params

    params = request.json
    assert isinstance(params, dict)

    if 'data_params' in params:
        data_params = params['data_params']
        for file_role in ('ref', 'source'):
            role_params = data_params[file_role]
            # Make paths secure
            for field in role_params:
                role_params[field] = secure_filename(role_params[field])

    if 'module_params' in params:
        module_params = params['module_params']
    return data_params, module_params
def save_file(file):
    """
    Store one uploaded file and record the outcome on ``g``.

    An accepted file is saved into ``UPLOAD_FOLDER``; when the name is taken,
    the last six characters are replaced by a timestamp plus ``.xmind``. The
    stored name is passed to ``insert_record`` and ``g.is_success`` is set.
    An empty filename sets ``g.error``; any other rejected file is appended
    to ``g.invalid_files``.
    """
    if not (file and allowed_file(file.filename)):
        g.is_success = False
        if file.filename == '':
            g.error = "Please select a file!"
        else:
            g.invalid_files.append(file.filename)
        return

    filename = secure_filename(file.filename)
    upload_to = join(app.config['UPLOAD_FOLDER'], filename)
    if exists(upload_to):
        # Name clash: keep the stem and append the current timestamp.
        stamp = arrow.now().strftime('%Y%m%d_%H%M%S')
        filename = '{}_{}.xmind'.format(filename[:-6], stamp)
        upload_to = join(app.config['UPLOAD_FOLDER'], filename)
    file.save(upload_to)
    insert_record(filename)
    g.is_success = True
def FUN_upload_image():
    """
    Save a posted image, run the predictor on it and render the result.

    Anything other than a POST carrying an accepted ``file`` part redirects
    to ``FUN_root``. Accepted images are stored in ``static/img_pool`` under
    a name prefixed with the SHA-256 of the current timestamp.
    """
    if request.method != 'POST':
        return(redirect(url_for("FUN_root")))
    # check if the post request has the file part
    if 'file' not in request.files:
        return(redirect(url_for("FUN_root")))
    upload = request.files['file']
    # if user does not select file, browser also submit a empty part without filename
    if upload.filename == '':
        return(redirect(url_for("FUN_root")))
    if not (upload and allowed_file(upload.filename)):
        return(redirect(url_for("FUN_root")))

    prefix = hashlib.sha256(str(datetime.datetime.now())).hexdigest()
    filename = os.path.join("static/img_pool", prefix + secure_filename(upload.filename).lower())
    upload.save(filename)
    prediction_result = mx_predict(filename, local=True)
    FUN_resize_img(filename)
    return render_template("index.html", img_src = filename, prediction_result = prediction_result)
################################################
# Start the service
################################################
def upload_file():
    """
    Store an uploaded file and load it into the requested database.

    On POST with an accepted ``file`` part the file is saved into
    ``UPLOAD_FOLDER``, passed to ``handle_file`` together with the
    ``database`` form field, and the client is redirected to the resulting
    collection. Every other request renders ``upload.html``.

    :raises InvalidUsage: when the ``database`` form field is missing or empty.
    """
    if request.method == 'POST':
        file = request.files['file']
        if file and allowed_file(file.filename, ALLOWED_EXTENSIONS):
            filename = secure_filename(file.filename)
            file_location = os.path.join(app.config['UPLOAD_FOLDER'], filename)
            # Single-argument call form prints the same text on Python 2 and 3.
            print("file location %s" % file_location)
            file.save(file_location)
            endpoint_name = os.path.splitext(filename)[0]
            database = request.form.get("database")
            if not database:
                # Print the value already fetched: indexing request.form here
                # raised a key error for a missing field, which masked the
                # InvalidUsage below.
                print(database)
                raise InvalidUsage("Database name was not provided", 400, '{ "error" : "database was not provided" }')
            handle_file(database, file_location, endpoint_name, host)
            return redirect('/loxo/' + database + '/collections/' + endpoint_name)
    return render_template('upload.html')
def upload():
    """
    Save the uploaded file, run inference on it and return the result as JSON.

    Requests without a ``file`` part or with an empty filename are redirected
    to ``/``. The saved path is handed to the inference entry point through
    ``FLAGS.input_files``.
    """
    # check if the post request has the file part
    if 'file' not in request.files:
        return redirect("/")
    upload_part = request.files['file']
    # if user does not select file, browser also
    # submit a empty part without filename
    if upload_part.filename == '':
        return redirect("/")

    safe_name = secure_filename(upload_part.filename)
    path = os.path.join(app.config['UPLOAD_FOLDER'], safe_name)
    upload_part.save(path)
    FLAGS.input_files = path
    # inference
    return jsonify(main(None))
def load_researcher_affiliations():
    """Load affiliation records from an uploaded CSV file into a new task.

    On success the user is redirected to the record list of the created
    task; on a ``ValueError`` or ``ModelException`` the error is flashed and
    logged and the upload form is rendered again.
    """
    form = FileUploadForm()
    if not form.validate_on_submit():
        return render_template("fileUpload.html", form=form, form_title="Researcher")

    filename = secure_filename(form.file_.data.filename)
    try:
        task = Task.load_from_csv(read_uploaded_file(form), filename=filename)
        flash(f"Successfully loaded {task.record_count} rows.")
        return redirect(url_for("affiliationrecord.index_view", task_id=task.id))
    except (ValueError, ModelException) as ex:
        flash(f"Failed to load affiliation record file: {ex}", "danger")
        app.logger.exception("Failed to load affiliation records.")
    return render_template("fileUpload.html", form=form, form_title="Researcher")
def logo():
    """Manage organisation 'logo'.

    When the client prefers an image and the organisation has a logo, the
    stored logo is sent back. Otherwise the logo form is handled: a valid
    submission replaces the logo of the current user's organisation, and the
    form page is rendered.
    """
    org = current_user.organisation
    best = request.accept_mimetypes.best_match(["text/html", "image/*"])
    if best == "image/*" and org.logo:
        return send_file(
            BytesIO(org.logo.data),
            mimetype=org.logo.mimetype,
            attachment_filename=org.logo.filename)
    form = LogoForm()
    if form.validate_on_submit():
        f = form.logo.data
        filename = secure_filename(f.filename)
        # Persist the sanitized name: it is later sent back to clients as the
        # attachment filename, so the raw client-supplied name must not be
        # stored.
        logo = File.create(data=f.read(), mimetype=f.mimetype, filename=filename)
        org.logo = logo
        org.save()
        flash(f"Saved organisation logo '{filename}'", "info")
    return render_template("logo.html", form=form)
def upload():
    """
    Train the model on an uploaded CSV file and render the upload page.

    On a valid submission the training data is saved under
    ``wine_quality/data/``, read with pandas and passed to ``my_model.train``
    together with the submitted hyper-parameters. The page receives the
    stored filename, or ``None`` when nothing valid was submitted.
    """
    form = TrainingDataForm()
    filename = None
    if form.validate_on_submit():
        model_name = form.model_name.data
        learning_rate = float(form.learning_rate.data)
        batch_size = int(form.batch_size.data)
        filename = secure_filename(form.training_data.data.filename)
        print(form.__dict__)
        # Save to Redis here
        csv_path = 'wine_quality/data/' + filename
        form.training_data.data.save(csv_path)
        dataframe = pd.read_csv(csv_path, sep=',')
        my_model.train(dataframe, learning_rate, batch_size, model_name)
    return render_template('test_data_upload.html', form=form, filename=filename)
def submit_crash(instance_id):
    """
    Record every uploaded crash file for a fuzzer instance.

    For each file part a ``Crash`` row is created, the file is saved as
    ``<crash id>_<sanitized name>`` in the ``crashes`` directory of the
    instance's campaign, and the absolute path is stored on the row.

    :param instance_id: id of the reporting fuzzer instance.
    :returns: an empty response body.
    """
    instance = models.FuzzerInstance.get(id=instance_id)
    campaign = instance.campaign
    for _, upload in request.files.items():
        crash = models.Crash.create(
            instance_id=instance.id,
            campaign_id=instance.campaign_id,
            created=request.args.get('time'),
            name=upload.filename,
            analyzed=False
        )
        crash_dir = os.path.join(current_app.config['DATA_DIRECTORY'], secure_filename(campaign.name), 'crashes')
        os.makedirs(crash_dir, exist_ok=True)
        # Commas are replaced before sanitizing the client-supplied name.
        safe_name = secure_filename(upload.filename.replace(',', '_'))
        upload_path = os.path.join(crash_dir, '%d_%s' % (crash.id, safe_name))
        upload.save(upload_path)
        crash.path = os.path.abspath(upload_path)
        crash.commit()
    return ''
def validate(self):
    """
    Validate the campaign form beyond the per-field validators.

    The name must not belong to an existing campaign or data directory. A
    campaign that is not a copy (``copy_of`` is ``-1``) needs both an
    executable and testcases; a copy needs an existing source campaign.

    :returns: ``True`` when the form is valid, ``False`` otherwise (the
        reason is appended to the errors of the offending field).
    """
    if not super().validate():
        return False

    name_taken = Campaign.get(name=self.name.data) or os.path.exists(
        os.path.join(current_app.config['DATA_DIRECTORY'], secure_filename(self.name.data)))
    if name_taken:
        self.name.errors.append('Campaign with that name already exists')
        return False

    if self.copy_of.data != -1:
        if not Campaign.get(id=self.copy_of.data):
            self.copy_of.errors.append('Campaign to copy does not exist')
            return False
        return True

    if not self.executable.has_file():
        self.executable.errors.append('Must provide an executable or campaign to copy')
        return False
    if not self.testcases.has_file():
        self.testcases.errors.append('Must provide testcases or campaign to copy')
        return False
    return True
def plugin_upload():
    """
    Handle the plugin upload form.

    On POST the ``file`` part must be present, named and end in ``.zip``; it
    is then turned into a plugin with ``Plugin.create_from_zip``. The outcome
    is logged and shown on the upload page through ``message``.
    """
    message = None
    if request.method == 'POST':
        # check if the post request has the file part
        if 'file' not in request.files:
            message = 'No file part'
        else:
            file = request.files['file']
            # if user does not select file, browser also
            # submit a empty part without filename
            if file.filename == '':
                message = 'No selected file'
            elif file and '.' in file.filename and \
                    file.filename.rsplit('.', 1)[1] == 'zip':
                file.filename = secure_filename(file.filename)
                # Create plugin
                try:
                    plugin = Plugin.create_from_zip(file, file.filename)
                    message = 'Plugin {} version {} created'.format(plugin.name, plugin.version)
                except ValidationError as e:
                    # The handler used to format an undefined name ``key``,
                    # turning every validation failure into a NameError; the
                    # uploaded file name is reported instead, and the text
                    # also reaches the log and the rendered page.
                    message = '<b>{}</b> is not valid {}'.format(file.filename, e)
                    render_error(message)
            else:
                message = 'File does not have a zip extension'
        log("Plugin upload message: %s" % message)
    return render_template('upload.html', message=message)
def process_request():
    """
    Resolve the input named by the request form into ``(keypath, path)``.

    ``source`` selects the input: ``local`` and ``url`` read the ``path``
    form field, ``embedded`` reads the ``file`` upload. Returns
    ``(None, None)`` when a local path cannot be resolved or when the
    parameters match no source (an error is then added to ``req_errors``).
    """
    source = request.form['source']

    if source == 'local' and 'path' in request.form:
        keypath = request.form['path']
        path = get_local(request.form['path'])
        if not path:
            return None, None
        return keypath, path

    if source == 'url' and 'path' in request.form:
        keypath = request.form['path']
        return keypath, get_url(request.form['path'])

    if source == 'embedded' and 'file' in request.files:
        keypath = secure_filename(request.files['file'].filename)
        return keypath, get_file(request.files['file'])

    req_errors.append('Invalid source, path or file parameters')
    return None, None
def upload_file():
    """
    Accept an uploaded file, pass it to ``pman.upload_pepe`` and redirect.

    Responds with status 400 and a short message when the ``file`` part is
    missing, unnamed, or does not end in one of ``ALLOWED_EXTENSIONS``.
    """
    if 'file' not in request.files:
        return 'No file part', 400
    form_file = request.files['file']
    if form_file.filename == '':
        return 'No selected file', 400
    if not form_file.filename.lower().endswith(ALLOWED_EXTENSIONS):
        return 'Bad extension', 400

    safe_name = secure_filename(form_file.filename)
    file_path = os.path.join(app.config['UPLOAD_FOLDER'], safe_name)
    form_file.save(file_path)
    pman.upload_pepe(file_path)
    # We delete the file because ipfs has copied it into its repository
    os.remove(file_path)
    return redirect('/')
def upload_file(sha):
    """
    Store an uploaded file in redis for a limited time.

    The upload is only accepted while the ``upload-lock:<sha>`` key exists
    (403 otherwise). The ``file`` part must be present, named, and its name
    must already equal its sanitized form (400 otherwise).

    :param sha: identifier whose upload lock is checked.
    :returns: JSON ``{'msg': 'Ok'}`` on success.
    """
    if not redis.get("upload-lock:" + sha):
        abort(403)
    # check if the post request has the file part
    if 'file' not in request.files:
        abort(400)
    f = request.files['file']
    # if user does not select file, browser also
    # submit a empty part without filename
    if f.filename == '':
        abort(400)
    if not (f and f.filename == secure_filename(f.filename)):
        abort(400)

    filename = secure_filename(f.filename)
    # Store files in redis with an expiration so we hopefully don't leak resources.
    redis.setex("file:" + filename, 120 * 60, f.read())
    print(filename, "uploaded")
    return jsonify({'msg': 'Ok'})
def nlp():
    """
    Return the request's text, or the transcription of its uploaded audio.

    When the ``text`` query parameter is present it is echoed back.
    Otherwise the ``data`` file part is saved into ``UPLOAD_FOLDER`` and
    passed to ``story_to_text.recognize``. Responds with 400 when neither
    text nor a usable audio upload is supplied.

    :returns: JSON ``{"text": ...}``.
    """
    text_in = request.args.get('text')
    # if text param is present and is true
    if text_in is not None:
        text_out = text_in
    else:
        if 'data' not in request.files:
            abort(400)
        audio_in = request.files['data']
        if not audio_in:
            # A falsy upload used to leave ``text_out`` unbound and crash at
            # the return; treat it like a missing part.
            abort(400)
        audio_filename = secure_filename(audio_in.filename)
        audio_in_path = os.path.join(app.config['UPLOAD_FOLDER'],
                                     audio_filename)
        audio_in.save(audio_in_path)
        text_out = story_to_text.recognize(audio_in_path)
    # return 200 and audio text
    return jsonify(text=text_out)
def upload():
    """
    Save an uploaded image and render the index page showing it.

    Without a ``file`` part the plain index page is rendered. An empty
    filename, or one rejected by ``valid_image``, flashes a message and
    redirects back to the request URL.
    """
    if 'file' not in request.files:
        # Single-argument call form prints the same text on Python 2 and 3.
        print("No file found")
        return render_template('index.html')
    file = request.files['file']
    print(file)
    if file.filename == '':
        flash('No file selected!')
        return redirect(request.url)
    if not (file and valid_image(file.filename)):
        # Rejected files used to fall through with ``filename`` undefined.
        flash('Invalid file type!')
        return redirect(request.url)
    filename = secure_filename(file.filename)
    out_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    file.save(out_path)
    return render_template("index.html", image_name=filename)
def validate(self):
    """
    Validate the upload form and remember the uploaded file on the form.

    After the standard field validation, the upload must be present in
    ``request.files``, its sanitized name must not already exist in the
    upload folder, and that name must not be empty. On the way the file and
    its sanitized name are stored as ``self.file_`` and ``self.filename``.

    :returns: ``True`` when valid, ``False`` otherwise (the reason is
        appended to the errors of the upload field).
    """
    if not Form.validate(self):
        return False

    field = self.uploadfile
    if field.name not in request.files:
        field.errors.append("Please select a valid file.")
        return False

    self.file_ = request.files[field.name]
    self.filename = secure_filename(self.file_.filename)

    upload_dir = os.path.join(app.root_path, app.config['UPLOAD_FOLDER'])
    if self.filename in os.listdir(upload_dir):
        field.errors.append("A file with this name already exists.")
        return False
    if not self.filename:
        field.errors.append("Invalid file name.")
        return False
    return True
# Handle an uploaded plugin file taken from the 'file' part of the request.
# Only names whose last dot-separated component is 'py' or 'json' are
# processed; the file is saved under FILE_PATH, its name and content are
# published on the 'updateplugins' redis channel, and vulScan.updatePlugins
# is called with the name stem and the suffix.
# NOTE(review): the original indentation was lost in this listing, so the
# exact nesting (in particular where the jsonify return sits) should be
# confirmed against the real file.
def uploadPlugins():
f = request.files['file']
file_name = ''
try:
if f:
fname = secure_filename(f.filename)
# Suffix is whatever follows the last dot of the sanitized name.
suffix = fname.split('.')[-1]
if suffix in ['py','json']:
path = FILE_PATH + fname
if os.path.exists(FILE_PATH + fname):
# Name clash: rebuild the name from the part before the FIRST dot plus
# the current second (0-59), so names with several dots lose their
# middle parts.
fname = fname.split('.')[0] + '_' + str(datetime.now().second) + "." + suffix
path = FILE_PATH + fname
f.save(path)
if os.path.exists(path):
file_name = fname.split('.')[0]
# redis publish
with open(path) as pf:
r = getStrictRedis()
r.publish('updateplugins', json.dumps({"filename":file_name+"."+suffix, "content":pf.read()}))
# update esplugins
vulScan.updatePlugins(file_name, suffix)
return jsonify({"message":"ok"})
# NOTE(review): every exception is silently discarded here, and no response
# is returned on that path.
except Exception as e:
pass