Source code for dowhy.causal_refuters.overrule.ruleset

"""Ruleset estimator class for OverRule.

This module implements the boolean ruleset estimator from OverRule [1]. Code is adapted (with some simplifications)
from https://github.com/clinicalml/overlap-code, under the MIT License.

[1] Oberst, M., Johansson, F., Wei, D., Gao, T., Brat, G., Sontag, D., & Varshney, K. (2020). Characterization of
Overlap in Observational Studies. In S. Chiappa & R. Calandra (Eds.), Proceedings of the Twenty Third International
Conference on Artificial Intelligence and Statistics (Vol. 108, pp. 788–798). PMLR. https://arxiv.org/abs/1907.04138
"""
from typing import Callable, Dict, List, Optional

import numpy as np
import pandas as pd

from .BCS.load_process_data_BCS import FeatureBinarizer
from .BCS.overlap_boolean_rule import OverlapBooleanRule
from .utils import rule_str, sample_reference


class BCSRulesetEstimator:
    """Ruleset estimator based on Boolean Rules with Column Generation.

    Operates according to a scikit-learn interface with a few additional methods.
    """

    # Parameters that are captured by the wrapped estimator / binarizer when they are
    # constructed in `init_estimator_`; changing any of them requires a rebuild.
    _REBUILD_PARAMS = frozenset({"lambda0", "lambda1", "cat_cols", "negations", "num_thresh"})

    def __init__(
        self,
        n_ref_multiplier: float = 1.0,
        lambda0: float = 0.0,
        lambda1: float = 0.0,
        cat_cols: Optional[List] = None,
        negations: bool = True,
        num_thresh: int = 9,
        seed: Optional[int] = None,
        ref_range: Optional[Dict[str, Dict]] = None,
        thresh_override: Optional[Dict] = None,
        **kwargs,
    ):
        """
        Initializes the estimator.

        `**kwargs` are passed to OverlapBooleanRule (see ./BCS/overlap_boolean_rule.py for description of arguments)

        :param n_ref_multiplier: Reference sample count multiplier, only used for estimating support, defaults to 1.0,
            but should be set to zero for Overlap rules
        :type n_ref_multiplier: float, optional
        :param lambda0: Regularization on the # of rules, defaults to 0.0
        :type lambda0: float, optional
        :param lambda1: Regularization on the # of literals, defaults to 0.0
        :type lambda1: float, optional
        :param cat_cols: Set of categorical columns, defaults to None
        :type cat_cols: Optional[List], optional
        :param negations: Include negation of literals, defaults to True
        :type negations: bool, optional
        :param num_thresh: Number of bins to discretize continuous variables, defaults to 9 (for deciles)
        :type num_thresh: int, optional
        :param seed: Random seed for reference samples, only used for estimating support, defaults to None
        :type seed: int, optional
        :param ref_range: Manual override of the range for reference samples, given as a dictionary of the form
            `ref_range = {c: {"is_binary": True/False, "min": min_value, "max": max_value}}`
        :type ref_range: Optional[Dict], optional
        :param thresh_override: Manual override of the thresholds for continuous features, given as a dictionary like
            the following, will only be applied to continuous features with more than num_thresh unique values
            `thresh_override = {column_name: np.linspace(0, 100, 10)}`
        :type thresh_override: Optional[Dict], optional
        """
        # Parameters
        self.n_ref_multiplier = n_ref_multiplier
        self.lambda0 = lambda0
        self.lambda1 = lambda1
        self.cat_cols = cat_cols if cat_cols else []
        self.negations = negations
        self.num_thresh = num_thresh
        self.seed = seed
        self.ref_range = ref_range
        self.thresh_override = thresh_override
        self.kwargs = kwargs

        # Bookkeeping
        self.refSamples = None
        self.overlapSamples = None

        # Initialize estimators
        self.init_estimator_()

        # Names exposed through get_params / set_params.
        self.valid_params = ["lambda0", "lambda1", "cat_cols", "n_ref_multiplier", "negations", "num_thresh", "seed"]

    def __getstate__(self):
        """Return the instance state for pickling, with any `logger` kwarg blanked out.

        The kwargs dictionary is copied before the logger is replaced so that
        pickling does not modify the live estimator's `self.kwargs`.
        """
        state = self.__dict__.copy()
        if "logger" in self.kwargs:
            # `__dict__.copy()` is shallow: replace the nested dict instead of writing into it.
            state["kwargs"] = {**self.kwargs, "logger": None}
        return state

    def __setstate__(self, state):
        """Restore the instance state produced by `__getstate__`."""
        self.__dict__.update(state)

    @staticmethod
    def _as_frame(x):
        """Return `x` unchanged if it is a DataFrame, else wrap its columns as `x0, x1, ...`."""
        if isinstance(x, pd.DataFrame):
            return x
        return pd.DataFrame({f"x{i}": x[:, i] for i in range(x.shape[1])})

    def init_estimator_(self):
        """Initialize rule set estimator and feature binarizer."""
        self.M = OverlapBooleanRule(lambda0=self.lambda0, lambda1=self.lambda1, **self.kwargs)
        self.FeatureBinarizer = FeatureBinarizer(
            negations=self.negations,
            colCateg=self.cat_cols,
            numThresh=self.num_thresh,
            threshOverride=self.thresh_override,
        )

    def fit(self, x, o=None):
        """
        Fit rules for either characterizing support (if `o` is not provided) or for characterizing overlap, in which
        case `o` should be a vector indicating overlap by 1 and non-overlap by 0.

        This function is primarily a wrapper around the OverlapBooleanRule estimator, making sure that features are
        binarized before being fed into the ruleset estimator, constructing reference samples for the support
        characterization, and so on.

        `self.relative_volume` is only (re)computed when at least one reference sample is drawn.

        :param x: Samples of covariates
        :type x: Pandas DataFrame or Numpy Array, shape (n, d)
        :param o: Binary indicator for whether or not a sample belongs in the overlap region, defaults to None.
            If provided, should have the same length as `x`
        :type o: Pandas DataFrame or Numpy Array, shape (n, )
        :return: The fitted estimator (`self`)
        """
        n, dim = x.shape[0], x.shape[1]
        n_ref = int(n * dim * self.n_ref_multiplier)

        if o is None:
            o = np.ones((n,))

        # Convert to dataframe if not
        frame = self._as_frame(x)

        # Format labels as a flat array (np.asarray also accepts plain sequences)
        if isinstance(o, (pd.DataFrame, pd.Series)):
            o = o.values.ravel()
        else:
            o = np.asarray(o).ravel()

        # Sample from reference measure and construct features
        self.refSamples = sample_reference(frame, n=n_ref, seed=self.seed, ref_range=self.ref_range)

        # Add reference samples, labelled -1
        data = pd.concat([frame, self.refSamples], axis=0, sort=False)
        o = np.hstack([o, -np.ones(n_ref)])

        # Binarize features (fit to data only, i.e. the first n rows, then transform everything)
        self.FeatureBinarizer.fit(data.iloc[:n])
        binarized = self.FeatureBinarizer.transform(data)

        # Fit estimator
        self.M.fit(binarized, o)

        # Store reference volume
        if n_ref > 0:
            self.relative_volume = self.predict(self.refSamples).mean()

        return self

    def predict(self, x):
        """
        Predict whether or not X lies in the overlap region (1 = True).

        :param x: Samples of covariates
        :type x: Pandas DataFrame or Numpy Array, shape (n, d)
        :return: Predictions of the underlying rule set estimator for each sample
        """
        # Construct features dataframe; missing binarized values are treated as 0
        binarized = self.FeatureBinarizer.transform(self._as_frame(x)).fillna(0)
        return self.M.predict(binarized)

    def predict_rules(self, x):
        """
        Predict rules activated by x

        :param x: Samples of covariates
        :type x: Pandas DataFrame or Numpy Array, shape (n, d)
        :return: Matrix with binary values, of shape (n, r), where r is the total number of rules considered by the
            estimator, and where 1 indicates that the sample matches the rule, and 0 indicates otherwise.
        :rtype: Numpy Array, shape (n, r)
        """
        # Construct features dataframe; missing binarized values are treated as 0
        binarized = self.FeatureBinarizer.transform(self._as_frame(x)).fillna(0)
        return self.M.predict_rules(binarized)

    def rules(
        self,
        as_str: bool = False,
        transform: Optional[Callable[[str, float], float]] = None,
        fmt: str = "%.3f",
        labels: Optional[Dict[str, str]] = None,
    ):
        """
        Return rules learned by the estimator.

        :param as_str: Return a string if True, otherwise a list of rules (each a list of
            `(feature, operator, value)` tuples), defaults to False
        :type as_str: bool, optional
        :param transform: A function that takes key-value pairs for rules and thresholds and transforms the value.
            It is called with the original feature name (not the display label).
            This function is used to re-scale standardized data, defaults to None
        :type transform: Optional[Callable[[str, float], float]], optional
        :param fmt: Formatting string for float values, for printing rules with thresholds, defaults to "%.3f"
        :type fmt: str, optional
        :param labels: Dictionary mapping from original feature names to display names when printing rules, any
            feature not specified here will default to the original name, defaults to None (no renaming)
        :type labels: Optional[Dict[str, str]], optional
        """
        labels = {} if labels is None else labels
        w, z = self.M.w, self.M.z

        def t_(k, v):
            return v if transform is None else transform(k, v)

        conjunctions = []
        # Only rules with a non-zero entry in `w` are reported.
        for j in np.where(w)[0]:
            # Rows of `z` flagged with 1 in column j are the literals of rule j; the row
            # index levels are read as (feature, operator, value).
            literals = z[z[j] == 1].index
            features = literals.get_level_values(0).values
            operators = literals.get_level_values(1).values
            values = literals.get_level_values(2).values
            conjunctions.append(
                [(labels.get(f, f), op, t_(f, v)) for f, op, v in zip(features, operators, values)]
            )

        if as_str:
            return rule_str(conjunctions, fmt=fmt)
        return conjunctions

    def get_params(self, deep=False):
        """Return estimator parameters

        :param deep: If True, also include the parameters of the wrapped rule set estimator, defaults to False
        :type deep: bool, optional
        :return: Dictionary mapping parameter names to their current values
        """
        params = {k: getattr(self, k) for k in self.valid_params}
        if deep:
            return {**params, **self.M.get_params(deep=True)}
        return params

    def set_params(self, **params):
        """Set estimator parameters

        Keys listed in `self.valid_params` are set on this object; keys listed in the wrapped estimator's
        `valid_params` are stored in `self.kwargs`. Any other key is ignored. The wrapped estimator and the
        feature binarizer are rebuilt whenever a parameter they were constructed with changes, so that the new
        value takes effect on the next call to `fit` (this discards any previously fitted state).

        :return: The estimator (`self`)
        """
        if not params:
            return self

        reinit = False
        for k, v in params.items():
            if k in self.valid_params:
                if k == "cat_cols":
                    # Same normalisation as in __init__: None / empty means "no categorical columns".
                    v = v if v else []
                setattr(self, k, v)
                if k in self._REBUILD_PARAMS:
                    reinit = True
            elif k in self.M.valid_params:
                reinit = True
                self.kwargs[k] = v

        if reinit:
            self.init_estimator_()

        return self