def test_multiprocess_muzzled(self):
    """
    Check muzzling when duplicate Alerts are processed concurrently.

    The document is processed once to create an initial Alert, then the
    same document is processed again from several threads at once. The
    test passes when the incident count of the initial Alert has grown
    by one for every duplicate run.
    """
    subject = self.data['content']['subject']
    duplicate_total = 20

    # Alert._format_title is patched for the whole test so that it always
    # returns the subject taken from the test data.
    with patch('alerts.models.Alert._format_title', return_value=subject):
        starting_count = Alert.objects.count()
        doc_args = [self.doc_obj]
        first_alert = self.email_wdog.process(*doc_args)

        # the first pass must have created exactly one new Alert
        self.assertEqual(Alert.objects.count(), starting_count + 1)

        # NOTE: multiprocessing can't be used together with Mocks,
        # so threads are used instead to mimic concurrency
        workers = [
            threading.Thread(target=self.email_wdog.process, args=doc_args)
            for _ in range(duplicate_total)
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        # NOTE: Alert counts can't be checked here because saved Alerts
        # won't be committed in the TransactionTestCase,
        # but the incident count of the first Alert can be: reload it
        # from the database and compare
        reloaded = Alert.objects.get(pk=first_alert.pk)
        self.assertEqual(reloaded.incidents, duplicate_total + 1)
评论列表
文章目录