python类HTTPError()的实例源码

api.py 文件源码 项目:upstox-python 作者: upstox 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def start_websocket(self, run_in_background=False):
        socket_params = {}
        try:
            socket_params = self.get_socket_params()
        except requests.exceptions.HTTPError:
            print ("Can't Access Socket Params")
        ping_interval = 60
        ping_timeout = 10

        if 'pythonPingInterval' in socket_params.keys():
            ping_interval = socket_params['pythonPingInterval']

        if 'pythonPingTimeout' in socket_params.keys():
            ping_timeout = socket_params['pythonPingTimeout']

        url = self.config['socketEndpoint'].format(api_key=self.api_key, access_token=self.access_token)
        self.websocket = websocket.WebSocketApp(url,
                                                header={'Authorization: Bearer' + self.access_token},
                                                on_data=self._on_data,
                                                on_error=self._on_error,
                                                on_close=self._on_close)
        if run_in_background is True:
            self.ws_thread = threading.Thread(target=self.websocket.run_forever)
            self.ws_thread.daemon = True
            self.ws_thread.start()
        else:
            self.websocket.run_forever(ping_interval=ping_interval, ping_timeout=ping_timeout)
service.py 文件源码 项目:ISB-CGC-pipelines 作者: isb-cgc 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def sendRequest(ip, port, route, data=None, protocol="http"):
        url = "{protocol}://{ip}:{port}{route}".format(protocol=protocol, ip=ip, port=port, route=route)

        if data is not None:
            try:
                resp = requests.post(url, data=data)

            except requests.HTTPError as e:
                raise PipelineServiceError("{reason}".format(reason=e))

        else:
            try:
                resp = requests.get(url)

            except requests.HTTPError as e:
                raise PipelineServiceError("{reason}".format(reason=e))

        return resp
api.py 文件源码 项目:upstox-python 作者: upstox 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def api_call_helper(self, name, http_method, params, data):
        # helper formats the url and reads error codes nicely
        url = self.config['host'] + self.config['routes'][name]

        if params is not None:
            url = url.format(**params)

        response = self.api_call(url, http_method, data)

        if response.status_code != 200:
            raise requests.HTTPError(response.text)

        body = json.loads(response.text)

        if is_status_2xx(body['code']):
            # success
            return body['data']
        else:
            raise requests.HTTPError(response.text)

        return
test_workspaces.py 文件源码 项目:sapi-python-client 作者: keboola 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_load_inexistent_tables_to_workspace(self):
        """
        Workspace endpoint raises HTTPError when mock loading inexistent table
        """
        msg = ('404 Client Error: Not Found for url: '
               'https://connection.keboola.com/v2/storage/workspaces/78432/'
               'load')
        responses.add(
            responses.Response(
                method='POST',
                url=('https://connection.keboola.com/v2/storage/workspaces/'
                     '78432/load'),
                body=HTTPError(msg)
            )
        )
        workspace_id = '78432'
        mapping = {"in.c-table.does_not_exist": "my-table"}
        with self.assertRaises(HTTPError) as error_context:
            self.ws.load_tables(workspace_id, mapping)
        assert error_context.exception.args[0] == msg
test_twindb_cloudflare.py 文件源码 项目:twindb_cloudflare 作者: twindb 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_api_call_raises_exception_connection_error(mock_requests,
                                                    cloudflare):

    for ex in [requests.exceptions.RequestException,
               requests.exceptions.HTTPError,
               requests.exceptions.ConnectionError,
               requests.exceptions.ProxyError,
               requests.exceptions.SSLError,
               requests.exceptions.Timeout,
               requests.exceptions.ConnectTimeout,
               requests.exceptions.ReadTimeout,
               requests.exceptions.URLRequired,
               requests.exceptions.TooManyRedirects,
               requests.exceptions.MissingSchema,
               requests.exceptions.InvalidSchema,
               requests.exceptions.InvalidURL,
               requests.exceptions.ChunkedEncodingError,
               requests.exceptions.ContentDecodingError,
               requests.exceptions.StreamConsumedError,
               requests.exceptions.RetryError]:

        mock_requests.get.side_effect = ex('Some error')
        with pytest.raises(CloudFlareException):
            cloudflare._api_call('/foo')
web.py 文件源码 项目:it-jira-bamboohr 作者: saucelabs 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def request_jira(client, url, method='GET', **kwargs):
    jwt_authorization = 'JWT %s' % encode_token(
        method, url, app.config.get('ADDON_KEY'),
        client.sharedSecret)
    result = requests.request(
        method,
        client.baseUrl.rstrip('/') + url,
        headers={
            "Authorization": jwt_authorization,
            "Content-Type": "application/json"
        },
        **kwargs)
    try:
        result.raise_for_status()
    except requests.HTTPError as e:
        raise requests.HTTPError(e.response.text, response=e.response)
    return result
