python类MultipartEncoder()的实例源码

multipart.py 文件源码 项目:kapsel 作者: conda 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def from_fields(cls, fields, boundary=None, encoding='utf-8', callback=None):
        """Alternate constructor that starts from raw form fields.

        A ``MultipartEncoder`` is built from ``fields``, ``boundary`` and
        ``encoding`` (passed positionally, in that order) and is then handed
        to ``cls`` together with ``callback``.
        """
        return cls(MultipartEncoder(fields, boundary, encoding), callback)
directives.py 文件源码 项目:python-avs 作者: lddias 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def generate_payload(event):
    """
    Wrap ``event`` as the single "metadata" part of a multi-part request.

    The event is serialized with ``json.dumps``, encoded to bytes and exposed
    through an in-memory stream, so the returned encoder can be used as a
    file-like object to write out the multi-part headers and body.

    :param event: dict payload to send as "metadata" part in multi-part request
    :return: MultipartEncoder
    """
    metadata_bytes = json.dumps(event).encode()
    # (filename, stream, content type): no filename is attached to this part.
    metadata_part = (None, io.BytesIO(metadata_bytes), 'application/json')
    return MultipartEncoder({"metadata": metadata_part})
whereis.py 文件源码 项目:whereis 作者: matfowle 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def sendSparkPOST(url, data):
    """POST ``data`` to ``url`` as multipart form data and return the response.

    The ``Authorization`` header is built from ``bearer``, a name this
    function expects to find defined outside of it.
    """
    encoder = MultipartEncoder(fields=data)
    request_headers = {
        "Content-Type": encoder.content_type,
        "Authorization": "Bearer "+bearer,
    }
    return requests.post(url, data=encoder, headers=request_headers)
hook.py 文件源码 项目:green-button 作者: bernard357 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def post_update(context, update):
    """
    Updates a Cisco Spark room

    :param context: button state and configuration
    :type context: ``dict``

    :param update: content of the update to be posted there
    :type update: ``str`` or ``dict``

    :raises Exception: if the response status code is not 200

    If the update is a simple string, it is sent as such to Cisco Spark.
    Else if it is a dictionary, then it is encoded as MIME Multipart.
    The dictionary passed by the caller is not modified; the room id is
    added to a shallow copy of it.
    """

    logging.info("Posting update to Cisco Spark room")

    url = 'https://api.ciscospark.com/v1/messages'
    headers = {'Authorization': 'Bearer '+context['spark']['CISCO_SPARK_BTTN_BOT']}

    if isinstance(update, dict):
        # Copy first: writing 'roomId' straight into `update` would leak the
        # room id into a dictionary that the caller still owns and may reuse.
        fields = dict(update)
        fields['roomId'] = context['spark']['id']
        payload = MultipartEncoder(fields=fields)
        headers['Content-Type'] = payload.content_type
    else:
        payload = {'roomId': context['spark']['id'], 'text': update }

    response = requests.post(url=url, headers=headers, data=payload)

    if response.status_code != 200:
        # Log the decoded error body before failing, to help diagnosis.
        logging.info(response.json())
        raise Exception("Received error code {}".format(response.status_code))

    logging.info('- done, check the room with Cisco Spark client software')

#
# handle Twilio API
#
test_parser.py 文件源码 项目:streaming-form-data 作者: siddhantgoel 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def load_file(path):
    """Encode the file at ``path`` as a one-field multipart body.

    The final path component is used both as the form field name and as the
    uploaded file name; the part is declared as ``text/plain``.

    Returns a ``(content_type, body)`` tuple taken from the encoder.
    """
    filename = os.path.basename(path)

    with open(path, 'rb') as handle:
        encoder = MultipartEncoder(
            fields={filename: (filename, handle, 'text/plain')})

        # Both values are read while the file is still open.
        content_type = encoder.content_type
        body = encoder.to_string()

    return (content_type, body)
