def new_version_available(self) -> bool:
    """
    Checks the remote manifest to see if a new version is available.

    Versions are treated as dot-separated integers and compared component by
    component from the left; the shorter version is padded with zeros, so
    "1.2" and "1.2.0" are considered equal and "1.2.1" is newer than "1.2".

    :return: True only when the remote version is strictly greater than the
        local one. False when no remote manifest URL is configured, when the
        versions are equal or the local one is newer, or when the remote
        manifest cannot be reached.
    """
    remote_url = self.manifest.get("remote_manifest", "")
    if remote_url == "":
        return False
    self.logger.info("Checking for update...")
    try:
        manifest = requests.get(remote_url).json()
        remote_parts = [int(part) for part in manifest["version"].split(".")]
        local_parts = [int(part) for part in self.manifest["version"].split(".")]
        # Pad to equal length so list comparison is a true lexicographic
        # comparison: the first differing component decides the result.
        # (Checking only "local < remote" per component would wrongly report
        # an update for local 2.0 vs remote 1.5.)
        width = max(len(remote_parts), len(local_parts))
        remote_parts += [0] * (width - len(remote_parts))
        local_parts += [0] * (width - len(local_parts))
        if local_parts < remote_parts:
            self.logger.info(f"An update is available. Current version is v{self.manifest['version']}, updated version is v{manifest['version']}.")
            return True
        self.logger.info("No update required.")
        return False
    except requests.exceptions.ConnectionError:
        self.logger.warning("Failed to check for update.")
        return False
python类exceptions()的实例源码
def isDaemonAlive(hostAndPort="{0}:{1}".format(IPFSAPI_IP, IPFSAPI_PORT)):
    """Ensure that the IPFS daemon is running via HTTP before proceeding.

    The client is always built from IPFSAPI_IP / IPFSAPI_PORT; ``hostAndPort``
    is only used in the error message.

    Returns True when ``client.id()`` succeeds and False when the daemon is
    unreachable or an unexpected error occurs. Exits the process (via
    ``sys.exit()``) when an OSError is raised.
    """
    client = ipfsapi.Client(IPFSAPI_IP, IPFSAPI_PORT)

    try:
        # ConnectionError/AttributeError if IPFS daemon not running
        client.id()
        return True
    except (ConnectionError, AttributeError):
        # Builtin AttributeError replaces the Python 2-only
        # ``exceptions.AttributeError`` spelling.
        logError("Daemon is not running at http://" + hostAndPort)
        return False
    except OSError:
        logError("IPFS is likely not installed. "
                 "See https://ipfs.io/docs/install/")
        sys.exit()
    except Exception:
        # Not a bare ``except:`` so KeyboardInterrupt/SystemExit propagate.
        logError('Unknown error in retrieving daemon status')
        logError(sys.exc_info()[0])
        # Explicit falsy result instead of silently falling through to None.
        return False
def enroll(self, enrollment_id, enrollment_secret):
    """Enroll a registered user and wrap the result in an Enrollment.

    A new private key is generated, a CSR whose subject common name is the
    enrollment ID is built from it, and the PEM-encoded CSR is submitted to
    the CA client together with the ID and secret.

    Args:
        enrollment_id (str): The registered ID to use for enrollment
        enrollment_secret (str): The secret associated with the
                                 enrollment ID

    Returns: An Enrollment built from the generated private key and the
        value returned by the CA client's enroll call (presumably the
        PEM-encoded X509 certificate -- confirm against the CA client).

    Raises:
        RequestException: errors in requests.exceptions (as documented by
            the original contract; raised by collaborators, not here)
        ValueError: Failed response, json parse error, args missing
    """
    key = self._crypto.generate_private_key()
    subject = x509.Name(
        [x509.NameAttribute(NameOID.COMMON_NAME, six.u(enrollment_id))])
    signing_request = self._crypto.generate_csr(key, subject)
    pem_csr = signing_request.public_bytes(Encoding.PEM).decode("utf-8")
    signed = self._ca_client.enroll(enrollment_id, enrollment_secret, pem_csr)
    return Enrollment(key, signed)
