def test_copy_filled_placeholder(self):
    """A target-language placeholder that already holds plugins is skipped.

    A German plugin is added to the 'body' placeholder before running the
    copy, so the command must leave that placeholder alone: the German
    plugin count ends up 6 short of the English count (the skipped
    placeholder carries 7 plugins, one German plugin was pre-added).
    """
    site = 1
    initial_plugin_count = CMSPlugin.objects.all().count()
    home = Page.objects.on_site(site).get_home()
    # Give the home page a German title so the target language exists.
    create_title("de", "root page de", home)
    body_placeholder = home.placeholders.get(slot="body")
    # Pre-fill the German placeholder so the copy command skips it.
    add_plugin(body_placeholder, "TextPlugin", "de", body="Hello World")
    output = StringIO()
    management.call_command(
        'cms', 'copy', 'lang', '--from-lang=en', '--to-lang=de',
        interactive=False, stdout=output
    )
    # English plugins are untouched by the copy.
    self.assertEqual(CMSPlugin.objects.filter(language='en').count(), initial_plugin_count)
    # one placeholder (with 7 plugins) is skipped, so the difference must be 6
    self.assertEqual(CMSPlugin.objects.filter(language='de').count(), initial_plugin_count - 6)
# Python examples of call_command() usage (translated scrape-site section header)
def call_runscheduler(loops=1, mock_call_command=None):
    """Run the 'runscheduler' command under mocks for *loops* iterations.

    ``time.sleep`` is replaced by a counter that raises KeyboardInterrupt
    after *loops* sleeps, which is how the otherwise endless scheduler loop
    is stopped. Returns the (call_command mock, logger mock) pair so the
    caller can make assertions against them.
    """
    state = {'sleep_count': 0}

    def fake_sleep(_seconds):
        state['sleep_count'] += 1
        if state['sleep_count'] > loops:
            raise KeyboardInterrupt()

    if mock_call_command is None:
        mock_call_command = mock.MagicMock()
    with mock.patch.object(runscheduler, 'call_command', mock_call_command), \
            mock.patch.object(runscheduler, 'logger') as mock_logger, \
            mock.patch('time.sleep', fake_sleep):
        with pytest.raises(KeyboardInterrupt):
            call_command('runscheduler')
    return mock_call_command, mock_logger
def test_page_has_redirect_response(site):
    """A redirect page used as site root is baked out as an index.html
    containing a client-side meta-refresh redirect.
    """
    redirect_page = RedirectPage.objects.create(
        depth=1,
        path='0002',
        title='Page',
        slug='page',
    )
    # Make redirect page the root page
    site.root_page = redirect_page
    site.save()
    # Build static files
    management.call_command('build', '--skip-static', '--skip-media')
    index_path = os.path.join(settings.BUILD_DIR, 'index.html')
    assert os.path.exists(index_path)
    # FIX: the original leaked the file handle via open(...).read();
    # use a context manager so it is closed deterministically.
    with open(index_path) as built_index:
        content = built_index.read()
    # Check if meta tag is present
    assert '<meta http-equiv="refresh" content="1; url=http://www.example.com/">' in content  # noqa
def test_04_versioner(self):
    """Dump versions via 'gitversions', then verify that committing clears
    the dirty-working-tree state reported by the backend."""
    management.call_command('gitversions', format='json', indent=4)
    # The export leaves uncommitted local changes behind...
    self.assertEqual(versioner.backend.check(), True)
    versioner.backend.commit('Initial Commit')
    # ...which committing clears.
    self.assertEqual(versioner.backend.check(), False)
def handle(self, *args, **options):
    # Management-command entry point: bootstrap the database for a fresh
    # Vaultier install (schema sync, migrations, cache table, static files).
    # NOTE(review): uses Python 2 print statements — this module is py2-only.
    print ">>> Initializing your database"
    try:
        management.call_command('syncdb')
        management.call_command('migrate')
        try:
            # do we need cache table?
            cache.get('', None)
        except ProgrammingError:
            # yes we do
            management.call_command('createcachetable', 'vaultier_cache')
        # public static files
        management.call_command('collectstatic', interactive=False)
    except OperationalError as e:
        msg = ">>> Your DB is not configured correctly: {}"
        print msg.format(e.message)
    else:
        # Success path: optionally kick off the statistics task.
        # NOTE(review): a truthy 'no_statistics' option *triggers* the
        # collector, which looks inverted — confirm against the command's
        # argument definition before relying on this flag.
        if options.get('no_statistics'):
            task_statistics_collector.delay()
        print (">>> DB is initialized, you can now try to run Vaultier "
               "using 'vaultier runserver'")
