def _get_request_view_and_content(self, create_content=True):
    """Build a GET request, an organize-content view and optional pinned content.

    Returns a ``(request, view, contents, profile)`` tuple. ``contents`` stays
    empty when ``create_content`` is false.
    """
    request = RequestFactory().get("/")
    request.user = self.user
    profile = self.user.profile
    orders = (3, 2, 1)
    contents = []
    if create_content:
        for order in orders:
            contents.append(ContentFactory(author=profile, order=order, pinned=True))
        # Write the intended order straight to each row as well.
        # NOTE(review): presumably the factory/save path may assign a
        # different order value - confirm.
        for content, order in zip(contents, orders):
            Content.objects.filter(id=content.id).update(order=order)
    view = OrganizeContentProfileDetailView(request=request)
    view.object = profile
    view.target_profile = profile
    view.kwargs = {"guid": profile.guid}
    return request, view, contents, profile
python类RequestFactory()的实例源码
def setup_method(self, method):
    """Install test heartbeat settings and a request factory with basic auth."""
    from heartbeat.settings import HEARTBEAT

    auth_settings = {
        'username': 'foo',
        'password': 'bar',
        'authorized_ips': ['1.3.3.7'],
    }
    HEARTBEAT.update({'checkers': ['tests.test_views'], 'auth': auth_settings})
    self.heartbeat = HEARTBEAT
    # "Zm9vOmJhcg==" is the base64 encoding of "foo:bar".
    self.factory = RequestFactory(HTTP_AUTHORIZATION='Basic Zm9vOmJhcg==')
def test_chart_view_get(self):
    """GET on a ChartView subclass returns a JSON chart description.

    The decoded payload must expose ``data``, ``options`` and ``type`` keys,
    with a known chart type and a ``title`` entry in the options.
    """
    ChartViewSubClass = type('ChartViewSubClass', (ChartView, ), {
        'chart_instance': LineChart()
    })
    chart_view = ChartViewSubClass()
    request = RequestFactory().get('/test-url')
    response = chart_view.get(request)
    # assertEqual instead of the deprecated assertEquals alias.
    self.assertEqual(response.status_code, 200)
    # Fall back to UTF-8 when the response object has no charset attribute.
    charset = getattr(response, 'charset', 'utf-8')
    data = json.loads(response.content.decode(charset))
    for key in ('data', 'options', 'type'):
        self.assertIn(key, data)
    self.assertIsInstance(data['data'], dict)
    self.assertIsInstance(data['options'], dict)
    self.assertIsInstance(data['type'], (six.string_types, six.text_type))
    self.assertIn(data['type'], ['bar', 'line', 'radar', 'polarArea', 'pie', 'bubble'])
    self.assertIn('title', data['options'])
def test_chart_view_get(self):
    """Check that a ChartView subclass serves its chart as JSON on GET.

    Verifies the status code, the presence and types of the ``data``,
    ``options`` and ``type`` entries, and that the options carry a ``title``.
    """
    ChartViewSubClass = type('ChartViewSubClass', (ChartView, ), {
        'chart_instance': LineChart()
    })
    chart_view = ChartViewSubClass()
    request = RequestFactory().get('/test-url')
    response = chart_view.get(request)
    # assertEquals is a deprecated alias; use assertEqual.
    self.assertEqual(response.status_code, 200)
    # Use the response charset when available, otherwise assume UTF-8.
    charset = getattr(response, 'charset', 'utf-8')
    data = json.loads(response.content.decode(charset))
    for key in ('data', 'options', 'type'):
        self.assertIn(key, data)
    self.assertIsInstance(data['data'], dict)
    self.assertIsInstance(data['options'], dict)
    self.assertIsInstance(data['type'], (six.string_types, six.text_type))
    self.assertIn(data['type'], ['bar', 'line', 'radar', 'polarArea', 'pie', 'bubble'])
    self.assertIn('title', data['options'])
def test_set_request_debug():
    """``set_request_debug`` sets ``request._request_debug`` from the Debug header."""
    @decorators.set_request_debug
    def myview(request):
        return http.HttpResponse('debug={}'.format(request._request_debug))

    # (extra WSGI environ passed to the factory, expected response body)
    cases = (
        ({}, b'debug=False'),
        ({'HTTP_DEBUG': 'true'}, b'debug=True'),
        ({'HTTP_DEBUG': '0'}, b'debug=False'),
    )
    for environ, expected in cases:
        request = RequestFactory(**environ).get('/')
        response = myview(request)
        assert response.content == expected