def __init__(self, host, username=None, password=None, verify_ssl=True, user_agent=None, cert=None):
    """Store connection settings and derive the authentication mode.

    The user agent falls back to 'webinspectapi/' plus the module-level
    version when none (or an empty one) is supplied. When SSL verification
    is turned off, urllib3's InsecureRequestWarning is silenced. The
    auth type is 'basic' if a username is given, otherwise 'certificate'
    if a cert is given, otherwise 'unauthenticated'.
    """
    self.host = host
    self.username = username
    self.password = password
    self.cert = cert
    self.verify_ssl = verify_ssl
    self.user_agent = user_agent if user_agent else 'webinspectapi/' + version

    if not self.verify_ssl:
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    # Username takes precedence over a client certificate.
    if username is not None:
        auth_type = 'basic'
    elif cert is not None:
        auth_type = 'certificate'
    else:
        auth_type = 'unauthenticated'
    self.auth_type = auth_type
def find_decl_doc(self, name):
    """Always raise IncludeError for ``name``.

    NOTE(review): the unconditional raise below makes the rest of this
    function unreachable. It looks like the online lookup was deliberately
    disabled -- confirm before either deleting the dead code or removing
    the raise.
    """
    raise IncludeError(name)
    # ---- Unreachable from here on (kept verbatim) ----
    import requests
    from requests.exceptions import InvalidSchema
    url = METATAB_ASSETS_URL + name + '.csv'
    try:
        # See if it exists online in the official repo
        r = requests.head(url, allow_redirects=False)
        if r.status_code == requests.codes.ok:
            return url
    except InvalidSchema:
        pass  # It's probably FTP
    # If this code were reachable, a non-OK status or InvalidSchema would
    # fall through here and implicitly return None.
def test_request_retry(self, mock_request):
    """Two connection errors followed by a 204 must not surface to the caller."""

    class FlakyTransport(object):
        """Raises ConnectionError on calls 1 and 2, then answers with 204."""
        i = 0

        def connection_error(self, *args, **kwargs):
            self.i += 1
            if self.i >= 3:
                reply = requests.Response()
                reply.status_code = 204
                return reply
            raise requests.exceptions.ConnectionError

    mock_request.side_effect = FlakyTransport().connection_error

    client = InfluxDBClient(database='db')
    client.write_points(self.dummy_points)
def test_request_retry_raises(self, mock_request):
    """Three connection errors in a row must propagate as ConnectionError."""

    class FlakyTransport(object):
        """Raises ConnectionError on calls 1 to 3, then answers with 200."""
        i = 0

        def connection_error(self, *args, **kwargs):
            self.i += 1
            if self.i >= 4:
                reply = requests.Response()
                reply.status_code = 200
                return reply
            raise requests.exceptions.ConnectionError

    mock_request.side_effect = FlakyTransport().connection_error

    client = InfluxDBClient(database='db')
    with self.assertRaises(requests.exceptions.ConnectionError):
        client.write_points(self.dummy_points)
def test_request_retry(self, mock_request):
    """Two connection errors followed by a 200 must not surface to the caller."""

    class FlakyTransport(object):
        """Raises ConnectionError on calls 1 and 2, then answers with 200."""
        i = 0

        def connection_error(self, *args, **kwargs):
            self.i += 1
            if self.i >= 3:
                reply = requests.Response()
                reply.status_code = 200
                return reply
            raise requests.exceptions.ConnectionError

    mock_request.side_effect = FlakyTransport().connection_error

    client = InfluxDBClient(database='db')
    client.write_points(self.dummy_points)
def test_request_retry_raises(self, mock_request):
    """Three connection errors in a row must propagate as ConnectionError."""

    class FlakyTransport(object):
        """Raises ConnectionError on calls 1 to 3, then answers with 200."""
        i = 0

        def connection_error(self, *args, **kwargs):
            self.i += 1
            if self.i >= 4:
                reply = requests.Response()
                reply.status_code = 200
                return reply
            raise requests.exceptions.ConnectionError

    mock_request.side_effect = FlakyTransport().connection_error

    client = InfluxDBClient(database='db')
    with self.assertRaises(requests.exceptions.ConnectionError):
        client.write_points(self.dummy_points)
def __init__(self, endpoint, resource_uri, optimization=True,
             max_elems=100, filter_query=None, filter_dialect=None):
    """Record the enumeration target and optional filter settings.

    The filter dialect is only resolved (through FILTER_DIALECT_MAP) when a
    filter query is supplied; without a query both filter attributes stay
    None regardless of ``filter_dialect``.

    :raises exceptions.WSManInvalidFilterDialect: a filter query was given
        but ``filter_dialect`` is not a key of FILTER_DIALECT_MAP
    """
    self.endpoint = endpoint
    self.resource_uri = resource_uri
    self.optimization = optimization
    self.max_elems = max_elems
    self.filter_dialect = None
    self.filter_query = None

    if filter_query is None:
        return

    try:
        self.filter_dialect = FILTER_DIALECT_MAP[filter_dialect]
    except KeyError:
        raise exceptions.WSManInvalidFilterDialect(
            invalid_filter=filter_dialect,
            supported=', '.join(FILTER_DIALECT_MAP))
    self.filter_query = filter_query
