def _build_urllib2_request(url, agent, etag, modified, referrer, auth, request_headers):
    request = urllib.request.Request(url)
    request.add_header('User-Agent', agent)
    if etag:
        request.add_header('If-None-Match', etag)
    if isinstance(modified, str):
        modified = _parse_date(modified)
    elif isinstance(modified, datetime.datetime):
        modified = modified.utctimetuple()
    if modified:
        # format into an RFC 1123-compliant timestamp. We can't use
        # time.strftime() since the %a and %b directives can be affected
        # by the current locale, but RFC 2616 states that dates must be
        # in English.
        short_weekdays = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
        months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
        request.add_header('If-Modified-Since', '%s, %02d %s %04d %02d:%02d:%02d GMT' % (short_weekdays[modified[6]], modified[2], months[modified[1] - 1], modified[0], modified[3], modified[4], modified[5]))
    if referrer:
        request.add_header('Referer', referrer)
    if gzip and zlib:
        request.add_header('Accept-encoding', 'gzip, deflate')
    elif gzip:
        request.add_header('Accept-encoding', 'gzip')
    elif zlib:
        request.add_header('Accept-encoding', 'deflate')
    else:
        request.add_header('Accept-encoding', '')
    if auth:
        request.add_header('Authorization', 'Basic %s' % auth)
    if ACCEPT_HEADER:
        request.add_header('Accept', ACCEPT_HEADER)
    # use this for whatever -- cookies, special headers, etc
    # [('Cookie','Something'),('x-special-header','Another Value')]
    for header_name, header_value in list(request_headers.items()):
        request.add_header(header_name, header_value)
    request.add_header('A-IM', 'feed')  # RFC 3229 support
    return request
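For context, a minimal sketch of how such a builder might be called; the URL, agent string, ETag, and cookie values below are illustrative placeholders, not values from the original module, and the surrounding globals (_parse_date, ACCEPT_HEADER, gzip, zlib) are assumed to be defined as in that module.

# Hypothetical usage sketch -- all values below are placeholders.
import datetime

req = _build_urllib2_request(
    url='https://example.com/feed.xml',      # assumed feed URL
    agent='MyAggregator/1.0',                # assumed User-Agent string
    etag='"abc123"',                         # ETag from a previous response, if any
    modified=datetime.datetime(2017, 1, 1),  # becomes an If-Modified-Since header
    referrer=None,
    auth=None,                               # base64-encoded "user:password" if needed
    request_headers={'Cookie': 'session=xyz'},
)
# The request can then be opened with urllib.request.urlopen(req).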
def test_custom_headers(self):
    url = "http://www.example.com"
    with support.transient_internet(url):
        opener = urllib.request.build_opener()
        request = urllib.request.Request(url)
        self.assertFalse(request.header_items())
        opener.open(request)
        self.assertTrue(request.header_items())
        self.assertTrue(request.has_header('User-agent'))
        request.add_header('User-Agent', 'Test-Agent')
        opener.open(request)
        self.assertEqual(request.get_header('User-agent'), 'Test-Agent')
def get_raw(self, url):
    # print("Raw request:", url)
    request = urllib.request.Request(url)
    # setup private request headers if appropriate
    if self._engine.token is not None:
        if self._engine.name == "gitlab":
            request.add_header('PRIVATE-TOKEN', self._engine.token)
        else:
            if self._verbose: print("Tokens not setup for engine yet")
    # run the request
    try:
        result = urllib.request.urlopen(request)
    except urllib.error.HTTPError as e:
        self._error = "HTTP error"
        self._error_msg = str(e.code)
        self._update_ready = None
    except urllib.error.URLError as e:
        self._error = "URL error, check internet connection"
        self._error_msg = str(e.reason)
        self._update_ready = None
        return None
    else:
        result_string = result.read()
        result.close()
        return result_string.decode()

# result of all api calls, decoded into json format
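The same token-header pattern works outside the updater class. Below is a minimal, self-contained sketch of a token-authenticated GET that decodes a JSON response; the function name, URL, and token are assumptions, not part of the original add-on.

# Minimal sketch: token-authenticated GET returning parsed JSON.
# fetch_json, the URL, and the token are placeholders.
import json
import urllib.request
import urllib.error

def fetch_json(url, token=None):
    request = urllib.request.Request(url)
    if token is not None:
        # GitLab-style private token header, as in get_raw() above
        request.add_header('PRIVATE-TOKEN', token)
    try:
        with urllib.request.urlopen(request) as result:
            return json.loads(result.read().decode())
    except urllib.error.URLError as e:
        print("Request failed:", e)
        return None

# data = fetch_json('https://gitlab.example.com/api/v4/projects', token='...')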
def download(url, num_retries=2, user_agent='wswp', charset='utf-8', proxy=None):
    """ Download a given URL and return the page content
        args:
            url (str): URL
        kwargs:
            user_agent (str): user agent (default: wswp)
            charset (str): charset if website does not include one in headers
            proxy (str): proxy url, ex 'http://IP' (default: None)
            num_retries (int): number of retries if a 5xx error is seen (default: 2)
    """
    print('Downloading:', url)
    request = urllib.request.Request(url)
    request.add_header('User-agent', user_agent)
    try:
        if proxy:
            proxy_support = urllib.request.ProxyHandler({'http': proxy})
            opener = urllib.request.build_opener(proxy_support)
            urllib.request.install_opener(opener)
        resp = urllib.request.urlopen(request)
        cs = resp.headers.get_content_charset()
        if not cs:
            cs = charset
        html = resp.read().decode(cs)
    except (URLError, HTTPError, ContentTooShortError) as e:
        print('Download error:', e)
        html = None
        if num_retries > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                # recursively retry 5xx HTTP errors, passing the remaining
                # settings through so the retry uses the same agent/proxy
                return download(url, num_retries - 1, user_agent, charset, proxy)
    return html
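A short usage sketch for the function above; the target URL and proxy address are placeholders, and the imports shown are the ones the snippet relies on.

# Usage sketch for download() -- URL and proxy are placeholders.
import urllib.request
from urllib.error import URLError, HTTPError, ContentTooShortError

html = download('http://example.webscraping.com', user_agent='wswp')
if html is not None:
    print(len(html), 'characters downloaded')

# Routing through an (assumed) local HTTP proxy:
# html = download('http://example.webscraping.com', proxy='http://127.0.0.1:8080')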
def download(url, num_retries=2, user_agent='wswp', charset='utf-8', proxy=None):
    """ Download a given URL and return the page content
        args:
            url (str): URL
        kwargs:
            user_agent (str): user agent (default: wswp)
            charset (str): charset if website does not include one in headers
            proxy (str): proxy url, ex 'http://IP' (default: None)
            num_retries (int): number of retries if a 5xx error is seen (default: 2)
    """
    print('Downloading:', url)
    request = urllib.request.Request(url)
    request.add_header('User-agent', user_agent)
    try:
        if proxy:
            proxy_support = urllib.request.ProxyHandler({'http': proxy})
            opener = urllib.request.build_opener(proxy_support)
            urllib.request.install_opener(opener)
        resp = urllib.request.urlopen(request)
        cs = resp.headers.get_content_charset()
        if not cs:
            cs = charset
        html = resp.read().decode(cs)
    except (URLError, HTTPError, ContentTooShortError) as e:
        print('Download error:', e.reason)
        html = None
        if num_retries > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                # recursively retry 5xx HTTP errors, passing the remaining
                # settings through so the retry uses the same agent/proxy
                return download(url, num_retries - 1, user_agent, charset, proxy)
    return html
def get_html(url):
    request = urllib.request.Request(url)
    request.add_header("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.99 Safari/537.36")
    html = urllib.request.urlopen(request)
    return html.read()
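get_html() returns raw bytes, so callers typically decode before parsing. A short, hypothetical usage sketch follows; the URL and the assumed UTF-8 charset are placeholders.

# Hypothetical usage: get_html() returns bytes, so decode before parsing.
import urllib.request

raw = get_html('http://example.com')
text = raw.decode('utf-8', errors='replace')  # charset assumed to be UTF-8
print(text[:200])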
def _build_urllib2_request(url, agent, accept_header, etag, modified, referrer, auth, request_headers):
    request = urllib.request.Request(url)
    request.add_header('User-Agent', agent)
    if etag:
        request.add_header('If-None-Match', etag)
    if isinstance(modified, str):  # basestring does not exist in Python 3
        modified = _parse_date(modified)
    elif isinstance(modified, datetime.datetime):
        modified = modified.utctimetuple()
    if modified:
        # format into an RFC 1123-compliant timestamp. We can't use
        # time.strftime() since the %a and %b directives can be affected
        # by the current locale, but RFC 2616 states that dates must be
        # in English.
        short_weekdays = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
        months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
        request.add_header('If-Modified-Since', '%s, %02d %s %04d %02d:%02d:%02d GMT' % (short_weekdays[modified[6]], modified[2], months[modified[1] - 1], modified[0], modified[3], modified[4], modified[5]))
    if referrer:
        request.add_header('Referer', referrer)
    if gzip and zlib:
        request.add_header('Accept-encoding', 'gzip, deflate')
    elif gzip:
        request.add_header('Accept-encoding', 'gzip')
    elif zlib:
        request.add_header('Accept-encoding', 'deflate')
    else:
        request.add_header('Accept-encoding', '')
    if auth:
        request.add_header('Authorization', 'Basic %s' % auth)
    if accept_header:
        request.add_header('Accept', accept_header)
    # use this for whatever -- cookies, special headers, etc
    # [('Cookie','Something'),('x-special-header','Another Value')]
    for header_name, header_value in request_headers.items():
        request.add_header(header_name, header_value)
    request.add_header('A-IM', 'feed')  # RFC 3229 support
    return request
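The hand-rolled weekday and month tables above exist only to keep the If-Modified-Since timestamp locale-independent. As an aside (not part of the original module), the standard library can emit the same RFC 1123 format; a small sketch with an arbitrary example date:

# Alternative sketch: a locale-independent RFC 1123 date via the stdlib.
import calendar
import datetime
from email.utils import formatdate

modified = datetime.datetime(2017, 1, 1, 12, 30, 0)
stamp = calendar.timegm(modified.utctimetuple())
print(formatdate(stamp, usegmt=True))  # 'Sun, 01 Jan 2017 12:30:00 GMT'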
Source: advanced_link_crawler.py, from the Python-Web-Scraping-Second-Edition project by PacktPublishing.
def download(url, user_agent='wswp', num_retries=2, charset='utf-8', proxy=None):
    """ Download a given URL and return the page content
        args:
            url (str): URL
        kwargs:
            user_agent (str): user agent (default: wswp)
            charset (str): charset if website does not include one in headers
            proxy (str): proxy url, ex 'http://IP' (default: None)
            num_retries (int): number of retries if a 5xx error is seen (default: 2)
    """
    print('Downloading:', url)
    request = urllib.request.Request(url)
    request.add_header('User-agent', user_agent)
    try:
        if proxy:
            proxy_support = urllib.request.ProxyHandler({'http': proxy})
            opener = urllib.request.build_opener(proxy_support)
            urllib.request.install_opener(opener)
        resp = urllib.request.urlopen(request)
        cs = resp.headers.get_content_charset()
        if not cs:
            cs = charset
        html = resp.read().decode(cs)
    except (URLError, HTTPError, ContentTooShortError) as e:
        print('Download error:', e)
        html = None
        if num_retries > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                # recursively retry 5xx HTTP errors; num_retries must be passed
                # by keyword here, or it would be bound to user_agent
                return download(url, user_agent=user_agent, num_retries=num_retries - 1,
                                charset=charset, proxy=proxy)
    return html
Source: advanced_link_crawler.py, from the Python-Web-Scraping-Second-Edition project by PacktPublishing.
def download(url, user_agent='wswp', num_retries=2, charset='utf-8', proxy=None):
    """ Download a given URL and return the page content
        args:
            url (str): URL
        kwargs:
            user_agent (str): user agent (default: wswp)
            charset (str): charset if website does not include one in headers
            proxy (str): proxy url, ex 'http://IP' (default: None)
            num_retries (int): number of retries if a 5xx error is seen (default: 2)
    """
    print('Downloading:', url)
    request = urllib.request.Request(url)
    request.add_header('User-agent', user_agent)
    try:
        if proxy:
            proxy_support = urllib.request.ProxyHandler({'http': proxy})
            opener = urllib.request.build_opener(proxy_support)
            urllib.request.install_opener(opener)
        resp = urllib.request.urlopen(request)
        cs = resp.headers.get_content_charset()
        if not cs:
            cs = charset
        html = resp.read().decode(cs)
    except (URLError, HTTPError, ContentTooShortError) as e:
        print('Download error:', e.reason)
        html = None
        if num_retries > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                # recursively retry 5xx HTTP errors; num_retries must be passed
                # by keyword here, or it would be bound to user_agent
                return download(url, user_agent=user_agent, num_retries=num_retries - 1,
                                charset=charset, proxy=proxy)
    return html
def __send_request(self, method, uri, data):
    url = self.__url + uri
    request = urllib.request.Request(url)
    if method == 'POST':
        request.data = bytes(json.dumps(data), 'utf-8')
    auth = str(
        base64.b64encode(
            bytes('%s:%s' % (self.user, self.password), 'utf-8')
        ),
        'ascii'
    ).strip()
    request.add_header('Authorization', 'Basic %s' % auth)
    request.add_header('Content-Type', 'application/json')

    e = None
    try:
        response = urllib.request.urlopen(request).read()
    except urllib.error.HTTPError as ex:
        response = ex.read()
        e = ex

    if response:
        result = json.loads(response.decode())
    else:
        result = {}

    if e is not None:
        if result and 'error' in result:
            error = '"' + result['error'] + '"'
        else:
            error = 'No additional error message received'
        raise APIError('TestRail API returned HTTP %s (%s)' %
                       (e.code, error))
    return result
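The Basic auth value above is simply the base64 encoding of "user:password". For reference, a minimal standalone sketch of the same header construction; the credentials and URL are placeholders.

# Standalone sketch of the Basic auth header used above; credentials and URL are placeholders.
import base64
import urllib.request

user, password = 'api-user@example.com', 'secret'
token = base64.b64encode(('%s:%s' % (user, password)).encode('utf-8')).decode('ascii')

req = urllib.request.Request('https://testrail.example.com/index.php?/api/v2/get_projects')
req.add_header('Authorization', 'Basic %s' % token)
req.add_header('Content-Type', 'application/json')
# response = urllib.request.urlopen(req)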
def __call(self, url=API_URL, params={}, data=None, headers={}):
    """Common method for API call.

    url: API URL
    params: query string parameters
    data: POST data
    headers: additional request headers

    Return: parsed JSON structure or raise GooglError.
    """
    params.update(key=self.key)
    if self.userip is not None:
        params.update(userip=self.userip)
    full_url = "%s?%s" % (url % self.api, urllib.parse.urlencode(params))
    # only encode a body when one is given; bytes(None, encoding=...) would raise TypeError
    body = bytes(data, encoding="UTF-8") if data is not None else None
    request = urllib.request.Request(full_url, data=body, headers=headers)
    if self.referer is not None:
        request.add_header("Referer", self.referer)
    if self.client_login is not None:
        request.add_header("Authorization", "GoogleLogin auth=%s" % self.client_login)
    try:
        response = urllib.request.urlopen(request)
        return json.loads(str(response.read(), encoding="UTF-8"))
    except urllib.error.HTTPError as e:
        error = json.loads(e.fp.read())
        raise GooglError(error["error"]["code"], error["error"]["message"])
def _getMealsURL():
    """Download meals information from XML feed"""
    request = urllib.request.Request(mealsURL)
    request.add_header("Authorization", "Basic %s" % mealsURL_authorization)
    result = urllib.request.urlopen(request, timeout=__timeoutSeconds)
    return result, 0
def post_api(self, api_path):
    options = json.dumps(self.options)
    options = options if isinstance(options, bytes) else options.encode('utf8')
    request = urllib.request.Request(self.arachni_url + api_path, options)
    request.add_header('Content-Type', 'application/json')
    return urllib.request.urlopen(request).read().decode('utf8')
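Passing a bytes body to urllib.request.Request turns the request into a POST. A self-contained sketch of the same JSON POST pattern; the endpoint and payload are placeholders, not values from the original scanner client.

# Self-contained sketch of a JSON POST with urllib; endpoint and payload are placeholders.
import json
import urllib.request

payload = json.dumps({'url': 'http://example.com', 'checks': ['xss']}).encode('utf8')
req = urllib.request.Request('http://127.0.0.1:7331/scans', data=payload)
req.add_header('Content-Type', 'application/json')
# response_body = urllib.request.urlopen(req).read().decode('utf8')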
def stage_repository(self, url):
    local = os.path.join(self._updater_path, "update_staging")
    error = None

    # make/clear the staging folder
    # ensure the folder is always "clean"
    if self._verbose: print("Preparing staging folder for download:\n", local)
    if os.path.isdir(local):
        try:
            shutil.rmtree(local)
            os.makedirs(local)
        except:
            error = "failed to remove existing staging directory"
    else:
        try:
            os.makedirs(local)
        except:
            error = "failed to create staging directory"

    if error is not None:
        if self._verbose: print("Error: Aborting update, " + error)
        self._error = "Update aborted, staging path error"
        self._error_msg = "Error: {}".format(error)
        return False

    if self._backup_current:
        self.create_backup()
    if self._verbose: print("Now retrieving the new source zip")
    self._source_zip = os.path.join(local, "source.zip")

    if self._verbose: print("Starting download update zip")
    try:
        request = urllib.request.Request(url)
        # setup private token if appropriate
        if self._engine.token is not None:
            if self._engine.name == "gitlab":
                request.add_header('PRIVATE-TOKEN', self._engine.token)
            else:
                if self._verbose: print("Tokens not setup for selected engine yet")
        self.urlretrieve(urllib.request.urlopen(request), self._source_zip)
        # add additional checks on file size being non-zero
        if self._verbose: print("Successfully downloaded update zip")
        return True
    except Exception as e:
        self._error = "Error retrieving download, bad link?"
        self._error_msg = "Error: {}".format(e)
        if self._verbose:
            print("Error retrieving download, bad link?")
            print("Error: {}".format(e))
        return False
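self.urlretrieve(...) above is the add-on's own helper. As a point of reference only (not the add-on's implementation), a generic sketch of streaming a response to a local zip in chunks could look like this; the function name and URLs are placeholders.

# Generic sketch (not the add-on's helper): stream an HTTP response to disk in chunks.
import shutil
import urllib.request

def save_to_file(url, destination, chunk_size=1024 * 64):
    request = urllib.request.Request(url)  # token headers could be added here, as above
    with urllib.request.urlopen(request) as response, open(destination, 'wb') as out:
        shutil.copyfileobj(response, out, chunk_size)

# save_to_file('https://gitlab.example.com/project/archive.zip', '/tmp/update_staging/source.zip')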