From 7f9e50369895c5f8fe7b82a8f2d17b17d5f4131a Mon Sep 17 00:00:00 2001 From: Fred Thomas Date: Thu, 8 Feb 2024 14:50:54 +0000 Subject: [PATCH] Fetch rehabilitation costs during damage calculation --- src/open_gira/direct_damages.py | 80 +++++ .../event_set_direct_damages.py | 8 +- .../return_period_direct_damages.py | 5 + workflow/transport/create_rail_network.py | 77 +---- workflow/transport/create_road_network.py | 287 +----------------- 5 files changed, 108 insertions(+), 349 deletions(-) diff --git a/src/open_gira/direct_damages.py b/src/open_gira/direct_damages.py index 37378b17..25f82699 100644 --- a/src/open_gira/direct_damages.py +++ b/src/open_gira/direct_damages.py @@ -6,6 +6,10 @@ import re from typing import Union +import pandas as pd +import snkit + +from open_gira import fields from open_gira.utils import natural_sort @@ -214,3 +218,79 @@ def get_rp_map(name: str) -> ReturnPeriodMap: # return a concrete subclass of ReturnPeriodMap return prefix_class_map[prefix](name) + + +def rail_rehab_cost(row: pd.Series, rehab_costs: pd.DataFrame) -> float: + """ + Determine the cost of rehabilitation for a given rail segment (row). + + Args: + row: Railway segment + rehab_costs: Table of rehabilitation costs for various rail types + + Returns: + Minimum cost, maximum cost, units of cost + """ + + # bridge should be a boolean type after data cleaning step + if row.bridge: + asset_type = "bridge" + else: + asset_type = "rail" + + return float(rehab_costs.loc[rehab_costs["asset_type"] == asset_type, "rehab_cost_USD_per_km"]) + + +def road_rehab_cost(row: pd.Series, rehab_costs: pd.DataFrame) -> float: + """ + Fetch the cost of rehabilitation for a given road segment (row). + + Args: + row: Road segment + rehab_costs: Table of rehabilitation costs for various road types + + Returns: + Cost in units of USD per km per lane. + """ + + # bridge should be boolean after data cleaning step + if row.bridge: + highway_type = "bridge" + else: + highway_type = row.tag_highway + + if row.paved: + condition = "paved" + else: + condition = "unpaved" + + return float( + rehab_costs["rehab_cost_USD_per_km_per_lane"].loc[ + (rehab_costs.highway == highway_type) & (rehab_costs.road_cond == condition) + ].squeeze() + ) + + +def annotate_rehab_cost(network: snkit.network.Network, network_type: str, rehab_cost: pd.DataFrame) -> snkit.network.Network: + """ + Label a network with rehabilitation costs to be used in subsequent direct + damage cost estimate. + + Args: + network: Network with edges to label. + network_type: Category string, currently either road or rail. + rehab_cost: Table containing rehabilitation cost data per km. See lookup + functions for more requirements. + + Returns: + Network labelled with rehabilitation cost data. + """ + + if network_type == "road": + network.edges[fields.REHAB_COST] = network.edges.apply(road_rehab_cost, axis=1, args=(rehab_cost,)) * network.edges.lanes + elif network_type == "rail": + network.edges[fields.REHAB_COST] = network.edges.apply(rail_rehab_cost, axis=1, args=(rehab_cost,)) + else: + raise ValueError(f"No lookup function available for {network_type=}") + + return network \ No newline at end of file diff --git a/workflow/transport-flood/event_set_direct_damages.py b/workflow/transport-flood/event_set_direct_damages.py index b2f9bd7d..99186c43 100644 --- a/workflow/transport-flood/event_set_direct_damages.py +++ b/workflow/transport-flood/event_set_direct_damages.py @@ -3,7 +3,6 @@ fraction for exposed assets. """ -from collections import defaultdict import logging import sys import warnings @@ -11,9 +10,9 @@ import geopandas as gpd import pandas as pd from scipy.interpolate import interp1d -from scipy.integrate import simpson from open_gira import fields +from open_gira.direct_damages import annotate_rehab_cost from open_gira.io import write_empty_frames, read_damage_curves from open_gira.utils import natural_sort @@ -26,6 +25,7 @@ damage_fraction_path: str = snakemake.output["damage_fraction"] damage_cost_path: str = snakemake.output["damage_cost"] damage_curves_dir: str = snakemake.config["direct_damages"]["curves_dir"] + rehabilitation_costs_path = snakemake.config["transport"]["rehabilitation_costs_path"] network_type: str = snakemake.params["network_type"] hazard_type: str = snakemake.params["hazard_type"] asset_types: set[str] = set(snakemake.config["direct_damages"]["asset_types"]) @@ -60,6 +60,10 @@ write_empty_frames(path) sys.exit(0) # exit gracefully so snakemake will continue + logging.info("Annotate network with rehabilitation costs") + rehab_cost = pd.read_excel(rehabilitation_costs_path, sheet_name=network_type) + exposure = annotate_rehab_cost(exposure, network_type, rehab_cost) + # column groupings for data selection hazard_columns = [col for col in exposure.columns if col.startswith(fields.HAZARD_PREFIX)] non_hazard_columns = list(set(exposure.columns) - set(hazard_columns)) diff --git a/workflow/transport-flood/return_period_direct_damages.py b/workflow/transport-flood/return_period_direct_damages.py index ca373b6a..c89ee2e9 100644 --- a/workflow/transport-flood/return_period_direct_damages.py +++ b/workflow/transport-flood/return_period_direct_damages.py @@ -29,6 +29,7 @@ expected_annual_damages_path: str = snakemake.output["expected_annual_damages"] return_period_and_ead_path: str = snakemake.output["return_period_and_ead"] damage_curves_dir: str = snakemake.config["direct_damages"]["curves_dir"] + rehabilitation_costs_path = snakemake.config["transport"]["rehabilitation_costs_path"] network_type: str = snakemake.params["network_type"] hazard_type: str = snakemake.params["hazard_type"] asset_types: set[str] = set(snakemake.config["direct_damages"]["asset_types"]) @@ -65,6 +66,10 @@ write_empty_frames(path) sys.exit(0) # exit gracefully so snakemake will continue + logging.info("Annotate network with rehabilitation costs") + rehab_cost = pd.read_excel(rehabilitation_costs_path, sheet_name=network_type) + exposure = annotate_rehab_cost(exposure, network_type, rehab_cost) + # column groupings for data selection hazard_columns = [col for col in exposure.columns if col.startswith(fields.HAZARD_PREFIX)] non_hazard_columns = list(set(exposure.columns) - set(hazard_columns)) diff --git a/workflow/transport/create_rail_network.py b/workflow/transport/create_rail_network.py index cda71bc0..ce314ecd 100644 --- a/workflow/transport/create_rail_network.py +++ b/workflow/transport/create_rail_network.py @@ -9,8 +9,6 @@ import warnings import geopandas as gpd -import pandas as pd -import snkit from utils import annotate_country, get_administrative_data from open_gira.assets import RailAssets @@ -19,71 +17,14 @@ from open_gira.utils import str_to_bool -def annotate_rehabilitation_costs( - network: snkit.network.Network, rehab_costs: pd.DataFrame -) -> snkit.network.Network: - - def get_rehab_costs(row: pd.Series, rehab_costs: pd.DataFrame) -> float: - """ - Determine the cost of rehabilitation for a given rail segment (row). - - Args: - row (pd.Series): Road segment - rehab_costs: (pd.DataFrame): Table of rehabilitation costs for various rail types - - Returns: - Tuple[float, float, str]: Minimum cost, maximum cost, units of cost - """ - - # bridge should be a boolean type after data cleaning step - if row.bridge: - asset_type = "bridge" - else: - asset_type = "rail" - - return float(rehab_costs.loc[rehab_costs["asset_type"] == asset_type, "rehab_cost_USD_per_km"]) - - # lookup costs - network.edges["rehab_cost_USD_per_km"] = network.edges.apply( - get_rehab_costs, axis=1, args=(rehab_costs,) - ) - - return network - - if __name__ == "__main__": - try: - osm_edges_path = snakemake.input["edges"] # type: ignore - osm_nodes_path = snakemake.input["nodes"] # type: ignore - administrative_data_path = snakemake.input["admin"] # type: ignore - nodes_output_path = snakemake.output["nodes"] # type: ignore - edges_output_path = snakemake.output["edges"] # type: ignore - slice_number = snakemake.params["slice_number"] # type: ignore - rehabilitation_costs_path = snakemake.config["transport"]["rehabilitation_costs_path"] # type: ignore - - except NameError: - # If "snakemake" doesn't exist then must be running from the - # command line. - ( - osm_edges_path, - osm_nodes_path, - administrative_data_path, - nodes_output_path, - edges_output_path, - slice_number, - rehabilitation_costs_path, - transport_costs_path, - flow_cost_time_factor, - ) = sys.argv[1:] - - # osm_edges_path = ../../results/geoparquet/tanzania-latest_filter-rail/slice-0_edges.geoparquet - # osm_nodes_path = ../../results/geoparquet/tanzania-latest_filter-rail/slice-0_nodes.geoparquet - # administrative_data_path = ../../results/input/admin-boundaries/gadm36_levels.gpkg - # nodes_output_path = ../../results/geoparquet/tanzania-latest_filter-rail/slice-0_nodes.geoparquet - # edges_output_path = ../../results/geoparquet/tanzania-latest_filter-rail/slice-0_edges.geoparquet - # slice_number = 0 - # rehabilitation_costs_path = ../../bundled_data/rehabilitation.xlsx + osm_edges_path = snakemake.input["edges"] # type: ignore + osm_nodes_path = snakemake.input["nodes"] # type: ignore + administrative_data_path = snakemake.input["admin"] # type: ignore + nodes_output_path = snakemake.output["nodes"] # type: ignore + edges_output_path = snakemake.output["edges"] # type: ignore + slice_number = int(snakemake.params["slice_number"]) # type: ignore slice_number = int(slice_number) osm_epsg = 4326 @@ -144,12 +85,6 @@ def get_rehab_costs(row: pd.Series, rehab_costs: pd.DataFrame) -> float: get_administrative_data(administrative_data_path, to_epsg=osm_epsg), ) - logging.info("Annotate network with rehabilitation costs") - network = annotate_rehabilitation_costs( - network, - pd.read_excel(rehabilitation_costs_path, sheet_name="rail") - ) - logging.info("Writing network to disk") network.edges.to_parquet(edges_output_path) network.nodes.to_parquet(nodes_output_path) diff --git a/workflow/transport/create_road_network.py b/workflow/transport/create_road_network.py index b38b9160..0fcae520 100644 --- a/workflow/transport/create_road_network.py +++ b/workflow/transport/create_road_network.py @@ -12,7 +12,6 @@ import geopandas as gpd import pandas as pd import snkit -from pyproj import Geod from utils import annotate_country, cast, get_administrative_data, strip_suffix from open_gira.assets import RoadAssets @@ -80,10 +79,10 @@ def get_road_condition(row: pd.Series) -> Tuple[str, str]: fine_gravel 1 Args: - row (pd.Series): Must have surface (nullable) and highway attributes. + row: Must have surface (nullable) and highway attributes. Returns: - Tuple[str, str]: Categorical condition and surface strings + Boolean paved status and surface category string """ if not row.tag_surface: @@ -128,9 +127,7 @@ def get_road_lanes(row: pd.Series) -> int: return 1 -def annotate_condition( - network: snkit.network.Network, lane_width_m: float, shoulder_width_m: float -) -> snkit.network.Network: +def annotate_condition(network: snkit.network.Network) -> snkit.network.Network: # infer paved status and material type from 'surface' column network.edges["paved_material"] = network.edges.apply( @@ -147,260 +144,18 @@ def annotate_condition( # add number of lanes network.edges["lanes"] = network.edges.apply(lambda x: get_road_lanes(x), axis=1) - # add road width - network.edges["width_m"] = network.edges.apply( - lambda x: x.lanes * lane_width_m + 2 * shoulder_width_m, axis=1 - ) - - return network - - -def assign_road_speeds(row: pd.Series) -> Tuple[float, float]: - """ - Infer road speed limits from road type and condition. - - Args: - row (pd.Series): Road to infer speeds for, must have `highway`, - `paved`, `Highway_min`, `Highway_max`, `Urban_min`, `Urban_max`, - `Rural_min` and `Rural_max` labels. - - Returns: - Tuple[float, float]: Likely minimum and maximum speeds - """ - - if row.tag_highway in {"motorway", "trunk", "primary"}: - min_speed = float(row["Highway_min"]) - max_speed = float(row["Highway_max"]) - elif row.paved: - min_speed = float(row["Urban_min"]) - max_speed = float(row["Urban_max"]) - else: - min_speed = float(row["Rural_min"]) - max_speed = float(row["Rural_max"]) - - # if we've got data from OSM, use that instead of the per-country data - # use isnull because it works for numpy NaNs and Nones; a numpy NaN is truthy! - if not pd.isnull(row.tag_maxspeed): - max_speed = row.tag_maxspeed - - return min_speed, max_speed - - -def annotate_speeds(network: snkit.network.Network, speeds_by_country) -> snkit.network.Network: - """ - Using OSM data (network.edges.maxspeed) and speeds_by_country, assemble a - best guess of road speeds in km/h. - - Args: - network (snkit.network.Network): Network to annotate. The network.edges - must have a 'from_iso_a3' column - speeds_by_country (pd.DataFrame): Speed limit information by country - N.B. speeds_by_country is expected to have the following columns: - 'ISO_A3', 'CONTINENT', 'NAME', 'REGION', 'SUBREGION', 'Highway_min', - 'Highway_max', 'Rural_min', 'Rural_max', 'Urban_min', 'Urban_max' - - Returns: - snkit.network.Network: Modified network - """ - - network.edges = pd.merge( - network.edges, - speeds_by_country, - how="left", - left_on=["from_iso_a3"], - right_on=["ISO_A3"], - ) - - # infer a likely min and max road speed - network.edges["min_max_speed"] = network.edges.apply( - lambda x: assign_road_speeds(x), axis=1 - ) - - # assign_road_speeds returned two values, unpack these into two columns - network.edges[["min_speed_kmh", "max_speed_kmh"]] = network.edges[ - "min_max_speed" - ].apply(pd.Series) - - # drop the intermediate columns - network.edges.drop( - ["min_max_speed"] + speeds_by_country.columns.values.tolist(), - axis=1, - inplace=True, - ) - - return network - - -def annotate_tariff_flow_costs( - network: snkit.network.Network, - transport_tariffs: pd.DataFrame, - flow_cost_time_factor: float, -) -> snkit.network.Network: - """ - Add tariff flow costs to network edges. - - Args: - network (snkit.network.Network): Network to annotate. The network.edges - must have a 'from_iso_a3' column to merge on. - transport_tariffs (pd.DataFrame): Table of transport tariffs by country and - by mode of transport, with 'from_iso3', 'cost_km', 'cost_unit' and - 'cost_scaling' columns - flow_cost_time_factor (float): A fudge factor that varies (by country?) - This may well need consuming as location specific data in future. - Returns: - snkit.network.Network: Modified network - """ - - # input checking - expected_columns = { - "from_iso3", - "cost_km", - "cost_unit", - "cost_scaling", - } - if not set(transport_tariffs.columns).issuperset(expected_columns): - raise ValueError(f"{expected_columns=} for transport_tariffs") - - # rename and subset table - transport_tariffs.rename( - columns={"cost_km": "tariff_cost", "cost_unit": "tariff_unit"}, inplace=True - ) - - # merge datasets - network.edges = pd.merge( - network.edges, - transport_tariffs[["from_iso3", "tariff_cost", "tariff_unit"]], - how="left", - left_on=["from_iso_a3"], - right_on=["from_iso3"], - ) - network.edges["min_tariff"] = network.edges.apply( - lambda x: float(x.tariff_cost) - (float(x.tariff_cost) * 0.2), axis=1 - ) - network.edges["max_tariff"] = network.edges.apply( - lambda x: float(x.tariff_cost) + (float(x.tariff_cost) * 0.2), axis=1 - ) - network.edges.drop(["tariff_cost", "from_iso3"], axis=1, inplace=True) - - # calculate road segment lengths - geod = Geod(ellps="WGS84") - network.edges["length_m"] = network.edges.apply( - lambda x: float(geod.geometry_length(x.geometry)), axis=1 - ) - - # assign flow costs - metres_per_km = 1_000 - - network.edges["min_flow_cost"] = ( - flow_cost_time_factor * network.edges["length_m"] / metres_per_km - ) / network.edges["max_speed_kmh"] + ( - network.edges["min_tariff"] * network.edges["length_m"] / metres_per_km - ) - - network.edges["max_flow_cost"] = ( - flow_cost_time_factor * network.edges["length_m"] / metres_per_km - ) / network.edges["min_speed_kmh"] + ( - network.edges["max_tariff"] * network.edges["length_m"] / metres_per_km - ) - - network.edges["flow_cost_unit"] = "USD/ton" - - return network - - -def annotate_rehabilitation_costs( - network: snkit.network.Network, rehab_costs: pd.DataFrame -) -> snkit.network.Network: - - def get_rehab_costs(row: pd.Series, rehab_costs: pd.DataFrame) -> float: - """ - Fetch the cost of rehabilitation for a given road segment (row). - - Args: - row (pd.Series): Road segment - rehab_costs: (pd.DataFrame): Table of rehabilitation costs for various road types - - Returns: - float: Cost in units of USD per km per lane. - """ - - # bridge should be boolean after data cleaning step - if row.bridge: - highway_type = "bridge" - else: - highway_type = row.tag_highway - - if row.paved: - condition = "paved" - else: - condition = "unpaved" - - return float( - rehab_costs["rehab_cost_USD_per_km_per_lane"].loc[ - (rehab_costs.highway == highway_type) & (rehab_costs.road_cond == condition) - ].squeeze() - ) - - # lookup costs - network.edges["rehab_cost_USD_per_km"] = network.edges.apply( - get_rehab_costs, axis=1, args=(rehab_costs,) - ) * network.edges.lanes - return network if __name__ == "__main__": - try: - osm_edges_path = snakemake.input["edges"] # type: ignore - osm_nodes_path = snakemake.input["nodes"] # type: ignore - administrative_data_path = snakemake.input["admin"] # type: ignore - nodes_output_path = snakemake.output["nodes"] # type: ignore - edges_output_path = snakemake.output["edges"] # type: ignore - slice_number = snakemake.params["slice_number"] # type: ignore - road_speeds_path = snakemake.config["transport"]["speeds_path"] # type: ignore - rehabilitation_costs_path = snakemake.config["transport"]["rehabilitation_costs_path"] # type: ignore - transport_costs_path = snakemake.config["transport"]["tariff_costs_path"] # type: ignore - default_shoulder_width_metres = snakemake.config["transport"]["road"]["default_shoulder_width_metres"] # type: ignore - default_lane_width_metres = snakemake.config["transport"]["road"]["default_lane_width_metres"] # type: ignore - flow_cost_time_factor = snakemake.config["transport"]["road"]["flow_cost_time_factor"] # type: ignore - - except NameError: - # If "snakemake" doesn't exist then must be running from the - # command line. - ( - osm_edges_path, - osm_nodes_path, - administrative_data_path, - nodes_output_path, - edges_output_path, - slice_number, - road_speeds_path, - rehabilitation_costs_path, - transport_costs_path, - default_shoulder_width_metres, - default_lane_width_metres, - flow_cost_time_factor, - ) = sys.argv[1:] - - # osm_edges_path = ../../results/geoparquet/tanzania-latest_filter-road/slice-0.geoparquet - # osm_nodes_path = ../../results/geoparquet/tanzania-latest_filter-road/slice-0.geoparquet - # administrative_data_path = ../../results/input/admin-boundaries/gadm36_levels.gpkg - # nodes_output_path = ../../results/geoparquet/tanzania-latest_filter-road/slice-0_road_nodes.geoparquet - # edges_output_path = ../../results/geoparquet/tanzania-latest_filter-road/slice-0_road_edges.geoparquet - # slice_number = 0 - # road_speeds_path = ../../bundled_data/global_road_speeds.xlsx - # rehabilitation_costs_path = ../../bundled_data/rehabilitation_costs.xlsx - # transport_costs_path = ../../bundled_data/transport_costs.csv - # default_shoulder_width_metres = 1.5 - # default_lane_width_metres = 3.25 - # flow_cost_time_factor = 0.49 - - # cast script arguments to relevant types where necessary - default_shoulder_width_metres = float(default_shoulder_width_metres) - default_lane_width_metres = float(default_lane_width_metres) - flow_cost_time_factor = float(flow_cost_time_factor) - slice_number = int(slice_number) + osm_edges_path = snakemake.input["edges"] # type: ignore + osm_nodes_path = snakemake.input["nodes"] # type: ignore + administrative_data_path = snakemake.input["admin"] # type: ignore + nodes_output_path = snakemake.output["nodes"] # type: ignore + edges_output_path = snakemake.output["edges"] # type: ignore + slice_number = int(snakemake.params["slice_number"]) # type: ignore + osm_epsg = 4326 logging.basicConfig(format="%(asctime)s %(process)d %(filename)s %(message)s", level=logging.INFO) @@ -440,9 +195,7 @@ def get_rehab_costs(row: pd.Series, rehab_costs: pd.DataFrame) -> float: ) logging.info("Annotating network with road type and condition data") - network = annotate_condition( - network, default_lane_width_metres, default_shoulder_width_metres - ) + network = annotate_condition(network) # select and label assets with their type # the asset_type is used to later select a damage curve @@ -450,24 +203,6 @@ def get_rehab_costs(row: pd.Series, rehab_costs: pd.DataFrame) -> float: network.edges.loc[network.edges.paved == True, 'asset_type'] = RoadAssets.PAVED network.edges.loc[network.edges.bridge == True, 'asset_type'] = RoadAssets.BRIDGE - logging.info("Annotating network with road speed data") - network = annotate_speeds( - network, pd.read_excel(road_speeds_path, sheet_name="road") - ) - - logging.info("Annotating network with tariff and flow costs") - network = annotate_tariff_flow_costs( - network, - pd.read_excel(transport_costs_path, sheet_name="road"), - flow_cost_time_factor, - ) - - logging.info("Annotate network with rehabilitation costs") - network = annotate_rehabilitation_costs( - network, - pd.read_excel(rehabilitation_costs_path, sheet_name="road") - ) - logging.info("Writing network to disk") network.edges.to_parquet(edges_output_path) network.nodes.to_parquet(nodes_output_path)