# coding=utf8
# Copyright (c) MPE/IR-Submm Group. See LICENSE.rst for license information.
#
# Kinematic options for DysmalPy
from __future__ import (absolute_import, division, print_function,
unicode_literals)
# Standard library
import logging
# Third party imports
import numpy as np
import scipy.special as scp_spec
import scipy.interpolate as scp_interp
import scipy.optimize as scp_opt
# Local imports
from .baryons import DiskBulge, LinearDiskBulge, Sersic, ExpDisk
__all__ = ['KinematicOptions']
# LOGGER SETTINGS
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('DysmalPy')
logger.setLevel(logging.INFO)
import warnings
warnings.filterwarnings("ignore")
# ****** Kinematic Options Class **********
[docs]
class KinematicOptions:
r"""
Object for storing and applying kinematic corrections
Parameters
----------
adiabatic_contract : bool
If True, apply adiabatic contraction when deriving the rotational velocity
pressure_support : bool
If True, apply asymmetric drift correction when deriving the rotational velocity
pressure_support_type : {1, 2, 3}
Type of asymmetric drift correction. Default is 1 (following Burkert et al. 2010).
pressure_support_re : float
Effective radius in kpc to use for asymmetric drift calculation
pressure_support_n : float
Sersic index to use for asymmetric drift calculation
Notes
-----
**Adiabatic contraction** is applied following Burkert et al (2010) [1]_.
The recipe involves numerically solving these two implicit equations:
.. math::
v^2_{\rm circ}(r) = v^2_{\rm disk}(r) + v^2_{\rm DM}(r^{\prime})
r^{\prime} = r\left(1 + \frac{rv^2_{\rm disk}(r)}{r^{\prime} v^2_{\rm DM}(r^{\prime})}\right)
Adiabatic contraction then can only be applied if there is a halo and baryon component
in the `ModelSet`.
**Pressure support** (i.e., asymmetric drift) can be calculated in three different ways.
By default (`pressure_support_type=1`), the asymmetric drift derivation from
Burkert et al. (2010) [1]_, Equation (11) is applied
(assuming an exponential disk, with :math:`R_e=1.678r_d`):
.. math::
v^2_{\rm rot}(r) = v^2_{\rm circ} - 3.36 \sigma_0^2 \left(\frac{r}{R_e}\right)
Alternatively, for `pressure_support_type=2`, the Sersic index can be taken into account beginning from
Eq (9) of Burkert et al. (2010), so the asymmetric drift is then:
.. math::
v^2_{\rm rot}(r) = v^2_{\rm circ} - 2 \sigma_0^2 \frac{b_n}{n} \left(\frac{r}{R_e}\right)^{1/n}
Finally, for `pressure_support_type=3`, the asymmetric drift is determined using
the pressure gradient (assuming constant veloctiy dispersion :math:`\sigma_0`).
This approach allows for explicitly incorporating different gradients
:math:`d\ln{}\rho(r)/d\ln{}r` for different components (versus applying the disk geometry inherent in the
in the later parts of the Burkert et al. derivation).
For `pressure_support_type=3`, we follow Eq (3) of Burkert et al. (2010):
.. math::
v^2_{\rm rot}(r) = v^2_{\rm circ} + \sigma_0^2 \frac{d \ln \rho(r)}{d \ln r}
Warnings
--------
Adiabatic contraction can significantly increase the computation time for a `ModelSet`
to simulate a cube.
References
----------
.. [1] https://ui.adsabs.harvard.edu/abs/2010ApJ...725.2324B/abstract
"""
def __init__(self, adiabatic_contract=False, pressure_support=False,
pressure_support_type=1, pressure_support_re=None,
pressure_support_n=None):
self.adiabatic_contract = adiabatic_contract
self.pressure_support = pressure_support
self.pressure_support_re = pressure_support_re
self.pressure_support_n = pressure_support_n
self.pressure_support_type = pressure_support_type
[docs]
def apply_adiabatic_contract(self, model, r, vbaryon_sq, vhalo_sq,
compute_dm=False, return_vsq=False,
step1d = 0.2):
"""
Function that applies adiabatic contraction to a ModelSet
Parameters
----------
model : `ModelSet`
ModelSet that adiabatic contraction will be applied to
r : array
Radii in kpc
vbaryon_sq : array
Square of baryonic component circular velocities in km^2/s^2
vhalo_sq : array
Square of dark matter halo circular velocities in km^2/s^2
compute_dm : bool
If True, will return the adiabatically contracted halo velocities.
return_vsq : bool
If True, return square velocities instead of taking sqrt.
step1d : float
Step size in kpc to use during adiabatic contraction calculation
Returns
-------
vel : array
Total circular velocity corrected for adiabatic contraction in km/s
vhalo_adi : array
Dark matter halo circular velocities corrected for adiabatic contraction.
Only returned if `compute_dm` = True
"""
if self.adiabatic_contract:
# Define 1d radius array for calculation
#step1d = 0.2 # kpc
# r1d = np.arange(step1d, np.ceil(r.max()/step1d)*step1d+ step1d, step1d, dtype=np.float64)
try:
rmaxin = r.max()
except:
rmaxin = r
try:
r_ap = model._model_aperture_r()
except:
r_ap = 0.
rmax_calc = max(5.* r_ap, rmaxin)
# Wide enough radius range for full calculation -- out to 5*Reff, at least
r1d = np.arange(step1d, np.ceil(rmax_calc/step1d)*step1d+ step1d, step1d, dtype=np.float64)
rprime_all_1d = np.zeros(len(r1d))
# Calculate vhalo, vbaryon on this 1D radius array [note r is a 3D array]
vhalo1d_sq = r1d * 0.
vbaryon1d_sq = r1d * 0.
for cmp in model.mass_components:
if model.mass_components[cmp]:
mcomp = model.components[cmp]
cmpnt_v_sq = mcomp.vcirc_sq(r1d)
if mcomp._subtype == 'dark_matter':
vhalo1d_sq = vhalo1d_sq + cmpnt_v_sq
elif mcomp._subtype == 'baryonic':
vbaryon1d_sq = vbaryon1d_sq + cmpnt_v_sq
elif mcomp._subtype == 'combined':
raise ValueError('Adiabatic contraction cannot be turned on when'
'using a combined baryonic and halo mass model!')
else:
raise TypeError("{} mass model subtype not recognized"
" for {} component. Only 'dark_matter'"
" or 'baryonic' accepted.".format(mcomp._subtype, cmp))
converged = np.zeros(len(r1d), dtype=bool)
for i in range(len(r1d)):
try:
result = scp_opt.newton(_adiabatic_sq, r1d[i] + 1.,
args=(r1d[i], vhalo1d_sq, r1d, vbaryon1d_sq[i]),
maxiter=200)
converged[i] = True
except:
result = r1d[i]
converged[i] = False
# ------------------------------------------------------------------
# HACK TO FIX WEIRD AC: If too weird: toss it...
if ('adiabatic_contract_modify_small_values' in self.__dict__.keys()):
if self.adiabatic_contract_modify_small_values:
if ((result < 0.) | (result > 5*max(r1d))):
result = r1d[i]
converged[i] = False
# ------------------------------------------------------------------
rprime_all_1d[i] = result
###########################
vhalo_adi_interp_1d = scp_interp.interp1d(r1d, np.sqrt(vhalo1d_sq), fill_value='extrapolate', kind='linear')
# Just calculations:
if converged.sum() < len(r1d):
if converged.sum() >= 0.9 *len(r1d):
rprime_all_1d = rprime_all_1d[converged]
r1d = r1d[converged]
vhalo_adi_1d = vhalo_adi_interp_1d(rprime_all_1d)
vhalo_adi_interp_map_3d = scp_interp.interp1d(r1d, vhalo_adi_1d, fill_value='extrapolate', kind='linear')
vhalo_adi = vhalo_adi_interp_map_3d(r)
vel_sq = vhalo_adi ** 2 + vbaryon_sq
else:
vel_sq = vhalo_sq + vbaryon_sq
if return_vsq:
if compute_dm:
if self.adiabatic_contract:
return vel_sq, vhalo_adi ** 2
else:
return vel_sq, vhalo_sq
else:
return vel_sq
else:
vel = np.sqrt(vel_sq)
if compute_dm:
if self.adiabatic_contract:
return vel, vhalo_adi
else:
vhalo = np.sqrt(vhalo_sq)
return vel, vhalo
else:
return vel
[docs]
def apply_pressure_support(self, r, model, vel_sq, tracer=None):
"""
Function to apply asymmetric drift correction
Parameters
----------
r : float or array
Radius or radii at which to apply the correction
model : `ModelSet`
ModelSet for which the correction is applied to
vel_sq : float or array
Square of circular velocity in km^2/s^2
tracer : string
Name of the dynamical tracer (used to determine which is the appropriate dispersion profile).
Returns
-------
vel_sq : float or array
Square of rotational velocity with asymmetric drift applied, in km^2/s^2
"""
if self.pressure_support:
if tracer is None:
raise ValueError("Must specify 'tracer' to determine pressure support!")
vel_asymm_drift_sq = self.get_asymm_drift_profile(r, model, tracer=tracer)
vel_squared = vel_sq - vel_asymm_drift_sq
# if array:
try:
vel_squared[vel_squared < 0] = 0.
except:
# if float single value:
if vel_squared < 0:
vel_squared = 0.
#vel = np.sqrt(vel_squared)
return vel_squared
[docs]
def correct_for_pressure_support(self, r, model, vel_sq, tracer=None):
"""
Remove asymmetric drift effect from input velocities
Parameters
----------
r : float or array
Radius or radii in kpc
model : `ModelSet`
ModelSet the correction is applied to
vel_sq : float or array
Square of rotational velocities in km^2/s^2 from which to remove asymmetric drift
tracer : string
Name of the dynamical tracer (used to determine which is the appropriate dispersion profile).
Returns
-------
vel_sq : float or array
Square of circular velocity after asymmetric drift is removed, in km^2/s^2
"""
if self.pressure_support:
if tracer is None:
raise ValueError("Must specify 'tracer' to determine pressure support!")
vel_asymm_drift_sq = self.get_asymm_drift_profile(r, model, tracer=tracer)
vel_squared = vel_sq + vel_asymm_drift_sq
# if array:
try:
vel_squared[vel_squared < 0] = 0.
except:
# if float single value:
if (vel_squared < 0):
vel_squared = 0.
#vel = np.sqrt(vel_squared)
return vel_squared
[docs]
def get_asymm_drift_profile(self, r, model, tracer=None):
"""
Calculate the asymmetric drift correction
Parameters
----------
r : float or array
Radius or radii in kpc
model : `ModelSet`
ModelSet the correction is applied to
tracer : string
Name of the dynamical tracer (used to determine which is the appropriate dispersion profile).
Returns
-------
vel_asymm_drift_sq : float or array
Square velocity correction in km^2/s^2 associated with asymmetric drift
"""
if tracer is None:
raise ValueError("Must specify 'tracer' to determine pressure support!")
# Compatibility hack, to handle the changed galaxy structure
# (properties, not attributes for data[*], instrument
if 'pressure_support_type' not in self.__dict__.keys():
# Set to default if missing
self.pressure_support_type = 1
if 'pressure_support_n' not in self.__dict__.keys():
# Set to default if missing:
self.pressure_support_n = None
if (self.pressure_support_type == 1) | \
(self.pressure_support_type == 2):
pre = self.get_pressure_support_param(model, param='re')
if tracer not in model.dispersions.keys():
raise AttributeError("The dispersion profile for tracer={} not found!".format(tracer))
sigma = model.dispersions[tracer](r)
if self.pressure_support_type == 1:
# Pure exponential derivation // n = 1
vel_asymm_drift_sq = 3.36 * (r / pre) * sigma ** 2
elif self.pressure_support_type == 2:
# Modified derivation that takes into account n_disk / n
pn = self.get_pressure_support_param(model, param='n')
bn = scp_spec.gammaincinv(2. * pn, 0.5)
vel_asymm_drift_sq = 2. * (bn/pn) * np.power((r/pre), 1./pn) * sigma**2
elif self.pressure_support_type == 3:
# Direct calculation from sig0^2 dlnrho/dlnr:
# Assumes constant sig0 -- eg Eq 3, Burkert+10
# NEEDS TO BE JUST RHO FOR THE GAS:
dlnrhogas_dlnr = model.get_dlnrhogas_dlnr(r)
vel_asymm_drift_sq = - dlnrhogas_dlnr * sigma**2
return vel_asymm_drift_sq
[docs]
def get_pressure_support_param(self, model, param=None):
"""
Return model parameters needed for asymmetric drift calculation
Parameters
----------
model : `ModelSet`
ModelSet the correction is applied to
param : {'n', 're'}
Which parameter value to retrieve. Either the effective radius or Sersic index
Returns
-------
p_val : float
Parameter value
"""
p_altnames = {'n': 'n',
're': 'r_eff'}
if param not in ['n', 're']:
raise ValueError("get_pressure_support_param() only works for param='n', 're'")
paramkey = 'pressure_support_{}'.format(param)
p_altname = p_altnames[param]
if self.__dict__[paramkey] is None:
p_val = None
for cmp in model.mass_components:
if model.mass_components[cmp]:
mcomp = model.components[cmp]
if (mcomp._subtype == 'baryonic') | (mcomp._subtype == 'combined'):
if (isinstance(mcomp, DiskBulge)) | (isinstance(mcomp, LinearDiskBulge)):
p_val = mcomp.__getattribute__('{}_disk'.format(p_altname)).value
elif (isinstance(mcomp, Sersic)) | (isinstance(mcomp, ExpDisk)):
p_val = mcomp.__getattribute__('{}'.format(p_altname)).value
break
if p_val is None:
if param == 're':
logger.warning("No disk baryonic mass component found. Using "
"1 kpc as the pressure support effective"
" radius")
p_val = 1.0
elif param == 'n':
logger.warning("No disk baryonic mass component found. Using "
"n=1 as the pressure support Sersic index")
p_val = 1.0
else:
p_val = self.__dict__[paramkey]
return p_val
def _adiabatic(rprime, r_adi, adia_v_dm, adia_x_dm, adia_v_disk):
if rprime <= 0.:
rprime = 0.1
if rprime < adia_x_dm[1]:
rprime = adia_x_dm[1]
rprime_interp = scp_interp.interp1d(adia_x_dm, adia_v_dm,
fill_value="extrapolate")
result = (r_adi + r_adi * ((r_adi*adia_v_disk**2) /
(rprime*(rprime_interp(rprime))**2)) - rprime)
return result
def _adiabatic_sq(rprime, r_adi, adia_v_dm_sq, adia_x_dm, adia_v_disk_sq):
if rprime <= 0.:
rprime = 0.1
if rprime < adia_x_dm[1]:
rprime = adia_x_dm[1]
rprime_interp = scp_interp.interp1d(adia_x_dm, np.sqrt(adia_v_dm_sq),
fill_value="extrapolate")
result = (r_adi + r_adi * ((r_adi*adia_v_disk_sq) /
(rprime*(rprime_interp(rprime))**2)) - rprime)
return result