def get_selects(self):
"""
Constructs select queries for this aggregation
Returns: a dictionary of group : queries pairs where
group is a key of self.groups
queries is a list containing a single Select query for that group
"""
queries = {}
for group, groupby in self.groups.items():
columns = [groupby]
columns += self._get_aggregates_sql(group)
gb_clause = make_sql_clause(groupby, ex.literal_column)
query = ex.select(columns=columns, from_obj=self.from_obj)\
.group_by(gb_clause)
queries[group] = [query]
return queries
def get_creates(self):
"""
Construct create queries for this aggregation, using self.get_selects()
to determine each table's columns
Returns:
a dictionary of group : create pairs where
group is a key of self.groups
create is a CreateTableAs object for that group's table
"""
return {group: CreateTableAs(self.get_table_name(group),
next(iter(sels)).limit(0))
for group, sels in self.get_selects().items()}
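# Illustrative sketch (not from the original project): the limit(0) trick above
# produces an empty result set whose columns define the new table's shape.
# A minimal stand-alone version with plain SQLAlchemy against an in-memory
# SQLite engine; all names below are hypothetical.
import sqlalchemy as sa

shape_engine = sa.create_engine('sqlite://')
shape_meta = sa.MetaData()
events = sa.Table('events', shape_meta,
                  sa.Column('entity_id', sa.Integer),
                  sa.Column('outcome', sa.Integer))
shape_meta.create_all(shape_engine)

# Render the SELECT with literal binds so it can be embedded in a raw CREATE TABLE AS.
empty_select = sa.select([events.c.entity_id, events.c.outcome]).limit(0)
compiled = empty_select.compile(shape_engine, compile_kwargs={'literal_binds': True})
shape_engine.execute('CREATE TABLE events_shape AS %s' % compiled)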
def get_join_table(self):
"""
Generates the SQL for a join table, with one row for each combination of
group values and dates found in the from_obj
"""
groups = list(self.groups.values())
intervals = list(set(chain(*self.intervals.values())))
queries = []
for date in self.dates:
columns = groups + [ex.literal_column("'%s'::date" % date).label(
self.output_date_column)]
queries.append(ex.select(columns, from_obj=self.from_obj)
.where(self.where(date, intervals))
.group_by(*groups))
return "\nUNION ALL\n".join(map(str, queries))
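# Illustrative sketch (not from the original project): the per-date UNION ALL
# assembled above as a string can also be built with sqlalchemy.union_all.
# The table, column, and date values below are hypothetical stand-ins;
# the query is printed rather than executed (the ::date cast is PostgreSQL syntax).
import sqlalchemy as sa
from sqlalchemy.sql import table, column

from_table = table('events', column('entity_id'))
as_of_dates = ['2016-01-01', '2016-02-01']
per_date_selects = [
    sa.select([from_table.c.entity_id,
               sa.literal_column("'%s'::date" % d).label('as_of_date')])
      .group_by(from_table.c.entity_id)
    for d in as_of_dates
]
join_query = sa.union_all(*per_date_selects)
print(join_query)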
def seed_identifier(cls): # @NoSelf
'''returns data_identifier if the latter is not None, otherwise the
net.sta.loc.cha string obtained by querying the related channel and station'''
# Implementation notes:
# - For the correlated-subquery pattern used in 'sel' below, see:
# http://docs.sqlalchemy.org/en/latest/orm/extensions/hybrid.html#correlated-subquery-relationship-hybrid
# - limit(1) is used because the query might return more than one row
# (e.g. when no join or distinct is applied); we only need the first result
# matching the requested network + station + location + channel strings
# - the label(...) at the end is essential; see:
# http://docs.sqlalchemy.org/en/latest/core/sqlelement.html#sqlalchemy.sql.expression.label
dot = text("'.'")
sel = select([concat(Station.network, dot, Station.station, dot,
Channel.location, dot, Channel.channel)]).\
where((Channel.id == cls.channel_id) & (Station.id == Channel.station_id)).limit(1).\
label('seedidentifier')
return case([(cls.data_identifier.isnot(None), cls.data_identifier)],
else_=sel)
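# Illustrative sketch (not from the original project): the same pattern as
# seed_identifier above - a hybrid attribute whose SQL expression falls back to
# a correlated scalar subquery when a column is NULL.  Parent/Child below are
# hypothetical models, not the Station/Channel classes used above.
import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.hybrid import hybrid_property

Base = declarative_base()

class Child(Base):
    __tablename__ = 'child'
    id = sa.Column(sa.Integer, primary_key=True)
    code = sa.Column(sa.String)

class Parent(Base):
    __tablename__ = 'parent'
    id = sa.Column(sa.Integer, primary_key=True)
    label = sa.Column(sa.String)
    child_id = sa.Column(sa.Integer, sa.ForeignKey('child.id'))

    @hybrid_property
    def display_name(self):
        # Python-level access: plain attribute fallback, kept minimal here
        return self.label

    @display_name.expression
    def display_name(cls):
        # Correlated scalar subquery, limited to one row and labelled,
        # mirroring the 'sel' construction in seed_identifier above.
        fallback = sa.select([Child.code]).\
            where(Child.id == cls.child_id).limit(1).label('display_name')
        return sa.case([(cls.label.isnot(None), cls.label)], else_=fallback)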
def get_inserts(self):
"""
Construct insert queries from this aggregation, using self.get_selects()
as the source queries
Returns:
a dictionary of group : inserts pairs where
group is a key of self.groups
inserts is a list of InsertFromSelect objects, one per select query
"""
return {group: [InsertFromSelect(self.get_table_name(group), sel) for sel in sels]
for group, sels in self.get_selects().items()}
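# Illustrative sketch (not from the original project): InsertFromSelect is a
# project-specific construct; plain SQLAlchemy expresses the same
# "INSERT INTO ... SELECT ..." shape with Insert.from_select.  All table and
# engine names below are hypothetical.
import sqlalchemy as sa

sketch_engine = sa.create_engine('sqlite://')
sketch_meta = sa.MetaData()
raw_events = sa.Table('raw_events', sketch_meta,
                      sa.Column('entity_id', sa.Integer),
                      sa.Column('amount', sa.Integer))
totals = sa.Table('totals', sketch_meta,
                  sa.Column('entity_id', sa.Integer),
                  sa.Column('amount', sa.Integer))
sketch_meta.create_all(sketch_engine)
sketch_engine.execute(raw_events.insert(),
                      [{'entity_id': 1, 'amount': 2}, {'entity_id': 1, 'amount': 3}])

aggregate_select = sa.select([raw_events.c.entity_id,
                              sa.func.sum(raw_events.c.amount).label('amount')]).\
    group_by(raw_events.c.entity_id)
sketch_engine.execute(totals.insert().from_select(['entity_id', 'amount'], aggregate_select))
print(sketch_engine.execute(sa.select([totals])).fetchall())  # [(1, 5)]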
def get_selects(self):
"""
Constructs select queries for this aggregation
Returns: a dictionary of group : queries pairs where
group is a key of self.groups
queries is a list of Select queries, one for each date in self.dates
"""
queries = {}
for group, groupby in self.groups.items():
intervals = self.intervals[group]
queries[group] = []
for date in self.dates:
columns = [groupby,
ex.literal_column("'%s'::date"
% date).label(self.output_date_column)]
columns += list(chain(*[self._get_aggregates_sql(
i, date, group) for i in intervals]))
gb_clause = make_sql_clause(groupby, ex.literal_column)
query = ex.select(columns=columns, from_obj=self.from_obj)\
.group_by(gb_clause)
query = query.where(self.where(date, intervals))
queries[group].append(query)
return queries
def _loadAttributes(self):
for row in self._connection.execute(
        ex.select([md.InventoryClasses.c.class_namespace,
                   md.InventoryClasses.c.class_name,
                   md.InventoryClassAttributes])
        .select_from(ex.join(md.InventoryClassAttributes, md.InventoryClasses,
                             md.InventoryClassAttributes.c.class_id == md.InventoryClasses.c.class_id))
        .where(and_(md.InventoryClasses.c.class_namespace == self._namespace,
                    md.InventoryClasses.c.class_name == self._class_name))):
self._classId = row["class_id"]
self._attributes[row["attr_key"]] = {}
for i in ["attr_name", "attr_type", "attr_default", "attr_mandatory"]:
self._attributes[row["attr_key"]][i] = row[i]
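# Illustrative sketch (not from the original project): select() accepts a mix of
# individual columns and whole tables, and select_from(join(...)) fixes the FROM
# clause explicitly, as in the attribute-loading query above.  The tables below
# are hypothetical.
import sqlalchemy as sa

inv_engine = sa.create_engine('sqlite://')
inv_meta = sa.MetaData()
classes = sa.Table('classes', inv_meta,
                   sa.Column('class_id', sa.Integer, primary_key=True),
                   sa.Column('class_name', sa.String))
class_attrs = sa.Table('class_attrs', inv_meta,
                       sa.Column('class_id', sa.Integer),
                       sa.Column('attr_key', sa.String))
inv_meta.create_all(inv_engine)
inv_engine.execute(classes.insert(), [{'class_id': 1, 'class_name': 'host'}])
inv_engine.execute(class_attrs.insert(), [{'class_id': 1, 'attr_key': 'os'}])

mixed_query = sa.select([classes.c.class_name, class_attrs]).select_from(
    sa.join(class_attrs, classes, class_attrs.c.class_id == classes.c.class_id))
for row in inv_engine.execute(mixed_query):
    print(row['class_name'], row['attr_key'])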
def getObjectIdByName(self, object_name, object_subname=None):
andList = [ md.InventoryObjects.c.class_id == self._classId, md.InventoryObjects.c.object_name == object_name ]
if object_subname is not None:
andList.append(md.InventoryObjects.c.object_subname == object_subname)
object_id = None
i = 0
for row in self._connection.execute(md.InventoryObjects.select().where(and_(*andList))):
i = i + 1
object_id = row["object_id"]
if i > 1:
raise LookupException("Too many objects were found")
if i == 0:
raise EmptyLookupException("No objects were found")
return object_id
def search(self, object_id=None, object_name=None, object_subname=None, **kwargs):
andList = [ md.InventoryObjects.c.class_id == self._classId ]
if object_id is not None:
andList.append(md.InventoryObjects.c.object_id == object_id)
if object_name is not None:
andList.append(md.InventoryObjects.c.object_name.like(object_name))
if object_subname is not None:
andList.append(md.InventoryObjects.c.object_subname.like(object_subname))
# append attributes subqueries
for k in kwargs:
if k in self._attributes:
andList.append(md.InventoryObjects.c.object_id.in_(
ex.select([md.InventoryObjectAttributes.c.object_id]).select_from(md.InventoryObjectAttributes).where(and_(
md.InventoryObjectAttributes.c.class_id == self._classId,
md.InventoryObjectAttributes.c.attr_key == k,
md.InventoryObjectAttributes.c.attr_value.like(kwargs[k])
))
))
data = []
for row in self._connection.execute(md.InventoryObjects.select().where(and_(*andList))):
data.append({
"object_id": row["object_id"],
self._objectName : row["object_name"],
self._objectSubName : row["object_subname"]
})
return data
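# Illustrative sketch (not from the original project): the kwargs handling above
# filters objects through a key/value attribute table with column.in_(subquery).
# A minimal runnable version with hypothetical tables:
import sqlalchemy as sa

eav_engine = sa.create_engine('sqlite://')
eav_meta = sa.MetaData()
objects_t = sa.Table('objects', eav_meta,
                     sa.Column('object_id', sa.Integer),
                     sa.Column('object_name', sa.String))
attributes_t = sa.Table('object_attrs', eav_meta,
                        sa.Column('object_id', sa.Integer),
                        sa.Column('attr_key', sa.String),
                        sa.Column('attr_value', sa.String))
eav_meta.create_all(eav_engine)
eav_engine.execute(objects_t.insert(),
                   [{'object_id': 1, 'object_name': 'web01'},
                    {'object_id': 2, 'object_name': 'db01'}])
eav_engine.execute(attributes_t.insert(),
                   [{'object_id': 1, 'attr_key': 'os', 'attr_value': 'linux'}])

attr_subquery = sa.select([attributes_t.c.object_id]).where(
    sa.and_(attributes_t.c.attr_key == 'os', attributes_t.c.attr_value.like('lin%')))
matches = eav_engine.execute(
    objects_t.select().where(objects_t.c.object_id.in_(attr_subquery))).fetchall()
print(matches)  # only object_id 1 carries a matching os attribute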
def attributeExists(self, object_id, attribute_name):
assert not (object_id is None), "At least one identifier must be set"
for count in self._connection.execute(
        ex.select([func.count()]).select_from(md.InventoryObjectAttributes)
        .where(and_(md.InventoryObjectAttributes.c.class_id == self._classId,
                    md.InventoryObjectAttributes.c.object_id == object_id,
                    md.InventoryObjectAttributes.c.attr_key == attribute_name))):
count = count[0]
if count == 0:
return False
else:
return True
def _lookupAttribute(self, index):
if isinstance(index, dict):
if "object_id" in index:
index["attributes"] = {}
for row in self._connection.execute(md.InventoryObjectAttributes.select().where(and_(md.InventoryObjectAttributes.c.object_id == index["object_id"]))):
index["attributes"][row["attr_key"]] = row["attr_value"]
return index
else:
d = {}
for row in self._connection.execute(md.InventoryObjectAttributes.select().where(and_(md.InventoryObjectAttributes.c.object_id == index))):
d[row["attr_key"]] = row["attr_value"]
return d
def get_database_engine(config: AttrDict) -> Engine:
try:
return config.DATABASE_ENGINE
except AttributeError:
db_url = config.DATABASE_URL
echo = config.DATABASE_ECHO
engine = create_engine(db_url, echo=echo)
@listens_for(engine, 'engine_connect')
def ping_connection(connection, branch):
"""
Disconnect handling
http://docs.sqlalchemy.org/en/latest/core/pooling.html#disconnect-handling-pessimistic
"""
if branch:
return
save_should_close_with_result = connection.should_close_with_result
connection.should_close_with_result = False
try:
connection.scalar(select([1]))
except DBAPIError as err:
if err.connection_invalidated:
connection.scalar(select([1]))
else:
raise
finally:
connection.should_close_with_result = \
save_should_close_with_result
return engine
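# Illustrative note (not from the original project): since SQLAlchemy 1.2 the
# same pessimistic "ping before use" behaviour shown in ping_connection above is
# available as a built-in engine option, so the hand-written listener is only
# needed on older versions.
from sqlalchemy import create_engine

pre_ping_engine = create_engine('sqlite://', pool_pre_ping=True, echo=False)
pre_ping_engine.execute('SELECT 1')  # the pool tests each connection at checkout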
def get_account_verification_token(email=None, username=None):
email = email and emailer.normalize_address(email)
username = username and d.get_sysname(username)
logincreate = d.meta.tables['logincreate']
statement = select([logincreate.c.token])
if email:
statement = statement.where(logincreate.c.email.ilike(email))
else:
statement = statement.where(logincreate.c.login_name == username)
return d.engine.scalar(statement)
def _build_total_expressions(self, queryset, totals):
mapper = inspect(self.objects_class)
primary_keys = mapper.primary_key
relationships = {
'aliases': {},
'join_chains': [],
'prefix': 'totals_',
}
aggregates = []
group_cols = OrderedDict()
group_by = []
group_limit = None
for total in totals:
for aggregate, columns in total.items():
if aggregate == self.AGGR_GROUPLIMIT:
if not isinstance(columns, int):
raise HTTPBadRequest('Invalid attribute', 'Group limit option requires an integer value')
group_limit = columns
continue
if not columns:
if aggregate == self.AGGR_GROUPBY:
raise HTTPBadRequest('Invalid attribute', 'Group by option requires at least one column name')
if len(primary_keys) > 1:
aggregates.append(Function(aggregate, func.row(*primary_keys)).label(aggregate))
else:
aggregates.append(Function(aggregate, *primary_keys).label(aggregate))
continue
if not isinstance(columns, list):
columns = [columns]
for column in columns:
expression = self._parse_tokens(self.objects_class, column.split('__'), None, relationships,
lambda c, n, v: n)
if expression is not None:
if aggregate == self.AGGR_GROUPBY:
group_cols[column] = expression.label(column)
group_by.append(expression)
else:
aggregates.append(Function(aggregate, expression).label(aggregate))
agg_query = self._apply_joins(queryset, relationships, distinct=False)
group_cols_expr = list(group_cols.values())
columns = group_cols_expr + aggregates
if group_limit:
row_order = list(map(lambda c: c.desc(), aggregates))
columns.append(func.row_number().over(partition_by=group_cols_expr[:-1],
order_by=row_order).label('row_number'))
order = ','.join(list(map(str, range(1, len(group_cols_expr) + 1)))
+ list(map(lambda c: str(c) + ' DESC', range(1 + len(group_cols_expr),
len(aggregates) + len(group_cols_expr) + 1))))
agg_query = agg_query.statement.with_only_columns(columns).order_by(None).order_by(order)
if group_by:
agg_query = agg_query.group_by(*group_by)
if group_limit:
subquery = agg_query.alias()
agg_query = select([subquery]).where(subquery.c.row_number <= group_limit)
return agg_query, list(group_cols.keys())
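# Illustrative sketch (not from the original project): the group-limit branch
# above is a "top N per group" query - rank rows inside each group with
# row_number() over a window and keep only those within the limit.  Names are
# hypothetical and the query is printed rather than executed.
import sqlalchemy as sa
from sqlalchemy.sql import table, column

sales = table('sales', column('region'), column('amount'))
ranked = sa.select([
    sales.c.region,
    sales.c.amount,
    sa.func.row_number().over(partition_by=sales.c.region,
                              order_by=sales.c.amount.desc()).label('row_number'),
]).alias('ranked')
top_two_per_region = sa.select([ranked]).where(ranked.c.row_number <= 2)
print(top_two_per_region)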
def list_bad_replicas_status(state=BadFilesStatus.BAD, rse=None, younger_than=None, older_than=None, limit=None, list_pfns=False, session=None):
"""
List the bad file replicas history states. Method used by the rucio-ui.
:param state: The state of the file (SUSPICIOUS or BAD).
:param rse: The RSE name.
:param younger_than: datetime object to select bad replicas younger than this date.
:param older_than: datetime object to select bad replicas older than this date.
:param limit: The maximum number of replicas returned.
:param list_pfns: If True, return the list of PFNs of the bad replicas instead of replica dictionaries.
:param session: The database session in use.
"""
result = []
rse_id = None
if rse:
rse_id = get_rse_id(rse, session=session)
query = session.query(models.BadReplicas.scope, models.BadReplicas.name, models.RSE.rse, models.BadReplicas.state, models.BadReplicas.created_at, models.BadReplicas.updated_at)
if state:
query = query.filter(models.BadReplicas.state == state)
if rse_id:
query = query.filter(models.BadReplicas.rse_id == rse_id)
if younger_than:
query = query.filter(models.BadReplicas.created_at >= younger_than)
if older_than:
query = query.filter(models.BadReplicas.created_at <= older_than)
query = query.filter(models.RSE.id == models.BadReplicas.rse_id)
if limit:
query = query.limit(limit)
for badfile in query.yield_per(1000):
if list_pfns:
result.append({'scope': badfile.scope, 'name': badfile.name, 'type': DIDType.FILE})
else:
result.append({'scope': badfile.scope, 'name': badfile.name, 'rse': badfile.rse, 'state': badfile.state, 'created_at': badfile.created_at, 'updated_at': badfile.updated_at})
if list_pfns:
reps = []
for rep in list_replicas(result, schemes=['srm', ], unavailable=False, request_id=None, ignore_availability=True, all_states=True, session=session):
pfn = None
if rse in rep['rses'] and rep['rses'][rse]:
pfn = rep['rses'][rse][0]
if pfn and pfn not in reps:
reps.append(pfn)
else:
reps.extend([item for row in rep['rses'].values() for item in row])
reps = list(set(reps))
result = reps
return result
def list_unlocked_replicas(rse, limit, bytes=None, rse_id=None, worker_number=None, total_workers=None, delay_seconds=0, session=None):
"""
List RSE File replicas with no locks.
:param rse: the rse name.
:param limit: the maximum number of replicas returned.
:param bytes: the amount of needed bytes.
:param rse_id: the rse id; looked up from the rse name if not given.
:param worker_number: id of the executing worker.
:param total_workers: number of total workers.
:param delay_seconds: only consider BEING_DELETED replicas last updated more than this many seconds ago.
:param session: The database session in use.
:returns: a list of dictionary replica.
"""
if not rse_id:
rse_id = get_rse_id(rse=rse, session=session)
# filter(models.RSEFileAssociation.state != ReplicaState.BEING_DELETED).\
none_value = None # Hack to get pep8 happy...
query = session.query(models.RSEFileAssociation.scope, models.RSEFileAssociation.name, models.RSEFileAssociation.path, models.RSEFileAssociation.bytes, models.RSEFileAssociation.tombstone, models.RSEFileAssociation.state).\
with_hint(models.RSEFileAssociation, "INDEX_RS_ASC(replicas REPLICAS_TOMBSTONE_IDX) NO_INDEX_FFS(replicas REPLICAS_TOMBSTONE_IDX)", 'oracle').\
filter(models.RSEFileAssociation.tombstone < datetime.utcnow()).\
filter(models.RSEFileAssociation.lock_cnt == 0).\
filter(case([(models.RSEFileAssociation.tombstone != none_value, models.RSEFileAssociation.rse_id), ]) == rse_id).\
filter(or_(models.RSEFileAssociation.state.in_((ReplicaState.AVAILABLE, ReplicaState.UNAVAILABLE, ReplicaState.BAD)),
and_(models.RSEFileAssociation.state == ReplicaState.BEING_DELETED, models.RSEFileAssociation.updated_at < datetime.utcnow() - timedelta(seconds=delay_seconds)))).\
order_by(models.RSEFileAssociation.tombstone)
# do not delete files used as sources
stmt = exists(select([1]).prefix_with("/*+ INDEX(requests REQUESTS_SCOPE_NAME_RSE_IDX) */", dialect='oracle')).\
where(and_(models.RSEFileAssociation.scope == models.Request.scope,
models.RSEFileAssociation.name == models.Request.name))
query = query.filter(not_(stmt))
if worker_number and total_workers and total_workers - 1 > 0:
if session.bind.dialect.name == 'oracle':
bindparams = [bindparam('worker_number', worker_number - 1), bindparam('total_workers', total_workers - 1)]
query = query.filter(text('ORA_HASH(name, :total_workers) = :worker_number', bindparams=bindparams))
elif session.bind.dialect.name == 'mysql':
query = query.filter(text('mod(md5(name), %s) = %s' % (total_workers - 1, worker_number - 1)))
elif session.bind.dialect.name == 'postgresql':
query = query.filter(text('mod(abs((\'x\'||md5(name))::bit(32)::int), %s) = %s' % (total_workers - 1, worker_number - 1)))
needed_space = bytes
total_bytes, total_files = 0, 0
rows = []
for (scope, name, path, bytes, tombstone, state) in query.yield_per(1000):
if state != ReplicaState.UNAVAILABLE:
total_bytes += bytes
if tombstone != OBSOLETE and needed_space is not None and total_bytes > needed_space:
break
total_files += 1
if total_files > limit:
break
rows.append({'scope': scope, 'name': name, 'path': path,
'bytes': bytes, 'tombstone': tombstone,
'state': state})
return rows
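# Illustrative sketch (not from the original project): the dialect-specific
# filters above hash a column and keep only rows whose hash modulo the number
# of workers matches this worker, so several daemons can share one table.
# A PostgreSQL-flavoured variant using bound parameters instead of string
# interpolation (worker counts and the column name are hypothetical):
from sqlalchemy import text

total_workers, worker_number = 4, 2
partition_filter = text(
    "mod(abs(('x' || md5(name))::bit(32)::int), :total_workers) = :worker_number"
).bindparams(total_workers=total_workers, worker_number=worker_number)
# query = query.filter(partition_filter)  # applied to the replicas Query above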
def list_quarantined_replicas(rse, limit, worker_number=None, total_workers=None, session=None):
"""
List RSE Quarantined File replicas.
:param rse: the rse name.
:param limit: The maximum number of replicas returned.
:param worker_number: id of the executing worker.
:param total_workers: Number of total workers.
:param session: The database session in use.
:returns: a list of dictionary replica.
"""
rse_id = get_rse_id(rse, session=session)
query = session.query(models.QuarantinedReplica.path,
models.QuarantinedReplica.bytes,
models.QuarantinedReplica.scope,
models.QuarantinedReplica.name,
models.QuarantinedReplica.created_at).\
filter(models.QuarantinedReplica.rse_id == rse_id)
# do not delete valid replicas
stmt = exists(select([1]).prefix_with("/*+ index(REPLICAS REPLICAS_PK) */", dialect='oracle')).\
where(and_(models.RSEFileAssociation.scope == models.QuarantinedReplica.scope,
models.RSEFileAssociation.name == models.QuarantinedReplica.name,
models.RSEFileAssociation.rse_id == models.QuarantinedReplica.rse_id))
query = query.filter(not_(stmt))
if worker_number and total_workers and total_workers - 1 > 0:
if session.bind.dialect.name == 'oracle':
bindparams = [bindparam('worker_number', worker_number - 1), bindparam('total_workers', total_workers - 1)]
query = query.filter(text('ORA_HASH(path, :total_workers) = :worker_number', bindparams=bindparams))
elif session.bind.dialect.name == 'mysql':
query = query.filter(text('mod(md5(path), %s) = %s' % (total_workers - 1, worker_number - 1)))
elif session.bind.dialect.name == 'postgresql':
query = query.filter(text('mod(abs((\'x\'||md5(path))::bit(32)::int), %s) = %s' % (total_workers - 1, worker_number - 1)))
return [{'path': path,
'rse': rse,
'rse_id': rse_id,
'created_at': created_at,
'scope': scope,
'name': name,
'bytes': bytes}
for path, bytes, scope, name, created_at in query.limit(limit)]
def add_volatile_replicas(rse, replicas, session=None):
"""
Bulk add volatile replicas.
:param rse: the rse name.
:param replicas: the list of volatile replicas.
:param session: The database session in use.
:returns: True if successful.
"""
# first check that the rse is a volatile one
try:
rse_id = session.query(models.RSE.id).filter_by(rse=rse, volatile=True).one()[0]
except NoResultFound:
raise exception.UnsupportedOperation('No volatile rse found for %(rse)s !' % locals())
file_clause, replica_clause = [], []
for replica in replicas:
file_clause.append(and_(models.DataIdentifier.scope == replica['scope'],
models.DataIdentifier.name == replica['name'],
~exists(select([1]).prefix_with("/*+ INDEX(REPLICAS REPLICAS_PK) */", dialect='oracle')).where(and_(models.RSEFileAssociation.scope == replica['scope'],
models.RSEFileAssociation.name == replica['name'],
models.RSEFileAssociation.rse_id == rse_id))))
replica_clause.append(and_(models.RSEFileAssociation.scope == replica['scope'],
models.RSEFileAssociation.name == replica['name'],
models.RSEFileAssociation.rse_id == rse_id))
if replica_clause:
now = datetime.utcnow()
session.query(models.RSEFileAssociation).\
with_hint(models.RSEFileAssociation, "index(REPLICAS REPLICAS_PK)", 'oracle').\
filter(or_(*replica_clause)).\
update({'updated_at': now, 'tombstone': now}, synchronize_session=False)
if file_clause:
file_query = session.query(models.DataIdentifier.scope,
models.DataIdentifier.name,
models.DataIdentifier.bytes,
models.DataIdentifier.md5,
models.DataIdentifier.adler32).\
filter(or_(*file_clause))
session.bulk_insert_mappings(
models.RSEFileAssociation,
[{'rse_id': rse_id, 'adler32': adler32, 'state': ReplicaState.AVAILABLE,
'scope': scope, 'name': name, 'lock_cnt': 0, 'tombstone': datetime.utcnow(),
'bytes': bytes, 'md5': md5} for scope, name, bytes, md5, adler32 in file_query])
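# Illustrative sketch (not from the original project): the file_clause above is
# an anti-join - keep only rows with no matching replica - written as
# ~exists(select([1]).where(...)).  A minimal runnable version with
# hypothetical tables:
import sqlalchemy as sa

anti_engine = sa.create_engine('sqlite://')
anti_meta = sa.MetaData()
files_t = sa.Table('files', anti_meta, sa.Column('name', sa.String))
replicas_t = sa.Table('replicas', anti_meta, sa.Column('name', sa.String))
anti_meta.create_all(anti_engine)
anti_engine.execute(files_t.insert(), [{'name': 'f1'}, {'name': 'f2'}])
anti_engine.execute(replicas_t.insert(), [{'name': 'f1'}])

missing = sa.select([files_t.c.name]).where(
    ~sa.exists(sa.select([1]).where(replicas_t.c.name == files_t.c.name)))
print(anti_engine.execute(missing).fetchall())  # [('f2',)] - f1 already has a replica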
def list_new_dids(did_type, thread=None, total_threads=None, chunk_size=1000, session=None):
"""
List recent identifiers.
:param did_type: The DID type.
:param thread: The assigned thread for this necromancer.
:param total_threads: The total number of threads of all necromancers.
:param chunk_size: Number of requests to return per yield.
:param session: The database session in use.
"""
stmt = select([1]).\
prefix_with("/*+ INDEX(RULES ATLAS_RUCIO.RULES_SCOPE_NAME_IDX) */",
dialect='oracle').\
where(and_(models.DataIdentifier.scope == models.ReplicationRule.scope,
models.DataIdentifier.name == models.ReplicationRule.name,
models.ReplicationRule.state == RuleState.INJECT))
query = session.query(models.DataIdentifier).\
with_hint(models.DataIdentifier, "index(dids DIDS_IS_NEW_IDX)", 'oracle').\
filter_by(is_new=True).\
filter(~exists(stmt))
if did_type:
if isinstance(did_type, str) or isinstance(did_type, unicode):
query = query.filter_by(did_type=DIDType.from_sym(did_type))
elif isinstance(did_type, EnumSymbol):
query = query.filter_by(did_type=did_type)
if total_threads and (total_threads - 1) > 0:
if session.bind.dialect.name == 'oracle':
bindparams = [bindparam('thread_number', thread), bindparam('total_threads', total_threads - 1)]
query = query.filter(text('ORA_HASH(name, :total_threads) = :thread_number', bindparams=bindparams))
elif session.bind.dialect.name == 'mysql':
query = query.filter(text('mod(md5(name), %s) = %s' % (total_threads - 1, thread)))
elif session.bind.dialect.name == 'postgresql':
query = query.filter(text('mod(abs((\'x\'||md5(name))::bit(32)::int), %s) = %s' % (total_threads - 1, thread)))
row_count = 0
for chunk in query.yield_per(10):
row_count += 1
if row_count <= chunk_size:
yield {'scope': chunk.scope, 'name': chunk.name, 'did_type': chunk.did_type} # TODO Change this to the proper filebytes [RUCIO-199]
else:
break
def rebuild_table(table, delete_missing=False):
from virtuoso.alchemy import AddForeignKey, DropForeignKey
print("rebuilding", table)
session = get_session_maker()()
incoming = set(get_incoming_fks(table))
outgoing = set(table.foreign_keys)
all_fkeys = incoming | outgoing
self_ref = incoming & outgoing
try:
for fk in all_fkeys:
if not delete_rows_with_missing_fkey(fk, delete_missing):
print("There are missing keys, will not rebuild " + table.name)
return
except Exception as e:
traceback.print_exc()
print("Could not delete missing keys")
raise e
# Booleans with NULL values
for col in table.c:
if isinstance(col.type, Boolean):
session.execute(table.update().where(col.is_(None)).values(**{col.name: 0}))
# Drop all keys
for fk in all_fkeys:
try:
session.execute(DropForeignKey(fk))
except Exception as e:
print("Could not drop fkey %s, maybe does not exist." % (fk_as_str(fk),))
print(e)
clone = clone_table(table, table.name+"_temp", False, False)
clone.create(session.bind)
column_names = [c.name for c in table.columns]
sel = select([getattr(table.c, cname) for cname in column_names])
with transaction.manager:
session.execute(clone.insert().from_select(column_names, sel))
mark_changed(session)
session.execute(DropTable(table))
# Should we create it without outgoing first?
table.create(session.bind)
# self ref will make the insert fail.
for fk in self_ref:
try:
session.execute(DropForeignKey(fk))
except Exception as e:
print("Could not drop fkey %s, maybe does not exist." % (fk_as_str(fk),))
print(e)
sel = select([getattr(clone.c, cname) for cname in column_names])
with transaction.manager:
session.execute(table.insert().from_select(column_names, sel))
mark_changed(session)
session.execute(DropTable(clone))
if delete_missing:
# Delete a second time, in case.
for fk in outgoing:
assert delete_rows_with_missing_fkey(fk, True), "OUCH"
for fk in incoming: # includes self_ref
session.execute(AddForeignKey(fk))
def verify(token):
lo = d.meta.tables["login"]
lc = d.meta.tables["logincreate"]
query = d.engine.execute(lc.select().where(lc.c.token == token)).first()
if not query:
raise WeasylError("logincreateRecordMissing")
db = d.connect()
with db.begin():
# Create login record
userid = db.scalar(lo.insert().returning(lo.c.userid), {
"login_name": d.get_sysname(query.username),
"last_login": arrow.now(),
"email": query.email,
})
# Create profile records
db.execute(d.meta.tables["authbcrypt"].insert(), {
"userid": userid,
"hashsum": query.hashpass,
})
db.execute(d.meta.tables["profile"].insert(), {
"userid": userid,
"username": query.username,
"full_name": query.username,
"unixtime": arrow.now(),
"config": "kscftj",
})
db.execute(d.meta.tables["userinfo"].insert(), {
"userid": userid,
"birthday": query.birthday,
})
db.execute(d.meta.tables["userstats"].insert(), {
"userid": userid,
})
db.execute(d.meta.tables["welcomecount"].insert(), {
"userid": userid,
})
# Update logincreate records
db.execute(lc.delete().where(lc.c.token == token))
d.metric('increment', 'verifiedusers')
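# Illustrative sketch (not from the original project): the insert().returning()
# call above fetches the generated userid in the same round trip.  Shown
# standalone against the PostgreSQL dialect with a hypothetical table; the
# statement is compiled and printed rather than executed.
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

verify_meta = sa.MetaData()
login_t = sa.Table('login', verify_meta,
                   sa.Column('userid', sa.Integer, primary_key=True),
                   sa.Column('login_name', sa.String))
stmt = login_t.insert().returning(login_t.c.userid).values(login_name='example')
print(stmt.compile(dialect=postgresql.dialect()))
# renders roughly: INSERT INTO login (login_name) VALUES (%(login_name)s) RETURNING login.userid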