def uploadFile(current_user):
    """Persist an upload for *current_user* and return ``(file_uploaded, filename)``.

    The file may come either from the multipart field ``file`` or be fetched
    from the form field ``url``. The stored name is prefixed with a UTC
    timestamp and the user id, then sanitised with ``secure_filename``.

    Returns:
        tuple: ``(bool uploaded, str filename or None)``.
    """
    timestamp = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
    # .get() replaces the original bare try/except KeyError swallowing.
    upload = request.files.get('file')
    url = request.form.get('url')
    # Initialise both results up front: the original raised NameError when the
    # URL branch failed the allowed_file() check.
    filename = None
    file_uploaded = False
    if upload and allowed_file(upload.filename):
        filename = secure_filename(
            '%s_%s_%s' % (timestamp, current_user, upload.filename))
        upload.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
        file_uploaded = True
    elif url:
        # NOTE(review): urllib.urlopen is Python 2 only — on Python 3 this
        # would need urllib.request.urlopen; kept as-is to match the codebase.
        remote = urllib.urlopen(url)
        filename = secure_filename(
            '%s_%s_%s' % (timestamp, current_user, url.split('/')[-1]))
        if remote and allowed_file(filename):
            # with-block guarantees the handle is closed (original leaked it).
            with open(os.path.join(app.config['UPLOAD_FOLDER'], filename), 'wb') as out:
                out.write(remote.read())
            file_uploaded = True
    return file_uploaded, filename
# Example source code of Python `request.files` usage (section caption translated from Chinese)
def deployment():
    """Create or extend a deployment environment from the posted form.

    Expects ``env`` and ``isNew`` in the request values, plus optional
    parallel lists of uploaded files, ``File Type`` and ``file_code`` fields.

    Returns:
        tuple: ``(json payload, HTTP status)``.
    """
    uploads = request.files.getlist('file')
    # Guard against an empty list: the original indexed [0] unconditionally
    # and raised IndexError when no file field was posted at all.
    hasFiles = bool(uploads and uploads[0])
    DATA = {'files': list(zip(uploads,
                              request.form.getlist('File Type'),
                              request.form.getlist('file_code')))}
    # Statics and outputs must carry an explicit file code.
    for file, fileType, fileCod in DATA['files']:
        if fileType in ['static', 'data'] and not fileCod:
            return json.dumps('Error - You should specify a file code for statics and outputs'), 500
    env = request.values['env']
    isNew = request.values['isNew'] == 'true'
    if isNew:
        createEnv(env)
    if hasFiles:
        return deployFiles(env, DATA)
    return json.dumps("Environment created"), 200
def is_hot_dog():
    """Classify an uploaded image as hot dog / not hot dog.

    Saves the upload to UPLOAD_FOLDER, runs the recognizer, removes the
    image, and returns a JSON verdict with the confidence score.
    """
    if request.method == 'POST':
        if 'file' not in request.files:
            return jsonify({'error': 'no file'}), 400
        # Image info
        img_file = request.files.get('file')
        img_name = img_file.filename
        mimetype = img_file.content_type
        # Return an error if not a valid mimetype
        if mimetype not in valid_mimetypes:
            return jsonify({'error': 'bad-type'})
        # NOTE(review): img_name is used verbatim as a path component —
        # consider secure_filename() here; left unchanged to preserve lookups.
        img_path = os.path.join(app.config['UPLOAD_FOLDER'], img_name)
        img_file.save(img_path)
        try:
            hot_dog_conf = rekognizer.get_confidence(img_name)
        finally:
            # Always delete the image, even if the analysis raises
            # (the original leaked the file on error).
            os.remove(img_path)
        is_hot_dog = 'false' if hot_dog_conf == 0 else 'true'
        return_packet = {
            'is_hot_dog': is_hot_dog,
            'confidence': hot_dog_conf
        }
        return jsonify(return_packet)
