def search(keyword):
    """Render the paginated search results for ``keyword``.

    ``keyword`` is split on whitespace; a post matches when *every* term
    occurs (case-insensitively) in its title, summary or content. The page
    number is read from the ``page`` query argument and defaults to 1.

    Returns the rendered ``search.html`` template, or the ``404.html``
    template together with a 404 status when no search term was given or
    the query/pagination fails.
    """
    from sqlalchemy import and_, or_

    def not_found():
        return render_template('404.html', e='Error: Empty Keyword',
                               site=current_app.config['FMBLOG_SITE'],
                               value={}), 404

    Blog.update()
    # type=int makes a non-numeric ?page= fall back to the default instead of
    # raising ValueError outside of any handler.
    page = request.args.get('page', 1, type=int)

    keywords = keyword.split()
    if not keywords:
        # Handle the empty search explicitly rather than relying on the
        # query layer to fail on an empty AND clause.
        return not_found()

    try:
        rules = and_(*[
            or_(Post.title.ilike('%%%s%%' % k),
                Post.summary.ilike('%%%s%%' % k),
                Post.content.ilike('%%%s%%' % k))
            for k in keywords
        ])
        pagination = Post.query.filter(rules).order_by(Post.date.desc()).paginate(
            page=page, per_page=current_app.config['FMBLOG_PER_PAGE'])
    except Exception:
        # Route-level boundary: any failure (including an out-of-range page)
        # is shown as the 404 page, as before.
        return not_found()

    return render_template('search.html', value={'keyword': keyword},
                           pagination=pagination, endpoint='main.search',
                           page_list=get_page_list(pagination, page),
                           tags=Tag.query.order_by(Tag.count.desc()).all(),
                           cats=Category.query.order_by(Category.count.desc()).all(),
                           site=current_app.config['FMBLOG_SITE'])
# Example source code using Python's config()
def sync_repo_hook(self, repo_id):
    """Align the locally stored repository with its webhook state on GitHub.

    Looks through the hooks of the GitHub repository ``repo_id`` for the
    first one whose configured URL equals ``self.webhook_url``. When such a
    hook is found the repository is enabled with that hook id; otherwise it
    is disabled.
    """
    gh_repo = self.api.repository_with_id(repo_id)

    # Find the id of the first hook pointing at our webhook URL, if any.
    hook_id = None
    for hook in gh_repo.hooks():
        if hook.config.get('url', '') == self.webhook_url:
            hook_id = hook.id
            break

    if not hook_id:
        # No matching hook on GitHub: drop the stale hook information.
        Repository.disable(user_id=self.user_id,
                           github_id=gh_repo.id,
                           name=gh_repo.full_name)
        return

    Repository.enable(user_id=self.user_id,
                      github_id=gh_repo.id,
                      name=gh_repo.full_name,
                      hook=hook_id)
def chapter(book_id):
    """Render one page of the chapter list for the novel ``book_id``.

    When no chapter is stored for the book yet, the chapter list is fetched
    with ``DdSpider`` from ``book.book_url``, stored, and then paginated.
    The page number is read from the ``page`` query argument (default 1).

    Aborts with 404 when the novel does not exist.
    """
    from flask import abort

    page = request.args.get('page', 1, type=int)

    book = Novel.query.filter_by(id=book_id).first()
    if book is None:
        # Without this guard the scraping branch fails on ``book.book_url``.
        abort(404)

    chapters_query = Chapter.query.filter_by(book_id=book_id)
    if chapters_query.first() is None:
        spider = DdSpider()
        for data in spider.get_chapter(book.book_url):
            db.session.add(Chapter(chapter=data['chapter'],
                                   chapter_url=data['url'],
                                   book_id=book_id))
        # Persist the scraped chapters so later requests do not scrape again.
        db.session.commit()

    # ``page`` is passed by keyword so the call also works with
    # Flask-SQLAlchemy releases whose paginate() is keyword-only.
    pagination = Chapter.query.filter_by(book_id=book_id).paginate(
        page=page, per_page=current_app.config['CHAPTER_PER_PAGE'],
        error_out=False
    )
    return render_template('chapter.html', book=book,
                           chapters=pagination.items, pagination=pagination)
