###############################################################
# FARGOpy interdependencies
###############################################################
import fargopy
###############################################################
# Required packages
###############################################################
import os
import json
import numpy as np
import re
import subprocess
import time
import gdown
import os
import signal
import ipywidgets as widgets
import matplotlib.pyplot as plt
from IPython.display import display, clear_output
from pathlib import Path
###############################################################
# Constants
###############################################################
KB = 1.380650424e-16 # Boltzmann constant: erg/K, erg = g cm^2 / s^2
MP = 1.672623099e-24 # Mass of the proton, g
GCONST = 6.67259e-8 # Gravitational constant, cm^3/g/s^2
RGAS = 8.314472e7 # Gas constant, erg/K/mol
MSUN = 1.9891e33 # g
AU = 1.49598e13 # cm
YEAR = 31557600.0 # s
PRECOMPUTED_BASEURL = "https://docs.google.com/uc?export=download&id="
PRECOMPUTED_SIMULATIONS = dict(
# Download link: https://drive.google.com/file/d/1YXLKlf9fCGHgLej2fSOHgStD05uFB2C3/view?usp=drive_link
fargo=dict(
id="1YXLKlf9fCGHgLej2fSOHgStD05uFB2C3",
description="""Protoplanetary disk with a Jovian planet [2D]""",
size=55,
),
# Download link: https://drive.google.com/file/d/1KMp_82ylQn3ne_aNWEF1T9ElX2aWzYX6/view?usp=drive_link
p3diso=dict(
id="1KMp_82ylQn3ne_aNWEF1T9ElX2aWzYX6",
description="""Protoplanetary disk with a Super earth planet [3D]""",
size=220,
),
# Download link: https://drive.google.com/file/d/1Xzgk9qatZPNX8mLmB58R9NIi_YQUrHz9/view?usp=sharing
p3disoj=dict(
id="1Xzgk9qatZPNX8mLmB58R9NIi_YQUrHz9",
description="""Protoplanetary disk with a Jovian planet [3D]""",
size=84,
),
# Download link: https://drive.google.com/file/d/1KSQyxH_kbAqHQcsE30GQFRVgAPhMAcp7/view?usp=drive_link
fargo_multifluid=dict(
id="1KSQyxH_kbAqHQcsE30GQFRVgAPhMAcp7",
description="""Protoplanetary disk with several fluids (dust) and a Jovian planet in 2D""",
size=100,
),
# Download link: https://drive.google.com/file/d/12ZWoQS_9ISe6eDij5KWWbqR-bHyyVs2N/view?usp=drive_link
binary=dict(
id="12ZWoQS_9ISe6eDij5KWWbqR-bHyyVs2N",
description="""Disk around a binary with the properties of Kepler-38 in 2D""",
size=140,
),
# Download link: https://drive.google.com/file/d/1veMZ1t03UwUTN-fJ3ZaT7vuOSogHJHzM/view?usp=drive_link
pds70iso=dict(
id="1veMZ1t03UwUTN-fJ3ZaT7vuOSogHJHzM",
description="""PDS-70c - isothermal protoplanetary disk and circumplanetary disk in [3D]""",
size=2940,
),
)
if not fargopy.IN_COLAB:
# SIGCHLD is only available on Unix-like systems (Linux, macOS)
if hasattr(signal, "SIGCHLD"):
signal.signal(signal.SIGCHLD, signal.SIG_IGN)
###############################################################
# Classes
###############################################################
[docs]
class Simulation(fargopy.Fargobj):
"""Manage a FARGO3D simulation and its filesystem, execution and I/O.
The ``Simulation`` object centralizes configuration, units, setup
discovery, compilation and runtime control for a FARGO3D case. It
stores derived paths (setup directory, output directory), unit
conversion factors and exposes convenience methods to compile,
launch and inspect simulation outputs.
Attributes
----------
fargo3d_dir : str
Base directory of FARGO3D installation.
outputs_dir : str
Directory where outputs are stored.
setups_dir : str
Directory where setups are located.
setup : str
Active setup name.
fargo3d_binary : str
Path to the compiled binary.
units_system : str
Unit system ('CGS' or 'MKS').
Parameters
----------
**kwargs : dict
Configuration keywords. Typical keys include ``setup``,
``fargo3d_dir``, ``output_dir`` and ``load``. When ``load=True``
the object attempts to read serialized metadata from the
specified setup directory.
Examples
--------
Create a simulation object:
>>> import fargopy as fp
>>> sim = fp.Simulation()
Select a setup:
>>> sim.set_setup('fargo')
Compile the simulation:
>>> sim.compile(parallel=0, gpu=0)
Run the simulation (clean run):
>>> sim.run(cleanrun=True)
Check status:
>>> sim.status()
Stop simulation:
>>> sim.stop()
"""
def __init__(self, **kwargs):
"""Create and configure the simulation object.
The constructor initializes unit systems, default paths and
optionally loads previously saved simulation metadata when the
``load`` flag is provided in ``kwargs``.
Parameters
----------
**kwargs : dict
Same as described in the class-level docstring. When
``load=True`` the object expects a valid ``setup`` name and
a FARGO3D base directory to locate a
``fargopy_simulation.json`` file to restore state.
"""
super().__init__(**kwargs)
# Default to CGS units
self.units_system = "CGS"
self._set_constants_cgs()
# Load simulation configuration from a file
if ("load" in kwargs.keys()) and kwargs["load"]:
if not "setup" in kwargs.keys():
raise AssertionError(f"You must provide a setup name.")
else:
setup = kwargs["setup"]
if "fargo3d_dir" in kwargs.keys():
fargo3d_dir = kwargs["fargo3d_dir"]
else:
fargo3d_dir = fargopy.Conf.FP_FARGO3D_DIR
# Load simulation
load_from = os.path.join(fargo3d_dir, "setups", setup)
if not os.path.isdir(load_from):
print(f"Directory for loading simulation '{load_from}' not found.")
json_file = os.path.join(load_from, "fargopy_simulation.json")
print(f"Loading simulation from '{json_file}'")
if not os.path.isfile(json_file):
print(f"Loading data '{json_file}' not found.")
with open(json_file) as file_handler:
attributes = json.load(file_handler)
# Check if there are not serializable items and set to None
for key, item in attributes.items():
if item == "<not serializable>":
attributes[key] = None
# Update the object
self.__dict__.update(attributes)
# Set the directories
self.set_fargo3d_dir(self.fargo3d_dir)
self.set_setup(self.setup)
self.set_output_dir(self.output_dir)
return
# Set units by default
self.set_units(UL=AU, UM=MSUN)
# Set properties
self.set_property(
"fargo3d_dir", fargopy.Conf.FP_FARGO3D_DIR, self.set_fargo3d_dir
)
self.set_property("setup", None, self.set_setup)
self.set_property("output_dir", None, self.set_output_dir)
self.set_property(
"fargo3d_compilation_options", dict(parallel=0, gpu=0, options="")
)
# Check if binary is already compiled
fargo3d_binary, compile_options = self._generate_binary_name(
parallel=self.fargo3d_compilation_options["parallel"],
gpu=self.fargo3d_compilation_options["gpu"],
options=self.fargo3d_compilation_options["options"],
)
if os.path.isfile(f"{self.fargo3d_dir}/{fargo3d_binary}"):
print(f"FARGO3D binary '{fargo3d_binary}' found.")
self.fargo3d_binary = fargo3d_binary
else:
self.fargo3d_binary = None
# Simulation process does not exist
self.set_property("fargo3d_process", None)
# ##########################################################################
# Set special properties
# #########################################a#################################
[docs]
def set_fargo3d_dir(self, dir=None):
"""Set the base FARGO3D installation directory.
Parameters
----------
dir : str or None
Path to the FARGO3D installation. When ``None`` the method
is a no-op.
Returns
-------
None
The method does not return meaningful data; it updates
internal path attributes and prints diagnostics.
"""
if dir is None:
return
if not os.path.isdir(dir):
print(f"FARGO3D directory '{dir}' does not exist.")
return
else:
fargo_header = os.path.join(dir, fargopy.Conf.FP_FARGO3D_HEADER)
if not os.path.isfile(fargo_header):
print(f"No header file for FARGO3D found in '{fargo_header}'")
else:
print(f"Your simulation is now connected with '{dir}'")
# Set derivative dirs
self.fargo3d_dir = dir
self.outputs_dir = os.path.join(self.fargo3d_dir, "outputs")
self.setups_dir = os.path.join(self.fargo3d_dir, "setups")
[docs]
def set_setup(self, setup):
"""Associate the simulation with a named setup directory.
Parameters
----------
setup : str or None
Setup name present under the configured ``setups_dir``. If
``None`` the setup association is cleared.
Returns
-------
str or None
The assigned setup name on success, otherwise ``None``.
"""
if setup is None:
self.setup_dir = None
return None
setup_dir = os.path.join(self.setups_dir, setup)
if self.set_setup_dir(setup_dir):
self.setup = setup
self.logfile = os.path.join(self.setup_dir, f"{self.setup}.log")
return setup
[docs]
def set_setup_dir(self, dir):
"""Set the absolute path to a setup directory and validate it.
Parameters
----------
dir : str
Filesystem path to the setup directory.
Returns
-------
bool
``True`` when the directory exists and is accepted,
otherwise ``False``.
"""
if dir is None:
return False
if not os.path.isdir(dir):
print(f"Setup directory '{dir}' does not exist.")
return False
else:
print(f"Now your simulation setup is at '{dir}'")
self.setup_dir = dir
return True
[docs]
def set_output_dir(self, dir):
"""Set the output directory where FARGO3D stores run results.
Parameters
----------
dir : str or None
Output directory path. When present the method attempts to
load simulation properties from a ``variables.par`` file.
"""
if dir is None:
return
if not os.path.isdir(dir):
print(f"Output directory '{dir}' does not exist.")
return
else:
print(f"Now you are connected with output directory '{dir}'")
self.output_dir = dir
# Check if there are results
par_file = os.path.join(dir, "variables.par")
if os.path.isfile(par_file):
print(f"Found a variables.par file in '{dir}', loading properties")
self.load_properties()
return
################################
## UNITS
################################
[docs]
def units(self, system):
"""Switch the simulation unit system between ``'CGS'`` and
``'MKS'``.
Parameters
----------
system : {'CGS','MKS'}
Unit system identifier.
"""
if system.upper() == "CGS":
self._set_constants_cgs()
self.units_system = "CGS"
print("Units set to CGS.")
elif system.upper() == "MKS":
self._set_constants_mks()
self.units_system = "MKS"
print("Units set to MKS.")
else:
raise ValueError("Invalid units system. Use 'CGS' or 'MKS'.")
def _set_constants_cgs(self):
"""Configure physical constants for the CGS unit system."""
self.KB = 1.380650424e-16 # Boltzmann constant: erg/K
self.MP = 1.672623099e-24 # Mass of the proton, g
self.GCONST = 6.67259e-8 # Gravitational constant, cm^3/g/s^2
self.RGAS = 8.314472e7 # Gas constant, erg/K/mol
self.MSUN = 1.9891e33 # Solar mass, g
self.AU = 1.49598e13 # Astronomical unit, cm
self.YEAR = 31557600.0 # Year, s
def _set_constants_mks(self):
"""Configure physical constants for the MKS unit system."""
self.KB = 1.380649e-23 # Boltzmann constant: J/K
self.MP = 1.6726219e-27 # Mass of the proton, kg
self.GCONST = 6.67430e-11 # Gravitational constant, m^3/kg/s^2
self.RGAS = 8.314462618 # Gas constant, J/K/mol
self.MSUN = 1.9891e30 # Solar mass, kg
self.AU = 1.49598e11 # Astronomical unit, m
self.YEAR = 31557600.0 # Year, s
[docs]
def set_units(self, UM=MSUN, UL=AU, G=1, mu=2.35):
"""Define simulation base units and derived unit scales.
Parameters
----------
UM : float
Mass unit (default: MSUN).
UL : float
Length unit (default: AU).
G : float
Dimensionless gravitational constant (default: 1).
mu : float
Mean molecular weight used to compute temperature units.
"""
# Basic
self.UM = UM
self.UL = UL
self.G = G
self.UT = (G * self.UL**3 / (GCONST * self.UM)) ** 0.5 # In seconds
# Thermodynamics
self.UTEMP = (GCONST * MP * mu / KB) * self.UM / self.UL # In K
# Derivative
self.USIGMA = self.UM / self.UL**2 # In g/cm^2
self.URHO = self.UM / self.UL**3 # In kg/m^3
self.UEPS = self.UM / (self.UL * self.UT**2) # In J/m^3
self.UV = self.UL / self.UT
# ##########################################################################
# Control methods
# ##########################################################################
[docs]
def compile(self, setup=None, parallel=0, gpu=0, options="", force=False):
"""Compile the FARGO3D executable for the active setup.
Parameters
----------
setup : str, optional
Setup name to compile. When provided the method will try to
set the setup before compiling.
parallel : int, optional
Parallel compilation flag (default: 0).
gpu : int, optional
GPU-enabled compilation flag (default: 0).
options : str, optional
Additional make options passed to the build system.
force : bool, optional
If ``True`` perform a clean before building.
Examples
--------
>>> sim.compile(parallel=1, gpu=0)
"""
if setup is not None:
if not self.set_setup(setup):
print("Failed")
return
# Clean directrory
if force:
print(f"Cleaning FARGO3D directory {self.fargo3d_dir}...")
cmd = f"make -C {self.fargo3d_dir} clean mrproper"
# Clean all binaries
compl = f"rm -rf {self.fargo3d_dir}/fargo3d_*"
error, output_clean = fargopy.Sys.run(cmd + "&&" + compl)
# Prepare compilation
fargo3d_binary, compile_options = self._generate_binary_name(
parallel=parallel, gpu=gpu, options=options
)
# compile_options = f"SETUP={self.setup} PARALLEL={parallel} GPU={gpu} "+options
# fargo3d_binary = f"fargo3d-{compile_options.replace(' ','-').replace('=','_').strip('-')}"
# Compile binary
print(f"Compiling {fargo3d_binary}...")
cmd = f"cd {self.fargo3d_dir};make {compile_options} 2>&1 |tee {self.setup_dir}/compilation.log"
compl = f"mv fargo3d {fargo3d_binary}"
pipe = f""
error, output_compilation = fargopy.Sys.run(cmd + " && " + compl)
# Check compilation result
if os.path.isfile(f"{self.fargo3d_dir}/{fargo3d_binary}"):
self.fargo3d_binary = fargo3d_binary
print(f"Succesful compilation of FARGO3D binary {self.fargo3d_binary}")
self.fargo3d_compilation_options = dict(
parallel=parallel, gpu=gpu, options=options
)
else:
print(
f"Something failed when compiling FARGO3D. For details check '{self.setup_dir}/compilation.log"
)
def _generate_binary_name(self, parallel=0, gpu=0, options=""):
"""Produce the target binary filename and the make options.
Parameters
----------
parallel : int
Parallel compilation flag.
gpu : int
GPU compilation flag.
options : str
Extra options appended to the make command.
Returns
-------
tuple
``(binary_name, compile_options)`` where ``binary_name`` is
the filename to expect after a successful build and
``compile_options`` is the command fragment passed to
``make``.
"""
compile_options = f"SETUP={self.setup} PARALLEL={parallel} GPU={gpu} " + options
fargo3d_binary = (
f"fargo3d-{compile_options.replace(' ', '-').replace('=', '_').strip('-')}"
)
return fargo3d_binary, compile_options
[docs]
def run(
self,
mode="async",
options="-m",
mpioptions="-np 1",
resume=False,
cleanrun=False,
test=False,
unlock=True,
):
"""Run the FARGO3D simulation.
Parameters
----------
mode : str, optional
'async' for asynchronous or 'sync' for synchronous execution.
options : str, optional
Command-line options for FARGO3D.
mpioptions : str, optional
MPI options for parallel runs.
resume : bool, optional
Resume from previous run.
cleanrun : bool, optional
Clean output directory before running.
test : bool, optional
If True, do not actually run the simulation.
unlock : bool, optional
If True, unlock the simulation if locked.
Examples
--------
Run asynchronously:
>>> sim.run(cleanrun=True)
Run synchronously:
>>> sim.run(mode='sync', cleanrun=True)
"""
if self.fargo3d_binary is None:
print(
"You must first compile your simulation with: <simulation>.compile(<option>)."
)
return
if self._is_running():
print(f"There is a running process. Please stop it before running/resuming")
return
lock_info = fargopy.Sys.is_locked(self.setup_dir)
if lock_info:
print(f"The process is locked by PID {lock_info['pid']}")
return
# Mandatory options
options = options + " -t"
if "fargo3d_run_options" not in self.__dict__.keys():
self.fargo3d_run_options = options
# Clean output if available
if cleanrun:
# Check if there is an output director
output_dir = os.path.join(self.outputs_dir, self.setup)
if os.path.isdir(output_dir):
self.output_dir = output_dir
self.clean_output()
else:
print(f"No output directory {output_dir} yet created.")
# Select command to run
precmd = ""
if self.fargo3d_compilation_options["parallel"]:
precmd = f"mpirun {mpioptions} "
# Preparing command
run_cmd = f"{precmd} ./{self.fargo3d_binary} {options} setups/{self.setup}/{self.setup}.par"
self.json_file = os.path.join(self.setup_dir, "fargopy_simulation.json")
if mode == "sync":
# Save object
self.save_object(self.json_file)
# Run synchronously
cmd = f"cd {self.fargo3d_dir};{run_cmd} |tee {self.logfile}"
print(f"Running synchronously: {cmd}")
fargopy.Sys.simple(cmd)
self.fargo3d_process = None
elif mode == "async":
# Run asynchronously
# Select logfile mode accroding to if the process is resuming
logmode = "a" if resume else "w"
logfile_handler = open(self.logfile, logmode)
# Launch process
print(f"Running asynchronously (test = {test}): {run_cmd}")
if not test:
# Check it is not locked
lock_info = fargopy.Sys.is_locked(dir=self.setup_dir)
if lock_info:
if unlock:
self.unlock_simulation(lock_info)
else:
print(
f"Output directory {self.setup_dir} is locked by a running process"
)
return
process = subprocess.Popen(
run_cmd.split(),
cwd=self.fargo3d_dir,
stdout=logfile_handler,
stderr=logfile_handler,
close_fds=True,
)
# Introduce a short delay to verify if the process has failed
time.sleep(1.0)
if process.poll() is None:
# Check if program is effectively running
self.fargo3d_process = process
# Lock
fargopy.Sys.lock(
dir=self.setup_dir, content=dict(pid=self.fargo3d_process.pid)
)
# Setup output directory
self.set_output_dir(os.path.join(self.outputs_dir, self.setup))
# Save object
self.save_object(self.json_file)
else:
print(
f"Process running failed. Please check the logfile {self.logfile}"
)
else:
print(f"Mode {mode} not recognized (valid are 'sync', 'async')")
return
[docs]
def stop(self, verbose=False):
"""Stop a running FARGO3D process and release the lock on the setup.
If a process is associated with the simulation the method attempts
to terminate it and then unlock the setup directory. If no
process handler is available the method simply tries to remove
any filesystem lock.
"""
# Check if the directory is locked
lock_info = fargopy.Sys.is_locked(self.setup_dir)
if lock_info:
print(f"The process is locked by PID {lock_info['pid']}")
# If not process associated unlock the simulation
if not self._check_process():
self.unlock_simulation(lock_info)
return
poll = self.fargo3d_process.poll()
if poll is None:
print(f"Stopping FARGO3D process (pid = {self.fargo3d_process.pid})")
subprocess.Popen.kill(self.fargo3d_process)
self.unlock_simulation(lock_info)
del self.fargo3d_process
self.fargo3d_process = None
else:
self.unlock_simulation(lock_info)
print(f"The process has finished. Check logfile {self.logfile}.")
[docs]
def unlock_simulation(self, lock_info=None, force=True, verbose=False):
"""Remove a simulation lock and optionally kill the owning PID.
Parameters
----------
lock_info : dict or None
Lock metadata as returned by ``fargopy.Sys.is_locked``. If
``None`` the method will attempt to discover the lock for
the active setup directory.
force : bool, optional
When ``True`` attempt to forcibly terminate the process
owning the lock (``kill -9``) before releasing the lock.
"""
if lock_info is None and force:
lock_info = fargopy.Sys.is_locked(self.setup_dir)
if lock_info:
print(f"Unlocking simulation (pid = {lock_info['pid']})")
if lock_info:
pid = lock_info["pid"]
fargopy.Debug.trace(f"Unlocking simulation (pid = {pid})")
error, output = fargopy.Sys.run(f"kill -9 {pid}")
fargopy.Sys.unlock(self.setup_dir)
[docs]
def status(self, mode="isrunning", verbose=True, **kwargs):
"""Report process, logfile and output status for the simulation.
Parameters
----------
mode : {'isrunning','logfile','outputs','progress','summary','locking','all'}, optional
Which status block to display. ``'all'`` prints every
section.
verbose : bool, optional
When ``True`` print human-readable diagnostics to stdout.
**kwargs : dict
Backend-specific options used by certain modes (for
instance ``numstatus`` for the ``'progress'`` mode).
"""
# Bar separating output
bar = f"\n{''.join(['#'] * 80)}\n"
# vprint
vprint = print if verbose else lambda x: x
if "isrunning" in mode or mode == "all":
vprint(bar + "Running status of the process:")
if self.fargo3d_process:
poll = self.fargo3d_process.poll()
if poll is None:
vprint("\tThe process is running.")
else:
# Unlock any remaining process
lock_info = fargopy.Sys.is_locked(self.setup_dir)
self.unlock_simulation(lock_info)
vprint(f"\tThe process has ended with termination code {poll}.")
else:
vprint(f"\tThe process is stopped.")
if "logfile" in mode or mode == "all":
vprint(bar + "Logfile content:")
if "logfile" in self.__dict__.keys() and os.path.isfile(self.logfile):
vprint("The latest 10 lines of the logfile:\n")
if verbose:
os.system(f"tail -n 10 {self.logfile}")
else:
vprint("No log file created yet")
if "outputs" in mode or mode == "all":
vprint(bar + "Output content:")
error, output = fargopy.Sys.run(f"ls {self.output_dir}/*.dat")
if not error:
files = [file.split("/")[-1] for file in output[:-1]]
file_list = ""
for i, file in enumerate(files):
file_list += f"{file}, "
if ((i + 1) % 10) == 0:
file_list += "\n"
file_list = file_list.strip("\n,")
vprint(f"\n{len(files)} available datafiles:\n")
self.output_datafiles = files
vprint(file_list)
else:
vprint("No datafiles yet available")
if "summary" in mode or mode == "all":
vprint(bar + "Summary:")
nsnaps = self._get_nsnaps()
print(
f"The simulation has been ran for {nsnaps} time-steps (including the initial one)."
)
if "locking" in mode or mode == "all":
vprint(bar + "Locking status:")
lock_info = fargopy.Sys.is_locked(self.setup_dir)
print(lock_info)
if "progress" in mode:
vprint(bar)
numstatus = 5
if "numstatus" in kwargs.keys():
numstatus = int(kwargs["numstatus"])
self._status_progress(numstatus=numstatus)
print(
f"\nOther status modes: 'isrunning', 'logfile', 'outputs', 'progress', 'summary'"
)
def _status_progress(self, minfreq=0.1, numstatus=100):
"""Display a live progress summary by tailing the simulation log.
The routine periodically reads the simulation logfile and prints
snapshot progress information until the process stops or the
requested number of updates is reached.
Parameters
----------
minfreq : float, optional
Minimum polling interval in seconds.
numstatus : int, optional
Maximum number of status frames to emit.
"""
# Prepare
if "status_frequency" not in self.__dict__.keys():
frequency = minfreq
else:
frequency = self.status_frequency
previous_output = ""
previous_resumable_snapshot = 1e100
time_previous = time.time()
# Infinite loop checking for output
n = 0
print(f"Progress of the simulation (interrupt by pressing the stop button):")
while True and (n < numstatus):
# Check if the process is running locally
if not self._is_running():
print("The simulation is not running anymore")
return
error, output = fargopy.Sys.run(f"grep OUTPUT {self.logfile} |tail -n 1")
if not error:
# Get the latest output
latest_output = output[-2]
if latest_output != previous_output:
print(
f"{n + 1}:{latest_output} [output pace = {frequency:.1f} secs]"
)
n += 1
# Fun the number of the output
find = re.findall(r"OUTPUTS\s+(\d+)", latest_output)
resumable_snapshot = int(find[0])
# Get the time elapsed since last status check
time_now = time.time()
frequency = max(time_now - time_previous, minfreq) / 2
if (resumable_snapshot - previous_resumable_snapshot) > 1:
# Reduce frequency if snapshots are accelerating
frequency = frequency / 2
self.status_frequency = frequency
previous_resumable_snapshot = resumable_snapshot
time_previous = time_now
previous_output = latest_output
try:
time.sleep(frequency)
except KeyboardInterrupt:
print("Interrupted by user.")
return
[docs]
def resume(self, snapshot=-1, mpioptions="-np 1"):
"""Resume an interrupted simulation from the specified snapshot.
Parameters
----------
snapshot : int, optional
Snapshot index to resume from. Use ``-1`` to resume from
the most recent resumable snapshot.
mpioptions : str, optional
MPI launch options for parallel executions.
"""
latest_snapshot_resumable = self._is_resumable()
if latest_snapshot_resumable < 0:
return
if self._is_running():
print(f"There is a running process. Please stop it before resuming")
return
if "fargo3d_run_options" not in self.__dict__.keys():
print(f"The process has not been run before.")
return
# Resume
if snapshot < 0:
snapshot = latest_snapshot_resumable
print(f"Resuming from snapshot {snapshot}...")
self.run(
mode="async",
mpioptions=mpioptions,
resume=True,
options=self.fargo3d_run_options + f" -S {snapshot}",
test=False,
)
def _has_finished(self):
"""Return ``True`` if the associated FARGO3D process has exited."""
if self.fargo3d_process:
poll = self.fargo3d_process.poll()
if poll is None:
return False
else:
print(f"The process has ended with termination code {poll}.")
return True
def _is_resumable(self):
"""Determine the latest snapshot index from which the simulation can resume.
Returns
-------
int
Index of the latest resumable snapshot, or ``-1`` when the
simulation is not resumable or no outputs are present.
"""
if self.logfile is None:
print(
f"The simulation has not been ran yet. Run <simulation>.run() before resuming"
)
return -1
latest_snapshot_resumable = max(self._get_nsnaps() - 2, 0)
return latest_snapshot_resumable
[docs]
def clean_output(self):
"""Remove all files from the configured output directory."""
if self.output_dir is None:
print(f"Output directory has not been set.")
return
if self._is_running():
print(f"There is a running process. Please stop it before cleaning")
return
print(f"Cleaning output directory {self.output_dir}")
cmd = f"rm -rf {self.output_dir}/*"
error, output = fargopy.Sys.run(cmd)
def _is_running(self, verbose=False):
"""Return ``True`` when a live FARGO3D process is detected.
Parameters
----------
verbose : bool, optional
When ``True`` print diagnostic messages.
Returns
-------
bool
``True`` if a running process is associated with the
simulation, otherwise ``False``.
"""
lock_info = fargopy.Sys.is_locked(self.setup_dir)
if lock_info:
# Check if process is up
error, output = fargopy.Sys.run(f"ps -p {lock_info['pid']}")
if error == 0:
return True
if not self.has("fargo3d_process"):
if verbose:
print("The simulation has not been run before.")
return False
if self.fargo3d_process:
poll = self.fargo3d_process.poll()
if poll is None:
if verbose:
print(
f"The process is already running with pid '{self.fargo3d_process.pid}'"
)
return True
else:
return False
else:
return False
def _check_process(self):
"""Return ``True`` if a process handler is present for the run."""
if self.fargo3d_process is None:
print(f"There is no FARGO3D process handler available.")
return False
return True
# ##########################################################################
# Operations on the FARGO3D directories
# ##########################################################################
[docs]
def list_fields(self, quiet=False):
"""Return a list of data file names present in the output directory.
Parameters
----------
quiet : bool, optional
When ``True`` avoid printing the detailed file list.
Returns
-------
list
Filenames found in the output directory.
"""
if self.output_dir is None:
print(
f"You have to set forst the outputs directory with <sim>.set_outputs('<directory>')"
)
else:
error, output = fargopy.Sys.run(f"ls {self.output_dir}")
if error == 0:
files = output[:-1]
print(f"{len(files)} files in output directory")
if not quiet:
file_list = ""
for i, file in enumerate(files):
file_list += f"{file}, "
if ((i + 1) % 10) == 0:
file_list += "\n"
print(file_list)
return files
[docs]
def load_macros(self, summaryfile="summary0.dat"):
"""Parse the PREPROCESSOR MACROS SECTION from a summary file.
Parameters
----------
summaryfile : str, optional
Summary filename relative to the output directory (default
``'summary0.dat'``).
Returns
-------
dict
Mapping of macro names to parsed values. Returns an empty
dict when the file is missing or parsing fails.
"""
summary_path = os.path.join(self.output_dir, summaryfile)
if not os.path.isfile(summary_path):
print(f"No summary file '{summary_path}' found.")
return {}
macros = {}
in_macros_section = False
with open(summary_path, "r") as f:
for line in f:
# Detect the start of the macros section
if "PREPROCESSOR MACROS SECTION" in line:
in_macros_section = True
continue
# Stop if another section starts
if (
in_macros_section
and "SECTION" in line
and "PREPROCESSOR MACROS SECTION" not in line
):
break
if in_macros_section:
# Skip empty lines and lines that don't contain '='
if (
line.strip() == ""
or "=" not in line
or line.strip().startswith("=")
or line.strip().startswith("#")
):
continue
# Extract parameter name and value after the last '='
parts = line.split("=")
if len(parts) >= 2:
key = parts[0].strip()
value_str = parts[-1].strip()
# Try to convert to float if possible
try:
value = float(value_str)
except ValueError:
value = value_str
macros[key] = value
self.simulation_macros = macros
return macros
[docs]
def load_planets(self, snapshot=0, summaryfile=None):
"""
Load planet data from a summary file.
Parameters
----------
snapshot : int, optional
Snapshot index to load.
summaryfile : str, optional
Name of the summary file.
Returns
-------
list of Planet
List of Planet objects for the given snapshot.
"""
import os
# Determine summary file
if summaryfile is None:
summaryfile = f"summary{snapshot}.dat"
summary_path = os.path.join(self.output_dir, summaryfile)
if not os.path.isfile(summary_path):
print(f"No summary file '{summary_path}' found.")
return []
# Get stellar mass
if (
not hasattr(self, "simulation_macros")
or "MSTAR" not in self.simulation_macros
):
self.load_macros()
mstar = self.simulation_macros["MSTAR"]
# Parse initial positions from the top of the file (if available)
initial_planets = []
with open(summary_path, "r") as f:
lines = f.readlines()
for line in lines:
if (
line.strip().startswith("#")
or not line.strip()
or set(line.strip()) == set("-")
):
continue
parts = line.split()
if len(parts) >= 3:
try:
float(parts[0])
continue # skip if first field is a number
except ValueError:
pass
try:
name = parts[0]
x0 = float(parts[1])
y0 = float(parts[2]) if len(parts) > 3 else 0.0
z0 = 0.0
initial_planets.append({"name": name, "posi": (x0, y0, z0)})
except Exception:
continue
# Find PLANETARY SYSTEM SECTION
planet_indices = []
in_section = False
for i, line in enumerate(lines):
if "PLANETARY SYSTEM SECTION" in line:
in_section = True
continue
if in_section and line.strip().startswith("#### Planet"):
planet_indices.append(i + 1) # next line has the data
if in_section and line.strip().startswith("***"):
break
planets = []
for idx, data_idx in enumerate(planet_indices):
data_line = lines[data_idx].strip()
parts = data_line.split()
if len(parts) < 7:
continue
x, y, z = map(float, parts[0:3])
vx, vy, vz = map(float, parts[3:6])
mass = float(parts[-1]) # Always take the last column as mass
# Get initial position if available
if idx < len(initial_planets):
name = initial_planets[idx]["name"]
posi = initial_planets[idx]["posi"]
else:
name = f"Planet {idx}"
posi = (x, y, z)
planet = Planet(
name=name,
pos=(x, y, z),
vel=(vx, vy, vz),
mass=mass,
posi=posi,
mstar=mstar,
)
planets.append(planet)
return planets
[docs]
def load_properties(
self,
quiet=False,
varfile="variables.par",
domain_prefix="domain_",
dimsfile="dims.dat",
summaryfile="summary0.dat",
):
"""
Load and print simulation properties, including variables, domains, dimensions, and planets.
Parameters
----------
quiet : bool, optional
If True, suppress output.
varfile : str, optional
Name of the variables file.
domain_prefix : str, optional
Prefix for domain files.
dimsfile : str, optional
Name of the dimensions file.
summaryfile : str, optional
Name of the summary file.
Returns
-------
str
Summary information string.
"""
info = []
if self.output_dir is None:
msg = f"You have to set first the outputs directory with <sim>.set_outputs('<directory>')"
print(msg)
return None
# Read variables
vars = self._load_variables(varfile)
if not vars:
print("No variables found.")
return None
info.append(f"Simulation in {vars.DIM} dimensions")
print(f"Simulation in {vars.DIM} dimensions")
# Read domains
domains = self._load_domains(vars, domain_prefix)
# Store the variables in the object
self.vars = vars
self.domains = domains
# Optionally read dims
dims = self._load_dims(dimsfile)
if len(dims):
self.dims = dims
# Read the summary files
self.nsnaps = self._get_nsnaps()
info.append(f"Number of snapshots in output directory: {self.nsnaps}")
print(f"Number of snapshots in output directory: {self.nsnaps}")
# Read planets from summary.dat using load_planet_states
self.planets = self.load_planets(snapshot=0, summaryfile=summaryfile)
if self.planets:
info.append("Planets found in summary.dat")
print("Planets found in summary.dat")
for planet in self.planets:
planet_info = f" Name: {planet.name}, Initial pos: {planet.posi}, Mass: {planet.mass}"
info.append(planet_info)
print(planet_info)
else:
info.append("No planet data found in summary.dat")
print("No planet data found in summary.dat")
# No devolver string para evitar mostrar \n literales en notebooks
return None
def _get_nsnaps(self):
"""
Get the number of snapshots in the output directory.
Returns
-------
int
Number of snapshot files.
"""
try:
# List all files in the output directory
files = [
f
for f in os.listdir(self.output_dir)
if f.startswith("summary") and f.endswith(".dat")
]
nsnaps = len(files)
return nsnaps
except FileNotFoundError:
print(f"No summary file in {self.output_dir}")
return 0
def _load_dims(self, dimsfile):
"""
Parse the dimensions file.
Parameters
----------
dimsfile : str
Name of the dimensions file.
Returns
-------
np.ndarray or list
Array of dimensions or empty list if not found.
"""
dimsfile = os.path.join(self.output_dir, dimsfile)
if not os.path.isfile(dimsfile):
# print(f"No file with dimensions '{dimsfile}' found.")
return []
dims = np.loadtxt(dimsfile)
return dims
def _load_variables(self, varfile):
"""
Parse the file with the simulation variables.
Parameters
----------
varfile : str
Name of the variables file.
Returns
-------
fargopy.Dictobj
Object containing simulation variables.
"""
varfile = os.path.join(self.output_dir, varfile)
if not os.path.isfile(varfile):
print(f"No file with variables named '{varfile}' found.")
return
print(f"Loading variables")
variables = np.genfromtxt(
varfile,
dtype={"names": ("parameters", "values"), "formats": ("|S30", "|S300")},
).tolist()
vars = dict()
for posicion in variables:
str_value = posicion[1].decode("utf-8")
try:
value = int(str_value)
except:
try:
value = float(str_value)
except:
value = str_value
vars[posicion[0].decode("utf-8")] = value
vars = fargopy.Dictobj(dict=vars)
print(f"{len(vars.__dict__.keys())} variables loaded")
# Create additional variables
variables = ["x", "y", "z"]
if vars.COORDINATES == "cylindrical":
variables = ["phi", "r", "z"]
elif vars.COORDINATES == "spherical":
variables = ["phi", "r", "theta"]
vars.VARIABLES = variables
vars.__dict__[f"N{variables[0].upper()}"] = vars.NX
vars.__dict__[f"N{variables[1].upper()}"] = vars.NY
vars.__dict__[f"N{variables[2].upper()}"] = vars.NZ
# Dimension of the domain
vars.DIM = 2 if vars.NZ == 1 else 3
return vars
def _load_domains(self, vars, domain_prefix, borders=None, middle=True):
"""
Load domain coordinate arrays from files.
Parameters
----------
vars : fargopy.Dictobj
Simulation variables.
domain_prefix : str
Prefix for domain files.
borders : list, optional
List of border slices for each variable.
middle : bool, optional
If True, average between cell coordinates.
Returns
-------
fargopy.Dictobj
Object containing domain arrays.
"""
if borders is None:
borders = [[], [3, -3], [3, -3]]
# Coordinates
variable_suffixes = ["x", "y", "z"]
print(f"Loading domain in {vars.COORDINATES} coordinates:")
# Correct dims in case of 2D
if vars.DIM == 2:
borders[-1] = []
# Load domains
domains = dict()
domains["extrema"] = dict()
for i, variable_suffix in enumerate(variable_suffixes):
domain_file = os.path.join(
self.output_dir, f"{domain_prefix}{variable_suffix}.dat"
)
if os.path.isfile(domain_file):
# Load data from file
domains[vars.VARIABLES[i]] = np.genfromtxt(domain_file)
if len(borders[i]) > 0:
# Drop the border of the domain
domains[vars.VARIABLES[i]] = domains[vars.VARIABLES[i]][
borders[i][0] : borders[i][1]
]
if middle:
# Average between domain cell coordinates
domains[vars.VARIABLES[i]] = 0.5 * (
domains[vars.VARIABLES[i]][:-1] + domains[vars.VARIABLES[i]][1:]
)
# Show indices and value map
domains["extrema"][vars.VARIABLES[i]] = [
[0, domains[vars.VARIABLES[i]][0]],
[-1, domains[vars.VARIABLES[i]][-1]],
]
print(
f"\tVariable {vars.VARIABLES[i]}: {len(domains[vars.VARIABLES[i]])} {domains['extrema'][vars.VARIABLES[i]]}"
)
else:
print(f"\tDomain file {domain_file} not found.")
domains = fargopy.Dictobj(dict=domains)
return domains
[docs]
def load_field(
self,
fields,
slice=None,
snapshot=None,
type=None,
coords="cartesian",
cut=None,
):
"""
Load a field or multiple fields from the simulation.
This method initializes a ``FieldInterpolator`` to handle the loading
and interpolation of simulation fields. It supports loading single
or multiple fields, taking slices, and specifying the coordinate
system for vector fields.
Parameters
----------
fields : str or list of str
Name or list of names of the fields to load (e.g., 'gasdens', 'gasv').
slice : str, optional
Slice definition string (e.g., 'phi=0', 'theta=1.57'). Used to
load a specific subset of the data.
snapshot : int or list of int, optional
Snapshot number(s) to load. Can be a single integer or a
range [start, end]. Default is None (which usually implies
snapshot 0 or all, depending on context, but here it is passed
to FieldInterpolator).
type : str, optional
Field type ('scalar' or 'vector'). If None, it is inferred
from the field name.
coords : str, optional
Coordinate system for vector transformation ('cartesian',
'cylindrical', 'spherical'). Default is 'cartesian'.
cut : list, optional
Geometric cut parameters (sphere or cylinder mask) to apply
to the data.
Returns
-------
fargopy.FieldInterpolator
An object capable of interpolating and managing the loaded field data.
Examples
--------
Load density and velocity for snapshot 10:
>>> loader = sim.load_field(fields=['gasdens', 'gasv'], snapshot=10)
Load a 2D slice at phi=0:
>>> df_slice = sim.load_field(fields='gasdens', slice='phi=0', snapshot=10)
"""
# Ensure fields is a list (but keep single-field compatibility)
single_input = False
if isinstance(fields, str):
single_input = True
fields = [fields]
self.field_handler = fargopy.FieldsHandler(
sim=self,
fields=fields,
slice=slice,
snapshot=snapshot,
type=type,
coords=coords,
cut=cut,
)
fields = self.field_handler.get_interpolator()
return fields
def _load_field_scalar(self, file):
"""
Load a scalar field from a file.
Parameters
----------
file : str
Path to the field file.
Returns
-------
np.ndarray
Field data array.
"""
if os.path.isfile(file):
field_data = np.fromfile(file).reshape(
int(self.vars.NZ), int(self.vars.NY), int(self.vars.NX)
)
return field_data
else:
raise AssertionError(f"File with field '{file}' not found")
def _load_field_raw(self, field, snapshot=0, field_type=None):
"""
Internal helper to load a single field as a `fargopy.Field` without going
through `load_field` dispatching. This prevents recursion when higher-level
helpers request raw data.
"""
# Infer type if not provided
if field_type is None:
if field in ["gasdens", "gasenergy"]:
field_type = "scalar"
elif field == "gasv":
field_type = "vector"
else:
raise ValueError(f"Field type for '{field}' could not be inferred.")
# Load scalar
if field_type == "scalar":
file_name = f"{field}{snapshot}.dat"
file_field = os.path.join(self.output_dir, file_name)
data = self._load_field_scalar(file_field)
# Load vector
elif field_type == "vector":
data = []
components = ["x", "y"]
if self.vars.DIM == 3:
components += ["z"]
for comp in components:
file_name = f"{field}{comp}{snapshot}.dat"
file_field = os.path.join(self.output_dir, file_name)
data.append(self._load_field_scalar(file_field))
data = np.array(data)
return fargopy.Field(
data=np.array(data),
coordinates=self.vars.COORDINATES,
domains=self.domains,
type=field_type,
)
@staticmethod
def _parse_file_field(file_field):
"""
Parse a field filename to extract the field name and snapshot number.
Parameters
----------
file_field : str
Filename of the field.
Returns
-------
list or None
List with field name and snapshot number, or None if not matched.
"""
basename = os.path.basename(file_field)
comps = None
match = re.match("([a-zA-Z]+)(\d+).dat", basename)
if match is not None:
comps = [match.group(i) for i in range(1, match.lastindex + 1)]
return comps
def __repr__(self):
"""
String representation of the Simulation object.
Returns
-------
str
"""
repr = f"""FARGOPY simulation (fargo3d_dir = '{self.fargo3d_dir}', setup = '{self.setup}')"""
return repr
def __str__(self):
"""
Detailed string with simulation information.
Returns
-------
str
"""
str = f"""Simulation information:
FARGO3D directory: {self.fargo3d_dir}
Outputs: {self.outputs_dir}
Setups: {self.setups_dir}
Units:
G = {self.G} UL^3/(UM UT^2)
UL, UM, UT = {self.UL} m, {self.UM} kg, {self.UT} s
UE = {self.UEPS} J/m^3
UV = {self.UV} m/s
URHO = {self.URHO} kg/m^3
USIGMA = {self.USIGMA} kg/m^2
Setup: {self.setup}
Setup directory: {self.setup_dir}
Output directory: {self.output_dir}
"""
return str
# ##########################################################################
# Static method
# ##########################################################################
[docs]
@staticmethod
def list_setups():
"""
Print all valid setup directories detected under ``FP_FARGO3D_DIR``.
Returns
-------
None
The setups are printed to stdout; nothing is returned.
"""
import glob
pattern = os.path.join(fargopy.Conf.FP_FARGO3D_DIR, "setups", "*")
output = sorted(glob.glob(pattern))
list_str = ""
for setup_dir in output:
setup_dir = setup_dir.replace("//", "/")
setup_name = setup_dir.split("/")[-1]
setup_par = os.path.join(setup_dir, f"{setup_name}.par")
if os.path.isfile(setup_par):
list_str += f"Setup '{setup_name}' in '{setup_dir}'\n"
print(list_str)
[docs]
@staticmethod
def list_precomputed():
"""
Display the catalog of downloadable precomputed simulations with descriptions and sizes.
Returns
-------
None
The listing is printed to stdout.
"""
for key, item in PRECOMPUTED_SIMULATIONS.items():
print(
f"{key}:\n\tDescription: {item['description']}\n\tSize: {item['size']} MB"
)
[docs]
@staticmethod
def download_precomputed(setup=None, download_dir=None, quiet=False, clean=True):
"""
Download and extract a precomputed output archive from the public repository.
Parameters
----------
setup : str, optional
Name of the entry in ``PRECOMPUTED_SIMULATIONS``.
download_dir : str, optional
Destination directory for the compressed file and extracted output.
quiet : bool, optional
When True, suppress download progress indicators.
clean : bool, optional
Remove the downloaded archive after extraction when set to True.
Returns
-------
str
Absolute path to the extracted output directory, or an empty string on failure.
"""
if setup is None:
print(
f"You must provide a setup name. Available setups: {list(PRECOMPUTED_SIMULATIONS.keys())}"
)
return ""
# Set default download directory based on OS
if download_dir is None:
if os.name == "nt": # Windows
download_dir = os.path.join(
os.environ.get("TEMP", "C:\\temp"), "fargopy_data"
)
else: # Unix-like systems
download_dir = "/tmp"
# Create directory if it doesn't exist
os.makedirs(download_dir, exist_ok=True)
if not os.path.isdir(download_dir):
print(f"Download directory '{download_dir}' does not exist.")
return ""
if setup not in PRECOMPUTED_SIMULATIONS.keys():
print(
f"Precomputed setup '{setup}' is not among the available setups: {list(PRECOMPUTED_SIMULATIONS.keys())}"
)
return ""
output_dir = os.path.join(download_dir, setup)
if os.path.isdir(output_dir):
print(f"Precomputed output directory '{output_dir}' already exist")
return output_dir
else:
filename = setup + ".tgz"
fileloc = os.path.join(download_dir, filename)
if os.path.isfile(fileloc):
print(f"Precomputed file '{fileloc}' already downloaded")
else:
# Download the setups
print(
f"Downloading {filename} from cloud (compressed size around {PRECOMPUTED_SIMULATIONS[setup]['size']} MB) into {download_dir}"
)
url = PRECOMPUTED_BASEURL + PRECOMPUTED_SIMULATIONS[setup]["id"]
gdown.download(url, fileloc, quiet=quiet)
# Uncompress the setups - Windows compatible
print(f"Uncompressing {filename} into {output_dir}")
try:
import tarfile
with tarfile.open(fileloc, "r:gz") as tar:
tar.extractall(path=download_dir)
print(f"Done.")
# Clean up the tar file if requested
if clean:
os.remove(fileloc)
except Exception as e:
print(f"Error uncompressing file: {e}")
# Fallback to system command for Unix-like systems
if os.name != "nt":
fargopy.Sys.simple(f"cd {download_dir};tar zxf {filename}")
if clean:
fargopy.Sys.simple(f"cd {download_dir};rm -rf {filename}")
else:
print(
"Failed to decompress on Windows. Please install tar or 7-zip."
)
return ""
return output_dir
[docs]
def time_scale(self, scale="orbits"):
"""
Calculates the time scale of the simulation in different units.
Parameters
----------
scale : str, optional
'orbits' to calculate the number of orbits completed by the planet,
'duration' for the total simulation time in simulation units.
Returns
-------
float
Number of orbits or total simulation time, depending on scale.
"""
import contextlib
import io
import numpy as np
with contextlib.redirect_stdout(io.StringIO()):
# Load necessary parameters
self.load_macros()
self.load_planet_summary()
# Extract parameters from macros and planet summary
G = self.G # Gravitational constant in simulation units
M_star = self.simulation_macros["MSTAR"] # Stellar mass in simulation units
planet = self.planets[0] # Assume the first planet for calculations
a = planet["distance"] # Orbital radius in simulation units
# Calculate orbital period (T) using Kepler's third law
T = 2 * np.pi * np.sqrt(a**3 / (G * M_star))
# Extract simulation parameters
NINTERM = self._load_variables("variables.par").NINTERM
DT = self._load_variables("variables.par").DT
NTOT = self._load_variables("variables.par").NTOT
# Calculate total simulation time
total_time = NTOT * DT # Total time in simulation units
if scale == "orbits":
# Calculate the number of orbits completed by the planet
orbits_num = total_time / T
return orbits_num
elif scale == "duration":
# Return the total simulation time in simulation units
return total_time
else:
raise ValueError("Invalid scale. Choose either 'orbits' or 'duration'.")
[docs]
def to_paraview(self, snapshot=0, dir=".", basename=None):
"""
Export a snapshot to a VTU (UnstructuredGrid) file with physical XYZ coordinates
and point data arrays for density and Cartesian velocity.
Supports both spherical and cylindrical coordinate systems.
Parameters
----------
snapshot : int
Snapshot index to export (default: 0).
dir : str
Output directory where the .vtu file will be written (default: current directory).
basename : str or None
Base name for the output file (without extension). If None, a default name
using the simulation setup and snapshot is created.
Notes
-----
- Requires the 'vtk' Python package (and vtk.util.numpy_support).
- Expects self.load_field('gasdens') and self.load_field('gasv') to return
fargopy.Field-like objects where .data is a numpy array with shapes:
For spherical: rho: (ntheta, nr, nphi), vel: (3, ntheta, nr, nphi) or (ntheta, nr, nphi, 3)
For cylindrical: rho: (nz, nr, nphi), vel: (3, nz, nr, nphi) or (nz, nr, nphi, 3)
- Domains expected in self.domains as theta/z, r, phi arrays.
- Output is a single .vtu unstructured grid with VTK_VOXEL cells and point data arrays:
- "rho" (scalar)
- "vel_cart" (3-component vector)
"""
import os
import numpy as np
try:
import vtk
from vtk.util import numpy_support as ns
except Exception as e:
raise RuntimeError(
"VTK is required for to_paraview. Install the 'vtk' package."
) from e
# Check if simulation is 3D (required for VTU export)
if hasattr(self, 'vars') and self.vars.DIM == 2:
raise ValueError(
"to_paraview() requires a 3D simulation. "
"Your simulation is 2D. VTU export is only available for 3D domains."
)
# prepare output path
os.makedirs(dir, exist_ok=True)
if basename is None:
base = getattr(self, "setup", "simulation")
basename = f"{base}_snap{snapshot}"
filename = os.path.join(dir, basename)
# Load fields (tolerant to different return types)
gasdens = self._load_field_raw(
field="gasdens", snapshot=snapshot, field_type="scalar"
)
gasv = self._load_field_raw(field="gasv", snapshot=snapshot, field_type="vector")
rho = np.log10(
gasdens.data * self.URHO
) # convert to physical units and log10
vel = gasv.data * self.UL / self.UT * 1e-5
# Normalize shapes
if rho.ndim == 4 and rho.shape[0] == 1:
rho = rho[0]
if rho.ndim != 3:
raise ValueError(
f"Unexpected rho shape: {rho.shape}. Expected (n1,nr,nphi) where n1 is ntheta or nz."
)
# velocity can be (3,n1,nr,nphi) or (n1,nr,nphi,3)
if vel.ndim == 4 and vel.shape[0] == 3:
vel_components = vel
elif vel.ndim == 4 and vel.shape[-1] == 3:
vel_components = np.moveaxis(vel, -1, 0)
else:
raise ValueError(
f"Unexpected vel shape: {vel.shape}. Expected (3,n1,nr,nphi) or (n1,nr,nphi,3)."
)
# Detect coordinate system
coords = self.vars.COORDINATES if hasattr(self, 'vars') else 'spherical'
# Get coordinate grids from self.domains
r = self.domains.r * self.UL / self.AU
phi = self.domains.phi
if coords == 'spherical':
# Spherical coordinates
try:
theta = self.domains.theta
except Exception as e:
raise RuntimeError(
"Could not find theta in self.domains for spherical coordinates"
) from e
# Create meshgrid (vectorized)
TT, RR, PP = np.meshgrid(theta, r, phi, indexing="ij") # shape (ntheta,nr,nphi)
# Coordinates in Cartesian
X = (RR * np.sin(TT) * np.cos(PP)).ravel(order="C").astype(np.float64)
Y = (RR * np.sin(TT) * np.sin(PP)).ravel(order="C").astype(np.float64)
Z = (RR * np.cos(TT)).ravel(order="C").astype(np.float64)
pts = np.column_stack([X, Y, Z])
# Cartesian velocity components (vectorized)
v_phi = vel_components[0]
v_r = vel_components[1]
v_theta = vel_components[2]
v_x = (
(
v_r * np.sin(TT) * np.cos(PP)
+ v_theta * np.cos(TT) * np.cos(PP)
- v_phi * np.sin(PP)
)
.ravel(order="C")
.astype(np.float64)
)
v_y = (
(
v_r * np.sin(TT) * np.sin(PP)
+ v_theta * np.cos(TT) * np.sin(PP)
+ v_phi * np.cos(PP)
)
.ravel(order="C")
.astype(np.float64)
)
v_z = (
(v_r * np.cos(TT) - v_theta * np.sin(TT))
.ravel(order="C")
.astype(np.float64)
)
elif coords == 'cylindrical':
# Cylindrical coordinates
try:
z = self.domains.z * self.UL / self.AU
except Exception as e:
raise RuntimeError(
"Could not find z in self.domains for cylindrical coordinates"
) from e
# Create meshgrid (vectorized)
ZZ, RR, PP = np.meshgrid(z, r, phi, indexing="ij") # shape (nz,nr,nphi)
# Coordinates in Cartesian
X = (RR * np.cos(PP)).ravel(order="C").astype(np.float64)
Y = (RR * np.sin(PP)).ravel(order="C").astype(np.float64)
Z = ZZ.ravel(order="C").astype(np.float64)
pts = np.column_stack([X, Y, Z])
# Cartesian velocity components (vectorized)
v_phi = vel_components[0]
v_r = vel_components[1]
v_z = vel_components[2] if vel_components.shape[0] > 2 else np.zeros_like(v_r)
v_x = (
(v_r * np.cos(PP) - v_phi * np.sin(PP))
.ravel(order="C")
.astype(np.float64)
)
v_y = (
(v_r * np.sin(PP) + v_phi * np.cos(PP))
.ravel(order="C")
.astype(np.float64)
)
v_z = v_z.ravel(order="C").astype(np.float64)
else:
raise ValueError(f"Unsupported coordinate system: {coords}")
vel_cart = np.column_stack([v_x, v_y, v_z])
# Density flattened
rho_flat = rho.ravel(order="C").astype(np.float64)
# VTK points
vtk_points = vtk.vtkPoints()
vtk_points.SetData(ns.numpy_to_vtk(pts, deep=True))
# Build VOXEL connectivity (vectorized)
n1, nr, nphi = rho.shape # n1 is ntheta (spherical) or nz (cylindrical)
n1m = n1 - 1
nrm = nr - 1
npm = nphi - 1
ncells = n1m * nrm * npm
i1, ir_, ip = np.meshgrid(
np.arange(n1m), np.arange(nrm), np.arange(npm), indexing="ij"
)
i1 = i1.ravel()
ir_ = ir_.ravel()
ip = ip.ravel()
base = i1 * nr * nphi + ir_ * nphi + ip
p000 = base
p001 = base + 1
p010 = base + nphi
p011 = base + nphi + 1
p100 = base + nr * nphi
p101 = base + nr * nphi + 1
p110 = base + nr * nphi + nphi
p111 = base + nr * nphi + nphi + 1
cells = np.column_stack(
[
np.full(ncells, 8, dtype=np.int64),
p000,
p001,
p010,
p011,
p100,
p101,
p110,
p111,
]
).ravel()
vtk_cells = vtk.vtkCellArray()
vtk_cells.SetCells(ncells, ns.numpy_to_vtkIdTypeArray(cells, deep=True))
# Unstructured grid
grid = vtk.vtkUnstructuredGrid()
grid.SetPoints(vtk_points)
grid.SetCells(vtk.VTK_VOXEL, vtk_cells)
# Point data arrays
vtk_rho = ns.numpy_to_vtk(rho_flat, deep=True)
vtk_rho.SetName("rho")
vtk_vel = ns.numpy_to_vtk(vel_cart, deep=True)
vtk_vel.SetNumberOfComponents(3)
vtk_vel.SetName("vel_cart")
grid.GetPointData().AddArray(vtk_rho)
grid.GetPointData().AddArray(vtk_vel)
# Write VTU
writer = vtk.vtkXMLUnstructuredGridWriter()
writer.SetFileName(filename + ".vtu")
writer.SetInputData(grid)
if writer.Write() == 0:
raise RuntimeError(f"Failed writing VTU file '{filename}.vtu'.")
return filename + ".vtu"
[docs]
class Planet:
"""
Represents a planet in the simulation, holding its physical state and properties.
Attributes
----------
name : str
Name or label of the planet.
mass : float
Planet mass (in Msun or simulation units).
pos : Planet.Vector
Current cartesian position (x, y, z) in AU.
vel : Planet.Vector
Current cartesian velocity (vx, vy, vz) in AU/UT.
posi : Planet.Vector
Initial cartesian position (x, y, z) in AU.
mstar : float
Stellar mass (in Msun or simulation units).
Properties
----------
hill_radius : float
The Hill radius of the planet (in AU), computed from its current position and mass.
Example
-------
>>> jupiter = planets[0]
>>> print(jupiter.pos.x, jupiter.vel.y, jupiter.hill_radius)
"""
def __init__(self, name, pos, vel, mass, posi, mstar):
self.name = name
self.mass = mass
self.pos = Planet._Vector(*pos)
self.vel = Planet._Vector(*vel)
self.posi = Planet._Vector(*posi)
self.mstar = mstar
class _Vector:
"""
Simple vector class for 3D coordinates, allowing attribute and index access.
Attributes
----------
x : float
X coordinate.
y : float
Y coordinate.
z : float
Z coordinate.
"""
def __init__(self, x, y, z):
self.x = x
self.y = y
self.z = z
def __getitem__(self, idx):
"""
Return the coordinate at index ``idx`` (0→x, 1→y, 2→z).
"""
return [self.x, self.y, self.z][idx]
def __array__(self):
"""
Expose the vector as a NumPy array for downstream numerical routines.
"""
import numpy as np
return np.array([self.x, self.y, self.z])
def __repr__(self):
"""
Debug-friendly string showing the three Cartesian components.
"""
return f"[{self.x}, {self.y}, {self.z}]"
@property
def hill_radius(self):
"""
.. :no-index:
Returns the Hill radius in AU, using the current position and mass.
Returns
-------
float
Hill radius in AU.
"""
AU_to_cm = 1.495978707e13
Mjup_to_g = 1.898e30
Msun_to_g = 1.989e33
# Distance to star (AU)
r_au = (self.pos.x**2 + self.pos.y**2 + self.pos.z**2) ** 0.5
m_jup = self.mass * 1e3 # Msun to Mjup
mstar_g = float(self.mstar) * Msun_to_g
r_cm = r_au * AU_to_cm
m_p = m_jup * Mjup_to_g
r_hill_cm = r_cm * (m_p / (3 * mstar_g)) ** (1 / 3)
r_hill_au = r_hill_cm / AU_to_cm
return r_hill_au
def __repr__(self):
"""
String representation of the Planet object.
Returns
-------
str
"""
return f"Planet(name={self.name}, mass={self.mass}, pos={self.pos}, vel={self.vel}, posi={self.posi})"