python类HTTPError()的实例源码

submit_test.py 文件源码 项目:icfpc2016-judge 作者: icfpc2016 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_submit_problem_web(self):
        common.ensure_login()
        common.ensure_api_key()
        # publish_time is before the first publish time
        with self.assertRaises(requests.HTTPError) as cm:
            self.submit_problem_web(1475279990, 1451606400)
        assert cm.exception.response.status_code == 403
        # Correct publish_time
        self.submit_problem_web(1475280000, 1451606400)
        self.submit_problem_web(1480550400, 1451606400)
        # publish_time is after the last publish time
        with self.assertRaises(requests.HTTPError) as cm:
            self.submit_problem_web(1480550410, 1451606400)
        assert cm.exception.response.status_code == 403
        # publish_time is past
        with self.assertRaises(requests.HTTPError) as cm:
            self.submit_problem_web(1475280000, 1475280001)
        assert cm.exception.response.status_code == 403
submit_test.py 文件源码 项目:icfpc2016-judge 作者: icfpc2016 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_submit_problem_api(self):
        common.ensure_login()
        common.ensure_api_key()
        # publish_time is before the first publish time
        with self.assertRaises(requests.HTTPError) as cm:
            self.submit_problem_api(1475279990, 1451606400)
        assert cm.exception.response.status_code == 403
        # Correct publish_time
        self.submit_problem_api(1475280000, 1451606400)
        self.submit_problem_api(1480550400, 1451606400)
        # publish_time is after the last publish time
        with self.assertRaises(requests.HTTPError) as cm:
            self.submit_problem_api(1480550410, 1451606400)
        assert cm.exception.response.status_code == 403
        # publish_time is past
        with self.assertRaises(requests.HTTPError) as cm:
            self.submit_problem_api(1475280000, 1475280001)
        assert cm.exception.response.status_code == 403
tintri_1_1.py 文件源码 项目:tintri-stats 作者: genebean 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def api_delete(server_name, api, session_id):
    #Header and URL for delete call
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID='+session_id }

    url = 'https://' + server_name + API + api

    try:
        # Invoke the API.
        r = requests.delete(url, headers=headers, verify=False)
    except requests.ConnectionError:
        raise TintrRequestsiApiException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except:
        raise TintriRequestsException("An unexpected error " + sys.exc_info()[0] + " occurred.")

    return r


# PUT
tintri_1_1.py 文件源码 项目:tintri-stats 作者: genebean 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def api_put(server_name, api, payload, session_id):
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID='+session_id }

    url = 'https://' + server_name + API + api

    try:
        # Invoke the API.
        r = requests.put(url, data=json.dumps(payload),
                         headers=headers, verify=False)
    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except:
        raise TintriRequestsException("An unexpected error " + sys.exc_info()[0] + " occurred.")

    return r


# POST
tintri_1_1.py 文件源码 项目:tintri-stats 作者: genebean 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def api_post(server_name, api, payload, session_id):
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID='+session_id }

    url = 'https://' + server_name + API + api

    try:
        # Invoke the API.
        r = requests.post(url, data=json.dumps(payload),
                          headers=headers, verify=False)
    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except:
        raise TintriRequestsException("An unexpected error " + sys.exc_info()[0] + " occurred.")

    return r


# Login.
tintri_1_1.py 文件源码 项目:tintri-stats 作者: genebean 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def download_file(server_name, report_url, session_id, file_name):
    headers = {'content-type': 'application/json'}

    try:
        r = requests.get(report_url, headers=headers, verify=False, stream=True)
        # if HTTP Response is not 200 then raise an exception
        if r.status_code != 200:
            message = "The HTTP response for get call to the server is not 200."
            raise TintriApiException(message, r.status_code, report_url, "No Payload", r.text)

        with open(file_name, 'w') as file_h:
            for block in r.iter_content(4096):
                file_h.write(block)

    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception as e:
        raise TintriRequestsException("An unexpected error: " + e.__str__())
LoginSpider.py 文件源码 项目:Foxlogin 作者: callMePlus 项目源码 文件源码 阅读 101 收藏 0 点赞 0 评论 0
def login(self):
        login_params = {
            'j_username': GlobalVal.username,
            'j_password': GlobalVal.password,
            'validateCode': GlobalVal.secretCode
        }
        login_result = ''
        try:
            login_result = self.__session.post(self.home_url, data=login_params, headers=self.header,timeout=6)
        # DNS failure, refused connection, etc
        except ConnectionError :
            return False,'error:ConnectionError.'
        # Timeout
        except TimeoutError:
            return False, 'error:TimeoutError.'
        # HTTPError
        except requests.HTTPError:
            return False, 'error:HTTPError:%s.'%login_result.status_code
        except :
            return False,'error:network disconnect.'
        # Almost everything is ok.
        if 'success' in login_result.json():
            return True, login_result.json()
        else:
            return False, login_result.json()
