python类Response()的实例源码

base.py 文件源码 项目:cbapi-python 作者: carbonblack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_response_and_time(self, key, default=(None, None)):
        """ Retrieves response and timestamp for `key` if it's stored in cache,
        otherwise returns `default`

        :param key: key of resource
        :param default: return this if `key` not found in cache
        :returns: tuple (response, datetime)

        .. note:: Response is restored after unpickling with :meth:`restore_response`
        """
        try:
            if key not in self.responses:
                key = self.keys_map[key]
            response, timestamp = self.responses[key]
        except KeyError:
            return default
        return self.restore_response(response), timestamp
restapi.py 文件源码 项目:onedrive-e 作者: tobecontinued 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def put(self, url, data, headers=None, ok_status_code=requests.codes.ok, auto_renew=True):
        """
        Perform a HTTP PUT request.
        :param str url: URL of the HTTP request.
        :param bytes | None data: Binary data to send in the request body.
        :param dict | None headers: Additional headers for the HTTP request.
        :param int ok_status_code: (Optional) Expected status code for the HTTP response.
        :param True | False auto_renew: (Optional) If True, auto recover from expired token error or Internet failure.
        :rtype: requests.Response
        """
        params = {
            'proxies': self.proxies,
            'data': data
        }
        if headers is not None:
            params['headers'] = headers
        return self.request('put', url, params=params,
                            ok_status_code=ok_status_code, auto_renew=auto_renew)
base.py 文件源码 项目:cbapi-python 作者: carbonblack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def save_response(self, key, response):
        """ Save response to cache

        :param key: key for this response
        :param response: response to save

        .. note:: Response is reduced before saving (with :meth:`reduce_response`)
                  to make it picklable
        """
        #
        # Delete the X-Auth-Token
        #
        if 'X-Auth-Token' in response.request.headers:
            del response.request.headers['X-Auth-Token']

        self.responses[key] = response
base.py 文件源码 项目:cbapi-python 作者: carbonblack 项目源码 文件源码 阅读 81 收藏 0 点赞 0 评论 0
def get_response(self, key, default=None):
        """ Retrieves response and timestamp for `key` if it's stored in cache,
        otherwise returns `default`

        :param key: key of resource
        :param default: return this if `key` not found in cache
        :returns: tuple (response, datetime)

        .. note:: Response is restored after unpickling with :meth:`restore_response`
        """
        try:
            if key not in self.responses:
                key = self.keys_map[key]
            response = self.responses[key]
        except KeyError:
            return default
        return response
base.py 文件源码 项目:cbapi-python 作者: carbonblack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def restore_response(self, response, seen=None):
        """ Restore response object after unpickling
        """
        if seen is None:
            seen = {}
        try:
            return seen[id(response)]
        except KeyError:
            pass
        result = requests.Response()
        for field in self._response_attrs:
            setattr(result, field, getattr(response, field, None))
        result.raw._cached_content_ = result.content
        seen[id(response)] = result
        result.history = tuple(self.restore_response(r, seen) for r in response.history)
        return result
test_drv_opencontrail.py 文件源码 项目:networking-opencontrail 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_request_api_server_auth_recover(self, options, config, request,
                                             text):
        driver = self._get_driver(options, config)
        driver._apiinsecure = False
        driver._use_api_certs = False
        response_bad = requests.Response()
        response_bad.status_code = requests.codes.unauthorized
        token = "xyztoken"
        text.return_value = json.dumps({
            'access': {
                'token': {
                    'id': token
                },
            },
        })
        response_good = requests.Response()
        response_good.status_code = requests.codes.ok
        request.side_effect = [response_bad, response_good, response_good]
        url = "/URL"

        driver._request_api_server(url)

        request.assert_called_with('/URL', data=None,
                                   headers={'X-AUTH-TOKEN': token})
test_drv_opencontrail.py 文件源码 项目:networking-opencontrail 作者: openstack 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_request_backend(self, options, config):
        driver = self._get_driver(options, config)
        driver._relay_request = mock.MagicMock()
        response = requests.Response()
        response.status_code = requests.codes.ok
        driver._relay_request.return_value = response

        context = mock.MagicMock()
        data_dict = {}
        obj_name = 'object'
        action = 'TEST'

        code, message = driver._request_backend(context, data_dict, obj_name,
                                                action)

        self.assertEqual(requests.codes.ok, code)
        self.assertEqual({'message': None}, message)
        driver._relay_request.assert_called()
