def detect_rpeak(ecg: DataStream,
                 fs: float = 64,
                 threshold: float = 0.5,
                 blackman_win_len_range: float = 0.2) -> DataStream:
    """
    Detect R peaks in an ECG stream with a Pan-Tompkins style pipeline.

    Because the ECG timestamps may be discontinuous, the running RR-interval
    average used during detection is expressed in sample indices rather than
    in time. The pipeline is:

    1. compute the moving-window integration of the signal using a Blackman
       window of ``ceil(fs * blackman_win_len_range)`` samples
    2. collect every candidate peak of the integrated signal
    3. filter the candidates with ``compute_r_peaks`` (adaptive signal/noise
       thresholding, per the original documentation)
    4. drop peaks that are too close together and confirm the remaining ones
       against the raw samples

    :param ecg: ECG datastream; each data point exposes ``sample`` and
        ``start_time``
    :param fs: sampling frequency
    :param threshold: initial detection threshold handed to
        ``compute_r_peaks`` (documented upstream as relative to the signal
        normalized by its 90th percentile); 0.5 is the default
    :param blackman_win_len_range: factor multiplied by ``fs`` to obtain the
        Blackman window length
    :return: datastream derived from ``ecg`` whose data points are
        (timestamp of an R peak, interval in seconds since the previous
        R peak). The first detected peak has no predecessor and is therefore
        not emitted. The data is empty when the input is empty or fewer than
        two candidate peaks are found.
    """
    data = ecg.data
    result = DataStream.from_datastream([ecg])
    if len(data) == 0:
        result.data = []
        return result

    sample = np.array([i.sample for i in data])
    timestamp = np.array([i.start_time for i in data])

    # Moving-window integration of the signal.
    blackman_win_len = np.ceil(fs * blackman_win_len_range)
    y = compute_moving_window_int(sample, fs, blackman_win_len)

    # Candidate peaks of the integrated signal as (index, value) pairs.
    peak_location_values = [(i, y[i]) for i in range(2, len(y) - 1) if check_peak(y[i - 2:i + 3])]
    peak_location = [i[0] for i in peak_location_values]

    # An RR interval needs two peaks. Bail out early instead of dividing by
    # zero (one peak) or producing a meaningless negative-zero average (none).
    if len(peak_location) < 2:
        result.data = []
        return result

    # Initial RR-interval average, measured in sample indices.
    running_rr_avg = sum(np.diff(peak_location)) / (len(peak_location) - 1)

    rpeak_temp1 = compute_r_peaks(threshold, running_rr_avg, y, peak_location_values)
    rpeak_temp2 = remove_close_peaks(rpeak_temp1, sample, fs)
    index = confirm_peaks(rpeak_temp2, sample, fs)

    rpeak_timestamp = timestamp[index]
    rr_intervals = np.diff(rpeak_timestamp)
    # Each interval is attributed to the later of its two peaks.
    rpeak_timestamp = rpeak_timestamp[1:]

    # total_seconds() keeps the ``days`` component of the timedelta, so gaps
    # of 24 hours or more between peaks are reported correctly.
    result.data = [
        DataPoint.from_tuple(peak_time, interval.total_seconds())
        for peak_time, interval in zip(rpeak_timestamp, rr_intervals)
    ]
    return result
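# Page-navigation residue from the scraped web page ("Comment list", "Article contents"); not part of the code.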