def judge_response():
    """Fetch http://www.126.com and report whether the result is a ``Response``.

    Prints "yes" when the object returned by ``requests.get`` is an instance
    of ``requests.models.Response`` and "no" otherwise.  Nothing is returned.
    """
    import requests
    from requests.models import Response

    body = requests.get("http://www.126.com")
    # Call print as a function: with a single argument this produces the same
    # output on Python 2 and remains valid syntax on Python 3.
    if isinstance(body, Response):
        print("yes")
    else:
        print("no")
# Example source code for the Python class Response()
def send(self, request, **kwargs):
    """Record *request* and answer it with a canned response.

    The extra keyword arguments are stored on the request as
    ``_extra_kwargs`` and the request is appended to ``self._recorder``.
    The returned response carries the request's URL, ``self._status`` as its
    status code and the encoded ``self._data`` wrapped in a ``six.BytesIO``
    as its raw stream.
    """
    request._extra_kwargs = kwargs
    self._recorder.append(request)

    canned = models.Response()
    canned.url = request.url
    canned.status_code = self._status
    payload = self._data.encode()
    canned.raw = six.BytesIO(payload)
    return canned
def run_parallel_download():
    """Download three files in parallel through a local proxy and check their size.

    A ``GenericProxy`` is started on a fixed port with a listener that sleeps
    for the number of seconds encoded in the request path and then returns a
    large body.  The paths ``/1``, ``/2`` and ``/3`` are downloaded via
    ``parallelize`` into ``/tmp/test.<n>`` and each file's length is asserted
    afterwards.  The target paths are appended to ``TMP_FILES``.

    Raises:
        AssertionError: if a downloaded file does not have the expected length.
    """
    file_length = 10000000

    class DownloadListener(ProxyListener):
        """Listener that delays each request and returns a fixed-size body."""

        def forward_request(self, method, path, data, headers):
            # the request path (e.g. "/2") carries the number of seconds to sleep
            sleep_time = int(path.replace('/', ''))
            time.sleep(sleep_time)
            response = Response()
            response.status_code = 200
            # the sleep time is a single digit for the values used below, so
            # repeating it file_length times yields exactly file_length chars
            response._content = ('%s' % sleep_time) * file_length
            return response

    test_port = 12124
    tmp_file_pattern = '/tmp/test.%s'

    def do_download(param):
        tmp_file = tmp_file_pattern % param
        TMP_FILES.append(tmp_file)
        download('http://localhost:%s/%s' % (test_port, param), tmp_file)

    values = (1, 2, 3)

    proxy = GenericProxy(port=test_port, update_listener=DownloadListener())
    proxy.start()
    try:
        parallelize(do_download, values)
    finally:
        # always stop the proxy, even when a download fails, so it does not
        # keep running after this function exits
        proxy.stop()

    for val in values:
        tmp_file = tmp_file_pattern % val
        assert len(load_file(tmp_file)) == file_length
def test_api_gateway_http_integration():
    """Check that an API gateway forwards requests to an HTTP backend.

    Starts a ``GenericProxy`` backend whose listener returns the request data
    (or ``'{}'`` when there is none), connects a gateway to it via
    ``connect_api_gateway_to_http`` and then verifies the CORS headers of an
    OPTIONS request as well as the bodies of a GET and a POST request.

    Raises:
        AssertionError: if any of the responses differs from the expectation.
    """
    test_port = 12123
    backend_url = 'http://localhost:%s%s' % (test_port, API_PATH_HTTP_BACKEND)

    # create target HTTP backend
    class TestListener(ProxyListener):
        def forward_request(self, **kwargs):
            response = Response()
            response.status_code = 200
            response._content = kwargs.get('data') or '{}'
            return response

    proxy = GenericProxy(test_port, update_listener=TestListener())
    proxy.start()
    try:
        # create API Gateway and connect it to the HTTP backend
        result = connect_api_gateway_to_http('test_gateway2', backend_url, path=API_PATH_HTTP_BACKEND)
        url = INBOUND_GATEWAY_URL_PATTERN.format(api_id=result['id'],
            stage_name=TEST_STAGE_NAME, path=API_PATH_HTTP_BACKEND)

        # make sure CORS headers are present
        origin = 'localhost'
        result = requests.options(url, headers={'origin': origin})
        assert result.status_code == 200
        # the allowed-origin header is turned into a regex ('*' -> '.*') and
        # matched against the origin that was sent
        assert re.match(result.headers['Access-Control-Allow-Origin'].replace('*', '.*'), origin)
        assert 'POST' in result.headers['Access-Control-Allow-Methods']

        # make test request to gateway
        result = requests.get(url)
        assert result.status_code == 200
        assert to_str(result.content) == '{}'
        data = {'data': 123}
        result = requests.post(url, data=json.dumps(data))
        assert result.status_code == 200
        assert json.loads(to_str(result.content)) == data
    finally:
        # clean up: stop the backend even when a request or assertion fails
        proxy.stop()
