diff --git a/examples/data/optimal_routes_analysis_with_hazard/network.ini b/examples/data/optimal_routes_analysis_with_hazard/network.ini index 6e62313f4..a9446eeff 100644 --- a/examples/data/optimal_routes_analysis_with_hazard/network.ini +++ b/examples/data/optimal_routes_analysis_with_hazard/network.ini @@ -32,6 +32,6 @@ hazard_crs = EPSG:32736 [cleanup] snapping_threshold = None segmentation_length = None -merge_lines = True +merge_lines = False merge_on_id = False cut_at_intersections = False \ No newline at end of file diff --git a/ra2ce/network/network_simplification/nx_to_snkit_network_converter.py b/ra2ce/network/network_simplification/nx_to_snkit_network_converter.py index 17e1d47c8..ecbf6fd90 100644 --- a/ra2ce/network/network_simplification/nx_to_snkit_network_converter.py +++ b/ra2ce/network/network_simplification/nx_to_snkit_network_converter.py @@ -25,6 +25,8 @@ from shapely.geometry import LineString, Point from snkit.network import Network as SnkitNetwork +from ra2ce.network import add_x_y_to_nodes + NxGraph = nx.Graph | nx.MultiGraph | nx.MultiDiGraph @@ -49,6 +51,8 @@ def convert(self) -> SnkitNetwork: """ # Extract graph values _crs = self.networkx_graph.graph.get("crs", None) + # add x and y to the nodes of a graph + self.networkx_graph = add_x_y_to_nodes(self.networkx_graph) # Create new network snkit_network = SnkitNetwork() diff --git a/ra2ce/network/network_simplification/snkit_network_merge_wrapper.py b/ra2ce/network/network_simplification/snkit_network_merge_wrapper.py index 3923a8324..eac76f852 100644 --- a/ra2ce/network/network_simplification/snkit_network_merge_wrapper.py +++ b/ra2ce/network/network_simplification/snkit_network_merge_wrapper.py @@ -20,7 +20,9 @@ import geopandas as gpd import networkx as nx +import numpy as np import pandas as pd +from networkx import MultiGraph from shapely.geometry import LineString, MultiLineString, MultiPoint, Point from shapely.ops import linemerge from snkit.network import Network as SnkitNetwork @@ -40,6 +42,7 @@ def merge_edges( snkit_network: SnkitNetwork, + networkx_graph: NxGraph, aggregate_func: str | dict, by: str | list, id_col: str, @@ -49,6 +52,7 @@ def merge_edges( Args: snkit_network (SnkitNetwork): network to merge. + networkx_graph (NxGraph): networkx graph to merge aggregate_func (str | dict): Aggregation function to apply. by (str | list): Arguments (column names). id_col (str, optional): Name of the column representing the 'id'. @@ -57,7 +61,7 @@ def merge_edges( SnkitNetwork: _description_ """ - def _node_connectivity_degree(node, snkit_network: SnkitNetwork) -> int: + def node_connectivity_degree(node, snkit_network: SnkitNetwork) -> int: return len( snkit_network.edges[ (snkit_network.edges.from_id == node) @@ -65,13 +69,13 @@ def _node_connectivity_degree(node, snkit_network: SnkitNetwork) -> int: ] ) - def _get_edge_ids_to_update(edges_list: list) -> list: + def get_edge_ids_to_update(edges_list: list) -> list: ids_to_update = [] for edges in edges_list: ids_to_update.extend(edges.id.tolist()) return ids_to_update - def _get_merged_edges( + def get_merged_edges( paths_to_group: list, by: list, aggfunc: str | dict, @@ -94,62 +98,162 @@ def _get_merged_edges( updated_edges_gdf = updated_edges_gdf.drop(columns=["id"]) return updated_edges_gdf - def _get_edge_paths(node_set: set, snkit_network: SnkitNetwork) -> list: - # Convert edges to an adjacency list using vectorized operations - edge_dict = defaultdict(set) - from_ids = snkit_network.edges["from_id"].values - to_ids = snkit_network.edges["to_id"].values - - for from_id, to_id in zip(from_ids, to_ids): - edge_dict[from_id].add(to_id) - edge_dict[to_id].add(from_id) + def filter_node(_node_set: set, _degrees: int) -> set: + _filtered = set() + for _node_id in _node_set: + # Get the predecessors (antecedents) and successors (precedents) to make sure for to filter correctly + # For _node_set with all degree 2: this filters on the nodes that have only one predecessor and successor. + # E.g. filters on 2 in 1->2, 1->5, 2->3, 3->4, 3->5 + # For _node_set with all degree 4: Check if the predecessors and successors are the same nodes. + # E.g. filters on 2 in 1->2, 2->3, 2->1, 3->2. + predecessors = list(networkx_graph.predecessors(_node_id)) + successors = list(networkx_graph.successors(_node_id)) + + # Check the degree of the _node_set and the corresponding criterium. + if ((_degrees == 2 and len(predecessors) == len(successors) == 1) or + (_degrees == 4 and sorted(predecessors) == sorted(successors))): + _filtered.add(_node_id) + return _filtered + + def get_edge_paths(node_set: set, _snkit_network: SnkitNetwork) -> list: + def get_adjacency_list(edges_gdf: gpd.GeoDataFrame, from_id_column: str, + to_id_column: str) -> defaultdict: + # Convert the edges of a GeoDataFrame to an adjacency list using vectorized operations. + _edge_dict = defaultdict(set) + # Extract the 'from_id' and 'to_id' columns as numpy arrays for efficient processing + from_ids = edges_gdf[from_id_column].values + to_ids = edges_gdf[to_id_column].values + # Vectorized operation to populate the adjacency list + for from_id, to_id in np.nditer([from_ids, to_ids]): + _edge_dict[int(from_id)].add(int(to_id)) + _edge_dict[int(to_id)].add(int(from_id)) + + return _edge_dict + + def retrieve_edge(node1: int | float, node2: int | float) -> gpd.GeoDataFrame: + """Retrieve the edge from snkit_network.edges GeoDataFrame between two nodes.""" + edge = _snkit_network.edges[ + (_snkit_network.edges['from_id'] == node1) & + (_snkit_network.edges['to_id'] == node2) + ] + return edge if not edge.empty else None + + def construct_path(start_node: int | float, end_node: int | float, intermediate_nodes: list) -> pd.DataFrame | None: + path = [] + current_node = start_node + _intermediates = intermediate_nodes.copy() + + _explored_nodes = [] + # Ensure we go through all the items. + for _ in range(len(intermediate_nodes)): + # Filter out nodes already used for edge retrieval + _to_explore = filter(lambda x: x not in _explored_nodes, intermediate_nodes) + for _next_node in _to_explore: + _edge = retrieve_edge(current_node, _next_node) + if _edge is not None: + path.append(_edge) + _explored_nodes.append(_next_node) + current_node = _next_node + + final_edge = retrieve_edge(current_node, end_node) + if final_edge is not None: + path.append(final_edge) + + if len(path) > 0 and all(edge is not None for edge in path): + return pd.concat(path) # Combine edges into a single GeoDataFrame + return None + + def find_and_append_degree_4_paths( + _edge_paths: list + ) -> list: + _edge_paths_results = _edge_paths + boundary_nodes = list(node_path - filtered_degree_4_set) + if len(boundary_nodes) == 2: + from_node, to_node = boundary_nodes + _intermediates = list(node_path & filtered_degree_4_set) + + # Construct and append the forward path + forward_gdf = construct_path(from_node, to_node, _intermediates) + if forward_gdf is not None: + _edge_paths_results.append(forward_gdf) + + # Construct and append the backward path + backward_gdf = construct_path(to_node, from_node, _intermediates) + if backward_gdf is not None: + _edge_paths_results.append(backward_gdf) + return _edge_paths_results - edge_paths = [] + # Convert edges to an adjacency list using vectorized operations + edge_dict = get_adjacency_list(edges_gdf=snkit_network.edges, from_id_column="from_id", to_id_column="to_id") + _edge_paths: list = [] + # find the edge paths for the nodes in node_set while node_set: + # for each node in node_set find other nodes on the path + intermediates = set() popped_node = node_set.pop() + # intermediates are the nodes in the node_path that are between two other nodes in the path + intermediates.add(popped_node) node_path = {popped_node} candidates = {popped_node} while candidates: popped_cand = candidates.pop() + # matches are the nodes that belong to a node_path matches = edge_dict[popped_cand] matches = matches - node_path for match in matches: + intermediates.add(popped_cand) + # if the found node on the path is in the node_set, then keep looking for other connected nodes on + # the path if match in node_set: candidates.add(match) node_path.add(match) node_set.remove(match) + # If the found node is not in node_set stop. else: node_path.add(match) + # After finding all nodes on a path find the edges that are connected to these nodes. + # Finding the edges is different for the nodes in the node path with degree 2 and 4. if len(node_path) >= 2: - edge_paths.append( - snkit_network.edges.loc[ - (snkit_network.edges.from_id.isin(node_path)) - & (snkit_network.edges.to_id.isin(node_path)) - ] - ) - return edge_paths + if any(node_path.intersection(filtered_degree_4_set)): + # node_path has nodes with degree 4 => get the forward and backward paths + _edge_paths = find_and_append_degree_4_paths(_edge_paths) + else: + # node_path has nodes with degree 2 => find the edges connected to the intermediates + edge_paths_gdf = snkit_network.edges[ + snkit_network.edges.from_id.isin(intermediates) | + snkit_network.edges.to_id.isin(intermediates) + ] + _edge_paths.append(edge_paths_gdf) + return _edge_paths + # Adds degree column which is needed to find the to-be-simplified nodes and edges. if "degree" not in snkit_network.nodes.columns: snkit_network.nodes["degree"] = snkit_network.nodes[id_col].apply( - lambda x: _node_connectivity_degree(x, snkit_network) + lambda x: node_connectivity_degree(x, snkit_network) ) - degree_2 = list(snkit_network.nodes[id_col].loc[snkit_network.nodes.degree == 2]) - degree_2_set = set(degree_2) - edge_paths = _get_edge_paths(degree_2_set, snkit_network) + # Filter on the nodes with degree 2 and 4 which suffice the following criteria: + # For _node_set with all degree 2: this filters on the nodes that have only one predecessor and successor. + # E.g. filters on 2 in 1->2, 1->5, 2->3, 3->4, 3->5 + # For _node_set with all degree 4: Check if the predecessors and successors are the same nodes. + # E.g. filters on 2 in 1->2, 2->3, 2->1, 3->2. + degree_2_set = set(list(snkit_network.nodes[id_col].loc[snkit_network.nodes.degree == 2])) + filtered_degree_2_set = filter_node(degree_2_set, _degrees=2) + + degree_4_set = set(list(snkit_network.nodes[id_col].loc[snkit_network.nodes.degree == 4])) + filtered_degree_4_set = filter_node(degree_4_set, _degrees=4) - edge_ids_to_update = _get_edge_ids_to_update(edge_paths) + nodes_of_interest = filtered_degree_2_set | filtered_degree_4_set + + edge_paths = get_edge_paths(sorted(nodes_of_interest), snkit_network) + + edge_ids_to_update = get_edge_ids_to_update(edge_paths) edges_to_keep = snkit_network.edges[ ~snkit_network.edges["id"].isin(edge_ids_to_update) ] - updated_edges = _get_merged_edges( - paths_to_group=edge_paths, - by=by, - aggfunc=aggregate_func, - net=snkit_network, - ) + updated_edges = get_merged_edges(paths_to_group=edge_paths, by=by, aggfunc=aggregate_func, net=snkit_network) edges_to_keep = edges_to_keep.drop(columns=["id"]) updated_edges = updated_edges.reset_index(drop=True) @@ -164,7 +268,7 @@ def _get_edge_paths(node_set: set, snkit_network: SnkitNetwork) -> list: merged_snkit_network = SnkitNetwork(nodes=new_nodes_gdf, edges=new_edges_gdf) merged_snkit_network.nodes["degree"] = merged_snkit_network.nodes[id_col].apply( - lambda x: _node_connectivity_degree(x, merged_snkit_network) + lambda x: node_connectivity_degree(x, merged_snkit_network) ) return merged_snkit_network diff --git a/ra2ce/network/network_simplification/snkit_network_wrapper.py b/ra2ce/network/network_simplification/snkit_network_wrapper.py index bae06d170..8ebab34fd 100644 --- a/ra2ce/network/network_simplification/snkit_network_wrapper.py +++ b/ra2ce/network/network_simplification/snkit_network_wrapper.py @@ -43,9 +43,21 @@ class SnkitNetworkWrapper: """ snkit_network: Network - node_id_column_name: str = "id" - edge_from_id_column_name: str = "from_id" - edge_to_id_column_name: str = "to_id" + node_id_column_name: str + edge_from_id_column_name: str + edge_to_id_column_name: str + + def __init__( + self, + snkit_network: Network, + node_id_column_name: str, + edge_from_id_column_name: str, + edge_to_id_column_name: str, + ) -> None: + self.snkit_network = snkit_network + self.node_id_column_name = node_id_column_name + self.edge_from_id_column_name = edge_from_id_column_name + self.edge_to_id_column_name = edge_to_id_column_name @classmethod def from_networkx( @@ -80,17 +92,18 @@ def filter_excluded_attributes() -> list[str]: _attributes_to_exclude = filter_excluded_attributes() if "demand_edge" not in _attributes_to_exclude: - _aggregate_function = self._aggfunc_with_demand_edge( - cols, _attributes_to_exclude + _aggregate_function = self._aggrfunc( + cols, _attributes_to_exclude, with_demand=True ) else: - _aggregate_function = self._aggfunc_no_demand_edge( - cols, _attributes_to_exclude + _aggregate_function = self._aggrfunc( + cols, _attributes_to_exclude, with_demand=False ) # Overwrite the existing network with the merged edges. self.snkit_network = merge_edges( snkit_network=self.snkit_network, + networkx_graph=self.to_networkx(), aggregate_func=_aggregate_function, by=_attributes_to_exclude, id_col="id", @@ -128,7 +141,7 @@ def to_networkx(self) -> NxGraph: edge_to_id_column_name=self.edge_to_id_column_name, ).convert() - def _aggfunc_with_demand_edge(self, cols, attributes_to_exclude: list[str]): + def _aggrfunc(self, cols, attributes_to_exclude: list[str], with_demand: bool): def aggregate_column(col_data, col_name: str): if col_name in attributes_to_exclude: return col_data.iloc[0] @@ -136,32 +149,14 @@ def aggregate_column(col_data, col_name: str): return list(col_data) elif col_name in ["maxspeed", "avgspeed"]: return col_data.mean() - elif col_name == "demand_edge": + elif with_demand and col_name == "demand_edge": return max(col_data) elif col_data.dtype == "O": - return "; ".join( - str(item) for item in col_data if isinstance(item, str) - ) - else: - return col_data.iloc[0] - - return { - col: (lambda col_data, col_name=col: aggregate_column(col_data, col_name)) - for col in cols - } - - def _aggfunc_no_demand_edge(self, cols, attributes_to_exclude: list[str]): - def aggregate_column(col_data, col_name: str): - if col_name in attributes_to_exclude: - return col_data.iloc[0] - elif col_name == "rfid_c": - return list(col_data) - elif col_name in ["maxspeed", "avgspeed"]: - return col_data.mean() - elif col_data.dtype == "O": - return "; ".join( - str(item) for item in col_data if isinstance(item, str) - ) + col_data_unique_values = list(set(col_data)) + if len(col_data_unique_values) == 1: + return col_data_unique_values[0] + else: + return str(col_data_unique_values) else: return col_data.iloc[0] diff --git a/ra2ce/network/network_simplification/snkit_to_nx_network_converter.py b/ra2ce/network/network_simplification/snkit_to_nx_network_converter.py index 15f9128bd..7b3e0af21 100644 --- a/ra2ce/network/network_simplification/snkit_to_nx_network_converter.py +++ b/ra2ce/network/network_simplification/snkit_to_nx_network_converter.py @@ -33,7 +33,7 @@ class SnkitToNxNetworkConverter: edge_from_id_column_name: str = "from_id" edge_to_id_column_name: str = "to_id" - def convert(self) -> nx.MultiGraph: + def convert(self) -> nx.MultiDiGraph: """ Converts the given `snkit.network.Network` into a matching `networkx.MultiGraph`. @@ -42,10 +42,10 @@ def convert(self) -> nx.MultiGraph: snkit_network (SnkitNetwork): The snkit network to convert. Returns: - `networkx.MultiGraph`: The converted graph. + `networkx.MultiDiGraph`: The converted graph. """ # Define new graph - _nx_graph = nx.MultiGraph() + _nx_graph = nx.MultiDiGraph() _crs = self.snkit_network.edges.crs # Add nodes to the graph diff --git a/tests/network/network_simplification/test_network_graph_simplificator.py b/tests/network/network_simplification/test_network_graph_simplificator.py index d1b496d67..474352e56 100644 --- a/tests/network/network_simplification/test_network_graph_simplificator.py +++ b/tests/network/network_simplification/test_network_graph_simplificator.py @@ -1,11 +1,12 @@ import math -import random from typing import Callable, Iterator import networkx as nx import numpy as np import pytest -from shapely.geometry import LineString, Point +from altgraph.Graph import Graph +from networkx import DiGraph +from shapely.geometry import LineString from ra2ce.network import add_missing_geoms_graph from ra2ce.network.network_simplification.network_graph_simplificator import ( @@ -26,24 +27,33 @@ def _detailed_edge_comparison( graph1: nx.MultiDiGraph | nx.MultiGraph, graph2: nx.MultiDiGraph | nx.MultiGraph ) -> bool: - def _dicts_comparison( - graph1: nx.MultiDiGraph | nx.MultiGraph, graph2: nx.MultiDiGraph | nx.MultiGraph + def dicts_comparison( + graph_a: nx.MultiDiGraph | nx.MultiGraph, + graph_b: nx.MultiDiGraph | nx.MultiGraph, ) -> bool: - for u, v, k, data1 in graph1.edges(keys=True, data=True): - data2 = graph2.get_edge_data(u, v, k) - for key1, value1 in data1.items(): - if key1 not in data2: - return False - if isinstance(value1, float) and math.isnan(value1): - if not math.isnan(data2[key1]): - return False - continue - if value1 != data2[key1]: - return False + for u, v, k, data1 in graph_a.edges(keys=True, data=True): + geom1 = data1["geometry"] + geom_found = 0 + + data2_dict = graph_b.get_edge_data(u, v) + for _, data2 in data2_dict.items(): + if data2["geometry"] == geom1: + geom_found = 1 + for key1, value1 in data1.items(): + if key1 not in data2: + return False + if isinstance(value1, float) and math.isnan(value1): + if not math.isnan(data2[key1]): + return False + continue + if value1 != data2[key1]: + return False + if geom_found == 0: + return False return True - check_1_2 = _dicts_comparison(graph1, graph2) - check_2_1 = _dicts_comparison(graph2, graph1) + check_1_2 = dicts_comparison(graph1, graph2) + check_2_1 = dicts_comparison(graph2, graph1) if check_1_2 and check_2_1: return True @@ -132,26 +142,31 @@ def _get_network_simplification_with_attribute_exclusion( def _get_nx_digraph_factory(self) -> Iterator[Callable[[], nx.MultiDiGraph]]: def create_nx_multidigraph(): _nx_digraph = nx.MultiDiGraph() - for i in range(1, 16): - _nx_digraph.add_node(i, x=i, y=i * 10) - - _nx_digraph.add_edge(1, 2, a=np.nan) - _nx_digraph.add_edge(2, 1, a=np.nan) - _nx_digraph.add_edge(2, 3, a=np.nan) - _nx_digraph.add_edge(3, 4, a=np.nan) - _nx_digraph.add_edge(4, 5, a="yes") - _nx_digraph.add_edge(5, 6, a="yes") - _nx_digraph.add_edge(6, 7, a="yes") - _nx_digraph.add_edge(7, 8, a=np.nan) - _nx_digraph.add_edge(8, 9, a=np.nan) - _nx_digraph.add_edge(8, 12, a=np.nan) - _nx_digraph.add_edge(8, 13, a="yes") - _nx_digraph.add_edge(9, 10, a=np.nan) - _nx_digraph.add_edge(10, 11, a=np.nan) - _nx_digraph.add_edge(11, 12, a="yes") - _nx_digraph.add_edge(13, 14, a="yes") - _nx_digraph.add_edge(14, 15, a="yes") - _nx_digraph.add_edge(15, 11, a="yes") + _nx_digraph.add_nodes_from( + [(i, {"x": i, "y": i * 10}) for i in range(1, 19)] + ) + + _nx_digraph.add_edge(1, 2, bridge="None") + _nx_digraph.add_edge(2, 1, bridge="None") + _nx_digraph.add_edge(2, 3, bridge="None") + _nx_digraph.add_edge(3, 4, bridge="None") + _nx_digraph.add_edge(4, 5, bridge="yes") + _nx_digraph.add_edge(5, 6, bridge="yes") + _nx_digraph.add_edge(6, 7, bridge="yes") + _nx_digraph.add_edge(7, 8, bridge="None") + _nx_digraph.add_edge(8, 9, bridge="None") + _nx_digraph.add_edge(8, 12, bridge="None") + _nx_digraph.add_edge(8, 13, bridge="yes") + _nx_digraph.add_edge(9, 10, bridge="None") + _nx_digraph.add_edge(10, 11, bridge="None") + _nx_digraph.add_edge(11, 12, bridge="yes") + _nx_digraph.add_edge(13, 14, bridge="yes") + _nx_digraph.add_edge(14, 15, bridge="yes") + _nx_digraph.add_edge(15, 11, bridge="yes") + _nx_digraph.add_edge(1, 16, bridge="None") + _nx_digraph.add_edge(16, 1, bridge="None") + _nx_digraph.add_edge(16, 17, bridge="None") + _nx_digraph.add_edge(16, 18, bridge="None") _nx_digraph = add_missing_geoms_graph(_nx_digraph, "geometry") _nx_digraph.graph["crs"] = "EPSG:4326" @@ -165,9 +180,34 @@ def create_nx_multidigraph(): def _get_expected_result_graph_fixture( self, nx_digraph_factory: nx.MultiDiGraph ) -> nx.MultiGraph: + def add_edge_with_attributes( + graph_to_shape: Graph | DiGraph, + edge_u: int | float, + edge_v: int | float, + value_to_exclude: str, + edge_node_ids: list, + ) -> Graph | DiGraph: + # Create a copy of the input graph + shaped_graph = graph_to_shape.copy() + + # Extract geometries programmatically using edge_node_ids + geometry_list = [ + _nx_digraph.nodes[n_id]["geometry"] for n_id in edge_node_ids + ] + + shaped_graph.add_edge( + edge_u, + edge_v, + bridge=value_to_exclude, + from_node=edge_u, + to_node=edge_v, + geometry=LineString(geometry_list), + ) + return shaped_graph + _nx_digraph = nx_digraph_factory() - _result_digraph = nx.MultiGraph() - node_ids_degrees = {2: 1, 4: 2, 7: 2, 8: 4, 11: 3, 12: 2} + _result_digraph = nx.MultiDiGraph() + node_ids_degrees = {2: 3, 4: 2, 7: 2, 8: 4, 11: 3, 12: 2, 16: 4, 17: 1, 18: 1} for node_id, degree in node_ids_degrees.items(): node_data = _nx_digraph.nodes[node_id] node_data["id"] = node_id @@ -175,107 +215,39 @@ def _get_expected_result_graph_fixture( _result_digraph.add_node(node_id, **node_data) _result_digraph = add_missing_geoms_graph(_result_digraph, "geometry") - _result_digraph.add_edge( - 2, - 4.0, - a="None", - from_node=2, - to_node=4, - geometry=LineString( - [ - _nx_digraph.nodes[2]["geometry"], - _nx_digraph.nodes[3]["geometry"], - _nx_digraph.nodes[4]["geometry"], - ] - ), + _result_digraph = add_edge_with_attributes( + _result_digraph, 2, 4.0, "None", [2, 3, 4] ) - - _result_digraph.add_edge( - 4, - 7.0, - a="yes", - from_node=4, - to_node=7, - geometry=LineString( - [ - _nx_digraph.nodes[4]["geometry"], - _nx_digraph.nodes[5]["geometry"], - _nx_digraph.nodes[6]["geometry"], - _nx_digraph.nodes[7]["geometry"], - ] - ), + _result_digraph = add_edge_with_attributes( + _result_digraph, 2, 16.0, "None", [2, 1, 16] ) - _result_digraph.add_edge( - 7, - 8.0, - a="None", - from_node=7, - to_node=8, - geometry=LineString( - [ - _nx_digraph.nodes[7]["geometry"], - _nx_digraph.nodes[8]["geometry"], - ] - ), + _result_digraph = add_edge_with_attributes( + _result_digraph, 4, 7.0, "yes", [4, 5, 6, 7] ) - _result_digraph.add_edge( - 8, - 11.0, - a="None", - from_node=8, - to_node=11, - geometry=LineString( - [ - _nx_digraph.nodes[8]["geometry"], - _nx_digraph.nodes[9]["geometry"], - _nx_digraph.nodes[10]["geometry"], - _nx_digraph.nodes[11]["geometry"], - ] - ), + _result_digraph = add_edge_with_attributes( + _result_digraph, 7, 8.0, "None", [7, 8] ) - _result_digraph.add_edge( - 8, - 11.0, - a="yes", - from_node=8, - to_node=11, - geometry=LineString( - [ - _nx_digraph.nodes[8]["geometry"], - _nx_digraph.nodes[13]["geometry"], - _nx_digraph.nodes[14]["geometry"], - _nx_digraph.nodes[15]["geometry"], - _nx_digraph.nodes[11]["geometry"], - ] - ), + _result_digraph = add_edge_with_attributes( + _result_digraph, 8, 11, "None", [8, 9, 10, 11] ) - _result_digraph.add_edge( - 8, - 12.0, - a="None", - from_node=8, - to_node=12, - geometry=LineString( - [ - _nx_digraph.nodes[8]["geometry"], - _nx_digraph.nodes[12]["geometry"], - ] - ), + _result_digraph = add_edge_with_attributes( + _result_digraph, 8, 11, "yes", [8, 13, 14, 15, 11] ) - _result_digraph.add_edge( - 11, - 12.0, - a="yes", - from_node=11, - to_node=12, - geometry=LineString( - [ - _nx_digraph.nodes[11]["geometry"], - _nx_digraph.nodes[12]["geometry"], - ] - ), + _result_digraph = add_edge_with_attributes( + _result_digraph, 8, 12, "None", [8, 12] + ) + _result_digraph = add_edge_with_attributes( + _result_digraph, 11, 12, "yes", [11, 12] + ) + _result_digraph = add_edge_with_attributes( + _result_digraph, 16, 2.0, "None", [16, 1, 2] + ) + _result_digraph = add_edge_with_attributes( + _result_digraph, 16, 17, "None", [16, 17] + ) + _result_digraph = add_edge_with_attributes( + _result_digraph, 16, 18, "None", [16, 18] ) - _result_digraph.graph["crs"] = "EPSG:4326" snkit_network = NxToSnkitNetworkConverter( @@ -296,7 +268,9 @@ def test_simplify_graph( expected_result_graph_fixture: nx.MultiDiGraph, ): network_simplification_with_attribute_exclusion.nx_graph = nx_digraph_factory() - network_simplification_with_attribute_exclusion.attributes_to_exclude = ["a"] + network_simplification_with_attribute_exclusion.attributes_to_exclude = [ + "bridge" + ] _graph_simple = network_simplification_with_attribute_exclusion.simplify_graph() @@ -304,7 +278,5 @@ def test_simplify_graph( assert _graph_simple.nodes(data=True) == expected_result_graph_fixture.nodes( data=True ) - # Compare edges topology - assert set(_graph_simple.edges()) == set(expected_result_graph_fixture.edges()) # Compare edges with attributes assert _detailed_edge_comparison(_graph_simple, expected_result_graph_fixture)