def user_logged_in_handler(sender, request, user, **kwargs):
    """user_logged_in signal receiver: record the session in a UserSession
    row and, when the user matches a DepartmentUser by email, align the
    Django username with the department record.
    """
    logging.debug('user_logged_in_handler')
    # Ensure the session has a key before using it as the lookup value.
    request.session.save()
    usersession, created = UserSession.objects.get_or_create(user=user, session_id=request.session.session_key)
    usersession.ip = get_ip(request)
    if DepartmentUser.objects.filter(email__iexact=user.email).exists():
        logging.debug('user_logged_in_handler departmentuser {}'.format(user.email))
        usersession.department_user = DepartmentUser.objects.filter(email__iexact=user.email)[0]
        if (user.username != usersession.department_user.username):
            # A stale account may already hold the canonical username;
            # delete it first so renaming this user cannot hit a unique
            # constraint clash.
            test = get_user_model().objects.filter(username=usersession.department_user.username)
            if test.exists():
                test.delete()
            user.username = usersession.department_user.username
            user.save()
    usersession.save()
    logging.debug('user_logged_in_handler saving stuff')
    # Opportunistically purge expired sessions on every login.
    management.call_command("clearsessions", verbosity=0)
def test_clearsessions_command(self):
    """clearsessions removes expired sessions and keeps live ones."""
    self.assertEqual(0, Session.objects.count())
    # A session expiring an hour from now — must survive the purge.
    self.session['foo'] = 'bar'
    self.session.set_expiry(3600)
    self.session.save()
    # A session that expired an hour ago — must be removed.
    expired_session = self.backend()
    expired_session['foo'] = 'bar'
    expired_session.set_expiry(-3600)
    expired_session.save()
    # Both sessions exist before the command runs...
    self.assertEqual(2, Session.objects.count())
    management.call_command('clearsessions')
    # ...and only the live one remains afterwards.
    self.assertEqual(1, Session.objects.count())
def create_json_dump():
    """Dump selected apps to a dated JSON file, gzip it, delete the raw
    file, and prune dumps older than 30 days."""
    backup_dir = settings.DBBACKUP_STORAGE_OPTIONS['location']
    dump_name = 'openkamer-' + str(datetime.date.today()) + '.json'
    filepath = os.path.join(backup_dir, dump_name)
    filepath_compressed = filepath + '.gz'
    with open(filepath, 'w') as fileout:
        management.call_command(
            'dumpdata',
            '--all',
            '--natural-foreign',
            '--exclude', 'auth.permission',
            '--exclude', 'contenttypes',
            'person',
            'parliament',
            'government',
            'document',
            'stats',
            'website',
            stdout=fileout
        )
    # Compress the raw dump, then keep only the .gz copy on disk.
    with open(filepath, 'rb') as f_in, gzip.open(filepath_compressed, 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)
    os.remove(filepath)
    BackupDaily.remove_old_json_dumps(days_old=30)
def test_createadmin_prompts_for_password_if_not_given(self):
    """Omitting --password makes createadmin prompt for one interactively."""
    stderr, stdout = StringIO(), StringIO()
    username = factory.make_name('user')
    password = factory.make_string()
    email = factory.make_email_address()
    protocol = random.choice([KEYS_PROTOCOL_TYPE.LP, KEYS_PROTOCOL_TYPE.GH])
    ssh_import = "%s:%s" % (protocol, factory.make_name('user-id'))
    # Feed the password through the (patched) prompt and disable real
    # SSH key imports.
    self.patch(createadmin, 'prompt_for_password').return_value = password
    self.patch(keysource_module.KeySource, 'import_keys')
    call_command(
        'createadmin', username=username, email=email,
        ssh_import=ssh_import, stdout=stdout, stderr=stderr)
    created_user = User.objects.get(username=username)
    # The command stays silent and the prompted password is set.
    self.assertThat(stderr, IsEmpty)
    self.assertThat(stdout, IsEmpty)
    self.assertTrue(created_user.check_password(password))
