def handle_template(self, template, subdir):
"""
Determines where the app or project templates are.
Use django.__path__[0] as the default because we don't
know into which directory Django has been installed.
"""
if template is None:
return path.join(django.__path__[0], 'conf', subdir)
else:
if template.startswith('file://'):
template = template[7:]
expanded_template = path.expanduser(template)
expanded_template = path.normpath(expanded_template)
if path.isdir(expanded_template):
return expanded_template
if self.is_url(template):
# downloads the file and returns the path
absolute_path = self.download(template)
else:
absolute_path = path.abspath(expanded_template)
if path.exists(absolute_path):
return self.extract(absolute_path)
raise CommandError("couldn't handle %s template %s." %
(self.app_or_project, template))
python类CommandError()的实例源码
def handle(self, *args, **options):
print('Populate Genes!')
populate_genes()
print('Populate Diseases from OMIM!')
populate_diseases()
#populate diseases from OMIM
print('Populate Diseases from CGD!')
populate_CGD()
#populate dieases from CGD
#populate dieases from CGD
# for poll_id in args:
# try:
# poll = Poll.objects.get(pk=int(poll_id))
# except Poll.DoesNotExist:
# raise CommandError('Poll "%s" does not exist' % poll_id)
# poll.opened = False
# poll.save()
# self.stdout.write('Successfully closed poll "%s"' % poll_id)
def handle(self, *args, **options):
report = options['report'] or 'monthly'
if (report in ['daily', 'monthly', 'quarterly', 'yearly'] and
options['subscription_start'] is None
):
raise CommandError("{} report requires --subscription-start"
.format(report))
report_method = getattr(self, '{}_report'.format(report))
headers, rows = report_method(options)
if options['recent']:
rows = rows[-options['recent']:]
# reports are chronologically ascending by default (mainly because of
# enumerations), but for display we prefer reversed by default.
if not options['ascending']:
rows = reversed(rows)
return headers, rows
def extract(self, filename):
"""
Extracts the given file to a temporarily and returns
the path of the directory with the extracted content.
"""
prefix = 'django_%s_template_' % self.app_or_project
tempdir = tempfile.mkdtemp(prefix=prefix, suffix='_extract')
self.paths_to_remove.append(tempdir)
if self.verbosity >= 2:
self.stdout.write("Extracting %s\n" % filename)
try:
archive.extract(filename, tempdir)
return tempdir
except (archive.ArchiveException, IOError) as e:
raise CommandError("couldn't extract file %s to %s: %s" %
(filename, tempdir, e))
def build_potfiles(self):
"""
Build pot files and apply msguniq to them.
"""
file_list = self.find_files(".")
self.remove_potfiles()
self.process_files(file_list)
potfiles = []
for path in self.locale_paths:
potfile = os.path.join(path, '%s.pot' % str(self.domain))
if not os.path.exists(potfile):
continue
args = ['msguniq'] + self.msguniq_options + [potfile]
msgs, errors, status = gettext_popen_wrapper(args)
if errors:
if status != STATUS_OK:
raise CommandError(
"errors happened while running msguniq\n%s" % errors)
elif self.verbosity > 0:
self.stdout.write(errors)
with io.open(potfile, 'w', encoding='utf-8') as fp:
fp.write(msgs)
potfiles.append(potfile)
return potfiles
def _check_permission_clashing(custom, builtin, ctype):
"""
Check that permissions for a model do not clash. Raises CommandError if
there are duplicate permissions.
"""
pool = set()
builtin_codenames = set(p[0] for p in builtin)
for codename, _name in custom:
if codename in pool:
raise CommandError(
"The permission codename '%s' is duplicated for model '%s.%s'." %
(codename, ctype.app_label, ctype.model_class().__name__))
elif codename in builtin_codenames:
raise CommandError(
"The permission codename '%s' clashes with a builtin permission "
"for model '%s.%s'." %
(codename, ctype.app_label, ctype.model_class().__name__))
pool.add(codename)
def handle_template(self, template, subdir):
"""
Determines where the app or project templates are.
Use django.__path__[0] as the default because we don't
know into which directory Django has been installed.
"""
if template is None:
return path.join(django.__path__[0], 'conf', subdir)
else:
if template.startswith('file://'):
template = template[7:]
expanded_template = path.expanduser(template)
expanded_template = path.normpath(expanded_template)
if path.isdir(expanded_template):
return expanded_template
if self.is_url(template):
# downloads the file and returns the path
absolute_path = self.download(template)
else:
absolute_path = path.abspath(expanded_template)
if path.exists(absolute_path):
return self.extract(absolute_path)
raise CommandError("couldn't handle %s template %s." %
(self.app_or_project, template))
def validate_name(self, name, app_or_project):
if name is None:
raise CommandError("you must provide %s %s name" % (
"an" if app_or_project == "app" else "a", app_or_project))
# If it's not a valid directory name.
if six.PY2:
if not re.search(r'^[_a-zA-Z]\w*$', name):
# Provide a smart error message, depending on the error.
if not re.search(r'^[_a-zA-Z]', name):
message = 'make sure the name begins with a letter or underscore'
else:
message = 'use only numbers, letters and underscores'
raise CommandError("%r is not a valid %s name. Please %s." %
(name, app_or_project, message))
else:
if not name.isidentifier():
raise CommandError(
"%r is not a valid %s name. Please make sure the name is "
"a valid identifier." % (name, app_or_project)
)
def extract(self, filename):
"""
Extracts the given file to a temporarily and returns
the path of the directory with the extracted content.
"""
prefix = 'django_%s_template_' % self.app_or_project
tempdir = tempfile.mkdtemp(prefix=prefix, suffix='_extract')
self.paths_to_remove.append(tempdir)
if self.verbosity >= 2:
self.stdout.write("Extracting %s\n" % filename)
try:
archive.extract(filename, tempdir)
return tempdir
except (archive.ArchiveException, IOError) as e:
raise CommandError("couldn't extract file %s to %s: %s" %
(filename, tempdir, e))
def build_potfiles(self):
"""
Build pot files and apply msguniq to them.
"""
file_list = self.find_files(".")
self.remove_potfiles()
self.process_files(file_list)
potfiles = []
for path in self.locale_paths:
potfile = os.path.join(path, '%s.pot' % str(self.domain))
if not os.path.exists(potfile):
continue
args = ['msguniq'] + self.msguniq_options + [potfile]
msgs, errors, status = popen_wrapper(args)
if errors:
if status != STATUS_OK:
raise CommandError(
"errors happened while running msguniq\n%s" % errors)
elif self.verbosity > 0:
self.stdout.write(errors)
msgs = normalize_eols(msgs)
with io.open(potfile, 'w', encoding='utf-8') as fp:
fp.write(msgs)
potfiles.append(potfile)
return potfiles
def handle(self, **options):
project_name, target = options.pop('name'), options.pop('directory')
self.validate_name(project_name, "project")
# Check that the project_name cannot be imported.
try:
import_module(project_name)
except ImportError:
pass
else:
raise CommandError(
"%r conflicts with the name of an existing Python module and "
"cannot be used as a project name. Please try another name." % project_name
)
# Create a random SECRET_KEY to put it in the main settings.
options['secret_key'] = get_random_secret_key()
super(Command, self).handle('project', project_name, target, **options)
def handle(self, **options):
app_name, target = options.pop('name'), options.pop('directory')
self.validate_name(app_name, "app")
# Check that the app_name cannot be imported.
try:
import_module(app_name)
except ImportError:
pass
else:
raise CommandError(
"%r conflicts with the name of an existing Python module and "
"cannot be used as an app name. Please try another name." % app_name
)
super(Command, self).handle('app', app_name, target, **options)
loadocdcandidaciesfrom501s.py 文件源码
项目:django-calaccess-processed-data
作者: california-civic-data-coalition
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def handle(self, *args, **options):
"""
Make it happen.
"""
super(Command, self).handle(*args, **options)
if not CandidateContest.objects.exists():
error_message = 'No contests currently loaded (run loadocdcandidatecontests).'
if self._called_from_command_line:
self.failure(error_message)
else:
raise CommandError(error_message)
else:
form501_count = Form501Filing.objects.without_candidacy().count()
self.header(
"Processing %s Form 501 filings without candidacies" % form501_count
)
self.load()
self.success("Done!")
def handle(self, *args, **options):
email = options['email']
if ' ' in email or email.count('@') != 1:
raise CommandError(f'Invalid email {email!r}')
users = find_users(
settings.OIDC_RP_CLIENT_ID,
settings.OIDC_RP_CLIENT_SECRET,
urlparse(settings.OIDC_OP_USER_ENDPOINT).netloc,
email,
requests,
)
for user in users:
if user.get('blocked'):
self.stdout.write(self.style.ERROR('BLOCKED!'))
else:
self.stdout.write(self.style.SUCCESS('NOT blocked!'))
break
else:
self.stdout.write(self.style.WARNING(
f'{email} could not be found in Auth0'
))
def handle(self, **options):
if bool(options['user']) == options['all']:
raise CommandError("Either provide a 'user' to verify or "
"use '--all' to verify all users")
if options['all']:
for user in get_user_model().objects.hide_meta():
try:
utils.verify_user(user)
self.stdout.write("Verified user '%s'" % user.username)
except (ValueError, ValidationError) as e:
self.stderr.write(e.message)
if options['user']:
for user in options['user']:
try:
utils.verify_user(self.get_user(user))
self.stdout.write("User '%s' has been verified" % user)
except (ValueError, ValidationError) as e:
self.stderr.write(e.message)
def build_row_data(self, rawrow):
if len(rawrow) < len(self.name2col):
raise CommandError('line %d: too few columns' % self.rowcount)
statusval = rawrow[self.name2col['status']]
if len(statusval) == 0:
statusval = '0'
row = RowData(
kit=re.sub('_', ' ', rawrow[self.name2col['kit']]).strip(),
subkit=re.sub('_', ' ', rawrow[self.name2col['subkit']]).strip(),
barcode=rawrow[self.name2col['barcode']],
index_seq=rawrow[self.name2col['index_seq']],
index=rawrow[self.name2col['index']],
adapter_seq=rawrow[self.name2col['adapter_seq']],
version=rawrow[self.name2col['version']],
manufacturer=rawrow[self.name2col['manufacturer']],
model_range=rawrow[self.name2col['model_range']],
status=statusval,
)
return row
def handle(self, *args, **opts):
user = None
if opts['username'] is not None:
user = User.objects.get(username=opts['username'])
elif opts['userid'] is not None:
user = User.objects.get(id=opts['userid'])
else:
raise CommandError('Either username or userid is required')
if opts['clear']:
Run.objects.all().delete()
Adapter.objects.all().delete()
Kit.objects.all().delete()
with open(opts['csvfile'], 'r') as f:
if opts['format'] == 'chaim':
self.import_chaim_csv(user, f)
else:
self.import_csv(user, f)
def _download_latest_clinvar_xml(self, dest_dir):
ftp = FTP('ftp.ncbi.nlm.nih.gov')
ftp.login()
ftp.cwd(CV_XML_DIR)
# sort just in case the ftp lists the files in random order
cv_xml_w_date = sorted(
[f for f in ftp.nlst() if re.match(CV_XML_REGEX, f)])
if len(cv_xml_w_date) == 0:
raise CommandError('ClinVar reporting zero XML matching' +
' regex: \'{0}\' in directory {1}'.format(
CV_XML_REGEX, CV_XML_DIR))
ftp_xml_filename = cv_xml_w_date[-1]
dest_filepath = os.path.join(dest_dir, ftp_xml_filename)
with open(dest_filepath, 'w') as fh:
ftp.retrbinary('RETR {0}'.format(ftp_xml_filename), fh.write)
return dest_filepath, ftp_xml_filename
def _create(self, fixture_file_path, less_verbose=0):
try:
self.write_info('Creating fixture %s' % fixture_file_path, 1+less_verbose)
fixture_file_path = re.sub(r'\.zip$', '', fixture_file_path) # we strip away .zip if given
tmp_dir = tempfile.mkdtemp()
# copy media root
shutil.copytree(self._media_root, join(tmp_dir, 'MEDIA_ROOT'))
# database dump
with open(join(tmp_dir, 'db.sql'), 'w') as fp:
return_code = subprocess.call(['pg_dump', '--clean', '--no-owner', self._database_name], stdout=fp)
if return_code != 0:
raise CommandError('pg_dump failed with exit code {}'.format(return_code))
# creating the fixture archive
archive_name = shutil.make_archive(fixture_file_path, 'zip', root_dir=tmp_dir)
self.write_debug(subprocess.check_output(['unzip', '-l', archive_name]))
except:
self.write_debug('Temporary directory %s kept due to exception.' % tmp_dir)
raise
else:
self.write_info('... fixture created', 1+less_verbose)
shutil.rmtree(tmp_dir)
def handle(self, *args, **options):
if options['username']:
users = list(User.objects.filter(username__in=options['username']))
usernames = {user.username for user in users}
if usernames != set(options['username']):
raise CommandError('User mismatch: couldn\'t find "{}"'.format(
'", "'.join(set(options['username']) - usernames)))
else:
users = list(User.objects.filter(is_active=True, email_verified=True))
for user in users:
self.stdout.write(user.username, ending=' ')
if not user.is_active or not user.email_verified:
self.stdout.write(self.style.WARNING('SKIP'))
continue
try:
self.send_update(user)
self.stdout.write(self.style.SUCCESS('OK'))
except Exception as ex:
self.stdout.write(self.style.ERROR('failed: {}'.format(repr(ex))))
def handle_template(self, template, subdir):
"""
Determines where the app or project templates are.
Use django.__path__[0] as the default because we don't
know into which directory Django has been installed.
"""
if template is None:
return path.join(django.__path__[0], 'conf', subdir)
else:
if template.startswith('file://'):
template = template[7:]
expanded_template = path.expanduser(template)
expanded_template = path.normpath(expanded_template)
if path.isdir(expanded_template):
return expanded_template
if self.is_url(template):
# downloads the file and returns the path
absolute_path = self.download(template)
else:
absolute_path = path.abspath(expanded_template)
if path.exists(absolute_path):
return self.extract(absolute_path)
raise CommandError("couldn't handle %s template %s." %
(self.app_or_project, template))
def validate_name(self, name, app_or_project):
if name is None:
raise CommandError("you must provide %s %s name" % (
"an" if app_or_project == "app" else "a", app_or_project))
# If it's not a valid directory name.
if six.PY2:
if not re.search(r'^[_a-zA-Z]\w*$', name):
# Provide a smart error message, depending on the error.
if not re.search(r'^[_a-zA-Z]', name):
message = 'make sure the name begins with a letter or underscore'
else:
message = 'use only numbers, letters and underscores'
raise CommandError("%r is not a valid %s name. Please %s." %
(name, app_or_project, message))
else:
if not name.isidentifier():
raise CommandError(
"%r is not a valid %s name. Please make sure the name is "
"a valid identifier." % (name, app_or_project)
)
def extract(self, filename):
"""
Extracts the given file to a temporarily and returns
the path of the directory with the extracted content.
"""
prefix = 'django_%s_template_' % self.app_or_project
tempdir = tempfile.mkdtemp(prefix=prefix, suffix='_extract')
self.paths_to_remove.append(tempdir)
if self.verbosity >= 2:
self.stdout.write("Extracting %s\n" % filename)
try:
archive.extract(filename, tempdir)
return tempdir
except (archive.ArchiveException, IOError) as e:
raise CommandError("couldn't extract file %s to %s: %s" %
(filename, tempdir, e))
def build_potfiles(self):
"""
Build pot files and apply msguniq to them.
"""
file_list = self.find_files(".")
self.remove_potfiles()
self.process_files(file_list)
potfiles = []
for path in self.locale_paths:
potfile = os.path.join(path, '%s.pot' % str(self.domain))
if not os.path.exists(potfile):
continue
args = ['msguniq'] + self.msguniq_options + [potfile]
msgs, errors, status = popen_wrapper(args)
if errors:
if status != STATUS_OK:
raise CommandError(
"errors happened while running msguniq\n%s" % errors)
elif self.verbosity > 0:
self.stdout.write(errors)
msgs = normalize_eols(msgs)
with io.open(potfile, 'w', encoding='utf-8') as fp:
fp.write(msgs)
potfiles.append(potfile)
return potfiles
def handle(self, **options):
app_name, target = options.pop('name'), options.pop('directory')
self.validate_name(app_name, "app")
# Check that the app_name cannot be imported.
try:
import_module(app_name)
except ImportError:
pass
else:
raise CommandError(
"%r conflicts with the name of an existing Python module and "
"cannot be used as an app name. Please try another name." % app_name
)
super(Command, self).handle('app', app_name, target, **options)
def create_cache(sizes, options):
"""
Clears the cache for the given files
"""
size_list = [size.strip(' ,') for size in sizes]
if len(size_list) < 1:
sizes = PhotoSize.objects.all()
else:
sizes = PhotoSize.objects.filter(name__in=size_list)
if not len(sizes):
raise CommandError('No photo sizes were found.')
print('Flushing cache...')
for cls in ImageModel.__subclasses__():
for photosize in sizes:
print('Flushing %s size images' % photosize.name)
for obj in cls.objects.all():
obj.remove_size(photosize)
def create_cache(sizes, options):
"""
Creates the cache for the given files
"""
reset = options.get('reset', None)
size_list = [size.strip(' ,') for size in sizes]
if len(size_list) < 1:
sizes = PhotoSize.objects.filter(pre_cache=True)
else:
sizes = PhotoSize.objects.filter(name__in=size_list)
if not len(sizes):
raise CommandError('No photo sizes were found.')
print('Caching photos, this may take a while...')
for cls in ImageModel.__subclasses__():
for photosize in sizes:
print('Cacheing %s size images' % photosize.name)
for obj in cls.objects.all():
if reset:
obj.remove_size(photosize)
obj.create_size(photosize)
def handle(self, *args, **options):
path = options.get('path') or settings.CARD_IMAGES_PATH
path = os.path.realpath(path)
if not os.path.exists(path):
raise CommandError('Image path does not exist')
def check_extension(f):
f = f.lower()
return f[f.rfind('.'):] in SUPPORTED_FORMATS
for *_, files in os.walk(path):
available = set(filter(check_extension, files))
registered = {c.path for c in Card.objects.all()}
Card.objects.exclude(path__in=available).delete()
for path in available.difference(registered):
Card(path=path).save()
def handle_template(self, template, subdir):
"""
Determines where the app or project templates are.
Use django.__path__[0] as the default because we don't
know into which directory Django has been installed.
"""
if template is None:
return path.join(django.__path__[0], 'conf', subdir)
else:
if template.startswith('file://'):
template = template[7:]
expanded_template = path.expanduser(template)
expanded_template = path.normpath(expanded_template)
if path.isdir(expanded_template):
return expanded_template
if self.is_url(template):
# downloads the file and returns the path
absolute_path = self.download(template)
else:
absolute_path = path.abspath(expanded_template)
if path.exists(absolute_path):
return self.extract(absolute_path)
raise CommandError("couldn't handle %s template %s." %
(self.app_or_project, template))
def validate_name(self, name, app_or_project):
if name is None:
raise CommandError("you must provide %s %s name" % (
"an" if app_or_project == "app" else "a", app_or_project))
# If it's not a valid directory name.
if six.PY2:
if not re.search(r'^[_a-zA-Z]\w*$', name):
# Provide a smart error message, depending on the error.
if not re.search(r'^[_a-zA-Z]', name):
message = 'make sure the name begins with a letter or underscore'
else:
message = 'use only numbers, letters and underscores'
raise CommandError("%r is not a valid %s name. Please %s." %
(name, app_or_project, message))
else:
if not name.isidentifier():
raise CommandError(
"%r is not a valid %s name. Please make sure the name is "
"a valid identifier." % (name, app_or_project)
)