python类HTTPError()的实例源码

apidriver.py 文件源码 项目:charm-hacluster 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _post(self, path, op='update', **kwargs):
        """
        Issue a POST request to the MAAS REST API.

        :param path: API path handed to ``self.client.post``.
        :param op: accepted for interface compatibility; this method does
            not forward it to the client.
            NOTE(review): confirm whether the client expects ``op``.
        :param kwargs: extra keyword arguments forwarded to
            ``self.client.post``.
        :returns: ``Response(True, <parsed payload>)`` when the status code
            equals ``OK``; ``Response(False, <raw payload>)`` for any other
            status code; ``Response(False, None)`` when an exception was
            raised (the exception is logged, not propagated).
        """
        try:
            response = self.client.post(path, **kwargs)
            payload = response.read()
            log.debug("Request %s results: [%s] %s", path, response.getcode(),
                      payload)

            if response.getcode() == OK:
                # SECURITY: the payload comes from the network.  Plain
                # yaml.load() can construct arbitrary Python objects and,
                # on PyYAML >= 6, raises TypeError without an explicit
                # Loader -- which the broad handler below would silently
                # turn into a failed Response.  safe_load only builds
                # plain data types.
                return Response(True, yaml.safe_load(payload))
            else:
                return Response(False, payload)
        except HTTPError as e:
            log.error("Error encountered: %s for %s with params %s",
                      str(e), path, str(kwargs))
            return Response(False, None)
        except Exception as e:
            # Best-effort boundary: callers only ever see a Response.
            log.error("Post request raised exception: %s", e)
            return Response(False, None)
apidriver.py 文件源码 项目:charm-hacluster 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _put(self, path, **kwargs):
        """
        Send a PUT request to the MAAS REST API.

        :param path: API path handed to ``self.client.put``.
        :param kwargs: extra keyword arguments forwarded to
            ``self.client.put``.
        :returns: ``Response(True, payload)`` when the status code equals
            ``OK``, ``Response(False, payload)`` for any other status code,
            and ``Response(False, None)`` when an exception was raised
            (the exception is logged, not propagated).
        """
        try:
            reply = self.client.put(path, **kwargs)
            body = reply.read()
            log.debug("Request %s results: [%s] %s", path, reply.getcode(),
                      body)
            # Guard clause: anything other than OK is reported as a failure
            # that still carries the raw body.
            if reply.getcode() != OK:
                return Response(False, body)
            return Response(True, body)
        except HTTPError as http_err:
            log.error("Error encountered: %s with details: %s for %s with "
                      "params %s", http_err, http_err.read(), path,
                      str(kwargs))
            return Response(False, None)
        except Exception as err:
            log.error("Put request raised exception: %s", err)
            return Response(False, None)

    ###########################################################################
    #  DNS API - http://maas.ubuntu.com/docs2.0/api.html#dnsresource
    ###########################################################################
util.py 文件源码 项目:cli 作者: riseml 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def call_api(api_fn, not_found=None):
    """Invoke ``api_fn`` and route API/HTTP failures to the error handlers.

    :param api_fn: zero-argument callable performing the API call.
    :param not_found: optional zero-argument callable invoked when the API
        answers with status 404.
    :returns: whatever ``api_fn()`` returns on success.  When an error is
        handled here the function falls through and returns ``None``
        (assuming the handler itself returns -- not visible from here).
    :raises ApiException: re-raised unchanged when its status is 0.
    """
    try:
        return api_fn()
    except ApiException as exc:
        status = exc.status
        if status == 0:
            raise exc
        if status == 401:
            handle_error("You are not authorized!")
        elif status == 404 and not_found:
            not_found()
        else:
            # Every other status (including 403, and 404 without a
            # not_found callback) is reported through the generic handler.
            handle_http_error(exc.body, exc.status)
    except LocationValueError:
        # Must stay ahead of the HTTPError clause so it is matched first.
        handle_error("RiseML is not configured! Please run 'riseml user login' first!")
    except HTTPError as exc:
        pool = exc.pool
        message = 'Could not connect to API ({host}:{port}{url}) — {exc_type}'.format(
            host=pool.host,
            port=pool.port,
            url=exc.url,
            exc_type=exc.__class__.__name__
        )
        handle_error(message)
test_response.py 文件源码 项目:drf-reverse-proxy 作者: danpoland 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_broken_response(self):
        """An HTTPError raised by the patched urlopen escapes the proxy view."""
        request = self.factory.get('/')

        # Every call to the mock raises the HTTPError instance.
        failing_urlopen = MagicMock(side_effect=HTTPError())
        with patch(URLOPEN, failing_urlopen):
            with self.assertRaises(HTTPError):
                CustomProxyView.as_view()(request, path='/')
