def from_fields(cls, fields, boundary=None, encoding='utf-8', callback=None):
    """Alternate constructor: build the encoder from ``fields`` and wrap it.

    A ``MultipartEncoder`` is created from ``fields``, ``boundary`` and
    ``encoding``; it is then handed to ``cls`` together with ``callback``.

    :param cls: class to instantiate with the new encoder
    :param fields: passed through to ``MultipartEncoder``
    :param boundary: passed through to ``MultipartEncoder``
    :param encoding: passed through to ``MultipartEncoder``
    :param callback: passed through to ``cls``
    :return: the result of ``cls(encoder, callback)``
    """
    return cls(MultipartEncoder(fields, boundary, encoding), callback)
python类MultipartEncoder()的实例源码
def generate_payload(event):
    """
    Build a MultipartEncoder carrying ``event`` as a single "metadata" part.

    The event is serialised with ``json.dumps``, encoded to bytes and wrapped
    in an in-memory stream; the part is declared as 'application/json' and has
    no filename.

    :param event: dict payload to send as "metadata" part in multi-part request
    :return: MultipartEncoder
    """
    metadata_stream = io.BytesIO(json.dumps(event).encode())
    metadata_part = (None, metadata_stream, 'application/json')
    return MultipartEncoder({"metadata": metadata_part})
def sendSparkPOST(url, data):
    """POST ``data`` to ``url`` as a multipart body and return the response.

    ``data`` is handed to ``MultipartEncoder`` as its ``fields``. The request
    carries the encoder's content type and a Bearer authorization header built
    from the ``bearer`` name, which is not defined in this function.

    :param url: address to post to
    :param data: fields for the multipart encoder
    :return: whatever ``requests.post`` returns
    """
    encoder = MultipartEncoder(fields=data)
    request_headers = {
        "Content-Type": encoder.content_type,
        "Authorization": "Bearer "+bearer,
    }
    return requests.post(url, data=encoder, headers=request_headers)
def post_update(context, update):
    """
    Updates a Cisco Spark room

    :param context: button state and configuration
    :type context: ``dict``

    :param update: content of the update to be posted there
    :type update: ``str`` or ``dict``

    :raises Exception: if the API answers with a status code other than 200

    If the update is a simple string, it is sent as such to Cisco Spark.
    Else if it is a dictionary, then it is encoded as MIME Multipart. The
    dictionary given by the caller is not modified; the room id is added to
    a copy of it.
    """
    logging.info("Posting update to Cisco Spark room")

    url = 'https://api.ciscospark.com/v1/messages'
    headers = {'Authorization': 'Bearer '+context['spark']['CISCO_SPARK_BTTN_BOT']}
    room_id = context['spark']['id']

    if isinstance(update, dict):
        # Work on a shallow copy so the caller's dict does not silently gain
        # a 'roomId' key as a side effect of posting.
        fields = dict(update)
        fields['roomId'] = room_id
        payload = MultipartEncoder(fields=fields)
        headers['Content-Type'] = payload.content_type
    else:
        payload = {'roomId': room_id, 'text': update}

    response = requests.post(url=url, headers=headers, data=payload)

    if response.status_code != 200:
        # An error body is not guaranteed to be JSON; fall back to the raw
        # text so that the status-code exception below is the one raised.
        try:
            details = response.json()
        except ValueError:
            details = response.text
        logging.info(details)
        raise Exception("Received error code {}".format(response.status_code))

    logging.info('- done, check the room with Cisco Spark client software')
#
# handle Twilio API
#
def load_file(path):
    """Encode the file at ``path`` as a one-part multipart body.

    The part is keyed by, and named after, the file's base name and is
    declared as 'text/plain'.

    :param path: location of the file to read (opened in binary mode)
    :return: tuple ``(content_type, body)`` taken from the encoder
    """
    filename = os.path.basename(path)

    with open(path, 'rb') as stream:
        encoder = MultipartEncoder(
            fields={filename: (filename, stream, 'text/plain')})
        content_type = encoder.content_type
        # The body has to be rendered while the file is still open.
        body = encoder.to_string()

    return (content_type, body)
def test_smoke(self):
    """Feed a one-field multipart body to the parser; no assertions are made."""
    encoder = MultipartEncoder(fields={'name': 'hello'})
    request_headers = {'Content-Type': encoder.content_type}

    parser = StreamingFormDataParser(headers=request_headers)

    payload = encoder.to_string()
    parser.data_received(payload)
def test_basic_single(self):
    """A single registered field receives its value as bytes."""
    target = ValueTarget()

    encoder = MultipartEncoder(fields={'value': 'hello world'})
    request_headers = {'Content-Type': encoder.content_type}

    parser = StreamingFormDataParser(headers=request_headers)
    parser.register('value', target)

    payload = encoder.to_string()
    parser.data_received(payload)

    self.assertEqual(target.value, b'hello world')
