def homomorphic_envelope(x, fs=1000, f_LPF=8, order=3):
"""
Computes the homomorphic envelope of x
Args:
x : array
fs : float
Sampling frequency. Defaults to 1000 Hz
f_LPF : float
Lowpass frequency, has to be f_LPF < fs/2. Defaults to 8 Hz
Returns:
time : numpy array
"""
b, a = butter(order, 2 * f_LPF / fs, 'low')
he = np.exp(filtfilt(b, a, np.log(np.abs(hilbert(x)))))
return he
python类hilbert()的实例源码
def process(self, wave):
wave.check_mono()
if wave.sample_rate != self.sr:
raise Exception("Wrong sample rate")
n = int(np.ceil(2 * wave.num_frames / float(self.w_len)))
m = (n + 1) * self.w_len / 2
swindow = self.make_signal_window(n)
win_ratios = [self.window / swindow[t * self.w_len / 2 :
t * self.w_len / 2 + self.w_len]
for t in range(n)]
wave = wave.zero_pad(0, int(m - wave.num_frames))
wave = audio.Wave(signal.hilbert(wave), wave.sample_rate)
result = np.zeros((self.n_bins, n))
for b in range(self.n_bins):
w = self.widths[b]
wc = 1 / np.square(w + 1)
filter = self.filters[b]
band = fftfilt(filter, wave.zero_pad(0, int(2 * w))[:,0])
band = band[int(w) : int(w + m), np.newaxis]
for t in range(n):
frame = band[t * self.w_len / 2:
t * self.w_len / 2 + self.w_len,:] * win_ratios[t]
result[b, t] = wc * np.real(np.conj(np.dot(frame.conj().T, frame)))
return audio.Spectrogram(result, self.sr, self.w_len, self.w_len / 2)
def ams_extractor(x, sr, win_len, shift_len, order):
from scipy.signal import hilbert
envelope = np.abs(hilbert(x))
for i in range(order-1):
envelope = np.abs(hilbert(envelope))
envelope = envelope * 1./3.
frames = (len(envelope) - win_len) // shift_len
hanning_window = np.hanning(win_len)
ams_feature = np.zeros(shape=(15, frames))
wts = cal_triangle_window(0, sr//2, win_len, 15, 15.6, 400)
for i in range(frames):
one_frame = x[i*shift_len:i*shift_len+win_len]
one_frame = one_frame * hanning_window
frame_fft = np.abs(np.fft.fft(one_frame, win_len))
ams_feature[:,i] = np.matmul(wts, frame_fft)
return ams_feature
def ams_extractor(x, sr, win_len, shift_len, order=1, decimate_coef=1./4.):
from scipy.signal import hilbert
envelope = np.abs(hilbert(x))
for i in range(order-1):
envelope = np.abs(hilbert(envelope))
envelope = envelope * decimate_coef
frames = (len(envelope) - win_len) // shift_len
hanning_window = np.hanning(win_len)
ams_feature = np.zeros(shape=(15, frames))
wts = cal_triangle_window(0, sr//2, win_len, 15, 15.6, 400)
for i in range(frames):
one_frame = x[i*shift_len:i*shift_len+win_len]
one_frame = one_frame * hanning_window
frame_fft = np.abs(np.fft.fft(one_frame, win_len))
ams_feature[:,i] = np.matmul(wts, frame_fft)
return ams_feature
def __init__(self, idpac=(1, 1, 3), fpha=[2, 4], famp=[60, 200],
dcomplex='hilbert', filt='fir1', cycle=(3, 6), filtorder=3,
width=7, nbins=18, nblocks=2):
"""Check and initialize."""
# Initialize visualization methods :
PacPlot.__init__(self)
# ----------------- CHECKING -----------------
# Pac methods :
self._idcheck(idpac)
# Frequency checking :
self.fpha, self.famp = pac_vec(fpha, famp)
self.xvec, self.yvec = self.fpha.mean(1), self.famp.mean(1)
# Check spectral properties :
self._speccheck(filt, dcomplex, filtorder, cycle, width)
# ----------------- SELF -----------------
self.nbins, self.nblocks = int(nbins), int(nblocks)
phase_autocorr.py 文件源码
项目:leeds_seismology_programs
作者: georgetaylor3152
项目源码
文件源码
阅读 30
收藏 0
点赞 0
评论 0
def autocorr(trace):
"""This function takes an obspy trace object and performs a phase autocorrelation
of the trace with itself. First, the hilbert transform is taken to obtain the
analytic signal and hence the instantaneous phase. This is then passed to a
fortran script where phase correlation is performed after Schimmel et al., 2011.
This function relies on the shared object file phasecorr.so, which is the file
containing the fortran subroutine for phase correlation.
"""
import numpy as np
from scipy.signal import hilbert
from phasecorr import phasecorr
# Hilbert transform to obtain the analytic signal
htrans = hilbert(trace)
# Perform phase autocorrelation with instantaneous phase
pac = phasecorr(np.angle(htrans),np.angle(htrans),len(htrans))
return pac
phase_autocorr.py 文件源码
项目:leeds_seismology_programs
作者: georgetaylor3152
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def crosscorr(tr1,tr2):
"""This function takes 2 obspy traces and performs a phase crosscorrelation
of the traces. First, the hilbert transform is taken to obtain the
analytic signal and hence the instantaneous phase. This is then passed to a
fortran script where phase correlation is performed after Schimmel et al., 2011.
This function relies on the shared object file phasecorr.so, which is the file
containing the fortran subroutine for phase correlation.
"""
import numpy as np
from scipy.signal import hilbert
from phasecorr import phasecorr
# Hilbert transform to obtain the analytic signal
htrans1 = hilbert(tr1)
htrans2 = hilbert(tr2)
# Perform phase autocorrelation with instantaneous phase
pcc = phasecorr(np.angle(htrans1),np.angle(htrans2),len(htrans1))
return pcc
def InstantaneousPhase(syn, obs, nt, dt, eps=0.05):
# instantaneous phase
# (Bozdag et al 2011, eq 27)
r = _np.real(_analytic(syn))
i = _np.imag(_analytic(syn))
phi_syn = _np.arctan2(i,r)
r = _np.real(_analytic(obs))
i = _np.imag(_analytic(obs))
phi_obs = _np.arctan2(i,r)
phi_rsd = phi_syn - phi_obs
esyn = abs(_analytic(syn))
emax = max(esyn**2.)
wadj = phi_rsd*_np.imag(_analytic(syn))/(esyn**2. + eps*emax) + \
_np.imag(_analytic(phi_rsd * syn/(esyn**2. + eps*emax)))
return wadj
def AnalyticSignal(syn, obs, nt, dt, eps=0.):
esyn = abs(_analytic(syn))
eobs = abs(_analytic(obs))
esyn1 = esyn + eps*max(esyn)
eobs1 = eobs + eps*max(eobs)
esyn3 = esyn**3 + eps*max(esyn**3)
diff1 = syn/(esyn1) - obs/(eobs1)
diff2 = _hilbert(syn)/esyn1 - _hilbert(obs)/eobs1
part1 = diff1*_hilbert(syn)**2/esyn3 - diff2*syn*_hilbert(syn)/esyn3
part2 = diff1*syn*_hilbert(syn)/esyn3 - diff2*syn**2/esyn3
wadj = part1 + _hilbert(part2)
return wadj
def __hilbert(x):
"""
Hilbert transform to the power of 2
Args:
x (array): numpy array of data
Returns:
y (array): numpy array of Hilbert transformed x
(same size as x)
"""
l = x.size
pad = int(2**(np.floor(np.log2(l)) + 1))
y = signal.hilbert(x, N=pad)[:l]
return y
def process(self, wave, W):
wave.check_mono()
if wave.sample_rate != self.sr: raise Exception("Wrong sample rate")
n = int(np.ceil(2 * wave.num_frames / float(self.w_len)))
m = (n + 1) * self.w_len / 2
if n != W.shape[1]: raise Exception("Wrong size for W")
swindow = self.make_signal_window(n)
win_ratios = [self.window / swindow[t * self.w_len / 2 :
t * self.w_len / 2 + self.w_len]
for t in range(n)]
wave = wave.zero_pad(0, int((n + 1) * self.w_len / 2.0 - wave.num_frames))
wave = audio.Wave(signal.hilbert(wave), wave.sample_rate)
result = np.zeros(wave.shape)
for b in range(self.n_bins):
w = self.widths[b]
wc = 1/np.square(w + 1)
filter = 1/w * self.filters[b]
band = fftfilt(filter, wave.zero_pad(0, int(2 * w))[:,0])
band = band[int(w) : int(w + (n + 1) * self.w_len / 2), np.newaxis]
out_band = audio.Wave(np.zeros(band.shape, np.complex128), wave.sample_rate)
for t in range(n):
start = int(t * self.w_len / 2)
end = int(t * self.w_len / 2 + self.w_len)
frame = band[start:end,:] * win_ratios[t]**2
out_band[start:end,:] = out_band[start:end,:] + frame * W[b,t]
out_band = np.real(fftfilt(filter, out_band.zero_pad(0, int(2 * w))[:,0]))
result[:,0] = result[:,0] + self.weights[b] * out_band[int(w): int(w + m)]
return result
def _speccheck(self, filt=None, dcomplex=None, filtorder=None, cycle=None,
width=None):
"""Check spectral parameters."""
# Check the filter name :
if filt is not None:
if filt not in ['fir1', 'butter', 'bessel']:
raise ValueError("filt must either be 'fir1', 'butter' or "
"'bessel'")
else:
self._filt = filt
# Check cycle :
if cycle is not None:
cycle = np.asarray(cycle)
if (len(cycle) is not 2) or not cycle.dtype == int:
raise ValueError("Cycle must be a tuple of two integers.")
else:
self._cycle = cycle
# Check complex decomposition :
if dcomplex is not None:
if dcomplex not in ['hilbert', 'wavelet']:
raise ValueError("dcomplex must either be 'hilbert' or "
"'wavelet'.")
else:
self._dcomplex = dcomplex
# Convert filtorder :
if filtorder is not None:
self._filtorder = int(filtorder)
# Convert Morlet's width :
if width is not None:
self._width = int(width)
def _getTransform(sf, f, npts, method, wltWidth, *arg):
"""Return a fuction which contain a transformation
- 'hilbert'
- 'hilbert1'
- 'hilbert2'
- 'wavelet'
"""
# Get the design of the filter :
fDesign = _getFiltDesign(sf, f, npts, *arg)
# Hilbert method
if method == 'hilbert':
def hilb(x):
xH = np.zeros(x.shape)*1j
xF = fDesign(x)
for k in range(x.shape[1]):
xH[:, k] = hilbert(xF[:, k])
return xH
return hilb
# Hilbert method 1
elif method == 'hilbert1':
def hilb1(x): return hilbert(fDesign(x))
return hilb1
# Hilbert method 2
elif method == 'hilbert2':
def hilb2(x): return hilbert2(fDesign(x))
return hilb2
# Wavelet method
elif method == 'wavelet':
def wav(x): return morlet(x, sf, (f[0]+f[1])/2, wavelet_width=wltWidth)
return wav
# Filter the signal
elif method == 'filter':
def fm(x): return fDesign(x)
return fm
def test_envelope(self):
x = np.random.normal(0, 1, 1000)
self.assertArrayEqual(signal.envelope(x), np.abs(sp.hilbert(x)))
def envelope(x):
"""Generate a Hilbert envelope of the real signal x."""
return _np.abs(_sig.hilbert(x, axis=0))
def hilbert_transform(x):
"""
Computes modulus of the complex valued
hilbert transform of x
"""
return np.abs(hilbert(x))
def phase_amplitude(signals, phase=True, amplitude=True):
"""Extract instantaneous phase and amplitude with Hilbert transform"""
# one dimension array
if signals.ndim == 1:
signals = signals[None, :]
one_dim = True
elif signals.ndim == 2:
one_dim = False
else:
raise ValueError('Impossible to compute phase_amplitude with ndim ='
' %s.' % (signals.ndim, ))
n_epochs, n_points = signals.shape
n_fft = compute_n_fft(signals)
sig_phase = np.empty(signals.shape) if phase else None
sig_amplitude = np.empty(signals.shape) if amplitude else None
for i, sig in enumerate(signals):
sig_complex = hilbert(sig, n_fft)[:n_points]
if phase:
sig_phase[i] = np.angle(sig_complex)
if amplitude:
sig_amplitude[i] = np.abs(sig_complex)
# one dimension array
if one_dim:
if phase:
sig_phase = sig_phase[0]
if amplitude:
sig_amplitude = sig_amplitude[0]
return sig_phase, sig_amplitude
def crop_for_fast_hilbert(signals):
"""Crop the signal to have a good prime decomposition, for hilbert filter.
"""
if signals.ndim < 2:
tmax = signals.shape[0]
while prime_factors(tmax)[-1] > 20:
tmax -= 1
return signals[:tmax]
else:
tmax = signals.shape[1]
while prime_factors(tmax)[-1] > 20:
tmax -= 1
return signals[:, :tmax]
def _my_hilbert(x, n_fft=None, envelope=False):
""" Compute Hilbert transform of signals w/ zero padding.
Parameters
----------
x : array, shape (n_times)
The signal to convert
n_fft : int, length > x.shape[-1] | None
How much to pad the signal before Hilbert transform.
Note that signal will then be cut back to original length.
envelope : bool
Whether to compute amplitude of the hilbert transform in order
to return the signal envelope.
Returns
-------
out : array, shape (n_times)
The hilbert transform of the signal, or the envelope.
"""
from scipy.signal import hilbert
n_fft = x.shape[-1] if n_fft is None else n_fft
n_x = x.shape[-1]
out = hilbert(x, N=n_fft)[:n_x]
if envelope is True:
out = np.abs(out)
return out
def _compute_normalized_phase(data):
"""Compute normalized phase angles
Parameters
----------
data : ndarray, shape (n_epochs, n_sources, n_times)
The data to compute the phase angles for.
Returns
-------
phase_angles : ndarray, shape (n_epochs, n_sources, n_times)
The normalized phase angles.
"""
from scipy.signal import hilbert
return (np.angle(hilbert(data)) + np.pi) / (2 * np.pi)
def __init__(self, real_signal, sampling):
self._fs = 1/sampling
self._analytic_signal = hilbert(real_signal)
self._amplitude_envelope = np.abs(self._analytic_signal)
self._instantaneous_phase = np.unwrap(np.angle(self._analytic_signal))
self._instantaneous_frequency = (np.diff(self._instantaneous_phase) /
(2.0*np.pi) * self._fs)
self._instantaneous_frequency = np.insert(self._instantaneous_frequency, 0, np.nan)
def Envelope(syn, obs, nt, dt, eps=0.05):
# envelope difference
# (Yuan et al 2015, eq 16)
esyn = abs(_analytic(syn))
eobs = abs(_analytic(obs))
if esyn.all() == 0.0:
etmp = 0.0
else:
etmp = (esyn - eobs)/(esyn + eps*esyn.max())
wadj = etmp*syn - _np.imag(_analytic(etmp*_np.imag(_analytic(syn))))
return wadj
def Envelope3(syn, obs, nt, dt, eps=0.):
# envelope lag
# (Yuan et al 2015, eqs B-2, B-5)
esyn = abs(_analytic(syn))
eobs = abs(_analytic(obs))
erat = _np.zeros(nt)
erat[1:-1] = (esyn[2:] - esyn[0:-2])/(2.*dt)
erat[1:-1] /= esyn[1:-1]
erat *= misfit.Envelope3(syn, obs, nt, dt)
wadj = -erat*syn + _hilbert(erat*_hilbert(esyn))
return wadj
def Envelope(syn, obs, nt, dt, eps=0.05):
# envelope difference
# (Yuan et al 2015, eq 9)
esyn = abs(_analytic(syn))
eobs = abs(_analytic(obs))
ersd = esyn-eobs
return _np.sqrt(_np.sum(ersd*ersd*dt))
def Envelope2(syn, obs, nt, dt, eps=0.):
# envelope amplitude ratio
# (Yuan et al 2015, eq B-1)
esyn = abs(_analytic(syn))
eobs = abs(_analytic(obs))
raise NotImplementedError
def Envelope3(syn, obs, nt, dt, eps=0.):
# envelope cross-correlation lag
# (Yuan et al 2015, eqs B-4)
esyn = abs(_analytic(syn))
eobs = abs(_analytic(obs))
return Traveltime(esyn, eobs, nt, dt)
def AnalyticSignal(syn, obs, nt, dt, eps=0.):
esyn = abs(_analytic(syn))
eobs = abs(_analytic(obs))
esyn1 = esyn + eps*max(esyn)
eobs1 = eobs + eps*max(eobs)
diff = syn/esyn1 - obs/eobs1
return _np.sqrt(_np.sum(diff*diff*dt))
def hilbert(w):
return np.imag(analytic(w))
def spectral(x, sf, f, axis, stype, dcomplex, filt, filtorder, cycle, width,
njobs):
"""Extract spectral informations from data.
Parameters
----------
x : array_like
Array of data
sf : float
Sampling frequency
f : array_like
Frequency vector of shape (N, 2)
axis : int
Axis where the time is located.
stype : string
Spectral informations to extract (use either 'pha' or 'amp')
dcomplex : string
Complex decomposition type. Use either 'hilbert' or 'wavelet'
filt : string
Name of the filter to use (only if dcomplex is 'hilbert'). Use
either 'eegfilt', 'butter' or 'bessel'.
filtorder : int
Order of the filter (only if dcomplex is 'hilbert')
cycle : int
Number of cycles to use for fir1 filtering.
width : int
Width of the wavelet.
njobs : int
Number of jobs to use. If jobs is -1, all of them are going to be
used.
"""
# Filtering + complex decomposition :
if dcomplex is 'hilbert':
# Filt each time series :
nf = range(f.shape[0])
xf = Parallel(n_jobs=njobs)(delayed(filtdata)(
x, sf, f[k, :], axis, filt, cycle, filtorder) for k in nf)
# Use hilbert for the complex decomposition :
xd = hilbert(xf, axis=axis + 1) if stype is not None else np.array(xf)
elif dcomplex is 'wavelet':
f = f.mean(1) # centered frequencies
xd = Parallel(n_jobs=njobs)(delayed(morlet)(
x, sf, k, axis, width) for k in f)
# Extract phase / amplitude :
if stype is 'pha':
return np.angle(np.moveaxis(xd, axis + 1, -1))
elif stype is 'amp':
return np.abs(np.moveaxis(xd, axis + 1, -1))
elif stype is None:
return np.moveaxis(xd, axis + 1, -1)
def filter(self, sf, x, axis=-1, ftype='phase', keepfilt=False, njobs=-1):
"""Filt the data in the specified frequency bands.
Parameters
----------
sf: float
The sampling frequency.
x: array_like
Array of data.
axis : int | -1
Location of the time axis.
ftype : {'phase', 'amplitude'}
Specify if you want to extract phase ('phase') or the amplitude
('amplitude').
njobs : int | -1
Number of jobs to compute PAC in parallel. For very large data,
set this parameter to 1 in order to prevent large memory usage.
keepfilt : bool | False
Specify if you only want the filtered data (True). This parameter
is only avaible with dcomplex='hilbert' and not wavelet.
Returns
-------
xfilt : array_like
The filtered data of shape (n_frequency, ...)
"""
# Sampling frequency :
if not isinstance(sf, (int, float)):
raise ValueError("The sampling frequency must be a float number.")
else:
sf = float(sf)
# Compatibility between keepfilt and wavelet :
if (keepfilt is True) and (self._dcomplex is 'wavelet'):
raise ValueError("Using wavelet for the complex decomposition do "
"not allow to get filtered data only. Set the "
"keepfilt parameter to False or set dcomplex to "
"'hilbert'.")
# 1D signals :
if x.ndim == 1:
x = x.reshape(1, -1)
axis = 1
# Switch between phase or amplitude :
if ftype is 'phase':
tosend = 'pha' if not keepfilt else None
xfilt = spectral(x, sf, self.fpha, axis, tosend, self._dcomplex,
self._filt, self._filtorder, self._cycle[0],
self._width, njobs)
elif ftype is 'amplitude':
tosend = 'amp' if not keepfilt else None
xfilt = spectral(x, sf, self.famp, axis, tosend, self._dcomplex,
self._filt, self._filtorder, self._cycle[1],
self._width, njobs)
else:
raise ValueError("ftype must either be None, 'phase' or "
"'amplitude.'")
return xfilt