import copy
import math
import numpy as np
import pandas as pd
import logging
from dowhy.causal_refuter import CausalRefutation, CausalRefuter
from dowhy.causal_estimator import CausalEstimator, CausalEstimate
from sklearn.linear_model import LinearRegression
from sklearn.neighbors import KNeighborsRegressor
from sklearn.svm import SVR
from sklearn.ensemble import RandomForestRegressor
from sklearn.neural_network import MLPRegressor
class DummyOutcomeRefuter(CausalRefuter):
"""Refute an estimate by replacing the outcome with a randomly generated variable.
Supports additional parameters that can be specified in the refute_estimate() method.
- 'num_simulations': int, CausalRefuter.DEFAULT_NUM_SIMULATIONS by default
The number of simulations to be run
- 'transformation_list': list, DummyOutcomeRefuter.DEFAULT_TRANSFORMATION
The transformation_list is a list of actions to be performed to obtain the outcome. The actions are of the following types:
* function argument: function pd.DataFrame -> np.ndarray
It takes a function whose input is the data frame and whose output is the outcome
variable. This allows us to create a dummy outcome that depends only on the covariates and not
on the treatment variable.
* string argument
- Currently, it supports the following common estimators:
1. Linear Regression
2. K Nearest Neighbours
3. Support Vector Machine
4. Neural Network
5. Random Forest
- In addition, the following transformations are supported:
1. Permute
This permutes the rows of the outcome, disassociating any effect of the treatment on the outcome.
2. Noise
This adds white noise to the outcome, reducing any causal relationship with the treatment.
3. Zero
This replaces all values in the outcome with zero.
The transformation_list is of the following form:
* If the function pd.DataFrame -> np.ndarray is already defined.
[(func,func_params),('permute', {'permute_fraction': val} ), ('noise', {'std_dev': val} )]
* If a function from the above list is used
[('knn',{'n_neighbors':5}), ('permute', {'permute_fraction': val} ), ('noise', {'std_dev': val} )]
- 'required_variables': int, list, bool, True by default
The inputs are either an integer value, list or bool.
1. An integer argument refers to how many variables will be used for estimating the value of the outcome
2. A list explicitly refers to which variables will be used to estimate the outcome
Furthermore, it gives the ability to explicitly select or deselect the covariates present in the estimation of the
outcome. This is done by adding or explicitly removing variables from the list as shown below:
For example:
We need to pass required_variables = [W0,W1] if we want to use only W0 and W1.
We need to pass required_variables = [-W0,-W1] if we want to use all variables except W0 and W1.
3. If the value is True, we wish to include all variables to estimate the value of the outcome. A False value is INVALID
and will result in an error.
Note:
These inputs are fed to the function for estimating the outcome variable. The same set of required_variables is used for each
instance of an internal function.
- 'bucket_size_scale_factor': float, DummyOutcomeRefuter.DEFAULT_BUCKET_SCALE_FACTOR by default
For a continuous treatment, the scale factor determines the size of the buckets into which the data is grouped.
Note:
The number of buckets is given by:
(max value - min value)
------------------------
(scale_factor * std_dev)
where the maximum, minimum and standard deviation are those of the treatment variable.
- 'min_data_point_threshold': int, DummyOutcomeRefuter.MIN_DATA_POINT_THRESHOLD by default
The minimum number of data points for an estimator to run.
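Example:
A minimal sketch of how this refuter might be invoked through DoWhy's refute_estimate()
interface. The dataset helper dowhy.datasets.linear_dataset and the registered method name
"dummy_outcome_refuter" are assumptions used for illustration and are not defined in this class:
    import dowhy.datasets
    from dowhy import CausalModel
    data = dowhy.datasets.linear_dataset(beta=10, num_common_causes=2,
                                         num_samples=1000, treatment_is_binary=True)
    model = CausalModel(data=data["df"], treatment=data["treatment_name"],
                        outcome=data["outcome_name"], graph=data["gml_graph"])
    identified_estimand = model.identify_effect()
    estimate = model.estimate_effect(identified_estimand,
                                     method_name="backdoor.linear_regression")
    # Illustrative transformation list: predict the outcome from the covariates
    # alone, then add white noise to the predictions
    refutations = model.refute_estimate(identified_estimand, estimate,
                                        method_name="dummy_outcome_refuter",
                                        transformation_list=[("linear_regression", {}),
                                                             ("noise", {"std_dev": 0.1})])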
"""
# The currently supported estimators
SUPPORTED_ESTIMATORS = ["linear_regression", "knn", "svm", "random_forest", "neural_network"]
# The default standard deviation for noise
DEFAULT_STD_DEV = 0.1
# The default scaling factor to determine the bucket size
DEFAULT_BUCKET_SCALE_FACTOR = 0.5
# The minimum number of points for the estimator to run
MIN_DATA_POINT_THRESHOLD = 30
# The Default Transformation, when no arguments are given, or if the number of data points are insufficient for an estimator
DEFAULT_TRANSFORMATION = [("zero",""),("noise", {'std_dev': 1} )]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._num_simulations = kwargs.pop("num_simulations", CausalRefuter.DEFAULT_NUM_SIMULATIONS)
self._transformation_list = kwargs.pop("transformation_list", DummyOutcomeRefuter.DEFAULT_TRANSFORMATION)
self._bucket_size_scale_factor = kwargs.pop("bucket_size_scale_factor", DummyOutcomeRefuter.DEFAULT_BUCKET_SCALE_FACTOR)
self._min_data_point_threshold = kwargs.pop("min_data_point_threshold", DummyOutcomeRefuter.MIN_DATA_POINT_THRESHOLD)
required_variables = kwargs.pop("required_variables", True)
if required_variables is False:
raise ValueError("The value of required_variables cannot be False")
self._chosen_variables = self.choose_variables(required_variables)
if 'logging_level' in kwargs:
logging.basicConfig(level=kwargs['logging_level'])
else:
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
def refute_estimate(self):
# We need to change the outcome variable of the identified estimand,
# so we make a copy first. This ensures that the original
# estimand is not modified.
identified_estimand = copy.deepcopy(self._target_estimand)
identified_estimand.outcome_variable = ["dummy_outcome"]
self.logger.info("Refutation over {} simulated datasets".format(self._num_simulations) )
self.logger.info("The transformation passed: {}".format(self._transformation_list) )
simulation_results = []
refute_list = []
no_estimator = self.check_for_estimator()
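# If the transformation list contains no estimator, no training is needed and the
# transformations can be applied directly to the whole dataset (see check_for_estimator)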
for _ in range( self._num_simulations ):
estimates = []
if no_estimator:
# No estimator needs to be trained, so X_train and outcome_train are set to None
validation_df = self._data
X_train = None
outcome_train = None
X_validation = validation_df[self._chosen_variables].values
outcome_validation = validation_df['y'].values
# Get the final outcome, after running through all the values in the transformation list
outcome_validation = self.process_data(X_train, outcome_train, X_validation, outcome_validation, self._transformation_list)
else:
groups = self.preprocess_data_by_treatment()
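# Leave-one-group-out scheme: train the estimators on the data of one treatment
# group and generate the dummy outcome for the remaining groups, so that the
# generated outcome depends only on the covariates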
for key_train, _ in groups:
X_train = groups.get_group(key_train)[self._chosen_variables].values
outcome_train = groups.get_group(key_train)['y'].values
validation_df = []
transformation_list = self._transformation_list
for key_validation, _ in groups:
if key_validation != key_train:
validation_df.append(groups.get_group(key_validation))
validation_df = pd.concat(validation_df)
X_validation = validation_df[self._chosen_variables].values
outcome_validation = validation_df['y'].values
# If the number of data points is too few, run the default transformation: [("zero",""),("noise", {'std_dev':1} )]
if X_train.shape[0] <= self._min_data_point_threshold:
transformation_list = DummyOutcomeRefuter.DEFAULT_TRANSFORMATION
outcome_validation = self.process_data(X_train, outcome_train, X_validation, outcome_validation, transformation_list)
new_data = validation_df.assign(dummy_outcome=outcome_validation)
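# Re-run the original estimator on the dummy outcome; if the causal link has truly
# been severed, the new estimate should be close to zero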
new_estimator = CausalEstimator.get_estimator_object(new_data, identified_estimand, self._estimate)
new_effect = new_estimator.estimate_effect()
estimates.append(new_effect.value)
simulation_results.append(estimates)
# We convert to ndarray for ease in indexing
# The data is of the form
# sim1: cat1 cat2 ... catn
# sim2: cat1 cat2 ... catn
simulation_results = np.array(simulation_results)
# Note: We hardcode the estimate value to ZERO as we want to check if it falls in the distribution of the refuter
# Ideally we should expect that ZERO should fall in the distribution of the effect estimates as we have severed any causal
# relationship between the treatment and the outcome.
dummy_estimator = CausalEstimate(
estimate=0,
target_estimand=self._estimate.target_estimand,
realized_estimand_expr=self._estimate.realized_estimand_expr)
if no_estimator:
refute = CausalRefutation(
self._estimate.value,
np.mean(simulation_results),
refutation_type="Refute: Use a Dummy Outcome"
)
refute.add_significance_test_results(
self.test_significance(dummy_estimator, simulation_results)
)
refute_list.append(refute)
else:
for category in range(simulation_results.shape[1]):
refute = CausalRefutation(
self._estimate.value,
np.mean(simulation_results[:, category]),
refutation_type="Refute: Use a Dummy Outcome"
)
refute.add_significance_test_results(
self.test_significance(dummy_estimator, simulation_results[:, category])
)
refute_list.append(refute)
return refute_list
def process_data(self, X_train, outcome_train, X_validation, outcome_validation, transformation_list):
"""
We process the data by first training the estimators in the transformation_list on X_train and outcome_train.
We then apply the estimators on X_validation to get the value of the dummy outcome, which we store in outcome_validation.
- 'X_train': np.ndarray
The data of the covariates which is used to train an estimator. It corresponds to the data of a single category of the treatment
- 'outcome_train': np.ndarray
The outcome used to train the estimators. It also holds the intermediate values of the outcome as the transformations in the list are applied in sequence.
For Example:
[ ('permute', {'permute_fraction': val} ), (func,func_params)]
The value obtained from permutation is used as an input for the custom estimator.
- 'X_validation': np.ndarray
The data of the covariates that is fed to a trained estimator to generate a dummy outcome
- 'outcome_validation': np.ndarray
This variable stores the dummy_outcome generated by the transformations
- 'transformation_list': list
The list of transformations on the outcome data required to produce a dummy outcome
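For example, a minimal sketch of a transformation list (parameter values are illustrative):
[('linear_regression', {}), ('noise', {'std_dev': 0.1})]
This first fits a linear regression of outcome_train on X_train, replaces the outcome with the
predictions of that model on X_validation, and then adds white noise to those predictions.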
"""
for action, func_args in transformation_list:
if callable(action):
estimator = action(X_train, outcome_train, **func_args)
outcome_train = estimator(X_train)
outcome_validation = estimator(X_validation)
elif action in DummyOutcomeRefuter.SUPPORTED_ESTIMATORS:
estimator = self._estimate_dummy_outcome(action, X_train, outcome_train, **func_args)
outcome_train = estimator(X_train)
outcome_validation = estimator(X_validation)
elif action == 'noise':
if X_train is not None:
outcome_train = self._noise(outcome_train, **func_args)
outcome_validation = self._noise(outcome_validation, **func_args)
elif action == 'permute':
if X_train is not None:
outcome_train = self._permute(outcome_train, **func_args)
outcome_validation = self._permute(outcome_validation, **func_args)
elif action =='zero':
if X_train is not None:
outcome_train = np.zeros(outcome_train.shape)
outcome_validation = np.zeros(outcome_validation.shape)
return outcome_validation
def check_for_estimator(self):
"""
This function checks if there is an estimator in the transformation list.
If there are no estimators, it allows us to optimize processing by skipping the
data preprocessing and running the transformations on the whole dataset.
"""
for action,_ in self._transformation_list:
if callable(action) or action in DummyOutcomeRefuter.SUPPORTED_ESTIMATORS:
return False
return True
def preprocess_data_by_treatment(self):
"""
This function groups data based on the data type of the treatment.
Expected variable types supported for the treatment:
* bool
* pd.categorical
* float
* int
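For example, with the default scale factor of 0.5, a continuous treatment ranging from 0 to 10
with a standard deviation of 2 is split into (10 - 0) / (0.5 * 2) = 10 buckets.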
returns pandas.core.groupby.generic.DataFrameGroupBy
"""
assert len(self._treatment_name) == 1, "At present, DoWhy supports a single treatment variable"
treatment_variable_name = self._target_estimand.treatment_name[0] # As we only have a single treatment
variable_type = self._data[treatment_variable_name].dtypes
if bool == variable_type:
groups = self._data.groupby(treatment_variable_name)
return groups
# We match on the dtype name string to account for both 32 and 64 bit variables
elif 'float' in variable_type.name or \
'int' in variable_type.name:
# action for continuous variables
data = self._data
std_dev = data[treatment_variable_name].std()
num_bins = int((data[treatment_variable_name].max() - data[treatment_variable_name].min()) / (self._bucket_size_scale_factor * std_dev))
data['bins'] = pd.cut(data[treatment_variable_name], num_bins)
groups = data.groupby('bins')
data.drop('bins', axis=1, inplace=True)
return groups
elif 'categorical' in variable_type.name:
# Action for categorical variables
groups = self._data.groupby(treatment_variable_name)
return groups
else:
raise ValueError("Passed {}. Expected bool, float, int or categorical.".format(variable_type.name))
def _estimate_dummy_outcome(self, action, X_train, outcome, **func_args):
"""
A function that takes in any sklearn estimator and returns a trained estimator
- 'action': str
The sklearn estimator to be used.
- 'X_train': np.ndarray
The variable used to estimate the value of outcome.
- 'outcome': np.ndarray
The variable which we wish to estimate.
- 'func_args': variable length keyworded argument
The parameters passed to the estimator.
"""
estimator = self._get_regressor_object(action, **func_args)
X = X_train
y = outcome
estimator = estimator.fit(X, y)
return estimator.predict
def _get_regressor_object(self, action, **func_args):
"""
Return a sklearn estimator object based on the estimator and corresponding parameters
- 'action': str
The sklearn estimator used.
- 'func_args': variable length keyworded argument
The parameters passed to the sklearn estimator.
"""
if action == "linear_regression":
return LinearRegression(**func_args)
elif action == "knn":
return KNeighborsRegressor(**func_args)
elif action == "svm":
return SVR(**func_args)
elif action == "random_forest":
return RandomForestRegressor(**func_args)
elif action == "neural_network":
return MLPRegressor(**func_args)
else:
raise ValueError("The function: {} is not supported by dowhy at the moment.".format(action))
def _permute(self, outcome, permute_fraction):
'''
If the permute_fraction is 1, we permute all the values in the outcome.
Otherwise we make use of the Fisher-Yates shuffle.
Refer to https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle for more details.
'outcome': np.ndarray
The outcome variable to be permuted.
'permute_fraction': float [0, 1]
The fraction of rows permuted.
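For example, with permute_fraction = 0.5 each row initiates a swap with probability 0.25;
as every swap moves two values, roughly half of the outcome values change position.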
'''
if permute_fraction == 1:
outcome = pd.DataFrame(outcome)
outcome.columns = ['y']
return outcome['y'].sample(frac=1).values
elif permute_fraction < 1:
permute_fraction /= 2 # We do this as every swap leads to two changes
changes = np.where( np.random.uniform(0,1,outcome.shape[0]) <= permute_fraction )[0] # np.where returns a tuple containing a single array
num_rows = outcome.shape[0]
for change in changes:
if change + 1 < num_rows:
index = np.random.randint(change+1,num_rows)
temp = outcome[change]
outcome[change] = outcome[index]
outcome[index] = temp
return outcome
else:
raise ValueError("The value of permute_fraction is {}. Which is greater than 1.".format(permute_fraction))
def _noise(self, outcome, std_dev):
"""
Add white noise with mean 0 and standard deviation = std_dev
- 'outcome': np.ndarray
The outcome variable, to which the white noise is added.
- 'std_dev': float
The standard deviation of the white noise.
"""
return outcome + np.random.normal(scale=std_dev,size=outcome.shape[0])