def test_datefilter_filtered(self):
    """Filtering with both range bounds returns only ``self.django_book``."""
    self.request_factory = RequestFactory()
    modeladmin = MyModelAdmin(MyModel, site)
    params = {
        'created_at__gte': self.today,
        'created_at__lte': self.tomorrow,
    }
    request = self.request_factory.get('/', params)
    changelist = self.get_changelist(request, MyModel, modeladmin)

    self.assertEqual(list(changelist.get_queryset(request)), [self.django_book])

    # First filter spec of the first element returned by get_filters().
    filterspec = changelist.get_filters(request)[0][0]
    self.assertEqual(force_text(filterspec.title), 'created at')

    choice = select_by(filterspec.choices(changelist))
    self.assertEqual(choice['query_string'], '?')
    self.assertEqual(choice['system_name'], 'created-at')
def test_datefilter_filtered_with_one_params(self):
    """Filtering with only the lower bound still returns ``self.django_book``."""
    self.request_factory = RequestFactory()
    modeladmin = MyModelAdmin(MyModel, site)
    params = {'created_at__gte': self.today}
    request = self.request_factory.get('/', params)
    changelist = self.get_changelist(request, MyModel, modeladmin)

    self.assertEqual(list(changelist.get_queryset(request)), [self.django_book])

    # First filter spec of the first element returned by get_filters().
    filterspec = changelist.get_filters(request)[0][0]
    self.assertEqual(force_text(filterspec.title), 'created at')

    choice = select_by(filterspec.choices(changelist))
    self.assertEqual(choice['query_string'], '?')
    self.assertEqual(choice['system_name'], 'created-at')
def test_datefilter_filtered_datefield(self):
    """Range filtering on ``MyModelDate`` returns only ``self.django_book_date``."""
    self.request_factory = RequestFactory()
    modeladmin = MyModelDateAdmin(MyModelDate, site)
    params = {
        'created_at__gte': self.today,
        'created_at__lte': self.tomorrow,
    }
    request = self.request_factory.get('/', params)
    changelist = self.get_changelist(request, MyModelDate, modeladmin)

    self.assertEqual(list(changelist.get_queryset(request)), [self.django_book_date])

    # First filter spec of the first element returned by get_filters().
    filterspec = changelist.get_filters(request)[0][0]
    self.assertEqual(force_text(filterspec.title), 'created at')

    choice = select_by(filterspec.choices(changelist))
    self.assertEqual(choice['query_string'], '?')
    self.assertEqual(choice['system_name'], 'created-at')
def test_datefilter_filtered_with_one_params(self):
    """Filtering with a split date/time lower bound returns ``self.django_book``."""
    self.request_factory = RequestFactory()
    modeladmin = MyModelTimeAdmin(MyModel, site)
    # The lower bound is sent as two query parameters: "_0" and "_1".
    params = {
        'created_at__gte_0': self.today,
        'created_at__gte_1': self.min_time,
    }
    request = self.request_factory.get('/', params)
    changelist = self.get_changelist(request, MyModel, modeladmin)

    self.assertEqual(list(changelist.get_queryset(request)), [self.django_book])

    # First filter spec of the first element returned by get_filters().
    filterspec = changelist.get_filters(request)[0][0]
    self.assertEqual(force_text(filterspec.title), 'created at')

    choice = select_by(filterspec.choices(changelist))
    self.assertEqual(choice['query_string'], '?')
    self.assertEqual(choice['system_name'], 'created-at')
test_django-webhook-github.py 文件源码
项目:django-github-webhook
作者: fladi
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def setUp(self):
    """Create a webhook view, a JSON payload and valid/invalid HMAC-SHA1 objects."""
    class SimpleView(WebHookView):
        secret = 'foobar'

        def ping(self, payload, request):
            return payload

    self.factory = RequestFactory()
    self.view = SimpleView.as_view()
    self.payload = json.dumps({'success': True})

    payload_bytes = self.payload.encode('utf-8')
    secret_bytes = self.view.view_class.secret.encode('utf-8')
    # MAC keyed with the view's own secret.
    self.mac_valid = hmac.new(secret_bytes, msg=payload_bytes, digestmod=hashlib.sha1)
    # MAC keyed with a different secret.
    self.mac_invalid = hmac.new(
        'malicious'.encode('utf-8'), msg=payload_bytes, digestmod=hashlib.sha1
    )
