python类HTTPError()的实例源码

hatebu_info.py 文件源码 项目:taikutsu_blog_works 作者: hokekiyoo 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_category(url):
    """Look up the Hatena Bookmark category recorded for a page.

    Fetches the Hatena Bookmark entry page for ``url`` and returns the value
    of the ``data-category-name`` attribute on its ``<html>`` element.  When
    the request fails, the error reason is printed and ``None`` is returned.
    """
    entry_url = "http://b.hatena.ne.jp/entry/{}".format(url)
    try:
        page = request.urlopen(entry_url)
        document = BeautifulSoup(page, "lxml")
        root = document.find("html")
        return root.get("data-category-name")
    except (request.HTTPError, request.URLError) as err:
        print(err.reason)
    return None

# Check whether the given URL is linked from the Hatena Blog top page.
hatebu_info.py 文件源码 项目:taikutsu_blog_works 作者: hokekiyoo 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def is_hatenatop(url):
    """Report whether ``url`` is linked from the Hatena Blog top page.

    Downloads http://hatenablog.com/ and searches it for an anchor whose
    ``href`` equals ``url``.

    Returns:
        True when such an anchor exists; False when it does not, or when the
        top page could not be downloaded (the error reason is printed).
    """
    try:
        html = request.urlopen("http://hatenablog.com/")
    except (request.HTTPError, request.URLError) as e:
        # Without a page there is nothing to parse.  Previously execution
        # fell through and crashed on the unbound ``html`` name.
        print(e.reason)
        return False
    soup = BeautifulSoup(html, "lxml")
    a = soup.find("a", href=url)
    if a is None:
        return False
    return url == a.get("href")
github.py 文件源码 项目:loghub 作者: spyder-ide 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def get_access_token(self, code, state=None):
        '''
        Exchange an OAuth callback code for an access token.

        The callback url looks like http://host/callback?code=123&state=xyz;
        pass that code (and the state, if any) to this method.

        Returns the access token as a str. Raises ApiAuthError when the
        response contains an 'error' entry or the request fails with an
        HTTPError.
        '''
        params = {
            'client_id': self._client_id,
            'client_secret': self._client_secret,
            'code': code,
        }
        if self._redirect_uri:
            params['redirect_uri'] = self._redirect_uri
        if state:
            params['state'] = state
        opener = build_opener(HTTPSHandler)
        request = Request(
            'https://github.com/login/oauth/access_token',
            data=_encode_params(params))
        request.get_method = _METHOD_MAP['POST']
        request.add_header('Accept', 'application/json')
        try:
            raw = opener.open(request, timeout=TIMEOUT).read()
            result = _parse_json(raw)
            if 'error' not in result:
                return str(result.access_token)
            raise ApiAuthError(str(result.error))
        except HTTPError:
            raise ApiAuthError('HTTPError when get access token')
github.py 文件源码 项目:loghub 作者: spyder-ide 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def _http(self, _method, _path, **kw):
        """Send one API request and return the parsed JSON response.

        For GET requests the keyword arguments are encoded into the query
        string; for POST, PATCH and PUT they are JSON-encoded into the
        request body.

        Returns:
            The parsed JSON document, or None when ``_process_resp`` reports
            that the response is not JSON.

        Raises:
            ApiNotFoundError: the server answered with HTTP 404.
            ApiError: the server answered with any other HTTP error status.
        """
        sends_body = _method in ['POST', 'PATCH', 'PUT']
        data = None
        if _method == 'GET' and kw:
            _path = '%s?%s' % (_path, _encode_params(kw))
        if sends_body:
            data = bytes(_encode_json(kw), 'utf-8')
        url = '%s%s' % (_URL, _path)
        opener = build_opener(HTTPSHandler)
        request = Request(url, data=data)
        request.get_method = _METHOD_MAP[_method]
        if self._authorization:
            request.add_header('Authorization', self._authorization)
        if sends_body:
            request.add_header('Content-Type',
                               'application/x-www-form-urlencoded')

        try:
            response = opener.open(request, timeout=TIMEOUT)
            if self._process_resp(response.headers):
                return _parse_json(response.read().decode('utf-8'))
        except HTTPError as e:
            is_json = self._process_resp(e.headers)
            payload = e.read().decode('utf-8')
            if is_json:
                payload = _parse_json(payload)
            req = JsonObject(method=_method, url=url)
            resp = JsonObject(code=e.code, json=payload)
            # Raise from the handler itself: raising from a ``finally``
            # clause could replace an unrelated in-flight exception.
            if e.code == 404:
                raise ApiNotFoundError(url, req, resp)
            raise ApiError(url, req, resp)
        return None
