Source code for dowhy.gcm.influence

"""This module provides functions to estimate causal influences.

Functions in this module should be considered experimental, meaning there might be breaking API changes in the future.
"""
import logging
import warnings
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union, cast

import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from numpy.matlib import repmat

import dowhy.gcm.auto as auto
from dowhy.gcm import feature_relevance_sample
from dowhy.gcm._noise import compute_data_from_noise, compute_noise_from_data, noise_samples_of_ancestors
from dowhy.gcm.causal_mechanisms import ClassifierFCM, ConditionalStochasticModel, ProbabilityEstimatorModel
from dowhy.gcm.causal_models import (
    InvertibleStructuralCausalModel,
    ProbabilisticCausalModel,
    StructuralCausalModel,
    validate_causal_dag,
    validate_node,
)
from dowhy.gcm.divergence import estimate_kl_divergence_of_probabilities
from dowhy.gcm.fitting_sampling import draw_samples
from dowhy.gcm.ml import ClassificationModel, PredictionModel
from dowhy.gcm.shapley import ShapleyConfig, estimate_shapley_values
from dowhy.gcm.stats import marginal_expectation
from dowhy.gcm.uncertainty import estimate_entropy_of_probabilities, estimate_variance
from dowhy.gcm.util.general import has_categorical, is_categorical, means_difference, set_random_seed, shape_into_2d
from dowhy.graph import get_ordered_predecessors, is_root_node, node_connected_subgraph_view

_logger = logging.getLogger(__name__)


