python类CommandError()的实例源码

test_management_update_user.py 文件源码 项目:SpongeAuth 作者: lukegb 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_users_missing(settings):
    out = io.StringIO()

    user1 = UserFactory.create()
    user2 = UserFactory.create()

    with pytest.raises(CommandError) as exc:
        call_command('sso_ping_discourse', 'bar', user1.username, 'baz', stdout=out)

    assert user1.username not in out.getvalue()
    assert user2.username not in str(exc.value)
    assert user2.username not in out.getvalue()

    assert str(exc.value) in (
        'User mismatch: couldn\'t find "bar", "baz"',
        'User mismatch: couldn\'t find "baz", "bar"',
    )
sync-from-pdns.py 文件源码 项目:desec-stack 作者: desec-io 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
        domains = Domain.objects.all()

        if options['domain-name']:
            domains = domains.filter(name__in=options['domain-name'])
            domain_names = domains.values_list('name', flat=True)

            for domain_name in options['domain-name']:
                if domain_name not in domain_names:
                    raise CommandError('{} is not a known domain'.format(domain_name))

        for domain in domains:
            self.stdout.write('%s ...' % domain.name, ending='')
            try:
                domain.sync_from_pdns()
                self.stdout.write(' synced')
            except Exception as e:
                if str(e).startswith('Could not find domain ') \
                        and domain.owner.captcha_required:
                    self.stdout.write(' skipped')
                else:
                    self.stdout.write(' failed')
                    msg = 'Error while processing {}: {}'.format(domain.name, e)
                    raise CommandError(msg)
renamefieldhistory.py 文件源码 项目:django-field-history 作者: grantmcconnaughey 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
        model_name = options.get('model')
        from_field = options.get('from_field')
        to_field = options.get('to_field')

        if not model_name:
            raise CommandError('--model_name is a required argument')
        if not from_field:
            raise CommandError('--from_field is a required argument')
        if not to_field:
            raise CommandError('--to_field is a required argument')

        model = apps.get_model(model_name)
        content_type = ContentType.objects.get_for_model(model)
        field_histories = FieldHistory.objects.filter(content_type=content_type, field_name=from_field)

        self.stdout.write('Updating {} FieldHistory object(s)\n'.format(field_histories.count()))

        field_histories.update(field_name=to_field)
modelchemy.py 文件源码 项目:modelchemy 作者: rjusher 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def handle_subcommnad(self, root, cmd, database, options, subcmd, argvs):
        cmd_type = None

        if subcmd == 'create-all':
            cmd_type = CreateAllSubCommand(root, cmd, database,
                                      options, self.stdout, self.stderr,
                                      True)
        elif subcmd == 'drop-all':
            cmd_type = DropAllSubCommand(root, cmd, database, options,
                                          self.stdout, self.stderr, True)

        if cmd_type is not None:
            cmd_type.run_from_argv(argvs)
        else:
            if options.traceback:
                raise CommandError('Invalid Command.')

            self.stderr.write('CommandError: Invalid Command.')
            sys.exit(1)
copy.py 文件源码 项目:DjangoCMS 作者: farhan711 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
        try:
            from_site = int(options.get('from_site', None))
        except Exception:
            from_site = settings.SITE_ID
        try:
            to_site = int(options.get('to_site', None))
        except Exception:
            to_site = settings.SITE_ID
        try:
            assert from_site != to_site
        except AssertionError:
            raise CommandError('Sites must be different')

        from_site = self.get_site(from_site)
        to_site = self.get_site(to_site)

        pages = Page.objects.drafts().filter(site=from_site, depth=1)

        with transaction.atomic():
            for page in pages:
                page.copy_page(None, to_site)
            self.stdout.write('Copied CMS Tree from SITE_ID {0} successfully to SITE_ID {1}.\n'.format(from_site.pk, to_site.pk))
run_migrations.py 文件源码 项目:edx-django-release-util 作者: edx 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def handle(self, *args, **kwargs):
        migrator = MigrationSession(self.stderr, kwargs['database'])

        failure = False
        try:
            migrator.apply_all()
        except CommandError as e:
            self.stderr.write("Migration error: {}".format(e))
            failure = True

        state = dump_migration_session_state(migrator.state)
        self.stdout.write(state)
        if kwargs['output_file']:
            with open(kwargs['output_file'], 'w') as outfile:
                outfile.write(state)

        sys.exit(int(failure))
