from typing import Callable, List, Optional

import networkx as nx
import numpy as np
import scipy.stats

import pywhy_graphs.networkx as pywhy_nx
from pywhy_graphs.classes import StationaryTimeSeriesDiGraph
from pywhy_graphs.classes.timeseries import numpy_to_tsgraph, tsgraph_to_numpy
from pywhy_graphs.typing import Node


def simulate_random_er_dag(n_nodes: int, p: float = 0.5, seed: int = None, ensure_acyclic: bool = False):
    """Simulate a random Erdos-Renyi graph.

    Parameters
    ----------
    n_nodes : int
        The number of nodes.
    p : float, optional
        The probability of an edge, by default 0.5.
    seed : int, optional
        Random seed, by default None.
    ensure_acyclic : bool
        Whether or not to ensure the resulting graph is acyclic (default is False).
        If True, only edges ``(u, v)`` with ``u < v`` are kept from the originally
        simulated (possibly cyclic) ER graph, which guarantees a topological ordering.

    Returns
    -------
    graph : nx.DiGraph
        The resulting graph.
    """
    # generate graph using the networkx package
    G = nx.gnp_random_graph(n=n_nodes, p=p, seed=seed, directed=True)

    # convert the generated graph to a DAG
    if ensure_acyclic:
        graph = nx.DiGraph()
        graph.add_nodes_from(G)
        graph.add_edges_from([(u, v, {}) for (u, v) in G.edges() if u < v])
        if not nx.is_directed_acyclic_graph(graph):
            raise RuntimeError("The resulting random graph should be a DAG...")
    else:
        graph = G
    return graph


def simulate_random_tsgraph(
    n_variables,
    max_lag,
    p_time_self=0.5,
    p_time_vars=0.5,
    p_contemporaneous=0.5,
    random_state=None,
):
    """Simulate a random time-series graph.

    Parameters
    ----------
    n_variables : int
        The number of variables.
    max_lag : int
        The maximum lag.
    p_time_self : float, optional
        Probability of an auto-lagged connection (same variable across time), by default 0.5.
    p_time_vars : float, optional
        Probability of a lagged connection between different variables, by default 0.5.
    p_contemporaneous : float, optional
        Probability of a contemporaneous edge, by default 0.5.
    random_state : int, optional
        Random seed, by default None.

    Returns
    -------
    G : StationaryTimeSeriesDiGraph
        The stationary time series graph.
    """
    rng = np.random.default_rng(random_state)

    # first we sample the time-series graph
    node_names = list(range(n_variables))
    G = StationaryTimeSeriesDiGraph(max_lag=max_lag)
    G.add_variables_from(node_names)

    # loop through all possible edge combinations from (x, 0) to (x', -lag)
    # for lag up to max_lag
    for non_lag_node in G.nodes_at(t=0):
        for lag in range(max_lag + 1):
            for lag_node in G.nodes_at(t=lag):
                if non_lag_node[1] == lag_node[1]:
                    # same time index (t=0): candidate contemporaneous edge
                    if p_contemporaneous > 0 and rng.binomial(n=1, p=p_contemporaneous, size=None) == 1:
                        G.add_edge(lag_node, non_lag_node)

                        # check that the addition of this edge does not result in a
                        # cyclic (or self-loop) contemporaneous causal relationship
                        if not nx.is_directed_acyclic_graph(G.subgraph(G.nodes_at(t=0))):
                            G.remove_edge(lag_node, non_lag_node)
                elif non_lag_node[0] == lag_node[0] and p_time_self > 0:
                    # same variable across time: sample an auto-lagged edge
                    # with probability p_time_self
                    if rng.binomial(n=1, p=p_time_self, size=None) == 1:
                        G.add_edge(lag_node, non_lag_node)
                elif p_time_vars > 0:
                    # different variables across time: sample a lagged edge
                    # with probability p_time_vars
                    if rng.binomial(n=1, p=p_time_vars, size=None) == 1:
                        G.add_edge(lag_node, non_lag_node)
    return G
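
# A minimal usage sketch for the two samplers above (illustrative only; the
# parameter and seed values are arbitrary example inputs, not library defaults).
def _example_simulate_random_graphs():
    # a random 5-node Erdos-Renyi DAG with edge probability 0.3
    dag = simulate_random_er_dag(n_nodes=5, p=0.3, seed=1234, ensure_acyclic=True)

    # a random stationary time-series graph over 3 variables with maximum lag 2;
    # per the convention above, its nodes are (variable, t) tuples with t in {0, -1, -2}
    ts_graph = simulate_random_tsgraph(
        n_variables=3,
        max_lag=2,
        p_time_self=0.5,
        p_time_vars=0.3,
        p_contemporaneous=0.2,
        random_state=1234,
    )
    return dag, ts_graph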
def simulate_data_from_var(
    var_arr: np.ndarray,
    n_times: int = 1000,
    n_realizations: int = 1,
    var_names: Optional[List[Node]] = None,
    random_state: int = None,
):
    """Simulate data from an already set VAR process.

    Parameters
    ----------
    var_arr : ArrayLike of shape (n_variables, n_variables, max_lag + 1)
        The stationary time-series vector-auto-regressive process. The 3rd dimension
        is the lag and we assume that there are lag points ``(t=0, ..., t=-max_lag)``.
    n_times : int, optional
        Number of observations (time points) to simulate per realization; this includes
        the initial observations used to start the autoregressive process. By default 1000.
    n_realizations : int, optional
        The number of independent realizations of the VAR process to generate, by default 1.
    var_names : list of nodes, optional
        The variable names. If passed in, must have length equal to ``n_variables``, and
        the output will be converted to a pandas DataFrame with ``var_names`` as the
        columns. By default None.
    random_state : int, optional
        The random state, by default None.

    Returns
    -------
    x : ArrayLike of shape (n_variables, n_times * n_realizations), or pandas.DataFrame of shape (n_times * n_realizations, n_variables)
        The simulated data. If ``var_names`` is passed in, then the output is converted
        to a pandas DataFrame.

    Notes
    -----
    The simulated ``x`` array consists of multiple "instances" of the underlying stationary
    VAR process. For example, if ``n_times`` is 1000 and ``max_lag = 2``, then technically
    you have 500 realizations of the time-series graph occurring over this multivariate
    time-series. However, each of those realizations depends on the previous ones. In order
    to start from completely independent initial conditions, one can instead modify the
    ``n_realizations`` parameter. To generate 500 independent realizations in the above
    example, one would set ``n_realizations = 500`` and ``n_times = 2``.
    """
    import pandas as pd

    if var_arr.shape[0] != var_arr.shape[1]:
        raise ValueError(
            f"The shape of the VAR array should be "
            f"(n_variables, n_variables, max_lag + 1). Your array dimensions are {var_arr.shape}."
        )
    n_vars, _, max_lag = var_arr.shape

    # the 3rd dimension of the VAR array encodes the lag points (t=0, ..., t=-max_lag)
    max_lag = max_lag - 1

    if var_names is not None and len(var_names) != n_vars:
        raise ValueError(f"The passed in node names {var_names} should have {n_vars} variables.")

    rng = np.random.default_rng(random_state)

    # initialize the final data array for speed
    x = np.zeros((n_vars, n_realizations * n_times))

    for idx in range(n_realizations):
        # each realization occupies a contiguous block of ``n_times`` columns;
        # for a single realization the offset is simply 0
        offset = idx * n_times

        # sample the initial conditions
        x0 = rng.standard_normal(size=(n_vars, max_lag))
        x[:, offset : offset + max_lag] = x0

        # simulate forward in time
        for tdx in range(max_lag, n_times):
            ygen = x[:, offset + tdx]

            # loop over the lags to generate the next sample as a linear
            # combination of the previous lagged data points
            for pdx in range(max_lag):
                ygen += np.dot(var_arr[..., pdx], x[:, offset + tdx - pdx - 1].T).T

    # convert to a DataFrame, if variable names are explicitly passed in
    if var_names is not None:
        x = pd.DataFrame(x.T, columns=var_names)
    return x
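
# A minimal usage sketch for ``simulate_data_from_var`` (illustrative only). Per the
# generation loop above, lag slice ``pdx`` of ``var_arr`` multiplies the data
# ``pdx + 1`` steps back; the coefficient values below are arbitrary.
def _example_simulate_data_from_var():
    # a 2-variable VAR(1) process: shape (n_variables, n_variables, max_lag + 1)
    var_arr = np.zeros((2, 2, 2))
    var_arr[0, 0, 0] = 0.5  # x0(t) depends on x0(t - 1)
    var_arr[1, 0, 0] = 0.3  # x1(t) depends on x0(t - 1)

    # with ``var_names`` passed in, a pandas DataFrame with one row per time point
    # and one column per variable is returned
    data = simulate_data_from_var(
        var_arr, n_times=200, n_realizations=1, var_names=["x0", "x1"], random_state=1234
    )
    return data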
def simulate_linear_var_process(
    n_variables: int = 5,
    p_time_self: float = 0.5,
    p_time_vars: float = 0.5,
    p_contemporaneous: float = 0.5,
    max_lag: int = 1,
    n_times: int = 1000,
    n_realizations: int = 1,
    weight_dist: Callable = scipy.stats.norm,
    random_state: int = None,
):
    """Simulate a linear VAR process of a "stationary" causal graph.

    Parameters
    ----------
    n_variables : int, optional
        The number of variables to include in the final simulation, by default 5.
    p_time_self : float, optional
        The probability of an edge across time for the same variable, by default 0.5.
    p_time_vars : float, optional
        The probability of an edge across time between different variables, by default 0.5.
    p_contemporaneous : float, optional
        The probability of a contemporaneous edge between different variables, by default 0.5.
        Can be set to 0 to specify that the underlying causal process has no instantaneous
        effects.
    max_lag : int, optional
        The maximum lag allowed in the resulting process, by default 1.
    n_times : int, optional
        The number of time points to generate per realization, by default 1000.
        See `simulate_data_from_var` for more information.
    n_realizations : int, optional
        The number of independent realizations, by default 1.
        See `simulate_data_from_var` for more information.
    weight_dist : Callable, optional
        The distribution of the connection weights, by default ``scipy.stats.norm``.
    random_state : int, optional
        The random state, by default None.

    Returns
    -------
    data : pandas.DataFrame of shape (n_times * n_realizations, n_variables)
        The simulated data.
    var_model : StationaryTimeSeriesDiGraph
        The resulting time-series causal graph.

    Notes
    -----
    In stationary time-series graphs without latent confounders, there is never a worry
    about acyclicity among "lagged nodes" in the graph. However, if we model the situation
    where there are instantaneous, or contemporaneous, effects, then those edges may form a
    cycle when simulating the graph. Therefore, if ``p_contemporaneous > 0``, an additional
    acyclicity check is performed on the network of contemporaneous edges.

    To simulate a VAR process with latent confounders (i.e. unobserved variable
    time-series), one can simply simulate the full VAR process and then delete the
    simulated time-series data of the latent variables.
    """
    # first we sample the time-series graph
    G = simulate_random_tsgraph(
        n_variables,
        max_lag=max_lag,
        p_time_self=p_time_self,
        p_time_vars=p_time_vars,
        p_contemporaneous=p_contemporaneous,
        random_state=random_state,
    )

    # then we convert this into an array of 1's and 0's;
    # we maintain a lagged order of the nodes so that reshaping
    # into a 3D array works properly
    var_order = list(G.variables)
    ts_graph_arr = tsgraph_to_numpy(G, var_order=var_order)

    nnz_index = np.nonzero(ts_graph_arr)
    nnz = np.count_nonzero(ts_graph_arr)

    # we sample weights from our weight distribution to fill
    # in every non-zero index of our VAR array
    weights = weight_dist.rvs(size=nnz)  # type: ignore
    ts_graph_arr[nnz_index] = weights

    # the resulting VAR array defines the process from which we simulate
    # data, starting from random Gaussian initial conditions
    x = simulate_data_from_var(
        var_arr=ts_graph_arr,
        n_times=n_times,
        n_realizations=n_realizations,
        var_names=var_order,
        random_state=random_state,
    )
    return x, G
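
# A minimal usage sketch for ``simulate_linear_var_process`` (illustrative only). The
# uniform weight distribution is an arbitrary choice; any frozen ``scipy.stats``
# distribution exposing ``rvs`` works.
def _example_simulate_linear_var_process():
    weight_dist = scipy.stats.uniform(loc=0.1, scale=0.4)  # weights drawn from [0.1, 0.5)

    data, ts_graph = simulate_linear_var_process(
        n_variables=3,
        p_time_self=0.8,
        p_time_vars=0.3,
        p_contemporaneous=0.0,  # no instantaneous effects
        max_lag=2,
        n_times=500,
        weight_dist=weight_dist,
        random_state=1234,
    )
    # ``data`` is a pandas DataFrame of simulated samples and ``ts_graph`` is the
    # sampled StationaryTimeSeriesDiGraph that generated them
    return data, ts_graph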
def simulate_var_process_from_summary_graph(
    G: pywhy_nx.MixedEdgeGraph, max_lag=1, n_times=1000, random_state: int = None
):
    """Simulate a VAR(max_lag) process starting from a summary graph.

    Parameters
    ----------
    G : pywhy_nx.MixedEdgeGraph
        A time-series summary graph.
    max_lag : int, optional
        The maximum time-lag to consider, by default 1, which corresponds to a
        VAR(1) process.
    n_times : int
        Number of observations (time points) to simulate; this includes the initial
        observations used to start the autoregressive process. By default 1000.
    random_state : int, optional
        The random seed, by default None.

    Returns
    -------
    x_df : pandas.DataFrame of shape (n_times, n_nodes)
        The sampled dataset.
    G : StationaryTimeSeriesDiGraph
        The stationary time-series graph induced by the summary graph.

    Notes
    -----
    Right now, it is assumed that the summary graph is just a DAG.
    """
    import pandas as pd

    rng = np.random.default_rng(random_state)
    n_nodes = G.number_of_nodes()
    var_arr = np.zeros((n_nodes, n_nodes, max_lag + 1))

    # simulate the weights of the weight matrix
    summary_arr = np.zeros((n_nodes, n_nodes))
    nodelist = list(G.nodes)
    for _, graph in G.get_graphs().items():
        # get the graph array
        graph_arr = nx.to_numpy_array(graph, nodelist=nodelist, weight="weight")
        non_zero_index = np.nonzero(graph_arr)
        weights = rng.normal(size=(len(non_zero_index[0]),))

        # set the weights in the summary graph
        summary_arr[non_zero_index] = weights

    # Now simulate across time points. Initialize the VAR coefficients so that every
    # lag slice mirrors the summary graph, i.e. every summary edge is present between
    # every pair of time points.
    for i in range(max_lag + 1):
        var_arr[..., i] = summary_arr

    # convert the VAR array to a stationary time-series graph
    G = numpy_to_tsgraph(var_arr, var_order=nodelist, create_using=StationaryTimeSeriesDiGraph)

    # simulate the initial conditions data
    x0 = rng.standard_normal(size=(n_nodes, max_lag))
    x = np.zeros((n_nodes, n_times))
    x[:, :max_lag] = x0

    # simulate forward in time
    for tdx in range(max_lag, n_times):
        ygen = x[:, tdx]
        for pdx in range(max_lag):
            ygen += np.dot(var_arr[..., pdx], x[:, tdx - pdx - 1].T).T

    # convert to a DataFrame
    x_df = pd.DataFrame(x.T, columns=nodelist)
    return x_df, G
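
# A minimal usage sketch for ``simulate_var_process_from_summary_graph`` (illustrative
# only). The ``MixedEdgeGraph`` construction below assumes the pywhy-graphs constructor
# that accepts a list of sub-graphs and their edge types; adjust the construction if the
# installed API differs.
def _example_simulate_var_process_from_summary_graph():
    # a summary DAG over three variables: x -> y -> z, stored as the "directed"
    # sub-graph of a mixed-edge graph
    directed_G = nx.DiGraph([("x", "y"), ("y", "z")])
    summary_G = pywhy_nx.MixedEdgeGraph([directed_G], ["directed"])

    x_df, ts_graph = simulate_var_process_from_summary_graph(
        summary_G, max_lag=1, n_times=500, random_state=1234
    )
    # ``x_df`` has one row per time point and one column per summary-graph node
    return x_df, ts_graph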