python类monotonic()的实例源码

util.py 文件源码 项目:uchroma 作者: cyanogen 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def smart_delay(delay: float, last_cmd: float, remain: int=0) -> float:
    """
    Sleep only as long as needed to keep commands ``delay`` seconds apart.

    The time that has already passed since ``last_cmd`` is credited
    against the requested delay, so only the remainder (if any) is slept.

    :param delay: minimum spacing between commands, in seconds
    :param last_cmd: monotonic timestamp of the previous command, or
                     None when there was no previous command
    :param remain: counter; the delay is applied only when it is zero

    :return: the monotonic timestamp sampled on entry (before any
             sleeping), meant to be passed as ``last_cmd`` next time
    """
    entered_at = time.monotonic()

    # Guard clause: no pending counter allowed, a previous timestamp is
    # required, and the delay has to be strictly positive.
    if remain != 0 or last_cmd is None or not delay > 0.0:
        return entered_at

    elapsed = entered_at - last_cmd
    if elapsed < delay:
        # Sleep just the part of the delay that has not elapsed yet.
        time.sleep(delay - elapsed)

    return entered_at
thread_util.py 文件源码 项目:mongodb-monitoring 作者: jruaux 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def acquire(self, blocking=True, timeout=None):
        """
        Decrement the internal counter, optionally waiting until it is non-zero.

        :param blocking: when False, return immediately instead of waiting
            if the counter is currently zero
        :param timeout: maximum number of seconds to wait, or None to wait
            indefinitely; only valid together with ``blocking=True``

        :return: True if the counter was decremented, False if it was still
            zero when the call gave up (non-blocking or timed out)
        :raises ValueError: if a timeout is given for a non-blocking call
        """
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None
        self._cond.acquire()
        # try/finally guarantees the condition's lock is handed back even if
        # wait() or the clock raises; otherwise the lock would stay held.
        try:
            while self._value == 0:
                if not blocking:
                    break
                if timeout is not None:
                    if endtime is None:
                        # First pass: fix the absolute deadline.
                        endtime = _time() + timeout
                    else:
                        # Woken up without a free unit: wait only for the
                        # time that is left until the deadline.
                        timeout = endtime - _time()
                        if timeout <= 0:
                            break
                self._cond.wait(timeout)
            else:
                # Loop ended without break, so the counter is non-zero.
                self._value = self._value - 1
                rc = True
        finally:
            self._cond.release()
        return rc
command.py 文件源码 项目:runcommands 作者: wylee 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def run(self, run_config, argv, **kwargs):
        """
        Build the effective config, parse ``argv`` and invoke this command.

        When ``self.timed`` is set, the elapsed monotonic time is reported
        through ``print_elapsed_time`` unless stdout is hidden.

        :param run_config: run configuration; copied with the resolved env
        :param argv: command line arguments handed to ``parse_args``
        :param kwargs: extra call arguments; they override parsed ones
        :return: whatever calling the command returns
        """
        if self.timed:
            started_at = time.monotonic()

        env = self.get_run_env(run_config.env, run_config.default_env)
        config = Config(run=run_config.copy(env=env), **self.config.copy())

        # Explicit keyword arguments win over values parsed from argv.
        call_args = self.parse_args(config, argv)
        call_args.update(kwargs)
        outcome = self(config, **call_args)

        if self.timed and not Hide.hide_stdout(kwargs.get('hide', config.run.hide)):
            self.print_elapsed_time(time.monotonic() - started_at)

        return outcome
