python类Timeout()的实例源码

hanlon_model.py 文件源码 项目:dcaf 作者: dxc-technology 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def check_model_state(self):
        """Report whether a model with the configured label is listed by the server.

        Fetches ``<base_url>/model``, follows the ``@uri`` of every listed
        entry and compares each entry's ``@label`` with the module's ``label``
        parameter.

        Returns:
            ``('present', <@uuid>)`` for the first matching entry, otherwise
            ``('absent', None)``.

        Any ``requests`` failure is reported through ``self.module.fail_json``.
        """
        params = self.module.params
        base_url = params['base_url']
        wanted_label = params['label']

        try:
            # The success flag of the listing request is not consulted.
            listing, _ = hanlon_get_request("%s/model" % base_url)

            for entry in listing['response']:
                detail, fetched = hanlon_get_request(entry['@uri'])
                if not fetched:
                    continue
                found = detail['response']
                if found['@label'] == wanted_label:
                    return 'present', found['@uuid']
        except requests.ConnectionError as connect_error:
            self.module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(connect_error))
        except requests.Timeout as timeout_error:
            self.module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(timeout_error))
        except requests.RequestException as request_exception:
            self.module.fail_json(msg="Unknown Request library failure", apierror=str(request_exception))

        return 'absent', None
hanlon_model.py 文件源码 项目:dcaf 作者: dxc-technology 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def state_destroy_model(self):
        """Issue an HTTP DELETE for the model identified by the ``uuid`` parameter.

        Calls ``self.module.exit_json(changed=True)`` when the server answers
        with status 200. Any ``requests`` failure is reported through
        ``self.module.fail_json``. If control reaches the end of the method,
        ``exit_json(changed=False)`` is called.
        """
        params = self.module.params
        target = "%s/model/%s" % (params['base_url'], params['uuid'])

        try:
            reply = requests.delete(target)
            deleted = reply.status_code == 200
            if deleted:
                self.module.exit_json(changed=True)
        except requests.ConnectionError as connect_error:
            self.module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(connect_error))
        except requests.Timeout as timeout_error:
            self.module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(timeout_error))
        except requests.RequestException as request_exception:
            self.module.fail_json(msg="Unknown Request library failure", apierror=str(request_exception))

        self.module.exit_json(changed=False)
test_exc.py 文件源码 项目:globus-sdk-python 作者: globus 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_convert_request_exception(self):
        """
        Each known request exception converts to the matching Globus error
        type, and the converted error keeps the original exception's args.
        """
        # (exception to convert, expected converted type)
        expectations = (
            (self.exc, NetworkError),
            (self.timeout_exc, GlobusTimeoutError),
            (self.connection_exc, GlobusConnectionError),
        )
        for source_exc, expected_type in expectations:
            converted = convert_request_exception(source_exc)
            self.assertIsInstance(converted, expected_type)
            self.assertEqual(converted.underlying_exception.args,
                             source_exc.args)
credentials.py 文件源码 项目:depot_tools 作者: webrtc-uwp 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def _search_md(url='http://169.254.169.254/latest/meta-data/iam/'):
    """Collect the metadata entries listed at ``url`` into a dict.

    The listing at ``url`` is split on newlines. Names ending in ``/`` are
    handed to ``get_iam_role`` and stored under the name without the slash.
    Every other name is fetched individually and stored as:

    * the decoded JSON document when the body starts with ``{``;
    * a list of lines when the body contains a newline after its first
      character;
    * the raw body otherwise.

    Timeouts and connection errors are swallowed on purpose (best effort):
    whatever was collected before the failure is returned, possibly ``{}``.
    """
    d = {}
    try:
        r = requests.get(url, timeout=.1)
        if r.content:
            for field in r.content.split('\n'):
                if field.endswith('/'):
                    d[field[0:-1]] = get_iam_role(url + field)
                else:
                    val = requests.get(url + field).content
                    # startswith() is safe on an empty body, unlike val[0].
                    if val.startswith('{'):
                        val = json.loads(val)
                    elif val.find('\n') > 0:
                        # Split the value itself; the original split the
                        # parent listing (r.content) here by mistake.
                        val = val.split('\n')
                    d[field] = val
    except (requests.Timeout, requests.ConnectionError):
        pass
    return d
