def _try_send(self, ip, port, body, header, data):
    """Send a SOAP request to a WeMo-style UPnP ``basicevent1`` endpoint.

    ip, port -- target device address
    body     -- XML fragment placed inside the SOAP <s:Body> element
    header   -- value for the SOAPACTION request header
    data     -- tag name passed through to self._extract for parsing
    Returns the value extracted from the response, or None on any error
    (best-effort semantics: failures are printed, never raised).
    """
    try:
        request = urllib.request.Request(
            'http://%s:%s/upnp/control/basicevent1' % (ip, port))
        request.add_header('Content-type', 'text/xml; charset="utf-8"')
        request.add_header('SOAPACTION', header)
        request_body = '<?xml version="1.0" encoding="utf-8"?>'
        request_body += '<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">'
        request_body += '<s:Body>%s</s:Body></s:Envelope>' % body
        request.data = request_body.encode()
        result = urllib.request.urlopen(request, timeout=3)
        return self._extract(result.read().decode(), data)
    except Exception as e:
        # Removed commented-out dead code; keep best-effort behavior:
        # report the problem and signal failure instead of propagating.
        print(str(e))
        return None
Python examples of the Request.add_header() method (collected source snippets)
def getWebPage(url, headers, cookies, postData=None):
    """Fetch *url* and return the (gzip-decompressed) response body as bytes.

    headers  -- dict of HTTP headers passed to the Request constructor
    cookies  -- value sent as the Cookie header
    postData -- optional dict; when given the request is sent as a POST
                with a urlencoded body
    Returns the raw body bytes, or None on any error.
    """
    try:
        if (postData):
            params = urllib.parse.urlencode(postData)
            params = params.encode('utf-8')
            request = urllib.request.Request(url, data=params, headers=headers)
        else:
            print('Fetching '+url)
            request = urllib.request.Request(url, None, headers)
        # The Cookie header is attached in both the GET and POST cases.
        request.add_header('Cookie', cookies)
        if (postData):
            # POSTs go through an opener with cookie processing enabled.
            response = urllib.request.build_opener(urllib.request.HTTPCookieProcessor).open(request)
        else:
            response = urllib.request.urlopen(request)
        if response.info().get('Content-Encoding') == 'gzip':
            # Server compressed the body; BytesIO and gzip are presumably
            # imported at module level -- TODO confirm.
            buf = BytesIO(response.read())
            f = gzip.GzipFile(fileobj=buf)
            r = f.read()
        else:
            r = response.read()
        return r
    except Exception as e:
        print("Error processing webpage: "+str(e))
        return None
## https://stackoverflow.com/questions/480214/how-do-you-remove-duplicates-from-a-list-in-python-whilst-preserving-order
def play(self, txt):
    """Synthesize *txt* with the Naver TTS API, save it, and play it via cvlc.

    Assumes module-level url / client_id / client_secret / tmpPlayPath are
    defined elsewhere in the file -- TODO confirm.
    """
    encText = urllib.parse.quote(txt)
    data = "speaker=" + self.speaker + "&speed=" + self.speed + "&text=" + encText
    request = urllib.request.Request(url)
    request.add_header("X-Naver-Client-Id", client_id)
    request.add_header("X-Naver-Client-Secret", client_secret)
    response = urllib.request.urlopen(request, data=data.encode('utf-8'))
    rescode = response.getcode()
    if rescode == 200:
        response_body = response.read()
        with open(tmpPlayPath, 'wb') as f:
            f.write(response_body)
        # Play the saved audio with vlc (blocks until playback finishes).
        # NOTE(review): os.system with an interpolated path is shell-injection
        # prone if tmpPlayPath is ever attacker-controlled.
        os.system('cvlc ' + tmpPlayPath + ' --play-and-exit')
        # Alternative player:
        # os.system('omxplayer ' + tmpPlayPath)
    else:
        # Bug fix: rescode is an int; str() avoids a TypeError when
        # concatenating it onto the message.
        print("Error Code:" + str(rescode))
