Skip to content

Commit

Permalink
Merge pull request #997 from neutrons/symmetric_auto_wedge_ewm9180
Browse files Browse the repository at this point in the history
Auto find wedge and mirror defect
  • Loading branch information
backmari authored Feb 24, 2025
2 parents e43aaec + f411c6b commit be3fc46
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 23 deletions.
1 change: 1 addition & 0 deletions docs/release_notes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Release Notes

**Of interest to the User**:

- PR #997: Fix bug causing symmetric auto wedge finding to fail. Add mirrored wedge to auto wedge fit function plot.
- PR #998: remove the TOF offset that is done by the data aquisition system
- PR #994: Remove unused module `drtsans/tof/eqsans/reduce.py`
- PR #993: Skip slices with too high transmission error when using time sliced sample transmission run
Expand Down
64 changes: 48 additions & 16 deletions src/drtsans/auto_wedge.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

# https://docs.mantidproject.org/nightly/algorithms/Fit-v1.html
from mantid.simpleapi import Fit
from mantid.api import IFunction
import h5py
from matplotlib import pyplot as plt

Expand Down Expand Up @@ -57,8 +58,11 @@ def _create_fit_function(combo_function_str):
# split in PARAM_NAME=PARAM_VALUE style
param_list = params[value_index].split("=")
param_name_i = param_list[0]
param_value_i = float(param_list[1])
func_i.__setattr__(param_name_i, param_value_i)
param_value_i = param_list[1]
if param_name_i == "constraints":
func_i.constrain(param_value_i)
else:
func_i.__setattr__(param_name_i, float(param_value_i))
# END-FOR
single_functions.append(func_i)

Expand Down Expand Up @@ -99,7 +103,7 @@ def _calculate_function(fit_functions, vec_x):
Parameters
----------
fit_functions: ~list
fit_functions: ~list[IFunction]
List of fit function
vec_x: numpy.ndarray
1D array for X values
Expand All @@ -121,7 +125,7 @@ def _calculate_function(fit_functions, vec_x):
return vec_y


def _plot_fit_results(rings, peak_fit_dict, output_dir):
def _plot_fit_results(rings, peak_fit_dict, output_dir, auto_symmetric_wedges):
"""Plot original data and fit result
Parameters
Expand All @@ -132,11 +136,23 @@ def _plot_fit_results(rings, peak_fit_dict, output_dir):
dictionary containing all the peak fitting result
output_dir: str
full path of the output directory
auto_symmetric_wedges: bool
if True, label the second peak as mirrored
Returns
-------
"""

def _set_function_params_and_calculate(fit_functions: list[IFunction], peak_fitted_dict: dict):
for param_name in peak_fitted_dict:
if param_name not in ["fit_function", "error", "used"]:
# parameter value is recorded as tuple as value and error
_set_function_param_value(fit_function_set, param_name, peak_fitted_dict[param_name][0])
# calculate
model_y = _calculate_function(fit_functions, ring.mod_q)
return model_y

for index, ring in enumerate(rings):
# add fitting related information to hdf file
if peak_fit_dict[index]["error"] is not None:
Expand All @@ -149,17 +165,33 @@ def _plot_fit_results(rings, peak_fit_dict, output_dir):
# calculate estimated peaks
estimated_y = _calculate_function(fit_function_set, ring.mod_q)

for param_name in peak_fit_dict[index]:
if param_name not in ["fit_function", "error", "used"]:
# parameter value is recorded as tuple as value and error
_set_function_param_value(fit_function_set, param_name, peak_fit_dict[index][param_name][0])
if auto_symmetric_wedges:
# separate fitted peak ("f1" params) from mirrored peak (fake "f2" params) to plot them separately
# the fit function is a single gaussian with only "f1" entries
peak_fitted_dict = {}
peak_mirrored_dict = {}
for key, val in peak_fit_dict[index].items():
if key.startswith("f2"):
new_key = key.replace("f2", "f1")
# shift mirrored peak into plot angle range
new_value = (val[0] - 360.0, val[1]) if "PeakCentre" in key and val[0] > 360.0 else val
peak_mirrored_dict[new_key] = new_value
else:
peak_fitted_dict[key] = val
else:
peak_fitted_dict = peak_fit_dict[index]

# calculate
model_y = _calculate_function(fit_function_set, ring.mod_q)
model_y = _set_function_params_and_calculate(fit_function_set, peak_fitted_dict)

# plot
plt.cla()
plt.plot(ring.mod_q, ring.intensity, label="observed", color="black")
plt.plot(ring.mod_q, model_y, label="fitted", color="red")
plt.plot(ring.mod_q, estimated_y, label="estimated", color="blue")
plt.plot(ring.mod_q, model_y, label="fitted", color="red")
if auto_symmetric_wedges:
mirrored_model_y = _set_function_params_and_calculate(fit_function_set, peak_mirrored_dict)
plt.plot(ring.mod_q, mirrored_model_y, label="mirrored", color="red", linestyle="dashed")
plt.legend(loc="upper left")
plt.savefig(os.path.join(output_dir, f"ring_{index:01}.png"))


Expand Down Expand Up @@ -342,7 +374,7 @@ def getWedgeSelection(
peak_fit_dict=fit_dict,
output_dir=debug_dir,
)
_plot_fit_results(azimuthal_rings, fit_dict, debug_dir)
_plot_fit_results(azimuthal_rings, fit_dict, debug_dir, auto_symmetric_wedges)

