# Codelab: DFT, STFT e CWT in Python su segnali ECG

Questo notebook contiene **una possibile soluzione** del codelab sugli ECG.

Analizziamo tre segnali (`heart100`, `heart101`, `heart102`) utilizzando:

- **FFT (DFT)** per trovare la frequenza dominante,
- **STFT** per vedere come lo spettro cambia nel tempo,
- **CWT** per un'analisi tempo–scala.

La struttura segue quella del notebook per lo studente, ma con tutte le parti completate.

> Nota: da terminale, prima di eseguire il notebook, installare le librerie (se necessario):

```bash
pip install numpy matplotlib scipy pywavelets
```

In [None]:
import numpy as np
import matplotlib.pyplot as plt

from scipy.io import loadmat
from scipy.signal import spectrogram, get_window
import pywt

%matplotlib inline

fs = 1000.0  # Hz

## Caricamento dei dati

In [None]:
data = loadmat("cardio100.mat")

# Estrazione come array 1D
heart100 = np.asarray(data["heart100"]).squeeze()
heart101 = np.asarray(data["heart101"]).squeeze()
heart102 = np.asarray(data["heart102"]).squeeze()

N = len(heart100)
t = np.arange(N) / fs

print("Numero di campioni N =", N)
print("Durata in secondi   =", N / fs)

## Normalizzazione e visualizzazione nel dominio del tempo

In [None]:
def normalizza(x: np.ndarray) -> np.ndarray:
    """Normalizza x in modo che max(|x|) = 1."""
    return x / np.max(np.abs(x))

x  = normalizza(heart100)
x1 = normalizza(heart101)
x2 = normalizza(heart102)

plt.figure(figsize=(10, 4))
plt.plot(t, x,  label="heart100")
plt.plot(t, x1, label="heart101")
plt.plot(t, x2, label="heart102")
plt.xlabel("Tempo [s]")
plt.ylabel("Ampiezza normalizzata")
plt.title("Tracce ECG normalizzate")
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()

### Periodo medio e indice *k* atteso (discussione teorica)

- Periodo medio `T` tra picchi ⇒ frequenza dominante `f0 = 1/T`.
- Per una DFT di lunghezza `N_FFT`, l'indice del bin spettrale atteso è:

\[
k \approx \frac{f_0}{f_s} \cdot N_{\text{FFT}}.
\]

Nel notebook studente si può chiedere di stimare `T`, `f0` e `k` manualmente
o con un piccolo script di rilevamento dei picchi.

## Funzione di utilità: FFT (solo frequenze positive)

In [None]:
def fft_pos(x: np.ndarray, fs: float, Nfft=None, use_hamming: bool = False):
    """Calcola la FFT di x (lunghezza Nfft) e restituisce (f_pos, |X(f)|) per f >= 0."""
    x = np.asarray(x)

    if Nfft is None:
        Nfft = len(x)

    x_seg = x[:Nfft]

    if use_hamming:
        w = np.hamming(len(x_seg))
        x_seg = x_seg * w

    X_full = np.fft.fft(x_seg, n=Nfft)
    f_full = np.fft.fftfreq(Nfft, d=1.0 / fs)

    mask = f_full >= 0
    f_pos = f_full[mask]
    X_pos = np.abs(X_full[mask])

    return f_pos, X_pos

### FFT 1000 punti su heart100

In [None]:
Nfft = 1000

# senza finestra
f_1000_nowin, X_1000_nowin = fft_pos(x, fs, Nfft=Nfft, use_hamming=False)

plt.figure(figsize=(8, 4))
plt.plot(f_1000_nowin, X_1000_nowin)
plt.xlim(0, 100)
plt.xlabel("Frequenza [Hz]")
plt.ylabel("|X(f)|")
plt.title("heart100 - FFT 1000 pt (senza finestra)")
plt.grid(True)
plt.tight_layout()
plt.show()

# con finestra di Hamming
f_1000_ham, X_1000_ham = fft_pos(x, fs, Nfft=Nfft, use_hamming=True)

plt.figure(figsize=(8, 4))
plt.plot(f_1000_ham, X_1000_ham)
plt.xlim(0, 100)
plt.xlabel("Frequenza [Hz]")
plt.ylabel("|X(f)|")
plt.title("heart100 - FFT 1000 pt (Hamming)")
plt.grid(True)
plt.tight_layout()
plt.show()