proxy_chicken.py 文件源码 项目:Leancloud_imxie_cloud 作者: xcc3641 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def parse_proxy(proxies_url):
    """Check whether ``proxies_url`` works as an HTTP proxy.

    Stores the URL in the module-level ``proxies`` mapping, then requests
    http://www.baidu.com through it with a 5 second timeout. On a 200
    response the proxy is persisted with ``model.save_proxy``.

    Returns:
        True when the probe returned status 200, False otherwise. Connection
        errors, timeouts and any other exception are logged, not raised.
    """
    proxies['http'] = proxies_url
    check = False
    try:
        r = requests.get('http://www.baidu.com', proxies=proxies, timeout=5, headers=header_info)
        if r and r.status_code == 200:
            logging.info('===========Successful===============')
            logging.info('|| ???? ||----> ??: (%f)s  ??IP: (%s) ' % (r.elapsed.total_seconds(), proxies_url))
            logging.info('====================================')
            model.save_proxy(proxies_url)
            check = True
    except (requests.ConnectionError, requests.Timeout):
        logging.info(u'|| ?? or ???? ?? ||----> ??IP: (%s) ' % proxies_url)
    except Exception as e:
        # logging.warn is a deprecated alias; logging.warning is the
        # supported spelling.
        logging.warning(e)
    return check
EnginXLink.py 文件源码 项目:Service-Descovery-Stack 作者: Tinhorn 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def watch_services_node(url, file):
    """Poll ``url`` forever and start a worker thread for each non-empty reply.

    Every iteration issues a GET with ``recursive=true&wait=true`` and decodes
    the body as JSON. A truthy body is handed to a new
    ``watchetcdutils.ChangeNginxThread`` built with ``confloc=file``. A
    ``requests.Timeout`` is logged and the loop continues. This function
    never returns normally.
    """
    query = {'recursive': 'true', 'wait': 'true'}
    while True:
        try:
            body = requests.get(url=url, params=query).json()

            # Only a non-empty payload triggers a worker thread.
            if not body:
                continue

            worker = watchetcdutils.ChangeNginxThread(confloc=file, thread_id=watchthreadid, payload=body,
                                                      lock=thread_lock)
            worker.start()
        except requests.Timeout:
            logger.info("Timeout happened")
connection.py 文件源码 项目:cbapi-python 作者: carbonblack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def http_request(self, method, url, **kwargs):
        """Send an HTTP request to ``self.server + url`` and return the response.

        Args:
            method: HTTP verb; upper-cased before use.
            url: Path appended to ``self.server``.
            **kwargs: Forwarded to ``self.session.request``. ``verify``,
                ``proxies`` and ``headers`` are extracted first: ``verify``
                and ``proxies`` fall back to the instance settings, and
                ``headers`` are merged over a copy of ``self.token_header``.

        Returns:
            The response object when its status code is below 400.

        Raises:
            TimeoutError: the request timed out.
            ApiError: a connection error or any other exception occurred
                while sending the request.
            ObjectNotFoundError: the server answered 404.
            UnauthorizedError: the server answered 401.
            ServerError: the server answered any other status >= 400.
        """
        method = method.upper()

        # Fall back to the instance default only when the caller gave no
        # value. The former `pop(...) or self.ssl_verify` silently discarded
        # an explicit verify=False.
        verify_ssl = kwargs.pop('verify', None)
        if verify_ssl is None:
            verify_ssl = self.ssl_verify
        proxies = kwargs.pop('proxies', None) or self.proxies

        new_headers = kwargs.pop('headers', None)
        if new_headers:
            # Copy so the shared token header is not mutated per request.
            headers = self.token_header.copy()
            headers.update(new_headers)
        else:
            headers = self.token_header

        uri = self.server + url

        try:
            raw_data = kwargs.get("data", None)
            if raw_data:
                log.debug("Sending HTTP {0} {1} with {2}".format(method, url, raw_data))
            r = self.session.request(method, uri, headers=headers, verify=verify_ssl, proxies=proxies,
                                     timeout=self._timeout, **kwargs)
            log.debug('HTTP {0:s} {1:s} took {2:.3f}s (response {3:d})'.format(method, url,
                                                                               calculate_elapsed_time(r.elapsed),
                                                                               r.status_code))
        except requests.Timeout as timeout_error:
            raise TimeoutError(uri=uri, original_exception=timeout_error)
        except requests.ConnectionError as connection_error:
            raise ApiError("Received a network connection error from {0:s}: {1:s}".format(self.server,
                                                                                          str(connection_error)),
                           original_exception=connection_error)
        except Exception as e:
            raise ApiError("Unknown exception when connecting to server: {0:s}".format(str(e)),
                           original_exception=e)
        else:
            if r.status_code == 404:
                raise ObjectNotFoundError(uri=uri, message=r.text)
            elif r.status_code == 401:
                raise UnauthorizedError(uri=uri, action=method, message=r.text)
            elif r.status_code >= 400:
                raise ServerError(error_code=r.status_code, message=r.text)
            return r
