Source code for hera_stats.split

import numpy as np
import hera_pspec as hp
from pyuvdata import UVData
import copy
import astropy.coordinates as coords
from . import utils


def _stripe_array(x, stripes, width=1):
    """
    Split 1D array into a number of stripes of given width.
    
    Parameters
    ----------
    x : array_like
        Input array of values to be striped.
    
    stripes : int
        Number of stripes.
    
    width : int, optional
        Width of each stripe. Default: 1.
    
    Returns
    -------
    x_stripes : list of array_like
        List containing each stripe of the input array.
    """
    blocks = []
    # Loop over no. of stripes
    for i in range(stripes): 
        x_stripe = []
        for j in range(width): # loop over width of stripe 
            x_stripe = np.concatenate((x_stripe, 
                                       x[i*(width)+j::stripes*width]))
        blocks.append( np.sort(x_stripe) )
    return blocks


[docs]def lst_blocks(uvp, blocks=2, lst_range=(0., 2.*np.pi)): """ Split a UVPSpec object into multiple objects, each containing spectra within different contiguous LST ranges. There is no guarantee that each block will contain the same number of spectra or samples. N.B. This function uses the `lst_avg_array` property of an input UVPSpec object to split the LSTs (and not the LSTs of the individual visibilities that went into creating each delay spectrum). Parameters ---------- uvp : UVPSpec Object containing delay spectra. blocks : int, optional How many blocks to return. Default: 2. lst_range : tuple, optional Tuple containing the minimum and maximum LST to retain. This is the range that will be split up into blocks. Default: (0., 2*pi) Returns ------- uvp_list : list of UVPSpec List of UVPSpec objects, one for each LST range. Empty blocks will appear as None in the list. lst_bins : array_like Array of LST bin edges. This has dimension (blocks+1,). """ # Check validity of inputs if not isinstance(uvp, hp.UVPSpec): raise TypeError("uvp must be a single UVPSpec object.") if not (lst_range[0] >= 0. and lst_range[1] <= 2.*np.pi): raise ValueError("lst_range must be in the interval (0, 2*pi)") if not isinstance(blocks, (int, np.int, np.integer)): raise TypeError("'blocks' must be an integer") if not blocks > 0: raise ValueError("Must have blocks >= 1") # Get LSTs lsts = np.unique(uvp.lst_avg_array) # Define bin edges lst_bins = np.linspace(lst_range[0], lst_range[1], blocks+1) # Loop over bins and select() the LST ranges required uvp_list = [] for i in range(lst_bins.size - 1): idxs = np.where( np.logical_and(lsts >= lst_bins[i], lsts < lst_bins[i+1]) )[0] _uvp = None if idxs.size > 0: # Select LSTs in this range _uvp = uvp.select(lsts=lsts[idxs], inplace=False) uvp_list.append(_uvp) return uvp_list, lst_bins
[docs]def lst_stripes(uvp, stripes=2, width=1, lst_range=(0., 2.*np.pi)): """ Split a UVPSpec object into multiple UVPSpec objects, each containing spectra within alternating stripes of LST. N.B. Gaps in LST are ignored; this function stripes based on the ordered list of available LSTs in `uvp` only. N.B. This function uses the `lst_avg_array` property of the input UVPSpec object to split the LSTs (and not the LSTs of the individual visibilities that went into creating each delay spectrum). Parameters ---------- uvp : UVPSpec Object containing delay spectra. stripes : int, optional How many stripes to return. Default: 2. width : int, optional Width of each stripe, in number of LST bins. Default: 1. lst_range : tuple, optional Tuple containing the minimum and maximum LST to retain. This is the range that will be split up into blocks. Default: (0., 2*pi) Returns ------- uvp_list : list of UVPSpec List of UVPSpec objects, one for each LST range. """ # Check validity of inputs if not isinstance(uvp, hp.UVPSpec): raise TypeError("uvp must be a single UVPSpec object.") if not (lst_range[0] >= 0. and lst_range[1] <= 2.*np.pi): raise ValueError("lst_range must be in the interval (0, 2*pi)") if not isinstance(stripes, (int, np.int, np.integer)): raise TypeError("'stripes' must be an integer") assert stripes > 0, "Must have stripes >= 1" assert width > 0, "Must have width >= 1" # Get sorted list of LSTs within the specified range lsts = np.unique(uvp.lst_avg_array) idxs = np.where( np.logical_and(lsts >= lst_range[0], lsts < lst_range[1]) ) lsts = np.sort(lsts[idxs]) # Loop over bins and select() the LST ranges required uvp_list = [] lst_stripes = _stripe_array(lsts, stripes=stripes, width=width) for lstripe in lst_stripes: _uvp = uvp.select(lsts=lstripe, inplace=False) uvp_list.append(_uvp) return uvp_list
[docs]def hour_angle(uvp, bins_list, specify_bins=False, bls=None): """ Splits UVPSpec based on the galactic hour-angle at the time of measurement. Parameters ---------- uvp: list or UVPSpec List or single hera_pspec.UVPSpec object, containing data to use. bins_list: list One entry for each bin layout, default is that it must be an integer, where min and max values for hourangle values will automatically be set as limits. If specify_bins is True, then the inpud must be a list of ndarrays. specify_bins: boolean If true, allows bins_list to be specified as a list of the bins themselves. Default: False bls: list of tuples, optional The baselines to use in in omitting antenna. If None, uses all baselines. Default: None. Returns ------- uvpl: list of UVPSpec pairs The resulting data, one list per jackknife. """ uvp = copy.deepcopy(uvp) if isinstance(uvp, list): if len(uvp) > 1: uvp = hp.uvpspec.combine_uvpspec(uvp) else: uvp = uvp[0] assert isinstance(uvp, hp.UVPSpec), \ "Expected uvp to be list or UVPSpec, not {}".format(type(uvp).__name__) if specify_bins: assert np.asarray(bins_list).ndim == 2, \ "Expected bins to be a list of lists." else: assert np.asarray(bins_list).ndim == 1, \ "Expected bins to be a list of antenna numbers." if bls is not None: uvp.select(bls=bls, inplace=True) # Create reference lst -> time_avg dictionary (for sorting) ref = dict(list(zip(uvp.lst_avg_array, uvp.time_avg_array))) rads = np.unique(uvp.lst_avg_array) # Get telescope location information R = np.sqrt(sum(uvp.telescope_location**2)) lat = np.arcsin(uvp.telescope_location[2]/R) * 180. / np.pi # Convert lst to gha norms = coords.SkyCoord(rads, lat, unit=["rad", "deg"]) gha = norms.transform_to(coords.builtin_frames.Galactic) uvpl = [] for bins in bins_list: # If bins is an integer, use bin wrapping function if isinstance(bins, int): bins = utils.bin_wrap(gha.l.deg, bins) inrange = [] for i in range(len(bins) - 1): # Check if gha_range = (bins[i], bins[i + 1]) val = np.array([utils.is_in_wrap(bins[i], bins[i + 1], deg) for deg in gha.l.deg]) jdays = [ref[rads[i]] for i in range(len(rads)) if val[i] == True] if sum(jdays) == 0: raise AttributeError("No times found in one or more of the " "bins specified.") _uvp = uvp.select(times=jdays, inplace=False) # Set metadata angs = gha.l.deg[val] angs_180 = (angs + 180) % 360 if np.std(angs) <= np.std(angs_180): _uvp.labels = np.array([np.average(angs)]) else: _uvp.labels = np.array([(np.average(angs_180) - 180) % 360]) inrange.append(_uvp) uvpl.append(inrange) return uvpl
#*********#
[docs]def omit_ants(uvp, ant_nums=None, bls=None): """ Splits UVPSpecs into groups, omitting one antenna from each. uvp: UVPSpec or list Single UVPSpec or list of UVPSpecs to use in splitting. ant_nums: list, optional A list containing integers, each entry will generate one UVPSpec which does not contain the antenna specified. bls: list of tuples, optional The baselines to use in in omitting antenna. If None, uses all baselines. Default: None. Returns ------- uvp_list: list of UVPSpecs A list containing one list of UVPSpecs, with one for every ant_num specified. """ uvp = copy.deepcopy(uvp) # Check if uvp is individual UVPSpec. If not, combine list. if isinstance(uvp, (list, tuple, np.ndarray)): if len(uvp) != 1: uvp = hp.uvpspec.combine_uvpspec(uvp) else: uvp = uvp[0] # Set up ant_nums if isinstance(ant_nums, (list, tuple, np.ndarray)): ant_nums = list(ant_nums) elif isinstance(ant_nums, int): ant_nums = [ant_nums] elif ant_nums == None: ant_nums = np.unique(uvp.get_blpairs()) else: raise AssertionError("Expected ant_nums to be list or int, not {}".format(type(ant_nums).__name__)) assert isinstance(uvp, hp.UVPSpec), \ "Expected uvp to be hera_pspec.UVPSpec, not {}".format(type(uvp).__name__) if bls is None: # Get all baseline pairs blpairs = uvp.get_blpairs() # Find unique baselines bls = [] [[bls.append(b) for b in blp if b not in bls] for blp in blpairs] unique_ants = np.unique(bls) bl_list = [] for ant in ant_nums: if ant not in unique_ants: raise AttributeError("No data for antenna {} found.".format(ant)) valid = [bl for bl in bls if ant not in bl] bl_list.append(valid) minlen = min([len(bll) for bll in bl_list]) uvp_list = [] for i, bl in enumerate(bl_list): inds = np.random.choice(list(range(len(bl))), minlen, replace=False) bl_i = np.array(list(map(uvp.antnums_to_bl, bl)))[inds] uvp1 = uvp.select(bls=bl_i, inplace=False) uvp1.labels = np.array([ant_nums[i]]) uvp1.jktype = "omit_ants" uvp_list.append(uvp1) return [uvp_list]
[docs]def blps_by_antnum(blps, split='norepeat'): """ Split a list of redundant groups of baseline-pairs into two, depending on whether there are repeated antennas in the baseline pair (`norepeat`), or whether there are auto baselines in the pair (`noautos`). Parameters ---------- blps : list of list of blpairs List of redundant groups of baseline-pairs. The blps can be either tuples of tuples, or blpair integers. split : str, optional Type of split to perform on each baseline group. Available options are: - 'norepeat': Split into one group where antennas are used at most once per blpair, and another where they are used more than once. - 'noautos': Split into one group with auto-blpairs and one group with non-autos (but antennas can be used more than once per blpair). Returns ------- blps_a, blps_b : list of list of blpairs List of redundant groups of baseline-pairs. - For 'norepeat', group A contains the blps *without* repeated antennas, while group B contains the blps *with* repeated antennas. - For 'noautos', group A contains the blps *without* auto-baselines, while group B contains the blps *with* auto-baselines. """ # Check that input blps is list of lists if not isinstance(blps[0], list): raise TypeError("blps must be a list of lists of baseline-pairs") # Check for valid split type if split not in ['norepeat', 'noautos']: raise ValueError("Split type '%s' not recognized." % split) # Loop over redundant groups blps_a, blps_b = [], [] for grp in blps: # Loop over baseline-pairs grp_a, grp_b = [], [] for blp in grp: # Convert into antnum pairs if not already if isinstance(blp, int): blp = hp.uvputils._blpair_to_antnums(blp) # Split according to whether antenna numbers are repeated if split == 'norepeat': # Split out if np.unique(blp).size == 4: grp_a.append(blp) # without repeated antennas else: grp_b.append(blp) # with repeated antennas elif split == 'noautos': # Split out blpairs being multiplied by themselves if sorted(blp[0]) == sorted(blp[1]): grp_b.append(blp) # with repeated baselines else: grp_a.append(blp) # without repeated baselines blps_a.append(grp_a) blps_b.append(grp_b) return blps_a, blps_b