def mm_heartbeat(self):
# Check if stop or set next timer
if self.shutdown:
return
threading.Timer(self.hb_timer, self.mm_heartbeat).start()
address = ("http://" + self.mm_host + ":" + self.mm_port + "/alexapi?action=AVSHB")
logger.debug("Sending MM Heatbeat")
try:
response = urlopen(address).read()
except URLError as err:
logger.error("URLError: %s", err.reason)
return
logger.debug("Response: " + response)
python类URLError()的实例源码
def send_remote_shutdown_command(self):
try:
from urllib import request as url_request
URLError = url_request.URLError
except ImportError:
import urllib2 as url_request
import urllib2
URLError = urllib2.URLError
try:
url_request.urlopen("http://127.0.0.1:%d/shutdown" % self.port)
except URLError:
return
count = 0
while self.is_connectable():
if count == 30:
break
count += 1
time.sleep(1)
def send_remote_shutdown_command(self):
try:
from urllib import request as url_request
URLError = url_request.URLError
except ImportError:
import urllib2 as url_request
import urllib2
URLError = urllib2.URLError
try:
url_request.urlopen("%s/shutdown" % self.service_url)
except URLError:
return
for x in range(30):
if not self.is_connectable():
break
else:
time.sleep(1)
def __getitem__(self, key):
if self.cache and key in self.geocache:
return self.geocache[key]
location = Location()
try:
self._get_geocoding(key, location)
self._get_timezone(location)
self._get_elevation(location)
except URLError:
raise AstralError(('GoogleGeocoder: Unable to contact '
'Google maps API'))
url = 'http://maps.google.com/maps?q=loc:%f,%f'
location.url = url % (location.latitude, location.longitude)
if self.cache:
self.geocache[key] = location
return location
def command(self, command, value=None):
func_str = 'GoProHero.command({}, {})'.format(command, value)
if command in self.commandMaxtrix:
args = self.commandMaxtrix[command]
# accept both None and '' for commands without a value
if value == '':
value = None
# for commands with values, translate the value
if value is not None and value in args['translate']:
value = args['translate'][value]
# build the final url
url = self._commandURL(args['cmd'], value)
# attempt to contact the camera
try:
urlopen(url, timeout=self.timeout).read()
logging.info('{} - http success!'.format(func_str))
return True
except (HTTPError, URLError, socket.timeout) as e:
logging.warning('{}{} - error opening {}: {}{}'.format(
Fore.YELLOW, func_str, url, e, Fore.RESET))
# catchall return statement
return False
def send_remote_shutdown_command(self):
try:
from urllib import request as url_request
URLError = url_request.URLError
except ImportError:
import urllib2 as url_request
import urllib2
URLError = urllib2.URLError
try:
url_request.urlopen("http://127.0.0.1:%d/shutdown" % self.port)
except URLError:
return
count = 0
while self.is_connectable():
if count == 30:
break
count += 1
time.sleep(1)
def __getitem__(self, key):
if self.cache and key in self.geocache:
return self.geocache[key]
location = Location()
try:
self._get_geocoding(key, location)
self._get_timezone(location)
self._get_elevation(location)
except URLError:
raise AstralError(('GoogleGeocoder: Unable to contact '
'Google maps API'))
url = 'http://maps.google.com/maps?q=loc:%f,%f'
location.url = url % (location.latitude, location.longitude)
if self.cache:
self.geocache[key] = location
return location
def retry_wrapper(func, name, mid, log=False):
def wrapped_func():
max_tries = 5
for trie in range(max_tries):
try:
func()
return
except ur.URLError:
if log:
print("URLError for {}({}); sleeping for 1 second".format(name, mid))
time.sleep(1)
except http.client.RemoteDisconnected:
if log:
print("RemoteDisconnected for {}({}); sleeping for 1 second".format(name, mid))
time.sleep(1)
else:
raise ValueError("Unable to execute function.")
return wrapped_func
def rank_checker(url,hatebu_url):
try:
html = request.urlopen(hatebu_url)
except request.HTTPError as e:
print(e.reason)
except request.URLError as e:
print(e.reason)
soup = BeautifulSoup(html,"lxml")
a = soup.find("a",href=url)
if a == None:
rank = None
else:
rank = a.get("data-entryrank")
return rank
# ????????????????????
def test_open_url(self):
if sys.version_info[0] < 3:
p2url = urllib.pathname2url
else:
p2url = urllib2.pathname2url
fpath = os.path.join(os.path.dirname(__file__), "resources")
fpath = os.path.abspath(fpath)
tfile = os.path.join(fpath, "rwopstest.txt")
urlpath = "file:%s" % p2url(tfile)
resfile = resources.open_url(urlpath)
self.assertIsNotNone(resfile)
tfile = os.path.join(fpath, "invalid")
urlpath = "file:%s" % p2url(tfile)
self.assertRaises(urllib2.URLError, resources.open_url, urlpath)
def retrieve_url_nodecode(url):
""" Return the content of the url page as a string """
req = Request(url, headers = headers)
try:
response = urlopen(req)
except URLError as errno:
print(" ".join(("Connection error:", str(errno.reason))))
print(" ".join(("URL:", url)))
return ""
dat = response.read()
# Check if it is gzipped
if dat[:2] == '\037\213':
# Data is gzip encoded, decode it
compressedstream = StringIO(dat)
gzipper = gzip.GzipFile(fileobj=compressedstream)
extracted_data = gzipper.read()
dat = extracted_data
return dat
return dat
def retrieve_url_nodecode(url):
""" Return the content of the url page as a string """
req = Request(url, headers = headers)
try:
response = urlopen(req)
except URLError as errno:
print(" ".join(("Connection error:", str(errno.reason))))
print(" ".join(("URL:", url)))
return ""
dat = response.read()
# Check if it is gzipped
if dat[:2] == '\037\213':
# Data is gzip encoded, decode it
compressedstream = StringIO(dat)
gzipper = gzip.GzipFile(fileobj=compressedstream)
extracted_data = gzipper.read()
dat = extracted_data
return dat
return dat
def send_remote_shutdown_command(self):
try:
from urllib import request as url_request
URLError = url_request.URLError
except ImportError:
import urllib2 as url_request
import urllib2
URLError = urllib2.URLError
try:
url_request.urlopen("%s/shutdown" % self.service_url)
except URLError:
return
count = 0
while self.is_connectable():
if count == 30:
break
count += 1
time.sleep(1)
def _load_image_from_url_or_local_path(self, file_path):
urlparts = urlparse(file_path)
if urlparts.scheme in ('http', 'https'):
try:
file = self._session.get(file_path, stream=True).raw
except RequestException:
log.warning("Image skipped", exc_info=True)
return None
elif urlparts.scheme in ("ftp", "data"):
try:
file = urlopen(file_path)
except (URLError, ) + ftplib.all_errors:
log.warning("Image skipped", exc_info=True)
return None
else:
file = file_path
try:
return open_image(file)
except (IOError, ValueError):
log.warning("Image skipped", exc_info=True)
return None
def get_location(url):
try:
response = request.urlopen(url)
# urllib will follow redirections and it's too much code to tell urllib
# not to do that
return response.geturl()
except socket.timeout:
print('request timeout')
exit()
except request.HTTPError as e:
print(e.code)
except request.URLError as e:
print(e.reason)
exit()
return "fail"
def get_location(url):
try:
response = request.urlopen(url)
# urllib will follow redirections and it's too much code to tell urllib
# not to do that
return response.geturl()
except socket.timeout:
print('request timeout')
exit()
except request.HTTPError as e:
print(e.code)
except request.URLError as e:
print(e.reason)
exit()
return "fail"
sdl2ext_resources_test.py 文件源码
项目:inventwithpython_pysdl2
作者: rswinkle
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def test_open_url(self):
if sys.version_info[0] < 3:
p2url = urllib.pathname2url
else:
p2url = urllib2.pathname2url
fpath = os.path.join(os.path.dirname(__file__), "resources")
fpath = os.path.abspath(fpath)
tfile = os.path.join(fpath, "rwopstest.txt")
urlpath = "file:%s" % p2url(tfile)
resfile = resources.open_url(urlpath)
self.assertIsNotNone(resfile)
tfile = os.path.join(fpath, "invalid")
urlpath = "file:%s" % p2url(tfile)
self.assertRaises(urllib2.URLError, resources.open_url, urlpath)
def update_mm(self, status):
address = ("http://" + self.mm_host + ":" + self.mm_port + "/alexapi?action=AVSSTATUS&status=" + status)
logger.debug("Calling URL: %s", address)
try:
response = urlopen(address).read()
except URLError as err:
logger.error("URLError: %s", err.reason)
return
logger.debug("Response: %s", response)
def send_data(self, event):
"""Send event to VES"""
server_url = "http{}://{}:{}{}/eventListener/v{}{}".format(
's' if self._app_config['UseHttps'] else '',
self._app_config['Domain'], int(self._app_config['Port']),
'{}'.format('/{}'.format(self._app_config['Path']) if len(
self._app_config['Path']) > 0 else ''),
int(self._app_config['ApiVersion']), '{}'.format(
'/{}'.format(self._app_config['Topic']) if len(
self._app_config['Topic']) > 0 else ''))
logging.info('Vendor Event Listener is at: {}'.format(server_url))
credentials = base64.b64encode('{}:{}'.format(
self._app_config['Username'],
self._app_config['Password']).encode()).decode()
logging.info('Authentication credentials are: {}'.format(credentials))
try:
request = url.Request(server_url)
request.add_header('Authorization', 'Basic {}'.format(credentials))
request.add_header('Content-Type', 'application/json')
event_str = json.dumps(event).encode()
logging.debug("Sending {} to {}".format(event_str, server_url))
url.urlopen(request, event_str, timeout=1)
logging.debug("Sent data to {} successfully".format(server_url))
except url.HTTPError as e:
logging.error('Vendor Event Listener exception: {}'.format(e))
except url.URLError as e:
logging.error(
'Vendor Event Listener is is not reachable: {}'.format(e))
except Exception as e:
logging.error('Vendor Event Listener error: {}'.format(e))
def request(self, host, handler, request_body, verbose=0):
"""Send XMLRPC request"""
uri = '{scheme}://{host}{handler}'.format(scheme=self._scheme,
host=host, handler=handler)
if self._passmgr:
self._passmgr.add_password(None, uri, self._username,
self._password)
if self.verbose:
_LOGGER.debug("FabricTransport: {0}".format(uri))
opener = urllib2.build_opener(*self._handlers)
headers = {
'Content-Type': 'text/xml',
'User-Agent': self.user_agent,
}
req = urllib2.Request(uri, request_body, headers=headers)
try:
return self.parse_response(opener.open(req))
except (urllib2.URLError, urllib2.HTTPError) as exc:
try:
code = -1
if exc.code == 400:
reason = 'Permission denied'
code = exc.code
else:
reason = exc.reason
msg = "{reason} ({code})".format(reason=reason, code=code)
except AttributeError:
if 'SSL' in str(exc):
msg = "SSL error"
else:
msg = str(exc)
raise InterfaceError("Connection with Fabric failed: " + msg)
except BadStatusLine:
raise InterfaceError("Connection with Fabric failed: check SSL")
def download_file(url, download_directory):
"""Download a remote file
Args:
download_directory: (string)
Returns:
(string) that path of the file that was just downloaded. If something failed during
download, return None
Raises:
DownloadError
"""
Output.print_information("Downloading " + url + " ...")
parsed_url = urlparse(url)
if parsed_url.path in ["/", ""]:
file_name = parsed_url.netloc
else:
file_name = parsed_url.path.split("/")[-1]
download_path = abspath(join(download_directory, file_name))
try:
with open(download_path, 'wb') as file_object:
file_object.write(urlopen(url).read())
return download_path
except HTTPError as expn:
raise DownloadError("HTTP error code " + str(expn.code) + " while retrieving " \
+ url + "\n" + str(expn.reason))
except URLError as expn:
raise DownloadError("HTTP URL error while retrieving " + url + "\n" + str(expn.reason))
except Exception as expn:
raise DownloadError("Unable to retrieve " + url + "\n" + str(expn))
def request(self, host, handler, request_body, verbose=0):
"""Send XMLRPC request"""
uri = '{scheme}://{host}{handler}'.format(scheme=self._scheme,
host=host, handler=handler)
if self._passmgr:
self._passmgr.add_password(None, uri, self._username,
self._password)
if self.verbose:
_LOGGER.debug("FabricTransport: {0}".format(uri))
opener = urllib2.build_opener(*self._handlers)
headers = {
'Content-Type': 'text/xml',
'User-Agent': self.user_agent,
}
req = urllib2.Request(uri, request_body, headers=headers)
try:
return self.parse_response(opener.open(req))
except (urllib2.URLError, urllib2.HTTPError) as exc:
try:
code = -1
if exc.code == 400:
reason = 'Permission denied'
code = exc.code
else:
reason = exc.reason
msg = "{reason} ({code})".format(reason=reason, code=code)
except AttributeError:
if 'SSL' in str(exc):
msg = "SSL error"
else:
msg = str(exc)
raise InterfaceError("Connection with Fabric failed: " + msg)
except BadStatusLine:
raise InterfaceError("Connection with Fabric failed: check SSL")
def async_run(self, line, cell=None):
"""Run code into cell asynchronously
Usage:\\
%async_run <source> (cell content)
"""
if cell is None:
code_to_run = line
else:
code_to_run = cell
session_id = str(uuid4())
connection_id = format_ws_connection_id(PY_ROLE, session_id)
try:
_ = urlopen(connection_string(web_socket=False, extra='ping'))
except URLError:
print("Connection to server refused!", end=' ')
print("Use %async_run_server first!")
else:
connector = WSConnector(connection_id, code_to_run, self.shell)
connector.connect()
html_output = LIGHT_HTML_OUTPUT_CELL.format(session_id=session_id)
js_code = JS_WEBSOCKET_CODE.replace('__sessionid__', session_id)
js_code = js_code.replace('__connection_id__', format_ws_connection_id(JS_ROLE,
session_id))
html_output += js_code
return HTML(html_output)
def send_response(event, context, response_status, reason=None, response_data={}):
body = {
"Status": response_status,
"PhysicalResourceId": context.log_stream_name,
"StackId": event["StackId"],
"RequestId": event["RequestId"],
"LogicalResourceId": event["LogicalResourceId"],
}
print("Responding: {}".format(response_status))
if reason:
print(reason)
body["Reason"] = reason
if response_data:
print(response_data)
body["Data"] = response_data
body = json.dumps(body).encode("utf-8")
req = Request(event["ResponseURL"], data=body, headers={
"Content-Length": len(body),
"Content-Type": "",
})
req.get_method = lambda: "PUT"
try:
urlopen(req)
return True
except HTTPError as e:
print("Failed executing HTTP request: {}".format(e.code))
return False
except URLError as e:
print("Failed to reach the server: {}".format(e.reason))
return False
def test(self, url, toHex=True):
try:
url = 'http://{}/{}'.format(self._ip, url)
print(url)
response = urlopen(
url, timeout=self.timeout).read()
if toHex:
response = response.encode('hex')
print(response)
except (HTTPError, URLError, socket.timeout) as e:
print(e)
def catch_request(request):
"""Helper function to catch common exceptions encountered when
establishing a connection with a HTTP/HTTPS request
"""
try:
uh = urlopen(request)
return uh, False
except (HTTPError, URLError, socket.error):
e = sys.exc_info()[1]
return None, e
def getBestServer(servers):
"""Perform a speedtest.net latency request to determine which
speedtest.net server has the lowest latency
"""
results = {}
for server in servers:
cum = []
url = '%s/latency.txt' % os.path.dirname(server['url'])
urlparts = urlparse(url)
for i in range(0, 3):
try:
if urlparts[0] == 'https':
h = HTTPSConnection(urlparts[1])
else:
h = HTTPConnection(urlparts[1])
headers = {'User-Agent': user_agent}
start = timeit.default_timer()
h.request("GET", urlparts[2], headers=headers)
r = h.getresponse()
total = (timeit.default_timer() - start)
except (HTTPError, URLError, socket.error):
cum.append(3600)
continue
text = r.read(9)
if int(r.status) == 200 and text == 'test=test'.encode():
cum.append(total)
else:
cum.append(3600)
h.close()
avg = round((sum(cum) / 6) * 1000, 3)
results[avg] = server
fastest = sorted(results.keys())[0]
best = results[fastest]
best['latency'] = fastest
return best
def query(self, query):
query = textwrap.dedent(query)
if self.debug:
self.debug(query)
url = '%s?query=%s' % (self.endpoint, quote(query))
req = Request(url)
if not self.cache:
req.add_header('cache-control', 'no-cache')
req.add_header('Accept', 'application/sparql-results+json')
try:
res = urlopen(req).read()
except (URLError) as e:
raise WdmapperError(e)
if six.PY3:
res = res.decode('utf8')
if res:
try:
data = json.loads(res)
except ValueError as e:
if res.find(b'QueryTimeoutException') != -1:
e = 'query timeout'
raise WdmapperError(e)
if data and 'results' in data:
result = []
qvars = data['head']['vars']
for row in data['results']['bindings']:
values = {}
for var in qvars:
if var in row:
values[var] = row[var]['value']
else:
values[var] = None
result.append(values)
return result
def getBestServer(servers):
"""Perform a speedtest.net latency request to determine which
speedtest.net server has the lowest latency
"""
results = {}
for server in servers:
cum = []
url = '%s/latency.txt' % os.path.dirname(server['url'])
urlparts = urlparse(url)
for i in range(0, 3):
try:
if urlparts[0] == 'https':
h = HTTPSConnection(urlparts[1])
else:
h = HTTPConnection(urlparts[1])
start = timeit.default_timer()
h.request("GET", urlparts[2])
r = h.getresponse()
total = (timeit.default_timer() - start)
except (HTTPError, URLError, socket.error):
cum.append(3600)
continue
text = r.read(9)
if int(r.status) == 200 and text == 'test=test'.encode():
cum.append(total)
else:
cum.append(3600)
h.close()
avg = round((sum(cum) / 6) * 1000, 3)
results[avg] = server
fastest = sorted(results.keys())[0]
best = results[fastest]
best['latency'] = fastest
return best
def ping(ip):
try:
urlopen("http://" + ip + ":1921/name").read()
return True
except URLError:
return False