"""This module defines multiple implementations of the abstract class :class:`~dowhy.gcm.graph.StochasticModel`.
Classes in this module should be considered experimental, meaning there might be breaking API changes in the future.
"""
import warnings
from typing import Dict, Optional, Tuple, Union
import numpy as np
import scipy
from scipy.stats import norm, rv_continuous, rv_discrete
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
from sklearn.mixture import BayesianGaussianMixture
from dowhy.gcm.causal_mechanisms import StochasticModel
from dowhy.gcm.divergence import estimate_kl_divergence_continuous
from dowhy.gcm.util.general import shape_into_2d
# Curated continuous distributions that should be tried first during automatic
# distribution selection (common families), followed by every other continuous
# distribution that scipy.stats provides.
_CONTINUOUS_DISTRIBUTIONS = [
    scipy.stats.norm,
    scipy.stats.laplace,
    scipy.stats.t,
    scipy.stats.uniform,
    scipy.stats.rayleigh,
]
# Names of the curated distributions above. The duplicate check below must compare
# attribute *names*; comparing the name string against the distribution objects
# themselves (as done previously) was always True and silently added duplicates.
_curated_continuous_names = {_dist.name for _dist in _CONTINUOUS_DISTRIBUTIONS}
_CONTINUOUS_DISTRIBUTIONS.extend(
    getattr(scipy.stats, d)
    for d in dir(scipy.stats)
    if isinstance(getattr(scipy.stats, d), scipy.stats.rv_continuous) and d not in _curated_continuous_names
)
_DISCRETE_DISTRIBUTIONS = [
    getattr(scipy.stats, d) for d in dir(scipy.stats) if isinstance(getattr(scipy.stats, d), scipy.stats.rv_discrete)
]
# Map distribution name -> distribution object for fast lookup by name.
_CONTINUOUS_DISTRIBUTIONS = {x.name: x for x in _CONTINUOUS_DISTRIBUTIONS}
_DISCRETE_DISTRIBUTIONS = {x.name: x for x in _DISCRETE_DISTRIBUTIONS}
del _curated_continuous_names
class ScipyDistribution(StochasticModel):
    """Represents any parametric distribution that can be modeled by scipy."""

    def __init__(self, scipy_distribution: Optional[Union[rv_continuous, rv_discrete]] = None, **parameters) -> None:
        """Initializes a stochastic model that allows to sample from a parametric distribution implemented in Scipy.

        For instance, to use a beta distribution with parameters a=2 and b=0.5:
            ScipyDistribution(stats.beta, a=2, b=0.5)
        Or a Gaussian distribution with mean 2 and standard deviation 0.5:
            ScipyDistribution(stats.norm, loc=2, scale=0.5)

        Note that the parameter names need to coincide with the parameter names in the corresponding Scipy
        implementations. See https://docs.scipy.org/doc/scipy/tutorial/stats.html for more information.

        :param scipy_distribution: A continuous or discrete parametric distribution implemented in Scipy. If None,
                                   a suitable continuous distribution is selected automatically in fit().
        :param parameters: Set of parameters of the parametric distribution. If given, the parameters are treated
                           as fixed, i.e. fit() will not re-estimate them.
        """
        self._distribution = scipy_distribution
        self._parameters = parameters
        # Explicitly provided parameters are considered fixed: fit() then leaves them untouched.
        self._fixed_parameters = len(parameters) > 0

    def draw_samples(self, num_samples: int) -> np.ndarray:
        """Draws random samples from the wrapped scipy distribution.

        :param num_samples: Number of samples to draw.
        :return: The drawn samples, reshaped into a 2D array.
        :raises ValueError: If no distribution or no parameters are available, i.e. the model has neither been
                            fitted nor initialized with fixed parameters.
        """
        if len(self._parameters) == 0 or self._distribution is None:
            raise ValueError("Cannot draw samples. Model has not been fit!")
        return shape_into_2d(self._distribution.rvs(size=num_samples, **self.parameters))

    def fit(self, X: np.ndarray) -> None:
        """Fits the model to the given samples.

        If no scipy distribution was provided to the constructor, a suitable continuous distribution is selected
        automatically. Otherwise, unless the parameters were fixed in the constructor, the parameters of the given
        distribution are (re-)estimated via scipy's fit method.

        :param X: Samples to fit the distribution to.
        """
        if self._distribution is None:
            # Currently only support continuous distributions for auto selection.
            best_model, best_parameters = self.find_suitable_continuous_distribution(X)
            self._distribution = best_model
            self._parameters = best_parameters
        elif not self._fixed_parameters:
            # scipy's fit returns a plain tuple of parameter values; map them to their parameter names.
            self._parameters = self.map_scipy_distribution_parameters_to_names(
                self._distribution, self._distribution.fit(shape_into_2d(X))
            )

    @property
    def parameters(self) -> Dict[str, float]:
        # Mapping from parameter name (e.g. 'loc', 'scale', shape parameters) to its value.
        return self._parameters

    @property
    def scipy_distribution(self) -> Optional[Union[rv_continuous, rv_discrete]]:
        # The wrapped scipy distribution object; None if not provided and not fitted yet.
        return self._distribution

    def clone(self):
        """Returns an unfitted copy of this model; fixed parameters (if any) are carried over."""
        if self._fixed_parameters:
            return ScipyDistribution(scipy_distribution=self._distribution, **self._parameters)
        else:
            return ScipyDistribution(scipy_distribution=self._distribution)

    @staticmethod
    def find_suitable_continuous_distribution(
        distribution_samples: np.ndarray, divergence_threshold: float = 10**-2
    ) -> Tuple[rv_continuous, Dict[str, float]]:
        """Tries to find the best fitting continuous parametric distribution of given samples. This is done by fitting
        different parametric models and selecting the one with the smallest KL divergence between observed and generated
        samples.

        :param distribution_samples: Samples to find a suitable distribution for.
        :param divergence_threshold: If a candidate's estimated KL divergence falls below this threshold, the search
                                     stops early and that candidate is returned.
        :return: Tuple of the best fitting scipy distribution and a dictionary mapping its parameter names to the
                 fitted values.
        """
        distribution_samples = shape_into_2d(distribution_samples)
        # Fallback if no candidate can be fitted at all: a standard normal distribution.
        currently_best_distribution = norm
        currently_best_parameters = (0.0, 1.0)
        currently_smallest_divergence = np.inf
        # Estimate distribution parameters from data.
        for distribution in _CONTINUOUS_DISTRIBUTIONS.values():
            # Ignore warnings from fitting process.
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore")
                try:
                    # Fit distribution to data.
                    params = distribution.fit(distribution_samples)
                except ValueError:
                    # Some distributions might not be compatible with the data.
                    continue
                # Separate parts of parameters. scipy's fit returns (*shape parameters, loc, scale).
                arg = params[:-2]
                loc = params[-2]
                scale = params[-1]
                generated_samples = distribution.rvs(size=distribution_samples.shape[0], loc=loc, scale=scale, *arg)
                # Check the KL divergence between the distribution of the given and fitted distribution.
                divergence = estimate_kl_divergence_continuous(distribution_samples, generated_samples)
                if divergence < divergence_threshold:
                    # Good enough fit found; stop searching further candidates.
                    currently_best_distribution = distribution
                    currently_best_parameters = params
                    break
                # Identify if this distribution is better.
                if currently_smallest_divergence > divergence:
                    currently_best_distribution = distribution
                    currently_best_parameters = params
                    currently_smallest_divergence = divergence
        return currently_best_distribution, ScipyDistribution.map_scipy_distribution_parameters_to_names(
            currently_best_distribution, currently_best_parameters
        )

    @staticmethod
    def map_scipy_distribution_parameters_to_names(
        scipy_distribution: Union[rv_continuous, rv_discrete], parameters: Tuple[float]
    ) -> Dict[str, float]:
        """Helper function to obtain a mapping from parameter name to parameter value. Depending whether the
        distribution is discrete or continuous, there are slightly different parameter names. The given parameters are
        assumed to follow the order as provided by the scipy fit function.

        :param scipy_distribution: The scipy distribution.
        :param parameters: The values of the corresponding parameters of the distribution. Here, it is expected to
                           follow the same order as defined by the scipy fit function.
        :return: A dictionary that maps a parameter name to its value.
        """
        # 'shapes' is a comma separated string of the distribution specific shape parameter names (or None).
        if scipy_distribution.shapes:
            parameter_list = [name.strip() for name in scipy_distribution.shapes.split(",")]
        else:
            parameter_list = []
        # Discrete distributions only carry a location parameter; continuous ones carry location and scale.
        if scipy_distribution.name in _DISCRETE_DISTRIBUTIONS:
            parameter_list += ["loc"]
        elif scipy_distribution.name in _CONTINUOUS_DISTRIBUTIONS:
            parameter_list += ["loc", "scale"]
        else:
            raise ValueError(
                "Distribution %s not found in the list of continuous and discrete distributions!"
                % scipy_distribution.name
            )
        parameters_dictionary = {}
        for i, parameter_name in enumerate(parameter_list):
            parameters_dictionary[parameter_name] = parameters[i]
        return parameters_dictionary
