python类CommandError()的实例源码

trigger.py 文件源码 项目:borgcube 作者: enkore 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
    """Run the trigger selected by ``options['trigger_id']``.

    The trigger is looked up in ``data_root().trigger_ids`` and run with
    the ``'local-cli'`` access context.

    Raises:
        CommandError: if no trigger is stored under the given id.
    """
    try:
        trigger = data_root().trigger_ids[options['trigger_id']]
    except KeyError:
        message = 'Trigger %s not found' % options['trigger_id']
        raise CommandError(message)
    else:
        trigger.run(access_context='local-cli')
import-archives.py 文件源码 项目:borgcube 作者: enkore 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
    """Import archives of one repository on behalf of one client.

    The client is looked up by ``options['client']``. The repository is the
    first one whose name equals, whose id starts with, or whose url equals
    ``options['repository']``. The archives selected by
    ``options['archive']`` / ``options['regex']`` are then imported one by
    one while a progress indicator is shown.

    Raises:
        CommandError: if the client or the repository cannot be found.
    """
    try:
        client = data_root().clients[options['client']]
    except KeyError:
        raise CommandError('Client %s not found' % options['client'])

    for repository in data_root().repositories:
        wanted = options['repository']
        # Same precedence as three separate checks: name, id prefix, url.
        if (repository.name == wanted
                or repository.id.startswith(wanted)
                or repository.url == wanted):
            break
    else:
        # The loop finished without a match.
        raise CommandError('Repository %s not found' % options['repository'])

    with open_repository(repository) as borg_repository:
        manifest, key = Manifest.load(borg_repository)
        with Cache(borg_repository, key, manifest, lock_wait=1) as cache:
            archive_names = self.find_archives(manifest, options['archive'], regex=options['regex'])
            imported = 0

            progress = ProgressIndicatorPercent(
                msg='Importing archives %4.1f %%: %s',
                total=len(archive_names),
                step=0.1,
            )
            for archive_name in archive_names:
                imported += self.import_archive(manifest, cache, repository, archive_name, client)
                progress.show(info=[archive_name])
            progress.finish()

    print('Imported %d archives.' % imported, file=sys.stderr)
import-archives.py 文件源码 项目:borgcube 作者: enkore 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def find_archives(self, manifest, archive, regex):
    """Return the archive names in *manifest* selected by *archive*.

    When *regex* is truthy, *archive* is used as a regular expression and
    every name in ``manifest.archives`` that it matches completely is
    returned (possibly an empty list). Otherwise *archive* must be a key of
    ``manifest.archives`` and is returned as a one-element list.

    Raises:
        CommandError: in non-regex mode, if *archive* is not present.
    """
    if not regex:
        try:
            # Lookup only to verify the archive exists; the value is unused.
            manifest.archives[archive]
        except KeyError:
            raise CommandError('Archive %s not found' % archive)
        return [archive]

    return [name for name in manifest.archives if re.fullmatch(archive, name)]
process_queue.py 文件源码 项目:django-eb-sqs 作者: cuda-networks 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
    """Poll the configured SQS queues forever and process every message.

    ``options['queue_names']`` is a comma-separated list of queue names.
    Each queue is resolved by name, then polled in turn in an endless loop;
    every received message is handed to ``self._process_message`` together
    with a worker created by ``WorkerFactory``.

    Raises:
        CommandError: if no queue name is given (or only blank entries).
    """
    if not options['queue_names']:
        raise CommandError('Queue names (--queues) not specified')

    # Strip on both sides so "--queues a, b" resolves "b" rather than " b",
    # and drop empty entries left behind by stray or trailing commas.
    queue_names = [
        queue_name.strip()
        for queue_name in options['queue_names'].split(',')
        if queue_name.strip()
    ]
    if not queue_names:
        raise CommandError('Queue names (--queues) not specified')

    logger.debug('[django-eb-sqs] Connecting to SQS: {}'.format(', '.join(queue_names)))

    sqs = boto3.resource(
        'sqs',
        region_name=settings.AWS_REGION,
        config=Config(retries={'max_attempts': settings.AWS_MAX_RETRIES})
    )
    queues = [sqs.get_queue_by_name(QueueName=queue_name) for queue_name in queue_names]

    logger.debug('[django-eb-sqs] Connected to SQS: {}'.format(', '.join(queue_names)))

    worker = WorkerFactory.default().create()

    # Runs until the process is stopped; there is no exit condition.
    while True:
        for queue in queues:
            messages = queue.receive_messages(
                MaxNumberOfMessages=settings.MAX_NUMBER_OF_MESSAGES,
                WaitTimeSeconds=settings.WAIT_TIME_S,
            )

            for msg in messages:
                self._process_message(msg, worker)
