def _cfcFiltSuro(xPha, xAmp, surJob, self):
"""SUP: Get the cfc and surrogates
The function return:
- The unormalized cfc
- All the surrogates (for pvalue)
- The mean of surrogates (for normalization)
- The deviation of surrogates (for normalization)
"""
# Check input variables :
npts, ntrial = xPha.shape
W = self._window
nwin = len(W)
# Get the filter for phase/amplitude properties :
phaMeth = self._pha.get(self._sf, self._pha.f, self._npts)
ampMeth = self._amp.get(self._sf, self._amp.f, self._npts)
# Filt the phase and amplitude :
xPha = self._pha.apply(xPha, phaMeth)
xAmp = self._amp.apply(xAmp, ampMeth)
# Extract phase of amplitude for PLV method:
if self.Id[0] in ['4']:
for a in range(xAmp.shape[0]):
for t in range(xAmp.shape[2]):
xAmp[a, :, t] = np.angle(hilbert(np.ravel(xAmp[a, :, t])))
# 2D loop trick :
claIdx, listWin, listTrial = list2index(nwin, ntrial)
# Get the unormalized cfc :
uCfc = [_cfcGet(np.squeeze(xPha[:, W[k[0]][0]:W[k[0]][1], k[1]]),
np.squeeze(xAmp[:, W[k[0]][0]:W[k[0]][1], k[1]]),
self.Id, self._nbins) for k in claIdx]
uCfc = np.array(groupInList(uCfc, listWin))
# Run surogates on each window :
if (self.n_perm != 0) and (self.Id[0] is not '5') and (self.Id[1] is not '0'):
Suro = Parallel(n_jobs=surJob)(delayed(_cfcGetSuro)(
xPha[:, k[0]:k[1], :], xAmp[:, k[0]:k[1], :],
self.Id, self.n_perm, self._nbins, self._matricial) for k in self._window)
mSuro = [np.mean(k, 3) for k in Suro]
stdSuro = [np.std(k, 3) for k in Suro]
else:
Suro, mSuro, stdSuro = None, None, None
return uCfc, Suro, mSuro, stdSuro
python类hilbert()的实例源码
def signal_envelope1D(data, *, sigma=None, fs=None):
"""Docstring goes here
TODO: this is not yet epoch-aware!
sigma = 0 means no smoothing (default 4 ms)
"""
if sigma is None:
sigma = 0.004 # 4 ms standard deviation
if fs is None:
if isinstance(data, (np.ndarray, list)):
raise ValueError("sampling frequency must be specified!")
elif isinstance(data, core.AnalogSignalArray):
fs = data.fs
if isinstance(data, (np.ndarray, list)):
# Compute number of samples to compute fast FFTs
padlen = nextfastpower(len(data)) - len(data)
# Pad data
paddeddata = np.pad(data, (0, padlen), 'constant')
# Use hilbert transform to get an envelope
envelope = np.absolute(hilbert(paddeddata))
# Truncate results back to original length
envelope = envelope[:len(data)]
if sigma:
# Smooth envelope with a gaussian (sigma = 4 ms default)
EnvelopeSmoothingSD = sigma*fs
smoothed_envelope = scipy.ndimage.filters.gaussian_filter1d(envelope, EnvelopeSmoothingSD, mode='constant')
envelope = smoothed_envelope
elif isinstance(data, core.AnalogSignalArray):
newasa = copy.deepcopy(data)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
cum_lengths = np.insert(np.cumsum(data.lengths), 0, 0)
# for segment in data:
for idx in range(data.n_epochs):
# print('hilberting epoch {}/{}'.format(idx+1, data.n_epochs))
segment_data = data._ydata[:,cum_lengths[idx]:cum_lengths[idx+1]]
n_signals, n_samples = segment_data.shape
assert n_signals == 1, 'only 1D signals supported!'
# Compute number of samples to compute fast FFTs:
padlen = nextfastpower(n_samples) - n_samples
# Pad data
paddeddata = np.pad(segment_data.squeeze(), (0, padlen), 'constant')
# Use hilbert transform to get an envelope
envelope = np.absolute(hilbert(paddeddata))
# free up memory
del paddeddata
# Truncate results back to original length
envelope = envelope[:n_samples]
if sigma:
# Smooth envelope with a gaussian (sigma = 4 ms default)
EnvelopeSmoothingSD = sigma*fs
smoothed_envelope = scipy.ndimage.filters.gaussian_filter1d(envelope, EnvelopeSmoothingSD, mode='constant')
envelope = smoothed_envelope
newasa._ydata[:,cum_lengths[idx]:cum_lengths[idx+1]] = np.atleast_2d(envelope)
return newasa
return envelope
def time_norm(data, method, clip_factor=1, clip_set_zero=False,
mute_parts=48, mute_factor=2):
"""
Calculate normalized data, see e.g. Bensen et al. (2007)
:param data: numpy array with data to manipulate
:param str method:
1bit: reduce data to +1 if >0 and -1 if <0\n
clip: clip data to the root mean square (rms)\n
mute_envelope: calculate envelope and set data to zero where envelope
is larger than specified
:param float clip_factor: multiply std with this value before cliping
:param bool clip_mask: instead of clipping, set the values to zero and mask
them
:param int mute_parts: mean of the envelope is calculated by dividing the
envelope into several parts, the mean calculated in each part and
the median of this averages defines the mean envelope
:param float mute_factor: mean of envelope multiplied by this
factor defines the level for muting
:return: normalized data
"""
mask = np.ma.getmask(data)
if method == '1bit':
data = np.sign(data)
elif method == 'clip':
std = np.std(data)
args = (data < -clip_factor * std, data > clip_factor * std)
if clip_set_zero:
ind = np.logical_or(*args)
data[ind] = 0
else:
np.clip(data, *args, out=data)
elif method == 'mute_envelope':
N = next_fast_len(len(data))
envelope = np.abs(hilbert(data, N))[:len(data)]
levels = [np.mean(d) for d in np.array_split(envelope, mute_parts)]
level = mute_factor * np.median(levels)
data[envelope > level] = 0
else:
msg = 'The method passed to time_norm is not known: %s.' % method
raise ValueError(msg)
return _fill_array(data, mask=mask, fill_value=0.)
# http://azitech.wordpress.com/
# 2011/03/15/designing-a-butterworth-low-pass-filter-with-scipy/