def test_createadmin_prompts_for_username_if_not_given(self):
    """When no username is passed, createadmin asks for one interactively.

    prompt_for_username is patched to supply the name; the command must
    then create that user with the given password, writing nothing to
    stdout or stderr.
    """
    stderr = StringIO()
    stdout = StringIO()
    username = factory.make_name('user')
    password = factory.make_string()
    email = factory.make_email_address()
    ssh_import = "%s:%s" % (
        random.choice([KEYS_PROTOCOL_TYPE.LP, KEYS_PROTOCOL_TYPE.GH]),
        factory.make_name('user-id'))
    # Simulate the interactive prompt and neutralise real key imports.
    self.patch(createadmin, 'prompt_for_username').return_value = username
    self.patch(keysource_module.KeySource, 'import_keys')
    call_command(
        'createadmin', password=password, email=email,
        ssh_import=ssh_import, stdout=stdout, stderr=stderr)
    user = User.objects.get(username=username)
    self.assertThat(stderr, IsEmpty)
    self.assertThat(stdout, IsEmpty)
    self.assertTrue(user.check_password(password))
def test_createadmin_prompts_for_email_if_not_given(self):
    """When no email is passed, createadmin asks for one interactively.

    prompt_for_email is patched to supply the address; the command must
    still create the user with the given password, writing nothing to
    stdout or stderr.
    """
    stderr = StringIO()
    stdout = StringIO()
    username = factory.make_name('user')
    password = factory.make_string()
    email = factory.make_email_address()
    ssh_import = "%s:%s" % (
        random.choice([KEYS_PROTOCOL_TYPE.LP, KEYS_PROTOCOL_TYPE.GH]),
        factory.make_name('user-id'))
    # Simulate the interactive prompt and neutralise real key imports.
    self.patch(createadmin, 'prompt_for_email').return_value = email
    self.patch(keysource_module.KeySource, 'import_keys')
    call_command(
        'createadmin', username=username, password=password,
        ssh_import=ssh_import, stdout=stdout, stderr=stderr)
    user = User.objects.get(username=username)
    self.assertThat(stderr, IsEmpty)
    self.assertThat(stdout, IsEmpty)
    self.assertTrue(user.check_password(password))
def test_createadmin_prompts_for_ssh_import_if_not_given(self):
    """When no --ssh-import is passed, createadmin prompts for one.

    prompt_for_ssh_import is patched to supply the "<protocol>:<user-id>"
    string; KeySource.import_keys is patched so no real import happens.
    """
    stderr = StringIO()
    stdout = StringIO()
    username = factory.make_name('user')
    password = factory.make_string()
    email = factory.make_email_address()
    ssh_import = "%s:%s" % (
        random.choice([KEYS_PROTOCOL_TYPE.LP, KEYS_PROTOCOL_TYPE.GH]),
        factory.make_name('user-id'))
    # Simulate the interactive prompt and neutralise real key imports.
    self.patch(
        createadmin, 'prompt_for_ssh_import').return_value = ssh_import
    self.patch(keysource_module.KeySource, 'import_keys')
    call_command(
        'createadmin', username=username, password=password, email=email,
        stdout=stdout, stderr=stderr)
    user = User.objects.get(username=username)
    self.assertThat(stderr, IsEmpty)
    self.assertThat(stdout, IsEmpty)
    self.assertTrue(user.check_password(password))
def test_apikey_generates_key(self):
    """'apikey --generate' adds exactly one token and prints its
    credential string (plus trailing newline) on stdout."""
    stderr = StringIO()
    stdout = StringIO()
    user = factory.make_User()
    num_keys = len(user.userprofile.get_authorisation_tokens())
    call_command(
        'apikey', username=user.username, generate=True, stderr=stderr,
        stdout=stdout)
    self.assertThat(stderr, IsEmpty)
    keys_after = user.userprofile.get_authorisation_tokens()
    expected_num_keys = num_keys + 1
    self.assertEqual(expected_num_keys, len(keys_after))
    # NOTE(review): index [1] assumes the user starts with exactly one
    # token, so the newly generated one is the second entry — confirm
    # against factory.make_User; keys_after[-1] would be more robust.
    expected_token = user.userprofile.get_authorisation_tokens()[1]
    expected_string = convert_tuple_to_string(
        get_creds_tuple(expected_token)) + '\n'
    self.assertEqual(expected_string, stdout.getvalue())
