def setActiveUser(self, id=None, token=None, username=None, tenant_id=None,
service_catalog=None, tenant_name=None, roles=None,
authorized_tenants=None, enabled=True, domain_id=None,
user_domain_name=None):
def get_user(request):
return user.User(id=id,
token=token,
user=username,
domain_id=domain_id,
user_domain_name=user_domain_name,
tenant_id=tenant_id,
service_catalog=service_catalog,
roles=roles,
enabled=enabled,
authorized_tenants=authorized_tenants,
endpoint=settings.OPENSTACK_KEYSTONE_URL)
utils.get_user = get_user
python类utils()的实例源码
def setUp(self):
super(SeleniumTestCase, self).setUp()
test_utils.load_test_data(self)
self.mox = mox.Mox()
self._real_get_user = utils.get_user
self.setActiveUser(id=self.user.id,
token=self.token,
username=self.user.name,
tenant_id=self.tenant.id,
service_catalog=self.service_catalog,
authorized_tenants=self.tenants.list())
self.patchers = {}
self.patchers['aggregates'] = mock.patch(
'openstack_dashboard.dashboards.admin'
'.aggregates.panel.Aggregates.can_access',
mock.Mock(return_value=True))
self.patchers['aggregates'].start()
os.environ["HORIZON_TEST_RUN"] = "True"
def preprocess(self):
"""
Preprocess (if necessary) a translatable file before passing it to
xgettext GNU gettext utility.
"""
from django.utils.translation import templatize
if not self.is_templatized:
return
with io.open(self.path, 'r', encoding=settings.FILE_CHARSET) as fp:
src_data = fp.read()
if self.domain == 'djangojs':
content = prepare_js_for_gettext(src_data)
elif self.domain == 'django':
content = templatize(src_data, self.path[2:])
with io.open(self.work_path, 'w', encoding='utf-8') as fp:
fp.write(content)
def _execute_wrapper(self, method, query, args):
"""Wrapper around execute() and executemany()"""
try:
return method(query, args)
except (PyMysqlPool.mysql.connector.ProgrammingError) as err:
six.reraise(utils.ProgrammingError,
utils.ProgrammingError(err.msg), sys.exc_info()[2])
except (PyMysqlPool.mysql.connector.IntegrityError) as err:
six.reraise(utils.IntegrityError,
utils.IntegrityError(err.msg), sys.exc_info()[2])
except PyMysqlPool.mysql.connector.OperationalError as err:
# Map some error codes to IntegrityError, since they seem to be
# misclassified and Django would prefer the more logical place.
if err.args[0] in self.codes_for_integrityerror:
six.reraise(utils.IntegrityError,
utils.IntegrityError(err.msg), sys.exc_info()[2])
else:
six.reraise(utils.DatabaseError,
utils.DatabaseError(err.msg), sys.exc_info()[2])
except PyMysqlPool.mysql.connector.DatabaseError as err:
six.reraise(utils.DatabaseError,
utils.DatabaseError(err.msg), sys.exc_info()[2])
def _execute_wrapper(self, method, query, args):
"""Wrapper around execute() and executemany()"""
try:
return method(query, args)
except (mysql.connector.ProgrammingError) as err:
six.reraise(utils.ProgrammingError,
utils.ProgrammingError(err.msg), sys.exc_info()[2])
except (mysql.connector.IntegrityError) as err:
six.reraise(utils.IntegrityError,
utils.IntegrityError(err.msg), sys.exc_info()[2])
except mysql.connector.OperationalError as err:
# Map some error codes to IntegrityError, since they seem to be
# misclassified and Django would prefer the more logical place.
if err.args[0] in self.codes_for_integrityerror:
six.reraise(utils.IntegrityError,
utils.IntegrityError(err.msg), sys.exc_info()[2])
else:
six.reraise(utils.DatabaseError,
utils.DatabaseError(err.msg), sys.exc_info()[2])
except mysql.connector.DatabaseError as err:
six.reraise(utils.DatabaseError,
utils.DatabaseError(err.msg), sys.exc_info()[2])
def preprocess(self):
"""
Preprocess (if necessary) a translatable file before passing it to
xgettext GNU gettext utility.
"""
from django.utils.translation import templatize
if not self.is_templatized:
return
with io.open(self.path, 'r', encoding=settings.FILE_CHARSET) as fp:
src_data = fp.read()
if self.domain == 'djangojs':
content = prepare_js_for_gettext(src_data)
elif self.domain == 'django':
content = templatize(src_data, self.path[2:])
with io.open(self.work_path, 'w', encoding='utf-8') as fp:
fp.write(content)
def preprocess(self):
"""
Preprocess (if necessary) a translatable file before passing it to
xgettext GNU gettext utility.
"""
from django.utils.translation import templatize
if not self.is_templatized:
return
with io.open(self.path, 'r', encoding=settings.FILE_CHARSET) as fp:
src_data = fp.read()
if self.domain == 'djangojs':
content = prepare_js_for_gettext(src_data)
elif self.domain == 'django':
content = templatize(src_data, self.path[2:])
with io.open(self.work_path, 'w', encoding='utf-8') as fp:
fp.write(content)
def preprocess(self):
"""
Preprocess (if necessary) a translatable file before passing it to
xgettext GNU gettext utility.
"""
from django.utils.translation import templatize
if not self.is_templatized:
return
with io.open(self.path, 'r', encoding=settings.FILE_CHARSET) as fp:
src_data = fp.read()
if self.domain == 'djangojs':
content = prepare_js_for_gettext(src_data)
elif self.domain == 'django':
content = templatize(src_data, self.path[2:])
with io.open(self.work_path, 'w', encoding='utf-8') as fp:
fp.write(content)
def preprocess(self):
"""
Preprocess (if necessary) a translatable file before passing it to
xgettext GNU gettext utility.
"""
from django.utils.translation import templatize
if not self.is_templatized:
return
with io.open(self.path, 'r', encoding=settings.FILE_CHARSET) as fp:
src_data = fp.read()
if self.domain == 'djangojs':
content = prepare_js_for_gettext(src_data)
elif self.domain == 'django':
content = templatize(src_data, self.path[2:])
with io.open(self.work_path, 'w', encoding='utf-8') as fp:
fp.write(content)
def preprocess(self):
"""
Preprocess (if necessary) a translatable file before passing it to
xgettext GNU gettext utility.
"""
from django.utils.translation import templatize
if not self.is_templatized:
return
with io.open(self.path, 'r', encoding=settings.FILE_CHARSET) as fp:
src_data = fp.read()
if self.domain == 'djangojs':
content = prepare_js_for_gettext(src_data)
elif self.domain == 'django':
content = templatize(src_data, self.path[2:])
with io.open(self.work_path, 'w', encoding='utf-8') as fp:
fp.write(content)
def preprocess(self):
"""
Preprocess (if necessary) a translatable file before passing it to
xgettext GNU gettext utility.
"""
from django.utils.translation import templatize
if not self.is_templatized:
return
with io.open(self.path, 'r', encoding=settings.FILE_CHARSET) as fp:
src_data = fp.read()
if self.domain == 'djangojs':
content = prepare_js_for_gettext(src_data)
elif self.domain == 'django':
content = templatize(src_data, self.path[2:])
with io.open(self.work_path, 'w', encoding='utf-8') as fp:
fp.write(content)
def preprocess(self):
"""
Preprocess (if necessary) a translatable file before passing it to
xgettext GNU gettext utility.
"""
from django.utils.translation import templatize
if not self.is_templatized:
return
with io.open(self.path, 'r', encoding=settings.FILE_CHARSET) as fp:
src_data = fp.read()
if self.domain == 'djangojs':
content = prepare_js_for_gettext(src_data)
elif self.domain == 'django':
content = templatize(src_data, self.path[2:])
with io.open(self.work_path, 'w', encoding='utf-8') as fp:
fp.write(content)
def preprocess(self):
"""
Preprocess (if necessary) a translatable file before passing it to
xgettext GNU gettext utility.
"""
from django.utils.translation import templatize
if not self.is_templatized:
return
with io.open(self.path, 'r', encoding=settings.FILE_CHARSET) as fp:
src_data = fp.read()
if self.domain == 'djangojs':
content = prepare_js_for_gettext(src_data)
elif self.domain == 'django':
content = templatize(src_data, self.path[2:])
with io.open(self.work_path, 'w', encoding='utf-8') as fp:
fp.write(content)
def _execute_wrapper(self, method, query, args):
"""Wrapper around execute() and executemany()"""
try:
return method(query, args)
except (mysql.connector.ProgrammingError) as err:
six.reraise(utils.ProgrammingError,
utils.ProgrammingError(err.msg), sys.exc_info()[2])
except (mysql.connector.IntegrityError) as err:
six.reraise(utils.IntegrityError,
utils.IntegrityError(err.msg), sys.exc_info()[2])
except mysql.connector.OperationalError as err:
# Map some error codes to IntegrityError, since they seem to be
# misclassified and Django would prefer the more logical place.
if err.args[0] in self.codes_for_integrityerror:
six.reraise(utils.IntegrityError,
utils.IntegrityError(err.msg), sys.exc_info()[2])
else:
six.reraise(utils.DatabaseError,
utils.DatabaseError(err.msg), sys.exc_info()[2])
except mysql.connector.DatabaseError as err:
six.reraise(utils.DatabaseError,
utils.DatabaseError(err.msg), sys.exc_info()[2])
def get(self, *args, **kwargs):
req = super(RequestFactoryWithMessages, self).get(*args, **kwargs)
req.user = utils.get_user(req)
req.session = []
req._messages = default_storage(req)
return req
def post(self, *args, **kwargs):
req = super(RequestFactoryWithMessages, self).post(*args, **kwargs)
req.user = utils.get_user(req)
req.session = []
req._messages = default_storage(req)
return req
def _setup_user(self):
self._real_get_user = utils.get_user
tenants = self.context['authorized_tenants']
self.setActiveUser(id=self.user.id,
token=self.token,
username=self.user.name,
domain_id=self.domain.id,
user_domain_name=self.domain.name,
tenant_id=self.tenant.id,
service_catalog=self.service_catalog,
authorized_tenants=tenants)
def tearDown(self):
HTTPConnection.connect = self._real_conn_request
context_processors.openstack = self._real_context_processor
utils.get_user = self._real_get_user
mock.patch.stopall()
super(TestCase, self).tearDown()
# cause a test failure if an unmocked API call was attempted
if self.missing_mocks:
raise AssertionError("An unmocked API call was made.")
def tearDown(self):
self.mox.UnsetStubs()
utils.get_user = self._real_get_user
mock.patch.stopall()
self.mox.VerifyAll()
del os.environ["HORIZON_TEST_RUN"]
def setActiveUser(self, id=None, token=None, username=None, tenant_id=None,
service_catalog=None, tenant_name=None, roles=None,
authorized_tenants=None, enabled=True):
def get_user(request):
return user.User(id=id,
token=token,
user=username,
tenant_id=tenant_id,
service_catalog=service_catalog,
roles=roles,
enabled=enabled,
authorized_tenants=authorized_tenants,
endpoint=settings.OPENSTACK_KEYSTONE_URL)
utils.get_user = get_user
def invalidate():
OFFBOARD_DAYS = 31
from django.utils import timezone
from django.contrib.auth.models import User
from django.db.models import Q
from social.apps.django_app.default.models import UserSocialAuth
invalidate_delta = timezone.now() - timezone.timedelta(OFFBOARD_DAYS)
print "Running invalidate process with date: %s" % (invalidate_delta)
# has auth credentials (non-null)
criteria = Q(extra_data__isnull=False)
# also is past timeframe
criteria = criteria & (Q(user__last_login__lte=invalidate_delta) | (Q(user__last_login__isnull=True) & Q(user__date_joined__lte=invalidate_delta)))
users = UserSocialAuth.objects.filter(criteria)
for u in users:
try:
u.extra_data = None
u.save()
print "\tInvalidate user %s (last_login=%s, date_joined=%s)" % (u.user, u.user.last_login, u.user.date_joined)
except UserSocialAuth.DoesNotExist, e:
# no record exists; no need to clear anything
pass
def download(self, url):
"""
Downloads the given URL and returns the file name.
"""
def cleanup_url(url):
tmp = url.rstrip('/')
filename = tmp.split('/')[-1]
if url.endswith('/'):
display_url = tmp + '/'
else:
display_url = url
return filename, display_url
prefix = 'django_%s_template_' % self.app_or_project
tempdir = tempfile.mkdtemp(prefix=prefix, suffix='_download')
self.paths_to_remove.append(tempdir)
filename, display_url = cleanup_url(url)
if self.verbosity >= 2:
self.stdout.write("Downloading %s\n" % display_url)
try:
the_path, info = urlretrieve(url, path.join(tempdir, filename))
except IOError as e:
raise CommandError("couldn't download URL %s to %s: %s" %
(url, filename, e))
used_name = the_path.split('/')[-1]
# Trying to get better name from response headers
content_disposition = info.get('content-disposition')
if content_disposition:
_, params = cgi.parse_header(content_disposition)
guessed_filename = params.get('filename') or used_name
else:
guessed_filename = used_name
# Falling back to content type guessing
ext = self.splitext(guessed_filename)[1]
content_type = info.get('content-type')
if not ext and content_type:
ext = mimetypes.guess_extension(content_type)
if ext:
guessed_filename += ext
# Move the temporary file to a filename that has better
# chances of being recognized by the archive utils
if used_name != guessed_filename:
guessed_path = path.join(tempdir, guessed_filename)
shutil.move(the_path, guessed_path)
return guessed_path
# Giving up
return the_path
def execute(self, *args, **options):
"""
Try to execute this command, performing system checks if needed (as
controlled by the ``requires_system_checks`` attribute, except if
force-skipped).
"""
if options.get('no_color'):
self.style = no_style()
self.stderr.style_func = None
if options.get('stdout'):
self.stdout = OutputWrapper(options['stdout'])
if options.get('stderr'):
self.stderr = OutputWrapper(options.get('stderr'), self.stderr.style_func)
saved_locale = None
if not self.leave_locale_alone:
# Only mess with locales if we can assume we have a working
# settings file, because django.utils.translation requires settings
# (The final saying about whether the i18n machinery is active will be
# found in the value of the USE_I18N setting)
if not self.can_import_settings:
raise CommandError("Incompatible values of 'leave_locale_alone' "
"(%s) and 'can_import_settings' (%s) command "
"options." % (self.leave_locale_alone,
self.can_import_settings))
# Deactivate translations, because django-admin creates database
# content like permissions, and those shouldn't contain any
# translations.
from django.utils import translation
saved_locale = translation.get_language()
translation.deactivate_all()
try:
if (self.requires_system_checks and
not options.get('skip_validation') and # Remove at the end of deprecation for `skip_validation`.
not options.get('skip_checks')):
self.check()
output = self.handle(*args, **options)
if output:
if self.output_transaction:
# This needs to be imported here, because it relies on
# settings.
from django.db import connections, DEFAULT_DB_ALIAS
connection = connections[options.get('database', DEFAULT_DB_ALIAS)]
if connection.ops.start_transaction_sql():
self.stdout.write(self.style.SQL_KEYWORD(connection.ops.start_transaction_sql()))
self.stdout.write(output)
if self.output_transaction:
self.stdout.write('\n' + self.style.SQL_KEYWORD(connection.ops.end_transaction_sql()))
finally:
if saved_locale is not None:
translation.activate(saved_locale)
def download(self, url):
"""
Downloads the given URL and returns the file name.
"""
def cleanup_url(url):
tmp = url.rstrip('/')
filename = tmp.split('/')[-1]
if url.endswith('/'):
display_url = tmp + '/'
else:
display_url = url
return filename, display_url
prefix = 'django_%s_template_' % self.app_or_project
tempdir = tempfile.mkdtemp(prefix=prefix, suffix='_download')
self.paths_to_remove.append(tempdir)
filename, display_url = cleanup_url(url)
if self.verbosity >= 2:
self.stdout.write("Downloading %s\n" % display_url)
try:
the_path, info = urlretrieve(url, path.join(tempdir, filename))
except IOError as e:
raise CommandError("couldn't download URL %s to %s: %s" %
(url, filename, e))
used_name = the_path.split('/')[-1]
# Trying to get better name from response headers
content_disposition = info.get('content-disposition')
if content_disposition:
_, params = cgi.parse_header(content_disposition)
guessed_filename = params.get('filename') or used_name
else:
guessed_filename = used_name
# Falling back to content type guessing
ext = self.splitext(guessed_filename)[1]
content_type = info.get('content-type')
if not ext and content_type:
ext = mimetypes.guess_extension(content_type)
if ext:
guessed_filename += ext
# Move the temporary file to a filename that has better
# chances of being recognized by the archive utils
if used_name != guessed_filename:
guessed_path = path.join(tempdir, guessed_filename)
shutil.move(the_path, guessed_path)
return guessed_path
# Giving up
return the_path
def execute(self, *args, **options):
"""
Try to execute this command, performing system checks if needed (as
controlled by the ``requires_system_checks`` attribute, except if
force-skipped).
"""
if options['no_color']:
self.style = no_style()
self.stderr.style_func = None
if options.get('stdout'):
self.stdout = OutputWrapper(options['stdout'])
if options.get('stderr'):
self.stderr = OutputWrapper(options['stderr'], self.stderr.style_func)
saved_locale = None
if not self.leave_locale_alone:
# Only mess with locales if we can assume we have a working
# settings file, because django.utils.translation requires settings
# (The final saying about whether the i18n machinery is active will be
# found in the value of the USE_I18N setting)
if not self.can_import_settings:
raise CommandError("Incompatible values of 'leave_locale_alone' "
"(%s) and 'can_import_settings' (%s) command "
"options." % (self.leave_locale_alone,
self.can_import_settings))
# Deactivate translations, because django-admin creates database
# content like permissions, and those shouldn't contain any
# translations.
from django.utils import translation
saved_locale = translation.get_language()
translation.deactivate_all()
try:
if self.requires_system_checks and not options.get('skip_checks'):
self.check()
if self.requires_migrations_checks:
self.check_migrations()
output = self.handle(*args, **options)
if output:
if self.output_transaction:
connection = connections[options.get('database', DEFAULT_DB_ALIAS)]
output = '%s\n%s\n%s' % (
self.style.SQL_KEYWORD(connection.ops.start_transaction_sql()),
output,
self.style.SQL_KEYWORD(connection.ops.end_transaction_sql()),
)
self.stdout.write(output)
finally:
if saved_locale is not None:
translation.activate(saved_locale)
return output
def download(self, url):
"""
Downloads the given URL and returns the file name.
"""
def cleanup_url(url):
tmp = url.rstrip('/')
filename = tmp.split('/')[-1]
if url.endswith('/'):
display_url = tmp + '/'
else:
display_url = url
return filename, display_url
prefix = 'django_%s_template_' % self.app_or_project
tempdir = tempfile.mkdtemp(prefix=prefix, suffix='_download')
self.paths_to_remove.append(tempdir)
filename, display_url = cleanup_url(url)
if self.verbosity >= 2:
self.stdout.write("Downloading %s\n" % display_url)
try:
the_path, info = urlretrieve(url, path.join(tempdir, filename))
except IOError as e:
raise CommandError("couldn't download URL %s to %s: %s" %
(url, filename, e))
used_name = the_path.split('/')[-1]
# Trying to get better name from response headers
content_disposition = info.get('content-disposition')
if content_disposition:
_, params = cgi.parse_header(content_disposition)
guessed_filename = params.get('filename') or used_name
else:
guessed_filename = used_name
# Falling back to content type guessing
ext = self.splitext(guessed_filename)[1]
content_type = info.get('content-type')
if not ext and content_type:
ext = mimetypes.guess_extension(content_type)
if ext:
guessed_filename += ext
# Move the temporary file to a filename that has better
# chances of being recognized by the archive utils
if used_name != guessed_filename:
guessed_path = path.join(tempdir, guessed_filename)
shutil.move(the_path, guessed_path)
return guessed_path
# Giving up
return the_path
def execute(self, *args, **options):
"""
Try to execute this command, performing system checks if needed (as
controlled by the ``requires_system_checks`` attribute, except if
force-skipped).
"""
if options['no_color']:
self.style = no_style()
self.stderr.style_func = None
if options.get('stdout'):
self.stdout = OutputWrapper(options['stdout'])
if options.get('stderr'):
self.stderr = OutputWrapper(options['stderr'], self.stderr.style_func)
saved_locale = None
if not self.leave_locale_alone:
# Deactivate translations, because django-admin creates database
# content like permissions, and those shouldn't contain any
# translations.
from django.utils import translation
saved_locale = translation.get_language()
translation.deactivate_all()
try:
if self.requires_system_checks and not options.get('skip_checks'):
self.check()
if self.requires_migrations_checks:
self.check_migrations()
output = self.handle(*args, **options)
if output:
if self.output_transaction:
connection = connections[options.get('database', DEFAULT_DB_ALIAS)]
output = '%s\n%s\n%s' % (
self.style.SQL_KEYWORD(connection.ops.start_transaction_sql()),
output,
self.style.SQL_KEYWORD(connection.ops.end_transaction_sql()),
)
self.stdout.write(output)
finally:
if saved_locale is not None:
translation.activate(saved_locale)
return output
def check_constraints(self, table_names=None):
"""Check rows in tables for invalid foreign key references
Checks each table name in `table_names` for rows with invalid foreign
key references. This method is intended to be used in conjunction with
`disable_constraint_checking()` and `enable_constraint_checking()`, to
determine if rows with invalid references were entered while
constraint checks were off.
Raises an IntegrityError on the first invalid foreign key reference
encountered (if any) and provides detailed information about the
invalid reference in the error message.
Backends can override this method if they can more directly apply
constraint checking (e.g. via "SET CONSTRAINTS ALL IMMEDIATE")
"""
ref_query = """
SELECT REFERRING.`{0}`, REFERRING.`{1}` FROM `{2}` as REFERRING
LEFT JOIN `{3}` as REFERRED
ON (REFERRING.`{4}` = REFERRED.`{5}`)
WHERE REFERRING.`{6}` IS NOT NULL AND REFERRED.`{7}` IS NULL"""
cursor = self.cursor()
if table_names is None:
table_names = self.introspection.table_names(cursor)
for table_name in table_names:
primary_key_column_name = \
self.introspection.get_primary_key_column(cursor, table_name)
if not primary_key_column_name:
continue
key_columns = self.introspection.get_key_columns(cursor,
table_name)
for column_name, referenced_table_name, referenced_column_name \
in key_columns:
cursor.execute(ref_query.format(primary_key_column_name,
column_name, table_name,
referenced_table_name,
column_name,
referenced_column_name,
column_name,
referenced_column_name))
for bad_row in cursor.fetchall():
msg = ("The row in table '{0}' with primary key '{1}' has "
"an invalid foreign key: {2}.{3} contains a value "
"'{4}' that does not have a corresponding value in "
"{5}.{6}.".format(table_name, bad_row[0],
table_name, column_name,
bad_row[1], referenced_table_name,
referenced_column_name))
raise utils.IntegrityError(msg)
def download(self, url):
"""
Downloads the given URL and returns the file name.
"""
def cleanup_url(url):
tmp = url.rstrip('/')
filename = tmp.split('/')[-1]
if url.endswith('/'):
display_url = tmp + '/'
else:
display_url = url
return filename, display_url
prefix = 'django_%s_template_' % self.app_or_project
tempdir = tempfile.mkdtemp(prefix=prefix, suffix='_download')
self.paths_to_remove.append(tempdir)
filename, display_url = cleanup_url(url)
if self.verbosity >= 2:
self.stdout.write("Downloading %s\n" % display_url)
try:
the_path, info = urlretrieve(url, path.join(tempdir, filename))
except IOError as e:
raise CommandError("couldn't download URL %s to %s: %s" %
(url, filename, e))
used_name = the_path.split('/')[-1]
# Trying to get better name from response headers
content_disposition = info.get('content-disposition')
if content_disposition:
_, params = cgi.parse_header(content_disposition)
guessed_filename = params.get('filename') or used_name
else:
guessed_filename = used_name
# Falling back to content type guessing
ext = self.splitext(guessed_filename)[1]
content_type = info.get('content-type')
if not ext and content_type:
ext = mimetypes.guess_extension(content_type)
if ext:
guessed_filename += ext
# Move the temporary file to a filename that has better
# chances of being recognized by the archive utils
if used_name != guessed_filename:
guessed_path = path.join(tempdir, guessed_filename)
shutil.move(the_path, guessed_path)
return guessed_path
# Giving up
return the_path
def execute(self, *args, **options):
"""
Try to execute this command, performing system checks if needed (as
controlled by the ``requires_system_checks`` attribute, except if
force-skipped).
"""
if options['no_color']:
self.style = no_style()
self.stderr.style_func = None
if options.get('stdout'):
self.stdout = OutputWrapper(options['stdout'])
if options.get('stderr'):
self.stderr = OutputWrapper(options['stderr'], self.stderr.style_func)
saved_locale = None
if not self.leave_locale_alone:
# Deactivate translations, because django-admin creates database
# content like permissions, and those shouldn't contain any
# translations.
from django.utils import translation
saved_locale = translation.get_language()
translation.deactivate_all()
try:
if self.requires_system_checks and not options.get('skip_checks'):
self.check()
if self.requires_migrations_checks:
self.check_migrations()
output = self.handle(*args, **options)
if output:
if self.output_transaction:
connection = connections[options.get('database', DEFAULT_DB_ALIAS)]
output = '%s\n%s\n%s' % (
self.style.SQL_KEYWORD(connection.ops.start_transaction_sql()),
output,
self.style.SQL_KEYWORD(connection.ops.end_transaction_sql()),
)
self.stdout.write(output)
finally:
if saved_locale is not None:
translation.activate(saved_locale)
return output