"""
Given an exposure estimate and some damage curves, calculate the damage
fraction for exposed assets.
"""

import logging
import sys
import warnings

import geopandas as gpd
import pandas as pd
from snail.damages import PiecewiseLinearDamageCurve

from open_gira import fields
from open_gira.direct_damages import annotate_rehab_cost
from open_gira.io import write_empty_frames, read_damage_curves, read_rehab_costs


if __name__ == "__main__":

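    # Note: when this script is executed by Snakemake, a `snakemake` object is
    # injected into the global namespace. Running it directly with plain Python
    # raises NameError, which is caught below and re-raised with a clearer message.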
    try:
        unsplit_path: str = snakemake.input["unsplit"]
        exposure_path: str = snakemake.input["exposure"]
        rehabilitation_costs_path: str = snakemake.input["rehab_cost"]
        damage_curves_dir: str = snakemake.input["damage_curves"]
        split_ead_and_cost_per_trigger_path: str = snakemake.output[
            "split_ead_and_cost_per_trigger"
        ]
        ead_and_cost_per_trigger_path: str = snakemake.output[
            "ead_and_cost_per_trigger"
        ]
        network_type: str = snakemake.params["network_type"].split("-")[0]
    except NameError:
        raise ValueError("Must be run via snakemake.")

    OUTPUT_FILE_PATHS: tuple[str, ...] = (
        split_ead_and_cost_per_trigger_path,
        ead_and_cost_per_trigger_path,
    )
    HAZARD_TYPE = "landslide"

    logging.basicConfig(
        format="%(asctime)s %(process)d %(filename)s %(message)s", level=logging.INFO
    )

    # Ignore geopandas parquet implementation warnings
    # NB though that .geoparquet is not the format to use for archiving.
    warnings.filterwarnings("ignore", message=".*initial implementation of Parquet.*")

    # load curves first so that if we fail here, we fail early
    # and don't try to load the (potentially large) exposure file
    damage_curves_all = read_damage_curves(
        damage_curves_dir, HAZARD_TYPE, set((network_type,))
    )
    damage_curve_data = damage_curves_all[network_type]
    assert (
        "occurrence" in damage_curve_data.columns
    ), "Expected 'occurrence' column in landslide damage curve"

    # Parse damage curve data into dict of DamageCurve objects
    damage_curves = {}
    for ratio_col in [c for c in damage_curve_data.columns if c != "occurrence"]:
        damage_curves[f"{network_type}_{ratio_col}"] = PiecewiseLinearDamageCurve(
            damage_curve_data[["occurrence", ratio_col]].rename(
                columns={"occurrence": "intensity", ratio_col: "damage"}
            )
        )
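    # Illustrative example (column names hypothetical): if network_type is "road"
    # and the curve file has columns "occurrence", "paved" and "unpaved", this
    # builds two curves keyed "road_paved" and "road_unpaved".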
    logging.info(f"Available damage curves: {damage_curves.keys()}")

    logging.info("Reading exposure (network/raster intersection) data")
    exposure: gpd.GeoDataFrame = gpd.read_parquet(exposure_path)
    logging.info(f"{exposure.shape=}")

    if exposure.empty:
        logging.info("No data in geometry column, writing empty files.")

        # snakemake requires that output files exist, even if empty
        for path in OUTPUT_FILE_PATHS:
            write_empty_frames(path)
        sys.exit(0)  # exit gracefully so snakemake will continue

    logging.info("Annotating network with rehabilitation costs")
    rehab_cost = read_rehab_costs(rehabilitation_costs_path)
    exposure = annotate_rehab_cost(exposure, network_type, rehab_cost)

    # column groupings for data selection
    initial_hazard_columns = [
        col for col in exposure.columns if col.startswith(fields.HAZARD_PREFIX)
    ]
    exposure[f"{fields.HAZARD_PREFIX}_{HAZARD_TYPE}_sum"] = exposure[
        initial_hazard_columns
    ].sum(axis=1)
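    # the summed column aggregates all landslide trigger maps; because
    # hazard_columns is recomputed below, it is treated as just another hazard
    # map in the damage loop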

    hazard_columns = [
        col for col in exposure.columns if col.startswith(fields.HAZARD_PREFIX)
    ]
    non_hazard_columns = list(set(exposure.columns) - set(hazard_columns))

    #############################
    # EXPECTED ANNUAL DAMAGE COST
    #############################
    direct_damages = {}
    for hazard_probability_column in hazard_columns:
        # hazard maps give probability of occurrence
        hazard_probability = exposure[hazard_probability_column]
        # any non-zero probability of landslide has an "occurrence" value of 1.0
        exposure_intensity = (hazard_probability > 0).astype("float")
        for damage_curve_key, damage_curve in damage_curves.items():
            # damage curves are step functions based on 0-1 occurrence
            damage_fraction = damage_curve.damage_fraction(exposure_intensity)
            # damage cost is calculated directly from damage fraction
            damage_cost = damage_fraction * exposure[fields.REHAB_COST]
            # and so expected damage is (exposed value * damage fraction * probability of occurrence)
            expected_damage = damage_cost * hazard_probability
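            # illustrative arithmetic (numbers not from any dataset): with a
            # damage fraction of 1.0, a rehabilitation cost of 1_000_000 and an
            # annual landslide probability of 0.05, damage_cost is 1_000_000 and
            # the expected annual damage is 50_000 per year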
            direct_damages[f"{hazard_probability_column}__{damage_curve_key}_EAD"] = (
                expected_damage
            )
            direct_damages[
                f"{hazard_probability_column}__{damage_curve_key}_damage_cost"
            ] = damage_cost

    direct_damages = pd.DataFrame(direct_damages)
    split_ead_and_cost_per_trigger = pd.concat(
        [exposure[non_hazard_columns], direct_damages], axis=1
    )
    grouped_direct_damages = (
        pd.concat([exposure[["id"]], direct_damages], axis=1).groupby("id").sum()
    )
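    # exposure rows are per split edge (network edges cut by raster cell), all
    # sharing their parent asset's "id"; summing by "id" aggregates damages back
    # to whole, unsplit assets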

    #########################################
    # JOINING, VALIDATION AND SERIALIZATION #
    #########################################

    logging.info("Reading raw network data for unsplit geometry")
    unsplit: gpd.GeoDataFrame = gpd.read_parquet(unsplit_path)
    logging.info(f"{unsplit.shape=}")

    # lose columns like "cell_indices" or rastered length measures that are specific to _rastered_ edges
    non_hazard_output_columns = list(set(non_hazard_columns) & set(unsplit.columns))
    unsplit_subset = unsplit[non_hazard_output_columns].set_index("id", drop=False)
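    # drop=False keeps "id" as both the index (used for the join below) and an
    # ordinary column (checked by the assert after the join)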

    # rejoin direct damage cost estimates with geometry and metadata columns and write to disk
    # join on 'right' / grouped_direct_damages index to only keep rows we have damages for
    ead_and_cost_per_trigger = unsplit_subset.join(
        grouped_direct_damages, validate="one_to_one", how="right"
    )
    # we may not have calculated damages for every possible asset_type
    assert len(ead_and_cost_per_trigger) <= len(unsplit_subset)
    assert "id" in ead_and_cost_per_trigger.columns

    logging.info(
        f"Writing out {split_ead_and_cost_per_trigger.shape=} "
        "(per split geometry, per hazard trigger map and damage curve)"
    )
    split_ead_and_cost_per_trigger.to_parquet(split_ead_and_cost_per_trigger_path)

    logging.info(
        f"Writing out {ead_and_cost_per_trigger.shape=} "
        "(per unsplit geometry, per hazard trigger map and damage curve)"
    )
    ead_and_cost_per_trigger.to_parquet(ead_and_cost_per_trigger_path)

    logging.info("Done calculating direct damages")