[docs]def arrow_strength( causal_model: ProbabilisticCausalModel, target_node: Any, parent_samples: Optional[pd.DataFrame] = None, num_samples_conditional: int = 2000, max_num_runs: int = 5000, tolerance: float = 0.01, n_jobs: int = -1, difference_estimation_func: Optional[Callable[[np.ndarray, np.ndarray], Union[np.ndarray, float]]] = None, ) -> Dict[Tuple[Any, Any], float]: """Computes the causal strength of each edge directed to the target node. The strength of an edge is quantified in terms of distance between conditional distributions of the target node in the original graph and the imputed graph wherein the edge has been removed and the target node is fed a random permutation of the observations of the source node. For more scientific details behind this API, please refer to the research paper below. **Research Paper**: Dominik Janzing, David Balduzzi, Moritz Grosse-Wentrup, Bernhard Schölkopf. *Quantifying Causal Influences*. The Annals of Statistics, Vol. 41, No. 5, 2324-2358, 2013. :param causal_model: The probabilistic causal model for whose target node we compute the strength of incoming edges for. :param target_node: The target node whose incoming edges' strength is to be computed. :param parent_samples: Optional samples from the parents of the target_node. If None are given, they are generated based on the provided causal model. Providing observational data can help to mitigate misspecifications in the graph, such as missing interactions between root nodes or confounders. :param num_samples_conditional: Sample size to use for estimating the distance between distributions. The more more samples, the higher the accuracy. :param max_num_runs: The maximum number of times to resample and estimate the strength to report the average strength. :param tolerance: If the percentage change in the estimated strength between two consecutive runs falls below the specified tolerance, the algorithm will terminate before reaching the maximum number of runs. A value of 0.01 would indicate a change of less than 1%. However, in order to minimize the impact of randomness, there must be at least three consecutive runs where the change is below the threshold. :param n_jobs: The number of jobs to run in parallel. Set it to -1 to use all processors. :param difference_estimation_func: Optional: How to measure the distance between two distributions. By default, the difference of the variance is estimated for a continuous target node and the KL divergence for a categorical target node. :return: Causal strength of each edge. """ if target_node not in causal_model.graph.nodes: raise ValueError("Target node %s can not be found in given graph!" % target_node) if is_root_node(causal_model.graph, target_node): raise ValueError("Target node %s is a root node, but it requires to have ancestors!" % target_node) # Creating a smaller subgraph, which only contains upstream nodes that are connected to the target node. sub_causal_model = ProbabilisticCausalModel(node_connected_subgraph_view(causal_model.graph, target_node)) validate_node(sub_causal_model.graph, target_node) ordered_predecessors = get_ordered_predecessors(sub_causal_model.graph, target_node) if parent_samples is None: parent_samples = draw_samples(sub_causal_model, num_samples_conditional * 20)[ordered_predecessors] direct_influences = arrow_strength_of_model( sub_causal_model.causal_mechanism(target_node), parent_samples[ordered_predecessors].to_numpy(), num_samples_from_conditional=num_samples_conditional, max_num_runs=max_num_runs, tolerance=tolerance, n_jobs=n_jobs, difference_estimation_func=difference_estimation_func, ) return {(predecessor, target_node): direct_influences[i] for i, predecessor in enumerate(ordered_predecessors)}
[docs]def arrow_strength_of_model( conditional_stochastic_model: ConditionalStochasticModel, input_samples: np.ndarray, num_samples_from_conditional: int = 2000, max_num_runs: int = 5000, tolerance: float = 0.01, n_jobs: int = -1, difference_estimation_func: Optional[Callable[[np.ndarray, np.ndarray], Union[np.ndarray, float]]] = None, input_subsets: Optional[List[List[int]]] = None, ) -> np.ndarray: input_samples = shape_into_2d(input_samples) if input_subsets is None: input_subsets = [[i] for i in range(input_samples.shape[1])] if difference_estimation_func is None: if isinstance(conditional_stochastic_model, ProbabilityEstimatorModel): difference_estimation_func = estimate_kl_divergence_of_probabilities else: def difference_estimation_func(old, new): return np.var(new) - np.var(old) if isinstance(conditional_stochastic_model, ProbabilityEstimatorModel): samples_creation_method = conditional_stochastic_model.estimate_probabilities else: samples_creation_method = conditional_stochastic_model.draw_samples with warnings.catch_warnings(): warnings.filterwarnings("ignore") def parallel_job(subset: List[int], parallel_random_seed: int): set_random_seed(parallel_random_seed) return _estimate_direct_strength( samples_creation_method, input_samples, subset, difference_estimation_func, num_samples_from_conditional, max_num_runs, tolerance, ) random_seeds = np.random.randint(np.iinfo(np.int32).max, size=len(input_subsets)) results = Parallel(n_jobs=n_jobs)( delayed(parallel_job)(subset, int(random_seed)) for subset, random_seed in zip(input_subsets, random_seeds) ) if np.any(results == np.inf): _logger.warning( "At least one arrow strength is infinite. This typically happens if the causal models are " "deterministic, i.e. there is no noise or it is extremely small." ) return np.array(results)
def _estimate_direct_strength( draw_samples_func: Callable[[np.ndarray], np.ndarray], distribution_samples: np.ndarray, parents_subset: List[int], difference_estimation_func: Callable[[np.ndarray, np.ndarray], Union[np.ndarray, float]], num_samples_conditional: int, max_num_runs: int, tolerance: float, ) -> float: distribution_samples = shape_into_2d(distribution_samples) num_samples_conditional = min(num_samples_conditional, distribution_samples.shape[0]) aggregated_conditional_difference_result = 0 average_difference_result = 0 converged_run = 0 for run, sample in enumerate(distribution_samples): tmp_samples = repmat(sample, num_samples_conditional, 1) rnd_permutation = np.random.choice(distribution_samples.shape[0], num_samples_conditional, replace=False) # Sampling from the conditional distribution based on the current sample. conditional_distribution_samples = draw_samples_func(tmp_samples) # Sampling from the conditional based on the current sample, but randomizing the inputs of all variables that # are in the given subset. By this, we can simulate the impact on the conditional distribution when removing # only the incoming edges of the variables in the subset. tmp_samples[:, parents_subset] = distribution_samples[:, parents_subset][rnd_permutation] cond_dist_removed_arr_samples = draw_samples_func(tmp_samples) old_average_difference_result = average_difference_result aggregated_conditional_difference_result += difference_estimation_func( conditional_distribution_samples, cond_dist_removed_arr_samples ) average_difference_result = aggregated_conditional_difference_result / (run + 1) if run >= max_num_runs: break elif run > 0: if old_average_difference_result == 0: converging = average_difference_result == 0 else: converging = abs(1 - average_difference_result / old_average_difference_result) < tolerance if converging: converged_run += 1 if converged_run >= 3: break else: converged_run = 0 return average_difference_result
[docs]def intrinsic_causal_influence( causal_model: StructuralCausalModel, target_node: Any, prediction_model: Union[PredictionModel, ClassificationModel, str] = "approx", attribution_func: Optional[Callable[[np.ndarray, np.ndarray], float]] = None, num_training_samples: int = 100000, num_samples_randomization: int = 1000, num_samples_baseline: int = 2000, max_batch_size: int = 250, auto_assign_quality: auto.AssignmentQuality = auto.AssignmentQuality.GOOD, shapley_config: Optional[ShapleyConfig] = None, ) -> Dict[Any, float]: """Computes the causal contribution of each upstream noise term of the target node (including the noise of the target itself) to the statistical property (e.g. mean, variance) of the target. We call this contribution *intrinsic* as noise terms, by definition, do not inherit properties of observed parents. The contribution of each noise term is then the *intrinsic* causal contribution of the corresponding node. For more scientific details, please refer to the paper below. **Research Paper**: Janzing et al. *Quantifying causal contributions via structure preserving interventions*. arXiv:2007.00714, 2021. :param causal_model: The structural causal model for whose target node we compute the intrinsic causal influence of its ancestors. :param target_node: Target node whose statistical property is to be attributed. :param prediction_model: Prediction model for estimating the functional relationship between subsets of ancestor noise terms and the target node. This can be an instance of a PredictionModel, the string 'approx' or the string 'exact'. With 'exact', the underlying causal models in the graph are utilized directly by propagating given noise inputs through the graph. This is generally more accurate but slow. With 'approx', an appropriate model is selected and trained based on sampled data from the graph, which is less accurate but faster. A more detailed treatment on why we need this parameter is also provided in :ref:`icc`. :param attribution_func: Optional attribution function to measure the statistical property of the target node. This function expects two inputs; predictions after the randomization of certain features (i.e. samples from noise nodes) and a baseline where no features were randomized. The baseline predictions can be typically ignored if one is interested in uncertainty measures such as entropy or variance, but they might be relevant if, for instance, these shall be estimated based on the residuals. By default, entropy is used if prediction model is a classifier, variance otherwise. :param num_training_samples: Number of samples drawn from the graphical causal model that are used for fitting the prediction_model (if necessary). :param num_samples_randomization: Number of noise samples drawn from the graphical causal model that are used for evaluating the set function. Here, these samples are samples from the noise distributions used for randomizing features that are not in the subset. :param num_samples_baseline: Number of noise samples drawn from the graphical causal model that are used for evaluating the set function. Here, these samples are used as fixed observations for features that are in the subset. :param max_batch_size: Maximum batch size for estimating the predictions from evaluation samples. This has a significant impact on the overall memory usage. If set to -1, all samples are used in one batch. :param auto_assign_quality: Auto assign quality for the 'approx' prediction_model option. :param shapley_config: :class:`~dowhy.gcm.shapley.ShapleyConfig` for the Shapley estimator. :return: Intrinsic causal contribution of each ancestor node to the statistical property defined by the attribution_func of the target node. """ validate_causal_dag(causal_model.graph) # Creating a smaller subgraph, which only contains upstream nodes that are connected to the target node. sub_causal_model = StructuralCausalModel(node_connected_subgraph_view(causal_model.graph, target_node)) data_samples, noise_samples = noise_samples_of_ancestors(sub_causal_model, target_node, num_training_samples) node_names = noise_samples.columns noise_samples, target_samples = shape_into_2d(noise_samples.to_numpy(), data_samples[target_node].to_numpy()) target_is_categorical = is_categorical(data_samples[target_node].to_numpy()) prediction_method = _get_icc_noise_function( causal_model, target_node, prediction_model, noise_samples, node_names, target_samples, auto_assign_quality, target_is_categorical, ) if attribution_func is None: if target_is_categorical: def attribution_func(x, _): return -estimate_entropy_of_probabilities(x) else: def attribution_func(x, _): return estimate_variance(x) _, noise_samples = noise_samples_of_ancestors( sub_causal_model, target_node, num_samples_randomization + num_samples_baseline ) noise_samples = shape_into_2d(noise_samples.to_numpy()) iccs = _estimate_iccs( attribution_func, prediction_method, noise_samples[:num_samples_randomization], noise_samples[num_samples_randomization : num_samples_randomization + num_samples_baseline], max_batch_size, ShapleyConfig() if shapley_config is None else shapley_config, ) return {node: iccs[i] for i, node in enumerate(node_names)}
[docs]def intrinsic_causal_influence_sample( causal_model: InvertibleStructuralCausalModel, target_node: Any, baseline_samples: pd.DataFrame, noise_feature_samples: Optional[pd.DataFrame] = None, subset_scoring_func: Optional[Callable[[np.ndarray, np.ndarray], Union[np.ndarray, float]]] = None, num_noise_feature_samples: int = 5000, max_batch_size: int = 100, shapley_config: Optional[ShapleyConfig] = None, ) -> List[Dict[Any, Any]]: """Estimates the intrinsic causal impact of upstream nodes on a specified target_node, using the provided baseline_samples as a reference. In this context, observed values are attributed to the noise factors present in upstream nodes. Compared to intrinsic_causal_influence, this method quantifies the influences with respect to single observations instead of the distribution. Note that the current implementation only supports non-categorical data, since the noise terms need to be reconstructed. **Research Paper**: Janzing et al. *Quantifying causal contributions via structure preserving interventions*. arXiv:2007.00714, 2021. :param causal_model: The fitted invertible structural causal model. :param target_node: Node of interest. :param baseline_samples: Samples for which the influence should be estimated. :param noise_feature_samples: Optional noise samples of upstream nodes used as 'background' samples.. If None is given, new noise samples are generated based on the graph. These samples are used for randomizing features that are not in the subset. :param subset_scoring_func: Set function for estimating the quantity of interest based. This function expects two inputs; the outcome of the model for some samples if certain features are permuted and the outcome of the model for the same samples when no features were permuted. By default, the difference between means of these samples are estimated. :param num_noise_feature_samples: If no noise_feature_samples are given, noise samples are drawn from the graph. This parameter indicates how many. :param max_batch_size: Maximum batch size for estimating multiple predictions at once. This has a significant influence on the overall memory usage. If set to -1, all samples are used in one batch. :param shapley_config: :class:`~dowhy.gcm.shapley.ShapleyConfig` for the Shapley estimator. :return: A list of dictionaries indicating the intrinsic causal influence of a node on the target for a particular sample. This is, each dictionary belongs to one baseline sample. """ validate_node(causal_model.graph, target_node) causal_model = InvertibleStructuralCausalModel(node_connected_subgraph_view(causal_model.graph, target_node)) feature_samples, tmp_noise_feature_samples = noise_samples_of_ancestors( causal_model, target_node, num_noise_feature_samples ) if has_categorical(feature_samples.to_numpy()): raise ValueError( "The current implementation requires all variables to be numeric, i.e., non-categorical! " "There is at least one node in the graph that is categorical." ) if noise_feature_samples is None: noise_feature_samples = tmp_noise_feature_samples if subset_scoring_func is None: subset_scoring_func = means_difference shapley_vales = feature_relevance_sample( _get_icc_noise_function( causal_model, target_node, "exact", noise_feature_samples, noise_feature_samples.columns, None, None, False ), feature_samples=noise_feature_samples.to_numpy(), baseline_samples=compute_noise_from_data(causal_model, baseline_samples)[ noise_feature_samples.columns ].to_numpy(), subset_scoring_func=subset_scoring_func, max_batch_size=max_batch_size, shapley_config=shapley_config, ) return [ {(predecessor, target_node): shapley_vales[i][q] for q, predecessor in enumerate(noise_feature_samples.columns)} for i in range(shapley_vales.shape[0]) ]
def _estimate_iccs( attribution_func: Callable[[np.ndarray, np.ndarray], float], prediction_method: Callable[[np.ndarray], np.ndarray], noise_samples: np.ndarray, baseline_noise_samples: np.ndarray, max_batch_size: int, shapley_config: ShapleyConfig, ): target_values = shape_into_2d(prediction_method(baseline_noise_samples)) def icc_set_function(subset: np.ndarray) -> Union[np.ndarray, float]: if np.all(subset == 1): # In case of the full subset (no randomization), we get the same predictions as when we apply the # prediction method to the samples of interest, since all noise samples are replaced with a sample of # interest. predictions = target_values elif np.all(subset == 0): # In case of the empty subset (all are jointly randomize), it boils down to taking the average over all # predictions, seeing that the randomization yields the same values for each sample of interest (none of the # samples of interest are used to replace a (jointly) 'randomized' sample). predictions = repmat(np.mean(prediction_method(noise_samples), axis=0), baseline_noise_samples.shape[0], 1) else: predictions = marginal_expectation( prediction_method, feature_samples=noise_samples, baseline_samples=baseline_noise_samples, baseline_feature_indices=np.arange(0, noise_samples.shape[1])[subset == 1], return_averaged_results=True, feature_perturbation="randomize_columns_jointly", max_batch_size=max_batch_size, ) return attribution_func(shape_into_2d(predictions), target_values) return estimate_shapley_values(icc_set_function, noise_samples.shape[1], shapley_config) def _get_icc_noise_function( causal_model: InvertibleStructuralCausalModel, target_node: Any, prediction_model: Union[PredictionModel, ClassificationModel, str], noise_samples: np.ndarray, node_names: Iterator[Any], target_samples: np.ndarray, auto_assign_quality: auto.AssignmentQuality, target_is_categorical: bool, ) -> Callable[[np.ndarray], np.ndarray]: if isinstance(prediction_model, str) and prediction_model not in ("approx", "exact"): raise ValueError( "Invalid value for prediction_model: %s! This should either be an instance of a PredictionModel or" "one of the two string options 'exact' or 'approx'." % prediction_model ) if not isinstance(prediction_model, str): prediction_model.fit(noise_samples, target_samples) return prediction_model.predict if prediction_model == "approx": prediction_model = auto.select_model(noise_samples, target_samples, auto_assign_quality) prediction_model.fit(noise_samples, target_samples) if target_is_categorical: return prediction_model.predict_probabilities else: return prediction_model.predict else: # Exact model def exact_model(X: np.ndarray) -> np.ndarray: return compute_data_from_noise(causal_model, pd.DataFrame(X, columns=[x for x in node_names]))[ target_node ].to_numpy() if target_is_categorical: list_of_classes = cast(ClassifierFCM, causal_model.causal_mechanism(target_node)).classifier_model.classes def prediction_method(X): return (shape_into_2d(exact_model(X)) == list_of_classes).astype(float) return prediction_method else: return exact_model