def get_urlopen_mock(body=DEFAULT_BODY_CONTENT, headers=None, status=200):
mockHttpResponse = Mock(name='httplib.HTTPResponse')
headers = urllib3.response.HTTPHeaderDict(headers)
if not hasattr(body, 'read'):
body = BytesIO(body)
else:
body.seek(0)
urllib3_response = urllib3.HTTPResponse(body,
headers,
status,
preload_content=False,
original_response=mockHttpResponse)
return MagicMock(return_value=urllib3_response)
python类HTTPResponse()的实例源码
def __init__(self, resp):
super(RESTResponse, self).__init__()
# arg: A urllib3.HTTPResponse object
self.urllib3_response = resp
self.status = resp.status
self.version = resp.version
self.reason = resp.reason
self.strict = resp.strict
self.is_closed = False
def closed(self):
return self.is_closed
# ---------------------------------
# Backwards compat for HTTPResponse
# ---------------------------------
def _provide_ssl_auth_required(self):
"""
Provide ssl auth response
:return: urllib3.HTTPResponse
"""
response = HTTPResponse()
response.status = self.DEFAULT_SSL_CERT_REQUIRED_STATUSES
response.__setattr__('_body', ' ')
return response
def __init__(self, resp):
# arg: A urllib3.HTTPResponse object
self.urllib3_response = resp
self.status = resp.status
self.version = resp.version
self.reason = resp.reason
self.strict = resp.strict
self.is_closed = False
def closed(self):
return self.is_closed
# ---------------------------------
# Backwards compat for HTTPResponse
# ---------------------------------
def __init__(self, resp):
# arg: A urllib3.HTTPResponse object
self.urllib3_response = resp
self.status = resp.status
self.version = resp.version
self.reason = resp.reason
self.strict = resp.strict
self.is_closed = False
def closed(self):
return self.is_closed
# ---------------------------------
# Backwards compat for HTTPResponse
# ---------------------------------
def request(self, url):
"""
Client request SSL
:param str url: request uri
:return: urllib3.HTTPResponse
"""
if self._HTTP_DBG_LEVEL <= self.__debug.level:
self.__debug.debug_request(self._headers, url, self.__cfg.method)
try:
disable_warnings(InsecureRequestWarning)
if self.__cfg.DEFAULT_SCAN == self.__cfg.scan: # directories requests
response = self.__pool.request(self.__cfg.method,
helper.parse_url(url).path,
headers=self._headers,
retries=self.__cfg.retries,
assert_same_host=False,
redirect=False)
self.cookies_middleware(is_accept=self.__cfg.accept_cookies, response=response)
else: # subdomains
response = PoolManager().request(self.__cfg.method, url,
headers=self._headers,
retries=self.__cfg.retries,
assert_same_host=False,
redirect=False)
return response
except MaxRetryError:
if self.__cfg.DEFAULT_SCAN == self.__cfg.scan:
self.__tpl.warning(key='max_retry_error', url=helper.parse_url(url).path)
except HostChangedError as error:
self.__tpl.warning(key='host_changed_error', details=error)
pass
except ReadTimeoutError:
self.__tpl.warning(key='read_timeout_error', url=url)
pass
except ConnectTimeoutError:
self.__tpl.warning(key='connection_timeout_error', url=url)
pass
except SSLError:
if self.__cfg.DEFAULT_SCAN != self.__cfg.scan:
return self._provide_ssl_auth_required()