def test_delete_object_is_deleted(self):
    """Deleting a ``Book`` must make its graph node lookup fail.

    For the duration of the test ``pre_delete_handler`` is re-registered
    under a test-only ``dispatch_uid``; the original registration is put
    back in the ``finally`` clause, whether or not the assertions pass.
    """
    production_uid = 'chemtrails.signals.handlers.pre_delete_handler'
    test_uid = 'pre_delete_handler.test'

    pre_delete.disconnect(pre_delete_handler, dispatch_uid=production_uid)
    pre_delete.connect(pre_delete_handler, dispatch_uid=test_uid)
    try:
        book = BookFixture(Book).create_one()
        node_class = get_node_class_for_model(Book)
        book_pk = book.pk
        try:
            book.delete()
            node_class.nodes.get(pk=book_pk)
            self.fail('Did not raise when trying to get non-existent book node.')
        except node_class.DoesNotExist as exc:
            # The error text is expected to echo the lookup that found nothing.
            expected_message = "{'pk': %d}" % book_pk
            self.assertEqual(str(exc), expected_message)
    finally:
        pre_delete.connect(pre_delete_handler, dispatch_uid=production_uid)
        pre_delete.disconnect(pre_delete_handler, dispatch_uid=test_uid)
# Python: example source code for connect()
def test_null_foreignkey_is_disconnected(self):
    """Clearing ``Store.bestseller`` must leave no store node with a bestseller.

    ``post_save_handler`` is re-registered under a test-only
    ``dispatch_uid`` while the test runs and restored afterwards.
    """
    production_uid = 'chemtrails.signals.handlers.post_save_handler'
    test_uid = 'post_save_handler.test'

    post_save.disconnect(post_save_handler, dispatch_uid=production_uid)
    post_save.connect(post_save_handler, dispatch_uid=test_uid)
    try:
        store = StoreFixture(Store, generate_m2m=False).create_one()
        store_node_class = get_node_class_for_model(Store)

        # The bestseller book must be retrievable as a node by its pk.
        bestseller_pk = store.bestseller.pk
        book_node = get_node_class_for_model(Book).nodes.get(pk=store.bestseller.pk)
        self.assertEqual(bestseller_pk, book_node.pk)

        with_bestseller = store_node_class.nodes.has(bestseller=True)
        self.assertEqual(1, len(with_bestseller))

        # Null the foreign key and save; the relationship should disappear.
        store.bestseller = None
        store.save()
        with_bestseller = store_node_class.nodes.has(bestseller=True)
        self.assertEqual(0, len(with_bestseller))
    finally:
        post_save.connect(post_save_handler, dispatch_uid=production_uid)
        post_save.disconnect(post_save_handler, dispatch_uid=test_uid)
def ready(self):
    """Connect the chemtrails signal receivers and apply the Neo4j config.

    Each ``config`` option is taken from the Django ``settings`` attribute
    of the same ``NEO4J_*`` name when present; otherwise the fallback
    computed from ``os.environ`` / ``get_environment_variable`` is used.
    """
    from .signals.handlers import (
        m2m_changed_handler, post_migrate_handler,
        post_save_handler, pre_delete_handler
    )

    # (signal, receiver, dispatch_uid) triples, connected in this order.
    hookups = (
        (m2m_changed, m2m_changed_handler,
         'chemtrails.signals.handlers.m2m_changed_handler'),
        (post_save, post_save_handler,
         'chemtrails.signals.handlers.post_save_handler'),
        (pre_delete, pre_delete_handler,
         'chemtrails.signals.handlers.pre_delete_handler'),
        (post_migrate, post_migrate_handler,
         'neomodel.core.post_migrate_handler'),
    )
    for sig, handler, uid in hookups:
        sig.connect(receiver=handler, dispatch_uid=uid)

    # Neo4j config
    url_fallback = os.environ.get('NEO4J_BOLT_URL', config.DATABASE_URL)
    config.DATABASE_URL = getattr(settings, 'NEO4J_BOLT_URL', url_fallback)

    timezone_fallback = get_environment_variable('NEO4J_FORCE_TIMEZONE', False)
    config.FORCE_TIMEZONE = getattr(settings, 'NEO4J_FORCE_TIMEZONE', timezone_fallback)

    encrypted_fallback = get_environment_variable('NEO4J_ENCRYPTED_CONNECTION', True)
    config.ENCRYPTED_CONNECTION = getattr(settings, 'NEO4J_ENCRYPTED_CONNECTION', encrypted_fallback)

    pool_size_fallback = get_environment_variable('NEO4J_MAX_POOL_SIZE', 50)
    config.MAX_POOL_SIZE = getattr(settings, 'NEO4J_MAX_POOL_SIZE', pool_size_fallback)
