python类ConnectionError()的实例源码

tintri_1_1.py 文件源码 项目:tintri-stats 作者: genebean 项目源码 文件源码 阅读 48 收藏 0 点赞 0 评论 0
def api_put(server_name, api, payload, session_id):
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID='+session_id }

    url = 'https://' + server_name + API + api

    try:
        # Invoke the API.
        r = requests.put(url, data=json.dumps(payload),
                         headers=headers, verify=False)
    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except:
        raise TintriRequestsException("An unexpected error " + sys.exc_info()[0] + " occurred.")

    return r


# POST
tintri_1_1.py 文件源码 项目:tintri-stats 作者: genebean 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def api_post(server_name, api, payload, session_id):
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID='+session_id }

    url = 'https://' + server_name + API + api

    try:
        # Invoke the API.
        r = requests.post(url, data=json.dumps(payload),
                          headers=headers, verify=False)
    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except:
        raise TintriRequestsException("An unexpected error " + sys.exc_info()[0] + " occurred.")

    return r


# Login.
tintri_1_1.py 文件源码 项目:tintri-stats 作者: genebean 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def download_file(server_name, report_url, session_id, file_name):
    headers = {'content-type': 'application/json'}

    try:
        r = requests.get(report_url, headers=headers, verify=False, stream=True)
        # if HTTP Response is not 200 then raise an exception
        if r.status_code != 200:
            message = "The HTTP response for get call to the server is not 200."
            raise TintriApiException(message, r.status_code, report_url, "No Payload", r.text)

        with open(file_name, 'w') as file_h:
            for block in r.iter_content(4096):
                file_h.write(block)

    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception as e:
        raise TintriRequestsException("An unexpected error: " + e.__str__())
cprp.py 文件源码 项目:fedoidc 作者: OpenIDC 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def index(self, uid='', iss=''):
        link = ''
        if iss:
            link = iss
        elif uid:
            try:
                link = self.rph.find_srv_discovery_url(
                    resource="acct:{}".format(uid))
            except requests.ConnectionError:
                raise cherrypy.HTTPError(
                    message="Webfinger lookup failed, connection error")
        else:
            fname = os.path.join(self.html_home, 'opbyuid.html')
            return as_bytes(open(fname, 'r').read())

        if link:
            resp_headers = self.rph.begin(link)
            raise cherrypy.HTTPRedirect(resp_headers['Location'])
test_redfish.py 文件源码 项目:valence 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_get_podm_status_Offline_by_http_exception(self, mock_get):
        mock_get.side_effect = requests.ConnectionError
        self.assertEqual(redfish.pod_status('url', 'username', 'password'),
                         constants.PODM_STATUS_OFFLINE)
        mock_get.asset_called_once_with('url',
                                        auth=auth.HTTPBasicAuth('username',
                                                                'password'))
        # SSL Error
        mock_get.side_effect = requests.exceptions.SSLError
        self.assertEqual(redfish.pod_status('url', 'username', 'password'),
                         constants.PODM_STATUS_OFFLINE)
        self.assertEqual(mock_get.call_count, 2)
        # Timeout
        mock_get.side_effect = requests.Timeout
        self.assertEqual(redfish.pod_status('url', 'username', 'password'),
                         constants.PODM_STATUS_OFFLINE)
        self.assertEqual(mock_get.call_count, 3)
api_testcase_with_test_reset.py 文件源码 项目:intel-manager-for-lustre 作者: intel-hpdd 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def supervisor_controlled_processes_running(self):
        # Use the api to verify the processes controlled by supervisor are all in a RUNNING state
        try:
            response = self.chroma_manager.get('/api/system_status/')
        except requests.ConnectionError:
            logger.warning("Connection error trying to connect to the manager")
            return False

        self.assertEqual(response.successful, True, response.text)
        system_status = response.json
        non_running_processes = []
        for process in system_status['supervisor']:
            if not process['statename'] == 'RUNNING':
                non_running_processes.append(process)

        if non_running_processes:
            logger.warning("Supervisor processes found not to be running: '%s'" % non_running_processes)
            return False
        else:
            return True
