Source code for

from typing import List, Optional, Tuple

import networkx as nx
import numpy as np
from numpy.typing import NDArray

from dodiscover.toporder._base import BaseTopOrder
from dodiscover.toporder.utils import full_adj_to_order, pns

[docs]class CAM(BaseTopOrder): """The CAM (Causal Additive Model) algorithm for causal discovery. CAM :footcite:`Buhlmann2013` iteratively defines a topological ordering by leaf additions. Then it prunes the fully connected DAG consistent with the inferred topological order. The method assumes Additive Noise Model and Gaussianity of the noise terms. Parameters ---------- alpha : float, optional Alpha cutoff value for variable selection with hypothesis testing over regression coefficients, default is 0.05. prune : bool, optional If True (default), apply CAM-pruning after finding the topological order. n_splines : int, optional Number of splines to use for the feature function, default is 10. Automatically decreased in case of insufficient samples splines_degree: int, optional Order of spline to use for the feature function, default is 3. pns : bool, optional If True, perform Preliminary Neighbour Search (PNS) before CAM pruning step, default is False. Allows scaling CAM pruning and ordering to large graphs. pns_num_neighbors: int, optional Number of neighbors to use for PNS. If None (default) use all variables. pns_threshold: float, optional Threshold to use for PNS, default is 1. References ---------- .. footbibliography:: Notes ----- Prior knowledge about the included and excluded directed edges in the output DAG is supported. It is not possible to provide explicit constraints on the relative positions of nodes in the topological ordering. However, explicitly including a directed edge in the DAG defines an implicit constraint on the relative position of the nodes in the topological ordering (i.e. if directed edge `(i,j)` is encoded in the graph, node `i` will precede node `j` in the output order). """ def __init__( self, alpha: float = 0.05, prune: bool = True, n_splines: int = 10, splines_degree: int = 3, pns: bool = False, pns_num_neighbors: Optional[int] = None, pns_threshold: float = 1, ): super().__init__( alpha, prune, n_splines, splines_degree, pns, pns_num_neighbors, pns_threshold ) self.inf = np.finfo(np.float32).min def _top_order(self, X: NDArray) -> Tuple[NDArray, List[int]]: """ Find the topological ordering of the causal variables from the dataset `X`. Parameters ---------- X : np.ndarray of shape (n_samples, n_nodes) Dataset of observations of the causal variables. Returns ------- A_dense : np.ndarray Fully connexted matrix admitted by the topological ordering. order : List[int] Inferred causal order """ def initialize_directed_paths(d): G_included = self._get_included_edges_graph() # nx.Graph with included edges directed_paths = np.zeros((d, d)) np.fill_diagonal(directed_paths, 1) for i in range(d): for j in range(d): if G_included.has_edge(i, j): self._update_directed_paths(i, j, directed_paths) return directed_paths _, d = X.shape A = nx.to_numpy_array(self._get_included_edges_graph()) directed_paths = initialize_directed_paths( d ) # directed_paths[i,j]=1 if there is a directed path from i to j score_gains, score = self._initialize_score(X, directed_paths) while np.sum(score_gains > self.inf) > 0: parent, child = np.unravel_index( np.argmax(score_gains, axis=None), score_gains.shape ) # greedily select edge with largest score contribution A[parent, child] = 1 score[child] += score_gains[ parent, child ] # update score of child node given the selected edge self._update_acyclicity_constraints(parent, child, score_gains, directed_paths) # Update score column associated to child node self._update_score(X, A, child, score_gains, score[child]) order = full_adj_to_order(A) return A, order def _update_score(self, X: NDArray, A: NDArray, c: int, score_gains: NDArray, score_c: float): """ Update column c of score_gains matrix, where c is a node associated to a new incoming edge. Parameters ---------- X : np.ndarray of shape (n_samples, n_nodes) Matrix of the data. A : np.ndarray of shape (n_nodes, n_nodes) Current adjacency matrix. c : int Column of score_gains to be updated. score_gains : np.ndarray Matrix of score gains to be updated. score_c : float Score of c-th node under current ordering. """ def valid_parent(pot_parent, current_parents): if pot_parent == c or pot_parent in current_parents: return False if score_gains[pot_parent, c] == self.inf: return False return True d = A.shape[0] current_parents = np.argwhere(A[:, c]) for pot_parent in range(d): if valid_parent(pot_parent, current_parents): predictors = np.append(current_parents, [pot_parent]) gam = self._fit_gam_model(X[:, predictors], X[:, c].reshape(-1, 1)) residuals = X[:, c] - gam.predict(X[:, predictors]) gain = -np.log(np.var(residuals)) - score_c score_gains[pot_parent, c] = gain def _update_directed_paths(self, parent: int, child: int, directed_paths: NDArray): """Update directed paths in the graph given the new (parent, child) edge. Parameters ---------- parent : int Parent of the child node in the input `parent`,`child` edge. child : int Child of the parent node in the input `parent`,`child` edge. directed_paths : np.ndarray Existing directed paths in the graph. """ directed_paths[ parent, child ] = 1 # directed_paths[i,j]=1 if there is a directed path from i to j child_descendants = np.argwhere(directed_paths[child, :]) parent_ancestors = np.argwhere(directed_paths[:, parent]) for p in parent_ancestors: # add path from every ancestor to every descendant for c in child_descendants: directed_paths[p, c] = 1 def _update_acyclicity_constraints( self, parent: int, child: int, score_gains: NDArray, directed_paths: NDArray ): """Update the acyclicity constraints and directed paths given new (parent, child) edge. Add aciclicity constraints given (parent, child) edge addition, and update the existing directed paths. In order to forbid i -> j edge selection, set score_gains[i, j] = -Inf Parameters ---------- parent : int Parent of the child node in the input `parent`,`child` edge. child : int Child of the parent node in the input `parent`,`child` edge. score_gains : np.ndarray Matrix of the score gain. score_gain[i,j] is the gain in score obtaied by addition of i -> j edge to the graph. directed_paths : np.ndarray Existing directed paths in the graph. """ score_gains[parent, child] = self.inf # do not select same edge twice score_gains[child, parent] = self.inf self._update_directed_paths(parent, child, directed_paths) score_gains[np.transpose(directed_paths == 1, (1, 0))] = self.inf def _initialize_score(self, X: NDArray, directed_paths: NDArray) -> Tuple[NDArray, NDArray]: """ Initialize score gains matrix and the score contribution of each node. If self.do_pns = True or self.do_pns is None and number of nodes >= 20. Parameters ---------- X : np.array of shape (n_samples, n_nodes) Matrix of the data. directed_paths : np.ndarray of shape (n_nodes, n_nodes) Matrix encoding the directed paths in the graph. directed_paths[i,j]=1 if there is a directed path from i to j. Returns ------- score_gain : np.ndarray of shape (n_nodes, n_nodes) Matrix of the gains. score_gain[i, j] is the additive contribute to the score (i.e. the gain) in adding i as parent of j. init_score : np.ndarray of shape (n_nodes,) Vector with the score contribute of each node. Since the initial topological ordering is empty, all nodes are initially treated as source. """ _, d = X.shape G_excluded = self._get_excluded_edges_graph() # nx.Graph with excluded edges G_included = self._get_included_edges_graph() # nx.Graph with included edges if (self.do_pns) or (self.do_pns is None and d > 20): A_pns = pns( A=np.ones((d, d)), X=X, pns_threshold=self.pns_threshold, pns_num_neighbors=self.pns_num_neighbors, ) # Exclude edges removed by PNS for i in range(d): for j in range(d): if i != j and A_pns[i, j] == 0 and (not G_included.has_edge(i, j)): G_excluded.add_edge(i, j) # Initialize matrix of score gains and vector of initial scores score_gains = np.zeros((d, d)) score_gains[ np.transpose(directed_paths == 1, (1, 0)) ] = self.inf # avoid cycles setting entries to -Inf according to current directed_paths nodes_variance = np.var(X, axis=0) init_score = -np.log(nodes_variance) # Score gain for each edge i -> j for i in range(d): for j in range(d): if G_excluded.has_edge(i, j): score_gains[i, j] = self.inf elif score_gains[i, j] != self.inf: gam = self._fit_gam_model(X[:, i].reshape(-1, 1), X[:, j].reshape(-1, 1)) residuals = X[:, j] - gam.predict(X[:, i].reshape(-1, 1)) gain = -np.log(np.var(residuals)) - init_score[j] score_gains[i, j] = gain return score_gains, init_score