def get_updated_account_counters(total_workers, worker_number, session=None):
    """
    Get updated account counters.

    Rows are split across workers by hashing ``account || rse_id``; the hash space has
    ``total_workers + 1`` buckets (0..total_workers). No split is applied when
    ``total_workers`` is not positive or the dialect is not Oracle, MySQL or PostgreSQL.

    :param total_workers: Number of total workers.
    :param worker_number: id of the executing worker.
    :param session: Database session in use.
    :returns: List of distinct (account, rse_id) rows whose account counters need to be updated.
    """
    query = session.query(models.UpdatedAccountCounter.account, models.UpdatedAccountCounter.rse_id).\
        distinct(models.UpdatedAccountCounter.account, models.UpdatedAccountCounter.rse_id)
    if total_workers > 0:
        dialect = session.bind.dialect.name
        if dialect == 'oracle':
            bindparams = [bindparam('worker_number', worker_number),
                          bindparam('total_workers', total_workers)]
            query = query.filter(text('ORA_HASH(CONCAT(account, rse_id), :total_workers) = :worker_number', bindparams=bindparams))
        elif dialect == 'mysql':
            # Textual SQL must be wrapped in text(): newer SQLAlchemy versions warn about
            # (1.0+) or reject (1.4+) plain strings passed to Query.filter().
            query = query.filter(text('mod(md5(concat(account, rse_id)), %s) = %s' % (total_workers + 1, worker_number)))
        elif dialect == 'postgresql':
            query = query.filter(text('mod(abs((\'x\'||md5(concat(account, rse_id)))::bit(32)::int), %s) = %s' % (total_workers + 1, worker_number)))
    return query.all()
Python `bindparam()` usage examples (source code)
def test_unicode(self, engine, connection):
    """Select a string passed as a bound parameter and check it comes back unchanged.

    NOTE(review): the '?' literals look like non-ASCII text lost in an encoding step -- confirm.
    """
    expected = '??'
    table = Table('one_row', MetaData(bind=engine))
    unicode_param = expression.bindparam('????', expected)
    statement = sqlalchemy.select([unicode_param], from_obj=table)
    self.assertEqual(statement.scalar(), expected)
def test_unicode(self, engine, connection):
    """Round-trip a string bind parameter through a SELECT on ``one_row``.

    NOTE(review): the '?' literals look like non-ASCII text lost in an encoding step -- confirm.
    """
    text_value = '??'
    metadata = MetaData(bind=engine)
    one_row = Table('one_row', metadata)
    query = sqlalchemy.select(
        [expression.bindparam('????', text_value)], from_obj=one_row)
    fetched = query.scalar()
    self.assertEqual(fetched, text_value)
def get_updated_dids(total_workers, worker_number, limit=100, blacklisted_dids=None, session=None):
    """
    Get updated dids.

    Rows are split across workers by hashing the did name; the hash space has
    ``total_workers + 1`` buckets (0..total_workers). No split is applied when
    ``total_workers`` is not positive or the dialect is not Oracle, MySQL or PostgreSQL.

    :param total_workers: Number of total workers.
    :param worker_number: id of the executing worker.
    :param limit: Maximum number of dids to return; a falsy value means no limit.
    :param blacklisted_dids: (scope, name) tuples to filter out. Defaults to no blacklist.
    :param session: Database session in use.
    :returns: List of (id, scope, name, rule_evaluation_action) rows ordered by created_at.
    """
    # Avoid a shared mutable default argument; a fresh empty list is equivalent.
    if blacklisted_dids is None:
        blacklisted_dids = []

    query = session.query(models.UpdatedDID.id,
                          models.UpdatedDID.scope,
                          models.UpdatedDID.name,
                          models.UpdatedDID.rule_evaluation_action)
    if total_workers > 0:
        dialect = session.bind.dialect.name
        if dialect == 'oracle':
            bindparams = [bindparam('worker_number', worker_number),
                          bindparam('total_workers', total_workers)]
            query = query.filter(text('ORA_HASH(name, :total_workers) = :worker_number', bindparams=bindparams))
        elif dialect == 'mysql':
            query = query.filter(text('mod(md5(name), %s) = %s' % (total_workers + 1, worker_number)))
        elif dialect == 'postgresql':
            query = query.filter(text('mod(abs((\'x\'||md5(name))::bit(32)::int), %s) = %s' % (total_workers + 1, worker_number)))
    query = query.order_by(models.UpdatedDID.created_at)

    if not limit:
        return [did for did in query.all() if (did.scope, did.name) not in blacklisted_dids]

    fetched_dids = query.limit(limit).all()
    filtered_dids = [did for did in fetched_dids if (did.scope, did.name) not in blacklisted_dids]
    if len(fetched_dids) == limit and not filtered_dids:
        # The whole page was blacklisted: retry without a limit so that non-blacklisted
        # dids further down the queue can still be processed.
        return get_updated_dids(total_workers=total_workers,
                                worker_number=worker_number,
                                limit=None,
                                blacklisted_dids=blacklisted_dids,
                                session=session)
    return filtered_dids
