def extract(self, filename):
    """
    Unpack the archive ``filename`` into a fresh temporary directory.

    The directory is registered in ``self.paths_to_remove`` before the
    extraction is attempted, so it is tracked even when extraction fails.
    Returns the path of the directory holding the extracted content and
    raises CommandError if the archive cannot be extracted.
    """
    target_dir = tempfile.mkdtemp(
        prefix='django_%s_template_' % self.app_or_project,
        suffix='_extract',
    )
    self.paths_to_remove.append(target_dir)
    if self.verbosity >= 2:
        self.stdout.write("Extracting %s\n" % filename)
    try:
        archive.extract(filename, target_dir)
    except (archive.ArchiveException, IOError) as exc:
        raise CommandError("couldn't extract file %s to %s: %s" %
                           (filename, target_dir, exc))
    else:
        return target_dir
python类CommandError()的实例源码
def build_potfiles(self):
    """
    Regenerate the .pot files and de-duplicate each one with msguniq.

    Source files are discovered from the current directory, stale .pot
    files are removed and the sources are processed. Each locale path
    that then holds a .pot file for ``self.domain`` has it run through
    msguniq and rewritten as UTF-8 with normalized line endings.

    Returns the list of .pot paths that were rewritten. Raises
    CommandError when msguniq reports errors with a non-OK status.
    """
    sources = self.find_files(".")
    self.remove_potfiles()
    self.process_files(sources)

    rewritten = []
    for locale_dir in self.locale_paths:
        pot_path = os.path.join(locale_dir, '%s.pot' % str(self.domain))
        if os.path.exists(pot_path):
            command = ['msguniq'] + self.msguniq_options + [pot_path]
            output, errors, status = popen_wrapper(command)
            if errors and status != STATUS_OK:
                raise CommandError(
                    "errors happened while running msguniq\n%s" % errors)
            if errors and self.verbosity > 0:
                # Non-fatal diagnostics are only echoed to the user.
                self.stdout.write(errors)
            output = normalize_eols(output)
            with io.open(pot_path, 'w', encoding='utf-8') as pot_file:
                pot_file.write(output)
            rewritten.append(pot_path)
    return rewritten
def handle(self, **options):
    """
    Create a new app after checking that its name is usable.

    Pops ``name`` and ``directory`` from ``options``, validates the name
    and raises CommandError if a module of that name can already be
    imported. The remaining options are forwarded to the parent command.
    """
    app_name = options.pop('name')
    target = options.pop('directory')
    self.validate_name(app_name, "app")

    # Check that the app_name cannot be imported.
    try:
        import_module(app_name)
    except ImportError:
        name_is_free = True
    else:
        name_is_free = False

    if not name_is_free:
        raise CommandError(
            "%r conflicts with the name of an existing Python module and "
            "cannot be used as an app name. Please try another name." % app_name
        )

    super(Command, self).handle('app', app_name, target, **options)
def handle(self, addrport=None, *args, **options):
    """
    Run the application through DjangoApplicationCommand (deprecated).

    Emits the deprecation notice first. Extra positional arguments are
    rejected with CommandError. When ``addrport`` is given it becomes the
    ``bind`` option; ``admin_media_path`` is popped from the options and
    passed separately.
    """
    # deprecation warning to announce future deletion in R21
    deprecation_notice = """This command is deprecated.
You should now run your application with the WSGI interface
installed with your project. Ex.:
gunicorn myproject.wsgi:application
See https://docs.djangoproject.com/en/1.5/howto/deployment/wsgi/gunicorn/
for more info."""
    util.warn(deprecation_notice)

    if args:
        raise CommandError('Usage is run_gunicorn %s' % self.args)

    if addrport:
        # Drops the last command-line argument (presumably the addrport
        # itself -- confirm) before the application command runs.
        sys.argv = sys.argv[:-1]
        options['bind'] = addrport

    media_path = options.pop('admin_media_path', '')
    command = DjangoApplicationCommand(options, media_path)
    command.run()
def get_resetable_apps(app_labels=()):
    """
    Map app names to paths for the apps located under settings.BASE_DIR.

    With no ``app_labels`` every such app is returned. Otherwise only the
    requested labels are returned, and CommandError is raised for the
    first requested label that is not one of those apps.
    """
    local_apps = {}
    for app in apps.get_apps():
        path = apps._get_app_path(app)
        if not path.startswith(settings.BASE_DIR):
            continue
        # Key by the dotted name with its last component stripped.
        local_apps[app.__name__.rsplit('.', 1)[0]] = path

    if not app_labels:
        return local_apps

    selected = {}
    for label in app_labels:
        if label not in local_apps:
            raise CommandError('application %s not found' % label)
        selected[label] = local_apps[label]
    return selected
