def _do_healthcheck(self, containers, config):
    """Probe each container over HTTP and fail on the first unhealthy one.

    The request path is taken from ``config['HEALTHCHECK_URL']`` (default
    ``'/'``) and the request timeout from ``config['HEALTHCHECK_TIMEOUT']``
    (default ``1``). For every container, the value stored in etcd under
    ``/deis/services/<self>/<container.job_id>`` is used as the address
    part of the probed URL.

    :param containers: iterable of objects exposing ``job_id``
    :param config: mapping with the optional healthcheck settings
    :raises exceptions.HealthcheckException: if no etcd client is available,
        if a container answers with a status other than 200, or if the etcd
        lookup / HTTP request fails with a timeout, connection error or
        ``KeyError``
    """
    probe_path = config.get('HEALTHCHECK_URL', '/')
    probe_timeout = int(config.get('HEALTHCHECK_TIMEOUT', 1))
    if not _etcd_client:
        raise exceptions.HealthcheckException('no etcd client available')
    for container in containers:
        try:
            etcd_key = "/deis/services/{self}/{container.job_id}".format(
                self=self, container=container)
            address = _etcd_client.get(etcd_key).value
            reply = requests.get(
                "http://{}{}".format(address, probe_path),
                timeout=probe_timeout)
            status = reply.status_code
            if status != requests.codes.OK:
                raise exceptions.HealthcheckException(
                    "app failed health check (got '{}', expected: '200')".format(
                        status))
        except (requests.Timeout, requests.ConnectionError, KeyError) as err:
            # Lookup and transport failures are reported as healthcheck
            # failures as well.
            raise exceptions.HealthcheckException(
                'failed to connect to container ({})'.format(err))
# Example source code using Python's connect()
def _do_healthcheck(self, containers, config):
    """Probe each container over HTTP and fail on the first unhealthy one.

    The request path is taken from ``config['HEALTHCHECK_URL']`` (default
    ``'/'``) and the request timeout from ``config['HEALTHCHECK_TIMEOUT']``
    (default ``1``). For every container, the value stored in etcd under
    ``/deis/services/<self>/<container.job_id>`` is used as the address
    part of the probed URL.

    :param containers: iterable of objects exposing ``job_id``
    :param config: mapping with the optional healthcheck settings
    :raises exceptions.HealthcheckException: if no etcd client is available,
        if a container answers with a status other than 200, or if the etcd
        lookup / HTTP request fails with a timeout, connection error or
        ``KeyError``
    """
    probe_path = config.get('HEALTHCHECK_URL', '/')
    probe_timeout = int(config.get('HEALTHCHECK_TIMEOUT', 1))
    if not _etcd_client:
        raise exceptions.HealthcheckException('no etcd client available')
    for container in containers:
        try:
            etcd_key = "/deis/services/{self}/{container.job_id}".format(
                self=self, container=container)
            address = _etcd_client.get(etcd_key).value
            reply = requests.get(
                "http://{}{}".format(address, probe_path),
                timeout=probe_timeout)
            status = reply.status_code
            if status != requests.codes.OK:
                raise exceptions.HealthcheckException(
                    "app failed health check (got '{}', expected: '200')".format(
                        status))
        except (requests.Timeout, requests.ConnectionError, KeyError) as err:
            # Lookup and transport failures are reported as healthcheck
            # failures as well.
            raise exceptions.HealthcheckException(
                'failed to connect to container ({})'.format(err))
def test_add_vars(self):
    """A ``make_template_vars`` receiver can add a new template variable."""

    def handler(vars_data, **kwarg):
        # Receiver under test: appends one extra variable definition.
        vars_data.append({
            'name': 'test_var',
            'description': 'Is a test!',
            'value': 'Hello!',
        })

    make_template_vars.connect(handler)
    try:
        # The added variable is exposed with its value and description.
        var_data = self.get('test_var')
        self.assertIsNotNone(var_data)
        self.assertEqual(var_data['value'], 'Hello!')
        self.assertEqual(var_data['description'], 'Is a test!')
        # It is also substituted when rendering a template string.
        content = replace_template_vars('{{ test_var }}')
        self.assertEqual(content, 'Hello!')
    finally:
        # Always detach the receiver so a failed assertion cannot leak it
        # into other tests.
        disconnected = make_template_vars.disconnect(handler)
    self.assertTrue(disconnected)