def get_rules_beyond_eol(date_check, worker_number, total_workers, session):
    """
    Get rules which have eol_at before a certain date.

    Rows are split across workers by hashing the rule name; the hash space has
    ``total_workers + 1`` buckets (0..total_workers). No split is applied for dialects
    other than Oracle, MySQL or PostgreSQL.

    :param date_check: The reference date that should be compared to eol_at.
    :param worker_number: id of the executing worker.
    :param total_workers: Number of total workers.
    :param session: Database session in use.
    :returns: List of (scope, name, rse_expression, locked, id, eol_at, expires_at) rows.
    """
    query = session.query(models.ReplicationRule.scope,
                          models.ReplicationRule.name,
                          models.ReplicationRule.rse_expression,
                          models.ReplicationRule.locked,
                          models.ReplicationRule.id,
                          models.ReplicationRule.eol_at,
                          models.ReplicationRule.expires_at).\
        filter(models.ReplicationRule.eol_at < date_check)

    dialect = session.bind.dialect.name
    if dialect == 'oracle':
        bindparams = [bindparam('worker_number', worker_number),
                      bindparam('total_workers', total_workers)]
        query = query.filter(text('ORA_HASH(name, :total_workers) = :worker_number', bindparams=bindparams))
    elif dialect == 'mysql':
        query = query.filter(text('mod(md5(name), %s) = %s' % (total_workers + 1, worker_number)))
    elif dialect == 'postgresql':
        query = query.filter(text('mod(abs((\'x\'||md5(name))::bit(32)::int), %s) = %s' % (total_workers + 1, worker_number)))
    # Query.all() already returns a fresh list; no need to copy it again.
    return query.all()
def get_expired_rules(total_workers, worker_number, limit=100, blacklisted_rules=None, session=None):
    """
    Get expired rules.

    Selects unlocked rules without a child rule whose expires_at is in the past, ordered
    by expires_at. Rows are split across workers by hashing the rule name; the hash space
    has ``total_workers + 1`` buckets (0..total_workers).

    :param total_workers: Number of total workers.
    :param worker_number: id of the executing worker.
    :param limit: Maximum number of rules to return; a falsy value means no limit.
    :param blacklisted_rules: List of blacklisted rule ids. Defaults to no blacklist.
    :param session: Database session in use.
    :returns: List of (id, rse_expression) rows.
    """
    # Avoid a shared mutable default argument; a fresh empty list is equivalent.
    if blacklisted_rules is None:
        blacklisted_rules = []

    query = (session.query(models.ReplicationRule.id, models.ReplicationRule.rse_expression)
             .filter(models.ReplicationRule.expires_at < datetime.utcnow(),
                     models.ReplicationRule.locked == False,  # NOQA: SQL comparison, not identity
                     models.ReplicationRule.child_rule_id == None)  # NOQA: renders IS NULL
             .with_hint(models.ReplicationRule, "index(rules RULES_EXPIRES_AT_IDX)", 'oracle')
             .order_by(models.ReplicationRule.expires_at))

    dialect = session.bind.dialect.name
    if dialect == 'oracle':
        bindparams = [bindparam('worker_number', worker_number),
                      bindparam('total_workers', total_workers)]
        query = query.filter(text('ORA_HASH(name, :total_workers) = :worker_number', bindparams=bindparams))
    elif dialect == 'mysql':
        query = query.filter(text('mod(md5(name), %s) = %s' % (total_workers + 1, worker_number)))
    elif dialect == 'postgresql':
        query = query.filter(text('mod(abs((\'x\'||md5(name))::bit(32)::int), %s) = %s' % (total_workers + 1, worker_number)))

    if not limit:
        return [rule for rule in query.all() if rule[0] not in blacklisted_rules]

    fetched_rules = query.limit(limit).all()
    filtered_rules = [rule for rule in fetched_rules if rule[0] not in blacklisted_rules]
    if len(fetched_rules) == limit and not filtered_rules:
        # The whole page was blacklisted: retry without a limit.
        return get_expired_rules(total_workers=total_workers,
                                 worker_number=worker_number,
                                 limit=None,
                                 blacklisted_rules=blacklisted_rules,
                                 session=session)
    return filtered_rules
