def authenticate(self):
    """Make sure ``self.access_token`` is usable, fetching a new one if needed.

    A new token is requested on the first call (``self.expire_time`` is
    ``None``) or once the monotonic clock has passed ``self.expire_time``.
    Otherwise the method returns without doing anything.

    Raises:
        RequestError: if the token endpoint answers with a status other
            than 200.
    """
    token_still_valid = (
        self.expire_time is not None and monotonic() <= self.expire_time
    )
    if token_still_valid:
        return

    # Get an access token using OAuth, authenticating with the subscription key.
    auth_headers = {"Ocp-Apim-Subscription-Key": self.key}
    # Read the clock *before* the request so the computed expiry is
    # conservative with respect to network latency.
    requested_at = monotonic()
    reply = self.session.post(
        "https://api.cognitive.microsoft.com/sts/v1.0/issueToken",
        headers=auth_headers,
    )
    if reply.status_code != 200:
        raise RequestError("http request error with status code {}".format(reply.status_code))

    self.access_token = reply.content
    # The documentation mentions that the token expires after 10 minutes;
    # 590 seconds leaves a small safety margin.
    token_lifetime = 590
    self.expire_time = requested_at + token_lifetime
# Example source code for Python's monotonic()
def sendResponse(self, responseType, value, args=None):
    """Send a ``{responseType: value}`` message, merged with optional ``args``.

    Args:
        responseType: key under which ``value`` is stored in the message.
        value: the value to send.
        args: optional dict of extra fields to include. The caller's dict
            is copied, not modified.

    Raises:
        TypeError: if ``args`` is neither ``None`` nor a dict.

    When ``self.socket`` is ``None`` the message is only logged. Otherwise it
    is logged (tagged "server" or "buffer" depending on
    ``self.socket.connected``) and handed to ``self.socket.sendObjAsJSON``.
    On success ``self.connectTime`` is refreshed; on a refused connection or
    broken pipe the error is logged and ``self.disconnect()`` is called.
    """
    if args is None:
        payload = {responseType: value}
    elif isinstance(args, dict):
        # Work on a copy so the caller's dict does not pick up extra keys.
        payload = dict(args)
        payload[responseType] = value
    else:
        raise TypeError(
            "args must be a dict or None, not {}".format(type(args).__name__))

    if self.socket is None:
        self.logger.log("nobody <-- " + repr(payload))
        return

    if self.socket.connected:
        self.logger.log("server <-- " + repr(payload))
    else:
        self.logger.log("buffer <-- " + repr(payload))
    try:
        self.socket.sendObjAsJSON(payload)
        self.connectTime = time.monotonic()
    except ConnectionRefusedError:
        self.logger.log("Connection refused.")
        self.disconnect()
    except BrokenPipeError:
        self.logger.log("Broken pipe, connection dropped.")
        self.disconnect()
def apply(self, items, inplace=False, item_getter=lambda item: item):
    """Sort the sequence `items` with every sorter in `self._sortfuncs`.

    The sorters run in order, each receiving the result of the previous one.

    item_getter: Callable that receives an item of `items` and returns an
                 object that can be sorted with any of the sorters
                 specified in the SORTSPECS variable. (This makes it
                 possible to sort widgets as long as they can provide a
                 sortable object.)
    inplace: Modify `items` if True (nothing is returned), otherwise
             return a new list
    """
    import time

    began = time.monotonic()
    for sort_step in self._sortfuncs:
        items = sort_step(items, inplace=inplace, item_getter=item_getter)
    elapsed_ms = (time.monotonic() - began) * 1e3
    log.debug('-> Sorted %d items by %s in %.3fms',
              len(items), self, elapsed_ms)
    return None if inplace else items
def acquire(self, blocking=True, timeout=None):
    """Decrement ``self._value``, waiting on ``self._cond`` while it is zero.

    Args:
        blocking: if false, do not wait; give up immediately when the
            counter is zero.
        timeout: maximum number of seconds to wait in total, or ``None``
            to wait without a limit. Only valid together with
            ``blocking=True``.

    Returns:
        True if the counter was decremented, False if the call gave up
        (non-blocking call, or the timeout elapsed).

    Raises:
        ValueError: if a timeout is given for a non-blocking call.
    """
    if not blocking and timeout is not None:
        raise ValueError("can't specify timeout for non-blocking acquire")
    rc = False
    endtime = None
    # Use the condition as a context manager so its lock is released even
    # if waiting is interrupted by an exception.
    with self._cond:
        while self._value == 0:
            if not blocking:
                break
            if timeout is not None:
                if endtime is None:
                    # First pass: fix the absolute deadline.
                    endtime = _time() + timeout
                else:
                    # Later passes: wait only for what is left of the budget.
                    timeout = endtime - _time()
                    if timeout <= 0:
                        break
            self._cond.wait(timeout)
        else:
            # The loop ended because the counter became non-zero.
            self._value -= 1
            rc = True
    return rc
