def __exit__(self, exc_type, exc_value, exc_traceback):
    """Stop listening for SQL and enforce the query-count limit.

    The listener described by ``self._sql_listen_args`` is always removed
    first.  If the managed block raised, ``False`` is returned so that the
    exception propagates and no further checks run.  Otherwise the number
    of recorded queries is logged when ``self.test_id`` is truthy, and an
    ``AssertionError`` listing every recorded query is raised when that
    number exceeds ``self.max_count`` (``None`` disables the check).
    """
    event.remove(*self._sql_listen_args)
    if exc_type is not None:
        # Never mask an exception coming out of the managed block.
        return False

    if self.test_id:
        if self.max_count is None:
            limit_msg = ''
        else:
            limit_msg = ' Limit: %d.' % self.max_count
        logging.info('%s executed %d queries.%s',
                     self.test_id, len(self.queries), limit_msg)

    if self.max_count is None:
        # No limit configured: nothing to enforce.
        return

    executed = len(self.queries)
    if executed > self.max_count:
        template = ('Maximum query count exceeded: limit %d, executed %d.\n'
                    '----QUERIES----\n%s\n----END----')
        raise AssertionError(
            template % (self.max_count, executed, '\n'.join(self.queries)))
python类remove()的实例源码
def no_deps(conn, DBSession):
    """Generator that asserts flushes have no dependency cycles while active.

    A session is obtained from ``DBSession`` and an ``after_flush`` listener
    is attached to it that asserts ``flush_context.cycles`` is empty.  A
    pass-through ``before_execute`` listener is attached to ``conn``.
    Control is yielded once; when the generator is resumed or closed, both
    listeners are detached again.

    :param conn: Object accepting ``before_execute`` listeners
        (presumably a SQLAlchemy connection -- confirm against the caller).
    :param DBSession: Callable returning the session to watch.
    """
    from sqlalchemy import event

    session = DBSession()

    @event.listens_for(session, 'after_flush')
    def check_dependencies(session, flush_context):
        assert not flush_context.cycles

    @event.listens_for(conn, "before_execute", retval=True)
    def before_execute(conn, clauseelement, multiparams, params):
        # Returns its arguments unchanged; required because retval=True.
        return clauseelement, multiparams, params

    try:
        yield
    finally:
        # event.remove() must receive the same (target, identifier, fn) that
        # was used to register; the flush check was registered on
        # 'after_flush', so removing it from 'before_flush' would fail.
        event.remove(conn, "before_execute", before_execute)
        event.remove(session, 'after_flush', check_dependencies)
def teardown(self):
    """Detach every recorded event listener, then chain to the next teardown.

    Each entry of ``self._event_fns`` is unpacked as the positional
    arguments of ``event.remove``.  Afterwards the ``teardown`` of the next
    class after ``RemovesEvents`` in the MRO is called, if one exists.
    """
    for listener_args in self._event_fns:
        event.remove(*listener_args)

    parent = super(RemovesEvents, self)
    if not hasattr(parent, "teardown"):
        # Nothing further up the MRO defines a teardown hook.
        return
    parent.teardown()
def unregister(cls, session):
    """Detach this class's listeners from ``session``.

    The ``_model_changes`` attribute is deleted from ``session`` when it is
    present, then the class handlers are removed from the session's
    ``before_flush``, ``before_commit``, ``after_commit`` and
    ``after_rollback`` events.
    """
    if hasattr(session, '_model_changes'):
        delattr(session, '_model_changes')

    # (event identifier, handler attribute on cls), in removal order.
    # ``record_ops`` is removed from two different events.
    hooks = (
        ('before_flush', 'record_ops'),
        ('before_commit', 'record_ops'),
        ('before_commit', 'before_commit'),
        ('after_commit', 'after_commit'),
        ('after_rollback', 'after_rollback'),
    )
    for identifier, handler_name in hooks:
        event.remove(session, identifier, getattr(cls, handler_name))
