def can_migrate(self, connection):
"""
Return True if the model can/should be migrated on the `connection`.
`connection` can be either a real connection or a connection alias.
"""
if self.proxy or self.swapped or not self.managed:
return False
if isinstance(connection, six.string_types):
connection = connections[connection]
if self.required_db_vendor:
return self.required_db_vendor == connection.vendor
if self.required_db_features:
return all(getattr(connection.features, feat, False)
for feat in self.required_db_features)
return True
python类features()的实例源码
def _reset_sequences(self, db_name):
conn = connections[db_name]
if conn.features.supports_sequence_reset:
sql_list = conn.ops.sequence_reset_by_name_sql(
no_style(), conn.introspection.sequence_list())
if sql_list:
with transaction.atomic(using=db_name):
cursor = conn.cursor()
for sql in sql_list:
cursor.execute(sql)
def connections_support_transactions():
"""
Returns True if all connections support transactions.
"""
return all(conn.features.supports_transactions
for conn in connections.all())
def skipIfDBFeature(*features):
"""
Skip a test if a database has at least one of the named features.
"""
return _deferredSkip(
lambda: any(getattr(connection.features, feature, False) for feature in features),
"Database has feature(s) %s" % ", ".join(features)
)
def skipUnlessDBFeature(*features):
"""
Skip a test unless a database has all the named features.
"""
return _deferredSkip(
lambda: not all(getattr(connection.features, feature, False) for feature in features),
"Database doesn't support feature(s): %s" % ", ".join(features)
)
def test_mock_django_connection(self, mock_handler):
is_foo_before = bool(getattr(connection.features, 'is_foo', False))
mock_django_connection(disabled_features=['is_foo'])
is_foo_after = bool(getattr(connection.features, 'is_foo', False))
self.assertTrue(is_foo_before)
self.assertFalse(is_foo_after)
# noinspection PyUnresolvedReferences,PyStatementEffect
def can_migrate(self, connection):
"""
Return True if the model can/should be migrated on the `connection`.
`connection` can be either a real connection or a connection alias.
"""
if self.proxy or self.swapped or not self.managed:
return False
if isinstance(connection, six.string_types):
connection = connections[connection]
if self.required_db_vendor:
return self.required_db_vendor == connection.vendor
if self.required_db_features:
return all(getattr(connection.features, feat, False)
for feat in self.required_db_features)
return True
def _reset_sequences(self, db_name):
conn = connections[db_name]
if conn.features.supports_sequence_reset:
sql_list = conn.ops.sequence_reset_by_name_sql(
no_style(), conn.introspection.sequence_list())
if sql_list:
with transaction.atomic(using=db_name):
cursor = conn.cursor()
for sql in sql_list:
cursor.execute(sql)
def connections_support_transactions():
"""
Returns True if all connections support transactions.
"""
return all(conn.features.supports_transactions
for conn in connections.all())
def _should_check_constraints(self, connection):
return (
connection.features.can_defer_constraint_checks and
not connection.needs_rollback and connection.is_usable()
)
def skipIfDBFeature(*features):
"""
Skip a test if a database has at least one of the named features.
"""
return _deferredSkip(
lambda: any(getattr(connection.features, feature, False) for feature in features),
"Database has feature(s) %s" % ", ".join(features)
)
def skipUnlessAnyDBFeature(*features):
"""
Skip a test unless a database has any of the named features.
"""
return _deferredSkip(
lambda: not any(getattr(connection.features, feature, False) for feature in features),
"Database doesn't support any of the feature(s): %s" % ", ".join(features)
)
def can_migrate(self, connection):
"""
Return True if the model can/should be migrated on the `connection`.
`connection` can be either a real connection or a connection alias.
"""
if self.proxy or self.swapped or not self.managed:
return False
if isinstance(connection, six.string_types):
connection = connections[connection]
if self.required_db_vendor:
return self.required_db_vendor == connection.vendor
if self.required_db_features:
return all(getattr(connection.features, feat, False)
for feat in self.required_db_features)
return True
def _reset_sequences(self, db_name):
conn = connections[db_name]
if conn.features.supports_sequence_reset:
sql_list = conn.ops.sequence_reset_by_name_sql(
no_style(), conn.introspection.sequence_list())
if sql_list:
with transaction.atomic(using=db_name):
cursor = conn.cursor()
for sql in sql_list:
cursor.execute(sql)
def connections_support_transactions():
"""
Returns True if all connections support transactions.
"""
return all(conn.features.supports_transactions
for conn in connections.all())
def _should_check_constraints(self, connection):
return (
connection.features.can_defer_constraint_checks and
not connection.needs_rollback and connection.is_usable()
)
def skipIfDBFeature(*features):
"""
Skip a test if a database has at least one of the named features.
"""
return _deferredSkip(
lambda: any(getattr(connection.features, feature, False) for feature in features),
"Database has feature(s) %s" % ", ".join(features)
)
def skipUnlessAnyDBFeature(*features):
"""
Skip a test unless a database has any of the named features.
"""
return _deferredSkip(
lambda: not any(getattr(connection.features, feature, False) for feature in features),
"Database doesn't support any of the feature(s): %s" % ", ".join(features)
)
def can_migrate(self, connection):
"""
Return True if the model can/should be migrated on the `connection`.
`connection` can be either a real connection or a connection alias.
"""
if self.proxy or self.swapped or not self.managed:
return False
if isinstance(connection, six.string_types):
connection = connections[connection]
if self.required_db_vendor:
return self.required_db_vendor == connection.vendor
if self.required_db_features:
return all(getattr(connection.features, feat, False)
for feat in self.required_db_features)
return True
def _reset_sequences(self, db_name):
conn = connections[db_name]
if conn.features.supports_sequence_reset:
sql_list = conn.ops.sequence_reset_by_name_sql(
no_style(), conn.introspection.sequence_list())
if sql_list:
with transaction.atomic(using=db_name):
cursor = conn.cursor()
for sql in sql_list:
cursor.execute(sql)
def connections_support_transactions():
"""
Returns True if all connections support transactions.
"""
return all(conn.features.supports_transactions
for conn in connections.all())
def skipIfDBFeature(*features):
"""
Skip a test if a database has at least one of the named features.
"""
return _deferredSkip(
lambda: any(getattr(connection.features, feature, False) for feature in features),
"Database has feature(s) %s" % ", ".join(features)
)
def skipUnlessDBFeature(*features):
"""
Skip a test unless a database has all the named features.
"""
return _deferredSkip(
lambda: not all(getattr(connection.features, feature, False) for feature in features),
"Database doesn't support feature(s): %s" % ", ".join(features)
)
def can_migrate(self, connection):
"""
Return True if the model can/should be migrated on the `connection`.
`connection` can be either a real connection or a connection alias.
"""
if self.proxy or self.swapped or not self.managed:
return False
if isinstance(connection, six.string_types):
connection = connections[connection]
if self.required_db_vendor:
return self.required_db_vendor == connection.vendor
if self.required_db_features:
return all(getattr(connection.features, feat, False)
for feat in self.required_db_features)
return True
def _reset_sequences(self, db_name):
conn = connections[db_name]
if conn.features.supports_sequence_reset:
sql_list = conn.ops.sequence_reset_by_name_sql(
no_style(), conn.introspection.sequence_list())
if sql_list:
with transaction.atomic(using=db_name):
cursor = conn.cursor()
for sql in sql_list:
cursor.execute(sql)
def connections_support_transactions():
"""
Returns True if all connections support transactions.
"""
return all(conn.features.supports_transactions
for conn in connections.all())
def skipIfDBFeature(*features):
"""
Skip a test if a database has at least one of the named features.
"""
return _deferredSkip(
lambda: any(getattr(connection.features, feature, False) for feature in features),
"Database has feature(s) %s" % ", ".join(features)
)
def skipUnlessDBFeature(*features):
"""
Skip a test unless a database has all the named features.
"""
return _deferredSkip(
lambda: not all(getattr(connection.features, feature, False) for feature in features),
"Database doesn't support feature(s): %s" % ", ".join(features)
)
def can_migrate(self, connection):
"""
Return True if the model can/should be migrated on the `connection`.
`connection` can be either a real connection or a connection alias.
"""
if self.proxy or self.swapped or not self.managed:
return False
if isinstance(connection, six.string_types):
connection = connections[connection]
if self.required_db_vendor:
return self.required_db_vendor == connection.vendor
if self.required_db_features:
return all(getattr(connection.features, feat, False)
for feat in self.required_db_features)
return True
def _reset_sequences(self, db_name):
conn = connections[db_name]
if conn.features.supports_sequence_reset:
sql_list = conn.ops.sequence_reset_by_name_sql(
no_style(), conn.introspection.sequence_list())
if sql_list:
with transaction.atomic(using=db_name):
cursor = conn.cursor()
for sql in sql_list:
cursor.execute(sql)