Source code for dodiscover.ci.monte_carlo
from typing import Optional
import numpy as np
import scipy.spatial
from numpy.typing import ArrayLike
from sklearn.neighbors import NearestNeighbors
[docs]def generate_knn_in_subspace(
z_arr: ArrayLike, method: str = "knn", k: int = 1, n_jobs: Optional[int] = None
) -> ArrayLike:
"""Generate kNN in subspace.
Parameters
----------
z_arr : ArrayLike of shape (n_samples, n_features_z)
The covariate space.
method : str, optional
Method to use, by default 'knn'. Can be ('knn', 'kdtree').
k : int, optional
The number of k-nearest neighbors to query, by default 1.
n_jobs : int,
The number of CPUs to use for joblib. By default, None.
Returns
-------
indices : ArrayLike of shape (n_samples, k)
The indices of the k-nearest-neighbors for each sample.
"""
# use a method to generate k-nearest-neighbors in subspace of Z
if method == "knn":
# compute the nearest neighbors in the space of "Z training" using ball-tree alg.
nbrs = NearestNeighbors(
n_neighbors=k + 1, algorithm="ball_tree", metric="l2", n_jobs=n_jobs
).fit(z_arr)
# then get the K nearest nbrs in the Z space
_, indices = nbrs.kneighbors(z_arr)
elif method == "kdtree":
tree_xyz = scipy.spatial.cKDTree(z_arr)
indices = tree_xyz.query(z_arr, k=k, p=np.inf, eps=0.0, workers=n_jobs)[1].astype(np.int32)
return indices
[docs]def restricted_nbr_permutation(nbrs: ArrayLike, random_seed=None) -> ArrayLike:
"""Compute a permutation of neighbors with restrictions.
Parameters
----------
nbrs : ArrayLike of shape (n_samples, k)
The k-nearest-neighbors for each sample index.
random_seed : int, optional
Random seed, by default None.
Returns
-------
restricted_perm : ArrayLike of shape (n_samples)
The final permutation order of the sample indices. There may be
repeating samples. See Notes for details.
Notes
-----
Restricted permutation goes through random samples and looks at the k-nearest
neighbors (columns of ``nbrs``) and shuffles the closest neighbor index only
if it has not been used to permute another sample. If it has been, then the
algorithm looks at the next nearest-neighbor and so on. If all k-nearest
neighbors of a sample has been checked, then a random neighbor is chosen. In this
manner, the algorithm tries to perform permutation without replacement, but
if necessary, will choose a repeating neighbor sample.
"""
n_samples, k_dims = nbrs.shape
rng = np.random.default_rng(seed=random_seed)
# initialize the final permutation order
restricted_perm = np.zeros((n_samples,))
# generate a random order of samples to go through
random_order = rng.permutation(n_samples)
# keep track of values we have already used
used = set()
# go through the random order
for idx in random_order:
m = 0
use_idx = nbrs[idx, m]
# if the current nbr is already used, continue incrementing
# until we have either found a new sample to use, or if
# we have reach the maximum number of shuffles to consider
while (use_idx in used) and (m < k_dims - 1):
m += 1
use_idx = nbrs[idx, m]
# check whether or not we have exhaustively checked all kNN
if use_idx in used and m == k_dims:
# XXX: Note this step is not in the original paper
# choose a random neighbor to permute
restricted_perm[idx] = rng.choice(nbrs[idx, :], size=1)
else:
# permute with the existing neighbor
restricted_perm[idx] = use_idx
used.add(use_idx)
return restricted_perm