def smart_delay(delay: float, last_cmd: float, remain: int=0) -> float:
    """
    A "smart" delay mechanism which tries to reduce the
    delay as much as possible based on the time the last
    delay happened.

    Only the part of ``delay`` that has not already elapsed since
    ``last_cmd`` is actually slept.

    :param delay: delay in seconds
    :param last_cmd: time of last command on the ``time.monotonic()``
        clock, or ``None`` to skip the delay entirely
    :param remain: counter, skip delay unless it's zero
    :return: timestamp to feed to next invocation
    """
    now = time.monotonic()
    if remain == 0 and last_cmd is not None and delay > 0.0:
        outstanding = delay - (now - last_cmd)
        if outstanding > 0.0:
            time.sleep(outstanding)
            # Re-read the clock: the caller proceeds *after* the sleep, so
            # handing back the pre-sleep value would make the next call
            # count the sleep itself as elapsed time and under-delay.
            now = time.monotonic()
    return now
# Example source code using Python's monotonic()
def acquire(self, blocking=True, timeout=None):
    """Decrement ``self._value``, optionally waiting until it is non-zero.

    :param blocking: when false, return immediately instead of waiting
        if the counter is currently zero
    :param timeout: maximum number of seconds to wait; ``None`` waits
        without limit. Only valid together with ``blocking=True``.
    :return: ``True`` if the counter was decremented, ``False`` otherwise
    :raises ValueError: if a timeout is given for a non-blocking call
    """
    if not blocking and timeout is not None:
        raise ValueError("can't specify timeout for non-blocking acquire")
    rc = False
    endtime = None
    # The context manager guarantees the condition's lock is released even
    # if wait() is interrupted by an exception.
    with self._cond:
        while self._value == 0:
            if not blocking:
                break
            if timeout is not None:
                if endtime is None:
                    # First pass: fix the absolute deadline.
                    endtime = _time() + timeout
                else:
                    # Later passes: wait only for what is left of it.
                    timeout = endtime - _time()
                    if timeout <= 0:
                        break
            self._cond.wait(timeout)
        else:
            # Loop ended without break, so the counter is non-zero.
            self._value -= 1
            rc = True
    return rc
def run(self, run_config, argv, **kwargs):
    """Build a config, parse ``argv`` and invoke ``self`` with the result.

    :param run_config: run configuration; a copy carrying the environment
        returned by ``self.get_run_env`` is what gets used
    :param argv: arguments handed to ``self.parse_args``
    :param kwargs: extra call arguments; they override parsed ones
    :return: whatever calling ``self`` returns
    """
    if self.timed:
        start_time = time.monotonic()

    run_env = self.get_run_env(run_config.env, run_config.default_env)
    run_config = run_config.copy(env=run_env)
    config = Config(run=run_config, **self.config.copy())

    all_args = self.parse_args(config, argv)
    all_args.update(kwargs)
    result = self(config, **all_args)

    if self.timed:
        # An explicit ``hide`` keyword wins over the configured value.
        hide = kwargs.get('hide', config.run.hide)
        if not Hide.hide_stdout(hide):
            self.print_elapsed_time(time.monotonic() - start_time)
    return result
def acquire(self, blocking=True, timeout=None):
    """Decrement ``self._value``, optionally waiting until it is non-zero.

    :param blocking: when false, return immediately instead of waiting
        if the counter is currently zero
    :param timeout: maximum number of seconds to wait; ``None`` waits
        without limit. Only valid together with ``blocking=True``.
    :return: ``True`` if the counter was decremented, ``False`` otherwise
    :raises ValueError: if a timeout is given for a non-blocking call
    """
    if not blocking and timeout is not None:
        raise ValueError("can't specify timeout for non-blocking acquire")
    rc = False
    endtime = None
    # The context manager guarantees the condition's lock is released even
    # if wait() is interrupted by an exception.
    with self._cond:
        while self._value == 0:
            if not blocking:
                break
            if timeout is not None:
                if endtime is None:
                    # First pass: fix the absolute deadline.
                    endtime = _time() + timeout
                else:
                    # Later passes: wait only for what is left of it.
                    timeout = endtime - _time()
                    if timeout <= 0:
                        break
            self._cond.wait(timeout)
        else:
            # Loop ended without break, so the counter is non-zero.
            self._value -= 1
            rc = True
    return rc