thread_util.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def acquire(self, blocking=True, timeout=None):
        """
        Decrement the internal counter, optionally waiting until it is non-zero.

        :param blocking: when False, return immediately instead of waiting
            if the counter is currently zero
        :param timeout: maximum number of seconds to wait, or None to wait
            indefinitely; only valid together with ``blocking=True``

        :return: True if the counter was decremented, False if it was still
            zero when the call gave up (non-blocking or timed out)
        :raises ValueError: if a timeout is given for a non-blocking call
        """
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None
        self._cond.acquire()
        # try/finally guarantees the condition's lock is handed back even if
        # wait() or the clock raises; otherwise the lock would stay held.
        try:
            while self._value == 0:
                if not blocking:
                    break
                if timeout is not None:
                    if endtime is None:
                        # First pass: fix the absolute deadline.
                        endtime = _time() + timeout
                    else:
                        # Woken up without a free unit: wait only for the
                        # time that is left until the deadline.
                        timeout = endtime - _time()
                        if timeout <= 0:
                            break
                self._cond.wait(timeout)
            else:
                # Loop ended without break, so the counter is non-zero.
                self._value = self._value - 1
                rc = True
        finally:
            self._cond.release()
        return rc
thread_util.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def acquire(self, blocking=True, timeout=None):
        """
        Decrement the internal counter, optionally waiting until it is non-zero.

        :param blocking: when False, return immediately instead of waiting
            if the counter is currently zero
        :param timeout: maximum number of seconds to wait, or None to wait
            indefinitely; only valid together with ``blocking=True``

        :return: True if the counter was decremented, False if it was still
            zero when the call gave up (non-blocking or timed out)
        :raises ValueError: if a timeout is given for a non-blocking call
        """
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None
        self._cond.acquire()
        # try/finally guarantees the condition's lock is handed back even if
        # wait() or the clock raises; otherwise the lock would stay held.
        try:
            while self._value == 0:
                if not blocking:
                    break
                if timeout is not None:
                    if endtime is None:
                        # First pass: fix the absolute deadline.
                        endtime = _time() + timeout
                    else:
                        # Woken up without a free unit: wait only for the
                        # time that is left until the deadline.
                        timeout = endtime - _time()
                        if timeout <= 0:
                            break
                self._cond.wait(timeout)
            else:
                # Loop ended without break, so the counter is non-zero.
                self._value = self._value - 1
                rc = True
        finally:
            self._cond.release()
        return rc
thread_util.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def acquire(self, blocking=True, timeout=None):
        """
        Decrement the internal counter, optionally waiting until it is non-zero.

        :param blocking: when False, return immediately instead of waiting
            if the counter is currently zero
        :param timeout: maximum number of seconds to wait, or None to wait
            indefinitely; only valid together with ``blocking=True``

        :return: True if the counter was decremented, False if it was still
            zero when the call gave up (non-blocking or timed out)
        :raises ValueError: if a timeout is given for a non-blocking call
        """
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None
        self._cond.acquire()
        # try/finally guarantees the condition's lock is handed back even if
        # wait() or the clock raises; otherwise the lock would stay held.
        try:
            while self._value == 0:
                if not blocking:
                    break
                if timeout is not None:
                    if endtime is None:
                        # First pass: fix the absolute deadline.
                        endtime = _time() + timeout
                    else:
                        # Woken up without a free unit: wait only for the
                        # time that is left until the deadline.
                        timeout = endtime - _time()
                        if timeout <= 0:
                            break
                self._cond.wait(timeout)
            else:
                # Loop ended without break, so the counter is non-zero.
                self._value = self._value - 1
                rc = True
        finally:
            self._cond.release()
        return rc
thread_util.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def acquire(self, blocking=True, timeout=None):
        """
        Decrement the internal counter, optionally waiting until it is non-zero.

        :param blocking: when False, return immediately instead of waiting
            if the counter is currently zero
        :param timeout: maximum number of seconds to wait, or None to wait
            indefinitely; only valid together with ``blocking=True``

        :return: True if the counter was decremented, False if it was still
            zero when the call gave up (non-blocking or timed out)
        :raises ValueError: if a timeout is given for a non-blocking call
        """
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None
        self._cond.acquire()
        # try/finally guarantees the condition's lock is handed back even if
        # wait() or the clock raises; otherwise the lock would stay held.
        try:
            while self._value == 0:
                if not blocking:
                    break
                if timeout is not None:
                    if endtime is None:
                        # First pass: fix the absolute deadline.
                        endtime = _time() + timeout
                    else:
                        # Woken up without a free unit: wait only for the
                        # time that is left until the deadline.
                        timeout = endtime - _time()
                        if timeout <= 0:
                            break
                self._cond.wait(timeout)
            else:
                # Loop ended without break, so the counter is non-zero.
                self._value = self._value - 1
                rc = True
        finally:
            self._cond.release()
        return rc