async def ping(self, ctx):
    """Your average ping command.

    Sends a placeholder embed, measures how long that send took, then edits
    the message to show both ``ctx.bot.latency`` and the measured round trip,
    each in milliseconds.

    This must be a coroutine function because it awaits ``ctx.send`` and
    ``message.edit``.
    """
    # Set the embed for the pre-ping
    clock = random.randint(0x1F550, 0x1F567)  # pick a random clock
    embed = discord.Embed(colour=0xFFC107)
    embed.set_author(name=random.choice(PRE_PING_REMARKS), icon_url=emoji_url(chr(clock)))

    # Do the classic ping: time one message send.
    # perf_counter() is used here instead of time.monotonic().
    start = time.perf_counter()
    message = await ctx.send(embed=embed)
    end = time.perf_counter()
    ms = (end - start) * 1000

    # Edit the embed to show the actual ping
    embed.colour = 0x4CAF50
    embed.set_author(name='Poing!', icon_url=emoji_url('\U0001f3d3'))
    embed.add_field(name='Latency', value=f'{ctx.bot.latency * 1000 :.0f} ms')
    embed.add_field(name='Classic', value=f'{ms :.0f} ms', inline=False)
    await message.edit(embed=embed)
async def stop(self):
    """Exit the help page

    After delegating to the parent's ``stop()``, a farewell embed is shown,
    but only if at least 60 seconds have passed since ``self._start_time``.

    This must be a coroutine function because it awaits the message edit
    and ``asyncio.sleep``.
    """
    super().stop()

    # Only do it for a minute, so if someone does a quick stop,
    # we'll grant them their wish of stopping early.
    end = time.monotonic()
    if end - self._start_time < 60:
        return

    final_embed = (discord.Embed(colour=self.colour, description='*Just remember...* \N{HEAVY BLACK HEART}')
                   .set_author(name='Thank you for looking at the help page!')
                   .set_image(url=CHIAKI_MOTIVATION_URL)
                   )

    # HACK: show the farewell embed, then hold for 10 seconds before
    # returning. NOTE(review): presumably this keeps the embed visible
    # before the caller cleans up the message -- confirm.
    await self._message.edit(embed=final_embed)
    return await asyncio.sleep(10)
def main(self):
    """Run the benchmark from start to finish and report the outcome.

    Stores the start time in ``self.start``, calls ``self.prepare()`` and
    ``self.compile_bench()``, then logs the elapsed time and where the
    results were written. All messages are emitted through
    ``self.logger.error``. If ``compile_bench()`` reported a failure, the
    process exits with ``EXIT_BENCH_ERROR``.
    """
    self.start = time.monotonic()
    self.prepare()
    failed = self.compile_bench()

    elapsed = datetime.timedelta(seconds=time.monotonic() - self.start)
    self.logger.error("Benchmark completed in %s" % elapsed)

    # Exactly one summary line is logged; an upload takes precedence over
    # the failure notice.
    if self.uploaded:
        summary = ("Benchmark results uploaded and written into %s"
                   % self.upload_filename)
    elif failed:
        summary = ("Benchmark failed but results written into %s"
                   % self.filename)
    else:
        summary = ("Benchmark result written into %s"
                   % self.filename)
    self.logger.error(summary)

    if failed:
        sys.exit(EXIT_BENCH_ERROR)
def acquire(self, blocking=True, timeout=None):
    """Decrement ``self._value``, waiting on ``self._cond`` while it is zero.

    Args:
        blocking: if false, do not wait; give up immediately when the
            counter is zero.
        timeout: maximum number of seconds to wait in total, or ``None``
            to wait without a limit. Only valid together with
            ``blocking=True``.

    Returns:
        True if the counter was decremented, False if the call gave up
        (non-blocking call, or the timeout elapsed).

    Raises:
        ValueError: if a timeout is given for a non-blocking call.
    """
    if not blocking and timeout is not None:
        raise ValueError("can't specify timeout for non-blocking acquire")
    rc = False
    endtime = None
    # Use the condition as a context manager so its lock is released even
    # if waiting is interrupted by an exception.
    with self._cond:
        while self._value == 0:
            if not blocking:
                break
            if timeout is not None:
                if endtime is None:
                    # First pass: fix the absolute deadline.
                    endtime = _time() + timeout
                else:
                    # Later passes: wait only for what is left of the budget.
                    timeout = endtime - _time()
                    if timeout <= 0:
                        break
            self._cond.wait(timeout)
        else:
            # The loop ended because the counter became non-zero.
            self._value -= 1
            rc = True
    return rc
def __exit__(self, *args):
    """
    Sleep for a variable amount of time (depending on when this was last
    called) so that the frame rate stays consistent. If the desired frame
    rate cannot be met (i.e. too much time has passed since the last call),
    return immediately without blocking.

    Also updates the bookkeeping: the call counter, the accumulated time
    spent inside the ``with`` body, and the timestamp of this exit.
    """
    self.called += 1
    self.total_transit_time += monotonic() - self.enter_time

    # A negative max_sleep_time means no throttling is applied at all.
    if self.max_sleep_time >= 0:
        since_last_exit = monotonic() - self.last_time
        remaining = self.max_sleep_time - since_last_exit
        if remaining > 0:
            time.sleep(remaining)

    self.last_time = monotonic()