read_datajson.py 文件源码 项目:series-tiempo-ar-api 作者: datosgobar 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
        url = options['url']
        name = options['name']
        if url or name:
            if not name or not url:
                raise CommandError(MISSING_ARG_MSG)
            index_catalog(DataJson(url), name)
            return

        catalogs = yaml.load(requests.get(CATALOGS_INDEX).text)

        for catalog_id, values in catalogs.items():
            if values['federado'] and values['formato'] == 'json':
                try:
                    catalog = DataJson(values['url'])
                except (IOError, ValueError) as e:
                    logging.warn(READ_ERROR, catalog_id, e)
                    continue

                index_catalog(catalog, catalog_id)
startapp.py 文件源码 项目:USTC-Software-2017 作者: igemsoftware2017 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def handle(self, **options):
        app_name, target = options.pop('name'), options.pop('directory')

        # Set the top directory to root of Biohub instead of current
        # path.
        if target is None:
            target = os.path.join(settings.BIOHUB_DIR, app_name)

            try:
                os.makedirs(target)
            except OSError as e:
                if e.errno == errno.EEXIST:
                    message = "'%s' already exists" % target
                else:
                    message = e
                raise CommandError(message)

        # Use custom app template
        if options['template'] is None:
            options['template'] = os.path.join(
                settings.BIOHUB_CORE_DIR, 'app_template')

        super(StartappCommand, self).handle('app', app_name, target, **options)
test_command.py 文件源码 项目:django-secure-mail 作者: blag 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_no_matching_fingerprint_raises_error(self):
        out = StringIO()
        err = StringIO()

        missing_fingerprint = '01234567890ABCDEF01234567890ABCDEF01234567'
        rgx = re.compile(r'''^Key matching fingerprint '{fp}' not '''
                         r'''found.$'''.format(fp=missing_fingerprint))

        self.assertEquals(Key.objects.count(), 0)

        with self.assertRaisesRegex(CommandError, rgx):
            call_command('email_signing_key', missing_fingerprint,
                         stdout=out, stderr=err)

        self.assertEquals(out.getvalue(), '')
        self.assertEquals(err.getvalue(), '')
test_commands.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_startdash_usage_empty(self):
        self.assertRaises(CommandError, call_command, 'startdash')
test_commands.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_startpanel_usage_empty(self):
        self.assertRaises(CommandError, call_command, 'startpanel')
__init__.py 文件源码 项目:django-calaccess-processed-data 作者: california-civic-data-coalition 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_or_create_processed_version(self):
        """
        Get or create the current processed version.

        Return a tuple (ProcessedDataVersion object, created), where
        created is a boolean specifying whether a version was created.
        """
        # get the latest raw data version
        try:
            latest_raw_version = RawDataVersion.objects.latest(
                'release_datetime',
            )
        except RawDataVersion.DoesNotExist:
            raise CommandError(
                'No raw CAL-ACCESS data loaded (run `python manage.py '
                'updatecalaccessrawdata`).'
            )

        # check if latest raw version update completed
        if latest_raw_version.update_stalled:
            msg_tmp = 'Update to raw version released at %s did not complete'
            raise CommandError(
                msg_tmp % latest_raw_version.release_datetime.ctime()
            )

        return ProcessedDataVersion.objects.get_or_create(
            raw_version=latest_raw_version,
        )
shell.py 文件源码 项目:Scrum 作者: prakharchoudhary 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def handle(self, **options):
        if options['plain']:
            warnings.warn(
                "The --plain option is deprecated in favor of the -i python or --interface python option.",
                RemovedInDjango20Warning
            )
            options['interface'] = 'python'

        # Execute the command and exit.
        if options['command']:
            exec(options['command'])
            return

        # Execute stdin if it has anything to read and exit.
        # Not supported on Windows due to select.select() limitations.
        if sys.platform != 'win32' and select.select([sys.stdin], [], [], 0)[0]:
            exec(sys.stdin.read())
            return

        available_shells = [options['interface']] if options['interface'] else self.shells

        for shell in available_shells:
            try:
                return getattr(self, shell)(options)
            except ImportError:
                pass
        raise CommandError("Couldn't import {} interface.".format(shell))
