Module stixdcpy.correction

This module provides algorithms to correct detector effects Author: Hualin Xiao (hualin.xiao@fhnw.ch) Date: Sep. 1, 2021

Expand source code
#!/usr/bin/python
"""
    This module provides algorithms to correct detector effects
    Author: Hualin Xiao (hualin.xiao@fhnw.ch)
    Date: Sep. 1, 2021

"""
import numpy as np
from matplotlib import pyplot as plt

from stixdcpy import instrument as inst
from stixdcpy import science
from stixdcpy import time as sdt
from stixdcpy.logger import logger

DETECTOR_GROUPS = [[1, 2], [6, 7], [5, 11], [12, 13], [14, 15], [10, 16],
                   [8, 9], [3, 4], [31, 32], [26, 27], [22, 28], [20, 21],
                   [18, 19], [17, 23], [24, 25], [29, 30]]
DET_SIBLINGS = {
    0: 1,
    1: 0,
    5: 6,
    6: 5,
    4: 10,
    10: 4,
    11: 12,
    12: 11,
    13: 14,
    14: 13,
    9: 15,
    15: 9,
    7: 8,
    8: 7,
    2: 3,
    3: 2,
    30: 31,
    31: 30,
    25: 26,
    26: 25,
    21: 27,
    27: 21,
    19: 20,
    20: 19,
    17: 18,
    18: 17,
    16: 22,
    22: 16,
    23: 24,
    24: 23,
    28: 29,
    29: 28
}

# detector sibling index


class BackgroundSubtraction(object):
    def __init__(self, l1sig: science.ScienceL1, l1bkg: science.ScienceL1):
        """
                   do background subtraction
                Arguments
                l1sig: a L1product instance containing the signal
                l1bkg: a L1Product instance containing the background

                """
        self.l1sig = l1sig
        self.l1bkg = l1bkg

        dmask = self.l1bkg.energy_bin_mask - self.l1sig.energy_bin_mask
        if np.any(dmask < 0):
            logger.error(
                'Background subtraction failed due to the background energy range does not cover the signal energy range  '
            )
            return

        #mean_pixel_rate_clip = self.l1bkg.mean_pixel_rate_spectra * self.l1sig.inverse_energy_bin_mask

        self.pixel_bkg_counts = np.array([
            int_time * self.l1bkg.mean_pixel_rate_spectra
            for int_time in self.l1sig.timedel
        ])
        # set counts beyond the signal energy range to 0
        self.subtracted_counts = (self.l1sig.counts - self.pixel_bkg_counts
                                  ) * self.l1sig.inverse_energy_bin_mask

        # Dead time correction needs to be included in the future
        self.subtracted_counts_err = np.sqrt(
            self.l1sig.counts + np.array([int_time * self.l1bkg.mean_pixel_rate_spectra_err ** 2 for int_time in self.l1sig.timedel])) * \
            self.l1sig.inverse_energy_bin_mask
        self.bkg_subtracted_spectrogram = np.sum(self.subtracted_counts,
                                                 axis=(1, 2))

    def peek(self):
        fig, axs = plt.subplots(2, 2)
        self.l1sig.peek(axs[0, 0])
        self.l1bkg.peek(axs[0, 1])
        X, Y = np.meshgrid(self.l1sig.time,
                           np.arange(self.l1sig.min_ebin, self.l1sig.max_ebin))
        im = axs[1, 0].pcolormesh(
            X, Y,
            np.transpose(
                self.bkg_subtracted_spectrogram[:, self.l1sig.min_ebin:self.
                                                l1sig.max_ebin]))
        axs[1, 0].set_yticks(self.l1sig.energies['channel']
                             [self.l1sig.min_ebin:self.l1sig.max_ebin:2])
        axs[1, 0].set_yticklabels(
            self.l1sig.energy_bin_names[self.l1sig.min_ebin:self.l1sig.
                                        max_ebin:2])
        fig = plt.gcf()
        cbar = fig.colorbar(im, ax=axs[1, 0])
        cbar.set_label('Counts')
        axs[1, 0].set_title('Bkg sub. counts')
        axs[1, 0].set_ylabel('Energy range(keV')
        axs[1, 0].set_xlabel(f"Seconds since {self.l1sig.T0}s ")
        axs[1, 1].plot(np.sum(self.l1sig.spectrogram, axis=0),
                       drawstyle='steps-mid',
                       label='Before subtraction')
        axs[1, 1].plot(np.sum(self.bkg_subtracted_spectrogram, axis=0),
                       drawstyle="steps-mid",
                       label='After subtraction')
        axs[1, 1].plot(np.sum(self.pixel_bkg_counts, axis=(0, 1, 2)),
                       drawstyle="steps-mid",
                       label='background')
        axs[1, 1].legend()

    def get_background_subtracted_spectrum(self, start_utc=None, end_utc=None):
        """
        Get signal background subtracted spectrum

        """
        start_unix = sdt.utc2unix(start_utc)
        end_unix = sdt.utc2unix(end_utc)
        start_time = start_unix - self.l1sig.T0_unix
        end_time = end_unix - self.l1sig.T0_unix
        start_i_tbin = np.argmax(
            self.l1sig.time - 0.5 * self.l1sig.timedel >= start_time) if (
                0 <= start_time <= self.l1sig.duration) else 0

        end_i_tbin = np.argmin(
            self.l1sig.time + 0.5 * self.l1sig.timedel <= end_time) if (
                start_time <= end_time <= self.l1sig.duration) else len(
                    self.l1sig.time)
        time_span = self.l1sig.time[end_i_tbin] - self.l1sig.time[
            start_i_tbin] + 0.5 * self.l1sig.timedel[
                start_i_tbin] + 0.5 * self.l1sig.timedel[end_i_tbin]

        bkg_sub_spectra = np.sum(
            self.subtracted_counts[start_i_tbin:end_i_tbin, :, :, :],
            axis=(0, 1, 2)) / time_span,
        bkg_sub_spectra_err = np.sqrt(
            np.sum(self.subtracted_counts_err[start_i_tbin:end_i_tbin, :, :, :]
                   ** 2,
                   axis=(0, 1, 2))) / time_span
        return bkg_sub_spectra, bkg_sub_spectra_err