meter.py 文件源码 项目:insta_browser 作者: aLkRicha 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def __get_profile_first_posts(self):
        """Load the profile data for ``self.username`` into ``self.user``.

        Copies the basic profile fields and the follower/post counters out of
        the downloaded JSON, reports the account through the success
        callback, and, when the account is not private and has posts,
        processes the first batch of posts and remembers the paging info.

        Raises ValueError when the profile request fails with an HTTPError.
        """
        url = self.__profile_fp_url.format(self.username)
        try:
            data = json.loads(self.__process_url(url))
        except simple_browser.HTTPError:
            raise ValueError('User not found.')

        self.user['un'] = self.username
        profile = data['user']
        field_map = (
            ('id', 'id'),
            ('fn', 'full_name'),
            ('b', 'biography'),
            ('pic', 'profile_pic_url_hd'),
            ('iv', 'is_verified'),
            ('ip', 'is_private'),
        )
        for short_key, source_key in field_map:
            self.user[short_key] = profile[source_key]

        counters = {
            COUNT_KEY_FOLLOWING: profile['follows']['count'],
            COUNT_KEY_FOLLOWED_BY: profile['followed_by']['count'],
            COUNT_KEY_POSTS: profile['media']['count'],
        }
        # Counters that start at zero at this point.
        for zero_key in (
                COUNT_KEY_IMAGE_POSTS,
                COUNT_KEY_VIDEO_POSTS,
                COUNT_KEY_ALBUM_POSTS,
                COUNT_KEY_LIKES,
                COUNT_KEY_COMMENTS,
                COUNT_KEY_VIDEO_VIEWS,
                COUNT_KEY_LIKES_PER_POST,
                COUNT_KEY_COMMENTS_PER_POST,
                COUNT_KEY_VIEWS_PER_POST):
            counters[zero_key] = 0
        self.user[COUNTERS_KEY] = counters

        self.__send_success_callback('account', self.user)

        # Private accounts and accounts without posts have nothing to walk.
        if self.user['ip'] or not self.user[COUNTERS_KEY][COUNT_KEY_POSTS]:
            return
        media = data['user']['media']
        self.__process_posts_first(media['nodes'])
        self.__tmp_req_info = media['page_info']
transport.py 文件源码 项目:slugiot-client 作者: slugiot 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def request(self, url, method="GET", body=None, headers=None):
        """Perform an HTTP request and return ``(info, payload)``.

        Args:
            url: address to request.
            method: accepted for interface compatibility; this implementation
                does not use it.
            body: optional request body passed to ``urllib2.Request``.
            headers: optional mapping of request headers; defaults to none.

        Returns:
            A tuple of the response's ``info()`` and ``read()`` results.  An
            HTTP 500 answer is returned the same way instead of being raised.

        Raises:
            urllib2.HTTPError: for any HTTP error status other than 500.
        """
        # A fresh dict per call avoids sharing one mutable default object.
        if headers is None:
            headers = {}
        req = urllib2.Request(url, body, headers)
        try:
            f = self.request_opener(req, timeout=self._timeout)
            return f.info(), f.read()
        except urllib2.HTTPError as f:
            if f.code != 500:
                raise
            return f.info(), f.read()
speedtest_cli.py 文件源码 项目:test-server 作者: xtria 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def catch_request(request):
    """Open ``request`` with urlopen, trapping common connection errors.

    Returns a ``(handle, error)`` pair: ``(response, False)`` when the
    request opens, or ``(None, exception)`` when an HTTPError, URLError or
    socket.error is raised while opening it.

    """

    try:
        handle = urlopen(request)
    except (HTTPError, URLError, socket.error):
        # Read the exception through sys.exc_info(), as the original does.
        return None, sys.exc_info()[1]
    else:
        return handle, False
