python类call_command()的实例源码

test_management.py 文件源码 项目:DjangoCMS 作者: farhan711 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_copy_filled_placeholder(self):
        """
        If an existing title in the target language has plugins in a placeholder
        that placeholder is skipped
        """
        site = 1
        number_start_plugins = CMSPlugin.objects.all().count()

        # create an empty title language
        root_page = Page.objects.on_site(site).get_home()
        create_title("de", "root page de", root_page)
        ph = root_page.placeholders.get(slot="body")
        add_plugin(ph, "TextPlugin", "de", body="Hello World")

        out = StringIO()
        management.call_command(
            'cms', 'copy', 'lang', '--from-lang=en', '--to-lang=de', interactive=False, stdout=out
        )

        self.assertEqual(CMSPlugin.objects.filter(language='en').count(), number_start_plugins)
        # one placeholder (with 7 plugins) is skipped, so the difference must be 6
        self.assertEqual(CMSPlugin.objects.filter(language='de').count(), number_start_plugins-6)
test_runscheduler.py 文件源码 项目:tts-bug-bounty-dashboard 作者: 18F 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def call_runscheduler(loops=1, mock_call_command=None):
    ctx = {'sleep_count': 0}

    def fake_sleep(seconds):
        ctx['sleep_count'] += 1
        if ctx['sleep_count'] > loops:
            raise KeyboardInterrupt()

    if mock_call_command is None:
        mock_call_command = mock.MagicMock()

    with mock.patch.object(runscheduler, 'call_command', mock_call_command):
        with mock.patch.object(runscheduler, 'logger') as mock_logger:
            with mock.patch('time.sleep', fake_sleep):
                with pytest.raises(KeyboardInterrupt):
                    call_command('runscheduler')
            return mock_call_command, mock_logger
test_redirect.py 文件源码 项目:wagtail-bakery 作者: moorinteractive 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_page_has_redirect_response(site):
    redirect_page = RedirectPage.objects.create(
        depth=1,
        path='0002',
        title='Page',
        slug='page',
    )

    # Make redirect page the root page
    site.root_page = redirect_page
    site.save()

    # Build static files
    management.call_command('build', '--skip-static', '--skip-media')
    assert os.path.exists(os.path.join(settings.BUILD_DIR, 'index.html'))

    # Check if meta tag is present
    content = open(os.path.join(settings.BUILD_DIR, 'index.html')).read()
    assert '<meta http-equiv="refresh" content="1; url=http://www.example.com/">' in content # noqa
test_commands.py 文件源码 项目:django-gitversions 作者: michaelkuty 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_04_versioner(self):

        # remove local files
        #versioner.backend.destroy()

        management.call_command('gitversions',
                                format='json',
                                indent=4)

        # check local uncomited changes
        self.assertEqual(versioner.backend.check(), True)

        versioner.backend.commit('Initial Commit')

        self.assertEqual(versioner.backend.check(), False)

        # cleanup
setup.py 文件源码 项目:vaultier 作者: Movile 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
        print ">>> Initializing your database"
        try:
            management.call_command('syncdb')
            management.call_command('migrate')
            try:
                # do we need cache table?
                cache.get('', None)
            except ProgrammingError:
                # yes we do
                management.call_command('createcachetable', 'vaultier_cache')

            # public static files
            management.call_command('collectstatic', interactive=False)
        except OperationalError as e:
            msg = ">>> Your DB is not configured correctly: {}"
            print msg.format(e.message)
        else:
            if options.get('no_statistics'):
                task_statistics_collector.delay()
            print (">>> DB is initialized, you can now try to run Vaultier "
                   "using 'vaultier runserver'")
models.py 文件源码 项目:oim-cms 作者: parksandwildlife 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def user_logged_in_handler(sender, request, user, **kwargs):
    logging.debug('user_logged_in_handler')
    request.session.save()
    usersession, created = UserSession.objects.get_or_create(user=user, session_id=request.session.session_key)
    usersession.ip = get_ip(request)
    if DepartmentUser.objects.filter(email__iexact=user.email).exists():
        logging.debug('user_logged_in_handler departmentuser {}'.format(user.email))
        usersession.department_user = DepartmentUser.objects.filter(email__iexact=user.email)[0]
        if (user.username != usersession.department_user.username):
            test = get_user_model().objects.filter(username=usersession.department_user.username)
            if test.exists():
                test.delete()
            user.username = usersession.department_user.username
            user.save()
    usersession.save()
    logging.debug('user_logged_in_handler saving stuff')
    management.call_command("clearsessions", verbosity=0)