def avatar(user_id):
    """Render and process the avatar edit page for *user_id*.

    Only the user themselves or a user with UPDATE_OTHERS_INFORMATION
    permission may access this view; everyone else gets a 403.
    """
    permitted = (current_user.id == user_id
                 or current_user.can(Permission.UPDATE_OTHERS_INFORMATION))
    if not permitted:
        abort(403)
    the_user = User.query.get_or_404(user_id)
    edit_form = AvatarEditForm()
    upload_form = AvatarUploadForm()
    # Branch 1: a file was uploaded — store it under a per-user folder.
    if upload_form.validate_on_submit() and 'avatar' in request.files:
        saved_name = avatars.save(upload_form.avatar.data, folder=str(user_id))
        the_user.avatar = json.dumps({"use_out_url": False, "url": saved_name})
        db.session.add(the_user)
        db.session.commit()
        flash(u'??????!', 'success')
        return redirect(url_for('user.detail', user_id=user_id))
    # Branch 2: an external URL was submitted instead.
    if edit_form.validate_on_submit():
        the_user.avatar = json.dumps({"use_out_url": True, "url": edit_form.avatar_url.data})
        db.session.add(the_user)
        db.session.commit()
        return redirect(url_for('user.detail', user_id=user_id))
    return render_template('avatar_edit.html', user=the_user,
                           avatar_edit_form=edit_form,
                           avatar_upload_form=upload_form, title=u"????")
def temporary_upload(name, fileobj):
    """Upload a file to a temporary location.

    Flask will not load sufficiently large files into memory, so it
    makes sense to always load files into a temporary directory.

    Yields ``(name, filepath, original_filename)`` and removes the whole
    temporary directory once the consumer is done.
    """
    scratch_dir = mkdtemp()
    target = join(scratch_dir, secure_filename(fileobj.filename))
    fileobj.save(target)
    try:
        yield name, target, fileobj.filename
    finally:
        # Clean up the entire scratch directory regardless of what happened.
        rmtree(scratch_dir)
def classify():
    """Classify an uploaded audio file and render the results page."""
    if 'audio_file' not in request.files:
        return redirect('/')
    # File-like object that can be passed directly to soundfile.read()
    # without saving to disk.
    audio_file = request.files['audio_file']
    if audio_file.filename == '':
        return redirect('/')
    probabilities = model.predict_probabilities(audio_file).round(5)
    label = model.class_label_from_probabilities(probabilities)
    return render_template('home.html',
                           model_id=model_id,
                           example_files=example_files,
                           audio_file=audio_file.filename,
                           predicted_label=label,
                           class_probabilities=probabilities)
def post_file():
    """Receive ``var.json`` (settings) and ``data.csv`` (payload) uploads and
    forward the CSV to the report server.

    Returns a JSON status dict, or a plain-text message when a required
    part is missing.
    """
    # Membership test directly on request.files — .keys() was redundant.
    if "var.json" not in request.files:
        return make_response("no var.json", 200)
    usrvar = json.load(request.files["var.json"])
    if "data.csv" not in request.files:
        return make_response("no data.csv", 200)
    csvfile = request.files["data.csv"]
    resp = {"server-connected": False, "data-posted": False, "err-occur": False}
    rpmng = ReportManager(usrvar)
    try:
        if rpmng.connect_server():
            resp["server-connected"] = True
            if rpmng.submit_progress(csvfile=csvfile):
                resp["data-posted"] = True
    except Exception:
        # Narrowed from a bare except: still best-effort, but no longer
        # swallows SystemExit/KeyboardInterrupt.
        resp["err-occur"] = True
    finally:
        # finalize() must run even if submission raised.
        rpmng.finalize()
    return make_response(jsonify(resp), 200)
def post_phonemes():
    """Run phoneme recognition on the uploaded ``wave`` file.

    Returns a JSON document ``{'phonemes': ...}`` or aborts with 400 when
    no wave file was posted.
    """
    if 'wave' not in request.files:
        abort(400)
    file = request.files['wave']
    # mkstemp avoids the close-then-reuse race of the original
    # NamedTemporaryFile pattern (the name could be taken between close
    # and save). The descriptor is closed; file.save() reopens by name.
    fd, tmp_filename = tempfile.mkstemp(dir=UPLOAD_FOLDER)
    os.close(fd)
    try:
        file.save(tmp_filename)
        wave = Wave()
        wave.load(tmp_filename)
        recogn = PhonemeRecognition()
        recogn.load('model_en_full')
        recogn.predict(wave)
        result = {'phonemes': wave.get_phoneme_map()}
    finally:
        # The original never deleted the temporary file.
        os.remove(tmp_filename)
    return json.dumps(result)
