def get_chapter(book_id):
# chapters = Chapter.query.filter_by(book_id=book_id).all()
page = request.args.get('page', 1, type=int)
pagination = Chapter.query.filter_by(book_id=book_id).paginate(
page, per_page=current_app.config['CHAPTER_PER_PAGE'],
error_out=False
)
chapters = pagination.items
prev = None
if pagination.has_prev:
prev = url_for('api.get_chapter', book_id=book_id, page=page-1, _external=True)
next = None
if pagination.has_next:
next = url_for('api.get_chapter', book_id=book_id, page=page+1, _external=True)
return jsonify({
'chapters': [chapter.to_json() for chapter in chapters],
'prev': prev,
'next': next
})
python类config()的实例源码
def get_export_task_jobs(queue):
"""Export tasks to zip."""
from pybossa.core import project_repo
import pybossa.cache.projects as cached_projects
from pybossa.pro_features import ProFeatureHandler
feature_handler = ProFeatureHandler(current_app.config.get('PRO_FEATURES'))
timeout = current_app.config.get('TIMEOUT')
if feature_handler.only_for_pro('updated_exports'):
if queue == 'high':
projects = cached_projects.get_from_pro_user()
else:
projects = (p.dictize() for p in project_repo.filter_by(published=True)
if p.owner.pro is False)
else:
projects = (p.dictize() for p in project_repo.filter_by(published=True))
for project in projects:
project_id = project.get('id')
job = dict(name=project_export,
args=[project_id], kwargs={},
timeout=timeout,
queue=queue)
yield job
def get_autoimport_jobs(queue='low'):
"""Get autoimport jobs."""
from pybossa.core import project_repo
import pybossa.cache.projects as cached_projects
from pybossa.pro_features import ProFeatureHandler
feature_handler = ProFeatureHandler(current_app.config.get('PRO_FEATURES'))
timeout = current_app.config.get('TIMEOUT')
if feature_handler.only_for_pro('autoimporter'):
projects = cached_projects.get_from_pro_user()
else:
projects = (p.dictize() for p in project_repo.get_all())
for project_dict in projects:
project = project_repo.get(project_dict['id'])
if project.has_autoimporter():
job = dict(name=import_tasks,
args=[project.id, True],
kwargs=project.get_autoimporter(),
timeout=timeout,
queue=queue)
yield job
# The following are the actual jobs (i.e. tasks performed in the background)
def import_tasks(project_id, from_auto=False, **form_data):
"""Import tasks for a project."""
from pybossa.core import project_repo
project = project_repo.get(project_id)
report = importer.create_tasks(task_repo, project_id, **form_data)
if from_auto:
form_data['last_import_meta'] = report.metadata
project.set_autoimporter(form_data)
project_repo.save(project)
msg = report.message + ' to your project %s!' % project.name
subject = 'Tasks Import to your project %s' % project.name
body = 'Hello,\n\n' + msg + '\n\nAll the best,\nThe %s team.'\
% current_app.config.get('BRAND')
mail_dict = dict(recipients=[project.owner.email_addr],
subject=subject, body=body)
send_mail(mail_dict)
return msg
def index(page=1):
"""Index page for all PYBOSSA registered users."""
update_feed = get_update_feed()
per_page = 24
count = cached_users.get_total_users()
accounts = cached_users.get_users_page(page, per_page)
if not accounts and page != 1:
abort(404)
pagination = Pagination(page, per_page, count)
if current_user.is_authenticated():
user_id = current_user.id
else:
user_id = None
top_users = cached_users.get_leaderboard(current_app.config['LEADERBOARD'],
user_id)
tmp = dict(template='account/index.html', accounts=accounts,
total=count,
top_users=top_users,
title="Community", pagination=pagination,
update_feed=update_feed)
return handle_content_type(tmp)
def confirm_email():
"""Send email to confirm user email."""
acc_conf_dis = current_app.config.get('ACCOUNT_CONFIRMATION_DISABLED')
if acc_conf_dis:
return abort(404)
if current_user.valid_email is False:
user = user_repo.get(current_user.id)
account = dict(fullname=current_user.fullname, name=current_user.name,
email_addr=current_user.email_addr)
confirm_url = get_email_confirmation_url(account)
subject = ('Verify your email in %s' % current_app.config.get('BRAND'))
msg = dict(subject=subject,
recipients=[current_user.email_addr],
body=render_template('/account/email/validate_email.md',
user=account, confirm_url=confirm_url))
msg['html'] = render_template('/account/email/validate_email.html',
user=account, confirm_url=confirm_url)
mail_queue.enqueue(send_mail, msg)
msg = gettext("An e-mail has been sent to \
validate your e-mail address.")
flash(msg, 'info')
user.confirmation_email_sent = True
user_repo.update(user)
return redirect_content_type(url_for('.profile', name=current_user.name))
def index(window=0):
"""Get the last activity from users and projects."""
if current_user.is_authenticated():
user_id = current_user.id
else:
user_id = None
if window >= 10:
window = 10
top_users = cached_users.get_leaderboard(current_app.config['LEADERBOARD'],
user_id=user_id,
window=window)
response = dict(template='/stats/index.html',
title="Community Leaderboard",
top_users=top_users)
return handle_content_type(response)
def password_required(short_name):
(project, owner, n_tasks, n_task_runs,
overall_progress, last_activity,
n_results) = project_by_shortname(short_name)
form = PasswordForm(request.form)
if request.method == 'POST' and form.validate():
password = request.form.get('password')
cookie_exp = current_app.config.get('PASSWD_COOKIE_TIMEOUT')
passwd_mngr = ProjectPasswdManager(CookieHandler(request, signer, cookie_exp))
if passwd_mngr.validates(password, project):
response = make_response(redirect(request.args.get('next')))
return passwd_mngr.update_response(response, project, get_user_id_or_ip())
flash(gettext('Sorry, incorrect password'))
return render_template('projects/password.html',
project=project,
form=form,
short_name=short_name,
next=request.args.get('next'))
def get_disqus_sso_payload(user):
"""Return remote_auth_s3 and api_key for user."""
DISQUS_PUBLIC_KEY = current_app.config.get('DISQUS_PUBLIC_KEY')
DISQUS_SECRET_KEY = current_app.config.get('DISQUS_SECRET_KEY')
if DISQUS_PUBLIC_KEY and DISQUS_SECRET_KEY:
if user:
data = simplejson.dumps({
'id': user.id,
'username': user.name,
'email': user.email_addr,
})
else:
data = simplejson.dumps({})
# encode the data to base64
message = base64.b64encode(data)
# generate a timestamp for signing the message
timestamp = int(time.time())
# generate our hmac signature
sig = hmac.HMAC(DISQUS_SECRET_KEY, '%s %s' % (message, timestamp),
hashlib.sha1).hexdigest()
return message, timestamp, sig, DISQUS_PUBLIC_KEY
else:
return None, None, None, None
def test_register_json_errors_get(self):
"""Test WEB register errors JSON works"""
with patch.dict(self.flask_app.config, {'WTF_CSRF_ENABLED': True}):
csrf = self.get_csrf('/account/register')
userdict = {'fullname': 'a', 'name': 'name',
'email_addr': None, 'password': 'p'}
res = self.app.post('/account/register', data=json.dumps(userdict),
content_type='application/json',
headers={'X-CSRFToken': csrf})
# The output should have a mime-type: application/json
errors = json.loads(res.data).get('form').get('errors')
assert res.mimetype == 'application/json', res.data
err_msg = "There should be an error with the email"
assert errors.get('email_addr'), err_msg
err_msg = "There should be an error with fullname"
assert errors.get('fullname'), err_msg
err_msg = "There should be an error with password"
assert errors.get('password'), err_msg
err_msg = "There should NOT be an error with name"
assert errors.get('name') is None, err_msg
def get_job_results(job_id):
"""Get data from previous jobs
:param job_id: ID number to get results of job
:return: results_path - path to the job data
This checks to see if the job that you are looking for exists, and returns the path to that job's data
"""
if not os.path.exists(os.path.join(app.config['BOOMERANG_FETCH_DIR'], str(job_id))):
raise ValueError("Job Does not Exist")
results_path = os.path.join(app.config['BOOMERANG_FETCH_DIR'], str(job_id), "results.zip")
if not os.path.isfile(results_path):
raise ValueError("Results not there...")
return results_path
def get_connection(self, bind=None):
"""Return a py2neo.Graph object based on config settings"""
# assume default if no bind specified
if bind is None:
bind = self.DEFAULT_GRAPH_KEY
# credentials are bound to the application context
ctx = stack.top
if ctx is not None:
if not hasattr(ctx, 'ogm_graphs'.format(bind)):
ctx.ogm_graphs = {}
if bind not in ctx.ogm_graphs:
ctx.ogm_graphs[bind] = self.connect(bind=bind)
return ctx.ogm_graphs[bind]
raise OutOfApplicationContextError()
def change_email(self, token):
s = Serializer(current_app.config['SECRET_KEY'])
try:
data = s.loads(token)
except:
return False
if data.get('change_email') != self.id:
return False
new_email = data.get('new_email')
if new_email is None:
return False
if self.query.filter_by(email=new_email).first() is not None:
return False
self.email = new_email
self.avatar_hash = hashlib.md5(
self.email.encode('utf-8')).hexdigest()
db.session.add(self)
return True
def get_posts():
page = request.args.get('page', 1, type=int)
pagination = Post.query.paginate(
page, per_page=current_app.config['CIRCULATE_POSTS_PER_PAGE'],
error_out=False)
posts = pagination.items
prev = None
if pagination.has_prev:
prev = url_for('api.get_posts', page=page-1, _external=True)
next = None
if pagination.has_next:
next = url_for('api.get_posts', page=page+1, _external=True)
return jsonify({
'posts': [post.to_json() for post in posts],
'prev': prev,
'next': next,
'count': pagination.total
})
def get_user_posts(id):
user = User.query.get_or_404(id)
page = request.args.get('page', 1, type=int)
pagination = user.posts.order_by(Post.timestamp.desc()).paginate(
page, per_page=current_app.config['CIRCULATE_POSTS_PER_PAGE'],
error_out=False)
posts = pagination.items
prev = None
if pagination.has_prev:
prev = url_for('api.get_user_posts', page=page-1, _external=True)
next = None
if pagination.has_next:
next = url_for('api.get_user_posts', page=page+1, _external=True)
return jsonify({
'posts': [post.to_json() for post in posts],
'prev': prev,
'next': next,
'count': pagination.total
})
def get_comments():
page = request.args.get('page', 1, type=int)
pagination = Comment.query.order_by(Comment.timestamp.desc()).paginate(
page, per_page=current_app.config['CIRCULATE_COMMENTS_PER_PAGE'],
error_out=False)
comments = pagination.items
prev = None
if pagination.has_prev:
prev = url_for('api.get_comments', page=page-1, _external=True)
next = None
if pagination.has_next:
next = url_for('api.get_comments', page=page+1, _external=True)
return jsonify({
'comments': [comment.to_json() for comment in comments],
'prev': prev,
'next': next,
'count': pagination.total
})
def get_post_comments(id):
post = Post.query.get_or_404(id)
page = request.args.get('page', 1, type=int)
pagination = post.comments.order_by(Comment.timestamp.asc()).paginate(
page, per_page=current_app.config['CIRCULATE_COMMENTS_PER_PAGE'],
error_out=False)
comments = pagination.items
prev = None
if pagination.has_prev:
prev = url_for('api.get_post_comments', page=page-1, _external=True)
next = None
if pagination.has_next:
next = url_for('api.get_post_comments', page=page+1, _external=True)
return jsonify({
'comments': [comment.to_json() for comment in comments],
'prev': prev,
'next': next,
'count': pagination.total
})
def _get_config(
value, config_name, default=None,
required=True, message='CSRF is not configured.'
):
"""Find config value based on provided value, Flask config, and default
value.
:param value: already provided config value
:param config_name: Flask ``config`` key
:param default: default value if not provided or configured
:param required: whether the value must not be ``None``
:param message: error message if required config is not found
:raises KeyError: if required config is not found
"""
if value is None:
value = current_app.config.get(config_name, default)
if required and value is None:
raise KeyError(message)
return value
def _get_csrf_token(self):
# find the ``csrf_token`` field in the subitted form
# if the form had a prefix, the name will be
# ``{prefix}-csrf_token``
field_name = current_app.config['WTF_CSRF_FIELD_NAME']
for key in request.form:
if key.endswith(field_name):
csrf_token = request.form[key]
if csrf_token:
return csrf_token
for header_name in current_app.config['WTF_CSRF_HEADERS']:
csrf_token = request.headers.get(header_name)
if csrf_token:
return csrf_token
return None
def protect(self):
if request.method not in current_app.config['WTF_CSRF_METHODS']:
return
try:
validate_csrf(self._get_csrf_token())
except ValidationError as e:
logger.info(e.args[0])
self._error_response(e.args[0])
if request.is_secure and current_app.config['WTF_CSRF_SSL_STRICT']:
if not request.referrer:
self._error_response('The referrer header is missing.')
good_referrer = 'https://{0}/'.format(request.host)
if not same_origin(request.referrer, good_referrer):
self._error_response('The referrer does not match the host.')
g.csrf_valid = True # mark this request as CSRF valid
def _get_config(
value, config_name, default=None,
required=True, message='CSRF is not configured.'
):
"""Find config value based on provided value, Flask config, and default
value.
:param value: already provided config value
:param config_name: Flask ``config`` key
:param default: default value if not provided or configured
:param required: whether the value must not be ``None``
:param message: error message if required config is not found
:raises KeyError: if required config is not found
"""
if value is None:
value = current_app.config.get(config_name, default)
if required and value is None:
raise KeyError(message)
return value
def _get_csrf_token(self):
# find the ``csrf_token`` field in the subitted form
# if the form had a prefix, the name will be
# ``{prefix}-csrf_token``
field_name = current_app.config['WTF_CSRF_FIELD_NAME']
for key in request.form:
if key.endswith(field_name):
csrf_token = request.form[key]
if csrf_token:
return csrf_token
for header_name in current_app.config['WTF_CSRF_HEADERS']:
csrf_token = request.headers.get(header_name)
if csrf_token:
return csrf_token
return None
def protect(self):
if request.method not in current_app.config['WTF_CSRF_METHODS']:
return
try:
validate_csrf(self._get_csrf_token())
except ValidationError as e:
logger.info(e.args[0])
self._error_response(e.args[0])
if request.is_secure and current_app.config['WTF_CSRF_SSL_STRICT']:
if not request.referrer:
self._error_response('The referrer header is missing.')
good_referrer = 'https://{0}/'.format(request.host)
if not same_origin(request.referrer, good_referrer):
self._error_response('The referrer does not match the host.')
g.csrf_valid = True # mark this request as CSRF valid
def create_standard_claims(expire=None):
"""Return standard payload for JWT."""
expire = expire or JWT_EXPIRATION_DELTA
iat = datetime.utcnow()
exp = iat + timedelta(seconds=expire)
jti = str(uuid.uuid4()).replace("-", "")
iss = current_app.config["name"]
standard_claims = {
# JWT standard fields
'iat': iat,
'exp': exp,
'jti': jti,
'iss': iss,
# Drift fields
'tier': g.driftenv["tier_name"],
'tenant': g.driftenv["name"],
}
return standard_claims
def sendmail(addr, text):
if get_config('mg_api_key') and app.config['ADMINS']:
ctf_name = get_config('ctf_name')
mg_api_key = get_config('mg_api_key')
return requests.post(
"https://api.mailgun.net/v2/mail"+app.config['HOST']+"/messages",
auth=("api", mg_api_key),
data={"from": "{} Admin <{}>".format(ctf_name, app.config['ADMINS'][0]),
"to": [addr],
"subject": "Message from {0}".format(ctf_name),
"text": text})
elif app.config['MAIL_SERVER'] and app.config['MAIL_PORT'] and app.config['ADMINS']:
try:
msg = Message("Message from {0}".format(get_config('ctf_name')), sender=app.config['ADMINS'][0], recipients=[addr])
msg.body = text
mail.send(msg)
return True
except:
return False
else:
return False
def _ldap_authenticate(username, password):
"""Performs a search bind to authenticate a user.
LDAP server details are defined in :doc:`config`.
:param username: LDAP username
:param password: LDAP password
:return: Returns a tuple of user_info and authentication status
:rtype: tuple
"""
user = ldap3_manager.get_user_info_for_username(username)
ldap_auth = ldap3_manager.authenticate_search_bind(username, password)
if ldap_auth.status is AuthenticationResponseStatus.success:
authenticated = True
else:
authenticated = False
return user, authenticated
def send_email(subject, recipients, template, **kwargs):
if not isinstance(recipients, list):
recipients = list(recipients)
msg = Message(subject, reply_to=current_app.config['MAIL_DEFAULT_SENDER'],
recipients=recipients)
msg.body = render_template(template + '.txt', **kwargs)
msg.html = render_template(template + '.html', **kwargs)
attachments = kwargs.get('attachments', [])
mimes = MimeTypes()
for file in attachments:
path_ = os.path.join(current_app.config['APP_UPLOADS'], file)
with current_app.open_resource(path_) as fp:
mime = mimes.guess_type(fp.name)
msg.attach(path_, mime[0], fp.read())
app = current_app._get_current_object()
t = Thread(target=send_async_email, args=[app, msg])
t.start()
return t
def reset_password(self, token, new_pass):
"""Reset password. Token is generated by
:meth:`~User.generate_reset_token`
:param token:
:param new_pass:
:return:
"""
s = TimedJSONWebSignatureSerializer(current_app.config['SECRET_KEY'])
try:
data = s.loads(token)
except:
return False
if data.get('user_id') == self.id:
self.password = new_pass
db.session.add(self)
db.session.commit()
return True
return False
def load_into_pgsql(self, capture_stderr=True):
if not os.path.exists(self.overpass_filename):
return 'no data from overpass to load with osm2pgsql'
if os.stat(self.overpass_filename).st_size == 0:
return 'no data from overpass to load with osm2pgsql'
cmd = self.osm2pgsql_cmd()
if not capture_stderr:
p = subprocess.run(cmd,
env={'PGPASSWORD': current_app.config['DB_PASS']})
return
p = subprocess.run(cmd,
stderr=subprocess.PIPE,
env={'PGPASSWORD': current_app.config['DB_PASS']})
if p.returncode != 0:
if b'Out of memory' in p.stderr:
return 'out of memory'
else:
return p.stderr.decode('utf-8')
def lookup_with_params(**kwargs):
url = 'http://nominatim.openstreetmap.org/search'
params = {
'format': 'jsonv2',
'addressdetails': 1,
'email': current_app.config['ADMIN_EMAIL'],
'extratags': 1,
'limit': 20,
'namedetails': 1,
'accept-language': 'en',
'polygon_text': 1,
}
params.update(kwargs)
r = requests.get(url, params=params, headers=user_agent_headers())
if r.status_code == 500:
raise SearchError
try:
return json.loads(r.text, object_pairs_hook=OrderedDict)
except json.decoder.JSONDecodeError:
raise SearchError(r)