def test_api_key_rejects_deletion_of_nonexistent_key(self):
    """Deleting the same API key twice yields a clear error the second
    time around."""
    stderr = StringIO()
    user = factory.make_User()
    token = get_one(
        user.userprofile.get_authorisation_tokens())
    token_string = convert_tuple_to_string(
        get_creds_tuple(token))
    # First deletion succeeds quietly.
    call_command(
        'apikey', username=user.username, delete=token_string,
        stderr=stderr)
    self.assertThat(stderr, IsEmpty)
    # Second deletion must be rejected with a sensible message.
    error_text = assertCommandErrors(
        self, 'apikey', username=user.username, delete=token_string)
    self.assertIn("No matching api key found", error_text)
def test_api_key_rejects_update_of_nonexistent_key(self):
    """Updating a token that was just deleted must produce a clear error."""
    stderr = StringIO()
    user = factory.make_User()
    fake_api_key_name = "Test Key Name"
    existing_token = get_one(
        user.userprofile.get_authorisation_tokens())
    token_string = convert_tuple_to_string(
        get_creds_tuple(existing_token))
    # Remove the user's only token first, verifying the deletion is quiet.
    call_command(
        'apikey', username=user.username, delete=token_string,
        stderr=stderr)
    self.assertThat(stderr, IsEmpty)
    # Try to update the deleted token.
    error_text = assertCommandErrors(
        self, 'apikey', username=user.username, update=token_string,
        api_key_name=fake_api_key_name)
    self.assertIn(
        "No matching api key found", error_text)
def test_django_run_renames_piston_tables_if_piston_tables_exists(self):
    """dbupgrade must rename legacy piston_* tables to piston3 before
    running Django migrations (with fake_initial for the renamed tables).
    """
    # Pretend a South (pre-Django-migrations) upgrade already happened.
    self.patch(
        dbupgrade_command, "_south_was_performed").return_value = True
    # Report legacy piston tables as present in the database.
    self.patch(dbupgrade_command, "_find_tables").return_value = [
        "piston_consumer",
        "piston_token",
    ]
    mock_rename = self.patch(
        dbupgrade_command, "_rename_piston_to_piston3")
    mock_call = self.patch(dbupgrade_module, "call_command")
    call_command('dbupgrade', django=True)
    # The table suffixes (without the 'piston_' prefix) are renamed on
    # the 'default' database alias.
    self.assertThat(
        mock_rename, MockCalledOnceWith("default", ["consumer", "token"]))
    self.assertThat(
        mock_call, MockCalledOnceWith(
            "migrate", interactive=False, fake_initial=True))
def test_call_must_have_first_and_last_message_correct(self):
    """scheduler_process announces its start on the first output line and
    the processed scheduler on the last non-blank line."""
    scheduler = Scheduler(report='my_report_class',
                          periodicity=Scheduler.PER_MON_SUN)
    scheduler.save()
    out = StringIO()
    management.call_command('scheduler_process', stdout=out)
    # Output ends with '\n', so the final split element is an empty string.
    lines = out.getvalue().split('\n')
    first_line = lines[0]
    last_line = lines[-2]
    blank_line = lines[-1]
    self.assertIn('Starting scheduler process', first_line)
    self.assertIn('Scheduler #{} processed'.format(scheduler.id), last_line)
    self.assertEqual(blank_line, '')
def test_find_same_as_with_group(self):
    """find_same_as collects duplicate contacts into a new group."""
    john = mommy.make(models.Contact, firstname="John", lastname="Lennon")
    paul_a = mommy.make(models.Contact, firstname="Paul", lastname="McCartney")
    paul_b = mommy.make(models.Contact, firstname="Paul", lastname="McCartney")
    buf = StringIO()
    sysout = sys.stdout
    sys.stdout = buf
    management.call_command('find_same_as', "SameAs", verbosity=0, interactive=False, stdout=buf)
    buf.seek(0, 0)
    sys.stdout = sysout
    # Two lines of output are produced for the duplicate pair.
    self.assertEqual(2, len(buf.readlines()))
    qs = models.Group.objects.filter(name="SameAs")
    self.assertEqual(1, qs.count())
    group = qs[0]
    # Only the duplicated "Paul McCartney" contacts are grouped.
    self.assertEqual(group.contacts.count(), 2)
    self.assertFalse(john in group.contacts.all())
    self.assertTrue(paul_a in group.contacts.all())
    self.assertTrue(paul_b in group.contacts.all())
