Source code for hera_stats.shuffle

import numpy as np
from pyuvdata import UVData
import copy

[docs]def shuffle_data_redgrp(uvd, redgrps): """ Construct a new set of visibilities by randomly shuffling samples between baselines in a redundant group. Different random shuffles are performed at each frequency, time, and polarization. This creates a new set of visibilities that are made up of samples from a random combination of redundant baselines, but that do not mix or shuffle times, frequencies or polarizations. The samples are shuffled _without_ replacement, so each sample is only ever used once. Example: Original visibility (fixed time and pol, for freq. channels 1, 2, 3...): bl_a = [a1, a2, a3, a4, ...] bl_b = [b1, b2, b3, b4, ...] bl_c = [c1, c2, c3, c4, ...] Shuffled visibility (for example): new_a = [b1, a2, c3, b4, ...] new_b = [c1, c2, a3, a4, ...] new_c = [a1, b2, b3, c4, ...] Parameters ---------- uvd : UVData object Input visibilities. redgrps : list of lists of bls List of redundant baseline groups. Returns ------- uvd_new : UVData object Copy of `uvd` with baseline-shuffled visibilities. """ # Check validity of inputs if not isinstance(redgrps, list) \ or not isinstance(redgrps[0], (list, np.ndarray)): raise TypeError("redgrps must be a list of lists of baseline identifiers.") assert isinstance(uvd, UVData), "uvd must be a UVData object." # Make a copy of the UVPSpec object uvd_new = copy.deepcopy(uvd) # Get data types of arrays dtype = uvd.data_array.dtype # data array ftype = uvd.flag_array.dtype # flag array ntype = uvd.nsample_array.dtype # nsample array # Shuffle baselines in each redgrp for grp in redgrps: # Create array to store ushuffled data dshape = (uvd.Ntimes, len(grp), uvd.Nfreqs, uvd.Npols) orig_data = np.zeros(dshape, dtype=dtype) orig_flag = np.zeros(dshape, dtype=ftype) orig_nsamp = np.zeros(dshape, dtype=ntype) # Get data for each baseline in the group for b, key in enumerate(grp): orig_data[:,b,:,:] = uvd.get_data(key, squeeze='none')[:,0,:,:] orig_flag[:,b,:,:] = uvd.get_flags(key, squeeze='none')[:,0,:,:] orig_nsamp[:,b,:,:] = uvd.get_nsamples(key, squeeze='none')[:,0,:,:] # Create array to store shuffled data shuf_data = np.zeros(dshape, dtype=dtype) shuf_flag = np.zeros(dshape, dtype=ftype) shuf_nsamp = np.zeros(dshape, dtype=ntype) # Loop over times and freqs for p in range(uvd.Npols): for t in range(uvd.Ntimes): for i in range(uvd.Nfreqs): # Randomly shuffle data between bls in redundant group # (for each time, frequency, and polarization) idxs = np.random.permutation(np.arange(len(grp))) shuf_data[t,:,i,p] = orig_data[t,idxs,i,p] shuf_flag[t,:,i,p] = orig_flag[t,idxs,i,p] shuf_nsamp[t,:,i,p] = orig_nsamp[t,idxs,i,p] # Put shuffled data into new UVData object with the same shape etc. for k in range(len(grp)): # Get idxs for each bl in the group bl = grp[k] if not isinstance(bl, tuple): bl = uvd.baseline_to_antnums(bl) ant1, ant2 = bl idxs = uvd.antpair2ind(ant1, ant2) uvd_new.data_array[idxs,0,:,:] = shuf_data[:,k,:,:] uvd_new.flag_array[idxs,0,:,:] = shuf_flag[:,k,:,:] uvd_new.nsample_array[idxs,0,:,:] = shuf_nsamp[:,k,:,:] return uvd_new