Source code for dsigma.precompute

"""Module for pre-computing lensing results."""

import multiprocessing as mp
import queue
import warnings

import numpy as np
from astropy import units as u
from astropy.units import UnitConversionError
from astropy_healpix import HEALPix

from . import default_cosmology
from .helpers import interpolate_over_redshift
from .physics import critical_surface_density
from .physics import effective_critical_surface_density
from .precompute_engine import precompute_engine

__all__ = ['mean_photo_z_offset', 'photo_z_dilution_factor', 'precompute']


[docs] def photo_z_dilution_factor(z_l, table_c, cosmology, weighting=-2): r"""Calculate the photo-z delta sigma bias as a function of lens redshift. Parameters ---------- z_l : float or numpy.ndarray Redshift(s) of the lens. table_c : astropy.table.Table Photometric redshift calibration catalog. cosmology : astropy.cosmology Cosmology to assume for calculations. weighting : float, optional The exponent of weighting of each lens-source pair by the critical surface density. A natural choice is -2 which minimizes shape noise. Default is -2. Returns ------- f_bias : float or numpy.ndarray The photo-z bias factor, :math:`f_{\rm bias}`, for the lens redshift(s). """ z_l_max = table_c['z_l_max'] z_s = table_c['z'] z_s_true = table_c['z_true'] d_l = interpolate_over_redshift( cosmology.comoving_transverse_distance, z_l).to(u.Mpc).value d_s = cosmology.comoving_transverse_distance(table_c['z']).to(u.Mpc).value d_s_true = cosmology.comoving_transverse_distance( table_c['z_true']).to(u.Mpc).value w = table_c['w_sys'] * table_c['w'] if hasattr(z_l, '__len__'): shape = (len(z_l), len(table_c)) z_s = np.tile(z_s, len(z_l)).reshape(shape) z_s_true = np.tile(z_s_true, len(z_l)).reshape(shape) d_s = np.tile(d_s, len(z_l)).reshape(shape) d_s_true = np.tile(d_s_true, len(z_l)).reshape(shape) z_l_max = np.tile(z_l_max, len(z_l)).reshape(shape) w = np.tile(w, len(z_l)).reshape(shape) z_l = np.repeat(z_l, len(table_c)).reshape(shape) d_l = np.repeat(d_l, len(table_c)).reshape(shape) sigma_crit_phot = critical_surface_density(z_l, z_s, d_l=d_l, d_s=d_s) sigma_crit_true = critical_surface_density(z_l, z_s_true, d_l=d_l, d_s=d_s_true) mask = z_l >= z_l_max if np.any(np.all(mask, axis=-1)): msg = ("Could not find valid calibration sources for some lens " "redshifts. The f_bias correction may be undefined.") warnings.warn(msg, category=RuntimeWarning, stacklevel=2) return (np.sum((w * sigma_crit_phot**weighting) * (~mask), axis=-1) / np.sum((w * sigma_crit_phot**(weighting + 1) / sigma_crit_true) * (~mask), axis=-1))
[docs] def mean_photo_z_offset(z_l, table_c, cosmology, weighting=-2): """Calculate the mean offset of source photometric redshifts. Parameters ---------- z_l : float or numpy.ndarray Redshift(s) of the lens. table_c : astropy.table.Table, optional Photometric redshift calibration catalog. cosmology : astropy.cosmology Cosmology to assume for calculations. weighting : float, optional The exponent of weighting of each lens-source pair by the critical surface density. A natural choice is -2 which minimizes shape noise. Default is -2. Returns ------- dz : float or numpy.ndarray The mean source redshift offset for the lens redshift(s). """ z_l_max = table_c['z_l_max'] z_s = table_c['z'] z_s_true = table_c['z_true'] d_l = interpolate_over_redshift( cosmology.comoving_transverse_distance, z_l).to(u.Mpc).value d_s = cosmology.comoving_transverse_distance(table_c['z']).to(u.Mpc).value w = table_c['w_sys'] * table_c['w'] if not np.isscalar(z_l): shape = (len(z_l), len(table_c)) z_s = np.tile(z_s, len(z_l)).reshape(shape) z_s_true = np.tile(z_s_true, len(z_l)).reshape(shape) d_s = np.tile(d_s, len(z_l)).reshape(shape) z_l_max = np.tile(z_l_max, len(z_l)).reshape(shape) w = np.tile(w, len(z_l)).reshape(shape) z_l = np.repeat(z_l, len(table_c)).reshape(shape) d_l = np.repeat(d_l, len(table_c)).reshape(shape) sigma_crit = critical_surface_density(z_l, z_s, d_l=d_l, d_s=d_s) w = w * sigma_crit**weighting mask = z_l >= z_l_max return np.sum((z_s - z_s_true) * w * (~mask), axis=-1) / np.sum( w * (~mask), axis=-1)
def get_raw_multiprocessing_array(array): """Convert a numpy array into a shared-memory multiprocessing array. Parameters ---------- array : numpy.ndarray or None Input array. Returns ------- array_mp : multiprocessing.RawArray or None Output array. None if input is None. """ if array is None: return None array_mp = mp.RawArray('l' if np.issubdtype(array.dtype, np.integer) else 'd', len(array)) array_np = np.ctypeslib.as_array(array_mp) array_np[:] = array return array_mp
[docs] def precompute( table_l, table_s, bins, table_c=None, table_n=None, cosmology=None, comoving=True, weighting=-2, nside=256, n_jobs=1, progress_bar=False): """For all lenses in the catalog, precompute the lensing statistics. Parameters ---------- table_l : astropy.table.Table Catalog of lenses. table_s : astropy.table.Table Catalog of sources. bins : numpy.ndarray or astropy.units.quantity.Quantity Bins in radius to use for the stacking. If a numpy array, bins are assumed to be in Mpc. If an astropy quantity, one can pass both length units, e.g. kpc and Mpc, as well as angular units, i.e. deg and rad. table_c : astropy.table.Table, optional Additional photometric redshift calibration catalog. If provided, this will be used to statistically correct the photometric source redshifts and critical surface densities. Default is ``None``. table_n : astropy.table.Table, optional Source redshift distributions. If provided, this will be used to compute mean source redshifts and critical surface densities. These mean quantities would be used instead the individual photometric redshift estimates. The table needs to have a `z` column giving the redshift and a `n` column with the :math:`n(z)` for all samples. Default is ``None``. cosmology : astropy.cosmology or None, optional Cosmology to assume for calculations. If ``None``, use ``dsigma.default_cosmology``. Default is ``None``. comoving : bool, optional Whether to use comoving or physical quantities for radial bins (if given in physical units) and the excess surface density. Default is True. weighting : float, optional The exponent of weighting of each lens-source pair by the critical surface density. A natural choice is -2 which minimizes shape noise. Default is -2. nside : int, optional dsigma uses pixelization to group nearby lenses together and process them simultaneously. This parameter determines the number of pixels. It has to be a power of 2. May impact performance. Default is 256. n_jobs : int, optional Number of jobs to run at the same time. Default is 1. progress_bar : bool, option Whether to show a progress bar for the main loop over lens pixels. Default is False. Returns ------- table_l : astropy.table.Table Lens catalog with the pre-computation results attached to the table. Raises ------ ValueError If there are problems in the input. """ cosmology = default_cosmology if cosmology is None else cosmology if cosmology.Ok0 != 0: msg = "dsigma does not support non-flat cosmologies." raise ValueError(msg) if np.any(table_l['z'] < 0): msg = "Input lens redshifts must all be non-negative." raise ValueError(msg) if not isinstance(nside, int) or not np.isin(nside, 2**np.arange(15)): msg = f"nside must be a positive power of 2. Received {nside}." raise ValueError(msg) if not isinstance(n_jobs, int) or n_jobs < 1: msg = f"Number of jobs must be positive integer. Received {n_jobs}." raise ValueError(msg) if table_c is not None and table_n is not None: msg = "`table_c` and `table_n` cannot both be given." raise ValueError(msg) if table_n is not None: if 'z' in table_s.colnames: msg = ("When providing tomographic source redshift distributions " "via the `table_n` argument, the `z` column is ignored.") warnings.warn(msg, category=RuntimeWarning, stacklevel=2) if 'z_bin' not in table_s.colnames: msg = ("To use source redshift distributions, the source table " "needs to have a `z_bin` column.") raise ValueError(msg) if (not np.issubdtype(table_s['z_bin'].dtype, np.integer) or np.any(table_s['z_bin'] < 0)): msg = ("The `z_bin` column in the source table must contain only " "non-negative integers.") raise ValueError(msg) if np.amax(table_s['z_bin']) > table_n['n'].data.shape[1]: msg = ("The source table contains more redshift bins than where " "passed via the `table_n` argument.") raise ValueError(msg) elif 'z_l_max' in table_s.colnames and np.any( table_s['z_l_max'] > table_s['z']): msg = ("The maximum lens redshift can never be larger than the source " "redshift.") raise ValueError(msg) hp = HEALPix(nside, order='ring') pix_l = hp.lonlat_to_healpix(table_l['ra'] * u.deg, table_l['dec'] * u.deg) pix_s = hp.lonlat_to_healpix(table_s['ra'] * u.deg, table_s['dec'] * u.deg) argsort_pix_l = np.argsort(pix_l) argsort_pix_s = np.argsort(pix_s) u_pix_l, n_pix_l = np.unique(pix_l, return_counts=True) u_pix_l = np.ascontiguousarray(u_pix_l) n_pix_l = np.ascontiguousarray(np.cumsum(n_pix_l)) u_pix_s, n_pix_s = np.unique(pix_s, return_counts=True) u_pix_s = np.ascontiguousarray(u_pix_s) n_pix_s = np.ascontiguousarray(np.cumsum(n_pix_s)) table_engine_l = {} table_engine_s = {} table_engine_l['z'] = np.ascontiguousarray( table_l['z'][argsort_pix_l], dtype=np.float64) for f, f_name in zip([np.sin, np.cos], ['sin', 'cos']): for table, argsort_pix, table_engine in zip( [table_l, table_s], [argsort_pix_l, argsort_pix_s], [table_engine_l, table_engine_s]): for angle in ['ra', 'dec']: table_engine[f'{f_name} {angle}'] =\ np.ascontiguousarray(f(np.deg2rad(table[angle]))[ argsort_pix]) for key in ['z', 'z_l_max', 'w', 'e_1', 'e_2', 'm', 'e_rms', 'm_sel', 'R_11', 'R_22', 'R_12', 'R_21']: if key in table_s.colnames: table_engine_s[key] = np.ascontiguousarray( table_s[key][argsort_pix_s], dtype=np.float64) if 'z_bin' in table_s.colnames: table_engine_s['z_bin'] = np.ascontiguousarray( table_s['z_bin'][argsort_pix_s], dtype=int) if 'z_l_max' not in table_engine_s: if table_n is not None: table_engine_s['z_l_max'] = np.ascontiguousarray( np.repeat(np.finfo(np.float64).max, len(table_s)), dtype=np.float64) else: table_engine_s['z_l_max'] = table_engine_s['z'] for table, argsort_pix, table_engine in zip( [table_l, table_s], [argsort_pix_l, argsort_pix_s], [table_engine_l, table_engine_s]): if 'z' in table.colnames: table_engine['d_com'] = np.ascontiguousarray( interpolate_over_redshift( cosmology.comoving_transverse_distance, table_engine['z']).to(u.Mpc).value, dtype=np.float64) if table_n is not None: n_bins = table_n['n'].data.shape[1] z_ave = np.array([ np.average(table_n['z'], weights=table_n['n'][:, i]) for i in range(n_bins)]) table_engine_s['z'] = np.ascontiguousarray( z_ave[table_engine_s['z_bin']], dtype=np.float64) def _inverse_effective_critical_surface_density(z_l, z_bin): return 1.0 / effective_critical_surface_density( z_l, table_n['z'], table_n['n'][:, z_bin], cosmology=cosmology, comoving=comoving) sigma_crit_eff_inv = np.zeros(len(table_l) * n_bins, dtype=np.float64) for z_bin in range(n_bins): sigma_crit_eff_inv[z_bin::n_bins] = interpolate_over_redshift( _inverse_effective_critical_surface_density, table_engine_l['z'], z_bin) with np.errstate(divide='ignore'): sigma_crit_eff = np.where( sigma_crit_eff_inv > 0, 1.0 / sigma_crit_eff_inv, np.finfo(np.float64).max) table_engine_l['sigma_crit_eff'] = np.ascontiguousarray( sigma_crit_eff, dtype=np.float64) # Create arrays that will hold the final results. table_engine_r = {} n_results = len(table_l) * (len(bins) - 1) key_list = ['sum 1', 'sum w_ls', 'sum w_ls e_t', 'sum w_ls z_s', 'sum w_ls e_t sigma_crit', 'sum w_ls sigma_crit'] for table_s_key, table_r_key in zip( ['m', 'e_rms', 'm_sel', 'R_11'], ['sum w_ls m', 'sum w_ls (1 - e_rms^2)', 'sum w_ls m_sel', 'sum w_ls R_T']): if table_s_key in table_s.colnames: key_list.append(table_r_key) for key in key_list: table_engine_r[key] = np.ascontiguousarray( np.zeros(n_results, dtype=( np.int64 if key == 'sum 1' else np.float64))) if not isinstance(bins, u.quantity.Quantity): bins = bins * u.Mpc try: theta_bins = np.tile(bins.to(u.rad).value, len(table_l)) except UnitConversionError: theta_bins = (np.tile(bins.to(u.Mpc).value, len(table_l)) / np.repeat(table_engine_l['d_com'], len(bins))).flatten() if not comoving: theta_bins *= ( 1 + np.repeat(table_engine_l['z'], len(bins))).flatten() dist_3d_sq_bins = np.minimum(4 * np.sin(theta_bins / 2.0)**2, 2.0) # When running in parrallel, replace numpy arrays with shared-memory # multiprocessing arrays. if n_jobs > 1: dist_3d_sq_bins = get_raw_multiprocessing_array(dist_3d_sq_bins) for table_engine in [table_engine_l, table_engine_s, table_engine_r]: for key in table_engine: table_engine[key] = get_raw_multiprocessing_array( table_engine[key]) # Create a queue that holds all the pixels containing lenses. q = queue.Queue() if n_jobs == 1 else mp.Queue() for i in range(len(u_pix_l)): q.put(i) args = (u_pix_l, n_pix_l, u_pix_s, n_pix_s, dist_3d_sq_bins, table_engine_l, table_engine_s, table_engine_r, bins, comoving, weighting, nside, q, progress_bar) if n_jobs == 1: precompute_engine(*args) else: processes = [] for i in range(n_jobs): process = mp.Process(target=precompute_engine, args=(*args, )) if i == 0: args = list(args) args[-1] = False args = tuple(args) process.start() processes.append(process) for i in range(n_jobs): processes[i].join() if n_jobs > 1: for key in table_engine_r: table_engine_r[key] = np.array(table_engine_r[key]) inv_argsort_pix_l = np.argsort(argsort_pix_l) for key in table_engine_r: table_l[key] = table_engine_r[key].reshape( len(table_l), len(bins) - 1)[inv_argsort_pix_l] table_l['sum w_ls z_l'] = table_l['z'][:, np.newaxis] * table_l['sum w_ls'] if table_c is not None: table_c_copy = table_c.copy() if 'z_l_max' not in table_c_copy.colnames: table_c_copy['z_l_max'] = table_c_copy['z'] f_bias = interpolate_over_redshift( photo_z_dilution_factor, table_l['z'], table_c_copy, cosmology, weighting=weighting) for key in ['sum w_ls sigma_crit', 'sum w_ls e_t sigma_crit']: table_l[f'{key} f_bias'] = f_bias[:, np.newaxis] * table_l[key] delta_z_s = interpolate_over_redshift( mean_photo_z_offset, table_l['z'], table_c_copy, cosmology, weighting=weighting) table_l['sum w_ls z_s'] = ( table_l['sum w_ls z_s'] - table_l['sum w_ls'] * delta_z_s[ :, np.newaxis]) table_l.meta['bins'] = bins table_l.meta['comoving'] = comoving table_l.meta['cosmology'] = cosmology.to_format('yaml') table_l.meta['weighting'] = weighting return table_l