def acquire(self, blocking=True, timeout=None):
    """Decrement ``self._value``, optionally waiting until it is non-zero.

    :param blocking: when false, return immediately instead of waiting
        if the counter is currently zero
    :param timeout: maximum number of seconds to wait; ``None`` waits
        without limit. Only valid together with ``blocking=True``.
    :return: ``True`` if the counter was decremented, ``False`` otherwise
    :raises ValueError: if a timeout is given for a non-blocking call
    """
    if not blocking and timeout is not None:
        raise ValueError("can't specify timeout for non-blocking acquire")
    rc = False
    endtime = None
    # The context manager guarantees the condition's lock is released even
    # if wait() is interrupted by an exception.
    with self._cond:
        while self._value == 0:
            if not blocking:
                break
            if timeout is not None:
                if endtime is None:
                    # First pass: fix the absolute deadline.
                    endtime = _time() + timeout
                else:
                    # Later passes: wait only for what is left of it.
                    timeout = endtime - _time()
                    if timeout <= 0:
                        break
            self._cond.wait(timeout)
        else:
            # Loop ended without break, so the counter is non-zero.
            self._value -= 1
            rc = True
    return rc
def acquire(self, blocking=True, timeout=None):
    """Decrement ``self._value``, optionally waiting until it is non-zero.

    :param blocking: when false, return immediately instead of waiting
        if the counter is currently zero
    :param timeout: maximum number of seconds to wait; ``None`` waits
        without limit. Only valid together with ``blocking=True``.
    :return: ``True`` if the counter was decremented, ``False`` otherwise
    :raises ValueError: if a timeout is given for a non-blocking call
    """
    if not blocking and timeout is not None:
        raise ValueError("can't specify timeout for non-blocking acquire")
    rc = False
    endtime = None
    # The context manager guarantees the condition's lock is released even
    # if wait() is interrupted by an exception.
    with self._cond:
        while self._value == 0:
            if not blocking:
                break
            if timeout is not None:
                if endtime is None:
                    # First pass: fix the absolute deadline.
                    endtime = _time() + timeout
                else:
                    # Later passes: wait only for what is left of it.
                    timeout = endtime - _time()
                    if timeout <= 0:
                        break
            self._cond.wait(timeout)
        else:
            # Loop ended without break, so the counter is non-zero.
            self._value -= 1
            rc = True
    return rc
def acquire(self, blocking=True, timeout=None):
    """Decrement ``self._value``, optionally waiting until it is non-zero.

    :param blocking: when false, return immediately instead of waiting
        if the counter is currently zero
    :param timeout: maximum number of seconds to wait; ``None`` waits
        without limit. Only valid together with ``blocking=True``.
    :return: ``True`` if the counter was decremented, ``False`` otherwise
    :raises ValueError: if a timeout is given for a non-blocking call
    """
    if not blocking and timeout is not None:
        raise ValueError("can't specify timeout for non-blocking acquire")
    rc = False
    endtime = None
    # The context manager guarantees the condition's lock is released even
    # if wait() is interrupted by an exception.
    with self._cond:
        while self._value == 0:
            if not blocking:
                break
            if timeout is not None:
                if endtime is None:
                    # First pass: fix the absolute deadline.
                    endtime = _time() + timeout
                else:
                    # Later passes: wait only for what is left of it.
                    timeout = endtime - _time()
                    if timeout <= 0:
                        break
            self._cond.wait(timeout)
        else:
            # Loop ended without break, so the counter is non-zero.
            self._value -= 1
            rc = True
    return rc