subscriber.py 文件源码 项目:gui_tool 作者: UAVCAN 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def register_event(self, timestamp):
        """
        Count one event and refresh the rate history once per update interval.

        :param timestamp: time of the event, on the same scale as
            ``self._checkpoint_ts``
        """
        self._events_since_checkpoint += 1

        elapsed = timestamp - self._checkpoint_ts
        if not elapsed >= self._update_interval:
            # Still inside the current measurement window; only count.
            return

        # Drop the whole history if the previous estimate has expired,
        # and push the expiry forward in either case.
        now = time.monotonic()
        stale = now > self._estimate_expires_at
        self._estimate_expires_at = now + self._estimate_lifetime

        if stale:
            self._hist = []
        elif len(self._hist) >= self._averaging_period:
            # History is full: discard the entry at the tail.
            self._hist.pop()

        # Newest rate sample goes to the front; then start a new window.
        self._hist.insert(0, self._events_since_checkpoint / elapsed)
        self._checkpoint_ts = timestamp
        self._events_since_checkpoint = 0
base_events.py 文件源码 项目:annotated-py-asyncio 作者: hhstore 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def __init__(self):
        self._timer_cancelled_count = 0
        self._closed = False
        self._ready = collections.deque()
        self._scheduled = []
        self._default_executor = None
        self._internal_fds = 0
        # Identifier of the thread running the event loop, or None if the
        # event loop is not running
        self._thread_id = None
        self._clock_resolution = time.get_clock_info('monotonic').resolution
        self._exception_handler = None
        self._debug = (not sys.flags.ignore_environment
                       and bool(os.environ.get('PYTHONASYNCIODEBUG')))
        # In debug mode, if the execution of a callback or a step of a task
        # exceed this duration in seconds, the slow callback/task is logged.
        self.slow_callback_duration = 0.1
        self._current_handle = None
test_base_events.py 文件源码 项目:annotated-py-asyncio 作者: hhstore 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def test__run_once(self):
        """A cancelled timer is dropped and select() waits for the live one."""
        now = time.monotonic
        cancelled = asyncio.TimerHandle(now() + 5.0, lambda: True, (),
                                        self.loop)
        pending = asyncio.TimerHandle(now() + 10.0, lambda: True, (),
                                      self.loop)

        cancelled.cancel()

        self.loop._process_events = mock.Mock()
        self.loop._scheduled.extend([cancelled, pending])
        self.loop._run_once()

        # The select timeout must track the ~10 s timer, not the cancelled
        # 5 s one.
        select_timeout = self.loop._selector.select.call_args[0][0]
        self.assertTrue(9.5 < select_timeout < 10.5, select_timeout)
        self.assertEqual([pending], self.loop._scheduled)
        self.assertTrue(self.loop._process_events.called)
test_base_events.py 文件源码 项目:annotated-py-asyncio 作者: hhstore 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test__run_once_schedule_handle(self):
        """A due timer callback runs and what it schedules lands in _ready."""
        scheduled = None
        ran = False

        def on_timer(loop):
            nonlocal ran, scheduled
            ran = True
            scheduled = loop.call_soon(lambda: True)

        # A deadline one second in the past makes the timer due at once.
        overdue = asyncio.TimerHandle(time.monotonic() - 1, on_timer,
                                      (self.loop,), self.loop)

        self.loop._process_events = mock.Mock()
        self.loop._scheduled.append(overdue)
        self.loop._run_once()

        self.assertTrue(ran)
        self.assertEqual([scheduled], list(self.loop._ready))
