"""
2D Disc models
==============
Classes: Rosenfeld2d, General2d, Velocity, Intensity, Cube, Tools
"""
#TODO in show(): Perhaps use text labels on line profiles to distinguish profiles for more than 2 cubes.
#TODO in make_model(): Find a smart way to detect and pass only the coords needed by a prop attribute.
#TODO in run_mcmc(): Enable an arg to allow the user see the position of parameter walkers every 'arg' steps.
#TODO in General2d: Implement irregular grids (see e.g. meshio from nschloe on github) for the disc grid.
#TODO in General2d: Compute props in the interpolated grid (not in the original grid) to avoid interpolation of props and save time.
#TODO in General2d: Allow the lower surface to have independent intensity and line width parametrisations.
#TODO in General2d: Implement pressure support term
#TODO in make_model(): Allow for warped emitting surfaces, check notes for ideas as to how to solve for multiple intersections between l.o.s and emission surface.
#TODO in __main__ file: show intro message when python -m disc2d
#TODO in run_mcmc(): use get() methods instead of allowing the user to use self obj attributes.
#TODO in make_model(): Allow R_disc to be a free parameter.
#TODO in make_model(): Enable 3D velocities too when subpixel algorithm is used
#TODO in v1.0: migrate to astropy units
#TODO in make_model(): Save/load bestfit/input parameters in json files. These should store relevant info in separate dicts (e.g. nwalkers, attribute functions).
#TODO in run_mcmc(): Implement other minimisation kernels (i.e. Delta_v). Only one kernel currently: difference of intensities on each pixel, on each channel.
#TODO where relevant: see comments next to get_sky_from_disc_coords function appearences.
from __future__ import print_function
from ..utils import constants as sfc
from ..utils import units as sfu
from astropy.convolution import Gaussian2DKernel, convolve
from scipy.interpolate import griddata, interp1d
from scipy.special import ellipk, ellipe
from scipy.optimize import curve_fit
from scipy.integrate import quad
import matplotlib.patches as patches
import matplotlib.pyplot as plt
from matplotlib import ticker
import numpy as np
import matplotlib
import functools
import itertools
import warnings
import numbers
import pprint
import copy
import time
import sys
import os
from multiprocessing import Pool
os.environ["OMP_NUM_THREADS"] = "1"
try:
import termtables
found_termtables = True
except ImportError:
print ("\n*** For nicer outputs we recommend installing 'termtables' by typing in terminal: pip install termtables ***")
found_termtables = False
#warnings.filterwarnings("error")
__all__ = ['Cube', 'Tools', 'Intensity', 'Velocity', 'General2d', 'Rosenfeld2d']
path_file = os.path.dirname(os.path.realpath(__file__))+'/'
"""
matplotlib.rcParams['font.family'] = 'monospace'
matplotlib.rcParams['font.weight'] = 'normal'
matplotlib.rcParams['lines.linewidth'] = 1.5
matplotlib.rcParams['axes.linewidth'] = 3.0
matplotlib.rcParams['xtick.major.width']=1.6
matplotlib.rcParams['ytick.major.width']=1.6
matplotlib.rc('font', size=MEDIUM_SIZE) # controls default text sizes
matplotlib.rc('axes', titlesize=MEDIUM_SIZE) # fontsize of axes title
matplotlib.rc('axes', labelsize=MEDIUM_SIZE) # fontsize of x and y labels
matplotlib.rc('xtick', labelsize=MEDIUM_SIZE-2) # fontsize of y tick labels
matplotlib.rc('ytick', labelsize=MEDIUM_SIZE-2) # fontsize of x tick labels
matplotlib.rc('legend', fontsize=SMALL_SIZE-1) # legend fontsize
matplotlib.rc('figure', titlesize=BIGGER_SIZE) # fontsize of figure title
params = {'xtick.major.size': 6.5,
'ytick.major.size': 6.5
}
matplotlib.rcParams.update(params)
"""
SMALL_SIZE = 10
MEDIUM_SIZE = 15
BIGGER_SIZE = 22
hypot_func = lambda x,y: np.sqrt(x**2 + y**2) #Slightly faster than np.hypot<np.linalg.norm<scipydistance. Checked precision up to au**2 orders and seemed ok.
class InputError(Exception):
"""Exception raised for errors in the input.
Attributes:
expression -- input expression in which the error occurred
message -- explanation of the error
"""
def __init__(self, expression, message):
self.expression = expression
self.message = message
def __str__(self):
return '%s --> %s'%(self.expression, self.message)
class Residuals:
pass
class PlotTools:
@staticmethod
def mod_nticks_cbars(cbars, nbins=5):
for cb in cbars:
cb.locator = ticker.MaxNLocator(nbins=nbins)
cb.update_ticks()
@staticmethod
def mod_major_ticks(ax, axis='both', nbins=6):
ax.locator_params(axis=axis, nbins=nbins)
@staticmethod
def mod_minor_ticks(ax):
ax.minorticks_on()
ax.xaxis.set_minor_locator(ticker.AutoMinorLocator(2)) #1 minor tick per major interval
ax.yaxis.set_minor_locator(ticker.AutoMinorLocator(2))
@classmethod
def make_up_ax(cls, ax, xlims=(None, None), ylims=(None, None),
mod_minor=True, mod_major=True, **kwargs_tick_params):
kwargs_t = dict(labeltop=True, labelbottom=False, top=True, right=True, which='both', direction='in')
kwargs_t.update(kwargs_tick_params)
if mod_major: cls.mod_major_ticks(ax)
if mod_minor: cls.mod_minor_ticks(ax)
ax.set_xlim(*xlims)
ax.set_ylim(*ylims)
ax.tick_params(**kwargs_t)
@staticmethod
def truncate_colormap(cmap, minval=0.0, maxval=1.0, n=256):
new_cmap = matplotlib.colors.LinearSegmentedColormap.from_list(
'trunc({n},{a:.2f},{b:.2f})'.format(n=cmap.name, a=minval, b=maxval),
cmap(np.linspace(minval, maxval, n)))
return new_cmap
@staticmethod
def get_cmap_from_color(color, lev=3):
cmap = matplotlib.colors.to_rgba(color)
newcolors = np.tile(cmap, lev).reshape(lev,4) #Repeats the colour lev times
newcolors[:,-1] = np.linspace(0.25, 0.95, lev) #Modifies alpha only
new_cmap = ListedColormap(newcolors)
return new_cmap
@staticmethod
def mask_cmap_interval(cmap, cmap_lims, mask_lims, mask_color=np.ones(4), append=False):
if isinstance(cmap, str): cmap = plt.get_cmap(cmap)
cmap0, cmap1 = cmap_lims
mask0, mask1 = mask_lims
c0 = (mask0-cmap0)/(cmap1-cmap0)
c1 = (mask1-cmap0)/(cmap1-cmap0)
id0 = int(round(c0*(cmap.N)))
id1 = int(round(c1*(cmap.N)))
new_cmap = copy.copy(cmap)
new_cmap._init()
"""#The following does not work, plt does not know where to locate the newly added colorss
if append:
mask_color_arr = np.broadcast_to(mask_color, (id1-id0, 4))
new_cmap._lut = np.insert(new_cmap._lut, id0, mask_color_arr, axis=0)
new_cmap.N = cmap.N + id1-id0
#Next line redoes the continuous linearsegmented colormap, thus the masked color block is reduced to a single color
#new_cmap = new_cmap._resample(new_cmap.N)
"""
new_cmap._lut[id0:id1,:] = mask_color
return new_cmap
@staticmethod
def get_continuous_cmap(hex_list, float_list=None):
"""
Taken from https://github.com/KerryHalupka/custom_colormap
creates and returns a color map that can be used in heat map figures.
If float_list is not provided, colour map graduates linearly between each color in hex_list.
If float_list is provided, each color in hex_list is mapped to the respective location in float_list.
Parameters
----------
hex_list: list of hex code strings
float_list: list of floats between 0 and 1, same length as hex_list. Must start with 0 and end with 1.
Returns
----------
matplotlib cmap
Examples
----------
fig, ax = plt.subplots(1,1)
hex_list = ['#0091ad', '#fffffc', '#ffd166']
x, y = np.mgrid[-5:5:0.05, -5:5:0.05]
z = (np.sqrt(x**2 + y**2) + np.sin(x**2 + y**2))
im = ax.imshow(z, cmap=get_continuous_cmap(hex_list))
fig.colorbar(im)
ax.yaxis.set_major_locator(plt.NullLocator()) # remove y axis ticks
ax.xaxis.set_major_locator(plt.NullLocator()) # remove x axis ticks
plt.show()
"""
rgb_list = [matplotlib.colors.to_rgb(i) for i in hex_list]
if float_list is None: float_list = np.linspace(0,1,len(rgb_list))
cdict = dict()
for num, col in enumerate(['red', 'green', 'blue']):
col_list = [[float_list[i], rgb_list[i][num], rgb_list[i][num]] for i in range(len(float_list))]
cdict[col] = col_list
cmap_new = matplotlib.colors.LinearSegmentedColormap('my_cmp', segmentdata=cdict, N=256)
return cmap_new
@staticmethod
def append_stddev_panel(ax, prop, weights=None, hist=False, fit_gauss_hist=False): #attach significance panel to ax, based on dist. of points prop
gauss = lambda x, A, mu, sigma: A*np.exp(-(x-mu)**2/(2.*sigma**2))
ax1_ylims = ax[-2].get_ylim()
for axi in ax[:-1]: axi.tick_params(which='both', right=False, labelright=False)
ax[-1].tick_params(which='both', top=False, bottom=False, labelbottom=False,
left=False, labelleft=False, right=True, labelright=True)
ax[-1].yaxis.set_label_position('right')
ax[-1].spines['left'].set_color('0.6')
ax[-1].spines['left'].set_linewidth(3.5)
if weights is not None:
prop_mean = np.sum(weights*prop)/np.sum(weights)
prop_std = Tools.weighted_std(prop, weights, weighted_mean=prop_mean)
else:
prop_mean = np.mean(prop)
prop_std = np.std(prop)
max_y = 1.0
if hist:
n, bins, patches = ax[-1].hist(prop, bins=2*int(round(len(prop)**(1/3.)))-1, orientation='horizontal',
density=True, linewidth=1.5, facecolor='0.95', edgecolor='k', alpha=1.0)
max_y = np.max(n)
if fit_gauss_hist: #Fit Gaussian to histogram to compare against data distribution
coeff, var_matrix = curve_fit(gauss, 0.5*(bins[1:]+bins[:-1]), n, p0=[max_y, prop_mean, prop_std])
prop_x = np.linspace(prop_mean-4*prop_std, prop_mean+4*prop_std, 100)
prop_y = gauss(prop_x, *coeff)
ax[-1].plot(prop_y, prop_x, color='tomato', ls='--', lw=2.0)
prop_x = np.linspace(prop_mean-4*prop_std, prop_mean+4*prop_std, 100)
prop_pars = [max_y, prop_mean, prop_std]
prop_y = gauss(prop_x, *prop_pars)
ax[-1].plot(prop_y, prop_x, color='limegreen', lw=3.5)
ax[-1].set_xlim(-0.2*max_y, 1.2*max_y)
#ax[-1].plot([-0.2, 1.0], [prop_mean]*2, color='0.6', lw=2.5)
#for axi in ax[:-1]: axi.axhline(prop_mean, color='0.6', lw=2.5)
for i in range(0,4):
prop_stdi = prop_mean+i*prop_std
gauss_prop_stdi = gauss(prop_stdi, *prop_pars)
ax[-1].plot([-0.2*max_y, gauss_prop_stdi], [prop_stdi]*2, color='0.6', ls=':', lw=2.)
for axi in ax[:-1]: axi.axhline(prop_stdi, color='0.6', ls=':', lw=2.)
if prop_stdi < ax1_ylims[-1] and i>0:
ax[-1].text(gauss_prop_stdi+0.2*max_y, prop_stdi, r'%d$\sigma$'%i,
fontsize=14, ha='center', va='center', rotation=-90)
for axi in ax: axi.set_ylim(*ax1_ylims)
class Canvas3d:
pass
class Contours(PlotTools):
@staticmethod
def emission_surface(ax, R, phi, R_lev=None, phi_lev=None, extent=None,
proj_offset=None, X=None, Y=None, which='both',
kwargs_R={}, kwargs_phi={}):
kwargs_phif = dict(linestyles=':', linewidths=1.0, colors='k')
kwargs_Rf = dict(linewidths=1.4, colors='k')
kwargs_phif.update(kwargs_phi)
kwargs_Rf.update(kwargs_R)
near_nonan = ~np.isnan(R['upper'])
Rmax = np.max(R['upper'][near_nonan])
if extent is None:
extent = np.array([-Rmax, Rmax, -Rmax, Rmax])/sfu.au
kwargs_phif.update({'extent': extent})
kwargs_Rf.update({'extent': extent})
if R_lev is None: R_lev = np.linspace(0.06, 0.97, 4)*Rmax
else: R_lev = np.sort(R_lev)
if phi_lev is None: phi_lev = np.linspace(-np.pi*0.95, np.pi, 11, endpoint=False)
#Splitting phi into pos and neg to try and avoid ugly contours close to -pi and pi
phi_lev_neg = phi_lev[phi_lev<0]
phi_lev_pos = phi_lev[phi_lev>=0]
phi_neg_near = np.where((phi['upper']<0) & (R['upper']>R_lev[0]) & (R['upper']<R_lev[-1]), phi['upper'], np.nan)
phi_pos_near = np.where((phi['upper']>0) & (R['upper']>R_lev[0]) & (R['upper']<R_lev[-1]), phi['upper'], np.nan)
phi_neg_far = np.where((phi['lower']<0) & (R['lower']>R_lev[0]) & (R['lower']<R_lev[-1]), phi['lower'], np.nan)
phi_pos_far = np.where((phi['lower']>0) & (R['lower']>R_lev[0]) & (R['lower']<R_lev[-1]), phi['lower'], np.nan)
if proj_offset is not None: #For 3d projections
ax.contour(X, Y, R['upper'], offset=proj_offset, levels=R_lev, **kwargs_Rf)
ax.contour(X, Y, np.where(near_nonan, np.nan, R['lower']), offset=proj_offset, levels=R_lev, **kwargs_Rf)
ax.contour(X, Y, phi_pos_near, offset=proj_offset, levels=phi_lev_pos, **kwargs_phif)
ax.contour(X, Y, phi_neg_near, offset=proj_offset, levels=phi_lev_neg, **kwargs_phif)
ax.contour(X, Y, np.where(near_nonan, np.nan, phi_pos_far), offset=proj_offset, levels=phi_lev_pos, **kwargs_phif)
ax.contour(X, Y, np.where(near_nonan, np.nan, phi_neg_far), offset=proj_offset, levels=phi_lev_neg, **kwargs_phif)
else:
if which=='both':
ax.contour(R['upper'], levels=R_lev, **kwargs_Rf)
ax.contour(np.where(near_nonan, np.nan, R['lower']), levels=R_lev, **kwargs_Rf)
ax.contour(phi_pos_near, levels=phi_lev_pos, **kwargs_phif)
ax.contour(phi_neg_near, levels=phi_lev_neg, **kwargs_phif)
ax.contour(np.where(near_nonan, np.nan, phi_pos_far), levels=phi_lev_pos, **kwargs_phif)
ax.contour(np.where(near_nonan, np.nan, phi_neg_far), levels=phi_lev_neg, **kwargs_phif)
elif which=='upper':
ax.contour(R['upper'], levels=R_lev, **kwargs_Rf)
ax.contour(phi_pos_near, levels=phi_lev_pos, **kwargs_phif)
ax.contour(phi_neg_near, levels=phi_lev_neg, **kwargs_phif)
elif which=='lower':
ax.contour(R['lower'], levels=R_lev, **kwargs_Rf)
ax.contour(phi_pos_far, levels=phi_lev_pos, **kwargs_phif)
ax.contour(phi_neg_far, levels=phi_lev_neg, **kwargs_phif)
#The following method can be optimised if the contour finding process is separated from the plotting
# by returning coords_list and inds_cont first, which will allow the user use the same set of contours to plot different props.
@staticmethod
def prop_along_coords(ax, prop, coords, coord_ref, coord_levels,
ax2=None, X=None, Y=None,
PA=0,
acc_threshold=0.05,
max_prop_threshold=np.inf,
color_bounds=[np.pi/5, np.pi/2],
colors=['k', 'dodgerblue', (0,1,0), (1,0,0)],
lws=[2, 0.5, 0.2, 0.2], lw_ax2_factor=1,
subtract_quadrants=False,
subtract_func=np.subtract):
"""
Compute radial/azimuthal contours according to the model disc geometry
to get and plot information from the input 2D property ``prop``.
Parameters
----------
ax : `matplotlib.axes` instance, optional
ax instance to make the plot.
prop : array_like, shape (nx, ny)
Input 2D field to extract information along the computed contours.
coords : list, shape (2,)
coords[0] [array_like, shape (nx, ny)], is the coordinate 2D map onto which contours will be computed using the input ``coord_levels``;
coords[1] [array_like, shape (nx, ny)], is the coordinate 2D map against which the ``prop`` values are plotted. The output plot is prop vs coords[1]
coord_ref : scalar
Reference coordinate (referred to ``coords[0]``) to highlight among the other contours.
coord_levels : array_like, shape (nlevels,)
Contour levels to be extracted from ``coords[0]``.
ax2 : `matplotlib.axes` instance (or list of instances), optional
Additional ax(s) instance(s) to plot the location of contours in the disc.
If provided, ``X`` and ``Y`` must also be passed.
X : array_like, shape (nx, ny), optional
Meshgrid of the model x coordinate (see `numpy.meshgrid`). Required if ax2 instance(s) is provided.
Y : array_like, shape (nx, ny), optional
Meshgrid of the model y coordinate (see `numpy.meshgrid`). Required if ax2 instance(s) is provided.
PA : scalar, optional
Reference position angle.
acc_threshold : float, optional
Threshold to accept points on contours at constant coords[0]. If obtained level at a point is such that np.abs(level-level_reference)<acc_threshold the point is accepted
max_prop_threshold : float, optional
Threshold to accept points of contours. Rejects residuals of the contour if they are < max_prop_threshold. Useful to reject hot pixels.
color_bounds : array_like, shape (nbounds,), optional
Colour bounds with respect to the reference contour coord_ref.
colors : array_like, shape (nbounds+2,), optional
Contour colors. (i=0) is reserved for the reference contour coord_ref,
(i>0) for contour colors according to the bounds in color_bounds.
lws : array_like, shape (nbounds+2), optional
Contour linewidths. Similarly, (i=0) is reserved for coord_ref and
(i>0) for subsequent bounds.
subtract_quadrants : bool, optional
If True, subtract residuals by folding along the projected minor axis of the disc. Currently working for azimuthal contours only.
subtract_func : function, optional
If subtract_quadrants, this function is used to operate between folded quadrants. Defaults to np.subtract.
"""
from skimage import measure
coord_list, lev_list, resid_list, color_list = [], [], [], []
if np.sum(coord_levels==coord_ref)==0: coord_levels = np.append(coord_levels, coord_ref)
for lev in coord_levels:
contour = measure.find_contours(coords[0], lev) #, fully_connected='high', positive_orientation='high')
if len(contour)==0:
print ('no contours found for phi =', lev)
continue
ind_good = np.argmin([np.abs(lev-coords[0][tuple(np.round(contour[i][0]).astype(np.int))]) for i in range(len(contour))]) #getting ind of closest contour to lev
inds_cont = np.round(contour[ind_good]).astype(np.int)
inds_cont = [tuple(f) for f in inds_cont]
first_cont = np.array([coords[0][i] for i in inds_cont])
second_cont = np.array([coords[1][i] for i in inds_cont])
prop_cont = np.array([prop[i] for i in inds_cont])
corr_inds = np.abs(first_cont-lev) < acc_threshold
if lev == coord_ref: zorder=10
else: zorder=np.random.randint(0,10)
lw = lws[-1]
color = colors[-1]
for i,bound in enumerate(color_bounds):
if lev == coord_ref:
lw = lws[0]
color = colors[0]
zorder = 10
break
if np.abs(coord_ref - lev) < bound:
lw = lws[i+1]
color = colors[i+1]
break
if subtract_quadrants:
#if lev < color_bounds[0]: continue
ref_pos = PA+90 #Reference axis for positive angles
ref_neg = PA-90
angles = second_cont[corr_inds]
prop_ = prop_cont[corr_inds]
angles_pos = angles[angles>=0]
angles_neg = angles[angles<0]
relative_diff_pos = ref_pos - angles_pos
relative_diff_neg = ref_neg - angles_neg
angle_diff_pos, prop_diff_pos = [], []
angle_diff_neg, prop_diff_neg = [], []
for i,diff in enumerate(relative_diff_pos):
#Finding where the difference matches that of the current analysis angle
#The -1 flips the sign so that the number on the other side of the symmetry axis is found
ind = np.argmin(np.abs(-1*relative_diff_pos - diff))
mirror_ind = angles==angles_pos[ind]
current_ind = angles==angles_pos[i]
prop_diff = subtract_func(prop_[current_ind][0], prop_[mirror_ind][0])
angle_diff_pos.append(angles_pos[i])
prop_diff_pos.append(prop_diff)
angle_diff_pos = np.asarray(angle_diff_pos)
prop_diff_pos = np.asarray(prop_diff_pos)
if len(angle_diff_pos)>1:
ind_sort_pos = np.argsort(angle_diff_pos)
plot_ang_diff_pos = angle_diff_pos[ind_sort_pos]
plot_prop_diff_pos = prop_diff_pos[ind_sort_pos]
ind_prop_pos = np.abs(plot_prop_diff_pos)<max_prop_threshold
ax.plot(plot_ang_diff_pos[ind_prop_pos], plot_prop_diff_pos[ind_prop_pos], color=color, lw=lw, zorder=zorder)
coord_list.append(plot_ang_diff_pos[ind_prop_pos])
resid_list.append(plot_prop_diff_pos[ind_prop_pos])
color_list.append(color)
lev_list.append(lev)
else:
plot_ang_diff_pos = []
plot_prop_diff_pos = []
for i,diff in enumerate(relative_diff_neg):
ind = np.argmin(np.abs(-1*relative_diff_neg - diff))
mirror_ind = angles==angles_neg[ind]
current_ind = angles==angles_neg[i]
prop_diff = subtract_func(prop_[current_ind][0], prop_[mirror_ind][0])
angle_diff_neg.append(angles_neg[i])
prop_diff_neg.append(prop_diff)
angle_diff_neg = np.asarray(angle_diff_neg)
prop_diff_neg = np.asarray(prop_diff_neg)
if len(angle_diff_neg)>1:
ind_sort_neg = np.argsort(np.abs(angle_diff_neg))
plot_ang_diff_neg = angle_diff_neg[ind_sort_neg]
plot_prop_diff_neg = prop_diff_neg[ind_sort_neg]
ind_prop_neg = np.abs(plot_prop_diff_neg)<max_prop_threshold
ax.plot(plot_ang_diff_neg[ind_prop_neg], plot_prop_diff_neg[ind_prop_neg], color=color, lw=lw, zorder=zorder)
coord_list.append(plot_ang_diff_neg[ind_prop_neg])
resid_list.append(plot_prop_diff_neg[ind_prop_neg])
color_list.append(color)
lev_list.append(lev)
else:
plot_ang_diff_neg = []
plot_prop_diff_neg = []
"""
if len(angle_diff_pos)>1 or len(angle_diff_neg)>1:
coord_list.append(np.append(plot_ang_diff_pos, plot_ang_diff_neg))
resid_list.append(np.append(plot_prop_diff_pos, plot_prop_diff_neg))
color_list.append(color)
lev_list.append(lev)
"""
else:
coord_list.append(second_cont[corr_inds])
resid_list.append(prop_cont[corr_inds])
color_list.append(color)
lev_list.append(lev)
ind_sort = np.argsort(second_cont[corr_inds]) #sorting by azimuth to avoid 'joint' boundaries in plot
ax.plot(second_cont[corr_inds][ind_sort],
prop_cont[corr_inds][ind_sort],
color=color, lw=lw, zorder=zorder)
if ax2 is not None:
x_cont = np.array([X[i] for i in inds_cont])
y_cont = np.array([Y[i] for i in inds_cont])
if isinstance(ax2, matplotlib.axes._subplots.Axes):
ax2.plot(x_cont[corr_inds], y_cont[corr_inds], color=color, lw=lw*lw_ax2_factor)
elif isinstance(ax2, list):
for axi in ax2:
if isinstance(axi, matplotlib.axes._subplots.Axes): axi.plot(x_cont[corr_inds], y_cont[corr_inds], color=color, lw=lw*lw_ax2_factor)
return [np.asarray(tmp) for tmp in [coord_list, resid_list, color_list, lev_list]]
@staticmethod
def make_substructures(ax, twodim=False, polar=False, gaps=[], rings=[], kinks=[], make_labels=False,
kwargs_gaps={}, kwargs_rings={}, kwargs_kinks={}, func1d='axvline'):
'''Overlay ring-like (if twodim) or vertical lines (if not twodim) to illustrate the radial location of substructures in the disc'''
kwargs_g = dict(color='0.2', ls='--', lw=1.7, dash_capstyle='round', dashes=(3.0, 2.5), alpha=1.0)
kwargs_r = dict(color='0.2', ls='-', lw=1.7, dash_capstyle='round', alpha=1.0)
kwargs_k = dict(color='purple', ls=':', lw=2.5, dash_capstyle='round', dashes=(0.5, 1.5), alpha=0.9)
kwargs_g.update(kwargs_gaps)
kwargs_r.update(kwargs_rings)
kwargs_k.update(kwargs_kinks)
if twodim:
nphi = 100
phi = np.linspace(0, 2*np.pi, nphi)
if polar:
for R in gaps: ax.plot(phi, [R]*nphi, **kwargs_g)
for R in rings: ax.plot(phi, [R]*nphi, **kwargs_r)
for R in kinks: ax.plot(phi, [R]*nphi, **kwargs_k)
else:
cos_phi = np.cos(phi)
sin_phi = np.sin(phi)
for R in gaps: ax.plot(R*cos_phi, R*sin_phi, **kwargs_g)
for R in rings: ax.plot(R*cos_phi, R*sin_phi, **kwargs_r)
for R in kinks: ax.plot(R*cos_phi, R*sin_phi, **kwargs_k)
else:
if func1d=='axvline': func1d=ax.axvline
elif func1d=='axhline': func1d=ax.axhline
for R in gaps: func1d(R, **kwargs_g)
for R in rings: func1d(R, **kwargs_r)
for R in kinks: func1d(R, **kwargs_k)
if make_labels and len(gaps)>0: ax.plot([None], [None], label='Gaps', **kwargs_g)
if make_labels and len(rings)>0: ax.plot([None], [None], label='Rings', **kwargs_r)
if make_labels and len(kinks)>0: ax.plot([None], [None], label='Kinks', **kwargs_k)
return ax
@staticmethod
def make_contour_lev(prop, lev, X, Y, acc_threshold=20):
from skimage import measure
contour = measure.find_contours(prop, lev)
inds_cont = np.round(contour[-1]).astype(int)
inds_cont = [tuple(f) for f in inds_cont]
first_cont = np.array([prop[i] for i in inds_cont])
corr_inds = np.abs(first_cont-lev) < acc_threshold
x_cont = np.array([X[i] for i in inds_cont])
y_cont = np.array([Y[i] for i in inds_cont])
return x_cont[corr_inds], y_cont[corr_inds], inds_cont, corr_inds
@staticmethod
def beams_along_ring(lev, Rgrid, beam_size, X, Y):
xc, yc, _, _ = Contours.make_contour_lev(Rgrid, lev, X, Y)
try:
rc = hypot_func(xc, yc)
a = np.max(rc)
b = np.min(rc)
ellipse_perim = np.pi*(3*(a+b)-np.sqrt((3*a+b)*(a+3*b))) #Assuming that disc vertical extent does not distort much the ellipse
return ellipse_perim/beam_size
except ValueError: #No contour was found
return np.inf
@staticmethod
def get_average_east_west(resid_list, coord_list, lev_list,
Rgrid, beam_size, X, Y,
av_func=np.nanmean, mask_ang=0, resid_thres='3sigma',
error_func=True, error_unit=1.0, error_thres=np.inf):
#resid_thres: None, '3sigma', or list of thresholds with size len(lev_list)
nconts = len(lev_list)
if resid_thres is None: resid_thres = [np.inf]*nconts
elif resid_thres == '3sigma': resid_thres = [3*np.nanstd(resid_list[i]) for i in range(nconts)] #anything higher than 3sigma is rejected from annulus
# -np.pi<coord_list<np.pi
ind_west = [((coord_list[i]<90-mask_ang) & (coord_list[i]>-90+mask_ang)) & (np.abs(resid_list[i]-np.nanmean(resid_list[i])) < resid_thres[i]) for i in range(nconts)]
ind_east = [((coord_list[i]>90+mask_ang) | (coord_list[i]<-90-mask_ang)) & (np.abs(resid_list[i]-np.nanmean(resid_list[i])) < resid_thres[i]) for i in range(nconts)]
av_west = np.array([av_func(resid_list[i][ind_west[i]]) for i in range(nconts)])
av_east = np.array([av_func(resid_list[i][ind_east[i]]) for i in range(nconts)])
if error_func is None: av_west_error, av_east_error = None, None
else:
beams_ring_sqrt = np.sqrt([0.5*Contours.beams_along_ring(lev, Rgrid, beam_size, X, Y) for lev in lev_list]) #0.5 because we split the disc in halves
if callable(error_func): #if error map provided, compute average error per radius, divided by sqrt of number of beams (see Michiel Hogerheijde notes on errors)
av_west_error, av_east_error = np.zeros(nconts), np.zeros(nconts)
for i in range(nconts):
x_west, y_west, __ = Tools.get_sky_from_disc_coords(lev_list[i], coord_list[i][ind_west[i]]) #MISSING z, incl, PA for the function to work
x_east, y_east, __ = Tools.get_sky_from_disc_coords(lev_list[i], coord_list[i][ind_east[i]]) #MISSING z, incl, PA for the function to work
error_west = np.array(list(map(error_func, x_west, y_west))).T[0]
error_east = np.array(list(map(error_func, x_east, y_east))).T[0]
sigma2_west = np.where((np.isfinite(error_west)) & (error_unit*error_west<error_thres) & (error_west>0), (error_unit*error_west)**2, 0)
sigma2_east = np.where((np.isfinite(error_east)) & (error_unit*error_east<error_thres) & (error_east>0), (error_unit*error_east)**2, 0)
Np_west = len(coord_list[i][ind_west[i]])
Np_east = len(coord_list[i][ind_east[i]])
av_west_error[i] = np.sqrt(np.nansum(sigma2_west)/Np_west)/beams_ring_sqrt[i]
av_east_error[i] = np.sqrt(np.nansum(sigma2_east)/Np_east)/beams_ring_sqrt[i]
else: #compute standard error of mean value
av_west_error = np.array([np.std(resid_list[i][ind_west[i]], ddof=1) for i in range(nconts)])/beams_ring_sqrt
av_east_error = np.array([np.std(resid_list[i][ind_east[i]], ddof=1) for i in range(nconts)])/beams_ring_sqrt
return av_west, av_east, av_west_error, av_east_error
@staticmethod
def get_average(resid_list, coord_list, lev_list,
Rgrid, beam_size, X, Y,
av_func=np.nanmean, mask_ang=0, resid_thres='3sigma',
error_func=True, error_unit=1.0, error_thres=np.inf):
#mask_ang: +- angles to reject around minor axis (i.e. phi=+-90)
#resid_thres: None, '3sigma', or list of thresholds with size len(lev_list)
frac_annulus = 1.0 #if halves, 0.5; if quadrants, 0.25
nconts = len(lev_list)
if resid_thres is None: resid_thres = [np.inf]*nconts #consider all values for the average
elif resid_thres == '3sigma': resid_thres = [3*np.nanstd(resid_list[i]) for i in range(nconts)] #anything higher than 3sigma is rejected from annulus
# -np.pi<coord_list<np.pi
ind_accep = [(((coord_list[i]<90-mask_ang) & (coord_list[i]>-90+mask_ang)) |
((coord_list[i]>90+mask_ang) | (coord_list[i]<-90-mask_ang))) &
(np.abs(resid_list[i]-np.nanmean(resid_list[i]))<resid_thres[i])
for i in range(nconts)]
av_annulus = np.array([av_func(resid_list[i][ind_accep[i]]) for i in range(nconts)])
if error_func is None: av_error = None
else:
beams_ring_sqrt = np.sqrt([frac_annulus*Contours.beams_along_ring(lev, Rgrid, beam_size, X, Y) for lev in lev_list])
if callable(error_func): #if error map provided, compute average error per radius, divided by sqrt of number of beams (see Michiel Hogerheijde notes on errors)
av_error = np.zeros(nconts)
for i in range(nconts):
x_accep, y_accep, __ = get_sky_from_disc_coords(lev_list[i], coord_list[i][ind_accep[i]]) #MISSING z, incl, PA for the function to work
error_accep = np.array(list(map(error_func, x_accep, y_accep))).T[0]
sigma2_accep = np.where((np.isfinite(error_accep)) & (error_unit*error_accep<error_thres) & (error_accep>0), (error_unit*error_accep)**2, 0)
Np_accep = len(coord_list[i][ind_accep[i]])
av_error[i] = np.sqrt(np.nansum(sigma2_accep)/Np_accep)/beams_ring_sqrt[i]
else: #compute standard error of mean value
av_error = np.array([np.std(resid_list[i][ind_accep[i]], ddof=1) for i in range(nconts)])/beams_ring_sqrt
return av_annulus, av_error
@staticmethod
def get_average_zones(resid_list, coord_list, lev_list, Rgrid, beam_size, X, Y, fast=True,
az_zones=[[-30, 30], [150, -150]], join_zones=False, av_func=np.nanmean,
resid_thres='3sigma', error_func=True, error_unit=1.0, error_thres=np.inf):
#resid_thres: None, '3sigma', or list of thresholds with size len(lev_list)
nconts = len(lev_list)
nzones = len(az_zones)
if not fast: join_zones=False
if resid_thres is None: resid_thres = [np.inf]*nconts
elif resid_thres == '3sigma': resid_thres = [3*np.nanstd(resid_list[i]) for i in range(nconts)] #anything higher than 3sigma is rejected from annulus
make_or = lambda az0, az1: [((coord_list[i]>az0) | (coord_list[i]<az1)) & (np.abs(resid_list[i]-np.nanmean(resid_list[i])) < resid_thres[i]) for i in range(nconts)]
make_and = lambda az0, az1: [((coord_list[i]>az0) & (coord_list[i]<az1)) & (np.abs(resid_list[i]-np.nanmean(resid_list[i])) < resid_thres[i]) for i in range(nconts)]
def get_portion_inds(az):
az0, az1 = az
if (az0 > az1):
inds = make_or(az0, az1)
else:
inds = make_and(az0, az1)
return inds
def get_portion_percent(az):
az0, az1 = az
if (az0 > az1):
if az0 < 0: perc = 1 - (az0-az1)/360.
else: perc = (180-az0 + 180-np.abs(az1))/360.
else:
perc = (az1-az0)/360.
return perc
#inds containts lists of indices, one list per zone. Each is a list of lists, with as many lists as nconts (number of radii). Each sublist has different number of indices, the larger the radius (i.e. larger path) the more indices.
inds = [get_portion_inds(zone) for zone in az_zones]
az_percent = np.array([get_portion_percent(zone) for zone in az_zones])
if join_zones and nzones>1:
concat = lambda x,y: x+y
inds = [[functools.reduce(concat, [ind[i] for ind in inds]) for i in range(nconts)]] #concatenates indices from zones, per radius.
az_percent = np.sum(az_percent)[None] #array of single number
nzones = 1
if fast: #Compute usual average
av_on_inds = [np.array([av_func(resid_list[i][ind[i]]) for i in range(nconts)]) for ind in inds]
else: #Compute average using integral definition (trapezoid seems to succeed better than simpson)
av_integral = lambda y,x,dT: np.trapz(y, x=x)/dT # or trapezoid from scipy.integrate
av_on_inds = []
for ind in inds:
av_annulus = []
for i in range(nconts):
ii = ind[i]
coords_ii = coord_list[i][ii]
resid_ii = resid_list[i][ii]
if not len(coords_ii): trap=None
else: trap = av_integral(resid_ii, coords_ii, coords_ii[-1]-coords_ii[0]) #dT assumes coords_list is sorted (no matter if ascending or descending)
av_annulus.append(trap)
av_on_inds.append(np.array(av_annulus))
beams_ring_full = [Contours.beams_along_ring(lev, Rgrid, beam_size, X, Y) for lev in lev_list]
beams_zone_sqrt = [np.sqrt(az_percent*br) for br in beams_ring_full]
if error_func is None: av_error = None
else:
if callable(error_func): #Not yet tested
#if error map provided, compute average error per radius, divided by sqrt of number of beams (see Michiel Hogerheijde notes on errors)
av_error = []
for i in range(nconts):
r_ind = [Tools.get_sky_from_disc_coords(lev_list[i], coord_list[i][ind[i]]) for ind in inds] #MISSING z, incl, PA for the function to work
error_ind = [np.array(list(map(error_func, r_ind[j][0], r_ind[j][1]))).T[0] for j in range(nzones)]
sigma2_ind = [np.where((np.isfinite(error_ind[j])) & (error_unit*error_ind[j]<error_thres) & (error_ind[j]>0),
(error_unit*error_ind[j])**2,
0)
for j in range(nzones)]
np_ind = [len(coord_list[i][ind[i]]) for ind in inds]
av_error.append([np.sqrt(np.nansum(sigma2_ind[j])/np_ind[j])/beams_zone_sqrt[i][j] for j in range(nzones) in np_ind])
else: #compute standard error of mean value
av_error = [np.array([np.std(resid_list[i][inds[j][i]], ddof=1)/beams_zone_sqrt[i][j] for i in range(nconts)]) for j in range(nzones)]
return av_on_inds, av_error
def make_filaments(prop_2D, R_nonan_up_au, R_inner_au, beam_size_au, distance_pc, dpix_arcsec, **kwargs):
#FIND FILAMENTS
#adapt_thresh is the width of the element used for the adaptive thresholding mask.
# This is primarily the step that picks out the filamentary structure. The element size should be similar to the width of the expected filamentary structure
#kw_fil_mask = dict(verbose=False, adapt_thresh=50*apu.au, smooth_size=1*beam_size_au*apu.au, size_thresh=100*apu.pix**2, border_masking=False, fill_hole_size=0.01*apu.arcsec**2)
from fil_finder import FilFinder2D
from astropy import units as apu
distance=distance_pc*apu.pc
ang_scale=dpix_arcsec*apu.arcsec
R_min=R_inner_au
kw_fil_mask = dict(verbose=False, adapt_thresh=1*beam_size_au*apu.au, smooth_size=0.2*beam_size_au*apu.au, size_thresh=500*apu.pix**2, border_masking=False, fill_hole_size=0.01*apu.arcsec**2)
kw_fil_mask.update(kwargs)
Rgrid = R_nonan_up_au
Rind = (Rgrid>R_min) #& (Rgrid<R_max)
fil_pos = FilFinder2D(np.where(Rind & (prop_2D>0), np.abs(prop_2D), 0), ang_scale=ang_scale, distance=distance)
fil_pos.preprocess_image(skip_flatten=True)
fil_pos.create_mask(**kw_fil_mask)
fil_pos.medskel(verbose=False)
fil_neg = FilFinder2D(np.where(Rind & (prop_2D<0), np.abs(prop_2D), 0), ang_scale=ang_scale, distance=distance)
fil_neg.preprocess_image(skip_flatten=True)
fil_neg.create_mask(**kw_fil_mask)
fil_neg.medskel(verbose=False)
fil_pos.analyze_skeletons(prune_criteria='length')
fil_neg.analyze_skeletons(prune_criteria='length')
return fil_pos, fil_neg
[docs]class Cube(object):
def __init__(self, nchan, channels, data, beam=False, beam_kernel=False, tb={'nu': False, 'beam': False, 'full': True}):
self.nchan = nchan
self.channels = channels
self.data = data
self.point = self.cursor
self._interactive = self.cursor
self._interactive_path = self.curve
if beam: self.beam = beam
if beam_kernel: self.beam_kernel = beam_kernel
if isinstance(tb, dict):
if tb['nu'] and tb['beam']: self.data = Tools.get_tb(self.data, tb['nu'], tb['beam'], full=tb['full'])
@property
def interactive(self):
return self._interactive
@interactive.setter
def interactive(self, func):
print('Setting interactive function to', func)
self._interactive = func
@interactive.deleter
def interactive(self):
print('Deleting interactive function')
del self._interactive
@property
def interactive_path(self):
return self._interactive_path
@interactive_path.setter
def interactive_path(self, func):
print('Setting interactive_path function to', func)
self._interactive_path = func
@interactive_path.deleter
def interactive_path(self):
print('Deleting interactive_path function')
del self._interactive_path
[docs] def ellipse(self):
pass
def _plot_spectrum_region(self, x0, x1, y0, y1, ax, extent=None, compare_cubes=[], stat_func=np.mean, **kwargs):
kwargs_spec = dict(where='mid', linewidth=2.5, label=r'x0:%d,x1:%d'%(x0,x1))
kwargs_spec.update(kwargs)
v0, v1 = self.channels[0], self.channels[-1]
def get_ji(x,y):
pass
if extent is None:
j0, i0 = int(x0), int(y0)
j1, i1 = int(x1), int(y1)
else:
nz, ny, nx = np.shape(self.data)
dx = extent[1] - extent[0]
dy = extent[3] - extent[2]
j0 = int(nx*(x0-extent[0])/dx)
i0 = int(ny*(y0-extent[2])/dy)
j1 = int(nx*(x1-extent[0])/dx)
i1 = int(ny*(y1-extent[2])/dy)
slice_cube = self.data[:,i0:i1,j0:j1]
spectrum = np.array([stat_func(chan) for chan in slice_cube])
ncubes = len(compare_cubes)
if ncubes > 0:
slice_comp = [compare_cubes[i].data[:,i0:i1,j0:j1] for i in range(ncubes)]
cubes_spec = [np.array([stat_func(chan) for chan in slice_comp[i]]) for i in range(ncubes)]
if np.logical_or(np.isinf(spectrum), np.isnan(spectrum)).all(): return False
else:
plot_spec = ax.step(self.channels, spectrum, **kwargs_spec)
if ncubes > 0:
alpha = 0.2
dalpha = -alpha/ncubes
for i in range(ncubes):
ax.fill_between(self.channels, cubes_spec[i], color=plot_spec[0].get_color(), step='mid', alpha=alpha)
alpha+=dalpha
else: ax.fill_between(self.channels, spectrum, color=plot_spec[0].get_color(), step='mid', alpha=0.2)
return plot_spec
[docs] def box(self, fig, ax, extent=None, compare_cubes=[], stat_func=np.mean, **kwargs):
from matplotlib.widgets import RectangleSelector
def onselect(eclick, erelease):
if eclick.inaxes is ax[0]:
plot_spec = self._plot_spectrum_region(eclick.xdata, erelease.xdata, eclick.ydata, erelease.ydata,
ax[1], extent=extent, compare_cubes=compare_cubes,
stat_func=stat_func, **kwargs)
if plot_spec:
print('startposition: (%f, %f)' % (eclick.xdata, eclick.ydata))
print('endposition : (%f, %f)' % (erelease.xdata, erelease.ydata))
print('used button : ', eclick.button)
xc, yc = eclick.xdata, eclick.ydata #Left, bottom corner
dx, dy = erelease.xdata-eclick.xdata, erelease.ydata-eclick.ydata
rect = patches.Rectangle((xc,yc), dx, dy, lw=2, edgecolor=plot_spec[0].get_color(), facecolor='none')
ax[0].add_patch(rect)
ax[1].legend()
fig.canvas.draw()
fig.canvas.flush_events()
def toggle_selector(event):
print('Key pressed.')
if event.key in ['Q', 'q'] and toggle_selector.RS.active:
print('RectangleSelector deactivated.')
toggle_selector.RS.set_active(False)
if event.key in ['A', 'a'] and not toggle_selector.RS.active:
print('RectangleSelector activated.')
toggle_selector.RS.set_active(True)
rectprops = dict(facecolor='none', edgecolor = 'white',
alpha=0.8, fill=False)
lineprops = dict(color='white', linestyle='-',
linewidth=3, alpha=0.8)
toggle_selector.RS = RectangleSelector(ax[0], onselect, drawtype='box', rectprops=rectprops, lineprops=lineprops)
cid = fig.canvas.mpl_connect('key_press_event', toggle_selector)
return toggle_selector.RS
def _plot_spectrum_cursor(self, x, y, ax, extent=None, compare_cubes=[], **kwargs):
kwargs_spec = dict(where='mid', linewidth=2.5, label=r'%d,%d'%(x,y))
kwargs_spec.update(kwargs)
def get_ji(x,y):
pass
if extent is None:
j, i = int(x), int(y)
else:
nz, ny, nx = np.shape(self.data)
dx = extent[1] - extent[0]
dy = extent[3] - extent[2]
j = int(nx*(x-extent[0])/dx)
i = int(ny*(y-extent[2])/dy)
spectrum = self.data[:,i,j]
v0, v1 = self.channels[0], self.channels[-1]
if np.logical_or(np.isinf(spectrum), np.isnan(spectrum)).all(): return False
else:
#plot_fill = ax.fill_between(self.channels, spectrum, alpha=0.1)
plot_spec = ax.step(self.channels, spectrum, **kwargs_spec)
ncubes = len(compare_cubes)
if ncubes > 0:
alpha = 0.2
dalpha = -alpha/ncubes
for cube in compare_cubes:
ax.fill_between(self.channels, cube.data[:,i,j], color=plot_spec[0].get_color(), step='mid', alpha=alpha)
alpha+=dalpha
else: ax.fill_between(self.channels, spectrum, color=plot_spec[0].get_color(), step='mid', alpha=0.2)
return plot_spec
#def point(self, *args, **kwargs):
# return self.cursor(*args, **kwargs)
[docs] def cursor(self, fig, ax, extent=None, compare_cubes=[], **kwargs):
def onclick(event):
if event.button==3:
print ('Right click. Disconnecting click event...')
fig.canvas.mpl_disconnect(cid)
elif event.inaxes is ax[0]:
plot_spec = self._plot_spectrum_cursor(event.xdata, event.ydata, ax[1], extent=extent,
compare_cubes=compare_cubes, **kwargs)
if plot_spec:
print('%s click: button=%d, xdata=%f, ydata=%f' %
('double' if event.dblclick else 'single', event.button,
event.xdata, event.ydata))
ax[0].scatter(event.xdata, event.ydata, marker='D', s=50, facecolor=plot_spec[0].get_color(), edgecolor='k')
ax[1].legend(frameon=False, handlelength=0.7, fontsize=MEDIUM_SIZE-1)
fig.canvas.draw()
fig.canvas.flush_events()
cid = fig.canvas.mpl_connect('button_press_event', onclick)
return cid
def _plot_beam(self, ax):
x_fwhm = self.beam_kernel.model.x_fwhm
y_fwhm = self.beam_kernel.model.y_fwhm
ny_pix, nx_pix = np.shape(self.data[0])
ellipse = patches.Ellipse(xy = (0.05,0.05), angle = 90+self.beam.pa.value,
width=x_fwhm/nx_pix, height=y_fwhm/ny_pix, lw=1, fill=True,
fc='gray', ec='k', transform=ax.transAxes)
ax.add_artist(ellipse)
[docs] def surface(self, ax, *args, **kwargs): return Contours.emission_surface(ax, *args, **kwargs)
[docs] def show(self, extent=None, chan_init=0, compare_cubes=[], cursor_grid=True, cmap='gnuplot2_r',
int_unit=r'Intensity [mJy beam$^{-1}$]', pos_unit='Offset [au]', vel_unit=r'km s$^{-1}$',
show_beam=False, surface={'args': (), 'kwargs': {}}, **kwargs):
from matplotlib.widgets import Slider, Cursor, Button
v0, v1 = self.channels[0], self.channels[-1]
dv = v1-v0
fig, ax = plt.subplots(ncols=2, figsize=(12,5))
plt.subplots_adjust(wspace=0.25)
y0, y1 = ax[1].get_position().y0, ax[1].get_position().y1
axcbar = plt.axes([0.47, y0, 0.03, y1-y0])
max_data = np.nanmax([self.data]+[comp.data for comp in compare_cubes])
ax[0].set_xlabel(pos_unit)
ax[0].set_ylabel(pos_unit)
ax[1].set_xlabel('l.o.s velocity [%s]'%vel_unit)
PlotTools.mod_major_ticks(ax[0], axis='both', nbins=5)
ax[0].tick_params(direction='out')
ax[1].tick_params(direction='in', right=True, labelright=False, labelleft=False)
axcbar.tick_params(direction='out')
ax[1].set_ylabel(int_unit, labelpad=15)
ax[1].yaxis.set_label_position('right')
ax[1].set_xlim(v0-0.1, v1+0.1)
vmin, vmax = -1*max_data/100, 0.7*max_data#0.8*max_data#
ax[1].set_ylim(vmin, vmax)
#ax[1].grid(lw=1.5, ls=':')
cmap = plt.get_cmap(cmap)
cmap.set_bad(color=(0.9,0.9,0.9))
if show_beam and self.beam_kernel: self._plot_beam(ax[0])
img = ax[0].imshow(self.data[chan_init], cmap=cmap, extent=extent, origin='lower', vmin=vmin, vmax=vmax)
cbar = plt.colorbar(img, cax=axcbar)
img.cmap.set_under('w')
current_chan = ax[1].axvline(self.channels[chan_init], color='black', lw=2, ls='--')
text_chan = ax[1].text((self.channels[chan_init]-v0)/dv, 1.02, #Converting xdata coords to Axes coords
'%4.1f %s'%(self.channels[chan_init], vel_unit), ha='center',
color='black', transform=ax[1].transAxes)
if cursor_grid: cg = Cursor(ax[0], useblit=True, color='lime', linewidth=1.5)
def get_interactive(func):
return func(fig, ax, extent=extent, compare_cubes=compare_cubes, **kwargs)
interactive_obj = [get_interactive(self.interactive)]
#***************
#SLIDERS
#***************
def update_chan(val):
chan = int(val)
vchan = self.channels[chan]
img.set_data(self.data[chan])
current_chan.set_xdata(vchan)
text_chan.set_x((vchan-v0)/dv)
text_chan.set_text('%4.1f %s'%(vchan, vel_unit))
fig.canvas.draw_idle()
def update_cubes(val):
i = int(slider_cubes.val)
chan = int(slider_chan.val)
vchan = self.channels[chan]
if i==0: img.set_data(self.data[chan])
else: img.set_data(compare_cubes[i-1].data[chan])
current_chan.set_xdata(vchan)
text_chan.set_x((vchan-v0)/dv)
text_chan.set_text('%4.1f km/s'%vchan)
fig.canvas.draw_idle()
ncubes = len(compare_cubes)
if ncubes>0:
axcubes = plt.axes([0.2, 0.90, 0.24, 0.025], facecolor='0.7')
axchan = plt.axes([0.2, 0.95, 0.24, 0.025], facecolor='0.7')
slider_cubes = Slider(axcubes, 'Cube id', 0, ncubes,
valstep=1, valinit=0, valfmt='%1d', color='dodgerblue')
slider_chan = Slider(axchan, 'Channel', 0, self.nchan-1,
valstep=1, valinit=chan_init, valfmt='%2d', color='dodgerblue')
slider_cubes.on_changed(update_cubes)
slider_chan.on_changed(update_cubes)
else:
axchan = plt.axes([0.2, 0.9, 0.24, 0.05], facecolor='0.7')
slider_chan = Slider(axchan, 'Channel', 0, self.nchan-1,
valstep=1, valinit=chan_init, valfmt='%2d', color='dodgerblue')
slider_chan.on_changed(update_chan)
#*************
#BUTTONS
#*************
def go2cursor(event):
if self.interactive == self.cursor or self.interactive == self.point: return 0
interactive_obj[0].set_active(False)
self.interactive = self.cursor
interactive_obj[0] = get_interactive(self.interactive)
def go2box(event):
if self.interactive == self.box: return 0
fig.canvas.mpl_disconnect(interactive_obj[0])
self.interactive = self.box
interactive_obj[0] = get_interactive(self.interactive)
def go2trash(event):
print ('Cleaning interactive figure...')
plt.close()
chan = int(slider_chan.val)
self.show(extent=extent, chan_init=chan, compare_cubes=compare_cubes,
cursor_grid=cursor_grid, int_unit=int_unit, pos_unit=pos_unit,
vel_unit=vel_unit, surface=surface, **kwargs)
def go2surface(event):
self.surface(ax[0], *surface['args'], **surface['kwargs'])
fig.canvas.draw()
fig.canvas.flush_events()
box_img = plt.imread(path_file+'button_box.png')
cursor_img = plt.imread(path_file+'button_cursor.jpeg')
trash_img = plt.imread(path_file+'button_trash.jpg')
surface_img = plt.imread(path_file+'button_surface.png')
axbcursor = plt.axes([0.05, 0.779, 0.05, 0.05])
axbbox = plt.axes([0.05, 0.72, 0.05, 0.05])
axbtrash = plt.axes([0.05, 0.661, 0.05, 0.05], frameon=True, aspect='equal')
bcursor = Button(axbcursor, '', image=cursor_img)
bcursor.on_clicked(go2cursor)
bbox = Button(axbbox, '', image=box_img)
bbox.on_clicked(go2box)
btrash = Button(axbtrash, '', image=trash_img, color='white', hovercolor='lime')
btrash.on_clicked(go2trash)
if len(surface['args'])>0:
axbsurf = plt.axes([0.005, 0.759, 0.07, 0.07], frameon=True, aspect='equal')
bsurf = Button(axbsurf, '', image=surface_img)
bsurf.on_clicked(go2surface)
plt.show()
[docs] def show_side_by_side(self, cube1, extent=None, chan_init=0, cursor_grid=True, cmap='gnuplot2_r',
int_unit=r'Intensity [mJy beam$^{-1}$]', pos_unit='Offset [au]', vel_unit=r'km s$^{-1}$',
show_beam=False, surface={'args': (), 'kwargs': {}}, **kwargs):
from matplotlib.widgets import Slider, Cursor, Button
compare_cubes = [cube1]
v0, v1 = self.channels[0], self.channels[-1]
dv = v1-v0
fig, ax = plt.subplots(ncols=3, figsize=(17,5))
plt.subplots_adjust(wspace=0.25)
y0, y1 = ax[2].get_position().y0, ax[2].get_position().y1
axcbar = plt.axes([0.63, y0, 0.015, y1-y0])
max_data = np.nanmax([self.data]+[comp.data for comp in compare_cubes])
ax[0].set_xlabel(pos_unit)
ax[0].set_ylabel(pos_unit)
ax[2].set_xlabel('l.o.s velocity [%s]'%vel_unit)
PlotTools.mod_major_ticks(ax[0], axis='both', nbins=5)
ax[0].tick_params(direction='out')
ax[2].tick_params(direction='in', right=True, labelright=False, labelleft=False)
axcbar.tick_params(direction='out')
ax[2].set_ylabel(int_unit, labelpad=15)
ax[2].yaxis.set_label_position('right')
ax[2].set_xlim(v0-0.1, v1+0.1)
vmin, vmax = -1*max_data/100, 0.7*max_data#0.8*max_data#
ax[2].set_ylim(vmin, vmax)
cmap = plt.get_cmap(cmap)
cmap.set_bad(color=(0.9,0.9,0.9))
if show_beam and self.beam_kernel: self._plot_beam(ax[0])
img = ax[0].imshow(self.data[chan_init], cmap=cmap, extent=extent, origin='lower', vmin=vmin, vmax=vmax)
img1 = ax[1].imshow(cube1.data[chan_init], cmap=cmap, extent=extent, origin='lower', vmin=vmin, vmax=vmax)
cbar = plt.colorbar(img, cax=axcbar)
img.cmap.set_under('w')
img1.cmap.set_under('w')
current_chan = ax[2].axvline(self.channels[chan_init], color='black', lw=2, ls='--')
text_chan = ax[2].text((self.channels[chan_init]-v0)/dv, 1.02, #Converting xdata coords to Axes coords
'%4.1f %s'%(self.channels[chan_init], vel_unit), ha='center',
color='black', transform=ax[2].transAxes)
if cursor_grid: cg = Cursor(ax[0], useblit=True, color='lime', linewidth=1.5)
def get_interactive(func):
return func(fig, [ax[0], ax[2]], extent=extent, compare_cubes=compare_cubes, **kwargs)
interactive_obj = [get_interactive(self.interactive)]
#***************
#SLIDERS
#***************
def update_chan(val):
chan = int(val)
vchan = self.channels[chan]
img.set_data(self.data[chan])
img1.set_data(cube1.data[chan])
current_chan.set_xdata(vchan)
text_chan.set_x((vchan-v0)/dv)
text_chan.set_text('%4.1f %s'%(vchan, vel_unit))
fig.canvas.draw_idle()
ncubes = len(compare_cubes)
axchan = plt.axes([0.2, 0.9, 0.24, 0.05], facecolor='0.7')
slider_chan = Slider(axchan, 'Channel', 0, self.nchan-1,
valstep=1, valinit=chan_init, valfmt='%2d', color='dodgerblue')
slider_chan.on_changed(update_chan)
#*************
#BUTTONS
#*************
def go2cursor(event):
if self.interactive == self.cursor or self.interactive == self.point: return 0
interactive_obj[0].set_active(False)
self.interactive = self.cursor
interactive_obj[0] = get_interactive(self.interactive)
def go2box(event):
if self.interactive == self.box: return 0
fig.canvas.mpl_disconnect(interactive_obj[0])
self.interactive = self.box
interactive_obj[0] = get_interactive(self.interactive)
def go2trash(event):
print ('Cleaning interactive figure...')
plt.close()
chan = int(slider_chan.val)
self.show_side_by_side(cube1, extent=extent, chan_init=chan,
cursor_grid=cursor_grid, int_unit=int_unit, pos_unit=pos_unit,
vel_unit=vel_unit, surface=surface, **kwargs)
def go2surface(event):
self.surface(ax[0], *surface['args'], **surface['kwargs'])
self.surface(ax[1], *surface['args'], **surface['kwargs'])
fig.canvas.draw()
fig.canvas.flush_events()
box_img = plt.imread(path_file+'button_box.png')
cursor_img = plt.imread(path_file+'button_cursor.jpeg')
trash_img = plt.imread(path_file+'button_trash.jpg')
surface_img = plt.imread(path_file+'button_surface.png')
axbcursor = plt.axes([0.05, 0.779, 0.05, 0.05])
axbbox = plt.axes([0.05, 0.72, 0.05, 0.05])
axbtrash = plt.axes([0.05, 0.661, 0.05, 0.05], frameon=True, aspect='equal')
bcursor = Button(axbcursor, '', image=cursor_img)
bcursor.on_clicked(go2cursor)
bbox = Button(axbbox, '', image=box_img)
bbox.on_clicked(go2box)
btrash = Button(axbtrash, '', image=trash_img, color='white', hovercolor='lime')
btrash.on_clicked(go2trash)
if len(surface['args'])>0:
axbsurf = plt.axes([0.005, 0.759, 0.07, 0.07], frameon=True, aspect='equal')
bsurf = Button(axbsurf, '', image=surface_img)
bsurf.on_clicked(go2surface)
plt.show()
"""
#Lasso functions under development
def _plot_lasso(self, ax, x, y, chan, color=False, show_path=True, extent=None, compare_cubes=[], **kwargs):
if len(self._lasso_path) == 0: return
#for i in range(len(self.lasso_path))
if extent is None:
j = x.astype(np.int)
i = y.astype(np.int)
else:
nz, ny, nx = np.shape(self.data)
dx = extent[1] - extent[0]
dy = extent[3] - extent[2]
j = (nx*(x-extent[0])/dx).astype(np.int)
i = (ny*(y-extent[2])/dy).astype(np.int)
if color: self._plot_path = ax[1].step(np.arange(len(i)), self.data[chan,i,j], color=color)
else: self._plot_path = ax[1].step(np.arange(len(i)), self.data[chan,i,j])
self._plot_color = self._plot_path[0].get_color()
if show_path: self._path_on_cube = ax[0].plot(x,y, color=self._plot_color)
else: self._path_on_cube = None
def lasso(self, fig, ax, chan, color=False, show_path=True, extent=None, compare_cubes=[], **kwargs):
from matplotlib.widgets import LassoSelector
canvas = ax[0].figure.canvas
def onselect(verts):
#path = Path(verts)
canvas.draw_idle()
self._lasso_path.append(np.array(verts).T)
self._plot_lasso(ax, *np.array(verts).T, chan, color, show_path, extent, compare_cubes, **kwargs)
print (verts)
def disconnect():
self._lasso_obj.disconnect_events()
canvas.draw_idle()
self._lasso_obj = LassoSelector(ax[0], onselect, lineprops={'color': 'lime'})
def onclick(event):
if event.button == 3:
print ('Right click. Disconnecting click event...')
disconnect()
fig.canvas.draw()
cid = fig.canvas.mpl_connect('button_press_event', onclick)
"""
[docs] def curve(self, ax, x, y, chan, color=False, show_path=True, extent=None, compare_cubes=[], **kwargs):
kwargs_curve = dict(linewidth=2.5)#, label=r'x0:%d,x1:%d'%(x0,x1))
kwargs_curve.update(kwargs)
if extent is None:
j = x.astype(np.int)
i = y.astype(np.int)
else:
nz, ny, nx = np.shape(self.data)
dx = extent[1] - extent[0]
dy = extent[3] - extent[2]
j = (nx*(x-extent[0])/dx).astype(np.int)
i = (ny*(y-extent[2])/dy).astype(np.int)
pix_ids = np.arange(len(i))
path_val = self.data[chan,i,j]
if color: plot_path = ax[1].step(pix_ids, path_val, where='mid', color=color, **kwargs_curve)
else: plot_path = ax[1].step(pix_ids, path_val, where='mid', **kwargs_curve)
plot_color = plot_path[0].get_color()
if show_path: path_on_cube = ax[0].plot(x, y, color=plot_color, **kwargs_curve)
else: path_on_cube = None
cube_fill = []
plot_fill = None
ncubes = len(compare_cubes)
if ncubes > 0:
alpha = 0.2
dalpha = -alpha/ncubes
for cube in compare_cubes:
cube_fill.append(ax[1].fill_between(pix_ids, cube.data[chan,i,j], color=plot_color, step='mid', alpha=alpha))
alpha+=dalpha
else: plot_fill = ax[1].fill_between(pix_ids, path_val, color=plot_color, step='mid', alpha=0.2)
return path_on_cube, plot_path, plot_color, plot_fill, cube_fill
[docs] def show_path(self, x, y, extent=None, chan_init=20, compare_cubes=[], cursor_grid=True,
int_unit=r'Intensity [mJy beam$^{-1}$]', pos_unit='au', vel_unit=r'km s$^{-1}$',
show_beam=False, **kwargs):
from matplotlib.widgets import Slider, Cursor, Button
v0, v1 = self.channels[0], self.channels[-1]
dv = v1-v0
fig, ax = plt.subplots(ncols=2, figsize=(12,5))
plt.subplots_adjust(wspace=0.25)
y0, y1 = ax[1].get_position().y0, ax[1].get_position().y1
axcbar = plt.axes([0.47, y0, 0.03, y1-y0])
max_data = np.max(self.data)
ax[0].set_xlabel(pos_unit)
ax[0].set_ylabel(pos_unit)
ax[1].set_xlabel('Pixel id along path')
ax[1].tick_params(direction='in', right=True, labelright=False, labelleft=False)
axcbar.tick_params(direction='out')
ax[1].set_ylabel(int_unit, labelpad=15)
ax[1].yaxis.set_label_position('right')
#ax[1].set_xlim(v0-0.1, v1+0.1)
#ax[1].set_ylim(-1, max_data)
vmin, vmax = -max_data/30, max_data
ax[1].set_ylim(vmin, vmax)
ax[1].grid(lw=1.5, ls=':')
cmap = plt.get_cmap('brg')
cmap.set_bad(color=(0.9,0.9,0.9))
if show_beam and self.beam_kernel: self._plot_beam(ax[0])
img = ax[0].imshow(self.data[chan_init], cmap=cmap, extent=extent, origin='lower', vmin=vmin, vmax=vmax)
cbar = plt.colorbar(img, cax=axcbar)
text_chan = ax[1].text(0.15, 1.04, #Converting xdata coords to Axes coords
r'v$_{\rmchan}$=%4.1f %s'%(self.channels[chan_init], vel_unit), ha='center',
color='black', transform=ax[1].transAxes)
if cursor_grid: cg = Cursor(ax[0], useblit=True, color='lime', linewidth=1.5)
box_img = plt.imread(path_file+'button_box.png')
cursor_img = plt.imread(path_file+'button_cursor.jpeg')
def get_interactive(func, chan=chan_init, color=False, show_path=True):
return func(ax, x, y, chan, color=color, show_path=show_path, extent=extent, compare_cubes=compare_cubes, **kwargs)
interactive_obj = [get_interactive(self.interactive_path)]
#***************
#SLIDERS
#***************
def update_chan(val):
chan = int(val)
vchan = self.channels[chan]
img.set_data(self.data[chan])
text_chan.set_text(r'v$_{\rmchan}$=%4.1f %s'%(vchan, vel_unit))
path_on_cube, plot_path, plot_color, plot_fill, cube_fill = interactive_obj[0]
plot_path[0].remove()
if plot_fill is not None: plot_fill.remove()
for cbfill in cube_fill: cbfill.remove()
interactive_obj[0] = get_interactive(self.interactive_path, chan, color=plot_color, show_path=False)
fig.canvas.draw_idle()
def update_cubes(val):
i = int(slider_cubes.val)
chan = int(slider_chan.val)
vchan = self.channels[chan]
if i==0: img.set_data(self.data[chan])
else: img.set_data(compare_cubes[i-1].data[chan])
text_chan.set_text(r'v$_{\rmchan}$=%4.1f %s'%(vchan, vel_unit))
path_on_cube, plot_path, plot_color, plot_fill, cube_fill = interactive_obj[0]
plot_path[0].remove()
if plot_fill is not None: plot_fill.remove()
for cbfill in cube_fill: cbfill.remove()
interactive_obj[0] = get_interactive(self.interactive_path, chan, color=plot_color, show_path=False)
fig.canvas.draw_idle()
ncubes = len(compare_cubes)
if ncubes>0:
axcubes = plt.axes([0.2, 0.90, 0.24, 0.025], facecolor='0.7')
axchan = plt.axes([0.2, 0.95, 0.24, 0.025], facecolor='0.7')
slider_cubes = Slider(axcubes, 'Cube id', 0, ncubes,
valstep=1, valinit=0, valfmt='%1d', color='dodgerblue')
slider_chan = Slider(axchan, 'Channel', 0, self.nchan-1,
valstep=1, valinit=chan_init, valfmt='%2d', color='dodgerblue')
slider_cubes.on_changed(update_cubes)
slider_chan.on_changed(update_cubes)
else:
axchan = plt.axes([0.2, 0.9, 0.24, 0.05], facecolor='0.7')
slider_chan = Slider(axchan, 'Channel', 0, self.nchan-1,
valstep=1, valinit=chan_init, valfmt='%2d', color='dodgerblue')
slider_chan.on_changed(update_chan)
plt.show()
"""
self._path_on_cube, self._plot_path, self._plot_color = None, None, None
self._lasso_path = []
self.interactive_path(fig, ax, chan_init, color=False, show_path=True, extent=extent, compare_cubes=compare_cubes, **kwargs)
def get_interactive(func, chan=chan_init, color=False, show_path=True):
#func(fig, ax, chan, color=color, show_path=show_path, extent=extent, compare_cubes=compare_cubes, **kwargs)
if func == self.lasso:
return self._plot_lasso(ax, True, True, chan, color=color, show_path=show_path, extent=extent, compare_cubes=compare_cubes, **kwargs)
#interactive_obj = [get_interactive(self.interactive_path)]
#print (interactive_obj)
#***************
#SLIDERS
#***************
def update_chan(val):
chan = int(val)
vchan = self.channels[chan]
img.set_data(self.data[chan])
current_chan.set_xdata(vchan)
text_chan.set_x((vchan-v0)/dv)
text_chan.set_text('%4.1f km/s'%vchan)
#path_on_cube, plot_path, plot_color = interactive_obj[0]
if self._path_on_cube is not None:
self._plot_path[0].remove()
get_interactive(self.interactive_path, chan, color=self._plot_color, show_path=False)
fig.canvas.draw_idle()
"""
[docs] def make_fits(self, output, **kw_header):
from astropy.io import fits
hdr = fits.Header()
hdr.update(**kw_header)
data = np.where(np.isfinite(self.data), self.data, 0)
fits.writeto(output, data, hdr, overwrite=True)
[docs] def make_gif(self, folder='./movie/', extent=None, velocity2d=None,
unit=r'Brightness Temperature [K]',
gif_command='convert -delay 10 *int2d* cube_channels.gif'):
cwd = os.getcwd()
if folder[-1] != '/': folder+='/'
os.system('mkdir %s'%folder)
max_data = np.max(self.data)
clear_list, coll_list = [], []
fig, ax = plt.subplots()
contour_color = 'red'
cmap = plt.get_cmap('binary')
cmap.set_bad(color=(0.9,0.9,0.9))
ax.plot([None],[None], color=contour_color, linestyle='--', linewidth=2, label='Upper surface')
ax.plot([None],[None], color=contour_color, linestyle=':', linewidth=2, label='Lower surface')
ax.set_xlabel('au')
ax.set_ylabel('au')
for i in range(self.nchan):
vchan = self.channels[i]
int2d = ax.imshow(self.data[i], cmap=cmap, extent=extent, origin='lower', vmax=max_data)
cbar = plt.colorbar(int2d)
cbar.set_label(unit)
if velocity2d is not None:
vel_near=ax.contour(velocity2d['upper'], levels=[vchan], colors=contour_color, linestyles='--', linewidths=1.3, extent = extent)
vel_far=ax.contour(velocity2d['lower'], levels=[vchan], colors=contour_color, linestyles=':', linewidths=1.3, extent = extent)
coll_list = [vel_near, vel_far]
text_chan = ax.text(0.7, 1.02, '%4.1f km/s'%vchan, color='black', transform=ax.transAxes)
ax.legend(loc='upper left')
plt.savefig(folder+'int2d_chan%04d'%i)
#print ('Saved channel %d'%i)
#plt.cla()
clear_list = [cbar, int2d, text_chan]
for obj in clear_list: obj.remove()
for obj in coll_list:
for coll in obj.collections:
coll.remove()
plt.close()
os.chdir(folder)
print ('Making movie...')
os.system(gif_command)
os.chdir(cwd)
class Height:
@property
def z_upper_func(self):
return self._z_upper_func
@z_upper_func.setter
def z_upper_func(self, upper):
print('Setting upper surface height function to', upper)
self._z_upper_func = upper
@z_upper_func.deleter
def z_upper_func(self):
print('Deleting upper surface height function')
del self._z_upper_func
@property
def z_lower_func(self):
return self._z_lower_func
@z_lower_func.setter
def z_lower_func(self, lower):
print('Setting lower surface height function to', lower)
self._z_lower_func = lower
@z_lower_func.deleter
def z_lower_func(self):
print('Deleting lower surface height function')
del self._z_lower_func
psi0 = 15*np.pi/180
@staticmethod
def z_cone(coord, psi=psi0):
R = coord['R']
z = np.tan(psi) * R
return z
@staticmethod
def z_cone_neg(coord, psi=psi0):
return -Height.z_cone(coord, psi)
@staticmethod
def z_upper_irregular(coord, file='0.txt', **kwargs):
R = coord['R']/sfu.au
Rf, zf = np.loadtxt(file)
z_interp = interp1d(Rf, zf, **kwargs)
return sfu.au*z_interp(R)
class Linewidth:
@property
def linewidth_func(self):
return self._linewidth_func
@linewidth_func.setter
def linewidth_func(self, linewidth):
print('Setting linewidth function to', linewidth)
self._linewidth_func = linewidth
@linewidth_func.deleter
def linewidth_func(self):
print('Deleting linewidth function')
del self._linewidth_func
@staticmethod
def linewidth_powerlaw(coord, L0=0.2, p=-0.4, q=0.3, R0=100*sfu.au, z0=100*sfu.au):
if 'R' not in coord.keys(): R = hypot_func(coord['x'], coord['y'])
else: R = coord['R']
z = coord['z']
A = L0*R0**-p*z0**-q
return A*R**p*np.abs(z)**q
class Lineslope:
@property
def lineslope_func(self):
return self._lineslope_func
@lineslope_func.setter
def lineslope_func(self, lineslope):
print('Setting lineslope function to', lineslope)
self._lineslope_func = lineslope
@lineslope_func.deleter
def lineslope_func(self):
print('Deleting lineslope function')
del self._lineslope_func
@staticmethod
def lineslope_powerlaw(coord, Ls=5.0, p=0.0, q=0.0, R0=100*sfu.au, z0=100*sfu.au):
if p==0.0 and q==0.0:
return Ls
else:
if 'R' not in coord.keys(): R = hypot_func(coord['x'], coord['y'])
else: R = coord['R']
z = coord['z']
A = Ls*R0**-p*z0**-q
return A*R**p*np.abs(z)**q
class ScaleHeight:
@property
def scaleheight_func(self):
return self._scaleheight_func
@scaleheight_func.setter
def scaleheight_func(self, surf):
print('Setting scaleheight function to', surf)
self._scaleheight_func = surf
@scaleheight_func.deleter
def scaleheight_func(self):
print('Deleting scaleheight function')
del self._scaleheight_func
@staticmethod
def powerlaw(coord, H0=6.5, psi=1.25, R0=100.0):
#Simple powerlaw in R.
if 'R' not in coord.keys(): R = hypot_func(coord['x'], coord['y'])
else: R = coord['R']
R = R/sfu.au
return sfu.au*H0*(R/R0)**psi
class SurfaceDensity:
@property
def surfacedensity_func(self):
return self._surfacedensity_func
@surfacedensity_func.setter
def surfacedensity_func(self, surf):
print('Setting surfacedensity function to', surf)
self._surfacedensity_func = surf
@surfacedensity_func.deleter
def surfacedensity_func(self):
print('Deleting surfacedensity function')
del self._surfacedensity_func
@staticmethod
def powerlaw(coord, Ec=30.0, gamma=1.0, Rc=100.0):
#Simple powerlaw in R.
if 'R' not in coord.keys(): R = hypot_func(coord['x'], coord['y'])
else: R = coord['R']
R = R/sfu.au
return Ec*(R/Rc)**-gamma
@staticmethod
def powerlaw_tapered(coord, Ec=30.0, Rc=100.0, gamma=1.0): #30.0 kg/m2 or 3.0 g/cm2
#Self-similar model of thin, viscous accretion disc, see Rosenfeld+2013.
if 'R' not in coord.keys(): R = hypot_func(coord['x'], coord['y'])
else: R = coord['R']
R = R/sfu.au
return Ec*(R/Rc)**-gamma * np.exp(-(R/Rc)**(2-gamma))
class Temperature:
@property
def temperature_func(self):
return self._temperature_func
@temperature_func.setter
def temperature_func(self, temp):
print('Setting temperature function to', temp)
self._temperature_func = temp
@temperature_func.deleter
def temperature_func(self):
print('Deleting temperature function')
del self._temperature_func
@staticmethod
def temperature_powerlaw(coord, T0=100.0, R0=100*sfu.au, p=-0.4, z0=100*sfu.au, q=0.3):
if 'R' not in coord.keys(): R = hypot_func(coord['x'], coord['y'])
else: R = coord['R']
z = coord['z']
A = T0*R0**-p*z0**-q
return A*R**p*np.abs(z)**q
[docs]class Velocity:
@property
def velocity_func(self):
return self._velocity_func
@velocity_func.setter
def velocity_func(self, vel):
print('Setting velocity function to', vel)
self._velocity_func = vel
if (vel is Velocity.keplerian_vertical_selfgravity or
vel is Velocity.keplerian_vertical_selfgravity_pressure):
"""
R_true_au = self.R_true/sfu.au
tmp = np.unique(np.array(R_true_au).astype(np.int32))
#Adding missing upper bound for interp1d purposes:
self.R_1d = np.append(tmp, tmp[-1]+1) #short 1D list of R in au
"""
R_true_au = self.R_true/sfu.au
tmp = np.max(R_true_au)
self.R_1d = np.arange(1, 4*tmp, 5) #short 1D list of R in au
@velocity_func.deleter
def velocity_func(self):
print('Deleting velocity function')
del self._velocity_func
[docs] @staticmethod
def keplerian(coord, Mstar=1.0, vel_sign=1, vsys=0):
Mstar *= sfu.MSun
if 'R' not in coord.keys(): R = hypot_func(coord['x'], coord['y'])
else: R = coord['R']
return vel_sign*np.sqrt(sfc.G*Mstar/R) * 1e-3
[docs] @staticmethod
def keplerian_vertical(coord, Mstar=1.0, vel_sign=1, vsys=0):
Mstar *= sfu.MSun
if 'R' not in coord.keys(): R = hypot_func(coord['x'], coord['y'])
else: R = coord['R']
if 'r' not in coord.keys(): r = hypot_func(R, coord['z'])
else: r = coord['r']
return vel_sign*np.sqrt(sfc.G*Mstar/r**3)*R * 1e-3
[docs] @staticmethod
def keplerian_pressure(coord, Mstar=1.0, vel_sign=1, vsys=0,
gamma=1.0, beta=0.5, H0=6.5, R0=100.0):
#pressure support taken from Lodato's 2021 notes and Viscardi+2021 thesis
#--> pressure term assumes vertically isothermal disc, T propto R**-beta, and surfdens propto R**-gamma (using Rosenfeld+2013 notation).
#--> R0 is the ref radius for scaleheight powerlaw, no need to be set as free par during mcmc.
Mstar *= sfu.MSun
if 'R' not in coord.keys(): R = hypot_func(coord['x'], coord['y'])
else: R = coord['R']
alpha = 1.5 + gamma + 0.5*beta
psi = -0.5*beta + 1.5
H = ScaleHeight.powerlaw({'R': R}, H0=H0, R0=R0, psi=psi)
vk2 = sfc.G*Mstar/R
vp2 = vk2*(-alpha*(H/R)**2) #pressure term
return vel_sign*np.sqrt(vk2 + vp2) * 1e-3
[docs] @staticmethod
def keplerian_vertical_pressure(coord, Mstar=1.0, vel_sign=1, vsys=0,
gamma=1.0, beta=0.5, H0=6.5, R0=100.0):
#pressure support taken from Lodato's 2021 notes and Viscardi+2021 thesis
#--> pressure term assumes vertically isothermal disc, T propto R**-beta, and surfdens propto R**-gamma (using Rosenfeld+2013 notation).
#--> R0 is the ref radius for scaleheight powerlaw, no need to be set as free par during mcmc.
Mstar *= sfu.MSun
if 'R' not in coord.keys(): R = hypot_func(coord['x'], coord['y'])
else: R = coord['R']
if 'r' not in coord.keys(): r = hypot_func(R, coord['z'])
else: r = coord['r']
z = coord['z']
z_R2 = (z/R)**2
z_R32 = (1+z_R2)**1.5
alpha = 1.5 + gamma + 0.5*beta
dlogH_dlogR = 1.5 - 0.5*beta
psi = -0.5*beta + 1.5
H = ScaleHeight.powerlaw({'R': R}, H0=H0, R0=R0, psi=psi)
vk2 = sfc.G*Mstar/R
vp2 = vk2*( -alpha*(H/R)**2 + (2/z_R32)*( 1+1.5*z_R2-z_R32-dlogH_dlogR*(1+z_R2-z_R32) ) ) #pressure term
return vel_sign*np.sqrt(R**2*sfc.G*Mstar/r**3 + vp2) * 1e-3
[docs] @staticmethod
def keplerian_vertical_selfgravity(coord, Mstar=1.0, vel_sign=1, vsys=0,
Ec=30.0, gamma=1.0, Rc=100.0,
surfacedensity_func=SurfaceDensity.powerlaw):
#disc self-gravity contribution taken from Veronesi+2021
#Surface density function defaults to powerlaw, surfdens propto R**-gamma
#--> Rc is the critical radius, no need to be set as free par during mcmc if input function is powerlaw.
Mstar *= sfu.MSun
if 'R' not in coord.keys(): R = hypot_func(coord['x'], coord['y'])
else: R = coord['R']
if 'r' not in coord.keys(): r = hypot_func(R, coord['z'])
else: r = coord['r']
R_1d = coord['R_1d'] #in au to ease computing below
z_1d = coord['z_1d'] #in au
def SG_integral(Rp, R, z):
dR = np.append(Rp[0], Rp[1:]-Rp[:-1]) ##
Rp_R = Rp/R
RpxR = Rp*R
k2 = 4*RpxR/((R+Rp)**2 + z**2)
k = np.sqrt(k2)
K1 = ellipk(k2) #It's k2 (not k) here. The def in the Gradshteyn+1980 book differs from that of scipy.
E2 = ellipe(k2)
#surf_dens = SurfaceDensity.powerlaw_tapered({'R': Rp*sfu.au}, Ec=Ec, Rc=Rc, gamma=gamma)
surf_dens = surfacedensity_func({'R': Rp*sfu.au}, Ec=Ec, Rc=Rc, gamma=gamma)
val = (K1 - 0.25*(k2/(1-k2))*(Rp_R - R/Rp + z**2/RpxR)*E2) * np.sqrt(Rp_R)*k*surf_dens
#return sfc.G*val*sfu.au
return sfc.G*np.sum(val*dR)*sfu.au ##
R_len = len(R_1d)
SG_1d = []
for i in range(R_len):
#SG_1d.append(quad(SG_integral, 0, np.inf, args=(R_1d[i], z_1d[i]), limit=100)[0])
SG_1d.append(SG_integral(R_1d, R_1d[i], z_1d[i])) ##
SG_2d = interp1d(R_1d, SG_1d)
return vel_sign*np.sqrt(R**2*sfc.G*Mstar/r**3 + SG_2d(R/sfu.au)) * 1e-3
[docs] @staticmethod
def keplerian_vertical_selfgravity_pressure(coord, Mstar=1.0, vel_sign=1, vsys=0,
Ec=30.0, gamma=1.0,
beta=0.5,
H0=6.5,
Rc=100.0, R0=100.0):
#--> Rc and R0 are reference radii for surfdens and scaleheight powerlaws, no need to be set as free pars during mcmc.
Mstar *= sfu.MSun
if 'R' not in coord.keys(): R = hypot_func(coord['x'], coord['y'])
else: R = coord['R']
if 'r' not in coord.keys(): r = hypot_func(R, coord['z'])
else: r = coord['r']
R_1d = coord['R_1d']
z_1d = coord['z_1d']
def SG_integral(Rp, R, z):
dR = np.append(Rp[0], Rp[1:]-Rp[:-1]) ##
Rp_R = Rp/R
RpxR = Rp*R
k2 = 4*RpxR/((R+Rp)**2 + z**2)
k = np.sqrt(k2)
K1 = ellipk(k2) #It's k2 (not k) here. The def in the Gradshteyn+1980 book differs from that of scipy.
E2 = ellipe(k2)
surf_dens = SurfaceDensity.powerlaw({'R': Rp*sfu.au}, Ec=Ec, Rc=Rc, gamma=gamma)
val = (K1 - 0.25*(k2/(1-k2))*(Rp_R - R/Rp + z**2/RpxR)*E2) * np.sqrt(Rp_R)*k*surf_dens
#return sfc.G*val*sfu.au
return sfc.G*np.sum(val*dR)*sfu.au ##
R_len = len(R_1d)
SG_1d = []
for i in range(R_len):
#SG_1d.append(quad(SG_integral, 0, np.inf, args=(R_1d[i], z_1d[i]), limit=100)[0])
SG_1d.append(SG_integral(R_1d, R_1d[i], z_1d[i])) ##
SG_2d = interp1d(R_1d, SG_1d)
#calculate pressure support
z = coord['z']
z_R2 = (z/R)**2
z_R32 = (1+z_R2)**1.5
alpha = 1.5 + gamma + 0.5*beta
dlogH_dlogR = 1.5 - 0.5*beta
psi = -0.5*beta + 1.5
H = ScaleHeight.powerlaw({'R': R}, H0=H0, R0=R0, psi=psi)
vk2 = sfc.G*Mstar/R
vp2 = vk2*( -alpha*(H/R)**2 + (2/z_R32)*( 1+1.5*z_R2-z_R32-dlogH_dlogR*(1+z_R2-z_R32) ) )
return vel_sign*np.sqrt(R**2*sfc.G*Mstar/r**3 + SG_2d(R/sfu.au) + vp2) * 1e-3
[docs]class Intensity:
@property
def beam_info(self):
return self._beam_info
@beam_info.setter
def beam_info(self, beam_info):
print('Setting beam_info var to', beam_info)
self._beam_info = beam_info
@beam_info.deleter
def beam_info(self):
print('Deleting beam_info var')
del self._beam_info
@property
def beam_kernel(self):
return self._beam_kernel
@beam_kernel.setter
def beam_kernel(self, beam_kernel):
print('Setting beam_kernel var to', beam_kernel)
x_stddev = beam_kernel.model.x_stddev.value
y_stddev = beam_kernel.model.y_stddev.value
self._beam_area = 2*np.pi*x_stddev*y_stddev
self._beam_kernel = beam_kernel
@beam_kernel.deleter
def beam_kernel(self):
print('Deleting beam_kernel var')
del self._beam_kernel
@property
def beam_from(self):
return self._beam_from
@beam_from.setter
def beam_from(self, file):
#Rework this, missing beam kwargs info
print('Setting beam_from var to', file)
if file: self.beam_info, self.beam_kernel = Tools._get_beam_from(file) #Calls beam_kernel setter
self._beam_from = file
@beam_from.deleter
def beam_from(self):
print('Deleting beam_from var')
del self._beam_from
@property
def use_temperature(self):
return self._use_temperature
@use_temperature.setter
def use_temperature(self, use):
use = bool(use)
print('Setting use_temperature var to', use)
if use: self.line_profile = self.line_profile_temp
#else: self.line_profile = self.line_profile_v_sigma
self._use_temperature = use
@use_temperature.deleter
def use_temperature(self):
print('Deleting use_temperature var')
del self._use_temperature
@property
def use_full_channel(self):
return self._use_full_channel
@use_full_channel.setter
def use_full_channel(self, use): #Needs remake, there is now a new kernel (line_profile_bell)
use = bool(use)
print('Setting use_full_channel var to', use)
if use:
if self.use_temperature: self.line_profile = self.line_profile_temp_full
else: self.line_profile = self.line_profile_v_sigma_full
else:
if self.use_temperature: self.line_profile = self.line_profile_temp
else: self.line_profile = self.line_profile_v_sigma
self._use_full_channel = use
@use_full_channel.deleter
def use_full_channel(self):
print('Deleting use_full_channel var')
del self._use_full_channel
@property
def line_profile(self):
return self._line_profile
@line_profile.setter
def line_profile(self, profile):
print('Setting line profile function to', profile)
self._line_profile = profile
@line_profile.deleter
def line_profile(self):
print('Deleting intensity function')
del self._line_profile
@property
def intensity_func(self):
return self._intensity_func
@intensity_func.setter
def intensity_func(self, intensity):
print('Setting intensity function to', intensity)
self._intensity_func = intensity
@intensity_func.deleter
def intensity_func(self):
print('Deleting intensity function')
del self._intensity_func
[docs] @staticmethod
def intensity_powerlaw(coord, I0=30.0, R0=100*sfu.au, p=-0.4, z0=100*sfu.au, q=0.3):
if 'R' not in coord.keys(): R = hypot_func(coord['x'], coord['y'])
else: R = coord['R']
z = coord['z']
A = I0*R0**-p*z0**-q
return A*R**p*np.abs(z)**q
[docs] @staticmethod
def nuker(coord, I0=30.0, Rt=100*sfu.au, alpha=-0.5, gamma=0.1, beta=0.2):
if 'R' not in coord.keys(): R = hypot_func(coord['x'], coord['y'])
else: R = coord['R']
A = I0*Rt**gamma
return A*(R**-gamma) * (1+(R/Rt)**alpha)**((gamma-beta)/alpha)
[docs] @staticmethod
def line_profile_subchannel(line_profile_func, v_chan, v, v_sigma, b_slope, channel_width=0.1, **kwargs): #Currently not used
half_chan = 0.5*channel_width
v0 = v_chan - half_chan
v1 = v_chan + half_chan
nsub = 10
vsub = np.linspace(v0, v1, nsub)
dvsub = vsub[1]-vsub[0]
J = 0
for vs in vsub:
J += line_profile_func(vs, v, v_sigma, b_slope, **kwargs)
J = J * dvsub/channel_width
return J
[docs] @staticmethod
def line_profile_temp(v_chan, v, T, dum, v_turb=0.0, mmol=2*sfu.amu):
v_sigma = np.sqrt(sfc.kb*T/mmol + v_turb**2) * 1e-3 #in km/s
#return 1/(np.sqrt(np.pi)*v_sigma) * np.exp(-((v-v_chan)/v_sigma)**2)
return np.exp(-0.5*((v-v_chan)/v_sigma)**2)
[docs] @staticmethod
def line_profile_temp_full(v_chan, v, T, dum, v_turb=0, mmol=2*sfu.amu, channel_width=0.1):
v_sigma = np.sqrt(sfc.kb*T/mmol + v_turb**2) * 1e-3 #in km/s
half_chan = 0.5*channel_width
v0 = v_chan - half_chan
v1 = v_chan + half_chan
nsub = 10
vsub = np.linspace(v0, v1, nsub)
dvsub = vsub[1]-vsub[0]
J = 0
for vs in vsub:
J += np.exp(-0.5*((v-vs)/v_sigma)**2)
J = J * dvsub/channel_width
return J
[docs] @staticmethod
def line_profile_v_sigma(v_chan, v, v_sigma, dum):
#return 1/(np.sqrt(2*np.pi)*v_sigma) * np.exp(-0.5*((v-v_chan)/v_sigma)**2)
#return np.where(np.abs(v-v_chan) < 0.5*v_sigma, 1, 0)
return np.exp(-0.5*((v-v_chan)/v_sigma)**2)
[docs] @staticmethod
def line_profile_v_sigma_full(v_chan, v, v_sigma, dum, channel_width=0.1):
half_chan = 0.5*channel_width
v0 = v_chan - half_chan
v1 = v_chan + half_chan
nsub = 10
vsub = np.linspace(v0, v1, nsub)
dvsub = vsub[1]-vsub[0]
J = 0
for vs in vsub:
J += np.exp(-0.5*((v-vs)/v_sigma)**2)
J = J * dvsub/channel_width
return J
[docs] @staticmethod
def line_profile_bell(v_chan, v, v_sigma, b_slope):
return 1/(1+np.abs((v-v_chan)/v_sigma)**(2*b_slope))
[docs] @staticmethod
def line_profile_bell_full(v_chan, v, v_sigma, b_slope, channel_width=0.1):
half_chan = 0.5*channel_width
v0 = v_chan - half_chan
v1 = v_chan + half_chan
nsub = 10
vsub = np.linspace(v0, v1, nsub)
dvsub = vsub[1]-vsub[0]
J = 0
for vs in vsub:
J += 1/(1+np.abs((v-vs)/v_sigma)**(2*b_slope))
J = J * dvsub/channel_width
return J
[docs] def get_line_profile(self, v_chan, vel2d, linew2d, lineb2d, **kwargs):
if self.subpixels:
v_near, v_far = [], []
for i in range(self.subpixels_sq):
v_near.append(self.line_profile(v_chan, vel2d[i]['upper'], linew2d['upper'], lineb2d['upper'], **kwargs))
v_far.append(self.line_profile(v_chan, vel2d[i]['lower'], linew2d['lower'], lineb2d['lower'], **kwargs))
integ_v_near = np.sum(np.array(v_near), axis=0) * self.sub_dA / self.pix_dA
integ_v_far = np.sum(np.array(v_far), axis=0) * self.sub_dA / self.pix_dA
return integ_v_near, integ_v_far
else:
v_near = self.line_profile(v_chan, vel2d['upper'], linew2d['upper'], lineb2d['upper'], **kwargs)
v_far = self.line_profile(v_chan, vel2d['lower'], linew2d['lower'], lineb2d['lower'], **kwargs)
return v_near, v_far
[docs] def get_channel(self, velocity2d, intensity2d, linewidth2d, lineslope2d, v_chan, **kwargs):
vel2d, int2d, linew2d, lineb2d = velocity2d, {}, {}, {}
if isinstance(intensity2d, numbers.Number): int2d['upper'] = int2d['lower'] = intensity2d
else: int2d = intensity2d
if isinstance(linewidth2d, numbers.Number): linew2d['upper'] = linew2d['lower'] = linewidth2d
else: linew2d = linewidth2d
if isinstance(lineslope2d, numbers.Number): lineb2d['upper'] = lineb2d['lower'] = lineslope2d
else: lineb2d = lineslope2d
v_near, v_far = self.get_line_profile(v_chan, vel2d, linew2d, lineb2d, **kwargs)
"""
v_near_clean = np.where(np.isnan(v_near), -np.inf, v_near)
v_far_clean = np.where(np.isnan(v_far), -np.inf, v_far)
#vmap_full = np.array([v_near_clean, v_far_clean]).max(axis=0)
int2d_near = np.where(np.isnan(int2d['upper']), -np.inf, int2d['upper'] * v_near_clean)# / v_near_clean.max())
int2d_far = np.where(np.isnan(int2d['lower']), -np.inf, int2d['lower'] * v_far_clean)# / v_far_clean.max())
"""
int2d_full = np.nanmax([int2d_near, int2d_far], axis=0)
if self.beam_kernel:
"""
inf_mask = np.isinf(int2d_full)
"""
inf_mask = np.isnan(int2d_full)
int2d_full = np.where(inf_mask, 0.0, int2d_full)
int2d_full = self._beam_area*convolve(int2d_full, self.beam_kernel, preserve_nan=False)
return int2d_full
[docs] def get_cube(self, vchannels, velocity2d, intensity2d, linewidth2d, lineslope2d, make_convolve=True,
nchan=None, tb={'nu': False, 'beam': False, 'full': True}, return_data_only=False, **kwargs):
vel2d, int2d, linew2d, lineb2d = velocity2d, {}, {}, {}
line_profile = self.line_profile
if nchan is None: nchan=len(vchannels)
cube = []
if isinstance(intensity2d, numbers.Number): int2d['upper'] = int2d['lower'] = intensity2d
else: int2d = intensity2d
if isinstance(linewidth2d, numbers.Number): linew2d['upper'] = linew2d['lower'] = linewidth2d
else: linew2d = linewidth2d
if isinstance(lineslope2d, numbers.Number): lineb2d['upper'] = lineb2d['lower'] = lineslope2d
else: lineb2d = lineslope2d
"""
int2d_near_nan = np.isnan(int2d['upper']) #~int2d['upper'].mask
int2d_far_nan = np.isnan(int2d['lower']) #~int2d['lower'].mask
if self.subpixels:
vel2d_near_nan = np.isnan(vel2d[self.sub_centre_id]['upper'])
vel2d_far_nan = np.isnan(vel2d[self.sub_centre_id]['lower'])
else:
vel2d_near_nan = np.isnan(vel2d['upper']) #~vel2d['upper'].mask
vel2d_far_nan = np.isnan(vel2d['lower']) #~vel2d['lower'].mask
"""
#for _ in itertools.repeat(None, nchan):
for vchan in vchannels:
v_near, v_far = self.get_line_profile(vchan, vel2d, linew2d, lineb2d, **kwargs)
"""
v_near_clean = np.where(vel2d_near_nan, -np.inf, v_near)
v_far_clean = np.where(vel2d_far_nan, -np.inf, v_far)
#vmap_full = np.array([v_near_clean, v_far_clean]).max(axis=0)
int2d_near = np.where(int2d_near_nan, -np.inf, int2d['upper'] * v_near_clean)
int2d_far = np.where(int2d_far_nan, -np.inf, int2d['lower'] * v_far_clean)
"""
int2d_near = int2d['upper'] * v_near
int2d_far = int2d['lower'] * v_far
#vel nans might differ from Int nans when a z surf is zero and SG is active, then nanmax must be used:
int2d_full = np.nanmax([int2d_near, int2d_far], axis=0)
if make_convolve and self.beam_kernel:
"""
inf_mask = np.isinf(int2d_full)
"""
inf_mask = np.isnan(int2d_full)
int2d_full = np.where(inf_mask, 0.0, int2d_full)
int2d_full = self._beam_area*convolve(int2d_full, self.beam_kernel, preserve_nan=False)
cube.append(int2d_full)
if return_data_only: return np.asarray(cube)
else: return Cube(nchan, vchannels, np.asarray(cube), beam=self.beam_info, beam_kernel=self.beam_kernel, tb=tb)
[docs] @staticmethod
def make_channels_movie(vchan0, vchan1, velocity2d, intensity2d, linewidth2d, lineslope2d, nchans=30, folder='./movie_channels/', **kwargs):
channels = np.linspace(vchan0, vchan1, num=nchans)
int2d_cube = []
for i, vchan in enumerate(channels):
int2d = Intensity.get_channel(velocity2d, intensity2d, linewidth2d, lineslope2d, vchan, **kwargs)
int2d_cube.append(int2d)
extent = [-600, 600, -600, 600]
plt.imshow(int2d, cmap='binary', extent=extent, origin='lower', vmax=np.max(linewidth2d['upper']))
plt.xlabel('au')
plt.ylabel('au')
plt.text(200, 500, '%.1f km/s'%vchan)
cbar = plt.colorbar()
cbar.set_label(r'Brightness Temperature [K]')
plt.contour(velocity2d['upper'], levels=[vchan], colors='red', linestyles='--', linewidths=1.3, extent = extent)
plt.contour(velocity2d['lower'], levels=[vchan], colors='red', linestyles=':', linewidths=1.3, extent = extent)
plt.plot([None],[None], color='red', linestyle='--', linewidth=2, label='Near side')
plt.plot([None],[None], color='red', linestyle=':', linewidth=2, label='Far side')
plt.legend(loc='upper left')
plt.savefig(folder+'int2d_chan%04d'%i)
print ('Saving channel %d'%i)
plt.close()
os.chdir(folder)
print ('Making channels movie...')
os.system('convert -delay 10 *int2d* cube_channels.gif')
return np.array(int2d_cube)
class Mcmc:
@staticmethod
def _get_params2fit(mc_params, boundaries):
header = []
kind = []
params_indices = {}
boundaries_list = []
check_param2fit = lambda val: val and isinstance(val, bool)
i = 0
for key in mc_params:
if isinstance(mc_params[key], dict):
params_indices[key] = {}
for key2 in mc_params[key]:
if check_param2fit(mc_params[key][key2]):
header.append(key2)
kind.append(key)
boundaries_list.append(boundaries[key][key2])
params_indices[key][key2] = i
i+=1
else: raise InputError(mc_params, 'Wrong input parameters. Base keys in mc_params must be categories; parameters of a category must be within a dictionary as well.')
return header, kind, len(header), boundaries_list, params_indices
@staticmethod
def plot_walkers(samples, best_params, nstats=None, header=None, kind=None, tag=''):
npars, nsteps, nwalkers = samples.shape
if kind is not None:
ukind, neach = np.unique(kind, return_counts=True)
ncols = len(ukind)
nrows = np.max(neach)
else:
ukind = ['']
ncols = 1
nrows = npars
kind = ['' for i in range(nrows)]
if header is not None:
if len(header) != npars: raise InputError(header, 'Number of headers must be equal to number of parameters')
kind_col = {ukind[i]: i for i in range(ncols)}
col_count = np.zeros(ncols).astype('int')
fig, ax = plt.subplots(nrows=nrows, ncols=ncols, figsize=(3*ncols, 3*nrows))
x0_hline = 0
for k, key in enumerate(kind):
j = kind_col[key]
i = col_count[j]
for walker in samples[k].T:
if ncols == 1: axij = ax[i]
elif nrows==1: axij = ax[j]
else: axij = ax[i][j]
axij.plot(walker, alpha=0.1, lw=1.0, color='k')
if header is not None:
#axij.set_ylabel(header[k])
axij.text(0.1, 0.1, header[k], va='center', ha='left', fontsize=MEDIUM_SIZE+2, transform=axij.transAxes, rotation=0) #0.06, 0.95, va top, rot 90
if i==0: axij.set_title(key, pad=10, fontsize=MEDIUM_SIZE+2)
if nstats is not None:
axij.axvline(nsteps-nstats, ls=':', lw=2, color='r')
x0_hline = nsteps-nstats
axij.plot([x0_hline, nsteps], [best_params[k]]*2, ls='-', lw=3, color='dodgerblue')
axij.text((nsteps-1)+0.03*nsteps, best_params[k], '%.3f'%best_params[k], va='center', color='dodgerblue', fontsize=MEDIUM_SIZE+1, rotation=90)
axij.tick_params(axis='y', which='major', labelsize=SMALL_SIZE, rotation=45)
axij.set_xlim(None, nsteps-1 + 0.01*nsteps)
col_count[j]+=1
for j in range(ncols):
i_last = col_count[j]-1
if ncols==1: ax[i_last].set_xlabel('Steps')
elif nrows==1: ax[j].set_xlabel('Steps')
else: ax[i_last][j].set_xlabel('Steps')
if nrows>1 and i_last<nrows-1: #Remove empty axes
for k in range((nrows-1)-i_last): ax[nrows-1-k][j].axis('off')
plt.tight_layout()
plt.savefig('mc_walkers_%s_%dwalkers_%dsteps.png'%(tag, nwalkers, nsteps), dpi=300)
plt.close()
@staticmethod
def plot_corner(samples, labels=None, quantiles=None):
"""Plot corner plot to check parameter correlations. Requires the 'corner' module"""
import corner
quantiles = [0.16, 0.5, 0.84] if quantiles is None else quantiles
corner.corner(samples, labels=labels, title_fmt='.4f', bins=30,
quantiles=quantiles, show_titles=True)
def ln_likelihood(self, new_params, **kwargs):
for i in range(self.mc_nparams):
#Assuming uniform prior likelihood (within boundaries) for all parameters
if not (self.mc_boundaries_list[i][0] < new_params[i] < self.mc_boundaries_list[i][1]): return -np.inf
else: self.params[self.mc_kind[i]][self.mc_header[i]] = new_params[i]
vel2d, int2d, linew2d, lineb2d = self.make_model(**kwargs)
lnx2=0
model_cube = self.get_cube(self.channels, vel2d, int2d, linew2d, lineb2d, nchan=self.nchan, return_data_only=True)#, tb = {'nu': 230, 'beam': self.beam_info})
for i in range(self.nchan):
model_chan = model_cube[i] #model_cube.data[i] #self.get_channel(vel2d, int2d, linew2d, lineb2d, self.channels[i])
mask_data = np.isfinite(self.data[i])
mask_model = np.isfinite(model_chan)
data = np.where(np.logical_and(mask_model, ~mask_data), 0, self.data[i])
model = np.where(np.logical_and(mask_data, ~mask_model), 0, model_chan)
mask = np.logical_and(mask_data, mask_model)
lnx = np.where(mask, np.power((data - model)/self.noise_stddev, 2), 0)
#lnx = -0.5 * np.sum(lnx2[~np.isnan(lnx2)] * 0.00001)# * self.ivar)
lnx2 += -0.5 * np.sum(lnx)
#print (new_params, "\nLOG LIKELIHOOD %.4e"%lnx2)
return lnx2 if np.isfinite(lnx2) else -np.inf
[docs]class General2d(Height, Velocity, Intensity, Linewidth, Lineslope, Tools, Mcmc): #Inheritance should only be from Intensity and Mcmc, the others contain just staticmethods...
def __init__(self, grid, prototype=False, subpixels=False, beam=None, skygrid=None, kwargs_beam={}):
Tools._print_logo()
self.flags = {'disc': True, 'env': False}
self.grid = grid
self.prototype = prototype
if skygrid is None: skygrid = grid
self._beam_info = False
self._beam_from = False #Should be deprecated
self._beam_kernel = False
self._beam_area = False
if beam is not None:
self.beam_info, self.beam_kernel = Tools._get_beam_from(beam, dpix=grid.step[0], **kwargs_beam)
self._z_upper_func = General2d.z_cone
self._z_lower_func = General2d.z_cone_neg
self._velocity_func = General2d.keplerian
self._intensity_func = General2d.intensity_powerlaw
self._linewidth_func = General2d.linewidth_powerlaw
self._lineslope_func = General2d.lineslope_powerlaw
self._line_profile = General2d.line_profile_bell
self._use_temperature = False
self._use_full_channel = False
x_true, y_true = grid.XYZ[:2]
self.phi_true = np.arctan2(y_true, x_true) #grid.rRTP[3]
self.R_true = hypot_func(x_true, y_true) #grid.rRTP[1] #Slightly different as in the grid object the pixels R=0 actually take the closest-neighbour value. Current approach masks r,R=0
self.x_true, self.y_true = x_true, y_true
self.mesh = np.meshgrid(skygrid.XYZgrid[0], skygrid.XYZgrid[1]) #disc grid will be interpolated onto this sky grid in make_model(). Must match data dims for mcmc.
self.R_1d = None #will be modified if selfgravity is considered
if subpixels and isinstance(subpixels, int):
if subpixels%2 == 0: subpixels+=1 #If input even becomes odd to contain pxl centre
pix_size = grid.step[0]
dx = dy = pix_size / subpixels
centre = int(round((subpixels-1)/2.))
centre_sq = int(round((subpixels**2-1)/2.))
x_shift = np.arange(0, subpixels*dx, dx) - dx*centre
y_shift = np.arange(0, subpixels*dy, dy) - dy*centre
sub_x_true = [x_true + x0 for x0 in x_shift]
sub_y_true = [y_true + y0 for y0 in y_shift]
self.sub_R_true = [[hypot_func(sub_x_true[j], sub_y_true[i]) for j in range(subpixels)] for i in range(subpixels)]
self.sub_phi_true = [[np.arctan2(sub_y_true[i], sub_x_true[j]) for j in range(subpixels)] for i in range(subpixels)]
self.sub_x_true = sub_x_true
self.sub_y_true = sub_x_true
self.sub_dA = dx*dy
self.pix_dA = pix_size**2
self.sub_centre_id = centre_sq
self.subpixels = subpixels
self.subpixels_sq = subpixels**2
else: self.subpixels=False
#Get and print default parameters for default functions
self.categories = ['velocity', 'orientation', 'intensity', 'linewidth', 'lineslope', 'height_upper', 'height_lower']
self.mc_params = {'velocity': {'Mstar': True,
'vel_sign': 1,
'vsys': 0},
'orientation': {'incl': True,
'PA': True,
'xc': False,
'yc': False},
'intensity': {'I0': True,
'p': True,
'q': False},
'linewidth': {'L0': True,
'p': True,
'q': 0.1},
'lineslope': {'Ls': False,
'p': False,
'q': False},
'height_upper': {'psi': True},
'height_lower': {'psi': True},
}
self.mc_boundaries = {'velocity': {'Mstar': [0.05, 5.0],
'vsys': [-10, 10],
'Ec': [0, 300],
'Rc': [50, 300],
'gamma': [0.5, 2.0],
'beta': [0, 1.0],
'H0': [0.1, 20]},
'orientation': {'incl': [-np.pi/3, np.pi/3],
'PA': [-np.pi, np.pi],
'xc': [-50, 50],
'yc': [-50, 50]},
'intensity': {'I0': [0, 1000],
'p': [-10.0, 10.0],
'q': [0, 5.0]},
'linewidth': {'L0': [0.005, 5.0],
'p': [-5.0, 5.0],
'q': [-5.0, 5.0]},
'lineslope': {'Ls': [0.005, 20],
'p': [-5.0, 5.0],
'q': [-5.0, 5.0]},
'height_upper': {'psi': [0, np.pi/2]},
'height_lower': {'psi': [0, np.pi/2]}
}
if prototype:
self.params = {}
for key in self.categories: self.params[key] = {}
print ('Available categories for prototyping:', self.params)
else:
self.mc_header, self.mc_kind, self.mc_nparams, self.mc_boundaries_list, self.mc_params_indices = General2d._get_params2fit(self.mc_params, self.mc_boundaries)
#print ('Default parameter header for mcmc fitting:', self.mc_header)
#print ('Default parameters to fit and fixed parameters:', self.mc_params)
[docs] def plot_quick_attributes(self, R_in=10, R_out=300, surface='upper', fig_width=80, fig_height=25,
height=True, velocity=True, linewidth=True, peakintensity=True, **kwargs_plot):
import termplotlib as tpl #pip install termplotlib. Requires gnuplot: brew install gnuplot (for OSX users)
kwargs = dict(plot_command="plot '-' w steps", xlabel='Offset [au]', label=None, xlim=None, ylim=None) #plot_command: lines, steps, dots, points, boxes
kwargs.update(kwargs_plot)
R = np.linspace(R_in, R_out, 100)
if surface=='upper': coords = {'R': R*sfu.au, 'z': self.z_upper_func({'R': R*sfu.au}, **self.params['height_upper'])}
if surface=='lower': coords = {'R': R*sfu.au, 'z': self.z_lower_func({'R': R*sfu.au}, **self.params['height_lower'])}
def make_plot(func, kind, tag=None, val_unit=1, surface=surface):
if tag is None: tag=kind
fig = tpl.figure()
val = func(coords, **self.params[kind])
fig.plot(R, val/val_unit, width=fig_width, height=fig_height, title=tag+' '+surface, **kwargs)
fig.show()
if height and surface=='upper': make_plot(self.z_upper_func, 'height_upper', val_unit=sfu.au, surface='')
if height and surface=='lower': make_plot(self.z_lower_func, 'height_lower', val_unit=sfu.au, surface='')
if velocity: make_plot(self.velocity_func, 'velocity')
if linewidth: make_plot(self.linewidth_func, 'linewidth')
if peakintensity: make_plot(self.intensity_func, 'intensity', tag='peak intensity')
[docs] def run_mcmc(self, data, channels, p0_mean=[], p0_stddev=1e-3, noise_stddev=1.0,
nwalkers=30, nsteps=100, frac_stats=0.5, frac_stddev=1e-3,
nthreads=None,
backend=None, #emcee
use_zeus=False,
#custom_header={}, custom_kind={}, mc_layers=1,
z_mirror=False,
plot_walkers=True, plot_corner=True, tag='',
mpi=False,
**kwargs_model):
#p0: list of initial guesses. In the future will support 'optimize', 'min_bound', 'max_bound'
self.data = data
self.channels = channels
self.nchan = len(channels)
self.noise_stddev = noise_stddev
if use_zeus: import zeus as sampler_id
else: import emcee as sampler_id
kwargs_model.update({'z_mirror': z_mirror})
if z_mirror:
for key in self.mc_params['height_lower']: self.mc_params['height_lower'][key] = 'height_upper_mirror'
self.mc_header, self.mc_kind, self.mc_nparams, self.mc_boundaries_list, self.mc_params_indices = General2d._get_params2fit(self.mc_params, self.mc_boundaries)
self.params = copy.deepcopy(self.mc_params)
#if p0_mean == 'optimize': p0_mean = optimize_p0()
if isinstance(p0_mean, (list, tuple, np.ndarray)):
if len(p0_mean) != self.mc_nparams: raise InputError(p0_mean, 'Length of input p0_mean must be equal to the number of parameters to fit: %d'%self.mc_nparams)
else: pass
nstats = int(round(frac_stats*(nsteps-1))) #python2 round returns float, python3 returns int
ndim = self.mc_nparams
p0_stddev = [frac_stddev*(self.mc_boundaries_list[i][1] - self.mc_boundaries_list[i][0]) for i in range(self.mc_nparams)]
p0 = np.random.normal(loc=p0_mean,
scale=p0_stddev,
size=(nwalkers, ndim)
)
Tools._break_line()
print ('Initialising MCMC routines with the following (%d) parameters:\n'%self.mc_nparams)
if found_termtables:
bound_left, bound_right = np.array(self.mc_boundaries_list).T
tt_header = ['Attribute', 'Parameter', 'Mean initial guess', 'Par stddev', 'Lower bound', 'Upper bound']
tt_data = np.array([self.mc_kind, self.mc_header, p0_mean, p0_stddev, bound_left, bound_right]).T
termtables.print(
tt_data,
header=tt_header,
style=termtables.styles.markdown,
padding=(0, 1),
alignment="lcllll"
)
else:
print ('Parameter header set for mcmc model fitting:', self.mc_header)
print ('Parameters to fit and fixed parameters:')
pprint.pprint(self.mc_params)
print ('Number of mc parameters:', self.mc_nparams)
print ('Parameter attributes:', self.mc_kind)
print ('Parameter boundaries:')
pprint.pprint(self.mc_boundaries_list)
print ('Mean for initial guess p0:', p0_mean)
print ('p0 pars stddev:', p0_stddev)
Tools._break_line(init='\n', end='\n\n')
if mpi: #Needs schwimmbad library: $ pip install schwimmbad
from schwimmbad import MPIPool
with MPIPool() as pool:
if not pool.is_master():
pool.wait()
sys.exit(0)
sampler = sampler_id.EnsembleSampler(nwalkers, ndim, self.ln_likelihood, pool=pool, backend=backend, kwargs=kwargs_model)
start = time.time()
if backend is not None and backend.iteration!=0:
sampler.run_mcmc(None, nsteps, progress=True)
else:
sampler.run_mcmc(p0, nsteps, progress=True)
end = time.time()
multi_time = end - start
print("MPI multiprocessing took {0:.1f} seconds".format(multi_time))
else:
with Pool(processes=nthreads) as pool:
sampler = sampler_id.EnsembleSampler(nwalkers, ndim, self.ln_likelihood, pool=pool, backend=backend, kwargs=kwargs_model)
start = time.time()
if backend is not None and backend.iteration!=0:
sampler.run_mcmc(None, nsteps, progress=True)
else:
sampler.run_mcmc(p0, nsteps, progress=True)
end = time.time()
multi_time = end - start
print("Multiprocessing took {0:.1f} seconds".format(multi_time))
sampler_chain = sampler.chain
if use_zeus: sampler_chain = np.swapaxes(sampler.chain, 0, 1) #zeus chains shape (nsteps, nwalkers, npars) must be swapped
samples = sampler_chain[:, -nstats:] #3d matrix, chains shape (nwalkers, nstats, npars)
samples = samples.reshape(-1, samples.shape[-1]) #2d matrix, shape (nwalkers*nstats, npars). With -1 numpy guesses the x dimensionality
best_params = np.median(samples, axis=0)
self.mc_sampler = sampler
self.mc_samples = samples
self.best_params = best_params
samples_all = sampler_chain[:, :] #3d matrix, chains shape (nwalkers, nsteps, npars)
samples_all = samples_all.reshape(-1, samples.shape[-1]) #2d matrix, shape (nwalkers*nsteps, npars)
self.mc_samples_all = samples_all
#Errors: +- 68.2 percentiles
errpos, errneg = [], []
for i in range(self.mc_nparams):
tmp = best_params[i]
indpos = samples[:,i] > tmp
indneg = samples[:,i] < tmp
val = samples[:,i][indpos] - tmp
errpos.append(np.percentile(val, [68.2])) #1 sigma (2x perc 34.1), positive pars
val = np.abs(samples[:,i][indneg] - tmp)
errneg.append(np.percentile(val, [68.2]))
self.best_params_errpos = np.asarray(errpos).squeeze()
self.best_params_errneg = np.asarray(errneg).squeeze()
best_fit_dict = np.array([p0_mean, best_params, self.best_params_errneg, self.best_params_errpos]).T
best_fit_dict = {key+'_'+self.mc_kind[i]: str(best_fit_dict[i].tolist())[1:-1] for i,key in enumerate(self.mc_header)}
self.best_fit_dict = best_fit_dict
Tools._break_line(init='\n')
print ('Median from parameter walkers for the last %d steps:\n'%nstats)
if found_termtables:
tt_header = ['Parameter', 'Best-fit value', 'error [-]', 'error [+]']
tt_data = np.array([self.mc_header, self.best_params, self.best_params_errneg, self.best_params_errpos]).T
termtables.print(
tt_data,
header=tt_header,
style=termtables.styles.markdown,
padding=(0, 1),
alignment="clll"
)
else:
print (list(zip(self.mc_header, best_params)))
Tools._break_line(init='\n', end='\n\n')
#************
#PLOTTING
#************
#for key in custom_header: self.mc_header[key] = custom_header[key]
#for key in custom_kund: self.mc_kind[key] = custom_kind[key]
if plot_walkers:
Mcmc.plot_walkers(sampler_chain.T, best_params, header=self.mc_header, kind=self.mc_kind, nstats=nstats, tag=tag)
if plot_corner:
Mcmc.plot_corner(samples, labels=self.mc_header)
plt.savefig('mc_corner_%s_%dwalkers_%dsteps.png'%(tag, nwalkers, nsteps))
plt.close()
[docs] @staticmethod
def orientation(incl=np.pi/4, PA=0.0, xc=0.0, yc=0.0):
xc = xc*sfu.au
yc = yc*sfu.au
return incl, PA, xc, yc
[docs] def get_projected_coords(self, z_mirror=False, R_inner=0, R_disc=None,
R_nan_val=0, phi_nan_val=10*np.pi, z_nan_val=0):
if self.prototype:
Tools._break_line()
print ('Computing disc upper and lower surface coordinates, projected on the sky plane...')
print ('Using height and orientation parameters from prototype model:\n')
pprint.pprint({key: self.params[key] for key in ['height_upper', 'height_lower', 'orientation']})
Tools._break_line(init='\n')
incl, PA, xc, yc = General2d.orientation(**self.params['orientation'])
cos_incl, sin_incl = np.cos(incl), np.sin(incl)
#*******************************************
#MAKE TRUE GRID FOR UPPER AND LOWER SURFACES
z_true = {}
z_true['upper'] = self.z_upper_func({'R': self.R_true, 'phi': self.phi_true}, **self.params['height_upper'])
if z_mirror: z_true['lower'] = -z_true['upper']
else: z_true['lower'] = self.z_lower_func({'R': self.R_true, 'phi': self.phi_true}, **self.params['height_lower'])
grid_true = {'upper': [self.x_true, self.y_true, z_true['upper'], self.R_true, self.phi_true],
'lower': [self.x_true, self.y_true, z_true['lower'], self.R_true, self.phi_true]}
#***********************************
#PROJECT PROPERTIES ON THE SKY PLANE
R, phi, z = {}, {}, {}
R_nonan, phi_nonan, z_nonan = {}, {}, {}
for side in ['upper', 'lower']:
xt, yt, zt = grid_true[side][:3]
x_pro, y_pro, z_pro = self._project_on_skyplane(xt, yt, zt, cos_incl, sin_incl)
if PA: x_pro, y_pro = self._rotate_sky_plane(x_pro, y_pro, PA)
x_pro = x_pro+xc
y_pro = y_pro+yc
R[side] = griddata((x_pro, y_pro), self.R_true, (self.mesh[0], self.mesh[1]), method='linear')
x_grid = griddata((x_pro, y_pro), xt, (self.mesh[0], self.mesh[1]), method='linear')
y_grid = griddata((x_pro, y_pro), yt, (self.mesh[0], self.mesh[1]), method='linear')
phi[side] = np.arctan2(y_grid, x_grid)
#Since this one is periodic it has to be recalculated, otherwise the interpolation will screw up things at the boundary -np.pi->np.pi
# When plotting contours there seems to be in any case some sort of interpolation, so there is still problems at the boundary
#phi[side] = griddata((x_pro, y_pro), self.phi_true, (self.mesh[0], self.mesh[1]), method='linear')
z[side] = griddata((x_pro, y_pro), z_true[side], (self.mesh[0], self.mesh[1]), method='linear')
#r[side] = hypot_func(R[side], z[side])
R_nonan[side] = copy.copy(R[side]) #Still nan out of the sky grid
phi_nonan[side] = copy.copy(phi[side])
z_nonan[side] = copy.copy(z[side])
if R_disc is not None:
for prop in [R, phi, z]: prop[side] = np.where(np.logical_and(R[side]<R_disc, R[side]>R_inner), prop[side], np.nan)
#R_nonan, phi_nonan, z_nonan = None, None, None
if R_nan_val is not None: R_nonan = {side: np.where(np.isnan(R[side]), R_nan_val, R[side]) for side in ['upper', 'lower']}
if phi_nan_val is not None: phi_nonan = {side: np.where(np.isnan(phi[side]), phi_nan_val, phi[side]) for side in ['upper', 'lower']}
if z_nan_val is not None: z_nonan = {side: np.where(np.isnan(z[side]), z_nan_val, z[side]) for side in ['upper', 'lower']}
return R, phi, z, R_nonan, phi_nonan, z_nonan
[docs] def make_model(self, z_mirror=False, R_inner=0, R_disc=None, verbose=True):
if self.prototype and verbose:
Tools._break_line()
print ('Running prototype model with the following parameters:\n')
pprint.pprint(self.params)
Tools._break_line(init='\n')
incl, PA, xc, yc = General2d.orientation(**self.params['orientation'])
int_kwargs = self.params['intensity']
vel_kwargs = self.params['velocity']
lw_kwargs = self.params['linewidth']
ls_kwargs = self.params['lineslope']
cos_incl, sin_incl = np.cos(incl), np.sin(incl)
#*******************************************
#MAKE TRUE GRID FOR UPPER AND LOWER SURFACES
z_true = self.z_upper_func({'R': self.R_true, 'phi': self.phi_true}, **self.params['height_upper'])
if z_mirror: z_true_far = -z_true
else: z_true_far = self.z_lower_func({'R': self.R_true, 'phi': self.phi_true}, **self.params['height_lower'])
if (self.velocity_func is Velocity.keplerian_vertical_selfgravity or
self.velocity_func is Velocity.keplerian_vertical_selfgravity_pressure):
z_1d = self.z_upper_func({'R': self.R_1d*sfu.au}, **self.params['height_upper'])/sfu.au
if z_mirror: z_far_1d = -z_1d
else: z_far_1d = self.z_lower_func({'R': self.R_1d*sfu.au}, **self.params['height_lower'])/sfu.au
else: z_1d = z_far_1d = None
grid_true = {'upper': [self.x_true, self.y_true, z_true, self.R_true, self.phi_true, self.R_1d, z_1d],
'lower': [self.x_true, self.y_true, z_true_far, self.R_true, self.phi_true, self.R_1d, z_far_1d]}
#*******************************
#COMPUTE PROPERTIES ON SKY GRID #This will no longer be necessary as all four functions will always be called
avai_kwargs = [vel_kwargs, int_kwargs, lw_kwargs, ls_kwargs]
avai_funcs = [self.velocity_func, self.intensity_func, self.linewidth_func, self.lineslope_func]
true_kwargs = [isinstance(kwarg, dict) for kwarg in avai_kwargs]
prop_kwargs = [kwarg for i, kwarg in enumerate(avai_kwargs) if true_kwargs[i]]
prop_funcs = [func for i, func in enumerate(avai_funcs) if true_kwargs[i]]
if self.subpixels:
subpix_vel = []
for i in range(self.subpixels):
for j in range(self.subpixels):
z_true = self.z_upper_func({'R': self.sub_R_true[i][j]}, **self.params['height_upper'])
if z_mirror: z_true_far = -z_true
else: z_true_far = self.z_lower_func({'R': self.sub_R_true[i][j]}, **self.params['height_lower'])
subpix_grid_true = {'upper': [self.sub_x_true[j], self.sub_y_true[i], z_true, self.sub_R_true[i][j], self.sub_phi_true[i][j]],
'lower': [self.sub_x_true[j], self.sub_y_true[i], z_true_far, self.sub_R_true[i][j], self.sub_phi_true[i][j]]}
subpix_vel.append(self._compute_prop(subpix_grid_true, [self.velocity_func], [vel_kwargs])[0])
ang_fac = sin_incl * np.cos(self.phi_true)
for i in range(self.subpixels_sq):
for side in ['upper', 'lower']:
subpix_vel[i][side] *= ang_fac
subpix_vel[i][side] += vel_kwargs['vsys']
props = self._compute_prop(grid_true, prop_funcs[1:], prop_kwargs[1:])
props.insert(0, subpix_vel)
else:
props = self._compute_prop(grid_true, prop_funcs, prop_kwargs)
if true_kwargs[0]: #Convention: positive vel (+) means gas receding from observer
phi_fac = sin_incl * np.cos(self.phi_true) #phi component
for side in ['upper', 'lower']:
if len(props[0][side])==3: #3D vel
v3d = props[0][side]
r_fac = sin_incl * np.sin(self.phi_true)
z_fac = cos_incl
props[0][side] = v3d[0]*phi_fac+v3d[1]*r_fac+v3d[2]*z_fac
else: #1D vel, assuming vphi only
props[0][side] *= phi_fac
props[0][side] += vel_kwargs['vsys']
#***********************************
#PROJECT PROPERTIES ON THE SKY PLANE
x_pro_dict = {}
y_pro_dict = {}
z_pro_dict = {}
for side in ['upper', 'lower']:
xt, yt, zt = grid_true[side][:3]
x_pro, y_pro, z_pro = self._project_on_skyplane(xt, yt, zt, cos_incl, sin_incl)
if PA: x_pro, y_pro = self._rotate_sky_plane(x_pro, y_pro, PA)
x_pro = x_pro+xc
y_pro = y_pro+yc
if R_disc is not None: R_grid = griddata((x_pro, y_pro), self.R_true, (self.mesh[0], self.mesh[1]), method='linear')
x_pro_dict[side] = x_pro
y_pro_dict[side] = y_pro
z_pro_dict[side] = z_pro
if self.subpixels:
for i in range(self.subpixels_sq): #Subpixels are projected on the same plane where true grid is projected
props[0][i][side] = griddata((x_pro, y_pro), props[0][i][side], (self.mesh[0], self.mesh[1]), method='linear') #subpixels velocity
for prop in props[1:]:
prop[side] = griddata((x_pro, y_pro), prop[side], (self.mesh[0], self.mesh[1]), method='linear')
if R_disc is not None: prop[side] = np.where(np.logical_and(R_grid<R_disc, R_grid>R_inner), prop[side], np.nan) #Todo: allow for R_in as well
else:
for prop in props:
if not isinstance(prop[side], numbers.Number): prop[side] = griddata((x_pro, y_pro), prop[side], (self.mesh[0], self.mesh[1]), method='linear')
if R_disc is not None: prop[side] = np.where(np.logical_and(R_grid<R_disc, R_grid>R_inner), prop[side], np.nan)
"""
grid_axes_3d = np.meshgrid(self.grid.XYZgrid[0], self.grid.XYZgrid[1], self.grid.XYZgrid[0])
from functools import reduce
x_full = np.array(reduce(np.append, list(x_pro_dict.values())))
y_full = np.array(reduce(np.append, list(y_pro_dict.values())))
z_full = np.array(reduce(np.append, list(z_pro_dict.values())))
z_grid = griddata((x_full, y_full, z_full), z_full, (grid_axes_3d[0], grid_axes_3d[1], grid_axes_3d[2]), method='linear')
"""
#*************************************
return props
[docs]class Rosenfeld2d(Velocity, Intensity, Linewidth, Tools):
"""
Host class for the Rosenfeld+2013 model which describes the velocity field of a flared disc in 2D.
This model assumes a (Keplerian) double cone to account for the near and far sides of the disc
and solves analytical equations to find the line-of-sight velocity v_obs projected on the sky-plane from both sides.
Parameters
----------
grid : array_like, shape (nrows, ncols)
(x', y') map of the sky-plane onto which the disc velocity field will be projected.
Attributes
----------
velocity_func : function(coord, **kwargs)
Velocity function describing the kinematics of the disc. The argument coord is a dictionary
of coordinates (e.g. 'x', 'y', 'z', 'r', 'R', 'theta', 'phi') where the function will be evaluated.
Additional arguments are optional and depend upon the function definition, e.g. Mstar=1.0*sfu.Msun
"""
def __init__(self, grid):
self.flags = {'disc': True, 'env': False}
self.grid = grid
self._velocity_func = Rosenfeld2d.keplerian
self._intensity_func = Rosenfeld2d.intensity_powerlaw
self._linewidth_func = Rosenfeld2d.linewidth_powerlaw
self._line_profile = General2d.line_profile_v_sigma
self._use_temperature = False
self._use_full_channel = False
def _get_t(self, A, B, C):
t = []
for i in range(self.grid.NPoints):
p = [A, B[i], C[i]]
t.append(np.sort(np.roots(p)))
return np.array(t)
[docs] def make_model(self, incl, psi, PA=0.0, int_kwargs={}, vel_kwargs={}, lw_kwargs=None, ls_kwargs=None):
"""
Executes the Rosenfeld+2013 model.
The sum of incl+psi must be < 90, otherwise the quadratic equation will have imaginary roots as some portions of the cone (which has finite extent)
do not intersect with the sky plane.
Parameters
----------
incl : scalar
Inclination of the disc midplane with respect to the x'y' plane; pi/2 radians is edge-on.
psi : scalar
Opening angle of the cone describing the velocity field of the gas emitting layer in the disc;
0 radians returns the projected velocity field of the disc midplane (i.e no conic emission).
PA : scalar, optional
Position angle in radians. Measured from North (+y) to East (-x).
Attributes
----------
velocity : array_like, size (n,)
Velocity field computed using the Rosenfeld+2013 model.
velocity2d : array_like, size (nx, ny)
If set get_2d=True: Velocity field computed using the Rosenfeld+2013 model, reshaped to 2D to facilitate plotting.
"""
if PA: x_plane, y_plane = Rosenfeld2d._rotate_sky_plane(self.grid.XYZ[0], self.grid.XYZ[1], -PA)
else: x_plane, y_plane = self.grid.XYZ[:2]
cos_incl = np.cos(incl)
sin_incl = np.sin(incl)
y_plane_cos_incl = y_plane/cos_incl
#**********************
#ROSENFELD COEFFICIENTS
fac = -2*np.sin(psi)**2
A = np.cos(2*incl) + np.cos(2*psi)
B = fac * 2*(sin_incl/cos_incl) * y_plane
C = fac * (x_plane**2 + (y_plane_cos_incl)**2)
t = self._get_t(A,B,C).T
#****************************
#ROSENFELD CONVERSION X<-->X'
x_true_near = x_plane
y_true_near = y_plane_cos_incl + t[1]*sin_incl
x_true_far = x_plane
y_true_far = y_plane_cos_incl + t[0]*sin_incl
#np.hypot 2x faster than np.linalg.norm([x,y], axis=0)
R_true_near = hypot_func(x_true_near, y_true_near)
R_true_far = hypot_func(x_true_far, y_true_far)
z_true_near = t[1] * cos_incl
z_true_far = t[0] * cos_incl
phi_true_near = np.arctan2(y_true_near, x_true_near)
phi_true_far = np.arctan2(y_true_far, x_true_far)
#****************************
grid_true = {'upper': [x_true_near, y_true_near, z_true_near, R_true_near, phi_true_near],
'lower': [x_true_far, y_true_far, z_true_far, R_true_far, phi_true_far]}
#*******************************
#COMPUTE PROPERTIES ON TRUE GRID
avai_kwargs = [vel_kwargs, int_kwargs, lw_kwargs, ls_kwargs]
avai_funcs = [self.velocity_func, self.intensity_func, self.linewidth_func, self.lineslope_func]
true_kwargs = [isinstance(kwarg, dict) for kwarg in avai_kwargs]
prop_kwargs = [kwarg for i, kwarg in enumerate(avai_kwargs) if true_kwargs[i]]
prop_funcs = [func for i, func in enumerate(avai_funcs) if true_kwargs[i]]
props = self._compute_prop(grid_true, prop_funcs, prop_kwargs)
#Positive vel is positive along z, i.e. pointing to the observer, for that reason imposed a (-) factor to convert to the standard convention: (+) receding
if true_kwargs[0]:
ang_fac_near = -sin_incl * np.cos(phi_true_near)
ang_fac_far = -sin_incl * np.cos(phi_true_far)
props[0]['upper'] *= ang_fac_near
props[0]['lower'] *= ang_fac_far
#*************************************
return [{side: prop[side].reshape(self.grid.Nodes[:2]) for side in ['upper', 'lower']} for prop in props]