def mock_nsx_clustered_api(self, session_response=None, **kwargs):
    """Build a ``MockNSXClusteredAPI`` with session creation stubbed out.

    ``nsx_cluster.TimeoutSession.request`` is replaced while the cluster
    object is being constructed. Calls whose third positional argument ends
    with ``api/session/create`` receive a canned 200 response carrying a
    ``JSESSIONID`` cookie; every other call goes to the original method.

    :param session_response: forwarded to ``MockNSXClusteredAPI``.
    :param kwargs: extra keyword arguments forwarded to ``MockNSXClusteredAPI``.
    :return: the constructed cluster object.
    """
    real_request = nsx_cluster.TimeoutSession.request

    def fake_request(*call_args, **call_kwargs):
        # presumably call_args is (session, method, url, ...) - TODO confirm
        target = call_args[2]
        if not target.endswith('api/session/create'):
            return real_request(*call_args, **call_kwargs)
        canned = models.Response()
        canned.status_code = 200
        canned.headers = {
            'Set-Cookie': 'JSESSIONID=%s;junk' % JSESSIONID}
        return canned

    request_patch = mock.patch.object(
        nsx_cluster.TimeoutSession, 'request', new=fake_request)
    with request_patch:
        return NsxClientTestCase.MockNSXClusteredAPI(
            session_response=session_response, **kwargs)
python类Response()的实例源码
def call_endpoint(self, method: str, endpoint: str, *args: List[Any], **kwargs: Dict[Any, Any]) -> Response:
    """Send a request to an API endpoint using the named HTTP method.

    :param method: REST API method name, case-insensitive; it must appear
        (upper-cased) in ``REST_API_METHODS``.
    :param endpoint: endpoint joined onto ``self.host_name`` with ``urljoin``.
    :param args: extra positional arguments passed to the ``requests`` call.
    :param kwargs: extra keyword arguments passed to the ``requests`` call.
    :return: the response returned by ``requests``.
    :raises TypeError: if ``method`` is not one of ``REST_API_METHODS``.
    """
    logging.info('Calling method %s for endpoint %s', method, endpoint)

    if method.upper() in REST_API_METHODS:
        # requests exposes one module-level function per lower-case verb.
        sender = getattr(requests, method.lower())
        target = urljoin(self.host_name, endpoint)
        return sender(target, *args, **kwargs)

    allowed = ','.join(REST_API_METHODS)
    logging.error('Method %s does not match REST API METHODS: %s', method, allowed)
    raise TypeError('Method {0} does not match REST API METHODS: {1}'.format(str(method), allowed))
def expected_error_response(self, response):
    """Check that ``response`` is the error this test step expects.

    A ``requests`` ``Response`` is first converted into an ``ErrorResponse``
    by decoding its body as UTF-8 and parsing it as JSON.

    :param response: a ``Response`` or an ``ErrorResponse``.
    :return: the (possibly converted) error response.
    :raises Break: if no error response was received, if its ``"error"``
        value is not in ``self.expect_error["error"]``, or if
        ``self.expect_error["stop"]`` is set and truthy.
    """
    if isinstance(response, Response):  # requests response
        # Decode first: we don't want bytes.
        body_text = response.content.decode('utf8')
        response = ErrorResponse().from_json(body_text)

    # Guard clause: anything that is not an error response is a failure.
    if not isinstance(response, ErrorResponse):
        self.conv.events.store(EV_FAULT, "Expected error, didn't get it")
        raise Break("Did not receive expected error")

    self.conv.events.store(EV_PROTOCOL_RESPONSE, response,
                           sender=self.__class__.__name__)

    received = response["error"]
    accepted = self.expect_error["error"]
    if received not in accepted:
        raise Break("Wrong error, got {} expected {}".format(
            received, accepted))

    # "stop" is optional in the expectation; a missing key means "carry on".
    try:
        stop_requested = self.expect_error["stop"]
    except KeyError:
        stop_requested = False
    if stop_requested:
        raise Break("Stop requested after received expected error")

    return response