test_api.py 文件源码 项目:gobble 作者: openspending 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_handle_raises_error_if_status_is_400(api_call):
    responses.add(api_call.method, api_call.url, body=HTTPError())
    with raises(HTTPError):
        handle(api_call())


# @responses.activate
# @mark.parametrize('api_call', api_calls)
# def test_call_endpoint_with_query_parameters(api_call):
#     responses.add(
#         api_call.method,
#         api_call.url + '?spam=eggs&foo=bar',
#         match_querystring=False
#     )
#     params = dict(foo='bar', spam='eggs')
#     assert api_call(params=params).status_code == 200
fiscal.py 文件源码 项目:gobble 作者: openspending 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _handle_promises(self):
        """Collect all promises from S3 uploads.
        """
        for stream, future in zip(self._streams, self._futures):
            exception = future.exception()
            if exception:
                raise exception
            response = future.result()

            if response.status_code != 200:
                message = 'Something went wrong uploading %s to S3: %s'
                log.error(message, response.url, response.text)
                raise HTTPError(message)

            self._responses.append(response)
            stream.close()
test_configuration.py 文件源码 项目:cachet-url-monitor 作者: mtakaki 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def setUp(self):
        def getLogger(name):
            self.mock_logger = mock.Mock()
            return self.mock_logger

        sys.modules['logging'].getLogger = getLogger

        def get(url, headers):
            get_return = mock.Mock()
            get_return.ok = True
            get_return.json = mock.Mock()
            get_return.json.return_value = {'data': {'status': 1}}
            return get_return

        sys.modules['requests'].get = get

        self.env = EnvironmentVarGuard()
        self.env.set('CACHET_TOKEN', 'token2')

        self.configuration = Configuration('config.yml')
        sys.modules['requests'].Timeout = Timeout
        sys.modules['requests'].ConnectionError = ConnectionError
        sys.modules['requests'].HTTPError = HTTPError
koditidal2.py 文件源码 项目:plugin.audio.tidal2 作者: arnesongit 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_album_json_thread(self):
        try:
            while not xbmc.abortRequested and not self.abortAlbumThreads:
                try:
                    album_id = self.albumQueue.get_nowait()
                except:
                    break
                try:
                    self.get_album(album_id, withCache=False)
                except requests.HTTPError as e:
                    r = e.response
                    msg = _T(30505)
                    try:
                        msg = r.reason
                        msg = r.json().get('userMessage')
                    except:
                        pass
                    log('Error getting Album ID %s' % album_id, xbmc.LOGERROR)
                    if r.status_code == 429 and not self.abortAlbumThreads:
                        self.abortAlbumThreads = True
                        log('Too many requests. Aborting Workers ...', xbmc.LOGERROR)
                        self.albumQueue._init(9999)
                        xbmcgui.Dialog().notification(plugin.name, msg, xbmcgui.NOTIFICATION_ERROR)
        except Exception, e:
            traceback.print_exc()
scanner.py 文件源码 项目:rpwd 作者: ivytin 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def http_get(self, s, host, port, timeout, appendix=''):
        try:
            r = s.get('http://{}:{}{}'.format(host, port, appendix), timeout=timeout, verify=False)
        # except requests.exceptions.Timeout:
        #     return None, RequetsHostException('HTTP connection timeout, host: http://{}:{}'
        #                                       .format(host, port))
        except requests.RequestException as err:
            return None, RequetsHostException('{}:{} request error, msg: {}'
                                              .format(host, port, type(err).__name__))
        # except requests.HTTPError:
        #     return None, RequetsHostException('HTTP server response error, host: http://{}:{}'
        #                                       .format(host, port))
        # except requests.exceptions.RequestException as msg:
        #     return None, RequetsHostException('call requests.get error, msg: {}'
        #                                       .format(msg))
        else:
            return r, None
ari_.py 文件源码 项目:xivo-ctid-ng 作者: wazo-pbx 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _connect(self):
        logger.debug('ARI client listening...')
        try:
            with self._running():
                self.client.run(apps=[APPLICATION_NAME])
        except socket.error as e:
            if e.errno == errno.EPIPE:
                # bug in ari-py when calling client.close(): ignore it and stop
                logger.error('Error while listening for ARI events: %s', e)
                return
            else:
                self._connection_error(e)
        except (WebSocketException, HTTPError) as e:
            self._connection_error(e)
        except ValueError:
            logger.warning('Received non-JSON message from ARI... disconnecting')
            self.client.close()