run_eb_sqs_worker.py 文件源码 项目:django-eb-sqs 作者: cuda-networks 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
    """Drain one SQS queue, delivering each message to a worker endpoint.

    Messages are received one at a time from the queue named by
    ``options['queue_name']`` and passed to
    ``self._process_message_with_retry`` together with ``options['url']``
    and the retry limit (at least 1). Every message is deleted after the
    delivery attempt, whether it succeeded or not. Processing stops once a
    receive call returns no messages. A ``ConnectionError`` is reported on
    stdout instead of being propagated.

    Raises:
        CommandError: if the url or the queue name option is missing.
    """
    url = options['url']
    if not url:
        raise CommandError('Worker endpoint url parameter (--url) not found')

    queue_name = options['queue_name']
    if not queue_name:
        raise CommandError('Queue name (--queue) not specified')

    retry_limit = max(int(options['retry_limit']), 1)

    try:
        self.stdout.write('Connect to SQS')
        resource = boto3.resource(
            'sqs',
            region_name=settings.AWS_REGION,
            config=Config(retries={'max_attempts': settings.AWS_MAX_RETRIES})
        )
        queue = resource.get_queue_by_name(QueueName=queue_name)
        self.stdout.write('> Connected')

        while True:
            batch = queue.receive_messages(MaxNumberOfMessages=1, WaitTimeSeconds=20)

            # An empty receive ends the run.
            if not batch:
                break

            for message in batch:
                self.stdout.write('Deliver message {}'.format(message.message_id))
                delivered = self._process_message_with_retry(url, retry_limit, message)
                self.stdout.write(
                    '> Delivered' if delivered
                    else '> Delivery failed (retry-limit reached)'
                )
                # Deleted regardless of the delivery outcome.
                message.delete()

        self.stdout.write('Message processing finished')
    except ConnectionError:
        self.stdout.write('Connection to {} failed. Message processing failed'.format(url))
batchimport.py 文件源码 项目:sadm 作者: prologin 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
    """Read user names from ``options['file']`` and pass them to ``create_users``.

    With ``options['logins']`` truthy, every non-blank line is one login.
    Otherwise every non-blank line must hold exactly two tab-separated
    fields, first name and last name, collected as a ``(firstname,
    lastname)`` tuple. Surrounding whitespace is stripped from every value,
    and blank or whitespace-only lines are skipped.

    Raises:
        CommandError: if ``--file`` is missing, or a line does not consist
            of exactly two tab-separated fields in first/last name mode.
    """
    if options['file'] is None:
        raise CommandError('Missing --file')

    names = []
    with open(options['file']) as fp:
        for lineno, line in enumerate(fp, 1):
            # Whitespace-only lines carry no name; skipping them avoids an
            # empty login or a failed unpacking below.
            if not line.strip():
                continue

            if options['logins']:
                names.append(line.strip())
                continue

            fields = [field.strip() for field in line.split('\t')]
            if len(fields) != 2:
                # Name the offending line instead of surfacing a bare
                # ValueError from tuple unpacking.
                raise CommandError(
                    '%s:%d: expected "firstname<TAB>lastname", got %r'
                    % (options['file'], lineno, line.rstrip('\n')))
            firstname, lastname = fields
            names.append((firstname, lastname))

    create_users(names, options)
