From 1a7e94f0e5eeaffc03f81a7f4fef22029afd2972 Mon Sep 17 00:00:00 2001
From: YUE LI
Date: Thu, 17 Oct 2024 14:22:45 +0100
Subject: [PATCH] road disruption and damage analysis

---
 scripts/network_flow_model_disruption.py  | 113 +++++++++
 scripts/network_flow_model_distributed.py | 133 ++++++++++
 scripts/road_disruption_damage.py         | 287 ++++++++++++++++++++++
 scripts/road_disruption_maxSpeed.py       |  96 ++++++++
 4 files changed, 629 insertions(+)
 create mode 100644 scripts/network_flow_model_disruption.py
 create mode 100644 scripts/network_flow_model_distributed.py
 create mode 100644 scripts/road_disruption_damage.py
 create mode 100644 scripts/road_disruption_maxSpeed.py

diff --git a/scripts/network_flow_model_disruption.py b/scripts/network_flow_model_disruption.py
new file mode 100644
index 0000000..8edb6a9
--- /dev/null
+++ b/scripts/network_flow_model_disruption.py
@@ -0,0 +1,113 @@
+import os
+
+from pathlib import Path
+import time
+import pandas as pd
+import geopandas as gpd  # type: ignore
+
+from nird.utils import load_config
+import nird.road as func
+
+
+import json
+import warnings
+
+warnings.simplefilter("ignore")
+
+base_path = Path(load_config()["paths"]["base_path"])
+disruption_path = base_path / "disruption_analysis"
+
+
+def network_flow_model(max_flow_speed_dict, event_key):
+    start_time = time.time()
+
+    # model parameters
+    with open(base_path / "parameters" / "flow_breakpoint_dict.json", "r") as f:
+        flow_breakpoint_dict = json.load(f)
+    with open(base_path / "parameters" / "flow_cap_dict.json", "r") as f:
+        flow_capacity_dict = json.load(f)
+    with open(base_path / "parameters" / "free_flow_speed_dict.json", "r") as f:
+        free_flow_speed_dict = json.load(f)
+    with open(base_path / "parameters" / "min_speed_cap.json", "r") as f:
+        min_speed_dict = json.load(f)
+    with open(base_path / "parameters" / "urban_speed_cap.json", "r") as f:
+        urban_speed_dict = json.load(f)
+
+    # road networks (urban_filter: manual correction)
+    road_node_file = gpd.read_parquet(
+        base_path / "networks" / "road" / "road_node_file.geoparquet"
+    )
+    road_link_file = gpd.read_parquet(
+        base_path / "networks" / "road" / "road_link_file.geoparquet"
+    )
+
+    # O-D matrix (2021)
+    od_node_2021 = pd.read_csv(
+        base_path / "census_datasets" / "od_matrix" / "od_gb_oa_2021_node.csv"
+    )
+    od_node_2021 = od_node_2021[od_node_2021.Car21 > 1].reset_index(drop=True)
+    od_node_2021["Car21"] = od_node_2021["Car21"] * 2
+    od_node_2021 = od_node_2021.head(10)  #!!! for test
+    print(f"total flows: {od_node_2021.Car21.sum()}")
+
+    # generate OD pairs
+    list_of_origin_nodes, dict_of_destination_nodes, dict_of_origin_supplies = (
+        func.extract_od_pairs(od_node_2021)
+    )
+
+    # flow simulation
+    (
+        road_link_file,  # edge flow results
+        od_node_2021,  # edge flow validation & component costs
+        isolated_od_dict,  # isolated trips
+    ) = func.network_flow_model(
+        road_link_file,  # road
+        road_node_file,  # road
+        list_of_origin_nodes,  # od
+        dict_of_origin_supplies,  # od
+        dict_of_destination_nodes,  # od
+        free_flow_speed_dict,  # net
+        flow_breakpoint_dict,  # net
+        flow_capacity_dict,  # net
+        min_speed_dict,  # net
+        urban_speed_dict,  # net
+        od_node_2021,  # od_flow_matrix
+        max_flow_speed_dict,  # disruption analysis
+    )
+
+    # export files
+    road_link_file.to_parquet(
+        base_path.parent / "outputs" / f"gb_edge_flows_{event_key}.geoparquet"
+    )
+    od_node_2021.to_csv(
+        base_path.parent / "outputs" / f"od_costs_{event_key}.csv", index=False
+    )
+    isolated_od_df = pd.Series(isolated_od_dict).reset_index()
+    if isolated_od_df.shape[0] != 0:  # in case of empty df
+        isolated_od_df.columns = ["origin_node", "destination_node", "isolated_flows"]
+        isolated_od_df.to_csv(
+            base_path.parent / "outputs" / f"isolated_od_flows_{event_key}.csv",
+            index=False,
+        )
+    print(f"The network flow model is completed for {event_key}!")
+    print(f"The total simulation time: {time.time() - start_time} seconds.")
+
+
+def main():
+    flood_event_paths = []
+    for root, _, files in os.walk(disruption_path):
+        for file in files:
+            file_path = os.path.join(root, file)
+            flood_event_paths.append(file_path)
+    for event_path in flood_event_paths:
+        event_key = Path(event_path).stem
+        flooded_road_max_speed = pd.read_csv(event_path)
+        # maximum vehicle speeds (mph) -> flooded roads (id/e_id)
+        max_flow_speed_dict = flooded_road_max_speed.set_index("id")[
+            "max_speed_mph_adjusted"
+        ].to_dict()
+        network_flow_model(max_flow_speed_dict, event_key)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/scripts/network_flow_model_distributed.py b/scripts/network_flow_model_distributed.py
new file mode 100644
index 0000000..eaa2b3b
--- /dev/null
+++ b/scripts/network_flow_model_distributed.py
@@ -0,0 +1,133 @@
+# %%
+import os
+import sys
+import math
+import itertools
+
+from pathlib import Path
+import time
+import pandas as pd
+import geopandas as gpd  # type: ignore
+
+from nird.utils import load_config
+import nird.road as func
+
+
+import json
+import warnings
+
+warnings.simplefilter("ignore")
+
+base_path = Path(load_config()["paths"]["base_path"])
+disruption_path = Path(load_config()["paths"]["disruption_path"])
+# %%
+
+
+def network_flow_model(max_flow_speed_dict, event_key):
+    start_time = time.time()
+
+    # model parameters
+    with open(base_path / "parameters" / "flow_breakpoint_dict.json", "r") as f:
+        flow_breakpoint_dict = json.load(f)
+    with open(base_path / "parameters" / "flow_cap_dict.json", "r") as f:
+        flow_capacity_dict = json.load(f)
+    with open(base_path / "parameters" / "free_flow_speed_dict.json", "r") as f:
+        free_flow_speed_dict = json.load(f)
+    with open(base_path / "parameters" / "min_speed_cap.json", "r") as f:
+        min_speed_dict = json.load(f)
+    with open(base_path / "parameters" / "urban_speed_cap.json", "r") as f:
+        urban_speed_dict = json.load(f)
+
+    # road networks (urban_filter: manual correction)
+    road_node_file = gpd.read_parquet(
+        base_path / "networks" / "road" / "road_node_file.geoparquet"
+    )
+    road_link_file = gpd.read_parquet(
+        base_path / "networks" / "road" / "road_link_file.geoparquet"
"networks" / "road" / "road_link_file.geoparquet" + ) + + # O-D matrix (2021) + od_node_2021 = pd.read_csv( + base_path / "census_datasets" / "od_matrix" / "od_gb_oa_2021_node.csv" + ) + # od_node_2021 = od_node_2021[od_node_2021.Car21 > 1].reset_index(drop=True) + od_node_2021["Car21"] = od_node_2021["Car21"] * 2 + # od_node_2021 = od_node_2021.head(10) #!!! for test + print(f"total flows: {od_node_2021.Car21.sum()}") + + # generate OD pairs + list_of_origin_nodes, dict_of_destination_nodes, dict_of_origin_supplies = ( + func.extract_od_pairs(od_node_2021) + ) + + # flow simulation + ( + road_link_file, # edge flow results + od_node_2021, # edge flow validation & component costs + isolated_od_dict, # isolated trips + ) = func.network_flow_model( + road_link_file, # road + road_node_file, # road + list_of_origin_nodes, # od + dict_of_origin_supplies, # od + dict_of_destination_nodes, # od + free_flow_speed_dict, # net + flow_breakpoint_dict, # net + flow_capacity_dict, # net + min_speed_dict, # net + urban_speed_dict, # net + od_node_2021, # od_flow_matrix + max_flow_speed_dict, # disruption analysis + ) + + # export files + road_link_file.to_parquet( + base_path.parent / "outputs" / f"gb_edge_flows_{event_key}.geoparquet" + ) + od_node_2021.to_csv( + base_path.parent / "outputs" / f"od_costs_{event_key}.csv", index=False + ) + isolated_od_df = pd.Series(isolated_od_dict).reset_index() + if isolated_od_df.shape[0] != 0: # in case of empty df + isolated_od_df.columns = ["origin_node", "destination_node", "isolated_flows"] + isolated_od_df.to_csv( + base_path.parent / "outputs" / f"isolated_od_flows_{event_key}.csv", + index=False, + ) + print(f"The network flow model is completed for {event_key}!") + print(f"The total simulation time: {time.time() - start_time}. 
") + + +def main(task_id: int, task_count: int): + flood_event_paths = [] + for root, _, files in os.walk(disruption_path): + for file in files: + file_path = Path(root) / file + flood_event_paths.append(file_path) + + n_events = len(flood_event_paths) + event_per_task = math.ceil(n_events / task_count) + event_batches = list(itertools.batched(flood_event_paths, event_per_task)) + try: + event_paths_to_run = event_batches[task_id] + except IndexError: + print(f"No events for {task_id=} {task_count=} {n_events=}") + sys.exit() + + for event_path in event_paths_to_run: + event_key = event_path.stem + flooded_road_max_speed = pd.read_csv(event_path) + # maximum vehicle speeds (mph) -> flooded roads (id/e_id) + max_flow_speed_dict = flooded_road_max_speed.set_index("id")[ + "max_speed_mph_adjusted" + ].to_dict() + network_flow_model(max_flow_speed_dict, event_key) + + +if __name__ == "__main__": + try: + task_id = int(sys.argv[1]) + task_count = int(sys.argv[2]) + except IndexError: + print(f"Usage: python {__file__} ") + main(task_id, task_count) diff --git a/scripts/road_disruption_damage.py b/scripts/road_disruption_damage.py new file mode 100644 index 0000000..c37cf11 --- /dev/null +++ b/scripts/road_disruption_damage.py @@ -0,0 +1,287 @@ +# %% +import os +import warnings +from pathlib import Path + +from snail import io, intersection, damages +import pandas as pd +import numpy as np +import geopandas as gpd +from nird.utils import load_config + +from collections import defaultdict + +warnings.simplefilter("ignore") +base_path = Path(load_config()["paths"]["damage_analysis"]) +raster_path = Path(load_config()["paths"]["JBA_data"]) +raster_path = raster_path / "12-14_2007 Summer_UK Floods" + + +# %% +def calculate_damages(raster_path, raster_key, features, damage_curves, cost_dict): + print(f"Intersecting features with raster {raster_key}...") + # read the raster data: depth - meter + raster = io.read_raster_band_data(raster_path) + + # run the intersection analysis + grid, _ = io.read_raster_metadata(raster_path) + prepared = intersection.prepare_linestrings(features) + intersections = intersection.split_linestrings(prepared, grid) + intersections = intersection.apply_indices(intersections, grid) + + intersections[f"{raster_key}_depth"] = intersection.get_raster_values_for_splits( + intersections, raster + ) + # reproject back + intersections = intersections.to_crs("epsg:27700") + + # calculate damage fractions + print("Computing direct damages...") + # create damage curves (Motorways - 4 curves; others - 2 curves) + curves = ["M_SA_LF", "M_SA_HF", "M_NSA_LF", "M_NSA_HF", "Other_LF", "Other_HF"] + for col in curves: + intersections[f"{raster_key}_{col}_damage_fraction"] = np.nan + for col in curves: + intersections[f"{raster_key}_{col}_damage_value_L"] = np.nan + for col in curves: + intersections[f"{raster_key}_{col}_damage_value_U"] = np.nan + + # Motorways and A roads + MA_curves = ["M_SA_LF", "M_SA_HF", "M_NSA_LF", "M_NSA_HF"] + intersections_M = intersections[intersections.road_classification == "Motorway"] + for curve in MA_curves: + intersections_M[f"{raster_key}_{curve}_damage_fraction"] = damage_curves[ + curve + ].damage_fraction(intersections_M[f"{raster_key}_depth"]) + + intersections_A = intersections[intersections.road_classification == "A Road"] + for curve in MA_curves: + intersections_A[f"{raster_key}_{curve}_damage_fraction"] = damage_curves[ + curve + ].damage_fraction(intersections_A[f"{raster_key}_depth"]) + + # B Roads + B_curves = ["Other_LF", "Other_HF"] + 
+    intersections_B = intersections[intersections.road_classification == "B Road"]
+    for curve in B_curves:
+        intersections_B[f"{raster_key}_{curve}_damage_fraction"] = damage_curves[
+            curve
+        ].damage_fraction(intersections_B[f"{raster_key}_depth"])
+
+    # calculate cost of damage
+    # Motorway and A roads: 4 damage curves for M & A roads
+    for case in MA_curves:
+        intersections_M[f"{raster_key}_{case}_damage_value_L"] = (
+            intersections_M.geometry.length
+            * cost_dict["ML"]
+            * intersections_M[f"{raster_key}_{case}_damage_fraction"]
+        )
+        intersections_M[f"{raster_key}_{case}_damage_value_U"] = (
+            intersections_M.geometry.length
+            * cost_dict["MU"]
+            * intersections_M[f"{raster_key}_{case}_damage_fraction"]
+        )
+        intersections_A[f"{raster_key}_{case}_damage_value_L"] = (
+            intersections_A.geometry.length
+            * cost_dict["AL"]
+            * intersections_A[f"{raster_key}_{case}_damage_fraction"]
+        )
+        intersections_A[f"{raster_key}_{case}_damage_value_U"] = (
+            intersections_A.geometry.length
+            * cost_dict["AU"]
+            * intersections_A[f"{raster_key}_{case}_damage_fraction"]
+        )
+
+    # B Roads: 2 damage curves for B roads
+    for case in B_curves:
+        intersections_B[f"{raster_key}_{case}_damage_value_L"] = (
+            intersections_B.geometry.length
+            * cost_dict["BL"]
+            * intersections_B[f"{raster_key}_{case}_damage_fraction"]
+        )
+        intersections_B[f"{raster_key}_{case}_damage_value_U"] = (
+            intersections_B.geometry.length
+            * cost_dict["BU"]
+            * intersections_B[f"{raster_key}_{case}_damage_fraction"]
+        )
+
+    # combine results
+    intersections = pd.concat(
+        [intersections_M, intersections_A, intersections_B], axis=0, ignore_index=True
+    )
+
+    # calculate min and max damage values
+    intersections[f"{raster_key}_min_damage_values"] = np.nan
+    intersections[f"{raster_key}_max_damage_values"] = np.nan
+    damage_value_columns_MA = [
+        col
+        for col in intersections.columns
+        if ("damage_value" in col) & (f"{raster_key}_M" in col)
+    ]
+    intersections.loc[
+        (intersections.road_classification == "Motorway")
+        | (intersections.road_classification == "A Road"),
+        f"{raster_key}_min_damage_values",
+    ] = intersections[damage_value_columns_MA].min(axis=1)
+
+    intersections.loc[
+        (intersections.road_classification == "Motorway")
+        | (intersections.road_classification == "A Road"),
+        f"{raster_key}_max_damage_values",
+    ] = intersections[damage_value_columns_MA].max(axis=1)
+
+    damage_value_columns_B = [
+        col
+        for col in intersections.columns
+        if ("damage_value" in col) & (f"{raster_key}_Other" in col)
+    ]
+    intersections.loc[
+        intersections.road_classification == "B Road", f"{raster_key}_min_damage_values"
+    ] = intersections[damage_value_columns_B].min(axis=1)
+    intersections.loc[
+        intersections.road_classification == "B Road", f"{raster_key}_max_damage_values"
+    ] = intersections[damage_value_columns_B].max(axis=1)
+
+    # extract partial datasets
+    columns_to_extract = ["id", "road_classification", f"{raster_key}_depth"]
+    columns_to_extract.extend(
+        [col for col in intersections.columns if "damage_value" in col]
+    )
+    intersections = intersections[columns_to_extract]
+    print("The damage calculation is completed!")
+    return intersections
+
+
+def create_damage_curves(damage_ratio_df):
+    list_of_damage_curves = []
+    cols = damage_ratio_df.columns[1:]
+    for col in cols:
+        damage_ratios = damage_ratio_df[["intensity", col]].copy()
+        damage_ratios.rename(columns={col: "damage"}, inplace=True)
+        damage_curve = damages.PiecewiseLinearDamageCurve(damage_ratios)
+        list_of_damage_curves.append(damage_curve)
+
+    damage_curve_dict = defaultdict()
+    keys = ["M_SA_LF", "M_SA_HF", "M_NSA_LF", "M_NSA_HF", "Other_LF", "Other_HF"]
+    for idx in range(len(keys)):
+        key = keys[idx]
+        damage_curve = list_of_damage_curves[idx]
+        damage_curve_dict[key] = damage_curve
+
+    return damage_curve_dict
+
+
+def clip_features(features, clip_path, raster_key):
+    print(f"Clipping features based on {raster_key}...")
+    clips = gpd.read_file(clip_path, engine="pyogrio")
+    if clips.crs != "epsg:4326":
+        clips = clips.to_crs("epsg:4326")
+    # clipped_features = gpd.overlay(features, clips, how="intersection")
+    assert (
+        clips.crs == features.crs
+    ), "CRS mismatch! Ensure both layers use the same CRS."
+    clipped_features = gpd.sjoin(features, clips, how="inner", predicate="intersects")
+    clipped_features = clipped_features[features.columns]
+    clipped_features.reset_index(drop=True, inplace=True)
+    return clipped_features
+
+
+def main():
+    # damage curves
+    damage_ratio_df = pd.read_excel(
+        base_path / "inputs" / "damage_ratio_road_flood.xlsx"
+    )
+    damage_curve_dict = create_damage_curves(damage_ratio_df)
+
+    # damage costs
+    damage_cost_dict = {
+        "ML": 3470.158,
+        "MU": 34701.582,
+        "AL": 991.474,
+        "AU": 2974.421,
+        "BL": 495.737,
+        "BU": 1487.211,
+    }
+    # read feature data
+    road_links = gpd.read_parquet(base_path / "inputs" / "road_link_file.geoparquet")
+    road_links = road_links.to_crs("epsg:4326")
+
+    # classify flood events
+    raster_path_with_vector = []
+    raster_path_without_vector = []
+    for root, dirs, _ in os.walk(raster_path):
+        # check if both "Raster" and "Vector" directories exist
+        if "Raster" in dirs:
+            raster_dir = os.path.join(root, "Raster")
+            vector_file_list = []
+            # if "Vector" exists, collect vector files
+            if "Vector" in dirs:
+                vector_dir = os.path.join(root, "Vector")
+                vector_file_list = [
+                    f for f in os.listdir(vector_dir) if f.endswith(".shp")
+                ]
+            # process raster files in the "Raster" directory
+            for raster_root, _, raster_files in os.walk(raster_dir):
+                for raster_file in raster_files:
+                    if (
+                        raster_file.endswith(".tif")
+                        and "RD" in raster_file  # RD: Raster Depth
+                        and "IE" not in raster_file  # IE: Ireland
+                    ):
+                        file_path = os.path.join(raster_root, raster_file)
+                        raster_key = raster_file.split(".")[0]
+                        vector_file = f"{raster_key}.shp"
+                        vector_file_VE = vector_file.replace("RD", "VE")
+                        vector_file_PR = vector_file.replace("RD", "PR")
+                        if (vector_file_VE in vector_file_list) or (
+                            vector_file_PR in vector_file_list
+                        ):
+                            raster_path_with_vector.append(file_path)
+                        else:
+                            raster_path_without_vector.append(file_path)
+    """
+    # create vector data for rasters within raster_path_without_vector
+    # gdal_polygonize xxx
+    """
+    # damage analysis (with vector data)
+    for flood_path in raster_path_with_vector:
+        # raster data path
+        flood_key = Path(flood_path).stem
+
+        # clip data path, clip analysis
+        clip_path = (
+            flood_path.replace("Raster", "Vector")
+            .replace(".tif", ".shp")
+            .replace("RD", "VE")
+        )
+        if not os.path.exists(clip_path):
+            clip_path = clip_path.replace("VE", "PR")
+        clipped_features = clip_features(road_links, clip_path, flood_key)
+
+        # output path
+        intersections_path1 = base_path / "outputs" / f"{flood_key}_damages.csv"
+        intersections_path2 = base_path / "outputs" / f"{flood_key}_damages_gp.csv"
+
+        # damage analysis
+        intersections_with_damages = calculate_damages(
+            flood_path, flood_key, clipped_features, damage_curve_dict, damage_cost_dict
+        )
+        intersections_with_damages.to_csv(intersections_path1, index=False)
+
+        intersections_with_damages = intersections_with_damages.groupby(
+            by=["id"], as_index=False
+        ).agg(
+            {
+                f"{flood_key}_depth": "max",
+                f"{flood_key}_min_damage_values": "sum",
+                f"{flood_key}_max_damage_values": "sum",
+            }
+        )
+        intersections_with_damages.rename(
+            columns={f"{flood_key}_depth": f"{flood_key}_max_depth"}, inplace=True
+        )
+        intersections_with_damages.to_csv(intersections_path2, index=False)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/scripts/road_disruption_maxSpeed.py b/scripts/road_disruption_maxSpeed.py
new file mode 100644
index 0000000..eaea7be
--- /dev/null
+++ b/scripts/road_disruption_maxSpeed.py
@@ -0,0 +1,96 @@
+"""This script calculates the maximum vehicle speed on flooded roads
+based on the flood intensity (depth).
+Inputs: flood intensity (m), converted to mm
+Outputs: maximum vehicle speed (mph)
+"""
+
+import os
+import warnings
+from pathlib import Path
+
+import json
+import pandas as pd
+
+import geopandas as gpd
+from nird.utils import load_config
+import nird.constants as cons
+
+
+warnings.simplefilter("ignore")
+
+base_path = Path(load_config()["paths"]["base_path"])
+damage_path = Path(load_config()["paths"]["damage_analysis"])
+flood_path = damage_path / "outputs"
+
+
+def identify_flood_scenarios():
+    case_paths = []
+    for root, _, files in os.walk(flood_path):
+        for file in files:
+            if "gp" in file and file.endswith(".csv"):
+                file_dir = os.path.join(root, file)
+                case_paths.append(file_dir)
+    return case_paths
+
+
+def max_edge_speed_under_flood(depth: float) -> float:
+    """Calculate the maximum vehicle speed (mph) based on flood depth (mm)"""
+    if depth < 300:  # mm
+        value = 0.0009 * depth**2 - 0.5529 * depth + 86.9448  # kph
+        return value / cons.CONV_MILE_TO_KM
+    else:
+        return 0.0  # mph
+
+
+def road_reclassification(road_classification, form_of_way):
+    if road_classification == "Motorway":
+        return "M"
+    elif road_classification == "B Road":
+        return "B"
+    elif road_classification == "A Road" and form_of_way == "Single Carriageway":
+        return "A_single"
+    else:
+        return "A_dual"
+
+
+def calculate_road_max_speed(road_link_file, min_speed_dict, output_path):
+    road_link_file["label"] = road_link_file.apply(
+        lambda x: road_reclassification(x.road_classification, x.form_of_way), axis=1
+    )
+    road_link_file["min_speed_mph"] = road_link_file["label"].map(min_speed_dict)
+    road_min_speed_dict = road_link_file.set_index("id")["min_speed_mph"].to_dict()
+    case_paths = identify_flood_scenarios()
+    for case_path in case_paths:
+        case_key = Path(case_path).name[:-15]  # strip "_damages_gp.csv"
+        temp = pd.read_csv(case_path)
+        e_ids = temp.iloc[:, 0]
+        intensities = temp.iloc[:, 1]
+        case_df = pd.DataFrame({"id": e_ids, "fld_depth_meter": intensities})
+        case_df["min_speed_mph"] = case_df.id.map(road_min_speed_dict)
+        case_df["max_speed_mph"] = case_df.fld_depth_meter.apply(
+            lambda x: max_edge_speed_under_flood(x * 1000)
+        )
+        case_df["max_speed_mph_adjusted"] = case_df.max_speed_mph
+        case_df.loc[
+            case_df.max_speed_mph < case_df.min_speed_mph, "max_speed_mph_adjusted"
+        ] = 0.0
+        case_df["max_speed_mph_adjusted"] = case_df["max_speed_mph_adjusted"].round(1)
+        case_df.to_csv(output_path / f"{case_key}.csv", index=False)
+
+
+def main():
+    # attach a minimum speed to each road edge based on road_classification and form_of_way
+    road_link_file = gpd.read_parquet(
+        base_path / "networks" / "road" / "road_link_file.geoparquet"
+    )
+    # min speed limit
+    with open(base_path / "parameters" / "min_speed_cap.json", "r") as f:
+        min_speed_dict = json.load(f)
+    output_path = base_path / "disruption_analysis"
"disruption_analysis" + + # calculate the maximum vehicle speed under floods + calculate_road_max_speed(road_link_file, min_speed_dict, output_path) + + +if __name__ == "__main__": + main()