test_connections.py 文件源码 项目:umapi-client.py 作者: adobe-apiplatform 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_remote_status_timeout():
    """A timed-out remote status check is reported as "Unreachable...\""""
    target = "umapi_client.connection.requests.Session.get"
    with mock.patch(target, side_effect=requests.Timeout):
        connection = Connection(**mock_connection_params)
        _local_status, remote_status = connection.status(remote=True)
        assert remote_status["status"].startswith("Unreachable")
test_connections.py 文件源码 项目:umapi-client.py 作者: adobe-apiplatform 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def test_get_timeout():
    """A GET that always times out is attempted ``retry_max_attempts`` times."""
    params = dict(mock_connection_params, retry_max_attempts=7)
    target = "umapi_client.connection.requests.Session.get"
    with mock.patch(target, side_effect=requests.Timeout) as mock_get:
        connection = Connection(**params)
        with pytest.raises(UnavailableError):
            connection.make_call("")
        assert mock_get.call_count == 7
test_connections.py 文件源码 项目:umapi-client.py 作者: adobe-apiplatform 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_post_timeout():
    """A POST that always times out is attempted ``retry_max_attempts`` times."""
    params = dict(mock_connection_params, retry_max_attempts=2)
    target = "umapi_client.connection.requests.Session.post"
    with mock.patch(target, side_effect=requests.Timeout) as mock_post:
        connection = Connection(**params)
        with pytest.raises(UnavailableError):
            connection.make_call("", [3, 5])
        assert mock_post.call_count == 2
manager.py 文件源码 项目:db_wlan_manager 作者: sistason 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _make_request(self, url, protocol='https'):
        """GET ``<protocol>://<url>`` through ``self.session``.

        The request uses a 5 second timeout and ``verify=False``.

        Returns:
            The response object, or ``False`` on a timeout or connection
            error (connection errors are logged at debug level).
        """
        target = '{}://{}'.format(protocol, url)
        try:
            response = self.session.get(target, timeout=5, verify=False)
        except requests.Timeout:
            return False
        except requests.ConnectionError as e:
            logging.debug('Connection Error: {}'.format(e))
            return False
        return response
__init__.py 文件源码 项目:python-confidant-client 作者: lyft 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_service(self, service, decrypt_blind=False):
        """Fetch a service document from confidant.

        Returns a dict that always has a boolean ``'result'`` key. On a 200
        response the decoded JSON is stored under ``'service'`` (with
        ``'blind_credentials'`` replaced by the output of
        ``self._decrypt_blind_credentials`` when ``decrypt_blind`` is true).
        A 404 yields ``{'result': True}`` with no ``'service'`` key.
        Connection errors, timeouts, unexpected status codes and invalid
        JSON all yield ``{'result': False}``.
        """
        ret = {'result': False}
        endpoint = '{0}/v1/services/{1}'.format(self.config['url'], service)
        try:
            # Authenticate with the client's username/token pair.
            response = self.request_session.get(
                endpoint,
                auth=(self._get_username(), self._get_token()),
                allow_redirects=False,
                timeout=2
            )
        except requests.ConnectionError:
            logging.error('Failed to connect to confidant.')
            return ret
        except requests.Timeout:
            logging.error('Confidant request timed out.')
            return ret

        if not self._check_response_code(response, expected=[200, 404]):
            return ret

        if response.status_code == 404:
            # A missing service still counts as a successful lookup.
            logging.debug('Service not found in confidant.')
            ret['result'] = True
            return ret

        try:
            data = response.json()
            if decrypt_blind:
                encrypted = data['blind_credentials']
                data['blind_credentials'] = self._decrypt_blind_credentials(encrypted)
        except ValueError:
            logging.exception(
                'Received badly formatted json data from confidant.'
            )
            return ret

        ret.update(service=data, result=True)
        return ret
