import os
import sys
import warnings
from functools import wraps
from astropy.utils.exceptions import AstropyWarning
from ..Configuration.epochs import EpochStruct
from ..StructureMethods.method_definitions import (exportplot, plot, savedata,
saveplot, showdata,
showplot)
# Ignore every warning of category AstropyWarning (takes effect as soon as
# this module is imported).
warnings.simplefilter("ignore", category=AstropyWarning)
# Epoch list taken from the project's EpochStruct; query() below looks up
# epochs["gaia"] as the input epoch for proper-motion corrections.
epochs = EpochStruct().epoch_list
class ImageStruct:
    """ImageStruct()

    This structure is returned from image queries, when read from a data file that was originally created by an image query, or through the Models module (in which case all attributes are set to None).

    .. rubric:: Attributes
        :heading-level: 1

    kind : *str*
        "image"
    survey : *str*
        survey to which the data belongs
    source : *int*
        Gaia source ID of target system (if given, else None)
    pos : *list<float>*
        Position of target system [right ascension, declination] in degrees
    identifier : *str*
        Position of target system in JHHMMSS.SS±DDMMSS.SS format
    dataname : *str*
        Default file for the :func:`savedata` method
    plotname : *str*
        Default file name for the :func:`showplot` and :func:`saveplot` methods
    figure : *Bokeh figure*
        Stores figures generated by the :func:`plot` method
    trace : *str*
        Summary of the position / proper motion correction steps recorded by the query that produced the structure (None if no trace was recorded)
    data: *dict*
        Returned data in format:

        .. code-block:: python

            "image_data": [raw image data]
            "image_header": astropy image header
            "size": image size (arcseconds)
            "image_time": time at which image was taken ([year,month])
            "wcs": astropy wcs
            "image_focus": centre of image ([right ascension,declination], degrees)
            "overlay": [data entries to overlay when plotting images]

    |

    """

    def __init__(self, survey, source, pos, data, identifier=None, trace=None):
        # Fixed tag identifying this structure as an image structure.
        self.kind = "image"
        self.survey = survey
        self.source = source
        self.pos = pos
        self.identifier = identifier
        self.data = data
        # No figure or default file names exist until they are assigned later.
        self.figure = None
        self.dataname = None
        self.plotname = None
        self.trace = trace

    def __str__(self):
        return "<ATK Image Structure>"

    def plot(self, **kwargs):
        """
        Plots data contained within a given data structure and assigns the resulting figure to the data structure's 'figure' attribute.

        **Additional Parameters:**

        :param searchradius: search radius to use in SIMBAD when generating object IDs and search URLs in overlays. Defaults to value given in key 'overlay_simbad_search_radius' of :ref:`config <config-keys>`.

        """
        return plot(self, **kwargs)

    # The methods below are thin wrappers around the module-level functions of
    # the same name; functools.wraps copies the wrapped function's metadata
    # (including its docstring) onto each method.

    @wraps(showdata)
    def showdata(self, pprint=True, print_methods=True):
        showdata(self, pprint, print_methods)
        # Returns the structure itself rather than the helper's result.
        return self

    @wraps(savedata)
    def savedata(self, fname=None):
        return savedata(self, fname)

    @wraps(showplot)
    def showplot(self, fname=None):
        return showplot(self, fname)

    @wraps(saveplot)
    def saveplot(self, fname=None):
        return saveplot(self, fname)

    @wraps(exportplot)
    def exportplot(self, fname=None):
        return exportplot(self, fname=fname)