def test_edit_vars(self):
    """A ``make_template_vars`` receiver can change an existing variable."""

    def handler(vars_data, **kwarg):
        # Receiver under test: overrides the value of the 'email' variable.
        var_data = self._search_name(vars_data, 'email')
        self.assertIsNotNone(var_data)
        var_data['value'] = 'Hello Word!'

    make_template_vars.connect(handler)
    try:
        # The edited value is the one exposed.
        var_data = self.get('email')
        self.assertIsNotNone(var_data)
        self.assertEqual(var_data['value'], 'Hello Word!')
        # It is also the one substituted in a template string.
        content = replace_template_vars('{{ email }}')
        self.assertEqual(content, 'Hello Word!')
    finally:
        # Always detach the receiver so a failed assertion cannot leak it
        # into other tests.
        disconnected = make_template_vars.disconnect(handler)
    self.assertTrue(disconnected)
def test_delete_vars(self):
    """A ``make_template_vars`` receiver can remove an existing variable."""

    def handler(vars_data, **kwarg):
        # Receiver under test: drops the 'email' variable.
        var_data = self._search_name(vars_data, 'email')
        self.assertIsNotNone(var_data)
        vars_data.remove(var_data)

    make_template_vars.connect(handler)
    try:
        # The variable is no longer exposed.
        var_data = self.get('email')
        self.assertIsNone(var_data)
        # The placeholder is left untouched in a template string.
        content = replace_template_vars('{{ email }}')
        self.assertEqual(content, '{{ email }}')
    finally:
        # Always detach the receiver so a failed assertion cannot leak it
        # into other tests.
        disconnected = make_template_vars.disconnect(handler)
    self.assertTrue(disconnected)
def test_add_item(self):
    """A ``make_menu`` receiver can add a new menu entry."""

    def handler(urls, **kwarg):
        # Receiver under test: appends one extra menu entry.
        urls.append({
            'name': 'test',
            'url': 'http://google.fr',
        })

    make_menu.connect(handler)
    try:
        item = self.get('test')
        self.assertIsNotNone(item)
        self.assertEqual(item['url'], 'http://google.fr')
    finally:
        # Always detach the receiver so a failed assertion cannot leak it
        # into other tests.
        disconnected = make_menu.disconnect(handler)
    self.assertTrue(disconnected)
def test_attachment_executed(self):
    """Requesting the attachment tracker URL saves a matching TrackerInfos."""

    def handler(instance, **kwarg):
        # Only react to the "attachment executed" tracker; ignore every
        # other TrackerInfos save.
        if instance.target_tracker.key != TRACKER_ATTACHMENT_EXECUTED:
            return
        self.assertEqual(instance.ip, '127.0.0.1')
        self.assertEqual(instance.user_agent, 'Outlook')
        # Signals back to the test body that the receiver was reached.
        raise SuccessException()

    post_save.connect(handler, sender=TrackerInfos)
    try:
        # Send the campaign and read the tracker URL out of the attachment.
        self.send_campaign()
        attachment = json.loads(mail.outbox[-1].attachments[0][1].decode())
        tracker_url = attachment['tracker_url']
        # The receiver must have been called by the tracker request.
        with self.assertRaises(SuccessException):
            self.client.defaults['HTTP_USER_AGENT'] = 'Outlook'
            self.client.get(tracker_url)
    finally:
        # Always detach the receiver so a failed assertion cannot leak it
        # into other tests.
        disconnected = post_save.disconnect(handler, sender=TrackerInfos)
    self.assertTrue(disconnected)