__init__.py 文件源码 项目:python-confidant-client 作者: lyft 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_blind_credential(self, id, decrypt_blind=False):
        """Fetch a single blind credential from confidant by ID.

        Returns a dict that always has a boolean ``'result'`` key. On a 200
        response the decoded JSON is stored under ``'blind_credential'``
        (with ``'decrypted_credential_pairs'`` added from
        ``self._get_decrypted_pairs`` when ``decrypt_blind`` is true).
        Connection errors, timeouts, unexpected status codes, a 404 and
        invalid JSON all yield ``{'result': False}``.
        """
        ret = {'result': False}
        endpoint = '{0}/v1/blind_credentials/{1}'.format(self.config['url'], id)
        try:
            # Authenticate with the client's username/token pair.
            response = self.request_session.get(
                endpoint,
                auth=(self._get_username(), self._get_token()),
                allow_redirects=False,
                timeout=2
            )
        except requests.ConnectionError:
            logging.error('Failed to connect to confidant.')
            return ret
        except requests.Timeout:
            logging.error('Confidant request timed out.')
            return ret

        if not self._check_response_code(response, expected=[200, 404]):
            return ret

        if response.status_code == 404:
            # NOTE(review): get_service reports a 404 as result=True, while
            # this lookup reports it as a failure - confirm that is intended.
            logging.debug('Blind credential not found in confidant.')
            return ret

        try:
            data = response.json()
            if decrypt_blind:
                data['decrypted_credential_pairs'] = self._get_decrypted_pairs(data)
        except ValueError:
            logging.error('Received badly formatted json data from confidant.')
            return ret

        ret.update(blind_credential=data, result=True)
        return ret
__init__.py 文件源码 项目:python-confidant-client 作者: lyft 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def list_blind_credentials(self):
        """Fetch the list of blind credentials from confidant.

        Returns a dict that always has a boolean ``'result'`` key. On a 200
        response the ``'blind_credentials'`` entry of the decoded JSON is
        copied into the result under the same key. Connection errors,
        timeouts, any non-200 status and invalid JSON yield
        ``{'result': False}``.
        """
        ret = {'result': False}
        endpoint = '{0}/v1/blind_credentials'.format(self.config['url'])
        try:
            # Authenticate with the client's username/token pair.
            response = self.request_session.get(
                endpoint,
                auth=(self._get_username(), self._get_token()),
                allow_redirects=False,
                timeout=2
            )
        except requests.ConnectionError:
            logging.error('Failed to connect to confidant.')
            return ret
        except requests.Timeout:
            logging.error('Confidant request timed out.')
            return ret

        if not self._check_response_code(response, expected=[200]):
            return ret

        try:
            data = response.json()
        except ValueError:
            logging.error('Received badly formatted json data from confidant.')
            return ret

        ret.update(blind_credentials=data['blind_credentials'], result=True)
        return ret
