def get_category(url):
    """Return the Hatena Bookmark category name for *url*.

    Fetches the Hatena entry page for the given URL and reads the
    ``data-category-name`` attribute of its ``<html>`` tag.

    Returns:
        The category name string, or None when the page cannot be
        fetched (the error reason is printed) or the attribute is absent.
    """
    try:
        html = request.urlopen("http://b.hatena.ne.jp/entry/{}".format(url))
    except request.URLError as e:
        # HTTPError is a subclass of URLError, so a single handler covers
        # both (the original had two identical except bodies).
        print(e.reason)
        return None
    soup = BeautifulSoup(html, "lxml")
    return soup.find("html").get("data-category-name")
# Collected example snippets demonstrating handling of urllib's HTTPError.
def is_hatenatop(url):
    """Return True if *url* appears as a link href on the Hatena Blog top page.

    Returns:
        True when an ``<a>`` tag with exactly this href exists on
        http://hatenablog.com/, False otherwise (including fetch failures,
        whose reason is printed).
    """
    try:
        html = request.urlopen("http://hatenablog.com/")
    except request.URLError as e:
        # Bug fix: the original only printed the reason and fell through,
        # hitting a NameError because `html` was never bound. Also,
        # `urllib.HTTPError` does not exist; `request.URLError` covers
        # HTTPError as well (it is its base class).
        print(e.reason)
        return False
    soup = BeautifulSoup(html, "lxml")
    a = soup.find("a", href=url)
    if a is None:
        return False
    return url == a.get("href")
def get_access_token(self, code, state=None):
    '''
    Exchange an OAuth callback code for a GitHub access token.

    In callback url: http://host/callback?code=123&state=xyz
    use code and state to get an access token.

    Args:
        code: the ``code`` query parameter from the OAuth callback.
        state: optional ``state`` parameter echoed back for CSRF checking.

    Returns:
        The access token as a str.

    Raises:
        ApiAuthError: when GitHub reports an error field or the HTTP
            request itself fails.
    '''
    kw = dict(
        client_id=self._client_id,
        client_secret=self._client_secret,
        code=code)
    if self._redirect_uri:
        kw['redirect_uri'] = self._redirect_uri
    if state:
        kw['state'] = state
    opener = build_opener(HTTPSHandler)
    request = Request(
        'https://github.com/login/oauth/access_token',
        data=_encode_params(kw))
    request.get_method = _METHOD_MAP['POST']
    # Without this header GitHub answers form-encoded text, not JSON.
    request.add_header('Accept', 'application/json')
    try:
        response = opener.open(request, timeout=TIMEOUT)
        r = _parse_json(response.read())
        if 'error' in r:
            raise ApiAuthError(str(r.error))
        return str(r.access_token)
    except HTTPError as e:
        # Bug fix: chain the original HTTPError so its status code and
        # body are preserved for debugging instead of being discarded.
        raise ApiAuthError('HTTPError when get access token') from e
def _http(self, _method, _path, **kw):
    # Perform an HTTP request against the API at _URL and return the parsed
    # JSON body on success (None when the response isn't JSON). GET
    # parameters go into the query string; POST/PATCH/PUT parameters are
    # JSON-encoded into the request body.
    data = None
    params = None  # NOTE(review): assigned but never used
    if _method == 'GET' and kw:
        _path = '%s?%s' % (_path, _encode_params(kw))
    if _method in ['POST', 'PATCH', 'PUT']:
        data = bytes(_encode_json(kw), 'utf-8')
    url = '%s%s' % (_URL, _path)
    opener = build_opener(HTTPSHandler)
    request = Request(url, data=data)
    request.get_method = _METHOD_MAP[_method]
    if self._authorization:
        request.add_header('Authorization', self._authorization)
    if _method in ['POST', 'PATCH', 'PUT']:
        request.add_header('Content-Type',
                           'application/x-www-form-urlencoded')
    # Placeholder so the `finally` clause can test resp.code even when no
    # HTTPError was raised (success path leaves resp.code as None).
    class Resp():
        code = None
    resp = Resp()
    req = None
    try:
        response = opener.open(request, timeout=TIMEOUT)
        is_json = self._process_resp(response.headers)
        if is_json:
            return _parse_json(response.read().decode('utf-8'))
    except HTTPError as e:
        # Decode the error body (as JSON when the headers say so) and
        # record request/response details; the raise happens in `finally`.
        is_json = self._process_resp(e.headers)
        if is_json:
            json = _parse_json(e.read().decode('utf-8'))
        else:
            json = e.read().decode('utf-8')
        req = JsonObject(method=_method, url=url)
        resp = JsonObject(code=e.code, json=json)
    finally:
        # Translate a captured HTTP error into the API exception hierarchy.
        # On success req is None and resp.code is None, so nothing raises.
        if resp.code == 404:
            raise ApiNotFoundError(url, req, resp)
        elif req:
            raise ApiError(url, req, resp)
