import random
from typing import Dict, Optional, Union
import numpy as np
from scipy.optimize import minimize
from sklearn.preprocessing import OneHotEncoder
from dowhy.gcm.util.catboost_encoder import CatBoostEncoder
def shape_into_2d(*args):
    """Reshape each input into a 2D matrix where this is necessary.

    Example:
        array([1, 2, 3]) -> array([[1], [2], [3]])
        2 -> array([[2]])

    Inputs that do not expose an ``ndim`` attribute (e.g. Python scalars or lists) are converted with
    ``np.asarray`` first, so that the scalar example above works as documented. Inputs that are already 2D are
    returned unmodified.

    :param args: One or more array-like inputs.
    :return: The reshaped input itself if exactly one input was given, otherwise a list with one (reshaped)
             entry per input, in the same order.
    :raises ValueError: If any input has more than two dimensions.
    """

    def shaping(X):
        # Plain Python scalars/lists have no 'ndim'; turn them into arrays so the checks below apply.
        if not hasattr(X, "ndim"):
            X = np.asarray(X)

        if X.ndim < 2:
            # column_stack turns a 0D/1D input into a single column.
            return np.column_stack([X])
        if X.ndim > 2:
            raise ValueError("Cannot reshape a %dD array into a 2D array!" % X.ndim)
        return X

    result = [shaping(x) for x in args]

    # A single input is returned directly instead of being wrapped in a list.
    if len(result) == 1:
        return result[0]
    return result
def set_random_seed(random_seed: int) -> None:
    """Seed both Python's ``random`` module and numpy's global random generator.

    :param random_seed: Random seed passed to ``random.seed`` and ``np.random.seed``.
    :return: None
    """
    random.seed(random_seed)
    np.random.seed(random_seed)
def auto_fit_encoders(
    X: np.ndarray, Y: Optional[np.ndarray] = None, catboost_threshold: int = 7
) -> Dict[int, Union[OneHotEncoder, CatBoostEncoder]]:
    """Fit encoders for the categorical columns of X and pick the encoder type automatically.

    Without a target Y, one-hot encoders are fitted. With a target, the number of unique values is summed over
    all categorical columns of X; CatBoost encoders are fitted if that sum exceeds catboost_threshold, one-hot
    encoders otherwise.

    :param X: Input data matrix.
    :param Y: Optional target values, forwarded to the CatBoost encoders when these are chosen.
    :param catboost_threshold: Total number of categories above which CatBoost encoders are used.
    :return: Dictionary that maps a column index to its fitted encoder.
    """
    if Y is None:
        return fit_one_hot_encoders(X)

    X = shape_into_2d(X)

    # Total number of distinct categories across all categorical columns.
    category_count = sum(
        len(np.unique(X[:, col_idx])) for col_idx in range(X.shape[1]) if is_categorical(X[:, col_idx])
    )

    if category_count > catboost_threshold:
        return fit_catboost_encoders(X, Y)
    return fit_one_hot_encoders(X)
def auto_apply_encoders(
    X: np.ndarray, encoder_map: Dict[int, Union[OneHotEncoder, CatBoostEncoder]], Y: Optional[np.ndarray] = None
) -> np.ndarray:
    """Apply fitted encoders to X, dispatching on the type of encoder stored in encoder_map.

    The first encoder in the map decides which encoding routine is used: one-hot encoding if it is a
    OneHotEncoder, CatBoost encoding otherwise. With an empty map, X is only reshaped to 2D.

    :param X: Input data matrix.
    :param encoder_map: Dictionary that maps a column index to a fitted encoder.
    :param Y: Optional target values; only forwarded to the CatBoost encoding.
    :return: The encoded data, or the 2D version of X if encoder_map is empty.
    """
    X = shape_into_2d(X)
    if not encoder_map:
        return X

    first_encoder = next(iter(encoder_map.values()))
    if isinstance(first_encoder, OneHotEncoder):
        return apply_one_hot_encoding(X, encoder_map)
    return apply_catboost_encoding(X, encoder_map, Y)
def fit_one_hot_encoders(X: np.ndarray) -> Dict[int, OneHotEncoder]:
    """Fit one scikit-learn OneHotEncoder per categorical column of X.

    Whether a column counts as categorical is decided by is_categorical. Non-categorical columns get no entry in
    the returned dictionary. The encoders are created with handle_unknown="ignore".

    :param X: Input data matrix.
    :return: Dictionary that maps a column index to a fitted scikit OneHotEncoder.
    """
    X = shape_into_2d(X)

    encoders = {}
    for col_idx in range(X.shape[1]):
        values = X[:, col_idx]
        if not is_categorical(values):
            continue

        encoder = OneHotEncoder(handle_unknown="ignore")
        # The encoder is fitted on a single-column 2D matrix.
        encoder.fit(values.reshape(-1, 1))
        encoders[col_idx] = encoder

    return encoders