test_bulk_lazy_loader.py 文件源码
项目:sqlalchemy_bulk_lazy_loader
作者: operator
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def teardown(self):
    """Run the parent teardown, then stop recording queries.

    ``self.record_query`` is detached from the ``before_cursor_execute``
    event of ``Engine``.  The removal runs even if the parent teardown
    raises, so the listener never outlives this teardown call.
    """
    try:
        super().teardown()
    finally:
        # The listener targets ``Engine`` itself rather than one engine
        # instance, so it must not be left behind on failure.
        event.remove(Engine, 'before_cursor_execute', self.record_query)
def register(cls, key: str, store_factory, default: bool=False):
    """
    Add a store factory to the stores registry under ``key``.

    Use :meth:`unregister` to remove it again.

    :param key: The unique key for the store.
    :param store_factory: A callable that returns an instance of :class:`.Store`.
    :param default: When :data:`True`, ``key`` is also recorded as the default store.
                    :meth:`.make_default` can be used to mark a store as default as well.
    """
    _factories[key] = store_factory
    if not default:
        return
    cls._default = key
def unregister(cls, key) -> Store:
    """
    Remove an entry from the stores registry; the opposite of :meth:`.register`.

    If ``key`` equals the current default, the default is reset to
    :data:`None` before the registry is consulted.

    :param key: The store key to remove from the stores registry.
    :return: The object that was registered under ``key``.
    :raises KeyError: If ``key`` is not found in the registry.
    """
    if key == cls._default:
        cls._default = None

    if key not in _factories:
        raise KeyError('Cannot find store: %s' % key)
    return _factories.pop(key)
def unbind_events(self) -> None:
    """
    Detach this object's handlers from ``self.session``.

    Opposite of :meth:`bind_events`.
    """
    # (event identifier, handler attribute on self), in removal order.
    bindings = (
        ('after_commit', 'on_commit'),
        ('after_soft_rollback', 'on_rollback'),
        ('persistent_to_deleted', 'on_delete'),
    )
    for identifier, handler_name in bindings:
        event.remove(self.session, identifier, getattr(self, handler_name))
def adopted(self, *attachments) -> None:
    """
    Take the given attachments out of the orphaned-files collection.

    Opposite of :meth:`.orphaned`.  Does nothing when ``self.delete_orphan``
    is falsy; attachments that are not currently in
    ``self._files_orphaned`` are ignored.

    :param attachments: The attachments to remove from ``self._files_orphaned``.
    """
    if self.delete_orphan:
        for attachment in attachments:
            if attachment not in self._files_orphaned:
                continue
            self._files_orphaned.remove(attachment)
# noinspection PyUnresolvedReferences
def teardown(self):
    """Detach every recorded event listener, then chain to the next teardown.

    Each entry of ``self._event_fns`` is unpacked as the positional
    arguments of ``event.remove``.  Afterwards the ``teardown`` of the next
    class after ``RemovesEvents`` in the MRO is called, if one exists.
    """
    for listener_args in self._event_fns:
        event.remove(*listener_args)

    parent = super(RemovesEvents, self)
    if not hasattr(parent, "teardown"):
        # Nothing further up the MRO defines a teardown hook.
        return
    parent.teardown()
def remove(target, identifier, fn):
    """Remove an event listener.

    Pass exactly the arguments that were given to :func:`.listen`; every
    event registration that resulted from that call is reverted by calling
    :func:`.remove` with the same arguments.

    e.g.::

        # a listener registered like this...
        @event.listens_for(SomeMappedClass, "before_insert", propagate=True)
        def my_listener_function(*arg):
            pass

        # ... is removed like this
        event.remove(SomeMappedClass, "before_insert", my_listener_function)

    In the example, the listener associated with ``SomeMappedClass`` was
    also propagated to subclasses of ``SomeMappedClass``; :func:`.remove`
    reverts all of those registrations as well.

    .. versionadded:: 0.9.0

    """
    listener_key = _event_key(target, identifier, fn)
    listener_key.remove()
