def test_add_campaign_report_item(self):
    """A receiver of ``make_campaign_report`` can add a tab to the report.

    The receiver inserts a ``'Test'`` entry into ``context['tabs_layout']``;
    the content returned by ``self.get()`` must then contain the matching
    tab link and an element carrying ``id="test"``.
    """
    def handler(context, **kwargs):
        context['tabs_layout']['Test'] = []

    make_campaign_report.connect(handler)
    try:
        content = self.get()
        self.assertIn('<a href="#test" aria-controls="test" role="tab" '
                      'data-toggle="tab">Test</a>', content)
        self.assertIn('id="test"', content)
    finally:
        # Detach the receiver even when an assertion above fails, so it
        # cannot leak into other tests that use the same signal.
        self.assertTrue(make_campaign_report.disconnect(handler))
python类connect()的实例源码
def test_delete_campaign_report_item(self):
    """A receiver of ``make_campaign_report`` can remove a tab from the report.

    The receiver deletes the ``'Other'`` entry from
    ``context['tabs_layout']``; the content returned by ``self.get()`` must
    then contain neither the matching tab link nor ``id="other"``.
    """
    def handler(context, **kwargs):
        del context['tabs_layout']['Other']

    make_campaign_report.connect(handler)
    try:
        content = self.get()
        self.assertNotIn('<a href="#other" aria-controls="other" '
                         'role="tab" data-toggle="tab">Other</a>', content)
        self.assertNotIn('id="other"', content)
    finally:
        # Detach the receiver even when an assertion above fails, so it
        # cannot leak into other tests that use the same signal.
        self.assertTrue(make_campaign_report.disconnect(handler))
def setup():
    """Connect ``invalidate_cache`` to ``post_save`` for the text models.

    ``TextFormat`` and ``TextFilter`` are imported inside the function
    rather than at module import time.
    """
    from .models import TextFilter, TextFormat

    for model in (TextFormat, TextFilter):
        post_save.connect(invalidate_cache, sender=model)
def register(indexer_class):
    """
    Register an indexer class and watch its model for changes.

    A class already present in ``_REGISTERED_CLASSES`` is left alone.
    Otherwise it is recorded there and ``_instance_changed`` is connected to
    the ``post_delete`` and ``post_save`` signals of the model found at
    ``indexer_class.serializer_class.Meta.model``.
    """
    if indexer_class in _REGISTERED_CLASSES:
        return

    _REGISTERED_CLASSES.append(indexer_class)

    # Hook up the signal handlers for the indexed model.
    watched_model = indexer_class.serializer_class.Meta.model
    for signal in (post_delete, post_save):
        signal.connect(_instance_changed, sender=watched_model)
def register_post_save_functions():
    """Connect ``generate_thumbnail`` to ``post_save`` for ``Layer`` and ``Map``.

    Each receiver is connected with ``weak=False``.
    """
    logger.debug('Thumbnail: Registering post_save functions.')
    for model in (Layer, Map):
        # Disconnect before connecting, in case this function is called twice.
        post_save.disconnect(generate_thumbnail, sender=model)
        post_save.connect(generate_thumbnail, sender=model, weak=False)
def ready(self):
    """Connect ``self.on_action_save`` to ``post_save`` for ``Action``.

    ``Action`` is imported inside the method rather than at module import
    time.
    """
    from .models import Action

    receiver = self.on_action_save
    post_save.connect(receiver, sender=Action)
def __init__(self, sender, subject_prefix, host):
    """Store the alert settings and start listening for ``post_save``.

    Args:
        sender: stored as ``self.sender``.
        subject_prefix: stored as ``self.subject_prefix``.
        host: stored as ``self.host``.

    ``self._table_changed`` is connected to ``post_save`` without a
    ``sender`` filter.
    """
    super(MailAlerts, self).__init__()

    # Runtime state: a flag initialised to False and an event object.
    self.exit = False
    self.change_event = threading.Event()

    # Settings supplied by the caller.
    self.host = host
    self.subject_prefix = subject_prefix
    self.sender = sender

    post_save.connect(self._table_changed)
def register_signals():
    """Connect the ``LostKey`` and ``Member`` manager methods to model signals.

    ``LostKey`` gets a ``pre_save`` and a ``post_save`` receiver; ``Member``
    gets a ``post_save`` receiver.
    """
    wiring = (
        (pre_save, LostKey.objects.on_pre_save, LostKey),
        (post_save, LostKey.objects.send_notification, LostKey),
        (post_save, Member.objects.send_transfer_workspace_key_info, Member),
    )
    for signal, receiver, model in wiring:
        signal.connect(receiver, sender=model)
def save(self, *args, **kwargs):
    """Save the comment by delegating straight to the parent ``save()``.

    All positional and keyword arguments are passed through unchanged.
    """
    # NOTE: an e-mail notification step (``send_mail`` / hooking
    # ``send_comment_mail`` to ``post_save`` for ``Comment``) used to follow
    # here; it is currently disabled.
    super(Comment, self).save(*args, **kwargs)
def save(self, *args, **kwargs):
    """Save the comment by delegating straight to the parent ``save()``.

    All positional and keyword arguments are passed through unchanged.
    """
    # NOTE: an e-mail notification step (``send_mail`` / hooking
    # ``send_comment_mail`` to ``post_save`` for ``Comment``) used to follow
    # here; it is currently disabled.
    super(Comment, self).save(*args, **kwargs)