def list_bad_replicas_history(limit=10000, thread=None, total_threads=None, session=None):
    """
    List the bad file replicas history. Method only used by necromancer.

    :param limit: The maximum number of replicas returned.
    :param thread: The assigned thread for this necromancer (0-based, 0..total_threads - 1).
    :param total_threads: The total number of threads of all necromancers; work is only
                          split when this is greater than 1.
    :param session: The database session in use.
    :returns: dict mapping rse_id to a list of {'scope': scope, 'name': name} dicts.
    """
    query = session.query(models.BadReplicas.scope, models.BadReplicas.name, models.BadReplicas.rse_id).\
        filter(models.BadReplicas.state == BadFilesStatus.BAD)

    if total_threads and (total_threads - 1) > 0:
        dialect = session.bind.dialect.name
        # ORA_HASH(expr, N) returns buckets 0..N, i.e. N + 1 buckets. With N = total_threads - 1
        # that is total_threads buckets, so the MySQL/PostgreSQL modulus must be total_threads
        # (not total_threads - 1), otherwise the last thread never receives any work.
        if dialect == 'oracle':
            bindparams = [bindparam('thread_number', thread), bindparam('total_threads', total_threads - 1)]
            query = query.filter(text('ORA_HASH(name, :total_threads) = :thread_number', bindparams=bindparams))
        elif dialect == 'mysql':
            query = query.filter(text('mod(md5(name), %s) = %s' % (total_threads, thread)))
        elif dialect == 'postgresql':
            query = query.filter(text('mod(abs((\'x\'||md5(name))::bit(32)::int), %s) = %s' % (total_threads, thread)))

    query = query.limit(limit)

    bad_replicas = {}
    for scope, name, rse_id in query.yield_per(1000):
        bad_replicas.setdefault(rse_id, []).append({'scope': scope, 'name': name})
    return bad_replicas
def list_expired_temporary_dids(rse, limit, worker_number=None, total_workers=None,
                                session=None):
    """
    List expired temporary DIDs.

    NOTE(review): the query only requires expired_at to be set (not NULL); it does not
    compare it to the current time -- confirm this is intended.

    :param rse: the rse name.
    :param limit: The maximum number of replicas returned.
    :param worker_number: id of the executing worker (1-based, 1..total_workers).
    :param total_workers: Number of total workers; work is only split when greater than 1.
    :param session: The database session in use.
    :returns: a list of dictionary replica with keys path, rse, rse_id, scope, name, bytes.
    """
    rse_id = get_rse_id(rse, session=session)
    is_none = None  # compared with != so SQLAlchemy renders IS NOT NULL without a pep8 warning
    # The CASE yields rse_id only when expired_at is set (NULL otherwise), so the equality
    # both requires expired_at and matches the RSE -- presumably to match the
    # TMP_DIDS_EXPIRED_AT_IDX index named in the hint; TODO confirm.
    query = session.query(models.TemporaryDataIdentifier.scope,
                          models.TemporaryDataIdentifier.name,
                          models.TemporaryDataIdentifier.path,
                          models.TemporaryDataIdentifier.bytes).\
        with_hint(models.TemporaryDataIdentifier, "INDEX(tmp_dids TMP_DIDS_EXPIRED_AT_IDX)", 'oracle').\
        filter(case([(models.TemporaryDataIdentifier.expired_at != is_none, models.TemporaryDataIdentifier.rse_id), ]) == rse_id)

    if worker_number and total_workers and total_workers - 1 > 0:
        dialect = session.bind.dialect.name
        # ORA_HASH(name, total_workers - 1) yields total_workers buckets (0..total_workers - 1);
        # the modulus must be total_workers to give every worker a share. All dialects hash
        # the name (previously PostgreSQL hashed path, where a NULL path would match no worker).
        if dialect == 'oracle':
            bindparams = [bindparam('worker_number', worker_number - 1), bindparam('total_workers', total_workers - 1)]
            query = query.filter(text('ORA_HASH(name, :total_workers) = :worker_number', bindparams=bindparams))
        elif dialect == 'mysql':
            query = query.filter(text('mod(md5(name), %s) = %s' % (total_workers, worker_number - 1)))
        elif dialect == 'postgresql':
            query = query.filter(text('mod(abs((\'x\'||md5(name))::bit(32)::int), %s) = %s' % (total_workers, worker_number - 1)))

    return [{'path': path,
             'rse': rse,
             'rse_id': rse_id,
             'scope': scope,
             'name': name,
             'bytes': size}
            for scope, name, path, size in query.limit(limit)]
def test_unicode(engine, table_one_row):
    """Select a string passed through bindparam() and check it comes back unchanged.

    NOTE(review): the '?' literals look like non-ASCII text lost in an encoding step -- confirm.
    """
    original = "?????"
    bound = expression.bindparam("?", original)
    statement = sqlalchemy.select([bound], from_obj=table_one_row)
    result = statement.scalar()
    assert result == original