def get_default_jobs():  # pragma: no cover
    """Yield the default job descriptions, one dict per job.

    Each dict carries ``name`` (the callable), empty ``args``/``kwargs``,
    the configured ``TIMEOUT`` and the name of the queue it is meant for.
    """
    timeout = current_app.config.get('TIMEOUT')
    # (callable, queue) pairs, in the order they are yielded.
    schedule = (
        (warm_up_stats, 'high'),
        (warn_old_project_owners, 'low'),
        (warm_cache, 'super'),
        (news, 'low'),
    )
    for func, queue_name in schedule:
        yield {'name': func, 'args': [], 'kwargs': {},
               'timeout': timeout, 'queue': queue_name}
def get_maintenance_jobs():
    """Yield the maintenance jobs.

    Currently a single job: ``check_failed`` on the ``maintenance`` queue,
    with the configured ``TIMEOUT``.
    """
    job = {
        'name': check_failed,
        'args': [],
        'kwargs': {},
        'timeout': current_app.config.get('TIMEOUT'),
        'queue': 'maintenance',
    }
    yield job
def get_project_jobs(queue='super'):
    """Build ``get_project_stats`` jobs for the projects of pro users.

    The projects come from ``cached_projects.get_from_pro_user()``; the jobs
    themselves are produced by ``create_dict_jobs`` with the configured
    ``TIMEOUT`` and the given ``queue``, and its result is returned as is.
    """
    from pybossa.cache import projects as cached_projects

    job_options = {
        'timeout': current_app.config.get('TIMEOUT'),
        'queue': queue,
    }
    pro_projects = cached_projects.get_from_pro_user()
    return create_dict_jobs(pro_projects, get_project_stats, **job_options)
def get_inactive_users_jobs(queue='quaterly'):
    """Yield "We miss you!" e-mail jobs for inactive contributors.

    Selects the ids of users with task runs finished between 12 and 3
    months ago and yields one ``send_mail`` job per such user that is
    subscribed. Ids that no longer resolve to a user are skipped.

    :param queue: name of the queue stored in each job dict.
    """
    from sqlalchemy.sql import text
    from pybossa.model.user import User
    from pybossa.core import db
    # First users that have participated once but more than 3 months ago.
    # Raw string: the to_date() format contains a backslash before "T", and
    # "\T" in a normal literal is an invalid escape sequence.
    sql = text(r'''SELECT user_id FROM task_run
               WHERE user_id IS NOT NULL
               AND to_date(task_run.finish_time, 'YYYY-MM-DD\THH24:MI:SS.US')
               >= NOW() - '12 month'::INTERVAL
               AND to_date(task_run.finish_time, 'YYYY-MM-DD\THH24:MI:SS.US')
               < NOW() - '3 month'::INTERVAL
               GROUP BY user_id ORDER BY user_id;''')
    results = db.slave_session.execute(sql)
    timeout = current_app.config.get('TIMEOUT')
    for row in results:
        user = User.query.get(row.user_id)
        # The id may not resolve to a user (e.g. deleted meanwhile).
        if user is None or not user.subscribed:
            continue
        user_data = user.dictize()
        body = render_template('/account/email/inactive.md',
                               user=user_data,
                               config=current_app.config)
        html = render_template('/account/email/inactive.html',
                               user=user_data,
                               config=current_app.config)
        mail_dict = dict(recipients=[user.email_addr],
                         subject="We miss you!",
                         body=body,
                         html=html)
        yield dict(name=send_mail,
                   args=[mail_dict],
                   kwargs={},
                   timeout=timeout,
                   queue=queue)
def get_leaderboard_jobs(queue='super'):  # pragma: no cover
    """Yield the single job that runs ``leaderboard.leaderboard``.

    :param queue: name of the queue stored in the job dict.
    """
    job = {
        'name': leaderboard.leaderboard,
        'args': [],
        'kwargs': {},
        'timeout': current_app.config.get('TIMEOUT'),
        'queue': queue,
    }
    yield job
