def test__get_in_use_ports(self):
cls = vif_pool.BaseVIFPool
m_driver = mock.MagicMock(spec=cls)
kubernetes = self.useFixture(k_fix.MockK8sClient()).client
pod = get_pod_obj()
port_id = 'f2c1b73a-6a0c-4dca-b986-0d07d09e0c02'
versioned_object = jsonutils.dumps({
'versioned_object.data': {
'active': True,
'address': 'fa:16:3e:ef:e6:9f',
'id': port_id
}})
pod['metadata']['annotations'][constants.K8S_ANNOTATION_VIF] = (
versioned_object)
items = [pod]
kubernetes.get.return_value = {'items': items}
resp = cls._get_in_use_ports(m_driver)
self.assertEqual(resp, [port_id])
python类data()的实例源码
def setUpTestData(cls):
cls.user = UserFactory.create()
# Create Programs, Courses, CourseRuns...
cls.p1_course_run_keys = ['p1_course_run']
cls.p2_course_run_keys = ['p2_course_run_1', 'p2_course_run_2']
cls.p1_course_run = CourseRunFactory.create(edx_course_key=cls.p1_course_run_keys[0])
p2 = FullProgramFactory.create()
first_course = p2.course_set.first()
extra_course = CourseFactory.create(program=p2)
cls.p2_course_run_1 = CourseRunFactory.create(course=first_course, edx_course_key=cls.p2_course_run_keys[0])
cls.p2_course_run_2 = CourseRunFactory.create(course=extra_course, edx_course_key=cls.p2_course_run_keys[1])
all_course_runs = [cls.p1_course_run, cls.p2_course_run_1, cls.p2_course_run_2]
# Create cached edX data
cls.enrollments = [
CachedEnrollmentFactory.create(user=cls.user, course_run=course_run) for course_run in all_course_runs
]
cls.certificates = [
CachedCertificateFactory.create(user=cls.user, course_run=course_run) for course_run in all_course_runs
]
cls.current_grades = [
CachedCurrentGradeFactory.create(user=cls.user, course_run=course_run) for course_run in all_course_runs
]
def test_course_with_proctorate_exam(
self, mock_schedulable, mock_format, mock_grade, mock_get_cert, mock_future_exams, mock_has_to_pay):
"""
Test with proctorate exam results
"""
for _ in range(3):
ProctoredExamGradeFactory.create(user=self.user, course=self.course_noruns)
proct_exam_qset = ProctoredExamGrade.for_user_course(user=self.user, course=self.course_noruns)
serialized_proct_exams = ProctoredExamGradeSerializer(proct_exam_qset, many=True).data
self.mmtrack.get_course_proctorate_exam_results.return_value = serialized_proct_exams
self.assert_course_equal(
self.course_noruns,
api.get_info_for_course(self.course_noruns, self.mmtrack),
proct_exams=serialized_proct_exams
)
assert mock_format.called is False
assert mock_schedulable.call_count == 1
assert mock_has_to_pay.call_count == 1
assert mock_future_exams.call_count == 1
assert mock_get_cert.call_count == 1
assert mock_grade.call_count == 1
self.mmtrack.get_course_proctorate_exam_results.assert_called_once_with(self.course_noruns)
def test_format(self, mock_cache_refresh):
"""Test that get_user_program_info fetches edx data and returns a list of Program data"""
result = api.get_user_program_info(self.user, self.edx_client)
assert mock_cache_refresh.call_count == len(CachedEdxDataApi.SUPPORTED_CACHES)
for cache_type in CachedEdxDataApi.SUPPORTED_CACHES:
mock_cache_refresh.assert_any_call(self.user, self.edx_client, cache_type)
assert isinstance(result, dict)
assert 'is_edx_data_fresh' in result
assert result['is_edx_data_fresh'] is False
assert 'programs' in result
assert len(result['programs']) == 2
for i in range(2):
expected = {
"id": self.expected_programs[i].id,
"description": self.expected_programs[i].description,
"title": self.expected_programs[i].title,
"financial_aid_availability": self.expected_programs[i].financial_aid_availability,
}
assert is_subset_dict(expected, result['programs'][i])
def test_enrollment(self, mock_refresh, mock_edx_enr, mock_index): # pylint: disable=unused-argument
"""
Test for happy path
"""
cache_enr = CachedEnrollment.objects.filter(
user=self.user, course_run__edx_course_key=self.course_id).first()
assert cache_enr is None
enr_json = {'course_details': {'course_id': self.course_id}}
enrollment = Enrollment(enr_json)
mock_edx_enr.return_value = enrollment
resp = self.client.post(self.url, {'course_id': self.course_id}, format='json')
assert resp.status_code == status.HTTP_200_OK
assert mock_edx_enr.call_count == 1
assert mock_edx_enr.call_args[0][1] == self.course_id
assert resp.data == enr_json
mock_index.delay.assert_called_once_with([self.user.id], check_if_changed=True)
cache_enr = CachedEnrollment.objects.filter(
user=self.user, course_run__edx_course_key=self.course_id).first()
assert cache_enr is not None
def test_compute_grade_for_fa(self):
"""
Tests for _compute_grade_for_fa function.
This tests that even with certificate the grade is from current_grades.
"""
run1_data = self.user_edx_data.get_run_data(self.run_fa.edx_course_key)
run2_data = self.user_edx_data.get_run_data(self.run_fa_with_cert.edx_course_key)
grade1_from_cur_grade = api._compute_grade_for_fa(run1_data)
grade2_from_cert = api._compute_grade_for_fa(run2_data)
assert isinstance(grade1_from_cur_grade, api.UserFinalGrade)
assert isinstance(grade2_from_cert, api.UserFinalGrade)
assert grade1_from_cur_grade.passed == self.current_grades.get(
self.run_fa.edx_course_key).data.get('passed')
assert grade1_from_cur_grade.grade == self.current_grades.get(
self.run_fa.edx_course_key).data.get('percent')
assert grade1_from_cur_grade.payed_on_edx is False
assert grade2_from_cert.passed is True
assert grade2_from_cert.grade == self.current_grades.get(
self.run_fa_with_cert.edx_course_key).data.get('percent')
# this is True as long as the certificate is verified
assert grade2_from_cert.payed_on_edx is True
def test_compute_grade_for_non_fa(self):
"""
Tests for _compute_grade_for_non_fa function.
This tests that only the existence of a certificate turns in a passed course.
"""
run3_data = self.user_edx_data.get_run_data(self.run_no_fa.edx_course_key)
run4_data = self.user_edx_data.get_run_data(self.run_no_fa_with_cert.edx_course_key)
grade3_from_cur_grade = api._compute_grade_for_non_fa(run3_data)
grade4_from_cert = api._compute_grade_for_non_fa(run4_data)
assert isinstance(grade3_from_cur_grade, api.UserFinalGrade)
assert isinstance(grade4_from_cert, api.UserFinalGrade)
assert grade3_from_cur_grade.passed is False
assert grade3_from_cur_grade.grade == self.current_grades.get(
self.run_no_fa.edx_course_key).data.get('percent')
# this is true if the enrollment is verified
assert grade3_from_cur_grade.payed_on_edx is True
assert grade4_from_cert.passed is True
assert grade4_from_cert.grade == self.current_grades.get(
self.run_no_fa_with_cert.edx_course_key).data.get('percent')
# this is True as long as the certificate is verified
assert grade4_from_cert.payed_on_edx is True
def test_vermm_user_get_public_to_mm_profile(self):
"""
A verified mm user gets user's public_to_mm profile.
"""
with mute_signals(post_save):
profile = ProfileFactory.create(user=self.user2, account_privacy=Profile.PUBLIC_TO_MM)
ProfileFactory.create(user=self.user1, verified_micromaster_user=True)
program = ProgramFactory.create()
ProgramEnrollment.objects.create(
program=program,
user=self.user2,
)
ProgramEnrollment.objects.create(
program=program,
user=self.user1,
)
profile_data = ProfileLimitedSerializer(profile).data
self.client.force_login(self.user1)
resp = self.client.get(self.url2)
assert resp.json() == format_image_expectation(profile_data)
def test_staff_sees_entire_profile(self):
"""
Staff should be able to see the entire profile despite the account privacy
"""
with mute_signals(post_save):
profile = ProfileFactory.create(user=self.user2, account_privacy=Profile.PRIVATE)
ProfileFactory.create(user=self.user1, verified_micromaster_user=False)
program = ProgramFactory.create()
ProgramEnrollment.objects.create(
program=program,
user=profile.user,
)
Role.objects.create(
program=program,
role=Staff.ROLE_ID,
user=self.user1,
)
self.client.force_login(self.user1)
resp = self.client.get(self.url2)
profile_data = ProfileSerializer(profile).data
assert resp.json() == format_image_expectation(profile_data)
def test_serializer(self):
"""
Get a user's own profile, ensure that we used ProfileSerializer and not ProfileFilledOutSerializer
"""
with mute_signals(post_save):
profile = ProfileFactory.create(user=self.user1, filled_out=False)
self.client.force_login(self.user1)
patch_data = ProfileSerializer(profile).data
# PATCH may not succeed, we just care that the right serializer was used
with patch(
'profiles.views.ProfileFilledOutSerializer.__new__',
autospec=True,
side_effect=ProfileFilledOutSerializer.__new__
) as mocked_filled_out, patch(
'profiles.views.ProfileSerializer.__new__',
autospec=True,
side_effect=ProfileSerializer.__new__
) as mocked:
self.client.patch(self.url1, content_type="application/json", data=json.dumps(patch_data))
assert mocked.called
assert not mocked_filled_out.called
def test_filled_out_serializer(self):
"""
Get a user's own profile, ensure that we used ProfileFilledOutSerializer
"""
with mute_signals(post_save):
profile = ProfileFactory.create(user=self.user1, filled_out=True)
self.client.force_login(self.user1)
patch_data = ProfileSerializer(profile).data
# PATCH may not succeed, we just care that the right serializer was used
with patch(
'profiles.views.ProfileFilledOutSerializer.__new__',
autospec=True,
side_effect=ProfileFilledOutSerializer.__new__
) as mocked:
self.client.patch(self.url1, content_type="application/json", data=json.dumps(patch_data))
assert mocked.called
def test_upload_image(self):
"""
An image upload should not delete education or work history entries
"""
with mute_signals(post_save):
profile = ProfileFactory.create(user=self.user1)
EducationFactory.create(profile=profile)
EmploymentFactory.create(profile=profile)
self.client.force_login(self.user1)
# create a dummy image file in memory for upload
image_file = BytesIO()
image = Image.new('RGBA', size=(50, 50), color=(256, 0, 0))
image.save(image_file, 'png')
image_file.seek(0)
# format patch using multipart upload
resp = self.client.patch(self.url1, data={
'image': image_file
}, format='multipart')
assert resp.status_code == 200, resp.content.decode('utf-8')
assert profile.education.count() == 1
assert profile.work_history.count() == 1
def test_missing_fields(self):
"""
If CyberSource POSTs with fields missing, we should at least save it in a receipt.
It is very unlikely for Cybersource to POST invalid data but it also provides a way to test
that we save a Receipt in the event of an error.
"""
data = {}
for _ in range(5):
data[FAKE.text()] = FAKE.text()
with patch('ecommerce.views.IsSignedByCyberSource.has_permission', return_value=True):
try:
# Missing fields from Cybersource POST will cause the KeyError.
# In this test we just care that we saved the data in Receipt for later
# analysis.
self.client.post(reverse('order-fulfillment'), data=data)
except KeyError:
pass
assert Order.objects.count() == 0
assert Receipt.objects.count() == 1
assert Receipt.objects.first().data == data
def test_ignore_duplicate_cancel(self):
"""
If the decision is CANCEL and we already have a duplicate failed order, don't change anything.
"""
course_run, user = create_purchasable_course_run()
order = create_unfulfilled_order(course_run.edx_course_key, user)
order.status = Order.FAILED
order.save()
data = {
'req_reference_number': make_reference_id(order),
'decision': 'CANCEL',
}
with patch(
'ecommerce.views.IsSignedByCyberSource.has_permission',
return_value=True
):
resp = self.client.post(reverse('order-fulfillment'), data=data)
assert resp.status_code == status.HTTP_200_OK
assert Order.objects.count() == 1
assert Order.objects.get(id=order.id).status == Order.FAILED
def test_error_on_duplicate_order(self, order_status, decision):
"""If there is a duplicate message (except for CANCEL), raise an exception"""
course_run, user = create_purchasable_course_run()
order = create_unfulfilled_order(course_run.edx_course_key, user)
order.status = order_status
order.save()
data = {
'req_reference_number': make_reference_id(order),
'decision': decision,
}
with patch(
'ecommerce.views.IsSignedByCyberSource.has_permission',
return_value=True
), self.assertRaises(EcommerceException) as ex:
self.client.post(reverse('order-fulfillment'), data=data)
assert Order.objects.count() == 1
assert Order.objects.get(id=order.id).status == order_status
assert ex.exception.args[0] == "Order {id} is expected to have status 'created'".format(
id=order.id,
)
def test_coupon_not_redeemable(self):
"""
A 404 should be returned if coupon is not redeemable
"""
with patch(
'ecommerce.views.is_coupon_redeemable', autospec=True
) as _is_redeemable_mock:
_is_redeemable_mock.return_value = False
resp = self.client.post(
reverse('coupon-user-create', kwargs={'code': self.coupon.coupon_code}),
data={
"username": get_social_username(self.user)
},
format='json',
)
assert resp.status_code == status.HTTP_404_NOT_FOUND
_is_redeemable_mock.assert_called_with(self.coupon, self.user)
def test_staff_and_instructor_in_other_program_no_results(self, role, is_enrolled):
"""A user with staff or instructor role in another program gets no results"""
user = UserFactory.create()
Role.objects.create(
user=user,
program=self.program2,
role=role,
)
if is_enrolled:
ProgramEnrollmentFactory.create(user=user, program=self.program1)
params = {
"post_filter": {
"term": {"program.id": self.program1.id}
}
}
self.client.force_login(user)
resp = self.assert_status_code(json=params)
assert len(resp.data['hits']['hits']) == 0
def test_from(self):
"""
Test that we don't filter out the from part of the query
"""
data = {
"post_filter": {
"term": {
"program.id": self.program1.id
}
},
"from": 0,
"size": 50,
}
resp = self.assert_status_code(json=data)
user_ids_in_hits = self.get_user_ids_in_hits(resp.data['hits']['hits'])
assert len(user_ids_in_hits) == 10
# Look at second page. There should be zero hits here
data['from'] = 50
resp = self.assert_status_code(json=data)
user_ids_in_hits = self.get_user_ids_in_hits(resp.data['hits']['hits'])
assert len(user_ids_in_hits) == 0
def test_send_batch_chunk(self, mock_post):
"""
Test that MailgunClient.send_batch chunks recipients
"""
chunk_size = 10
recipient_tuples = [("{0}@example.com".format(letter), None) for letter in string.ascii_letters]
chunked_emails_to = [recipient_tuples[i:i + chunk_size] for i in range(0, len(recipient_tuples), chunk_size)]
assert len(recipient_tuples) == 52
responses = MailgunClient.send_batch('email subject', 'email body', recipient_tuples, chunk_size=chunk_size)
assert mock_post.called
assert mock_post.call_count == 6
for call_num, args in enumerate(mock_post.call_args_list):
called_args, called_kwargs = args
assert list(called_args)[0] == '{}/{}'.format(settings.MAILGUN_URL, 'messages')
assert called_kwargs['data']['text'].startswith('email body')
assert called_kwargs['data']['subject'] == 'email subject'
assert sorted(called_kwargs['data']['to']) == sorted([email for email, _ in chunked_emails_to[call_num]])
assert called_kwargs['data']['recipient-variables'] == json.dumps(
{email: context or {} for email, context in chunked_emails_to[call_num]}
)
response = responses[call_num]
assert response.status_code == HTTP_200_OK
def test_view_response_improperly_configured(self):
"""
Test that the SearchResultMailView will raise ImproperlyConfigured if mailgun returns 401, which
results in returning 500 since micromasters.utils.custom_exception_handler catches ImproperlyConfigured
"""
with patch(
'mail.views.get_all_query_matching_emails', autospec=True, return_value=self.email_results
), patch(
'mail.views.MailgunClient'
) as mock_mailgun_client, patch(
'mail.views.get_mail_vars', autospec=True, return_value=self.email_vars,
) as mock_get_mail_vars:
mock_mailgun_client.send_batch.side_effect = ImproperlyConfigured
resp = self.client.post(self.search_result_mail_url, data=self.request_data, format='json')
assert resp.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
mock_get_mail_vars.assert_called_once_with(self.email_results)
def test_send_learner_email_view(self, mock_mailgun_client):
"""
Test that learner emails are correctly sent through the view
"""
self.client.force_login(self.staff_user)
mock_mailgun_client.send_individual_email.return_value = Mock(
spec=Response,
status_code=status.HTTP_200_OK,
json=mocked_json()
)
url = reverse(self.url_name, kwargs={'student_id': self.recipient_user.profile.student_id})
resp_post = self.client.post(url, data=self.request_data, format='json')
assert resp_post.status_code == status.HTTP_200_OK
assert mock_mailgun_client.send_individual_email.called
_, called_kwargs = mock_mailgun_client.send_individual_email.call_args
assert called_kwargs['subject'] == self.request_data['email_subject']
assert called_kwargs['body'] == self.request_data['email_body']
assert called_kwargs['recipient'] == self.recipient_user.email
assert called_kwargs['sender_address'] == self.staff_user.email
assert called_kwargs['sender_name'] == self.staff_user.profile.display_name
assert 'raise_for_status' not in called_kwargs
def test_send_financial_aid_view(self, mock_mailgun_client):
"""
Test that the FinancialAidMailView will accept and return expected values
"""
self.client.force_login(self.staff_user_profile.user)
mock_mailgun_client.send_financial_aid_email.return_value = Mock(
spec=Response,
status_code=status.HTTP_200_OK,
json=mocked_json()
)
resp_post = self.client.post(self.url, data=self.request_data, format='json')
assert resp_post.status_code == status.HTTP_200_OK
assert mock_mailgun_client.send_financial_aid_email.called
_, called_kwargs = mock_mailgun_client.send_financial_aid_email.call_args
assert called_kwargs['acting_user'] == self.staff_user_profile.user
assert called_kwargs['financial_aid'] == self.financial_aid
assert called_kwargs['subject'] == self.request_data['email_subject']
assert called_kwargs['body'] == self.request_data['email_body']
assert 'raise_for_status' not in called_kwargs
def test_ipam_driver_request_pool_with_default_v6pool(self,
mock_list_subnetpools):
fake_kuryr_subnetpool_id = uuidutils.generate_uuid()
fake_name = 'kuryr6'
kuryr_subnetpools = self._get_fake_v6_subnetpools(
fake_kuryr_subnetpool_id, prefixes=['fe80::/64'])
mock_list_subnetpools.return_value = {
'subnetpools': kuryr_subnetpools['subnetpools']}
fake_request = {
'AddressSpace': '',
'Pool': '',
'SubPool': '', # In the case --ip-range is not given
'Options': {},
'V6': True
}
response = self.app.post('/IpamDriver.RequestPool',
content_type='application/json',
data=jsonutils.dumps(fake_request))
self.assertEqual(200, response.status_code)
mock_list_subnetpools.assert_called_with(name=fake_name)
decoded_json = jsonutils.loads(response.data)
self.assertEqual(fake_kuryr_subnetpool_id, decoded_json['PoolID'])
def test_network_driver_allocate_network(self):
docker_network_id = lib_utils.get_hash()
allocate_network_request = {
'NetworkID': docker_network_id,
'IPv4Data': [{
'AddressSpace': 'foo',
'Pool': '192.168.42.0/24',
'Gateway': '192.168.42.1/24',
}],
'IPv6Data': [],
'Options': {}
}
response = self.app.post('/NetworkDriver.AllocateNetwork',
content_type='application/json',
data=jsonutils.dumps(
allocate_network_request))
self.assertEqual(200, response.status_code)
decoded_json = jsonutils.loads(response.data)
self.assertEqual({'Options': {}}, decoded_json)
def invoke_3play_callback(self, state='complete'):
"""
Make request to 3PlayMedia callback handler, this invokes
callback with all the necessary parameters.
Arguments:
state(str): state of the callback
"""
response = self.client.post(
# `build_url` strips `/`, putting it back and add necessary query params.
'/{}'.format(build_url(
self.url, edx_video_id=self.video.studio_id,
org=self.org, lang_code=self.video_source_language
)),
content_type='application/x-www-form-urlencoded',
data=urllib.urlencode(dict(file_id=self.file_id, status=state))
)
return response
def test_missing_required_params(self, request_data, mock_logger):
"""
Test the callback in case of missing attributes.
"""
response = self.client.post(
'/{}'.format(build_url(self.url, **request_data['query_params'])),
content_type='application/x-www-form-urlencoded',
data=urllib.urlencode(request_data['data']),
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
# Assert the logs
required_attrs = ['file_id', 'lang_code', 'status', 'org', 'edx_video_id']
received_attrs = request_data['data'].keys() + request_data['query_params'].keys()
missing_attrs = [attr for attr in required_attrs if attr not in received_attrs]
mock_logger.warning.assert_called_with(
u'[3PlayMedia Callback] process_id=%s Received Attributes=%s Missing Attributes=%s',
request_data['data'].get('file_id', None),
received_attrs,
missing_attrs,
)
def test_callback_for_non_success_statuses(self, state, message, expected_status, mock_logger):
"""
Tests the callback for all the non-success statuses.
"""
self.url = '/{}'.format(build_url(
self.url, edx_video_id='12345', org='MAx', lang_code=self.video_source_language
))
self.client.post(self.url, content_type='application/x-www-form-urlencoded', data=urllib.urlencode({
'file_id': self.file_id,
'status': state,
'error_description': state # this will be logged.
}))
self.assertEqual(
TranscriptProcessMetadata.objects.filter(process_id=self.file_id).latest().status,
expected_status
)
mock_logger.error.assert_called_with(
message,
state,
self.org,
self.video.studio_id,
self.file_id
)
def test_transcript_credentials_invalid_provider(self, post_data):
"""
Test that post crednetials gives proper error in case of invalid provider.
"""
# Verify that transcript credentials are not present for this org and provider.
provider = post_data.get('provider')
response = self.client.post(
self.url,
data=json.dumps(post_data),
content_type='application/json'
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
response = json.loads(response.content)
self.assertDictEqual(response, {
'message': 'Invalid provider {provider}.'.format(provider=provider),
'error_type': TranscriptionProviderErrorType.INVALID_PROVIDER
})
def test_transcript_credentials_error(self, post_data, missing_keys):
"""
Test that post credentials gives proper error in case of invalid input.
"""
provider = post_data.get('provider')
error_message = '{missing} must be specified for {provider}.'.format(
provider=provider,
missing=missing_keys
)
response = self.client.post(
self.url,
data=json.dumps(post_data),
content_type='application/json'
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
response = json.loads(response.content)
self.assertDictEqual(response, {
'message': error_message,
'error_type': TranscriptionProviderErrorType.MISSING_REQUIRED_ATTRIBUTES
})
def test_send_batch_chunk(self, mock_post):
"""
Test that MailgunClient.send_batch chunks recipients
"""
chunk_size = 10
recipient_tuples = [("{0}@example.com".format(letter), None) for letter in string.ascii_letters]
chunked_emails_to = [recipient_tuples[i:i + chunk_size] for i in range(0, len(recipient_tuples), chunk_size)]
assert len(recipient_tuples) == 52
responses = MailgunClient.send_batch('email subject', 'email body', recipient_tuples, chunk_size=chunk_size)
assert mock_post.called
assert mock_post.call_count == 6
for call_num, args in enumerate(mock_post.call_args_list):
called_args, called_kwargs = args
assert list(called_args)[0] == '{}/{}'.format(settings.MAILGUN_URL, 'messages')
assert called_kwargs['data']['text'].startswith('email body')
assert called_kwargs['data']['subject'] == 'email subject'
assert sorted(called_kwargs['data']['to']) == sorted([email for email, _ in chunked_emails_to[call_num]])
assert called_kwargs['data']['recipient-variables'] == json.dumps(
{email: context or {} for email, context in chunked_emails_to[call_num]}
)
response = responses[call_num]
assert response.status_code == HTTP_200_OK