Source code for dice_ml.diverse_counterfactuals

import copy
import json
import math

import numpy as np
import pandas as pd

from dice_ml.constants import ModelTypes, _SchemaVersions
from dice_ml.utils.serialize import DummyDataInterface


class _DiverseCFV1SchemaConstants:
    DATA_INTERFACE = 'data_interface'
    MODEL_TYPE = 'model_type'
    DESIRED_CLASS = 'desired_class'
    DESIRED_RANGE = 'desired_range'
    TEST_INSTANCE_DF = 'test_instance_df'
    FINAL_CFS_DF = 'final_cfs_df'


class _DiverseCFV2SchemaConstants:
    DATA_INTERFACE = 'data_interface'
    MODEL_TYPE = 'model_type'
    DESIRED_CLASS = 'desired_class'
    DESIRED_RANGE = 'desired_range'
    FEATURE_NAMES_INCLUDING_TARGET = 'feature_names_including_target'
    FEATURE_NAMES = 'feature_names'
    TEST_INSTANCE_LIST = 'test_instance_list'
    FINAL_CFS_LIST = 'final_cfs_list'


[docs]def json_converter(obj): """ Helper function to convert object to json. """ try: return obj.to_json() except AttributeError: if isinstance(obj, np.generic): return obj.item() return obj.__dict__
[docs]class CounterfactualExamples: """A class to store and visualize the resulting counterfactual explanations.""" def __init__(self, data_interface=None, final_cfs_df=None, test_instance_df=None, final_cfs_df_sparse=None, posthoc_sparsity_param=0, desired_range=None, desired_class="opposite", model_type=ModelTypes.Classifier): self.data_interface = data_interface self.final_cfs_df = final_cfs_df self.test_instance_df = test_instance_df self.final_cfs_df_sparse = final_cfs_df_sparse self.model_type = model_type self.desired_class = desired_class self.desired_range = desired_range self.final_cfs_list = None self.posthoc_sparsity_param = posthoc_sparsity_param # might be useful for future additions self.test_pred = self.test_instance_df[self.data_interface.outcome_name].iat[0] if model_type == ModelTypes.Classifier: if desired_class == "opposite": self.new_outcome = 1.0 - round(self.test_pred) else: self.new_outcome = desired_class elif model_type == ModelTypes.Regressor: self.new_outcome = desired_range def __eq__(self, other_counterfactual_example): if isinstance(other_counterfactual_example, CounterfactualExamples): return self.desired_class == other_counterfactual_example.desired_class and \ self.desired_range == other_counterfactual_example.desired_range and \ self.model_type == other_counterfactual_example.model_type and \ (self.final_cfs_df is None) == \ (other_counterfactual_example.final_cfs_df is None) and \ (self.final_cfs_df_sparse is None) == \ (other_counterfactual_example.final_cfs_df_sparse is None) return False def _dump_output(self, content, show_only_changes=False, is_notebook_console=False): if is_notebook_console: self.display_df(content, show_only_changes=show_only_changes) else: assert isinstance(content, pd.DataFrame), "Expecting a pandas dataframe" self.print_list(content.values.tolist(), show_only_changes=show_only_changes) def _visualize_internal(self, display_sparse_df=True, show_only_changes=False, is_notebook_console=False): if self.final_cfs_df is not None and len(self.final_cfs_df) > 0: if self.posthoc_sparsity_param is None: print('\nCounterfactual set (new outcome: {0})'.format(self.new_outcome)) self._dump_output(content=self.final_cfs_df, show_only_changes=show_only_changes, is_notebook_console=is_notebook_console) elif hasattr(self.data_interface, 'data_df') and \ display_sparse_df is True and self.final_cfs_df_sparse is not None: # CFs print('\nDiverse Counterfactual set (new outcome: {0})'.format(self.new_outcome)) self._dump_output(content=self.final_cfs_df_sparse, show_only_changes=show_only_changes, is_notebook_console=is_notebook_console) elif hasattr(self.data_interface, 'data_df') and \ display_sparse_df is True and self.final_cfs_df_sparse is None: print('\nPlease specify a valid posthoc_sparsity_param to perform sparsity correction.. ', 'displaying Diverse Counterfactual set without', 'sparsity correction (new outcome : {})'.format(self.new_outcome)) self._dump_output(content=self.final_cfs_df, show_only_changes=show_only_changes, is_notebook_console=is_notebook_console) elif not hasattr(self.data_interface, 'data_df'): # for private data print('\nDiverse Counterfactual set without sparsity correction since only metadata about each', ' feature is available (new outcome: {}'.format(self.new_outcome)) self._dump_output(content=self.final_cfs_df, show_only_changes=show_only_changes, is_notebook_console=is_notebook_console) else: # CFs print('\nDiverse Counterfactual set without sparsity correction (new outcome: ', self.new_outcome) self._dump_output(content=self.final_cfs_df, show_only_changes=show_only_changes, is_notebook_console=is_notebook_console) else: print('\nNo counterfactuals found!')
[docs] def visualize_as_dataframe(self, display_sparse_df=True, show_only_changes=False): from IPython.display import display # original instance print('Query instance (original outcome : {0})'.format(self.test_pred)) display(self.test_instance_df) # works only in Jupyter notebook self._visualize_internal(display_sparse_df=display_sparse_df, show_only_changes=show_only_changes, is_notebook_console=True)
[docs] def display_df(self, df, show_only_changes): from IPython.display import display if show_only_changes is False: display(df) # works only in Jupyter notebook else: newdf = df.values.tolist() org = self.test_instance_df.values.tolist()[0] for ix in range(df.shape[0]): for jx in range(len(org)): if not isinstance(newdf[ix][jx], str): if math.isclose(newdf[ix][jx], org[jx], rel_tol=abs(org[jx]/10000)): newdf[ix][jx] = '-' else: newdf[ix][jx] = str(newdf[ix][jx]) else: if newdf[ix][jx] == org[jx]: newdf[ix][jx] = '-' else: newdf[ix][jx] = str(newdf[ix][jx]) display(pd.DataFrame(newdf, columns=df.columns, index=df.index)) # works only in Jupyter notebook
[docs] def visualize_as_list(self, display_sparse_df=True, show_only_changes=False): # original instance print('Query instance (original outcome : {})'.format(self.test_pred)) print(self.test_instance_df.values.tolist()[0]) self._visualize_internal(display_sparse_df=display_sparse_df, show_only_changes=show_only_changes, is_notebook_console=False)
[docs] def print_list(self, li, show_only_changes): if show_only_changes is False: for ix in range(len(li)): print(li[ix]) else: newli = copy.deepcopy(li) org = self.test_instance_df.values.tolist()[0] for ix in range(len(newli)): for jx in range(len(newli[ix])): if math.isclose(newli[ix][jx], org[jx], rel_tol=abs(org[jx]/10000)): newli[ix][jx] = '-' print(newli[ix])
[docs] def to_json(self, serialization_version): if self.final_cfs_df_sparse is not None: df = self.final_cfs_df_sparse else: df = self.final_cfs_df dummy_data_interface = None if hasattr(self.data_interface, 'data_df'): dummy_data_interface = DummyDataInterface( self.data_interface.outcome_name, "dummy_data") else: dummy_data_interface = DummyDataInterface( self.data_interface.outcome_name) if serialization_version == _SchemaVersions.V1: obj = { _DiverseCFV1SchemaConstants.DATA_INTERFACE: dummy_data_interface, _DiverseCFV1SchemaConstants.MODEL_TYPE: self.model_type, _DiverseCFV1SchemaConstants.DESIRED_CLASS: self.desired_class, _DiverseCFV1SchemaConstants.DESIRED_RANGE: self.desired_range, _DiverseCFV1SchemaConstants.TEST_INSTANCE_DF: self.test_instance_df, _DiverseCFV1SchemaConstants.FINAL_CFS_DF: df } return json.dumps(obj, default=json_converter) elif serialization_version == _SchemaVersions.V2: dummy_data_interface_dict = dummy_data_interface.to_json() feature_names_including_target = self.test_instance_df.columns.tolist() feature_names = self.test_instance_df.columns.tolist().copy() feature_names.remove(dummy_data_interface.outcome_name) test_instance_df_as_list = self.test_instance_df.values.tolist() if df is not None: final_cfs_df_as_as_list = df.values.tolist() else: final_cfs_df_as_as_list = None alternate_obj = { _DiverseCFV2SchemaConstants.TEST_INSTANCE_LIST: test_instance_df_as_list, _DiverseCFV2SchemaConstants.FINAL_CFS_LIST: final_cfs_df_as_as_list, _DiverseCFV2SchemaConstants.DATA_INTERFACE: dummy_data_interface_dict, _DiverseCFV2SchemaConstants.FEATURE_NAMES: feature_names, _DiverseCFV2SchemaConstants.FEATURE_NAMES_INCLUDING_TARGET: feature_names_including_target, _DiverseCFV2SchemaConstants.MODEL_TYPE: self.model_type, _DiverseCFV2SchemaConstants.DESIRED_CLASS: self.desired_class, _DiverseCFV2SchemaConstants.DESIRED_RANGE: self.desired_range } return json.dumps(alternate_obj, default=json_converter)
[docs] @staticmethod def from_json(cf_example_json_str): cf_example_dict = json.loads(cf_example_json_str) if cf_example_dict.get(_DiverseCFV1SchemaConstants.TEST_INSTANCE_DF) is not None: test_instance_df = pd.read_json(cf_example_dict[ _DiverseCFV1SchemaConstants.TEST_INSTANCE_DF]) if cf_example_dict[_DiverseCFV1SchemaConstants.FINAL_CFS_DF] is not None: cfs_df = pd.read_json(cf_example_dict[_DiverseCFV1SchemaConstants.FINAL_CFS_DF]) else: cfs_df = None # Creating the object for dummy_data_interface dummy_data_interface = DummyDataInterface(**cf_example_dict[_DiverseCFV1SchemaConstants.DATA_INTERFACE]) return CounterfactualExamples(data_interface=dummy_data_interface, test_instance_df=test_instance_df, final_cfs_df=cfs_df, final_cfs_df_sparse=cfs_df, posthoc_sparsity_param=None, desired_class=cf_example_dict[_DiverseCFV1SchemaConstants.DESIRED_CLASS], desired_range=cf_example_dict[_DiverseCFV1SchemaConstants.DESIRED_RANGE], model_type=cf_example_dict[_DiverseCFV1SchemaConstants.MODEL_TYPE]) else: final_cfs_list = cf_example_dict[_DiverseCFV2SchemaConstants.FINAL_CFS_LIST] test_instance_list = cf_example_dict[_DiverseCFV2SchemaConstants.TEST_INSTANCE_LIST] feature_names_including_target = cf_example_dict[_DiverseCFV2SchemaConstants.FEATURE_NAMES_INCLUDING_TARGET] data_interface = cf_example_dict[_DiverseCFV2SchemaConstants.DATA_INTERFACE] desired_class = cf_example_dict[_DiverseCFV2SchemaConstants.DESIRED_CLASS] desired_range = cf_example_dict[_DiverseCFV2SchemaConstants.DESIRED_RANGE] model_type = cf_example_dict[_DiverseCFV2SchemaConstants.MODEL_TYPE] test_instance_df = pd.DataFrame(data=test_instance_list, columns=feature_names_including_target) if final_cfs_list is not None: cfs_df = pd.DataFrame(data=final_cfs_list, columns=feature_names_including_target) else: cfs_df = None # Creating the object for dummy_data_interface dummy_data_interface = DummyDataInterface(**data_interface) return CounterfactualExamples(data_interface=dummy_data_interface, test_instance_df=test_instance_df, final_cfs_df=cfs_df, final_cfs_df_sparse=cfs_df, posthoc_sparsity_param=None, desired_class=desired_class, desired_range=desired_range, model_type=model_type)