# Source: dice_ml/explainer_interfaces/dice_random.py


"""
Module to generate diverse counterfactual explanations based on random sampling.
A simple implementation.
"""
import random
import timeit

import numpy as np
import pandas as pd

from dice_ml import diverse_counterfactuals as exp
from dice_ml.constants import ModelTypes
from dice_ml.explainer_interfaces.explainer_base import ExplainerBase


class DiceRandom(ExplainerBase):
    def __init__(self, data_interface, model_interface):
        """Init method

        :param data_interface: an interface class to access data related params.
        :param model_interface: an interface class to access trained ML model.
        """
        super().__init__(data_interface)  # initiating data related parameters

        self.model = model_interface
        self.model.load_model()  # loading pickled trained model if applicable
        self.model.transformer.feed_data_params(data_interface)
        self.model.transformer.initialize_transform_func()

        self.precisions = self.data_interface.get_decimal_precisions(output_type="dict")
        if self.data_interface.outcome_name in self.precisions:
            self.outcome_precision = [self.precisions[self.data_interface.outcome_name]]
        else:
            self.outcome_precision = 0

    def _generate_counterfactuals(self, query_instance, total_CFs, desired_range=None,
                                  desired_class="opposite", permitted_range=None,
                                  features_to_vary="all", stopping_threshold=0.5,
                                  posthoc_sparsity_param=0.1, posthoc_sparsity_algorithm="linear",
                                  sample_size=1000, random_seed=None, verbose=False,
                                  limit_steps_ls=10000):
        """Generate counterfactuals by randomly sampling features.

        :param query_instance: Test point of interest. A dictionary of feature names and values
                               or a single row dataframe.
        :param total_CFs: Total number of counterfactuals required.
        :param desired_range: For regression problems. Contains the outcome range to
                              generate counterfactuals in.
        :param desired_class: Desired counterfactual class - can take 0 or 1. Default value is
                              "opposite" to the outcome class of query_instance for binary
                              classification.
        :param permitted_range: Dictionary with feature names as keys and permitted range in list
                                as values. Defaults to the range inferred from training data.
                                If None, uses the parameters initialized in data_interface.
        :param features_to_vary: Either a string "all" or a list of feature names to vary.
        :param stopping_threshold: Minimum threshold for counterfactuals' target class probability.
        :param posthoc_sparsity_param: Parameter for the post-hoc operation on continuous features
                                       to enhance sparsity.
        :param posthoc_sparsity_algorithm: Perform either linear or binary search. Takes "linear"
                                           or "binary". Prefer binary search when a feature range
                                           is large (for instance, income varying from 10k to
                                           1000k) and only if the features share a monotonic
                                           relationship with the predicted outcome in the model.
        :param sample_size: Number of random instances sampled per iteration.
        :param random_seed: Random seed for reproducibility.
        :param limit_steps_ls: Defines an upper limit for the linear search step in the
                               posthoc_sparsity_enhancement.

        :returns: A CounterfactualExamples object that contains the dataframe of generated
                  counterfactuals as an attribute.
        """
        self.features_to_vary = self.setup(features_to_vary, permitted_range, query_instance,
                                           feature_weights=None)

        self.fixed_features_values = {}
        if features_to_vary != "all":
            for feature in self.data_interface.feature_names:
                if feature not in features_to_vary:
                    self.fixed_features_values[feature] = query_instance[feature].iat[0]

        # Do predictions once on the query_instance and reuse them below to reduce
        # the number of inferences.
        model_predictions = self.predict_fn(query_instance)

        # number of output nodes of the ML model
        self.num_output_nodes = None
        if self.model.model_type == ModelTypes.Classifier:
            self.num_output_nodes = model_predictions.shape[1]

        # query_instance needs no transformation for generating CFs using random sampling.
        # find the predicted value of query_instance
        test_pred = model_predictions[0]
        if self.model.model_type == ModelTypes.Classifier:
            self.target_cf_class = self.infer_target_cfs_class(desired_class, test_pred,
                                                               self.num_output_nodes)
        elif self.model.model_type == ModelTypes.Regressor:
            self.target_cf_range = self.infer_target_cfs_range(desired_range)

        self.total_CFs = total_CFs
        self.stopping_threshold = stopping_threshold
        if self.model.model_type == ModelTypes.Classifier:
            # TODO: Generalize this for multi-class
            if self.target_cf_class == 0 and self.stopping_threshold > 0.5:
                self.stopping_threshold = 0.25
            elif self.target_cf_class == 1 and self.stopping_threshold < 0.5:
                self.stopping_threshold = 0.75

        # get random samples for each feature independently
        start_time = timeit.default_timer()
        random_instances = self.get_samples(
            self.fixed_features_values, self.feature_range,
            sampling_random_seed=random_seed, sampling_size=sample_size)

        # Generate copies of the query instance that will be changed one feature
        # at a time to encourage sparsity.
        cfs_df = None
        candidate_cfs = pd.DataFrame(
            np.repeat(query_instance.values, sample_size, axis=0),
            columns=query_instance.columns)

        # Loop to change one feature at a time, then two features, and so on:
        # candidate_cfs carries over changes from previous iterations, so up to
        # num_features_to_vary features differ from the query instance.
        for num_features_to_vary in range(1, len(self.features_to_vary)+1):
            selected_features = np.random.choice(self.features_to_vary, (sample_size, 1),
                                                 replace=True)
            for k in range(sample_size):
                candidate_cfs.at[k, selected_features[k][0]] = \
                    random_instances.at[k, selected_features[k][0]]
            scores = self.predict_fn(candidate_cfs)
            validity = self.decide_cf_validity(scores)
            if sum(validity) > 0:
                rows_to_add = candidate_cfs[validity == 1]
                if cfs_df is None:
                    cfs_df = rows_to_add.copy()
                else:
                    cfs_df = pd.concat([cfs_df, rows_to_add])
                cfs_df.drop_duplicates(inplace=True)
                # Always change at least 2 features before stopping
                if num_features_to_vary >= 2 and len(cfs_df) >= total_CFs:
                    break

        self.total_cfs_found = 0
        self.valid_cfs_found = False
        if cfs_df is not None and len(cfs_df) > 0:
            if len(cfs_df) > total_CFs:
                cfs_df = cfs_df.sample(total_CFs)
            cfs_df.reset_index(inplace=True, drop=True)
            if len(cfs_df) > 0:
                self.cfs_pred_scores = self.predict_fn(cfs_df)
                cfs_df[self.data_interface.outcome_name] = \
                    self.get_model_output_from_scores(self.cfs_pred_scores)
            else:
                if self.model.model_type == ModelTypes.Classifier:
                    self.cfs_pred_scores = [0]*self.num_output_nodes
                else:
                    self.cfs_pred_scores = [0]

            self.total_cfs_found = len(cfs_df)
            self.valid_cfs_found = self.total_cfs_found >= self.total_CFs
            if len(cfs_df) > 0:
                final_cfs_df = cfs_df[self.data_interface.feature_names +
                                      [self.data_interface.outcome_name]]
                final_cfs_df[self.data_interface.outcome_name] = \
                    final_cfs_df[self.data_interface.outcome_name].round(self.outcome_precision)
                self.cfs_preds = final_cfs_df[[self.data_interface.outcome_name]].values
                self.final_cfs = final_cfs_df[self.data_interface.feature_names].values
            else:
                final_cfs_df = None
                self.cfs_preds = None
                self.cfs_pred_scores = None
                self.final_cfs = None
        else:
            final_cfs_df = None
            self.cfs_preds = None
            self.cfs_pred_scores = None
            self.final_cfs = None

        test_instance_df = self.data_interface.prepare_query_instance(query_instance)
        test_instance_df[self.data_interface.outcome_name] = \
            np.array(np.round(self.get_model_output_from_scores((test_pred,)),
                              self.outcome_precision))

        # post-hoc operation on continuous features to enhance sparsity - only for public data
        if posthoc_sparsity_param is not None and posthoc_sparsity_param > 0 and \
                self.final_cfs is not None and 'data_df' in self.data_interface.__dict__:
            final_cfs_df_sparse = final_cfs_df.copy()
            final_cfs_df_sparse = self.do_posthoc_sparsity_enhancement(
                final_cfs_df_sparse, test_instance_df, posthoc_sparsity_param,
                posthoc_sparsity_algorithm, limit_steps_ls)
        elif self.final_cfs is not None:
            final_cfs_df_sparse = final_cfs_df.copy()
        else:
            final_cfs_df_sparse = None

        self.elapsed = timeit.default_timer() - start_time
        m, s = divmod(self.elapsed, 60)

        # decoding to original labels
        test_instance_df, final_cfs_df, final_cfs_df_sparse = \
            self.decode_to_original_labels(test_instance_df, final_cfs_df, final_cfs_df_sparse)

        if final_cfs_df is not None:
            if verbose:
                print('Diverse Counterfactuals found! total time taken: %02d' % m,
                      'min %02d' % s, 'sec')
        else:
            if self.total_cfs_found == 0:
                print('No Counterfactuals found for the given configuration, perhaps try with '
                      'different parameters...', '; total time taken: %02d' % m,
                      'min %02d' % s, 'sec')
            else:
                print('Only %d (required %d) ' % (self.total_cfs_found, self.total_CFs),
                      'Diverse Counterfactuals found for the given configuration, perhaps try '
                      'with different parameters...', '; total time taken: %02d' % m,
                      'min %02d' % s, 'sec')

        desired_class_param = self.decode_model_output(pd.Series(self.target_cf_class))[0] \
            if hasattr(self, 'target_cf_class') else desired_class

        return exp.CounterfactualExamples(data_interface=self.data_interface,
                                          final_cfs_df=final_cfs_df,
                                          test_instance_df=test_instance_df,
                                          final_cfs_df_sparse=final_cfs_df_sparse,
                                          posthoc_sparsity_param=posthoc_sparsity_param,
                                          desired_class=desired_class_param,
                                          desired_range=desired_range,
                                          model_type=self.model.model_type)
    def get_samples(self, fixed_features_values, feature_range, sampling_random_seed,
                    sampling_size):
        # first get required parameters
        precisions = self.data_interface.get_decimal_precisions(output_type="dict")

        if sampling_random_seed is not None:
            random.seed(sampling_random_seed)

        samples = []
        for feature in self.data_interface.feature_names:
            if feature in fixed_features_values:
                sample = [fixed_features_values[feature]]*sampling_size
            elif feature in self.data_interface.continuous_feature_names:
                low = feature_range[feature][0]
                high = feature_range[feature][1]
                sample = self.get_continuous_samples(
                    low, high, precisions[feature], size=sampling_size,
                    seed=sampling_random_seed)
            else:
                if sampling_random_seed is not None:
                    random.seed(sampling_random_seed)
                sample = random.choices(feature_range[feature], k=sampling_size)
            samples.append(sample)
        samples = pd.DataFrame(dict(zip(self.data_interface.feature_names, samples)))
        return samples
    def get_continuous_samples(self, low, high, precision, size=1000, seed=None):
        if seed is not None:
            np.random.seed(seed)

        if precision == 0:
            result = np.random.randint(low, high+1, size).tolist()
            result = [float(r) for r in result]
        else:
            # the upper bound is widened by one unit of precision so that `high`
            # itself remains reachable after rounding
            result = np.random.uniform(low, high+(10**-precision), size)
            result = [round(r, precision) for r in result]
        return result
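

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the library): DiceRandom is normally
# reached through the public dice_ml.Dice entry point with method="random".
# The sklearn pipeline and feature lists below are assumptions made for this
# example; any fitted model wrapped by dice_ml.Model should work the same way.
if __name__ == "__main__":
    from sklearn.compose import ColumnTransformer
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import OneHotEncoder

    import dice_ml
    from dice_ml.utils import helpers

    # demo dataset bundled with dice_ml; "income" is the binary outcome
    dataset = helpers.load_adult_income_dataset()
    continuous = ["age", "hours_per_week"]
    categorical = [c for c in dataset.columns if c not in continuous + ["income"]]

    # a simple one-hot-encoded random forest, assumed here for illustration
    clf = Pipeline([
        ("prep", ColumnTransformer(
            [("cat", OneHotEncoder(handle_unknown="ignore"), categorical)],
            remainder="passthrough")),
        ("model", RandomForestClassifier(random_state=0)),
    ])
    clf.fit(dataset.drop(columns="income"), dataset["income"])

    d = dice_ml.Data(dataframe=dataset, continuous_features=continuous,
                     outcome_name="income")
    m = dice_ml.Model(model=clf, backend="sklearn")
    explainer = dice_ml.Dice(d, m, method="random")  # dispatches to DiceRandom

    # generate 4 counterfactuals for the first row; random_seed is forwarded
    # to _generate_counterfactuals above for reproducibility
    query = dataset.drop(columns="income").iloc[[0]]
    cf = explainer.generate_counterfactuals(query, total_CFs=4,
                                            desired_class="opposite",
                                            random_seed=17)
    cf.visualize_as_dataframe(show_only_changes=True)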