def test_m2m_changed_post_remove_reverse(self):
    """Check graph state after removing a book via the reverse ``book_set`` manager.

    For the duration of the test the ``m2m_changed`` handler is registered
    under a test-only dispatch uid; the original registration is restored in
    the ``finally`` clause.
    """
    default_uid = 'chemtrails.signals.handlers.m2m_changed_handler'
    test_uid = 'm2m_changed_handler.test'

    def related_count(model, relation):
        # Size of ``nodes.has(<relation>=True)`` on the node class for ``model``.
        return len(get_node_class_for_model(model).nodes.has(**{relation: True}))

    m2m_changed.disconnect(m2m_changed_handler, dispatch_uid=default_uid)
    m2m_changed.connect(m2m_changed_handler, dispatch_uid=test_uid)
    try:
        book = BookFixture(Book, generate_m2m={'authors': (1, 1)}).create_one()
        self.assertEqual(1, related_count(Book, 'authors'))

        # Remove the relation from the author's side (reverse manager).
        author = book.authors.get()
        author.book_set.remove(book)

        self.assertEqual(0, related_count(Book, 'authors'))
        self.assertEqual(0, related_count(Author, 'book_set'))
    finally:
        # Put the default registration back and drop the test-only one.
        m2m_changed.connect(m2m_changed_handler, dispatch_uid=default_uid)
        m2m_changed.disconnect(m2m_changed_handler, dispatch_uid=test_uid)
# Example source code for Python's disconnect()
def tearDown(self):
    """Run the parent tear-down, then disconnect ``tag_alert`` from ``post_save`` for ``Alert``."""
    super(TagAlertTestCase, self).tearDown()
    post_save.disconnect(receiver=tag_alert, sender=Alert)
def tearDown(self):
    """Run the parent tear-down, then disconnect ``tag_analysis`` from ``post_save`` for ``Analysis``."""
    super(TagAnalysisTestCase, self).tearDown()
    post_save.disconnect(receiver=tag_analysis, sender=Analysis)
def tearDown(self):
    """Run the parent tear-down, then disconnect ``tag_comment`` from ``post_save`` for ``Comment``."""
    super(TagCommentTestCase, self).tearDown()
    post_save.disconnect(receiver=tag_comment, sender=Comment)
def _disconnect_user_post_save_for_migrations(self, sender, **kwargs):  # pylint: disable=unused-argument
    """
    Handle the pre_migrate signal by disconnecting the User post_save handler.

    The handler is looked up by ``USER_POST_SAVE_DISPATCH_UID`` with
    ``self.auth_user_model`` as the sender; ``sender`` and ``kwargs`` are
    accepted but not used.
    """
    from django.db.models.signals import post_save

    user_model = self.auth_user_model
    post_save.disconnect(dispatch_uid=USER_POST_SAVE_DISPATCH_UID, sender=user_model)
def tearDown(self):
    """Disconnect ``self.post_save_listener`` from ``post_save`` for sender ``User``."""
    post_save.disconnect(receiver=self.post_save_listener, sender=User)
def unregister(self, model):
    """Removes a model from version control.

    Raises ``RegistrationError`` when ``model`` is not currently registered.
    On success the model is dropped from ``self._registered_models`` and the
    ``post_save`` / ``pre_delete`` receivers are disconnected for it.
    """
    if not self.is_registered(model):
        message = "{model} has not been registered with django-reversion".format(model=model)
        raise RegistrationError(message)

    del self._registered_models[model]
    post_save.disconnect(receiver=self._post_save_receiver, sender=model)
    pre_delete.disconnect(receiver=self._pre_delete_receiver, sender=model)
def _disconnect_signals():
    """Disconnect the ``plan_watchers`` handlers from ``TestPlan`` signals (used in testing)."""
    post_save.disconnect(receiver=plan_watchers.on_plan_save, sender=TestPlan)
    pre_delete.disconnect(receiver=plan_watchers.on_plan_delete, sender=TestPlan)
    pre_save.disconnect(receiver=plan_watchers.pre_save_clean, sender=TestPlan)
def _disconnect_signals():
    """Disconnect the ``case_watchers`` handlers from ``TestCase`` signals (used in testing)."""
    post_save.disconnect(receiver=case_watchers.on_case_save, sender=TestCase)
    post_delete.disconnect(receiver=case_watchers.on_case_delete, sender=TestCase)
    pre_save.disconnect(receiver=case_watchers.pre_save_clean, sender=TestCase)
