python类retry()的实例源码

__init__.py 文件源码 项目:shifthelper 作者: fact-project 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def update_heartbeat():
    """Post a heartbeat to the shifthelper webservice and return the JSON reply.

    The POST is retried for at most 30 seconds, waiting 2^i * 100 ms on the
    i-th retry but never more than 1 second between tries.

    Returns:
        The decoded JSON body of the response, or an empty dict when the
        retry wrapper raises ``RetryError``.
    """
    @retry(
        wrap_exception=True,
        wait_exponential_multiplier=100,  # 2^i * 100 ms on the i-th retry
        wait_exponential_max=1000,  # capped at 1 second per wait
        stop_max_delay=30000  # stop after 30 seconds
    )
    def post_heartbeat():
        # The configuration is read on every attempt, so a lookup failure
        # is retried just like a failing request.
        webservice = config['webservice']
        url = webservice['shifthelperHeartbeat']
        credentials = (webservice['user'], webservice['password'])
        response = requests.post(url, auth=credentials)
        return response.json()

    try:
        return post_heartbeat()
    except RetryError:
        return {}
service.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _do_work(self, params, fn):
        """Look up the registry entry of the pod and apply ``fn`` to its VIF.

        The registry lookup is retried once per second while it raises
        ``KeyError``, for up to ``CONF.cni_daemon.vif_annotation_timeout``
        seconds.

        Args:
            params: object exposing ``args.K8S_POD_NAME``, ``CNI_IFNAME``
                and ``CNI_NETNS``.
            fn: callable invoked as ``fn(vif, instance, ifname, netns)``.

        Returns:
            The VIF object rebuilt from the registry entry.

        Raises:
            exceptions.ResourceNotReady: when a ``KeyError`` survives the
                retries or the entry lacks the ``'pod'``/``'vif'`` keys.
        """
        pod_name = params.args.K8S_POD_NAME
        timeout_ms = CONF.cni_daemon.vif_annotation_timeout * 1000

        def _is_key_error(exc):
            return isinstance(exc, KeyError)

        @retrying.retry(stop_max_delay=timeout_ms, wait_fixed=1000,
                        retry_on_exception=_is_key_error)
        def lookup():
            return self.registry[pod_name]

        try:
            entry = lookup()
            pod = entry['pod']
            vif = base.VersionedObject.obj_from_primitive(entry['vif'])
        except KeyError:
            raise exceptions.ResourceNotReady(pod_name)

        instance = self._get_inst(pod)
        fn(vif, instance, params.CNI_IFNAME, params.CNI_NETNS)
        return vif
test_retrying.py 文件源码 项目:deb-python-retrying 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_after_attempts(self):
        """Check that the ``after_attempts`` hook receives the attempt number.

        The hook stores the attempt number it is given on
        ``TestBeforeAfterAttempts``; the retried function keeps raising
        until that stored number reaches 2, so the final stored value
        must be exactly 2.
        """
        TestBeforeAfterAttempts._attempt_number = 0

        def _after(attempt_number):
            TestBeforeAfterAttempts._attempt_number = attempt_number

        @retry(wait_fixed=100, stop_max_attempt_number=3, after_attempts=_after)
        def _test_after():
            if TestBeforeAfterAttempts._attempt_number < 2:
                raise Exception("testing after_attempts handler")

        _test_after()

        # Compare by value: ``is 2`` only works by accident of CPython's
        # small-int caching and emits a SyntaxWarning on modern Python.
        self.assertEqual(TestBeforeAfterAttempts._attempt_number, 2)
boto3_retry.py 文件源码 项目:boto3-retry 作者: timmartin19 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def retry_boto_func(func, *args,
                    retryable_error_codes=None,
                    retryable_status_codes=None,
                    retryable_exceptions=None,
                    max_retries=5,
                    retry_wait_time=2000,
                    **kwargs):
    """Call ``func`` through ``_call_boto_func`` with retries.

    A retry happens only when the raised exception is a
    ``Boto3RetryableException``.

    Args:
        func: the callable handed to ``_call_boto_func``.
        *args: positional arguments forwarded with ``func``.
        retryable_error_codes: forwarded as ``retryable_codes``.
        retryable_status_codes: forwarded unchanged.
        retryable_exceptions: forwarded unchanged.
        max_retries: maximum number of attempts.
        retry_wait_time: multiplier for the exponential wait.
        **kwargs: extra keyword arguments forwarded unchanged.

    Returns:
        Whatever ``_call_boto_func`` returns.
    """
    def _is_retryable(exc):
        return isinstance(exc, Boto3RetryableException)

    with_retries = retry(
        stop_max_attempt_number=max_retries,
        wait_exponential_multiplier=retry_wait_time,
        retry_on_exception=_is_retryable
    )
    guarded_call = with_retries(_call_boto_func)
    return guarded_call(func, *args,
                        retryable_status_codes=retryable_status_codes,
                        retryable_exceptions=retryable_exceptions,
                        retryable_codes=retryable_error_codes,
                        **kwargs)