def authenticate_with_apikey(self, api_key, scope=None):
    """perform authentication by api key and store result for execute_request method

    api_key -- secret api key from account settings
    scope -- optional scope of authentication request. If None full list of API scopes will be used.
    """
    scope = "auto" if scope is None else scope
    data = {
        "grant_type": "client_credentials",
        "scope": scope
    }
    encoded_data = urllib.parse.urlencode(data).encode()
    request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST")
    # Bug fix: the standard header name is "Content-Type"; "ContentType" is
    # ignored by HTTP servers and lets urllib send its own default instead.
    request.add_header("Content-Type", "application/x-www-form-urlencoded")
    request.add_header("Authorization",
                       'Basic ' + base64.standard_b64encode(('APIKEY:' + api_key).encode()).decode())
    response = urllib.request.urlopen(request)
    self._token = WaApiClient._parse_response(response)
    self._token.retrieved_at = datetime.datetime.now()
def authenticate_with_contact_credentials(self, username, password, scope=None):
    """perform authentication by contact credentials and store result for execute_request method

    username -- typically a contact email
    password -- contact password
    scope -- optional scope of authentication request. If None full list of API scopes will be used.
    """
    scope = "auto" if scope is None else scope
    data = {
        "grant_type": "password",
        "username": username,
        "password": password,
        "scope": scope
    }
    encoded_data = urllib.parse.urlencode(data).encode()
    request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST")
    # Bug fix: the standard header name is "Content-Type", not "ContentType".
    request.add_header("Content-Type", "application/x-www-form-urlencoded")
    auth_header = base64.standard_b64encode((self.client_id + ':' + self.client_secret).encode()).decode()
    request.add_header("Authorization", 'Basic ' + auth_header)
    response = urllib.request.urlopen(request)
    self._token = WaApiClient._parse_response(response)
    self._token.retrieved_at = datetime.datetime.now()
def downloadUrls(self, urls):
    """Yield (relative_url, body_bytes) for each entry in *urls*.

    Each relative url is joined onto self.base_url.  Failures are logged
    (via the module-level log helper) and then re-raised.
    """
    for u in urls:
        url = self.base_url + u
        request = urllib.request.Request(url)
        # the .htaccess file checks for the header, and if it exists returns unprocessed data.
        request.add_header('User-agent', 'our-web-crawler')
        try:
            response = urllib.request.urlopen(request)
            data = response.read()
        except urllib.error.URLError:
            # Fix: HTTPError/URLError live in urllib.error (urllib.request
            # merely re-exports them).  HTTPError subclasses URLError, so this
            # one clause covers both original handlers, which were identical.
            log(url)
            raise
        yield (u, data)
def download(url, num_retries=2, user_agent='wswp', charset='utf-8'):
    """Download *url* and return its decoded HTML, or None on failure.

    num_retries -- how many times to retry on a 5xx server error
    user_agent  -- value sent as the User-agent header
    charset     -- fallback charset when the response declares none
    """
    print('Downloading:', url)
    request = urllib.request.Request(url)
    request.add_header('User-agent', user_agent)
    try:
        resp = urllib.request.urlopen(request)
        cs = resp.headers.get_content_charset()
        if not cs:
            cs = charset
        html = resp.read().decode(cs)
    except (URLError, HTTPError, ContentTooShortError) as e:
        print('Download error:', e.reason)
        html = None
        if num_retries > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                # recursively retry 5xx HTTP errors.  Fix: forward the
                # caller's user_agent and charset instead of silently
                # resetting them to the defaults on retry.
                return download(url, num_retries - 1, user_agent, charset)
    return html
