def process_request(self, request):
    """
    Store request metadata in thread-local storage and, when the request
    carries an authenticated user, connect a ``pre_save`` receiver for
    ``LogAction`` that already has that user bound to it.

    :param request: Incoming request; ``request.META`` is read for the
        client address and ``request.user`` (if present) for the user.
    """
    # Initialize thread local storage. The (class, timestamp) tuple is reused
    # below as the dispatch_uid of the signal connection.
    threadlocal.actionslog = {
        'signal_duid': (self.__class__, time.time()),
        'remote_ip': request.META.get('REMOTE_ADDR'),
    }

    # In case of proxy, set 'original' address (first entry of the header).
    forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
    if forwarded_for:
        threadlocal.actionslog['remote_ip'] = forwarded_for.split(',')[0]

    # ``is_authenticated`` is a method on some user objects and a plain
    # boolean attribute on others; calling a bool raises TypeError, so only
    # call it when it is actually callable.
    user = getattr(request, 'user', None)
    is_authenticated = getattr(user, 'is_authenticated', False)
    if callable(is_authenticated):
        is_authenticated = is_authenticated()

    # Connect signal for automatic logging
    if is_authenticated:
        set_user = curry(self.set_user, user)
        # weak=False: the curried receiver is only referenced from this local
        # scope, so the signal has to hold a strong reference to it.
        pre_save.connect(
            set_user,
            sender=LogAction,
            dispatch_uid=threadlocal.actionslog['signal_duid'],
            weak=False,
        )
python类connect()的实例源码
def test_delete_key_old_way(env):
    """Exercise the wrapper-level key workflow: deploy, key login, delete."""
    # A CI skip used to be here and is kept for reference:
    # if pytest.config.getoption('--ci'):
    #     pytest.skip('does not work in CI environments')
    from django_remote_submission.wrapper.remote import RemoteWrapper

    remote = RemoteWrapper(
        hostname=env.server_hostname,
        username=env.remote_user,
        port=env.server_port,
    )
    key_path = os.path.expanduser('~/.ssh/id_rsa.pub')

    # Connect with the password and drop the public key on the server.
    with remote.connect(env.remote_password, key_path):
        remote.deploy_key_if_it_does_not_exist()

    # Connect without a password.
    with remote.connect():
        pass

    # Connect with the password again and delete the key.
    with remote.connect(env.remote_password):
        remote.delete_key()
def create_referral(sender, request, user, *args, **kwargs):
    """
    Get a ReferralLink ID based on clicked redirect.
    Use ID and created User to connect created User
    to ReferralLink User.

    The ID is read from ``request.session["referral_link_id"]``. A
    ``ReferredUser`` row is only created when ``user`` has not been referred
    before and the ReferralLink with that ID exists; the session key is then
    removed and, if ``COMPLETED_MESSAGE_DISPLAY`` is set, a success message
    is added to the request.

    :param sender: Signal sender (unused).
    :param request: Request whose session may hold ``referral_link_id``.
    :param user: The newly created user.
    """
    # Called only for its side effect: make sure the new user has a
    # ReferralLink of their own. The returned tuple is not needed.
    ReferralLink.objects.get_or_create(user=user)

    referral_link_id = request.session.get("referral_link_id")
    if referral_link_id:
        been_referred = ReferredUser.objects.filter(invitee=user).exists()
        ref_qs = ReferralLink.objects.filter(id=referral_link_id)
        if not been_referred and ref_qs.exists():
            link = ref_qs.first()
            referral = ReferredUser()
            referral.invitee = user
            referral.referrer = link.user
            referral.code = link
            referral.save()
            # The referral has been consumed; do not apply it a second time.
            del request.session["referral_link_id"]
            if COMPLETED_MESSAGE_DISPLAY:
                complete_msg = clean_message("COMPLETED_MESSAGE", COMPLETED_MESSAGE)
                messages.success(request, complete_msg)
def fix_model_for_attributes(cls):
    """Connect the attribute handlers to ``cls`` and return ``cls``.

    ``fixup_instance`` is connected to ``post_init`` and
    ``attribute_model_pre_save`` to ``pre_save``, both with ``cls`` as the
    sender. Returning ``cls`` lets this be applied as a class decorator.
    """
    post_init.connect(receiver=fixup_instance, sender=cls)
    pre_save.connect(receiver=attribute_model_pre_save, sender=cls)
    return cls