packages_status_detector.py 文件源码 项目:pip-upgrader 作者: simion 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _fetch_index_package_info(self, package_name, current_version):
        """
        :type package_name: str
        :type current_version: version.Version
        """

        try:
            package_canonical_name = package_name
            if self.PYPI_API_TYPE == 'simple_html':
                package_canonical_name = canonicalize_name(package_name)
            response = requests.get(self.PYPI_API_URL.format(package=package_canonical_name), timeout=15)
        except HTTPError as e:  # pragma: nocover
            return False, e.message

        if not response.ok:  # pragma: nocover
            return False, 'API error: {}'.format(response.reason)

        if self.PYPI_API_TYPE == 'pypi_json':
            return self._parse_pypi_json_package_info(package_name, current_version, response)
        elif self.PYPI_API_TYPE == 'simple_html':
            return self._parse_simple_html_package_info(package_name, current_version, response)
        else:  # pragma: nocover
            raise NotImplementedError('This type of PYPI_API_TYPE type is not supported')
discord.py 文件源码 项目:reconbot 作者: flakas 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_character(self, character_id):
        try:
            character = self.eve.get_character(character_id)
        except requests.HTTPError as ex:
            # Patch for character being unresolvable and ESI throwing internal errors
            # Temporarily stub character to not break our behavior.
            if ex.response.status_code == 500:
                character = { 'name': 'Unknown character', 'corporation_id': 98356193 }
            else:
                raise

        return '**%s** (https://zkillboard.com/character/%d/) (%s)' % (
            character['name'],
            character_id,
            self.get_corporation(character['corporation_id'])
        )
mainapi.py 文件源码 项目:polichombr 作者: ANSSI-FR 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get(self, endpoint, args=None):
        """
            Wrapper for requests.get
            @arg args is a dict wich is converted to URL parameters
        """
        answer = requests.get(endpoint, params=args)
        if answer.status_code != 200:
            if answer.status_code == 404:
                self.logger.error(
                    "endpoint %s or resource not found", endpoint)
                self.logger.error(
                    "error description was: %s",
                    answer.json()["error_description"])
                return None
            else:
                self.logger.error(
                    "Error during get request for endpoint %s", endpoint)
                self.logger.error("Status code was %d", answer.status_code)
                self.logger.error("error message was: %s", answer.json())
                raise requests.HTTPError
        return answer.json()
test_connectbox_static.py 文件源码 项目:connectbox-pi 作者: ConnectBox 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def ssidSuccessfullySet(self, ssid_str):
        r = requests.put(self.ADMIN_SSID_URL, auth=getAdminAuth(),
                         data=json.dumps({"value": ssid_str}))
        try:
            # This assumes there's not a better method to test whether
            #  the SSID was of a valid length...
            r.raise_for_status()
        except requests.HTTPError:
            return False

        if r.json()["result"] != self.SUCCESS_RESPONSE:
            return False

        r = requests.get(self.ADMIN_SSID_URL, auth=getAdminAuth())
        r.raise_for_status()
        # Finally, check whether it was actually set
        return r.json()["result"][0] == ssid_str
__init__.py 文件源码 项目:PythonStudyCode 作者: TongTongX 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_version(self):
        """Fetches the current version number of the Graph API being used."""
        args = {"access_token": self.access_token}
        try:
            response = requests.request("GET",
                                        "https://graph.facebook.com/" +
                                        self.version + "/me",
                                        params=args,
                                        timeout=self.timeout,
                                        proxies=self.proxies)
        except requests.HTTPError as e:
            response = json.loads(e.read())
            raise GraphAPIError(response)

        try:
            headers = response.headers
            version = headers["facebook-api-version"].replace("v", "")
            return float(version)
        except Exception:
            raise GraphAPIError("API version number not available")
tests.py 文件源码 项目:choochoo 作者: nlsdfnbch 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_categories_returns_200_and_decoded_json(self):
        provider_id = '1'
        provider_name = 'flinkster'
        category_id  = '1000'

        # Assert querying by network only works as expected
        try:
            resp = self.api.categories(provider_name)
        except HTTPError as e:
            self.fail('Status Code Was %s - URL: %s!' % (e.response.status_code, e.request.url))
        self.assertIsInstance(resp, dict)
        self.assertIn('items', resp)

        # Assert querying by network and category ID works as expected
        try:
            resp = self.api.categories(provider_name, by_id=category_id)
        except HTTPError as e:
            self.fail('Status Code Was %s - URL: %s!' % (e.response.status_code, e.request.url))
        self.assertIsInstance(resp, (list, dict))
        self.assertNotIn('items', resp)
