rule.py 文件源码

python
阅读 21 收藏 0 点赞 0 评论 0

项目:rucio 作者: rucio01 项目源码 文件源码
def get_stuck_rules(total_workers, worker_number, delta=600, limit=10, blacklisted_rules=[], session=None):
    """
    Get stuck rules.

    :param total_workers:      Number of total workers.
    :param worker_number:      id of the executing worker.
    :param delta:              Delta in seconds to select rules in.
    :param limit:              Maximum number of rules to select.
    :param blacklisted_rules:  Blacklisted rules to filter out.
    :param session:            Database session in use.
    """
    if session.bind.dialect.name == 'oracle':
        query = session.query(models.ReplicationRule.id).\
            with_hint(models.ReplicationRule, "index(rules RULES_STUCKSTATE_IDX)", 'oracle').\
            filter(text("(CASE when rules.state='S' THEN rules.state ELSE null END)= 'S' ")).\
            filter(models.ReplicationRule.state == RuleState.STUCK).\
            filter(models.ReplicationRule.updated_at < datetime.utcnow() - timedelta(seconds=delta)).\
            filter(or_(models.ReplicationRule.expires_at == null(),
                       models.ReplicationRule.expires_at > datetime.utcnow(),
                       models.ReplicationRule.locked == true())).\
            order_by(models.ReplicationRule.updated_at)  # NOQA
    else:
        query = session.query(models.ReplicationRule.id).\
            with_hint(models.ReplicationRule, "index(rules RULES_STUCKSTATE_IDX)", 'oracle').\
            filter(models.ReplicationRule.state == RuleState.STUCK).\
            filter(models.ReplicationRule.updated_at < datetime.utcnow() - timedelta(seconds=delta)).\
            filter(or_(models.ReplicationRule.expires_at == null(),
                       models.ReplicationRule.expires_at > datetime.utcnow(),
                       models.ReplicationRule.locked == true())).\
            order_by(models.ReplicationRule.updated_at)

    if session.bind.dialect.name == 'oracle':
        bindparams = [bindparam('worker_number', worker_number),
                      bindparam('total_workers', total_workers)]
        query = query.filter(text('ORA_HASH(name, :total_workers) = :worker_number', bindparams=bindparams))
    elif session.bind.dialect.name == 'mysql':
        query = query.filter(text('mod(md5(name), %s) = %s' % (total_workers + 1, worker_number)))
    elif session.bind.dialect.name == 'postgresql':
        query = query.filter(text('mod(abs((\'x\'||md5(name))::bit(32)::int), %s) = %s' % (total_workers + 1, worker_number)))

    if limit:
        fetched_rules = query.limit(limit).all()
        filtered_rules = [rule for rule in fetched_rules if rule[0] not in blacklisted_rules]
        if len(fetched_rules) == limit and len(filtered_rules) == 0:
            return get_stuck_rules(total_workers=total_workers,
                                   worker_number=worker_number,
                                   delta=delta,
                                   limit=None,
                                   blacklisted_rules=blacklisted_rules,
                                   session=session)
        else:
            return filtered_rules
    else:
        return [rule for rule in query.all() if rule[0] not in blacklisted_rules]
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号