def check_constraints(self, table_names=None):
"""Check rows in tables for invalid foreign key references
Checks each table name in `table_names` for rows with invalid foreign
key references. This method is intended to be used in conjunction with
`disable_constraint_checking()` and `enable_constraint_checking()`, to
determine if rows with invalid references were entered while
constraint checks were off.
Raises an IntegrityError on the first invalid foreign key reference
encountered (if any) and provides detailed information about the
invalid reference in the error message.
Backends can override this method if they can more directly apply
constraint checking (e.g. via "SET CONSTRAINTS ALL IMMEDIATE")
"""
ref_query = """
SELECT REFERRING.`{0}`, REFERRING.`{1}` FROM `{2}` as REFERRING
LEFT JOIN `{3}` as REFERRED
ON (REFERRING.`{4}` = REFERRED.`{5}`)
WHERE REFERRING.`{6}` IS NOT NULL AND REFERRED.`{7}` IS NULL"""
cursor = self.cursor()
if table_names is None:
table_names = self.introspection.table_names(cursor)
for table_name in table_names:
primary_key_column_name = \
self.introspection.get_primary_key_column(cursor, table_name)
if not primary_key_column_name:
continue
key_columns = self.introspection.get_key_columns(cursor,
table_name)
for column_name, referenced_table_name, referenced_column_name \
in key_columns:
cursor.execute(ref_query.format(primary_key_column_name,
column_name, table_name,
referenced_table_name,
column_name,
referenced_column_name,
column_name,
referenced_column_name))
for bad_row in cursor.fetchall():
msg = ("The row in table '{0}' with primary key '{1}' has "
"an invalid foreign key: {2}.{3} contains a value "
"'{4}' that does not have a corresponding value in "
"{5}.{6}.".format(table_name, bad_row[0],
table_name, column_name,
bad_row[1], referenced_table_name,
referenced_column_name))
raise utils.IntegrityError(msg)
python类utils()的实例源码
def download(self, url):
"""
Downloads the given URL and returns the file name.
"""
def cleanup_url(url):
tmp = url.rstrip('/')
filename = tmp.split('/')[-1]
if url.endswith('/'):
display_url = tmp + '/'
else:
display_url = url
return filename, display_url
prefix = 'django_%s_template_' % self.app_or_project
tempdir = tempfile.mkdtemp(prefix=prefix, suffix='_download')
self.paths_to_remove.append(tempdir)
filename, display_url = cleanup_url(url)
if self.verbosity >= 2:
self.stdout.write("Downloading %s\n" % display_url)
try:
the_path, info = urlretrieve(url, path.join(tempdir, filename))
except IOError as e:
raise CommandError("couldn't download URL %s to %s: %s" %
(url, filename, e))
used_name = the_path.split('/')[-1]
# Trying to get better name from response headers
content_disposition = info.get('content-disposition')
if content_disposition:
_, params = cgi.parse_header(content_disposition)
guessed_filename = params.get('filename') or used_name
else:
guessed_filename = used_name
# Falling back to content type guessing
ext = self.splitext(guessed_filename)[1]
content_type = info.get('content-type')
if not ext and content_type:
ext = mimetypes.guess_extension(content_type)
if ext:
guessed_filename += ext
# Move the temporary file to a filename that has better
# chances of being recognized by the archive utils
if used_name != guessed_filename:
guessed_path = path.join(tempdir, guessed_filename)
shutil.move(the_path, guessed_path)
return guessed_path
# Giving up
return the_path
def execute(self, *args, **options):
"""
Try to execute this command, performing system checks if needed (as
controlled by the ``requires_system_checks`` attribute, except if
force-skipped).
"""
if options['no_color']:
self.style = no_style()
self.stderr.style_func = None
if options.get('stdout'):
self.stdout = OutputWrapper(options['stdout'])
if options.get('stderr'):
self.stderr = OutputWrapper(options['stderr'], self.stderr.style_func)
saved_locale = None
if not self.leave_locale_alone:
# Only mess with locales if we can assume we have a working
# settings file, because django.utils.translation requires settings
# (The final saying about whether the i18n machinery is active will be
# found in the value of the USE_I18N setting)
if not self.can_import_settings:
raise CommandError("Incompatible values of 'leave_locale_alone' "
"(%s) and 'can_import_settings' (%s) command "
"options." % (self.leave_locale_alone,
self.can_import_settings))
# Deactivate translations, because django-admin creates database
# content like permissions, and those shouldn't contain any
# translations.
from django.utils import translation
saved_locale = translation.get_language()
translation.deactivate_all()
try:
if self.requires_system_checks and not options.get('skip_checks'):
self.check()
if self.requires_migrations_checks:
self.check_migrations()
output = self.handle(*args, **options)
if output:
if self.output_transaction:
connection = connections[options.get('database', DEFAULT_DB_ALIAS)]
output = '%s\n%s\n%s' % (
self.style.SQL_KEYWORD(connection.ops.start_transaction_sql()),
output,
self.style.SQL_KEYWORD(connection.ops.end_transaction_sql()),
)
self.stdout.write(output)
finally:
if saved_locale is not None:
translation.activate(saved_locale)
return output
def download(self, url):
"""
Downloads the given URL and returns the file name.
"""
def cleanup_url(url):
tmp = url.rstrip('/')
filename = tmp.split('/')[-1]
if url.endswith('/'):
display_url = tmp + '/'
else:
display_url = url
return filename, display_url
prefix = 'django_%s_template_' % self.app_or_project
tempdir = tempfile.mkdtemp(prefix=prefix, suffix='_download')
self.paths_to_remove.append(tempdir)
filename, display_url = cleanup_url(url)
if self.verbosity >= 2:
self.stdout.write("Downloading %s\n" % display_url)
try:
the_path, info = urlretrieve(url, path.join(tempdir, filename))
except IOError as e:
raise CommandError("couldn't download URL %s to %s: %s" %
(url, filename, e))
used_name = the_path.split('/')[-1]
# Trying to get better name from response headers
content_disposition = info.get('content-disposition')
if content_disposition:
_, params = cgi.parse_header(content_disposition)
guessed_filename = params.get('filename') or used_name
else:
guessed_filename = used_name
# Falling back to content type guessing
ext = self.splitext(guessed_filename)[1]
content_type = info.get('content-type')
if not ext and content_type:
ext = mimetypes.guess_extension(content_type)
if ext:
guessed_filename += ext
# Move the temporary file to a filename that has better
# chances of being recognized by the archive utils
if used_name != guessed_filename:
guessed_path = path.join(tempdir, guessed_filename)
shutil.move(the_path, guessed_path)
return guessed_path
# Giving up
return the_path
def execute(self, *args, **options):
"""
Try to execute this command, performing system checks if needed (as
controlled by attributes ``self.requires_system_checks`` and
``self.requires_model_validation``, except if force-skipped).
"""
if options.get('no_color'):
self.style = no_style()
self.stderr.style_func = None
if options.get('stdout'):
self.stdout = OutputWrapper(options['stdout'])
if options.get('stderr'):
self.stderr = OutputWrapper(options.get('stderr'), self.stderr.style_func)
saved_locale = None
if not self.leave_locale_alone:
# Only mess with locales if we can assume we have a working
# settings file, because django.utils.translation requires settings
# (The final saying about whether the i18n machinery is active will be
# found in the value of the USE_I18N setting)
if not self.can_import_settings:
raise CommandError("Incompatible values of 'leave_locale_alone' "
"(%s) and 'can_import_settings' (%s) command "
"options." % (self.leave_locale_alone,
self.can_import_settings))
# Deactivate translations, because django-admin creates database
# content like permissions, and those shouldn't contain any
# translations.
from django.utils import translation
saved_locale = translation.get_language()
translation.deactivate_all()
try:
if (self.requires_system_checks and
not options.get('skip_validation') and # Remove at the end of deprecation for `skip_validation`.
not options.get('skip_checks')):
self.check()
output = self.handle(*args, **options)
if output:
if self.output_transaction:
# This needs to be imported here, because it relies on
# settings.
from django.db import connections, DEFAULT_DB_ALIAS
connection = connections[options.get('database', DEFAULT_DB_ALIAS)]
if connection.ops.start_transaction_sql():
self.stdout.write(self.style.SQL_KEYWORD(connection.ops.start_transaction_sql()))
self.stdout.write(output)
if self.output_transaction:
self.stdout.write('\n' + self.style.SQL_KEYWORD(connection.ops.end_transaction_sql()))
finally:
if saved_locale is not None:
translation.activate(saved_locale)
def download(self, url):
"""
Downloads the given URL and returns the file name.
"""
def cleanup_url(url):
tmp = url.rstrip('/')
filename = tmp.split('/')[-1]
if url.endswith('/'):
display_url = tmp + '/'
else:
display_url = url
return filename, display_url
prefix = 'django_%s_template_' % self.app_or_project
tempdir = tempfile.mkdtemp(prefix=prefix, suffix='_download')
self.paths_to_remove.append(tempdir)
filename, display_url = cleanup_url(url)
if self.verbosity >= 2:
self.stdout.write("Downloading %s\n" % display_url)
try:
the_path, info = urlretrieve(url, path.join(tempdir, filename))
except IOError as e:
raise CommandError("couldn't download URL %s to %s: %s" %
(url, filename, e))
used_name = the_path.split('/')[-1]
# Trying to get better name from response headers
content_disposition = info.get('content-disposition')
if content_disposition:
_, params = cgi.parse_header(content_disposition)
guessed_filename = params.get('filename') or used_name
else:
guessed_filename = used_name
# Falling back to content type guessing
ext = self.splitext(guessed_filename)[1]
content_type = info.get('content-type')
if not ext and content_type:
ext = mimetypes.guess_extension(content_type)
if ext:
guessed_filename += ext
# Move the temporary file to a filename that has better
# chances of being recognized by the archive utils
if used_name != guessed_filename:
guessed_path = path.join(tempdir, guessed_filename)
shutil.move(the_path, guessed_path)
return guessed_path
# Giving up
return the_path
def execute(self, *args, **options):
"""
Try to execute this command, performing system checks if needed (as
controlled by the ``requires_system_checks`` attribute, except if
force-skipped).
"""
if options.get('no_color'):
self.style = no_style()
self.stderr.style_func = None
if options.get('stdout'):
self.stdout = OutputWrapper(options['stdout'])
if options.get('stderr'):
self.stderr = OutputWrapper(options.get('stderr'), self.stderr.style_func)
saved_locale = None
if not self.leave_locale_alone:
# Only mess with locales if we can assume we have a working
# settings file, because django.utils.translation requires settings
# (The final saying about whether the i18n machinery is active will be
# found in the value of the USE_I18N setting)
if not self.can_import_settings:
raise CommandError("Incompatible values of 'leave_locale_alone' "
"(%s) and 'can_import_settings' (%s) command "
"options." % (self.leave_locale_alone,
self.can_import_settings))
# Deactivate translations, because django-admin creates database
# content like permissions, and those shouldn't contain any
# translations.
from django.utils import translation
saved_locale = translation.get_language()
translation.deactivate_all()
try:
if (self.requires_system_checks and
not options.get('skip_validation') and # Remove at the end of deprecation for `skip_validation`.
not options.get('skip_checks')):
self.check()
output = self.handle(*args, **options)
if output:
if self.output_transaction:
# This needs to be imported here, because it relies on
# settings.
from django.db import connections, DEFAULT_DB_ALIAS
connection = connections[options.get('database', DEFAULT_DB_ALIAS)]
if connection.ops.start_transaction_sql():
self.stdout.write(self.style.SQL_KEYWORD(connection.ops.start_transaction_sql()))
self.stdout.write(output)
if self.output_transaction:
self.stdout.write('\n' + self.style.SQL_KEYWORD(connection.ops.end_transaction_sql()))
finally:
if saved_locale is not None:
translation.activate(saved_locale)
def download(self, url):
"""
Downloads the given URL and returns the file name.
"""
def cleanup_url(url):
tmp = url.rstrip('/')
filename = tmp.split('/')[-1]
if url.endswith('/'):
display_url = tmp + '/'
else:
display_url = url
return filename, display_url
prefix = 'django_%s_template_' % self.app_or_project
tempdir = tempfile.mkdtemp(prefix=prefix, suffix='_download')
self.paths_to_remove.append(tempdir)
filename, display_url = cleanup_url(url)
if self.verbosity >= 2:
self.stdout.write("Downloading %s\n" % display_url)
try:
the_path, info = urlretrieve(url, path.join(tempdir, filename))
except IOError as e:
raise CommandError("couldn't download URL %s to %s: %s" %
(url, filename, e))
used_name = the_path.split('/')[-1]
# Trying to get better name from response headers
content_disposition = info.get('content-disposition')
if content_disposition:
_, params = cgi.parse_header(content_disposition)
guessed_filename = params.get('filename') or used_name
else:
guessed_filename = used_name
# Falling back to content type guessing
ext = self.splitext(guessed_filename)[1]
content_type = info.get('content-type')
if not ext and content_type:
ext = mimetypes.guess_extension(content_type)
if ext:
guessed_filename += ext
# Move the temporary file to a filename that has better
# chances of being recognized by the archive utils
if used_name != guessed_filename:
guessed_path = path.join(tempdir, guessed_filename)
shutil.move(the_path, guessed_path)
return guessed_path
# Giving up
return the_path
def execute(self, *args, **options):
"""
Try to execute this command, performing system checks if needed (as
controlled by the ``requires_system_checks`` attribute, except if
force-skipped).
"""
if options.get('no_color'):
self.style = no_style()
self.stderr.style_func = None
if options.get('stdout'):
self.stdout = OutputWrapper(options['stdout'])
if options.get('stderr'):
self.stderr = OutputWrapper(options.get('stderr'), self.stderr.style_func)
saved_locale = None
if not self.leave_locale_alone:
# Only mess with locales if we can assume we have a working
# settings file, because django.utils.translation requires settings
# (The final saying about whether the i18n machinery is active will be
# found in the value of the USE_I18N setting)
if not self.can_import_settings:
raise CommandError("Incompatible values of 'leave_locale_alone' "
"(%s) and 'can_import_settings' (%s) command "
"options." % (self.leave_locale_alone,
self.can_import_settings))
# Deactivate translations, because django-admin creates database
# content like permissions, and those shouldn't contain any
# translations.
from django.utils import translation
saved_locale = translation.get_language()
translation.deactivate_all()
try:
if (self.requires_system_checks and
not options.get('skip_validation') and # Remove at the end of deprecation for `skip_validation`.
not options.get('skip_checks')):
self.check()
output = self.handle(*args, **options)
if output:
if self.output_transaction:
# This needs to be imported here, because it relies on
# settings.
from django.db import connections, DEFAULT_DB_ALIAS
connection = connections[options.get('database', DEFAULT_DB_ALIAS)]
if connection.ops.start_transaction_sql():
self.stdout.write(self.style.SQL_KEYWORD(connection.ops.start_transaction_sql()))
self.stdout.write(output)
if self.output_transaction:
self.stdout.write('\n' + self.style.SQL_KEYWORD(connection.ops.end_transaction_sql()))
finally:
if saved_locale is not None:
translation.activate(saved_locale)
def download(self, url):
"""
Downloads the given URL and returns the file name.
"""
def cleanup_url(url):
tmp = url.rstrip('/')
filename = tmp.split('/')[-1]
if url.endswith('/'):
display_url = tmp + '/'
else:
display_url = url
return filename, display_url
prefix = 'django_%s_template_' % self.app_or_project
tempdir = tempfile.mkdtemp(prefix=prefix, suffix='_download')
self.paths_to_remove.append(tempdir)
filename, display_url = cleanup_url(url)
if self.verbosity >= 2:
self.stdout.write("Downloading %s\n" % display_url)
try:
the_path, info = urlretrieve(url, path.join(tempdir, filename))
except IOError as e:
raise CommandError("couldn't download URL %s to %s: %s" %
(url, filename, e))
used_name = the_path.split('/')[-1]
# Trying to get better name from response headers
content_disposition = info.get('content-disposition')
if content_disposition:
_, params = cgi.parse_header(content_disposition)
guessed_filename = params.get('filename') or used_name
else:
guessed_filename = used_name
# Falling back to content type guessing
ext = self.splitext(guessed_filename)[1]
content_type = info.get('content-type')
if not ext and content_type:
ext = mimetypes.guess_extension(content_type)
if ext:
guessed_filename += ext
# Move the temporary file to a filename that has better
# chances of being recognized by the archive utils
if used_name != guessed_filename:
guessed_path = path.join(tempdir, guessed_filename)
shutil.move(the_path, guessed_path)
return guessed_path
# Giving up
return the_path
def execute(self, *args, **options):
"""
Try to execute this command, performing system checks if needed (as
controlled by attributes ``self.requires_system_checks`` and
``self.requires_model_validation``, except if force-skipped).
"""
if options.get('no_color'):
self.style = no_style()
self.stderr.style_func = None
if options.get('stdout'):
self.stdout = OutputWrapper(options['stdout'])
if options.get('stderr'):
self.stderr = OutputWrapper(options.get('stderr'), self.stderr.style_func)
saved_locale = None
if not self.leave_locale_alone:
# Only mess with locales if we can assume we have a working
# settings file, because django.utils.translation requires settings
# (The final saying about whether the i18n machinery is active will be
# found in the value of the USE_I18N setting)
if not self.can_import_settings:
raise CommandError("Incompatible values of 'leave_locale_alone' "
"(%s) and 'can_import_settings' (%s) command "
"options." % (self.leave_locale_alone,
self.can_import_settings))
# Deactivate translations, because django-admin creates database
# content like permissions, and those shouldn't contain any
# translations.
from django.utils import translation
saved_locale = translation.get_language()
translation.deactivate_all()
try:
if (self.requires_system_checks and
not options.get('skip_validation') and # Remove at the end of deprecation for `skip_validation`.
not options.get('skip_checks')):
self.check()
output = self.handle(*args, **options)
if output:
if self.output_transaction:
# This needs to be imported here, because it relies on
# settings.
from django.db import connections, DEFAULT_DB_ALIAS
connection = connections[options.get('database', DEFAULT_DB_ALIAS)]
if connection.ops.start_transaction_sql():
self.stdout.write(self.style.SQL_KEYWORD(connection.ops.start_transaction_sql()))
self.stdout.write(output)
if self.output_transaction:
self.stdout.write('\n' + self.style.SQL_KEYWORD(connection.ops.end_transaction_sql()))
finally:
if saved_locale is not None:
translation.activate(saved_locale)
def download(self, url):
"""
Downloads the given URL and returns the file name.
"""
def cleanup_url(url):
tmp = url.rstrip('/')
filename = tmp.split('/')[-1]
if url.endswith('/'):
display_url = tmp + '/'
else:
display_url = url
return filename, display_url
prefix = 'django_%s_template_' % self.app_or_project
tempdir = tempfile.mkdtemp(prefix=prefix, suffix='_download')
self.paths_to_remove.append(tempdir)
filename, display_url = cleanup_url(url)
if self.verbosity >= 2:
self.stdout.write("Downloading %s\n" % display_url)
try:
the_path, info = urlretrieve(url, path.join(tempdir, filename))
except IOError as e:
raise CommandError("couldn't download URL %s to %s: %s" %
(url, filename, e))
used_name = the_path.split('/')[-1]
# Trying to get better name from response headers
content_disposition = info.get('content-disposition')
if content_disposition:
_, params = cgi.parse_header(content_disposition)
guessed_filename = params.get('filename') or used_name
else:
guessed_filename = used_name
# Falling back to content type guessing
ext = self.splitext(guessed_filename)[1]
content_type = info.get('content-type')
if not ext and content_type:
ext = mimetypes.guess_extension(content_type)
if ext:
guessed_filename += ext
# Move the temporary file to a filename that has better
# chances of being recognized by the archive utils
if used_name != guessed_filename:
guessed_path = path.join(tempdir, guessed_filename)
shutil.move(the_path, guessed_path)
return guessed_path
# Giving up
return the_path
def execute(self, *args, **options):
"""
Try to execute this command, performing system checks if needed (as
controlled by attributes ``self.requires_system_checks`` and
``self.requires_model_validation``, except if force-skipped).
"""
if options.get('no_color'):
self.style = no_style()
self.stderr.style_func = None
if options.get('stdout'):
self.stdout = OutputWrapper(options['stdout'])
if options.get('stderr'):
self.stderr = OutputWrapper(options.get('stderr'), self.stderr.style_func)
saved_locale = None
if not self.leave_locale_alone:
# Only mess with locales if we can assume we have a working
# settings file, because django.utils.translation requires settings
# (The final saying about whether the i18n machinery is active will be
# found in the value of the USE_I18N setting)
if not self.can_import_settings:
raise CommandError("Incompatible values of 'leave_locale_alone' "
"(%s) and 'can_import_settings' (%s) command "
"options." % (self.leave_locale_alone,
self.can_import_settings))
# Deactivate translations, because django-admin creates database
# content like permissions, and those shouldn't contain any
# translations.
from django.utils import translation
saved_locale = translation.get_language()
translation.deactivate_all()
try:
if (self.requires_system_checks and
not options.get('skip_validation') and # Remove at the end of deprecation for `skip_validation`.
not options.get('skip_checks')):
self.check()
output = self.handle(*args, **options)
if output:
if self.output_transaction:
# This needs to be imported here, because it relies on
# settings.
from django.db import connections, DEFAULT_DB_ALIAS
connection = connections[options.get('database', DEFAULT_DB_ALIAS)]
if connection.ops.start_transaction_sql():
self.stdout.write(self.style.SQL_KEYWORD(connection.ops.start_transaction_sql()))
self.stdout.write(output)
if self.output_transaction:
self.stdout.write('\n' + self.style.SQL_KEYWORD(connection.ops.end_transaction_sql()))
finally:
if saved_locale is not None:
translation.activate(saved_locale)
def download(self, url):
"""
Downloads the given URL and returns the file name.
"""
def cleanup_url(url):
tmp = url.rstrip('/')
filename = tmp.split('/')[-1]
if url.endswith('/'):
display_url = tmp + '/'
else:
display_url = url
return filename, display_url
prefix = 'django_%s_template_' % self.app_or_project
tempdir = tempfile.mkdtemp(prefix=prefix, suffix='_download')
self.paths_to_remove.append(tempdir)
filename, display_url = cleanup_url(url)
if self.verbosity >= 2:
self.stdout.write("Downloading %s\n" % display_url)
try:
the_path, info = urlretrieve(url, path.join(tempdir, filename))
except IOError as e:
raise CommandError("couldn't download URL %s to %s: %s" %
(url, filename, e))
used_name = the_path.split('/')[-1]
# Trying to get better name from response headers
content_disposition = info.get('content-disposition')
if content_disposition:
_, params = cgi.parse_header(content_disposition)
guessed_filename = params.get('filename') or used_name
else:
guessed_filename = used_name
# Falling back to content type guessing
ext = self.splitext(guessed_filename)[1]
content_type = info.get('content-type')
if not ext and content_type:
ext = mimetypes.guess_extension(content_type)
if ext:
guessed_filename += ext
# Move the temporary file to a filename that has better
# chances of being recognized by the archive utils
if used_name != guessed_filename:
guessed_path = path.join(tempdir, guessed_filename)
shutil.move(the_path, guessed_path)
return guessed_path
# Giving up
return the_path
def execute(self, *args, **options):
"""
Try to execute this command, performing system checks if needed (as
controlled by the ``requires_system_checks`` attribute, except if
force-skipped).
"""
if options['no_color']:
self.style = no_style()
self.stderr.style_func = None
if options.get('stdout'):
self.stdout = OutputWrapper(options['stdout'])
if options.get('stderr'):
self.stderr = OutputWrapper(options['stderr'], self.stderr.style_func)
saved_locale = None
if not self.leave_locale_alone:
# Only mess with locales if we can assume we have a working
# settings file, because django.utils.translation requires settings
# (The final saying about whether the i18n machinery is active will be
# found in the value of the USE_I18N setting)
if not self.can_import_settings:
raise CommandError("Incompatible values of 'leave_locale_alone' "
"(%s) and 'can_import_settings' (%s) command "
"options." % (self.leave_locale_alone,
self.can_import_settings))
# Deactivate translations, because django-admin creates database
# content like permissions, and those shouldn't contain any
# translations.
from django.utils import translation
saved_locale = translation.get_language()
translation.deactivate_all()
try:
if self.requires_system_checks and not options.get('skip_checks'):
self.check()
if self.requires_migrations_checks:
self.check_migrations()
output = self.handle(*args, **options)
if output:
if self.output_transaction:
connection = connections[options.get('database', DEFAULT_DB_ALIAS)]
output = '%s\n%s\n%s' % (
self.style.SQL_KEYWORD(connection.ops.start_transaction_sql()),
output,
self.style.SQL_KEYWORD(connection.ops.end_transaction_sql()),
)
self.stdout.write(output)
finally:
if saved_locale is not None:
translation.activate(saved_locale)
return output
def download(self, url):
"""
Downloads the given URL and returns the file name.
"""
def cleanup_url(url):
tmp = url.rstrip('/')
filename = tmp.split('/')[-1]
if url.endswith('/'):
display_url = tmp + '/'
else:
display_url = url
return filename, display_url
prefix = 'django_%s_template_' % self.app_or_project
tempdir = tempfile.mkdtemp(prefix=prefix, suffix='_download')
self.paths_to_remove.append(tempdir)
filename, display_url = cleanup_url(url)
if self.verbosity >= 2:
self.stdout.write("Downloading %s\n" % display_url)
try:
the_path, info = urlretrieve(url, path.join(tempdir, filename))
except IOError as e:
raise CommandError("couldn't download URL %s to %s: %s" %
(url, filename, e))
used_name = the_path.split('/')[-1]
# Trying to get better name from response headers
content_disposition = info.get('content-disposition')
if content_disposition:
_, params = cgi.parse_header(content_disposition)
guessed_filename = params.get('filename') or used_name
else:
guessed_filename = used_name
# Falling back to content type guessing
ext = self.splitext(guessed_filename)[1]
content_type = info.get('content-type')
if not ext and content_type:
ext = mimetypes.guess_extension(content_type)
if ext:
guessed_filename += ext
# Move the temporary file to a filename that has better
# chances of being recognized by the archive utils
if used_name != guessed_filename:
guessed_path = path.join(tempdir, guessed_filename)
shutil.move(the_path, guessed_path)
return guessed_path
# Giving up
return the_path
def execute(self, *args, **options):
"""
Try to execute this command, performing system checks if needed (as
controlled by the ``requires_system_checks`` attribute, except if
force-skipped).
"""
if options['no_color']:
self.style = no_style()
self.stderr.style_func = None
if options.get('stdout'):
self.stdout = OutputWrapper(options['stdout'])
if options.get('stderr'):
self.stderr = OutputWrapper(options['stderr'], self.stderr.style_func)
saved_locale = None
if not self.leave_locale_alone:
# Deactivate translations, because django-admin creates database
# content like permissions, and those shouldn't contain any
# translations.
from django.utils import translation
saved_locale = translation.get_language()
translation.deactivate_all()
try:
if self.requires_system_checks and not options.get('skip_checks'):
self.check()
if self.requires_migrations_checks:
self.check_migrations()
output = self.handle(*args, **options)
if output:
if self.output_transaction:
connection = connections[options.get('database', DEFAULT_DB_ALIAS)]
output = '%s\n%s\n%s' % (
self.style.SQL_KEYWORD(connection.ops.start_transaction_sql()),
output,
self.style.SQL_KEYWORD(connection.ops.end_transaction_sql()),
)
self.stdout.write(output)
finally:
if saved_locale is not None:
translation.activate(saved_locale)
return output
def download(self, url):
"""
Downloads the given URL and returns the file name.
"""
def cleanup_url(url):
tmp = url.rstrip('/')
filename = tmp.split('/')[-1]
if url.endswith('/'):
display_url = tmp + '/'
else:
display_url = url
return filename, display_url
prefix = 'django_%s_template_' % self.app_or_project
tempdir = tempfile.mkdtemp(prefix=prefix, suffix='_download')
self.paths_to_remove.append(tempdir)
filename, display_url = cleanup_url(url)
if self.verbosity >= 2:
self.stdout.write("Downloading %s\n" % display_url)
try:
the_path, info = urlretrieve(url, path.join(tempdir, filename))
except IOError as e:
raise CommandError("couldn't download URL %s to %s: %s" %
(url, filename, e))
used_name = the_path.split('/')[-1]
# Trying to get better name from response headers
content_disposition = info.get('content-disposition')
if content_disposition:
_, params = cgi.parse_header(content_disposition)
guessed_filename = params.get('filename') or used_name
else:
guessed_filename = used_name
# Falling back to content type guessing
ext = self.splitext(guessed_filename)[1]
content_type = info.get('content-type')
if not ext and content_type:
ext = mimetypes.guess_extension(content_type)
if ext:
guessed_filename += ext
# Move the temporary file to a filename that has better
# chances of being recognized by the archive utils
if used_name != guessed_filename:
guessed_path = path.join(tempdir, guessed_filename)
shutil.move(the_path, guessed_path)
return guessed_path
# Giving up
return the_path
def execute(self, *args, **options):
"""
Try to execute this command, performing system checks if needed (as
controlled by the ``requires_system_checks`` attribute, except if
force-skipped).
"""
if options['no_color']:
self.style = no_style()
self.stderr.style_func = None
if options.get('stdout'):
self.stdout = OutputWrapper(options['stdout'])
if options.get('stderr'):
self.stderr = OutputWrapper(options['stderr'], self.stderr.style_func)
saved_locale = None
if not self.leave_locale_alone:
# Only mess with locales if we can assume we have a working
# settings file, because django.utils.translation requires settings
# (The final saying about whether the i18n machinery is active will be
# found in the value of the USE_I18N setting)
if not self.can_import_settings:
raise CommandError("Incompatible values of 'leave_locale_alone' "
"(%s) and 'can_import_settings' (%s) command "
"options." % (self.leave_locale_alone,
self.can_import_settings))
# Deactivate translations, because django-admin creates database
# content like permissions, and those shouldn't contain any
# translations.
from django.utils import translation
saved_locale = translation.get_language()
translation.deactivate_all()
try:
if self.requires_system_checks and not options.get('skip_checks'):
self.check()
if self.requires_migrations_checks:
self.check_migrations()
output = self.handle(*args, **options)
if output:
if self.output_transaction:
connection = connections[options.get('database', DEFAULT_DB_ALIAS)]
output = '%s\n%s\n%s' % (
self.style.SQL_KEYWORD(connection.ops.start_transaction_sql()),
output,
self.style.SQL_KEYWORD(connection.ops.end_transaction_sql()),
)
self.stdout.write(output)
finally:
if saved_locale is not None:
translation.activate(saved_locale)
return output
def download(self, url):
"""
Downloads the given URL and returns the file name.
"""
def cleanup_url(url):
tmp = url.rstrip('/')
filename = tmp.split('/')[-1]
if url.endswith('/'):
display_url = tmp + '/'
else:
display_url = url
return filename, display_url
prefix = 'django_%s_template_' % self.app_or_project
tempdir = tempfile.mkdtemp(prefix=prefix, suffix='_download')
self.paths_to_remove.append(tempdir)
filename, display_url = cleanup_url(url)
if self.verbosity >= 2:
self.stdout.write("Downloading %s\n" % display_url)
try:
the_path, info = urlretrieve(url, path.join(tempdir, filename))
except IOError as e:
raise CommandError("couldn't download URL %s to %s: %s" %
(url, filename, e))
used_name = the_path.split('/')[-1]
# Trying to get better name from response headers
content_disposition = info.get('content-disposition')
if content_disposition:
_, params = cgi.parse_header(content_disposition)
guessed_filename = params.get('filename') or used_name
else:
guessed_filename = used_name
# Falling back to content type guessing
ext = self.splitext(guessed_filename)[1]
content_type = info.get('content-type')
if not ext and content_type:
ext = mimetypes.guess_extension(content_type)
if ext:
guessed_filename += ext
# Move the temporary file to a filename that has better
# chances of being recognized by the archive utils
if used_name != guessed_filename:
guessed_path = path.join(tempdir, guessed_filename)
shutil.move(the_path, guessed_path)
return guessed_path
# Giving up
return the_path