Source code for synphot.models

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Spectrum models not in `astropy.modeling`."""
from __future__ import absolute_import, division, print_function
from .extern.six.moves import map, zip

import warnings
from collections import defaultdict
from copy import deepcopy
from functools import partial

import numpy as np

from astropy import constants as const
from astropy import units as u
from astropy.modeling import Fittable1DModel, Model, Parameter
from astropy.modeling import models as _models
from astropy.modeling.core import _CompoundModel
from astropy.modeling.models import Tabular1D
from astropy.stats.funcs import gaussian_fwhm_to_sigma, gaussian_sigma_to_fwhm
from astropy.utils import metadata
from astropy.utils.exceptions import AstropyUserWarning

from . import units
from .compat import ASTROPY_LT_2_0
from .exceptions import SynphotError
from .utils import merge_wavelengths

__all__ = ['BlackBody1D', 'BlackBodyNorm1D', 'Box1D', 'ConstFlux1D',
           'Empirical1D', 'Gaussian1D', 'GaussianAbsorption1D',
           'GaussianFlux1D', 'Lorentz1D', 'MexicanHat1D', 'PowerLawFlux1D',
           'Trapezoid1D', 'get_waveset', 'get_metadata']

[docs]class BlackBody1D(Fittable1DModel): """Create a :ref:`blackbody spectrum <synphot-planck-law>` model with given temperature. Parameters ---------- temperature : float Blackbody temperature in Kelvin. """ temperature = Parameter(default=5000) def __init__(self, *args, **kwargs): super(BlackBody1D, self).__init__(*args, **kwargs) self.meta['expr'] = 'bb({0})'.format(self.temperature.value) @property def lambda_max(self): """Peak wavelength in Angstrom when the curve is expressed as power density.""" return ((const.b_wien.value / self.temperature) * u.m).to(u.AA).value def bounding_box(self, factor=10.0): """Tuple defining the default ``bounding_box`` limits, ``(x_low, x_high)``. .. math:: x_{\\textnormal{low}} = 0 x_{\\textnormal{high}} = \\log(\\lambda_{\\textnormal{max}} \\;\ (1 + \\textnormal{factor})) Parameters ---------- factor : float Used to calculate ``x_high``. """ w0 = self.lambda_max return (w0 * 0, np.log10(w0 + factor * w0))
[docs] def sampleset(self, factor_bbox=10.0, num=1000): """Return ``x`` array that samples the feature. Parameters ---------- factor_bbox : float Factor for ``bounding_box`` calculations. num : int Number of points to generate. """ w1, w2 = self.bounding_box(factor=factor_bbox) if self._n_models == 1: w = np.logspace(w1, w2, num) else: w = list(map(partial(np.logspace, num=num), w1, w2)) return np.asarray(w)
[docs] @staticmethod def evaluate(x, temperature): """Evaluate the model. Parameters ---------- x : number or ndarray Wavelengths in Angstrom. temperature : number Temperature in Kelvin. Returns ------- y : number or ndarray Blackbody radiation in PHOTLAM per steradian. """ if ASTROPY_LT_2_0: from astropy.analytic_functions.blackbody import blackbody_nu else: from astropy.modeling.blackbody import blackbody_nu # Silence Numpy old_np_err_cfg = np.seterr(all='ignore') wave = np.ascontiguousarray(x) * u.AA bbnu_flux = blackbody_nu(wave, temperature) bbflux = (bbnu_flux * units.PHOTLAM, u.spectral_density(wave)) / # PHOTLAM/sr # Restore Numpy settings np.seterr(**old_np_err_cfg) return bbflux.value
[docs]class BlackBodyNorm1D(BlackBody1D): """Create a normalized :ref:`blackbody spectrum <synphot-planck-law>` with given temperature. It is normalized by multiplying `BlackBody1D` result with a solid angle, :math:`\\Omega`, as defined below, where :math:`d` is 1 kpc: .. math:: \\Omega = \\frac{\\pi R_{\\textnormal{Sun}}^{2}}{d^{2}} Parameters ---------- temperature : float Blackbody temperature in Kelvin. """ def __init__(self, *args, **kwargs): super(BlackBodyNorm1D, self).__init__(*args, **kwargs) self._omega = np.pi * (const.R_sun / const.kpc).value ** 2 # steradian
[docs] def evaluate(self, x, temperature): """Evaluate the model. Parameters ---------- x : number or ndarray Wavelengths in Angstrom. temperature : number Temperature in Kelvin. Returns ------- y : number or ndarray Blackbody radiation in PHOTLAM. """ bbflux = super(BlackBodyNorm1D, self).evaluate(x, temperature) return bbflux * self._omega
[docs]class Box1D(_models.Box1D): """Same as `astropy.modeling.functional_models.Box1D`, except with ``sampleset`` defined. """ @staticmethod def _calc_sampleset(w1, w2, step, minimal): """Calculate sampleset for each model.""" if minimal: arr = [w1 - step, w1, w2, w2 + step] else: arr = np.arange(w1 - step, w2 + step + step, step) return arr
[docs] def sampleset(self, step=0.01, minimal=False): """Return ``x`` array that samples the feature. Parameters ---------- step : float Distance of first and last points w.r.t. bounding box. minimal : bool Only return the minimal points needed to define the box; i.e., box edges and a point outside on each side. """ w1, w2 = self.bounding_box if self._n_models == 1: w = self._calc_sampleset(w1, w2, step, minimal) else: w = list(map(partial( self._calc_sampleset, step=step, minimal=minimal), w1, w2)) return np.asarray(w)
[docs]class ConstFlux1D(_models.Const1D): """One dimensional constant flux model. Flux that is constant in a given unit might not be constant in another unit. During evaluation, flux is always converted to PHOTLAM. For multiple ``n_models``, this model only accepts amplitudes of the same flux unit; e.g., ``[1, 2]`` or ``Quantity([1, 2], 'photlam')``. Parameters ---------- amplitude : number or `~astropy.units.quantity.Quantity` Value and unit of the constant function. If not Quantity, assume the unit of PHOTLAM. """ def __init__(self, amplitude, **kwargs): if not isinstance(amplitude, u.Quantity): amplitude = amplitude * units.PHOTLAM if amplitude.unit == u.STmag: a = units.convert_flux(1, amplitude, units.FLAM) elif amplitude.unit == u.ABmag: a = units.convert_flux(1, amplitude, units.FNU) elif (amplitude.unit.physical_type in ('spectral flux density', 'spectral flux density wav', 'photon flux density', 'photon flux density wav')): a = amplitude else: raise NotImplementedError( '{0} not supported.'.format(amplitude.unit)) self._flux_unit = a.unit super(ConstFlux1D, self).__init__(amplitude=a.value, **kwargs)
[docs] def evaluate(self, x, *args): """One dimensional constant flux model function. Parameters ---------- x : number or ndarray Wavelengths in Angstrom. Returns ------- y : number or ndarray Flux in PHOTLAM. """ a = (self.amplitude * np.ones_like(x)) * self._flux_unit y = units.convert_flux(x, a, units.PHOTLAM) return y.value
[docs]class Empirical1D(Tabular1D): """Empirical (sampled) spectrum or bandpass model. .. note:: This model requires `SciPy <>`_ 0.14 or later to be installed. Parameters ---------- keep_neg : bool Convert negative ``lookup_table`` values to zeroes? This is to be consistent with ASTROLIB PYSYNPHOT. kwargs : dict Keywords for `~astropy.modeling.tabular.Tabular1D` model creation or :func:`~scipy.interpolate.interpn`. When ``fill_value=np.nan`` is given, extrapolation is done based on nearest end points on each end; This is the default behavior. """ def __init__(self, **kwargs): # Manually insert user metadata here to accomodate any warning # from self._process_neg_flux() meta = kwargs.pop('meta', {}) self.meta = meta if 'warnings' not in self.meta: self.meta['warnings'] = {} x = kwargs['points'] y = kwargs['lookup_table'] # Points can only be ascending for interpn() if x[-1] < x[0]: x = x[::-1] y = y[::-1] kwargs['points'] = x # Handle negative flux keep_neg = kwargs.pop('keep_neg', False) self._keep_neg = keep_neg y = self._process_neg_flux(x, y) kwargs['lookup_table'] = y super(Empirical1D, self).__init__(**kwargs) # Set non-default interpolation default values. # For tapered model, just fill with zero; # Otherwise, extrapolate like ASTROLIB PYSYNPHOT. self.bounds_error = kwargs.get('bounds_error', False) if self.is_tapered(): self.fill_value = kwargs.get('fill_value', 0) else: self.fill_value = kwargs.get('fill_value', np.nan) def _process_neg_flux(self, x, y): """Remove negative flux.""" if self._keep_neg: # Nothing to do return y old_y = None if np.isscalar(y): # pragma: no cover if y < 0: n_neg = 1 old_x = x old_y = y y = 0 else: x = np.asarray(x) # In case input is just pure list y = np.asarray(y) i = np.where(y < 0) n_neg = len(i[0]) if n_neg > 0: old_x = x[i] old_y = y[i] y[i] = 0 if old_y is not None: warn_str = ('{0} bin(s) contained negative flux or throughput' '; it/they will be set to zero.'.format(n_neg)) warn_str += '\n points: {0}\n lookup_table: {1}'.format( old_x, old_y) # Extra info self.meta['warnings'].update({'NegativeFlux': warn_str}) warnings.warn(warn_str, AstropyUserWarning) return y
[docs] def is_tapered(self): return np.array_equal( self.lookup_table[::self.lookup_table.size - 1], [0, 0])
[docs] def sampleset(self): """Return array that samples the feature.""" return np.squeeze(self.points)
[docs] def evaluate(self, inputs): """Evaluate the model. Parameters ---------- inputs : number or ndarray Wavelengths in same unit as ``points``. Returns ------- y : number or ndarray Flux or throughput in same unit as ``lookup_table``. """ y = super(Empirical1D, self).evaluate(inputs) # Assume NaN at both ends need to be extrapolated based on # nearest end point. if self.fill_value is np.nan: # Cannot use sampleset() due to ExtinctionModel1D x = np.squeeze(self.points) if np.isscalar(y): # pragma: no cover if inputs < x[0]: y = self.lookup_table[0] elif inputs > x[-1]: y = self.lookup_table[-1] else: y[inputs < x[0]] = self.lookup_table[0] y[inputs > x[-1]] = self.lookup_table[-1] return self._process_neg_flux(inputs, y)
class BaseGaussian1D(_models.Gaussian1D): """Same as `astropy.modeling.functional_models.BaseGaussian1D`, except with ``sampleset`` defined. """ def sampleset(self, factor_step=0.1, **kwargs): """Return ``x`` array that samples the feature. Parameters ---------- factor_step : float Factor for sample step calculation. The step is calculated using ``factor_step * self.stddev``. kwargs : dict Keyword(s) for ``bounding_box`` calculation. Default ``factor`` is set to 5 to be compatible with ASTROLIB PYSYNPHOT. """ if 'factor' not in kwargs: kwargs['factor'] = 5.0 w1, w2 = self.bounding_box(**kwargs) dw = factor_step * self.stddev if self._n_models == 1: w = np.arange(w1, w2, dw) else: w = list(map(np.arange, w1, w2, dw)) return np.asarray(w)
[docs]class Gaussian1D(BaseGaussian1D): """Same as `astropy.modeling.functional_models.Gaussian1D`, except with ``sampleset`` defined. """ pass
[docs]class GaussianAbsorption1D(BaseGaussian1D): """Same as ``astropy.modeling.functional_models.GaussianAbsorption1D``, except with ``sampleset`` defined. """
[docs] @staticmethod def evaluate(x, amplitude, mean, stddev): """ GaussianAbsorption1D model function. """ return 1.0 - Gaussian1D.evaluate(x, amplitude, mean, stddev)
[docs] @staticmethod def fit_deriv(x, amplitude, mean, stddev): """ GaussianAbsorption1D model function derivatives. """ import operator return list(map( operator.neg, Gaussian1D.fit_deriv(x, amplitude, mean, stddev)))
[docs]class GaussianFlux1D(Gaussian1D): """Same as `Gaussian1D` but accepts extra keywords below. Parameters ---------- amplitude : float Amplitude of the Gaussian in PHOTLAM. Also see ``total_flux``. mean : float Mean of the Gaussian in Angstrom. stddev : float Standard deviation of the Gaussian in Angstrom. Also see ``fwhm``. fwhm : float Full width at half maximum of the Gaussian in Angstrom. If given, this overrides ``stddev``. total_flux : float Total flux under the Gaussian in ``erg/s/cm^2``. If given, this overrides ``amplitude``. """ def __init__(self, *args, **kwargs): fwhm = kwargs.pop('fwhm', None) total_flux = kwargs.pop('total_flux', None) super(GaussianFlux1D, self).__init__(*args, **kwargs) if fwhm is None: fwhm = self.stddev * gaussian_sigma_to_fwhm else: self.stddev = fwhm * gaussian_fwhm_to_sigma gaussian_amp_to_totflux = np.sqrt(2.0 * np.pi) * self.stddev if total_flux is None: u_str = 'PHOTLAM' total_flux = self.amplitude * gaussian_amp_to_totflux else: u_str = 'FLAM' # total_flux is passed in unaltered, any conversion error would # happen here. tf_unit = u.erg / ( * * u.s) if isinstance(total_flux, u.Quantity): total_flux = else: total_flux = total_flux * tf_unit self.amplitude = (total_flux / (gaussian_amp_to_totflux * u.AA)).to(units.PHOTLAM, u.spectral_density(self.mean.value * u.AA)).value # noqa total_flux = total_flux.value self.meta['expr'] = 'em({0:g}, {1:g}, {2:g}, {3})'.format( self.mean.value, fwhm, total_flux, u_str)
[docs]class Lorentz1D(_models.Lorentz1D): """Same as `astropy.modeling.functional_models.Lorentz1D`, except with ``sampleset`` defined. """
[docs] def sampleset(self, factor_step=0.05, **kwargs): """Return ``x`` array that samples the feature. Parameters ---------- factor_step : float Factor for sample step calculation. The step is calculated using ``factor_step * self.fwhm``. kwargs : dict Keyword(s) for ``bounding_box`` calculation. """ w1, w2 = self.bounding_box(**kwargs) dw = factor_step * self.fwhm if self._n_models == 1: w = np.arange(w1, w2, dw) else: w = list(map(np.arange, w1, w2, dw)) return np.asarray(w)
[docs]class MexicanHat1D(_models.MexicanHat1D): """Same as `astropy.modeling.models.MexicanHat1D`, except with ``sampleset`` defined. """
[docs] def sampleset(self, factor_step=0.1, **kwargs): """Return ``x`` array that samples the feature. Parameters ---------- factor_step : float Factor for sample step calculation. The step is calculated using ``factor_step * self.sigma``. kwargs : dict Keyword(s) for ``bounding_box`` calculation. """ w1, w2 = self.bounding_box(**kwargs) dw = factor_step * self.sigma if self._n_models == 1: w = np.arange(w1, w2, dw) else: w = list(map(np.arange, w1, w2, dw)) return np.asarray(w)
[docs]class PowerLawFlux1D(_models.PowerLaw1D): """One dimensional power law model with proper flux handling. For multiple ``n_models``, this model only accepts parameters of the same unit; e.g., ``amplitude=[1, 2]`` or ``amplitude=Quantity([1, 2], 'photlam')``. Also see `~astropy.modeling.powerlaws.PowerLaw1D`. Parameters ---------- amplitude : number or `~astropy.units.quantity.Quantity` Model amplitude at the reference point. If not Quantity, assume the unit of PHOTLAM. x_0 : number or `~astropy.units.quantity.Quantity` Reference point. If not Quantity, assume the unit of Angstrom. alpha : float Power law index. """ def __init__(self, amplitude, x_0, alpha, **kwargs): if not isinstance(amplitude, u.Quantity): amplitude = amplitude * units.PHOTLAM if (amplitude.unit.physical_type in ('spectral flux density', 'spectral flux density wav', 'photon flux density', 'photon flux density wav')): self._flux_unit = amplitude.unit else: raise NotImplementedError( '{0} not supported.'.format(amplitude.unit)) if isinstance(x_0, u.Quantity): x_0 =, u.spectral()).value super(PowerLawFlux1D, self).__init__( amplitude=amplitude.value, x_0=x_0, alpha=alpha, **kwargs)
[docs] def evaluate(self, x, *args): """Return flux in PHOTLAM. Assume input wavelength is in Angstrom.""" xx = x / self.x_0 y = (self.amplitude * xx ** (-self.alpha)) * self._flux_unit flux = units.convert_flux(x, y, units.PHOTLAM) return flux.value
[docs]class Trapezoid1D(_models.Trapezoid1D): """Same as `astropy.modeling.functional_models.Trapezoid1D`, except with ``sampleset`` defined. """
[docs] def sampleset(self): """Return ``x`` array that samples the feature.""" x1, x4 = self.bounding_box dw = self.width * 0.5 x2 = self.x_0 - dw x3 = self.x_0 + dw if self._n_models == 1: w = [x1, x2, x3, x4] else: w = list(zip(x1, x2, x3, x4)) return np.asarray(w)
# Functions below are for sampleset magic. def _get_sampleset(model): """Return sampleset of a model or `None` if undefined. Model could be a real model or evaluated sampleset.""" if isinstance(model, Model): if hasattr(model, 'sampleset'): w = model.sampleset() else: w = None else: w = model # Already a sampleset return w def _merge_sampleset(model1, model2): """Simple merge of samplesets.""" w1 = _get_sampleset(model1) w2 = _get_sampleset(model2) return merge_wavelengths(w1, w2) def _shift_wavelengths(model1, model2): """One of the models is either ``RedshiftScaleFactor`` or ``Scale``. Possible combos:: RedshiftScaleFactor | Model Scale | Model Model | Scale """ if isinstance(model1, _models.RedshiftScaleFactor): val = _get_sampleset(model2) if val is None: w = val else: w = model1.inverse(val) elif isinstance(model1, _models.Scale): w = _get_sampleset(model2) else: w = _get_sampleset(model1) return w WAVESET_OPERATORS = { '+': _merge_sampleset, '-': _merge_sampleset, '*': _merge_sampleset, '/': _merge_sampleset, '**': _merge_sampleset, '|': _shift_wavelengths, '&': _merge_sampleset }
[docs]def get_waveset(model): """Get optimal wavelengths for sampling a given model. Parameters ---------- model : `~astropy.modeling.Model` Model. Returns ------- waveset : array-like or `None` Optimal wavelengths. `None` if undefined. Raises ------ synphot.exceptions.SynphotError Invalid model. """ if not isinstance(model, Model): raise SynphotError('{0} is not a model.'.format(model)) if isinstance(model, _CompoundModel): waveset = model._tree.evaluate(WAVESET_OPERATORS, getter=None) else: waveset = _get_sampleset(model) return waveset
# Functions below are for meta magic. # This is for joining metadata in a compound model. METADATA_OPERATORS = defaultdict(lambda: _merge_meta) def _get_meta(model): """Return metadata of a model. Model could be a real model or evaluated metadata.""" if isinstance(model, Model): w = model.meta else: w = model # Already metadata return w def _merge_meta(model1, model2): """Simple merge of samplesets.""" w1 = _get_meta(model1) w2 = _get_meta(model2) return metadata.merge(w1, w2, metadata_conflicts='silent')
[docs]def get_metadata(model): """Get metadata for a given model. Parameters ---------- model : `~astropy.modeling.Model` Model. Returns ------- metadata : dict Metadata for the model. Raises ------ synphot.exceptions.SynphotError Invalid model. """ if not isinstance(model, Model): raise SynphotError('{0} is not a model.'.format(model)) if isinstance(model, _CompoundModel): metadata = model._tree.evaluate(METADATA_OPERATORS, getter=None) else: metadata = deepcopy(model.meta) return metadata