speedtest_cli.py 文件源码 项目:test-server 作者: xtria 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def getBestServer(servers):
    """Perform a speedtest.net latency request to determine which
    speedtest.net server has the lowest latency

    Each server's ``latency.txt`` is requested three times.  A probe that
    fails, or whose response is not HTTP 200 with a body starting with
    ``test=test``, is counted as 3600 seconds.

    Returns the server dict with the lowest score; the score (milliseconds,
    rounded to three decimals) is stored under its ``'latency'`` key.
    Servers that end up with identical scores overwrite one another.
    """

    results = {}
    for server in servers:
        cum = []
        url = '%s/latency.txt' % os.path.dirname(server['url'])
        urlparts = urlparse(url)
        for _ in range(3):
            h = None
            try:
                if urlparts[0] == 'https':
                    h = HTTPSConnection(urlparts[1])
                else:
                    h = HTTPConnection(urlparts[1])
                headers = {'User-Agent': user_agent}
                start = timeit.default_timer()
                h.request("GET", urlparts[2], headers=headers)
                r = h.getresponse()
                total = (timeit.default_timer() - start)
            except (HTTPError, URLError, socket.error):
                cum.append(3600)
                # Release the socket on the failure path as well.
                if h is not None:
                    h.close()
                continue
            text = r.read(9)
            if int(r.status) == 200 and text == 'test=test'.encode():
                cum.append(total)
            else:
                cum.append(3600)
            h.close()
        # NOTE(review): three samples are divided by 6, which halves the
        # mean round trip -- presumably to estimate one-way latency; confirm.
        avg = round((sum(cum) / 6) * 1000, 3)
        results[avg] = server
    fastest = sorted(results.keys())[0]
    best = results[fastest]
    best['latency'] = fastest

    return best
client.py 文件源码 项目:catFaceSwapSendToTodd 作者: Micasou 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_access_token(self, renew=False):
    """ Return the client's access token, requesting one when needed.

    A new token is requested with the client's CLIENT_ID and CLIENT_SECRET
    when no token is stored yet or when ``renew`` is true; otherwise the
    stored token is returned unchanged.

    Args:
      renew: if True, request a new token even if one is already stored.

    Raises:
      ApiError: if the token request or the parsing of its response fails.
    """
    if self.access_token is not None and not renew:
      return self.access_token

    # The token endpoint takes a urlencoded form body, not json.
    token_url = self._url_for_op('token')
    form = urlencode({'grant_type': 'client_credentials',
                      'client_id': self.CLIENT_ID,
                      'client_secret': self.CLIENT_SECRET})
    payload = bytearray(form, 'utf-8')
    req = urllib2.Request(token_url, payload, {})
    try:
      response = self._parse_response(urllib2.urlopen(req).read())
    except urllib2.HTTPError as e:
      raise ApiError(e.reason)
    except Exception as e:
      raise ApiError(e)
    self.access_token = response['access_token']
    return self.access_token
sites.py 文件源码 项目:download-npo 作者: Carpetsmoker 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def urlopen(self, url):  # pylint:disable=no-self-use
        """ Open a URL and return the response from urllib2.urlopen.

        The request is sent with a fixed User-Agent and Cookie header.  Any
        HTTPError is re-raised as download_npo.Error with a "not found (404)"
        message, whatever the actual status code was.
        """
        if download_npo.verbose:
            msg('urlopen: ' + url)

        req = urllib2.Request(url, headers={
            'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:51.0) Gecko/20100101 Firefox/51.0',
            'Cookie': 'npo_cc=tmp; npo_cc_www.npogeschiedenis.nl=tmp',
        })
        try:
            response = urllib2.urlopen(req)
        except urllib2.HTTPError:
            raise download_npo.Error(
                'De URL {} is niet gevonden (404 error)'.format(url))
        return response
getData.py 文件源码 项目:anime_recs 作者: Cpierse 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def crawl_club_users(cid):
    ''' Collect the names of all members listed for one club.

    Walks the club's member pages, gathering the text of every non-empty
    link whose href contains 'profile'.  Crawling stops after a page that
    yields anything other than 36 such links, or after an HTTPError.
    '''
    page_size = 36
    club_users = []
    page_num = 0
    count = page_size
    while count == page_size:
        url = 'https://myanimelist.net/clubs.php?action=view&t=members&id={0}&show={1}'.format(cid, str(page_num * page_size))
        try:
            soup = BeautifulSoup(urllib2.urlopen(url))
            # Profile links with visible text are the member names.
            names = [
                link.text
                for link in soup.find_all('a', href=True)
                if 'profile' in link['href'] and len(link.text) > 0
            ]
            club_users.extend(names)
            count = len(names)
            # Advance to the next page and pause before the next request.
            page_num += 1
            time.sleep(0.5 + abs(np.random.randn(1)))
            if int(page_num) % 10 == 9:
                print('Moving to page ' + str(page_num))
        except urllib2.HTTPError:
            count = 0
    return club_users
