def test_submit_problem_web(self):
    """Exercise ``submit_problem_web`` with accepted and rejected timestamps.

    Rejected submissions must raise ``requests.HTTPError`` whose response
    has status code 403; the two "correct" submissions must not raise.
    """
    common.ensure_login()
    common.ensure_api_key()

    def expect_forbidden(*timestamps):
        # The submission has to fail with an HTTP 403 response.
        with self.assertRaises(requests.HTTPError) as caught:
            self.submit_problem_web(*timestamps)
        assert caught.exception.response.status_code == 403

    # publish_time is before the first publish time
    expect_forbidden(1475279990, 1451606400)
    # Correct publish_time
    self.submit_problem_web(1475280000, 1451606400)
    self.submit_problem_web(1480550400, 1451606400)
    # publish_time is after the last publish time
    expect_forbidden(1480550410, 1451606400)
    # publish_time is past
    expect_forbidden(1475280000, 1475280001)
# Example source code for the Python HTTPError() class
def test_submit_problem_api(self):
    """Exercise ``submit_problem_api`` with accepted and rejected timestamps.

    Rejected submissions must raise ``requests.HTTPError`` whose response
    has status code 403; the two "correct" submissions must not raise.
    """
    common.ensure_login()
    common.ensure_api_key()

    def expect_forbidden(*timestamps):
        # The submission has to fail with an HTTP 403 response.
        with self.assertRaises(requests.HTTPError) as caught:
            self.submit_problem_api(*timestamps)
        assert caught.exception.response.status_code == 403

    # publish_time is before the first publish time
    expect_forbidden(1475279990, 1451606400)
    # Correct publish_time
    self.submit_problem_api(1475280000, 1451606400)
    self.submit_problem_api(1480550400, 1451606400)
    # publish_time is after the last publish time
    expect_forbidden(1480550410, 1451606400)
    # publish_time is past
    expect_forbidden(1475280000, 1475280001)
def api_delete(server_name, api, session_id):
    """Issue an HTTPS DELETE against ``server_name`` and return the response.

    Args:
        server_name: Host name (or address) the request is sent to.
        api: Path appended after the module-level ``API`` prefix.
        session_id: Value sent back to the server as the ``JSESSIONID`` cookie.

    Returns:
        The ``requests`` response object. The HTTP status is not checked here.

    Raises:
        TintriRequestsException: If the request could not be carried out.
    """
    # Header and URL for delete call
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID=' + session_id}
    url = 'https://' + server_name + API + api

    try:
        # Invoke the API. verify=False disables TLS certificate verification.
        r = requests.delete(url, headers=headers, verify=False)
    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception as exc:
        # Report the exception class by name; concatenating the class object
        # itself to a string would raise TypeError and mask the real error.
        raise TintriRequestsException(
            "An unexpected error " + type(exc).__name__ + " occurred.")

    return r
# PUT
def api_put(server_name, api, payload, session_id):
    """Issue an HTTPS PUT with a JSON body and return the response.

    Args:
        server_name: Host name (or address) the request is sent to.
        api: Path appended after the module-level ``API`` prefix.
        payload: Object serialised with ``json.dumps`` and sent as the body.
        session_id: Value sent back to the server as the ``JSESSIONID`` cookie.

    Returns:
        The ``requests`` response object. The HTTP status is not checked here.

    Raises:
        TintriRequestsException: If the request could not be carried out.
    """
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID=' + session_id}
    url = 'https://' + server_name + API + api

    try:
        # Invoke the API. verify=False disables TLS certificate verification.
        r = requests.put(url, data=json.dumps(payload),
                         headers=headers, verify=False)
    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception as exc:
        # Report the exception class by name; concatenating the class object
        # itself to a string would raise TypeError and mask the real error.
        raise TintriRequestsException(
            "An unexpected error " + type(exc).__name__ + " occurred.")

    return r
