python类RequestException()的实例源码

schematizer.py 文件源码 项目:data_pipeline 作者: Yelp 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _call_api(self, api, params=None, request_body=None):
        """Build an API request and run it under the client's retry policy.

        :param api: callable that builds the request; it is invoked with the
            request parameters as keyword arguments.
        :param params: optional mapping of keyword arguments for ``api``.
            It is copied, never modified.
        :param request_body: optional payload; when truthy it is passed to
            ``api`` as the ``body`` keyword argument.
        :returns: whatever ``retry_on_exception`` returns for
            ``self._get_api_result(request=...)``.
        """
        # Copy the mapping: the original `params or {}` aliased the caller's
        # dict, so adding 'body' below leaked into the caller's object.
        request_params = dict(params) if params else {}
        if request_body:
            request_params['body'] = request_body
        request = api(**request_params)

        # Exponential backoff with jitter; the retry cap comes from config.
        retry_policy = RetryPolicy(
            ExpBackoffPolicy(with_jitter=True),
            max_retry_count=get_config().schematizer_client_max_connection_retry
        )
        return retry_on_exception(
            retry_policy=retry_policy,
            retry_exceptions=RequestException,
            func_to_retry=self._get_api_result,
            request=request
        )
M_GetUserVideo_Oracle.py 文件源码 项目:danmu-bilibili 作者: saberxxy 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _getpage(self, page):
        """Download one page of the member's submitted-video listing.

        :param page: page number; it is stringified into the query string.
        :returns: the response text when the server answers with HTTP 200,
            otherwise None (also None when requests raises RequestException).
        """
        # Example of the resulting request:
        # http://space.bilibili.com/ajax/member/getSubmitVideos?mid=15989779
        # &pagesize=30&tid=0&page=1&keyword=&order=senddate&_=1496812411295
        query = urlencode({
            'mid': str(self._mid),
            'pagesize': '30',
            'tid': '0',
            'page': str(page),
            'keyword': '',
            'order': 'senddate',
            '_': '1496812411295'
        })
        url = "http://space.bilibili.com/ajax/member/getSubmitVideos?" + query
        try:
            reply = requests.get(url)
            return reply.text if reply.status_code == 200 else None
        except RequestException:
            return None
M_GetUserVideo_Oracle.py 文件源码 项目:danmu-bilibili 作者: saberxxy 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _getpage(self, page):
        """Download one page of the member's submitted-video listing.

        :param page: page number; it is stringified into the query string.
        :returns: the response text when the server answers with HTTP 200,
            otherwise None (also None when requests raises RequestException).
        """
        # Example of the resulting request:
        # http://space.bilibili.com/ajax/member/getSubmitVideos?mid=15989779
        # &pagesize=30&tid=0&page=1&keyword=&order=senddate&_=1496812411295
        query = urlencode({
            'mid': str(self._mid),
            'pagesize': '30',
            'tid': '0',
            'page': str(page),
            'keyword': '',
            'order': 'senddate',
            '_': '1496812411295'
        })
        url = "http://space.bilibili.com/ajax/member/getSubmitVideos?" + query
        try:
            reply = requests.get(url)
            return reply.text if reply.status_code == 200 else None
        except RequestException:
            return None
dns_common_lexicon.py 文件源码 项目:TCP-IP 作者: JackZ0 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def add_txt_record(self, domain, record_name, record_content):
        """
        Create a TXT record through the configured provider.

        The domain is resolved first via ``self._find_domain_id``; any error it
        raises propagates unchanged.

        :param str domain: The domain to use to look up the managed zone.
        :param str record_name: The record name (typically beginning with '_acme-challenge.').
        :param str record_content: The record content (typically the challenge validation).
        :raises errors.PluginError: if the provider call raises RequestException
        """
        self._find_domain_id(domain)

        record = {'type': 'TXT', 'name': record_name, 'content': record_content}
        try:
            self.provider.create_record(**record)
        except RequestException as request_error:
            logger.debug('Encountered error adding TXT record: %s', request_error, exc_info=True)
            raise errors.PluginError('Error adding TXT record: {0}'.format(request_error))
