def get_locs():
    """Return one geolocation record per distinct anonymous IP address.

    Every distinct non-NULL ``user_ip`` stored in ``task_run`` is looked up
    in the GeoLite city database.  The result is a list of
    ``{'loc': record}`` dicts; a lookup that yields ``None`` or an empty
    record is replaced by a record whose latitude and longitude are 0.
    When the ``GEO`` config entry is falsy no lookup happens and the list
    is empty.
    """
    if not current_app.config['GEO']:
        return []

    query = '''SELECT DISTINCT(user_ip) FROM task_run
WHERE user_ip IS NOT NULL;'''
    rows = session.execute(query)

    # The database file is expected in ../dat relative to the app root.
    db_file = current_app.root_path + '/../dat/GeoLiteCity.dat'
    geoip = pygeoip.GeoIP(db_file)

    found = []
    for row in rows:
        record = geoip.record_by_addr(row.user_ip)
        if record is None:
            record = {}
        if not record:
            # Nothing known about this address: fall back to (0, 0).
            record['latitude'] = 0
            record['longitude'] = 0
        found.append({'loc': record})
    return found
# python类root_path()的实例源码 (example source code using Python's root_path())
def get_assets(asset_type):
    """Build the HTML tags that load the bundled assets of one type.

    The asset map is read from ``assets.json`` inside the configured
    ``TEMPLATE_FOLDER`` below the application root.

    :param asset_type: ``'css'`` produces a stylesheet ``<link>``; ``'js'``
        produces the manifest inlined in a ``<script>`` element followed by
        the vendor and app ``<script src>`` tags.  Any other value produces
        empty markup.
    :returns: ``jinja2.Markup`` wrapping the generated HTML.
    """
    root = current_app.root_path
    assets_path = os.path.join(
        root, current_app.config['TEMPLATE_FOLDER'], 'assets.json')
    # Context managers make sure both files are closed even when parsing
    # or reading fails.
    with open(assets_path, 'r') as assets_file:
        assets = json.load(assets_file)

    output = ''
    if asset_type == 'css':
        output = '<link rel="stylesheet" href="{0}">'.format(
            assets['style']['css'])
    elif asset_type == 'js':
        # The first character of the manifest entry is dropped before it is
        # joined to the static folder (presumably a leading "/", which
        # would otherwise make os.path.join discard the prefix -- confirm).
        manifest = assets['manifest']['js'][1:]
        manifest_path = os.path.join(
            root, current_app.config['STATIC_FOLDER'], manifest)
        with open(manifest_path, 'r') as manifest_file:
            output += '<script>' + manifest_file.read() + '</script>'
        output += '<script src="{0}"></script>'.format(assets['vendor']['js'])
        output += '<script src="{0}"></script>'.format(assets['app']['js'])
    return jinja2.Markup(output)
def get_static_folder(app_or_blueprint):
    """Return the static folder of a Flask app, module or blueprint.

    Objects without a ``static_folder`` attribute (per the original note,
    app objects of very old Flask versions, <=0.6, where the folder was
    fixed) resolve to ``<root_path>/static``.

    :raises TypeError: if the object has a ``static_folder`` attribute but
        its ``has_static_folder`` is falsy.
    """
    if hasattr(app_or_blueprint, 'static_folder'):
        if app_or_blueprint.has_static_folder:
            return app_or_blueprint.static_folder
        # TypeError is chosen on purpose: the original note says it must be
        # an exception type that the prefix-splitting code does not hide.
        raise TypeError(
            'The referenced blueprint %s has no static folder.'
            % app_or_blueprint)

    # No customisable static folder on this object: use the fixed default.
    return path.join(app_or_blueprint.root_path, 'static')
