# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Spectrum models not in `astropy.modeling`."""
# STDLIB
import math
import warnings
from copy import deepcopy
from functools import partial
# THIRD-PARTY
import numpy as np
# ASTROPY
from astropy import constants as const
from astropy import units as u
from astropy.modeling import Fittable1DModel, Model, Parameter
from astropy.modeling import models as _models
from astropy.modeling.models import (RickerWavelet1D as _RickerWavelet1D,
Tabular1D)
from astropy.stats.funcs import gaussian_fwhm_to_sigma, gaussian_sigma_to_fwhm
from astropy.utils import metadata
from astropy.utils.exceptions import AstropyUserWarning
# LOCAL
from synphot import units
from synphot.exceptions import SynphotError
from synphot.utils import merge_wavelengths
__all__ = ['BlackBody1D', 'BlackBodyNorm1D', 'Box1D', 'ConstFlux1D',
'Empirical1D', 'Gaussian1D', 'GaussianAbsorption1D',
'GaussianFlux1D', 'Lorentz1D', 'MexicanHat1D', 'RickerWavelet1D',
'PowerLawFlux1D', 'Trapezoid1D', 'get_waveset', 'get_metadata']
[docs]
class BlackBody1D(Fittable1DModel):
"""Create a :ref:`blackbody spectrum <synphot-planck-law>`
model with given temperature.
Parameters
----------
temperature : float
Blackbody temperature in Kelvin.
"""
temperature = Parameter(default=5000)
def __init__(self, *args, **kwargs):
super(BlackBody1D, self).__init__(*args, **kwargs)
self.meta['expr'] = 'bb({0})'.format(self.temperature.value)
@property
def lambda_max(self):
"""Peak wavelength in Angstrom when the curve is expressed as
power density."""
return ((const.b_wien.value / self.temperature) * u.m).to_value(u.AA)
def bounding_box(self, factor=10.0):
"""Tuple defining the default ``bounding_box`` limits,
``(x_low, x_high)``.
.. math::
x_{\\mathrm{low}} = 0
x_{\\mathrm{high}} = \\log(\\lambda_{\\mathrm{max}} \\;\
(1 + \\mathrm{factor}))
Parameters
----------
factor : float
Used to calculate ``x_high``.
"""
w0 = self.lambda_max
return (w0 * 0, np.log10(w0 + factor * w0))
[docs]
def sampleset(self, factor_bbox=10.0, num=1000):
"""Return ``x`` array that samples the feature.
Parameters
----------
factor_bbox : float
Factor for ``bounding_box`` calculations.
num : int
Number of points to generate.
"""
w1, w2 = self.bounding_box(factor=factor_bbox)
if self._n_models == 1:
w = np.logspace(w1, w2, num)
else:
w = list(map(partial(np.logspace, num=num), w1, w2))
return np.asarray(w)
[docs]
@staticmethod
def evaluate(x, temperature):
"""Evaluate the model.
Parameters
----------
x : number or ndarray
Wavelengths in Angstrom.
temperature : number
Temperature in Kelvin.
Returns
-------
y : number or ndarray
Blackbody radiation in PHOTLAM per steradian.
"""
from synphot.blackbody import blackbody_nu
# Silence Numpy
old_np_err_cfg = np.seterr(all='ignore')
wave = np.ascontiguousarray(x) * u.AA
bbnu_flux = blackbody_nu(wave, temperature)
bbflux = (bbnu_flux * u.sr).to(
units.PHOTLAM, u.spectral_density(wave)) / u.sr # PHOTLAM/sr
# Restore Numpy settings
np.seterr(**old_np_err_cfg)
return bbflux.value
[docs]
def integrate(self, *args):
with u.add_enabled_equivalencies(u.temperature()):
t = u.Quantity(self.temperature, u.K)
return (const.sigma_sb * t ** 4 / math.pi) # per steradian
[docs]
class BlackBodyNorm1D(BlackBody1D):
"""Create a normalized :ref:`blackbody spectrum <synphot-planck-law>`
with given temperature.
It is normalized by multiplying `BlackBody1D` result with a solid angle,
:math:`\\Omega`, as defined below, where :math:`d` is 1 kpc:
.. math::
\\Omega = \\frac{\\pi R_{\\mathrm{Sun}}^{2}}{d^{2}}
Parameters
----------
temperature : float
Blackbody temperature in Kelvin.
"""
def __init__(self, *args, **kwargs):
super(BlackBodyNorm1D, self).__init__(*args, **kwargs)
self._omega = np.pi * (const.R_sun / const.kpc).value ** 2 # steradian
[docs]
def evaluate(self, x, temperature):
"""Evaluate the model.
Parameters
----------
x : number or ndarray
Wavelengths in Angstrom.
temperature : number
Temperature in Kelvin.
Returns
-------
y : number or ndarray
Blackbody radiation in PHOTLAM.
"""
bbflux = super(BlackBodyNorm1D, self).evaluate(x, temperature)
return bbflux * self._omega
[docs]
def integrate(self, *args):
return super().integrate(*args) * self._omega
[docs]
class Box1D(_models.Box1D):
"""Same as `astropy.modeling.functional_models.Box1D`, except with
``sampleset`` defined.
Parameters
----------
step : float
Distance of first and last points w.r.t. bounding box. In default units
(nominally Angstrom). Defaults to 0.01
"""
def __init__(self, *args, **kwargs):
if "step" in kwargs:
self.step = kwargs.pop("step")
else:
self.step = 0.01
super(Box1D, self).__init__(*args, **kwargs)
@staticmethod
def _calc_sampleset(w1, w2, step, minimal):
"""Calculate sampleset for each model."""
if minimal:
arr = [w1 - step, w1, w2, w2 + step]
else:
arr = np.arange(w1 - step, w2 + step + step, step)
return arr
[docs]
def sampleset(self, step=None, minimal=False):
"""Return ``x`` array that samples the feature.
Parameters
----------
step : float, optional
Distance of first and last points w.r.t. bounding box. If None,
set to attribute ``step``.
minimal : bool
Only return the minimal points needed to define the box;
i.e., box edges and a point outside on each side.
"""
if step is None:
step = self.step
w1, w2 = tuple(self.bounding_box.bounding_box())
if self._n_models == 1:
w = self._calc_sampleset(w1, w2, step, minimal)
else:
w = list(map(partial(
self._calc_sampleset, step=step, minimal=minimal), w1, w2))
return np.asarray(w)
[docs]
def integrate(self, *args):
# TODO: Remove unit hardcoding when we use model with units natively.
with u.add_enabled_equivalencies(u.spectral()):
w = u.Quantity(self.width, u.AA)
return self.amplitude * w
[docs]
class ConstFlux1D(_models.Const1D):
"""One dimensional constant flux model.
Flux that is constant in a given unit might not be constant in
another unit. During evaluation, flux is always converted to PHOTLAM.
For multiple ``n_models``, this model only accepts amplitudes of the
same flux unit; e.g., ``[1, 2]`` or ``Quantity([1, 2], 'photlam')``.
Parameters
----------
amplitude : number or `~astropy.units.quantity.Quantity`
Value and unit of the constant function.
If not Quantity, assume the unit of PHOTLAM.
"""
def __init__(self, amplitude, **kwargs):
if not isinstance(amplitude, u.Quantity):
amplitude = amplitude * units.PHOTLAM
if amplitude.unit == u.STmag:
a = units.convert_flux(1, amplitude, units.FLAM)
elif amplitude.unit == u.ABmag:
a = units.convert_flux(1, amplitude, units.FNU)
elif (amplitude.unit.physical_type in
('spectral flux density', 'spectral flux density wav',
'photon flux density', 'photon flux density wav')):
a = amplitude
else:
raise NotImplementedError(
'{0} not supported.'.format(amplitude.unit))
self._flux_unit = a.unit
super(ConstFlux1D, self).__init__(amplitude=a.value, **kwargs)
[docs]
def evaluate(self, x, *args):
"""One dimensional constant flux model function.
Parameters
----------
x : number or ndarray
Wavelengths in Angstrom.
Returns
-------
y : number or ndarray
Flux in PHOTLAM.
"""
a = (self.amplitude * np.ones_like(x)) * self._flux_unit
y = units.convert_flux(x, a, units.PHOTLAM)
return y.value
[docs]
def integrate(self, x):
# TODO: Remove unit hardcoding when we use model with units natively.
# TODO: We do not handle wav_unit as wave number nor energy for now.
if any(['wav' in t for t in self._flux_unit.physical_type]):
wav_unit = u.AA
else:
wav_unit = u.Hz
with u.add_enabled_equivalencies(u.spectral()):
x = u.Quantity(x, wav_unit)
amp = u.Quantity(self.amplitude, self._flux_unit)
return (max(x) - min(x)) * amp
[docs]
class Empirical1D(Tabular1D):
"""Empirical (sampled) spectrum or bandpass model.
.. note::
This model requires `SciPy <https://www.scipy.org>`_ 0.14
or later to be installed.
Parameters
----------
keep_neg : bool
Convert negative ``lookup_table`` values to zeroes?
This is to be consistent with ASTROLIB PYSYNPHOT.
kwargs : dict
Keywords for `~astropy.modeling.tabular.Tabular1D` model
creation or :func:`~scipy.interpolate.interpn`.
When ``fill_value=np.nan`` is given, extrapolation is done
based on nearest end points on each end; This is the default
behavior.
"""
def __init__(self, **kwargs):
# Manually insert user metadata here to accomodate any warning
# from self._process_neg_flux()
meta = kwargs.pop('meta', {})
self.meta = meta
if 'warnings' not in self.meta:
self.meta['warnings'] = {}
x = kwargs['points']
y = kwargs['lookup_table']
# Points can only be ascending for interpn()
if x[-1] < x[0]:
x = x[::-1]
y = y[::-1]
kwargs['points'] = x
# Handle negative flux
keep_neg = kwargs.pop('keep_neg', False)
self._keep_neg = keep_neg
y = self._process_neg_flux(x, y)
kwargs['lookup_table'] = y
super(Empirical1D, self).__init__(**kwargs)
# Set non-default interpolation default values.
# For tapered model, just fill with zero;
# Otherwise, extrapolate like ASTROLIB PYSYNPHOT.
self.bounds_error = kwargs.get('bounds_error', False)
if self.is_tapered():
self.fill_value = kwargs.get('fill_value', 0)
else:
self.fill_value = kwargs.get('fill_value', np.nan)
def _process_neg_flux(self, x, y):
"""Remove negative flux."""
if self._keep_neg: # Nothing to do
return y
old_y = None
if np.isscalar(y): # pragma: no cover
if y < 0:
n_neg = 1
old_x = x
old_y = y
y = 0
else:
x = np.asarray(x) # In case input is just pure list
y = np.asarray(y)
i = np.where(y < 0)
n_neg = len(i[0])
if n_neg > 0:
old_x = x[i]
old_y = y[i]
y[i] = 0
if old_y is not None:
warn_str = ('{0} bin(s) contained negative flux or throughput'
'; it/they will be set to zero.'.format(n_neg))
warn_str += '\n points: {0}\n lookup_table: {1}'.format(
old_x, old_y) # Extra info
self.meta['warnings'].update({'NegativeFlux': warn_str})
warnings.warn(warn_str, AstropyUserWarning)
return y
[docs]
def is_tapered(self):
return np.array_equal(
self.lookup_table[::self.lookup_table.size - 1], [0, 0])
[docs]
def sampleset(self):
"""Return array that samples the feature."""
return np.squeeze(self.points)
[docs]
def evaluate(self, inputs):
"""Evaluate the model.
Parameters
----------
inputs : number or ndarray
Wavelengths in same unit as ``points``.
Returns
-------
y : number or ndarray
Flux or throughput in same unit as ``lookup_table``.
"""
y = super(Empirical1D, self).evaluate(inputs)
# Assume NaN at both ends need to be extrapolated based on
# nearest end point.
if self.fill_value is np.nan:
# Cannot use sampleset() due to ExtinctionModel1D.
x = np.squeeze(self.points)
# np.squeeze may throw unit away.
if (isinstance(self.points, tuple) and
isinstance(self.points[0], u.Quantity) and
not isinstance(x, u.Quantity)):
x = x * self.points[0].unit
if np.isscalar(y): # pragma: no cover
if inputs < x[0]:
y = self.lookup_table[0]
elif inputs > x[-1]:
y = self.lookup_table[-1]
else:
y[inputs < x[0]] = self.lookup_table[0]
y[inputs > x[-1]] = self.lookup_table[-1]
return self._process_neg_flux(inputs, y)
class BaseGaussian1D(_models.Gaussian1D):
"""Same as `astropy.modeling.functional_models.BaseGaussian1D`, except with
``sampleset`` defined.
"""
_sqrt_2_pi = math.sqrt(2 * math.pi)
def sampleset(self, factor_step=0.1, **kwargs):
"""Return ``x`` array that samples the feature.
Parameters
----------
factor_step : float
Factor for sample step calculation. The step is calculated
using ``factor_step * self.stddev``.
kwargs : dict
Keyword(s) for ``bounding_box`` calculation.
Default ``factor`` is set to 5 to be compatible with
ASTROLIB PYSYNPHOT.
"""
if 'factor' not in kwargs:
kwargs['factor'] = 5.0
w1, w2 = self.bounding_box(**kwargs)
dw = factor_step * self.stddev
if self._n_models == 1:
w = np.arange(w1, w2, dw)
else:
w = list(map(np.arange, w1, w2, dw))
return np.asarray(w)
[docs]
class Gaussian1D(BaseGaussian1D):
"""Same as `astropy.modeling.functional_models.Gaussian1D`, except with
``sampleset`` defined.
"""
[docs]
def integrate(self, *args):
# TODO: Remove unit hardcoding when we use model with units natively.
with u.add_enabled_equivalencies(u.spectral()):
stddev = u.Quantity(self.stddev, u.AA)
return self.amplitude * stddev * self._sqrt_2_pi
# TODO: Deprecate this?
# This is not really supported anymore but kept for backward compatibility.
[docs]
class GaussianAbsorption1D(BaseGaussian1D):
"""Same as ``astropy.modeling.functional_models.GaussianAbsorption1D``,
except with ``sampleset`` defined.
"""
[docs]
@staticmethod
def evaluate(x, amplitude, mean, stddev):
"""
GaussianAbsorption1D model function.
"""
return 1.0 - Gaussian1D.evaluate(x, amplitude, mean, stddev)
[docs]
@staticmethod
def fit_deriv(x, amplitude, mean, stddev):
"""
GaussianAbsorption1D model function derivatives.
"""
import operator
return list(map(
operator.neg, Gaussian1D.fit_deriv(x, amplitude, mean, stddev)))
[docs]
class GaussianFlux1D(Gaussian1D):
"""Same as `Gaussian1D` but accepts extra keywords below.
Parameters
----------
amplitude : float
Amplitude of the Gaussian in PHOTLAM.
Also see ``total_flux``.
mean : float
Mean of the Gaussian in Angstrom.
stddev : float
Standard deviation of the Gaussian in Angstrom.
Also see ``fwhm``.
fwhm : float
Full width at half maximum of the Gaussian in Angstrom.
If given, this overrides ``stddev``.
total_flux : float
Total flux under the Gaussian in ``erg/s/cm^2``.
If given, this overrides ``amplitude``.
"""
def __init__(self, *args, **kwargs):
fwhm = kwargs.pop('fwhm', None)
total_flux = kwargs.pop('total_flux', None)
super(GaussianFlux1D, self).__init__(*args, **kwargs)
if fwhm is None:
fwhm = self.stddev * gaussian_sigma_to_fwhm
else:
self.stddev = fwhm * gaussian_fwhm_to_sigma
gaussian_amp_to_totflux = self._sqrt_2_pi * self.stddev
if total_flux is None:
u_str = 'PHOTLAM'
total_flux = self.amplitude * gaussian_amp_to_totflux
else:
u_str = 'FLAM'
# total_flux is passed in unaltered, any conversion error would
# happen here.
tf_unit = u.erg / (u.cm * u.cm * u.s)
if isinstance(total_flux, u.Quantity):
total_flux = total_flux.to(tf_unit)
else:
total_flux = total_flux * tf_unit
self.amplitude = (total_flux / (gaussian_amp_to_totflux * u.AA)).to_value(units.PHOTLAM, u.spectral_density(self.mean.value * u.AA)) # noqa
total_flux = total_flux.value
self.meta['expr'] = 'em({0:g}, {1:g}, {2:g}, {3})'.format(
self.mean.value, fwhm, total_flux, u_str)
[docs]
def integrate(self, *args):
# TODO: Remove unit hardcoding when we use model with units natively.
return super(GaussianFlux1D, self).integrate(*args) * units.PHOTLAM
[docs]
class Lorentz1D(_models.Lorentz1D):
"""Same as `astropy.modeling.functional_models.Lorentz1D`, except with
``sampleset`` defined.
"""
[docs]
def sampleset(self, factor_step=0.05, **kwargs):
"""Return ``x`` array that samples the feature.
Parameters
----------
factor_step : float
Factor for sample step calculation. The step is calculated
using ``factor_step * self.fwhm``.
kwargs : dict
Keyword(s) for ``bounding_box`` calculation.
"""
w1, w2 = self.bounding_box(**kwargs)
dw = factor_step * self.fwhm
if self._n_models == 1:
w = np.arange(w1, w2, dw)
else:
w = list(map(np.arange, w1, w2, dw))
return np.asarray(w)
[docs]
def integrate(self, x):
# TODO: Remove unit hardcoding when we use model with units natively.
with u.add_enabled_equivalencies(u.spectral()):
x = u.Quantity(x, u.AA)
x_0 = u.Quantity(self.x_0, u.AA)
gamma = u.Quantity(self.fwhm, u.AA) * 0.5
a1 = np.arctan((min(x) - x_0) / gamma)
a2 = np.arctan((max(x) - x_0) / gamma)
da = (a2 - a1).to(u.dimensionless_unscaled, u.dimensionless_angles())
return self.amplitude * gamma * da
[docs]
class RickerWavelet1D(_RickerWavelet1D):
"""Same as `astropy.modeling.functional_models.RickerWavelet1D`, except
with ``sampleset`` defined.
"""
[docs]
def sampleset(self, factor_step=0.1, **kwargs):
"""Return ``x`` array that samples the feature.
Parameters
----------
factor_step : float
Factor for sample step calculation. The step is calculated
using ``factor_step * self.sigma``.
kwargs : dict
Keyword(s) for ``bounding_box`` calculation.
"""
w1, w2 = self.bounding_box(**kwargs)
dw = factor_step * self.sigma
if self._n_models == 1:
w = np.arange(w1, w2, dw)
else:
w = list(map(np.arange, w1, w2, dw))
return np.asarray(w)
[docs]
def integrate(self, x):
# TODO: Remove unit hardcoding when we use model with units natively.
with u.add_enabled_equivalencies(u.spectral()):
x = u.Quantity(x, u.AA)
x_0 = u.Quantity(self.x_0, u.AA)
sig = u.Quantity(self.sigma, u.AA)
# Roots, where y=0
root_left = x_0 - sig
root_right = x_0 + sig
x_min = min(x)
x_max = max(x)
if x_min >= root_left or x_max <= root_right:
raise NotImplementedError(
'Partial analytic integration not supported')
sig2 = sig * sig
def _int_subregion(xx1, xx2):
dx_min = xx1 - x_0
dx_max = xx2 - x_0
a1 = dx_min * np.exp(-0.5 * dx_min * dx_min / sig2)
a2 = dx_max * np.exp(-0.5 * dx_max * dx_max / sig2)
return abs(a2 - a1)
# Unsigned area
return self.amplitude * (_int_subregion(x_min, root_left) +
_int_subregion(root_left, root_right) +
_int_subregion(root_right, x_max))
# TODO: Emit proper deprecation warning.
# https://github.com/spacetelescope/synphot_refactor/issues/249
[docs]
class MexicanHat1D(RickerWavelet1D):
"""This is the deprecated name for `RickerWavelet1D`."""
[docs]
class PowerLawFlux1D(_models.PowerLaw1D):
"""One dimensional power law model with proper flux handling.
For multiple ``n_models``, this model only accepts parameters of the
same unit; e.g., ``amplitude=[1, 2]`` or
``amplitude=Quantity([1, 2], 'photlam')``.
Also see `~astropy.modeling.powerlaws.PowerLaw1D`.
Parameters
----------
amplitude : number or `~astropy.units.quantity.Quantity`
Model amplitude at the reference point.
If not Quantity, assume the unit of PHOTLAM.
x_0 : number or `~astropy.units.quantity.Quantity`
Reference point.
If not Quantity, assume the unit of Angstrom.
alpha : float
Power law index.
"""
def __init__(self, amplitude, x_0, alpha, **kwargs):
if not isinstance(amplitude, u.Quantity):
amplitude = amplitude * units.PHOTLAM
if (amplitude.unit.physical_type in
('spectral flux density', 'spectral flux density wav',
'photon flux density', 'photon flux density wav')):
self._flux_unit = amplitude.unit
else:
raise NotImplementedError(
'{0} not supported.'.format(amplitude.unit))
if isinstance(x_0, u.Quantity):
x_0 = x_0.to_value(u.AA, u.spectral())
super(PowerLawFlux1D, self).__init__(
amplitude=amplitude.value, x_0=x_0, alpha=alpha, **kwargs)
[docs]
def evaluate(self, x, *args):
"""Return flux in PHOTLAM. Assume input wavelength is in Angstrom."""
xx = x / self.x_0
y = (self.amplitude * xx ** (-self.alpha)) * self._flux_unit
flux = units.convert_flux(x, y, units.PHOTLAM)
return flux.value
[docs]
def integrate(self, x):
# TODO: Remove unit hardcoding when we use model with units natively.
with u.add_enabled_equivalencies(u.spectral()):
x = u.Quantity(x, u.AA)
x_0 = u.Quantity(self.x_0, u.AA)
amp = u.Quantity(self.amplitude, self._flux_unit)
fac = 1 - self.alpha
denom = x_0 ** -self.alpha * fac
return amp * (max(x) ** fac - min(x) ** fac) / denom
[docs]
class Trapezoid1D(_models.Trapezoid1D):
"""Same as `astropy.modeling.functional_models.Trapezoid1D`, except with
``sampleset`` defined.
"""
[docs]
def sampleset(self):
"""Return ``x`` array that samples the feature."""
x1, x4 = tuple(self.bounding_box.bounding_box())
dw = self.width * 0.5
x2 = self.x_0 - dw
x3 = self.x_0 + dw
if self._n_models == 1:
w = [x1, x2, x3, x4]
else:
w = list(zip(x1, x2, x3, x4))
return np.asarray(w)
[docs]
def integrate(self, *args):
# TODO: Remove unit hardcoding when we use model with units natively.
with u.add_enabled_equivalencies(u.spectral()):
width = u.Quantity(self.width, u.AA)
slope = u.Quantity(self.slope, 1 / u.AA)
return self.amplitude * (width + self.amplitude / slope)
# Functions below are for sampleset magic.
def _get_sampleset(model):
"""Return sampleset of a model or `None` if undefined."""
w = None
if isinstance(model, Model) and hasattr(model, 'sampleset'):
w = model.sampleset()
return w
def _model_tree_evaluate_sampleset(root):
# Not a CompoundModel, grab sampleset and be done.
if not hasattr(root, 'op'):
return _get_sampleset(root)
model1 = root.left
model2 = root.right
# model2 is redshifted, apply the redshift if applicable.
if isinstance(model1, _models.RedshiftScaleFactor):
val = _model_tree_evaluate_sampleset(model2)
if val is None:
w = val
else:
w = model1.inverse(val)
# This should not ever happen, so ignore the redshift.
elif isinstance(model2, _models.RedshiftScaleFactor):
w = _model_tree_evaluate_sampleset(model1)
# One of the models is scaled. Non-redshift scaling does
# not affect sampleset of the model.
elif isinstance(model1, _models.Scale):
w = _model_tree_evaluate_sampleset(model2)
elif isinstance(model2, _models.Scale):
w = _model_tree_evaluate_sampleset(model1)
# Combine sampleset from both models.
else:
w1 = _model_tree_evaluate_sampleset(model1)
w2 = _model_tree_evaluate_sampleset(model2)
w = merge_wavelengths(w1, w2)
return w
[docs]
def get_waveset(model):
"""Get optimal wavelengths for sampling a given model.
Parameters
----------
model : `~astropy.modeling.Model`
Model.
Returns
-------
waveset : array-like or `None`
Optimal wavelengths. `None` if undefined.
Raises
------
synphot.exceptions.SynphotError
Invalid model.
"""
if not isinstance(model, Model):
raise SynphotError('{0} is not a model.'.format(model))
return _model_tree_evaluate_sampleset(model)
# Functions below are for meta magic.
def _get_meta(model):
"""Return metadata of a model."""
w = {}
if isinstance(model, Model):
w = model.meta
return w
def _model_tree_evaluate_metadata(root):
# Not a CompoundModel, grab metadata and be done.
if not hasattr(root, 'op'):
return _get_meta(root)
m1 = _model_tree_evaluate_metadata(root.left)
m2 = _model_tree_evaluate_metadata(root.right)
return metadata.merge(m1, m2, metadata_conflicts='silent')