dns_common_lexicon.py 文件源码 项目:TCP-IP 作者: JackZ0 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def del_txt_record(self, domain, record_name, record_content):
        """
        Delete a TXT record through the configured provider, best effort.

        Neither failure mode is re-raised: a PluginError from the domain lookup
        and a RequestException from the provider call are both logged at debug
        level and swallowed.

        :param str domain: The domain to use to look up the managed zone.
        :param str record_name: The record name (typically beginning with '_acme-challenge.').
        :param str record_content: The record content (typically the challenge validation).
        """
        try:
            self._find_domain_id(domain)
        except errors.PluginError as lookup_error:
            logger.debug('Encountered error finding domain_id during deletion: %s', lookup_error,
                         exc_info=True)
            return

        record = {'type': 'TXT', 'name': record_name, 'content': record_content}
        try:
            self.provider.delete_record(**record)
        except RequestException as request_error:
            logger.debug('Encountered error deleting TXT record: %s', request_error, exc_info=True)
__init__.py 文件源码 项目:marconibot 作者: s4w3d0ff 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _retry(func):
        """Decorator that re-invokes ``func`` when it raises RequestException.

        One attempt is made per entry in ``retryDelays`` plus a final attempt;
        between attempts the wrapper sleeps for the corresponding delay. When
        the final attempt also fails, RetryException is raised.
        """
        @_wraps(func)
        def retrying(*args, **kwargs):
            failures = []
            # The trailing None marks the last attempt: no delay is left.
            for delay in _chain(retryDelays, [None]):
                try:
                    return func(*args, **kwargs)
                except RequestException as failure:
                    failures.append(failure)
                    if delay is None:
                        # Out of delays: report everything seen and give up.
                        logger.debug(failures)
                        raise RetryException(
                            'retryDelays exhausted ' + str(failure))
                    # Otherwise record the failure and wait before retrying.
                    logger.debug(failure)
                    logger.info("-- retrying in %ds", delay)
                    sleep(delay)
        return retrying
company.py 文件源码 项目:webspider 作者: GuozhuHe 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def requests_company_detail_data(company_id):
    """Fetch a company's detail page and extract its descriptive fields.

    The page is requested with generated headers and a random cookie set,
    then parsed with lxml. Four XPath selections (advantage tags, size,
    address, introduction) are handed to ``format_tag`` with the company id.

    Raises RequestsError when the HTTP request raises RequestException.
    """
    request_headers = generate_http_header()
    crawler_sleep()
    try:
        page = requests.get(
            url=constants.COMPANY_DETAIL_URL.format(company_id=company_id),
            headers=request_headers,
            cookies=Cookies.get_random_cookies(),
            allow_redirects=False,
            timeout=constants.TIMEOUT,
        )
    except RequestException as request_error:
        logging.error(request_error)
        raise RequestsError(error_log=request_error)

    select = etree.HTML(page.text).xpath
    advantage = select('//div[@id="tags_container"]//li/text()')
    size = select('//div[@id="basic_container"]//li[3]/span/text()')
    address = select('//p[@class="mlist_li_desc"]/text()')
    introduce = select('//span[@class="company_content"]//text()')
    return format_tag(advantage, address, size, introduce, company_id)
company.py 文件源码 项目:webspider 作者: GuozhuHe 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def request_company_json(url, page_no):
    """Request one page of the company listing and return the decoded JSON.

    A response without a 'totalCount' key is treated as a bad reply: the
    cookies used for the request are removed from the pool and RequestsError
    is raised. A RequestException during the request is logged and also
    converted to RequestsError.
    """
    query = {
        'first': False,
        'pn': page_no,
        'sortField': 1,
        'havemark': 0,
    }
    request_headers = generate_http_header()
    crawler_sleep()
    try:
        cookies = Cookies.get_random_cookies()
        reply = requests.get(
            url=url,
            params=query,
            headers=request_headers,
            cookies=cookies,
            allow_redirects=False,
            timeout=constants.TIMEOUT,
        )
        response_json = reply.json()
        if 'totalCount' not in response_json:
            Cookies.remove_cookies(cookies)
            raise RequestsError(error_log='wrong response content')
    except RequestException as request_error:
        logging.error(request_error)
        raise RequestsError(error_log=request_error)
    return response_json
