def get(self, *columns): # DEPRECATED: used for testing
'''Gets the values in the relative columns'''
qry = object_session(self).query(*columns) # .select_from(self.__class__)
jointables = set(c.class_ for c in columns)
if jointables:
joins = []
model = self.__class__
for r in self.__mapper__.relationships.values():
if r.mapper.class_ in jointables:
joins.append(getattr(model, r.key))
if joins:
qry = qry.join(*joins)
metadata = qry.filter(Segment.id == self.id).all()
if len(metadata) == 1:
return metadata[0]
else:
return metadata
python类object_session()的实例源码
def ensure_unique_name(self, key, name):
session = object_session(self)
if session is not None:
model_class = self.__class__
name = make_unique_name(session, model_class, name)
return name
else:
return name
def save_bunch(self, bunch):
session = object_session(self)
return serialize_scan_bunch(session, bunch, self.id)
def save(self, commit=True):
"""Save the record."""
db = object_session(self)
db.add(self)
if commit:
db.commit()
return self
def delete(self, commit=True, *args, **kwargs):
"""Remove the record from the database."""
db = object_session(self)
db.add(self)
db.delete(self)
return commit and db.commit()
def get_bind(obj):
"""
Return the bind for given SQLAlchemy Engine / Connection / declarative
model object.
:param obj: SQLAlchemy Engine / Connection / declarative model object
::
from sqlalchemy_utils import get_bind
get_bind(session) # Connection object
get_bind(user)
"""
if hasattr(obj, 'bind'):
conn = obj.bind
else:
try:
conn = object_session(obj).bind
except UnmappedInstanceError:
conn = obj
if not hasattr(conn, 'execute'):
raise TypeError(
'This method accepts only Session, Engine, Connection and '
'declarative model objects.'
)
return conn
def is_deleted(obj):
return obj in sa.orm.object_session(obj).deleted
def save_inventory(station, downloaded_bytes_data):
"""Saves the inventory. Raises SqlAlchemyError if station's session is None,
or (most likely IntegrityError) on failure
"""
station.inventory_xml = dumps_inv(downloaded_bytes_data)
try:
object_session(station).commit()
except UnmappedInstanceError:
raise
except SQLAlchemyError:
object_session(station).rollback()
raise
def object_session(self):
return object_session(self)
def orm_update_listener(mapper, connection, target):
if getattr(target, '__history_table__', None):
return
session = object_session(target)
if session.is_modified(target, include_collections=False):
target.send_to_changes(connection, CrudOperation.UPDATE)
def convert(self, fitted=True, deconvoluted=True):
precursor_information = self.precursor_information.convert(
) if self.precursor_information is not None else None
session = object_session(self)
conn = session.connection()
if fitted:
q = conn.execute(select([FittedPeak.__table__]).where(
FittedPeak.__table__.c.scan_id == self.id)).fetchall()
peak_set_items = list(
map(make_memory_fitted_peak, q))
peak_set = PeakSet(peak_set_items)
peak_set._index()
peak_index = PeakIndex(np.array([], dtype=np.float64), np.array(
[], dtype=np.float64), peak_set)
else:
peak_index = PeakIndex(np.array([], dtype=np.float64), np.array(
[], dtype=np.float64), PeakSet([]))
if deconvoluted:
q = conn.execute(select([DeconvolutedPeak.__table__]).where(
DeconvolutedPeak.__table__.c.scan_id == self.id)).fetchall()
deconvoluted_peak_set_items = list(
map(make_memory_deconvoluted_peak, q))
deconvoluted_peak_set = DeconvolutedPeakSet(
deconvoluted_peak_set_items)
deconvoluted_peak_set._reindex()
else:
deconvoluted_peak_set = DeconvolutedPeakSet([])
info = self.info or {}
scan = ProcessedScan(
self.scan_id, self.title, precursor_information, int(self.ms_level),
float(self.scan_time), self.index, peak_index, deconvoluted_peak_set,
activation=info.get('activation'))
return scan
def edit_classes(self, *ids_or_labels, mode, auto_commit=True, annotator=None):
""":param mode: either 'add' 'set' or 'del'"""
sess = object_session(self)
needs_commit = False
ids = set(ids_or_labels)
labels = set(_ for _ in ids if type(_) in (bytes, str))
ids -= labels
if mode == 'set':
self.classes[:] = []
else:
classes = list(self.classes)
if mode == 'del':
for c in classes:
if c.id in ids or c.label in labels:
self.classes.remove(c)
needs_commit = True
ids = labels = set() # do not add anything
elif mode == 'add':
for c in classes:
if c.id in ids:
ids.remove(c.id) # already set, remove it and don't add it again
if c.label in labels:
labels.remove(c.label) # already set, remove it and don't add it again
elif mode != 'set':
raise TypeError("`mode` argument needs to be in ('add', 'del', 'set'), "
"'%s' supplied" % str(mode))
if ids or labels:
flt1 = None if not ids else Class.id.in_(ids) # filter on ids, or None
flt2 = None if not labels else Class.label.in_(labels) # filter on labels, or None
flt = flt1 if flt2 is None else flt2 if flt1 is None else (flt1 | flt2)
classids2add = [_[0] for _ in sess.query(Class.id).filter(flt)]
if classids2add:
needs_commit = True
sess.add_all((ClassLabelling(class_id=cid,
segment_id=self.id, annotator=annotator,
is_hand_labelled=annotator or False)
for cid in classids2add))
if needs_commit and auto_commit:
try:
sess.commit()
except SQLAlchemyError as _:
sess.rollback()
raise