test_migration_dependencies_command.py 文件源码 项目:django-migrations-graph 作者: dizballanze 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_call_without_apps_arguments_raise_command_error(self):
        """ Command call without applications lists should return usage info """
        with self.assertRaises(CommandError):
            call_command(self.COMMAND_NAME)
test_subcommand.py 文件源码 项目:django-subcommand2 作者: CptLemming 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_parent_command_throws_exception_if_no_subcommand(self):
        with self.assertRaises(CommandError):
            call_command(
                'parent_command', stdout=StringIO(), interactive=False)
alter_data.py 文件源码 项目:micromasters 作者: mitodl 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_past_ungraded_course_run(user=None, course=None, now=None):
    """Loop through past course runs and find one without grade data"""
    past_runs = CourseRun.objects.filter(
        course=course,
        end_date__lt=now,
    ).exclude(end_date=None).order_by('-end_date').all()
    for past_run in past_runs:
        if not (CachedCurrentGradeHandler(user).exists(past_run) or
                FinalGrade.objects.filter(user=user, course_run=past_run).exists()):
            return past_run
    raise CommandError("Can't find past run that isn't already passed/failed for Course '{}'".format(course.title))
backfill_discussion_users.py 文件源码 项目:micromasters 作者: mitodl 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def handle(self, *args, **kwargs):  # pylint: disable=unused-argument

        if not settings.FEATURES.get('OPEN_DISCUSSIONS_USER_SYNC', False):
            raise CommandError('OPEN_DISCUSSIONS_USER_SYNC is set to False (so disabled).')

        sync_discussion_users.delay()
        self.stdout.write(self.style.SUCCESS('Async job to backfill users submitted'))
freeze_final_grades.py 文件源码 项目:micromasters 作者: mitodl 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def handle(self, *args, **kwargs):  # pylint: disable=unused-argument
        edx_course_key = kwargs.get('edx_course_key')
        try:
            run = CourseRun.objects.get(edx_course_key=edx_course_key)
        except CourseRun.DoesNotExist:
            raise CommandError('Course Run for course_id "{}" does not exist'.format(edx_course_key))
        try:
            can_freeze = run.can_freeze_grades
        except ImproperlyConfigured:
            raise CommandError('Course Run for course_id "{}" is missing the freeze date'.format(edx_course_key))
        if not can_freeze:
            raise CommandError('Course Run for course_id "{}" cannot be frozen yet'.format(edx_course_key))
        if CourseRunGradingStatus.is_complete(run):
            self.stdout.write(
                self.style.SUCCESS(
                    'Final grades for course "{0}" are already complete'.format(edx_course_key)
                )
            )
            return

        freeze_course_run_final_grades.delay(run.id)
        self.stdout.write(
            self.style.SUCCESS(
                'Successfully submitted async task to freeze final grades for course "{0}"'.format(edx_course_key)
            )
        )
shell.py 文件源码 项目:django 作者: alexsukhrin 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def handle(self, **options):
        if options['plain']:
            warnings.warn(
                "The --plain option is deprecated in favor of the -i python or --interface python option.",
                RemovedInDjango20Warning
            )
            options['interface'] = 'python'

        # Execute the command and exit.
        if options['command']:
            exec(options['command'])
            return

        # Execute stdin if it has anything to read and exit.
        # Not supported on Windows due to select.select() limitations.
        if sys.platform != 'win32' and select.select([sys.stdin], [], [], 0)[0]:
            exec(sys.stdin.read())
            return

        available_shells = [options['interface']] if options['interface'] else self.shells

        for shell in available_shells:
            try:
                return getattr(self, shell)(options)
            except ImportError:
                pass
        raise CommandError("Couldn't import {} interface.".format(shell))
muttest.py 文件源码 项目:django-mutpy 作者: phihos 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def check_apps(apps):
    """Check if a list of apps is entirely contained in the list of installed apps."""
    for app in apps:
        installed_apps = settings.INSTALLED_APPS
        if app not in installed_apps:
            raise CommandError('App %s not contained in INSTALLED_APPS %s'
                               % (app, settings.INSTALLED_APPS))