def getAuthorizedFiles(fileConfigs, reportObj, report_name, userDirectory, fnct=None, ajax=False):
    """Attach to *reportObj* the files the current user is authorised to read.

    For each entry of *fileConfigs* an authorisation query (data or static)
    is run against the report's admin.db; each authorised file is then
    exposed on ``reportObj.files`` either through a custom parser, as a raw
    path (pandas mode) or as an open file handle.

    Returns:
        dict: disk file name -> "module.parserName" (only populated when
        *ajax* is false).
    """
    # Column indices of the rows returned by the authorisation queries.
    ALIAS, DISK_NAME = 0, 1
    SQL_CONFIG = os.path.join(current_app.config['ROOT_PATH'], config.ARES_SQLITE_FILES_LOCATION)
    # One SQL template per folder kind.
    sqlFileDict = {'data': 'get_file_auth.sql', 'static': 'static_file_map.sql'}
    fileNameToParser = {}
    for fileConfig in fileConfigs:
        queryFileAuthPrm = {'team': session['TEAM'], 'file_cod': fileConfig['filename'], 'username': current_user.email, 'type': fileConfig.get('folder')}
        # NOTE(review): the SQL template file opened here is never closed —
        # a handle leak on every iteration; confirm before refactoring.
        files = executeSelectQuery(os.path.join(current_app.config['ROOT_PATH'], config.ARES_USERS_LOCATION, report_name, 'db', 'admin.db'),
                                   open(os.path.join(SQL_CONFIG, sqlFileDict.get(fileConfig.get('folder')))).read(), params=queryFileAuthPrm)
        for file in files:
            if fileConfig.get('parser', None):
                # Custom parser: pass an open handle (also never closed).
                reportObj.files[file[DISK_NAME]] = fileConfig['parser'](open(os.path.join(userDirectory, fileConfig['folder'], file[DISK_NAME])))
            elif fileConfig.get('type') == 'pandas':
                # pandas mode: hand over the path only; the consumer reads it.
                reportObj.files[file[DISK_NAME]] = os.path.join(userDirectory, fileConfig['folder'], file[DISK_NAME])
            else:
                reportObj.files[file[DISK_NAME]] = open(os.path.join(userDirectory, fileConfig['folder'], file[DISK_NAME]))
            if not ajax:
                # NOTE(review): this reads fileConfig['parser'] unconditionally;
                # it will raise when the config has no parser (e.g. the pandas
                # or plain-open branches above) — TODO confirm intended nesting.
                fileNameToParser[file[DISK_NAME]] = "%s.%s" % (fileConfig['parser'].__module__.split(".")[-1], fileConfig['parser'].__name__)
            if fnct == 'params' and not ajax:
                # Group disk names under their alias for the params view.
                reportObj.fileMap.setdefault(file[ALIAS], []).append(file[DISK_NAME])
    return fileNameToParser
def getAresFilesVersions():
    """Return the files, the version (mtime) and the size as a JSON dict.

    Scans the Ares ``Lib`` module folder (flat) and the external ``Lib``
    tree (recursively), skipping anything Ares.isExcluded() rejects.
    """
    aresModulePath = os.path.join(current_app.config['ROOT_PATH'], config.ARES_FOLDER, 'Lib')
    files = {}
    for pyFile in os.listdir(aresModulePath):
        if Ares.isExcluded(current_app.config['ROOT_PATH'], file=pyFile):
            continue
        stat = os.stat(os.path.join(aresModulePath, pyFile))
        files[pyFile] = [stat.st_mtime, stat.st_size]
    # Add all the external libraries
    libPath = os.path.join(current_app.config['ROOT_PATH'], 'Lib')
    for (path, dirs, f) in os.walk(libPath):
        for pyFile in f:
            if Ares.isExcluded(current_app.config['ROOT_PATH'], file=pyFile):
                continue
            # FIX: stat the file inside its actual directory (`path`);
            # the original joined against libPath, which is wrong for any
            # file in a subdirectory of the walk.
            stat = os.stat(os.path.join(path, pyFile))
            files[pyFile] = [stat.st_mtime, stat.st_size]
    return json.dumps(files)