def __get_profile_first_posts(self):
    """Fetch the first page of the profile feed and populate ``self.user``.

    Raises ValueError when the profile endpoint answers with an HTTP error
    (user not found). Fires the 'account' success callback, and — for
    public accounts with posts — processes the first batch of media nodes
    and stashes the paging info for the next request.
    """
    url = self.__profile_fp_url.format(self.username)
    try:
        payload = json.loads(self.__process_url(url))
    except simple_browser.HTTPError:
        raise ValueError('User not found.')
    info = payload['user']
    self.user['un'] = self.username
    self.user['id'] = info['id']
    self.user['fn'] = info['full_name']
    self.user['b'] = info['biography']
    self.user['pic'] = info['profile_pic_url_hd']
    self.user['iv'] = info['is_verified']
    self.user['ip'] = info['is_private']
    counters = {
        COUNT_KEY_FOLLOWING: info['follows']['count'],
        COUNT_KEY_FOLLOWED_BY: info['followed_by']['count'],
        COUNT_KEY_POSTS: info['media']['count'],
    }
    # The remaining counters start at zero and are filled in as posts
    # are processed.
    for zero_key in (COUNT_KEY_IMAGE_POSTS, COUNT_KEY_VIDEO_POSTS,
                     COUNT_KEY_ALBUM_POSTS, COUNT_KEY_LIKES,
                     COUNT_KEY_COMMENTS, COUNT_KEY_VIDEO_VIEWS,
                     COUNT_KEY_LIKES_PER_POST, COUNT_KEY_COMMENTS_PER_POST,
                     COUNT_KEY_VIEWS_PER_POST):
        counters[zero_key] = 0
    self.user[COUNTERS_KEY] = counters
    self.__send_success_callback('account', self.user)
    if not self.user['ip'] and self.user[COUNTERS_KEY][COUNT_KEY_POSTS]:
        self.__process_posts_first(info['media']['nodes'])
        self.__tmp_req_info = info['media']['page_info']
def request(self, url, method="GET", body=None, headers={}):
    """Send an HTTP request and return ``(info, body)``.

    A 500 response is treated like a success (its headers and body are
    returned, since the error object exposes the same interface); any
    other HTTPError propagates to the caller.
    """
    http_request = urllib2.Request(url, body, headers)
    try:
        resp = self.request_opener(http_request, timeout=self._timeout)
    except urllib2.HTTPError as err:
        if err.code != 500:
            raise
        resp = err  # the HTTPError doubles as a response object
    return resp.info(), resp.read()
def catch_request(request):
    """Helper function to catch common exceptions encountered when
    establishing a connection with a HTTP/HTTPS request

    Returns ``(urlopen_result, False)`` on success and ``(None, exc)``
    when the connection attempt raised HTTPError/URLError/socket.error.
    """
    try:
        return urlopen(request), False
    except (HTTPError, URLError, socket.error):
        return None, sys.exc_info()[1]
