Source code for AstroToolkit.Data.lightcurvequery

from functools import wraps

from ..Configuration.epochs import EpochStruct
from ..Input.input_validation import check_inputs
from ..PackageInfo import SurveyInfo
from ..StructureMethods.method_definitions import (exportplot, plot, savedata,
                                                   saveplot, showdata,
                                                   showplot)
from ..Utility import HJDtoMJD

surveyInfo = SurveyInfo()
epochs = EpochStruct().epoch_list


[docs] class LightcurveStruct(object): """LightcurveStruct() This structure is returned from lightcurve queries, when read from a data file that was originally created by a lightcurve query, or through the Models module. .. rubric:: Attributes :heading-level: 1 kind : *str* "lightcurve" survey : *str* survey to which the data belongs source : *int* Gaia source ID of target system (if given, else None) pos : *list<float>* Position of target system [right ascension, declination] in degrees identifier : *str* Position of target system in JHHMMSS.SS±DDMMSS.SS format dataname : *str* Default file for the :func:`savedata` method plotname : *str* Default file name for the :func:`showplot` and :func:`saveplot` methods figure : *Bokeh figure* Stores figures generated by the :func:`plot` method data: *list<dict>* Returned data as a list of bands, with each band in the form: .. code-block:: python "band": <band> "ra": [right ascension (degrees)] "dec": [declination in (degrees)] "mjd": [mjd] "mag"/"flux": [apparent magnitude]/[flux] "mag_err"/"flux_err": [error on the apparent magnitude]/[error on the flux] where <band> is the wavelength band of the data entry, and the bands for each supported light curve survey are: - ztf: g, r, i - atlas: o, c, i - gaia: g, bp, rp - asassn: g, v - crts: v - tess: TESS mag | """ def __init__(self, survey, source, pos, data, identifier=None, trace=None): self.kind = "lightcurve" self.survey = survey self.source = source self.pos = pos self.identifier = identifier self.data = data self.figure = None self.dataname = None self.plotname = None self.trace = trace def __str__(self): return "<ATK Lightcurve Structure>"
[docs] def plot(self, **kwargs): """ Plots data contained within a given data stucture and assigns the resulting figure to the data structure's 'figure' attribute. Light curves can be plotted in three ways: - As standard light curves - As a Lomb-Scargle power spectrum - As a phase-folded light curve **Additional Parameters:** :param kind: type of figure to generate, from: 'lightcurve', 'powspec' (power spectrum), 'phasefold' (phase-folded light curve), defaults to 'lightcurve' :type kind: str, optional Additional optional parameters are available depending on the kind of figure being generated: | **For lightcurve plotting:** :param bands: list of bands to plot, see listed bands for each supported survey above, defaults to all supported bands of relevant survey :type bands: list<str>, optional :param colours: list of colours to apply to selected bands. Supported colours: green, red, blue, black, orange, purple. Default = black for all bands :type colours: list<str>, optional :param timeformat: time format from: 'reduced', 'original'. defaults to 'reduced' :type timeformat: str, optional | **For powspec plotting:** :param start_freq: starting frequency in :math:`\\text{days}^{-1}`, defaults to 0 :type start_freq: float, optional :param stop_freq: final frequency in :math:`\\text{days}^{-1}`, defaults to 60 :type stop_freq: float, optional :param samples: number of frequency values at which to calculate power, defaults to 150000 :type samples: int, optional | **For phasefold plotting:** :param freq: frequency on which to fold the data in :math:`\\text{days}^{-1}`, defaults to maximum power Lomb-Scargle frequency :type freq: float, optional :param bins: number of equally spaced bins in time to bin phase folded data into, defaults to None (i.e. no binning) :type bins: int, optional :param repeat: number of repetitions to plot on the folded period, defaults to 2 :type repeat: int, optional :param shift: shift to apply to phase folded light curve in units of phase, defaults to 0 :type shift: float, optional :param foverlay: overlays a sine wave of the fold frequency, defaults to True :type foverlay: bool, optional | """ return plot(self, **kwargs)
@wraps(showdata) def showdata(self, pprint=True, print_methods=True): showdata(self, pprint, print_methods) return self @wraps(savedata) def savedata(self, fname=None): fname = savedata(self, fname) return fname @wraps(showplot) def showplot(self, fname=None): fname = showplot(self, fname) return fname @wraps(saveplot) def saveplot(self, fname=None): fname = saveplot(self, fname) return fname @wraps(exportplot) def exportplot(self, fname=None): fname = exportplot(self, fname=fname) return fname
[docs] def sigmaclip(self, sigma=3): """ Sigma clips light curve data. :param sigma: number of standard deviations beyond which to clip data, defaults to 3 :type sigma: float, optional :return: Self """ corrected_inputs = check_inputs({"sigma": [sigma, float]}, label="sigmaclip") sigma = corrected_inputs[0] from .sigmaclip import sigma_clip if self.data: self.data = [sigma_clip(band, sigma) if band["mag"] else band for band in self.data] return self
[docs] def bin(self, bins=None, binsize=None): """bin(**kwargs) Bins light curve data into a given number of equally spaced bins in time or a given bin size in days, hours or minutes. :param bins: number of bins in which to bin data, defaults to None (i.e. no binning) :type bins: int, optional or :param binsize: bin size ending with the unit, e.g. "10d", "10h" or "10m" for 10 days, hours or minutes, respectively. Defaults to None (i.e. no binning) :type binsize: str, optional :return: Self | """ from ..Input.input_validation import check_inputs corrected_inputs = check_inputs({"bins": [bins, int], "binsize": [binsize, str]}, label="bin") bins, binsize = corrected_inputs from .bin_lightcurves import binning if not bins and not binsize: raise Exception("Must provide bins or binsize.") elif bins and binsize: raise Exception("Both bins and binsize provided.") binned_data = [] for band in self.data: if band["mag"]: binned_data.append(binning(band, bins=bins, bin_size=binsize)) return self
[docs] def crop(self, start=None, stop=None, timeformat="reduced"): """crop(**kwargs) Crops light curve data between a given start and end time in reduced or original time format, or between two percentages of the light curve's total coverage in time. :param timeformat: time format, from: 'reduced', 'original'. Defaults to 'reduced' :type timeformat: str, optional :param start: start time in format given by **timeformat**, or where to start in the light curve as a percentage (e.g. '25%'). Defaults to None (i.e. no cropping) :type start: float/str, optional :param stop: end time in format given by **timeformat**, or where to stop in the light curve as a percentage (e.g. '75%'). Defaults to None (i.e. no cropping) :type stop: float/str, optional :return: Self | """ from ..Input.input_validation import check_inputs corrected_inputs = check_inputs({"start": [start, str], "stop": [stop, str], "timeformat": [timeformat, str]}, label="crop") start, stop, timeformat = corrected_inputs from .crop_lightcurves import crop_lightcurve self.data = crop_lightcurve(self, start, stop, timeformat) return self
class GeneralQuery(object): def __init__(self, survey, radius, source, pos, username, password, pmra, pmdec, raw): self.survey = survey self.radius = radius self.source = source self.pos = pos self.url = "" self.username = username self.password = password self.pmra = pmra self.pmdec = pmdec self.raw = raw def get_response(self): import requests from requests.adapters import HTTPAdapter, Retry s = requests.Session() retries = Retry(total=5, backoff_factor=1, status_forcelist=[500, 502, 503, 504]) s.mount("http://", HTTPAdapter(max_retries=retries)) try: response = s.get(self.url, timeout=180) except: print(f"Note: experiencing issues with {self.survey}") return None if response.status_code != 200: print(f"Note: experiencing issues with {self.survey}") return None return response class ZtfQuery(GeneralQuery): def set_url(self): url = f"https://irsa.ipac.caltech.edu/cgi-bin/ZTF/nph_light_curves?POS=CIRCLE {self.pos[0]} {self.pos[1]} {self.radius / 3600}&BANDNAME=g,r,i&FORMAT=CSV" self.url = url def generate_data(self, response): from io import BytesIO import pandas as pd data = pd.read_csv(BytesIO(response.content)) if len(data) == 0: print(f"Note: {self.survey} query returned no data.") return None return data def format_data(self, data): data_arr = [] for filter_code in ["zg", "zr", "zi"]: current_band_data = data.loc[data["filtercode"] == filter_code].reset_index(drop=True) if current_band_data.empty: data_arr.append({"band": filter_code[1:], "ra": None, "dec": None, "mjd": None, "mag": None, "mag_err": None}) continue mag, mag_err, ra, dec, hjd = ( current_band_data["mag"].tolist(), current_band_data["magerr"].tolist(), current_band_data["ra"].tolist(), current_band_data["dec"].tolist(), current_band_data["hjd"].tolist(), ) data_arr.append({"band": filter_code[1:], "ra": ra, "dec": dec, "mjd": HJDtoMJD(hjd, self.pos), "mag": mag, "mag_err": mag_err}) return data_arr class AtlasQuery(GeneralQuery): def set_url(self): base_url = "https://fallingstar-data.com/forcedphot" self.url = base_url def get_response(self): import re import time import requests r = requests.post(url=f"{self.url}/api-token-auth/", data={"username": self.username, "password": self.password}) if r.status_code == 200: token = r.json()["token"] headers = {"Authorization": f"Token {token}", "Accept": "application/json"} self.headers = headers else: print(f"Note: experiencing issues with {self.survey}. Please ensure that you have provided valid login credentials.") return None do_correction = False if self.pmra and self.pmdec: do_correction = True if self.source: coord_epoch = epochs["gaia"][0] else: coord_epoch = 2000 task_url = None while not task_url: with requests.Session() as s: if do_correction: r = s.post( f"{self.url}/queue/", headers=headers, data={ "ra": self.pos[0], "dec": self.pos[1], "mjd_min": 60000.0, "propermotion_ra": self.pmra, "propermotion_dec": self.pmdec, "radec_epoch_year": coord_epoch, "use_reduced": True, }, ) else: r = s.post( f"{self.url}/queue/", headers=headers, data={ "ra": self.pos[0], "dec": self.pos[1], "mjd_min": 60000.0, "radec_epoch_year": coord_epoch, "use_reduced": True, }, ) if r.status_code == 201: task_url = r.json()["url"] elif r.status_code == 429: message = r.json()["detail"] print(f"{r.status_code} {message}") t_sec = re.findall(r"Available in (\d+) seconds", message) t_min = re.findall(r"Available in (\d+) minutes", message) if t_sec: waittime = int(t_sec[0]) elif t_min: waittime = int(t_min[0]) else: waittime = 10 print(f"Waiting {waittime} seconds.") time.sleep(waittime) else: print(f"{self.survey} login failed. Please ensure that you have provided valid login credentials.") return None result_url = None while not result_url: with requests.Session() as s: r = s.get(task_url, headers=headers) if r.status_code == 200: if r.json()["finishtimestamp"]: return r elif r.json()["starttimestamp"]: print(f"Task is running (started at {r.json()['starttimestamp']})") else: print("Waiting for job to start. Checking again in 10 seconds.") time.sleep(10) else: print(f"Note: experiencing issues with {self.survey}") return None def generate_data(self, response): import io import pandas as pd import requests result_url = response.json()["result_url"] with requests.Session() as s: textdata = s.get(result_url, headers=self.headers).text df = pd.read_csv(io.StringIO(textdata.replace("###", "")), sep="\\s+") if df.empty: print("Note: ATLAS query returned no data.") return None else: return df def format_data(self, data): import itertools mjd = data["MJD"].tolist() mag = data["m"].tolist() mag_err, ra_arr, dec_arr, obs_arr, chi_arr, flux, flux_err = ( data["dm"].tolist(), data["RA"].tolist(), data["Dec"].tolist(), data["Obs"].tolist(), data["chi/N"].tolist(), data["uJy"].tolist(), data["duJy"].tolist(), ) if not self.raw: chi_arr = [float(x) for x in chi_arr] flux = [float(x) for x in flux] flux_err = [float(x) for x in flux_err] # abs(flux) < 3 bad_indices_flux = set([i for i, element in enumerate(flux) if abs(element) < 3]) # flux_err > 4000 bad_indices_flux_err = set([i for i, element in enumerate(flux_err) if element > 4000]) """ # mag_err > 0.5 * mag_range bad_indices_mag = set( [ i for i, element in enumerate(mag_err) if element > 0.5 * (max(mag) - min(mag)) ] ) """ bad_indices_mag_hard_lim = set([i for i, element in enumerate(mag_err) if element > 0.5]) bad_indices = list(set(itertools.chain(*[bad_indices_flux, bad_indices_flux_err, bad_indices_mag_hard_lim]))) # apply the above filters filtered_list = [] for _, val in enumerate([obs_arr, mag, mag_err, mjd, ra_arr, dec_arr]): filtered_list.append([element for i, element in enumerate(val) if i not in bad_indices]) obs_arr, mag, mag_err, mjd, ra_arr, dec_arr = filtered_list filtered_list = [] mask = [i for i, val in enumerate(mag) if val < 0] for _, val in enumerate([obs_arr, mag, mag_err, mjd, ra_arr, dec_arr]): filtered_list.append([element for i, element in enumerate(val) if i not in mask]) obs_arr, mag, mag_err, mjd, ra_arr, dec_arr = filtered_list filtered_list = [] mask = [i for i, val in enumerate(mag_err) if val == 0] for _, val in enumerate([obs_arr, mag, mag_err, mjd, ra_arr, dec_arr]): filtered_list.append([element for i, element in enumerate(val) if i not in mask]) obs_arr, mag, mag_err, mjd, ra_arr, dec_arr = filtered_list # split data into bands data_split = [] for band in ["o", "c", "i"]: split_arr = [] for _, val in enumerate([ra_arr, dec_arr, mjd, mag, mag_err]): split_arr.append([element for i, element in enumerate(val) if obs_arr[i][-1] == band]) if len(split_arr[4]) > 0: data_split.append( { "band": band, "ra": split_arr[0], "dec": split_arr[1], "mjd": split_arr[2], "mag": split_arr[3], "mag_err": split_arr[4], } ) else: data_split.append({"band": band, "ra": None, "dec": None, "mjd": None, "mag": None, "mag_err": None}) return data_split class GaiaQuery(GeneralQuery): def get_data(self, source=None, pos=None): from ..Tools import query data = query(survey="gaia_lc", kind="data", pos=pos, source=source, radius=self.radius, level="internal") if data: return data.data def format_data(self, data): import math import numpy as np from astropy.time import Time # magnitude error formula taken from https://astronomy.stackexchange.com/questions/38371/how-can-i-calculate-the-uncertainties-in-magnitude-like-the-cds-does bands = ["g", "bp", "rp"] cols = [] data_arr = [] for band in bands: if band == "g": # time, flux, flux_err, mag, ra, dec cols = ["t8_c5", "FG", "e_FG", "Gmag", "RA_ICRS", "DE_ICRS"] elif band == "bp": # time, flux, flux_err, mag, ra, dec cols = ["t8_c11", "FBP", "e_FBP", "BPmag", "RA_ICRS", "DE_ICRS"] elif band == "rp": # time, flux, flux_err, mag, ra, dec cols = ["t8_c16", "FRP", "e_FRP", "RPmag", "RA_ICRS", "DE_ICRS"] time, flux, flux_err, mag, ra, dec = (data[cols[0]], data[cols[1]], data[cols[2]], data[cols[3]], data[cols[4]], data[cols[5]]) mag_err = [(2.5 / np.log(10)) * (f_e / f) for f, f_e in zip(flux, flux_err)] bad_indices = [i for i, val in enumerate(time) if math.isnan(val)] # apply the above filter filtered_list = [] for _, val in enumerate([mag, mag_err, time, ra, dec]): filtered_list.append([element for i, element in enumerate(val) if i not in bad_indices]) mag, mag_err, time, ra, dec = filtered_list mjd = [Time(t + 2455197.5 - 2400000.5, format="jd").value for t in time] if len(mag) > 0: data_arr.append({"band": band, "ra": ra, "dec": dec, "mjd": mjd, "mag": mag, "mag_err": mag_err}) else: data_arr.append({"band": band, "ra": None, "dec": None, "mjd": None, "mag": None, "mag_err": None}) return data_arr class AsassnQuery(GeneralQuery): def set_url(self): url = f"https://asas-sn.osu.edu/photometry.json?action=index&controller=photometry&dec={self.pos[1]}&epochs_max=&epochs_min=&ra={self.pos[0]}&radius={self.radius / 60}&rms_max=&rms_min=&sort_by=raj2000&utf8=%E2%9C%93&vmag_max=&vmag_min=" self.url = url def generate_data(self, response): import json data = json.loads(response.content) if data["count"] > 1: print(f"Note: {self.survey} query returned data for multiple objects") elif data["count"] == 0: print("Note: asassn query returned no data.") return None return data def format_data(self, data): import json import requests v_ra_arr, v_dec_arr, v_mag, v_mag_err, v_hjd = [], [], [], [], [] g_ra_arr, g_dec_arr, g_mag, g_mag_err, g_hjd = [], [], [], [], [] for i in range(0, len(data["results"])): link = data["results"][i]["link"] ra = data["results"][i]["raj2000"] dec = data["results"][i]["dej2000"] v_cameras = ["a", "b", "c", "d", "e", "f", "g", "h"] g_cameras = ["i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t"] complete = False while not complete: r = requests.get(link, timeout=60) lc_data = json.loads(r.content) for j in range(0, len(lc_data["results"])): current_data = lc_data["results"][j] camera = current_data["camera"][-1:] if camera in v_cameras: v_mag.append(current_data["mag"]) v_mag_err.append(current_data["mag_err"]) v_hjd.append(current_data["hjd"]) v_ra_arr.append(ra) v_dec_arr.append(dec) elif camera in g_cameras: g_mag.append(current_data["mag"]) g_mag_err.append(current_data["mag_err"]) g_hjd.append(current_data["hjd"]) g_ra_arr.append(ra) g_dec_arr.append(dec) if lc_data["next"] is not None: link = lc_data["next"] else: complete = True if not self.raw: v_bad_indices = set([i for i, element in enumerate(v_mag_err) if element > 0.5 * (max(v_mag) - min(v_mag))]) g_bad_indices = set([i for i, element in enumerate(g_mag_err) if element > 0.5 * (max(g_mag) - min(g_mag))]) # apply the above filters filtered_list = [] for _, val in enumerate([v_mag, v_mag_err, v_hjd, v_ra_arr, v_dec_arr]): filtered_list.append([element for i, element in enumerate(val) if i not in v_bad_indices]) v_mag, v_mag_err, v_hjd, v_ra_arr, v_dec_arr = filtered_list # apply the above filters filtered_list = [] for _, val in enumerate([g_mag, g_mag_err, g_hjd, g_ra_arr, g_dec_arr]): filtered_list.append([element for i, element in enumerate(val) if i not in g_bad_indices]) g_mag, g_mag_err, g_hjd, g_ra_arr, g_dec_arr = filtered_list data_arr = [] if len(v_mag) > 0: data_arr.append( {"band": "v", "ra": v_ra_arr, "dec": v_dec_arr, "mjd": HJDtoMJD(v_hjd, self.pos), "mag": v_mag, "mag_err": v_mag_err} ) else: data_arr.append({"band": "v", "ra": None, "dec": None, "mjd": None, "mag": None, "mag_err": None}) if len(g_mag) > 0: data_arr.append( {"band": "g", "ra": g_ra_arr, "dec": g_dec_arr, "mjd": HJDtoMJD(g_hjd, self.pos), "mag": g_mag, "mag_err": g_mag_err} ) else: data_arr.append({"band": "g", "ra": None, "dec": None, "mjd": None, "mag": None, "mag_err": None}) return data_arr class CrtsQuery(GeneralQuery): def get_data(self): from .fetch_crts import get_CRTS_lightcurve data = get_CRTS_lightcurve(self.pos, self.radius) return data def format_data(self, data): ra_arr = data["RA"].tolist() dec_arr = data["Dec"].tolist() mjd = data["MJD"].tolist() mag = data["Mag"].tolist() mag_err = data["Magerr"].tolist() data_dict = {"band": "v", "ra": ra_arr, "dec": dec_arr, "mjd": mjd, "mag": mag, "mag_err": mag_err} return [data_dict] class TessQuery(GeneralQuery): def get_data(self): from .fetch_tess import get_TESS_lightcurve data = get_TESS_lightcurve(self.pos, self.radius) return data def format_data(self, data): import math import numpy as np mjd = data["mjd"].tolist() mag = data["mag"].tolist() ra_arr = [self.pos[0]] * len(mag) dec_arr = [self.pos[1]] * len(mag) flux = data["flux"].tolist() flux_err = data["flux_err"].tolist() mag_err = ((2.5 / np.log(10)) * (np.asarray(flux_err) / np.asarray(flux))) ** 2 + 0.05**2 mag_err = [math.sqrt(x) for x in mag_err] if not self.raw: mask = [i for i, val in enumerate(mag_err) if val > 1] def filter_data(arr, mask): filtered_arr = [val for i, val in enumerate(arr) if i not in mask] return filtered_arr mjd = filter_data(mjd, mask) mag = filter_data(mag, mask) mag_err = filter_data(mag_err, mask) ra_arr = filter_data(ra_arr, mask) dec_arr = filter_data(dec_arr, mask) data_dict = {"band": "TESS mag", "ra": ra_arr, "dec": dec_arr, "mjd": mjd, "mag": mag, "mag_err": mag_err} return [data_dict] def get_f_return(survey): f_return = [] if survey == "gaia": survey = "gaia_lc" bands = surveyInfo.lightcurveSurveyInfo[survey]["bands"] for band in bands: f_return.append({"band": band, "ra": None, "dec": None, "mjd": None, "mag": None, "mag_err": None}) return f_return def query(survey, source, pos, radius, raw, username=None, password=None): def get_lightcurve(): query_object = globals()[f"{survey.capitalize()}Query"]( source=source, pos=pos, radius=radius, survey=survey, username=username, password=password, pmra=pmra, pmdec=pmdec, raw=raw ) if survey == "gaia": if source: data = query_object.get_data(source=source) else: data = query_object.get_data(pos=pos) elif survey in ["crts", "tess"]: data = query_object.get_data() else: query_object.set_url() response = query_object.get_response() if response is None: return get_f_return(survey) data = query_object.generate_data(response) if data is None: return get_f_return(survey) data = query_object.format_data(data) return data pmra, pmdec = None, None if source and survey != "atlas": from ..Tools import correctpm pos, success = correctpm(source=source, target_survey=survey, check_success=True) if survey == "gaia": survey_str = "gaia (lightcurve)" epoch_label = "gaia_lc" else: survey_str = survey epoch_label = survey if success: trace = f"start -> extracted pos from source query, assumed {epochs['gaia']} -> {survey_str}: {epochs[epoch_label]} -> {survey_str} query performed -> [2000,0] -> end" final_pos = correctpm(source=source, target_time=[2000, 0], pmra=pmra, pmdec=pmdec) else: trace = f"start -> extracted pos from source query, assumed {epochs['gaia']} -> proper motion correction failed -> {survey_str} query performed -> end" final_pos = pos elif source and survey == "atlas": from ..Tools import correctpm, query gaia_data = query(kind="data", source=source, survey="gaia", level="internal") import math ra, dec, pmra, pmdec = (gaia_data.data["ra"][0], gaia_data.data["dec"][0], gaia_data.data["pmra"][0], gaia_data.data["pmdec"][0]) pos = [ra, dec] if math.isnan(pmra) or math.isnan(pmdec): print("Note: could not correct coordinates due to missing pmra/pmdec") pmra, pmdec = None, None trace = f"start -> extracted pos from source query, assumed {epochs['gaia']} -> proper motion correction failed -> {survey} query performed -> end" final_pos = pos else: trace = f"start -> extracted pos from source query, assumed {epochs['gaia']} -> proper motion correction by ATLAS API -> {survey} query performed -> [2000,0] -> end" final_pos = correctpm(pos=pos, input_time=epochs["gaia"], target_time=[2000, 0], pmra=pmra, pmdec=pmdec) else: final_pos = pos trace = None lightcurve = LightcurveStruct(survey=survey, source=source, pos=pos, data=get_lightcurve()) lightcurve.trace = trace lightcurve.pos = final_pos return lightcurve