job.py 文件源码 项目:webspider 作者: GuozhuHe 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def requests_job_detail_data(job_id):
    """Fetch a job's detail page and extract its descriptive fields.

    The page is requested with generated headers and a random cookie set,
    then parsed with lxml. Three XPath selections (department, description,
    keywords) are handed to ``format_tag`` together with the job id.

    Raises RequestsError when the HTTP request raises RequestException.
    """
    request_headers = generate_http_header()
    crawler_sleep()
    try:
        page = requests.get(
            url=constants.JOB_DETAIL_URL.format(job_id=job_id),
            headers=request_headers,
            cookies=Cookies.get_random_cookies(),
            allow_redirects=False,
            timeout=constants.TIMEOUT,
        )
    except RequestException as request_error:
        logging.error(request_error)
        raise RequestsError(error_log=request_error)

    select = etree.HTML(page.text).xpath
    department = select('//div[@class="job-name"]/div[@class="company"]/text()')
    description = select('//dd[@class="job_bt"]/div//text()')
    keywords = select('//dd[@class="job_request"]//li[@class="labels"]/text()')
    return format_tag(department, description, keywords, job_id)
job.py 文件源码 项目:webspider 作者: GuozhuHe 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def request_job_json(company_id, page_no):
    """Request one page of a company's job listing and return the decoded JSON.

    A response without a 'content' key is treated as a bad reply: the cookies
    used for the request are removed from the pool and RequestsError is
    raised. A RequestException during the request is logged and also
    converted to RequestsError.
    """
    query = {
        'companyId': company_id,
        'positionFirstType': u"??",
        'pageNo': page_no,
        'pageSize': 10,
    }
    request_headers = generate_http_header()
    crawler_sleep()
    try:
        cookies = Cookies.get_random_cookies()
        reply = requests.get(
            url=constants.COMPANY_JOB_URL,
            params=query,
            headers=request_headers,
            cookies=cookies,
            timeout=constants.TIMEOUT,
        )
        response_json = reply.json()
        if 'content' not in response_json:
            Cookies.remove_cookies(cookies)
            raise RequestsError(error_log='wrong response content')
    except RequestException as request_error:
        logging.error(request_error)
        raise RequestsError(error_log=request_error)
    return response_json
helpers.py 文件源码 项目:directory-ui-buyer 作者: uktrade 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def store_companies_house_profile_in_session_and_validate(
        session, company_number
):
    """Store the Companies House profile in the session, then validate it.

    :param session: session object passed through to the storage helper.
    :param company_number: company number to look up and validate.
    :raises ValidationError: with MESSAGE_COMPANY_NOT_FOUND when the lookup
        fails with an HTTP 404 response, or MESSAGE_COMPANY_ERROR for any
        other RequestException. The validators called on success may raise
        as well.
    """
    try:
        store_companies_house_profile_in_session(
            session=session,
            company_number=company_number,
        )
    except RequestException as error:
        # A RequestException raised without an HTTP reply (connection
        # failure, timeout, ...) carries response=None; reading
        # `.status_code` on it would raise AttributeError and hide the
        # validation message.
        response = getattr(error, 'response', None)
        if response is not None and response.status_code == http.client.NOT_FOUND:
            raise ValidationError(validators.MESSAGE_COMPANY_NOT_FOUND)
        raise ValidationError(validators.MESSAGE_COMPANY_ERROR)
    else:
        company_status = get_company_status_from_session(
            session
        )
        validators.company_active(company_status)
        validators.company_unique(company_number)
test_views.py 文件源码 项目:directory-ui-buyer 作者: uktrade 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_submit_enrolment_handles_api_company_not_found(client):
    """A 404 from the Companies House lookup surfaces as "Company not found"."""
    not_found_error = RequestException(
        response=Mock(status_code=http.client.NOT_FOUND),
        request=Mock(),
    )
    query = {
        'company_number': '12345678',
        'has_exported_before': 'True',
    }

    with patch('enrolment.helpers.get_company_from_companies_house') as lookup_mock:
        lookup_mock.side_effect = not_found_error
        response = client.get(reverse('register-submit'), query)

    assert "Company not found" in str(response.content)
