ws2021:hier_findet_ihr_den_finalen_code

This code is intended to be run in Google Colab and assumes that the dataset has been saved in the user's Google Drive.

from google.colab import drive
drive.mount('/content/drive')
path = '/content/drive/MyDrive/Colab Notebooks/'   # no backslash escape needed inside a Python string

import time
t1 = time.time()

!unzip /content/drive/MyDrive/Colab\ Notebooks/PTB-XL.zip >/dev/null

t = time.time() - t1
print("Extraction took", t//60, "Minutes and ", t%60, "seconds.")

!pip install wfdb >/dev/null

import pandas as pd
import numpy as np
import wfdb
import ast

import random
import matplotlib.pyplot as plt
import math

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

def load_raw_data(df, sampling_rate, path):
    if sampling_rate == 100:
        data = [wfdb.rdsamp(path+f) for f in df.filename_lr]
    else:
        data = [wfdb.rdsamp(path+f) for f in df.filename_hr]
    data = np.array([signal for signal, meta in data])
    return data


def aggregate_diagnostic(y_dic):
    tmp = []
    for key in y_dic.keys():
        if key in agg_df.index:
            tmp.append(agg_df.loc[key].diagnostic_class)
    return list(set(tmp))

def nn_output(diag_superclass):
    # binary target: 0 for purely normal ECGs, 1 for everything else
    if diag_superclass == ["NORM"]:
        return 0
    else:
        return 1
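
For illustration, the binary labeling behaves as follows (hypothetical superclass lists, as produced by aggregate_diagnostic above):

print(nn_output(["NORM"]))        # 0: only the NORM superclass is present
print(nn_output(["NORM", "MI"]))  # 1: any other combination counts as abnormal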

path_dbase = "/content/ptb-xl-a-large-publicly-available-electrocardiography-dataset-1.0.1/"

sampling_rate = 100

# Load and convert annotation data
Y = pd.read_csv(path_dbase + 'ptbxl_database.csv', index_col='ecg_id')
Y.scp_codes = Y.scp_codes.apply(ast.literal_eval)

# Load raw signal data
X = load_raw_data(Y, sampling_rate, path_dbase)

# Load scp_statements.csv for diagnostic aggregation
agg_df = pd.read_csv(path_dbase + 'scp_statements.csv', index_col=0)
agg_df = agg_df[agg_df.diagnostic == 1]

# Apply diagnostic superclass
Y['diagnostic_superclass'] = Y.scp_codes.apply(aggregate_diagnostic)
Y['nn_output'] = Y.diagnostic_superclass.apply(nn_output)

Y[['diagnostic_superclass', 'nn_output']][:20]   # preview the first 20 labels
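
A quick sanity check of the shapes: at 100 Hz, PTB-XL stores 10-second records, so each signal should have 1000 samples across 12 leads.

print(X.shape)   # (number of records, 1000, 12)
print(len(Y), "annotation rows")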

# Split data into train and test
test_fold = 10
# Train
X_train = X[np.where(Y.strat_fold != test_fold)]
y_train = Y[(Y.strat_fold != test_fold)].nn_output
X_test = X[np.where(Y.strat_fold == test_fold)]
y_test = Y[Y.strat_fold == test_fold].nn_output


# Split the test data into test and evaluation sets
y_evaluation = y_test[:1000]
y_test = y_test[1000:]
X_evaluation = X_test[:1000]
X_test = X_test[1000:]
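
Since the classifier is binary, it is worth checking how the two classes are distributed before training (a quick check using pandas):

print(y_train.value_counts())   # label 0 = NORM only, label 1 = everything else
print(y_test.value_counts())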

class block(nn.Module):
    def __init__(self, in_channels, out_channels, identity_downsample=None, stride=1):
        super(block, self).__init__()
        self.out_channels, self.in_channels = out_channels, in_channels

        self.expansion = 1
        # three 1D convolutions with decreasing kernel sizes; the channel count doubles after conv2
        self.conv1 = nn.Conv1d(self.in_channels, self.out_channels, kernel_size=15, stride=stride, padding=7)
        self.bn1 = nn.BatchNorm1d(self.out_channels)
        self.conv2 = nn.Conv1d(self.out_channels, self.out_channels*2, kernel_size=9, stride=1, padding=4)
        self.bn2 = nn.BatchNorm1d(self.out_channels*2)
        self.conv3 = nn.Conv1d(self.out_channels*2, self.out_channels*2, kernel_size=5, stride=1, padding=2)
        self.bn3 = nn.BatchNorm1d(self.out_channels*2)
        self.relu = nn.ReLU()
        self.identity_downsample = identity_downsample


    def forward(self, x):
        identity = x

        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)
        x = self.conv3(x)
        x = self.bn3(x)

        
        # project the identity if the channel count or length changed
        if self.identity_downsample is not None:
            identity = self.identity_downsample(identity)

        x += identity   # residual (skip) connection
        return x
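
A minimal sketch of what a single block does to its input (the 1x1 downsample is needed here because the block doubles the channel count from 12 to 24):

ds = nn.Sequential(nn.Conv1d(12, 24, kernel_size=1), nn.BatchNorm1d(24))
b = block(12, 12, identity_downsample=ds)
print(b(torch.randn(2, 12, 1000)).shape)   # torch.Size([2, 24, 1000])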

class ResNet(nn.Module):
    def __init__(self, block, layers, image_channels):
        super(ResNet, self).__init__()

        # bookkeeping for the training and evaluation curves
        self.history_loss = []
        self.history_eval = []
        self.classific_accuracy_training = []
        self.current_epoch = 0

        self.in_channels = 12   # channel count after the stem convolution
        self.conv1 = nn.Conv1d(image_channels, 12, kernel_size=7, stride=2, padding=3)
        self.bn1 = nn.BatchNorm1d(12)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self._make_layer(block, layers[0], out_channels=12, stride=1)
        self.layer2 = self._make_layer(block, layers[1], out_channels=24, stride=2)
        self.layer3 = self._make_layer(block, layers[2], out_channels=24*2, stride=2)
        self.layer4 = self._make_layer(block, layers[3], out_channels=24*4, stride=2)

        # layer4 outputs 24*4*2 = 192 channels; pooling each to 25 samples gives 192*25 = 4800 features
        self.avgpool = nn.AdaptiveAvgPool1d(25)
        self.lin1 = nn.Linear(4800, 800)
        self.lin2 = nn.Linear(800, 40)
        self.lin3 = nn.Linear(40, 10)
        self.lin4 = nn.Linear(10, 2)


    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        
        x = self.layer1(x)
        x = self.layer2(x)
        
        x = self.layer3(x)
        x = self.layer4(x)
        
        x = self.avgpool(x)
        x = x.reshape(x.shape[0], -1)   # flatten; x.view would also work here
        
        x = F.relu(self.lin1(x))
        x = F.relu(self.lin2(x))
        x = F.relu(self.lin3(x))
        x = self.lin4(x)
        return x

    def _make_layer(self, block, num_residual_blocks, out_channels, stride):
        identity_downsample = None
        layers = []

        # a 1x1 convolution adapts the identity whenever the stride or the
        # channel count changes, so the skip connection can be added
        if stride != 1 or self.in_channels != out_channels*2:
            identity_downsample = nn.Sequential(nn.Conv1d(self.in_channels, out_channels*2, kernel_size=1, stride=stride), nn.BatchNorm1d(out_channels*2))

        layers.append(block(self.in_channels, out_channels, identity_downsample, stride=stride))

        self.in_channels = out_channels*2

        for _ in range(num_residual_blocks - 1):
            layers.append(block(self.in_channels, out_channels))

        return nn.Sequential(*layers)

def ResNet50(img_channels=12):
    # [3, 4, 6, 3] residual blocks per stage, the layout of the original ResNet-50
    return ResNet(block, [3, 4, 6, 3], img_channels)

def test():
    net = ResNet50()
    x = torch.randn(30, 12, 1000)   # batch of 30 random 12-lead signals, 1000 samples each
    y = net(x)
    return y
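
Calling test() pushes a random batch through an untrained network on the CPU; the output should contain one score pair per input:

print(test().shape)   # torch.Size([30, 2])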

net = ResNet50()
net = net.cuda()   # requires a GPU runtime in Colab
learning_rate = 0.1
batch_size = 50
optimizer = optim.SGD(net.parameters(), lr=learning_rate)

def train(epoch = 3000, b_s = batch_size):
    # epoch: total number of training iterations (one random mini-batch each)
    net.train()
    criterion = nn.CrossEntropyLoss()   # cross-entropy loss for the two-class output

    for i in range(epoch):
        random_batch = [np.random.randint(len(X_train)) for i in range(b_s)]

        # input values: (batch, leads, samples)
        my_in = np.swapaxes(X_train[random_batch], 1, 2)
        my_in = torch.tensor(my_in, dtype=torch.float32)

        # target values
        ziel = y_train[y_train.index[random_batch]].tolist()
        zielw = torch.tensor(ziel, dtype=torch.long)

        # cuda
        my_in = my_in.cuda()
        zielw = zielw.cuda()

        # forward prop
        optimizer.zero_grad()       # clear the gradients from the previous iteration
        out = net(my_in)            # forward propagation

        # simple step-wise learning-rate schedule
        if net.current_epoch >= 1500 and net.current_epoch < 3000:
            optimizer.param_groups[0]['lr'] = 0.05
        if net.current_epoch >= 3000:
            optimizer.param_groups[0]['lr'] = 0.03

        loss = criterion(out, zielw)    # compute the loss

        # backward prop
        loss.backward()                 # backward propagation
        optimizer.step()                # update weights and biases
        net.history_loss.append(loss.item())   # .item() avoids keeping the whole graph alive

        if i % 50 == 0:
            acc = evaluate()
            print('\r', net.current_epoch, '/', epoch, "Loss: " + "{:2.5f}".format(loss.item()), "Evaluation: " + "{:2.5f}".format(acc))
            net.history_eval.append(acc)
            net.classific_accuracy_training.append(evaluate(test_x=X_train[:2000], test_y=y_train[:2000]))
            net.train()   # evaluate() switched the network to eval mode; switch back

        net.current_epoch += 1

def evaluate(test_x = X_evaluation, test_y = y_evaluation, labels_von_richtig=False, werte_vgl=False):
    """Measures how often the network classifies an ECG correctly.
    With 'labels_von_richtig=True' the labels of the correctly recognized ECGs are printed;
    'werte_vgl=True' prints the raw network outputs next to the targets."""
    net.eval()
    z = 0
    with torch.no_grad():   # no gradients are needed for evaluation
        for i in range(len(test_x)):
            x = np.swapaxes(test_x[i], 0, 1)
            x = torch.tensor(x, dtype=torch.float32)
            x = x.unsqueeze(0)
            # cuda
            x = x.cuda()

            if torch.argmax(net(x)) == test_y[test_y.index[i]]:
                z += 1
                if labels_von_richtig:        # prints labels of the correctly classified ECGs
                    print(test_y[test_y.index[i]])
            if werte_vgl:
                print(net(x)[0], test_y[test_y.index[i]])
    return z / len(test_x)
    
train()
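
After training, the recorded histories can be plotted with matplotlib to inspect convergence (a minimal sketch; the accuracy histories are logged every 50 iterations):

plt.plot(net.history_loss)
plt.xlabel('iteration')
plt.ylabel('cross-entropy loss')
plt.show()

plt.plot(net.history_eval, label='evaluation accuracy')
plt.plot(net.classific_accuracy_training, label='training accuracy (first 2000 records)')
plt.xlabel('evaluation step (every 50 iterations)')
plt.legend()
plt.show()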