test_parser.py 文件源码 项目:streaming-form-data 作者: siddhantgoel 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_smoke(self):
        # Smoke check: feed a minimal one-field body to the parser; the test
        # makes no assertion beyond the calls completing.
        encoder = MultipartEncoder(fields={'name': 'hello'})
        content_headers = {'Content-Type': encoder.content_type}

        parser = StreamingFormDataParser(headers=content_headers)

        payload = encoder.to_string()
        parser.data_received(payload)
test_parser.py 文件源码 项目:streaming-form-data 作者: siddhantgoel 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_basic_single(self):
        # One registered field, delivered in a single chunk, must end up in
        # its target as bytes.
        target = ValueTarget()

        encoder = MultipartEncoder(fields={'value': 'hello world'})
        content_headers = {'Content-Type': encoder.content_type}

        parser = StreamingFormDataParser(headers=content_headers)
        parser.register('value', target)

        payload = encoder.to_string()
        parser.data_received(payload)

        self.assertEqual(target.value, b'hello world')
test_parser.py 文件源码 项目:streaming-form-data 作者: siddhantgoel 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_break_chunk_at_boundary(self):
        # Deliver a two-field body in two chunks and check that both values
        # are reassembled correctly.
        field_text = 'hello' * 500
        expected_bytes = field_text.encode('utf-8')

        first_target = ValueTarget()
        second_target = ValueTarget()

        encoder = MultipartEncoder(fields={
            'first': field_text,
            'second': field_text
        })

        body = encoder.to_string()
        boundary = encoder.boundary.encode('utf-8')

        content_headers = {'Content-Type': encoder.content_type}
        parser = StreamingFormDataParser(headers=content_headers)

        parser.register('first', first_target)
        parser.register('second', second_target)

        # The offset is located within body[50:] and then applied, unadjusted,
        # to the full body when slicing below.
        split_at = body[50:].index(boundary) + 5

        for chunk in (body[:split_at], body[split_at:]):
            parser.data_received(chunk)

        self.assertEqual(first_target.value, expected_bytes)
        self.assertEqual(second_target.value, expected_bytes)
test_parser.py 文件源码 项目:streaming-form-data 作者: siddhantgoel 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_mixed_content_varying_chunk_size(self):
        # Encode two text fields plus a file, then replay the body split in
        # two at every possible offset, using a fresh parser each time.
        with open(data_file_path('file.txt'), 'rb') as reference:
            expected_value = reference.read()

        with open(data_file_path('file.txt'), 'rb') as upload:
            encoder = MultipartEncoder(fields={
                'name': 'hello world',
                'age': '10',
                'cv.txt': ('file.txt', upload, 'text/plain')
            })

            # Both values are read while the uploaded file is still open.
            body = encoder.to_string()
            content_type = encoder.content_type

        for split_at in range(len(body)):
            name_target = ValueTarget()
            age_target = ValueTarget()
            cv_target = ValueTarget()

            parser = StreamingFormDataParser(
                headers={'Content-Type': content_type})

            registrations = (
                ('name', name_target),
                ('age', age_target),
                ('cv.txt', cv_target),
            )
            for field_name, field_target in registrations:
                parser.register(field_name, field_target)

            head, tail = body[:split_at], body[split_at:]
            parser.data_received(head)
            parser.data_received(tail)

            self.assertEqual(name_target.value, b'hello world')
            self.assertEqual(age_target.value, b'10')
            self.assertEqual(cv_target.value, expected_value)
test_parser.py 文件源码 项目:streaming-form-data 作者: siddhantgoel 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_parameter_contains_crlf(self):
        # A CRLF in the middle of a field value must come out unchanged.
        raw_value = 'hello\r\nworld'
        target = ValueTarget()

        encoder = MultipartEncoder(fields={'value': raw_value})
        content_headers = {'Content-Type': encoder.content_type}

        parser = StreamingFormDataParser(headers=content_headers)
        parser.register('value', target)

        payload = encoder.to_string()
        parser.data_received(payload)

        self.assertEqual(target.value, b'hello\r\nworld')