def get_non_contributors_users_jobs(queue='quaterly'):
    """Yield e-mail jobs for users that never contributed a task run.

    Selects the ids of users without any row in ``task_run`` and yields one
    ``send_mail`` job per such user that is subscribed. Ids that do not
    resolve to a user are skipped.

    :param queue: name of the queue stored in each job dict.
    """
    from sqlalchemy.sql import text
    from pybossa.model.user import User
    from pybossa.core import db
    # Second users that have created an account but never participated
    sql = text('''SELECT id FROM "user" WHERE
               NOT EXISTS (SELECT user_id FROM task_run
               WHERE task_run.user_id="user".id)''')
    results = db.slave_session.execute(sql)
    timeout = current_app.config.get('TIMEOUT')
    for row in results:
        user = User.query.get(row.id)
        # Ids are read through ``slave_session`` while the user is loaded
        # through ``User.query``; guard against the lookup returning nothing.
        if user is None or not user.subscribed:
            continue
        user_data = user.dictize()
        body = render_template('/account/email/noncontributors.md',
                               user=user_data,
                               config=current_app.config)
        html = render_template('/account/email/noncontributors.html',
                               user=user_data,
                               config=current_app.config)
        mail_dict = dict(recipients=[user.email_addr],
                         subject="Why don't you help us?!",
                         body=body,
                         html=html)
        yield dict(name=send_mail,
                   args=[mail_dict],
                   kwargs={},
                   timeout=timeout,
                   queue=queue)
def get_project_stats(_id, short_name):  # pragma: no cover
    """Call the cached project getters and the stats getter for one project.

    All return values are discarded; presumably the calls exist to populate
    the caches (confirm against ``pybossa.cache``).

    :param _id: project id passed to the per-project cache functions.
    :param short_name: project short name passed to ``get_project``.
    """
    import pybossa.cache.projects as cached_projects
    import pybossa.cache.project_stats as stats
    from flask import current_app

    cached_projects.get_project(short_name)

    # Cache functions keyed by project id, called in this order.
    by_id = (
        cached_projects.n_tasks,
        cached_projects.n_task_runs,
        cached_projects.overall_progress,
        cached_projects.last_activity,
        cached_projects.n_completed_tasks,
        cached_projects.n_volunteers,
        cached_projects.browse_tasks,
    )
    for refresh in by_id:
        refresh(_id)

    geo = current_app.config.get('GEO')
    stats.get_stats(_id, geo)
def webhook(url, payload=None, oid=None):
    """Post ``payload`` as JSON to ``url`` and record the outcome.

    :param url: target URL; a falsy value is treated as a connection error.
    :param payload: dict sent as the JSON body; ``project_id`` is read from
        it, and ``project_short_name`` too when ``SSE`` is configured.
    :param oid: id of an existing webhook record to update; when falsy a new
        ``Webhook`` record is created and saved.
    :returns: the webhook record holding the response and its status code.

    When the project is published, the stored status code is not 200 and
    ``ADMINS`` is configured, a failure e-mail is sent to the admins.
    """
    import json
    from flask import current_app
    from readability.readability import Document
    from pybossa.core import sentinel, webhook_repo, project_repo

    # Everything the ``finally`` clause reads is bound *before* the ``try``,
    # so an early failure surfaces as itself instead of being masked by an
    # UnboundLocalError raised from ``finally``.
    project = project_repo.get(payload['project_id'])
    headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
    if oid:
        record = webhook_repo.get(oid)
    else:
        record = Webhook(project_id=payload['project_id'],
                         payload=payload)

    try:
        if not url:
            raise requests.exceptions.ConnectionError('Not URL')
        response = requests.post(url, data=json.dumps(payload), headers=headers)
        record.response = Document(response.text).summary()
        record.response_status_code = response.status_code
        if oid:
            webhook_repo.update(record)
            record = webhook_repo.get(oid)
        else:
            webhook_repo.save(record)
    except requests.exceptions.ConnectionError:
        record.response = 'Connection Error'
        record.response_status_code = None
        webhook_repo.save(record)
    finally:
        if (project.published and record.response_status_code != 200
                and current_app.config.get('ADMINS')):
            subject = "Broken: %s webhook failed" % project.name
            body = 'Sorry, but the webhook failed'
            mail_dict = dict(recipients=current_app.config.get('ADMINS'),
                             subject=subject, body=body, html=record.response)
            send_mail(mail_dict)

    # Publishing and returning happen after the try statement: a ``return``
    # inside ``finally`` would silently swallow unexpected exceptions.
    if current_app.config.get('SSE'):
        publish_channel(sentinel, payload['project_short_name'],
                        data=record.dictize(), type='webhook',
                        private=True)
    return record
