# coding=utf8
# Copyright (c) MPE/IR-Submm Group. See LICENSE.rst for license information.
#
# File containing classes for defining different instruments
# to "observe" a model galaxy.
from __future__ import (absolute_import, division, print_function,
unicode_literals)
# Standard library
import logging
# Third party imports
import numpy as np
# import astropy.convolution as apy_conv
from astropy.convolution.utils import discretize_model as _discretize_model
from astropy.modeling import models as apy_models
from scipy.signal import fftconvolve
import astropy.units as u
import astropy.constants as c
from radio_beam import Beam as _RBeam
__all__ = ["Instrument", "GaussianBeam", "DoubleBeam", "Moffat", "LSF"]
# CONSTANTS
sig_to_fwhm = 2.*np.sqrt(2.*np.log(2.))
# LOGGER SETTINGS
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('DysmalPy')
logger.setLevel(logging.INFO)
def _normalized_gaussian1D_kern(sigma_pixel):
x_size = int(np.ceil(8*sigma_pixel))
if x_size % 2 == 0:
x_size += 1
x_range = (-(int(x_size) - 1) // 2, (int(x_size) - 1) // 2 + 1)
return _discretize_model(apy_models.Gaussian1D(1.0 / (np.sqrt(2 * np.pi) * sigma_pixel),
0, sigma_pixel), x_range)
[docs]
class Instrument:
"""
Base class to define an instrument to observe a model galaxy.
Parameters
----------
beam : 2D array, `~dysmalpy.instrument.GaussianBeam`, `~dysmalpy.instrument.DoubleBeam`, or `~dysmalpy.instrument.Moffat`
Object describing the PSF of the instrument
beam_type : {`'analytic'`, `'empirical'`}
* `'analytic'` implies the beam is one of the provided beams in `dysmalpy`.
* `'empirical'` implies the provided beam is a 2D array that describes
the convolution kernel.
lsf : LSF object
Object describing the line spread function of the instrument
pixscale : float or `~astropy.units.Quantity`
Size of one pixel on the sky. If no units are used, arcseconds are assumed.
spec_type : {`'velocity'`, `'wavelength'`}
Whether the spectral axis is in velocity or wavelength space
spec_start : `~astropy.units.Quantity`
The value and unit of the first spectral channel
spec_step : `~astropy.units.Quantity`
The spacing of the spectral channels
nspec : int
Number of spectral channels
fov : tuple
x and y size of the FOV of the instrument in pixels
name : str
Name of the instrument
"""
def __init__(self, beam=None, beam_type=None, lsf=None, pixscale=None,
spec_type='velocity', spec_start=-1000*u.km/u.s,
spec_step=10*u.km/u.s, nspec=201,
line_center=None,
smoothing_type=None, smoothing_npix=None,
moment=False,
apertures=None,
integrate_cube=True, slit_width=None, slit_pa=None,
fov=None,
ndim=None,
name='Instrument'):
self.name = name
self.ndim = ndim
self.pixscale = pixscale
# Case of two beams: analytic and empirical: if beam_type==None, assume analytic
self.beam = beam
self.beam_type = beam_type
self._beam_kernel = None
self.lsf = lsf
self._lsf_kernel = None
self.fov = fov
self.spec_type = spec_type
self.spec_start = spec_start
self.spec_step = spec_step
self.nspec = nspec
# Wave spec options:
self.line_center = line_center
# 3D / 2D options:
self.smoothing_type = smoothing_type
self.smoothing_npix = smoothing_npix
# 2D / 1D options:
self.moment = moment
# 1D options:
self.apertures = apertures
# 0D options
self.integrate_cube = integrate_cube
self.slit_width = slit_width
self.slit_pa = slit_pa
[docs]
def convolve(self, cube, spec_center=None):
"""
Perform convolutions that are associated with this Instrument
This method convolves the input cube such that the output cube has the correct
PSF and/or LSF as given by the Instrument's beam and lsf. The cube is first convolved to
the PSF, then to the LSF.
Parameters
----------
cube : 3D array
Input model cube to convolve with the instrument PSF and/or LSF
The first dimension is assumed to correspond to the spectral dimension.
spec_center : `~astropy.units.Quantity`
Central wavelength to define the conversion from velocity to wavelength
Only necessary if the spectral type is 'wavelength' and using an LSF
Returns
-------
cube : 3D array
New model cube after applying the PSF and LSF convolution
"""
if self.beam is None and self.lsf is None:
# Nothing to do if a PSF and LSF aren't set for the instrument
logger.warning("No convolution being performed since PSF and LSF "
"haven't been set!")
return cube
elif self.lsf is None:
cube_conv = self.convolve_with_beam(cube)
elif self.beam is None:
cube_conv = self.convolve_with_lsf(cube, spec_center=spec_center)
else:
# Separate convolution steps: note this is FASTER than doing a composite convolution
cube_conv_beam = self.convolve_with_beam(cube)
cube_conv = self.convolve_with_lsf(cube_conv_beam, spec_center=spec_center)
return cube_conv
[docs]
def convolve_with_lsf(self, cube, spec_center=None):
"""
Performs line broadening due to the line spread function
Parameters
----------
cube : 3D array
Input model cube to convolve with the instrument LSF
The first dimension is assumed to correspond to the spectral dimension.
spec_center : `~astropy.units.Quantity`
Central wavelength to define the conversion from velocity to wavelength
Only necessary if the instrument spectral type is 'wavelength'
Returns
-------
cube : 3D array
New model cube after applying the LSF convolution
"""
if self._lsf_kernel is None:
self.set_lsf_kernel(spec_center=spec_center)
cube_conv = fftconvolve(cube.copy(), self._lsf_kernel.copy(), mode='same')
return cube_conv
[docs]
def convolve_with_beam(self, cube):
"""
Performs spatial broadening due to the point spread function
Parameters
----------
cube : 3D array
Input model cube to convolve with the instrument PSF
The first dimension is assumed to correspond to the spectral dimension.
Returns
-------
cube : 3D array
New model cube after applying the PSF convolution
"""
if self.pixscale is None:
raise ValueError("Pixelscale for this instrument has not "
"been set yet. Can't convolve with beam.")
if self._beam_kernel is None:
self.set_beam_kernel()
cube_conv = fftconvolve(cube.copy(), self._beam_kernel.copy(), mode='same')
return cube_conv
def _clear_kernels(self):
"""
Delete pre-computed kernels
"""
self._beam_kernel = None
self._lsf_kernel = None
[docs]
def set_beam_kernel(self, support_scaling=12.):
"""
Calculate and store the PSF convolution kernel
Parameters
----------
support_scaling : int
The amount to scale the stddev to determine the
size of the kernel
"""
if (self.beam_type == 'analytic') | (self.beam_type == None):
if isinstance(self.beam, GaussianBeam):
kernel = self.beam.as_kernel(self.pixscale, support_scaling=support_scaling)
kern2D = kernel.array
else:
kernel = self.beam.as_kernel(self.pixscale, support_scaling=support_scaling)
if isinstance(self.beam, DoubleBeam):
kern2D = kernel
elif isinstance(self.beam, Moffat):
kern2D = kernel
elif self.beam_type == 'empirical':
if len(self.beam.shape) == 1:
raise ValueError("1D beam/PSF not currently supported")
kern2D = self.beam.copy()
kern2D[~np.isfinite(kern2D)] = 0. # Replace NaNs/non-finite with zero
kern2D[kern2D<0.] = 0. # Replace < 0 with zero:
kern2D /= np.sum(kern2D[np.isfinite(kern2D)]) # need to normalize
kern3D = np.zeros(shape=(1, kern2D.shape[0], kern2D.shape[1],))
kern3D[0, :, :] = kern2D
self._beam_kernel = kern3D
[docs]
def set_lsf_kernel(self, spec_center=None):
"""
Calculate and store the LSF convolution kernel
Parameters
----------
spec_center : `~astropy.units.Quantity`, optional
Central wavelength that corresponds to 0 velocity
Only necessary if `Instrument.spec_type = 'wavelength'`
and `Instrument.spec_center` hasn't been set.
"""
if (self.spec_step is None):
raise ValueError("Spectral step not defined.")
elif (self.spec_step is not None) and (self.spec_type == 'velocity'):
kernel = self.lsf.as_velocity_kernel(self.spec_step)
elif (self.spec_step is not None) and (self.spec_type == 'wavelength'):
if (self.line_center is None) and (spec_center is None):
raise ValueError("Center wavelength not defined in either "
"the instrument or call to convolve.")
elif (spec_center is not None):
kernel = self.lsf.as_wave_kernel(self.spec_step, spec_center)
else:
kernel = self.lsf.as_wave_kernel(self.spec_step, self.line_center)
#kern1D = kernel.array
kern1D = kernel
kern3D = np.zeros(shape=(kern1D.shape[0], 1, 1,))
kern3D[:, 0, 0] = kern1D
self._lsf_kernel = kern3D
@property
def beam(self):
return self._beam
@beam.setter
def beam(self, new_beam):
if isinstance(new_beam, GaussianBeam) | isinstance(new_beam, Moffat) | isinstance(new_beam, DoubleBeam):
self._beam = new_beam
elif new_beam is None:
self._beam = None
else:
raise TypeError("Beam must be an instance of "
"instrument.GaussianBeam, instrument.Moffat, or instrument.DoubleBeam")
@property
def lsf(self):
return self._lsf
@lsf.setter
def lsf(self, new_lsf):
self._lsf = new_lsf
@property
def pixscale(self):
return self._pixscale
@pixscale.setter
def pixscale(self, value):
if value is None:
self._pixscale = value
elif not isinstance(value, u.Quantity):
logger.warning("No units on pixscale. Assuming arcseconds.")
self._pixscale = value*u.arcsec
else:
if (u.arcsec).is_equivalent(value):
self._pixscale = value
else:
raise u.UnitsError("pixscale not in equivalent units to "
"arcseconds.")
@property
def spec_step(self):
return self._spec_step
@spec_step.setter
def spec_step(self, value):
if value is None:
self._spec_step = value
elif not isinstance(value, u.Quantity):
logger.warning("No units on spec_step. Assuming km/s.")
self._spec_step = value * u.km/u.s
else:
self._spec_step = value
@property
def spec_center(self):
if (self.spec_start is None) | (self.nspec is None):
return None
else:
return (self.spec_start + np.round(self.nspec/2)*self.spec_step)
[docs]
class GaussianBeam(_RBeam):
"""
Re-definition of radio_beam.Beam to allow it to work with copy.deepcopy and copy.copy.
PA: angle to beam major axis, in deg E of N (0 is N, +90 is E).
"""
def __deepcopy__(self, memo):
self2 = type(self)(major=self._major, minor=self._minor, pa=self._pa, area=None,
default_unit=self.default_unit, meta=self.meta)
self2.__dict__.update(self.__dict__)
return self2
def __copy__(self):
self2 = type(self)(major=self._major, minor=self._minor, pa=self._pa, area=None,
default_unit=self.default_unit, meta=self.meta)
self2.__dict__.update(self.__dict__)
return self2
[docs]
class DoubleBeam:
"""
Beam object that is the superposition of two Gaussian Beams
Parameters
----------
major1 : `~astropy.units.Quantity`
FWHM along the major axis of the first Gaussian beam
minor1 : `~astropy.units.Quantity`
FWHM along the minor axis of the first Gaussian beam
pa1 : `~astropy.units.Quantity`
Position angle of the first Gaussian beam:
angle to beam major axis, in deg E of N (0 is N, +90 is E).
scale1 : float
Flux scaling for the first Gaussian beam
major2 : `~astropy.units.Quantity`
FWHM along the major axis of the second Gaussian beam
minor2 : `~astropy.units.Quantity`
FWHM along the minor axis of the second Gaussian beam
pa2 : `~astropy.units.Quantity`
Position angle of the second Gaussian beam:
angle to beam major axis, in deg E of N (0 is N, +90 is E).
scale2 : float
Flux scaling for the second Gaussian beam
"""
def __init__(self, major1=None, minor1=None, pa1=None, scale1=None,
major2=None, minor2=None, pa2=None, scale2=None):
if (major1 is None) or (major2 is None):
raise ValueError('Need to specify at least the major axis FWHM of each beam component.')
if minor1 is None:
minor1 = major1
if minor2 is None:
minor2 = major2
if pa1 is None:
pa1 = 0.*u.deg
if pa2 is None:
pa2 = 0.*u.deg
if scale1 is None:
scale1 = 1.0
if scale2 is None:
scale2 = scale1
self.beam1 = GaussianBeam(major=major1, minor=minor1, pa=pa1)
self.beam2 = GaussianBeam(major=major2, minor=minor2, pa=pa2)
self._scale1 = scale1
self._scale2 = scale2
[docs]
def as_kernel(self, pixscale, support_scaling=None):
"""
Calculate the convolution kernel for the DoubleBeam
Parameters
----------
pixscale : `~astropy.units.Quantity`
Pixel scale of image that will be convolved
support_scaling : int
The amount to scale the stddev to determine the
size of the kernel
Returns
-------
kernel_total : 2D array
Convolution kernel for the DoubleBeam
"""
kernel1 = self.beam1.as_kernel(pixscale, support_scaling=support_scaling)
kernel2 = self.beam2.as_kernel(pixscale, support_scaling=support_scaling)
if kernel1.shape[0] > kernel2.shape[1]:
xsize = kernel1.shape[0]
ysize= kernel1.shape[1]
kernel2 = self.beam2.as_kernel(pixscale, x_size=xsize, y_size=ysize)
elif kernel1.shape[0] < kernel2.shape[1]:
xsize = kernel2.shape[0]
ysize = kernel2.shape[1]
kernel1 = self.beam1.as_kernel(pixscale, x_size=xsize, y_size=ysize)
# Combine the kernels
kernel_total = 10. * (kernel1.array * self._scale1 / np.sum(kernel1.array) +
kernel2.array * self._scale2 / np.sum(kernel2.array))
return kernel_total
def __deepcopy__(self, memo):
self2 = type(self)(major1=self.beam1._major, major2=self.beam2._major, minor1=self.beam1._minor,
minor2=self.beam2._minor, pa1=self.beam1._pa, pa2=self.beam2._pa,
scale1=self._scale1, scale2=self._scale2)
self2.__dict__.update(self.__dict__)
return self2
def __copy__(self):
self2 = type(self)(major1=self.beam1._major, major2=self.beam2._major, minor1=self.beam1._minor,
minor2=self.beam2._minor, pa1=self.beam1._pa, pa2=self.beam2._pa,
scale1=self._scale1, scale2=self._scale2)
self2.__dict__.update(self.__dict__)
return self2
[docs]
class Moffat(object):
"""
Object describing a Moffat PSF
Parameters
----------
major_fwhm : `~astropy.units.Quantity`
FWHM of the Moffat PSF along the major axis
minor_fwhm : `~astropy.units.Quantity`
FWHM of the Moffat PSF along the minor axis
pa : `~astropy.units.Quantity`
Position angle of major axis of the Moffat PSF, in deg E of N (0 is N, +90 is E).
beta : float
beta parameter of the Moffat PSF
padfac : int
The amount to scale the stddev to determine the
size of the kernel
"""
def __init__(self, major_fwhm=None, minor_fwhm=None, pa=None, beta=None, padfac=12.):
if (major_fwhm is None) | (beta is None):
raise ValueError('Need to specify at least the major axis FWHM + beta of beam.')
if minor_fwhm is None:
minor_fwhm = major_fwhm
if (major_fwhm != minor_fwhm) & (pa is None):
raise ValueError("Need to specifiy 'pa' to have elliptical PSF!")
if pa is None:
pa = 0.*u.deg
self.major_fwhm = major_fwhm
self.minor_fwhm = minor_fwhm
self.pa = pa
self.beta = beta
self.alpha = self.major_fwhm/(2.*np.sqrt(np.power(2., 1./np.float(self.beta)) - 1 ))
self.padfac = padfac
[docs]
def as_kernel(self, pixscale, support_scaling=None):
"""
Calculate the convolution kernel for the Moffat PSF
Parameters
----------
pixscale : `~astropy.units.Quantity`
Pixel scale of image that will be convolved
support_scaling : int
The amount to scale the stddev to determine the
size of the kernel
Returns
-------
kernel : 2D array
Convolution kernel for the Moffat PSF
"""
try:
pixscale = pixscale.to(self.major_fwhm.unit)
pixscale = pixscale.value
major_fwhm = self.major_fwhm.value
minor_fwhm = self.minor_fwhm.value
pa = self.pa.value
alpha = self.alpha.value/pixscale
except:
pixscale = pixscale.to(self.major_fwhm.unit)
pixscale = pixscale.value
major_fwhm = self.major_fwhm
minor_fwhm = self.minor_fwhm
pa = self.pa
alpha = self.alpha/pixscale
if support_scaling is not None:
padfac = support_scaling
else:
padfac = self.padfac
# Npix: rounded std dev[ in pix] * padfac * 2 -- working in DIAMETER
# Factor of 1./0.7: For beta~2.5, Moffat FWHM ~ 0.7*Gaus FWHM
# -> add extra padding so the Moffat window
# isn't much smaller than similar Gaussian PSF.
npix = np.int(np.ceil(major_fwhm/pixscale/2.35 * 2 * 1./0.7 * padfac))
if npix % 2 == 0:
npix += 1
# Arrays
y, x = np.indices((npix, npix), dtype=float)
x -= (npix-1)/2.
y -= (npix-1)/2.
cost = np.cos((90.+pa)*np.pi/180.)
sint = np.sin((90.+pa)*np.pi/180.)
xp = cost*x + sint*y
yp = -sint*x + cost*y
qtmp = minor_fwhm / major_fwhm
r = np.sqrt(xp**2 + (yp/qtmp)**2)
kernel = (self.beta-1.)/(np.pi * alpha**2) * np.power( (1. + (r/alpha)**2 ), -1.*self.beta )
return kernel
[docs]
class LSF(u.Quantity):
"""
An object to handle line spread functions.
"""
def __new__(cls, dispersion=None, default_unit=u.km/u.s, meta=None):
"""
Create a new Gaussian Line Spread Function
Parameters
----------
dispersion : :class:`~astropy.units.Quantity` with speed equivalency
The standard deviation of the Gaussian LSF.
default_unit : :class:`~astropy.units.Unit`
The unit to impose on dispersion if they are specified as floats
"""
# TODO: Allow for wavelength dispersion to be specified
# error checking
# give specified values priority
if dispersion is not None:
if (u.km/u.s).is_equivalent(dispersion):
dispersion = dispersion
else:
logger.warning("Assuming dispersion has been specified in "
"{}.".format(default_unit))
dispersion = dispersion * default_unit
self = super(LSF, cls).__new__(cls, dispersion.value, default_unit)
self._dispersion = dispersion
self.default_unit = default_unit
if meta is None:
self.meta = {}
elif isinstance(meta, dict):
self.meta = meta
else:
raise TypeError("metadata must be a dictionary")
return self
def __repr__(self):
return "LSF: Vel. Disp. = {0}".format(
self.dispersion.to(self.default_unit))
def __str__(self):
return self.__repr__()
@property
def dispersion(self):
return self._dispersion
[docs]
def vel_to_lambda(self, wave):
"""
Convert from velocity dispersion to wavelength dispersion for
a given central wavelength.
Parameters
----------
wave : `~astropy.units.Quantity`
Central wavelength to use in conversion from velocity to wavelength
Returns
-------
wdisp : `~astropy.units.Quantity`
Dispersion of LSF in wavelength units
"""
if not isinstance(wave, u.Quantity):
raise TypeError("wave must be a Quantity object. "
"Try 'wave*u.Angstrom' or another equivalent unit.")
return (self.dispersion/c.c.to(self.dispersion.unit))*wave
[docs]
def as_velocity_kernel(self, velstep, **kwargs):
"""
Return a Gaussian convolution kernel in velocity space
Parameters
----------
velstep : `~astropy.units.Quantity`
Step size in velocity of one spectral channel
Returns
-------
vel_kern : 1D array
Convolution kernel for the LSF in velocity space
"""
sigma_pixel = (self.dispersion.value /
velstep.to(self.dispersion.unit).value)
#return apy_conv.Gaussian1DKernel(sigma_pixel, **kwargs)
# Astropy kernel DOES NOT TRUNCATE AS DESIRED:
# Instead, use similar size span, but keep as a normalized Gaussian:
return _normalized_gaussian1D_kern(sigma_pixel)
[docs]
def as_wave_kernel(self, wavestep, wavecenter, **kwargs):
"""
Return a Gaussian convolution kernel in wavelength space
Parameters
----------
wavestep : `~astropy.units.Quantity`
Step size in wavelength of one spectral channel
wavecenter : `~astropy.units.Quantity`
Central wavelength used to convert from velocity to wavelength
Returns
-------
wave_kern : 1D array
Convolution kernal for the LSF in wavelength space
"""
sigma_pixel = self.vel_to_lambda(wavecenter).value/wavestep.value
#return apy_conv.Gaussian1DKernel(sigma_pixel, **kwargs)
# Astropy kernel DOES NOT TRUNCATE AS DESIRED:
# Instead, use similar size span, but keep as a normalized Gaussian:
return _normalized_gaussian1D_kern(sigma_pixel)
def __deepcopy__(self, memo):
self2 = type(self)(dispersion=self._dispersion, default_unit=self.default_unit,
meta=self.meta)
self2.__dict__.update(self.__dict__)
return self2
def __copy__(self):
self2 = type(self)(dispersion=self._dispersion, default_unit=self.default_unit,
meta=self.meta)
self2.__dict__.update(self.__dict__)
return self2