diff --git a/docs/source/pythonapi/deplete.rst b/docs/source/pythonapi/deplete.rst index f112cf8ccff..25fcd898f49 100644 --- a/docs/source/pythonapi/deplete.rst +++ b/docs/source/pythonapi/deplete.rst @@ -287,6 +287,16 @@ the following abstract base classes: abc.SIIntegrator abc.DepSystemSolver +R2S Automation +-------------- + +.. autosummary:: + :toctree: generated + :nosignatures: + :template: myclass.rst + + R2SManager + D1S Functions ------------- diff --git a/docs/source/usersguide/decay_sources.rst b/docs/source/usersguide/decay_sources.rst index d5a078135ba..bf84f53b5b5 100644 --- a/docs/source/usersguide/decay_sources.rst +++ b/docs/source/usersguide/decay_sources.rst @@ -6,42 +6,189 @@ Decay Sources Through the :ref:`depletion ` capabilities in OpenMC, it is possible to simulate radiation emitted from the decay of activated materials. -For fusion energy systems, this is commonly done using what is known as the -`rigorous 2-step `_ (R2S) method. -In this method, a neutron transport calculation is used to determine the neutron -flux and reaction rates over a cell- or mesh-based spatial discretization of the -model. Then, the neutron flux in each discrete region is used to predict the -activated material composition using a depletion solver. Finally, a photon -transport calculation with a source based on the activity and energy spectrum of -the activated materials is used to determine a desired physical response (e.g., -a dose rate) at one or more locations of interest. - -Once a depletion simulation has been completed in OpenMC, the intrinsic decay -source can be determined as follows. First the activated material composition -can be determined using the :class:`openmc.deplete.Results` object. Indexing an -instance of this class with the timestep index returns a -:class:`~openmc.deplete.StepResult` object, which itself has a -:meth:`~openmc.deplete.StepResult.get_material` method. Once the activated -:class:`~openmc.Material` has been obtained, the -:meth:`~openmc.Material.get_decay_photon_energy` method will give the energy -spectrum of the decay photon source. The integral of the spectrum also indicates -the intensity of the source in units of [Bq]. Altogether, the workflow looks as -follows:: - +For fusion energy systems, this is commonly done using either the `rigorous +2-step `_ (R2S) method or the +`direct 1-step `_ (D1S) method. +In the R2S method, a neutron transport calculation is used to determine the +neutron flux and reaction rates over a cell- or mesh-based spatial +discretization of the model. Then, the neutron flux in each discrete region is +used to predict the activated material composition using a depletion solver. +Finally, a photon transport calculation with a source based on the activity and +energy spectrum of the activated materials is used to determine a desired +physical response (e.g., a dose rate) at one or more locations of interest. +OpenMC includes automation for both the R2S and D1S methods as described in the +following sections. + +Rigorous 2-Step (R2S) Calculations +================================== + +OpenMC includes an :class:`openmc.deplete.R2SManager` class that fully automates +cell- and mesh-based R2S calculations. Before we describe this class, it is +useful to understand the basic mechanics of how an R2S calculation works. +Generally, it involves the following steps: + +1. The :meth:`openmc.deplete.get_microxs_and_flux` function is called to run a + neutron transport calculation that determines fluxes and microscopic cross + sections in each activation region. +2. The :class:`openmc.deplete.IndependentOperator` and + :class:`openmc.deplete.PredictorIntegrator` classes are used to carry out a + depletion (activation) calculation in order to determine predicted material + compositions based on a set of timesteps and source rates. +3. The activated material composition is determined using the + :class:`openmc.deplete.Results` class. Indexing an instance of this class + with the timestep index returns a :class:`~openmc.deplete.StepResult` object, + which itself has a :meth:`~openmc.deplete.StepResult.get_material` method + returning an activated material. +4. The :meth:`openmc.Material.get_decay_photon_energy` method is used to obtain + the energy spectrum of the decay photon source. The integral of the spectrum + also indicates the intensity of the source in units of [Bq]. +5. A new photon source is defined using one of OpenMC's source classes with the + energy distribution set equal to the object returned by the + :meth:`openmc.Material.get_decay_photon_energy` method. The source is then + assigned to a photon :class:`~openmc.Model`. +6. A photon transport calculation is run with ``model.run()``. + +Altogether, the workflow looks as follows:: + + # Run neutron transport calculation + fluxes, micros = openmc.deplete.get_microxs_and_flux(model, domains) + + # Run activation calculation + op = openmc.deplete.IndependentOperator(mats, fluxes, micros) + timesteps = ... + source_rates = ... + integrator = openmc.deplete.Integrator(op, timesteps, source_rates) + integrator.integrate() + + # Get decay photon source at last timestep results = openmc.deplete.Results("depletion_results.h5") - - # Get results at last timestep step = results[-1] - - # Get activated material composition for ID=1 activated_mat = step.get_material('1') - - # Determine photon source photon_energy = activated_mat.get_decay_photon_energy() - -By default, the :meth:`~openmc.Material.get_decay_photon_energy` method will -eliminate spectral lines with very low intensity, but this behavior can be -configured with the ``clip_tolerance`` argument. + photon_source = openmc.IndependentSource( + space=..., + energy=photon_energy, + particle='photon', + strength=photon_energy.integral() + ) + + # Run photon transport calculation + model.settings.source = photon_source + model.run() + +Note that by default, the :meth:`~openmc.Material.get_decay_photon_energy` +method will eliminate spectral lines with very low intensity, but this behavior +can be configured with the ``clip_tolerance`` argument. + +Cell-based R2S +-------------- + +In practice, users do not need manually go through each of the steps in an R2S +calculation described above. The :class:`~openmc.deplete.R2SManager` fully +automates the execution of neutron transport, depletion, decay source +generation, and photon transport. For a cell-based R2S calculation, once you +have a :class:`~openmc.Model` that has been defined, simply create an instance +of :class:`~openmc.deplete.R2SManager` by passing the model and a list of cells +to activate:: + + r2s = openmc.deplete.R2SManager(model, [cell1, cell2, cell3]) + +Note that the ``volume`` attribute must be set for any cell that is to be +activated. The :class:`~openmc.deplete.R2SManager` class allows you to +optionally specify a separate photon model; if not given as an argument, it will +create a shallow copy of the original neutron model (available as the +``neutron_model`` attribute) and store it in the ``photon_model`` attribute. We +can use this to define tallies specific to the photon model:: + + dose_tally = openmc.Tally() + ... + r2s.photon_model.tallies = [dose_tally] + +Next, define the timesteps and source rates for the activation calculation:: + + timesteps = [(3.0, 'd'), (5.0, 'h')] + source_rates = [1e12, 0.0] + +In this case, the model is irradiated for 3 days with a source rate of +:math:`10^{12}` neutron/sec and then the source is turned off and the activated +materials are allowed to decay for 5 hours. These parameters should be passed to +the :meth:`~openmc.deplete.R2SManager.run` method to execute the full R2S +calculation. Before we can do that though, for a cell-based calculation, the one +other piece of information that is needed is bounding boxes of the activated +cells:: + + bounding_boxes = { + cell1.id: cell1.bounding_box, + cell2.id: cell2.bounding_box, + cell3.id: cell3.bounding_box + } + +Note that calling the ``bounding_box`` attribute may not work for all +constructive solid geometry regions (for example, a cell that uses a +non-axis-aligned plane). In these cases, the bounding box will need to be +specified manually. Once you have a set of bounding boxes, the R2S calculation +can be run:: + + r2s.run(timesteps, source_rates, bounding_boxes=bounding_boxes) + +If not specified otherwise, a photon transport calculation is run at each time +in the depletion schedule. That means in the case above, we would see three +photon transport calculations. To specify specific times at which photon +transport calculations should be run, pass the ``photon_time_indices`` argument. +For example, if we wanted to run a photon transport calculation only on the last +time (after the 5 day decay), we would run:: + + r2s.run(timesteps, source_rates, bounding_boxes=bounding_boxes, + photon_time_indices=[2]) + +After an R2S calculation has been run, the :class:`~openmc.deplete.R2SManager` +instance will have a ``results`` dictionary that allows you to directly access +results from each of the steps. It will also write out all the output files into +a directory that is named "r2s_/". The ``output_dir`` argument to the +:meth:`~openmc.deplete.R2SManager.run` method enables you to override the +default output directory name if desired. + +The :meth:`~openmc.deplete.R2SManager.run` method actually runs three +lower-level methods under the hood:: + + r2s.step1_neutron_transport(...) + r2s.step2_activation(...) + r2s.step3_photon_transport(...) + +For users looking for more control over the calculation, these lower-level +methods can be used in lieu of the :meth:`openmc.deplete.R2SManager.run` method. + +Mesh-based R2S +-------------- + +Executing a mesh-based R2S calculation looks nearly identical to the cell-based +R2S workflow described above. The only difference is that instead of passing a +list of cells to the ``domains`` argument of +:class:`~openmc.deplete.R2SManager`, you need to define a mesh object and pass +that instead. This might look like the following:: + + # Define a regular Cartesian mesh + mesh = openmc.RegularMesh() + mesh.lower_left = (-50., -50, 0.) + mesh.upper_right = (50., 50., 75.) + mesh.dimension = (10, 10, 5) + + r2s = openmc.deplete.R2SManager(model, mesh) + +Executing the R2S calculation is then performed by adding photon tallies and +calling the :meth:`~openmc.deplete.R2SManager.run` method with the appropriate +timesteps and source rates. Note that in this case we do not need to define cell +volumes or bounding boxes as is required for a cell-based R2S calculation. +Instead, during the neutron transport step, OpenMC will run a raytracing +calculation to determine material volume fractions within each mesh element +using the :meth:`openmc.MeshBase.material_volumes` method. Arguments to this +method can be customized via the ``mat_vol_kwargs`` argument to the +:meth:`~openmc.deplete.R2SManager.run` method. Most often, this would involve +customizing the number of rays traced to obtain better estimates of volumes. As +an example, if we wanted to run the raytracing calculation with 10 million rays, +we would run:: + + r2s.run(timesteps, source_rates, mat_vol_kwargs={'n_samples': 10_000_000}) Direct 1-Step (D1S) Calculations ================================ diff --git a/openmc/deplete/__init__.py b/openmc/deplete/__init__.py index 8a9509e9009..052e2245963 100644 --- a/openmc/deplete/__init__.py +++ b/openmc/deplete/__init__.py @@ -17,6 +17,7 @@ from .results import * from .integrators import * from .transfer_rates import * +from .r2s import * from . import abc from . import cram from . import helpers diff --git a/openmc/deplete/microxs.py b/openmc/deplete/microxs.py index 4ce199f0cc1..e351c923df0 100644 --- a/openmc/deplete/microxs.py +++ b/openmc/deplete/microxs.py @@ -8,8 +8,9 @@ from collections.abc import Sequence import shutil from tempfile import TemporaryDirectory -from typing import Union, TypeAlias +from typing import Union, TypeAlias, Self +import h5py import pandas as pd import numpy as np @@ -20,6 +21,7 @@ import openmc from .chain import Chain, REACTIONS, _get_chain from .coupled_operator import _find_cross_sections, _get_nuclides_with_data +from ..utility_funcs import h5py_file_or_group import openmc.lib from openmc.mpi import comm @@ -47,6 +49,7 @@ def get_microxs_and_flux( reaction_rate_mode: str = 'direct', chain_file: PathLike | Chain | None = None, path_statepoint: PathLike | None = None, + path_input: PathLike | None = None, run_kwargs=None ) -> tuple[list[np.ndarray], list[MicroXS]]: """Generate microscopic cross sections and fluxes for multiple domains. @@ -59,7 +62,7 @@ def get_microxs_and_flux( .. versionadded:: 0.14.0 .. versionchanged:: 0.15.3 - Added `reaction_rate_mode` and `path_statepoint` arguments. + Added `reaction_rate_mode`, `path_statepoint`, `path_input` arguments. Parameters ---------- @@ -90,6 +93,10 @@ def get_microxs_and_flux( Path to write the statepoint file from the neutron transport solve to. By default, The statepoint file is written to a temporary directory and is not kept. + path_input : path-like, optional + Path to write the model XML file from the neutron transport solve to. + By default, the model XML file is written to a temporary directory and + not kept. run_kwargs : dict, optional Keyword arguments passed to :meth:`openmc.Model.run` @@ -108,7 +115,7 @@ def get_microxs_and_flux( check_value('reaction_rate_mode', reaction_rate_mode, {'direct', 'flux'}) # Save any original tallies on the model - original_tallies = model.tallies + original_tallies = list(model.tallies) # Determine what reactions and nuclides are available in chain chain = _get_chain(chain_file) @@ -178,6 +185,10 @@ def get_microxs_and_flux( shutil.move(statepoint_path, path_statepoint) statepoint_path = path_statepoint + # Export the model to path_input if provided + if path_input is not None: + model.export_to_model_xml(path_input) + with StatePoint(statepoint_path) as sp: if reaction_rate_mode == 'direct': rr_tally = sp.tallies[rr_tally.id] @@ -383,8 +394,7 @@ def from_csv(cls, csv_file, **kwargs): MicroXS """ - if 'float_precision' not in kwargs: - kwargs['float_precision'] = 'round_trip' + kwargs.setdefault('float_precision', 'round_trip') df = pd.read_csv(csv_file, **kwargs) df.set_index(['nuclides', 'reactions', 'groups'], inplace=True) @@ -419,3 +429,96 @@ def to_csv(self, *args, **kwargs): ) df = pd.DataFrame({'xs': self.data.flatten()}, index=multi_index) df.to_csv(*args, **kwargs) + + def to_hdf5(self, group_or_filename: h5py.Group | PathLike, **kwargs): + """Export microscopic cross section data to HDF5 format + + Parameters + ---------- + group_or_filename : h5py.Group or path-like + HDF5 group or filename to write to + kwargs : dict, optional + Keyword arguments to pass to :meth:`h5py.Group.create_dataset`. + Defaults to {'compression': 'lzf'}. + + """ + kwargs.setdefault('compression', 'lzf') + + with h5py_file_or_group(group_or_filename, 'w') as group: + # Store cross section data as 3D dataset + group.create_dataset('data', data=self.data, **kwargs) + + # Store metadata as datasets using string encoding + group.create_dataset('nuclides', data=np.array(self.nuclides, dtype='S')) + group.create_dataset('reactions', data=np.array(self.reactions, dtype='S')) + + @classmethod + def from_hdf5(cls, group_or_filename: h5py.Group | PathLike) -> Self: + """Load data from an HDF5 file + + Parameters + ---------- + group_or_filename : h5py.Group or str or PathLike + HDF5 group or path to HDF5 file. If given as an h5py.Group, the + data is read from that group. If given as a string, it is assumed + to be the filename for the HDF5 file. + + Returns + ------- + MicroXS + """ + + with h5py_file_or_group(group_or_filename, 'r') as group: + # Read data from HDF5 group + data = group['data'][:] + nuclides = [nuc.decode('utf-8') for nuc in group['nuclides'][:]] + reactions = [rxn.decode('utf-8') for rxn in group['reactions'][:]] + + return cls(data, nuclides, reactions) + + +def write_microxs_hdf5( + micros: Sequence[MicroXS], + filename: PathLike, + names: Sequence[str] | None = None, + **kwargs +): + """Write multiple MicroXS objects to an HDF5 file + + Parameters + ---------- + micros : list of MicroXS + List of MicroXS objects + filename : PathLike + Output HDF5 filename + names : list of str, optional + Names for each MicroXS object. If None, uses 'domain_0', 'domain_1', + etc. + **kwargs + Additional keyword arguments passed to :meth:`h5py.Group.create_dataset` + """ + if names is None: + names = [f'domain_{i}' for i in range(len(micros))] + + # Open file once and write all domains using group interface + with h5py.File(filename, 'w') as f: + for microxs, name in zip(micros, names): + group = f.create_group(name) + microxs.to_hdf5(group, **kwargs) + + +def read_microxs_hdf5(filename: PathLike) -> dict[str, MicroXS]: + """Read multiple MicroXS objects from an HDF5 file + + Parameters + ---------- + filename : path-like + HDF5 filename + + Returns + ------- + dict + Dictionary mapping domain names to MicroXS objects + """ + with h5py.File(filename, 'r') as f: + return {name: MicroXS.from_hdf5(group) for name, group in f.items()} diff --git a/openmc/deplete/r2s.py b/openmc/deplete/r2s.py new file mode 100644 index 00000000000..4f67ff0b52f --- /dev/null +++ b/openmc/deplete/r2s.py @@ -0,0 +1,679 @@ +from __future__ import annotations +from collections.abc import Sequence +import copy +from datetime import datetime +import json +from pathlib import Path + +import numpy as np +import openmc +from . import IndependentOperator, PredictorIntegrator +from .microxs import get_microxs_and_flux, write_microxs_hdf5, read_microxs_hdf5 +from .results import Results +from ..checkvalue import PathLike + + +def get_activation_materials( + model: openmc.Model, mmv: openmc.MeshMaterialVolumes +) -> openmc.Materials: + """Get a list of activation materials for each mesh element/material. + + When performing a mesh-based R2S calculation, a unique material is needed + for each activation region, which is a combination of a mesh element and a + material within that mesh element. This function generates a list of such + materials, each with a unique name and volume corresponding to the mesh + element and material. + + Parameters + ---------- + model : openmc.Model + The full model containing the geometry and materials. + mmv : openmc.MeshMaterialVolumes + The mesh material volumes object containing the materials and their + volumes for each mesh element. + + Returns + ------- + openmc.Materials + A list of materials, each corresponding to a unique mesh element and + material combination. + + """ + # Get the material ID, volume, and element index for each element-material + # combination + mat_ids = mmv._materials[mmv._materials > -1] + volumes = mmv._volumes[mmv._materials > -1] + elems, _ = np.where(mmv._materials > -1) + + # Get all materials in the model + material_dict = model._get_all_materials() + + # Create a new activation material for each element-material combination + materials = openmc.Materials() + for elem, mat_id, vol in zip(elems, mat_ids, volumes): + mat = material_dict[mat_id] + new_mat = mat.clone() + new_mat.depletable = True + new_mat.name = f'Element {elem}, Material {mat_id}' + new_mat.volume = vol + materials.append(new_mat) + + return materials + + +class R2SManager: + """Manager for Rigorous 2-Step (R2S) method calculations. + + This class is responsible for managing the materials and sources needed for + mesh-based or cell-based R2S calculations. It provides methods to get + activation materials and decay photon sources based on the mesh/cells and + materials in the OpenMC model. + + This class supports the use of a different models for the neutron and photon + transport calculation. However, for cell-based calculations, it assumes that + the only changes in the model are material assignments. For mesh-based + calculations, it checks material assignments in the photon model and any + element--material combinations that don't appear in the photon model are + skipped. + + Parameters + ---------- + neutron_model : openmc.Model + The OpenMC model to use for neutron transport. + domains : openmc.MeshBase or Sequence[openmc.Cell] + The mesh or a sequence of cells that represent the spatial units over + which the R2S calculation will be performed. + photon_model : openmc.Model, optional + The OpenMC model to use for photon transport calculations. If None, a + shallow copy of the neutron_model will be created and used. + + Attributes + ---------- + domains : openmc.MeshBase or Sequence[openmc.Cell] + The mesh or a sequence of cells that represent the spatial units over + which the R2S calculation will be performed. + neutron_model : openmc.Model + The OpenMC model used for neutron transport. + photon_model : openmc.Model + The OpenMC model used for photon transport calculations. + method : {'mesh-based', 'cell-based'} + Indicates whether the R2S calculation uses mesh elements ('mesh-based') + as the spatial discetization or a list of a cells ('cell-based'). + results : dict + A dictionary that stores results from the R2S calculation. + + """ + def __init__( + self, + neutron_model: openmc.Model, + domains: openmc.MeshBase | Sequence[openmc.Cell], + photon_model: openmc.Model | None = None, + ): + self.neutron_model = neutron_model + if photon_model is None: + # Create a shallow copy of the neutron model for photon transport + self.photon_model = openmc.Model( + geometry=copy.copy(neutron_model.geometry), + materials=copy.copy(neutron_model.materials), + settings=copy.copy(neutron_model.settings), + tallies=copy.copy(neutron_model.tallies), + plots=copy.copy(neutron_model.plots), + ) + else: + self.photon_model = photon_model + if isinstance(domains, openmc.MeshBase): + self.method = 'mesh-based' + else: + self.method = 'cell-based' + self.domains = domains + self.results = {} + + def run( + self, + timesteps: Sequence[float] | Sequence[tuple[float, str]], + source_rates: float | Sequence[float], + timestep_units: str = 's', + photon_time_indices: Sequence[int] | None = None, + output_dir: PathLike | None = None, + bounding_boxes: dict[int, openmc.BoundingBox] | None = None, + chain_file: PathLike | None = None, + micro_kwargs: dict | None = None, + mat_vol_kwargs: dict | None = None, + run_kwargs: dict | None = None, + operator_kwargs: dict | None = None, + ): + """Run the R2S calculation. + + Parameters + ---------- + timesteps : Sequence[float] or Sequence[tuple[float, str]] + Sequence of timesteps. Note that values are not cumulative. The + units are specified by the `timestep_units` argument when + `timesteps` is an iterable of float. Alternatively, units can be + specified for each step by passing an iterable of (value, unit) + tuples. + source_rates : float or Sequence[float] + Source rate in [neutron/sec] for each interval in `timesteps`. + timestep_units : {'s', 'min', 'h', 'd', 'a'}, optional + Units for values specified in the `timesteps` argument when passing + float values. 's' means seconds, 'min' means minutes, 'h' means + hours, 'd' means days, and 'a' means years (Julian). + photon_time_indices : Sequence[int], optional + Sequence of time indices at which photon transport should be run; + represented as indices into the array of times formed by the + timesteps. For example, if two timesteps are specified, the array of + times would contain three entries, and [2] would indicate computing + photon results at the last time. A value of None indicates to run + photon transport for each time. + output_dir : PathLike, optional + Path to directory where R2S calculation outputs will be saved. If + not provided, a timestamped directory 'r2s_YYYY-MM-DDTHH-MM-SS' is + created. Subdirectories will be created for the neutron transport, + activation, and photon transport steps. + bounding_boxes : dict[int, openmc.BoundingBox], optional + Dictionary mapping cell IDs to bounding boxes used for spatial + source sampling in cell-based R2S calculations. Required if method + is 'cell-based'. + chain_file : PathLike, optional + Path to the depletion chain XML file to use during activation. If + not provided, the default configured chain file will be used. + micro_kwargs : dict, optional + Additional keyword arguments passed to + :func:`openmc.deplete.get_microxs_and_flux` during the neutron + transport step. + mat_vol_kwargs : dict, optional + Additional keyword arguments passed to + :meth:`openmc.MeshBase.material_volumes`. + run_kwargs : dict, optional + Additional keyword arguments passed to :meth:`openmc.Model.run` + during the neutron and photon transport step. By default, output is + disabled. + operator_kwargs : dict, optional + Additional keyword arguments passed to + :class:`openmc.deplete.IndependentOperator`. + + Returns + ------- + Path + Path to the output directory containing all calculation results + """ + + if output_dir is None: + stamp = datetime.now().strftime('%Y-%m-%dT%H-%M-%S') + output_dir = Path(f'r2s_{stamp}') + + # Set run_kwargs for the neutron transport step + if micro_kwargs is None: + micro_kwargs = {} + if run_kwargs is None: + run_kwargs = {} + if operator_kwargs is None: + operator_kwargs = {} + run_kwargs.setdefault('output', False) + micro_kwargs.setdefault('run_kwargs', run_kwargs) + # If a chain file is provided, prefer it for steps 1 and 2 + if chain_file is not None: + micro_kwargs.setdefault('chain_file', chain_file) + operator_kwargs.setdefault('chain_file', chain_file) + + self.step1_neutron_transport( + output_dir / 'neutron_transport', mat_vol_kwargs, micro_kwargs + ) + self.step2_activation( + timesteps, source_rates, timestep_units, output_dir / 'activation', + operator_kwargs=operator_kwargs + ) + self.step3_photon_transport( + photon_time_indices, bounding_boxes, output_dir / 'photon_transport', + mat_vol_kwargs=mat_vol_kwargs, run_kwargs=run_kwargs + ) + + return output_dir + + def step1_neutron_transport( + self, + output_dir: PathLike = "neutron_transport", + mat_vol_kwargs: dict | None = None, + micro_kwargs: dict | None = None + ): + """Run the neutron transport step. + + This step computes the material volume fractions on the mesh, creates a + mesh-material filter, and retrieves the fluxes and microscopic cross + sections for each mesh/material combination. This step will populate the + 'fluxes' and 'micros' keys in the results dictionary. For a mesh-based + calculation, it will also populate the 'mesh_material_volumes' key. + + Parameters + ---------- + output_dir : PathLike, optional + The directory where the results will be saved. + mat_vol_kwargs : dict, optional + Additional keyword arguments based to + :meth:`openmc.MeshBase.material_volumes`. + micro_kwargs : dict, optional + Additional keyword arguments passed to + :func:`openmc.deplete.get_microxs_and_flux`. + + """ + + output_dir = Path(output_dir) + output_dir.mkdir(parents=True, exist_ok=True) + + if self.method == 'mesh-based': + # Compute material volume fractions on the mesh + if mat_vol_kwargs is None: + mat_vol_kwargs = {} + self.results['mesh_material_volumes'] = mmv = \ + self.domains.material_volumes(self.neutron_model, **mat_vol_kwargs) + + # Save results to file + mmv.save(output_dir / 'mesh_material_volumes.npz') + + # Create mesh-material filter based on what combos were found + domains = openmc.MeshMaterialFilter.from_volumes(self.domains, mmv) + else: + domains: Sequence[openmc.Cell] = self.domains + + # Check to make sure that each cell is filled with a material and + # that the volume has been set + + # TODO: If volumes are not set, run volume calculation for cells + for cell in domains: + if cell.fill is None: + raise ValueError( + f"Cell {cell.id} is not filled with a materials. " + "Please set the fill material for each cell before " + "running the R2S calculation." + ) + if cell.volume is None: + raise ValueError( + f"Cell {cell.id} does not have a volume set. " + "Please set the volume for each cell before running " + "the R2S calculation." + ) + + # Set default keyword arguments for microxs and flux calculation + if micro_kwargs is None: + micro_kwargs = {} + micro_kwargs.setdefault('path_statepoint', output_dir / 'statepoint.h5') + micro_kwargs.setdefault('path_input', output_dir / 'model.xml') + + # Run neutron transport and get fluxes and micros + self.results['fluxes'], self.results['micros'] = get_microxs_and_flux( + self.neutron_model, domains, **micro_kwargs) + + # Save flux and micros to file + np.save(output_dir / 'fluxes.npy', self.results['fluxes']) + write_microxs_hdf5(self.results['micros'], output_dir / 'micros.h5') + + def step2_activation( + self, + timesteps: Sequence[float] | Sequence[tuple[float, str]], + source_rates: float | Sequence[float], + timestep_units: str = 's', + output_dir: PathLike = 'activation', + operator_kwargs: dict | None = None, + ): + """Run the activation step. + + This step creates a unique copy of each activation material based on the + mesh elements or cells, then solves the depletion equations for each + material using the fluxes and microscopic cross sections obtained in the + neutron transport step. This step will populate the 'depletion_results' + and 'activation_materials' keys in the results dictionary. + + Parameters + ---------- + timesteps : Sequence[float] or Sequence[tuple[float, str]] + Sequence of timesteps. Note that values are not cumulative. The + units are specified by the `timestep_units` argument when + `timesteps` is an iterable of float. Alternatively, units can be + specified for each step by passing an iterable of (value, unit) + tuples. + source_rates : float | Sequence[float] + Source rate in [neutron/sec] for each interval in `timesteps`. + timestep_units : {'s', 'min', 'h', 'd', 'a'}, optional + Units for values specified in the `timesteps` argument when passing + float values. 's' means seconds, 'min' means minutes, 'h' means + hours, 'd' means days, and 'a' means years (Julian). + output_dir : PathLike, optional + Path to directory where activation calculation outputs will be + saved. + operator_kwargs : dict, optional + Additional keyword arguments passed to + :class:`openmc.deplete.IndependentOperator`. + """ + + if self.method == 'mesh-based': + # Get unique material for each (mesh, material) combination + mmv = self.results['mesh_material_volumes'] + self.results['activation_materials'] = get_activation_materials(self.neutron_model, mmv) + else: + # Create unique material for each cell + activation_mats = openmc.Materials() + for cell in self.domains: + mat = cell.fill.clone() + mat.name = f'Cell {cell.id}' + mat.depletable = True + mat.volume = cell.volume + activation_mats.append(mat) + self.results['activation_materials'] = activation_mats + + # Save activation materials to file + output_dir = Path(output_dir) + output_dir.mkdir(parents=True, exist_ok=True) + self.results['activation_materials'].export_to_xml( + output_dir / 'materials.xml') + + # Create depletion operator for the activation materials + if operator_kwargs is None: + operator_kwargs = {} + operator_kwargs.setdefault('normalization_mode', 'source-rate') + op = IndependentOperator( + self.results['activation_materials'], + self.results['fluxes'], + self.results['micros'], + **operator_kwargs + ) + + # Create time integrator and solve depletion equations + integrator = PredictorIntegrator( + op, timesteps, source_rates=source_rates, timestep_units=timestep_units + ) + output_path = output_dir / 'depletion_results.h5' + integrator.integrate(final_step=False, path=output_path) + + # Get depletion results + self.results['depletion_results'] = Results(output_path) + + def step3_photon_transport( + self, + time_indices: Sequence[int] | None = None, + bounding_boxes: dict[int, openmc.BoundingBox] | None = None, + output_dir: PathLike = 'photon_transport', + mat_vol_kwargs: dict | None = None, + run_kwargs: dict | None = None, + ): + """Run the photon transport step. + + This step performs photon transport calculations using decay photon + sources created from the activated materials. For each specified time, + it creates appropriate photon sources and runs a transport calculation. + In mesh-based mode, the sources are created using the mesh material + volumes, while in cell-based mode, they are created using bounding boxes + for each cell. This step will populate the 'photon_tallies' key in the + results dictionary. + + Parameters + ---------- + time_indices : Sequence[int], optional + Sequence of time indices at which photon transport should be run; + represented as indices into the array of times formed by the + timesteps. For example, if two timesteps are specified, the array of + times would contain three entries, and [2] would indicate computing + photon results at the last time. A value of None indicates to run + photon transport for each time. + bounding_boxes : dict[int, openmc.BoundingBox], optional + Dictionary mapping cell IDs to bounding boxes used for spatial + source sampling in cell-based R2S calculations. Required if method + is 'cell-based'. + output_dir : PathLike, optional + Path to directory where photon transport outputs will be saved. + mat_vol_kwargs : dict, optional + Additional keyword arguments passed to + :meth:`openmc.MeshBase.material_volumes`. + run_kwargs : dict, optional + Additional keyword arguments passed to :meth:`openmc.Model.run` + during the photon transport step. By default, output is disabled. + """ + + # TODO: Automatically determine bounding box for each cell + if bounding_boxes is None and self.method == 'cell-based': + raise ValueError("bounding_boxes must be provided for cell-based " + "R2S calculations.") + + # Set default run arguments if not provided + if run_kwargs is None: + run_kwargs = {} + run_kwargs.setdefault('output', False) + + # Write out JSON file with tally IDs that can be used for loading + # results + output_dir = Path(output_dir) + output_dir.mkdir(parents=True, exist_ok=True) + + # Get default time indices if not provided + if time_indices is None: + n_steps = len(self.results['depletion_results']) + time_indices = list(range(n_steps)) + + # Check whether the photon model is different + neutron_univ = self.neutron_model.geometry.root_universe + photon_univ = self.photon_model.geometry.root_universe + different_photon_model = (neutron_univ != photon_univ) + + # For mesh-based calculations, compute material volume fractions for the + # photon model if it is different from the neutron model to account for + # potential material changes + if self.method == 'mesh-based' and different_photon_model: + self.results['mesh_material_volumes_photon'] = photon_mmv = \ + self.domains.material_volumes(self.photon_model, **mat_vol_kwargs) + + # Save photon MMV results to file + photon_mmv.save(output_dir / 'mesh_material_volumes.npz') + + tally_ids = [tally.id for tally in self.photon_model.tallies] + with open(output_dir / 'tally_ids.json', 'w') as f: + json.dump(tally_ids, f) + + self.results['photon_tallies'] = {} + + # Get dictionary of cells in the photon model + if different_photon_model: + photon_cells = self.photon_model.geometry.get_all_cells() + + for time_index in time_indices: + # Create decay photon source + if self.method == 'mesh-based': + self.photon_model.settings.source = \ + self.get_decay_photon_source_mesh(time_index) + else: + sources = [] + results = self.results['depletion_results'] + for cell, original_mat in zip(self.domains, self.results['activation_materials']): + # Skip if the cell is not in the photon model or the + # material has changed + if different_photon_model: + if cell.id not in photon_cells or \ + cell.fill.id != photon_cells[cell.id].fill.id: + continue + + # Get bounding box for the cell + bounding_box = bounding_boxes[cell.id] + + # Get activated material composition + activated_mat = results[time_index].get_material(str(original_mat.id)) + + # Create decay photon source source + space = openmc.stats.Box(*bounding_box) + energy = activated_mat.get_decay_photon_energy() + strength = energy.integral() if energy is not None else 0.0 + source = openmc.IndependentSource( + space=space, + energy=energy, + particle='photon', + strength=strength, + constraints={'domains': [cell]} + ) + sources.append(source) + self.photon_model.settings.source = sources + + # Convert time_index (which may be negative) to a normal index + if time_index < 0: + time_index = len(self.results['depletion_results']) + time_index + + # Run photon transport calculation + run_kwargs['cwd'] = Path(output_dir) / f'time_{time_index}' + statepoint_path = self.photon_model.run(**run_kwargs) + + # Store tally results + with openmc.StatePoint(statepoint_path) as sp: + self.results['photon_tallies'][time_index] = [ + sp.tallies[tally.id] for tally in self.photon_model.tallies + ] + + def get_decay_photon_source_mesh( + self, + time_index: int = -1 + ) -> list[openmc.MeshSource]: + """Create decay photon source for a mesh-based calculation. + + This function creates N :class:`MeshSource` objects where N is the + maximum number of unique materials that appears in a single mesh + element. For each mesh element-material combination, and + IndependentSource instance is created with a spatial constraint limited + the sampled decay photons to the correct region. + + When the photon transport model is different from the neutron model, the + photon MeshMaterialVolumes is used to determine whether an (element, + material) combination exists in the photon model. + + Parameters + ---------- + time_index : int, optional + Time index for the decay photon source. Default is -1 (last time). + + Returns + ------- + list of openmc.MeshSource + A list of MeshSource objects, each containing IndependentSource + instances for the decay photons in the corresponding mesh element. + + """ + mat_dict = self.neutron_model._get_all_materials() + + # Some MeshSource objects will have empty positions; create a "null source" + # that is used for this case + null_source = openmc.IndependentSource(particle='photon', strength=0.0) + + # List to hold sources for each MeshSource (length = N) + source_lists = [] + + # Index in the overall list of activated materials + index_mat = 0 + + # Get various results from previous steps + mat_vols = self.results['mesh_material_volumes'] + materials = self.results['activation_materials'] + results = self.results['depletion_results'] + photon_mat_vols = self.results.get('mesh_material_volumes_photon') + + # Total number of mesh elements + n_elements = mat_vols.num_elements + + for index_elem in range(n_elements): + # Determine which materials exist in the photon model for this element + if photon_mat_vols is not None: + photon_materials = { + mat_id + for mat_id, _ in photon_mat_vols.by_element(index_elem) + if mat_id is not None + } + + for j, (mat_id, _) in enumerate(mat_vols.by_element(index_elem)): + # Skip void volume + if mat_id is None: + continue + + # Skip if this material doesn't exist in photon model + if photon_mat_vols is not None and mat_id not in photon_materials: + index_mat += 1 + continue + + # Check whether a new MeshSource object is needed + if j >= len(source_lists): + source_lists.append([null_source]*n_elements) + + # Get activated material composition + original_mat = materials[index_mat] + activated_mat = results[time_index].get_material(str(original_mat.id)) + + # Create decay photon source source + energy = activated_mat.get_decay_photon_energy() + strength = energy.integral() if energy is not None else 0.0 + source_lists[j][index_elem] = openmc.IndependentSource( + energy=energy, + particle='photon', + strength=strength, + constraints={'domains': [mat_dict[mat_id]]} + ) + + # Increment index of activated material + index_mat += 1 + + # Return list of mesh sources + return [openmc.MeshSource(self.domains, sources) for sources in source_lists] + + def load_results(self, path: PathLike): + """Load results from a previous R2S calculation. + + Parameters + ---------- + path : PathLike + Path to the directory containing the R2S calculation results. + + """ + path = Path(path) + + # Load neutron transport results + neutron_dir = path / 'neutron_transport' + if self.method == 'mesh-based': + mmv_file = neutron_dir / 'mesh_material_volumes.npz' + if mmv_file.exists(): + self.results['mesh_material_volumes'] = \ + openmc.MeshMaterialVolumes.from_npz(mmv_file) + fluxes_file = neutron_dir / 'fluxes.npy' + if fluxes_file.exists(): + self.results['fluxes'] = list(np.load(fluxes_file, allow_pickle=True)) + micros_dict = read_microxs_hdf5(neutron_dir / 'micros.h5') + self.results['micros'] = [ + micros_dict[f'domain_{i}'] for i in range(len(micros_dict)) + ] + + # Load activation results + activation_dir = path / 'activation' + activation_results = activation_dir / 'depletion_results.h5' + if activation_results.exists(): + self.results['depletion_results'] = Results(activation_results) + activation_mats_file = activation_dir / 'materials.xml' + if activation_mats_file.exists(): + self.results['activation_materials'] = \ + openmc.Materials.from_xml(activation_mats_file) + + # Load photon transport results + photon_dir = path / 'photon_transport' + + # Load photon mesh material volumes if they exist (for mesh-based calculations) + if self.method == 'mesh-based': + photon_mmv_file = photon_dir / 'mesh_material_volumes.npz' + if photon_mmv_file.exists(): + self.results['mesh_material_volumes_photon'] = \ + openmc.MeshMaterialVolumes.from_npz(photon_mmv_file) + + # Load tally IDs from JSON file + tally_ids_path = photon_dir / 'tally_ids.json' + if tally_ids_path.exists(): + with tally_ids_path.open('r') as f: + tally_ids = json.load(f) + self.results['photon_tallies'] = {} + + # For each photon transport calc, load the statepoint and get the + # tally results based on tally_ids + for time_dir in photon_dir.glob('time_*'): + time_index = int(time_dir.name.split('_')[1]) + for sp_path in time_dir.glob('statepoint.*.h5'): + with openmc.StatePoint(sp_path) as sp: + self.results['photon_tallies'][time_index] = [ + sp.tallies[tally_id] for tally_id in tally_ids + ] diff --git a/openmc/mesh.py b/openmc/mesh.py index 2e9abd1b65c..d34691bc7bd 100644 --- a/openmc/mesh.py +++ b/openmc/mesh.py @@ -317,20 +317,10 @@ def get_homogenized_materials( vols = self.material_volumes(model, n_samples, **kwargs) mat_volume_by_element = [vols.by_element(i) for i in range(vols.num_elements)] - # Create homogenized material for each element - materials = model.geometry.get_all_materials() - - # Account for materials in DAGMC universes - # TODO: This should really get incorporated in lower-level calls to - # get_all_materials, but right now it requires information from the - # Model object - for cell in model.geometry.get_all_cells().values(): - if isinstance(cell.fill, openmc.DAGMCUniverse): - names = cell.fill.material_names - materials.update({ - mat.id: mat for mat in model.materials if mat.name in names - }) + # Get dictionary of all materials + materials = model._get_all_materials() + # Create homogenized material for each element homogenized_materials = [] for mat_volume_list in mat_volume_by_element: material_ids, volumes = [list(x) for x in zip(*mat_volume_list)] @@ -402,7 +392,7 @@ def material_volumes( # In order to get mesh into model, we temporarily replace the # tallies with a single mesh tally using the current mesh - original_tallies = model.tallies + original_tallies = list(model.tallies) new_tally = openmc.Tally() new_tally.filters = [openmc.MeshFilter(self)] new_tally.scores = ['flux'] diff --git a/openmc/model/model.py b/openmc/model/model.py index 59e6fa511e5..66a0253341c 100644 --- a/openmc/model/model.py +++ b/openmc/model/model.py @@ -203,6 +203,29 @@ def _materials_by_name(self) -> dict[int, openmc.Material]: result[mat.name].add(mat) return result + # TODO: This should really get incorporated in lower-level calls to + # get_all_materials, but right now it requires information from the Model object + def _get_all_materials(self) -> dict[int, openmc.Material]: + """Get all materials including those in DAGMC universes + + Returns + ------- + dict + Dictionary mapping material ID to material instances + """ + # Get all materials from the Geometry object + materials = self.geometry.get_all_materials() + + # Account for materials in DAGMC universes + for cell in self.geometry.get_all_cells().values(): + if isinstance(cell.fill, openmc.DAGMCUniverse): + names = cell.fill.material_names + materials.update({ + mat.id: mat for mat in self.materials if mat.name in names + }) + + return materials + @classmethod def from_xml( cls, diff --git a/openmc/utility_funcs.py b/openmc/utility_funcs.py index da9f73b1651..935a589853d 100644 --- a/openmc/utility_funcs.py +++ b/openmc/utility_funcs.py @@ -3,6 +3,8 @@ from pathlib import Path from tempfile import TemporaryDirectory +import h5py + import openmc from .checkvalue import PathLike @@ -57,3 +59,20 @@ def input_path(filename: PathLike) -> Path: return Path(filename).resolve() else: return Path(filename) + + +@contextmanager +def h5py_file_or_group(group_or_filename: PathLike | h5py.Group, *args, **kwargs): + """Context manager for opening an HDF5 file or using an existing group + + Parameters + ---------- + group_or_filename : path-like or h5py.Group + Path to HDF5 file, or group from an existing HDF5 file + + """ + if isinstance(group_or_filename, h5py.Group): + yield group_or_filename + else: + with h5py.File(group_or_filename, *args, **kwargs) as f: + yield f diff --git a/tests/unit_tests/test_r2s.py b/tests/unit_tests/test_r2s.py new file mode 100644 index 00000000000..a94f85c8c08 --- /dev/null +++ b/tests/unit_tests/test_r2s.py @@ -0,0 +1,152 @@ +from pathlib import Path + +import pytest +import openmc +from openmc.deplete import Chain, R2SManager + + +@pytest.fixture +def simple_model_and_mesh(tmp_path): + # Define two materials: water and Ni + h2o = openmc.Material() + h2o.add_nuclide("H1", 2.0) + h2o.add_nuclide("O16", 1.0) + h2o.set_density("g/cm3", 1.0) + nickel = openmc.Material() + nickel.add_element("Ni", 1.0) + nickel.set_density("g/cm3", 4.0) + + # Geometry: two half-spaces split by x=0 plane + left = openmc.XPlane(0.0) + x_min = openmc.XPlane(-10.0, boundary_type='vacuum') + x_max = openmc.XPlane(10.0, boundary_type='vacuum') + y_min = openmc.YPlane(-10.0, boundary_type='vacuum') + y_max = openmc.YPlane(10.0, boundary_type='vacuum') + z_min = openmc.ZPlane(-10.0, boundary_type='vacuum') + z_max = openmc.ZPlane(10.0, boundary_type='vacuum') + + c1 = openmc.Cell(fill=h2o, region=+x_min & -left & +y_min & -y_max & +z_min & -z_max) + c2 = openmc.Cell(fill=nickel, region=+left & -x_max & +y_min & -y_max & +z_min & -z_max) + c1.volume = 4000.0 + c2.volume = 4000.0 + geometry = openmc.Geometry([c1, c2]) + + # Simple settings with a point source + settings = openmc.Settings() + settings.batches = 10 + settings.particles = 1000 + settings.run_mode = 'fixed source' + settings.source = openmc.IndependentSource() + model = openmc.Model(geometry, settings=settings) + + mesh = openmc.RegularMesh() + mesh.lower_left = (-10.0, -10.0, -10.0) + mesh.upper_right = (10.0, 10.0, 10.0) + mesh.dimension = (1, 1, 1) + return model, (c1, c2), mesh + + +def test_r2s_mesh_expected_output(simple_model_and_mesh, tmp_path): + model, (c1, c2), mesh = simple_model_and_mesh + + # Use mesh-based domains + r2s = R2SManager(model, mesh) + + # Use custom reduced chain file for Ni + chain = Chain.from_xml(Path(__file__).parents[1] / "chain_ni.xml") + + # Run R2S calculation + outdir = r2s.run( + timesteps=[(1.0, 'd')], + source_rates=[1.0], + photon_time_indices=[1], + output_dir=tmp_path, + chain_file=chain, + ) + + # Check directories and files exist + nt = Path(outdir) / 'neutron_transport' + assert (nt / 'fluxes.npy').exists() + assert (nt / 'micros.h5').exists() + assert (nt / 'mesh_material_volumes.npz').exists() + act = Path(outdir) / 'activation' + assert (act / 'depletion_results.h5').exists() + pt = Path(outdir) / 'photon_transport' + assert (pt / 'tally_ids.json').exists() + assert (pt / 'time_1' / 'statepoint.10.h5').exists() + + # Basic results structure checks + assert len(r2s.results['fluxes']) == 2 + assert len(r2s.results['micros']) == 2 + assert len(r2s.results['mesh_material_volumes']) == 2 + assert len(r2s.results['activation_materials']) == 2 + assert len(r2s.results['depletion_results']) == 2 + + # Check activation materials + amats = r2s.results['activation_materials'] + assert all(m.depletable for m in amats) + # Volumes preserved + assert {m.volume for m in amats} == {c1.volume, c2.volume} + + # Check loading results + r2s_loaded = R2SManager(model, mesh) + r2s_loaded.load_results(outdir) + assert len(r2s_loaded.results['fluxes']) == 2 + assert len(r2s_loaded.results['micros']) == 2 + assert len(r2s_loaded.results['mesh_material_volumes']) == 2 + assert len(r2s_loaded.results['activation_materials']) == 2 + assert len(r2s_loaded.results['depletion_results']) == 2 + + +def test_r2s_cell_expected_output(simple_model_and_mesh, tmp_path): + model, (c1, c2), _ = simple_model_and_mesh + + # Use cell-based domains + r2s = R2SManager(model, [c1, c2]) + + # Use custom reduced chain file for Ni + chain = Chain.from_xml(Path(__file__).parents[1] / "chain_ni.xml") + + # Run R2S calculation + bounding_boxes = {c1.id: c1.bounding_box, c2.id: c2.bounding_box} + outdir = r2s.run( + timesteps=[(1.0, 'd')], + source_rates=[1.0], + photon_time_indices=[1], + output_dir=tmp_path, + bounding_boxes=bounding_boxes, + chain_file=chain + ) + + # Check directories and files exist + nt = Path(outdir) / 'neutron_transport' + assert (nt / 'fluxes.npy').exists() + assert (nt / 'micros.h5').exists() + act = Path(outdir) / 'activation' + assert (act / 'depletion_results.h5').exists() + pt = Path(outdir) / 'photon_transport' + assert (pt / 'tally_ids.json').exists() + assert (pt / 'time_1' / 'statepoint.10.h5').exists() + + # Basic results structure checks + assert len(r2s.results['fluxes']) == 2 + assert len(r2s.results['micros']) == 2 + assert len(r2s.results['activation_materials']) == 2 + assert len(r2s.results['depletion_results']) == 2 + + # Check activation materials + amats = r2s.results['activation_materials'] + assert all(m.depletable for m in amats) + # Names include cell IDs + assert any(f"Cell {c1.id}" in m.name for m in amats) + assert any(f"Cell {c2.id}" in m.name for m in amats) + # Volumes preserved + assert {m.volume for m in amats} == {c1.volume, c2.volume} + + # Check loading results + r2s_loaded = R2SManager(model, [c1, c2]) + r2s_loaded.load_results(outdir) + assert len(r2s_loaded.results['fluxes']) == 2 + assert len(r2s_loaded.results['micros']) == 2 + assert len(r2s_loaded.results['activation_materials']) == 2 + assert len(r2s_loaded.results['depletion_results']) == 2