def generate_noise(D,N):
"""Generate data for the changepoint detection. Data can either be of type 0
or type 1, but when it's a combination fo both, we define a target label
Input
- D,N Dimenstionality arguments D dimensions over N samples
Output
- Data in format
X is a matrix in R^{N x D}
y is a matrix in R^{N,} not to donfuse with {N,1}"""
#Check if we have even D, so we can split the array in future
assert D%2 == 0, 'We need even number of dimensions'
Dhalf = int(D/2)
ratioP = 0.5 #balance of targets
X = np.random.randn(N,D)
y = np.zeros(N)
#Generate two filter cofficients
filters = {}
filters['b1'],filters['a1'] = signal.butter(4,2.0*cutoff1/fs,btype='lowpass')
filters['b0'],filters['a0'] = signal.butter(4,2.0*cutoff0/fs,btype='lowpass')
for i in xrange(N):
if np.random.rand() > 0.5: #Half of the samples will have changepoint, other half wont
signalA = signal.filtfilt(filters['b1'],filters['a1'],X[i])
signalB = signal.filtfilt(filters['b0'],filters['a0'],X[i])
X[i] = np.concatenate((signalA[:Dhalf],signalB[:Dhalf]),axis=0) #Concatenate the two signals
if True: #Boolean: do you want to introduce a pattern at the changepoint?
Dstart = int(Dhalf-pattern_len/2)
X[i,Dstart:Dstart+pattern_len] = pattern
y[i] = 1 #The target label
else:
mode = int(np.random.rand()>ratioP)
X[i] = signal.filtfilt(filters['b'+str(mode)],filters['a'+str(mode)],X[i])
y[i] = 0 #The target label
return X,y
python类filtfilt()的实例源码
def generate_noise(D,N):
"""Generate data for the changepoint detection. Data can either be of type 0
or type 1, but when it's a combination fo both, we define a target label
Input
- D,N Dimenstionality arguments D dimensions over N samples
Output
- Data in format
X is a matrix in R^{N x D}
y is a matrix in R^{N,} not to donfuse with {N,1}"""
#Check if we have even D, so we can split the array in future
assert D%2 == 0, 'We need even number of dimensions'
Dhalf = int(D/2)
ratioP = 0.5 #balance of targets
X = np.random.randn(N,D)
y = np.zeros(N)
mark = np.zeros(N)
#Generate two filter cofficients
filters = {}
filters['b1'],filters['a1'] = signal.butter(4,2.0*cutoff1/fs,btype='lowpass')
filters['b0'],filters['a0'] = signal.butter(4,2.0*cutoff0/fs,btype='lowpass')
for i in xrange(N):
if np.random.rand() > 0.5: #Half of the samples will have changepoint, other half wont
Dcut = np.random.randint(pattern_len,D-pattern_len)
signalA = signal.filtfilt(filters['b1'],filters['a1'],X[i])
signalB = signal.filtfilt(filters['b0'],filters['a0'],X[i])
X[i] = np.concatenate((signalA[:Dcut],signalB[Dcut:]),axis=0) #Concatenate the two signals
if True: #Boolean: do you want to introduce a pattern at the changepoint?
Dstart = int(Dcut - pattern_len/2)
X[i,Dstart:Dstart+pattern_len] = pattern
y[i] = 1 #The target label
mark[i] = Dcut
else:
mode = int(np.random.rand()>ratioP)
X[i] = signal.filtfilt(filters['b'+str(mode)],filters['a'+str(mode)],X[i])
y[i] = 0 #The target label
return X,y,mark
def generate_noise(D,N):
"""Generate data for the changepoint detection. Data can either be of type 0
or type 1, but when it's a combination fo both, we define a target label
Input
- D,N Dimenstionality arguments D dimensions over N samples
Output
- Data in format
X is a matrix in R^{N x D}
y is a matrix in R^{N,} not to donfuse with {N,1}"""
#Check if we have even D, so we can split the array in future
assert D%2 == 0, 'We need even number of dimensions'
ratioP = 0.5 #balance of targets
X = np.random.randn(N,D)
y = np.zeros(N)
mark = np.zeros(N)
#Generate two filter cofficients
filters = {}
filters['b1'],filters['a1'] = signal.butter(4,2.0*cutoff1/fs,btype='lowpass')
filters['b0'],filters['a0'] = signal.butter(4,2.0*cutoff0/fs,btype='lowpass')
for i in xrange(N):
if np.random.rand() > 0.5: #Half of the samples will have changepoint, other half wont
Dcut = np.random.randint(pattern_len,D-pattern_len)
signalA = signal.filtfilt(filters['b1'],filters['a1'],X[i])
signalB = signal.filtfilt(filters['b0'],filters['a0'],X[i])
X[i] = np.concatenate((signalA[:Dcut],signalB[Dcut:]),axis=0) #Concatenate the two signals
if True: #Boolean: do you want to introduce a pattern at the changepoint?
Dstart = int(Dcut - pattern_len/2)
X[i,Dstart:Dstart+pattern_len] = pattern
y[i] = 1 #The target label
mark[i] = Dcut
else:
mode = int(np.random.rand()>ratioP)
X[i] = signal.filtfilt(filters['b'+str(mode)],filters['a'+str(mode)],X[i])
y[i] = 0 #The target label
return X,y,mark
def high_pass(self,data_buffer):
[b1, a1] = [self.high_pass_coefficients[0],self.high_pass_coefficients[1]]
high_passed = signal.filtfilt(b1,a1,data_buffer)
return high_passed
def low_pass(self,data_buffer):
[b, a] = [self.low_pass_coefficients[0],self.low_pass_coefficients[1]]
low_passed = signal.filtfilt(b,a,data_buffer)
return low_passed
def _decimate(x, q):
"""
Downsample the signal after low-pass filtering to avoid aliasing.
An order 16 Chebyshev type I filter is used.
Parameters
----------
x : ndarray
The signal to be downsampled, as an N-dimensional array.
q : int
The downsampling factor.
Returns
-------
y : ndarray
The down-sampled signal.
"""
if not isinstance(q, int):
raise TypeError("q must be an integer")
b, a = signal.filter_design.cheby1(16, 0.025, 0.98 / q)
y = signal.filtfilt(b, a, x, axis=-1)
sl = [slice(None)] * y.ndim
sl[-1] = slice(None, None, q)
return y[sl]
def _update_raw_data(params):
"""Helper only needs to be called when time or proj is changed"""
from scipy.signal import filtfilt
start = params['t_start']
stop = params['raw'].time_as_index(start + params['duration'])[0]
start = params['raw'].time_as_index(start)[0]
data_picks = _pick_data_channels(params['raw'].info)
data, times = params['raw'][:, start:stop]
if params['projector'] is not None:
data = np.dot(params['projector'], data)
# remove DC
if params['remove_dc'] is True:
data -= np.mean(data, axis=1)[:, np.newaxis]
if params['ba'] is not None:
data[data_picks] = filtfilt(params['ba'][0], params['ba'][1],
data[data_picks], axis=1, padlen=0)
# scale
for di in range(data.shape[0]):
data[di] /= params['scalings'][params['types'][di]]
# stim channels should be hard limited
if params['types'][di] == 'stim':
norm = float(max(data[di]))
data[di] /= norm if norm > 0 else 1.
# clip
if params['clipping'] == 'transparent':
data[np.logical_or(data > 1, data < -1)] = np.nan
elif params['clipping'] == 'clamp':
data = np.clip(data, -1, 1, data)
params['data'] = data
params['times'] = times
def _filtfilt(*args, **kwargs):
"""wrap filtfilt, excluding padding arguments"""
from scipy.signal import filtfilt
# cut out filter args
if len(args) > 4:
args = args[:4]
if 'padlen' in kwargs:
del kwargs['padlen']
return filtfilt(*args, **kwargs)
def get_filtfilt():
"""Helper to get filtfilt from scipy"""
from scipy.signal import filtfilt
if 'padlen' in _get_args(filtfilt):
return filtfilt
return _filtfilt
def apply_filter(x, filter=None, axis=0):
"""Apply a filter to an array."""
if isinstance(x, list):
x = np.asarray(x)
if x.shape[axis] == 0:
return x
b, a = filter
return signal.filtfilt(b, a, x[:], axis=axis)
def bandfilter(data,fa=None,fb=None,Fs=1000.,order=4,zerophase=True,bandstop=False):
N = len(data)
assert len(shape(data))==1
padded = zeros(2*N,dtype=data.dtype)
padded[N/2:N/2+N]=data
padded[:N/2]=data[N/2:0:-1]
padded[N/2+N:]=data[-1:N/2-1:-1]
if not fa==None and not fb==None:
if bandstop:
b,a = butter(order,array([fa,fb])/(0.5*Fs),btype='bandstop')
else:
b,a = butter(order,array([fa,fb])/(0.5*Fs),btype='bandpass')
elif not fa==None:
# high pass
b,a = butter(order,fa/(0.5*Fs),btype='high')
assert not bandstop
elif not fb==None:
# low pass
b,a = butter(order,fb/(0.5*Fs),btype='low')
assert not bandstop
else:
assert 0
if zerophase:
return filtfilt(b,a,padded)[N/2:N/2+N]
else:
return lfilter(b,a,padded)[N/2:N/2+N]
assert 0
# now try it with a broad spectrum
def bandpass_filter(data,fa=None,fb=None,
Fs=1000.,order=4,zerophase=True,bandstop=False):
'''
IF fa is None, assumes lowpass with cutoff fb
IF fb is None, assume highpass with cutoff fa
Array can be any dimension, filtering performed over last dimension
Args:
data (ndarray): data, filtering performed over last dimension
fa (number): low-frequency cutoff. If none, highpass at fb
fb (number): high-frequency cutoff. If none, lowpass at fa
order (1..6): butterworth filter order. Default 4
zerophase (boolean): Use forward-backward filtering? (true)
bandstop (boolean): Do band-stop rather than band-pass
Parameters
----------
Returns
-------
'''
N = data.shape[-1]
padded = np.zeros(data.shape[:-1]+(2*N,),dtype=data.dtype)
padded[...,N//2 :N//2+N] = data
padded[..., :N//2 ] = data[...,N//2:0 :-1]
padded[...,N//2+N: ] = data[...,-1 :N//2-1:-1]
if not fa is None and not fb is None:
if bandstop:
b,a = butter(order,np.array([fa,fb])/(0.5*Fs),btype='bandstop')
else:
b,a = butter(order,np.array([fa,fb])/(0.5*Fs),btype='bandpass')
elif not fa==None:
# high pass
b,a = butter(order,fa/(0.5*Fs),btype='high')
assert not bandstop
elif not fb==None:
# low pass
b,a = butter(order,fb/(0.5*Fs),btype='low')
assert not bandstop
else: raise Exception('Both fa and fb appear to be None')
return (filtfilt if zerophase else lfilter)(b,a,padded)[...,N//2:N//2+N]
assert 0
def bandpass(w, freqlo, freqhi, fs, npass=2):
wn = [2*freqlo/fs, 2*freqhi/fs]
(b,a) = signal.butter(npass, wn, btype='band')
w = signal.lfilter(b, a, w)
return w
# A forward-backward linear filter (filtfilt) (added by DmBorisov)
def bandpass2(w, freqlo, freqhi, fs, npass=3):
wn = [2*freqlo/fs, 2*freqhi/fs]
(b,a) = signal.butter(npass, wn, 'bandpass')
w = signal.filtfilt(b, a, w)
return w
signal_filtering.py 文件源码
项目:HAR-stacked-residual-bidir-LSTMs
作者: guillaume-chevalier
项目源码
文件源码
阅读 16
收藏 0
点赞 0
评论 0
def butter_lowpass_filter(data, cutoff_freq, nyq_freq, order=4):
# Build and apply filter to data (signal)
b, a = butter_lowpass(cutoff_freq, nyq_freq, order=order)
y = signal.filtfilt(b, a, data)
return y
def calcPhase():
ipctx1 = dict()
ipctx1["ip"] = np.zeros((1,2001))
dt = p.params["dt"]
randSamp = np.random.randint(0,len(AllAs),1000)
# Phases of three nuclei
phasMus = dict()
Flags = []
Flags.append("SWA")
phasTAvsTA = []
phasTIvsSTN = []
phasTAvsSTN = []
for i,samp in enumerate(randSamp):
#temp = np.zeros((8,len(freqs),len(fftfreq)/10+1))
A = AllAs[samp]
B = AllBs[samp]
Rates = psGA.calcRates(Flags,1.0,A,B,False,ipctx1,iptau=p.params["iptau"])
# Filter the signals first at SWA frequency, or the phase difference between cortical input signal and TA,TI not calculated correctly due to multiple frequencies in the signal
#B, A = sciSig.butter(2,np.array([0.0001,0.0005]),btype='low')
B, A = sciSig.butter(2,np.array([0.00005]),btype='low')
taFilt = sciSig.filtfilt(B, A, Rates['ta'])#, padlen=150)
tiFilt = sciSig.filtfilt(B, A, Rates['ti'])#, padlen=150)
stnFilt = sciSig.filtfilt(B, A, Rates['stn'])#, padlen=150)
tG = np.arange(0,len(ipctx1["ip"][0])+dt,dt)
fftfreq = np.fft.fftfreq(len(taFilt),d=dt)
fftta = np.fft.rfft(taFilt-np.mean(taFilt))
fftti = np.fft.rfft(tiFilt-np.mean(tiFilt))
fftstn = np.fft.rfft(stnFilt-np.mean(stnFilt))
fftip = np.fft.rfft(Rates['ipctx'] -np.mean(Rates['ipctx']))
maxta = np.where(np.abs(fftta)==np.max(np.abs(fftta)))[0]
maxti = np.where(np.abs(fftti)==np.max(np.abs(fftti)))[0]
maxstn = np.where(np.abs(fftstn) == np.max(np.abs(fftstn)))[0]
maxip = np.where(np.abs(fftip) == np.max(np.abs(fftip)))[0]
print "maxta,maxti,maxstn,maxip",maxta,maxti,maxstn,maxip
phasTAvsTA.append(np.angle(fftta[maxta]/fftta[maxta]))
phasTIvsSTN.append(np.mean([np.angle(fftti[maxti]/fftstn[maxti]),np.angle(fftti[maxstn]/fftstn[maxstn])] ))
phasTAvsSTN.append(np.mean([np.angle(fftta[maxta]/fftstn[maxta]),np.angle(fftta[maxstn]/fftstn[maxstn])] ))
phasMus["ta_ta"] = phasTAvsTA
phasMus["stn_ti"] = phasTIvsSTN
phasMus["stn_ta"] = phasTAvsSTN
pickle.dump(phasMus,open(path5+"Phases.pickle","w"))
def filtdata(x, sf, f, axis, filt, cycle, filtorder):
"""Filt the data using a forward/backward filter to avoid phase shifting.
Parameters
----------
x : array_like
Array of data
sf : float
Sampling frequency
f : array_like
Frequency vector of shape (N, 2)
axis : int
Axis where the time is located.
filt : string
Name of the filter to use (only if dcomplex is 'hilbert'). Use
either 'eegfilt', 'butter' or 'bessel'.
filtorder : int
Order of the filter (only if dcomplex is 'hilbert')
cycle : int
Number of cycles to use for fir1 filtering.
"""
# fir1 filter :
if filt == 'fir1':
forder = fir_order(sf, x.shape[axis], f[0], cycle=cycle)
b, a = fir1(forder, f / (sf / 2))
# butterworth filter :
elif filt == 'butter':
b, a = butter(filtorder, [(2 * f[0]) / sf, (2 * f[1]) / sf],
btype='bandpass')
forder = None
# bessel filter :
elif filt == 'bessel':
b, a = bessel(filtorder, [(2 * f[0]) / sf, (2 * f[1]) / sf],
btype='bandpass')
forder = None
return filtfilt(b, a, x, padlen=forder, axis=axis)
###############################################################################
###############################################################################
# FILTER ORDER
###############################################################################
###############################################################################
def bandpass_filter(data, lowcut=None, highcut=None, *, numtaps=None,
fs=None):
"""Band filter data using a zero phase FIR filter (filtfilt).
Parameters
----------
data : AnalogSignalArray, ndarray, or list
lowcut : float, optional (default 1 Hz)
Lower cut-off frequency
highcut : float, optional (default 600 Hz)
Upper cut-off frequency
numtaps : int, optional (default 25)
Number of filter taps
fs : float, optional if AnalogSignalArray is passed
Sampling frequency (Hz)
Returns
-------
filtered : same type as data
"""
if numtaps is None:
numtaps = 25
if lowcut is None:
lowcut = 1
if highcut is None:
highcut = 600
if isinstance(data, (np.ndarray, list)):
if fs is None:
raise ValueError("sampling frequency must be specified!")
# Generate filter for detection
b = firwin(numtaps=numtaps,
cutoff=[lowcut/(fs/2), highcut/(fs/2)],
pass_zero=False)
# Filter raw data to get ripple data
ripple_data = filtfilt(b, 1, data)
return ripple_data
elif isinstance(data, AnalogSignalArray):
if fs is None:
fs = data.fs
warnings.warn("no sampling frequency provided,"
" using fs={} Hz from AnalogSignalArray".format(fs))
# Generate filter for detection
b = firwin(numtaps=numtaps,
cutoff=[lowcut/(fs/2), highcut/(fs/2)],
pass_zero=False)
# Filter raw data to get ripple data
ripple_data = filtfilt(b,1,data.ydata)
# Return a copy of the AnalogSignalArray with the filtered data
filtered_analogsignalarray = data.copy()
filtered_analogsignalarray._ydata = ripple_data
return filtered_analogsignalarray
else:
raise TypeError(
"Unknown data type {} to filter.".format(str(type(data))))
def spike_filtfilt(data, lowcut=None, highcut=None, *, fs=None, verbose=False):
"""Filter data to the spike band (default 600--6000 Hz).
Parameters
----------
data : AnalogSignalArray, ndarray, or list
lowcut : float, optional (default 600 Hz)
Lower cut-off frequency
highcut : float, optional (default 6000 Hz)
Upper cut-off frequency
fs : float, optional if AnalogSignalArray is passed
Sampling frequency (Hz)
Returns
-------
filtered : same type as data
"""
if isinstance(data, (np.ndarray, list)):
if fs is None:
raise ValueError("sampling frequency must be specified!")
elif isinstance(data, AnalogSignalArray):
if fs is None:
fs = data.fs
if lowcut is None:
lowcut = 600
if highcut is None:
highcut = 6000
[b, a] = butter(2, lowcut/(fs/2), btype='highpass')
[bhigh, ahigh] = butter(1, highcut/(fs/2))
if isinstance(data, (np.ndarray, list)):
# Filter raw data
spikedata = filtfilt(b, a,filtfilt(bhigh, ahigh, data))
return spikedata
elif isinstance(data, AnalogSignalArray):
spikedata = filtfilt(b, a, filtfilt(bhigh, ahigh, data.ydata))
# Return a copy of the AnalogSignalArray with the filtered data
out = copy.copy(data)
out._ydata = spikedata
return out
def butterLP(self):
#===== parameter =====
coefficients = self.trendLine()
axis = str(self.axis_combobox.currentText())
N=10
time_interval = 0.007
sample_rate = int(1/time_interval)
Nquist_freq = 0.5 * sample_rate
Wn = float(self.filtbutter_lp_slider.value()) / 1430.0
#===== input =====
if axis[0] =='a':
input_signal = self.acc_normalize(self.raw_data[axis])
elif axis[0]=='g':
input_signal = self.gyro_normalize(self.raw_data[axis])
elif axis[0]=='m':
input_signal = self.raw_data[axis]
#===== caculation area =====
b, a = signal.butter(N, Wn, 'low')
#d, c = signal.bessel(N, Wn, 'low')
butter_lp_signal = signal.filtfilt(b, a, input_signal)
#===== output =====
raw_a, raw_v, raw_s = self.another_integral(input_signal)
butter_lp_a, butter_lp_v, butter_lp_s = self.another_integral(butter_lp_signal) #butter_lp_signal
#bessel_lp_a, bessel_lp_v, bessel_lp_s = self.basic_basic_integral(bessel_lp_signal)
print "Max: " + str(max(butter_lp_s)) + "; Min: " + str(min(butter_lp_s))
#===== raw vs filt =====
plt.title(axis)
plt.subplot(311)
plt.grid(True)
plt.plot(raw_a, "blue", label="raw_a")
plt.plot(butter_lp_a, "red", label="butter_lp_a")
plt.legend(loc=2, fontsize = 'medium')
plt.subplot(312)
plt.grid(True)
plt.plot(raw_v, "blue", label="raw_a")
plt.plot(butter_lp_v, "red", label="butter_lp_v")
plt.legend(loc=2, fontsize = 'medium')
plt.subplot(313)
plt.grid(True)
plt.plot(raw_s, "blue", label="raw_a")
plt.plot(butter_lp_s, "red", label="butter_lp_s")
plt.legend(loc=2, fontsize = 'medium')
plt.show()
self.filtbutter_lp_slider.setFocus()