python类retry()的实例源码

ebs_start.py 文件源码 项目:ebs-reattach 作者: metamx 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def create_volume(context):
    """Create an EBS volume described by ``context`` and wait until it is usable.

    Reads ``context["ec2_connection"]``, ``context["ebs"]["size"]``,
    ``context["ebs"]["type"]`` and ``context["az"]``.  The underlying
    ``create_volume`` call is wrapped with ``retry`` (exponential back-off,
    gated by ``retry_if_throttled``) before being invoked.

    Returns the volume object once ``wait_for_volume_state`` has been called
    for the "available" state.
    """
    logging.debug("Creating volume")

    # Build the retry policy first, then wrap the bound EC2 method with it.
    with_backoff = retry(
        wait_exponential_multiplier=1000,
        stop_max_delay=60000,
        retry_on_exception=retry_if_throttled,
    )
    guarded_create = with_backoff(context["ec2_connection"].create_volume)

    new_volume = guarded_create(
        context["ebs"]["size"],
        context["az"],
        volume_type=context["ebs"]["type"],
    )
    logging.info("Created volume {0}".format(new_volume.id))

    wait_for_volume_state(new_volume, "available")
    return new_volume
utils.py 文件源码 项目:som 作者: vsoch 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def stop_if_result_none(result):
    '''Report whether ``result`` is anything other than ``None``.

    Returns ``True`` when ``result`` is not ``None`` and ``False`` when it
    is ``None``.  The original author describes this as a predicate for the
    ``retrying`` package; how it is wired into a retry policy is not visible
    in this excerpt.
    '''
    return result is not None


# Simple default retrying for calls to api
fabfile.py 文件源码 项目:Railtrack 作者: JeevesTakesOver 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def vagrant_up_with_retry(vagrant_vm):
    """Run ``vagrant up <vm> --no-provision`` and return its exit code.

    The command's stdout is captured through a pipe and discarded.  No retry
    logic lives in this body; presumably a retry decorator is applied where
    the function is defined (not visible in this excerpt).
    """
    argv = shlex.split('vagrant up %s --no-provision' % vagrant_vm)
    child = Popen(argv, stdout=PIPE)
    # Drain the pipe so the child cannot block on a full stdout buffer.
    child.communicate()
    return child.wait()
fabfile.py 文件源码 项目:Railtrack 作者: JeevesTakesOver 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def vagrant_run_with_retry(vagrant_vm, command):
    """Run ``command`` inside ``vagrant_vm`` through ``vagrant ssh``.

    The command line is handed to ``local``; nothing is returned.  No retry
    logic lives in this body; presumably a retry decorator is applied where
    the function is defined (not visible in this excerpt).
    """
    ssh_invocation = 'vagrant ssh %s -- %s' % (vagrant_vm, command)
    local(ssh_invocation)
fabfile.py 文件源码 项目:Railtrack 作者: JeevesTakesOver 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def vagrant_halt_with_retry(vagrant_vm):
    """Run ``vagrant halt <vm>`` and return its exit code.

    The command's stdout is captured through a pipe and discarded.  No retry
    logic lives in this body; presumably a retry decorator is applied where
    the function is defined (not visible in this excerpt).
    """
    argv = shlex.split('vagrant halt %s' % vagrant_vm)
    child = Popen(argv, stdout=PIPE)
    # Drain the pipe so the child cannot block on a full stdout buffer.
    child.communicate()
    return child.wait()
fabfile.py 文件源码 项目:Railtrack 作者: JeevesTakesOver 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def vagrant_provision_with_retry(vagrant_vm):
    """Run ``vagrant provision <vm>`` and return its exit code.

    The command's stdout is captured through a pipe and discarded.  No retry
    logic lives in this body; presumably a retry decorator is applied where
    the function is defined (not visible in this excerpt).
    """
    argv = shlex.split('vagrant provision %s' % vagrant_vm)
    child = Popen(argv, stdout=PIPE)
    # Drain the pipe so the child cannot block on a full stdout buffer.
    child.communicate()
    return child.wait()
