def unpatch_db():
    """Call ``unpatch_conn`` on every connection yielded by ``connections.all()``."""
    for db_conn in connections.all():
        unpatch_conn(db_conn)
python类all()的实例源码
def process_request(self, request):
    """Install a query-recording cursor wrapper on every database connection.

    For each connection in ``connections.all()`` that has not been patched
    yet (detected by the absence of ``_devserver_cursor_old``), this:

    * resets ``connection._devserver_queries`` to an empty list,
    * saves the original ``cursor`` callable as ``_devserver_cursor_old``,
    * replaces ``connection.cursor`` with a zero-argument callable that
      returns ``DevserverCursorWrapper(original_cursor(), connection)``.

    ``request`` is accepted for interface compatibility and is not used.
    """

    def _wrapped_cursor_factory(conn):
        # Bind ``conn`` per connection. A lambda written directly inside the
        # loop would close over the loop variable, so with several databases
        # every patched ``cursor`` would end up using the *last* connection.
        def cursor():
            return DevserverCursorWrapper(conn._devserver_cursor_old(), conn)
        return cursor

    for connection in connections.all():
        if hasattr(connection, '_devserver_cursor_old'):
            # Already patched; wrapping again would make the wrapper call itself.
            continue
        connection._devserver_queries = []
        connection._devserver_cursor_old = connection.cursor
        connection.cursor = _wrapped_cursor_factory(connection)
def check_database_backends(*args, **kwargs):
    """Collect the results of ``conn.validation.check(**kwargs)`` for all connections.

    Positional arguments are accepted but ignored. Returns a single flat list
    holding the items produced by each connection, in connection order.
    """
    return [
        issue
        for conn in connections.all()
        for issue in conn.validation.check(**kwargs)
    ]
def make_view_atomic(self, view):
    """Wrap ``view`` in ``transaction.atomic`` for each qualifying connection.

    A connection qualifies when its ``settings_dict['ATOMIC_REQUESTS']`` is
    truthy and its alias is not listed in the view's ``_non_atomic_requests``
    attribute (an empty set when the attribute is missing). Returns the
    possibly wrapped view.
    """
    exempt_aliases = getattr(view, '_non_atomic_requests', set())
    for db in connections.all():
        if not db.settings_dict['ATOMIC_REQUESTS']:
            continue
        if db.alias in exempt_aliases:
            continue
        view = transaction.atomic(using=db.alias)(view)
    return view
def make_view_atomic(self, view):
    """Wrap ``view`` in ``transaction.atomic`` for each qualifying connection.

    A connection qualifies when its ``settings_dict['ATOMIC_REQUESTS']`` is
    truthy and its alias is not listed in the view's ``_non_atomic_requests``
    attribute (an empty set when the attribute is missing). Returns the
    possibly wrapped view.
    """
    exempt_aliases = getattr(view, '_non_atomic_requests', set())
    for db in connections.all():
        if not db.settings_dict['ATOMIC_REQUESTS']:
            continue
        if db.alias in exempt_aliases:
            continue
        view = transaction.atomic(using=db.alias)(view)
    return view
def make_view_atomic(self, view):
    """Wrap ``view`` in ``transaction.atomic`` for each qualifying connection.

    A connection qualifies when its ``settings_dict['ATOMIC_REQUESTS']`` is
    truthy and its alias is not listed in the view's ``_non_atomic_requests``
    attribute (an empty set when the attribute is missing). Returns the
    possibly wrapped view.
    """
    exempt_aliases = getattr(view, '_non_atomic_requests', set())
    for db in connections.all():
        if not db.settings_dict['ATOMIC_REQUESTS']:
            continue
        if db.alias in exempt_aliases:
            continue
        view = transaction.atomic(using=db.alias)(view)
    return view
def make_view_atomic(self, view):
    """Wrap ``view`` in ``transaction.atomic`` for each qualifying connection.

    A connection qualifies when its ``settings_dict['ATOMIC_REQUESTS']`` is
    truthy and its alias is not listed in the view's ``_non_atomic_requests``
    attribute (an empty set when the attribute is missing). Returns the
    possibly wrapped view.
    """
    exempt_aliases = getattr(view, '_non_atomic_requests', set())
    for db in connections.all():
        if not db.settings_dict['ATOMIC_REQUESTS']:
            continue
        if db.alias in exempt_aliases:
            continue
        view = transaction.atomic(using=db.alias)(view)
    return view
def make_view_atomic(self, view):
    """Wrap ``view`` in ``transaction.atomic`` for each qualifying connection.

    A connection qualifies when its ``settings_dict['ATOMIC_REQUESTS']`` is
    truthy and its alias is not listed in the view's ``_non_atomic_requests``
    attribute (an empty set when the attribute is missing). Returns the
    possibly wrapped view.
    """
    exempt_aliases = getattr(view, '_non_atomic_requests', set())
    for db in connections.all():
        if not db.settings_dict['ATOMIC_REQUESTS']:
            continue
        if db.alias in exempt_aliases:
            continue
        view = transaction.atomic(using=db.alias)(view)
    return view
def make_view_atomic(self, view):
    """Wrap ``view`` in ``transaction.atomic`` for each qualifying connection.

    A connection qualifies when its ``settings_dict['ATOMIC_REQUESTS']`` is
    truthy and its alias is not listed in the view's ``_non_atomic_requests``
    attribute (an empty set when the attribute is missing). Returns the
    possibly wrapped view.
    """
    exempt_aliases = getattr(view, '_non_atomic_requests', set())
    for db in connections.all():
        if not db.settings_dict['ATOMIC_REQUESTS']:
            continue
        if db.alias in exempt_aliases:
            continue
        view = transaction.atomic(using=db.alias)(view)
    return view