tests.py 文件源码 项目:django-field-history 作者: grantmcconnaughey 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_renamefieldhistory_model_arg_is_required(self):
        Person.objects.create(name='Initial Name')

        self.assertEqual(FieldHistory.objects.filter(field_name='name').count(), 1)

        with self.assertRaises(CommandError):
            call_command('renamefieldhistory', from_field='name', to_field='name2')
tests.py 文件源码 项目:django-field-history 作者: grantmcconnaughey 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_renamefieldhistory_from_field_arg_is_required(self):
        Person.objects.create(name='Initial Name')

        self.assertEqual(FieldHistory.objects.filter(field_name='name').count(), 1)

        with self.assertRaises(CommandError):
            call_command('renamefieldhistory', model='tests.Person', to_field='name2')
tests.py 文件源码 项目:django-field-history 作者: grantmcconnaughey 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_renamefieldhistory_to_field_arg_is_required(self):
        Person.objects.create(name='Initial Name')

        self.assertEqual(FieldHistory.objects.filter(field_name='name').count(), 1)

        with self.assertRaises(CommandError):
            call_command('renamefieldhistory', model='tests.Person', from_field='name')
zipdata.py 文件源码 项目:tumanov_castleoaks 作者: Roamdev 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
        date = datetime.now().date()

        backup_name = '{}.zip'.format(date.strftime('%Y_%m_%d'))
        if args:
            backup_dir = os.path.abspath(args[0])
        else:
            backup_dir = settings.BACKUP_ROOT

        if not os.path.exists(backup_dir):
            raise CommandError('output directory does not exists')

        backup_path = os.path.join(backup_dir, backup_name)

        with zipfile.ZipFile(backup_path, 'w') as ziph:
            for root, dirs, files in os.walk(settings.MEDIA_ROOT):
                for file in files:
                    abspath = os.path.abspath(os.path.join(root, file))
                    relpath = os.path.relpath(abspath, settings.MEDIA_ROOT)
                    ziph.write(abspath, os.path.join('media', relpath))

            # db dump
            dump_path = os.path.join(settings.BASE_DIR, 'dump.json')
            call_command('dump', output=dump_path)
            ziph.write(dump_path, 'dump.json')

        os.unlink('dump.json')
        self.stdout.write('backup saved to "%s"' % backup_path)
process_fxa_data.py 文件源码 项目:basket 作者: mozmeao 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
        if not all(getattr(settings, name) for name in ['FXA_ACCESS_KEY_ID',
                                                        'FXA_SECRET_ACCESS_KEY',
                                                        'FXA_S3_BUCKET']):
            raise CommandError('FXA S3 Bucket access not configured')

        main()
        if options['cron']:
            log('cron schedule starting')
            schedule.start()
process_maintenance_queue.py 文件源码 项目:basket 作者: mozmeao 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
        if settings.MAINTENANCE_MODE:
            raise CommandError('Command unavailable in maintenance mode')

        count = 0
        for task in QueuedTask.objects.all()[:options['num_tasks']]:
            task.retry()
            count += 1

        print '{} processed. {} remaining.'.format(count, QueuedTask.objects.count())
base.py 文件源码 项目:modelchemy 作者: rjusher 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def run_from_argv(self, argv):
        """
        Set up any environment changes requested (e.g., Python path
        and Django settings), then run this command. If the
        command raises a ``CommandError``, intercept it and print it sensibly
        to stderr. If the ``--traceback`` option is present or the raised
        ``Exception`` is not ``CommandError``, raise it.
        """

        self._called_from_command_line = True
        parser = self.create_parser()

        options = parser.parse_args(argv)
        cmd_options = vars(options)
        # Move positional args out of options to mimic legacy optparse
        args = cmd_options.pop('args', ())
        try:
            self.execute(*args, **cmd_options)
        except Exception as e:
            if self._django_options.traceback or not isinstance(e, CommandError):
                raise

            # SystemCheckError takes care of its own formatting.
            if isinstance(e, SystemCheckError):
                self.stderr.write(str(e), lambda x: x)
            else:
                self.stderr.write('%s: %s' % (e.__class__.__name__, e))
            sys.exit(1)
        finally:
            try:
                connections.close_all()
            except ImproperlyConfigured:
                # Ignore if connections aren't setup at this point (e.g. no
                # configured settings).
                pass