def get_sess_create_resp():
    """Return a canned 200 response whose headers set a ``JSESSIONID`` cookie."""
    canned = models.Response()
    canned.headers = {'Set-Cookie': 'JSESSIONID=abc;'}
    canned.status_code = 200
    return canned
def setUp(self):
    """Create the listener, a response with cookies and a request context.

    The response carries ``username`` and ``sessionid`` cookies, while the
    context already has a ``Cookie`` header with a different ``sessionid``.
    """
    self.listener = ExecutionListener({})

    canned = Response()
    canned.cookies.update({
        'username': 'john',
        'sessionid': 'abcd'
    })
    self.response = canned

    ctx = Context('http://localhost')
    ctx.headers['Cookie'] = 'name="John Doe"; sessionid=xyz'
    self.context = ctx
def forward_request(self, method, path, data, headers):
    """Interceptor hook invoked by the proxy for every incoming request.

    It runs *before* the request is forwarded to the backend service and
    receives the details of the incoming request. The return value tells
    the proxy what to do next:

    * ``True`` - forward the request to the backend service unchanged
      (this default implementation always returns ``True``).
    * an integer such as ``200`` - reply to the client with that status
      code without calling the backend service.
    * a ``requests.models.Response`` instance - reply to the client with
      it without calling the backend service.
    * a ``requests.models.Request`` instance - forward this new/modified
      request to the backend service instead.
    * any other value - a 503 Bad Gateway is returned to the client without
      calling the backend service.
      NOTE(review): 503 is normally "Service Unavailable"; confirm which
      status the proxy really sends.
    """
    forward_unchanged = True
    return forward_unchanged
def test_run_test_bad_method(self, capsys):
    """``run_test`` raises for an endpoint configured with method ``FOO``."""
    conf = Mock()
    conf.get.return_value = {'ep1': {'method': 'FOO'}}
    args = Mock(endpoint_name='ep1')

    # Attributes are set on the mock's own type so they work despite spec_set.
    fake_resp = Mock(spec_set=Response)
    fake_resp_type = type(fake_resp)
    fake_resp_type.status_code = 200
    fake_resp_type.content = 'res1content'
    fake_resp_type.headers = {'h1': 'h1val', 'hz': 'hzval'}

    module_patches = patch.multiple(
        pbm,
        autospec=True,
        requests=DEFAULT,
        logger=DEFAULT,
        get_base_url=DEFAULT,
        node=DEFAULT
    )
    with module_patches as mocks:
        mocks['get_base_url'].return_value = 'mybase/'
        mocks['node'].return_value = 'mynode'
        mocks['requests'].get.return_value = fake_resp
        with pytest.raises(Exception) as excinfo:
            run_test(conf, args)
        # Checked after the raises block so the assertion actually runs.
        assert exc_msg(excinfo.value) == 'Unimplemented method: FOO'
def _response_to_json(resp: Response) -> bytes:
    """Serialize the interesting parts of a ``requests`` response to JSON bytes.

    Only the first 50 KiB of the body is kept: core dumps can become very
    large, and we do not want to store more potentially sensitive data than
    is needed to decide whether there is a leak. Undecodable bytes in the
    body are replaced rather than raising.

    :param resp: response whose ``content``, ``status_code``, ``headers``
        and ``url`` are recorded.
    :return: the encoded JSON document.
    """
    max_body_bytes = 50 * 1024
    body_head = resp.content[:max_body_bytes].decode(errors='replace')
    snapshot = {
        'text': body_head,
        'status_code': resp.status_code,
        'headers': dict(resp.headers),
        'url': resp.url,
    }
    return json.dumps(snapshot).encode()