def start_karma(self):
    """
    Spawn the Karma test watcher and block until it exits.

    The process is stored on ``self.karma_process`` and shares this
    process's stdout and stderr. Raises CommandError if the process has
    already terminated right after being spawned. Logs an error when it
    exits with a non-zero code while ``self.karma_cleanup_closing`` is
    false.
    """
    logger.info(
        'Starting karma test watcher process from Django runserver command'
    )

    self.karma_process = subprocess.Popen(
        'yarn run test-karma:watch',
        shell=True,
        stdin=subprocess.PIPE,
        stdout=sys.stdout,
        stderr=sys.stderr,
    )

    # A return code right after spawning means the watcher never got going.
    already_exited = self.karma_process.poll() is not None
    if already_exited:
        raise CommandError(
            'Karma process failed to start from Django runserver command')

    spawn_message = (
        'Django Runserver command has spawned a Karma test watcher process on pid {0}'.
        format(self.karma_process.pid))
    logger.info(spawn_message)

    self.karma_process.wait()

    failed = self.karma_process.returncode != 0
    if failed and not self.karma_cleanup_closing:
        logger.error("Karma process exited unexpectedly.")
def handle(self, root_url='/', stdout=None, stderr=None, **options):
    """
    Build this command's operation and run it.

    The ``stdout`` and ``stderr`` arguments are accepted but not used;
    the operation receives ``self.stdout`` and ``self.stderr`` instead,
    with CommandError as its error class. Returns whatever the
    operation's ``run()`` returns.
    """
    return self.operation_class(
        root_url=root_url,
        error_class=CommandError,
        stdout=self.stdout,
        stderr=self.stderr,
        **options
    ).run()
def handle(self, *args, **kwargs):
    """
    Report group, member and permission differences between programs.

    Raises CommandError unless ``find_programs()`` yields at least two
    programs. An empty string is written to stdout between the reports.
    """
    def group_name(item):
        return item['attributes']['name']

    def member_username(item):
        return item['relationships']['user']['data']['attributes']['username']

    programs = list(self.find_programs())
    if len(programs) <= 1:
        raise CommandError("Only works with multiple programs")

    self.report_on_differences(programs, 'groups', group_name)
    self.stdout.write('')
    self.report_on_differences(programs, 'members', member_username)
    self.stdout.write('')
    self.report_on_perm_differences(programs)
def handle(self, *args, **options):
    """
    Print the API token of the user named by ``options['username']``.

    A token is created if the user does not have one yet. Raises
    CommandError when no user with that username exists.
    """
    username = options['username']
    try:
        user = User.objects.get(username=username)
    except User.DoesNotExist:
        # Include the username so the operator can see which lookup failed.
        raise CommandError('User named {} does not exist'.format(username))
    token, _created = Token.objects.get_or_create(user=user)
    self.stdout.write('Token: {}'.format(token.key))
def handle(self, *args, **options):
    """
    Create a superuser named ``options['user']``.

    The password is taken from the ADMINUSER_PASS environment variable.
    When that variable is unset or empty, a random password is generated
    and printed after the account is created. Raises CommandError if the
    username is already taken.
    """
    username = options['user']
    if User.objects.filter(username=username).exists():
        raise CommandError('User named {} already exists'.format(username))

    # Resolve the password exactly once. An empty ADMINUSER_PASS counts as
    # unset, so the account can never end up with an empty password while
    # being reported as randomly generated.
    password = os.environ.get('ADMINUSER_PASS')
    generated = not password
    if generated:
        password = User.objects.make_random_password()

    admin = User(
        username=username,
        email=options['email'],
        is_staff=True,
        is_superuser=True,
    )
    admin.set_password(password)
    admin.save()

    self.stdout.write(self.style.SUCCESS('Created superuser named {user}'.format(**options)))
    if generated:
        # Only a generated password is echoed; one supplied through the
        # environment is already known to the caller.
        self.stdout.write('Password: ' + password)
books_create_test_data.py 文件源码
项目:django-elasticsearch-dsl-drf
作者: barseghyanartur
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def handle(self, *args, **options):
    """
    Create test data: optionally books, then one single book and addresses.

    ``options['number']`` sets the batch size and falls back to
    DEFAULT_NUMBER_OF_ITEMS_TO_CREATE when missing or falsy. The book
    batch is only created when ``options['with_books']`` is truthy. Any
    exception raised while creating objects is re-raised as CommandError
    carrying the original message.
    """
    number = options.get('number') or DEFAULT_NUMBER_OF_ITEMS_TO_CREATE

    if options.get('with_books'):
        try:
            factories.BookFactory.create_batch(number)
            print("{} book objects created.".format(number))
        except Exception as err:
            raise CommandError(str(err))

    try:
        factories.SingleBookFactory()
        print("A single book object is created.")
    except Exception as err:
        raise CommandError(str(err))

    try:
        factories.AddressFactory.create_batch(number)
        print("{} address objects created.".format(number))
    except Exception as err:
        raise CommandError(str(err))
