diff --git a/pyproject.toml b/pyproject.toml index de6c58b..4174be8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,6 +26,7 @@ dependencies = [ "pandas", "jsonargparse", "scipy", + "actigamma", ] [project.optional-dependencies] tests = [ diff --git a/src/f4epurity/decay_chain_calc.py b/src/f4epurity/decay_chain_calc.py index 5dd0f09..e2819c7 100644 --- a/src/f4epurity/decay_chain_calc.py +++ b/src/f4epurity/decay_chain_calc.py @@ -1,6 +1,6 @@ +import math from copy import deepcopy -import math import numpy as np from f4epurity.utilities import add_user_irrad_scenario, convert_names @@ -42,6 +42,10 @@ + [3920, 400] * 3, "fluxes": [0.00536, 0.0412, 0, 0.083] + [0, 1] * 17 + [0, 1.4] * 3, }, + "Y1": { + "times": [365 * DAY_TO_SEC], + "fluxes": [1], + }, } @@ -105,7 +109,6 @@ def irradiate( decay_data_dic[nuclide]["lambda"][1:], decay_data_dic[nuclide]["Decay_daughter_names"][1:], ): - # Tool cannot handle fission or very long lived >1e19 s half-lives, therefore treat these as stable. if decay_type == "f" or ("BR" in decay_data_dic[nuclide] and lambd < 1e-20): continue @@ -168,14 +171,12 @@ def get_nuclides(scenario: dict, parent: str, decay_data_dic: dict, atoms: int) # Loop over each time step in the irradiation and decay for time, flux in zip(scenario["times"], scenario["fluxes"]): - # initialise a dictionary of nuclides which will occur decay_data_dic[parent]["lambda"] = [flux * x for x in initial_lambda] sum_nuclides = {} # Loop over each of the nuclides that is present at the start of the pulse for nuclide in init_nuclides: - # Set the first lambda in the decay chain to the lambda of this nuclide lambda_temp[0] = decay_data_dic[nuclide]["lambda"][0] @@ -268,7 +269,7 @@ def create_dictionary(decay_data: dict, parent: str, daughters: list) -> dict: def calculate_total_activity( nuclide_dict: dict, irrad_scenario: dict, decay_time: int, decay_data: dict -) -> dict: +) -> dict[str, list[np.ndarray]]: """Function to calculate the total activity of a given nuclide dictionary Parameters @@ -284,7 +285,7 @@ def calculate_total_activity( Returns ------- - dict + dict[str, list[np.ndarray]] dictionary containing the total activity for each nuclide in the nuclide dictionary """ diff --git a/src/f4epurity/dose.py b/src/f4epurity/dose.py index 3d7a4a3..36c5aa3 100644 --- a/src/f4epurity/dose.py +++ b/src/f4epurity/dose.py @@ -1,10 +1,10 @@ +import logging import os import sys +from importlib.resources import as_file, files from math import pi from pathlib import Path -from importlib.resources import files, as_file -import logging import matplotlib.pyplot as plt import numpy as np import pyevtk @@ -102,7 +102,6 @@ def dose_from_line_source(dose, x1, y1, z1, x2, y2, z2, x, y, z): def is_within_bounds( x1, y1, z1, flux_grid_file, stl_files_path=None, x2=None, y2=None, z2=None ): - # If a path to the STL files is provided, use it if stl_files_path is not None: folder_path = Path(stl_files_path) @@ -178,7 +177,6 @@ def write_vtk_file( z2=None, output_all_vtr=False, ): - # Get the bounds in which to make the plot plot_bounds = is_within_bounds( x1, y1, z1, flux_grid_file, stl_files_path, x2, y2, z2 @@ -227,17 +225,23 @@ def write_vtk_file( ) dose_array[i, j, k] = dose_point - # Create a 3D numpy array filled with the dose values based on 1/r^2 - else: - for i in range(len(x) - 1): - for j in range(len(y) - 1): - for k in range(len(z) - 1): + else: # Point source case + # get midpoints + x_mid = (x[1:] + x[:-1]) / 2 + y_mid = (y[1:] + y[:-1]) / 2 + z_mid = (z[1:] + z[:-1]) / 2 + # Create a 3D numpy array filled with the dose values based on 1/r^2 (on middle point source model) + for i in range(len(x_mid)): + for j in range(len(y_mid)): + for k in range(len(z_mid)): distance = np.sqrt( - (x[i] - x1) ** 2 + (y[j] - y1) ** 2 + (z[k] - z1) ** 2 + (x_mid[i] - x1) ** 2 + + (y_mid[j] - y1) ** 2 + + (z_mid[k] - z1) ** 2 ) # Avoid division by zero if distance == 0: - dose_array[i, j, k] = dose[0] + dose_array[i, j, k] = dose[0] / (4 * pi) else: dose_array[i, j, k] = dose[0] / (4 * pi * distance**2) @@ -248,7 +252,6 @@ def write_vtk_file( x_max = x[indices[0]] y_max = y[indices[1]] z_max = z[indices[2]] - # Create the output directory if it doesn't exist os.makedirs("output", exist_ok=True) @@ -265,7 +268,6 @@ def write_vtk_file( def plot_slice(dose_array, x, y, z, slice_axis, slice_location, plot_bounds): - # Map axis names to indices axis_dict = {"x": 0, "y": 1, "z": 2} @@ -352,8 +354,9 @@ def format_func(value, tick_number): # Loop over each solid in the stl file and plot the slice for solid in stl.solids: slice = solid.slice(plane=slice_axis, intercept=slice_location) - pairs = getattr(slice, axis_pairs[slice_axis][0] + "_pairs"), getattr( - slice, axis_pairs[slice_axis][1] + "_pairs" + pairs = ( + getattr(slice, axis_pairs[slice_axis][0] + "_pairs"), + getattr(slice, axis_pairs[slice_axis][1] + "_pairs"), ) for pair in zip(*pairs): ax.plot(*pair, color="black", linewidth=1) diff --git a/src/f4epurity/main.py b/src/f4epurity/main.py index 2fd58b6..6987134 100644 --- a/src/f4epurity/main.py +++ b/src/f4epurity/main.py @@ -1,30 +1,32 @@ -import logging -from jsonargparse import Namespace import csv import datetime import json -import numpy as np +import logging import os -from importlib.resources import files, as_file +from importlib.resources import as_file, files + +import numpy as np import pandas as pd +from jsonargparse import Namespace -from f4epurity.decay_chain_calc import calculate_total_activity from f4epurity.collapse import collapse_flux, extract_xs -from f4epurity.dose import convert_to_dose, write_vtk_file, plot_slice +from f4epurity.decay_chain_calc import calculate_total_activity +from f4epurity.dose import convert_to_dose, plot_slice, write_vtk_file from f4epurity.maintenance import ( dose_within_workstation, get_dose_at_workstation, read_maintenance_locations, ) +from f4epurity.parsing import parse_arguments, parse_isotopes_activities_file +from f4epurity.psource import GlobalPointSource, PointSource from f4epurity.reaction_rate import calculate_reaction_rate from f4epurity.utilities import ( calculate_number_of_atoms, get_isotopes, - sum_vtr_files, - normalise_nuclide_name, get_reactions_from_file, + normalise_nuclide_name, + sum_vtr_files, ) -from f4epurity.parsing import parse_arguments, parse_isotopes_activities_file F4Epurity_TITLE = """ _____ _ _ _____ _ _ @@ -76,7 +78,14 @@ def calculate_dose_for_source( x2: np.ndarray = None, y2: np.ndarray = None, z2: np.ndarray = None, -) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, list[float]]: +) -> tuple[ + np.ndarray, + np.ndarray, + np.ndarray, + np.ndarray, + list[float], + dict[str, list[np.ndarray]], +]: """Calculate the dose for a given source Parameters @@ -141,7 +150,6 @@ def calculate_dose_for_source( sigma_eff, flux_spectrum = collapse_flux( xs_values, args.input_flux, x1, y1, z1, x2, y2, z2 ) - # Calculate the reaction rate based on the flux and effective cross section reaction_rate = calculate_reaction_rate( args.delta_impurity, sigma_eff, flux_spectrum @@ -161,7 +169,6 @@ def calculate_dose_for_source( logging.info("Calculating the Dose...") # Determine the Dose for each nuclide for nuclide, nuclide_activity in activities.items(): - # Convert to format in dose conversion spreadsheet nuclide = normalise_nuclide_name(nuclide) @@ -210,7 +217,7 @@ def calculate_dose_for_source( else: plt.savefig(f"{run_dir}/dose_{x1}_{y1}_{z1}.png") - return dose_array, x, y, z, total_dose + return dose_array, x, y, z, total_dose, activities def calculate_dose_at_workstations( @@ -301,6 +308,7 @@ def process_sources(args: Namespace) -> None: dose_factors_df = pd.read_excel(fp) dose_arrays = [] + sources = [] # Check if a second point was provided - line source if args.x2 is not None and args.y2 is not None and args.z2 is not None: logging.info("Line source(s) selected") @@ -309,7 +317,7 @@ def process_sources(args: Namespace) -> None: for x1, y1, z1, x2, y2, z2 in zip( args.x1, args.y1, args.z1, args.x2, args.y2, args.z2 ): - dose_array, x, y, z, dose = calculate_dose_for_source( + dose_array, x, y, z, dose, activities = calculate_dose_for_source( args, x1, y1, @@ -339,8 +347,8 @@ def process_sources(args: Namespace) -> None: logging.info("Point source(s) selected") # Handle multiple coordinates being provided - for x1, y1, z1 in zip(args.x1, args.y1, args.z1): - dose_array, x, y, z, dose = calculate_dose_for_source( + for i, (x1, y1, z1) in enumerate(zip(args.x1, args.y1, args.z1)): + dose_array, x, y, z, dose, activities = calculate_dose_for_source( args, x1, y1, @@ -352,8 +360,18 @@ def process_sources(args: Namespace) -> None: dose_factors_df, ) dose_arrays.append(dose_array) + if args.write_sdef: + if args.m: + mass = args.m[i] + else: + mass = 1 + sources.append(PointSource(activities, [x1, y1, z1], mass=mass)) calculate_dose_at_workstations(args, dose, x1, y1, z1, run_dir) + if args.write_sdef: + global_source = GlobalPointSource(sources) + global_source.to_sdef(f"{run_dir}/source.sdef") + # If more than one dose array is present, sum the dose arrays (multiple sources) if len(dose_arrays) > 1: sum_vtr_files(dose_arrays, x, y, z, run_dir, masses=args.m) diff --git a/src/f4epurity/parsing.py b/src/f4epurity/parsing.py index 396bd9f..30b9e84 100644 --- a/src/f4epurity/parsing.py +++ b/src/f4epurity/parsing.py @@ -1,9 +1,9 @@ +import os from typing import List, Optional -import os import numpy as np -from jsonargparse import ArgumentParser, Namespace, ActionConfigFile import pandas as pd +from jsonargparse import ActionConfigFile, ArgumentParser, Namespace def parse_arguments(args_list: Optional[List[str]] = None) -> Namespace: @@ -138,7 +138,12 @@ def parse_arguments(args_list: Optional[List[str]] = None) -> Namespace: type=str, help="Path to STL files of ITER rooms that can be used for plotting", ) - + # Add the optional "mcnp" argument + parser.add_argument( + "--write_sdef", + action="store_true", + help="Optional 'write_sdef' argument to generate source.sdef.file for MCNP", + ) # Parse the arguments args = parser.parse_args(args_list) diff --git a/src/f4epurity/psource.py b/src/f4epurity/psource.py new file mode 100644 index 0000000..6181121 --- /dev/null +++ b/src/f4epurity/psource.py @@ -0,0 +1,279 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from pathlib import Path + +import actigamma as ag +import numpy as np + +GRID = ag.EnergyGrid(bounds=ag.linspace(0, 20e6, 100000)) # TODO + +CHAR_LIMIT = 120 + + +class GlobalPointSource: + def __init__(self, sources: list[PointSource]) -> None: + self.sources = sources + + def to_sdef(self, outfile: str | Path) -> None: + """ + Generate an MCNP SDEF (source definition) card for the global point sources and + write it to a file named mcnp_source.txt. + + Parameters + ---------- + outfile : str | Path + The path to the output file where the SDEF card will be written. + + Returns + ------- + None + """ + + sdef_line = "sdef PAR=2 POS=d1 ERG FPOS d2\n" + si1 = "SI1 L " + sp1 = "SP1 " + ds2 = "DS2 S " + intial_ds = 3 + lines_distributions = [] + start_SI = "SI{} L " + start_SP = "SP{} " + t_gamma = 0 + for idx, psource in enumerate(self.sources): + X, Y, p_activity = psource._compute_lines() + t_gamma += np.array(Y).sum() + fmesh_line = "FMESH4:P GEOM=XYZ $ Modify the fmesh as you wish\n ORIGIN 0 0 0\n IMESH 1 IINTS 1\n JMESH 1 JINTS 1\n KMESH 1 KINTS 1\n" + gamma_per_second_line = ( + f"FM4 {t_gamma:.2E} $ To be multiplied by 3.6E-09 to obtain Sv/hr\n" + ) + conv_coeff_line = "C ICRP-74 H*(10)\nDE4\n 0.01 0.015 0.02 0.03 0.04 0.05 0.06 0.07 0.08 0.10 0.15 0.20 0.3 0.4 0.5 0.6 0.8 1.0 2.0 4.0 6.0 8.0 10.0\nDF4\n 0.0485 0.1254 0.2050 0.2999 0.3381 0.3572 0.3780 0.4066 0.4399 0.5172 0.7523 1.0041 1.5083 1.9958\n 2.4657 2.9082 3.7269 4.4834 7.4896 12.0153 15.9873 19.9191 23.7600" + + for idx, psource in enumerate(self.sources): + X, Y, p_activity = psource._compute_lines() + # Adjounr global distributions + si1 = insert_wrapped_values_2(si1, psource.coord, CHAR_LIMIT) + p_gamma = np.array(Y).sum() + r_gamma = p_gamma / t_gamma + sp1 = insert_wrap_values_3(sp1, r_gamma, CHAR_LIMIT) + p_source_idx = intial_ds + idx + ds2 = insert_wrap_values_3(ds2, p_source_idx, CHAR_LIMIT) + + # add specific point source distribution + + SI_line = start_SI.format(p_source_idx) + SP_line = start_SP.format(p_source_idx) + SI_line = insert_wrapped_values(SI_line, X, CHAR_LIMIT) + SP_line = insert_wrapped_values(SP_line, Y, CHAR_LIMIT) + lines_distributions.append(SI_line) + lines_distributions.append(SP_line) + + with open(outfile, "w") as f: + f.write(sdef_line) + f.write(si1 + "\n") + f.write(sp1 + "\n") + f.write(ds2 + "\n") + for line in lines_distributions: + f.write(line + "\n") + f.write(fmesh_line) + f.write(gamma_per_second_line) + f.write(conv_coeff_line) + + +def insert_wrap_values_3( + initial_str: str, values: float | np.ndarray, limit: int +) -> str: + """ + Insert values into a string with a limit of characters per line (when not iterable) + + Parameters + ---------- + initial_str : str + initial string + values : list | np.ndarray + values to be inserted + limit : int + character limit per line + Returns + ------- + str + The resulting string with values indented according to the character limit. + """ + new_str = initial_str + "" + line = 0 + value = f"{values:.3f}" + if len(new_str.split("\n")[-1]) + len(value) > limit: + new_str += "\n " + line += 1 + new_str += f" {value}" + return new_str + + +def insert_wrapped_values_2( + initial_str: str, values: list | np.ndarray, limit: int +) -> str: + """ + Insert values into a string with a limit of characters per line + + Parameters + ---------- + initial_str : str + initial string + values : list | np.ndarray + values to be inserted + limit : int + character limit per line + Returns + ------- + str + The resulting string with values indented according to the character limit. + """ + new_str = initial_str + "" + line = 0 + for value in values: + value = f"{value:.3f}" + if len(new_str.split("\n")[-1]) + len(value) > limit: + new_str += "\n " + line += 1 + new_str += f" {value}" + return new_str + + +def insert_wrapped_values( + initial_str: str, values: list | np.ndarray, limit: int +) -> str: + """ + Insert values into a string with a limit of characters per line + + Parameters + ---------- + initial_str : str + initial string + values : list | np.ndarray + values to be inserted + limit : int + character limit per line + Returns + ------- + str + The resulting string with values indented according to the character limit. + """ + new_str = initial_str + "\n " + line = 0 + for value in values: + value = f"{value:.2e}" + if len(new_str.split("\n")[-1]) + len(value) > limit: + new_str += "\n " + line += 1 + new_str += f" {value}" + return new_str + + +class SingleSource(ABC): + """ + Generate a single source definition for MCNP simulation. + + Parameters + ---------- + activities : dict + A dictionary where keys are nuclide names and values are their activities. + coord : list[float] + List of coordinates [x1, y1, z1] for point and also [x2, y2, z2] for line source + mass : float, optional + The mass of the source in grams of component times delta impurity. + If not provided, the mass is assumed to be 1.0. + + Returns + ------- + str + A string representing the MCNP source definition. + """ + + def __init__( + self, + activities: dict[str, list[np.ndarray]], + coord: list | np.ndarray, + mass: float = 1, + ) -> None: + self.activities = activities + self.coord = coord + self.mass = mass + + def _compute_lines(self) -> tuple[list, list, float]: + inventory, t_activity, db = self._compute_inventory() + LC = ag.MultiTypeLineAggregator(db, GRID) + Y, X = LC(inventory) + # Filter out zero count values & convert to MeV + X_filtered = [x for x, y in zip(X, Y) if y != 0] + Y_filtered = [y for y in Y if y != 0] + X_filtered = np.array(X_filtered) * 1e-6 + return X_filtered.tolist(), Y_filtered, t_activity + + @abstractmethod + def _compute_inventory( + self, + ) -> tuple[ag.UnstablesInventory, float, ag.DefaultDatabase]: + pass + + +class PointSource(SingleSource): + """ + A class representing a point source + + Parameters + ---------- + SingleSource : _type_ + Compute the inventory of unstable nuclides for the point source. + """ + + def _compute_inventory( + self, + ) -> tuple[ag.UnstablesInventory, float, ag.DefaultDatabase]: + db = ag.Decay2012Database() + data = [] + t_activity = 0 + for key, value in self.activities.items(): + act = float(value[0][0]) * self.mass + if act == 0: + continue + key = _convert_elem_name(key) + data.append((db.getzai(key), act)) + t_activity += act + inventory = ag.UnstablesInventory(data) + return inventory, t_activity, db + + +def _convert_elem_name(f4epurity_name: str) -> str: + """ + Convert element name from a specific format to the expected format. + + Parameters + ---------- + inp : str + The input element name to be converted. + + Returns + ------- + str + The converted element name. + """ + # Remove leading zeros from the numeric part of the element name + import re + + match = re.match(r"([A-Za-z]+)(0*)(\d+)([A-Za-z]*)", f4epurity_name) + if match: + element, zeros, number, suffix = match.groups() + return f"{element}{int(number)}{suffix}" + return f4epurity_name + + +class LineSource(SingleSource): + """ + A class representing a line source + + Parameters + ---------- + SingleSource : _type_ + Compute the inventory of unstable nuclides for the line source. + """ + + pass diff --git a/tests/test_psource.py b/tests/test_psource.py new file mode 100644 index 0000000..cb7b28c --- /dev/null +++ b/tests/test_psource.py @@ -0,0 +1,140 @@ +import difflib + +import actigamma as ag +import matplotlib.pyplot as plt + +# global_counter = 0 +import numpy as np +import pytest + +# from f4epurity.main import calculate_total_activity # Import the function from main.py +# from f4epurity.mcnp_source_calc import ( +# convert_to_actigamma_format, # Import the function from mcnp_source_calc.py +# ) +from f4epurity.psource import ( + GlobalPointSource, + PointSource, + _convert_elem_name, + insert_wrapped_values, + insert_wrapped_values_2, +) + +DB = ag.Decay2012Database() +GRID = ag.EnergyGrid(bounds=ag.linspace(0, 14e6, 10000)) # TODO +LC = ag.MultiTypeLineAggregator(DB, GRID) + + +class TestPointSource: + def test_actigamma_import(self): + try: + db = ag.Decay2012Database() + assert db is not None + except ImportError: + assert False, "Failed to import actigamma" + + def test_compute_lines(self): + activities = { + "Co60": [np.array([100])], + } + x1 = 0 # , -1107.091, -953.078 + y1 = 0 # , 1720.756, 1537.211 + z1 = 0 # , 1241.0, 1378.307 + coord = [x1, y1, z1] + mass = 1.0 + psource = PointSource(activities, coord, mass) + X_filtered, Y_filtered, t = psource._compute_lines() + + y = np.array(Y_filtered) + x = np.array(X_filtered) + newy = y[y > 1e-30] + newx = x[y > 1e-30] + xmin = newx.min() + xmax = newx.max() + + # assert pytest.approx(xmin, 1e-2) == 1.1732e6 + # assert pytest.approx(xmax, 1e-2) == 1.3325e6 + # assert pytest.approx(newy.max(), 1e-2) == newy.min() + X2_filtered, Y2_filtered, t = psource._compute_lines() + assert X2_filtered == X_filtered + assert Y2_filtered == Y_filtered + assert y.sum() == 199.86007030599998 + + # def test_compute_lines_multi_unstable(self): + # activities = { + # "Co60": [np.array([1])], + # "Co60m": [np.array([0.5])], + # } + # x1 = -277.391 # , -1107.091, -953.078 + # y1 = 2086.437 # , 1720.756, 1537.211 + # z1 = 1241.0 # , 1241.0, 1378.307 + # coord = [x1, y1, z1] + # mass = 1.0 + # psource = PointSource(activities, coord, mass) + # X_filtered, Y_filtered = psource._compute_lines() + + +class TestGlobalSource: + def test_to_sdef(self, tmpdir): + delta_impurity = 0.05 + points = [ + { + "activities": { + "Co60m": [np.array([1])], + "Co60": [np.array([1])], + }, + "coord": [0, 0, 0], + "mass": 2.0, + }, + { + "activities": { + "Co60m": [np.array([0.5])], + "Co60": [np.array([0.5])], + }, + "coord": [0, 0, 1], + "mass": 10.0, + }, + ] + + # Create PointSource instances for each point + point_sources = [ + PointSource(point["activities"], point["coord"], point["mass"]) + for point in points + ] + + global_source = GlobalPointSource([point_sources[0], point_sources[1]]) + global_source.to_sdef(tmpdir.join("mcnp_source.txt")) + + if not tmpdir.join("mcnp_source.txt").exists(): + assert False, "Output file not created" + else: + with open(tmpdir.join("mcnp_source.txt")) as f: + lines = f.readlines() + + +def test_insert_wrapped_values(): + # Test with a simple example + init_str = "SI L" + values = np.array([100, 200000, 300000000, 40, 0.5, 6000, 7000, 800, 90, 10]) + wrapped_values = insert_wrapped_values(init_str, values, 60) + assert ( + wrapped_values + == "SI L\n 1.00e+02 2.00e+05 3.00e+08 4.00e+01 5.00e-01 6.00e+03\n 7.00e+03 8.00e+02 9.00e+01 1.00e+01" + ) + wrapped_values2 = insert_wrapped_values_2(init_str, values, 60) + assert ( + wrapped_values2 + == "SI L 100.000 200000.000 300000000.000 40.000 0.500 6000.000\n 7000.000 800.000 90.000 10.000" + ) + + +@pytest.mark.parametrize( + "inp, expected", + [ + ("Co060m", "Co60m"), + ("Co060", "Co60"), + ("H001", "H1"), + ("Xe162", "Xe162"), + ], +) +def test_convert_elem_name(inp: str, expected: str): + assert _convert_elem_name(inp) == expected