def download(url, num_retries=2, user_agent='wswp', charset='utf-8'):
    """Download *url* and return its decoded HTML, or None on failure.

    num_retries -- how many times to retry on a 5xx server error
    user_agent  -- value sent as the User-agent header
    charset     -- fallback charset when the response declares none
    """
    print('Downloading:', url)
    request = urllib.request.Request(url)
    request.add_header('User-agent', user_agent)
    try:
        resp = urllib.request.urlopen(request)
        cs = resp.headers.get_content_charset()
        if not cs:
            cs = charset
        html = resp.read().decode(cs)
    except (URLError, HTTPError, ContentTooShortError) as e:
        print('Download error:', e.reason)
        html = None
        if num_retries > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                # recursively retry 5xx HTTP errors.  Fix: forward the
                # caller's user_agent and charset on the retry.
                return download(url, num_retries - 1, user_agent, charset)
    return html
def download(url, num_retries=2, user_agent='wswp', charset='utf-8'):
    """Download *url* and return its decoded HTML, or None on failure.

    num_retries -- how many times to retry on a 5xx server error
    user_agent  -- value sent as the User-agent header
    charset     -- fallback charset when the response declares none
    """
    print('Downloading:', url)
    request = urllib.request.Request(url)
    request.add_header('User-agent', user_agent)
    try:
        resp = urllib.request.urlopen(request)
        cs = resp.headers.get_content_charset()
        if not cs:
            cs = charset
        html = resp.read().decode(cs)
    except (URLError, HTTPError, ContentTooShortError) as e:
        print('Download error:', e.reason)
        html = None
        if num_retries > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                # recursively retry 5xx HTTP errors.  Fix: forward the
                # caller's user_agent and charset on the retry.
                return download(url, num_retries - 1, user_agent, charset)
    return html
sitemap_crawler.py — source file
Project: Python-Web-Scraping-Second-Edition
Author: PacktPublishing
Views: 27, Favorites: 0, Likes: 0, Comments: 0
def download(url, user_agent='wswp', num_retries=2, charset='utf-8'):
    """Download *url* and return its decoded HTML, or None on failure.

    user_agent  -- value sent as the User-agent header
    num_retries -- how many times to retry on a 5xx server error
    charset     -- fallback charset when the response declares none
    """
    print('Downloading:', url)
    request = urllib.request.Request(url)
    request.add_header('User-agent', user_agent)
    try:
        resp = urllib.request.urlopen(request)
        cs = resp.headers.get_content_charset()
        if not cs:
            cs = charset
        html = resp.read().decode(cs)
    except (URLError, HTTPError, ContentTooShortError) as e:
        print('Download error:', e.reason)
        html = None
        if num_retries > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                # recursively retry 5xx HTTP errors.  Bug fix: the old call
                # download(url, num_retries - 1) passed the retry count
                # positionally into the user_agent parameter.
                return download(url, user_agent, num_retries - 1, charset)
    return html
link_crawler.py — source file
Project: Python-Web-Scraping-Second-Edition
Author: PacktPublishing
Views: 36, Favorites: 0, Likes: 0, Comments: 0
def download(url, user_agent='wswp', num_retries=2, charset='utf-8'):
    """Download *url* and return its decoded HTML, or None on failure.

    user_agent  -- value sent as the User-agent header
    num_retries -- how many times to retry on a 5xx server error
    charset     -- fallback charset when the response declares none
    """
    print('Downloading:', url)
    request = urllib.request.Request(url)
    request.add_header('User-agent', user_agent)
    try:
        resp = urllib.request.urlopen(request)
        cs = resp.headers.get_content_charset()
        if not cs:
            cs = charset
        html = resp.read().decode(cs)
    except (URLError, HTTPError, ContentTooShortError) as e:
        print('Download error:', e.reason)
        html = None
        if num_retries > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                # recursively retry 5xx HTTP errors.  Bug fix: the old call
                # download(url, num_retries - 1) passed the retry count
                # positionally into the user_agent parameter.
                return download(url, user_agent, num_retries - 1, charset)
    return html