doubanMovieCralwer.py 文件源码 项目:DoubanMovieCrawlerUI 作者: Qua4re 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get(self,url):
        try:
            resp = requests.get(url=url,headers=headers.get_header())
            self.notifylineEdit_4.emit(u"[+] ???? "+url)
            return resp

        except requests.ConnectionError:
            # self.log.error(traceback.print_exc())
            # print traceback.print_exc()
            self.log.error(u'??{0}??'.format(url))
            #   ???????3?
            time.sleep(3)

        except Exception as e:
            # self.log.error(traceback.print_exc())
            print traceback.print_exc()
            self.log.error(u'??{0}??????{1}'.format(url,e))
            # return

    #   ??????
    #   http://movie.douban.com/subject/10533913/photos?type=R
    #   ??  http://img3.douban.com/view/photo/thumb/public/p1812524514.jpg
    #   ???? http://img3.douban.com/view/photo/photo/public/p1812524514.jpg

    #   ???????images,??????
tasks.py 文件源码 项目:fieldsight-kobocat 作者: awemulya 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def create_external_export(username, id_string, export_id, query=None,
                           token=None, meta=None):
    export = Export.objects.get(id=export_id)
    try:
        # though export is not available when for has 0 submissions, we
        # catch this since it potentially stops celery
        gen_export = generate_external_export(
            Export.EXTERNAL_EXPORT, username,
            id_string, export_id, token, query, meta
        )
    except (Exception, NoRecordsFoundError, ConnectionError) as e:
        export.internal_status = Export.FAILED
        export.save()
        # mail admins
        details = {
            'export_id': export_id,
            'username': username,
            'id_string': id_string
        }
        report_exception("External Export Exception: Export ID - "
                         "%(export_id)s, /%(username)s/%(id_string)s"
                         % details, e, sys.exc_info())
        raise
    else:
        return gen_export.id
test_configuration.py 文件源码 项目:cachet-url-monitor 作者: mtakaki 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def setUp(self):
        def getLogger(name):
            self.mock_logger = mock.Mock()
            return self.mock_logger

        sys.modules['logging'].getLogger = getLogger

        def get(url, headers):
            get_return = mock.Mock()
            get_return.ok = True
            get_return.json = mock.Mock()
            get_return.json.return_value = {'data': {'status': 1}}
            return get_return

        sys.modules['requests'].get = get

        self.env = EnvironmentVarGuard()
        self.env.set('CACHET_TOKEN', 'token2')

        self.configuration = Configuration('config.yml')
        sys.modules['requests'].Timeout = Timeout
        sys.modules['requests'].ConnectionError = ConnectionError
        sys.modules['requests'].HTTPError = HTTPError
img_tools.py 文件源码 项目:visualize-tsne 作者: YontiLevin 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def url_to_img_array(url):
    if not isinstance(url, basestring):
        logging.warning("input is neither an ndarray nor a string, so I don't know what to do")
        return None

    # replace_https_with_http:
    if 'http' in url and 'https' not in url:
        url = url.replace("https", "http")
    try:
        headers = {'User-Agent': USER_AGENT}
        response = requests.get(url, headers=headers)
        img_array = cv2.imdecode(np.asarray(bytearray(response.content)), 1)
    except requests.ConnectionError:
        logging.warning("connection error - check url or connection")
        return None
    except:
        logging.warning(" error other than connection error - check something other than connection")
        return None

    return img_array
mon.py 文件源码 项目:cephmetrics 作者: ceph 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def post_event(url, tag_name, event_message):

        headers = {"Content-Type": "application/json"}

        try:
            r = requests.post(url,
                              headers=headers,
                              data='{{"what":"Ceph Health",'
                                   '"tags":"{}",'
                                   '"data":"{}"}}'.format(tag_name,
                                                          event_message))
        except requests.ConnectionError:
            # if we hit this, the endpoint wasn't there (graphite web was not
            # accessible) so identify that issue as a server error (500)
            return 500

        else:
            return r.status_code
mon.py 文件源码 项目:cephmetrics 作者: ceph 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def post_event(url, tag_name, event_message):

        headers = {"Content-Type": "application/json"}

        try:
            r = requests.post(url,
                              headers=headers,
                              data='{{"what":"Ceph Health",'
                                   '"tags":"{}",'
                                   '"data":"{}"}}'.format(tag_name,
                                                          event_message))
        except requests.ConnectionError:
            # if we hit this, the endpoint wasn't there (graphite web was not
            # accessible) so identify that issue as a server error (500)
            return 500

        else:
            return r.status_code