def ready(self):
    """Apply the CSRF monkey patch and connect the ``SparkJob`` signals."""
    # The app is now ready. Include any monkey patches here.

    # Monkey patch CSRF to switch to session based CSRF. Session
    # based CSRF will prevent attacks from apps under the same
    # domain. If you're planning to host your app under its own
    # domain you can remove session_csrf and use Django's CSRF
    # library. See also
    # https://github.com/mozilla/sugardough/issues/38
    session_csrf.monkeypatch()

    # Connect signals. The imports are done here rather than at module
    # level, after the monkey patch has been applied.
    from atmo.jobs.models import SparkJob
    from atmo.jobs.signals import assign_group_perm, remove_group_perm

    # (signal, receiver, dispatch_uid), connected in this order.
    hookups = (
        (post_save, assign_group_perm, 'sparkjob_post_save_assign_perm'),
        (pre_delete, remove_group_perm, 'sparkjob_pre_delete_remove_perm'),
    )
    for sig, receiver, uid in hookups:
        sig.connect(receiver, sender=SparkJob, dispatch_uid=uid)
def register_signal_handlers():
"""Registers signal handlers.
To create a signal for TranslatablePage we have to use wagtail's
get_page_models.
"""
# NOTE(review): leading indentation is missing in this copy, so which
# statements belong to which ``if``/``else``/``for`` below cannot be read
# from the text alone -- confirm the nesting against the upstream source
# before editing the code.
# Connected with ``Language`` as sender, and for the new-page init signal.
post_save.connect(create_language_permissions_and_group, sender=Language)
init_new_page.connect(force_parent_language)
# The remaining hookups depend on the SYNC_TREE / LANGUAGES_PER_SITE settings.
if get_wagtailtrans_setting('SYNC_TREE'):
if get_wagtailtrans_setting('LANGUAGES_PER_SITE'):
# Listen for changes on the ``other_languages`` many-to-many through table.
m2m_changed.connect(
update_language_trees_for_site,
sender=SiteLanguages.other_languages.through)
else:
post_save.connect(create_new_language_tree, sender=Language)
# A page model is only hooked up when it exposes the attribute that the
# corresponding ``hasattr`` check looks for.
for model in get_page_models():
if hasattr(model, 'create_translation'):
post_save.connect(synchronize_trees, sender=model)
if hasattr(model, 'get_translations'):
pre_delete.connect(synchronize_deletions, sender=model)
def create_user_profile(sender, instance, created, **kwargs):
    """ Set up profile, preference and group membership for a new user (``post_save.connect``).

    Nothing is done when ``created`` is false.

    :param sender: sender object
    :param instance: user instance
    :param created: user creation flag
    :param kwargs: dictionary argument
    :return: None
    """
    if not created:
        return

    new_user = instance
    # Create profile / preference objects for user
    UserProfile.objects.get_or_create(user=instance)
    UserPreference.objects.get_or_create(user=instance,
                                         site=Site.objects.get_current())
    # User automatically belongs to meta_all_members
    members_group, _ = Group.objects.get_or_create(name=PermissionAlias.all)
    new_user.groups.add(members_group)