def load_from_templates(self, env, jinja_extension):
    """Load asset bundles declared in the app's and blueprints' templates.

    The template folder of the current application and of every blueprint
    that defines one is handed to webassets' ``Jinja2Loader``, which parses
    the templates with the application's own Jinja environment.

    :param env: passed through to ``Jinja2Loader`` as its first argument.
    :param jinja_extension: passed to ``Jinja2Loader`` as ``jinja_ext``.
    :returns: whatever ``Jinja2Loader.load_bundles()`` returns.
    """
    from webassets.ext.jinja2 import Jinja2Loader, AssetsExtension
    from flask import current_app as app

    # The application folder comes first, blueprint folders follow.
    search_dirs = [path.join(app.root_path, app.template_folder)]
    search_dirs.extend(
        path.join(bp.root_path, bp.template_folder)
        for bp in app.blueprints.values()
        if bp.template_folder is not None
    )

    loader = Jinja2Loader(
        env, search_dirs, [app.jinja_env], jinja_ext=jinja_extension)
    return loader.load_bundles()
def format_fname(value):
    """Format a file name for display.

    * Values starting with ``{`` or ``<`` are returned unchanged.
    * Paths that are relative, or become relative once expressed against
      the application root, are returned as ``./<path>``.
    * Any other absolute path is shortened against ``sys.path`` and wrapped
      in angle brackets.
    """
    if value.startswith('{') or value.startswith('<'):
        return value

    fname = os.path.normpath(value)
    if os.path.isabs(fname):
        # Try to express the file relative to the project root first.
        fname = _shortest_relative_path(
            fname, [current_app.root_path], os.path)

    if os.path.isabs(fname):
        # Still absolute, so not a project file: shorten against sys.path.
        return '<%s>' % _shortest_relative_path(fname, sys.path, os.path)
    return os.path.join('.', fname)
def get(self, oid=None):
    """Return signed VMCP for CernVM requests.

    All query-string arguments are signed with the private key named by the
    ``VMCP_KEY`` setting (looked up in ``../keys`` relative to the app
    root), using the ``cvm_salt`` argument as salt.  A JSON error body is
    returned with status 501 when the key is not configured or its file is
    missing, and with status 415 when ``cvm_salt`` is absent.  ``oid`` is
    not used.
    """
    def _error_response(status_code, message):
        # Serialise the payload produced by the instance's error formatter.
        error = self._format_error(status_code=status_code, message=message)
        return Response(json.dumps(error), status=error['status_code'],
                        mimetype='application/json')

    key_name = current_app.config.get('VMCP_KEY')
    if key_name is None:
        return _error_response(
            501, "The server is not configured properly, contact the admins")

    pkey = current_app.root_path + '/../keys/' + key_name
    if not os.path.exists(pkey):
        return _error_response(
            501,
            "The server is not configured properly (private key is missing), contact the admins")

    salt = request.args.get('cvm_salt')
    if salt is None:
        return _error_response(415, "cvm_salt parameter is missing")

    signed_data = pybossa.vmcp.sign(request.args.copy(), salt, pkey)
    return Response(json.dumps(signed_data), 200, mimetype='application/json')
def template_files(filename):
    """Serve *filename* from the application's template folder."""
    return send_from_directory(
        os.path.join(current_app.root_path, current_app.template_folder),
        filename)
def theme_template_files(identifier, filename):
    """Serve *filename* from ``themes/<identifier>/templates`` under the app root."""
    path_parts = (current_app.root_path, "themes", identifier, "templates")
    return send_from_directory(os.path.join(*path_parts), filename)
def _remove_compiled_translations():
    """Delete every ``.mo`` file below the app's ``translations`` folder."""
    base_folder = os.path.join(current_app.root_path, "translations")

    # Visit the whole tree; only files whose name ends in ".mo" are removed.
    for dirpath, _dirnames, filenames in os.walk(base_folder):
        compiled = [fname for fname in filenames if fname.endswith(".mo")]
        for fname in compiled:
            os.unlink(os.path.join(dirpath, fname))
def _compile_translations():
    """Run ``pybabel compile`` for the application and for every plugin.

    The application's ``translations`` folder is compiled first, then the
    ``translations`` folder of each entry of ``plugin_manager.all_plugins``
    (looked up below ``<root>/plugins``).  The exit codes of the
    ``pybabel`` calls are not checked.
    """
    def _pybabel_compile(folder):
        # One pybabel invocation per translations directory.
        subprocess.call(["pybabel", "compile", "-d", folder])

    app_root = current_app.root_path
    plugins_root = os.path.join(app_root, "plugins")

    _pybabel_compile(os.path.join(app_root, "translations"))
    for plugin in plugin_manager.all_plugins:
        _pybabel_compile(os.path.join(plugins_root, plugin, "translations"))
