def plotpsd(data, dt, ndivide=1, window=hanning, overlap_half=False, ax=None, **kwargs):
"""Plot PSD (Power Spectral Density).
Args:
data (np.ndarray): Input data.
dt (float): Time between each data.
ndivide (int): Do averaging (split data into ndivide, get psd of each, and average them).
overlap_half (bool): Split data to half-overlapped regions.
ax (matplotlib.axes): Axis the figure is plotted on.
kwargs (optional): Plot options passed to ax.plot().
"""
if ax is None:
ax = plt.gca()
vk, psddata = psd(data, dt, ndivide, window, overlap_half)
ax.loglog(vk, psddata, **kwargs)
ax.set_xlabel('Frequency [Hz]', fontsize=20, color='grey')
ax.set_ylabel('PSD', fontsize=20, color='grey')
ax.legend()
python类hanning()的实例源码
def _lagged_coherence_1freq(x, f, Fs, N_cycles=3, f_step=1):
"""Calculate lagged coherence of x at frequency f using the hanning-taper FFT method"""
Nsamp = int(np.ceil(N_cycles*Fs / f))
# For each N-cycle chunk, calculate phase
chunks = _nonoverlapping_chunks(x,Nsamp)
C = len(chunks)
hann_window = signal.hanning(Nsamp)
fourier_f = np.fft.fftfreq(Nsamp,1/float(Fs))
fourier_f_idx = _arg_closest_value(fourier_f,f)
fourier_coefsoi = np.zeros(C,dtype=complex)
for i2, c in enumerate(chunks):
fourier_coef = np.fft.fft(c*hann_window)
fourier_coefsoi[i2] = fourier_coef[fourier_f_idx]
lcs_num = 0
for i2 in range(C-1):
lcs_num += fourier_coefsoi[i2]*np.conj(fourier_coefsoi[i2+1])
lcs_denom = np.sqrt(np.sum(np.abs(fourier_coefsoi[:-1])**2)*np.sum(np.abs(fourier_coefsoi[1:])**2))
return np.abs(lcs_num/lcs_denom)
def ltsd_vad(x, fs, threshold=9, winsize=8192):
# winsize based on sample rate
# 1024 for fs = 16000
orig_dtype = x.dtype
orig_scale_min = x.min()
orig_scale_max = x.max()
x = (x - x.min()) / (x.max() - x.min())
# works with 16 bit
x = x * (2 ** 15)
x = x.astype("int32")
window = sp.hanning(winsize)
ltsd = LTSD(winsize, window, 5)
s_vad = ltsd.compute(x)
# LTSD is 50% overlap, so each "step" covers 4096 samples
# +1 to cover the extra edge window
n_samples = int(((len(s_vad) + 1) * winsize) // 2)
time_s = n_samples / float(fs)
time_points = np.linspace(0, time_s, len(s_vad))
time_samples = (fs * time_points).astype(np.int32)
time_samples = time_samples
f_vad = np.zeros_like(x, dtype=np.bool)
offset = winsize
for n, (ss, es) in enumerate(zip(time_samples[:-1], time_samples[1:])):
sss = ss - offset
if sss < 0:
sss = 0
ses = es - offset
if ses < 0:
ses = 0
if s_vad[n + 1] < threshold:
f_vad[sss:ses] = False
else:
f_vad[sss:ses] = True
f_vad[ses:] = False
x = x.astype("float64")
x = x / float(2 ** 15)
x = x * (orig_scale_max - orig_scale_min) + orig_scale_min
x = x.astype(orig_dtype)
return x[f_vad], f_vad
def ltsd_vad(x, fs, threshold=9, winsize=8192):
# winsize based on sample rate
# 1024 for fs = 16000
orig_dtype = x.dtype
orig_scale_min = x.min()
orig_scale_max = x.max()
x = (x - x.min()) / (x.max() - x.min())
# works with 16 bit
x = x * (2 ** 15)
x = x.astype("int32")
window = sp.hanning(winsize)
ltsd = LTSD(winsize, window, 5)
s_vad = ltsd.compute(x)
# LTSD is 50% overlap, so each "step" covers 4096 samples
# +1 to cover the extra edge window
n_samples = int(((len(s_vad) + 1) * winsize) // 2)
time_s = n_samples / float(fs)
time_points = np.linspace(0, time_s, len(s_vad))
time_samples = (fs * time_points).astype(np.int32)
time_samples = time_samples
f_vad = np.zeros_like(x, dtype=np.bool)
offset = winsize
for n, (ss, es) in enumerate(zip(time_samples[:-1], time_samples[1:])):
sss = ss - offset
if sss < 0:
sss = 0
ses = es - offset
if ses < 0:
ses = 0
if s_vad[n + 1] < threshold:
f_vad[sss:ses] = False
else:
f_vad[sss:ses] = True
f_vad[ses:] = False
x = x.astype("float64")
x = x / float(2 ** 15)
x = x * (orig_scale_max - orig_scale_min) + orig_scale_min
x = x.astype(orig_dtype)
return x[f_vad], f_vad
def main(fn,start,end):
fn = Path(fn).expanduser()
#rx_array is loading the last 45% of the waveform from the file
rx_array = load_bin(fn, start, end)
#peak_array holds the indexes of each peak in the waveform
#peak_distance is the smallest distance between each peak
peak_array,peak_distance = get_peaks(rx_array)
l = peak_distance-1
print('using window: ',l,'\n')
#remove first peak
peak_array= peak_array[1:]
Npulse=len(peak_array)-1
print(Npulse,'pulses detected')
wind = signal.hanning(l)
Ntone = 2
Nblockest = 160
fs = 4e6 # [Hz]
data = np.empty([Npulse,l])
#set each row of data to window * (first l samples after each peak)
for i in range(Npulse):
data[i,:] = wind * rx_array[peak_array[i]:peak_array[i]+l]
fb_est, sigma = esprit(data, Ntone, Nblockest, fs)
print ('fb_est',fb_est)
print ('sigma: ', sigma)
drange = (3e8*fb_est) / (2e6/.1)
print ('range: ',drange,'\n')
def NMREval(self, xn, xnhat):
""" Method to perform NMR perceptual evaluation of audio quality between two signals.
Args :
xn : (ndarray) 1D Array containing the true time domain signal.
xnhat : (ndarray) 1D Array containing the estimated time domain signal.
Returns :
NMR : (float) A float measurement in dB providing a perceptually weighted
evaluation. Below -9 dB can be considered as in-audible difference/error.
As appears in :
- K. Brandenburg and T. Sporer, “NMR and Masking Flag: Evaluation of Quality Using Perceptual Criteria,” in
Proceedings of the AES 11th International Conference on Test and Measurement, Portland, USA, May 1992, pp. 169–179
- J. Nikunen and T. Virtanen, "Noise-to-mask ratio minimization by weighted non-negative matrix factorization," in
Acoustics Speech and Signal Processing (ICASSP), 2010 IEEE International Conference on, Dallas, TX, 2010, pp. 25-28.
"""
mX, _ = TimeFrequencyDecomposition.STFT(xn, hanning(self.nfft/2 + 1), self.nfft, self.nfft/4)
mXhat, _ = TimeFrequencyDecomposition.STFT(xnhat, hanning(self.nfft/2 + 1), self.nfft, self.nfft/4)
# Compute Error
Err = np.abs(mX - mXhat) ** 2.
# Acquire Masking Threshold
mT = self.maskingThreshold(mX)
# Inverse the filter of masking threshold
imT = 1./(mT + eps)
# Outer/Middle Ear transfer function on the diagonal
LTq = 10 ** (self.MOEar()/20.)
# NMR computation
NMR = 10. * np.log10((1./mX.shape[0]) * self._maxb * np.sum((imT * (Err*LTq))))
print(NMR)
return NMR
def get_ft_windows():
""" Retrieve the available windows to be applied on signal data before FT.
@return: dict with keys being the window name and items being again a dict
containing the actual function and the normalization factor to
calculate correctly the amplitude spectrum in the Fourier Transform
To find out the amplitude normalization factor check either the scipy
implementation on
https://github.com/scipy/scipy/blob/v0.15.1/scipy/signal/windows.py#L336
or just perform a sum of the window (oscillating parts of the window should
be averaged out and constant offset factor will remain):
MM=1000000 # choose a big number
print(sum(signal.hanning(MM))/MM)
"""
win = {'none': {'func': np.ones, 'ampl_norm': 1.0},
'hamming': {'func': signal.hamming, 'ampl_norm': 1.0/0.54},
'hann': {'func': signal.hann, 'ampl_norm': 1.0/0.5},
'blackman': {'func': signal.blackman, 'ampl_norm': 1.0/0.42},
'triang': {'func': signal.triang, 'ampl_norm': 1.0/0.5},
'flattop': {'func': signal.flattop, 'ampl_norm': 1.0/0.2156},
'bartlett': {'func': signal.bartlett, 'ampl_norm': 1.0/0.5},
'parzen': {'func': signal.parzen, 'ampl_norm': 1.0/0.375},
'bohman': {'func': signal.bohman, 'ampl_norm': 1.0/0.4052847},
'blackmanharris': {'func': signal.blackmanharris, 'ampl_norm': 1.0/0.35875},
'nuttall': {'func': signal.nuttall, 'ampl_norm': 1.0/0.3635819},
'barthann': {'func': signal.barthann, 'ampl_norm': 1.0/0.5}}
return win
def test_smooth():
tr = get_rand_traj()
assert len(tr.attrs_nstep) > 0
trs = crys.smooth(tr, hanning(11))
assert len(trs.attrs_nstep) > 0
assert_attrs_not_none(trs, attr_lst=tr.attr_lst)
for name in tr.attrs_nstep:
a1 = getattr(tr, name)
a2 = getattr(trs, name)
assert a1.shape == a2.shape
assert np.abs(a1 - a2).sum() > 0.0
assert trs.timestep == tr.timestep
assert trs.nstep == tr.nstep
# reproduce data with kernel [0,1,0]
trs = crys.smooth(tr, hanning(3))
for name in tr.attrs_nstep:
a1 = getattr(tr, name)
a2 = getattr(trs, name)
assert np.allclose(a1, a2)
trs1 = crys.smooth(tr, hanning(3), method=1)
trs2 = crys.smooth(tr, hanning(3), method=2)
assert len(trs1.attrs_nstep) > 0
assert len(trs2.attrs_nstep) > 0
for name in tr.attrs_nstep:
a1 = getattr(tr, name)
a2 = getattr(trs1, name)
a3 = getattr(trs2, name)
assert np.allclose(a1, a2)
assert np.allclose(a1, a3)
trs1 = crys.smooth(tr, hanning(11), method=1)
trs2 = crys.smooth(tr, hanning(11), method=2)
assert len(trs1.attrs_nstep) > 0
assert len(trs2.attrs_nstep) > 0
for name in trs1.attrs_nstep:
a1 = getattr(trs1, name)
a2 = getattr(trs2, name)
assert np.allclose(a1, a2)
def psd(data, dt, ndivide=1, window=hanning, overlap_half=False):
"""Calculate power spectrum density of data.
Args:
data (np.ndarray): Input data.
dt (float): Time between each data.
ndivide (int): Do averaging (split data into ndivide, get psd of each, and average them).
ax (matplotlib.axes): Axis you want to plot on.
doplot (bool): Plot how averaging works.
overlap_half (bool): Split data to half-overlapped regions.
Returns:
vk (np.ndarray): Frequency.
psd (np.ndarray): PSD
"""
logger = getLogger('decode.utils.ndarray.psd')
if overlap_half:
step = int(len(data) / (ndivide + 1))
size = step * 2
else:
step = int(len(data) / ndivide)
size = step
if bin(len(data)).count('1') != 1:
logger.warning('warning: length of data is not power of 2: {}'.format(len(data)))
size = int(len(data) / ndivide)
if bin(size).count('1') != 1.:
if overlap_half:
logger.warning('warning: ((length of data) / (ndivide+1)) * 2 is not power of 2: {}'.format(size))
else:
logger.warning('warning: (length of data) / ndivide is not power of 2: {}'.format(size))
psd = np.zeros(size)
T = (size - 1) * dt
vs = 1 / dt
vk_ = fftfreq(size, dt)
vk = vk_[np.where(vk_ >= 0)]
for i in range(ndivide):
d = data[i * step:i * step + size]
if window is None:
w = np.ones(size)
corr = 1.0
else:
w = window(size)
corr = np.mean(w**2)
psd = psd + 2 * (np.abs(fft(d * w)))**2 / size * dt / corr
return vk, psd[:len(vk)] / ndivide
def sinusoid_analysis(X, input_sample_rate, resample_block=128, copy=True):
"""
Contruct a sinusoidal model for the input signal.
Parameters
----------
X : ndarray
Input signal to model
input_sample_rate : int
The sample rate of the input signal
resample_block : int, optional (default=128)
Controls the step size of the sinusoidal model
Returns
-------
frequencies_hz : ndarray
Frequencies for the sinusoids, in Hz.
magnitudes : ndarray
Magnitudes of sinusoids returned in ``frequencies``
References
----------
D. P. W. Ellis (2004), "Sinewave Speech Analysis/Synthesis in Matlab",
Web resource, available: http://www.ee.columbia.edu/ln/labrosa/matlab/sws/
"""
X = np.array(X, copy=copy)
resample_to = 8000
if input_sample_rate != resample_to:
if input_sample_rate % resample_to != 0:
raise ValueError("Input sample rate must be a multiple of 8k!")
# Should be able to use resample... ?
# resampled_count = round(len(X) * resample_to / input_sample_rate)
# X = sg.resample(X, resampled_count, window=sg.hanning(len(X)))
X = sg.decimate(X, input_sample_rate // resample_to, zero_phase=True)
step_size = 2 * round(resample_block / input_sample_rate * resample_to / 2.)
a, g, e = lpc_analysis(X, order=8, window_step=step_size,
window_size=2 * step_size)
f, m = lpc_to_frequency(a, g)
f_hz = f * resample_to / (2 * np.pi)
return f_hz, m
def sinusoid_analysis(X, input_sample_rate, resample_block=128, copy=True):
"""
Contruct a sinusoidal model for the input signal.
Parameters
----------
X : ndarray
Input signal to model
input_sample_rate : int
The sample rate of the input signal
resample_block : int, optional (default=128)
Controls the step size of the sinusoidal model
Returns
-------
frequencies_hz : ndarray
Frequencies for the sinusoids, in Hz.
magnitudes : ndarray
Magnitudes of sinusoids returned in ``frequencies``
References
----------
D. P. W. Ellis (2004), "Sinewave Speech Analysis/Synthesis in Matlab",
Web resource, available: http://www.ee.columbia.edu/ln/labrosa/matlab/sws/
"""
X = np.array(X, copy=copy)
resample_to = 8000
if input_sample_rate != resample_to:
if input_sample_rate % resample_to != 0:
raise ValueError("Input sample rate must be a multiple of 8k!")
# Should be able to use resample... ?
# resampled_count = round(len(X) * resample_to / input_sample_rate)
# X = sg.resample(X, resampled_count, window=sg.hanning(len(X)))
X = sg.decimate(X, input_sample_rate // resample_to, zero_phase=True)
step_size = 2 * round(resample_block / input_sample_rate * resample_to / 2.)
a, g, e = lpc_analysis(X, order=8, window_step=step_size,
window_size=2 * step_size)
f, m = lpc_to_frequency(a, g)
f_hz = f * resample_to / (2 * np.pi)
return f_hz, m
def f_psd(x, Fs, method,
Hzmed=0, welch_params={'window':'hanning','nperseg':1000,'noverlap':None}):
'''
Calculate the power spectrum of a signal
Parameters
----------
x : array
temporal signal
Fs : integer
sampling rate
method : str in ['fftmed','welch']
Method for calculating PSD
Hzmed : float
relevant if method == 'fftmed'
Frequency width of the median filter
welch_params : dict
relevant if method == 'welch'
Parameters to sp.signal.welch
Returns
-------
f : array
frequencies corresponding to the PSD output
psd : array
power spectrum
'''
if method == 'fftmed':
# Calculate frequencies
N = len(x)
f = np.arange(0,Fs/2,Fs/N)
# Calculate PSD
rawfft = np.fft.fft(x)
psd = np.abs(rawfft[:len(f)])**2
# Median filter
if Hzmed > 0:
sampmed = np.argmin(np.abs(f-Hzmed/2.0))
psd = signal.medfilt(psd,sampmed*2+1)
elif method == 'welch':
f, psd = sp.signal.welch(x, fs=Fs, **welch_params)
else:
raise ValueError('input for PSD method not recognized')
return f, psd
def lagged_coherence(x, frange, Fs, N_cycles=3, f_step=1, return_spectrum=False):
"""
Quantify the rhythmicity of a time series using lagged coherence.
Return the mean lagged coherence in the frequency range as an
estimate of rhythmicity.
As in Fransen et al. 2015
Parameters
----------
x : array-like 1d
voltage time series
f_range : (low, high), Hz
frequency range for narrowband signal of interest, used to find
zerocrossings of the oscillation
Fs : float
The sampling rate (default = 1000Hz)
N_cycles : float
Number of cycles of the frequency of interest to be used in lagged coherence calculate
f_step : float, Hz
step size to calculate lagged coherence in the frequency range.
return_spectrum : bool
if True, return the lagged coherence for all frequency values. otherwise, only return mean
fourier_or_wavelet : string {'fourier', 'wavelet'}
NOT IMPLEMENTED. ONLY FOURIER
method for estimating phase.
fourier: calculate fourier coefficients for each time window. hanning tpaer.
wavelet: multiply each window by a N-cycle wavelet
Returns
-------
rhythmicity : float
mean lagged coherence value in the frequency range of interest
"""
# Identify Fourier components of interest
freqs = np.arange(frange[0],frange[1]+f_step,f_step)
# Calculate lagged coherence for each frequency
F = len(freqs)
lcs = np.zeros(F)
for i,f in enumerate(freqs):
lcs[i] = _lagged_coherence_1freq(x, f, Fs, N_cycles=N_cycles, f_step=f_step)
if return_spectrum:
return lcs
else:
return np.mean(lcs)