def apply_one_hot_encoding(X: np.ndarray, one_hot_encoder_map: Dict[int, OneHotEncoder]) -> np.ndarray:
    """Replace every column of X that has an encoder in one_hot_encoder_map by its one-hot encoding.

    Columns without an encoder are kept as a single column. All resulting columns are stacked horizontally in the
    original column order and cast to float. With an empty map, X is only reshaped to 2D and not cast.

    :param X: Input data matrix.
    :param one_hot_encoder_map: Dictionary that maps a column index to a fitted OneHotEncoder.
    :return: The encoded float matrix, or the 2D version of X if the map is empty.
    """
    X = shape_into_2d(X)
    if not one_hot_encoder_map:
        return X

    column_blocks = []
    for col_idx in range(X.shape[1]):
        block = X[:, col_idx].reshape(-1, 1)
        if col_idx in one_hot_encoder_map:
            # transform(...) is densified via toarray() before stacking.
            block = one_hot_encoder_map[col_idx].transform(block).toarray()
        column_blocks.append(block)

    return np.hstack(column_blocks).astype(float)
def fit_catboost_encoders(X: np.ndarray, Y: np.ndarray) -> Dict[int, CatBoostEncoder]:
    """Fit one CatBoostEncoder per categorical column of X, using Y as target.

    Whether a column counts as categorical is decided by is_categorical. Non-categorical columns get no entry in
    the returned dictionary.

    :param X: Input data matrix.
    :param Y: Target values passed to each encoder's fit method.
    :return: Dictionary that maps a column index to a fitted CatBoostEncoder.
    """
    X = shape_into_2d(X)

    encoders = {}
    for col_idx in range(X.shape[1]):
        values = X[:, col_idx]
        if not is_categorical(values):
            continue

        encoder = CatBoostEncoder()
        encoder.fit(values, Y)
        encoders[col_idx] = encoder

    return encoders
def apply_catboost_encoding(
    X: np.ndarray, catboost_encoder_map: Dict[int, CatBoostEncoder], Y: Optional[np.ndarray] = None
) -> np.ndarray:
    """Replace every column of X that has an encoder in catboost_encoder_map by its CatBoost encoding.

    Each encoded column and each untouched column is reshaped into a single column. The columns are then stacked
    horizontally in the original order and cast to float. With an empty map, X is only reshaped to 2D and not
    cast.

    :param X: Input data matrix.
    :param catboost_encoder_map: Dictionary that maps a column index to a fitted CatBoostEncoder.
    :param Y: Optional target values, forwarded to each encoder's transform method.
    :return: The encoded float matrix, or the 2D version of X if the map is empty.
    """
    X = shape_into_2d(X)
    if not catboost_encoder_map:
        return X

    encoded_columns = []
    for col_idx in range(X.shape[1]):
        values = X[:, col_idx]
        if col_idx in catboost_encoder_map:
            values = catboost_encoder_map[col_idx].transform(values, Y)
        encoded_columns.append(values.reshape(-1, 1))

    return np.hstack(encoded_columns).astype(float)
def is_categorical(X: np.ndarray) -> bool:
    """Check whether all columns of the input are categorical, i.e. hold strings or booleans.

    The decision is based on the first row only: a column is treated as categorical if its first entry is a str,
    a bool or a np.bool_. The check stops at the first non-categorical column. To check whether any column is
    categorical, consider has_categorical instead.

    Note: A np matrix with mixed data types might internally convert numeric columns to strings and vice versa.
    To ensure that the given data keeps the original data types, consider converting/initializing it with the
    dtype 'object'. For instance: np.array([[1, 'True', '0', 0.2], [3, 'False', '1', 2.3]], dtype=object)

    :param X: Input array to check if all columns are categorical.
    :return: True if all columns of the input are categorical, False otherwise.
    :raises ValueError: If the first entry of an inspected column is an int/float NaN.
    """
    X = shape_into_2d(X)

    for col_idx in range(X.shape[1]):
        first_value = X[0, col_idx]

        if isinstance(first_value, (int, float)) and np.isnan(first_value):
            raise ValueError(
                "Input contains NaN values! This is currently not supported. " "Consider imputing missing values."
            )

        if not isinstance(first_value, (str, bool, np.bool_)):
            return False

    return True
def has_categorical(X: np.ndarray) -> bool:
    """Check whether at least one column of the input is categorical, i.e. holds strings or booleans.

    Each column is checked individually with is_categorical; the search stops at the first categorical column.
    To check whether all columns are categorical, consider is_categorical instead.

    Note: A np matrix with mixed data types might internally convert numeric columns to strings and vice versa.
    To ensure that the given data keeps the original data types, consider converting/initializing it with the
    dtype 'object'. For instance: np.array([[1, 'True', '0', 0.2], [3, 'False', '1', 2.3]], dtype=object)

    :param X: Input array to check if any column is categorical.
    :return: True if any column of the input is categorical, False otherwise.
    """
    X = shape_into_2d(X)
    return any(is_categorical(X[:, col_idx]) for col_idx in range(X.shape[1]))
