def __spectrogram(self,data,samp_rate,window,per_lap,wlen,mult):
samp_rate = float(samp_rate)
if not wlen:
wlen = samp_rate/100.0
npts=len(data)
nfft = int(self.__nearest_pow_2(wlen * samp_rate))
if nfft > npts:
nfft = int(self.__nearest_pow_2(npts / 8.0))
if mult is not None:
mult = int(self.__nearest_pow_2(mult))
mult = mult * nfft
nlap = int(nfft * float(per_lap))
end = npts / samp_rate
window = signal.get_window(window,nfft)
specgram, freq, time = mlab.specgram(data, Fs=samp_rate,window=window,NFFT=nfft,
pad_to=mult, noverlap=nlap)
return specgram,freq,time
# Sort data by experimental conditions and plot spectrogram for analog signals (e.g. LFP)
python类get_window()的实例源码
def cw(fc, duration, fs, window=None):
"""Generate a sinusoidal pulse.
:param fc: frequency of the pulse in Hz
:param duration: duration of the pulse in s
:param fs: sampling rate in Hz
:param window: window function to use (``None`` means rectangular window)
For supported window functions, see documentation for :func:`scipy.signal.get_window`.
>>> import arlpy
>>> x1 = arlpy.signal.cw(fc=27000, duration=0.5, fs=250000)
>>> x2 = arlpy.signal.cw(fc=27000, duration=0.5, fs=250000, window='hamming')
>>> x3 = arlpy.signal.cw(fc=27000, duration=0.5, fs=250000, window=('kaiser', 4.0))
"""
n = int(round(duration*fs))
x = _np.sin(2*_np.pi*fc*time(n, fs))
if window is not None:
w = _sig.get_window(window, n)
x *= w
return x
def sweep(f1, f2, duration, fs, method='linear', window=None):
"""Generate frequency modulated sweep.
:param f1: starting frequency in Hz
:param f2: ending frequency in Hz
:param duration: duration of the pulse in s
:param fs: sampling rate in Hz
:param method: type of sweep (``'linear'``, ``'quadratic'``, ``'logarithmic'``, ``'hyperbolic'``)
:param window: window function to use (``None`` means rectangular window)
For supported window functions, see documentation for :func:`scipy.signal.get_window`.
>>> import arlpy
>>> x1 = arlpy.signal.sweep(20000, 30000, duration=0.5, fs=250000)
>>> x2 = arlpy.signal.cw(20000, 30000, duration=0.5, fs=250000, window='hamming')
>>> x2 = arlpy.signal.cw(20000, 30000, duration=0.5, fs=250000, window=('kaiser', 4.0))
"""
n = int(round(duration*fs))
x = _sig.chirp(time(n, fs), f1, duration, f2, method)
if window is not None:
w = _sig.get_window(window, n)
x *= w
return x
def probs_to_classes(self, probabilities):
"""Takes a likelihood matrix produced by predict_proba and returns
the classification for each entry
Naive argmax returns a very noisy signal - windowing helps focus on
strongly matching areas.
"""
smooth_time = self.params.get('smooth_time', 0.1)
dt = self.active_song.time[1] - self.active_song.time[0]
windowsize = np.round(smooth_time / dt)
window = signal.get_window('hamming', int(windowsize))
window /= np.sum(window)
num_classes = probabilities.shape[0]
smooth_prbs = [np.convolve(
probabilities[i, :], window, mode='same') for i in range(num_classes)]
return np.argmax(np.stack(smooth_prbs, axis=0), axis=0)
window.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 30
收藏 0
点赞 0
评论 0
def _prep_window(self, **kwargs):
""" provide validation for our window type, return the window """
window = self._get_window()
if isinstance(window, (list, tuple, np.ndarray)):
return com._asarray_tuplesafe(window).astype(float)
elif com.is_integer(window):
try:
import scipy.signal as sig
except ImportError:
raise ImportError('Please install scipy to generate window '
'weight')
# the below may pop from kwargs
win_type = _validate_win_type(self.win_type, kwargs)
return sig.get_window(win_type, window).astype(float)
raise ValueError('Invalid window %s' % str(window))
def get_window(window, frame_length, periodic=True):
''' Cached version of scipy.signal.get_window '''
# Funtion
if hasattr(window, '__call__'):
return window(frame_length)
# Window name or scalar
elif (isinstance(window, (six.string_types, tuple)) or
np.isscalar(window)):
return signal.get_window(window, frame_length, fftbins=periodic)
# Predefined-array
elif isinstance(window, (np.ndarray, list)):
if len(window) == frame_length:
return np.asarray(window)
raise ValueError('Window size mismatch: '
'{:d} != {:d}'.format(len(window), frame_length))
# Unknown
else:
raise ValueError('Invalid window specification: %s' % str(window))
# ===========================================================================
# Array utils
# ===========================================================================
def __spectrogram(self,data,samp_rate,window,per_lap,wlen,mult):
samp_rate = float(samp_rate)
if not wlen:
wlen = samp_rate/100.0
npts=len(data)
nfft = int(self.__nearest_pow_2(wlen * samp_rate))
if nfft > npts:
nfft = int(self.__nearest_pow_2(npts / 8.0))
if mult is not None:
mult = int(self.__nearest_pow_2(mult))
mult = mult * nfft
nlap = int(nfft * float(per_lap))
end = npts / samp_rate
window = signal.get_window(window,nfft)
specgram, freq, time = mlab.specgram(data, Fs=samp_rate,window=window,NFFT=nfft,
pad_to=mult, noverlap=nlap)
return specgram,freq,time
# Sort data by experimental conditions and plot spectrogram for analog signals (e.g. LFP)
def blackman_tukey(x,
M,
L,
y=None,
window='boxcar',
window_args=[],
d=1,
full=False):
"""
Compute the Blackman-Tukey cross power spectral density (PSD)
estimate between the time-domain signals *x* and *y* (must be the
same length as *x*). If *y* is not given, compute the power
spectral density estimate of *x*. Use the spectral window with
identifier *window* (see the options in
:func:scipy.`signal.get_window`, e.g., a tuple can be used to pass
arguments to the window function) and length *M* (i.e., the
maximum auto-correlation lag to include in the estimate). Compute
the estimate at *L* uniformly spaced frequency samples where *d*
is the time domain sample interval. If not *full*, return the
tuple containing the length *L* PSD estimate and length *L*
corresponding frequencies. If *full*, also return the estimated
cross correlation and window function (i.e., a tuple with four
elements).
"""
N = len(x)
assert M <= N
if y is None:
y = x
else:
assert len(y) == N
Rxy = scipy.signal.correlate(x, y) / N
Rxy_window = Rxy[(N - 1) - M:(N - 1) + M + 1]
window = scipy.signal.get_window(window, 2*M + 1, fftbins=False)
k_range = NP.arange(0, L)
shift = NP.exp(2j * NP.pi * k_range * M / L)
Sxy = NP.fft.fft(window * Rxy_window, n=L) * shift * d
f = NP.fft.fftfreq(L, d=d)
if full:
return (Sxy, f, Rxy, window)
else:
return (Sxy, f)
def blackman_tukey(x,
M,
L,
y=None,
window='boxcar',
window_args=[],
d=1,
full=False):
"""
Compute the Blackman-Tukey cross power spectral density (PSD)
estimate between the time-domain signals *x* and *y* (must be the
same length as *x*). If *y* is not given, compute the power
spectral density estimate of *x*. Use the spectral window with
identifier *window* (see the options in
:func:scipy.`signal.get_window`, e.g., a tuple can be used to pass
arguments to the window function) and length *M* (i.e., the
maximum auto-correlation lag to include in the estimate). Compute
the estimate at *L* uniformly spaced frequency samples where *d*
is the time domain sample interval. If not *full*, return the
tuple containing the length *L* PSD estimate and length *L*
corresponding frequencies. If *full*, also return the estimated
cross correlation and window function (i.e., a tuple with four
elements).
"""
N = len(x)
assert M <= N
if y is None:
y = x
else:
assert len(y) == N
Rxy = scipy.signal.correlate(x, y) / N
Rxy_window = Rxy[(N - 1) - M:(N - 1) + M + 1]
window = scipy.signal.get_window(window, 2*M + 1, fftbins=False)
k_range = NP.arange(0, L)
shift = NP.exp(2j * NP.pi * k_range * M / L)
Sxy = NP.fft.fft(window * Rxy_window, n=L) * shift * d
f = NP.fft.fftfreq(L, d=d)
if full:
return (Sxy, f, Rxy, window)
else:
return (Sxy, f)
def window(sw, window = 'barthann'):
"""
Creates 2d window of size sw
--------------------------------------------------------------------------
Usage:
Call: w = window(sw, window = 'barthann')
Input: sw size of window
window string specifying window type
Output: Window of size sw
--------------------------------------------------------------------------
Copyright (C) 2010 Michael Hirsch
"""
w1 = signal.get_window(window,sw[0])
w1 = (w1 + w1[::-1])/2
w1 -= w1.min()
w2 = signal.get_window(window,sw[1])
w2 = (w2 + w2[::-1])/2
w2 -= w2.min()
www = np.outer(w1,w2)
www = www/www.max()
www = np.maximum(www, 1e-16)
return www
def filter_window(self):
"""
:return: filter window
"""
window = sig.get_window(self.window, self.data_length, fftbins=False)
# empirical value for scaling flattop to sqrt(W)/V
window/=(np.sum(window)/2)
return window
def _test_window_function(self, torch_method, scipy_name):
for size in [1, 2, 5, 10, 50, 100, 1024, 2048]:
for periodic in [True, False]:
ref = torch.from_numpy(signal.get_window(scipy_name, size, fftbins=periodic))
self.assertEqual(torch_method(size, periodic=periodic), ref)
def istft(stft_matrix, frame_length, step_length=None,
window='hann', padding=False):
"""
Inverse short-time Fourier transform (ISTFT).
Converts a complex-valued spectrogram `stft_matrix` to time-series `y`
by minimizing the mean squared error between `stft_matrix` and STFT of
`y` as described in [1]_.
In general, window function, hop length and other parameters should be same
as in stft, which mostly leads to perfect reconstruction of a signal from
unmodified `stft_matrix`.
.. [1] D. W. Griffin and J. S. Lim,
"Signal estimation from modified short-time Fourier transform,"
IEEE Trans. ASSP, vol.32, no.2, pp.236–243, Apr. 1984.
Parameters
----------
stft_matrix : np.ndarray [shape=(1 + n_fft/2, t)]
STFT matrix from `stft`
frame_length: int
number of samples point for 1 frame
step_length: int
number of samples point for 1 step (when shifting the frames)
If unspecified, defaults `frame_length / 4`.
window : string, tuple, number, function, np.ndarray [shape=(n_fft,)]
- a window specification (string, tuple, or number);
see `scipy.signal.get_window`
- a window function, such as `scipy.signal.hanning`
- a user-specified window vector of length `n_fft`
padding: boolean
- If `True`, the signal `y` is padded so that frame
`D[:, t]` is centered at `y[t * step_length]`.
- If `False`, then `D[:, t]` begins at `y[t * step_length]`
Returns
-------
y : np.ndarray [shape=(n,), dtype=float32]
time domain signal reconstructed from `stft_matrix`
"""
# ====== check arguments ====== #
frame_length = int(frame_length)
if step_length is None:
step_length = frame_length // 4
else:
step_length = int(step_length)
nfft = 2 * (stft_matrix.shape[1] - 1)
# ====== use scipy here ====== #
try:
from scipy.signal.spectral import istft as _istft
except ImportError:
raise RuntimeError("`istft` requires scipy version >= 0.19")
return _istft(stft_matrix, fs=1.0, window=window,
nperseg=frame_length, noverlap=frame_length - step_length, nfft=nfft,
input_onesided=True, boundary=padding,
time_axis=0, freq_axis=-1)[-1]
def ispec(spec, frame_length, step_length=None, window="hann",
nb_iter=48, normalize=True, db=False, padding=False):
""" Invert power spectrogram back to raw waveform
Parameters
----------
spec : np.ndarray [shape=(t, n_fft / 2 + 1)]
magnitude, power, or DB spectrogram of STFT
frame_length: int
number of samples point for 1 frame
step_length: int
number of samples point for 1 step (when shifting the frames)
If unspecified, defaults `frame_length / 4`.
window : string, tuple, number, function, or np.ndarray [shape=(n_fft,)]
- a window specification (string, tuple, or number);
see `scipy.signal.get_window`
- a window function, such as `scipy.signal.hanning`
- a vector or array of length `n_fft`
nb_iter: int
number of iteration, the higher the better audio quality
db: bool
if the given spectrogram is in decibel (dB) units (used logarithm)
normalize: bool
normalize output raw signal to have mean=0., and std=1.
"""
spec = spec.astype('float64')
# ====== check arguments ====== #
frame_length = int(frame_length)
if step_length is None:
step_length = frame_length // 4
else:
step_length = int(step_length)
# ====== convert to power spectrogram ====== #
if db:
spec = db2power(spec)
# ====== iterative estmate best phase ====== #
X_best = copy.deepcopy(spec)
nfft = (spec.shape[1] - 1) * 2
for i in range(nb_iter):
X_t = istft(X_best, frame_length=frame_length, step_length=step_length,
window=window, padding=padding)
est = stft(X_t, frame_length=frame_length, step_length=step_length,
nfft=nfft, window=window, padding=padding, energy=False)
phase = est / np.maximum(1e-8, np.abs(est))
X_best = spec * phase
X_t = istft(X_best, frame_length=frame_length, step_length=step_length,
window=window, padding=padding)
y = np.real(X_t)
if normalize:
y = (y - y.mean()) / y.std()
return y
# ===========================================================================
# F0 analysis
# ===========================================================================