accounting.py 文件源码 项目:valhalla 作者: LCOGT 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_time_totals_from_pond(timeallocation, start, end, too, recur=0):
    """
    Pond queries are too slow with large proposals and time out, this hack splits the query
    when a timeout occurs
    """
    if recur > 3:
        raise RecursionError('Pond is timing out, too much recursion.')

    total = 0
    try:
        total += query_pond(
            timeallocation.proposal.id, start, end, timeallocation.telescope_class, timeallocation.instrument_name, too
        )
    except requests.HTTPError:
        logger.warning('We got a pond inception. Splitting further.')
        for start, end in split_time(start, end, 4):
            total += get_time_totals_from_pond(timeallocation, start, end, too, recur=recur + 1)

    return total
api.py 文件源码 项目:fac 作者: mickael9 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def login(self, username, password, require_ownership=False):
        resp = self.session.post(
            self.login_url,
            params=dict(require_game_ownership=int(require_ownership)),
            data=dict(username=username, password=password)
        )

        try:
            json = resp.json()
        except:
            json = None

        try:
            resp.raise_for_status()
            return json[0]
        except requests.HTTPError:
            if isinstance(json, dict) and 'message' in json:
                if json['message'] == 'Insufficient membership':
                    raise OwnershipError(json['message'])
                else:
                    raise AuthError(json['message'])
            else:
                raise
http.py 文件源码 项目:storj-python-sdk 作者: Storj 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def bucket_get(self, bucket_id):
        """Return the bucket object.
        See `API buckets: GET /buckets
        <https://storj.github.io/bridge/#!/buckets/get_buckets_id>`_
        Args:
            bucket_id (str): bucket unique identifier.
        Returns:
            (:py:class:`model.Bucket`): bucket.
        """
        self.logger.info('bucket_get(%s)', bucket_id)

        try:
            return model.Bucket(**self._request(
                method='GET',
                path='/buckets/%s' % bucket_id))

        except requests.HTTPError as e:
            if e.response.status_code == requests.codes.not_found:
                return None
            else:
                self.logger.error('bucket_get() error=%s', e)
                raise BridgeError()
http_test.py 文件源码 项目:storj-python-sdk 作者: Storj 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_bucket_get_not_found(self):
        """Test Client.bucket_get() when bucket does not exist."""

        mock_error = requests.HTTPError()
        mock_error.response = mock.Mock()
        mock_error.response.status_code = 404

        self.mock_request.side_effect = mock_error

        bucket = self.client.bucket_get('inexistent')

        self.mock_request.assert_called_once_with(
            method='GET',
            path='/buckets/inexistent')

        assert bucket is None
BusStopPi.py 文件源码 项目:BusStopPi 作者: LoveBootCaptain 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def update_json():

    global data

    try:

        # response = requests.get('http://sickpi:3000/station/9100013')  # Spittelmarkt
        response = requests.get('http://sickpi:3000/station/9160523')  # Gotlindestr.

        json_data = response.json()

        data = json.loads(json.dumps(json_data[0], indent=4))

        print('\nUPDATE JSON')

        update_departures()

    except (requests.HTTPError, requests.ConnectionError):

        print('\nERROR UPDATE JSON')
        quit_all()
client.py 文件源码 项目:kripodb 作者: 3D-e-Chem 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def similar_fragments(self, fragment_id, cutoff, limit=1000):
        """Find similar fragments to query.

        Args:
            fragment_id (str): Query fragment identifier
            cutoff (float): Cutoff, similarity scores below cutoff are discarded.
            limit (int): Maximum number of hits. Default is None for no limit.

        Returns:
            list[dict]: Query fragment identifier, hit fragment identifier and similarity score

        Raises:
            request.HTTPError: When fragment_id could not be found
        """
        url = self.base_url + '/fragments/{fragment_id}/similar'.format(fragment_id=fragment_id)
        params = {'cutoff': cutoff, 'limit': limit}
        response = requests.get(url, params)
        response.raise_for_status()
        return response.json()
client.py 文件源码 项目:kripodb 作者: 3D-e-Chem 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _fetch_fragments(self, idtype, ids):
        url = self.base_url + '/fragments?{idtype}={ids}'.format(idtype=idtype, ids=','.join(ids))
        absent_identifiers = []
        try:
            response = requests.get(url)
            response.raise_for_status()
            fragments = response.json()
        except HTTPError as e:
            if e.response.status_code == 404:
                body = e.response.json()
                fragments = body['fragments']
                absent_identifiers = body['absent_identifiers']
            else:
                raise e
        # Convert molblock string to RDKit Mol object
        for fragment in fragments:
            if fragment['mol'] is not None:
                fragment['mol'] = MolFromMolBlock(fragment['mol'])
        return fragments, absent_identifiers