services.py 文件源码 项目:xivo-ctid-ng 作者: wazo-pbx 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _get_mongooseim_history(self, user_uuid, args):
        participant_user = args.get('participant_user_uuid')
        participant_server = args.get('participant_server_uuid', self._xivo_uuid)
        limit = args.get('limit')

        user_jid = self._build_jid(self._xivo_uuid, user_uuid)
        try:
            if participant_user:
                participant_jid = self._build_jid(participant_server, participant_user)
                result = self.mongooseim_client.get_user_history_with_participant(user_jid,
                                                                                  participant_jid,
                                                                                  limit)
            else:
                result = self.mongooseim_client.get_user_history(user_jid, limit)
        except HTTPError as e:
            raise MongooseIMException(self._xivo_uuid, e.response.status_code, e.response.reason)
        except RequestException as e:
            raise MongooseIMUnreachable(self._xivo_uuid, e)

        return result
services.py 文件源码 项目:xivo-ctid-ng 作者: wazo-pbx 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def send_message(self, request_body, user_uuid=None):
        from_xivo_uuid, from_ = self._build_from(request_body, user_uuid)
        to_xivo_uuid, to = self._build_to(request_body)
        alias = request_body['alias']
        msg = request_body['msg']
        self.contexts.add(from_, to, to_xivo_uuid=to_xivo_uuid, alias=alias)

        from_jid = self._build_jid(from_xivo_uuid, from_)
        to_jid = self._build_jid(to_xivo_uuid, to)
        try:
            self.mongooseim_client.send_message(from_jid, to_jid, msg)
        except HTTPError as e:
            raise MongooseIMException(self._xivo_uuid, e.response.status_code, e.response.reason)
        except RequestException as e:
            raise MongooseIMUnreachable(self._xivo_uuid, e)

        bus_event = ChatMessageSent((from_xivo_uuid, from_),
                                    (to_xivo_uuid, to),
                                    alias,
                                    msg)
        headers = {
            'user_uuid:{uuid}'.format(uuid=from_): True,
        }
        self._bus_publisher.publish(bus_event, headers=headers)
utils.py 文件源码 项目:python3-linkedin 作者: DEKHTIARJonathan 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def raise_for_error(response):
    try:
        response.raise_for_status()
    except (requests.HTTPError, requests.ConnectionError) as error:
        try:
            if len(response.content) == 0:
                # There is nothing we can do here since LinkedIn has neither sent
                # us a 2xx response nor a response content.
                return
            response = response.json()
            if ('error' in response) or ('errorCode' in response):
                message = '%s: %s' % (response.get('error', str(error)),
                                      response.get('message', 'Unknown Error'))
                error_code = response.get('status')
                ex = get_exception_for_error_code(error_code)
                raise ex(message)
            else:
                raise LinkedInError(error.message)
        except (ValueError, TypeError):
            raise LinkedInError(error.message)
api.py 文件源码 项目:pypdns 作者: fousti 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _call(self, verb, url, data={}, params={}, headers={}):
        if headers:
            self.session.headers.update(headers)

        log.info('Call %s with data %s', url, data)
        resp = self.session.request(verb, url, json=data, params=params)
        status_code = resp.status_code
        try:
            resp.raise_for_status()
        except requests.HTTPError as exc:
            log.debug('Error occured, endpoint : %s, apikey : %s',
                      url, self.apikey)
            return resp, status_code

        except requests.Timeout:
            log.error('Request Timeout to %si', url)
            return False, status_code
        except requests.RequestException:
            log.error('Requests Error')
            return False, status_code
        else:
            return resp, status_code
test_workspaces.py 文件源码 项目:sapi-python-client 作者: keboola 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_detail_inexsitent_workspace(self):
        """
        Workspace Endpoint raises HTTPError when mocking inexistent workspace
        detail
        """
        msg = ('404 Client Error: Not Found for url: '
               'https://connection.keboola.com/v2/storage/workspaces/1')
        responses.add(
            responses.Response(
                method='GET',
                url='https://connection.keboola.com/v2/storage/workspaces/1',
                body=HTTPError(msg)
            )
        )
        workspace_id = '1'
        with self.assertRaises(HTTPError) as error_context:
            self.ws.detail(workspace_id)
        assert error_context.exception.args[0] == msg
test_workspaces.py 文件源码 项目:sapi-python-client 作者: keboola 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_reset_password_for_inexistent_workspace(self):
        """
        Workspace endpoint raises HTTPError when mock resetting password for
        inexistent workspace
        """
        msg = ('404 Client Error: Not Found for url: '
               'https://connection.keboola.com/v2/storage/workspaces/1/'
               'password')
        responses.add(
            responses.Response(
                method='POST',
                url=('https://connection.keboola.com/v2/storage/workspaces/'
                     '1/password'),
                body=HTTPError(msg)
            )
        )
        workspace_id = '1'
        with self.assertRaises(HTTPError) as error_context:
            self.ws.reset_password(workspace_id)
        assert error_context.exception.args[0] == msg


问题


面经


文章

微信
公众号

扫码关注公众号