def broadcast_tx(self, tx):
s = io.BytesIO()
tx.stream(s)
tx_as_hex = b2h(s.getvalue())
data = urlencode(dict(tx=tx_as_hex)).encode("utf8")
URL = "http://blockchain.info/pushtx"
try:
d = urlopen(URL, data=data).read()
return d
except HTTPError as ex:
try:
d = ex.read()
ex.message = d
except:
pass
raise ex
python类HTTPError()的实例源码
def get_access_token(self, code, state=None):
'''
In callback url: http://host/callback?code=123&state=xyz
use code and state to get an access token.
'''
kw = dict(client_id=self._client_id, client_secret=self._client_secret, code=code)
if self._redirect_uri:
kw['redirect_uri'] = self._redirect_uri
if state:
kw['state'] = state
opener = build_opener(HTTPSHandler)
request = Request('https://github.com/login/oauth/access_token', data=_encode_params(kw))
request.get_method = _METHOD_MAP['POST']
request.add_header('Accept', 'application/json')
try:
response = opener.open(request, timeout=TIMEOUT)
r = _parse_json(response.read())
if 'error' in r:
raise ApiAuthError(str(r.error))
return str(r.access_token)
except HTTPError as e:
raise ApiAuthError('HTTPError when get access token')
def command(self, command, value=None):
func_str = 'GoProHero.command({}, {})'.format(command, value)
if command in self.commandMaxtrix:
args = self.commandMaxtrix[command]
# accept both None and '' for commands without a value
if value == '':
value = None
# for commands with values, translate the value
if value is not None and value in args['translate']:
value = args['translate'][value]
# build the final url
url = self._commandURL(args['cmd'], value)
# attempt to contact the camera
try:
urlopen(url, timeout=self.timeout).read()
logging.info('{} - http success!'.format(func_str))
return True
except (HTTPError, URLError, socket.timeout) as e:
logging.warning('{}{} - error opening {}: {}{}'.format(
Fore.YELLOW, func_str, url, e, Fore.RESET))
# catchall return statement
return False
def rank_checker(url,hatebu_url):
try:
html = request.urlopen(hatebu_url)
except request.HTTPError as e:
print(e.reason)
except request.URLError as e:
print(e.reason)
soup = BeautifulSoup(html,"lxml")
a = soup.find("a",href=url)
if a == None:
rank = None
else:
rank = a.get("data-entryrank")
return rank
# ????????????????????
def do_request(self, req):
if DEBUG:
print('requesting', req.get_method(), req.get_full_url())
opener = self.build_opener()
opener.add_handler(self._cookie_processor)
try:
self._response = opener.open(req)
except HTTPError as e:
self._response = e
self.url = self._response.geturl()
self.path = get_selector(Request(self.url))
self.data = self._response.read()
self.status = self._response.code
self._forms = None
self.form = None
return self.get_response()
def call_api(self):
url = 'http://prj2epsg.org/search.json?'
params = {
'mode': 'wkt',
'terms': self.prj
}
try:
req = request.urlopen(url+urlencode(params))
except request.HTTPError as http_exc:
logger.warning("""Failed to retrieve data from prj2epsg.org API:\n
Status: %s \n
Message: %s""" % (http_exc.code, http_exc.msg))
else:
raw_resp = req.read()
try:
resp = json.loads(raw_resp.decode('utf-8'))
except json.JSONDecodeError:
logger.warning('API call succeeded but response\
is not JSON: %s' % raw_resp)
self.process_api_result(resp)
def _do_post(self, url, data, headers):
try: # Python 3
if data != None:
data = bytes(data, 'utf-8')
except: # Python 2
pass
try:
req = url_request.Request(url=url, data=data, headers=headers)
req.add_header('User-Agent', KKBOXHTTP.USER_AGENT)
f = url_request.urlopen(req)
r = f.read()
except url_request.HTTPError as e:
print(e.fp.read())
raise (e)
except Exception as e:
raise (e)
try: # Python 3
r = str(r, 'utf-8')
except: # Python 2
pass
json_object = json.loads(r)
return json_object
def download_fzf_binary(plat, arch, overwrite=False, access_token=None):
bin_path = fzf_windows_bin_path if plat == 'windows' else fzf_bin_path
if overwrite or not os.path.isfile(bin_path):
asset = get_fzf_binary_url(plat, arch, access_token)
url, ext = asset
if access_token:
url = '{0}?access_token={1}'.format(url, access_token)
try:
r = urllib2.urlopen(url)
except urllib2.HTTPError as e:
if e.code == 403 and e.info().get('X-RateLimit-Remaining') == 0:
raise RuntimeError(
'GitHub rate limit reached. To increate the limit use '
'-g/--github-access-token option.\n ' + str(e)
)
elif e.code == 401 and access_token:
raise RuntimeError('Invalid GitHub access token.')
raise
extract(r, ext, bin_path)
r.close()
mode = os.stat(bin_path).st_mode
if not (mode & 0o111):
os.chmod(bin_path, mode | 0o111)
def install(self, version, core_count, read_replica_count, initial_port, password, verbose=False):
try:
package = _create_controller().download("enterprise", version, self.path, verbose=verbose)
port_gen = count(initial_port)
initial_discovery_members = self._install_cores(self.path, package, core_count, port_gen)
self._install_read_replicas(self.path, package, initial_discovery_members, read_replica_count, port_gen)
self._set_initial_password(password)
return realpath(self.path)
except HTTPError as error:
if error.code == 401:
raise RuntimeError("Missing or incorrect authorization")
elif error.code == 403:
raise RuntimeError("Could not download package from %s (403 Forbidden)" % error.url)
else:
raise
def get_location(url):
try:
response = request.urlopen(url)
# urllib will follow redirections and it's too much code to tell urllib
# not to do that
return response.geturl()
except socket.timeout:
print('request timeout')
exit()
except request.HTTPError as e:
print(e.code)
except request.URLError as e:
print(e.reason)
exit()
return "fail"
def get_location(url):
try:
response = request.urlopen(url)
# urllib will follow redirections and it's too much code to tell urllib
# not to do that
return response.geturl()
except socket.timeout:
print('request timeout')
exit()
except request.HTTPError as e:
print(e.code)
except request.URLError as e:
print(e.reason)
exit()
return "fail"
def send_data(self, event):
"""Send event to VES"""
server_url = "http{}://{}:{}{}/eventListener/v{}{}".format(
's' if self._app_config['UseHttps'] else '',
self._app_config['Domain'], int(self._app_config['Port']),
'{}'.format('/{}'.format(self._app_config['Path']) if len(
self._app_config['Path']) > 0 else ''),
int(self._app_config['ApiVersion']), '{}'.format(
'/{}'.format(self._app_config['Topic']) if len(
self._app_config['Topic']) > 0 else ''))
logging.info('Vendor Event Listener is at: {}'.format(server_url))
credentials = base64.b64encode('{}:{}'.format(
self._app_config['Username'],
self._app_config['Password']).encode()).decode()
logging.info('Authentication credentials are: {}'.format(credentials))
try:
request = url.Request(server_url)
request.add_header('Authorization', 'Basic {}'.format(credentials))
request.add_header('Content-Type', 'application/json')
event_str = json.dumps(event).encode()
logging.debug("Sending {} to {}".format(event_str, server_url))
url.urlopen(request, event_str, timeout=1)
logging.debug("Sent data to {} successfully".format(server_url))
except url.HTTPError as e:
logging.error('Vendor Event Listener exception: {}'.format(e))
except url.URLError as e:
logging.error(
'Vendor Event Listener is is not reachable: {}'.format(e))
except Exception as e:
logging.error('Vendor Event Listener error: {}'.format(e))
def request(self, host, handler, request_body, verbose=0):
"""Send XMLRPC request"""
uri = '{scheme}://{host}{handler}'.format(scheme=self._scheme,
host=host, handler=handler)
if self._passmgr:
self._passmgr.add_password(None, uri, self._username,
self._password)
if self.verbose:
_LOGGER.debug("FabricTransport: {0}".format(uri))
opener = urllib2.build_opener(*self._handlers)
headers = {
'Content-Type': 'text/xml',
'User-Agent': self.user_agent,
}
req = urllib2.Request(uri, request_body, headers=headers)
try:
return self.parse_response(opener.open(req))
except (urllib2.URLError, urllib2.HTTPError) as exc:
try:
code = -1
if exc.code == 400:
reason = 'Permission denied'
code = exc.code
else:
reason = exc.reason
msg = "{reason} ({code})".format(reason=reason, code=code)
except AttributeError:
if 'SSL' in str(exc):
msg = "SSL error"
else:
msg = str(exc)
raise InterfaceError("Connection with Fabric failed: " + msg)
except BadStatusLine:
raise InterfaceError("Connection with Fabric failed: check SSL")
def request(self, url, method="GET", body=None, headers={}):
req = urllib2.Request(url, body, headers)
try:
f = self.request_opener(req, timeout=self._timeout)
return f.info(), f.read()
except urllib2.HTTPError as f:
if f.code != 500:
raise
return f.info(), f.read()
def download_file(url, download_directory):
"""Download a remote file
Args:
download_directory: (string)
Returns:
(string) that path of the file that was just downloaded. If something failed during
download, return None
Raises:
DownloadError
"""
Output.print_information("Downloading " + url + " ...")
parsed_url = urlparse(url)
if parsed_url.path in ["/", ""]:
file_name = parsed_url.netloc
else:
file_name = parsed_url.path.split("/")[-1]
download_path = abspath(join(download_directory, file_name))
try:
with open(download_path, 'wb') as file_object:
file_object.write(urlopen(url).read())
return download_path
except HTTPError as expn:
raise DownloadError("HTTP error code " + str(expn.code) + " while retrieving " \
+ url + "\n" + str(expn.reason))
except URLError as expn:
raise DownloadError("HTTP URL error while retrieving " + url + "\n" + str(expn.reason))
except Exception as expn:
raise DownloadError("Unable to retrieve " + url + "\n" + str(expn))
def request(self, host, handler, request_body, verbose=0):
"""Send XMLRPC request"""
uri = '{scheme}://{host}{handler}'.format(scheme=self._scheme,
host=host, handler=handler)
if self._passmgr:
self._passmgr.add_password(None, uri, self._username,
self._password)
if self.verbose:
_LOGGER.debug("FabricTransport: {0}".format(uri))
opener = urllib2.build_opener(*self._handlers)
headers = {
'Content-Type': 'text/xml',
'User-Agent': self.user_agent,
}
req = urllib2.Request(uri, request_body, headers=headers)
try:
return self.parse_response(opener.open(req))
except (urllib2.URLError, urllib2.HTTPError) as exc:
try:
code = -1
if exc.code == 400:
reason = 'Permission denied'
code = exc.code
else:
reason = exc.reason
msg = "{reason} ({code})".format(reason=reason, code=code)
except AttributeError:
if 'SSL' in str(exc):
msg = "SSL error"
else:
msg = str(exc)
raise InterfaceError("Connection with Fabric failed: " + msg)
except BadStatusLine:
raise InterfaceError("Connection with Fabric failed: check SSL")
def request(self, url, method="GET", body=None, headers={}):
req = urllib2.Request(url, body, headers)
try:
f = self.request_opener(req, timeout=self._timeout)
return f.info(), f.read()
except urllib2.HTTPError as f:
if f.code != 500:
raise
return f.info(), f.read()
def request(self, url, method="GET", body=None, headers={}):
req = urllib2.Request(url, body, headers)
try:
f = self.request_opener(req, timeout=self._timeout)
return f.info(), f.read()
except urllib2.HTTPError as f:
if f.code != 500:
raise
return f.info(), f.read()
def _http(self, _method, _path, **kw):
data = None
params = None
if _method=='GET' and kw:
_path = '%s?%s' % (_path, _encode_params(kw))
if _method in ['POST', 'PATCH', 'PUT']:
data = bytes(_encode_json(kw), 'utf-8')
url = '%s%s' % (_URL, _path)
opener = build_opener(HTTPSHandler)
request = Request(url, data=data)
request.get_method = _METHOD_MAP[_method]
if self._authorization:
request.add_header('Authorization', self._authorization)
if _method in ['POST', 'PATCH', 'PUT']:
request.add_header('Content-Type', 'application/x-www-form-urlencoded')
try:
response = opener.open(request, timeout=TIMEOUT)
is_json = self._process_resp(response.headers)
if is_json:
return _parse_json(response.read().decode('utf-8'))
except HTTPError as e:
is_json = self._process_resp(e.headers)
if is_json:
json = _parse_json(e.read().decode('utf-8'))
else:
json = e.read().decode('utf-8')
req = JsonObject(method=_method, url=url)
resp = JsonObject(code=e.code, json=json)
if resp.code==404:
raise ApiNotFoundError(url, req, resp)
raise ApiError(url, req, resp)
def api_request_native(url, data=None, token=None, https_proxy=None, method=None):
request = urllib.Request(url)
# print('API request url:', request.get_full_url())
if method:
request.get_method = lambda: method
token = token if token != None else token_auth_string()
request.add_header('Authorization', 'token ' + token)
request.add_header('Accept', 'application/json')
request.add_header('Content-Type', 'application/json')
if data is not None:
request.add_data(bytes(data.encode('utf8')))
# print('API request data:', request.get_data())
# print('API request header:', request.header_items())
# https_proxy = https_proxy if https_proxy != None else settings.get('https_proxy')
# if https_proxy:
# opener = urllib.build_opener(urllib.HTTPHandler(), urllib.HTTPSHandler(),
# urllib.ProxyHandler({'https': https_proxy}))
# urllib.install_opener(opener)
try:
with contextlib.closing(urllib.urlopen(request)) as response:
if response.code == 204: # No Content
return None
else:
return json.loads(response.read().decode('utf8', 'ignore'))
except urllib.HTTPError as err:
with contextlib.closing(err):
raise SimpleHTTPError(err.code, err.read())
def send_response(event, context, response_status, reason=None, response_data={}):
body = {
"Status": response_status,
"PhysicalResourceId": context.log_stream_name,
"StackId": event["StackId"],
"RequestId": event["RequestId"],
"LogicalResourceId": event["LogicalResourceId"],
}
print("Responding: {}".format(response_status))
if reason:
print(reason)
body["Reason"] = reason
if response_data:
print(response_data)
body["Data"] = response_data
body = json.dumps(body).encode("utf-8")
req = Request(event["ResponseURL"], data=body, headers={
"Content-Length": len(body),
"Content-Type": "",
})
req.get_method = lambda: "PUT"
try:
urlopen(req)
return True
except HTTPError as e:
print("Failed executing HTTP request: {}".format(e.code))
return False
except URLError as e:
print("Failed to reach the server: {}".format(e.reason))
return False
def test(self, url, toHex=True):
try:
url = 'http://{}/{}'.format(self._ip, url)
print(url)
response = urlopen(
url, timeout=self.timeout).read()
if toHex:
response = response.encode('hex')
print(response)
except (HTTPError, URLError, socket.timeout) as e:
print(e)
def request(self, url, method="GET", body=None, headers={}):
req = urllib2.Request(url, body, headers)
try:
f = self.request_opener(req, timeout=self._timeout)
return f.info(), f.read()
except urllib2.HTTPError as f:
if f.code != 500:
raise
return f.info(), f.read()
def catch_request(request):
"""Helper function to catch common exceptions encountered when
establishing a connection with a HTTP/HTTPS request
"""
try:
uh = urlopen(request)
return uh, False
except (HTTPError, URLError, socket.error):
e = sys.exc_info()[1]
return None, e
def getBestServer(servers):
"""Perform a speedtest.net latency request to determine which
speedtest.net server has the lowest latency
"""
results = {}
for server in servers:
cum = []
url = '%s/latency.txt' % os.path.dirname(server['url'])
urlparts = urlparse(url)
for i in range(0, 3):
try:
if urlparts[0] == 'https':
h = HTTPSConnection(urlparts[1])
else:
h = HTTPConnection(urlparts[1])
headers = {'User-Agent': user_agent}
start = timeit.default_timer()
h.request("GET", urlparts[2], headers=headers)
r = h.getresponse()
total = (timeit.default_timer() - start)
except (HTTPError, URLError, socket.error):
cum.append(3600)
continue
text = r.read(9)
if int(r.status) == 200 and text == 'test=test'.encode():
cum.append(total)
else:
cum.append(3600)
h.close()
avg = round((sum(cum) / 6) * 1000, 3)
results[avg] = server
fastest = sorted(results.keys())[0]
best = results[fastest]
best['latency'] = fastest
return best
def getBestServer(servers):
"""Perform a speedtest.net latency request to determine which
speedtest.net server has the lowest latency
"""
results = {}
for server in servers:
cum = []
url = '%s/latency.txt' % os.path.dirname(server['url'])
urlparts = urlparse(url)
for i in range(0, 3):
try:
if urlparts[0] == 'https':
h = HTTPSConnection(urlparts[1])
else:
h = HTTPConnection(urlparts[1])
start = timeit.default_timer()
h.request("GET", urlparts[2])
r = h.getresponse()
total = (timeit.default_timer() - start)
except (HTTPError, URLError, socket.error):
cum.append(3600)
continue
text = r.read(9)
if int(r.status) == 200 and text == 'test=test'.encode():
cum.append(total)
else:
cum.append(3600)
h.close()
avg = round((sum(cum) / 6) * 1000, 3)
results[avg] = server
fastest = sorted(results.keys())[0]
best = results[fastest]
best['latency'] = fastest
return best
def request(self, url, method="GET", body=None, headers={}):
req = urllib2.Request(url, body, headers)
try:
f = self.request_opener(req, timeout=self._timeout)
return f.info(), f.read()
except urllib2.HTTPError as f:
if f.code != 500:
raise
return f.info(), f.read()
def catch_request(request):
"""Helper function to catch common exceptions encountered when
establishing a connection with a HTTP/HTTPS request
"""
try:
uh = urlopen(request)
return uh
except (HTTPError, URLError, socket.error):
return False
def getBestServer(servers):
"""Perform a speedtest.net latency request to determine which
speedtest.net server has the lowest latency
"""
results = {}
for server in servers:
cum = []
url = '%s/latency.txt' % os.path.dirname(server['url'])
urlparts = urlparse(url)
for i in range(0, 3):
try:
if urlparts[0] == 'https':
h = HTTPSConnection(urlparts[1])
else:
h = HTTPConnection(urlparts[1])
headers = {'User-Agent': user_agent}
start = timeit.default_timer()
h.request("GET", urlparts[2], headers=headers)
r = h.getresponse()
total = (timeit.default_timer() - start)
except (HTTPError, URLError, socket.error):
cum.append(3600)
continue
text = r.read(9)
if int(r.status) == 200 and text == 'test=test'.encode():
cum.append(total)
else:
cum.append(3600)
h.close()
avg = round((sum(cum) / 6) * 1000, 3)
results[avg] = server
fastest = sorted(results.keys())[0]
best = results[fastest]
best['latency'] = fastest
return best
def send_tx(tx):
s = io.BytesIO()
tx.stream(s)
tx_as_hex = b2h(s.getvalue())
data = urlencode(dict(tx=tx_as_hex)).encode("utf8")
URL = "http://blockchain.info/pushtx"
try:
d = urlopen(URL, data=data).read()
return d
except HTTPError as ex:
d = ex.read()
print(ex)