def test_create_message_mutation():
    """Exercise ``CreateMessageMutation.mutate`` for its three outcomes.

    Covers: anonymous user (403), logged-in user with missing form data
    (400 plus a ``message`` form error), and logged-in user with valid data
    (200 and a created message).
    """
    user = mixer.blend('auth.User')
    mut = schema.CreateMessageMutation()
    data = {'message': 'Test'}
    req = RequestFactory().get('/')

    req.user = AnonymousUser()
    res = mut.mutate(None, data, req, None)
    assert res.status == 403, 'Should return 403 if user is not logged in'

    req.user = user
    res = mut.mutate(None, {}, req, None)
    assert res.status == 400, 'Should return 400 if there are form errors'
    assert 'message' in res.formErrors, (
        'Should have form error for message field')

    # req.user is still the logged-in user here; reuse the valid payload.
    res = mut.mutate(None, data, req, None)
    assert res.status == 200, 'Should return 200 if the form is valid'
    assert res.message.pk == 1, 'Should create new message'
def setUp(self):
    """Create the admin site, test client and request factory used by the tests."""
    self.factory = RequestFactory()
    self.client = Client()
    self.site = AdminSite()
# def test_spider_admin(self):
# client = Client()
# client.login(username='admin', password='pass')
# base_admin = '/admin/driver27'
# models = ['circuit', 'competition', 'driver', 'grandprix', 'race', 'season', 'seat', 'team']
# for model in models:
# url = base_admin + '/' + model + '/'
# resp = client.get(url, follow=True)
# self.assertEqual(resp.status_code, 200, url + ' code='+str(resp.status_code))
# url = base_admin + '/' + model + '/add/'
# resp = client.get(url, follow=True)
# self.assertEqual(resp.status_code, 200, url + ' code='+str(resp.status_code))
# url = base_admin + '/' + model + '/1/change/'
# resp = client.get(url, follow=True)
# self.assertEqual(resp.status_code, 200, url + ' code='+str(resp.status_code))
def test_scan_result_view_anonymous_user_post_request(self):
    """POSTing to the scan-results view answers 405.

    A scan is first created through ``UrlScanView``; its id is recovered from
    the decimal digits of the redirect URL.
    """
    request = RequestFactory().post('/urlscan/', {'url': 'https://github.com/'})
    request.user = AnonymousUser()
    response = UrlScanView.as_view()(request)
    # Keep only the decimal digits found after the first 12 characters of the
    # redirect URL. Filtering explicitly replaces the former bare
    # ``except: pass`` around ``int(character)``.
    digits_in_url = response.url[12:]
    scan_id = int(''.join(ch for ch in digits_in_url if ch.isdecimal()))
    request = RequestFactory().post('/scanresult/', {'post_apple': 'Apple'})
    response = ScanResults.as_view()(request, pk=scan_id)
    self.assertEqual(405, response.status_code)
def test_scan_result_view_registered_user_get_request(self):
    """GET on the scan-results view shows the "tasks are in the queue" notice.

    A scan is first created by a registered user through ``UrlScanView``; its
    id is recovered from the decimal digits of the redirect URL.
    """
    user = User.objects.create_user(username='Username', password='Password')
    request = RequestFactory().post('/urlscan/', {'url': 'https://github.com/'})
    request.user = user
    response = UrlScanView.as_view()(request)
    # Keep only the decimal digits found after the first 12 characters of the
    # redirect URL. Filtering explicitly replaces the former bare
    # ``except: pass`` around ``int(character)``.
    digits_in_url = response.url[12:]
    scan_id = int(''.join(ch for ch in digits_in_url if ch.isdecimal()))
    request = RequestFactory().get('/scanresult/')
    response = ScanResults.as_view()(request, pk=scan_id)
    # NOTE(review): on Python 3 ``response.content`` is bytes, so a str
    # needle would raise TypeError - confirm the target Python version.
    self.assertIn('<h3>Please wait... Your tasks are in the queue.\n Reload in 5-10 minutes</h3>\n', response.content)