test_selectors.py 文件源码 项目:annotated-py-asyncio 作者: hhstore 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def test_timeout(self):
        """select() honours zero, negative and positive timeouts."""
        selector = self.SELECTOR()
        self.addCleanup(selector.close)

        rd, wr = self.make_socketpair()

        # Writable end: both non-blocking forms report one ready key.
        selector.register(wr, selectors.EVENT_WRITE)
        began = time()
        self.assertEqual(1, len(selector.select(0)))
        self.assertEqual(1, len(selector.select(-1)))
        self.assertLess(time() - began, 0.5)

        # Readable end with no data: both forms return nothing, quickly.
        selector.unregister(wr)
        selector.register(rd, selectors.EVENT_READ)
        began = time()
        self.assertFalse(selector.select(0))
        self.assertFalse(selector.select(-1))
        self.assertLess(time() - began, 0.5)

        # A one second timeout really waits for about one second.
        before = time()
        self.assertFalse(selector.select(1))
        after = time()
        waited = after - before
        # Tolerate 2.0 seconds for very slow buildbots
        self.assertTrue(0.8 <= waited <= 2.0, waited)
test_selectors.py 文件源码 项目:annotated-py-asyncio 作者: hhstore 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_select_interrupt(self):
        """select() with a 2 s timeout returns nothing while SIGALRM fires."""
        selector = self.SELECTOR()
        self.addCleanup(selector.close)

        rd, wr = self.make_socketpair()

        # Install a do-nothing SIGALRM handler; cleanups restore the old
        # handler and cancel any alarm that is still pending.
        previous_handler = signal.signal(signal.SIGALRM, lambda *args: None)
        self.addCleanup(signal.signal, signal.SIGALRM, previous_handler)
        self.addCleanup(signal.alarm, 0)

        signal.alarm(1)

        selector.register(rd, selectors.EVENT_READ)
        began = time()
        self.assertFalse(selector.select(2))
        self.assertLess(time() - began, 2.5)
utils.py 文件源码 项目:git-annex-metadata-gui 作者: alpernebbi 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def __call__(self, instance=None):
        if instance is not None and instance is not self._instance:
            fmt = "Instance mismatch on autoconsumer {}, ({} vs {})."
            msg = fmt.format(
                self._function.__name__,
                instance, self._instance,
            )
            logger.critical(msg)

        if self._generator is None:
            return

        try:
            endtime = time.monotonic() + self._timeout
            while time.monotonic() < endtime:
                next(self._generator)

        except StopIteration:
            self._generator = None

        else:
            QtCore.QMetaObject.invokeMethod(
                self._instance, self._function.__name__,
                Qt.Qt.QueuedConnection,
            )
cog_info.py 文件源码 项目:Godavaru 作者: Godavaru 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
async def ping(self, ctx):
        """Check my response time and my websocket ping

        **Usage:** `g_ping`

        **Permission:** User"""
        # The body awaits, so this has to be a coroutine function
        # (``async def``); a plain ``def`` does not even compile.
        # NOTE(review): hard-coded id of the channel used as a log console.
        console = self.bot.get_channel(316688736089800715)

        # Message round trip in milliseconds. Measured on the monotonic
        # clock so that wall-clock adjustments cannot skew the result.
        send_started = time.monotonic()
        ping_msg = await console.send(":mega: **Pinging...**")
        message_ms = (time.monotonic() - send_started) * 1000

        # Websocket round trip in milliseconds; ws.ping() yields a second
        # awaitable that completes when the pong arrives.
        ws_started = time.monotonic()
        await (await self.bot.ws.ping())
        websocket_ms = (time.monotonic() - ws_started) * 1000

        # Random vowel for the "P?ng!" reply.
        vowel = random.choice(["a", "e", "i", "o", "u"])
        stamp = str(datetime.datetime.now().strftime("%d/%m/%y %H:%M:%S"))
        author = ctx.message.author

        await ping_msg.edit(content=(
            ':warning: [`' + stamp + '`] `' + author.name + '#' + author.discriminator
            + '` checked my ping in the channel `' + ctx.message.channel.name
            + '` in the server `' + ctx.message.guild.name
            + '`. The result was {:.2f}ms'.format(message_ms)
            + " with a websocket ping of {0:.0f}ms".format(websocket_ms)
        ))
        await ctx.send(
            ":mega: P" + vowel
            + "ng! The message took **{:.0f}ms**!".format(message_ms)
            + " `Websocket: {0:.0f}ms` :thinking:".format(websocket_ms)
        )