def teardown(self):
    """Detach every recorded event listener, then chain to the next teardown.

    Each entry of ``self._event_fns`` is unpacked as the positional
    arguments of ``event.remove``.  Afterwards the ``teardown`` of the next
    class after ``RemovesEvents`` in the MRO is called, if one exists.
    """
    for listener_args in self._event_fns:
        event.remove(*listener_args)

    parent = super(RemovesEvents, self)
    if not hasattr(parent, "teardown"):
        # Nothing further up the MRO defines a teardown hook.
        return
    parent.teardown()
def remove(target, identifier, fn):
    """Remove an event listener.

    Pass exactly the arguments that were given to :func:`.listen`; every
    event registration that resulted from that call is reverted by calling
    :func:`.remove` with the same arguments.

    e.g.::

        # a listener registered like this...
        @event.listens_for(SomeMappedClass, "before_insert", propagate=True)
        def my_listener_function(*arg):
            pass

        # ... is removed like this
        event.remove(SomeMappedClass, "before_insert", my_listener_function)

    In the example, the listener associated with ``SomeMappedClass`` was
    also propagated to subclasses of ``SomeMappedClass``; :func:`.remove`
    reverts all of those registrations as well.

    .. versionadded:: 0.9.0

    """
    listener_key = _event_key(target, identifier, fn)
    listener_key.remove()
def teardown(self):
    """Detach every recorded event listener, then chain to the next teardown.

    Each entry of ``self._event_fns`` is unpacked as the positional
    arguments of ``event.remove``.  Afterwards the ``teardown`` of the next
    class after ``RemovesEvents`` in the MRO is called, if one exists.
    """
    for listener_args in self._event_fns:
        event.remove(*listener_args)

    parent = super(RemovesEvents, self)
    if not hasattr(parent, "teardown"):
        # Nothing further up the MRO defines a teardown hook.
        return
    parent.teardown()
def teardown(self):
    """Detach every recorded event listener, then chain to the next teardown.

    Each entry of ``self._event_fns`` is unpacked as the positional
    arguments of ``event.remove``.  Afterwards the ``teardown`` of the next
    class after ``RemovesEvents`` in the MRO is called, if one exists.
    """
    for listener_args in self._event_fns:
        event.remove(*listener_args)

    parent = super(RemovesEvents, self)
    if not hasattr(parent, "teardown"):
        # Nothing further up the MRO defines a teardown hook.
        return
    parent.teardown()
def unregister(cls, session):
    """Detach this class's listeners from ``session``.

    The ``_model_changes`` attribute is deleted from ``session`` when it is
    present, then the class handlers are removed from the session's
    ``before_flush``, ``before_commit``, ``after_commit`` and
    ``after_rollback`` events.
    """
    if hasattr(session, '_model_changes'):
        delattr(session, '_model_changes')

    # (event identifier, handler attribute on cls), in removal order.
    # ``record_ops`` is removed from two different events.
    hooks = (
        ('before_flush', 'record_ops'),
        ('before_commit', 'record_ops'),
        ('before_commit', 'before_commit'),
        ('after_commit', 'after_commit'),
        ('after_rollback', 'after_rollback'),
    )
    for identifier, handler_name in hooks:
        event.remove(session, identifier, getattr(cls, handler_name))
def teardown(self):
    """Detach every recorded event listener, then chain to the next teardown.

    Each entry of ``self._event_fns`` is unpacked as the positional
    arguments of ``event.remove``.  Afterwards the ``teardown`` of the next
    class after ``RemovesEvents`` in the MRO is called, if one exists.
    """
    for listener_args in self._event_fns:
        event.remove(*listener_args)

    parent = super(RemovesEvents, self)
    if not hasattr(parent, "teardown"):
        # Nothing further up the MRO defines a teardown hook.
        return
    parent.teardown()
def unregister(cls, session):
    """Detach this class's listeners from ``session``.

    The ``_model_changes`` attribute is deleted from ``session`` when it is
    present, then the class handlers are removed from the session's
    ``before_flush``, ``before_commit``, ``after_commit`` and
    ``after_rollback`` events.
    """
    if hasattr(session, '_model_changes'):
        delattr(session, '_model_changes')

    # (event identifier, handler attribute on cls), in removal order.
    # ``record_ops`` is removed from two different events.
    hooks = (
        ('before_flush', 'record_ops'),
        ('before_commit', 'record_ops'),
        ('before_commit', 'before_commit'),
        ('after_commit', 'after_commit'),
        ('after_rollback', 'after_rollback'),
    )
    for identifier, handler_name in hooks:
        event.remove(session, identifier, getattr(cls, handler_name))