create_ussd_app.py 文件源码 项目:ussd_airflow 作者: mwaaas 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
    """Create a Django app named by the first ``ussd_app_name`` value.

    The app is created through the ``startapp`` command using ``path`` as
    the template. If the name contains whitespace-separated parts, or
    ``startapp`` raises ``CommandError``, a hint is printed instead of
    propagating the error.
    """
    try:
        app_name = options.get('ussd_app_name')[0]
        # A name made of several words cannot be an app name.
        if len(app_name.split()) > 1:
            raise CommandError
        call_command('startapp', app_name, template=path)
    except CommandError:
        hint = ('Provide a valid django App Name as documented here: '
                ' https://docs.djangoproject.com/en/1.10/ref/django-admin/')
        print(hint)
test_management.py 文件源码 项目:DjangoCMS 作者: farhan711 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_copy_bad_languages(self):
    """``cms copy lang`` from 'it' to 'fr' must fail with the expected message."""
    expected_message = 'Both languages have to be present in settings.LANGUAGES and settings.CMS_LANGUAGES'
    captured = StringIO()

    with self.assertRaises(CommandError) as raised:
        management.call_command(
            'cms',
            'copy',
            'lang',
            '--from-lang=it',
            '--to-lang=fr',
            interactive=False,
            stdout=captured,
        )

    self.assertEqual(str(raised.exception), expected_message)
copy.py 文件源码 项目:DjangoCMS 作者: farhan711 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_site(self, site_id):
    """Return the ``Site`` with primary key *site_id*, or None.

    A falsy *site_id* yields None without any lookup.

    Raises:
        CommandError: if the lookup raises ``ValueError`` or
            ``Site.DoesNotExist``.
    """
    if not site_id:
        return None

    try:
        return Site.objects.get(pk=site_id)
    except (ValueError, Site.DoesNotExist):
        raise CommandError('There is no site with given site id.')
shell.py 文件源码 项目:liberator 作者: libscie 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def handle(self, **options):
    """Run code given via ``--command`` or stdin, or start an interactive shell.

    Order of operation:

    1. ``options['plain']`` emits a deprecation warning and forces the
       ``'python'`` interface.
    2. If ``options['command']`` is set, it is executed and the method returns.
    3. Otherwise, on non-Windows platforms, pending stdin content is
       executed and the method returns.
    4. Otherwise each candidate shell (the requested interface, or every
       entry of ``self.shells``) is tried until one does not raise
       ``ImportError``.

    Raises:
        CommandError: if every candidate shell raised ``ImportError``.
    """
    if options['plain']:
        warnings.warn(
            "The --plain option is deprecated in favor of the -i python or --interface python option.",
            RemovedInDjango20Warning
        )
        options['interface'] = 'python'

    # Execute the command and exit.
    if options['command']:
        # Pass one explicit namespace: with the bare exec(code) form inside
        # a function, globals and locals differ, so a function defined by
        # the command cannot see names the command itself imported.
        # NOTE(review): this executes operator-supplied code by design.
        exec(options['command'], globals())
        return

    # Execute stdin if it has anything to read and exit.
    # Not supported on Windows due to select.select() limitations.
    if sys.platform != 'win32' and select.select([sys.stdin], [], [], 0)[0]:
        # Same single-namespace rule as for --command above.
        exec(sys.stdin.read(), globals())
        return

    available_shells = [options['interface']] if options['interface'] else self.shells

    for shell in available_shells:
        try:
            return getattr(self, shell)(options)
        except ImportError:
            # This interface is not installed; try the next candidate.
            pass
    raise CommandError("Couldn't import {} interface.".format(shell))
newplugin.py 文件源码 项目:USTC-Software-2017 作者: igemsoftware2017 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _validate_template(self, target):
        """
        To ensure that the plugin template directory does exist.
        """

        if not path.exists(target):
            raise CommandError(
                "Plugin template directory missing, reinstall biohub "
                "or just manually create the plugin.")
newplugin.py 文件源码 项目:USTC-Software-2017 作者: igemsoftware2017 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _validate_plugin_name(self, plugin_name):
        """
        To validate the given `plugin_name`.

        A `plugin_name` is considered valid if:

         + it's not empty.
         + it does not exist currently.

        Afterwards the attributes `self.plugin_name` and `self.label` will be
        set.
        """

        if not plugin_name:
            raise CommandError("You must provide a plugin_name.")

        try:
            importlib.import_module(plugin_name)

            raise CommandError(
                "The plugin_name %r is conflicted with another module, "
                "please specify a new one." % plugin_name)
        except ImportError:
            pass

        self.label = plugin_name.rsplit('.', 1)[-1]
        self.plugin_name = plugin_name
