def butter_highpass(highcut, fs, order):
nyq = 0.5 * fs
high = highcut / nyq
b, a = butter(order, high, btype="highpass")
return b, a
# Sources for Batch Iterators
#
# These classes load training and test data and perform some basic preprocessing on it.
# They are then passed to factory functions that create the net. There they are used
# as data sources for the batch iterators that feed data to the net.
# All classes band pass or low pass filter their data based on min / max freq using
# a causal filter (lfilter) when the data is first loaded.
# * TrainSource loads a several series of EEG data and events, splices them together into
# one long stream, then normalizes the EEG data to zero mean and unit standard deviation.
# * TestSource is like TrainSource except that it uses the mean and standard deviation
# computed for the associated training source to normalize the EEG data.
# * SubmitSource is like TestSource except that it does not load and event data.
python类lfilter()的实例源码
def phormants(x, Fs):
N = len(x)
w = numpy.hamming(N)
# Apply window and high pass filter.
x1 = x * w
x1 = lfilter([1], [1., 0.63], x1)
# Get LPC.
ncoeff = 2 + Fs / 1000
A, e, k = lpc(x1, ncoeff)
#A, e, k = lpc(x1, 8)
# Get roots.
rts = numpy.roots(A)
rts = [r for r in rts if numpy.imag(r) >= 0]
# Get angles.
angz = numpy.arctan2(numpy.imag(rts), numpy.real(rts))
# Get frequencies.
frqs = sorted(angz * (Fs / (2 * math.pi)))
return frqs
def sp_filter_butterworth_bandpass(input_data, passband, stopband, max_passband_loss, min_stopband_attenuation):
# The SciPy signal processing module uses normalised frequencies, so we need to normalise the input values
norm_passband = input_data.timebase.normalise_freq(passband)
norm_stopband = input_data.timebase.normalise_freq(stopband)
ord,wn = sp_signal.filter_design.buttord(norm_passband, norm_stopband, max_passband_loss, min_stopband_attenuation)
b, a = sp_signal.filter_design.butter(ord, wn, btype = 'bandpass')
output_data = input_data
for i,s in enumerate(output_data.signal):
output_data.signal[i] = sp_signal.lfilter(b,a,s)
return output_data
#########################################
## wrappers to numpy signal processing ##
#########################################
def preemphasis(x, coef=0.97):
"""Pre-emphasis
Args:
x (1d-array): Input signal.
coef (float): Pre-emphasis coefficient.
Returns:
array: Output filtered signal.
See also:
:func:`inv_preemphasis`
Examples:
>>> from nnmnkwii.util import example_audio_file
>>> from scipy.io import wavfile
>>> fs, x = wavfile.read(example_audio_file())
>>> x = x.astype(np.float64)
>>> from nnmnkwii import preprocessing as P
>>> y = P.preemphasis(x, coef=0.97)
>>> assert x.shape == y.shape
"""
b = np.array([1., -coef], x.dtype)
a = np.array([1.], x.dtype)
return signal.lfilter(b, a, x)
def Gaussian2D(image, sigma, padding=0):
n, m = image.shape[0], image.shape[1]
tmp = np.zeros((n + padding, m + padding))
if tmp.shape[0] < 4:
raise ValueError('Image and padding too small')
if tmp.shape[1] < 4:
raise ValueError('Image and padding too small')
B, A = __gausscoeff(sigma)
tmp[:n, :m] = image
tmp = lfilter(B, A, tmp, axis=0)
tmp = np.flipud(tmp)
tmp = lfilter(B, A, tmp, axis=0)
tmp = np.flipud(tmp)
tmp = lfilter(B, A, tmp, axis=1)
tmp = np.fliplr(tmp)
tmp = lfilter(B, A, tmp, axis=1)
tmp = np.fliplr(tmp)
return tmp[:n, :m]
#-----------------------------------------------------------------------------
def load_series(self, subject, series):
min_freq = self.min_freq
max_freq = self.max_freq
key = (subject, series, min_freq, max_freq)
if key not in self._series_cache:
while len(self._series_cache) > self.MAX_CACHE_SIZE:
# Randomly throw away an item
self._series_cache.popitem()
print("Loading", subject, series)
data = read_csv(path(subject, series, "data"))
# Filter here since it's slow and we don't want to filter multiple
# times. `lfilter` is CAUSAL and thus doesn't violate the ban on future data.
if (self.min_freq is None) or (self.min_freq == 0):
print("Low pass filtering, f_h =", max_freq)
b, a = butter_lowpass(max_freq, SAMPLE_RATE, FILTER_N)
else:
print("Band pass filtering, f_l =", min_freq, "f_h =", max_freq)
b, a = butter_bandpass(min_freq, max_freq, SAMPLE_RATE, FILTER_N)
self._series_cache[key] = lfilter(b, a, data, axis=0)
return self._series_cache[key]
def _apply_filter(sys, dt, u):
# "Correct" implementation of filt that has a single time-step delay
# see Nengo issue #938
if dt is not None:
num, den = cont2discrete(sys, dt).tf
elif not sys.analog:
num, den = sys.tf
else:
raise ValueError("system (%s) must be discrete if not given dt" % sys)
# convert from the polynomial representation, and add back the leading
# zeros that were dropped by poly1d, since lfilter will shift it the
# wrong way (it will add the leading zeros back to the end, effectively
# removing the delay)
num, den = map(np.asarray, (num, den))
num = np.append([0]*(len(den) - len(num)), num)
return lfilter(num, den, u, axis=-1)
def _process(self):
# Receive input data.
batch = self.input.receive()
self._measure_time('start', frequency=100)
# Process data.
for i in xrange(self.nb_channels):
batch[:, i], self.z[i] = signal.lfilter(self.b, self.a, batch[:, i], zi=self.z[i])
batch[:, i] -= np.median(batch[:, i])
if self.remove_median:
global_median = np.median(batch, 1)
for i in xrange(self.nb_channels):
batch[:, i] -= global_median
# Send output data.
self.output.send(batch)
self._measure_time('end', frequency=100)
return
def _generate_noise(info, cov, iir_filter, random_state, n_samples, zi=None):
"""Helper to create spatially colored and temporally IIR-filtered noise"""
from scipy.signal import lfilter
noise_cov = pick_channels_cov(cov, include=info['ch_names'], exclude=[])
rng = check_random_state(random_state)
c = np.diag(noise_cov.data) if noise_cov['diag'] else noise_cov.data
mu_channels = np.zeros(len(c))
# we almost always get a positive semidefinite warning here, so squash it
with warnings.catch_warnings(record=True):
noise = rng.multivariate_normal(mu_channels, c, n_samples).T
if iir_filter is not None:
if zi is None:
zi = np.zeros((len(c), len(iir_filter) - 1))
noise, zf = lfilter([1], iir_filter, noise, axis=-1, zi=zi)
else:
zf = None
return noise, zf
def __call__(self, x):
y, self.filter_state = lfilter(b=self.b, a=self.a, x = np.array(x)[None], zi = self.filter_state)
return y[0]
def butter_bandpass_filter(data, highpass, fs, order=4):
b, a = butter_bandpass(0, highpass, fs, order=order)
y = lfilter(b, a, data)
return y
def discount(x, gamma):
"""
Compute discounted sum of future values
out[i] = in[i] + gamma * in[i+1] + gamma^2 * in[i+2] + ...
"""
return signal.lfilter([1],[1,-gamma],x[::-1], axis=0)[::-1]
def butter_bandpass_filter(data, lowcut, highcut, order=5):
b, a = butter_bandpass(lowcut, highcut, order=order)
filtered_data = lfilter(b, a, data)
return filtered_data
def inv_preemphasis(x, coef=0.97):
"""Inverse operation of pre-emphasis
Args:
x (1d-array): Input signal.
coef (float): Pre-emphasis coefficient.
Returns:
array: Output filtered signal.
See also:
:func:`preemphasis`
Examples:
>>> from nnmnkwii.util import example_audio_file
>>> from scipy.io import wavfile
>>> fs, x = wavfile.read(example_audio_file())
>>> x = x.astype(np.float64)
>>> from nnmnkwii import preprocessing as P
>>> x_hat = P.inv_preemphasis(P.preemphasis(x, coef=0.97), coef=0.97)
>>> assert np.allclose(x, x_hat)
"""
b = np.array([1.], x.dtype)
a = np.array([1., -coef], x.dtype)
return signal.lfilter(b, a, x)
def Filter(self, LowCorner, HighCorner, Order=3):
"""
Butterworth bandpass filter
"""
FS = 1./self.HDR['TSMP']
if HighCorner >= FS/2.:
print 'Warning: High corner must be < {0:.2f} Hz'.format(FS/2.)
return
if LowCorner < 0.:
print 'Warning: Low corner must be > 0 Hz'.format(FS/2.)
return
# Corner frequencies
Corners = [2.*LowCorner/FS, 2.*HighCorner/FS]
# Butterworth filter
b, a = _sig.butter(Order, Corners, btype='band')
# Filtering records
for I,S in enumerate(self.CHN):
# self.CHN[I] = _sig.lfilter(b, a, S)
zi = _sig.lfilter_zi(b, a);
self.CHN[I],_ = _sig.lfilter(b, a, S, zi=zi*S[0])
#-----------------------------------------------------------------------------------------
cochleagram_extractor.py 文件源码
项目:speech_feature_extractor
作者: ZhihaoDU
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def erb_frilter_bank(x, fcoefs):
a0 = fcoefs[:, 0]
a11 = fcoefs[:, 1]
a12 = fcoefs[:, 2]
a13 = fcoefs[:, 3]
a14 = fcoefs[:, 4]
a2 = fcoefs[:, 5]
b0 = fcoefs[:, 6]
b1 = fcoefs[:, 7]
b2 = fcoefs[:, 8]
gain = fcoefs[:, 9]
output = np.zeros((np.size(gain, 0), np.size(x, 0)))
for chan in range(np.size(gain, 0)):
y1 = lfilter(np.array([a0[chan] / gain[chan], a11[chan] / gain[chan], a2[chan] / gain[chan]]),
np.array([b0[chan], b1[chan], b2[chan]]), x)
y2 = lfilter(np.array([a0[chan], a12[chan], a2[chan]]),
np.array([b0[chan], b1[chan], b2[chan]]), y1)
y3 = lfilter(np.array([a0[chan], a13[chan], a2[chan]]),
np.array([b0[chan], b1[chan], b2[chan]]), y2)
y4 = lfilter(np.array([a0[chan], a14[chan], a2[chan]]),
np.array([b0[chan], b1[chan], b2[chan]]), y3)
output[chan, :] = y4
return output
def erb_frilter_bank(x, fcoefs):
a0 = fcoefs[:, 0]
a11 = fcoefs[:, 1]
a12 = fcoefs[:, 2]
a13 = fcoefs[:, 3]
a14 = fcoefs[:, 4]
a2 = fcoefs[:, 5]
b0 = fcoefs[:, 6]
b1 = fcoefs[:, 7]
b2 = fcoefs[:, 8]
gain = fcoefs[:, 9]
output = np.zeros((np.size(gain, 0), np.size(x, 0)))
for chan in range(np.size(gain, 0)):
y1 = lfilter(np.array([a0[chan] / gain[chan], a11[chan] / gain[chan], a2[chan] / gain[chan]]),
np.array([b0[chan], b1[chan], b2[chan]]), x)
y2 = lfilter(np.array([a0[chan], a12[chan], a2[chan]]),
np.array([b0[chan], b1[chan], b2[chan]]), y1)
y3 = lfilter(np.array([a0[chan], a13[chan], a2[chan]]),
np.array([b0[chan], b1[chan], b2[chan]]), y2)
y4 = lfilter(np.array([a0[chan], a14[chan], a2[chan]]),
np.array([b0[chan], b1[chan], b2[chan]]), y3)
output[chan, :] = y4
return output
def rasta_filt(x):
number = np.arange(-2., 3., 1.)
number = -1. * number / np.sum(number*number)
denom = np.array([1., -0.94])
zi = lfilter_zi(number, 1)
zi = zi.reshape(1, len(zi))
zi = np.repeat(zi, np.size(x, 0), 0)
y, zf = lfilter(number, 1, x[:,0:4], axis=1, zi=zi)
y, zf = lfilter(number, denom, x, axis=1, zi=zf)
return y
def rasta_filt(x):
number = np.arange(-2., 3., 1.)
number = -1. * number / np.sum(number*number)
denom = np.array([1., -0.94])
zi = lfilter_zi(number, 1)
zi = zi.reshape(1, len(zi))
zi = np.repeat(zi, np.size(x, 0), 0)
y, zf = lfilter(number, 1, x[:,0:4], axis=1, zi=zi)
y, zf = lfilter(number, denom, x, axis=1, zi=zf)
return y
def _analog_filter(self,
ftype,
highpass=None,
lowpass=None,
order=3,
zerophase=True):
' Use a classic analog filter on the data, currently butter or bessel'
from scipy.signal import butter, bessel
filter_types = {'butter': butter, 'bessel': bessel}
afilter = filter_types[ftype]
if highpass is None and lowpass is not None:
b, a = afilter(order, lowpass / (self.sr / 2), btype='lowpass')
elif highpass is not None and lowpass is None:
b, a = afilter(order, highpass / (self.sr / 2), btype='highpass')
elif highpass is not None and lowpass is not None:
if highpass < lowpass:
b, a = afilter(order,
(highpass / (self.sr / 2),
lowpass / (self.sr / 2)),
btype='bandpass')
else:
b, a = afilter(order,
(lowpass / (self.sr / 2),
highpass / (self.sr / 2)),
btype='bandstop')
if zerophase:
return self.filtfilt(b, a)
else:
return self.lfilter(b, a)
def lfilter(self, b, a):
" Forward only filtering"
from scipy.signal import lfilter
def filter_func(x):
return lfilter(b, a, x, axis=0)
return self.new_stream(self.vector_map(filter_func))
preprocessing.py 文件源码
项目:Epileptic-Seizure-Prediction
作者: cedricsimar
项目源码
文件源码
阅读 30
收藏 0
点赞 0
评论 0
def low_pass_filter(self, sig, low_cut, high_cut, nyq):
"""
Apply a low pass filter to the data to take the relevant frequencies
and to smooth the drop-out regions
No resample necessary because all files have the same sampling frequency
"""
b, a = signal.butter(5, np.array([low_cut, high_cut]) / nyq, btype='band')
sig_filt = signal.lfilter(b, a, sig, axis=0)
return(np.float32(sig_filt))
def butter_bandpass_filter(data, lowcut, highcut, fs, order):
from scipy.signal import butter, lfilter
nyq = 0.5 * fs
low = lowcut / nyq
high = highcut / nyq
b, a = butter(order, [low, high], btype='bandpass')
y = lfilter(b, a, data)
return y
def butter_bandstop_filter(data, lowcut, highcut, fs, order):
from scipy.signal import butter, lfilter
nyq = 0.5 * fs
low = lowcut / nyq
high = highcut / nyq
b, a = butter(order, [low, high], btype='bandstop')
y = lfilter(b, a, data)
return y
def butter_lowpass_filter(data, cut, fs, order, zero_phase=False):
from scipy.signal import butter, lfilter, filtfilt
nyq = 0.5 * fs
cut = cut / nyq
b, a = butter(order, cut, btype='low')
y = (filtfilt if zero_phase else lfilter)(b, a, data)
return y
def apply_ellip_filter(self, data, order, cutoff, btype):
ripple = 3
attenuation = 50
cutoff /= self.nyq
b, a = iirfilter(N=order, Wn=cutoff, rp=ripple, rs=attenuation,
btype=btype, ftype='ellip')
return lfilter(b, a, data, axis=0)
def apply(self, data):
nyq = self.f / 2.0
cutoff = min(self.f, nyq-1)
h = signal.firwin(numtaps=101, cutoff=cutoff, nyq=nyq)
# data[i][ch][dim0]
for i in range(len(data)):
data_point = data[i]
for j in range(len(data_point)):
data_point[j] = signal.lfilter(h, 1.0, data_point[j])
return data
def _discount(self, x):
a = np.asarray(x)
return lfilter([1], [1, -self.discount_factor], a[::-1], axis=0)[::-1]
def discount(self, x, gamma):
"""
computes discounted sums along 0th dimension of x.
inputs
------
x: ndarray
gamma: float
outputs
-------
y: ndarray with same shape as x, satisfying
y[t] = x[t] + gamma*x[t+1] + gamma^2*x[t+2] + ... + gamma^k x[t+k],
where k = len(x) - t - 1
"""
x = np.array(x)
assert x.ndim >= 1
return lfilter([1], [1, -gamma], x[::-1], axis=0)[::-1]
def butter_bandpass_filter(data, *, lowcut, highcut, fs, order=5):
"""Band filter data using a butterworth filter."""
b, a = butter_bandpass(lowcut, highcut, fs, order=order)
y = lfilter(b, a, data)
return y