client.py 文件源码 项目:kripodb 作者: 3D-e-Chem 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def pharmacophores(self, fragment_ids):
        absent_identifiers = []
        pharmacophores = []
        for fragment_id in fragment_ids:
            url = self.base_url + '/fragments/{0}.phar'.format(fragment_id)
            try:
                response = requests.get(url)
                response.raise_for_status()
                pharmacophore = response.text
                pharmacophores.append(pharmacophore)
            except HTTPError as e:
                if e.response.status_code == 404:
                    pharmacophores.append(None)
                    absent_identifiers.append(fragment_id)
                else:
                    raise e
        if absent_identifiers:
            raise IncompletePharmacophores(absent_identifiers, pharmacophores)
        return pharmacophores
pushover_notifier.py 文件源码 项目:oversight 作者: hebenon 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def handle_trigger_event(self, sender, **data):
        # Pushover doesn't support images, so just send the event.
        notification_data = {
            "user": self.pushover_user,
            "token": self.pushover_token,
            "message": "Camera %s, event: %s" % (data['source'], data['prediction']),
            "timestamp": calendar.timegm(data['timestamp'].timetuple())
        }

        # Optionally, set the device.
        if self.pushover_device:
            notification_data['device'] = self.pushover_device

        try:
            r = requests.post("https://api.pushover.net/1/messages.json", data=notification_data)

            if r.status_code != 200:
                logger.error("Failed to send notification, (%d): %s" % (r.status_code, r.text))
        except requests.ConnectionError, e:
            logger.error("Connection Error:", e)
        except requests.HTTPError, e:
            logger.error("HTTP Error:", e)
response.py 文件源码 项目:Url 作者: beiruan 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def raise_for_status(self, allow_redirects=True):
        """Raises stored :class:`HTTPError` or :class:`URLError`, if one occurred."""

        if self.status_code == 304:
            return
        elif self.error:
            if self.traceback:
                six.reraise(Exception, Exception(self.error), Traceback.from_string(self.traceback).as_traceback())
            http_error = HTTPError(self.error)
        elif (self.status_code >= 300) and (self.status_code < 400) and not allow_redirects:
            http_error = HTTPError('%s Redirection' % (self.status_code))
        elif (self.status_code >= 400) and (self.status_code < 500):
            http_error = HTTPError('%s Client Error' % (self.status_code))
        elif (self.status_code >= 500) and (self.status_code < 600):
            http_error = HTTPError('%s Server Error' % (self.status_code))
        else:
            return

        http_error.response = self
        raise http_error
custodia.py 文件源码 项目:commissaire-service 作者: projectatomic 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _delete(self, model_instance):
        """
        Deletes a serialized SecretModel string from Custodia.

        :param model_instance: SecretModel instance to delete.
        :type model_instance: commissaire.model.SecretModel
        :raises StorageLookupError: if data lookup fails (404 Not Found)
        :raises requests.HTTPError: if the request fails (other than 404)
        """
        url = self._build_key_url(model_instance)

        try:
            response = self.session.request(
                'DELETE', url, timeout=self.CUSTODIA_TIMEOUT)
            response.raise_for_status()
        except requests.HTTPError as error:
            # XXX bool(response) defers to response.ok, which is a misfeature.
            #     Have to explicitly test "if response is None" to know if the
            #     object is there.
            have_response = response is not None
            if have_response and error.response.status_code == 404:
                raise StorageLookupError(str(error), model_instance)
            else:
                raise error
gogs.py 文件源码 项目:git-repo 作者: guyzmo 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def connect(self):
        self.gg.setup(self.url_ro)
        self.gg.set_token(self._privatekey)
        self.gg.set_default_private(self.default_create_private)
        self.gg.setup_session(
                self.session_certificate or not self.session_insecure,
                self.session_proxy)
        try:
            self.username = self.user  # Call to self.gg.authenticated_user()
        except HTTPError as err:
            if err.response is not None and err.response.status_code == 401:
                if not self._privatekey:
                    raise ConnectionError('Could not connect to GoGS. '
                                          'Please configure .gitconfig '
                                          'with your gogs private key.') from err
                else:
                    raise ConnectionError('Could not connect to GoGS. '
                                          'Check your configuration and try again.') from err
            else:
                raise err
