def handle(self, *args, **options):
    """Run the trigger identified by ``options['trigger_id']``.

    The trigger is looked up in ``data_root().trigger_ids`` and run with the
    ``'local-cli'`` access context.

    Raises:
        CommandError: if the lookup raises ``KeyError``.
    """
    try:
        trigger = data_root().trigger_ids[options['trigger_id']]
    except KeyError:
        raise CommandError('Trigger %s not found' % options['trigger_id'])
    else:
        trigger.run(access_context='local-cli')
python类CommandError()的实例源码
def handle(self, *args, **options):
    """Import archives from a registered repository for a given client.

    The client is looked up by ``options['client']``. The repository is the
    first entry of ``data_root().repositories`` whose name equals, whose id
    starts with, or whose url equals ``options['repository']``. The archives
    selected by ``options['archive']`` / ``options['regex']`` are then
    imported one by one, and the total is reported on stderr.

    Raises:
        CommandError: if the client or the repository cannot be found.
    """
    try:
        client = data_root().clients[options['client']]
    except KeyError:
        raise CommandError('Client %s not found' % options['client'])

    # for/else: the else branch only runs when no repository matched.
    for repository in data_root().repositories:
        if (repository.name == options['repository']
                or repository.id.startswith(options['repository'])
                or repository.url == options['repository']):
            break
    else:
        raise CommandError('Repository %s not found' % options['repository'])

    with open_repository(repository) as borg_repository:
        manifest, key = Manifest.load(borg_repository)
        with Cache(borg_repository, key, manifest, lock_wait=1) as cache:
            archive_names = self.find_archives(manifest, options['archive'], regex=options['regex'])
            progress = ProgressIndicatorPercent(
                msg='Importing archives %4.1f %%: %s',
                total=len(archive_names),
                step=0.1,
            )
            imported = 0
            for archive_name in archive_names:
                imported += self.import_archive(manifest, cache, repository, archive_name, client)
                progress.show(info=[archive_name])
            progress.finish()

    print('Imported %d archives.' % imported, file=sys.stderr)
def find_archives(self, manifest, archive, regex):
    """Return the archive names in ``manifest.archives`` selected by `archive`.

    When `regex` is true, `archive` is a pattern and every name that it
    fully matches is returned (possibly an empty list). Otherwise `archive`
    is an exact name and is returned as a one-element list.

    Raises:
        CommandError: in exact mode, if `archive` is not in the manifest.
    """
    if not regex:
        try:
            # Lookup is done only for its KeyError side effect.
            manifest.archives[archive]
        except KeyError:
            raise CommandError('Archive %s not found' % archive)
        return [archive]

    return [name for name in manifest.archives if re.fullmatch(archive, name)]
def handle(self, *args, **options):
    """Poll the given SQS queues forever and process every received message.

    ``options['queue_names']`` is a comma-separated list of queue names.
    Whitespace around each name is ignored and empty entries (for example
    from a trailing comma) are dropped.

    Raises:
        CommandError: if no usable queue name was supplied.
    """
    if not options['queue_names']:
        raise CommandError('Queue names (--queues) not specified')

    # strip(), not rstrip(): "a, b" must yield "b" rather than " b", which
    # would be looked up as a different (non-existent) queue.
    queue_names = [name.strip() for name in options['queue_names'].split(',')]
    queue_names = [name for name in queue_names if name]
    if not queue_names:
        raise CommandError('Queue names (--queues) not specified')

    logger.debug('[django-eb-sqs] Connecting to SQS: {}'.format(', '.join(queue_names)))

    sqs = boto3.resource(
        'sqs',
        region_name=settings.AWS_REGION,
        config=Config(retries={'max_attempts': settings.AWS_MAX_RETRIES})
    )
    queues = [sqs.get_queue_by_name(QueueName=queue_name) for queue_name in queue_names]

    logger.debug('[django-eb-sqs] Connected to SQS: {}'.format(', '.join(queue_names)))

    worker = WorkerFactory.default().create()

    # Runs until the process is terminated: queues are polled round-robin.
    while True:
        for queue in queues:
            messages = queue.receive_messages(
                MaxNumberOfMessages=settings.MAX_NUMBER_OF_MESSAGES,
                WaitTimeSeconds=settings.WAIT_TIME_S,
            )
            for msg in messages:
                self._process_message(msg, worker)