def remove_obj_perms_connected_with_user(sender, instance, **kwargs):
    """ Clean up records tied to a user that is being deleted (``pre_delete.connect``).

    Deletes the user/group object permissions whose target is this user,
    then the user's profile and every preference row.

    :param sender: sender object
    :param instance: user instance
    :param kwargs: dictionary argument
    :return: None
    """
    user_content_type = ContentType.objects.get_for_model(instance)
    targets_this_user = Q(content_type=user_content_type, object_pk=instance.pk)
    for permission_model in (UserObjectPermission, GroupObjectPermission):
        permission_model.objects.filter(targets_this_user).delete()

    if instance.profile:
        instance.profile.delete()

    if instance.preference:
        for preference in instance.preference.all():
            preference.delete()
def register(self, model, adapter_cls=VersionAdapter, **field_overrides):
    """Registers a model with this revision manager.

    Raises ``RegistrationError`` if the model is already registered or is
    a proxy model. Any ``field_overrides`` become class attributes on a
    subclass of ``adapter_cls`` created for this registration.
    """
    # Prevent multiple registration.
    if self.is_registered(model):
        message = "{model} has already been registered with django-reversion".format(model=model)
        raise RegistrationError(message)
    # Prevent proxy models being registered.
    if model._meta.proxy:
        raise RegistrationError("Proxy models cannot be used with django-reversion, register the parent class instead")
    # Perform any customization.
    if field_overrides:
        adapter_cls = type(adapter_cls.__name__, (adapter_cls,), field_overrides)
    # Perform the registration.
    self._registered_models[model] = adapter_cls(model)
    # Connect to the post save and pre delete signals of the model.
    post_save.connect(self._post_save_receiver, sender=model)
    pre_delete.connect(self._pre_delete_receiver, sender=model)
def register(model):
    """Register a model to the audit code.

    Connects ``_pre_save``, ``_post_save`` and ``_pre_delete`` to the
    model's ``pre_save``, ``post_save`` and ``pre_delete`` signals, using
    ``str(model)`` as the ``dispatch_uid`` for each. Any exception raised
    while connecting is logged and not propagated.

    :param model: Model to register.
    :type model: object
    """
    try:
        pre_save.connect(_pre_save, sender=model, dispatch_uid=str(model))
        post_save.connect(_post_save, sender=model, dispatch_uid=str(model))
        pre_delete.connect(_pre_delete, sender=model, dispatch_uid=str(model))
    except Exception as e:
        # Exceptions have no ``.message`` attribute on Python 3; reading it
        # here would raise AttributeError from inside the handler. Let the
        # logger format the exception object itself.
        logger.error("<Register> %s", e)
def remove_xform(xform):
    """Delete an XForm, its related models and its documents in mongo.

    The ``_remove_from_mongo`` ``pre_delete`` receiver for
    ``ParsedInstance`` is disconnected while the deletion runs and is
    always reconnected afterwards, even if one of the deletion steps
    raises.

    :param xform: form to delete; ``xform.user.username`` and
        ``xform.id_string`` are combined to build the mongo query.
    """
    # disconnect parsed instance pre delete signal
    pre_delete.disconnect(_remove_from_mongo, sender=ParsedInstance)
    try:
        # delete instances from mongo db
        query = {
            ParsedInstance.USERFORM_ID:
            "%s_%s" % (xform.user.username, xform.id_string)}
        xform_instances.remove(query, j=True)
        # delete xform, and all related models
        xform.delete()
    finally:
        # Reconnect unconditionally: without this, a failure above would
        # leave the receiver detached for the rest of the process.
        pre_delete.connect(_remove_from_mongo, sender=ParsedInstance)
