python类Response()的实例源码

nsxlib_testcase.py 文件源码 项目:vmware-nsxlib 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def mock_nsx_clustered_api(self, session_response=None, **kwargs):
        orig_request = nsx_cluster.TimeoutSession.request

        def mocked_request(*args, **kwargs):
            if args[2].endswith('api/session/create'):
                response = models.Response()
                response.status_code = 200
                response.headers = {
                    'Set-Cookie': 'JSESSIONID=%s;junk' % JSESSIONID}
                return response
            return orig_request(*args, **kwargs)

        with mock.patch.object(nsx_cluster.TimeoutSession, 'request',
                               new=mocked_request):
            cluster = NsxClientTestCase.MockNSXClusteredAPI(
                session_response=session_response, **kwargs)
        return cluster
http_client.py 文件源码 项目:StockScraper 作者: stovorov 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def call_endpoint(self, method: str, endpoint: str, *args: List[Any], **kwargs: Dict[Any, Any]) -> Response:
        """Call api endpoint method.

        :param method: REST API methods name (string) - 'POST', 'GET', 'PUT', 'PATCH', 'DELETE'
        :param endpoint: endpoint which will be called with method.
        :param args: additional arguments
        :param kwargs: additional key-worded arguments
        :return:
        """
        logging.info('Calling method %s for endpoint %s', method, endpoint)

        if method.upper() not in REST_API_METHODS:
            logging.error('Method %s does not match REST API METHODS: %s', method, ','.join(REST_API_METHODS))
            raise TypeError('Method {0} does not match REST API METHODS: {1}'.format(str(method),
                                                                                     ','.join(REST_API_METHODS)))

        req_method = getattr(requests, method.lower())
        resp = req_method(urljoin(self.host_name, endpoint), *args, **kwargs)
        return resp
request.py 文件源码 项目:otest 作者: rohe 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def expected_error_response(self, response):
        if isinstance(response, Response):  # requests response
            # don't want bytes
            _txt = response.content.decode('utf8')
            response = ErrorResponse().from_json(_txt)

        if isinstance(response, ErrorResponse):
            self.conv.events.store(EV_PROTOCOL_RESPONSE, response,
                                   sender=self.__class__.__name__)
            if response["error"] not in self.expect_error["error"]:
                raise Break("Wrong error, got {} expected {}".format(
                    response["error"], self.expect_error["error"]))
            try:
                if self.expect_error["stop"]:
                    raise Break("Stop requested after received expected error")
            except KeyError:
                pass
        else:
            self.conv.events.store(EV_FAULT, "Expected error, didn't get it")
            raise Break("Did not receive expected error")

        return response
test_cluster.py 文件源码 项目:vmware-nsxlib 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_sess_create_resp():
    sess_create_response = models.Response()
    sess_create_response.status_code = 200
    sess_create_response.headers = {'Set-Cookie': 'JSESSIONID=abc;'}
    return sess_create_response
test_cli.py 文件源码 项目:http-prompt 作者: eliangcs 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def setUp(self):
        self.listener = ExecutionListener({})

        self.response = Response()
        self.response.cookies.update({
            'username': 'john',
            'sessionid': 'abcd'
        })

        self.context = Context('http://localhost')
        self.context.headers['Cookie'] = 'name="John Doe"; sessionid=xyz'
generic_proxy.py 文件源码 项目:localstack 作者: localstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def forward_request(self, method, path, data, headers):
        """ This interceptor method is called by the proxy when receiving a new request
            (*before* forwarding the request to the backend service). It receives details
            of the incoming request, and returns either of the following results:

            * True if the request should be forwarded to the backend service as-is (default).
            * An integer (e.g., 200) status code to return directly to the client without
              calling the backend service.
            * An instance of requests.models.Response to return directly to the client without
              calling the backend service.
            * An instance of requests.models.Request which represents a new/modified request
              that will be forwarded to the backend service.
            * Any other value, in which case a 503 Bad Gateway is returned to the client
              without calling the backend service.
        """
        return True
