Source code for dodiscover.constraint.fcialg

import logging
from itertools import combinations, permutations
from typing import List, Optional, Set, Tuple

import networkx as nx
import pandas as pd

from dodiscover.ci.base import BaseConditionalIndependenceTest
from dodiscover.constraint.config import ConditioningSetSelection
from dodiscover.constraint.skeleton import LearnSemiMarkovianSkeleton
from dodiscover.constraint.utils import is_in_sep_set
from dodiscover.typing import Column, SeparatingSet

from .._protocol import EquivalenceClass
from ..context import Context
from ._classes import BaseConstraintDiscovery

logger = logging.getLogger()


[docs]class FCI(BaseConstraintDiscovery): """The Fast Causal Inference (FCI) algorithm for causal discovery. A complete constraint-based causal discovery algorithm that operates on observational data :footcite:`Zhang2008` assuming there may exist latent confounders, and optionally selection bias. Parameters ---------- ci_estimator : Callable The conditional independence test function. The arguments of the estimator should be data, node, node to compare, conditioning set of nodes, and any additional keyword arguments. alpha : float, optional The significance level for the conditional independence test, by default 0.05. min_cond_set_size : int, optional Minimum size of the conditioning set, by default None, which will be set to '0'. Used to constrain the computation spent on the algorithm. max_cond_set_size : int, optional Maximum size of the conditioning set, by default None. Used to limit the computation spent on the algorithm. max_combinations : int, optional The maximum number of conditional independence tests to run from the set of possible conditioning sets. By default None, which means the algorithm will check all possible conditioning sets. If ``max_combinations=n`` is set, then for every conditioning set size, 'p', there will be at most 'n' CI tests run before the conditioning set size 'p' is incremented. For controlling the size of 'p', see ``min_cond_set_size`` and ``max_cond_set_size``. This can be used in conjunction with ``keep_sorted`` parameter to only test the "strongest" dependences. condsel_method : ConditioningSetSelection The method to use for selecting the conditioning sets. Must be one of ('neighbors', 'complete', 'neighbors_path'). See Notes for more details. apply_orientations : bool Whether or not to apply orientation rules given the learned skeleton graph and separating set per pair of variables. If ``True`` (default), will apply Zhang's orientation rules R0-10, orienting colliders and certain arrowheads and tails :footcite:`Zhang2008`. keep_sorted : bool Whether or not to keep the considered conditioning set variables in sorted dependency order. If True (default) will sort the existing dependencies of each variable by its dependencies from strongest to weakest (i.e. largest CI test statistic value to lowest). The conditioning set is chosen lexographically based on the sorted test statistic values of 'ith Pa(X) -> X', for each possible parent node of 'X'. This can be used in conjunction with ``max_combinations`` parameter to only test the "strongest" dependences. max_iter : int The maximum number of iterations through the graph to apply orientation rules. max_path_length : int, optional The maximum length of any discriminating path, or None if unlimited. selection_bias : bool Whether or not to account for selection bias within the causal PAG. See :footcite:`Zhang2008`. pds_condsel_method : ConditioningSetSelection The method to use for selecting the conditioning sets using PDS. Must be one of ('pds', 'pds_path'). See Notes for more details. References ---------- .. footbibliography:: Notes ----- Note that the algorithm is called "fast causal inference", but in reality the algorithm is quite expensive in terms of the number of conditional independence tests it must run. """ graph_: EquivalenceClass separating_sets_: SeparatingSet def __init__( self, ci_estimator: BaseConditionalIndependenceTest, alpha: float = 0.05, min_cond_set_size: Optional[int] = None, max_cond_set_size: Optional[int] = None, max_combinations: Optional[int] = None, condsel_method: ConditioningSetSelection = ConditioningSetSelection.NBRS, apply_orientations: bool = True, keep_sorted: bool = False, max_iter: int = 1000, max_path_length: Optional[int] = None, selection_bias: bool = True, pds_condsel_method: ConditioningSetSelection = ConditioningSetSelection.PDS, n_jobs: Optional[int] = None, ): super().__init__( ci_estimator, alpha, min_cond_set_size=min_cond_set_size, max_cond_set_size=max_cond_set_size, max_combinations=max_combinations, condsel_method=condsel_method, keep_sorted=keep_sorted, apply_orientations=apply_orientations, n_jobs=n_jobs, ) self.max_iter = max_iter self.max_path_length = max_path_length self.selection_bias = selection_bias self.pds_condsel_method = pds_condsel_method
[docs] def orient_unshielded_triples(self, graph: EquivalenceClass, sep_set: SeparatingSet) -> None: """Orient colliders given a graph and separation set. Parameters ---------- graph : EquivalenceClass The partial ancestral graph (PAG). sep_set : SeparatingSet The separating set between any two nodes. """ # for every node in the PAG, evaluate neighbors that have any edge for u in graph.nodes: for v_i, v_j in combinations(graph.neighbors(u), 2): # Check that there is no edge of any type between # v_i and v_j, else this is a "shielded" collider. # Then check to see if 'u' is in the separating # set. If it is not, then there is a collider. if v_j not in graph.neighbors(v_i) and not is_in_sep_set( u, sep_set, v_i, v_j, mode="any" ): self._orient_collider(graph, v_i, u, v_j)
def _orient_collider( self, graph: EquivalenceClass, v_i: Column, u: Column, v_j: Column ) -> None: logger.info( f"orienting collider: {v_i} -> {u} and {v_j} -> {u} to make {v_i} -> {u} <- {v_j}." ) if graph.has_edge(v_i, u, graph.circle_edge_name): graph.orient_uncertain_edge(v_i, u) if graph.has_edge(v_j, u, graph.circle_edge_name): graph.orient_uncertain_edge(v_j, u) def _apply_rule1(self, graph: EquivalenceClass, u: Column, a: Column, c: Column) -> bool: """Apply rule 1 of the FCI algorithm. If A *-> u o-* C, A and C are not adjacent, then we can orient the triple as A *-> u -> C. Parameters ---------- graph : EquivalenceClass The causal graph to apply rules to. u : node A node in the graph. a : node A node in the graph. c : node A node in the graph. Returns ------- added_arrows : bool Whether or not arrows were modified in the graph. """ added_arrows = False # If A *-> u o-* C, A and C are not adjacent, # then we can orient the triple as A *-> u -> C. # check that a and c are not adjacent if a not in graph.neighbors(c): # check a *-> u o-* c if ( graph.has_edge(a, u, graph.directed_edge_name) or graph.has_edge(a, u, graph.bidirected_edge_name) ) and graph.has_edge(c, u, graph.circle_edge_name): logger.info(f"Rule 1: Orienting edge {u} o-* {c} to {u} -> {c}.") # orient the edge from u to c and delete # the edge from c to u if graph.has_edge(u, c, graph.circle_edge_name): graph.orient_uncertain_edge(u, c) if graph.has_edge(c, u, graph.circle_edge_name): graph.remove_edge(c, u, graph.circle_edge_name) added_arrows = True return added_arrows def _apply_rule2(self, graph: EquivalenceClass, u: Column, a: Column, c: Column) -> bool: """Apply rule 2 of FCI algorithm. If - A -> u *-> C, or A *-> u -> C, and - A *-o C, then orient A *-> C. Parameters ---------- graph : PAG The causal graph to apply rules to. u : node A node in the graph. a : node A node in the graph. c : node A node in the graph. Returns ------- added_arrows : bool Whether or not arrows were modified in the graph. """ added_arrows = False # check that a *-o c edge exists if graph.has_edge(a, c, graph.circle_edge_name): # - A -> u *-> C, or A *-> u -> C, and # - A *-o C, # check for A -> u and check that u *-> c condition_one = ( graph.has_edge(a, u, graph.directed_edge_name) and not graph.has_edge(u, a, graph.directed_edge_name) and not graph.has_edge(u, a, graph.circle_edge_name) and ( graph.has_edge(u, c, graph.directed_edge_name) or graph.has_edge(u, c, graph.bidirected_edge_name) ) ) # check that a *-> u -> c condition_two = ( ( graph.has_edge(a, u, graph.directed_edge_name) or graph.has_edge(a, u, graph.bidirected_edge_name) ) and graph.has_edge(u, c, graph.directed_edge_name) and not graph.has_edge(c, u, graph.directed_edge_name) and not graph.has_edge(c, u, graph.circle_edge_name) ) if condition_one or condition_two: logger.info(f"Rule 2: Orienting circle edge to {a} -> {c}") # orient a *-> c graph.orient_uncertain_edge(a, c) added_arrows = True return added_arrows def _apply_rule3(self, graph: EquivalenceClass, u: Column, a: Column, c: Column) -> bool: """Apply rule 3 of FCI algorithm. If A *-> u <-* C, A *-o v o-* C, A/C are not adjacent, and v *-o u, then orient v *-> u. Parameters ---------- graph : PAG The causal graph to apply rules to. u : node A node in the graph. a : node A node in the graph. c : node A node in the graph. Returns ------- added_arrows : bool Whether or not arrows were modified in the graph. """ added_arrows = False # check that a and c are not adjacent if c not in graph.neighbors(a): # If A *-> u <-* C, A *-o v o-* C, A/C are not adjacent, # and v *-o u, then orient v *-> u. # check that a *-> u <-* c condition_one = ( graph.has_edge(a, u, graph.directed_edge_name) or graph.has_edge(a, u, graph.bidirected_edge_name) ) and ( graph.has_edge(c, u, graph.directed_edge_name) or graph.has_edge(c, u, graph.bidirected_edge_name) ) if not condition_one: # add quick check here to skip non-relevant u nodes return added_arrows # check for all other neighbors to find a 'v' node # with the structure A *-o v o-* C for v in graph.neighbors(u): # check that v is not a, or c if v in (a, c): continue # check that v *-o u if not graph.has_edge(v, u, graph.circle_edge_name): continue # check that a *-o v o-* c condition_two = graph.has_edge(a, v, graph.circle_edge_name) and graph.has_edge( c, v, graph.circle_edge_name ) if condition_one and condition_two: logger.info(f"Rule 3: Orienting {v} -> {u}.") graph.orient_uncertain_edge(v, u) added_arrows = True return added_arrows def _apply_rule4( self, graph: EquivalenceClass, u: Column, a: Column, c: Column, sep_set ) -> Tuple[bool, Set[Column]]: """Apply rule 4 of FCI algorithm. If a path, U = <v, ..., a, u, c> is a discriminating path between v and c for u, u o-* c, u in SepSet(v, c), orient u -> c. Else, orient a <-> u <-> c. A discriminating path, p, is one where: - p has at least 3 edges - u is non-endpoint and u is adjacent to c - v is not adjacent to c - every vertex between v and u is a collider on p and parent of c Parameters ---------- graph : PAG PAG to orient. u : node A node in the graph. a : node A node in the graph. c : node A node in the graph. sep_set : set The separating set to check. Notes ----- ... """ import pywhy_graphs as pgraph added_arrows = False explored_nodes: Set[Column] = set() # a must point to c for us to begin a discriminating path and # not be bi-directional if not graph.has_edge(a, c, graph.directed_edge_name) or graph.has_edge( c, a, graph.bidirected_edge_name ): return added_arrows, explored_nodes # c must also point to u with a circle edge # check u o-* c if not graph.has_edge(c, u, graph.circle_edge_name): return added_arrows, explored_nodes # 'a' cannot be a definite collider if there is no arrow pointing from # u to a either as: u -> a, or u <-> a if not graph.has_edge(u, a, graph.directed_edge_name) and not graph.has_edge( u, a, graph.bidirected_edge_name ): return added_arrows, explored_nodes found_discriminating_path, disc_path, explored_nodes = pgraph.discriminating_path( graph, u, a, c, self.max_path_length ) disc_path_str = " ".join( [f"{disc_path[idx]}, {disc_path[idx + 1]}" for idx in range(len(disc_path) - 1)] ) if found_discriminating_path: # the last node is the first one on the discriminating path by convention last_node = disc_path[0] # now check if u is in SepSet(v, c) # handle edge case where sep_set is empty. if last_node in sep_set: if is_in_sep_set(u, sep_set, last_node, c, "any"): # orient u -> c graph.remove_edge(c, u, graph.circle_edge_name) if graph.has_edge(u, c, graph.circle_edge_name): graph.orient_uncertain_edge(u, c) logger.info(f"Rule 4: orienting {u} -> {c}.") logger.info(disc_path_str) else: # orient u <-> c if graph.has_edge(u, c, graph.circle_edge_name): graph.orient_uncertain_edge(u, c) if graph.has_edge(c, u, graph.circle_edge_name): graph.orient_uncertain_edge(c, u) logger.info(f"Rule 4: orienting {u} <-> {c}.") logger.info(disc_path_str) added_arrows = True return added_arrows, explored_nodes def _apply_rule5(self, graph: EquivalenceClass, u: Column, a: Column) -> bool: """Apply rule 5 of FCI algorithm. For each A o-o U, if there is an uncovered (e.g. every triple is unshielded) circle path p = <A, B, ..., D, U> such that A, D are not adjacent and U, B are not adjacent, then orient A o-o U and every edge on p as undirected. Note that unlike the other rules, R5 is a binary operator and operates on two nodes, rather than a triplet. Parameters ---------- graph : PAG The causal graph to apply rules to. u : node A node in the graph. a : node A node in the graph. Returns ------- added_tails : bool Whether or not tails were modified in the graph. """ import pywhy_graphs as pgraphs added_tails = False if graph.has_edge(a, u, graph.circle_edge_name) and graph.has_edge( u, a, graph.circle_edge_name ): circle_path, found_circle_path = pgraphs.uncovered_pd_path( graph, a, u, self.max_path_length, forbid_node=u, force_circle=True ) if found_circle_path: added_tails = True rotated_circle_path = circle_path.copy() rotated_circle_path.append(rotated_circle_path.pop(0)) for x, y in zip(circle_path, rotated_circle_path): graph.remove_edge(x, y, graph.circle_edge_name) graph.remove_edge(y, x, graph.circle_edge_name) graph.add_edge(x, y, graph.undirected_edge_name) return added_tails def _apply_rule6(self, graph: EquivalenceClass, u: Column, a: Column, c: Column) -> bool: """Apply rule 6 of FCI algorithm. If A - u o-* C then orient u o-* C as u -* C Parameters ---------- graph : PAG The causal graph to apply rules to. u : node A node in the graph. a : node A node in the graph. c : node A node in the graph. Returns ------- added_tails : bool Whether or not tails were modified in the graph. """ # Check A - u added_tails = False if graph.has_edge(a, u, graph.undirected_edge_name): # Check u o-* C if graph.has_edge(c, u, graph.circle_edge_name): added_tails = True graph.remove_edge(c, u, graph.circle_edge_name) # If u o- c then put the undirected edge in if not graph.has_edge(u, c, graph.directed_edge_name) and not graph.has_edge( u, c, graph.bidirected_edge_name ): graph.add_edge(c, u, graph.undirected_edge_name) return added_tails def _apply_rule7(self, graph: EquivalenceClass, u: Column, a: Column, c: Column) -> bool: """Apply rule 7 of FCI algorithm. If a -o u o-* c and a, c are not adjacent, then u o-* c is oriented as u -* c Parameters ---------- graph : PAG The causal graph to apply rules to. u : node A node in the graph. a : node A node in the graph. c : node A node in the graph. Returns ------- added_tails : bool Whether or not tails were modified in the graph. """ added_tails = False if c not in graph.neighbors(a) and a not in graph.neighbors(c): if ( graph.has_edge(a, u, graph.circle_edge_name) and not graph.has_edge(u, a, graph.circle_edge_name) and not graph.has_edge(u, a, graph.directed_edge_name) and graph.has_edge(c, u, graph.circle_edge_name) ): added_tails = True graph.remove_edge(c, u, graph.circle_edge_name) # If u o- c then put the undirected edge in if not graph.has_edge(u, c, graph.directed_edge_name) and not graph.has_edge( u, c, graph.bidirected_edge_name ): graph.add_edge(c, u, graph.undirected_edge_name) return added_tails def _apply_rule8(self, graph: EquivalenceClass, u: Column, a: Column, c: Column) -> bool: """Apply rule 8 of FCI algorithm. If: - A -> u -> C, or A -o u -> C, (the second condition is only present with selection bias) - and A o-> C, - then orient A o-> C as A -> C. Parameters ---------- graph : PAG The causal graph to apply rules to. u : node A node in the graph. a : node A node in the graph. c : node A node in the graph. Returns ------- added_arrows : bool Whether or not arrows were modified in the graph. """ # If A -> u -> C, or A -o u -> C # and A o-> C, then orient A o-> C as A -> C. added_arrows = False # First check that A o-> C if graph.has_edge(c, a, graph.circle_edge_name) and graph.has_edge( a, c, graph.directed_edge_name ): # check that A -> u condition_one_Adirectu = graph.has_edge( a, u, graph.directed_edge_name ) and not graph.has_edge(u, a, graph.circle_edge_name) # check that A -o u # Note: this is not possible without first running R5-7 because a tail with a circle # edge would not occur through any of the other rules. condition_one_Acircleu = graph.has_edge(a, u, graph.circle_edge_name) and not ( graph.has_edge(u, a, graph.circle_edge_name) or graph.has_edge(u, a, graph.directed_edge_name) ) if self.selection_bias: condition_one = condition_one_Adirectu or condition_one_Acircleu else: condition_one = condition_one_Adirectu # check that u -> C condition_two = graph.has_edge(u, c, graph.directed_edge_name) and not graph.has_edge( c, u, graph.circle_edge_name ) if condition_one and condition_two: logger.info(f"Rule 8: Orienting {a} o-> {c} as {a} -> {c}.") # now orient A o-> C as A -> C if graph.has_edge(c, a, graph.circle_edge_name): graph.remove_edge(c, a, graph.circle_edge_name) added_arrows = True return added_arrows def _apply_rule9( self, graph: EquivalenceClass, u: Column, a: Column, c: Column ) -> Tuple[bool, List]: """Apply rule 9 of FCI algorithm. If A o-> C and p = <A, u, v, ..., C> is an uncovered possibly directed path from A to C such that u and C are not adjacent, orient A o-> C as A -> C. Parameters ---------- graph : PAG The causal graph to apply rules to. u : node A node in the graph. a : node A node in the graph. c : node A node in the graph. Returns ------- added_arrows : bool Whether or not arrows were modified in the graph. uncov_path : list The uncovered potentially directed path from 'a' to 'c' through 'u'. """ import pywhy_graphs as pgraph added_arrows = False uncov_path: List[Column] = [] # Check A o-> C and # check that u is not adjacent to c if ( graph.has_edge(c, a, graph.circle_edge_name) and graph.has_edge(a, c, graph.directed_edge_name) ) and c not in graph.neighbors(u): # check that A - u - v, ..., c is an uncovered pd path uncov_path, path_exists = pgraph.uncovered_pd_path( graph, u, c, max_path_length=self.max_path_length, first_node=a ) # orient A o-> C to A -> C if path_exists: logger.info(f"Rule 9: Orienting edge {a} o-> {c} to {a} -> {c}.") if graph.has_edge(c, a, graph.circle_edge_name): graph.remove_edge(c, a, graph.circle_edge_name) added_arrows = True return added_arrows, uncov_path def _apply_rule10( self, graph: EquivalenceClass, u: Column, a: Column, c: Column ) -> Tuple[bool, List, List]: """Apply rule 10 of FCI algorithm. If A o-> C and u -> C <- v and - p1 is an uncovered pd path from A to u - p2 is an uncovered pd path from A to v Then say m is adjacent to A on p1 (could be u). Say w is adjacent to A on p2 (could be v). If m and w are distinct and not adjacent, then orient A o-> C as A -> C. Parameters ---------- graph : PAG The causal graph to apply rules to. u : node A node in the graph. a : node A node in the graph. c : node A node in the graph. Returns ------- added_arrows : bool Whether or not arrows were modified in the graph. """ import pywhy_graphs as pgraph added_arrows = False a_to_u_path: List[Column] = [] a_to_v_path: List[Column] = [] # Check A o-> C if graph.has_edge(c, a, graph.circle_edge_name) and graph.has_edge( a, c, graph.directed_edge_name ): # check that u -> C if graph.has_edge(u, c, graph.directed_edge_name) and not graph.has_edge( c, u, graph.circle_edge_name ): # loop through all adjacent neighbors of c now to get # possible 'v' node for v in graph.neighbors(c): if v in (a, u): continue # make sure v -> C and not v o-> C if not graph.has_edge(v, c, graph.directed_edge_name) or graph.has_edge( c, v, graph.circle_edge_name ): continue # At this point, we want the paths from A to u and A to v # to begin with a distinct m and w node, else we will not # apply R10. Thus, we will get all 2-pairs of neighbors of A # that: # i) begin the uncovered pd path and # ii) are distinct (done by construction) here for (m, w) in combinations(graph.neighbors(a), 2): # type: ignore if m == c or w == c: continue # m and w must be on a potentially directed path if not graph.has_edge(a, m, graph.directed_edge_name) or not graph.has_edge( a, w, graph.directed_edge_name ): continue # we do not know which path a-u or a-v, m and w are on # so we must traverse the graph in both directions # get the uncovered pd path from A to u just once found_uncovered_a_to_v = False a_to_u_path, found_uncovered_a_to_u = pgraph.uncovered_pd_path( graph, a, u, max_path_length=self.max_path_length, second_node=m ) # we did not find a path from 'a' to 'u' through 'm', so look for # a path through 'w' instead if not found_uncovered_a_to_u: a_to_u_path, found_uncovered_a_to_u = pgraph.uncovered_pd_path( graph, a, u, max_path_length=self.max_path_length, second_node=w ) # if we don't have an uncovered pd path here, then no point in looking # for other paths if found_uncovered_a_to_u: a_to_v_path, found_uncovered_a_to_v = pgraph.uncovered_pd_path( graph, a, v, max_path_length=self.max_path_length, second_node=m ) else: a_to_v_path, found_uncovered_a_to_v = pgraph.uncovered_pd_path( graph, a, v, max_path_length=self.max_path_length, second_node=w ) # if we have not found another path, then just continue if not found_uncovered_a_to_v: continue # at this point, we have an uncovered path from a to u and a to v # with a distinct second node on both paths # orient A o-> C to A -> C logger.info(f"Rule 10: Orienting edge {a} o-> {c} to {a} -> {c}.") if graph.has_edge(c, a, graph.circle_edge_name): graph.remove_edge(c, a, graph.circle_edge_name) added_arrows = True return added_arrows, a_to_u_path, a_to_v_path def _apply_orientation_rules(self, graph: EquivalenceClass, sep_set: SeparatingSet): idx = 0 finished = False while idx < self.max_iter and not finished: change_flag = False logger.info(f"Running R1-10 for iteration {idx}") for u in graph.nodes: for (a, c) in permutations(graph.neighbors(u), 2): logger.debug(f"Check {u} {a} {c}") # apply R1-3 to orient triples and arrowheads r1_add = self._apply_rule1(graph, u, a, c) r2_add = self._apply_rule2(graph, u, a, c) r3_add = self._apply_rule3(graph, u, a, c) # apply R4, orienting discriminating paths r4_add, _ = self._apply_rule4(graph, u, a, c, sep_set) # apply R5-7 to handle cases where selection bias is present if self.selection_bias: r5_add = self._apply_rule5(graph, u, a) r6_add = self._apply_rule6(graph, u, a, c) r7_add = self._apply_rule7(graph, u, a, c) else: r5_add = False r6_add = False r7_add = False # apply R8 to orient more tails r8_add = self._apply_rule8(graph, u, a, c) # apply R9-10 to orient uncovered potentially directed paths r9_add, _ = self._apply_rule9(graph, a, u, c) # a and c are neighbors of u, so u is the endpoint desired r10_add, _, _ = self._apply_rule10(graph, a, c, u) # see if there was a change flag all_flags = [ r1_add, r2_add, r3_add, r4_add, r5_add, r6_add, r7_add, r8_add, r9_add, r10_add, ] if any(all_flags) and not change_flag: logger.info(f"{change_flag} with " f"{all_flags}") change_flag = True # check if we should continue or not if not change_flag: finished = True if not self.selection_bias: logger.info(f"Finished applying R1-4, and R8-10 with {idx} iterations") if self.selection_bias: logger.info(f"Finished applying R1-10 with {idx} iterations") break idx += 1
[docs] def learn_skeleton( self, data: pd.DataFrame, context: Context, sep_set: Optional[SeparatingSet] = None ) -> Tuple[nx.Graph, SeparatingSet]: # now compute all possibly d-separating sets and learn a better skeleton skel_alg = LearnSemiMarkovianSkeleton( self.ci_estimator, sep_set=sep_set, alpha=self.alpha, min_cond_set_size=self.min_cond_set_size, max_cond_set_size=self.max_cond_set_size, max_combinations=self.max_combinations, condsel_method=self.condsel_method, second_stage_condsel_method=self.pds_condsel_method, keep_sorted=self.keep_sorted, max_path_length=self.max_path_length, n_jobs=self.n_jobs, ) skel_alg.learn_graph(data, context) skel_graph = skel_alg.adj_graph_ sep_set = skel_alg.sep_set_ self.n_ci_tests += skel_alg.n_ci_tests return skel_graph, sep_set
[docs] def orient_edges(self, graph: EquivalenceClass): # orient colliders again self.orient_unshielded_triples(graph, self.separating_sets_) # run the rest of the rules to orient as many edges # as possible self._apply_orientation_rules(graph, self.separating_sets_)
def convert_skeleton_graph(self, graph: nx.Graph) -> EquivalenceClass: import pywhy_graphs as pgraph # convert the undirected skeleton graph to a PAG, where # all left-over edges have a "circle" endpoint pag = pgraph.PAG(incoming_circle_edges=graph, name="PAG derived with FCI") return pag