test_ratelimits.py 文件源码 项目:merakicommons 作者: meraki-analytics 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_window_decorator_timing():
    """Calls that open a new group of PERMITS are spaced by about SECONDS."""
    from time import monotonic

    limiter = FixedWindowRateLimiter(SECONDS, PERMITS)

    @limiter.limit
    def call():
        return monotonic()

    stamps = [call() for _ in range(VALUE_COUNT)]

    # Only every PERMITS-th call (index 0, PERMITS, 2*PERMITS, ...) is
    # compared against the previous such call.
    previous_start = -SECONDS
    for index, stamp in enumerate(stamps):
        if index % PERMITS != 0:
            continue
        assert stamp - previous_start >= SECONDS - EPSILON
        previous_start = stamp
test_ratelimits.py 文件源码 项目:merakicommons 作者: meraki-analytics 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_window_decorator_across_windows():
    """A call outlasting its window delays the next call accordingly."""
    from time import monotonic, sleep

    limiter = FixedWindowRateLimiter(SECONDS, 1)

    @limiter.limit
    def call():
        started = monotonic()
        sleep(SECONDS * 1.25)
        return started

    # This should take up two fixed windows for the first task, and the second shouldn't execute until the third (after 2 x SECONDS).
    first = call()
    second = call()

    window = SECONDS - EPSILON
    assert window * 3 >= second - first >= window * 2

##########################
# TokenBucketRateLimiter #
##########################
test_ratelimits.py 文件源码 项目:merakicommons 作者: meraki-analytics 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def test_bucket_acquire_timing():
    """The first BURST acquires are immediate, the rest are evenly paced."""
    from time import monotonic

    limiter = TokenBucketRateLimiter(SECONDS, PERMITS, BURST, TOKENS)
    stamps = []
    for _ in range(VALUE_COUNT):
        with limiter:
            stamps.append(monotonic())

    frequency = SECONDS / PERMITS

    origin = stamps[0]
    expected_stamps = [
        origin + (i - BURST + 1) * frequency
        for i in range(BURST, len(stamps))
    ]

    # Shouldn't wait for BURST calls
    for i in range(BURST - 1):
        assert stamps[i + 1] - stamps[i] < frequency - EPSILON

    # After burst, the rest should fall at about the expected frequency
    epsilon = 0.04
    for actual, expected in zip(stamps[BURST:], expected_stamps):
        assert expected - epsilon <= actual <= expected + epsilon
test_ratelimits.py 文件源码 项目:merakicommons 作者: meraki-analytics 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def test_windowed_bucket_acquire_timing():
    """Every acquire lands on a fixed grid of SECONDS / PERMITS steps."""
    from time import monotonic

    limiter = WindowedTokenBucketRateLimiter(SECONDS, PERMITS, BURST, TOKENS)
    stamps = []
    for _ in range(VALUE_COUNT):
        with limiter:
            stamps.append(monotonic())

    frequency = SECONDS / PERMITS

    # Expected schedule: the first stamp, then one step per later call.
    origin = stamps[0]
    expected_stamps = [origin] + [
        origin + i * frequency for i in range(1, len(stamps))
    ]

    epsilon = 0.04
    for actual, expected in zip(stamps, expected_stamps):
        assert expected - epsilon <= actual <= expected + epsilon
test_ratelimits.py 文件源码 项目:merakicommons 作者: meraki-analytics 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def test_windowed_bucket_decorator_timing():
    """Decorated calls land on a fixed grid of SECONDS / PERMITS steps."""
    from time import monotonic

    limiter = WindowedTokenBucketRateLimiter(SECONDS, PERMITS, BURST, TOKENS)

    @limiter.limit
    def call():
        return monotonic()

    stamps = [call() for _ in range(VALUE_COUNT)]

    frequency = SECONDS / PERMITS

    # Expected schedule: the first stamp, then one step per later call.
    origin = stamps[0]
    expected_stamps = [origin] + [
        origin + i * frequency for i in range(1, len(stamps))
    ]

    epsilon = 0.04
    for actual, expected in zip(stamps, expected_stamps):
        assert expected - epsilon <= actual <= expected + epsilon