test_executor.py 文件源码 项目:incubator-ariatosca 作者: apache 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def execute_and_assert(executor, storage=None):
    """Run three mock tasks through ``executor`` and assert their outcomes.

    One context is built per task (a succeeding one, a failing one, and one
    that receives an ``input`` argument), each is handed to
    ``executor.execute``, and the recorded states/exceptions are then checked
    inside a retried assertion so that an asynchronous executor has time to
    finish.

    Args:
        executor: object exposing ``execute(ctx)``.
        storage: passed through as the first argument of every ``MockContext``.
    """
    input_value = 'value'

    ok_ctx = MockContext(
        storage,
        task_kwargs={'function': _get_function(mock_successful_task)},
    )
    broken_ctx = MockContext(
        storage,
        task_kwargs={'function': _get_function(mock_failing_task)},
    )
    with_input_ctx = MockContext(
        storage,
        task_kwargs={
            'function': _get_function(mock_task_with_input),
            'arguments': {'input': models.Argument.wrap('input', 'value')},
        },
    )

    for pending_ctx in (ok_ctx, broken_ctx, with_input_ctx):
        executor.execute(pending_ctx)

    # Keep re-checking until the assertions hold or the retry policy gives up.
    @retrying.retry(stop_max_delay=10000, wait_fixed=100)
    def _outcomes_recorded():
        assert ok_ctx.states == ['start', 'success']
        assert broken_ctx.states == ['start', 'failure']
        assert with_input_ctx.states == ['start', 'failure']
        assert isinstance(broken_ctx.exception, MockException)
        assert isinstance(with_input_ctx.exception, MockException)
        assert with_input_ctx.exception.message == input_value

    _outcomes_recorded()
test_integration.py 文件源码 项目:dcos 作者: dcos 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def run(self, count):
        """Start ``count`` sshd servers on free local ports and yield the ports.

        This is a generator: it yields the list of ports exactly once, and the
        sshd subprocesses are killed when the generator is resumed or closed.
        Presumably it is used as a context manager (the decorator is not
        visible in this excerpt).

        Args:
            count(int): number of sshd servers to start.

        Yields:
            list of int, one listening port per started sshd.
        """
        # Pick `count` distinct candidate ports, probing upward past busy ones.
        sshd_ports = []
        for try_port in random.sample(range(10000, 11000), count):
            # Skip ports that are already in use *or* already claimed for an
            # earlier server: stepping upward from a busy port could otherwise
            # land on a port chosen previously and hand out a duplicate.
            while try_port in sshd_ports or can_connect(try_port):
                try_port += 1
            sshd_ports.append(try_port)

        subprocesses = []
        try:
            # Run sshd servers in parallel.
            for port in sshd_ports:
                subprocesses.append(subprocess.Popen(
                    ['/usr/sbin/sshd', '-p{}'.format(port), '-f{}'.format(self.sshd_config_path), '-e', '-D'],
                    cwd=str(self.tmpdir)))

            # Wait for the ssh servers to come up
            @retry(stop_max_delay=1000, retry_on_result=lambda x: x is False)
            def check_server(port):
                return can_connect(port)

            for port in sshd_ports:
                check_server(port)

            yield sshd_ports
        finally:
            # Stop all the subprocesses, even when start-up or the consumer
            # failed. They are ephemeral temporary SSH servers, no point in
            # being nice with SIGTERM.
            for s in subprocesses:
                s.kill()
test_endpoints.py 文件源码 项目:dcos 作者: dcos 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_if_dcos_history_service_is_getting_data(dcos_api_session):
    """Check that the history service serves a plausible last state summary.

    The request and its assertions are retried (``stop_max_delay=20000``,
    ``wait_fixed=500``) until they pass or the retry policy gives up.
    """
    @retry(stop_max_delay=20000, wait_fixed=500)
    def _history_is_served():
        response = dcos_api_session.get('/dcos-history-service/history/last')
        assert response.status_code == 200

        # Only the presence of these state-summary fields is checked; the
        # DC/OS UI relies on them but their exact content can vary.
        summary = response.json()
        assert {'cluster', 'frameworks', 'slaves', 'hostname'} <= summary.keys()

        reported_slaves = summary["slaves"]
        assert len(reported_slaves) == len(dcos_api_session.all_slaves)

    _history_is_served()