def remove_xform(xform):
    """Delete an XForm, its related models and its documents in mongo.

    The ``_remove_from_mongo`` ``pre_delete`` receiver for
    ``ParsedInstance`` is disconnected while the deletion runs and is
    always reconnected afterwards, even if one of the deletion steps
    raises.

    :param xform: form to delete; ``xform.user.username`` and
        ``xform.id_string`` are combined to build the mongo query.
    """
    # disconnect parsed instance pre delete signal
    pre_delete.disconnect(_remove_from_mongo, sender=ParsedInstance)
    try:
        # delete instances from mongo db
        query = {
            ParsedInstance.USERFORM_ID:
            "%s_%s" % (xform.user.username, xform.id_string)}
        xform_instances.remove(query, j=True)
        # delete xform, and all related models
        xform.delete()
    finally:
        # Reconnect unconditionally: without this, a failure above would
        # leave the receiver detached for the rest of the process.
        pre_delete.connect(_remove_from_mongo, sender=ParsedInstance)
def test_create_new_object_is_synced(self):
    """A freshly created ``Book`` must be retrievable as a graph node.

    ``post_save_handler`` is re-registered under a test-only
    ``dispatch_uid`` while the test runs and restored afterwards.
    """
    production_uid = 'chemtrails.signals.handlers.post_save_handler'
    test_uid = 'post_save_handler.test'

    post_save.disconnect(post_save_handler, dispatch_uid=production_uid)
    post_save.connect(post_save_handler, dispatch_uid=test_uid)
    try:
        book = BookFixture(Book).create_one()
        node_class = get_node_class_for_model(Book)

        book_pk = book.pk
        synced_node = node_class.nodes.get(pk=book.pk)
        self.assertEqual(book_pk, synced_node.pk)

        # Exactly one node should have both authors and a publisher attached.
        fully_related = node_class.nodes.has(authors=True, publisher=True)
        self.assertEqual(1, len(fully_related))
    finally:
        post_save.connect(post_save_handler, dispatch_uid=production_uid)
        post_save.disconnect(post_save_handler, dispatch_uid=test_uid)
def test_m2m_changed_post_add(self):
    """Adding an author to ``book.authors`` must show up on the book node.

    ``m2m_changed_handler`` is re-registered under a test-only
    ``dispatch_uid`` while the test runs and restored afterwards.
    """
    production_uid = 'chemtrails.signals.handlers.m2m_changed_handler'
    test_uid = 'm2m_changed_handler.test'

    m2m_changed.disconnect(m2m_changed_handler, dispatch_uid=production_uid)
    m2m_changed.connect(m2m_changed_handler, dispatch_uid=test_uid)
    try:
        # Start from a book that has no authors at all.
        book = BookFixture(Book, generate_m2m=False, field_values={'authors': []}).create_one()
        books_with_authors = get_node_class_for_model(Book).nodes.has(authors=True)
        self.assertEqual(0, len(books_with_authors))

        author = AuthorFixture(Author).create_one()
        book.authors.add(author)

        books_with_authors = get_node_class_for_model(Book).nodes.has(authors=True)
        self.assertEqual(1, len(books_with_authors))
    finally:
        m2m_changed.connect(m2m_changed_handler, dispatch_uid=production_uid)
        m2m_changed.disconnect(m2m_changed_handler, dispatch_uid=test_uid)
def test_m2m_changed_post_add_reverse(self):
    """Adding a book through ``author.book_set`` must show up on the author node.

    ``m2m_changed_handler`` is re-registered under a test-only
    ``dispatch_uid`` while the test runs and restored afterwards.
    """
    production_uid = 'chemtrails.signals.handlers.m2m_changed_handler'
    test_uid = 'm2m_changed_handler.test'

    m2m_changed.disconnect(m2m_changed_handler, dispatch_uid=production_uid)
    m2m_changed.connect(m2m_changed_handler, dispatch_uid=test_uid)
    try:
        author = AuthorFixture(Author).create_one()
        authors_with_books = get_node_class_for_model(Author).nodes.has(book_set=True)
        self.assertEqual(0, len(authors_with_books))

        # Create an author-less book and attach it from the reverse side.
        book = BookFixture(Book, follow_m2m=False, field_values={'authors': []}).create_one()
        author.book_set.add(book)

        authors_with_books = get_node_class_for_model(Author).nodes.has(book_set=True)
        self.assertEqual(1, len(authors_with_books))
    finally:
        m2m_changed.connect(m2m_changed_handler, dispatch_uid=production_uid)
        m2m_changed.disconnect(m2m_changed_handler, dispatch_uid=test_uid)
