python类RequestException()的实例源码

client.py 文件源码 项目:pyetcd 作者: twindb 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def _request_call(self, uri, method='get', wait=False, **kwargs):
        """Send a request to the configured hosts, returning the first success.

        When ``self._allow_reconnect`` is true every entry of ``self._urls``
        is tried in order; otherwise only the first entry is used.

        :param uri: path appended to each base URL
        :param method: name of the ``self._session`` method to call
        :param wait: accepted for interface compatibility; not used here
        :param kwargs: passed through to the session method
        :return: the response wrapped in an EtcdResult
        :raises EtcdException: if every attempted host raised RequestException
        """
        candidates = self._urls if self._allow_reconnect else [self._urls[0]]
        failures = []
        for base in candidates:
            try:
                target = base + uri
                send = getattr(self._session, method)
                return EtcdResult(send(target, **kwargs))
            except RequestException as exc:
                # Remember why this host failed and move on to the next one.
                failures.append("%s: %s" % (base, exc))

        report = '\n'.join(failures)
        raise EtcdException('No more hosts to connect.\nErrors: %s' % report)
bitbucket.py 文件源码 项目:badwolf 作者: bosondata 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def grant_access_token(self, code):
        """Exchange an OAuth2 authorization code for access/refresh tokens.

        On success the access token, refresh token and token type from the
        response are stored on the instance.

        :param code: authorization code sent as the ``code`` form field
        :return: the decoded JSON body of the token response
        :raises BitbucketAPIError: if the token endpoint answers with an
            HTTP error status
        """
        form = {
            'grant_type': 'authorization_code',
            'code': code,
        }
        credentials = HTTPBasicAuth(self._oauth_key, self._oauth_secret)
        res = self._session.post(
            'https://bitbucket.org/site/oauth2/access_token',
            data=form,
            auth=credentials
        )
        try:
            res.raise_for_status()
        except requests.RequestException as reqe:
            # The error body is expected to carry the OAuth error fields.
            details = res.json()
            raise BitbucketAPIError(
                res.status_code,
                details.get('error', ''),
                details.get('error_description', ''),
                request=reqe.request,
                response=reqe.response
            )
        else:
            tokens = res.json()
        self._access_token = tokens['access_token']
        self._refresh_token = tokens['refresh_token']
        self._token_type = tokens['token_type']
        return tokens
client.py 文件源码 项目:pyoozie 作者: Shopify 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _test_connection(self):
        """Check that the server answers ``/versions`` and lists API version 2.

        ``self._stats`` is updated with the response (or with ``None`` when
        no response was obtained).

        :raises: the exception built by
            ``exceptions.OozieException.communication_error`` when the request
            fails, the body is not valid JSON, or 2 is not among the versions
        """
        response = None
        try:
            endpoint = '{}/versions'.format(self._url)
            response = requests.get(endpoint, timeout=self._timeout)
            response.raise_for_status()
            self._stats.update(response)
        except requests.RequestException as err:
            self._stats.update(response)
            if self._verbose:
                # Headers are only available if the server actually replied.
                if response is not None:
                    self.logger.error(response.headers)
            message = "Unable to contact Oozie server at {}".format(self._url)
            raise exceptions.OozieException.communication_error(message, err)

        try:
            versions = response.json()
        except ValueError as err:
            message = "Invalid response from Oozie server at {} ".format(self._url)
            raise exceptions.OozieException.communication_error(message, err)

        supports_v2 = 2 in versions
        if not supports_v2:
            message = "Oozie server at {} does not support API version 2 (supported: {})".format(self._url, versions)
            raise exceptions.OozieException.communication_error(message)
rw_handles.py 文件源码 项目:deb-oslo.vmware 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def write(self, data):
        """Send data through the underlying file handle.

        :param data: data to be written
        :raises: VimConnectionException if the send fails with a requests
                 error, VimException for any other failure
        """
        try:
            self._file_handle.send(data)
        except requests.RequestException as conn_error:
            template = _("Connection error occurred while writing data to %s.")
            message = template % self._url
            LOG.exception(message)
            raise exceptions.VimConnectionException(message, conn_error)
        except Exception as other_error:
            # TODO(vbala) We need to catch and raise specific exceptions
            # related to connection problems, invalid request and invalid
            # arguments.
            template = _("Error occurred while writing data to %s.")
            message = template % self._url
            LOG.exception(message)
            raise exceptions.VimException(message, other_error)