def get_descendants_query(
        cls, root_idea_id=bindparam('root_idea_id', type_=Integer),
        inclusive=True):
    """
    Build an aliased SELECT of the ids of the ideas reachable from ``root_idea_id``
    through idea links whose tombstone_date IS NULL.

    :param root_idea_id: Root idea id: an int, or a SQL expression (by default a
        ``root_idea_id`` bind parameter).
    :param inclusive: When true, the root id itself is UNIONed into the result.
    :returns: An aliased selectable with a single ``id`` column.
    """
    if not cls.using_virtuoso:
        # Recursive CTE: seed with the live links leaving the root, then repeatedly
        # follow live links whose source is a target already reached.
        seed = select(
            [IdeaLink.source_id, IdeaLink.target_id]
        ).select_from(IdeaLink).where(
            (IdeaLink.tombstone_date == None) &  # noqa: E711 (renders IS NULL)
            (IdeaLink.source_id == root_idea_id)
        ).cte(recursive=True)
        reached = aliased(seed)
        next_link = aliased(IdeaLink)
        step = select(
            [next_link.source_id, next_link.target_id]
        ).select_from(next_link).where(
            (next_link.source_id == reached.c.target_id)
            & (next_link.tombstone_date == None)  # noqa: E711
        )
        closure = seed.union(step)
        select_exp = select(
            [closure.c.target_id.label('id')]
        ).select_from(closure)
    else:
        # Virtuoso computes the transitive closure natively.
        transitive = text(
            """SELECT transitive t_in (1) t_out (2) T_DISTINCT T_NO_CYCLES
source_id, target_id FROM idea_idea_link
WHERE tombstone_date IS NULL"""
        ).columns(column('source_id'), column('target_id')).alias()
        select_exp = select(
            [transitive.c.target_id.label('id')]
        ).select_from(transitive).where(transitive.c.source_id == root_idea_id)

    if inclusive:
        root_expr = (literal_column(str(root_idea_id), Integer)
                     if isinstance(root_idea_id, int) else root_idea_id)
        select_exp = select_exp.union(select([root_expr.label('id')]))
    return select_exp.alias()
def get_injected_rules(total_workers, worker_number, limit=100, blacklisted_rules=None, session=None):
    """
    Get rules to be injected.

    Selects rules in INJECT state ordered by created_at. Rows are split across workers by
    hashing the rule name; the hash space has ``total_workers + 1`` buckets (0..total_workers).

    :param total_workers: Number of total workers.
    :param worker_number: id of the executing worker.
    :param limit: Maximum number of rules to return; a falsy value means no limit.
    :param blacklisted_rules: Blacklisted rule ids not to include. Defaults to no blacklist.
    :param session: Database session in use.
    :returns: List of single-column (id,) rows.
    """
    # Avoid a shared mutable default argument; a fresh empty list is equivalent.
    if blacklisted_rules is None:
        blacklisted_rules = []

    dialect = session.bind.dialect.name
    query = session.query(models.ReplicationRule.id).\
        with_hint(models.ReplicationRule, "index(rules RULES_INJECTIONSTATE_IDX)", 'oracle')
    if dialect == 'oracle':
        # Literal CASE expression (no bind variables) -- presumably so Oracle can use the
        # function-based RULES_INJECTIONSTATE_IDX index named in the hint; TODO confirm.
        query = query.filter(text("(CASE when rules.state='I' THEN rules.state ELSE null END)= 'I' "))
    query = query.filter(models.ReplicationRule.state == RuleState.INJECT).\
        order_by(models.ReplicationRule.created_at)

    if dialect == 'oracle':
        bindparams = [bindparam('worker_number', worker_number),
                      bindparam('total_workers', total_workers)]
        query = query.filter(text('ORA_HASH(name, :total_workers) = :worker_number', bindparams=bindparams))
    elif dialect == 'mysql':
        query = query.filter(text('mod(md5(name), %s) = %s' % (total_workers + 1, worker_number)))
    elif dialect == 'postgresql':
        query = query.filter(text('mod(abs((\'x\'||md5(name))::bit(32)::int), %s) = %s' % (total_workers + 1, worker_number)))

    if not limit:
        return [rule for rule in query.all() if rule[0] not in blacklisted_rules]

    fetched_rules = query.limit(limit).all()
    filtered_rules = [rule for rule in fetched_rules if rule[0] not in blacklisted_rules]
    if len(fetched_rules) == limit and not filtered_rules:
        # The whole page was blacklisted: retry without a limit.
        return get_injected_rules(total_workers=total_workers,
                                  worker_number=worker_number,
                                  limit=None,
                                  blacklisted_rules=blacklisted_rules,
                                  session=session)
    return filtered_rules
