Source code for dowhy.gcm.independence_test.regression

""" Regression based (conditional) independence test. Testing independence via regression, i.e. if a variable has
information about another variable, then they are dependent.
"""
from typing import Callable, List, Optional, Union

import numpy as np
from joblib import Parallel, delayed
from sklearn.kernel_approximation import Nystroem
from sklearn.model_selection import KFold
from sklearn.preprocessing import scale

import dowhy.gcm.config as config
from dowhy.gcm.stats import estimate_ftest_pvalue, quantile_based_fwer
from dowhy.gcm.util.general import auto_apply_encoders, auto_fit_encoders, set_random_seed, shape_into_2d


[docs]def regression_based( X: np.ndarray, Y: np.ndarray, Z: Optional[np.ndarray] = None, max_num_components_all_inputs: int = 40, k_folds: int = 3, p_value_adjust_func: Callable[[Union[np.ndarray, List[float]]], float] = quantile_based_fwer, max_samples_per_fold: int = -1, n_jobs: Optional[int] = None, ) -> float: """The main idea is that if X and Y are dependent, then X should help in predicting Y. If there is no dependency, then X should not help. When Z is given, the idea remains the same, but here X and Y are conditionally independent given Z if X does not help in predicting Y when knowing Z. This is, X has not additional information about Y given Z. In the pairwise case (Z is not given), the performances (in terms of squared error) between predicting Y based on X and predicting Y by returning its mean (the best estimator without any inputs) are compared. Note that categorical inputs are transformed via encoders. Here, we use the :class:`sklearn.kernel_approximation.Nystroem` approach to approximate a kernel map of the inputs that serves as new input features. These new features allow to model complex non-linear relationships. In case of categorical data, we first apply an encoding and then map it into the kernel feature space. Afterwards, we use linear regression as a prediction model based on the non-linear input features. The idea is then to apply a f-test to see if the additional input features significantly help in predicting the target or not. This test is motivated by Granger causality, the approx_kernel_based test and the following paper: K Chalupka, P Perona, F. Eberhardt. *Fast Conditional Independence Test for Vector Variables with Large Sample Sizes*. arXiv:1804.02747, 2018. :param X: Input data for X. :param Y: Input data for Y. :param Z: Input data for Z. The set of variables to (optionally) condition on. :param max_num_components_all_inputs: Maximum number of kernel features when combining X and Z. If Z is not given, it will be replaced with an empty array. If Z is given, half of the number is used to generate features for Z. Note that the actual number of components is 1/10 of the number of samples, but at most max_num_components_all_inputs. :param num_target_components_factor: The factor indicates how many components are used for the target variable. This is, num_target_components_factor * dimension of the target many components. :param k_folds: Number of folds for training and test set. This equals the number of estimated p-values, which get adjusted by the p_value_adjust_func. :param p_value_adjust_func: A callable that expects a numpy array of multiple p-values and returns one p-value. This is typically used a family wise error rate control method. :param max_samples_per_fold: Maximum number of samples used per fold for training and testing. If -1, it uses all data. :param n_jobs: Number of parallel jobs for the evaluation of the folds. :return: The p-value for the null hypothesis that X and Y are independent given Z. If Z is not given, then for the hypothesis that X and Y are independent. """ n_jobs = config.default_n_jobs if n_jobs is None else n_jobs if max_num_components_all_inputs < 2: raise ValueError( "Need at least two components for all inputs, but only %d were given!" % max_num_components_all_inputs ) X, Y = shape_into_2d(X, Y) if max_samples_per_fold == -1: max_samples_per_fold = X.shape[0] # Take the lower dimensional variable as target. if X.shape[1] < Y.shape[1]: X, Y = Y, X X = scale(auto_apply_encoders(X, auto_fit_encoders(X), Y)) Y = scale(auto_apply_encoders(Y, auto_fit_encoders(Y))) if Z is not None: Z = shape_into_2d(Z) Z = scale(auto_apply_encoders(Z, auto_fit_encoders(Z))) def estimate_p_value(training_indices, test_indices, parallel_random_seed: int) -> float: set_random_seed(parallel_random_seed) adaptive_num_components = max(min(max_num_components_all_inputs, training_indices.shape[0] // 3), 2) if X.shape[0] > max_samples_per_fold: training_indices = training_indices[:max_samples_per_fold] test_indices = test_indices[:max_samples_per_fold] if Z is not None: transformer_input_all = Nystroem(n_components=adaptive_num_components) transformer_input_Z = Nystroem(n_components=adaptive_num_components // 2) XZ = np.column_stack([X, Z]) training_all_inputs = transformer_input_all.fit_transform(XZ[training_indices]) training_Z = transformer_input_Z.fit_transform(Z[training_indices]) test_all_inputs = transformer_input_all.transform(XZ[test_indices]) test_Z = transformer_input_Z.transform(Z[test_indices]) else: transformer_input_all = Nystroem(n_components=adaptive_num_components) training_all_inputs = transformer_input_all.fit_transform(X[training_indices]) test_all_inputs = transformer_input_all.transform(X[test_indices]) training_Z = test_Z = np.array([]).reshape(1, -1) training_target = Y[training_indices] test_target = Y[test_indices] return estimate_ftest_pvalue( training_all_inputs, training_Z, training_target, test_all_inputs, test_Z, test_target, ) random_seeds = np.random.randint(np.iinfo(np.int32).max, size=k_folds) p_values = Parallel(n_jobs=n_jobs)( delayed(estimate_p_value)(training_indices, test_indices, int(random_seed)) for (training_indices, test_indices), random_seed in zip( KFold(n_splits=k_folds, shuffle=True).split(X), random_seeds ) ) return p_value_adjust_func(p_values)