This code is intended to be run in Google Colab and assumes that the dataset is stored in the respective user's Google Drive.

from google.colab import drive
drive.mount('/content/drive')

path = '/content/drive/MyDrive/Colab Notebooks/'

import time
t1 = time.time()
!unzip /content/drive/MyDrive/Colab\ Notebooks/PTB-XL.zip >/dev/null
t = time.time() - t1
print("Extraction took", int(t // 60), "minutes and", round(t % 60), "seconds.")

!pip install wfdb >/dev/null

import pandas as pd
import numpy as np
import wfdb
import ast
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

def load_raw_data(df, sampling_rate, path):
    # read the 100 Hz or 500 Hz records referenced in the database table
    if sampling_rate == 100:
        data = [wfdb.rdsamp(path + f) for f in df.filename_lr]
    else:
        data = [wfdb.rdsamp(path + f) for f in df.filename_hr]
    data = np.array([signal for signal, meta in data])
    return data

def aggregate_diagnostic(y_dic):
    # map the scp codes of one record to their diagnostic superclasses
    tmp = []
    for key in y_dic.keys():
        if key in agg_df.index:
            tmp.append(agg_df.loc[key].diagnostic_class)
    return list(set(tmp))

def nn_output(diag_superclass):
    # binary target: 0 = only the NORM superclass, 1 = anything else
    if diag_superclass == ["NORM"]:
        return 0
    else:
        return 1

path_dbase = "/content/ptb-xl-a-large-publicly-available-electrocardiography-dataset-1.0.1/"
sampling_rate = 100

# load and convert annotation data
Y = pd.read_csv(path_dbase + 'ptbxl_database.csv', index_col='ecg_id')
Y.scp_codes = Y.scp_codes.apply(lambda x: ast.literal_eval(x))

# load raw signal data
X = load_raw_data(Y, sampling_rate, path_dbase)

# load scp_statements.csv for diagnostic aggregation
agg_df = pd.read_csv(path_dbase + 'scp_statements.csv', index_col=0)
agg_df = agg_df[agg_df.diagnostic == 1]

# apply diagnostic superclass and derive the binary training target
Y['diagnostic_superclass'] = Y.scp_codes.apply(aggregate_diagnostic)
Y['nn_output'] = Y.diagnostic_superclass.apply(nn_output)
Y[['diagnostic_superclass', 'nn_output']][:20]  # preview the first 20 rows

# split data into train and test using the recommended stratification fold
test_fold = 10
X_train = X[np.where(Y.strat_fold != test_fold)]
y_train = Y[(Y.strat_fold != test_fold)].nn_output
X_test = X[np.where(Y.strat_fold == test_fold)]
y_test = Y[Y.strat_fold == test_fold].nn_output

# split the test data into test and evaluation data
X_evaluation = X_test[:1000]
X_test = X_test[1000:]
y_evaluation = y_test[:1000]
y_test = y_test[1000:]

class block(nn.Module):
    def __init__(self, in_channels, out_channels, identity_downsample=None, stride=1):
        super(block, self).__init__()
        self.out_channels, self.in_channels = out_channels, in_channels
        self.expansion = 1
        # three 1-d convolutions with decreasing kernel sizes; only the first may stride
        self.conv1 = nn.Conv1d(self.in_channels, self.out_channels, kernel_size=15, stride=stride, padding=7)
        self.bn1 = nn.BatchNorm1d(self.out_channels)
        self.conv2 = nn.Conv1d(self.out_channels, self.out_channels*2, kernel_size=9, stride=1, padding=4)
        self.bn2 = nn.BatchNorm1d(self.out_channels*2)
        self.conv3 = nn.Conv1d(self.out_channels*2, self.out_channels*2, kernel_size=5, stride=1, padding=2)
        self.bn3 = nn.BatchNorm1d(self.out_channels*2)
        self.relu = nn.ReLU()
        self.identity_downsample = identity_downsample

    def forward(self, x):
        identity = x
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)
        x = self.conv3(x)
        x = self.bn3(x)
        # project the skip connection whenever channel count or resolution changed
        if self.identity_downsample is not None:
            identity = self.identity_downsample(identity)
        x += identity
        return x
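An optional shape check for a single residual block (this snippet is illustrative and not part of the original pipeline; the batch and the 1x1 projection are made up): with 12 input channels the block should return out_channels*2 = 24 channels at unchanged length.

sample = torch.randn(4, 12, 1000)  # hypothetical batch: 4 twelve-lead ECGs, 1000 samples each
proj = nn.Sequential(nn.Conv1d(12, 24, kernel_size=1, stride=1), nn.BatchNorm1d(24))
print(block(12, 12, identity_downsample=proj, stride=1)(sample).shape)  # torch.Size([4, 24, 1000])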
class ResNet(nn.Module):
    def __init__(self, block, layers, image_channels):
        super(ResNet, self).__init__()
        # bookkeeping for the training loop
        self.history_loss = []
        self.history_eval = []
        self.classific_accuracy_training = []
        self.current_epoch = 0
        self.in_channels = 12
        self.conv1 = nn.Conv1d(image_channels, 12, kernel_size=7, stride=2, padding=3)
        self.bn1 = nn.BatchNorm1d(12)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1)
        self.layer1 = self._make_layer(block, layers[0], out_channels=12, stride=1)
        self.layer2 = self._make_layer(block, layers[1], out_channels=24, stride=2)
        self.layer3 = self._make_layer(block, layers[2], out_channels=24*2, stride=2)
        self.layer4 = self._make_layer(block, layers[3], out_channels=24*4, stride=2)
        self.avgpool = nn.AdaptiveAvgPool1d(25)
        # classification head: 24*4*2 channels x 25 pooled samples = 4800 features
        self.lin1 = nn.Linear(4800, 800)
        self.lin2 = nn.Linear(800, 40)
        self.lin3 = nn.Linear(40, 10)
        self.lin4 = nn.Linear(10, 2)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avgpool(x)
        x = x.reshape(x.shape[0], -1)  # flatten; switching to x.view would also work
        x = F.relu(self.lin1(x))
        x = F.relu(self.lin2(x))
        x = F.relu(self.lin3(x))
        x = self.lin4(x)
        return x

    def _make_layer(self, block, num_residual_blocks, out_channels, stride):
        identity_downsample = None
        layers = []
        # project the identity whenever a layer changes resolution or channel count
        if stride != 1 or self.in_channels != out_channels*2:
            identity_downsample = nn.Sequential(
                nn.Conv1d(self.in_channels, out_channels*2, kernel_size=1, stride=stride),
                nn.BatchNorm1d(out_channels*2))
        layers.append(block(self.in_channels, out_channels, identity_downsample, stride=stride))
        self.in_channels = out_channels*2
        for i in range(num_residual_blocks - 1):
            layers.append(block(self.in_channels, out_channels))
        return nn.Sequential(*layers)

def ResNet50(img_channels=12):
    return ResNet(block, [3, 4, 6, 3], img_channels)

def test():
    # push a random batch through a freshly built CPU network to check the shapes
    net = ResNet50()
    x = torch.randn(30, 12, 1000)
    y = net(x)
    return y

net = ResNet50()
net = net.cuda()

learning_rate = 0.1
batch_size = 50
optimizer = optim.SGD(net.parameters(), lr=learning_rate)
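Before starting a long run, the test helper above serves as a cheap sanity check; this call is a sketch and not part of the training pipeline (it builds its own throwaway network on the CPU):

out = test()
print(out.shape)  # expected: torch.Size([30, 2]), one logit per class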
def train(epoch=3000, b_s=batch_size):
    net.train()
    criterion = nn.CrossEntropyLoss()  # cross-entropy loss on the two class logits
    for i in range(epoch):
        # draw a random mini-batch and bring it into (batch, channels, samples) order
        random_batch = [np.random.randint(len(X_train)) for i in range(b_s)]
        my_in = torch.tensor(np.swapaxes(X_train[random_batch], 1, 2), dtype=torch.float32)
        # target values
        zielw = torch.tensor(y_train.iloc[random_batch].tolist(), dtype=torch.long)
        # move to GPU
        my_in = my_in.cuda()
        zielw = zielw.cuda()
        # forward propagation
        optimizer.zero_grad()  # clear the gradients of the previous step
        out = net(my_in)
        # simple learning-rate schedule over the course of training
        if net.current_epoch >= 1500 and net.current_epoch < 3000:
            optimizer.param_groups[0]['lr'] = 0.05
        if net.current_epoch >= 3000:
            optimizer.param_groups[0]['lr'] = 0.03
        loss = criterion(out, zielw)  # compute the error
        # backward propagation, then update weights and biases
        loss.backward()
        optimizer.step()
        net.history_loss.append(loss.item())  # store the float, not the graph-holding tensor
        if i % 50 == 0:
            eval_acc = evaluate()
            print('\r', net.current_epoch, '/', epoch,
                  "Loss: " + "{:2.5f}".format(loss.item()),
                  "Evaluation: " + "{:2.5f}".format(eval_acc))
            net.history_eval.append(eval_acc)
            net.classific_accuracy_training.append(evaluate(test_x=X_train[:2000], test_y=y_train[:2000]))
            net.train()  # evaluate() switched the net to eval mode; switch back for training
        net.current_epoch += 1

def evaluate(test_x=X_evaluation, test_y=y_evaluation, labels_von_richtig=False, werte_vgl=False):
    """Determines how often the network classifies an ECG correctly.
    With 'labels_von_richtig=True' the labels of the correctly classified ECGs are printed;
    with 'werte_vgl=True' the raw network outputs are printed alongside the targets."""
    net.eval()
    z = 0
    with torch.no_grad():
        for i in range(len(test_x)):
            x = torch.tensor(np.swapaxes(test_x[i], 0, 1), dtype=torch.float32)
            x = x.unsqueeze(0)
            x = x.cuda()  # move to GPU
            out = net(x)
            if torch.argmax(out) == test_y.iloc[i]:
                z += 1
                if labels_von_richtig:
                    # print the labels of the correctly classified ECGs
                    print(test_y.iloc[i])
            if werte_vgl:
                print(out[0], test_y.iloc[i])
    return z / len(test_x)

train()
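Once train() has finished, the held-out test split can be scored and the recorded loss curve inspected; a minimal sketch using only the names defined above:

print("Test accuracy:", evaluate(test_x=X_test, test_y=y_test))
plt.plot(net.history_loss)
plt.xlabel("training step")
plt.ylabel("cross-entropy loss")
plt.show()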