class GeneralQuery:
    """Base class for image queries.

    Holds the query parameters and the request URL. The URL starts out empty;
    the subclasses in this module fill it in through their ``set_url()``
    method. ``get_image_data()`` then downloads and decodes the FITS image
    found at that URL.
    """

    def __init__(self, survey, size, band, pos):
        self.survey = survey
        self.size = size
        self.band = band
        self.pos = pos
        # Stays empty until a subclass's set_url() stores a request URL.
        self.url = ""

    def get_image_data(self):
        """Download the FITS image at ``self.url`` and contrast-stretch it.

        On success the stretched image and its header are stored on the
        instance as ``image_data`` / ``image_header`` and returned.

        Returns:
            tuple: ``(image_data, image_header)``, or ``(None, None)`` when
            the request fails, the server does not answer with status 200,
            the response cannot be opened as FITS, or the primary HDU holds
            no data. A note is printed in each failure case.
        """
        from io import BytesIO

        import numpy as np
        import requests
        from astropy.io import fits
        from astropy.visualization import AsinhStretch, PercentileInterval

        # send request using the URL returned for the selected survey
        try:
            response = requests.get(self.url, timeout=15)
        except Exception:
            # Best-effort: any request failure is reported, not raised.
            # (Exception rather than a bare except, so KeyboardInterrupt and
            # SystemExit still propagate.)
            print(f"Note: experiencing issues with {self.survey}.")
            return None, None

        if response.status_code != 200:
            print(f"Note: experiencing issues with {self.survey}.")
            return None, None

        try:
            hdul = fits.open(BytesIO(response.content))
        except Exception:
            print(f"Note: {self.survey} image query returned no data.")
            return None, None

        # read image fits file, apply a contrast filter; the context manager
        # closes the HDU list once the data has been extracted
        with hdul:
            primary = hdul[0]
            pixels = primary.data
            if pixels is None:
                # Primary HDU without an image array: treat as "no data".
                print(f"Note: {self.survey} image query returned no data.")
                return None, None

            # NaN pixels are zeroed before the stretch is applied.
            pixels[np.isnan(pixels)] = 0.0
            transform = AsinhStretch() + PercentileInterval(95)
            self.image_header = primary.header
            self.image_data = transform(pixels)

        return self.image_data, self.image_header

    @property
    def image_time(self):
        """Observation time as ``[year, month]``, read from the ``MJD-OBS`` header card.

        Requires ``get_image_data()`` to have stored ``image_header`` first.
        """
        from astropy.time import Time

        observed = Time(self.image_header["MJD-OBS"], format="mjd").to_datetime()
        return [observed.year, observed.month]

    @property
    def get_wcs(self):
        """astropy ``WCS`` object built from the stored image header."""
        from astropy.wcs import WCS

        return WCS(self.image_header)
class PanstarrsQuery(GeneralQuery):
    """Image query against the PS1 image services at ps1images.stsci.edu."""

    def set_url(self):
        """Look up the PS1 file list for the target and store a cutout URL.

        Queries ``ps1filenames.py`` for the files covering ``self.pos`` in
        ``self.band``, orders them by filter, and stores in ``self.url`` a
        ``fitscut.cgi`` URL for the first file. When the lookup returns no
        rows or anything goes wrong, a note is printed and ``self.url`` is
        left unchanged.

        Returns:
            None in every case; the result is communicated through ``self.url``.
        """
        import contextlib

        import numpy as np
        from astropy.table import Table

        # NOTE(review): presumably converts the size from arcseconds to PS1
        # pixels (0.25 arcsec per pixel) -- confirm against the fitscut docs.
        url_size = self.size * 4
        url = f"https://ps1images.stsci.edu/cgi-bin/ps1filenames.py?ra={self.pos[0]}&dec={self.pos[1]}&band={self.band}"

        try:
            # Suppress the panstarrs query's stdout by directing it to null.
            # redirect_stdout restores sys.stdout even when Table.read raises,
            # so the print() in the handler below always has a usable stream.
            with open(os.devnull, "w") as stdout_null, contextlib.redirect_stdout(stdout_null):
                table = Table.read(url, format="ascii")

            if not len(table) > 0:
                print(f"Note: {self.survey} image query returned no data.")
                return None

            sub_url = f"https://ps1images.stsci.edu/cgi-bin/fitscut.cgi?ra={self.pos[0]}&dec={self.pos[1]}&size={url_size}&format=fits"

            # Rank each row by the position of its filter letter in "yzirg"
            # and take the file of the first row in that order.
            filter_list = ["yzirg".find(x) for x in table["filter"]]
            sub_table = table[np.argsort(filter_list)]
            url_base = f"{sub_url}&red="
            self.url = url_base + sub_table["filename"][0]
        except Exception:
            # Best-effort: report and carry on, but let KeyboardInterrupt and
            # SystemExit propagate (a bare except would swallow them).
            print(f"Note: experiencing issues with {self.survey}.")
            return None