def teardown(self):
    """Detach every recorded event listener, then chain to the next teardown.

    Each entry of ``self._event_fns`` is unpacked as the positional
    arguments of ``event.remove``.  Afterwards the ``teardown`` of the next
    class after ``RemovesEvents`` in the MRO is called, if one exists.
    """
    for listener_args in self._event_fns:
        event.remove(*listener_args)

    parent = super(RemovesEvents, self)
    if not hasattr(parent, "teardown"):
        # Nothing further up the MRO defines a teardown hook.
        return
    parent.teardown()
def unregister(cls, session):
    """Detach this class's listeners from ``session``.

    The ``_model_changes`` attribute is deleted from ``session`` when it is
    present, then the class handlers are removed from the session's
    ``before_flush``, ``before_commit``, ``after_commit`` and
    ``after_rollback`` events.
    """
    if hasattr(session, '_model_changes'):
        delattr(session, '_model_changes')

    # (event identifier, handler attribute on cls), in removal order.
    # ``record_ops`` is removed from two different events.
    hooks = (
        ('before_flush', 'record_ops'),
        ('before_commit', 'record_ops'),
        ('before_commit', 'before_commit'),
        ('after_commit', 'after_commit'),
        ('after_rollback', 'after_rollback'),
    )
    for identifier, handler_name in hooks:
        event.remove(session, identifier, getattr(cls, handler_name))
def teardown(self):
    """Detach every recorded event listener, then chain to the next teardown.

    Each entry of ``self._event_fns`` is unpacked as the positional
    arguments of ``event.remove``.  Afterwards the ``teardown`` of the next
    class after ``RemovesEvents`` in the MRO is called, if one exists.
    """
    for listener_args in self._event_fns:
        event.remove(*listener_args)

    parent = super(RemovesEvents, self)
    if not hasattr(parent, "teardown"):
        # Nothing further up the MRO defines a teardown hook.
        return
    parent.teardown()
def unregister(cls, session):
    """Detach this class's listeners from ``session``.

    The ``_model_changes`` attribute is deleted from ``session`` when it is
    present, then the class handlers are removed from the session's
    ``before_flush``, ``before_commit``, ``after_commit`` and
    ``after_rollback`` events.
    """
    if hasattr(session, '_model_changes'):
        delattr(session, '_model_changes')

    # (event identifier, handler attribute on cls), in removal order.
    # ``record_ops`` is removed from two different events.
    hooks = (
        ('before_flush', 'record_ops'),
        ('before_commit', 'record_ops'),
        ('before_commit', 'before_commit'),
        ('after_commit', 'after_commit'),
        ('after_rollback', 'after_rollback'),
    )
    for identifier, handler_name in hooks:
        event.remove(session, identifier, getattr(cls, handler_name))
def unregister(cls, session):
    """Detach this class's listeners from ``session``.

    The ``_model_changes`` attribute is deleted from ``session`` when it is
    present, then the class handlers are removed from the session's
    ``before_flush``, ``before_commit``, ``after_commit`` and
    ``after_rollback`` events.
    """
    if hasattr(session, '_model_changes'):
        delattr(session, '_model_changes')

    # (event identifier, handler attribute on cls), in removal order.
    # ``record_ops`` is removed from two different events.
    hooks = (
        ('before_flush', 'record_ops'),
        ('before_commit', 'record_ops'),
        ('before_commit', 'before_commit'),
        ('after_commit', 'after_commit'),
        ('after_rollback', 'after_rollback'),
    )
    for identifier, handler_name in hooks:
        event.remove(session, identifier, getattr(cls, handler_name))