def index():
    """Handle the text-upload form and render the word plot view."""
    form1 = InputFile(request.form)
    form2 = InputLim(request.form)
    # Word-count limit: user-supplied value or a 500-word default.
    session['limit'] = int(form2.limit.data) if form2.limit.data else 500
    if form1.submit1.data:
        raw = request.files[form1.textfile.name].read()
        cleaned = regex.sub(" ", str(raw.decode("utf-8"))).lower()
        # Only remember non-empty submissions.
        if len(cleaned) > 0:
            session['text'] = cleaned
    result = get_plot(session['limit'], session['text']) if 'text' in session else None
    return render_template('view.html', form1=form1, form2=form2, result=result)
def _take_template_from_uploads_if_needed(fn):
"""Takes template from request.files if 'template' from **kwargs is empty.
Must be called before @use_kwargs.
"""
@wraps(fn)
def wrapper(*args, **kwargs):
template = kwargs.get('template')
if template is None:
template = request.files.to_dict().get('template')
if template is not None:
template = template.stream.read()
kwargs['template'] = template
return fn(*args, **kwargs)
return wrapper
def index(self):
    """Get the list of objects.
    .. :quickref: File; Get the list of objects
    Response is paginated and will only contain 25 results. The most recent
    objects appear first.
    :query page: page number.
    :type page: int
    :>json list files: list of files (see :http:get:`/files/(id)` for details on the format of a file).
    """
    page = int(request.args.get('page', 1))
    # Newest first, one page of PER_PAGE results.
    cursor = (current_user.files.find()
              .sort('_id', DESCENDING)
              .limit(PER_PAGE)
              .skip((page - 1) * PER_PAGE))
    pagination = Pagination(page=page, per_page=PER_PAGE, total=cursor.count(),
                            css_framework='bootstrap3')
    payload = {'files': clean_files(list(cursor))}
    return render(payload, 'files/index.html',
                  ctx={'data': payload, 'pagination': pagination})
def get(self, id):
    """Get the object with `id`.
    .. :quickref: File; Get an object
    Resulting object is in the ``file`` field.
    :param id: id of the object.
    :>json dict _id: ObjectId dict.
    :>json string md5: MD5 hash.
    :>json string sha1: SHA1 hash.
    :>json string sha256: SHA256 hash.
    :>json string type: FAME type.
    :>json string mime: mime type.
    :>json string detailed_type: detailed type.
    :>json list groups: list of groups (as strings) that have access to this file.
    :>json list owners: list of groups (as strings) that submitted this file.
    :>json list probable_names: list of probable names (as strings).
    :>json list analysis: list of analyses' ObjectIds.
    :>json list parent_analyses: list of analyses (as ObjectIds) that extracted this object.
    :>json dict antivirus: dict with antivirus names as keys.
    """
    record = get_or_404(current_user.files, _id=id)
    return return_file({'file': clean_files(record)})
def submit_to_av(self, id, module):
    """Submit a file to an Antivirus module.
    .. :quickref: File; Submit file to an antivirus module
    If succesful, the response will be ``"ok"``. Otherwise, it will be an
    error message.
    :param id: id of the file to submit.
    :param module: name of the module to submit the file to.
    """
    f = File(get_or_404(current_user.files, _id=id))
    # Find the first enabled antivirus module matching the requested name.
    av_module = next((m for m in dispatcher.get_antivirus_modules()
                      if m.name == module), None)
    if av_module is None:
        return make_response("antivirus module '{}' not present / enabled.".format(module))
    av_module.submit(f['filepath'])
    f.update_value(['antivirus', module], True)
    return make_response("ok")
def jwc():
    """Solve a JWC captcha image posted under the ``captcha`` key.

    Returns a JSON document with ``succeed`` (0/1), and either ``result``
    (the solved text) or ``reason`` (an error description).
    """
    response = {'succeed': 0}
    # .get() replaces a try/except around the dict access.
    captcha_file = request.files.get('captcha')
    if captcha_file is None:
        response['reason'] = "cannot fetch the image file, please post it with key 'captcha'."
        return json.dumps(response)
    if not allowed_file(captcha_file.filename):
        # FIX: message typo ("no supported" -> "not supported").
        response['reason'] = "this file type is not supported."
        return json.dumps(response)
    try:
        im = Image.open(captcha_file)
        predict = solve_jwc(im)
    except Exception as e:
        response['reason'] = "an error occurred: %s" % str(e)
    else:
        response['succeed'] = 1
        response['result'] = predict
    return json.dumps(response)
