diff --git a/scripts/network_flow_model.py b/scripts/network_flow_model.py index 8aafc01..af7fd78 100644 --- a/scripts/network_flow_model.py +++ b/scripts/network_flow_model.py @@ -14,6 +14,7 @@ base_path = Path(load_config()["paths"]["base_path"]) +# %% if __name__ == "__main__": start_time = time.time() # model parameters @@ -40,8 +41,10 @@ od_node_2021 = pd.read_csv( base_path / "census_datasets" / "od_matrix" / "od_gb_oa_2021_node.csv" ) + + od_node_2021 = od_node_2021[od_node_2021.Car21 > 1].reset_index(drop=True) od_node_2021["Car21"] = od_node_2021["Car21"] * 2 - # od_node_2021 = od_node_2021.head(1000) + od_node_2021 = od_node_2021.head(100) print(f"total flows: {od_node_2021.Car21.sum()}") # generate OD pairs @@ -57,6 +60,7 @@ od_voc_dict, od_vot_dict, od_toll_dict, + od_flow_dict, isolated_od_dict, ) = func.network_flow_model( road_link_file, # road @@ -78,7 +82,12 @@ # change field types road_link_file.acc_flow = road_link_file.acc_flow.astype(int) road_link_file.acc_capacity = road_link_file.acc_capacity.astype(int) - # calculate od travel costs + + # append the simulation results (flows and costs) + od_node_2021["od_flow"] = od_node_2021.apply( + lambda row: od_flow_dict.get((row["origin_node"], row["destination_node"])), + axis=1, + ) # value or NAN od_node_2021["od_voc"] = od_node_2021.apply( lambda row: od_voc_dict.get((row["origin_node"], row["destination_node"])), axis=1, @@ -95,14 +104,16 @@ od_node_2021.od_voc + od_node_2021.od_vot + od_node_2021.od_toll ) isolated_od_df = pd.Series(isolated_od_dict).reset_index() - isolated_od_df.columns = ["origin", "destination", "isolated_flows"] print(f"The total simulation time: {time.time() - start_time}") # export files road_link_file.to_parquet( - base_path.parent / "outputs" / "gb_edge_flows_0816.geoparquet" - ) - od_node_2021.to_csv(base_path.parent / "outputs" / "od_costs_0816.csv", index=False) - isolated_od_df.to_csv( - base_path.parent / "outputs" / "isolated_od_flows_0816.csv", index=False + base_path.parent / "outputs" / "gb_edge_flows_test.geoparquet" ) + od_node_2021.to_csv(base_path.parent / "outputs" / "od_costs_test.csv", index=False) + + if isolated_od_df.shape[0] != 0: # in case of empty df + isolated_od_df.columns = ["origin", "destination", "isolated_flows"] + isolated_od_df.to_csv( + base_path.parent / "outputs" / "isolated_od_flows_test.csv", index=False + ) diff --git a/src/nird/road.py b/src/nird/road.py index 61b5e0e..da8d6ef 100644 --- a/src/nird/road.py +++ b/src/nird/road.py @@ -24,7 +24,7 @@ def select_partial_roads( road_links: gpd.GeoDataFrame, road_nodes: gpd.GeoDataFrame, col_name: str, - list_of_values: List, + list_of_values: List[str], ) -> Tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]: """Extract partial road network based on road types. @@ -167,7 +167,7 @@ def find_nearest_node( def extract_od_pairs( od: pd.DataFrame, -) -> Tuple[List, Dict[str, List[str]], Dict[str, List[int]]]: +) -> Tuple[List[str], Dict[str, List[str]], Dict[str, List[int]]]: """Prepare the OD matrix. Parameters @@ -185,8 +185,8 @@ def extract_od_pairs( A dictionary recording a list of flows for each origin-destination pair. """ list_of_origin_nodes = [] - dict_of_destination_nodes: dict[str, list[str]] = defaultdict(list) - dict_of_origin_supplies: dict[str, list[float]] = defaultdict(list) + dict_of_destination_nodes: Dict[str, List[str]] = defaultdict(list) + dict_of_origin_supplies: Dict[str, List[float]] = defaultdict(list) for _, row in od.iterrows(): from_node = row["origin_node"] to_node = row["destination_node"] @@ -296,7 +296,6 @@ def initial_speed_func( return None -# update speed (mile/hour) according to edge flow (car/day) def speed_flow_func( road_type: str, isurban: int, @@ -550,8 +549,13 @@ def update_od_matrix( temp_flow_matrix: pd.DataFrame, supply_dict: Dict[str, List[float]], destination_dict: Dict[str, List[str]], - isolated_flow_dict: Dict[Tuple[str], float], -) -> Tuple[pd.DataFrame, List, Dict[str, List[float]], Dict[str, List[str]]]: + isolated_flow_dict: Dict[Tuple[str, str], float], +) -> Tuple[ + pd.DataFrame, + List[str], + Dict[str, List[float]], + Dict[str, List[str]], +]: """Update the OD matrix by removing unreachable desitinations from each origin; and origins with zero supplies. @@ -644,6 +648,7 @@ def update_network_structure( edge_operatecost_dict: dict The updated vehicle operating cost of edges. """ + zero_capacity_edges = set( temp_edge_flow.loc[temp_edge_flow["remaining_capacity"] < 1, "e_id"].tolist() ) # edge names @@ -713,7 +718,9 @@ def update_network_structure( ) -def find_least_cost_path(params: Tuple) -> Tuple[int, List, List, List]: +def find_least_cost_path( + params: Tuple, +) -> Tuple[int, List[str], List[int], List[float]]: """Find the least-cost path for each OD trip. Parameters: @@ -751,8 +758,12 @@ def find_least_cost_path(params: Tuple) -> Tuple[int, List, List, List]: def compute_edge_costs( - path: List, -) -> Tuple[Dict[Tuple[str], float], Dict[Tuple[str], float], Dict[Tuple[str]]]: + path: List[int], +) -> Tuple[ + Dict[Tuple[str, str], float], + Dict[Tuple[str, str], float], + Dict[Tuple[str, str], float], +]: """Calculate the total travel cost for the path Parameters @@ -769,6 +780,7 @@ def compute_edge_costs( od_toll: Dict Toll costs of each trip. """ + od_voc = edge_weight_df.loc[edge_weight_df["edge_idx"].isin(path), "edge_voc"].sum() od_vot = edge_weight_df.loc[edge_weight_df["edge_idx"].isin(path), "edge_vot"].sum() od_toll = edge_weight_df.loc[ @@ -820,7 +832,7 @@ def worker_init_edge(shared_network_pkl: bytes, shared_weight_pkl: bytes) -> Non def network_flow_model( road_links: gpd.GeoDataFrame, road_nodes: gpd.GeoDataFrame, - list_of_origins: List, + list_of_origins: List[str], supply_dict: Dict[str, List[float]], destination_dict: Dict[str, List[str]], free_flow_speed_dict: Dict[str, float], @@ -832,10 +844,10 @@ def network_flow_model( Dict[str, float], Dict[str, float], Dict[str, float], - Dict[Tuple[str], float], - Dict[Tuple[str], float], - Dict[Tuple[str], float], - Dict[Tuple[str], float], + Dict[Tuple[str, str], float], + Dict[Tuple[str, str], float], + Dict[Tuple[str, str], float], + Dict[Tuple[str, str], float], ]: """Model the passenger flows on the road network. @@ -878,6 +890,7 @@ def network_flow_model( isolated_flow_dict: dict The isolated trips between each OD pair. """ + # network creation (igraph) ( network, @@ -925,7 +938,6 @@ def network_flow_model( edge_length_dict = ( road_links.set_index("e_id")["geometry"].length * cons.CONV_METER_TO_MILE ).to_dict() - # edge_toll_dict = road_links.set_index("e_id")["average_toll_cost"].to_dict() acc_flow_dict = road_links.set_index("e_id")["acc_flow"].to_dict() acc_capacity_dict = road_links.set_index("e_id")["acc_capacity"].to_dict() acc_speed_dict = road_links.set_index("e_id")["ave_flow_rate"].to_dict() @@ -933,10 +945,10 @@ def network_flow_model( # starts iter_flag = 1 isolated_flow_dict = defaultdict(float) - # od_cost_dict = defaultdict(float) od_voc_dict = defaultdict(float) od_vot_dict = defaultdict(float) od_toll_dict = defaultdict(float) + od_flow_dict = defaultdict(float) while total_remain > 0: print(f"No.{iter_flag} iteration starts:") # dump the network and edge weight for shared use in multiprocessing @@ -982,29 +994,25 @@ def network_flow_model( # compute the total travel cost for each OD trip st = time.time() args = [] - args = [(row["path"],) for _, row in temp_flow_matrix.iterrows()] + args = [row["path"] for _, row in temp_flow_matrix.iterrows()] with Pool( processes=20, initializer=worker_init_edge, initargs=(shared_network_pkl, shared_weight_pkl), ) as pool: - # temp_flow_matrix["unit_od_cost"] = pool.starmap(compute_edge_costs, args) - temp_flow_matrix[["unit_od_voc", "unit_od_vot", "unit_od_toll"]] = ( - pool.starmap(compute_edge_costs, args) + temp_flow_matrix[["unit_od_voc", "unit_od_vot", "unit_od_toll"]] = pool.map( + compute_edge_costs, args ) print(f"The computational time for OD costs: {time.time() - st}.") - # calculate the non-allocated flows and remaining flows ( temp_flow_matrix, list_of_origins, supply_dict, destination_dict, - # non_allocated_flow, ) = update_od_matrix( temp_flow_matrix, supply_dict, destination_dict, isolated_flow_dict ) - # total_non_allocated_flow += non_allocated_flow number_of_destinations = sum(len(value) for value in destination_dict.values()) print(f"The remaining number of origins: {len(list_of_origins)}") print(f"The remaining number of destinations: {number_of_destinations}") @@ -1093,12 +1101,13 @@ def network_flow_model( ) toll_cost += temp_cost.sum() - # update od trave costs: unit_od_cost * adjusted_flows + # update od trave costs: unit_od_cost for row in temp_flow_matrix.itertuples(index=False): key = (row.origin, row.destination) od_voc_dict[key] += row.unit_od_voc * row.flow od_vot_dict[key] += row.unit_od_vot * row.flow od_toll_dict[key] += row.unit_od_toll * row.flow + od_flow_dict[key] += row.flow print("Iteration stops: there is no edge overflow!") break @@ -1175,12 +1184,13 @@ def network_flow_model( temp_cost = temp_edge_flow["e_id"].map(edge_toll_dict) * temp_edge_flow["flow"] toll_cost += temp_cost.sum() - # update OD travel costs + # update OD travel costs (based on adjusted flows) for row in temp_flow_matrix.itertuples(index=False): key = (row.origin, row.destination) - od_voc_dict[key] += row.unit_od_voc * row.flow - od_vot_dict[key] += row.unit_od_vot * row.flow - od_toll_dict[key] += row.unit_od_toll * row.flow + od_voc_dict[key] += row.unit_od_voc * row.flow * r + od_vot_dict[key] += row.unit_od_vot * row.flow * r + od_toll_dict[key] += row.unit_od_toll * row.flow * r + od_flow_dict[key] += row.flow * r # update network structure (nodes and edges) ( @@ -1194,6 +1204,7 @@ def network_flow_model( network, edge_length_dict, acc_speed_dict, + edge_toll_dict, temp_edge_flow, ) @@ -1212,5 +1223,6 @@ def network_flow_model( od_voc_dict, od_vot_dict, od_toll_dict, + od_flow_dict, isolated_flow_dict, )