pub.py 文件源码 项目:oadoi 作者: Impactstory 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def scrape_page_for_open_location(self, my_webpage):
        """Scrape ``my_webpage`` and record an open location if one is found.

        Calls ``my_webpage.scrape_for_fulltext_link()``, appends any
        ``my_webpage.error`` text to ``self.error`` and, when the page reports
        ``is_open``, appends ``my_webpage.mint_open_location()`` to
        ``self.open_locations``.

        No exception escapes: every failure is appended to ``self.error``
        with a label naming the exception category, and the accumulated
        error text is logged.
        """
        def note_failure(label, exc):
            # NOTE(review): unicode() and exc.message exist only on Python 2;
            # kept as-is so the recorded text is unchanged there.
            self.error += "{} in scrape_page_for_open_location on {}: {}".format(
                label, my_webpage, unicode(exc.message).encode("utf-8"))
            logger.info(self.error)

        try:
            my_webpage.scrape_for_fulltext_link()

            if my_webpage.error:
                self.error += my_webpage.error

            if my_webpage.is_open:
                self.open_locations.append(my_webpage.mint_open_location())

        # `except X as e` parses on Python 2.6+ and Python 3; the former
        # `except X, e` spelling is a SyntaxError on Python 3. Handler order
        # is significant: the first matching clause decides the label.
        except requests.Timeout as e:
            note_failure("Timeout", e)
        except requests.exceptions.ConnectionError as e:
            note_failure("ConnectionError", e)
        except requests.exceptions.ChunkedEncodingError as e:
            note_failure("ChunkedEncodingError", e)
        except requests.exceptions.RequestException as e:
            note_failure("RequestException", e)
        except etree.XMLSyntaxError as e:
            note_failure("XMLSyntaxError", e)
        except Exception as e:
            note_failure("Exception", e)
gitapi.py 文件源码 项目:GitHub-Repo-Recommender 作者: frankyjuang 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def req(url, hdr):
    """GET ``url`` (joined onto ``BASE_URL``) with headers ``hdr``.

    Returns:
        The response object when its status code is 200.

    Raises:
        RequestTimeoutError: on a timeout or a connection error.
        StatusCodeError: when the status code is anything other than 200.
    """
    try:
        res = requests.get(urljoin(BASE_URL, url), headers=hdr, timeout=10.0)
    except (requests.Timeout, requests.ConnectionError):
        # Both failure modes are reported to callers as the same error type.
        raise RequestTimeoutError(url)
    if res.status_code == 200:
        return res
    raise StatusCodeError(url, res.status_code)
subhd.py 文件源码 项目:GetSubtitles 作者: gyh1621 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def download_file(self, file_name, sub_url):

        """Download the subtitle archive referenced by ``sub_url``.

        The last path segment of ``sub_url`` is posted as ``sub_id`` to the
        site's ajax endpoint; the reply is expected to carry a ``success``
        flag and an ``http:`` download link. The archive is then streamed
        while a progress bar is refreshed.

        Returns:
            ``(datatype, data_bytes, 'success')`` on success, where
            ``datatype`` is ``'.rar'``, ``'.zip'``, ``'.7z'`` or ``'Unknown'``
            depending on which marker appears in the download link.
            ``(None, None, 'false')`` when the site reports failure, no
            download link can be found, or the download times out.
        """

        sid = sub_url.split('/')[-1]

        r = requests.post('http://subhd.com/ajax/down_ajax',
                          data={'sub_id': sid},
                          headers=self.headers)

        # Decode once and reuse for both the JSON check and the link search.
        content = r.content.decode('unicode-escape')
        if json.loads(content)['success'] is False:
            return None, None, 'false'
        res = re.search('http:.*(?=")', content)
        if res is None:
            # No link in the reply: report failure instead of raising
            # AttributeError on res.group().
            return None, None, 'false'
        download_link = res.group(0).replace('\\/', '/')
        try:
            with closing(requests.get(download_link, stream=True)) as response:
                chunk_size = 1024  # bytes requested per streamed chunk
                # total size announced by the server, used by the progress bar
                content_size = int(response.headers['content-length'])
                bar = ProgressBar(prefix + ' Get',
                                  file_name.strip(), content_size)
                # Collect chunks and join once rather than growing a bytes
                # object with repeated concatenation.
                chunks = []
                received = 0
                for data in response.iter_content(chunk_size=chunk_size):
                    chunks.append(data)
                    received += len(data)
                    bar.refresh(received)
                sub_data_bytes = b''.join(chunks)
        except requests.Timeout:
            return None, None, 'false'
        if 'rar' in download_link:
            datatype = '.rar'
        elif 'zip' in download_link:
            datatype = '.zip'
        elif '7z' in download_link:
            datatype = '.7z'
        else:
            datatype = 'Unknown'

        return datatype, sub_data_bytes, 'success'
