def release_waiting_requests(rse, activity=None, rse_id=None, count=None, account=None, session=None):
"""
Release waiting requests.
:param rse: The RSE name.
:param activity: The activity.
:param rse_id: The RSE id.
:param count: The count to be released. If None, release all waiting requests.
"""
try:
if not rse_id:
rse_id = get_rse_id(rse=rse, session=session)
rowcount = 0
if count is None:
query = session.query(models.Request).\
filter_by(dest_rse_id=rse_id, state=RequestState.WAITING)
if activity:
query = query.filter_by(activity=activity)
if account:
query = query.filter_by(account=account)
rowcount = query.update({'state': RequestState.QUEUED}, synchronize_session=False)
elif count > 0:
subquery = session.query(models.Request.id)\
.filter(models.Request.dest_rse_id == rse_id)\
.filter(models.Request.state == RequestState.WAITING)\
.order_by(asc(models.Request.requested_at))
if activity:
subquery = subquery.filter(models.Request.activity == activity)
if account:
subquery = subquery.filter(models.Request.account == account)
subquery = subquery.limit(count).with_for_update()
rowcount = session.query(models.Request)\
.filter(models.Request.id.in_(subquery))\
.update({'state': RequestState.QUEUED},
synchronize_session=False)
return rowcount
except IntegrityError as error:
raise RucioException(error.args)
评论列表
文章目录