google.py 文件源码 项目:dsub 作者: googlegenomics 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _retry_api_check(exception):
  """Decide whether an API call should be retried after `exception`.

  The exception is always reported through `_print_error` first.

  Args:
    exception: An exception to test for transience.

  Returns:
    True for an HttpError whose status is in TRANSIENT_HTTP_ERROR_CODES,
    for a socket.error whose errno is in TRANSIENT_SOCKET_ERROR_CODES, or
    for an HttpAccessTokenRefreshError. False otherwise.
  """
  _print_error('Exception %s: %s' % (type(exception).__name__, str(exception)))

  if (isinstance(exception, apiclient.errors.HttpError) and
      exception.resp.status in TRANSIENT_HTTP_ERROR_CODES):
    return True

  if (isinstance(exception, socket.error) and
      exception.errno in TRANSIENT_SOCKET_ERROR_CODES):
    return True

  return isinstance(exception, HttpAccessTokenRefreshError)
config.py 文件源码 项目:farnsworth 作者: mechaphish 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def execute_sql(self, sql, params=None, require_commit=True):
        """Execute ``sql``, reconnecting and retrying on Peewee errors.

        Each attempt first delegates to the parent ``execute_sql``.  If that
        raises ``peewee.OperationalError`` or ``peewee.InterfaceError`` the
        connection is closed (when still open) and the statement is run
        again on a fresh cursor.  An exception escaping an attempt is
        retried with exponential back-off, for at most 10 attempts, when
        ``self.retry_if_peewee_error`` accepts it.

        Args:
            sql: the SQL statement to execute.
            params: parameters for the statement; ``None`` is treated as
                an empty tuple on the reconnect path.
            require_commit: whether to commit after executing when
                autocommit is on.

        Returns:
            The cursor the statement was executed on.
        """
        @retry(wait_exponential_multiplier=500,
               wait_exponential_max=10000,
               stop_max_attempt_number=10,
               retry_on_exception=self.retry_if_peewee_error)
        def execute():
            try:
                cursor = super(RetryHarderOperationalError, self) \
                    .execute_sql(sql, params, require_commit)
            except (peewee.OperationalError, peewee.InterfaceError) as error:
                # Log only: the former ``print LOG.debug(...)`` also wrote a
                # stray "None" to stdout.  Formatting the exception itself
                # avoids the Python 2-only ``.message`` attribute.
                LOG.debug("Retrying after Peewee error: %s", error)
                if not self.is_closed():
                    self.close()
                with self.exception_wrapper():
                    cursor = self.get_cursor()
                    cursor.execute(sql, params or ())
                    if require_commit and self.get_autocommit():
                        self.commit()
            return cursor
        return execute()
blueprint_item.py 文件源码 项目:aos-pyez 作者: Apstra 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def await_build_ready(self, timeout=5000):
        """
        Wait up to `timeout` for the blueprint build status to report no
        errors.  The polling interval is fixed at 1 second.

        Args:
            timeout (int): timeout to wait in milliseconds

        Returns:
            True: when the blueprint contains no build errors
            False: when the blueprint contains build errors, even after
                waiting `timeout`, or when checking the build errors keeps
                failing with another exception

        """
        @retrying.retry(wait_fixed=1000, stop_max_delay=timeout)
        def wait_for_no_errors():
            # Raise explicitly instead of using ``assert``: assert statements
            # are removed under ``python -O``, which would make this check a
            # no-op and the method always return True.
            if self.build_errors:
                raise AssertionError('blueprint still has build errors')

        # noinspection PyBroadException
        try:
            wait_for_no_errors()
        except Exception:
            # Deliberately broad (any failure means "not ready"), but unlike
            # a bare ``except:`` this lets KeyboardInterrupt/SystemExit through.
            return False

        return True