content.py 文件源码 项目:kacpaw 作者: benburrill 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _comment_exists(self):
        """
        This is yet another horrible workaround to keep this from falling apart.

        Because of the other workarounds to get around the fact that you
        cannot get metadata from a comment, we've run into problems where it
        is not always obvious if a comment even exists.  For this reason, we
        need to have a method to test it for us.
        """
        try:
            # Getting the reply data from a comment should raise an error if
            # it doesn't exist.  We use Comment's get_reply_data because
            # ProgramCommentReply's get_reply_data is another one of those
            # horrible workarounds
            list(Comment(self.id, self.get_program()).get_reply_data())
        except requests.HTTPError:
            return False
        return True
content.py 文件源码 项目:kacpaw 作者: benburrill 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_metadata(self):
        """Returns a dictionary with information about this ``ProgramCommentReply``."""
        # there's no way that I've found to get comment reply metadata directly, 
        # so we iterate though the comment thread until you find this comment
        for comment_data in self.get_parent().get_reply_data():
            if comment_data["key"] == self.id:
                return comment_data
        raise _CommentDoesntExistError(self)

        # I'm keeping this todo until I can fully address it, although I did
        # add an error

        # todo: raise some error instead of returning None.  What error?  IDK.
        # I'm almost tempted to pretend it's an HTTPError, but I'll need to do
        # some research into why we would get here (We can get here btw.
        # That's how I found this).  Would self.get_parent().get_reply_data()
        # raise an HTTPError if self.comment_key was nonsense?  If that's the
        # case, we will only (probably) be here if comment_key was a
        # ProgramComment key instead of a ProgramCommentReply key, so we would
        # probably want to have an error that's more specific (like TypeError
        # maybe?).  Man, there are so many edge cases with this stuff that I
        # really should look into now that I think about it...  We are also
        # probably going to need to keep a lot of this comment to explain why
        # we raise the error we do.
client.py 文件源码 项目:voucherify-python-sdk 作者: voucherifyio 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def request(self, path, method='GET', **kwargs):
        try:
            url = ENDPOINT_URL + path

            response = requests.request(
                method=method,
                url=url,
                headers=self.headers,
                timeout=self.timeout,
                **kwargs
            )
        except requests.HTTPError as e:
            response = json.loads(e.read())
        except requests.ConnectionError as e:
            raise VoucherifyError(e)

        if response.headers.get('content-type') and 'json' in response.headers['content-type']:
            result = response.json()
        else:
            result = response.text

        if isinstance(result, dict) and result.get('error'):
            raise VoucherifyError(result)

        return result
gen_test_inputs.py 文件源码 项目:two1-python 作者: 21dotco 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_from_chain(url_adder):
    url = 'https://api.chain.com/v2/bitcoin/%s' % (url_adder)

    ok = False
    while not ok:
        try:
            r = requests.get(url, auth=(CHAIN_API_KEY, CHAIN_API_SECRET))
            r.raise_for_status()
            ok = True
        except requests.HTTPError as e:
            if r.status_code == 429:  # Too many requests
                time.sleep(1)
            else:
                print("Request was to %s" % (url))
                raise e
    b = json.loads(r.text)

    return b
helper.py 文件源码 项目:TestRewrite 作者: osqa-interns 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def goto_course_list(self):
        """Go to the course picker."""
        long_wait = WebDriverWait(self.driver, 30)
        try:
            long_wait.until(
                expect.presence_of_element_located(
                    (By.ID, 'ox-react-root-container')
                )
            )
            if 'tutor' in self.current_url():
                self.find(By.CSS_SELECTOR, '.ui-brand-logo').click()
                self.page.wait_for_page_load()
            else:
                raise HTTPError('Not currently on an OpenStax Tutor webpage:' +
                                '%s' % self.current_url())
        except Exception as ex:
            raise ex
push_to_api.py 文件源码 项目:cobra 作者: wufeifei 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def push(self):
        """
        Push data to API.
        :return: push success or not
        """

        try:
            re = requests.post(url=self.api, data={"info": json.dumps(self.post_data, ensure_ascii=False)})

            result = re.json()
            if result.get("vul_pdf", "") != "":
                logger.info('[PUSH API] Push success!')
                return True
            else:
                logger.warning('[PUSH API] Push result error: {0}'.format(re.text))
                return False
        except (requests.ConnectionError, requests.HTTPError) as error:
            logger.critical('[PUSH API] Network error: {0}'.format(str(error)))
            return False
        except ValueError as error:
            logger.critical('[PUSH API] Response error: {0}'.format(str(error)))
            return False


问题


面经


文章

微信
公众号

扫码关注公众号