Source code for astroquery.alma.utils

"""
Utilities for making finder charts and overlay images for ALMA proposing
"""
import string
import os

import numpy as np

from astropy import wcs
from astroquery import log
from astropy import units as u
from astropy.io import fits
from astroquery.utils.commons import ASTROPY_LT_4_1

from astroquery.skyview import SkyView
from astroquery.alma import Alma


[docs]def pyregion_subset(region, data, mywcs): """ Return a subset of an image (``data``) given a region. Parameters ---------- region : `~pyregion.Shape` A Shape from a pyregion-parsed region file data : np.ndarray An array with shape described by WCS mywcs : `astropy.wcs.WCS` A world coordinate system describing the data """ import pyregion shapelist = pyregion.ShapeList([region]) if shapelist[0].coord_format not in ('physical', 'image'): celhdr = mywcs.sub([wcs.WCSSUB_CELESTIAL]).to_header() pixel_regions = shapelist.as_imagecoord(celhdr) else: # For this to work, we'd need to change the reference pixel after # cropping. Alternatively, we can just make the full-sized # mask... todo.... raise NotImplementedError("Can't use non-celestial coordinates " "with regions.") pixel_regions = shapelist # This is a hack to use mpl to determine the outer bounds of the regions # (but it's a legit hack - pyregion needs a major internal refactor # before we can approach this any other way, I think -AG) mpl_objs = pixel_regions.get_mpl_patches_texts()[0] # Find the minimal enclosing box containing all of the regions # (this will speed up the mask creation below) extent = mpl_objs[0].get_extents() xlo, ylo = extent.min xhi, yhi = extent.max all_extents = [obj.get_extents() for obj in mpl_objs] for ext in all_extents: xlo = int(np.round(xlo if xlo < ext.min[0] else ext.min[0])) ylo = int(np.round(ylo if ylo < ext.min[1] else ext.min[1])) xhi = int(np.round(xhi if xhi > ext.max[0] else ext.max[0])) yhi = int(np.round(yhi if yhi > ext.max[1] else ext.max[1])) log.debug("Region boundaries: ") log.debug("xlo={xlo}, ylo={ylo}, xhi={xhi}, yhi={yhi}".format(xlo=xlo, ylo=ylo, xhi=xhi, yhi=yhi)) subwcs = mywcs[int(ylo):int(yhi), int(xlo):int(xhi)] subhdr = subwcs.sub([wcs.WCSSUB_CELESTIAL]).to_header() subdata = data[int(ylo):int(yhi), int(xlo):int(xhi)] mask = shapelist.get_mask(header=subhdr, shape=subdata.shape) log.debug("Shapes: data={0}, subdata={2}, mask={1}" .format(data.shape, mask.shape, subdata.shape)) return (xlo, xhi, ylo, yhi), mask
[docs]def parse_frequency_support(frequency_support_str): """ ALMA "Frequency Support" strings have the form: [100.63..101.57GHz,488.28kHz, XX YY] U [102.43..103.37GHz,488.28kHz, XX YY] U [112.74..113.68GHz,488.28kHz, XX YY] U [114.45..115.38GHz,488.28kHz, XX YY] at least, as far as we have seen. The "U" is meant to be the Union symbol. This function will parse such a string into a list of pairs of astropy Quantities representing the frequency range. It will ignore the resolution and polarizations. """ if not isinstance(frequency_support_str, str): supports = frequency_support_str.tostring().decode('ascii').split('U') else: supports = frequency_support_str.split('U') freq_ranges = [(float(sup[0]), float(sup[1].split(',')[0].strip(string.ascii_letters))) * u.Unit(sup[1].split(',')[0].strip(string.punctuation + string.digits)) for i in supports for sup in [i.strip('[] ').split('..'), ]] return u.Quantity(freq_ranges)
[docs]def footprint_to_reg(footprint): """ ALMA footprints have the form: 'Polygon ICRS 266.519781 -28.724666 266.524678 -28.731930 266.536683 -28.737784 266.543860 -28.737586 266.549277 -28.733370 266.558133 -28.729545 266.560136 -28.724666 266.558845 -28.719605 266.560133 -28.694332 266.555234 -28.687069 266.543232 -28.681216 266.536058 -28.681414 266.530644 -28.685630 266.521788 -28.689453 266.519784 -28.694332 266.521332 -28.699778' Some of them have *additional* polygons """ if ASTROPY_LT_4_1: footprint = footprint.decode('utf-8') if footprint[:7] != 'Polygon' and footprint[:6] != 'Circle': raise ValueError("Unrecognized footprint type") from pyregion.parser_helper import Shape reglist = [] entries = footprint.split() if entries[0] == 'Circle': reg = Shape('circle', [float(x) for x in entries[1:] if x != 'ICRS']) reg.coord_format = 'icrs' reg.coord_list = reg.params # the coord_list attribute is needed somewhere reg.attr = ([], {'color': 'green', 'dash': '0 ', 'dashlist': '8 3', 'delete': '1 ', 'edit': '1 ', 'fixed': '0 ', 'font': '"helvetica 10 normal roman"', 'highlite': '1 ', 'include': '1 ', 'move': '1 ', 'select': '1', 'source': '1', 'text': '', 'width': '1 '}) reglist.append(reg) else: polygons = [index for index, entry in enumerate(entries) if entry == 'Polygon'] for start, stop in zip(polygons, polygons[1:]+[None]): reg = Shape('polygon', [float(x) for x in entries[start+1:stop] if x != 'ICRS']) reg.coord_format = 'icrs' reg.coord_list = reg.params # the coord_list attribute is needed somewhere reg.attr = ([], {'color': 'green', 'dash': '0 ', 'dashlist': '8 3', 'delete': '1 ', 'edit': '1 ', 'fixed': '0 ', 'font': '"helvetica 10 normal roman"', 'highlite': '1 ', 'include': '1 ', 'move': '1 ', 'select': '1', 'source': '1', 'text': '', 'width': '1 '}) reglist.append(reg) return reglist
[docs]def add_meta_to_reg(reg, meta): reg.meta = meta return reg
[docs]def approximate_primary_beam_sizes(frequency_support_str, dish_diameter=12 * u.m, first_null=1.220): r""" Using parse_frequency_support, determine the mean primary beam size in each observed band Parameters ---------- frequency_support_str : str The frequency support string, see `parse_frequency_support` dish_diameter : `~astropy.units.Quantity` Meter-equivalent unit. The diameter of the dish. first_null : float The position of the first null of an Airy. Used to compute resolution as :math:`R = 1.22 \lambda/D` """ freq_ranges = parse_frequency_support(frequency_support_str) beam_sizes = [(first_null * fr.mean().to(u.m, u.spectral()) / (dish_diameter)).to(u.arcsec, u.dimensionless_angles()) for fr in freq_ranges] return u.Quantity(beam_sizes)
[docs]def make_finder_chart(target, radius, save_prefix, service=SkyView.get_images, service_kwargs={'survey': ['2MASS-K'], 'pixels': 500}, alma_kwargs={'public': False, 'science': False}, **kwargs): """ Create a "finder chart" showing where ALMA has pointed in various bands, including different color coding for public/private data and each band. Contours are set at various integration times. Parameters ---------- target : `astropy.coordinates` or str A legitimate target name radius : `~astropy.units.Quantity` A degree-equivalent radius. save_prefix : str The prefix for the output files. Both .reg and .png files will be written. The .reg files will have the band numbers and public/private appended, while the .png file will be named prefix_almafinderchart.png service : function The ``get_images`` function of an astroquery service, e.g. SkyView. service_kwargs : dict The keyword arguments to pass to the specified service. For example, for SkyView, you can give it the survey ID (e.g., 2MASS-K) and the number of pixels in the resulting image. See the documentation for the individual services for more details. alma_kwargs : dict Keywords to pass to the ALMA archive when querying. private_band_colors / public_band_colors : tuple A tuple or list of colors to be associated with private/public observations in the various bands integration_time_contour_levels : list or np.array The levels at which to draw contours in units of seconds. Default is log-spaced (2^n) seconds: [ 1., 2., 4., 8., 16., 32.]) """ log.info("Querying {0} for images".format(service)) images = service(target, radius=radius, **service_kwargs) image0_hdu = images[0][0] return make_finder_chart_from_image(image0_hdu, target=target, radius=radius, save_prefix=save_prefix, alma_kwargs=alma_kwargs, **kwargs)
[docs]def make_finder_chart_from_image(image, target, radius, save_prefix, alma_kwargs={'public': False, 'science': False, 'cache': False, }, **kwargs): """ Create a "finder chart" showing where ALMA has pointed in various bands, including different color coding for public/private data and each band. Contours are set at various integration times. Parameters ---------- image : fits.PrimaryHDU or fits.ImageHDU object The image to overlay onto target : `astropy.coordinates` or str A legitimate target name radius : `astropy.units.Quantity` A degree-equivalent radius save_prefix : str The prefix for the output files. Both .reg and .png files will be written. The .reg files will have the band numbers and public/private appended, while the .png file will be named prefix_almafinderchart.png alma_kwargs : dict Keywords to pass to the ALMA archive when querying. private_band_colors / public_band_colors : tuple A tuple or list of colors to be associated with private/public observations in the various bands integration_time_contour_levels : list or np.array The levels at which to draw contours in units of seconds. Default is log-spaced (2^n) seconds: [ 1., 2., 4., 8., 16., 32.]) """ log.info("Querying ALMA around {0}".format(target)) catalog = Alma.query_region(coordinate=target, radius=radius, get_html_version=True, **alma_kwargs) return make_finder_chart_from_image_and_catalog(image, catalog=catalog, save_prefix=save_prefix, **kwargs)
[docs]def make_finder_chart_from_image_and_catalog(image, catalog, save_prefix, alma_kwargs={'public': False, 'science': False}, bands=(3, 4, 5, 6, 7, 8, 9, 10), private_band_colors=('maroon', 'red', 'orange', 'coral', 'brown', 'yellow', 'mediumorchid', 'palegoldenrod', ), public_band_colors=('blue', 'cyan', 'green', 'turquoise', 'teal', 'darkslategrey', 'chartreuse', 'lime', ), integration_time_contour_levels=np.logspace(0, 5, base=2, num=6), save_masks=False, use_saved_masks=False, linewidth=1, ): """ Create a "finder chart" showing where ALMA has pointed in various bands, including different color coding for public/private data and each band. Contours are set at various integration times. Parameters ---------- image : fits.PrimaryHDU or fits.ImageHDU object The image to overlay onto catalog : astropy.Table object The catalog of ALMA observations save_prefix : str The prefix for the output files. Both .reg and .png files will be written. The .reg files will have the band numbers and public/private appended, while the .png file will be named prefix_almafinderchart.png alma_kwargs : dict Keywords to pass to the ALMA archive when querying. private_band_colors / public_band_colors : tuple A tuple or list of colors to be associated with private/public observations in the various bands integration_time_contour_levels : list or np.array The levels at which to draw contours in units of seconds. Default is log-spaced (2^n) seconds: [ 1., 2., 4., 8., 16., 32.]) """ import aplpy import pyregion all_bands = bands bands = used_bands = [band for band in np.unique(catalog['band_list'])] log.info("The bands used include: {0}".format(used_bands)) band_colors_priv = dict(zip(all_bands, private_band_colors)) band_colors_pub = dict(zip(all_bands, public_band_colors)) log.info("Color map private: {0}".format(band_colors_priv)) log.info("Color map public: {0}".format(band_colors_pub)) if use_saved_masks: hit_mask_public = {} hit_mask_private = {} for band in bands: pubfile = '{0}_band{1}_public.fits'.format(save_prefix, band) if os.path.exists(pubfile): hit_mask_public[band] = fits.getdata(pubfile) privfile = '{0}_band{1}_private.fits'.format(save_prefix, band) if os.path.exists(privfile): hit_mask_private[band] = fits.getdata(privfile) else: today = np.datetime64('today') # At least temporarily obsolete # private_circle_parameters = { # band: [(row['RA'], row['Dec'], np.mean(rad).to(u.deg).value) # for row, rad in zip(catalog, primary_beam_radii) # if not row['Release date'] or # (np.datetime64(row['Release date']) > today and row['Band'] == band)] # for band in bands} # public_circle_parameters = { # band: [(row['RA'], row['Dec'], np.mean(rad).to(u.deg).value) # for row, rad in zip(catalog, primary_beam_radii) # if row['Release date'] and # (np.datetime64(row['Release date']) <= today and row['Band'] == band)] # for band in bands} # unique_private_circle_parameters = { # band: np.array(list(set(private_circle_parameters[band]))) # for band in bands} # unique_public_circle_parameters = { # band: np.array(list(set(public_circle_parameters[band]))) # for band in bands} release_dates = np.array(catalog['obs_release_date'], dtype=np.datetime64) for band in bands: log.info("BAND {0}".format(band)) privrows = sum((catalog['band_list'] == band) & (release_dates > today)) pubrows = sum((catalog['band_list'] == band) & (release_dates <= today)) log.info("PUBLIC: Number of rows: {0}".format(pubrows,)) log.info("PRIVATE: Number of rows: {0}.".format(privrows)) log.debug('Creating regions') prv_regions = { band: pyregion.ShapeList([add_meta_to_reg(fp, {'integration': row['t_exptime']}) for row in catalog for fp in footprint_to_reg(row['s_region']) if (not row['obs_release_date']) or (np.datetime64(row['obs_release_date']) > today and row['band_list'] == band)]) for band in bands} pub_regions = { band: pyregion.ShapeList([add_meta_to_reg(fp, {'integration': row['t_exptime']}) for row in catalog for fp in footprint_to_reg(row['s_region']) if row['obs_release_date'] and (np.datetime64(row['obs_release_date']) <= today and row['band_list'] == band)]) for band in bands} log.debug('Creating masks') prv_mask = { band: fits.PrimaryHDU( prv_regions[band].get_mask(image).astype('int'), header=image.header) for band in bands if prv_regions[band]} pub_mask = { band: fits.PrimaryHDU( pub_regions[band].get_mask(image).astype('int'), header=image.header) for band in bands if pub_regions[band]} hit_mask_public = {band: np.zeros_like(image.data) for band in pub_mask} hit_mask_private = {band: np.zeros_like(image.data) for band in prv_mask} mywcs = wcs.WCS(image.header) for band in bands: log.debug('Adding integration-scaled masks for Band: {0}'.format(band)) shapes = prv_regions[band] for shape in shapes: # private: release_date = 'sometime' says when it will be released (xlo, xhi, ylo, yhi), mask = pyregion_subset( shape, hit_mask_private[band], mywcs) log.debug("{0},{1},{2},{3}: {4}" .format(xlo, xhi, ylo, yhi, mask.sum())) hit_mask_private[band][ylo:yhi, xlo:xhi] += shape.meta['integration']*mask if save_masks: shapes.write('{0}_band{1}_private.reg'.format(save_prefix, band)) shapes = pub_regions[band] for shape in shapes: # public: release_date = '' should mean already released (xlo, xhi, ylo, yhi), mask = pyregion_subset( shape, hit_mask_public[band], mywcs) log.debug("{0},{1},{2},{3}: {4}" .format(xlo, xhi, ylo, yhi, mask.sum())) hit_mask_public[band][ylo:yhi, xlo:xhi] += shape.meta['integration']*mask if save_masks: shapes.write('{0}_band{1}_public.reg'.format(save_prefix, band)) if save_masks: for band in bands: if band in hit_mask_public: if hit_mask_public[band].any(): hdu = fits.PrimaryHDU(data=hit_mask_public[band], header=image.header) hdu.writeto('{0}_band{1}_public.fits'.format(save_prefix, band), clobber=True) if band in hit_mask_private: if hit_mask_private[band].any(): hdu = fits.PrimaryHDU(data=hit_mask_private[band], header=image.header) hdu.writeto('{0}_band{1}_private.fits'.format(save_prefix, band), clobber=True) fig = aplpy.FITSFigure(fits.HDUList(image), convention='calabretta') fig.show_grayscale(stretch='arcsinh', vmid=np.nanmedian(image.data)) for band in bands: if band in hit_mask_public: if hit_mask_public[band].any(): fig.show_contour(fits.PrimaryHDU(data=hit_mask_public[band], header=image.header), levels=integration_time_contour_levels, colors=[band_colors_pub[int(band)]] * len(integration_time_contour_levels), linewidth=linewidth, convention='calabretta') if band in hit_mask_private: if hit_mask_private[band].any(): fig.show_contour(fits.PrimaryHDU(data=hit_mask_private[band], header=image.header), levels=integration_time_contour_levels, colors=[band_colors_priv[int(band)]] * len(integration_time_contour_levels), linewidth=linewidth, convention='calabretta') fig.save('{0}_almafinderchart.png'.format(save_prefix)) return image, catalog, hit_mask_public, hit_mask_private