basic.py 文件源码 项目:jenkins-charm 作者: jenkinsci 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_10_change_plugins(self):
        """Validate that plugins get updated after a config change."""
        deployment = self.spec.deployment
        requested_plugins = "groovy greenballs"
        deployment.configure(deployment.charm_name, {"plugins": requested_plugins})
        deployment.sentry.wait()

        # Poll the plugin list: up to 10 attempts with a fixed wait between them.
        @retry(stop_max_attempt_number=10, wait_fixed=1000)
        def _plugins_are_listed():
            installed = self.spec.plugins_list()
            self.assertIn("groovy", installed, "Failed to locate groovy")
            self.assertIn("greenballs", installed, "Failed to locate greenballs")

        _plugins_are_listed()
webelement.py 文件源码 项目:wd.py 作者: macacajs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def wait_for(
        self, timeout=10000, interval=1000,
        asserter=lambda x: x):
        """Block until ``asserter`` accepts this element.

        ``asserter(self)`` is called repeatedly; an attempt counts as failed
        only when it raises ``WebDriverException``.  Any other exception
        propagates immediately.

        Support:
            Android iOS Web(WebView)

        Args:
            timeout(int): Passed to ``retry`` as ``stop_max_delay``.
            interval(int): Passed to ``retry`` as ``wait_fixed``.
            asserter(callable): Called with the element on every attempt.

        Returns:
            This element.

        Raises:
            TypeError: ``asserter`` is not callable.
            WebDriverException.
        """
        if not callable(asserter):
            raise TypeError('Asserter must be callable.')

        def _is_driver_error(error):
            return isinstance(error, WebDriverException)

        def _check(element):
            asserter(element)
            return element

        retrying_check = retry(
            retry_on_exception=_is_driver_error,
            stop_max_delay=timeout,
            wait_fixed=interval
        )(_check)
        return retrying_check(self)
webelement.py 文件源码 项目:wd.py 作者: macacajs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def wait_for_element(
        self, using, value, timeout=10000,
        interval=1000, asserter=is_displayed):
        """Block until a child element is found and accepted by ``asserter``.

        On every attempt the element is looked up with
        ``self.element(using, value)`` and passed to ``asserter``; an attempt
        counts as failed only when it raises ``WebDriverException``.

        Support:
            Android iOS Web(WebView)

        Args:
            using(str): The element location strategy.
            value(str): The value of the location strategy.
            timeout(int): Passed to ``retry`` as ``stop_max_delay``.
            interval(int): Passed to ``retry`` as ``wait_fixed``.
            asserter(callable): Called with the located element.

        Returns:
            The located element.

        Raises:
            TypeError: ``asserter`` is not callable.
            WebDriverException.
        """
        if not callable(asserter):
            raise TypeError('Asserter must be callable.')

        def _is_driver_error(error):
            return isinstance(error, WebDriverException)

        def _locate(parent, strategy, selector):
            found = parent.element(strategy, selector)
            asserter(found)
            return found

        retrying_locate = retry(
            retry_on_exception=_is_driver_error,
            stop_max_delay=timeout,
            wait_fixed=interval
        )(_locate)
        return retrying_locate(self, using, value)
webelement.py 文件源码 项目:wd.py 作者: macacajs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def wait_for_elements(
        self, using, value, timeout=10000,
        interval=1000, asserter=is_displayed):
        """Block until child elements are found and the first one is accepted.

        On every attempt the elements are looked up with
        ``self.elements(using, value)``.  An empty result raises
        ``WebDriverException('no such element')``; otherwise ``asserter`` is
        called with the first element only.  An attempt counts as failed only
        when it raises ``WebDriverException``.

        Support:
            Android iOS Web(WebView)

        Args:
            using(str): The element location strategy.
            value(str): The value of the location strategy.
            timeout(int): Passed to ``retry`` as ``stop_max_delay``.
            interval(int): Passed to ``retry`` as ``wait_fixed``.
            asserter(callable): Called with the first located element.

        Returns:
            The full list of located elements.

        Raises:
            TypeError: ``asserter`` is not callable.
            WebDriverException.
        """
        if not callable(asserter):
            raise TypeError('Asserter must be callable.')

        def _is_driver_error(error):
            return isinstance(error, WebDriverException)

        def _locate_all(parent, strategy, selector):
            found = parent.elements(strategy, selector)
            if len(found) == 0:
                raise WebDriverException('no such element')
            # Only the first match is put through the asserter.
            asserter(found[0])
            return found

        retrying_locate = retry(
            retry_on_exception=_is_driver_error,
            stop_max_delay=timeout,
            wait_fixed=interval
        )(_locate_all)
        return retrying_locate(self, using, value)