def get_stuck_rules(total_workers, worker_number, delta=600, limit=10, blacklisted_rules=None, session=None):
    """
    Get stuck rules.

    Selects rules in STUCK state whose updated_at is more than ``delta`` seconds old and
    which have no expiry, have not expired yet, or are locked, ordered by updated_at.
    Rows are split across workers by hashing the rule name; the hash space has
    ``total_workers + 1`` buckets (0..total_workers).

    :param total_workers: Number of total workers.
    :param worker_number: id of the executing worker.
    :param delta: Delta in seconds to select rules in.
    :param limit: Maximum number of rules to select; a falsy value means no limit.
    :param blacklisted_rules: Blacklisted rule ids to filter out. Defaults to no blacklist.
    :param session: Database session in use.
    :returns: List of single-column (id,) rows.
    """
    # Avoid a shared mutable default argument; a fresh empty list is equivalent.
    if blacklisted_rules is None:
        blacklisted_rules = []

    dialect = session.bind.dialect.name
    query = session.query(models.ReplicationRule.id).\
        with_hint(models.ReplicationRule, "index(rules RULES_STUCKSTATE_IDX)", 'oracle')
    if dialect == 'oracle':
        # Literal CASE expression (no bind variables) -- presumably so Oracle can use the
        # function-based RULES_STUCKSTATE_IDX index named in the hint; TODO confirm.
        query = query.filter(text("(CASE when rules.state='S' THEN rules.state ELSE null END)= 'S' "))
    query = query.filter(models.ReplicationRule.state == RuleState.STUCK).\
        filter(models.ReplicationRule.updated_at < datetime.utcnow() - timedelta(seconds=delta)).\
        filter(or_(models.ReplicationRule.expires_at == null(),
                   models.ReplicationRule.expires_at > datetime.utcnow(),
                   models.ReplicationRule.locked == true())).\
        order_by(models.ReplicationRule.updated_at)

    if dialect == 'oracle':
        bindparams = [bindparam('worker_number', worker_number),
                      bindparam('total_workers', total_workers)]
        query = query.filter(text('ORA_HASH(name, :total_workers) = :worker_number', bindparams=bindparams))
    elif dialect == 'mysql':
        query = query.filter(text('mod(md5(name), %s) = %s' % (total_workers + 1, worker_number)))
    elif dialect == 'postgresql':
        query = query.filter(text('mod(abs((\'x\'||md5(name))::bit(32)::int), %s) = %s' % (total_workers + 1, worker_number)))

    if not limit:
        return [rule for rule in query.all() if rule[0] not in blacklisted_rules]

    fetched_rules = query.limit(limit).all()
    filtered_rules = [rule for rule in fetched_rules if rule[0] not in blacklisted_rules]
    if len(fetched_rules) == limit and not filtered_rules:
        # The whole page was blacklisted: retry without a limit.
        return get_stuck_rules(total_workers=total_workers,
                               worker_number=worker_number,
                               delta=delta,
                               limit=None,
                               blacklisted_rules=blacklisted_rules,
                               session=session)
    return filtered_rules
def list_bad_replicas(limit=10000, thread=None, total_threads=None, session=None):
    """
    List RSE File replicas in BAD state.

    :param limit: The maximum number of replicas returned.
    :param thread: The assigned thread for this necromancer (0-based, 0..total_threads - 1).
    :param total_threads: The total number of threads of all necromancers; work is only
                          split when this is greater than 1.
    :param session: The database session in use.
    :returns: a list of dictionary {'scope': scope, 'name': name, 'rse_id': rse_id, 'rse': rse}.
    """
    dialect = session.bind.dialect.name
    query = session.query(models.RSEFileAssociation.scope,
                          models.RSEFileAssociation.name,
                          models.RSEFileAssociation.rse_id)
    if dialect == 'oracle':
        # The filter(text...)) is needed otherwise, SQLA uses bind variables and the index is not used.
        query = query.\
            with_hint(models.RSEFileAssociation, "+ index(replicas REPLICAS_STATE_IDX)", 'oracle').\
            filter(text("CASE WHEN (%s.replicas.state != 'A') THEN %s.replicas.rse_id END IS NOT NULL" % (DEFAULT_SCHEMA_NAME,
                                                                                                          DEFAULT_SCHEMA_NAME)))
    query = query.filter(models.RSEFileAssociation.state == ReplicaState.BAD)

    if total_threads and (total_threads - 1) > 0:
        # ORA_HASH(name, total_threads - 1) yields total_threads buckets (0..total_threads - 1);
        # the MySQL/PostgreSQL modulus must be total_threads so every thread gets a share.
        if dialect == 'oracle':
            bindparams = [bindparam('thread_number', thread), bindparam('total_threads', total_threads - 1)]
            query = query.filter(text('ORA_HASH(name, :total_threads) = :thread_number', bindparams=bindparams))
        elif dialect == 'mysql':
            query = query.filter(text('mod(md5(name), %s) = %s' % (total_threads, thread)))
        elif dialect == 'postgresql':
            query = query.filter(text('mod(abs((\'x\'||md5(name))::bit(32)::int), %s) = %s' % (total_threads, thread)))

    query = query.limit(limit)

    rows = []
    rse_names = {}  # rse_id -> rse name, so get_rse_name is called once per RSE
    for scope, name, rse_id in query.yield_per(1000):
        if rse_id not in rse_names:
            rse_names[rse_id] = get_rse_name(rse_id=rse_id, session=session)
        rows.append({'scope': scope, 'name': name, 'rse_id': rse_id, 'rse': rse_names[rse_id]})
    return rows
