def handle(self, *args, **options):
self.stdout.write(self.style.SUCCESS('Starting Schema creation..'))
dbname = settings.DATABASES['default']['NAME']
user = settings.DATABASES['default']['USER']
password = settings.DATABASES['default']['PASSWORD']
host = settings.DATABASES['default']['HOST']
con = connect(dbname=dbname, user=user, host=host, password=password)
self.stdout.write(self.style.SUCCESS('Adding schema {schema} to database {dbname}'
.format(schema=settings.SCHEMA, dbname=dbname)))
con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
cur = con.cursor()
cur.execute('CREATE SCHEMA {schema};'.format(schema=settings.SCHEMA))
cur.close()
con.close()
self.stdout.write(self.style.SUCCESS('All Done!'))
python类DATABASES的实例源码
def handle(self, *args, **options):
self.stdout.write(self.style.SUCCESS('Starting Schema deletion..'))
dbname = settings.DATABASES['default']['NAME']
user = settings.DATABASES['default']['USER']
password = settings.DATABASES['default']['PASSWORD']
host = settings.DATABASES['default']['HOST']
con = connect(dbname=dbname, user=user, host=host, password=password)
self.stdout.write(self.style.SUCCESS('Removing schema {schema} from database {dbname}'
.format(schema=settings.SCHEMA, dbname=dbname)))
con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
cur = con.cursor()
cur.execute('DROP SCHEMA {schema} CASCADE;'.format(schema=settings.SCHEMA))
cur.close()
con.close()
self.stdout.write(self.style.SUCCESS('All Done.'))
def handle(self, *args, **options):
self.stdout.write(self.style.SUCCESS('Starting DB creation..'))
dbname = settings.DATABASES['default']['NAME']
user = settings.DATABASES['default']['USER']
password = settings.DATABASES['default']['PASSWORD']
host = settings.DATABASES['default']['HOST']
self.stdout.write(self.style.SUCCESS('Connecting to host..'))
con = connect(dbname='postgres', user=user, host=host, password=password)
con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
self.stdout.write(self.style.SUCCESS('Creating database'))
cur = con.cursor()
cur.execute('CREATE DATABASE ' + dbname)
cur.close()
con.close()
self.stdout.write(self.style.SUCCESS('All done!'))
def get_config():
"""
Gets engine type from Django settings
"""
DB = settings.DATABASES['default']
ENGINE = DB.get('ENGINE', '')
config = {}
if 'postgresql' in ENGINE \
or 'psycopg' in ENGINE:
config['engine'] = ENGINE_POSTGRESQL
elif 'mysql' in ENGINE:
config['engine'] = ENGINE_MYSQL
else:
raise BadConfig('Django configured with unsupported database engine: '
'%s' % DB.get('ENGINE', ''))
return config
def get_connection_params(self):
settings_dict = self.settings_dict
# None may be used to connect to the default 'postgres' db
if settings_dict['NAME'] == '':
raise ImproperlyConfigured(
"settings.DATABASES is improperly configured. "
"Please supply the NAME value.")
conn_params = {
'database': settings_dict['NAME'] or 'postgres',
}
conn_params.update(settings_dict['OPTIONS'])
conn_params.pop('isolation_level', None)
if settings_dict['USER']:
conn_params['user'] = settings_dict['USER']
if settings_dict['PASSWORD']:
conn_params['password'] = force_str(settings_dict['PASSWORD'])
if settings_dict['HOST']:
conn_params['host'] = settings_dict['HOST']
if settings_dict['PORT']:
conn_params['port'] = settings_dict['PORT']
return conn_params
def _nodb_connection(self):
nodb_connection = super(DatabaseWrapper, self)._nodb_connection
try:
nodb_connection.ensure_connection()
except (DatabaseError, WrappedDatabaseError):
warnings.warn(
"Normally Django will use a connection to the 'postgres' database "
"to avoid running initialization queries against the production "
"database when it's not needed (for example, when running tests). "
"Django was unable to create a connection to the 'postgres' database "
"and will use the default database instead.",
RuntimeWarning
)
settings_dict = self.settings_dict.copy()
settings_dict['NAME'] = settings.DATABASES[DEFAULT_DB_ALIAS]['NAME']
nodb_connection = self.__class__(
self.settings_dict.copy(),
alias=self.alias,
allow_thread_sharing=False)
return nodb_connection
def _switch_to_test_user(self, parameters):
"""
Oracle doesn't have the concept of separate databases under the same user.
Thus, we use a separate user (see _create_test_db). This method is used
to switch to that user. We will need the main user again for clean-up when
we end testing, so we keep its credentials in SAVED_USER/SAVED_PASSWORD
entries in the settings dict.
"""
real_settings = settings.DATABASES[self.connection.alias]
real_settings['SAVED_USER'] = self.connection.settings_dict['SAVED_USER'] = \
self.connection.settings_dict['USER']
real_settings['SAVED_PASSWORD'] = self.connection.settings_dict['SAVED_PASSWORD'] = \
self.connection.settings_dict['PASSWORD']
real_test_settings = real_settings['TEST']
test_settings = self.connection.settings_dict['TEST']
real_test_settings['USER'] = real_settings['USER'] = test_settings['USER'] = \
self.connection.settings_dict['USER'] = parameters['user']
real_settings['PASSWORD'] = self.connection.settings_dict['PASSWORD'] = parameters['password']
def __init__(self, db_settings=None, db_backup_name=None):
"""
Constructor
Args:
db_settings (dict): A dict of database settings
db_backup_name (str): The name that will be given to the backup database
"""
self.db_settings = db_settings or settings.DATABASES['default']
self.db_name = self.db_settings['NAME']
if self.db_name[0:5] != 'test_':
raise Exception(
"The test suite is attempting to use the database '{}'."
"The test database should have a name that begins with 'test_'. Exiting...".format(self.db_name)
)
self.db_backup_name = db_backup_name or getattr(settings, 'BACKUP_DB_NAME', self.DEFAULT_BACKUP_DB_NAME)
self.db_cmd_args = [
"-h", self.db_settings['HOST'],
"-p", str(self.db_settings['PORT']),
"-U", self.db_settings['USER']
]
def pg_dump(file_location):
env = os.environ.copy()
env.update({ # requires having password set to test
"PGPASSWORD": settings.DATABASES['default']['PASSWORD']
})
pg_dump = [
'pg_dump',
'-h%s' % settings.DATABASES[settings.CLIPS_DATABASE_ALIAS]['HOST'],
'-U%s' % settings.DATABASES['default']['USER'],
settings.DATABASES['default']['NAME']
]
with Popen(pg_dump, env=env, stdout=PIPE, stderr=STDOUT, bufsize=1
) as task, open(file_location, 'wb') as f:
for line in task.stdout:
f.write(line)
return task.wait()
def dbs_by_environment(environment, write_only=True):
"""
Retrieve all database aliases that contain the given environment.
Args:
environment (str): The environment the databases must contain.
write_only (Optional[bool]): Exclude any read-only databases.
Returns:
Set of aliases.
"""
possible = set()
for alias in settings.DATABASES:
if write_only and is_read_db(alias):
continue
if environment in settings.DATABASES[alias]['ENVIRONMENTS']:
possible.add(alias)
return possible
def get_connection_params(self):
settings_dict = self.settings_dict
# None may be used to connect to the default 'postgres' db
if settings_dict['NAME'] == '':
from django.core.exceptions import ImproperlyConfigured
raise ImproperlyConfigured(
"settings.DATABASES is improperly configured. "
"Please supply the NAME value.")
conn_params = {
'database': settings_dict['NAME'] or 'postgres',
}
conn_params.update(settings_dict['OPTIONS'])
conn_params.pop('isolation_level', None)
if settings_dict['USER']:
conn_params['user'] = settings_dict['USER']
if settings_dict['PASSWORD']:
conn_params['password'] = force_str(settings_dict['PASSWORD'])
if settings_dict['HOST']:
conn_params['host'] = settings_dict['HOST']
if settings_dict['PORT']:
conn_params['port'] = settings_dict['PORT']
return conn_params
def _nodb_connection(self):
nodb_connection = super(DatabaseWrapper, self)._nodb_connection
try:
nodb_connection.ensure_connection()
except (DatabaseError, WrappedDatabaseError):
warnings.warn(
"Normally Django will use a connection to the 'postgres' database "
"to avoid running initialization queries against the production "
"database when it's not needed (for example, when running tests). "
"Django was unable to create a connection to the 'postgres' database "
"and will use the default database instead.",
RuntimeWarning
)
settings_dict = self.settings_dict.copy()
settings_dict['NAME'] = settings.DATABASES[DEFAULT_DB_ALIAS]['NAME']
nodb_connection = self.__class__(
self.settings_dict.copy(),
alias=self.alias,
allow_thread_sharing=False)
return nodb_connection
def get_connection_params(self):
settings_dict = self.settings_dict
# None may be used to connect to the default 'postgres' db
if settings_dict['NAME'] == '':
from django.core.exceptions import ImproperlyConfigured
raise ImproperlyConfigured(
"settings.DATABASES is improperly configured. "
"Please supply the NAME value.")
conn_params = {
'database': settings_dict['NAME'] or 'postgres',
}
conn_params.update(settings_dict['OPTIONS'])
conn_params.pop('isolation_level', None)
if settings_dict['USER']:
conn_params['user'] = settings_dict['USER']
if settings_dict['PASSWORD']:
conn_params['password'] = force_str(settings_dict['PASSWORD'])
if settings_dict['HOST']:
conn_params['host'] = settings_dict['HOST']
if settings_dict['PORT']:
conn_params['port'] = settings_dict['PORT']
return conn_params
def destroy_test_db(self, old_database_name, verbosity=1, keepdb=False):
"""
Destroy a test database, prompting the user for confirmation if the
database already exists.
"""
self.connection.close()
test_database_name = self.connection.settings_dict['NAME']
if verbosity >= 1:
test_db_repr = ''
action = 'Destroying'
if verbosity >= 2:
test_db_repr = " ('%s')" % test_database_name
if keepdb:
action = 'Preserving'
print("%s test database for alias '%s'%s..." % (
action, self.connection.alias, test_db_repr))
# if we want to preserve the database
# skip the actual destroying piece.
if not keepdb:
self._destroy_test_db(test_database_name, verbosity)
# Restore the original database name
settings.DATABASES[self.connection.alias]["NAME"] = old_database_name
self.connection.settings_dict["NAME"] = old_database_name
def back_up_database(self, backup_storage, temp_backup_path):
logger.info('Start backing up the database.')
file_path = '{database}_{timestamp}.dump'.format(
database=settings.DATABASES['default']['NAME'],
timestamp=self.timestamp
)
temp_file_path = '{backup_path}/{file_path}'.format(backup_path=temp_backup_path, file_path=file_path)
# Run the `pg_dump` command.
os.system('pg_dump -h {host} -U {user} {database} > {file_path}'.format(
host=settings.DATABASES['default']['HOST'],
user=settings.DATABASES['default']['USER'],
database=settings.DATABASES['default']['NAME'],
file_path=temp_file_path
))
# Store the dump file on the backup bucket.
with open(temp_file_path, 'rb') as database_backup_file:
target_file_path = '{timestamp}/{path}'.format(timestamp=self.timestamp, path=file_path)
backup_storage.save(target_file_path, database_backup_file)
logger.info('Database dump successfully copied to the target storage backend.')
def get_connection_params(self):
settings_dict = self.settings_dict
# None may be used to connect to the default 'postgres' db
if settings_dict['NAME'] == '':
raise ImproperlyConfigured(
"settings.DATABASES is improperly configured. "
"Please supply the NAME value.")
conn_params = {
'database': settings_dict['NAME'] or 'postgres',
}
conn_params.update(settings_dict['OPTIONS'])
conn_params.pop('isolation_level', None)
if settings_dict['USER']:
conn_params['user'] = settings_dict['USER']
if settings_dict['PASSWORD']:
conn_params['password'] = force_str(settings_dict['PASSWORD'])
if settings_dict['HOST']:
conn_params['host'] = settings_dict['HOST']
if settings_dict['PORT']:
conn_params['port'] = settings_dict['PORT']
return conn_params
def _nodb_connection(self):
nodb_connection = super(DatabaseWrapper, self)._nodb_connection
try:
nodb_connection.ensure_connection()
except (Database.DatabaseError, WrappedDatabaseError):
warnings.warn(
"Normally Django will use a connection to the 'postgres' database "
"to avoid running initialization queries against the production "
"database when it's not needed (for example, when running tests). "
"Django was unable to create a connection to the 'postgres' database "
"and will use the default database instead.",
RuntimeWarning
)
settings_dict = self.settings_dict.copy()
settings_dict['NAME'] = settings.DATABASES[DEFAULT_DB_ALIAS]['NAME']
nodb_connection = self.__class__(
self.settings_dict.copy(),
alias=self.alias,
allow_thread_sharing=False)
return nodb_connection
def _switch_to_test_user(self, parameters):
"""
Oracle doesn't have the concept of separate databases under the same user.
Thus, we use a separate user (see _create_test_db). This method is used
to switch to that user. We will need the main user again for clean-up when
we end testing, so we keep its credentials in SAVED_USER/SAVED_PASSWORD
entries in the settings dict.
"""
real_settings = settings.DATABASES[self.connection.alias]
real_settings['SAVED_USER'] = self.connection.settings_dict['SAVED_USER'] = \
self.connection.settings_dict['USER']
real_settings['SAVED_PASSWORD'] = self.connection.settings_dict['SAVED_PASSWORD'] = \
self.connection.settings_dict['PASSWORD']
real_test_settings = real_settings['TEST']
test_settings = self.connection.settings_dict['TEST']
real_test_settings['USER'] = real_settings['USER'] = test_settings['USER'] = \
self.connection.settings_dict['USER'] = parameters['user']
real_settings['PASSWORD'] = self.connection.settings_dict['PASSWORD'] = parameters['password']
def ready(self):
for db in settings.DATABASES:
name = db['NAME']
db_wrapper_class = import_string(db['ENGINE'] + '.base.DatabaseWrapper')
base_model = getattr(db_wrapper_class, 'base_model', None)
if base_model:
models = apps.get_models()
for model in models:
if name == router.db_for_read(model):
for k, v in base_model.__dict__:
if k == 'objects':
model_manager = getattr(model, 'objects', None)
if model_manager:
manager_cls = model_manager.__class__
custom_cls = v.__class__
new_manager = type('AnyBackendCustomManager', (custom_cls, manager_cls), {})
setattr(model, 'objects', new_manager())
else:
setattr(model, 'objects', v)
elif not k.startswith('__'):
setattr(model, k ,v)
def _nodb_connection(self):
nodb_connection = super(DatabaseWrapper, self)._nodb_connection
try:
nodb_connection.ensure_connection()
except (Database.DatabaseError, WrappedDatabaseError):
warnings.warn(
"Normally Django will use a connection to the 'postgres' database "
"to avoid running initialization queries against the production "
"database when it's not needed (for example, when running tests). "
"Django was unable to create a connection to the 'postgres' database "
"and will use the default database instead.",
RuntimeWarning
)
settings_dict = self.settings_dict.copy()
settings_dict['NAME'] = settings.DATABASES[DEFAULT_DB_ALIAS]['NAME']
nodb_connection = self.__class__(
self.settings_dict.copy(),
alias=self.alias,
allow_thread_sharing=False)
return nodb_connection
def _switch_to_test_user(self, parameters):
"""
Oracle doesn't have the concept of separate databases under the same user.
Thus, we use a separate user (see _create_test_db). This method is used
to switch to that user. We will need the main user again for clean-up when
we end testing, so we keep its credentials in SAVED_USER/SAVED_PASSWORD
entries in the settings dict.
"""
real_settings = settings.DATABASES[self.connection.alias]
real_settings['SAVED_USER'] = self.connection.settings_dict['SAVED_USER'] = \
self.connection.settings_dict['USER']
real_settings['SAVED_PASSWORD'] = self.connection.settings_dict['SAVED_PASSWORD'] = \
self.connection.settings_dict['PASSWORD']
real_test_settings = real_settings['TEST']
test_settings = self.connection.settings_dict['TEST']
real_test_settings['USER'] = real_settings['USER'] = test_settings['USER'] = \
self.connection.settings_dict['USER'] = parameters['user']
real_settings['PASSWORD'] = self.connection.settings_dict['PASSWORD'] = parameters['password']
def get_connection_params(self):
settings_dict = self.settings_dict
# None may be used to connect to the default 'postgres' db
if settings_dict['NAME'] == '':
raise ImproperlyConfigured(
"settings.DATABASES is improperly configured. "
"Please supply the NAME value.")
conn_params = {
'database': settings_dict['NAME'] or 'postgres',
}
conn_params.update(settings_dict['OPTIONS'])
conn_params.pop('isolation_level', None)
if settings_dict['USER']:
conn_params['user'] = settings_dict['USER']
if settings_dict['PASSWORD']:
conn_params['password'] = force_str(settings_dict['PASSWORD'])
if settings_dict['HOST']:
conn_params['host'] = settings_dict['HOST']
if settings_dict['PORT']:
conn_params['port'] = settings_dict['PORT']
return conn_params
def _nodb_connection(self):
nodb_connection = super(DatabaseWrapper, self)._nodb_connection
try:
nodb_connection.ensure_connection()
except (DatabaseError, WrappedDatabaseError):
warnings.warn(
"Normally Django will use a connection to the 'postgres' database "
"to avoid running initialization queries against the production "
"database when it's not needed (for example, when running tests). "
"Django was unable to create a connection to the 'postgres' database "
"and will use the default database instead.",
RuntimeWarning
)
settings_dict = self.settings_dict.copy()
settings_dict['NAME'] = settings.DATABASES[DEFAULT_DB_ALIAS]['NAME']
nodb_connection = self.__class__(
self.settings_dict.copy(),
alias=self.alias,
allow_thread_sharing=False)
return nodb_connection
def _switch_to_test_user(self, parameters):
"""
Oracle doesn't have the concept of separate databases under the same user.
Thus, we use a separate user (see _create_test_db). This method is used
to switch to that user. We will need the main user again for clean-up when
we end testing, so we keep its credentials in SAVED_USER/SAVED_PASSWORD
entries in the settings dict.
"""
real_settings = settings.DATABASES[self.connection.alias]
real_settings['SAVED_USER'] = self.connection.settings_dict['SAVED_USER'] = \
self.connection.settings_dict['USER']
real_settings['SAVED_PASSWORD'] = self.connection.settings_dict['SAVED_PASSWORD'] = \
self.connection.settings_dict['PASSWORD']
real_test_settings = real_settings['TEST']
test_settings = self.connection.settings_dict['TEST']
real_test_settings['USER'] = real_settings['USER'] = test_settings['USER'] = \
self.connection.settings_dict['USER'] = parameters['user']
real_settings['PASSWORD'] = self.connection.settings_dict['PASSWORD'] = parameters['password']
def handle(self, *args, **options):
try:
connection = connections[options['database']]
cursor = connection.cursor()
database_settings = settings.DATABASES[options['database']]
except ConnectionDoesNotExist:
raise CommandError('Database "%s" does not exist in settings' % options['database'])
if connection.vendor == 'sqlite':
print("Deleting database %s" % database_settings['NAME'])
os.remove(database_settings['NAME'])
elif connection.vendor == 'mysql':
print("Dropping database %s" % database_settings['NAME'])
cursor.execute("DROP DATABASE `%s`;" % database_settings['NAME'])
print("Creating database %s" % database_settings['NAME'])
cursor.execute("CREATE DATABASE `%s` CHARACTER SET utf8;" % database_settings['NAME'])
# Should fix some "MySQL has gone away issues"
cursor.execute("SET GLOBAL max_allowed_packet=32*1024*1024;")
elif connection.vendor == 'postgresql':
print("Dropping and recreating schema public")
cursor.execute("DROP schema public CASCADE; CREATE schema public")
else:
raise CommandError('Database vendor not supported')
def line_strain_changed(sender, instance, action, reverse, model, pk_set, using, **kwargs):
"""
Handles changes to the Line <-> Strain relationship caused by adding/removing/changing the
strain associated with a single line in a study. Detects changes that indicate a need to push
changes across to ICE for the (ICE part -> EDD study) link stored in ICE.
"""
# only care about changes in the forward direction, Line -> Strain
if reverse or check_ice_cannot_proceed():
return
# only execute these signals if using a non-testing database
if using in settings.DATABASES:
action_function = {
'post_add': strain_added,
'pre_remove': strain_removing,
'post_remove': strain_removed,
}.get(action, None)
if action_function:
action_function(instance, pk_set)
# ----- helper functions -----
def modify_row(self, decoder):
modulo = importlib.import_module(self.db+".models")
clase_mane = decoder["tb"]
for s in dir(modulo):
if s.lower() == decoder["tb"]:
clase_mane = s
class_model = getattr(modulo, clase_mane)
if "condition" in decoder:
db_name = settings.DATABASES["default"]["NAME"]
row_id = Model(db_name=db_name, table_name=self.db+"_"+decoder['tb'])
row_id.load_first_by_query(**decoder["condition"])
decoder["fields"]["id"] = row_id.id
if 'id' in decoder["fields"]:
try:
row = class_model.objects.get(pk=decoder['fields']['id'])
except:
row = class_model()
else:
row = class_model()
for k, v in decoder["fields"].items():
setattr(row, k, v)
return row
def __init__(self):
self.database_key = 'data'
self.database_config = settings.DATABASES['data']
try:
database_adapter_class = import_class(settings.ADAPTER_DATABASE)
self.database = database_adapter_class(self.database_key, self.database_config)
except AttributeError:
if self.database_config['ENGINE'] == 'django.db.backends.mysql':
self.database = MySQLAdapter(self.database_key, self.database_config)
else:
raise Exception('No suitable database adapter found.')
try:
download_adapter_class = import_class(settings.ADAPTER_DOWNLOAD)
self.download = download_adapter_class(self.database_key, self.database_config)
except AttributeError:
if self.database_config['ENGINE'] == 'django.db.backends.mysql':
self.download = MysqldumpAdapter(self.database_key, self.database_config)
else:
raise Exception('No suitable download adapter found.')
def get_connection_params(self):
settings_dict = self.settings_dict
# None may be used to connect to the default 'postgres' db
if settings_dict['NAME'] == '':
raise ImproperlyConfigured(
"settings.DATABASES is improperly configured. "
"Please supply the NAME value.")
conn_params = {
'database': settings_dict['NAME'] or 'postgres',
}
conn_params.update(settings_dict['OPTIONS'])
conn_params.pop('isolation_level', None)
if settings_dict['USER']:
conn_params['user'] = settings_dict['USER']
if settings_dict['PASSWORD']:
conn_params['password'] = force_str(settings_dict['PASSWORD'])
if settings_dict['HOST']:
conn_params['host'] = settings_dict['HOST']
if settings_dict['PORT']:
conn_params['port'] = settings_dict['PORT']
return conn_params
def _nodb_connection(self):
nodb_connection = super(DatabaseWrapper, self)._nodb_connection
try:
nodb_connection.ensure_connection()
except (DatabaseError, WrappedDatabaseError):
warnings.warn(
"Normally Django will use a connection to the 'postgres' database "
"to avoid running initialization queries against the production "
"database when it's not needed (for example, when running tests). "
"Django was unable to create a connection to the 'postgres' database "
"and will use the default database instead.",
RuntimeWarning
)
settings_dict = self.settings_dict.copy()
settings_dict['NAME'] = settings.DATABASES[DEFAULT_DB_ALIAS]['NAME']
nodb_connection = self.__class__(
self.settings_dict.copy(),
alias=self.alias,
allow_thread_sharing=False)
return nodb_connection