# POST
def api_post(server_name, api, payload, session_id):
    """Issue an HTTPS POST with a JSON body and return the response.

    Args:
        server_name: Host name (or address) the request is sent to.
        api: Path appended after the module-level ``API`` prefix.
        payload: Object serialised with ``json.dumps`` and sent as the body.
        session_id: Value sent back to the server as the ``JSESSIONID`` cookie.

    Returns:
        The ``requests`` response object. The HTTP status is not checked here.

    Raises:
        TintriRequestsException: If the request could not be carried out.
    """
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID=' + session_id}
    url = 'https://' + server_name + API + api

    try:
        # Invoke the API. verify=False disables TLS certificate verification.
        r = requests.post(url, data=json.dumps(payload),
                          headers=headers, verify=False)
    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception as exc:
        # Report the exception class by name; concatenating the class object
        # itself to a string would raise TypeError and mask the real error.
        raise TintriRequestsException(
            "An unexpected error " + type(exc).__name__ + " occurred.")

    return r
# Login.
def download_file(server_name, report_url, session_id, file_name):
    """Stream the resource at ``report_url`` into the local file ``file_name``.

    Args:
        server_name: Not used by this function; kept for interface
            compatibility.
        report_url: Full URL that is fetched with an HTTP GET.
        session_id: Not used by this function; kept for interface
            compatibility.
        file_name: Path of the file the downloaded bytes are written to.

    Raises:
        TintriRequestsException: On connection errors, timeouts, or any other
            exception raised while downloading or writing.
    """
    headers = {'content-type': 'application/json'}

    try:
        r = requests.get(report_url, headers=headers, verify=False, stream=True)
        # if HTTP Response is not 200 then raise an exception
        if r.status_code != 200:
            message = "The HTTP response for get call to the server is not 200."
            # NOTE(review): this is raised inside the try block, so unless
            # TintriApiException does not derive from Exception it is re-wrapped
            # by the catch-all handler below -- confirm that is intended.
            raise TintriApiException(message, r.status_code, report_url, "No Payload", r.text)

        # iter_content() yields bytes, so the file must be opened in binary
        # mode; text mode fails on Python 3 and can corrupt data on Windows.
        with open(file_name, 'wb') as file_h:
            for block in r.iter_content(4096):
                file_h.write(block)

    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception as e:
        raise TintriRequestsException("An unexpected error: " + str(e))
def login(self):
    """Post the stored credentials to ``self.home_url``.

    Returns:
        A ``(ok, detail)`` tuple. ``ok`` is True only when the decoded JSON
        reply contains the key ``'success'``; ``detail`` is then that decoded
        reply. When the request itself fails, ``ok`` is False and ``detail``
        is a short ``'error:...'`` string.
    """
    login_params = {
        'j_username': GlobalVal.username,
        'j_password': GlobalVal.password,
        'validateCode': GlobalVal.secretCode
    }
    try:
        login_result = self.__session.post(self.home_url, data=login_params,
                                           headers=self.header, timeout=6)
    # Timeout is tested first: requests' ConnectTimeout derives from both
    # Timeout and ConnectionError and should be reported as a timeout.
    except requests.Timeout:
        return False, 'error:TimeoutError.'
    # DNS failure, refused connection, etc
    except requests.ConnectionError:
        return False, 'error:ConnectionError.'
    # HTTPError
    except requests.HTTPError as err:
        # No response object was assigned when post() raised, so take the
        # status from the exception (it may carry no response at all).
        status = getattr(err.response, 'status_code', None)
        return False, 'error:HTTPError:%s.' % status
    except Exception:
        return False, 'error:network disconnect.'

    # Almost everything is ok. Decode the body once and reuse it.
    reply = login_result.json()
    if 'success' in reply:
        return True, reply
    return False, reply
def test_handle_raises_error_if_status_is_400(api_call):
    """``handle`` must let an ``HTTPError`` from the registered endpoint escape."""
    failure = HTTPError()
    responses.add(api_call.method, api_call.url, body=failure)

    with raises(HTTPError):
        reply = api_call()
        handle(reply)