def error_response(message, code=400, error_type='ValidationError'):
    """Build an XML ``ErrorResponse`` with the given message and status code.

    The error type is placed both in the ``x-amzn-errortype`` header and in
    the ``<Code>`` element; a freshly generated UUID is used as request id.
    """
    xml_body = """<ErrorResponse xmlns="%s">
<Error>
<Type>Sender</Type>
<Code>%s</Code>
<Message>%s</Message>
</Error>
<RequestId>%s</RequestId>
</ErrorResponse>""" % (XMLNS_CLOUDFORMATION, error_type, message, uuid.uuid4())

    result = Response()
    result.status_code = code
    result.headers['x-amzn-errortype'] = error_type
    result._content = xml_body
    return result
def make_response(operation_name, content='', code=200):
    """Wrap *content* in an ``<operation_name>Response`` XML envelope.

    The envelope uses ``XMLNS_CLOUDFORMATION`` as namespace and a freshly
    generated UUID as request id.  Returns a ``Response`` whose body is the
    rendered XML and whose status code is *code*.
    """
    template = """<{op_name}Response xmlns="{xmlns}">
<{op_name}Result>
{content}
</{op_name}Result>
<ResponseMetadata><RequestId>{uid}</RequestId></ResponseMetadata>
</{op_name}Response>"""

    result = Response()
    result._content = template.format(
        xmlns=XMLNS_CLOUDFORMATION,
        op_name=operation_name,
        uid=uuid.uuid4(),
        content=content,
    )
    result.status_code = code
    return result
def _create_response_object(content, code, headers):
    """Return a new ``Response`` populated with the given parts.

    *code* becomes the status code, *headers* replaces the response's header
    mapping and *content* is stored as the raw body (``_content``).
    """
    result = Response()
    result.status_code = code
    result.headers = headers
    result._content = content
    return result
def get_cors_response(headers):
    """Return an empty 200 response carrying "allow-all" CORS headers.

    The *headers* argument is currently not consulted.
    """
    # TODO: for now we simply return "allow-all" CORS headers, but in the future
    # we should implement custom headers for CORS rules, as supported by API Gateway:
    # http://docs.aws.amazon.com/apigateway/latest/developerguide/how-to-cors.html
    cors_headers = (
        ('Access-Control-Allow-Origin', '*'),
        ('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE'),
        ('Access-Control-Allow-Headers', '*'),
    )

    result = Response()
    result.status_code = 200
    for header_name, header_value in cors_headers:
        result.headers[header_name] = header_value
    result._content = ''
    return result
def return_response(self, method, path, data, headers, response):
    """Interceptor hook called by the proxy when returning a response.

    The proxy calls this method *after* having forwarded the request and
    received a response from the backend service.  It receives details of
    the incoming request as well as the backend's response, and returns
    either of the following results:

    * An instance of ``requests.models.Response`` to return to the client
      instead of the actual response returned from the backend service.
    * Any other value, in which case the response from the backend service
      is returned to the client.

    This default implementation returns ``None``, i.e. the backend response
    is passed through unchanged.
    """
    return None
def get_cors(bucket_name):
    """Return a 200 response with the CORS configuration stored for a bucket.

    The configuration is looked up in ``BUCKET_CORS``; when nothing (or an
    empty value) is stored, an empty ``CORSConfiguration`` is rendered
    instead.  The body is produced with ``xmltodict.unparse``.
    """
    result = Response()
    config_doc = BUCKET_CORS.get(bucket_name)
    if not config_doc:
        # TODO: check if bucket exists, otherwise return 404-like error
        config_doc = {'CORSConfiguration': []}
    result._content = xmltodict.unparse(config_doc)
    result.status_code = 200
    return result
def set_cors(bucket_name, cors):
    """Store the CORS configuration for a bucket and return a 200 response.

    A string value is parsed with ``xmltodict.parse`` first; any other value
    is stored in ``BUCKET_CORS`` as given.
    """
    # TODO: check if bucket exists, otherwise return 404-like error
    parsed = xmltodict.parse(cors) if isinstance(cors, six.string_types) else cors
    BUCKET_CORS[bucket_name] = parsed

    result = Response()
    result.status_code = 200
    return result
def get_lifecycle(bucket_name):
    """Return a 200 response with the lifecycle configuration of a bucket.

    The configuration is looked up in ``BUCKET_LIFECYCLE``; when nothing (or
    an empty value) is stored, an empty ``LifecycleConfiguration`` is
    rendered instead.  The body is produced with ``xmltodict.unparse``.
    """
    result = Response()
    config_doc = BUCKET_LIFECYCLE.get(bucket_name)
    if not config_doc:
        # TODO: check if bucket exists, otherwise return 404-like error
        config_doc = {'LifecycleConfiguration': []}
    result._content = xmltodict.unparse(config_doc)
    result.status_code = 200
    return result