test_runner.py 文件源码 项目:webhook2lambda2sqs 作者: jantman 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_run_test_bad_method(self, capsys):
        conf = Mock()
        conf.get.return_value = {
            'ep1': {'method': 'FOO'}
        }
        args = Mock(endpoint_name='ep1')
        res1 = Mock(spec_set=Response)
        type(res1).status_code = 200
        type(res1).content = 'res1content'
        type(res1).headers = {
            'h1': 'h1val',
            'hz': 'hzval'
        }

        with patch.multiple(
            pbm,
            autospec=True,
            requests=DEFAULT,
            logger=DEFAULT,
            get_base_url=DEFAULT,
            node=DEFAULT
        ) as mocks:
            mocks['get_base_url'].return_value = 'mybase/'
            mocks['node'].return_value = 'mynode'
            mocks['requests'].get.return_value = res1
            with pytest.raises(Exception) as excinfo:
                run_test(conf, args)
        assert exc_msg(excinfo.value) == 'Unimplemented method: FOO'
serverleak.py 文件源码 项目:PrivacyScore 作者: PrivacyScore 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _response_to_json(resp: Response) -> bytes:
    """Generate a json byte string from a response received through requests."""
    # we store only the top of the file because core dumps can become very large
    # also: we do not want to store more potentially sensitive data than necessary
    # to determine whether there is a leak or not

    return json.dumps({
        'text': resp.content[0:50*1024].decode(errors='replace'),
        'status_code': resp.status_code,
        'headers': dict(resp.headers),
        'url': resp.url,
    }).encode()
http_client.py 文件源码 项目:StockScraper 作者: stovorov 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def validate_response(called_method: Callable) -> Callable:
    """Decorator for HTTPClient validating received response for REST API methods.
    If status code from server is different than 200 raises HTTPError.

    :param called_method: requests methods - POST, GET, PUT, PATH, DELETE
    :return: response from server
    """
    def wrapper(method: Callable, endpoint: str, *args: List[Any], **kwargs: Dict[Any, Any]) -> Response:
        """Executes decorated function and checks status_code."""
        resp = called_method(method, endpoint, *args, **kwargs)
        if resp.status_code != 200:
            logging.error('HTTP Client returned status code %s for url %s', resp.status_code, resp.url)
            raise HTTPError('HTTP Client returned status code {0} for url {1}'.format(str(resp.status_code), resp.url))
        return resp
    return wrapper
http_client.py 文件源码 项目:StockScraper 作者: stovorov 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def to_json(response: Response) -> dict:
        """Convert response from requests to json object.

        :param response: response from requests
        :return: json object
        """
        return response.json()
base.py 文件源码 项目:openregistry.api 作者: openprocurement 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def setUpDS(self):
        self.app.app.registry.docservice_url = 'http://localhost'
        test = self
        def request(method, url, **kwargs):
            response = Response()
            if method == 'POST' and '/upload' in url:
                url = test.generate_docservice_url()
                response.status_code = 200
                response.encoding = 'application/json'
                response._content = '{{"data":{{"url":"{url}","hash":"md5:{md5}","format":"application/msword","title":"name.doc"}},"get_url":"{url}"}}'.format(url=url, md5='0'*32)
                response.reason = '200 OK'
            return response

        self._srequest = SESSION.request
        SESSION.request = request
base.py 文件源码 项目:openregistry.api 作者: openprocurement 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def setUpBadDS(self):
        self.app.app.registry.docservice_url = 'http://localhost'
        def request(method, url, **kwargs):
            response = Response()
            response.status_code = 403
            response.encoding = 'application/json'
            response._content = '"Unauthorized: upload_view failed permission check"'
            response.reason = '403 Forbidden'
            return response

        self._srequest = SESSION.request
        SESSION.request = request