def test_public_with_link_to_share_toggle_on(self):
    """Enable link sharing and check the redirects for anonymous access."""
    # sharing behavior as of 09/13/2012:
    # it requires both data_share and form_share both turned on
    # in order to grant anon access to form uploading
    # TODO: findout 'for_user': 'all' and what it means
    post_data = {'for_user': 'all', 'perm_type': 'link'}
    response = self.client.post(self.perm_url, post_data)
    self.assertEqual(response.status_code, 302)
    self.assertEqual(MetaData.public_link(self.xform), True)

    # toggle shared on
    self.xform.shared = True
    self.xform.shared_data = True
    self.xform.save()
    response = self.anon.get(self.show_url)
    self.assertEqual(response.status_code, 302)

    # The remaining check is skipped unless _running_enketo() is truthy.
    if not self._running_enketo():
        raise SkipTest
    with HTTMock(enketo_mock):
        request = RequestFactory().get('/')
        request.user = AnonymousUser()
        response = enter_data(
            request, self.user.username, self.xform.id_string)
        self.assertEqual(response.status_code, 302)
def form_list_xml(url, request, **kwargs):
    """Answer a mocked HTTP call by dispatching ``url.path`` to a local view.

    Builds a Django request for user ``bob`` (with ``require_auth`` turned
    off), calls the view matching the path and copies its content into a
    ``requests.Response`` with status 200. ``request`` and ``kwargs`` are
    accepted but not used.
    """
    path = url.path
    response = requests.Response()
    req = RequestFactory().get(path)
    req.user = authenticate(username='bob', password='bob')
    req.user.profile.require_auth = False
    req.user.profile.save()
    id_string = 'transportation_2011_07_25'

    if path.endswith('formList'):
        res = formList(req, username='bob')
    elif path.endswith('form.xml'):
        res = download_xform(req, username='bob', id_string=id_string)
    elif 'xformsManifest' in path:
        res = xformsManifest(req, username='bob', id_string=id_string)
    elif 'formid-media' in path:
        # The media id is whatever follows the last "/" in the path.
        data_id = path.rsplit('/', 1)[-1]
        res = download_media_data(
            req, username='bob', id_string=id_string, data_id=data_id)
        response._content = get_streaming_content(res)
    else:
        res = formList(req, username='bob')

    response.status_code = 200
    # Only the media branch fills _content above; otherwise copy the view's content.
    if not response._content:
        response._content = res.content
    return response
def test_add_participants(self):
    """Adding participants grows the thread, up to a limit of 10."""
    # by default, a user will be authorized to add a participant if they are not yet in the thread
    # we add new and existing participants
    request = RequestFactory()
    request.user = self.user
    request.rest_messaging_participant = Participant.objects.get(id=self.user.id)

    initial = [self.participant1, self.participant2, self.participant3]
    self.assertTrue(all(member in initial for member in self.thread1.participants.all()))
    self.assertEqual(3, len(self.thread1.participants.all()))

    with self.assertNumQueries(2):
        self.thread1.add_participants(request, self.participant4.id, self.participant5.id)

    enlarged = [self.participant1, self.participant2, self.participant3, self.participant4, self.participant5]
    self.assertTrue(all(member in enlarged for member in self.thread1.participants.all()))
    self.assertEqual(5, len(self.thread1.participants.all()))

    # by default, the number of participants is limited to 10
    # setUp ends at 6
    newcomers = [Participant(id=pk) for pk in range(7, 16)]
    Participant.objects.bulk_create(newcomers)
    self.thread1.add_participants(request, *(p.id for p in newcomers))
    self.assertEqual(10, len(self.thread1.participants.all()))
def setUp(self):
    """Create three trackers, the tracker admin and a superuser request."""
    # One tracker per device type, each with a different country.
    self.tracker_1 = TrackerFactory.create(device_type=Tracker.PC, ip_country='ESP')
    self.tracker_2 = TrackerFactory.create(device_type=Tracker.MOBILE, ip_country='NLD')
    self.tracker_3 = TrackerFactory.create(device_type=Tracker.TABLET, ip_country='GBR')

    admin_site = AdminSite(name='tracker_admin')
    self.admin_site = admin_site
    self.tracker_admin = TrackerAdmin(Tracker, admin_site)
    self.url = reverse('admin:tracking_analyzer_tracker_changelist')

    # Create a superuser and mock a request made by it.
    superuser = UserFactory.create(is_staff=True, is_superuser=True)
    self.user = superuser
    request = RequestFactory().get('/')
    request.user = superuser
    self.request = request