def handle(self, *args, **options):
    """Drain one SQS queue, delivering each message to a worker URL.

    Messages are fetched one at a time until a receive call returns nothing.
    Each message is handed to ``self._process_message_with_retry`` and is
    deleted afterwards whether or not delivery succeeded. A
    ``ConnectionError`` anywhere in the process is reported on stdout
    instead of being propagated.

    Raises:
        CommandError: if ``options['url']`` or ``options['queue_name']`` is
            missing or empty.
    """
    if not options['url']:
        raise CommandError('Worker endpoint url parameter (--url) not found')
    if not options['queue_name']:
        raise CommandError('Queue name (--queue) not specified')

    url, queue_name = options['url'], options['queue_name']
    # At least one delivery attempt is always made.
    retry_limit = max(int(options['retry_limit']), 1)
    write = self.stdout.write

    try:
        write('Connect to SQS')
        sqs = boto3.resource(
            'sqs',
            region_name=settings.AWS_REGION,
            config=Config(retries={'max_attempts': settings.AWS_MAX_RETRIES})
        )
        queue = sqs.get_queue_by_name(QueueName=queue_name)
        write('> Connected')

        while True:
            messages = queue.receive_messages(
                MaxNumberOfMessages=1,
                WaitTimeSeconds=20
            )
            if not messages:
                break

            for msg in messages:
                write('Deliver message {}'.format(msg.message_id))
                delivered = self._process_message_with_retry(url, retry_limit, msg)
                write('> Delivered' if delivered else '> Delivery failed (retry-limit reached)')
                msg.delete()

        write('Message processing finished')
    except ConnectionError:
        write('Connection to {} failed. Message processing failed'.format(url))
def handle(self, *args, **options):
    """Create users from the names listed in the file ``options['file']``.

    The file holds one entry per line; blank and whitespace-only lines are
    skipped. When ``options['logins']`` is true each line is a login name.
    Otherwise each line must be ``firstname<TAB>lastname`` and is collected
    as a ``(firstname, lastname)`` tuple. The collected names are passed to
    ``create_users`` together with `options`.

    Raises:
        CommandError: if ``--file`` is missing, or a line does not consist of
            exactly two tab-separated fields in first/last name mode.
    """
    if options['file'] is None:
        raise CommandError('Missing --file')

    names = []
    with open(options['file']) as fp:
        for lineno, raw_line in enumerate(fp, start=1):
            line = raw_line.rstrip('\n')
            # Whitespace-only lines would otherwise become an empty login or
            # fail the two-field unpacking below.
            if not line.strip():
                continue

            if options['logins']:
                names.append(line.strip())
                continue

            fields = [field.strip() for field in line.split('\t')]
            if len(fields) != 2:
                raise CommandError(
                    'Line %d of %s: expected "firstname<TAB>lastname", got %r'
                    % (lineno, options['file'], line))
            names.append(tuple(fields))

    create_users(names, options)
def handle(self, *args, **options):
    """Create a new app from the template at `path` via ``startapp``.

    The app name is the first element of ``options['ussd_app_name']``. A
    name containing whitespace, or any ``CommandError`` raised by
    ``startapp``, results in a hint being printed rather than an exception.
    """
    try:
        app_name = options.get('ussd_app_name')[0]
        # More than one whitespace-separated token cannot be an app name.
        if len(app_name.split()) > 1:
            raise CommandError
        call_command('startapp', app_name, template=path)
    except CommandError:
        hint = ('Provide a valid django App Name as documented here: '
                ' https://docs.djangoproject.com/en/1.10/ref/django-admin/')
        print(hint)
def test_copy_bad_languages(self):
    """``cms copy lang`` from ``it`` to ``fr`` must fail with the languages error."""
    expected_message = ('Both languages have to be present in settings.LANGUAGES '
                        'and settings.CMS_LANGUAGES')
    captured = StringIO()

    with self.assertRaises(CommandError) as raised:
        management.call_command(
            'cms', 'copy', 'lang', '--from-lang=it', '--to-lang=fr',
            interactive=False,
            stdout=captured,
        )

    self.assertEqual(str(raised.exception), expected_message)
def get_site(self, site_id):
    """Return the ``Site`` with primary key `site_id`, or None if it is falsy.

    Raises:
        CommandError: if the lookup raises ``ValueError`` or
            ``Site.DoesNotExist``.
    """
    if not site_id:
        return None

    try:
        return Site.objects.get(pk=site_id)
    except (ValueError, Site.DoesNotExist):
        raise CommandError('There is no site with given site id.')