def list_unlocked_replicas(rse, limit, bytes=None, rse_id=None, worker_number=None, total_workers=None, delay_seconds=0, session=None):
"""
List RSE File replicas with no locks.
Candidates have a tombstone in the past and lock_cnt == 0, and are either AVAILABLE,
UNAVAILABLE or BAD, or BEING_DELETED with updated_at older than delay_seconds.
Replicas for which any request with the same scope and name exists are excluded.
Results are ordered by tombstone.
:param rse: the rse name.
:param limit: The maximum number of replicas returned (enforced through total_files).
:param bytes: the amount of needed bytes; None means no size bound.
:param rse_id: the rse id; looked up from ``rse`` when not given.
:param worker_number: id of the executing worker (1-based).
:param total_workers: Number of total workers; work is only split when greater than 1.
:param delay_seconds: Minimum age in seconds (updated_at) of BEING_DELETED replicas to list them again.
:param session: The database session in use.
:returns: a list of dictionary replica with keys scope, name, path, bytes, tombstone, state.
"""
if not rse_id:
rse_id = get_rse_id(rse=rse, session=session)
# filter(models.RSEFileAssociation.state != ReplicaState.BEING_DELETED).\
none_value = None # Hack to get pep8 happy: compared with != below so SQLAlchemy renders IS NOT NULL
query = session.query(models.RSEFileAssociation.scope, models.RSEFileAssociation.name, models.RSEFileAssociation.path, models.RSEFileAssociation.bytes, models.RSEFileAssociation.tombstone, models.RSEFileAssociation.state).\
with_hint(models.RSEFileAssociation, "INDEX_RS_ASC(replicas REPLICAS_TOMBSTONE_IDX) NO_INDEX_FFS(replicas REPLICAS_TOMBSTONE_IDX)", 'oracle').\
filter(models.RSEFileAssociation.tombstone < datetime.utcnow()).\
filter(models.RSEFileAssociation.lock_cnt == 0).\
filter(case([(models.RSEFileAssociation.tombstone != none_value, models.RSEFileAssociation.rse_id), ]) == rse_id).\
filter(or_(models.RSEFileAssociation.state.in_((ReplicaState.AVAILABLE, ReplicaState.UNAVAILABLE, ReplicaState.BAD)),
and_(models.RSEFileAssociation.state == ReplicaState.BEING_DELETED, models.RSEFileAssociation.updated_at < datetime.utcnow() - timedelta(seconds=delay_seconds)))).\
order_by(models.RSEFileAssociation.tombstone)
# do not delete files used as sources: skip replicas for which any request with the same scope and name exists
stmt = exists(select([1]).prefix_with("/*+ INDEX(requests REQUESTS_SCOPE_NAME_RSE_IDX) */", dialect='oracle')).\
where(and_(models.RSEFileAssociation.scope == models.Request.scope,
models.RSEFileAssociation.name == models.Request.name))
query = query.filter(not_(stmt))
# NOTE(review): ORA_HASH(name, total_workers - 1) yields total_workers buckets (0..total_workers - 1),
# but mod(..., total_workers - 1) yields only total_workers - 1 buckets, so on MySQL/PostgreSQL the
# last worker never matches any row. The modulus should probably be total_workers -- confirm.
if worker_number and total_workers and total_workers - 1 > 0:
if session.bind.dialect.name == 'oracle':
bindparams = [bindparam('worker_number', worker_number - 1), bindparam('total_workers', total_workers - 1)]
query = query.filter(text('ORA_HASH(name, :total_workers) = :worker_number', bindparams=bindparams))
elif session.bind.dialect.name == 'mysql':
query = query.filter(text('mod(md5(name), %s) = %s' % (total_workers - 1, worker_number - 1)))
elif session.bind.dialect.name == 'postgresql':
query = query.filter(text('mod(abs((\'x\'||md5(name))::bit(32)::int), %s) = %s' % (total_workers - 1, worker_number - 1)))
# Keep the requested size before the loop below rebinds the name `bytes` to each file's size.
needed_space = bytes
total_bytes, total_files = 0, 0
rows = []
# UNAVAILABLE replicas do not add to total_bytes; replicas with an OBSOLETE tombstone never trigger the size cut-off.
for (scope, name, path, bytes, tombstone, state) in query.yield_per(1000):
if state != ReplicaState.UNAVAILABLE:
total_bytes += bytes
if tombstone != OBSOLETE and needed_space is not None and total_bytes > needed_space:
break
total_files += 1
if total_files > limit:
break
rows.append({'scope': scope, 'name': name, 'path': path,
'bytes': bytes, 'tombstone': tombstone,
'state': state})
return rows
def list_quarantined_replicas(rse, limit, worker_number=None, total_workers=None, session=None):
    """
    List RSE Quarantined File replicas.

    Quarantined replicas that still have a matching replica (same scope, name and rse_id)
    are excluded.

    :param rse: the rse name.
    :param limit: The maximum number of replicas returned.
    :param worker_number: id of the executing worker (1-based, 1..total_workers).
    :param total_workers: Number of total workers; work is only split when greater than 1.
    :param session: The database session in use.
    :returns: a list of dictionary replica with keys path, rse, rse_id, created_at, scope, name, bytes.
    """
    rse_id = get_rse_id(rse, session=session)

    query = session.query(models.QuarantinedReplica.path,
                          models.QuarantinedReplica.bytes,
                          models.QuarantinedReplica.scope,
                          models.QuarantinedReplica.name,
                          models.QuarantinedReplica.created_at).\
        filter(models.QuarantinedReplica.rse_id == rse_id)

    # do not delete valid replicas
    stmt = exists(select([1]).prefix_with("/*+ index(REPLICAS REPLICAS_PK) */", dialect='oracle')).\
        where(and_(models.RSEFileAssociation.scope == models.QuarantinedReplica.scope,
                   models.RSEFileAssociation.name == models.QuarantinedReplica.name,
                   models.RSEFileAssociation.rse_id == models.QuarantinedReplica.rse_id))
    query = query.filter(not_(stmt))

    if worker_number and total_workers and total_workers - 1 > 0:
        dialect = session.bind.dialect.name
        # ORA_HASH(path, total_workers - 1) yields total_workers buckets (0..total_workers - 1);
        # the MySQL/PostgreSQL modulus must be total_workers so every worker gets a share.
        # Textual SQL is wrapped in text(), which newer SQLAlchemy versions require.
        if dialect == 'oracle':
            bindparams = [bindparam('worker_number', worker_number - 1), bindparam('total_workers', total_workers - 1)]
            query = query.filter(text('ORA_HASH(path, :total_workers) = :worker_number', bindparams=bindparams))
        elif dialect == 'mysql':
            query = query.filter(text('mod(md5(path), %s) = %s' % (total_workers, worker_number - 1)))
        elif dialect == 'postgresql':
            query = query.filter(text('mod(abs((\'x\'||md5(path))::bit(32)::int), %s) = %s' % (total_workers, worker_number - 1)))

    return [{'path': path,
             'rse': rse,
             'rse_id': rse_id,
             'created_at': created_at,
             'scope': scope,
             'name': name,
             'bytes': size}
            for path, size, scope, name, created_at in query.limit(limit)]