def test_break_chunk_at_boundary(self):
    """Deliver the body in two chunks split inside a boundary marker.

    Both registered fields must still receive their complete values.
    """
    expected_first_value = 'hello' * 500
    expected_second_value = 'hello' * 500

    first = ValueTarget()
    second = ValueTarget()

    # Reuse the expected values so the input and the assertions cannot drift
    # apart.
    encoder = MultipartEncoder(fields={
        'first': expected_first_value,
        'second': expected_second_value
    })

    body = encoder.to_string()
    boundary = encoder.boundary.encode('utf-8')

    parser = StreamingFormDataParser(
        headers={'Content-Type': encoder.content_type})
    parser.register('first', first)
    parser.register('second', second)

    # Search from offset 50 (presumably to skip the opening boundary) but keep
    # the result as an absolute offset into ``body``. Searching ``body[50:]``
    # returned a slice-relative offset, which put the split 45 bytes before
    # the marker instead of 5 bytes into it.
    index = body.index(boundary, 50) + 5

    parser.data_received(body[:index])
    parser.data_received(body[index:])

    self.assertEqual(first.value, expected_first_value.encode('utf-8'))
    self.assertEqual(second.value, expected_second_value.encode('utf-8'))
def test_mixed_content_varying_chunk_size(self):
    """Split a mixed text/file body at every offset and parse both halves.

    For each split position a fresh parser and fresh targets are used, and
    all three fields must come out unchanged.
    """
    with open(data_file_path('file.txt'), 'rb') as reference:
        expected_value = reference.read()

    with open(data_file_path('file.txt'), 'rb') as upload:
        encoder = MultipartEncoder(fields={
            'name': 'hello world',
            'age': '10',
            'cv.txt': ('file.txt', upload, 'text/plain')
        })

        # Render the body while the uploaded file is still open.
        body = encoder.to_string()
        content_type = encoder.content_type

    for split_at in range(len(body)):
        name = ValueTarget()
        age = ValueTarget()
        cv = ValueTarget()

        parser = StreamingFormDataParser(
            headers={'Content-Type': content_type})

        for field, target in (('name', name), ('age', age), ('cv.txt', cv)):
            parser.register(field, target)

        head, tail = body[:split_at], body[split_at:]
        parser.data_received(head)
        parser.data_received(tail)

        self.assertEqual(name.value, b'hello world')
        self.assertEqual(age.value, b'10')
        self.assertEqual(cv.value, expected_value)
def test_parameter_contains_crlf(self):
    """A CRLF in the middle of a field value is preserved by the parser."""
    target = ValueTarget()

    encoder = MultipartEncoder(fields={'value': 'hello\r\nworld'})
    request_headers = {'Content-Type': encoder.content_type}

    parser = StreamingFormDataParser(headers=request_headers)
    parser.register('value', target)

    payload = encoder.to_string()
    parser.data_received(payload)

    self.assertEqual(target.value, b'hello\r\nworld')
def test_parameter_ends_with_crlf(self):
    """A CRLF at the end of a field value is preserved by the parser."""
    target = ValueTarget()

    encoder = MultipartEncoder(fields={'value': 'hello\r\n'})
    request_headers = {'Content-Type': encoder.content_type}

    parser = StreamingFormDataParser(headers=request_headers)
    parser.register('value', target)

    payload = encoder.to_string()
    parser.data_received(payload)

    self.assertEqual(target.value, b'hello\r\n')
def test_parameter_starts_with_crlf(self):
    """A CRLF at the start of a field value is preserved by the parser."""
    target = ValueTarget()

    encoder = MultipartEncoder(fields={'value': '\r\nworld'})
    request_headers = {'Content-Type': encoder.content_type}

    parser = StreamingFormDataParser(headers=request_headers)
    parser.register('value', target)

    payload = encoder.to_string()
    parser.data_received(payload)

    self.assertEqual(target.value, b'\r\nworld')
def test_register_after_data_received(self):
    """Registering a target once data has been fed raises ParseFailedException."""
    encoder = MultipartEncoder(fields={'name': 'hello'})
    request_headers = {'Content-Type': encoder.content_type}

    parser = StreamingFormDataParser(headers=request_headers)

    payload = encoder.to_string()
    parser.data_received(payload)

    late_target = ValueTarget()
    with self.assertRaises(ParseFailedException):
        parser.register('name', late_target)
def main():
    """Encode the file named on the command line and push it through the parser.

    Arguments come from ``parse_args()``; ``filename`` and ``content_type``
    are read from the result. The file is sent as the single field 'file'.
    """
    args = parse_args()

    with open(args.filename, 'rb') as fd:
        file_part = ('file', fd, args.content_type)
        encoder = MultipartEncoder(fields={'file': file_part})

        request_headers = {'Content-Type': encoder.content_type}
        parser = StreamingFormDataParser(headers=request_headers)
        parser.register('file', ValueTarget())

        # The body has to be rendered while the file is still open.
        payload = encoder.to_string()
        parser.data_received(payload)