def blender_id_server_validate(token) -> typing.Tuple[typing.Optional[str], typing.Optional[str]]:
    """Validate the auth token with the server.

    @param token: the authentication token
    @type token: str
    @returns: tuple (expiry, error).
        The expiry is the expiry date of the token if it is valid, else None.
        The error is None if the token is valid, or an error message when it's
        invalid or when the request to the server failed.
    """

    import requests
    import requests.exceptions

    try:
        r = requests.post(blender_id_endpoint('u/validate_token'),
                          data={'token': token}, verify=True)
    except requests.exceptions.RequestException as e:
        # The message belongs in the *error* slot. Returning it first would
        # make callers treat the exception text as a valid expiry date.
        return (None, str(e))

    if r.status_code != 200:
        return (None, 'Authentication token invalid')

    response = r.json()
    return (response['token_expires'], None)
def make_authenticated_call(method, url, auth_token, data):
    """Perform an HTTP request carrying the OAuth token as a Bearer header.

    ``url`` is resolved through blender_id_endpoint() before the call.
    Returns the response object as-is; the status code is not inspected.
    HTTPError and ConnectionError from requests are re-raised as
    BlenderIdCommError carrying the original message.
    """

    import requests
    import requests.exceptions

    comm_failures = (requests.exceptions.HTTPError,
                     requests.exceptions.ConnectionError)
    try:
        endpoint = blender_id_endpoint(url)
        auth_headers = {'Authorization': 'Bearer %s' % auth_token}
        response = requests.request(method, endpoint, data=data,
                                    headers=auth_headers, verify=True)
    except comm_failures as err:
        raise BlenderIdCommError(str(err))

    return response
def track(token, uid, message, name='Message'):
    """POST ``message`` as JSON to TRACK_URL and return the decoded reply.

    Returns the parsed JSON body on success. Returns False on a timeout
    (silently) and on any other requests error or ValueError (after
    printing the exception).
    """
    query = {"token": token, "uid": uid, "name": name}
    try:
        # Serialisation stays inside the try: json.dumps can raise
        # ValueError, which the handler below is meant to absorb.
        reply = requests.post(
            TRACK_URL,
            params=query,
            data=json.dumps(message),
            headers={'Content-type': 'application/json'},
        )
        return reply.json()
    except requests.exceptions.Timeout:
        # set up for a retry, or continue in a retry loop
        return False
    except (requests.exceptions.RequestException, ValueError) as err:
        # catastrophic error
        print(err)
        return False
def _request(self, method, url, data=None, **kwargs):
    """Send a request through the session and return only 200 responses.

    Any other status raises matchlight.error.APIError with the status code,
    the reason and the decoded JSON body (None when the body is not JSON).
    requests' RetryError and ConnectionError are translated into
    matchlight.error.ConnectionError.
    """
    try:
        response = self.session.request(method, url, data=data, **kwargs)
        if response.status_code == 200:
            return response

        try:
            error_body = response.json()
        except ValueError:
            error_body = None
        raise matchlight.error.APIError(
            response.status_code, response.reason, error_body)
    except requests.exceptions.RetryError:
        raise matchlight.error.ConnectionError(
            'Matchlight API request failed with too many retries')
    except requests.exceptions.ConnectionError:
        raise matchlight.error.ConnectionError(
            'Matchlight API request failed with connection error')
registration.py 文件源码
项目:CommunityCellularManager
作者: facebookincubator
项目源码
文件源码
阅读 30
收藏 0
点赞 0
评论 0
def register_update(eapi):
    """Ensures the inbound URL for the BTS is up to date.

    Sends the BTS UUID, VPN status/IP and port to '/bts/register' and, if
    the reply contains 'bts_secret', stores it in ``conf``. A
    RegistrationError is logged and swallowed rather than raised.
    """
    vpn_ip = system_utilities.get_vpn_ip()
    # This could fail when offline! Must handle connection exceptions.
    params = {
        'bts_uuid': _get_snowflake(),
        'vpn_status': 'up' if vpn_ip else 'down',
        'vpn_ip': vpn_ip,
        'federer_port': '80',
    }
    try:
        reply = _send_cloud_req(
            requests.get,
            '/bts/register',
            'BTS registration',
            params=params,
            headers=eapi.auth_header,
            timeout=11)
        if 'bts_secret' in reply:
            conf['bts_secret'] = reply['bts_secret']
    except RegistrationError as ex:
        logger.error(str(ex))