id_iteration_crawler.py — source file
Project: Python-Web-Scraping-Second-Edition
Author: PacktPublishing
Views: 27, Favorites: 0, Likes: 0, Comments: 0
def download(url, user_agent='wswp', num_retries=2, charset='utf-8'):
    """Download *url* and return its decoded HTML, or None on failure.

    user_agent  -- value sent as the User-agent header
    num_retries -- how many times to retry on a 5xx server error
    charset     -- fallback charset when the response declares none
    """
    print('Downloading:', url)
    request = urllib.request.Request(url)
    request.add_header('User-agent', user_agent)
    try:
        resp = urllib.request.urlopen(request)
        cs = resp.headers.get_content_charset()
        if not cs:
            cs = charset
        html = resp.read().decode(cs)
    except (URLError, HTTPError, ContentTooShortError) as e:
        print('Download error:', e.reason)
        html = None
        if num_retries > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                # recursively retry 5xx HTTP errors.  Bug fix: the old call
                # download(url, num_retries - 1) passed the retry count
                # positionally into the user_agent parameter.
                return download(url, user_agent, num_retries - 1, charset)
    return html
def put(self, location, params=None):
    """Dispatch a PUT request to a SeaMicro chassis.

    The seamicro box has order-dependent HTTP parameters, so we build
    our own get URL, and use a list vs. a dict for data, as the order is
    implicit.
    """
    opener = urllib.request.build_opener(urllib.request.HTTPHandler)
    url = self.build_url(location, params)
    # Request(method=...) replaces the old `get_method = lambda: 'PUT'`
    # monkey-patch; same verb, supported directly since Python 3.3.
    request = urllib.request.Request(url, method='PUT')
    request.add_header('content-type', 'text/json')
    response = opener.open(request)
    json_data = self.parse_response(url, response)
    return json_data['result']
def send(url, headers, timeout):
    """Issue a GET with encrypted shell headers; return the response body,
    decrypted when response encryption is configured in _gs."""
    request = urllib.request.Request(url)
    # The password key header goes on first.
    request.add_header(_gs["smplshll_main_password_var"], _gs["smplshll_main_password"])
    for name in headers.keys():
        request.add_header(name, Utility.crypt(_gs["smplshll_input_password"], headers[name]))
    if timeout > 0:
        data = urllib.request.urlopen(request, timeout=timeout).read()
    else:
        data = urllib.request.urlopen(request).read()
    if _gs["smplshll_response_encryption"]:
        return Utility.crypt(_gs["smplshll_input_password"], data, False)
    return data
def build_http_request_obj(self, request_body):
    """Construct the POST Request used to call the JSON-RPC endpoint.

    request_body -- any json-serializable object, sent as the payload.
    """
    # Equivalent to poking .data/.method after construction, but the
    # request is fully formed in one place.
    req = urllib.request.Request(
        self.url,
        data=json.dumps(request_body).encode(),
        method="POST",
    )
    req.add_header("Content-Type", "application/json")
    req.add_header("User-Agent", "gemstone-client")
    return req
def post(self, path, headers, body):
    """POST *body* to <self.url>/<path> with the given headers; return the
    raw response object.

    NOTE(review): this is Python 2 code -- urllib2 and Request.add_data()
    do not exist on Python 3 (use urllib.request and the data= argument).
    """
    uri = "%s/%s" % (self.url.geturl(), path)
    if self.debug: print("post: uri=%s, headers=%s" % (uri, headers))
    request = urllib2.Request(uri)
    for header in headers:
        request.add_header(header, headers[header])
    request.add_data(body)
    # cafile pins the CA bundle used to verify the server certificate.
    resp = urllib2.urlopen(request, cafile=self.cafile)
    if self.debug: print("resp: %s" % resp.info())
    return resp
