def applyFilter(data, b, a, padding=100, bidir=True):
"""Apply a linear filter with coefficients a, b. Optionally pad the data before filtering
and/or run the filter in both directions."""
try:
import scipy.signal
except ImportError:
raise Exception("applyFilter() requires the package scipy.signal.")
d1 = data.view(np.ndarray)
if padding > 0:
d1 = np.hstack([d1[:padding], d1, d1[-padding:]])
if bidir:
d1 = scipy.signal.lfilter(b, a, scipy.signal.lfilter(b, a, d1)[::-1])[::-1]
else:
d1 = scipy.signal.lfilter(b, a, d1)
if padding > 0:
d1 = d1[padding:-padding]
if (hasattr(data, 'implements') and data.implements('MetaArray')):
return MetaArray(d1, info=data.infoCopy())
else:
return d1
python类signal()的实例源码
def applyFilter(data, b, a, padding=100, bidir=True):
"""Apply a linear filter with coefficients a, b. Optionally pad the data before filtering
and/or run the filter in both directions."""
try:
import scipy.signal
except ImportError:
raise Exception("applyFilter() requires the package scipy.signal.")
d1 = data.view(np.ndarray)
if padding > 0:
d1 = np.hstack([d1[:padding], d1, d1[-padding:]])
if bidir:
d1 = scipy.signal.lfilter(b, a, scipy.signal.lfilter(b, a, d1)[::-1])[::-1]
else:
d1 = scipy.signal.lfilter(b, a, d1)
if padding > 0:
d1 = d1[padding:-padding]
if (hasattr(data, 'implements') and data.implements('MetaArray')):
return MetaArray(d1, info=data.infoCopy())
else:
return d1
def besselFilter(data, cutoff, order=1, dt=None, btype='low', bidir=True):
"""return data passed through bessel filter"""
try:
import scipy.signal
except ImportError:
raise Exception("besselFilter() requires the package scipy.signal.")
if dt is None:
try:
tvals = data.xvals('Time')
dt = (tvals[-1]-tvals[0]) / (len(tvals)-1)
except:
dt = 1.0
b,a = scipy.signal.bessel(order, cutoff * dt, btype=btype)
return applyFilter(data, b, a, bidir=bidir)
#base = data.mean()
#d1 = scipy.signal.lfilter(b, a, data.view(ndarray)-base) + base
#if (hasattr(data, 'implements') and data.implements('MetaArray')):
#return MetaArray(d1, info=data.infoCopy())
#return d1
def butterworthFilter(data, wPass, wStop=None, gPass=2.0, gStop=20.0, order=1, dt=None, btype='low', bidir=True):
"""return data passed through bessel filter"""
try:
import scipy.signal
except ImportError:
raise Exception("butterworthFilter() requires the package scipy.signal.")
if dt is None:
try:
tvals = data.xvals('Time')
dt = (tvals[-1]-tvals[0]) / (len(tvals)-1)
except:
dt = 1.0
if wStop is None:
wStop = wPass * 2.0
ord, Wn = scipy.signal.buttord(wPass*dt*2., wStop*dt*2., gPass, gStop)
#print "butterworth ord %f Wn %f c %f sc %f" % (ord, Wn, cutoff, stopCutoff)
b,a = scipy.signal.butter(ord, Wn, btype=btype)
return applyFilter(data, b, a, bidir=bidir)
def butterworthFilter(data, wPass, wStop=None, gPass=2.0, gStop=20.0, order=1, dt=None, btype='low', bidir=True):
"""return data passed through bessel filter"""
try:
import scipy.signal
except ImportError:
raise Exception("butterworthFilter() requires the package scipy.signal.")
if dt is None:
try:
tvals = data.xvals('Time')
dt = (tvals[-1]-tvals[0]) / (len(tvals)-1)
except:
dt = 1.0
if wStop is None:
wStop = wPass * 2.0
ord, Wn = scipy.signal.buttord(wPass*dt*2., wStop*dt*2., gPass, gStop)
#print "butterworth ord %f Wn %f c %f sc %f" % (ord, Wn, cutoff, stopCutoff)
b,a = scipy.signal.butter(ord, Wn, btype=btype)
return applyFilter(data, b, a, bidir=bidir)
def differentiator(n, Hz=1):
"""
Return linear phase impulse response for a length *n* filter that
approximates the differential operator. The sampling frequency is
*Hz*.
The filter length *n* must be even. The remez function returns
type 3 (for *n* odd) and 4 (for *n* even) linear phase
filters. However, type 3 linear phase filters are 0 at $\omega =
0$ and $\omega = \pi$.
"""
if n % 2 == 1:
raise ValueError('the filter length n must be even')
return scipy.signal.remez(n,
[0, Hz / 2],
[1],
Hz=Hz,
type='differentiator') * Hz * 2 * NP.pi
def __init__(self, n, axis, order=2, mode='valid'):
"""
Construct gradient operator for signal of dimension *n* for
dimension *axis*. Use a filter kernel of length *order* (must
be even). Use convolution type *mode*.
"""
self.n = n
self.ndim = len(self.n)
self.axis = axis
if axis < 0 or axis >= self.ndim:
raise ValueError('0 <= axis (= {0}) < ndim = {1}'.format(axis, self.ndim))
self.d = differentiator(order)
h_list = []
for i in reversed(range(self.ndim)):
if i == axis:
h_list.append(self.d)
else:
h_list.append(NP.array([1]))
super(GradientFilter, self).__init__(n, h_list, mode=mode)
def reSample( df , dt = None , xAxis = None , n = None , kind = 'linear') :
""" re-sample the signal """
if type(df) == pd.Series : df = pd.DataFrame(df)
f = interp1d( df.index, np.transpose(df.values) , kind=kind, axis=-1, copy=True, bounds_error=True, assume_sorted=True)
if dt :
end = int(+(df.index[-1] - df.index[0] ) / dt) * dt + df.index[0]
xAxis = np.linspace( df.index[0] , end , 1+int(+(end - df.index[0] ) / dt) )
elif n :
xAxis = np.linspace( df.index[0] , df.index[-1] , n )
elif xAxis == None :
raise(Exception("reSample : either dt or xAxis should be provided" ))
#For rounding issue, ensure that xAxis is within ts.xAxis
#xAxis[ np.where( xAxis > np.max(df.index[:]) ) ] = df.index[ np.where( xAxis > np.max(df.index[:]) ) ]
return pd.DataFrame( data = np.transpose(f(xAxis)), index = xAxis , columns = map( lambda x : "reSample("+ x +")" , df.columns ) )
def getPSD( df , dw = 0.05, roverlap = 0.5, window='hanning', detrend='constant') :
"""
Compute the power spectral density
"""
if type(df) == pd.Series : df = pd.DataFrame(df)
nfft = int ( (2*pi / dw) / dx(df) )
nperseg = 2**int(log(nfft)/log(2))
noverlap = nperseg * roverlap
""" Return the PSD of a time signal """
try :
from scipy.signal import welch
except :
raise Exception("Welch function not found, please install scipy > 0.12")
data = []
for iSig in range(df.shape[1]) :
test = welch( df.values[:,iSig] , fs = 1. / dx(df) , window=window, nperseg=nperseg, noverlap=noverlap, nfft=nfft, detrend=detrend, return_onesided=True, scaling='density')
data.append( test[1] / (2*pi) )
xAxis = test[0][:] * 2*pi
return pd.DataFrame( data = np.transpose(data), index = xAxis , columns = [ "psd("+ str(x) +")" for x in df.columns ] )
def derivFFT(df, n=1 ) :
""" Deriv a signal trought FFT, warning, edge can be a bit noisy...
indexList : channel to derive
n : order of derivation
"""
deriv = []
for iSig in range(df.shape[1]) :
fft = np.fft.fft( df.values[:,iSig] ) #FFT
freq = np.fft.fftfreq( df.shape[0] , dx(df) )
from copy import deepcopy
fft0 = deepcopy(fft)
if n>0 :
fft *= (1j * 2*pi* freq[:])**n #Derivation in frequency domain
else :
fft[-n:] *= (1j * 2*pi* freq[-n:])**n
fft[0:-n] = 0.
tts = np.real(np.fft.ifft(fft))
tts -= tts[0]
deriv.append( tts ) #Inverse FFT
return pd.DataFrame( data = np.transpose(deriv), index = df.index , columns = [ "DerivFFT("+ x +")" for x in df.columns ] )
def load_segment(segment_path, old_segment_format=True, normalize_signal=False, resample_frequency=None):
"""
Convienience function for loading segments
:param segment_path: Path to the segment file to load.
:param old_segment_format: If True, the old format will be used. If False, the format backed by a pandas
dataframe will be used.
:param normalize_signal: If True, the signal will be normalized using the subject median and median absolute
deviation.
:param resample_frequency: If this is set to a number, the signal will be resampled to that frequency.
:return: A Segment or DFSegment object with the data from the segment in *segment_path*.
"""
if normalize_signal:
return load_and_standardize(segment_path, old_segment_format=old_segment_format)
else:
if old_segment_format:
segment = Segment(segment_path)
else:
segment = DFSegment.from_mat_file(segment_path)
if resample_frequency is not None:
segment.resample_frequency(resample_frequency, inplace=True)
return segment
def highpass_cnt(data, low_cut_hz, fs, filt_order=3, axis=0):
"""
Highpass signal applying **causal** butterworth filter of given order.
Parameters
----------
data: 2d-array
Time x channels
low_cut_hz: float
fs: float
filt_order: int
Returns
-------
highpassed_data: 2d-array
Data after applying highpass filter.
"""
if (low_cut_hz is None) or (low_cut_hz == 0):
log.info("Not doing any highpass, since low 0 or None")
return data.copy()
b, a = scipy.signal.butter(filt_order, low_cut_hz / (fs / 2.0),
btype='highpass')
assert filter_is_stable(a)
data_highpassed = scipy.signal.lfilter(b, a, data, axis=axis)
return data_highpassed
def _damp_flux(flux, taperwidth):
"""Apply damping to the sides of the input flux. The damping applies to a width of
size taperwidth on each side of the domain.
Currently, uses a Tukey window.
Inputs:
flux input signal on which to damp sides (array)
taperwidth width over which to apply damping at each boundary, as a fraction of domain size (scalar)
Outputs:
dampedFlux signal with edges damped (array)
"""
alpha = taperwidth * 2 # alpha sets the width of the damping for the entire signal, accounting for both left and right boundaries
tukeyWindow = scipy.signal.tukey(len(flux), alpha)
dampedFlux = flux * tukeyWindow
return dampedFlux
def get_transfer_function(self, in_name, out_name, in_type='v', atol=0.0):
# type: (str, str, str, float) -> TransferFunctionContinuous
"""Compute the transfer function between the two given nodes.
Parameters
----------
in_name : str
the input voltage/current node name.
out_name : Union[str, List[str]]
the output voltage node name.
in_type : str
set to 'v' for input voltage sources. Otherwise, current sources.
atol : float
absolute tolerance for checking zeros in the numerator. Used to filter out scipy warnings.
Returns
-------
system : TransferFunctionContinuous
the scipy transfer function object. See scipy.signal package on how to use this object.
"""
num, den = self.get_num_den(in_name, out_name, in_type=in_type, atol=atol)
return TransferFunctionContinuous(num, den)
def test_scipy(pyi_builder):
pyi_builder.test_source(
"""
from distutils.version import LooseVersion
# Test top-level SciPy importability.
import scipy
from scipy import *
# Test hooked SciPy modules.
import scipy.io.matlab
import scipy.sparse.csgraph
# Test problematic SciPy modules.
import scipy.linalg
import scipy.signal
# SciPy >= 0.16 privatized the previously public "scipy.lib" package as
# "scipy._lib". Since this package is problematic, test its
# importability regardless of SciPy version.
if LooseVersion(scipy.__version__) >= LooseVersion('0.16.0'):
import scipy._lib
else:
import scipy.lib
""")
def butter_low(dt_list, val, lowpass=1.0):
import scipy.signal
#val_mask = np.ma.getmaskarray(val)
#dt is 300 s, 5 min
dt_diff = np.diff(dt_list)
if isinstance(dt_diff[0], float):
dt_diff *= 86400.
else:
dt_diff = np.array([dt.total_seconds() for dt in dt_diff])
dt = malib.fast_median(dt_diff)
#f is 0.00333 Hz
#288 samples/day
fs = 1./dt
nyq = fs/2.
order = 3
f_max = (1./(86400*lowpass)) / nyq
b, a = scipy.signal.butter(order, f_max, btype='lowpass')
#b, a = sp.signal.butter(order, (f_min, f_max), btype='bandstop')
w, h = scipy.signal.freqz(b, a, worN=2000)
# w_f = (nyq/np.pi)*w
# w_f_days = 1/w_f/86400.
#plt.plot(w_f_days, np.abs(h))
val_f = scipy.signal.filtfilt(b, a, val)
return val_f
def is_periodic(aud_sample, SAMPLING_RATE = 8000):
'''
:param aud_sample: Numpy 1D array rep of audio sample
:param SAMPLING_RATE: Used to focus on human speech freq range
:return: True if periodic, False if aperiodic
'''
threshold = 20
# Use auto-correlation to find if there is enough periodicity in [50-400] Hz range
values = signal.correlate(aud_sample, aud_sample, mode='full')
# [50-400 Hz] corresponds to [2.5-20] ms OR [20-160] samples for 8 KHz sampling rate
l_idx = int(SAMPLING_RATE*2.5/1000)
r_idx = int(SAMPLING_RATE*20/1000)
values = values[len(values)/2:]
subset_values = values[l_idx:r_idx]
if np.argmax(subset_values) < threshold:
return False
else:
return True
def test_despreader_peaks(pos):
"""Test correlation peaks with signal at different positions."""
estimator = soa_estimator.SoaEstimator(TEMPLATE, None,
BLOCK_LEN,
len(TEMPLATE))
block = gen_block(pos)
fft = np.fft.fft(block)
corr = estimator.despread(fft)
corr_abs = np.abs(corr)
assert len(corr) == BLOCK_LEN - len(TEMPLATE) + 1
if pos <= BLOCK_LEN - len(TEMPLATE):
peak_idx = np.argmax(corr_abs)
non_peak = np.delete(corr_abs, peak_idx)
assert peak_idx == pos
assert corr_abs[peak_idx] >= PEAK_MAG - 0.1
np.testing.assert_array_less(non_peak, MAX_SIDEBAND + 0.1)
else:
np.testing.assert_array_less(corr_abs, MAX_SIDEBAND + 0.1)
def DCT4(samples):
"""
Method to create DCT4 transformation using DCT3
Arguments :
samples : (1D Array) Input samples to be transformed
Returns :
y : (1D Array) Transformed output samples
"""
# Initialize
samplesup=np.zeros(2*N, dtype = np.float32)
# Upsample signal:
samplesup[1::2]=samples
y=spfft.dct(samplesup,type=3,norm='ortho')*np.sqrt(2)#/2
return y[0:N]
#The DST4 transform:
def DST4(samples):
"""
Method to create DST4 transformation using DST3
Arguments :
samples : (1D Array) Input samples to be transformed
Returns :
y : (1D Array) Transformed output samples
"""
# Initialize
samplesup=np.zeros(2*N, dtype = np.float32)
# Upsample signal
# Reverse order to obtain DST4 out of DCT4:
#samplesup[1::2]=np.flipud(samples)
samplesup[0::2] = samples
y = spfft.dst(samplesup,type=3,norm='ortho')*np.sqrt(2)#/2
# Flip sign of every 2nd subband to obtain DST4 out of DCT4
#y=(y[0:N])*(((-1)*np.ones(N, dtype = np.float32))**range(N))
return y[0: N]
def x2polyphase(x, N):
"""
Method to convert input signal x (a row vector) into a polphase row vector
of blocks with length N.
Arguments :
x : (1D Array) Input samples
N : (int) Number of subbands
Returns :
y : (3D Array) Polyphase representation of the input signal
Author : Gerald Schuller ('shl'), Dec/2/15
"""
#Number of blocks in the signal:
L=int(np.floor(len(x)/N))
xp=np.zeros((1,N,L), dtype = np.float32)
for m in range(L):
xp[0,:,m] = x[m*N: (m*N+N)]
return xp
def bandstop(data, fs, start, stop, order = 3):
"""
Apply bandstop filter on data.
:param data: 3D video data
:param fs: sampling frequency
:param order: the order of butter filter used.
:param start: lower frequency where band starts
:param stop: higher frequency where band ends
:return: the filtered 3d data set.
"""
nyq = 0.5 * fs
high = stop / nyq
low = start / nyq
order = order
b, a = scipy.signal.butter(order, [low, high], btype='bandstop')
# zi = scipy.signal.lfiltic(b, a, y=[0.])
dataf = scipy.signal.lfilter(b, a, data)
return dataf
def updateUi(self):
if self.previewCheckBox.checkState(): # Draw preview of filtered signal
currentlySelected = self.stackedWidget.currentWidget()
function = currentlySelected.returnFunction()
input_data = self.centralWidget.getData()
previewElement = ChainElement(name="Preview")
previewElement.function = function
previewElement.input_ = input_data
previewElement.update()
output_data = previewElement.output
data = (*output_data, input_data[1])
if self.previewPlot: # PlotWidget created already
self.previewPlot.redraw(data)
else: # PlotWidget not yet created
self.previewPlot = PlotWidget(data)
self.previewDlg = QDialog()
self.previewDlg.setWindowTitle("Signal Preview")
grid = QGridLayout()
grid.addWidget(self.previewPlot)
self.previewDlg.setLayout(grid)
self.previewDlg.show()
def freq_shift_ramp(x, hza, dt):
N_orig = len(x)
N_padded = 2**nextpow2(N_orig)
t = numpy.arange(0, N_padded)
f_shift = numpy.linspace(hza[0], hza[1], len(x))
f_shift = numpy.append(f_shift, hza[1]*numpy.ones(N_padded-len(x)))
pc1 = f_shift[:-1] - f_shift[1:]
phase_correction = numpy.add.accumulate(
t * dt * numpy.append(numpy.zeros(1), 2*numpy.pi*pc1))
lo = numpy.exp(1j*(2*numpy.pi*dt*f_shift*t + phase_correction))
x0 = numpy.append(x, numpy.zeros(N_padded-N_orig, x.dtype))
h = scipy.signal.hilbert(x0)*lo
ret = h[:N_orig].real
return ret
# avoid most of the round-up-to-power-of-two penalty by
# doing log-n shifts. discontinuity at boundaries,
# but that's OK for JT65 2048-sample symbols.
def __init__(self, from_rate, to_rate):
self.from_rate = from_rate
self.to_rate = to_rate
if self.from_rate > self.to_rate:
# prepare a filter to precede resampling.
self.filter = butter_lowpass(0.45 * self.to_rate,
from_rate,
7)
self.zi = scipy.signal.lfiltic(self.filter[0],
self.filter[1],
[0])
# total number of input and output samples,
# so we can insert/delete to keep long-term
# rates correct.
self.nin = 0
self.nout = 0
# how much will output be delayed?
# in units of output samples.
def cwt_time(data, wavelet, widths, dt, axis):
# wavelets can be complex so output is complex
output = np.zeros((len(widths),) + data.shape, dtype=np.complex)
# compute in time
slices = [None for _ in data.shape]
slices[axis] = slice(None)
for ind, width in enumerate(widths):
# number of points needed to capture wavelet
M = 10 * width / dt
# times to use, centred at zero
t = np.arange((-M + 1) / 2., (M + 1) / 2.) * dt
# sample wavelet and normalise
norm = (dt / width) ** .5
wavelet_data = norm * wavelet(t, width)
output[ind, :] = scipy.signal.fftconvolve(data,
wavelet_data[slices],
mode='same')
return output
def coi(self):
"""The Cone of Influence is the region near the edges of the
input signal in which edge effects may be important.
Return a tuple (T, S) that describes the edge of the cone
of influence as a single line in (time, scale).
"""
Tmin = self.time.min()
Tmax = self.time.max()
Tmid = Tmin + (Tmax - Tmin) / 2
s = np.logspace(np.log10(self.scales.min()),
np.log10(self.scales.max()),
100)
c1 = Tmin + self.wavelet.coi(s)
c2 = Tmax - self.wavelet.coi(s)
C = np.hstack((c1[np.where(c1 < Tmid)], c2[np.where(c2 > Tmid)]))
S = np.hstack((s[np.where(c1 < Tmid)], s[np.where(c2 > Tmid)]))
# sort w.r.t time
iC = C.argsort()
sC = C[iC]
sS = S[iC]
return sC, sS
def cwt_time(data, wavelet, widths, dt, axis):
# wavelets can be complex so output is complex
output = np.zeros((len(widths),) + data.shape, dtype=np.complex)
# compute in time
slices = [None for _ in data.shape]
slices[axis] = slice(None)
for ind, width in enumerate(widths):
# number of points needed to capture wavelet
M = 10 * width / dt
# times to use, centred at zero
t = np.arange((-M + 1) / 2., (M + 1) / 2.) * dt
# sample wavelet and normalise
norm = (dt / width) ** .5
wavelet_data = norm * wavelet(t, width)
output[ind, :] = scipy.signal.fftconvolve(data,
wavelet_data[slices],
mode='same')
return output
def mean_csd(perievent_lfp1, perievent_lfp2, window, fs):
"""Computes the mean Cross-Spectral Density (CSD) between perievent slices
Parameters
----------
perievent_lfp1 : nept.AnalogSignal
perievent_lfp2 : nept.AnalogSignal
window : int
fs : int
Returns
-------
freq : np.array
power : np.array
"""
freq, power = scipy.signal.csd(perievent_lfp1.data.T, perievent_lfp2.data.T,
fs=fs, nperseg=window, nfft=int(window*2))
return freq, np.mean(power, axis=0)
def mean_coherence(perievent_lfp1, perievent_lfp2, window, fs):
"""Computes the mean coherence between perievent slices
Parameters
----------
perievent_lfp1 : nept.AnalogSignal
perievent_lfp2 : nept.AnalogSignal
window : int
fs : int
Returns
-------
freq : np.array
coherence : np.array
"""
freq, coherence = scipy.signal.coherence(
perievent_lfp1.data.T, perievent_lfp2.data.T, fs=fs, nperseg=window, nfft=int(window*2))
return freq, np.mean(coherence, axis=0)