def register_event(self, timestamp):
    """Count one event and, once per update interval, record a rate sample.

    Each call increments the event counter. When at least
    ``self._update_interval`` has passed since the last checkpoint, the
    events-per-time-unit figure for that span is pushed to the front of
    ``self._hist`` and the checkpoint is moved to ``timestamp``.

    :param timestamp: time of the event, on the same clock as
        ``self._checkpoint_ts``
    """
    self._events_since_checkpoint += 1
    dt = timestamp - self._checkpoint_ts
    # dt > 0 keeps the division below safe when the update interval is
    # zero or negative; the sample is then deferred to a later event.
    if dt >= self._update_interval and dt > 0:
        # Resetting the stat if expired
        mono_ts = time.monotonic()
        expired = mono_ts > self._estimate_expires_at
        self._estimate_expires_at = mono_ts + self._estimate_lifetime
        if expired:
            self._hist = []
        elif len(self._hist) >= self._averaging_period:
            # Drop the oldest sample; the newest sits at index 0.
            self._hist.pop()
        # Updating the history
        self._hist.insert(0, self._events_since_checkpoint / dt)
        self._checkpoint_ts = timestamp
        self._events_since_checkpoint = 0
def __init__(self):
    """Set every bookkeeping attribute to its initial, idle value."""
    self._timer_cancelled_count = 0
    self._closed = False
    self._ready = collections.deque()
    self._scheduled = []
    self._default_executor = None
    self._internal_fds = 0
    # Identifier of the thread running the event loop, or None if the
    # event loop is not running
    self._thread_id = None
    self._clock_resolution = time.get_clock_info('monotonic').resolution
    self._exception_handler = None
    # Debug mode is enabled through the PYTHONASYNCIODEBUG environment
    # variable, unless the interpreter was told to ignore the environment.
    self._debug = (not sys.flags.ignore_environment
                   and bool(os.environ.get('PYTHONASYNCIODEBUG')))
    # In debug mode, if the execution of a callback or a step of a task
    # exceed this duration in seconds, the slow callback/task is logged.
    self.slow_callback_duration = 0.1
    self._current_handle = None
def test__run_once(self):
    """A cancelled timer is dropped and the select timeout tracks the live one.

    Two timer handles (due in 5 s and 10 s) are scheduled and the earlier
    one is cancelled; after ``_run_once`` only the later handle must
    remain and the timeout passed to the selector must be about 10 s.
    """
    h1 = asyncio.TimerHandle(time.monotonic() + 5.0, lambda: True, (),
                             self.loop)
    h2 = asyncio.TimerHandle(time.monotonic() + 10.0, lambda: True, (),
                             self.loop)

    h1.cancel()

    self.loop._process_events = mock.Mock()
    self.loop._scheduled.append(h1)
    self.loop._scheduled.append(h2)
    self.loop._run_once()

    # First positional argument of the recorded select() call.
    t = self.loop._selector.select.call_args[0][0]
    self.assertTrue(9.5 < t < 10.5, t)
    self.assertEqual([h2], self.loop._scheduled)
    self.assertTrue(self.loop._process_events.called)
def test__run_once_schedule_handle(self):
    """A callback run by ``_run_once`` may itself schedule a new handle.

    The timer is already due (one second in the past); its callback calls
    ``call_soon`` and the resulting handle must be the only entry left in
    the loop's ready queue afterwards.
    """
    handle = None
    processed = False

    def cb(loop):
        nonlocal processed, handle
        processed = True
        handle = loop.call_soon(lambda: True)

    h = asyncio.TimerHandle(time.monotonic() - 1, cb, (self.loop,),
                            self.loop)

    self.loop._process_events = mock.Mock()
    self.loop._scheduled.append(h)
    self.loop._run_once()

    self.assertTrue(processed)
    self.assertEqual([handle], list(self.loop._ready))