def test_email_open(self):
    """Requesting the e-mail tracking image saves a matching TrackerInfos."""

    def handler(instance, **kwarg):
        # Only react to the "email open" tracker; ignore every other
        # TrackerInfos save.
        if instance.target_tracker.key != TRACKER_EMAIL_OPEN:
            return
        self.assertEqual(instance.ip, '127.0.0.1')
        self.assertEqual(instance.user_agent, 'Outlook')
        # Signals back to the test body that the receiver was reached.
        raise SuccessException()

    post_save.connect(handler, sender=TrackerInfos)
    try:
        # Send the campaign and take the last src="..." URL of the HTML
        # alternative as the tracker URL.
        self.send_campaign()
        mail_html = mail.outbox[-1].alternatives[0][0]
        tracker_url = mail_html.split('src="')[-1].split('"')[0]
        # The receiver must have been called by the tracker request.
        with self.assertRaises(SuccessException):
            self.client.defaults['HTTP_USER_AGENT'] = 'Outlook'
            self.client.get(tracker_url)
    finally:
        # Always detach the receiver so a failed assertion cannot leak it
        # into other tests.
        disconnected = post_save.disconnect(handler, sender=TrackerInfos)
    self.assertTrue(disconnected)
def test_edit_campaign_report_item(self):
    """A ``make_campaign_report`` receiver can rename a report tab."""

    def handler(context, **kwarg):
        # Receiver under test: moves the 'Other' tab layout under 'Test'.
        tabs_layout = context['tabs_layout']
        tabs_layout['Test'] = tabs_layout.pop('Other')

    make_campaign_report.connect(handler)
    try:
        content = self.get()
        # The renamed tab is rendered...
        self.assertIn('<a href="#test" aria-controls="test" role="tab" '
                      'data-toggle="tab">Test</a>', content)
        self.assertIn('id="test"', content)
        # ...and the original one is gone.
        self.assertNotIn('<a href="#other" aria-controls="other" '
                         'role="tab" data-toggle="tab">Other</a>', content)
        self.assertNotIn('id="other"', content)
    finally:
        # Always detach the receiver so a failed assertion cannot leak it
        # into other tests.
        disconnected = make_campaign_report.disconnect(handler)
    self.assertTrue(disconnected)
def create_auth(sender, instance, created, **kwargs):
    """
    A hook that runs when a network is saved and makes sure it has an
    auth group, an auth user and a token that BTSs on the network use to
    authenticate.

    Nothing is done when the instance already has both ``auth_group`` and
    ``auth_user``. Otherwise the group and user named ``network_<pk>`` are
    fetched or created, and the instance is saved again.
    """
    if instance.auth_group and instance.auth_user:
        return

    auth_name = 'network_%s' % instance.pk
    instance.auth_group, created_group = Group.objects.get_or_create(
        name=auth_name)
    if created_group:
        assign_perm('view_network', instance.auth_group, instance)

    # The regular new-user hook must not run for this internal auth user.
    post_save.disconnect(UserProfile.new_user_hook, sender=User)
    try:
        instance.auth_user, created_user = User.objects.get_or_create(
            username=auth_name)
        if created_user:
            Token.objects.create(user=instance.auth_user)
        # NOTE(review): the original nesting of this line is ambiguous in
        # the source; adding an existing member to a group is a no-op, so
        # doing it unconditionally is safe either way.
        instance.auth_group.user_set.add(instance.auth_user)
    finally:
        # Reconnect even if user/token creation failed, otherwise the hook
        # would stay disabled for every later User save in this process.
        post_save.connect(UserProfile.new_user_hook, sender=User)
    instance.save()