file.py 文件源码 项目:OctoPrint-Dashboard 作者: meadowfrey 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def post(self, printer_id):
        """Send file from on printer to defined printers or prints given file"""
        args = deleteParser.parse_args()
        printer = g.user.get_printer_id(printer_id)
        if not printer:
            return "", 403
        if args["send"]:  # send file from one printer to defined printers
            printer_ids = sendParser.parse_args()
            printers = g.user.get_accessible_printers_id(printer_ids["printerId"])
            content = OctoprintService.get_file_contents(printer, args["origin"], args["name"])
            for dest_printer in printers:
                try:
                    OctoprintService.send_file(dest_printer, args["name"], content, False)
                except (RuntimeError, requests.ConnectionError):
                    pass
            return "", 200
        else:  # print file
            if OctoprintService.print(printer, args["origin"], args["name"]):
                return "", 200

            return "", 409
test_api.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_run_socket_error(self, m_post):
        m_post.side_effect = requests.ConnectionError
        result = self._test_run('DEL', 'delNetwork', m_post)
        self.assertEqual(1, result)
disney_api.py 文件源码 项目:wait4disney 作者: gtt116 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_wait_time_list():
    for _ in xrange(3):
        try:
            token = get_token()
            response = get_wait_time(token)
        except (requests.HTTPError, requests.ConnectionError):
            time.sleep(1)
        else:
            return response
connection.py 文件源码 项目:cbapi-python 作者: carbonblack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def http_request(self, method, url, **kwargs):
        method = method.upper()

        verify_ssl = kwargs.pop('verify', None) or self.ssl_verify
        proxies = kwargs.pop('proxies', None) or self.proxies

        new_headers = kwargs.pop('headers', None)
        if new_headers:
            headers = self.token_header.copy()
            headers.update(new_headers)
        else:
            headers = self.token_header

        uri = self.server + url

        try:
            raw_data = kwargs.get("data", None)
            if raw_data:
                log.debug("Sending HTTP {0} {1} with {2}".format(method, url, raw_data))
            r = self.session.request(method, uri, headers=headers, verify=verify_ssl, proxies=proxies,
                                     timeout=self._timeout, **kwargs)
            log.debug('HTTP {0:s} {1:s} took {2:.3f}s (response {3:d})'.format(method, url,
                                                                               calculate_elapsed_time(r.elapsed),
                                                                               r.status_code))
        except requests.Timeout as timeout_error:
            raise TimeoutError(uri=uri, original_exception=timeout_error)
        except requests.ConnectionError as connection_error:
            raise ApiError("Received a network connection error from {0:s}: {1:s}".format(self.server,
                                                                                          str(connection_error)),
                           original_exception=connection_error)
        except Exception as e:
            raise ApiError("Unknown exception when connecting to server: {0:s}".format(str(e)),
                           original_exception=e)
        else:
            if r.status_code == 404:
                raise ObjectNotFoundError(uri=uri, message=r.text)
            elif r.status_code == 401:
                raise UnauthorizedError(uri=uri, action=method, message=r.text)
            elif r.status_code >= 400:
                raise ServerError(error_code=r.status_code, message=r.text)
            return r
cluster_auto_start_daemon.py 文件源码 项目:sm-engine-ansible 作者: METASPACE2020 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _send_rest_request(self, address):
        try:
            resp = requests.get(address)
        except ConnectionError as e:
            self.logger.debug('{} - {}'.format(address, e))
            return False
        except Exception as e:
            self.logger.warning('{} - {}'.format(address, e))
            return False
        else:
            self.logger.debug(resp)
            return resp.ok
manager.py 文件源码 项目:db_wlan_manager 作者: sistason 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def _make_request(self, url, protocol='https'):
        try:
            return self.session.get('{}://{}'.format(protocol, url), timeout=5, verify=False)
        except requests.Timeout:
            return False
        except requests.ConnectionError as e:
            logging.debug('Connection Error: {}'.format(e))
            return False
manager.py 文件源码 项目:db_wlan_manager 作者: sistason 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def login(self):
        """ Log in to the ICE Portal (wifionice) """
        logging.info('Trying to log in...')
        try:
            ret = self.session.post('http://{}/de/?login'.format(self.api_host_ip),
                                    data={'login': True, 'CSRFToken': self.csrf_token})
        except requests.exceptions.ConnectionError:
            logging.debug('Login Failed, probably bad wifi')
elogger.py 文件源码 项目:pylogging 作者: ansrivas 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def __log(self, msg, level):
        payload = {'app_name': str(self.app_name), 'log_level': level, 'message': msg}
        try:
            self.session.post(url=self.elastic_url, data=json.dumps(payload),
                              headers=self.headers, auth=self.auth)
        except ConnectionError as ce:
            logging.error(ce.message)
            logging.exception('Unable to connect to Elastic Search. Check the `elastic_url` and `auth`')
            logging.error(msg)


问题


面经


文章

微信
公众号

扫码关注公众号