Source code for dowhy.gcm.stats

"""Functions in this module should be considered experimental, meaning there might be breaking API changes in the
future.
"""

from typing import Callable, List, Optional, Union

import numpy as np
from numpy.matlib import repmat
from scipy import stats
from sklearn.linear_model import LinearRegression

from dowhy.gcm.constant import EPS
from dowhy.gcm.util.general import shape_into_2d


[docs]def quantile_based_fwer( p_values: Union[np.ndarray, List[float]], p_values_scaling: Optional[np.ndarray] = None, quantile: float = 0.5 ) -> float: """Applies a quantile based family wise error rate (FWER) control to the given p-values. This is based on the approach described in: Meinshausen, N., Meier, L. and Buehlmann, P. (2009). p-values for high-dimensional regression. J. Amer. Statist. Assoc.104 1671–1681 :param p_values: A list or array of p-values. :param p_values_scaling: An optional list of scaling factors for each p-value. :param quantile: The quantile used for the p-value adjustment. By default, this is the median (0.5). :return: The p-value that lies on the quantile threshold. Note that this is the quantile based on scaled values p_values / quantile. """ if quantile <= 0 or abs(quantile - 1) >= 1: raise ValueError("The given quantile is %f, but it needs to be on (0, 1]!" % quantile) p_values = np.array(p_values) if p_values_scaling is None: p_values_scaling = np.ones(p_values.shape[0]) if p_values.shape != p_values_scaling.shape: raise ValueError("The p-value scaling array needs to have the same dimension as the given p-values.") p_values_scaling = p_values_scaling[~np.isnan(p_values)] p_values = p_values[~np.isnan(p_values)] p_values = p_values * p_values_scaling p_values[p_values > 1] = 1.0 if p_values.shape[0] == 1: return float(p_values[0]) else: return float(min(1.0, np.quantile(p_values / quantile, quantile)))
[docs]def marginal_expectation( prediction_method: Callable[[np.ndarray], np.ndarray], feature_samples: np.ndarray, baseline_samples: np.ndarray, baseline_feature_indices: List[int], return_averaged_results: bool = True, feature_perturbation: str = "randomize_columns_jointly", max_batch_size: int = -1, ) -> np.ndarray: """Estimates the marginal expectation for samples in baseline_noise_samples when randomizing features that are not part of baseline_feature_indices. This is, this function estimates y^i = E[Y | do(x^i_s)] := \\int_x_s' E[Y | x^i_s, x_s'] p(x_s') d x_s', where x^i_s is the i-th sample from baseline_noise_samples, s denotes the baseline_feature_indices and x_s' ~ X_s' denotes the randomized features that are not in s. For an approximation of the integral, the given prediction_method is evaluated multiple times for the same x^i_s, but different x_s' ~ X_s'. :param prediction_method: Prediction method of interest. This should expect a numpy array as input for making predictions. :param feature_samples: Samples from the joint distribution. These are used for randomizing the features that are not in baseline_feature_indices. :param baseline_samples: Samples for which the marginal expectation should be estimated. :param baseline_feature_indices: Column indices of the features in s. These values for these features are remain constant when estimating the expectation. :param return_averaged_results: If set to True, the expectation over all evaluated samples for the i-th baseline_noise_samples is returned. If set to False, all corresponding results for the i-th sample are returned. :param feature_perturbation: Type of feature permutation: 'randomize_columns_independently': Each feature not in s is randomly permuted separately. 'randomize_columns_jointly': All features not in s are jointly permuted. Note that this still represents an interventional distribution. :param max_batch_size: Maximum batch size for a estimating the predictions. This has a significant influence on the overall memory usage. If set to -1, all samples are used in one batch. :return: If return_averaged_results is False, a numpy array where the i-th entry belongs to the marginal expectation of x^i_s when randomizing the remaining features. If return_averaged_results is True, a two dimensional numpy array where the i-th entry contains all predictions for x^i_s when randomizing the remaining features. """ feature_samples, baseline_samples = shape_into_2d(feature_samples, baseline_samples) batch_size = baseline_samples.shape[0] if max_batch_size == -1 else max_batch_size result = [np.nan] * baseline_samples.shape[0] # Make copy to avoid manipulating the original matrix. feature_samples = np.array(feature_samples) features_to_randomize = np.delete(np.arange(0, feature_samples.shape[1]), baseline_feature_indices) if feature_perturbation == "randomize_columns_independently": feature_samples = permute_features(feature_samples, features_to_randomize, False) elif feature_perturbation == "randomize_columns_jointly": feature_samples = permute_features(feature_samples, features_to_randomize, True) else: raise ValueError("Unknown argument %s as feature_perturbation type!" % feature_perturbation) # The given prediction method has to be evaluated multiple times on a large amount of different inputs. Typically, # the batch evaluation of a prediction model on multiple inputs at the same time is significantly faster # than evaluating it on single simples in a for-loop. To make use of this, we try to evaluate as many samples as # possible in one batch call of the prediction method. However, this also requires a lot of memory for many samples. # To overcome potential memory issues, multiple batch calls are performed, each with at most batch_size many # samples. The number of samples that are evaluated is normally # baseline_noise_samples.shape[0] * feature_samples.shape[0]. Here, we reduce it to # batch_size * feature_samples.shape[0]. If the batch_size would be set 1, then each baseline_noise_samples is # evaluated one by one in a for-loop. inputs = repmat(feature_samples, batch_size, 1) for offset in range(0, baseline_samples.shape[0], batch_size): # Each batch consist of at most batch_size * feature_samples.shape[0] many samples. If there are multiple # batches, the offset indicates the index of the current baseline_noise_samples that has not been evaluated yet. if offset + batch_size > baseline_samples.shape[0]: # If the batch size would be larger than the remaining amount of samples, it is reduced to only include the # remaining baseline_noise_samples. adjusted_batch_size = baseline_samples.shape[0] - offset inputs = inputs[: adjusted_batch_size * feature_samples.shape[0]] else: adjusted_batch_size = batch_size for index in range(adjusted_batch_size): # The inputs consist of batch_size many copies of feature_samples. Here, we set the columns of the features # in baseline_feature_indices to their respective values in baseline_noise_samples. inputs[ index * feature_samples.shape[0] : (index + 1) * feature_samples.shape[0], baseline_feature_indices ] = baseline_samples[offset + index, baseline_feature_indices] # After creating the (potentially large) input data matrix, we can evaluate the prediction method. predictions = np.array(prediction_method(inputs)) for index in range(adjusted_batch_size): # Here, offset + index now indicates the sample index in baseline_noise_samples. if return_averaged_results: # This would average all prediction results obtained for the 'offset + index'-th sample in # baseline_noise_samples. This is, y^(offset + index) = E[Y | do(x^(offset + index)_s)]. result[offset + index] = np.mean( predictions[index * feature_samples.shape[0] : (index + 1) * feature_samples.shape[0]], axis=0 ) else: # This would return all prediction results obtained for the 'offset + index'-th sample in # baseline_noise_samples, i.e. the results are not averaged. result[offset + index] = predictions[ index * feature_samples.shape[0] : (index + 1) * feature_samples.shape[0] ] return np.array(result)
[docs]def permute_features( feature_samples: np.ndarray, features_to_permute: Union[List[int], np.ndarray], randomize_features_jointly: bool ) -> np.ndarray: # Making copy to ensure that the original object is not modified. feature_samples = np.array(feature_samples) if randomize_features_jointly: # Permute samples jointly. This still represents an interventional distribution. feature_samples[:, features_to_permute] = feature_samples[ np.random.choice(feature_samples.shape[0], feature_samples.shape[0], replace=False) ][:, features_to_permute] else: # Permute samples independently. for feature in features_to_permute: np.random.shuffle(feature_samples[:, feature]) return feature_samples
[docs]def estimate_ftest_pvalue( X_training_a: np.ndarray, X_training_b: np.ndarray, Y_training: np.ndarray, X_test_a: np.ndarray, X_test_b: np.ndarray, Y_test: np.ndarray, ) -> float: """Estimates the p-value for the null hypothesis that the same regression error with less parameters can be achieved. This is, a linear model trained on a data set A with d number of features has the same performance (in terms of squared error) relative to the number of features as a model trained on a data set B with k number features, where k < d. Here, both data sets need to have the same target values. A small p-value would indicate that the model performances are significantly different. Note that all given test samples are utilized in the f-test. See https://en.wikipedia.org/wiki/F-test#Regression_problems for more details. :param X_training_a: Input training samples for model A. :param X_training_b: Input training samples for model B. These samples should have less features than samples in X_training_a. :param Y_training: Target training values. :param X_test_a: Test samples for model A. :param X_test_b: Test samples for model B. :param Y_test: Test values. :return: A p-value on [0, 1]. """ X_training_a, X_test_a = shape_into_2d(X_training_a, X_test_a) if X_training_b.size > 0: X_training_b, X_test_b = shape_into_2d(X_training_b, X_test_b) else: X_training_b = X_training_b.reshape(0, 0) X_test_b = X_test_b.reshape(0, 0) if X_training_a.shape[1] <= X_training_b.shape[1]: raise ValueError( "The data X_training_a should have more dimensions (model parameters) than the data " "X_training_b!" ) ssr_a = np.sum((Y_test - LinearRegression().fit(X_training_a, Y_training).predict(X_test_a)) ** 2) if X_training_b.shape[1] > 0: ssr_b = np.sum((Y_test - LinearRegression().fit(X_training_b, Y_training).predict(X_test_b)) ** 2) else: ssr_b = np.sum((Y_test - np.mean(Y_training)) ** 2) dof_diff_1 = X_test_a.shape[1] - X_test_b.shape[1] # p1 - p2 dof_diff_2 = X_test_a.shape[0] - X_test_a.shape[1] - 1 # n - p2 (parameters include intercept) f_statistic = (ssr_b - ssr_a) / dof_diff_1 * dof_diff_2 if ssr_a < EPS: ssr_a = 0 if ssr_b < EPS: ssr_b = 0 if ssr_a == 0 and ssr_b == 0: f_statistic = 0 elif ssr_a != 0: f_statistic /= ssr_a return stats.f.sf(f_statistic, dof_diff_1, dof_diff_2)