def getBestServer(servers):
    """Perform a speedtest.net latency request to determine which
    speedtest.net server has the lowest latency

    Returns the server dict with the lowest average probe time, annotated
    with a 'latency' key (milliseconds).
    """
    results = {}
    for server in servers:
        cum = []
        # Probe <server-dir>/latency.txt three times per server.
        url = '%s/latency.txt' % os.path.dirname(server['url'])
        urlparts = urlparse(url)
        for i in range(0, 3):
            try:
                if urlparts[0] == 'https':
                    h = HTTPSConnection(urlparts[1])
                else:
                    h = HTTPConnection(urlparts[1])
                headers = {'User-Agent': user_agent}
                start = timeit.default_timer()
                h.request("GET", urlparts[2], headers=headers)
                r = h.getresponse()
                total = (timeit.default_timer() - start)
            except (HTTPError, URLError, socket.error):
                # Failed probe counts as a huge (3600 s) latency.
                cum.append(3600)
                continue
            text = r.read(9)
            # Only a 200 with the expected 9-byte body counts as a hit.
            if int(r.status) == 200 and text == 'test=test'.encode():
                cum.append(total)
            else:
                cum.append(3600)
            h.close()
        # NOTE(review): divides by 6 rather than len(cum) == 3 —
        # presumably halving round-trip time to approximate one-way
        # latency; confirm before changing. Result is in milliseconds.
        avg = round((sum(cum) / 6) * 1000, 3)
        results[avg] = server
    # Smallest average wins; attach it to the chosen server.
    fastest = sorted(results.keys())[0]
    best = results[fastest]
    best['latency'] = fastest
    return best
def get_access_token(self, renew=False):
    """ Get an access token using your app_id and app_secret.
    You shouldn't need to call this method yourself. If there is no access token yet, this method
    will be called when a request is made. If a token expires, this method will also automatically
    be called to renew the token.
    Args:
        renew: if True, then force the client to get a new token (even if not expired). By default if
        there is already an access token in the client then this method is a no-op.
    Returns:
        The access token string (also cached on ``self.access_token``).
    Raises:
        ApiError: when the token request or response parsing fails.
    """
    if self.access_token is None or renew:
        headers = {}  # don't use json here, just urlencode.
        url = self._url_for_op('token')
        # OAuth2 client-credentials grant, form-encoded as the API expects.
        data = urlencode({'grant_type': 'client_credentials',
                          'client_id': self.CLIENT_ID,
                          'client_secret': self.CLIENT_SECRET})
        data = bytearray(data, 'utf-8')
        req = urllib2.Request(url, data, headers)
        try:
            response = urllib2.urlopen(req).read()
            response = self._parse_response(response)
        except urllib2.HTTPError as e:
            raise ApiError(e.reason)
        except Exception as e:
            # NOTE(review): deliberately broad — any failure (network,
            # parse) is rewrapped as ApiError for the caller.
            raise ApiError(e)
        self.access_token = response['access_token']
    return self.access_token
def urlopen(self, url):  # pylint:disable=no-self-use
    """ Open a URL and return the opened response object.

    (The original docstring claimed a Request object was returned; the
    code returns the result of ``urllib2.urlopen``.)

    Raises:
        download_npo.Error: on any HTTPError, with a Dutch user-facing
            "URL not found (404)" message.
    """
    if download_npo.verbose:
        msg('urlopen: ' + url)
    # Browser-like User-Agent plus the site's cookies, presumably needed
    # to avoid being blocked — TODO confirm against the site's behavior.
    headers = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:51.0) Gecko/20100101 Firefox/51.0',
        'Cookie': 'npo_cc=tmp; npo_cc_www.npogeschiedenis.nl=tmp',
    }
    req = urllib2.Request(url, headers=headers)
    try:
        return urllib2.urlopen(req)
    except urllib2.HTTPError:
        # NOTE(review): every HTTPError (not only 404) is reported with
        # this message; the string is user-facing and kept verbatim.
        raise download_npo.Error(
            'De URL {} is niet gevonden (404 error)'.format(url))
def crawl_club_users(cid):
    ''' Crawl through and get all user ids in a specific club

    Pages through the MyAnimeList club member list (36 members per page)
    collecting the text of each profile link. Stops when a page yields
    fewer than 36 profile links or a request fails with an HTTPError.
    '''
    club_users=[]
    page_num, count = 0, 36
    while count==36:
        # Prepare url:
        url = 'https://myanimelist.net/clubs.php?action=view&t=members&id={0}&show={1}'.format(cid,str(page_num*36))
        try:
            page = urllib2.urlopen(url)
            soup = BeautifulSoup(page)
            # Extract all links:
            all_links = soup.find_all('a',href=True)
            count = 0
            for link in all_links:
                if 'profile' in link['href']:
                    if len(link.text)>0:
                        # These are the users
                        club_users.append(link.text)
                        count+=1
            # Get the next page number and rest:
            page_num +=1
            # NOTE(review): randn(1) returns a 1-element array; time.sleep
            # coerces it to float. Half-normal delay >= 0.5 s throttles
            # the crawl.
            time.sleep(0.5+abs(np.random.randn(1)))
            if int(page_num)%10==9:
                print('Moving to page ' + str(page_num))
        except urllib2.HTTPError:
            # An HTTP error ends the crawl by forcing the loop condition
            # false; whatever was collected so far is returned.
            count=0
    return club_users
