def _get_upload_content(field_storage):
"""Returns an email.Message holding the values of the file transfer.
It decodes the content of the field storage and creates a new email.Message.
Args:
field_storage: cgi.FieldStorage that represents uploaded blob.
Returns:
An email.message.Message holding the upload information.
"""
message = email.message.Message()
message.add_header(
'content-transfer-encoding',
field_storage.headers.getheader('Content-Transfer-Encoding', ''))
message.set_payload(field_storage.file.read())
payload = message.get_payload(decode=True)
return email.message_from_string(payload)
python类add_header()的实例源码
def _get_upload_content(field_storage):
"""Returns an email.Message holding the values of the file transfer.
It decodes the content of the field storage and creates a new email.Message.
Args:
field_storage: cgi.FieldStorage that represents uploaded blob.
Returns:
An email.message.Message holding the upload information.
"""
message = email.message.Message()
message.add_header(
'content-transfer-encoding',
field_storage.headers.getheader('Content-Transfer-Encoding', ''))
message.set_payload(field_storage.file.read())
payload = message.get_payload(decode=True)
return email.message_from_string(payload)
def _get_upload_content(field_storage):
"""Returns an email.Message holding the values of the file transfer.
It decodes the content of the field storage and creates a new email.Message.
Args:
field_storage: cgi.FieldStorage that represents uploaded blob.
Returns:
An email.message.Message holding the upload information.
"""
message = email.message.Message()
message.add_header(
'content-transfer-encoding',
field_storage.headers.getheader('Content-Transfer-Encoding', ''))
message.set_payload(field_storage.file.read())
payload = message.get_payload(decode=True)
return email.message_from_string(payload)
def _get_upload_content(field_storage):
"""Returns an email.Message holding the values of the file transfer.
It decodes the content of the field storage and creates a new email.Message.
Args:
field_storage: cgi.FieldStorage that represents uploaded blob.
Returns:
An email.message.Message holding the upload information.
"""
message = email.message.Message()
message.add_header(
'content-transfer-encoding',
field_storage.headers.getheader('Content-Transfer-Encoding', ''))
message.set_payload(field_storage.file.read())
payload = message.get_payload(decode=True)
return email.message_from_string(payload)
def _get_upload_content(field_storage):
"""Returns an email.Message holding the values of the file transfer.
It decodes the content of the field storage and creates a new email.Message.
Args:
field_storage: cgi.FieldStorage that represents uploaded blob.
Returns:
An email.message.Message holding the upload information.
"""
message = email.message.Message()
message.add_header(
'content-transfer-encoding',
field_storage.headers.getheader('Content-Transfer-Encoding', ''))
message.set_payload(field_storage.file.read())
payload = message.get_payload(decode=True)
return email.message_from_string(payload)
def _run_test_success(self, upload_data, upload_url):
"""Basic dispatcher request flow."""
request_path = urlparse.urlparse(upload_url)[2]
# Get session key from upload url.
session_key = upload_url.split('/')[-1]
self.environ['PATH_INFO'] = request_path
self.environ['CONTENT_TYPE'] = (
'multipart/form-data; boundary="================1234=="')
status, _, response_body, forward_environ, forward_body = (
self.run_dispatcher(upload_data))
self.assertEquals('200 OK', status)
self.assertEquals('Forwarded successfully.', response_body)
self.assertNotEquals(None, forward_environ)
# These must NOT be unicode strings.
self.assertIsInstance(forward_environ['PATH_INFO'], str)
if 'QUERY_STRING' in forward_environ:
self.assertIsInstance(forward_environ['QUERY_STRING'], str)
self.assertRegexpMatches(forward_environ['CONTENT_TYPE'],
r'multipart/form-data; boundary="[^"]+"')
self.assertEquals(len(forward_body), int(forward_environ['CONTENT_LENGTH']))
self.assertIn(constants.FAKE_IS_ADMIN_HEADER, forward_environ)
self.assertEquals('1', forward_environ[constants.FAKE_IS_ADMIN_HEADER])
new_request = email.message_from_string(
'Content-Type: %s\n\n%s' % (forward_environ['CONTENT_TYPE'],
forward_body))
(upload,) = new_request.get_payload()
self.assertEquals('message/external-body', upload.get_content_type())
message = email.message.Message()
message.add_header('Content-Type', upload['Content-Type'])
blob_key = message.get_param('blob-key')
blob_contents = blobstore.BlobReader(blob_key).read()
self.assertEquals('value', blob_contents)
self.assertRaises(datastore_errors.EntityNotFoundError,
datastore.Get,
session_key)
return upload, forward_environ, forward_body