def test_timeout(self):
    """Check how ``select()`` treats zero, negative and positive timeouts."""
    s = self.SELECTOR()
    self.addCleanup(s.close)

    rd, wr = self.make_socketpair()

    # With the write end registered, zero and negative timeouts must each
    # report one event and return promptly.
    s.register(wr, selectors.EVENT_WRITE)
    t = time()
    self.assertEqual(1, len(s.select(0)))
    self.assertEqual(1, len(s.select(-1)))
    self.assertLess(time() - t, 0.5)

    s.unregister(wr)
    # With only the read end registered, the same calls must report
    # nothing and still return promptly.
    s.register(rd, selectors.EVENT_READ)
    t = time()
    self.assertFalse(s.select(0))
    self.assertFalse(s.select(-1))
    self.assertLess(time() - t, 0.5)

    # A one-second timeout must take roughly one second.
    t0 = time()
    self.assertFalse(s.select(1))
    t1 = time()
    dt = t1 - t0
    # Tolerate 2.0 seconds for very slow buildbots
    self.assertTrue(0.8 <= dt <= 2.0, dt)
def test_select_interrupt(self):
    """``select()`` hit by SIGALRM must report nothing within 2.5 seconds."""
    s = self.SELECTOR()
    self.addCleanup(s.close)

    rd, wr = self.make_socketpair()

    # Install a do-nothing SIGALRM handler and make sure both the handler
    # and any pending alarm are restored/cleared afterwards.
    orig_alrm_handler = signal.signal(signal.SIGALRM, lambda *args: None)
    self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)
    self.addCleanup(signal.alarm, 0)

    # The alarm fires after one second, i.e. during the 2-second select.
    signal.alarm(1)

    s.register(rd, selectors.EVENT_READ)
    t = time()
    self.assertFalse(s.select(2))
    self.assertLess(time() - t, 2.5)
def __call__(self, instance=None):
    """Advance ``self._generator`` for at most ``self._timeout`` seconds.

    If the generator finishes it is cleared; otherwise a queued
    invocation of the wrapped function is posted on ``self._instance``.
    Nothing happens when there is no generator.

    :param instance: optional object to compare against the stored
        instance; a mismatch is only logged, not rejected
    """
    if instance is not None and instance is not self._instance:
        fmt = "Instance mismatch on autoconsumer {}, ({} vs {})."
        msg = fmt.format(
            self._function.__name__,
            instance, self._instance,
        )
        logger.critical(msg)

    if self._generator is None:
        return

    try:
        endtime = time.monotonic() + self._timeout
        while time.monotonic() < endtime:
            next(self._generator)
    except StopIteration:
        self._generator = None
    else:
        # Time slice used up but the generator is not exhausted: queue
        # another invocation of the wrapped function by name.
        QtCore.QMetaObject.invokeMethod(
            self._instance, self._function.__name__,
            Qt.Qt.QueuedConnection,
        )
async def ping(self, ctx):
    """Check my response time and my websocket ping
    **Usage:** `g_ping`
    **Permission:** User"""
    # Declared ``async`` because the body awaits; a plain ``def`` with
    # ``await`` inside does not compile.
    console = self.bot.get_channel(316688736089800715)

    # Message round trip in milliseconds, timed on the monotonic clock so
    # wall-clock adjustments cannot distort it.
    before = time.monotonic()
    ping_msg = await console.send(":mega: **Pinging...**")
    ping = (time.monotonic() - before) * 1000

    # Websocket round trip in milliseconds.
    before2 = time.monotonic()
    await (await self.bot.ws.ping())
    ping2 = (time.monotonic() - before2) * 1000

    vowel = random.choice(["a", "e", "i", "o", "u"])
    author = ctx.message.author
    stamp = datetime.datetime.now().strftime("%d/%m/%y %H:%M:%S")

    # Only the fixed fragments are passed through str.format(); names are
    # concatenated so braces in them cannot break the formatting.
    await ping_msg.edit(content=(
        ':warning: [`' + stamp + '`] `'
        + author.name + '#' + author.discriminator
        + '` checked my ping in the channel `' + ctx.message.channel.name
        + '` in the server `' + ctx.message.guild.name
        + '`. The result was {:.2f}ms'.format(ping)
        + " with a websocket ping of {0:.0f}ms".format(ping2)
    ))
    await ctx.send(
        ":mega: P" + vowel
        + "ng! The message took **{:.0f}ms**!".format(ping)
        + " `Websocket: {0:.0f}ms` :thinking:".format(ping2)
    )
