from functools import wraps
from ..PackageInfo import SurveyInfo
from ..StructureMethods.method_definitions import (exportplot, plot, savedata,
saveplot, showdata,
showplot)
# Shared survey metadata object; query() below reads its ``dataSurveyInfo``
# mapping to find the filter wavelengths, magnitude columns and error columns
# configured for each survey.
surveyInfo = SurveyInfo()
# NOTE(review): not referenced anywhere in the visible part of this module --
# presumably a leftover or used by code outside this view; confirm before removing.
newline = "\n"
[docs]
class SedStruct:
    """SedStruct()

    Container for spectral energy distribution (SED) data. It is returned by sed
    queries, when reading a data file that was originally written by an sed query,
    or by the Models module (in which case all attributes are set to None).

    |

    .. rubric:: Attributes
        :heading-level: 1

    kind : *str*
        "sed"

    source : *int*
        Gaia source ID of target system (if given, else None)

    pos : *list<float>*
        Position of target system [right ascension, declination] in degrees

    identifier : *str*
        Position of target system in JHHMMSS.SS±DDMMSS.SS format

    dataname : *str*
        Default file for the :func:`savedata` method

    plotname : *str*
        Default file name for the :func:`showplot` and :func:`saveplot` methods

    figure : *Bokeh figure*
        Stores figures generated by the :func:`plot` method

    trace : *str*
        Trace string handed over by whatever created the structure (None if not given)

    data : *list<dict>*
        list of entries with each entry taking the form:

        .. code-block:: python

            {
            "survey": entry survey
            "mag_name": [names of the magnitude columns behind each point]
            "wavelength": [central filter wavelengths (angstroms)]
            "flux": [fluxes (mJy)]
            "flux_rel_err": [relative errors on fluxes (mJy)]
            }

    |

    """

    def __init__(self, source, pos, data, identifier=None, trace=None):
        # Assignment order is kept stable on purpose: helpers that walk the
        # instance dictionary would otherwise see the attributes reordered.
        self.kind = "sed"
        self.source = source
        self.pos = pos
        self.identifier = identifier
        self.data = data
        # Filled in later by the plotting / saving helpers.
        self.figure = None
        self.dataname = None
        self.plotname = None
        self.trace = trace

    def __str__(self):
        return "<ATK SED Structure>"

    # Every method below is a thin shim around the module-level function of the
    # same name. Inside a method body the bare name resolves to that module-level
    # function (class attributes are not in scope there), and ``wraps`` copies the
    # function's name and docstring onto the method, so a docstring written here
    # would simply be replaced.

    @wraps(plot)
    def plot(self, **kwargs):
        return plot(self, **kwargs)

    @wraps(showdata)
    def showdata(self, pprint=True, print_methods=True):
        showdata(self, pprint, print_methods)
        # Hand the structure back so calls can be chained.
        return self

    @wraps(savedata)
    def savedata(self, fname=None):
        return savedata(self, fname)

    @wraps(showplot)
    def showplot(self, fname=None):
        return showplot(self, fname)

    @wraps(saveplot)
    def saveplot(self, fname=None):
        return saveplot(self, fname)

    @wraps(exportplot)
    def exportplot(self, fname=None):
        return exportplot(self, fname=fname)
def mag_to_flux(mag, zp, wl):
    """Convert a magnitude into a flux density in mJy.

    Parameters
    ----------
    mag : float
        Magnitude on the system that ``zp`` is the zero point of.
    zp : float
        Flux corresponding to magnitude 0. Presumably expressed per unit
        wavelength (erg s^-1 cm^-2 angstrom^-1); the per-frequency conversion
        below only yields mJy under that assumption -- confirm with the caller.
    wl : float
        Filter wavelength in angstroms, used to convert from per-wavelength to
        per-frequency flux.

    Returns
    -------
    float or None
        Flux in mJy, or None when the magnitude cannot be turned into a flux
        (arithmetic overflow from placeholder magnitudes such as -999, or a
        non-numeric magnitude).
    """
    # Placeholder magnitudes (e.g. -999 in some surveys) overflow the power
    # below. Only arithmetic and type failures are treated as "no flux"; a bare
    # ``except`` would also swallow KeyboardInterrupt / SystemExit.
    # NOTE(review): NumPy scalars overflow to inf with a warning instead of
    # raising, so they are not caught here -- confirm what types callers pass.
    try:
        flux = zp * 10 ** (-0.4 * mag)
    except (ArithmeticError, TypeError, ValueError):
        return None

    # convert to mJy: f_nu = f_lambda * wl^2 / c, then erg s^-1 cm^-2 Hz^-1 -> Jy -> mJy
    speed_of_light = 2.99792458e18  # angstrom / s (was mistyped as 2.988e18)
    jansky = 1e-23  # erg s^-1 cm^-2 Hz^-1
    return flux / ((jansky * speed_of_light) / wl**2) * 1000