utorrent.py 文件源码 项目:pandachaika 作者: pandabuilder 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def add_torrent(self, torrent_data: Union[str, bytes], download_dir: str=None) -> bool:
        """Upload torrent file contents to uTorrent via the 'add-file' action.

        :param torrent_data: raw torrent payload, written to a temporary file
            that is then posted as the ``torrent_file`` form field.
            NOTE(review): the temporary file is opened in binary mode, so a
            ``str`` payload raises TypeError on write - confirm callers only
            pass bytes.
        :param download_dir: not used by this method.
        :returns: True when the JSON reply has no 'error' key; False when it
            does or when the request raises RequestException.
        """
        self.total_size = 0
        self.expected_torrent_name = ''

        params = {'action': 'add-file', 'token': self.token}
        # Context managers release both handles on every exit path; the
        # original leaked the read handle and, on unexpected errors, the
        # temporary file as well.
        with NamedTemporaryFile() as lf:
            lf.write(torrent_data)
            # Push buffered bytes to disk before reopening the file by name,
            # otherwise the upload can be truncated or empty.
            lf.flush()
            with open(lf.name, 'rb') as torrent_file:
                try:
                    response = requests.post(
                        self.UTORRENT_URL,
                        auth=self.auth,
                        params=params,
                        files={'torrent_file': torrent_file},
                        timeout=25).json()
                except RequestException:
                    return False
        return 'error' not in response
utorrent.py 文件源码 项目:pandachaika 作者: pandabuilder 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def add_url(self, url: str, download_dir: str=None) -> bool:
        """Ask uTorrent to fetch a torrent from ``url`` via the 'add-url' action.

        :param url: location sent to the server as the ``s`` query parameter.
        :param download_dir: not used by this method.
        :returns: True when the JSON reply has no 'error' key; False when it
            does or when the request raises RequestException.
        """
        self.total_size = 0
        self.expected_torrent_name = ''

        query = {'action': 'add-url', 'token': self.token, 's': url}
        try:
            reply = requests.get(
                self.UTORRENT_URL,
                auth=self.auth,
                # cookies=self.cookies,
                params=query,
                timeout=25).json()
        except RequestException:
            return False
        return 'error' not in reply
docker_args.py 文件源码 项目:gopythongo 作者: gopythongo 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def validate_shared_args(args: configargparse.Namespace) -> None:
    """Validate the Docker-related arguments and probe the Docker API.

    Checks, in order: the TLS verify file and the TLS client certificate
    (each only when set) must be readable regular files; the SSL version
    must parse as an integer between 1 and 5; the Docker client built from
    ``args`` must answer ``info()``.

    Raises ErrorMessage on the first check that fails.
    """
    def _require_readable_file(path):
        # Unset (falsy) options are skipped entirely.
        if path and not (os.path.isfile(path) and os.access(path, os.R_OK)):
            raise ErrorMessage("File not found: %s (or not readable)" % highlight(path))

    _require_readable_file(args.docker_tls_verify)
    _require_readable_file(args.docker_tls_client_cert)

    try:
        ssl_version = int(args.docker_ssl_version)
        if not 1 <= ssl_version <= 5:
            raise ErrorMessage("Unknown value %s for SSL Protocol version. Valid are values 1-5." %
                               args.docker_ssl_version)
    except ValueError:
        raise ErrorMessage("Parameter to --docker-ssl-version must be an integer between 1 and 5")

    docker_client = get_docker_client(args)
    try:
        # Only reachability matters here; the returned info is discarded.
        docker_client.info()
    except RequestException as e:
        raise ErrorMessage("GoPythonGo can't talk to the Docker API at %s (Error was: %s)" %
                           (highlight(args.docker_api), str(e))) from e
