###############################################################
# FARGOpy interdependencies
###############################################################
import fargopy
###############################################################
# Required packages
###############################################################
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import plotly.graph_objects as go
from tqdm import tqdm
import fargopy as fp
[docs]
class Surface:
"""Analytic surface tessellation and helpers for flux and mass analysis.
The ``Surface`` class provides tools to define analytic control surfaces
(sphere, cylinder, plane) and tessellate them into small patches.
These surfaces are used to calculate physical quantities like mass flux
(accretion rates) and enclosed mass by integrating simulation fields.
Attributes
----------
type : str
Surface type ('sphere', 'cylinder', 'plane').
radius : float
Radius or characteristic dimension (e.g., hill radius factor).
centers : np.ndarray
(N, 3) array of patch centroids.
normals : np.ndarray
(N, 3) array of outward-facing unit normals for each patch.
areas : np.ndarray
(N,) array of patch areas.
Examples
--------
Define a spherical surface around a planet:
>>> surface = fp.Flux.Surface(type='sphere', radius=0.5, subdivisions=2)
Calculate mass flux through the surface:
>>> flux = surface.mass_flux(sim, field_density='gasdens', field_velocity='gasv')
"""
def __init__(self, type="sphere", radius=1.0, height=None, subdivisions=1,
center=(0.0, 0.0, 0.0), z_cut=None, x_axis=1, y_axis=0, z_axis=0,
width=None, length=None, coords='cartesian'):
"""Initialize a Surface instance.
Parameters
----------
type : str, optional
Surface type. One of ``'sphere'``, ``'cylinder'`` or ``'plane'``.
radius : float, optional
Radius for spheres or radial extent for planes/cylinders.
height : float, optional
Cylinder height; required when ``type=='cylinder'``.
subdivisions : int, optional
For ``'sphere'``: number of recursive icosahedron subdivisions.
For ``'cylinder'`` and ``'plane'``: number of divisions along
each logical axis used to build patches.
center : tuple of float, optional
Cartesian coordinates ``(x, y, z)`` of the surface center.
z_cut : float, optional
Optional z-plane clipping value. When provided, portions with
z < ``z_cut`` are removed for spherical tessellations.
x_axis, y_axis, z_axis : int, optional
Axis flags (0 or 1) that select the plane normal for
``type=='plane'``. Exactly one flag must be 1.
width, length : float, optional
Explicit span of the plane along the two in-plane axes. If
omitted, each defaults to ``2 * radius``.
coords : str, optional
Default coordinate system for surface geometry ('cartesian', 'cylindrical',
or 'spherical'). This affects the output of properties like
``centers_in_coords`` and ``normals_in_coords``. The tessellation is
always computed in Cartesian coordinates internally, but results can be
automatically converted to the specified system. Default is 'cartesian'.
"""
self.type = type
self.radius = radius
self.height = height
self.subdivisions = subdivisions
self.center = np.array(center)
self.z_cut = z_cut
self.x_axis = x_axis
self.y_axis = y_axis
self.z_axis = z_axis
# Plane dimensions: if not provided, fall back to diameter defined by radius
self.width = width if width is not None else 2.0 * self.radius
self.length = length if length is not None else 2.0 * self.radius
if self.width <= 0 or self.length <= 0:
raise ValueError("width and length must be positive numbers")
self.volume = None
# Coordinate system for output
self.coords = coords.lower()
if self.coords not in ['cartesian', 'cylindrical', 'spherical']:
raise ValueError(f"Invalid coords '{coords}'. Must be 'cartesian', 'cylindrical', or 'spherical'.")
# Attributes for tessellation (internal storage always in cartesian)
self._centers = None
self._normals = None
self.areas = None
self.triangles = None
self.num_triangles = 0
self.triangle_index = 0
if self.type == "sphere":
self.num_triangles = 20 * (4 ** self.subdivisions)
self.triangles = np.zeros((self.num_triangles, 3, 3))
self._centers = np.zeros((self.num_triangles, 3))
self.areas = np.zeros(self.num_triangles)
self._tessellate_sphere()
elif self.type == "cylinder":
self._tessellate_cylinder()
elif self.type == "plane":
self._tessellate_plane()
else:
raise ValueError("Unsupported surface type. Use 'sphere', 'cylinder', or 'plane'.")
def _tessellate_sphere(self):
"""Construct a spherical triangle tessellation.
The method builds an icosahedron and recursively subdivides its
faces to produce approximately uniform triangular patches on the
sphere of radius ``self.radius``. If ``self.z_cut`` is defined,
triangles crossing the plane ``z = z_cut`` are clipped so that
only the portion with ``z >= z_cut`` remains.
"""
phi = (1.0 + np.sqrt(5.0)) / 2.0
patterns = [
(-1, phi, 0), (1, phi, 0), (-1, -phi, 0), (1, -phi, 0),
(0, -1, phi), (0, 1, phi), (0, -1, -phi), (0, 1, -phi),
(phi, 0, -1), (phi, 0, 1), (-phi, 0, -1), (-phi, 0, 1),
]
vertices = np.array([Surface._normalize(np.array(p)) * self.radius for p in patterns])
faces = [
(0, 11, 5), (0, 5, 1), (0, 1, 7), (0, 7, 10), (0, 10, 11),
(1, 5, 9), (5, 11, 4), (11, 10, 2), (10, 7, 6), (7, 1, 8),
(3, 9, 4), (3, 4, 2), (3, 2, 6), (3, 6, 8), (3, 8, 9),
(4, 9, 5), (2, 4, 11), (6, 2, 10), (8, 6, 7), (9, 8, 1),
]
# Reset triangle index and arrays
self.num_triangles = 20 * (4 ** self.subdivisions)
self.triangle_index = 0
self.triangles = np.zeros((self.num_triangles, 3, 3))
self._centers = np.zeros((self.num_triangles, 3))
self.areas = np.zeros(self.num_triangles)
for face in faces:
v1, v2, v3 = vertices[face[0]], vertices[face[1]], vertices[face[2]]
self._subdivide_triangle(v1, v2, v3, self.subdivisions)
self._calculate_polygon_centers()
# If z_cut is provided, clip each triangle against the plane instead of
# simply filtering by centroid position.
if self.z_cut is not None:
def clip_triangle_with_plane(tri, z_plane):
# tri: (3,3) array
# returns list of triangles (each (3,3)) above or on the plane
verts = tri
z = verts[:, 2]
above = z >= z_plane
if np.all(above):
return [verts]
elif np.all(~above):
return []
# Identify configuration
idx_above = np.where(above)[0]
idx_below = np.where(~above)[0]
v = verts
tris = []
if len(idx_above) == 2 and len(idx_below) == 1:
# Two above, one below: split into two triangles
a, b = idx_above
c = idx_below[0]
va, vb, vc = v[a], v[b], v[c]
# Intersect edges ac and bc with plane
def interp(v1, v2):
dz = v2[2] - v1[2]
if dz == 0: return v1
t = (z_plane - v1[2]) / dz
return v1 + t * (v2 - v1)
vab = va
vbb = vb
vac = interp(va, vc)
vbc = interp(vb, vc)
# Triangle 1: va, vb, vbc
tris.append(np.array([va, vb, vbc]))
# Triangle 2: va, vbc, vac
tris.append(np.array([va, vbc, vac]))
elif len(idx_above) == 1 and len(idx_below) == 2:
# One above, two below: split into one triangle
a = idx_above[0]
b, c = idx_below
va, vb, vc = v[a], v[b], v[c]
# Intersect edges ab and ac with plane
def interp(v1, v2):
dz = v2[2] - v1[2]
if dz == 0: return v1
t = (z_plane - v1[2]) / dz
return v1 + t * (v2 - v1)
vab = interp(va, vb)
vac = interp(va, vc)
# Triangle: va, vab, vac
tris.append(np.array([va, vab, vac]))
return tris
new_triangles = []
for tri in self.triangles:
clipped = clip_triangle_with_plane(tri, self.z_cut)
new_triangles.extend(clipped)
self.triangles = np.array(new_triangles)
self.num_triangles = len(self.triangles)
self._centers = np.mean(self.triangles, axis=1)
self.areas = np.array([self._calculate_triangle_area(*tri) for tri in self.triangles])
self._calculate_normals()
else:
self._calculate_all_triangle_areas()
self._calculate_normals()
self.volume = self.areas * (self.radius / 3)
def _tessellate_cylinder(self):
"""Discretize a right circular cylinder into top, bottom and lateral patches.
The cylinder is split into a regular grid on the top and bottom
circular faces and into strips along the azimuth and height for
the lateral surface. The method sets per-patch centers, normals
and areas on attributes named ``top_centers``, ``bottom_centers``,
``lateral_centers`` and their corresponding ``_normals`` and
``_areas`` attributes.
"""
theta = np.linspace(0, 2 * np.pi, self.subdivisions, endpoint=False)
r = np.linspace(0, self.radius, self.subdivisions)
R, Theta = np.meshgrid(r, theta, indexing='ij')
X = R * np.cos(Theta) + self.center[0]
Y = R * np.sin(Theta) + self.center[1]
Z_top = np.full_like(X, self.center[2] + self.height / 2)
Z_bottom = np.full_like(X, self.center[2] - self.height / 2)
self.top_centers = np.stack([X.ravel(), Y.ravel(), Z_top.ravel()], axis=1)
self.bottom_centers = np.stack([X.ravel(), Y.ravel(), Z_bottom.ravel()], axis=1)
self.top_normals = np.tile([0, 0, 1], (self.top_centers.shape[0], 1))
self.bottom_normals = np.tile([0, 0, -1], (self.bottom_centers.shape[0], 1))
self.top_areas = (self.radius / self.subdivisions) ** 2 * np.pi
self.bottom_areas = self.top_areas
theta = np.linspace(0, 2 * np.pi, self.subdivisions, endpoint=False)
z = np.linspace(-self.height / 2, self.height / 2, self.subdivisions) + self.center[2] # Adjust z by center
Theta, Z = np.meshgrid(theta, z, indexing='ij')
X = self.radius * np.cos(Theta) + self.center[0]
Y = self.radius * np.sin(Theta) + self.center[1]
self.lateral_centers = np.stack([X.ravel(), Y.ravel(), Z.ravel()], axis=1)
self.lateral_normals = np.stack([np.cos(Theta).ravel(), np.sin(Theta).ravel(), np.zeros_like(Z).ravel()], axis=1)
self.lateral_areas = (2 * np.pi * self.radius / self.subdivisions) * (self.height / self.subdivisions)
def _tessellate_plane(self):
"""Discretize an axis-aligned plane into rectangular patches.
The plane is centered at ``self.center`` and aligned with the axis
selected by the triple ``(x_axis, y_axis, z_axis)``. The in-plane
spans are ``self.width`` and ``self.length`` which are split into
``self.subdivisions`` cells along each axis. The method sets
``self.centers``, ``self.normals``, ``self.areas`` and related
attributes for downstream use.
"""
axis_flags = np.array([self.x_axis, self.y_axis, self.z_axis], dtype=int)
if np.any((axis_flags != 0) & (axis_flags != 1)) or axis_flags.sum() != 1:
raise ValueError("Plane normal must align with exactly one axis using 0/1 flags.")
if self.subdivisions <= 0:
raise ValueError("subdivisions must be >= 1 for plane tessellation.")
normal_axis = int(np.argmax(axis_flags))
plane_axes = [idx for idx in range(3) if idx != normal_axis]
# lin = np.linspace(-self.radius, self.radius, self.subdivisions + 1)
# centers_axis = 0.5 * (lin[:-1] + lin[1:])
# grid_a, grid_b = np.meshgrid(centers_axis, centers_axis, indexing='ij')
# num_cells = self.subdivisions ** 2
# centers = np.zeros((num_cells, 3))
# centers[:, plane_axes[0]] = grid_a.ravel() + self.center[plane_axes[0]]
# centers[:, plane_axes[1]] = grid_b.ravel() + self.center[plane_axes[1]]
# centers[:, normal_axis] = self.center[normal_axis]
# self.centers = centers
# normal_vector = np.zeros(3)
# normal_vector[normal_axis] = 1.0
# self.normals = np.tile(normal_vector, (num_cells, 1))
# cell_edge = (2 * self.radius) / self.subdivisions
# self.areas = np.full(num_cells, cell_edge ** 2)
# self.num_triangles = num_cells
# self.triangles = None
# Use width and length (span) to build grid along the two in-plane axes
lin_a = np.linspace(-self.width/2.0, self.width/2.0, self.subdivisions + 1)
lin_b = np.linspace(-self.length/2.0, self.length/2.0, self.subdivisions + 1)
centers_a = 0.5 * (lin_a[:-1] + lin_a[1:])
centers_b = 0.5 * (lin_b[:-1] + lin_b[1:])
grid_a, grid_b = np.meshgrid(centers_a, centers_b, indexing='ij')
num_cells = self.subdivisions ** 2
centers = np.zeros((num_cells, 3))
centers[:, plane_axes[0]] = grid_a.ravel() + self.center[plane_axes[0]]
centers[:, plane_axes[1]] = grid_b.ravel() + self.center[plane_axes[1]]
centers[:, normal_axis] = self.center[normal_axis]
self._centers = centers
normal_vector = np.zeros(3)
normal_vector[normal_axis] = 1.0
self._normals = np.tile(normal_vector, (num_cells, 1))
cell_edge_a = (self.width) / self.subdivisions
cell_edge_b = (self.length) / self.subdivisions
self.areas = np.full(num_cells, cell_edge_a * cell_edge_b)
self.num_triangles = num_cells
self.triangles = None
@staticmethod
def _normalize(v):
"""Return a unit-length copy of the input vector.
Parameters
----------
v : array_like
Input vector.
Returns
-------
ndarray
Unit-normalized copy of ``v``.
"""
return v / np.linalg.norm(v)
def _subdivide_triangle(self, v1, v2, v3, depth):
"""Recursively subdivide a triangle and store leaf triangles.
Parameters
----------
v1, v2, v3 : array_like
Triangle vertices in Cartesian coordinates (on the unit sphere
before scaling by ``self.radius``).
depth : int
Number of remaining subdivision levels. When ``depth==0`` the
triangle is written into ``self.triangles``.
"""
if depth == 0:
self.triangles[self.triangle_index] = [v1 + self.center, v2 + self.center, v3 + self.center]
self.triangle_index += 1
return
v12 = Surface._normalize((v1 + v2) / 2) * self.radius
v23 = Surface._normalize((v2 + v3) / 2) * self.radius
v31 = Surface._normalize((v3 + v1) / 2) * self.radius
self._subdivide_triangle(v1, v12, v31, depth - 1)
self._subdivide_triangle(v12, v2, v23, depth - 1)
self._subdivide_triangle(v31, v23, v3, depth - 1)
self._subdivide_triangle(v12, v23, v31, depth - 1)
def _calculate_polygon_centers(self):
"""Compute and cache centroids for all stored triangles.
The computed centroids are assigned to ``self._centers``.
"""
self._centers = np.mean(self.triangles, axis=1)
@staticmethod
def _calculate_triangle_area(v1, v2, v3):
"""Compute the area of a triangle using the cross product.
Parameters
----------
v1, v2, v3 : array_like
Triangle vertex coordinates.
Returns
-------
float
Triangle area.
"""
side1 = v2 - v1
side2 = v3 - v1
cross_product = np.cross(side1, side2)
area = np.linalg.norm(cross_product) / 2
return area
def _calculate_all_triangle_areas(self):
"""Evaluate and cache areas for every triangle in ``self.triangles``.
This fills ``self.areas`` using :meth:`_calculate_triangle_area`.
"""
for i, (v1, v2, v3) in enumerate(self.triangles):
self.areas[i] = self._calculate_triangle_area(v1, v2, v3)
def _calculate_normals(self):
"""Compute outward-facing unit normals for each stored triangle.
The method enforces that each normal points away from ``self.center``;
if the computed normal points inward it is flipped.
Results are stored in ``self._normals``.
"""
self._normals = np.zeros((self.num_triangles, 3))
for i, tri in enumerate(self.triangles):
AB = tri[1] - tri[0]
AC = tri[2] - tri[0]
normal = np.cross(AB, AC)
normal /= np.linalg.norm(normal)
centroid = np.mean(tri, axis=0)
to_centroid = centroid - self.center
if np.dot(normal, to_centroid) < 0:
normal = -normal
self._normals[i] = normal
[docs]
def tessellate(self):
"""Recompute the tessellation from current instance parameters.
This method is a convenience wrapper that dispatches to the
appropriate internal tessellation implementation depending on
``self.type``.
"""
if self.type == "sphere":
self._tessellate_sphere()
elif self.type == "cylinder":
self._tessellate_cylinder()
elif self.type == "plane":
self._tessellate_plane()
else:
raise ValueError("Unsupported surface type. Use 'sphere', 'cylinder', or 'plane'.")
[docs]
def get_centers_in_coords(self, coords_system='cartesian'):
"""
Get surface centers in the specified coordinate system.
This method converts the tessellation centers from Cartesian coordinates
to the requested coordinate system. This is essential for fast interpolation
when the simulation data is stored in spherical or cylindrical coordinates,
as it allows using RegularGridInterpolator instead of slow unstructured methods.
Parameters
----------
coords_system : str, optional
Target coordinate system: 'cartesian', 'spherical', or 'cylindrical'.
Default is 'cartesian'.
Returns
-------
tuple of np.ndarray
For cartesian: (x, y, z)
For spherical: (phi, r, theta)
For cylindrical: (phi, r, z)
Each array has shape (n_points,) corresponding to surface centers.
Notes
-----
The coordinate transformations are:
- Spherical: phi = arctan2(y, x), r = sqrt(x² + y² + z²), theta = arccos(z/r)
- Cylindrical: phi = arctan2(y, x), r = sqrt(x² + y²), z = z
Examples
--------
>>> surface = Surface(type='sphere', radius=1.0, subdivisions=2)
>>> phi, r, theta = surface.get_centers_in_coords('spherical')
>>> # Use these for fast interpolation with spherical simulation data
"""
if self.type == "sphere":
centers = self._centers
elif self.type == "cylinder":
if not hasattr(self, 'top_centers'):
raise ValueError("Cylinder not tessellated. Call tessellate() first.")
centers = np.concatenate([self.top_centers, self.bottom_centers, self.lateral_centers], axis=0)
elif self.type == "plane":
if self._centers is None:
raise ValueError("Plane not tessellated. Call tessellate() first.")
centers = self._centers
else:
raise ValueError(f"Unsupported surface type: {self.type}")
x = centers[:, 0]
y = centers[:, 1]
z = centers[:, 2]
if coords_system == 'cartesian':
return x, y, z
elif coords_system == 'spherical':
r = np.sqrt(x**2 + y**2 + z**2)
# Avoid division by zero for points at origin
r_safe = np.where(r > 1e-15, r, 1e-15)
theta = np.arccos(np.clip(z / r_safe, -1, 1))
phi = np.arctan2(y, x)
return phi, r, theta
elif coords_system == 'cylindrical':
r_cyl = np.sqrt(x**2 + y**2)
phi = np.arctan2(y, x)
return phi, r_cyl, z
else:
raise ValueError(f"Unknown coordinate system: {coords_system}")
[docs]
def get_normals_in_coords(self, coords_system='cartesian'):
"""
Get surface normals in the specified coordinate system.
Transforms the unit normal vectors from Cartesian to the requested
coordinate system. This is useful for flux calculations when working
in native simulation coordinates.
Parameters
----------
coords_system : str, optional
Target coordinate system: 'cartesian', 'spherical', or 'cylindrical'.
Returns
-------
tuple of np.ndarray
For cartesian: (nx, ny, nz)
For spherical: (n_phi, n_r, n_theta)
For cylindrical: (n_phi, n_r, n_z)
Each array has shape (n_points,).
Notes
-----
The transformation uses Jacobian matrices for the coordinate change.
For spherical: [n_phi, n_r, n_theta]^T = J^T [nx, ny, nz]^T
For cylindrical: [n_phi, n_r, n_z]^T = J^T [nx, ny, nz]^T
"""
if self.type == "sphere":
normals = self._normals
centers = self._centers
elif self.type == "cylinder":
if not hasattr(self, 'top_normals'):
raise ValueError("Cylinder not tessellated. Call tessellate() first.")
normals = np.concatenate([self.top_normals, self.bottom_normals, self.lateral_normals], axis=0)
centers = np.concatenate([self.top_centers, self.bottom_centers, self.lateral_centers], axis=0)
elif self.type == "plane":
if self._normals is None:
raise ValueError("Plane not tessellated. Call tessellate() first.")
normals = self._normals
centers = self._centers
else:
raise ValueError(f"Unsupported surface type: {self.type}")
nx = normals[:, 0]
ny = normals[:, 1]
nz = normals[:, 2]
if coords_system == 'cartesian':
return nx, ny, nz
elif coords_system == 'spherical':
# Get position coordinates for Jacobian
x = centers[:, 0]
y = centers[:, 1]
z = centers[:, 2]
r = np.sqrt(x**2 + y**2 + z**2)
r_safe = np.where(r > 1e-15, r, 1e-15)
r_xy = np.sqrt(x**2 + y**2)
r_xy_safe = np.where(r_xy > 1e-15, r_xy, 1e-15)
# Transform: n_i = (∂x^j/∂q^i) n_j where q = (phi, r, theta)
# For spherical: x = r sin(theta) cos(phi), y = r sin(theta) sin(phi), z = r cos(theta)
sin_theta = r_xy / r_safe
cos_theta = z / r_safe
sin_phi = y / r_xy_safe
cos_phi = x / r_xy_safe
# Jacobian transpose (transforms contravariant vectors)
n_phi = -r_safe * sin_theta * sin_phi * nx + r_safe * sin_theta * cos_phi * ny
n_r = sin_theta * cos_phi * nx + sin_theta * sin_phi * ny + cos_theta * nz
n_theta = r_safe * cos_theta * cos_phi * nx + r_safe * cos_theta * sin_phi * ny - r_safe * sin_theta * nz
return n_phi, n_r, n_theta
elif coords_system == 'cylindrical':
# Get position coordinates
x = centers[:, 0]
y = centers[:, 1]
r_cyl = np.sqrt(x**2 + y**2)
r_cyl_safe = np.where(r_cyl > 1e-15, r_cyl, 1e-15)
sin_phi = y / r_cyl_safe
cos_phi = x / r_cyl_safe
# Transform for cylindrical
n_phi = -r_cyl_safe * sin_phi * nx + r_cyl_safe * cos_phi * ny
n_r = cos_phi * nx + sin_phi * ny
n_z = nz
return n_phi, n_r, n_z
else:
raise ValueError(f"Unknown coordinate system: {coords_system}")
@property
def centers(self):
"""
Get surface centers in the coordinate system specified by self.coords.
This property automatically returns coordinates in the system specified
during Surface initialization. For Cartesian, returns shape (N, 3) array.
For cylindrical/spherical, returns shape (N, 3) array with columns
representing (phi, r, z/theta).
Returns
-------
np.ndarray
Array of shape (N, 3) where N is the number of surface patches.
- If coords='cartesian': columns are (x, y, z)
- If coords='cylindrical': columns are (phi, r, z)
- If coords='spherical': columns are (phi, r, theta)
Examples
--------
>>> sphere = fp.flux.Surface(type='sphere', radius=1.0, subdivisions=2, coords='spherical')
>>> centers = sphere.centers # Returns (N, 3) array with (phi, r, theta) columns
>>> phi = centers[:, 0]
>>> r = centers[:, 1]
>>> theta = centers[:, 2]
Notes
-----
Internally, coordinates are always stored in Cartesian form. This property
performs the coordinate transformation on-the-fly based on self.coords.
"""
if self.coords == 'cartesian':
# Return internal storage directly for cartesian
if self.type == "sphere":
return self._centers
elif self.type == "cylinder":
return np.concatenate([self.top_centers, self.bottom_centers, self.lateral_centers], axis=0)
elif self.type == "plane":
return self._centers
else:
# Transform to requested coordinate system
return np.column_stack(self.get_centers_in_coords(self.coords))
@property
def normals(self):
"""
Get surface normals in the coordinate system specified by self.coords.
This property automatically returns normal vectors in the system specified
during Surface initialization. For Cartesian, returns shape (N, 3) array.
For cylindrical/spherical, returns shape (N, 3) array with columns
representing (n_phi, n_r, n_z/n_theta).
Returns
-------
np.ndarray
Array of shape (N, 3) where N is the number of surface patches.
- If coords='cartesian': columns are (nx, ny, nz)
- If coords='cylindrical': columns are (n_phi, n_r, n_z)
- If coords='spherical': columns are (n_phi, n_r, n_theta)
Examples
--------
>>> sphere = fp.flux.Surface(type='sphere', radius=1.0, coords='spherical')
>>> normals = sphere.normals # Returns (N, 3) array with (n_phi, n_r, n_theta) columns
>>> n_r = normals[:, 1] # Radial component (should be ~1 for sphere)
Notes
-----
Internally, normals are always stored in Cartesian form. This property
performs the coordinate transformation on-the-fly based on self.coords.
Normals are always unit vectors regardless of coordinate system.
"""
if self.coords == 'cartesian':
# Return internal storage directly for cartesian
if self.type == "sphere":
return self._normals
elif self.type == "cylinder":
return np.concatenate([self.top_normals, self.bottom_normals, self.lateral_normals], axis=0)
elif self.type == "plane":
return self._normals
else:
# Transform to requested coordinate system
return np.column_stack(self.get_normals_in_coords(self.coords))
@property
def centers_in_coords(self):
"""
Get surface centers in the default coordinate system specified by self.coords.
This property provides a convenient way to access surface centers without
manually calling get_centers_in_coords(). The coordinate system is determined
by the ``coords`` attribute set during initialization or modified later.
Returns
-------
tuple of np.ndarray
Coordinates in the system specified by self.coords:
- Cartesian: (x, y, z)
- Spherical: (phi, r, theta)
- Cylindrical: (phi, r, z)
Examples
--------
>>> surface = fp.Flux.Surface(type='sphere', radius=1.0, subdivisions=2, coords='spherical')
>>> phi, r, theta = surface.centers_in_coords
>>> # Use directly for interpolation
>>> rho = fields.evaluate(var1=phi, var2=r, var3=theta, field='gasdens')
Notes
-----
This is equivalent to calling get_centers_in_coords(self.coords) but more concise.
Change self.coords to switch the output coordinate system dynamically.
"""
return self.get_centers_in_coords(self.coords)
@property
def normals_in_coords(self):
"""
Get surface normals in the default coordinate system specified by self.coords.
This property provides a convenient way to access surface normals without
manually calling get_normals_in_coords(). The coordinate system is determined
by the ``coords`` attribute.
Returns
-------
tuple of np.ndarray
Normal vector components in the system specified by self.coords:
- Cartesian: (nx, ny, nz)
- Spherical: (n_phi, n_r, n_theta)
- Cylindrical: (n_phi, n_r, n_z)
Examples
--------
>>> surface = fp.Flux.Surface(type='sphere', radius=1.0, coords='cylindrical')
>>> n_phi, n_r, n_z = surface.normals_in_coords
Notes
-----
This is equivalent to calling get_normals_in_coords(self.coords) but more concise.
Normals are always unit vectors regardless of coordinate system.
"""
return self.get_normals_in_coords(self.coords)
[docs]
def generate_dataframe(self):
"""Return tessellation metadata as a pandas :class:`DataFrame`.
The returned DataFrame contains one row per surface patch with
columns ``'Center'``, ``'Normal'`` and ``'Area'``. Coordinates are
represented as Python lists to ensure JSON-serializable cell values.
"""
data = []
for i, (center, normal, area) in enumerate(zip(self.centers, self.normals, self.areas)):
data.append({
"Center": center.tolist(),
"Normal": normal.tolist(),
"Area": area
})
return pd.DataFrame(data)
[docs]
def total_mass_mtc(self, sim, field='gasdens', n_samples=10000, snapshot=[0,1],
interpolator='regular_grid', method='linear', cut_r=None,
follow_planet=True, planet_index=0, random_seed=None, coords=None):
"""Estimate enclosed mass using Monte Carlo sampling.
The method samples ``n_samples`` points uniformly within the
spherical region defined by this surface (accounting for ``z_cut``
when present), interpolates the requested density field at the
sample points and returns an estimate of the enclosed mass.
Parameters
----------
sim : object
Simulation object providing ``load_field`` and ``load_planets``
methods (FARGOpy simulation API).
field : str, optional
Density field name to sample (default ``'gasdens'``).
n_samples : int, optional
Number of Monte Carlo samples per snapshot.
snapshot : int or [start, end], optional
Snapshot index or inclusive snapshot range.
interpolator : str, optional
Interpolator backend passed to the field evaluator.
method : str, optional
Interpolation method passed to the field evaluator.
cut_r : float, optional
Radial cut passed to ``sim.load_field`` to limit data transfer.
follow_planet : bool, optional
If True follow the planet position when sampling (useful for
Hill-sphere based regions).
planet_index : int, optional
Index of the planet to follow when ``follow_planet`` is True.
random_seed : int, optional
Random seed for reproducibility of Monte Carlo sampling.
If None (default), results will vary between runs.
Set to a fixed integer for reproducible results.
coords : str, optional
Coordinate system to use for field loading and interpolation.
If None, uses ``self.coords``. Options are 'cartesian', 'cylindrical',
or 'spherical'. Using the simulation's native coordinate system
enables faster RegularGridInterpolator.
Returns
-------
float or numpy.ndarray
Estimated enclosed mass. Returns a single float if a single
snapshot is requested, otherwise an array of estimates.
"""
# Determine coordinate system to use
if coords is None:
coords = self.coords
coords = coords.lower()
if isinstance(snapshot, int):
times = [snapshot]
else:
times = np.linspace(snapshot[0], snapshot[1], snapshot[1]-snapshot[0]+1)
mass = np.zeros(len(times))
planet = sim.load_planets(snapshot=0)[planet_index]
factor = self.radius/planet.hill_radius
# Warn if n_samples is too low for accurate Monte Carlo estimates
if n_samples < 5000:
import warnings
warnings.warn(
f"Monte Carlo sampling with n_samples={n_samples} may produce noisy results. "
f"Consider increasing n_samples to 10000 or more for smoother estimates.",
UserWarning
)
# Set random seed for reproducibility if requested
if random_seed is not None:
np.random.seed(random_seed)
for i, t in enumerate(tqdm(times, desc="Calculating total mass (Monte Carlo)")):
# Update surface if following planet
if follow_planet:
planet = sim.load_planets(snapshot=int(t))[planet_index]
self.center = np.array([planet.pos.x, planet.pos.y, planet.pos.z])
if hasattr(planet, 'hill_radius'):
self.radius = factor * planet.hill_radius
self.tessellate()
# Generate random points inside the sphere (in spherical coordinates)
u = np.random.uniform(0, 1, int(n_samples))
v = np.random.uniform(0, 1, int(n_samples))
w = np.random.uniform(0, 1, int(n_samples))
r_sph = self.radius * np.cbrt(u)
theta_sph = np.arccos(1 - 2 * v)
phi_sph = 2 * np.pi * w
# Convert to Cartesian for geometry checks
x = r_sph * np.sin(theta_sph) * np.cos(phi_sph) + self.center[0]
y = r_sph * np.sin(theta_sph) * np.sin(phi_sph) + self.center[1]
z = r_sph * np.cos(theta_sph) + self.center[2]
# Apply z_cut if specified
if self.z_cut is not None:
mask = z > self.z_cut
x, y, z = x[mask], y[mask], z[mask]
r_sph, theta_sph, phi_sph = r_sph[mask], theta_sph[mask], phi_sph[mask]
n_effective = len(x)
h = self.radius + self.center[2] - self.z_cut
h = np.clip(h, 0, 2*self.radius)
volume = (1/3) * np.pi * h**2 * (3*self.radius - h)
else:
n_effective = int(n_samples)
volume = (4/3) * np.pi * self.radius**3
# Load field in the specified coordinate system for fast interpolation
if cut_r is not None:
field_interp = sim.load_field(
fields=[field],
snapshot=[int(t)],
cut=(self.center[0], self.center[1], self.center[2], cut_r),
coords=coords
)
else:
field_interp = sim.load_field(
fields=[field],
snapshot=[int(t)],
cut=(self.center[0], self.center[1], self.center[2], self.radius*2),
coords=coords
)
# Convert sample points to the specified coordinate system
if coords == 'spherical':
var1, var2, var3 = fp.transform_coords('cartesian', 'spherical', x, y, z)
elif coords == 'cylindrical':
var1, var2, var3 = fp.transform_coords('cartesian', 'cylindrical', x, y, z)
else:
# Cartesian
var1, var2, var3 = x, y, z
# Interpolate in native coordinates (uses fast RegularGridInterpolator)
rho = field_interp.evaluate(
time=t,
var1=var1,
var2=var2,
var3=var3,
interpolator=interpolator,
method=method
)
# Compute mass estimate
# Filter NaN values (points outside domain)
valid_rho = rho[~np.isnan(rho)]
if len(valid_rho) == 0:
# All points are outside domain - mass is zero
mass[i] = 0.0
else:
# Monte Carlo estimator: V * mean(rho)
# If some points are NaN, we need to account for the fraction that's valid
fraction_valid = len(valid_rho) / n_effective
avg_rho = np.mean(valid_rho)
mass[i] = avg_rho * volume * fraction_valid
if len(mass) == 1:
return mass[0]
return mass
[docs]
def mass_flux(self, sim, field_density='gasdens', field_velocity='gasv',
snapshot=[0, 1], interpolator='regular_grid', method='linear',
follow_planet=True, planet_index=0,
correct_normals=True, relative_velocity=False, coords=None):
"""Compute mass flux through the surface patches.
The instantaneous mass flux for each patch is computed as::
dΦ = ρ * (v_rel · n_out) * dA
and the returned value is the sum over all patches. Velocities can
optionally be converted to the planet rest frame by enabling
``relative_velocity``.
Parameters
----------
sim : object
Simulation object exposing ``load_field`` and ``load_planets``.
field_density : str, optional
Density field name (default ``'gasdens'``).
field_velocity : str, optional
Velocity field name (default ``'gasv'``).
snapshot : [start, end], optional
Inclusive snapshot range to evaluate.
interpolator, method : str, optional
Passed to the field evaluator for interpolation.
follow_planet : bool, optional
If True follow the planet position and scale the surface
(useful for Hill-sphere analysis).
planet_index : int, optional
Index of the planet to follow.
correct_normals : bool, optional
If True ensure per-patch normals point outward from the
surface center before computing the flux.
relative_velocity : bool, optional
If True subtract the planet velocity from the interpolated
fluid velocity prior to flux computation.
coords : str, optional
Coordinate system to use for field loading and interpolation.
If None, uses ``self.coords`` (default coordinate system set during
initialization). Options are 'cartesian', 'cylindrical', or 'spherical'.
Using the simulation's native coordinate system enables faster
RegularGridInterpolator instead of unstructured griddata.
Returns
-------
numpy.ndarray
Array of flux values, one per requested snapshot.
Examples
--------
Compute accretion rate (mass flux) onto a planet:
>>> surface = fp.Flux.Surface(type='sphere', radius=0.5, subdivisions=3, coords='spherical')
>>> mdot = surface.mass_flux(sim, field_density='gasdens', field_velocity='gasv', follow_planet=True)
>>> plt.plot(mdot)
"""
# Check if simulation is 3D (required for flux calculations)
if hasattr(sim, 'vars') and sim.vars.DIM == 2:
raise ValueError(
"mass_flux() requires a 3D simulation. "
"Your simulation is 2D. Flux calculations through 3D surfaces cannot be performed in 2D domains."
)
# Determine coordinate system to use
if coords is None:
coords = self.coords
coords = coords.lower()
# Validate coordinate system
if coords not in ['cartesian', 'cylindrical', 'spherical']:
raise ValueError(f"Invalid coords '{coords}'. Must be 'cartesian', 'cylindrical', or 'spherical'.")
steps = snapshot[1] - snapshot[0] + 1
times = np.linspace(snapshot[0], snapshot[1], steps)
flux = np.zeros(len(times))
# Initial planet for scaling
planet0 = sim.load_planets()[planet_index]
factor = self.radius / planet0.hill_radius
for i, t in enumerate(tqdm(times, desc="Calculating mass flux")):
# -------------------------------------------------------------
#Update the surface position and scale if following the planet
# -------------------------------------------------------------
if follow_planet:
planet = sim.load_planets(snapshot=int(t))[planet_index]
self.center = np.array([planet.pos.x, planet.pos.y, planet.pos.z])
if hasattr(planet, 'hill_radius'):
self.radius = factor * planet.hill_radius
self.tessellate()
elif relative_velocity:
# Need planet velocity but not position
planet = sim.load_planets(snapshot=int(t))[planet_index]
# Planet velocity (for relative velocities)
if relative_velocity:
vpx, vpy, vpz = planet.vel.x, planet.vel.y, planet.vel.z
else:
vpx, vpy, vpz = 0.0, 0.0, 0.0
# -------------------------------------------------------------
# Select geometric properties of the surface
# Always use internal Cartesian coordinates for geometry
# -------------------------------------------------------------
if self.type == "sphere":
centers = self._centers # Always use Cartesian for geometric calculations
normals = self._normals # Always use Cartesian for dot products
areas = self.areas
surface_cut = (self.center[0], self.center[1], self.center[2], 2*self.radius)
elif self.type == "cylinder":
centers = np.concatenate([self.top_centers,
self.bottom_centers,
self.lateral_centers], axis=0)
normals = np.concatenate([self.top_normals,
self.bottom_normals,
self.lateral_normals], axis=0)
areas = np.concatenate([
np.full(self.top_centers.shape[0], self.top_areas),
np.full(self.bottom_centers.shape[0], self.bottom_areas),
np.full(self.lateral_centers.shape[0], self.lateral_areas)
])
surface_cut = (self.center[0], self.center[1], self.center[2],
2*self.radius, 2*self.height)
elif self.type == "plane":
centers = self._centers # Always use Cartesian for geometric calculations
normals = self._normals # Always use Cartesian for dot products
areas = self.areas
surface_cut = (self.center[0], self.center[1], self.center[2], 2*self.radius)
else:
raise ValueError("Unsupported surface type.")
# -------------------------------------------------------------
# Load both fields simultaneously into a single DataFrame
# Use specified coordinate system for optimal interpolation
# -------------------------------------------------------------
fields = sim.load_field(
fields=[field_density, field_velocity],
snapshot=[int(t)],
cut=surface_cut,
coords=coords # Load in requested coords for fast RegularGridInterpolator
)
# Convert surface centers to the specified coordinate system
var1_eval, var2_eval, var3_eval = self.get_centers_in_coords(coords)
# -------------------------------------------------------------
# Interpolate density
# -------------------------------------------------------------
rho = fields.evaluate(
time=t,
var1=var1_eval,
var2=var2_eval,
var3=var3_eval,
interpolator=interpolator,
method=method,
field="gasdens"
)
# -------------------------------------------------------------
# Interpolate velocity vector
# -------------------------------------------------------------
vel = fields.evaluate(
time=t,
var1=var1_eval,
var2=var2_eval,
var3=var3_eval,
interpolator=interpolator,
method=method,
field="gasv"
)
# Shape fix (3,N → N,3)
if vel.ndim == 2 and vel.shape[0] == 3:
vel = vel.T
# -------------------------------------------------------------
# Convert velocity to Cartesian for dot product with normals
# Normals are always in Cartesian system
# -------------------------------------------------------------
if coords != 'cartesian':
# Use transform_velocity to convert from coords to cartesian
# vel is in the format expected by the coordinate system:
# - spherical: [v_phi, v_r, v_theta]
# - cylindrical: [v_phi, v_r, v_z]
# Get position in the original coordinate system for transformation
pos_coords = self.get_centers_in_coords(coords)
# Transform velocity components to Cartesian
if vel.shape[0] == len(centers):
# vel has shape (N, 3)
vel_tuple = (vel[:, 0], vel[:, 1], vel[:, 2])
pos_tuple = pos_coords
vx, vy, vz = fp.transform_velocity(coords, 'cartesian', vel_tuple, pos_tuple)
vel = np.column_stack([vx, vy, vz])
else:
raise ValueError(f"Unexpected velocity shape: {vel.shape}")
# -------------------------------------------------------------
# Ensure normals point outward
# -------------------------------------------------------------
if correct_normals:
to_centers = centers - self.center
flip = (np.einsum('ij,ij->i', normals, to_centers) < 0)
normals[flip] *= -1
# -------------------------------------------------------------
# Convert to velocity in planet's rest frame
# -------------------------------------------------------------
if relative_velocity:
vel[:, 0] -= vpx
vel[:, 1] -= vpy
vel[:, 2] -= vpz
# -------------------------------------------------------------
# Compute flux for each surface element
# -------------------------------------------------------------
v_dot_n = np.einsum('ij,ij->i', vel, normals)
dF = rho * v_dot_n * areas
flux[i] = np.nansum(dF)
return flux
[docs]
def total_mass(self,
sim,
field='gasdens',
snapshot=[0,1],
follow_planet=True,
planet_index=0,
return_resolution=False):
"""Compute enclosed mass by direct grid integration.
The method integrates the requested density field on the simulation
grid accounting for the region geometry defined by this Surface instance.
Supports both spherical and cylindrical coordinate systems.
``self.type`` must be either ``'sphere'`` or ``'cylinder'``; the
integration mask is constructed accordingly.
Parameters
----------
sim : object
Simulation object providing access to the raw grid and fields.
field : str, optional
Density field name (default ``'gasdens'``).
snapshot : int or [start, end], optional
Snapshot index or inclusive snapshot range to integrate.
follow_planet : bool, optional
If True update the integration center and radius from the
specified planet's position/hill radius.
planet_index : int, optional
Planet index to follow when ``follow_planet`` is True.
return_resolution : bool, optional
If True return detailed resolution metadata for each snapshot
alongside the integrated mass.
Returns
-------
float or numpy.ndarray or list
If ``return_resolution`` is False and a single snapshot is
requested, returns a float. If multiple snapshots are
requested, returns a numpy array of masses. If
``return_resolution`` is True a list of dictionaries with
per-snapshot metadata is returned.
Examples
--------
Compute total mass inside a Hill sphere:
>>> surface = fp.Flux.Surface(type='sphere', radius=1.0) # radius is factor of Hill radius
>>> mass = surface.total_mass(sim, field='gasdens', follow_planet=True)
"""
# --------------------
# Handle snapshot list
# --------------------
if isinstance(snapshot, int):
times = [snapshot]
else:
s0, s1 = snapshot
times = np.arange(s0, s1+1)
masses = []
resolutions = []
# ----------------------------
# Detect coordinate system from simulation
# ----------------------------
coords = sim.vars.COORDINATES if hasattr(sim, 'vars') else 'spherical'
# Check if simulation is 3D (required for volume integration)
if hasattr(sim, 'vars') and sim.vars.DIM == 2:
raise ValueError(
"total_mass() requires a 3D simulation. "
"Your simulation is 2D. Volume integration cannot be performed in 2D domains."
)
# ----------------------------
# Load grid info based on coordinate system
# ----------------------------
r_arr = sim.domains.r
ph_arr = sim.domains.phi
if coords == 'spherical':
th_arr = sim.domains.theta
TH, RR, PH = np.meshgrid(th_arr, r_arr, ph_arr, indexing='ij')
X = RR * np.sin(TH) * np.cos(PH)
Y = RR * np.sin(TH) * np.sin(PH)
Z = RR * np.cos(TH)
# Precompute cell volumes (spherical metric)
dr = np.diff(r_arr)
dth = np.diff(th_arr)
dph = np.diff(ph_arr)
dr_full = np.empty_like(r_arr); dr_full[:-1] = dr; dr_full[-1] = dr[-1]
dth_full = np.empty_like(th_arr); dth_full[:-1] = dth; dth_full[-1] = dth[-1]
dph_full = np.empty_like(ph_arr); dph_full[:-1] = dph; dph_full[-1] = dph[-1]
DR = dr_full[np.newaxis, :, np.newaxis]
DTH = dth_full[:, np.newaxis, np.newaxis]
DPH = dph_full[np.newaxis, np.newaxis, :]
dV = (RR**2) * np.sin(TH) * DR * DTH * DPH
elif coords == 'cylindrical':
z_arr = sim.domains.z
ZZ, RR, PH = np.meshgrid(z_arr, r_arr, ph_arr, indexing='ij')
X = RR * np.cos(PH)
Y = RR * np.sin(PH)
Z = ZZ
# Precompute cell volumes (cylindrical metric)
dr = np.diff(r_arr)
dz = np.diff(z_arr)
dph = np.diff(ph_arr)
dr_full = np.empty_like(r_arr); dr_full[:-1] = dr; dr_full[-1] = dr[-1]
dz_full = np.empty_like(z_arr); dz_full[:-1] = dz; dz_full[-1] = dz[-1]
dph_full = np.empty_like(ph_arr); dph_full[:-1] = dph; dph_full[-1] = dph[-1]
DR = dr_full[np.newaxis, :, np.newaxis]
DZ = dz_full[:, np.newaxis, np.newaxis]
DPH = dph_full[np.newaxis, np.newaxis, :]
dV = RR * DR * DZ * DPH
else:
raise ValueError(f"Unsupported coordinate system: {coords}")
# ----------------------------------------
# Detect geometry
# ----------------------------------------
geom = self.type.lower()
if geom not in ['sphere', 'cylinder']:
raise ValueError("Surface.type must be 'sphere' or 'cylinder'")
# Loop over snapshots
for t in tqdm(times, desc="Calculating total mass"):
# Follow planet
if follow_planet:
planet = sim.load_planets(snapshot=t)[planet_index]
xp, yp, zp = planet.pos.x, planet.pos.y, planet.pos.z
# Update center and radius according to Hill radius
factor = np.round(self.radius / sim.load_planets(snapshot=t)[planet_index].hill_radius,2)
self.center = (xp, yp, zp)
self.radius = factor * planet.hill_radius
else:
xp, yp, zp = self.center
Xc = X - xp
Yc = Y - yp
Zc = Z - zp
# ---------------------------------------
# Apply geometry mask
# ---------------------------------------
if geom == 'sphere':
Rlim = self.radius
mask = (Xc**2 + Yc**2 + Zc**2) <= Rlim**2
# If z_cut exists → semisphere
if hasattr(self, 'z_cut') and (self.z_cut is not None):
mask &= (Z >= self.z_cut)
Hlim = None
elif geom == 'cylinder':
Rlim = self.radius
Hlim = self.height # provided by Surface(type='cylinder', height=...)
Rcyl = np.sqrt(Xc**2 + Yc**2)
mask = (Rcyl <= Rlim) & (np.abs(Zc) <= Hlim) & (np.abs(Zc) >= -Hlim)
# ---------------------------------------
# Load density for this snapshot
# ---------------------------------------
rho = sim.load_field(field, snapshot=t).gasdens_mesh[0]
# Enclosed mass
M = np.sum(rho[mask] * dV[mask])
masses.append(M)
# Resolution info
if return_resolution:
idx_dim1, idx_r, idx_ph = np.where(mask)
dim1_name = 'N_theta' if coords == 'spherical' else 'N_z'
resolutions.append({
"snapshot": t,
"mass": M,
"geometry": geom,
"R_extent": Rlim,
"H_extent": Hlim,
dim1_name: len(np.unique(idx_dim1)),
"N_r": len(np.unique(idx_r)),
"N_phi": len(np.unique(idx_ph)),
"N_total": mask.sum()
})
# Return logic
if return_resolution:
return resolutions
if len(masses) == 1:
return masses[0]
return np.array(masses)