def update_heartbeat():
    """POST a heartbeat to the configured webservice and return its JSON reply.

    The URL and the basic-auth credentials are read from
    ``config['webservice']``.  The request is retried with exponential
    back-off (100 ms multiplier, at most 1 s between tries) for up to
    30 seconds in total.

    Returns:
        The decoded JSON body of the response, or an empty dict when no
        attempt succeeded within the retry budget.
    """
    @retry(
        stop_max_delay=30000,  # 30 seconds max
        wait_exponential_multiplier=100,  # wait 2^i * 100 ms, on the i-th retry
        wait_exponential_max=1000,  # but wait 1 second per try maximum
        wrap_exception=True,
    )
    def retry_fetch_fail_after_30sec():
        return requests.post(
            config['webservice']['shifthelperHeartbeat'],
            auth=(
                config['webservice']['user'],
                config['webservice']['password'],
            ),
            # requests has no default timeout: without one, a stalled
            # connection would block this single attempt indefinitely and
            # the 30 s retry budget above could never take effect.
            timeout=10,
        ).json()

    try:
        return retry_fetch_fail_after_30sec()
    except RetryError:
        # Every attempt failed; callers get an empty result, not an error.
        return {}
# Example source code using the Python class RetryError()
def test_with_stop_on_return_value(self):
    """Check the RetryError raised when retrying stops on a return value.

    The final attempt carried by the error must hold no exception, be
    attempt number 3, and have ``None`` as its value.
    """
    try:
        _retryable_test_with_stop(NoneReturnUntilAfterCount(5))
    except RetryError as retry_err:
        final_attempt = retry_err.last_attempt
        self.assertFalse(final_attempt.has_exception)
        self.assertEqual(3, final_attempt.attempt_number)
        self.assertTrue(final_attempt.value is None)
        print(retry_err)
    else:
        # Reached only when the call returned without raising.
        self.fail("Expected RetryError after 3 attempts")
def test_retry_if_exception_of_type(self):
    """Exercise retrying keyed on exception type (IOError and CustomError).

    For each exception family three things are checked: the retryable
    call eventually returns a truthy value, a NameError raised by the
    target surfaces as NameError, and the attempt-limited wrapping
    variant raises RetryError whose last attempt (number 3) stores an
    exc_info-style triple with the expected exception instance.
    """
    # --- IOError family -------------------------------------------------
    io_result = _retryable_test_with_exception_type_io(NoIOErrorAfterCount(5))
    self.assertTrue(io_result)

    try:
        _retryable_test_with_exception_type_io(NoNameErrorAfterCount(5))
    except NameError as name_err:
        self.assertTrue(isinstance(name_err, NameError))
        print(name_err)
    else:
        self.fail("Expected NameError")

    try:
        _retryable_test_with_exception_type_io_attempt_limit_wrap(NoIOErrorAfterCount(5))
    except RetryError as retry_err:
        final_attempt = retry_err.last_attempt
        self.assertEqual(3, final_attempt.attempt_number)
        self.assertTrue(final_attempt.has_exception)
        self.assertTrue(final_attempt.value[0] is not None)
        self.assertTrue(isinstance(final_attempt.value[1], IOError))
        self.assertTrue(final_attempt.value[2] is not None)
        print(retry_err)
    else:
        self.fail("Expected RetryError")

    # --- CustomError family ---------------------------------------------
    custom_result = _retryable_test_with_exception_type_custom(NoCustomErrorAfterCount(5))
    self.assertTrue(custom_result)

    try:
        _retryable_test_with_exception_type_custom(NoNameErrorAfterCount(5))
    except NameError as name_err:
        self.assertTrue(isinstance(name_err, NameError))
        print(name_err)
    else:
        self.fail("Expected NameError")

    try:
        _retryable_test_with_exception_type_custom_attempt_limit_wrap(NoCustomErrorAfterCount(5))
    except RetryError as retry_err:
        final_attempt = retry_err.last_attempt
        self.assertEqual(3, final_attempt.attempt_number)
        self.assertTrue(final_attempt.has_exception)
        self.assertTrue(final_attempt.value[0] is not None)
        self.assertTrue(isinstance(final_attempt.value[1], CustomError))
        self.assertTrue(final_attempt.value[2] is not None)
        print(retry_err)
    else:
        self.fail("Expected RetryError")
