Source code for dice_ml.explainer_interfaces.dice_genetic

"""
Module to generate diverse counterfactual explanations based on genetic algorithm
This code is similar to 'GeCo: Quality Counterfactual Explanations in Real Time': https://arxiv.org/pdf/2101.01292.pdf
"""
import copy
import random
import timeit

import numpy as np
import pandas as pd
from raiutils.exceptions import UserConfigValidationException

from dice_ml import diverse_counterfactuals as exp
from dice_ml.constants import ModelTypes
from dice_ml.explainer_interfaces.explainer_base import ExplainerBase


class DiceGenetic(ExplainerBase):

    def __init__(self, data_interface, model_interface):
        """Init method

        :param data_interface: an interface class to access data related params.
        :param model_interface: an interface class to access trained ML model.
        """
        super().__init__(data_interface, model_interface)  # initiating data related parameters
        self.num_output_nodes = None

        # variables required to generate CFs - see generate_counterfactuals() for more info
        self.cfs = []
        self.features_to_vary = []
        self.cf_init_weights = []  # total_CFs, algorithm, features_to_vary
        self.loss_weights = []  # yloss_type, diversity_loss_type, feature_weights
        self.feature_weights_input = ''

        # Initializing a label encoder to obtain label-encoded values for categorical variables
        self.labelencoder = self.data_interface.fit_label_encoders()
        self.predicted_outcome_name = self.data_interface.outcome_name + '_pred'
    def update_hyperparameters(self, proximity_weight, sparsity_weight,
                               diversity_weight, categorical_penalty):
        """Update hyperparameters of the loss function"""
        self.proximity_weight = proximity_weight
        self.sparsity_weight = sparsity_weight
        self.diversity_weight = diversity_weight
        self.categorical_penalty = categorical_penalty
    def do_loss_initializations(self, yloss_type, diversity_loss_type, feature_weights,
                                encoding='one-hot'):
        """Initializes variables related to main loss function"""
        self.loss_weights = [yloss_type, diversity_loss_type, feature_weights]

        # define the loss parts
        self.yloss_type = yloss_type
        self.diversity_loss_type = diversity_loss_type

        # define feature weights
        if feature_weights != self.feature_weights_input:
            self.feature_weights_input = feature_weights
            if feature_weights == "inverse_mad":
                normalized_mads = self.data_interface.get_valid_mads(normalized=False)
                feature_weights = {}
                for feature in normalized_mads:
                    feature_weights[feature] = round(1 / normalized_mads[feature], 2)

            feature_weights_list = []
            if encoding == 'one-hot':
                for feature in self.data_interface.encoded_feature_names:
                    if feature in feature_weights:
                        feature_weights_list.append(feature_weights[feature])
                    else:
                        feature_weights_list.append(1.0)
            elif encoding == 'label':
                for feature in self.data_interface.feature_names:
                    if feature in feature_weights:
                        feature_weights_list.append(feature_weights[feature])
                    else:
                        # the weight is inversely proportional to the max value
                        feature_weights_list.append(round(1 / self.feature_range[feature].max(), 2))
            self.feature_weights_list = [feature_weights_list]
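    # Illustration of the "inverse_mad" weighting above (a minimal sketch with
    # hypothetical values, not part of the class): a continuous feature whose
    # median absolute deviation is 4.0 gets weight round(1 / 4.0, 2) == 0.25,
    # so features with larger natural spread contribute less to the proximity
    # loss than tightly clustered ones.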
    def do_random_init(self, num_inits, features_to_vary, query_instance, desired_class,
                       desired_range):
        remaining_cfs = np.zeros((num_inits, self.data_interface.number_of_features))
        # kx is the number of valid inits found so far
        kx = 0
        precisions = self.data_interface.get_decimal_precisions()
        while kx < num_inits:
            one_init = np.zeros(self.data_interface.number_of_features)
            for jx, feature in enumerate(self.data_interface.feature_names):
                if feature in features_to_vary:
                    if feature in self.data_interface.continuous_feature_names:
                        one_init[jx] = np.round(np.random.uniform(
                            self.feature_range[feature][0], self.feature_range[feature][1]),
                            precisions[jx])
                    else:
                        one_init[jx] = np.random.choice(self.feature_range[feature])
                else:
                    one_init[jx] = query_instance[jx]
            if self.is_cf_valid(self.predict_fn_scores(one_init)):
                remaining_cfs[kx] = one_init
                kx += 1
        return remaining_cfs
    def do_KD_init(self, features_to_vary, query_instance, cfs, desired_class, desired_range):
        cfs = self.label_encode(cfs)
        cfs = cfs.reset_index(drop=True)

        self.cfs = np.zeros((self.population_size, self.data_interface.number_of_features))
        for kx in range(self.population_size):
            if kx >= len(cfs):
                break
            one_init = np.zeros(self.data_interface.number_of_features)
            for jx, feature in enumerate(self.data_interface.feature_names):
                if feature not in features_to_vary:
                    one_init[jx] = query_instance[jx]
                else:
                    if feature in self.data_interface.continuous_feature_names:
                        if self.feature_range[feature][0] <= cfs.iat[kx, jx] <= self.feature_range[feature][1]:
                            one_init[jx] = cfs.iat[kx, jx]
                        elif self.feature_range[feature][0] <= query_instance[jx] <= self.feature_range[feature][1]:
                            one_init[jx] = query_instance[jx]
                        else:
                            one_init[jx] = np.random.uniform(
                                self.feature_range[feature][0], self.feature_range[feature][1])
                    else:
                        if cfs.iat[kx, jx] in self.feature_range[feature]:
                            one_init[jx] = cfs.iat[kx, jx]
                        elif query_instance[jx] in self.feature_range[feature]:
                            one_init[jx] = query_instance[jx]
                        else:
                            one_init[jx] = np.random.choice(self.feature_range[feature])
            self.cfs[kx] = one_init

        # de-duplicate the KD-tree seeds and fill any shortfall with random inits
        new_array = [tuple(row) for row in self.cfs]
        uniques = np.unique(new_array, axis=0)
        if len(uniques) != self.population_size:
            remaining_cfs = self.do_random_init(
                self.population_size - len(uniques), features_to_vary, query_instance,
                desired_class, desired_range)
            self.cfs = np.concatenate([uniques, remaining_cfs])
    def do_cf_initializations(self, total_CFs, initialization, algorithm, features_to_vary,
                              desired_range, desired_class, query_instance,
                              query_instance_df_dummies, verbose):
        """Initializes CFs and other related variables."""
        self.cf_init_weights = [total_CFs, algorithm, features_to_vary]

        if algorithm == "RandomInitCF":
            # no. of times to run the experiment with random inits for diversity
            self.total_random_inits = total_CFs
            self.total_CFs = 1  # size of counterfactual set
        else:
            self.total_random_inits = 0
            self.total_CFs = total_CFs  # size of counterfactual set

        # freeze those columns that need to be fixed
        self.features_to_vary = features_to_vary

        # CF initialization
        self.cfs = []
        if initialization == 'random':
            self.cfs = self.do_random_init(
                self.population_size, features_to_vary, query_instance, desired_class, desired_range)
        elif initialization == 'kdtree':
            # Partitioned dataset and KD Tree for each class (binary) of the dataset
            self.dataset_with_predictions, self.KD_tree, self.predictions = \
                self.build_KD_tree(self.data_interface.data_df.copy(), desired_range,
                                   desired_class, self.predicted_outcome_name)
            if self.KD_tree is None:
                self.cfs = self.do_random_init(
                    self.population_size, features_to_vary, query_instance, desired_class, desired_range)
            else:
                num_queries = min(len(self.dataset_with_predictions),
                                  self.population_size * self.total_CFs)
                indices = self.KD_tree.query(query_instance_df_dummies, num_queries)[1][0]
                KD_tree_output = self.dataset_with_predictions.iloc[indices].copy()
                self.do_KD_init(features_to_vary, query_instance, KD_tree_output,
                                desired_class, desired_range)

        if verbose:
            print("Initialization complete! Generating counterfactuals...")
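    # Seeding note: 'kdtree' initialization queries a KD tree built only over
    # training points that already satisfy the desired outcome, so the initial
    # population starts near real, valid examples close to the query instance;
    # random sampling within self.feature_range is the fallback when no such
    # tree can be built.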
    def do_param_initializations(self, total_CFs, initialization, desired_range, desired_class,
                                 query_instance, query_instance_df_dummies, algorithm,
                                 features_to_vary, permitted_range, yloss_type, diversity_loss_type,
                                 feature_weights, proximity_weight, sparsity_weight,
                                 diversity_weight, categorical_penalty, verbose):
        if verbose:
            print("Initializing initial parameters to the genetic algorithm...")

        self.feature_range = self.get_valid_feature_range(normalized=False)
        if len(self.cfs) != total_CFs:
            self.do_cf_initializations(
                total_CFs, initialization, algorithm, features_to_vary, desired_range,
                desired_class, query_instance, query_instance_df_dummies, verbose)
        else:
            self.total_CFs = total_CFs
        self.do_loss_initializations(yloss_type, diversity_loss_type, feature_weights,
                                     encoding='label')
        self.update_hyperparameters(proximity_weight, sparsity_weight, diversity_weight,
                                    categorical_penalty)
    def _generate_counterfactuals(self, query_instance, total_CFs, initialization="kdtree",
                                  desired_range=None, desired_class="opposite",
                                  proximity_weight=0.2, sparsity_weight=0.2,
                                  diversity_weight=5.0, categorical_penalty=0.1,
                                  algorithm="DiverseCF", features_to_vary="all",
                                  permitted_range=None, yloss_type="hinge_loss",
                                  diversity_loss_type="dpp_style:inverse_dist",
                                  feature_weights="inverse_mad", stopping_threshold=0.5,
                                  posthoc_sparsity_param=0.1, posthoc_sparsity_algorithm="binary",
                                  maxiterations=500, thresh=1e-2, verbose=False):
        """Generates diverse counterfactual explanations.

        :param query_instance: A dictionary of feature names and values. Test point of interest.
        :param total_CFs: Total number of counterfactuals required.
        :param initialization: Method used to initialize the population of the genetic algorithm.
        :param desired_range: For regression problems. Contains the outcome range to generate
                              counterfactuals in.
        :param desired_class: For classification problems. Desired counterfactual class - can take
                              0 or 1. Default value is "opposite" to the outcome class of
                              query_instance for binary classification.
        :param proximity_weight: A positive float. The larger this weight, the closer the
                                 counterfactuals are to the query_instance.
        :param sparsity_weight: A positive float. The larger this weight, the fewer features are
                                changed from the query_instance.
        :param diversity_weight: A positive float. The larger this weight, the more diverse the
                                 counterfactuals are.
        :param categorical_penalty: A positive float. A weight to ensure that all levels of a
                                    categorical variable sum to 1.
        :param algorithm: Counterfactual generation algorithm. Either "DiverseCF" or "RandomInitCF".
        :param features_to_vary: Either a string "all" or a list of feature names to vary.
        :param permitted_range: Dictionary with continuous feature names as keys and the permitted
                                min-max range as a list of values. Defaults to the range inferred
                                from training data. If None, uses the parameters initialized in
                                data_interface.
        :param yloss_type: Metric for y-loss of the optimization function. Takes "l2_loss",
                           "log_loss", or "hinge_loss".
        :param diversity_loss_type: Metric for diversity loss of the optimization function.
                                    Takes "avg_dist" or "dpp_style:inverse_dist".
        :param feature_weights: Either "inverse_mad" or a dictionary with feature names as keys and
                                corresponding weights as values. The default option is "inverse_mad",
                                where the weight for a continuous feature is the inverse of the
                                Median Absolute Deviation (MAD) of the feature's values in the
                                training set; the weight for a categorical feature is 1 by default.
        :param stopping_threshold: Minimum threshold for the counterfactuals' target class
                                   probability.
        :param posthoc_sparsity_param: Parameter for the post-hoc operation on continuous features
                                       to enhance sparsity.
        :param posthoc_sparsity_algorithm: Perform either linear or binary search. Takes "linear"
                                           or "binary". Prefer binary search when a feature range is
                                           large (for instance, income varying from 10k to 1000k),
                                           and only if the features share a monotonic relationship
                                           with the predicted outcome in the model.
        :param maxiterations: Maximum number of iterations to run the genetic algorithm for.
        :param thresh: The genetic algorithm stops when the difference between the previous best
                       loss and the current best loss is less than thresh.
        :param verbose: Whether to print 'Diverse Counterfactuals found!'.

        :return: A CounterfactualExamples object to store and visualize the resulting
                 counterfactual explanations (see diverse_counterfactuals.py).
        """
""" if not hasattr(self.data_interface, 'data_df') and initialization == "kdtree": raise UserConfigValidationException( "kd-tree initialization is not supported for private data" " interface because training data to build kd-tree is not available.") self.population_size = 10 * total_CFs self.start_time = timeit.default_timer() features_to_vary = self.setup(features_to_vary, permitted_range, query_instance, feature_weights) # Prepares user defined query_instance for DiCE. query_instance_orig = query_instance query_instance_orig = self.data_interface.prepare_query_instance( query_instance=query_instance_orig) query_instance = self.data_interface.prepare_query_instance( query_instance=query_instance) # number of output nodes of ML model self.num_output_nodes = None if self.model.model_type == ModelTypes.Classifier: self.num_output_nodes = self.model.get_num_output_nodes2(query_instance) query_instance = self.label_encode(query_instance) query_instance = np.array(query_instance.values[0]) self.x1 = query_instance # find the predicted value of query_instance test_pred = self.predict_fn_scores(query_instance) self.test_pred = test_pred desired_class = self.misc_init(stopping_threshold, desired_class, desired_range, test_pred[0]) query_instance_df_dummies = pd.get_dummies(query_instance_orig) for col in self.data_interface.get_all_dummy_colnames(): if col not in query_instance_df_dummies.columns: query_instance_df_dummies[col] = 0 self.do_param_initializations(total_CFs, initialization, desired_range, desired_class, query_instance, query_instance_df_dummies, algorithm, features_to_vary, permitted_range, yloss_type, diversity_loss_type, feature_weights, proximity_weight, sparsity_weight, diversity_weight, categorical_penalty, verbose) query_instance_df = self.find_counterfactuals(query_instance, desired_range, desired_class, features_to_vary, maxiterations, thresh, verbose) desired_class_param = self.decode_model_output(pd.Series(self.target_cf_class[0]))[0] \ if hasattr(self, 'target_cf_class') else desired_class return exp.CounterfactualExamples(data_interface=self.data_interface, test_instance_df=query_instance_df, final_cfs_df=self.final_cfs_df, final_cfs_df_sparse=self.final_cfs_df_sparse, posthoc_sparsity_param=posthoc_sparsity_param, desired_range=desired_range, desired_class=desired_class_param, model_type=self.model.model_type)
    def predict_fn_scores(self, input_instance):
        """Returns prediction scores."""
        input_instance = self.label_decode(input_instance)
        out = self.model.get_output(input_instance)
        if self.model.model_type == ModelTypes.Classifier and out.shape[1] == 1:
            # DL models return a single probability for binary classification
            out = np.hstack((1 - out, out))
        return out
    def predict_fn(self, input_instance):
        """Returns the actual prediction."""
        input_instance = self.label_decode(input_instance)
        preds = self.model.get_output(input_instance, model_score=False)
        return preds
    def _predict_fn_custom(self, input_instance, desired_class):
        """Checks that the maximum predicted score lies in the desired class.

        The reason we do so can be illustrated by this example: if the predicted
        probabilities are [0, 0.5, 0.5], the computed yloss is 0 because class 2
        has the same score as the maximum. sklearn's usual predict function, which
        implements argmax, returns class 1 instead of 2. This is why we need a
        custom predict function that returns the desired class if the maximum
        predicted probability equals the probability of the desired class.
        """
        input_instance = self.label_decode(input_instance)
        output = self.model.get_output(input_instance, model_score=True)

        if self.model.model_type == ModelTypes.Classifier and np.array(output).shape[1] == 1:
            # DL models return a single probability for binary classification
            output = np.hstack((1 - output, output))

        desired_class = int(desired_class)
        maxvalues = np.max(output, 1)
        predicted_values = np.argmax(output, 1)

        # We iterate through output as we often call _predict_fn_custom for multiple inputs at once
        for i in range(len(output)):
            if output[i][desired_class] == maxvalues[i]:
                predicted_values[i] = desired_class
        return predicted_values
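    # Tie-breaking sketch (hypothetical scores, mirroring the docstring example):
    #
    #   scores = np.array([[0.0, 0.5, 0.5]])
    #   np.argmax(scores, 1)   # -> array([1]); plain argmax picks the first max
    #   # _predict_fn_custom with desired_class=2 returns 2 instead, because
    #   # scores[0][2] == scores[0].max()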
    def compute_yloss(self, cfs, desired_range, desired_class):
        """Computes the first part (y-loss) of the loss function."""
        yloss = 0.0
        if self.model.model_type == ModelTypes.Classifier:
            predicted_value = np.array(self.predict_fn_scores(cfs))
            if self.yloss_type == 'hinge_loss':
                maxvalue = np.full((len(predicted_value)), -np.inf)
                for c in range(self.num_output_nodes):
                    if c != desired_class:
                        maxvalue = np.maximum(maxvalue, predicted_value[:, c])
                yloss = np.maximum(0, maxvalue - predicted_value[:, int(desired_class)])
            return yloss

        elif self.model.model_type == ModelTypes.Regressor:
            predicted_value = self.predict_fn(cfs)
            if self.yloss_type == 'hinge_loss':
                yloss = np.zeros(len(predicted_value))
                for i in range(len(predicted_value)):
                    if not desired_range[0] <= predicted_value[i] <= desired_range[1]:
                        yloss[i] = min(abs(predicted_value[i] - desired_range[0]),
                                       abs(predicted_value[i] - desired_range[1]))
            return yloss
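    # Hinge y-loss written out (a sketch of the classifier branch above): for
    # scores p and desired class d,
    #     yloss = max(0, max_{c != d} p_c - p_d)
    # e.g. with hypothetical scores p = [0.7, 0.3] and d = 1,
    # yloss = max(0, 0.7 - 0.3) = 0.4; it drops to 0 once class d has the top score.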
    def compute_proximity_loss(self, x_hat_unnormalized, query_instance_normalized):
        """Compute weighted distance between two vectors."""
        x_hat = self.data_interface.normalize_data(x_hat_unnormalized)
        feature_weights = np.array(
            [self.feature_weights_list[0][i] for i in self.data_interface.continuous_feature_indexes])
        product = np.multiply(
            (abs(x_hat - query_instance_normalized)[:, [self.data_interface.continuous_feature_indexes]]),
            feature_weights)
        product = product.reshape(-1, product.shape[-1])
        proximity_loss = np.sum(product, axis=1)
        # Dividing by the sum of feature weights to normalize proximity loss
        return proximity_loss / sum(feature_weights)
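    # Proximity sketch (hypothetical normalized values): with weights w = [1, 2],
    # a candidate x_hat = [0.2, 0.5], and query = [0.1, 0.5], the loss is
    # (1 * |0.2 - 0.1| + 2 * |0.5 - 0.5|) / (1 + 2) = 0.1 / 3 ≈ 0.033.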
    def compute_sparsity_loss(self, cfs):
        """Computes the fraction of features changed from the query instance."""
        sparsity_loss = np.count_nonzero(cfs - self.x1, axis=1)
        # Dividing by the number of features to normalize sparsity loss
        return sparsity_loss / len(self.data_interface.feature_names)
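    # Sparsity sketch: a candidate differing from self.x1 in 2 of 8 features
    # scores 2 / 8 = 0.25; a candidate identical to the query instance scores 0.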
    def compute_loss(self, cfs, desired_range, desired_class):
        """Computes the overall loss"""
        self.yloss = self.compute_yloss(cfs, desired_range, desired_class)
        self.proximity_loss = self.compute_proximity_loss(cfs, self.query_instance_normalized) \
            if self.proximity_weight > 0 else 0.0
        self.sparsity_loss = self.compute_sparsity_loss(cfs) if self.sparsity_weight > 0 else 0.0
        self.loss = np.reshape(np.array(self.yloss +
                                        (self.proximity_weight * self.proximity_loss) +
                                        (self.sparsity_weight * self.sparsity_loss)), (-1, 1))
        index = np.reshape(np.arange(len(cfs)), (-1, 1))
        self.loss = np.concatenate([index, self.loss], axis=1)
        return self.loss
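    # Overall fitness per candidate, as combined above:
    #     loss = yloss + proximity_weight * proximity_loss + sparsity_weight * sparsity_loss
    # The returned array pairs each candidate's row index with its loss, so the
    # population can later be sorted by fitness while still recovering the
    # original candidate rows.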
    def mate(self, k1, k2, features_to_vary, query_instance):
        """Performs mating and produces new offspring"""
        # chromosome for offspring
        one_init = np.zeros(self.data_interface.number_of_features)
        for j in range(self.data_interface.number_of_features):
            gp1 = k1[j]
            gp2 = k2[j]
            feat_name = self.data_interface.feature_names[j]

            # random probability
            prob = random.random()

            if prob < 0.40:
                # if prob is less than 0.40, insert gene from parent 1
                one_init[j] = gp1
            elif prob < 0.80:
                # if prob is between 0.40 and 0.80, insert gene from parent 2
                one_init[j] = gp2
            else:
                # otherwise insert a random gene (mutation) to maintain diversity
                if feat_name in features_to_vary:
                    if feat_name in self.data_interface.continuous_feature_names:
                        one_init[j] = np.random.uniform(self.feature_range[feat_name][0],
                                                        self.feature_range[feat_name][1])
                    else:
                        one_init[j] = np.random.choice(self.feature_range[feat_name])
                else:
                    one_init[j] = query_instance[j]
        return one_init
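    # Per gene, the split above is 40% parent 1, 40% parent 2, and 20% mutation
    # (a fresh random value within the feature's permitted range); for features
    # not in features_to_vary, the mutation branch copies the query instance
    # value instead, so frozen features never drift.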
    def find_counterfactuals(self, query_instance, desired_range, desired_class,
                             features_to_vary, maxiterations, thresh, verbose):
        """Finds counterfactuals by generating cfs through the genetic algorithm"""
        population = self.cfs.copy()
        iterations = 0
        previous_best_loss = -np.inf
        current_best_loss = np.inf
        stop_cnt = 0
        cfs_preds = [np.inf] * self.total_CFs
        to_pred = None

        self.query_instance_normalized = self.data_interface.normalize_data(self.x1)
        self.query_instance_normalized = self.query_instance_normalized.astype('float')

        while iterations < maxiterations and self.total_CFs > 0:
            if abs(previous_best_loss - current_best_loss) <= thresh and \
                    (self.model.model_type == ModelTypes.Classifier and
                     all(i == desired_class for i in cfs_preds) or
                     (self.model.model_type == ModelTypes.Regressor and
                      all(desired_range[0] <= i <= desired_range[1] for i in cfs_preds))):
                stop_cnt += 1
            else:
                stop_cnt = 0
            if stop_cnt >= 5:
                break
            previous_best_loss = current_best_loss
            population = np.unique(tuple(map(tuple, population)), axis=0)

            population_fitness = self.compute_loss(population, desired_range, desired_class)
            population_fitness = population_fitness[population_fitness[:, 1].argsort()]
            current_best_loss = population_fitness[0][1]
            to_pred = np.array([population[int(tup[0])]
                                for tup in population_fitness[:self.total_CFs]])

            if self.total_CFs > 0:
                if self.model.model_type == ModelTypes.Classifier:
                    cfs_preds = self._predict_fn_custom(to_pred, desired_class)
                else:
                    cfs_preds = self.predict_fn(to_pred)

            # self.total_CFs of the next generation obtained from the fittest members of the
            # current generation
            top_members = self.total_CFs
            new_generation_1 = np.array([population[int(tup[0])]
                                         for tup in population_fitness[:top_members]])

            # rest of the next generation obtained from the top 50% of the fittest members of the
            # current generation
            rest_members = self.population_size - top_members
            new_generation_2 = None
            if rest_members > 0:
                new_generation_2 = np.zeros((rest_members, self.data_interface.number_of_features))
                for new_gen_idx in range(rest_members):
                    parent1 = random.choice(population[:int(len(population) / 2)])
                    parent2 = random.choice(population[:int(len(population) / 2)])
                    child = self.mate(parent1, parent2, features_to_vary, query_instance)
                    new_generation_2[new_gen_idx] = child

            if new_generation_2 is not None:
                if self.total_CFs > 0:
                    population = np.concatenate([new_generation_1, new_generation_2])
                else:
                    population = new_generation_2
            else:
                raise SystemError("The number of total_CFs is greater than the population size!")
            iterations += 1

        self.cfs_preds = []
        self.final_cfs = []
        i = 0
        while i < self.total_CFs:
            predictions = self.predict_fn_scores(population[i])[0]
            if self.is_cf_valid(predictions):
                self.final_cfs.append(population[i])
                # Checking if predictions is a float before taking the length, as len() works only
                # for array-like elements. isinstance(predictions, (np.floating, float)) checks
                # whether it is any float (numpy or otherwise). We do this as we take the argmax if
                # the prediction is a vector -- like the output of a classifier.
                if not isinstance(predictions, (np.floating, float)) and len(predictions) > 1:
                    self.cfs_preds.append(np.argmax(predictions))
                else:
                    self.cfs_preds.append(predictions)
            i += 1

        # converting to dataframe
        query_instance_df = self.label_decode(query_instance)
        query_instance_df[self.data_interface.outcome_name] = \
            self.get_model_output_from_scores(self.test_pred)

        self.final_cfs_df = self.label_decode_cfs(self.final_cfs)
        self.final_cfs_df_sparse = copy.deepcopy(self.final_cfs_df)

        if self.final_cfs_df is not None:
            self.final_cfs_df[self.data_interface.outcome_name] = self.cfs_preds
            self.final_cfs_df_sparse[self.data_interface.outcome_name] = self.cfs_preds
            self.round_to_precision()

        # decoding to original labels
        query_instance_df, self.final_cfs_df, self.final_cfs_df_sparse = \
            self.decode_to_original_labels(query_instance_df, self.final_cfs_df,
                                           self.final_cfs_df_sparse)

        self.elapsed = timeit.default_timer() - self.start_time
        m, s = divmod(self.elapsed, 60)

        if verbose:
            if len(self.final_cfs) == self.total_CFs:
                print('Diverse Counterfactuals found! total time taken: %02d' % m,
                      'min %02d' % s, 'sec')
            else:
                print('Only %d (required %d) ' % (len(self.final_cfs), self.total_CFs),
                      'Diverse Counterfactuals found for the given configuration, perhaps ',
                      'change the query instance or the features to vary...'
                      '; total time taken: %02d' % m, 'min %02d' % s, 'sec')

        return query_instance_df
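    # Convergence note: the loop above stops early once the best loss has been
    # stable (change <= thresh) for 5 consecutive generations and the top
    # candidates already predict the desired outcome; otherwise it runs the
    # full maxiterations generations.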
    def label_encode(self, input_instance):
        for column in self.data_interface.categorical_feature_names:
            input_instance[column] = self.labelencoder[column].transform(input_instance[column])
        return input_instance
    def label_decode(self, labelled_input):
        """Transforms label encoded data back to categorical values"""
        num_to_decode = 1
        if len(labelled_input.shape) > 1:
            num_to_decode = len(labelled_input)
        else:
            labelled_input = [labelled_input]

        input_instance = []
        for j in range(num_to_decode):
            temp = {}
            for i in range(len(labelled_input[j])):
                if self.data_interface.feature_names[i] in self.data_interface.categorical_feature_names:
                    enc = self.labelencoder[self.data_interface.feature_names[i]]
                    val = enc.inverse_transform(np.array([labelled_input[j][i]], dtype=np.int32))
                    temp[self.data_interface.feature_names[i]] = val[0]
                else:
                    temp[self.data_interface.feature_names[i]] = labelled_input[j][i]
            input_instance.append(temp)

        input_instance_df = pd.DataFrame(input_instance, columns=self.data_interface.feature_names)
        return input_instance_df
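    # Round-trip sketch (hypothetical encoder state): with a fitted sklearn
    # LabelEncoder mapping ['blue', 'red'] -> [0, 1], label_encode turns 'red'
    # into 1, and label_decode on a row containing 1 in that column restores
    # 'red' in the resulting DataFrame.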
    def label_decode_cfs(self, cfs_arr):
        ret_df = None
        if cfs_arr is None:
            return None
        for cf in cfs_arr:
            df = self.label_decode(cf)
            if ret_df is None:
                ret_df = df
            else:
                ret_df = pd.concat([ret_df, df])
        return ret_df
    def get_valid_feature_range(self, normalized=False):
        ret = self.data_interface.get_valid_feature_range(self.feature_range,
                                                          normalized=normalized)
        for feat_name in self.data_interface.categorical_feature_names:
            ret[feat_name] = self.labelencoder[feat_name].transform(ret[feat_name])
        return ret