ecg.py source code

python
Project: CerebralCortex-2.0-legacy  Author: MD2Korg
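import numpy as np

# DataStream, DataPoint, compute_moving_window_int, check_peak, compute_r_peaks,
# remove_close_peaks and confirm_peaks are defined elsewhere in the project.


def detect_rpeak(ecg: DataStream,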
                 fs: float = 64,
                 threshold: float = 0.5,
                 blackman_win_len_range: float = 0.2) -> DataStream:
    """
    Detect the R peaks in an ECG signal using the Pan-Tompkins algorithm.

    Because the timestamps of the ECG array can be discontinuous, the RR interval used
    inside the algorithm is measured in sample indices rather than in time.

    The algorithm consists of four major steps:

    1. compute the moving-window integration of the signal using a Blackman window of a prescribed length
    2. find all the peaks of the moving-window integration signal
    3. apply adaptive thresholding, with dynamic signal and noise thresholds, to filter out the R peak locations
    4. confirm the R peaks by differentiating them from the nearby peaks, and remove the false peaks

    :param ecg: ECG array of tuples (timestamp, value)
    :param fs: sampling frequency (Hz)
    :param threshold: initial threshold used to detect an R peak in a signal normalized by its 90th percentile; defaults to 0.5
    :param blackman_win_len_range: factor multiplied by fs to obtain the Blackman window length in samples

    :return: R peak array of tuples (timestamp, R peak interval in seconds)
    """

    data = ecg.data
    result = DataStream.from_datastream([ecg])
    if len(data) == 0:
        result.data = []
        return result
    sample = np.array([i.sample for i in data])
    timestamp = np.array([i.start_time for i in data])

    # compute the moving-window integration of the signal;
    # the window length is a whole number of samples
    blackman_win_len = int(np.ceil(fs * blackman_win_len_range))
    y = compute_moving_window_int(sample, fs, blackman_win_len)

    peak_location_values = [(i, y[i]) for i in range(2, len(y) - 1) if check_peak(y[i - 2:i + 3])]

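    # initial RR interval average, measured in samples
    peak_location = [i[0] for i in peak_location_values]
    if len(peak_location) < 2:
        # fewer than two candidate peaks: no RR interval can be computed
        result.data = []
        return result
    running_rr_avg = sum(np.diff(peak_location)) / (len(peak_location) - 1)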

    rpeak_temp1 = compute_r_peaks(threshold, running_rr_avg, y, peak_location_values)
    rpeak_temp2 = remove_close_peaks(rpeak_temp1, sample, fs)
    index = confirm_peaks(rpeak_temp2, sample, fs)

    rpeak_timestamp = timestamp[index]
    rpeak_value = np.diff(rpeak_timestamp)
    rpeak_timestamp = rpeak_timestamp[1:]

    result_data = []
    for k in range(len(rpeak_value)):
        # total_seconds() also accounts for the days component of the timedelta
        result_data.append(
            DataPoint.from_tuple(rpeak_timestamp[k], rpeak_value[k].total_seconds()))

    # Create resulting datastream to be returned

    result.data = result_data

    return result
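
The first two steps listed in the docstring (moving-window integration with a Blackman window, then picking local peaks) rely on helpers that are not shown on this page. Below is a minimal sketch of what such helpers might look like. The names `moving_window_integration_sketch` and `is_local_peak` are hypothetical, and the differentiate, square, smooth pipeline is a simplified assumption, not the project's actual `compute_moving_window_int` or `check_peak`.

```python
import numpy as np


def moving_window_integration_sketch(sample, fs, win_len_range=0.2):
    """Hypothetical stand-in for compute_moving_window_int (assumption):
    differentiate, square, then smooth with a normalized Blackman window."""
    sample = np.asarray(sample, dtype=float)
    # Window length in samples; at least 3 so the window weights do not sum to zero.
    win_len = max(int(np.ceil(fs * win_len_range)), 3)
    # Differentiate to emphasize steep slopes, then square so that
    # every value is non-negative.
    squared = np.diff(sample, prepend=sample[0]) ** 2
    window = np.blackman(win_len)
    window /= window.sum()
    # mode="same" keeps the output aligned with the input indices
    # (the input is assumed to be longer than the window).
    return np.convolve(squared, window, mode="same")


def is_local_peak(window):
    """Hypothetical stand-in for check_peak (assumption): True when the
    middle value is strictly greater than every other value in the window."""
    window = np.asarray(window, dtype=float)
    mid = len(window) // 2
    return bool(np.all(window[mid] > np.delete(window, mid)))


# Synthetic test signal: one asymmetric spike per second at fs = 64 Hz.
fs = 64
signal = np.zeros(5 * fs)
signal[32::fs] = 1.0
signal[33::fs] = 0.5

y = moving_window_integration_sketch(signal, fs)
peaks = [i for i in range(2, len(y) - 2) if is_local_peak(y[i - 2:i + 3])]
rr_seconds = np.diff(peaks) / fs  # index-based RR intervals converted to seconds
```

As in `detect_rpeak`, the intervals are first derived from sample indices, so gaps in the timestamps do not distort them; the sketch omits the adaptive thresholding and peak confirmation steps entirely.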