def _import_file(self, filename):
    """Upload ``filename`` to the device API as a configuration import.

    The file is posted as a multipart body to ``https://<hostname>/api/``
    with the query parameters ``type=import`` and ``category=configuration``.
    The key is ``self.api_key`` when set, otherwise the result of
    ``self.device.keygen()``.

    :param filename: path of the file to upload
    :return: the base name of the uploaded file, or ``False`` when the XML
        reply carries ``status="error"``
    :raises: whatever ``raise_for_status()`` raises for an HTTP error reply
    """
    key = self.api_key or self.device.keygen()

    params = {
        'type': 'import',
        'category': 'configuration',
        'key': key
    }

    path = os.path.basename(filename)

    # Open the file in a context manager so the handle is released once the
    # upload has finished, including when the request raises.
    with open(filename, 'rb') as upload:
        mef = requests_toolbelt.MultipartEncoder(
            fields={
                'file': (path, upload, 'application/octet-stream')
            }
        )

        # NOTE(review): certificate verification is switched off (verify=False)
        # and the matching warning is silenced; confirm this is intended.
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
        url = 'https://{0}/api/'.format(self.hostname)
        request = requests.post(
            url,
            verify=False,
            params=params,
            headers={'Content-Type': mef.content_type},
            data=mef
        )

    # if something goes wrong just raise an exception
    request.raise_for_status()

    response = xml.etree.ElementTree.fromstring(request.content)
    if response.attrib['status'] == 'error':
        return False
    return path
def __repr__(self):
    """Return a debug string showing the ``repr`` of ``self.fields``."""
    template = '<MultipartEncoder: {0!r}>'
    return template.format(self.fields)
def __init__(self, encoder, callback=None):
    """Wrap ``encoder`` and set up the read counter and callback.

    :param encoder: the :class:`MultipartEncoder` instance to monitor
    :param callback: optional callable; ``IDENTITY`` is used when it is
        omitted or falsy
    """
    #: Count of bytes read so far from the :class:`MultipartEncoder`
    #: instance
    self.bytes_read = 0

    #: The :class:`MultipartEncoder` instance under observation
    self.encoder = encoder

    #: Optional function to call after a read
    self.callback = callback if callback else IDENTITY

    #: Mirror the encoder's length (avoids the same problem as bug #80)
    self.len = self.encoder.len
def from_fields(cls, fields, boundary=None, encoding='utf-8',
                callback=None):
    """Create an instance of ``cls`` around a freshly built encoder.

    ``fields``, ``boundary`` and ``encoding`` are forwarded to
    ``MultipartEncoder``; the resulting encoder and ``callback`` are then
    passed to ``cls``.

    :return: the result of ``cls(encoder, callback)``
    """
    monitored = MultipartEncoder(fields, boundary, encoding)
    instance = cls(monitored, callback)
    return instance
def uploadPhoto(self, photo, caption = None, upload_id = None, is_sidecar = None):
    """Upload the image file ``photo`` and, on HTTP 200, configure the post.

    The image is sent as a multipart body to ``API_URL + "upload/photo/"``
    using ``self.uuid`` as the boundary. When the reply status is 200,
    ``self.configure(upload_id, photo, caption)`` is called and, if it returns
    a truthy value, ``self.expose()`` follows.

    :param photo: path of the image file to upload
    :param caption: forwarded to ``self.configure``
    :param upload_id: identifier for the upload; defaults to the current time
        in milliseconds, as a string
    :param is_sidecar: when truthy, the field ``is_sidecar='1'`` is added
    :return: always ``False``
    """
    if upload_id is None:
        upload_id = str(int(time.time() * 1000))

    # Read the image inside a context manager so the file handle is closed
    # as soon as the request body has been rendered.
    with open(photo, 'rb') as photo_file:
        data = {
            'upload_id'         : upload_id,
            '_uuid'             : self.uuid,
            '_csrftoken'        : self.token,
            'image_compression' : '{"lib_name":"jt","lib_version":"1.3.0","quality":"87"}',
            'photo'             : ('pending_media_%s.jpg'%upload_id, photo_file, 'application/octet-stream', {'Content-Transfer-Encoding':'binary'})
        }
        if is_sidecar:
            data['is_sidecar'] = '1'
        m = MultipartEncoder(data, boundary=self.uuid)
        content_type = m.content_type
        body = m.to_string()

    self.s.headers.update({'X-IG-Capabilities' : '3Q4=',
                           'X-IG-Connection-Type' : 'WIFI',
                           'Cookie2' : '$Version=1',
                           'Accept-Language' : 'en-US',
                           'Accept-Encoding' : 'gzip, deflate',
                           'Content-type': content_type,
                           'Connection' : 'close',
                           'User-Agent' : self.USER_AGENT})
    response = self.s.post(self.API_URL + "upload/photo/", data=body)
    if response.status_code == 200:
        if self.configure(upload_id, photo, caption):
            self.expose()
    # NOTE(review): False is returned even after a successful upload; kept
    # as-is because callers may depend on it. Confirm whether that is intended.
    return False
def __repr__(self):
    """Describe the encoder by the ``repr`` of its ``fields`` attribute."""
    shown_fields = self.fields
    return '<MultipartEncoder: {0!r}>'.format(shown_fields)