def j():
    """Accept an image upload, persist it via SQLAlchemy and return its
    metadata (URLs, hash, dimensions) as JSON; 400 when no file was sent."""
    uploaded_file = request.files['file']
    if not uploaded_file:
        return abort(400)
    paste_file = PasteFile.create_by_upload_file(uploaded_file)
    db.session.add(paste_file)
    db.session.commit()
    width, height = paste_file.image_size
    return jsonify({
        'url': paste_file.url_i,
        'short_url': paste_file.url_s,
        'origin_filename': paste_file.filename,
        'hash': paste_file.filehash,
        'width': width,
        'height': height
    })
def create_post():
    """Create a blog post from the submitted form, with an optional
    featured image and tag list, then redirect to the post page."""
    post_data = {
        'title': request.form.get('title'),
        'content': request.form.get('content'),
    }
    post = Post()
    post.set(post_data)
    post = markdown(post)
    upload_image = request.files.get('featured_image')
    # Guard against a missing field: .get() returns None when no file part
    # was posted, and the original then crashed on .filename.
    if (upload_image is not None and upload_image.filename != ''
            and allowed_file(upload_image.filename)):
        f = Attachment(upload_image.filename, data=upload_image.stream)
        post.set('featured_image', f)
    post.save()
    # 'tags' may be absent entirely; treat that as an empty tag list
    # instead of raising AttributeError on None.
    tag_names = (request.form.get('tags') or '').lower().strip()
    tags = [get_tag_by_name(x) for x in split_tag_names(tag_names)]
    map_tags_to_post(tags, post)
    return redirect(url_for('show_post', post_id=post.id))
def predict():
    """Classify an uploaded image as a Pokemon and return name/description.

    Returns a JSON payload; on any model error the name/description are
    null and ``msg`` carries the error text.
    """
    # FIX: removed the left-in `import ipdb; ipdb.set_trace()` debugger call.
    if 'file' not in request.files:
        flash('No file part')
        return redirect(request.url)
    file = request.files['file']
    if file.filename == '':
        flash('No selected file')
        return redirect(request.url)
    # FIX: the original fell through and implicitly returned None when the
    # extension was not allowed; reject explicitly instead.
    if not allowed_file(file.filename):
        flash('File type not allowed')
        return redirect(request.url)
    try:
        pokemon_name = predict_mlp(file).capitalize()
        pokemon_desc = pokemon_entries.get(pokemon_name)
        msg = ""
    except Exception as e:
        pokemon_name = None
        pokemon_desc = None
        msg = str(e)
    return jsonify({'name': pokemon_name, 'description': pokemon_desc, "msg": msg})
def phylab():
    """Solve a physics-lab captcha image posted under the ``captcha`` key.

    Returns a JSON document with ``succeed`` (0/1), and either ``result``
    (the solved text) or ``reason`` (an error description).
    """
    response = {'succeed': 0}
    # .get() replaces a try/except around the dict access.
    captcha_file = request.files.get('captcha')
    if captcha_file is None:
        response['reason'] = "cannot fetch the image file, please post it with key 'captcha'."
        return json.dumps(response)
    if not allowed_file(captcha_file.filename):
        # FIX: message typo ("no supported" -> "not supported").
        response['reason'] = "this file type is not supported."
        return json.dumps(response)
    try:
        im = Image.open(captcha_file)
        predict = solve_phylab(im)
    except Exception as e:
        response['reason'] = "an error occurred: %s" % str(e)
    else:
        response['succeed'] = 1
        response['result'] = predict
    return json.dumps(response)
def xgb():
    """OCR a captcha image with tesseract and return the recognised text.

    The image is saved under ``tmp/`` with a timestamp name, run through
    tesseract, then renamed to include the prediction (for later auditing).
    """
    import subprocess  # local import: replaces shell-string os.popen calls
    response = {'succeed': 0}
    captcha_file = request.files.get('captcha')
    if captcha_file is None:
        response['reason'] = "cannot fetch the image file, please post it with key 'captcha'."
        return json.dumps(response)
    if not allowed_file(captcha_file.filename):
        # FIX: message typo ("no supported" -> "not supported").
        response['reason'] = "this file type is not supported."
        return json.dumps(response)
    try:
        filename = str(time.time())
        tmp_path = os.path.join("tmp", filename)
        captcha_file.save(tmp_path)
        # SECURITY/robustness fix: argument list instead of a shell string —
        # the original interpolated OCR output into a `mv` shell command.
        predict = subprocess.check_output(
            ["tesseract", tmp_path, "stdout"]).decode().strip('\n')
        os.rename(tmp_path, os.path.join("tmp", filename + "_" + predict))
    except Exception as e:
        response['reason'] = "an error occurred: %s" % str(e)
    else:
        response['succeed'] = 1
        response['result'] = predict
    return json.dumps(response)
