Source code for dsigma.jackknife

"""Module containing jackknife resampling functions."""

from copy import deepcopy

import astropy.units as u
import numpy as np
from astropy.table import Table
from scipy.ndimage import gaussian_filter
from scipy.spatial import cKDTree
from sklearn.cluster import DBSCAN, MiniBatchKMeans

from .helpers import in_degrees, spherical_to_cartesian

__all__ = ['compress_jackknife_fields', 'compute_jackknife_fields',
           'jackknife_resampling', 'smooth_covariance_matrix']


[docs] def compute_jackknife_fields(table, centers, distance_threshold=1, weights=None, seed=None): """Compute the centers for jackknife regions using DBSCAN and KMeans. The function first runs DBSCAN to identify continuous fields of points. Afterwards, KMeans clustering is run. The initial cluster centers are random points from each continuous field. The number of initial cluster centers per field is determined according to the total weight of each continuous field. The centers are defined in cartesian coordinates on a unit sphere. Parameters ---------- table : astropy.table.Table Catalog containing objects. The catalog needs to have coordinates and field IDs. The jackknife field for each galaxy will be added in the `field_jk` column. centers : int or numpy.ndarray If a number, total number of jackknife fields. Otherwise, the centers returned from a previous call to that function. This allows for different samples to have the same jackknife fields. distance_threshold : float, optional The angular separation in degrees used to link points and calculate continuous fields before running KMeans. Default is 1. weights : numpy.ndarray or None, optional Per-lens weights for clustering. If ``None``, assume the same weight for all points. Default is ``None``. seed : int or None, optional Random seed to initialize the random number generator. Default is ``None``. Returns ------- centers : numpy.ndarray The coordinates of the centers of the jackknife regions. """ xyz = np.column_stack(spherical_to_cartesian( in_degrees(table['ra'].quantity), in_degrees(table['dec'].quantity))) if isinstance(centers, np.ndarray): kdtree = cKDTree(centers) table['field_jk'] = kdtree.query(xyz)[1] return centers if weights is None: weights = np.ones(len(table)) n_jk = centers if not isinstance(distance_threshold, u.quantity.Quantity): distance_threshold *= u.deg eps = np.sqrt( 2 - 2 * np.cos(distance_threshold.to(u.rad).value)) c = DBSCAN(eps=eps, algorithm='kd_tree').fit(xyz).labels_ w_c = np.bincount(c[c != -1], weights=weights[c != -1]) if n_jk < len(w_c): msg = ("The number of jackknife regions cannot be smaller than the " "number of continuous fields. Try increasing " "`distance_threshold` or decreasing `centers`.") raise RuntimeError(msg) # Assign the number of jackknife fields according to the total number of # objects in each field. n_jk_per_c = np.diff(np.rint( np.cumsum(w_c) / np.sum(w_c) * n_jk).astype(int), prepend=0) # It can happen that one field is assigned 0 jackknife fields. In this # case, we will assign 1. while np.any(w_c[n_jk_per_c == 0] > 0): n_jk_per_c[np.argmin(n_jk_per_c)] += 1 n_jk_per_c[np.argmax(n_jk_per_c)] -= 1 rng = np.random.default_rng(seed) init = np.zeros((0, 3)) for i in range(len(w_c)): mask = i != c if w_c[i] > 0: init = np.vstack([init, xyz[~mask][rng.choice( np.sum(~mask), n_jk_per_c[i], replace=False, p=weights[~mask] / w_c[i])]]) kmeans = MiniBatchKMeans(n_clusters=n_jk, init=init, n_init=1, random_state=int(rng.integers(2**31))) centers = kmeans.fit( xyz[weights > 0], sample_weight=weights[weights > 0]).cluster_centers_ compute_jackknife_fields(table, centers) return centers
[docs] def compress_jackknife_fields(table): """Sum together all lenses in each jackknife field. After assigning jackknife fields, for most applications, we do not need information on individual objects anymore. Compress the information in each jackknife field by taking weighted averages. The only exception is the weight column where the sum is taken. Parameters ---------- table : astropy.table.Table Catalog containing objects. The catalog needs to have been assigned jackknife fields. Returns ------- table_jk : astropy.table.Table Catalog containing the information for each jackknife field. It has exactly as many rows as there are jackknife fields. """ table = table.copy() table.sort('field_jk') table_jk = Table() table_jk.meta = deepcopy(table.meta) table_jk['field_jk'], counts = np.unique( table['field_jk'], return_counts=True) for key in table.colnames: if not (key in ['w_sys', 'ra', 'dec', 'z'] or key[:3] == 'sum'): continue table_jk[key] = np.zeros( (len(table_jk), ) + table[key].shape[1:], dtype=table[key].dtype) if table[key].unit is not None: table_jk[key] = table_jk[key] * table[key].unit for i in range(len(table_jk)): k_min = 0 if i == 0 else np.cumsum(counts)[i - 1] k_max = np.cumsum(counts)[i] if key == 'w_sys': table_jk[key][i] = np.sum(table[key][k_min:k_max]) elif key == 'sum 1': table_jk[key][i] = np.sum(table[key][k_min:k_max], axis=0) elif np.sum(table['w_sys'][k_min:k_max]) > 0: table_jk[key][i] = np.average( table[key][k_min:k_max], weights=table['w_sys'][k_min:k_max], axis=0) return table_jk
[docs] def smooth_covariance_matrix(cov, sigma): """Smooth a covariance matrix. This function first calculates the correlation matrix, then applies a Gaussian filter on the correlation matrix, and finally reconstructs the covariance matrix using the original diagonal and smoothed correlation matrix. Parameters ---------- cov : numpy.ndarray or astropy.units.quantity.Quantity Covariance matrix. sigma : float Scale of the Gaussian filter. Returns ------- cov_smooth : numpy.ndarray or astropy.units.quantity.Quantity Smoothed covariance matrix. """ n_dim = len(np.diag(cov)) diag_cov = np.diag(cov) cor = cov / np.outer(np.sqrt(diag_cov), np.sqrt(diag_cov)) # Set diagonal elements to 0 before filtering. cor = gaussian_filter(cor - np.eye(n_dim), sigma) # Diagonal elements were 0 but may not be 0 now. Undo that by return the # values to the off-diagonal elements. while not np.allclose(np.diag(cor), 0, rtol=0, atol=1e-12): cor += (gaussian_filter(np.diag(np.diag(cor)), sigma) - np.diag(np.diag(cor))) for i in range(n_dim): cor[i, i] = 1 return cor * np.outer(np.sqrt(diag_cov), np.sqrt(diag_cov))
[docs] def jackknife_resampling(f, table_l, table_r=None, table_l_2=None, table_r_2=None, compress=True, **kwargs): """Compute the covariance of a function from jackknife re-sampling. Parameters ---------- f : callable Function that returns a result for which we want to have uncertainties. The function must take exactly one positional argument, the lens table. Additionally, it can have several additional keyword arguments. table_l : astropy.table.Table Precompute results for the lenses. The catalog must have jackknife regions assigned to it. table_r : astropy.table.Table or None, optional Precompute results for random lenses. The input function must accept the random lens table via the ``table_r`` keyword argument. Default is ``None``. table_l_2 : astropy.table.Table or None, optional Precompute results for a second set of lenses.The input function must accept the second lens table via the ``table_l_2`` keyword argument. Default is ``None``. table_r_2 : astropy.table.Table or None, optional Precompute results for a second set of random lenses. The input function must accept the second random lens table via the ``table_r_2`` keyword argument. Default is ``None``. compress : bool, optional If ``True``, compress jackknife fields via ``dsigma.jackknife.compress_jackknife_fields`` before performing the jackknife calculation. This can substantially improve performance. Default is ``True``. **kwargs Additional keyword arguments to be passed to the function. Returns ------- cov : numpy.ndarray or astropy.units.quantity.Quantity Covariance matrix of the result derived from jackknife re-sampling. """ samples = [] if compress: table_l = compress_jackknife_fields(table_l) if table_r is not None: table_r = compress_jackknife_fields(table_r) if table_l_2 is not None: table_l_2 = compress_jackknife_fields(table_l_2) if table_r_2 is not None: table_r_2 = compress_jackknife_fields(table_r_2) for field_jk in np.unique(table_l['field_jk']): for name, table in zip(['table_r', 'table_l_2', 'table_r_2'], [table_r, table_l_2, table_r_2]): if table is not None: kwargs[name] = table[table['field_jk'] != field_jk] samples.append(f(table_l[table_l['field_jk'] != field_jk], **kwargs)) cov = ((len(np.unique(table_l['field_jk'])) - 1) * np.cov(np.array(samples), rowvar=False, ddof=0)) if isinstance(samples[0], u.Quantity): cov = cov * samples[0].unit**2 return cov