def test_window_decorator_timing():
    """Calls decorated with a fixed-window limiter must respect the window.

    Every ``PERMITS``-th call (indices 0, PERMITS, 2*PERMITS, ...) has to
    happen at least ``SECONDS - EPSILON`` after the previous such call.
    """
    from time import monotonic

    limiter = FixedWindowRateLimiter(SECONDS, PERMITS)

    @limiter.limit
    def call():
        return monotonic()

    times = [call() for _ in range(VALUE_COUNT)]

    last = -SECONDS
    for index in range(0, VALUE_COUNT, PERMITS):
        assert times[index] - last >= SECONDS - EPSILON
        last = times[index]
def test_window_decorator_across_windows():
    """A call that outlasts its window must delay the next call accordingly.

    With one permit per window and a call that sleeps 1.25 windows, the
    two start times must be between two and three windows apart (each
    window shortened by ``EPSILON``).
    """
    from time import monotonic, sleep

    limiter = FixedWindowRateLimiter(SECONDS, 1)

    @limiter.limit
    def call():
        t = monotonic()
        sleep(SECONDS * 1.25)
        return t

    # This should take up two fixed windows for the first task, and the second shouldn't execute until the third (after 2 x SECONDS).
    first = call()
    second = call()
    assert (SECONDS - EPSILON) * 3 >= second - first >= (SECONDS - EPSILON) * 2
##########################
# TokenBucketRateLimiter #
##########################
def test_bucket_acquire_timing():
    """Entering a token-bucket limiter allows a burst, then a steady rate.

    The first ``BURST`` acquisitions must come closer together than the
    steady-state period; every later one must land within 0.04 s of its
    expected slot.
    """
    from time import monotonic

    limiter = TokenBucketRateLimiter(SECONDS, PERMITS, BURST, TOKENS)
    times = []
    for _ in range(VALUE_COUNT):
        with limiter:
            times.append(monotonic())

    frequency = SECONDS / PERMITS
    first_time = times[0]
    expected_times = [
        first_time + (i - BURST + 1) * frequency
        for i in range(BURST, len(times))
    ]

    # Shouldn't wait for BURST calls
    for i in range(BURST - 1):
        assert times[i + 1] - times[i] < frequency - EPSILON

    # After burst, the rest should fall at about the expected frequency
    epsilon = 0.04
    for expected, actual in zip(expected_times, times[BURST:]):
        assert expected - epsilon <= actual <= expected + epsilon
def test_windowed_bucket_acquire_timing():
    """Entering a windowed token-bucket limiter must pace calls evenly.

    The i-th acquisition is expected ``i * SECONDS / PERMITS`` after the
    first one, with a tolerance of 0.04 s either way.
    """
    from time import monotonic

    limiter = WindowedTokenBucketRateLimiter(SECONDS, PERMITS, BURST, TOKENS)
    times = []
    for _ in range(VALUE_COUNT):
        with limiter:
            times.append(monotonic())

    frequency = SECONDS / PERMITS
    first_time = times[0]
    expected_times = [first_time]
    expected_times.extend(
        first_time + i * frequency for i in range(1, len(times))
    )

    epsilon = 0.04
    for expected, actual in zip(expected_times, times):
        assert expected - epsilon <= actual <= expected + epsilon
def test_windowed_bucket_decorator_timing():
    """Calls decorated with a windowed token-bucket limiter are paced evenly.

    The i-th call is expected ``i * SECONDS / PERMITS`` after the first
    one, with a tolerance of 0.04 s either way.
    """
    from time import monotonic

    limiter = WindowedTokenBucketRateLimiter(SECONDS, PERMITS, BURST, TOKENS)

    @limiter.limit
    def call():
        return monotonic()

    times = [call() for _ in range(VALUE_COUNT)]

    frequency = SECONDS / PERMITS
    first_time = times[0]
    expected_times = [first_time]
    expected_times.extend(
        first_time + i * frequency for i in range(1, len(times))
    )

    epsilon = 0.04
    for expected, actual in zip(expected_times, times):
        assert expected - epsilon <= actual <= expected + epsilon
