python类Response()的实例源码

work.py 文件源码 项目:biji 作者: jianmoumou 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def judge_response():
    import requests
    from requests.models import Response
    body = requests.get("http://www.126.com")
    if isinstance(body, Response):
        print "yes"
    else:
        print "no"
testutils.py 文件源码 项目:firebase-admin-python 作者: firebase 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def send(self, request, **kwargs):
        request._extra_kwargs = kwargs
        self._recorder.append(request)
        resp = models.Response()
        resp.url = request.url
        resp.status_code = self._status
        resp.raw = six.BytesIO(self._data.encode())
        return resp
test_misc.py 文件源码 项目:localstack 作者: localstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def run_parallel_download():

    file_length = 10000000

    class DownloadListener(ProxyListener):

        def forward_request(self, method, path, data, headers):
            sleep_time = int(path.replace('/', ''))
            time.sleep(sleep_time)
            response = Response()
            response.status_code = 200
            response._content = ('%s' % sleep_time) * file_length
            return response

    test_port = 12124
    tmp_file_pattern = '/tmp/test.%s'

    proxy = GenericProxy(port=test_port, update_listener=DownloadListener())
    proxy.start()

    def do_download(param):
        tmp_file = tmp_file_pattern % param
        TMP_FILES.append(tmp_file)
        download('http://localhost:%s/%s' % (test_port, param), tmp_file)

    values = (1, 2, 3)
    parallelize(do_download, values)
    proxy.stop()

    for val in values:
        tmp_file = tmp_file_pattern % val
        assert len(load_file(tmp_file)) == file_length
test_api_gateway.py 文件源码 项目:localstack 作者: localstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_api_gateway_http_integration():
    test_port = 12123
    backend_url = 'http://localhost:%s%s' % (test_port, API_PATH_HTTP_BACKEND)

    # create target HTTP backend
    class TestListener(ProxyListener):

        def forward_request(self, **kwargs):
            response = Response()
            response.status_code = 200
            response._content = kwargs.get('data') or '{}'
            return response

    proxy = GenericProxy(test_port, update_listener=TestListener())
    proxy.start()

    # create API Gateway and connect it to the HTTP backend
    result = connect_api_gateway_to_http('test_gateway2', backend_url, path=API_PATH_HTTP_BACKEND)

    url = INBOUND_GATEWAY_URL_PATTERN.format(api_id=result['id'],
        stage_name=TEST_STAGE_NAME, path=API_PATH_HTTP_BACKEND)

    # make sure CORS headers are present
    origin = 'localhost'
    result = requests.options(url, headers={'origin': origin})
    assert result.status_code == 200
    assert re.match(result.headers['Access-Control-Allow-Origin'].replace('*', '.*'), origin)
    assert 'POST' in result.headers['Access-Control-Allow-Methods']

    # make test request to gateway
    result = requests.get(url)
    assert result.status_code == 200
    assert to_str(result.content) == '{}'
    data = {'data': 123}
    result = requests.post(url, data=json.dumps(data))
    assert result.status_code == 200
    assert json.loads(to_str(result.content)) == data

    # clean up
    proxy.stop()
cloudformation_listener.py 文件源码 项目:localstack 作者: localstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def error_response(message, code=400, error_type='ValidationError'):
    response = Response()
    response.status_code = code
    response.headers['x-amzn-errortype'] = error_type
    response._content = """<ErrorResponse xmlns="%s">
          <Error>
            <Type>Sender</Type>
            <Code>%s</Code>
            <Message>%s</Message>
          </Error>
          <RequestId>%s</RequestId>
        </ErrorResponse>""" % (XMLNS_CLOUDFORMATION, error_type, message, uuid.uuid4())
    return response
cloudformation_listener.py 文件源码 项目:localstack 作者: localstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def make_response(operation_name, content='', code=200):
    response = Response()
    response._content = """<{op_name}Response xmlns="{xmlns}">
      <{op_name}Result>
        {content}
      </{op_name}Result>
      <ResponseMetadata><RequestId>{uid}</RequestId></ResponseMetadata>
    </{op_name}Response>""".format(xmlns=XMLNS_CLOUDFORMATION,
        op_name=operation_name, uid=uuid.uuid4(), content=content)
    response.status_code = code
    return response
apigateway_listener.py 文件源码 项目:localstack 作者: localstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _create_response_object(content, code, headers):
    response = Response()
    response.status_code = code
    response.headers = headers
    response._content = content
    return response