def j():
    """Accept an image upload via the ``create`` helper and return its
    metadata as JSON; returns the helper's error string on failure and
    aborts with 400 when no file was posted."""
    uploaded_file = request.files['file']
    if not uploaded_file:
        return abort(400)
    rs = create(uploaded_file)
    # Non-zero 'r' signals a creation failure; propagate the error body.
    if rs['r']:
        return rs['error']
    paste_file = rs['paste_file']
    width, height = paste_file.image_size
    return jsonify({
        'url': paste_file.url_i,
        'short_url': paste_file.url_s,
        'origin_filename': paste_file.filename,
        'hash': paste_file.filehash,
        'width': width,
        'height': height
    })
def j():
    """Accept an image upload, commit it through the SQLAlchemy session and
    return its metadata as JSON; 400 when no file was sent."""
    uploaded_file = request.files['file']
    if not uploaded_file:
        return abort(400)
    paste_file = PasteFile.create_by_upload_file(uploaded_file)
    db.session.add(paste_file)
    db.session.commit()
    width, height = paste_file.image_size
    return jsonify({
        'url': paste_file.url_i,
        'short_url': paste_file.url_s,
        'origin_filename': paste_file.filename,
        'hash': paste_file.filehash,
        'width': width,
        'height': height
    })
def j():
    """Accept an image upload, persist it via the model's own save() and
    return its metadata as JSON; 400 when no file was sent."""
    uploaded_file = request.files['file']
    if not uploaded_file:
        return abort(400)
    paste_file = PasteFile.create_by_upload_file(uploaded_file)
    paste_file.save()
    width, height = paste_file.image_size
    return jsonify({
        'url': paste_file.url_i,
        'short_url': paste_file.url_s,
        'origin_filename': paste_file.filename,
        'hash': paste_file.filehash,
        'width': width,
        'height': height
    })
def onboard(self):
    """
    Do all steps to prepare this service to be instantiated
    :return:
    """
    # 1. extract the contents of the package and store them in our catalog
    self._unpack_service_package()
    # 2. read in all descriptor files
    self._load_package_descriptor()
    self._load_nsd()
    self._load_vnfd()
    # Service access points are optional, gated by the DEPLOY_SAP flag.
    if DEPLOY_SAP:
        self._load_saps()
    # 3. prepare container images (e.g. download or build Dockerfile)
    if BUILD_DOCKERFILE:
        # Build images locally from Dockerfiles shipped in the package.
        self._load_docker_files()
        self._build_images_from_dockerfiles()
    else:
        # Otherwise pull pre-built images referenced by URL.
        self._load_docker_urls()
        self._pull_predefined_dockerimages()
    LOG.info("On-boarded service: %r" % self.manifest.get("name"))
def _load_vnfd(self):
    """
    Load all VNFD YAML files referenced in MANIFEST.MF and keep them in dict.
    :return:
    """
    # First collect every function descriptor listed in the manifest,
    # keyed by the VNF name declared inside the descriptor itself.
    vnfd_set = {}
    if "package_content" in self.manifest:
        descriptors = [pc for pc in self.manifest.get("package_content")
                       if pc.get("content-type") == "application/sonata.function_descriptor"]
        for pc in descriptors:
            descriptor_path = os.path.join(
                self.package_content_path,
                make_relative_path(pc.get("name")))
            descriptor = load_yaml(descriptor_path)
            vnfd_set[descriptor.get("name")] = descriptor
    # Then link each vnf_id in the NSD to its descriptor.
    for vnf_id, vnf_name in self.vnf_id2vnf_name.items():
        self.vnfds[vnf_id] = vnfd_set[vnf_name]
        LOG.debug("Loaded VNFD: {0} id: {1}".format(vnf_name, vnf_id))
