python类config()的实例源码

views.py 文件源码 项目:FMBlog 作者: vc12345679 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def search(keyword):
    Blog.update()
    pt = request.args.get('page')
    if pt is None:
        page = 1
    else:
        page = int(pt)
    try:
        keywords = keyword.split()
        from sqlalchemy import and_, or_
        rules = and_(
            *[or_(Post.title.ilike('%%%s%%' % k), Post.summary.ilike('%%%s%%' % k), Post.content.ilike('%%%s%%' % k))
              for k in keywords])
        pagination = Post.query.filter(rules).order_by(Post.date.desc()).paginate(
            page=page, per_page=current_app.config['FMBLOG_PER_PAGE'])
    except Exception:
        return render_template('404.html', e='Error: Empty Keyword', site=current_app.config['FMBLOG_SITE'],
                               value={}), 404
    return render_template('search.html', value={'keyword': keyword},
                           pagination=pagination, endpoint='main.search',
                           page_list=get_page_list(pagination, page),
                           tags=Tag.query.order_by(Tag.count.desc()).all(),
                           cats=Category.query.order_by(Category.count.desc()).all(),
                           site=current_app.config['FMBLOG_SITE'])
api.py 文件源码 项目:invenio-github 作者: inveniosoftware 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def sync_repo_hook(self, repo_id):
        """Sync a GitHub repo's hook with the locally stored repo."""
        # Get the hook that we may have set in the past
        gh_repo = self.api.repository_with_id(repo_id)
        hooks = (hook.id for hook in gh_repo.hooks()
                 if hook.config.get('url', '') == self.webhook_url)
        hook_id = next(hooks, None)

        # If hook on GitHub exists, get or create corresponding db object and
        # enable the hook. Otherwise remove the old hook information.
        if hook_id:
            Repository.enable(user_id=self.user_id,
                              github_id=gh_repo.id,
                              name=gh_repo.full_name,
                              hook=hook_id)
        else:
            Repository.disable(user_id=self.user_id,
                               github_id=gh_repo.id,
                               name=gh_repo.full_name)
views.py 文件源码 项目:dingdian 作者: Blackyukun 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def chapter(book_id):
    page = request.args.get('page', 1, type=int)
    all_chapter = Chapter.query.filter_by(book_id=book_id).first()
    # print(type(pagination))
    if all_chapter:
        pagination = Chapter.query.filter_by(book_id=book_id).paginate(
                page, per_page=current_app.config['CHAPTER_PER_PAGE'],
                error_out=False
        )
        chapters = pagination.items
        book = Novel.query.filter_by(id=book_id).first()
        return render_template('chapter.html', book=book, chapters=chapters, pagination=pagination)

    spider = DdSpider()
    book = Novel.query.filter_by(id=book_id).first()
    for data in spider.get_chapter(book.book_url):
        chapter = Chapter(chapter=data['chapter'],
                           chapter_url=data['url'],
                           book_id=book_id)
        db.session.add(chapter)
    pagination2 = Chapter.query.filter_by(book_id=book_id).paginate(
        page, per_page=current_app.config['CHAPTER_PER_PAGE'],
        error_out=False
    )
    chapters = pagination2.items

    return render_template('chapter.html', book=book, chapters=chapters, pagination=pagination2)
jobs.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_default_jobs():  # pragma: no cover
    """Return default jobs."""
    timeout = current_app.config.get('TIMEOUT')
    yield dict(name=warm_up_stats, args=[], kwargs={},
               timeout=timeout, queue='high')
    yield dict(name=warn_old_project_owners, args=[], kwargs={},
               timeout=timeout, queue='low')
    yield dict(name=warm_cache, args=[], kwargs={},
               timeout=timeout, queue='super')
    yield dict(name=news, args=[], kwargs={},
               timeout=timeout, queue='low')
jobs.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_maintenance_jobs():
    """Return mantainance jobs."""
    timeout = current_app.config.get('TIMEOUT')
    yield dict(name=check_failed, args=[], kwargs={},
               timeout=timeout, queue='maintenance')
jobs.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get_project_jobs(queue='super'):
    """Return a list of jobs based on user type."""
    from pybossa.cache import projects as cached_projects
    timeout = current_app.config.get('TIMEOUT')
    return create_dict_jobs(cached_projects.get_from_pro_user(),
                            get_project_stats,
                            timeout=timeout,
                            queue=queue)
