Module stixdcpy.science
This module provides APIs to retrieve Quick-look data from STIX data center , and provides tools to display the data Author: Hualin Xiao (hualin.xiao@fhnw.ch) Date: Sep. 1, 2021
Expand source code
#!/usr/bin/python
"""
This module provides APIs to retrieve Quick-look data from STIX data center , and provides tools to display the data
Author: Hualin Xiao (hualin.xiao@fhnw.ch)
Date: Sep. 1, 2021
"""
import datetime
import numpy as np
from astropy.io import fits
from matplotlib import pyplot as plt
from stixdcpy import time as sdt
from stixdcpy.logger import logger
from stixdcpy import io as sio, net
from stixdcpy.net import FitsQuery as freq
from stixdcpy import instrument as inst
from pathlib import PurePath
FPGA_TAU=10.1e-6
ASIC_TAU=2.63e-6
class ScienceData(sio.IO):
"""
Retrieve science data from stix data center or load fits file from local storage
"""
def __init__(self, request_id=None, fname=None):
self.fname = fname
self.data_type=None
if not fname:
raise Exception("FITS filename not specified")
self.request_id = request_id
self.time_shift_applied=0
self.hdul = fits.open(fname)
self.energies=[]
#self.read_data()
@property
def url(self):
req_id=self.request_id if not isinstance(self.request_id, list) else self.request_id[0]
link=f'{net.HOST}/view/list/bsd/uid/{req_id}'
return f'<a href="{link}">{link}</a>'
def read_fits(self, light_time_correction=True):
"""
Read data L1 compression level FITS files
Parameters
---------------------
light_time_correction: boolean
Correct light time difference
"""
self.data = self.hdul['DATA'].data
self.T0_utc = self.hdul['PRIMARY'].header['DATE_BEG']
self.counts= self.data['counts']
self.light_time_del= self.hdul['PRIMARY'].header['EAR_TDEL']
self.light_time_corrected=light_time_correction
self.T0_unix = sdt.utc2unix(self.T0_utc)
self.triggers = self.data['triggers']
self.rcr = self.data['rcr']
self.timedel = self.data['timedel']
self.time = self.data['time']
if self.is_time_bin_shifted(self.T0_unix):
self.timedel = self.timedel[:-1]
self.time = self.time[1:]
logger.info('Shifted time bins have been corrected automatically!')
if self.data_type=='ScienceL1':
self.counts= self.counts[1:, :, :, :]
self.triggers = self.triggers[1:, :]
self.rcr = self.rcr[1:]
elif self.data_type=='Spectrogram':
self.counts= self.counts[1:, :]
self.triggers = self.triggers[1:]
#self.rcr = self.rcr[1:]
self.request_id = self.hdul['CONTROL'].data['request_id']
self.time_shift_applied=0 if light_time_correction else self.light_time_del
self.datetime = [
sdt.unix2datetime(self.T0_unix + x + y * 0.5 + self.time_shift_applied)
for x, y in zip(self.time, self.timedel)
]
self.duration = self.time[-1] - self.time[0] + (self.timedel[0] +
self.timedel[-1]) / 2
self.energies = self.hdul['ENERGIES'].data
self.energy_bin_names = [
f'{a} - {b}'
for a, b in zip(self.energies['e_low'], self.energies['e_high'])
]
self.energy_bin_mask = self.hdul["CONTROL"].data["energy_bin_mask"]
ebin_nz_idx =self.energy_bin_mask.nonzero()
self.max_ebin = np.max(ebin_nz_idx) #indices of the non-zero elements
self.min_ebin = np.min(ebin_nz_idx)
self.ebins_mid = [
(a + b) / 2.
for a, b in zip(self.energies['e_low'], self.energies['e_high'])
]
self.ebins_low, self.ebins_high = self.energies[
'e_low'], self.energies['e_high']
if self.data_type=='ScienceL1':
self.pixel_counts=self.counts
self.pixel_count_rates= self.pixel_counts/self.timedel[:,None,None, None]
self.trigger_rates = self.triggers / self.timedel[:, None]
elif self.data_type=='Spectrogram':
self.count_rates= self.counts/self.timedel[:,None]
self.trigger_rates = self.triggers / self.timedel
def is_time_bin_shifted(self, unix_time):
"""
Time bins are shifted in the data collected before 2021-12-09 due a bug in the flight software
Check if time bin is shifted in L1 data
Parameters
unix_time: float
Returns
is_shifted: bool
True if time bin is shifted else False
"""
return (unix_time < sdt.utc2unix('2021-12-09T14:00:00'))
@classmethod
def from_sdc(cls, request_id):
'''
download science data file from stix data center
Parameters
------
request_id : int
bulk science data request unique ID; Unique IDs can be found on the science data web page at stix data center
Returns
------
science data class object
'''
request_id = request_id
fname = freq.fetch_bulk_science_by_request_id(request_id)
return cls(request_id, fname)
@classmethod
def from_fits(cls, filename):
"""
factory class
Arguments
filename: str
FITS filename
"""
request_id = None
return cls(request_id, filename)
def get_energy_range_slicer(self, elow, ehigh):
sel=[]
i=0
for a, b in zip(self.energies['e_low'], self.energies['e_high']):
if a>=elow and b<=ehigh:
sel.append(i)
i+=1
return slice(min(sel),max(sel))
def rebin(self, ebins, min_tbin=0):
"""
Energy rebin and time rebin
Arguments:
ebins: list or numpy array
energy bin range in units of keV
min_tbin: float
minimum time bin, shorter time bins are merged
Returns:
an object containing rebinned light curves
"""
pass
def save(self, filename=None):
'''
Save data to a fits file
Parameters
filename : output fits filename
Returns
filename if success or error message
'''
if not isinstance(self.hdul, fits.hdu.hdulist.HDUList):
logger.error('The data object is a not a fits hdu object!')
return None
try:
if filename is None:
basename = self.hdul['PRIMARY'].header['FILENAME']
filename = PurePath(net.DOWNLOAD_PATH, basename)
self.hdul.writeto(filename)
return filename
except Exception as e:
logger.error(e)
def __getattr__(self, name):
if name == 'data':
return self.hdul
elif name == 'type':
return self.hdul.get('data_type', 'INVALID_TYPE')
elif name == 'filename':
return self.fname
def get_data(self):
return self.hdul
class ScienceL1(ScienceData):
"""
Tools to analyze L1 science data
"""
def __init__(self, reqeust_id,fname):
print(reqeust_id, fname)
super().__init__(reqeust_id, fname)
self.data_type='ScienceL1'
self.pixel_count_rates=None
self.correct_pixel_count_rates=None
self.read_fits()
self.make_spectra()
def make_spectra(self, pixel_counts=None):
if pixel_counts is None:
pixel_counts=self.pixel_counts
self.spectrogram = np.sum(pixel_counts, axis=(1, 2))
self.count_rate_spectrogram = self.spectrogram / self.timedel[:, np.
newaxis]
self.spectrum = np.sum(pixel_counts, axis=(0, 1, 2))
self.mean_pixel_rate_spectra = np.sum(self.pixel_counts,
axis=0) / self.duration
self.mean_pixel_rate_spectra_err = np.sqrt(
self.mean_pixel_rate_spectra) / np.sqrt(self.duration)
#sum over all time bins and then divide them by the duration, counts per second
def solve_cfl(self, start_utc, end_utc, elow=0, eup=31, ax=None):
"""calculate flare location using the online flare location solver.
Args:
start_utc: str
ROI start time
end_utc: str
ROI end time
elow: int
ROI lower energy limit (science channel).
eup: int
ROI upper energy limit (science channel).
Returns:
cfl_loc: dict
containing coarse flare location as well as ephemeris and chisquare map
"""
pass
def correct_live_time(self, clone=False):
""" Live time correction
Returns:
scienceL1 object
"""
trig_tau = FPGA_TAU+ASIC_TAU
time_bins = self.time_bins[:, None]
photons_in = self.triggers/(time_bins-trig_tau*self.triggers)
#photon rate calculated using triggers
cm= np.zeros((time_bins.size, 32))
time_bins = time_bins[:, :, None, None]
count_rates = self.pixel_counts/time_bins
for det in range(32):
trig_idx=inst.detector_id_to_trigger_index(det)
cm[:,det]= 1 + self.trigger_rates[:,trig_idx]*FPGA_TAU #live time per second
self.correct_pixel_count_rates=count_rates*cm[:, :, None, None]/np.exp(-count_rates*ASIC_TAU)
return self
def peek(self,
plots=['spg', 'lc', 'spec', 'tbin', 'qllc'],
ax0=None,
ax1=None,
ax2=None,
ax3=None):
"""
Create quick-look plots for the loaded science data
"""
if not self.hdul:
logger.logger(f'Data not loaded. ')
return None
if isinstance(plots, str) and plots:
plots = plots.split(',')
if not self.data_loaded:
self.read_fits()
self.data_loaded = True
if 'spg' in plots:
if not ax0:
_, ax0 = plt.subplots()
X, Y = np.meshgrid(self.time,
np.arange(self.min_ebin, self.max_ebin))
im = ax0.pcolormesh(
X, Y,
np.transpose(
self.count_rate_spectrogram[:, self.min_ebin:self.max_ebin]
)) #pixel summed energy spectrum
ax0.set_yticks(
self.energies['channel'][self.min_ebin:self.max_ebin:2])
ax0.set_yticklabels(
self.energy_bin_names[self.min_ebin:self.max_ebin:2])
fig = plt.gcf()
cbar = fig.colorbar(im, ax=ax0)
cbar.set_label('Counts')
ax0.set_title('Count rate spectrogram')
ax0.set_ylabel('Energy range(keV')
ax0.set_xlabel(f"T0 at {self.T0_utc} ")
if 'lc' in plots or 'qllc' in plots:
if not ax1:
_, ax1 = plt.subplots()
self.count_rate_spectrogram = self.spectrogram / self.timedel[:,
None]
if 'qllc' in plots:
ql_ebins=[(4, 10),(10 ,15 ),(15 ,25 ),(25 ,50), (50 ,84)]
labels=('4 - 10 keV','10 - 15 keV','15 - 25 keV','25 - 50 keV', '50 - 84 keV')
ql_sci_ebins=[self.get_energy_range_slicer(s[0],s[1]) for s in ql_ebins]
for ebin_slicer,label in zip(ql_sci_ebins, labels):
ax1.plot(self.time,
np.sum(self.count_rate_spectrogram[:, ebin_slicer], axis=1),label=label)
ax1.set_title(f'Detector summed count rates (L1 request #{self.request_id})')
else:
ax1.plot(
self.time,
self.count_rate_spectrogram[:, self.min_ebin:self.max_ebin])
#correct
ax1.set_ylabel('counts / sec')
#plt.legend(self.energy_bin_names, ncol=4)
ax1.set_yscale('log')
ax1.set_xlabel(f"seconds since {self.T0_utc} ")
plt.legend()
if 'spec' in plots:
if not ax2:
_, ax2 = plt.subplots()
ax2.plot(self.ebins_low, self.spectrum, drawstyle='steps-post')
#ax.set_xticks(self.data[3].data['channel'])
ax2.set_xscale('log')
ax2.set_yscale('log')
ax2.set_xlabel('Energy (keV)')
ax2.set_ylabel('Counts')
if 'tbin' in plots:
if not ax3:
_, ax3 = plt.subplots()
ax3.plot(self.time, self.timedel)
ax3.set_xlabel(f"T0 at {self.T0_utc} ")
ax3.set_ylabel('Integration time (sec)')
plt.suptitle(f'L1 request #{self.request_id}')
#plt.tight_layout()
return ax0, ax1, ax2, ax3
class Spectrogram(ScienceData):
def __init__(self, reqeust_id,fname):
super().__init__(reqeust_id, fname)
self.data_type='Spectrogram'
self.read_fits()
self.spectrum = np.sum(self.counts, axis=0)
def peek(self, ax0=None, ax1=None, ax2=None, ax3=None):
"""
preivew Science data
Arguments:
ax0: matplotlib axe
ax0: matplotlib axe
"""
if not self.hdul:
print(f'Data not loaded. ')
return None
#((ax0, ax1), (ax2, ax3))=axs
if not any([ax0, ax1, ax2, ax3]):
_, ((ax0, ax1), (ax2, ax3)) = plt.subplots(2, 2, figsize=(8, 6))
if ax0:
X, Y = np.meshgrid(self.time,
self.hdul['ENERGIES'].data['channel'])
im = ax0.pcolormesh(X, Y, np.transpose(
self.counts)) #pixel summed energy spectrum
ax0.set_yticks(self.hdul['ENERGIES'].data['channel'][::2])
ax0.set_yticklabels(self.energy_bin_names[::2])
fig = plt.gcf()
cbar = fig.colorbar(im, ax=ax0)
cbar.set_label('Counts')
ax0.set_title('Spectrogram')
ax0.set_ylabel('Energy range(keV')
ax0.set_xlabel(f"Seconds since {self.T0}s ")
if ax1:
#convert to 2d
ax1.plot(self.time, self.count_rates)
ax1.set_yscale('log')
ax1.set_ylabel('Counts / sec')
ax1.set_xlabel(f"Seconds since {self.T0}s ")
if ax2:
ax2.plot(self.ebins_low, self.spectrum, drawstyle='steps-post')
ax2.set_xscale('log')
ax2.set_yscale('log')
ax2.set_xlabel('Energy (keV)')
ax2.set_ylabel('Counts')
if ax3:
ax3.plot(self.time, self.timedel)
ax3.set_xlabel(f"Seconds since {self.T0}s ")
ax3.set_ylabel('Integration time (sec)')
plt.suptitle(f'L4 request #{self.request_id}')
plt.tight_layout()
return fig, ((ax0, ax1), (ax2, ax3))
Classes
class ScienceData (request_id=None, fname=None)
-
Retrieve science data from stix data center or load fits file from local storage
Expand source code
class ScienceData(sio.IO): """ Retrieve science data from stix data center or load fits file from local storage """ def __init__(self, request_id=None, fname=None): self.fname = fname self.data_type=None if not fname: raise Exception("FITS filename not specified") self.request_id = request_id self.time_shift_applied=0 self.hdul = fits.open(fname) self.energies=[] #self.read_data() @property def url(self): req_id=self.request_id if not isinstance(self.request_id, list) else self.request_id[0] link=f'{net.HOST}/view/list/bsd/uid/{req_id}' return f'<a href="{link}">{link}</a>' def read_fits(self, light_time_correction=True): """ Read data L1 compression level FITS files Parameters --------------------- light_time_correction: boolean Correct light time difference """ self.data = self.hdul['DATA'].data self.T0_utc = self.hdul['PRIMARY'].header['DATE_BEG'] self.counts= self.data['counts'] self.light_time_del= self.hdul['PRIMARY'].header['EAR_TDEL'] self.light_time_corrected=light_time_correction self.T0_unix = sdt.utc2unix(self.T0_utc) self.triggers = self.data['triggers'] self.rcr = self.data['rcr'] self.timedel = self.data['timedel'] self.time = self.data['time'] if self.is_time_bin_shifted(self.T0_unix): self.timedel = self.timedel[:-1] self.time = self.time[1:] logger.info('Shifted time bins have been corrected automatically!') if self.data_type=='ScienceL1': self.counts= self.counts[1:, :, :, :] self.triggers = self.triggers[1:, :] self.rcr = self.rcr[1:] elif self.data_type=='Spectrogram': self.counts= self.counts[1:, :] self.triggers = self.triggers[1:] #self.rcr = self.rcr[1:] self.request_id = self.hdul['CONTROL'].data['request_id'] self.time_shift_applied=0 if light_time_correction else self.light_time_del self.datetime = [ sdt.unix2datetime(self.T0_unix + x + y * 0.5 + self.time_shift_applied) for x, y in zip(self.time, self.timedel) ] self.duration = self.time[-1] - self.time[0] + (self.timedel[0] + self.timedel[-1]) / 2 self.energies = self.hdul['ENERGIES'].data self.energy_bin_names = [ f'{a} - {b}' for a, b in zip(self.energies['e_low'], self.energies['e_high']) ] self.energy_bin_mask = self.hdul["CONTROL"].data["energy_bin_mask"] ebin_nz_idx =self.energy_bin_mask.nonzero() self.max_ebin = np.max(ebin_nz_idx) #indices of the non-zero elements self.min_ebin = np.min(ebin_nz_idx) self.ebins_mid = [ (a + b) / 2. for a, b in zip(self.energies['e_low'], self.energies['e_high']) ] self.ebins_low, self.ebins_high = self.energies[ 'e_low'], self.energies['e_high'] if self.data_type=='ScienceL1': self.pixel_counts=self.counts self.pixel_count_rates= self.pixel_counts/self.timedel[:,None,None, None] self.trigger_rates = self.triggers / self.timedel[:, None] elif self.data_type=='Spectrogram': self.count_rates= self.counts/self.timedel[:,None] self.trigger_rates = self.triggers / self.timedel def is_time_bin_shifted(self, unix_time): """ Time bins are shifted in the data collected before 2021-12-09 due a bug in the flight software Check if time bin is shifted in L1 data Parameters unix_time: float Returns is_shifted: bool True if time bin is shifted else False """ return (unix_time < sdt.utc2unix('2021-12-09T14:00:00')) @classmethod def from_sdc(cls, request_id): ''' download science data file from stix data center Parameters ------ request_id : int bulk science data request unique ID; Unique IDs can be found on the science data web page at stix data center Returns ------ science data class object ''' request_id = request_id fname = freq.fetch_bulk_science_by_request_id(request_id) return cls(request_id, fname) @classmethod def from_fits(cls, filename): """ factory class Arguments filename: str FITS filename """ request_id = None return cls(request_id, filename) def get_energy_range_slicer(self, elow, ehigh): sel=[] i=0 for a, b in zip(self.energies['e_low'], self.energies['e_high']): if a>=elow and b<=ehigh: sel.append(i) i+=1 return slice(min(sel),max(sel)) def rebin(self, ebins, min_tbin=0): """ Energy rebin and time rebin Arguments: ebins: list or numpy array energy bin range in units of keV min_tbin: float minimum time bin, shorter time bins are merged Returns: an object containing rebinned light curves """ pass def save(self, filename=None): ''' Save data to a fits file Parameters filename : output fits filename Returns filename if success or error message ''' if not isinstance(self.hdul, fits.hdu.hdulist.HDUList): logger.error('The data object is a not a fits hdu object!') return None try: if filename is None: basename = self.hdul['PRIMARY'].header['FILENAME'] filename = PurePath(net.DOWNLOAD_PATH, basename) self.hdul.writeto(filename) return filename except Exception as e: logger.error(e) def __getattr__(self, name): if name == 'data': return self.hdul elif name == 'type': return self.hdul.get('data_type', 'INVALID_TYPE') elif name == 'filename': return self.fname def get_data(self): return self.hdul
Ancestors
Subclasses
Static methods
def from_fits(filename)
-
factory class Arguments filename: str FITS filename
Expand source code
@classmethod def from_fits(cls, filename): """ factory class Arguments filename: str FITS filename """ request_id = None return cls(request_id, filename)
def from_sdc(request_id)
-
download science data file from stix data center Parameters
request_id
:int
- bulk science data request unique ID; Unique IDs can be found on the science data web page at stix data center
Returns
science data class object
Expand source code
@classmethod def from_sdc(cls, request_id): ''' download science data file from stix data center Parameters ------ request_id : int bulk science data request unique ID; Unique IDs can be found on the science data web page at stix data center Returns ------ science data class object ''' request_id = request_id fname = freq.fetch_bulk_science_by_request_id(request_id) return cls(request_id, fname)
Instance variables
var url
-
Expand source code
@property def url(self): req_id=self.request_id if not isinstance(self.request_id, list) else self.request_id[0] link=f'{net.HOST}/view/list/bsd/uid/{req_id}' return f'<a href="{link}">{link}</a>'
Methods
def get_data(self)
-
Expand source code
def get_data(self): return self.hdul
def get_energy_range_slicer(self, elow, ehigh)
-
Expand source code
def get_energy_range_slicer(self, elow, ehigh): sel=[] i=0 for a, b in zip(self.energies['e_low'], self.energies['e_high']): if a>=elow and b<=ehigh: sel.append(i) i+=1 return slice(min(sel),max(sel))
def is_time_bin_shifted(self, unix_time)
-
Time bins are shifted in the data collected before 2021-12-09 due a bug in the flight software
Check if time bin is shifted in L1 data Parameters unix_time: float Returns is_shifted: bool True if time bin is shifted else False
Expand source code
def is_time_bin_shifted(self, unix_time): """ Time bins are shifted in the data collected before 2021-12-09 due a bug in the flight software Check if time bin is shifted in L1 data Parameters unix_time: float Returns is_shifted: bool True if time bin is shifted else False """ return (unix_time < sdt.utc2unix('2021-12-09T14:00:00'))
def read_fits(self, light_time_correction=True)
-
Read data L1 compression level FITS files Parameters
light_time_correction
:boolean
- Correct light time difference
Expand source code
def read_fits(self, light_time_correction=True): """ Read data L1 compression level FITS files Parameters --------------------- light_time_correction: boolean Correct light time difference """ self.data = self.hdul['DATA'].data self.T0_utc = self.hdul['PRIMARY'].header['DATE_BEG'] self.counts= self.data['counts'] self.light_time_del= self.hdul['PRIMARY'].header['EAR_TDEL'] self.light_time_corrected=light_time_correction self.T0_unix = sdt.utc2unix(self.T0_utc) self.triggers = self.data['triggers'] self.rcr = self.data['rcr'] self.timedel = self.data['timedel'] self.time = self.data['time'] if self.is_time_bin_shifted(self.T0_unix): self.timedel = self.timedel[:-1] self.time = self.time[1:] logger.info('Shifted time bins have been corrected automatically!') if self.data_type=='ScienceL1': self.counts= self.counts[1:, :, :, :] self.triggers = self.triggers[1:, :] self.rcr = self.rcr[1:] elif self.data_type=='Spectrogram': self.counts= self.counts[1:, :] self.triggers = self.triggers[1:] #self.rcr = self.rcr[1:] self.request_id = self.hdul['CONTROL'].data['request_id'] self.time_shift_applied=0 if light_time_correction else self.light_time_del self.datetime = [ sdt.unix2datetime(self.T0_unix + x + y * 0.5 + self.time_shift_applied) for x, y in zip(self.time, self.timedel) ] self.duration = self.time[-1] - self.time[0] + (self.timedel[0] + self.timedel[-1]) / 2 self.energies = self.hdul['ENERGIES'].data self.energy_bin_names = [ f'{a} - {b}' for a, b in zip(self.energies['e_low'], self.energies['e_high']) ] self.energy_bin_mask = self.hdul["CONTROL"].data["energy_bin_mask"] ebin_nz_idx =self.energy_bin_mask.nonzero() self.max_ebin = np.max(ebin_nz_idx) #indices of the non-zero elements self.min_ebin = np.min(ebin_nz_idx) self.ebins_mid = [ (a + b) / 2. for a, b in zip(self.energies['e_low'], self.energies['e_high']) ] self.ebins_low, self.ebins_high = self.energies[ 'e_low'], self.energies['e_high'] if self.data_type=='ScienceL1': self.pixel_counts=self.counts self.pixel_count_rates= self.pixel_counts/self.timedel[:,None,None, None] self.trigger_rates = self.triggers / self.timedel[:, None] elif self.data_type=='Spectrogram': self.count_rates= self.counts/self.timedel[:,None] self.trigger_rates = self.triggers / self.timedel
def rebin(self, ebins, min_tbin=0)
-
Energy rebin and time rebin Arguments: ebins: list or numpy array energy bin range in units of keV min_tbin: float minimum time bin, shorter time bins are merged
Returns
an object containing rebinned light curves
Expand source code
def rebin(self, ebins, min_tbin=0): """ Energy rebin and time rebin Arguments: ebins: list or numpy array energy bin range in units of keV min_tbin: float minimum time bin, shorter time bins are merged Returns: an object containing rebinned light curves """ pass
def save(self, filename=None)
-
Save data to a fits file Parameters filename : output fits filename Returns filename if success or error message
Expand source code
def save(self, filename=None): ''' Save data to a fits file Parameters filename : output fits filename Returns filename if success or error message ''' if not isinstance(self.hdul, fits.hdu.hdulist.HDUList): logger.error('The data object is a not a fits hdu object!') return None try: if filename is None: basename = self.hdul['PRIMARY'].header['FILENAME'] filename = PurePath(net.DOWNLOAD_PATH, basename) self.hdul.writeto(filename) return filename except Exception as e: logger.error(e)
Inherited members
class ScienceL1 (reqeust_id, fname)
-
Tools to analyze L1 science data
Expand source code
class ScienceL1(ScienceData): """ Tools to analyze L1 science data """ def __init__(self, reqeust_id,fname): print(reqeust_id, fname) super().__init__(reqeust_id, fname) self.data_type='ScienceL1' self.pixel_count_rates=None self.correct_pixel_count_rates=None self.read_fits() self.make_spectra() def make_spectra(self, pixel_counts=None): if pixel_counts is None: pixel_counts=self.pixel_counts self.spectrogram = np.sum(pixel_counts, axis=(1, 2)) self.count_rate_spectrogram = self.spectrogram / self.timedel[:, np. newaxis] self.spectrum = np.sum(pixel_counts, axis=(0, 1, 2)) self.mean_pixel_rate_spectra = np.sum(self.pixel_counts, axis=0) / self.duration self.mean_pixel_rate_spectra_err = np.sqrt( self.mean_pixel_rate_spectra) / np.sqrt(self.duration) #sum over all time bins and then divide them by the duration, counts per second def solve_cfl(self, start_utc, end_utc, elow=0, eup=31, ax=None): """calculate flare location using the online flare location solver. Args: start_utc: str ROI start time end_utc: str ROI end time elow: int ROI lower energy limit (science channel). eup: int ROI upper energy limit (science channel). Returns: cfl_loc: dict containing coarse flare location as well as ephemeris and chisquare map """ pass def correct_live_time(self, clone=False): """ Live time correction Returns: scienceL1 object """ trig_tau = FPGA_TAU+ASIC_TAU time_bins = self.time_bins[:, None] photons_in = self.triggers/(time_bins-trig_tau*self.triggers) #photon rate calculated using triggers cm= np.zeros((time_bins.size, 32)) time_bins = time_bins[:, :, None, None] count_rates = self.pixel_counts/time_bins for det in range(32): trig_idx=inst.detector_id_to_trigger_index(det) cm[:,det]= 1 + self.trigger_rates[:,trig_idx]*FPGA_TAU #live time per second self.correct_pixel_count_rates=count_rates*cm[:, :, None, None]/np.exp(-count_rates*ASIC_TAU) return self def peek(self, plots=['spg', 'lc', 'spec', 'tbin', 'qllc'], ax0=None, ax1=None, ax2=None, ax3=None): """ Create quick-look plots for the loaded science data """ if not self.hdul: logger.logger(f'Data not loaded. ') return None if isinstance(plots, str) and plots: plots = plots.split(',') if not self.data_loaded: self.read_fits() self.data_loaded = True if 'spg' in plots: if not ax0: _, ax0 = plt.subplots() X, Y = np.meshgrid(self.time, np.arange(self.min_ebin, self.max_ebin)) im = ax0.pcolormesh( X, Y, np.transpose( self.count_rate_spectrogram[:, self.min_ebin:self.max_ebin] )) #pixel summed energy spectrum ax0.set_yticks( self.energies['channel'][self.min_ebin:self.max_ebin:2]) ax0.set_yticklabels( self.energy_bin_names[self.min_ebin:self.max_ebin:2]) fig = plt.gcf() cbar = fig.colorbar(im, ax=ax0) cbar.set_label('Counts') ax0.set_title('Count rate spectrogram') ax0.set_ylabel('Energy range(keV') ax0.set_xlabel(f"T0 at {self.T0_utc} ") if 'lc' in plots or 'qllc' in plots: if not ax1: _, ax1 = plt.subplots() self.count_rate_spectrogram = self.spectrogram / self.timedel[:, None] if 'qllc' in plots: ql_ebins=[(4, 10),(10 ,15 ),(15 ,25 ),(25 ,50), (50 ,84)] labels=('4 - 10 keV','10 - 15 keV','15 - 25 keV','25 - 50 keV', '50 - 84 keV') ql_sci_ebins=[self.get_energy_range_slicer(s[0],s[1]) for s in ql_ebins] for ebin_slicer,label in zip(ql_sci_ebins, labels): ax1.plot(self.time, np.sum(self.count_rate_spectrogram[:, ebin_slicer], axis=1),label=label) ax1.set_title(f'Detector summed count rates (L1 request #{self.request_id})') else: ax1.plot( self.time, self.count_rate_spectrogram[:, self.min_ebin:self.max_ebin]) #correct ax1.set_ylabel('counts / sec') #plt.legend(self.energy_bin_names, ncol=4) ax1.set_yscale('log') ax1.set_xlabel(f"seconds since {self.T0_utc} ") plt.legend() if 'spec' in plots: if not ax2: _, ax2 = plt.subplots() ax2.plot(self.ebins_low, self.spectrum, drawstyle='steps-post') #ax.set_xticks(self.data[3].data['channel']) ax2.set_xscale('log') ax2.set_yscale('log') ax2.set_xlabel('Energy (keV)') ax2.set_ylabel('Counts') if 'tbin' in plots: if not ax3: _, ax3 = plt.subplots() ax3.plot(self.time, self.timedel) ax3.set_xlabel(f"T0 at {self.T0_utc} ") ax3.set_ylabel('Integration time (sec)') plt.suptitle(f'L1 request #{self.request_id}') #plt.tight_layout() return ax0, ax1, ax2, ax3
Ancestors
Methods
def correct_live_time(self, clone=False)
-
Live time correction
Returns
scienceL1 object
Expand source code
def correct_live_time(self, clone=False): """ Live time correction Returns: scienceL1 object """ trig_tau = FPGA_TAU+ASIC_TAU time_bins = self.time_bins[:, None] photons_in = self.triggers/(time_bins-trig_tau*self.triggers) #photon rate calculated using triggers cm= np.zeros((time_bins.size, 32)) time_bins = time_bins[:, :, None, None] count_rates = self.pixel_counts/time_bins for det in range(32): trig_idx=inst.detector_id_to_trigger_index(det) cm[:,det]= 1 + self.trigger_rates[:,trig_idx]*FPGA_TAU #live time per second self.correct_pixel_count_rates=count_rates*cm[:, :, None, None]/np.exp(-count_rates*ASIC_TAU) return self
def make_spectra(self, pixel_counts=None)
-
Expand source code
def make_spectra(self, pixel_counts=None): if pixel_counts is None: pixel_counts=self.pixel_counts self.spectrogram = np.sum(pixel_counts, axis=(1, 2)) self.count_rate_spectrogram = self.spectrogram / self.timedel[:, np. newaxis] self.spectrum = np.sum(pixel_counts, axis=(0, 1, 2)) self.mean_pixel_rate_spectra = np.sum(self.pixel_counts, axis=0) / self.duration self.mean_pixel_rate_spectra_err = np.sqrt( self.mean_pixel_rate_spectra) / np.sqrt(self.duration) #sum over all time bins and then divide them by the duration, counts per second
def peek(self, plots=['spg', 'lc', 'spec', 'tbin', 'qllc'], ax0=None, ax1=None, ax2=None, ax3=None)
-
Create quick-look plots for the loaded science data
Expand source code
def peek(self, plots=['spg', 'lc', 'spec', 'tbin', 'qllc'], ax0=None, ax1=None, ax2=None, ax3=None): """ Create quick-look plots for the loaded science data """ if not self.hdul: logger.logger(f'Data not loaded. ') return None if isinstance(plots, str) and plots: plots = plots.split(',') if not self.data_loaded: self.read_fits() self.data_loaded = True if 'spg' in plots: if not ax0: _, ax0 = plt.subplots() X, Y = np.meshgrid(self.time, np.arange(self.min_ebin, self.max_ebin)) im = ax0.pcolormesh( X, Y, np.transpose( self.count_rate_spectrogram[:, self.min_ebin:self.max_ebin] )) #pixel summed energy spectrum ax0.set_yticks( self.energies['channel'][self.min_ebin:self.max_ebin:2]) ax0.set_yticklabels( self.energy_bin_names[self.min_ebin:self.max_ebin:2]) fig = plt.gcf() cbar = fig.colorbar(im, ax=ax0) cbar.set_label('Counts') ax0.set_title('Count rate spectrogram') ax0.set_ylabel('Energy range(keV') ax0.set_xlabel(f"T0 at {self.T0_utc} ") if 'lc' in plots or 'qllc' in plots: if not ax1: _, ax1 = plt.subplots() self.count_rate_spectrogram = self.spectrogram / self.timedel[:, None] if 'qllc' in plots: ql_ebins=[(4, 10),(10 ,15 ),(15 ,25 ),(25 ,50), (50 ,84)] labels=('4 - 10 keV','10 - 15 keV','15 - 25 keV','25 - 50 keV', '50 - 84 keV') ql_sci_ebins=[self.get_energy_range_slicer(s[0],s[1]) for s in ql_ebins] for ebin_slicer,label in zip(ql_sci_ebins, labels): ax1.plot(self.time, np.sum(self.count_rate_spectrogram[:, ebin_slicer], axis=1),label=label) ax1.set_title(f'Detector summed count rates (L1 request #{self.request_id})') else: ax1.plot( self.time, self.count_rate_spectrogram[:, self.min_ebin:self.max_ebin]) #correct ax1.set_ylabel('counts / sec') #plt.legend(self.energy_bin_names, ncol=4) ax1.set_yscale('log') ax1.set_xlabel(f"seconds since {self.T0_utc} ") plt.legend() if 'spec' in plots: if not ax2: _, ax2 = plt.subplots() ax2.plot(self.ebins_low, self.spectrum, drawstyle='steps-post') #ax.set_xticks(self.data[3].data['channel']) ax2.set_xscale('log') ax2.set_yscale('log') ax2.set_xlabel('Energy (keV)') ax2.set_ylabel('Counts') if 'tbin' in plots: if not ax3: _, ax3 = plt.subplots() ax3.plot(self.time, self.timedel) ax3.set_xlabel(f"T0 at {self.T0_utc} ") ax3.set_ylabel('Integration time (sec)') plt.suptitle(f'L1 request #{self.request_id}') #plt.tight_layout() return ax0, ax1, ax2, ax3
def solve_cfl(self, start_utc, end_utc, elow=0, eup=31, ax=None)
-
calculate flare location using the online flare location solver.
Args
start_utc
- str ROI start time
end_utc
- str ROI end time
elow
- int ROI lower energy limit (science channel).
eup
- int ROI upper energy limit (science channel).
Returns
cfl_loc
- dict containing coarse flare location as well as ephemeris and chisquare map
Expand source code
def solve_cfl(self, start_utc, end_utc, elow=0, eup=31, ax=None): """calculate flare location using the online flare location solver. Args: start_utc: str ROI start time end_utc: str ROI end time elow: int ROI lower energy limit (science channel). eup: int ROI upper energy limit (science channel). Returns: cfl_loc: dict containing coarse flare location as well as ephemeris and chisquare map """ pass
Inherited members
class Spectrogram (reqeust_id, fname)
-
Retrieve science data from stix data center or load fits file from local storage
Expand source code
class Spectrogram(ScienceData): def __init__(self, reqeust_id,fname): super().__init__(reqeust_id, fname) self.data_type='Spectrogram' self.read_fits() self.spectrum = np.sum(self.counts, axis=0) def peek(self, ax0=None, ax1=None, ax2=None, ax3=None): """ preivew Science data Arguments: ax0: matplotlib axe ax0: matplotlib axe """ if not self.hdul: print(f'Data not loaded. ') return None #((ax0, ax1), (ax2, ax3))=axs if not any([ax0, ax1, ax2, ax3]): _, ((ax0, ax1), (ax2, ax3)) = plt.subplots(2, 2, figsize=(8, 6)) if ax0: X, Y = np.meshgrid(self.time, self.hdul['ENERGIES'].data['channel']) im = ax0.pcolormesh(X, Y, np.transpose( self.counts)) #pixel summed energy spectrum ax0.set_yticks(self.hdul['ENERGIES'].data['channel'][::2]) ax0.set_yticklabels(self.energy_bin_names[::2]) fig = plt.gcf() cbar = fig.colorbar(im, ax=ax0) cbar.set_label('Counts') ax0.set_title('Spectrogram') ax0.set_ylabel('Energy range(keV') ax0.set_xlabel(f"Seconds since {self.T0}s ") if ax1: #convert to 2d ax1.plot(self.time, self.count_rates) ax1.set_yscale('log') ax1.set_ylabel('Counts / sec') ax1.set_xlabel(f"Seconds since {self.T0}s ") if ax2: ax2.plot(self.ebins_low, self.spectrum, drawstyle='steps-post') ax2.set_xscale('log') ax2.set_yscale('log') ax2.set_xlabel('Energy (keV)') ax2.set_ylabel('Counts') if ax3: ax3.plot(self.time, self.timedel) ax3.set_xlabel(f"Seconds since {self.T0}s ") ax3.set_ylabel('Integration time (sec)') plt.suptitle(f'L4 request #{self.request_id}') plt.tight_layout() return fig, ((ax0, ax1), (ax2, ax3))
Ancestors
Methods
def peek(self, ax0=None, ax1=None, ax2=None, ax3=None)
-
preivew Science data Arguments: ax0: matplotlib axe ax0: matplotlib axe
Expand source code
def peek(self, ax0=None, ax1=None, ax2=None, ax3=None): """ preivew Science data Arguments: ax0: matplotlib axe ax0: matplotlib axe """ if not self.hdul: print(f'Data not loaded. ') return None #((ax0, ax1), (ax2, ax3))=axs if not any([ax0, ax1, ax2, ax3]): _, ((ax0, ax1), (ax2, ax3)) = plt.subplots(2, 2, figsize=(8, 6)) if ax0: X, Y = np.meshgrid(self.time, self.hdul['ENERGIES'].data['channel']) im = ax0.pcolormesh(X, Y, np.transpose( self.counts)) #pixel summed energy spectrum ax0.set_yticks(self.hdul['ENERGIES'].data['channel'][::2]) ax0.set_yticklabels(self.energy_bin_names[::2]) fig = plt.gcf() cbar = fig.colorbar(im, ax=ax0) cbar.set_label('Counts') ax0.set_title('Spectrogram') ax0.set_ylabel('Energy range(keV') ax0.set_xlabel(f"Seconds since {self.T0}s ") if ax1: #convert to 2d ax1.plot(self.time, self.count_rates) ax1.set_yscale('log') ax1.set_ylabel('Counts / sec') ax1.set_xlabel(f"Seconds since {self.T0}s ") if ax2: ax2.plot(self.ebins_low, self.spectrum, drawstyle='steps-post') ax2.set_xscale('log') ax2.set_yscale('log') ax2.set_xlabel('Energy (keV)') ax2.set_ylabel('Counts') if ax3: ax3.plot(self.time, self.timedel) ax3.set_xlabel(f"Seconds since {self.T0}s ") ax3.set_ylabel('Integration time (sec)') plt.suptitle(f'L4 request #{self.request_id}') plt.tight_layout() return fig, ((ax0, ax1), (ax2, ax3))
Inherited members