def add_delta_deltas(filterbanks, name=None):
    """Compute time first- and second-order derivative channels.

    Args:
        filterbanks: float32 tensor with shape [batch_size, len, num_bins, 1]
        name: scope name

    Returns:
        float32 tensor with shape [batch_size, len, num_bins, 3]
    """
    delta_filter = np.array([2, 1, 0, -1, -2])
    delta_delta_filter = scipy.signal.convolve(delta_filter, delta_filter, "full")

    delta_filter_stack = np.array(
        [[0] * 4 + [1] + [0] * 4,
         [0] * 2 + list(delta_filter) + [0] * 2,
         list(delta_delta_filter)],
        dtype=np.float32).T[:, None, None, :]

    delta_filter_stack /= np.sqrt(
        np.sum(delta_filter_stack ** 2, axis=0, keepdims=True))

    filterbanks = tf.nn.conv2d(
        filterbanks, delta_filter_stack, [1, 1, 1, 1], "SAME",
        data_format="NHWC", name=name)
    return filterbanks
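A minimal usage sketch for the function above, assuming the module-level imports it relies on (numpy as np, scipy.signal, tensorflow as tf); the tensor shapes are illustrative only:

import numpy as np
import scipy.signal
import tensorflow as tf

fbanks = tf.random.uniform([2, 100, 40, 1], dtype=tf.float32)   # [batch, time, bins, 1]
features = add_delta_deltas(fbanks)                             # -> [2, 100, 40, 3]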
def test_scipy(pyi_builder):
    pyi_builder.test_source(
        """
        from distutils.version import LooseVersion

        # Test top-level SciPy importability.
        import scipy
        from scipy import *

        # Test hooked SciPy modules.
        import scipy.io.matlab
        import scipy.sparse.csgraph

        # Test problematic SciPy modules.
        import scipy.linalg
        import scipy.signal

        # SciPy >= 0.16 privatized the previously public "scipy.lib" package as
        # "scipy._lib". Since this package is problematic, test its
        # importability regardless of SciPy version.
        if LooseVersion(scipy.__version__) >= LooseVersion('0.16.0'):
            import scipy._lib
        else:
            import scipy.lib
        """)
def read_wave_file(filename):
    """Read a wave file from disk.

    # Arguments
        filename : the name of the wave file

    # Returns
        (fs, x) : (sampling frequency, signal)
    """
    if not os.path.isfile(filename):
        raise ValueError("File does not exist")
    s = wave.open(filename, 'rb')
    if s.getnchannels() != 1:
        raise ValueError("Wave file should be mono")
    # if s.getframerate() != 22050:
    #     raise ValueError("Sampling rate of wave file should be 22050")
    strsig = s.readframes(s.getnframes())
    # np.fromstring is deprecated for binary data; np.frombuffer is the supported equivalent
    x = np.frombuffer(strsig, dtype=np.short)
    fs = s.getframerate()
    s.close()
    x = x / 32768.0
    return fs, x
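Hypothetical call (the file name is illustrative; the function assumes the os, wave and numpy as np imports of its module):

fs, x = read_wave_file("recording.wav")
print(fs, x.min(), x.max())   # samples are scaled to roughly [-1, 1)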
def read_wave_file_not_normalized(filename):
    """Read a wave file from disk without normalizing the samples.

    # Arguments
        filename : the name of the wave file

    # Returns
        (fs, x) : (sampling frequency, signal)
    """
    if not os.path.isfile(filename):
        raise ValueError("File does not exist")
    s = wave.open(filename, 'rb')
    if s.getnchannels() != 1:
        raise ValueError("Wave file should be mono")
    # if s.getframerate() != 22050:
    #     raise ValueError("Sampling rate of wave file should be 22050")
    strsig = s.readframes(s.getnframes())
    # frombuffer returns a read-only view of the raw frame data
    x = np.frombuffer(strsig, dtype=np.short)
    fs = s.getframerate()
    s.close()
    return fs, x
def cwt_time(data, wavelet, widths, dt, axis):
    # wavelets can be complex so output is complex
    output = np.zeros((len(widths),) + data.shape, dtype=complex)

    # compute in time
    slices = [None for _ in data.shape]
    slices[axis] = slice(None)

    for ind, width in enumerate(widths):
        # number of points needed to capture wavelet
        M = 10 * width / dt
        # times to use, centred at zero
        t = np.arange((-M + 1) / 2., (M + 1) / 2.) * dt
        # sample wavelet and normalise
        norm = (dt / width) ** .5
        wavelet_data = norm * wavelet(t, width)
        # index with a tuple (list indexing of this form is no longer supported by numpy)
        output[ind, :] = scipy.signal.fftconvolve(data,
                                                  wavelet_data[tuple(slices)],
                                                  mode='same')
    return output
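A sketch of how cwt_time might be driven. The function accepts any callable wavelet(t, width); the Morlet-style wavelet below is an illustrative stand-in, not part of the original source:

import numpy as np
import scipy.signal

def morlet(t, width, w0=5.0):
    # complex Morlet-style wavelet sampled at times t for scale `width` (illustrative helper)
    x = t / width
    return np.pi ** -0.25 * np.exp(1j * w0 * x) * np.exp(-0.5 * x ** 2)

dt = 0.01
data = np.sin(2 * np.pi * 5 * np.arange(0, 2, dt))
widths = np.linspace(0.02, 0.5, 20)
coeffs = cwt_time(data, morlet, widths, dt, axis=-1)   # shape (20, len(data))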
def coi(self):
    """The Cone of Influence is the region near the edges of the
    input signal in which edge effects may be important.

    Return a tuple (T, S) that describes the edge of the cone
    of influence as a single line in (time, scale).
    """
    Tmin = self.time.min()
    Tmax = self.time.max()
    Tmid = Tmin + (Tmax - Tmin) / 2

    s = np.logspace(np.log10(self.scales.min()),
                    np.log10(self.scales.max()),
                    100)
    c1 = Tmin + self.wavelet.coi(s)
    c2 = Tmax - self.wavelet.coi(s)

    C = np.hstack((c1[np.where(c1 < Tmid)], c2[np.where(c2 > Tmid)]))
    S = np.hstack((s[np.where(c1 < Tmid)], s[np.where(c2 > Tmid)]))

    # sort w.r.t. time
    iC = C.argsort()
    sC = C[iC]
    sS = S[iC]

    return sC, sS
def frequency_phase_rotation(values, angle, deg=False):
    window_size = 64
    noverlap = 32
    # signal.hann was removed in recent SciPy releases; the window lives in signal.windows
    window = signal.windows.hann(window_size, sym=False)
    if not signal.check_COLA(window, len(window), noverlap):
        raise ValueError('check_COLA failed.')
    f, t, Zxx = signal.stft(values, window=window, nperseg=window_size,
                            noverlap=noverlap)
    Zxx_rotated = np.zeros(Zxx.shape, dtype=complex)
    for freq_idx in range(Zxx.shape[0]):  # loop over all frequencies
        Zxx_rotated[freq_idx] = phase_rotation(Zxx[freq_idx], angle, deg)
    t, x = signal.istft(Zxx_rotated, window=window, nperseg=window_size,
                        noverlap=noverlap)
    return t, x
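phase_rotation is not shown in this snippet; a plausible minimal implementation, consistent with how it is called above (rotate the complex STFT bins of one frequency row by angle, optionally given in degrees), might look like:

import numpy as np

def phase_rotation(values, angle, deg=False):
    # rotate complex values by a constant phase; `deg` selects degrees instead of radians
    if deg:
        angle = np.deg2rad(angle)
    return values * np.exp(1j * angle)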
def adaptiveDetrend(data, x=None, threshold=3.0):
    """Return the signal with baseline removed. Discards outliers from the baseline measurement."""
    try:
        import scipy.signal
        import scipy.stats  # linregress is used below
    except ImportError:
        raise Exception("adaptiveDetrend() requires the scipy package.")
    if x is None:
        x = data.xvals(0)

    d = data.view(np.ndarray)

    d2 = scipy.signal.detrend(d)
    stdev = d2.std()
    mask = abs(d2) < stdev * threshold
    #d3 = where(mask, 0, d2)
    #d4 = d2 - lowPass(d3, cutoffs[1], dt=dt)

    lr = scipy.stats.linregress(x[mask], d[mask])
    base = lr[1] + lr[0] * x
    d4 = d - base

    if hasattr(data, 'implements') and data.implements('MetaArray'):
        return MetaArray(d4, info=data.infoCopy())
    return d4
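Illustrative call on a plain NumPy array (x is passed explicitly so the MetaArray-specific branches are skipped); the synthetic signal is only for demonstration:

import numpy as np

t = np.linspace(0, 10, 1000)
noisy = 0.5 * t + np.sin(2 * np.pi * t) + np.random.normal(0, 0.1, t.size)
flat = adaptiveDetrend(noisy, x=t)   # linear baseline removed, outliers excluded from the fit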
def checker(input_var, desire_size):
    '''
    check if debug = 1
    '''
    if input_var is None:
        print('input_variable does not exist!')
    if desire_size is None:
        print('desire_size does not exist!')

    dd = numpy.size(desire_size)
    dims = numpy.shape(input_var)
    # print('dd=', dd, 'dims=', dims)
    if numpy.isnan(numpy.sum(input_var[:])):
        print('input has NaN')

    if numpy.ndim(input_var) < dd:
        print('input signal has too few dimensions')

    if dd > 1:
        if dims[0:dd] != desire_size[0:dd]:
            print(dims[0:dd])
            print(desire_size)
            print('input signal has wrong size1')
    elif dd == 1:
        if dims[0] != desire_size:
            print(dims[0])
            print(desire_size)
            print('input signal has wrong size2')

    if numpy.mod(numpy.prod(dims), numpy.prod(desire_size)) != 0:
        print('input signal shape is not multiples of desired size!')
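A quick smoke test of the checker above (assumes numpy is imported under that name, as in the function):

checker(numpy.ones((4, 4)), (4, 4))   # silent: dimensions, shape and size all match
checker(numpy.ones((4, 3)), (4, 4))   # prints the shape-mismatch diagnostics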
def __init__(self, n, axis, order=3, mode='valid'):
    """
    Construct a second-order gradient operator for a signal of dimension
    *n* along dimension *axis*. Use a filter kernel of length
    *order* (must be odd) and convolution type *mode*.
    """
    # assert that the filter length is odd
    assert order % 2 == 1
    self.n = n
    self.ndim = len(self.n)
    self.axis = axis
    if axis < 0 or axis >= self.ndim:
        raise ValueError('0 <= axis (= {0}) < ndim = {1}'.format(axis, self.ndim))
    self.d = differentiator(int(order / 2) + 1)
    self.d2 = NP.convolve(self.d, self.d)
    self.mode = mode
    h_list = []
    m = []
    for i in reversed(range(self.ndim)):
        if i == axis:
            h_list.append(self.d2)
        else:
            h_list.append(NP.array([1]))
        m.append(len(h_list[-1]))
    self.m = m
    if mode == 'circ':
        n_prime = NP.array(n) - m + 1
        super(Gradient2Filter, self).__init__(n_prime, h_list, mode=mode)
    else:
        super(Gradient2Filter, self).__init__(n, h_list, mode=mode)
def rampDf(df, rStart, rEnd):
    """ Ramp the signal between rStart and rEnd (in place) """
    a = ramp_v(df.index[:], rStart, rEnd)
    for c in df.columns:
        df[c] *= a
    return df
def fillCos(A=1.0, T=10., tMin=0., tMax=50., n=200):
    """ For testing purposes, fill a signal with a cosine """
    xAxis = np.linspace(tMin, tMax, n)
    data = np.zeros((n, 2))
    data[:, 0] = A * np.cos(2 * pi * xAxis / T)
    data[:, 1] = A * np.sin(2 * pi * xAxis / T)
    return pd.DataFrame(data=data, index=xAxis, columns=["Cos", "Sin"])
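Quick check of the helper above (assumes numpy as np, pandas as pd, and pi are already imported, as in the surrounding module):

df = fillCos(A=2.0, T=5.0, tMin=0.0, tMax=20.0, n=400)
print(df["Cos"].iloc[0], df["Sin"].abs().max())   # 2.0 and roughly 2.0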
def slidingFFT(se, T, n=1, tStart=None, preSample=False, nHarmo=5, kind=abs, phase=None):
    """
    Harmonic analysis on a sliding window

    se : Series to analyse
    T : Period
    tStart : start of the time axis
    n : size of the sliding window, in periods
    preSample : re-sample the signal so that a period corresponds to an integer number of time steps
    nHarmo : number of harmonics to return
    kind : module, real or imaginary part, as a function (abs, np.imag, np.real ...)
    phase : phase shift (for instance to extract the in-phase component with cos or sin)
    """
    if type(se) == pd.DataFrame:
        if len(se.columns) == 1:
            se = se.iloc[:, 0]

    nWin = int(0.5 + n * T / dx(se))

    # re-sample to get a round number of time steps per period
    if preSample:
        new = reSample(se, dt=n * T / nWin)
    else:
        new = se
    signal = new.values[:]

    # allocate results
    res = np.zeros((new.shape[0], nHarmo))
    for iWin in range(new.shape[0] - nWin):
        sig = signal[iWin:iWin + nWin]          # window
        fft = np.fft.fft(sig)                   # FFT
        if phase is not None:                   # phase shift
            fft *= np.exp(1j * (2 * pi * (iWin * 1. / nWin) + phase))
        fftp = kind(fft)                        # take module, real or imaginary part
        spectre = 2 * fftp / nWin               # scale
        for ih in range(nHarmo):
            res[iWin, ih] = spectre[ih * n]
            if ih == 0:
                res[iWin, ih] /= 2.0
            #if ih == 0: res[iWin, ih] = 2.0
    return pd.DataFrame(data=res, index=new.index,
                        columns=["Harmo {:} ({:})".format(x, se.name) for x in range(nHarmo)])
def deriv(df, n=1):
    """ Differentiate a signal by finite differences """
    from signalTreatment import der, der2
    deriv = []
    if n == 1:
        for iSig in range(df.shape[1]):
            deriv.append(der(df.values[:, iSig], df.index))
    elif n == 2:
        for iSig in range(df.shape[1]):
            deriv.append(der2(df.values[:, iSig], df.index[:]))
    return pd.DataFrame(data=np.transpose(deriv), index=df.index,
                        columns=["Deriv(" + x + ")" for x in df.columns])
def testPSD(display=True):
    """ Read a signal, compute the PSD and compare the standard deviation to m0 """
    df = read(r'../../testData/motion.dat')
    RsSig = np.std(df.values[:, 0]) * 4
    print("Rs from sigma ", RsSig)
    psd = getPSD(df)
    RsM0 = (np.sum(psd.values[:, 0]) * dx(psd)) ** 0.5 * 4.004
    print("Rs from m0 ", RsM0)
    psd = psd[0.1:2.0]
    if display:
        df.plot()
        psd.plot()
        plt.show()
def GetSn(y, range_ff=[0.25, 0.5], method='mean'):
    """
    Estimate noise power through the power spectral density over the range of large frequencies.

    Parameters
    ----------
    y : array, shape (T,)
        One dimensional array containing the fluorescence intensities with
        one entry per time-bin.
    range_ff : (1,2) array, nonnegative, max value <= 0.5
        range of frequency (x Nyquist rate) over which the spectrum is averaged
    method : string, optional, default 'mean'
        method of averaging: mean, median, or exponentiated mean of log values

    Returns
    -------
    sn : noise standard deviation
    """
    ff, Pxx = scipy.signal.welch(y)
    ind1 = ff > range_ff[0]
    ind2 = ff < range_ff[1]
    ind = np.logical_and(ind1, ind2)
    Pxx_ind = Pxx[ind]
    sn = {
        'mean': lambda Pxx_ind: np.sqrt(np.mean(Pxx_ind / 2)),
        'median': lambda Pxx_ind: np.sqrt(np.median(Pxx_ind / 2)),
        'logmexp': lambda Pxx_ind: np.sqrt(np.exp(np.mean(np.log(Pxx_ind / 2))))
    }[method](Pxx_ind)
    return sn
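Illustrative use of GetSn on a synthetic trace; the true noise level is 0.3, which the high-frequency band of the Welch spectrum recovers approximately:

import numpy as np
import scipy.signal

trace = np.sin(np.linspace(0, 20 * np.pi, 2000)) + np.random.normal(0, 0.3, 2000)
print(GetSn(trace))                      # close to 0.3
print(GetSn(trace, method='median'))     # more robust to spectral peaks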
def discount_cumsum(x, discount):
    # See https://docs.scipy.org/doc/scipy/reference/tutorial/signal.html#difference-equation-filtering
    # Here, we have y[t] - discount*y[t+1] = x[t]
    # or rev(y)[t] - discount*rev(y)[t-1] = rev(x)[t]
    return scipy.signal.lfilter([1], [1, float(-discount)], x[::-1], axis=0)[::-1]
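The lfilter call above implements the reverse-time recursion y[t] = x[t] + discount * y[t+1]; an explicit loop gives the same numbers and makes the recurrence visible:

import numpy as np
import scipy.signal

x = np.array([1.0, 2.0, 3.0])
print(discount_cumsum(x, 0.9))   # [5.23, 4.7, 3.0]

y = np.zeros_like(x)
running = 0.0
for t in reversed(range(len(x))):
    running = x[t] + 0.9 * running
    y[t] = running
print(y)                         # matches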
def filter_records_fir(coeffs,
                       num_taps,        # ignored
                       decim_factor,
                       recs,
                       record_length,   # ignored (uses shape of recs)
                       num_records,     # ignored (uses shape of recs)
                       result):
    # split out a, b coefficients
    b = coeffs[:len(coeffs) // 2]
    a = coeffs[len(coeffs) // 2:]
    filtered_signal = scipy.signal.lfilter(b, a, recs)
    result[:] = np.copy(filtered_signal[:, ::decim_factor], order="C")
def filter_records_iir(coeffs,
                       order,           # ignored
                       recs,
                       record_length,   # ignored (uses shape of recs)
                       num_records,     # ignored (uses shape of recs)
                       result):
    # split out a, b coefficients
    b = coeffs[:len(coeffs) // 2]
    a = coeffs[len(coeffs) // 2:]
    result[:] = scipy.signal.lfilter(b, a, recs)
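An illustrative way to drive the two record filters above with SciPy-designed coefficients; the packed coeffs = concatenate([b, a]) layout is an assumption based on how the helpers split the array:

import numpy as np
import scipy.signal

b, a = scipy.signal.butter(4, 0.2)        # 4th-order low-pass at 0.2 x Nyquist
coeffs = np.concatenate([b, a])           # packed as the helpers expect
recs = np.random.randn(8, 1024)           # 8 records of 1024 samples

iir_out = np.empty_like(recs)
filter_records_iir(coeffs, 4, recs, None, None, iir_out)

fir_out = np.empty((8, 1024 // 4))
filter_records_fir(coeffs, len(b), 4, recs, None, None, fir_out)   # filtered and decimated by 4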