test_request.py 文件源码 项目:backend.ai-client-py 作者: lablup 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_asend_returns_appropriate_sorna_response(mocker, req_params,
                                                        mock_sorna_aresp):
    req = Request(**req_params)
    methods = Request._allowed_methods
    for method in methods:
        req.method = method

        mock_reqfunc = mocker.patch.object(
            aiohttp.ClientSession, method.lower(),
            new_callable=asynctest.CoroutineMock
        )
        mock_reqfunc.return_value, conf = mock_sorna_aresp

        resp = await req.asend()

        assert isinstance(resp, Response)
        assert resp.status == conf['status']
        assert resp.reason == conf['reason']
        assert resp.content_type == conf['content_type']
        body = await conf['read']()
        assert resp.content_length == len(body)
        assert resp.text() == body.decode()
        assert resp.json() == json.loads(body.decode())
rnr_wrappers.py 文件源码 项目:retrieve-and-rank-tuning 作者: rchaks 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _parse_response_content_for_predictions(self, question_number, response):
        """
        Parses the json representation of the docs from the HTTP response and returns it as list predictions
            with scores (and confidences if they exist)
        :param str question_number: used as qid
        :param requests.Response response:
        :return: list of feature vectors
        :rtype: list(list(str))
        """
        response_content = response.json()
        results = []
        if response_content['response']['numFound'] > 0:
            for doc in response_content['response']['docs']:
                if "ranker.confidence" in doc:
                    conf_score = doc["ranker.confidence"]
                else:
                    conf_score = None
                results.append(Prediction(qid=question_number, aid=doc[self.DOC_ID_FIELD_NAME], rank_score=doc['score'],
                                          conf_score=conf_score))
        else:
            self.logger.warn('Empty response: %s' % vars(response))
        return results
discovery_wrappers.py 文件源码 项目:retrieve-and-rank-tuning 作者: rchaks 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _parse_response_content_for_predictions(self, question_number, response):
        """
        Parses the json representation of the docs from the HTTP response and returns it as list predictions
            with scores (and confidences if they exist)
        :param str question_number: used as qid
        :param requests.Response response:
        :return: list of feature vectors
        :rtype: list(list(str))
        """
        response_content = response.json()
        results = []
        if response_content["matching_results"] > 0:
            for doc in response_content['results']:
                results.append(Prediction(qid=question_number, aid=doc[DOC_ID_FIELD_NAME], rank_score=doc['score'],
                                          conf_score=None))
        else:
            self.logger.warn('Empty response: %s' % vars(response))
        return results
restapi.py 文件源码 项目:onedrive-e 作者: tobecontinued 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get(self, url, params=None, headers=None, ok_status_code=requests.codes.ok, auto_renew=True):
        """
        Perform a HTTP GET request.
        :param str url: URL of the HTTP request.
        :param dict[str, T] | None params: (Optional) Dictionary to construct query string.
        :param dict | None headers: (Optional) Additional headers for the HTTP request.
        :param int ok_status_code: (Optional) Expected status code for the HTTP response.
        :param True | False auto_renew: (Optional) If True, auto recover from expired token error or Internet failure.
        :rtype: requests.Response
        """
        args = {'proxies': self.proxies}
        if params is not None:
            args['params'] = params
        if headers is not None:
            args['headers'] = headers
        return self.request('get', url, args, ok_status_code=ok_status_code, auto_renew=auto_renew)
restapi.py 文件源码 项目:onedrive-e 作者: tobecontinued 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def post(self, url, data=None, json=None, headers=None, ok_status_code=requests.codes.ok, auto_renew=True):
        """
        Perform a HTTP POST request.
        :param str url: URL of the HTTP request.
        :param dict | None data: (Optional) Data in POST body of the request.
        :param dict | None json: (Optional) Send the dictionary as JSON content in POST body and set proper headers.
        :param dict | None headers: (Optional) Additional headers for the HTTP request.
        :param int ok_status_code: (Optional) Expected status code for the HTTP response.
        :param True | False auto_renew: (Optional) If True, auto recover from expired token error or Internet failure.
        :rtype: requests.Response
        """
        params = {
            'proxies': self.proxies
        }
        if json is not None:
            params['json'] = json
        else:
            params['data'] = data
        if headers is not None:
            params['headers'] = headers
        return self.request('post', url, params, ok_status_code=ok_status_code, auto_renew=auto_renew)
resource.py 文件源码 项目:OpenStackClient_VBS 作者: Huawei 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def mixin_request_id(self, resp):
        """mixin request id from request response

        :param request.Response resp: http response
        """
        if isinstance(resp, Response):
            # Extract 'X-Openstack-Request-Id' from headers if
            # response is a Response object.
            request_id = (resp.headers.get('openstack-request-id') or
                          resp.headers.get('x-openstack-request-id') or
                          resp.headers.get('x-compute-request-id'))
        else:
            # If resp is of type string or None.
            request_id = resp

        self.request_id = request_id
