def get_average_problems_solved_per_user(eligible=True, scoring=True, user_breakdown=None):
    if user_breakdown is None:
        user_breakdown = get_team_member_solve_stats(eligible)
    solves = []
    for tid, breakdown in user_breakdown.items():
        for uid, ubreakdown in breakdown.items():
            if ubreakdown is None:
                solved = 0
            else:
                if 'correct' in ubreakdown:
                    solved = ubreakdown['correct']
                else:
                    solved = 0
            if solved > 0 or not scoring:
                solves += [solved]
    return (statistics.mean(solves),
            statistics.stdev(solves))
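# A hedged usage sketch with a hand-built user_breakdown mapping (team id -> {user id -> stats}),
# since get_team_member_solve_stats is defined elsewhere in this code base; the ids and counts
# below are invented for illustration.
fake_breakdown = {
    't1': {'u1': {'correct': 3}, 'u2': {'correct': 0}, 'u3': None},
    't2': {'u4': {'correct': 5}, 'u5': {'correct': 2}},
}
print(get_average_problems_solved_per_user(user_breakdown=fake_breakdown))  # mean and stdev of [3, 5, 2]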
def get_team_participation_percentage(eligible=True, user_breakdown=None):
    if user_breakdown is None:
        user_breakdown = get_team_member_solve_stats(eligible)
    team_size_any = defaultdict(list)
    team_size_correct = defaultdict(list)
    for tid, breakdown in user_breakdown.items():
        count_any = 0
        count_correct = 0
        for uid, work in breakdown.items():
            if work is not None:
                count_any += 1
                if work['correct'] > 0:
                    count_correct += 1
        team_size_any[len(breakdown.keys())].append(count_any)
        team_size_correct[len(breakdown.keys())].append(count_correct)
    return {x: statistics.mean(y) for x, y in team_size_any.items()}, \
           {x: statistics.mean(y) for x, y in team_size_correct.items()}
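# With the same hypothetical fake_breakdown from the sketch above, this helper groups teams by
# size and averages how many members submitted anything versus solved at least one problem.
print(get_team_participation_percentage(user_breakdown=fake_breakdown))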
def summary(self, verbose=False):
    times = set()
    for r in self.results:
        if not r.finish:
            r.capture()
        if verbose:
            print(' {}'.format(r.str(self.dp)), file=self.file)
        times.add(r.elapsed())
    if times:
        print(_SUMMARY_TEMPLATE.format(
            count=len(times),
            mean=mean(times),
            stddev=stdev(times) if len(times) > 1 else 0,
            min=min(times),
            max=max(times),
            dp=self.dp,
        ), file=self.file, flush=True)
    else:
        raise RuntimeError('timer not started')
    return times
def get_arguments():
    parser = argparse.ArgumentParser(description='FAST5 to FASTQ',
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('dir', type=str,
                        help='directory of FAST5 reads to extract (will be searched recursively)')
    parser.add_argument('--min_length', type=int, default=0,
                        help='Exclude reads shorter than this length (in bp)')
    parser.add_argument('--min_mean_qual', type=float, default=0.0,
                        help='Exclude reads with a mean qscore less than this value')
    parser.add_argument('--min_qual_window', type=float, default=0.0,
                        help='Exclude reads where their mean qscore in a sliding window drops '
                             'below this value')
    parser.add_argument('--window_size', type=int, default=50,
                        help='The size of the sliding window used for --min_qual_window')
    parser.add_argument('--target_bases', type=int, default=None,
                        help='If set, exclude the worst reads (as judged by their minimum qscore '
                             'in a sliding window) such that only this many bases remain')
    args = parser.parse_args()
    args.dir = os.path.abspath(args.dir)
    return args
def get_min_window_qscore(quals, window_size):
    """
    Returns the minimum mean qscore over a sliding window.
    """
    quals = [q - 33 for q in quals]  # convert Phred+33 ASCII codes to numbers
    current_window_qscore = statistics.mean(quals[:window_size])
    shift_count = len(quals) - window_size
    if shift_count < 1:
        return current_window_qscore
    min_window_qscore = current_window_qscore
    for i in range(shift_count):
        leaving_window = quals[i]
        entering_window = quals[i + window_size]
        current_window_qscore += (entering_window - leaving_window) / window_size
        if current_window_qscore < min_window_qscore:
            min_window_qscore = current_window_qscore
    return min_window_qscore
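# A hedged usage sketch for get_min_window_qscore, assuming the qualities come straight from a
# FASTQ quality string (Phred+33 encoded byte values, which is why the function subtracts 33).
# The quality string and window size below are invented for illustration.
example_qual_string = 'IIIIIHHHGGGFFF###FFFGGGHHHIIII'
example_quals = [ord(c) for c in example_qual_string]
print(get_min_window_qscore(example_quals, window_size=5))  # lowest mean qscore over any 5-base window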
def MEAN(df, n, price='Close'):
    """
    Arithmetic mean (average) of data
    """
    mean_list = []
    i = 0
    while i < len(df[price]):
        if i + 1 < n:
            mean = float('NaN')
        else:
            start = i + 1 - n
            end = i + 1
            mean = statistics.mean(df[price][start:end])
        mean_list.append(mean)
        i += 1
    return mean_list
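# A small usage sketch for MEAN, assuming df is a pandas DataFrame with a 'Close' column;
# the prices below are invented for illustration.
import pandas as pd
example_df = pd.DataFrame({'Close': [10.0, 11.0, 12.0, 13.0, 14.0]})
print(MEAN(example_df, n=3))  # first n-1 entries are NaN, then each value is the mean of the last 3 closes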
def HARMONIC_MEAN(df, n, price='Close'):
    """
    Harmonic mean of data
    """
    harmonic_mean_list = []
    i = 0
    while i < len(df[price]):
        if i + 1 < n:
            harmonic_mean = float('NaN')
        else:
            start = i + 1 - n
            end = i + 1
            harmonic_mean = statistics.harmonic_mean(df[price][start:end])
        harmonic_mean_list.append(harmonic_mean)
        i += 1
    return harmonic_mean_list
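# HARMONIC_MEAN follows the same rolling-window pattern as MEAN above; note that
# statistics.harmonic_mean exists only in Python 3.6+ and expects non-negative data,
# which suits price series. Reusing the hypothetical example_df from the MEAN sketch:
print(HARMONIC_MEAN(example_df, n=3))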
def printWinSizeSummary(neighborTL):
    '''Given a list indexed by gene whose values are tuples of neighboring genes, calculate the
    size in bp of each gene's window and print the median, mean, and standard deviation.'''
    winL = []
    for neighborT in neighborTL:
        winL.append(calcWinSize(neighborT, geneNames, geneInfoD))
    median = statistics.median(winL)
    mean = statistics.mean(winL)
    stdev = statistics.stdev(winL)
    print("  median", round(median))
    print("  mean", round(mean))
    print("  stdev", round(stdev))
## mods for core stuff (requires changing functions, so we move them here)
def evaluate_and_update_max_score(self, t, episodes):
    eval_stats = eval_performance(
        self.env, self.agent, self.n_runs,
        max_episode_len=self.max_episode_len, explorer=self.explorer,
        logger=self.logger)
    elapsed = time.time() - self.start_time
    custom_values = tuple(tup[1] for tup in self.agent.get_statistics())
    mean = eval_stats['mean']
    values = (t,
              episodes,
              elapsed,
              mean,
              eval_stats['median'],
              eval_stats['stdev'],
              eval_stats['max'],
              eval_stats['min']) + custom_values
    record_stats(self.outdir, values)
    if mean > self.max_score:
        update_best_model(self.agent, self.outdir, t, self.max_score, mean,
                          logger=self.logger)
        self.max_score = mean
    return mean
def evaluate_and_update_max_score(self, t, episodes, env, agent):
    eval_stats = eval_performance(
        env, agent, self.n_runs,
        max_episode_len=self.max_episode_len, explorer=self.explorer,
        logger=self.logger)
    elapsed = time.time() - self.start_time
    custom_values = tuple(tup[1] for tup in agent.get_statistics())
    mean = eval_stats['mean']
    values = (t,
              episodes,
              elapsed,
              mean,
              eval_stats['median'],
              eval_stats['stdev'],
              eval_stats['max'],
              eval_stats['min']) + custom_values
    record_stats(self.outdir, values)
    with self._max_score.get_lock():
        if mean > self._max_score.value:
            update_best_model(
                agent, self.outdir, t, self._max_score.value, mean,
                logger=self.logger)
            self._max_score.value = mean
    return mean
def calculate_IDL(self, data_lst, Concentration, debug_on):
    DegreesOfFreedom = len(data_lst) - 1
    if DegreesOfFreedom < 1:
        return 'PoorSensitivity'
    Ta = self.T_Table_99Confidence.get(DegreesOfFreedom, "TooMany")
    if debug_on:
        print('DegreesOfFreedom: ', DegreesOfFreedom)
        print('Concentration: ', Concentration)
        print('data_lst: ', data_lst)
    if Ta == "TooMany":
        raise Exception('There are more than 21 data values for the IDL calculation and therefore '
                        'not enough degrees of freedom in the T_Table_99Confidence dictionary.')
    Average = statistics.mean(data_lst)
    StandardDeviation = statistics.stdev(data_lst)
    RSD = (StandardDeviation / Average) * 100
    return round((Ta * RSD * Concentration) / 100, 2)
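# A standalone, hedged sketch of the same IDL arithmetic outside the class. The replicate
# measurements and spike concentration are invented; 2.998 is used as an illustrative one-tailed
# 99% Student's t value for 7 degrees of freedom (the class looks this up in T_Table_99Confidence).
import statistics
replicates = [0.101, 0.097, 0.103, 0.099, 0.100, 0.102, 0.098, 0.101]  # 8 spiked replicates
concentration = 0.1                                                    # spike level
t_99 = 2.998                                                           # t for len(replicates) - 1 = 7 df
rsd = (statistics.stdev(replicates) / statistics.mean(replicates)) * 100
print('IDL:', round((t_99 * rsd * concentration) / 100, 2))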
def runPutTest(testDataPath, testDataRangeStart, testDataRangeEnd, f):
    log.debug('running put tests...')
    timeStart = time.perf_counter()
    times = [time.perf_counter()]
    for i in range(testDataRangeStart, testDataRangeEnd):
        print(i)
        thisPath = '%s/%i' % (testDataPath, i)
        o = loadTestData(thisPath)
        f.putObject(o, str(i))
        times.append(time.perf_counter())
    timeEnd = time.perf_counter()
    log.warning('RESULT (PUT): total test runtime: %s seconds, mean per object: %s' % (
        timeEnd - timeStart, ((timeEnd - timeStart) / testDataRangeEnd)))
    log.critical('RESULT (PUT): median result: %s ' % statistics.median(calculateTimeDeltas(times)))
    log.critical('RESULT (PUT): standard deviation result: %s ' % statistics.stdev(calculateTimeDeltas(times)))
    log.critical('RESULT (PUT): mean result: %s ' % statistics.mean(calculateTimeDeltas(times)))
    # log.critical('RESULT (PUT): individual times: %s ' % (calculateTimeDeltas(times)))
def runGetTest(testDataPath, testDataRangeStart, testDataRangeEnd, f):
    log.debug('running get tests...')
    timeStart = time.perf_counter()
    times = [time.perf_counter()]
    for i in range(testDataRangeStart, testDataRangeEnd):
        thisPath = '%s/%i' % (testDataPath, i)
        o = f.getObject(str(i))
        saveTestData(o, thisPath)
        times.append(time.perf_counter())
    timeEnd = time.perf_counter()
    log.critical('RESULT (GET): total test runtime: %s seconds, mean per object: %s' % (
        timeEnd - timeStart, ((timeEnd - timeStart) / testDataRangeEnd)))
    log.critical('RESULT (GET): median result: %s ' % statistics.median(calculateTimeDeltas(times)))
    log.critical('RESULT (GET): standard deviation result: %s ' % statistics.stdev(calculateTimeDeltas(times)))
    log.critical('RESULT (GET): mean result: %s ' % statistics.mean(calculateTimeDeltas(times)))
    # log.critical('RESULT (GET): individual times: %s ' % (calculateTimeDeltas(times)))
def runDeleteTest(testDataRangeStart, testDataRangeEnd, f):
    log.debug('running delete tests...')
    timeStart = time.perf_counter()
    times = [time.perf_counter()]
    for i in range(testDataRangeStart, testDataRangeEnd):
        f.deleteObject(str(i))
        times.append(time.perf_counter())
    timeEnd = time.perf_counter()
    log.critical('RESULT (DELETE): total test runtime: %s seconds, mean per object: %s' % (
        timeEnd - timeStart, ((timeEnd - timeStart) / testDataRangeEnd)))
    log.critical('RESULT (DELETE): median result: %s ' % statistics.median(calculateTimeDeltas(times)))
    log.critical('RESULT (DELETE): standard deviation result: %s ' % statistics.stdev(calculateTimeDeltas(times)))
    log.critical('RESULT (DELETE): mean result: %s ' % statistics.mean(calculateTimeDeltas(times)))
    # log.critical('RESULT (DELETE): individual times: %s ' % (calculateTimeDeltas(times)))
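# calculateTimeDeltas is referenced above but not shown in this listing. A plausible sketch,
# assuming it converts the list of perf_counter() readings into per-object durations:
def calculateTimeDeltas(times):
    return [later - earlier for earlier, later in zip(times, times[1:])]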
###############################################################################
###############################################################################
def eval_performance(rom, p_func, n_runs):
    assert n_runs > 1, 'Computing stdev requires at least two runs'
    scores = []
    for i in range(n_runs):
        env = ale.ALE(rom, treat_life_lost_as_terminal=False)
        test_r = 0
        while not env.is_terminal:
            s = chainer.Variable(np.expand_dims(dqn_phi(env.state), 0))
            pout = p_func(s)
            a = pout.action_indices[0]
            test_r += env.receive_action(a)
        scores.append(test_r)
        print('test_{}:'.format(i), test_r)
    mean = statistics.mean(scores)
    median = statistics.median(scores)
    stdev = statistics.stdev(scores)
    return mean, median, stdev
def eval_performance(process_idx, make_env, model, phi, n_runs):
    assert n_runs > 1, 'Computing stdev requires at least two runs'
    scores = []
    for i in range(n_runs):
        model.reset_state()
        env = make_env(process_idx, test=True)
        obs = env.reset()
        done = False
        test_r = 0
        while not done:
            s = chainer.Variable(np.expand_dims(phi(obs), 0))
            pout, _ = model.pi_and_v(s)
            a = pout.action_indices[0]
            obs, r, done, info = env.step(a)
            test_r += r
        scores.append(test_r)
        print('test_{}:'.format(i), test_r)
    mean = statistics.mean(scores)
    median = statistics.median(scores)
    stdev = statistics.stdev(scores)
    return mean, median, stdev
def calculate_brightness_for_image(image):
    pix = image.load()
    width, height = image.size
    width = float(width)
    height = float(height)
    data = []
    for y in range(0, int(height)):
        for x in range(0, int(width)):
            # Only sample pixels outside the head and body silhouettes.
            if (y < (1.0 - BODY_H - HEAD_H) * height) or\
               (y > (1.0 - BODY_H - HEAD_H) * height and
                y < (1.0 - HEAD_H) * height and
                (x < (1.0 - HEAD_W) / 2.0 * width or
                 x > (1.0 + HEAD_W) / 2.0 * width)) or\
               (y > (1.0 - BODY_H) * height and
                (x < (1.0 - BODY_W) / 2.0 * width or
                 x > (1.0 + BODY_W) / 2.0 * width)):
                r, g, b = pix[x, y]
                brightness = int(calculate_brightness_for_pixel(
                    r, g, b) / 255.0 * 100.0)
                data.append(ponderate(brightness))
    return int(statistics.mean(data))
def get_channel(self, previous_value, new_value):
    """ Prepares signal value depending on the previous one and algorithm. """
    if self.stereo_algorithm == STEREO_ALGORITHM_NEW:
        channel_value = new_value
    elif self.stereo_algorithm == STEREO_ALGORITHM_LOGARITHM:
        if previous_value == 0.0:
            channel_value = 0.0
        else:
            channel_value = 20 * math.log10(new_value / previous_value)
            if channel_value < -20:
                channel_value = -20
            if channel_value > 3:
                channel_value = 3
            channel_value = (channel_value + 20) * (100 / 23)
    elif self.stereo_algorithm == STEREO_ALGORITHM_AVERAGE:
        channel_value = statistics.mean([previous_value, new_value])
    return channel_value
def _post_processing_status(self) -> TargetStatuses:
    """
    Return the status of the target, or what it will be when processing is
    finished.

    The status depends on the standard deviation of the color bands.
    How VWS determines this is unknown, but it relates to how suitable the
    target is for detection.
    """
    image = Image.open(self._image)
    image_stat = ImageStat.Stat(image)
    average_std_dev = statistics.mean(image_stat.stddev)
    if average_std_dev > 5:
        return TargetStatuses.SUCCESS
    return TargetStatuses.FAILED
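# A standalone, hedged sketch of the same check with Pillow; the file name is hypothetical.
# ImageStat.Stat(...).stddev is a per-band list, so its mean is the "average standard deviation
# of the color bands" that the docstring describes.
from PIL import Image, ImageStat
import statistics
with Image.open('example_target.png') as example_image:  # hypothetical file
    average_std_dev = statistics.mean(ImageStat.Stat(example_image).stddev)
print('detail-rich enough to pass?', average_std_dev > 5)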
def ejecutar(función):
    print(función)
    cronometrajes = []
    stdout = sys.stdout
    for i in range(100):
        sys.stdout = None
        horaInicio = time.time()
        función()
        segundos = time.time() - horaInicio
        sys.stdout = stdout
        cronometrajes.append(segundos)
        promedio = statistics.mean(cronometrajes)
        if i < 10 or i % 10 == 9:
            print("{} {:3.2f} {:3.2f}".format(
                1 + i, promedio,
                statistics.stdev(cronometrajes,
                                 promedio) if i > 1 else 0))
def math_stats_calculations(point_map):
    point_array = []
    for team in team_array:
        point_array.append(point_map[team])
    # Calculates mean
    mean_val = str(round(statistics.mean(point_array), 2))
    # Calculates median
    median_val = str(round(statistics.median(point_array), 2))
    # Calculates standard deviation
    stdev_val = str(round(statistics.stdev(point_array), 2))
    # Calculates variance
    var_val = str(round(statistics.variance(point_array), 2))
    return (mean_val, median_val, stdev_val, var_val)
# Calls my function
def encode_benchmark(self, bench):
    data = {}
    data['environment'] = self.conf.environment
    data['project'] = self.conf.project
    data['branch'] = self.branch
    data['benchmark'] = bench.get_name()
    # Other benchmark metadata:
    # - description
    # - units="seconds", units_title="Time", lessisbetter=True
    data['commitid'] = self.revision
    data['revision_date'] = self.commit_date.isoformat()
    data['executable'] = self.conf.executable
    data['result_value'] = bench.mean()
    # Other result metadata: result_date
    if bench.get_nvalue() == 1:
        data['std_dev'] = 0
    else:
        data['std_dev'] = bench.stdev()
    values = bench.get_values()
    data['min'] = min(values)
    data['max'] = max(values)
    # Other stats metadata: q1, q3
    return data
def pooled_sample_variance(sample1, sample2):
    """Find the pooled sample variance for two samples.

    Args:
        sample1: one sample.
        sample2: the other sample.

    Returns:
        Pooled sample variance, as a float.
    """
    deg_freedom = len(sample1) + len(sample2) - 2
    mean1 = statistics.mean(sample1)
    squares1 = ((x - mean1) ** 2 for x in sample1)
    mean2 = statistics.mean(sample2)
    squares2 = ((x - mean2) ** 2 for x in sample2)
    return (math.fsum(squares1) + math.fsum(squares2)) / float(deg_freedom)
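# A quick sanity check of pooled_sample_variance on two invented equal-sized samples: the pooled
# value should land between the two individual sample variances.
import statistics
sample_a = [10.0, 12.0, 11.0, 13.0]
sample_b = [20.0, 22.0, 19.0, 23.0]
print(pooled_sample_variance(sample_a, sample_b),
      statistics.variance(sample_a), statistics.variance(sample_b))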
def __str__(self):
    if self.base.get_nvalue() > 1:
        values = (self.base.mean(), self.base.stdev(),
                  self.changed.mean(), self.changed.stdev())
        text = "%s +- %s -> %s +- %s" % self.base.format_values(values)
        msg = significant_msg(self.base, self.changed)
        delta_avg = quantity_delta(self.base, self.changed)
        return ("Mean +- std dev: %s: %s\n%s"
                % (text, delta_avg, msg))
    else:
        format_value = self.base.format_value
        base = self.base.mean()
        changed = self.changed.mean()
        delta_avg = quantity_delta(self.base, self.changed)
        return ("%s -> %s: %s"
                % (format_value(base),
                   format_value(changed),
                   delta_avg))
def quantity_delta(base, changed):
    old = base.mean()
    new = changed.mean()
    is_time = (base.get_unit() == 'second')
    if old == 0 or new == 0:
        return "incomparable (one result was zero)"
    if new > old:
        if is_time:
            return "%.2fx slower" % (new / old)
        else:
            return "%.2fx larger" % (new / old)
    elif new < old:
        if is_time:
            return "%.2fx faster" % (old / new)
        else:
            return "%.2fx smaller" % (old / new)
    else:
        return "no change"