def test_find_same_as_with_existing_group(self):
    """find_same_as must add duplicate contacts to an existing group
    without removing members that are already in it."""
    contact1 = mommy.make(models.Contact, firstname="John", lastname="Lennon")
    contact2 = mommy.make(models.Contact, firstname="Paul", lastname="McCartney")
    contact3 = mommy.make(models.Contact, firstname="Paul", lastname="McCartney")
    # The target group pre-exists and already contains a non-duplicate.
    gr = models.Group.objects.create(name="SameAs")
    gr.contacts.add(contact1)
    gr.save()
    buf = StringIO()
    sysout = sys.stdout
    # presumably the command writes to sys.stdout directly, so the stream
    # is swapped in addition to passing stdout=buf — TODO confirm
    sys.stdout = buf
    management.call_command('find_same_as', "SameAs", verbosity=0, interactive=False, stdout=buf)
    buf.seek(0, 0)
    sys.stdout = sysout
    self.assertEqual(2, len(buf.readlines()))
    qs = models.Group.objects.filter(name="SameAs")
    self.assertEqual(1, qs.count())
    # Existing member kept, both duplicates added.
    self.assertEqual(qs[0].contacts.count(), 3)
    self.assertTrue(contact1 in qs[0].contacts.all())
    self.assertTrue(contact2 in qs[0].contacts.all())
    self.assertTrue(contact3 in qs[0].contacts.all())
def test_find_same_as_with_no_name(self):
    """Contacts with empty first and last names must not be treated as
    duplicates of each other by find_same_as."""
    contact1 = mommy.make(models.Contact, firstname="John", lastname="Lennon")
    contact2 = mommy.make(models.Contact, firstname="Paul", lastname="McCartney")
    contact3 = mommy.make(models.Contact, firstname="Paul", lastname="McCartney")
    # Two contacts with no name at all — they must be ignored.
    contact4 = mommy.make(models.Contact, firstname="", lastname="")
    contact5 = mommy.make(models.Contact, firstname="", lastname="")
    buf = StringIO()
    sysout = sys.stdout
    sys.stdout = buf
    management.call_command('find_same_as', "SameAs", verbosity=0, interactive=False, stdout=buf)
    buf.seek(0, 0)
    sys.stdout = sysout
    self.assertEqual(2, len(buf.readlines()))
    qs = models.Group.objects.filter(name="SameAs")
    self.assertEqual(1, qs.count())
    # Only the named duplicates are grouped; the nameless pair is excluded.
    self.assertEqual(qs[0].contacts.count(), 2)
    self.assertFalse(contact1 in qs[0].contacts.all())
    self.assertTrue(contact2 in qs[0].contacts.all())
    self.assertTrue(contact3 in qs[0].contacts.all())
    self.assertFalse(contact4 in qs[0].contacts.all())
    self.assertFalse(contact5 in qs[0].contacts.all())
def forward(apps, schema_editor):
    """No-op forward migration step.

    The original data script ("0016_populate_content_rendered") was
    removed because it raised exceptions and is only relevant to old
    nodes that should have updated by now.
    """
    return None
def test_startdash_usage_empty(self):
    """'startdash' with no dash name must raise CommandError."""
    with self.assertRaises(CommandError):
        call_command('startdash')
def test_startdash_usage_correct(self, handle):
    """'startdash <name>' forwards the expected defaults to handle()."""
    call_command('startdash', 'test_dash')
    expected_kwargs = dict(
        dash_name='test_dash',
        extensions=["py", "tmpl", "html", "js", "css"],
        files=[],
        no_color=False,
        pythonpath=None,
        settings=None,
        skip_checks=True,
        target=None,
        template=None,
        traceback=False,
        verbosity=1,
    )
    handle.assert_called_with(**expected_kwargs)
def test_startpanel_usage_empty(self):
    """'startpanel' with no panel name must raise CommandError."""
    with self.assertRaises(CommandError):
        call_command('startpanel')
def test_startpanel_usage_correct(self, handle):
    """'startpanel <name> --dashboard=...' forwards the expected defaults
    to handle()."""
    call_command('startpanel', 'test_dash', '--dashboard=foo.bar')
    expected_kwargs = dict(
        panel_name='test_dash',
        dashboard='foo.bar',
        extensions=["py", "tmpl", "html"],
        files=[],
        no_color=False,
        pythonpath=None,
        settings=None,
        skip_checks=True,
        target=None,
        template=None,
        traceback=False,
        verbosity=1,
    )
    handle.assert_called_with(**expected_kwargs)