fake_client.py 文件源码 项目:python-bileanclient 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, data):
        super(TestResponse, self).__init__()
        self._content_consumed = True
        if isinstance(data, dict):
            self.status_code = data.get('status_code', 200)
            # Fake the text attribute to streamline Response creation
            text = data.get('text', "")
            if isinstance(text, (dict, list)):
                self._content = json.dumps(text)
                default_headers = {
                    "Content-Type": "application/json",
                }
            else:
                self._content = text
                default_headers = {}
            if six.PY3 and isinstance(self._content, six.string_types):
                self._content = self._content.encode('utf-8', 'strict')
            self.headers = data.get('headers') or default_headers
        else:
            self.status_code = data
client_test.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_request_retry(self, mock_request):
        """Tests that two connection errors will be handled"""

        class CustomMock(object):
            i = 0

            def connection_error(self, *args, **kwargs):
                self.i += 1

                if self.i < 3:
                    raise requests.exceptions.ConnectionError
                else:
                    r = requests.Response()
                    r.status_code = 204
                    return r

        mock_request.side_effect = CustomMock().connection_error

        cli = InfluxDBClient(database='db')
        cli.write_points(
            self.dummy_points
        )
client_test.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_request_retry_raises(self, mock_request):
        """Tests that three connection errors will not be handled"""

        class CustomMock(object):
            i = 0

            def connection_error(self, *args, **kwargs):
                self.i += 1

                if self.i < 4:
                    raise requests.exceptions.ConnectionError
                else:
                    r = requests.Response()
                    r.status_code = 200
                    return r

        mock_request.side_effect = CustomMock().connection_error

        cli = InfluxDBClient(database='db')

        with self.assertRaises(requests.exceptions.ConnectionError):
            cli.write_points(self.dummy_points)
client_test.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_request_retry_raises(self, mock_request):
        """Tests that three connection errors will not be handled"""

        class CustomMock(object):
            i = 0

            def connection_error(self, *args, **kwargs):
                self.i += 1

                if self.i < 4:
                    raise requests.exceptions.ConnectionError
                else:
                    r = requests.Response()
                    r.status_code = 200
                    return r

        mock_request.side_effect = CustomMock().connection_error

        cli = InfluxDBClient(database='db')

        with self.assertRaises(requests.exceptions.ConnectionError):
            cli.write_points(self.dummy_points)
api_test.py 文件源码 项目:micromasters 作者: mitodl 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_add_channel_failed_create_channel(mock_staff_client, mocker):
    """If client.channels.create fails an exception should be raised"""
    response_500 = Response()
    response_500.status_code = statuses.HTTP_500_INTERNAL_SERVER_ERROR
    mock_staff_client.channels.create.return_value.raise_for_status.side_effect = HTTPError(response=response_500)

    with pytest.raises(ChannelCreationException) as ex:
        api.add_channel(
            Search.from_dict({}),
            "title",
            "name",
            "public_description",
            "channel_type",
            123,
            456,
        )
    assert ex.value.args[0] == "Error creating channel name"
    mock_staff_client.channels.create.return_value.raise_for_status.assert_called_with()
    assert mock_staff_client.channels.create.call_count == 1
    assert PercolateQuery.objects.count() == 0
    assert Channel.objects.count() == 0
api_test.py 文件源码 项目:micromasters 作者: mitodl 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_send_batch_400_no_raise(self, mock_post):
        """
        Test that if raise_for_status is False we don't raise an exception for a 400 response
        """
        mock_post.return_value = Mock(
            spec=Response,
            status_code=HTTP_400_BAD_REQUEST,
            json=mocked_json()
        )

        chunk_size = 10
        recipient_tuples = [("{0}@example.com".format(letter), None) for letter in string.ascii_letters]
        assert len(recipient_tuples) == 52
        with override_settings(
            MAILGUN_RECIPIENT_OVERRIDE=None,
        ):
            resp_list = MailgunClient.send_batch(
                'email subject', 'email body', recipient_tuples, chunk_size=chunk_size, raise_for_status=False
            )

        assert len(resp_list) == 6
        for resp in resp_list:
            assert resp.status_code == HTTP_400_BAD_REQUEST
        assert mock_post.call_count == 6
        assert mock_post.return_value.raise_for_status.called is False