def validate_response(called_method: Callable) -> Callable:
    """Decorator for HTTPClient that validates the response of a REST call.

    The decorated callable is invoked as-is; if the response it returns has
    a status code other than 200, the failure is logged and ``HTTPError`` is
    raised. The wrapper keeps the decorated callable's name and docstring.

    :param called_method: callable performing the request - POST, GET, PUT,
        PATCH, DELETE.
    :return: a wrapper that returns the response from the server.
    """
    # Local import so the block stays self-contained.
    import functools

    @functools.wraps(called_method)
    def wrapper(method: Callable, endpoint: str, *args: List[Any], **kwargs: Dict[Any, Any]) -> Response:
        """Execute the decorated function and check ``status_code``."""
        resp = called_method(method, endpoint, *args, **kwargs)
        if resp.status_code != 200:
            logging.error('HTTP Client returned status code %s for url %s', resp.status_code, resp.url)
            raise HTTPError('HTTP Client returned status code {0} for url {1}'.format(str(resp.status_code), resp.url))
        return resp

    return wrapper
def to_json(response: Response) -> dict:
    """Return the decoded JSON body of a ``requests`` response.

    :param response: response from requests
    :return: whatever ``response.json()`` produces
    """
    decoded = response.json()
    return decoded
def setUpDS(self):
    """Replace ``SESSION.request`` with a fake document-service upload.

    The registry's ``docservice_url`` is pointed at ``http://localhost`` and
    the original ``SESSION.request`` is saved in ``self._srequest``. The
    fake answers ``POST`` requests whose URL contains ``/upload`` with a 200
    response describing an uploaded document; any other request gets an
    untouched, empty ``Response``.
    """
    self.app.app.registry.docservice_url = 'http://localhost'
    testcase = self
    body_template = '{{"data":{{"url":"{url}","hash":"md5:{md5}","format":"application/msword","title":"name.doc"}},"get_url":"{url}"}}'

    def request(method, url, **kwargs):
        response = Response()
        is_upload = method == 'POST' and '/upload' in url
        if not is_upload:
            return response
        doc_url = testcase.generate_docservice_url()
        response.status_code = 200
        response.encoding = 'application/json'
        response._content = body_template.format(url=doc_url, md5='0'*32)
        response.reason = '200 OK'
        return response

    # Keep the real method around; presumably restored in tearDown - TODO confirm.
    self._srequest = SESSION.request
    SESSION.request = request
def setUpBadDS(self):
    """Replace ``SESSION.request`` with a fake that always answers 403.

    The registry's ``docservice_url`` is pointed at ``http://localhost`` and
    the original ``SESSION.request`` is saved in ``self._srequest``.
    """
    self.app.app.registry.docservice_url = 'http://localhost'

    def request(method, url, **kwargs):
        denied = Response()
        denied.status_code = 403
        denied.reason = '403 Forbidden'
        denied.encoding = 'application/json'
        denied._content = '"Unauthorized: upload_view failed permission check"'
        return denied

    original_request = SESSION.request
    self._srequest = original_request
    SESSION.request = request
def do_request(self, req, status=None, expect_errors=None):
    """Perform the request, dumping request and response to ``self.file_obj``.

    The ``HTTP_HOST`` of the request is forced to ``self.hostname``. When
    the instance has an open ``file_obj``, the raw request, its body
    pretty-printed as JSON, the response status and headers (without
    ``Content-Length``, sorted) and the pretty-printed response body are
    written to it. Pretty-printing is best effort: a body that cannot be
    dumped is skipped.

    :param req: the request to send.
    :param status: forwarded to the parent ``do_request``.
    :param expect_errors: forwarded to the parent ``do_request``.
    :return: the response produced by the parent ``do_request``.
    """
    req.headers.environ["HTTP_HOST"] = self.hostname
    if hasattr(self, 'file_obj') and not self.file_obj.closed:
        self.file_obj.write(req.as_bytes(True))
        self.file_obj.write("\n")
        if req.body:
            try:
                self.file_obj.write(
                    'DATA:\n' + json.dumps(json.loads(req.body), indent=2, ensure_ascii=False).encode('utf8'))
                self.file_obj.write("\n")
            except Exception:
                # Best-effort dump: skip bodies that cannot be pretty-printed,
                # but let KeyboardInterrupt/SystemExit propagate.
                pass
        self.file_obj.write("\n")
    resp = super(DumpsTestAppwebtest, self).do_request(req, status=status, expect_errors=expect_errors)
    if hasattr(self, 'file_obj') and not self.file_obj.closed:
        # Content-Length is left out of the dump.
        headers = [(n.title(), v)
                   for n, v in resp.headerlist
                   if n.lower() != 'content-length']
        headers.sort()
        self.file_obj.write(str('Response: %s\n%s\n') % (
            resp.status,
            str('\n').join([str('%s: %s') % (n, v) for n, v in headers]),
        ))
        if resp.testbody:
            try:
                self.file_obj.write(json.dumps(json.loads(resp.testbody), indent=2, ensure_ascii=False).encode('utf8'))
            except Exception:
                # Best-effort dump, same as for the request body above.
                pass
        self.file_obj.write("\n\n")
    return resp
