def homomorphic_envelope(x, fs=1000, f_LPF=8, order=3):
"""
Computes the homomorphic envelope of x
Args:
x : array
fs : float
Sampling frequency. Defaults to 1000 Hz
f_LPF : float
Lowpass frequency, has to be f_LPF < fs/2. Defaults to 8 Hz
Returns:
time : numpy array
"""
b, a = butter(order, 2 * f_LPF / fs, 'low')
he = np.exp(filtfilt(b, a, np.log(np.abs(hilbert(x)))))
return he
python类filtfilt()的实例源码
def highpass(data, BUTTER_ORDER=3, sampling_rate=10000, cut_off=500.0):
Wn = (float(cut_off) / (float(sampling_rate) / 2.0), 0.95)
b, a = signal.butter(BUTTER_ORDER, Wn, 'pass')
return signal.filtfilt(b, a, data)
def lpf(x, cutoff, fs, order=5):
"""
low pass filters signal with Butterworth digital
filter according to cutoff frequency
filter uses Gustafsson’s method to make sure
forward-backward filt == backward-forward filt
Note that edge effects are expected
Args:
x (array): signal data (numpy array)
cutoff (float): cutoff frequency (Hz)
fs (int): sample rate (Hz)
order (int): order of filter (default 5)
Returns:
filtered (array): low pass filtered data
"""
nyquist = fs / 2
b, a = butter(order, cutoff / nyquist)
if not np.all(np.abs(np.roots(a)) < 1):
raise PsolaError('Filter with cutoff at {} Hz is unstable given '
'sample frequency {} Hz'.format(cutoff, fs))
filtered = filtfilt(b, a, x, method='gust')
return filtered
def pb2bb(x, fs, fc, fd=None, flen=127, cutoff=None):
"""Convert passband signal to baseband.
The baseband conversion uses a low-pass filter after downconversion, with a
default cutoff frequency of `0.6*fd`, if `fd` is specified, or `1.1*fc` if `fd`
is not specified. Alternatively, the user may specify the cutoff frequency
explicitly.
For communication applications, one may wish to use :func:`arlpy.comms.downconvert` instead,
as that function supports matched filtering with a pulse shape rather than a generic
low-pass filter.
:param x: passband signal
:param fs: sampling rate of passband signal in Hz
:param fc: carrier frequency in passband in Hz
:param fd: sampling rate of baseband signal in Hz (``None`` => same as `fs`)
:param flen: number of taps in the low-pass FIR filter
:param cutoff: cutoff frequency in Hz (``None`` means auto-select)
:returns: complex baseband signal, sampled at `fd`
"""
if cutoff is None:
cutoff = 0.6*fd if fd is not None else 1.1*fc
y = x * _np.sqrt(2)*_np.exp(2j*_np.pi*fc*time(x,fs))
hb = _sig.firwin(flen, cutoff=cutoff, nyq=fs/2.0)
y = _sig.filtfilt(hb, 1, y)
if fd is not None and fd != fs:
y = _sig.resample_poly(y, 2*fd, fs)[::2]
return y
def process(self, obj_data):
'''
Apply lowpass filter to data set, with changes applied in place
@param obj_data: Input data with data
'''
ntaps = self.ap_paramList[0]()
fpassf_per = self.ap_paramList[1]()
fstopf_per = self.ap_paramList[2]()
wghts = self.ap_paramList[3]()
miter = self.ap_paramList[4]()
b_filt=signal.fir_filter_design.remez(numtaps=ntaps,
bands=[0,fpassf_per,fstopf_per,.5],
desired=[1,0],weight=wghts,maxiter=miter)
for label, data, err in obj_data.getIterator():
data.update(signal.filtfilt(b_filt,1,data))
def process(self, obj_data):
'''
Apply lowpass filter to data set
@param obj_data: Input data. Changes are made in place.
'''
column_names = obj_data.getDefaultColumns()
ntaps = self.ap_paramList[0]()
fpassf_per = self.ap_paramList[1]()
fstopf_per = self.ap_paramList[2]()
wghts = self.ap_paramList[3]()
miter = self.ap_paramList[4]()
b_filt=signal.fir_filter_design.remez(numtaps=ntaps,
bands=[0,fpassf_per,fstopf_per,.5],
desired=[1,0],weight=wghts,maxiter=miter)
for label, data in obj_data.getIterator():
for column in column_names:
obj_data.updateData(label, data.index, column, signal.filtfilt(b_filt,1,data))
def process(self, X):
onset_func = zscore(self.func(X))
onset_func = signal.filtfilt(self.moving_avg_filter, 1, onset_func)
onset_func = onset_func - signal.medfilt(
onset_func[:,np.newaxis], self.median_kernel
)[:,0]
peaks = signal.argrelmax(onset_func)
onsets = peaks[0][np.where(onset_func[peaks[0]] >
self.threshold)]
return onsets
def bfilt(cop_dat, cutoff, order):
# Butterworth filter
b,a = signal.butter(order, cutoff)
# filter coronal
cor_lp = signal.filtfilt(b, a, cop_dat[:,0])
# filter sagittal
sag_lp = signal.filtfilt(b, a, cop_dat[:,1])
return np.concatenate((to_sing(cor_lp),to_sing(sag_lp),to_sing(cop_dat[:,2])),axis=1)
def _analog_filter(self,
ftype,
highpass=None,
lowpass=None,
order=3,
zerophase=True):
' Use a classic analog filter on the data, currently butter or bessel'
from scipy.signal import butter, bessel
filter_types = {'butter': butter, 'bessel': bessel}
afilter = filter_types[ftype]
if highpass is None and lowpass is not None:
b, a = afilter(order, lowpass / (self.sr / 2), btype='lowpass')
elif highpass is not None and lowpass is None:
b, a = afilter(order, highpass / (self.sr / 2), btype='highpass')
elif highpass is not None and lowpass is not None:
if highpass < lowpass:
b, a = afilter(order,
(highpass / (self.sr / 2),
lowpass / (self.sr / 2)),
btype='bandpass')
else:
b, a = afilter(order,
(lowpass / (self.sr / 2),
highpass / (self.sr / 2)),
btype='bandstop')
if zerophase:
return self.filtfilt(b, a)
else:
return self.lfilter(b, a)
def filtfilt(self, b, a):
" Performs forward backward filtering on the stream."
from scipy.signal import filtfilt
def filter_func(x):
return filtfilt(b, a, x, axis=0)
return self.new_stream(self.vector_map(filter_func))
def abs_and_smooth(x, sr, lp=100):
abs_x = np.abs(x)
if len(x.shape) > 1:
abs_x = np.sum(abs_x,
axis=-1) # sum over last dimension eg sum over channels
b, a = butter(3, lp / 2 / sr, btype="low")
filtered_x = filtfilt(b, a, abs_x, axis=0)
return filtered_x
def test_filtfilt():
sr = 1000
freq = 100
b, a = butter(3, freq / (sr / 2), 'high')
for data in (data2, data3, data4):
x = filtfilt(b, a, data, axis=0)
y = Stream(data, chunksize=211, sr=sr).filtfilt(b, a).call()
assert eq(x, y)
def butter_lowpass_filter(data, cut, fs, order, zero_phase=False):
from scipy.signal import butter, lfilter, filtfilt
nyq = 0.5 * fs
cut = cut / nyq
b, a = butter(order, cut, btype='low')
y = (filtfilt if zero_phase else lfilter)(b, a, data)
return y
def apply_butter_filter(self, data, order, cutoff, btype):
cutoff /= self.nyq
b, a = butter(N=order, Wn=cutoff, btype=btype)
return filtfilt(b, a, data, axis=1)
def _getFiltDesign(sf, f, npts, filtname, cycle, order, axis):
"""Get the designed filter
sf : sample frequency
f : frequency vector/list [ex : f = [2,4]]
npts : number of points
- 'fir1'
- 'butter'
- 'bessel'
"""
if type(f) != np.ndarray:
f = np.array(f)
# fir1 filter :
if filtname == 'fir1':
fOrder = fir_order(sf, npts, f[0], cycle=cycle)
b, a = fir1(fOrder, f/(sf / 2))
# butterworth filter :
elif filtname == 'butter':
b, a = butter(order, [(2*f[0])/sf, (2*f[1])/sf], btype='bandpass')
fOrder = None
# bessel filter :
elif filtname == 'bessel':
b, a = bessel(order, [(2*f[0])/sf, (2*f[1])/sf], btype='bandpass')
fOrder = None
def filtSignal(x):
return filtfilt(b, a, x, padlen=fOrder, axis=axis)
return filtSignal
def fir_filt(x, Fs, Fc, fOrder):
(b, a) = fir1(fOrder, Fc / (Fs / 2))
return filtfilt(b, a, x, padlen=fOrder)
####################################################################
# - Morlet :
####################################################################
def butter_lowpass_filtfilt(data, *, cutoff, fs, order=5):
"""Lowpass filter data using a zero-phase filt-filt butterworth
filter.
Performs zero-phase digital filtering by processing the input data
in both the forward and reverse directions.
"""
b, a = butter_lowpass(cutoff, fs, order=order)
y = filtfilt(b, a, data, padlen=150)
return y
def filtfiltlong(finname, foutname, fmt, b, a, buffer_len=100000, overlap_len=100, max_len=-1):
"""Use memmap and chunking to filter continuous data.
Inputs:
finname -
foutname -
fmt - data format eg 'i'
b,a - filter coefficients
buffer_len - how much data to process at a time
overlap_len - how much data do we add to the end of each chunk to smooth out filter transients
max_len - how many samples to process. If set to -1, processes the whole file
Outputs:
y - The memmapped array pointing to the written file
Notes on algorithm:
1. The arrays are memmapped, so we let pylab (numpy) take care of handling large arrays
2. The filtering is done in chunks:
Chunking details:
|<------- b1 ------->||<------- b2 ------->|
-----[------*--------------{-----*------]--------------*------}----------
|<-------------- c1 -------------->|
|<-------------- c2 -------------->|
From the array of data we cut out contiguous buffers (b1,b2,...) and to each buffer we add some extra overlap to
make chunks (c1,c2). The overlap helps to remove the transients from the filtering which would otherwise appear at
each buffer boundary.
"""
x = pylab.memmap(finname, dtype=fmt, mode='r')
if max_len == -1:
max_len = x.size
y = pylab.memmap(foutname, dtype=fmt, mode='w+', shape=max_len)
for buff_st_idx in xrange(0, max_len, buffer_len):
chk_st_idx = max(0, buff_st_idx - overlap_len)
buff_nd_idx = min(max_len, buff_st_idx + buffer_len)
chk_nd_idx = min(x.size, buff_nd_idx + overlap_len)
rel_st_idx = buff_st_idx - chk_st_idx
rel_nd_idx = buff_nd_idx - chk_st_idx
this_y_chk = filtfilt(b, a, x[chk_st_idx:chk_nd_idx])
y[buff_st_idx:buff_nd_idx] = this_y_chk[rel_st_idx:rel_nd_idx]
return y
def periodic_derivative(x,y,max_periods):
plot = False
Ns =len(x)
b,a = signalbutter(8,2.0*max_periods/Ns)
ymid =interp(x+0.5*(x[1]-x[0]),x,y,period=2*np_pi)
yder = diff(ymid)/diff(x)
#yder = Ns/(max(x)-min(x))*fftpackdiff(y,1,Ns)
yder_filt = deepcopy(yder)
x_filt = deepcopy(x)
x_filt = npappend(x_filt,x_filt[-1]+x_filt[1]-x_filt[0])
yder_filt = signalfiltfilt(b,a,npappend(yder_filt,yder_filt[0]))
if plot:
plt.figure(1)
plt.subplot(311)
plt.plot(x, y)
plt.subplot(312)
plt.plot(x[0:-1],yder)
plt.subplot(313)
plt.plot(x_filt[0:-1],yder_filt)
plt.show()
return yder_filt
#x = numpy.array(range(100))
#y = numpy.sin(twopi*x/100)+numpy.sin(twopi*x/10)
#periodic_derivative(x,y,4)
def filter(self, s, axis=0):
"""Filter new data.
"""
if self.bandType == 'allpass':
return s
if self.bandType == 'allstop':
return np.zeros_like(s)
""" Should be very close to filtfilt, padding? XXX - idfah
if self.zeroPhase:
rev = [slice(None),]*s.ndim
rev[axis] = slice(None,None,-1)
#ziScaled = self.scaleZi(s[rev], axis)
y, newZi = spsig.lfilter(self.numCoef, self.denomCoef, s[rev], axis=axis, zi=newZi)
y = y[rev]
"""
# if zeroPhase and signal is shorter than padlen (default in filtfilt function)
if self.zeroPhase and \
(3*max(len(self.numCoef), len(self.denomCoef))) < s.shape[axis]:
# need astype below since filtfilt calls lfilter_zi, which does not preserve dtype XXX - idfah
return spsig.filtfilt(self.numCoef, self.denomCoef,
s, axis=axis, padtype='even').astype(self.dtype, copy=False)
else:
ziScaled = self.scaleZi(s, axis)
# even padding to help reduce edge effects
nPad = 3*max(len(self.numCoef), len(self.denomCoef))
sPad = np.apply_along_axis(np.pad, axis, s, pad_width=nPad, mode='reflect') # edge for constant padding
slc = [slice(nPad,-nPad) if i == axis else slice(None) for i in range(s.ndim)]
y, newZi = spsig.lfilter(self.numCoef, self.denomCoef, sPad, axis=axis, zi=ziScaled)
return y[slc]
def high_pass(self,data_buffer):
[b1, a1] = [self.high_pass_coefficients[0],self.high_pass_coefficients[1]]
high_passed = signal.filtfilt(b1,a1,data_buffer)
return high_passed
def low_pass(self,data_buffer):
[b, a] = [self.low_pass_coefficients[0],self.low_pass_coefficients[1]]
low_passed = signal.filtfilt(b,a,data_buffer)
return low_passed
def low_pass_filter(x, order, cutOffFrequency, samplingFrequency):
nyq = samplingFrequency*0.5
cut = cutOffFrequency/nyq
b, a = signal.butter(order, cut, btype='low')
y = signal.filtfilt(b, a, x)
return y
def imu_filter_lowpass(x, order = 4, sRate = 148.148148148148, highcut = 20.0):
""" Forward-backward band-pass filtering (IIR butterworth filter) """
nyq = 0.5 * sRate
high = highcut/nyq
b, a = butter(N =order, Wn = high, btype = 'low')
return filtfilt(b=b, a=a, x=x, axis=0, method = 'pad', padtype = 'odd',
padlen = np.minimum(3*len(a)*len(b), x.shape[0]-1))
def imu_filter_highpass(x, order = 4, sRate = 148.148148148148, lowcut = 0.01):
""" Forward-backward band-pass filtering (IIR butterworth filter) """
nyq = 0.5 * sRate
low = lowcut/nyq
b, a = butter(N =order, Wn = low, btype = 'high')
return filtfilt(b=b, a=a, x=x, axis=0, method = 'pad', padtype = 'odd',
padlen = np.minimum(3*len(a)*len(b), x.shape[0]-1))
def imu_filter_bandpass(x, order = 4, sRate = 148.148148148148, lowcut = 1., highcut = 20.):
""" Forward-backward band-pass filtering (IIR butterworth filter) """
nyq = 0.5 * sRate
low = lowcut/nyq
high = highcut/nyq
b, a = butter(N =order, Wn = [low, high], btype = 'band')
return filtfilt(b=b, a=a, x=x, axis=0, method = 'pad', padtype = 'odd',
padlen = np.minimum(3*len(a)*len(b), x.shape[0]-1))
def imu_filter_comb(x):
""" Comb filtering at 50 Hz. Coefficients are computed with MATLAB."""
b = np.zeros(41)
a = np.zeros(41)
b[0] = 0.941160767899653
b[-1] = -0.941160767899653
a[0] = 1.
a[-1] = -0.882321535799305
return filtfilt(b=b, a=a, x=x, axis=0, method = 'pad', padtype = 'odd',
padlen = np.minimum(3*len(a)*len(b), x.shape[0]-1))
def glove_filter(self, order = 4, highcut = 2):
nyq = 0.5 * self.sRate['glove']
high = highcut/nyq
b, a = butter(N=order, Wn = high, btype = 'lowpass')
self.glove = filtfilt(b=b, a=a, x=self.glove, axis=0)
def emg_filter_comb(x):
""" Comb filtering at 50 Hz, for 2KHz sampling frequency.
Coefficients are computed with MATLAB."""
b = np.zeros(41)
a = np.zeros(41)
b[0] = 0.941160767899653
b[-1] = -0.941160767899653
a[0] = 1.
a[-1] = -0.882321535799305
return filtfilt(b=b, a=a, x=x, axis=0, method = 'pad', padtype = 'odd',
padlen = np.minimum(3*np.maximum(len(a),len(b)), x.shape[0]-1))
def glove_filter_lowpass(x, order = 4, sRate = 2000., highcut = 1.):
""" Forward-backward band-pass filtering (IIR butterworth filter) """
nyq = 0.5 * sRate
high = highcut/nyq
b, a = butter(order, high, btype = 'low')
filtered = filtfilt(b=b, a=a, x=x, axis=0, method = 'pad', padtype = 'odd')
return filtered