test_mesos.py 文件源码 项目:dcos 作者: dcos 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_if_ucr_app_runs_in_new_pid_namespace(dcos_api_session):
    # A marathon app is used rather than a metronome job because metronome
    # cannot run docker images with the UCR, and that is what is needed to
    # check that the pid namespace isolator works.
    ps_output_file = 'ps_output'
    app, _ = test_helpers.marathon_test_app(container_type=marathon.Container.MESOS)
    app['cmd'] = 'ps ax -o pid= > {}; sleep 1000'.format(ps_output_file)

    with dcos_api_session.marathon.deploy_and_cleanup(app, check_health=False):
        framework_info = dcos_api_session.marathon.get('/v2/info').json()
        marathon_framework_id = framework_info['frameworkId']
        tasks_url = '/v2/apps/{}/tasks'.format(app['id'])
        app_task = dcos_api_session.marathon.get(tasks_url).json()['tasks'][0]

        # The task needs a moment after starting before `ps_output_file`
        # exists, so the sandbox read is retried every second for up to
        # 10 seconds before the failure is allowed to propagate.
        @retrying.retry(wait_fixed=1000, stop_max_delay=10000)
        def read_ps_output():
            return dcos_api_session.mesos_sandbox_file(
                app_task['slaveId'], marathon_framework_id, app_task['id'], ps_output_file)

        pids = read_ps_output().split()
        assert len(pids) <= 4, 'UCR app has more than 4 processes running in its pid namespace'
animebot.py 文件源码 项目:animebot 作者: EV3REST 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def noparser(bot, update, tags, pages, chat_id, info=None): #Parser without retry loop (to prevent infinite exception)
    """Send one random Yandere post matching ``tags`` to ``chat_id``.

    A random page in ``1..pages`` is requested with ``limit=1``, the post's
    file is downloaded to a temporary path under ``tmp/`` and sent as a
    photo.  The caption is also stored in ``globalarray[chat_id]``.  Any
    failure is printed and swallowed; nothing is retried.

    Args:
        bot: object providing ``sendChatAction`` and ``sendPhoto``.
        update: unused by this function.
        tags: search tags; converted with ``str``.
        pages: number of pages to pick from; converted with ``int``.
        chat_id: chat that receives the photo.
        info: optional text placed on the line before the caption.
    """
    bot.sendChatAction(chat_id, "upload_photo")
    client = Pybooru('Yandere')
    randomint = randint(1000, 10000000)
    tmp_path = "tmp/anime_bot_" + str(randomint) + ".jpg"
    try:
        try:
            randompage = randint(1, int(pages))
            posts = client.posts_list(tags=str(tags), limit=1, page=str(randompage))
            tmp_data = None
            for post in posts:
                urllib.request.urlretrieve(post['file_url'], tmp_path)
                tmp_data = "Uploader: " + post['author'] + "\nID: " + str(post['id'])
                globalarray[chat_id] = dict(data=tmp_data)
            if tmp_data is None:
                # Nothing was downloaded; report that directly instead of
                # failing later on a missing file.
                raise ValueError(
                    "no posts found for tags %r on page %d" % (tags, randompage))
            caption = tmp_data if info is None else info + '\n' + tmp_data
            # Close the handle before the file is removed.
            with open(tmp_path, 'rb') as photo:
                bot.sendPhoto(chat_id, photo, reply_markup=ikeyboard, caption=caption)
        finally:
            # Always clean up the download, including when sending failed.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
    except Exception as e:
        print(e)
when_impl.py 文件源码 项目:directory-tests 作者: uktrade 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def visit_page(
        context: Context, actor_alias: str, page_name: str, *,
        first_time: bool = False):
    """Visit the named page, registering the actor first if it is unknown.

    The page object is stored on ``context.current_page`` before the visit.

    NOTE:
    For the retry scheme to work properly, the webdriver's page load
    timeout should be lower than the retry's `wait_fixed` timer,
    e.g. `driver.set_page_load_timeout(time_to_wait=30)`.
    """
    actor = get_actor(context, actor_alias)
    if not actor:
        add_actor(context, unauthenticated_actor(actor_alias))

    page = get_page_object(page_name)
    context.current_page = page
    logging.debug(
        "%s will visit '%s' page using: '%s'", actor_alias, page_name,
        page.URL)
    page.visit(context.driver, first_time=first_time)