class LiveTimeCorrection(object):
    pass
    """
    #counts is np.array   time_bins, detector, pixel, energy bins
    trigger_rates=l1data['triggers'][1:,:]/l1data['timedel'][:-1,None]
    # delta time is off by 1 time bin due a bug in the
    out=np.copy(trigger_rates)
    tau=11e-6
    live_time=1 - tau*trig
    photo_in=trig/(live_time)
        """

    @classmethod
    def L1_live_time_correction(cls, triggers, counts_arr, time_bins):
        """ Live time correction
        Args
            triggers: ndarray
                triggers in the spectrogram
            counts_arr:ndarray
                counts in the spectrogram
            time_bins: ndarray
                time_bins in the spectrogram
        Returns
        live_time_ratio: ndarray
            live time ratio of detectors
        count_rate:
            corrected count rate
        photons_in:
            rate of photons illuminating the detector group

        """

        fpga_tau=10.1e-6
        asic_tau=2.63e-6
        trig_tau = fpga_tau+asic_tau

        time_bins = time_bins[:, None]
        photons_in = triggers/(time_bins-trig_tau*triggers)
        #photon rate calculated using triggers 

        correction_factors= np.zeros((time_bins.size, 32))
        time_bins = time_bins[:, :, None, None]

        time_bins = time_bins[:, :, None, None]
        count_rates = counts_arr/time_bins
        # print(counts_arr.shape)
        for det in range(32):
            trig_idx=inst.detector_id_to_trigger_index[det]
            nin=photons_in[:,trig_idx]
            cor_factor=0.94
            live_ratio[:,det]=np.exp(- cor_factor*nin*asic_tau*1e-6)/(1+ nin*trig_tau)

        return correction_factors, corrected_rates , count_rates,photons_in