def test_delete_object_is_deleted(self):
    """Looking up the node of a deleted book raises ``DoesNotExist``."""
    production_uid = 'chemtrails.signals.handlers.pre_delete_handler'
    test_uid = 'pre_delete_handler.test'

    # Swap the production registration for a test-local one.
    pre_delete.disconnect(pre_delete_handler, dispatch_uid=production_uid)
    pre_delete.connect(pre_delete_handler, dispatch_uid=test_uid)
    try:
        book = BookFixture(Book).create_one()
        node_class = get_node_class_for_model(Book)
        book_pk = book.pk
        try:
            book.delete()
            node_class.nodes.get(pk=book_pk)
            self.fail('Did not raise when trying to get non-existent book node.')
        except node_class.DoesNotExist as exc:
            self.assertEqual(str(exc), "{'pk': %d}" % book_pk)
    finally:
        # Restore the production registration.
        pre_delete.connect(pre_delete_handler, dispatch_uid=production_uid)
        pre_delete.disconnect(pre_delete_handler, dispatch_uid=test_uid)
def test_null_foreignkey_is_disconnected(self):
    """Setting ``store.bestseller`` to ``None`` leaves no bestseller match."""
    production_uid = 'chemtrails.signals.handlers.post_save_handler'
    test_uid = 'post_save_handler.test'

    # Swap the production registration for a test-local one.
    post_save.disconnect(post_save_handler, dispatch_uid=production_uid)
    post_save.connect(post_save_handler, dispatch_uid=test_uid)
    try:
        store = StoreFixture(Store, generate_m2m=False).create_one()
        store_nodes = get_node_class_for_model(Store).nodes

        # The bestseller book has a node with the same primary key.
        book_node = get_node_class_for_model(Book).nodes.get(
            pk=store.bestseller.pk)
        self.assertEqual(store.bestseller.pk, book_node.pk)
        self.assertEqual(1, len(store_nodes.has(bestseller=True)))

        store.bestseller = None
        store.save()
        self.assertEqual(0, len(store_nodes.has(bestseller=True)))
    finally:
        # Restore the production registration.
        post_save.connect(post_save_handler, dispatch_uid=production_uid)
        post_save.disconnect(post_save_handler, dispatch_uid=test_uid)
def ready(self):
    """Connect the signal handlers and apply the Neo4j configuration.

    Each ``config`` value is taken from the Django setting of the same
    name if present, otherwise from the environment, otherwise from the
    default shown below.
    """
    from .signals.handlers import (
        m2m_changed_handler, post_migrate_handler,
        post_save_handler, pre_delete_handler
    )

    # (signal, receiver, dispatch_uid)
    wiring = (
        (m2m_changed, m2m_changed_handler,
         'chemtrails.signals.handlers.m2m_changed_handler'),
        (post_save, post_save_handler,
         'chemtrails.signals.handlers.post_save_handler'),
        (pre_delete, pre_delete_handler,
         'chemtrails.signals.handlers.pre_delete_handler'),
        (post_migrate, post_migrate_handler,
         'neomodel.core.post_migrate_handler'),
    )
    for signal, handler, uid in wiring:
        signal.connect(receiver=handler, dispatch_uid=uid)

    # Neo4j config
    env_bolt_url = os.environ.get('NEO4J_BOLT_URL', config.DATABASE_URL)
    config.DATABASE_URL = getattr(settings, 'NEO4J_BOLT_URL', env_bolt_url)

    env_force_tz = get_environment_variable('NEO4J_FORCE_TIMEZONE', False)
    config.FORCE_TIMEZONE = getattr(
        settings, 'NEO4J_FORCE_TIMEZONE', env_force_tz)

    env_encrypted = get_environment_variable(
        'NEO4J_ENCRYPTED_CONNECTION', True)
    config.ENCRYPTED_CONNECTION = getattr(
        settings, 'NEO4J_ENCRYPTED_CONNECTION', env_encrypted)

    env_pool_size = get_environment_variable('NEO4J_MAX_POOL_SIZE', 50)
    config.MAX_POOL_SIZE = getattr(
        settings, 'NEO4J_MAX_POOL_SIZE', env_pool_size)