def handle(self, **options):
    """Run a one-off command, code piped on stdin, or an interactive shell.

    Precedence: if ``options['command']`` is set it is exec'd and the method
    returns; otherwise, on non-Windows platforms, readable stdin is exec'd;
    otherwise each candidate shell method is tried in order and the result
    of the first one that does not raise ``ImportError`` is returned.

    Raises:
        CommandError: if every candidate shell raised ``ImportError``.
    """
    if options['plain']:
        # --plain is deprecated and is mapped onto the 'python' interface.
        warnings.warn(
            "The --plain option is deprecated in favor of the -i python or --interface python option.",
            RemovedInDjango20Warning
        )
        options['interface'] = 'python'

    # Execute the command and exit.
    # NOTE: exec() is called without explicit namespaces, so the code runs
    # against this function's globals and locals.
    if options['command']:
        exec(options['command'])
        return

    # Execute stdin if it has anything to read and exit.
    # Not supported on Windows due to select.select() limitations.
    if sys.platform != 'win32' and select.select([sys.stdin], [], [], 0)[0]:
        exec(sys.stdin.read())
        return

    # An explicitly requested interface is the only candidate; otherwise
    # fall back to the list in self.shells.
    available_shells = [options['interface']] if options['interface'] else self.shells

    for shell in available_shells:
        try:
            return getattr(self, shell)(options)
        except ImportError:
            # This shell could not be imported; try the next candidate.
            pass
    # Reached only when no candidate worked; `shell` is the last one tried.
    # NOTE(review): if available_shells were empty, `shell` would be unbound here.
    raise CommandError("Couldn't import {} interface.".format(shell))
def _validate_template(self, target):
"""
To ensure that the plugin template directory does exist.
"""
if not path.exists(target):
raise CommandError(
"Plugin template directory missing, reinstall biohub "
"or just manually create the plugin.")
def _validate_plugin_name(self, plugin_name):
"""
To validate the given `plugin_name`.
A `plugin_name` is considered valid if:
+ it's not empty.
+ it does not exist currently.
Afterwards the attributes `self.plugin_name` and `self.label` will be
set.
"""
if not plugin_name:
raise CommandError("You must provide a plugin_name.")
try:
importlib.import_module(plugin_name)
raise CommandError(
"The plugin_name %r is conflicted with another module, "
"please specify a new one." % plugin_name)
except ImportError:
pass
self.label = plugin_name.rsplit('.', 1)[-1]
self.plugin_name = plugin_name
def _ensure_path_exists(self, directory):
try:
os.makedirs(directory)
except OSError as e:
if e.errno == errno.EEXIST:
if not path.isdir(directory):
raise CommandError("'%s' already exists." % directory)
else:
raise CommandError(e)
def fail_installation(self):
    """Abort by raising CommandError with a message naming `self.plugin_name`."""
    message = "Plugin '%s' cannot be properly installed." % self.plugin_name
    raise CommandError(message)
def test_tracking_disabled(self):
    """ Test whether the datalogger can be stopped by changing the track setting. """
    settings_instance = DataloggerSettings.get_solo()
    settings_instance.track = False
    settings_instance.save()

    # With tracking switched off the command is expected to abort with an error.
    expected_message = 'Datalogger tracking is DISABLED!'
    with self.assertRaisesMessage(CommandError, expected_message):
        self._intercept_command_stdout('dsmr_datalogger')
def handle(self, **options):
    """Run a one-off command, code piped on stdin, or an interactive shell.

    Precedence: if ``options['command']`` is set it is exec'd and the method
    returns; otherwise, on non-Windows platforms, readable stdin is exec'd;
    otherwise each candidate shell method is tried in order and the result
    of the first one that does not raise ``ImportError`` is returned.

    Raises:
        CommandError: if every candidate shell raised ``ImportError``.
    """
    if options['plain']:
        # --plain is deprecated and is mapped onto the 'python' interface.
        warnings.warn(
            "The --plain option is deprecated in favor of the -i python or --interface python option.",
            RemovedInDjango20Warning
        )
        options['interface'] = 'python'

    # Execute the command and exit.
    # NOTE: exec() is called without explicit namespaces, so the code runs
    # against this function's globals and locals.
    if options['command']:
        exec(options['command'])
        return

    # Execute stdin if it has anything to read and exit.
    # Not supported on Windows due to select.select() limitations.
    if sys.platform != 'win32' and select.select([sys.stdin], [], [], 0)[0]:
        exec(sys.stdin.read())
        return

    # An explicitly requested interface is the only candidate; otherwise
    # fall back to the list in self.shells.
    available_shells = [options['interface']] if options['interface'] else self.shells

    for shell in available_shells:
        try:
            return getattr(self, shell)(options)
        except ImportError:
            # This shell could not be imported; try the next candidate.
            pass
    # Reached only when no candidate worked; `shell` is the last one tried.
    # NOTE(review): if available_shells were empty, `shell` would be unbound here.
    raise CommandError("Couldn't import {} interface.".format(shell))