newplugin.py 文件源码 项目:USTC-Software-2017 作者: igemsoftware2017 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _ensure_path_exists(self, directory):

        try:
            os.makedirs(directory)
        except OSError as e:
            if e.errno == errno.EEXIST:
                if not path.isdir(directory):
                    raise CommandError("'%s' already exists." % directory)
            else:
                raise CommandError(e)
_base.py 文件源码 项目:USTC-Software-2017 作者: igemsoftware2017 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def fail_installation(self):
    """Raise ``CommandError`` reporting that ``self.plugin_name`` could not be installed."""
    message = "Plugin '%s' cannot be properly installed." % self.plugin_name
    raise CommandError(message)
test_generic.py 文件源码 项目:dsmr-reader 作者: dennissiemensma 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_tracking_disabled(self):
    """ Test whether datalogger can be stopped by changing track setting. """
    tracker_settings = DataloggerSettings.get_solo()
    tracker_settings.track = False
    tracker_settings.save()

    # With tracking switched off, the command is expected to fail with this message.
    expected_error = 'Datalogger tracking is DISABLED!'
    with self.assertRaisesMessage(CommandError, expected_error):
        self._intercept_command_stdout('dsmr_datalogger')
shell.py 文件源码 项目:django-rtc 作者: scifiswapnil 项目源码 文件源码 阅读 81 收藏 0 点赞 0 评论 0
def handle(self, **options):
    """Run code given via ``--command`` or stdin, or start an interactive shell.

    Order of operation:

    1. ``options['plain']`` emits a deprecation warning and forces the
       ``'python'`` interface.
    2. If ``options['command']`` is set, it is executed and the method returns.
    3. Otherwise, on non-Windows platforms, pending stdin content is
       executed and the method returns.
    4. Otherwise each candidate shell (the requested interface, or every
       entry of ``self.shells``) is tried until one does not raise
       ``ImportError``.

    Raises:
        CommandError: if every candidate shell raised ``ImportError``.
    """
    if options['plain']:
        warnings.warn(
            "The --plain option is deprecated in favor of the -i python or --interface python option.",
            RemovedInDjango20Warning
        )
        options['interface'] = 'python'

    # Execute the command and exit.
    if options['command']:
        # Pass one explicit namespace: with the bare exec(code) form inside
        # a function, globals and locals differ, so a function defined by
        # the command cannot see names the command itself imported.
        # NOTE(review): this executes operator-supplied code by design.
        exec(options['command'], globals())
        return

    # Execute stdin if it has anything to read and exit.
    # Not supported on Windows due to select.select() limitations.
    if sys.platform != 'win32' and select.select([sys.stdin], [], [], 0)[0]:
        # Same single-namespace rule as for --command above.
        exec(sys.stdin.read(), globals())
        return

    available_shells = [options['interface']] if options['interface'] else self.shells

    for shell in available_shells:
        try:
            return getattr(self, shell)(options)
        except ImportError:
            # This interface is not installed; try the next candidate.
            pass
    raise CommandError("Couldn't import {} interface.".format(shell))