def test_previous_status(self):
    """``_status`` differs from ``status`` only after the status changes."""
    from django.db.models.signals import post_save

    def check_different(sender, instance, **kwargs):
        # Record, on the function object, whether the saved instance's
        # ``_status`` differed from its ``status``.
        check_different.is_different = (instance._status != instance.status)

    check_different.is_different = None
    post_save.connect(check_different)
    try:
        s = Service(name="service", description="test", status=0)
        s.save()
        self.assertFalse(check_different.is_different)

        # Saving the same status again is not a change.
        s.status = 0
        s.save()
        self.assertFalse(check_different.is_different)

        # Saving a new status is reported as a change.
        s.status = 1
        s.save()
        self.assertIsNotNone(check_different.is_different)
        self.assertTrue(check_different.is_different)
    finally:
        # The receiver is connected for every sender; always detach it so a
        # failed assertion cannot leak it into other tests.
        post_save.disconnect(check_different)
def _fire_alerts(self):
    """Patch two addresses with the trigger-set signal temporarily enabled.

    Jane Doe's address is patched with a city and country, Joe Bloggs'
    with a country only; both requests must answer 200.
    """
    # Re-enable signals to enable testing of them
    post_save.connect(receiver=TriggerSet._fire_triggersets,
                      dispatch_uid='Fire Trigger Sets')
    try:
        self._asJaneDoe()
        updated_address = {"city": "London", "country": "England"}
        response = self._client.patch(
            "/addresses/%d/" % self._janeDoeAddress.id,
            updated_address, format='json')
        self.assertEqual(response.status_code, status.HTTP_200_OK)

        self._asJoeBloggs()
        updated_address = {"country": "England"}
        response = self._client.patch(
            "/addresses/%d/" % self._joeBloggsAddress.id,
            updated_address, format='json')
        self.assertEqual(response.status_code, status.HTTP_200_OK)
    finally:
        # Disable signals to avoid affecting other tests, even when one of
        # the assertions above fails.
        post_save.disconnect(receiver=TriggerSet._fire_triggersets,
                             dispatch_uid='Fire Trigger Sets')
def ready(self):
    """Apply the session-CSRF monkey patch and connect SparkJob signals."""
    # The app is now ready. Include any monkey patches here.

    # Monkey patch CSRF to switch to session based CSRF. Session
    # based CSRF will prevent attacks from apps under the same
    # domain. If you're planning to host your app under its own
    # domain you can remove session_csrf and use Django's CSRF
    # library. See also
    # https://github.com/mozilla/sugardough/issues/38
    session_csrf.monkeypatch()

    # Connect signals. The imports are done here, inside ready().
    from atmo.jobs.models import SparkJob
    from atmo.jobs.signals import assign_group_perm, remove_group_perm

    # (signal, receiver, dispatch_uid)
    wiring = (
        (post_save, assign_group_perm, 'sparkjob_post_save_assign_perm'),
        (pre_delete, remove_group_perm, 'sparkjob_pre_delete_remove_perm'),
    )
    for signal, handler, uid in wiring:
        signal.connect(handler, sender=SparkJob, dispatch_uid=uid)
def register_signal_handlers():
    """Registers signal handlers.

    The language permission/group handler and the parent-language handler
    are always connected. The tree synchronisation handlers are connected
    only when the ``SYNC_TREE`` setting is enabled; page models are then
    obtained through ``get_page_models``.
    """
    post_save.connect(create_language_permissions_and_group, sender=Language)
    init_new_page.connect(force_parent_language)

    if not get_wagtailtrans_setting('SYNC_TREE'):
        return

    if get_wagtailtrans_setting('LANGUAGES_PER_SITE'):
        m2m_changed.connect(
            update_language_trees_for_site,
            sender=SiteLanguages.other_languages.through)
    else:
        post_save.connect(create_new_language_tree, sender=Language)

    for page_model in get_page_models():
        # Only models exposing the translation API get the handlers.
        if hasattr(page_model, 'create_translation'):
            post_save.connect(synchronize_trees, sender=page_model)
        if hasattr(page_model, 'get_translations'):
            pre_delete.connect(synchronize_deletions, sender=page_model)
