# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
MAST Tesscut
============
Cutout queries on TESS FFIs.
"""
import warnings
import time
import json
import zipfile
import os
import requests
from io import BytesIO
import numpy as np
import astropy.units as u
from astropy.coordinates import Angle
from astropy.table import Table
from astropy.io import fits
from ..query import BaseQuery
from ..utils import commons
from ..exceptions import InputWarning, NoResultsWarning, InvalidQueryError, RemoteServiceError
from . import conf
from .utils import parse_input_location
from .core import MastQueryWithLogin
__all__ = ["TesscutClass", "Tesscut", "ZcutClass", "Zcut"]
def _parse_cutout_size(size):
"""
Take a user input cutout size and parse it into the regular format
[ny,nx] where nx/ny are quantities with units either pixels or degrees.
Parameters
----------
size : int, array-like, `~astropy.units.Quantity`
The size of the cutout array. If ``size`` is a scalar number or
a scalar `~astropy.units.Quantity`, then a square cutout of ``size``
will be created. If ``size`` has two elements, they should be in
``(ny, nx)`` order. Scalar numbers in ``size`` are assumed to be in
units of pixels. `~astropy.units.Quantity` objects must be in pixel or
angular units.
Returns
-------
response : array
Size array in the form [ny, nx] where nx/ny are quantities with units
either pixels or degrees.
"""
# Making size into an array [ny, nx]
if np.isscalar(size):
size = np.repeat(size, 2)
if isinstance(size, u.Quantity):
size = np.atleast_1d(size)
if len(size) == 1:
size = np.repeat(size, 2)
if len(size) > 2:
warnings.warn("Too many dimensions in cutout size, only the first two will be used.",
InputWarning)
# Getting x and y out of the size
if np.isscalar(size[0]):
x = size[1]
y = size[0]
units = "px"
elif size[0].unit == u.pixel:
x = size[1].value
y = size[0].value
units = "px"
elif size[0].unit.physical_type == 'angle':
x = size[1].to(u.deg).value
y = size[0].to(u.deg).value
units = "d"
else:
raise InvalidQueryError("Cutout size must be in pixels or angular quantity.")
return {"x": x, "y": y, "units": units}
[docs]class TesscutClass(MastQueryWithLogin):
"""
MAST TESS FFI cutout query class.
Class for accessing TESS full-frame image cutouts.
"""
def __init__(self):
super().__init__()
services = {"sector": {"path": "sector"},
"astrocut": {"path": "astrocut"}}
self._service_api_connection.set_service_params(services, "tesscut")
[docs] def get_sectors(self, coordinates=None, radius=0*u.deg, objectname=None):
"""
Get a list of the TESS data sectors whose footprints intersect
with the given search area.
Parameters
----------
coordinates : str or `astropy.coordinates` object, optional
The target around which to search. It may be specified as a
string or as the appropriate `astropy.coordinates` object.
One and only one of coordinates and objectname must be supplied.
radius : str, float, or `~astropy.units.Quantity` object, optional
Default 0 degrees.
If supplied as a float degrees is the assumed unit.
The string must be parsable by `~astropy.coordinates.Angle`. The
appropriate `~astropy.units.Quantity` object from
`astropy.units` may also be used.
objectname : str, optional
The target around which to search, by name (objectname="M104")
or TIC ID (objectname="TIC 141914082").
One and only one of coordinates and objectname must be supplied.
Returns
-------
response : `~astropy.table.Table`
Sector/camera/chip information for given coordinates/raduis.
"""
# Get Skycoord object for coordinates/object
coordinates = parse_input_location(coordinates, objectname)
# If radius is just a number we assume degrees
radius = Angle(radius, u.deg)
params = {"ra": coordinates.ra.deg,
"dec": coordinates.dec.deg,
"radius": radius.deg}
response = self._service_api_connection.service_request_async("sector", params)
response.raise_for_status() # Raise any errors
sector_json = response.json()['results']
sector_dict = {'sectorName': [],
'sector': [],
'camera': [],
'ccd': []}
for entry in sector_json:
sector_dict['sectorName'].append(entry['sectorName'])
sector_dict['sector'].append(int(entry['sector']))
sector_dict['camera'].append(int(entry['camera']))
sector_dict['ccd'].append(int(entry['ccd']))
if not len(sector_json):
warnings.warn("Coordinates are not in any TESS sector.", NoResultsWarning)
return Table(sector_dict)
[docs] def download_cutouts(self, coordinates=None, size=5, sector=None, path=".", inflate=True, objectname=None):
"""
Download cutout target pixel file(s) around the given coordinates with indicated size.
Parameters
----------
coordinates : str or `astropy.coordinates` object, optional
The target around which to search. It may be specified as a
string or as the appropriate `astropy.coordinates` object.
One and only one of coordinates and objectname must be supplied.
size : int, array-like, `~astropy.units.Quantity`
Optional, default 5 pixels.
The size of the cutout array. If ``size`` is a scalar number or
a scalar `~astropy.units.Quantity`, then a square cutout of ``size``
will be created. If ``size`` has two elements, they should be in
``(ny, nx)`` order. Scalar numbers in ``size`` are assumed to be in
units of pixels. `~astropy.units.Quantity` objects must be in pixel or
angular units.
sector : int
Optional.
The TESS sector to return the cutout from. If not supplied, cutouts
from all available sectors on which the coordinate appears will be returned.
path : str
Optional.
The directory in which the cutouts will be saved.
Defaults to current directory.
inflate : bool
Optional, default True.
Cutout target pixel files are returned from the server in a zip file,
by default they will be inflated and the zip will be removed.
Set inflate to false to stop before the inflate step.
objectname : str, optional
The target around which to search, by name (objectname="M104")
or TIC ID (objectname="TIC 141914082").
One and only one of coordinates and objectname must be supplied.
Returns
-------
response : `~astropy.table.Table`
"""
# Get Skycoord object for coordinates/object
coordinates = parse_input_location(coordinates, objectname)
size_dict = _parse_cutout_size(size)
path = os.path.join(path, '')
astrocut_request = "ra={}&dec={}&y={}&x={}&units={}".format(coordinates.ra.deg,
coordinates.dec.deg,
size_dict["y"],
size_dict["x"],
size_dict["units"])
if sector:
astrocut_request += "§or={}".format(sector)
astrocut_url = self._service_api_connection.REQUEST_URL + "astrocut?" + astrocut_request
zipfile_path = "{}tesscut_{}.zip".format(path, time.strftime("%Y%m%d%H%M%S"))
self._download_file(astrocut_url, zipfile_path)
localpath_table = Table(names=["Local Path"], dtype=[str])
# Checking if we got a zip file or a json no results message
if not zipfile.is_zipfile(zipfile_path):
with open(zipfile_path, 'r') as FLE:
response = json.load(FLE)
warnings.warn(response['msg'], NoResultsWarning)
return localpath_table
if not inflate: # not unzipping
localpath_table['Local Path'] = [zipfile_path]
return localpath_table
print("Inflating...")
# unzipping the zipfile
zip_ref = zipfile.ZipFile(zipfile_path, 'r')
cutout_files = zip_ref.namelist()
zip_ref.extractall(path, members=cutout_files)
zip_ref.close()
os.remove(zipfile_path)
localpath_table['Local Path'] = [path+x for x in cutout_files]
return localpath_table
[docs] def get_cutouts(self, coordinates=None, size=5, sector=None, objectname=None):
"""
Get cutout target pixel file(s) around the given coordinates with indicated size,
and return them as a list of `~astropy.io.fits.HDUList` objects.
Parameters
----------
coordinates : str or `astropy.coordinates` object, optional
The target around which to search. It may be specified as a
string or as the appropriate `astropy.coordinates` object.
One and only one of coordinates and objectname must be supplied.
size : int, array-like, `~astropy.units.Quantity`
Optional, default 5 pixels.
The size of the cutout array. If ``size`` is a scalar number or
a scalar `~astropy.units.Quantity`, then a square cutout of ``size``
will be created. If ``size`` has two elements, they should be in
``(ny, nx)`` order. Scalar numbers in ``size`` are assumed to be in
units of pixels. `~astropy.units.Quantity` objects must be in pixel or
angular units.
sector : int
Optional.
The TESS sector to return the cutout from. If not supplied, cutouts
from all available sectors on which the coordinate appears will be returned.
objectname : str, optional
The target around which to search, by name (objectname="M104")
or TIC ID (objectname="TIC 141914082").
One and only one of coordinates and objectname must be supplied.
Returns
-------
response : A list of `~astropy.io.fits.HDUList` objects.
"""
# Get Skycoord object for coordinates/object
coordinates = parse_input_location(coordinates, objectname)
param_dict = _parse_cutout_size(size)
param_dict["ra"] = coordinates.ra.deg
param_dict["dec"] = coordinates.dec.deg
if sector:
param_dict["sector"] = sector
response = self._service_api_connection.service_request_async("astrocut", param_dict)
response.raise_for_status() # Raise any errors
try:
ZIPFILE = zipfile.ZipFile(BytesIO(response.content), 'r')
except zipfile.BadZipFile:
message = response.json()
warnings.warn(message['msg'], NoResultsWarning)
return []
# Open all the contained fits files:
# Since we cannot seek on a compressed zip file,
# we have to read the data, wrap it in another BytesIO object,
# and then open that using fits.open
cutout_hdus_list = []
for name in ZIPFILE.namelist():
CUTOUT = BytesIO(ZIPFILE.open(name).read())
cutout_hdus_list.append(fits.open(CUTOUT))
# preserve the original filename in the fits object
cutout_hdus_list[-1].filename = name
return cutout_hdus_list
Tesscut = TesscutClass()
[docs]class ZcutClass(MastQueryWithLogin):
"""
MAST ZCUT cutout query class.
Class for accessing deep field full-frame image cutouts.
"""
def __init__(self):
super().__init__()
self.accepted_img_params = ['stretch', 'minmax_percent', 'minmax_value', 'invert']
services = {"survey": {"path": "survey"},
"astrocut": {"path": "astrocut"}}
self._service_api_connection.set_service_params(services, "zcut")
[docs] def get_surveys(self, coordinates, radius="0d"):
"""
Gives a list of deep field surveys available for a position in the sky
Parameters
----------
coordinates : str or `astropy.coordinates` object
The target around which to search. It may be specified as a
string or as the appropriate `astropy.coordinates` object.
radius : str, float, or `~astropy.units.Quantity` object, optional
Default 0 degrees.
If supplied as a float, degrees is the assumed unit.
The string must be parsable by `~astropy.coordinates.Angle`. The
appropriate `~astropy.units.Quantity` object from
`astropy.units` may also be used.
Returns
-------
response : list
List of available deep field surveys at the given coordinates.
"""
# Get Skycoord object for coordinates/object
coordinates = parse_input_location(coordinates)
radius = Angle(radius, u.deg)
params = {"ra": coordinates.ra.deg,
"dec": coordinates.dec.deg,
"radius": radius.deg}
response = self._service_api_connection.service_request_async("survey", params)
response.raise_for_status() # Raise any errors
survey_json = response.json()['surveys']
if not len(survey_json):
warnings.warn("Coordinates are not in an available deep field survey.", NoResultsWarning)
return survey_json
[docs] def download_cutouts(self, coordinates, size=5, survey=None, cutout_format="fits", path=".", inflate=True, **img_params):
"""
Download cutout FITS/image file(s) around the given coordinates with indicated size.
Parameters
----------
coordinates : str or `astropy.coordinates` object
The target around which to search. It may be specified as a
string or as the appropriate `astropy.coordinates` object.
size : int, array-like, `~astropy.units.Quantity`
Optional, default 5 pixels.
The size of the cutout array. If ``size`` is a scalar number or
a scalar `~astropy.units.Quantity`, then a square cutout of ``size``
will be created. If ``size`` has two elements, they should be in
``(ny, nx)`` order. Scalar numbers in ``size`` are assumed to be in
units of pixels. `~astropy.units.Quantity` objects must be in pixel or
angular units.
survey : str
Optional
The survey to restrict the cutout. The survey parameter will restrict to
only the matching survey. Default behavior is to return all matched surveys.
cutout_format : str
Optional
The cutout file format. Default is fits, valid options are fits, jpg, png.
path : str
Optional.
The directory in which the cutouts will be saved.
Defaults to current directory.
inflate : bool
Optional, default True.
Cutout target pixel files are returned from the server in a zip file,
by default they will be inflated and the zip will be removed.
Set inflate to false to stop before the inflate step.
**img_params : dict
Optional, only used if format is jpg or png
Valid parameters are stretch, minmax_percent, minmax_value, and invert.
These arguments are documented here:
https://astrocut.readthedocs.io/en/latest/api/astrocut.img_cut.html
The Column Name is the keyword, with the argument being one or more acceptable
values for that parameter, except for fields with a float datatype where the
argument should be in the form [minVal, maxVal].
Returns
-------
response : `~astropy.table.Table`
Cutout file(s) for given coordinates
"""
# Get Skycoord object for coordinates/object
coordinates = parse_input_location(coordinates)
size_dict = _parse_cutout_size(size)
path = os.path.join(path, '')
astrocut_request = "ra={}&dec={}&y={}&x={}&units={}".format(coordinates.ra.deg,
coordinates.dec.deg,
size_dict["y"],
size_dict["x"],
size_dict["units"])
if survey:
astrocut_request += "&survey={}".format(survey)
astrocut_request += "&format={}".format(cutout_format)
for key in img_params:
if key in self.accepted_img_params:
astrocut_request += "&{}={}".format(key, img_params[key])
astrocut_url = self._service_api_connection.REQUEST_URL + "astrocut?" + astrocut_request
zipfile_path = "{}zcut_{}.zip".format(path, time.strftime("%Y%m%d%H%M%S"))
self._download_file(astrocut_url, zipfile_path)
localpath_table = Table(names=["Local Path"], dtype=[str])
if not zipfile.is_zipfile(zipfile_path):
with open(zipfile_path, 'r') as FLE:
response = json.load(FLE)
warnings.warn(response['msg'], NoResultsWarning)
return localpath_table
if not inflate: # not unzipping
localpath_table['Local Path'] = [zipfile_path]
return localpath_table
print("Inflating...")
# unzipping the zipfile
zip_ref = zipfile.ZipFile(zipfile_path, 'r')
cutout_files = zip_ref.namelist()
zip_ref.extractall(path, members=cutout_files)
zip_ref.close()
os.remove(zipfile_path)
localpath_table['Local Path'] = [path+x for x in cutout_files]
return localpath_table
[docs] def get_cutouts(self, coordinates, size=5, survey=None):
"""
Get cutout FITS file(s) around the given coordinates with indicated size,
and return them as a list of `~astropy.io.fits.HDUList` objects.
Parameters
----------
coordinates : str or `astropy.coordinates` object
The target around which to search. It may be specified as a
string or as the appropriate `astropy.coordinates` object.
One and only one of coordinates and objectname must be supplied.
size : int, array-like, `~astropy.units.Quantity`
Optional, default 5 pixels.
The size of the cutout array. If ``size`` is a scalar number or
a scalar `~astropy.units.Quantity`, then a square cutout of ``size``
will be created. If ``size`` has two elements, they should be in
``(ny, nx)`` order. Scalar numbers in ``size`` are assumed to be in
units of pixels. `~astropy.units.Quantity` objects must be in pixel or
angular units.
survey : str
Optional
The survey to restrict the cutout. The survey parameter will restrict to
only the matching survey. Default behavior is to return all matched surveys.
Returns
-------
response : A list of `~astropy.io.fits.HDUList` objects.
Cutoutfiles for given coordinates
"""
# Get Skycoord object for coordinates/object
coordinates = parse_input_location(coordinates)
param_dict = _parse_cutout_size(size)
param_dict["ra"] = coordinates.ra.deg
param_dict["dec"] = coordinates.dec.deg
if survey:
param_dict["survey"] = survey
response = self._service_api_connection.service_request_async("astrocut", param_dict)
response.raise_for_status() # Raise any errors
try:
ZIPFILE = zipfile.ZipFile(BytesIO(response.content), 'r')
except zipfile.BadZipFile:
message = response.json()
warnings.warn(message['msg'], NoResultsWarning)
return []
# Open all the contained fits files:
# Since we cannot seek on a compressed zip file,
# we have to read the data, wrap it in another BytesIO object,
# and then open that using fits.open
cutout_list = []
for name in ZIPFILE.namelist():
CUTOUT = BytesIO(ZIPFILE.open(name).read())
cutout_list.append(fits.open(CUTOUT))
# preserve the original filename in the fits object
cutout_list[-1].filename = name
return cutout_list
Zcut = ZcutClass()