diff --git a/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py b/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py
index 28ceb435..328ebac6 100644
--- a/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py
+++ b/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py
@@ -113,6 +113,8 @@ def _map_binned_logs_to_dandiset(
     all_reduced_s3_logs_per_blob_id_aggregated_by_day = dict()
     all_reduced_s3_logs_per_blob_id_aggregated_by_region = dict()
+    all_reduced_s3_logs_per_blob_id_aggregated_by_ip = dict()
+
     blob_id_to_asset_path = dict()
     total_bytes_across_versions_by_blob_id = dict()
 
     dandiset_versions = list(dandiset.get_versions())
@@ -180,10 +182,10 @@
                 for ip_address in reduced_s3_log_binned_by_blob_id["ip_address"]
             ]
 
-            reordered_reduced_s3_log = reduced_s3_log_binned_by_blob_id.reindex(
-                columns=("timestamp", "bytes_sent", "region")
+            reordered_reduced_s3_log = reduced_s3_log_binned_by_blob_id.sort_values(
+                by="timestamp",
+                key=natsort.natsort_keygen(),
             )
-            reordered_reduced_s3_log.sort_values(by="timestamp", key=natsort.natsort_keygen(), inplace=True)
             reordered_reduced_s3_log.index = range(len(reordered_reduced_s3_log))
 
             dandiset_version_log_folder_path.mkdir(parents=True, exist_ok=True)
@@ -205,6 +207,11 @@
             all_reduced_s3_logs_aggregated_by_region_for_version.append(aggregated_activity_by_region)
             all_reduced_s3_logs_per_blob_id_aggregated_by_region[blob_id] = aggregated_activity_by_region
 
+            aggregated_activity_by_ip = _aggregate_activity_by_ip_per_asset(
+                reduced_s3_logs_per_asset=reordered_reduced_s3_log
+            )
+            all_reduced_s3_logs_per_blob_id_aggregated_by_ip[blob_id] = aggregated_activity_by_ip
+
             total_bytes = sum(reduced_s3_log_binned_by_blob_id["bytes_sent"])
             total_bytes_per_asset_path[asset.path] = total_bytes
 
@@ -256,6 +263,12 @@
         total_bytes_per_asset_path=total_bytes_across_versions_by_asset,
         file_path=dandiset_summary_by_asset_file_path
     )
 
+    dandiset_summary_by_ip_file_path = dandiset_log_folder_path / "dandiset_summary_by_ip.tsv"
+    _write_aggregated_activity_by_ip(
+        reduced_s3_logs_per_day=all_reduced_s3_logs_per_blob_id_aggregated_by_ip.values(),
+        file_path=dandiset_summary_by_ip_file_path,
+    )
+
     return None
 
@@ -295,6 +308,31 @@ def _aggregate_activity_by_asset(total_bytes_per_asset_path: dict[str, int]) ->
     return aggregated_activity_by_asset
 
 
+def _aggregate_activity_by_ip_per_asset(reduced_s3_logs_per_asset: pandas.DataFrame) -> pandas.DataFrame:
+    reduced_s3_logs_clipped = reduced_s3_logs_per_asset.reindex(columns=("date", "ip_address"))
+    pre_aggregated = reduced_s3_logs_clipped.groupby(by="date", as_index=False)["ip_address"].agg([list, "nunique"])
+    pre_aggregated.rename(columns={"nunique": "num_unique_access"}, inplace=True)
+    pre_aggregated.sort_values(by="date", key=natsort.natsort_keygen(), inplace=True)
+    aggregated_activity_by_ip = pre_aggregated.reindex(columns=("date", "num_unique_access"))
+
+    return aggregated_activity_by_ip
+
+
+def _aggregate_activity_by_ip(reduced_s3_logs_per_day: Iterable[pandas.DataFrame]) -> pandas.DataFrame:
+    all_reduced_s3_logs = pandas.concat(objs=reduced_s3_logs_per_day, ignore_index=True)
+    all_reduced_s3_logs_clipped = all_reduced_s3_logs.reindex(columns=("date", "num_unique_access"))
+
+    pre_aggregated = all_reduced_s3_logs_clipped.groupby(by="date", as_index=False)["num_unique_access"].agg(
+        [list, "sum"]
+    )
+    pre_aggregated.rename(columns={"sum": "num_unique_access"}, inplace=True)
+    pre_aggregated.sort_values(by="date", key=natsort.natsort_keygen(), inplace=True)
+
+    aggregated_activity_by_ip = pre_aggregated.reindex(columns=("date", "num_unique_access"))
+
+    return aggregated_activity_by_ip
+
+
 def _write_aggregated_activity_by_day(
     reduced_s3_logs_per_day: Iterable[pandas.DataFrame], file_path: pathlib.Path
 ) -> None:
@@ -304,6 +342,15 @@ def _write_aggregated_activity_by_day(
     return None
 
 
+def _write_aggregated_activity_by_ip(
+    reduced_s3_logs_per_day: Iterable[pandas.DataFrame], file_path: pathlib.Path
+) -> None:
+    aggregated_activity_by_ip = _aggregate_activity_by_ip(reduced_s3_logs_per_day=reduced_s3_logs_per_day)
+    aggregated_activity_by_ip.to_csv(path_or_buf=file_path, mode="w", sep="\t", header=True, index=False)
+
+    return None
+
+
 def _write_aggregated_activity_by_region(
     reduced_s3_logs_per_day: Iterable[pandas.DataFrame], file_path: pathlib.Path
 ) -> None:
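A minimal sketch of what the new per-asset aggregation computes, for reviewers. The column names ("date", "ip_address") and the pipeline steps mirror _aggregate_activity_by_ip_per_asset from the diff; the input data and the "clipped"/"pre_aggregated" variable names are invented for illustration.

import natsort
import pandas

# Hypothetical per-asset reduced log; the values are made up.
reduced_s3_logs_per_asset = pandas.DataFrame(
    {
        "date": ["2024-01-02", "2024-01-01", "2024-01-01", "2024-01-02"],
        "ip_address": ["1.2.3.4", "1.2.3.4", "5.6.7.8", "1.2.3.4"],
    }
)

# Same steps as the new helper: count distinct IPs per day, then drop the raw
# IP lists so only the anonymized daily counts remain. With a recent pandas,
# as_index=False keeps "date" as a regular column through the list aggregation.
clipped = reduced_s3_logs_per_asset.reindex(columns=("date", "ip_address"))
pre_aggregated = clipped.groupby(by="date", as_index=False)["ip_address"].agg([list, "nunique"])
pre_aggregated.rename(columns={"nunique": "num_unique_access"}, inplace=True)
pre_aggregated.sort_values(by="date", key=natsort.natsort_keygen(), inplace=True)
print(pre_aggregated.reindex(columns=("date", "num_unique_access")))
#          date  num_unique_access
# 0  2024-01-01                  2
# 1  2024-01-02                  1

Because _aggregate_activity_by_ip then sums these per-asset counts by date across blob IDs, an IP address that accesses several assets of a dandiset on the same day contributes once per asset, not once per dandiset, in the dandiset_summary_by_ip.tsv output.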