def create_user_profile(sender, instance, created, **kwargs):
    """ Create profile and preference upon user creation (``post_save.connect``).

    Does nothing unless ``created`` is true.

    :param sender: sender object
    :param instance: user instance
    :param created: user creation flag
    :param kwargs: dictionary argument
    :return: None
    """
    if not created:
        return

    # Create profile / preference objects for user
    UserProfile.objects.get_or_create(user=instance)
    UserPreference.objects.get_or_create(user=instance,
                                         site=Site.objects.get_current())

    # User automatically belongs to meta_all_members
    all_members_group, _ = Group.objects.get_or_create(
        name=PermissionAlias.all)
    instance.groups.add(all_members_group)
def remove_obj_perms_connected_with_user(sender, instance, **kwargs):
    """ Remove user's permissions upon user deletion (``pre_delete.connect``).

    Deletes the user and group object permissions whose content type and
    object pk match the instance, then the instance's profile and every
    preference, when present.

    :param sender: sender object
    :param instance: user instance
    :param kwargs: dictionary argument
    :return: None
    """
    instance_type = ContentType.objects.get_for_model(instance)
    filters = Q(content_type=instance_type, object_pk=instance.pk)
    for permission_model in (UserObjectPermission, GroupObjectPermission):
        permission_model.objects.filter(filters).delete()

    if instance.profile:
        instance.profile.delete()

    if instance.preference:
        for preference in instance.preference.all():
            preference.delete()
def register(self, model, adapter_cls=VersionAdapter, **field_overrides):
    """Registers a model with this revision manager.

    :param model: model class to register
    :param adapter_cls: adapter class instantiated with the model
    :param field_overrides: attributes set on a subclass of ``adapter_cls``
        created for this registration
    :raises RegistrationError: if the model is already registered or is a
        proxy model
    """
    # Prevent multiple registration.
    if self.is_registered(model):
        message = "{model} has already been registered with django-reversion".format(
            model=model,
        )
        raise RegistrationError(message)

    # Prevent proxy models being registered.
    if model._meta.proxy:
        raise RegistrationError("Proxy models cannot be used with django-reversion, register the parent class instead")

    # Perform any customization: subclass the adapter with the overrides.
    if field_overrides:
        adapter_cls = type(adapter_cls.__name__, (adapter_cls,), field_overrides)

    # Perform the registration.
    self._registered_models[model] = adapter_cls(model)

    # Connect to the post save and pre delete signals of the model.
    for signal, receiver in ((post_save, self._post_save_receiver),
                             (pre_delete, self._pre_delete_receiver)):
        signal.connect(receiver, model)
def __str__(self):
    """Return ``"<user_id>: <username>"`` for this object."""
    # The arguments must be a tuple: without the parentheses ``%`` binds to
    # ``self.user_id`` alone and raises "not enough arguments for format
    # string".
    return '%s: %s' % (self.user_id, self.user.username)
# def create_user_profile(sender, instance, created, **kwargs):
# if created:
# profile = Profile()
# profile.user = instance
# profile.save()
#
# post_save.connect(create_user_profile, sender=User)
def __init__(self, *args, **kwargs):
    """Initialise the instance and hook ``reset_state`` to ``post_save``.

    ``reset_state`` is connected for this instance's class under a
    per-class ``dispatch_uid`` and is then called once for the new
    instance.
    """
    super(DirtyFieldsMixin, self).__init__(*args, **kwargs)
    model_cls = self.__class__
    sweeper_uid = '%s-DirtyFieldsMixin-sweeper' % model_cls.__name__
    post_save.connect(reset_state, sender=model_cls,
                      dispatch_uid=sweeper_uid)
    reset_state(sender=model_cls, instance=self)
