Source code for dice_ml.explainer_interfaces.feasible_base_vae

# General Imports
import numpy as np
# Pytorch
import torch
import torch.utils.data
from torch.nn import functional as F

from dice_ml import diverse_counterfactuals as exp
# Dice Imports
from dice_ml.explainer_interfaces.explainer_base import ExplainerBase
from dice_ml.utils.helpers import get_base_gen_cf_initialization


[docs]class FeasibleBaseVAE(ExplainerBase): def __init__(self, data_interface, model_interface, **kwargs): """ :param data_interface: an interface class to data related params :param model_interface: an interface class to access trained ML model """ # initiating data related parameters super().__init__(data_interface) # Black Box ML Model to be explained self.pred_model = model_interface.model self.minx, self.maxx, self.encoded_categorical_feature_indexes, \ self.encoded_continuous_feature_indexes, self.cont_minx, self.cont_maxx, \ self.cont_precisions = self.data_interface.get_data_params_for_gradient_dice() self.data_interface.one_hot_encoded_data = self.data_interface.one_hot_encode_data(self.data_interface.data_df) # Hyperparam self.encoded_size = kwargs['encoded_size'] self.learning_rate = kwargs['lr'] self.batch_size = kwargs['batch_size'] self.validity_reg = kwargs['validity_reg'] self.margin = kwargs['margin'] self.epochs = kwargs['epochs'] self.wm1 = kwargs['wm1'] self.wm2 = kwargs['wm2'] self.wm3 = kwargs['wm3'] # Initializing parameters for the DiceBaseGenCF self.vae_train_dataset, self.vae_val_dataset, self.vae_test_dataset, self.normalise_weights, \ self.cf_vae, self.cf_vae_optimizer = get_base_gen_cf_initialization( self.data_interface, self.encoded_size, self.cont_minx, self.cont_maxx, self.margin, self.validity_reg, self.epochs, self.wm1, self.wm2, self.wm3, self.learning_rate) # Data paths self.base_model_dir = '../../../dice_ml/utils/sample_trained_models/' self.save_path = self.base_model_dir + self.data_interface.data_name + \ '-margin-' + str(self.margin) + '-validity_reg-' + str(self.validity_reg) + \ '-epoch-' + str(self.epochs) + '-' + 'base-gen' + '.pth'
[docs] def compute_loss(self, model_out, x, target_label): em = model_out['em'] ev = model_out['ev'] dm = model_out['x_pred'] mc_samples = model_out['mc_samples'] # KL Divergence kl_divergence = 0.5*torch.mean(em**2 + ev - torch.log(ev) - 1, axis=1) # Reconstruction Term # Proximity: L1 Loss x_pred = dm[0] s = self.cf_vae.encoded_start_cat recon_err = -torch.sum(torch.abs(x[:, s: -1] - x_pred[:, s: -1]), axis=1) for key in self.normalise_weights.keys(): recon_err += \ -(self.normalise_weights[key][1] - self.normalise_weights[key][0])*torch.abs(x[:, key] - x_pred[:, key]) # Sum to 1 over the categorical indexes of a feature for v in self.cf_vae.encoded_categorical_feature_indexes: temp = -torch.abs(1.0-torch.sum(x_pred[:, v[0]:v[-1]+1], axis=1)) recon_err += temp # Validity temp_logits = self.pred_model(x_pred) validity_loss = torch.zeros(1) temp_1 = temp_logits[target_label == 1, :] temp_0 = temp_logits[target_label == 0, :] validity_loss += F.hinge_embedding_loss( F.sigmoid(temp_1[:, 1]) - F.sigmoid(temp_1[:, 0]), torch.tensor(-1), self.margin, reduction='mean') validity_loss += F.hinge_embedding_loss( F.sigmoid(temp_0[:, 0]) - F.sigmoid(temp_0[:, 1]), torch.tensor(-1), self.margin, reduction='mean') for i in range(1, mc_samples): x_pred = dm[i] recon_err += -torch.sum(torch.abs(x[:, s:-1] - x_pred[:, s:-1]), axis=1) for key in self.normalise_weights.keys(): recon_err += \ -(self.normalise_weights[key][1] - self.normalise_weights[key][0])*torch.abs(x[:, key] - x_pred[:, key]) # Sum to 1 over the categorical indexes of a feature for v in self.cf_vae.encoded_categorical_feature_indexes: temp = -torch.abs(1.0 - torch.sum(x_pred[:, v[0]:v[-1]+1], axis=1)) recon_err += temp # Validity temp_logits = self.pred_model(x_pred) temp_1 = temp_logits[target_label == 1, :] temp_0 = temp_logits[target_label == 0, :] validity_loss += F.hinge_embedding_loss( F.sigmoid(temp_1[:, 1]) - F.sigmoid(temp_1[:, 0]), torch.tensor(-1), self.margin, reduction='mean') validity_loss += F.hinge_embedding_loss( F.sigmoid(temp_0[:, 0]) - F.sigmoid(temp_0[:, 1]), torch.tensor(-1), self.margin, reduction='mean') recon_err = recon_err / mc_samples validity_loss = -1*self.validity_reg*validity_loss/mc_samples print('recon: ', -torch.mean(recon_err), ' KL: ', torch.mean(kl_divergence), ' Validity: ', -validity_loss) return -torch.mean(recon_err - kl_divergence) - validity_loss
[docs] def train(self, pre_trained=False): ''' pre_trained: Bool Variable to check whether pre trained model exists to avoid training again ''' if pre_trained: self.cf_vae.load_state_dict(torch.load(self.save_path)) self.cf_vae.eval() return # TODO: Handling such dataset specific constraints in a more general way # CF Generation for only low to high income data points self.vae_train_dataset = self.vae_train_dataset[self.vae_train_dataset[:, -1] == 0, :] self.vae_val_dataset = self.vae_val_dataset[self.vae_val_dataset[:, -1] == 0, :] # Removing the outcome variable from the datasets self.vae_train_feat = self.vae_train_dataset[:, : -1] self.vae_val_feat = self.vae_val_dataset[:, : -1] for epoch in range(self.epochs): batch_num = 0 train_loss = 0.0 train_size = 0 train_dataset = torch.tensor(self.vae_train_feat).float() train_dataset = torch.utils.data.DataLoader(train_dataset, batch_size=self.batch_size, shuffle=True) for train in enumerate(train_dataset): self.cf_vae_optimizer.zero_grad() train_x = train[1] train_y = 1.0-torch.argmax(self.pred_model(train_x), dim=1) train_size += train_x.shape[0] out = self.cf_vae(train_x, train_y) loss = self.compute_loss(out, train_x, train_y) loss.backward() train_loss += loss.item() self.cf_vae_optimizer.step() batch_num += 1 ret = loss/batch_num print('Train Avg Loss: ', ret, train_size) # Save the model after training every 10 epochs and at the last epoch if (epoch != 0 and (epoch % 10) == 0) or epoch == self.epochs-1: torch.save(self.cf_vae.state_dict(), self.save_path)
# The input arguments for this function same as the one defined for Diverse CF
[docs] def generate_counterfactuals(self, query_instance, total_CFs, desired_class="opposite"): # Loading the latest trained CFVAE model self.cf_vae.load_state_dict(torch.load(self.save_path)) self.cf_vae.eval() query_instance = self.data_interface.get_ohe_min_max_normalized_data(query_instance) query_instance = np.array([query_instance.iloc[0].values]) if query_instance.shape[0] > self.batch_size: test_dataset = np.array_split(query_instance, query_instance.shape[0]//self.batch_size, axis=0) else: test_dataset = [query_instance] final_gen_cf = [] final_cf_pred = [] final_test_pred = [] for i in range(len(query_instance)): train_x = test_dataset[i] train_x = torch.tensor(train_x).float() train_y = torch.argmax(self.pred_model(train_x), dim=1) curr_gen_cf = [] curr_cf_pred = [] curr_test_pred = train_y.numpy() for _ in range(total_CFs): recon_err, kl_err, x_true, x_pred, cf_label = \ self.cf_vae.compute_elbo(train_x, 1.0-train_y, self.pred_model) while (cf_label == train_y): print(cf_label, train_y) recon_err, kl_err, x_true, x_pred, cf_label = \ self.cf_vae.compute_elbo(train_x, 1.0-train_y, self.pred_model) x_pred = x_pred.detach().numpy() # Converting mixed scores into one hot feature representations for v in self.cf_vae.encoded_categorical_feature_indexes: curr_max = x_pred[:, v[0]] curr_max_idx = v[0] for idx in v: if curr_max < x_pred[:, idx]: curr_max = x_pred[:, idx] curr_max_idx = idx for idx in v: if idx == curr_max_idx: x_pred[:, idx] = 1 else: x_pred[:, idx] = 0 cf_label = cf_label.detach().numpy() cf_label = np.reshape(cf_label, (cf_label.shape[0], 1)) curr_gen_cf.append(x_pred) curr_cf_pred.append(cf_label) final_gen_cf.append(curr_gen_cf) final_cf_pred.append(curr_cf_pred) final_test_pred.append(curr_test_pred) # CF Gen out result = {} result['query-instance'] = query_instance result['test-pred'] = final_test_pred[0][0] result['CF'] = final_gen_cf[0] result['CF-Pred'] = final_cf_pred[0] # do inverse transform of CFs to original user-fed format cfs = np.array([cfval[0] for cfval in result['CF']]) final_cfs_df = self.data_interface.get_inverse_ohe_min_max_normalized_data(cfs) cfs_preds = [np.round(preds.flatten().tolist(), 3) for preds in result['CF-Pred']] cfs_preds = [item for sublist in cfs_preds for item in sublist] final_cfs_df[self.data_interface.outcome_name] = np.array(cfs_preds) test_instance_df = self.data_interface.get_inverse_ohe_min_max_normalized_data(result['query-instance']) test_instance_df[self.data_interface.outcome_name] = np.array(np.round(result['test-pred'], 3)) # Adding empty list for sparse cf gen and pred; adding 'NA' for the posthoc sparsity cofficient return exp.CounterfactualExamples(data_interface=self.data_interface, final_cfs_df=final_cfs_df, test_instance_df=test_instance_df, final_cfs_df_sparse=None, posthoc_sparsity_param=None, desired_class=desired_class)