def _remove_compiled_translations():
    """Delete every ``.mo`` file below the app's ``translations`` folder."""
    base_folder = os.path.join(current_app.root_path, "translations")

    # Visit the whole tree; only files whose name ends in ".mo" are removed.
    for dirpath, _dirnames, filenames in os.walk(base_folder):
        compiled = [fname for fname in filenames if fname.endswith(".mo")]
        for fname in compiled:
            os.unlink(os.path.join(dirpath, fname))
def _compile_translations():
    """Run ``pybabel compile`` for the application and for every plugin.

    The application's ``translations`` folder is compiled first, then the
    ``translations`` folder of each entry of ``plugin_manager.all_plugins``
    (looked up below ``<root>/plugins``).  The exit codes of the
    ``pybabel`` calls are not checked.
    """
    def _pybabel_compile(folder):
        # One pybabel invocation per translations directory.
        subprocess.call(["pybabel", "compile", "-d", folder])

    app_root = current_app.root_path
    plugins_root = os.path.join(app_root, "plugins")

    _pybabel_compile(os.path.join(app_root, "translations"))
    for plugin in plugin_manager.all_plugins:
        _pybabel_compile(os.path.join(plugins_root, plugin, "translations"))
def _remove_compiled_translations():
    """Delete every ``.mo`` file below the app's ``translations`` folder."""
    base_folder = os.path.join(current_app.root_path, "translations")

    # Visit the whole tree; only files whose name ends in ".mo" are removed.
    for dirpath, _dirnames, filenames in os.walk(base_folder):
        compiled = [fname for fname in filenames if fname.endswith(".mo")]
        for fname in compiled:
            os.unlink(os.path.join(dirpath, fname))
def download():
    """Send ``rules.db`` from the application root as a file attachment."""
    send_options = dict(
        directory=current_app.root_path,
        filename='rules.db',
        as_attachment=True,
    )
    return send_from_directory(**send_options)
def download_html_image(url, html, image_path):
    """Download the images referenced by *html* and re-point the tags.

    For every ``<img>`` that has a ``src`` attribute the image is fetched
    with ``download_file`` into ``<app root>/<image_path>/`` under a
    generated name.  On success the tag's ``src`` becomes the
    ``UPLOADIMG_HOST`` setting followed by the stored path; on failure it
    is set to the resolved source URL.

    :param url: base URL handed to ``full_url`` to resolve each ``src``;
        when falsy the ``src`` values are used as they are.
    :param html: HTML markup to process.
    :param image_path: folder, relative to the application root, that
        receives the downloaded files.
    :returns: the processed markup as a ``unicode`` string (Python 2).
    """
    soup = BeautifulSoup(html, "html.parser")
    for img in soup.select("img"):
        raw_src = img.get("src")
        if raw_src is None:
            # An <img> without a src attribute has nothing to download;
            # indexing img["src"] would raise KeyError for it.
            continue
        src = full_url(url, raw_src) if url else raw_src
        _, ext = os.path.splitext(src)
        # A fresh UUID-based name is generated for each stored image.
        filename = "/{0}/{1}{2}".format(image_path, uuid.uuid1().hex, ext)
        full_filename = "{0}{1}".format(current_app.root_path, filename)
        filename = "{0}{1}".format(current_app.config["UPLOADIMG_HOST"], filename)
        if not download_file(src, full_filename):
            img['src'] = src
        else:
            img['src'] = filename
    return unicode(soup)
def _50px_avatar(self):
    """Return the relative path of the 50x50 avatar file.

    The path is built from ``AVATAR_IMAGE_PATH`` without its first path
    component, plus ``50_50_<self.avatar>``.  When that file does not exist
    below the application root, ``self.thumbnail_avatar()`` is returned
    instead.
    """
    configured = current_app.config["AVATAR_IMAGE_PATH"]
    # Everything after the first "/" -- i.e. the first component is dropped.
    image_path = configured.partition("/")[2]
    filename = "/".join([image_path, "50_50_{0}".format(self.avatar)])

    if os.path.exists("/".join([current_app.root_path, filename])):
        return filename
    return self.thumbnail_avatar()