requests_handler.py 文件源码 项目:ras-backstage 作者: ONSdigital 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def request_handler(method, url, params=None, auth=None, headers=None, json=None, data=None):
    """Send an HTTP request with ``requests`` and return the response.

    'POST' and 'PUT' forward ``json`` and ``data``; every other ``method``
    value is sent as a GET without a body.

    Raises ApiError(url) when the request raises RequestException.
    """
    try:
        if method in ('POST', 'PUT'):
            send = requests.post if method == 'POST' else requests.put
            response = send(url, params=params, auth=auth, headers=headers,
                            json=json, data=data)
        else:
            response = requests.get(url, params=params, auth=auth, headers=headers)
    except RequestException as e:
        logger.exception('Failed to connect to external service',
                         method=method, url=url, exception=str(e))
        raise ApiError(url)

    logger.debug('Request response', method=method, url=url, status=response.status_code)
    return response
registration.py 文件源码 项目:CommunityCellularManager 作者: facebookincubator 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _send_cloud_req(req_method, req_path, err_prefix, **kwargs):
    """Send a request to the registry and return the decoded JSON reply.

    :param req_method: callable invoked as ``req_method(url, **kwargs)``.
    :param req_path: path appended to ``conf['registry']`` to form the URL.
    :param err_prefix: prefix passed through to the raised error objects.
    :returns: ``json.loads`` of the response text when the status is 200.
    :raises RegistrationServerError: when the status code is not 200.
    :raises RegistrationClientError: when the request raises
        RequestException or a socket-level error.
    """
    url = conf['registry'] + req_path
    err = None
    try:
        r = req_method(url, **kwargs)
        if r.status_code == 200:
            return json.loads(r.text)
        else:
            err = RegistrationServerError(r, err_prefix)
    # RequestException must be tested first: on Python 3 `socket.error` is an
    # alias of OSError, a base class of RequestException, so with the handlers
    # in the other order this branch could never run. On Python 2 the two
    # classes are unrelated and the order is irrelevant.
    except RequestException as ex:
        err = RegistrationClientError('request to %s failed' % (url, ),
                                      ex, err_prefix)
    except socket.error as ex:
        err = RegistrationClientError(('socket error connecting to %s' %
                                       (url, )),
                                      ex, err_prefix)
    # Raised outside the handlers, as before, so the error is not chained
    # to the exception that triggered it.
    raise err
getcrypto.py 文件源码 项目:bumblebee-status 作者: tobi-wan-kenobi 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def getfromkrak(coin, currency):
    """Return a "<coin>: <price>" string from Kraken's public ticker.

    The price is the mean of the first ask and first bid values, stringified
    and cut to six characters. Returns None for a coin without a known
    abbreviation and "No connection" when the request or JSON decoding raises.
    """
    symbols = {
        "Btc": ["xbt", "XXBTZ"],
        "Eth": ["eth", "XETHZ"],
        "Ltc": ["ltc", "XLTCZ"],
    }.get(coin, None)
    if not symbols:
        return

    pair_code, ticker_prefix = symbols
    epair = "{}{}".format(pair_code, currency)
    tickname = "{}{}".format(ticker_prefix, currency.upper())
    try:
        krakenget = requests.get('https://api.kraken.com/0/public/Ticker?pair='+epair).json()
    except (RequestException, Exception):
        return "No connection"

    ask = float(krakenget['result'][tickname]['a'][0])
    bid = float(krakenget['result'][tickname]['b'][0])
    return coin+": "+str((ask+bid)/2)[0:6]
weather.py 文件源码 项目:bumblebee-status 作者: tobi-wan-kenobi 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def update(self, widgets):
        """Refresh the cached weather data once the next check time has passed.

        When due, the next check is scheduled ``self._interval`` minutes
        ahead, then OpenWeatherMap is queried - by coordinates taken from
        ipinfo.io when ``self._location`` is "auto", otherwise by the
        configured location. On success the temperature, weather keyword and
        ``_valid`` flag are stored; any exception sets ``_valid`` to False.
        ``widgets`` is not used.
        """
        timestamp = int(time.time())
        if not self._nextcheck < timestamp:
            return

        try:
            self._nextcheck = timestamp + self._interval*60
            weather_url = "http://api.openweathermap.org/data/2.5/weather?appid={}".format(self._apikey)
            weather_url = "{}&units={}".format(weather_url, self._unit)
            if self._location == "auto":
                location = json.loads(requests.get("http://ipinfo.io/json").text)
                lat_lon = location["loc"].split(",")
                self._city = location["city"]
                weather_url = "{url}&lat={lat}&lon={lon}".format(
                    url=weather_url, lat=lat_lon[0], lon=lat_lon[1])
            else:
                weather_url = "{url}&q={city}".format(url=weather_url, city=self._location)
            report = json.loads(requests.get(weather_url).text)
            self._temperature = int(report['main']['temp'])
            self._weather = report['weather'][0]['main'].lower()
            self._valid = True
        except (RequestException, Exception):
            self._valid = False

# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
ping.py 文件源码 项目:kolibri 作者: learningequality 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
        """Ping the server in an endless loop.

        Options read (each with a module-level default): ``interval`` -
        minutes to sleep after a successful ping; ``checkrate`` - minutes to
        sleep after a failed one; ``server`` - URL handed to
        ``self.perform_ping``.

        ConnectionError, Timeout and any other RequestException are logged
        as warnings and retried; the method never returns normally.
        """
        interval = float(options.get("interval", DEFAULT_PING_INTERVAL))
        checkrate = float(options.get("checkrate", DEFAULT_PING_CHECKRATE))
        server = options.get("server", DEFAULT_PING_SERVER_URL)

        self.started = datetime.now()

        while True:
            try:
                logging.info("Attempting a ping.")
                data = self.perform_ping(server)
                logging.info("Ping succeeded! (response: {}) Sleeping for {} minutes.".format(data, interval))
                time.sleep(interval * 60)
                continue
            # logging.warn is a deprecated alias; logging.warning is the
            # supported spelling and emits the same record.
            except ConnectionError:
                logging.warning("Ping failed (could not connect). Trying again in {} minutes.".format(checkrate))
            except Timeout:
                logging.warning("Ping failed (connection timed out). Trying again in {} minutes.".format(checkrate))
            except RequestException as e:
                logging.warning("Ping failed ({})! Trying again in {} minutes.".format(e, checkrate))
            # Only reached after a failure: back off for the shorter check rate.
            time.sleep(checkrate * 60)
download_plenum_meeting_protocols.py 文件源码 项目:knesset-data-pipelines 作者: hasadna 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _save_url(self, url, bucket, object_name, num_retries, seconds_between_retries, retry_num=1):
        """Download ``url`` and write the body to object storage.

        A RequestException triggers a retry (by recursion) after sleeping
        ``seconds_between_retries``, until ``retry_num`` reaches
        ``num_retries``; the last failure is re-raised.

        Returns True when the response status is 200 and the content was
        written, False for any other status.
        """
        try:
            res = self._reuqests_get(url)
        except RequestException as e:
            if retry_num >= num_retries:
                raise
            logging.exception(e)
            logging.info("retry {} / {}, waiting {} seconds before retrying...".format(retry_num, num_retries,
                                                                                       seconds_between_retries))
            time.sleep(seconds_between_retries)
            return self._save_url(url, bucket, object_name, num_retries, seconds_between_retries, retry_num + 1)

        if res.status_code != 200:
            return False
        object_storage.write(self.s3, bucket, object_name, res.content)
        return True
download_committee_meeting_protocols.py 文件源码 项目:knesset-data-pipelines 作者: hasadna 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _save_url(self, url, bucket, object_name, num_retries, seconds_between_retries, retry_num=1):
        """Download ``url`` and write the body to a public object-storage bucket.

        A RequestException triggers a retry (by recursion) after sleeping
        ``seconds_between_retries``, until ``retry_num`` reaches
        ``num_retries``; the last failure is re-raised.

        Returns True when the response status is 200 and the content was
        written, False for any other status.
        """
        try:
            res = self._reuqests_get(url)
        except RequestException as e:
            if retry_num >= num_retries:
                raise
            logging.exception(e)
            logging.info("retry {} / {}, waiting {} seconds before retrying...".format(retry_num, num_retries, seconds_between_retries))
            time.sleep(seconds_between_retries)
            return self._save_url(url, bucket, object_name, num_retries, seconds_between_retries, retry_num+1)

        if res.status_code != 200:
            return False
        object_storage.write(self.s3, bucket, object_name, res.content, public_bucket=True)
        return True