idx_max_1000 = np.argmax(X_1000_ham)
print("heart100, FFT 1000pt (Hamming), f_dom ≈", f_1000_ham[idx_max_1000], "Hz")

### FFT 2048 punti su heart100

In [None]:
Nfft = 2048

f_2048_nowin, X_2048_nowin = fft_pos(x, fs, Nfft=Nfft, use_hamming=False)
f_2048_ham,   X_2048_ham   = fft_pos(x, fs, Nfft=Nfft, use_hamming=True)

plt.figure(figsize=(8, 4))
plt.plot(f_2048_nowin, X_2048_nowin, label="No window")
plt.plot(f_2048_ham,   X_2048_ham,   label="Hamming")
plt.xlim(0, 100)
plt.xlabel("Frequenza [Hz]")
plt.ylabel("|X(f)|")
plt.title("heart100 - FFT 2048 pt (confronto)")
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()

idx_max_2048 = np.argmax(X_2048_ham)
print("heart100, FFT 2048pt (Hamming), f_dom ≈", f_2048_ham[idx_max_2048], "Hz")

### FFT del segnale al quadrato (heart100)

In [None]:
# Segnale al quadrato
x_sq = x**2

# 1000 pt FFT su x_sq
f_1000_sq_nowin, X_1000_sq_nowin = fft_pos(x_sq, fs, Nfft=1000, use_hamming=False)
f_1000_sq_ham,   X_1000_sq_ham   = fft_pos(x_sq, fs, Nfft=1000, use_hamming=True)

plt.figure(figsize=(8, 4))
plt.plot(f_1000_sq_nowin, X_1000_sq_nowin, label="No window")
plt.plot(f_1000_sq_ham,   X_1000_sq_ham,   label="Hamming")
plt.xlim(0, 100)
plt.xlabel("Frequenza [Hz]")
plt.ylabel("|X(f)|")
plt.title("heart100^2 - FFT 1000 pt")
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()

# 2048 pt FFT su x_sq
f_2048_sq_nowin, X_2048_sq_nowin = fft_pos(x_sq, fs, Nfft=2048, use_hamming=False)
f_2048_sq_ham,   X_2048_sq_ham   = fft_pos(x_sq, fs, Nfft=2048, use_hamming=True)

plt.figure(figsize=(8, 4))
plt.plot(f_2048_sq_nowin, X_2048_sq_nowin, label="No window")
plt.plot(f_2048_sq_ham,   X_2048_sq_ham,   label="Hamming")
plt.xlim(0, 100)
plt.xlabel("Frequenza [Hz]")
plt.ylabel("|X(f)|")
plt.title("heart100^2 - FFT 2048 pt")
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()

## FFT di heart101 e heart102

In [None]:
x1 = normalizza(heart101)
x2 = normalizza(heart102)

for label, sig in [("heart101", x1), ("heart102", x2)]:
    f_h, X_h = fft_pos(sig, fs, Nfft=2048, use_hamming=True)
    idx = np.argmax(X_h)
    print(f"{label}, f_dom (2048,Hamming) ≈ {f_h[idx]:.2f} Hz")

plt.figure(figsize=(8, 4))
for label, sig in [("heart100", x), ("heart101", x1), ("heart102", x2)]:
    f_h, X_h = fft_pos(sig, fs, Nfft=2048, use_hamming=True)
    plt.plot(f_h, X_h, label=label)

plt.xlim(0, 100)
plt.xlabel("Frequenza [Hz]")
plt.ylabel("|X(f)|")
plt.title("Confronto FFT 2048 pt (Hamming)")
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()

## STFT (Short-Time Fourier Transform)

In [None]:
def stft_spettrogramma(x: np.ndarray, fs: float, window_len: int, overlap_frac: float):
    """Calcola STFT tramite scipy.signal.spectrogram. Restituisce (t_stft, f_stft, Sxx)."""
    win = get_window("hamming", window_len)
    noverlap = int(overlap_frac * window_len)

    f_stft, t_stft, Sxx = spectrogram(
        x,
        fs=fs,
        window=win,
        nperseg=window_len,
        noverlap=noverlap,
        mode="complex"
    )
    return t_stft, f_stft, Sxx

In [None]:
window_len   = 128
overlap_frac = 0.5

