python类hilbert()的实例源码

_pac.py 文件源码 项目:brainpipe 作者: EtienneCmb 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _cfcFiltSuro(xPha, xAmp, surJob, self):
    """SUP: Get the cfc and surrogates

    The function return:
        - The unormalized cfc
        - All the surrogates (for pvalue)
        - The mean of surrogates (for normalization)
        - The deviation of surrogates (for normalization)
    """
    # Check input variables :
    npts, ntrial = xPha.shape
    W = self._window
    nwin = len(W)

    # Get the filter for phase/amplitude properties :
    phaMeth = self._pha.get(self._sf, self._pha.f, self._npts)
    ampMeth = self._amp.get(self._sf, self._amp.f, self._npts)

    # Filt the phase and amplitude :
    xPha = self._pha.apply(xPha, phaMeth)
    xAmp = self._amp.apply(xAmp, ampMeth)

    # Extract phase of amplitude for PLV method:
    if self.Id[0] in ['4']:
        for a in range(xAmp.shape[0]):
            for t in range(xAmp.shape[2]):
                xAmp[a, :, t] = np.angle(hilbert(np.ravel(xAmp[a, :, t])))

    # 2D loop trick :
    claIdx, listWin, listTrial = list2index(nwin, ntrial)

    # Get the unormalized cfc :
    uCfc = [_cfcGet(np.squeeze(xPha[:, W[k[0]][0]:W[k[0]][1], k[1]]),
                    np.squeeze(xAmp[:, W[k[0]][0]:W[k[0]][1], k[1]]),
                    self.Id, self._nbins) for k in claIdx]
    uCfc = np.array(groupInList(uCfc, listWin))

    # Run surogates on each window :
    if (self.n_perm != 0) and (self.Id[0] is not '5') and (self.Id[1] is not '0'):
        Suro = Parallel(n_jobs=surJob)(delayed(_cfcGetSuro)(
            xPha[:, k[0]:k[1], :], xAmp[:, k[0]:k[1], :],
            self.Id, self.n_perm, self._nbins, self._matricial) for k in self._window)
        mSuro = [np.mean(k, 3) for k in Suro]
        stdSuro = [np.std(k, 3) for k in Suro]
    else:
        Suro, mSuro, stdSuro = None, None, None

    return uCfc, Suro, mSuro, stdSuro
utils.py 文件源码 项目:nelpy 作者: nelpy 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def signal_envelope1D(data, *, sigma=None, fs=None):
    """Docstring goes here

    TODO: this is not yet epoch-aware!

    sigma = 0 means no smoothing (default 4 ms)
    """

    if sigma is None:
        sigma = 0.004   # 4 ms standard deviation
    if fs is None:
        if isinstance(data, (np.ndarray, list)):
            raise ValueError("sampling frequency must be specified!")
        elif isinstance(data, core.AnalogSignalArray):
            fs = data.fs

    if isinstance(data, (np.ndarray, list)):
        # Compute number of samples to compute fast FFTs
        padlen = nextfastpower(len(data)) - len(data)
        # Pad data
        paddeddata = np.pad(data, (0, padlen), 'constant')
        # Use hilbert transform to get an envelope
        envelope = np.absolute(hilbert(paddeddata))
        # Truncate results back to original length
        envelope = envelope[:len(data)]
        if sigma:
            # Smooth envelope with a gaussian (sigma = 4 ms default)
            EnvelopeSmoothingSD = sigma*fs
            smoothed_envelope = scipy.ndimage.filters.gaussian_filter1d(envelope, EnvelopeSmoothingSD, mode='constant')
            envelope = smoothed_envelope
    elif isinstance(data, core.AnalogSignalArray):
        newasa = copy.deepcopy(data)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            cum_lengths = np.insert(np.cumsum(data.lengths), 0, 0)

        # for segment in data:
        for idx in range(data.n_epochs):
            # print('hilberting epoch {}/{}'.format(idx+1, data.n_epochs))
            segment_data = data._ydata[:,cum_lengths[idx]:cum_lengths[idx+1]]
            n_signals, n_samples = segment_data.shape
            assert n_signals == 1, 'only 1D signals supported!'
            # Compute number of samples to compute fast FFTs:
            padlen = nextfastpower(n_samples) - n_samples
            # Pad data
            paddeddata = np.pad(segment_data.squeeze(), (0, padlen), 'constant')
            # Use hilbert transform to get an envelope
            envelope = np.absolute(hilbert(paddeddata))
            # free up memory
            del paddeddata
            # Truncate results back to original length
            envelope = envelope[:n_samples]
            if sigma:
                # Smooth envelope with a gaussian (sigma = 4 ms default)
                EnvelopeSmoothingSD = sigma*fs
                smoothed_envelope = scipy.ndimage.filters.gaussian_filter1d(envelope, EnvelopeSmoothingSD, mode='constant')
                envelope = smoothed_envelope
            newasa._ydata[:,cum_lengths[idx]:cum_lengths[idx+1]] = np.atleast_2d(envelope)
        return newasa
    return envelope
correlate.py 文件源码 项目:yam 作者: trichter 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def time_norm(data, method, clip_factor=1, clip_set_zero=False,
              mute_parts=48, mute_factor=2):
    """
    Calculate normalized data, see e.g. Bensen et al. (2007)

    :param data: numpy array with data to manipulate
    :param str method:
        1bit: reduce data to +1 if >0 and -1 if <0\n
        clip: clip data to the root mean square (rms)\n
        mute_envelope: calculate envelope and set data to zero where envelope
        is larger than specified
    :param float clip_factor: multiply std with this value before cliping
    :param bool clip_mask: instead of clipping, set the values to zero and mask
        them
    :param int mute_parts: mean of the envelope is calculated by dividing the
        envelope into several parts, the mean calculated in each part and
        the median of this averages defines the mean envelope
    :param float mute_factor: mean of envelope multiplied by this
        factor defines the level for muting

    :return: normalized data
    """
    mask = np.ma.getmask(data)
    if method == '1bit':
        data = np.sign(data)
    elif method == 'clip':
        std = np.std(data)
        args = (data < -clip_factor * std, data > clip_factor * std)
        if clip_set_zero:
            ind = np.logical_or(*args)
            data[ind] = 0
        else:
            np.clip(data, *args, out=data)
    elif method == 'mute_envelope':
        N = next_fast_len(len(data))
        envelope = np.abs(hilbert(data, N))[:len(data)]
        levels = [np.mean(d) for d in np.array_split(envelope, mute_parts)]
        level = mute_factor * np.median(levels)
        data[envelope > level] = 0
    else:
        msg = 'The method passed to time_norm is not known: %s.' % method
        raise ValueError(msg)
    return _fill_array(data, mask=mask, fill_value=0.)


# http://azitech.wordpress.com/
# 2011/03/15/designing-a-butterworth-low-pass-filter-with-scipy/


问题


面经


文章

微信
公众号

扫码关注公众号