jobs.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_inactive_users_jobs(queue='quaterly'):
    """Return a list of inactive users that have contributed to a project."""
    from sqlalchemy.sql import text
    from pybossa.model.user import User
    from pybossa.core import db
    # First users that have participated once but more than 3 months ago
    sql = text('''SELECT user_id FROM task_run
               WHERE user_id IS NOT NULL
               AND to_date(task_run.finish_time, 'YYYY-MM-DD\THH24:MI:SS.US')
               >= NOW() - '12 month'::INTERVAL
               AND to_date(task_run.finish_time, 'YYYY-MM-DD\THH24:MI:SS.US')
               < NOW() - '3 month'::INTERVAL
               GROUP BY user_id ORDER BY user_id;''')
    results = db.slave_session.execute(sql)

    timeout = current_app.config.get('TIMEOUT')

    for row in results:

        user = User.query.get(row.user_id)

        if user.subscribed:
            subject = "We miss you!"
            body = render_template('/account/email/inactive.md',
                                   user=user.dictize(),
                                   config=current_app.config)
            html = render_template('/account/email/inactive.html',
                                   user=user.dictize(),
                                   config=current_app.config)

            mail_dict = dict(recipients=[user.email_addr],
                             subject=subject,
                             body=body,
                             html=html)

            job = dict(name=send_mail,
                       args=[mail_dict],
                       kwargs={},
                       timeout=timeout,
                       queue=queue)
            yield job
jobs.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def get_leaderboard_jobs(queue='super'):  # pragma: no cover
    """Return leaderboard jobs."""
    timeout = current_app.config.get('TIMEOUT')
    yield dict(name=leaderboard.leaderboard, args=[], kwargs={},
               timeout=timeout, queue=queue)
jobs.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_non_contributors_users_jobs(queue='quaterly'):
    """Return a list of users that have never contributed to a project."""
    from sqlalchemy.sql import text
    from pybossa.model.user import User
    from pybossa.core import db
    # Second users that have created an account but never participated
    sql = text('''SELECT id FROM "user" WHERE
               NOT EXISTS (SELECT user_id FROM task_run
               WHERE task_run.user_id="user".id)''')
    results = db.slave_session.execute(sql)
    timeout = current_app.config.get('TIMEOUT')
    for row in results:
        user = User.query.get(row.id)

        if user.subscribed:
            subject = "Why don't you help us?!"
            body = render_template('/account/email/noncontributors.md',
                                   user=user.dictize(),
                                   config=current_app.config)
            html = render_template('/account/email/noncontributors.html',
                                   user=user.dictize(),
                                   config=current_app.config)
            mail_dict = dict(recipients=[user.email_addr],
                             subject=subject,
                             body=body,
                             html=html)

            job = dict(name=send_mail,
                       args=[mail_dict],
                       kwargs={},
                       timeout=timeout,
                       queue=queue)
            yield job
jobs.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_project_stats(_id, short_name):  # pragma: no cover
    """Get stats for project."""
    import pybossa.cache.projects as cached_projects
    import pybossa.cache.project_stats as stats
    from flask import current_app

    cached_projects.get_project(short_name)
    cached_projects.n_tasks(_id)
    cached_projects.n_task_runs(_id)
    cached_projects.overall_progress(_id)
    cached_projects.last_activity(_id)
    cached_projects.n_completed_tasks(_id)
    cached_projects.n_volunteers(_id)
    cached_projects.browse_tasks(_id)
    stats.get_stats(_id, current_app.config.get('GEO'))
jobs.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def webhook(url, payload=None, oid=None):
    """Post to a webhook."""
    from flask import current_app
    from readability.readability import Document
    try:
        import json
        from pybossa.core import sentinel, webhook_repo, project_repo
        project = project_repo.get(payload['project_id'])
        headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
        if oid:
            webhook = webhook_repo.get(oid)
        else:
            webhook = Webhook(project_id=payload['project_id'],
                              payload=payload)
        if url:
            response = requests.post(url, data=json.dumps(payload), headers=headers)
            webhook.response = Document(response.text).summary()
            webhook.response_status_code = response.status_code
        else:
            raise requests.exceptions.ConnectionError('Not URL')
        if oid:
            webhook_repo.update(webhook)
            webhook = webhook_repo.get(oid)
        else:
            webhook_repo.save(webhook)
    except requests.exceptions.ConnectionError:
        webhook.response = 'Connection Error'
        webhook.response_status_code = None
        webhook_repo.save(webhook)
    finally:
        if project.published and webhook.response_status_code != 200 and current_app.config.get('ADMINS'):
            subject = "Broken: %s webhook failed" % project.name
            body = 'Sorry, but the webhook failed'
            mail_dict = dict(recipients=current_app.config.get('ADMINS'),
                             subject=subject, body=body, html=webhook.response)
            send_mail(mail_dict)
    if current_app.config.get('SSE'):
        publish_channel(sentinel, payload['project_short_name'],
                        data=webhook.dictize(), type='webhook',
                        private=True)
    return webhook
