def test_process_no_sieve(self):
"""
Tests the process method for a chute with no sieve.
"""
mock_doc_id = 1
email = {'Message-ID': 'abc', 'Subject': 'This is an Urgent Alert'}
doc_obj = DocumentObj(data=email)
mailchute = MailChute.objects.get(pk=3)
mailchute.enabled = True
mailchute.munger.process = Mock(return_value=mock_doc_id)
doc_id = mailchute.process(doc_obj)
mailchute.munger.process.assert_called_once_with(doc_obj)
self.assertEqual(doc_id, mock_doc_id)
# NOTE: use TransactionTestCase to handle threading
python类TransactionTestCase()的实例源码
def test_multiprocess_muzzled(self):
"""
Tests muzzling when multiple duplicate Alerts are being processed
concurrently.
"""
with patch('alerts.models.Alert._format_title',
return_value=self.data['content']['subject']):
incident_num = 20
alert_count = Alert.objects.count()
args = [self.doc_obj]
alert = self.email_wdog.process(*args)
# check that a new Alert was created
self.assertEqual(Alert.objects.count(), alert_count + 1)
# NOTE: we can't use multiprocessing with Mocks,
# so we have to settle for using threading to mimic concurrency
threads = []
for dummy_index in range(incident_num):
new_thread = threading.Thread(target=self.email_wdog.process,
args=args)
threads.append(new_thread)
new_thread.start()
for thread in threads:
thread.join()
# NOTE: we can't check Alert counts because saved Alerts
# won't be committed in the TransactionTestCase
# but we can check that the previous Alert was incremented
alert = Alert.objects.get(pk=alert.pk)
self.assertEqual(alert.incidents, incident_num + 1)
def __call__(self, test_func):
from django.test import TransactionTestCase
if isinstance(test_func, type):
if not issubclass(test_func, TransactionTestCase):
raise Exception(
"Only subclasses of Django SimpleTestCase "
"can be decorated with override_settings")
original_pre_setup = test_func._pre_setup
original_post_teardown = test_func._post_teardown
def _pre_setup(innerself):
self.enable()
original_pre_setup(innerself)
def _post_teardown(innerself):
original_post_teardown(innerself)
self.disable()
test_func._pre_setup = _pre_setup
test_func._post_teardown = _post_teardown
return test_func
else:
@wraps(test_func)
def inner(*args, **kwargs):
with self:
return test_func(*args, **kwargs)
return inner
def test_run_migrations_success(self):
"""
Test the migration success path.
"""
# Using TransactionTestCase sets up the migrations as set up for the test.
# Reset the release_util migrations to the very beginning - i.e. no tables.
call_command("migrate", "release_util", "zero", verbosity=0)
input_yaml = """
database: 'default',
migrations:
- [release_util, 0001_initial]
- [release_util, 0002_second]
- [release_util, 0003_third]
initial_states:
- [release_util, zero]
"""
output = [
{
'database': 'default',
'duration': None,
'failed_migration': None,
'migration': 'all',
'output': None,
'succeeded_migrations': [
['release_util', '0001_initial'],
['release_util', '0002_second'],
['release_util', '0003_third'],
],
'traceback': None,
'succeeded': True,
},
]
out_file = tempfile.NamedTemporaryFile(suffix='.yml')
in_file = tempfile.NamedTemporaryFile(suffix='.yml')
in_file.write(input_yaml.encode('utf-8'))
in_file.flush()
# Check the stdout output against the expected output.
self._check_command_output(
cmd='run_migrations',
cmd_args=(in_file.name,),
cmd_kwargs={'output_file': out_file.name},
output=output,
)
in_file.close()
# Check the contents of the output file against the expected output.
with open(out_file.name, 'r') as f:
output_yaml = f.read()
parsed_yaml = yaml.safe_load(output_yaml)
self.assertTrue(isinstance(parsed_yaml, list))
parsed_yaml = self._null_certain_fields(parsed_yaml)
self.assertEqual(yaml.dump(output), yaml.dump(parsed_yaml))
out_file.close()
def test_run_migrations_failure(self, migration_name, migration_output):
"""
Test the first, second, and last migration failing.
"""
# Using TransactionTestCase sets up the migrations as set up for the test.
# Reset the release_util migrations to the very beginning - i.e. no tables.
call_command("migrate", "release_util", "zero", verbosity=0)
input_yaml = """
database: 'default',
migrations:
- [release_util, 0001_initial]
- [release_util, 0002_second]
- [release_util, 0003_third]
initial_states:
- [release_util, zero]
"""
out_file = tempfile.NamedTemporaryFile(suffix='.yml')
in_file = tempfile.NamedTemporaryFile(suffix='.yml')
in_file.write(input_yaml.encode('utf-8'))
in_file.flush()
# A bogus class for creating a migration object that will raise a CommandError.
class MigrationFail(object):
atomic = False
def state_forwards(self, app_label, state):
pass
def database_forwards(self, app_label, schema_editor, from_state, to_state):
raise CommandError("Yo")
# Insert the bogus object into the first operation of a migration.
current_migration_list = release_util.tests.migrations.test_migrations.__dict__[migration_name].__dict__['Migration'].operations
current_migration_list.insert(0, MigrationFail())
try:
# Check the stdout output.
self._check_command_output(
cmd="run_migrations",
cmd_args=(in_file.name,),
cmd_kwargs={'output_file': out_file.name},
output=migration_output,
err_output="Migration error: Migration failed for app 'release_util' - migration '{}'.".format(migration_name),
exit_value=1
)
finally:
# Whether the test passes or fails, always pop the failure migration of the list.
current_migration_list.pop(0)
in_file.close()
out_file.close()