test_parser.py 文件源码 项目:streaming-form-data 作者: siddhantgoel 项目源码 文件源码 阅读 54 收藏 0 点赞 0 评论 0
def test_parameter_ends_with_crlf(self):
        # A trailing CRLF in a field value must come out unchanged.
        raw_value = 'hello\r\n'
        target = ValueTarget()

        encoder = MultipartEncoder(fields={'value': raw_value})
        content_headers = {'Content-Type': encoder.content_type}

        parser = StreamingFormDataParser(headers=content_headers)
        parser.register('value', target)

        payload = encoder.to_string()
        parser.data_received(payload)

        self.assertEqual(target.value, b'hello\r\n')
test_parser.py 文件源码 项目:streaming-form-data 作者: siddhantgoel 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_parameter_starts_with_crlf(self):
        # A leading CRLF in a field value must come out unchanged.
        raw_value = '\r\nworld'
        target = ValueTarget()

        encoder = MultipartEncoder(fields={'value': raw_value})
        content_headers = {'Content-Type': encoder.content_type}

        parser = StreamingFormDataParser(headers=content_headers)
        parser.register('value', target)

        payload = encoder.to_string()
        parser.data_received(payload)

        self.assertEqual(target.value, b'\r\nworld')
test_parser.py 文件源码 项目:streaming-form-data 作者: siddhantgoel 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_register_after_data_received(self):
        # Registering a target once data has been fed must be rejected.
        encoder = MultipartEncoder(fields={'name': 'hello'})
        content_headers = {'Content-Type': encoder.content_type}

        parser = StreamingFormDataParser(headers=content_headers)
        parser.data_received(encoder.to_string())

        # Build the target outside the guarded region so that only the
        # register() call itself is expected to raise.
        late_target = ValueTarget()
        with self.assertRaises(ParseFailedException):
            parser.register('name', late_target)
benchmark.py 文件源码 项目:streaming-form-data 作者: siddhantgoel 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def main():
    """Encode the file named on the command line and run it through the parser.

    The file is wrapped as a single multipart field called ``file`` using the
    content type from the parsed arguments; the whole encoded body is then
    handed to a ``StreamingFormDataParser`` in one call.
    """
    args = parse_args()

    with open(args.filename, 'rb') as source:
        upload_fields = {'file': ('file', source, args.content_type)}
        encoder = MultipartEncoder(fields=upload_fields)

        content_headers = {'Content-Type': encoder.content_type}
        parser = StreamingFormDataParser(headers=content_headers)
        parser.register('file', ValueTarget())

        payload = encoder.to_string()
        parser.data_received(payload)
panos.py 文件源码 项目:napalm-panos 作者: napalm-automation-community 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _import_file(self, filename):
        """Upload ``filename`` to the device API as a configuration import.

        The API key is ``self.api_key`` when set, otherwise one is generated
        with ``self.device.keygen()``. The file is streamed as a multipart
        upload and is closed again once the request has completed.

        :param filename: path of the local file to upload
        :return: the base name of the uploaded file, or ``False`` when the
            XML response carries ``status="error"``
        :raises: whatever ``raise_for_status()`` raises for an HTTP error
        """
        if not self.api_key:
            key = self.device.keygen()
        else:
            key = self.api_key

        params = {
            'type': 'import',
            'category': 'configuration',
            'key': key
        }

        path = os.path.basename(filename)

        # Keep the file open only for the duration of the upload; the encoder
        # reads from it while the request body is being sent.
        with open(filename, 'rb') as config_file:
            mef = requests_toolbelt.MultipartEncoder(
                fields={
                    'file': (path, config_file, 'application/octet-stream')
                }
            )

            # Certificate verification is disabled below, so silence the
            # warning urllib3 would otherwise emit for the request.
            requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
            url = 'https://{0}/api/'.format(self.hostname)
            request = requests.post(
                url,
                verify=False,
                params=params,
                headers={'Content-Type': mef.content_type},
                data=mef
            )

        # if something goes wrong just raise an exception
        request.raise_for_status()
        response = xml.etree.ElementTree.fromstring(request.content)

        if response.attrib['status'] == 'error':
            return False
        else:
            return path