def get_weekly_stats_update_projects():
    """Yield weekly-stats e-mail jobs, one per selected project.

    Jobs are only produced when today's weekday name equals the configured
    ``WEEKLY_UPDATE_STATS`` value (case-insensitive); nothing is yielded
    when that setting is missing. Selected projects are those with a
    subscribed owner (optionally pro only) and at least one task that is
    not completed, plus all featured projects.
    """
    from sqlalchemy.sql import text
    from pybossa.core import db
    from pybossa.pro_features import ProFeatureHandler
    feature_handler = ProFeatureHandler(current_app.config.get('PRO_FEATURES'))
    only_pros = feature_handler.only_for_pro('project_weekly_report')
    # Fixed SQL fragment chosen by configuration, never user input.
    only_pros_sql = 'AND "user".pro=true' if only_pros else ''
    send_emails_date = current_app.config.get('WEEKLY_UPDATE_STATS')
    if not send_emails_date:
        # Without a configured weekday there is nothing to compare against.
        return
    today = datetime.today().strftime('%A').lower()
    if today != send_emails_date.lower():
        return
    timeout = current_app.config.get('TIMEOUT')
    sql = text('''
               SELECT project.id
               FROM project, "user", task
               WHERE "user".id=project.owner_id %s
               AND "user".subscribed=true
               AND task.project_id=project.id
               AND task.state!='completed'
               UNION
               SELECT project.id
               FROM project
               WHERE project.featured=true;
               ''' % only_pros_sql)
    results = db.slave_session.execute(sql)
    for row in results:
        yield dict(name=send_weekly_stats_project,
                   args=[row.id],
                   kwargs={},
                   timeout=timeout,
                   queue='low')
def check_failed():
    """Requeue failed jobs and mail the admins about jobs that keep failing.

    For every job id in the failed queue a per-job counter is incremented,
    or created with a TTL of ``FAILED_JOBS_MAILS`` days. While the counter
    is below ``FAILED_JOBS_RETRIES`` the job is requeued; otherwise, if
    ``ADMINS`` is configured and no mail was recorded for the job yet, an
    e-mail with the job's traceback is sent and that fact is recorded.

    :returns: a status string that lists the failed job ids, if any.
    """
    from rq import get_failed_queue, requeue_job
    from pybossa.core import sentinel
    fq = get_failed_queue()
    job_ids = fq.job_ids
    count = len(job_ids)
    max_retries = current_app.config.get('FAILED_JOBS_RETRIES')
    for job_id in job_ids:
        counter_key = 'pybossa:job:failed:%s' % job_id
        job = fq.fetch_job(job_id)
        if sentinel.slave.exists(counter_key):
            sentinel.master.incr(counter_key)
        else:
            ttl = current_app.config.get('FAILED_JOBS_MAILS')*24*60*60
            sentinel.master.setex(counter_key, ttl, 1)
        if int(sentinel.slave.get(counter_key)) < max_retries:
            requeue_job(job_id)
            continue
        mailed_key = 'pybossa:job:failed:mailed:%s' % job_id
        if (sentinel.slave.exists(mailed_key) or
                not current_app.config.get('ADMINS')):
            continue
        # The job or its traceback may be missing; concatenating None onto
        # the body would raise instead of sending the alert.
        trace = getattr(job, 'exc_info', None) or 'No traceback available.'
        subject = "JOB: %s has failed more than 3 times" % job_id
        body = "Please, review the background jobs of your server."
        body += "\n This is the trace error\n\n"
        body += "------------------------------\n\n"
        body += trace
        mail_dict = dict(recipients=current_app.config.get('ADMINS'),
                         subject=subject, body=body)
        send_mail(mail_dict)
        ttl = current_app.config.get('FAILED_JOBS_MAILS')*24*60*60
        sentinel.master.setex(mailed_key, ttl, True)
    if count > 0:
        return "JOBS: %s You have failed the system." % job_ids
    return "You have not failed the system"