def set_lifecycle(bucket_name, lifecycle):
    """Store the lifecycle configuration for a bucket and return a 200 response.

    When ``to_str(lifecycle)`` is a string, the original *lifecycle* value is
    parsed with ``xmltodict.parse`` before being stored in
    ``BUCKET_LIFECYCLE``; otherwise it is stored as given.
    """
    # TODO: check if bucket exists, otherwise return 404-like error
    looks_like_text = isinstance(to_str(lifecycle), six.string_types)
    if looks_like_text:
        # note: the un-converted value is what gets parsed
        lifecycle = xmltodict.parse(lifecycle)
    BUCKET_LIFECYCLE[bucket_name] = lifecycle

    result = Response()
    result.status_code = 200
    return result
def return_response(self, method, path, data, headers, response, request_handler):
    """Post-process the backend response for queue-related ``POST /`` requests.

    The request body is parsed with ``parse_qs`` to find the ``Action``.
    ``CreateQueue`` and ``DeleteQueue`` fire an event through
    ``event_publisher`` once both an event type and a queue URL are known.
    For ``CreateQueue``, ``GetQueueUrl`` and ``ListQueues`` the
    ``<QueueUrl>`` elements of the body are rewritten to ``https`` (when
    ``config.USE_SSL`` is set) and to the external hostname and port.

    Returns:
        A new ``Response`` with the patched body if the rewriting changed
        anything; ``None`` in every other case.
    """
    if method != 'POST' or path != '/':
        return None

    params = urlparse.parse_qs(to_str(data))
    action = params.get('Action', [None])[0]

    event_type = None
    queue_url = None
    if action == 'CreateQueue':
        event_type = event_publisher.EVENT_SQS_CREATE_QUEUE
        parsed_body = xmltodict.parse(response.content)
        if 'CreateQueueResponse' in parsed_body:
            queue_url = parsed_body['CreateQueueResponse']['CreateQueueResult']['QueueUrl']
    elif action == 'DeleteQueue':
        event_type = event_publisher.EVENT_SQS_DELETE_QUEUE
        queue_url = params.get('QueueUrl', [None])[0]

    if event_type and queue_url:
        event_publisher.fire_event(event_type, payload={'u': event_publisher.get_hash(queue_url)})

    # patch the response and return the correct endpoint URLs
    if action not in ('CreateQueue', 'GetQueueUrl', 'ListQueues'):
        return None

    original_body = to_str(response.content)
    patched_body = original_body

    patched = Response()
    patched.status_code = response.status_code
    # the header mapping is shared with the incoming response, not copied
    patched.headers = response.headers

    if config.USE_SSL and '<QueueUrl>http://' in patched_body:
        # return https://... if we're supposed to use SSL
        patched_body = re.sub(r'<QueueUrl>\s*http://', r'<QueueUrl>https://', patched_body)

    # expose external hostname:port
    external_port = get_external_port(headers, request_handler)
    patched_body = re.sub(r'<QueueUrl>\s*([a-z]+)://[^<]*:([0-9]+)/([^<]*)\s*</QueueUrl>',
        r'<QueueUrl>\1://%s:%s/\3</QueueUrl>' % (HOSTNAME_EXTERNAL, external_port), patched_body)
    patched._content = patched_body

    if original_body == patched._content:
        return None

    # if changes have been made, return patched response
    patched.headers['content-length'] = len(patched._content)
    return patched
# extract the external port used by the client to make the request
def kinesis_error_response(data):
    """Build a 200 response reporting a throughput error for every record.

    One ``ProvisionedThroughputExceededException`` entry is emitted per item
    in ``data['Records']``; ``FailedRecordCount`` is always reported as 1.
    The body is the JSON-encoded result.
    """
    failed_records = [
        {
            'ErrorCode': 'ProvisionedThroughputExceededException',
            'ErrorMessage': 'Rate exceeded for shard X in stream Y under account Z.'
        }
        for _ in data['Records']
    ]
    payload = {'FailedRecordCount': 1, 'Records': failed_records}

    result = Response()
    result.status_code = 200
    result._content = json.dumps(payload)
    return result
def make_response(op_name, content=''):
    """Wrap *content* in an ``<op_name>Response`` XML envelope (status 200).

    When *content* is empty, a ``<MessageId>`` element with a value from
    ``short_uid()`` is used instead.  The request id in the response
    metadata is a second ``short_uid()`` value.
    """
    result = Response()
    body = content if content else '<MessageId>%s</MessageId>' % short_uid()
    template = """<{op_name}Response xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
<{op_name}Result>
{content}
</{op_name}Result>
<ResponseMetadata><RequestId>{req_id}</RequestId></ResponseMetadata>
</{op_name}Response>"""
    result._content = template.format(op_name=op_name, content=body, req_id=short_uid())
    result.status_code = 200
    return result
