def test_savgol_filter():
"""Test savgol filtering
"""
h_freq = 10.
raw, events = _get_data()[:2]
epochs = Epochs(raw, events, event_id, tmin, tmax)
assert_raises(RuntimeError, epochs.savgol_filter, 10.)
epochs = Epochs(raw, events, event_id, tmin, tmax, preload=True)
freqs = fftpack.fftfreq(len(epochs.times), 1. / epochs.info['sfreq'])
data = np.abs(fftpack.fft(epochs.get_data()))
match_mask = np.logical_and(freqs >= 0, freqs <= h_freq / 2.)
mismatch_mask = np.logical_and(freqs >= h_freq * 2, freqs < 50.)
epochs.savgol_filter(h_freq)
data_filt = np.abs(fftpack.fft(epochs.get_data()))
# decent in pass-band
assert_allclose(np.mean(data[:, :, match_mask], 0),
np.mean(data_filt[:, :, match_mask], 0),
rtol=1e-4, atol=1e-2)
# suppression in stop-band
assert_true(np.mean(data[:, :, mismatch_mask]) >
np.mean(data_filt[:, :, mismatch_mask]) * 5)
python类fftfreq()的实例源码
test_epochs.py 文件源码
项目:decoding_challenge_cortana_2016_3rd
作者: kingjr
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
test_evoked.py 文件源码
项目:decoding_challenge_cortana_2016_3rd
作者: kingjr
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def test_savgol_filter():
"""Test savgol filtering
"""
h_freq = 10.
evoked = read_evokeds(fname, 0)
freqs = fftpack.fftfreq(len(evoked.times), 1. / evoked.info['sfreq'])
data = np.abs(fftpack.fft(evoked.data))
match_mask = np.logical_and(freqs >= 0, freqs <= h_freq / 2.)
mismatch_mask = np.logical_and(freqs >= h_freq * 2, freqs < 50.)
assert_raises(ValueError, evoked.savgol_filter, evoked.info['sfreq'])
evoked_sg = evoked.copy().savgol_filter(h_freq)
data_filt = np.abs(fftpack.fft(evoked_sg.data))
# decent in pass-band
assert_allclose(np.mean(data[:, match_mask], 0),
np.mean(data_filt[:, match_mask], 0),
rtol=1e-4, atol=1e-2)
# suppression in stop-band
assert_true(np.mean(data[:, mismatch_mask]) >
np.mean(data_filt[:, mismatch_mask]) * 5)
# original preserved
assert_allclose(data, np.abs(fftpack.fft(evoked.data)), atol=1e-16)
def _precompute_st_windows(n_samp, start_f, stop_f, sfreq, width):
"""Precompute stockwell gausian windows (in the freq domain)"""
tw = fftpack.fftfreq(n_samp, 1. / sfreq) / n_samp
tw = np.r_[tw[:1], tw[1:][::-1]]
k = width # 1 for classical stowckwell transform
f_range = np.arange(start_f, stop_f, 1)
windows = np.empty((len(f_range), len(tw)), dtype=np.complex)
for i_f, f in enumerate(f_range):
if f == 0.:
window = np.ones(len(tw))
else:
window = ((f / (np.sqrt(2. * np.pi) * k)) *
np.exp(-0.5 * (1. / k ** 2.) * (f ** 2.) * tw ** 2.))
window /= window.sum() # normalisation
windows[i_f] = fftpack.fft(window)
return windows
def check_and_phase_shift(trace):
if trace.stats.npts < 10 * trace.stats.sampling_rate:
trace.data = np.zeros(trace.stats.npts)
return
dt = np.mod(trace.stats.starttime.datetime.microsecond * 1.0e-6,
trace.stats.delta)
if (trace.stats.delta - dt) <= np.finfo(float).eps:
dt = 0.
if dt != 0.:
if dt <= (trace.stats.delta / 2.):
dt = -dt
# direction = "left"
else:
dt = (trace.stats.delta - dt)
# direction = "right"
# log.debug("correcting time by %.6fs"%dt)
trace.detrend(type="demean")
trace.detrend(type="simple")
trace.taper(max_percentage=None, max_length=1.0)
n = next_fast_len(int(trace.stats.npts))
FFTdata = fft(trace.data, n=n)
freq = fftfreq(n, d=trace.stats.delta)
FFTdata = FFTdata * np.exp(1j * 2. * np.pi * freq * dt)
FFTdata = FFTdata.astype(np.complex64)
ifft(FFTdata, n=n, overwrite_x=True)
trace.data = np.real(FFTdata[:len(trace.data)]).astype(np.float)
trace.stats.starttime += dt
# clean_scipy_cache()
def stftfreq(wsize, sfreq=None):
"""Frequencies of stft transformation
Parameters
----------
wsize : int
Size of stft window
sfreq : float
Sampling frequency. If None the frequencies are given between 0 and pi
otherwise it's given in Hz.
Returns
-------
freqs : array
The positive frequencies returned by stft
See Also
--------
stft
istft
"""
n_freq = wsize // 2 + 1
freqs = fftfreq(wsize)
freqs = np.abs(freqs[:n_freq])
if sfreq is not None:
freqs *= float(sfreq)
return freqs
def shift_waveform(tr, dtshift):
"""
tr_shift = shift_waveform(tr, dtshift):
Shift data in trace tr by dtshift seconds backwards.
Parameters
----------
tr : obspy.Trace
Trace that contains the data to shift
dtshift : float
Time shift in seconds
Returns
-------
tr_shift : obspy.Trace
Copy of tr, but with data shifted dtshift seconds backwards.
"""
data_pad = np.r_[tr.data, np.zeros_like(tr.data)]
freq = fft.fftfreq(len(data_pad), tr.stats.delta)
shiftvec = np.exp(- 2 * np.pi * complex(0., 1.) * freq * dtshift)
data_fd = shiftvec * fft.fft(data_pad *
signal.tukey(len(data_pad),
alpha=0.2))
tr.data = np.real(fft.ifft(data_fd))[0:tr.stats.npts]
data_load.py 文件源码
项目:Automatic-feature-extraction-from-signal
作者: VVVikulin
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def simple_lf_filter(signal, sample_rate=1000):
W = fftfreq(signal.size, d=1/float(sample_rate))
f_signal = rfft(signal)
cut_f_signal = f_signal.copy()
cut_f_signal[(W<5)] = 0
return irfft(cut_f_signal).astype('int16')
data_load.py 文件源码
项目:Automatic-feature-extraction-from-signal
作者: VVVikulin
项目源码
文件源码
阅读 30
收藏 0
点赞 0
评论 0
def simple_hf_filter(signal, sample_rate=1000):
W = fftfreq(signal.size, d=1/float(sample_rate))
f_signal = rfft(signal)
cut_f_signal = f_signal.copy()
cut_f_signal[(W>70)] = 0
return irfft(cut_f_signal).astype('int16')
def psd(data, dt, ndivide=1, window=hanning, overlap_half=False):
"""Calculate power spectrum density of data.
Args:
data (np.ndarray): Input data.
dt (float): Time between each data.
ndivide (int): Do averaging (split data into ndivide, get psd of each, and average them).
ax (matplotlib.axes): Axis you want to plot on.
doplot (bool): Plot how averaging works.
overlap_half (bool): Split data to half-overlapped regions.
Returns:
vk (np.ndarray): Frequency.
psd (np.ndarray): PSD
"""
logger = getLogger('decode.utils.ndarray.psd')
if overlap_half:
step = int(len(data) / (ndivide + 1))
size = step * 2
else:
step = int(len(data) / ndivide)
size = step
if bin(len(data)).count('1') != 1:
logger.warning('warning: length of data is not power of 2: {}'.format(len(data)))
size = int(len(data) / ndivide)
if bin(size).count('1') != 1.:
if overlap_half:
logger.warning('warning: ((length of data) / (ndivide+1)) * 2 is not power of 2: {}'.format(size))
else:
logger.warning('warning: (length of data) / ndivide is not power of 2: {}'.format(size))
psd = np.zeros(size)
T = (size - 1) * dt
vs = 1 / dt
vk_ = fftfreq(size, dt)
vk = vk_[np.where(vk_ >= 0)]
for i in range(ndivide):
d = data[i * step:i * step + size]
if window is None:
w = np.ones(size)
corr = 1.0
else:
w = window(size)
corr = np.mean(w**2)
psd = psd + 2 * (np.abs(fft(d * w)))**2 / size * dt / corr
return vk, psd[:len(vk)] / ndivide
def gvectors(self, ikpt=1, gamma=False):
'''
Generate the G-vectors that satisfies the following relation
(G + k)**2 / 2 < ENCUT
'''
assert 1 <= ikpt <= self._nkpts, 'Invalid kpoint index!'
kvec = self._kvecs[ikpt-1]
# fx, fy, fz = [fftfreq(n) * n for n in self._ngrid]
# fftfreq in scipy.fftpack is a little different with VASP frequencies
fx = [ii if ii < self._ngrid[0] / 2 + 1 else ii - self._ngrid[0]
for ii in range(self._ngrid[0])]
fy = [jj if jj < self._ngrid[1] / 2 + 1 else jj - self._ngrid[1]
for jj in range(self._ngrid[1])]
fz = [kk if kk < self._ngrid[2] / 2 + 1 else kk - self._ngrid[2]
for kk in range(self._ngrid[2])]
if gamma:
# parallel gamma version of VASP WAVECAR exclude some planewave
# components, -DwNGZHalf
kgrid = np.array([(fx[ii], fy[jj], fz[kk])
for kk in range(self._ngrid[2])
for jj in range(self._ngrid[1])
for ii in range(self._ngrid[0])
if (
(fz[kk] > 0) or
(fz[kk] == 0 and fy[jj] > 0) or
(fz[kk] == 0 and fy[jj] == 0 and fx[ii] >= 0)
)], dtype=float)
else:
kgrid = np.array([(fx[ii], fy[jj], fz[kk])
for kk in range(self._ngrid[2])
for jj in range(self._ngrid[1])
for ii in range(self._ngrid[0])], dtype=float)
# Kinetic_Energy = (G + k)**2 / 2
# HSQDTM = hbar**2/(2*ELECTRON MASS)
KENERGY = HSQDTM * np.linalg.norm(
np.dot(kgrid + kvec[np.newaxis,:] , TPI*self._Bcell), axis=1
)**2
# find Gvectors where (G + k)**2 / 2 < ENCUT
Gvec = kgrid[np.where(KENERGY < self._encut)[0]]
if self._lsoc:
assert Gvec.shape[0] == self._nplws[ikpt - 1] / 2, \
'No. of planewaves not consistent for an SOC WAVECAR! %d %d %d' % \
(Gvec.shape[0], self._nplws[ikpt -1], np.prod(self._ngrid))
else:
assert Gvec.shape[0] == self._nplws[ikpt - 1], 'No. of planewaves not consistent! %d %d %d' % \
(Gvec.shape[0], self._nplws[ikpt -1], np.prod(self._ngrid))
return np.asarray(Gvec, dtype=int)
def _mt_spectra(x, dpss, sfreq, n_fft=None):
""" Compute tapered spectra
Parameters
----------
x : array, shape=(n_signals, n_times)
Input signal
dpss : array, shape=(n_tapers, n_times)
The tapers
sfreq : float
The sampling frequency
n_fft : int | None
Length of the FFT. If None, the number of samples in the input signal
will be used.
Returns
-------
x_mt : array, shape=(n_signals, n_tapers, n_times)
The tapered spectra
freqs : array
The frequency points in Hz of the spectra
"""
if n_fft is None:
n_fft = x.shape[1]
# remove mean (do not use in-place subtraction as it may modify input x)
x = x - np.mean(x, axis=-1)[:, np.newaxis]
# only keep positive frequencies
freqs = fftpack.fftfreq(n_fft, 1. / sfreq)
freq_mask = (freqs >= 0)
freqs = freqs[freq_mask]
# The following is equivalent to this, but uses less memory:
# x_mt = fftpack.fft(x[:, np.newaxis, :] * dpss, n=n_fft)
n_tapers = dpss.shape[0] if dpss.ndim > 1 else 1
x_mt = np.zeros((len(x), n_tapers, freq_mask.sum()), dtype=np.complex128)
for idx, sig in enumerate(x):
x_mt[idx] = fftpack.fft(sig[np.newaxis, :] * dpss,
n=n_fft)[:, freq_mask]
return x_mt, freqs