def __exit__(self, exc, value, traceback):
    """Decide the outcome of the response when the ``with`` block ends.

    Returns True to suppress the in-flight exception, or a falsy value to
    let it propagate.
    """
    if self._is_reported:
        # The user already marked this response as success or failure, so
        # the status code no longer decides the outcome; only suppress when
        # there is no exception.
        return exc is None

    if exc:
        if not isinstance(value, ResponseError):
            # Unrelated exception: let it propagate.
            return False
        self.failure(value)
        return True

    try:
        self.raise_for_status()
    except requests.exceptions.RequestException as err:
        self.failure(err)
    else:
        self.success()
    return True
def put_session(self, session, retry=True):
    """PUT a session to the backend's /conns endpoint.

    Returns the decoded JSON reply on HTTP 200, or an empty list when the
    request itself fails. A non-200 reply is retried once (when ``retry``
    is truthy); a second non-200 reply raises IOError with the raw body.
    """
    try:
        r = requests.put(self.url + "/conns", auth=self.auth, json=session, timeout=20.0)
    except requests.exceptions.RequestException:
        dbg("Cannot connect to backend")
        return []

    if r.status_code == 200:
        return r.json()

    msg = r.raw.read()
    if not retry:
        raise IOError(msg)
    dbg("Backend upload failed, retrying (" + str(msg) + ")")
    return self.put_session(session, False)
def put_sample_info(self, f, retry=True):
    """PUT sample metadata to /sample/<sha256> on the backend.

    ``f`` must contain a "sha256" key. Returns the decoded JSON reply on
    HTTP 200, or None when the request itself fails. A non-200 reply is
    retried once (when ``retry`` is truthy); a second non-200 reply raises
    IOError with the raw body.
    """
    try:
        digest = f["sha256"]
        r = requests.put(self.url + "/sample/" + digest, auth=self.auth, json=f, timeout=20.0)
    except requests.exceptions.RequestException:
        dbg("Cannot connect to backend")
        return

    if r.status_code == 200:
        return r.json()

    msg = r.raw.read()
    if not retry:
        raise IOError(msg)
    dbg("Backend upload failed, retrying (" + str(msg) + ")")
    return self.put_sample_info(f, False)
def put_sample(self, data, retry=True):
    """POST raw sample data to the backend's /file endpoint.

    Returns None on HTTP 200 and when the request itself fails. A non-200
    reply is retried once with the same ``data`` (when ``retry`` is
    truthy); a second non-200 reply raises IOError with the raw body.

    NOTE(review): if ``data`` is a one-shot stream it may already be
    consumed by the first attempt -- confirm callers pass bytes.
    """
    try:
        r = requests.post(self.url + "/file", auth=self.auth, data=data, timeout=20.0)
    except requests.exceptions.RequestException:
        dbg("Cannot connect to backend")
        return

    if r.status_code == 200:
        return
    elif retry:
        msg = r.raw.read()
        dbg("Backend upload failed, retrying (" + str(msg) + ")")
        # Retry with this method's own argument. The previous call passed
        # undefined names (sha256, filename) and one argument too many, so
        # the retry path raised NameError instead of retrying.
        return self.put_sample(data, False)
    else:
        msg = r.raw.read()
        raise IOError(msg)
def test_reset_no_response_get(self, mock_time, mock_requests, mock_get_url):
    """
    Given: Mock requests configured to raise requests.exceptions.ConnectTimeout on get.
      and: EzOutlet initialized with an IP address and timeout.
    When: reset(post_reset_delay, ez_outlet_reset_interval) is invoked.
    Then: ez_outlet._get_url was called with the IP address and ez_outlet.RESET_URL_PATH.
     and: requests.get was called exactly once with the resulting URL, the timeout
          and proxies=PROXY_SETTINGS_NONE.
    """
    _ = mock_time

    # Given
    self.configure_mock_requests(mock_requests=mock_requests)

    # When
    reset_kwargs = {
        'post_reset_delay': self.post_reset_delay,
        'ez_outlet_reset_interval': self.ez_outlet_reset_interval,
    }
    try:
        self.uut.reset(**reset_kwargs)
    except ezoutlet.exceptions.EzOutletError:
        pass  # exception tested elsewhere

    # Then
    mock_get_url.assert_called_with(self.hostname, ez_outlet.EzOutlet.RESET_URL_PATH)
    mock_requests.get.assert_called_once_with(sample_url, timeout=self.timeout, proxies=PROXY_SETTINGS_NONE)