def _build_urllib2_request(url, agent, etag, modified, referrer, auth, request_headers):
    """Build a urllib Request for a conditional, compression-aware feed fetch.

    agent    -- User-Agent string
    etag     -- previously seen ETag, sent as If-None-Match when truthy
    modified -- prior modification time: string, datetime, or time 9-tuple
    referrer -- optional Referer header value
    auth     -- optional pre-encoded Basic credentials (base64 string)
    request_headers -- extra headers applied last, so callers may override
    """
    request = urllib.request.Request(url)
    request.add_header('User-Agent', agent)
    if etag:
        request.add_header('If-None-Match', etag)
    if isinstance(modified, str):
        # presumably _parse_date returns a time 9-tuple -- TODO confirm
        modified = _parse_date(modified)
    elif isinstance(modified, datetime.datetime):
        modified = modified.utctimetuple()
    if modified:
        # format into an RFC 1123-compliant timestamp. We can't use
        # time.strftime() since the %a and %b directives can be affected
        # by the current locale, but RFC 2616 states that dates must be
        # in English.
        short_weekdays = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
        months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
        request.add_header('If-Modified-Since', '%s, %02d %s %04d %02d:%02d:%02d GMT' % (short_weekdays[modified[6]], modified[2], months[modified[1] - 1], modified[0], modified[3], modified[4], modified[5]))
    if referrer:
        request.add_header('Referer', referrer)
    # gzip/zlib here are module globals (presumably None when the import
    # failed -- TODO confirm): advertise only what we can actually decode.
    if gzip and zlib:
        request.add_header('Accept-encoding', 'gzip, deflate')
    elif gzip:
        request.add_header('Accept-encoding', 'gzip')
    elif zlib:
        request.add_header('Accept-encoding', 'deflate')
    else:
        request.add_header('Accept-encoding', '')
    if auth:
        request.add_header('Authorization', 'Basic %s' % auth)
    if ACCEPT_HEADER:
        request.add_header('Accept', ACCEPT_HEADER)
    # use this for whatever -- cookies, special headers, etc
    # [('Cookie','Something'),('x-special-header','Another Value')]
    for header_name, header_value in list(request_headers.items()):
        request.add_header(header_name, header_value)
    request.add_header('A-IM', 'feed') # RFC 3229 support
    return request
def open_url(url):
    """Fetch *url* with a desktop-browser User-Agent and return the raw bytes."""
    req = urllib.request.Request(url)
    # Spoof a QQBrowser desktop UA so the server treats us as a browser.
    req.add_header('User-Agent',
                   'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.3357.400 QQBrowser/9.6.11858.400')
    reply = urllib.request.urlopen(req)
    return reply.read()
# download image
def fetch_page(query):
    """Request the search page for *query* and return the gunzipped body."""
    url = REQUEST_URL.format(BASE_URL, query)
    req = urllib.request.Request(url)
    # Headers are attached in the same order the server previously saw.
    for name, value in [
        ('User-agent', _random_user_agent()),
        ('connection', 'keep-alive'),
        ('Accept-Encoding', 'gzip, deflate, sdch, br'),
        ('referer', REQUEST_URL.format(BASE_URL, "")),
    ]:
        req.add_header(name, value)
    print(url)
    reply = urllib.request.urlopen(req)
    raw = reply.read()
    print(type(raw))
    return gzip.decompress(raw)
def upload_image(self, image_uri, sync, username, userid, channel_name):
    """Download a Slack-hosted image and re-share it into the synced Hangout.

    Coroutine (uses ``yield from``).  image_uri is fetched with the bot's
    bearer token; the filename extension is normalised against the response
    content type before uploading through the hangups client.
    """
    token = self.apikey
    logger.info('downloading %s', image_uri)
    filename = os.path.basename(image_uri)
    request = urllib.request.Request(image_uri)
    request.add_header("Authorization", "Bearer %s" % token)
    image_response = urllib.request.urlopen(request)
    content_type = image_response.info().get_content_type()
    filename_extension = mimetypes.guess_extension(content_type).lower() # returns with "."
    # Extension actually present on the downloaded file name.
    physical_extension = "." + filename.rsplit(".", 1).pop().lower()
    if physical_extension == filename_extension:
        pass
    elif filename_extension == ".jpe" and physical_extension in [ ".jpg", ".jpeg", ".jpe", ".jif", ".jfif" ]:
        # account for mimetypes idiosyncrancy to return jpe for valid jpeg
        pass
    else:
        # Mismatch: warn and append the content-type-derived extension.
        logger.warning("unable to determine extension: {} {}".format(filename_extension, physical_extension))
        filename += filename_extension
    logger.info('uploading as %s', filename)
    # The still-open HTTP response is passed as the file-like image source.
    image_id = yield from self.bot._client.upload_image(image_response, filename=filename)
    logger.info('sending HO message, image_id: %s', image_id)
    yield from sync._bridgeinstance._send_to_internal_chat(
        sync.hangoutid,
        "shared media from slack",
        { "sync": sync,
          "source_user": username,
          "source_uid": userid,
          "source_title": channel_name },
        image_id=image_id )