rw_handles.py 文件源码 项目:deb-oslo.vmware 作者: openstack 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def write(self, data):
        """Send data through the underlying file handle and count the bytes.

        ``self._bytes_written`` grows by ``len(data)`` after a successful send.

        :param data: data to be written
        :raises: VimConnectionException if the send fails with a requests
                 error, VimException for any other failure
        """
        try:
            self._file_handle.send(data)
            self._bytes_written += len(data)
        except requests.RequestException as conn_error:
            template = _("Connection error occurred while writing data to %s.")
            message = template % self._url
            LOG.exception(message)
            raise exceptions.VimConnectionException(message, conn_error)
        except Exception as other_error:
            # TODO(vbala) We need to catch and raise specific exceptions
            # related to connection problems, invalid request and invalid
            # arguments.
            template = _("Error occurred while writing data to %s.")
            message = template % self._url
            LOG.exception(message)
            raise exceptions.VimException(message, other_error)
fees.py 文件源码 项目:two1-python 作者: 21dotco 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_fees():
    """Return the fee per kB plus derived per-input and per-output fees.

    The per-kB fee is the server's ``halfHourFee`` multiplied by 1000. If the
    request fails or the status is not 200, DEFAULT_FEE_PER_KB is used and
    the error is logged.

    Raises:
        exceptions.UnreasonableFeeError: if the fee is outside the range
            0 .. 2 * DEFAULT_FEE_PER_KB.
    """
    try:
        response = requests.get(_fee_host + "v1/fees/recommended")
        if response.status_code != 200:
            # Funnel bad statuses into the same fallback path as network errors.
            raise requests.ConnectionError('Received status_code %d' % response.status_code)
        fee_per_kb = response.json()['halfHourFee'] * 1000
    except requests.RequestException as error:
        fee_per_kb = DEFAULT_FEE_PER_KB
        logger.error(
            "Error getting recommended fees from server: %s. Using defaults." %
            error)

    ceiling = 2 * DEFAULT_FEE_PER_KB
    fee_is_reasonable = 0 <= fee_per_kb <= ceiling
    if not fee_is_reasonable:
        raise exceptions.UnreasonableFeeError(
            'Unreasonable fee per kB: %s' % fee_per_kb)

    fee_per_input = int(DEFAULT_INPUT_SIZE_KB * fee_per_kb)
    fee_per_output = int(DEFAULT_OUTPUT_SIZE_KB * fee_per_kb)
    return {
        'per_kb': fee_per_kb,
        'per_input': fee_per_input,
        'per_output': fee_per_output
    }
health.py 文件源码 项目:python-panopticon 作者: mobify 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def check_url(url, expected_status=200, timeout=5):
    """
    Check that `url` is reachable and returns `expected_status`.

    Returns a dict keyed by `HealthCheck.HEALTHY` (bool) and
    `HealthCheck.STATUS_MESSAGE` (explanatory text).
    """
    def _report(healthy, text):
        # Every outcome uses the same two-key result shape.
        return {HealthCheck.HEALTHY: healthy,
                HealthCheck.STATUS_MESSAGE: text}

    if not url:
        return _report(False, 'No URL specified to check.')

    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        return _report(False, 'Error connecting to URL: {}'.format(str(exc)))

    if expected_status == response.status_code:
        return _report(True, 'URL is available')

    return _report(
        False,
        'server responded with unexpected status code: {}'.format(
            response.status_code))