def get_debug_promise(self):
    """Return the cached debug promise chained with the resolve callback.

    When ``self.debug_promise`` is falsy it is first set to
    ``Promise.all(self.promises)``. The result is that promise's
    ``.then(self.on_resolve_all_promises)``.
    """
    pending = self.debug_promise
    if not pending:
        pending = Promise.all(self.promises)
        self.debug_promise = pending
    return pending.then(self.on_resolve_all_promises)
def enable_instrumentation(self):
    """Call ``wrap_cursor(connection, self)`` for each connection in ``connections.all()``."""
    # This is thread-safe because database connections are thread-local.
    for db_connection in connections.all():
        wrap_cursor(db_connection, self)
def disable_instrumentation(self):
    """Call ``unwrap_cursor`` on each connection in ``connections.all()``."""
    for db_connection in connections.all():
        unwrap_cursor(db_connection)
def check_database_backends(*args, **kwargs):
    """Collect the results of ``conn.validation.check(**kwargs)`` for all connections.

    Positional arguments are accepted but ignored. Returns a single flat list
    holding the items produced by each connection, in connection order.
    """
    return [
        issue
        for conn in connections.all()
        for issue in conn.validation.check(**kwargs)
    ]
def make_view_atomic(self, view):
    """Wrap ``view`` in ``transaction.atomic`` for each qualifying connection.

    A connection qualifies when its ``settings_dict['ATOMIC_REQUESTS']`` is
    truthy and its alias is not listed in the view's ``_non_atomic_requests``
    attribute (an empty set when the attribute is missing). Returns the
    possibly wrapped view.
    """
    exempt_aliases = getattr(view, '_non_atomic_requests', set())
    for db in connections.all():
        if not db.settings_dict['ATOMIC_REQUESTS']:
            continue
        if db.alias in exempt_aliases:
            continue
        view = transaction.atomic(using=db.alias)(view)
    return view
def check_database_backends(*args, **kwargs):
    """Collect the results of ``conn.validation.check(**kwargs)`` for all connections.

    Positional arguments are accepted but ignored. Returns a single flat list
    holding the items produced by each connection, in connection order.
    """
    return [
        issue
        for conn in connections.all()
        for issue in conn.validation.check(**kwargs)
    ]
def make_view_atomic(self, view):
    """Wrap ``view`` in ``transaction.atomic`` for each qualifying connection.

    A connection qualifies when its ``settings_dict['ATOMIC_REQUESTS']`` is
    truthy and its alias is not listed in the view's ``_non_atomic_requests``
    attribute (an empty set when the attribute is missing). Returns the
    possibly wrapped view.
    """
    exempt_aliases = getattr(view, '_non_atomic_requests', set())
    for db in connections.all():
        if not db.settings_dict['ATOMIC_REQUESTS']:
            continue
        if db.alias in exempt_aliases:
            continue
        view = transaction.atomic(using=db.alias)(view)
    return view
def ready(self):
    """Register the hstore handler and the extra text-field lookups.

    Runs ``register_hstore_handler`` on every connection whose ``connection``
    attribute is not ``None``, connects the same handler to the
    ``connection_created`` signal, then registers ``Unaccent``,
    ``SearchLookup`` and ``TrigramSimilar`` on both ``CharField`` and
    ``TextField``.
    """
    # Connections may already exist before we are called.
    for db_conn in connections.all():
        if db_conn.connection is None:
            continue
        register_hstore_handler(db_conn)
    connection_created.connect(register_hstore_handler)

    # Same registration order as listing each call out: CharField first,
    # then TextField, one lookup at a time.
    for lookup in (Unaccent, SearchLookup, TrigramSimilar):
        CharField.register_lookup(lookup)
        TextField.register_lookup(lookup)
def _post_teardown(self):
    """Run the clean-up steps that follow a test.

    * Flush the contents of the database to leave a clean slate. If the
      class has an 'available_apps' attribute, post_migrate isn't fired.
    * Force-close the connections so the next test gets a clean cursor.
    """
    try:
        self._fixture_teardown()
        super(TransactionTestCase, self)._post_teardown()
        reload_needed = self._should_reload_connections()
        if reload_needed:
            # Some DB cursors issue SQL statements as part of cursor
            # creation. A test that rolls back loses the effect of those
            # statements, which can disturb later tests (e.g. a lost
            # timezone setting makes objects get created with the wrong
            # time). Closing here gives every test a clean connection.
            for db_conn in connections.all():
                db_conn.close()
    finally:
        apps_restricted = self.available_apps is not None
        if apps_restricted:
            apps.unset_available_apps()
            setting_changed.send(
                sender=settings._wrapped.__class__,
                setting='INSTALLED_APPS',
                value=settings.INSTALLED_APPS,
                enter=False,
            )
def connections_support_transactions():
    """
    Return True when every connection reports ``features.supports_transactions``
    as truthy (also True when there are no connections), otherwise False.
    """
    for conn in connections.all():
        if not conn.features.supports_transactions:
            return False
    return True
def tearDownClass(cls):
    """Roll back class-level atomics, close connections, then defer to the parent.

    The rollback and the connection closing only happen when
    ``connections_support_transactions()`` is truthy; the parent
    ``tearDownClass`` is always called.
    """
    transactional = connections_support_transactions()
    if transactional:
        cls._rollback_atomics(cls.cls_atomics)
        for db_conn in connections.all():
            db_conn.close()
    super(TestCase, cls).tearDownClass()