# stdlib
from abc import ABCMeta, abstractmethod
from pathlib import Path
from typing import Any, Callable, List, Optional, Tuple, Union
# third party
import numpy as np
import pandas as pd
import torch
from geomloss import SamplesLoss
from pydantic import validate_arguments
from scipy.stats import multivariate_normal
from sklearn.metrics import (accuracy_score, auc, average_precision_score, precision_recall_curve, roc_auc_score,
roc_curve)
from sklearn.preprocessing import MinMaxScaler, label_binarize
from torch import nn
from tqdm import tqdm
# tabeval absolute
from tabeval.logger import logger as log
from tabeval.plugins.core.models import bnaf
# tabeval absolute
from tabeval.utils.constants import DEVICE
@validate_arguments(config=dict(arbitrary_types_allowed=True))
def get_frequency(X_gt: pd.DataFrame, X_synth: pd.DataFrame, n_histogram_bins: int = 10) -> dict:
"""Get percentual frequencies for each possible real categorical value.
Returns:
The observed and expected frequencies (as a percent).
"""
res = {}
for col in X_gt.columns:
local_bins = min(n_histogram_bins, len(X_gt[col].unique()))
if len(X_gt[col].unique()) < 5: # categorical
gt = (X_gt[col].value_counts() / len(X_gt)).to_dict()
synth = (X_synth[col].value_counts() / len(X_synth)).to_dict()
else:
gt_vals, bins = np.histogram(X_gt[col], bins=local_bins)
synth_vals, _ = np.histogram(X_synth[col], bins=bins)
gt = {k: v / (sum(gt_vals) + 1e-8) for k, v in zip(bins, gt_vals)}
synth = {k: v / (sum(synth_vals) + 1e-8) for k, v in zip(bins, synth_vals)}
        # Epsilon-smooth missing or empty entries so that both distributions
        # share the same support.
        for val in gt:
            if val not in synth or synth[val] == 0:
                synth[val] = 1e-11
        for val in synth:
            if val not in gt or gt[val] == 0:
                gt[val] = 1e-11
if gt.keys() != synth.keys():
raise ValueError(f"Invalid features. {gt.keys()}. syn = {synth.keys()}")
res[col] = (list(gt.values()), list(synth.values()))
return res
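# Minimal usage sketch for `get_frequency` (hypothetical data, illustrative only):
#
#   real = pd.DataFrame({"age": np.random.normal(50, 10, 1000)})
#   synth = pd.DataFrame({"age": np.random.normal(52, 12, 1000)})
#   freqs = get_frequency(real, synth, n_histogram_bins=10)
#   gt_freq, synth_freq = freqs["age"]  # aligned per-bin frequencies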
def get_features(X: pd.DataFrame, sensitive_features: Optional[List[str]] = None) -> List:
    """Return the non-sensitive features from dataset X."""
    sensitive_features = sensitive_features or []
    features = list(X.columns)
    for col in sensitive_features:
        if col in features:
            features.remove(col)
    return features
def get_y_pred_proba_hlpr(y_pred_proba: np.ndarray, nclasses: int) -> np.ndarray:
    """Reduce an (n, 2) binary probability matrix to the positive-class column."""
if nclasses == 2:
if len(y_pred_proba.shape) < 2:
return y_pred_proba
if y_pred_proba.shape[1] == 2:
return y_pred_proba[:, 1]
return y_pred_proba
def evaluate_auc(
y_test: np.ndarray,
y_pred_proba: np.ndarray,
classes: Union[np.ndarray, None] = None,
) -> Tuple[float, float]:
    """Evaluate predictions against labels; returns (AUROC, AUPRC), micro-averaged for multiclass."""
y_test = np.asarray(y_test)
y_pred_proba = np.asarray(y_pred_proba)
    if np.isnan(y_pred_proba).any():
        raise ValueError("NaN in predictions; aborting")
n_classes = len(set(np.ravel(y_test)))
y_pred_proba_tmp = get_y_pred_proba_hlpr(y_pred_proba, n_classes)
if n_classes > 2:
fpr = dict()
tpr = dict()
precision = dict()
recall = dict()
average_precision = dict()
roc_auc: dict = dict()
if classes is None:
classes = sorted(set(np.ravel(y_test)))
y_test = label_binarize(y_test, classes=classes)
fpr["micro"], tpr["micro"], _ = roc_curve(y_test.ravel(), y_pred_proba_tmp.ravel())
roc_auc["micro"] = auc(fpr["micro"], tpr["micro"])
precision["micro"], recall["micro"], _ = precision_recall_curve(y_test.ravel(), y_pred_proba_tmp.ravel())
average_precision["micro"] = average_precision_score(y_test, y_pred_proba_tmp, average="micro")
aucroc = roc_auc["micro"]
aucprc = average_precision["micro"]
else:
aucroc = roc_auc_score(np.ravel(y_test), y_pred_proba_tmp, multi_class="ovr")
aucprc = average_precision_score(np.ravel(y_test), y_pred_proba_tmp)
return aucroc, aucprc
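# Usage sketch for `evaluate_auc` with binary labels (hypothetical arrays):
#
#   y_test = np.array([0, 1, 1, 0])
#   y_pred_proba = np.array([0.1, 0.8, 0.7, 0.4])
#   aucroc, aucprc = evaluate_auc(y_test, y_pred_proba)
#
# For multiclass inputs, pass an (n_samples, n_classes) probability matrix;
# the scores are then micro-averaged over one-vs-rest binarized labels.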
# Probability Density Function Classes for Domias
class gaussian:
    """Multivariate Gaussian with diagonal covariance, fitted to X."""
def __init__(self, X: np.ndarray) -> None:
var = np.std(X, axis=0) ** 2
mean = np.mean(X, axis=0)
self.rv = multivariate_normal(mean, np.diag(var))
def pdf(self, Z: np.ndarray) -> np.ndarray:
return self.rv.pdf(Z)
class normal_func:
    """Standard normal density: zero mean and unit variance per feature."""
def __init__(self, X: np.ndarray) -> None:
self.var = np.ones_like(np.std(X, axis=0) ** 2)
self.mean = np.zeros_like(np.mean(X, axis=0))
    def pdf(self, Z: np.ndarray) -> np.ndarray:
        return multivariate_normal.pdf(Z, self.mean, np.diag(self.var))
class normal_func_feat:
    """Gaussian density fitted on the continuous features of X only."""
def __init__(
self,
X: np.ndarray,
continuous: list,
) -> None:
        continuous = np.asarray(continuous)
        if np.any(continuous > 1) or len(continuous) != X.shape[1]:
            raise ValueError("`continuous` must be a boolean mask with one entry per column of X")
        self.feat = continuous.astype(bool)
        if np.sum(self.feat) == 0:
            raise ValueError("There needs to be at least one continuous feature")
        for i in np.arange(X.shape[1])[self.feat]:
            if len(np.unique(X[:, i])) < 10:
                log.debug(f"Warning: feature {i} does not seem continuous. CHECK")
self.var = np.std(X[:, self.feat], axis=0) ** 2
self.mean = np.mean(X[:, self.feat], axis=0)
def pdf(self, Z: np.ndarray) -> np.ndarray:
return multivariate_normal.pdf(Z[:, self.feat], self.mean, np.diag(self.var))
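# The three density helpers above share a `pdf` interface, so they are
# interchangeable as DOMIAS reference densities. A hedged sketch on
# hypothetical data:
#
#   X_ref = np.random.randn(500, 3)
#   p_fit = gaussian(X_ref)                     # Gaussian fitted to X_ref
#   p_std = normal_func(X_ref)                  # standard normal, same shape
#   p_sub = normal_func_feat(X_ref, [1, 1, 0])  # fitted on continuous cols only
#   densities = p_fit.pdf(X_ref)                # one density value per row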
class GeneratorInterface(metaclass=ABCMeta):
    """Minimal interface a synthetic-data generator must expose for DOMIAS."""
@abstractmethod
def fit(self, data: pd.DataFrame) -> "GeneratorInterface": ...
@abstractmethod
def generate(self, count: int) -> pd.DataFrame: ...
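# A hedged sketch of a concrete `GeneratorInterface` implementation that wraps
# bootstrap resampling (the class name and approach are illustrative, not part
# of the library):
#
#   class BootstrapGenerator(GeneratorInterface):
#       def fit(self, data: pd.DataFrame) -> "BootstrapGenerator":
#           self._data = data
#           return self
#
#       def generate(self, count: int) -> pd.DataFrame:
#           return self._data.sample(count, replace=True).reset_index(drop=True)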
# Domias helper functions
def compute_wd(
X_syn: np.ndarray,
X: np.ndarray,
) -> float:
    X_ = X.copy()
    X_syn_ = X_syn.copy()
    # Pad the synthetic sample with zero rows so both point clouds have the
    # same cardinality before the OT solver is applied.
    if len(X_) > len(X_syn_):
        X_syn_ = np.concatenate([X_syn_, np.zeros((len(X_) - len(X_syn_), X_.shape[1]))])
    scaler = MinMaxScaler().fit(X_)
    X_ = scaler.transform(X_)
    X_syn_ = scaler.transform(X_syn_)
    # Flatten every scalar entry into a 1-D point cloud and compare the two
    # clouds with an entropy-regularized (Sinkhorn) OT distance.
    X_ten = torch.from_numpy(X_).reshape(-1, 1)
    Xsyn_ten = torch.from_numpy(X_syn_).reshape(-1, 1)
    OT_solver = SamplesLoss(loss="sinkhorn")
    return OT_solver(X_ten, Xsyn_ten).cpu().numpy().item()
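# Usage sketch for `compute_wd` (hypothetical arrays). Because rows are
# flattened entry-by-entry, the result is a distance between the pooled 1-D
# value distributions rather than between the joint distributions:
#
#   wd = compute_wd(X_syn=np.random.rand(100, 4), X=np.random.rand(120, 4))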
# BNAF for Domias
def load_dataset(
data_train: Optional[np.ndarray] = None,
data_valid: Optional[np.ndarray] = None,
data_test: Optional[np.ndarray] = None,
device: Any = DEVICE,
batch_dim: int = 50,
) -> Tuple[
torch.utils.data.DataLoader,
torch.utils.data.DataLoader,
torch.utils.data.DataLoader,
]:
if data_train is not None:
dataset_train = torch.utils.data.TensorDataset(torch.from_numpy(data_train).float().to(device))
        if data_valid is None:
            log.debug("No validation set passed")
            # Fall back to random-noise placeholders so that all three loaders
            # can still be constructed.
            data_valid = np.random.randn(*data_train.shape)
        if data_test is None:
            log.debug("No test set passed")
            data_test = np.random.randn(*data_train.shape)
        dataset_valid = torch.utils.data.TensorDataset(torch.from_numpy(data_valid).float().to(device))
        dataset_test = torch.utils.data.TensorDataset(torch.from_numpy(data_test).float().to(device))
    else:
        raise RuntimeError("load_dataset requires at least `data_train`")
data_loader_train = torch.utils.data.DataLoader(dataset_train, batch_size=batch_dim, shuffle=True)
data_loader_valid = torch.utils.data.DataLoader(dataset_valid, batch_size=batch_dim, shuffle=False)
data_loader_test = torch.utils.data.DataLoader(dataset_test, batch_size=batch_dim, shuffle=False)
return data_loader_train, data_loader_valid, data_loader_test
def create_model(
n_dims: int,
n_flows: int = 5,
n_layers: int = 3,
hidden_dim: int = 32,
residual: Optional[str] = "gated", # [None, "normal", "gated"]
verbose: bool = False,
device: Any = DEVICE,
batch_dim: int = 50,
) -> nn.Module:
    # Stack `n_flows` BNAF blocks, with a "flip" permutation between
    # consecutive blocks; each block widens to n_dims * hidden_dim internally.
    flows: List = []
for f in range(n_flows):
layers: List = []
for _ in range(n_layers - 1):
layers.append(
bnaf.MaskedWeight(
n_dims * hidden_dim,
n_dims * hidden_dim,
dim=n_dims,
)
)
layers.append(bnaf.Tanh())
flows.append(
bnaf.BNAF(
*(
[
bnaf.MaskedWeight(n_dims, n_dims * hidden_dim, dim=n_dims),
bnaf.Tanh(),
]
+ layers
+ [bnaf.MaskedWeight(n_dims * hidden_dim, n_dims, dim=n_dims)]
),
res=residual if f < n_flows - 1 else None,
)
)
if f < n_flows - 1:
flows.append(bnaf.Permutation(n_dims, "flip"))
model = bnaf.Sequential(*flows).to(device)
return model
def save_model(
model: nn.Module,
optimizer: Any,
epoch: int,
save: bool = False,
workspace: Path = Path("logs/tabeval_workspace"),
) -> Callable:
workspace.mkdir(parents=True, exist_ok=True)
def f() -> None:
if save:
log.debug("Saving model..")
torch.save(
{
"model": model.state_dict(),
"optimizer": optimizer.state_dict(),
"epoch": epoch,
},
workspace / "DomiasMIA_bnaf_checkpoint.pt",
) # nosec B614
return f
def load_model(
model: nn.Module,
optimizer: Any,
workspace: Path = Path("logs/tabeval_workspace"),
) -> Callable:
    def f() -> None:
        # Restore the best checkpoint written by `save_model`, if one exists.
        checkpoint_path = workspace / "DomiasMIA_bnaf_checkpoint.pt"
        if not checkpoint_path.exists():
            return
        log.info("Loading model..")
        checkpoint = torch.load(checkpoint_path)  # nosec B614
        model.load_state_dict(checkpoint["model"])
        optimizer.load_state_dict(checkpoint["optimizer"])
return f
def compute_log_p_x(model: nn.Module, x_mb: torch.Tensor) -> torch.Tensor:
    """Log-density of a batch under the flow: log N(y; 0, I) + log|det J|."""
y_mb, log_diag_j_mb = model(x_mb)
log_p_y_mb = torch.distributions.Normal(torch.zeros_like(y_mb), torch.ones_like(y_mb)).log_prob(y_mb).sum(-1)
return log_p_y_mb + log_diag_j_mb
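# Usage sketch (hypothetical tensors; assumes a model built by `create_model`):
#
#   model = create_model(n_dims=4)
#   x = torch.randn(16, 4)
#   log_p = compute_log_p_x(model, x)  # shape: (batch,)
#   nll = -log_p.mean()                # the training objective used in `train`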
def train(
model: nn.Module,
optimizer: Any,
scheduler: Any,
data_loader_train: torch.utils.data.DataLoader,
data_loader_valid: torch.utils.data.DataLoader,
data_loader_test: torch.utils.data.DataLoader,
workspace: Path = Path("logs/tabeval_workspace"),
start_epoch: int = 0,
device: Any = DEVICE,
epochs: int = 50,
save: bool = False,
clip_norm: float = 0.1,
) -> Callable:
epoch = start_epoch
for epoch in range(start_epoch, start_epoch + epochs):
t = tqdm(data_loader_train, smoothing=0, ncols=80, disable=True)
        train_losses: List[torch.Tensor] = []
        for (x_mb,) in t:
            # Maximum likelihood: minimize the negative mean log-density.
            loss = -compute_log_p_x(model, x_mb).mean()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=clip_norm)
            optimizer.step()
            optimizer.zero_grad()
            t.set_postfix(loss="{:.2f}".format(loss.item()), refresh=False)
            train_losses.append(loss.detach())
        train_loss = torch.stack(train_losses).mean()
        # Validate with the Polyak-averaged weights (swapped in, then back out).
        optimizer.swap()
validation_loss = -torch.stack(
[compute_log_p_x(model, x_mb).mean().detach() for x_mb, in data_loader_valid],
-1,
).mean()
optimizer.swap()
log.debug(
"Epoch {:3}/{:3} -- train_loss: {:4.3f} -- validation_loss: {:4.3f}".format(
epoch + 1,
start_epoch + epochs,
train_loss.item(),
validation_loss.item(),
)
)
stop = scheduler.step(
validation_loss,
callback_best=save_model(model, optimizer, epoch + 1, save=save, workspace=workspace),
callback_reduce=load_model(model, optimizer, workspace=workspace),
)
if stop:
break
    # Reload the best checkpoint and report final losses with the averaged weights.
    load_model(model, optimizer, workspace=workspace)()
    optimizer.swap()
validation_loss = -torch.stack(
[compute_log_p_x(model, x_mb).mean().detach() for x_mb, in data_loader_valid],
-1,
).mean()
test_loss = -torch.stack([compute_log_p_x(model, x_mb).mean().detach() for x_mb, in data_loader_test], -1).mean()
log.debug(
f"""
###### Stop training after {epoch + 1} epochs!
Validation loss: {validation_loss.item():4.3f}
Test loss: {test_loss.item():4.3f}
"""
)
    if save:
        torch.save(
            {
                "model": model.state_dict(),
                "optimizer": optimizer.state_dict(),
                "epoch": epoch,
            },
            workspace / "DomiasMIA_bnaf_checkpoint.pt",
        )  # nosec B614
    def p_func(x: np.ndarray) -> np.ndarray:
        # Accept numpy input: move it to the training device, query the flow,
        # and return densities as a numpy array.
        x_ten = torch.as_tensor(x, dtype=torch.float32, device=device)
        return np.exp(compute_log_p_x(model, x_ten).detach().cpu().numpy())
return p_func
def density_estimator_trainer(
data_train: np.ndarray,
data_val: Optional[np.ndarray] = None,
data_test: Optional[np.ndarray] = None,
batch_dim: int = 50,
flows: int = 5,
layers: int = 3,
hidden_dim: int = 32,
residual: Optional[str] = "gated", # [None, "normal", "gated"]
workspace: Path = Path("logs/tabeval_workspace"),
decay: float = 0.5,
patience: int = 20,
cooldown: int = 10,
min_lr: float = 5e-4,
early_stopping: int = 100,
device: Any = DEVICE,
epochs: int = 50,
learning_rate: float = 1e-2,
clip_norm: float = 0.1,
polyak: float = 0.998,
save: bool = True,
load: bool = True,
) -> Tuple[Callable, nn.Module]:
log.debug("Loading dataset..")
data_loader_train, data_loader_valid, data_loader_test = load_dataset(
data_train,
data_val,
data_test,
device=device,
batch_dim=batch_dim,
)
if save:
log.debug("Creating directory experiment..")
workspace.mkdir(parents=True, exist_ok=True)
log.debug("Creating BNAF model..")
model = create_model(
data_train.shape[1],
batch_dim=batch_dim,
n_flows=flows,
n_layers=layers,
hidden_dim=hidden_dim,
verbose=True,
device=device,
)
log.debug("Creating optimizer..")
optimizer = bnaf.Adam(model.parameters(), lr=learning_rate, amsgrad=True, polyak=polyak)
log.debug("Creating scheduler..")
scheduler = bnaf.ReduceLROnPlateau(
optimizer,
factor=decay,
patience=patience,
cooldown=cooldown,
min_lr=min_lr,
verbose=True,
early_stopping=early_stopping,
threshold_mode="abs",
)
if load:
load_model(model, optimizer, workspace=workspace)()
log.debug("Training..")
p_func = train(
model,
optimizer,
scheduler,
data_loader_train,
data_loader_valid,
data_loader_test,
workspace=workspace,
device=device,
epochs=epochs,
save=save,
clip_norm=clip_norm,
)
return p_func, model
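# End-to-end usage sketch (hypothetical data; small settings for speed):
#
#   X = np.random.randn(200, 4).astype(np.float32)
#   p_func, model = density_estimator_trainer(X, epochs=5, save=False, load=False)
#   densities = p_func(X[:10])  # estimated density of the first 10 rows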
def compute_metrics_baseline(
y_scores: np.ndarray, y_true: np.ndarray, sample_weight: Optional[np.ndarray] = None
) -> Tuple[float, float]:
    """Baseline membership metrics: accuracy at the median-score threshold, plus AUROC."""
    y_pred = y_scores > np.median(y_scores)
    y_true = np.nan_to_num(y_true)
    y_pred = np.nan_to_num(y_pred)
    y_scores = np.nan_to_num(y_scores)
    acc = accuracy_score(y_true, y_pred, sample_weight=sample_weight)
    auc_score = roc_auc_score(y_true, y_scores, sample_weight=sample_weight)
    return acc, auc_score
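# Usage sketch for `compute_metrics_baseline` (hypothetical attack scores):
#
#   scores = np.random.rand(100)            # e.g. DOMIAS membership scores
#   members = np.random.randint(0, 2, 100)  # ground-truth membership labels
#   acc, aucroc = compute_metrics_baseline(scores, members)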