downloader.py 文件源码 项目:antioch 作者: pytube 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def download_other(obj, timeout=0, chunk_size=config.DEFAULT_CHUNKSIZE):
    """Stream the file at ``obj['url']`` into a new file in the drop folder.

    :param obj: mapping that may contain a 'url' entry.
    :param timeout: request timeout in seconds; 0 (the default) or None
        means "no timeout".
    :param chunk_size: chunk size used while streaming the body to disk.
    :returns: the generated file name, or None when ``obj`` has no URL or
        the request raises RequestException.
    """
    url = obj.get('url', None)
    if url is None:
        return None

    logger.info('downloading video: ' + url)

    try:
        # requests/urllib3 reject a timeout <= 0 with ValueError, which the
        # handler below does not catch; map the historical default of 0 to
        # None so the default call works.
        response = requests.get(url, stream=True, timeout=timeout or None)
    except RequestErrors.RequestException as e:
        logger.error(e)
        return None

    # NOTE(review): the HTTP status is not checked, so an error page would
    # be saved as if it were the video - confirm whether that is intended.
    filename = gen_filename(config.DROP_FOLDER_LOCATION, extension='movie')
    try:
        with open(filename, 'wb') as out_file:
            for chunk in response.iter_content(chunk_size=chunk_size):
                out_file.write(chunk)
    finally:
        # A streamed response holds its connection until closed.
        response.close()
    return filename
??Ajax????????.py 文件源码 项目:python3_crawl 作者: princewen 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_page_index(offset,keyword):
    """Fetch one page of Toutiao search results as raw text.

    Returns the response text on HTTP 200, otherwise None. A
    RequestException is reported with a printed message and also yields None.
    """
    query = urlencode({
        'offset':offset,
        'format':'json',
        'keyword':keyword,
        'autoload':'true',
        'count':'20',
        'cur_tab':3
    })
    url = 'http://www.toutiao.com/search_content?'+query
    try:
        reply = requests.get(url)
        return reply.text if reply.status_code==200 else None
    except RequestException:
        print ("??????")
        return None
github_helper.py 文件源码 项目:merge-bot 作者: jasonkuster 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def fetch_prs(self):
        """Fetch the pull requests of the configured repository.

        Returns:
            A ``(pr_list, error)`` tuple. On success ``pr_list`` holds one
            GithubPR per item returned by ``self.get`` and ``error`` is None.
            On HTTPError, Timeout or any other RequestException ``pr_list``
            is empty and ``error`` is a message describing the failure.
        """
        try:
            pull_requests = [
                GithubPR(self, raw_pr) for raw_pr in self.get(GITHUB_PULLS_ENDPOINT)
            ]
            return pull_requests, None
        except HTTPError as exc:
            message = 'Non-200 HTTP Code Received: {}'.format(exc)
        except Timeout as exc:
            message = 'Request timed out: {}'.format(exc)
        except RequestException as exc:
            message = 'Catastrophic error in requests: {}'.format(exc)
        return [], message
github_helper.py 文件源码 项目:merge-bot 作者: jasonkuster 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get(self, endpoint):
        """get makes a GET request against a specified endpoint.

        Args:
            endpoint: URL to which to make the GET request. URL is relative
            to https://api.github.com/repos/{self.org}/{self.repo}/
        Returns:
            JSON object retrieved from the endpoint when the status code is
            200. None for any other status that raise_for_status() does not
            treat as an error.
        Raises:
            HTTPError: if we get an HTTP error status.
            Timeout: for timeouts.
            RequestException: for other assorted exceptional cases.
        """
        url = urlparse.urljoin(self.repo_url, endpoint)
        resp = requests.get(url, auth=(BOT_NAME, self.bot_key))
        # Compare by value: `is 200` only worked through CPython's small-int
        # caching and triggers a SyntaxWarning on Python 3.8+.
        if resp.status_code == 200:
            return resp.json()
        resp.raise_for_status()
        # Non-200 but not an error status: nothing to return.
        return None