def format_data(survey, survey_data, filter_wavelengths, mag_names, error_names):
    """Turn one survey's photometry into a single SED data entry.

    Parameters
    ----------
    survey : str
        Survey name; "gaia" selects the hard-coded zero points.
    survey_data : mapping
        Maps column names to sequences; only element 0 of each magnitude and
        error column is read.
    filter_wavelengths, mag_names, error_names : sequences
        Per-band filter wavelength, magnitude column name and error column
        name. They are walked in lockstep, so the shortest one sets the number
        of bands.

    Returns
    -------
    dict
        ``{"survey", "mag_name", "wavelength", "flux", "flux_rel_err"}`` where
        every key except "survey" holds a list with one item per band whose
        flux came out truthy (bands with a None or zero flux are skipped).
    """
    import numpy as np

    # Additive Vega -> AB magnitude offsets, keyed by magnitude column name.
    # WISE: https://wise2.ipac.caltech.edu/docs/release/allsky/expsup/sec4_4h.html
    # 2MASS: https://www.astronomy.ohio-state.edu/martini.10/usefuldata.html
    vega_to_ab = {
        "W1mag": 2.699,
        "W2mag": 3.339,
        "W3mag": 5.174,
        "W4mag": 6.620,
        "Jmag": 0.91,
        "Hmag": 1.39,
        "Kmag": 1.85,
    }

    if survey == "gaia":
        # NOTE(review): presumably ordered to match the three Gaia bands
        # configured for this survey -- confirm against the survey settings.
        zero_points = [2.5e-9, 4.11e-9, 1.24e-9]
    else:
        # Zero point derived from the filter wavelength alone (looks like the
        # AB zero point expressed per unit wavelength -- TODO confirm).
        zero_points = [10 ** ((5 * np.log10(wl) + 2.406) / -2.5) for wl in filter_wavelengths]

    entry = {"survey": survey, "mag_name": [], "wavelength": [], "flux": [], "flux_rel_err": []}

    bands = zip(filter_wavelengths, mag_names, error_names, zero_points)
    for wavelength, mag_column, err_column, zero_point in bands:
        mag = survey_data[mag_column][0]
        mag_err = survey_data[err_column][0]

        shift = vega_to_ab.get(mag_column)
        if shift is not None:
            mag += shift

        flux = mag_to_flux(mag=mag, zp=zero_point, wl=wavelength)
        if not flux:
            continue

        # The flux error is scaled by the fractional magnitude error.
        # NOTE(review): this is not the usual 0.4*ln(10)*flux*mag_err
        # propagation -- kept as is, confirm it is intended.
        flux_err = flux * mag_err / mag

        entry["wavelength"].append(wavelength)
        entry["flux"].append(flux)
        entry["flux_rel_err"].append(flux_err)
        entry["mag_name"].append(mag_column)

    return entry
def query(radius, pos=None, source=None):
    """Run a bulkdata query and package its photometry as a :class:`SedStruct`.

    Parameters
    ----------
    radius :
        Search radius, forwarded unchanged to the bulkdata query.
    pos : optional
        Target position, forwarded unchanged to the bulkdata query.
    source : optional
        Target source identifier, forwarded unchanged to the bulkdata query.
        When truthy, the returned structure also carries a trace string.

    Returns
    -------
    SedStruct
        ``pos`` is the position reported by the bulkdata query, ``data`` holds
        one entry per returned survey that has SED settings in
        ``surveyInfo.dataSurveyInfo`` and actually produced data, and ``trace``
        is the bulkdata trace with the segments of unused surveys removed (None
        when no ``source`` was given).
    """
    from ..Tools import query as run_bulk_query

    sed_params = surveyInfo.dataSurveyInfo
    bulkdata = run_bulk_query(kind="bulkdata", pos=pos, source=source, radius=radius, level="internal")
    raw_results = bulkdata.data or {}

    trace = None
    if source:
        # Reuse the trace string from the bulkdata query, dropping every
        # "|"-separated segment that mentions a survey the SED does not use.
        unused_surveys = [name for name in raw_results if name not in sed_params]
        kept_segments = [
            segment
            for segment in bulkdata.trace.split("|")
            if not any(name in segment for name in unused_surveys)
        ]
        # join() also copes with every segment having been dropped; indexing the
        # last element of the filtered list raised IndexError in that case.
        trace = "|".join(kept_segments)

    # Surveys that returned nothing are stored as None; discard them.
    available = {name: table for name, table in raw_results.items() if table is not None}
    if bulkdata.data:
        bulkdata.data = available

    sed_data = [
        format_data(
            survey=name,
            survey_data=table,
            filter_wavelengths=sed_params[name]["filter_wavelengths"],
            mag_names=sed_params[name]["mags"],
            error_names=sed_params[name]["errors"],
        )
        for name, table in available.items()
        if name in sed_params
    ]

    # The structure always reports the position resolved by the bulkdata query.
    # The Gaia ra/dec lookup that used to run here was overwritten immediately
    # afterwards and could only fail when the Gaia entry was unusable, so it is gone.
    return SedStruct(pos=bulkdata.pos, source=source, data=sed_data, trace=trace)