base.py 文件源码 项目:openregistry.api 作者: openprocurement 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def do_request(self, req, status=None, expect_errors=None):
        req.headers.environ["HTTP_HOST"] = self.hostname
        if hasattr(self, 'file_obj') and not self.file_obj.closed:
            self.file_obj.write(req.as_bytes(True))
            self.file_obj.write("\n")
            if req.body:
                try:
                    self.file_obj.write(
                            'DATA:\n' + json.dumps(json.loads(req.body), indent=2, ensure_ascii=False).encode('utf8'))
                    self.file_obj.write("\n")
                except:
                    pass
            self.file_obj.write("\n")
        resp = super(DumpsTestAppwebtest, self).do_request(req, status=status, expect_errors=expect_errors)
        if hasattr(self, 'file_obj') and not self.file_obj.closed:
            headers = [(n.title(), v)
                       for n, v in resp.headerlist
                       if n.lower() != 'content-length']
            headers.sort()
            self.file_obj.write(str('Response: %s\n%s\n') % (
                resp.status,
                str('\n').join([str('%s: %s') % (n, v) for n, v in headers]),
            ))

            if resp.testbody:
                try:
                    self.file_obj.write(json.dumps(json.loads(resp.testbody), indent=2, ensure_ascii=False).encode('utf8'))
                except:
                    pass
            self.file_obj.write("\n\n")
        return resp
test_exceptions.py 文件源码 项目:python-gnocchiclient 作者: gnocchixyz 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_from_response_404(self):
        r = models.Response()
        r.status_code = 404
        r.headers['Content-Type'] = "application/json"
        r._content = json.dumps(
            {"description": "Archive policy rule foobar does not exist"}
        ).encode('utf-8')
        exc = exceptions.from_response(r)
        self.assertIsInstance(exc, exceptions.ArchivePolicyRuleNotFound)
test_exceptions.py 文件源码 项目:python-gnocchiclient 作者: gnocchixyz 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_resource_type_before_resource(self):
        r = models.Response()
        r.status_code = 404
        r.headers['Content-Type'] = "application/json"
        r._content = json.dumps(
            {"description": "Resource type foobar does not exist"}
        ).encode('utf-8')
        exc = exceptions.from_response(r)
        self.assertIsInstance(exc, exceptions.ResourceTypeNotFound)
test_exceptions.py 文件源码 项目:python-gnocchiclient 作者: gnocchixyz 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_from_response_keystone_401(self):
        r = models.Response()
        r.status_code = 401
        r.headers['Content-Type'] = "application/json"
        r._content = json.dumps({"error": {
            "message": "The request you have made requires authentication.",
            "code": 401, "title": "Unauthorized"}}
        ).encode('utf-8')
        exc = exceptions.from_response(r)
        self.assertIsInstance(exc, exceptions.Unauthorized)
        self.assertEqual("The request you have made requires authentication.",
                         exc.message)
test_exceptions.py 文件源码 项目:python-gnocchiclient 作者: gnocchixyz 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_from_response_unknown_middleware(self):
        r = models.Response()
        r.status_code = 400
        r.headers['Content-Type'] = "application/json"
        r._content = json.dumps(
            {"unknown": "random message"}
        ).encode('utf-8')
        exc = exceptions.from_response(r)
        self.assertIsInstance(exc, exceptions.ClientException)
        self.assertEqual('{"unknown": "random message"}', exc.message)
test_comment_sidecar.py 文件源码 项目:comment-sidecar 作者: phauer 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def post_comment(post_payload, assert_success: bool=True) -> Response:
    response = requests.post(url=COMMENT_SIDECAR_URL, json=post_payload)
    if assert_success:
        assert_that(response.status_code) \
            .described_as("Comment creation failed. Message: " + response.text) \
            .is_equal_to(201)
    return response
