def test_lfilter_gen(self):
x = np.random.normal(0, 1, 1000)
hb = np.array([0, 0, 1, 0], dtype=np.float)
f = signal.lfilter_gen(hb, 1)
y = [f.send(v) for v in x]
self.assertArrayEqual(np.append([0, 0], x[:-2]), y)
hb, ha = sp.iirfilter(4, 0.01, btype='lowpass')
y1 = sp.lfilter(hb, ha, x)
f = signal.lfilter_gen(hb, ha)
y2 = [f.send(v) for v in x]
self.assertArrayEqual(y1, y2, precision=6)
python类lfilter()的实例源码
def mfilter(s, x, axis=0, complex_output=False):
"""Matched filter recevied signal using a reference signal.
:param s: reference signal
:param x: recevied signal
:param axis: axis of the signal, if multiple signals specified
:param complex_output: True to return complex signal, False for absolute value of complex signal
"""
hb = _np.conj(_np.flipud(s))
x = _np.pad(x, (0, len(s)-1), 'constant')
y = _sig.lfilter(hb, 1, x, axis)[len(s)-1:]
if not complex_output:
y = _np.abs(y)
return y
def lfilter0(b, a, x, axis=0):
"""Filter data with an IIR or FIR filter with zero DC group delay.
:func:`scipy.signal.lfilter` provides a way to filter a signal `x` using a FIR/IIR
filter defined by `b` and `a`. The resulting output is delayed, as compared to the
input by the group delay. This function corrects for the group delay, resulting in
an output that is synchronized with the input signal. If the filter as an acausal
impulse response, some precursor signal from the output will be lost. To avoid this,
pad input signal `x` with sufficient zeros at the beginning to capture the precursor.
Since both, :func:`scipy.signal.lfilter` and this function return a signal with the
same length as the input, some signal tail is lost at the end. To avoid this, pad
input signal `x` with sufficient zeros at the end.
See documentation for :func:`scipy.signal.lfilter` for more details.
>>> import arlpy
>>> import numpy as np
>>> fs = 250000
>>> b = arlpy.uwa.absorption_filter(fs, distance=500)
>>> x = np.pad(arlpy.signal.sweep(20000, 40000, 0.5, fs), (127, 127), 'constant')
>>> y = arlpy.signal.lfilter0(b, 1, x)
"""
w, g = _sig.group_delay((b, a))
ndx = _np.argmin(_np.abs(w))
d = int(round(g[ndx]))
x = _np.pad(x, (0, d), 'constant')
y = _sig.lfilter(b, a, x, axis)[d:]
return y
def harvest_filter_f0_contour(f0, st, ed, b, a):
smoothed_f0 = f0.copy()
smoothed_f0[:st] = smoothed_f0[st]
smoothed_f0[ed + 1:] = smoothed_f0[ed]
aaa = sg.lfilter(b, a, smoothed_f0)
bbb = sg.lfilter(b, a, aaa[::-1])
smoothed_f0 = bbb[::-1].copy()
smoothed_f0[:st] = 0.
smoothed_f0[ed + 1:] = 0.
return smoothed_f0
def harvest_filter_f0_contour(f0, st, ed, b, a):
smoothed_f0 = f0.copy()
smoothed_f0[:st] = smoothed_f0[st]
smoothed_f0[ed + 1:] = smoothed_f0[ed]
aaa = sg.lfilter(b, a, smoothed_f0)
bbb = sg.lfilter(b, a, aaa[::-1])
smoothed_f0 = bbb[::-1].copy()
smoothed_f0[:st] = 0.
smoothed_f0[ed + 1:] = 0.
return smoothed_f0
def butter_lowpass_filter(data, cutoff, fs, order=2):
"""
This function designs a Butterworth lowpass filter and applies it to the data
:param data: Audio data to be lowpass filtered
:param cutoff: Cutoff frequency in Hz
:param fs: Samplerate of audio
:param order: Order of the filter, defaults to second order filter as required by timbral_metallic
:return: Returns the filtered signal
"""
b, a = butter_lowpass(cutoff, fs, order=order)
y = lfilter(b, a, data)
return y
def discount_rewards(x, gamma):
"""
Given vector x, computes a vector y such that
y[i] = x[i] + gamma * x[i+1] + gamma^2 x[i+2] + ...
"""
return signal.lfilter([1], [1, -gamma], x[::-1], axis=0)[::-1]
# Source: http://stackoverflow.com/a/12201744/1735784
def SPBFILTER(df, n1 = 40, n2 = 60, n3 = 0, field = 'close'):
if n3 == 0:
n3 = int((n1 + n2)/2)
a1 = 5.0/n1
a2 = 5.0/n2
B = [a1-a2, a2-a1]
A = [1, (1-a1)+(1-a2), -(1-a1)*(1-a2)]
PB = pd.Series(signal.lfilter(B, A, df[field]), name = 'SPB_%s_%s' % (n1, n2))
RMS = pd.Series(pd.rolling_mean(PB*PB, n3)**0.5, name = 'SPBRMS__%s_%s' % (n1, n2))
return pd.concat([PB, RMS], join='outer', axis=1)
def Fdeltas(x,w=9):
'''
# D = deltas(X,W) Calculate the deltas (derivatives) of a sequence
input feature*frame
'''
# nr feature, nc frame
nr,nc = x.shape
# Define window shape
hlen = int(np.floor(w/2))
w = 2*hlen + 1
win = np.arange(hlen,-hlen-1,-1)
# normalisation
win = win/np.sum(np.abs(win))
# pad data by repeating first and last columns
xx = np.hstack((np.tile(x[:,0],(hlen,1)).transpose(),x,np.tile(x[:,-1],(hlen,1)).transpose()))
# Apply the delta filter
d = lfilter(win, 1, xx, 1) # filter along dim 1 (rows)
# Trim edges
d = d[:,2*hlen:2*hlen+nc]
return d
def filter(self, s, axis=0):
"""Filter new data.
"""
if self.bandType == 'allpass':
return s
if self.bandType == 'allstop':
return np.zeros_like(s)
""" Should be very close to filtfilt, padding? XXX - idfah
if self.zeroPhase:
rev = [slice(None),]*s.ndim
rev[axis] = slice(None,None,-1)
#ziScaled = self.scaleZi(s[rev], axis)
y, newZi = spsig.lfilter(self.numCoef, self.denomCoef, s[rev], axis=axis, zi=newZi)
y = y[rev]
"""
# if zeroPhase and signal is shorter than padlen (default in filtfilt function)
if self.zeroPhase and \
(3*max(len(self.numCoef), len(self.denomCoef))) < s.shape[axis]:
# need astype below since filtfilt calls lfilter_zi, which does not preserve dtype XXX - idfah
return spsig.filtfilt(self.numCoef, self.denomCoef,
s, axis=axis, padtype='even').astype(self.dtype, copy=False)
else:
ziScaled = self.scaleZi(s, axis)
# even padding to help reduce edge effects
nPad = 3*max(len(self.numCoef), len(self.denomCoef))
sPad = np.apply_along_axis(np.pad, axis, s, pad_width=nPad, mode='reflect') # edge for constant padding
slc = [slice(nPad,-nPad) if i == axis else slice(None) for i in range(s.ndim)]
y, newZi = spsig.lfilter(self.numCoef, self.denomCoef, sPad, axis=axis, zi=ziScaled)
return y[slc]
def filter(self, s, axis=0):
if self.bandType == 'allpass':
return s
if self.bandType == 'allstop':
return np.zeros_like(s)
if not self.ziSaved:
# use padding to find initial zi? XXX - idfah
self.zi = self.scaleZi(s, axis)
self.ziSaved = True
y, self.zi = spsig.lfilter(nc, dc, s, axis=axis, zi=self.zi)
return y
def butter_highpass_filter(data, cutoff, fs, order=5):
b, a = butter_highpass(cutoff, fs, order=order)
y = lfilter(b, a, data)
return y
def butter_highpass_filter(data, cutoff, fs, order=5):
b, a = AudioAnalyzer.butter_highpass(cutoff, fs, order=order)
y = signal.lfilter(b, a, data)
return y
def sosfilter(sos, zi_in, x):
y = x
zi_out = zi_in
for i in range(len(sos)):
y, zi_out[i] = lfilter(sos[i,:3], sos[i,3:], y, zi = zi_in[i])
return y, zi_out
def filter(self):
path = os.getcwd()+'/trialGraspEventDetection_dataFiles'
self.Fgr = np.sum(self.values[:, 9:15], axis=1) # SAI
self.Fgl = np.sum(self.values[:, 0:7], axis=1) # SAI
# can use this to plot in matlab graspeventdetection_plot.m
np.savetxt(path+'/SAI_Fgr.txt', self.Fgr)
# can use this to plot in matlab
np.savetxt(path+'/SAI_Fgl.txt', self.Fgl)
# 0.55*pi rad/samples
b1, a1 = signal.butter(1, 0.55, 'high', analog=False)
self.f_acc_x = signal.lfilter(b1, a1, self.acc_x, axis=-1, zi=None)
self.f_acc_y = signal.lfilter(b1, a1, self.acc_y, axis=-1, zi=None)
self.f_acc_z = signal.lfilter(b1, a1, self.acc_z, axis=-1, zi=None)
# self.f_eff = signal.lfilter(b1, a1, self.eff, axis=-1, zi=None)
# type(eff)
self.FAII = np.sqrt(np.square(self.f_acc_x) +
np.square(self.f_acc_y) +
np.square(self.f_acc_z))
# can use this to plot in matlab
np.savetxt(path+'/FAII.txt', self.FAII)
# subtract base values from the values array
self.values1 = self.values - self.values.min(axis=0)
# pass the filter for each sensor
self.fvalues1 = np.zeros(self.values1.shape)
# 0.48*pi rad/samples
b, a = signal.butter(1, 0.48, 'high', analog=False)
for i in range(16):
self.fvalues1[:, i] = signal.lfilter(b, a, self.values1[:, i],
axis=-1, zi=None)
self.FAI = np.sum(self.fvalues1, axis=1)
# can use this to plot in matlab
np.savetxt(path+'/FAI.txt', self.FAI)
def compute_fai(self):
filtered_values = signal.lfilter(self.b, self.a,
self.sensor_values, axis=0)
# print shape()
finger1 = filtered_values[-1][0:1].sum()
finger2 = filtered_values[-1][1:2].sum()
finger3 = filtered_values[-1][2:3].sum()
msg = FingerSAI()
msg.finger1 = float(finger1)
msg.finger2 = float(finger2)
msg.finger3 = float(finger3)
self.fai_pub.publish(msg)
if self.recording:
t = time()
self.data['fair'].append((t, right))
self.data['fail'].append((t, left))
self.data['faim'].append((t, middle))
def compute_faii(self):
filtered_acc = signal.lfilter(self.b1, self.a1, self.acc, axis=0)
self.faii = np.sqrt((filtered_acc**2).sum(axis=1))
# Publish just the last value
val = self.faii[-1]
self.faii_pub.publish(Float64(val))
if self.recording:
t = time()
self.data['faii'].append((t, val))
def __gausscoeff(s):
"""Python implementation of the algorithm by Young & Vliet, 1995."""
if s < .5:
raise ValueError('Sigma for Gaussian filter must be >0.5 samples')
q = 0.98711*s - 0.96330 if s > 0.5 else 3.97156 \
- 4.14554*np.sqrt(1.0 - 0.26891*s)
b = np.zeros(4)
b[0] = 1.57825 + 2.44413*q + 1.4281*q**2 + 0.422205*q**3
b[1] = 2.44413*q + 2.85619*q**2 + 1.26661*q**3
b[2] = -(1.4281*q**2 + 1.26661*q**3)
b[3] = 0.422205*q**3
B = 1.0 - ((b[1] + b[2] + b[3])/b[0])
# convert to a format compatible with lfilter's
# difference equation
B = np.array([B])
A = np.ones(4)
A[1:] = -b[1:]/b[0]
return B, A
def Gaussian1D(signal, sigma, padding=0):
n = signal.shape[0]
tmp = np.zeros(n + padding)
if tmp.shape[0] < 4:
raise ValueError('Signal and padding too short')
tmp[:n] = signal
B, A = __gausscoeff(sigma)
tmp = lfilter(B, A, tmp)
tmp = tmp[::-1]
tmp = lfilter(B, A, tmp)
tmp = tmp[::-1]
return tmp[:n]
def G_inv_mat(x,mode,NT,gs,gd_vec,bas_flag = True, c1_flag = True):
"""
Fast computation of G^{-1}*x and G^{-T}*x
"""
from scipy.signal import lfilter
if mode == 1:
b = lfilter(np.array([1]),np.concatenate([np.array([1.]),-gs]),x[:NT]) + bas_flag*x[NT-1+bas_flag] + c1_flag*gd_vec*x[-1]
#b = np.dot(Emat,b)
elif mode == 2:
#x = np.dot(Emat.T,x)
b = np.hstack((np.flipud(lfilter(np.array([1]),np.concatenate([np.array([1.]),-gs]),np.flipud(x))),np.ones(bas_flag)*np.sum(x),np.ones(c1_flag)*np.sum(gd_vec*x)))
return b