overseer.py 文件源码 项目:Monocle 作者: Noctem 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
async def try_point(self, point, spawn_time=None, spawn_id=None):
        """
        Visit ``point`` with the best available worker, if one turns up in time.

        Declared ``async`` because the body uses ``await`` and
        ``async with``, which are a SyntaxError in a plain ``def``.

        :param point: location to visit; randomized before use
        :param spawn_time: spawn timestamp when known; selects the
            GIVE_UP_KNOWN deadline and enables the skipped/after_spawn stats
        :param spawn_id: identifier passed through to ``worker.visit``
        :raises CancelledError: re-raised untouched so cancellation works
        """
        try:
            point = randomize_point(point)
            # Known spawns and unknown points have separate give-up limits.
            give_up_after = conf.GIVE_UP_KNOWN if spawn_time else conf.GIVE_UP_UNKNOWN
            skip_time = monotonic() + give_up_after
            worker = await self.best_worker(point, skip_time)
            if not worker:
                if spawn_time:
                    self.skipped += 1
                return
            async with worker.busy:
                if spawn_time:
                    worker.after_spawn = time() - spawn_time

                if await worker.visit(point, spawn_id):
                    self.visits += 1
        except CancelledError:
            raise
        except Exception:
            # Any other failure is logged and swallowed.
            self.log.exception('An exception occurred in try_point')
        finally:
            # Always release the semaphore slot, whatever the outcome.
            self.coroutine_semaphore.release()
overseer.py 文件源码 项目:Monocle 作者: Noctem 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
async def best_worker(self, point, skip_time):
        """
        Wait for a free worker that can reach ``point`` below the speed limit.

        Declared ``async`` because the body uses ``await``, which is a
        SyntaxError in a plain ``def``.

        :param point: location the worker would have to travel to
        :param skip_time: monotonic deadline after which to give up, or a
            falsy value to keep trying for as long as ``self.running``
        :return: the chosen worker (with ``speed`` set), or None if the
            deadline passed or the loop stopped running
        """
        good_enough = conf.GOOD_ENOUGH
        while self.running:
            gen = (w for w in self.workers if not w.busy.locked())
            # Reset on every pass so a worker picked in an earlier pass (or
            # an unbound name) is never used when nobody is free now.
            worker = next(gen, None)
            if worker is None:
                lowest_speed = float('inf')
            else:
                lowest_speed = worker.travel_speed(point)
            for w in gen:
                speed = w.travel_speed(point)
                if speed < lowest_speed:
                    lowest_speed = speed
                    worker = w
                    if speed < good_enough:
                        # Fast enough already; stop comparing.
                        break
            if worker is not None and lowest_speed < conf.SPEED_LIMIT:
                worker.speed = lowest_speed
                return worker
            if skip_time and monotonic() > skip_time:
                return None
            await sleep(conf.SEARCH_SLEEP, loop=LOOP)
notification.py 文件源码 项目:Monocle 作者: Noctem 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def __init__(self):
        """Set up notifier state from ``conf`` and pick the notify-id source."""
        self.cache = NotificationCache()
        # Backdate the last notification by half of FULL_TIME.
        self.last_notification = monotonic() - (conf.FULL_TIME / 2)
        self.log = get_logger('notifier')
        self.sent = 0

        # Plain copies of configuration values.
        self.notify_ranking = conf.NOTIFY_RANKING
        self.initial_score = conf.INITIAL_SCORE
        self.minimum_score = conf.MINIMUM_SCORE
        self.never_notify = conf.NEVER_NOTIFY_IDS
        self.rarity_override = conf.RARITY_OVERRIDE
        self.always_notify = []

        if self.notify_ranking:
            # Ranking mode: build the ranking now and schedule
            # set_notify_ids with a 3600 second delay.
            self.initialize_ranking()
            LOOP.call_later(3600, self.set_notify_ids)
            return

        explicit_ids = conf.NOTIFY_IDS or conf.ALWAYS_NOTIFY_IDS
        if explicit_ids:
            # Explicit id lists: the ranking size is the list length.
            self.notify_ids = explicit_ids
            self.always_notify = conf.ALWAYS_NOTIFY_IDS
            self.notify_ranking = len(self.notify_ids)