test_comment_sidecar.py 文件源码 项目:comment-sidecar 作者: phauer 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_comments(site: str = DEFAULT_SITE, path: str = DEFAULT_PATH, assert_success: bool=True) -> Response:
    response = requests.get("{}?site={}&path={}".format(COMMENT_SIDECAR_URL, site, path))
    if assert_success:
        assert_that(response.status_code)\
            .described_as("Getting comments failed. Message: " + response.text)\
            .is_equal_to(200)
    return response
download.py 文件源码 项目:Callandtext 作者: iaora 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def send(self, request, stream=None, timeout=None, verify=None, cert=None,
             proxies=None):
        parsed_url = urlparse.urlparse(request.url)

        # We only work for requests with a host of localhost
        if parsed_url.netloc.lower() != "localhost":
            raise InvalidURL("Invalid URL %r: Only localhost is allowed" %
                request.url)

        real_url = urlparse.urlunparse(parsed_url[:1] + ("",) + parsed_url[2:])
        pathname = url_to_path(real_url)

        resp = Response()
        resp.status_code = 200
        resp.url = real_url

        stats = os.stat(pathname)
        modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
        resp.headers = CaseInsensitiveDict({
            "Content-Type": mimetypes.guess_type(pathname)[0] or "text/plain",
            "Content-Length": stats.st_size,
            "Last-Modified": modified,
        })

        resp.raw = LocalFSResponse(open(pathname, "rb"))
        resp.close = resp.raw.close

        return resp
download.py 文件源码 项目:python_ddd_flask 作者: igorvinnicius 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def send(self, request, stream=None, timeout=None, verify=None, cert=None,
             proxies=None):
        parsed_url = urlparse.urlparse(request.url)

        # We only work for requests with a host of localhost
        if parsed_url.netloc.lower() != "localhost":
            raise InvalidURL("Invalid URL %r: Only localhost is allowed" %
                request.url)

        real_url = urlparse.urlunparse(parsed_url[:1] + ("",) + parsed_url[2:])
        pathname = url_to_path(real_url)

        resp = Response()
        resp.status_code = 200
        resp.url = real_url

        stats = os.stat(pathname)
        modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
        resp.headers = CaseInsensitiveDict({
            "Content-Type": mimetypes.guess_type(pathname)[0] or "text/plain",
            "Content-Length": stats.st_size,
            "Last-Modified": modified,
        })

        resp.raw = LocalFSResponse(open(pathname, "rb"))
        resp.close = resp.raw.close

        return resp
basetestcase.py 文件源码 项目:wukong 作者: SurveyMonkey 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _make_response(self, data, status_code=200):
        from requests.models import Response
        response = Response()
        response._content = json.dumps(data).encode('utf-8')
        response.status_code = status_code
        return response
decorators.py 文件源码 项目:aegea 作者: kislyuk 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def generate_fake_error_response(msg, status_code=401, encoding='utf-8'):
    r = Response()
    r.status_code = status_code
    r.encoding = encoding
    r.raw = RequestsStringIO(msg.encode())
    r._content_consumed = True
    r._content = r.raw.read()
    return r

# Use mock decorators when generating documentation, so all functino signatures
# are displayed correctly
test_netbox.py 文件源码 项目:netbox-as-ansible-inventory 作者: AAbouZaid 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def mock_response(json_payload):
    response = Response()
    response.status_code = 200
    response.json = MagicMock(return_value=json_payload)
    return MagicMock(return_value=response)

# Fake API output.
basespider.py 文件源码 项目:xspider 作者: zym1115718204 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def parser(response):
        """
        Parser Response to Result
        :param response:
        :return: dict
        """
        result = {
            "url": response.url,
            "title": response.doc('title').text(),
        }
        return result