def call(self, fn, *args, **kwargs):
    """Drive ``fn(*args, **kwargs)`` through the retry policy as a generator.

    Each iteration yields the result of calling ``fn`` and records what is
    sent back (or the exception thrown in) as a ``retrying.Attempt``.  An
    attempt that ``self.should_reject`` does not reject is handed to
    ``return_``.  Otherwise ``self.stop`` decides whether to give up or to
    yield a ``workflow_time.sleep`` before the next attempt.

    Raises:
        RetryError: when stopping and either ``self._wrap_exception`` is set
            or the last attempt did not fail with an exception.
        Exception: the last attempt's own exception when stopping with
            ``self._wrap_exception`` unset.
    """
    def _now_ms():
        # Workflow clock reading, rounded to whole milliseconds.
        return int(round(workflow_time.time() * 1000))

    first_attempt_ms = _now_ms()
    attempt_number = 1
    while True:
        try:
            outcome = yield fn(*args, **kwargs)
            attempt = retrying.Attempt(outcome, attempt_number, False)
        except Exception:
            attempt = retrying.Attempt(sys.exc_info(), attempt_number, True)

        if not self.should_reject(attempt):
            return_(attempt.get(self._wrap_exception))

        elapsed_ms = _now_ms() - first_attempt_ms
        if not self.stop(attempt_number, elapsed_ms):
            # use ceil since SWF timer resolution is in seconds
            pause_ms = self.wait(attempt_number, elapsed_ms)
            yield workflow_time.sleep(ceil(pause_ms / 1000.0))
            attempt_number += 1
            continue

        if not self._wrap_exception and attempt.has_exception:
            # get() on an attempt with an exception should cause it to be raised, but raise just in case
            raise attempt.get()
        raise RetryError(attempt)
def get_alerts():
    """Fetch the alerts from the configured webservice as decoded JSON.

    The URL is read from ``config['webservice']['post-url']``.  The GET
    request is retried with exponential back-off (100 ms multiplier, at
    most 1 s between tries) for up to 30 seconds in total.

    Returns:
        The decoded JSON body of the response, or an empty dict when no
        attempt succeeded within the retry budget.
    """
    @retry(
        stop_max_delay=30000,  # 30 seconds max
        wait_exponential_multiplier=100,  # wait 2^i * 100 ms, on the i-th retry
        wait_exponential_max=1000,  # but wait 1 second per try maximum
        wrap_exception=True,
    )
    def retry_fetch_fail_after_30sec():
        # requests has no default timeout: without one, a stalled
        # connection would block this single attempt indefinitely and
        # the 30 s retry budget above could never take effect.
        alerts = requests.get(config['webservice']['post-url'], timeout=10)
        return alerts.json()

    try:
        return retry_fetch_fail_after_30sec()
    except RetryError:
        # Every attempt failed; callers get an empty result, not an error.
        return {}
def fetch_users_awake():
    """Fetch the "I am awake" data from the configured webservice as JSON.

    The URL is read from ``config['webservice']['i_am_awake_url']``.  The
    GET request is retried with exponential back-off (100 ms multiplier,
    at most 1 s between tries) for up to 30 seconds in total.

    Returns:
        The decoded JSON body of the response, or an empty dict when no
        attempt succeeded within the retry budget.
    """
    @retry(
        stop_max_delay=30000,  # 30 seconds max
        wait_exponential_multiplier=100,  # wait 2^i * 100 ms, on the i-th retry
        wait_exponential_max=1000,  # but wait 1 second per try maximum
        wrap_exception=True,
    )
    def retry_fetch_fail_after_30sec():
        # requests has no default timeout: without one, a stalled
        # connection would block this single attempt indefinitely and
        # the 30 s retry budget above could never take effect.
        return requests.get(
            config['webservice']['i_am_awake_url'],
            timeout=10,
        ).json()

    try:
        return retry_fetch_fail_after_30sec()
    except RetryError:
        # Every attempt failed; callers get an empty result, not an error.
        return {}
