-
Notifications
You must be signed in to change notification settings - Fork 6
/
Preprocessing.py
122 lines (103 loc) · 4.32 KB
/
Preprocessing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import pickle
import sys
from concurrent.futures import ProcessPoolExecutor, as_completed
import biosppy.signals.tools as st
import numpy as np
import os
import wfdb
from biosppy.signals.ecg import correct_rpeaks, hamilton_segmenter
from scipy.signal import medfilt
from sklearn.utils import cpu_count
from tqdm import tqdm
# PhysioNet Apnea-ECG dataset
# url: https://physionet.org/physiobank/database/apnea-ecg/
base_dir = "dataset"  # directory holding the records, annotations and the output pickle
fs = 100  # sampling rate (Hz) handed to the filter/segmenters; assumed to match the records
sample = fs * 60  # samples per one-minute labelled epoch
before = 2  # minutes of context taken before each labelled minute
after = 2  # minutes of context taken after each labelled minute
hr_min = 20  # lowest accepted instantaneous heart rate (beats/min)
hr_max = 300  # highest accepted instantaneous heart rate (beats/min)
# Worker processes: one fewer than the CPU count, capped at 35. Clamped to at least 1
# because ProcessPoolExecutor raises ValueError for max_workers <= 0 (1-CPU machines).
num_worker = max(1, min(35, cpu_count() - 1))
def worker(name, labels):
    """Extract RRI and R-peak amplitude series for one Apnea-ECG record.

    For every labelled minute ``j`` that has ``before`` minutes of signal before
    it and ``after`` minutes after it, the ``before + 1 + after`` minute window of
    channel 0 is band-pass filtered (FIR, 3-45 Hz), R peaks are detected and
    corrected, and two series are extracted: the median-filtered R-R interval
    (RRI) series and the R-peak amplitude series. A window is dropped when its
    mean beat count lies outside 40-200 beats/min, or when any instantaneous
    heart rate falls outside ``[hr_min, hr_max]``.

    Args:
        name: Record name (e.g. ``"a01"``), resolved relative to ``base_dir``.
        labels: Per-minute annotation symbols, one per minute of the record;
            ``'N'`` maps to 0.0 and any other symbol to 1.0.

    Returns:
        Tuple ``(X, y, groups)`` of equal-length lists. ``X`` holds
        ``[(rri_tm, rri_signal), (ampl_tm, ampl_signal)]`` per kept window
        (times in seconds from the window start), ``y`` the float label and
        ``groups`` the record name.
    """
    X = []
    y = []
    groups = []
    signals = wfdb.rdrecord(os.path.join(base_dir, name), channels=[0]).p_signal[:, 0]
    n_minutes = before + 1 + after  # length of every analysis window, in minutes
    for j, label in enumerate(tqdm(labels, desc=name, file=sys.stdout)):
        start = (j - before) * sample
        end = (j + 1 + after) * sample
        # Skip minutes whose context window would run past either end of the recording.
        if j < before or end > len(signals):
            continue
        signal = signals[start:end]
        signal, _, _ = st.filter_signal(signal, ftype='FIR', band='bandpass', order=int(0.3 * fs),
                                        frequency=[3, 45], sampling_rate=fs)
        # Find R peaks
        rpeaks, = hamilton_segmenter(signal, sampling_rate=fs)
        rpeaks, = correct_rpeaks(signal, rpeaks=rpeaks, sampling_rate=fs, tol=0.1)
        # Remove windows whose average beat count is abnormal (outside 40-200 beats/min).
        beats_per_minute = len(rpeaks) / n_minutes
        if beats_per_minute < 40 or beats_per_minute > 200:
            continue
        # RRI series (s) timestamped at the later peak, lightly median-smoothed;
        # amplitude series is the filtered signal sampled at each R peak.
        rri_tm, rri_signal = rpeaks[1:] / float(fs), np.diff(rpeaks) / float(fs)
        rri_signal = medfilt(rri_signal, kernel_size=3)
        ampl_tm, ampl_signal = rpeaks / float(fs), signal[rpeaks]
        hr = 60 / rri_signal  # instantaneous heart rate, beats/min
        # Keep the window only if every heart-rate sample is physiologically plausible.
        if np.all(np.logical_and(hr >= hr_min, hr <= hr_max)):
            X.append([(rri_tm, rri_signal), (ampl_tm, ampl_signal)])
            y.append(0. if label == 'N' else 1.)
            groups.append(name)
    return X, y, groups
def _extract_records(names, get_labels):
    """Run ``worker`` over ``names`` in a process pool and concatenate the results.

    Args:
        names: Record names to process.
        get_labels: Callable mapping a record name to its per-minute labels; it
            is called in the parent process and the labels are shipped to the
            worker alongside the name.

    Returns:
        Tuple ``(X, y, groups)`` of lists, concatenated in the order of ``names``.
    """
    X_all, y_all, groups_all = [], [], []
    with ProcessPoolExecutor(max_workers=num_worker) as executor:
        futures = [executor.submit(worker, name, get_labels(name)) for name in names]
        # Collect in submission order rather than completion order so the saved
        # dataset has the same record ordering on every run.
        for future in futures:
            X, y, groups = future.result()
            X_all.extend(X)
            y_all.extend(y)
            groups_all.extend(groups)
    return X_all, y_all, groups_all


if __name__ == "__main__":
    train_names = [
        "a01", "a02", "a03", "a04", "a05", "a06", "a07", "a08", "a09", "a10",
        "a11", "a12", "a13", "a14", "a15", "a16", "a17", "a18", "a19", "a20",
        "b01", "b02", "b03", "b04", "b05",
        "c01", "c02", "c03", "c04", "c05", "c06", "c07", "c08", "c09", "c10"
    ]
    print('Training...')
    # Training labels come from each record's "apn" annotation file.
    o_train, y_train, groups_train = _extract_records(
        train_names,
        lambda name: wfdb.rdann(os.path.join(base_dir, name), extension="apn").symbol,
    )

    print()

    # Test labels come from the "event-2-answers" file. The file is split on blank
    # lines; each chunk is keyed by its first 3 characters, and every other
    # whitespace token from the third one on is concatenated and split into
    # per-minute symbols. Presumably each chunk is a record name followed by
    # alternating hour-index / label-string tokens -- TODO confirm against the file.
    answers = {}
    with open(os.path.join(base_dir, "event-2-answers"), "r") as f:
        for answer in f.read().split("\n\n"):
            answers[answer[:3]] = list("".join(answer.split()[2::2]))

    test_names = [
        "x01", "x02", "x03", "x04", "x05", "x06", "x07", "x08", "x09", "x10",
        "x11", "x12", "x13", "x14", "x15", "x16", "x17", "x18", "x19", "x20",
        "x21", "x22", "x23", "x24", "x25", "x26", "x27", "x28", "x29", "x30",
        "x31", "x32", "x33", "x34", "x35"
    ]
    print("Testing...")
    o_test, y_test, groups_test = _extract_records(test_names, lambda name: answers[name])

    apnea_ecg = dict(o_train=o_train, y_train=y_train, groups_train=groups_train, o_test=o_test, y_test=y_test,
                     groups_test=groups_test)
    # NOTE(review): protocol=2 is presumably kept for compatibility with older
    # loaders of this pickle -- confirm before raising it.
    with open(os.path.join(base_dir, "apnea-ecg.pkl"), "wb") as f:
        pickle.dump(apnea_ecg, f, protocol=2)
    print("\nok!")