def test_users_missing(settings):
out = io.StringIO()
user1 = UserFactory.create()
user2 = UserFactory.create()
with pytest.raises(CommandError) as exc:
call_command('sso_ping_discourse', 'bar', user1.username, 'baz', stdout=out)
assert user1.username not in out.getvalue()
assert user2.username not in str(exc.value)
assert user2.username not in out.getvalue()
assert str(exc.value) in (
'User mismatch: couldn\'t find "bar", "baz"',
'User mismatch: couldn\'t find "baz", "bar"',
)
python类CommandError()的实例源码
def handle(self, *args, **options):
domains = Domain.objects.all()
if options['domain-name']:
domains = domains.filter(name__in=options['domain-name'])
domain_names = domains.values_list('name', flat=True)
for domain_name in options['domain-name']:
if domain_name not in domain_names:
raise CommandError('{} is not a known domain'.format(domain_name))
for domain in domains:
self.stdout.write('%s ...' % domain.name, ending='')
try:
domain.sync_from_pdns()
self.stdout.write(' synced')
except Exception as e:
if str(e).startswith('Could not find domain ') \
and domain.owner.captcha_required:
self.stdout.write(' skipped')
else:
self.stdout.write(' failed')
msg = 'Error while processing {}: {}'.format(domain.name, e)
raise CommandError(msg)
renamefieldhistory.py 文件源码
项目:django-field-history
作者: grantmcconnaughey
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def handle(self, *args, **options):
model_name = options.get('model')
from_field = options.get('from_field')
to_field = options.get('to_field')
if not model_name:
raise CommandError('--model_name is a required argument')
if not from_field:
raise CommandError('--from_field is a required argument')
if not to_field:
raise CommandError('--to_field is a required argument')
model = apps.get_model(model_name)
content_type = ContentType.objects.get_for_model(model)
field_histories = FieldHistory.objects.filter(content_type=content_type, field_name=from_field)
self.stdout.write('Updating {} FieldHistory object(s)\n'.format(field_histories.count()))
field_histories.update(field_name=to_field)
def handle_subcommnad(self, root, cmd, database, options, subcmd, argvs):
cmd_type = None
if subcmd == 'create-all':
cmd_type = CreateAllSubCommand(root, cmd, database,
options, self.stdout, self.stderr,
True)
elif subcmd == 'drop-all':
cmd_type = DropAllSubCommand(root, cmd, database, options,
self.stdout, self.stderr, True)
if cmd_type is not None:
cmd_type.run_from_argv(argvs)
else:
if options.traceback:
raise CommandError('Invalid Command.')
self.stderr.write('CommandError: Invalid Command.')
sys.exit(1)
def handle(self, *args, **options):
try:
from_site = int(options.get('from_site', None))
except Exception:
from_site = settings.SITE_ID
try:
to_site = int(options.get('to_site', None))
except Exception:
to_site = settings.SITE_ID
try:
assert from_site != to_site
except AssertionError:
raise CommandError('Sites must be different')
from_site = self.get_site(from_site)
to_site = self.get_site(to_site)
pages = Page.objects.drafts().filter(site=from_site, depth=1)
with transaction.atomic():
for page in pages:
page.copy_page(None, to_site)
self.stdout.write('Copied CMS Tree from SITE_ID {0} successfully to SITE_ID {1}.\n'.format(from_site.pk, to_site.pk))
def handle(self, *args, **kwargs):
migrator = MigrationSession(self.stderr, kwargs['database'])
failure = False
try:
migrator.apply_all()
except CommandError as e:
self.stderr.write("Migration error: {}".format(e))
failure = True
state = dump_migration_session_state(migrator.state)
self.stdout.write(state)
if kwargs['output_file']:
with open(kwargs['output_file'], 'w') as outfile:
outfile.write(state)
sys.exit(int(failure))
def handle(self, *args, **options):
url = options['url']
name = options['name']
if url or name:
if not name or not url:
raise CommandError(MISSING_ARG_MSG)
index_catalog(DataJson(url), name)
return
catalogs = yaml.load(requests.get(CATALOGS_INDEX).text)
for catalog_id, values in catalogs.items():
if values['federado'] and values['formato'] == 'json':
try:
catalog = DataJson(values['url'])
except (IOError, ValueError) as e:
logging.warn(READ_ERROR, catalog_id, e)
continue
index_catalog(catalog, catalog_id)
def handle(self, **options):
app_name, target = options.pop('name'), options.pop('directory')
# Set the top directory to root of Biohub instead of current
# path.
if target is None:
target = os.path.join(settings.BIOHUB_DIR, app_name)
try:
os.makedirs(target)
except OSError as e:
if e.errno == errno.EEXIST:
message = "'%s' already exists" % target
else:
message = e
raise CommandError(message)
# Use custom app template
if options['template'] is None:
options['template'] = os.path.join(
settings.BIOHUB_CORE_DIR, 'app_template')
super(StartappCommand, self).handle('app', app_name, target, **options)
def test_no_matching_fingerprint_raises_error(self):
out = StringIO()
err = StringIO()
missing_fingerprint = '01234567890ABCDEF01234567890ABCDEF01234567'
rgx = re.compile(r'''^Key matching fingerprint '{fp}' not '''
r'''found.$'''.format(fp=missing_fingerprint))
self.assertEquals(Key.objects.count(), 0)
with self.assertRaisesRegex(CommandError, rgx):
call_command('email_signing_key', missing_fingerprint,
stdout=out, stderr=err)
self.assertEquals(out.getvalue(), '')
self.assertEquals(err.getvalue(), '')
def test_startdash_usage_empty(self):
self.assertRaises(CommandError, call_command, 'startdash')
def test_startpanel_usage_empty(self):
self.assertRaises(CommandError, call_command, 'startpanel')
__init__.py 文件源码
项目:django-calaccess-processed-data
作者: california-civic-data-coalition
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def get_or_create_processed_version(self):
"""
Get or create the current processed version.
Return a tuple (ProcessedDataVersion object, created), where
created is a boolean specifying whether a version was created.
"""
# get the latest raw data version
try:
latest_raw_version = RawDataVersion.objects.latest(
'release_datetime',
)
except RawDataVersion.DoesNotExist:
raise CommandError(
'No raw CAL-ACCESS data loaded (run `python manage.py '
'updatecalaccessrawdata`).'
)
# check if latest raw version update completed
if latest_raw_version.update_stalled:
msg_tmp = 'Update to raw version released at %s did not complete'
raise CommandError(
msg_tmp % latest_raw_version.release_datetime.ctime()
)
return ProcessedDataVersion.objects.get_or_create(
raw_version=latest_raw_version,
)
def handle(self, **options):
if options['plain']:
warnings.warn(
"The --plain option is deprecated in favor of the -i python or --interface python option.",
RemovedInDjango20Warning
)
options['interface'] = 'python'
# Execute the command and exit.
if options['command']:
exec(options['command'])
return
# Execute stdin if it has anything to read and exit.
# Not supported on Windows due to select.select() limitations.
if sys.platform != 'win32' and select.select([sys.stdin], [], [], 0)[0]:
exec(sys.stdin.read())
return
available_shells = [options['interface']] if options['interface'] else self.shells
for shell in available_shells:
try:
return getattr(self, shell)(options)
except ImportError:
pass
raise CommandError("Couldn't import {} interface.".format(shell))
test_migration_dependencies_command.py 文件源码
项目:django-migrations-graph
作者: dizballanze
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def test_call_without_apps_arguments_raise_command_error(self):
""" Command call without applications lists should return usage info """
with self.assertRaises(CommandError):
call_command(self.COMMAND_NAME)
def test_parent_command_throws_exception_if_no_subcommand(self):
with self.assertRaises(CommandError):
call_command(
'parent_command', stdout=StringIO(), interactive=False)
def get_past_ungraded_course_run(user=None, course=None, now=None):
"""Loop through past course runs and find one without grade data"""
past_runs = CourseRun.objects.filter(
course=course,
end_date__lt=now,
).exclude(end_date=None).order_by('-end_date').all()
for past_run in past_runs:
if not (CachedCurrentGradeHandler(user).exists(past_run) or
FinalGrade.objects.filter(user=user, course_run=past_run).exists()):
return past_run
raise CommandError("Can't find past run that isn't already passed/failed for Course '{}'".format(course.title))
def handle(self, *args, **kwargs): # pylint: disable=unused-argument
if not settings.FEATURES.get('OPEN_DISCUSSIONS_USER_SYNC', False):
raise CommandError('OPEN_DISCUSSIONS_USER_SYNC is set to False (so disabled).')
sync_discussion_users.delay()
self.stdout.write(self.style.SUCCESS('Async job to backfill users submitted'))
def handle(self, *args, **kwargs): # pylint: disable=unused-argument
edx_course_key = kwargs.get('edx_course_key')
try:
run = CourseRun.objects.get(edx_course_key=edx_course_key)
except CourseRun.DoesNotExist:
raise CommandError('Course Run for course_id "{}" does not exist'.format(edx_course_key))
try:
can_freeze = run.can_freeze_grades
except ImproperlyConfigured:
raise CommandError('Course Run for course_id "{}" is missing the freeze date'.format(edx_course_key))
if not can_freeze:
raise CommandError('Course Run for course_id "{}" cannot be frozen yet'.format(edx_course_key))
if CourseRunGradingStatus.is_complete(run):
self.stdout.write(
self.style.SUCCESS(
'Final grades for course "{0}" are already complete'.format(edx_course_key)
)
)
return
freeze_course_run_final_grades.delay(run.id)
self.stdout.write(
self.style.SUCCESS(
'Successfully submitted async task to freeze final grades for course "{0}"'.format(edx_course_key)
)
)
def handle(self, **options):
if options['plain']:
warnings.warn(
"The --plain option is deprecated in favor of the -i python or --interface python option.",
RemovedInDjango20Warning
)
options['interface'] = 'python'
# Execute the command and exit.
if options['command']:
exec(options['command'])
return
# Execute stdin if it has anything to read and exit.
# Not supported on Windows due to select.select() limitations.
if sys.platform != 'win32' and select.select([sys.stdin], [], [], 0)[0]:
exec(sys.stdin.read())
return
available_shells = [options['interface']] if options['interface'] else self.shells
for shell in available_shells:
try:
return getattr(self, shell)(options)
except ImportError:
pass
raise CommandError("Couldn't import {} interface.".format(shell))
def check_apps(apps):
"""Check if a list of apps is entirely contained in the list of installed apps."""
for app in apps:
installed_apps = settings.INSTALLED_APPS
if app not in installed_apps:
raise CommandError('App %s not contained in INSTALLED_APPS %s'
% (app, settings.INSTALLED_APPS))
def test_renamefieldhistory_model_arg_is_required(self):
Person.objects.create(name='Initial Name')
self.assertEqual(FieldHistory.objects.filter(field_name='name').count(), 1)
with self.assertRaises(CommandError):
call_command('renamefieldhistory', from_field='name', to_field='name2')
def test_renamefieldhistory_from_field_arg_is_required(self):
Person.objects.create(name='Initial Name')
self.assertEqual(FieldHistory.objects.filter(field_name='name').count(), 1)
with self.assertRaises(CommandError):
call_command('renamefieldhistory', model='tests.Person', to_field='name2')
def test_renamefieldhistory_to_field_arg_is_required(self):
Person.objects.create(name='Initial Name')
self.assertEqual(FieldHistory.objects.filter(field_name='name').count(), 1)
with self.assertRaises(CommandError):
call_command('renamefieldhistory', model='tests.Person', from_field='name')
def handle(self, *args, **options):
date = datetime.now().date()
backup_name = '{}.zip'.format(date.strftime('%Y_%m_%d'))
if args:
backup_dir = os.path.abspath(args[0])
else:
backup_dir = settings.BACKUP_ROOT
if not os.path.exists(backup_dir):
raise CommandError('output directory does not exists')
backup_path = os.path.join(backup_dir, backup_name)
with zipfile.ZipFile(backup_path, 'w') as ziph:
for root, dirs, files in os.walk(settings.MEDIA_ROOT):
for file in files:
abspath = os.path.abspath(os.path.join(root, file))
relpath = os.path.relpath(abspath, settings.MEDIA_ROOT)
ziph.write(abspath, os.path.join('media', relpath))
# db dump
dump_path = os.path.join(settings.BASE_DIR, 'dump.json')
call_command('dump', output=dump_path)
ziph.write(dump_path, 'dump.json')
os.unlink('dump.json')
self.stdout.write('backup saved to "%s"' % backup_path)
def handle(self, *args, **options):
if not all(getattr(settings, name) for name in ['FXA_ACCESS_KEY_ID',
'FXA_SECRET_ACCESS_KEY',
'FXA_S3_BUCKET']):
raise CommandError('FXA S3 Bucket access not configured')
main()
if options['cron']:
log('cron schedule starting')
schedule.start()
def handle(self, *args, **options):
if settings.MAINTENANCE_MODE:
raise CommandError('Command unavailable in maintenance mode')
count = 0
for task in QueuedTask.objects.all()[:options['num_tasks']]:
task.retry()
count += 1
print '{} processed. {} remaining.'.format(count, QueuedTask.objects.count())
def run_from_argv(self, argv):
"""
Set up any environment changes requested (e.g., Python path
and Django settings), then run this command. If the
command raises a ``CommandError``, intercept it and print it sensibly
to stderr. If the ``--traceback`` option is present or the raised
``Exception`` is not ``CommandError``, raise it.
"""
self._called_from_command_line = True
parser = self.create_parser()
options = parser.parse_args(argv)
cmd_options = vars(options)
# Move positional args out of options to mimic legacy optparse
args = cmd_options.pop('args', ())
try:
self.execute(*args, **cmd_options)
except Exception as e:
if self._django_options.traceback or not isinstance(e, CommandError):
raise
# SystemCheckError takes care of its own formatting.
if isinstance(e, SystemCheckError):
self.stderr.write(str(e), lambda x: x)
else:
self.stderr.write('%s: %s' % (e.__class__.__name__, e))
sys.exit(1)
finally:
try:
connections.close_all()
except ImproperlyConfigured:
# Ignore if connections aren't setup at this point (e.g. no
# configured settings).
pass
def _localimport(drive_id, channel_id, node_ids=None, update_progress=None, check_for_cancel=None):
drives = get_mounted_drives_with_channel_info()
drive = drives[drive_id]
# copy channel's db file then copy all the content files from storage dir
available_channel_ids = [c["id"] for c in drive.metadata["channels"]]
assert channel_id in available_channel_ids, "The given channel was not found in the drive."
try:
call_command(
"importchannel",
"local",
channel_id,
drive.datafolder,
update_progress=update_progress,
check_for_cancel=check_for_cancel
)
call_command(
"importcontent",
"local",
channel_id,
drive.datafolder,
node_ids=node_ids,
update_progress=update_progress,
check_for_cancel=check_for_cancel
)
except UserCancelledError:
try:
call_command("deletechannel", channel_id, update_progress=update_progress)
except CommandError:
pass
raise
def handle(self, *args, **options):
if not TRANSTOOL_DL_URL or not TRANSTOOL_DL_KEY:
raise CommandError('Please, set TRANSTOOL_DL_URL and TRANSTOOL_DL_KEY settings.')
if options['mo_only'] and options['po_only']:
raise CommandError('Use only --mo-only or --po-only but not both.')
self.stdout.write('Download file: Send POST request to {}'.format(TRANSTOOL_DL_URL))
r = requests.post(TRANSTOOL_DL_URL, {
'key': TRANSTOOL_DL_KEY,
'po-only': str(int(options['po_only'])),
'mo-only': str(int(options['mo_only'])),
}, stream=True)
if r.status_code != 200:
self.stdout.write('Request status code is not 200: {}'.format(r.status_code))
self.stdout.write('Fail.', ending='\n\n')
sys.exit(1)
file_content = BytesIO()
for chunk in r.iter_content(chunk_size=(16 * 1024)):
file_content.write(chunk)
file_content.seek(0, os.SEEK_END)
file_content_size = file_content.tell()
self.stdout.write('Downloaded file {} {} bytes'.format(r.headers['Content-Type'], file_content_size))
if options['po_only']:
exts = ['.po']
elif options['mo_only']:
exts = ['.mo']
else:
exts = ['.po', '.mo']
diff_info = self._get_diff_info(file_content, exts)
if options['diff']:
self.print_diff_info(diff_info)
else:
self.copy_files(diff_info, file_content)
self.stdout.write('Done.', ending='\n\n')
def handle(self, **options):
directory = options.get('directory', None)
# directory key is already present in options which is None.
name = options['name']
full_path = None
if not directory:
"""
If directory is not provided then use DEFAULT_APPS_DIRECTORY.
This block creates DEFAULT_APPS_DIRECTORY/app_name directory and
pass that path to default startapp command.
"""
directory = os.path.join(settings.BASE_DIR, settings.DEFAULT_APPS_DIRECTORY)
full_path = os.path.join(directory, name)
if not os.path.exists(full_path):
os.makedirs(full_path)
options['directory'] = full_path
try:
super().handle(**options)
if full_path:
"""
If apps directory is used then change the app name to DEFAULT_APPS_DIRECTORY.app_name
in AppConfig
"""
apps_py = os.path.join(full_path, 'apps.py')
if os.path.isfile(apps_py):
for line in fileinput.input(apps_py, inplace=True):
line = line.replace("'%s'" % name, "'%s.%s'" % (settings.DEFAULT_APPS_DIRECTORY, name))
print(line, end='')
except CommandError:
if full_path:
shutil.rmtree(full_path)
raise