replica.py 文件源码

python
阅读 34 收藏 0 点赞 0 评论 0

项目:rucio 作者: rucio01 项目源码 文件源码
def list_unlocked_replicas(rse, limit, bytes=None, rse_id=None, worker_number=None, total_workers=None, delay_seconds=0, session=None):
    """
    List RSE file replicas with no locks and an expired tombstone.

    Candidates are replicas on the RSE with ``lock_cnt == 0`` and a tombstone
    in the past, that are AVAILABLE, UNAVAILABLE or BAD, or that have been
    BEING_DELETED without an update for more than ``delay_seconds``. Replicas
    whose (scope, name) matches a row in the requests table are skipped.
    Results are ordered by tombstone.

    UNAVAILABLE replicas are always returned and count neither towards
    ``limit`` nor towards ``bytes``. Replicas whose tombstone equals OBSOLETE
    are returned regardless of ``bytes`` (their size still counts towards the
    running total), but they do count towards ``limit``.

    :param rse: the rse name (only used to look up the id when rse_id is not given).
    :param limit: maximum number of non-UNAVAILABLE replicas to return.
    :param bytes: the amount of needed bytes; selection of non-obsolete replicas
                  stops once the accumulated size reaches it. None disables the check.
    :param rse_id: the rse id; resolved from ``rse`` when falsy.
    :param worker_number: number of this worker, used together with total_workers
                          to hash-partition replicas by name.
    :param total_workers: total number of workers; partitioning is only applied
                          when ``total_workers - 1 > 0``.
    :param delay_seconds: minimum age in seconds of the last update before a
                          BEING_DELETED replica is listed again.
    :param session: The database session in use.

    :returns: a list of dictionaries with the keys scope, name, path, bytes,
              tombstone and state.
    """
    if not rse_id:
        rse_id = get_rse_id(rse=rse, session=session)

    # Take the timestamp once so that the tombstone cut-off and the
    # BEING_DELETED delay are computed against the same instant.
    now = datetime.utcnow()

    none_value = None  # Hack to get pep8 happy...
    query = session.query(models.RSEFileAssociation.scope, models.RSEFileAssociation.name, models.RSEFileAssociation.path, models.RSEFileAssociation.bytes, models.RSEFileAssociation.tombstone, models.RSEFileAssociation.state).\
        with_hint(models.RSEFileAssociation, "INDEX_RS_ASC(replicas REPLICAS_TOMBSTONE_IDX)  NO_INDEX_FFS(replicas REPLICAS_TOMBSTONE_IDX)", 'oracle').\
        filter(models.RSEFileAssociation.tombstone < now).\
        filter(models.RSEFileAssociation.lock_cnt == 0).\
        filter(case([(models.RSEFileAssociation.tombstone != none_value, models.RSEFileAssociation.rse_id), ]) == rse_id).\
        filter(or_(models.RSEFileAssociation.state.in_((ReplicaState.AVAILABLE, ReplicaState.UNAVAILABLE, ReplicaState.BAD)),
                   and_(models.RSEFileAssociation.state == ReplicaState.BEING_DELETED, models.RSEFileAssociation.updated_at < now - timedelta(seconds=delay_seconds)))).\
        order_by(models.RSEFileAssociation.tombstone)

    # do not delete files used as sources
    stmt = exists(select([1]).prefix_with("/*+ INDEX(requests REQUESTS_SCOPE_NAME_RSE_IDX) */", dialect='oracle')).\
        where(and_(models.RSEFileAssociation.scope == models.Request.scope,
                   models.RSEFileAssociation.name == models.Request.name))
    query = query.filter(not_(stmt))

    # Split the work between workers by hashing the replica name.
    # NOTE(review): ORA_HASH takes the highest bucket value as its second
    # argument while mod() takes the number of buckets, so the oracle branch
    # and the mysql/postgresql branches may not partition identically for the
    # same (worker_number, total_workers) - confirm against the callers.
    if worker_number and total_workers and total_workers - 1 > 0:
        if session.bind.dialect.name == 'oracle':
            bindparams = [bindparam('worker_number', worker_number - 1), bindparam('total_workers', total_workers - 1)]
            query = query.filter(text('ORA_HASH(name, :total_workers) = :worker_number', bindparams=bindparams))
        elif session.bind.dialect.name == 'mysql':
            query = query.filter(text('mod(md5(name), %s) = %s' % (total_workers - 1, worker_number - 1)))
        elif session.bind.dialect.name == 'postgresql':
            query = query.filter(text('mod(abs((\'x\'||md5(name))::bit(32)::int), %s) = %s' % (total_workers - 1, worker_number - 1)))

    needed_space = bytes
    total_bytes, total_files = 0, 0
    rows = []
    # The per-row size is unpacked as ``size`` so it does not clobber the
    # ``bytes`` argument (or the builtin) while iterating.
    for scope, name, path, size, tombstone, state in query.yield_per(1000):
        if state != ReplicaState.UNAVAILABLE:
            # Check the budget *before* adding this replica: the replica that
            # reaches the requested amount must still be part of the result,
            # otherwise the selection always falls short of the needed bytes
            # (and is empty when the first candidate alone exceeds them).
            if tombstone != OBSOLETE and needed_space is not None and total_bytes >= needed_space:
                break
            total_bytes += size

            total_files += 1
            if total_files > limit:
                break

        rows.append({'scope': scope, 'name': name, 'path': path,
                     'bytes': size, 'tombstone': tombstone,
                     'state': state})
    return rows
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号