Source code for dowhy.causal_identifiers.id_identifier

import numpy as np
import pandas as pd
import networkx as nx
from dowhy.utils.ordered_set import OrderedSet
from dowhy.utils.graph_operations import find_c_components, induced_graph, find_ancestor
from dowhy.causal_identifier import CausalIdentifier
from dowhy.utils.api import parse_state

[docs]class IDExpression: """ Class for storing a causal estimand, as a result of the identification step using the ID algorithm. The object stores a list of estimators(self._product) whose porduct must be obtained and a list of variables (self._sum) over which the product must be marginalized. """ def __init__(self): self._product = [] self._sum = []
[docs] def add_product(self, element): ''' Add an estimator to the list of product. :param element: Estimator to append to the product list. ''' self._product.append(element)
[docs] def add_sum(self, element): ''' Add variables to the list. :param element: Set of variables to append to the list self._sum. ''' for el in element: self._sum.append(el)
[docs] def get_val(self, return_type): """ Get either the list of estimators (for product) or list of variables (for the marginalization). :param return_type: "prod" to return the list of estimators or "sum" to return the list of variables. """ if return_type=="prod": return self._product elif return_type=="sum": return self._sum else: raise Exception("Provide correct return type.")
def _print_estimator(self, prefix, estimator=None, start=False): ''' Print the IDExpression object. ''' if estimator is None: return None string = "" if isinstance(estimator, IDExpression): s = True if len(estimator.get_val(return_type="sum"))>0 else False if s: sum_vars = "{" + ",".join(estimator.get_val(return_type="sum")) + "}" string += prefix + "Sum over " + sum_vars + ":\n" prefix += "\t" for expression in estimator.get_val(return_type='prod'): add_string = self._print_estimator(prefix, expression) if add_string is None: return None else: string += add_string else: outcome_vars = list(estimator['outcome_vars']) condition_vars = list(estimator['condition_vars']) string += prefix + "Predictor: P(" + ",".join(outcome_vars) if len(condition_vars)>0: string += "|" + ",".join(condition_vars) string += ")\n" if start: string = string[:-1] return string def __str__(self): string = self._print_estimator(prefix="", estimator=self, start=True) if string is None: return "The graph is not identifiable." else: return string
[docs]class IDIdentifier(CausalIdentifier): def __init__(self, graph, estimand_type, method_name = "default", proceed_when_unidentifiable=None): ''' Class to perform identification using the ID algorithm. :param self: instance of the IDIdentifier class. :param estimand_type: Type of estimand ("nonparametric-ate", "nonparametric-nde" or "nonparametric-nie"). :param method_name: Identification method ("id-algorithm" in this case). :param proceed_when_unidentifiable: If True, proceed with identification even in the presence of unobserved/missing variables. ''' super().__init__(graph, estimand_type, method_name, proceed_when_unidentifiable) if self.estimand_type != CausalIdentifier.NONPARAMETRIC_ATE: raise Exception("The estimand type should be 'non-parametric ate' for the ID method type.") self._treatment_names = OrderedSet(parse_state(graph.treatment_name)) self._outcome_names = OrderedSet(parse_state(graph.outcome_name)) self._adjacency_matrix = graph.get_adjacency_matrix() try: self._tsort_node_names = OrderedSet(list(nx.topological_sort(graph._graph))) # topological sorting of graph nodes except: raise Exception("The graph must be a directed acyclic graph (DAG).") self._node_names = OrderedSet(graph._graph.nodes)
[docs] def identify_effect(self, treatment_names=None, outcome_names=None, adjacency_matrix=None, node_names=None): ''' Implementation of the ID algorithm. Link - https://ftp.cs.ucla.edu/pub/stat_ser/shpitser-thesis.pdf The pseudo code has been provided on Pg 40. :param self: instance of the IDIdentifier class. :param treatment_names: OrderedSet comprising names of treatment variables. :param outcome_names:OrderedSet comprising names of outcome variables. :param adjacency_matrix: Graph adjacency matrix. :param node_names: OrderedSet comprising names of all nodes in the graph. :returns: target estimand, an instance of the IDExpression class. ''' if adjacency_matrix is None: adjacency_matrix = self._adjacency_matrix if treatment_names is None: treatment_names = self._treatment_names if outcome_names is None: outcome_names = self._outcome_names if node_names is None: node_names = self._node_names node2idx, idx2node = self._idx_node_mapping(node_names) # Estimators list for returning after identification estimators = IDExpression() # Line 1 # If no action has been taken, the effect on Y is just the marginal of the observational distribution P(v) on Y. if len(treatment_names) == 0: identifier = IDExpression() estimator = {} estimator['outcome_vars'] = node_names estimator['condition_vars'] = OrderedSet() identifier.add_product(estimator) identifier.add_sum(node_names.difference(outcome_names)) estimators.add_product(identifier) return estimators # Line 2 # If we are interested in the effect on Y, it is sufficient to restrict our attention on the parts of the model ancestral to Y. ancestors = find_ancestor(outcome_names, node_names, adjacency_matrix, node2idx, idx2node) if len(node_names.difference(ancestors)) != 0: # If there are elements which are not the ancestor of the outcome variables # Modify list of valid nodes treatment_names = treatment_names.intersection(ancestors) node_names = node_names.intersection(ancestors) adjacency_matrix = induced_graph(node_set=node_names, adjacency_matrix=adjacency_matrix, node2idx=node2idx) return self.identify_effect(treatment_names=treatment_names, outcome_names=outcome_names, adjacency_matrix=adjacency_matrix, node_names=node_names) # Line 3 - forces an action on any node where such an action would have no effect on Y – assuming we already acted on X. # Modify adjacency matrix to obtain that corresponding to do(X) adjacency_matrix_do_x = adjacency_matrix.copy() for x in treatment_names: x_idx = node2idx[x] for i in range(len(node_names)): adjacency_matrix_do_x[i, x_idx] = 0 ancestors = find_ancestor(outcome_names, node_names, adjacency_matrix_do_x, node2idx, idx2node) W = node_names.difference(treatment_names).difference(ancestors) if len(W) != 0: return self.identify_effect(treatment_names = treatment_names.union(W), outcome_names=outcome_names, adjacency_matrix=adjacency_matrix, node_names=node_names) # Line 4 - Decomposes the problem into a set of smaller problems using the key property of C-component factorization of causal models. # If the entire graph is a single C-component already, further problem decomposition is impossible, and we must provide base cases. # Modify adjacency matrix to remove treatment variables node_names_minus_x = node_names.difference(treatment_names) node2idx_minus_x, idx2node_minus_x = self._idx_node_mapping(node_names_minus_x) adjacency_matrix_minus_x = induced_graph(node_set=node_names_minus_x, adjacency_matrix=adjacency_matrix, node2idx=node2idx) c_components = find_c_components(adjacency_matrix=adjacency_matrix_minus_x, node_set=node_names_minus_x, idx2node=idx2node_minus_x) if len(c_components)>1: identifier = IDExpression() sum_over_set = node_names.difference(outcome_names.union(treatment_names)) for component in c_components: expressions = self.identify_effect(treatment_names=node_names.difference(component), outcome_names=OrderedSet(list(component)), adjacency_matrix=adjacency_matrix, node_names=node_names) for expression in expressions.get_val(return_type="prod"): identifier.add_product(expression) identifier.add_sum(sum_over_set) estimators.add_product(identifier) return estimators # Line 5 - The algorithms fails due to the presence of a hedge - the graph G, and a subgraph S that does not contain any X nodes. S = c_components[0] c_components_G = find_c_components(adjacency_matrix=adjacency_matrix, node_set=node_names, idx2node=idx2node) if len(c_components_G)==1 and c_components_G[0] == node_names: return None # Line 6 - If there are no bidirected arcs from X to the other nodes in the current subproblem under consideration, then we can replace acting on X by conditioning, and thus solve the subproblem. if S in c_components_G: sum_over_set = S.difference(outcome_names) prev_nodes = [] for node in self._tsort_node_names: if node in S: identifier = IDExpression() estimator = {} estimator['outcome_vars'] = OrderedSet([node]) estimator['condition_vars'] = OrderedSet(prev_nodes) identifier.add_product(estimator) identifier.add_sum(sum_over_set) estimators.add_product(identifier) prev_nodes.append(node) return estimators # Line 7 - This is the most complicated case in the algorithm. Explain in the second last paragraph on Pg 41 of the link provided in the docstring above. for component in c_components_G: C = S.difference(component) if C.is_empty() is None: return self.identify_effect(treatment_names=treatment_names.intersection(component), outcome_names=outcome_names, adjacency_matrix=induced_graph(node_set=component, adjacency_matrix=adjacency_matrix,node2idx=node2idx), node_names=node_names)
def _idx_node_mapping(self, node_names): ''' Obtain the node name to index and index to node name mappings. :param node_names: Name of all nodes in the graph. :return: node to index and index to node mappings. ''' node2idx = {} idx2node = {} for i, node in enumerate(node_names.get_all()): node2idx[node] = i idx2node[i] = node return node2idx, idx2node