def teardown(self):
    """Detach every recorded event listener, then chain to the next teardown.

    Each entry of ``self._event_fns`` is unpacked as the positional
    arguments of ``event.remove``.  Afterwards the ``teardown`` of the next
    class after ``RemovesEvents`` in the MRO is called, if one exists.
    """
    for listener_args in self._event_fns:
        event.remove(*listener_args)

    parent = super(RemovesEvents, self)
    if not hasattr(parent, "teardown"):
        # Nothing further up the MRO defines a teardown hook.
        return
    parent.teardown()
def teardown(self):
    """Detach every recorded event listener, then chain to the next teardown.

    Each entry of ``self._event_fns`` is unpacked as the positional
    arguments of ``event.remove``.  Afterwards the ``teardown`` of the next
    class after ``RemovesEvents`` in the MRO is called, if one exists.
    """
    for listener_args in self._event_fns:
        event.remove(*listener_args)

    parent = super(RemovesEvents, self)
    if not hasattr(parent, "teardown"):
        # Nothing further up the MRO defines a teardown hook.
        return
    parent.teardown()
def unregister(cls, session):
    """Detach this class's listeners from ``session``.

    The ``_model_changes`` attribute is deleted from ``session`` when it is
    present, then the class handlers are removed from the session's
    ``before_flush``, ``before_commit``, ``after_commit`` and
    ``after_rollback`` events.
    """
    if hasattr(session, '_model_changes'):
        delattr(session, '_model_changes')

    # (event identifier, handler attribute on cls), in removal order.
    # ``record_ops`` is removed from two different events.
    hooks = (
        ('before_flush', 'record_ops'),
        ('before_commit', 'record_ops'),
        ('before_commit', 'before_commit'),
        ('after_commit', 'after_commit'),
        ('after_rollback', 'after_rollback'),
    )
    for identifier, handler_name in hooks:
        event.remove(session, identifier, getattr(cls, handler_name))
def tearDown(self):
    """Clean up the database after a test, then run the base-class tearDown.

    Calls ``session.remove()`` and ``drop_all()`` on ``models.db`` before
    delegating to the ``tearDown`` that follows ``BaseTestCase`` in the MRO.
    """
    database = models.db
    database.session.remove()
    database.drop_all()

    parent = super(BaseTestCase, self)
    parent.tearDown()
def teardown(self):
    """Detach every recorded event listener, then chain to the next teardown.

    Each entry of ``self._event_fns`` is unpacked as the positional
    arguments of ``event.remove``.  Afterwards the ``teardown`` of the next
    class after ``RemovesEvents`` in the MRO is called, if one exists.
    """
    for listener_args in self._event_fns:
        event.remove(*listener_args)

    parent = super(RemovesEvents, self)
    if not hasattr(parent, "teardown"):
        # Nothing further up the MRO defines a teardown hook.
        return
    parent.teardown()
def unregister(cls, session):
    """Detach this class's listeners from ``session``.

    The ``_model_changes`` attribute is deleted from ``session`` when it is
    present, then the class handlers are removed from the session's
    ``before_flush``, ``before_commit``, ``after_commit`` and
    ``after_rollback`` events.
    """
    if hasattr(session, '_model_changes'):
        delattr(session, '_model_changes')

    # (event identifier, handler attribute on cls), in removal order.
    # ``record_ops`` is removed from two different events.
    hooks = (
        ('before_flush', 'record_ops'),
        ('before_commit', 'record_ops'),
        ('before_commit', 'before_commit'),
        ('after_commit', 'after_commit'),
        ('after_rollback', 'after_rollback'),
    )
    for identifier, handler_name in hooks:
        event.remove(session, identifier, getattr(cls, handler_name))
def teardown(self):
    """Detach every recorded event listener, then chain to the next teardown.

    Each entry of ``self._event_fns`` is unpacked as the positional
    arguments of ``event.remove``.  Afterwards the ``teardown`` of the next
    class after ``RemovesEvents`` in the MRO is called, if one exists.
    """
    for listener_args in self._event_fns:
        event.remove(*listener_args)

    parent = super(RemovesEvents, self)
    if not hasattr(parent, "teardown"):
        # Nothing further up the MRO defines a teardown hook.
        return
    parent.teardown()