def __request_handler(callee):
    """Invoke *callee*; on an HTTP 403 (rate limit), wait a minute and retry once.

    Args:
        callee: zero-argument callable performing the actual request.

    Returns:
        Whatever *callee* returns (first or retried call).

    Raises:
        HTTPError: re-raised unchanged for any status other than 403
            (the original silently swallowed those and returned None),
            and for a 403 that persists on the retry.
    """
    try:
        return callee()
    except HTTPError as e:
        if e.code != 403:
            raise
        print("Seems like I exceeded the number of requests per minute...")
        print("I'm gonna sleep for 1 min and try again later...")
        time.sleep(60)
        return callee()
def get_fzf_release(access_token=None):
    """Return the GitHub release metadata (parsed JSON) for the pinned fzf version.

    Reads a JSON cache file next to this module first; on a cache miss,
    downloads the release description from ``release_url`` (optionally
    authenticated via *access_token*) and writes the cache best-effort.

    Raises:
        RuntimeError: when the GitHub rate limit is exhausted or the
            access token is rejected (401).
    """
    filename = 'fzf-{0}-release.json'.format(fzf_version)
    filepath = os.path.join(os.path.dirname(__file__), filename)
    try:
        with open(filepath) as f:
            d = f.read()
    except IOError:
        url = release_url
        if access_token:
            url = '{0}?access_token={1}'.format(url, access_token)
        try:
            r = urllib2.urlopen(url)
        except urllib2.HTTPError as e:
            # Bug fix: header values are strings — the original compared
            # against int 0, so the rate-limit branch could never fire.
            if e.code == 403 and e.info().get('X-RateLimit-Remaining') == '0':
                raise RuntimeError(
                    'GitHub rate limit reached. To increate the limit use '
                    '-g/--github-access-token option.\n ' + str(e)
                )
            elif e.code == 401 and access_token:
                raise RuntimeError('Invalid GitHub access token.')
            raise
        d = r.read()
        r.close()
        mode = 'w' + ('b' if isinstance(d, bytes) else '')
        try:
            # Bug fix: write the cache to `filepath` (next to the module).
            # The original wrote to `filename` in the CWD, so the read
            # above never found it.
            with open(filepath, mode) as f:
                f.write(d)
        except IOError:
            pass  # cache write is best-effort
    try:
        return json.loads(d)
    except TypeError:
        # Python 3: json.loads needed str before 3.6; decode bytes.
        return json.loads(d.decode('utf-8'))
def _query(self, path, before=None, after=None):
    """Query the lookup endpoint and yield one decoded JSON record per line.

    Args:
        path: lookup path appended to ``<server>/lookup/``.
        before, after: optional time filters; when both are given, the
            record must fall entirely inside the window
            (time_first_after / time_last_before).

    Yields:
        dict: one decoded JSON object per response line.

    Raises:
        QueryError: when the HTTP request fails.
    """
    url = '%s/lookup/%s' % (self.server, path)
    params = {}
    if self.limit:
        params['limit'] = self.limit
    if before and after:
        params['time_first_after'] = after
        params['time_last_before'] = before
    else:
        if before:
            params['time_first_before'] = before
        if after:
            params['time_last_after'] = after
    if params:
        url += '?{0}'.format(urlencode(params))
    req = Request(url)
    req.add_header('Accept', 'application/json')
    req.add_header('X-Api-Key', self.apikey)
    proxy_args = {}
    if self.http_proxy:
        proxy_args['http'] = self.http_proxy
    if self.https_proxy:
        proxy_args['https'] = self.https_proxy
    proxy_handler = ProxyHandler(proxy_args)
    opener = build_opener(proxy_handler)
    try:
        http = opener.open(req)
        while True:
            line = http.readline()
            if not line:
                break
            yield json.loads(line.decode('ascii'))
    except (HTTPError, URLError) as e:
        # Bug fix: sys.exc_traceback is Python-2-only (removed in 3);
        # sys.exc_info()[2] is the portable way to get the traceback.
        raise QueryError(str(e), sys.exc_info()[2])
