diff --git a/omc3/kmod_averages.py b/omc3/kmod_averages.py deleted file mode 100644 index e42b7569..00000000 --- a/omc3/kmod_averages.py +++ /dev/null @@ -1,319 +0,0 @@ -""" -Average K-Modulation Results ----------------------------- - -Average muliple K-Modulation results into a single file/dataframe. - - -**Arguments:** - -*--Required--* - -- **betastar** *(float)*: - - Model beta-star value of measurements. - - -- **ip** *(int)*: - - Specific ip to average over. - - -- **meas_paths** *(PathOrStr)*: - - Directories of Kmod results to import. - - -*--Optional--* - -- **output_dir** *(PathOrStr)*: - - Path to the directory where to write the output files. - - -- **plot**: - - Plot the averaged results. - - action: ``store_true`` - - -- **show_plots**: - - Show the plots. - - action: ``store_true`` - -""" -from __future__ import annotations - -from pathlib import Path -from typing import TYPE_CHECKING - -import numpy as np -import pandas as pd -import tfs -from generic_parser.entrypoint_parser import EntryPointParameters, entrypoint -from omc3.definitions.constants import PLANES -from omc3.plotting.plot_kmod_results import plot_kmod_results - -from omc3.kmod.constants import ( - BEAM, - BEAM_DIR, - BETASTAR, - ERR, - EXT, - LABEL, - LSA_FILE_NAME, - MDL, - RESULTS_FILE_NAME, - TIME, - AVERAGED_BETASTAR_FILENAME, - AVERAGED_BPM_FILENAME -) -from omc3.optics_measurements.constants import NAME, S -from omc3.utils import logging_tools -from omc3.utils.stats import weighted_mean, weighted_error -from omc3.utils.iotools import PathOrStr, save_config - -if TYPE_CHECKING: - from collections.abc import Sequence - - from generic_parser import DotDict - -LOG = logging_tools.get_logger(__name__) - -COLUMNS_TO_DROP: tuple[str, ...] = (TIME, LABEL, ) -COLUMNS_NO_AVERAGE: tuple[str, ...] = (S, ) - -def _get_params(): - """ - A function to create and return EntryPointParameters for Kmod average. - """ - params = EntryPointParameters() - params.add_parameter( - name="meas_paths", - required=True, - nargs="+", - type=PathOrStr, - help="Directories of Kmod results to import.", - ) - params.add_parameter( - name="ip", - required=True, - type=int, - help="Specific ip to average over." - ) - params.add_parameter( - name="betastar", - required=True, - type=float, - nargs="+", - help="Model beta-star values (x, y) of measurements.", - ) - params.add_parameter( - name="output_dir", - type=PathOrStr, - help="Path to the directory where to write the output files.", - ) - params.add_parameter( - name="plot", - action="store_true", - help="Plot the averaged results." - ) - params.add_parameter( - name="show_plots", - action="store_true", - help="Show the plots." - ) - return params - - -@entrypoint(_get_params(), strict=True) -def average_kmod_results_entrypoint(opt: DotDict) -> dict[int, tfs.TfsDataFrame]: - """ - Reads kmod results and averages over the different measurements. - - Args: - meas_paths (Sequence[Path|str]): - A sequence of kmod BPM results files to import. This can include either single - measurements (e.g., 'lsa_results.tfs') or averaged results - (e.g., 'averaged_bpm_beam1_ip1_beta0.22m.tfs'). - - ip (int): - The specific IP to average over. - - betastar (float): - The model beta-star values (x, y) of the measurements. - If a single value is given, beta-star_x == beta-star_y is assumed. - - output_dir (Path|str): - Path to the output directory to write out the averaged results. Optional. - - plot (bool): - If True, plots the averaged results. Default: False. - - show_plots (bool): - If True, show the plots. Default: False. - - Returns: - Dictionary of averaged kmod-result DataFrames by beams for the - bpm-data and with key `0` for the beta-star data. - """ - LOG.info("Starting Kmod averaging.") - if opt.output_dir is not None: - opt.output_dir = Path(opt.output_dir) - opt.output_dir.mkdir(exist_ok=True) - save_config(opt.output_dir, opt, __file__) - - if len(opt.betastar) == 1: - opt.betastar = [opt.betastar[0], opt.betastar[0]] - - meas_paths = [Path(m) for m in opt.meas_paths] - - averaged_results = get_average_betastar_results(meas_paths, opt.betastar) - averaged_bpm_results = get_average_bpm_results(meas_paths) - - if opt.output_dir is not None: - filename = AVERAGED_BETASTAR_FILENAME.format(ip=opt.ip, betastar_x=opt.betastar[0], betastar_y=opt.betastar[1]) - tfs.write(opt.output_dir / f'{filename}{EXT}', averaged_results, save_index=BEAM) - - for beam, df in averaged_bpm_results.items(): - filename = AVERAGED_BPM_FILENAME.format(beam=beam, ip=opt.ip, betastar_x=opt.betastar[0], betastar_y=opt.betastar[1]) - tfs.write(opt.output_dir / f'{filename}{EXT}', df, save_index=NAME) - - if opt.plot: - plot_kmod_results( - data=averaged_results, - ip=opt.ip, - output_dir=opt.output_dir, - show=opt.show_plots - ) - - averaged_bpm_results[0] = averaged_results - return averaged_bpm_results - - -def get_average_betastar_results(meas_paths: Sequence[Path], betastar: list[float]) -> tfs.TfsDataFrame: - """ - Calculate the average betastar results for the given measurements. - - Args: - meas_paths: The paths to the measurements. - betastar: The model betastar value. - - Returns: - The final results as a DataFrame; both beams merged. - """ - LOG.debug("Averaging beta-star results.") - final_results = {} - for beam in [1, 2]: - try: - all_dfs = [ - tfs.read(dir_path / f"{BEAM_DIR}{beam}" / f"{RESULTS_FILE_NAME}{EXT}") - for dir_path in meas_paths - ] - except FileNotFoundError as e: - LOG.warning(f"Could not find all results for beam {beam}. Skipping.", exc_info=e) - continue - - mean_df = _get_averaged_df(all_dfs) - - for plane, bstar in zip(PLANES, betastar): - mean_df[f'{BETASTAR}{plane}{MDL}'] = bstar - - mean_df[BEAM] = beam - final_results[beam] = mean_df - final_df = tfs.concat(final_results.values()).set_index(BEAM) - return final_df - - -def get_average_bpm_results(meas_paths: Sequence[Path]) -> dict[int, tfs.TfsDataFrame]: - """ - Calculate the average results for BPMs/IPs for the given measurements. - - Args: - meas_paths: The paths to the measurements. - - Returns: - final_results: A dictionary containing the average bpm betas results for each beam. - """ - LOG.debug("Averaging bpm results.") - final_results = {} - - for beam in [1, 2]: - try: - all_dfs = [ - tfs.read(dir_path / f"{BEAM_DIR}{beam}" / f"{LSA_FILE_NAME}{EXT}", index=NAME) - for dir_path in meas_paths - ] - except FileNotFoundError as e: - LOG.warning(f"Could not find all results for beam {beam}. Skipping.", exc_info=e) - continue - - final_results[beam] = _get_averaged_df(all_dfs) - return final_results - - -def _get_averaged_df(dfs: Sequence[pd.DataFrame]) -> pd.DataFrame: - """ Calculate the average over the data in the given dfs. - - This function calculates the means and errors over the given dataframes, - using the weighted_mean and weighted_error functions from the `stats` module, - which takes the standard deviation of the data and their errors into account. - If no error column is present, - - It is assumed the same columns are present in all dataframes, - and the average is only done on rows, that have common indices. - The order of rows and columns is irrelevant. - - In case only a single dataframe is given, this frame is returned, instead of doing calculations. - """ - # Select columns for averaging, determines column order of output - columns = dfs[0].columns - no_average_cols = list(columns.intersection(COLUMNS_NO_AVERAGE)) - drop_cols = list(columns.intersection(COLUMNS_TO_DROP)) - - # Check if we actually need to average - if len(dfs) == 1: - LOG.warning("Only single DataFrame given for averaging -> Returning it.") - return dfs[0].drop(columns=drop_cols) - - # Check indices, which also determines row order of output - index = dfs[0].index - for df in dfs[1:]: - index = index.intersection(df.index) - - if not len(index): - msg = "Cannot perform averaging as the files all have no common indices." - raise KeyError(msg) - - - data_cols = [ - col for col in columns - if (not col.startswith(ERR)) and (col not in no_average_cols + drop_cols) - ] - - # Compute the weighted mean and weighted error for each column - avg_df = dfs[0].loc[index, no_average_cols] - - for data_col in data_cols: - err_col = f"{ERR}{data_col}" - if err_col not in columns: - err_col = None - - # Select data to average - data = np.array([df.loc[index, data_col].values for df in dfs]) - errors = None - if err_col is not None: - errors = np.array([df.loc[index, err_col].values for df in dfs]) - - # Compute weighted mean and error - avg_df.loc[index, data_col] = weighted_mean(data, errors, axis=0) - avg_df.loc[index, err_col] = weighted_error(data, errors, axis=0, t_value_corr=False) - - return avg_df - - -if __name__ == "__main__": - average_kmod_results_entrypoint() diff --git a/omc3/kmod_lumi_imbalance.py b/omc3/kmod_lumi_imbalance.py index c7a963ee..4853100b 100644 --- a/omc3/kmod_lumi_imbalance.py +++ b/omc3/kmod_lumi_imbalance.py @@ -38,6 +38,7 @@ """ from __future__ import annotations +import copy from pathlib import Path from typing import TYPE_CHECKING @@ -63,7 +64,7 @@ from generic_parser import DotDict LOG = logging_tools.get_logger(__name__) -IPS = ("ip1", "ip2", "ip5", "ip8") +IPS: tuple[str, ...] = ("ip1", "ip2", "ip5", "ip8") def _get_params(): """ @@ -85,28 +86,35 @@ def _get_params(): @entrypoint(_get_params(), strict=True) -def calculate_lumi_imbalance_entrypoint(opt: DotDict) -> tfs.TfsDataFrame: +def calculate_lumi_imbalance(opt: DotDict) -> tfs.TfsDataFrame: output_path = opt.output_dir if output_path is not None: output_path = Path(output_path) output_path.mkdir(parents=True, exist_ok=True) - if isinstance(opt.ip1, (Path, str)) and isinstance(opt.ip5, (Path, str)): - save_config(output_path, opt, __file__) - - dfs = _read_and_sort_dataframes(opt) - df = get_lumi_imbalance(**dfs) + opt_cp = copy.copy(opt) + for ip in IPS: + if not isinstance(opt_cp[ip], (Path, str)): + opt_cp[ip] = "(was provided as DataFrame)" + save_config(output_path, opt_cp, __file__) + + dfs = _read_and_check_dataframes(opt) + df = get_lumi_imbalance_df(**dfs) betastar_x, betastar_y = dfs[list(dfs.keys())[0]].loc[1, [f'{BETASTAR}X{MDL}', f'{BETASTAR}Y{MDL}']].values if output_path is not None: tfs.write( - output_path / f"{EFFECTIVE_BETAS_FILENAME.format(betastar_x=betastar_x, betastar_y=betastar_y)}{EXT}", df, + output_path / f"{EFFECTIVE_BETAS_FILENAME.format(betastar_x=betastar_x, betastar_y=betastar_y)}{EXT}", + df, save_index=IP ) return df -def _read_and_sort_dataframes(opt: DotDict) -> dict[str, tfs.TfsDataFrame]: +def _read_and_check_dataframes(opt: DotDict) -> dict[str, tfs.TfsDataFrame]: + """ + Read the given DataFrames if needed, check them for validity and return a dictionary. + """ dfs = {} for ip in IPS: df = opt.get(ip, None) @@ -118,44 +126,70 @@ def _read_and_sort_dataframes(opt: DotDict) -> dict[str, tfs.TfsDataFrame]: df = tfs.read(df, index=BEAM) except KeyError as e: msg = ( - f"Dataframe {df} does not contain a {BEAM} column." - "You need to run the `kmod_average` script on data for both beams first." + f"Dataframe '{df}' does not contain a '{BEAM}' column." + "You need to run the `kmod_average` script on data for both beams " + "before you can calulate the luminosity imbalance." ) raise KeyError(msg) from e - + + if BEAM in df.columns: + df = df.set_index(BEAM) + + for beam in (1, 2): + if beam not in df.index: + msg = ( + f"Dataframe '{df}' does not seem to contain data per beam." + "You need to run the `kmod_average` script on data for both beams " + "before you can calulate the luminosity imbalance." + ) + raise KeyError(msg) + dfs[ip] = df + + if len(dfs) != 2: + msg = ( + "Lumi inbalance can only be calculated for exactly two IPs, " + f"but instead {len(dfs)} were given." + ) + raise ValueError(msg) + return dfs -def get_lumi_imbalance(df_ip1: tfs.TfsDataFrame, df_ip5: tfs.TfsDataFrame) -> tfs.TfsDataFrame: + +def get_lumi_imbalance_df(**kwargs) -> tfs.TfsDataFrame: """ Calculate the effective beta stars and the luminosity imbalance from the input dataframes. Args: - df_ip1 (tfs.TfsDataFrame): a `TfsDataFrame` with the results from a kmod analysis, for IP1. - df_ip5 (tfs.TfsDataFrame): a `TfsDataFrame` with the results from a kmod analysis, for IP5. + ipA (tfs.TfsDataFrame): ar`TfsDataFrame` with the averaged results from a kmod analysis, for IP_A. + ipB (tfs.TfsDataFrame): a `TfsDataFrame` with the averaged results from a kmod analysis, for IP_B. + (Actually, any name that ends with an integer is fine.) Returns: tfs.TfsDataFrame with effective beta stars per IP and the luminosity imbalance added to the header. """ df_effective_betas = tfs.TfsDataFrame() - for ip, df in ((1, df_ip1), (5, df_ip5)): + for ip_str, df in kwargs.items(): + ip = int(ip_str[-1]) df_effective_betas.loc[ip, [f'{BETASTAR}', f'{ERR}{BETASTAR}']] = get_effective_beta_star_w_err(df) + ip_a, ip_b = df_effective_betas.index lumi_imb, lumi_imb_err = get_imbalance_w_err( - *tuple(df_effective_betas.loc[1, :]), - *tuple(df_effective_betas.loc[5, :]) + *tuple(df_effective_betas.loc[ip_a, :]), + *tuple(df_effective_betas.loc[ip_b, :]) ) + df_effective_betas.headers[f'{LUMINOSITY}{IMBALACE}'] = lumi_imb df_effective_betas.headers[f'{ERR}{LUMINOSITY}{IMBALACE}'] = lumi_imb_err return df_effective_betas -def get_imbalance_w_err(ip1_beta: float, ip1_beta_err: float, ip5_beta: float, ip5_beta_err: float) -> tuple[float, float]: +def get_imbalance_w_err(ipA_beta: float, ipA_beta_err: float, ipB_beta: float, ipB_beta_err: float) -> tuple[float, float]: """ - Calculate the luminosity imbalance IP1 / IP5 and its error. + Calculate the luminosity imbalance IP_A / IP_B and its error. """ - result = ip5_beta / ip1_beta # due to beta in the denominator for lumi - err = result * np.sqrt((ip5_beta_err/ip5_beta)**2 + (ip1_beta_err/ip1_beta)**2) + result = ipB_beta / ipA_beta # inverse due to beta in the denominator for lumi + err = result * np.sqrt((ipB_beta_err/ipB_beta)**2 + (ipA_beta_err/ipA_beta)**2) return result, err @@ -180,6 +214,11 @@ def get_effective_beta_star_w_err(df_ip: tfs.TfsDataFrame) -> tuple[float]: return beta, sigma +def _get_betastar_beams(df_ip: tfs.TfsDataFrame, errors: bool = False) -> tuple[float, float, float, float]: + """ Get betastar x and y for both beam 1 and beam 2. Order: b1x, b1y, b2x, b2y """ + return (*_get_betastar_xy(df_ip, 1, errors), *_get_betastar_xy(df_ip, 2, errors)) + + def _get_betastar_xy(df_ip: tfs.TfsDataFrame, beam: int, errors: bool = False) -> tuple[float, float]: """ Get betastar x and y for the given beam. """ if errors: @@ -187,10 +226,7 @@ def _get_betastar_xy(df_ip: tfs.TfsDataFrame, beam: int, errors: bool = False) - return tuple(df_ip.loc[beam, [f'{BETASTAR}X', f'{BETASTAR}Y']]) -def _get_betastar_beams(df_ip: tfs.TfsDataFrame, errors: bool = False) -> tuple[float, float, float, float]: - """ Get betastar x and y for both beam 1 and beam 2. Order: b1x, b1y, b2x, b2y """ - return (*_get_betastar_xy(df_ip, 1, errors), *_get_betastar_xy(df_ip, 2, errors)) - +# Commandline Entry Point ------------------------------------------------------ if __name__ == "__main__": - calculate_lumi_imbalance_entrypoint() \ No newline at end of file + calculate_lumi_imbalance() \ No newline at end of file diff --git a/tests/unit/test_kmod_averaging.py b/tests/unit/test_kmod_averaging.py index b51307f5..0aea4c2e 100644 --- a/tests/unit/test_kmod_averaging.py +++ b/tests/unit/test_kmod_averaging.py @@ -4,7 +4,7 @@ import pytest import tfs -from omc3.kmod_averages import ( +from omc3.kmod_average import ( AVERAGED_BETASTAR_FILENAME, AVERAGED_BPM_FILENAME, EXT, diff --git a/tests/unit/test_kmod_lumi_imbalance.py b/tests/unit/test_kmod_lumi_imbalance.py index 161bbe93..9f7359e1 100644 --- a/tests/unit/test_kmod_lumi_imbalance.py +++ b/tests/unit/test_kmod_lumi_imbalance.py @@ -4,7 +4,7 @@ import tfs from omc3.kmod.constants import AVERAGED_BETASTAR_FILENAME, EFFECTIVE_BETAS_FILENAME, EXT -from omc3.kmod_lumi_imbalance import calculate_lumi_imbalance_entrypoint +from omc3.kmod_lumi_imbalance import calculate_lumi_imbalance from tests.unit.test_kmod_averaging import REFERENCE_DIR, _get_reference_dir @@ -13,7 +13,7 @@ def test_kmod_lumi_imbalance(tmp_path): beta = 0.22 path_beta_ip1 = _get_input_path(1, beta) path_beta_ip5 = _get_input_path(5, beta) - calculate_lumi_imbalance_entrypoint(ip1=path_beta_ip1, ip5=path_beta_ip5, output_dir=tmp_path) + calculate_lumi_imbalance(ip1=path_beta_ip1, ip5=path_beta_ip5, output_dir=tmp_path) _assert_correct_files_are_present(tmp_path, beta) eff_betas = tfs.read(tmp_path / _get_filename(beta)) @@ -42,6 +42,6 @@ def update_reference_files(): beta = 0.22 path_beta_ip1 = _get_input_path(1, beta) path_beta_ip5 = _get_input_path(5, beta) - calculate_lumi_imbalance_entrypoint(ip1=path_beta_ip1, ip5=path_beta_ip5, output_dir=REFERENCE_DIR) + calculate_lumi_imbalance(ip1=path_beta_ip1, ip5=path_beta_ip5, output_dir=REFERENCE_DIR) for ini_file in REFERENCE_DIR.glob("*.ini"): ini_file.unlink() \ No newline at end of file