guangdong_spider.py 文件源码 项目:xspider 作者: zym1115718204 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def start_downloader(self, url, args):
        """
        Start Downloader
        """
        resp = Response()
        if url.find(u'?args=') > -1:
            real_url, search_word = url.split('?args=')
            search_word = unicode(unquote(search_word))
            print 'url: ', real_url
            print 'search_word: ', search_word
            c = IndustryAndCommerceGeetestCrack(
                url=real_url,
                search_text=search_word,
                input_id="content",
                search_element_id="search",
                gt_element_class_name="gt_box",
                gt_slider_knob_name="gt_slider_knob",
                result_numbers_xpath='/html/body/div[1]/div[6]/div[1]/span',
                result_list_verify_class='clickStyle')
            result, cookies = c.crack()
            current_url = real_url
            body = result.encode('utf-8') if result else u'<html>??????</html>'.encode('utf-8')
            # resp.status_code = 200
            resp._content = body
            resp.url = real_url
            resp.doc = PyQuery(body)
            return resp
        else:
            resp = self.download(url, args=args)
            return resp
download.py 文件源码 项目:Sudoku-Solver 作者: ayush1997 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def send(self, request, stream=None, timeout=None, verify=None, cert=None,
             proxies=None):
        parsed_url = urlparse.urlparse(request.url)

        # We only work for requests with a host of localhost
        if parsed_url.netloc.lower() != "localhost":
            raise InvalidURL("Invalid URL %r: Only localhost is allowed" %
                request.url)

        real_url = urlparse.urlunparse(parsed_url[:1] + ("",) + parsed_url[2:])
        pathname = url_to_path(real_url)

        resp = Response()
        resp.status_code = 200
        resp.url = real_url

        stats = os.stat(pathname)
        modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
        resp.headers = CaseInsensitiveDict({
            "Content-Type": mimetypes.guess_type(pathname)[0] or "text/plain",
            "Content-Length": stats.st_size,
            "Last-Modified": modified,
        })

        resp.raw = LocalFSResponse(open(pathname, "rb"))
        resp.close = resp.raw.close

        return resp
download.py 文件源码 项目:youtube-trending-music 作者: ishan-nitj 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def send(self, request, stream=None, timeout=None, verify=None, cert=None,
             proxies=None):
        parsed_url = urlparse.urlparse(request.url)

        # We only work for requests with a host of localhost
        if parsed_url.netloc.lower() != "localhost":
            raise InvalidURL("Invalid URL %r: Only localhost is allowed" %
                request.url)

        real_url = urlparse.urlunparse(parsed_url[:1] + ("",) + parsed_url[2:])
        pathname = url_to_path(real_url)

        resp = Response()
        resp.status_code = 200
        resp.url = real_url

        stats = os.stat(pathname)
        modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
        resp.headers = CaseInsensitiveDict({
            "Content-Type": mimetypes.guess_type(pathname)[0] or "text/plain",
            "Content-Length": stats.st_size,
            "Last-Modified": modified,
        })

        resp.raw = LocalFSResponse(open(pathname, "rb"))
        resp.close = resp.raw.close

        return resp
decorators.py 文件源码 项目:gitsome 作者: donnemartin 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def generate_fake_error_response(msg, status_code=401, encoding='utf-8'):
    r = Response()
    r.status_code = status_code
    r.encoding = encoding
    r.raw = RequestsStringIO(msg.encode())
    r._content_consumed = True
    r._content = r.raw.read()
    return r

# Use mock decorators when generating documentation, so all functino signatures
# are displayed correctly
spider_response.py 文件源码 项目:Sasila 作者: DarkSand 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __str__(self):
        if isinstance(self.m_response, Response_name):
            if self.m_response:
                return "<Response [%s] [%s] [%.2f KB]>" % (
                    self.m_response.status_code, self.m_response.url, (float(len(self.m_response.content)) / 1000))
            else:
                return "<Response failed: %s>" % self.request.url
        else:
            return "<Selenium Response: %s>" % self.request.url


问题


面经


文章

微信
公众号

扫码关注公众号