api.py 文件源码 项目:kolibri 作者: learningequality 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _localimport(drive_id, channel_id, node_ids=None, update_progress=None, check_for_cancel=None):
    drives = get_mounted_drives_with_channel_info()
    drive = drives[drive_id]
    # copy channel's db file then copy all the content files from storage dir

    available_channel_ids = [c["id"] for c in drive.metadata["channels"]]
    assert channel_id in available_channel_ids, "The given channel was not found in the drive."

    try:
        call_command(
            "importchannel",
            "local",
            channel_id,
            drive.datafolder,
            update_progress=update_progress,
            check_for_cancel=check_for_cancel
        )
        call_command(
            "importcontent",
            "local",
            channel_id,
            drive.datafolder,
            node_ids=node_ids,
            update_progress=update_progress,
            check_for_cancel=check_for_cancel
        )
    except UserCancelledError:
        try:
            call_command("deletechannel", channel_id, update_progress=update_progress)
        except CommandError:
            pass
        raise
transtool_import.py 文件源码 项目:django-transtool 作者: liminspace 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
        if not TRANSTOOL_DL_URL or not TRANSTOOL_DL_KEY:
            raise CommandError('Please, set TRANSTOOL_DL_URL and TRANSTOOL_DL_KEY settings.')
        if options['mo_only'] and options['po_only']:
            raise CommandError('Use only --mo-only or --po-only but not both.')
        self.stdout.write('Download file: Send POST request to {}'.format(TRANSTOOL_DL_URL))
        r = requests.post(TRANSTOOL_DL_URL, {
            'key': TRANSTOOL_DL_KEY,
            'po-only': str(int(options['po_only'])),
            'mo-only': str(int(options['mo_only'])),
        }, stream=True)
        if r.status_code != 200:
            self.stdout.write('Request status code is not 200: {}'.format(r.status_code))
            self.stdout.write('Fail.', ending='\n\n')
            sys.exit(1)
        file_content = BytesIO()
        for chunk in r.iter_content(chunk_size=(16 * 1024)):
            file_content.write(chunk)
        file_content.seek(0, os.SEEK_END)
        file_content_size = file_content.tell()
        self.stdout.write('Downloaded file {} {} bytes'.format(r.headers['Content-Type'], file_content_size))
        if options['po_only']:
            exts = ['.po']
        elif options['mo_only']:
            exts = ['.mo']
        else:
            exts = ['.po', '.mo']
        diff_info = self._get_diff_info(file_content, exts)
        if options['diff']:
            self.print_diff_info(diff_info)
        else:
            self.copy_files(diff_info, file_content)
        self.stdout.write('Done.', ending='\n\n')
startapp.py 文件源码 项目:django-template 作者: DheerendraRathor 项目源码 文件源码 阅读 112 收藏 0 点赞 0 评论 0
def handle(self, **options):
        directory = options.get('directory', None)
        # directory key is already present in options which is None.
        name = options['name']

        full_path = None
        if not directory:
            """
            If directory is not provided then use DEFAULT_APPS_DIRECTORY.

            This block creates DEFAULT_APPS_DIRECTORY/app_name directory and
            pass that path to default startapp command.
            """
            directory = os.path.join(settings.BASE_DIR, settings.DEFAULT_APPS_DIRECTORY)
            full_path = os.path.join(directory, name)
            if not os.path.exists(full_path):
                os.makedirs(full_path)
            options['directory'] = full_path

        try:
            super().handle(**options)
            if full_path:
                """
                If apps directory is used then change the app name to DEFAULT_APPS_DIRECTORY.app_name
                in AppConfig
                """
                apps_py = os.path.join(full_path, 'apps.py')
                if os.path.isfile(apps_py):
                    for line in fileinput.input(apps_py, inplace=True):
                        line = line.replace("'%s'" % name, "'%s.%s'" % (settings.DEFAULT_APPS_DIRECTORY, name))
                        print(line, end='')
        except CommandError:
            if full_path:
                shutil.rmtree(full_path)
            raise


问题


面经


文章

微信
公众号

扫码关注公众号