def push_notification(project_id, **kwargs):
    """Send a push notification for a project that has ``onesignal`` info.

    :param project_id: id of the project; also used as the tag key of the
        notification filter.
    :param kwargs: must provide ``contents``, ``headings``, ``launch_url``
        and ``web_buttons``.
    :returns: the result of ``client.push_msg``, or ``None`` when the
        project has no ``onesignal`` entry in its info.
    """
    from pybossa.core import project_repo

    project = project_repo.get(project_id)
    if not project.info.get('onesignal'):
        return None

    settings = current_app.config
    client = PybossaOneSignal(app_id=settings.get('ONESIGNAL_APP_ID'),
                              api_key=settings.get('ONESIGNAL_API_KEY'))
    tag_filter = {"field": "tag", "key": project_id, "relation": "exists"}
    return client.push_msg(contents=kwargs['contents'],
                           headings=kwargs['headings'],
                           launch_url=kwargs['launch_url'],
                           web_buttons=kwargs['web_buttons'],
                           filters=[tag_filter])
def confirm_account():
    """Confirm an account from the signed ``key`` query argument.

    Aborts with 403 when the key is missing or cannot be loaded by the
    signer (bad data, or older than ``ACCOUNT_LINK_EXPIRATION`` seconds,
    default 3600). If a user with the signed name exists, its e-mail is
    updated via ``_update_user_with_valid_email``; otherwise the account is
    created via ``_create_account``.
    """
    key = request.args.get('key')
    if key is None:
        abort(403)

    max_age = current_app.config.get('ACCOUNT_LINK_EXPIRATION', 3600)
    try:
        userdict = signer.loads(key, max_age=max_age, salt='account-validation')
    except BadData:
        abort(403)

    # Existing users only get their e-mail validated; unknown names are created.
    existing_user = user_repo.get_by_name(userdict['name'])
    if existing_user is None:
        return _create_account(userdict)
    return _update_user_with_valid_email(existing_user, userdict['email_addr'])
def _handle_avatar_update(user, avatar_form):
    """Upload a new avatar for ``user`` from the submitted ``avatar_form``.

    On a valid submission the uploaded file is renamed to
    ``<timestamp>_avatar.png``, uploaded into the ``user_<id>`` container
    with the form's crop coordinates, the previous avatar (if any) is
    deleted, and ``user.info`` is replaced with the new avatar data.

    :returns: ``True`` when the avatar was updated, ``False`` when the form
        did not validate (an error message is flashed in that case).
    """
    if not avatar_form.validate_on_submit():
        flash("You have to provide an image file to update your avatar", "error")
        return False

    upload = request.files['avatar']
    crop_box = (avatar_form.x1.data, avatar_form.y1.data,
                avatar_form.x2.data, avatar_form.y2.data)
    # The current timestamp is used as the file name prefix.
    upload.filename = "%s_avatar.png" % time.time()
    container = "user_%s" % user.id
    uploader.upload_file(upload, container=container, coordinates=crop_box)

    # Delete previous avatar from storage
    if user.info.get('avatar'):
        uploader.delete_file(user.info['avatar'], container)

    avatar_url = get_avatar_url(current_app.config.get('UPLOAD_METHOD'),
                                upload.filename, container)
    user.info = {'avatar': upload.filename,
                 'container': container,
                 'avatar_url': avatar_url}
    user_repo.update(user)
    cached_users.delete_user_summary(user.name)
    flash(gettext('Your avatar has been updated! It may '
                  'take some minutes to refresh...'), 'success')
    return True
