import copy
import itertools
import logging
import sympy as sp
import sympy.stats as spstats
import dowhy.utils.cli_helpers as cli
from dowhy.utils.api import parse_state
[docs]class CausalIdentifier:
"""Class that implements different identification methods.
Currently supports backdoor and instrumental variable identification methods. The identification is based on the causal graph provided.
Other specific ways of identification, such as the ID* algorithm, minimal adjustment criteria, etc. will be added in the future.
If you'd like to contribute, please raise an issue or a pull request on Github.
"""
NONPARAMETRIC_ATE="nonparametric-ate"
NONPARAMETRIC_NDE="nonparametric-nde"
NONPARAMETRIC_NIE="nonparametric-nie"
MAX_BACKDOOR_ITERATIONS = 100000
# Backdoor method names
BACKDOOR_DEFAULT="default"
BACKDOOR_EXHAUSTIVE="exhaustive-search"
BACKDOOR_MIN="minimal-adjustment"
BACKDOOR_MAX="maximal-adjustment"
METHOD_NAMES = {BACKDOOR_DEFAULT, BACKDOOR_EXHAUSTIVE, BACKDOOR_MIN, BACKDOOR_MAX}
DEFAULT_BACKDOOR_METHOD = BACKDOOR_DEFAULT
def __init__(self, graph, estimand_type,
method_name = "default",
proceed_when_unidentifiable=False):
self._graph = graph
self.estimand_type = estimand_type
self.treatment_name = graph.treatment_name
self.outcome_name = graph.outcome_name
self.method_name = method_name
self._proceed_when_unidentifiable = proceed_when_unidentifiable
self.logger = logging.getLogger(__name__)
[docs] def identify_effect(self, optimize_backdoor=False):
"""Main method that returns an identified estimand (if one exists).
If estimand_type is non-parametric ATE, then uses backdoor, instrumental variable and frontdoor identification methods, to check if an identified estimand exists, based on the causal graph.
:param self: instance of the CausalIdentifier class (or its subclass)
:returns: target estimand, an instance of the IdentifiedEstimand class
"""
# First, check if there is a directed path from action to outcome
if not self._graph.has_directed_path(self.treatment_name, self.outcome_name):
self.logger.warn("No directed path from treatment to outcome. Causal Effect is zero.")
return IdentifiedEstimand(self,
treatment_variable=self.treatment_name,
outcome_variable=self.outcome_name,
no_directed_path=True)
if self.estimand_type == CausalIdentifier.NONPARAMETRIC_ATE:
return self.identify_ate_effect(optimize_backdoor=optimize_backdoor)
elif self.estimand_type == CausalIdentifier.NONPARAMETRIC_NDE:
return self.identify_nde_effect()
elif self.estimand_type == CausalIdentifier.NONPARAMETRIC_NIE:
return self.identify_nie_effect()
else:
raise ValueError("Estimand type is not supported. Use either {0}, {1}, or {2}.".format(
CausalIdentifier.NONPARAMETRIC_ATE,
CausalIdentifier.NONPARAMETRIC_NDE,
CausalIdentifier.NONPARAMETRIC_NIE))
[docs] def identify_ate_effect(self, optimize_backdoor):
estimands_dict = {}
mediation_first_stage_confounders = None
mediation_second_stage_confounders = None
### 1. BACKDOOR IDENTIFICATION
# First, checking if there are any valid backdoor adjustment sets
if optimize_backdoor == False:
backdoor_sets = self.identify_backdoor(self.treatment_name, self.outcome_name)
else:
from dowhy.causal_identifiers.backdoor import Backdoor
path = Backdoor(self._graph._graph, self.treatment_name, self.outcome_name)
backdoor_sets = path.get_backdoor_vars()
estimands_dict, backdoor_variables_dict = self.build_backdoor_estimands_dict(
self.treatment_name,
self.outcome_name,
backdoor_sets,
estimands_dict)
# Setting default "backdoor" identification adjustment set
default_backdoor_id = self.get_default_backdoor_set_id(backdoor_variables_dict)
if len(backdoor_variables_dict) > 0:
estimands_dict["backdoor"] = estimands_dict.get(str(default_backdoor_id), None)
backdoor_variables_dict["backdoor"] = backdoor_variables_dict.get(str(default_backdoor_id), None)
else:
estimands_dict["backdoor"] = None
### 2. INSTRUMENTAL VARIABLE IDENTIFICATION
# Now checking if there is also a valid iv estimand
instrument_names = self._graph.get_instruments(self.treatment_name,
self.outcome_name)
self.logger.info("Instrumental variables for treatment and outcome:" +
str(instrument_names))
if len(instrument_names) > 0:
iv_estimand_expr = self.construct_iv_estimand(
self.estimand_type,
self._graph.treatment_name,
self._graph.outcome_name,
instrument_names
)
self.logger.debug("Identified expression = " + str(iv_estimand_expr))
estimands_dict["iv"] = iv_estimand_expr
else:
estimands_dict["iv"] = None
### 3. FRONTDOOR IDENTIFICATION
# Now checking if there is a valid frontdoor variable
frontdoor_variables_names = self.identify_frontdoor()
self.logger.info("Frontdoor variables for treatment and outcome:" +
str(frontdoor_variables_names))
if len(frontdoor_variables_names) >0:
frontdoor_estimand_expr = self.construct_frontdoor_estimand(
self.estimand_type,
self._graph.treatment_name,
self._graph.outcome_name,
frontdoor_variables_names
)
self.logger.debug("Identified expression = " + str(frontdoor_estimand_expr))
estimands_dict["frontdoor"] = frontdoor_estimand_expr
mediation_first_stage_confounders = self.identify_mediation_first_stage_confounders(self.treatment_name, frontdoor_variables_names)
mediation_second_stage_confounders = self.identify_mediation_second_stage_confounders(frontdoor_variables_names, self.outcome_name)
else:
estimands_dict["frontdoor"] = None
# Finally returning the estimand object
estimand = IdentifiedEstimand(
self,
treatment_variable=self._graph.treatment_name,
outcome_variable=self._graph.outcome_name,
estimand_type=self.estimand_type,
estimands=estimands_dict,
backdoor_variables=backdoor_variables_dict,
instrumental_variables=instrument_names,
frontdoor_variables=frontdoor_variables_names,
mediation_first_stage_confounders=mediation_first_stage_confounders,
mediation_second_stage_confounders=mediation_second_stage_confounders,
default_backdoor_id = default_backdoor_id
)
return estimand
[docs] def identify_nie_effect(self):
estimands_dict = {}
### 1. FIRST DOING BACKDOOR IDENTIFICATION
# First, checking if there are any valid backdoor adjustment sets
backdoor_sets = self.identify_backdoor(self.treatment_name, self.outcome_name)
estimands_dict, backdoor_variables_dict = self.build_backdoor_estimands_dict(
self.treatment_name,
self.outcome_name,
backdoor_sets,
estimands_dict)
# Setting default "backdoor" identification adjustment set
default_backdoor_id = self.get_default_backdoor_set_id(backdoor_variables_dict)
backdoor_variables_dict["backdoor"] = backdoor_variables_dict.get(str(default_backdoor_id), None)
### 2. SECOND, CHECKING FOR MEDIATORS
# Now checking if there are valid mediator variables
estimands_dict = {} # Need to reinitialize this dictionary to avoid including the backdoor sets
mediation_first_stage_confounders = None
mediation_second_stage_confounders = None
mediators_names = self.identify_mediation()
self.logger.info("Mediators for treatment and outcome:" +
str(mediators_names))
if len(mediators_names) >0:
mediation_estimand_expr = self.construct_mediation_estimand(
self.estimand_type,
self._graph.treatment_name,
self._graph.outcome_name,
mediators_names
)
self.logger.debug("Identified expression = " + str(mediation_estimand_expr))
estimands_dict["mediation"] = mediation_estimand_expr
mediation_first_stage_confounders = self.identify_mediation_first_stage_confounders(self.treatment_name, mediators_names)
mediation_second_stage_confounders = self.identify_mediation_second_stage_confounders(mediators_names, self.outcome_name)
else:
estimands_dict["mediation"] = None
# Finally returning the estimand object
estimand = IdentifiedEstimand(
self,
treatment_variable=self._graph.treatment_name,
outcome_variable=self._graph.outcome_name,
estimand_type=self.estimand_type,
estimands=estimands_dict,
backdoor_variables=backdoor_variables_dict,
instrumental_variables=None,
frontdoor_variables=None,
mediator_variables=mediators_names,
mediation_first_stage_confounders=mediation_first_stage_confounders,
mediation_second_stage_confounders=mediation_second_stage_confounders,
default_backdoor_id = None
)
return estimand
[docs] def identify_nde_effect(self):
estimands_dict = {}
### 1. FIRST DOING BACKDOOR IDENTIFICATION
# First, checking if there are any valid backdoor adjustment sets
backdoor_sets = self.identify_backdoor(self.treatment_name, self.outcome_name)
estimands_dict, backdoor_variables_dict = self.build_backdoor_estimands_dict(
self.treatment_name,
self.outcome_name,
backdoor_sets,
estimands_dict)
# Setting default "backdoor" identification adjustment set
default_backdoor_id = self.get_default_backdoor_set_id(backdoor_variables_dict)
backdoor_variables_dict["backdoor"] = backdoor_variables_dict.get(str(default_backdoor_id), None)
### 2. SECOND, CHECKING FOR MEDIATORS
# Now checking if there are valid mediator variables
estimands_dict = {}
mediation_first_stage_confounders = None
mediation_second_stage_confounders = None
mediators_names = self.identify_mediation()
self.logger.info("Mediators for treatment and outcome:" +
str(mediators_names))
if len(mediators_names) >0:
mediation_estimand_expr = self.construct_mediation_estimand(
self.estimand_type,
self._graph.treatment_name,
self._graph.outcome_name,
mediators_names
)
self.logger.debug("Identified expression = " + str(mediation_estimand_expr))
estimands_dict["mediation"] = mediation_estimand_expr
mediation_first_stage_confounders = self.identify_mediation_first_stage_confounders(self.treatment_name, mediators_names)
mediation_second_stage_confounders = self.identify_mediation_second_stage_confounders(mediators_names, self.outcome_name)
else:
estimands_dict["mediation"] = None
# Finally returning the estimand object
estimand = IdentifiedEstimand(
self,
treatment_variable=self._graph.treatment_name,
outcome_variable=self._graph.outcome_name,
estimand_type=self.estimand_type,
estimands=estimands_dict,
backdoor_variables=backdoor_variables_dict,
instrumental_variables=None,
frontdoor_variables=None,
mediator_variables=mediators_names,
mediation_first_stage_confounders=mediation_first_stage_confounders,
mediation_second_stage_confounders=mediation_second_stage_confounders,
default_backdoor_id = None
)
return estimand
[docs] def identify_backdoor(self, treatment_name, outcome_name,
include_unobserved=False, dseparation_algo="default"):
backdoor_sets = []
backdoor_paths = None
bdoor_graph = None
if dseparation_algo == "naive":
backdoor_paths = self._graph.get_backdoor_paths(treatment_name, outcome_name)
elif dseparation_algo == "default":
bdoor_graph = self._graph.do_surgery(treatment_name,
remove_outgoing_edges=True)
else:
raise ValueError(f"d-separation algorithm {dseparation_algo} is not supported")
method_name = self.method_name if self.method_name != CausalIdentifier.BACKDOOR_DEFAULT else CausalIdentifier.DEFAULT_BACKDOOR_METHOD
# First, checking if empty set is a valid backdoor set
empty_set = set()
check = self._graph.check_valid_backdoor_set(treatment_name,
outcome_name, empty_set,
backdoor_paths=backdoor_paths, new_graph=bdoor_graph,
dseparation_algo=dseparation_algo)
if check["is_dseparated"]:
backdoor_sets.append({'backdoor_set':empty_set})
# If the method is `minimal-adjustment`, return the empty set right away.
if method_name == CausalIdentifier.BACKDOOR_MIN:
return backdoor_sets
# Second, checking for all other sets of variables. If include_unobserved is false, then only observed variables are eligible.
eligible_variables = self._graph.get_all_nodes(include_unobserved=include_unobserved) \
- set(treatment_name) \
- set(outcome_name)
eligible_variables -= self._graph.get_descendants(treatment_name)
# If var is d-separated from both treatment or outcome, it cannot
# be a part of the backdoor set
filt_eligible_variables = set()
for var in eligible_variables:
dsep_treat_var = self._graph.check_dseparation(
treatment_name, parse_state(var),
set())
dsep_outcome_var = self._graph.check_dseparation(
outcome_name, parse_state(var), set())
if not dsep_outcome_var or not dsep_treat_var:
filt_eligible_variables.add(var)
if method_name in CausalIdentifier.METHOD_NAMES:
backdoor_sets, found_valid_adjustment_set = self.find_valid_adjustment_sets(
treatment_name, outcome_name,
backdoor_paths, bdoor_graph,
dseparation_algo,
backdoor_sets, filt_eligible_variables,
method_name=method_name,
max_iterations= CausalIdentifier.MAX_BACKDOOR_ITERATIONS)
if method_name == CausalIdentifier.BACKDOOR_DEFAULT and found_valid_adjustment_set:
# repeat the above search with BACKDOOR_MIN
backdoor_sets, _ = self.find_valid_adjustment_sets(
treatment_name, outcome_name,
backdoor_paths, bdoor_graph,
dseparation_algo,
backdoor_sets, filt_eligible_variables,
method_name=CausalIdentifier.BACKDOOR_MIN,
max_iterations= CausalIdentifier.MAX_BACKDOOR_ITERATIONS)
else:
raise ValueError(f"Identifier method {method_name} not supported. Try one of the following: {CausalIdentifier.METHOD_NAMES}")
return backdoor_sets
[docs] def find_valid_adjustment_sets(self, treatment_name, outcome_name,
backdoor_paths, bdoor_graph, dseparation_algo,
backdoor_sets, filt_eligible_variables,
method_name, max_iterations):
num_iterations = 0
found_valid_adjustment_set = False
all_nodes_observed = self._graph.all_observed(self._graph.get_all_nodes())
# If `minimal-adjustment` method is specified, start the search from the set with minimum size. Otherwise, start from the largest.
set_sizes = range(1, len(filt_eligible_variables) + 1, 1) if method_name == CausalIdentifier.BACKDOOR_MIN else range(len(filt_eligible_variables), 0, -1)
for size_candidate_set in set_sizes:
for candidate_set in itertools.combinations(filt_eligible_variables, size_candidate_set):
check = self._graph.check_valid_backdoor_set(treatment_name,
outcome_name, candidate_set,
backdoor_paths=backdoor_paths,
new_graph = bdoor_graph,
dseparation_algo = dseparation_algo)
self.logger.debug("Candidate backdoor set: {0}, is_dseparated: {1}".format(candidate_set, check["is_dseparated"]))
if check["is_dseparated"]:
backdoor_sets.append({'backdoor_set': candidate_set})
found_valid_adjustment_set = True
num_iterations += 1
if method_name == CausalIdentifier.BACKDOOR_EXHAUSTIVE and num_iterations > max_iterations:
self.logger.warning(f"Max number of iterations {max_iterations} reached.")
break
# If the backdoor method is `maximal-adjustment` or `minimal-adjustment`, return the first found adjustment set.
if method_name in {CausalIdentifier.BACKDOOR_DEFAULT, CausalIdentifier.BACKDOOR_MAX, CausalIdentifier.BACKDOOR_MIN} and found_valid_adjustment_set:
break
# If all variables are observed, and the biggest eligible set
# does not satisfy backdoor, then none of its subsets will.
if method_name in {CausalIdentifier.BACKDOOR_DEFAULT, CausalIdentifier.BACKDOOR_MAX} and all_nodes_observed:
break
if num_iterations > max_iterations:
self.logger.warning(f"Max number of iterations {max_iterations} reached. Could not find a valid backdoor set.")
break
return backdoor_sets, found_valid_adjustment_set
[docs] def get_default_backdoor_set_id(self, backdoor_sets_dict):
# Adding a None estimand if no backdoor set found
if len(backdoor_sets_dict) == 0:
return None
# Default set contains minimum possible number of instrumental variables, to prevent lowering variance in the treatment variable.
instrument_names = set(self._graph.get_instruments(self.treatment_name, self.outcome_name))
iv_count_dict = {key: len(set(bdoor_set).intersection(instrument_names)) for key, bdoor_set in backdoor_sets_dict.items()}
min_iv_count = min(iv_count_dict.values())
min_iv_keys = {key for key, iv_count in iv_count_dict.items() if iv_count == min_iv_count}
min_iv_backdoor_sets_dict = {key: backdoor_sets_dict[key] for key in min_iv_keys}
# Default set is the one with the least number of adjustment variables (optimizing for efficiency)
min_set_length = 1000000
default_key = None
for key, bdoor_set in min_iv_backdoor_sets_dict.items():
if len(bdoor_set) < min_set_length:
min_set_length = len(bdoor_set)
default_key = key
return default_key
[docs] def build_backdoor_estimands_dict(self, treatment_name, outcome_name,
backdoor_sets, estimands_dict, proceed_when_unidentifiable=None):
"""Build the final dict for backdoor sets by filtering unobserved variables if needed.
"""
backdoor_variables_dict = {}
if proceed_when_unidentifiable is None:
proceed_when_unidentifiable = self._proceed_when_unidentifiable
is_identified = [ self._graph.all_observed(bset["backdoor_set"]) for bset in backdoor_sets ]
if any(is_identified):
self.logger.info("Causal effect can be identified.")
backdoor_sets_arr = [list(
bset["backdoor_set"])
for bset in backdoor_sets
if self._graph.all_observed(bset["backdoor_set"]) ]
else: # there is unobserved confounding
self.logger.warning("Backdoor identification failed.")
backdoor_sets_arr = []
for i in range(len(backdoor_sets_arr)):
backdoor_estimand_expr = self.construct_backdoor_estimand(
self.estimand_type, treatment_name,
outcome_name, backdoor_sets_arr[i])
self.logger.debug("Identified expression = " + str(backdoor_estimand_expr))
estimands_dict["backdoor"+str(i+1)] = backdoor_estimand_expr
backdoor_variables_dict["backdoor"+str(i+1)] = backdoor_sets_arr[i]
return estimands_dict, backdoor_variables_dict
[docs] def identify_frontdoor(self, dseparation_algo="default"):
""" Find a valid frontdoor variable if it exists.
Currently only supports a single variable frontdoor set.
"""
frontdoor_var = None
frontdoor_paths = None
fdoor_graph = None
if dseparation_algo == "default":
cond1_graph = self._graph.do_surgery(self.treatment_name,
remove_incoming_edges=True)
bdoor_graph1 = self._graph.do_surgery(self.treatment_name,
remove_outgoing_edges=True)
elif dseparation_algo == "naive":
frontdoor_paths = self._graph.get_all_directed_paths(self.treatment_name, self.outcome_name)
else:
raise ValueError(f"d-separation algorithm {dseparation_algo} is not supported")
eligible_variables = self._graph.get_descendants(self.treatment_name) \
- set(self.outcome_name) \
- set(self._graph.get_descendants(self.outcome_name))
# For simplicity, assuming a one-variable frontdoor set
for candidate_var in eligible_variables:
# Cond 1: All directed paths intercepted by candidate_var
cond1 = self._graph.check_valid_frontdoor_set(
self.treatment_name, self.outcome_name,
parse_state(candidate_var),
frontdoor_paths=frontdoor_paths,
new_graph=cond1_graph,
dseparation_algo=dseparation_algo)
self.logger.debug("Candidate frontdoor set: {0}, is_dseparated: {1}".format(candidate_var, cond1))
if not cond1:
continue
# Cond 2: No confounding between treatment and candidate var
cond2 = self._graph.check_valid_backdoor_set(
self.treatment_name, parse_state(candidate_var),
set(),
backdoor_paths=None,
new_graph= bdoor_graph1,
dseparation_algo=dseparation_algo)
if not cond2:
continue
# Cond 3: treatment blocks all confounding between candidate_var and outcome
bdoor_graph2 = self._graph.do_surgery(candidate_var,
remove_outgoing_edges=True)
cond3 = self._graph.check_valid_backdoor_set(
parse_state(candidate_var), self.outcome_name,
self.treatment_name,
backdoor_paths=None,
new_graph= bdoor_graph2,
dseparation_algo=dseparation_algo)
is_valid_frontdoor = cond1 and cond2 and cond3
if is_valid_frontdoor:
frontdoor_var = candidate_var
break
return parse_state(frontdoor_var)
[docs] def construct_backdoor_estimand(self, estimand_type, treatment_name,
outcome_name, common_causes):
# TODO: outputs string for now, but ideally should do symbolic
# expressions Mon 19 Feb 2018 04:54:17 PM DST
# TODO Better support for multivariate treatments
expr = None
outcome_name = outcome_name[0]
num_expr_str = outcome_name
if len(common_causes)>0:
num_expr_str += "|" + ",".join(common_causes)
expr = "d(" + num_expr_str + ")/d" + ",".join(treatment_name)
sym_mu = sp.Symbol("mu")
sym_sigma = sp.Symbol("sigma", positive=True)
sym_outcome = spstats.Normal(num_expr_str, sym_mu, sym_sigma)
sym_treatment_symbols = [sp.Symbol(t) for t in treatment_name]
sym_treatment = sp.Array(sym_treatment_symbols)
sym_conditional_outcome = spstats.Expectation(sym_outcome)
sym_effect = sp.Derivative(sym_conditional_outcome, sym_treatment)
sym_assumptions = {
'Unconfoundedness': (
u"If U\N{RIGHTWARDS ARROW}{{{0}}} and U\N{RIGHTWARDS ARROW}{1}"
" then P({1}|{0},{2},U) = P({1}|{0},{2})"
).format(",".join(treatment_name), outcome_name, ",".join(common_causes))
}
estimand = {
'estimand': sym_effect,
'assumptions': sym_assumptions
}
return estimand
[docs] def construct_iv_estimand(self, estimand_type, treatment_name,
outcome_name, instrument_names):
# TODO: support multivariate treatments better.
expr = None
outcome_name = outcome_name[0]
sym_outcome = spstats.Normal(outcome_name, 0, 1)
sym_treatment_symbols = [spstats.Normal(t, 0, 1) for t in treatment_name]
sym_treatment = sp.Array(sym_treatment_symbols)
sym_instrument_symbols = [sp.Symbol(inst) for inst in instrument_names]
sym_instrument = sp.Array(sym_instrument_symbols) # ",".join(instrument_names))
sym_outcome_derivative = sp.Derivative(sym_outcome, sym_instrument)
sym_treatment_derivative = sp.Derivative(sym_treatment, sym_instrument)
sym_effect = spstats.Expectation(sym_outcome_derivative / sym_treatment_derivative)
sym_assumptions = {
"As-if-random": (
"If U\N{RIGHTWARDS ARROW}\N{RIGHTWARDS ARROW}{0} then "
"\N{NOT SIGN}(U \N{RIGHTWARDS ARROW}\N{RIGHTWARDS ARROW}{{{1}}})"
).format(outcome_name, ",".join(instrument_names)),
"Exclusion": (
u"If we remove {{{0}}}\N{RIGHTWARDS ARROW}{{{1}}}, then "
u"\N{NOT SIGN}({{{0}}}\N{RIGHTWARDS ARROW}{2})"
).format(",".join(instrument_names), ",".join(treatment_name),
outcome_name)
}
estimand = {
'estimand': sym_effect,
'assumptions': sym_assumptions
}
return estimand
[docs] def construct_frontdoor_estimand(self, estimand_type, treatment_name,
outcome_name, frontdoor_variables_names):
# TODO: support multivariate treatments better.
expr = None
outcome_name = outcome_name[0]
sym_outcome = spstats.Normal(outcome_name, 0, 1)
sym_treatment_symbols = [spstats.Normal(t, 0, 1) for t in treatment_name]
sym_treatment = sp.Array(sym_treatment_symbols)
sym_frontdoor_symbols = [sp.Symbol(inst) for inst in frontdoor_variables_names]
sym_frontdoor = sp.Array(sym_frontdoor_symbols) # ",".join(instrument_names))
sym_outcome_derivative = sp.Derivative(sym_outcome, sym_frontdoor)
sym_treatment_derivative = sp.Derivative(sym_frontdoor, sym_treatment)
sym_effect = spstats.Expectation(sym_treatment_derivative * sym_outcome_derivative)
sym_assumptions = {
"Full-mediation": (
"{2} intercepts (blocks) all directed paths from {0} to {1}."
).format(",".join(treatment_name), ",".join(outcome_name), ",".join(frontdoor_variables_names)),
"First-stage-unconfoundedness": (
u"If U\N{RIGHTWARDS ARROW}{{{0}}} and U\N{RIGHTWARDS ARROW}{{{1}}}"
" then P({1}|{0},U) = P({1}|{0})"
).format(",".join(treatment_name), ",".join(frontdoor_variables_names)),
"Second-stage-unconfoundedness": (
u"If U\N{RIGHTWARDS ARROW}{{{2}}} and U\N{RIGHTWARDS ARROW}{1}"
" then P({1}|{2}, {0}, U) = P({1}|{2}, {0})"
).format(",".join(treatment_name), outcome_name, ",".join(frontdoor_variables_names))
}
estimand = {
'estimand': sym_effect,
'assumptions': sym_assumptions
}
return estimand
[docs]class IdentifiedEstimand:
"""Class for storing a causal estimand, typically as a result of the identification step.
"""
def __init__(self, identifier, treatment_variable, outcome_variable,
estimand_type=None, estimands=None,
backdoor_variables=None, instrumental_variables=None,
frontdoor_variables=None,
mediator_variables=None,
mediation_first_stage_confounders=None,
mediation_second_stage_confounders=None,
default_backdoor_id=None, identifier_method=None,
no_directed_path=False):
self.identifier = identifier
self.treatment_variable = parse_state(treatment_variable)
self.outcome_variable = parse_state(outcome_variable)
self.backdoor_variables = backdoor_variables
self.instrumental_variables = parse_state(instrumental_variables)
self.frontdoor_variables = parse_state(frontdoor_variables)
self.mediator_variables = parse_state(mediator_variables)
self.mediation_first_stage_confounders=mediation_first_stage_confounders
self.mediation_second_stage_confounders=mediation_second_stage_confounders
self.estimand_type = estimand_type
self.estimands = estimands
self.default_backdoor_id = default_backdoor_id
self.identifier_method = identifier_method
self.no_directed_path = no_directed_path
[docs] def set_identifier_method(self, identifier_name):
self.identifier_method = identifier_name
[docs] def get_backdoor_variables(self, key=None):
""" Return a list containing the backdoor variables.
If the calling estimator method is a backdoor method, return the
backdoor variables corresponding to its target estimand.
Otherwise, return the backdoor variables for the default backdoor estimand.
"""
if key is None:
if self.identifier_method and self.identifier_method.startswith("backdoor"):
return self.backdoor_variables[self.identifier_method]
elif self.backdoor_variables is not None and len(self.backdoor_variables) > 0:
return self.backdoor_variables[self.default_backdoor_id]
else:
return []
else:
return self.backdoor_variables[key]
[docs] def set_backdoor_variables(self, bdoor_variables_arr, key=None):
if key is None:
key = self.identifier_method
self.backdoor_variables[key] = bdoor_variables_arr
[docs] def get_frontdoor_variables(self):
"""Return a list containing the frontdoor variables (if present)
"""
return self.frontdoor_variables
[docs] def get_instrumental_variables(self):
"""Return a list containing the instrumental variables (if present)
"""
return self.instrumental_variables
def __deepcopy__(self, memo):
return IdentifiedEstimand(
self.identifier, # not deep copied
copy.deepcopy(self.treatment_variable),
copy.deepcopy(self.outcome_variable),
estimand_type=copy.deepcopy(self.estimand_type),
estimands=copy.deepcopy(self.estimands),
backdoor_variables=copy.deepcopy(self.backdoor_variables),
instrumental_variables=copy.deepcopy(self.instrumental_variables),
frontdoor_variables=copy.deepcopy(self.frontdoor_variables),
mediator_variables=copy.deepcopy(self.mediator_variables),
default_backdoor_id=copy.deepcopy(self.default_backdoor_id),
identifier_method=copy.deepcopy(self.identifier_method)
)
def __str__(self, only_target_estimand=False, show_all_backdoor_sets=False):
if self.no_directed_path:
s = "No directed path from {0} to {1} in the causal graph.".format(
self.treatment_variable,
self.outcome_variable)
s += "\nCausal effect is zero."
return s
s = "Estimand type: {0}\n".format(self.estimand_type)
i = 1
has_valid_backdoor = sum("backdoor" in key for key in self.estimands.keys())
for k, v in self.estimands.items():
if show_all_backdoor_sets:
# Do not show backdoor key unless it is the only backdoor set.
if k == "backdoor" and has_valid_backdoor > 1:
continue
else:
# Just show the default backdoor set
if k.startswith("backdoor") and k != "backdoor":
continue
if only_target_estimand and k != self.identifier_method:
continue
s += "\n### Estimand : {0}\n".format(i)
s += "Estimand name: {0}".format(k)
if k == self.default_backdoor_id:
s += " (Default)"
s += "\n"
if v is None:
s += "No such variable(s) found!\n"
else:
sp_expr_str = sp.pretty(v["estimand"], use_unicode=True)
s += "Estimand expression:\n{0}\n".format(sp_expr_str)
j = 1
for ass_name, ass_str in v["assumptions"].items():
s += "Estimand assumption {0}, {1}: {2}\n".format(j, ass_name, ass_str)
j += 1
i += 1
return s