encoder.py 文件源码 项目:OctoFusion 作者: tapnair 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __repr__(self):
        """Return a debugging string that embeds the ``repr`` of ``self.fields``."""
        template = '<MultipartEncoder: {fields!r}>'
        return template.format(fields=self.fields)
encoder.py 文件源码 项目:OctoFusion 作者: tapnair 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, encoder, callback=None):
        """Set up monitoring state around ``encoder``.

        ``callback`` is stored as given when truthy; otherwise ``IDENTITY``
        is used in its place.
        """
        #: The :class:`MultipartEncoder` instance this object monitors
        self.encoder = encoder

        #: Optional function to call after a read
        self.callback = callback if callback else IDENTITY

        #: Running count of bytes already read from the
        #: :class:`MultipartEncoder` instance
        self.bytes_read = 0

        #: Copy of the encoder's length (avoids the same problem as bug #80)
        self.len = encoder.len
def from_fields(cls, fields, boundary=None, encoding='utf-8',
                    callback=None):
        """Alternate constructor that starts from raw form fields.

        A ``MultipartEncoder`` is built from ``fields``, ``boundary`` and
        ``encoding`` (passed positionally, in that order) and is then handed
        to ``cls`` together with ``callback``.
        """
        return cls(MultipartEncoder(fields, boundary, encoding), callback)
__init__.py 文件源码 项目:Instagram-API-python 作者: LevPasha 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def uploadPhoto(self, photo, caption = None, upload_id = None, is_sidecar = None):
        """Upload the image file at ``photo`` and, on HTTP 200, configure it.

        :param photo: path of the image file to upload
        :param caption: passed through to ``self.configure``
        :param upload_id: identifier for the upload; defaults to the current
            time in milliseconds, as a string
        :param is_sidecar: when truthy, an ``is_sidecar`` field set to ``'1'``
            is added to the form
        :return: always ``False``

        The photo file is opened only while the multipart body is being
        encoded and is closed before the request is sent.
        """
        if upload_id is None:
            upload_id = str(int(time.time() * 1000))

        # Encode the whole body while the file is open, so the handle is
        # released even if encoding or the later request fails.
        with open(photo, 'rb') as photo_file:
            data = {
                'upload_id'         : upload_id,
                '_uuid'             : self.uuid,
                '_csrftoken'        : self.token,
                'image_compression' : '{"lib_name":"jt","lib_version":"1.3.0","quality":"87"}',
                'photo'             : ('pending_media_%s.jpg'%upload_id, photo_file, 'application/octet-stream', {'Content-Transfer-Encoding':'binary'})
            }
            if is_sidecar:
                data['is_sidecar']='1'
            m = MultipartEncoder(data, boundary=self.uuid)
            content_type = m.content_type
            body = m.to_string()

        self.s.headers.update ({'X-IG-Capabilities' : '3Q4=',
                                'X-IG-Connection-Type' : 'WIFI',
                                'Cookie2' : '$Version=1',
                                'Accept-Language' : 'en-US',
                                'Accept-Encoding' : 'gzip, deflate',
                                'Content-type': content_type,
                                'Connection' : 'close',
                                'User-Agent' : self.USER_AGENT})
        response = self.s.post(self.API_URL + "upload/photo/", data=body)
        if response.status_code == 200:
            if self.configure(upload_id, photo, caption):
                self.expose()
        # NOTE(review): False is returned even after a successful upload and
        # configure; kept as-is for existing callers -- confirm the intent.
        return False
multipart.py 文件源码 项目:anaconda-project 作者: Anaconda-Platform 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __repr__(self):
        """Return a debugging string that embeds the ``repr`` of ``self.fields``."""
        template = '<MultipartEncoder: {fields!r}>'
        return template.format(fields=self.fields)


问题


面经


文章

微信
公众号

扫码关注公众号