import logging
from itertools import permutations
from typing import FrozenSet, List, Optional, Tuple
import networkx as nx
import pandas as pd
from dodiscover._protocol import EquivalenceClass
from dodiscover.cd import BaseConditionalDiscrepancyTest
from dodiscover.ci import BaseConditionalIndependenceTest
from dodiscover.context import Context
from dodiscover.typing import Column, SeparatingSet
from .config import ConditioningSetSelection
from .fcialg import FCI
from .skeleton import LearnInterventionSkeleton
logger = logging.getLogger()
class PsiFCI(FCI):
"""Interventional (Psi) FCI algorithm.
The I-FCI (or Psi-FCI) algorithm accepts multiple datasets, which may be
observational and/or interventional, under a known (I-FCI) or unknown
(Psi-FCI) intervention-target setting. Our API consolidates both variants
in one class; the setting is controlled by the
``known_intervention_targets`` hyperparameter.
See :footcite:`Kocaoglu2019characterization` for more information on
I-FCI and :footcite:`Jaber2020causal` for more information on Psi-FCI.
The Psi-FCI algorithm is complete for the Psi-PAG equivalence class.
However, the I-FCI has not been shown to be complete for the I-PAG
equivalence class. Note that the I-FCI algorithm may change without
notice.
Parameters
----------
ci_estimator : BaseConditionalIndependenceTest
The conditional independence test function. The arguments of the estimator should
be data, node, node to compare, conditioning set of nodes, and any additional
keyword arguments.
cd_estimator : BaseConditionalDiscrepancyTest
The conditional discrepancy test function.
alpha : float, optional
The significance level for the conditional independence test, by default 0.05.
min_cond_set_size : int, optional
Minimum size of the conditioning set, by default None, in which case it is set to 0.
Used to constrain the computation spent on the algorithm.
max_cond_set_size : int, optional
Maximum size of the conditioning set, by default None. Used to limit
the computation spent on the algorithm.
max_combinations : int, optional
The maximum number of conditional independence tests to run from the set
of possible conditioning sets. By default None, which means the algorithm will
check all possible conditioning sets. If ``max_combinations=n`` is set, then
for every conditioning set size, 'p', there will be at most 'n' CI tests run
before the conditioning set size 'p' is incremented. For controlling the size
of 'p', see ``min_cond_set_size`` and ``max_cond_set_size``. This can be used
in conjunction with the ``keep_sorted`` parameter to only test the "strongest"
dependencies.
condsel_method : ConditioningSetSelection
The method to use for selecting the conditioning sets. Must be one of
('neighbors', 'complete', 'neighbors_path'). See Notes for more details.
apply_orientations : bool
Whether or not to apply orientation rules given the learned skeleton graph
and separating set per pair of variables. If ``True`` (default), will
apply Zhang's orientation rules R0-10, orienting colliders and certain
arrowheads and tails :footcite:`Zhang2008`.
keep_sorted : bool
Whether or not to keep the considered conditioning set variables in sorted
dependency order. If ``True``, will sort the existing dependencies of each variable
from strongest to weakest (i.e. largest CI test statistic value to smallest).
The conditioning set is then chosen lexicographically based on the sorted test
statistic values of 'ith Pa(X) -> X', for each possible parent node of 'X'.
This can be used in conjunction with the ``max_combinations`` parameter to only
test the "strongest" dependencies.
max_iter : int
The maximum number of iterations through the graph to apply
orientation rules.
max_path_length : int, optional
The maximum length of any discriminating path, or None if unlimited.
pds_condsel_method : ConditioningSetSelection
The method to use for selecting the conditioning sets using PDS. Must be one of
('pds', 'pds_path'). See Notes for more details.
known_intervention_targets : bool, optional
If `True`, then will run the I-FCI algorithm. If `False`, will run the
Psi-FCI algorithm. By default False.
n_jobs : int, optional
The number of parallel jobs to run. If -1, then the number of jobs is set to
the number of cores. If 1 is given, no parallel computing code is used at all.
By default None, which means 1.
Notes
-----
Selection bias is unsupported because it is still an active research area.
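Examples
--------
A minimal usage sketch. The estimator classes and the interventional context
construction shown here (``FisherZCITest``, ``KernelCDTest``, ``make_context``
with ``InterventionalContextBuilder`` and its ``num_distributions`` setter,
and the ``graph_`` attribute) are assumptions about the surrounding dodiscover
API; adapt them to the estimators and context builders available in your
installation. ``obs_df`` and ``interv_df`` are placeholder ``pandas.DataFrame``
datasets.

>>> from dodiscover import make_context, InterventionalContextBuilder
>>> from dodiscover.ci import FisherZCITest
>>> from dodiscover.cd import KernelCDTest
>>> # one observational and one interventional dataset -> two distributions
>>> context = (
...     make_context(create_using=InterventionalContextBuilder)
...     .variables(data=obs_df)
...     .num_distributions(2)
...     .build()
... )
>>> fci = PsiFCI(ci_estimator=FisherZCITest(), cd_estimator=KernelCDTest())
>>> fci.learn_graph([obs_df, interv_df], context)
>>> pag = fci.graph_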
"""
def __init__(
self,
ci_estimator: BaseConditionalIndependenceTest,
cd_estimator: BaseConditionalDiscrepancyTest,
alpha: float = 0.05,
min_cond_set_size: Optional[int] = None,
max_cond_set_size: Optional[int] = None,
max_combinations: Optional[int] = None,
condsel_method: ConditioningSetSelection = ConditioningSetSelection.NBRS,
apply_orientations: bool = True,
keep_sorted: bool = False,
max_iter: int = 1000,
max_path_length: Optional[int] = None,
pds_condsel_method: ConditioningSetSelection = ConditioningSetSelection.PDS,
known_intervention_targets: bool = False,
n_jobs: Optional[int] = None,
):
super().__init__(
ci_estimator,
alpha,
min_cond_set_size,
max_cond_set_size,
max_combinations,
condsel_method,
apply_orientations,
keep_sorted=keep_sorted,
max_iter=max_iter,
max_path_length=max_path_length,
selection_bias=False,
pds_condsel_method=pds_condsel_method,
n_jobs=n_jobs,
)
self.cd_estimator = cd_estimator
self.known_intervention_targets = known_intervention_targets
def learn_skeleton(
self, data: pd.DataFrame, context: Context, sep_set: Optional[SeparatingSet] = None
) -> Tuple[nx.Graph, SeparatingSet]:
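"""Learn the skeleton of the augmented causal graph.
Fits a :class:`LearnInterventionSkeleton` learner using both the CI and
CD estimators, then stores the learned context, adjacency graph and
separating sets on this instance and updates the CI-test counter.
"""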
# now compute all possibly d-separating sets and learn a better skeleton
self.skeleton_learner_ = LearnInterventionSkeleton(
self.ci_estimator,
self.cd_estimator,
sep_set=sep_set,
alpha=self.alpha,
min_cond_set_size=self.min_cond_set_size,
max_cond_set_size=self.max_cond_set_size,
max_combinations=self.max_combinations,
condsel_method=self.condsel_method,
second_stage_condsel_method=self.pds_condsel_method,
keep_sorted=False,
max_path_length=self.max_path_length,
n_jobs=self.n_jobs,
)
self.skeleton_learner_.learn_graph(data, context)
self.context_ = self.skeleton_learner_.context_.copy()
skel_graph = self.skeleton_learner_.adj_graph_
sep_set = self.skeleton_learner_.sep_set_
self.n_ci_tests += self.skeleton_learner_.n_ci_tests
return skel_graph, sep_set
def learn_graph(self, data: List[pd.DataFrame], context: Context):
"""Learn the relevant causal graph equivalence class.
All pairwise combinations of the given datasets are considered, and
an F-node is constructed for each such pair.
Parameters
----------
data : List[pd.DataFrame]
The list of different datasets assigned to different
environments. We assume the first dataset is always
observational.
context : Context
The context with interventional assumptions.
Returns
-------
self : PsiFCI
The fitted learner.
"""
if not isinstance(data, list):
raise TypeError("The input datasets must be in a Python list.")
n_datasets = len(data)
n_distributions = context.num_distributions
if n_datasets != n_distributions:
raise RuntimeError(
f"There are {n_datasets} passed in, but {n_distributions} "
f"total assumed distributions. There must be a matching number of datasets and "
f"'context.num_distributions'."
)
return super().learn_graph(data, context)
def _apply_rule11(self, graph: EquivalenceClass, context: Context) -> Tuple[bool, List]:
"""Apply "Rule 8" in I-FCI algorithm, which we call Rule 11.
This orients all edges out of F-nodes. So patterns of the form
``('F', 0) *-* 'x'`` will become ``('F', 0) -> 'x'``.
For original details of the rule, see :footcite:`Kocaoglu2019characterization`.
Parameters
----------
graph : EquivalenceClass
The causal graph to apply rules to.
context : Context
The causal context.
Returns
-------
added_arrows : bool
Whether or not arrows were added.
oriented_edges : List
A list of oriented edges.
References
----------
.. footbibliography::
"""
augmented_nodes = context.get_augmented_nodes()
oriented_edges = []
added_arrows = True
for node in augmented_nodes:
for nbr in graph.neighbors(node):
if nbr in augmented_nodes:
continue
# remove all edges between node and nbr and orient this out
graph.remove_edge(node, nbr)
graph.remove_edge(nbr, node)
graph.add_edge(node, nbr, graph.directed_edge_name)
oriented_edges.append((node, nbr))
return added_arrows, oriented_edges
def _apply_rule12(
self, graph: EquivalenceClass, u: Column, a: Column, c: Column, context: Context
) -> bool:
"""Apply orientation rule of the I-FCI algorithm.
In the I-FCI algorithm, this is called "Rule 9". Checks for inducing paths where
'u' is the F-node, and 'a' and 'c' are connected:
'u' -> 'a' *-* 'c' with 'u' -> 'c', then orient 'a' -> 'c'.
For original details of the rule, see :footcite:`Kocaoglu2019characterization`.
Parameters
----------
graph : EquivalenceClass
The causal graph.
u : Column
The candidate F-node
a : Column
A neighbor of the F-node.
c : Column
Another neighbor of the F-node.
context : Context
The causal context.
Returns
-------
added_arrows : bool
Whether or not an orientation was made.
References
----------
.. footbibliography::
"""
f_nodes = context.f_nodes
symmetric_diff_map = context.symmetric_diff_map
added_arrows = False
if u in f_nodes and self.known_intervention_targets:
# map the F-node to the symmetric difference of its intervention targets
S_set: FrozenSet = symmetric_diff_map.get(u, frozenset())
# check a *-* c
if (
len(S_set) == 1
and a in S_set
and (graph.has_edge(a, c) or graph.has_edge(c, a))
and graph.has_edge(u, a)
and graph.has_edge(u, c)
):
# remove all edges between a and c
graph.remove_edge(a, c)
graph.remove_edge(c, a)
# then orient a -> c
graph.add_edge(a, c, graph.directed_edge_name)
added_arrows = True
return added_arrows
def _apply_orientation_rules(self, graph: EquivalenceClass, sep_set: SeparatingSet):
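"""Apply the orientation rules until convergence or ``max_iter`` sweeps.
R11 (called R8 in I-FCI / Psi-FCI) is applied once up front to orient all
edges out of F-nodes; afterwards R1-4, R8-10 and R12 are swept repeatedly
over all node triples until no rule fires.
"""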
idx = 0
finished = False
# apply R11 (called R8 in I-FCI / Psi-FCI), which orients all edges out of F-nodes
_ = self._apply_rule11(graph, self.context_)
while idx < self.max_iter and not finished:
change_flag = False
logger.info(f"Running R1-10 for iteration {idx}")
for u in graph.nodes:
for (a, c) in permutations(graph.neighbors(u), 2):
logger.debug(f"Check {u} {a} {c}")
# apply R1-3 to orient triples and arrowheads
r1_add = self._apply_rule1(graph, u, a, c)
r2_add = self._apply_rule2(graph, u, a, c)
r3_add = self._apply_rule3(graph, u, a, c)
# apply R4, orienting discriminating paths
r4_add, _ = self._apply_rule4(graph, u, a, c, sep_set)
# apply R8 to orient more tails
r8_add = self._apply_rule8(graph, u, a, c)
# apply R9-10 to orient uncovered potentially directed paths
r9_add, _ = self._apply_rule9(graph, a, u, c)
# a and c are neighbors of u, so u is the endpoint desired
r10_add, _, _ = self._apply_rule10(graph, a, c, u)
# apply R12, called R9 in I-FCI when we know the intervention targets
r12_add = self._apply_rule12(graph, u, a, c, self.context_)
# see if there was a change flag
all_flags = [r1_add, r2_add, r3_add, r4_add, r8_add, r9_add, r10_add, r12_add]
if any(all_flags) and not change_flag:
logger.info(f"{change_flag} with {all_flags}")
change_flag = True
# check if we should continue or not
if not change_flag:
finished = True
if not self.selection_bias:
logger.info(f"Finished applying R1-4, and R8-10 with {idx} iterations")
if self.selection_bias:
logger.info(f"Finished applying R1-10 with {idx} iterations")
break
idx += 1
def convert_skeleton_graph(self, graph: nx.Graph) -> EquivalenceClass:
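"""Convert the undirected skeleton graph into an AugmentedPAG.
All remaining skeleton edges are given circle endpoints, and the F-nodes
recorded in the learned context are marked on the resulting
:class:`pywhy_graphs.AugmentedPAG`.
"""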
import pywhy_graphs as pgraph
# convert the undirected skeleton graph to its PAG-class, where
# all left-over edges have a "circle" endpoint
if self.known_intervention_targets:
pag = pgraph.AugmentedPAG(
incoming_circle_edges=graph, name="AugmentedPAG derived with I-FCI"
)
else:
pag = pgraph.AugmentedPAG(
incoming_circle_edges=graph, name="AugmentedPAG derived with Psi-FCI"
)
# XXX: assign targets as well
# assign f-nodes
for f_node in self.context_.f_nodes:
pag.set_f_node(f_node)
return pag