user.py 文件源码 项目:cli 作者: riseml 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def check_api_config(api_url, api_key, timeout=180):
    """Poll the API until a login with the given URL and key succeeds.

    The ``Configuration`` object is temporarily pointed at ``api_url`` /
    ``api_key``; its previous host and API key are restored on every exit
    path (return, ``sys.exit`` or unexpected exception).  The cluster-info
    call is retried once per second until it succeeds or ``timeout``
    seconds have elapsed.

    :param api_url: API host to test.
    :param api_key: API key to test.
    :param timeout: number of seconds to keep retrying.
    :returns: the value of ``get_cluster_id(cluster_infos)`` on success.
    :raises SystemExit: with status 1 when the API reports
        ``UNAUTHORIZED``, or when the timeout expires without a
        successful call.
    """
    print('Waiting %ss for successful login to %s with API key \'%s\' ...' % (timeout, api_url, api_key))
    config = Configuration()
    old_api_host = config.host
    old_api_key = config.api_key['api_key']
    config.host = api_url
    config.api_key['api_key'] = api_key
    try:
        api_client = ApiClient()
        client = AdminApi(api_client)
        start = time.time()
        while True:
            try:
                cluster_infos = client.get_cluster_infos()
                cluster_id = get_cluster_id(cluster_infos)
                print('Success! Cluster ID: %s' % cluster_id)
                return cluster_id
            except ApiException as exc:
                if exc.reason == 'UNAUTHORIZED':
                    # Retrying cannot fix a wrong key; fail immediately.
                    print(exc.status, 'Unauthorized - wrong api key?')
                    sys.exit(1)
                if time.time() - start >= timeout:
                    print(exc.status, exc.reason)
                    sys.exit(1)
            except HTTPError as exc:
                # Connection-level failures are retried until the timeout.
                if time.time() - start >= timeout:
                    print('Unable to connect to %s ' % api_url)
                    # Not every HTTPError carries a ``reason`` attribute;
                    # fall back to the exception itself.
                    print(getattr(exc, 'reason', exc))
                    sys.exit(1)
            time.sleep(1)
    finally:
        # Undo the temporary configuration even on sys.exit / errors.
        config.api_key['api_key'] = old_api_key
        config.host = old_api_host
test_driver.py 文件源码 项目:felix 作者: axbaretto 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_resync_http_error(self, m_sleep):
        """Check ``_resync_and_merge`` when the etcd connection raises HTTPError.

        ``get_etcd_connection`` is patched to raise ``HTTPError`` and the
        patched clock yields 1, then 10, then raises ``RuntimeError``; the
        test expects that ``RuntimeError`` to come out of
        ``_resync_and_merge``.  ``m_sleep`` is presumably injected by a
        patch decorator outside this snippet.
        """
        self.driver._init_received.set()
        clock_target = "calico.etcddriver.driver.monotonic_time"
        with patch.object(self.driver, "get_etcd_connection") as m_get, \
                patch(clock_target) as m_time:
            clock_values = [1, 10, RuntimeError()]
            m_time.side_effect = iter(clock_values)
            m_get.side_effect = HTTPError()
            self.assertRaises(RuntimeError, self.driver._resync_and_merge)
client.py 文件源码 项目:Sentry 作者: NetEaseGame 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def urlopen(self, method, path, **kwargs):
        """
        Make a request using the next server according to the connection
        strategy, retrying up to ``max_retries`` attempts.

        If there are no known available hosts at the start of the request,
        all dead connections are revived and a reconnect is attempted.
        A connection that raises ``HTTPError`` is marked dead and the next
        one is tried.  ``cleanup_dead`` always runs before this method
        returns or raises.

        :param method: HTTP method passed to the connection's ``urlopen``.
        :param path: request path passed to the connection's ``urlopen``.
        :param kwargs: forwarded to the connection's ``urlopen``;
            ``assert_same_host`` defaults to ``False``.
        :returns: the result of the first successful ``conn.urlopen``.
            ``None`` only when ``max_retries`` allows no attempt at all.
        :raises HTTPError: when no connections are left after a failure,
            or when every attempt failed (the last error is re-raised).
        """
        # If we're trying to initiate a new connection, and
        # all connections are already dead, then we should flail
        # and attempt to connect to one of them
        if not self.connections:
            self.force_revive()

        # We don't need strict host checking since our client is enforcing
        # the correct behavior anyways
        kwargs.setdefault('assert_same_host', False)

        last_error = None
        try:
            for _ in range(self.max_retries):
                conn = self.strategy.next(self.connections)
                try:
                    return conn.urlopen(method, path, **kwargs)
                except HTTPError as exc:
                    self.mark_dead(conn)

                    if not self.connections:
                        raise
                    # Remember the failure so it can be surfaced if the
                    # remaining attempts fail as well.
                    last_error = exc
        finally:
            self.cleanup_dead()

        # Retries exhausted while live connections remained: re-raise
        # instead of silently returning None to the caller.
        if last_error is not None:
            raise last_error
__main__.py 文件源码 项目:cli 作者: riseml 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def main():
    """Command-line entry point: build the parser and run the chosen sub-command.

    Registers the sub-command parsers, optionally prints the configured
    endpoint URLs when ``-v`` is given, then calls the ``run`` attribute of
    the parsed arguments if one is present.  An ``HTTPError`` escaping the
    command is passed to ``handle_error``; ``KeyboardInterrupt`` prints an
    abort notice.  With no ``run`` attribute the usage text is printed.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-v', help="show endpoints", action='store_const', const=True)
    parser.add_argument('--version', '-V', help="show version", action='version', version='RiseML CLI {}'.format(VERSION))
    subparsers = parser.add_subparsers()

    # Registration order is kept: user ops, system ops, then workflow ops.
    # The exec and deploy parsers are not registered (they were commented
    # out in the original).
    registrars = (
        add_whoami_parser,
        add_user_parser,
        add_system_parser,
        add_account_parser,
        add_init_parser,
        add_train_parser,
        add_monitor_parser,
        add_logs_parser,
        add_kill_parser,
        add_status_parser,
    )
    for register in registrars:
        register(subparsers)

    args = parser.parse_args(sys.argv[1:])

    if args.v:
        print('api_url: %s' % get_api_url())
        print('sync_url: %s' % get_sync_url())
        print('stream_url: %s' % get_stream_url())
        print('git_url: %s' % get_git_url())

    # No sub-command selected: nothing to run, show usage instead.
    if not hasattr(args, 'run'):
        parser.print_usage()
        return

    try:
        args.run(args)
    except HTTPError as http_err:
        # Any HTTP error not handled by the command itself ends up here.
        handle_error(str(http_err))
    except KeyboardInterrupt:
        print('\nAborting...')


问题


面经


文章

微信
公众号

扫码关注公众号