t_stft, f_stft, Sxx = stft_spettrogramma(x, fs, window_len, overlap_frac)

plt.figure(figsize=(8, 4))
plt.pcolormesh(t_stft, f_stft, np.abs(Sxx), shading="gouraud")
plt.colorbar(label="|S(t,f)|")
plt.xlabel("Tempo [s]")
plt.ylabel("Frequenza [Hz]")
plt.ylim(0, 100)
plt.title(f"Spettrogramma STFT (heart100, win={window_len})")
plt.tight_layout()
plt.show()

In [None]:
for wlen in [64, 128, 256]:
    t_stft, f_stft, Sxx = stft_spettrogramma(x, fs, wlen, overlap_frac=0.5)
    plt.figure(figsize=(8, 4))
    plt.pcolormesh(t_stft, f_stft, np.abs(Sxx), shading="gouraud")
    plt.colorbar(label="|S(t,f)|")
    plt.xlabel("Tempo [s]")
    plt.ylabel("Frequenza [Hz]")
    plt.ylim(0, 100)
    plt.title(f"Spettrogramma STFT (heart100, win={wlen})")
    plt.tight_layout()
    plt.show()

## CWT (Continuous Wavelet Transform)

In [None]:
wavelet_mexh = pywt.ContinuousWavelet("mexh")

scales_test = [1, 5, 10, 16, 32, 64]

print("Scala -> frequenza (Hz) per 'mexh':")
for s in scales_test:
    f_cicli = pywt.scale2frequency(wavelet_mexh, s)  # cicli per campione
    f_hz    = f_cicli * fs
    print(f"  scala = {s:3d}  ->  f ≈ {f_hz:7.2f} Hz")

### CWT di una sinusoide a 50 Hz

In [None]:
f0   = 50.0
Ns   = 1000
t_sin = np.arange(Ns) / fs
x_sin = np.sin(2 * np.pi * f0 * t_sin)

plt.figure(figsize=(8, 3))
plt.plot(t_sin, x_sin)
plt.xlabel("Tempo [s]")
plt.ylabel("x_sin(t)")
plt.title("Sinusoide a 50 Hz")
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
scales = np.arange(1, 33)

coeffs_sin, freqs_sin = pywt.cwt(
    x_sin,
    scales,
    "mexh",
    sampling_period=1.0 / fs
)

plt.figure(figsize=(8, 4))
plt.contour(
    np.abs(coeffs_sin),
    levels=8
)
plt.colorbar(label="|C(a,b)|")
plt.title("CWT (mexh) della sinusoide a 50 Hz - contour a 8 livelli")
plt.tight_layout()
plt.show()

In [None]:
coeffs_sin_morl, freqs_sin_morl = pywt.cwt(
    x_sin,
    scales,
    "morl",
    sampling_period=1.0 / fs
)

plt.figure(figsize=(8, 4))
plt.contour(
    np.abs(coeffs_sin_morl),
    levels=8
)
plt.colorbar(label="|C(a,b)|")
plt.title("CWT (morl) della sinusoide a 50 Hz - contour a 8 livelli")
plt.tight_layout()
plt.show()

### CWT del segnale ECG heart100

In [None]:
scales_ecg = np.arange(1, 101)

coeffs_ecg, freqs_ecg = pywt.cwt(
    x,
    scales_ecg,
    "mexh",
    sampling_period=1.0 / fs
)

plt.figure(figsize=(8, 4))
plt.imshow(
    np.abs(coeffs_ecg),
    extent=[t[0], t[-1], scales_ecg[-1], scales_ecg[0]],
    aspect="auto",
    origin="upper",
    cmap="viridis"
)
plt.xlabel("Tempo [s]")
plt.ylabel("Scala")
plt.title("Scalogramma CWT (mexh) - heart100")
plt.colorbar(label="|C(a,b)|")
plt.tight_layout()
plt.show()

In [None]:
scala_tipo = 10
idx_scala = np.argmin(np.abs(scales_ecg - scala_tipo))
coeff_line = coeffs_ecg[idx_scala, :]

plt.figure(figsize=(8, 3))
plt.plot(t, np.abs(coeff_line)**2)
plt.xlabel("Tempo [s]")
plt.ylabel("|C(a,b)|^2")
plt.title(f"Coefficiente CWT alla scala circa {scala_tipo}")
plt.grid(True)
plt.tight_layout()
plt.show()