async def try_point(self, point, spawn_time=None, spawn_id=None):
    """Pick a worker for ``point`` and have it visit the point.

    Declared ``async`` because the body awaits and uses ``async with``.
    ``self.coroutine_semaphore`` is released on every exit path.

    :param point: location to visit; it is passed through
        ``randomize_point`` first
    :param spawn_time: if truthy, selects the "known" give-up delay, is
        used to compute ``worker.after_spawn``, and makes a failed worker
        search count as skipped
    :param spawn_id: forwarded unchanged to ``worker.visit``
    """
    try:
        point = randomize_point(point)
        give_up = conf.GIVE_UP_KNOWN if spawn_time else conf.GIVE_UP_UNKNOWN
        skip_time = monotonic() + give_up
        worker = await self.best_worker(point, skip_time)
        if not worker:
            if spawn_time:
                self.skipped += 1
            return
        async with worker.busy:
            if spawn_time:
                worker.after_spawn = time() - spawn_time
            if await worker.visit(point, spawn_id):
                self.visits += 1
    except CancelledError:
        # Cancellation must propagate instead of being logged below.
        raise
    except Exception:
        self.log.exception('An exception occurred in try_point')
    finally:
        self.coroutine_semaphore.release()
async def best_worker(self, point, skip_time):
    """Wait for an idle worker whose travel speed to ``point`` is acceptable.

    Declared ``async`` because the body awaits between search rounds.

    :param point: target location, passed to each worker's
        ``travel_speed``
    :param skip_time: monotonic deadline after which the search is
        abandoned; a falsy value disables the deadline
    :return: the chosen worker with its ``speed`` attribute set, or
        ``None`` when the deadline passes or ``self.running`` turns false
    """
    good_enough = conf.GOOD_ENOUGH
    while self.running:
        # Lazily walk the workers whose ``busy`` lock is free.
        gen = (w for w in self.workers if not w.busy.locked())
        try:
            worker = next(gen)
            lowest_speed = worker.travel_speed(point)
        except StopIteration:
            # No idle worker at all in this round.
            lowest_speed = float('inf')
        for w in gen:
            speed = w.travel_speed(point)
            if speed < lowest_speed:
                lowest_speed = speed
                worker = w
                if speed < good_enough:
                    # Good enough: stop scanning the remaining workers.
                    break
        if lowest_speed < conf.SPEED_LIMIT:
            worker.speed = lowest_speed
            return worker
        if skip_time and monotonic() > skip_time:
            return None
        # NOTE(review): if ``sleep`` is asyncio.sleep, its ``loop``
        # argument was removed in Python 3.10 - confirm against the
        # file's imports before upgrading.
        await sleep(conf.SEARCH_SLEEP, loop=LOOP)
def __init__(self):
    """Initialise notifier state from the ``conf`` module.

    When ``conf.NOTIFY_RANKING`` is truthy the ranking is initialised and
    ``self.set_notify_ids`` is scheduled on ``LOOP`` 3600 seconds later;
    otherwise, if explicit ID lists are configured, they are used and
    ``notify_ranking`` becomes the number of IDs.
    """
    self.cache = NotificationCache()
    self.notify_ranking = conf.NOTIFY_RANKING
    self.initial_score = conf.INITIAL_SCORE
    self.minimum_score = conf.MINIMUM_SCORE
    # Start as if the last notification happened half of FULL_TIME ago.
    self.last_notification = monotonic() - (conf.FULL_TIME / 2)
    self.always_notify = []
    self.log = get_logger('notifier')
    self.never_notify = conf.NEVER_NOTIFY_IDS
    self.rarity_override = conf.RARITY_OVERRIDE
    self.sent = 0
    if self.notify_ranking:
        self.initialize_ranking()
        LOOP.call_later(3600, self.set_notify_ids)
    elif conf.NOTIFY_IDS or conf.ALWAYS_NOTIFY_IDS:
        # NOTIFY_IDS wins; ALWAYS_NOTIFY_IDS is the fallback when it is empty.
        self.notify_ids = conf.NOTIFY_IDS or conf.ALWAYS_NOTIFY_IDS
        self.always_notify = conf.ALWAYS_NOTIFY_IDS
        self.notify_ranking = len(self.notify_ids)