def is_discrete(X: np.ndarray) -> bool:
    """Check whether every value in the given array is discrete, i.e. equal to its own floor.

    :param X: Input array to check.
    :return: True if all values in the input are discrete, False otherwise.
    """
    floored = np.floor(X)
    return np.all(X == floored)
def setdiff2d(ar1: np.ndarray, ar2: np.ndarray, assume_unique: bool = False) -> np.ndarray:
    """Generalize numpy's setdiff1d to 2D by comparing whole rows (vectors of arbitrary length).

    See https://numpy.org/doc/stable/reference/generated/numpy.setdiff1d.html for more details.

    :param ar1: 2D input array.
    :param ar2: 2D array whose rows are removed from ar1.
    :param assume_unique: Forwarded to np.setdiff1d.
    :return: The rows of ar1 that do not occur in ar2, as returned by np.setdiff1d on the row-wise views. If the
             number of columns differs between ar1 and ar2, ar1 is returned unchanged.
    :raises ValueError: If ar1 or ar2 is not 2D.
    """
    # Both inputs have to be 2D. A chained comparison 'ar1.ndim == ar2.ndim != 2' would let inputs with
    # differing numbers of dimensions (e.g. 1D vs. 2D) slip through.
    if ar1.ndim != 2 or ar2.ndim != 2:
        raise ValueError("Only support 2D arrays!")

    num_columns = ar1.shape[1]
    if num_columns != ar2.shape[1]:
        # Rows of different length can never be equal, so nothing is removed.
        return ar1

    # Structured dtype with one field per column; viewing a row through it turns the row into a single element
    # that np.setdiff1d can compare.
    # NOTE(review): ar2 is reinterpreted with ar1's dtype as well; this presumably expects both arrays to share
    # the same dtype - confirm against callers.
    row_dtype = {"names": [f"f{i}" for i in range(num_columns)], "formats": num_columns * [ar1.dtype]}

    # The row-wise view below is taken on C-contiguous memory.
    if not ar1.flags["C_CONTIGUOUS"]:
        ar1 = np.ascontiguousarray(ar1)
    if not ar2.flags["C_CONTIGUOUS"]:
        ar2 = np.ascontiguousarray(ar2)

    remaining_rows = np.setdiff1d(ar1.view(row_dtype), ar2.view(row_dtype), assume_unique=assume_unique)

    # Convert the structured elements back into plain rows.
    return remaining_rows.view(ar1.dtype).reshape(-1, num_columns)
def means_difference(randomized_predictions: np.ndarray, baseline_values: np.ndarray) -> np.ndarray:
    """Return the mean of the randomized predictions minus the mean of the baseline values.

    :param randomized_predictions: Array of predictions; averaged over all of its entries.
    :param baseline_values: Array of baseline values; averaged over all of its entries.
    :return: Difference between the two means.
    """
    mean_of_predictions = np.mean(randomized_predictions).squeeze()
    mean_of_baseline = np.mean(baseline_values).squeeze()
    return mean_of_predictions - mean_of_baseline
def variance_of_deviations(randomized_predictions: np.ndarray, baseline_values: np.ndarray) -> np.ndarray:
    """Return the negative variance of the differences between randomized predictions and baseline values.

    :param randomized_predictions: Array of predictions.
    :param baseline_values: Array of baseline values that is subtracted from the predictions.
    :return: Negative variance of the (squeezed) element-wise differences.
    """
    deviations = (randomized_predictions - baseline_values).squeeze()

    # The sign is flipped because the Shapley estimation evaluates v(S u {i}) - v(S) for a subset S. For the
    # variance we have v(S u {i}) <= v(S), so without the flip the players would get a negative contribution to
    # the target quantity (here, variance).
    return -np.var(deviations)
def variance_of_matching_values(randomized_predictions: np.ndarray, baseline_values: np.ndarray) -> np.ndarray:
    """Return the negative variance of the element-wise match indicator between predictions and baseline values.

    :param randomized_predictions: Array of predictions.
    :param baseline_values: Array of baseline values that the predictions are compared against with ==.
    :return: Negative variance of the (squeezed) element-wise equality indicator.
    """
    matches = (randomized_predictions == baseline_values).squeeze()

    # The sign is flipped because the Shapley estimation evaluates v(S u {i}) - v(S) for a subset S. For the
    # variance we have v(S u {i}) <= v(S), so without the flip the players would get a negative contribution to
    # the target quantity (here, variance).
    return -np.var(matches)