Source code for fargopy.fields

###############################################################
# FARGOpy interdependencies
###############################################################
import fargopy

###############################################################
# Required packages
###############################################################
import os
import numpy as np
import re
import re
import pandas as pd

import matplotlib.pyplot as plt
import plotly.figure_factory as ff
from plotly.subplots import make_subplots
import plotly.graph_objects as go
from matplotlib.animation import FFMpegWriter
from scipy.interpolate import RBFInterpolator
from scipy.interpolate import interp1d
from scipy.interpolate import LinearNDInterpolator
from scipy.interpolate import RegularGridInterpolator
from scipy.spatial import cKDTree


from joblib import Parallel, delayed, parallel_config


from ipywidgets import interact, FloatSlider, IntSlider
from celluloid import Camera
from IPython.display import HTML, Video

from scipy.interpolate import griddata
from scipy.integrate import solve_ivp
from tqdm import tqdm
from pathlib import Path
import fargopy as fp
from scipy.ndimage import gaussian_filter

###############################################################
# Constants
###############################################################
# Map of coordinates into FARGO3D coordinates
# This dictionary maps the coordinates regular names (r, phi, theta, etc.) of
# different coordinate systems into the FARGO3D x, y, z

COORDS_MAP = dict(
    cartesian = dict(x='x',y='y',z='z'),
    cylindrical = dict(phi='x',r='y',z='z'),
    spherical = dict(phi='x',r='y',theta='z'),
)