flow_graph.py 文件源码 项目:viewflow-extensions 作者: Thermondo 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def handle(self, **options):
    """Render a flow as an SVG or BPMN graph and write it out.

    ``options['flow_path']`` holds the dotted path ``module.FlowClass`` as
    its first element. The flow class is imported, laid out with
    ``chart.calc_layout_data`` and converted according to
    ``options['graph_type']``. The result is written to the file named by
    ``options['output']``, or to stdout when no output file is given.

    Raises:
        CommandError: if the flow path has no dot, the module or the flow
            class cannot be found, or the graph type is neither ``SVG``
            nor ``BPMN``.
    """
    flow_path = options.get('flow_path')
    output = options.get('output')
    graph_type = options.get('graph_type')
    try:
        file_path, flow_name = flow_path[0].rsplit('.', 1)
    except ValueError as e:
        raise CommandError("Please, specify the full path to your flow.") from e
    try:
        flows_file = importlib.import_module(file_path)
        flow_cls = getattr(flows_file, flow_name)
    except ImportError as e:
        raise CommandError("Could not find file %s" % (file_path,)) from e
    except (AttributeError, TypeError) as e:
        raise CommandError("Could not find the flow with the name %s" % (flow_name,)) from e

    grid = chart.calc_layout_data(flow_cls)
    if graph_type == SVG:
        graph = chart.grid_to_svg(grid)
    elif graph_type == BPMN:
        graph = chart.grid_to_bpmn(grid)
    else:
        # Without this branch an unknown type would leave `graph` unbound
        # and fail later with an UnboundLocalError.
        raise CommandError("Unknown graph type %r" % (graph_type,))

    # Any falsy output (empty string or None) means "write to stdout".
    if output:
        with open(output, 'w') as f:
            f.write(graph)
    else:
        self.stdout.write(graph)
shell.py 文件源码 项目:LatinSounds_AppEnviaMail 作者: G3ek-aR 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def handle(self, **options):
    """Run code given via ``--command`` or stdin, or start an interactive shell.

    Order of operation:

    1. ``options['plain']`` emits a deprecation warning and forces the
       ``'python'`` interface.
    2. If ``options['command']`` is set, it is executed and the method returns.
    3. Otherwise, on non-Windows platforms, pending stdin content is
       executed and the method returns.
    4. Otherwise each candidate shell (the requested interface, or every
       entry of ``self.shells``) is tried until one does not raise
       ``ImportError``.

    Raises:
        CommandError: if every candidate shell raised ``ImportError``.
    """
    if options['plain']:
        warnings.warn(
            "The --plain option is deprecated in favor of the -i python or --interface python option.",
            RemovedInDjango20Warning
        )
        options['interface'] = 'python'

    # Execute the command and exit.
    if options['command']:
        # Pass one explicit namespace: with the bare exec(code) form inside
        # a function, globals and locals differ, so a function defined by
        # the command cannot see names the command itself imported.
        # NOTE(review): this executes operator-supplied code by design.
        exec(options['command'], globals())
        return

    # Execute stdin if it has anything to read and exit.
    # Not supported on Windows due to select.select() limitations.
    if sys.platform != 'win32' and select.select([sys.stdin], [], [], 0)[0]:
        # Same single-namespace rule as for --command above.
        exec(sys.stdin.read(), globals())
        return

    available_shells = [options['interface']] if options['interface'] else self.shells

    for shell in available_shells:
        try:
            return getattr(self, shell)(options)
        except ImportError:
            # This interface is not installed; try the next candidate.
            pass
    raise CommandError("Couldn't import {} interface.".format(shell))
test_command.py 文件源码 项目:django-secure-mail 作者: blag 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_fingerprint_and_generate_flag_raises_error(self):
    """Passing a fingerprint together with ``generate=True`` must be rejected.

    The command is expected to raise ``CommandError`` with the exact
    message below, and to write nothing to stdout or stderr.
    """
    out = StringIO()
    err = StringIO()

    rgx = re.compile(r'^You cannot specify fingerprints and --generate '
                     r'when running this command$')

    # assertEqual instead of the deprecated assertEquals alias, which was
    # removed in Python 3.12.
    self.assertEqual(Key.objects.count(), 0)

    with self.assertRaisesRegex(CommandError, rgx):
        call_command('email_signing_key', TEST_KEY_FINGERPRINT,
                     generate=True, stdout=out, stderr=err)

    self.assertEqual(out.getvalue(), '')
    self.assertEqual(err.getvalue(), '')
test_commands.py 文件源码 项目:django-cloudinary-storage 作者: klis87 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_command_raises_error_when_manifest_doesnt_exist(self, read_manifest_mock):
    """``deleteredundantstatic --noinput`` must raise ``CommandError``."""
    self.assertRaises(
        CommandError,
        execute_command,
        'deleteredundantstatic',
        '--noinput',
    )


问题


面经


文章

微信
公众号

扫码关注公众号