tasks.py 文件源码 项目:esdc-ce 作者: erigones 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def task_user_callback_cb(task_id, parent_task_id, cb, **kwargs):
    """
    Call the remote URL of a user-defined callback and optionally log the outcome.

    The payload comes from the parent task's status and is sent with the
    callback key of the user derived from the parent task id. The outcome is
    FAILURE with the exception as detail when the request raises
    RequestException, otherwise SUCCESS with "<status code>: <reason>". It is
    written to the task log only when ``cb`` has a truthy 'cb_log' entry.
    """
    try:
        obj = get_vms_object(kwargs)
    except ObjectDoesNotExist:
        obj = None

    user = User.objects.get(id=user_id_from_task_id(parent_task_id))
    # The status returned here is always replaced by the callback outcome below.
    payload, status = get_task_status(parent_task_id)

    try:
        response = UserCallback(parent_task_id).request(cb, user.callback_key, payload)
    except RequestException as request_error:
        status, details = states.FAILURE, request_error
    else:
        status, details = states.SUCCESS, str(response.status_code) + ': ' + response.reason

    if not cb.get('cb_log'):
        return
    task_log(parent_task_id, LOG_REMOTE_CALLBACK, obj=obj, task_status=status, detail=details)
http.py 文件源码 项目:pandahouse 作者: kszucs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def execute(query, connection=None, data=None, external=None, stream=False):
    """POST a query to the server and return the response payload.

    The target host, query parameters and files come from ``prepare``.
    Returns ``response.raw`` when ``stream`` is true, otherwise
    ``response.content``.

    Raises ClickhouseException carrying the response body when the server
    answers with an error status and a non-empty body; with an empty body the
    original RequestException is re-raised.
    """
    host, params, files = prepare(query, connection, external=external)

    response = requests.post(host, params=params, data=data,
                             stream=stream, files=files)

    try:
        response.raise_for_status()
    except RequestException as e:
        if not response.content:
            raise e
        raise ClickhouseException(response.content)

    return response.raw if stream else response.content
spider.py 文件源码 项目:spider_python3_scrapy 作者: chaiwenjun000 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_page_index(offset, keyword):
    """Fetch one page of Toutiao search results as raw text.

    The process id and the URL are printed before the request. Returns the
    response text on HTTP 200, otherwise None. A RequestException is reported
    with a printed message and also yields None.
    """
    query = urlencode({
        'offset':offset,
        'format':'json',
        'keyword':keyword,
        'autoload':'true',
        'count':20,
        'cur_tab':3
    })
    url = 'http://www.toutiao.com/search_content/?' + query
    print('([%d] ??????? %s' % (os.getpid(), url))
    try:
        reply = requests.get(url)
        return reply.text if reply.status_code == 200 else None
    except RequestException:
        print('???????', url)
        return None
utils.py 文件源码 项目:inspace 作者: mochaoss 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get_site_description(url):
    """Return a short description scraped from the page at ``url``.

    The ``content`` attribute of ``<meta name="description">`` inside
    ``<head>`` is preferred; otherwise the string of the first ``<p>`` is
    used. An empty string is returned when the page cannot be fetched, is
    empty, or has neither element.

    NOTE(review): the result can be None when the meta tag has no
    ``content`` attribute or the paragraph has no single string child -
    confirm callers tolerate that.
    """
    site_content = description = ''
    try:
        site_content = requests.get(url, timeout=1).content
    except RequestException:
        # Logger.warn is a deprecated alias of Logger.warning.
        logger.warning('Connection error trying to connect to {}'.format(url))

    if site_content:
        target_markup = BeautifulSoup(site_content, 'html.parser')
        # `.head` is None for documents without a <head>; the unguarded
        # `.head.find(...)` raised AttributeError on such pages.
        head = target_markup.head
        meta_description = (
            head.find('meta', {'name': 'description'}) if head is not None else None
        )
        if meta_description:
            description = meta_description.get('content')
        else:
            first_paragraph = target_markup.find('p')
            if first_paragraph:
                description = first_paragraph.string
    return description


问题


面经


文章

微信
公众号

扫码关注公众号