def test_from_response_404(self):
    """A 404 about a missing archive policy rule maps to its own exception."""
    payload = {"description": "Archive policy rule foobar does not exist"}

    resp = models.Response()
    resp.status_code = 404
    resp.headers['Content-Type'] = "application/json"
    resp._content = json.dumps(payload).encode('utf-8')

    self.assertIsInstance(exceptions.from_response(resp),
                          exceptions.ArchivePolicyRuleNotFound)
def test_resource_type_before_resource(self):
    """A 404 about a missing resource type maps to ``ResourceTypeNotFound``."""
    payload = {"description": "Resource type foobar does not exist"}

    resp = models.Response()
    resp.status_code = 404
    resp.headers['Content-Type'] = "application/json"
    resp._content = json.dumps(payload).encode('utf-8')

    self.assertIsInstance(exceptions.from_response(resp),
                          exceptions.ResourceTypeNotFound)
def test_from_response_keystone_401(self):
    """A 401 with an ``error`` object maps to ``Unauthorized`` with its message."""
    payload = {"error": {
        "message": "The request you have made requires authentication.",
        "code": 401, "title": "Unauthorized"}}

    resp = models.Response()
    resp.status_code = 401
    resp.headers['Content-Type'] = "application/json"
    resp._content = json.dumps(payload).encode('utf-8')

    exc = exceptions.from_response(resp)
    self.assertIsInstance(exc, exceptions.Unauthorized)
    self.assertEqual("The request you have made requires authentication.",
                     exc.message)
def test_from_response_unknown_middleware(self):
    """A 400 with an unrecognized JSON body maps to ``ClientException``.

    The exception message is expected to be the JSON body itself.
    """
    payload = {"unknown": "random message"}

    resp = models.Response()
    resp.status_code = 400
    resp.headers['Content-Type'] = "application/json"
    resp._content = json.dumps(payload).encode('utf-8')

    exc = exceptions.from_response(resp)
    self.assertIsInstance(exc, exceptions.ClientException)
    self.assertEqual('{"unknown": "random message"}', exc.message)
def post_comment(post_payload, assert_success: bool=True) -> Response:
    """POST ``post_payload`` as JSON to ``COMMENT_SIDECAR_URL``.

    :param post_payload: object sent as the JSON body.
    :param assert_success: when true, assert that the status code is 201.
    :return: the response from the server.
    """
    response = requests.post(url=COMMENT_SIDECAR_URL, json=post_payload)
    if not assert_success:
        return response

    failure_note = "Comment creation failed. Message: " + response.text
    assert_that(response.status_code).described_as(failure_note).is_equal_to(201)
    return response
def get_comments(site: str = DEFAULT_SITE, path: str = DEFAULT_PATH, assert_success: bool=True) -> Response:
    """GET the comments for ``site`` and ``path`` from ``COMMENT_SIDECAR_URL``.

    ``site`` and ``path`` are inserted into the query string as given.

    :param site: value of the ``site`` query parameter.
    :param path: value of the ``path`` query parameter.
    :param assert_success: when true, assert that the status code is 200.
    :return: the response from the server.
    """
    query_url = "{}?site={}&path={}".format(COMMENT_SIDECAR_URL, site, path)
    response = requests.get(query_url)
    if not assert_success:
        return response

    failure_note = "Getting comments failed. Message: " + response.text
    assert_that(response.status_code).described_as(failure_note).is_equal_to(200)
    return response