version.py 文件源码 项目:globus-cli 作者: globus 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def get_versions():
    """
    Return a ``(latest, current)`` pair of versions.

    ``latest`` is the highest release listed by PyPI for globus-cli, or None
    when the request to PyPI fails; ``current`` is built from ``__version__``.

    Kept as a function so the lookup does not run on every CLI command, and so
    that `requests` is only imported when the lookup is actually needed.
    """
    # import in the func (rather than top-level scope) so that at setup time,
    # `requests` isn't required -- otherwise, setuptools will fail to run
    # because requests isn't installed yet.
    import requests
    try:
        reply = requests.get("https://pypi.python.org/pypi/globus-cli/json")
        version_data = reply.json()
        newest = max(map(LooseVersion, version_data["releases"]))
        return newest, LooseVersion(__version__)
    # if the fetch from pypi fails
    except requests.RequestException:
        return None, LooseVersion(__version__)
hook.py 文件源码 项目:sciz 作者: erk3 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def trigger(self):
        """Push pending event notifications to this hook's URL.

        Does nothing when the hook is revoked or has no URL. Otherwise the
        events with an id above ``last_event_id`` that are flagged for push
        and belong to this hook's group are POSTed as one JSON list, ordered
        by time. ``last_event_id`` is advanced and committed only when the
        receiver answers with a non-error HTTP status; a failed request is
        logged as a warning and the position is left untouched, so the same
        events are selected again on the next call.
        """
        if self.revoked or self.url is None:
            return
        try:
            # Find the events
            # NOTE: `== True` is kept as-is; presumably EVENT.notif_to_push is
            # a mapped column and the comparison builds a query expression.
            events = sg.db.session.query(EVENT).filter(EVENT.id > self.last_event_id, EVENT.notif_to_push == True, EVENT.group_id == self.group_id).order_by(asc(EVENT.time)).all()
            res = []
            max_id = 0
            for event in events:
                max_id = max(event.id, max_id)
                res.append({'id': event.id, 'notif': event.notif.encode(sg.DEFAULT_CHARSET)})
            # Send the data
            if res:
                try:
                    headers = {'Authorization': self.jwt}
                    response = requests.post(self.url, headers=headers, json=res, timeout=1)
                    # A 4xx/5xx answer means the receiver did not accept the
                    # events; treat it like any other delivery failure.
                    response.raise_for_status()
                    # Update the hook
                    self.last_event_id = max_id
                    sg.db.session.add(self)
                    sg.db.session.commit()
                except requests.RequestException as e:
                    sg.logger.warning('Unable to send events for reverse hook %s (%s) and url %s : %s' % (self.name, self.id, self.url, str(e), ))
        except NoResultFound:
            sg.logger.warning('No event found corresponding to the reverse hook %s (%s)' % (self.name, self.id, ))
AlexaService.py 文件源码 项目:RaspberryEcho 作者: ericpotvin 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def connected():
        """ Check if can connect to the Amazon oAuth2

            Prints a progress line and, on failure, the reason.

            :return boolean: True if the request completed, False on any
                requests error (timeout, too many redirects, or other)
        """
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("Checking Internet Connection")
        try:
            requests.get(AlexaService.AMAZON_TOKEN_URL)
        except requests.exceptions.Timeout as exception:
            # str() is used instead of `.message`: the attribute is missing on
            # Python 3 and may hold a non-string object on Python 2.
            print("Error: Timeout / " + str(exception))
        except requests.exceptions.TooManyRedirects as exception:
            print("Error: Invalid URL provided / " + str(exception))
        except requests.RequestException as exception:
            print("Error: Connection Failed / " + str(exception))
        else:
            print("Connection OK")
            return True
        # Every failure branch ends here so the result is always a boolean.
        return False
client.py 文件源码 项目:alluxio-py 作者: Alluxio 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def read(self, n=None):
        """Read from the response stream.

        The response is initialised through ``self._init_r()`` on first use.

        Args:
            n (int, optional): Number of bytes to read from the raw stream.
                When None, the whole response content is returned.

        Returns:
            ``self.r.content`` when n is None, otherwise the result of
            ``self.r.raw.read(n)``.

        A requests error during the read is re-raised through
        ``raise_with_traceback`` as ``exceptions.HTTPError``.
        """

        if self.r is None:
            self._init_r()
        try:
            return self.r.content if n is None else self.r.raw.read(n)
        except requests.RequestException:
            raise_with_traceback(exceptions.HTTPError, 'Failed to read the response body')