def save(self, *args, **kwargs):
    """Save the comment by delegating straight to the parent ``save()``.

    All positional and keyword arguments are passed through unchanged.
    """
    # NOTE: an e-mail notification step (``send_mail`` / hooking
    # ``send_comment_mail`` to ``post_save`` for ``Comment``) used to follow
    # here; it is currently disabled.
    super(Comment, self).save(*args, **kwargs)
def save(self, *args, **kwargs):
    """Save the comment by delegating straight to the parent ``save()``.

    All positional and keyword arguments are passed through unchanged.
    """
    # NOTE: an e-mail notification step (``send_mail`` / hooking
    # ``send_comment_mail`` to ``post_save`` for ``Comment``) used to follow
    # here; it is currently disabled.
    super(Comment, self).save(*args, **kwargs)
def __init__(self):
    """Build menu items for moderated feed configurations and track changes.

    One menu item is created per ``SocialFeedConfiguration`` with
    ``moderated=True`` and stored in ``self.config_menu_items`` under the
    configuration id. ``self._update_menu`` is then connected to
    ``post_save`` for ``SocialFeedConfiguration``.
    """
    moderated_configs = SocialFeedConfiguration.objects.filter(moderated=True)
    for feed_config in moderated_configs:
        menu_item = self._create_moderate_menu_item(feed_config)
        self.config_menu_items[feed_config.id] = menu_item

    self._registered_menu_items = list(self.config_menu_items.values())
    self.construct_hook_name = None

    post_save.connect(self._update_menu, sender=SocialFeedConfiguration)
def get_niming_user_info_url(self, url):
    """Return the WeChat OAuth2 authorize URL that redirects back to *url*.

    ``self.appid`` fills the ``appid`` query parameter and *url*, passed
    through ``quote``, fills ``redirect_uri``. The requested scope is
    ``snsapi_userinfo``.
    """
    params = (self.appid, quote(url))
    authorize_url = 'https://open.weixin.qq.com/connect/oauth2/authorize?appid=%s&redirect_uri=%s&response_type=code&scope=snsapi_userinfo&state=STATE#wechat_redirect' % params
    return authorize_url
def test_no_knock(self, handler):
    """Creating two ``NoKnockPost`` objects calls the ``post_save`` receiver twice.

    Args:
        handler: receiver connected to ``post_save`` for ``NoKnockPost``;
            its ``call_count`` is inspected, so it is presumably a mock
            injected by a decorator -- confirm against the test class.
    """
    uid = 'test_knocker_mock'
    post_save.connect(handler, NoKnockPost, dispatch_uid=uid)
    try:
        NoKnockPost.objects.create(
            title='first post',
            slug='first-post',
        )
        NoKnockPost.objects.create(
            title='second post',
            slug='second-post',
        )
        self.assertEqual(handler.call_count, 2)
    finally:
        # Remove the receiver again so it does not stay registered under
        # this dispatch_uid after the test has finished (or failed).
        post_save.disconnect(sender=NoKnockPost, dispatch_uid=uid)
def _connect(self):
    """
    Connect ``notify_items`` to ``post_save`` for this instance's class.

    The ``dispatch_uid`` is ``'knocker_'`` followed by the class name.
    """
    model_cls = self.__class__
    uid = 'knocker_{0}'.format(model_cls.__name__)
    post_save.connect(notify_items, sender=model_cls, dispatch_uid=uid)
def test_create_new_object_is_synced(self):
    """A newly created ``Book`` has a matching node after ``post_save`` runs.

    For the duration of the test ``post_save_handler`` is re-registered
    under a test-only ``dispatch_uid``; the original registration is
    restored in the ``finally`` clause.
    """
    default_uid = 'chemtrails.signals.handlers.post_save_handler'
    test_uid = 'post_save_handler.test'

    post_save.disconnect(post_save_handler, dispatch_uid=default_uid)
    post_save.connect(post_save_handler, dispatch_uid=test_uid)
    try:
        book = BookFixture(Book).create_one()
        node_class = get_node_class_for_model(Book)

        synced_node = node_class.nodes.get(pk=book.pk)
        self.assertEqual(book.pk, synced_node.pk)

        related = node_class.nodes.has(authors=True, publisher=True)
        self.assertEqual(1, len(related))
    finally:
        # Put the original registration back, then drop the test-only one.
        post_save.connect(post_save_handler, dispatch_uid=default_uid)
        post_save.disconnect(post_save_handler, dispatch_uid=test_uid)
def test_m2m_changed_post_add(self):
    """Adding an author to a ``Book`` is reflected in the node query.

    For the duration of the test ``m2m_changed_handler`` is re-registered
    under a test-only ``dispatch_uid``; the original registration is
    restored in the ``finally`` clause.
    """
    default_uid = 'chemtrails.signals.handlers.m2m_changed_handler'
    test_uid = 'm2m_changed_handler.test'

    m2m_changed.disconnect(m2m_changed_handler, dispatch_uid=default_uid)
    m2m_changed.connect(m2m_changed_handler, dispatch_uid=test_uid)
    try:
        book = BookFixture(Book, generate_m2m=False, field_values={'authors': []}).create_one()
        before = get_node_class_for_model(Book).nodes.has(authors=True)
        self.assertEqual(0, len(before))

        author = AuthorFixture(Author).create_one()
        book.authors.add(author)

        after = get_node_class_for_model(Book).nodes.has(authors=True)
        self.assertEqual(1, len(after))
    finally:
        # Put the original registration back, then drop the test-only one.
        m2m_changed.connect(m2m_changed_handler, dispatch_uid=default_uid)
        m2m_changed.disconnect(m2m_changed_handler, dispatch_uid=test_uid)