def handle(self, **options):
    """Render a flow class as a graph and write it to a file or to stdout.

    ``options['flow_path']`` is a sequence whose first element is the dotted
    path ``module.FlowClass``. ``options['graph_type']`` selects the SVG or
    BPMN renderer. The result is written to the file ``options['output']``
    when that is non-empty, otherwise to ``self.stdout``.

    Raises:
        CommandError: if the dotted path has no module part, the module
            cannot be imported, the flow attribute cannot be resolved, or
            the graph type is neither SVG nor BPMN.
    """
    flow_path = options.get('flow_path')
    output = options.get('output')
    graph_type = options.get('graph_type')

    try:
        file_path, flow_name = flow_path[0].rsplit('.', 1)
    except ValueError as e:
        raise CommandError("Please, specify the full path to your flow.") from e

    try:
        flows_file = importlib.import_module(file_path)
        flow_cls = getattr(flows_file, flow_name)
    except ImportError as e:
        raise CommandError("Could not find file %s" % (file_path,)) from e
    except (AttributeError, TypeError) as e:
        raise CommandError("Could not find the flow with the name %s" % (flow_name,)) from e

    # Pick the renderer up front so an unsupported type is reported clearly
    # instead of surfacing later as an unbound `graph` variable.
    if graph_type == BPMN:
        render = chart.grid_to_bpmn
    elif graph_type == SVG:
        render = chart.grid_to_svg
    else:
        raise CommandError("Unsupported graph type %r" % (graph_type,))

    grid = chart.calc_layout_data(flow_cls)
    graph = render(grid)

    # A missing (None) or empty output target means "write to stdout".
    if output:
        with open(output, 'w') as f:
            f.write(graph)
    else:
        self.stdout.write(graph)
def handle(self, **options):
    """Run a one-off command, code piped on stdin, or an interactive shell.

    Precedence: if ``options['command']`` is set it is exec'd and the method
    returns; otherwise, on non-Windows platforms, readable stdin is exec'd;
    otherwise each candidate shell method is tried in order and the result
    of the first one that does not raise ``ImportError`` is returned.

    Raises:
        CommandError: if every candidate shell raised ``ImportError``.
    """
    if options['plain']:
        # --plain is deprecated and is mapped onto the 'python' interface.
        warnings.warn(
            "The --plain option is deprecated in favor of the -i python or --interface python option.",
            RemovedInDjango20Warning
        )
        options['interface'] = 'python'

    # Execute the command and exit.
    # NOTE: exec() is called without explicit namespaces, so the code runs
    # against this function's globals and locals.
    if options['command']:
        exec(options['command'])
        return

    # Execute stdin if it has anything to read and exit.
    # Not supported on Windows due to select.select() limitations.
    if sys.platform != 'win32' and select.select([sys.stdin], [], [], 0)[0]:
        exec(sys.stdin.read())
        return

    # An explicitly requested interface is the only candidate; otherwise
    # fall back to the list in self.shells.
    available_shells = [options['interface']] if options['interface'] else self.shells

    for shell in available_shells:
        try:
            return getattr(self, shell)(options)
        except ImportError:
            # This shell could not be imported; try the next candidate.
            pass
    # Reached only when no candidate worked; `shell` is the last one tried.
    # NOTE(review): if available_shells were empty, `shell` would be unbound here.
    raise CommandError("Couldn't import {} interface.".format(shell))
def test_fingerprint_and_generate_flag_raises_error(self):
    """Passing a fingerprint together with ``generate=True`` must be rejected.

    The command is expected to raise CommandError with the exact message
    below and to write nothing to stdout or stderr.
    """
    out = StringIO()
    err = StringIO()

    rgx = re.compile(r'^You cannot specify fingerprints and --generate '
                     r'when running this command$')

    # assertEqual instead of the deprecated assertEquals alias, which was
    # removed from unittest in Python 3.12.
    self.assertEqual(Key.objects.count(), 0)

    with self.assertRaisesRegex(CommandError, rgx):
        call_command('email_signing_key', TEST_KEY_FINGERPRINT,
                     generate=True, stdout=out, stderr=err)

    self.assertEqual(out.getvalue(), '')
    self.assertEqual(err.getvalue(), '')
def test_command_raises_error_when_manifest_doesnt_exist(self, read_manifest_mock):
    """``deleteredundantstatic --noinput`` must raise CommandError.

    `read_manifest_mock` is not used in the body; presumably it is injected
    by a patch decorator that is not visible here.
    """
    self.assertRaises(
        CommandError,
        execute_command, 'deleteredundantstatic', '--noinput',
    )