def test_iter_list_same(self):
# Test that iter data and list data give the same result.
# This is an explicit test that iterators and lists are treated the
# same; justification for this test over and above the similar test
# in UnivariateCommonMixin is that an earlier design had variance and
# friends swap between one- and two-pass algorithms, which would
# sometimes give different results.
data = [random.uniform(-3, 8) for _ in range(1000)]
expected = self.func(data)
self.assertEqual(self.func(iter(data)), expected)
# --- Python variance() usage examples (section heading from source listing) ---
def test_exact_uniform(self):
# Test the variance against an exact result for uniform data.
data = list(range(10000))
random.shuffle(data)
expected = (10000**2 - 1)/12 # Exact value.
self.assertEqual(self.func(data), expected)
def test_ints(self):
# Test population variance with int data.
data = [4, 7, 13, 16]
exact = 22.5
self.assertEqual(self.func(data), exact)
def test_decimals(self):
# Test population variance with Decimal data.
D = Decimal
data = [D("12.1"), D("12.2"), D("12.5"), D("12.9")]
exact = D('0.096875')
result = self.func(data)
self.assertEqual(result, exact)
self.assertIsInstance(result, Decimal)
def test_ints(self):
# Test sample variance with int data.
data = [4, 7, 13, 16]
exact = 30
self.assertEqual(self.func(data), exact)
def test_fractions(self):
# Test sample variance with Fraction data.
F = Fraction
data = [F(1, 4), F(1, 4), F(3, 4), F(7, 4)]
exact = F(1, 2)
result = self.func(data)
self.assertEqual(result, exact)
self.assertIsInstance(result, Fraction)
def test_decimals(self):
# Test sample variance with Decimal data.
D = Decimal
data = [D(2), D(2), D(7), D(9)]
exact = 4*D('9.5')/D(3)
result = self.func(data)
self.assertEqual(result, exact)
self.assertIsInstance(result, Decimal)
def test_compare_to_variance(self):
# Test that stdev is, in fact, the square root of variance.
data = [random.uniform(-17, 24) for _ in range(1000)]
expected = math.sqrt(statistics.pvariance(data))
self.assertEqual(self.func(data), expected)
def pvariance(text):
    """
    Finds the population variance of a space-separated list of numbers.
    Example::
    /pvariance 33 54 43 65 43 62
    """
    numbers = parse_numeric_list(text)
    result = statistics.pvariance(numbers)
    return format_output(result)
def variance(text):
    """
    Finds the variance of a space-separated list of numbers.
    Example::
    /variance 33 54 43 65 43 62
    """
    numbers = parse_numeric_list(text)
    result = statistics.variance(numbers)
    return format_output(result)
def setup():
    """Register every statistics command with the bot."""
    handlers = (mean, median, median_low, median_high, median_grouped,
                mode, pstdev, pvariance, stdev, variance)
    for handler in handlers:
        commands.add(handler)
def setup_method(self, method):
    """Set up a test hass instance and the expected statistics values."""
    self.hass = get_test_home_assistant()
    self.values = [17, 20, 15.2, 5, 3.8, 9.2, 6.7, 14, 6]
    # Pre-compute every statistic the sensor under test should report.
    self.count = len(self.values)
    self.min = min(self.values)
    self.max = max(self.values)
    self.total = sum(self.values)
    self.mean = round(self.total / self.count, 2)
    self.median = round(statistics.median(self.values), 2)
    self.deviation = round(statistics.stdev(self.values), 2)
    self.variance = round(statistics.variance(self.values), 2)
def test_sensor_source(self):
    """Test if source is a sensor."""
    config = {
        'sensor': {
            'platform': 'statistics',
            'name': 'test',
            'entity_id': 'sensor.test_monitored',
        }
    }
    assert setup_component(self.hass, 'sensor', config)
    # Feed every sample into the monitored entity and let hass settle.
    for sample in self.values:
        self.hass.states.set('sensor.test_monitored', sample,
                             {ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS})
        self.hass.block_till_done()
    state = self.hass.states.get('sensor.test_mean')
    attrs = state.attributes
    # The state itself is the mean; everything else is an attribute.
    self.assertEqual(str(self.mean), state.state)
    self.assertEqual(self.min, attrs.get('min_value'))
    self.assertEqual(self.max, attrs.get('max_value'))
    self.assertEqual(self.variance, attrs.get('variance'))
    self.assertEqual(self.median, attrs.get('median'))
    self.assertEqual(self.deviation, attrs.get('standard_deviation'))
    self.assertEqual(self.mean, attrs.get('mean'))
    self.assertEqual(self.count, attrs.get('count'))
    self.assertEqual(self.total, attrs.get('total'))
    self.assertEqual('°C', attrs.get('unit_of_measurement'))