eventually_consistent.py 文件源码 项目:python-repo-tools 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def call(f, exceptions=AssertionError, tries=STOP_MAX_ATTEMPT_NUMBER_DEFAULT):
    """Run ``f`` right away, treating it as eventually consistent.

    ``f`` is invoked with no arguments and retried with exponential
    backoff, for at most ``tries`` attempts.

    Only AssertionErrors trigger a retry by default; pass ``exceptions``
    to retry on other errors.

    For example:

        @eventually_consistent.call
        def _():
            results = client.query().fetch(10)
            assert len(results) == 10

    """
    __tracebackhide__ = True
    should_retry = _retry_on_exception(exceptions)
    with_backoff = retry(
        wait_exponential_multiplier=WAIT_EXPONENTIAL_MULTIPLIER,
        wait_exponential_max=WAIT_EXPONENTIAL_MAX_DEFAULT,
        stop_max_attempt_number=tries,
        retry_on_exception=should_retry)
    retried = with_backoff(f)
    return retried()
django_system_test.py 文件源码 项目:opencensus-python 作者: census-instrumentation 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_django_request_trace(self):
        """Request BASE_URL, then check the reported trace.

        The trace lookup and its assertions are retried with a fixed wait:
        the trace must belong to PROJECT, carry this test's trace id and
        hold exactly one span whose HTTP status label is '200'.
        """
        requests.get(BASE_URL, headers=self.headers_trace)

        @retry(wait_fixed=RETRY_WAIT_PERIOD, stop_max_attempt_number=RETRY_MAX_ATTEMPT)
        def check_trace(case):
            trace = case.client.get_trace(trace_id=case.trace_id)
            spans = trace.get('spans')

            case.assertEqual(trace.get('projectId'), PROJECT)
            case.assertEqual(trace.get('traceId'), str(case.trace_id))
            case.assertEqual(len(spans), 1)

            for span in spans:
                status_code = span.get('labels').get('/http/status_code')
                case.assertEqual(status_code, '200')

        check_trace(self)
django_system_test.py 文件源码 项目:opencensus-python 作者: census-instrumentation 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_sqlalchemy_mysql_trace(self):
        """Request the ``sqlalchemy_mysql`` endpoint, then check the trace.

        The trace lookup and its assertions are retried with a fixed wait:
        the trace must belong to PROJECT, carry this test's trace id, hold
        at least one span, and at least one span must carry an HTTP status
        label, every one of which must be '200'.
        """
        url = '{}sqlalchemy_mysql'.format(BASE_URL)
        requests.get(url, headers=self.headers_trace)

        @retry(wait_fixed=RETRY_WAIT_PERIOD, stop_max_attempt_number=RETRY_MAX_ATTEMPT)
        def check_trace(case):
            trace = case.client.get_trace(trace_id=case.trace_id)
            spans = trace.get('spans')

            case.assertEqual(trace.get('projectId'), PROJECT)
            case.assertEqual(trace.get('traceId'), str(case.trace_id))
            case.assertNotEqual(len(spans), 0)

            found_http_status = False
            for span in spans:
                labels = span.get('labels')
                if '/http/status_code' not in labels.keys():
                    continue
                case.assertEqual(labels.get('/http/status_code'), '200')
                found_http_status = True

            case.assertTrue(found_http_status)

        check_trace(self)
flask_system_test.py 文件源码 项目:opencensus-python 作者: census-instrumentation 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_flask_request_trace(self):
        """Request BASE_URL, then check the reported trace.

        The trace lookup and its assertions are retried with a fixed wait:
        the trace must belong to PROJECT, carry this test's trace id and
        hold exactly one span whose HTTP status label is '200'.
        """
        requests.get(BASE_URL, headers=self.headers_trace)

        @retry(wait_fixed=RETRY_WAIT_PERIOD, stop_max_attempt_number=RETRY_MAX_ATTEMPT)
        def check_trace(case):
            trace = case.client.get_trace(trace_id=case.trace_id)
            spans = trace.get('spans')

            case.assertEqual(trace.get('projectId'), PROJECT)
            case.assertEqual(trace.get('traceId'), str(case.trace_id))
            case.assertEqual(len(spans), 1)

            for span in spans:
                status_code = span.get('labels').get('/http/status_code')
                case.assertEqual(status_code, '200')

        check_trace(self)
