Source code for dowhy.causal_estimators.propensity_score_weighting_estimator

import numpy as np
import pandas as pd
from sklearn import linear_model

from dowhy.causal_estimator import CausalEstimate
from dowhy.causal_estimators.propensity_score_estimator import PropensityScoreEstimator


class PropensityScoreWeightingEstimator(PropensityScoreEstimator):
    """Estimate the effect of a treatment by weighting the data by the
    inverse probability of treatment occurrence.

    A straightforward application of the back-door criterion.

    For a list of standard args and kwargs, see documentation for
    :class:`~dowhy.causal_estimator.CausalEstimator`.

    Supports additional parameters as listed below.
    """

    def __init__(
            self,
            *args,
            min_ps_score=0.05,
            max_ps_score=0.95,
            weighting_scheme='ips_weight',
            propensity_score_model=None,
            recalculate_propensity_score=True,
            propensity_score_column="propensity_score",
            **kwargs):
        """
        :param min_ps_score: Lower bound used to clip the propensity score. Default=0.05
        :param max_ps_score: Upper bound used to clip the propensity score. Default=0.95
        :param weighting_scheme: Weighting method to use. Can be inverse
            propensity score ("ips_weight", default), stabilized IPS score
            ("ips_stabilized_weight"), or normalized IPS score
            ("ips_normalized_weight").
        :param propensity_score_model: The model used to compute the propensity
            score. Can be any classification model that supports the fit() and
            predict_proba() methods. If None, a LogisticRegression model is
            used by default. Default=None
        :param recalculate_propensity_score: If True, force the estimator to
            estimate the propensity score. To use pre-computed propensity
            scores, set this value to False. Default=True
        :param propensity_score_column: Column name that stores the propensity
            score. Default='propensity_score'
        """
        # Required to ensure that self.method_params contains all the
        # information needed to create an object of this class
        args_dict = kwargs
        args_dict.update({
            'min_ps_score': min_ps_score,
            'max_ps_score': max_ps_score,
            'weighting_scheme': weighting_scheme
        })
        super().__init__(
            *args,
            propensity_score_model=propensity_score_model,
            recalculate_propensity_score=recalculate_propensity_score,
            propensity_score_column=propensity_score_column,
            **args_dict)

        self.logger.info("INFO: Using Propensity Score Weighting Estimator")
        self.symbolic_estimator = self.construct_symbolic_estimator(
            self._target_estimand)
        self.logger.info(self.symbolic_estimator)

        # Setting method-specific parameters
        self.weighting_scheme = weighting_scheme
        self.min_ps_score = min_ps_score
        self.max_ps_score = max_ps_score

    def _estimate_effect(self):
        if self.recalculate_propensity_score is True:
            if self.propensity_score_model is None:
                self.propensity_score_model = linear_model.LogisticRegression()
            self.propensity_score_model.fit(
                self._observed_common_causes, self._treatment)
            self._data[self.propensity_score_column] = \
                self.propensity_score_model.predict_proba(
                    self._observed_common_causes)[:, 1]
        else:
            # Check whether the user has provided the propensity score column
            if self.propensity_score_column not in self._data.columns:
                raise ValueError(
                    f"Propensity score column {self.propensity_score_column} does not exist. "
                    "Please specify the column name that has your pre-computed propensity score.")
            else:
                self.logger.info(
                    f"INFO: Using pre-computed propensity score in column {self.propensity_score_column}")

        # trim propensity score weights
        self._data[self.propensity_score_column] = np.minimum(
            self.max_ps_score, self._data[self.propensity_score_column])
        self._data[self.propensity_score_column] = np.maximum(
            self.min_ps_score, self._data[self.propensity_score_column])
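        # For reference (added annotation, not in the original source): with
        # e = clipped propensity score and T = treatment indicator, the
        # unnormalized weights computed below are
        #   ATE ('ips_weight'):  w = T/e + (1-T)/(1-e)
        #   ATT ('tips_weight'): w = T + (1-T)*e/(1-e)
        #   ATC ('cips_weight'): w = T*(1-e)/e + (1-T)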
        # ips ==> (isTreated(y)/ps(y)) + ((1-isTreated(y))/(1-ps(y)))
        # nips ==> ips / (sum of ips over all units)
        # icps ==> ps(y)/(1-ps(y)) / (sum of (ps(y)/(1-ps(y))) over all control units)
        # itps ==> ps(y)/(1-ps(y)) / (sum of (ps(y)/(1-ps(y))) over all treatment units)
        ipst_sum = sum(self._data[self._treatment_name[0]] /
                       self._data[self.propensity_score_column])
        ipsc_sum = sum((1 - self._data[self._treatment_name[0]]) /
                       (1 - self._data[self.propensity_score_column]))
        num_units = len(self._data[self._treatment_name[0]])
        num_treatment_units = sum(self._data[self._treatment_name[0]])
        num_control_units = num_units - num_treatment_units

        # Vanilla IPS estimator
        self._data['ips_weight'] = (
            self._data[self._treatment_name[0]] /
            self._data[self.propensity_score_column] +
            (1 - self._data[self._treatment_name[0]]) /
            (1 - self._data[self.propensity_score_column])
        )
        self._data['tips_weight'] = (
            self._data[self._treatment_name[0]] +
            (1 - self._data[self._treatment_name[0]]) *
            self._data[self.propensity_score_column] /
            (1 - self._data[self.propensity_score_column])
        )
        self._data['cips_weight'] = (
            self._data[self._treatment_name[0]] *
            (1 - self._data[self.propensity_score_column]) /
            self._data[self.propensity_score_column] +
            (1 - self._data[self._treatment_name[0]])
        )

        # The Hajek estimator (or the self-normalized estimator)
        self._data['ips_normalized_weight'] = (
            self._data[self._treatment_name[0]] /
            self._data[self.propensity_score_column] / ipst_sum +
            (1 - self._data[self._treatment_name[0]]) /
            (1 - self._data[self.propensity_score_column]) / ipsc_sum
        )
        ipst_for_att_sum = sum(self._data[self._treatment_name[0]])
        ipsc_for_att_sum = sum((1 - self._data[self._treatment_name[0]]) /
                               (1 - self._data[self.propensity_score_column]) *
                               self._data[self.propensity_score_column])
        self._data['tips_normalized_weight'] = (
            self._data[self._treatment_name[0]] / ipst_for_att_sum +
            (1 - self._data[self._treatment_name[0]]) *
            self._data[self.propensity_score_column] /
            (1 - self._data[self.propensity_score_column]) / ipsc_for_att_sum
        )
        ipst_for_atc_sum = sum(self._data[self._treatment_name[0]] /
                               self._data[self.propensity_score_column] *
                               (1 - self._data[self.propensity_score_column]))
        ipsc_for_atc_sum = sum(1 - self._data[self._treatment_name[0]])
        self._data['cips_normalized_weight'] = (
            self._data[self._treatment_name[0]] *
            (1 - self._data[self.propensity_score_column]) /
            self._data[self.propensity_score_column] / ipst_for_atc_sum +
            (1 - self._data[self._treatment_name[0]]) / ipsc_for_atc_sum
        )
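        # For reference (added annotation, not in the original source): the
        # stabilized weights computed next multiply each IPS term by the
        # marginal treatment probability,
        #   w = P(T=1)*T/e + P(T=0)*(1-T)/(1-e),
        # which typically reduces the variance of the weights relative to raw IPS.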
        # Stabilized weights (from Robins, Hernan, Brumback (2000))
        # Paper: Marginal Structural Models and Causal Inference in Epidemiology
        p_treatment = sum(self._data[self._treatment_name[0]]) / num_units
        self._data['ips_stabilized_weight'] = (
            self._data[self._treatment_name[0]] /
            self._data[self.propensity_score_column] * p_treatment +
            (1 - self._data[self._treatment_name[0]]) /
            (1 - self._data[self.propensity_score_column]) * (1 - p_treatment)
        )
        self._data['tips_stabilized_weight'] = (
            self._data[self._treatment_name[0]] * p_treatment +
            (1 - self._data[self._treatment_name[0]]) *
            self._data[self.propensity_score_column] /
            (1 - self._data[self.propensity_score_column]) * (1 - p_treatment)
        )
        self._data['cips_stabilized_weight'] = (
            self._data[self._treatment_name[0]] *
            (1 - self._data[self.propensity_score_column]) /
            self._data[self.propensity_score_column] * p_treatment +
            (1 - self._data[self._treatment_name[0]]) * (1 - p_treatment)
        )

        if self._target_units == "ate":
            weighting_scheme_name = self.weighting_scheme
        elif self._target_units == "att":
            weighting_scheme_name = "t" + self.weighting_scheme
        elif self._target_units == "atc":
            weighting_scheme_name = "c" + self.weighting_scheme
        else:
            raise ValueError("Target units string value not supported")
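        # For reference (added annotation, not in the original source): the
        # effect computed below is a ratio-style difference of weighted outcome
        # means, sum(w*T*y)/sum(w*T) - sum(w*(1-T)*y)/sum(w*(1-T)), using the
        # weight column selected above for the requested target units.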
        # Calculating the effect
        self._data['d_y'] = (
            self._data[weighting_scheme_name] *
            self._data[self._treatment_name[0]] *
            self._data[self._outcome_name]
        )
        self._data['dbar_y'] = (
            self._data[weighting_scheme_name] *
            (1 - self._data[self._treatment_name[0]]) *
            self._data[self._outcome_name]
        )
        sum_dy_weights = np.sum(
            self._data[self._treatment_name[0]] *
            self._data[weighting_scheme_name])
        sum_dbary_weights = np.sum(
            (1 - self._data[self._treatment_name[0]]) *
            self._data[weighting_scheme_name])
        # Subtracting the weighted means
        est = (self._data['d_y'].sum() / sum_dy_weights -
               self._data['dbar_y'].sum() / sum_dbary_weights)

        # TODO - how can we add additional information into the returned estimate?
        estimate = CausalEstimate(
            estimate=est,
            control_value=self._control_value,
            treatment_value=self._treatment_value,
            target_estimand=self._target_estimand,
            realized_estimand_expr=self.symbolic_estimator,
            propensity_scores=self._data[self.propensity_score_column])
        return estimate

    def construct_symbolic_estimator(self, estimand):
        expr = "b: " + ",".join(estimand.outcome_variable) + "~"
        # TODO -- fix: we are actually conditioning on positive treatment (d=1)
        var_list = estimand.treatment_variable + estimand.get_backdoor_variables()
        expr += "+".join(var_list)
        return expr
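A minimal usage sketch (not part of the module above): this estimator is normally invoked through dowhy's CausalModel.estimate_effect API with method_name="backdoor.propensity_score_weighting", and the method_params keys are forwarded to the constructor shown above. The simulated dataset and its parameter values below are illustrative assumptions only.

import dowhy.datasets
from dowhy import CausalModel

# Simulated data with a binary treatment; any pandas DataFrame with a
# binary treatment column would work in its place.
data = dowhy.datasets.linear_dataset(
    beta=10, num_common_causes=4, num_samples=5000, treatment_is_binary=True)

model = CausalModel(
    data=data["df"],
    treatment=data["treatment_name"],
    outcome=data["outcome_name"],
    graph=data["gml_graph"])
identified_estimand = model.identify_effect(proceed_when_unidentifiable=True)

# "ips_stabilized_weight" selects the Robins-Hernan-Brumback stabilized
# weights; min/max_ps_score control the clipping applied before weighting.
estimate = model.estimate_effect(
    identified_estimand,
    method_name="backdoor.propensity_score_weighting",
    target_units="ate",
    method_params={"weighting_scheme": "ips_stabilized_weight",
                   "min_ps_score": 0.05,
                   "max_ps_score": 0.95})
print(estimate.value)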