def test_changelist_view_context_countries_count_present(self):
    """
    The changelist context exposes a ``countries_count`` dataset with one
    request per country when no country filter is applied.
    """
    request = RequestFactory().get(reverse('admin:tracking_analyzer_tracker_changelist'))
    request.user = self.user
    response = self.tracker_admin.changelist_view(request)

    # Expected order of entries: tracker 1, tracker 3, tracker 2.
    ordered_trackers = (self.tracker_1, self.tracker_3, self.tracker_2)
    codes = [countries.alpha3(tracker.ip_country) for tracker in ordered_trackers]
    expected = '[["{0}", 1], ["{1}", 1], ["{2}", 1]]'.format(*codes)
    self.assertEqual(response.context_data['countries_count'], expected)
def test_form_kwargs_has_user(self):
    """The upload view passes the request user to its form kwargs."""
    get_request = RequestFactory().get("/")
    get_request.user = self.user
    view = self.get_instance(MarkdownXImageUploadView, request=get_request)
    form_kwargs = view.get_form_kwargs()
    self.assertEqual(form_kwargs.get("user"), self.user)
def get_request(user):
    """Return a GET request for "/" with ``user`` attached as ``request.user``."""
    req = RequestFactory().get("/")
    req.user = user
    return req
def test_serializer_context(self, mock_serializer):
    """The base streams view passes throughs and the request as serializer context."""
    request = RequestFactory().get("/")
    StreamsAPIBaseView().get(request)
    expected_context = {"throughs": {}, "request": request}
    mock_serializer.assert_called_once_with([], many=True, context=expected_context)
def setUp(self):
    """Create a user and a ProfileUpdateView bound to a GET request by that user."""
    super().setUp()
    self.user = self.make_user()
    self.profile = self.user.profile
    self.factory = RequestFactory()
    self.view = ProfileUpdateView()
    fake_request = self.factory.get('/fake-url')
    fake_request.user = self.user
    self.view.request = fake_request
def test_profile_visibility_authenticated_staff_user(self):
    """A staff user's search queryset holds the public, site and limited profiles."""
    request = RequestFactory().get('/')
    request.user = self.staff_user
    view = self.get_instance(GlobalSearchView, request=request)
    found = set(view.get_queryset().values_list("pk", flat=True))
    # The test compares against string forms of the profile primary keys.
    expected = {
        str(self.public_profile.pk),
        str(self.site_profile.pk),
        str(self.limited_profile.pk),
    }
    self.assertEqual(found, expected)
def test_profile_visibility_anonymous_user(self):
    """An anonymous user's search queryset holds only the public profile."""
    request = RequestFactory().get('/')
    request.user = AnonymousUser()
    view = self.get_instance(GlobalSearchView, request=request)
    found = set(view.get_queryset().values_list("pk", flat=True))
    # The test compares against the string form of the profile primary key.
    expected = {str(self.public_profile.pk)}
    self.assertEqual(found, expected)
def test_index():
    """The index view answers a GET request with status 200."""
    request = RequestFactory().get(reverse('index'))
    response = index(request)
    assert response.status_code == 200
def test_disabled_auth(self):
    """With auth disabled, the details view returns 200 and a JSON body."""
    self.heartbeat['auth'] = {'disable': True}
    # Make factory without auth header
    self.factory = RequestFactory()
    response = details(self.factory.get(reverse('1337')))
    assert response.status_code == 200
    assert response['content-type'] == 'application/json'
    payload = json.loads(response.content.decode('utf-8'))
    assert payload['test_views']['ping'] == 'pong'
def request(self):
    """Return a new ``RequestFactory`` instance."""
    factory = RequestFactory()
    return factory
def request(self):
    """Return a new ``RequestFactory`` instance."""
    factory = RequestFactory()
    return factory
def request(self):
    """Return a new ``RequestFactory`` instance."""
    factory = RequestFactory()
    return factory