def test_backend_db():
    """Check that a sqlite-backed test run uses the in-memory database.

    Nothing is asserted for other database vendors.
    """
    from django.db import connection, connections

    if connection.vendor != "sqlite":
        return
    default_name = connections.databases["default"]["NAME"]
    assert default_name == ":memory:"
# Python example source code for databases()
def test_broken_database(settings, monkeypatch):
    """Check the entry get_databases() returns for an unreachable database.

    Configures an in-memory sqlite ``default`` alias next to a PostgreSQL
    ``broken`` alias pointing at 127.0.0.1:21, then asserts on the engine,
    host, name and error text reported for the ``broken`` alias.
    """
    sqlite_config = {
        "ENGINE": "django.db.backends.sqlite3",
        "NAME": ":memory:",
    }
    unreachable_config = {
        "ENGINE": "django.db.backends.postgresql_psycopg2",
        "NAME": "not-existent-db",
        "HOST": "127.0.0.1",
        "PORT": "21",
        "USER": "",
        "PASSWORD": "",
    }
    settings.DATABASES = {
        "default": sqlite_config,
        "broken": unreachable_config,
    }

    # Reset the handler's stored configuration and drop its ``databases``
    # attribute -- presumably so the settings above are re-read.
    monkeypatch.setattr(connections, "_databases", None)
    del connections.databases

    expected_error = (
        "could not connect to server: Connection refused\n"
        '\tIs the server running on host "127.0.0.1" and accepting\n'
        "\tTCP/IP connections on port 21?\n"
    )

    ret = get_databases()
    assert sorted(ret.keys()) == ["broken", "default"]
    # The check on the default alias is disabled in this test (reason not
    # visible here):
    # assert ret["default"]["engine"] == "django.db.backends.sqlite3"
    broken_info = ret["broken"]
    assert broken_info["engine"] == "django.db.backends.postgresql_psycopg2"
    assert broken_info["host"] == "127.0.0.1:21"
    assert broken_info["name"] == "not-existent-db"
    assert broken_info["error"] == expected_error
def __exit__(self, return_type, return_value, traceback):
    """Delete the database file and unregister its Django connection.

    Removes ``self.fullpath`` from disk when it exists, then drops the alias
    derived from ``self.filename`` from both ``settings.DATABASES`` and
    ``connections.databases``. Nothing is returned, so an exception raised
    inside the ``with`` body is not suppressed.
    """
    db_file = self.fullpath
    if os.path.exists(db_file):
        os.remove(db_file)

    alias = db_name_from_filename(self.filename)
    # pop() with a default is a no-op when the alias is not registered.
    settings.DATABASES.pop(alias, None)
    connections.databases.pop(alias, None)
def add_db_to_settings(dbname, filename, gemini_path=GEMINI_DB_PATH):
    """Register a sqlite3 database under the alias *dbname*.

    One settings dict (sqlite3 engine, ``NAME`` set to
    ``join(gemini_path, filename)``) is stored in both ``settings.DATABASES``
    and ``connections.databases``; both registries share the same object.
    """
    sqlite_config = dict(
        ENGINE='django.db.backends.sqlite3',
        NAME=join(gemini_path, filename),
    )
    settings.DATABASES[dbname] = connections.databases[dbname] = sqlite_config
    logger.debug("(+) Adding connection '{}'".format(dbname))
def remove_db_from_settings(dbname):
    """Drop the alias *dbname* from Django's database registries.

    Both ``settings.DATABASES`` and ``connections.databases`` are updated;
    an alias that is missing from either one is silently ignored.
    """
    configured = settings.DATABASES
    configured.pop(dbname, None)
    registered = connections.databases
    registered.pop(dbname, None)
def mock_django_connection(disabled_features=None):
    """Replace Django's database connections with a mock.

    Masks the credentials in the ``default`` database settings, patches
    ``ConnectionHandler.__getitem__`` with a ``MagicMock`` so connection
    lookups yield its return value, makes the compilers produced through
    that mock raise ``NotSupportedError`` from ``execute_sql`` and
    ``has_results``, and turns ``Model.refresh_from_db`` into a no-op.

    ``disabled_features`` is an optional iterable of attribute names that
    are set to ``False`` on the mocked connection's ``features`` object.
    """
    default_settings = connections.databases['default']
    default_settings['PASSWORD'] = '****'
    default_settings['USER'] = '**Database disabled for unit tests**'

    connection_lookup = MagicMock(name='mock_connection')
    ConnectionHandler.__getitem__ = connection_lookup
    mock_connection = connection_lookup.return_value

    for feature in disabled_features or ():
        setattr(mock_connection.features, feature, False)

    # noinspection PyUnusedLocal
    def compiler(queryset, connection, using, **kwargs):
        # Stand-in compiler: both SQL entry points raise the same error.
        result = MagicMock(name='mock_connection.ops.compiler()')
        # noinspection PyProtectedMember
        model_name = queryset.model._meta.object_name
        sql_error = NotSupportedError(
            "Mock database tried to execute SQL for {} model.".format(
                model_name))
        result.execute_sql.side_effect = sql_error
        result.has_results.side_effect = sql_error
        return result

    mock_ops = mock_connection.ops
    mock_ops.compiler.return_value.side_effect = compiler
    mock_ops.integer_field_range.return_value = (-sys.maxsize - 1, sys.maxsize)
    mock_ops.max_name_length.return_value = sys.maxsize
    Model.refresh_from_db = Mock()  # Make this into a noop.