# @responses.activate
# @mark.parametrize('api_call', api_calls)
# def test_call_endpoint_with_query_parameters(api_call):
# responses.add(
# api_call.method,
# api_call.url + '?spam=eggs&foo=bar',
# match_querystring=False
# )
# params = dict(foo='bar', spam='eggs')
# assert api_call(params=params).status_code == 200
def _handle_promises(self):
    """Collect all promises from S3 uploads.

    Walks ``self._streams`` and ``self._futures`` in lockstep. Each
    successful response is appended to ``self._responses``; the matching
    stream is closed whether or not its upload succeeded.

    Raises:
        Exception: Whatever exception a future finished with.
        HTTPError: If a response status code is not 200.
    """
    for stream, future in zip(self._streams, self._futures):
        try:
            exception = future.exception()
            if exception:
                raise exception

            response = future.result()
            if response.status_code != 200:
                message = 'Something went wrong uploading %s to S3: %s'
                log.error(message, response.url, response.text)
                # Interpolate the details so the exception does not carry
                # the raw '%s' template as its message.
                raise HTTPError(message % (response.url, response.text))

            self._responses.append(response)
        finally:
            # Close the stream on failure paths too, so it is not leaked.
            stream.close()
def setUp(self):
    """Replace ``logging.getLogger`` and ``requests.get`` before building
    the ``Configuration`` under test, then re-bind the exception names on
    the ``requests`` module entry.
    """
    def getLogger(name):
        # Each call creates a fresh mock and remembers it on the test case.
        self.mock_logger = mock.Mock()
        return self.mock_logger

    def get(url, headers):
        # Canned reply: ok, with a JSON payload reporting status 1.
        reply = mock.Mock()
        reply.ok = True
        reply.json = mock.Mock(return_value={'data': {'status': 1}})
        return reply

    sys.modules['logging'].getLogger = getLogger
    requests_module = sys.modules['requests']
    requests_module.get = get

    self.env = EnvironmentVarGuard()
    self.env.set('CACHET_TOKEN', 'token2')

    self.configuration = Configuration('config.yml')

    requests_module.Timeout = Timeout
    requests_module.ConnectionError = ConnectionError
    requests_module.HTTPError = HTTPError
def get_album_json_thread(self):
    """Worker loop: take album IDs from ``self.albumQueue`` and fetch each one.

    The loop ends when the queue yields nothing, when ``xbmc.abortRequested``
    is set, or when ``self.abortAlbumThreads`` is set. An HTTP 429 reply sets
    ``self.abortAlbumThreads`` and shows a notification. Any other exception
    is printed and ends the loop.
    """
    try:
        while not xbmc.abortRequested and not self.abortAlbumThreads:
            try:
                album_id = self.albumQueue.get_nowait()
            except Exception:
                # Nothing left to fetch.
                break
            try:
                self.get_album(album_id, withCache=False)
            except requests.HTTPError as e:
                r = e.response
                msg = _T(30505)
                try:
                    # Best effort: prefer the server's user message, fall back
                    # to the HTTP reason, and never replace it with None.
                    msg = r.reason
                    msg = r.json().get('userMessage') or msg
                except Exception:
                    pass
                log('Error getting Album ID %s' % album_id, xbmc.LOGERROR)
                if r.status_code == 429 and not self.abortAlbumThreads:
                    self.abortAlbumThreads = True
                    log('Too many requests. Aborting Workers ...', xbmc.LOGERROR)
                    # NOTE(review): presumably resets the queue's internal
                    # storage so other workers find it empty -- confirm.
                    self.albumQueue._init(9999)
                    xbmcgui.Dialog().notification(plugin.name, msg, xbmcgui.NOTIFICATION_ERROR)
    except Exception:
        # "except Exception, e" is Python-2-only syntax; this form is valid
        # on both Python 2.6+ and Python 3.
        traceback.print_exc()
def http_get(self, s, host, port, timeout, appendix=''):
    """GET ``http://host:port<appendix>`` through the session-like object ``s``.

    Returns:
        ``(response, None)`` on success, or ``(None, error)`` where ``error``
        is a ``RequetsHostException`` naming the class of the
        ``requests.RequestException`` that was raised. One handler covers
        every ``requests`` failure instead of one handler per subclass.
    """
    target = 'http://{}:{}{}'.format(host, port, appendix)
    try:
        reply = s.get(target, timeout=timeout, verify=False)
    except requests.RequestException as err:
        reason = '{}:{} request error, msg: {}'.format(host, port, type(err).__name__)
        return None, RequetsHostException(reason)
    return reply, None