def send(self, request, stream=None, timeout=None, verify=None, cert=None,
         proxies=None):
    """Answer a ``localhost`` URL with the contents of a local file.

    The URL is rebuilt with an empty network location, converted to a
    filesystem path with ``url_to_path`` and served as a 200 response with
    ``Content-Type``, ``Content-Length`` and ``Last-Modified`` headers taken
    from the file. ``stream``, ``timeout``, ``verify``, ``cert`` and
    ``proxies`` are accepted but not used here.

    :param request: request whose ``url`` names the file.
    :return: a ``Response`` whose ``raw`` wraps the opened file.
    :raises InvalidURL: if the URL's host is not ``localhost``.
    """
    url_parts = urlparse.urlparse(request.url)

    # We only work for requests with a host of localhost
    host = url_parts.netloc.lower()
    if host != "localhost":
        raise InvalidURL("Invalid URL %r: Only localhost is allowed" %
                         request.url)

    # Rebuild the URL with the network location blanked out.
    hostless_parts = url_parts[:1] + ("",) + url_parts[2:]
    real_url = urlparse.urlunparse(hostless_parts)
    pathname = url_to_path(real_url)

    resp = Response()
    resp.status_code = 200
    resp.url = real_url

    stats = os.stat(pathname)
    last_modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
    content_type = mimetypes.guess_type(pathname)[0] or "text/plain"
    resp.headers = CaseInsensitiveDict({
        "Content-Type": content_type,
        "Content-Length": stats.st_size,
        "Last-Modified": last_modified,
    })

    # Closing the response closes the wrapped file object.
    body = LocalFSResponse(open(pathname, "rb"))
    resp.raw = body
    resp.close = body.close
    return resp
def send(self, request, stream=None, timeout=None, verify=None, cert=None,
         proxies=None):
    """Answer a ``localhost`` URL with the contents of a local file.

    The URL is rebuilt with an empty network location, converted to a
    filesystem path with ``url_to_path`` and served as a 200 response with
    ``Content-Type``, ``Content-Length`` and ``Last-Modified`` headers taken
    from the file. ``stream``, ``timeout``, ``verify``, ``cert`` and
    ``proxies`` are accepted but not used here.

    :param request: request whose ``url`` names the file.
    :return: a ``Response`` whose ``raw`` wraps the opened file.
    :raises InvalidURL: if the URL's host is not ``localhost``.
    """
    url_parts = urlparse.urlparse(request.url)

    # We only work for requests with a host of localhost
    host = url_parts.netloc.lower()
    if host != "localhost":
        raise InvalidURL("Invalid URL %r: Only localhost is allowed" %
                         request.url)

    # Rebuild the URL with the network location blanked out.
    hostless_parts = url_parts[:1] + ("",) + url_parts[2:]
    real_url = urlparse.urlunparse(hostless_parts)
    pathname = url_to_path(real_url)

    resp = Response()
    resp.status_code = 200
    resp.url = real_url

    stats = os.stat(pathname)
    last_modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
    content_type = mimetypes.guess_type(pathname)[0] or "text/plain"
    resp.headers = CaseInsensitiveDict({
        "Content-Type": content_type,
        "Content-Length": stats.st_size,
        "Last-Modified": last_modified,
    })

    # Closing the response closes the wrapped file object.
    body = LocalFSResponse(open(pathname, "rb"))
    resp.raw = body
    resp.close = body.close
    return resp
