diff --git a/Framework/PythonInterface/plugins/algorithms/IntegratePeaks1DProfile.py b/Framework/PythonInterface/plugins/algorithms/IntegratePeaks1DProfile.py index 40580e159eab..6bae9122f6e1 100644 --- a/Framework/PythonInterface/plugins/algorithms/IntegratePeaks1DProfile.py +++ b/Framework/PythonInterface/plugins/algorithms/IntegratePeaks1DProfile.py @@ -15,6 +15,9 @@ Progress, FunctionFactory, IPeak, + MultiDomainFunction, + AnalysisDataService as ADS, + IPeakFunction, ) from mantid.kernel import ( Direction, @@ -23,19 +26,27 @@ StringListValidator, EnabledWhenProperty, PropertyCriterion, - UnitConversion, - DeltaEModeType, StringArrayProperty, + UnitParams, ) -from mantid.fitfunctions import FunctionWrapper +from mantid.dataobjects import Workspace2D +from mantid.fitfunctions import FunctionWrapper, CompositeFunctionWrapper import numpy as np -from scipy.ndimage import binary_dilation +from scipy.ndimage import distance_transform_edt, label from plugins.algorithms.IntegratePeaksSkew import InstrumentArrayConverter, get_fwhm_from_back_to_back_params, PeakData from plugins.algorithms.IntegratePeaksShoeboxTOF import get_bin_width_at_tof, set_peak_intensity from enum import Enum -from typing import Callable, Sequence +from typing import Sequence, Tuple + + +class PEAK_STATUS(Enum): + VALID = "Valid Peak" + ON_EDGE = "Peak on detector edge" + NO_PEAK = "No peak found" + MIN_TOF_WIDTH = 1e-3 +MIN_INTENS_OVER_SIGMA = 0.5 class IntegratePeaks1DProfile(DataProcessorAlgorithm): @@ -156,7 +167,7 @@ def PyInit(self): name="IOverSigmaThreshold", defaultValue=2.5, direction=Direction.Input, - validator=FloatBoundedValidator(lower=0.0), + validator=FloatBoundedValidator(lower=MIN_INTENS_OVER_SIGMA), doc="Criterion to stop fitting.", ) self.declareProperty( @@ -217,6 +228,15 @@ def PyInit(self): "Optional file path in which to write diagnostic plots (note this will slow the execution of algorithm).", ) self.setPropertyGroup("OutputFile", "Plotting") + # peak validation + self.declareProperty( + name="NPixMin", + defaultValue=0, + direction=Direction.Input, + validator=IntBoundedValidator(lower=0), + doc="Minimum number of pixels successfully fitted in a peak.", + ) + self.setPropertyGroup("NPixMin", "Peak Validation") def validateInputs(self): issues = dict() @@ -278,12 +298,21 @@ def PyExec(self): do_lorz_cor = self.getProperty("LorentzCorrection").value # saving file output_file = self.getProperty("OutputFile").value + # peak validation + npix_min = self.getProperty("NPixMin").value # create output table workspace peaks = self.exec_child_alg("CloneWorkspace", InputWorkspace=peaks, OutputWorkspace="out_peaks") # select cost function - fit_kwargs = {"Minimizer": "Levenberg-Marquardt", "MaxIterations": 5000, "StepSizeMethod": "Sqrt epsilon"} + fit_kwargs = { + "Minimizer": "Levenberg-Marquardt", + "MaxIterations": 5000, + "StepSizeMethod": "Sqrt epsilon", + "IgnoreInvalidData": False, + "CreateOutput": True, + "OutputCompositeMembers": True, + } match cost_func_name: case "RSq": fit_kwargs["CostFunction"] = "Unweighted least squares" @@ -299,69 +328,103 @@ def PyExec(self): prog_reporter = Progress(self, start=0.0, end=1.0, nreports=peaks.getNumberPeaks()) for ipk, peak in enumerate(peaks): prog_reporter.report("Integrating") - - intens, sigma = 0.0, 0.0 + peak_intens, peak_sigma = 0.0, 0.0 status = PEAK_STATUS.NO_PEAK detid = peak.getDetectorID() bank_name = peaks.column("BankName")[ipk] + + # get fit range pk_tof = peak.getTOF() + ispec_pk = ws.getIndicesFromDetectorIDs([detid])[0] + if get_nbins_from_b2bexp_params: + fit_width = nfwhm * get_fwhm_from_back_to_back_params(peak, ws, detid) + else: + bin_width = get_bin_width_at_tof(ws, ispec_pk, pk_tof) + fit_width = self.getProperty("NBins").value * bin_width + if peak_func_name == "BackToBackExponential": + # take into account asymmetry of peak function in choosing window + tof_start = pk_tof - fit_width / 3 + else: + tof_start = pk_tof - fit_width / 2 + tof_end = tof_start + fit_width # check TOF is in limits of x-axis xdim = ws.getXDimension() - if xdim.getMinimum() < pk_tof < xdim.getMaximum(): - # check peak is in extent of data - ispec = ws.getIndicesFromDetectorIDs([detid])[0] - bin_width = get_bin_width_at_tof(ws, ispec, pk_tof) # used later to scale intensity - if get_nbins_from_b2bexp_params: - fwhm = get_fwhm_from_back_to_back_params(peak, ws, detid) - nbins = max(3, int(nfwhm * fwhm / bin_width)) if fwhm is not None else self.getProperty("NBins").value - else: - nbins = self.getProperty("NBins").value - - # get data array and crop - peak_data = array_converter.get_peak_data(peak, detid, bank_name, nrows, ncols, nrows_edge, ncols_edge) - - # fit peak - peak_fitter = PeakFitter( - peak, - peak_data, - nbins, - peak_func_name, - bg_func_name, - peak_params_to_fix, - frac_dspac_delta, - i_over_sig_threshold, - self.exec_fit, - fit_kwargs, - error_strategy, + if xdim.getMinimum() > tof_start or xdim.getMaximum() < tof_end: + continue # skip peak + + # get detector IDs in peak region + peak_data = array_converter.get_peak_data(peak, detid, bank_name, nrows, ncols, nrows_edge, ncols_edge) + + # fit with constrained peak centers + func_generator = PeakFunctionGenerator(peak_params_to_fix) + initial_function, md_fit_kwargs, initial_peak_mask = func_generator.get_initial_fit_function_and_kwargs( + ws, peak_data, peak.getDSpacing(), (tof_start, tof_end), peak_func_name, bg_func_name + ) + if initial_peak_mask.sum() < npix_min: + continue # no peak + fit_result = self.exec_fit(initial_function, **fit_kwargs, **md_fit_kwargs) + if not fit_result["success"]: + self.delete_fit_result_workspaces(fit_result) + continue # skip peak + + # update peak mask based on I/sig from fit + *_, i_over_sigma = calc_intens_and_sigma_arrays(fit_result, error_strategy) + non_bg_mask = np.zeros(peak_data.detids.shape, dtype=bool) + non_bg_mask.flat[initial_peak_mask] = i_over_sigma > i_over_sig_threshold + peak_mask = find_peak_cluster_in_window(non_bg_mask, (peak_data.irow, peak_data.icol)) + if peak_mask.sum() < npix_min: + continue # no peak + + is_on_edge = np.any(np.logical_and(peak_mask, peak_data.det_edges)) + if is_on_edge: + status = PEAK_STATUS.ON_EDGE + if not integrate_on_edge: + self.delete_fit_result_workspaces(fit_result) + continue # skip peak + + # fit only peak pixels and let peak centers vary independently of DIFC ratio + fit_mask = peak_mask.flat[initial_peak_mask] # get bool for domains to be fitted from peak mask + final_function = func_generator.get_final_fit_function(fit_result["Function"], fit_mask, frac_dspac_delta) + fit_result = self.exec_fit(final_function, **fit_kwargs, **md_fit_kwargs) + if not fit_result["success"]: + self.delete_fit_result_workspaces(fit_result) + continue # skip peak + + # post-fit check on peak widths (so as not + at_limit = func_generator.check_peak_widths_not_at_limits(fit_result["Function"]) + if at_limit.all(): + self.delete_fit_result_workspaces(fit_result) + continue # skip peak + fit_mask[at_limit] = False # do not include these pixels in peak + peak_mask.flat[initial_peak_mask] = fit_mask + + # calculate intensity + status = PEAK_STATUS.VALID + intens, sigma, _ = calc_intens_and_sigma_arrays(fit_result, error_strategy) + peak_intens = np.sum(intens[fit_mask]) + peak_sigma = np.sqrt(np.sum(sigma[fit_mask] ** 2)) + + if output_file: + intens_over_sig = peak_intens / peak_sigma if peak_sigma > 0 else 0.0 + results.append( + LineProfileResult( + ipk, + peak, + intens_over_sig, + status, + peak_mask, + fit_mask, + func_generator.ysum, + fit_result, + peak_data, + ) ) - peak_fitter.integrate_peak() - - # update intenisty and save results for later plotting - if np.any(peak_fitter.successful): - # check edge - is_on_edge = np.any(np.logical_and(peak_fitter.attempted, peak_data.det_edges)) - if integrate_on_edge or not is_on_edge: - status = PEAK_STATUS.VALID - intens = peak_fitter.intens_sum - sigma = np.sqrt(peak_fitter.sigma_sq_sum) - else: - status = PEAK_STATUS.ON_EDGE - if output_file: - intens_over_sig = intens / sigma if sigma > 0 else 0.0 - results.append( - LineProfileResult( - ipk, - peak, - intens_over_sig, - peak_fitter, - peak_data, - status, - ) - ) - - set_peak_intensity(peak, intens, sigma, do_lorz_cor) + set_peak_intensity(peak, peak_intens, peak_sigma, do_lorz_cor) + + # delete fit workspaces + self.delete_fit_result_workspaces(fit_result) # plot output if output_file: @@ -381,232 +444,179 @@ def exec_child_alg(self, alg_name, **kwargs): else: return None - def exec_fit(self, ispec, **kwargs): + def exec_fit(self, function, **kwargs): alg = self.createChildAlgorithm("Fit", enableLogging=False) alg.initialize() + alg.setAlwaysStoreInADS(True) + alg.setProperty("Function", function) # needs to be done first alg.setProperties(kwargs) - alg.setProperty("WorkspaceIndex", ispec) + fit_result = {"success": False} try: alg.execute() - func = alg.getProperty("Function").value # getPropertyValue returns FunctionProperty not IFunction + fit_result["Function"] = alg.getProperty("Function").value # getPropertyValue returns FunctionProperty not IFunction status = alg.getPropertyValue("OutputStatus") - success = status == "success" or "Changes in function value are too small" in status - return success, func + fit_result["success"] = status == "success" or status == "Changes in parameter value are too small" + for prop in ("OutputWorkspace", "OutputNormalisedCovarianceMatrix", "OutputParameters"): + fit_result[prop] = alg.getPropertyValue(prop) + return fit_result except: - return False, None + pass + return fit_result + def delete_fit_result_workspaces(self, fit_result: dict): + wsnames = [fit_result[field] for field in ("OutputWorkspace", "OutputNormalisedCovarianceMatrix", "OutputParameters")] + self.exec_child_alg("DeleteWorkspaces", WorkspaceList=wsnames) -class PeakFitter: - def __init__( - self, - pk, - peak_data, - nbins, - peak_func_name, - bg_func_name, - peak_params_to_fix, - frac_dspac_delta, - i_over_sig_threshold, - exec_fit, - fit_kwargs, - error_strategy, - ): - self.ws = peak_data.ws - self.pk: IPeak = pk - self.peak_pos: tuple[int, int] = (peak_data.irow, peak_data.icol) - self.frac_dspac_delta: float = frac_dspac_delta - self.i_over_sig_threshold: float = i_over_sig_threshold - # extract data - self.tofs: np.ndarray = None - self.y: np.ndarray = None - self.esq: np.ndarray = None - self.ispecs: np.ndarray = None - self.yfit_foc: np.ndarray = None - self.successful: np.ndarray = None - self.attempted: np.ndarray = None - self.intens_sum: float = 0 - self.sigma_sq_sum: float = 0 - self.peak_func_name: str = peak_func_name - self.bg_func_name: str = bg_func_name - self.peak_params_to_fix: Sequence[str] = peak_params_to_fix - self.exec_fit: Callable = exec_fit - self.fit_kwargs: dict = None - self.error_strategy: str = error_strategy - self.cached_params: dict = None - self.nfixed_default: int = None - self.nfixed: int = None - - self.get_and_clip_data_arrays(peak_data, nbins) - self.calc_limits_on_fwhm() - self.ysum = self.y.sum(axis=2) - self.yfit_foc: np.ndarray = np.zeros(self.tofs.shape) - self.successful: np.ndarray = np.zeros(self.ispecs.shape, dtype=bool) - self.attempted: np.ndarray = self.successful.copy() - self.update_peak_position() - self.fit_kwargs = { - "InputWorkspace": self.ws, - "CreateOutput": False, - "CalcErrors": True, - "StartX": self.tofs[0], - "EndX": self.tofs[-1], - **fit_kwargs, - } - def get_tof_slice_for_cropping(self, nbins): - # get tof indices and limits - self.ispec = int(self.ispecs[self.peak_pos]) - itof = self.ws.yIndexOfX(self.pk.getTOF(), self.ispec) - if self.peak_func_name == "BackToBackExponential": - # take into account asymmetry of peak function in choosing window - nbins_left = nbins // 3 - istart = itof - nbins_left - iend = itof + (nbins - nbins_left) +class PeakFunctionGenerator: + def __init__(self, peak_params_to_fix: Sequence[str]): + self.cen_par_name: str = None + self.intens_par_name: str = None + self.width_par_name: str = None + self.width_max: float = None + self.width_max: float = None + self.peak_params_to_fix: Sequence[str] = peak_params_to_fix + self.peak_mask: np.ndarray[float] = None + self.ysum: np.ndarray[float] = None + + def get_initial_fit_function_and_kwargs( + self, ws: Workspace2D, peak_data: PeakData, dpk: float, tof_range: tuple[float, float], peak_func_name: str, bg_func_name: str + ) -> str: + ispecs = ws.getIndicesFromDetectorIDs([int(d) for d in peak_data.detids.flat]) + tof_start, tof_end = tof_range + function = MultiDomainFunction() + si = ws.spectrumInfo() + fit_kwargs = {} + # estimate background + istart = ws.yIndexOfX(tof_start) + iend = ws.yIndexOfX(tof_end) + # init bg func (global) + bg_func = FunctionFactory.createFunction(bg_func_name) + # init peak func + peak_func = FunctionFactory.Instance().createPeakFunction(peak_func_name) + # save parameter names for future ties/constraints + peak_func.setIntensity(1.0) + self.intens_par_name = next(peak_func.getParamName(ipar) for ipar in range(peak_func.nParams()) if peak_func.isExplicitlySet(ipar)) + self.cen_par_name = peak_func.getCentreParameterName() + self.width_par_name = peak_func.getWidthParameterName() + avg_bg = 0 + idom = 0 + peak_mask = np.zeros(len(ispecs), dtype=bool) + self.ysum = np.zeros(peak_data.detids.shape) # required for plotting later + for ii, ispec in enumerate(ispecs): + # check stats in pixel + intens, sigma, bg = self._estimate_intensity_and_background(ws, ispec, istart, iend) + self.ysum.flat[ii] = ws.readY(ispec)[istart:iend].sum() + avg_bg += bg + peak_mask[ii] = sigma > 0 and intens / sigma > MIN_INTENS_OVER_SIGMA # low threshold for initial fit + if peak_mask[ii]: + # add peak + difc = si.diffractometerConstants(int(ispec))[UnitParams.difc] + peak_func.setCentre(difc * dpk) + peak_func.setIntensity(intens) + comp_func = CompositeFunctionWrapper(FunctionWrapper(peak_func), FunctionWrapper(bg_func), NumDeriv=True) + function.add(comp_func.function) + function.setDomainIndex(idom, idom) + key_suffix = f"_{idom}" if idom > 0 else "" + fit_kwargs["InputWorkspace" + key_suffix] = ws.name() + fit_kwargs["StartX" + key_suffix] = tof_start + fit_kwargs["EndX" + key_suffix] = tof_end + fit_kwargs["WorkspaceIndex" + key_suffix] = int(ispec) + idom += 1 + # set background (background global tied to first domain function) + function[0][1]["A0"] = avg_bg / len(ispecs) + # set instrument specific parameters + iset_initial = np.array([function[0][0].isExplicitlySet(ipar) for ipar in range(peak_func.nParams())]) + ispec_pk = np.ravel_multi_index([peak_data.irow, peak_data.icol], peak_data.detids.shape) + function[0][0].setMatrixWorkspace(ws, int(ispec_pk), 0, 0) + iset_final = np.array([function[0][0].isExplicitlySet(ipar) for ipar in range(peak_func.nParams())]) + ipars_to_tie = np.flatnonzero(np.logical_not(iset_initial, iset_final)) + pars_to_tie = [function[0][0].parameterName(int(ipar)) for ipar in ipars_to_tie] # global peak parameters + # set constraint on FWHM (to avoid peak fitting to noise or background) + self._add_fwhm_constraints(function, peak_func, fit_range=tof_end - tof_start, nbins=iend - istart) + return self._add_parameter_ties(function, pars_to_tie), fit_kwargs, peak_mask + + def _add_fwhm_constraints(self, function: MultiDomainFunction, peak_func: IPeakFunction, fit_range: float, nbins: int): + [peak_func.setParameter(iparam, function[0].getParameterValue(iparam)) for iparam in range(peak_func.nParams())] + if function[0][0].isExplicitlySet(peak_func.getParameterIndex(self.width_par_name)): + # BackToBack initialised with instrument parameters (ratio of FWHM to Sigma also depends on A,B) + fwhm = peak_func.fwhm() + scale_factor = peak_func.getParameterValue(self.width_par_name) / fwhm if fwhm > 0 else 1 else: - istart = itof - nbins // 2 - iend = itof + nbins // 2 - self.tof_slice = slice( - int(np.clip(istart, a_min=0, a_max=self.ws.blocksize())), int(np.clip(iend, a_min=0, a_max=self.ws.blocksize())) - ) - - def get_and_clip_data_arrays(self, peak_data, nbins): - tofs, y, esq, self.ispecs = peak_data.get_data_arrays() # 3d arrays [rows x cols x tof] - self.get_tof_slice_for_cropping(nbins) - self.tofs = tofs[self.peak_pos[0], self.peak_pos[1], self.tof_slice] # take x at peak cen, should be same for all det - # crop data array to TOF region of peak - self.y = y[:, :, self.tof_slice] - self.esq = esq[:, :, self.tof_slice] - - def calc_limits_on_fwhm(self): - self.min_fwhm = self.tofs[1 + (len(self.tofs) // 2)] - self.tofs[len(self.tofs) // 2] # FWHM > bin-width - self.max_fwhm = (self.tofs[-1] - self.tofs[0]) / 3 # must be at least 3 FWHM in data range - - def update_peak_position(self): - # search for pixel with highest TOF integrated counts in 3x3 window around peak position - irow_min = np.clip(self.peak_pos[0] - 1, a_min=0, a_max=self.ysum.shape[0]) - irow_max = np.clip(self.peak_pos[0] + 2, a_min=0, a_max=self.ysum.shape[0]) # add 1 as last index not in slice - icol_min = np.clip(self.peak_pos[1] - 1, a_min=0, a_max=self.ysum.shape[1]) - icol_max = np.clip(self.peak_pos[1] + 2, a_min=0, a_max=self.ysum.shape[1]) # add 1 as last index not in slice - imax = np.unravel_index(np.argmax(self.ysum[irow_min:irow_max, icol_min:icol_max]), (irow_max - irow_min, icol_max - icol_min)) - self.peak_pos = (imax[0] + irow_min, imax[1] + icol_min) - - def calc_tof_peak_centre_and_bounds(self, ispec): - # need to do this for each spectrum as DIFC different for each - diff_consts = self.ws.spectrumInfo().diffractometerConstants(int(ispec)) - tof_pk = UnitConversion.run("dSpacing", "TOF", self.pk.getDSpacing(), 0, DeltaEModeType.Elastic, diff_consts) - tof_pk_min = np.clip(tof_pk * (1 - self.frac_dspac_delta), a_min=self.tofs[0], a_max=self.tofs[-1]) - tof_pk_max = np.clip(tof_pk * (1 + self.frac_dspac_delta), a_min=self.tofs[0], a_max=self.tofs[-1]) - return tof_pk, tof_pk_min, tof_pk_max - - def create_peak_function_with_initial_params(self, ispec, intensity, bg, tof_pk, tof_pk_min, tof_pk_max): - # need to create from scratch for setMatrixWorkspace to overwrite parameters - peak_func = FunctionFactory.Instance().createPeakFunction(self.peak_func_name) - # set initial parameters of peak - peak_func.setCentre(tof_pk) - # initialise instrument specific parameters (e.g A,B,S for case of BackToBackExponential) - peak_func.setMatrixWorkspace(self.ws, ispec, 0, 0) - if np.isclose(peak_func.fwhm(), 0.0): - # width not set by default - set width based max of d-spacing tolerance or bin-width - fwhm = np.clip(self.frac_dspac_delta * tof_pk, a_min=self.min_fwhm, a_max=self.max_fwhm) - peak_func.setFwhm(fwhm) - peak_func.setIntensity(intensity) - # fix parameters - self.nfixed_default = len([ipar for ipar in range(peak_func.nParams()) if peak_func.isFixed(ipar)]) - [peak_func.fixParameter(par_name) for par_name in self.peak_params_to_fix] - self.nfixed = len([ipar for ipar in range(peak_func.nParams()) if peak_func.isFixed(ipar)]) - self.add_constraints_to_peak_function(peak_func, tof_pk_min, tof_pk_max) - bg_func = FunctionWrapper(self.bg_func_name) - bg_func.setParameter("A0", bg) # set constant background - return FunctionWrapper(peak_func) + bg_func, peak_func - - def add_constraints_to_peak_function(self, peak_func, tof_pk_min, tof_pk_max): - # set minimum of all parameters to be 0 - [peak_func.addConstraints(f"0<{peak_func.parameterName(ipar)}") for ipar in range(peak_func.nParams())] - # constrain centre and width - peak_func.addConstraints(f"{tof_pk_min}<{peak_func.getCentreParameterName()}<{tof_pk_max}") - # assume constant scale factor between FWHM and width parameter - width_par_name = peak_func.getWidthParameterName() - scale_factor = peak_func.getParameterValue(width_par_name) / peak_func.fwhm() - peak_func.addConstraints(f"{self.min_fwhm * scale_factor}<{width_par_name}<{self.max_fwhm * scale_factor}") - - def fit_spectrum(self, profile_func, ispec): - return self.exec_fit(ispec, Function=str(profile_func), **self.fit_kwargs) - - def integrate_peak(self): - self.fit_nearest([self.peak_pos]) - - def fit_nearest(self, inearest): - # fit in order of max intensity - isort = np.argsort([-self.ysum[inear] for inear in inearest]) - any_successful = False - for inear in isort: - irow, icol = inearest[inear] - self.attempted[irow, icol] = True - # check enough counts in spectrum - initial_intens, initial_sigma, bg = self.estimate_intensity_sigma_and_background(irow, icol) - intens_over_sigma = initial_intens / initial_sigma if initial_sigma > 0 else 0.0 - if intens_over_sigma < self.i_over_sig_threshold: - continue # skip this spectrum - # get center and check min data extent - ispec = int(self.ispecs[irow, icol]) - tof_pk, tof_pk_min, tof_pk_max = self.calc_tof_peak_centre_and_bounds(ispec) - if tof_pk < self.tofs[0] or tof_pk > self.tofs[-1]: - continue # peak not in data limits - # update initial parameter guesses - profile_func, peak_func = self.create_peak_function_with_initial_params( - ispec, initial_intens, bg, tof_pk, tof_pk_min, tof_pk_max - ) - # fit - success, profile_func = self.fit_spectrum(profile_func, ispec) - if success: - any_successful = True - profile_func.freeAll() - [profile_func[0].fixParameter(par_name) for par_name in self.peak_params_to_fix] - success_final, profile_func_final = self.fit_spectrum(profile_func, ispec) - if success_final: - profile_func = profile_func_final - # make peak function and get fwhm and intensity - [peak_func.setParameter(iparam, profile_func.getParameterValue(iparam)) for iparam in range(peak_func.nParams())] - if not self.min_fwhm < peak_func.fwhm() < self.max_fwhm: - continue # skip - intens = peak_func.intensity() - if self.error_strategy == "Hessian": - [peak_func.setError(iparam, profile_func.getError(iparam)) for iparam in range(peak_func.nParams())] - sigma = peak_func.intensityError() - else: - sigma = calc_sigma_from_summation(self.tofs, self.esq[irow, icol, :], FunctionWrapper(peak_func)(self.tofs)) - intens_over_sigma = intens / sigma if sigma > 0 else 0.0 - if intens_over_sigma > self.i_over_sig_threshold: - self.successful[irow, icol] = True - self.yfit_foc += FunctionWrapper(profile_func)(self.tofs) - self.intens_sum = self.intens_sum + intens - self.sigma_sq_sum = self.sigma_sq_sum + sigma**2 - if not any_successful: - # no neighbours successfully fitted - terminate here - return - # if did break start process again - inearest = self.find_neighbours() - return self.fit_nearest(inearest) - - def find_neighbours(self): - mask = binary_dilation(self.successful) - mask = np.logical_and(mask, ~self.attempted) - return list(zip(*np.where(mask))) - - def estimate_intensity_sigma_and_background(self, irow, icol): - if not np.any(self.y[irow, icol, :] > 0): + # assume Gaussian (even for BackToBack with default pars - because the defaults very bad!) + scale_factor = 2 * np.sqrt(2 * np.log(2)) + self.width_min = 0.5 * (fit_range / nbins) * scale_factor # FWHM > 0.5 * bin-width + self.width_max = max(self.width_min + 1e-10, (fit_range / 2) * scale_factor) # must be at least 2 FWHM in data range + function[0].addConstraints(f"{self.width_min} Tuple[float, float, float]: + bin_width = np.diff(ws.readX(ispec)[istart:iend]) + bin_width = np.hstack((bin_width, bin_width[-1])) # easier than checking iend and istart not out of bounds + y = ws.readY(ispec)[istart:iend] + if not np.any(y > 0): return 0.0, 0.0, 0.0 - ibg, _ = PeakData.find_bg_pts_seed_skew(self.y[irow, icol, :]) - bg = np.mean(self.y[irow, icol, ibg]) - bin_width = np.diff(self.tofs) - intensity = np.sum((0.5 * (self.y[irow, icol, 1:] + self.y[irow, icol, :-1]) - bg) * bin_width) - sigma = np.sqrt(np.sum(0.5 * (self.esq[irow, icol, 1:] + self.esq[irow, icol, :-1]) * (bin_width**2))) + e = ws.readE(ispec)[istart:iend] + ibg, _ = PeakData.find_bg_pts_seed_skew(y) + bg = np.mean(y[ibg]) + intensity = np.sum((y - bg) * bin_width) + sigma = np.sqrt(np.sum((e * bin_width) ** 2)) return intensity, sigma, bg - -class PEAK_STATUS(Enum): - VALID = "Valid Peak" - ON_EDGE = "Peak on detector edge" - NO_PEAK = "No peak found" + def _add_parameter_ties(self, function: MultiDomainFunction, pars_to_tie: Sequence[str]) -> str: + # fix peak params requested + [function[0][0].fixParameter(par) for par in self.peak_params_to_fix] + additional_pars_to_fix = set(self.peak_params_to_fix) - set(pars_to_tie) + ties = [] + for idom in range(1, function.nDomains()): + # tie global params to first + for par in pars_to_tie: + ties.append(f"f{idom}.f0.{par}=f0.f0.{par}") # global peak pars + for ipar_bg in range(function[idom][1].nParams()): + par = function[idom][1].getParamName(ipar_bg) + ties.append(f"f{idom}.f1.{par}=f0.f1.{par}") + # tie center using ratio of DIFC + ratio = function[idom][0][self.cen_par_name] / function[0][0][self.cen_par_name] + ties.append(f"f{idom}.f0.{self.cen_par_name}={ratio}*f0.f0.{self.cen_par_name}") + for par in additional_pars_to_fix: + # pars to be fixed but not global/already tied + function[idom][0].fixParameter(par) + # add ties as string (orders of magnitude quicker than self.function.tie) + return f"{str(function)};ties=({','.join(ties)})" + + def get_final_fit_function(self, function: MultiDomainFunction, peak_mask: np.ndarray[bool], frac_dspac_delta: float) -> str: + function[0][0].freeAll() + [function[0][0].fixParameter(par_name) for par_name in self.peak_params_to_fix] + idom_peak = np.argmax(peak_mask) # first domain containing peak + for idom, comp_func in enumerate(function): + # reset ties on peak function + function.removeTie(f"f{idom}.f0.{self.cen_par_name}") + if not peak_mask.flat[idom]: + comp_func[0][self.intens_par_name] = 0 + comp_func[0].fixParameter(self.intens_par_name) + comp_func[0].fixParameter(self.cen_par_name) + else: + xcen_lo = comp_func[0][self.cen_par_name] * (1 - frac_dspac_delta) + xcen_hi = comp_func[0][self.cen_par_name] * (1 + frac_dspac_delta) + comp_func.addConstraints(f"{xcen_lo} 0: + function.removeTie(f"f{idom}.f1.{par}") + if not peak_mask.flat[idom]: + comp_func[1].fixParameter(par) + elif idom != idom_peak: + function.tie(f"f{idom}.f1.{par}", f"f{idom_peak}.f1.{par}") + return str(function) class LineProfileResult: @@ -614,15 +624,26 @@ class LineProfileResult: This class holds result of line profile integration of a single-crystal Bragg peak """ - def __init__(self, ipk, pk, intens_over_sig, peak_fitter, peak_data, status): + def __init__( + self, + ipk: int, + pk: IPeak, + intens_over_sig: float, + status: Enum, + peak_mask: np.ndarray[bool], + fit_mask: np.ndarray[bool], + ysum: np.ndarray[float], + fit_result: dict, + peak_data: PeakData, + ): self.irow, self.icol = peak_data.irow, peak_data.icol - self.tofs = peak_fitter.tofs - self.ysum = peak_fitter.ysum - self.yfoc = peak_fitter.y[peak_fitter.successful].sum(axis=0) - self.efoc = np.sqrt(peak_fitter.esq[peak_fitter.successful].sum(axis=0)) - self.yfoc_fit = peak_fitter.yfit_foc - self.successful = peak_fitter.successful - # extract peak properties inot title + self.tofs = None + self.ysum = ysum + self.yfoc = None + self.efoc = None + self.yfoc_fit = None + self.peak_mask = peak_mask + # extract peak properties into title intens_over_sig = np.round(intens_over_sig, 1) hkl = np.round(pk.getHKL(), 2) wl = np.round(pk.getWavelength(), 2) @@ -636,12 +657,24 @@ def __init__(self, ipk, pk, intens_over_sig, peak_fitter, peak_data, status): rf"d={d} $\AA$" f"\n{status.value}" ) + self._init_foc_data(fit_result, fit_mask) + + def _init_foc_data(self, fit_result, fit_mask): + ndoms = fit_result["Function"].nDomains() + ydat = np.array([get_eval_ws(fit_result["OutputWorkspace"], idom).readY(0) for idom in range(ndoms) if fit_mask[idom]]) + edat_sq = np.array([get_eval_ws(fit_result["OutputWorkspace"], idom).readE(0) for idom in range(ndoms) if fit_mask[idom]]) ** 2 + yfit = np.array([get_eval_ws(fit_result["OutputWorkspace"], idom).readY(1) for idom in range(ndoms) if fit_mask[idom]]) + self.tofs = get_eval_ws(fit_result["OutputWorkspace"], 0).readX(0) + self.tofs = 0.5 * (self.tofs[1:] + self.tofs[:-1]) + self.yfoc = ydat.sum(axis=0) + self.efoc = np.sqrt(edat_sq.sum(axis=0)) + self.yfoc_fit = yfit.sum(axis=0) def plot_integrated_peak(self, fig, axes, norm_func): # plot colorfill of TOF integrated data on LHS im = axes[0].imshow(self.ysum) im.set_norm(norm_func()) - axes[0].plot(*np.where(self.successful)[::-1], "ow") + axes[0].plot(*np.where(self.peak_mask)[::-1], "ow") axes[0].plot(self.icol, self.irow, "+r") # peak centre axes[0].set_xlabel("Col") axes[0].set_ylabel("Row") @@ -650,20 +683,11 @@ def plot_integrated_peak(self, fig, axes, norm_func): axes[1].plot(self.tofs, self.yfoc_fit, "-r") axes[1].set_xlabel("TOF (mus)") axes[1].set_ylabel("Intensity (a.u.)") + fig.colorbar(im, ax=axes[0], location="left", label="Intensity (a.u.)") # set title fig.suptitle(self.title) -def calc_sigma_from_summation(xspec, esq_spec, ypeak, cutoff=0.025): - nbins = len(ypeak) - ypeak_cumsum = np.cumsum(ypeak) - ypeak_cumsum /= ypeak_cumsum[-1] - ilo = np.clip(np.argmin(abs(ypeak_cumsum - cutoff)), a_min=0, a_max=nbins // 2) - ihi = np.clip(np.argmin(abs(ypeak_cumsum - (1 - cutoff))), a_min=nbins // 2, a_max=nbins - 1) + 1 - bin_width = np.diff(xspec[ilo:ihi]) - return np.sqrt(np.sum(0.5 * (esq_spec[ilo : ihi - 1] + esq_spec[ilo + 1 : ihi]) * (bin_width**2))) - - def plot_integration_results(output_file, results, prog_reporter): # import inside this function as not allowed to import at point algorithms are registered from matplotlib.pyplot import subplots, close @@ -685,5 +709,50 @@ def plot_integration_results(output_file, results, prog_reporter): ) +def find_peak_cluster_in_window(mask, predicted_pos): + labels, nlabels = label(mask) + if nlabels < 1: + return np.zeros(mask.shape, dtype=bool) # no peak found + peak_label = labels[predicted_pos] + if peak_label == 0: + inearest = distance_transform_edt(labels == 0, return_distances=False, return_indices=True) + peak_label = labels[tuple(inearest)][predicted_pos] + return labels == peak_label + + +def calc_sigma_from_summation(xdat, edat_sq, ypeak, cutoff=0.025): + nbins = len(ypeak) + ypeak_cumsum = np.cumsum(abs(ypeak)) + ypeak_cumsum /= ypeak_cumsum[-1] + ilo = np.clip(np.argmin(abs(ypeak_cumsum - cutoff)), a_min=0, a_max=nbins // 2) + ihi = np.clip(np.argmin(abs(ypeak_cumsum - (1 - cutoff))), a_min=nbins // 2, a_max=nbins - 1) + 1 + bin_width = np.diff(xdat[ilo : ihi + 1]) + return np.sqrt(np.sum(edat_sq[ilo:ihi] * (bin_width**2))) + + +def calc_intens_and_sigma_arrays(fit_result, error_strategy): + function = fit_result["Function"] + intens = np.zeros(function.nDomains()) + sigma = np.zeros(intens.shape) + intens_over_sig = np.zeros(intens.shape) + peak_func = FunctionFactory.Instance().createPeakFunction(function[0][0].name()) + for idom, comp_func in enumerate(function): + [peak_func.setParameter(iparam, comp_func.getParameterValue(iparam)) for iparam in range(peak_func.nParams())] + intens[idom] = peak_func.intensity() + if error_strategy == "Hessian": + [peak_func.setError(iparam, comp_func.getError(iparam)) for iparam in range(peak_func.nParams())] + sigma[idom] = peak_func.intensityError() + else: + ws_fit = get_eval_ws(fit_result["OutputWorkspace"], idom) + sigma[idom] = calc_sigma_from_summation(ws_fit.readX(0), ws_fit.readE(0) ** 2, ws_fit.readY(3)) + ivalid = ~np.isclose(sigma, 0) + intens_over_sig[ivalid] = intens[ivalid] / sigma[ivalid] + return intens, sigma, intens_over_sig + + +def get_eval_ws(out_ws_name, idom): + return ADS.retrieve(f"{out_ws_name[:-1]}_{idom}") + + # register algorithm with mantid AlgorithmFactory.subscribe(IntegratePeaks1DProfile) diff --git a/Framework/PythonInterface/test/python/plugins/algorithms/IntegratePeaks1DProfileTest.py b/Framework/PythonInterface/test/python/plugins/algorithms/IntegratePeaks1DProfileTest.py index c1767c91f5a1..c5b380d90ce2 100644 --- a/Framework/PythonInterface/test/python/plugins/algorithms/IntegratePeaks1DProfileTest.py +++ b/Framework/PythonInterface/test/python/plugins/algorithms/IntegratePeaks1DProfileTest.py @@ -29,8 +29,9 @@ def setUpClass(cls): cls.ws = LoadEmptyInstrument(InstrumentName="SXD", OutputWorkspace="SXD") axis = cls.ws.getAxis(0) axis.setUnit("TOF") - # rebin to get 20 TOF bins - cls.ws = Rebin(InputWorkspace=cls.ws, OutputWorkspace=cls.ws.name(), Params="10000,25,10500", PreserveEvents=False) + + # rebin to get required blocksize + cls.ws = Rebin(InputWorkspace=cls.ws, OutputWorkspace=cls.ws.name(), Params="9900,10,10800", PreserveEvents=False) cls.ws += 50 # add constant background cls.peaks_edge = CreatePeaksWorkspace(InstrumentWorkspace=cls.ws, NumberOfPeaks=0, OutputWorkspace="peaks_incl_edge") @@ -38,11 +39,15 @@ def setUpClass(cls): ispecs = cls.ws.getIndicesFromDetectorIDs(cls.detids) # simulate peak cls.pk_tof = 10250 - pk_func = BackToBackExponential(I=1e4, A=0.9, X0=cls.pk_tof) # override default A so cost-func not 0 in fit + pk_func = BackToBackExponential(I=1e4, X0=cls.pk_tof) for ipk, detid in enumerate(cls.detids): AddPeak(PeaksWorkspace=cls.peaks_edge, RunWorkspace=cls.ws, TOF=cls.pk_tof, DetectorID=detid) + pk_func["X0"] = cls.pk_tof pk_func.function.setMatrixWorkspace(cls.ws, ispecs[ipk], 0.0, 0.0) - for ispec in np.arange(ispecs[ipk], ispecs[ipk] + 2): + for ipix, ispec in enumerate(np.arange(ispecs[ipk], ispecs[ipk] + 2)): + if ipix > 0: + # add small shift to simulated peak in second pixel so can check d-spacing tolerance + pk_func["X0"] = cls.pk_tof + 5 y = cls.ws.dataY(int(ispec)) y += pk_func(cls.ws.readX(int(ispec))[:-1]) # note shifts peak centre by half a bin cls.ws.setE(int(ispec), np.sqrt(y)) @@ -60,10 +65,14 @@ def setUpClass(cls): "FractionalChangeDSpacing": 0.025, "IntegrateIfOnEdge": True, "LorentzCorrection": False, + "IOverSigmaThreshold": 1, + "NRowsEdge": 2, + "NColsEdge": 2, } # output file dir cls._test_dir = tempfile.mkdtemp() + cls.default_intens_over_sigma = 30.966 @classmethod def tearDownClass(cls): @@ -74,14 +83,14 @@ def test_exec_IntegrateIfOnEdge_True(self): out = IntegratePeaks1DProfile( InputWorkspace=self.ws, PeaksWorkspace=self.peaks_edge, OutputWorkspace="peaks_int_1", **self.profile_kwargs ) - self.assertAlmostEqual(out.column("Intens/SigInt")[0], 19.59, delta=1e-1) - self.assertAlmostEqual(out.column("Intens/SigInt")[1], 19.59, delta=1e-1) + self.assertAlmostEqual(out.column("Intens/SigInt")[0], self.default_intens_over_sigma, delta=1e-1) + self.assertAlmostEqual(out.column("Intens/SigInt")[1], self.default_intens_over_sigma, delta=1e-1) def test_exec_IntegrateIfOnEdge_False(self): kwargs = self.profile_kwargs.copy() kwargs["IntegrateIfOnEdge"] = False out = IntegratePeaks1DProfile(InputWorkspace=self.ws, PeaksWorkspace=self.peaks_edge, OutputWorkspace="peaks_int_2", **kwargs) - self.assertAlmostEqual(out.column("Intens/SigInt")[0], 19.59, delta=1e-1) + self.assertAlmostEqual(out.column("Intens/SigInt")[0], self.default_intens_over_sigma, delta=1e-1) self.assertAlmostEqual(out.column("Intens/SigInt")[1], 0.0, delta=1e-2) def test_exec_IntegrateIfOnEdge_False_respects_detector_masking(self): @@ -96,26 +105,26 @@ def test_exec_IntegrateIfOnEdge_False_respects_detector_masking(self): out = IntegratePeaks1DProfile( InputWorkspace=ws_masked, PeaksWorkspace=self.peaks_edge, OutputWorkspace="peaks_int_2_masked", **kwargs ) - self.assertAlmostEqual(out.column("Intens/SigInt")[0], 19.59, delta=1e-1) + self.assertAlmostEqual(out.column("Intens/SigInt")[0], self.default_intens_over_sigma, delta=1e-1) self.assertAlmostEqual(out.column("Intens/SigInt")[1], 0.0, delta=1e-2) def test_exec_poisson_cost_func(self): kwargs = self.profile_kwargs.copy() kwargs["CostFunction"] = "Poisson" out = IntegratePeaks1DProfile(InputWorkspace=self.ws, PeaksWorkspace=self.peaks, OutputWorkspace="peaks_int_3", **kwargs) - self.assertAlmostEqual(out.column("Intens/SigInt")[0], 19.60, delta=1e-2) + self.assertAlmostEqual(out.column("Intens/SigInt")[0], 30.95, delta=1e-2) def test_exec_chisq_cost_func(self): kwargs = self.profile_kwargs.copy() kwargs["CostFunction"] = "ChiSq" out = IntegratePeaks1DProfile(InputWorkspace=self.ws, PeaksWorkspace=self.peaks, OutputWorkspace="peaks_int_4", **kwargs) - self.assertAlmostEqual(out.column("Intens/SigInt")[0], 19.60, delta=1e-2) + self.assertAlmostEqual(out.column("Intens/SigInt")[0], self.default_intens_over_sigma, delta=1e-2) def test_exec_hessian_error_strategy(self): kwargs = self.profile_kwargs.copy() kwargs["ErrorStrategy"] = "Hessian" out = IntegratePeaks1DProfile(InputWorkspace=self.ws, PeaksWorkspace=self.peaks, OutputWorkspace="peaks_int_5", **kwargs) - self.assertAlmostEqual(out.column("Intens/SigInt")[0], 1212658, delta=1) # not realistic fit + self.assertAlmostEqual(out.column("Intens/SigInt")[0], 62502470, delta=5) # not realistic fit - no noise! def test_exec_IOverSigmaThreshold_respected(self): kwargs = self.profile_kwargs.copy() @@ -128,7 +137,7 @@ def test_exec_gaussian_peak_func(self): kwargs["PeakFunction"] = "Gaussian" kwargs["FixPeakParameters"] = "" out = IntegratePeaks1DProfile(InputWorkspace=self.ws, PeaksWorkspace=self.peaks, OutputWorkspace="peaks_int_7", **kwargs) - self.assertAlmostEqual(out.column("Intens/SigInt")[0], 19.66, delta=1e-2) + self.assertAlmostEqual(out.column("Intens/SigInt")[0], 31.04, delta=1e-2) def test_exec_OutputFile(self): out_file = path.join(self._test_dir, "out.pdf") @@ -140,10 +149,23 @@ def test_exec_OutputFile(self): def test_exec_FractionalChangeDSpacing(self): kwargs = self.profile_kwargs.copy() - kwargs["FractionalChangeDSpacing"] = 1e-8 + kwargs["FractionalChangeDSpacing"] = 1e-10 + out = IntegratePeaks1DProfile(InputWorkspace=self.ws, PeaksWorkspace=self.peaks, OutputWorkspace="peaks_int_9", **kwargs) + # I/sigma different now d-spacing constrained + self.assertAlmostEqual(out.column("Intens/SigInt")[0], 31.16, delta=1e-2) + + def test_exec_fix_peak_params(self): + kwargs = self.profile_kwargs.copy() + kwargs["FixPeakParameters"] = ["I"] out = IntegratePeaks1DProfile(InputWorkspace=self.ws, PeaksWorkspace=self.peaks, OutputWorkspace="peaks_int_9", **kwargs) - # I/sigma different as center constrained - self.assertAlmostEqual(out.column("Intens/SigInt")[0], 19.62, delta=1e-2) + # I/sig slightly different as fixed intensity at initial guess (pretty good guess!) + self.assertAlmostEqual(out.column("Intens/SigInt")[0], 30.964, delta=1e-3) + + def test_exec_NPixMin_respected(self): + kwargs = self.profile_kwargs.copy() + kwargs["NPixMin"] = 3 # only 2 pixels in simulated data + out = IntegratePeaks1DProfile(InputWorkspace=self.ws, PeaksWorkspace=self.peaks, OutputWorkspace="peaks_int_10", **kwargs) + self.assertAlmostEqual(out.column("Intens/SigInt")[0], 0.0, delta=1e-2) if __name__ == "__main__": diff --git a/Testing/SystemTests/tests/framework/SXDAnalysis.py b/Testing/SystemTests/tests/framework/SXDAnalysis.py index 6e6c5fab9b66..259376173802 100644 --- a/Testing/SystemTests/tests/framework/SXDAnalysis.py +++ b/Testing/SystemTests/tests/framework/SXDAnalysis.py @@ -195,7 +195,7 @@ def runTest(self): self.integrated_peaks = sxd.get_peaks(runno, PEAK_TYPE.FOUND, INTEGRATION_TYPE.PROFILE) def validate(self): - intens_over_sigma = [0.0, 14.593, 12.116, 133.96, 0.0] + intens_over_sigma = [0.0, 17.863, 13.395, 135.109, 0.0] self.assertTrue(np.allclose(self.integrated_peaks.column("Intens/SigInt"), intens_over_sigma, atol=1e-2)) diff --git a/docs/source/algorithms/IntegratePeaks1DProfile-v1.rst b/docs/source/algorithms/IntegratePeaks1DProfile-v1.rst index 7c02b97dd198..cf17ba2b5193 100644 --- a/docs/source/algorithms/IntegratePeaks1DProfile-v1.rst +++ b/docs/source/algorithms/IntegratePeaks1DProfile-v1.rst @@ -55,8 +55,9 @@ Usage AddPeak(PeaksWorkspace="peaks", RunWorkspace="SXD23767", TOF=8303.3735339704781, DetectorID=7646) peaks_out = IntegratePeaks1DProfile(InputWorkspace="SXD23767", PeaksWorkspace="peaks", OutputWorkspace="peaks_int", - GetNBinsFromBackToBackParams=True, NFWHM=6, CostFunction="Poisson", + GetNBinsFromBackToBackParams=True, NFWHM=8, CostFunction="RSq", PeakFunction="BackToBackExponential", FixPeakParameters='A', + NRows=7, NCols=7, IOverSigmaThreshold=1, FractionalChangeDSpacing=0.01, IntegrateIfOnEdge=True) print(f"I/sigma = {peaks_out.getPeak(0).getIntensityOverSigma():.2f}") @@ -65,7 +66,7 @@ Usage .. testoutput:: exampleIntegratePeaks1DProfile - I/sigma = 92.97 + I/sigma = 94.36 References ---------- diff --git a/docs/source/release/v6.12.0/Diffraction/Single_Crystal/New_features/38515.rst b/docs/source/release/v6.12.0/Diffraction/Single_Crystal/New_features/38515.rst new file mode 100644 index 000000000000..0b5c0f9959fb --- /dev/null +++ b/docs/source/release/v6.12.0/Diffraction/Single_Crystal/New_features/38515.rst @@ -0,0 +1 @@ +- Use :ref:`MultiDomainFunction` in :ref:`IntegratePeaks1DProfile ` to tie peak profile parameters accross pixels.