# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""This module defines an observed spectrum, i.e., a source spectrum
that has gone through a bandpass.
import math
import warnings
import numpy as np
from scipy.integrate import trapezoid
from astropy import log
from astropy import units as u
from astropy.utils.exceptions import (AstropyUserWarning,
from synphot import binning, exceptions, units, utils
from synphot.models import Empirical1D
from synphot.spectrum import (BaseSourceSpectrum, SourceSpectrum,
__all__ = ['Observation']
class Observation(BaseSourceSpectrum):
"""This is an observed spectrum, where a source spectrum
has gone through a bandpass.
Usually, this is the end point of a chain of spectral
manipulation. It has extra attributes that deal with binning,
which is introduced by the detector.
spec : `~synphot.spectrum.SourceSpectrum` or `specutils.Spectrum1D`
Source spectrum.
band : `~synphot.spectrum.SpectralElement`
binset : array-like, `~astropy.units.quantity.Quantity`, or `None`
Center of binned wavelengths.
If not a Quantity, assumed to be in Angstrom.
If `None`, input ``self.waveset`` values are used.
force : {`None`, 'none', 'extrap', 'taper'}
Force creation of an observation even when source spectrum
and bandpass do not fully overlap:
* `None` or 'none' - Source must encompass bandpass (default)
* 'extrap' - Extrapolate source spectrum (this changes the
underlying model of ``spec`` to always extrapolate,
if applicable)
* 'taper' - Taper source spectrum
Bandpass does not overlap with source spectrum.
Bandpass only partially overlaps with source spectrum
when they must fully overlap.
Invalid inputs.
Missing binned wavelength set.
def __init__(self, spec, band, binset=None, force='none'):
# Duck-type specutils.Spectrum1D to avoid hard dependency on specutils
if hasattr(spec, 'flux') and hasattr(spec, 'spectral_axis'):
spec = SourceSpectrum.from_spectrum1d(spec)
if not isinstance(spec, SourceSpectrum):
raise exceptions.SynphotError('Invalid source spectrum.')
if not isinstance(band, SpectralElement):
raise exceptions.SynphotError('Invalid bandpass.')
# Inherit input warnings like ASTROLIB PYSYNPHOT
warn = {}
# Validate overlap
if force is None:
force = 'none'
force = force.lower()
stat = band.check_overlap(spec)
if stat == 'none':
raise exceptions.DisjointError(
'Source spectrum and bandpass are disjoint.')
elif 'partial' in stat:
if force == 'none':
raise exceptions.PartialOverlap(
'Source spectrum and bandpass do not fully overlap. '
'You may use force=[extrap|taper] to force this '
'Observation anyway.')
elif force == 'taper':
spec = spec.taper()
msg = 'Source spectrum is tapered.'
warnings.warn(msg, AstropyUserWarning)
warn['PartialOverlap'] = msg
elif force.startswith('extrap'):
if spec.force_extrapolation():
msg = ('Source spectrum will be extrapolated (at constant '
'value for empirical model).')
msg = ('Source spectrum will be evaluated outside '
'pre-defined waveset.')
warnings.warn(msg, AstropyUserWarning)
warn['PartialOverlap'] = msg
raise exceptions.SynphotError(
'force={0} is invalid, must be "none", "taper", '
'or "extrap"'.format(force))
elif stat != 'full': # pragma: no cover
raise exceptions.SynphotError(
'Overlap result of {0} is unexpected'.format(stat))
# Create composite spectrum
super(Observation, self).__init__(spec * band, clean_meta=True)
self._spec = spec
self._band = band
self._force = force
# Merge in other warnings
self.warnings = warn
# Initialize bins
def _init_bins(self, binset):
"""Calculated binned wavelength centers, edges, and flux.
By contrast, the native waveset and flux should be considered
samples of a continuous function.
Thus, it makes sense to interpolate ``self.waveset`` and
``self(self.waveset)``, but not `binset` and `binflux`.
if binset is None:
if self.bandpass.waveset is not None:
self._binset = self.bandpass.waveset
elif self.spectrum.waveset is not None:
self._binset = self.spectrum.waveset
log.info('Bandpass waveset is undefined; '
'Using source spectrum waveset instead.')
raise exceptions.UndefinedBinset(
'Both source spectrum and bandpass have undefined '
'waveset; Provide binset manually.')
self._binset = self._validate_wavelengths(binset)
# binset must be in ascending order for calcbinflux()
# to work properly.
if self._binset[0] > self._binset[-1]:
self._binset = self._binset[::-1]
self._bin_edges = binning.calculate_bin_edges(self._binset)
# Merge bin edges and centers in with the natural waveset
spwave = utils.merge_wavelengths(
self._bin_edges.value, self._binset.value)
if self.waveset is not None:
spwave = utils.merge_wavelengths(spwave, self.waveset.value)
# Throw out invalid wavelengths after merging.
spwave = spwave[spwave > 0]
# Compute indices associated to each endpoint.
indices = np.searchsorted(spwave, self._bin_edges.value)
i_beg = indices[:-1]
i_end = indices[1:]
# Prepare integration variables.
flux = self(spwave)
avflux = (flux.value[1:] + flux.value[:-1]) * 0.5
deltaw = spwave[1:] - spwave[:-1]
# Sum over each bin.
binflux, intwave = binning.calcbinflux(
self._binset.size, i_beg, i_end, avflux, deltaw)
self._binflux = binflux * flux.unit
def spectrum(self):
"""Source spectrum of the observation."""
return self._spec
def bandpass(self):
"""Bandpass of the observation."""
return self._band
def binset(self):
"""Center of binned wavelengths."""
return self._binset
def bin_edges(self):
"""Edges of binned wavelengths."""
return self._bin_edges
def binflux(self):
"""Binned flux corresponding to `binset`."""
return self._binflux
def __mul__(self, other):
"""Multiply self and other."""
sp = self.spectrum * other
obs = self.__class__(
sp, self.bandpass, binset=self.binset, force=self._force)
return obs
def taper(self, **kwargs):
"""Tapering is disabled."""
raise NotImplementedError('Observation cannot be tapered.')
def _validate_binned_wavelengths(self, wave):
if wave is None:
wavelengths = self.binset
wavelengths = self._validate_wavelengths(wave)
return wavelengths
def sample_binned(self, wavelengths=None, flux_unit=None, **kwargs):
"""Sample binned observation without interpolation.
To sample unbinned data, use ``__call__``.
wavelengths : array-like, `~astropy.units.quantity.Quantity`, or `None`
Wavelength values for sampling.
If not a Quantity, assumed to be in Angstrom.
If `None`, `binset` is used.
flux_unit : str or `~astropy.units.Unit` or `None`
Flux is converted to this unit.
If not given, internal unit is used.
kwargs : dict
Keywords acceptable by :func:`~synphot.units.convert_flux`.
flux : `~astropy.units.quantity.Quantity`
Binned flux in given unit.
Interpolation of binned data is not allowed.
x = self._validate_binned_wavelengths(wavelengths)
i = np.searchsorted(self.binset, x)
if not np.allclose(self.binset[i].value, x.value):
raise exceptions.InterpolationNotAllowed(
'Some or all wavelength values are not in binset.')
y = self.binflux[i]
if flux_unit is None:
flux = y
flux = units.convert_flux(x, y, flux_unit, **kwargs)
return flux
def _get_binned_arrays(self, wavelengths, flux_unit, area=None,
"""Get binned observation in user units."""
x = self._validate_binned_wavelengths(wavelengths)
y = self.sample_binned(wavelengths=x, flux_unit=flux_unit, area=area,
if isinstance(wavelengths, u.Quantity):
w = x.to(wavelengths.unit, u.spectral())
w = x
return w, y
def binned_waverange(self, cenwave, npix, **kwargs):
"""Calculate the wavelength range covered by the given number
of pixels centered on the given central wavelengths of
cenwave : float or `~astropy.units.quantity.Quantity`
Desired central wavelength.
If not a Quantity, assumed to be in Angstrom.
npix : int
Desired number of pixels, centered on ``cenwave``.
kwargs : dict
Keywords accepted by :func:`synphot.binning.wave_range`.
waverange : `~astropy.units.quantity.Quantity`
Lower and upper limits of the wavelength range,
in the unit of ``cenwave``.
# Calculation is done in the unit of cenwave.
if not isinstance(cenwave, u.Quantity):
cenwave = cenwave * self._internal_wave_unit
bin_wave = units.validate_quantity(
self.binset, cenwave.unit, equivalencies=u.spectral())
return binning.wave_range(
bin_wave.value, cenwave.value, npix, **kwargs) * cenwave.unit
def binned_pixelrange(self, waverange, **kwargs):
"""Calculate the number of pixels within the given wavelength
range and `binset`.
waverange : tuple of float or `~astropy.units.quantity.Quantity`
Lower and upper limits of the desired wavelength range.
If not a Quantity, assumed to be in Angstrom.
kwargs : dict
Keywords accepted by :func:`synphot.binning.pixel_range`.
npix : int
Number of pixels.
x = units.validate_quantity(
waverange, self._internal_wave_unit, equivalencies=u.spectral())
return binning.pixel_range(self.binset.value, x.value, **kwargs)
def effective_wavelength(self, binned=True, wavelengths=None,
"""Calculate :ref:`effective wavelength <synphot-formula-effwave>`.
binned : bool
Sample data in native wavelengths if `False`.
Else, sample binned data (default).
wavelengths : array-like, `~astropy.units.quantity.Quantity`, or `None`
Wavelength values for sampling.
If not a Quantity, assumed to be in Angstrom.
If `None`, ``self.waveset`` or `binset` is used, depending
on ``binned``.
mode : {'efflerg', 'efflphot'}
Flux is first converted to the unit below before calculation:
* 'efflerg' - FLAM
* 'efflphot' - PHOTLAM (deprecated)
eff_lam : `~astropy.units.quantity.Quantity`
Observation effective wavelength.
Invalid mode.
mode = mode.lower()
if mode == 'efflerg':
flux_unit = units.FLAM
elif mode == 'efflphot':
'Usage of EFFLPHOT is deprecated.', AstropyDeprecationWarning)
flux_unit = units.PHOTLAM
raise exceptions.SynphotError(
'mode must be "efflerg" or "efflphot"')
if binned:
x = self._validate_binned_wavelengths(wavelengths).value
y = self.sample_binned(wavelengths=x, flux_unit=flux_unit).value
x = self._validate_wavelengths(wavelengths).value
y = units.convert_flux(x, self(x), flux_unit).value
num = trapezoid(y * x ** 2, x=x)
den = trapezoid(y * x, x=x)
if den == 0.0: # pragma: no cover
eff_lam = 0.0
eff_lam = abs(num / den)
return eff_lam * self._internal_wave_unit
# https://github.com/spacetelescope/synphot_refactor/issues/159
def effstim(self, flux_unit=None, wavelengths=None, area=None,
"""Calculate :ref:`effective stimulus <synphot-formula-effstim>`
for given flux unit.
flux_unit : str or `~astropy.units.Unit` or `None`
The unit of effective stimulus.
COUNT gives result in count/s (see :meth:`countrate` for more
If not given, internal unit is used.
wavelengths : array-like, `~astropy.units.quantity.Quantity`, or `None`
Wavelength values for sampling. This must be given if
``self.waveset`` is undefined for the underlying spectrum model(s).
If not a Quantity, assumed to be in Angstrom.
If `None`, ``self.waveset`` is used.
area, vegaspec
See :func:`~synphot.units.convert_flux`.
eff_stim : `~astropy.units.quantity.Quantity`
Observation effective stimulus based on given flux unit.
if flux_unit is None:
flux_unit = self._internal_flux_unit
flux_unit = units.validate_unit(flux_unit)
flux_unit_name = flux_unit.to_string()
# Special handling of COUNT/OBMAG.
# This is special case of countrate calculations.
if flux_unit == u.count or flux_unit_name == units.OBMAG.to_string():
val = self.countrate(area, binned=False, wavelengths=wavelengths)
if flux_unit == units.OBMAG:
eff_stim = (-2.5 * np.log10(val.value)) * flux_unit
eff_stim = val
return eff_stim
# Special handling of VEGAMAG.
# This is basically effstim(self)/effstim(Vega)
if flux_unit_name == units.VEGAMAG.to_string():
if not isinstance(vegaspec, SourceSpectrum):
raise exceptions.SynphotError('Vega spectrum is missing.')
num = self.integrate(wavelengths=wavelengths)
den = (vegaspec * self.bandpass).integrate(
return (2.5 * (math.log10(den.value) -
math.log10(num.value))) * units.VEGAMAG
# Sample the bandpass
x_band = self.bandpass._validate_wavelengths(wavelengths).value
y_band = self.bandpass(x_band).value
# Sample the observation in FLAM
inwave = self._validate_wavelengths(wavelengths).value
influx = units.convert_flux(inwave, self(inwave), units.FLAM).value
# Integrate
num = abs(trapezoid(inwave * influx, x=inwave))
den = abs(trapezoid(x_band * y_band, x=x_band))
val = (num / den) * units.FLAM
# Integration should always be done in FLAM and then
# converted to desired units as follows.
if flux_unit.physical_type == 'spectral flux density wav':
if flux_unit == u.STmag:
eff_stim = val.to(flux_unit)
else: # FLAM
eff_stim = val
elif flux_unit.physical_type in (
'spectral flux density', 'photon flux density',
'photon flux density wav'):
w_pivot = self.bandpass.pivot()
eff_stim = units.convert_flux(w_pivot, val, flux_unit)
raise exceptions.SynphotError(
'Flux unit {0} is invalid'.format(flux_unit))
return eff_stim
def countrate(self, area, binned=True, wavelengths=None, waverange=None,
"""Calculate :ref:`effective stimulus <synphot-formula-effstim>`
in count/s.
area : float or `~astropy.units.quantity.Quantity`
Area that flux covers. If not a Quantity, assumed to be in
binned : bool
Sample data in native wavelengths if `False`.
Else, sample binned data (default).
wavelengths : array-like, `~astropy.units.quantity.Quantity`, or `None`
Wavelength values for sampling. This must be given if
``self.waveset`` is undefined for the underlying spectrum model(s).
If not a Quantity, assumed to be in Angstrom.
If `None`, ``self.waveset`` or `binset` is used, depending
on ``binned``.
waverange : tuple of float, Quantity, or `None`
Lower and upper limits of the desired wavelength range.
If not a Quantity, assumed to be in Angstrom.
If `None`, the full range is used.
force : bool
If a wavelength range is given, partial overlap raises
an exception when this is `False` (default). Otherwise,
it returns calculation for the overlapping region.
Disjoint wavelength range raises an exception regardless.
count_rate : `~astropy.units.quantity.Quantity`
Observation effective stimulus in count/s.
Wavelength range does not overlap with observation.
Wavelength range only partially overlaps with observation.
Calculation failed, including but not limited to NaNs in flux.
# Sample the observation
if binned:
x = self._validate_binned_wavelengths(wavelengths).value
y = self.sample_binned(wavelengths=x, flux_unit=u.count,
x = self._validate_wavelengths(wavelengths).value
y = units.convert_flux(x, self(x), u.count,
# Use entire wavelength range
if waverange is None:
influx = y
# Use given wavelength range
w = units.validate_quantity(waverange, self._internal_wave_unit,
stat = utils.overlap_status(w, x)
w1 = w.min()
w2 = w.max()
if stat == 'none':
raise exceptions.DisjointError(
'Observation and wavelength range are disjoint.')
elif 'partial' in stat:
if force:
'Count rate calculated only for wavelengths in the '
'overlap between observation and given range.',
w1 = max(w1, x.min())
w2 = min(w2, x.max())
raise exceptions.PartialOverlap(
'Observation and wavelength range do not fully '
'overlap. You may use force=True to force this '
'calculation anyway.')
elif stat != 'full': # pragma: no cover
raise exceptions.SynphotError(
'Overlap result of {0} is unexpected'.format(stat))
if binned:
if wavelengths is None:
bin_edges = self.bin_edges.value
bin_edges = binning.calculate_bin_edges(x).value
i1 = np.searchsorted(bin_edges, w1) - 1
i2 = np.searchsorted(bin_edges, w2)
influx = y[i1:i2]
mask = ((x >= w1) & (x <= w2))
influx = y[mask]
val = math.fsum(influx)
return val * (u.count / u.s)
def plot(self, binned=True, wavelengths=None, flux_unit=None, area=None,
vegaspec=None, **kwargs): # pragma: no cover
"""Plot the observation.
.. note:: Uses ``matplotlib``.
binned : bool
Plot data in native wavelengths if `False`.
Else, plot binned data (default).
wavelengths : array-like, `~astropy.units.quantity.Quantity`, or `None`
Wavelength values for sampling.
If not a Quantity, assumed to be in Angstrom.
If `None`, ``self.waveset`` or `binset` is used, depending
on ``binned``.
flux_unit : str or `~astropy.units.Unit` or `None`
Flux is converted to this unit for plotting.
If not given, internal unit is used.
area, vegaspec
See :func:`~synphot.units.convert_flux`.
kwargs : dict
See :func:`synphot.spectrum.BaseSpectrum.plot`.
Invalid inputs.
if binned:
w, y = self._get_binned_arrays(wavelengths, flux_unit, area=area,
w, y = self._get_arrays(wavelengths, flux_unit=flux_unit,
area=area, vegaspec=vegaspec)
self._do_plot(w, y, **kwargs)
def as_spectrum(self, binned=True, wavelengths=None):
"""Reduce the observation to an empirical source spectrum.
An observation is a complex object with some restrictions
on its capabilities. At times, it would be useful to work
with the observation as a simple object that is easier to
manipulate and takes up less memory.
This is also useful for writing an observation as sampled
spectrum out to a FITS file.
binned : bool
Write out data in native wavelengths if `False`.
Else, write binned data (default).
wavelengths : array-like, `~astropy.units.quantity.Quantity`, or `None`
Wavelength values for sampling.
If not a Quantity, assumed to be in Angstrom.
If `None`, ``self.waveset`` or `binset` is used, depending
on ``binned``.
sp : `~synphot.spectrum.SourceSpectrum`
Empirical source spectrum.
if binned:
w, y = self._get_binned_arrays(
wavelengths, self._internal_flux_unit)
w, y = self._get_arrays(
wavelengths, flux_unit=self._internal_flux_unit)
header = {'observation': str(self), 'binned': binned}
return SourceSpectrum(Empirical1D, points=w, lookup_table=y,
meta={'header': header})