def test_custom_headers(self):
    """Check that open() populates default headers and that an explicitly
    added User-Agent is preserved and visible via get_header().

    NOTE(review): hits www.example.com -- requires network access
    (guarded by support.transient_internet).
    """
    url = "http://www.example.com"
    with support.transient_internet(url):
        opener = urllib.request.build_opener()
        request = urllib.request.Request(url)
        # A fresh Request starts with no headers at all.
        self.assertFalse(request.header_items())
        opener.open(request)
        # open() adds defaults such as User-agent.
        self.assertTrue(request.header_items())
        self.assertTrue(request.has_header('User-agent'))
        request.add_header('User-Agent','Test-Agent')
        opener.open(request)
        self.assertEqual(request.get_header('User-agent'),'Test-Agent')
def _get(self, url: object, api: object = None, timeout: object = None) -> object:
    """GET *url* with a wx.qq.com Referer header and return the body.

    api -- when 'webwxgetvoice' or 'webwxgetvideo' a Range header is added
           and raw bytes are returned; otherwise the body is decoded utf-8.
    timeout -- optional socket timeout in seconds.
    Returns '' on any error (all failures are logged or swallowed).
    """
    request = urllib.request.Request(url=url)
    request.add_header('Referer', 'https://wx.qq.com/')
    if api == 'webwxgetvoice':
        request.add_header('Range', 'bytes=0-')
    if api == 'webwxgetvideo':
        request.add_header('Range', 'bytes=0-')
    try:
        response = urllib.request.urlopen(request, timeout=timeout) if timeout else urllib.request.urlopen(request)
        if api == 'webwxgetvoice' or api == 'webwxgetvideo':
            # Media endpoints: hand back the raw bytes unmodified.
            data = response.read()
        else:
            data = response.read().decode('utf-8')
        logging.debug(url)
        return data
    except urllib.error.HTTPError as e:
        logging.error('HTTPError = ' + str(e.code))
    except urllib.error.URLError as e:
        logging.error('URLError = ' + str(e.reason))
    except http.client.HTTPException as e:
        logging.error('HTTPException')
    except timeout_error as e:
        # timeout_error is presumably an alias for socket.timeout defined
        # elsewhere in the module -- TODO confirm.
        pass
    except ssl.CertificateError as e:
        pass
    except Exception:
        import traceback
        logging.error('generic exception: ' + traceback.format_exc())
    return ''
def _post(self, url: object, params: object, jsonfmt: object = True) -> object:
    """POST *params* to *url*; return parsed JSON (jsonfmt) or raw bytes.

    jsonfmt -- when True the payload is JSON-encoded and the response is
               json.loads()'d; otherwise the payload is urlencoded and the
               raw bytes are returned.
    Returns '' on any error (all failures are logged).
    """
    if jsonfmt:
        data = (json.dumps(params)).encode()
        request = urllib.request.Request(url=url, data=data)
        # Bug fix: the header name is "Content-Type"; the old "ContentType"
        # was ignored, so urllib sent its x-www-form-urlencoded default
        # alongside a JSON body.
        request.add_header(
            'Content-Type', 'application/json; charset=UTF-8')
    else:
        request = urllib.request.Request(url=url, data=urllib.parse.urlencode(params).encode(encoding='utf-8'))
    try:
        response = urllib.request.urlopen(request)
        data = response.read()
        if jsonfmt:
            return json.loads(data.decode('utf-8') )#object_hook=_decode_dict)
        return data
    except urllib.error.HTTPError as e:
        logging.error('HTTPError = ' + str(e.code))
    except urllib.error.URLError as e:
        logging.error('URLError = ' + str(e.reason))
    except http.client.HTTPException as e:
        logging.error('HTTPException')
    except Exception:
        import traceback
        logging.error('generic exception: ' + traceback.format_exc())
    return ''