flask_system_test.py 文件源码 项目:opencensus-python 作者: census-instrumentation 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_sqlalchemy_mysql_trace(self):
        """Request the ``sqlalchemy-mysql`` endpoint, then check the trace.

        The trace lookup and its assertions are retried with a fixed wait:
        the trace must belong to PROJECT, carry this test's trace id, hold
        at least one span, and at least one span must carry an HTTP status
        label, every one of which must be '200'.
        """
        url = '{}sqlalchemy-mysql'.format(BASE_URL)
        requests.get(url, headers=self.headers_trace)

        @retry(wait_fixed=RETRY_WAIT_PERIOD, stop_max_attempt_number=RETRY_MAX_ATTEMPT)
        def check_trace(case):
            trace = case.client.get_trace(trace_id=case.trace_id)
            spans = trace.get('spans')

            case.assertEqual(trace.get('projectId'), PROJECT)
            case.assertEqual(trace.get('traceId'), str(case.trace_id))
            case.assertNotEqual(len(spans), 0)

            found_http_status = False
            for span in spans:
                labels = span.get('labels')
                if '/http/status_code' not in labels.keys():
                    continue
                case.assertEqual(labels.get('/http/status_code'), '200')
                found_http_status = True

            case.assertTrue(found_http_status)

        check_trace(self)
helpers.py 文件源码 项目:winnaker 作者: target 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_body_text(driver,
        exponential_multiplier=cfg_wait_exponential_multiplier,
        exponential_max=cfg_wait_exponential_max,
        stop_max_attempt=cfg_retry_stop_max_attempt):
    """Return the ``outerHTML`` attribute of the page's ``<body>`` element.

    The lookup is retried with exponential back-off.  When the element turns
    out to be stale, the page is refreshed and waited for before a
    ``StaleElementReferenceException`` is raised to trigger the next attempt.

    Args:
        driver: the webdriver handed to the wait/refresh helpers.
        exponential_multiplier: multiplier for the exponential wait.
        exponential_max: upper bound for a single wait.
        stop_max_attempt: maximum number of attempts.
    """
    with_retries = retry(
        wait_exponential_multiplier=exponential_multiplier,
        wait_exponential_max=exponential_max,
        stop_max_attempt_number=stop_max_attempt)

    def _read_body_html(drv):
        try:
            body = wait_for_xpath_presence(drv, "//body")
        except StaleElementReferenceException:
            a_nice_refresh(drv)
            # Only wait for the refreshed page; the element itself is unused.
            wait_for_xpath_presence(drv, "//*")
            raise StaleElementReferenceException
        return body.get_attribute("outerHTML")

    return with_retries(_read_body_html)(driver)


# Stubbornly clicks on the elements which run away from the DOM
command.py 文件源码 项目:Distrpy 作者: j0e1in 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _ensure_cluster_status_set(t):
    """Verify through connection ``t`` that the node is part of a cluster.

    Sends CMD_INFO and CMD_CLUSTER_INFO over ``t.talk_raw`` and inspects
    the replies with the module-level patterns.

    Raises:
        hiredis.ProtocolError: when the node does not report cluster mode
            as enabled, or when its cluster state is not 'ok' and it has
            no slots assigned.
    """
    info_rsp = t.talk_raw(CMD_INFO)
    logging.debug('Ask `info` Rsp %s', info_rsp)
    enabled_flags = PAT_CLUSTER_ENABLED.findall(info_rsp)
    if not enabled_flags or int(enabled_flags[0]) == 0:
        raise hiredis.ProtocolError(
            'Node %s:%d is not cluster enabled' % (t.host, t.port))

    cluster_rsp = t.talk_raw(CMD_CLUSTER_INFO)
    logging.debug('Ask `cluster info` Rsp %s', cluster_rsp)
    states = PAT_CLUSTER_STATE.findall(cluster_rsp)
    assigned_slots = PAT_CLUSTER_SLOT_ASSIGNED.findall(cluster_rsp)
    # The slot count is only consulted when the state is not 'ok'.
    if states[0] != 'ok' and int(assigned_slots[0]) == 0:
        raise hiredis.ProtocolError(
            'Node %s:%d is not in a cluster' % (t.host, t.port))


