diff --git a/cyclops/data/df/dates/__init__.py b/cyclops/data/df/dates/__init__.py new file mode 100644 index 000000000..647e68eb4 --- /dev/null +++ b/cyclops/data/df/dates/__init__.py @@ -0,0 +1,18 @@ +"""Processors for date handling.""" + +from cyclops.data.df.dates.dates import ( + DatePairHandler, + analyze_dates, + analyzed_dates_differ, + analyzed_dates_failed_to_convert, + combine_date_and_time_components, + components_to_datetime, + datetime_components, + datetime_to_unix, + dateutil_parse_date, + extract_dateutil_components, + filter_date_deltas, + has_time, + round_date, + unix_to_datetime, +) diff --git a/cyclops/data/df/dates/dates.py b/cyclops/data/df/dates/dates.py new file mode 100644 index 000000000..5ff2e1877 --- /dev/null +++ b/cyclops/data/df/dates/dates.py @@ -0,0 +1,690 @@ +"""Utilities for working with dates in pandas DataFrames.""" + +import datetime +import warnings +from datetime import timedelta +from typing import Any, List, Optional, Union + +import numpy as np +import pandas as pd +from dateutil import parser as du_parser +from dateutil.parser import ParserError + +from cyclops.data.df.series_validation import is_datetime_series, is_str_series +from cyclops.data.df.utils import check_cols + + +# Datetime component names +DATE_COMPONENTS = ["year", "month", "day"] +TIME_COMPONENTS = ["hour", "minute", "second", "microsecond"] +DT_COMPONENTS = DATE_COMPONENTS + TIME_COMPONENTS + +# Parsing results for pd.to_datetime (PD_DT) and the dateutil parser (DU_DT) +PD_DT = "pd" +DU_DT = "du" +DU_TO_PD_DT = f"{DU_DT}_to_{PD_DT}" + + +def datetime_to_unix(series: pd.Series) -> pd.Series: + """Convert a datetime series to UNIX timestamps. + + Parameters + ---------- + series : pandas.Series + Datetime series. + + Returns + ------- + pd.Series + Series containing UNIX timestamps corresponding to the datetime values. + """ + is_datetime_series(series, raise_err=True) + + return series.astype(int) / 10**9 + + +def unix_to_datetime(series: pd.Series) -> pd.Series: + """Convert a series of UNIX timestamps to datetime. + + Parameters + ---------- + series : pandas.Series + Series containing UNIX timestamps. + + Returns + ------- + pd.Series + Series containing datetime values corresponding to the UNIX timestamps. + """ + return series.astype(int).astype("datetime64[s]") + + +def round_date(dates: pd.Series) -> pd.Series: + """Round datetimes to the nearest day. + + Parameters + ---------- + dates : pd.Series + Datetime series. + + Returns + ------- + pd.Series + Series rounded to the nearest day. + """ + is_datetime_series(dates, raise_err=True) + + return dates.dt.round("1d") + + +def has_time( + dates: pd.Series, + raise_err_on_time: bool = False, +) -> pd.Series: + """Check whether any datetimes have a time component. + + Parameters + ---------- + dates : pd.Series + Datetime series. + raise_err : bool, default False + If True, raise an error if any date has a time component. + + Returns + ------- + bool + Whether any dates have a time component. + + Raises + ------ + ValueError + If any date has a time component and `raise_err` is True. 
+ """ + # Round datetime values + rounded = round_date(dates) + + # If the same when rounded, then no time, if different, then has time + # Since NaN isn't equal to NaN, specifically check to make sure not null + has_time = (dates != rounded) & ~dates.isna() + + # Check if any dates have times and raise_err is True + if raise_err_on_time and has_time.any(): + raise ValueError("Dates cannot have a time component.") + + return has_time + + +# DEPRECIATED IN CONTRAST TO `analyze_dates`??? +def invalid_date(dates: pd.Series, **to_datetime_kwargs: Any) -> pd.Series: + """Return a boolean Series of whether a given series of dates are invalid. + + Parameters + ---------- + dates : pandas.Series + A string series containing (possibly invalid) dates. + **to_datetime_kwargs + Additional arguments for pandas.to_datetime. + + Returns + ------- + pandas.Series + Series with boolean values indicating whether each date is invalid. + + Raises + ------ + ValueError + When "errors" is specified in `to_datetime_kwargs` + """ + is_str_series(dates, raise_err=True) + + if "errors" in to_datetime_kwargs: + raise ValueError("Cannot specify 'errors' in to_datetime_kwargs.") + + return pd.isna(pd.to_datetime(dates, errors="coerce", **to_datetime_kwargs)) + + +def filter_date_deltas( + dates: pd.DataFrame, + delta_cutoff: Optional[Union[str, timedelta]] = None, + left_delta_cutoff: Optional[Union[str, timedelta]] = None, + right_delta_cutoff: Optional[Union[str, timedelta]] = None, +) -> pd.DataFrame: + """ + Filter DataFrame based on date delta conditions. + + Parameters + ---------- + dates : pandas.DataFrame + DataFrame containing 'delta' column. + delta_cutoff : timedelta, optional, default=None + Maximum delta value allowed. + left_delta_cutoff : timedelta, optional, default=None + Minimum delta value allowed. + right_delta_cutoff : timedelta, optional, default=None + Maximum delta value allowed. + + Returns + ------- + pandas.DataFrame + Filtered DataFrame based on delta conditions. + + Raises + ------ + ValueError + When `delta_cutoff` specified along with `left_delta_cutoff` or + `right_delta_cutoff`. + """ + if delta_cutoff is not None: + if left_delta_cutoff is not None or right_delta_cutoff is not None: + raise ValueError( + "Cannot specify left_delta_cutoff or right_delta_cutoff when " + "delta_cutoff is specified.", + ) + + return dates[abs(dates["delta"]) <= pd.to_timedelta(delta_cutoff)] + + if left_delta_cutoff is not None: + dates = dates[dates["delta"] >= pd.to_timedelta(left_delta_cutoff)] + + if right_delta_cutoff is not None: + dates = dates[dates["delta"] <= pd.to_timedelta(right_delta_cutoff)] + + return dates + + +class DatePairHandler: + """Handler to create and manipulate pairs based on dates and IDs. + + Attributes + ---------- + data_x : pandas.DataFrame + DataFrame containing data x. Should have the index `id` and a `date` column. + data_y : pandas.DataFrame + DataFrame containing data y. Should have the index `id` and a `date` column. + date_pairs : pandas.DataFrame + DataFrame containing date pair results. + _paired_data : pandas.DataFrame, optional + The paired data coming from the data_x and data_y columns. Computed and stored + based on `date_pairs` when the `paired_data` method is first called. 
+ """ + + def __init__( + self, + data_x: pd.DataFrame, + data_y: pd.DataFrame, + delta_cutoff: Optional[Union[str, timedelta]] = None, + left_delta_cutoff: Optional[Union[str, timedelta]] = None, + right_delta_cutoff: Optional[Union[str, timedelta]] = None, + keep_closest_to: Optional[str] = None, + ) -> None: + """Initialize an instance of `DatePairHandler`.""" + assert data_x.index.name == "id" + assert data_y.index.name == "id" + assert "idx_x" not in data_x.columns + assert "idx_y" not in data_y.columns + assert "date" in data_x.columns + assert "date" in data_y.columns + + data_x["idx_x"] = np.arange(len(data_x)) + data_y["idx_y"] = np.arange(len(data_y)) + + date_pairs = data_x[["date", "idx_x"]].merge( + data_y[["date", "idx_y"]], + on="id", + how="inner", + ) + + if keep_closest_to is not None: + assert keep_closest_to in ["date_x", "date_y"] + + date_pairs["delta"] = date_pairs["date_x"] - date_pairs["date_y"] + date_pairs["abs_delta"] = abs(date_pairs["delta"]) + + date_pairs = filter_date_deltas( + date_pairs, + delta_cutoff=delta_cutoff, + left_delta_cutoff=left_delta_cutoff, + right_delta_cutoff=right_delta_cutoff, + ) + + if keep_closest_to is not None: + date_pairs = date_pairs.reset_index() + min_deltas = ( + date_pairs.groupby(["id", keep_closest_to]) + .agg( + { + "abs_delta": "min", + }, + ) + .reset_index() + ) + date_pairs = date_pairs.merge( + min_deltas, + on=["id", keep_closest_to, "abs_delta"], + how="inner", + ) + + self.data_x = data_x + self.data_y = data_y + self.date_pairs = date_pairs + self._paired_data = None + + @property + def paired_data(self) -> pd.DataFrame: + """Get paired data based on the date pairs. + + Returns + ------- + pandas.DataFrame + Paired data based on the date pairs. + """ + if self._paired_data is None: + self._paired_data = pd.concat( + [ + self.data_x.set_index("idx_x") + .loc[self.date_pairs["idx_x"]] + .reset_index(), + self.data_y.set_index("idx_y") + .loc[self.date_pairs["idx_y"]] + .reset_index(), + ], + axis=1, + ) + + return self._paired_data + + +def dateutil_parse_date( + date: str, + **parse_kwargs: Any, +) -> Union[datetime.datetime, float]: + """Parse a date string using dateutil's parser. + + Parameters + ---------- + date : str + Date string to be parsed. + **parse_kwargs + Keyword arguments to pass to the parser. + + Returns + ------- + datetime.datetime or float + Parsed datetime object or np.nan on failure. + """ + try: + return du_parser.parse(date, **parse_kwargs) + + # ParserError = failed to parse + # TypeError = wrong type, e.g., nan or int + except (ParserError, TypeError): + return np.nan + + +def extract_dateutil_components( + du_series: pd.Series, + components: Optional[List[str]] = None, +) -> pd.DataFrame: + """Extract datetime components from dates parsed from `dateutil` (du). + + Useful for Series full of datetimes that cannot be converted using + `pandas.to_datetime` without possibly losing dates to errors like + `OutOfBoundsDatetime`. + + Parameters + ---------- + du_series : pd.Series + Series of datetimes parsed using dateutil. + components : list of str, optional + Components to extract from the datetime. If None, uses `DT_COMPONENTS`. + + Returns + ------- + pd.DataFrame + DataFrame containing the extracted datetime components. 
+ """ + + def extract_components( + datetime: datetime.datetime, + components: List[str], + ) -> np.ndarray: + if pd.isna(datetime): + return np.full(len(components), np.nan) + return np.array([getattr(datetime, comp) for comp in components]) + + components = components or DT_COMPONENTS + component_data = pd.DataFrame( + np.stack(du_series.apply(extract_components, args=(components,)).values), + columns=components, + index=du_series.index, + ) + return component_data.astype("Int64") + + +def datetime_components( + texts: pd.Series, + components: Optional[List[str]] = None, +) -> pd.DataFrame: + """Extract separate datetime components (NaN when missing) using dateutil. + + Useful because functionalities like `pandas.to_datetime` will return + NaT if a full date is not present (e.g., missing a year). + + Parameters + ---------- + texts : pd.Series + Series of datetime strings. + components : list of str, optional + Components to extract from the datetime. If None, uses `DT_COMPONENTS`. + + Returns + ------- + pd.DataFrame + DataFrame containing the extracted datetime components and the parsed date. + """ + # Extract dates with different values across all components + du = texts.apply(dateutil_parse_date) + du.rename(DU_DT, inplace=True) + + du2 = texts.apply( + dateutil_parse_date, + default=datetime.datetime(1, 2, 2, 2, 2, 2, 2), + ) + du2.rename("du2", inplace=True) + + # Where they are equal is not default, where they aren't is default (i.e., missing) + components = components or DT_COMPONENTS + equal = pd.concat( + [ + extract_dateutil_components(du, components=components), + extract_dateutil_components(du2, components=components).add_suffix("_2"), + ], + axis=1, + ) + + for _, comp in enumerate(components): + # If a value is missing (different for different default components), + # then replace it with NaN + equal[comp][equal[comp] != equal[f"{comp}_2"]] = np.nan + + return pd.concat([du, equal[components]], axis=1) + + +def analyzed_dates_differ( + analyzed: pd.DataFrame, + warn: bool = False, + raise_err: bool = False, +) -> pd.Series: + """Check where the analyzed `dateutil` and `pd.to_datetime` dates differ. + + Parameters + ---------- + analyzed : pd.DataFrame + A result of `analyze_dates`. + warn : bool, default False + Whether to warn the user when the dates differ. + raise_err : bool, default False + Whether to raise an error when the dates differ. + + Returns + ------- + pd.Series + Boolean series indicating where the dates from `pd.to_datetime` and + `dateutil` do not match. + + Raises + ------ + ValueError + Raised if `raise_err` is True and there are non-matching dates between + `pd.to_datetime` and `dateutil`. + """ + check_cols(analyzed, [PD_DT, DU_DT], raise_err_on_missing=True) + + # If the dates parsed from pd and du aren't the same date (and didn't + # both fail to parse), then flag that something funky might be going on + matching = (analyzed[PD_DT] == analyzed[DU_DT]) | ( + analyzed[[PD_DT, DU_DT]].isna().sum(axis=1) == 2 + ) + + if not matching.all(): + msg = ( + "`pd.to_datetime` and `dateutil` produced different results. " + "Consider manual inspection." + ) + + if raise_err: + raise ValueError(msg) + + if warn: + warnings.warn(msg, UserWarning, stacklevel=2) + + return ~matching + + +def analyzed_dates_failed_to_convert( + analyzed: pd.DataFrame, + warn: bool = False, + raise_err: bool = False, +) -> pd.Series: + """Check if any `dateutil` dates failed to convert using `pd.to_datetime`. + + One common failure is due to a `pandas.errors.OutOfBoundsDatetime`. 
+ + Parameters + ---------- + analyzed : pd.DataFrame + A result of `analyze_dates`. + warn : bool, default False + Whether to warn the user if there are failures. + raise_err : bool, default False + Whether to raise an error if there are failures. + + Returns + ------- + pd.Series + Boolean series indicating where the `dateutil` dates failed to convert. + + Raises + ------ + ValueError + Raised if `raise_err` is True and there are `dateutil` dates failed to convert. + """ + check_cols(analyzed, [DU_DT, DU_TO_PD_DT], raise_err_on_missing=True) + + # If du date is not null but the converted date is, then it failed to convert + failed = analyzed[DU_DT].notnull() & analyzed[DU_TO_PD_DT].isna() + + if failed.any(): + msg = ( + "Failed to convert `dateutil` dates using `pd.to_datetime`. " + "Consider manual inspection." + ) + + if raise_err: + raise ValueError(msg) + + if warn: + warnings.warn(msg, UserWarning, stacklevel=2) + + return failed + + +def analyze_dates( + texts: pd.Series, + components: Optional[List[str]] = None, + warn: bool = True, +) -> pd.DataFrame: + """Analyze a series of dates and extract datetime components. + + Parameters + ---------- + texts : pd.Series + Series of datetime strings to be analyzed. + components : list of str, optional + Components to extract from the datetime. If None, uses `DT_COMPONENTS`. + warn : bool, default True + Whether to analyze the dates and warn the user about various anomalies. + + Returns + ------- + pd.DataFrame + DataFrame containing the analyzed dates and extracted components. + """ + is_str_series(texts, raise_err=True) + + texts.rename("text", inplace=True) + dates = texts.to_frame() + + dates[PD_DT] = pd.to_datetime( + dates["text"], + infer_datetime_format=True, + errors="coerce", + ) + + components = components or DT_COMPONENTS + dates = pd.concat( + [ + dates, + datetime_components(dates["text"], components=components), + ], + axis=1, + ) + + # Drop a component column if the whole column is NaN - it is likely never specified + dates.drop( + [comp for comp in components if dates[comp].isna().all()], + axis=1, + inplace=True, + ) + + dates[DU_TO_PD_DT] = pd.to_datetime( + dates[DU_DT], + infer_datetime_format=True, + errors="coerce", + ) + + if warn: + analyzed_dates_differ(dates, warn=True) + analyzed_dates_failed_to_convert(dates, warn=True) + + return dates + + +def components_to_datetime( + comps: pd.DataFrame, + default_time: Optional[datetime.time] = None, +) -> pd.Series: + """Convert a DataFrame of datetime components into a datetime series. + + Useful for combining separate date and time texts. + + Parameters + ---------- + comps: pandas.DataFrame + DataFrame of component columns. Must have `DATE_COMPONENTS` columns and may + have any in `DT_COMPONENTS`. + default_time : datetime.time, optional + Default time for filling null time components. Defaults to midnight (all 0). + + Returns + ------- + pd.Series + A datetime series. Null time components will be filled with the components in + `default_time`. Null date components will result in a null result. + + Notes + ----- + Consider using `default_time=datetime.time(12)` (noon) to approximate the datetime + with the least error. If nothing is specified, it defaults to midnight, which is + a bad default for many events, e.g., few medical procedures take place at night. 
+ + Examples + -------- + >>> # Convert components to datetime, using noon as the default time + >>> dts = components_to_datetime(comps, default_time=datetime.time(12)) + """ + # Check component columns + check_cols(comps, DATE_COMPONENTS, raise_err_on_missing=True) + check_cols(comps, DT_COMPONENTS, raise_err_on_unexpected=True) + avail_time_comps = set(comps.columns).intersection(set(TIME_COMPONENTS)) + + if not (comps.dtypes.unique().astype(str) == "Int64").all(): + raise ValueError("Components must have type 'Int64'.") + + # Handle default times + default_time = default_time or datetime.time(0) + + for time_comp in TIME_COMPONENTS: + time_comp_value = getattr(default_time, time_comp) + + # If the column already exists, fill any nulls with the default value + if time_comp in avail_time_comps: + comps[time_comp].fillna(time_comp_value, inplace=True) + # If not, then create the column using the default value + else: + comps[time_comp] = time_comp_value + comps[time_comp] = comps[time_comp].astype("Int64") + + # Convert the components (now filled with time defaults) into datetimes + cmp = comps.copy() + index = cmp.index + cmp.reset_index(drop=True, inplace=True) + + # Convert only the datetimes which are not missing date components, + # the rest will be filled with NaN during reindexing + res = pd.to_datetime(cmp[~cmp.isna().any(axis=1)].astype(int)).reindex(cmp.index) + res.index = index + + return res + + +def combine_date_and_time_components( + date_comps: pd.DataFrame, + time_comps: pd.DataFrame, +) -> pd.DataFrame: + """Combine date components from one DataFrame and time components from another. + + Parameters + ---------- + date_comps : pandas.DataFrame + DataFrame containing relevant date components. Non-relevant columns dropped. + time_comps : pandas.DataFrame + DataFrame containing relevant time components. Non-relevant columns dropped. + + Returns + ------- + pd.DataFrame + A DataFrame with the date components from `date_comps` and time components from + `time_comps`. + + Examples + -------- + >>> from cyclops.data.df.dates import ( + ... analyze_dates, + ... combine_date_and_time_components, + ... 
) + >>> date_comps = analyze_dates(meta["AcquisitionDate"]) + >>> time_comps = analyze_dates(meta["AcquisitionTime"]) + >>> comps = combine_date_and_time_components( + >>> date_comps, + >>> time_comps, + >>> default_time=datetime.time(12), + >>> ) + >>> dts = components_to_datetime(datetime) + """ + if not date_comps.index.equals(date_comps.index): + raise ValueError( + "Indexes of `date_comps` and `time_comps` must be the same.", + ) + + unexpected_cols_date, _, _ = check_cols(date_comps, DATE_COMPONENTS) + date_comps = date_comps.drop(unexpected_cols_date, axis=1) + + unexpected_cols_time, _, _ = check_cols(time_comps, TIME_COMPONENTS) + time_comps = time_comps.drop(unexpected_cols_time, axis=1) + + return pd.concat([date_comps, time_comps], axis=1) + + +# def find_dates(text): +# matches = datefinder.find_dates(text, source=True, index=True) diff --git a/cyclops/data/df/dates/reconcile_dates.py b/cyclops/data/df/dates/reconcile_dates.py new file mode 100644 index 000000000..8b32456a6 --- /dev/null +++ b/cyclops/data/df/dates/reconcile_dates.py @@ -0,0 +1,595 @@ +"""Reconcile issues with dates in a DataFrame.""" + +import datetime +import warnings +from copy import deepcopy +from dataclasses import dataclass +from datetime import timedelta +from typing import Dict, Hashable, List, Optional + +import numpy as np +import pandas as pd +from sklearn.cluster import DBSCAN + +from cyclops.data.df.dates.dates import datetime_to_unix, has_time +from cyclops.data.df.pairs import ( + get_pairs, + pairs_to_groups, + split_pairs, +) +from cyclops.data.df.series_validation import is_datetime_series +from cyclops.data.df.utils import ( + check_cols, + combine_nonoverlapping, + groupby_agg_mode, + index_structure_equal, + is_multiindex, + or_conditions, + reset_index_merge, +) +from cyclops.utils.common import to_list_optional + + +def cluster_date_group(dates, dbscan): + dbscan.fit(dates.values.reshape(-1, 1)) + + return pd.Series(dbscan.labels_) + + +def cluster_dates(dates, dbscan: DBSCAN): + # Convert to Unix for clustering + unix = datetime_to_unix(dates) + + # Create clusters for each group + clusters = unix.groupby(level=0).apply(cluster_date_group, dbscan) + + clusters.index = clusters.index.droplevel(1) + clusters = clusters.replace({-1: np.nan}).astype("Int64") + + return clusters + + +def get_date_clusters(dates, max_neighbourhood_delta: datetime.timedelta): + check_cols(dates, ["date", "approx"], raise_err_on_missing=True) + + dbscan = DBSCAN( + eps=max_neighbourhood_delta.total_seconds(), + min_samples=2, + ) + clusters = cluster_dates(dates["date"], dbscan) + clusters.rename("cluster", inplace=True) + + # Combine into the original data + clusters = pd.concat([dates, clusters], axis=1) + + return clusters + + +def cluster_analysis(unres_hard, clusters): + index_col = clusters.index.names + + # Get the max cluster size for each group + cluster_size = clusters.reset_index().groupby(index_col + ["cluster"]).size() + cluster_size.rename("cluster_size", inplace=True) + + max_sizes = cluster_size.groupby(level=0).agg("max") + + clusters_of_max_size = reset_index_merge( + cluster_size, + max_sizes, + on=index_col + ["cluster_size"], + how="inner", + index_col=index_col, + )["cluster"] + clusters_of_max_size + clusters_of_max_size = clusters_of_max_size.to_frame() + clusters_of_max_size["is_max_size"] = True + + # The below averaging methods only make sense if there is a single max cluster, + # so ignore groups with several clusters of same size + clusters_of_max_size_vcs = 
clusters_of_max_size.index.value_counts() + + clusters_of_max_size = clusters_of_max_size[ + ~clusters_of_max_size.index.isin( + clusters_of_max_size_vcs.index[clusters_of_max_size_vcs > 1], + ) + ] + + # Get the is_max_size column into clusters + clusters = reset_index_merge( + clusters, + clusters_of_max_size, + how="left", + on=index_col + ["cluster"], + index_col=index_col, + ) + clusters["is_max_size"].fillna(False, inplace=True) + + # Get only the dates in the largest cluster + clusters_largest = clusters[clusters["is_max_size"]] + + # Get the hard dates in the largest clusters + clusters_largest_hard = clusters_largest[~clusters_largest["approx"]] + + # # === Resolve: largest_cluster_hard_mode + # single_modes = groupby_agg_mode( + # unres_hard["date"].groupby(level=0), + # single_modes_only=True, + # ) + + # largest_hard_is_mode = clusters_largest_hard.index.isin(single_modes.index) + # largest_cluster_hard_mode = clusters_largest_hard[largest_hard_is_mode]["date"] + + # # Continue without the resolved ones + # clusters_largest_hard = clusters_largest_hard[~largest_hard_is_mode] + + # === Resolve: largest_cluster_hard_mean === + # Take the average of these largest cluster hard dates + largest_cluster_hard_mean = ( + clusters_largest_hard.reset_index() + .groupby(index_col + ["cluster"])["date"] + .agg("mean") + ) + largest_cluster_hard_mean.index = largest_cluster_hard_mean.index.droplevel(1) + + # === Resolve: largest_cluster_approx_mean === + # Now consider the largest clusters which have only approximate values + all_approx = clusters_largest.groupby(level=0)["approx"].all() + + clusters_largest_approx = clusters_largest[ + clusters_largest.index.isin(all_approx.index[all_approx]) + ].copy() + + largest_cluster_approx_mean = clusters_largest_approx.groupby( + index_col + ["cluster"], + )["date"].agg("mean") + largest_cluster_approx_mean.index = largest_cluster_approx_mean.index.droplevel(1) + + return clusters_largest, largest_cluster_hard_mean, largest_cluster_approx_mean + + +def analyze_typos(dates_hard): + index_col = dates_hard.index.names + + # Get all unique hard dates for each group + dates_hard_unique = ( + dates_hard["date"] + .reset_index() + .value_counts() + .reset_index() + .drop(0, axis=1) + .set_index(index_col)["date"] + ) + + # Ignore any groups which only have one unique hard date + dates_hard_unique_vcs = dates_hard_unique.index.value_counts() + dates_hard_unique_vcs = dates_hard_unique_vcs[dates_hard_unique_vcs > 1] + dates_hard_unique_vcs.rename("n_unique", inplace=True) + + dates_hard_unique = dates_hard_unique.loc[dates_hard_unique_vcs.index] + + def date_to_char(dates): + chars = dates.astype(str).str.split("", expand=True) + chars.drop(columns=[0, 5, 8, 11], inplace=True) + chars.rename( + { + 1: "y1", + 2: "y2", + 3: "y3", + 4: "y4", + 6: "m1", + 7: "m2", + 9: "d1", + 10: "d2", + }, + axis=1, + inplace=True, + ) + chars = chars.astype("uint8") + + return chars + + # Convert the dates into characters + chars = date_to_char(dates_hard_unique) + + # Compute hard date character combinations + pairs = chars.groupby(level=0).apply(get_pairs) + pairs.index = pairs.index.droplevel(1) + pairs.index.names = index_col + + pairs_x, pairs_y = split_pairs(pairs) + + # Calculate equal characters + pairs_eq = pairs_x == pairs_y + pairs_eq = pairs_eq.add_suffix("_eq") + pairs_eq["n_diff"] = 8 - pairs_eq.sum(axis=1) + + # Calculate adjacent characters, e.g., 5 vs 6 or 2 vs 1 + # Convert from uint8 to int to avoid rounding issues + pairs_adj = (pairs_x.astype(int) - 
pairs_y.astype(int)).abs() == 1 + pairs_adj = pairs_adj.add_suffix("_adj") + pairs_adj["n_adj"] = pairs_adj.sum(axis=1) + + # Collect information about the typo pairs + pairs = pd.concat([pairs_eq, pairs_adj], axis=1) + + # Incorporate date info + # Recover the dates from the characters + date_x = pairs_x.astype(str).agg("".join, axis=1) + date_x = ( + date_x.str.slice(stop=4) + + "-" + + date_x.str.slice(start=4, stop=6) + + "-" + + date_x.str.slice(start=6) + ) + + date_y = pairs_y.astype(str).agg("".join, axis=1) + date_y = ( + date_y.str.slice(stop=4) + + "-" + + date_y.str.slice(start=4, stop=6) + + "-" + + date_y.str.slice(start=6) + ) + pairs["date_x"] = pd.to_datetime(date_x) + pairs["date_y"] = pd.to_datetime(date_y) + pairs["year"] = pairs["date_x"].dt.year == pairs["date_y"].dt.year + pairs["month"] = pairs["date_x"].dt.month == pairs["date_y"].dt.month + pairs["day"] = pairs["date_x"].dt.day == pairs["date_y"].dt.day + + # Check if gotten the day/month transposed + pairs["dm_transpose"] = (pairs["date_x"].dt.day == pairs["date_y"].dt.month) & ( + pairs["date_x"].dt.month == pairs["date_y"].dt.day + ) + + # Logic for determining whether a typo or not + certain_conds = [ + # Only one different character + (pairs["n_diff"] == 1), + # Two different characters with at least one adjacent + ((pairs["n_diff"] == 2) & (pairs["n_adj"] >= 1)), + # Day and month are transposed, but correct year + (pairs["dm_transpose"] & pairs["year"]), + ] + pairs["typo_certain"] = or_conditions(certain_conds) + + pairs["typo_possible"] = pairs["n_diff"] <= 3 + + # Create typo groups from pairs of possible typos + typo_pairs = pairs[pairs["typo_certain"] | pairs["typo_possible"]] + + typo_groups = ( + typo_pairs[["date_x", "date_y"]] + .astype(str) + .groupby(level=0) + .apply( + pairs_to_groups, + ) + .reset_index() + .set_index(index_col + ["group"])["level_1"] + ) + typo_groups.rename("date", inplace=True) + + # Convert typos to characters + typo_group_chars = date_to_char(typo_groups) + + def mode_scalar_or_list(series): + mode = pd.Series.mode(series) + + if len(mode) > 1: + return mode.to_list() + + return mode + + # Compile the most popular character options seen in each typo group + typo_value_options = typo_group_chars.groupby(level=[0, 1]).agg( + dict( + zip( + typo_group_chars.columns, + [mode_scalar_or_list] * len(typo_group_chars.columns), + ), + ), + ) + + """ + LEFT TO DO: + Compile a "date_possible" object + - Any completely filled typo_value_options (no lists) are essentially solved + - For day/month transpositions, those would be two possible dates [1914-11-03, 1914-03-11] + Still need to check out letter transpositions - 1956-10-02 vs 1956-10-20 + Perhaps do a mean for the one day/ten day/one month cols? The user can specify what's allowed? 
+ - Trade off between accuracy and just having nulls instead - date accuracy importance is use case specific + + As we go down the line of columns, disagreements become less and less important + That means we could take a mean of two disagreeing days, but not years, or + thousands of years + """ + + return pairs, typo_pairs, typo_groups, typo_value_options + + +@dataclass +class DateReconcilerResults: + index_col: List[Hashable] + resolved: pd.DataFrame + dates: pd.DataFrame + dates_hard: pd.DataFrame + dates_approx: pd.DataFrame + groups: pd.DataFrame + unres: pd.DataFrame + unres_hard: pd.DataFrame + unres_approx: pd.DataFrame + unres_groups: pd.DataFrame + clusters_largest: pd.DataFrame + pairs: pd.DataFrame + typo_pairs: pd.DataFrame + typo_groups: pd.Series + typo_value_options: pd.DataFrame + + +class DateReconciler: + """ + + Notes + ----- + === Resolutions === + - one_entry: Group contains one entry - select this date + - one_date: Contains multiple entries, but one unique date value - select this date + - one_multi_hard: Group which contains multiple of the same hard dates, but not + multiple sets of them, e.g., two instances of 1988-03-09 and two of 1974-06-20. + Works since it's unlikely for a typo or system error to produce the same date. + - hard_single_mode: Groups containing one hard date mode. + ### - largest_cluster_hard_mode: If after clustering, only one cluster of max size is + ### found, then take the mode of the hard dates, provided there is just one mode. + - largest_cluster_hard_mean: From the previous case, if more than one mode, then + take the average all of the hard dates in that cluster. + - largest_cluster_approx_mean: Same scenario as above, except the largest cluster + had no hard dates, so instead take the average of the approx dates. + + === Hard vs approximate dates === + One important distinction is whether a date is approximate (approx) or not: + - Approx: Computed, rounded, etc. - close to the real date, but maybe not equal + (e.g., only the year was given, or computing DOB from age and event time) + - Hard: System-defined or hand-inputted dates - these should be the true date, + with the exception of system errors and typos + + Delta distances are computed for both hard and approx dates, but Levenshtein + distance is only computed for hard dates. + + Approx dates take on supporting roles, e.g., is a given hard date near to many + supporting approx dates, or can be used as a backup with no hard dates available. + """ + + def __init__( + self, + sources: Dict[Hashable, pd.Series], + date_score_fn: callable, + approx_sources: Optional[List[Hashable]] = None, + approx_near_thresh: Optional[timedelta] = None, + once_per_source: bool = True, + ): + """ + sources : dict + Dictionary of datetime Series, where the key indicates the source. + date_score_fn : callable + A function which accepts a returns float between 0 and 1, where this value + represents the score (feasibility) of the date. + approx_sources : list of hashable, optional + Sources where the dates have been approximated - rounded, calculated, etc. + approx_near_thresh: datetime.timedelta, optional + Threshold for considering approximated sources to be the same. Must be + specified if there are any approximate sources. + once_per_source : bool, default True + Consider a unique index/date pair only once per source. 
Helpful for + ensuring that sources with more/repeated entries don't hold more weight + """ + # Handle approximate date sources + if approx_sources is not None and approx_near_thresh is None: + raise ValueError( + "Must specify `approx_near_thresh` if `approx_sources` specified.", + ) + approx_sources = to_list_optional(approx_sources, none_to_empty=True) + + if not set(approx_sources).issubset(set(sources.keys())): + raise ValueError( + "`approx_sources` must be a subset of the `sources` keys.", + ) + + self.dates = self._preproc_sources(sources, approx_sources, once_per_source) + self.date_score_fn = date_score_fn + + self.approx_sources = approx_sources + self.approx_near_thresh = approx_near_thresh + + def _preproc_sources(self, sources, approx_sources, once_per_source): + # Preprocess the sources/dates + dates = [] + prev_source = None + + for source, date in deepcopy(sources).items(): + try: + # Confirm datetime dtype + is_datetime_series(date, raise_err=True) + + # Raise an error if having a multiindex + is_multiindex( + sources[list(sources.keys())[0]].index, + raise_err_multi=True, + ) + + # Confirm identical index structures + if prev_source is not None: + index_structure_equal( + date.index, + sources[prev_source].index, + raise_err=True, + ) + + # No dates can have times - it messes things up + has_time(date, raise_err_on_time=True) + + except Exception as exc: + raise ValueError(f"Issue with series - source {source}.") from exc + + date.dropna(inplace=True) + date.rename("date", inplace=True) + + if once_per_source: + index_col = date.index.names + date = ( + date.reset_index() + .drop_duplicates( + keep="first", + ) + .set_index(index_col)["date"] + ) + + date = date.to_frame() + date["source"] = source + date["approx"] = source in approx_sources + + dates.append(date) + prev_source = source + + dates = pd.concat(dates) + dates = dates[~dates.index.isna()] + dates.sort_index(inplace=True) + + if not (dates["date"].dt.time == datetime.time(0)).all(): + warnings.warn( + "Dates with times are not supported. Converting to date only.", + ) + + return dates + + def _combined_resolved(self, groups, groups_resolved): + resolved = [] + for reason, dates in groups_resolved.items(): + dates = dates.to_frame() + dates["reason"] = reason + dates = dates.reindex(groups.index) + resolved.append(dates) + + return combine_nonoverlapping(resolved) + + def __call__(self): + dates = self.dates.copy() + + index_col = list(dates.index.names) + + dates["date_str"] = dates["date"].astype(str) + dates["date_score"] = dates["date"].apply(self.date_score_fn) + + # Split into approximate and hard dates + dates_approx = dates[dates["approx"]].drop("approx", axis=1) + dates_hard = dates[~dates["approx"]].drop("approx", axis=1) + + groups = dates.groupby(dates.index).size().rename("size").to_frame() + groups["one_entry"] = groups["size"] == 1 + groups["n_approx"] = dates_approx.groupby(dates_approx.index).size() + groups["n_approx"].fillna(0, inplace=True) + + # Groups are resolved on a case-by-case basis. Once resolved, they can be + # ignored to avoid wasted computation. The unresolved (unres) dates/groups + # will continue to be analyzed. 
+ unres = dates.copy() + unres_hard = dates_hard.copy() + unres_approx = dates_approx.copy() + unres_groups = groups.copy() + + # Find and analyze typos in the hard dates + pairs, typo_pairs, typo_groups, typo_value_options = analyze_typos(dates_hard) + + # Having extracted the typo information, drop any impossible dates (score = 0) + # which might later confuse the analysis + unres = unres[unres["date_score"] != 0] + unres_hard = unres_hard[unres_hard["date_score"] != 0] + unres_approx = unres_approx[unres_approx["date_score"] != 0] + + groups_resolved = {} + + def resolve(resolved, reason): + nonlocal groups_resolved, unres, unres_hard, unres_approx, unres_groups + + groups_resolved[reason] = resolved + + unres = unres[~unres.index.isin(resolved.index)] + unres_hard = unres_hard[~unres_hard.index.isin(resolved.index)] + unres_approx = unres_approx[~unres_approx.index.isin(resolved.index)] + unres_groups = unres_groups[~unres_groups.index.isin(resolved.index)] + + # === Resolve: one_entry === + one_entry = unres[ + unres.index.isin(unres_groups.index[unres_groups["size"] == 1]) + ]["date"] + resolve(one_entry, "one_entry") + + # === Resolve: one_date === + vcs = unres["date"].reset_index().value_counts() + vcs.rename("count", inplace=True) + + # Iff a given row has a count equal to its group size, then only one unique date + instance_compare = vcs.reset_index().join(groups, how="left", on="research_id") + instance_compare.set_index(index_col, inplace=True) + one_date_cond = instance_compare["count"] == instance_compare["size"] + one_date = instance_compare[one_date_cond]["date"] + resolve(one_date, "one_date") + + # === Resolve: one_multi_hard === + # For each group, determine the hard dates which appear more than once + vcs_hard = unres_hard["date"].reset_index().value_counts() + vcs_hard_multi = vcs_hard[vcs_hard > 1] + + # Get the groups which only have a single set of these same hard dates + # Otherwise, it may be ambiguous as to which set is the right one + is_multi_one = vcs_hard_multi.index.droplevel(1).value_counts() + is_multi_one = is_multi_one[is_multi_one == 1] + + one_multi_hard = vcs_hard_multi.reset_index().set_index(index_col)["date"] + one_multi_hard = one_multi_hard.loc[is_multi_one.index] + + resolve(one_multi_hard, "one_multi_hard") + + # === Resolve: hard_single_mode === + hard_single_mode = groupby_agg_mode( + unres_hard["date"].groupby(level=0), + single_modes_only=True, + ) + resolve(hard_single_mode, "hard_single_mode") + + # === Cluster resolutions === + clusters = get_date_clusters( + unres[["date", "approx"]], + self.approx_near_thresh, + ) + + ( + clusters_largest, + largest_cluster_hard_mean, + largest_cluster_approx_mean, + ) = cluster_analysis(unres_hard, clusters) + + resolve(largest_cluster_hard_mean, "largest_cluster_hard_mean") + resolve(largest_cluster_approx_mean, "largest_cluster_approx_mean") + + # Combine all of the resolved data collected into a single DataFrame + resolved = self._combined_resolved(groups, groups_resolved) + + return DateReconcilerResults( + index_col=index_col, + resolved=resolved, + dates=dates, + dates_hard=dates_hard, + dates_approx=dates_approx, + groups=groups, + unres=unres, + unres_hard=unres_hard, + unres_approx=unres_approx, + unres_groups=unres_groups, + clusters_largest=clusters_largest, + pairs=pairs, + typo_pairs=typo_pairs, + typo_groups=typo_groups, + typo_value_options=typo_value_options, + ) diff --git a/cyclops/data/df/pairs.py b/cyclops/data/df/pairs.py new file mode 100644 index 000000000..fbcb59240 --- 
/dev/null +++ b/cyclops/data/df/pairs.py @@ -0,0 +1,122 @@ +"""Functions for working with pairs of values in DataFrames.""" + +from typing import Tuple, Union + +import networkx as nx +import numpy as np +import pandas as pd + +from cyclops.data.df.series_validation import to_frame_if_series + + +def get_pairs( + data: Union[pd.Series, pd.DataFrame], + self_match: bool = False, + combinations: bool = True, +) -> pd.DataFrame: + """Perform a self-cross to generate pairs. + + Parameters + ---------- + data : pandas.Series or pandas.DataFrame + Values used to create the pairs. + self_match : bool, default False + If False, rows which paired with themselves are excluded. + combinations : bool, default True + If True, remove one of two permutations, leaving only pair combinations. + + Returns + ------- + pandas.DataFrame + A DataFrame of pairs. + + Notes + ----- + Often, we are only interested in combinations of pairs, not permutations. For + example, if evaluating the pairs using a commutative function, where argument order + does not affect the result, we would want to take only the pair combinations. + """ + pairs = to_frame_if_series(data).merge(data, how="cross") + + if combinations or not self_match: + length = len(data) + idx0 = np.repeat(np.arange(length), length) + idx1 = np.tile(np.arange(length), length) + + if combinations: + pairs = pairs[idx0 <= idx1] if self_match else pairs[idx0 < idx1] + else: + pairs = pairs[idx0 != idx1] + + return pairs + + +def split_pairs(pairs: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]: + """Split x and y pair columns into two separate DataFrames. + + Parameters + ---------- + pairs : pandas.DataFrame + A DataFrame of pairs. + + Returns + ------- + pandas.DataFrame + A DataFrame of pairs which had the "_x" columns. Suffix now removed. + pandas.DataFrame + A DataFrame of pairs which had the "_y" columns. Suffix now removed. + """ + half_len = len(pairs.columns) // 2 + + pairs_x = pairs.iloc[:, :half_len] + pairs_y = pairs.iloc[:, half_len:] + + cols = pairs.columns[:half_len].str.slice(stop=-2) + + pairs_x.columns = cols + pairs_y.columns = cols + + return pairs_x, pairs_y + + +def pairs_to_groups(pairs: pd.DataFrame) -> pd.DataFrame: + """Convert pairs of values in a DataFrame to groups of connected values. + + Given a DataFrame with two columns representing pairs of values, this function + constructs a graph where each value is a node and each pair is an edge. It then + finds the connected components of this graph, returning each component as a group + in a DataFrame. + + Parameters + ---------- + pairs : pandas.DataFrame + A DataFrame with two columns, each containing values. Each row represents a + pair of connected values. + + Raises + ------ + ValueError + If the input DataFrame does not have exactly two columns. + + Returns + ------- + pandas.DataFrame + A DataFrame with columns `value` and `group`. Each row represents a value and + its associated group ID. 
+ """ + if pairs.shape[1] != 2: + raise ValueError("The DataFrame must have exactly two columns.") + + # Create an empty graph + graph = nx.Graph() + + # Add edges to the graph based on the DataFrame rows + for _, row in pairs.iterrows(): + graph.add_edge(row[pairs.columns[0]], row[pairs.columns[1]]) + + # Find connected components + components = pd.Series(nx.connected_components(graph)) + + # Convert connected components into a groups series + groups = components.explode() + return pd.Series(groups.index, index=groups.values, name="group") diff --git a/cyclops/data/df/series_validation.py b/cyclops/data/df/series_validation.py new file mode 100644 index 000000000..cb30c3bd6 --- /dev/null +++ b/cyclops/data/df/series_validation.py @@ -0,0 +1,215 @@ +"""Functions for validating Pandas Series.""" + +from typing import Any + +import pandas as pd +from pandas.api.types import ( + is_bool_dtype, + is_datetime64_any_dtype, + is_float_dtype, + is_integer_dtype, + is_string_dtype, +) + + +def is_series(data: Any, raise_err: bool = False) -> bool: + """Check if the input is a Pandas Series. + + Parameters + ---------- + data : Any + The input data to check. + raise_err : bool, default False + Whether to raise an error if the data is not a Series. + + Raises + ------ + ValueError + If `raise_err` is True and the input data is not a Pandas Series. + + Returns + ------- + bool + True if the input is a Pandas Series, False otherwise. + """ + if isinstance(data, pd.Series): + return True + + if raise_err: + raise ValueError("Data must be a Pandas series.") + + return False + + +def is_bool_series(data: Any, raise_err: bool = False) -> bool: + """Check if the input is a Pandas boolean series. + + Parameters + ---------- + data : Any + The input data to check. + raise_err : bool, default False + Whether to raise an error if the data is not a boolean Series. + + Raises + ------ + ValueError + If `raise_err` is True and the input data is not a Pandas boolean series. + + Returns + ------- + bool + True if the input is a Pandas boolean series, False otherwise. + """ + if not is_series(data, raise_err=raise_err): + return False + + if is_bool_dtype(data): + return True + + if raise_err: + raise ValueError("Pandas series must have a boolean type.") + + return False + + +def is_int_series( + data: Any, + raise_err: bool = False, + raise_err_with_nullable: bool = False, +) -> bool: + """Check if the input is a Pandas integer series. + + Parameters + ---------- + data : Any + The input data to check. + raise_err : bool, default False + Whether to raise an error if the data is not an integer Series. + raise_err_with_nullable: bool, default False + Whether to raise an error informing that, if the data is not an integer Series, + consider a nullable integer data type. Takes precedence over raise_err. + + Raises + ------ + ValueError + If `raise_err` is True and the input data is not a Pandas integer series. + + Returns + ------- + bool + True if the input is a Pandas integer series, False otherwise. + """ + if not is_series(data, raise_err=raise_err): + return False + + if is_integer_dtype(data): + return True + + if raise_err_with_nullable: + raise ValueError( + "Pandas series must have an integer type. 
Consider applying " + "`series.astype('Int64')`, where Int64 is a nullable integer data type " + "which enables the use of null values with an integer dtype.", + ) + + if raise_err: + raise ValueError("Pandas series must have an integer type.") + + return False + + +def is_float_series(data: Any, raise_err: bool = False) -> bool: + """Check if the input is a Pandas float series. + + Parameters + ---------- + data : Any + The input data to check. + raise_err : bool, default False + Whether to raise an error if the data is not a float Series. + + Raises + ------ + ValueError + If `raise_err` is True and the input data is not a Pandas float series. + + Returns + ------- + bool + True if the input is a Pandas float series, False otherwise. + """ + if not is_series(data, raise_err=raise_err): + return False + + if is_float_dtype(data): + return True + + if raise_err: + raise ValueError("Pandas series must have a float type.") + + return False + + +def is_str_series(data: Any, raise_err: bool = False) -> bool: + """Check if the input is a Pandas string series. + + Parameters + ---------- + data : Any + The input data to check. + raise_err : bool, default False + Whether to raise an error if the data is not a string Series. + + Raises + ------ + ValueError + If `raise_err` is True and the input data is not a Pandas string series. + + Returns + ------- + bool + True if the input is a Pandas string series, False otherwise. + """ + if not is_series(data, raise_err=raise_err): + return False + + if is_string_dtype(data): + return True + + if raise_err: + raise ValueError("Pandas series must have a string type.") + + return False + + +def is_datetime_series(data: Any, raise_err: bool = False) -> bool: + """Check if the input is a Pandas datetime series. + + Parameters + ---------- + data : Any + The input data to check. + raise_err : bool, default False + Whether to raise an error if the data is not a datetime Series. + + Raises + ------ + ValueError + If `raise_err` is True and the input data is not a Pandas datetime series. + + Returns + ------- + bool + True if the input is a Pandas datetime series, False otherwise. + """ + if not is_series(data, raise_err=raise_err): + return False + + if is_datetime64_any_dtype(data): + return True + + if raise_err: + raise ValueError("Pandas series must have a datetime type.") + + return False diff --git a/cyclops/data/df/utils.py b/cyclops/data/df/utils.py new file mode 100644 index 000000000..bdf967128 --- /dev/null +++ b/cyclops/data/df/utils.py @@ -0,0 +1,319 @@ +"""Utility functions for working with Pandas DataFrames.""" + +from functools import reduce +from typing import ( + Any, + Hashable, + List, + Optional, + Sequence, + Set, + Tuple, + Union, +) + +import pandas as pd + +from cyclops.data.df.series_validation import is_bool_series +from cyclops.utils.common import to_list + + +COLS_TYPE = Union[Hashable, Sequence[Hashable]] + + +def check_cols( + data: pd.DataFrame, + cols: COLS_TYPE, + raise_err_on_unexpected: bool = False, + raise_err_on_existing: bool = False, + raise_err_on_missing: bool = False, +) -> Tuple[Set[Hashable], Set[Hashable], Set[Hashable]]: + """Check DataFrame columns for expected columns and handle errors. + + Parameters + ---------- + data : pd.DataFrame + The input DataFrame to check columns against. + cols : hashable or list of Hashable + The column(s) to check for in the DataFrame. + raise_err_on_unexpected : bool, default False + Raise an error if unexpected columns are found. 
+ raise_err_on_existing : bool, default False + Raise an error if any of the specified columns already exist. + raise_err_on_missing : bool, default False + Raise an error if any of the specified columns are missing. + + Returns + ------- + Tuple[Set[Hashable], Set[Hashable], Set[Hashable]] + A tuple containing sets of unexpected, existing, and missing columns. + """ + columns = set(to_list(cols)) + data_cols = set(data.columns) + + unexpected = data_cols - columns + if raise_err_on_unexpected and len(unexpected) > 0: + raise ValueError(f"Unexpected columns: {', '.join(unexpected)}") + + existing = data_cols.intersection(columns) + if raise_err_on_existing and len(existing) > 0: + raise ValueError(f"Existing columns: {', '.join(existing)}") + + missing = columns - data_cols + if raise_err_on_missing and len(missing) > 0: + raise ValueError(f"Missing columns: {', '.join(missing)}") + + return unexpected, existing, missing + + +def and_conditions(conditions: List[pd.Series]) -> pd.Series: + """ + Perform element-wise logical AND operation on a list of boolean Series. + + Parameters + ---------- + conditions : list of pd.Series + A list of boolean Pandas Series. + + Raises + ------ + ValueError + If the conditions are not Pandas boolean series. + + Returns + ------- + pd.Series + A new Pandas Series resulting from the element-wise logical AND operation. + """ + for condition in conditions: + is_bool_series(condition, raise_err=True) + + return reduce(lambda x, y: x & y, conditions) + + +def or_conditions(conditions: List[pd.Series]) -> pd.Series: + """ + Perform element-wise logical OR operation on a list of boolean Series. + + Parameters + ---------- + conditions : list of pd.Series + A list of boolean Pandas Series. + + Raises + ------ + ValueError + If the conditions are not Pandas boolean series. + + Returns + ------- + pd.Series + A new Pandas Series resulting from the element-wise logical OR operation. + """ + for condition in conditions: + is_bool_series(condition, raise_err=True) + + return reduce(lambda x, y: x | y, conditions) + + +def combine_nonoverlapping(datas: List[Union[pd.DataFrame, pd.Series]]) -> pd.DataFrame: + """Combine non-overlapping DataFrames/Series into a single DataFrame/Series. + + The objects in `datas` should be all DataFrames or all Series, not a combination. + + For any given value location, it can be non-null in exactly 0 or 1 of the + DataFrames. The combined DataFrame will contains all of these values. + + Parameters + ---------- + datas : list of pandas.DataFrame or pandas.Series + A list of DataFrames/Series to be combined. + + Returns + ------- + pandas.DataFrame + The combined DataFrame. + + Raises + ------ + ValueError + If unauthorized overlap is found between DataFrames. + """ + # Get masks where the DataFrames are NaN + datas_na = [data.isna() for data in datas] + + # Check that there is no unauthorized overlap + datas_not_na = [(~data_na).astype(int) for data_na in datas_na] + datas_not_na_sum = reduce(lambda x, y: x + y, datas_not_na) + if not (datas_not_na_sum <= 1).all().all(): + raise ValueError( + "Unauthorized overlap found between DataFrames. 
Cannot combine.", + ) + + # Combine the DataFrames + combined = datas[0].copy() + for data in datas[1:]: + combined = combined.combine_first(data) + + return combined + + +def reset_index_merge( + left: Union[pd.DataFrame, pd.Series], + right: Union[pd.DataFrame, pd.Series], + index_col: Optional[COLS_TYPE] = None, + **merge_kwargs: Any, +) -> pd.DataFrame: + """Merge two dataframes after resetting their indexes. + + Parameters + ---------- + left : pandas.DataFrame or pandas.Series + The left object to merge. + right : pandas.DataFrame or pandas.Series + The right object to merge. + index_col : hashable or sequence of hashable, optional + Column(s) to set as index for the merged result. + **merge_kwargs + Additional keyword arguments to pass to pandas merge function. + + Returns + ------- + pd.DataFrame + The merged dataframe. + """ + # Reset index for both dataframes + left_reset = left.reset_index() + right_reset = right.reset_index() + + # Merge the dataframes + merged = pd.merge(left_reset, right_reset, **merge_kwargs) + + # If index_col is provided, set it for the merged dataframe + if index_col: + merged.set_index(index_col, inplace=True) + + return merged + + +def index_structure_equal( + idx1: pd.Index, + idx2: pd.Index, + raise_err: bool = False, +) -> bool: + """Check whether two indexes have the same structure. + + Values aren't considered. + + Parameters + ---------- + idx1 : pandas.Index + The first index to compare. + idx2 : pandas.Index + The second index to compare. + raise_err : bool, default False + If True, raises an error if indexes do not have the same structure. + + Returns + ------- + bool + True if the indexes have the same structure, otherwise False. + """ + if type(idx1) != type(idx2): + if raise_err: + raise ValueError("Index dtypes do not match.") + + return False + + if idx1.names != idx2.names: + if raise_err: + raise ValueError("Index names do not match.") + + return False + + if idx1.nlevels != idx2.nlevels: + if raise_err: + raise ValueError("Number of index levels do not match.") + + return False + + return True + + +def is_multiindex( + idx: pd.Index, + raise_err: bool = False, + raise_err_multi: bool = False, +) -> bool: + """Check whether a given index is a MultiIndex. + + Parameters + ---------- + idx : pd.Index + Index to check. + raise_err : bool, default False + If True, raise a ValueError when idx is not a MultiIndex. + raise_err_multi : bool, default False + If True, raise a ValueError when idx is a MultiIndex. + + Raises + ------ + ValueError + Raised when `idx` is not a MultiIndex and `raise_err` is True. + Raised when `idx` is a MultiIndex and `raise_err_multi` is True. + + Returns + ------- + bool + True if idx is a MultiIndex, False otherwise. + """ + multiindex = isinstance(idx, pd.MultiIndex) + + if not multiindex and raise_err: + raise ValueError("Index must be a MultiIndex.") + + if multiindex and raise_err_multi: + raise ValueError("Index cannot be a MultiIndex.") + + return multiindex + + +def agg_mode(series: pd.Series) -> list[Any]: + """Get the mode(s) of a series by using `.agg(agg_mode)`. + + Parameters + ---------- + series : pd.Series + Series. + + Returns + ------- + list + List containing the mode(s) of the input series. + """ + return pd.Series.mode(series).to_list() # type: ignore[no-any-return] + + +def groupby_agg_mode( + grouped: pd.core.groupby.generic.SeriesGroupBy, + single_modes_only: bool = False, +) -> pd.Series: + """Compute the mode(s) for each group of a grouped series. 
+ + Parameters + ---------- + grouped : pd.core.groupby.generic.SeriesGroupBy + Grouped series. + single_modes_only : bool, default False + If True, only groups with a singular mode are kept. + + Returns + ------- + pd.Series + A pandas Series containing the mode(s) for each group. + """ + result = grouped.agg(agg_mode).explode() + if single_modes_only: + duplicate_indices = result.index[result.index.duplicated(keep=False)] + result = result.drop(duplicate_indices) + return result
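
---

For reviewers: below is a minimal, illustrative usage sketch of the new date-handling API, not part of the diff itself. The input series and their values are hypothetical placeholders; the imports correspond to the exports added in `cyclops/data/df/dates/__init__.py`, and the flow (parse strings into components, merge date and time components, build a datetime series) follows the functions defined in `cyclops/data/df/dates/dates.py`.

```python
import datetime

import pandas as pd

from cyclops.data.df.dates import (
    analyze_dates,
    combine_date_and_time_components,
    components_to_datetime,
)

# Hypothetical string series: one of dates, one of times (indexes must align)
dates = pd.Series(["2001-03-09", "1999-12-31", "not a date"])
times = pd.Series(["12:30:01", "08:15:00", "23:59:59"])

# Parse each series and extract nullable Int64 component columns
# (year/month/day/... — components that are never present are dropped)
date_comps = analyze_dates(dates)
time_comps = analyze_dates(times)

# Keep the date components from one frame and the time components from the other
comps = combine_date_and_time_components(date_comps, time_comps)

# Build a datetime series, filling any missing time components with noon;
# rows with missing date components (e.g., the unparseable entry) become NaT
dts = components_to_datetime(comps, default_time=datetime.time(12))
print(dts)
```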