def __init__(self, hass, entity_id, name, sampling_size):
    """Initialize the Statistics sensor.

    :param hass: Home Assistant instance this sensor runs in.
    :param entity_id: entity id of the source sensor to track.
    :param name: base name; the tracked attribute name is appended.
    :param sampling_size: maximum number of samples kept; 0 = unlimited.
    """
    self._hass = hass
    self._entity_id = entity_id
    # Direct boolean expression instead of `True if ... else False`.
    # binary_sensor sources have no numeric state, so only a count
    # is reported for them; numeric sources report the mean.
    self.is_binary = self._entity_id.split('.')[0] == 'binary_sensor'
    if not self.is_binary:
        self._name = '{} {}'.format(name, ATTR_MEAN)
    else:
        self._name = '{} {}'.format(name, ATTR_COUNT)
    self._sampling_size = sampling_size
    self._unit_of_measurement = None
    if self._sampling_size == 0:
        self.states = deque()  # unbounded history
    else:
        self.states = deque(maxlen=self._sampling_size)
    self.median = self.mean = self.variance = self.stdev = 0
    self.min = self.max = self.total = self.count = 0

    @callback
    # pylint: disable=invalid-name
    def async_stats_sensor_state_listener(entity, old_state, new_state):
        """Called when the sensor changes state."""
        self._unit_of_measurement = new_state.attributes.get(
            ATTR_UNIT_OF_MEASUREMENT)
        # Non-numeric states (e.g. 'on'/'off') are not stored, but every
        # state change counts; the increment was duplicated in both the
        # try and except branches before, so it is hoisted out here.
        try:
            self.states.append(float(new_state.state))
        except ValueError:
            pass
        self.count = self.count + 1
        hass.async_add_job(self.async_update_ha_state, True)

    async_track_state_change(
        hass, entity_id, async_stats_sensor_state_listener)
def state_attributes(self):
    """Return the state attributes of the sensor.

    Numeric sources expose the full set of computed statistics; for a
    binary source nothing is returned here (implicit ``None`` — the
    visible code has no else branch).
    """
    if not self.is_binary:
        return {
            ATTR_MEAN: self.mean,
            ATTR_COUNT: self.count,
            ATTR_MAX_VALUE: self.max,
            ATTR_MEDIAN: self.median,
            ATTR_MIN_VALUE: self.min,
            # Bug fix: `is 0` compared object identity, not value, and
            # only worked via CPython's small-int caching; use `== 0`.
            ATTR_SAMPLING_SIZE: 'unlimited' if self._sampling_size ==
            0 else self._sampling_size,
            ATTR_STANDARD_DEVIATION: self.stdev,
            ATTR_TOTAL: self.total,
            ATTR_VARIANCE: self.variance,
        }
def attachment_marker(stream_id: uuid.UUID, CC_obj: CerebralCortex, config: dict, start_time=None, end_time=None):
    """
    Label sensor data as sensor-on-body, sensor-off-body, or improper-attachment.
    All the labeled data (st, et, label) with its metadata are then stored in a datastore

    :param stream_id: UUID
    :param CC_obj: CerebralCortex object
    :param config: Data diagnostics configurations
    :param start_time: optional lower bound passed to the datastore query
    :param end_time: optional upper bound passed to the datastore query
    :raises ValueError: if the stream is neither the configured ECG nor RIP stream
    """
    stream = CC_obj.get_datastream(stream_id, data_type=DataSet.COMPLETE, start_time=start_time, end_time=end_time)
    results = OrderedDict()
    threshold_val = None
    stream_name = stream._name
    # Pick the on-body variance threshold and labels for the sensor type.
    if stream_name == config["stream_names"]["autosense_ecg"]:
        threshold_val = config['attachment_marker']['ecg_on_body']
        label_on = config['labels']['ecg_on_body']
        label_off = config['labels']['ecg_off_body']
    elif stream_name == config["stream_names"]["autosense_rip"]:
        threshold_val = config['attachment_marker']['rip_on_body']
        label_on = config['labels']['rip_on_body']
        label_off = config['labels']['rip_off_body']
    else:
        raise ValueError("Incorrect sensor type.")
    # Split the stream into fixed-size, non-overlapping windows
    # (third argument False — presumably "no overlap"; TODO confirm).
    windowed_data = window(stream.data, config['general']['window_size'], False)
    for key, data in windowed_data.items():
        # remove outliers from a window data
        normal_values = outlier_detection(data)
        # Low variance after outlier removal is taken to mean the sensor
        # is off-body (a flat signal); otherwise it is labeled on-body.
        if stat.variance(normal_values) < threshold_val:
            results[key] = label_off
        else:
            results[key] = label_on
    # Collapse runs of consecutive windows with the same label before storing.
    merged_windows = merge_consective_windows(results)
    input_streams = [{"id": str(stream_id), "name": stream_name}]
    store(input_streams, merged_windows, CC_obj, config, config["algo_names"]["attachment_marker"])
# TODO: gsr_response method is not being used. Need to make sure whether GSR values actually represent GSR data.