def list_new_dids(did_type, thread=None, total_threads=None, chunk_size=1000, session=None):
    """
    List recent identifiers.

    Yields DIDs flagged ``is_new`` that have no replication rule in INJECT state.

    :param did_type: The DID type, as a string symbol (resolved with DIDType.from_sym) or an
                     EnumSymbol; a falsy value means any type.
    :param thread: The assigned thread (0-based, 0..total_threads - 1).
    :param total_threads: The total number of threads; work is only split when greater than 1.
    :param chunk_size: Maximum number of DIDs yielded in total.
    :param session: The database session in use.
    :returns: Generator of {'scope': scope, 'name': name, 'did_type': did_type} dictionaries.
    """
    stmt = select([1]).\
        prefix_with("/*+ INDEX(RULES ATLAS_RUCIO.RULES_SCOPE_NAME_IDX) */",
                    dialect='oracle').\
        where(and_(models.DataIdentifier.scope == models.ReplicationRule.scope,
                   models.DataIdentifier.name == models.ReplicationRule.name,
                   models.ReplicationRule.state == RuleState.INJECT))

    query = session.query(models.DataIdentifier).\
        with_hint(models.DataIdentifier, "index(dids DIDS_IS_NEW_IDX)", 'oracle').\
        filter_by(is_new=True).\
        filter(~exists(stmt))

    if did_type:
        if isinstance(did_type, str) or isinstance(did_type, unicode):
            query = query.filter_by(did_type=DIDType.from_sym(did_type))
        elif isinstance(did_type, EnumSymbol):
            query = query.filter_by(did_type=did_type)

    if total_threads and (total_threads - 1) > 0:
        dialect = session.bind.dialect.name
        # ORA_HASH(name, total_threads - 1) yields total_threads buckets (0..total_threads - 1);
        # the MySQL/PostgreSQL modulus must be total_threads so every thread gets a share.
        if dialect == 'oracle':
            bindparams = [bindparam('thread_number', thread), bindparam('total_threads', total_threads - 1)]
            query = query.filter(text('ORA_HASH(name, :total_threads) = :thread_number', bindparams=bindparams))
        elif dialect == 'mysql':
            query = query.filter(text('mod(md5(name), %s) = %s' % (total_threads, thread)))
        elif dialect == 'postgresql':
            query = query.filter(text('mod(abs((\'x\'||md5(name))::bit(32)::int), %s) = %s' % (total_threads, thread)))

    row_count = 0
    for chunk in query.yield_per(10):
        row_count += 1
        if row_count > chunk_size:
            break
        yield {'scope': chunk.scope, 'name': chunk.name, 'did_type': chunk.did_type}  # TODO Change this to the proper filebytes [RUCIO-199]