class SkymapperQuery(GeneralQuery):
    """Image query against the SkyMapper DR2 SIAP service."""

    def set_url(self):
        """Query the SkyMapper SIAP endpoint and store the first image URL.

        The CSV response is read into a table; when it has at least one row,
        the ``get_image`` value of the first row is stored in ``self.url``.
        Otherwise a note is printed and ``self.url`` is left unchanged.

        Returns:
            None in every case; the result is communicated through ``self.url``.
        """
        import pandas as pd
        from astropy.table import Table

        # NOTE(review): presumably arcseconds -> degrees for the SIAP SIZE
        # parameter -- confirm against the service documentation.
        url_size = self.size / 3600
        url = f"https://api.skymapper.nci.org.au/public/siap/dr2/query?POS={self.pos[0]},{self.pos[1]}&SIZE={url_size}&BAND={self.band}&FORMAT=image/fits&VERB=3&INTERSECT=covers&RESPONSEFORMAT=CSV"

        try:
            table = Table.from_pandas(pd.read_csv(url))
        except Exception:
            # Best-effort: report and carry on, but let KeyboardInterrupt and
            # SystemExit propagate (a bare except would swallow them).
            print(f"Note: experiencing issues with {self.survey}.")
            return None

        if len(table) == 0:
            print(f"Note: {self.survey} image query returned no data.")
            return None

        self.url = table["get_image"][0]
class DssQuery(GeneralQuery):
    """Image query against the DSS search service at archive.stsci.edu."""

    def set_url(self):
        """Store the ``dss_search`` request URL for the target in ``self.url``."""
        # NOTE(review): presumably arcseconds -> arcminutes for the h/w
        # (height/width) parameters -- confirm against the service docs.
        url_size = self.size / 60
        self.url = f"http://archive.stsci.edu/cgi-bin/dss_search?ra={self.pos[0]}&d={self.pos[1]}&v=3&e=J2000&f=fits&h={url_size}&w={url_size}"

    @property
    def image_time(self):
        """Observation time as ``[year, month]``, read from the ``DATE-OBS`` header card.

        The card is sliced at fixed positions (``YYYY-MM-DDTHH:MM:SS``). The
        time of day is added to the calendar date as a ``timedelta`` so that
        an out-of-range field (e.g. a minute value of 60) rolls over into the
        next hour, day, month or year, instead of yielding an invalid
        timestamp such as hour 24.

        Raises:
            ValueError: if the date, hour or minute fields cannot be parsed.
        """
        from datetime import datetime, timedelta

        stamp = self.image_header["DATE-OBS"]

        midnight = datetime.strptime(stamp[0:10], "%Y-%m-%d")
        hours = int(stamp[11:13])
        mins = int(stamp[14:16])
        # Seconds can only matter for the [year, month] result when they
        # overflow at a month boundary; a missing or malformed field counts as 0.
        seconds_text = stamp[17:19]
        secs = int(seconds_text) if seconds_text.isdigit() else 0

        observed = midnight + timedelta(hours=hours, minutes=mins, seconds=secs)
        return [observed.year, observed.month]