apigateway_listener.py 文件源码 项目:localstack 作者: localstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_cors_response(headers):
    # TODO: for now we simply return "allow-all" CORS headers, but in the future
    # we should implement custom headers for CORS rules, as supported by API Gateway:
    # http://docs.aws.amazon.com/apigateway/latest/developerguide/how-to-cors.html
    response = Response()
    response.status_code = 200
    response.headers['Access-Control-Allow-Origin'] = '*'
    response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, DELETE'
    response.headers['Access-Control-Allow-Headers'] = '*'
    response._content = ''
    return response
generic_proxy.py 文件源码 项目:localstack 作者: localstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def return_response(self, method, path, data, headers, response):
        """ This interceptor method is called by the proxy when returning a response
            (*after* having forwarded the request and received a response from the backend
            service). It receives details of the incoming request as well as the response
            from the backend service, and returns either of the following results:

            * An instance of requests.models.Response to return to the client instead of the
              actual response returned from the backend service.
            * Any other value, in which case the response from the backend service is
              returned to the client.
        """
        return None
s3_listener.py 文件源码 项目:localstack 作者: localstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_cors(bucket_name):
    response = Response()
    cors = BUCKET_CORS.get(bucket_name)
    if not cors:
        # TODO: check if bucket exists, otherwise return 404-like error
        cors = {
            'CORSConfiguration': []
        }
    body = xmltodict.unparse(cors)
    response._content = body
    response.status_code = 200
    return response
s3_listener.py 文件源码 项目:localstack 作者: localstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def set_cors(bucket_name, cors):
    # TODO: check if bucket exists, otherwise return 404-like error
    if isinstance(cors, six.string_types):
        cors = xmltodict.parse(cors)
    BUCKET_CORS[bucket_name] = cors
    response = Response()
    response.status_code = 200
    return response
s3_listener.py 文件源码 项目:localstack 作者: localstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_lifecycle(bucket_name):
    response = Response()
    lifecycle = BUCKET_LIFECYCLE.get(bucket_name)
    if not lifecycle:
        # TODO: check if bucket exists, otherwise return 404-like error
        lifecycle = {
            'LifecycleConfiguration': []
        }
    body = xmltodict.unparse(lifecycle)
    response._content = body
    response.status_code = 200
    return response
s3_listener.py 文件源码 项目:localstack 作者: localstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def set_lifecycle(bucket_name, lifecycle):
    # TODO: check if bucket exists, otherwise return 404-like error
    if isinstance(to_str(lifecycle), six.string_types):
        lifecycle = xmltodict.parse(lifecycle)
    BUCKET_LIFECYCLE[bucket_name] = lifecycle
    response = Response()
    response.status_code = 200
    return response
sqs_listener.py 文件源码 项目:localstack 作者: localstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def return_response(self, method, path, data, headers, response, request_handler):

        if method == 'POST' and path == '/':
            req_data = urlparse.parse_qs(to_str(data))
            action = req_data.get('Action', [None])[0]
            event_type = None
            queue_url = None
            if action == 'CreateQueue':
                event_type = event_publisher.EVENT_SQS_CREATE_QUEUE
                response_data = xmltodict.parse(response.content)
                if 'CreateQueueResponse' in response_data:
                    queue_url = response_data['CreateQueueResponse']['CreateQueueResult']['QueueUrl']
            elif action == 'DeleteQueue':
                event_type = event_publisher.EVENT_SQS_DELETE_QUEUE
                queue_url = req_data.get('QueueUrl', [None])[0]

            if event_type and queue_url:
                event_publisher.fire_event(event_type, payload={'u': event_publisher.get_hash(queue_url)})

            # patch the response and return the correct endpoint URLs
            if action in ('CreateQueue', 'GetQueueUrl', 'ListQueues'):
                content_str = content_str_original = to_str(response.content)
                new_response = Response()
                new_response.status_code = response.status_code
                new_response.headers = response.headers
                if config.USE_SSL and '<QueueUrl>http://' in content_str:
                    # return https://... if we're supposed to use SSL
                    content_str = re.sub(r'<QueueUrl>\s*http://', r'<QueueUrl>https://', content_str)
                # expose external hostname:port
                external_port = get_external_port(headers, request_handler)
                content_str = re.sub(r'<QueueUrl>\s*([a-z]+)://[^<]*:([0-9]+)/([^<]*)\s*</QueueUrl>',
                    r'<QueueUrl>\1://%s:%s/\3</QueueUrl>' % (HOSTNAME_EXTERNAL, external_port), content_str)
                new_response._content = content_str
                if content_str_original != new_response._content:
                    # if changes have been made, return patched response
                    new_response.headers['content-length'] = len(new_response._content)
                    return new_response