def test_m2m_changed_post_clear_reverse(self):
    """Clearing ``author.book_set`` must remove the relation on both node types.

    ``m2m_changed_handler`` is re-registered under a test-only
    ``dispatch_uid`` while the test runs and restored afterwards.
    """
    production_uid = 'chemtrails.signals.handlers.m2m_changed_handler'
    test_uid = 'm2m_changed_handler.test'

    m2m_changed.disconnect(m2m_changed_handler, dispatch_uid=production_uid)
    m2m_changed.connect(m2m_changed_handler, dispatch_uid=test_uid)
    try:
        # Fixture is asked for between 1 and 1 authors.
        book = BookFixture(Book, generate_m2m={'authors': (1, 1)}).create_one()
        books_with_authors = get_node_class_for_model(Book).nodes.has(authors=True)
        self.assertEqual(1, len(books_with_authors))

        author = book.authors.get()
        author.book_set.clear()

        books_with_authors = get_node_class_for_model(Book).nodes.has(authors=True)
        self.assertEqual(0, len(books_with_authors))
        authors_with_books = get_node_class_for_model(Author).nodes.has(book_set=True)
        self.assertEqual(0, len(authors_with_books))
    finally:
        m2m_changed.connect(m2m_changed_handler, dispatch_uid=production_uid)
        m2m_changed.disconnect(m2m_changed_handler, dispatch_uid=test_uid)
def test_m2m_changed_post_remove(self):
    """Removing an author via ``book.authors`` must remove the relation on both node types.

    ``m2m_changed_handler`` is re-registered under a test-only
    ``dispatch_uid`` while the test runs and restored afterwards.
    """
    production_uid = 'chemtrails.signals.handlers.m2m_changed_handler'
    test_uid = 'm2m_changed_handler.test'

    m2m_changed.disconnect(m2m_changed_handler, dispatch_uid=production_uid)
    m2m_changed.connect(m2m_changed_handler, dispatch_uid=test_uid)
    try:
        # Fixture is asked for between 1 and 1 authors.
        book = BookFixture(Book, generate_m2m={'authors': (1, 1)}).create_one()
        books_with_authors = get_node_class_for_model(Book).nodes.has(authors=True)
        self.assertEqual(1, len(books_with_authors))

        author = book.authors.get()
        book.authors.remove(author)

        books_with_authors = get_node_class_for_model(Book).nodes.has(authors=True)
        self.assertEqual(0, len(books_with_authors))
        authors_with_books = get_node_class_for_model(Author).nodes.has(book_set=True)
        self.assertEqual(0, len(authors_with_books))
    finally:
        m2m_changed.connect(m2m_changed_handler, dispatch_uid=production_uid)
        m2m_changed.disconnect(m2m_changed_handler, dispatch_uid=test_uid)