utils.py 文件源码 项目:testsuite-parallelization 作者: jeandersonbc 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __request_handler(callee):
        """Call ``callee`` and retry once after an HTTP 403.

        Returns whatever ``callee`` returns.  On an HTTPError with code 403
        a notice is printed, the call sleeps for 60 seconds and ``callee``
        is invoked a second time.
        """
        try:
            result = callee()
        except HTTPError as e:
            if e.code != 403:
                # NOTE(review): every other HTTP error is swallowed and the
                # caller receives None -- confirm this is intended.
                return None
            print("Seems like I exceeded the number of requests per minute...")
            print("I'm gonna sleep for 1 min and try again later...")
            time.sleep(60)
            return callee()
        return result
setup.py 文件源码 项目:iterfzf 作者: dahlia 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_fzf_release(access_token=None):
    """Return the GitHub release metadata for the configured fzf version.

    The metadata is read from a JSON cache file located next to this script
    when one exists.  Otherwise it is downloaded from ``release_url`` and
    written to that cache file on a best-effort basis.

    Args:
        access_token: optional GitHub access token, appended to the request
            URL as the ``access_token`` query parameter.

    Returns:
        The decoded JSON document.

    Raises:
        RuntimeError: GitHub answered 403 with no rate-limit quota left, or
            401 while an access token was supplied.
        urllib2.HTTPError: any other HTTP failure is re-raised unchanged.
    """
    filename = 'fzf-{0}-release.json'.format(fzf_version)
    filepath = os.path.join(os.path.dirname(__file__), filename)
    try:
        with open(filepath) as f:
            d = f.read()
    except IOError:
        url = release_url
        if access_token:
            url = '{0}?access_token={1}'.format(url, access_token)
        try:
            r = urllib2.urlopen(url)
        except urllib2.HTTPError as e:
            # Header values are strings; comparing with the integer 0 could
            # never match, so the rate-limit hint was never shown.
            remaining = e.info().get('X-RateLimit-Remaining')
            if e.code == 403 and remaining == '0':
                raise RuntimeError(
                    'GitHub rate limit reached. To increate the limit use '
                    '-g/--github-access-token option.\n  ' + str(e)
                )
            elif e.code == 401 and access_token:
                raise RuntimeError('Invalid GitHub access token.')
            raise
        try:
            d = r.read()
        finally:
            r.close()
        mode = 'w' + ('b' if isinstance(d, bytes) else '')
        try:
            # Write the cache to the same path it is read from, so that it
            # is found again regardless of the current working directory.
            with open(filepath, mode) as f:
                f.write(d)
        except IOError:
            # Caching is optional; an unwritable location is not an error.
            pass
    try:
        return json.loads(d)
    except TypeError:
        # json.loads rejected bytes input; decode and retry.
        return json.loads(d.decode('utf-8'))
dnsdb_query.py 文件源码 项目:misp-modules 作者: MISP 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def _query(self, path, before=None, after=None):
        """Run a lookup request and yield one decoded JSON record per line.

        Args:
            path: lookup path appended to ``<server>/lookup/``.
            before: optional time bound.  Sent as ``time_last_before`` when
                ``after`` is also given, otherwise as ``time_first_before``.
            after: optional time bound.  Sent as ``time_first_after`` when
                ``before`` is also given, otherwise as ``time_last_after``.

        Yields:
            The object decoded from each line of the response body.

        Raises:
            QueryError: when the request fails with an HTTPError or URLError.
        """
        url = '%s/lookup/%s' % (self.server, path)

        params = {}
        if self.limit:
            params['limit'] = self.limit
        if before and after:
            params['time_first_after'] = after
            params['time_last_before'] = before
        else:
            if before:
                params['time_first_before'] = before
            if after:
                params['time_last_after'] = after
        if params:
            url += '?{0}'.format(urlencode(params))

        req = Request(url)
        req.add_header('Accept', 'application/json')
        req.add_header('X-Api-Key', self.apikey)

        proxy_args = {}
        if self.http_proxy:
            proxy_args['http'] = self.http_proxy
        if self.https_proxy:
            proxy_args['https'] = self.https_proxy
        proxy_handler = ProxyHandler(proxy_args)
        opener = build_opener(proxy_handler)

        try:
            http = opener.open(req)
            try:
                while True:
                    line = http.readline()
                    if not line:
                        break
                    yield json.loads(line.decode('ascii'))
            finally:
                # Close the response even if the consumer stops early.
                http.close()
        except (HTTPError, URLError) as e:
            # sys.exc_traceback does not exist on Python 3; sys.exc_info()
            # provides the traceback on both Python 2 and 3.
            raise QueryError(str(e), sys.exc_info()[2])
