def _project(reference_sources, estimated_source, flen):
"""Least-squares projection of estimated source on the subspace spanned by
delayed versions of reference sources, with delays between 0 and flen-1
"""
nsrc = reference_sources.shape[0]
nsampl = reference_sources.shape[1]
# computing coefficients of least squares problem via FFT ##
# zero padding and FFT of input data
reference_sources = np.hstack((reference_sources,
np.zeros((nsrc, flen - 1))))
estimated_source = np.hstack((estimated_source, np.zeros(flen - 1)))
n_fft = int(2**np.ceil(np.log2(nsampl + flen - 1.)))
sf = scipy.fftpack.fft(reference_sources, n=n_fft, axis=1)
sef = scipy.fftpack.fft(estimated_source, n=n_fft)
# inner products between delayed versions of reference_sources
G = np.zeros((nsrc * flen, nsrc * flen))
for i in range(nsrc):
for j in range(nsrc):
ssf = sf[i] * np.conj(sf[j])
ssf = np.real(scipy.fftpack.ifft(ssf))
ss = toeplitz(np.hstack((ssf[0], ssf[-1:-flen:-1])),
r=ssf[:flen])
G[i * flen: (i+1) * flen, j * flen: (j+1) * flen] = ss
G[j * flen: (j+1) * flen, i * flen: (i+1) * flen] = ss.T
# inner products between estimated_source and delayed versions of
# reference_sources
D = np.zeros(nsrc * flen)
for i in range(nsrc):
ssef = sf[i] * np.conj(sef)
ssef = np.real(scipy.fftpack.ifft(ssef))
D[i * flen: (i+1) * flen] = np.hstack((ssef[0], ssef[-1:-flen:-1]))
# Computing projection
# Distortion filters
try:
C = np.linalg.solve(G, D).reshape(flen, nsrc, order='F')
except np.linalg.linalg.LinAlgError:
C = np.linalg.lstsq(G, D)[0].reshape(flen, nsrc, order='F')
# Filtering
sproj = np.zeros(nsampl + flen - 1)
for i in range(nsrc):
sproj += fftconvolve(C[:, i], reference_sources[i])[:nsampl + flen - 1]
return sproj
python类fftconvolve()的实例源码
def MacroBroad(data, vmacro, extend=True):
"""
This broadens the data by a given macroturbulent velocity.
It works for small wavelength ranges. I need to make a better
version that is accurate for large wavelength ranges! Sorry
for the terrible variable names, it was copied from
convol.pro in AnalyseBstar (Karolien Lefever)
Parameters:
===========
-data: kglib.utils.DataStructures.xypoint instance
Stores the data to be broadened. The data MUST
be equally-spaced before calling this!
-vmacro: float
The macroturbulent velocity, in km/s
-extend: boolean
If true, the y-axis will be extended to avoid edge-effects
Returns:
========
A broadened version of data.
"""
# Make the kernel
c = constants.c.cgs.value * units.cm.to(units.km)
sq_pi = np.sqrt(np.pi)
lambda0 = np.median(data.x)
xspacing = data.x[1] - data.x[0]
mr = vmacro * lambda0 / c
ccr = 2 / (sq_pi * mr)
px = np.arange(-data.size() / 2, data.size() / 2 + 1) * xspacing
pxmr = abs(px) / mr
profile = ccr * (np.exp(-pxmr ** 2) + sq_pi * pxmr * (erf(pxmr) - 1.0))
# Extend the xy axes to avoid edge-effects, if desired
if extend:
before = data.y[-profile.size / 2 + 1:]
after = data.y[:profile.size / 2]
extended = np.r_[before, data.y, after]
first = data.x[0] - float(int(profile.size / 2.0 + 0.5)) * xspacing
last = data.x[-1] + float(int(profile.size / 2.0 + 0.5)) * xspacing
x2 = np.linspace(first, last, extended.size)
conv_mode = "valid"
else:
extended = data.y.copy()
x2 = data.x.copy()
conv_mode = "same"
# Do the convolution
newdata = data.copy()
newdata.y = fftconvolve(extended, profile / profile.sum(), mode=conv_mode)
return newdata
def _compute_smoothed_histogram(values, bandwidth, coord_range,
logtrans=False):
"""Approximate 1-D density estimation.
Estimate 1-D probability densities at evenly-spaced grid points,
for specified data. This method is based on creating a 1-D histogram of
data points quantised with respect to evenly-spaced grid points.
Probability densities are then estimated at the grid points by convolving
the obtained histogram with a Gaussian kernel.
Parameters
----------
values : np.array (N,)
A vector containing the data for which to perform density estimation.
Successive data points are indexed by the first axis in the array.
bandwidth : float
The desired KDE bandwidth. (When log-transformation
of data is desired, bandwidth should be specified in log-space.)
coord_range: (2,)
Minimum and maximum values of coordinate on which to evaluate the
smoothed histogram.
logtrans : boolean
Whether or not to log-transform the data before performing density
estimation.
Returns
-------
np.array (M-1,)
An array of estimated probability densities at specified grid points.
"""
if logtrans:
ber = [np.log10(extreme) for extreme in coord_range]
bin_edges = np.logspace(*ber, num=DENSITY_N + 1)
bin_edge_range = ber[1] - ber[0]
else:
bin_edges = np.linspace(*coord_range, num=DENSITY_N + 1)
bin_edge_range = coord_range[1] - coord_range[0]
if values.size < 2:
# Return zeros if there are too few points to do anything useful.
return bin_edges[:-1], np.zeros(bin_edges.shape[0] - 1)
# Bin the values
H = np.histogram(values, bin_edges)[0]
relative_bw = bandwidth / bin_edge_range
K = _compute_gaussian_kernel(H.shape, relative_bw)
pdf = signal.fftconvolve(H, K, mode='same')
# Return lower edges of bins and normalized pdf
return bin_edges[:-1], pdf / np.trapz(pdf, bin_edges[:-1])
def _compute_smoothed_histogram2d(values,
bandwidth,
coord_ranges,
logtrans=False):
"""Approximate 2-D density estimation.
Estimate 2-D probability densities at evenly-spaced grid points,
for specified data. This method is based on creating a 2-D histogram of
data points quantised with respect to evenly-spaced grid points.
Probability densities are then estimated at the grid points by convolving
the obtained histogram with a Gaussian kernel.
Parameters
----------
values : np.array (N,2)
A 2-D array containing the data for which to perform density
estimation. Successive data points are indexed by the first axis in the
array. The second axis indexes x and y coordinates of data points
(values[:,0] and values[:,1] respectively).
bandwidth : array-like (2,)
The desired KDE bandwidths for x and y axes. (When log-transformation
of data is desired, bandwidths should be specified in log-space.)
coord_range: (2,2)
Minimum and maximum values of coordinates on which to evaluate the
smoothed histogram.
logtrans : array-like (2,)
A 2-element boolean array specifying whether or not to log-transform
the x or y coordinates of the data before performing density
estimation.
Returns
-------
np.array (M-1, M-1)
An array of estimated probability densities at specified grid points.
"""
bin_edges = []
bedge_range = []
for minmax, lt in zip(coord_ranges, logtrans):
if lt:
ber = [np.log10(extreme) for extreme in minmax]
bin_edges.append(np.logspace(*ber, num=DENSITY_N + 1))
bedge_range.append(ber[1] - ber[0])
else:
bin_edges.append(np.linspace(*minmax, num=DENSITY_N + 1))
bedge_range.append(minmax[1] - minmax[0])
# Bin the observations
H = np.histogram2d(values[:, 0], values[:, 1], bins=bin_edges)[0]
relative_bw = [bw / berange for bw, berange in zip(bandwidth, bedge_range)]
K = _compute_gaussian_kernel(H.shape, relative_bw)
pdf = signal.fftconvolve(H.T, K, mode='same')
# Normalize pdf
bin_centers = [edges[:-1] + np.diff(edges) / 2. for edges in bin_edges]
pdf /= np.trapz(np.trapz(pdf, bin_centers[1]), bin_centers[0])
# Return lower bin edges and density
return bin_edges[0][:-1], bin_edges[1][:-1], pdf
def conv(data, kernel, mode='full', method='fft', use_jit=True):
"""Convoles data with a kernel using either FFT or sparse convolution
This function convolves data with a kernel, relying either on the
fast Fourier transform (FFT) or a sparse convolution function.
Parameters
----------
data : array_like
First input, typically the data array
kernel : array_like
Second input, typically the kernel
mode : str {'full', 'valid', 'same'}, optional, default: 'full'
A string indicating the size of the output:
- ``full``:
The output is the full discrete linear convolution of the inputs.
- ``valid``:
The output consists only of those elements that do not rely on
zero-padding.
- ``same``:
The output is the same size as `data`, centered with respect to the
'full' output.
method : str {'fft', 'sparse'}, optional, default: 'fft'
A string indicating the convolution method:
- ``fft``:
Use the fast Fourier transform (FFT).
- ``sparse``:
Use the sparse convolution.
use_jit : bool, optional, default: True
A flag indicating whether to use Numba's just-in-time compilation
option (only relevant for `method`=='sparse').
"""
if method.lower() == 'fft':
# Use FFT: faster on non-sparse data
conved = sps.fftconvolve(data, kernel, mode)
elif method.lower() == 'sparse':
# Use sparseconv: faster on sparse data
conved = sparseconv(data, kernel, mode, use_jit)
else:
raise ValueError("Acceptable methods are: 'fft', 'sparse'.")
return conved
def gen_speech_at_mic_stft(phi_ks, source_signals, mic_array_coord, noise_power, fs, fft_size=1024):
"""
generate microphone signals with short time Fourier transform
:param phi_ks: azimuth of the acoustic sources
:param source_signals: speech signals for each arrival angle, one per row
:param mic_array_coord: x and y coordinates of the microphone array
:param noise_power: the variance of the microphone noise signal
:param fs: sampling frequency
:param fft_size: number of FFT bins
:return: y_hat_stft: received (complex) signal at microphones
y_hat_stft_noiseless: the noiseless received (complex) signal at microphones
"""
frame_shift_step = np.int(fft_size / 1.) # half block overlap for adjacent frames
K = source_signals.shape[0] # number of point sources
num_mic = mic_array_coord.shape[1] # number of microphones
# Generate the impulse responses for the array and source directions
impulse_response = gen_far_field_ir(np.reshape(phi_ks, (1, -1), order='F'),
mic_array_coord, fs)
# Now generate all the microphone signals
y = np.zeros((num_mic, source_signals.shape[1] + impulse_response.shape[2] - 1), dtype=np.float32)
for src in xrange(K):
for mic in xrange(num_mic):
y[mic] += fftconvolve(impulse_response[src, mic], source_signals[src])
# Now do the short time Fourier transform
# The resulting signal is M x fft_size/2+1 x number of frames
y_hat_stft_noiseless = \
np.array([pra.stft(signal, fft_size, frame_shift_step, transform=mkl_fft.rfft).T
for signal in y]) / np.sqrt(fft_size)
# Add noise to the signals
y_noisy = y + np.sqrt(noise_power) * np.array(np.random.randn(*y.shape), dtype=np.float32)
# compute sources stft
source_stft = \
np.array([pra.stft(s_loop, fft_size, frame_shift_step, transform=mkl_fft.rfft).T
for s_loop in source_signals]) / np.sqrt(fft_size)
y_hat_stft = \
np.array([pra.stft(signal, fft_size, frame_shift_step, transform=mkl_fft.rfft).T
for signal in y_noisy]) / np.sqrt(fft_size)
return y_hat_stft, y_hat_stft_noiseless, source_stft
def gen_sig_at_mic_stft(phi_ks, alpha_ks, mic_array_coord, SNR, fs, fft_size=1024, Ns=256):
"""
generate microphone signals with short time Fourier transform
:param phi_ks: azimuth of the acoustic sources
:param alpha_ks: power of the sources
:param mic_array_coord: x and y coordinates of the microphone array
:param SNR: signal to noise ratio at the microphone
:param fs: sampling frequency
:param fft_size: number of FFT bins
:param Ns: number of time snapshots used to estimate covariance matrix
:return: y_hat_stft: received (complex) signal at microphones
y_hat_stft_noiseless: the noiseless received (complex) signal at microphones
"""
frame_shift_step = np.int(fft_size / 1.) # half block overlap for adjacent frames
K = alpha_ks.shape[0] # number of point sources
num_mic = mic_array_coord.shape[1] # number of microphones
# Generate the impulse responses for the array and source directions
impulse_response = gen_far_field_ir(np.reshape(phi_ks, (1, -1), order='F'),
mic_array_coord, fs)
# Now generate some noise
# source_signal = np.random.randn(K, Ns * fft_size) * np.sqrt(alpha_ks[:, np.newaxis])
source_signal = np.random.randn(K, fft_size + (Ns - 1) * frame_shift_step) * \
np.sqrt(np.reshape(alpha_ks, (-1, 1), order='F'))
# Now generate all the microphone signals
y = np.zeros((num_mic, source_signal.shape[1] + impulse_response.shape[2] - 1), dtype=np.float32)
for src in xrange(K):
for mic in xrange(num_mic):
y[mic] += fftconvolve(impulse_response[src, mic], source_signal[src])
# Now do the short time Fourier transform
# The resulting signal is M x fft_size/2+1 x number of frames
y_hat_stft_noiseless = \
np.array([pra.stft(signal, fft_size, frame_shift_step, transform=mkl_fft.rfft).T
for signal in y]) / np.sqrt(fft_size)
# compute noise variance based on SNR
signal_energy = linalg.norm(y_hat_stft_noiseless.flatten()) ** 2
noise_energy = signal_energy / 10 ** (SNR * 0.1)
sigma2_noise = noise_energy / y_hat_stft_noiseless.size
# Add noise to the signals
y_noisy = y + np.sqrt(sigma2_noise) * np.array(np.random.randn(*y.shape), dtype=np.float32)
y_hat_stft = \
np.array([pra.stft(signal, fft_size, frame_shift_step, transform=mkl_fft.rfft).T
for signal in y_noisy]) / np.sqrt(fft_size)
return y_hat_stft, y_hat_stft_noiseless