def test_summary_max_shortest_2(self):
c = ChainConsumer()
c.add_chain(self.data_skew)
summary_area = 0.6827
c.configure(statistics="max_shortest", bins=1.0, summary_area=summary_area)
summary = c.analysis.get_summary()['0']
xs = np.linspace(-1, 5, 1000)
pdf = skewnorm.pdf(xs, 5, 1, 1.5)
cdf = skewnorm.cdf(xs, 5, 1, 1.5)
x2 = interp1d(cdf, xs, bounds_error=False, fill_value=np.inf)(cdf + summary_area)
dist = x2 - xs
ind = np.argmin(dist)
x0 = xs[ind]
x2 = x2[ind]
xmax = xs[pdf.argmax()]
assert np.isclose(xmax, summary[1], atol=0.05)
assert np.isclose(x0, summary[0], atol=0.05)
assert np.isclose(x2, summary[2], atol=0.05)
python类isclose()的实例源码
def test_summary_max_shortest_3(self):
c = ChainConsumer()
c.add_chain(self.data_skew)
summary_area = 0.95
c.configure(statistics="max_shortest", bins=1.0, summary_area=summary_area)
summary = c.analysis.get_summary()['0']
xs = np.linspace(-1, 5, 1000)
pdf = skewnorm.pdf(xs, 5, 1, 1.5)
cdf = skewnorm.cdf(xs, 5, 1, 1.5)
x2 = interp1d(cdf, xs, bounds_error=False, fill_value=np.inf)(cdf + summary_area)
dist = x2 - xs
ind = np.argmin(dist)
x0 = xs[ind]
x2 = x2[ind]
xmax = xs[pdf.argmax()]
assert np.isclose(xmax, summary[1], atol=0.05)
assert np.isclose(x0, summary[0], atol=0.05)
assert np.isclose(x2, summary[2], atol=0.05)
def test_summary_max_central_2(self):
c = ChainConsumer()
c.add_chain(self.data_skew)
summary_area = 0.6827
c.configure(statistics="max_central", bins=1.0, summary_area=summary_area)
summary = c.analysis.get_summary()['0']
xs = np.linspace(-1, 5, 1000)
pdf = skewnorm.pdf(xs, 5, 1, 1.5)
cdf = skewnorm.cdf(xs, 5, 1, 1.5)
xval = interp1d(cdf, xs)([0.5 - 0.5 * summary_area, 0.5 + 0.5 * summary_area])
xmax = xs[pdf.argmax()]
assert np.isclose(xmax, summary[1], atol=0.05)
assert np.isclose(xval[0], summary[0], atol=0.05)
assert np.isclose(xval[1], summary[2], atol=0.05)
def test_meta_from_position(igmsp):
# One match
meta = igmsp.meta_from_position((0.0019,17.7737), 1*u.arcsec)
assert len(meta) == 1
# Blank
meta2 = igmsp.meta_from_position((10.038604,55.298477), 1*u.arcsec)
assert meta2 is None
# Multiple sources (insure rank order)
meta3 = igmsp.meta_from_position((0.0055,-1.5), 1*u.deg)
assert len(meta3) == 2
assert np.isclose(meta3['R'][0],meta3['R'][1])
# Multiple meta entries (GGG)
meta4 = igmsp.meta_from_position('001115.23+144601.8', 1*u.arcsec)
assert len(meta4) == 2
assert meta4['R'][0] != meta4['R'][1]
# Multiple but grab closest source
meta5 = igmsp.meta_from_position((0.0055,-1.5), 1*u.deg, max_match=1)
assert len(meta5) == 1
# Groups
meta = igmsp.meta_from_position((2.813500,14.767200), 20*u.deg, groups=['GGG','HD-LLS_DR1'])
for group in meta['GROUP'].data:
assert group in ['GGG', 'HD-LLS_DR1']
# Physical separation
meta6 = igmsp.meta_from_position('001115.23+144601.8', 300*u.kpc)
assert len(meta6) == 2
def test_spectra_from_meta(igmsp): # Base level operation
# One match
meta = igmsp.meta_from_position((0.0019,17.7737), 1*u.arcsec)
spec = igmsp.spectra_from_meta(meta)
# Two sources, one meta entry each
meta2 = igmsp.meta_from_position((0.0055,-1.5), 1*u.deg)
spec2 = igmsp.spectra_from_meta(meta2)
assert spec2.nspec == 2
# Many sources and meta entries; groups separated
meta3 = igmsp.meta_from_position((2.813500,14.767200), 20*u.deg)#, groups=['GGG','HD-LLS_DR1'])
spec3 = igmsp.spectra_from_meta(meta3)
assert spec3.nspec == 15
# Many sources and meta entries; groups scrambled
idx = np.arange(15).astype(int)
idx[1] = 13
idx[13] = 1
meta4 = meta3[idx]
spec4 = igmsp.spectra_from_meta(meta4)#, debug=True)
spec4.select = 1
assert np.isclose(meta4['WV_MIN'][1], spec4.wvmin.value)
def ft_autocorrelation_function(self, k):
"""Compute the 3D Fourier transform of the isotropic correlation
function for an independent sphere for given magnitude k of the 3D wave vector
(float).
"""
X = self.radius * np.asarray(k)
volume_sphere = 4.0 / 3 * np.pi * self.radius**3
bessel_term = np.empty_like(X)
zero_X = np.isclose(X, 0)
non_zero_X = np.logical_not(zero_X)
X_non_zero = X[non_zero_X]
bessel_term[non_zero_X] = (9 * ((np.sin(X_non_zero) - X_non_zero * np.cos(X_non_zero))
/ X_non_zero**3)**2)
bessel_term[zero_X] = 1.0
return self.corr_func_at_origin * volume_sphere * bessel_term
def test_one_sample_pulse_freq(self, x_series_device, seed):
# Reset the pseudorandom number generator with seed.
random.seed(seed)
frequency = random.uniform(1000, 10000)
duty_cycle = random.uniform(0.2, 0.8)
# Select random counters from the device.
counters = random.sample(self._get_device_counters(x_series_device), 2)
with nidaqmx.Task() as write_task, nidaqmx.Task() as read_task:
write_task.co_channels.add_co_pulse_chan_freq(
counters[0], freq=frequency, duty_cycle=duty_cycle)
write_task.timing.cfg_implicit_timing(
sample_mode=AcquisitionType.CONTINUOUS)
read_task.ci_channels.add_ci_pulse_chan_freq(
counters[1], min_val=1000, max_val=10000)
read_task.ci_channels.all.ci_pulse_freq_term = (
'/{0}InternalOutput'.format(counters[0]))
read_task.start()
write_task.start()
reader = CounterReader(read_task.in_stream)
value_read = reader.read_one_sample_pulse_frequency()
write_task.stop()
assert numpy.isclose(value_read.freq, frequency, rtol=0.05)
assert numpy.isclose(value_read.duty_cycle, duty_cycle, rtol=0.05)
def test_one_sample_pulse_time(self, x_series_device, seed):
# Reset the pseudorandom number generator with seed.
random.seed(seed)
high_time = random.uniform(0.0001, 0.001)
low_time = random.uniform(0.0001, 0.001)
# Select random counters from the device.
counters = random.sample(self._get_device_counters(x_series_device), 2)
with nidaqmx.Task() as write_task, nidaqmx.Task() as read_task:
write_task.co_channels.add_co_pulse_chan_time(
counters[0], high_time=high_time, low_time=low_time)
write_task.timing.cfg_implicit_timing(
sample_mode=AcquisitionType.CONTINUOUS)
read_task.ci_channels.add_ci_pulse_chan_time(
counters[1], min_val=0.0001, max_val=0.001)
read_task.ci_channels.all.ci_pulse_time_term = (
'/{0}InternalOutput'.format(counters[0]))
read_task.start()
write_task.start()
reader = CounterReader(read_task.in_stream)
value_read = reader.read_one_sample_pulse_time()
write_task.stop()
assert numpy.isclose(value_read.high_time, high_time, rtol=0.05)
assert numpy.isclose(value_read.low_time, low_time, rtol=0.05)
def test_pulse_ticks_1_samp(self, x_series_device, seed):
# Reset the pseudorandom number generator with seed.
random.seed(seed)
high_ticks = random.randint(100, 1000)
low_ticks = random.randint(100, 1000)
starting_edge = random.choice([Edge.RISING, Edge.FALLING])
# Select random counters from the device.
counters = random.sample(self._get_device_counters(x_series_device), 2)
with nidaqmx.Task() as write_task, nidaqmx.Task() as read_task:
write_task.co_channels.add_co_pulse_chan_ticks(
counters[0],
'/{0}/100kHzTimebase'.format(x_series_device.name),
high_ticks=high_ticks, low_ticks=low_ticks)
write_task.timing.cfg_implicit_timing(
sample_mode=AcquisitionType.CONTINUOUS)
read_task.ci_channels.add_ci_pulse_chan_ticks(
counters[1], source_terminal='/{0}/100kHzTimebase'.format(
x_series_device.name),
min_val=100, max_val=1000)
read_task.ci_channels.all.ci_pulse_ticks_term = (
'/{0}InternalOutput'.format(counters[0]))
read_task.ci_channels.all.ci_pulse_ticks_starting_edge = (
starting_edge)
read_task.start()
write_task.start()
reader = CounterReader(read_task.in_stream)
value_read = reader.read_one_sample_pulse_ticks()
write_task.stop()
assert numpy.isclose(
value_read.high_tick, high_ticks, rtol=0.05, atol=1)
assert numpy.isclose(
value_read.low_tick, low_ticks, rtol=0.05, atol=1)
def test_pulse_freq_1_samp(self, x_series_device, seed):
# Reset the pseudorandom number generator with seed.
random.seed(seed)
frequency = random.uniform(100, 1000)
duty_cycle = random.uniform(0.2, 0.8)
starting_edge = random.choice([Edge.RISING, Edge.FALLING])
# Select random counters from the device.
counters = random.sample(self._get_device_counters(x_series_device), 2)
with nidaqmx.Task() as write_task, nidaqmx.Task() as read_task:
write_task.co_channels.add_co_pulse_chan_freq(
counters[0], freq=frequency, duty_cycle=duty_cycle)
write_task.timing.cfg_implicit_timing(
sample_mode=AcquisitionType.CONTINUOUS)
read_task.ci_channels.add_ci_pulse_chan_freq(
counters[1], min_val=100, max_val=1000)
read_task.ci_channels.all.ci_pulse_freq_term = (
'/{0}InternalOutput'.format(counters[0]))
read_task.ci_channels.all.ci_pulse_freq_starting_edge = (
starting_edge)
read_task.start()
write_task.start()
value_read = read_task.read(timeout=2)
write_task.stop()
assert numpy.isclose(value_read.freq, frequency, rtol=0.01)
assert numpy.isclose(value_read.duty_cycle, duty_cycle, rtol=0.01)
def test_pulse_time_1_samp(self, x_series_device, seed):
# Reset the pseudorandom number generator with seed.
random.seed(seed)
high_time = random.uniform(0.001, 0.01)
low_time = random.uniform(0.001, 0.01)
starting_edge = random.choice([Edge.RISING, Edge.FALLING])
# Select random counters from the device.
counters = random.sample(self._get_device_counters(x_series_device), 2)
with nidaqmx.Task() as write_task, nidaqmx.Task() as read_task:
write_task.co_channels.add_co_pulse_chan_time(
counters[0], high_time=high_time, low_time=low_time)
write_task.timing.cfg_implicit_timing(
sample_mode=AcquisitionType.CONTINUOUS)
read_task.ci_channels.add_ci_pulse_chan_time(
counters[1], min_val=0.001, max_val=0.01)
read_task.ci_channels.all.ci_pulse_time_term = (
'/{0}InternalOutput'.format(counters[0]))
read_task.ci_channels.all.ci_pulse_time_starting_edge = (
starting_edge)
read_task.start()
write_task.start()
value_read = read_task.read(timeout=2)
write_task.stop()
assert numpy.isclose(value_read.high_time, high_time, rtol=0.01)
assert numpy.isclose(value_read.low_time, low_time, rtol=0.01)
def test_create_ai_thrmstr_chan_iex(self, x_series_device, seed):
# Reset the pseudorandom number generator with seed.
random.seed(seed)
ai_phys_chan = random.choice(x_series_device.ai_physical_chans).name
with nidaqmx.Task() as task:
ai_channel = task.ai_channels.add_ai_thrmstr_chan_iex(
ai_phys_chan, name_to_assign_to_channel="ThermistorIexChannel",
min_val=-30.0, max_val=300.0, units=TemperatureUnits.DEG_C,
resistance_config=ResistanceConfiguration.FOUR_WIRE,
current_excit_source=ExcitationSource.EXTERNAL,
current_excit_val=0.0001, a=0.0013, b=0.00023,
c=0.000000102)
assert ai_channel.physical_channel.name == ai_phys_chan
assert ai_channel.name == "ThermistorIexChannel"
assert numpy.isclose(ai_channel.ai_min, -30.0, atol=1)
assert numpy.isclose(ai_channel.ai_max, 300.0, atol=1)
assert ai_channel.ai_temp_units == TemperatureUnits.DEG_C
assert (ai_channel.ai_resistance_cfg ==
ResistanceConfiguration.FOUR_WIRE)
assert ai_channel.ai_excit_src == ExcitationSource.EXTERNAL
assert ai_channel.ai_excit_val == 0.0001
assert ai_channel.ai_thrmstr_a == 0.0013
assert ai_channel.ai_thrmstr_b == 0.00023
assert ai_channel.ai_thrmstr_c == 0.000000102
def test_create_ai_thrmstr_chan_vex(self, x_series_device, seed):
# Reset the pseudorandom number generator with seed.
random.seed(seed)
ai_phys_chan = random.choice(x_series_device.ai_physical_chans).name
with nidaqmx.Task() as task:
ai_channel = task.ai_channels.add_ai_thrmstr_chan_vex(
ai_phys_chan, name_to_assign_to_channel="ThermistorVexChannel",
min_val=-50.0, max_val=300.0, units=TemperatureUnits.DEG_C,
resistance_config=ResistanceConfiguration.FOUR_WIRE,
voltage_excit_source=ExcitationSource.EXTERNAL,
voltage_excit_val=2.5, a=0.001295361, b=0.0002343159,
c=0.0000001018703, r_1=5000.0)
assert ai_channel.physical_channel.name == ai_phys_chan
assert ai_channel.name == "ThermistorVexChannel"
assert numpy.isclose(ai_channel.ai_min, -50.0, atol=1)
assert numpy.isclose(ai_channel.ai_max, 300.0, atol=1)
assert ai_channel.ai_temp_units == TemperatureUnits.DEG_C
assert (ai_channel.ai_resistance_cfg ==
ResistanceConfiguration.FOUR_WIRE)
assert ai_channel.ai_excit_src == ExcitationSource.EXTERNAL
assert ai_channel.ai_excit_val == 2.5
assert ai_channel.ai_thrmstr_a == 0.001295361
assert ai_channel.ai_thrmstr_b == 0.0002343159
assert ai_channel.ai_thrmstr_c == 0.0000001018703
assert ai_channel.ai_thrmstr_r_1 == 5000.0
def test_ai_strain_gage_chan(self, x_series_device, seed):
# Reset the pseudorandom number generator with seed.
random.seed(seed)
ai_phys_chan = random.choice(x_series_device.ai_physical_chans).name
with nidaqmx.Task() as task:
ai_channel = task.ai_channels.add_ai_strain_gage_chan(
ai_phys_chan, name_to_assign_to_channel="StrainGageChannel",
min_val=-0.05, max_val=0.05, units=StrainUnits.STRAIN,
strain_config=StrainGageBridgeType.FULL_BRIDGE_I,
voltage_excit_source=ExcitationSource.EXTERNAL,
voltage_excit_val=1.0, gage_factor=4.0,
initial_bridge_voltage=0.0, nominal_gage_resistance=350.0,
poisson_ratio=0.30, lead_wire_resistance=0.1,
custom_scale_name="")
assert ai_channel.physical_channel.name == ai_phys_chan
assert ai_channel.name == "StrainGageChannel"
assert numpy.isclose(ai_channel.ai_min, -0.05)
assert numpy.isclose(ai_channel.ai_max, 0.05)
assert ai_channel.ai_strain_units == StrainUnits.STRAIN
assert (ai_channel.ai_strain_gage_cfg ==
StrainGageBridgeType.FULL_BRIDGE_I)
assert ai_channel.ai_excit_src == ExcitationSource.EXTERNAL
assert ai_channel.ai_excit_val == 1.0
assert ai_channel.ai_strain_gage_gage_factor == 4.0
assert ai_channel.ai_bridge_initial_voltage == 0.0
assert ai_channel.ai_strain_gage_poisson_ratio == 0.30
assert ai_channel.ai_lead_wire_resistance == 0.1
def test_create_ai_voltage_chan_with_excit(self, x_series_device, seed):
# Reset the pseudorandom number generator with seed.
random.seed(seed)
ai_phys_chan = random.choice(x_series_device.ai_physical_chans).name
with nidaqmx.Task() as task:
ai_channel = task.ai_channels.add_ai_voltage_chan_with_excit(
ai_phys_chan, name_to_assign_to_channel="VoltageExcitChannel",
terminal_config=TerminalConfiguration.NRSE,
min_val=-10.0, max_val=10.0, units=VoltageUnits.VOLTS,
bridge_config=BridgeConfiguration.NO_BRIDGE,
voltage_excit_source=ExcitationSource.EXTERNAL,
voltage_excit_val=0.1, use_excit_for_scaling=False,
custom_scale_name="")
assert ai_channel.physical_channel.name == ai_phys_chan
assert ai_channel.name == "VoltageExcitChannel"
assert ai_channel.ai_term_cfg == TerminalConfiguration.NRSE
assert numpy.isclose(ai_channel.ai_min, -10.0)
assert numpy.isclose(ai_channel.ai_max, 10.0)
assert ai_channel.ai_voltage_units == VoltageUnits.VOLTS
assert ai_channel.ai_bridge_cfg == BridgeConfiguration.NO_BRIDGE
assert ai_channel.ai_excit_src == ExcitationSource.EXTERNAL
assert ai_channel.ai_excit_val == 0.1
assert not ai_channel.ai_excit_use_for_scaling
def test_acoustic_material():
water = fds.AcousticMaterial(1500, 1000)
water.bulk_viscosity = 1e-3
water.shear_viscosity = 1e-3
assert np.isclose(water.absorption_coef, 7e-3 / 3)
water.absorption_coef = 3e-3
assert np.isclose(water.absorption_coef, 3e-3)
def test_acoustic_material_absorption_coef():
water = fds.AcousticMaterial(1500, 1000, absorption_coef=2e-3)
assert np.isclose(water.absorption_coef, 2e-3)
def active_set_Lam(self, fixed, vary):
grad = self.grad_wrt_Lam(fixed, vary)
assert np.allclose(grad, grad.T, 1e-3)
return np.where((np.abs(np.triu(grad)) > self.lamL) | (self.Lam != 0))
# return np.where((np.abs(grad) > self.lamL) | (~np.isclose(self.Lam, 0)))
def active_set_Theta(self, fixed, vary):
grad = self.grad_wrt_Theta(fixed, vary)
return np.where((np.abs(grad) > self.lamT) | (self.Theta != 0))
# return np.where((np.abs(grad) > self.lamT) | (~np.isclose(self.Theta, 0)))
def assert_allclose(x, y, rtol=1e-10, atol=1e-8):
"""Drop in replacement for `numpy.testing.assert_allclose` that shows the nonmatching elements"""
if np.isscalar(x) and np.isscalar(y) == 1:
return np.testing.assert_allclose(x, y, rtol=rtol, atol=atol)
if x.shape != y.shape:
raise AssertionError("Shape mismatch: %s vs %s" % (str(x.shape), str(y.shape)))
d = ~np.isclose(x, y, rtol, atol)
if np.any(d):
miss = np.where(d)[0]
raise AssertionError("""Mismatch of %d elements (%g %%) at the level of rtol=%g, atol=%g
%s
%s
%s""" % (len(miss), len(miss)/x.size, rtol, atol, repr(miss), str(x[d]), str(y[d])))