Source code for dowhy.causal_refuters.overrule.BCS.overlap_boolean_rule

"""OverlapBooleanRule.

This module implements the boolean ruleset estimator from OverRule [1]. Code is adapted (with some simplifications)
from https://github.com/clinicalml/overlap-code, under the MIT License.

[1] Oberst, M., Johansson, F., Wei, D., Gao, T., Brat, G., Sontag, D., & Varshney, K. (2020). Characterization of
Overlap in Observational Studies. In S. Chiappa & R. Calandra (Eds.), Proceedings of the Twenty Third International
Conference on Artificial Intelligence and Statistics (Vol. 108, pp. 788–798). PMLR. https://arxiv.org/abs/1907.04138
"""

import logging
from typing import Union

import cvxpy as cvx
import numpy as np
import pandas as pd
from sklearn.metrics import roc_auc_score

from .beam_search import beam_search


class OverlapBooleanRule:
    """Overlap Boolean Rule class in the style of scikit-learn"""

    def __init__(
        self,
        alpha=0.95,
        lambda0=1e-2,
        lambda1=1e-2,
        K=20,
        D=20,
        B=10,
        iterMax=10,
        eps=1e-6,
        silent=False,
        verbose=False,
        solver="ECOS",
        rounding="greedy_sweep",
    ):
        """
        Learn Boolean Rules in Disjunctive Normal Form to describe the positive class.

        :param alpha: Fraction of the positive samples to ensure are included in the rules, defaults to 0.95
        :type alpha: float, optional
        :param lambda0: Regularization on the # of rules, defaults to 1e-2
        :type lambda0: float, optional
        :param lambda1: Regularization on the # of literals, defaults to 1e-2
        :type lambda1: float, optional
        :param K: Maximum results returned during beam search, defaults to 20
        :type K: int, optional
        :param D: Maximum extra rules per beam search iteration, defaults to 20
        :type D: int, optional
        :param B: Width of beam search, defaults to 10
        :type B: int, optional
        :param iterMax: Maximum number of iterations of column generation, defaults to 10
        :type iterMax: int, optional
        :param eps: Numerical tolerance on comparisons, defaults to 1e-6
        :type eps: float, optional
        :param silent: Silence non-optimizer output, defaults to False
        :type silent: bool, optional
        :param verbose: Verbose optimizer output, defaults to False
        :type verbose: bool, optional
        :param solver: Linear programming solver used by CVXPY to solve the LP relaxation, defaults to 'ECOS'
        :type solver: str, optional
        :param rounding: Strategy to perform rounding, either 'greedy' or 'greedy_sweep', defaults to 'greedy_sweep'
        :type rounding: str, optional
        """
        self.alpha = alpha
        self.lambda0 = lambda0
        self.lambda1 = lambda1
        self.K = K
        self.D = D
        self.B = B
        self.iterMax = iterMax
        self.eps = eps
        self.silent = silent
        self.verbose = verbose
        self.solver = solver
        self.rounding = rounding
        self.logger = logging.getLogger(__name__)
        self.lp_obj_value = None

        # For get_params / set_params
        self.valid_params = [
            "alpha",
            "lambda0",
            "lambda1",
            "K",
            "D",
            "B",
            "iterMax",
            "silent",
            "eps",
        ]

    def __getstate__(self):
        state = self.__dict__.copy()
        del state["logger"]
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self.logger = None
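
    # A minimal construction/usage sketch (illustrative, not part of the original
    # source). The estimator follows scikit-learn conventions, so the typical
    # workflow is construct -> fit -> predict; `X_bin` is a hypothetical
    # DataFrame of binarized covariates, as produced upstream in OverRule:
    #
    #   estimator = OverlapBooleanRule(alpha=0.9, lambda0=1e-2, lambda1=1e-3)
    #   estimator.fit(X_bin, y)            # y in {+1, 0} or {+1, -1}
    #   labels = estimator.predict(X_bin)  # 0/1 per sample
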
    def fit(self, X: pd.DataFrame, y: Union[np.ndarray, pd.DataFrame]):
        """
        Fit model to training data.

        :param X: Pandas DataFrame containing covariates
        :param y: +1 for Overlap/Support (depending on rules being learned), 0 for non-overlap, and -1 for
            background samples. Should only contain (+1/0) for overlap rules, or (+1/-1) for learning support rules.
        """
        if not self.silent:
            self.logger.info("Learning Boolean rule set in DNF form with Hamming loss")

        # Overlap / Support (y = +1), non-overlap (y = 0), and uniform background (y = -1) samples
        O = np.where(y > 0)[0]
        N = np.where(y == 0)[0]
        U = np.where(y < 0)[0]
        nO = len(O)
        nN = len(N)
        nU = len(U)

        # We should always have overlap samples, and either background or non-overlap samples.
        # This will throw an error if, for example, all samples are considered to
        # be in the overlap region.
        assert nO > 0 and (
            nU > 0 or nN > 0
        ), "Received positive samples, but no negative samples for learning Boolean Rules"
        assert nU == 0 or nN == 0

        # Initialize with empty and singleton conjunctions, i.e. X plus all-ones feature
        # Feature indicator and conjunction matrices
        z = pd.DataFrame(np.eye(X.shape[1], X.shape[1] + 1, 1, dtype=int), index=X.columns)
        A = np.hstack((np.ones((X.shape[0], 1), dtype=int), X))

        # Iteration counter
        self.it = 0

        # Formulate master LP
        # Variables
        w = cvx.Variable(A.shape[1], nonneg=True)
        xiO = cvx.Variable(nO, nonneg=True)

        # Objective function (no penalty on empty conjunction)
        lambdas = self.lambda0 + self.lambda1 * z.sum().values
        lambdas[0] = 0
        if nU:
            obj = cvx.Minimize(cvx.sum(A[U, :] @ w) / nU + lambdas @ w)
        elif nN:
            obj = cvx.Minimize(cvx.sum(A[N, :] @ w) / nN + lambdas @ w)

        # Constraints
        # This gets activated for DNF
        constraints = [cvx.sum(xiO) <= (1 - self.alpha) * nO, xiO + A[O, :] @ w >= 1]

        # Solve problem
        prob = cvx.Problem(obj, constraints)
        prob.solve(verbose=self.verbose, solver=self.solver)

        # Extract dual variables
        r = np.zeros_like(y, dtype=float)
        r[O] = -constraints[1].dual_value
        if nU:
            r[U] = 1.0 / nU
        elif nN:
            r[N] = 1.0 / nN

        if not self.silent:
            self.logger.info("Initial solve completed")

        # Beam search for conjunctions with negative reduced cost
        # Most negative reduced cost among current variables
        UB = np.dot(r, A) + lambdas
        UB = min(UB.min(), 0)
        v, zNew, Anew = beam_search(r, X, self.lambda0, self.lambda1, K=self.K, UB=UB, eps=self.eps, B=self.B, D=self.D)

        while (v < -self.eps).any() and (self.it < self.iterMax):
            # Negative reduced costs found
            self.it += 1

            if not self.silent:
                self.logger.info("Iteration: %d, Objective: %.4f" % (self.it, prob.value))

            # Add to existing conjunctions
            z = pd.concat([z, zNew], axis=1, ignore_index=True)
            A = np.concatenate((A, Anew), axis=1)

            # Reformulate master LP
            # Variables
            w = cvx.Variable(A.shape[1], nonneg=True)

            # Objective function
            lambdas = np.concatenate((lambdas, self.lambda0 + self.lambda1 * zNew.sum().values))
            if nU:
                obj = cvx.Minimize(cvx.sum(A[U, :] @ w) / nU + lambdas @ w)
            elif nN:
                obj = cvx.Minimize(cvx.sum(A[N, :] @ w) / nN + lambdas @ w)

            # Constraints
            constraints = [cvx.sum(xiO) <= (1 - self.alpha) * nO, xiO + A[O, :] @ w >= 1]

            # Solve problem
            prob = cvx.Problem(obj, constraints)
            prob.solve(verbose=self.verbose, solver=self.solver)

            # Extract dual variables
            r = np.zeros_like(y, dtype=float)
            r[O] = -constraints[1].dual_value
            if nU:
                r[U] = 1.0 / nU
            elif nN:
                r[N] = 1.0 / nN

            # Beam search for conjunctions with negative reduced cost
            # Most negative reduced cost among current variables
            UB = np.dot(r, A) + lambdas
            UB = min(UB.min(), 0)
            v, zNew, Anew = beam_search(
                r, X, self.lambda0, self.lambda1, K=self.K, UB=UB, eps=self.eps, B=self.B, D=self.D
            )

        # Save generated conjunctions and coefficients
        self.z = z
        w = w.value
        self.w_raw = w
        self.lp_obj_value = prob.value

        self.round_(X, y, scoring=self.rounding)
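
    # Illustrative sketch of the label conventions `fit` expects (names here
    # are hypothetical): support rules are learned against uniform background
    # samples (y = -1), overlap rules against non-overlap samples (y = 0), and
    # the two kinds of negative labels must not be mixed:
    #
    #   X_all = pd.concat([X_pos, X_ref], ignore_index=True)
    #   y = np.concatenate([np.ones(len(X_pos)), -np.ones(len(X_ref))])
    #   estimator.fit(X_all, y)  # learns support rules (+1 vs. -1)
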
    def greedy_round_(self, X: pd.DataFrame, y: Union[np.ndarray, pd.DataFrame], xi: float = 0.5, use_lp: bool = False):
        """
        Round the rule coefficients to integer values.

        For DNF, this starts with no conjunctions, and adds them greedily based on a cost, which penalizes (any)
        inclusion of negative samples and rewards (new) inclusion of positive samples, and goes until it covers at
        least an alpha fraction of positive samples.

        :param X: Pandas DataFrame containing covariates
        :param y: +1 for Overlap/Support (depending on rules being learned), 0 for non-overlap, and -1 for
            background samples. Should only contain (+1/0) for overlap rules, or (+1/-1) for learning support rules.
        :param xi: Reward for including positive samples, relative to cost (1) for including negative samples
        :param use_lp: Restrict to those conjunctions where the LP coefficients are positive. Note that the LP makes
            a difference regardless, as we only consider the rules generated by column generation here.
        """
        A = self.compute_conjunctions(X)
        R = np.arange(0, A.shape[1])  # Remaining conjunctions
        U = []  # Used conjunctions
        C = np.zeros(X.shape[0])  # Coverage indicator
        MAX_ITER = 1000
        i = 0

        # Restrict conjunctions to those used by LP
        if use_lp:
            R = [R[i] for i in range(len(R)) if self.w_raw[i] > 0]

        # NOTE: This is a greedy approach, so it does not incorporate lambda0 explicitly.
        # Similarly, it will prefer a larger number of smaller rules if lambda1 is set
        # to a larger value, because the incremental cost will be lower.
        while (i < MAX_ITER) and (C[y == 1].mean() < self.alpha):
            if (y == 0).sum() > 0:
                # Fraction of non-overlap samples that each conjunction covers
                neg_cover = (A[(y == 0), :][:, R]).mean(0)
            else:
                neg_cover = 0

            if (y == -1).sum() > 0:
                # Fraction of reference samples that each conjunction covers
                ref_cover = (A[(y == -1), :][:, R]).mean(0)
            else:
                ref_cover = 0

            # Regularization (for each conjunction)
            reg = self.lambda1 * self.z.values[:, R].sum(0)

            # Positive samples newly covered (for each conjunction)
            pos_new_cover = (A[(y == 1) & (C < 1), :][:, R] + 1e-8).mean(0)

            # Costs (for each conjunction)
            costs = neg_cover + ref_cover + reg - xi * pos_new_cover
            r = np.argmin(costs)  # Find min-cost conjunction
            C = (C + A[:, R[r]]) > 0.0  # Update coverage
            U.append(R[r])
            R = np.array([R[i] for i in range(len(R)) if not i == r])
            i += 1

        # Zero out the rules and only take those which are used
        self.w = np.zeros(A.shape[1])
        self.w[U] = 1
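
    # The greedy criterion above can be summarized as (a restatement of the
    # code, not part of the original source):
    #
    #   cost(rule) = frac_negatives_covered + frac_background_covered
    #                + lambda1 * n_literals - xi * frac_new_positives_covered
    #
    # Rules are added in order of increasing cost until at least an alpha
    # fraction of the positive samples is covered.
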
    def round_(
        self,
        X: pd.DataFrame,
        y: Union[np.ndarray, pd.DataFrame],
        scoring: str = "greedy",
        xi=None,
        use_lp: bool = True,
    ):
        """
        Round the rule coefficients to integer values via a greedy approach, either using a fixed reward
        (`scoring="greedy"`) or optimizing the reward for including positive examples according to balanced
        accuracy on classifying positive vs negative samples (`scoring="greedy_sweep"`).

        :param X: Pandas DataFrame containing covariates
        :param y: +1 for Overlap/Support (depending on rules being learned), 0 for non-overlap, and -1 for
            background samples. Should only contain (+1/0) for overlap rules, or (+1/-1) for learning support rules.
        :param scoring: Rounding strategy, either 'greedy' or 'greedy_sweep', defaults to 'greedy'
        :param xi: Reward for including positive samples, relative to cost (1) for including negative samples.
            Should be a single value for `scoring="greedy"`, or an array of values for `scoring="greedy_sweep"`.
            For the latter, defaults to `np.logspace(np.log10(0.01), 0.5, 20)`.
        :param use_lp: Restrict to those conjunctions where the LP coefficients are positive. Note that the LP makes
            a difference regardless, as we only consider the rules generated by column generation here.
        """
        if scoring == "greedy":
            if xi is None:
                xi = 0.5
            self.greedy_round_(X, y, xi=xi, use_lp=use_lp)
        elif scoring == "greedy_sweep":
            if xi is None:
                xi = np.logspace(np.log10(0.01), 0.5, 20)
            xis = np.array([xi]).ravel()
            best_xi = xis[0]
            if len(xis) > 1:
                best_xi = None
                best_auc = 0
                for xii in xis:
                    self.greedy_round_(X, y, xi=xii, use_lp=use_lp)
                    auc = roc_auc_score(y, self.predict(X))
                    # Small tolerance on comparisons.
                    # This can be useful to break ties and favor larger values of xi.
                    if auc > best_auc - 0.001:
                        best_xi = xii
                    if auc > best_auc:
                        best_auc = auc
            self.greedy_round_(X, y, xi=best_xi, use_lp=use_lp)
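
    # Illustrative sketch: rounding can be re-run after `fit` with a custom
    # reward sweep, since the conjunctions found by column generation are
    # stored on the estimator (`X_bin` and `y` as in the earlier sketches):
    #
    #   estimator.round_(X_bin, y, scoring="greedy_sweep", xi=np.logspace(-2, 0, 10))
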
    def get_objective_value(self, X, o, rounded=True):
        """
        Compute the LP objective (regularized coverage of background samples) for the current
        rules, using either the rounded coefficients or the raw LP coefficients.

        :param X: Pandas DataFrame containing covariates
        :param o: Labels; only background samples (o < 0) enter the objective
        :param rounded: Use the rounded integer coefficients, defaults to True
        """
        w = self.w if rounded else self.w_raw

        U = np.where(o < 0)[0]
        nU = len(U)
        assert nU > 0

        A = self.compute_conjunctions(X)
        lambdas = self.lambda0 + self.lambda1 * self.z.sum().values
        lambdas[0] = 0

        obj = np.sum(A[U, :].dot(w)) / nU + lambdas.dot(w)
        return obj
    def compute_conjunctions(self, X):
        """Compute conjunctions of features specified in self.z"""
        try:
            # A sample satisfies a conjunction iff it lacks none of the features the conjunction requires
            A = 1 - (np.dot(1 - X, self.z) > 0)
        except AttributeError:
            raise AttributeError("Attribute 'z' does not exist, please fit model first.")
        return A
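
    # Worked example of the conjunction computation (illustrative): a sample
    # satisfies a conjunction iff it has every feature the conjunction
    # requires. With features as rows of z and conjunctions as columns,
    #
    #   X = [[1, 0]]             # one sample, with feature 0 only
    #   z = [[0, 1, 0, 1],       # conjunctions: {}, {0}, {1}, {0, 1}
    #        [0, 0, 1, 1]]
    #
    # 1 - ((1 - X) @ z > 0) yields A = [[1, 1, 0, 0]]: the sample satisfies
    # the empty conjunction and {0}, but not {1} or {0, 1}.
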
    def predict_(self, X, w):
        """Predict whether points belong to overlap region"""
        # Compute conjunctions of features
        A = self.compute_conjunctions(X)
        # Predict labels
        return (np.dot(A, w) > 0).astype(int)
    def predict(self, X):
        """Predict whether points belong to overlap region"""
        # Use helper function
        return self.predict_(X, self.w)
    def predict_rules(self, X):
        """Predict, for each point, which individual rules it satisfies"""
        # Compute conjunctions of features
        A = self.compute_conjunctions(X)
        return ((A * self.w) > 0).astype(int)
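
    # Illustrative distinction between the prediction helpers: `predict`
    # collapses the rule set into one 0/1 label per sample (the disjunction
    # over active rules), while `predict_rules` keeps one indicator column per
    # conjunction, which is useful for inspecting which rule fires:
    #
    #   labels = estimator.predict(X_bin)       # shape (n_samples,)
    #   fired = estimator.predict_rules(X_bin)  # shape (n_samples, n_conjunctions)
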
    def get_params(self):
        """Returns estimator parameters"""
        return {k: getattr(self, k) for k in self.valid_params}
    def set_params(self, **params):
        """Sets estimator parameters"""
        if not params:
            return self

        for k, v in params.items():
            if k in self.valid_params:
                setattr(self, k, v)

        return self
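

# A minimal end-to-end sketch (not part of the original module), assuming
# already-binary covariates so that no binarization step is needed. Because of
# the relative import above, run it as a module, e.g.:
#   python -m dowhy.causal_refuters.overrule.BCS.overlap_boolean_rule
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    # Binary covariates; the "positive" class is exactly the samples with feature "a" set
    X_demo = pd.DataFrame(rng.integers(0, 2, size=(200, 3)), columns=["a", "b", "c"])
    y_demo = np.where(X_demo["a"] == 1, 1, -1)  # +1 vs. uniform background (-1)

    estimator = OverlapBooleanRule(alpha=0.9, silent=True)
    estimator.fit(X_demo, y_demo)
    # The learned DNF should recover something close to the single rule "a == 1"
    print("Train accuracy:", (estimator.predict(X_demo) == (y_demo > 0)).mean())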