api_test.py 文件源码 项目:micromasters 作者: mitodl 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_financial_aid_email_error(self, raise_for_status, mock_post):
        """
        Test that send_financial_aid_email handles errors correctly
        """
        mock_post.return_value = Mock(
            spec=Response,
            status_code=HTTP_400_BAD_REQUEST,
            json=mocked_json(),
        )

        response = MailgunClient.send_financial_aid_email(
            self.staff_user_profile.user,
            self.financial_aid,
            'email subject',
            'email body',
            raise_for_status=raise_for_status,
        )

        assert response.raise_for_status.called is raise_for_status
        assert response.status_code == HTTP_400_BAD_REQUEST
        assert response.json() == {}
fake_client.py 文件源码 项目:python-kingbirdclient 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, data):
        super(TestResponse, self).__init__()
        self._content_consumed = True
        if isinstance(data, dict):
            self.status_code = data.get('status_code', 200)
            # Fake the text attribute to streamline Response creation
            text = data.get('text', "")
            if isinstance(text, (dict, list)):
                self._content = json.dumps(text)
                default_headers = {
                    "Content-Type": "application/json",
                }
            else:
                self._content = text
                default_headers = {}
            if six.PY3 and isinstance(self._content, six.string_types):
                self._content = self._content.encode('utf-8', 'strict')
            self.headers = data.get('headers') or default_headers
        else:
            self.status_code = data
fbbotw.py 文件源码 项目:fbbotw 作者: JoabMendes 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def post_settings(greeting_text):
    """ Sets the START Button and also the Greeting Text.
        The payload for the START Button will be 'USER_START'

    :param str greeting_text: Desired Greeting Text (160 chars).
    :return: `Response object <http://docs.python-requests.org/en/\
    master/api/#requests.Response>`_
    """
    # Set the greeting texts
    url = TD_STS_URL + PAGE_ACCESS_TOKEN
    txtpayload = {}
    txtpayload['setting_type'] = 'greeting'
    txtpayload['greeting'] = {'text': greeting_text}
    response_msg = json.dumps(txtpayload)
    status = requests.post(url, headers=HEADER, data=response_msg)
    # Set the start button
    btpayload = {}
    btpayload['setting_type'] = 'call_to_actions'
    btpayload['thread_state'] = 'new_thread'
    btpayload['call_to_actions'] = [{'payload': 'USER_START'}]
    response_msg = json.dumps(btpayload)
    status = requests.post(url, headers=HEADER, data=response_msg)
    return status
fbbotw.py 文件源码 项目:fbbotw 作者: JoabMendes 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def post_start_button(payload='START'):
    """ Sets the Thread Settings Greeting Text
    (/docs/messenger-platform/thread-settings/get-started-button).

    :param str payload: Desired postback payload (Default START).
    :return: `Response object <http://docs.python-requests.org/en/\
    master/api/#requests.Response>`_
    """
    url = TD_STS_URL + PAGE_ACCESS_TOKEN
    btpayload = {}
    btpayload["setting_type"] = "call_to_actions"
    btpayload["thread_state"] = "new_thread"
    btpayload["call_to_actions"] = [{"payload": payload}]
    response_msg = json.dumps(btpayload)
    status = requests.post(url, headers=HEADER, data=response_msg)
    return status
fbbotw.py 文件源码 项目:fbbotw 作者: JoabMendes 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def post_domain_whitelisting(whitelisted_domains, domain_action_type='add'):
    """ Sets the whistelisted domains for the Messenger Extension
    (/docs/messenger-platform/thread-settings/domain-whitelisting).

    :param list whistelisted_domains: Domains to be whistelisted.
    :param str domain_action_type: Action to run `add/remove` (Defaut add).
    :return: `Response object <http://docs.python-requests.org/en/\
    master/api/#requests.Response>`_
    """
    url = TD_STS_URL + PAGE_ACCESS_TOKEN
    payload = {}
    payload['setting_type'] = 'domain_whitelisting'
    payload['whitelisted_domains'] = whitelisted_domains
    payload['domain_action_type'] = domain_action_type
    data = json.dumps(payload)
    status = requests.post(url, headers=HEADER, data=data)
    return status
fbbotw.py 文件源码 项目:fbbotw 作者: JoabMendes 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def typing(fbid, sender_action):
    """ Displays/Hides the typing gif/mark seen on facebook chat
    (/docs/messenger-platform/send-api-reference/sender-actions)

    :param str fbid: User id to display action.
    :param str sender_action: `typing_off/typing_on/mark_seen`.
    :return: `Response object <http://docs.python-requests.org/en/\
    master/api/#requests.Response>`_
    """
    url = MSG_URL + PAGE_ACCESS_TOKEN
    payload = {}
    payload['recipient'] = {'id': fbid}
    payload['sender_action'] = sender_action
    data = json.dumps(payload)
    status = requests.post(url, headers=HEADER, data=data)
    return status


