def list_unlocked_replicas(rse, limit, bytes=None, rse_id=None, worker_number=None, total_workers=None, delay_seconds=0, session=None):
    """
    List RSE file replicas with no locks.

    Selects the replicas of one RSE whose tombstone lies in the past, whose
    lock counter is zero and for which no row with the same scope and name
    exists in the requests table. Rows are read in ascending tombstone order
    and collected until ``limit`` replicas have been gathered or, for
    replicas whose tombstone is not ``OBSOLETE``, until the accumulated size
    exceeds ``bytes``.

    :param rse: the rse name; only used to look up the id when ``rse_id`` is not given.
    :param limit: the maximum number of replicas to return.
    :param bytes: the amount of needed bytes; ``None`` disables the size cut-off.
    :param rse_id: the rse id; resolved from ``rse`` when falsy.
    :param worker_number: number of the calling worker; the hash bucket it is
                          matched against is ``worker_number - 1``.
    :param total_workers: number of workers the replicas are split between;
                          the split is only applied when it is greater than 1.
    :param delay_seconds: a BEING_DELETED replica is only listed when its
                          ``updated_at`` is older than this many seconds.
    :param session: The database session in use.

    :returns: a list of dictionary replica with the keys scope, name, path,
              bytes, tombstone and state.
    """
    if not rse_id:
        rse_id = get_rse_id(rse=rse, session=session)

    replica = models.RSEFileAssociation
    # One timestamp for both comparisons so they refer to the same instant.
    now = datetime.utcnow()
    none_value = None  # Hack to get pep8 happy...

    # NOTE(review): the rse_id test is wrapped in a CASE on the tombstone,
    # presumably to match a function-based index - confirm against the schema.
    query = session.query(replica.scope, replica.name, replica.path, replica.bytes, replica.tombstone, replica.state).\
        with_hint(replica, "INDEX_RS_ASC(replicas REPLICAS_TOMBSTONE_IDX) NO_INDEX_FFS(replicas REPLICAS_TOMBSTONE_IDX)", 'oracle').\
        filter(replica.tombstone < now).\
        filter(replica.lock_cnt == 0).\
        filter(case([(replica.tombstone != none_value, replica.rse_id), ]) == rse_id).\
        filter(or_(replica.state.in_((ReplicaState.AVAILABLE, ReplicaState.UNAVAILABLE, ReplicaState.BAD)),
                   and_(replica.state == ReplicaState.BEING_DELETED, replica.updated_at < now - timedelta(seconds=delay_seconds)))).\
        order_by(replica.tombstone)

    # do not delete files used as sources
    stmt = exists(select([1]).prefix_with("/*+ INDEX(requests REQUESTS_SCOPE_NAME_RSE_IDX) */", dialect='oracle')).\
        where(and_(replica.scope == models.Request.scope,
                   replica.name == models.Request.name))
    query = query.filter(not_(stmt))

    if worker_number and total_workers and total_workers - 1 > 0:
        dialect_name = session.bind.dialect.name
        bucket = worker_number - 1
        if dialect_name == 'oracle':
            # ORA_HASH takes the highest bucket number as second argument, so
            # total_workers - 1 gives total_workers buckets (0 .. total_workers - 1).
            bindparams = [bindparam('worker_number', bucket), bindparam('total_workers', total_workers - 1)]
            query = query.filter(text('ORA_HASH(name, :total_workers) = :worker_number', bindparams=bindparams))
        elif dialect_name == 'mysql':
            # mod() takes the number of buckets, not the highest bucket: use
            # total_workers so that the last worker gets a share as on Oracle.
            query = query.filter(text('mod(md5(name), %s) = %s' % (total_workers, bucket)))
        elif dialect_name == 'postgresql':
            query = query.filter(text('mod(abs((\'x\'||md5(name))::bit(32)::int), %s) = %s' % (total_workers, bucket)))

    needed_space = bytes
    total_bytes, total_files = 0, 0
    rows = []
    # The loop variable is called "size" so that it does not rebind the
    # "bytes" parameter; the key in the returned dictionaries stays 'bytes'.
    for scope, name, path, size, tombstone, state in query.yield_per(1000):
        # NOTE(review): the whole loop body is taken to be guarded by the
        # state check, i.e. UNAVAILABLE replicas are neither counted nor
        # returned - confirm against the original indentation.
        if state == ReplicaState.UNAVAILABLE:
            continue

        total_bytes += size
        # Replicas whose tombstone equals OBSOLETE are exempt from the size cut-off.
        if tombstone != OBSOLETE and needed_space is not None and total_bytes > needed_space:
            break

        total_files += 1
        if total_files > limit:
            break

        rows.append({'scope': scope, 'name': name, 'path': path,
                     'bytes': size, 'tombstone': tombstone,
                     'state': state})
    return rows
评论列表
文章目录