def getpage(self, pagenum):
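    """Fetch page number pagenum of the thread and return it as a parsed
    BeautifulSoup tree, or None if the request fails."""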
try:
url = self.baseurl + self.seeLZ + '&pn=' + str(pagenum)
request = urllib2.Request(url)
response = urllib2.urlopen(request)
page = BeautifulSoup(response, "html5lib")
return page
except urllib2.URLError, e:
if hasattr(e, 'reason'):
print u"?????????????", e.reason
return None
def sub_app_get(self, offset):
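    """Poll http://localhost:5050<offset> until the sub-app responds,
    retrying once per second for up to 5 attempts while it starts up."""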
count = 0
while True:
try:
f = urllib2.urlopen('http://localhost:5050%s' % offset)
except urllib2.URLError, e:
            if hasattr(e, 'reason') and isinstance(e.reason, urllib2.socket.error):
# i.e. process not started up yet
count += 1
time.sleep(1)
assert count < 5, '%s: %r; %r' % (offset, e, e.args)
else:
print 'Error opening url: %s' % offset
assert 0, e # Print exception
else:
break
return f.read()
def _get_content(self, url):
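    """GET url, sending the configured api_key (if any) in the Authorization
    header; raises ContentNotFoundError on 404 and ContentFetchError on any
    other failure."""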
http_request = urllib2.Request(url=url)
api_key = self.config.get('api_key')
if api_key:
http_request.add_header('Authorization', api_key)
try:
http_response = urllib2.urlopen(http_request)
except urllib2.HTTPError, e:
        if e.code == 404:
raise ContentNotFoundError('HTTP error: %s' % e.code)
else:
raise ContentFetchError('HTTP error: %s' % e.code)
except urllib2.URLError, e:
raise ContentFetchError('URL error: %s' % e.reason)
except httplib.HTTPException, e:
raise ContentFetchError('HTTP Exception: %s' % e)
except socket.error, e:
raise ContentFetchError('HTTP socket error: %s' % e)
except Exception, e:
raise ContentFetchError('HTTP general exception: %s' % e)
return http_response.read()
def basic_auth(server="http://127.0.0.1"):
"""
to use basic login with a different server
from gluon.contrib.login_methods.basic_auth import basic_auth
auth.settings.login_methods.append(basic_auth('http://server'))
"""
def basic_login_aux(username,
password,
server=server):
key = base64.b64encode(username + ':' + password)
headers = {'Authorization': 'Basic ' + key}
request = urllib2.Request(server, None, headers)
try:
urllib2.urlopen(request)
return True
except (urllib2.URLError, urllib2.HTTPError):
return False
return basic_login_aux
def browse_amazon_url(self):
'''Opens Amazon page for current book's ASIN using user's local store'''
# Try to use the nearest Amazon store to the user.
# If this fails we'll default to .com, the user will have to manually
# edit the preferences file to fix it (it is a simple text file).
if not prefs['tld']:
import json
from collections import defaultdict
from urllib2 import urlopen, URLError
try:
country = json.loads(urlopen('http://ipinfo.io/json').read())['country']
except (URLError, KeyError):
country = 'unknown'
country_tld = defaultdict(lambda: 'com', {'AU': 'com.au', 'BR': 'com.br', 'CA': 'ca', 'CN': 'cn', 'FR': 'fr',
'DE': 'de', 'IN': 'in', 'IT': 'it', 'JP': 'co.jp', 'MX': 'com.mx',
'NL': 'nl', 'ES': 'es', 'GB': 'co.uk', 'US': 'com'})
prefs['tld'] = country_tld[country]
webbrowser.open('https://www.amazon.{0}/gp/product/{1}/'.format(prefs['tld'], self._asin_edit.text()))
def check_for_update():
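    """POST the current version to CORE_VERSION_URL and cache the response
    in FILE_UPDATE, at most once per UTC day."""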
if os.path.exists(FILE_UPDATE):
mtime = os.path.getmtime(FILE_UPDATE)
last = datetime.utcfromtimestamp(mtime).strftime('%Y-%m-%d')
today = datetime.utcnow().strftime('%Y-%m-%d')
if last == today:
return
try:
with open(FILE_UPDATE, 'a'):
os.utime(FILE_UPDATE, None)
request = urllib2.Request(
CORE_VERSION_URL,
urllib.urlencode({'version': __version__}),
)
response = urllib2.urlopen(request)
with open(FILE_UPDATE, 'w') as update_json:
update_json.write(response.read())
except (urllib2.HTTPError, urllib2.URLError):
pass
def http_download(url, target_path):
"""Download file to local
Args:
- url(string): url request path
- target_path(string): download destination
Raises:
IOError
urllib2.URLError
"""
try:
resp = urllib2.urlopen(url)
except urllib2.URLError, e:
if not hasattr(e, 'code'):
raise
resp = e
if resp.code != 200:
raise IOError("Request url(%s) expect 200 but got %d" %(url, resp.code))
with open(target_path, 'wb') as f:
shutil.copyfileobj(resp, f)
return target_path
def test_create_session_invalid_form(self, mock_validate_url_form):
mock_validate_url_form.side_effect = [False]
base_URL = "invalidForm.com/"
with self.assertRaises(URLError) as err:
API.apiCalls.ApiCalls(
client_id="",
client_secret="",
base_URL=base_URL,
username="",
password=""
)
self.assertTrue("not a valid URL" in str(err.exception))
mock_validate_url_form.assert_called_with(base_URL)
def create_session(self):
"""
create session to be re-used until expiry for get and post calls
returns session (OAuth2Session object)
"""
if self.base_URL[-1:] != "/":
self.base_URL = self.base_URL + "/"
if validate_URL_form(self.base_URL):
oauth_service = self.get_oauth_service()
access_token = self.get_access_token(oauth_service)
self._session = oauth_service.get_session(access_token)
        if self.validate_URL_existence(self.base_URL,
                                       use_session=True) is False:
            raise Exception("Cannot create session. " +
                            "Verify your credentials are correct.")
else:
raise URLError(self.base_URL + " is not a valid URL")
def linksExtractor(url, fileFormat='png'):
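    """Collect all links on url whose target ends with fileFormat: image
    formats are read from <img src=...>, anything else from <a href=...>."""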
tag = 'a'
attr = 'href'
if (fileFormat in ['png', 'jpg', 'jpeg', 'tiff', 'bmp', 'svg', 'gif']):
tag = 'img'
attr = 'src'
try:
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64)'}
        req = urllib2.Request(url, None, headers)
        htmlDoc = urllib2.urlopen(req).read()
    except urllib2.HTTPError as err:
        print("Server Response : " + str(err.code))
return "Server refused to connect!"
except urllib2.URLError:
return 'Invalid URL!'
page = BeautifulSoup(htmlDoc, 'html.parser')
page.prettify()
res = []
for link in page.find_all(tag):
pre = link.get(attr)
pre = str(pre)
if (pre[-len(fileFormat):] == fileFormat):
res.append(pre)
if (len(res) < 1):
return 'EMPTY'
return res
def update_plex():
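    """Ask the Plex server at PLEX_IP to refresh all library sections."""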
Logger.info("plex - sending request to update Plex")
url = 'http://%s/library/sections/all/refresh?X-Plex-Token=%s' % (PLEX_IP, PLEX_TOKEN)
try:
urllib2.urlopen(url).read()
except urllib2.HTTPError, e:
Logger.warning("plex - unable to make request to Plex - HTTP Error %s", str(e.code))
except urllib2.URLError, e:
Logger.warning("plex - unable to make request to Plex - URL Error %s", e.reason)
else:
Logger.info("plex - update successful")
def goglib_get_banner(game_name, icon_path, banner_path):
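    """Scrape the og:image banner from the game's GOG page, save it scaled
    to 240px high, and keep a greyscale copy under banner_path/unavailable/."""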
req = urllib2.Request('https://www.gog.com/game/' + game_name)
try:
game_page = urllib2.urlopen(req)
game_page_content = game_page.read()
soup = BeautifulSoup(game_page_content, 'lxml')
raw_data = soup.findAll(attrs={'name':'og:image'})
banner_url = raw_data[0]['content'].encode('utf-8')
if banner_url.startswith('http'):
banner_req = urllib2.Request(banner_url)
else:
banner_req = urllib2.Request('https:' + banner_url)
banner_data = urllib2.urlopen(banner_req).read()
banner_file = open(banner_path + '/' + game_name + '.jpg', 'wb')
banner_file.write(banner_data)
banner_file.close()
pic_src = Image.open(banner_path + '/' + game_name + '.jpg')
scale_lvl = 240/float(pic_src.size[1])
scaled_width = int(float(pic_src.size[0])*scale_lvl)
pic = pic_src.resize((scaled_width, 240), PIL.Image.ANTIALIAS)
pic.save(banner_path + '/' + game_name + '.jpg')
#~ if banner_url.startswith('http'):
#~ goglib_recreate_banner.goglib_recreate_banner(game_name, icon_path, banner_path)
new_pic = Image.open(banner_path + '/' + game_name + '.jpg')
pic_grey = new_pic.convert('L')
pic_grey.save(banner_path + '/unavailable/' + game_name + '.jpg')
    except urllib2.HTTPError as e:
        # HTTPError subclasses URLError, so it must be caught first
        print e.code
        print e.read()
    except urllib2.URLError as e:
        print e.reason
def get_banner(game_name, url, banner_path, lib):
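    """Download the banner at url, resize it to 518x240, and for goglib
    games also save a greyscale copy under banner_path/unavailable/."""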
banner_req = urllib2.Request(url)
try:
if not os.path.exists(banner_path):
os.makedirs(banner_path)
banner_data = urllib2.urlopen(banner_req).read()
banner_file = open(banner_path + '/' + game_name + '.jpg', 'wb')
banner_file.write(banner_data)
banner_file.close()
pic_src = Image.open(banner_path + '/' + game_name + '.jpg')
pic = pic_src.resize((518, 240), PIL.Image.ANTIALIAS)
pic.save(banner_path + '/' + game_name + '.jpg')
if lib == 'goglib':
if not os.path.exists(banner_path + '/unavailable/'):
os.makedirs(banner_path + '/unavailable/')
new_pic = Image.open(banner_path + '/' + game_name + '.jpg')
pic_grey = new_pic.convert('L')
pic_grey.save(banner_path + '/unavailable/' + game_name + '.jpg')
    except urllib2.HTTPError as e:
        # HTTPError subclasses URLError, so it must be caught first
        print e.code
        print e.read()
    except urllib2.URLError as e:
        print e.reason
def httpcall(url):
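    """Send one GET with a random cache-busting query parameter and randomized
    User-Agent/Referer headers; return 500 on an HTTP error, 0 otherwise."""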
    useragent_list()
    referer_list()
    code = 0
    if url.count("?") > 0:
        param_joiner = "&"
    else:
        param_joiner = "?"
    request = urllib2.Request(url + param_joiner + buildblock(random.randint(3, 10)) +
                              '=' + buildblock(random.randint(3, 10)))
request.add_header('User-Agent', random.choice(headers_useragents))
request.add_header('Cache-Control', 'no-cache')
request.add_header('Accept-Charset', 'ISO-8859-1,utf-8;q=0.7,*;q=0.7')
request.add_header('Referer', random.choice(headers_referers) + buildblock(random.randint(5,10)))
request.add_header('Keep-Alive', random.randint(110,120))
request.add_header('Connection', 'keep-alive')
request.add_header('Host',host)
try:
urllib2.urlopen(request)
except urllib2.HTTPError, e:
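        # any HTTPError is treated as a 500 response from an overloaded server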
#print e.code
set_flag(1)
print 'Response Code 500'
code=500
except urllib2.URLError, e:
#print e.reason
sys.exit()
else:
inc_counter()
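        # a second urlopen fires here, doubling the requests per call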
urllib2.urlopen(request)
return(code)
#http caller thread
def __http_get(self, url, timeout=3):
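    """GET url and decode the response as JSON; log and return None on URLError."""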
try:
        r = urllib2.urlopen(url, timeout=timeout)
return json.load(r)
except urllib2.URLError as e:
self.__logger.error("Error fetching %s - %s", url, e.reason)
def get_real_url(url, loaded_urls):
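    """Resolve redirects for url and return the final URL, or None if it was
    already in loaded_urls or the request failed."""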
real_url = None
response = None
try:
req = Request(url, headers={"User-Agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/537.17 (KHTML, like Gecko) Chrome/24.0.1312.27 Safari/537.17"})
response = urlopen(req)
real_url = response.geturl()
print 'Real_url is: ' + str(real_url)
if real_url in loaded_urls:
            print 'URL had been downloaded in previous searching'
real_url = None
    except HTTPError as e:  # HTTPError subclasses IOError, so catch it first
        print("HTTPError on url " + str(url))
        print e
    except URLError as e:   # URLError also subclasses IOError
        print("URLError on url " + str(url))
        print e
    except IOError as e:    # any other I/O failure
        print("IOError on url " + str(url))
        print e
if response:
response.close()
return real_url
def download_image(url, save_dir, loaded_urls=None):
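    """Download the image at url into save_dir, named by the MD5 of its final
    URL; return (real_url, save_image_name), with None values on failure."""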
real_url = None
response = None
save_image_name = None
try:
req = Request(url, headers={"User-Agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/537.17 (KHTML, like Gecko) Chrome/24.0.1312.27 Safari/537.17"})
response = urlopen(req)
real_url = response.geturl()
if loaded_urls and real_url in loaded_urls:
print 'URL had been downloaded in previous searching'
real_url = None
else:
img_name = hashlib.md5(real_url).hexdigest()
save_image_name = save_dir + '/' + img_name + '.' + CONFIGS[u'search_file_type']
print 'Try to save image ' + real_url + ' into file: ' + save_image_name
output_file = open(save_image_name,'wb')
data = response.read()
output_file.write(data)
#response.close()
    except HTTPError as e:  # HTTPError subclasses IOError, so catch it first
        print("HTTPError on url " + str(url))
        print e
    except URLError as e:   # URLError also subclasses IOError
        print("URLError on url " + str(url))
        print e
    except IOError as e:    # any other I/O failure
        print("IOError on url " + str(url))
        print e
if response:
response.close()
return real_url, save_image_name
############## End of Functions to get real urls and download images ############
############## Main Program ############
def test_download_failed_URLError(self, mock_urlopen):
mock_urlopen.side_effect = URLError(None)
fake_request = urllib2.Request('http://fakeurl.com')
self.assertRaises(
self.glance.RetryableError,
self.glance._download_tarball_and_verify,
fake_request, 'fake_staging_path')