tests.py 文件源码 项目:tissuelab 作者: VirtualPlants 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_clearsessions_command(self):
        """
        Test clearsessions command for clearing expired sessions.
        """
        self.assertEqual(0, Session.objects.count())

        # One object in the future
        self.session['foo'] = 'bar'
        self.session.set_expiry(3600)
        self.session.save()

        # One object in the past
        other_session = self.backend()
        other_session['foo'] = 'bar'
        other_session.set_expiry(-3600)
        other_session.save()

        # Two sessions are in the database before clearsessions...
        self.assertEqual(2, Session.objects.count())
        management.call_command('clearsessions')
        # ... and one is deleted.
        self.assertEqual(1, Session.objects.count())
cron.py 文件源码 项目:openkamer 作者: openkamer 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def create_json_dump():
        filepath = os.path.join(settings.DBBACKUP_STORAGE_OPTIONS['location'], 'openkamer-' + str(datetime.date.today()) + '.json')
        filepath_compressed = filepath + '.gz'
        with open(filepath, 'w') as fileout:
            management.call_command(
                'dumpdata',
                '--all',
                '--natural-foreign',
                '--exclude', 'auth.permission',
                '--exclude', 'contenttypes',
                'person',
                'parliament',
                'government',
                'document',
                'stats',
                'website',
                stdout=fileout
            )
        with open(filepath, 'rb') as f_in:
            with gzip.open(filepath_compressed, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
        os.remove(filepath)
        BackupDaily.remove_old_json_dumps(days_old=30)
test_commands.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_createadmin_prompts_for_password_if_not_given(self):
        stderr = StringIO()
        stdout = StringIO()
        username = factory.make_name('user')
        password = factory.make_string()
        ssh_import = "%s:%s" % (
            random.choice([KEYS_PROTOCOL_TYPE.LP, KEYS_PROTOCOL_TYPE.GH]),
            factory.make_name('user-id'))
        email = factory.make_email_address()
        self.patch(createadmin, 'prompt_for_password').return_value = password
        self.patch(keysource_module.KeySource, 'import_keys')

        call_command(
            'createadmin', username=username, email=email,
            ssh_import=ssh_import, stdout=stdout, stderr=stderr)
        user = User.objects.get(username=username)

        self.assertThat(stderr, IsEmpty)
        self.assertThat(stdout, IsEmpty)
        self.assertTrue(user.check_password(password))
test_commands.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 79 收藏 0 点赞 0 评论 0
def test_createadmin_prompts_for_username_if_not_given(self):
        stderr = StringIO()
        stdout = StringIO()
        username = factory.make_name('user')
        password = factory.make_string()
        email = factory.make_email_address()
        ssh_import = "%s:%s" % (
            random.choice([KEYS_PROTOCOL_TYPE.LP, KEYS_PROTOCOL_TYPE.GH]),
            factory.make_name('user-id'))
        self.patch(createadmin, 'prompt_for_username').return_value = username
        self.patch(keysource_module.KeySource, 'import_keys')

        call_command(
            'createadmin', password=password, email=email,
            ssh_import=ssh_import, stdout=stdout, stderr=stderr)
        user = User.objects.get(username=username)

        self.assertThat(stderr, IsEmpty)
        self.assertThat(stdout, IsEmpty)
        self.assertTrue(user.check_password(password))
test_commands.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_createadmin_prompts_for_email_if_not_given(self):
        stderr = StringIO()
        stdout = StringIO()
        username = factory.make_name('user')
        password = factory.make_string()
        email = factory.make_email_address()
        ssh_import = "%s:%s" % (
            random.choice([KEYS_PROTOCOL_TYPE.LP, KEYS_PROTOCOL_TYPE.GH]),
            factory.make_name('user-id'))
        self.patch(createadmin, 'prompt_for_email').return_value = email
        self.patch(keysource_module.KeySource, 'import_keys')

        call_command(
            'createadmin', username=username, password=password,
            ssh_import=ssh_import, stdout=stdout, stderr=stderr)
        user = User.objects.get(username=username)

        self.assertThat(stderr, IsEmpty)
        self.assertThat(stdout, IsEmpty)
        self.assertTrue(user.check_password(password))
test_commands.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_createadmin_prompts_for_ssh_import_if_not_given(self):
        stderr = StringIO()
        stdout = StringIO()
        username = factory.make_name('user')
        password = factory.make_string()
        email = factory.make_email_address()
        ssh_import = "%s:%s" % (
            random.choice([KEYS_PROTOCOL_TYPE.LP, KEYS_PROTOCOL_TYPE.GH]),
            factory.make_name('user-id'))
        self.patch(
            createadmin, 'prompt_for_ssh_import').return_value = ssh_import
        self.patch(keysource_module.KeySource, 'import_keys')

        call_command(
            'createadmin', username=username, password=password, email=email,
            stdout=stdout, stderr=stderr)
        user = User.objects.get(username=username)

        self.assertThat(stderr, IsEmpty)
        self.assertThat(stdout, IsEmpty)
        self.assertTrue(user.check_password(password))
test_commands.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_apikey_generates_key(self):
        stderr = StringIO()
        stdout = StringIO()
        user = factory.make_User()
        num_keys = len(user.userprofile.get_authorisation_tokens())
        call_command(
            'apikey', username=user.username, generate=True, stderr=stderr,
            stdout=stdout)
        self.assertThat(stderr, IsEmpty)
        keys_after = user.userprofile.get_authorisation_tokens()
        expected_num_keys = num_keys + 1
        self.assertEqual(expected_num_keys, len(keys_after))
        expected_token = user.userprofile.get_authorisation_tokens()[1]
        expected_string = convert_tuple_to_string(
            get_creds_tuple(expected_token)) + '\n'
        self.assertEqual(expected_string, stdout.getvalue())
test_commands.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_api_key_rejects_deletion_of_nonexistent_key(self):
        stderr = StringIO()
        user = factory.make_User()
        existing_token = get_one(
            user.userprofile.get_authorisation_tokens())
        token_string = convert_tuple_to_string(
            get_creds_tuple(existing_token))
        call_command(
            'apikey', username=user.username, delete=token_string,
            stderr=stderr)
        self.assertThat(stderr, IsEmpty)

        # Delete it again. Check that there's a sensible rejection.
        error_text = assertCommandErrors(
            self, 'apikey', username=user.username, delete=token_string)
        self.assertIn(
            "No matching api key found", error_text)
test_commands.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_api_key_rejects_update_of_nonexistent_key(self):
        stderr = StringIO()
        user = factory.make_User()
        fake_api_key_name = "Test Key Name"
        existing_token = get_one(
            user.userprofile.get_authorisation_tokens())
        token_string = convert_tuple_to_string(
            get_creds_tuple(existing_token))
        call_command(
            'apikey', username=user.username, delete=token_string,
            stderr=stderr)
        self.assertThat(stderr, IsEmpty)

        # Try to update the deleted token.
        error_text = assertCommandErrors(
            self, 'apikey', username=user.username, update=token_string,
            api_key_name=fake_api_key_name)
        self.assertIn(
            "No matching api key found", error_text)
test_commands_dbupgrade.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_django_run_renames_piston_tables_if_piston_tables_exists(self):
        self.patch(
            dbupgrade_command, "_south_was_performed").return_value = True
        self.patch(dbupgrade_command, "_find_tables").return_value = [
            "piston_consumer",
            "piston_token",
        ]
        mock_rename = self.patch(
            dbupgrade_command, "_rename_piston_to_piston3")
        mock_call = self.patch(dbupgrade_module, "call_command")
        call_command('dbupgrade', django=True)
        self.assertThat(
            mock_rename, MockCalledOnceWith("default", ["consumer", "token"]))
        self.assertThat(
            mock_call, MockCalledOnceWith(
                "migrate", interactive=False, fake_initial=True))
test_commands.py 文件源码 项目:django-onmydesk 作者: knowledge4life 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_call_must_have_first_and_last_message_correct(self):
        scheduler = Scheduler(report='my_report_class',
                              periodicity=Scheduler.PER_MON_SUN)
        scheduler.save()

        out = StringIO()
        management.call_command('scheduler_process', stdout=out)

        first_line, last_line, blank_line = (
            out.getvalue().split('\n')[0],
            out.getvalue().split('\n')[-2],
            out.getvalue().split('\n')[-1]
        )

        first_message = 'Starting scheduler process'
        last_message = 'Scheduler #{} processed'.format(scheduler.id)

        self.assertIn(first_message, first_line)
        self.assertIn(last_message, last_line)
        self.assertEqual(blank_line, '')
test_sameas.py 文件源码 项目:balafon 作者: ljean 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_find_same_as_with_group(self):

        contact1 = mommy.make(models.Contact, firstname="John", lastname="Lennon")
        contact2 = mommy.make(models.Contact, firstname="Paul", lastname="McCartney")
        contact3 = mommy.make(models.Contact, firstname="Paul", lastname="McCartney")

        buf = StringIO()
        sysout = sys.stdout
        sys.stdout = buf
        management.call_command('find_same_as', "SameAs", verbosity=0, interactive=False, stdout=buf)
        buf.seek(0, 0)
        sys.stdout = sysout
        self.assertEqual(2, len(buf.readlines()))
        qs = models.Group.objects.filter(name="SameAs")
        self.assertEqual(1, qs.count())
        self.assertEqual(qs[0].contacts.count(), 2)
        self.assertFalse(contact1 in qs[0].contacts.all())
        self.assertTrue(contact2 in qs[0].contacts.all())
        self.assertTrue(contact3 in qs[0].contacts.all())
test_sameas.py 文件源码 项目:balafon 作者: ljean 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_find_same_as_with_existing_group(self):

        contact1 = mommy.make(models.Contact, firstname="John", lastname="Lennon")
        contact2 = mommy.make(models.Contact, firstname="Paul", lastname="McCartney")
        contact3 = mommy.make(models.Contact, firstname="Paul", lastname="McCartney")

        gr = models.Group.objects.create(name="SameAs")
        gr.contacts.add(contact1)
        gr.save()

        buf = StringIO()
        sysout = sys.stdout
        sys.stdout = buf
        management.call_command('find_same_as', "SameAs", verbosity=0, interactive=False, stdout=buf)
        buf.seek(0, 0)
        sys.stdout = sysout
        self.assertEqual(2, len(buf.readlines()))

        qs = models.Group.objects.filter(name="SameAs")
        self.assertEqual(1, qs.count())
        self.assertEqual(qs[0].contacts.count(), 3)
        self.assertTrue(contact1 in qs[0].contacts.all())
        self.assertTrue(contact2 in qs[0].contacts.all())
        self.assertTrue(contact3 in qs[0].contacts.all())
test_sameas.py 文件源码 项目:balafon 作者: ljean 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_find_same_as_with_no_name(self):

        contact1 = mommy.make(models.Contact, firstname="John", lastname="Lennon")
        contact2 = mommy.make(models.Contact, firstname="Paul", lastname="McCartney")
        contact3 = mommy.make(models.Contact, firstname="Paul", lastname="McCartney")
        contact4 = mommy.make(models.Contact, firstname="", lastname="")
        contact5 = mommy.make(models.Contact, firstname="", lastname="")

        buf = StringIO()
        sysout = sys.stdout
        sys.stdout = buf
        management.call_command('find_same_as', "SameAs", verbosity=0, interactive=False, stdout=buf)
        buf.seek(0, 0)
        sys.stdout = sysout
        self.assertEqual(2, len(buf.readlines()))

        qs = models.Group.objects.filter(name="SameAs")
        self.assertEqual(1, qs.count())
        self.assertEqual(qs[0].contacts.count(), 2)
        self.assertFalse(contact1 in qs[0].contacts.all())
        self.assertTrue(contact2 in qs[0].contacts.all())
        self.assertTrue(contact3 in qs[0].contacts.all())
        self.assertFalse(contact4 in qs[0].contacts.all())
        self.assertFalse(contact5 in qs[0].contacts.all())
0016_populate_content_rendered.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def forward(apps, schema_editor):
    # Removed as it's causing exceptions and is not needed except for old nodes
    # which should have updated by now
    # call_command("runscript", "0016_populate_content_rendered")
    pass
test_commands.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_startdash_usage_empty(self):
        self.assertRaises(CommandError, call_command, 'startdash')
test_commands.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_startdash_usage_correct(self, handle):
        call_command('startdash', 'test_dash')

        handle.assert_called_with(dash_name='test_dash',
                                  extensions=["py", "tmpl", "html", "js",
                                              "css"],
                                  files=[], no_color=False, pythonpath=None,
                                  settings=None, skip_checks=True, target=None,
                                  template=None, traceback=False, verbosity=1)
test_commands.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_startpanel_usage_empty(self):
        self.assertRaises(CommandError, call_command, 'startpanel')
test_commands.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_startpanel_usage_correct(self, handle):
        call_command('startpanel', 'test_dash', '--dashboard=foo.bar')

        handle.assert_called_with(panel_name='test_dash', dashboard='foo.bar',
                                  extensions=["py", "tmpl", "html"],
                                  files=[], no_color=False, pythonpath=None,
                                  settings=None, skip_checks=True, target=None,
                                  template=None, traceback=False, verbosity=1)
test_import.py 文件源码 项目:offenewahlen-nrw17 作者: OKFNat 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def setUp(self):
        """
        Set up the test class.

        For the import to run an election has to exist in the
        database.
        """

        call_command('import_basedata')
        test_path = setup_path = os.path.dirname(os.path.realpath(__file__))
        results_file = test_path + '/data/example_01.json'
        mapping_file = test_path + '/data/example_config.json'
        call_command('import_results', results_file, mapping_file)
test_import.py 文件源码 项目:offenewahlen-nrw17 作者: OKFNat 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_import_json_result_data_with_mapping(self):
        """
        Tests an json import of results.
        """

        number_of_results = RawData.objects.count()
        self.assertEqual(number_of_results, 1)

        number_of_results = PollingStationResult.objects.count()
        self.assertEqual(number_of_results, 4)

        # call_command('import_results', local_data_file, location='local',
        #   file_type='json', mapping_file=mapping_file)
        # number_of_results = RawData.objects.count()
        # self.assertEqual(number_of_results, 2)

    #def test_import_base_data(self):
    #
    #   number_of_results = RawData.objects.count()
    #   self.assertEqual()

    # def test_import_xml_result_data_with_mapping(self):
    #   """
    #   Tests an xml import.
    #   """
    #   test_path = os.path.dirname(os.path.realpath(__file__))
    #   local_data_file = test_path + '/data/example_01.xml'
    #   mapping_file = test_path + '/data/example_mapping.json'

    #   call_command('import_results', local_data_file, location='local',
    #       file_type='xml', mapping_file=mapping_file)
    #   number_of_results = RawData.objects.count()
    #   self.assertEqual(number_of_results, 1)

    #   call_command('import_results', local_data_file, location='local',
    #       file_type='xml', mapping_file=mapping_file)
    #   number_of_results = RawData.objects.count()
    #   self.assertEqual(number_of_results, 2)
api.py 文件源码 项目:Dwarf 作者: Dwarf-Community 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def sync_database():
        management.call_command('makemigrations', 'dwarf')
        management.call_command('migrate', 'dwarf')
models.py 文件源码 项目:mccelections 作者: mcclatchy 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def calculate_vote(sender, instance, **kwargs):
    ## don't fire if it's a gdoc result
    if instance.gdoc_import != True:
        electiondate_arg = str(instance.electiondate)
        call_command('calculate_vote', electiondate_arg)
        ## needs to be written <-- not doing this bc it would mess with manual results w/ multiple winners, winners that req 2/3, etc
        # call_command('declare_winner')


## NEEDS TO STAY models.model so PSQL import works
election_auto_w_gdoc.py 文件源码 项目:mccelections 作者: mcclatchy 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def snapshot(electiondate_string):
    ## *** need to have this on S3 or other external location to not fill up server ***
        ## NOTE: instead of saving anew, just use/copy the tmp file?
        ## should we snapshot manual results from gdoc? use separate parent dir to avoid conflict
    if mccelectionsenv == "local":
        # call_command('snapshot_results')
        ## copy and rename results.csv w/ cp command
        file_path = os.environ["SAVER_PATH"]
        orgin = file_path + "/tmp/results.csv"
        now = timezone.localtime(timezone.now())
        save_date = now.date()
        save_date_string = str(save_date)
        timestamp = now.strftime('%Y-%m-%d_%H-%M-%S')

        snapshot_filename = "results%s.csv" % (timestamp)
        destination_dir = "%s/%s/%s" % (file_path, electiondate_string, save_date_string)
        destination = "%s/%s" % (destination_dir, snapshot_filename)

        mkdir = "mkdir -p %s" % (destination_dir)
        snapshot = "cp %s %s" % (origin, destination)

        ## making the dir, if it's not there
        call(mkdir, shell=True)
        message = "Making new directory, if needed:\n%s" % (destination_dir)
        slackbot(message)
        ## actual snapshot executed
        call(snapshot, shell=True)
        message = "Snapshotting"
        slackbot(message)
    # else:
        # snapshot to S3


问题


面经


文章

微信
公众号

扫码关注公众号