class TransmissionCorrection(object):
    pass

Classes

class BackgroundSubtraction (l1sig: ScienceL1, l1bkg: ScienceL1)

do background subtraction Arguments l1sig: a L1product instance containing the signal l1bkg: a L1Product instance containing the background

Expand source code
class BackgroundSubtraction(object):
    def __init__(self, l1sig: science.ScienceL1, l1bkg: science.ScienceL1):
        """
                   do background subtraction
                Arguments
                l1sig: a L1product instance containing the signal
                l1bkg: a L1Product instance containing the background

                """
        self.l1sig = l1sig
        self.l1bkg = l1bkg

        dmask = self.l1bkg.energy_bin_mask - self.l1sig.energy_bin_mask
        if np.any(dmask < 0):
            logger.error(
                'Background subtraction failed due to the background energy range does not cover the signal energy range  '
            )
            return

        #mean_pixel_rate_clip = self.l1bkg.mean_pixel_rate_spectra * self.l1sig.inverse_energy_bin_mask

        self.pixel_bkg_counts = np.array([
            int_time * self.l1bkg.mean_pixel_rate_spectra
            for int_time in self.l1sig.timedel
        ])
        # set counts beyond the signal energy range to 0
        self.subtracted_counts = (self.l1sig.counts - self.pixel_bkg_counts
                                  ) * self.l1sig.inverse_energy_bin_mask

        # Dead time correction needs to be included in the future
        self.subtracted_counts_err = np.sqrt(
            self.l1sig.counts + np.array([int_time * self.l1bkg.mean_pixel_rate_spectra_err ** 2 for int_time in self.l1sig.timedel])) * \
            self.l1sig.inverse_energy_bin_mask
        self.bkg_subtracted_spectrogram = np.sum(self.subtracted_counts,
                                                 axis=(1, 2))

    def peek(self):
        fig, axs = plt.subplots(2, 2)
        self.l1sig.peek(axs[0, 0])
        self.l1bkg.peek(axs[0, 1])
        X, Y = np.meshgrid(self.l1sig.time,
                           np.arange(self.l1sig.min_ebin, self.l1sig.max_ebin))
        im = axs[1, 0].pcolormesh(
            X, Y,
            np.transpose(
                self.bkg_subtracted_spectrogram[:, self.l1sig.min_ebin:self.
                                                l1sig.max_ebin]))
        axs[1, 0].set_yticks(self.l1sig.energies['channel']
                             [self.l1sig.min_ebin:self.l1sig.max_ebin:2])
        axs[1, 0].set_yticklabels(
            self.l1sig.energy_bin_names[self.l1sig.min_ebin:self.l1sig.
                                        max_ebin:2])
        fig = plt.gcf()
        cbar = fig.colorbar(im, ax=axs[1, 0])
        cbar.set_label('Counts')
        axs[1, 0].set_title('Bkg sub. counts')
        axs[1, 0].set_ylabel('Energy range(keV')
        axs[1, 0].set_xlabel(f"Seconds since {self.l1sig.T0}s ")
        axs[1, 1].plot(np.sum(self.l1sig.spectrogram, axis=0),
                       drawstyle='steps-mid',
                       label='Before subtraction')
        axs[1, 1].plot(np.sum(self.bkg_subtracted_spectrogram, axis=0),
                       drawstyle="steps-mid",
                       label='After subtraction')
        axs[1, 1].plot(np.sum(self.pixel_bkg_counts, axis=(0, 1, 2)),
                       drawstyle="steps-mid",
                       label='background')
        axs[1, 1].legend()

    def get_background_subtracted_spectrum(self, start_utc=None, end_utc=None):
        """
        Get signal background subtracted spectrum

        """
        start_unix = sdt.utc2unix(start_utc)
        end_unix = sdt.utc2unix(end_utc)
        start_time = start_unix - self.l1sig.T0_unix
        end_time = end_unix - self.l1sig.T0_unix
        start_i_tbin = np.argmax(
            self.l1sig.time - 0.5 * self.l1sig.timedel >= start_time) if (
                0 <= start_time <= self.l1sig.duration) else 0

        end_i_tbin = np.argmin(
            self.l1sig.time + 0.5 * self.l1sig.timedel <= end_time) if (
                start_time <= end_time <= self.l1sig.duration) else len(
                    self.l1sig.time)
        time_span = self.l1sig.time[end_i_tbin] - self.l1sig.time[
            start_i_tbin] + 0.5 * self.l1sig.timedel[
                start_i_tbin] + 0.5 * self.l1sig.timedel[end_i_tbin]

        bkg_sub_spectra = np.sum(
            self.subtracted_counts[start_i_tbin:end_i_tbin, :, :, :],
            axis=(0, 1, 2)) / time_span,
        bkg_sub_spectra_err = np.sqrt(
            np.sum(self.subtracted_counts_err[start_i_tbin:end_i_tbin, :, :, :]
                   ** 2,
                   axis=(0, 1, 2))) / time_span
        return bkg_sub_spectra, bkg_sub_spectra_err