def setUp(self):
    """Prepare fixtures: base data plus one imported result set.

    For the result import to run, an election has to exist in the
    database, so 'import_basedata' is executed first.
    """
    call_command('import_basedata')
    # FIX: dropped the dead 'setup_path' alias and switched the path
    # arithmetic to os.path.join.
    test_path = os.path.dirname(os.path.realpath(__file__))
    results_file = os.path.join(test_path, 'data', 'example_01.json')
    mapping_file = os.path.join(test_path, 'data', 'example_config.json')
    call_command('import_results', results_file, mapping_file)
def test_import_json_result_data_with_mapping(self):
    """The JSON import from setUp yields one RawData row and four
    polling-station results.

    (A commented-out re-import scenario asserting RawData grows to 2 was
    removed; restore it from history if re-import support lands.)
    """
    number_of_results = RawData.objects.count()
    self.assertEqual(number_of_results, 1)
    number_of_results = PollingStationResult.objects.count()
    self.assertEqual(number_of_results, 4)
#def test_import_base_data(self):
#
# number_of_results = RawData.objects.count()
# self.assertEqual()
# def test_import_xml_result_data_with_mapping(self):
# """
# Tests an xml import.
# """
# test_path = os.path.dirname(os.path.realpath(__file__))
# local_data_file = test_path + '/data/example_01.xml'
# mapping_file = test_path + '/data/example_mapping.json'
# call_command('import_results', local_data_file, location='local',
# file_type='xml', mapping_file=mapping_file)
# number_of_results = RawData.objects.count()
# self.assertEqual(number_of_results, 1)
# call_command('import_results', local_data_file, location='local',
# file_type='xml', mapping_file=mapping_file)
# number_of_results = RawData.objects.count()
# self.assertEqual(number_of_results, 2)
def sync_database():
    """Create any pending migrations for the 'dwarf' app, then apply them."""
    for command_name in ('makemigrations', 'migrate'):
        management.call_command(command_name, 'dwarf')
def calculate_vote(sender, instance, **kwargs):
    # Signal receiver: recompute vote totals for the result's election
    # date, unless the row arrived via the Google-Docs import path.
    ## don't fire if it's a gdoc result
    # NOTE(review): '!= True' (not 'is not True' / 'not ...') — presumably
    # gdoc_import is a strict boolean field; confirm before simplifying.
    if instance.gdoc_import != True:
        electiondate_arg = str(instance.electiondate)
        call_command('calculate_vote', electiondate_arg)
    ## needs to be written <-- not doing this bc it would mess with manual results w/ multiple winners, winners that req 2/3, etc
    # call_command('declare_winner')
## NEEDS TO STAY models.model so PSQL import works
def snapshot(electiondate_string):
    """Copy the current results.csv into a per-election, per-day snapshot
    directory, announcing each step on Slack.

    Layout: $SAVER_PATH/<electiondate>/<YYYY-MM-DD>/results<timestamp>.csv
    Only runs in the local environment; remote (S3) snapshotting is TODO.
    """
    ## *** need to have this on S3 or other external location to not fill up server ***
    ## NOTE: instead of saving anew, just use/copy the tmp file?
    ## should we snapshot manual results from gdoc? use separate parent dir to avoid conflict
    if mccelectionsenv == "local":
        file_path = os.environ["SAVER_PATH"]
        # BUG FIX: was misspelled 'orgin', leaving 'origin' undefined and
        # raising NameError when the cp command string was built below.
        origin = file_path + "/tmp/results.csv"
        now = timezone.localtime(timezone.now())
        save_date_string = str(now.date())
        timestamp = now.strftime('%Y-%m-%d_%H-%M-%S')
        snapshot_filename = "results%s.csv" % (timestamp)
        destination_dir = "%s/%s/%s" % (file_path, electiondate_string, save_date_string)
        destination = "%s/%s" % (destination_dir, snapshot_filename)
        # SECURITY NOTE: shell=True with interpolated paths is only safe
        # while SAVER_PATH and electiondate_string are trusted values.
        # (Renamed from 'snapshot', which shadowed this function's name.)
        mkdir_cmd = "mkdir -p %s" % (destination_dir)
        copy_cmd = "cp %s %s" % (origin, destination)
        ## making the dir, if it's not there
        call(mkdir_cmd, shell=True)
        message = "Making new directory, if needed:\n%s" % (destination_dir)
        slackbot(message)
        ## actual snapshot executed
        call(copy_cmd, shell=True)
        message = "Snapshotting"
        slackbot(message)
    # else:
    #     snapshot to S3