import logging
from itertools import combinations
from typing import Optional, Tuple
import networkx as nx
import pandas as pd
from dodiscover.ci.base import BaseConditionalIndependenceTest
from dodiscover.constraint.config import ConditioningSetSelection
from dodiscover.constraint.utils import is_in_sep_set
from dodiscover.typing import Column, SeparatingSet
from .._protocol import EquivalenceClass
from ..context import Context
from ._classes import BaseConstraintDiscovery
from .skeleton import LearnSkeleton
logger = logging.getLogger()
[docs]class PC(BaseConstraintDiscovery):
"""Peter and Clarke (PC) algorithm for causal discovery.
Assumes causal sufficiency, that is, all confounders in the
causal graph are observed variables. See :footcite:`Spirtes1993` for
full details on the algorithm.
Parameters
----------
ci_estimator : BaseConditionalIndependenceTest
The conditional independence test function. The arguments of the estimator should
be data, node, node to compare, conditioning set of nodes, and any additional
keyword arguments. It must implement the ``test`` function which accepts the data,
a set of X nodes, a set of Y nodes and an optional set of Z nodes, which returns a
ordered tuple of test statistic and pvalue associated with the null hypothesis
:math:`X \\perp Y | Z`.
alpha : float, optional
The significance level for the conditional independence test, by default 0.05.
min_cond_set_size : int, optional
Minimum size of the conditioning set, by default None, which will be set to '0'.
Used to constrain the computation spent on the algorithm.
max_cond_set_size : int, optional
Maximum size of the conditioning set, by default None. Used to limit
the computation spent on the algorithm.
max_combinations : int, optional
The maximum number of conditional independence tests to run from the set
of possible conditioning sets. By default None, which means the algorithm will
check all possible conditioning sets. If ``max_combinations=n`` is set, then
for every conditioning set size, 'p', there will be at most 'n' CI tests run
before the conditioning set size 'p' is incremented. For controlling the size
of 'p', see ``min_cond_set_size`` and ``max_cond_set_size``. This can be used
in conjunction with ``keep_sorted`` parameter to only test the "strongest"
dependences.
condsel_method : ConditioningSetSelection
The method to use for selecting the conditioning set. Must be one of
('neighbors', 'complete', 'neighbors_path'). See Notes for more details.
apply_orientations : bool
Whether or not to apply orientation rules given the learned skeleton graph
and separating set per pair of variables. If ``True`` (default), will
apply Meek's orientation rules R0-3, orienting colliders and certain
arrowheads :footcite:`Meek1995`.
keep_sorted : bool
Whether or not to keep the considered conditioning set variables in sorted
dependency order. If True (default) will sort the existing dependencies of each variable
by its dependencies from strongest to weakest (i.e. largest CI test statistic value
to lowest). The conditioning set is chosen lexographically
based on the sorted test statistic values of 'ith Pa(X) -> X', for each possible
parent node of 'X'. This can be used in conjunction with ``max_combinations`` parameter
to only test the "strongest" dependences.
max_iter : int
The maximum number of iterations through the graph to apply
orientation rules.
Attributes
----------
graph_ : EquivalenceClass
The equivalence class of graphs discovered.
separating_sets_ : dict of dict of list of set
The dictionary of separating sets, where it is a nested dictionary from
the variable name to the variable it is being compared to the set of
variables in the graph that separate the two.
References
----------
.. footbibliography::
"""
graph_: EquivalenceClass
separating_sets_: SeparatingSet
def __init__(
self,
ci_estimator: BaseConditionalIndependenceTest,
alpha: float = 0.05,
min_cond_set_size: Optional[int] = None,
max_cond_set_size: Optional[int] = None,
max_combinations: Optional[int] = None,
condsel_method: ConditioningSetSelection = ConditioningSetSelection.NBRS,
apply_orientations: bool = True,
keep_sorted: bool = False,
max_iter: int = 1000,
n_jobs: Optional[int] = None,
):
super().__init__(
ci_estimator,
alpha,
min_cond_set_size=min_cond_set_size,
max_cond_set_size=max_cond_set_size,
max_combinations=max_combinations,
condsel_method=condsel_method,
apply_orientations=apply_orientations,
keep_sorted=keep_sorted,
n_jobs=n_jobs,
)
self.max_iter = max_iter
[docs] def convert_skeleton_graph(self, graph: nx.Graph) -> EquivalenceClass:
"""Convert skeleton graph as undirected networkx Graph to CPDAG.
Parameters
----------
graph : nx.Graph
Converts a skeleton graph to the representation needed
for PC algorithm, a CPDAG.
Returns
-------
graph : EquivalenceClass
The CPDAG class.
"""
from pywhy_graphs import CPDAG
# convert Graph object to a CPDAG object with
# all undirected edges
graph = CPDAG(incoming_undirected_edges=graph)
return graph
[docs] def learn_skeleton(
self, data: pd.DataFrame, context: Context, sep_set: Optional[SeparatingSet] = None
) -> Tuple[nx.Graph, SeparatingSet]:
"""Learns the skeleton of a causal DAG using pairwise (conditional) independence testing.
Parameters
----------
data : pd.DataFrame
The dataset.
context : Context
A context object.
sep_set : SeparatingSet
The separating set.
Returns
-------
skel_graph : nx.Graph
The undirected graph of the causal graph's skeleton.
sep_set : SeparatingSet
The separating set per pairs of variables.
Notes
-----
Learning the skeleton of a causal DAG uses (conditional) independence testing
to determine which variables are (in)dependent. This specific algorithm
compares exhaustively pairs of adjacent variables.
"""
skel_alg = LearnSkeleton(
self.ci_estimator,
sep_set=sep_set,
alpha=self.alpha,
min_cond_set_size=self.min_cond_set_size,
max_cond_set_size=self.max_cond_set_size,
max_combinations=self.max_combinations,
condsel_method=self.condsel_method,
keep_sorted=self.keep_sorted,
n_jobs=self.n_jobs,
)
skel_alg.learn_graph(data, context)
skel_graph = skel_alg.adj_graph_
sep_set = skel_alg.sep_set_
self.n_ci_tests += skel_alg.n_ci_tests
return skel_graph, sep_set
[docs] def orient_edges(self, graph: EquivalenceClass) -> None:
"""Orient edges in a skeleton graph to estimate the causal DAG, or CPDAG.
These are known as the Meek rules :footcite:`Meek1995`. They are deterministic
in the sense that they are logical characterizations of what edges must be
present given the rest of the local graph structure.
Parameters
----------
graph : EquivalenceClass
A skeleton graph. If ``None``, then will initialize PC using a
complete graph. By default None.
"""
# For all the combination of nodes i and j, apply the following
# rules.
idx = 0
finished = False
while idx < self.max_iter and not finished: # type: ignore
change_flag = False
for i in graph.nodes:
for j in graph.neighbors(i):
if i == j:
continue
# Rule 1: Orient i-j into i->j whenever there is an arrow k->i
# such that k and j are nonadjacent.
r1_add = self._apply_meek_rule1(graph, i, j)
# Rule 2: Orient i-j into i->j whenever there is a chain
# i->k->j.
r2_add = self._apply_meek_rule2(graph, i, j)
# Rule 3: Orient i-j into i->j whenever there are two chains
# i-k->j and i-l->j such that k and l are nonadjacent.
r3_add = self._apply_meek_rule3(graph, i, j)
# Rule 4: Orient i-j into i->j whenever there are two chains
# i-k->l and k->l->j such that k and j are nonadjacent.
#
# However, this rule is not necessary when the PC-algorithm
# is used to estimate a DAG.
if any([r1_add, r2_add, r3_add]) and not change_flag:
change_flag = True
if not change_flag:
finished = True
logger.info(f"Finished applying R1-3, with {idx} iterations")
break
idx += 1
[docs] def orient_unshielded_triples(
self,
graph: EquivalenceClass,
sep_set: SeparatingSet,
) -> None:
"""Orient colliders given a graph and separation set.
Parameters
----------
graph : EquivalenceClass
The CPDAG.
sep_set : Dict[Dict[Set[Set[Any]]]]
The separating set between any two nodes.
"""
# for every node in the PAG, evaluate neighbors that have any edge
for u in graph.nodes:
for v_i, v_j in combinations(graph.neighbors(u), 2):
# Check that there is no edge of any type between
# v_i and v_j, else this is a "shielded" collider.
# Then check to see if 'u' is in "any" separating
# set. If it is not, then there is a collider.
if v_j not in graph.neighbors(v_i) and not is_in_sep_set(
u, sep_set, v_i, v_j, mode="any"
):
self._orient_collider(graph, v_i, u, v_j)
def _orient_collider(
self, graph: EquivalenceClass, v_i: Column, u: Column, v_j: Column
) -> None:
logger.info(
f"orienting collider: {v_i} -> {u} and {v_j} -> {u} to make {v_i} -> {u} <- {v_j}."
)
if graph.has_edge(v_i, u, graph.undirected_edge_name):
graph.orient_uncertain_edge(v_i, u)
if graph.has_edge(v_j, u, graph.undirected_edge_name):
graph.orient_uncertain_edge(v_j, u)
def _apply_meek_rule1(self, graph: EquivalenceClass, i: Column, j: Column) -> bool:
"""Apply rule 1 of Meek's rules.
Looks for i - j such that k -> i, such that (k,i,j)
is an unshielded triple. Then can orient i - j as i -> j.
"""
added_arrows = False
# Check if i-j.
if graph.has_edge(i, j, graph.undirected_edge_name):
for k in graph.predecessors(i):
# Skip if k and j are adjacent because then it is a
# shielded triple
if j in graph.neighbors(k):
continue
# check if the triple is in the graph's excluded triples
if frozenset((k, i, j)) in graph.excluded_triples:
continue
# Make i-j into i->j
logger.info(f"R1: Removing edge ({i}, {j}) and orienting as {k} -> {i} -> {j}.")
graph.orient_uncertain_edge(i, j)
added_arrows = True
break
return added_arrows
def _apply_meek_rule2(self, graph: EquivalenceClass, i: Column, j: Column) -> bool:
"""Apply rule 2 of Meek's rules.
Check for i - j, and then looks for i -> k -> j
triple, to orient i - j as i -> j.
"""
added_arrows = False
# Check if i-j.
if graph.has_edge(i, j, graph.undirected_edge_name):
# Find nodes k where k is i->k
succs_i = set()
for k in graph.successors(i):
if not graph.has_edge(k, i, graph.directed_edge_name):
succs_i.add(k)
# Find nodes j where j is k->j.
preds_j = set()
for k in graph.predecessors(j):
if not graph.has_edge(j, k, graph.directed_edge_name):
preds_j.add(k)
# Check if there is any node k where i->k->j.
candidate_k = succs_i.intersection(preds_j)
# if the graph has excluded triples, we would check at this point
if graph.excluded_triples:
# check if the triple is in the graph's excluded triples
# if so, remove them from the candidates
for k in candidate_k:
if frozenset((i, k, j)) in graph.excluded_triples:
candidate_k.remove(k)
# if there are candidate 'k' nodes, then orient the edge accordingly
if len(candidate_k) > 0:
# Make i-j into i->j
logger.info(f"R2: Removing edge {i}-{j} to form {i}->{j}.")
graph.orient_uncertain_edge(i, j)
added_arrows = True
return added_arrows
def _apply_meek_rule3(self, graph: EquivalenceClass, i: Column, j: Column) -> bool:
"""Apply rule 3 of Meek's rules.
Check for i - j, and then looks for k -> j <- l
collider, and i - k and i - l, then orient i -> j.
"""
added_arrows = False
# Check if i-j first
if graph.has_edge(i, j, graph.undirected_edge_name):
# For all the pairs of nodes adjacent to i,
# look for (k, l), such that j -> l and k -> l
for (k, l) in combinations(graph.neighbors(i), 2):
# Skip if k and l are adjacent.
if l in graph.neighbors(k):
continue
# Skip if not k->j.
if graph.has_edge(j, k, graph.directed_edge_name) or (
not graph.has_edge(k, j, graph.directed_edge_name)
):
continue
# Skip if not l->j.
if graph.has_edge(j, l, graph.directed_edge_name) or (
not graph.has_edge(l, j, graph.directed_edge_name)
):
continue
# check if the triple is inside graph's excluded triples
if frozenset((l, i, k)) in graph.excluded_triples:
continue
# if i - k and i - l, then at this point, we have a valid path
# to orient
if graph.has_edge(k, i, graph.undirected_edge_name) and graph.has_edge(
l, i, graph.undirected_edge_name
):
logger.info(f"R3: Removing edge {i}-{j} to form {i}->{j}")
graph.orient_uncertain_edge(i, j)
added_arrows = True
break
return added_arrows