def db():
    """Run timed SLEEP queries and a trivial SELECT on a local connection.

    Each ``SELECT SLEEP(n)`` is asserted to take at least ``n`` seconds of
    monotonic time. The connection is closed even if a query or
    assertion fails.
    """
    conn = GreenConnection(host='localhost')
    try:
        with conn as cur:
            print('>> sleeping')
            st = time.monotonic()
            cur.execute('SELECT SLEEP(2)')
            en = time.monotonic() - st
            assert en >= 2
            print('<< sleeping {:.3f}s'.format(en))

            cur.execute('SELECT 42')
            print('"SELECT 42" -> {!r}'.format(cur.fetchone()))

            print('>> sleeping')
            st = time.monotonic()
            cur.execute('SELECT SLEEP(1)')
            en = time.monotonic() - st
            assert en >= 1
            print('<< sleeping {:.3f}s'.format(en))
    finally:
        conn.close()
def test(items):
    """Time a bulk create followed by a bulk delete and print throughput.

    :param items: items handed to ``account.calendar.bulk_create``
    """
    t1 = time.monotonic()
    ids = account.calendar.bulk_create(items=items)
    t2 = time.monotonic()
    account.bulk_delete(ids)
    t3 = time.monotonic()

    delta1 = t2 - t1
    delta2 = t3 - t2
    # A coarse clock can report zero elapsed time for tiny batches;
    # report an infinite rate rather than dividing by zero.
    rate1 = len(ids) / delta1 if delta1 else float('inf')
    rate2 = len(ids) / delta2 if delta2 else float('inf')
    print('Time to process %s items (batchsize %s/%s, poolsize %s): %s / %s (%s / %s per sec)' % (
        len(ids), services.CreateItem.CHUNKSIZE, services.DeleteItem.CHUNKSIZE,
        config.protocol.poolsize, delta1, delta2, rate1, rate2))
# Generate items
def __init__(self, rate=0.0, autojump_threshold=inf):
    """Create the clock state and apply the initial settings.

    :param rate: assigned to ``self.rate`` after the private fields have
        been set to their defaults
    :param autojump_threshold: assigned to ``self.autojump_threshold``
        in the same way
    """
    # when the real clock said 'real_base', the virtual time was
    # 'virtual_base', and since then it's advanced at 'rate' virtual
    # seconds per real second.
    self._real_base = 0.0
    self._virtual_base = 0.0
    self._rate = 0.0
    self._autojump_threshold = 0.0
    self._autojump_task = None
    self._autojump_cancel_scope = None
    # kept as an attribute so that our tests can monkeypatch it
    self._real_clock = time.monotonic

    # use the property update logic to set initial values
    self.rate = rate
    self.autojump_threshold = autojump_threshold
async def test_timekeeping():
    """A cancel-scope deadline must fire after roughly the requested time.

    Declared ``async`` because the body awaits. Up to four attempts are
    made; the test fails only if none of them takes between 1x and 2x
    the target duration as measured by the real monotonic clock.
    """
    # probably a good idea to use a real clock for *one* test anyway...
    TARGET = 0.1
    # give it a few tries in case of random CI server flakiness
    for _ in range(4):
        real_start = time.monotonic()
        with _core.open_cancel_scope() as scope:
            scope.deadline = _core.current_time() + TARGET
            await sleep_forever()
        real_duration = time.monotonic() - real_start
        accuracy = real_duration / TARGET
        print(accuracy)
        # Actual time elapsed should always be >= target time
        # (== is possible because time.monotonic on Windows is really low res)
        if 1.0 <= accuracy < 2:  # pragma: no branch
            break
    else:  # pragma: no cover
        # Reached only when no attempt broke out of the loop.
        assert False