# Send API Content Type
fbbotw.py 文件源码 项目:fbbotw 作者: JoabMendes 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def post_text_message(fbid, message):
    """ Sends a common text message
    (/docs/messenger-platform/send-api-reference/text-message)

    :param str fbid: User id to send the text.
    :param str message: Text to be displayed for the user (230 chars).
    :return: `Response object <http://docs.python-requests.org/en/\
    master/api/#requests.Response>`_
    """
    url = MSG_URL + PAGE_ACCESS_TOKEN
    payload = {}
    payload['recipient'] = {'id': fbid}
    payload['message'] = {'text': message}  # Limit 320 chars
    data = json.dumps(payload)
    status = requests.post(url, headers=HEADER, data=data)
    return status
fbbotw.py 文件源码 项目:fbbotw 作者: JoabMendes 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def post_attachment(fbid, media_url, file_type):
    """ Sends a media attachment
    (/docs/messenger-platform/send-api-reference/contenttypes)

    :param str fbid: User id to send the audio.
    :param str url: Url of a hosted media.
    :param str type: image/audio/video/file.
    :return: `Response object <http://docs.python-requests.org/en/\
    master/api/#requests.Response>`_
    """
    url = MSG_URL + PAGE_ACCESS_TOKEN
    payload = {}
    payload['recipient'] = {'id': fbid}
    attachment = {"type": file_type, "payload": {"url": media_url}}
    payload['message'] = {"attachment": attachment}
    data = json.dumps(payload)
    status = requests.post(url, headers=HEADER, data=data)
    return status
fbbotw.py 文件源码 项目:fbbotw 作者: JoabMendes 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def post_start_button(payload='START'):
    """ Sets the **Get Started button**.

    :usage:

        >>> payload = 'GET_STARTED'
        >>> response = fbbotw.post_start_button(payload=payload)
    :param str payload: Desired postback payload (Default 'START').
    :return: `Response object <http://docs.python-requests.org/en/\
    master/api/#requests.Response>`_
    :facebook docs: `/get-started-button <https://developers.facebook\
    .com/docs/messenger-platform/messenger-profile/get-started-button>`_
    """
    url = MESSENGER_PROFILE_URL.format(access_token=PAGE_ACCESS_TOKEN)
    payload_data = {}
    payload_data['get_started'] = {'payload': payload}
    data = json.dumps(payload_data)
    status = requests.post(url, headers=HEADER, data=data)
    return status
fbbotw.py 文件源码 项目:fbbotw 作者: JoabMendes 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def post_domain_whitelist(whitelisted_domains):
    """ Sets the whistelisted domains for the Messenger Extension

    :usage:

        >>> # Define the array of domains to be whitelisted (Max 10)
        >>> whistelisted_domains = [
                "https://myfirst_domain.com",
                "https://another_domain.com"
            ]
        >>> fbbotw.post_domain_whitelist(
                whitelisted_domains=whitelisted_domains
            )
    :param list whistelisted_domains: Domains to be whistelisted (Max 10).
    :return: `Response object <http://docs.python-requests.org/en/\
    master/api/#requests.Response>`_
    :facebook docs: `/domain-whitelisting <https://developers.facebook.\
    com/docs/messenger-platform/messenger-profile/domain-whitelisting>`_
    """
    url = MESSENGER_PROFILE_URL.format(access_token=PAGE_ACCESS_TOKEN)
    payload = {}
    payload['whitelisted_domains'] = whitelisted_domains
    data = json.dumps(payload)
    status = requests.post(url, headers=HEADER, data=data)
    return status
fbbotw.py 文件源码 项目:fbbotw 作者: JoabMendes 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def post_account_linking_url(account_linking_url):
    """ Sets the **liking_url** to connect the user with your business login

    :usage:

        >>> my_callback_linking_url = "https://my_business_callback.com"
        >>> response = fbbotw.post_account_linking_url(
                account_linking_url=my_callback_linking_url
            )
    :param str account_linking_url: URL to the account linking OAuth flow.
    :return: `Response object <http://docs.python-requests.org/en/\
    master/api/#requests.Response>`_
    :facebook docs: `/account-linking-url <https://developers.facebook.\
    com/docs/messenger-platform/messenger-profile/account-linking-url>`_
    """
    url = MESSENGER_PROFILE_URL.format(access_token=PAGE_ACCESS_TOKEN)
    payload = {}
    payload['account_linking_url'] = account_linking_url
    data = json.dumps(payload)
    status = requests.post(url, headers=HEADER, data=data)
    return status


问题


面经


文章

微信
公众号

扫码关注公众号