# A Redis instance responds to clients BEFORE changing its 'cluster_state';
#   just retry a few times and it should become OK
fixtures.py 文件源码 项目:kibana-docker 作者: elastic 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def kibana(host):
    """Build and return a ``Kibana`` helper bound to ``host``.

    Args:
        host: object providing ``process.get(comm=...)`` and ``run(command)``.

    Returns:
        A ``Kibana`` instance exposing the node process, the image flavor,
        the container environment and an HTTP ``get`` helper.
    """
    class Kibana(object):
        """Handle on the Kibana instance served at http://localhost:5601."""

        def __init__(self):
            self.url = 'http://localhost:5601'
            self.process = host.process.get(comm='node')
            self.image_flavor = config.getoption('--image-flavor')
            # Parse the ``env`` output ("NAME=value" per line) into a dict,
            # splitting on the first '=' only so values may contain '='.
            self.environment = dict(
                [line.split('=', 1) for line in self.stdout_of('env').split('\n')]
            )

        @retry(**retry_settings)
        def get(self, location='/', allow_redirects=True):
            """GET a page from Kibana.

            Args:
                location: path joined onto the base URL.
                allow_redirects: whether redirects are followed; it was
                    previously accepted but never passed to the request.
            """
            url = urllib.parse.urljoin(self.url, location)
            return requests.get(url, allow_redirects=allow_redirects)

        def stdout_of(self, command):
            """Run ``command`` on the host and return its stripped stdout."""
            return host.run(command).stdout.strip()

    return Kibana()
test_retrying.py 文件源码 项目:deb-python-retrying 作者: openstack 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_before_attempts(self):
        """Check that the ``before_attempts`` hook receives the attempt number.

        The hook stores the attempt number it is given on
        ``TestBeforeAfterAttempts``; after a single successful attempt the
        stored value must be exactly 1.
        """
        TestBeforeAfterAttempts._attempt_number = 0

        def _before(attempt_number):
            TestBeforeAfterAttempts._attempt_number = attempt_number

        @retry(wait_fixed=1000, stop_max_attempt_number=1, before_attempts=_before)
        def _test_before():
            pass

        _test_before()

        # Compare by value: ``is 1`` only works by accident of CPython's
        # small-int caching and emits a SyntaxWarning on modern Python.
        self.assertEqual(TestBeforeAfterAttempts._attempt_number, 1)
boto3_retry.py 文件源码 项目:boto3-retry 作者: timmartin19 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def retry_boto(max_retries=5, retry_wait_time=2000, retryable_error_codes=None, retryable_status_codes=None, retryable_exceptions=None):
    """Build a decorator that runs a function through ``_call_boto_func`` with retries.

    A retry happens only when the raised exception is a
    ``Boto3RetryableException``.

    Args:
        max_retries: maximum number of attempts.
        retry_wait_time: multiplier for the exponential wait.
        retryable_error_codes: forwarded as ``retryable_codes``.
        retryable_status_codes: forwarded unchanged.
        retryable_exceptions: forwarded unchanged.

    Returns:
        A decorator; the decorated function keeps the metadata of the
        original via ``functools.wraps``.
    """
    def _is_retryable(exc):
        return isinstance(exc, Boto3RetryableException)

    def decorator(func):
        def wrapper(*args, **kwargs):
            return _call_boto_func(func, *args,
                                   retryable_status_codes=retryable_status_codes,
                                   retryable_exceptions=retryable_exceptions,
                                   retryable_codes=retryable_error_codes,
                                   **kwargs)

        with_retries = retry(stop_max_attempt_number=max_retries,
                             wait_exponential_multiplier=retry_wait_time,
                             retry_on_exception=_is_retryable)
        return functools.wraps(func)(with_retries(wrapper))
    return decorator
dsub_util.py 文件源码 项目:dsub 作者: googlegenomics 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _retry_download_check(exception):
  """Report the exception and return True only for token refresh errors."""
  message = 'Exception during download: %s' % str(exception)
  print_error(message)
  is_token_refresh_error = isinstance(
      exception, oauth2client.client.HttpAccessTokenRefreshError)
  return is_token_refresh_error


# Exponential backoff retrying downloads of GCS object chunks.
# Maximum 23 retries.
# Wait 1, 2, 4 ... 64, 64, 64... seconds.
base.py 文件源码 项目:threatstack-python-client 作者: MyPureCloud 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def retry_on_429(exc):
    """Retry predicate used on rate limiting: True for ``errors.APIRateLimitError``."""
    rate_limited = isinstance(exc, errors.APIRateLimitError)
    return rate_limited