def fetch_dummy_alerts():
    """Fetch the dummy alerts from the configured webservice as JSON.

    The URL is read from ``config['webservice']['dummy_alerts_url']``.  The
    GET request is retried with exponential back-off (100 ms multiplier,
    at most 1 s between tries) for up to 30 seconds in total.

    Returns:
        The decoded JSON body of the response, or an empty dict when no
        attempt succeeded within the retry budget.
    """
    @retry(
        stop_max_delay=30000,  # 30 seconds max
        wait_exponential_multiplier=100,  # wait 2^i * 100 ms, on the i-th retry
        wait_exponential_max=1000,  # but wait 1 second per try maximum
        wrap_exception=True,
    )
    def retry_fetch_fail_after_30sec():
        # requests has no default timeout: without one, a stalled
        # connection would block this single attempt indefinitely and
        # the 30 s retry budget above could never take effect.
        return requests.get(
            config['webservice']['dummy_alerts_url'],
            timeout=10,
        ).json()

    try:
        return retry_fetch_fail_after_30sec()
    except RetryError:
        # Every attempt failed; callers get an empty result, not an error.
        return {}
def test_wrapped_exception(self):
    """Check that wrapping retryables surface failures as RetryError.

    For the IOError and CustomError families: the wrapping retryable
    returns a truthy value when the target recovers, a NameError from the
    target arrives wrapped in RetryError, and the attempt-limited variant
    raises RetryError whose last attempt (number 3) stores an
    exc_info-style triple with the expected exception instance.
    """
    # base exception cases
    io_result = _retryable_test_with_exception_type_io_wrap(NoIOErrorAfterCount(5))
    self.assertTrue(io_result)

    try:
        _retryable_test_with_exception_type_io_wrap(NoNameErrorAfterCount(5))
    except RetryError as retry_err:
        self.assertTrue(isinstance(retry_err.last_attempt.value[1], NameError))
        print(retry_err)
    else:
        self.fail("Expected RetryError")

    try:
        _retryable_test_with_exception_type_io_attempt_limit_wrap(NoIOErrorAfterCount(5))
    except RetryError as retry_err:
        final_attempt = retry_err.last_attempt
        self.assertEqual(3, final_attempt.attempt_number)
        self.assertTrue(final_attempt.has_exception)
        self.assertTrue(final_attempt.value[0] is not None)
        self.assertTrue(isinstance(final_attempt.value[1], IOError))
        self.assertTrue(final_attempt.value[2] is not None)
        print(retry_err)
    else:
        self.fail("Expected RetryError")

    # custom error cases
    custom_result = _retryable_test_with_exception_type_custom_wrap(NoCustomErrorAfterCount(5))
    self.assertTrue(custom_result)

    try:
        _retryable_test_with_exception_type_custom_wrap(NoNameErrorAfterCount(5))
    except RetryError as retry_err:
        final_attempt = retry_err.last_attempt
        self.assertTrue(final_attempt.value[0] is not None)
        self.assertTrue(isinstance(final_attempt.value[1], NameError))
        self.assertTrue(final_attempt.value[2] is not None)
        print(retry_err)
    else:
        self.fail("Expected RetryError")

    try:
        _retryable_test_with_exception_type_custom_attempt_limit_wrap(NoCustomErrorAfterCount(5))
    except RetryError as retry_err:
        final_attempt = retry_err.last_attempt
        self.assertEqual(3, final_attempt.attempt_number)
        self.assertTrue(final_attempt.has_exception)
        self.assertTrue(final_attempt.value[0] is not None)
        self.assertTrue(isinstance(final_attempt.value[1], CustomError))
        self.assertTrue(final_attempt.value[2] is not None)
        self.assertTrue("This is a Custom exception class" in str(final_attempt.value[1]))
        print(retry_err)
    else:
        self.fail("Expected RetryError")