def setup(self):
    """Connect the save and delete handlers for every registered model."""
    wiring = (
        (post_save, self.handle_save),
        (post_delete, self.handle_delete),
    )
    for registered_model in self.register_models:
        for signal, receiver in wiring:
            signal.connect(receiver, sender=registered_model)
def register(model):
    """Register a model to the audit code.

    Connects the pre-save, post-save and pre-delete handlers for ``model``,
    each with ``str(model)`` as ``dispatch_uid``. Any failure is logged
    instead of being raised.

    :param model: Model to register.
    :type model: object
    """
    try:
        uid = str(model)
        pre_save.connect(_pre_save, sender=model, dispatch_uid=uid)
        post_save.connect(_post_save, sender=model, dispatch_uid=uid)
        pre_delete.connect(_pre_delete, sender=model, dispatch_uid=uid)
    except Exception as e:
        # Log the exception itself: ``e.message`` does not exist on
        # Python 3, so reading it here would raise AttributeError from
        # inside the error handler.
        logger.error("<Register> %s", e)
def get_etcd_client():
    """Return the etcd client cached on this function, creating it once.

    On the first call a client is built from ``settings.ETCD_HOST`` and
    ``settings.ETCD_PORT`` and probed with a read of ``/deis``. If that
    raises ``etcd.EtcdException`` a warning is logged and ``None`` is
    cached, so later calls return ``None`` without retrying.
    """
    if hasattr(get_etcd_client, "client"):
        return get_etcd_client.client

    # wire up etcd publishing if we can connect
    try:
        get_etcd_client.client = etcd.Client(
            host=settings.ETCD_HOST,
            port=int(settings.ETCD_PORT))
        get_etcd_client.client.get('/deis')
    except etcd.EtcdException:
        logger.log(logging.WARNING, 'Cannot synchronize with etcd cluster')
        get_etcd_client.client = None
    return get_etcd_client.client
def get_etcd_client():
    """Return the etcd client cached on this function, creating it once.

    On the first call a client is built from ``settings.ETCD_HOST`` and
    ``settings.ETCD_PORT`` and probed with a read of ``/deis``. If that
    raises ``etcd.EtcdException`` a warning is logged and ``None`` is
    cached, so later calls return ``None`` without retrying.
    """
    if hasattr(get_etcd_client, "client"):
        return get_etcd_client.client

    # wire up etcd publishing if we can connect
    try:
        get_etcd_client.client = etcd.Client(
            host=settings.ETCD_HOST,
            port=int(settings.ETCD_PORT))
        get_etcd_client.client.get('/deis')
    except etcd.EtcdException:
        logger.log(logging.WARNING, 'Cannot synchronize with etcd cluster')
        get_etcd_client.client = None
    return get_etcd_client.client
def test_delete_item(self):
    """A ``make_menu`` receiver can remove an existing menu entry."""

    def handler(urls, **kwarg):
        # Receiver under test: drops the 'Campaigns' entry.
        item = self._search_name(urls, 'Campaigns')
        self.assertIsNotNone(item)
        urls.remove(item)

    make_menu.connect(handler)
    try:
        item = self.get('Campaigns')
        self.assertIsNone(item)
    finally:
        # Always detach the receiver so a failed assertion cannot leak it
        # into other tests.
        disconnected = make_menu.disconnect(handler)
    self.assertTrue(disconnected)
