replica.py 文件源码

python
阅读 20 收藏 0 点赞 0 评论 0

项目:rucio 作者: rucio01 项目源码 文件源码
def list_bad_replicas(limit=10000, thread=None, total_threads=None, session=None):
    """
    List RSE File replicas with no locks.

    :param limit: The maximum number of replicas returned.
    :param thread: The assigned thread for this necromancer.
    :param total_threads: The total number of threads of all necromancers.
    :param session: The database session in use.

    :returns: a list of dictionary {'scope' scope, 'name': name, 'rse_id': rse_id, 'rse': rse}.
    """

    if session.bind.dialect.name == 'oracle':
        # The filter(text...)) is needed otherwise, SQLA uses bind variables and the index is not used.
        query = session.query(models.RSEFileAssociation.scope,
                              models.RSEFileAssociation.name,
                              models.RSEFileAssociation.rse_id).\
            with_hint(models.RSEFileAssociation, "+ index(replicas REPLICAS_STATE_IDX)", 'oracle').\
            filter(text("CASE WHEN (%s.replicas.state != 'A') THEN %s.replicas.rse_id END IS NOT NULL" % (DEFAULT_SCHEMA_NAME,
                                                                                                          DEFAULT_SCHEMA_NAME))).\
            filter(models.RSEFileAssociation.state == ReplicaState.BAD)
    else:
        query = session.query(models.RSEFileAssociation.scope,
                              models.RSEFileAssociation.name,
                              models.RSEFileAssociation.rse_id).\
            filter(models.RSEFileAssociation.state == ReplicaState.BAD)

    if total_threads and (total_threads - 1) > 0:
        if session.bind.dialect.name == 'oracle':
            bindparams = [bindparam('thread_number', thread), bindparam('total_threads', total_threads - 1)]
            query = query.filter(text('ORA_HASH(name, :total_threads) = :thread_number', bindparams=bindparams))
        elif session.bind.dialect.name == 'mysql':
            query = query.filter(text('mod(md5(name), %s) = %s' % (total_threads - 1, thread)))
        elif session.bind.dialect.name == 'postgresql':
            query = query.filter(text('mod(abs((\'x\'||md5(name))::bit(32)::int), %s) = %s' % (total_threads - 1, thread)))

    query = query.limit(limit)
    rows = []
    rse_map = {}
    for scope, name, rse_id in query.yield_per(1000):
        if rse_id not in rse_map:
            rse_map[rse_id] = get_rse_name(rse_id=rse_id, session=session)
        rows.append({'scope': scope, 'name': name, 'rse_id': rse_id, 'rse': rse_map[rse_id]})
    return rows
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号