def __call__(self, masker, dict_fact, cpu_time, io_time):
test_time = time.perf_counter()
if not hasattr(self, 'data'):
self.data = masker.transform(self.test_imgs,
confounds=self.test_confounds)
scores = np.array([dict_fact.score(data) for data in self.data])
len_imgs = np.array([data.shape[0] for data in self.data])
score = np.sum(scores * len_imgs) / np.sum(len_imgs)
self.test_time += time.perf_counter() - test_time
this_time = time.perf_counter() - self.start_time - self.test_time
self.score.append(score)
self.time.append(this_time)
self.cpu_time.append(cpu_time)
self.io_time.append(io_time)
self.iter.append(dict_fact.n_iter_)
if self.info is not None:
self.info['time'] = self.cpu_time
self.info['score'] = self.score
self.info['iter'] = self.iter
python类perf_counter()的实例源码
def sync(self):
"""Synchronize data from library and actual data in the musics folder
Raises:
ValueError: When musics_folder is not set
"""
if self.musics_folder is None or not pathlib.Path(self.musics_folder).is_dir():
raise ValueError('Invalid music folder: ' + str(self.musics_folder))
self.logger.info(f"Scanning {self.musics_folder}")
start = time.perf_counter()
self.__sync_songs()
self.__sync_artists()
self.__sync_albums()
self.__sync_playlists()
end = time.perf_counter()
self.logger.info('Scan ended in {:.3f}'.format(end - start))
def __create_model(self):
self.logger.info('Creating ListStore')
start = time.perf_counter()
model = AdapterSong.create_store()
order = AbstractPlaylist.OrderBy[self.userconfig['grid']['sort']['field']]
desc = self.userconfig['grid']['sort']['desc']
songs = self.current_playlist.collections(order, desc)
for row in songs:
model.insert_with_valuesv(-1, AdapterSong.create_col_number(), AdapterSong.create_row(row))
GObject.idle_add(lambda: self.__create_model_finished(model))
end = time.perf_counter()
self.logger.info('ListStore created in {:.3f} seconds'.format(end - start))
def ping(self, ctx):
t1 = time.perf_counter()
await ctx.trigger_typing()
t2 = time.perf_counter()
result = round((t2-t1)*1000)
if int(result) >=200:
em = discord.Embed(title="Ping : " + str(result) + "ms", description="... c'est quoi ce ping !", colour=0xFF1111)
await ctx.send(embed=em)
elif int(result) > 100 and int(result) < 200:
em = discord.Embed(title="Ping : " + str(result) + "ms", description="Ca va, ça peut aller, mais j'ai l'impression d'avoir 40 ans !", colour=0xFFA500)
await ctx.send(embed=em)
elif int(result) <= 100:
em = discord.Embed(title="Ping : " + str(result) + "ms", description="Wow c'te vitesse de réaction, je m'épate moi-même !",colour=0x11FF11)
await ctx.send(embed=em)
##INFO##
def ping(self, ctx):
"""Your average ping command."""
# Set the embed for the pre-ping
clock = random.randint(0x1F550, 0x1F567) # pick a random clock
embed = discord.Embed(colour=0xFFC107)
embed.set_author(name=random.choice(PRE_PING_REMARKS), icon_url=emoji_url(chr(clock)))
# Do the classic ping
start = time.perf_counter() # fuck time.monotonic()
message = await ctx.send(embed=embed)
end = time.perf_counter() # fuck time.monotonic()
ms = (end - start) * 1000
# Edit the embed to show the actual ping
embed.colour = 0x4CAF50
embed.set_author(name='Poing!', icon_url=emoji_url('\U0001f3d3'))
embed.add_field(name='Latency', value=f'{ctx.bot.latency * 1000 :.0f} ms')
embed.add_field(name='Classic', value=f'{ms :.0f} ms', inline=False)
await message.edit(embed=embed)
def load_questions(self, msg):
msg = msg.split(" ")
if len(msg) == 2:
_, qlist = msg
if qlist == "random":
chosen_list = randchoice(glob.glob("data/trivia/*.txt"))
self.question_list = self.load_list(chosen_list)
self.status = "new question"
self.timeout = time.perf_counter()
if self.question_list: await self.new_question()
else:
if os.path.isfile("data/trivia/" + qlist + ".txt"):
self.question_list = await self.load_list("data/trivia/" + qlist + ".txt")
self.status = "new question"
self.timeout = time.perf_counter()
if self.question_list: await self.new_question()
else:
await trivia_manager.bot.say("There is no list with that name.")
await self.stop_trivia()
else:
await trivia_manager.bot.say("trivia [list name]")
def check_answer(self, message):
if message.author.id != trivia_manager.bot.user.id:
self.timeout = time.perf_counter()
if self.current_q is not None:
for answer in self.current_q["ANSWERS"]:
if answer in message.content.lower():
self.current_q["ANSWERS"] = []
self.status = "correct answer"
self.add_point(message.author.name)
msg = "You got it {}! **+1** to you!".format(message.author.name)
try:
await trivia_manager.bot.send_typing(self.channel)
await trivia_manager.bot.send_message(message.channel, msg)
except:
await asyncio.sleep(0.5)
await trivia_manager.bot.send_message(message.channel, msg)
return True
def game():
# Controller page
time_full = time.perf_counter()
player_ip = request.remote_addr
# To redirect players who isent in the ip list and has thearfor ni team
redirect_var = True
for i in players:
if player_ip == i.ip:
print("OK")
redirect_var = False
if redirect_var:
return redirect(url_for('index'))
team_var = get_team(player_ip)
direction_var = None
if request.method == 'POST':
# Adds a request to move the robot in the multithread queue
q.put(request.form['submit'])
print(time.perf_counter() - time_full)
return render_template('game.html',team=team_var, direction=direction_var)
def test_get_clock_info(self):
clocks = ['clock', 'perf_counter', 'process_time', 'time']
if hasattr(time, 'monotonic'):
clocks.append('monotonic')
for name in clocks:
info = time.get_clock_info(name)
#self.assertIsInstance(info, dict)
self.assertIsInstance(info.implementation, str)
self.assertNotEqual(info.implementation, '')
self.assertIsInstance(info.monotonic, bool)
self.assertIsInstance(info.resolution, float)
# 0.0 < resolution <= 1.0
self.assertGreater(info.resolution, 0.0)
self.assertLessEqual(info.resolution, 1.0)
self.assertIsInstance(info.adjustable, bool)
self.assertRaises(ValueError, time.get_clock_info, 'xxx')
def write_pages(self, pages):
sums = defaultdict(float)
counts = defaultdict(int)
# group by type
by_type = defaultdict(list)
for page in pages:
by_type[(page.RENDER_PREFERRED_ORDER, page.TYPE)].append(page)
# Render collecting timing statistics
for (order, type), pgs in sorted(by_type.items(), key=lambda x:x[0][0]):
start = time.perf_counter()
for page in pgs:
contents = page.render()
for relpath, rendered in contents.items():
dst = self.output_abspath(relpath)
rendered.write(dst)
end = time.perf_counter()
sums[type] = end - start
counts[type] = len(pgs)
return sums, counts
def launch(self):
"""Ask JupyterHub to launch the image."""
await self.emit({
'phase': 'launching',
'message': 'Launching server...\n',
})
launcher = self.settings['launcher']
username = launcher.username_from_repo(self.repo)
try:
launch_starttime = time.perf_counter()
server_info = await launcher.launch(image=self.image_name, username=username)
LAUNCH_TIME.labels(status='success').observe(time.perf_counter() - launch_starttime)
except:
LAUNCH_TIME.labels(status='failure').observe(time.perf_counter() - launch_starttime)
raise
event = {
'phase': 'ready',
'message': 'server running at %s\n' % server_info['url'],
}
event.update(server_info)
await self.emit(event)
def generate_output(args):
if not args.test:
args.test = glob_with_format(args.format) # by default
tests = construct_relationship_of_files(args.test, args.format)
for name, it in sorted(tests.items()):
log.emit('')
log.info('%s', name)
if 'out' in it:
log.info('output file already exists.')
log.info('skipped.')
continue
with open(it['in']) as inf:
begin = time.perf_counter()
answer, proc = utils.exec_command(args.command, shell=args.shell, stdin=inf)
end = time.perf_counter()
log.status('time: %f sec', end - begin)
if proc.returncode != 0:
log.failure(log.red('RE') + ': return code %d', proc.returncode)
log.info('skipped.')
continue
log.emit(log.bold(answer.decode().rstrip()))
path = path_from_format(args.format, match_with_format(args.format, it['in']).groupdict()['name'], 'out')
with open(path, 'w') as fh:
fh.buffer.write(answer)
log.success('saved to: %s', path)
def _recorder(pid, stop, ival):
"""Subprocess call function to record cpu and memory."""
t = t0 = time()
process = psutil.Process(pid)
if stop is None:
while True:
m = process.memory_info()
print(psutil.cpu_percent(), ',', m[0], ',', m[1])
sleep(ival)
t = time()
else:
while t - t0 < stop:
m = process.memory_info()
print(psutil.cpu_percent(), ',', m[0], ',', m[1])
sleep(ival)
t = time()
def fit(self, path=None):
"""Fit sub-learner"""
if path is None:
path = self.path
t0 = time()
transformers = self._load_preprocess(path)
self._fit(transformers)
if self.out_array is not None:
self._predict(transformers, self.scorer is not None)
o = IndexedEstimator(estimator=self.estimator,
name=self.name_index,
index=self.index,
in_index=self.in_index,
out_index=self.out_index,
data=self.data)
save(path, self.name_index, o)
if self.verbose:
msg = "{:<30} {}".format(self.name_index, "done")
f = "stdout" if self.verbose < 10 - 3 else "stderr"
print_time(t0, msg, file=f)
def _predict(self, transformers, score_preds):
"""Sub-routine to with sublearner"""
n = self.in_array.shape[0]
# For training, use ytemp to score predictions
# During test time, ytemp is None
xtemp, ytemp = slice_array(self.in_array, self.targets, self.out_index)
t0 = time()
if transformers:
xtemp, ytemp = transformers.transform(xtemp, ytemp)
predictions = getattr(self.estimator, self.attr)(xtemp)
self.pred_time_ = time() - t0
# Assign predictions to matrix
assign_predictions(self.out_array, predictions,
self.out_index, self.output_columns, n)
# Score predictions if applicable
if score_preds:
self.score_ = score_predictions(
ytemp, predictions, self.scorer, self.name_index, self.name)
def fit(self, path=None):
"""Fit transformers"""
path = path if path else self.path
t0 = time()
xtemp, ytemp = slice_array(
self.in_array, self.targets, self.in_index)
t0_f = time()
self.estimator.fit(xtemp, ytemp)
self.transform_time_ = time() - t0_f
if self.out_array is not None:
self._transform()
o = IndexedEstimator(estimator=self.estimator,
name=self.name_index,
index=self.index,
in_index=self.in_index,
out_index=self.out_index,
data=self.data)
save(path, self.name_index, o)
if self.verbose:
f = "stdout" if self.verbose < 10 else "stderr"
msg = "{:<30} {}".format(self.name_index, "done")
print_time(t0, msg, file=f)
def fit(self, path=None):
"""Evaluate sub-learner"""
path = path if path else self.path
if self.scorer is None:
raise ValueError("Cannot generate CV-scores without a scorer")
t0 = time()
transformers = self._load_preprocess(path)
self._fit(transformers)
self._predict(transformers)
o = IndexedEstimator(estimator=self.estimator,
name=self.name_index,
index=self.index,
in_index=self.in_index,
out_index=self.out_index,
data=self.data)
save(path, self.name_index, o)
if self.verbose:
f = "stdout" if self.verbose else "stderr"
msg = "{:<30} {}".format(self.name_index, "done")
print_time(t0, msg, file=f)
def _score_preds(self, transformers, index):
# Train scores
xtemp, ytemp = slice_array(self.in_array, self.targets, index)
if transformers:
xtemp, ytemp = transformers.transform(xtemp, ytemp)
t0 = time()
if self.error_score is not None:
try:
scores = self.scorer(self.estimator, xtemp, ytemp)
except Exception as exc: # pylint: disable=broad-except
warnings.warn(
"Scoring failed. Setting error score %r."
"Details:\n%r" % (self.error_score, exc),
FitFailedWarning)
scores = self.error_score
else:
scores = self.scorer(self.estimator, xtemp, ytemp)
pred_time = time() - t0
return scores, pred_time
def answer_with_doc(self, question: str, doc: str) -> Tuple[np.ndarray, List[WebParagraph]]:
""" Answer a question using the given text as a document """
self.log.info("Answering question \"%s\" with a given document" % question)
# Tokenize
question = self.tokenizer.tokenize_paragraph_flat(question)
context = [self.tokenizer.tokenize_with_inverse(x, False) for x in self._split_regex.split(doc)]
# Split into super-paragraphs
context = self._split_document(context, "User", None)
# Select top paragraphs
context = self.paragraph_selector.prune(question, context)
if len(context) == 0:
raise ValueError("Unable to process documents")
# Select the top answer span
t0 = time.perf_counter()
span_scores = self._get_span_scores(question, context)
self.log.info("Computing answer spans took %.5f seconds" % (time.perf_counter() - t0))
return span_scores
def loadQuestions(self, msg):
msg = msg.split(" ")
if len(msg) == 2:
_, qlist = msg
if qlist == "random":
chosenList = choice(glob.glob("trivia/*.txt"))
self.questionList = self.loadList(chosenList)
self.status = "new question"
self.timeout = time.perf_counter()
if self.questionList: await self.newQuestion()
else:
if os.path.isfile("trivia/" + qlist + ".txt"):
self.questionList = self.loadList("trivia/" + qlist + ".txt")
self.status = "new question"
self.timeout = time.perf_counter()
if self.questionList: await self.newQuestion()
else:
await client.send_message(self.channel, "`There is no list with that name.`")
await self.stopTrivia()
else:
await client.send_message(self.channel, "`" + settings["PREFIX"] + "trivia [list name]`")