controller.py 文件源码 项目:boltkit 作者: neo4j-contrib 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _install(edition, version, path, **kwargs):
    """Install a package through a fresh controller and return its home.

    ``version`` is stripped of surrounding whitespace before use.  HTTP 401
    and 403 download failures are reported as RuntimeError; any other
    HTTPError propagates unchanged.
    """
    controller = _create_controller()
    try:
        return controller.install(edition, version.strip(), path, **kwargs)
    except HTTPError as error:
        if error.code == 401:
            raise RuntimeError("Missing or incorrect authorization")
        if error.code == 403:
            raise RuntimeError("Could not download package from %s (403 Forbidden)" % error.url)
        raise
emails.py 文件源码 项目:django-sendgrid-parse 作者: grvty-labs 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def send(self):
        """Send this message as a SendGrid template mail.

        Builds a single personalization addressed to every entry of
        ``self.to`` and posts it through the SendGrid client.  An HTTPError
        is not raised to the caller; its response body is printed instead.
        """
        sg = sendgrid.SendGridAPIClient(
            apikey=settings.DJANGO_SENDGRID_PARSE_API)

        recipients = [{'email': mail} for mail in self.to]
        personalization = {
            'to': recipients,
            'substitutions': self.body,
            'subject': self.subject,
        }
        data = {
            'personalizations': [personalization],
            'from': {'email': self.from_email},
            'template_id': self.template_id,
        }

        try:
            sg.client.mail.send.post(request_body=data)
        except urllib.HTTPError as e:
            print(e.read())
send.py 文件源码 项目:wuye.vim 作者: zhaoyingnan911 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def markdownRefresh():
    """Send the current buffer, with the cursor line flagged, to the server.

    The cursor line gets ``flagSign`` inserted (via the ``tag`` pattern when
    it matches, otherwise via ``B``), the whole buffer is encoded as UTF-8
    and sent with an HTTP PUT to the URL built from ``g:mkdp_port`` and the
    prefixed buffer number.  When the server answers 406 the buffer number
    is removed from ``g:mkdp_bufs``.
    """
    port = vim.eval('g:mkdp_port')
    buf = vim.current.buffer
    bufnr = buf.number
    target = vim.current.window.cursor[0] - 1
    encoding = vim.eval('&encoding').upper()

    text = NEW_LINE.join(buf)
    if PY_VERSOIN == '2':
        # Python 2 buffers hold byte strings; decode before splitting.
        text = text.decode(encoding)
    lines = text.split(U_NEW_LINE)

    line = lines[target]
    if tag.search(line) is None:
        lines[target] = B.sub(flagSign, line, 1)
    else:
        lines[target] = tag.sub(u'\\1 ' + flagSign, line, 1)

    data = U_NEW_LINE.join(lines).encode('utf-8')
    req = urllib2.Request(URL % (port, prefix + str(bufnr)), data = data)
    req.get_method = lambda: "PUT"
    try:
        urllib2.urlopen(req)
    except urllib2.HTTPError as e:
        if e.code == 406:
            vim.command('call remove(g:mkdp_bufs, %s)' % bufnr)
liveserver_fixtures.py 文件源码 项目:batch-scoring 作者: datarobot 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def stop(self):
        """Stop application process.

        When a thread is registered, requests the ``/shutdown`` URL (an
        HTTPError from that request is ignored) and joins the thread.  The
        thread reference is cleared in every case.
        """
        worker = self._thread
        if worker:
            try:
                urlopen(self.url('/shutdown'))
            except HTTPError:
                # 500 server closed
                pass
            worker.join()
        self._thread = None
client.py 文件源码 项目:qgis_resources_sharing 作者: akbargumbira 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _http_request(self, url, headers=None, data=None):
        """Open ``url`` through ``self.opener`` and return the response.

        Args:
            url: address to request.
            headers: optional mapping of request headers; defaults to none.
            data: optional request body passed to ``urllib2.Request``.

        Raises:
            NotGitRepository: the server answered with HTTP 404.
            GitProtocolError: the server answered with any other HTTP error.
        """
        # A fresh dict per call avoids sharing one mutable default object.
        if headers is None:
            headers = {}
        req = urllib2.Request(url, headers=headers, data=data)
        try:
            return self.opener.open(req)
        except urllib2.HTTPError as e:
            if e.code == 404:
                raise NotGitRepository()
            # An HTTPError never carries a usable response here, so always
            # report it rather than falling through to an unbound result.
            raise GitProtocolError("unexpected http response %d" % e.code)


问题


面经


文章

微信
公众号

扫码关注公众号