mysql.py 文件源码 项目:deb-python-greenio 作者: openstack 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def db():
        """Run timed SLEEP queries and a trivial SELECT on a green connection."""
        def timed_sleep(cur, query, seconds):
            # Execute ``query`` and check it blocked for at least ``seconds``.
            print('>> sleeping')
            started = time.monotonic()
            cur.execute(query)
            elapsed = time.monotonic() - started
            assert elapsed >= seconds
            print('<< sleeping {:.3f}s'.format(elapsed))

        conn = GreenConnection(host='localhost')

        try:
            with conn as cur:
                timed_sleep(cur, 'SELECT SLEEP(2)', 2)

                cur.execute('SELECT 42')
                print('"SELECT 42" -> {!r}'.format(cur.fetchone()))

                timed_sleep(cur, 'SELECT SLEEP(1)', 1)
        finally:
            conn.close()
optimize.py 文件源码 项目:exchangelib 作者: ecederstrand 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def test(items):
    """Time a bulk create followed by a bulk delete and print the rates."""
    started = time.monotonic()
    ids = account.calendar.bulk_create(items=items)
    created = time.monotonic()
    account.bulk_delete(ids)
    deleted = time.monotonic()

    count = len(ids)
    create_secs = created - started
    create_rate = count / create_secs
    delete_secs = deleted - created
    delete_rate = count / delete_secs

    report = 'Time to process %s items (batchsize %s/%s, poolsize %s): %s / %s (%s / %s per sec)' % (
        count, services.CreateItem.CHUNKSIZE, services.DeleteItem.CHUNKSIZE,
        config.protocol.poolsize, create_secs, delete_secs, create_rate, delete_rate)
    print(report)


# Generate items
_mock_clock.py 文件源码 项目:trio 作者: python-trio 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def __init__(self, rate=0.0, autojump_threshold=inf):
        # when the real clock said 'real_base', the virtual time was
        # 'virtual_base', and since then it's advanced at 'rate' virtual
        # seconds per real second.
        self._real_base = 0.0
        self._virtual_base = 0.0
        self._rate = 0.0
        self._autojump_threshold = 0.0
        self._autojump_task = None
        self._autojump_cancel_scope = None
        # kept as an attribute so that our tests can monkeypatch it
        self._real_clock = time.monotonic

        # use the property update logic to set initial values
        self.rate = rate
        self.autojump_threshold = autojump_threshold
test_run.py 文件源码 项目:trio 作者: python-trio 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
async def test_timekeeping():
    """
    Check a cancel-scope deadline against the real monotonic clock.

    Declared ``async`` because the body awaits ``sleep_forever()``, which
    is a SyntaxError in a plain ``def``.
    """
    # probably a good idea to use a real clock for *one* test anyway...
    TARGET = 0.1
    # give it a few tries in case of random CI server flakiness
    for _ in range(4):
        real_start = time.monotonic()
        with _core.open_cancel_scope() as scope:
            scope.deadline = _core.current_time() + TARGET
            await sleep_forever()
        real_duration = time.monotonic() - real_start
        # Ratio of measured to requested duration; 1.0 means exact.
        accuracy = real_duration / TARGET
        print(accuracy)
        # Actual time elapsed should always be >= target time
        # (== is possible because time.monotonic on Windows is really low res)
        if 1.0 <= accuracy < 2:  # pragma: no branch
            break
    else:  # pragma: no cover
        # None of the attempts landed inside the accepted window.
        assert False
