def test_attach_avatars(self):
"""
It should attach robotic avatars given a username prefix
"""
with mute_signals(post_save):
for i in range(3):
ProfileFactory.create(user__username="user_{}".format(i))
for i in range(3):
ProfileFactory.create(user__username="fake_{}".format(i))
# clear all images
for profile in Profile.objects.all():
profile.image = None
profile.image_medium = None
profile.image_small = None
profile.save()
self.command.handle("attach_avatars", username_prefix="fake")
for profile in Profile.objects.all():
has_image = profile.user.username.startswith("fake")
assert bool(profile.image) == has_image
assert bool(profile.image_medium) == has_image
assert bool(profile.image_small) == has_image
python类post_save()的实例源码
def deserialize_user_data(user_data, programs):
"""
Deserializes a dict of mixed User/Profile data and returns the newly-inserted User
"""
username = FAKE_USER_USERNAME_PREFIX + user_data['email'].split('@')[0]
user = deserialize_model_data(User, user_data, username=username)
# Create social username
user.social_auth.create(
provider=EdxOrgOAuth2.name,
uid=user.username,
)
# The user data is generated in this script with mute_signals(post_save)
# so we need to create the profile explicitly.
profile = deserialize_model_data(Profile, user_data, user=user, fake_user=True)
deserialize_profile_detail_data(profile, Employment, user_data['work_history'])
deserialize_profile_detail_data(profile, Education, user_data['education'])
deserialize_dashboard_data(user, user_data, programs)
ensure_cached_data_freshness(user)
return user
def test_context_has_role(self, role):
"""
Assert context values when staff or instructor which are different than for regular logged in user
"""
with mute_signals(post_save):
profile = ProfileFactory.create()
Role.objects.create(
role=role,
program=self.program_page.program,
user=profile.user,
)
self.client.force_login(profile.user)
response = self.client.get(self.program_page.url)
assert response.context['authenticated'] is True
assert response.context['username'] is None
assert response.context['is_staff'] is True
def test_get_token_for_user(settings, mocker):
"""
Assert that get_token_for_user returns a token for a valid DiscussionUser
"""
with mute_signals(post_save):
user = UserFactory.create()
settings.OPEN_DISCUSSIONS_JWT_SECRET = 'secret'
settings.OPEN_DISCUSSIONS_JWT_EXPIRES_DELTA = 3600
mock_get_token = mocker.patch('open_discussions_api.utils.get_token')
DiscussionUser.objects.create(user=user, username='username')
assert get_token_for_user(user) is not None
mock_get_token.assert_called_once_with(
'secret',
'username',
[],
expires_delta=3600,
extra_payload={
'auth_url': None,
'session_url': None
}
)
def test_create_discussion_user(mock_staff_client):
"""Verify create_discussion_user makes the correct API calls"""
mock_response = mock_staff_client.users.create.return_value
mock_response.status_code = 201
mock_response.json.return_value = {
'username': 'username'
}
with mute_signals(post_save):
profile = ProfileFactory.create()
discussion_user = DiscussionUser.objects.create(user=profile.user)
api.create_discussion_user(discussion_user)
assert discussion_user.username == 'username'
mock_staff_client.users.create.assert_called_once_with(
name=profile.full_name,
image=profile.image.url,
image_small=profile.image_small.url,
image_medium=profile.image_medium.url,
)
def test_update_discussion_user(mock_staff_client):
"""Verify update_discussion_user makes the correct API calls"""
mock_response = mock_staff_client.users.update.return_value
mock_response.status_code = 200
mock_response.json.return_value = {
'username': 'username'
}
with mute_signals(post_save):
profile = ProfileFactory.create()
discussion_user = DiscussionUser.objects.create(user=profile.user, username='username')
api.update_discussion_user(discussion_user)
mock_staff_client.users.update.assert_called_once_with(
discussion_user.username,
name=profile.full_name,
image=profile.image.url,
image_small=profile.image_small.url,
image_medium=profile.image_medium.url,
)
def setUpTestData(cls):
with mute_signals(post_save):
cls.learner1 = UserFactory.create()
cls.learner1_username = 'learner1_username'
cls.learner1.social_auth.create(
provider=EdxOrgOAuth2.name,
uid=cls.learner1_username
)
cls.learner2 = UserFactory.create()
cls.learner2_username = 'learner2_username'
cls.learner2.social_auth.create(
provider=EdxOrgOAuth2.name,
uid=cls.learner2_username
)
cls.program = ProgramFactory.create()
cls.staff = UserFactory.create()
cls.staff_username = 'staff_username'
for learner in (cls.learner1, cls.learner2):
ProgramEnrollment.objects.create(
program=cls.program,
user=learner,
)
def setUpTestData(cls):
with mute_signals(post_save):
cls.profile = ProfileFactory.create()
cls.program, _ = create_program(past=True)
cls.course_run = cls.program.course_set.first().courserun_set.first()
CachedCurrentGradeFactory.create(
user=cls.profile.user,
course_run=cls.course_run,
data={
"passed": True,
"percent": 0.9,
"course_key": cls.course_run.edx_course_key,
"username": cls.profile.user.username
}
)
CachedCertificateFactory.create(user=cls.profile.user, course_run=cls.course_run)
cls.exam_run = ExamRunFactory.create(
course=cls.course_run.course,
date_first_schedulable=now_in_utc() - timedelta(days=1),
)
def test_write_ead_file(self):
"""
Tests that write_ead_file outputs correctly
"""
kwargs = {
'id': 143,
'operation': 'add',
'exam_run__exam_series_code': 'MM-DEDP',
'exam_run__date_first_eligible': date(2016, 5, 15),
'exam_run__date_last_eligible': date(2016, 10, 15),
}
with mute_signals(post_save):
profile = ProfileFactory(id=14879)
exam_auths = [ExamAuthorizationFactory.create(user=profile.user, **kwargs)]
exam_auths[0].updated_on = FIXED_DATETIME
self.ead_writer.write(self.tsv_file, exam_auths)
assert self.tsv_rows[0] == (
"add\t143\t"
"14879\tMM-DEDP\t\t"
"\t2016/05/15\t2016/10/15\t" # accommodation blank intentionally
"2016/05/15 15:02:55"
)
def setUpTestData(cls):
with mute_signals(post_save):
profile = ProfileFactory.create()
cls.program, _ = create_program(past=True)
cls.user = profile.user
cls.course_run = cls.program.course_set.first().courserun_set.first()
with mute_signals(post_save):
CachedEnrollmentFactory.create(user=cls.user, course_run=cls.course_run)
cls.exam_run = ExamRunFactory.create(course=cls.course_run.course)
with mute_signals(post_save):
cls.final_grade = FinalGradeFactory.create(
user=cls.user,
course_run=cls.course_run,
passed=True,
course_run_paid_on_edx=False,
)
def test_successful_program_certificate_generation(self):
"""
Test has final grade and a certificate
"""
final_grade = FinalGradeFactory.create(
user=self.user,
course_run=self.run_1,
passed=True,
status='complete',
grade=0.8
)
CourseRunGradingStatus.objects.create(course_run=self.run_1, status='complete')
with mute_signals(post_save):
MicromastersCourseCertificate.objects.create(final_grade=final_grade)
cert_qset = MicromastersProgramCertificate.objects.filter(user=self.user, program=self.program)
assert cert_qset.exists() is False
api.generate_program_certificate(self.user, self.program)
assert cert_qset.exists() is True
def test_upload_to(self):
"""
Image upload_to should have a function which creates a path
"""
# pin the timestamps used in creating the URL
with patch('profiles.util.now_in_utc', autospec=True) as mocked_now_in_utc:
mocked_now_in_utc.return_value = now_in_utc()
with mute_signals(post_save):
profile = ProfileFactory.create()
assert profile.image.url.endswith(profile_image_upload_uri(None, "example.jpg").replace("+", ""))
assert profile.image_small.url.endswith(
profile_image_upload_uri_small(None, "example.jpg").replace("+", "")
)
assert profile.image_medium.url.endswith(
profile_image_upload_uri_medium(None, "example.jpg").replace("+", "")
)
def test_update_education(self):
"""
Test that we handle updating an Education correctly
"""
with mute_signals(post_save):
education = EducationFactory.create()
education_data = EducationSerializer(education).data
education_data['degree_name'] = BACHELORS
serializer = ProfileSerializer(instance=education.profile, data={
'education': [education_data], 'work_history': []
})
serializer.is_valid(raise_exception=True)
serializer.save()
assert education.profile.education.count() == 1
education = education.profile.education.first()
assert EducationSerializer(education).data == education_data
def test_delete_education(self):
"""
Test that we delete Educations which aren't specified in the PATCH
"""
with mute_signals(post_save):
education1 = EducationFactory.create()
EducationFactory.create(profile=education1.profile)
# has a different profile
education3 = EducationFactory.create()
assert education1.profile.education.count() == 2
education_object1 = EducationSerializer(education1).data
serializer = ProfileSerializer(instance=education1.profile, data={
'education': [education_object1], 'work_history': []
})
serializer.is_valid(raise_exception=True)
serializer.save()
assert education1.profile.education.count() == 1
assert education1.profile.education.first() == education1
# Other profile is unaffected
assert education3.profile.education.count() == 1
def test_update_employment(self):
"""
Test that we handle updating an employment correctly
"""
with mute_signals(post_save):
employment = EmploymentFactory.create()
employment_object = EmploymentSerializer(employment).data
employment_object['position'] = "SE"
serializer = ProfileSerializer(instance=employment.profile, data={
'work_history': [employment_object], 'education': []
})
serializer.is_valid(raise_exception=True)
serializer.save()
assert employment.profile.work_history.count() == 1
employment = employment.profile.work_history.first()
assert EmploymentSerializer(employment).data == employment_object
def test_update_employment_different_profile(self):
"""
Make sure we can't edit an employment for a different profile
"""
with mute_signals(post_save):
employment1 = EmploymentFactory.create()
employment2 = EmploymentFactory.create()
employment_object = EmploymentSerializer(employment1).data
employment_object['id'] = employment2.id
serializer = ProfileSerializer(instance=employment1.profile, data={
'work_history': [employment_object], 'education': []
})
serializer.is_valid(raise_exception=True)
with self.assertRaises(ValidationError) as ex:
serializer.save()
assert ex.exception.detail == ["Work history {} does not exist".format(employment2.id)]
def test_delete_employment(self):
"""
Test that we delete employments which aren't specified in the PATCH
"""
with mute_signals(post_save):
employment1 = EmploymentFactory.create()
EmploymentFactory.create(profile=employment1.profile)
# has a different profile
employment3 = EmploymentFactory.create()
assert employment1.profile.work_history.count() == 2
employment_object1 = EmploymentSerializer(employment1).data
serializer = ProfileSerializer(instance=employment1.profile, data={
'work_history': [employment_object1], 'education': []
})
serializer.is_valid(raise_exception=True)
serializer.save()
assert employment1.profile.work_history.count() == 1
assert employment1.profile.work_history.first() == employment1
# Other profile is unaffected
assert employment3.profile.work_history.count() == 1
def test_vermm_user_get_public_to_mm_profile(self):
"""
A verified mm user gets user's public_to_mm profile.
"""
with mute_signals(post_save):
profile = ProfileFactory.create(user=self.user2, account_privacy=Profile.PUBLIC_TO_MM)
ProfileFactory.create(user=self.user1, verified_micromaster_user=True)
program = ProgramFactory.create()
ProgramEnrollment.objects.create(
program=program,
user=self.user2,
)
ProgramEnrollment.objects.create(
program=program,
user=self.user1,
)
profile_data = ProfileLimitedSerializer(profile).data
self.client.force_login(self.user1)
resp = self.client.get(self.url2)
assert resp.json() == format_image_expectation(profile_data)
def test_instructor_sees_entire_profile(self):
"""
An instructor should be able to see the entire profile despite the account privacy
"""
with mute_signals(post_save):
profile = ProfileFactory.create(user=self.user2, account_privacy=Profile.PRIVATE)
ProfileFactory.create(user=self.user1, verified_micromaster_user=False)
program = ProgramFactory.create()
ProgramEnrollment.objects.create(
program=program,
user=profile.user,
)
Role.objects.create(
program=program,
role=Instructor.ROLE_ID,
user=self.user1,
)
self.client.force_login(self.user1)
profile_data = ProfileSerializer(profile).data
resp = self.client.get(self.url2)
assert resp.json() == format_image_expectation(profile_data)
def test_staff_sees_entire_profile(self):
"""
Staff should be able to see the entire profile despite the account privacy
"""
with mute_signals(post_save):
profile = ProfileFactory.create(user=self.user2, account_privacy=Profile.PRIVATE)
ProfileFactory.create(user=self.user1, verified_micromaster_user=False)
program = ProgramFactory.create()
ProgramEnrollment.objects.create(
program=program,
user=profile.user,
)
Role.objects.create(
program=program,
role=Staff.ROLE_ID,
user=self.user1,
)
self.client.force_login(self.user1)
resp = self.client.get(self.url2)
profile_data = ProfileSerializer(profile).data
assert resp.json() == format_image_expectation(profile_data)
def test_serializer(self):
"""
Get a user's own profile, ensure that we used ProfileSerializer and not ProfileFilledOutSerializer
"""
with mute_signals(post_save):
profile = ProfileFactory.create(user=self.user1, filled_out=False)
self.client.force_login(self.user1)
patch_data = ProfileSerializer(profile).data
# PATCH may not succeed, we just care that the right serializer was used
with patch(
'profiles.views.ProfileFilledOutSerializer.__new__',
autospec=True,
side_effect=ProfileFilledOutSerializer.__new__
) as mocked_filled_out, patch(
'profiles.views.ProfileSerializer.__new__',
autospec=True,
side_effect=ProfileSerializer.__new__
) as mocked:
self.client.patch(self.url1, content_type="application/json", data=json.dumps(patch_data))
assert mocked.called
assert not mocked_filled_out.called
def test_filled_out_serializer(self):
"""
Get a user's own profile, ensure that we used ProfileFilledOutSerializer
"""
with mute_signals(post_save):
profile = ProfileFactory.create(user=self.user1, filled_out=True)
self.client.force_login(self.user1)
patch_data = ProfileSerializer(profile).data
# PATCH may not succeed, we just care that the right serializer was used
with patch(
'profiles.views.ProfileFilledOutSerializer.__new__',
autospec=True,
side_effect=ProfileFilledOutSerializer.__new__
) as mocked:
self.client.patch(self.url1, content_type="application/json", data=json.dumps(patch_data))
assert mocked.called
def test_upload_image(self):
"""
An image upload should not delete education or work history entries
"""
with mute_signals(post_save):
profile = ProfileFactory.create(user=self.user1)
EducationFactory.create(profile=profile)
EmploymentFactory.create(profile=profile)
self.client.force_login(self.user1)
# create a dummy image file in memory for upload
image_file = BytesIO()
image = Image.new('RGBA', size=(50, 50), color=(256, 0, 0))
image.save(image_file, 'png')
image_file.seek(0)
# format patch using multipart upload
resp = self.client.patch(self.url1, data={
'image': image_file
}, format='multipart')
assert resp.status_code == 200, resp.content.decode('utf-8')
assert profile.education.count() == 1
assert profile.work_history.count() == 1
def test_presave_removes_current_role(self):
"""
Updating the role in the model triggers a pre_save
signal that removes the previous role from the user.
"""
mm_role = Role.objects.create(
program=self.program,
user=self.user,
role='staff',
)
self.assert_standard_role_permissions(True)
# muting the post_save signal to avoid the reassignment of the roles and related permissions
# in this way only the pre_save will run and the effect will be only to remove the old role
with mute_signals(post_save):
mm_role.role = 'instructor'
mm_role.save()
self.assert_standard_role_permissions(False)
def setUpTestData(cls):
with mute_signals(post_save):
profile = ProfileFactory.create()
cls.user = profile.user
cls.program = ProgramFactory.create()
Role.objects.create(
user=cls.user,
program=cls.program,
role=Staff.ROLE_ID
)
with mute_signals(post_save):
profile = ProfileFactory.create(email_optin=True, filled_out=True)
profile2 = ProfileFactory.create(email_optin=False, filled_out=True)
cls.learner = profile.user
cls.learner2 = profile2.user
# self.user with role staff on program
for user in [cls.learner, cls.learner2, cls.user]:
ProgramEnrollmentFactory(
user=user,
program=cls.program,
)
def test_update_percolate_memberships(self, source_type, is_member, query_matches, mock_on_commit):
"""
Tests that existing memberships are updated where appropriate
"""
with mute_signals(post_save):
query = PercolateQueryFactory.create(source_type=source_type)
profile = ProfileFactory.create(filled_out=True)
program_enrollment = ProgramEnrollmentFactory.create(user=profile.user)
membership = PercolateQueryMembershipFactory.create(
user=profile.user,
query=query,
is_member=is_member,
needs_update=False
)
with patch(
'search.api._search_percolate_queries',
return_value=[query.id] if query_matches else []
) as search_percolate_queries_mock:
update_percolate_memberships(profile.user, source_type)
search_percolate_queries_mock.assert_called_once_with(program_enrollment)
membership.refresh_from_db()
assert membership.needs_update is (is_member is not query_matches)
def test_index_program_enrolled_users_missing_profiles(self, mock_on_commit):
"""
Test that index_program_enrolled_users doesn't index users missing profiles
"""
with mute_signals(post_save):
program_enrollments = [ProgramEnrollmentFactory.build() for _ in range(10)]
with patch(
'search.indexing_api._index_chunk', autospec=True, return_value=0
) as index_chunk, patch(
'search.indexing_api.serialize_program_enrolled_user',
autospec=True,
side_effect=lambda x: None # simulate a missing profile
) as serialize_mock, patch(
'search.indexing_api.serialize_public_enrolled_user', autospec=True, side_effect=lambda x: x
) as serialize_public_mock:
index_program_enrolled_users(program_enrollments)
assert index_chunk.call_count == 0
assert serialize_public_mock.call_count == 0
assert serialize_mock.call_count == len(program_enrollments)
def setUpTestData(cls):
super().setUpTestData()
cls.program_enrollment_unsent = ProgramEnrollmentFactory.create()
cls.program_enrollment_sent = ProgramEnrollmentFactory.create()
cls.automatic_email = AutomaticEmailFactory.create(enabled=True)
cls.percolate_query = cls.automatic_email.query
cls.other_query = PercolateQueryFactory.create(source_type=PercolateQuery.DISCUSSION_CHANNEL_TYPE)
cls.percolate_queries = [cls.percolate_query, cls.other_query]
cls.automatic_email_disabled = AutomaticEmailFactory.create(enabled=False)
cls.percolate_query_disabled = cls.automatic_email_disabled.query
SentAutomaticEmail.objects.create(
automatic_email=cls.automatic_email,
user=cls.program_enrollment_sent.user,
status=SentAutomaticEmail.SENT,
)
# User was sent email connected to a different AutomaticEmail
SentAutomaticEmail.objects.create(
user=cls.program_enrollment_unsent.user,
automatic_email=AutomaticEmailFactory.create(enabled=True),
status=SentAutomaticEmail.SENT,
)
with mute_signals(post_save):
cls.staff_user = UserFactory.create()
def post_save_handler(sender, instance, **kwargs):
"""
Called when any of models saved or updated.
"""
logger.info(
'Singal sync on post_save from django to google fusiontables'
' for model: %s and row id: %s',
sender._meta.db_table,
instance.id
)
try:
kft = _get_kft_instance(sender=sender, raw=kwargs.get('raw'))[1]
if not instance._ft_id:
kft.insert_rows(sender=sender, row_id=instance.id)
else:
kft.update_rows(sender=sender, row_id=instance._ft_id)
except SkipException as exc:
logger.debug("Skip synchronization: %s", exc.args)
# else:
# raise CannotCreateInstanceException(
# "Internal error: Cannot create kfusiontables instance."
# )
def ready(self):
from django.db.models.signals import post_save, post_delete
from devour.django.receivers import produce_post_save, produce_post_delete
from django.conf import settings
import os
signal_types = {
'post_save': produce_post_save,
'post_delete': produce_post_delete
}
if getattr(settings, 'KAFKA_CONFIG', None):
try:
excl = settings.KAFKA_CONFIG['producers'].get('exclude')
except:
#TODO: Error handling
excl = []
pass
for s,f in signal_types.items():
if next((True for ex in excl if s == ex), False):
del signal_types[s]
for sig in signal_types.keys():
getattr(signals, sig).connect(signal_types[sig])