def error_response(message=None, error_type=None, code=400):
    """Build a JSON error response with ``message`` and ``__type`` fields.

    Missing values default to ``'Unknown error'`` and ``'UnknownError'``.
    Error types that do not already contain ``com.amazonaws.dynamodb`` are
    prefixed with ``com.amazonaws.dynamodb.v20120810#``.
    """
    message = message or 'Unknown error'
    error_type = error_type or 'UnknownError'

    already_qualified = 'com.amazonaws.dynamodb' in error_type
    if not already_qualified:
        error_type = 'com.amazonaws.dynamodb.v20120810#%s' % error_type

    result = Response()
    result.status_code = code
    result._content = json.dumps({
        'message': message,
        '__type': error_type
    })
    return result
def send(self, request, stream=None, timeout=None, verify=None, cert=None,
         proxies=None):
    """Serve a request for a ``localhost`` URL from the local file system.

    The URL's host part is blanked out and the result converted to a path
    with ``url_to_path``.  The returned 200 response streams the file's
    bytes and carries ``Content-Type``, ``Content-Length`` and
    ``Last-Modified`` headers derived from the file.

    Raises:
        InvalidURL: if the request's host is not ``localhost``.
    """
    url_parts = urlparse.urlparse(request.url)

    # We only work for requests with a host of localhost
    host = url_parts.netloc.lower()
    if host != "localhost":
        raise InvalidURL("Invalid URL %r: Only localhost is allowed" %
                         request.url)

    # rebuild the URL with an empty network location
    local_url = urlparse.urlunparse(url_parts[:1] + ("",) + url_parts[2:])
    file_path = url_to_path(local_url)

    resp = Response()
    resp.status_code = 200
    resp.url = local_url

    file_stats = os.stat(file_path)
    last_modified = email.utils.formatdate(file_stats.st_mtime, usegmt=True)
    content_type = mimetypes.guess_type(file_path)[0] or "text/plain"
    resp.headers = CaseInsensitiveDict({
        "Content-Type": content_type,
        "Content-Length": file_stats.st_size,
        "Last-Modified": last_modified,
    })

    resp.raw = LocalFSResponse(open(file_path, "rb"))
    # closing the response closes the wrapped raw object
    resp.close = resp.raw.close
    return resp
def test_api_gateway_http_integration():
    """Check that an API gateway forwards GET and POST requests to an HTTP backend.

    Starts a ``GenericProxy`` backend whose listener returns the JSON-encoded
    request data (or ``'{}'`` when there is none), connects a gateway to it
    via ``connect_api_gateway_to_http`` and verifies the status code and
    body of a GET and a POST request sent through the gateway URL.

    Raises:
        AssertionError: if any of the responses differs from the expectation.
    """
    test_port = 12123
    backend_url = 'http://localhost:%s%s' % (test_port, API_PATH_HTTP_BACKEND)

    # create target HTTP backend
    def listener(**kwargs):
        response = Response()
        response.status_code = 200
        response._content = json.dumps(kwargs['data']) if kwargs['data'] else '{}'
        return response

    proxy = GenericProxy(test_port, update_listener=listener)
    proxy.start()
    try:
        # create API Gateway and connect it to the HTTP backend
        result = connect_api_gateway_to_http('test_gateway2', backend_url, path=API_PATH_HTTP_BACKEND)

        # make test request to gateway
        url = INBOUND_GATEWAY_URL_PATTERN.format(api_id=result['id'],
            stage_name=TEST_STAGE_NAME, path=API_PATH_HTTP_BACKEND)
        result = requests.get(url)
        assert result.status_code == 200
        assert to_str(result.content) == '{}'
        data = {"data": 123}
        result = requests.post(url, data=json.dumps(data))
        assert result.status_code == 200
        assert json.loads(to_str(result.content)) == data
    finally:
        # clean up: stop the backend even when a request or assertion fails
        proxy.stop()
def error_response(message, code=400, error_type='ValidationError'):
    """Return an XML error response for the given message.

    *error_type* is reported in the ``x-amzn-errortype`` header and in the
    ``<Code>`` element of the body; *code* is used as the status code.  The
    request id in the body is a freshly generated UUID.
    """
    template = """<ErrorResponse xmlns="%s">
<Error>
<Type>Sender</Type>
<Code>%s</Code>
<Message>%s</Message>
</Error>
<RequestId>%s</RequestId>
</ErrorResponse>"""

    reply = Response()
    reply.status_code = code
    reply.headers['x-amzn-errortype'] = error_type
    fields = (XMLNS_CLOUDFORMATION, error_type, message, uuid.uuid4())
    reply._content = template % fields
    return reply