def generate_origin_avatar(name, im):
    """Save *im* unchanged as ``<app root>/<AVATAR_IMAGE_PATH>/<name>``.

    :param name: file name of the avatar.
    :param im: image object exposing ``save(path)``.
    """
    avatar_dir = current_app.config["AVATAR_IMAGE_PATH"]
    target = "/".join((current_app.root_path, avatar_dir, name))
    im.save(target)
def generate_thumbnail_avatar(name, im):
    """Save a 128x128 resized copy of *im* as ``thumbnail_<name>``.

    The file is written to ``<app root>/<AVATAR_IMAGE_PATH>/``.

    :param name: file name of the original avatar.
    :param im: image object exposing ``resize``.
    """
    thumbnail_name = "thumbnail_{0}".format(name)
    target = "/".join((current_app.root_path,
                       current_app.config["AVATAR_IMAGE_PATH"],
                       thumbnail_name))
    im.resize((128, 128), Image.ANTIALIAS).save(target)
def generate_50px_avatar(name, im):
    """Save a 50x50 resized copy of *im* as ``50_50_<name>``.

    The file is written to ``<app root>/<AVATAR_IMAGE_PATH>/``.

    :param name: file name of the original avatar.
    :param im: image object exposing ``resize``.
    """
    small_name = "50_50_{0}".format(name)
    target = "/".join((current_app.root_path,
                       current_app.config["AVATAR_IMAGE_PATH"],
                       small_name))
    im.resize((50, 50), Image.ANTIALIAS).save(target)
def init_avatar():
    """Create a fresh avatar set from the stock ``blue.jpg`` image.

    ``blue.jpg`` is opened from ``<app root>/<IMG_PATH>/`` and passed to the
    origin, thumbnail, 20px and 50px avatar generators under a newly
    generated name.

    :returns: the generated file name, ``<uuid hex>.jpg``.
    """
    stock_image = '/'.join((current_app.root_path,
                            current_app.config["IMG_PATH"],
                            "blue.jpg"))
    name = '{0}.jpg'.format(uuid.uuid1().hex)
    im = Image.open(stock_image)

    # Every variant is derived from the same opened image.
    generate_origin_avatar(name, im)
    generate_thumbnail_avatar(name, im)
    generate_20px_avatar(name, im)
    generate_50px_avatar(name, im)
    return name
def change_cover(binary, old_cover):
    """Store a new cover image plus its thumbnail and drop the old cover.

    The image is saved as ``<uuid hex>.png`` and, after ``thumbnail((320,
    80))``, again as ``thumbnail_<uuid hex>.png``, both below
    ``<app root>/<COVER_IMAGE_PATH>/``.

    :param binary: raw image data.
    :param old_cover: previous cover; passed to ``remove_cover`` when truthy.
    :returns: the generated file name of the new cover.
    """
    name = "{0}.png".format(uuid.uuid1().hex)
    cover_dir = '/'.join([current_app.root_path,
                          current_app.config["COVER_IMAGE_PATH"]])

    im = Image.open(StringIO.StringIO(binary))
    im.save('/'.join([cover_dir, name]))

    # thumbnail() is applied to the same image object that is saved again.
    im.thumbnail((320, 80), Image.ANTIALIAS)
    im.save('/'.join([cover_dir, 'thumbnail_{0}'.format(name)]))

    if old_cover:
        remove_cover(old_cover)
    return name
def generate_tmpfile(file):
    """Save an uploaded file in the temp folder under a generated name.

    @file: upload object exposing ``filename`` and ``save(path)``.

    Returns ``(path, name)``: ``path`` is ``/<TMP_PATH>/<uuid hex><ext>``
    (relative to the application root) and ``name`` is the original file
    name without its extension.
    """
    name, ext = os.path.splitext(file.filename)
    stored_name = uuid.uuid1().hex + ext
    # The leading empty element makes the joined path start with "/".
    path = '/'.join(['', current_app.config["TMP_PATH"], stored_name])
    file.save(current_app.root_path + path)
    return (path, name)