def _handle_profile_update(user, update_form):
    """Apply the submitted profile ``update_form`` to ``user``.

    When the e-mail address changed and ``ACCOUNT_CONFIRMATION_DISABLED`` is
    exactly ``False``, a verification e-mail for the new address is queued
    and the function returns early. Otherwise the remaining profile fields
    are stored (the e-mail address only when confirmation is disabled).

    :returns: ``True`` when the form validated, ``False`` otherwise (an
        error message is flashed in that case).
    """
    acc_conf_dis = current_app.config.get('ACCOUNT_CONFIRMATION_DISABLED')
    if not update_form.validate_on_submit():
        flash(gettext('Please correct the errors'), 'error')
        return False

    user.id = update_form.id.data
    user.fullname = update_form.fullname.data
    user.name = update_form.name.data
    if (user.email_addr != update_form.email_addr.data and
            acc_conf_dis is False):
        user.valid_email = False
        user.newsletter_prompted = False
        account = dict(fullname=update_form.fullname.data,
                       name=update_form.name.data,
                       email_addr=update_form.email_addr.data)
        confirm_url = get_email_confirmation_url(account)
        subject = ('You have updated your email in %s! Verify it'
                   % current_app.config.get('BRAND'))
        msg = dict(subject=subject,
                   recipients=[update_form.email_addr.data],
                   body=render_template(
                       '/account/email/validate_email.md',
                       user=account, confirm_url=confirm_url))
        msg['html'] = markdown(msg['body'])
        mail_queue.enqueue(send_mail, msg)
        user.confirmation_email_sent = True
        # Translate the template first and interpolate afterwards; the
        # catalogue lookup cannot match a string that already contains the
        # address.
        fls = gettext('An email has been sent to verify your '
                      'new email: %s. Once you verify it, it will '
                      'be updated.') % account['email_addr']
        flash(fls, 'info')
        # NOTE(review): this branch returns without calling
        # user_repo.update(user) - confirm that is intended.
        return True

    if acc_conf_dis:
        user.email_addr = update_form.email_addr.data
    user.privacy_mode = update_form.privacy_mode.data
    user.locale = update_form.locale.data
    user.subscribed = update_form.subscribed.data
    user_repo.update(user)
    cached_users.delete_user_summary(user.name)
    flash(gettext('Your profile has been updated!'), 'success')
    return True
def reset_password():
    """
    Reset password method.

    Validates the signed ``key`` query argument, which must carry the user
    name and the password hash that was current when the link was issued,
    and lets that user choose a new password.

    Aborts with 403 when the key is missing, invalid or expired, when it
    lacks the user name or password, when the user does not exist, or when
    the stored password hash no longer matches.

    Returns a Jinja2 template, or the result of signing the user in after a
    successful reset.
    """
    key = request.args.get('key')
    if key is None:
        abort(403)
    userdict = {}
    try:
        timeout = current_app.config.get('ACCOUNT_LINK_EXPIRATION', 3600)
        userdict = signer.loads(key, max_age=timeout, salt='password-reset')
    except BadData:
        abort(403)
    username = userdict.get('user')
    if not username or not userdict.get('password'):
        abort(403)
    user = user_repo.get_by_name(username)
    # An unknown user name must be rejected like any other invalid link
    # instead of failing on ``None.passwd_hash``.
    if user is None or user.passwd_hash != userdict.get('password'):
        abort(403)
    form = ChangePasswordForm(request.body)
    if form.validate_on_submit():
        user.set_password(form.new_password.data)
        user_repo.update(user)
        flash(gettext('You reset your password successfully!'), 'success')
        return _sign_in_user(user)
    if request.method == 'POST' and not form.validate():
        flash(gettext('Please correct the errors'), 'error')
    response = dict(template='/account/password_reset.html', form=form)
    return handle_content_type(response)
def pro_features(owner=None):
    """Return which pro features are enabled for the current user.

    The result maps ``auditlog_enabled``, ``autoimporter_enabled`` and
    ``webhooks_enabled`` to the answers of the ``ProFeatureHandler`` built
    from the ``PRO_FEATURES`` setting. When a truthy ``owner`` is given,
    ``better_stats_enabled`` is added, evaluated for the current user and
    that owner.
    """
    handler = ProFeatureHandler(current_app.config.get('PRO_FEATURES'))
    pro = dict(
        auditlog_enabled=handler.auditlog_enabled_for(current_user),
        autoimporter_enabled=handler.autoimporter_enabled_for(current_user),
        webhooks_enabled=handler.webhooks_enabled_for(current_user),
    )
    if not owner:
        return pro
    pro['better_stats_enabled'] = handler.better_stats_enabled_for(
        current_user, owner)
    return pro