def list_quarantined_replicas(rse, limit, worker_number=None, total_workers=None, session=None):
    """
    List RSE Quarantined File replicas.

    Quarantined replicas for which a matching RSEFileAssociation row exists
    (same scope, name and rse_id) are excluded from the result.

    When both worker_number and total_workers are given and total_workers is
    greater than 1, the rows are additionally restricted to one hash bucket
    of the path column so that several workers can share the work. This
    restriction is only applied on the oracle, mysql and postgresql dialects;
    on any other dialect no hash restriction is added.

    :param rse: the rse name.
    :param limit: The maximum number of replicas returned.
    :param worker_number: id of the executing worker (used as worker_number - 1
                          when compared against the hash bucket).
    :param total_workers: Number of total workers.
    :param session: The database session in use.

    :returns: a list of dictionary replica, each with the keys path, rse,
              rse_id, created_at, scope, name and bytes.
    """
    rse_id = get_rse_id(rse, session=session)

    query = session.query(models.QuarantinedReplica.path,
                          models.QuarantinedReplica.bytes,
                          models.QuarantinedReplica.scope,
                          models.QuarantinedReplica.name,
                          models.QuarantinedReplica.created_at).\
        filter(models.QuarantinedReplica.rse_id == rse_id)

    # do not delete valid replicas: skip quarantined entries that still have
    # a corresponding row in RSEFileAssociation.
    stmt = exists(select([1]).prefix_with("/*+ index(REPLICAS REPLICAS_PK) */", dialect='oracle')).\
        where(and_(models.RSEFileAssociation.scope == models.QuarantinedReplica.scope,
                   models.RSEFileAssociation.name == models.QuarantinedReplica.name,
                   models.RSEFileAssociation.rse_id == models.QuarantinedReplica.rse_id))
    query = query.filter(not_(stmt))

    if worker_number and total_workers and total_workers - 1 > 0:
        # Dialect-specific hash expressions on the path column. All of them go
        # through text() with bound parameters instead of handing a
        # %-formatted plain string to Query.filter().
        # NOTE(review): the mysql/postgresql variants take the modulus by
        # total_workers - 1, which yields buckets 0 .. total_workers - 2; if
        # worker_number can be as large as total_workers, that worker would
        # never match. Kept as-is to preserve the existing partitioning --
        # confirm against the other hash-sharded queries before changing.
        hash_clauses = {
            'oracle': 'ORA_HASH(path, :total_workers) = :worker_number',
            'mysql': 'mod(md5(path), :total_workers) = :worker_number',
            'postgresql': "mod(abs(('x'||md5(path))::bit(32)::int), :total_workers) = :worker_number",
        }
        hash_clause = hash_clauses.get(session.bind.dialect.name)
        if hash_clause is not None:
            bindparams = [bindparam('worker_number', worker_number - 1),
                          bindparam('total_workers', total_workers - 1)]
            query = query.filter(text(hash_clause, bindparams=bindparams))

    return [{'path': path,
             'rse': rse,
             'rse_id': rse_id,
             'created_at': created_at,
             'scope': scope,
             'name': name,
             'bytes': size}
            for path, size, scope, name, created_at in query.limit(limit)]
评论列表
文章目录