def register(model):
    """Register a model to the audit code.

    Connects ``_pre_save``, ``_post_save`` and ``_pre_delete`` to the
    matching signals for ``model``, using ``str(model)`` as the dispatch uid
    of each connection. Any exception raised while connecting is logged and
    not propagated.

    :param model: Model to register.
    :type model: object
    """
    try:
        pre_save.connect(_pre_save, sender=model, dispatch_uid=str(model))
        post_save.connect(_post_save, sender=model, dispatch_uid=str(model))
        pre_delete.connect(_pre_delete, sender=model, dispatch_uid=str(model))
    except Exception as e:
        # Exceptions have no ``message`` attribute on Python 3; reading it
        # here would raise AttributeError from inside the handler. Logging
        # the exception itself formats it via str().
        logger.error("<Register> %s", e)
def register_pre_save_on_AUTH_USER_MODER_change(sender, setting, value, enter, **kwargs):
    """Connect or disconnect the password-expiry ``pre_save`` handler.

    Only acts when ``setting`` is ``"AUTH_USER_MODEL"`` and ``value`` differs
    from ``USER_MODEL``. The handler is connected for ``value`` when
    ``enter`` is true and disconnected otherwise.
    """
    if setting != "AUTH_USER_MODEL" or value == USER_MODEL:
        return

    toggle = pre_save.connect if enter else pre_save.disconnect
    toggle(useraudit.password_expiry.user_pre_save, sender=value)
def test_signal(self):
    """Check that ``login_failure_limit_reached`` reaches a connected handler.

    Authenticates twice with the password "INCORRECT" and asserts that the
    handler was called with the user's type as sender and the user itself
    as ``user``.
    """
    def handler(sender, user=None, **kwargs):
        self.handler_called = True
        self.assertEqual(sender, type(self.user))
        self.assertEqual(user, self.user)

    self.handler_called = False
    login_failure_limit_reached.connect(handler)
    try:
        authenticate(username=self.username, password="INCORRECT")
        authenticate(username=self.username, password="INCORRECT")
    finally:
        # Always disconnect so a failure above cannot leave the handler
        # attached to the signal.
        login_failure_limit_reached.disconnect(handler)

    self.assertTrue(self.handler_called)
def contribute_to_class(self, cls, name, virtual_only=False):
    """Attach the field to ``cls`` and hook up the transition check.

    Besides the parent behaviour, the field name is stored on the class as
    ``_fsm_state_field`` and ``self._fsm_check_transitions`` is connected to
    ``pre_save`` for ``cls``.

    :param cls: Model class the field is being added to.
    :param name: Attribute name of the field on ``cls``.
    :param virtual_only: Forwarded unchanged to the parent implementation.
    """
    # Forward the caller's value; a hard-coded False would discard it.
    super().contribute_to_class(cls, name, virtual_only=virtual_only)
    setattr(cls, '_fsm_state_field', name)
    pre_save.connect(
        self._fsm_check_transitions,
        sender=cls,
        dispatch_uid='_fsm_check_transitions',
    )
def register_signals():
    """Connect the LostKey and Member manager callbacks to the save signals."""
    lost_keys = LostKey.objects
    pre_save.connect(receiver=lost_keys.on_pre_save, sender=LostKey)
    post_save.connect(receiver=lost_keys.send_notification, sender=LostKey)
    post_save.connect(
        receiver=Member.objects.send_transfer_workspace_key_info,
        sender=Member,
    )
def ready(self):
    """Connect the SoundCloud width handler to ``pre_save`` for
    ``OEmbedWithCaptionItem``."""
    # Imported inside the method rather than at module level.
    from icekit.plugins.oembed_with_caption.models import OEmbedWithCaptionItem

    pre_save.connect(
        receiver=handle_soundcloud_malformed_widths_for_oembeds,
        sender=OEmbedWithCaptionItem,
    )
def ready(self):
    """Connect the SoundCloud width handler to ``pre_save`` for ``OEmbedItem``."""
    # Imported inside the method rather than at module level.
    from fluent_contents.plugins.oembeditem.models import OEmbedItem

    pre_save.connect(
        receiver=handle_soundcloud_malformed_widths_for_oembeds,
        sender=OEmbedItem,
    )
def job_model_saved(mocker):
    """Yield a mock connected to ``pre_save`` for ``Job``.

    The mock is connected before the ``yield`` and disconnected once the
    generator is resumed.
    """
    from django.db.models.signals import pre_save
    from django_remote_submission.models import Job

    save_receiver = mocker.Mock()
    pre_save.connect(save_receiver, sender=Job)

    yield save_receiver

    pre_save.disconnect(save_receiver, sender=Job)
def connect(cls):
    """Connect ``on_pre_save`` and ``on_post_save`` to the save signals of ``cls``."""
    pre_save.connect(on_pre_save, sender=cls)
    post_save.connect(on_post_save, sender=cls)