webdriver.py 文件源码 项目:wd.py 作者: macacajs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def wait_for(
        self, timeout=10000, interval=1000,
        asserter=lambda x: x):
        """Block until ``asserter`` accepts this driver.

        ``asserter(self)`` is called repeatedly; an attempt counts as failed
        only when it raises ``WebDriverException``.  Any other exception
        propagates immediately.

        Support:
            Android iOS Web(WebView)

        Args:
            timeout(int): Passed to ``retry`` as ``stop_max_delay``.
            interval(int): Passed to ``retry`` as ``wait_fixed``.
            asserter(callable): Called with the driver on every attempt.

        Returns:
            This driver.

        Raises:
            TypeError: ``asserter`` is not callable.
            WebDriverException.
        """
        if not callable(asserter):
            raise TypeError('Asserter must be callable.')

        def _is_driver_error(error):
            return isinstance(error, WebDriverException)

        def _check(session):
            asserter(session)
            return session

        retrying_check = retry(
            retry_on_exception=_is_driver_error,
            stop_max_delay=timeout,
            wait_fixed=interval
        )(_check)
        return retrying_check(self)
webdriver.py 文件源码 项目:wd.py 作者: macacajs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def wait_for_element(
        self, using, value, timeout=10000,
        interval=1000, asserter=is_displayed):
        """Block until an element is found and accepted by ``asserter``.

        On every attempt the element is looked up with
        ``self.element(using, value)`` and passed to ``asserter``; an attempt
        counts as failed only when it raises ``WebDriverException``.

        Support:
            Android iOS Web(WebView)

        Args:
            using(str): The element location strategy.
            value(str): The value of the location strategy.
            timeout(int): Passed to ``retry`` as ``stop_max_delay``.
            interval(int): Passed to ``retry`` as ``wait_fixed``.
            asserter(callable): Called with the located element.

        Returns:
            The located element.

        Raises:
            TypeError: ``asserter`` is not callable.
            WebDriverException.
        """
        if not callable(asserter):
            raise TypeError('Asserter must be callable.')

        def _is_driver_error(error):
            return isinstance(error, WebDriverException)

        def _locate(session, strategy, selector):
            found = session.element(strategy, selector)
            asserter(found)
            return found

        retrying_locate = retry(
            retry_on_exception=_is_driver_error,
            stop_max_delay=timeout,
            wait_fixed=interval
        )(_locate)
        return retrying_locate(self, using, value)
orgmap.py 文件源码 项目:getorg 作者: getorg 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_geolocation(geocode_obj, loc):
    """
    Look up ``loc`` through ``geocode_obj.geocode`` and return the result.

    This thin wrapper exists so a retry policy can be attached to the lookup;
    the original note says it runs at most 3 times to get a non-error return
    value and does not retry after a successful return.  The retry decorator
    itself is not visible in this excerpt.
    """
    return geocode_obj.geocode(loc)
eventually_consistent.py 文件源码 项目:python-repo-tools 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def mark(f):
    """Wrap test ``f`` so it is retried as an eventually-consistent check.

    The returned callable retries ``f`` with exponential back-off, using the
    module's default wait/stop constants, whenever the predicate built by
    ``_retry_on_exception`` for ``AssertionError`` and
    ``exceptions.GoogleCloudError`` says so.
    """
    # Presumably the pytest convention for hiding this frame from tracebacks.
    __tracebackhide__ = True

    eventually_consistent = retry(
        wait_exponential_multiplier=WAIT_EXPONENTIAL_MULTIPLIER,
        wait_exponential_max=WAIT_EXPONENTIAL_MAX_DEFAULT,
        stop_max_attempt_number=STOP_MAX_ATTEMPT_NUMBER_DEFAULT,
        retry_on_exception=_retry_on_exception(
            (AssertionError, exceptions.GoogleCloudError)),
    )
    return eventually_consistent(f)
