def get_updated_account_counters(total_workers, worker_number, session=None):
"""
Get updated rse_counters.
:param total_workers: Number of total workers.
:param worker_number: id of the executing worker.
:param session: Database session in use.
:returns: List of rse_ids whose rse_counters need to be updated.
"""
query = session.query(models.UpdatedAccountCounter.account, models.UpdatedAccountCounter.rse_id).\
distinct(models.UpdatedAccountCounter.account, models.UpdatedAccountCounter.rse_id)
if total_workers > 0:
if session.bind.dialect.name == 'oracle':
bindparams = [bindparam('worker_number', worker_number),
bindparam('total_workers', total_workers)]
query = query.filter(text('ORA_HASH(CONCAT(account, rse_id), :total_workers) = :worker_number', bindparams=bindparams))
elif session.bind.dialect.name == 'mysql':
query = query.filter('mod(md5(concat(account, rse_id)), %s) = %s' % (total_workers + 1, worker_number))
elif session.bind.dialect.name == 'postgresql':
query = query.filter('mod(abs((\'x\'||md5(concat(account, rse_id)))::bit(32)::int), %s) = %s' % (total_workers + 1, worker_number))
return query.all()
评论列表
文章目录