def _run_test_success(self, upload_data, upload_url):
    """Basic dispatcher request flow.

    Sends ``upload_data`` through ``self.run_dispatcher`` as a multipart
    form POST addressed to the path of ``upload_url`` and checks both the
    dispatcher's own response and the request it forwarded.

    Args:
      upload_data: Request body handed unchanged to ``self.run_dispatcher``.
      upload_url: Upload URL. Its path component becomes ``PATH_INFO`` and
        its last ``/``-separated segment is used as the session key.

    Returns:
      A ``(upload, forward_environ, forward_body)`` tuple: the single MIME
      part parsed from the forwarded body, the forwarded environ, and the
      forwarded body itself.
    """
    request_path = urlparse.urlparse(upload_url)[2]
    # Get session key from upload url.
    session_key = upload_url.split('/')[-1]

    self.environ['PATH_INFO'] = request_path
    self.environ['CONTENT_TYPE'] = (
        'multipart/form-data; boundary="================1234=="')
    status, _, response_body, forward_environ, forward_body = (
        self.run_dispatcher(upload_data))

    self.assertEqual('200 OK', status)
    self.assertEqual('Forwarded successfully.', response_body)
    self.assertIsNotNone(forward_environ)

    # These must NOT be unicode strings.
    self.assertIsInstance(forward_environ['PATH_INFO'], str)
    if 'QUERY_STRING' in forward_environ:
        self.assertIsInstance(forward_environ['QUERY_STRING'], str)

    # assertRegexpMatches is kept as-is: the surrounding code targets
    # Python 2 (urlparse module), where assertRegex is not available.
    self.assertRegexpMatches(forward_environ['CONTENT_TYPE'],
                             r'multipart/form-data; boundary="[^"]+"')
    self.assertEqual(len(forward_body),
                     int(forward_environ['CONTENT_LENGTH']))
    self.assertIn(constants.FAKE_IS_ADMIN_HEADER, forward_environ)
    self.assertEqual('1', forward_environ[constants.FAKE_IS_ADMIN_HEADER])

    # Re-parse the forwarded body as a MIME message; the one-element
    # unpacking fails unless the payload holds exactly one part.
    new_request = email.message_from_string(
        'Content-Type: %s\n\n%s' % (forward_environ['CONTENT_TYPE'],
                                    forward_body))
    (upload,) = new_request.get_payload()
    self.assertEqual('message/external-body', upload.get_content_type())

    # Copy the part's Content-Type onto a scratch Message so that its
    # blob-key parameter can be read with get_param().
    message = email.message.Message()
    message.add_header('Content-Type', upload['Content-Type'])
    blob_key = message.get_param('blob-key')
    blob_contents = blobstore.BlobReader(blob_key).read()
    self.assertEqual('value', blob_contents)

    # Looking up the session key must now fail (presumably the upload
    # session entity is removed once the upload succeeds -- confirm
    # against the dispatcher implementation).
    self.assertRaises(datastore_errors.EntityNotFoundError,
                      datastore.Get,
                      session_key)

    return upload, forward_environ, forward_body
# Comment list
# Article table of contents