def _make_response(self, data, status_code=200):
    """Build a ``requests`` response whose body is ``data`` encoded as JSON.

    :param data: JSON-serializable object used as the body.
    :param status_code: status code to set on the response.
    :return: the prepared ``Response``.
    """
    from requests.models import Response

    encoded_body = json.dumps(data).encode('utf-8')
    fake = Response()
    fake.status_code = status_code
    fake._content = encoded_body
    return fake
def generate_fake_error_response(msg, status_code=401, encoding='utf-8'):
    """Build a fake ``Response`` carrying ``msg`` as an already-read body.

    The body is encoded with ``encoding`` so that the stored bytes agree
    with the encoding declared on the response.

    :param msg: text used as the response body.
    :param status_code: status code to set on the response.
    :param encoding: encoding declared on the response and used for the
        body; a falsy value falls back to the interpreter default.
    :return: the prepared ``Response``, marked as fully consumed.
    """
    # Encode with the declared charset; the default stays 'utf-8'.
    payload = msg.encode(encoding) if encoding else msg.encode()
    r = Response()
    r.status_code = status_code
    r.encoding = encoding
    r.raw = RequestsStringIO(payload)
    # Mark the body as consumed so nothing tries to re-read the raw stream.
    r._content_consumed = True
    r._content = r.raw.read()
    return r
# Use mock decorators when generating documentation, so all function signatures
# are displayed correctly
def mock_response(json_payload):
    """Return a mock callable that yields a canned 200 response.

    :param json_payload: value returned by the response's ``json()``.
    :return: a ``MagicMock`` whose call result is the prepared ``Response``.
    """
    canned = Response()
    canned.status_code = 200
    # json() is itself mocked, so no real body has to be set.
    canned.json = MagicMock(return_value=json_payload)
    requester = MagicMock(return_value=canned)
    return requester
# Fake API output.
def parser(response):
    """
    Turn a response into a result dict.

    :param response: object exposing ``url`` and a callable ``doc`` selector
    :return: dict with the response ``url`` and the text of its ``title``
    """
    page_url = response.url
    page_title = response.doc('title').text()
    return {"url": page_url, "title": page_title}
def start_downloader(self, url, args):
"""
Start Downloader
"""
resp = Response()
if url.find(u'?args=') > -1:
real_url, search_word = url.split('?args=')
search_word = unicode(unquote(search_word))
print 'url: ', real_url
print 'search_word: ', search_word
c = IndustryAndCommerceGeetestCrack(
url=real_url,
search_text=search_word,
input_id="content",
search_element_id="search",
gt_element_class_name="gt_box",
gt_slider_knob_name="gt_slider_knob",
result_numbers_xpath='/html/body/div[1]/div[6]/div[1]/span',
result_list_verify_class='clickStyle')
result, cookies = c.crack()
current_url = real_url
body = result.encode('utf-8') if result else u'<html>??????</html>'.encode('utf-8')
# resp.status_code = 200
resp._content = body
resp.url = real_url
resp.doc = PyQuery(body)
return resp
else:
resp = self.download(url, args=args)
return resp
def send(self, request, stream=None, timeout=None, verify=None, cert=None,
         proxies=None):
    """Answer a ``localhost`` URL with the contents of a local file.

    The URL is rebuilt with an empty network location, converted to a
    filesystem path with ``url_to_path`` and served as a 200 response with
    ``Content-Type``, ``Content-Length`` and ``Last-Modified`` headers taken
    from the file. ``stream``, ``timeout``, ``verify``, ``cert`` and
    ``proxies`` are accepted but not used here.

    :param request: request whose ``url`` names the file.
    :return: a ``Response`` whose ``raw`` wraps the opened file.
    :raises InvalidURL: if the URL's host is not ``localhost``.
    """
    url_parts = urlparse.urlparse(request.url)

    # We only work for requests with a host of localhost
    host = url_parts.netloc.lower()
    if host != "localhost":
        raise InvalidURL("Invalid URL %r: Only localhost is allowed" %
                         request.url)

    # Rebuild the URL with the network location blanked out.
    hostless_parts = url_parts[:1] + ("",) + url_parts[2:]
    real_url = urlparse.urlunparse(hostless_parts)
    pathname = url_to_path(real_url)

    resp = Response()
    resp.status_code = 200
    resp.url = real_url

    stats = os.stat(pathname)
    last_modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
    content_type = mimetypes.guess_type(pathname)[0] or "text/plain"
    resp.headers = CaseInsensitiveDict({
        "Content-Type": content_type,
        "Content-Length": stats.st_size,
        "Last-Modified": last_modified,
    })

    # Closing the response closes the wrapped file object.
    body = LocalFSResponse(open(pathname, "rb"))
    resp.raw = body
    resp.close = body.close
    return resp