def set_compiler(self):
    """Pick the SMARTSQL_COMPILERS entry for this object's database engine.

    Reads the ``ENGINE`` setting of the ``self._using`` alias, takes the part
    after its last dot and stores ``SMARTSQL_COMPILERS[<that part>]`` in
    ``self.compile``.
    """
    engine_path = connections.databases[self._using]['ENGINE']
    _, _, engine = engine_path.rpartition('.')
    self.compile = SMARTSQL_COMPILERS[engine]
def test_time_field(self):
    # Checks the model source that ogrinspect() generates for the
    # AllOGRFields table, including its time field.
    #
    # Only possible to test this on PostGIS at the moment. MySQL
    # complains about permissions, and SpatiaLite/Oracle are
    # insanely difficult to get support compiled in for in GDAL.
    if not connections['default'].ops.postgis:
        self.skipTest("This database does not support 'ogrinspect'ion")

    # Getting the database identifier used by OGR, if None returned
    # GDAL does not have the support compiled in.
    ogr_db = get_ogr_db_string()
    if not ogr_db:
        self.skipTest("Your GDAL installation does not support PostGIS databases")

    # Writing shapefiles via GDAL currently does not support writing OGRTime
    # fields, so we need to actually use a database
    model_def = ogrinspect(ogr_db, 'Measurement',
                           layer_key=AllOGRFields._meta.db_table,
                           decimal=['f_decimal'])

    self.assertTrue(model_def.startswith(
        '# This is an auto-generated Django model module created by ogrinspect.\n'
        'from django.contrib.gis.db import models\n'
        '\n'
        'class Measurement(models.Model):\n'
    ))

    # ogrinspect emits the class body with a four-space indent, so the
    # expected lines below carry that indent as well.
    # The ordering of model fields might vary depending on several factors
    # (version of GDAL, etc.), hence assertIn rather than one big comparison.
    expected_fields = (
        '    f_decimal = models.DecimalField(max_digits=0, decimal_places=0)',
        '    f_int = models.IntegerField()',
        '    f_datetime = models.DateTimeField()',
        '    f_time = models.TimeField()',
        '    f_float = models.FloatField()',
        '    f_char = models.CharField(max_length=10)',
        '    f_date = models.DateField()',
    )
    for field_line in expected_fields:
        self.assertIn(field_line, model_def)

    self.assertTrue(model_def.endswith(
        '    geom = models.PolygonField()\n'
        '    objects = models.GeoManager()'
    ))
def createConnection(self):
    """Build and return a new database wrapper for ``self.alias``.

    The backend named by the alias' ``ENGINE`` setting is loaded and its
    ``DatabaseWrapper`` is instantiated with ``allow_thread_sharing=True``.
    """
    settings_dict = connections.databases[self.alias]
    wrapper_cls = load_backend(settings_dict['ENGINE']).DatabaseWrapper
    return wrapper_cls(settings_dict, self.alias, allow_thread_sharing=True)
def test_atomic_requests_are_enabled(self):
    # The settings of the "default" connection *must* have
    # ATOMIC_REQUESTS set to True.
    configured = connections.databases
    default_is_atomic = ContainsDict({
        "default": ContainsDict({"ATOMIC_REQUESTS": Is(True)}),
    })
    self.assertThat(configured, default_is_atomic)
def test_isolation_level_is_serializable(self):
    # Checks that OPTIONS["isolation_level"] of the "default" connection
    # equals ISOLATION_LEVEL_REPEATABLE_READ.
    # NOTE(review): the test name (and its earlier comment) talk about
    # SERIALIZABLE, but the asserted constant is REPEATABLE READ -- confirm
    # which level is intended and align the name or the constant.
    configured = connections.databases
    expected_options = ContainsDict({
        "isolation_level": Equals(ISOLATION_LEVEL_REPEATABLE_READ),
    })
    default_matcher = ContainsDict({
        "default": ContainsDict({"OPTIONS": expected_options}),
    })
    self.assertThat(configured, default_matcher)
def get_ogr_db_string():
    """
    Construct the DB string that GDAL will use to inspect the database.

    GDAL will create its own connection to the database, so we re-use the
    connection settings from the Django test.

    Returns the connection string, or ``None`` when the GDAL driver for the
    configured backend cannot be instantiated or when the database is an
    in-memory one. Raises ``KeyError`` when the default database uses an
    engine that has no entry in the driver table below.
    """
    db = connections.databases['default']

    # Map from the django backend into the OGR driver name and database identifier
    # http://www.gdal.org/ogr/ogr_formats.html
    #
    # TODO: Support Oracle (OCI).
    drivers = {
        'django.contrib.gis.db.backends.postgis': ('PostgreSQL', "PG:dbname='%(db_name)s'", ' '),
        'django.contrib.gis.db.backends.mysql': ('MySQL', 'MYSQL:"%(db_name)s"', ','),
        'django.contrib.gis.db.backends.spatialite': ('SQLite', '%(db_name)s', ''),
    }
    drv_name, db_str, param_sep = drivers[db['ENGINE']]

    # Ensure that GDAL library has driver support for the database.
    # Catch Exception rather than using a bare ``except`` so that
    # KeyboardInterrupt and SystemExit are not swallowed here.
    try:
        Driver(drv_name)
    except Exception:
        return None

    # SQLite/Spatialite in-memory databases
    if db['NAME'] == ":memory:":
        return None

    # Build the params of the OGR database connection string
    params = [db_str % {'db_name': db['NAME']}]
    optional_params = (
        ('HOST', "host='%s'"),
        ('PORT', "port='%s'"),
        ('USER', "user='%s'"),
        ('PASSWORD', "password='%s'"),
    )
    for key, template in optional_params:
        value = db.get(key)
        # Don't add the parameter if it is not in django's settings
        if value:
            params.append(template % value)

    return param_sep.join(params)