def delete_ec2_volume(name, timeout=300):
    """Delete an EC2 EBS volume by its "Name" tag

    Args:
        name: value of the "Name" tag; exactly one volume must carry it
        timeout: seconds to wait for volume to become available for deletion

    Raises:
        Exception: if no volume or more than one volume matches `name`, or
            if the volume could not be deleted within `timeout` seconds
            (the underlying error is chained as the cause).
    """
    def _force_detach_volume(volume):
        for attachment in volume.attachments:
            volume.detach_from_instance(
                DryRun=False,
                InstanceId=attachment['InstanceId'],
                Device=attachment['Device'],
                Force=True)

    @retrying.retry(wait_fixed=30 * 1000, stop_max_delay=timeout * 1000,
                    retry_on_exception=lambda exc: isinstance(exc, botocore.exceptions.ClientError))
    def _delete_volume(volume):
        _force_detach_volume(volume)
        volume.delete()  # Raises ClientError if the volume is still attached.

    def _get_current_aws_region():
        try:
            # The metadata service reports the availability zone (region
            # name plus a one-letter suffix); dropping the last character
            # leaves the region.
            return requests.get('http://169.254.169.254/latest/meta-data/placement/availability-zone').text.strip()[:-1]
        except requests.RequestException as ex:
            print("Can't get AWS region from instance metadata: {}".format(ex))
            return None

    # Remove AWS environment variables to force boto to use IAM credentials.
    with _remove_env_vars('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'):
        volumes = list(boto3.session.Session(
            # We assume we're running these tests from a cluster node, so we
            # can assume the region for the instance on which we're running is
            # the same region in which any volumes were created.
            region_name=_get_current_aws_region(),
        ).resource('ec2').volumes.filter(Filters=[{'Name': 'tag:Name', 'Values': [name]}]))

        if not volumes:
            raise Exception('no volumes found with name {}'.format(name))
        elif len(volumes) > 1:
            raise Exception('multiple volumes found with name {}'.format(name))
        volume = volumes[0]

        try:
            _delete_volume(volume)
        except (retrying.RetryError, botocore.exceptions.ClientError) as ex:
            # _delete_volume is decorated without wrap_exception, so once the
            # retry budget is exhausted the last ClientError is re-raised
            # as-is rather than wrapped in RetryError.  ClientError is always
            # retried, so seeing one here means the time limit was hit.
            raise Exception('Operation was not completed within {} seconds'.format(timeout)) from ex
def outlet_command(self, outlet, operation):
    """Send command to an outlet in the PDU.

    :param outlet: outlet number, from 1 to the number of outlets inclusive
    :param operation: one of ['on', 'off', 'reboot']
    :return: did the operation complete successfully?
    :raises ValueError: if ``operation`` is not a valid operation
    :raises IndexError: if ``outlet`` is outside the valid outlet range
    """
    valid_operations = ['on', 'off', 'reboot']
    if operation not in valid_operations:
        # Exceptions do not interpolate extra arguments the way logging
        # calls do, so the message has to be formatted explicitly.
        raise ValueError(
            '"%s" is an invalid operation. Valid operations are: %s'
            % (str(operation), str(valid_operations))
        )

    # Value written to the outlet-command object for each operation.
    operations = {
        'on': 1,
        'off': 2,
        'reboot': 3,
    }

    if 1 <= outlet <= self._num_outlets:
        self._logger.info(
            'Setting outlet %d to %s state', outlet, operation
        )
        self.__set(
            self.Q_OUTLET_COMMAND_RW + (outlet,), operations[operation]
        )

        try:
            # Both 'on' and 'reboot' are confirmed by waiting for 'on'.
            if operation in ('on', 'reboot'):
                success = self.__wait_for_state(outlet, 'on')
            else:
                success = self.__wait_for_state(outlet, 'off')
        except RetryError:
            # If the operation timed out, no determination of the result
            # can be made.
            success = False

        return success
    else:
        raise IndexError(
            'Only %d outlets exist. "%s" is an invalid outlet.'
            % (self._num_outlets, str(outlet))
        )