pgbench_python.py 文件源码 项目:pgbench 作者: MagicStack 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
async def worker(executor, eargs, start, duration, timeout):
    """
    Repeatedly await ``executor`` until ``duration`` has elapsed, collecting stats.

    Declared ``async`` because the body awaits the executor, which is a
    SyntaxError in a plain ``def``.

    Latencies are recorded in ticks of 0.01 ms (seconds * 1000 * 100,
    rounded) and used directly as histogram indexes.

    :param executor: coroutine function called as ``executor(*eargs)``;
        its result is added to the row count
    :param eargs: positional arguments for ``executor``
    :param start: monotonic timestamp the run is measured from
    :param duration: how long to keep issuing requests, in seconds
    :param timeout: sizes the histogram at ``timeout * 100`` buckets; a
        request slower than that many ticks raises IndexError
    :return: tuple ``(queries, rows, latency_stats, min_latency,
        max_latency)`` with latencies in ticks
    """
    queries = 0
    rows = 0
    latency_stats = np.zeros((timeout * 100,))
    min_latency = float('inf')
    max_latency = 0.0

    while time.monotonic() - start < duration:
        req_start = time.monotonic()
        rows += await executor(*eargs)
        req_time = round((time.monotonic() - req_start) * 1000 * 100)

        if req_time > max_latency:
            max_latency = req_time
        if req_time < min_latency:
            min_latency = req_time
        latency_stats[req_time] += 1
        queries += 1

    return queries, rows, latency_stats, min_latency, max_latency
pgbench_python.py 文件源码 项目:pgbench 作者: MagicStack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def sync_worker(executor, eargs, start, duration, timeout):
    """
    Repeatedly call ``executor`` until ``duration`` has elapsed, collecting stats.

    Latencies are recorded in ticks of 0.01 ms (seconds * 1000 * 100,
    rounded) and used directly as histogram indexes.

    :param executor: callable invoked as ``executor(*eargs)``; its result
        is added to the row count
    :param eargs: positional arguments for ``executor``
    :param start: monotonic timestamp the run is measured from
    :param duration: how long to keep issuing requests, in seconds
    :param timeout: sizes the histogram at ``timeout * 100`` buckets
    :return: tuple ``(queries, rows, latency_stats, min_latency,
        max_latency)`` with latencies in ticks
    """
    clock = time.monotonic
    histogram = np.zeros((timeout * 100,))
    fastest = float('inf')
    slowest = 0.0
    total_rows = 0
    completed = 0

    while clock() - start < duration:
        began = clock()
        total_rows += executor(*eargs)
        ticks = round((clock() - began) * 1000 * 100)

        # On ties max()/min() return the first argument, i.e. the current
        # extreme is kept.
        slowest = max(slowest, ticks)
        fastest = min(fastest, ticks)
        histogram[ticks] += 1
        completed += 1

    return completed, total_rows, histogram, fastest, slowest
rabbitmq_client.py 文件源码 项目:_ 作者: zengchunyun 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def callback(self, ch, method, properties, body):
        """
        Handle one task message: run it with a time limit and publish the reply.

        The task is executed by ``self.exec_call`` in a helper thread that
        is given ``self.timeout`` seconds to finish.

        :param ch: channel used to publish the reply and to acknowledge
            the delivery
        :param method: delivery metadata; its ``delivery_tag`` is acked
        :param properties: message properties supplying ``reply_to`` and
            ``correlation_id`` for the reply
        :param body: raw message payload (bytes), passed to ``exec_call``
        :return: None
        """
        exec_cmd = threading.Thread(target=self.exec_call, args=(body,))
        exec_cmd.start()
        exec_cmd.join(self.timeout)
        # join() gives no indication of whether it timed out; a thread
        # that is still alive afterwards did not finish in time.
        if exec_cmd.is_alive():
            self.response = bytes("command exec timeout", "utf8")
        # The encoding name used to be "utf8)" (stray parenthesis), which
        # raised LookupError for every message.
        print(" [*] Got a task {}".format(str(body, "utf8")))
        message = {"host": self.id, "data": self.response}
        ch.basic_publish(exchange="",
                         routing_key=properties.reply_to,
                         properties=pika.BasicProperties(
                             correlation_id=properties.correlation_id,),
                         body=bytes(str(message), "utf-8"))
        ch.basic_ack(delivery_tag=method.delivery_tag)


问题


面经


文章

微信
公众号

扫码关注公众号