def xcorr_offset(x1, x2):
"""
Under MSR-LA License
Based on MATLAB implementation from Spectrogram Inversion Toolbox
References
----------
D. Griffin and J. Lim. Signal estimation from modified
short-time Fourier transform. IEEE Trans. Acoust. Speech
Signal Process., 32(2):236-243, 1984.
Malcolm Slaney, Daniel Naar and Richard F. Lyon. Auditory
Model Inversion for Sound Separation. Proc. IEEE-ICASSP,
Adelaide, 1994, II.77-80.
Xinglei Zhu, G. Beauregard, L. Wyse. Real-Time Signal
Estimation from Modified Short-Time Fourier Transform
Magnitude Spectra. IEEE Transactions on Audio Speech and
Language Processing, 08/2007.
"""
x1 = x1 - x1.mean()
x2 = x2 - x2.mean()
frame_size = len(x2)
half = frame_size // 2
corrs = np.convolve(x1.astype('float32'), x2[::-1].astype('float32'))
corrs[:half] = -1E30
corrs[-half:] = -1E30
offset = corrs.argmax() - len(x1)
return offset
python类convolve()的实例源码
def lsf_to_lpc(all_lsf):
if len(all_lsf.shape) < 2:
all_lsf = all_lsf[None]
order = all_lsf.shape[1]
all_lpc = np.zeros((len(all_lsf), order + 1))
for i in range(len(all_lsf)):
lsf = all_lsf[i]
zeros = np.exp(1j * lsf)
sum_zeros = zeros[::2]
diff_zeros = zeros[1::2]
sum_zeros = np.hstack((sum_zeros, np.conj(sum_zeros)))
diff_zeros = np.hstack((diff_zeros, np.conj(diff_zeros)))
sum_filt = np.poly(sum_zeros)
diff_filt = np.poly(diff_zeros)
if order % 2 != 0:
deconv_diff = sg.convolve(diff_filt, [1, 0, -1])
deconv_sum = sum_filt
else:
deconv_diff = sg.convolve(diff_filt, [1, -1])
deconv_sum = sg.convolve(sum_filt, [1, 1])
lpc = .5 * (deconv_sum + deconv_diff)
# Last coefficient is 0 and not returned
all_lpc[i] = lpc[:-1]
return np.squeeze(all_lpc)
def xcorr_offset(x1, x2):
"""
Under MSR-LA License
Based on MATLAB implementation from Spectrogram Inversion Toolbox
References
----------
D. Griffin and J. Lim. Signal estimation from modified
short-time Fourier transform. IEEE Trans. Acoust. Speech
Signal Process., 32(2):236-243, 1984.
Malcolm Slaney, Daniel Naar and Richard F. Lyon. Auditory
Model Inversion for Sound Separation. Proc. IEEE-ICASSP,
Adelaide, 1994, II.77-80.
Xinglei Zhu, G. Beauregard, L. Wyse. Real-Time Signal
Estimation from Modified Short-Time Fourier Transform
Magnitude Spectra. IEEE Transactions on Audio Speech and
Language Processing, 08/2007.
"""
x1 = x1 - x1.mean()
x2 = x2 - x2.mean()
frame_size = len(x2)
half = frame_size // 2
corrs = np.convolve(x1.astype('float32'), x2[::-1].astype('float32'))
corrs[:half] = -1E30
corrs[-half:] = -1E30
offset = corrs.argmax() - len(x1)
return offset
def lsf_to_lpc(all_lsf):
if len(all_lsf.shape) < 2:
all_lsf = all_lsf[None]
order = all_lsf.shape[1]
all_lpc = np.zeros((len(all_lsf), order + 1))
for i in range(len(all_lsf)):
lsf = all_lsf[i]
zeros = np.exp(1j * lsf)
sum_zeros = zeros[::2]
diff_zeros = zeros[1::2]
sum_zeros = np.hstack((sum_zeros, np.conj(sum_zeros)))
diff_zeros = np.hstack((diff_zeros, np.conj(diff_zeros)))
sum_filt = np.poly(sum_zeros)
diff_filt = np.poly(diff_zeros)
if order % 2 != 0:
deconv_diff = sg.convolve(diff_filt, [1, 0, -1])
deconv_sum = sum_filt
else:
deconv_diff = sg.convolve(diff_filt, [1, -1])
deconv_sum = sg.convolve(sum_filt, [1, 1])
lpc = .5 * (deconv_sum + deconv_diff)
# Last coefficient is 0 and not returned
all_lpc[i] = lpc[:-1]
return np.squeeze(all_lpc)
def xcorr_offset(x1, x2):
"""
Under MSR-LA License
Based on MATLAB implementation from Spectrogram Inversion Toolbox
References
----------
D. Griffin and J. Lim. Signal estimation from modified
short-time Fourier transform. IEEE Trans. Acoust. Speech
Signal Process., 32(2):236-243, 1984.
Malcolm Slaney, Daniel Naar and Richard F. Lyon. Auditory
Model Inversion for Sound Separation. Proc. IEEE-ICASSP,
Adelaide, 1994, II.77-80.
Xinglei Zhu, G. Beauregard, L. Wyse. Real-Time Signal
Estimation from Modified Short-Time Fourier Transform
Magnitude Spectra. IEEE Transactions on Audio Speech and
Language Processing, 08/2007.
"""
x1 = x1 - x1.mean()
x2 = x2 - x2.mean()
frame_size = len(x2)
half = frame_size // 2
corrs = np.convolve(x1.astype('float32'), x2[::-1].astype('float32'))
corrs[:half] = -1E30
corrs[-half:] = -1E30
offset = corrs.argmax() - len(x1)
return offset
def xcorr_offset(x1, x2):
"""
Under MSR-LA License
Based on MATLAB implementation from Spectrogram Inversion Toolbox
References
----------
D. Griffin and J. Lim. Signal estimation from modified
short-time Fourier transform. IEEE Trans. Acoust. Speech
Signal Process., 32(2):236-243, 1984.
Malcolm Slaney, Daniel Naar and Richard F. Lyon. Auditory
Model Inversion for Sound Separation. Proc. IEEE-ICASSP,
Adelaide, 1994, II.77-80.
Xinglei Zhu, G. Beauregard, L. Wyse. Real-Time Signal
Estimation from Modified Short-Time Fourier Transform
Magnitude Spectra. IEEE Transactions on Audio Speech and
Language Processing, 08/2007.
"""
x1 = x1 - x1.mean()
x2 = x2 - x2.mean()
frame_size = len(x2)
half = frame_size // 2
corrs = np.convolve(x1.astype('float32'), x2[::-1].astype('float32'))
corrs[:half] = -1E30
corrs[-half:] = -1E30
offset = corrs.argmax() - len(x1)
return offset
def lsf_to_lpc(all_lsf):
if len(all_lsf.shape) < 2:
all_lsf = all_lsf[None]
order = all_lsf.shape[1]
all_lpc = np.zeros((len(all_lsf), order + 1))
for i in range(len(all_lsf)):
lsf = all_lsf[i]
zeros = np.exp(1j * lsf)
sum_zeros = zeros[::2]
diff_zeros = zeros[1::2]
sum_zeros = np.hstack((sum_zeros, np.conj(sum_zeros)))
diff_zeros = np.hstack((diff_zeros, np.conj(diff_zeros)))
sum_filt = np.poly(sum_zeros)
diff_filt = np.poly(diff_zeros)
if order % 2 != 0:
deconv_diff = sg.convolve(diff_filt, [1, 0, -1])
deconv_sum = sum_filt
else:
deconv_diff = sg.convolve(diff_filt, [1, -1])
deconv_sum = sg.convolve(sum_filt, [1, 1])
lpc = .5 * (deconv_sum + deconv_diff)
# Last coefficient is 0 and not returned
all_lpc[i] = lpc[:-1]
return np.squeeze(all_lpc)
def _fprime(ds, zi, Xi=None, sample_weights=None, reg=None, return_func=False):
"""np.dot(D.T, X[i] - np.dot(D, zi)) + reg
Parameters
----------
ds : array, shape (n_atoms, n_times_atom)
The atoms
zi : array, shape (n_atoms * n_times_valid)
The activations
Xi : array, shape (n_times, ) or None
The data array for one trial
sample_weights : array, shape (n_times, ) or None
The sample weights for one trial
reg : float or None
The regularization constant
return_func : boolean
Returns also the objective function, used to speed up LBFGS solver
Returns
-------
(func) : float
The objective function
grad : array, shape (n_atoms * n_times_valid)
The gradient
"""
n_atoms, n_times_atom = ds.shape
zi_reshaped = zi.reshape((n_atoms, -1))
Dzi = _choose_convolve(zi_reshaped, ds)
if Xi is not None:
Dzi -= Xi
if sample_weights is not None:
if return_func:
# preserve Dzi, we don't want to apply the weights twice in func
wDzi = sample_weights * Dzi
else:
Dzi *= sample_weights
wDzi = Dzi
else:
wDzi = Dzi
if return_func:
func = 0.5 * np.dot(wDzi, Dzi.T)
if reg is not None:
func += reg * zi.sum()
# Now do the dot product with the transpose of D (D.T) which is
# the conv by the reversed filter (keeping valid mode)
grad = np.concatenate(
[signal.convolve(wDzi, d[::-1], 'valid') for d in ds])
# grad = -np.dot(D.T, X[i] - np.dot(D, zi))
if reg is not None:
grad += reg
if return_func:
return func, grad
else:
return grad
def getWeights(X1D):
d = len(X1D)
''' puts the X values in their respective dimensions to use bsxfun for
evaluation'''
Xreshape = []
Xreshape.append(X1D[0].reshape(-1,1))
if d >= 2:
Xreshape.append(X1D[1].reshape([1,-1]))
for idx in range (2,d):
dims = [1]*idx
dims.append(-1)
Xreshape.append(reshape(X1D[idx], dims))
# to find and handle singleton dimensions
sizes = [X1D[ix].size for ix in range(0,d)]
Xlength = array(sizes)
''' calculate weights '''
#1) calculate mass/volume for each cuboid
weight = 1
for idx in range(0,d):
if Xlength[idx] > 1:
if idx > 1:
weight = multiply(weight[...,newaxis],diff(Xreshape[idx], axis=idx))
else:
weight = multiply(weight,diff(Xreshape[idx], axis=idx))
else:
weight = weight[...,newaxis]
#2) sum to get weight for each point
if d > 1:
dims = tile(2,[1,d])
dims[0,where(Xlength == 1)] = 1
d = sum(Xlength > 1)
weight = (2.**(-d))*convn(weight, ones(dims.ravel()), mode='full')
else:
weight = (2.**(-1))*convolve(weight, array([[1],[1]]))
return weight
def compute_moving_window_int(sample: np.ndarray,
fs: float,
blackman_win_length: int,
filter_length: int = 257,
delta: float = .02) -> np.ndarray:
"""
:param sample: ecg sample array
:param fs: sampling frequency
:param blackman_win_length: length of the blackman window on which to compute the moving window integration
:param filter_length: length of the FIR bandpass filter on which filtering is done on ecg sample array
:param delta: to compute the weights of each band in FIR filter
:return: the Moving window integration of the sample array
"""
# I believe these constants can be kept in a file
# filter edges
filter_edges = [0, 4.5 * 2 / fs, 5 * 2 / fs, 20 * 2 / fs, 20.5 * 2 / fs, 1]
# gains at filter band edges
gains = [0, 0, 1, 1, 0, 0]
# weights
weights = [500 / delta, 1 / delta, 500 / delta]
# length of the FIR filter
# FIR filter coefficients for bandpass filtering
filter_coeff = signal.firls(filter_length, filter_edges, gains, weights)
# bandpass filtered signal
bandpass_signal = signal.convolve(sample, filter_coeff, 'same')
bandpass_signal /= np.percentile(bandpass_signal, 90)
# derivative array
derivative_array = (np.array([-1.0, -2.0, 0, 2.0, 1.0])) * (1 / 8)
# derivative signal (differentiation of the bandpass)
derivative_signal = signal.convolve(bandpass_signal, derivative_array, 'same')
derivative_signal /= np.percentile(derivative_signal, 90)
# squared derivative signal
derivative_squared_signal = derivative_signal ** 2
derivative_squared_signal /= np.percentile(derivative_squared_signal, 90)
# blackman window
blackman_window = np.blackman(blackman_win_length)
# moving window Integration of squared derivative signal
mov_win_int_signal = signal.convolve(derivative_squared_signal, blackman_window, 'same')
mov_win_int_signal /= np.percentile(mov_win_int_signal, 90)
return mov_win_int_signal
def symbol_recovery_24(xdi, xdq):
angles = numpy.where(xdi >= 0, numpy.arctan2(xdq, xdi), numpy.arctan2(-xdq, -xdi))
theta = (signal.convolve(angles, smooth)) [-len(xdi):]
xr = (xdi + 1j * xdq) * numpy.exp(-1j * theta)
bi = (numpy.real(xr) >= 0) + 0
# pll parameters
period = 24
halfPeriod = period / 2
corr = period / 24.
phase = 0
res = []
pin = 0
stats = {0: 0, 1: 1}
oddity = 0
latestXrSquared = [0]*8
lxsIndex = 0
theta = [0]
shift = 0
# pll, system model, error calculation, estimate update
for i in range(1, len(bi)):
if bi[i-1] != bi[i]:
if phase < halfPeriod-2:
phase += corr
elif phase > halfPeriod+2:
phase -= corr
if phase >= period:
phase -= period
latestXrSquared[lxsIndex] = (xdi[i] + 1j * xdq[i])**2
lxsIndex += 1
if lxsIndex >= len(latestXrSquared):
lxsIndex = 0
th = shift + cmath.phase(sum(latestXrSquared)) / 2
if abs(th - theta[-1]) > 2:
if th < theta[-1]:
shift += math.pi
th += math.pi
else:
shift -= math.pi
th -= math.pi
theta.append(th)
oddity += 1
if oddity == 2:
oddity = 0
yp = (xdi[i] + 1j * xdq[i])
ypp = cmath.exp(-1j * th) * yp
# bit decode
nin = 1 * (ypp.real > 0)
stats[nin] += 1
res.append(pin ^ nin)
pin = nin
phase += 1
return res
def demodulate_array(h, soft_lcd):
# primary worker function, credit to all
i = h[1:] * np.conj(h[:-1])
j = np.angle(i)
k = signal.convolve(j, basebandBP)
# resample from 256kHz to 228kHz
rdsBand = signal.resample(k, int(len(k)*228e3/256e3))
# length modulo 4
rdsBand = rdsBand[:(len(rdsBand)//4)*4]
c57 = numpy.tile( [1., -1.], len(rdsBand)//4 )
xi = rdsBand[::2] * c57
xq = rdsBand[1::2] * (-c57)
xfi = signal.convolve(xi, filtLP)
xfq = signal.convolve(xq, filtLP)
xsfi = signal.convolve(xfi, pulseFilt)
xsfq = signal.convolve(xfq, pulseFilt)
if len(xsfi) % 2 == 1:
xsfi = xsfi[:-1]
xsfq = xsfq[:-1]
xdi = (xsfi[::2] + xsfi[1::2]) / 2
xdq = xsfq[::2]
res = symbol_recovery_24(xdi, xdq)
hits = []
for i in range(len(res)-26):
h = rds_crc(res, i, 26)
if h:
hits.append( (i, h) )
print(res,hits)
packets = []
print([decode_one(res, x[0]) for x in hits if x[1] == 'A'])
for i in range(len(hits)-3):
if hits[i][1] == "A":
bogus = False
for j,sp in enumerate("ABCD"):
if 26*j != hits[i+j][0] - hits[i][0]:
bogus = True
if hits[i+j][1] != sp:
bogus = True
if not bogus:
for j in range(4):
packets.append(decode_one(res, hits[i+j][0]))
soft_lcd.update_state(packets)