api.py 文件源码 项目:python-zillow 作者: seme0021 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _RequestUrl(self, url, verb, data=None):
        """
        Request a url.
        :param url: The web location we want to retrieve.
        :param verb: GET only (for now).
        :param data: Extra parameters handed to ``_BuildUrl`` for the query.
        :return: The value returned by ``requests.get`` for a GET request,
            or 0 for any other verb.
        :raises ZillowError: if the request fails with a requests error.
        """
        if verb != 'GET':
            return 0

        full_url = self._BuildUrl(url, extra_params=data)
        try:
            reply = requests.get(
                full_url,
                auth=self.__auth,
                timeout=self._timeout
            )
        except requests.RequestException as e:
            raise ZillowError(str(e))
        return reply
geetest_offline.py 文件源码 项目:script 作者: 9468305 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def query_leveldb(query_db, save_db, queryed_db):
    '''Query every pending name from query_db and store the results.

    Names already present in save_db or queryed_db are skipped. Each result
    pair is written to save_db, and the name is then recorded in queryed_db.
    Returns True when the whole pass completes, False on a requests error.
    '''
    try:
        with requests.Session() as session:
            _token = ''
            for _name, _value in query_db.RangeIter():
                if util.has_key(save_db, _name) or util.has_key(queryed_db, _name):
                    continue
                # Query with at most the first 18 characters of the name.
                _subname = _name if len(_name) <= 18 else _name[0: 18]
                logging.info(_name + ' -> ' + _subname)
                _found, _token = query_keyword(session, _subname, _token)
                if _found:
                    for _pair in _found:
                        logging.info(_pair[0].decode() + ' : ' + _pair[1].decode())
                        save_db.Put(_pair[0], _pair[1], sync=True)
                queryed_db.Put(_name, '', sync=True)
        return True
    except requests.RequestException as _e:
        logging.error(_e)
        return False
geetest_offline_nm.py 文件源码 项目:script 作者: 9468305 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def query_leveldb(query_db, save_db, queryed_db):
    '''Query every pending name from query_db and store the results.

    Names already present in save_db or queryed_db are skipped. Each result
    pair is written to save_db, and the name is then recorded in queryed_db.
    Returns True when the whole pass completes, False on a requests error.
    '''
    try:
        with requests.Session() as session:
            for _name, _value in query_db.RangeIter():
                if util.has_key(save_db, _name) or util.has_key(queryed_db, _name):
                    continue
                # Query with at most the first 18 characters of the name.
                _subname = _name if len(_name) <= 18 else _name[0: 18]
                logging.info(_name + ' -> ' + _subname)
                _found = query_keyword(session, _subname)
                if _found:
                    for _pair in _found:
                        logging.info(_pair[0] + ' : ' + _pair[1])
                        save_db.Put(_pair[0], _pair[1], sync=True)
                queryed_db.Put(_name, '', sync=True)
        return True
    except requests.RequestException as _e:
        logging.error(_e)
        return False
jsonrpc.py 文件源码 项目:Polyglot 作者: UniversalDevicesInc 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def send_request(self, method_name, is_notification, params):
        """Send the serialized call over HTTP and return its parsed result.

        Returns None for notifications. Raises TransportError when the request
        fails, the status code is not OK, or the body cannot be decoded as JSON.
        """
        payload = self.serialize(method_name, params, is_notification)
        try:
            response = self.request(data=payload)
        except requests.RequestException as requests_exception:
            raise TransportError('Error calling method %r' % method_name, requests_exception)

        status = response.status_code
        if status != requests.codes.ok:
            raise TransportError(status)

        if is_notification:
            # Notifications carry no result to parse.
            return None

        try:
            decoded = response.json()
        except ValueError as value_error:
            raise TransportError('Cannot deserialize response body', value_error)

        return self.parse_result(decoded)