def test_reset_no_response_raise(self, mock_time, mock_requests, mock_get_url):
    """
    Given: Mock requests configured to raise requests.exceptions.ConnectTimeout on get.
      and: EzOutlet initialized with an IP address and timeout.
    When: reset(post_reset_delay, ez_outlet_reset_interval) is invoked.
    Then: reset() raises ezoutlet.exceptions.EzOutletError.
     and: its message equals ez_outlet.EzOutlet.NO_RESPONSE_MSG.format(timeout).
    """
    _ = mock_time
    _ = mock_get_url

    # Given
    self.configure_mock_requests(mock_requests=mock_requests)

    # When
    reset_kwargs = {
        'post_reset_delay': self.post_reset_delay,
        'ez_outlet_reset_interval': self.ez_outlet_reset_interval,
    }
    with self.assertRaises(ezoutlet.exceptions.EzOutletError) as raised:
        self.uut.reset(**reset_kwargs)

    # Then
    expected_message = ez_outlet.EzOutlet.NO_RESPONSE_MSG.format(self.timeout)
    self.assertEqual(str(raised.exception), expected_message)
def test_reset_no_response_no_sleep(self, mock_time, mock_requests, mock_get_url):
    """
    Given: Mock requests configured to raise requests.exceptions.ConnectTimeout on get.
      and: EzOutlet initialized with an IP address and timeout.
    When: reset(post_reset_delay, ez_outlet_reset_interval) is invoked.
    Then: time.sleep is never called.
    """
    _ = mock_get_url

    # Given
    self.configure_mock_requests(mock_requests=mock_requests)

    # When
    reset_kwargs = {
        'post_reset_delay': self.post_reset_delay,
        'ez_outlet_reset_interval': self.ez_outlet_reset_interval,
    }
    try:
        self.uut.reset(**reset_kwargs)
    except ezoutlet.exceptions.EzOutletError:
        pass  # exception tested elsewhere

    # Then
    mock_time.sleep.assert_not_called()
# Suppress since PyCharm doesn't recognize @mock.patch.object
# noinspection PyUnresolvedReferences
def test_reset_unexpected_response_get(self, mock_time, mock_requests, mock_get_url):
    """
    Given: Mock requests module configured to give unexpected_response_contents
      and: EzOutlet initialized with an IP address and timeout.
    When: reset(post_reset_delay, ez_outlet_reset_interval) is invoked.
    Then: ez_outlet._get_url was called with the IP address and ez_outlet.RESET_URL_PATH.
     and: requests.get was called exactly once with the resulting URL, the timeout
          and proxies=PROXY_SETTINGS_NONE.
    """
    _ = mock_time

    # Given
    self.configure_mock_requests(mock_requests=mock_requests)

    # When
    reset_kwargs = {
        'post_reset_delay': self.post_reset_delay,
        'ez_outlet_reset_interval': self.ez_outlet_reset_interval,
    }
    try:
        self.uut.reset(**reset_kwargs)
    except ezoutlet.exceptions.EzOutletError:
        pass  # exception tested elsewhere

    # Then
    mock_get_url.assert_called_with(self.hostname, ez_outlet.EzOutlet.RESET_URL_PATH)
    mock_requests.get.assert_called_once_with(sample_url, timeout=self.timeout, proxies=PROXY_SETTINGS_NONE)
def test_reset_unexpected_response_no_sleep(self, mock_time, mock_requests, mock_get_url):
    """
    Given: Mock requests module configured to give unexpected_response_contents
      and: EzOutlet initialized with an IP address and timeout.
    When: reset(post_reset_delay, ez_outlet_reset_interval) is invoked.
    Then: time.sleep is never called.
    """
    _ = mock_get_url

    # Given
    self.configure_mock_requests(mock_requests=mock_requests)

    # When
    reset_kwargs = {
        'post_reset_delay': self.post_reset_delay,
        'ez_outlet_reset_interval': self.ez_outlet_reset_interval,
    }
    try:
        self.uut.reset(**reset_kwargs)
    except ezoutlet.exceptions.EzOutletError:
        pass  # exception tested elsewhere

    # Then
    mock_time.sleep.assert_not_called()