###############################################################
# Classes
###############################################################
[docs] class Field(fargopy.Fargobj): """Represents a simulation field (scalar or vector) with coordinate system and domain information. The ``Field`` object encapsulates the data arrays and associated coordinate meshes for a specific simulation variable. It supports slicing, coordinate transformation and simple visualization helpers. Attributes ---------- data : np.ndarray Numpy array containing the physical data. coordinates : str Coordinate system type ('cartesian', 'cylindrical', 'spherical'). domains : object Object containing domain-specific geometry (e.g., mesh arrays). type : str Field type ('scalar' or 'vector'). Examples -------- Load a field from a simulation object (returns a Field instance if interpolation is disabled or a FieldInterpolator otherwise): >>> fp.Simulation.load_field(fields='gasdens', snapshot=0, interpolate=False) Access data and mesh: >>> rho = field.data >>> xmesh = field.mesh.x """ def __init__(self,data=None,coordinates='cartesian',domains=None,type='scalar',**kwargs): """ Initialize a Field object. Parameters ---------- data : np.ndarray, optional Field data array. coordinates : str, optional Coordinate system ('cartesian', 'cylindrical', 'spherical'). domains : object, optional Domain information for each coordinate. type : str, optional Field type ('scalar' or 'vector'). **kwargs : dict Additional keyword arguments. """ super().__init__(**kwargs) self.data = data self.coordinates = coordinates self.domains = domains self.type = type
[docs] def meshslice(self,slice=None,component=0,verbose=False): """Perform a slice on a field and produce the corresponding field slice and associated coordinate matrices for plotting. Parameters ---------- slice : str Slice definition string (e.g., 'z=0'). component : int, optional Component index for vector fields (default: 0). verbose : bool, optional If True, print debug information. Returns ------- tuple (sliced field, mesh dictionary with coordinates). The mesh dictionary contains coordinate arrays (x, y, z, r, phi, theta) corresponding to the slice. Examples -------- Slice a field at z=0: >>> field_slice, mesh = field.meshslice(slice='z=0') Plot the slice: >>> plt.pcolormesh(mesh.x, mesh.y, field_slice) """ # Analysis of the slice if slice is None: raise ValueError("You must provide a slice option.") # Degrees specification slice = slice.replace('deg','*fargopy.DEG') # Perform the slice slice_cmd = f"self._slice({slice},pattern=True,verbose={verbose})" slice,pattern = eval(slice_cmd) # Create the mesh if self.coordinates == 'cartesian': z,y,x = np.meshgrid(self.domains.z,self.domains.y,self.domains.x,indexing='ij') x = eval(f"x[{pattern}]") y = eval(f"y[{pattern}]") z = eval(f"z[{pattern}]") mesh = fargopy.Dictobj(dict=dict(x=x,y=y,z=z)) if self.coordinates == 'cylindrical': z,r,phi = np.meshgrid(self.domains.z,self.domains.r,self.domains.phi,indexing='ij') x,y,z = r*np.cos(phi),r*np.sin(phi),z x = eval(f"x[{pattern}]") y = eval(f"y[{pattern}]") z = eval(f"z[{pattern}]") r = eval(f"r[{pattern}]") phi = eval(f"phi[{pattern}]") mesh = fargopy.Dictobj(dict=dict(r=r,phi=phi,x=x,y=y,z=z)) if self.coordinates == 'spherical': theta,r,phi = np.meshgrid(self.domains.theta,self.domains.r,self.domains.phi,indexing='ij') x,y,z = r*np.sin(theta)*np.cos(phi),r*np.sin(theta)*np.sin(phi),r*np.cos(theta) x = eval(f"x[{pattern}]") y = eval(f"y[{pattern}]") z = eval(f"z[{pattern}]") r = eval(f"r[{pattern}]") phi = eval(f"phi[{pattern}]") theta = eval(f"theta[{pattern}]") mesh = fargopy.Dictobj(dict=dict(r=r,phi=phi,theta=theta,x=x,y=y,z=z)) return slice,mesh
def _slice(self,verbose=False,pattern=False,**kwargs): """ Extract a slice of a 3-dimensional FARGO3D field. Parameters ---------- verbose : bool, optional If True, print debug information. pattern : bool, optional If True, return the pattern of the slice (e.g., [:,:,:]). ir, iphi, itheta, ix, iy, iz : int or str, optional Index or range of indexes for the corresponding coordinate. r, phi, theta, x, y, z : float, list, or tuple, optional Value or range for slicing. The closest value in the domain is used. Returns ------- np.ndarray or tuple Sliced field, and optionally the pattern string if pattern=True. Examples -------- # 0D: Get the value of the field at iphi=0, itheta=-1, and close to r=0.82 >>> gasvz.slice(iphi=0, itheta=-1, r=0.82) # 1D: Get all values in radial direction at iphi=0, itheta=-1 >>> gasvz.slice(iphi=0, itheta=-1) # 2D: Get all values for values close to phi=0 >>> gasvz.slice(phi=0) """ # By default slice ivar = dict(x=':',y=':',z=':') if len(kwargs.keys()) == 0: pattern_str = f"{ivar['z']},{ivar['y']},{ivar['x']}" if pattern: return self.data, pattern_str return self.data # Check all conditions for key,item in kwargs.items(): match = re.match('^i(.+)',key) if match: index = item coord = match.group(1) if verbose: print(f"Index condition {index} for coordinate {coord}") ivar[COORDS_MAP[self.coordinates][coord]] = index else: if verbose: print(f"Numeric condition found for coordinate {key}") if key in self.domains.keys(): # Check if item is a list if isinstance(item,list) or isinstance(item,tuple): if verbose: print(f"You pass the range '{item}' for coordinate {key}") min = abs(self.domains.item(key)-item[0]).argmin() max = abs(self.domains.item(key)-item[1]).argmin() if (min > max) or (min == max): extrema = self.domains.extrema[key] vmin, vmax = extrema[0][1], extrema[1][1] raise ValueError(f"The range provided for '{key}', ie. '{item}' is not valid. You must provide a valid range for the variable with range: [{vmin},{vmax}]") ivar[COORDS_MAP[self.coordinates][key]] = f"{min}:{max}" else: # Check if value provided is in range domain = self.domains.item(key) extrema = self.domains.extrema[key] min, max = extrema[0][1], extrema[1][1] if (item<min) or (item>max): raise ValueError(f"You are attempting to get a slice in {key} = {item}, but the valid range for this variable is [{min},{max}]") find = abs(self.domains.item(key) - item) ivar[COORDS_MAP[self.coordinates][key]] = find.argmin() if verbose: print(f"Range for {key}: {ivar[COORDS_MAP[self.coordinates][key]]}") pattern_str = f"{ivar['z']},{ivar['y']},{ivar['x']}" if self.type == 'scalar': slice_cmd = f"self.data[{pattern_str}]" if verbose: print(f"Slice: {slice_cmd}") slice = eval(slice_cmd) elif self.type == 'vector': # Handle variable number of components (2 for 2D, 3 for 3D) ncomponents = self.data.shape[0] slice_components = [] for i in range(ncomponents): component_cmd = f"self.data[{i},{pattern_str}]" slice_components.append(eval(component_cmd, {"self": self, "np": np})) slice = np.array(slice_components) if pattern: return slice,pattern_str return slice
[docs] def to_cartesian(self): """ Convert the field to cartesian coordinates. Returns ------- Field or tuple of Field The field in cartesian coordinates. For scalar fields, returns the field itself. For vector fields, returns a tuple (vx, vy, vz). Examples -------- >>> v = sim.load_field('gasv', snapshot=0) >>> vx, vy, vz = v.to_cartesian() """ if self.type == 'scalar': # Scalar fields are invariant under coordinate transformations return self elif self.type == 'vector': # Vector fields must be transformed according to domain if self.coordinates == 'cartesian': return self if self.coordinates == 'cylindrical': z,r,phi = np.meshgrid(self.domains.z,self.domains.r,self.domains.phi,indexing='ij') vphi = self.data[0] vr = self.data[1] if self.data.shape[0] == 3: vz = self.data[2] else: vz = np.zeros_like(vr) vx = vr*np.cos(phi) vy = vr*np.sin(phi) return (Field(vx,coordinates=self.coordinates,domains=self.domains,type='scalar'), Field(vy,coordinates=self.coordinates,domains=self.domains,type='scalar'), Field(vz,coordinates=self.coordinates,domains=self.domains,type='scalar')) if self.coordinates == 'spherical': theta,r,phi = np.meshgrid(self.domains.theta,self.domains.r,self.domains.phi,indexing='ij') vphi = self.data[0] vr = self.data[1] vtheta = self.data[2] vx = vr*np.sin(theta)*np.cos(phi) + vtheta*np.cos(theta)*np.cos(phi) - vphi*np.sin(phi) vy = vr*np.sin(theta)*np.sin(phi) + vtheta*np.cos(theta)*np.sin(phi) + vphi*np.cos(phi) vz = vr*np.cos(theta) - vtheta*np.sin(theta) return (Field(vx,coordinates=self.coordinates,domains=self.domains,type='scalar'), Field(vy,coordinates=self.coordinates,domains=self.domains,type='scalar'), Field(vz,coordinates=self.coordinates,domains=self.domains,type='scalar'))
[docs] def get_size(self): """ Return the size of the field data in megabytes (MB). Returns ------- float Size in MB. """ return self.data.nbytes/1024**2
def __str__(self): """ String representation of the field data. Returns ------- str """ return str(self.data) def __repr__(self): """ String representation of the field data. Returns ------- str """ return str(self.data)
# ############################################################### # FieldInterpolator # ############################################################### # This class is used to load and interpolate fields from a FARGO3D simulation. # It provides methods to load data, create meshes, and perform interpolation. # It also handles 2D and 3D data loading based on the provided parameters. #################################################################
[docs] class FieldInterpolator(fargopy.Fargobj): """Loads and interpolates fields from a FARGO3D simulation. The ``FieldInterpolator`` facilitates loading, slicing, and interpolating simulation data across multiple snapshots and fields. It handles coordinate transformations and dimensionality reduction based on slice definitions. Attributesde ---------- sim : Simulation The simulation object. df : pd.DataFrame or None DataFrame containing loaded field data organized by snapshot and time. snapshot_time_table : pd.DataFrame or None Table mapping snapshots to normalized time. snapshot : list or None List of loaded snapshots. slice : str or None Slice definition string used to load the data. dim : int or None Dimensionality of the loaded data (e.g., 2 for a 2D slice). Examples -------- Load multiple fields from a snapshot with interpolation enabled: >>> data = sim.load_field(fields=['gasdens', 'gasv'], snapshot=4) Load a specific slice: >>> dens = sim.load_field(fields='gasdens', slice='r=[0.8,1.2],phi=[-25 deg,25 deg],theta=1.56', snapshot=4) """ def __init__(self, sim, df=None): """ Initialize a FieldInterpolator. Parameters ---------- sim : Simulation The simulation object. df : pd.DataFrame, optional DataFrame with preloaded field data. """ self.sim = sim self.snapshot_time_table = None self.snapshot = None self.slice = None self.dim=None self.df = df self._domain_limits = None self._df_sorted_cache = None self._slice_type = None self._slice_ranges = None self._is_regular_grid = None self._grid_axes = None self._axis_order = None self._original_coords = None def _reset_caches(self): """Clear cached dataframe and slice metadata before loading or evaluating new data.""" self._df_sorted_cache = None self._slice_type = None self._slice_ranges = None def _cache_domain_limits(self): """Cache domain extrema for r, theta, phi, and z to avoid repeated property access.""" if self._domain_limits is not None: return dom = self.sim.domains coords = self.sim.vars.COORDINATES if coords == 'spherical': self._domain_limits = dict( r=(dom.r.min(), dom.r.max()), theta=(dom.theta.min(), dom.theta.max()), phi=(dom.phi.min(), dom.phi.max()), z=(None, None) ) elif coords == 'cylindrical': self._domain_limits = dict( r=(dom.r.min(), dom.r.max()), z=(dom.z.min(), dom.z.max()), phi=(dom.phi.min(), dom.phi.max()), theta=(None, None) ) else: # cartesian self._domain_limits = dict( x=(dom.x.min(), dom.x.max()), y=(dom.y.min(), dom.y.max()), z=(dom.z.min(), dom.z.max()), r=(None, None), theta=(None, None), phi=(None, None) ) def _detect_slice_type(self, slice_str): """Return the canonical slice type ('theta', 'phi', 'r', 'z', or None) inferred from the user string. For spherical coordinates: - theta=constant -> XY plane slice - phi=constant -> XZ plane slice - r=constant -> spherical shell For cylindrical coordinates: - z=constant -> XY plane slice (horizontal) - phi=constant -> RZ plane slice (vertical) - r=constant -> cylindrical surface """ if not slice_str: return None txt = slice_str.replace(" ", "").lower() # Detect coordinate system coords = self.sim.vars.COORDINATES if hasattr(self.sim, 'vars') else 'spherical' if coords == 'cylindrical': # For cylindrical coordinates, check for z, phi, r m_z = re.search(r"z=([^\[\],]+)(?![\]])", txt) m_phi = re.search(r"phi=([^\[\],]+)(?![\]])", txt) m_r = re.search(r"r=([^\[\],]+)(?![\]])", txt) # If both phi and r are fixed, we have a line along z if m_phi and not re.search(r"phi=\[", txt) and m_r and not re.search(r"r=\[", txt): return "z" # If z is fixed, we have XY plane (r-phi slice) if m_z and not re.search(r"z=\[", txt): return "z" # If phi is fixed, we have RZ plane if m_phi and not re.search(r"phi=\[", txt): return "phi" # If r is fixed, we have cylindrical surface if m_r and not re.search(r"r=\[", txt): return "r" elif coords == 'spherical': # Original spherical coordinate logic m_theta = re.search(r"theta=([^\[\],]+)(?![\]])", txt) m_phi = re.search(r"phi=([^\[\],]+)(?![\]])", txt) if m_theta and not re.search(r"theta=\[", txt) and m_phi and not re.search(r"phi=\[", txt): return "r" if m_theta and not re.search(r"theta=\[", txt): return "theta" if m_phi and not re.search(r"phi=\[", txt): return "phi" return None def _parse_slice_ranges(self, slice_str): """Parse the slice expression into numeric (r, theta, phi, z) bounds expressed in radians when needed.""" ranges = {"r": None, "theta": None, "phi": None, "z": None} if not slice_str: return ranges txt = slice_str.lower() def _to_float(value): value = value.strip() match = re.match(r"(-?\d+(?:\.\d+)?)\s*deg", value) return np.deg2rad(float(match.group(1))) if match else float(value) range_pattern = re.compile(r"(r|theta|phi|z)=\[(.+?)\]") value_pattern = re.compile(r"(r|theta|phi|z)=([^\[\],]+)") for key, vals in range_pattern.findall(txt): lo, hi = [_to_float(v) for v in vals.split(",")] ranges[key] = (min(lo, hi), max(lo, hi)) for key, val in value_pattern.findall(txt): if ranges.get(key) is None: parsed = _to_float(val) ranges[key] = (parsed, parsed) return ranges def _get_sorted_dataframe(self, dataframe): """Return the dataframe sorted by normalized time, reusing a cached copy when possible.""" if self._df_sorted_cache and self._df_sorted_cache[0] is dataframe: return self._df_sorted_cache[1] df_sorted = dataframe.sort_values("time") self._df_sorted_cache = (dataframe, df_sorted) return df_sorted def __getattr__(self, name): """ Delegate attribute access to the internal DataFrame if present. Parameters ---------- name : str Attribute name. Returns ------- object Attribute from the DataFrame if available. Raises ------ AttributeError If the attribute is not found. """ df = object.__getattribute__(self, 'df') if df is not None and hasattr(df, name): return getattr(df, name) raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'") def _run_parallel(self, tasks, backend='threading'): tasks = list(tasks) if not tasks: return [] with parallel_config(n_jobs=-1, prefer='threads'): return Parallel(backend=backend)(tasks) def _compute_cut_indices(self, cut, coords_system='spherical'): """ Calculate structured grid indices for a geometric cut (sphere or cylinder). Instead of using boolean masks that destroy grid structure, this method computes the bounding box of the cut in spherical/cylindrical coordinates and returns index ranges that preserve the regular grid structure. Parameters ---------- cut : tuple For sphere: (xc, yc, zc, rs) - center coordinates and radius For cylinder: (xc, yc, zc, rc, hc) - center, radius, height coords_system : str 'spherical' or 'cylindrical' Returns ------- dict Dictionary with keys 'r_slice', 'phi_slice', 'theta_slice' (or 'z_slice') containing slice objects or None if no cut should be applied. Also includes 'cut_mask' for fine-grained filtering if needed. """ if cut is None: return None dom = self.sim.domains # Get coordinate arrays if coords_system == 'spherical': r_arr = dom.r phi_arr = dom.phi theta_arr = dom.theta # Build full 3D mesh to detect affected regions theta_mesh, r_mesh, phi_mesh = np.meshgrid( theta_arr, r_arr, phi_arr, indexing='ij' ) x = r_mesh * np.sin(theta_mesh) * np.cos(phi_mesh) y = r_mesh * np.sin(theta_mesh) * np.sin(phi_mesh) z = r_mesh * np.cos(theta_mesh) elif coords_system == 'cylindrical': r_arr = dom.r phi_arr = dom.phi z_arr = dom.z # Build full 3D mesh z_mesh, r_mesh, phi_mesh = np.meshgrid( z_arr, r_arr, phi_arr, indexing='ij' ) x = r_mesh * np.cos(phi_mesh) y = r_mesh * np.sin(phi_mesh) z = z_mesh else: raise ValueError(f"Unsupported coordinate system: {coords_system}") # Compute mask based on cut geometry if len(cut) == 5: # Cylinder xc, yc, zc, rc, hc = cut r_xy = np.sqrt((x - xc)**2 + (y - yc)**2) zmin, zmax = zc - hc/2, zc + hc/2 mask_3d = (r_xy <= rc) & (z >= zmin) & (z <= zmax) elif len(cut) == 4: # Sphere xc, yc, zc, rs = cut r_sph = np.sqrt((x - xc)**2 + (y - yc)**2 + (z - zc)**2) mask_3d = r_sph <= rs else: raise ValueError("cut must have 4 (sphere) or 5 (cylinder) elements.") # Find bounding indices in each dimension where mask is True # This preserves grid structure by using index ranges instead of boolean masks if coords_system == 'spherical': # Check which indices have any True values along other dimensions mask_theta = np.any(mask_3d, axis=(1, 2)) # Any True along r, phi mask_r = np.any(mask_3d, axis=(0, 2)) # Any True along theta, phi mask_phi = np.any(mask_3d, axis=(0, 1)) # Any True along theta, r # Get index ranges theta_idx = np.where(mask_theta)[0] r_idx = np.where(mask_r)[0] phi_idx = np.where(mask_phi)[0] if len(theta_idx) == 0 or len(r_idx) == 0 or len(phi_idx) == 0: # Empty cut - return None return None # Create slices with some padding to ensure we capture the cut region # Use slice objects to maintain structure result = { 'theta_slice': slice(theta_idx[0], theta_idx[-1] + 1), 'r_slice': slice(r_idx[0], r_idx[-1] + 1), 'phi_slice': slice(phi_idx[0], phi_idx[-1] + 1), 'cut_mask': mask_3d, # Store full mask for precise filtering if needed 'coords_system': 'spherical' } elif coords_system == 'cylindrical': # Check which indices have any True values mask_z = np.any(mask_3d, axis=(1, 2)) mask_r = np.any(mask_3d, axis=(0, 2)) mask_phi = np.any(mask_3d, axis=(0, 1)) z_idx = np.where(mask_z)[0] r_idx = np.where(mask_r)[0] phi_idx = np.where(mask_phi)[0] if len(z_idx) == 0 or len(r_idx) == 0 or len(phi_idx) == 0: return None result = { 'z_slice': slice(z_idx[0], z_idx[-1] + 1), 'r_slice': slice(r_idx[0], r_idx[-1] + 1), 'phi_slice': slice(phi_idx[0], phi_idx[-1] + 1), 'cut_mask': mask_3d, 'coords_system': 'cylindrical' } return result def _detect_regular_grid(self, var1_mesh, var2_mesh, var3_mesh): """ Detect if 3D mesh data represents a regular grid and extract coordinate axes. A regular grid has uniform spacing along each dimension and can be represented as a Cartesian product of 1D axes (meshgrid). This enables fast interpolation with RegularGridInterpolator. Parameters ---------- var1_mesh, var2_mesh, var3_mesh : np.ndarray 3D coordinate meshes (shape must match). Returns ------- is_regular : bool True if the grid is regular with strictly monotonic axes. axes : tuple of np.ndarray or None Tuple (var1_axis, var2_axis, var3_axis) if regular, else None. axis_order : tuple of int or None Order indicating which mesh dimension varies along each coordinate. For example, (0, 1, 2) means var1 varies along dim 0, etc. """ try: # Check if arrays are 3D if var1_mesh.ndim != 3 or var2_mesh.ndim != 3 or var3_mesh.ndim != 3: return False, None, None if var1_mesh.shape != var2_mesh.shape or var1_mesh.shape != var3_mesh.shape: return False, None, None shape = var1_mesh.shape # Detect which dimension each coordinate varies along # Check variation using peak-to-peak (ptp) along slices var1_vars = [ np.ptp(var1_mesh[:, 0, 0]), np.ptp(var1_mesh[0, :, 0]), np.ptp(var1_mesh[0, 0, :]) ] var2_vars = [ np.ptp(var2_mesh[:, 0, 0]), np.ptp(var2_mesh[0, :, 0]), np.ptp(var2_mesh[0, 0, :]) ] var3_vars = [ np.ptp(var3_mesh[:, 0, 0]), np.ptp(var3_mesh[0, :, 0]), np.ptp(var3_mesh[0, 0, :]) ] var1_dim = np.argmax(var1_vars) var2_dim = np.argmax(var2_vars) var3_dim = np.argmax(var3_vars) # Each coordinate must vary along a different dimension if len(set([var1_dim, var2_dim, var3_dim])) != 3: return False, None, None # Extract 1D axes if var1_dim == 0: var1_axis = var1_mesh[:, 0, 0] elif var1_dim == 1: var1_axis = var1_mesh[0, :, 0] else: var1_axis = var1_mesh[0, 0, :] if var2_dim == 0: var2_axis = var2_mesh[:, 0, 0] elif var2_dim == 1: var2_axis = var2_mesh[0, :, 0] else: var2_axis = var2_mesh[0, 0, :] if var3_dim == 0: var3_axis = var3_mesh[:, 0, 0] elif var3_dim == 1: var3_axis = var3_mesh[0, :, 0] else: var3_axis = var3_mesh[0, 0, :] # Check monotonicity (strictly increasing or decreasing) def is_strictly_monotonic(arr): diff = np.diff(arr) return np.all(diff > 0) or np.all(diff < 0) if not (is_strictly_monotonic(var1_axis) and is_strictly_monotonic(var2_axis) and is_strictly_monotonic(var3_axis)): return False, None, None # Verify grid structure: reconstruct mesh and check it matches axes_ordered = [None, None, None] axes_ordered[var1_dim] = var1_axis axes_ordered[var2_dim] = var2_axis axes_ordered[var3_dim] = var3_axis # Create test meshgrid test_mesh = np.meshgrid(axes_ordered[0], axes_ordered[1], axes_ordered[2], indexing='ij') # Map back to check test_var1 = test_mesh[var1_dim] test_var2 = test_mesh[var2_dim] test_var3 = test_mesh[var3_dim] # Check if test meshes match original (within tolerance) tol = 1e-10 if not (np.allclose(test_var1, var1_mesh, rtol=tol, atol=tol) and np.allclose(test_var2, var2_mesh, rtol=tol, atol=tol) and np.allclose(test_var3, var3_mesh, rtol=tol, atol=tol)): return False, None, None return True, (var1_axis, var2_axis, var3_axis), (var1_dim, var2_dim, var3_dim) except Exception: return False, None, None
[docs] def load_data(self, fields=None, slice=None, snapshots=1, cut=None, coords='cartesian'): """Load one or multiple fields into a unified DataFrame. This method loads simulation data for the specified fields and snapshots, potentially applying a spatial slice or cut. The data is stored in an internal DataFrame (`self.df`) for further processing or interpolation. Parameters ---------- fields : str or list of str Name(s) of the fields to load (e.g., 'gasdens', 'gasv'). slice : str, optional Slice definition (e.g., 'z=0', 'theta=1.57'). snapshots : int or list of int, optional Snapshot number(s) to load. Can be a single integer or a range [start, end]. cut : list, optional Geometric cut parameters (sphere or cylinder mask). coords : str, optional Coordinate system for vector transformation ('cartesian' by default). Returns ------- pd.DataFrame The DataFrame containing the loaded data. Examples -------- Load density and velocity for snapshot 10: >>> loader = fp.FieldInterpolator(sim) >>> df = loader.load_data(fields=['gasdens', 'gasv'], snapshot=10) Load a 2D slice at z=0 (midplane): >>> df_slice = loader.load_data(fields='gasdens', slice='z=0', snapshot=10) """ # ------------------------- # Validate arguments # ------------------------- self._reset_caches() if fields is None: raise ValueError("You must specify at least one field.") if isinstance(fields, str): fields = [fields] self.fields = fields self.slice = slice # Convert snapshot into list - handle int, numpy types, and iterables if snapshots is None: snapshots = [0] elif isinstance(snapshots, (int, np.integer)): snapshots = [int(snapshots)] elif not isinstance(snapshots, list): # Convert arrays, tuples, etc. to list try: snapshots = list(snapshots) except TypeError: snapshots = [int(snapshots)] self.snapshot = snapshots # Detect dimensionality from the sliced data (if a slice is provided) if slice is not None: test_field = self.sim._load_field_raw('gasdens', snapshot=snapshots[0], field_type='scalar') try: data_slice, mesh = test_field.meshslice(slice=slice) self.dim = len(np.array(data_slice).shape) except Exception: # Fallback: assume full 3D self.dim = 3 else: self.dim = 3 # Snapshot & time arrays if len(snapshots) == 1: snaps = snapshots time_values = [0] else: snaps = np.arange(snapshots[0], snapshots[1] + 1) time_values = np.linspace(0, 1, len(snaps)) # Store snapshot-time table self.snapshot_time_table = pd.DataFrame({ "Snapshot": snaps, "Normalized_time": time_values }) # Slice handling if not hasattr(self.sim, "domains") or self.sim.domains is None: raise ValueError("Simulation domains are not loaded.") self._cache_domain_limits() self._slice_type = self._detect_slice_type(slice) self._slice_ranges = self._parse_slice_ranges(slice) # ------------------------- # Helper for rotation (phi slices) # ------------------------- def _rotation(X, Y, Z, phi0): X_rot = X * np.cos(phi0) + Y * np.sin(phi0) Y_rot = -X * np.sin(phi0) + Y * np.cos(phi0) return X_rot, Y_rot, Z.copy() # ===================================================================== # ======================== 2D CASE ================================ # ===================================================================== if self.dim < 3: # Collect rows and build DataFrame once to avoid repeated concat rows = [] for i, snap in enumerate(snaps): row = {'snapshot': snap, 'time': time_values[i]} coords_assigned = False # Only assign var1/var2/var3 once # Loop over requested fields for field in fields: # ----------------- # GASDENS 2D # ----------------- if field == 'gasdens': gasd = self.sim._load_field_raw('gasdens', snapshot=snap, field_type='scalar') data_slice, mesh = gasd.meshslice(slice=slice) # assign coordinates only once if not coords_assigned: if coords == 'cartesian': # rotate if phi is fixed try: if np.all(mesh.phi.ravel() == mesh.phi.ravel()[0]): phi0 = mesh.phi.ravel()[0] x_rot, y_rot, z_rot = _rotation(mesh.x, mesh.y, mesh.z, phi0) row['var1_mesh'] = x_rot row['var2_mesh'] = y_rot row['var3_mesh'] = z_rot else: row['var1_mesh'] = mesh.x row['var2_mesh'] = mesh.y row['var3_mesh'] = mesh.z except Exception: # Fallback if mesh lacks phi row['var1_mesh'] = mesh.x row['var2_mesh'] = mesh.y row['var3_mesh'] = mesh.z else: # original coordinate names as defined in simulation vnames = getattr(self.sim.vars, 'VARIABLES', ['x', 'y', 'z']) row['var1_mesh'] = getattr(mesh, vnames[0]) row['var2_mesh'] = getattr(mesh, vnames[1]) row['var3_mesh'] = getattr(mesh, vnames[2]) coords_assigned = True row['gasdens_mesh'] = data_slice # ----------------- # GASV 2D # ----------------- if field == 'gasv': gasv_raw = self.sim._load_field_raw('gasv', snapshot=snap, field_type='vector') if coords == 'cartesian': gasvx, gasvy, gasvz = gasv_raw.to_cartesian() v1, mesh = gasvx.meshslice(slice=slice) v2, mesh = gasvy.meshslice(slice=slice) v3, mesh = gasvz.meshslice(slice=slice) if not coords_assigned: row['var1_mesh'] = mesh.x row['var2_mesh'] = mesh.y row['var3_mesh'] = mesh.z coords_assigned = True row['gasv_mesh'] = np.array([v1, v2, v3]) else: v_slice, mesh = gasv_raw.meshslice(slice=slice) # Handle 2D (2 components) or 3D (3 components) ncomponents = len(v_slice) if ncomponents == 2: v1, v2 = v_slice[0], v_slice[1] if not coords_assigned: vnames = getattr(self.sim.vars, 'VARIABLES', ['x', 'y', 'z']) row['var1_mesh'] = getattr(mesh, vnames[0]) row['var2_mesh'] = getattr(mesh, vnames[1]) coords_assigned = True row['gasv_mesh'] = np.array([v1, v2]) else: # 3D v1, v2, v3 = v_slice[0], v_slice[1], v_slice[2] if not coords_assigned: vnames = getattr(self.sim.vars, 'VARIABLES', ['x', 'y', 'z']) row['var1_mesh'] = getattr(mesh, vnames[0]) row['var2_mesh'] = getattr(mesh, vnames[1]) row['var3_mesh'] = getattr(mesh, vnames[2]) coords_assigned = True row['gasv_mesh'] = np.array([v1, v2, v3]) # ----------------- # GASENERGY 2D # ----------------- if field == 'gasenergy': gasen = self.sim._load_field_raw('gasenergy', snapshot=snap, field_type='scalar') data_slice, mesh = gasen.meshslice(slice=slice) if not coords_assigned: if coords == 'cartesian': row['var1_mesh'] = mesh.x row['var2_mesh'] = mesh.y row['var3_mesh'] = mesh.z else: vnames = getattr(self.sim.vars, 'VARIABLES', ['x', 'y', 'z']) row['var1_mesh'] = getattr(mesh, vnames[0]) row['var2_mesh'] = getattr(mesh, vnames[1]) row['var3_mesh'] = getattr(mesh, vnames[2]) coords_assigned = True row['gasenergy_mesh'] = data_slice # collect row dicts and build DataFrame once rows.append(row) df_snapshots = pd.DataFrame(rows) self.df = df_snapshots return df_snapshots # ===================================================================== # ======================== 3D CASE ================================ # ===================================================================== if self.dim == 3: # Determine coordinate system coords_sys = self.sim.vars.COORDINATES if hasattr(self.sim, 'vars') else 'spherical' # Compute structured cut indices (preserves grid structure) cut_info = None if cut is not None: cut_info = self._compute_cut_indices(cut, coords_system=coords_sys) if cut_info is None: raise ValueError("The specified cut does not intersect the simulation domain.") # Build mesh with optional structured cut if coords_sys == 'spherical': # Get coordinate arrays r_arr = self.sim.domains.r phi_arr = self.sim.domains.phi theta_arr = self.sim.domains.theta # Apply structured slicing if cut is specified if cut_info is not None: theta_slice = cut_info['theta_slice'] r_slice = cut_info['r_slice'] phi_slice = cut_info['phi_slice'] theta_arr = theta_arr[theta_slice] r_arr = r_arr[r_slice] phi_arr = phi_arr[phi_slice] # Build 3D structured mesh from sliced arrays theta, r, phi = np.meshgrid(theta_arr, r_arr, phi_arr, indexing='ij') x = r * np.sin(theta) * np.cos(phi) y = r * np.sin(theta) * np.sin(phi) z = r * np.cos(theta) elif coords_sys == 'cylindrical': # Get coordinate arrays r_arr = self.sim.domains.r phi_arr = self.sim.domains.phi z_arr = self.sim.domains.z # Apply structured slicing if cut is specified if cut_info is not None: z_slice = cut_info['z_slice'] r_slice = cut_info['r_slice'] phi_slice = cut_info['phi_slice'] z_arr = z_arr[z_slice] r_arr = r_arr[r_slice] phi_arr = phi_arr[phi_slice] # Build 3D structured mesh z, r, phi = np.meshgrid(z_arr, r_arr, phi_arr, indexing='ij') x = r * np.cos(phi) y = r * np.sin(phi) else: raise ValueError(f"Unsupported coordinate system: {coords_sys}") # Collect rows and build DataFrame once to avoid repeated concat rows = [] for i, snap in enumerate(snaps): row = {'snapshot': snap, 'time': time_values[i]} coords_assigned = False # Loop over requested fields for field in fields: # ----------------- # GASDENS 3D # ----------------- if field == "gasdens": gasd = self.sim._load_field_raw('gasdens', snapshot=snap, field_type='scalar') if not coords_assigned: if coords == 'cartesian': row["var1_mesh"] = x row["var2_mesh"] = y row["var3_mesh"] = z else: # original coordinate variables order v0, v1, v2 = self.sim.vars.VARIABLES mapping = dict(r=r, phi=phi, theta=theta, x=x, y=y, z=z) row["var1_mesh"] = mapping[v0] row["var2_mesh"] = mapping[v1] row["var3_mesh"] = mapping[v2] coords_assigned = True # Apply structured slicing to data if cut_info is not None: if coords_sys == 'spherical': data_sliced = gasd.data[ cut_info['theta_slice'], cut_info['r_slice'], cut_info['phi_slice'] ] else: # cylindrical data_sliced = gasd.data[ cut_info['z_slice'], cut_info['r_slice'], cut_info['phi_slice'] ] row["gasdens_mesh"] = data_sliced else: row["gasdens_mesh"] = gasd.data # ----------------- # GASV 3D # ----------------- if field == "gasv": gasv_raw = self.sim._load_field_raw('gasv', snapshot=snap, field_type='vector') if coords == 'cartesian': gasvx, gasvy, gasvz = gasv_raw.to_cartesian() if not coords_assigned: row["var1_mesh"] = x row["var2_mesh"] = y row["var3_mesh"] = z coords_assigned = True # Apply structured slicing to vector components if cut_info is not None: if coords_sys == 'spherical': row["gasv_mesh"] = np.array([ gasvx.data[cut_info['theta_slice'], cut_info['r_slice'], cut_info['phi_slice']], gasvy.data[cut_info['theta_slice'], cut_info['r_slice'], cut_info['phi_slice']], gasvz.data[cut_info['theta_slice'], cut_info['r_slice'], cut_info['phi_slice']] ]) else: # cylindrical row["gasv_mesh"] = np.array([ gasvx.data[cut_info['z_slice'], cut_info['r_slice'], cut_info['phi_slice']], gasvy.data[cut_info['z_slice'], cut_info['r_slice'], cut_info['phi_slice']], gasvz.data[cut_info['z_slice'], cut_info['r_slice'], cut_info['phi_slice']] ]) else: row["gasv_mesh"] = np.array([gasvx.data, gasvy.data, gasvz.data]) else: vdata = gasv_raw.data if not coords_assigned: v0, v1, v2 = self.sim.vars.VARIABLES mapping = dict(r=r, phi=phi, theta=theta, x=x, y=y, z=z) row["var1_mesh"] = mapping[v0] row["var2_mesh"] = mapping[v1] row["var3_mesh"] = mapping[v2] coords_assigned = True # Apply structured slicing to vector components if cut_info is not None: if coords_sys == 'spherical': row["gasv_mesh"] = np.array([ vdata[0][cut_info['theta_slice'], cut_info['r_slice'], cut_info['phi_slice']], vdata[1][cut_info['theta_slice'], cut_info['r_slice'], cut_info['phi_slice']], vdata[2][cut_info['theta_slice'], cut_info['r_slice'], cut_info['phi_slice']] ]) else: # cylindrical row["gasv_mesh"] = np.array([ vdata[0][cut_info['z_slice'], cut_info['r_slice'], cut_info['phi_slice']], vdata[1][cut_info['z_slice'], cut_info['r_slice'], cut_info['phi_slice']], vdata[2][cut_info['z_slice'], cut_info['r_slice'], cut_info['phi_slice']] ]) else: row["gasv_mesh"] = np.array([vdata[0], vdata[1], vdata[2]]) # ----------------- # GASENERGY 3D # ----------------- if field == "gasenergy": gasen = self.sim._load_field_raw('gasenergy', snapshot=snap, field_type='scalar') if not coords_assigned: if coords == 'cartesian': row["var1_mesh"] = x row["var2_mesh"] = y row["var3_mesh"] = z else: v0, v1, v2 = self.sim.vars.VARIABLES mapping = dict(r=r, phi=phi, theta=theta, x=x, y=y, z=z) row["var1_mesh"] = mapping[v0] row["var2_mesh"] = mapping[v1] row["var3_mesh"] = mapping[v2] coords_assigned = True # Apply structured slicing to data if cut_info is not None: if coords_sys == 'spherical': data_sliced = gasen.data[ cut_info['theta_slice'], cut_info['r_slice'], cut_info['phi_slice'] ] else: # cylindrical data_sliced = gasen.data[ cut_info['z_slice'], cut_info['r_slice'], cut_info['phi_slice'] ] row["gasenergy_mesh"] = data_sliced else: row["gasenergy_mesh"] = gasen.data # collect row dicts and build DataFrame once rows.append(row) df_snapshots = pd.DataFrame(rows) # Detect if this is a regular grid # With structured cuts, the grid remains regular! if len(rows) > 0: first_row = rows[0] var1_test = first_row["var1_mesh"] var2_test = first_row["var2_mesh"] var3_test = first_row["var3_mesh"] is_regular, axes, axis_order = self._detect_regular_grid( var1_test, var2_test, var3_test ) self._is_regular_grid = is_regular self._grid_axes = axes self._axis_order = axis_order self._original_coords = coords else: self._is_regular_grid = False self._grid_axes = None self._axis_order = None self._original_coords = coords self.df = df_snapshots return df_snapshots
[docs] def times(self): """ Return the snapshot time table mapping snapshots to normalized time. Returns ------- pd.DataFrame DataFrame with columns 'Snapshot' and 'Normalized_time'. Raises ------ ValueError If no data has been loaded. """ if self.snapshot_time_table is None: raise ValueError("No data loaded. Run load_data() first.") return self.snapshot_time_table
[docs] def create_mesh( self, slice=None, nr=50, ntheta=50, nphi=50, nz=50 ): """ Create a mesh grid based on the slice definition provided by the user. If no slice is provided, create a full 3D mesh within the simulation domain. Supports both spherical and cylindrical coordinate systems. Parameters ---------- slice : str, optional The slice definition string (e.g., "r=[0.8,1.2],phi=0,theta=[0 deg,90 deg]" for spherical or "r=[0.8,1.2],phi=0,z=[0,0.5]" for cylindrical). nr : int Number of divisions in r. ntheta : int Number of divisions in theta (spherical only). nphi : int Number of divisions in phi. nz : int Number of divisions in z (cylindrical only). Returns ------- tuple Mesh grid (x, y, z) based on the slice definition or the full domain. """ import numpy as np import re # Get coordinate system coords = self.sim.vars.COORDINATES if hasattr(self.sim, 'vars') else 'spherical' # If no slice is provided, create a full 3D mesh using the simulation domains if not slice: if coords == 'spherical': r = np.linspace(self.sim.domains.r.min(), self.sim.domains.r.max(), nr) theta = np.linspace(self.sim.domains.theta.min(), self.sim.domains.theta.max(), ntheta) phi = np.linspace(self.sim.domains.phi.min(), self.sim.domains.phi.max(), nphi) theta_grid, r_grid, phi_grid = np.meshgrid(theta, r, phi, indexing='ij') x = r_grid * np.sin(theta_grid) * np.cos(phi_grid) y = r_grid * np.sin(theta_grid) * np.sin(phi_grid) z = r_grid * np.cos(theta_grid) return x, y, z elif coords == 'cylindrical': r = np.linspace(self.sim.domains.r.min(), self.sim.domains.r.max(), nr) z = np.linspace(self.sim.domains.z.min(), self.sim.domains.z.max(), nz) phi = np.linspace(self.sim.domains.phi.min(), self.sim.domains.phi.max(), nphi) z_grid, r_grid, phi_grid = np.meshgrid(z, r, phi, indexing='ij') x = r_grid * np.cos(phi_grid) y = r_grid * np.sin(phi_grid) return x, y, z_grid # Initialize default ranges based on coordinate system r_range = [self.sim.domains.r.min(), self.sim.domains.r.max()] phi_range = [self.sim.domains.phi.min(), self.sim.domains.phi.max()] if coords == 'spherical': theta_range = [self.sim.domains.theta.min(), self.sim.domains.theta.max()] z_range = None else: # cylindrical theta_range = None z_range = [self.sim.domains.z.min(), self.sim.domains.z.max()] # Regular expressions to extract parameters range_pattern = re.compile(r"(\w+)=\[(.+?)\]") # Matches ranges like r=[0.8,1.2] value_pattern = re.compile(r"(\w+)=([-\d.]+)") # Matches single values like phi=0 or z=0 degree_pattern = re.compile(r"([-\d.]+) deg") # Matches angles in degrees like -25 deg # Process ranges for match in range_pattern.finditer(slice): key, values = match.groups() values = [float(degree_pattern.sub(lambda m: str(float(m.group(1)) * np.pi / 180), v.strip())) for v in values.split(',')] if key == 'r': r_range = values elif key == 'phi': phi_range = values elif key == 'theta': theta_range = values if coords == 'spherical' else theta_range elif key == 'z': z_range = values if coords == 'cylindrical' else z_range # Process single values z_value = None theta_value = None for match in value_pattern.finditer(slice): key, value = match.groups() value = float(degree_pattern.sub(lambda m: str(float(m.group(1)) * np.pi / 180), value)) if key == 'z': z_value = value if coords == 'cylindrical': z_range = [value, value] elif key == 'phi': phi_range = [value, value] elif key == 'theta': theta_value = value if coords == 'spherical': theta_range = [value, value] # SPHERICAL COORDINATES if coords == 'spherical': # 3D mesh: all ranges are intervals if (phi_range[0] != phi_range[1]) and (theta_range[0] != theta_range[1]): r = np.linspace(r_range[0], r_range[1], nr) theta = np.linspace(theta_range[0], theta_range[1], ntheta) phi = np.linspace(phi_range[0], phi_range[1], nphi) theta_grid, r_grid, phi_grid = np.meshgrid(theta, r, phi, indexing='ij') x = r_grid * np.sin(theta_grid) * np.cos(phi_grid) y = r_grid * np.sin(theta_grid) * np.sin(phi_grid) z = r_grid * np.cos(theta_grid) return x, y, z # 2D mesh: one angle is fixed (slice) elif phi_range[0] == phi_range[1]: # Slice at constant phi (XZ plane) r = np.linspace(r_range[0], r_range[1], nr) theta = np.linspace(theta_range[0], theta_range[1], ntheta) phi = phi_range[0] theta_grid, r_grid = np.meshgrid(theta, r, indexing='ij') x = r_grid * np.sin(theta_grid) * np.cos(phi) y = r_grid * np.sin(theta_grid) * np.sin(phi) z = r_grid * np.cos(theta_grid) return x, y, z elif theta_range[0] == theta_range[1]: # Slice at constant theta (XY plane) r = np.linspace(r_range[0], r_range[1], nr) phi = np.linspace(phi_range[0], phi_range[1], nphi) theta = theta_range[0] phi_grid, r_grid = np.meshgrid(phi, r, indexing='ij') x = r_grid * np.sin(theta) * np.cos(phi_grid) y = r_grid * np.sin(theta) * np.sin(phi_grid) z = r_grid * np.cos(theta) return x, y, z elif z_value is not None: # Slice at constant z (XY plane in Cartesian) r = np.linspace(r_range[0], r_range[1], nr) phi = np.linspace(phi_range[0], phi_range[1], nphi) r_grid, phi_grid = np.meshgrid(r, phi, indexing='ij') x = r_grid * np.cos(phi_grid) y = r_grid * np.sin(phi_grid) z = np.full_like(x, z_value) return x, y, z else: raise ValueError("Slice definition for spherical coordinates must include either 'z', 'phi', or 'theta'.") # CYLINDRICAL COORDINATES elif coords == 'cylindrical': # 3D mesh: all ranges are intervals if (phi_range[0] != phi_range[1]) and (z_range[0] != z_range[1]): r = np.linspace(r_range[0], r_range[1], nr) z = np.linspace(z_range[0], z_range[1], nz) phi = np.linspace(phi_range[0], phi_range[1], nphi) z_grid, r_grid, phi_grid = np.meshgrid(z, r, phi, indexing='ij') x = r_grid * np.cos(phi_grid) y = r_grid * np.sin(phi_grid) return x, y, z_grid # 2D mesh: one coordinate is fixed (slice) elif phi_range[0] == phi_range[1]: # Slice at constant phi (RZ plane) r = np.linspace(r_range[0], r_range[1], nr) z = np.linspace(z_range[0], z_range[1], nz) phi = phi_range[0] z_grid, r_grid = np.meshgrid(z, r, indexing='ij') x = r_grid * np.cos(phi) y = r_grid * np.sin(phi) return x, y, z_grid elif z_range[0] == z_range[1]: # Slice at constant z (XY plane) r = np.linspace(r_range[0], r_range[1], nr) phi = np.linspace(phi_range[0], phi_range[1], nphi) z = z_range[0] phi_grid, r_grid = np.meshgrid(phi, r, indexing='ij') x = r_grid * np.cos(phi_grid) y = r_grid * np.sin(phi_grid) z_out = np.full_like(x, z) return x, y, z_out else: raise ValueError("Slice definition for cylindrical coordinates must include either 'z', 'phi', or 'r'.") else: raise ValueError(f"Unsupported coordinate system: {coords}")
def _domain_mask(self, xi, slice=None): """ Build a boolean mask that keeps only coordinates within the simulation domain and enforces any user-specified radial/angle/z limits for various slice types. Supports both spherical and cylindrical coordinate systems. """ self._cache_domain_limits() slice = slice or self.slice slice_ranges = self._slice_ranges or self._parse_slice_ranges(slice) # Get coordinate system coords = self.sim.vars.COORDINATES if hasattr(self.sim, 'vars') else 'spherical' r_bounds = slice_ranges.get('r') theta_bounds = slice_ranges.get('theta') phi_bounds = slice_ranges.get('phi') z_bounds = slice_ranges.get('z') r_min, r_max = self._domain_limits['r'] phi_min, phi_max = self._domain_limits['phi'] # Get theta or z limits depending on coordinate system if coords == 'spherical': theta_min, theta_max = self._domain_limits['theta'] z_min, z_max = None, None else: # cylindrical theta_min, theta_max = None, None z_min, z_max = self._domain_limits['z'] eps = 1e-7 xi = np.asarray(xi) ndim = xi.shape[1] if xi.ndim > 1 else 1 def _bounded(vals, bounds, default): if bounds is None: return vals >= default[0] - eps, vals <= default[1] + eps lo, hi = bounds return vals >= lo - eps, vals <= hi + eps def _phi_in_bounds(phi_vals): if phi_bounds is None: return np.ones_like(phi_vals, dtype=bool) lo, hi = phi_bounds if lo <= hi: return (phi_vals >= lo - eps) & (phi_vals <= hi + eps) return (phi_vals >= lo - eps) | (phi_vals <= hi + eps) if ndim == 2: # Handle 2D slices based on coordinate system if coords == 'spherical': # XY plane: theta fixed if slice is not None and 'theta' in slice: # XY plane: z = 0, theta fixed, filter by r and phi x, y = xi[:, 0], xi[:, 1] r = np.sqrt(x**2 + y**2) phi = np.arctan2(y, x) r_ge, r_le = _bounded(r, r_bounds, (r_min, r_max)) mask = r_ge & r_le & _phi_in_bounds(phi) return mask # XZ plane: phi fixed elif slice is not None and 'phi' in slice: # XZ plane: y = 0, phi fixed, filter by r and theta x, z = xi[:, 0], xi[:, 1] r = np.sqrt(x**2 + z**2) theta = np.arccos(z / np.clip(r, 1e-14, None)) r_ge, r_le = _bounded(r, r_bounds, (r_min, r_max)) if theta_bounds: lo, hi = theta_bounds theta_mask = (theta >= lo - eps) & (theta <= hi + eps) else: theta_mask = ( ((theta > theta_min) | np.isclose(theta, theta_min, atol=eps)) & ((theta < theta_max) | np.isclose(theta, theta_max, atol=eps)) ) return r_ge & r_le & theta_mask else: # Default: treat as XY (theta fixed) x, y = xi[:, 0], xi[:, 1] r = np.sqrt(x**2 + y**2) phi = np.arctan2(y, x) mask = (r > r_min) & (r < r_max) return mask else: # cylindrical # XY plane: z fixed if slice is not None and 'z' in slice: # XY plane: z fixed, filter by r and phi x, y = xi[:, 0], xi[:, 1] r = np.sqrt(x**2 + y**2) phi = np.arctan2(y, x) r_ge, r_le = _bounded(r, r_bounds, (r_min, r_max)) mask = r_ge & r_le & _phi_in_bounds(phi) return mask # RZ plane: phi fixed elif slice is not None and 'phi' in slice: # RZ plane: phi fixed, filter by r and z # Assuming xi[:, 0] is r-like and xi[:, 1] is z-like x, y = xi[:, 0], xi[:, 1] # Convert to cylindrical r, z r = np.sqrt(x**2 + y**2) # For phi-slice in cylindrical, we may receive (x,z) coordinates # Need to determine which is which based on the slice z_ge, z_le = _bounded(y, z_bounds, (z_min, z_max)) r_ge, r_le = _bounded(r, r_bounds, (r_min, r_max)) return r_ge & r_le & z_ge & z_le else: # Default: treat as XY (z fixed) x, y = xi[:, 0], xi[:, 1] r = np.sqrt(x**2 + y**2) phi = np.arctan2(y, x) mask = (r > r_min) & (r < r_max) return mask elif ndim == 1: # 1D input: could be r, theta, phi (spherical) or r, z, phi (cylindrical) xi_1d = np.asarray(xi).ravel() # Decide which variable is the "free" one in the 1D cut. # Prefer explicit ranges (r=[...], theta=[...], phi=[...], z=[...]); otherwise, # the free variable is the one that does NOT appear as a scalar in the slice. r_b = r_bounds th_b = theta_bounds ph_b = phi_bounds z_b = z_bounds def _is_range(b): return (b is not None) and (abs(b[1] - b[0]) > 1e-12) if _is_range(r_b): free = 'r' elif _is_range(th_b): free = 'theta' elif _is_range(ph_b): free = 'phi' elif _is_range(z_b): free = 'z' else: s = (slice or self.slice) or "" s_low = s.lower() has_r = re.search(r"\br\s*=", s_low) is not None has_th = re.search(r"\btheta\s*=", s_low) is not None has_ph = re.search(r"\bphi\s*=", s_low) is not None has_z = re.search(r"\bz\s*=", s_low) is not None # the free variable is the one not present in the slice specification if coords == 'cylindrical': if not has_r: free = 'r' elif not has_z: free = 'z' elif not has_ph: free = 'phi' else: free = 'r' else: # spherical if not has_r: free = 'r' elif not has_th: free = 'theta' elif not has_ph: free = 'phi' else: free = 'r' # Build mask depending on which variable is free if free == 'r': lo, hi = (r_b if r_b is not None else (r_min, r_max)) mask = (xi_1d >= lo - eps) & (xi_1d <= hi + eps) return mask if free == 'theta': if theta_min is not None and theta_max is not None: lo, hi = (th_b if th_b is not None else (theta_min, theta_max)) mask = (xi_1d >= lo - eps) & (xi_1d <= hi + eps) return mask else: return np.ones_like(xi_1d, dtype=bool) if free == 'z': if z_min is not None and z_max is not None: lo, hi = (z_b if z_b is not None else (z_min, z_max)) mask = (xi_1d >= lo - eps) & (xi_1d <= hi + eps) return mask else: return np.ones_like(xi_1d, dtype=bool) # free == 'phi' lo, hi = (ph_b if ph_b is not None else (phi_min, phi_max)) if lo <= hi: mask = (xi_1d >= lo - eps) & (xi_1d <= hi + eps) else: # wrap-around range (e.g. [5.5, 0.5] in radians) mask = (xi_1d >= lo - eps) | (xi_1d <= hi + eps) return mask if ndim==3: mask = np.ones(xi.shape[0],dtype=bool) return mask
[docs] def evaluate( self, var1, snapshot=None, time=None, var2=None, var3=None, dataframe=None, interpolator="griddata", method="linear", rbf_kwargs=None, griddata_kwargs=None, idw_kwargs=None, sigma_smooth=None, field=None, reflect=False, coords=None ): """ Evaluate the selected field at arbitrary spatial coordinates using multi-snapshot interpolation. Supports scalar and vector fields, 1D/2D/3D geometry, time interpolation, and several interpolation backends. Designed for unified DataFrames (gasdens + gasv + others). Parameters ---------- time : float or int Normalized time in [0,1] or snapshot index. var1, var2, var3 : array-like or float Evaluation coordinates. For 3D: typically (x, y, z) if coords='cartesian', (phi, r, theta) if coords='spherical', or (phi, r, z) if coords='cylindrical'. The coordinate system is determined by the 'coords' parameter. Scalars are accepted. dataframe : pandas.DataFrame, optional Custom DataFrame. If omitted, self.df is used. interpolator : {"griddata", "rbf", "linearnd", "idw", "regular_grid"}, default "griddata" Backend used for spatial interpolation: - "griddata": scipy.interpolate.griddata (default, works for irregular grids) - "rbf": Radial Basis Function interpolation - "linearnd": LinearNDInterpolator - "idw": Inverse Distance Weighting - "regular_grid": scipy.interpolate.RegularGridInterpolator (automatic for 3D regular grids, fastest option when applicable) Note: For 3D data with regular grid structure, RegularGridInterpolator is automatically used by default regardless of the interpolator setting, with automatic fallback to griddata if the grid is irregular. method : str, default "linear" Kernel/method used by backend (e.g., 'linear', 'cubic' for griddata). sigma_smooth : float or None Optional Gaussian smoothing parameter. field : {"gasdens", "gasv", "gasenergy"} or None Field to evaluate. If None and DF has exactly one field, it is auto-selected. If multiple fields exist, explicit selection is required. reflect : bool, default False If True, enable symmetry-based reflection for interpolation. When interpolating at points outside the loaded domain, the method attempts to evaluate at the corresponding reflected point and uses that value. The reflection is performed in the coordinate system specified by the 'coords' parameter used in load_field(): - Cartesian: reflects across z=0 (z -> -z) - Spherical: reflects across equatorial plane (theta -> π - theta) - Cylindrical: reflects across z=0 (z -> -z) This is useful for enforcing symmetry in simulations that are symmetric about the equatorial/midplane. Works efficiently with RegularGridInterpolator by only reflecting points that fall outside the domain (NaN values). coords : {"cartesian", "spherical", "cylindrical", None}, default None Target coordinate system for the evaluation points (var1, var2, var3). If None (default), uses the original coordinate system from load_data. - "cartesian": var1=x, var2=y, var3=z - "spherical": var1=phi, var2=r, var3=theta - "cylindrical": var1=phi, var2=r, var3=z The method automatically converts between coordinate systems as needed. This allows querying data in any coordinate system regardless of how it was originally loaded. Returns ------- ndarray or float Interpolated value(s). Vector fields return shape (3, N) or (3, ...). Examples -------- Evaluate density at Cartesian coordinates: >>> loader.evaluate(var1=x_points, var2=y_points, var3=z_points, ... field='gasdens', time=0.5, coords='cartesian') Evaluate in spherical coordinates: >>> loader.evaluate(var1=phi_vals, var2=r_vals, var3=theta_vals, ... field='gasdens', time=0.5, coords='spherical') Notes ----- For 3D regular grids (uniform spacing in each dimension), the method automatically uses RegularGridInterpolator for optimal performance, regardless of the 'interpolator' parameter. This provides significant speedup (often 10-100x) compared to griddata for large datasets. The coordinate transformation feature allows seamless querying in different coordinate systems. The original simulation data coordinate system is preserved internally, and transformations are applied automatically. """ # =============================================================== # Time or snap # =============================================================== if time is None: if snapshot is None: raise ValueError("Must provide either 'time' or 'snapshot'.") time = snapshot # =============================================================== # Coordinate system handling # =============================================================== # Determine target coordinate system (default: original from load_data) if coords is None: coords_target = self._original_coords if self._original_coords else 'cartesian' else: coords_target = coords # Store original coordinate system from data coords_original = self._original_coords if self._original_coords else 'cartesian' # =============================================================== # Basic validation # =============================================================== if sigma_smooth is not None and sigma_smooth <= 0: raise ValueError("sigma_smooth must be None or positive.") df = dataframe if dataframe is not None else self.df if df is None: raise ValueError("No dataframe available.") # =============================================================== # FIELD SELECTION (safe and explicit) # =============================================================== field_map = { "gasdens": "gasdens_mesh", "gasv": "gasv_mesh", "gasenergy": "gasenergy_mesh" } if field is not None: if field in field_map: field = field_map[field] if field not in df.columns: raise ValueError( f"Field '{field}' not in DF. Available: {list(df.columns)}" ) else: # Autodetect only if exactly one exists candidates = [ c for c in df.columns if c in ("gasdens_mesh","gasv_mesh","gasenergy_mesh") ] if len(candidates) != 1: raise ValueError( f"Multiple fields present {candidates}. " "Specify field='gasdens', 'gasv', or 'gasenergy'." ) field = candidates[0] # =============================================================== # Prepare snapshot ordering # =============================================================== df_sorted = self._get_sorted_dataframe(df) times = df_sorted["time"].values nsnaps = len(times) # Detect scalar inputs is_scalar = ( np.isscalar(var1) and (var2 is None or np.isscalar(var2)) and (var3 is None or np.isscalar(var3)) ) result_shape = () if is_scalar else np.asarray(var1).shape if np.isscalar(var1): var1 = np.array([var1]) if np.isscalar(var2): var2 = np.array([var2]) if np.isscalar(var3): var3 = np.array([var3]) # Convenience: allow calling evaluate(var1=..., var2=...) for # 2D XZ or RZ slices where the expected coordinates are (var1,var3). # If the slice is XZ/RZ (not XY) and user passed var2 but left var3=None, # treat var2 as var3. try: slice_type_tmp = self._slice_type or self._detect_slice_type(self.slice) coords_sys = self.sim.vars.COORDINATES if hasattr(self.sim, 'vars') else 'spherical' except Exception: slice_type_tmp = None coords_sys = 'spherical' # Determine if this is an XY plane or XZ/RZ plane is_xy_plane = False if coords_sys == 'spherical': is_xy_plane = (slice_type_tmp == 'theta') else: # cylindrical is_xy_plane = (slice_type_tmp == 'z') if self.dim == 2 and slice_type_tmp is not None and not is_xy_plane: if var3 is None and var2 is not None: var3 = var2 var2 = None # =============================================================== # Smoothing helper # =============================================================== def _smooth(values): if sigma_smooth is None or np.isscalar(values): return values arr = np.asarray(values) if arr.ndim == 0: return values # Vector smoothing if field == "gasv_mesh" and arr.ndim >= 2: out = np.empty_like(arr) for k in range(arr.shape[0]): out[k] = gaussian_filter(arr[k], sigma=sigma_smooth) return out return gaussian_filter(arr, sigma=sigma_smooth) # =============================================================== # Coordinate transformation helpers # =============================================================== def transform_coords(var1_in, var2_in, var3_in, from_system, to_system): """Transform coordinates between cartesian, spherical, and cylindrical systems.""" if from_system == to_system: return var1_in, var2_in, var3_in # Convert to arrays v1 = np.asarray(var1_in) v2 = np.asarray(var2_in) if var2_in is not None else np.zeros_like(v1) v3 = np.asarray(var3_in) if var3_in is not None else np.zeros_like(v1) # First convert to Cartesian (common intermediate) if from_system == 'cartesian': x, y, z = v1, v2, v3 elif from_system == 'spherical': # spherical: var1=phi, var2=r, var3=theta phi, r, theta = v1, v2, v3 x = r * np.sin(theta) * np.cos(phi) y = r * np.sin(theta) * np.sin(phi) z = r * np.cos(theta) elif from_system == 'cylindrical': # cylindrical: var1=phi, var2=r, var3=z phi, r_cyl, z = v1, v2, v3 x = r_cyl * np.cos(phi) y = r_cyl * np.sin(phi) else: raise ValueError(f"Unknown coordinate system: {from_system}") # Convert from Cartesian to target system if to_system == 'cartesian': return x, y, z elif to_system == 'spherical': r = np.sqrt(x**2 + y**2 + z**2) theta = np.arccos(np.clip(z / (r + 1e-30), -1, 1)) phi = np.arctan2(y, x) return phi, r, theta elif to_system == 'cylindrical': r_cyl = np.sqrt(x**2 + y**2) phi = np.arctan2(y, x) return phi, r_cyl, z else: raise ValueError(f"Unknown coordinate system: {to_system}") # =============================================================== # Interpolation backends # =============================================================== def regular_grid_interp(axes, values, xi, axis_order): """ Fast interpolation for regular grids using RegularGridInterpolator. Parameters ---------- axes : tuple of 1D arrays Coordinate axes (var1_axis, var2_axis, var3_axis). values : ndarray Data values on the regular grid. xi : ndarray Query points (N, 3). axis_order : tuple of int Dimension order (var1_dim, var2_dim, var3_dim). Returns ------- ndarray Interpolated values at query points. """ var1_axis, var2_axis, var3_axis = axes var1_dim, var2_dim, var3_dim = axis_order n_query = xi.shape[0] # Create axes in the order matching the data array dimensions axes_ordered = [None, None, None] axes_ordered[var1_dim] = var1_axis axes_ordered[var2_dim] = var2_axis axes_ordered[var3_dim] = var3_axis # Handle reversed axes (decreasing instead of increasing) flips = [False, False, False] for i, ax in enumerate(axes_ordered): if len(ax) > 1 and np.diff(ax)[0] < 0: axes_ordered[i] = np.flip(ax) flips[i] = True # Flip data array accordingly values_fixed = values.copy() for i, flip in enumerate(flips): if flip: values_fixed = np.flip(values_fixed, axis=i) # Create interpolator try: interp_obj = RegularGridInterpolator( tuple(axes_ordered), values_fixed, method=method if method in ['linear', 'nearest'] else 'linear', bounds_error=False, fill_value=np.nan ) except Exception as e: # Fallback to None if creation fails return None # Prepare query points: xi has columns [var1, var2, var3] # We need to reorder them to [dim0_coord, dim1_coord, dim2_coord] xi_ordered_list = [None, None, None] xi_ordered_list[var1_dim] = xi[:, 0] xi_ordered_list[var2_dim] = xi[:, 1] xi_ordered_list[var3_dim] = xi[:, 2] xi_ordered = np.column_stack(xi_ordered_list) # Interpolate with automatic chunking for large datasets chunk_size = 5_000_000 if n_query > chunk_size: # Use chunking for large queries to avoid memory issues result = np.empty(n_query, dtype=np.float64) n_chunks = int(np.ceil(n_query / chunk_size)) for i in range(n_chunks): start_idx = i * chunk_size end_idx = min((i + 1) * chunk_size, n_query) chunk_xi = xi_ordered[start_idx:end_idx] try: result[start_idx:end_idx] = interp_obj(chunk_xi) except Exception as e: return None return result else: # Small query - no chunking needed try: return interp_obj(xi_ordered) except Exception as e: return None def idw_interp(coords, values, xi): coords = np.asarray(coords) values = np.asarray(values).ravel() xi = np.asarray(xi) mask = self._domain_mask(xi) if reflect: mask = np.ones(xi.shape[0], dtype=bool) out = np.zeros(xi.shape[0]) tree = cKDTree(coords) k = idw_kwargs.get("k", 8) power = idw_kwargs.get("power", 2) # If mask selects points, compute only there. Otherwise try for all xi. if np.any(mask): d, idxs = tree.query(xi[mask], k=k) d = np.where(d == 0, 1e-10, d) w = 1 / d**power w /= w.sum(axis=1, keepdims=True) out[mask] = np.sum(values[idxs] * w, axis=1) return out # Fallback: compute for all xi d, idxs = tree.query(xi, k=k) d = np.where(d == 0, 1e-10, d) w = 1 / d**power w /= w.sum(axis=1, keepdims=True) out = np.sum(values[idxs] * w, axis=1) return out def rbf_interp(coords, values, xi): coords = np.asarray(coords) values = np.asarray(values).ravel() xi = np.asarray(xi) mask = self._domain_mask(xi) if reflect: mask = np.ones(xi.shape[0], dtype=bool) out = np.full(xi.shape[0], np.nan) # Try interpolate where mask True try: obj = RBFInterpolator(coords, values, kernel=method, **(rbf_kwargs or {})) except Exception: return np.zeros(xi.shape[0]) if np.any(mask): out[mask] = obj(xi[mask]) # attempt to leave other positions as NaN return np.where(np.isfinite(out), out, np.nan) # Fallback: evaluate on all xi vals_all = obj(xi) return np.where(np.isfinite(vals_all), vals_all, np.nan) def griddata_interp(coords, values, xi): # --- Apply domain mask: only interpolate inside the simulation domain --- mask = self._domain_mask(xi) if reflect: mask = np.ones(xi.shape[0], dtype=bool) out = np.full(xi.shape[0], np.nan) # If mask has selected points, interpolate only there if np.any(mask): out[mask] = griddata(coords, values.ravel(), xi[mask], method=method) # leave outside as NaN -> caller can mask later return np.where(np.isfinite(out), out, np.nan) # Fallback: try interpolate for all xi (useful when domain mask selection fails) try: vals_all = griddata(coords, values.ravel(), xi, method=method) return np.where(np.isfinite(vals_all), vals_all, np.nan) except Exception: return np.zeros(xi.shape[0]) def linearnd_interp(coords, values, xi): coords = np.asarray(coords) values = np.asarray(values).ravel() xi = np.asarray(xi) mask = self._domain_mask(xi) if reflect: mask = np.ones(xi.shape[0], dtype=bool) out = np.full(xi.shape[0], np.nan) obj = LinearNDInterpolator(coords, values) if np.any(mask): out[mask] = obj(xi[mask]) return np.where(np.isfinite(out), out, np.nan) # Fallback: evaluate on all xi vals_all = obj(xi) return np.where(np.isfinite(vals_all), vals_all, np.zeros_like(vals_all)) # =============================================================== # Main interpolation kernel # =============================================================== slice_type = self._slice_type or self._detect_slice_type(self.slice) def interp(idx, field_name, comp=None): row = df_sorted.iloc[idx] cx = np.array(row["var1_mesh"]) cy = np.array(row["var2_mesh"]) cz = np.array(row["var3_mesh"]) # ========================================================= # COORDINATE TRANSFORMATION (SIMPLIFIED) # ========================================================= # Only apply transformation if explicitly requested and different from original # Default: use coordinates as-is (no transformation) if (self.dim == 3 and coords is not None and coords_target != coords_original and var1 is not None): # User explicitly requested different coordinate system try: var1_eval = var1 var2_eval = var2 if var2 is not None else np.zeros_like(var1) var3_eval = var3 if var3 is not None else np.zeros_like(var1) # Transform coordinates var1_transformed, var2_transformed, var3_transformed = transform_coords( var1_eval, var2_eval, var3_eval, from_system=coords_target, to_system=coords_original ) var1_use = var1_transformed var2_use = var2_transformed var3_use = var3_transformed except Exception as e: # If transformation fails, use original coordinates var1_use = var1 var2_use = var2 var3_use = var3 else: # No transformation needed - use original coordinates var1_use = var1 var2_use = var2 var3_use = var3 # Build coordinate arrays if self.dim == 3: points_array = np.column_stack((cx.ravel(), cy.ravel(), cz.ravel())) xi = np.column_stack((var1_use.ravel(), var2_use.ravel(), var3_use.ravel())) # ========================================================= # TRY REGULAR GRID INTERPOLATOR FOR 3D (FAST PATH) # ========================================================= # Use RegularGridInterpolator if grid is regular (much faster) # With reflect=True, we still use RegularGridInterpolator but handle # reflection by evaluating reflected points when primary evaluation fails use_regular_grid = (self._is_regular_grid and self._grid_axes is not None and self._axis_order is not None) if use_regular_grid: # Select field data data_3d = row[field_name] # Handle vector component selection before interpolation if isinstance(data_3d, np.ndarray) and comp is not None: if data_3d.ndim == 4 and data_3d.shape[0] == 3: data_3d = data_3d[comp] elif data_3d.ndim == 2 and data_3d.shape[0] == 3: # Might be already flattened (for vector in masked case) expected_size = cx.size if data_3d.shape[1] == expected_size: data_3d = data_3d[comp].reshape(cx.shape) # Try regular grid interpolation if isinstance(data_3d, np.ndarray) and data_3d.ndim == 3: result_rg = regular_grid_interp( self._grid_axes, data_3d, xi, self._axis_order ) if result_rg is not None: # Success with regular grid interpolator! # Handle reflection if requested if reflect: # Strategy: For all points, attempt reflection to fill gaps # Priority: use primary value if available, otherwise use reflected value # Special case: for points very close to symmetry plane, try both # Calculate ALL reflected coordinates xi_reflected = xi.copy() # Determine symmetry plane location and tolerance if coords_original == 'spherical': # Symmetry plane is at theta = π/2 (equatorial plane) # Reflect theta: theta -> π - theta (index 2) xi_reflected[:, 2] = np.pi - xi_reflected[:, 2] symmetry_coord = xi[:, 2] # theta values symmetry_plane = np.pi / 2 # Tolerance: consider "near symmetry plane" as within ~2 grid spacings if self._grid_axes is not None and len(self._grid_axes) > 2: coord_axis = self._grid_axes[2] if self._axis_order[2] == 2 else self._grid_axes[self._axis_order.index(2)] if len(coord_axis) > 1: grid_spacing = np.min(np.abs(np.diff(coord_axis))) tolerance = 2.0 * grid_spacing else: tolerance = 0.05 # fallback ~3 degrees else: tolerance = 0.05 elif coords_original == 'cylindrical': # Symmetry plane is at z = 0 (midplane) # Reflect z: z -> -z (index 2) xi_reflected[:, 2] *= -1 symmetry_coord = xi[:, 2] # z values symmetry_plane = 0.0 if self._grid_axes is not None and len(self._grid_axes) > 2: coord_axis = self._grid_axes[2] if self._axis_order[2] == 2 else self._grid_axes[self._axis_order.index(2)] if len(coord_axis) > 1: grid_spacing = np.min(np.abs(np.diff(coord_axis))) tolerance = 2.0 * grid_spacing else: tolerance = 0.1 else: tolerance = 0.1 else: # cartesian # Symmetry plane is at z = 0 (midplane) # Reflect z: z -> -z (index 2) xi_reflected[:, 2] *= -1 symmetry_coord = xi[:, 2] # z values symmetry_plane = 0.0 tolerance = 0.1 # Identify points near symmetry plane distance_to_plane = np.abs(symmetry_coord - symmetry_plane) near_plane_mask = distance_to_plane < tolerance # Interpolate at ALL reflected points result_reflected = regular_grid_interp( self._grid_axes, data_3d, xi_reflected, self._axis_order ) if result_reflected is not None: # Apply sign flip for perpendicular component if field_name == 'gasv_mesh' and comp == 2: result_reflected = result_reflected * (-1) # Identify NaN values in primary interpolation nan_mask = np.isnan(result_rg) # Strategy 1: Fill NaN values with reflected values valid_reflected = ~np.isnan(result_reflected) fill_mask = nan_mask & valid_reflected result_rg[fill_mask] = result_reflected[fill_mask] # Strategy 2: For points near symmetry plane with both values available, # use average to smooth transition (optional, more robust) both_valid = (~nan_mask) & valid_reflected & near_plane_mask if np.any(both_valid): # Average primary and reflected values for points very close to plane result_rg[both_valid] = 0.5 * (result_rg[both_valid] + result_reflected[both_valid]) # Strategy 3: Fill remaining NaN values in the gap at symmetry plane # This handles cases where data doesn't quite reach the symmetry plane # (e.g., FARGO3D data ending at theta=89.92° instead of 90°) still_nan = np.isnan(result_rg) if np.any(still_nan): # Only fill points very close to symmetry plane (in the gap) gap_points = still_nan & (distance_to_plane < tolerance) if np.any(gap_points): # Use nearest neighbor interpolation ONLY for gap points # This is much better than global nearest: it's a smart local fill from scipy.spatial import cKDTree # Get all valid points (original or reflected) all_valid = ~still_nan if np.any(all_valid): # Build KDTree from valid points valid_coords = xi[all_valid] valid_values = result_rg[all_valid] tree = cKDTree(valid_coords) # Find nearest valid point for each gap point gap_coords = xi[gap_points] distances, indices = tree.query(gap_coords, k=1) # Only fill if nearest point is very close (within tolerance) # This prevents filling with distant values close_enough = distances < tolerance gap_indices = np.where(gap_points)[0] fill_indices = gap_indices[close_enough] if len(fill_indices) > 0: result_rg[fill_indices] = valid_values[indices[close_enough]] return result_rg elif self.dim == 2: # Detect coordinate system coords_sys = self.sim.vars.COORDINATES if hasattr(self.sim, 'vars') else 'spherical' # Determine which plane we're working with # Spherical: theta=const → XY plane, phi=const → XZ plane # Cylindrical: z=const → XY plane, phi=const → RZ plane if coords_sys == 'spherical': if slice_type == "theta": points_array = np.column_stack((cx.ravel(), cy.ravel())) xi = np.column_stack((var1_use.ravel(), var2_use.ravel())) else: # phi or r points_array = np.column_stack((cx.ravel(), cz.ravel())) xi = np.column_stack((var1_use.ravel(), var3_use.ravel())) else: # cylindrical if slice_type == "z": points_array = np.column_stack((cx.ravel(), cy.ravel())) xi = np.column_stack((var1_use.ravel(), var2_use.ravel())) else: # phi or r points_array = np.column_stack((cx.ravel(), cz.ravel())) xi = np.column_stack((var1_use.ravel(), var3_use.ravel())) elif self.dim == 1: points_array = np.sqrt(cx**2 + cy**2 + cz**2) xi = np.asarray(var1_use) # Select field data = row[field_name] # ------------------------------------------- # UNIVERSAL VECTOR COMPONENT SELECTOR # ------------------------------------------- if isinstance(data, np.ndarray) and comp is not None: if data.ndim == 2 and data.shape[0] == 3: data = data[comp] elif data.ndim == 2 and data.shape[1] == 3: data = data[:, comp] elif data.ndim == 3 and data.shape[0] == 3: data = data[comp].ravel() elif data.ndim == 4 and data.shape[0] == 3: data = data[comp].ravel() elif data.ndim == 4 and data.shape[-1] == 3: data = data[..., comp].ravel() else: raise ValueError(f"Cannot extract vector component from {data.shape}") # ------------------------------------------------- # Reflection augmentation # If `reflect=True` we augment the interpolation dataset # by reflecting across the equatorial plane. # # The reflection is done in the ORIGINAL coordinate system # specified by _original_coords (from load_field): # - Cartesian: z -> -z (reflection across z=0) # - Spherical: theta -> π - theta (reflection across equatorial plane) # - Cylindrical: z -> -z (reflection across z=0) # # For vector components, the component perpendicular to the # reflection plane changes sign. # ------------------------------------------------- if reflect: try: # Normalize coords to shape (N, ndim) coords_arr = np.asarray(points_array) ndim = coords_arr.shape[1] if coords_arr.ndim == 2 else 1 coords_orig = coords_arr.reshape(-1, ndim) coords_ref = coords_orig.copy() # Determine which coordinate to reflect based on original system original_system = coords_original # Set earlier in evaluate() if original_system == 'spherical': # For spherical coords (phi, r, theta): # Reflect across equatorial plane: theta -> π - theta if coords_orig.shape[1] == 3: # Index 2 is theta coords_ref[:, 2] = np.pi - coords_ref[:, 2] elif coords_orig.shape[1] == 2: # For 2D slices, check if it's an XZ-like plane # If slice_type is 'phi' (meridional plane), reflect theta if slice_type == 'phi': coords_ref[:, 1] = np.pi - coords_ref[:, 1] # theta coordinate elif original_system == 'cylindrical': # For cylindrical coords (phi, r, z): # Reflect across z=0: z -> -z if coords_orig.shape[1] == 3: # Index 2 is z coords_ref[:, 2] *= -1 elif coords_orig.shape[1] == 2: # For 2D slices, if it's RZ plane (phi slice), flip z if slice_type == 'phi': coords_ref[:, 1] *= -1 # z coordinate else: # cartesian (default) # For cartesian coords (x, y, z): # Reflect across z=0: z -> -z if coords_orig.shape[1] == 3: coords_ref[:, 2] *= -1 elif coords_orig.shape[1] == 2: # Assume (x,z) layout for XZ cuts coords_ref[:, 1] *= -1 # Prepare data values (flattened) data_flat = np.asarray(data).ravel() # For vector components, reflect sign for the component # perpendicular to the reflection plane if field_name == 'gasv_mesh' and comp is not None: # The perpendicular component depends on the coordinate system: # - Cartesian: vz (comp=2) # - Spherical: v_theta (comp=2) # - Cylindrical: vz (comp=2) # In all cases, comp=2 is the perpendicular component if comp == 2: data_ref = -data_flat else: data_ref = data_flat.copy() else: # Scalars or already selected components data_ref = data_flat.copy() # Augment coords and data for interpolation points_array = np.vstack([coords_orig, coords_ref]) data = np.concatenate([data_flat, data_ref]) except Exception: # On error, fallback to original coords/data points_array = np.asarray(points_array) data = np.asarray(data) # Dispatch backend if interpolator == "rbf": return rbf_interp(points_array, data, xi) elif interpolator == "linearnd": return linearnd_interp(points_array, data, xi) elif interpolator == "idw": return idw_interp(points_array, data, xi) else: return griddata_interp(points_array, data, xi) # =============================================================== # TIME INTERPOLATION # =============================================================== if nsnaps == 1: if field == "gasv_mesh": vals = [interp(0, field, c) for c in range(3)] arr = np.array([v.item() if is_scalar else v.reshape(result_shape) for v in vals]) return _smooth(arr) v = interp(0, field) return _smooth(v.item() if is_scalar else v.reshape(result_shape)) # Two snapshots if nsnaps == 2: i0, i1 = 0, 1 t0, t1 = times[i0], times[i1] fac = (time - t0) / (t1 - t0) if abs(t1 - t0) > 1e-12 else 0 fac = np.clip(fac, 0, 1) def blend(c=None): v0 = interp(i0, field, c) v1 = interp(i1, field, c) return (1 - fac) * v0 + fac * v1 if field == "gasv_mesh": vals = [blend(c) for c in range(3)] arr = np.array([v.item() if is_scalar else v.reshape(result_shape) for v in vals]) return _smooth(arr) v = blend() return _smooth(v.item() if is_scalar else v.reshape(result_shape)) # Many snapshots i0 = np.searchsorted(times, time) - 1 i0 = np.clip(i0, 0, nsnaps - 2) i1 = i0 + 1 t0, t1 = times[i0], times[i1] fac = (time - t0) / (t1 - t0) if abs(t1 - t0) > 1e-12 else 0 fac = np.clip(fac, 0, 1) def blend(c=None): v0 = interp(i0, field, c) v1 = interp(i1, field, c) return (1 - fac) * v0 + fac * v1 if field == "gasv_mesh": vals = [blend(c) for c in range(3)] arr = np.array([v.item() if is_scalar else v.reshape(result_shape) for v in vals]) return _smooth(arr) v = blend() return _smooth(v.item() if is_scalar else v.reshape(result_shape))
[docs] def plot(self, t=0, contour_levels=10, component='vz', smoothing_sigma=None, field=None): """ Automatically determines the plane (XY, XZ, or 3D) and plots the field data. Parameters ---------- t : int, optional Index of the snapshot/time to plot. contour_levels : int, optional Number of contour levels for 2D plots. component : str, optional Component to plot for vector fields ('vx', 'vy', 'vz'). field : str or None, optional Which field to plot when multiple fields were loaded (e.g. 'gasdens', 'gasv', 'gasenergy'). If None and exactly one field is present, that field is plotted. If None and multiple candidate fields exist, a ValueError is raised instructing the user to pick one. """ if self.df is None: raise ValueError("No data loaded. Run load_field() first.") if component=='vz': comp = 2 if component=='vy': comp = 1 if component=='vx': comp = 0 df_names = self.df.columns.tolist() # Map short names to dataframe column names field_map = { 'gasdens': 'gasdens_mesh', 'gasv': 'gasv_mesh', 'gasenergy': 'gasenergy_mesh' } # Detect candidate fields present in the DataFrame candidates = [c for c in df_names if c in ('gasdens_mesh', 'gasv_mesh', 'gasenergy_mesh')] # Resolve user-requested field if field is not None: # allow short names or full column names if field in field_map: field_col = field_map[field] else: field_col = field if field_col not in df_names: raise ValueError(f"Requested field '{field}' not present. Available: {candidates}") else: if len(candidates) == 1: field_col = candidates[0] else: raise ValueError( f"Multiple fields present {candidates}. Specify which to plot using field='gasdens' or 'gasv'." ) # Extract the mesh grids and field data after slicing var1 = self.df['var1_mesh'][t] var2 = self.df['var2_mesh'][t] var3 = self.df['var3_mesh'][t] # Load the original field (before slicing) if needed elsewhere d3 = self.sim._load_field_raw('gasdens', snapshot=int(self.df['snapshot'][t]), field_type='scalar') # Prepare field_data according to selected field raw_field = self.df[field_col][t] is_vector = (field_col == 'gasv_mesh') if is_vector: # choose component or compute magnitude if needed data_arr = np.asarray(raw_field) # handle common memory layouts: (3, ... ) or (..., 3) if data_arr.ndim >= 1 and data_arr.shape[0] == 3: # shape (3, N...) -> select component field_data = data_arr[comp] elif data_arr.ndim >= 1 and data_arr.shape[-1] == 3: field_data = data_arr[..., comp] else: # fallback: try to interpret as magnitude try: field_data = np.sqrt(np.sum(data_arr**2, axis=0)) except Exception: field_data = data_arr # do not apply log to vector components else: # scalar field: apply safe log10 for plotting field_data = np.asarray(raw_field) # avoid log of non-positive numbers with np.errstate(divide='ignore'): field_data = np.log10(np.where(field_data <= 0, np.nan, field_data)) # Get the shapes of the resulting mesh grids after applying the slice sliced_shape = var1.shape # Assuming var1, var2, var3 have the same shape after slicing # Detect fixed angles in slice string slice_str = self.slice if hasattr(self, 'slice') and self.slice is not None else "" # Fixed theta: e.g. 'theta=1.56' (not theta=[...]) fixed_theta = re.search(r'theta\s*=\s*([^\[\],]+)', slice_str) fixed_phi = re.search(r'phi\s*=\s*([^\[\],]+)', slice_str) if fixed_theta: plane = 'XY' elif fixed_phi: plane = 'XZ' else: plane = 'XY' # Default/fallback # Check the number of dimensions in the sliced shape if len(sliced_shape) == 3: var1_flat = var1.ravel() var2_flat = var2.ravel() var3_flat = var3.ravel() data = field_data.ravel() fig = plt.figure(figsize=(8, 6)) ax = fig.add_subplot(111, projection='3d') scatter = ax.scatter(var1_flat, var2_flat, var3_flat, c=data, cmap='Spectral_r', s=5) cbar = fig.colorbar(scatter, ax=ax) cbar.ax.tick_params(labelsize=12) if is_vector: cbar.set_label(rf"{component} [units]", size=14) else: cbar.set_label(rf"$\log_{{10}}(field)$", size=14) ax.set_xlabel("X",size=14) ax.set_ylabel("Y",size=14) ax.set_zlabel("Z",size=14) fp.Plot.fargopy_mark(ax) plt.show() elif len(sliced_shape) == 2: # Optional smoothing to remove interpolation artefacts (triangular edges) if smoothing_sigma is not None: try: field_data = gaussian_filter(field_data, sigma=smoothing_sigma) except Exception: # If smoothing fails, fall back to original data field_data = field_data if plane == 'XY': fig, ax = plt.subplots(figsize=(8, 6)) mesh = ax.pcolormesh(var1, var2, field_data, shading='auto', cmap='Spectral_r') cbar = fig.colorbar(mesh) cbar.ax.tick_params(labelsize=12) if is_vector: cbar.set_label(rf"{component} [units]", size=14) else: cbar.set_label(rf"$\log_{{10}}(field)$", size=14) ax.set_xlabel("X",size=14) ax.set_ylabel("Y",size=14) ax.tick_params(axis='both', which='major', labelsize=12) fp.Plot.fargopy_mark(ax) plt.show() elif plane == 'XZ': fig, ax = plt.subplots(figsize=(8, 6)) mesh = ax.pcolormesh(var1, var3, field_data, shading='auto', cmap='Spectral_r') cbar = fig.colorbar(mesh) cbar.ax.tick_params(labelsize=12) if is_vector: cbar.set_label(rf"{component} [units]", size=14) else: cbar.set_label(rf"$\log_{{10}}(field)$", size=14) ax.set_xlabel("X",size=14) ax.set_ylabel("Z",size=14) ax.tick_params(axis='both', which='major', labelsize=12) fp.Plot.fargopy_mark(ax) plt.show() else: fig, ax = plt.subplots(figsize=(8, 6)) mesh = ax.pcolormesh(var1, var2, field_data, shading='auto', cmap='Spectral_r') cbar = fig.colorbar(mesh) cbar.ax.tick_params(labelsize=12) cbar.set_label(rf"$\log_{{10}}(field)$", size=14) ax.set_xlabel("X",size=14) ax.set_ylabel("Y",size=14) ax.tick_params(axis='both', which='major', labelsize=12) fp.Plot.fargopy_mark(ax) plt.show()
[docs] def cut_sector(self, xc, yc, zc, rc, hc, dataframe=None): """ Filter the DataFrame to keep only data inside a cylinder of radius rc and height hc centered at (xc, yc, zc). Returns a new filtered DataFrame. Parameters ---------- xc, yc, zc : float Center coordinates of the cylinder. rc : float Cylinder radius. hc : float Cylinder height. dataframe : pd.DataFrame, optional DataFrame to filter (default: self.df). Returns ------- pd.DataFrame Filtered DataFrame with only points inside the cylinder. """ if dataframe is None: if self.df is None: raise ValueError("No DataFrame loaded. Run load_data() first or pass a DataFrame.") dataframe = self.df df = dataframe.copy() # Assume mesh columns are named 'var1_mesh', 'var2_mesh', 'var3_mesh' mask_list = [] for idx, row in df.iterrows(): x = np.array(row['var1_mesh']) y = np.array(row['var2_mesh']) z = np.array(row['var3_mesh']) # Compute boolean mask selecting points inside the cylinder r_xy = np.sqrt((x - xc)**2 + (y - yc)**2) z_min = zc - hc/2 z_max = zc + hc/2 mask = (r_xy <= rc) & (z >= z_min) & (z <= z_max) # Si el campo es escalar filtered = {} filtered['snapshot'] = row['snapshot'] filtered['time'] = row['time'] filtered['var1_mesh'] = x[mask] filtered['var2_mesh'] = y[mask] filtered['var3_mesh'] = z[mask] # Filter the corresponding field columns for col in df.columns: if col.endswith('_mesh') and col not in ['var1_mesh', 'var2_mesh', 'var3_mesh']: data = np.array(row[col]) filtered[col] = data[mask] mask_list.append(filtered) # Convert the list of dicts to a DataFrame filtered_df = pd.DataFrame(mask_list) return filtered_df