def enable_tmpfile(dest_path, filename):
    """Move a file out of the temp folder into *dest_path*.

    @dest_path: destination folder, relative to the application root.
    @filename: name of the file inside the ``TMP_PATH`` folder.

    Returns ``False`` when the temp file does not exist, ``True`` after the
    move.
    """
    app_root = current_app.root_path
    source = '/'.join([app_root, current_app.config["TMP_PATH"], filename])
    if os.path.exists(source):
        shutil.move(source, '/'.join([app_root, dest_path, filename]))
        return True
    return False
def remove_file(path, filename):
    """Delete ``<app root>/<path>/<filename>`` if it exists.

    @path: folder, relative to the application root.
    @filename: name of the file inside that folder.

    Returns ``False`` when the file does not exist, ``True`` once it has
    been removed.
    """
    full_file = '/'.join([current_app.root_path, path, filename])
    if not os.path.exists(full_file):
        return False
    # A parenthesised single-argument print parses on Python 2 and 3 alike
    # and emits the same text as the old ``print a, b`` statement.
    print('remove_file %s' % full_file)
    os.remove(full_file)
    return True
def _remove_compiled_translations():
    """Delete every ``.mo`` file below the app's ``translations`` folder."""
    base_folder = os.path.join(current_app.root_path, "translations")

    # Visit the whole tree; only files whose name ends in ".mo" are removed.
    for dirpath, _dirnames, filenames in os.walk(base_folder):
        compiled = [fname for fname in filenames if fname.endswith(".mo")]
        for fname in compiled:
            os.unlink(os.path.join(dirpath, fname))
def _compile_translations():
    """Run ``pybabel compile`` for the application and for every plugin.

    The application's ``translations`` folder is compiled first, then the
    ``translations`` folder of each entry of ``plugin_manager.all_plugins``
    (looked up below ``<root>/plugins``).  The exit codes of the
    ``pybabel`` calls are not checked.
    """
    def _pybabel_compile(folder):
        # One pybabel invocation per translations directory.
        subprocess.call(["pybabel", "compile", "-d", folder])

    app_root = current_app.root_path
    plugins_root = os.path.join(app_root, "plugins")

    _pybabel_compile(os.path.join(app_root, "translations"))
    for plugin in plugin_manager.all_plugins:
        _pybabel_compile(os.path.join(plugins_root, plugin, "translations"))
def build_challenges():
    """Load every category's ``problems.json`` into the database.

    For each category in ``app.config['CTF']['categories']`` the file
    ``<challenges dir>/<category>/problems.json`` is parsed.  Every entry of
    its ``"problems"`` list becomes a ``Challenge`` plus one ``Resource``
    per name in the entry's ``"resources"`` list, committed per problem.
    When the commit raises ``IntegrityError`` the session is rolled back
    and the stored challenge with the same title is updated instead.

    :raises ValueError: if a ``problems.json`` file is not valid JSON; the
        message names the offending file path.
    """
    chal_path = path.join(app.root_path, "../",
                          app.config['CTF']['challenges'])

    for category in app.config['CTF']['categories']:
        # Resources of a category all live in the category folder.
        category_dir = path.join(chal_path, category)
        problem_config = path.join(category_dir, "problems.json")

        # Only the parsing needs the file; close it before the DB work.
        with open(problem_config, 'r') as config_file:
            try:
                config = json.load(config_file)
            except ValueError:
                # Report the path, not the repr of the file object.
                raise ValueError("%s was malformed" % problem_config)

        for problem in config["problems"]:
            problem_dict = build_problem_options(problem, category)
            challenge = Challenge(**problem_dict)
            db.session.add(challenge)
            for resource_name in problem["resources"]:
                db.session.add(Resource(name=resource_name,
                                        path=category_dir,
                                        challenge=challenge))
            try:
                db.session.commit()
            except IntegrityError:
                # The insert was rejected; fall back to updating the
                # challenge already stored under this title.
                # NOTE(review): the resources added above are rolled back
                # too and are not re-added here -- confirm this is intended.
                db.session.rollback()
                existing = Challenge.query.filter_by(title=problem['title'])
                problem_dict.update({'id': existing.first().id})
                existing.update(problem_dict)
                db.session.commit()