# extract the external port used by the client to make the request
kinesis_listener.py 文件源码 项目:localstack 作者: localstack 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def kinesis_error_response(data):
    error_response = Response()
    error_response.status_code = 200
    content = {'FailedRecordCount': 1, 'Records': []}
    for record in data['Records']:
        content['Records'].append({
            'ErrorCode': 'ProvisionedThroughputExceededException',
            'ErrorMessage': 'Rate exceeded for shard X in stream Y under account Z.'
        })
    error_response._content = json.dumps(content)
    return error_response
sns_listener.py 文件源码 项目:localstack 作者: localstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def make_response(op_name, content=''):
    response = Response()
    if not content:
        content = '<MessageId>%s</MessageId>' % short_uid()
    response._content = """<{op_name}Response xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
        <{op_name}Result>
            {content}
        </{op_name}Result>
        <ResponseMetadata><RequestId>{req_id}</RequestId></ResponseMetadata>
        </{op_name}Response>""".format(op_name=op_name, content=content, req_id=short_uid())
    response.status_code = 200
    return response
dynamodb_listener.py 文件源码 项目:localstack 作者: localstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def error_response(message=None, error_type=None, code=400):
    if not message:
        message = 'Unknown error'
    if not error_type:
        error_type = 'UnknownError'
    if 'com.amazonaws.dynamodb' not in error_type:
        error_type = 'com.amazonaws.dynamodb.v20120810#%s' % error_type
    response = Response()
    response.status_code = code
    content = {
        'message': message,
        '__type': error_type
    }
    response._content = json.dumps(content)
    return response
download.py 文件源码 项目:MyFriend-Rob 作者: lcheniv 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def send(self, request, stream=None, timeout=None, verify=None, cert=None,
             proxies=None):
        parsed_url = urlparse.urlparse(request.url)

        # We only work for requests with a host of localhost
        if parsed_url.netloc.lower() != "localhost":
            raise InvalidURL("Invalid URL %r: Only localhost is allowed" %
                request.url)

        real_url = urlparse.urlunparse(parsed_url[:1] + ("",) + parsed_url[2:])
        pathname = url_to_path(real_url)

        resp = Response()
        resp.status_code = 200
        resp.url = real_url

        stats = os.stat(pathname)
        modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
        resp.headers = CaseInsensitiveDict({
            "Content-Type": mimetypes.guess_type(pathname)[0] or "text/plain",
            "Content-Length": stats.st_size,
            "Last-Modified": modified,
        })

        resp.raw = LocalFSResponse(open(pathname, "rb"))
        resp.close = resp.raw.close

        return resp
test_api_gateway.py 文件源码 项目:localstack 作者: atlassian 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_api_gateway_http_integration():
    test_port = 12123
    backend_url = 'http://localhost:%s%s' % (test_port, API_PATH_HTTP_BACKEND)

    # create target HTTP backend
    def listener(**kwargs):
        response = Response()
        response.status_code = 200
        response._content = json.dumps(kwargs['data']) if kwargs['data'] else '{}'
        return response

    proxy = GenericProxy(test_port, update_listener=listener)
    proxy.start()

    # create API Gateway and connect it to the HTTP backend
    result = connect_api_gateway_to_http('test_gateway2', backend_url, path=API_PATH_HTTP_BACKEND)

    # make test request to gateway
    url = INBOUND_GATEWAY_URL_PATTERN.format(api_id=result['id'],
        stage_name=TEST_STAGE_NAME, path=API_PATH_HTTP_BACKEND)
    result = requests.get(url)
    assert result.status_code == 200
    assert to_str(result.content) == '{}'
    data = {"data": 123}
    result = requests.post(url, data=json.dumps(data))
    assert result.status_code == 200
    assert json.loads(to_str(result.content)) == data

    # clean up
    proxy.stop()
cloudformation_listener.py 文件源码 项目:localstack 作者: atlassian 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def error_response(message, code=400, error_type='ValidationError'):
    response = Response()
    response.status_code = code
    response.headers['x-amzn-errortype'] = error_type
    response._content = """<ErrorResponse xmlns="%s">
          <Error>
            <Type>Sender</Type>
            <Code>%s</Code>
            <Message>%s</Message>
          </Error>
          <RequestId>%s</RequestId>
        </ErrorResponse>""" % (XMLNS_CLOUDFORMATION, error_type, message, uuid.uuid4())
    return response


问题


面经


文章

微信
公众号

扫码关注公众号