def handle(self, file_name=None, **options):
    """
    Write a file rendered from the 'cml/cml-pipelines.txt' template.

    ``file_name`` falls back to DEFAULT_FILE_NAME when missing or empty.
    The template receives the current working directory's base name as
    ``project`` and the destination's base name up to its first dot as
    ``file``. Raises CommandError if the destination already exists.
    """
    project_name = os.path.basename(os.getcwd())
    dst = file_name or DEFAULT_FILE_NAME
    if os.path.exists(dst):
        raise CommandError('Error: file "%s" already exists' % dst)

    # Render before opening the destination: a template failure must not
    # leave an empty file behind, which would make the next run fail the
    # existence check above.
    content = render_to_string('cml/cml-pipelines.txt', {
        'project': project_name,
        'file': os.path.basename(dst).split('.')[0]
    })
    with open(dst, 'w') as output:
        output.write(content)

    self.stdout.write('"%s" written.' % dst)
def __call__(self, parser, namespace, values, option_string=None):
    """
    Parse ``values`` as a date and store it on ``namespace``.

    The result is stored under ``self.dest``. A parsed value without
    tzinfo is passed through ``timezone.make_aware`` first. Raises
    CommandError when parsing fails with ValueError.
    """
    try:
        parsed = dateutil.parser.parse(values)
    except ValueError:
        raise CommandError("can't parse date: {}".format(values))
    aware = timezone.make_aware(parsed) if parsed.tzinfo is None else parsed
    setattr(namespace, self.dest, aware)
def handle(self, *args, **options):
    """
    Create a Tileset record for ``options['filename']``.

    ``uid`` defaults to a freshly generated slugid and ``name`` to the
    file's base name. With ``no_upload`` the file must already exist
    under settings.MEDIA_ROOT and is referenced by the given path.
    Otherwise the file is opened and handed over as a Django ``File``
    named by its base name.

    Raises CommandError if ``no_upload`` is set and the file is missing
    under the media root.
    """
    filename = options['filename']
    tileset_fields = dict(
        filetype=options['filetype'],
        datatype=options['datatype'],
        coordSystem=options['coordSystem'],
        coordSystem2=options['coordSystem2'],
        owner=None,
        uuid=options.get('uid') or slugid.nice().decode('utf-8'),
        name=options.get('name') or op.split(filename)[1],
    )

    if options['no_upload']:
        if not op.isfile(op.join(settings.MEDIA_ROOT, filename)):
            raise CommandError('File does not exist under media root')
        tm.Tileset.objects.create(datafile=filename, **tileset_fields)
        return

    # Keep the source file open only while the tileset is being created,
    # so the handle is released even when creation fails.
    with open(filename, 'rb') as source:
        django_file = File(source)
        # remove the filepath of the filename
        django_file.name = op.split(django_file.name)[1]
        tm.Tileset.objects.create(datafile=django_file, **tileset_fields)
def validate_name(self, name, app_or_project):
    """
    Ensure ``name`` is usable as an app or project name.

    A valid name starts with a letter or underscore and continues with
    word characters only. Returns None for a valid name. Raises
    CommandError when the name is None or invalid, with a hint that
    depends on which part of the name is wrong.
    """
    if name is None:
        article = "an" if app_or_project == "app" else "a"
        raise CommandError("you must provide %s %s name" % (
            article, app_or_project))

    # A valid directory name needs no further checks.
    if re.search(r'^[_a-zA-Z]\w*$', name):
        return

    # Provide a smart error message, depending on the error.
    if re.search(r'^[_a-zA-Z]', name):
        message = 'use only numbers, letters and underscores'
    else:
        message = 'make sure the name begins with a letter or underscore'
    raise CommandError("%r is not a valid %s name. Please %s." %
                       (name, app_or_project, message))
def check_programs(*programs):
    """
    Verify that every named program can be found.

    Raises CommandError for the first program that ``find_command``
    cannot locate. Returns None when all are found or none are given.
    """
    for name in programs:
        if find_command(name) is not None:
            continue
        raise CommandError("Can't find %s. Make sure you have GNU "
                           "gettext tools 0.15 or newer installed." % name)
