def register_event(self, timestamp):
    """
    Record one event and, when an update is due, refresh the rate history.

    Every call increments the count of events seen since the last
    checkpoint. Once the time elapsed since that checkpoint
    (``timestamp - self._checkpoint_ts``) reaches ``self._update_interval``,
    a new rate sample (events / elapsed time) is pushed to the front of
    ``self._hist`` and the checkpoint is moved to ``timestamp``.

    The history holds at most ``self._averaging_period`` samples, newest
    first. It is discarded entirely when the previous estimate has expired,
    i.e. when more than ``self._estimate_lifetime`` seconds of
    ``time.monotonic()`` time passed since the last refresh.

    :param timestamp: Time of the event, in the same units and time base as
        ``self._checkpoint_ts`` and ``self._update_interval``
        (NOTE(review): presumably seconds -- confirm against the caller).
    """
    self._events_since_checkpoint += 1
    dt = timestamp - self._checkpoint_ts
    # The ``dt > 0`` condition protects the division below: with a zero (or
    # negative) update interval, a repeated or backwards timestamp would
    # otherwise raise ZeroDivisionError or record a negative rate. In that
    # case the event stays counted and is folded into the next sample.
    if dt >= self._update_interval and dt > 0:
        # Expiry is tracked on the monotonic clock, independently of the
        # caller-supplied event timestamps.
        now = time.monotonic()
        expired = now > self._estimate_expires_at
        self._estimate_expires_at = now + self._estimate_lifetime
        if expired:
            # Stale samples no longer describe the current rate.
            self._hist = []
        elif len(self._hist) >= self._averaging_period:
            # Make room for the new sample by dropping the oldest (last) one.
            self._hist.pop()

        # Newest sample goes first.
        self._hist.insert(0, self._events_since_checkpoint / dt)
        self._checkpoint_ts = timestamp
        self._events_since_checkpoint = 0
# Comment list
# Table of contents