def send(self, request, stream=None, timeout=None, verify=None, cert=None,
         proxies=None):
    """Answer a ``localhost`` URL with the contents of a local file.

    The URL is rebuilt with an empty network location, converted to a
    filesystem path with ``url_to_path`` and served as a 200 response with
    ``Content-Type``, ``Content-Length`` and ``Last-Modified`` headers taken
    from the file. ``stream``, ``timeout``, ``verify``, ``cert`` and
    ``proxies`` are accepted but not used here.

    :param request: request whose ``url`` names the file.
    :return: a ``Response`` whose ``raw`` wraps the opened file.
    :raises InvalidURL: if the URL's host is not ``localhost``.
    """
    url_parts = urlparse.urlparse(request.url)

    # We only work for requests with a host of localhost
    host = url_parts.netloc.lower()
    if host != "localhost":
        raise InvalidURL("Invalid URL %r: Only localhost is allowed" %
                         request.url)

    # Rebuild the URL with the network location blanked out.
    hostless_parts = url_parts[:1] + ("",) + url_parts[2:]
    real_url = urlparse.urlunparse(hostless_parts)
    pathname = url_to_path(real_url)

    resp = Response()
    resp.status_code = 200
    resp.url = real_url

    stats = os.stat(pathname)
    last_modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
    content_type = mimetypes.guess_type(pathname)[0] or "text/plain"
    resp.headers = CaseInsensitiveDict({
        "Content-Type": content_type,
        "Content-Length": stats.st_size,
        "Last-Modified": last_modified,
    })

    # Closing the response closes the wrapped file object.
    body = LocalFSResponse(open(pathname, "rb"))
    resp.raw = body
    resp.close = body.close
    return resp
def generate_fake_error_response(msg, status_code=401, encoding='utf-8'):
    """Build a fake ``Response`` carrying ``msg`` as an already-read body.

    The body is encoded with ``encoding`` so that the stored bytes agree
    with the encoding declared on the response.

    :param msg: text used as the response body.
    :param status_code: status code to set on the response.
    :param encoding: encoding declared on the response and used for the
        body; a falsy value falls back to the interpreter default.
    :return: the prepared ``Response``, marked as fully consumed.
    """
    # Encode with the declared charset; the default stays 'utf-8'.
    payload = msg.encode(encoding) if encoding else msg.encode()
    r = Response()
    r.status_code = status_code
    r.encoding = encoding
    r.raw = RequestsStringIO(payload)
    # Mark the body as consumed so nothing tries to re-read the raw stream.
    r._content_consumed = True
    r._content = r.raw.read()
    return r
# Use mock decorators when generating documentation, so all function signatures
# are displayed correctly
def __str__(self):
    """Describe the wrapped response in a short, human-readable form.

    A ``Response_name`` instance that evaluates true is shown with its
    status code, URL and body size (bytes divided by 1000); one that
    evaluates false is reported as failed. Anything else is labelled as a
    Selenium response. The last two forms show the request URL.
    """
    response = self.m_response
    if not isinstance(response, Response_name):
        return "<Selenium Response: %s>" % self.request.url
    if not response:
        return "<Response failed: %s>" % self.request.url

    size_kb = float(len(response.content)) / 1000
    return "<Response [%s] [%s] [%.2f KB]>" % (
        response.status_code, response.url, size_kb)