zimuzu.py 文件源码 项目:GetSubtitles 作者: gyh1621 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def download_file(self, file_name, sub_url):

        """Download the subtitle archive linked from the page at ``sub_url``.

        The page is fetched and parsed; the first anchor inside the
        ``div.subtitle-links`` element supplies the download link. The
        archive is then streamed while a progress bar is refreshed.

        Returns:
            ``(datatype, data_bytes)`` where ``datatype`` is ``'.rar'``,
            ``'.zip'``, ``'.7z'`` or ``'Unknown'`` depending on which marker
            appears in the download link, or ``(None, None)`` when the
            download times out.
        """

        session = requests.session()
        page = session.get(sub_url, headers=self.headers)
        soup = BeautifulSoup(page.text, 'html.parser')
        anchor = soup.find('div', {'class': 'subtitle-links'}).a
        download_link = anchor.attrs['href']

        try:
            with closing(requests.get(download_link, stream=True)) as response:
                # total size announced by the server, used by the progress bar
                content_size = int(response.headers['content-length'])
                bar = ProgressBar(prefix + ' Get',
                                  file_name.strip(), content_size)
                pieces = []
                received = 0
                # stream the body in 1024-byte chunks
                for piece in response.iter_content(chunk_size=1024):
                    pieces.append(piece)
                    received += len(piece)
                    bar.refresh(received)
                sub_data_bytes = b''.join(pieces)
        except requests.Timeout:
            return None, None

        # The first marker found in the link decides the reported type.
        datatype = 'Unknown'
        for marker, extension in (('rar', '.rar'), ('zip', '.zip'), ('7z', '.7z')):
            if marker in download_link:
                datatype = extension
                break

        return datatype, sub_data_bytes
http.py 文件源码 项目:eater 作者: alexhayes 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def request(self, **kwargs) -> Model:
        """
        Make an HTTP request using ``self.method`` and return the response model.

        You should generally leave this method alone. If you need to customise the behaviour use the methods that
        this method uses.

        :raises EaterTimeoutError: if the request times out.
        :raises EaterConnectError: if any other ``requests`` exception is raised.
        """
        kwargs = self.get_request_kwargs(request_model=self.request_model, **kwargs)

        # get_request_kwargs can permanently alter the url, method and session
        self.url = kwargs.pop('url', self.url)
        self.method = kwargs.pop('method', self.method)
        self.session = kwargs.pop('session', self.session)

        try:
            response = getattr(self.session, self.method)(self.url, **kwargs)
            return self.create_response_model(response, self.request_model)

        except requests.Timeout as exc_info:
            # Chain the cause explicitly, matching the handler below.
            raise EaterTimeoutError("%s.%s for URL '%s' timed out." % (
                type(self).__name__,
                self.method,
                self.url
            )) from exc_info

        except requests.RequestException as exc_info:
            raise EaterConnectError("Exception raised for URL '%s'." % self.url) from exc_info
common.py 文件源码 项目:tintri-python-sdk 作者: Tintri 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def download_file(self, report_url, file_name):
        """
        Downloads the file pointed by URL.

        Args:

            report_url (str): URL returned from API from which file can be downloaded
            file_name (str): Name to be used for downloaded file

        Raises:

            TintriServerError: The server answered with a status other than 200.
            TintriError: A connection error, HTTP error, timeout or any other
                unexpected failure occurred.
        """
        headers = {'content-type': 'application/json'}

        try:
            # NOTE(review): verify=False disables TLS certificate verification.
            r = requests.get(report_url, headers=headers, verify=False, stream=True)
            try:
                if r.status_code != 200:
                    message = "The HTTP response for get call on: %s is %s" % (report_url, r.status_code)
                    raise TintriServerError(r.status_code, message=message)

                # iter_content yields raw bytes, so the file must be opened
                # in binary mode; text mode rejects bytes on Python 3.
                with open(file_name, 'wb') as file_h:
                    for block in r.iter_content(4096):
                        file_h.write(block)
            finally:
                # A streamed response holds its connection until closed.
                r.close()

        except TintriServerError:
            raise
        except requests.ConnectionError:
            raise TintriError("API Connection error occurred.")
        except requests.HTTPError:
            raise TintriError("HTTP error occurred.")
        except requests.Timeout:
            raise TintriError("Request timed out.")
        except Exception as e:
            raise TintriError("An unexpected error occurred: " + str(e))


问题


面经


文章

微信
公众号

扫码关注公众号