def _connect(self):
    """Run the ARI client for ``APPLICATION_NAME`` and handle listen failures.

    A socket error with errno ``EPIPE`` is logged and ends the method; other
    socket errors, ``WebSocketException`` and ``HTTPError`` are passed to
    ``self._connection_error``. A ``ValueError`` closes the client.
    """
    logger.debug('ARI client listening...')
    try:
        with self._running():
            self.client.run(apps=[APPLICATION_NAME])
    except socket.error as sock_err:
        if e_is_broken_pipe(sock_err) if False else sock_err.errno == errno.EPIPE:
            # bug in ari-py when calling client.close(): ignore it and stop
            logger.error('Error while listening for ARI events: %s', sock_err)
            return
        self._connection_error(sock_err)
    except (WebSocketException, HTTPError) as conn_err:
        self._connection_error(conn_err)
    except ValueError:
        logger.warning('Received non-JSON message from ARI... disconnecting')
        self.client.close()
def _get_mongooseim_history(self, user_uuid, args):
    """Fetch a user's history through ``self.mongooseim_client``.

    ``args`` is read for ``participant_user_uuid``,
    ``participant_server_uuid`` (defaulting to ``self._xivo_uuid``) and
    ``limit``. When a participant user is given, only the history with that
    participant is requested.

    Raises:
        MongooseIMException: If the client raises ``HTTPError``.
        MongooseIMUnreachable: If the client raises another
            ``RequestException``.
    """
    other_user = args.get('participant_user_uuid')
    other_server = args.get('participant_server_uuid', self._xivo_uuid)
    max_items = args.get('limit')
    own_jid = self._build_jid(self._xivo_uuid, user_uuid)

    try:
        if not other_user:
            return self.mongooseim_client.get_user_history(own_jid, max_items)
        other_jid = self._build_jid(other_server, other_user)
        return self.mongooseim_client.get_user_history_with_participant(
            own_jid, other_jid, max_items)
    except HTTPError as http_err:
        raise MongooseIMException(self._xivo_uuid,
                                  http_err.response.status_code,
                                  http_err.response.reason)
    except RequestException as req_err:
        raise MongooseIMUnreachable(self._xivo_uuid, req_err)
def send_message(self, request_body, user_uuid=None):
    """Send a chat message through ``self.mongooseim_client`` and publish a
    ``ChatMessageSent`` event on the bus.

    ``request_body`` must provide the keys ``alias`` and ``msg``; sender and
    recipient are resolved by ``self._build_from`` and ``self._build_to``.

    Raises:
        MongooseIMException: If the client raises ``HTTPError``.
        MongooseIMUnreachable: If the client raises another
            ``RequestException``.
    """
    sender = self._build_from(request_body, user_uuid)
    recipient = self._build_to(request_body)
    from_xivo_uuid, from_ = sender
    to_xivo_uuid, to = recipient
    alias = request_body['alias']
    text = request_body['msg']

    self.contexts.add(from_, to, to_xivo_uuid=to_xivo_uuid, alias=alias)

    sender_jid = self._build_jid(from_xivo_uuid, from_)
    recipient_jid = self._build_jid(to_xivo_uuid, to)
    try:
        self.mongooseim_client.send_message(sender_jid, recipient_jid, text)
    except HTTPError as http_err:
        raise MongooseIMException(self._xivo_uuid,
                                  http_err.response.status_code,
                                  http_err.response.reason)
    except RequestException as req_err:
        raise MongooseIMUnreachable(self._xivo_uuid, req_err)

    event = ChatMessageSent((from_xivo_uuid, from_), (to_xivo_uuid, to), alias, text)
    routing = {'user_uuid:{uuid}'.format(uuid=from_): True}
    self._bus_publisher.publish(event, headers=routing)
