Source code for dowhy.gcm.stats

from typing import Callable, List, Optional, Union

import numpy as np
from scipy import stats
from sklearn.linear_model import LinearRegression

from dowhy.gcm.constant import EPS
from dowhy.gcm.util.general import shape_into_2d


def quantile_based_fwer(
    p_values: Union[np.ndarray, List[float]],
    p_values_scaling: Optional[np.ndarray] = None,
    quantile: float = 0.5,
) -> float:
    """Applies a quantile based family wise error rate (FWER) control to the given p-values. This is based on the
    approach described in:

    Meinshausen, N., Meier, L. and Buehlmann, P. (2009). p-values for high-dimensional regression.
    J. Amer. Statist. Assoc., 104, 1671–1681.

    :param p_values: A list or array of p-values.
    :param p_values_scaling: An optional list of scaling factors for each p-value.
    :param quantile: The quantile used for the p-value adjustment. By default, this is the median (0.5).
    :return: The p-value that lies on the quantile threshold. Note that this is the quantile based on the scaled
             values p_values / quantile.
    """
    if not 0 < quantile <= 1:
        raise ValueError("The given quantile is %f, but it needs to be on (0, 1]!" % quantile)

    p_values = np.array(p_values)
    if p_values_scaling is None:
        p_values_scaling = np.ones(p_values.shape[0])

    if p_values.shape != p_values_scaling.shape:
        raise ValueError("The p-value scaling array needs to have the same dimension as the given p-values.")

    p_values_scaling = p_values_scaling[~np.isnan(p_values)]
    p_values = p_values[~np.isnan(p_values)]

    p_values = p_values * p_values_scaling
    p_values[p_values > 1] = 1.0

    if p_values.shape[0] == 1:
        return float(p_values[0])
    else:
        return float(min(1.0, np.quantile(p_values / quantile, quantile)))
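
A minimal usage sketch with made-up p-values, e.g., obtained from repeated tests on random subsamples of the same data set:

    p_values = [0.01, 0.04, 0.2, 0.03, 0.5]
    adjusted_p_value = quantile_based_fwer(p_values, quantile=0.5)
    # With the median (0.5): min(1, median(p_values) / 0.5) = min(1, 0.04 / 0.5) = 0.08
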
def marginal_expectation(
    prediction_method: Callable[[np.ndarray], np.ndarray],
    feature_samples: np.ndarray,
    baseline_samples: np.ndarray,
    baseline_feature_indices: List[int],
    return_averaged_results: bool = True,
    feature_perturbation: str = "randomize_columns_jointly",
    max_batch_size: int = -1,
) -> np.ndarray:
    """Estimates the marginal expectation for samples in baseline_samples when randomizing features that are not
    part of baseline_feature_indices. That is, this function estimates
    y^i = E[Y | do(x^i_s)] := \\int_x_s' E[Y | x^i_s, x_s'] p(x_s') d x_s',
    where x^i_s is the i-th sample from baseline_samples, s denotes the baseline_feature_indices and x_s' ~ X_s'
    denotes the randomized features that are not in s. For an approximation of the integral, the given
    prediction_method is evaluated multiple times for the same x^i_s, but different x_s' ~ X_s'.

    :param prediction_method: Prediction method of interest. This should expect a numpy array as input for making
                              predictions.
    :param feature_samples: Samples from the joint distribution. These are used for randomizing the features that
                            are not in baseline_feature_indices.
    :param baseline_samples: Samples for which the marginal expectation should be estimated.
    :param baseline_feature_indices: Column indices of the features in s. The values of these features remain
                                     constant when estimating the expectation.
    :param return_averaged_results: If set to True, the expectation over all evaluated samples for the i-th sample
                                    in baseline_samples is returned. If set to False, all corresponding results for
                                    the i-th sample are returned.
    :param feature_perturbation: Type of feature perturbation:
        'randomize_columns_independently': Each feature not in s is randomly permuted separately.
        'randomize_columns_jointly': All features not in s are jointly permuted. Note that this still represents an
        interventional distribution.
    :param max_batch_size: Maximum batch size for estimating the predictions. This has a significant influence on
                           the overall memory usage. If set to -1, all samples are evaluated in one batch.
    :return: If return_averaged_results is True, a numpy array where the i-th entry is the estimated marginal
             expectation of x^i_s when randomizing the remaining features. If return_averaged_results is False, a
             two dimensional numpy array where the i-th entry contains all predictions for x^i_s when randomizing
             the remaining features.
    """
    feature_samples, baseline_samples = shape_into_2d(feature_samples, baseline_samples)
    batch_size = baseline_samples.shape[0] if max_batch_size == -1 else max_batch_size
    result = [np.nan] * baseline_samples.shape[0]

    # Make a copy to avoid manipulating the original matrix.
    feature_samples = np.array(feature_samples)

    features_to_randomize = np.delete(np.arange(0, feature_samples.shape[1]), baseline_feature_indices)

    if feature_perturbation == "randomize_columns_independently":
        feature_samples = permute_features(feature_samples, features_to_randomize, False)
    elif feature_perturbation == "randomize_columns_jointly":
        feature_samples = permute_features(feature_samples, features_to_randomize, True)
    else:
        raise ValueError("Unknown argument %s as feature_perturbation type!" % feature_perturbation)

    # The given prediction method has to be evaluated multiple times on a large number of different inputs.
    # Typically, batch evaluation of a prediction model on multiple inputs at the same time is significantly faster
    # than evaluating it on single samples in a for-loop. To make use of this, we try to evaluate as many samples as
    # possible in one batch call of the prediction method. However, this also requires a lot of memory for many
    # samples. To overcome potential memory issues, multiple batch calls are performed, each with at most batch_size
    # many samples. The number of samples that are evaluated is normally
    # baseline_samples.shape[0] * feature_samples.shape[0]. Here, we reduce it to
    # batch_size * feature_samples.shape[0]. If batch_size were set to 1, each sample in baseline_samples would be
    # evaluated one by one in a for-loop.
    inputs = np.tile(feature_samples, (batch_size, 1))
    for offset in range(0, baseline_samples.shape[0], batch_size):
        # Each batch consists of at most batch_size * feature_samples.shape[0] many samples. If there are multiple
        # batches, the offset indicates the index of the current sample in baseline_samples that has not been
        # evaluated yet.
        if offset + batch_size > baseline_samples.shape[0]:
            # If the batch size would be larger than the remaining number of samples, it is reduced to only include
            # the remaining baseline_samples.
            adjusted_batch_size = baseline_samples.shape[0] - offset
            inputs = inputs[: adjusted_batch_size * feature_samples.shape[0]]
        else:
            adjusted_batch_size = batch_size

        for index in range(adjusted_batch_size):
            # The inputs consist of batch_size many copies of feature_samples. Here, we set the columns of the
            # features in baseline_feature_indices to their respective values in baseline_samples.
            inputs[
                index * feature_samples.shape[0] : (index + 1) * feature_samples.shape[0], baseline_feature_indices
            ] = baseline_samples[offset + index, baseline_feature_indices]

        # After creating the (potentially large) input data matrix, we can evaluate the prediction method.
        predictions = np.array(prediction_method(inputs))

        for index in range(adjusted_batch_size):
            # Here, offset + index indicates the sample index in baseline_samples.
            if return_averaged_results:
                # Average all prediction results obtained for the 'offset + index'-th sample in baseline_samples.
                # That is, y^(offset + index) = E[Y | do(x^(offset + index)_s)].
                result[offset + index] = np.mean(
                    predictions[index * feature_samples.shape[0] : (index + 1) * feature_samples.shape[0]], axis=0
                )
            else:
                # Return all prediction results obtained for the 'offset + index'-th sample in baseline_samples,
                # i.e. the results are not averaged.
                result[offset + index] = predictions[
                    index * feature_samples.shape[0] : (index + 1) * feature_samples.shape[0]
                ]

    return np.array(result)
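
A minimal usage sketch, assuming a fitted scikit-learn regressor whose predict method serves as prediction_method (the data and model below are illustrative):

    import numpy as np
    from sklearn.linear_model import LinearRegression

    rng = np.random.default_rng(0)
    X = rng.normal(size=(500, 3))
    y = 2 * X[:, 0] + X[:, 1] + rng.normal(size=500)
    model = LinearRegression().fit(X, y)

    # Estimate E[Y | do(x^i_0)] for the first ten samples, i.e., feature 0 is
    # held fixed per sample while features 1 and 2 are randomized jointly.
    expectations = marginal_expectation(
        model.predict,
        feature_samples=X,
        baseline_samples=X[:10],
        baseline_feature_indices=[0],
    )
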
def permute_features(
    feature_samples: np.ndarray, features_to_permute: Union[List[int], np.ndarray], randomize_features_jointly: bool
) -> np.ndarray:
    """Permutes the given columns of feature_samples, either jointly (preserving the dependence between the permuted
    columns) or independently of each other."""
    # Make a copy to ensure that the original object is not modified.
    feature_samples = np.array(feature_samples)

    if randomize_features_jointly:
        # Permute samples jointly. This still represents an interventional distribution.
        feature_samples[:, features_to_permute] = feature_samples[
            np.random.choice(feature_samples.shape[0], feature_samples.shape[0], replace=False)
        ][:, features_to_permute]
    else:
        # Permute samples independently.
        for feature in features_to_permute:
            np.random.shuffle(feature_samples[:, feature])

    return feature_samples
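
For example, permuting two columns jointly reshuffles whole rows of those columns, so the dependence between the permuted features is preserved while their relation to the remaining columns is broken:

    X = np.arange(12).reshape(4, 3)
    # Columns 1 and 2 are permuted with the same row order; column 0 is untouched.
    X_jointly = permute_features(X, [1, 2], randomize_features_jointly=True)
    # Columns 1 and 2 are each shuffled with their own independent row order.
    X_independently = permute_features(X, [1, 2], randomize_features_jointly=False)
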
def estimate_ftest_pvalue(
    X_training_a: np.ndarray,
    X_training_b: np.ndarray,
    Y_training: np.ndarray,
    X_test_a: np.ndarray,
    X_test_b: np.ndarray,
    Y_test: np.ndarray,
) -> float:
    """Estimates the p-value for the null hypothesis that the same regression error can be achieved with fewer
    parameters. That is, a linear model trained on a data set A with d features has the same performance (in terms
    of squared error) relative to the number of features as a model trained on a data set B with k features, where
    k < d. Here, both data sets need to have the same target values. A small p-value would indicate that the model
    performances are significantly different.

    Note that all given test samples are utilized in the f-test.

    See https://en.wikipedia.org/wiki/F-test#Regression_problems for more details.

    :param X_training_a: Input training samples for model A.
    :param X_training_b: Input training samples for model B. These samples should have fewer features than the
                         samples in X_training_a.
    :param Y_training: Target training values.
    :param X_test_a: Test samples for model A.
    :param X_test_b: Test samples for model B.
    :param Y_test: Test values.
    :return: A p-value on [0, 1].
    """
    X_training_a, X_test_a = shape_into_2d(X_training_a, X_test_a)

    if X_training_b.size > 0:
        X_training_b, X_test_b = shape_into_2d(X_training_b, X_test_b)
    else:
        X_training_b = X_training_b.reshape(0, 0)
        X_test_b = X_test_b.reshape(0, 0)

    if X_training_a.shape[1] <= X_training_b.shape[1]:
        raise ValueError(
            "The data X_training_a should have more dimensions (model parameters) than the data X_training_b!"
        )

    ssr_a = np.sum((Y_test - LinearRegression().fit(X_training_a, Y_training).predict(X_test_a)) ** 2)

    if X_training_b.shape[1] > 0:
        ssr_b = np.sum((Y_test - LinearRegression().fit(X_training_b, Y_training).predict(X_test_b)) ** 2)
    else:
        ssr_b = np.sum((Y_test - np.mean(Y_training)) ** 2)

    # Difference in the number of parameters between the models (the intercepts cancel out).
    dof_diff_1 = X_test_a.shape[1] - X_test_b.shape[1]
    # Residual degrees of freedom of model A; its parameter count includes the intercept.
    dof_diff_2 = X_test_a.shape[0] - X_test_a.shape[1] - 1

    f_statistic = (ssr_b - ssr_a) / dof_diff_1 * dof_diff_2

    if ssr_a < EPS:
        ssr_a = 0
    if ssr_b < EPS:
        ssr_b = 0

    if ssr_a == 0 and ssr_b == 0:
        f_statistic = 0
    elif ssr_a != 0:
        f_statistic /= ssr_a

    return stats.f.sf(f_statistic, dof_diff_1, dof_diff_2)
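
A minimal usage sketch with synthetic data, testing whether a second feature significantly improves the regression fit (the data and split are illustrative):

    rng = np.random.default_rng(0)
    X = rng.normal(size=(300, 2))
    y = X[:, 0] + 0.5 * X[:, 1] + rng.normal(scale=0.1, size=300)

    # Model A uses both features, model B only the first one.
    p_value = estimate_ftest_pvalue(
        X_training_a=X[:150],
        X_training_b=X[:150, :1],
        Y_training=y[:150],
        X_test_a=X[150:],
        X_test_b=X[150:, :1],
        Y_test=y[150:],
    )
    # A small p-value suggests model A performs significantly better than model B.
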