def wait_for(self, predicate, timeout=None):
    """Block until `predicate` returns a true value or the timeout expires.

    predicate must be a callable; its result is interpreted as a boolean.
    timeout, if given, is the maximum total time to wait. The return value
    is the last value returned by the predicate (so it is false when the
    wait timed out).
    """
    deadline = None
    remaining = timeout
    while True:
        outcome = predicate()
        if outcome:
            return outcome
        if remaining is not None:
            if deadline is None:
                # First unsuccessful check: fix the absolute deadline.
                deadline = _time() + remaining
            else:
                remaining = deadline - _time()
                if remaining <= 0:
                    return outcome
        self.wait(remaining)
def run_create_kernel(_idx):
    """Time the creation (or lookup) of a 'python3' kernel.

    Args:
        _idx: unused by the body.

    Returns:
        A ``(seconds, kernel_id)`` tuple. ``kernel_id`` is ``None`` when
        ``Kernel.get_or_create`` or the ``kernel_id`` lookup raised; the
        failure is logged with its traceback.
    """
    begin = time.monotonic()
    try:
        ret = Kernel.get_or_create('python3').kernel_id
    except Exception:
        # Best effort: record the failure and report "no kernel". Only
        # ordinary errors are caught, so KeyboardInterrupt and SystemExit
        # still propagate.
        log.exception('run_create_kernel')
        ret = None
    end = time.monotonic()
    return end - begin, ret
def run_execute_code(kid):
    """Run ``sample_code`` on kernel `kid` and time it until it finishes.

    ``Kernel(kid).execute`` is called repeatedly with the same run id until
    a result with status 'finished' arrives. The collected 'stdout' console
    records are joined and printed.

    Returns the elapsed time in seconds, or None when `kid` is None.
    """
    if kid is None:
        return None

    started = time.monotonic()
    records = []
    run_id = token_hex(8)
    finished = False
    while not finished:
        result = Kernel(kid).execute(run_id, sample_code)
        records.extend(result['console'])
        finished = result['status'] == 'finished'
    # Each console record is indexed as (stream name, text).
    stdout = ''.join(rec[1] for rec in records if rec[0] == 'stdout')
    # The clock is stopped before printing so output is not included.
    elapsed = time.monotonic() - started
    print(stdout)
    return elapsed
def run_restart_kernel(kid):
    """Time a restart of kernel `kid`.

    Returns the elapsed time in seconds, or None when `kid` is None.
    """
    # Original author's note: "2nd params is currently ignored."
    if kid is None:
        return None
    started = time.monotonic()
    Kernel(kid).restart()
    return time.monotonic() - started
def run_destroy_kernel(kid):
    """Time the destruction of kernel `kid`.

    Returns the elapsed time in seconds, or None when `kid` is None.
    """
    if kid is None:
        return None
    started = time.monotonic()
    Kernel(kid).destroy()
    return time.monotonic() - started
def time(self):
    """Return the current time on this `IOLoop`'s clock.

    The result is a floating-point number measured from an unspecified
    point in the past.

    The `IOLoop`'s time function is `time.time` by default, but it may be
    configured to use e.g. `time.monotonic` instead.

    Callers of `add_timeout` that pass a number rather than a
    `datetime.timedelta` should compute that number with this method so
    that they keep working whichever time function is in use.
    """
    now = time.time()
    return now
def time(self):
    """Return the current time on this `IOLoop`'s clock.

    The result is a floating-point number measured from an unspecified
    point in the past.

    The `IOLoop`'s time function is `time.time` by default, but it may be
    configured to use e.g. `time.monotonic` instead.

    Callers of `add_timeout` that pass a number rather than a
    `datetime.timedelta` should compute that number with this method so
    that they keep working whichever time function is in use.
    """
    now = time.time()
    return now
def __enter__(self):
    """Record the monotonic time at which the block was entered.

    Returns this object so it can be bound with ``with ... as``.
    """
    entered_at = time.monotonic()
    self._tick_start = entered_at
    return self
def __exit__(self, *args):
    """Store in ``self._next_tick`` the time left until the next tick.

    The time elapsed since ``self._tick_start`` is compared with
    ``self._interval``:

    * within the interval: the remainder of the interval is stored;
    * overrun: the elapsed time is first reduced modulo the interval, so
      the stored value is the time left until the next interval boundary.

    NOTE(review): the original overrun branch computed the modulo into a
    local variable and never stored anything, leaving ``self._next_tick``
    stale or unset. Storing the remainder is the inferred intent; confirm
    against the code that reads ``_next_tick``.
    """
    elapsed = time.monotonic() - self._tick_start
    if elapsed > self._interval:
        # One or more whole intervals were missed; keep only the part
        # that spills into the current interval.
        elapsed = elapsed % self._interval
    self._next_tick = self._interval - elapsed
def hit(self, view):
    """Enqueue a lint request for `view` and return its enqueue timestamp.

    The queued item is a ``(view id, timestamp, delay)`` tuple, where the
    delay comes from ``self.get_delay(view)``.
    """
    enqueued_at = time.monotonic()
    request = (view.id(), enqueued_at, self.get_delay(view))
    self.q.put(request)
    return enqueued_at