async def worker(executor, eargs, start, duration, timeout):
    """Repeatedly await ``executor(*eargs)`` and collect latency statistics.

    Declared ``async`` because the body awaits the executor.

    :param executor: callable returning an awaitable that yields a number,
        which is added to the row total
    :param eargs: positional arguments for ``executor``
    :param start: ``time.monotonic()`` value marking the start of the run
    :param duration: the loop stops once this many seconds have passed
        since ``start``
    :param timeout: sizes the histogram at ``timeout * 100`` buckets
    :return: ``(queries, rows, latency_stats, min_latency, max_latency)``;
        latencies are in units of 1/100 ms and ``min_latency`` stays
        ``inf`` when no request was made
    """
    queries = 0
    rows = 0
    latency_stats = np.zeros((timeout * 100,))
    min_latency = float('inf')
    max_latency = 0.0

    while time.monotonic() - start < duration:
        req_start = time.monotonic()
        rows += await executor(*eargs)
        # Seconds -> milliseconds -> hundredths of a millisecond.
        req_time = round((time.monotonic() - req_start) * 1000 * 100)

        max_latency = max(max_latency, req_time)
        min_latency = min(min_latency, req_time)
        latency_stats[req_time] += 1
        queries += 1

    return queries, rows, latency_stats, min_latency, max_latency
def sync_worker(executor, eargs, start, duration, timeout):
    """Repeatedly call ``executor(*eargs)`` and collect latency statistics.

    :param executor: callable returning a number, which is added to the
        row total
    :param eargs: positional arguments for ``executor``
    :param start: ``time.monotonic()`` value marking the start of the run
    :param duration: the loop stops once this many seconds have passed
        since ``start``
    :param timeout: sizes the histogram at ``timeout * 100`` buckets
    :return: ``(queries, rows, latency_stats, min_latency, max_latency)``;
        latencies are in units of 1/100 ms and ``min_latency`` stays
        ``inf`` when no request was made
    """
    queries = 0
    rows = 0
    latency_stats = np.zeros((timeout * 100,))
    min_latency = float('inf')
    max_latency = 0.0

    while time.monotonic() - start < duration:
        req_start = time.monotonic()
        rows += executor(*eargs)
        # Seconds -> milliseconds -> hundredths of a millisecond.
        req_time = round((time.monotonic() - req_start) * 1000 * 100)

        max_latency = max(max_latency, req_time)
        min_latency = min(min_latency, req_time)
        latency_stats[req_time] += 1
        queries += 1

    return queries, rows, latency_stats, min_latency, max_latency
def callback(self, ch, method, properties, body):
    """Handle one incoming message and publish the reply.

    ``self.exec_call(body)`` runs in a worker thread for at most
    ``self.timeout`` seconds; the reply is then published to
    ``properties.reply_to`` and the delivery is acknowledged.

    :param ch: channel used to publish the reply and acknowledge
    :param method: delivery metadata; supplies ``delivery_tag``
    :param properties: message properties; supplies ``reply_to`` and
        ``correlation_id``
    :param body: raw message payload as bytes
    :return: None
    """
    exec_cmd = threading.Thread(target=self.exec_call, args=(body,))
    exec_cmd.start()
    exec_cmd.join(self.timeout)
    # join() returns silently on timeout, so is_alive() is the reliable
    # way to tell whether the command is still running.
    # NOTE(review): the thread is not stopped; presumably exec_call stores
    # its result in self.response and may still overwrite it later.
    if exec_cmd.is_alive():
        self.response = bytes("command exec timeout", "utf8")
    # The codec name used to read "utf8)" and raised LookupError.
    print(" [*] Got a task {}".format(str(body, "utf8")))
    message = {"host": self.id, "data": self.response}
    ch.basic_publish(exchange="",
                     routing_key=properties.reply_to,
                     properties=pika.BasicProperties(
                         correlation_id=properties.correlation_id,),
                     body=bytes(str(message), "utf-8"))
    ch.basic_ack(delivery_tag=method.delivery_tag)