def test_custom_headers(self):
    """Check that open() populates default headers and that an explicitly
    added User-Agent is preserved and visible via get_header().

    NOTE(review): hits www.example.com -- requires network access
    (guarded by support.transient_internet).
    """
    url = "http://www.example.com"
    with support.transient_internet(url):
        opener = urllib.request.build_opener()
        request = urllib.request.Request(url)
        # A fresh Request starts with no headers at all.
        self.assertFalse(request.header_items())
        opener.open(request)
        # open() adds defaults such as User-agent.
        self.assertTrue(request.header_items())
        self.assertTrue(request.has_header('User-agent'))
        request.add_header('User-Agent','Test-Agent')
        opener.open(request)
        self.assertEqual(request.get_header('User-agent'),'Test-Agent')
def volgendeZonondergang(self):
    """Return the next sunset time (as a datetime) queried from Domoticz.

    Falls back to 22:00 today when the request or parsing fails.
    """
    # sunrise from domoticz... But I don't know how to retrieve it....
    try:
        # Bug fix: the query string contained mojibake -- "¶m" is an
        # HTML-entity-mangled "&param"; the Domoticz API expects
        # ?type=command&param=getSunRiseSet.
        domoticzurl = 'https://127.0.0.1:8443/json.htm?type=command&param=getSunRiseSet'
        encoding = 'utf-8'
        inlog = '%s:%s' % (self.domoticzusername, self.domoticzpassword)
        base64string = base64.b64encode(inlog.encode(encoding)).decode(encoding)
        request = urllib.request.Request(domoticzurl)
        request.add_header("Authorization", "Basic %s" % base64string)
        response = urllib.request.urlopen(request)
        data = response.read()
        JSON_object = json.loads(data.decode(encoding))
        time = JSON_object['Sunset'].split(':')
        now = datetime.now()
        ret = datetime(now.year, now.month, now.day, int(time[0]), int(time[1]), 0)
        # When started after sunset (plus the Mode4 offset), use tomorrow.
        now = now + timedelta(minutes = int(Parameters["Mode4"]))
        if (now > ret):
            ret = ret + timedelta(days = 1)
        return ret
    except Exception as e:
        self.LogError("Error retrieving Sunset: "+ str(e))
        now = datetime.now()
        return datetime(now.year, now.month, now.day, 22, 0, 0)
def domoticzrequest(url):
    """GET *url* using the module-level Basic-auth credentials; return text."""
    req = urllib.request.Request(url)
    req.add_header("Authorization", "Basic %s" % base64string)
    reply = urllib.request.urlopen(req)
    body = reply.read()
    return body.decode('utf-8')
def domoticzrequest(url):
    """GET *url* using the module-level Basic-auth credentials; return text."""
    req = urllib.request.Request(url)
    req.add_header("Authorization", "Basic %s" % base64string)
    reply = urllib.request.urlopen(req)
    body = reply.read()
    return body.decode('utf-8')