jobs.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_weekly_stats_update_projects():
    """Return email jobs with weekly stats update for project owner."""
    from sqlalchemy.sql import text
    from pybossa.core import db
    from pybossa.pro_features import ProFeatureHandler

    feature_handler = ProFeatureHandler(current_app.config.get('PRO_FEATURES'))
    only_pros = feature_handler.only_for_pro('project_weekly_report')
    only_pros_sql = 'AND "user".pro=true' if only_pros else ''
    send_emails_date = current_app.config.get('WEEKLY_UPDATE_STATS')
    today = datetime.today().strftime('%A').lower()
    timeout = current_app.config.get('TIMEOUT')
    if today.lower() == send_emails_date.lower():
        sql = text('''
                   SELECT project.id
                   FROM project, "user", task
                   WHERE "user".id=project.owner_id %s
                   AND "user".subscribed=true
                   AND task.project_id=project.id
                   AND task.state!='completed'
                   UNION
                   SELECT project.id
                   FROM project
                   WHERE project.featured=true;
                   ''' % only_pros_sql)
        results = db.slave_session.execute(sql)
        for row in results:
            job = dict(name=send_weekly_stats_project,
                       args=[row.id],
                       kwargs={},
                       timeout=timeout,
                       queue='low')
            yield job
jobs.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def check_failed():
    """Check the jobs that have failed and requeue them."""
    from rq import Queue, get_failed_queue, requeue_job
    from pybossa.core import sentinel

    fq = get_failed_queue()
    job_ids = fq.job_ids
    count = len(job_ids)
    FAILED_JOBS_RETRIES = current_app.config.get('FAILED_JOBS_RETRIES')
    for job_id in job_ids:
        KEY = 'pybossa:job:failed:%s' % job_id
        job = fq.fetch_job(job_id)
        if sentinel.slave.exists(KEY):
            sentinel.master.incr(KEY)
        else:
            ttl = current_app.config.get('FAILED_JOBS_MAILS')*24*60*60
            sentinel.master.setex(KEY, ttl, 1)
        if int(sentinel.slave.get(KEY)) < FAILED_JOBS_RETRIES:
            requeue_job(job_id)
        else:
            KEY = 'pybossa:job:failed:mailed:%s' % job_id
            if (not sentinel.slave.exists(KEY) and
                    current_app.config.get('ADMINS')):
                subject = "JOB: %s has failed more than 3 times" % job_id
                body = "Please, review the background jobs of your server."
                body += "\n This is the trace error\n\n"
                body += "------------------------------\n\n"
                body += job.exc_info
                mail_dict = dict(recipients=current_app.config.get('ADMINS'),
                                 subject=subject, body=body)
                send_mail(mail_dict)
                ttl = current_app.config.get('FAILED_JOBS_MAILS')*24*60*60
                sentinel.master.setex(KEY, ttl, True)
    if count > 0:
        return "JOBS: %s You have failed the system." % job_ids
    else:
        return "You have not failed the system"
jobs.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def push_notification(project_id, **kwargs):
    """Send push notification."""
    from pybossa.core import project_repo
    project = project_repo.get(project_id)
    if project.info.get('onesignal'):
        app_id = current_app.config.get('ONESIGNAL_APP_ID') 
        api_key = current_app.config.get('ONESIGNAL_API_KEY')
        client = PybossaOneSignal(app_id=app_id, api_key=api_key)
        filters = [{"field": "tag", "key": project_id, "relation": "exists"}]
        return client.push_msg(contents=kwargs['contents'],
                               headings=kwargs['headings'],
                               launch_url=kwargs['launch_url'],
                               web_buttons=kwargs['web_buttons'],
                               filters=filters)
account.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 88 收藏 0 点赞 0 评论 0
def confirm_account():
    """Confirm account endpoint."""
    key = request.args.get('key')
    if key is None:
        abort(403)
    try:
        timeout = current_app.config.get('ACCOUNT_LINK_EXPIRATION', 3600)
        userdict = signer.loads(key, max_age=timeout, salt='account-validation')
    except BadData:
        abort(403)
    # First check if the user exists
    user = user_repo.get_by_name(userdict['name'])
    if user is not None:
        return _update_user_with_valid_email(user, userdict['email_addr'])
    return _create_account(userdict)