simplepush.py 文件源码 项目:notifiers 作者: liiight 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _send_notification(self, data: dict) -> Response:
        """POST ``data`` as form fields to ``self.base_url`` and wrap the outcome.

        On an HTTP error the ``message`` field of the JSON error body is
        reported; when no response was received the exception text is used.
        """
        outcome = {
            'provider_name': self.provider_name,
            'data': data
        }
        try:
            reply = requests.post(self.base_url, data=data)
            reply.raise_for_status()
        except requests.RequestException as exc:
            failed = exc.response
            if failed is None:
                outcome['errors'] = [str(exc)]
            else:
                outcome['response'] = failed
                outcome['errors'] = [failed.json()['message']]
        else:
            outcome['response'] = reply
        return create_response(**outcome)
slack.py 文件源码 项目:notifiers 作者: liiight 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _send_notification(self, data: dict) -> Response:
        """POST ``data`` as JSON to its ``webhook_url`` and wrap the outcome.

        ``webhook_url`` is popped from ``data`` first, so it appears neither
        in the request body nor in the reported data. On an HTTP error the
        response text is reported; when no response was received the
        exception text is used.
        """
        target = data.pop('webhook_url')
        outcome = {
            'provider_name': self.provider_name,
            'data': data
        }
        try:
            reply = requests.post(target, json=data)
            reply.raise_for_status()
        except requests.RequestException as exc:
            failed = exc.response
            if failed is None:
                outcome['errors'] = [str(exc)]
            else:
                outcome['response'] = failed
                outcome['errors'] = [failed.text]
        else:
            outcome['response'] = reply
        return create_response(**outcome)
pushover.py 文件源码 项目:notifiers 作者: liiight 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _send_notification(self, data: dict) -> Response:
        """POST ``data`` as form fields to ``self.base_url`` and wrap the outcome.

        On an HTTP error the ``errors`` field of the JSON error body is
        reported as-is; when no response was received the exception text is
        used.
        """
        outcome = {
            'provider_name': self.provider_name,
            'data': data
        }
        try:
            reply = requests.post(self.base_url, data=data)
            reply.raise_for_status()
        except requests.RequestException as exc:
            failed = exc.response
            if failed is None:
                outcome['errors'] = [str(exc)]
            else:
                outcome['response'] = failed
                outcome['errors'] = failed.json()['errors']
        else:
            outcome['response'] = reply
        return create_response(**outcome)
telegram.py 文件源码 项目:notifiers 作者: liiight 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def _send_notification(self, data: dict) -> Response:
        """POST ``data`` as JSON to the ``sendMessage`` URL and wrap the outcome.

        ``token`` is popped from ``data`` and used to build the URL, so it
        appears neither in the request body nor in the reported data. On an
        HTTP error the ``description`` field of the JSON error body is
        reported; when no response was received the exception text is used.
        """
        bot_token = data.pop('token')
        target = self.base_url.format(token=bot_token, method='sendMessage')
        outcome = {
            'provider_name': self.provider_name,
            'data': data
        }
        try:
            reply = requests.post(target, json=data)
            reply.raise_for_status()
        except requests.RequestException as exc:
            failed = exc.response
            if failed is None:
                outcome['errors'] = [str(exc)]
            else:
                outcome['response'] = failed
                outcome['errors'] = [failed.json()['description']]
        else:
            outcome['response'] = reply
        return create_response(**outcome)
join.py 文件源码 项目:notifiers 作者: liiight 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _send_notification(self, data: dict) -> Response:
        """GET ``self.base_url`` with ``data`` as query parameters and wrap the outcome.

        A reply whose JSON body has a falsy ``success`` reports its
        ``errorMessage``. On an HTTP error the ``errorMessage`` of the JSON
        error body is reported; when the exception carries no response its
        text is used.
        """
        outcome = {
            'provider_name': self.provider_name,
            'data': data
        }
        try:
            reply = requests.get(self.base_url, params=data)
            reply.raise_for_status()
            outcome['response'] = reply
            # Decoding stays inside the try so a requests-level decode error
            # is handled by the same except clause.
            body = reply.json()
            if not body['success']:
                outcome['errors'] = [body['errorMessage']]
        except requests.RequestException as exc:
            failed = exc.response
            if failed is None:
                outcome['errors'] = [str(exc)]
            else:
                outcome['response'] = failed
                outcome['errors'] = [failed.json()['errorMessage']]

        return create_response(**outcome)


问题


面经


文章

微信
公众号

扫码关注公众号