def _try_send(self, ip, port, body, header, data):
try:
request = urllib.request.Request('http://%s:%s/upnp/control/basicevent1' % (ip, port))
request.add_header('Content-type', 'text/xml; charset="utf-8"')
request.add_header('SOAPACTION', header)
request_body = '<?xml version="1.0" encoding="utf-8"?>'
request_body += '<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">'
request_body += '<s:Body>%s</s:Body></s:Envelope>' % body
request.data = request_body.encode()
result = urllib.request.urlopen(request, timeout=3)
return self._extract(result.read().decode(), data)
except Exception as e:
# except:
# raise
print(str(e))
return None
python类data()的实例源码
def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, **_3to2kwargs):
if 'cadefault' in _3to2kwargs: cadefault = _3to2kwargs['cadefault']; del _3to2kwargs['cadefault']
else: cadefault = False
if 'capath' in _3to2kwargs: capath = _3to2kwargs['capath']; del _3to2kwargs['capath']
else: capath = None
if 'cafile' in _3to2kwargs: cafile = _3to2kwargs['cafile']; del _3to2kwargs['cafile']
else: cafile = None
global _opener
if cafile or capath or cadefault:
if not _have_ssl:
raise ValueError('SSL support not available')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
context.verify_mode = ssl.CERT_REQUIRED
if cafile or capath:
context.load_verify_locations(cafile, capath)
else:
context.set_default_verify_paths()
https_handler = HTTPSHandler(context=context, check_hostname=True)
opener = build_opener(https_handler)
elif _opener is None:
_opener = opener = build_opener()
else:
opener = _opener
return opener.open(url, data, timeout)
def __init__(self, url, data=None, headers={},
origin_req_host=None, unverifiable=False,
method=None):
# unwrap('<URL:type://host/path>') --> 'type://host/path'
self.full_url = unwrap(url)
self.full_url, self.fragment = splittag(self.full_url)
self.data = data
self.headers = {}
self._tunnel_host = None
for key, value in headers.items():
self.add_header(key, value)
self.unredirected_hdrs = {}
if origin_req_host is None:
origin_req_host = request_host(self)
self.origin_req_host = origin_req_host
self.unverifiable = unverifiable
self.method = method
self._parse()
def http_error_407(self, url, fp, errcode, errmsg, headers, data=None,
retry=False):
"""Error 407 -- proxy authentication required.
This function supports Basic authentication only."""
if 'proxy-authenticate' not in headers:
URLopener.http_error_default(self, url, fp,
errcode, errmsg, headers)
stuff = headers['proxy-authenticate']
match = re.match('[ \t]*([^ \t]+)[ \t]+realm="([^"]*)"', stuff)
if not match:
URLopener.http_error_default(self, url, fp,
errcode, errmsg, headers)
scheme, realm = match.groups()
if scheme.lower() != 'basic':
URLopener.http_error_default(self, url, fp,
errcode, errmsg, headers)
if not retry:
URLopener.http_error_default(self, url, fp, errcode, errmsg,
headers)
name = 'retry_proxy_' + self.type + '_basic_auth'
if data is None:
return getattr(self,name)(url, realm)
else:
return getattr(self,name)(url, realm, data)
def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
*, cafile=None, capath=None):
global _opener
if cafile or capath:
if not _have_ssl:
raise ValueError('SSL support not available')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
if cafile or capath:
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(cafile, capath)
check_hostname = True
else:
check_hostname = False
https_handler = HTTPSHandler(context=context, check_hostname=check_hostname)
opener = build_opener(https_handler)
elif _opener is None:
_opener = opener = build_opener()
else:
opener = _opener
return opener.open(url, data, timeout)
def __init__(self, url, data=None, headers={},
origin_req_host=None, unverifiable=False):
# unwrap('<URL:type://host/path>') --> 'type://host/path'
self.full_url = unwrap(url)
self.full_url, self.fragment = splittag(self.full_url)
self.data = data
self.headers = {}
self._tunnel_host = None
for key, value in headers.items():
self.add_header(key, value)
self.unredirected_hdrs = {}
if origin_req_host is None:
origin_req_host = request_host(self)
self.origin_req_host = origin_req_host
self.unverifiable = unverifiable
self._parse()
def http_error_401(self, url, fp, errcode, errmsg, headers, data=None,
retry=False):
"""Error 401 -- authentication required.
This function supports Basic authentication only."""
if 'www-authenticate' not in headers:
URLopener.http_error_default(self, url, fp,
errcode, errmsg, headers)
stuff = headers['www-authenticate']
import re
match = re.match('[ \t]*([^ \t]+)[ \t]+realm="([^"]*)"', stuff)
if not match:
URLopener.http_error_default(self, url, fp,
errcode, errmsg, headers)
scheme, realm = match.groups()
if scheme.lower() != 'basic':
URLopener.http_error_default(self, url, fp,
errcode, errmsg, headers)
if not retry:
URLopener.http_error_default(self, url, fp, errcode, errmsg,
headers)
name = 'retry_' + self.type + '_basic_auth'
if data is None:
return getattr(self,name)(url, realm)
else:
return getattr(self,name)(url, realm, data)
def http_error_407(self, url, fp, errcode, errmsg, headers, data=None,
retry=False):
"""Error 407 -- proxy authentication required.
This function supports Basic authentication only."""
if 'proxy-authenticate' not in headers:
URLopener.http_error_default(self, url, fp,
errcode, errmsg, headers)
stuff = headers['proxy-authenticate']
import re
match = re.match('[ \t]*([^ \t]+)[ \t]+realm="([^"]*)"', stuff)
if not match:
URLopener.http_error_default(self, url, fp,
errcode, errmsg, headers)
scheme, realm = match.groups()
if scheme.lower() != 'basic':
URLopener.http_error_default(self, url, fp,
errcode, errmsg, headers)
if not retry:
URLopener.http_error_default(self, url, fp, errcode, errmsg,
headers)
name = 'retry_proxy_' + self.type + '_basic_auth'
if data is None:
return getattr(self,name)(url, realm)
else:
return getattr(self,name)(url, realm, data)
def proxy_bypass_environment(host):
"""Test if proxies should not be used for a particular host.
Checks the environment for a variable named no_proxy, which should
be a list of DNS suffixes separated by commas, or '*' for all hosts.
"""
no_proxy = os.environ.get('no_proxy', '') or os.environ.get('NO_PROXY', '')
# '*' is special case for always bypass
if no_proxy == '*':
return 1
# strip port off host
hostonly, port = splitport(host)
# check if the host ends with any of the DNS suffixes
no_proxy_list = [proxy.strip() for proxy in no_proxy.split(',')]
for name in no_proxy_list:
if name and (hostonly.endswith(name) or host.endswith(name)):
return 1
# otherwise, don't bypass
return 0
# This code tests an OSX specific data structure but is testable on all
# platforms
def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, **_3to2kwargs):
if 'cadefault' in _3to2kwargs: cadefault = _3to2kwargs['cadefault']; del _3to2kwargs['cadefault']
else: cadefault = False
if 'capath' in _3to2kwargs: capath = _3to2kwargs['capath']; del _3to2kwargs['capath']
else: capath = None
if 'cafile' in _3to2kwargs: cafile = _3to2kwargs['cafile']; del _3to2kwargs['cafile']
else: cafile = None
global _opener
if cafile or capath or cadefault:
if not _have_ssl:
raise ValueError('SSL support not available')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
context.verify_mode = ssl.CERT_REQUIRED
if cafile or capath:
context.load_verify_locations(cafile, capath)
else:
context.set_default_verify_paths()
https_handler = HTTPSHandler(context=context, check_hostname=True)
opener = build_opener(https_handler)
elif _opener is None:
_opener = opener = build_opener()
else:
opener = _opener
return opener.open(url, data, timeout)
def __init__(self, url, data=None, headers={},
origin_req_host=None, unverifiable=False,
method=None):
# unwrap('<URL:type://host/path>') --> 'type://host/path'
self.full_url = unwrap(url)
self.full_url, self.fragment = splittag(self.full_url)
self.data = data
self.headers = {}
self._tunnel_host = None
for key, value in headers.items():
self.add_header(key, value)
self.unredirected_hdrs = {}
if origin_req_host is None:
origin_req_host = request_host(self)
self.origin_req_host = origin_req_host
self.unverifiable = unverifiable
self.method = method
self._parse()
def http_error_407(self, url, fp, errcode, errmsg, headers, data=None,
retry=False):
"""Error 407 -- proxy authentication required.
This function supports Basic authentication only."""
if 'proxy-authenticate' not in headers:
URLopener.http_error_default(self, url, fp,
errcode, errmsg, headers)
stuff = headers['proxy-authenticate']
match = re.match('[ \t]*([^ \t]+)[ \t]+realm="([^"]*)"', stuff)
if not match:
URLopener.http_error_default(self, url, fp,
errcode, errmsg, headers)
scheme, realm = match.groups()
if scheme.lower() != 'basic':
URLopener.http_error_default(self, url, fp,
errcode, errmsg, headers)
if not retry:
URLopener.http_error_default(self, url, fp, errcode, errmsg,
headers)
name = 'retry_proxy_' + self.type + '_basic_auth'
if data is None:
return getattr(self,name)(url, realm)
else:
return getattr(self,name)(url, realm, data)
def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, **_3to2kwargs):
if 'cadefault' in _3to2kwargs: cadefault = _3to2kwargs['cadefault']; del _3to2kwargs['cadefault']
else: cadefault = False
if 'capath' in _3to2kwargs: capath = _3to2kwargs['capath']; del _3to2kwargs['capath']
else: capath = None
if 'cafile' in _3to2kwargs: cafile = _3to2kwargs['cafile']; del _3to2kwargs['cafile']
else: cafile = None
global _opener
if cafile or capath or cadefault:
if not _have_ssl:
raise ValueError('SSL support not available')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
context.verify_mode = ssl.CERT_REQUIRED
if cafile or capath:
context.load_verify_locations(cafile, capath)
else:
context.set_default_verify_paths()
https_handler = HTTPSHandler(context=context, check_hostname=True)
opener = build_opener(https_handler)
elif _opener is None:
_opener = opener = build_opener()
else:
opener = _opener
return opener.open(url, data, timeout)
def __init__(self, url, data=None, headers={},
origin_req_host=None, unverifiable=False,
method=None):
# unwrap('<URL:type://host/path>') --> 'type://host/path'
self.full_url = unwrap(url)
self.full_url, self.fragment = splittag(self.full_url)
self.data = data
self.headers = {}
self._tunnel_host = None
for key, value in headers.items():
self.add_header(key, value)
self.unredirected_hdrs = {}
if origin_req_host is None:
origin_req_host = request_host(self)
self.origin_req_host = origin_req_host
self.unverifiable = unverifiable
self.method = method
self._parse()
def http_error_407(self, url, fp, errcode, errmsg, headers, data=None,
retry=False):
"""Error 407 -- proxy authentication required.
This function supports Basic authentication only."""
if 'proxy-authenticate' not in headers:
URLopener.http_error_default(self, url, fp,
errcode, errmsg, headers)
stuff = headers['proxy-authenticate']
match = re.match('[ \t]*([^ \t]+)[ \t]+realm="([^"]*)"', stuff)
if not match:
URLopener.http_error_default(self, url, fp,
errcode, errmsg, headers)
scheme, realm = match.groups()
if scheme.lower() != 'basic':
URLopener.http_error_default(self, url, fp,
errcode, errmsg, headers)
if not retry:
URLopener.http_error_default(self, url, fp, errcode, errmsg,
headers)
name = 'retry_proxy_' + self.type + '_basic_auth'
if data is None:
return getattr(self,name)(url, realm)
else:
return getattr(self,name)(url, realm, data)
def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, **_3to2kwargs):
if 'cadefault' in _3to2kwargs: cadefault = _3to2kwargs['cadefault']; del _3to2kwargs['cadefault']
else: cadefault = False
if 'capath' in _3to2kwargs: capath = _3to2kwargs['capath']; del _3to2kwargs['capath']
else: capath = None
if 'cafile' in _3to2kwargs: cafile = _3to2kwargs['cafile']; del _3to2kwargs['cafile']
else: cafile = None
global _opener
if cafile or capath or cadefault:
if not _have_ssl:
raise ValueError('SSL support not available')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
context.verify_mode = ssl.CERT_REQUIRED
if cafile or capath:
context.load_verify_locations(cafile, capath)
else:
context.set_default_verify_paths()
https_handler = HTTPSHandler(context=context, check_hostname=True)
opener = build_opener(https_handler)
elif _opener is None:
_opener = opener = build_opener()
else:
opener = _opener
return opener.open(url, data, timeout)
def __init__(self, url, data=None, headers={},
origin_req_host=None, unverifiable=False,
method=None):
# unwrap('<URL:type://host/path>') --> 'type://host/path'
self.full_url = unwrap(url)
self.full_url, self.fragment = splittag(self.full_url)
self.data = data
self.headers = {}
self._tunnel_host = None
for key, value in headers.items():
self.add_header(key, value)
self.unredirected_hdrs = {}
if origin_req_host is None:
origin_req_host = request_host(self)
self.origin_req_host = origin_req_host
self.unverifiable = unverifiable
self.method = method
self._parse()
def http_error_407(self, url, fp, errcode, errmsg, headers, data=None,
retry=False):
"""Error 407 -- proxy authentication required.
This function supports Basic authentication only."""
if 'proxy-authenticate' not in headers:
URLopener.http_error_default(self, url, fp,
errcode, errmsg, headers)
stuff = headers['proxy-authenticate']
match = re.match('[ \t]*([^ \t]+)[ \t]+realm="([^"]*)"', stuff)
if not match:
URLopener.http_error_default(self, url, fp,
errcode, errmsg, headers)
scheme, realm = match.groups()
if scheme.lower() != 'basic':
URLopener.http_error_default(self, url, fp,
errcode, errmsg, headers)
if not retry:
URLopener.http_error_default(self, url, fp, errcode, errmsg,
headers)
name = 'retry_proxy_' + self.type + '_basic_auth'
if data is None:
return getattr(self,name)(url, realm)
else:
return getattr(self,name)(url, realm, data)
def authenticate_with_apikey(self, api_key, scope=None):
"""perform authentication by api key and store result for execute_request method
api_key -- secret api key from account settings
scope -- optional scope of authentication request. If None full list of API scopes will be used.
"""
scope = "auto" if scope is None else scope
data = {
"grant_type": "client_credentials",
"scope": scope
}
encoded_data = urllib.parse.urlencode(data).encode()
request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST")
request.add_header("ContentType", "application/x-www-form-urlencoded")
request.add_header("Authorization", 'Basic ' + base64.standard_b64encode(('APIKEY:' + api_key).encode()).decode())
response = urllib.request.urlopen(request)
self._token = WaApiClient._parse_response(response)
self._token.retrieved_at = datetime.datetime.now()
def authenticate_with_contact_credentials(self, username, password, scope=None):
"""perform authentication by contact credentials and store result for execute_request method
username -- typically a contact email
password -- contact password
scope -- optional scope of authentication request. If None full list of API scopes will be used.
"""
scope = "auto" if scope is None else scope
data = {
"grant_type": "password",
"username": username,
"password": password,
"scope": scope
}
encoded_data = urllib.parse.urlencode(data).encode()
request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST")
request.add_header("ContentType", "application/x-www-form-urlencoded")
auth_header = base64.standard_b64encode((self.client_id + ':' + self.client_secret).encode()).decode()
request.add_header("Authorization", 'Basic ' + auth_header)
response = urllib.request.urlopen(request)
self._token = WaApiClient._parse_response(response)
self._token.retrieved_at = datetime.datetime.now()
def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, **_3to2kwargs):
if 'cadefault' in _3to2kwargs: cadefault = _3to2kwargs['cadefault']; del _3to2kwargs['cadefault']
else: cadefault = False
if 'capath' in _3to2kwargs: capath = _3to2kwargs['capath']; del _3to2kwargs['capath']
else: capath = None
if 'cafile' in _3to2kwargs: cafile = _3to2kwargs['cafile']; del _3to2kwargs['cafile']
else: cafile = None
global _opener
if cafile or capath or cadefault:
if not _have_ssl:
raise ValueError('SSL support not available')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
context.verify_mode = ssl.CERT_REQUIRED
if cafile or capath:
context.load_verify_locations(cafile, capath)
else:
context.set_default_verify_paths()
https_handler = HTTPSHandler(context=context, check_hostname=True)
opener = build_opener(https_handler)
elif _opener is None:
_opener = opener = build_opener()
else:
opener = _opener
return opener.open(url, data, timeout)
def __init__(self, url, data=None, headers={},
origin_req_host=None, unverifiable=False,
method=None):
# unwrap('<URL:type://host/path>') --> 'type://host/path'
self.full_url = unwrap(url)
self.full_url, self.fragment = splittag(self.full_url)
self.data = data
self.headers = {}
self._tunnel_host = None
for key, value in headers.items():
self.add_header(key, value)
self.unredirected_hdrs = {}
if origin_req_host is None:
origin_req_host = request_host(self)
self.origin_req_host = origin_req_host
self.unverifiable = unverifiable
self.method = method
self._parse()
def http_error_407(self, url, fp, errcode, errmsg, headers, data=None,
retry=False):
"""Error 407 -- proxy authentication required.
This function supports Basic authentication only."""
if 'proxy-authenticate' not in headers:
URLopener.http_error_default(self, url, fp,
errcode, errmsg, headers)
stuff = headers['proxy-authenticate']
match = re.match('[ \t]*([^ \t]+)[ \t]+realm="([^"]*)"', stuff)
if not match:
URLopener.http_error_default(self, url, fp,
errcode, errmsg, headers)
scheme, realm = match.groups()
if scheme.lower() != 'basic':
URLopener.http_error_default(self, url, fp,
errcode, errmsg, headers)
if not retry:
URLopener.http_error_default(self, url, fp, errcode, errmsg,
headers)
name = 'retry_proxy_' + self.type + '_basic_auth'
if data is None:
return getattr(self,name)(url, realm)
else:
return getattr(self,name)(url, realm, data)
def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, **_3to2kwargs):
if 'cadefault' in _3to2kwargs: cadefault = _3to2kwargs['cadefault']; del _3to2kwargs['cadefault']
else: cadefault = False
if 'capath' in _3to2kwargs: capath = _3to2kwargs['capath']; del _3to2kwargs['capath']
else: capath = None
if 'cafile' in _3to2kwargs: cafile = _3to2kwargs['cafile']; del _3to2kwargs['cafile']
else: cafile = None
global _opener
if cafile or capath or cadefault:
if not _have_ssl:
raise ValueError('SSL support not available')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
context.verify_mode = ssl.CERT_REQUIRED
if cafile or capath:
context.load_verify_locations(cafile, capath)
else:
context.set_default_verify_paths()
https_handler = HTTPSHandler(context=context, check_hostname=True)
opener = build_opener(https_handler)
elif _opener is None:
_opener = opener = build_opener()
else:
opener = _opener
return opener.open(url, data, timeout)
def __init__(self, url, data=None, headers={},
origin_req_host=None, unverifiable=False,
method=None):
# unwrap('<URL:type://host/path>') --> 'type://host/path'
self.full_url = unwrap(url)
self.full_url, self.fragment = splittag(self.full_url)
self.data = data
self.headers = {}
self._tunnel_host = None
for key, value in headers.items():
self.add_header(key, value)
self.unredirected_hdrs = {}
if origin_req_host is None:
origin_req_host = request_host(self)
self.origin_req_host = origin_req_host
self.unverifiable = unverifiable
self.method = method
self._parse()
def http_error_407(self, url, fp, errcode, errmsg, headers, data=None,
retry=False):
"""Error 407 -- proxy authentication required.
This function supports Basic authentication only."""
if 'proxy-authenticate' not in headers:
URLopener.http_error_default(self, url, fp,
errcode, errmsg, headers)
stuff = headers['proxy-authenticate']
match = re.match('[ \t]*([^ \t]+)[ \t]+realm="([^"]*)"', stuff)
if not match:
URLopener.http_error_default(self, url, fp,
errcode, errmsg, headers)
scheme, realm = match.groups()
if scheme.lower() != 'basic':
URLopener.http_error_default(self, url, fp,
errcode, errmsg, headers)
if not retry:
URLopener.http_error_default(self, url, fp, errcode, errmsg,
headers)
name = 'retry_proxy_' + self.type + '_basic_auth'
if data is None:
return getattr(self,name)(url, realm)
else:
return getattr(self,name)(url, realm, data)
def test_http_closed(self):
"""Test the connection is cleaned up when the response is closed"""
for (transfer, data) in (
("Connection: close", b"data"),
("Transfer-Encoding: chunked", b"4\r\ndata\r\n0\r\n\r\n"),
("Content-Length: 4", b"data"),
):
header = "HTTP/1.1 200 OK\r\n{}\r\n\r\n".format(transfer)
conn = test_urllib.fakehttp(header.encode() + data)
handler = urllib.request.AbstractHTTPHandler()
req = Request("http://dummy/")
req.timeout = None
with handler.do_open(conn, req) as resp:
resp.read()
self.assertTrue(conn.fakesock.closed,
"Connection not closed with {!r}".format(transfer))
def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
*, cafile=None, capath=None, cadefault=False, context=None):
global _opener
if cafile or capath or cadefault:
if context is not None:
raise ValueError(
"You can't pass both context and any of cafile, capath, and "
"cadefault"
)
if not _have_ssl:
raise ValueError('SSL support not available')
context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH,
cafile=cafile,
capath=capath)
https_handler = HTTPSHandler(context=context)
opener = build_opener(https_handler)
elif context:
https_handler = HTTPSHandler(context=context)
opener = build_opener(https_handler)
elif _opener is None:
_opener = opener = build_opener()
else:
opener = _opener
return opener.open(url, data, timeout)
def __init__(self, url, data=None, headers={},
origin_req_host=None, unverifiable=False,
method=None):
self.full_url = url
self.headers = {}
self.unredirected_hdrs = {}
self._data = None
self.data = data
self._tunnel_host = None
for key, value in headers.items():
self.add_header(key, value)
if origin_req_host is None:
origin_req_host = request_host(self)
self.origin_req_host = origin_req_host
self.unverifiable = unverifiable
if method:
self.method = method
def build_http_request_obj(self, request_body):
request = urllib.request.Request(self.url)
request.add_header("Content-Type", "application/json")
request.add_header("User-Agent", "gemstone-client")
request.data = json.dumps(request_body).encode()
request.method = "POST"
return request