def claim(research_object, orcid):
    """Store the RDF for a research object claimed by the person *orcid*.

    When ``person_exist(orcid)`` is false, the person's Turtle description
    is first fetched from ``http://orcid.org/<orcid>`` and loaded.  Then
    the Turtle produced for the research object is loaded: ``create_repo``
    for type ``'repo'``, ``create_presentation`` for ``'presentation'`` and
    ``create_creative_work`` for anything else.

    :param research_object: mapping providing at least ``'type'`` and
        ``'id'``.
    :param orcid: ORCID identifier of the claiming person.
    """
    # The file name is appended by plain concatenation, so conf.TMP_DIR
    # presumably ends with a path separator -- TODO confirm.
    tmp_prefix = os.path.join(current_app.root_path, conf.TMP_DIR)

    if not person_exist(orcid):
        req = urllib2.Request('http://orcid.org/' + orcid)
        req.add_header('Accept', 'text/turtle')
        response = urllib2.urlopen(req)
        try:
            person_ttl = response.read()
        finally:
            # Release the HTTP connection even if reading fails.
            response.close()
        load_turtle(tmp_prefix + orcid + '.ttl', person_ttl)

    if research_object['type'] == 'repo':
        ro_ttl = create_repo(research_object, orcid)
    elif research_object['type'] == 'presentation':
        ro_ttl = create_presentation(research_object, orcid)
    else:
        ro_ttl = create_creative_work(research_object, orcid)
    load_turtle(tmp_prefix + str(research_object['id']) + '.ttl', ro_ttl)
def create_disco():
    """Create a disco from the JSON request body and store its RDF.

    The body must provide ``'orcid'``, ``'description'`` and ``'ros'``.  A
    new URI is generated under ``conf.BASE_URI + 'discos/'``, the RDF is
    assembled from the ``rdft`` templates (prefixes, the disco itself, one
    fragment per entry of ``'ros'``) and handed to ``load_turtle``.

    :returns: the request JSON echoed back through ``jsonify``.
    """
    disco = request.get_json()
    person_uri = 'http://orcid.org/' + disco['orcid']
    disco_uri = conf.BASE_URI + 'discos/' + uuid.uuid4().hex

    fragments = [
        rdft.PREFIXES,
        rdft.DISCO.format(disco_uri=disco_uri,
                          disco_description=disco['description'],
                          person_uri=person_uri),
    ]
    fragments.extend(
        rdft.DISCO_RESOURCE.format(disco_uri=disco_uri, ro_uri=ro)
        for ro in disco['ros'])
    disco_rdf = ''.join(fragments)

    # Slashes in the URI are replaced so it can serve as a file name.
    file_name = disco_uri.replace('/', '_') + '.ttl'
    file_path = os.path.join(current_app.root_path, conf.TMP_DIR) + file_name
    load_turtle(file_path, disco_rdf)
    return jsonify(disco)
def update_disco():
    """Replace the stored RDF of an existing disco with the request body.

    The body must provide ``'orcid'``, ``'uri'``, ``'description'`` and
    ``'ros'``.  The disco identified by ``'uri'`` is deleted first, then
    its RDF is rebuilt from the ``rdft`` templates and handed to
    ``load_turtle``.

    :returns: the request JSON echoed back through ``jsonify``.
    """
    disco = request.get_json()
    person_uri = 'http://orcid.org/' + disco['orcid']
    disco_uri = disco['uri']
    # The old description is removed before the new one is assembled.
    delete_disco(disco_uri)

    fragments = [
        rdft.PREFIXES,
        rdft.DISCO.format(disco_uri=disco_uri,
                          disco_description=disco['description'],
                          person_uri=person_uri),
    ]
    fragments.extend(
        rdft.DISCO_RESOURCE.format(disco_uri=disco_uri, ro_uri=ro)
        for ro in disco['ros'])
    disco_rdf = ''.join(fragments)

    # Slashes in the URI are replaced so it can serve as a file name.
    file_name = disco_uri.replace('/', '_') + '.ttl'
    file_path = os.path.join(current_app.root_path, conf.TMP_DIR) + file_name
    load_turtle(file_path, disco_rdf)
    return jsonify(disco)