def set_api(sender, instance, **kwargs):
    """Refresh the bot's API client, set its webhook and fill in its API user.

    Written as a ``post_save`` receiver: it disconnects and reconnects itself
    on ``post_save`` around its own ``instance.save()`` call.

    Args:
        sender: The sender the receiver is (dis)connected for.
        instance: The saved bot. Reads ``token``, ``enabled``, ``https_port``,
            ``ssl_certificate`` and ``user_api``; writes ``_bot`` and, when it
            was empty, ``user_api``.
        **kwargs: Remaining signal arguments; unused.
    """
    # Always reset the _bot instance after save, in case the token changes.
    instance._bot = BotAPI(instance.token)

    # Build the webhook URL; both values stay None unless set below.
    url = None
    cert = None
    if instance.enabled:
        webhook = reverse('telegrambot:webhook', kwargs={'token': instance.token})
        from django.contrib.sites.models import Site
        current_site = Site.objects.get_current()
        if instance.https_port is None:
            url = 'https://' + current_site.domain + webhook
        else:
            url = 'https://' + current_site.domain + ':' + str(instance.https_port) + webhook
    # NOTE(review): nesting was reconstructed from a listing without
    # indentation; confirm the certificate check is meant to run even when
    # the bot is disabled.
    if instance.ssl_certificate:
        instance.ssl_certificate.open()
        cert = instance.ssl_certificate

    instance._bot.setWebhook(webhook_url=url, certificate=cert)
    logger.info("Success: Webhook url %s for bot %s set", url, instance)

    # Complete the bot instance with data from the API.
    if not instance.user_api:
        bot_api = instance._bot.getMe()
        botdict = bot_api.to_dict()
        # Keep only the API fields that are also fields on the User model.
        modelfields = {f.name for f in User._meta.get_fields()}
        params = {k: v for k, v in botdict.items() if k in modelfields}
        user_api, _ = User.objects.get_or_create(**params)
        instance.user_api = user_api

        # Prevent signal recursion, and save. The reconnect sits in
        # ``finally`` so a failing save cannot leave the receiver detached.
        post_save.disconnect(set_api, sender=sender)
        try:
            instance.save()
        finally:
            post_save.connect(set_api, sender=sender)
        logger.info("Success: Bot api info for bot %s set", instance)
def handle(self, *app_labels, **options):
    """Load the initial fixtures unless ``Project`` rows already exist.

    The ``discussioncomment_was_saved`` ``post_save`` receiver for
    ``DiscussionComment`` is disconnected before the fixtures are loaded.

    Raises:
        AssertionError: If the receiver could not be disconnected.
    """
    if Project.objects.exists():
        print("We already have data. Skip load_initial_data.")
        return

    # Do the disconnect outside of an ``assert`` statement: asserts are
    # stripped under ``python -O``, which would silently skip the disconnect.
    disconnected = post_save.disconnect(
        sender=DiscussionComment, dispatch_uid='discussioncomment_was_saved')
    if not disconnected:
        raise AssertionError(
            "post_save receiver 'discussioncomment_was_saved' was not connected")

    fixtures = (
        'pages',
        'emails',
        'articlecategory',
        'patterncategory',
        'sample_data',
    )
    for fixture in fixtures:
        call_command('loaddata', fixture, app_label='tn2app')
def test_recursive_connect(self):
    """Exercise ``recursive_connect`` at depths 0, 1 and 2 for a single book.

    The ``post_save`` and ``m2m_changed`` handlers are disconnected while the
    test runs and reconnected in the ``finally`` clause.
    """
    post_save_uid = 'chemtrails.signals.handlers.post_save_handler'
    m2m_changed_uid = 'chemtrails.signals.handlers.m2m_changed_handler'

    def related_count(model, relation):
        # Size of ``nodes.has(<relation>=True)`` on the node class for ``model``.
        return len(get_node_class_for_model(model).nodes.has(**{relation: True}))

    post_save.disconnect(post_save_handler, dispatch_uid=post_save_uid)
    m2m_changed.disconnect(m2m_changed_handler, dispatch_uid=m2m_changed_uid)
    try:
        book = BookFixture(Book, generate_m2m={'authors': (1, 1)}).create_one()
        for depth in range(3):
            # Delete all relationships
            db.cypher_query('MATCH (n)-[r]-() WHERE n.type = "ModelNode" DELETE r')
            book_node = get_node_for_object(book).save()
            book_node.recursive_connect(depth)

            if depth == 0:
                # Max depth 0 means that no recursion should occur, and no connections
                # can be made, because the connected objects might not exist.
                for prop in book_node.defined_properties(aliases=False, properties=False).keys():
                    relation = getattr(book_node, prop)
                    try:
                        self.assertEqual(len(relation.all()), 0)
                    except CardinalityViolation:
                        # Will raise CardinalityViolation for nodes which have a
                        # single required relationship
                        continue
            elif depth == 1:
                self.assertEqual(0, related_count(Book, 'store_set'))
                self.assertEqual(0, related_count(Store, 'books'))
                self.assertEqual(0, related_count(Book, 'bestseller_stores'))
                self.assertEqual(0, related_count(Store, 'bestseller'))

                self.assertEqual(1, related_count(Book, 'publisher'))
                self.assertEqual(1, related_count(Publisher, 'book_set'))

                self.assertEqual(1, related_count(Book, 'authors'))
                self.assertEqual(1, related_count(Author, 'book_set'))

                self.assertEqual(0, related_count(Author, 'user'))
                self.assertEqual(0, related_count(User, 'author'))

                self.assertEqual(1, related_count(Book, 'tags'))
                self.assertEqual(0, related_count(Tag, 'content_type'))
            elif depth == 2:
                self.assertEqual(1, related_count(Author, 'user'))
                self.assertEqual(1, related_count(User, 'author'))

                self.assertEqual(1, related_count(Tag, 'content_type'))
                self.assertEqual(1, related_count(ContentType, 'content_type_set_for_tag'))
    finally:
        # Restore the handlers that were disconnected at the start of the test.
        post_save.connect(post_save_handler, dispatch_uid=post_save_uid)
        m2m_changed.connect(m2m_changed_handler, dispatch_uid=m2m_changed_uid)