def raise_for_error(response):
    """Turn a failed HTTP response into a library-specific exception.

    Does nothing when ``response.raise_for_status()`` succeeds, and also
    returns silently when a failed response has an empty body.

    Raises:
        Exception: The class returned by ``get_exception_for_error_code``
            for the payload's ``status`` value, when the JSON body contains
            ``error`` or ``errorCode``.
        LinkedInError: When the body is not usable JSON or lacks those keys.
    """
    try:
        response.raise_for_status()
    except (requests.HTTPError, requests.ConnectionError) as error:
        # Exceptions have no ``.message`` attribute on Python 3, so use
        # str(error), which works on Python 2 as well.
        error_text = str(error)
        try:
            if len(response.content) == 0:
                # There is nothing we can do here since LinkedIn has neither sent
                # us a 2xx response nor a response content.
                return
            payload = response.json()
            if ('error' in payload) or ('errorCode' in payload):
                message = '%s: %s' % (payload.get('error', error_text),
                                      payload.get('message', 'Unknown Error'))
                error_code = payload.get('status')
                ex = get_exception_for_error_code(error_code)
                raise ex(message)
            else:
                raise LinkedInError(error_text)
        except (ValueError, TypeError):
            raise LinkedInError(error_text)
def _call(self, verb, url, data=None, params=None, headers=None):
    """Send a request through ``self.session`` and report the outcome.

    Args:
        verb: HTTP method name passed to ``session.request``.
        url: Target URL.
        data: Object sent as the JSON body; defaults to an empty dict.
        params: Query-string parameters; defaults to an empty dict.
        headers: Extra headers. They are merged into the session's headers
            and therefore persist for later calls on the same session.

    Returns:
        ``(response, status_code)`` when a response was received, including
        HTTP error statuses. ``(False, None)`` when the request failed before
        any response was available (timeout or other transport error).
    """
    # None defaults avoid sharing one mutable dict between calls.
    data = {} if data is None else data
    params = {} if params is None else params

    if headers:
        self.session.headers.update(headers)

    log.info('Call %s with data %s', url, data)
    status_code = None
    try:
        # The request is issued inside the try block so that the Timeout and
        # RequestException handlers below can actually be reached.
        resp = self.session.request(verb, url, json=data, params=params)
        status_code = resp.status_code
        resp.raise_for_status()
    except requests.HTTPError:
        # NOTE(review): this writes the API key to the debug log -- confirm
        # that is acceptable.
        log.debug('Error occured, endpoint : %s, apikey : %s',
                  url, self.apikey)
        return resp, status_code
    except requests.Timeout:
        log.error('Request Timeout to %s', url)
        return False, status_code
    except requests.RequestException:
        log.error('Requests Error')
        return False, status_code
    else:
        return resp, status_code
def test_detail_inexsitent_workspace(self):
    """
    Workspace Endpoint raises HTTPError when mocking inexistent workspace
    detail
    """
    msg = ('404 Client Error: Not Found for url: '
           'https://connection.keboola.com/v2/storage/workspaces/1')
    not_found = responses.Response(
        method='GET',
        url='https://connection.keboola.com/v2/storage/workspaces/1',
        body=HTTPError(msg)
    )
    responses.add(not_found)

    workspace_id = '1'
    with self.assertRaises(HTTPError) as error_context:
        self.ws.detail(workspace_id)

    # The error must carry exactly the registered message.
    raised = error_context.exception
    assert raised.args[0] == msg
def test_reset_password_for_inexistent_workspace(self):
    """
    Workspace endpoint raises HTTPError when mock resetting password for
    inexistent workspace
    """
    msg = ('404 Client Error: Not Found for url: '
           'https://connection.keboola.com/v2/storage/workspaces/1/'
           'password')
    not_found = responses.Response(
        method='POST',
        url=('https://connection.keboola.com/v2/storage/workspaces/'
             '1/password'),
        body=HTTPError(msg)
    )
    responses.add(not_found)

    workspace_id = '1'
    with self.assertRaises(HTTPError) as error_context:
        self.ws.reset_password(workspace_id)

    # The error must carry exactly the registered message.
    raised = error_context.exception
    assert raised.args[0] == msg