account.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _handle_avatar_update(user, avatar_form):
    if avatar_form.validate_on_submit():
        _file = request.files['avatar']
        coordinates = (avatar_form.x1.data, avatar_form.y1.data,
                       avatar_form.x2.data, avatar_form.y2.data)
        prefix = time.time()
        _file.filename = "%s_avatar.png" % prefix
        container = "user_%s" % user.id
        uploader.upload_file(_file,
                             container=container,
                             coordinates=coordinates)
        # Delete previous avatar from storage
        if user.info.get('avatar'):
            uploader.delete_file(user.info['avatar'], container)
        upload_method = current_app.config.get('UPLOAD_METHOD')
        avatar_url = get_avatar_url(upload_method,
                                    _file.filename, container)
        user.info = {'avatar': _file.filename,
                     'container': container,
                     'avatar_url': avatar_url}
        user_repo.update(user)
        cached_users.delete_user_summary(user.name)
        flash(gettext('Your avatar has been updated! It may \
                      take some minutes to refresh...'), 'success')
        return True
    else:
        flash("You have to provide an image file to update your avatar", "error")
        return False
account.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _handle_profile_update(user, update_form):
    acc_conf_dis = current_app.config.get('ACCOUNT_CONFIRMATION_DISABLED')
    if update_form.validate_on_submit():
        user.id = update_form.id.data
        user.fullname = update_form.fullname.data
        user.name = update_form.name.data
        if (user.email_addr != update_form.email_addr.data and
                acc_conf_dis is False):
            user.valid_email = False
            user.newsletter_prompted = False
            account = dict(fullname=update_form.fullname.data,
                           name=update_form.name.data,
                           email_addr=update_form.email_addr.data)
            confirm_url = get_email_confirmation_url(account)
            subject = ('You have updated your email in %s! Verify it'
                       % current_app.config.get('BRAND'))
            msg = dict(subject=subject,
                       recipients=[update_form.email_addr.data],
                       body=render_template(
                           '/account/email/validate_email.md',
                           user=account, confirm_url=confirm_url))
            msg['html'] = markdown(msg['body'])
            mail_queue.enqueue(send_mail, msg)
            user.confirmation_email_sent = True
            fls = gettext('An email has been sent to verify your \
                          new email: %s. Once you verify it, it will \
                          be updated.' % account['email_addr'])
            flash(fls, 'info')
            return True
        if acc_conf_dis:
            user.email_addr = update_form.email_addr.data
        user.privacy_mode = update_form.privacy_mode.data
        user.locale = update_form.locale.data
        user.subscribed = update_form.subscribed.data
        user_repo.update(user)
        cached_users.delete_user_summary(user.name)
        flash(gettext('Your profile has been updated!'), 'success')
        return True
    else:
        flash(gettext('Please correct the errors'), 'error')
        return False
account.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def reset_password():
    """
    Reset password method.

    Returns a Jinja2 template.

    """
    key = request.args.get('key')
    if key is None:
        abort(403)
    userdict = {}
    try:
        timeout = current_app.config.get('ACCOUNT_LINK_EXPIRATION', 3600)
        userdict = signer.loads(key, max_age=timeout, salt='password-reset')
    except BadData:
        abort(403)
    username = userdict.get('user')
    if not username or not userdict.get('password'):
        abort(403)
    user = user_repo.get_by_name(username)
    if user.passwd_hash != userdict.get('password'):
        abort(403)
    form = ChangePasswordForm(request.body)
    if form.validate_on_submit():
        user.set_password(form.new_password.data)
        user_repo.update(user)
        flash(gettext('You reset your password successfully!'), 'success')
        return _sign_in_user(user)
    if request.method == 'POST' and not form.validate():
        flash(gettext('Please correct the errors'), 'error')
    response = dict(template='/account/password_reset.html', form=form)
    return handle_content_type(response)
projects.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def pro_features(owner=None):
    feature_handler = ProFeatureHandler(current_app.config.get('PRO_FEATURES'))
    pro = {
        'auditlog_enabled': feature_handler.auditlog_enabled_for(current_user),
        'autoimporter_enabled': feature_handler.autoimporter_enabled_for(current_user),
        'webhooks_enabled': feature_handler.webhooks_enabled_for(current_user)
    }
    if owner:
        pro['better_stats_enabled'] = feature_handler.better_stats_enabled_for(
                                          current_user,
                                          owner)
    return pro


问题


面经


文章

微信
公众号

扫码关注公众号