def retrieve_messages(bulk=1000, thread=None, total_threads=None, event_type=None,
                      lock=False, session=None):
    """
    Retrieve up to $bulk messages.

    :param bulk: Number of messages as an integer.
    :param thread: Identifier of the caller thread as an integer (0-based, 0..total_threads - 1).
    :param total_threads: Maximum number of threads as an integer.
    :param event_type: Return only specified event_type. If None, returns everything except email.
    :param lock: Select exclusively some rows. NOTE(review): currently unused -- the rows are
                 always selected with FOR UPDATE NOWAIT; confirm whether this is intended.
    :param session: The database session to use.
    :returns messages: List of dictionaries {id, created_at, event_type, payload}
    :raises RucioException: wrapping an IntegrityError raised while querying.
    """
    messages = []
    try:
        dialect = session.bind.dialect.name
        subquery = session.query(Message.id)
        if total_threads and (total_threads - 1) > 0:
            # ORA_HASH(id, total_threads - 1) yields total_threads buckets (0..total_threads - 1);
            # the MySQL/PostgreSQL modulus must be total_threads so every thread gets a share.
            # Textual SQL is wrapped in text(), which newer SQLAlchemy versions require.
            if dialect == 'oracle':
                bindparams = [bindparam('thread_number', thread), bindparam('total_threads', total_threads - 1)]
                subquery = subquery.filter(text('ORA_HASH(id, :total_threads) = :thread_number', bindparams=bindparams))
            elif dialect == 'mysql':
                subquery = subquery.filter(text('mod(md5(id), %s) = %s' % (total_threads, thread)))
            elif dialect == 'postgresql':
                subquery = subquery.filter(text('mod(abs((\'x\'||md5(id))::bit(32)::int), %s) = %s' % (total_threads, thread)))

        if event_type:
            subquery = subquery.filter_by(event_type=event_type)
        else:
            subquery = subquery.filter(Message.event_type != 'email')

        # Step 1:
        # MySQL does not support limits in nested queries, limit on the outer query instead.
        # This is not as performant, but the best we can get from MySQL.
        subquery = subquery.order_by(Message.created_at)
        if dialect != 'mysql':
            subquery = subquery.limit(bulk)

        query = session.query(Message.id,
                              Message.created_at,
                              Message.event_type,
                              Message.payload)\
            .filter(Message.id.in_(subquery))\
            .with_for_update(nowait=True)

        # Step 2:
        # MySQL does not support limits in nested queries, limit on the outer query instead.
        # This is not as performant, but the best we can get from MySQL.
        if dialect == 'mysql':
            query = query.limit(bulk)

        for msg_id, created_at, msg_event_type, payload in query:
            messages.append({'id': msg_id,
                             'created_at': created_at,
                             'event_type': msg_event_type,
                             'payload': json.loads(str(payload))})
        return messages
    except IntegrityError as e:
        raise RucioException(e.args)
def get_ancestors_query(
        cls, target_id=bindparam('root_id', type_=Integer),
        inclusive=True, tombstone_date=None):
    """
    Build an aliased SELECT of the ids of the ideas from which ``target_id`` can be reached
    through idea links (its ancestors).

    :param target_id: Target idea id: an int, a list of ints (recursive-CTE path only, and
        only with ``inclusive=False``), or a SQL expression (by default a ``root_id`` bind
        parameter).
    :param inclusive: When true, the target id itself is UNIONed into the result, matching
        ``get_descendants_query``.
    :param tombstone_date: Only links whose tombstone_date equals this value are followed
        (None renders IS NULL). NOTE: the Virtuoso path ignores it and always uses
        ``tombstone_date IS NULL``.
    :returns: An aliased selectable with a single ``id`` column.
    :raises NotImplementedError: for a list ``target_id`` on Virtuoso, or combined with
        ``inclusive=True``.
    """
    if cls.using_virtuoso:
        if isinstance(target_id, list):
            raise NotImplementedError()
        sql = text(
            """SELECT transitive t_in (1) t_out (2) T_DISTINCT T_NO_CYCLES
source_id, target_id FROM idea_idea_link
WHERE tombstone_date IS NULL"""
        ).columns(column('source_id'), column('target_id')).alias()
        select_exp = select([sql.c.source_id.label('id')]
                            ).select_from(sql).where(sql.c.target_id == target_id)
    else:
        if isinstance(target_id, list):
            root_condition = IdeaLink.target_id.in_(target_id)
        else:
            root_condition = (IdeaLink.target_id == target_id)
        # Recursive CTE: seed with the links pointing at the target(s), then repeatedly
        # follow links whose target is a source already reached.
        link = select(
            [IdeaLink.source_id, IdeaLink.target_id]
        ).select_from(
            IdeaLink
        ).where(
            (IdeaLink.tombstone_date == tombstone_date) &
            (root_condition)
        ).cte(recursive=True)
        target_alias = aliased(link)
        sources_alias = aliased(IdeaLink)
        parent_link = sources_alias.target_id == target_alias.c.source_id
        parents = select(
            [sources_alias.source_id, sources_alias.target_id]
        ).select_from(sources_alias).where(parent_link
                                           & (sources_alias.tombstone_date == tombstone_date))
        with_parents = link.union(parents)
        select_exp = select([with_parents.c.source_id.label('id')]
                            ).select_from(with_parents)
    if inclusive:
        if isinstance(target_id, list):
            # postgres: select * from unnest(ARRAY[1,6,7]) as id
            raise NotImplementedError()
        if isinstance(target_id, int):
            target_id = literal_column(str(target_id), Integer)
        # Previously the UNION sat in an `else` branch, so an int target_id was converted
        # but never included; it is now added for ints and SQL expressions alike.
        select_exp = select_exp.union(
            select([target_id.label('id')]))
    return select_exp.alias()