def restore_backup():
    '''
    POST:
    Receive a backup file and load it into the system
    '''
    # Write and CLOSE the temp file before nft reads it — running the
    # command while the handle was still open risked unflushed buffers.
    with tempfile.NamedTemporaryFile(suffix='.nft', delete=False) as tf:
        tf.write(request.files['file'].read())
    try:
        cmd = nft_utils.nft_command('-f ' + tf.name)
        cmd_result = cmd.wait()
        if cmd_result == 0:
            nft_utils.close_nft_command(cmd)
            return make_response('Backup restored')
        return abort(500, NFTError(Error(cmd.stdout.read())))
    finally:
        # FIX: the original leaked the temp file on the failure path.
        os.remove(tf.name)
def new_analysis():
    """Start a new pcap analysis from the submitted form and render the
    analyses index page.

    The uploaded pcap is stored under a timestamped, hashed name so that
    the original (untrusted) filename never touches the filesystem.
    """
    analysis_title = request.form.get("title")
    analysis_ruleset = request.form.get("ruleset")
    analysis_pcap = request.files.get("pcap")
    if analysis_pcap is None:
        return render_template("index.html", analyses=db_handler.search_analyses())
    # FIX: hashlib.sha256 requires bytes — the original passed the str
    # filename, a TypeError on Python 3.
    digest = hashlib.sha256(analysis_pcap.filename.encode('utf-8')).hexdigest()
    pcap_name = secure_filename("%d_%s.pcap" % (int(time.time()), digest))
    pcap_path = os.path.join(storage_folder, pcap_name)
    analysis_pcap.save(pcap_path)
    x = analysis_handler(pcap_file=pcap_path, ruleset=analysis_ruleset,
                         pcap_name=pcap_name, title=analysis_title)
    analysis_pool.add_analysis(x)
    return render_template("index.html", analyses=db_handler.search_analyses())
def api_new_analysis():
    """API variant of :func:`new_analysis`: start a pcap analysis and return
    its id as JSON (``{'id': 0}`` when no pcap was posted)."""
    analysis_title = request.form.get("title")
    analysis_ruleset = request.form.get("ruleset")
    analysis_pcap = request.files.get("pcap")
    if analysis_pcap is None:
        # NOTE: kept byte-identical to the original (single-quoted,
        # non-standard JSON) so existing API clients are unaffected.
        return "{'id':0}"
    # FIX: hashlib.sha256 requires bytes — the original passed the str
    # filename, a TypeError on Python 3.
    digest = hashlib.sha256(analysis_pcap.filename.encode('utf-8')).hexdigest()
    pcap_name = secure_filename("%d_%s.pcap" % (int(time.time()), digest))
    pcap_path = os.path.join(storage_folder, pcap_name)
    analysis_pcap.save(pcap_path)
    x = analysis_handler(pcap_file=pcap_path, ruleset=analysis_ruleset,
                         pcap_name=pcap_name, title=analysis_title)
    analysis_pool.add_analysis(x)
    return json.dumps({"id": x.analysis_id})
def update(year, month):
    """Import bank data for the given month and refresh local storage.

    When the bank adapter works from files, the uploaded statement is saved
    either into the configured ``imports_dir`` (kept) or into a temporary
    file (deleted after the import), then fed to ``update_local_data``.
    """
    date = datetime.date(year, month, 1)
    filename = None
    delete_file = False
    if bank_adapter.fetch_type == 'file':
        if 'file' not in request.files:
            abort(400)
        file = request.files['file']
        if config.get('imports_dir'):
            # Keep a dated copy of the statement in the imports directory.
            filename = os.path.join(config['imports_dir'],
                '%s-%s' % (datetime.date.today().isoformat(), secure_filename(file.filename)))
            if not os.path.exists(os.path.dirname(filename)):
                os.makedirs(os.path.dirname(filename))
        else:
            # No imports dir configured: stage through a throwaway temp file.
            temp_file = NamedTemporaryFile(delete=False)
            filename = temp_file.name
            temp_file.close()
            delete_file = True
        file.save(filename)
    try:
        update_local_data(config, date=date, filename=filename, storage=storage)
    finally:
        # FIX: delete the temp file even when the import raises
        # (the original leaked it on error).
        if delete_file:
            os.unlink(filename)
    return redirect(url_for('index', year=year, month=month))