def test_attachment_executed_with_data(self):
    """Posting data to the attachment tracker URL stores it in ``raw``."""

    def handler(instance, **kwarg):
        # Only react to the "attachment executed" tracker; ignore every
        # other TrackerInfos save.
        if instance.target_tracker.key != TRACKER_ATTACHMENT_EXECUTED:
            return
        self.assertEqual(instance.ip, '127.0.0.1')
        self.assertEqual(instance.user_agent, 'Thunderbird')
        self.assertEqual(instance.raw, '{"hello": "world!"}')
        # Signals back to the test body that the receiver was reached.
        raise SuccessException()

    post_save.connect(handler, sender=TrackerInfos)
    try:
        # Send the campaign and read the tracker URL out of the attachment.
        self.send_campaign()
        attachment = json.loads(mail.outbox[-1].attachments[0][1].decode())
        tracker_url = attachment['tracker_url']
        # The receiver must have been called by the tracker request.
        with self.assertRaises(SuccessException):
            self.client.defaults['HTTP_USER_AGENT'] = 'Thunderbird'
            self.client.post(tracker_url, {'hello': 'world!'})
    finally:
        # Always detach the receiver so a failed assertion cannot leak it
        # into other tests.
        disconnected = post_save.disconnect(handler, sender=TrackerInfos)
    self.assertTrue(disconnected)
def test_email_send(self):
    """Sending a campaign saves an "email send" Tracker with 'success'."""
    marker = 'handler_%s_test_ok' % TRACKER_EMAIL_SEND

    def handler(instance, **kwarg):
        # Only react to the "email send" tracker; ignore every other
        # Tracker save.
        if instance.key != TRACKER_EMAIL_SEND:
            return
        self.assertEqual(instance.value, 'success')
        # Record success as a database entry: per the original author,
        # raising SuccessException here makes an infinite loop.
        TrackerInfos.objects.create(
            target_tracker=instance,
            raw='handler_%s_test_ok' % TRACKER_EMAIL_SEND
        )

    post_save.connect(handler, sender=Tracker)
    try:
        # send mail
        campaign = self.send_campaign()
        # The receiver must have left its marker entry behind.
        test_result_tracker = TrackerInfos.objects.filter(raw=marker).first()
        self.assertIsNotNone(test_result_tracker)
        tracker = test_result_tracker.target_tracker
        self.assertEqual(campaign.pk, tracker.campaign.pk)
        self.assertEqual(TRACKER_EMAIL_SEND, tracker.key)
    finally:
        # Always detach the receiver so a failed assertion cannot leak it
        # into other tests.
        disconnected = post_save.disconnect(handler, sender=Tracker)
    self.assertTrue(disconnected)
def test_landing_page_post(self):
    """Submitting the landing page form saves a matching TrackerInfos."""

    def handler(instance, **kwarg):
        # Only react to the "landing page post" tracker; ignore every
        # other TrackerInfos save.
        if instance.target_tracker.key != TRACKER_LANDING_PAGE_POST:
            return
        self.assertEqual(instance.ip, '127.0.0.1')
        self.assertEqual(instance.user_agent, 'Firefox')
        self.assertEqual(instance.referer, 'https://webmail.com')
        self.assertIn('"login": "Admin"', instance.raw)
        # Signals back to the test body that the receiver was reached.
        raise SuccessException()

    post_save.connect(handler, sender=TrackerInfos)
    try:
        # Send the campaign; the mail body is used as the tracker URL.
        self.send_campaign()
        tracker_url = mail.outbox[-1].body
        # call landing page
        response = self.client.get(tracker_url)
        self.assertEqual(response.status_code, 200)
        # Collect the form target and its input values from the page.
        html = response.content.decode()
        form = BeautifulSoup(html, 'html.parser').find('form')
        post_url = form.get('action')
        post_data = {field.get('name'): field.get('value')
                     for field in form.find_all('input')}
        post_data['login'] = 'Admin'
        # The receiver must have been called by the form submission.
        with self.assertRaises(SuccessException):
            self.client.defaults['HTTP_USER_AGENT'] = 'Firefox'
            self.client.defaults['HTTP_REFERER'] = 'https://webmail.com'
            self.client.post(post_url, post_data)
    finally:
        # Always detach the receiver so a failed assertion cannot leak it
        # into other tests.
        disconnected = post_save.disconnect(handler, sender=TrackerInfos)
    self.assertTrue(disconnected)