mixins.py 文件源码 项目:python-membersuite-api-client 作者: AASHE 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def run_query(client, base_query, start_record, limit_to, verbose=False):
    """Run one page of ``base_query`` through ``client.runSQL``.

    Kept as a separate function so a retry policy can be attached to it.

    Args:
        client: object exposing ``runSQL(query=, start_record=, limit_to=)``.
        base_query: query text forwarded as ``query``.
        start_record: forwarded unchanged; printed with ``%d`` when verbose.
        limit_to: forwarded unchanged; printed with ``%d`` when verbose.
        verbose: when true, print the paging window and the start/end
            timestamps of the call.

    Returns:
        Whatever ``client.runSQL`` returns.
    """
    def _trace(template, *values):
        # Format lazily so nothing is interpolated unless verbose is set.
        if verbose:
            print(template % values)

    _trace("[start: %d limit: %d]", start_record, limit_to)

    began_at = datetime.datetime.now()
    page = client.runSQL(
        query=base_query,
        start_record=start_record,
        limit_to=limit_to,
    )
    finished_at = datetime.datetime.now()

    _trace("[%s - %s]", began_at, finished_at)
    return page
session.py 文件源码 项目:rucio 作者: rucio01 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def read_session(function):
    '''
    Decorator that supplies a ``session`` keyword argument to ``function``.

    With this decorator the wrapped function can use ``session`` as if a
    global session variable were declared.  The session is a sqlalchemy
    session obtained from get_session().  This is meant for SELECTs and the
    like; anything involving INSERTs, UPDATEs etc should use
    transactional_session.

    Behaviour of the wrapper:

    * If the caller passes a truthy ``session`` keyword, ``function`` is
      called as-is and the caller keeps ownership of that session.
    * Otherwise a session is obtained from get_session(), passed in as
      ``session``, rolled back if ``function`` raises, and always removed
      afterwards.  ``TimeoutError`` and ``DatabaseError`` are re-raised as
      ``DatabaseException``; every other exception propagates unchanged.
    * Calling a wrapped generator function raises ``RucioException``.

    The whole wrapper is retried according to ``retry_if_db_connection_error``
    (at most 2 attempts).
    '''
    # NOTE(review): the `retrying` package documents wait_fixed in
    # milliseconds, so 0.5 is half a millisecond -- confirm whether 500 (half
    # a second) was intended.  Left unchanged to preserve behaviour.
    @retry(retry_on_exception=retry_if_db_connection_error,
           wait_fixed=0.5,
           stop_max_attempt_number=2,
           wrap_exception=False)
    @wraps(function)
    def new_funct(*args, **kwargs):

        if isgeneratorfunction(function):
            raise RucioException('read_session decorator should not be used with generator. Use stream_session instead.')

        if kwargs.get('session'):
            # Caller-owned session: no rollback/remove handling here.
            return function(*args, **kwargs)

        session = get_session()
        try:
            kwargs['session'] = session
            return function(*args, **kwargs)
        except (TimeoutError, DatabaseError) as error:
            session.rollback()  # pylint: disable=maybe-no-member
            raise DatabaseException(str(error))
        except:
            # Deliberately bare: roll back on *any* failure (including
            # non-Exception ones), then let it propagate untouched.
            session.rollback()  # pylint: disable=maybe-no-member
            raise
        finally:
            session.remove()
    new_funct.__doc__ = function.__doc__
    return new_funct
apc.py 文件源码 项目:pynoc 作者: SimplicityGuy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def sensor_supports_humidity(self):
        """Tell whether the sensor can measure relative humidity.

        :return: the (falsy) value of ``is_sensor_present`` when no sensor is
            present; otherwise whether ``sensor_type`` contains ``'humid'``,
            compared case-insensitively.
        """
        # Read the attribute once; it is also the return value when falsy.
        present = self.is_sensor_present
        if not present:
            return present
        return 'humid' in self.sensor_type.lower()

    # pylint: disable=no-self-argument
    # In order to use this method within the @retry decorator, this method
    # must be defined as such.


问题


面经


文章

微信
公众号

扫码关注公众号