def test_m2m_changed_post_remove_reverse(self):
    """Removing a book via ``author.book_set`` must remove the relation on both node types.

    ``m2m_changed_handler`` is re-registered under a test-only
    ``dispatch_uid`` while the test runs and restored afterwards.
    """
    production_uid = 'chemtrails.signals.handlers.m2m_changed_handler'
    test_uid = 'm2m_changed_handler.test'

    m2m_changed.disconnect(m2m_changed_handler, dispatch_uid=production_uid)
    m2m_changed.connect(m2m_changed_handler, dispatch_uid=test_uid)
    try:
        # Fixture is asked for between 1 and 1 authors.
        book = BookFixture(Book, generate_m2m={'authors': (1, 1)}).create_one()
        books_with_authors = get_node_class_for_model(Book).nodes.has(authors=True)
        self.assertEqual(1, len(books_with_authors))

        author = book.authors.get()
        author.book_set.remove(book)

        books_with_authors = get_node_class_for_model(Book).nodes.has(authors=True)
        self.assertEqual(0, len(books_with_authors))
        authors_with_books = get_node_class_for_model(Author).nodes.has(book_set=True)
        self.assertEqual(0, len(authors_with_books))
    finally:
        m2m_changed.connect(m2m_changed_handler, dispatch_uid=production_uid)
        m2m_changed.disconnect(m2m_changed_handler, dispatch_uid=test_uid)
def cleanup(cls):
    """Connect ``on_delete`` and ``on_save`` to ``cls`` and return ``cls``.

    The class name is used as the ``dispatch_uid`` for both connections.
    Because the class itself is returned, the function can be applied as
    a class decorator.
    """
    dispatch_uid = cls.__name__
    # (signal, receiver) pairs, connected in this order.
    for sig, handler in ((pre_delete, on_delete), (pre_save, on_save)):
        sig.connect(receiver=handler, sender=cls, dispatch_uid=dispatch_uid)
    return cls
def __init__(self):
    """Reset the revision state and subscribe to ``request_finished``."""
    self.clear()
    # Have this instance notified whenever the request finished signal fires.
    on_request_finished = self._request_finished_receiver
    request_finished.connect(on_request_finished)
def _listen():
    """Connect the ``plan_watchers`` callbacks to the ``TestPlan`` model signals."""
    post_save.connect(plan_watchers.on_plan_save, sender=TestPlan)
    pre_delete.connect(plan_watchers.on_plan_delete, sender=TestPlan)
    pre_save.connect(plan_watchers.pre_save_clean, sender=TestPlan)
def remove_xform(xform):
    """Delete an XForm, its related models and its documents in mongo.

    The ``_remove_from_mongo`` ``pre_delete`` receiver for
    ``ParsedInstance`` is disconnected while the deletion runs and is
    always reconnected afterwards, even if one of the deletion steps
    raises.

    :param xform: form to delete; ``xform.user.username`` and
        ``xform.id_string`` are combined to build the mongo query.
    """
    # disconnect parsed instance pre delete signal
    pre_delete.disconnect(_remove_from_mongo, sender=ParsedInstance)
    try:
        # delete instances from mongo db
        query = {
            ParsedInstance.USERFORM_ID:
            "%s_%s" % (xform.user.username, xform.id_string)}
        xform_instances.remove(query, j=True)
        # delete xform, and all related models
        xform.delete()
    finally:
        # Reconnect unconditionally: without this, a failure above would
        # leave the receiver detached for the rest of the process.
        pre_delete.connect(_remove_from_mongo, sender=ParsedInstance)
def ready(self):
# Monkey patches ``QuerySet`` and, depending on the ENABLE_ARRAY_M2M
# setting, further ORM internals; also connects a ``pre_delete`` receiver.
# Replace/add ``format``, ``update`` and ``_update`` on ``QuerySet``.
query.QuerySet.format = format
query.QuerySet.update = update
query.QuerySet._update = _update
# NOTE(review): leading indentation is missing in this copy, so it cannot
# be read from the text which of the statements below are guarded by this
# ``if`` -- confirm against the upstream source before editing the code.
if getattr(settings, 'ENABLE_ARRAY_M2M', False):
datastructures.Join.as_sql = as_sql
query.prefetch_one_level = prefetch_one_level
# Connected without a ``sender`` argument.
pre_delete.connect(delete_reverse_related)