diff --git a/docs/release_notes.rst b/docs/release_notes.rst index 7f2c8e2..f4eefd7 100644 --- a/docs/release_notes.rst +++ b/docs/release_notes.rst @@ -14,6 +14,7 @@ Notes for major and minor releases. Notes for patch releases are referred. **Of interest to the Developer:** +- PR #24 Class ReflectedBeamOptions to reuse when saving to diferent file formats - PR #23 Class DirectBeamOptions to reuse when saving to diferent file formats - PR #22 SampleLogs class to reduce boilerplate code and make the code more pythonic - PR #21 Enum DataType substitutes expressions involving integer values to improved understanding of the source diff --git a/src/mr_reduction/reflectivity_output.py b/src/mr_reduction/reflectivity_output.py index c905bf8..322ad5e 100644 --- a/src/mr_reduction/reflectivity_output.py +++ b/src/mr_reduction/reflectivity_output.py @@ -6,7 +6,7 @@ # standard imports import math import time -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import List, Optional # third party imports @@ -15,7 +15,7 @@ # mr_reduction imports import mr_reduction from mr_reduction.runpeak import RunPeakNumber -from mr_reduction.simple_utils import SampleLogs +from mr_reduction.simple_utils import SampleLogs, workspace_handle from mr_reduction.types import MantidWorkspace @@ -39,6 +39,30 @@ class DirectBeamOptions: number: int # run number File: str # normalization run in the re-processed and legacy-compatible, readable by QuickNXS + @staticmethod + def option_names() -> List[str]: + """List of option names in the order expected for a QuickNXS output file""" + return [ + "DB_ID", + "P0", + "PN", + "x_pos", + "x_width", + "y_pos", + "y_width", + "bg_pos", + "bg_width", + "dpix", + "tth", + "number", + "File", + ] + + @classmethod + def dat_header(cls) -> str: + """Header for the direct beam options in the *_autoreduced.dat file""" + return "# [Direct Beam Runs]\n# %s\n" % " ".join(["%8s" % name for name in cls.option_names()]) + @staticmethod def from_workspace(input_workspace: MantidWorkspace, direct_beam_counter=1) -> Optional["DirectBeamOptions"]: """Create an instance of DirectBeamOptions from a workspace. @@ -87,28 +111,169 @@ def from_workspace(input_workspace: MantidWorkspace, direct_beam_counter=1) -> O File=filename, ) + @property + def as_dat(self) -> str: + """ "Formatted string representation of the DirectBeamOptions suitable for an *_autoreduced.dat file""" + clean_dict = {} + for name in self.option_names(): + value = getattr(self, name) + if isinstance(value, (bool, str)): + clean_dict[name] = "%8s" % value + else: + clean_dict[name] = "%8g" % value + + template = "# %s\n" % " ".join(["{%s}" % p for p in self.option_names()]) + return template.format(**clean_dict) + + +@dataclass +class ReflectedBeamOptions: + scale: float + P0: int + PN: int + x_pos: float # peak center + x_width: float # peak width + y_pos: float + y_width: float + bg_pos: float + bg_width: float + fan: bool + dpix: float + tth: float + number: str + DB_ID: int + File: str + """two-theta offset to be applied when saving the options to a file later to be read by QuickNXS""" + tth_offset: float = field(repr=False, default=0.0) + + @staticmethod + def option_names() -> List[str]: + """List of option names, excluding the two-theta offset, in the order expected for a QuickNXS output file""" + return [ + "scale", + "P0", + "PN", + "x_pos", + "x_width", + "y_pos", + "y_width", + "bg_pos", + "bg_width", + "fan", + "dpix", + "tth", + "number", + "DB_ID", + "File", + ] + @classmethod def dat_header(cls) -> str: """Header for the direct beam options in the *_autoreduced.dat file""" - return "# [Direct Beam Runs]\n# %s\n" % " ".join(["%8s" % field for field in cls.__dataclass_fields__]) + return "# [Data Runs]\n# %s\n" % " ".join(["%8s" % name for name in cls.option_names()]) - @property - def options(self) -> List[str]: - """List of option names""" - return self.__dataclass_fields__ + @staticmethod + def filename(input_workspace: MantidWorkspace) -> str: + """ + Generate a filename for the given Mantid workspace. + + This method retrieves the filename from the Mantid workspace's run object. + If the filename ends with 'nxs.h5', it modifies the filename to be compatible + with QuickNXS by replacing 'nexus' with 'data' and changing the extension to '_histo.nxs'. + If the workspace is live data and does not have a filename, it returns 'live data'. + """ + sample_logs = SampleLogs(input_workspace) + if "Filename" in sample_logs: + filename = sample_logs["Filename"] + if filename.endswith("nxs.h5"): + filename = filename.replace("nexus", "data") + filename = filename.replace(".nxs.h5", "_histo.nxs") + else: + filename = "live data" # For live data, we might not have a file name + return filename + + @staticmethod + def two_theta_offset(input_workspace: MantidWorkspace) -> float: + """two-theta offset for compatibility with QuickNXS. + + For some reason, the tth value that QuickNXS expects is offset. + It seems to be because that same offset is applied later in the QuickNXS calculation. + """ + ws = workspace_handle(input_workspace) + sample_logs = SampleLogs(input_workspace) + scatt_pos = sample_logs["specular_pixel"] + det_distance = sample_logs.mean("SampleDetDis") + # Check units + if sample_logs.property("SampleDetDis").units not in ["m", "meter"]: + det_distance /= 1000.0 + direct_beam_pix = sample_logs.mean("DIRPIX") + # Get pixel size from instrument properties + if ws.getInstrument().hasParameter("pixel-width"): + pixel_width = float(ws.getInstrument().getNumberParameter("pixel-width")[0]) / 1000.0 + else: + pixel_width = 0.0007 + + return -((direct_beam_pix - scatt_pos) * pixel_width) / det_distance * 180.0 / math.pi + + @classmethod + def from_workspace(cls, input_workspace: MantidWorkspace, direct_beam_counter=1) -> "ReflectedBeamOptions": + """Create an instance of ReflectedBeamOptions from a workspace. + + Parameters + ---------- + input_workspace : MantidWorkspace + The Mantid workspace from which to create the ReflectedBeamOptions instance. + direct_beam_counter : int, optional + The counter for the direct beam associated to this reflected beam, by default 1. + """ + sample_logs = SampleLogs(input_workspace) + + peak_min = sample_logs["scatt_peak_min"] + peak_max = sample_logs["scatt_peak_max"] + bg_min = sample_logs["scatt_bg_min"] + bg_max = sample_logs["scatt_bg_max"] + low_res_min = sample_logs["scatt_low_res_min"] + low_res_max = sample_logs["scatt_low_res_max"] + filename = cls.filename(input_workspace) + scatt_pos = sample_logs["specular_pixel"] + + options = ReflectedBeamOptions( + scale=1, + P0=0, + PN=0, + x_pos=scatt_pos, + x_width=peak_max - peak_min + 1, + y_pos=(low_res_max + low_res_min) / 2.0, + y_width=low_res_max - low_res_min + 1, + bg_pos=(bg_min + bg_max) / 2.0, + bg_width=bg_max - bg_min + 1, + fan=sample_logs["constant_q_binning"], + dpix=sample_logs.mean("DIRPIX"), + tth=sample_logs["two_theta"], + number=sample_logs["run_number"], + DB_ID=direct_beam_counter, + File=filename, + ) + + # two-theta offset for compatibility with QuickNXS + options.tth_offset = cls.two_theta_offset(input_workspace) + + return options @property def as_dat(self) -> str: - """ "Formatted string representation of the DirectBeamOptions suitable for an *_autoreduced.dat file""" + """Formatted string representation of the ReflectedBeamOptions suitable for an *_autoreduced.dat file""" clean_dict = {} - for option in self.options: - value = getattr(self, option) - if isinstance(value, (bool, str)): - clean_dict[option] = "%8s" % value + for name in self.option_names(): + value = getattr(self, name) + if name == "tth": + value += self.tth_offset + if isinstance(value, str): + clean_dict[name] = "%8s" % value else: - clean_dict[option] = "%8g" % value + clean_dict[name] = "%8g" % value - template = "# %s\n" % " ".join(["{%s}" % p for p in self.options]) + template = "# %s\n" % " ".join(["{%s}" % p for p in self.option_names()]) return template.format(**clean_dict) @@ -144,136 +309,50 @@ def write_reflectivity(ws_list, output_path, cross_section): if direct_beam_options is not None: fd.write(direct_beam_options.as_dat) - # Scattering data - dataset_options = [ - "scale", - "P0", - "PN", - "x_pos", - "x_width", - "y_pos", - "y_width", - "bg_pos", - "bg_width", - "fan", - "dpix", - "tth", - "number", - "DB_ID", - "File", - ] - + # + # Write scattering options and collect scatting data for later + # fd.write("#\n") - fd.write("# [Data Runs]\n") - toks = ["%8s" % item for item in dataset_options] - fd.write("# %s\n" % " ".join(toks)) - i_direct_beam = 0 - - data_block = "" - for ws in ws_list: - i_direct_beam += 1 - - sample_logs = SampleLogs(ws) - peak_min = sample_logs["scatt_peak_min"] - peak_max = sample_logs["scatt_peak_max"] - bg_min = sample_logs["scatt_bg_min"] - bg_max = sample_logs["scatt_bg_max"] - low_res_min = sample_logs["scatt_low_res_min"] - low_res_max = sample_logs["scatt_low_res_max"] - dpix = sample_logs.mean("DIRPIX") - # For live data, we might not have a file name - if "Filename" in sample_logs: - filename = sample_logs["Filename"] - # In order to make the file loadable by QuickNXS, we have to change the - # file name to the re-processed and legacy-compatible files. - # The new QuickNXS can load both. - if filename.endswith("nxs.h5"): - filename = filename.replace("nexus", "data") - filename = filename.replace(".nxs.h5", "_histo.nxs") - else: - filename = "live data" - constant_q_binning = sample_logs["constant_q_binning"] - scatt_pos = sample_logs["specular_pixel"] - - # For some reason, the tth value that QuickNXS expects is offset. - # It seems to be because that same offset is applied later in the QuickNXS calculation. - # Correct tth here so that it can load properly in QuickNXS and produce the same result. - tth = sample_logs["two_theta"] - det_distance = sample_logs.mean("SampleDetDis") - # Check units - if sample_logs.property("SampleDetDis").units not in ["m", "meter"]: - det_distance /= 1000.0 - direct_beam_pix = sample_logs.mean("DIRPIX") - - # Get pixel size from instrument properties - if ws.getInstrument().hasParameter("pixel-width"): - pixel_width = float(ws.getInstrument().getNumberParameter("pixel-width")[0]) / 1000.0 - else: - pixel_width = 0.0007 - tth -= ((direct_beam_pix - scatt_pos) * pixel_width) / det_distance * 180.0 / math.pi - - item = dict( - scale=1, - DB_ID=i_direct_beam, - P0=0, - PN=0, - tth=tth, - fan=constant_q_binning, - x_pos=scatt_pos, - x_width=peak_max - peak_min + 1, - y_pos=(low_res_max + low_res_min) / 2.0, - y_width=low_res_max - low_res_min + 1, - bg_pos=(bg_min + bg_max) / 2.0, - bg_width=bg_max - bg_min + 1, - dpix=dpix, - number=str(ws.getRunNumber()), - File=filename, - ) - - par_list = ["{%s}" % p for p in dataset_options] - template = "# %s\n" % " ".join(par_list) - _clean_dict = {} - for key in item: - if isinstance(item[key], str): - _clean_dict[key] = "%8s" % item[key] - else: - _clean_dict[key] = "%8g" % item[key] - fd.write(template.format(**_clean_dict)) - - x = ws.readX(0) - y = ws.readY(0) - dy = ws.readE(0) - dx = ws.readDx(0) - tth = sample_logs["two_theta"] * math.pi / 360.0 - quicknxs_scale = quicknxs_scaling_factor(ws) + fd.write(ReflectedBeamOptions.dat_header()) + data_block = "" # collect the data for later + for i_direct_beam, ws in enumerate(ws_list, start=1): + reflected_beam_options = ReflectedBeamOptions.from_workspace(ws, i_direct_beam) + fd.write(reflected_beam_options.as_dat) + # collect the numerical data into `data_block` + x, y, dy, dx = ws.readX(0), ws.readY(0), ws.readE(0), ws.readDx(0) + theta = reflected_beam_options.tth * math.pi / 360.0 + sf = quicknxs_scaling_factor(ws) for i in range(len(x)): - data_block += "%12.6g %12.6g %12.6g %12.6g %12.6g\n" % ( - x[i], - y[i] * quicknxs_scale, - dy[i] * quicknxs_scale, - dx[i], - tth, - ) + data_block += "%12.6g %12.6g %12.6g %12.6g %12.6g\n" % (x[i], y[i] * sf, dy[i] * sf, dx[i], theta) - fd.write("#\n") - fd.write("# [Global Options]\n") - fd.write("# name value\n") - # TODO: set the sample dimension as an option - fd.write("# sample_length 10\n") - fd.write("#\n") + fd.write("""# +# [Global Options] +# name value +# sample_length 10 +# +""") + + # + # Write sequence information from the last workspace in the list + # fd.write("# [Sequence]\n") - sample_logs = SampleLogs(ws_list[-1]) # sample logs of the last workspace - if "sequence_id" in sample_logs: - fd.write("# sequence_id %s\n" % sample_logs["sequence_id"]) - if "sequence_number" in sample_logs: - fd.write("# sequence_number %s\n" % sample_logs["sequence_number"]) - if "sequence_total" in sample_logs: - fd.write("# sequence_total %s\n" % sample_logs["sequence_total"]) - fd.write("#\n") - fd.write("# [Data]\n") - toks = ["%12s" % item for item in ["Qz [1/A]", "R [a.u.]", "dR [a.u.]", "dQz [1/A]", "theta [rad]"]] - fd.write("# %s\n" % " ".join(toks)) - fd.write("#\n%s\n" % data_block) + sample_logs = SampleLogs(ws_list[-1]) # use the last workspace for the sequence information + line_template = "# {0} {1}\n" + for entry in ["sequence_id", "sequence_number", "sequence_total"]: + if entry in sample_logs: + fd.write(line_template.format(entry, sample_logs[entry])) + + # + # Write scattering data + # + tokens = ["%12s" % item for item in ["Qz [1/A]", "R [a.u.]", "dR [a.u.]", "dQz [1/A]", "theta [rad]"]] + header = "# %s" % " ".join(tokens) + fd.write(f"""# +# [Data] +{header} +# +{data_block} +""") fd.close() diff --git a/tests/unit/mr_reduction/test_reflectivity_output.py b/tests/unit/mr_reduction/test_reflectivity_output.py index c1f492d..fced526 100644 --- a/tests/unit/mr_reduction/test_reflectivity_output.py +++ b/tests/unit/mr_reduction/test_reflectivity_output.py @@ -28,7 +28,8 @@ from mantid.simpleapi import AddSampleLog, CreateWorkspace, DeleteWorkspace, LoadNexus, mtd # mr_reduction imports -from mr_reduction.reflectivity_output import DirectBeamOptions, write_reflectivity +from mr_reduction.reflectivity_output import DirectBeamOptions, ReflectedBeamOptions, write_reflectivity +from numpy.testing import assert_almost_equal @pytest.fixture(scope="module") @@ -52,6 +53,31 @@ def mock_normalization_workspace(): DeleteWorkspace(workspace) # teardown steps after all tests in this module have run +@pytest.fixture(scope="module") +def mock_reflected_workspace(): + workspace = mtd.unique_hidden_name() + CreateWorkspace(DataX=[0, 1], DataY=[0, 10], OutputWorkspace=workspace) + for name, value, logType in [ + ("Filename", "REF_M_29160.nxs.h5", "String"), + ("specular_pixel", 183.5, "Number"), + ("DIRPIX", 227.0, "Number"), + ("scatt_peak_min", 158, "Number"), + ("scatt_peak_max", 209, "Number"), + ("scatt_bg_min", 118, "Number"), + ("scatt_bg_max", 223, "Number"), + ("scatt_low_res_min", 51, "Number"), + ("scatt_low_res_max", 179, "Number"), + ("scatt_dirpix", 0.5, "Number"), + ("constant_q_binning", "False", "String"), + ("two_theta", 1.6903, "Number"), + ("run_number", 29160, "String"), + ]: + AddSampleLog(workspace, LogName=name, LogText=str(value), LogType=logType) + AddSampleLog(workspace, LogName="SampleDetDis", LogText=str(2297.0), LogType="Number", LogUnit="mm") + yield workspace + DeleteWorkspace(workspace) # teardown steps after all tests in this module have run + + class TestDirectBeamOptions: def test_dat_header(self): header = DirectBeamOptions.dat_header() @@ -72,6 +98,39 @@ def test_as_dat(self, mock_normalization_workspace): ) +class TestReflectedBeamOptions: + def test_dat_header(self): + header = ReflectedBeamOptions.dat_header() + assert header.startswith("# [Data Runs]\n") + + def test_filename(self, mock_reflected_workspace): + assert ReflectedBeamOptions.filename(mock_reflected_workspace) == "REF_M_29160_histo.nxs" + ws = CreateWorkspace(DataX=[0, 1], DataY=[0, 10], OutputWorkspace=mtd.unique_hidden_name()) + assert ReflectedBeamOptions.filename(ws) == "live data" + DeleteWorkspace(ws) + + def test_two_theta_offset(self, mock_reflected_workspace): + assert_almost_equal(ReflectedBeamOptions.two_theta_offset(mock_reflected_workspace), -0.76, decimal=2) + + def test_from_workspace(self, mock_reflected_workspace): + options = ReflectedBeamOptions.from_workspace(mock_reflected_workspace) + assert options.x_width == 209 - 158 + 1 + assert options.y_pos == (179 + 51) / 2.0 + assert options.y_width == 179 - 51 + 1 + assert options.bg_width == 223 - 118 + 1 + assert options.bg_pos == (223 + 118) / 2.0 + assert_almost_equal(options.tth_offset, -0.76, decimal=2) + assert options.File == "REF_M_29160_histo.nxs" + + def test_as_dat(self, mock_reflected_workspace): + options = ReflectedBeamOptions.from_workspace(mock_reflected_workspace) + as_dat = options.as_dat + assert as_dat == ( + "# 1 0 0 183.5 52 115 129 170.5" + " 106 False 227 0.930763 29160 1 REF_M_29160_histo.nxs\n" + ) + + @pytest.mark.datarepo() def test_write_reflectivity(mock_filesystem, data_server): reflectivity_workspace = LoadNexus(data_server.path_to("REF_M_29160_2_Off_Off_autoreduce.nxs.h5")) @@ -81,7 +140,12 @@ def test_write_reflectivity(mock_filesystem, data_server): obtained = open(output_file).readlines()[4:] expected = open(data_server.path_to("REF_M_29160_2_Off_Off_autoreduce.dat")).readlines()[4:] for obtained_line, expected_line in zip(obtained, expected): - assert obtained_line == expected_line + if ("REF_M_29137_histo.nxs" in obtained_line) or ("REF_M_29160_histo.nxs" in obtained_line): + obtained_items = obtained_line.split()[:-1] # remove the last item which is the absolute file path + expected_items = expected_line.split()[:-1] + assert obtained_items == expected_items + else: + assert obtained_line == expected_line if __name__ == "__main__":