download.py 文件源码 项目:Planet-Pipeline-GUI 作者: samapriya 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def retry_if_rate_limit_error(exception):
    """Retry predicate: True only when ``exception`` is a ``RateLimitException``."""
    is_rate_limited = isinstance(exception, RateLimitException)
    return is_rate_limited
iam_service.py 文件源码 项目:keyrotator 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def retry_if_500_error(exception):
  """Retry predicate for 5xx errors from the IAM API.

  Logs the exception, then returns True when it is an `errors.HttpError`
  whose response status is at least 500 and below 600.
  """
  logging.info("Received %s, retrying...", exception)
  if not isinstance(exception, errors.HttpError):
    return False
  return 500 <= exception.resp.status < 600
utils.py 文件源码 项目:reseg 作者: fvisin 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def retry_if_io_error(exception):
    """Return True if ``exception`` is an IOError and should be retried.

    The "retrying" notice is printed only when a retry will actually
    happen; previously it was printed for every exception, including the
    ones that are not retried.

    Args:
        exception: the exception raised by the retried call.

    Returns:
        True when ``exception`` is an IOError, False otherwise.
    """
    should_retry = isinstance(exception, IOError)
    if should_retry:
        # Function-call form works on both Python 2 and Python 3.
        print("Filesystem error, retrying in 2 seconds...")
    return should_retry
download.py 文件源码 项目:Planet-GEE-Pipeline-GUI 作者: samapriya 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def retry_if_rate_limit_error(exception):
    """Retry predicate: True only when ``exception`` is a ``RateLimitException``."""
    is_rate_limited = isinstance(exception, RateLimitException)
    return is_rate_limited
device_telemetry.py 文件源码 项目:aos-pyez 作者: Apstra 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get(self):
        """Refresh the collection and store this item's value in ``self.datum``.

        The collection digest and lookup are retried every second for up
        to 5 seconds until the item's ``value['status']`` is truthy.

        Returns:
            ``self``, so calls can be chained.
        """
        @retrying.retry(wait_fixed=1000, stop_max_delay=5000)
        def get_status():
            self.collection.digest()
            me = self.collection[self.name]
            # Raise explicitly instead of using ``assert``: assert statements
            # are removed under ``python -O``, which would return a value
            # without a status instead of retrying.
            if not me.value['status']:
                raise AssertionError('status not yet available')
            return me.value

        self.datum = get_status()
        return self
devices.py 文件源码 项目:aos-pyez 作者: Apstra 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def update(self, device_keys):
        """Add any of ``device_keys`` not yet present to the device pool.

        Returns early when every key is already in the pool.  Otherwise the
        full device list (existing plus new) is PUT to ``self.url``; the
        request is retried every second for up to 3 seconds.

        Args:
            device_keys: iterable of device ids that should be in the pool.

        Raises:
            SessionRqstError: when the PUT response is still not OK once
                the retries are exhausted.
        """
        current_devices = self.get_devices()

        current_ids = {dev['id'] for dev in current_devices}
        wanted_ids = current_ids | set(device_keys)
        missing_ids = current_ids ^ wanted_ids
        if not missing_ids:
            return   # nothing to add

        # This is a PUT action, so the new ids must be appended to what is
        # already in the pool.
        current_devices.extend(dict(id=new_id) for new_id in missing_ids)

        timeout = 3000

        @retrying.retry(wait_fixed=1000, stop_max_delay=timeout)
        def put_updated():
            payload = dict(display_name='Default Pool',
                           devices=current_devices)
            got = self.api.requests.put(self.url, json=payload)
            if got.ok:
                return
            raise SessionRqstError(
                message='unable to update approved list: %s' % got.text,
                resp=got)

        put_updated()
download.py 文件源码 项目:Planet-GEE-Pipeline-CLI 作者: samapriya 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def retry_if_rate_limit_error(exception):
    """Retry predicate: True only when ``exception`` is a ``RateLimitException``."""
    is_rate_limited = isinstance(exception, RateLimitException)
    return is_rate_limited
ebs_start.py 文件源码 项目:ebs-reattach 作者: metamx 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def retry_if_value_error(exception):
    """Retry predicate: True only when ``exception`` is a ValueError."""
    is_value_error = isinstance(exception, ValueError)
    return is_value_error


问题


面经


文章

微信
公众号

扫码关注公众号