class EmpiricalDistribution(StochasticModel):
    """Parameter-free stochastic model that resamples uniformly from the observed training data.

    Drawing a sample returns a uniformly chosen row of the fitted data set, i.e. this model reproduces the
    empirical marginal distribution of the training data but never generates unseen data points. If unseen points
    are required, consider
    :py:class:`BayesianGaussianMixtureDistribution <dowhy.gcm.BayesianGaussianMixtureDistribution>`.
    """

    def __init__(self) -> None:
        # Memorized training data (2D array); None until fit() was called.
        self._data = None

    @property
    def data(self) -> np.ndarray:
        return self._data

    def fit(self, X: np.ndarray) -> None:
        # Fitting simply memorizes the training samples in canonical 2D shape.
        self._data = shape_into_2d(X)

    def draw_samples(self, num_samples: int) -> np.ndarray:
        if self.data is None:
            raise RuntimeError("%s has not been fitted!" % self.__class__.__name__)
        # Sample row indices uniformly with replacement and return the corresponding rows.
        sampled_rows = np.random.choice(self.data.shape[0], size=num_samples, replace=True)
        return self.data[sampled_rows, :]

    def clone(self):
        return EmpiricalDistribution()
class BayesianGaussianMixtureDistribution(StochasticModel):
    """Stochastic model that approximates the data distribution with a Bayesian Gaussian mixture model."""

    def __init__(self) -> None:
        # Fitted sklearn BayesianGaussianMixture instance; None until fit() was called.
        self._gmm_model = None

    def fit(self, X: np.ndarray) -> None:
        X = shape_into_2d(X)
        num_components = BayesianGaussianMixtureDistribution._get_optimal_number_of_components(X)
        self._gmm_model = BayesianGaussianMixture(n_components=num_components, max_iter=1000).fit(X)

    @staticmethod
    def _get_optimal_number_of_components(X: np.ndarray) -> int:
        # Heuristic model selection: run k-means for an increasing number of clusters and keep the count with
        # the best silhouette score, stopping after three consecutive counts without improvement.
        best_score = 0
        best_num_components = 1
        consecutive_non_improvements = 0
        try:
            for num_clusters in range(2, int(np.sqrt(X.shape[0] / 2))):
                cluster_labels = KMeans(n_clusters=num_clusters).fit(X).labels_
                score = silhouette_score(X, cluster_labels, sample_size=5000)
                if score > best_score:
                    best_score = score
                    best_num_components = num_clusters
                    consecutive_non_improvements = 0
                else:
                    consecutive_non_improvements += 1
                    if consecutive_non_improvements >= 3:
                        break
        except ValueError:
            # This error is typically raised when the data is discrete and all points are assigned to less cluster than
            # specified. It can also happen due to duplicated points. In these cases, the current best solution should
            # be sufficient.
            pass
        return best_num_components

    def draw_samples(self, num_samples: int) -> np.ndarray:
        if self._gmm_model is None:
            raise RuntimeError("%s has not been fitted!" % self.__class__.__name__)
        # sample() returns a (samples, component_labels) pair; only the samples are of interest here.
        return shape_into_2d(self._gmm_model.sample(num_samples)[0])

    def __str__(self) -> str:
        return "Approximated data distribution"

    def clone(self):
        return BayesianGaussianMixtureDistribution()