def cleanup(cls):
    """Connect ``on_delete`` and ``on_save`` to ``cls`` and return ``cls``.

    The class name is used as the dispatch uid for both connections.
    """
    pre_delete.connect(on_delete, sender=cls, dispatch_uid=cls.__name__)
    pre_save.connect(on_save, sender=cls, dispatch_uid=cls.__name__)
    return cls
def _get_latest_changeset(self):
    """Return ``self.change_sets.latest()``, or ``None`` when that lookup
    raises ``ChangeSet.DoesNotExist``."""
    try:
        newest = self.change_sets.latest()
    except ChangeSet.DoesNotExist:
        newest = None
    return newest
# Connect the `pre_save` event to the subscribers for tracking changes. We make use of `dispatch_uid` so the
# event is not connected twice.
def register():
    """Connect ``check_status_change`` to ``pre_save`` for ``Segment``."""
    pre_save.connect(receiver=check_status_change, sender=Segment)
def _run_listen():
    """Connect the ``run_watchers`` callbacks for TestRun and TestCaseRun."""
    # Both TestCaseRun connections use the same dispatch uid string.
    case_run_uid = 'tcms.testruns.models.TestCaseRun'

    post_save.connect(run_watchers.post_run_saved, sender=TestRun)
    post_save.connect(
        run_watchers.post_case_run_saved,
        sender=TestCaseRun,
        dispatch_uid=case_run_uid,
    )
    post_delete.connect(
        run_watchers.post_case_run_deleted,
        sender=TestCaseRun,
        dispatch_uid=case_run_uid,
    )
    pre_save.connect(run_watchers.pre_save_clean, sender=TestRun)
def _listen():
    """Connect the ``plan_watchers`` callbacks to the TestPlan signals."""
    post_save.connect(plan_watchers.on_plan_save, sender=TestPlan)
    pre_delete.connect(plan_watchers.on_plan_delete, sender=TestPlan)
    pre_save.connect(plan_watchers.pre_save_clean, sender=TestPlan)
def _listen():
    """Connect the ``case_watchers`` callbacks to the TestCase signals."""
    # Save/delete listeners for email notification.
    post_save.connect(case_watchers.on_case_save, sender=TestCase)
    post_delete.connect(case_watchers.on_case_delete, sender=TestCase)
    pre_save.connect(case_watchers.pre_save_clean, sender=TestCase)
def contribute_to_class(self, cls, name, *args, **kwargs):
    """Attach the field to ``cls`` and connect its swap handlers.

    After the parent implementation runs, the field refuses to be part of
    any ``unique_together`` constraint of ``cls``. It then connects
    ``get_swap_objects`` to ``pre_save`` and ``save_swap_objects`` to
    ``post_save`` for ``cls``.

    :param cls: Model class the field is being added to.
    :param name: Attribute name of the field on ``cls``.
    :param args: Extra positional arguments, forwarded to the parent.
    :param kwargs: Extra keyword arguments, forwarded to the parent.
    :raises TypeError: If the field is named in a ``unique_together`` entry.
    """
    # Forward the extra arguments instead of silently discarding them.
    super(SwapIntegerField, self).contribute_to_class(cls, name, *args, **kwargs)

    if any(self.name in constraint for constraint in cls._meta.unique_together):
        raise TypeError("%s can't be part of a unique constraint." % self.__class__.__name__)

    pre_save.connect(self.get_swap_objects, sender=cls)
    post_save.connect(self.save_swap_objects, sender=cls)
def test_deploy_and_delete_key(env):
    """
    This is the new way of deploying and deleting the private key.

    Copies the key with ``copy_key_to_server``, checks that a password-less
    connection works, deletes the key with ``delete_key_from_server`` and
    then expects a password-less connection to raise ``ValueError``.
    """
    from django_remote_submission.tasks import (
        copy_key_to_server,
        delete_key_from_server
    )
    from django_remote_submission.wrapper.remote import RemoteWrapper

    # Both tasks are called with exactly the same keyword arguments.
    task_kwargs = dict(
        username=env.remote_user,
        password=env.remote_password,
        hostname=env.server_hostname,
        port=env.server_port,
        public_key_filename=None,
        remote=runs_remotely,
    )

    copy_key_to_server(**task_kwargs)

    # This wrapper is just for testing
    # Note that no password is passed!
    wrapper = RemoteWrapper(
        hostname=env.server_hostname,
        username=env.remote_user,
        port=env.server_port,
    )

    # Connect without password
    with wrapper.connect():
        pass

    delete_key_from_server(**task_kwargs)

    # ``pytest.raises`` no longer accepts ``message=``; an explicit
    # ``pytest.fail`` after the connection attempt gives the same failure
    # text when no ValueError is raised.
    with pytest.raises(ValueError):
        # Connect without password fails!
        with wrapper.connect():
            pass
        pytest.fail("incorrect public key")