Source code for synphot.models

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Spectrum models not in `astropy.modeling`."""

# STDLIB
import math
import warnings
from copy import deepcopy
from functools import partial

# THIRD-PARTY
import numpy as np

# ASTROPY
from astropy import constants as const
from astropy import units as u
from astropy.modeling import Fittable1DModel, Model, Parameter
from astropy.modeling import models as _models
from astropy.modeling.models import (RickerWavelet1D as _RickerWavelet1D,
                                     Tabular1D)
from astropy.stats.funcs import gaussian_fwhm_to_sigma, gaussian_sigma_to_fwhm
from astropy.utils import metadata
from astropy.utils.exceptions import AstropyUserWarning

# LOCAL
from synphot import units
from synphot.exceptions import SynphotError
from synphot.utils import merge_wavelengths

__all__ = ['BlackBody1D', 'BlackBodyNorm1D', 'Box1D', 'ConstFlux1D',
           'Empirical1D', 'Gaussian1D', 'GaussianAbsorption1D',
           'GaussianFlux1D', 'Lorentz1D', 'MexicanHat1D', 'RickerWavelet1D',
           'PowerLawFlux1D', 'Trapezoid1D', 'get_waveset', 'get_metadata']


class BlackBody1D(Fittable1DModel):
    """Create a :ref:`blackbody spectrum <synphot-planck-law>` model
    with given temperature.

    Parameters
    ----------
    temperature : float
        Blackbody temperature in Kelvin.

    """
    temperature = Parameter(default=5000)

    def __init__(self, *args, **kwargs):
        super(BlackBody1D, self).__init__(*args, **kwargs)
        self.meta['expr'] = 'bb({0})'.format(self.temperature.value)

    @property
    def lambda_max(self):
        """Peak wavelength in Angstrom when the curve is expressed as
        power density."""
        return ((const.b_wien.value / self.temperature) * u.m).to_value(u.AA)

    def bounding_box(self, factor=10.0):
        """Tuple defining the default ``bounding_box`` limits,
        ``(x_low, x_high)``.

        .. math::

            x_{\\mathrm{low}} = 0

            x_{\\mathrm{high}} = \\log(\\lambda_{\\mathrm{max}} \\;
                (1 + \\mathrm{factor}))

        Parameters
        ----------
        factor : float
            Used to calculate ``x_high``.

        """
        w0 = self.lambda_max
        return (w0 * 0, np.log10(w0 + factor * w0))

    def sampleset(self, factor_bbox=10.0, num=1000):
        """Return ``x`` array that samples the feature.

        Parameters
        ----------
        factor_bbox : float
            Factor for ``bounding_box`` calculations.

        num : int
            Number of points to generate.

        """
        w1, w2 = self.bounding_box(factor=factor_bbox)

        if self._n_models == 1:
            w = np.logspace(w1, w2, num)
        else:
            w = list(map(partial(np.logspace, num=num), w1, w2))

        return np.asarray(w)

    @staticmethod
    def evaluate(x, temperature):
        """Evaluate the model.

        Parameters
        ----------
        x : number or ndarray
            Wavelengths in Angstrom.

        temperature : number
            Temperature in Kelvin.

        Returns
        -------
        y : number or ndarray
            Blackbody radiation in PHOTLAM per steradian.

        """
        from synphot.blackbody import blackbody_nu

        # Silence Numpy
        old_np_err_cfg = np.seterr(all='ignore')

        wave = np.ascontiguousarray(x) * u.AA
        bbnu_flux = blackbody_nu(wave, temperature)
        bbflux = (bbnu_flux * u.sr).to(
            units.PHOTLAM, u.spectral_density(wave)) / u.sr  # PHOTLAM/sr

        # Restore Numpy settings
        np.seterr(**old_np_err_cfg)

        return bbflux.value

    def integrate(self, *args):
        with u.add_enabled_equivalencies(u.temperature()):
            t = u.Quantity(self.temperature, u.K)
        return (const.sigma_sb * t ** 4 / math.pi)  # per steradian


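# A minimal usage sketch for `BlackBody1D`. The helper name below is
# hypothetical (not part of synphot's API); it only exercises the documented
# interface: instantiate with a temperature in Kelvin, then evaluate or
# sample in Angstrom.
def _example_blackbody1d():  # pragma: no cover
    bb = BlackBody1D(temperature=5000)  # 5000 K blackbody
    peak = bb.lambda_max                # peak wavelength in Angstrom
    flux = bb(peak)                     # radiation in PHOTLAM/sr at the peak
    waves = bb.sampleset()              # log-spaced wavelengths over the curve
    return peak, flux, waves

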
class BlackBodyNorm1D(BlackBody1D):
    """Create a normalized :ref:`blackbody spectrum <synphot-planck-law>`
    with given temperature.

    It is normalized by multiplying `BlackBody1D` result with a solid angle,
    :math:`\\Omega`, as defined below, where :math:`d` is 1 kpc:

    .. math::

        \\Omega = \\frac{\\pi R_{\\mathrm{Sun}}^{2}}{d^{2}}

    Parameters
    ----------
    temperature : float
        Blackbody temperature in Kelvin.

    """
    def __init__(self, *args, **kwargs):
        super(BlackBodyNorm1D, self).__init__(*args, **kwargs)
        self._omega = np.pi * (const.R_sun / const.kpc).value ** 2  # steradian

    def evaluate(self, x, temperature):
        """Evaluate the model.

        Parameters
        ----------
        x : number or ndarray
            Wavelengths in Angstrom.

        temperature : number
            Temperature in Kelvin.

        Returns
        -------
        y : number or ndarray
            Blackbody radiation in PHOTLAM.

        """
        bbflux = super(BlackBodyNorm1D, self).evaluate(x, temperature)
        return bbflux * self._omega

    def integrate(self, *args):
        return super().integrate(*args) * self._omega


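# A sketch of how `BlackBodyNorm1D` relates to `BlackBody1D` (hypothetical
# helper, for illustration only): the normalized model is the unnormalized
# one scaled by the solid angle of a Sun-sized star at 1 kpc.
def _example_blackbodynorm1d():  # pragma: no cover
    bb = BlackBody1D(temperature=5000)
    bb_norm = BlackBodyNorm1D(temperature=5000)
    w = bb.lambda_max
    # bb_norm(w) equals bb(w) * pi * (R_sun / kpc)**2,
    # i.e., PHOTLAM instead of PHOTLAM/sr.
    return bb(w), bb_norm(w)

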
class Box1D(_models.Box1D):
    """Same as `astropy.modeling.functional_models.Box1D`, except with
    ``sampleset`` defined.

    Parameters
    ----------
    step : float
        Distance of first and last points w.r.t. bounding box.
        In default units (nominally Angstrom). Defaults to 0.01.

    """
    def __init__(self, *args, **kwargs):
        if "step" in kwargs:
            self.step = kwargs.pop("step")
        else:
            self.step = 0.01
        super(Box1D, self).__init__(*args, **kwargs)

    @staticmethod
    def _calc_sampleset(w1, w2, step, minimal):
        """Calculate sampleset for each model."""
        if minimal:
            arr = [w1 - step, w1, w2, w2 + step]
        else:
            arr = np.arange(w1 - step, w2 + step + step, step)

        return arr

    def sampleset(self, step=None, minimal=False):
        """Return ``x`` array that samples the feature.

        Parameters
        ----------
        step : float, optional
            Distance of first and last points w.r.t. bounding box.
            If `None`, set to attribute ``step``.

        minimal : bool
            Only return the minimal points needed to define the box;
            i.e., box edges and a point outside on each side.

        """
        if step is None:
            step = self.step

        w1, w2 = tuple(self.bounding_box.bounding_box())

        if self._n_models == 1:
            w = self._calc_sampleset(w1, w2, step, minimal)
        else:
            w = list(map(partial(
                self._calc_sampleset, step=step, minimal=minimal), w1, w2))

        return np.asarray(w)

    def integrate(self, *args):
        # TODO: Remove unit hardcoding when we use model with units natively.
        with u.add_enabled_equivalencies(u.spectral()):
            w = u.Quantity(self.width, u.AA)
        return self.amplitude * w


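# Usage sketch for `Box1D` (hypothetical helper): a box of amplitude 1
# spanning 100 Angstrom around 5000 Angstrom, sampled at the default step.
def _example_box1d():  # pragma: no cover
    box = Box1D(amplitude=1, x_0=5000, width=100)
    waves = box.sampleset()               # dense sampling across the box
    edges = box.sampleset(minimal=True)   # edges plus one point beyond each
    return waves, edges, box(waves)

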
class ConstFlux1D(_models.Const1D):
    """One dimensional constant flux model.

    Flux that is constant in a given unit might not be constant in
    another unit. During evaluation, flux is always converted to PHOTLAM.

    For multiple ``n_models``, this model only accepts amplitudes of the
    same flux unit; e.g., ``[1, 2]`` or ``Quantity([1, 2], 'photlam')``.

    Parameters
    ----------
    amplitude : number or `~astropy.units.quantity.Quantity`
        Value and unit of the constant function.
        If not Quantity, assume the unit of PHOTLAM.

    """
    def __init__(self, amplitude, **kwargs):
        if not isinstance(amplitude, u.Quantity):
            amplitude = amplitude * units.PHOTLAM

        if amplitude.unit == u.STmag:
            a = units.convert_flux(1, amplitude, units.FLAM)
        elif amplitude.unit == u.ABmag:
            a = units.convert_flux(1, amplitude, units.FNU)
        elif (amplitude.unit.physical_type in
                ('spectral flux density', 'spectral flux density wav',
                 'photon flux density', 'photon flux density wav')):
            a = amplitude
        else:
            raise NotImplementedError(
                '{0} not supported.'.format(amplitude.unit))

        self._flux_unit = a.unit
        super(ConstFlux1D, self).__init__(amplitude=a.value, **kwargs)

    def evaluate(self, x, *args):
        """One dimensional constant flux model function.

        Parameters
        ----------
        x : number or ndarray
            Wavelengths in Angstrom.

        Returns
        -------
        y : number or ndarray
            Flux in PHOTLAM.

        """
        a = (self.amplitude * np.ones_like(x)) * self._flux_unit
        y = units.convert_flux(x, a, units.PHOTLAM)
        return y.value

    def integrate(self, x):
        # TODO: Remove unit hardcoding when we use model with units natively.
        # TODO: We do not handle wav_unit as wave number nor energy for now.
        if any(['wav' in t for t in self._flux_unit.physical_type]):
            wav_unit = u.AA
        else:
            wav_unit = u.Hz
        with u.add_enabled_equivalencies(u.spectral()):
            x = u.Quantity(x, wav_unit)
            amp = u.Quantity(self.amplitude, self._flux_unit)
        return (max(x) - min(x)) * amp


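# Usage sketch for `ConstFlux1D` (hypothetical helper): a flux that is
# constant in ABmag is not constant in PHOTLAM, so the evaluated values
# depend on wavelength once the unit conversion is applied.
def _example_constflux1d():  # pragma: no cover
    flat_photlam = ConstFlux1D(amplitude=1)           # assumes PHOTLAM
    flat_abmag = ConstFlux1D(amplitude=18 * u.ABmag)  # constant in FNU
    waves = np.arange(1000, 11000, 1000)              # Angstrom
    return flat_photlam(waves), flat_abmag(waves)

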
class Empirical1D(Tabular1D):
    """Empirical (sampled) spectrum or bandpass model.

    .. note::

        This model requires `SciPy <https://www.scipy.org>`_ 0.14
        or later to be installed.

    Parameters
    ----------
    keep_neg : bool
        Convert negative ``lookup_table`` values to zeroes?
        This is to be consistent with ASTROLIB PYSYNPHOT.

    kwargs : dict
        Keywords for `~astropy.modeling.tabular.Tabular1D` model
        creation or :func:`~scipy.interpolate.interpn`.
        When ``fill_value=np.nan`` is given, extrapolation is done
        based on the nearest end point on each end; this is the
        default behavior.

    """
    def __init__(self, **kwargs):
        # Manually insert user metadata here to accommodate any warning
        # from self._process_neg_flux()
        meta = kwargs.pop('meta', {})
        self.meta = meta
        if 'warnings' not in self.meta:
            self.meta['warnings'] = {}

        x = kwargs['points']
        y = kwargs['lookup_table']

        # Points can only be ascending for interpn()
        if x[-1] < x[0]:
            x = x[::-1]
            y = y[::-1]
            kwargs['points'] = x

        # Handle negative flux
        keep_neg = kwargs.pop('keep_neg', False)
        self._keep_neg = keep_neg
        y = self._process_neg_flux(x, y)

        kwargs['lookup_table'] = y
        super(Empirical1D, self).__init__(**kwargs)

        # Set non-default interpolation default values.
        # For tapered model, just fill with zero;
        # otherwise, extrapolate like ASTROLIB PYSYNPHOT.
        self.bounds_error = kwargs.get('bounds_error', False)
        if self.is_tapered():
            self.fill_value = kwargs.get('fill_value', 0)
        else:
            self.fill_value = kwargs.get('fill_value', np.nan)

    def _process_neg_flux(self, x, y):
        """Remove negative flux."""
        if self._keep_neg:  # Nothing to do
            return y

        old_y = None

        if np.isscalar(y):  # pragma: no cover
            if y < 0:
                n_neg = 1
                old_x = x
                old_y = y
                y = 0
        else:
            x = np.asarray(x)  # In case input is just pure list
            y = np.asarray(y)
            i = np.where(y < 0)
            n_neg = len(i[0])
            if n_neg > 0:
                old_x = x[i]
                old_y = y[i]
                y[i] = 0

        if old_y is not None:
            warn_str = ('{0} bin(s) contained negative flux or throughput'
                        '; it/they will be set to zero.'.format(n_neg))
            warn_str += '\n  points: {0}\n  lookup_table: {1}'.format(
                old_x, old_y)  # Extra info
            self.meta['warnings'].update({'NegativeFlux': warn_str})
            warnings.warn(warn_str, AstropyUserWarning)

        return y

    def is_tapered(self):
        return np.array_equal(
            self.lookup_table[::self.lookup_table.size - 1], [0, 0])

    def sampleset(self):
        """Return array that samples the feature."""
        return np.squeeze(self.points)

    def evaluate(self, inputs):
        """Evaluate the model.

        Parameters
        ----------
        inputs : number or ndarray
            Wavelengths in same unit as ``points``.

        Returns
        -------
        y : number or ndarray
            Flux or throughput in same unit as ``lookup_table``.

        """
        y = super(Empirical1D, self).evaluate(inputs)

        # Assume NaN at both ends need to be extrapolated based on
        # nearest end point.
        if self.fill_value is np.nan:
            # Cannot use sampleset() due to ExtinctionModel1D.
            x = np.squeeze(self.points)

            # np.squeeze may throw unit away.
            if (isinstance(self.points, tuple) and
                    isinstance(self.points[0], u.Quantity) and
                    not isinstance(x, u.Quantity)):
                x = x * self.points[0].unit

            if np.isscalar(y):  # pragma: no cover
                if inputs < x[0]:
                    y = self.lookup_table[0]
                elif inputs > x[-1]:
                    y = self.lookup_table[-1]
            else:
                y[inputs < x[0]] = self.lookup_table[0]
                y[inputs > x[-1]] = self.lookup_table[-1]

        return self._process_neg_flux(inputs, y)


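# Usage sketch for `Empirical1D` (hypothetical helper): a sampled bandpass
# defined on a handful of points; values outside the tabulated range are
# extrapolated from the nearest end point under the default
# ``fill_value=np.nan`` behavior.
def _example_empirical1d():  # pragma: no cover
    waves = np.array([1000.0, 2000.0, 3000.0, 4000.0])
    thru = np.array([0.1, 0.5, 0.5, 0.1])
    bp = Empirical1D(points=waves, lookup_table=thru)
    # 1500 is interpolated; 4500 extrapolates to the last value, 0.1.
    return bp.sampleset(), bp(np.array([1500.0, 4500.0]))

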
class BaseGaussian1D(_models.Gaussian1D):
    """Same as `astropy.modeling.functional_models.Gaussian1D`, except with
    ``sampleset`` defined.

    """
    _sqrt_2_pi = math.sqrt(2 * math.pi)

    def sampleset(self, factor_step=0.1, **kwargs):
        """Return ``x`` array that samples the feature.

        Parameters
        ----------
        factor_step : float
            Factor for sample step calculation. The step is calculated
            using ``factor_step * self.stddev``.

        kwargs : dict
            Keyword(s) for ``bounding_box`` calculation.
            Default ``factor`` is set to 5 to be compatible with
            ASTROLIB PYSYNPHOT.

        """
        if 'factor' not in kwargs:
            kwargs['factor'] = 5.0

        w1, w2 = self.bounding_box(**kwargs)
        dw = factor_step * self.stddev

        if self._n_models == 1:
            w = np.arange(w1, w2, dw)
        else:
            w = list(map(np.arange, w1, w2, dw))

        return np.asarray(w)


class Gaussian1D(BaseGaussian1D):
    """Same as `astropy.modeling.functional_models.Gaussian1D`, except with
    ``sampleset`` defined.

    """
    def integrate(self, *args):
        # TODO: Remove unit hardcoding when we use model with units natively.
        with u.add_enabled_equivalencies(u.spectral()):
            stddev = u.Quantity(self.stddev, u.AA)
        return self.amplitude * stddev * self._sqrt_2_pi


# TODO: Deprecate this?
# This is not really supported anymore but kept for backward compatibility.
class GaussianAbsorption1D(BaseGaussian1D):
    """Same as ``astropy.modeling.functional_models.GaussianAbsorption1D``,
    except with ``sampleset`` defined.

    """
    @staticmethod
    def evaluate(x, amplitude, mean, stddev):
        """GaussianAbsorption1D model function."""
        return 1.0 - Gaussian1D.evaluate(x, amplitude, mean, stddev)

    @staticmethod
    def fit_deriv(x, amplitude, mean, stddev):
        """GaussianAbsorption1D model function derivatives."""
        import operator
        return list(map(
            operator.neg, Gaussian1D.fit_deriv(x, amplitude, mean, stddev)))


class GaussianFlux1D(Gaussian1D):
    """Same as `Gaussian1D` but accepts extra keywords below.

    Parameters
    ----------
    amplitude : float
        Amplitude of the Gaussian in PHOTLAM. Also see ``total_flux``.

    mean : float
        Mean of the Gaussian in Angstrom.

    stddev : float
        Standard deviation of the Gaussian in Angstrom.
        Also see ``fwhm``.

    fwhm : float
        Full width at half maximum of the Gaussian in Angstrom.
        If given, this overrides ``stddev``.

    total_flux : float
        Total flux under the Gaussian in ``erg/s/cm^2``.
        If given, this overrides ``amplitude``.

    """
    def __init__(self, *args, **kwargs):
        fwhm = kwargs.pop('fwhm', None)
        total_flux = kwargs.pop('total_flux', None)

        super(GaussianFlux1D, self).__init__(*args, **kwargs)

        if fwhm is None:
            fwhm = self.stddev * gaussian_sigma_to_fwhm
        else:
            self.stddev = fwhm * gaussian_fwhm_to_sigma

        gaussian_amp_to_totflux = self._sqrt_2_pi * self.stddev

        if total_flux is None:
            u_str = 'PHOTLAM'
            total_flux = self.amplitude * gaussian_amp_to_totflux
        else:
            u_str = 'FLAM'
            # total_flux is passed in unaltered, any conversion error would
            # happen here.
            tf_unit = u.erg / (u.cm * u.cm * u.s)
            if isinstance(total_flux, u.Quantity):
                total_flux = total_flux.to(tf_unit)
            else:
                total_flux = total_flux * tf_unit
            self.amplitude = (
                total_flux / (gaussian_amp_to_totflux * u.AA)).to_value(
                    units.PHOTLAM, u.spectral_density(self.mean.value * u.AA))
            total_flux = total_flux.value

        self.meta['expr'] = 'em({0:g}, {1:g}, {2:g}, {3})'.format(
            self.mean.value, fwhm, total_flux, u_str)

    def integrate(self, *args):
        # TODO: Remove unit hardcoding when we use model with units natively.
        return super(GaussianFlux1D, self).integrate(*args) * units.PHOTLAM


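# Usage sketch for `GaussianFlux1D` (hypothetical helper): an emission line
# specified by its center, FWHM, and total flux; ``amplitude`` and ``stddev``
# are derived internally from ``total_flux`` and ``fwhm``.
def _example_gaussianflux1d():  # pragma: no cover
    line = GaussianFlux1D(mean=6563, fwhm=10, total_flux=1e-13)  # erg/s/cm^2
    waves = line.sampleset()   # samples across +/- 5 stddev by default
    return waves, line(waves)  # flux in PHOTLAM

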
class Lorentz1D(_models.Lorentz1D):
    """Same as `astropy.modeling.functional_models.Lorentz1D`, except with
    ``sampleset`` defined.

    """
    def sampleset(self, factor_step=0.05, **kwargs):
        """Return ``x`` array that samples the feature.

        Parameters
        ----------
        factor_step : float
            Factor for sample step calculation. The step is calculated
            using ``factor_step * self.fwhm``.

        kwargs : dict
            Keyword(s) for ``bounding_box`` calculation.

        """
        w1, w2 = self.bounding_box(**kwargs)
        dw = factor_step * self.fwhm

        if self._n_models == 1:
            w = np.arange(w1, w2, dw)
        else:
            w = list(map(np.arange, w1, w2, dw))

        return np.asarray(w)

    def integrate(self, x):
        # TODO: Remove unit hardcoding when we use model with units natively.
        with u.add_enabled_equivalencies(u.spectral()):
            x = u.Quantity(x, u.AA)
            x_0 = u.Quantity(self.x_0, u.AA)
            gamma = u.Quantity(self.fwhm, u.AA) * 0.5
        a1 = np.arctan((min(x) - x_0) / gamma)
        a2 = np.arctan((max(x) - x_0) / gamma)
        da = (a2 - a1).to(u.dimensionless_unscaled, u.dimensionless_angles())
        return self.amplitude * gamma * da


class RickerWavelet1D(_RickerWavelet1D):
    """Same as `astropy.modeling.functional_models.RickerWavelet1D`, except
    with ``sampleset`` defined.

    """
    def sampleset(self, factor_step=0.1, **kwargs):
        """Return ``x`` array that samples the feature.

        Parameters
        ----------
        factor_step : float
            Factor for sample step calculation. The step is calculated
            using ``factor_step * self.sigma``.

        kwargs : dict
            Keyword(s) for ``bounding_box`` calculation.

        """
        w1, w2 = self.bounding_box(**kwargs)
        dw = factor_step * self.sigma

        if self._n_models == 1:
            w = np.arange(w1, w2, dw)
        else:
            w = list(map(np.arange, w1, w2, dw))

        return np.asarray(w)

    def integrate(self, x):
        # TODO: Remove unit hardcoding when we use model with units natively.
        with u.add_enabled_equivalencies(u.spectral()):
            x = u.Quantity(x, u.AA)
            x_0 = u.Quantity(self.x_0, u.AA)
            sig = u.Quantity(self.sigma, u.AA)

        # Roots, where y=0
        root_left = x_0 - sig
        root_right = x_0 + sig

        x_min = min(x)
        x_max = max(x)
        if x_min >= root_left or x_max <= root_right:
            raise NotImplementedError(
                'Partial analytic integration not supported')

        sig2 = sig * sig

        def _int_subregion(xx1, xx2):
            dx_min = xx1 - x_0
            dx_max = xx2 - x_0
            a1 = dx_min * np.exp(-0.5 * dx_min * dx_min / sig2)
            a2 = dx_max * np.exp(-0.5 * dx_max * dx_max / sig2)
            return abs(a2 - a1)  # Unsigned area

        return self.amplitude * (_int_subregion(x_min, root_left) +
                                 _int_subregion(root_left, root_right) +
                                 _int_subregion(root_right, x_max))


# TODO: Emit proper deprecation warning.
# https://github.com/spacetelescope/synphot_refactor/issues/249
class MexicanHat1D(RickerWavelet1D):
    """This is the deprecated name for `RickerWavelet1D`."""


class PowerLawFlux1D(_models.PowerLaw1D):
    """One dimensional power law model with proper flux handling.

    For multiple ``n_models``, this model only accepts parameters of the
    same unit; e.g., ``amplitude=[1, 2]`` or
    ``amplitude=Quantity([1, 2], 'photlam')``.

    Also see `~astropy.modeling.powerlaws.PowerLaw1D`.

    Parameters
    ----------
    amplitude : number or `~astropy.units.quantity.Quantity`
        Model amplitude at the reference point.
        If not Quantity, assume the unit of PHOTLAM.

    x_0 : number or `~astropy.units.quantity.Quantity`
        Reference point.
        If not Quantity, assume the unit of Angstrom.

    alpha : float
        Power law index.

    """
    def __init__(self, amplitude, x_0, alpha, **kwargs):
        if not isinstance(amplitude, u.Quantity):
            amplitude = amplitude * units.PHOTLAM

        if (amplitude.unit.physical_type in
                ('spectral flux density', 'spectral flux density wav',
                 'photon flux density', 'photon flux density wav')):
            self._flux_unit = amplitude.unit
        else:
            raise NotImplementedError(
                '{0} not supported.'.format(amplitude.unit))

        if isinstance(x_0, u.Quantity):
            x_0 = x_0.to_value(u.AA, u.spectral())

        super(PowerLawFlux1D, self).__init__(
            amplitude=amplitude.value, x_0=x_0, alpha=alpha, **kwargs)

    def evaluate(self, x, *args):
        """Return flux in PHOTLAM. Assume input wavelength is in Angstrom."""
        xx = x / self.x_0
        y = (self.amplitude * xx ** (-self.alpha)) * self._flux_unit
        flux = units.convert_flux(x, y, units.PHOTLAM)
        return flux.value

    def integrate(self, x):
        # TODO: Remove unit hardcoding when we use model with units natively.
        with u.add_enabled_equivalencies(u.spectral()):
            x = u.Quantity(x, u.AA)
            x_0 = u.Quantity(self.x_0, u.AA)
            amp = u.Quantity(self.amplitude, self._flux_unit)
        fac = 1 - self.alpha
        denom = x_0 ** -self.alpha * fac
        return amp * (max(x) ** fac - min(x) ** fac) / denom


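# Usage sketch for `PowerLawFlux1D` (hypothetical helper): the amplitude is
# the flux at the reference wavelength ``x_0``, and evaluation always
# returns PHOTLAM regardless of the amplitude unit.
def _example_powerlawflux1d():  # pragma: no cover
    pl = PowerLawFlux1D(amplitude=1 * units.FLAM, x_0=5000, alpha=2)
    waves = np.arange(3000, 8000, 500)  # Angstrom
    return pl(waves)                    # FLAM amplitude converted to PHOTLAM

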
class Trapezoid1D(_models.Trapezoid1D):
    """Same as `astropy.modeling.functional_models.Trapezoid1D`, except with
    ``sampleset`` defined.

    """
    def sampleset(self):
        """Return ``x`` array that samples the feature."""
        x1, x4 = tuple(self.bounding_box.bounding_box())
        dw = self.width * 0.5
        x2 = self.x_0 - dw
        x3 = self.x_0 + dw

        if self._n_models == 1:
            w = [x1, x2, x3, x4]
        else:
            w = list(zip(x1, x2, x3, x4))

        return np.asarray(w)

    def integrate(self, *args):
        # TODO: Remove unit hardcoding when we use model with units natively.
        with u.add_enabled_equivalencies(u.spectral()):
            width = u.Quantity(self.width, u.AA)
            slope = u.Quantity(self.slope, 1 / u.AA)
        return self.amplitude * (width + self.amplitude / slope)


# Functions below are for sampleset magic.

def _get_sampleset(model):
    """Return sampleset of a model or `None` if undefined."""
    w = None
    if isinstance(model, Model) and hasattr(model, 'sampleset'):
        w = model.sampleset()
    return w


def _model_tree_evaluate_sampleset(root):
    # Not a CompoundModel, grab sampleset and be done.
    if not hasattr(root, 'op'):
        return _get_sampleset(root)

    model1 = root.left
    model2 = root.right

    # model2 is redshifted, apply the redshift if applicable.
    if isinstance(model1, _models.RedshiftScaleFactor):
        val = _model_tree_evaluate_sampleset(model2)
        if val is None:
            w = val
        else:
            w = model1.inverse(val)

    # This should not ever happen, so ignore the redshift.
    elif isinstance(model2, _models.RedshiftScaleFactor):
        w = _model_tree_evaluate_sampleset(model1)

    # One of the models is scaled. Non-redshift scaling does
    # not affect sampleset of the model.
    elif isinstance(model1, _models.Scale):
        w = _model_tree_evaluate_sampleset(model2)
    elif isinstance(model2, _models.Scale):
        w = _model_tree_evaluate_sampleset(model1)

    # Combine sampleset from both models.
    else:
        w1 = _model_tree_evaluate_sampleset(model1)
        w2 = _model_tree_evaluate_sampleset(model2)
        w = merge_wavelengths(w1, w2)

    return w


def get_waveset(model):
    """Get optimal wavelengths for sampling a given model.

    Parameters
    ----------
    model : `~astropy.modeling.Model`
        Model.

    Returns
    -------
    waveset : array-like or `None`
        Optimal wavelengths. `None` if undefined.

    Raises
    ------
    synphot.exceptions.SynphotError
        Invalid model.

    """
    if not isinstance(model, Model):
        raise SynphotError('{0} is not a model.'.format(model))

    return _model_tree_evaluate_sampleset(model)


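# Usage sketch for `get_waveset` (hypothetical helper): the optimal waveset
# of a compound model merges the samplesets of its components, so a Gaussian
# emission line added to a box keeps the fine sampling around the line.
def _example_get_waveset():  # pragma: no cover
    cont = Box1D(amplitude=1, x_0=5000, width=1000)
    line = GaussianFlux1D(amplitude=1, mean=5000, stddev=10)
    return get_waveset(cont + line)  # merged wavelengths from both models

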
# Functions below are for meta magic.

def _get_meta(model):
    """Return metadata of a model."""
    w = {}
    if isinstance(model, Model):
        w = model.meta
    return w


def _model_tree_evaluate_metadata(root):
    # Not a CompoundModel, grab metadata and be done.
    if not hasattr(root, 'op'):
        return _get_meta(root)

    m1 = _model_tree_evaluate_metadata(root.left)
    m2 = _model_tree_evaluate_metadata(root.right)

    return metadata.merge(m1, m2, metadata_conflicts='silent')


def get_metadata(model):
    """Get metadata for a given model.

    Parameters
    ----------
    model : `~astropy.modeling.Model`
        Model.

    Returns
    -------
    meta : dict
        Metadata for the model.

    Raises
    ------
    synphot.exceptions.SynphotError
        Invalid model.

    """
    if not isinstance(model, Model):
        raise SynphotError('{0} is not a model.'.format(model))

    # Deep copy to make sure modifying the returned metadata
    # does not modify the input model metadata, especially
    # if the input model is not a compound model.
    meta = deepcopy(_model_tree_evaluate_metadata(model))

    return meta


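# Usage sketch for `get_metadata` (hypothetical helper): metadata from the
# components of a compound model is merged, and the result is a deep copy,
# so modifying it does not touch the original models.
def _example_get_metadata():  # pragma: no cover
    bb = BlackBody1D(temperature=5000)  # bb.meta['expr'] is 'bb(5000.0)'
    box = Box1D(amplitude=1, x_0=5000, width=100)
    meta = get_metadata(bb * box)
    meta['expr'] = 'modified'           # does not affect bb.meta
    return meta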