diff --git a/aurora/pipelines/run_summary.py b/aurora/pipelines/run_summary.py
new file mode 100644
index 00000000..fc833c25
--- /dev/null
+++ b/aurora/pipelines/run_summary.py
@@ -0,0 +1,365 @@
+"""
+
+Note 1: Functionality of RunSummary()
+1. User can get a list of local_station options, which correspond to unique pairs
+of values: (survey_id, station_id)
+
+2. User can see all possible ways of processing the data:
+- one list per (survey_id, station_id) pair in the run_summary
+
+Some of the following functionalities may end up in KernelDataset:
+3. User can select local_station
+- this can trigger a reduction of runs to only those that are from the local station
+and simultaneous runs at other stations
+4. Given a local station, a list of possible reference stations can be generated
+5. Given a remote reference station, a list of all relevant runs, truncated to
+maximize coverage of the local station runs, is generated
+6. Given such a "restricted run list", runs can be dropped
+7. Time interval endpoints can be changed
+
+
+"""
+
+import copy
+import pandas as pd
+
+
+from mt_metadata.transfer_functions import (
+    ALLOWED_INPUT_CHANNELS,
+    ALLOWED_OUTPUT_CHANNELS,
+)
+import mth5
+from mth5.utils.helpers import initialize_mth5
+from loguru import logger
+
+
+RUN_SUMMARY_COLUMNS = [
+    "survey",
+    "station_id",
+    "run_id",
+    "start",
+    "end",
+    "sample_rate",
+    "input_channels",
+    "output_channels",
+    "remote",
+    "mth5_path",
+]
+
+
+class RunSummary:
+    """
+    The dependencies aren't clear yet.
+    Maybe still Dataset:
+    Could have methods:
+    "drop_runs_shorter_than"
+    "fill_gaps_by_time_interval"
+    "fill_gaps_by_run_names"
+
+    For the full MMT case this may need modification to a channel-based summary.
+
+    Question: should queries return a copy or modify in place? Need to decide on
+    standards and syntax. Handling this in general could use a decorator that
+    allows df as a kwarg; if it is not passed, the modification is done in place.
+    A user who doesn't want to modify in place can work with a clone.
+    Could also try the @staticmethod decorator so that it returns a modified df.
+
+    """
+
+    def __init__(self, **kwargs):
+        self.column_dtypes = [str, str, pd.Timestamp, pd.Timestamp]
+        self._input_dict = kwargs.get("input_dict", None)
+        self.df = kwargs.get("df", None)
+        self._mini_summary_columns = [
+            "survey",
+            "station_id",
+            "run_id",
+            "start",
+            "end",
+        ]
+
+    def clone(self):
+        """
+        2022-10-20:
+        Cloning may be causing issues with extra instances of open h5 files ...
+
+        """
+        return copy.deepcopy(self)
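+
+    # Illustrative usage (a sketch, not part of the API): "example.h5" is a
+    # hypothetical MTH5 archive path; the methods are the ones defined in
+    # this class.
+    #
+    #   run_summary = RunSummary()
+    #   run_summary.from_mth5s(["example.h5"])
+    #   run_summary.check_runs_are_valid(drop=True)
+    #   working_copy = run_summary.clone()  # avoid modifying in place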
+ + """ + return copy.deepcopy(self) + + def from_mth5s(self, mth5_list): + run_summary_df = extract_run_summaries_from_mth5s(mth5_list) + self.df = run_summary_df + + @property + def mini_summary(self): + return self.df[self._mini_summary_columns] + + @property + def print_mini_summary(self): + logger.info(self.mini_summary) + + def add_duration(self, df=None): + """ + + Parameters + ---------- + df + + """ + if df is None: + df = self.df + timedeltas = df.end - df.start + durations = [x.total_seconds() for x in timedeltas] + df["duration"] = durations + return + + def check_runs_are_valid(self, drop=False, **kwargs): + """kwargs can tell us what sorts of conditions to check, for example all_zero, there are nan, etc.""" + # check_for_all_zero_runs + for i_row, row in self.df.iterrows(): + logger.info(f"Checking row for zeros {row}") + m = mth5.mth5.MTH5() + m.open_mth5(row.mth5_path) + run_obj = m.get_run(row.station_id, row.run_id, row.survey) + runts = run_obj.to_runts() + if runts.dataset.to_array().data.__abs__().sum() == 0: + logger.critical( + "CRITICAL: Detected a run with all zero values" + ) + self.df["valid"].at[i_row] = False + # load each run, and take the median of the sum of the absolute values + if drop: + self.drop_invalid_rows() + return + + def drop_invalid_rows(self): + self.df = self.df[self.df.valid] + self.df.reset_index(drop=True, inplace=True) + + # BELOW FUNCTION CAN BE COPIED FROM METHOD IN KernelDataset() + # def drop_runs_shorter_than(self, duration, units="s"): + # if units != "s": + # raise NotImplementedError + # if "duration" not in self.df.columns: + # self.add_duration() + # drop_cond = self.df.duration < duration + # # df = self.df[drop_cond] + # self.df.drop(self.df[drop_cond].index, inplace=True) + # df = df.reset_index() + # + # self.df = df + # return df + + +def channel_summary_to_run_summary( + ch_summary, + allowed_input_channels=ALLOWED_INPUT_CHANNELS, + allowed_output_channels=ALLOWED_OUTPUT_CHANNELS, + sortby=["station_id", "start"], +): + """ + TODO: replace station_id with station, and run_id with run + Note will need to modify: aurora/tests/config$ more test_dataset_dataframe.py + TODO: Add logic for handling input and output channels based on channel + summary. Specifically, consider the case where there is no vertical magnetic + field, this information is available via ch_summary, and output channels should + then not include hz. + TODO: Just inherit all the run-level and higher el'ts of the channel_summary, + including n_samples? + + When creating the dataset dataframe, make it have these columns: + [ + "station_id", + "run_id", + "start", + "end", + "mth5_path", + "sample_rate", + "input_channels", + "output_channels", + "remote", + "channel_scale_factors", + ] + + Parameters + ---------- + ch_summary: mth5.tables.channel_table.ChannelSummaryTable or pandas DataFrame + If its a dataframe it is a representation of an mth5 channel_summary. + Maybe restricted to only have certain stations and runs before being passed to + this method + allowed_input_channels: list of strings + Normally ["hx", "hy", ] + These are the allowable input channel names for the processing. See further + note under allowed_output_channels. + allowed_output_channels: list of strings + Normally ["ex", "ey", "hz", ] + These are the allowable output channel names for the processing. + A global list of these is kept at the top of this module. 
+
+    def check_runs_are_valid(self, drop=False, **kwargs):
+        """kwargs can tell us what sorts of conditions to check, for example
+        all zeros, NaNs, etc."""
+        # check_for_all_zero_runs
+        for i_row, row in self.df.iterrows():
+            logger.info(f"Checking row for zeros {row}")
+            m = mth5.mth5.MTH5()
+            m.open_mth5(row.mth5_path)
+            run_obj = m.get_run(row.station_id, row.run_id, row.survey)
+            runts = run_obj.to_runts()
+            if runts.dataset.to_array().data.__abs__().sum() == 0:
+                logger.critical("Detected a run with all zero values")
+                self.df.at[i_row, "valid"] = False
+            m.close_mth5()
+            # TODO: load each run and take the median of the sum of the
+            # absolute values
+        if drop:
+            self.drop_invalid_rows()
+        return
+
+    def drop_invalid_rows(self):
+        self.df = self.df[self.df.valid]
+        self.df.reset_index(drop=True, inplace=True)
+
+    # BELOW FUNCTION CAN BE COPIED FROM METHOD IN KernelDataset()
+    # def drop_runs_shorter_than(self, duration, units="s"):
+    #     if units != "s":
+    #         raise NotImplementedError
+    #     if "duration" not in self.df.columns:
+    #         self.add_duration()
+    #     drop_cond = self.df.duration < duration
+    #     self.df.drop(self.df[drop_cond].index, inplace=True)
+    #     self.df.reset_index(drop=True, inplace=True)
+    #     return self.df
+
+
+def channel_summary_to_run_summary(
+    ch_summary,
+    allowed_input_channels=ALLOWED_INPUT_CHANNELS,
+    allowed_output_channels=ALLOWED_OUTPUT_CHANNELS,
+    sortby=["station_id", "start"],
+):
+    """
+    TODO: replace station_id with station, and run_id with run.
+    Note: will need to modify aurora/tests/config/test_dataset_dataframe.py
+    TODO: Add logic for handling input and output channels based on the channel
+    summary. Specifically, consider the case where there is no vertical
+    magnetic field; this information is available via ch_summary, and output
+    channels should then not include hz.
+    TODO: Just inherit all the run-level and higher elements of the
+    channel_summary, including n_samples?
+
+    When creating the dataset dataframe, make it have these columns:
+    [
+        "station_id",
+        "run_id",
+        "start",
+        "end",
+        "mth5_path",
+        "sample_rate",
+        "input_channels",
+        "output_channels",
+        "remote",
+        "channel_scale_factors",
+    ]
+
+    Parameters
+    ----------
+    ch_summary: mth5.tables.channel_table.ChannelSummaryTable or pandas DataFrame
+        If it's a dataframe, it is a representation of an mth5 channel_summary.
+        It may be restricted to only certain stations and runs before being
+        passed to this method.
+    allowed_input_channels: list of strings
+        Normally ["hx", "hy", ]
+        These are the allowable input channel names for the processing. See
+        the further note under allowed_output_channels.
+    allowed_output_channels: list of strings
+        Normally ["ex", "ey", "hz", ]
+        These are the allowable output channel names for the processing.
+        A global list of these is kept at the top of this module. The purpose
+        of this is to distinguish between runs that have different layouts;
+        for example, some runs will have hz and some will not, and we cannot
+        process for hz the runs that do not have it. By making this a kwarg we
+        prop the door open for more general names (see issue #74).
+    sortby: list of strings or bool
+        Default: ["station_id", "start"]
+
+    Returns
+    -------
+    run_summary_df: pd.DataFrame
+        A table with one row per "acquisition run" that was in the input
+        channel summary table.
+    """
+    if isinstance(ch_summary, mth5.tables.channel_table.ChannelSummaryTable):
+        ch_summary_df = ch_summary.to_dataframe()
+    elif isinstance(ch_summary, pd.DataFrame):
+        ch_summary_df = ch_summary
+    else:
+        raise TypeError(f"Unexpected type for ch_summary: {type(ch_summary)}")
+    group_by_columns = ["survey", "station", "run"]
+    grouper = ch_summary_df.groupby(group_by_columns)
+    n_station_runs = len(grouper)
+    survey_ids = n_station_runs * [None]
+    station_ids = n_station_runs * [None]
+    run_ids = n_station_runs * [None]
+    start_times = n_station_runs * [None]
+    end_times = n_station_runs * [None]
+    sample_rates = n_station_runs * [None]
+    input_channels = n_station_runs * [None]
+    output_channels = n_station_runs * [None]
+    channel_scale_factors = n_station_runs * [None]
+    i = 0
+    for group_values, group in grouper:
+        group_info = dict(zip(group_by_columns, group_values))  # handy for debug
+        survey_ids[i] = group_info["survey"]
+        station_ids[i] = group_info["station"]
+        run_ids[i] = group_info["run"]
+        start_times[i] = group.start.iloc[0]
+        end_times[i] = group.end.iloc[0]
+        sample_rates[i] = group.sample_rate.iloc[0]
+        channels_list = group.component.to_list()
+        num_channels = len(channels_list)
+        input_channels[i] = [
+            x for x in channels_list if x in allowed_input_channels
+        ]
+        output_channels[i] = [
+            x for x in channels_list if x in allowed_output_channels
+        ]
+        channel_scale_factors[i] = dict(
+            zip(channels_list, num_channels * [1.0])
+        )
+        i += 1
+
+    data_dict = {}
+    data_dict["survey"] = survey_ids
+    data_dict["station_id"] = station_ids
+    data_dict["run_id"] = run_ids
+    data_dict["start"] = start_times
+    data_dict["end"] = end_times
+    data_dict["sample_rate"] = sample_rates
+    data_dict["input_channels"] = input_channels
+    data_dict["output_channels"] = output_channels
+    data_dict["channel_scale_factors"] = channel_scale_factors
+    data_dict["valid"] = True
+
+    run_summary_df = pd.DataFrame(data=data_dict)
+    if sortby:
+        run_summary_df.sort_values(by=sortby, inplace=True)
+    return run_summary_df
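+
+
+# Illustrative call (a sketch): given an open mth5.mth5.MTH5 instance m, the
+# function above yields one row per acquisition run from its channel summary:
+#   ch_summary_df = m.channel_summary.to_dataframe()
+#   run_summary_df = channel_summary_to_run_summary(ch_summary_df)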
+
+
+def extract_run_summary_from_mth5(mth5_obj, summary_type="run"):
+    """
+
+    Parameters
+    ----------
+    mth5_obj: mth5.mth5.MTH5
+        The initialized mth5 object that will be interrogated
+    summary_type: str
+        One of ["run", "channel"]. Returns a run summary or a channel summary
+
+    Returns
+    -------
+    out_df: pd.DataFrame
+        Table summarizing the available runs in the input mth5_obj
+    """
+    channel_summary_df = mth5_obj.channel_summary.to_dataframe()
+    # check that the mth5 has been summarized already
+    if len(channel_summary_df) < 2:
+        logger.info(
+            "Channel summary may not be initialized yet (3 or more channels "
+            "expected); summarizing now."
+        )
+        mth5_obj.channel_summary.summarize()
+        channel_summary_df = mth5_obj.channel_summary.to_dataframe()
+    if summary_type == "run":
+        out_df = channel_summary_to_run_summary(channel_summary_df)
+    else:
+        out_df = channel_summary_df
+    out_df["mth5_path"] = str(mth5_obj.filename)
+    return out_df
+
+
+def extract_run_summaries_from_mth5s(
+    mth5_list, summary_type="run", deduplicate=True
+):
+    """
+    ToDo: Move this method into mth5, or mth5_helpers?
+    ToDo: Make this a class so that the __repr__ is a nice visual
+    representation of the df, like what channel summary does in mth5.
+
+    2022-05-28: Modified to allow this method to accept mth5 objects as well
+    as the already supported types of pathlib.Path or str.
+
+    Given a list of mth5s, this returns a dataframe of all available runs.
+
+    In order to drop duplicates I used the solution here:
+    https://stackoverflow.com/questions/43855462/pandas-drop-duplicates-method-not-working-on-dataframe-containing-lists
+
+    Parameters
+    ----------
+    mth5_list: list
+        mth5 objects, or paths or strings that point to mth5s
+    summary_type: string
+        One of ["channel", "run"]:
+        "channel" returns a concatenated channel summary,
+        "run" returns a concatenated run summary.
+    deduplicate: bool
+        Default is True; deduplicates the summary_df
+
+    Returns
+    -------
+    super_summary: pd.DataFrame
+
+    """
+    dfs = len(mth5_list) * [None]
+
+    for i, mth5_elt in enumerate(mth5_list):
+        if isinstance(mth5_elt, mth5.mth5.MTH5):
+            mth5_obj = mth5_elt
+        else:  # mth5_elt is a path or a string
+            mth5_obj = initialize_mth5(mth5_elt, mode="a")
+
+        df = extract_run_summary_from_mth5(mth5_obj, summary_type=summary_type)
+
+        # close it back up if you opened it
+        if not isinstance(mth5_elt, mth5.mth5.MTH5):
+            mth5_obj.close_mth5()
+        dfs[i] = df
+
+    # merge all summaries into a super_summary
+    super_summary = pd.concat(dfs)
+    super_summary.reset_index(drop=True, inplace=True)
+
+    # drop rows that correspond to TFs:
+    run_rows = super_summary.sample_rate != 0
+    super_summary = super_summary[run_rows]
+    super_summary.reset_index(drop=True, inplace=True)
+
+    if deduplicate:
+        keep_indices = super_summary.astype(str).drop_duplicates().index
+        super_summary = super_summary.loc[keep_indices]
+        super_summary.reset_index(drop=True, inplace=True)
+    return super_summary
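+
+# Note on deduplication: list-valued columns (e.g. input_channels) are not
+# hashable, so drop_duplicates() cannot be applied to the dataframe directly;
+# casting to str first (as above) sidesteps that, per the stackoverflow link
+# in the docstring.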
""" if dec_level_config.window.clock_zero_type == "data start": - dec_level_config.window.clock_zero = str(self.dataset_df.start.min()) + dec_level_config.window.clock_zero = str( + self.dataset_df.start.min() + ) return dec_level_config @property def all_fcs_already_exist(self) -> bool: """Return true of all FCs needed to process data already exist in the mth5s""" if self.kernel_dataset.df["fc"].isna().any(): - self.check_if_fcs_already_exist() + has_fcs = self.check_if_fcs_already_exist() + if not has_fcs: + self.kernel_dataset.df["fc"] = self.kernel_dataset.df[ + "fc" + ].fillna(False) # these should all be booleans now assert not self.kernel_dataset.df["fc"].isna().any() @@ -241,13 +245,16 @@ def check_if_fcs_already_exist(self): if self.dataset_df["fc"].all(): msg = "All fc_levels already exist" msg += "Skip time series processing is OK" + logger.info(msg) + return True else: msg = f"Some, but not all fc_levels already exist = {self.dataset_df['fc']}" + logger.info(msg) + return False else: msg = "FC levels not present" - logger.info(msg) - - return + logger.info(msg) + return False def show_processing_summary( self, @@ -296,11 +303,15 @@ def make_processing_summary(self): decimation_info = self.config.decimation_info() for i_dec, dec_factor in decimation_info.items(): tmp[i_dec] = dec_factor - tmp = tmp.melt(id_vars=id_vars, value_name="dec_factor", var_name="dec_level") + tmp = tmp.melt( + id_vars=id_vars, value_name="dec_factor", var_name="dec_level" + ) sortby = ["survey", "station", "run", "start", "dec_level"] tmp.sort_values(by=sortby, inplace=True) tmp.reset_index(drop=True, inplace=True) - tmp.drop("sample_rate", axis=1, inplace=True) # not valid for decimated data + tmp.drop( + "sample_rate", axis=1, inplace=True + ) # not valid for decimated data # Add window info group_by = [ @@ -317,7 +328,9 @@ def make_processing_summary(self): cond = (df.dec_level.diff()[1:] == 1).all() assert cond # dec levels increment by 1 except AssertionError: - msg = f"Skipping {group} because decimation levels are messy." + msg = ( + f"Skipping {group} because decimation levels are messy." 
 
         # these should all be booleans now
         assert not self.kernel_dataset.df["fc"].isna().any()
 
@@ -241,13 +245,16 @@ def check_if_fcs_already_exist(self):
             if self.dataset_df["fc"].all():
                 msg = "All fc_levels already exist"
                 msg += "Skip time series processing is OK"
+                logger.info(msg)
+                return True
             else:
                 msg = f"Some, but not all fc_levels already exist = {self.dataset_df['fc']}"
+                logger.info(msg)
+                return False
         else:
             msg = "FC levels not present"
-        logger.info(msg)
-
-        return
+            logger.info(msg)
+            return False
 
     def show_processing_summary(
         self,
@@ -296,11 +303,15 @@ def make_processing_summary(self):
         decimation_info = self.config.decimation_info()
         for i_dec, dec_factor in decimation_info.items():
             tmp[i_dec] = dec_factor
-        tmp = tmp.melt(id_vars=id_vars, value_name="dec_factor", var_name="dec_level")
+        tmp = tmp.melt(
+            id_vars=id_vars, value_name="dec_factor", var_name="dec_level"
+        )
         sortby = ["survey", "station", "run", "start", "dec_level"]
         tmp.sort_values(by=sortby, inplace=True)
         tmp.reset_index(drop=True, inplace=True)
-        tmp.drop("sample_rate", axis=1, inplace=True)  # not valid for decimated data
+        tmp.drop(
+            "sample_rate", axis=1, inplace=True
+        )  # not valid for decimated data
 
         # Add window info
         group_by = [
@@ -317,7 +328,9 @@
                 cond = (df.dec_level.diff()[1:] == 1).all()
                 assert cond  # dec levels increment by 1
             except AssertionError:
-                msg = f"Skipping {group} because decimation levels are messy."
+                msg = (
+                    f"Skipping {group} because decimation levels are messy."
+                )
                 logger.info(msg)
                 continue
             assert df.dec_factor.iloc[0] == 1
@@ -382,7 +395,8 @@ def validate_decimation_scheme_and_dataset_compatability(
             for x in self.processing_config.decimations
         }
         min_stft_window_list = [
-            min_stft_window_info[x] for x in self.processing_summary.dec_level
+            min_stft_window_info[x]
+            for x in self.processing_summary.dec_level
        ]
         min_num_stft_windows = pd.Series(min_stft_window_list)
 
@@ -408,7 +422,9 @@ def validate_processing(self):
                 self.config.drop_reference_channels()
             for decimation in self.config.decimations:
                 if decimation.estimator.engine == "RME_RR":
-                    logger.info("No RR station specified, switching RME_RR to RME")
+                    logger.info(
+                        "No RR station specified, switching RME_RR to RME"
+                    )
                     decimation.estimator.engine = "RME"
 
         # Make sure that a local station is defined
@@ -440,7 +456,9 @@ def valid_decimations(self):
         valid_levels = tmp.dec_level.unique()
 
         dec_levels = [x for x in self.config.decimations]
-        dec_levels = [x for x in dec_levels if x.decimation.level in valid_levels]
+        dec_levels = [
+            x for x in dec_levels if x.decimation.level in valid_levels
+        ]
         msg = f"After validation there are {len(dec_levels)} valid decimation levels"
         logger.info(msg)
         return dec_levels
@@ -458,7 +476,9 @@ def validate_save_fc_settings(self):
                 # if dec_level_config.save_fcs:
                 dec_level_config.save_fcs = False
         if self.config.stations.remote:
-            save_any_fcs = np.array([x.save_fcs for x in self.config.decimations]).any()
+            save_any_fcs = np.array(
+                [x.save_fcs for x in self.config.decimations]
+            ).any()
             if save_any_fcs:
                 msg = "\n Saving FCs for remote reference processing is not supported"
                 msg = f"{msg} \n - To save FCs, process as single station, then you can use the FCs for RR processing"
@@ -574,13 +594,21 @@ def make_decimation_dict_for_tf(tf_collection, processing_config):
 
         decimation_dict = {}
 
-        for i_dec, dec_level_cfg in enumerate(processing_config.decimations):
+        for i_dec, dec_level_cfg in enumerate(
+            processing_config.decimations
+        ):
             for i_band, band in enumerate(dec_level_cfg.bands):
                 period_key = f"{band.center_period:{PERIOD_FORMAT}}"
                 period_value = {}
-                period_value["level"] = i_dec + 1  # +1 to match EMTF standard
-                period_value["bands"] = tuple(band.harmonic_indices[np.r_[0, -1]])
-                period_value["sample_rate"] = dec_level_cfg.sample_rate_decimation
+                period_value["level"] = (
+                    i_dec + 1
+                )  # +1 to match EMTF standard
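+                # np.r_[0, -1] picks the first and last harmonic indices,
+                # i.e. the band edges, for the "bands" entry below.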
% of memory") @@ -676,7 +708,9 @@ def memory_check(self) -> None: @path_or_mth5_object -def mth5_has_fcs(m, survey_id, station_id, run_id, remote, processing_config, **kwargs): +def mth5_has_fcs( + m, survey_id, station_id, run_id, remote, processing_config, **kwargs +): """ Checks if all needed fc-levels for survey-station-run are present under processing_config