def directions(self, proxies=None):
    """
    Fetch the list of supported translation directions.

    Queries the "langs" endpoint with the API key. A connection error is
    reported as YandexTranslateException with the 503 error text; a reply
    whose "code" field is present and not 200 is reported as
    YandexTranslateException with that code.

    >>> translate = YandexTranslate("trnsl.1.1.20130421T140201Z.323e508a33e9d84b.f1e0d9ca9bcd0a00b0ef71d82e6cf4158183d09e")
    >>> directions = translate.directions
    >>> len(directions) > 0
    True
    """
    try:
        reply = requests.get(self.url("langs"), params={"key": self.api_key}, proxies=proxies)
    except requests.exceptions.ConnectionError:
        raise YandexTranslateException(self.error_codes[503])

    payload = reply.json()
    # A missing "code" field is treated as success.
    code = payload.get("code", 200)
    if code != 200:
        raise YandexTranslateException(code)
    return payload.get("dirs")
def retrieve_pool(self, uuid):
    """Fetch an existing :class:`qarnot.pool.Pool` by uuid.

    :param str uuid: uuid of the pool to look up
    :rtype: :class:`~qarnot.pool.Pool`
    :returns: The pool built from the API's JSON reply.
    :raises qarnot.exceptions.MissingPoolException: the API answered 404
    :raises qarnot.exceptions.UnauthorizedException: invalid credentials
    :raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
    """
    pool_url = get_url('pool update', uuid=uuid)
    response = self._get(pool_url)
    not_found = response.status_code == 404
    if not_found:
        raise MissingPoolException(response.json()['message'])
    raise_on_error(response)
    payload = response.json()
    return Pool.from_json(self, payload)
def retrieve_task(self, uuid):
    """Fetch an existing :class:`qarnot.task.Task` by uuid.

    :param str uuid: uuid of the task to look up
    :rtype: :class:`~qarnot.task.Task`
    :returns: The task built from the API's JSON reply.
    :raises qarnot.exceptions.MissingTaskException: the API answered 404
    :raises qarnot.exceptions.UnauthorizedException: invalid credentials
    :raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
    """
    task_url = get_url('task update', uuid=uuid)
    response = self._get(task_url)
    not_found = response.status_code == 404
    if not_found:
        raise MissingTaskException(response.json()['message'])
    raise_on_error(response)
    payload = response.json()
    return Task.from_json(self, payload)
def retrieve_disk(self, uuid):
    """Fetch an existing :class:`~qarnot.disk.Disk` by uuid.

    :param str uuid: uuid of the disk to look up (sent as the ``name``
        argument of the 'disk info' URL)
    :rtype: :class:`~qarnot.disk.Disk`
    :returns: The disk built from the API's JSON reply.
    :raises qarnot.exceptions.MissingDiskException: the API answered 404
    :raises qarnot.exceptions.UnauthorizedException: invalid credentials
    :raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
    """
    disk_url = get_url('disk info', name=uuid)
    response = self._get(disk_url)
    not_found = response.status_code == 404
    if not_found:
        raise MissingDiskException(response.json()['message'])
    raise_on_error(response)
    payload = response.json()
    return Disk.from_json(self, payload)
def create_disk(self, description, lock=False, tags=None):
    """Build a :class:`~qarnot.disk.Disk`, call its ``create()`` and return it.

    :param str description: a short description of the disk
    :param bool lock: prevents the disk to be removed accidentally
    :param tags: custom tags
    :type tags: list(`str`)
    :rtype: :class:`qarnot.disk.Disk`
    :returns: The created :class:`~qarnot.disk.Disk`.
    :raises qarnot.exceptions.MaxDiskException: disk quota reached
    :raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
    :raises qarnot.exceptions.UnauthorizedException: invalid credentials
    """
    new_disk = Disk(self, description, lock=lock, tags=tags)
    new_disk.create()
    return new_disk
def profiles(self):
    """List the profiles available on the cluster.

    The profile names are fetched first, then each profile's details are
    requested individually. Profiles whose details request answers 404 are
    skipped.

    :rtype: List of :class:`Profile`
    :raises qarnot.exceptions.UnauthorizedException: invalid credentials
    :raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
    """
    listing = self._get(get_url('profiles'))
    raise_on_error(listing)

    found = []
    for profile_name in listing.json():
        details = self._get(get_url('profile details', profile=profile_name))
        if details.status_code == 404:
            # Listed but without details: leave it out of the result.
            continue
        raise_on_error(details)
        found.append(Profile(details.json()))
    return found