# verify that the results didn't predict wedges larger than half of the data
if np.any(np.array(fwhm_vec) > 360.0 / 2):
Expand Down Expand Up @@ -500,7 +532,7 @@ def _estimatePeakParameters(intensity, azimuthal, azimuthal_start, window_half_w
if abs(azimuthal_new - azimuthal_last) < 1.0:
break
# output
print(
logger.debug(
f"[WEDGE FIT] azimuthal: {azimuthal_new}, {azimuthal_last} with left and right as {left_index}, {right_index}"
)

Expand Down Expand Up @@ -603,7 +635,7 @@ def _fitSpectrum(
function = ["name=FlatBackground,A0={}".format(background)]

# template for describing initial peak guess
gaussian_str = "name=Gaussian,Height={},PeakCentre={},Sigma={}"
gaussian_str = "name=Gaussian,Height={},PeakCentre={},Sigma={},constraints=(0<Height)"

# guess where one peak might be, start with a window of WINDOW_SIZE each side around 110
intensity_peak, azimuthal_first, sigma = _estimatePeakParameters(
Expand Down Expand Up @@ -665,7 +697,7 @@ def _fitSpectrum(
result = {"fit_function": fit_function}
for i in range(fitresult.OutputParameters.rowCount()):
row = fitresult.OutputParameters.row(i)
print(f"[DEBUG-TEST] row: {row} of type {type(row)}")
logger.debug(f"[DEBUG-TEST] row: {row} of type {type(row)}")
name = row["Name"]
if name.startswith("Cost function"):
name = "chisq"
Expand All @@ -682,7 +714,7 @@ def _fitSpectrum(
result["f2.Sigma"] = result["f1.Sigma"]

if verbose:
print(f"Fit result: {result}")
logger.notice(f"Fit result: {result}")

return result

Expand Down
17 changes: 10 additions & 7 deletions src/drtsans/iq.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,17 @@ def valid_wedge(min_angle, max_angle) -> List[Tuple[float, float]]:
max_angle, min_angle, diff
)
)
diff = min_angle - max_angle
if diff <= 180:
raise ValueError(
"wedge angle is greater than 180 degrees: {:.1f} - {:.1f} = {:.1f} <= 180".format(
min_angle, max_angle, diff
else:
# max_angle is smaller than min_angle, meaning that the wedge crosses the 270 -> -90 border
# Split the wedge into two parts on either side of the border so that all wedges are in the
# range [-90,270).
diff = 360.0 - (min_angle - max_angle)
if diff >= 180:
raise ValueError(
f"wedge angle (min {min_angle:.1f} max {max_angle:.1f}) is greater than 180 degrees:"
f" (360 + {max_angle:.1f}) - {min_angle:.1f} = {diff:.1f} >= 180"
)
)
return [(min_angle, 270.1), (-90.1, max_angle)]
return [(min_angle, 270.1), (-90.1, max_angle)]


def get_wedges(min_angle: float, max_angle: float, symmetric_wedges=True) -> List[Tuple[float, float]]:
Expand Down
52 changes: 52 additions & 0 deletions tests/integration/drtsans/test_auto_wedge.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,58 @@ def test_real_data_biosans(datarepo_dir):
os.remove(filename)


@pytest.mark.datarepo
def test_real_data_biosans_autosymmetric_wedges(datarepo_dir):
MSamp_fn = os.path.join(datarepo_dir.biosans, "CG3_127_5532_mBSub.h5")
MBuff_fn = os.path.join(datarepo_dir.biosans, "CG3_127_5562_mBSub.h5")

ws_ms = LoadNexusProcessed(Filename=MSamp_fn, OutputWorkspace="sample", LoadHistory=False)
ws_mb = LoadNexusProcessed(Filename=MBuff_fn, OutputWorkspace="Main_buffer", LoadHistory=False)
ws_ms -= ws_mb # subtract the buffer
ws_mb.delete()

# convert to I(qx,qy)
q1d_data = biosans.convert_to_q(ws_ms, mode="scalar")
q2d_data = biosans.convert_to_q(ws_ms, mode="azimuthal")
ws_ms.delete()

# calculate the wedge angles to use
wedge_angles = getWedgeSelection(
data2d=q2d_data,
q_min=0.00,
q_delta=0.001,
q_max=0.02,
azimuthal_delta=0.5,
peak_width=0.25,
background_width=1.5,
signal_to_noise_min=1.2,
auto_symmetric_wedges=True,
)
assert len(wedge_angles) == 2, "Expect 2 separate wedges"

# use these to integrate the wedges
for wedges in wedge_angles:
for azi_min, azi_max in wedges:
print("integrating from {}deg to {} deg".format(azi_min, azi_max))
iq_wedge = select_i_of_q_by_wedge(q2d_data, azi_min, azi_max)
assert iq_wedge

# test bin_all
nbins = 100.0
iq2d_rebinned, iq1d_rebinned = bin_all(
q2d_data,
q1d_data,
nxbins=nbins,
nybins=nbins,
n1dbins=nbins,
bin1d_type="wedge",
wedges=wedge_angles,
symmetric_wedges=False,
error_weighted=False,
)
assert len(iq1d_rebinned) == 2, "Expect exactly 2 output 1d spectra"


@pytest.mark.datarepo
def test_real_data_biosans_manual(datarepo_dir):
"""Test asymmetric manual wedge binning on BIOSANS data"""
Expand Down

0 comments on commit be3fc46

Please sign in to comment.