def gettext_version(self):
    """
    Return the xgettext version as a tuple of ints.

    The version is read from the output of ``xgettext --version``. The
    tuple has two or three elements, depending on whether a third
    numeric component is present. Raises CommandError when no version
    number can be found in the output.
    """
    # Gettext tools will output system-encoded bytestrings instead of UTF-8,
    # when looking up the version. It's especially a problem on Windows.
    output, _errors, _status = gettext_popen_wrapper(
        ['xgettext', '--version'],
        stdout_encoding=DEFAULT_LOCALE_ENCODING,
    )
    match = re.search(r'(\d+)\.(\d+)\.?(\d+)?', output)
    if not match:
        raise CommandError("Unable to get gettext version. Is it installed?")
    return tuple(int(part) for part in match.groups() if part is not None)
def write_po_file(self, potfile, locale):
    """
    Create or update the PO file of ``self.domain`` for ``locale``.

    The PO file lives in ``<dir of potfile>/<locale>/LC_MESSAGES``, which
    is created if needed. An existing PO file is merged with ``potfile``
    through msgmerge; otherwise the contents of ``potfile`` are used,
    with plural forms copied in unless ``self.invoked_for_django`` is
    set. When ``self.no_obsolete`` is set, msgattrib is run on the
    written file afterwards.

    Raises CommandError when msgmerge or msgattrib reports errors with a
    non-OK status.
    """
    locale_dir = os.path.join(os.path.dirname(potfile), locale, 'LC_MESSAGES')
    if not os.path.isdir(locale_dir):
        os.makedirs(locale_dir)
    po_path = os.path.join(locale_dir, '%s.po' % str(self.domain))

    if not os.path.exists(po_path):
        # No catalog yet: start from the template's contents.
        with io.open(potfile, 'r', encoding='utf-8') as template:
            content = template.read()
        if not self.invoked_for_django:
            content = self.copy_plural_forms(content, locale)
    else:
        merge_command = ['msgmerge'] + self.msgmerge_options + [po_path, potfile]
        content, errors, status = gettext_popen_wrapper(merge_command)
        if errors and status != STATUS_OK:
            raise CommandError(
                "errors happened while running msgmerge\n%s" % errors)
        if errors and self.verbosity > 0:
            self.stdout.write(errors)

    # Strip the header comment naming the .pot file before writing.
    content = content.replace(
        "#. #-#-#-#-# %s.pot (PACKAGE VERSION) #-#-#-#-#\n" % self.domain, "")
    with io.open(po_path, 'w', encoding='utf-8') as catalog:
        catalog.write(content)

    if not self.no_obsolete:
        return

    # msgattrib reads the catalog and writes its result back in place.
    attrib_command = ['msgattrib'] + self.msgattrib_options + ['-o', po_path, po_path]
    _output, errors, status = gettext_popen_wrapper(attrib_command)
    if errors and status != STATUS_OK:
        raise CommandError(
            "errors happened while running msgattrib\n%s" % errors)
    if errors and self.verbosity > 0:
        self.stdout.write(errors)
def show_list(self, connection, app_names=None):
    """
    Print the migrations of the given apps, or of every migrated app.

    Each app label is written first, followed by its migrations in plan
    order, marked ``[X]`` when applied and ``[ ]`` otherwise. Apps with
    nothing to show get a "(no migrations)" line. Raises CommandError
    if any requested app has no migrations.
    """
    # Load migrations from disk/DB
    loader = MigrationLoader(connection, ignore_no_migrations=True)
    graph = loader.graph

    if app_names:
        # If we were passed a list of apps, validate it
        unknown = [
            label for label in app_names
            if label not in loader.migrated_apps
        ]
        if unknown:
            raise CommandError("No migrations present for: %s" % (", ".join(unknown)))
    else:
        # Otherwise, show all apps in alphabetic order
        app_names = sorted(loader.migrated_apps)

    # For each app, print its migrations in order from oldest (roots) to
    # newest (leaves).
    for label in app_names:
        self.stdout.write(label, self.style.MIGRATE_LABEL)
        shown = set()
        for leaf in graph.leaf_nodes(label):
            for key in graph.forwards_plan(leaf):
                # Skip repeats and migrations that belong to other apps.
                if key in shown or key[0] != label:
                    continue
                # Give it a nice title if it's a squashed one
                title = key[1]
                if graph.nodes[key].replaces:
                    title += " (%s squashed migrations)" % len(graph.nodes[key].replaces)
                # Mark it as applied/unapplied
                template = " [X] %s" if key in loader.applied_migrations else " [ ] %s"
                self.stdout.write(template % title)
                shown.add(key)
        # If we didn't print anything, then a small message
        if not shown:
            self.stdout.write(" (no migrations)", self.style.MIGRATE_FAILURE)