Methods

def get_background_subtracted_spectrum(self, start_utc=None, end_utc=None)

Get signal background subtracted spectrum

Expand source code
def get_background_subtracted_spectrum(self, start_utc=None, end_utc=None):
    """
    Get signal background subtracted spectrum

    """
    start_unix = sdt.utc2unix(start_utc)
    end_unix = sdt.utc2unix(end_utc)
    start_time = start_unix - self.l1sig.T0_unix
    end_time = end_unix - self.l1sig.T0_unix
    start_i_tbin = np.argmax(
        self.l1sig.time - 0.5 * self.l1sig.timedel >= start_time) if (
            0 <= start_time <= self.l1sig.duration) else 0

    end_i_tbin = np.argmin(
        self.l1sig.time + 0.5 * self.l1sig.timedel <= end_time) if (
            start_time <= end_time <= self.l1sig.duration) else len(
                self.l1sig.time)
    time_span = self.l1sig.time[end_i_tbin] - self.l1sig.time[
        start_i_tbin] + 0.5 * self.l1sig.timedel[
            start_i_tbin] + 0.5 * self.l1sig.timedel[end_i_tbin]

    bkg_sub_spectra = np.sum(
        self.subtracted_counts[start_i_tbin:end_i_tbin, :, :, :],
        axis=(0, 1, 2)) / time_span,
    bkg_sub_spectra_err = np.sqrt(
        np.sum(self.subtracted_counts_err[start_i_tbin:end_i_tbin, :, :, :]
               ** 2,
               axis=(0, 1, 2))) / time_span
    return bkg_sub_spectra, bkg_sub_spectra_err
def peek(self)
Expand source code
def peek(self):
    fig, axs = plt.subplots(2, 2)
    self.l1sig.peek(axs[0, 0])
    self.l1bkg.peek(axs[0, 1])
    X, Y = np.meshgrid(self.l1sig.time,
                       np.arange(self.l1sig.min_ebin, self.l1sig.max_ebin))
    im = axs[1, 0].pcolormesh(
        X, Y,
        np.transpose(
            self.bkg_subtracted_spectrogram[:, self.l1sig.min_ebin:self.
                                            l1sig.max_ebin]))
    axs[1, 0].set_yticks(self.l1sig.energies['channel']
                         [self.l1sig.min_ebin:self.l1sig.max_ebin:2])
    axs[1, 0].set_yticklabels(
        self.l1sig.energy_bin_names[self.l1sig.min_ebin:self.l1sig.
                                    max_ebin:2])
    fig = plt.gcf()
    cbar = fig.colorbar(im, ax=axs[1, 0])
    cbar.set_label('Counts')
    axs[1, 0].set_title('Bkg sub. counts')
    axs[1, 0].set_ylabel('Energy range(keV')
    axs[1, 0].set_xlabel(f"Seconds since {self.l1sig.T0}s ")
    axs[1, 1].plot(np.sum(self.l1sig.spectrogram, axis=0),
                   drawstyle='steps-mid',
                   label='Before subtraction')
    axs[1, 1].plot(np.sum(self.bkg_subtracted_spectrogram, axis=0),
                   drawstyle="steps-mid",
                   label='After subtraction')
    axs[1, 1].plot(np.sum(self.pixel_bkg_counts, axis=(0, 1, 2)),
                   drawstyle="steps-mid",
                   label='background')
    axs[1, 1].legend()