def _build_urllib2_request(url, agent, etag, modified, referrer, auth, request_headers):
    """Build a urllib Request for a conditional, compression-aware feed fetch.

    agent    -- User-Agent string
    etag     -- previously seen ETag, sent as If-None-Match when truthy
    modified -- prior modification time: string, datetime, or time 9-tuple
    referrer -- optional Referer header value
    auth     -- optional pre-encoded Basic credentials (base64 string)
    request_headers -- extra headers applied last, so callers may override
    """
    request = urllib.request.Request(url)
    request.add_header('User-Agent', agent)
    if etag:
        request.add_header('If-None-Match', etag)
    if isinstance(modified, str):
        # presumably _parse_date returns a time 9-tuple -- TODO confirm
        modified = _parse_date(modified)
    elif isinstance(modified, datetime.datetime):
        modified = modified.utctimetuple()
    if modified:
        # format into an RFC 1123-compliant timestamp. We can't use
        # time.strftime() since the %a and %b directives can be affected
        # by the current locale, but RFC 2616 states that dates must be
        # in English.
        short_weekdays = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
        months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
        request.add_header('If-Modified-Since', '%s, %02d %s %04d %02d:%02d:%02d GMT' % (short_weekdays[modified[6]], modified[2], months[modified[1] - 1], modified[0], modified[3], modified[4], modified[5]))
    if referrer:
        request.add_header('Referer', referrer)
    # gzip/zlib here are module globals (presumably None when the import
    # failed -- TODO confirm): advertise only what we can actually decode.
    if gzip and zlib:
        request.add_header('Accept-encoding', 'gzip, deflate')
    elif gzip:
        request.add_header('Accept-encoding', 'gzip')
    elif zlib:
        request.add_header('Accept-encoding', 'deflate')
    else:
        request.add_header('Accept-encoding', '')
    if auth:
        request.add_header('Authorization', 'Basic %s' % auth)
    if ACCEPT_HEADER:
        request.add_header('Accept', ACCEPT_HEADER)
    # use this for whatever -- cookies, special headers, etc
    # [('Cookie','Something'),('x-special-header','Another Value')]
    for header_name, header_value in list(request_headers.items()):
        request.add_header(header_name, header_value)
    request.add_header('A-IM', 'feed') # RFC 3229 support
    return request
def execute_request(self, api_url, api_request_object=None, method=None):
    """
    perform api request and return result as an instance of ApiObject or list of ApiObjects

    api_url -- absolute or relative api resource url
    api_request_object -- any json serializable object to send to API
    method -- HTTP method of api request. Default: GET if api_request_object is None else POST

    Raises ApiException when no token is stored or on an HTTP 400 response.
    """
    if self._token is None:
        # Typo fix in the user-facing message: "abtained" -> "obtained".
        raise ApiException("Access token is not obtained. "
                           "Call authenticate_with_apikey or authenticate_with_contact_credentials first.")
    if not api_url.startswith("http"):
        api_url = self.api_endpoint + api_url
    if method is None:
        # Default verb mirrors whether a payload is being sent.
        method = "GET" if api_request_object is None else "POST"
    request = urllib.request.Request(api_url, method=method)
    if api_request_object is not None:
        request.data = json.dumps(api_request_object, cls=_ApiObjectEncoder).encode()
    request.add_header("Content-Type", "application/json")
    request.add_header("Accept", "application/json")
    request.add_header("Authorization", "Bearer " + self._get_access_token())
    try:
        response = urllib.request.urlopen(request)
        return WaApiClient._parse_response(response)
    except urllib.error.HTTPError as httpErr:
        if httpErr.code == 400:
            # 400 responses carry a useful error body from the API.
            raise ApiException(httpErr.read())
        else:
            raise
def _refresh_auth_token(self):
    """Exchange the stored refresh token for a fresh access token and
    record the retrieval time on the new token."""
    data = {
        "grant_type": "refresh_token",
        "refresh_token": self._token.refresh_token
    }
    encoded_data = urllib.parse.urlencode(data).encode()
    request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST")
    # Bug fix: the standard header name is "Content-Type", not "ContentType".
    request.add_header("Content-Type", "application/x-www-form-urlencoded")
    auth_header = base64.standard_b64encode((self.client_id + ':' + self.client_secret).encode()).decode()
    request.add_header("Authorization", 'Basic ' + auth_header)
    response = urllib.request.urlopen(request)
    self._token = WaApiClient._parse_response(response)
    self._token.retrieved_at = datetime.datetime.now()