def query(survey, size, band, pos=None, source=None, overlays=None):
    """Run an image query and return the result as an ImageStruct.

    With ``source`` given, the Gaia position and proper motion of that source
    are fetched first. An initial image at the Gaia position supplies the
    image time; the position is then proper-motion corrected to that time,
    and a final image is fetched there. Without ``source``, the image is
    fetched directly at ``pos``.

    Args:
        survey: survey name; ``f"{survey.capitalize()}Query"`` is looked up
            in this module's globals to find the query class.
        size: image size, passed to the query class and stored under
            ``data["size"]``.
        band: band passed to the query class.
        pos: target position, indexed as ``pos[0]``, ``pos[1]`` by the query
            classes; used when ``source`` is not given.
        source: Gaia source ID; takes precedence over ``pos`` when truthy.
        overlays: when truthy, passed to ``overlay_query`` and the result is
            stored under ``data["overlay"]`` (otherwise that key is None).

    Returns:
        ImageStruct: the populated structure, or a structure with
        ``data=None`` when no image could be retrieved.
    """
    # Returned whenever an image cannot be retrieved.
    empty_result = ImageStruct(survey=survey, source=source, pos=pos, data=None)

    def getimage(position):
        # Fetch one image centred on `position`; None if any step fails.
        query_cls = globals()[f"{survey.capitalize()}Query"]
        survey_query = query_cls(pos=position, size=size, band=band, survey=survey)
        survey_query.set_url()
        if not survey_query.url:
            return None

        pixels, header = survey_query.get_image_data()
        if pixels is None or header is None:
            return None

        return ImageStruct(
            survey=survey,
            source=source,
            pos=position,
            data={
                "image_data": pixels,
                "image_header": header,
                "size": size,
                "image_time": survey_query.image_time,
                "wcs": survey_query.get_wcs,
                "image_focus": position,
            },
        )

    if not source:
        corrected_pos = pos
        final_pos = pos
        trace = None
    else:
        from ..Tools import correctpm
        from ..Tools import query as data_query

        gaia_data = data_query(kind="data", survey="gaia", source=source, level="internal").data
        ra, dec = gaia_data["ra"][0], gaia_data["dec"][0]
        pmra, pmdec = gaia_data["pmra"][0], gaia_data["pmdec"][0]
        gaia_pos = [ra, dec]

        def shift_to(target_time):
            # Proper-motion correct the Gaia position to `target_time`.
            return correctpm(
                pos=gaia_pos,
                input_time=epochs["gaia"],
                target_time=target_time,
                pmra=pmra,
                pmdec=pmdec,
                check_success=True,
            )

        # get an initial image to get image_time
        first_look = getimage(position=gaia_pos)
        if not first_look:
            empty_result.pos, success = shift_to([2000, 0])
            if success:
                empty_result.trace = f"start -> extracted pos from source query, assumed {epochs['gaia']} -> initial query performed -> [2000,0] -> end"
            else:
                empty_result.trace = f"start -> extracted pos from source query, assumed {epochs['gaia']} -> initial query performed -> proper motion correction failed -> end"
            return empty_result

        image_time = first_look.data["image_time"]

        # correct coords of source to image_time (for the final image) and to
        # [2000, 0] (for the reported position)
        corrected_pos, success1 = shift_to(image_time)
        final_pos, success2 = shift_to([2000, 0])

        if success1 and success2:
            trace = f"start -> extracted pos from source query, assumed {epochs['gaia']} -> initial query performed -> {survey} (image_time): {image_time} -> final query performed -> [2000,0] -> end"
        else:
            trace = f"start -> extracted pos from source query, assumed {epochs['gaia']} -> initial query performed -> proper motion correction failed -> final query performed -> proper motion correction failed -> end"

    image = getimage(corrected_pos)
    if not image:
        return empty_result

    if overlays:
        from ..Data.imageoverlay import overlay_query

        image.data["overlay"] = overlay_query(image, overlays)
    else:
        image.data["overlay"] = None

    image.trace = trace
    image.pos = final_pos
    return image