def _install(edition, version, path, **kwargs):
    """Install *edition*/*version* into *path* via the controller.

    HTTP failures are mapped to RuntimeError for the two well-known
    auth-related status codes; everything else propagates unchanged.

    Returns:
        The installed home directory reported by the controller.
    """
    controller = _create_controller()
    try:
        home = controller.install(edition, version.strip(), path, **kwargs)
    except HTTPError as error:
        if error.code == 401:
            raise RuntimeError("Missing or incorrect authorization")
        if error.code == 403:
            raise RuntimeError("Could not download package from %s (403 Forbidden)" % error.url)
        raise
    return home
def send(self):
    """Send this message through the SendGrid v3 Mail Send API.

    Builds the personalization payload from the instance's recipients,
    substitutions, subject, sender and template, then POSTs it. On an
    HTTP error the response body is printed (best-effort, no raise).
    """
    sg = sendgrid.SendGridAPIClient(
        apikey=settings.DJANGO_SENDGRID_PARSE_API)
    recipients = [{'email': mail} for mail in self.to]
    personalization = {
        'to': recipients,
        'substitutions': self.body,
        'subject': self.subject,
    }
    data = {
        'personalizations': [personalization],
        'from': {'email': self.from_email},
        'template_id': self.template_id,
    }
    try:
        response = sg.client.mail.send.post(request_body=data)
    except urllib.HTTPError as e:
        print(e.read())
def markdownRefresh():
    """Push the current Vim buffer's content to the preview server.

    Marks the cursor line with the flag sign (so the browser preview can
    scroll to it), then PUTs the joined buffer text to the server URL for
    this buffer. A 406 response means the buffer is no longer tracked and
    it is removed from the registry.
    """
    PORT = vim.eval('g:mkdp_port')
    curBuf = vim.current.buffer
    bufnr = curBuf.number
    pbufnr = prefix + str(bufnr)
    # Vim cursor rows are 1-based; convert to a 0-based list index.
    lineNum = vim.current.window.cursor[0] - 1
    encoding = vim.eval('&encoding').upper()
    if PY_VERSOIN == '2':  # (sic: spelling matches the module-level constant)
        # Python 2: decode buffer bytes before splitting on the unicode
        # newline marker.
        lines = NEW_LINE.join(curBuf).decode(encoding).split(U_NEW_LINE)
    else:
        lines = NEW_LINE.join(curBuf).split(U_NEW_LINE)
    curLine = lines[lineNum]
    # Insert the scroll-flag sign on the cursor line — after the tag when
    # one matches, otherwise via the fallback pattern B.
    if tag.search(curLine) != None:
        curLine = tag.sub(u'\\1 ' + flagSign, curLine, 1)
    else:
        curLine = B.sub(flagSign, curLine, 1)
    lines[lineNum] = curLine
    data = U_NEW_LINE.join(lines).encode('utf-8')
    req = urllib2.Request(URL % (PORT, pbufnr), data = data)
    req.get_method = lambda: "PUT"
    try:
        urllib2.urlopen(req)
    except urllib2.HTTPError as e:
        if e.code == 406:
            # Server no longer knows this buffer; drop it from the registry.
            vim.command('call remove(g:mkdp_bufs, %s)' % bufnr)
def stop(self):
    """Stop application process.

    No-op when no worker thread is running. Otherwise asks the app to
    shut down via its /shutdown endpoint (ignoring the HTTPError the
    dying server typically answers with), then joins and clears the
    thread.
    """
    if not self._thread:
        return
    try:
        urlopen(self.url('/shutdown'))
    except HTTPError:
        pass  # 500 server closed
    self._thread.join()
    self._thread = None
def _http_request(self, url, headers={}, data=None):
    """Open *url* via the instance opener, translating HTTP failures.

    A 404 becomes NotGitRepository; any other HTTPError status becomes
    GitProtocolError. A successful open returns the response object.
    """
    request = urllib2.Request(url, headers=headers, data=data)
    try:
        response = self.opener.open(request)
    except urllib2.HTTPError as err:
        if err.code == 404:
            raise NotGitRepository()
        if err.code != 200:
            raise GitProtocolError("unexpected http response %d" % err.code)
    return response