class LiveTimeCorrection
Expand source code
class LiveTimeCorrection(object):
    pass
    """
    #counts is np.array   time_bins, detector, pixel, energy bins
    trigger_rates=l1data['triggers'][1:,:]/l1data['timedel'][:-1,None]
    # delta time is off by 1 time bin due a bug in the
    out=np.copy(trigger_rates)
    tau=11e-6
    live_time=1 - tau*trig
    photo_in=trig/(live_time)
        """

    @classmethod
    def L1_live_time_correction(cls, triggers, counts_arr, time_bins):
        """ Live time correction
        Args
            triggers: ndarray
                triggers in the spectrogram
            counts_arr:ndarray
                counts in the spectrogram
            time_bins: ndarray
                time_bins in the spectrogram
        Returns
        live_time_ratio: ndarray
            live time ratio of detectors
        count_rate:
            corrected count rate
        photons_in:
            rate of photons illuminating the detector group

        """

        fpga_tau=10.1e-6
        asic_tau=2.63e-6
        trig_tau = fpga_tau+asic_tau

        time_bins = time_bins[:, None]
        photons_in = triggers/(time_bins-trig_tau*triggers)
        #photon rate calculated using triggers 

        correction_factors= np.zeros((time_bins.size, 32))
        time_bins = time_bins[:, :, None, None]

        time_bins = time_bins[:, :, None, None]
        count_rates = counts_arr/time_bins
        # print(counts_arr.shape)
        for det in range(32):
            trig_idx=inst.detector_id_to_trigger_index[det]
            nin=photons_in[:,trig_idx]
            cor_factor=0.94
            live_ratio[:,det]=np.exp(- cor_factor*nin*asic_tau*1e-6)/(1+ nin*trig_tau)

        return correction_factors, corrected_rates , count_rates,photons_in

Static methods

def L1_live_time_correction(triggers, counts_arr, time_bins)

Live time correction Args triggers: ndarray triggers in the spectrogram counts_arr:ndarray counts in the spectrogram time_bins: ndarray time_bins in the spectrogram Returns live_time_ratio: ndarray live time ratio of detectors count_rate: corrected count rate photons_in: rate of photons illuminating the detector group

Expand source code
@classmethod
def L1_live_time_correction(cls, triggers, counts_arr, time_bins):
    """ Live time correction
    Args
        triggers: ndarray
            triggers in the spectrogram
        counts_arr:ndarray
            counts in the spectrogram
        time_bins: ndarray
            time_bins in the spectrogram
    Returns
    live_time_ratio: ndarray
        live time ratio of detectors
    count_rate:
        corrected count rate
    photons_in:
        rate of photons illuminating the detector group

    """

    fpga_tau=10.1e-6
    asic_tau=2.63e-6
    trig_tau = fpga_tau+asic_tau

    time_bins = time_bins[:, None]
    photons_in = triggers/(time_bins-trig_tau*triggers)
    #photon rate calculated using triggers 

    correction_factors= np.zeros((time_bins.size, 32))
    time_bins = time_bins[:, :, None, None]

    time_bins = time_bins[:, :, None, None]
    count_rates = counts_arr/time_bins
    # print(counts_arr.shape)
    for det in range(32):
        trig_idx=inst.detector_id_to_trigger_index[det]
        nin=photons_in[:,trig_idx]
        cor_factor=0.94
        live_ratio[:,det]=np.exp(- cor_factor*nin*asic_tau*1e-6)/(1+ nin*trig_tau)

    return correction_factors, corrected_rates , count_rates,photons_in
class TransmissionCorrection
Expand source code
class TransmissionCorrection(object):
    pass