Skip to content

Commit

Permalink
chore: autoformat isort & black
Browse files Browse the repository at this point in the history
  • Loading branch information
sahand-asgarpour authored and github-actions[bot] committed Sep 16, 2024
1 parent 0cc6cf4 commit 8b4b679
Showing 1 changed file with 33 additions and 19 deletions.
52 changes: 33 additions & 19 deletions ra2ce/network/network_simplification/snkit_network_merge_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,16 @@ def filter_node(_node_set: set, _degrees: int) -> set:
successors = list(networkx_graph.successors(_node_id))

# Check the degree of the _node_set and the corresponding criterium.
if ((_degrees == 2 and len(predecessors) == len(successors) == 1) or
(_degrees == 4 and sorted(predecessors) == sorted(successors))):
if (_degrees == 2 and len(predecessors) == len(successors) == 1) or (
_degrees == 4 and sorted(predecessors) == sorted(successors)
):
_filtered.add(_node_id)
return _filtered

def get_edge_paths(node_set: set, _snkit_network: SnkitNetwork) -> list:
def get_adjacency_list(edges_gdf: gpd.GeoDataFrame, from_id_column: str,
to_id_column: str) -> defaultdict:
def get_adjacency_list(
edges_gdf: gpd.GeoDataFrame, from_id_column: str, to_id_column: str
) -> defaultdict:
# Convert the edges of a GeoDataFrame to an adjacency list using vectorized operations.
_edge_dict = defaultdict(set)
# Extract the 'from_id' and 'to_id' columns as numpy arrays for efficient processing
Expand All @@ -133,12 +135,14 @@ def get_adjacency_list(edges_gdf: gpd.GeoDataFrame, from_id_column: str,
def retrieve_edge(node1: int | float, node2: int | float) -> gpd.GeoDataFrame:
"""Retrieve the edge from snkit_network.edges GeoDataFrame between two nodes."""
edge = _snkit_network.edges[
(_snkit_network.edges['from_id'] == node1) &
(_snkit_network.edges['to_id'] == node2)
]
(_snkit_network.edges["from_id"] == node1)
& (_snkit_network.edges["to_id"] == node2)
]
return edge if not edge.empty else None

def construct_path(start_node: int | float, end_node: int | float, intermediate_nodes: list) -> pd.DataFrame | None:
def construct_path(
start_node: int | float, end_node: int | float, intermediate_nodes: list
) -> pd.DataFrame | None:
path = []
current_node = start_node
_intermediates = intermediate_nodes.copy()
Expand All @@ -147,7 +151,9 @@ def construct_path(start_node: int | float, end_node: int | float, intermediate_
# Ensure we go through all the items.
for _ in range(len(intermediate_nodes)):
# Filter out nodes already used for edge retrieval
_to_explore = filter(lambda x: x not in _explored_nodes, intermediate_nodes)
_to_explore = filter(
lambda x: x not in _explored_nodes, intermediate_nodes
)
for _next_node in _to_explore:
_edge = retrieve_edge(current_node, _next_node)
if _edge is not None:
Expand All @@ -163,9 +169,7 @@ def construct_path(start_node: int | float, end_node: int | float, intermediate_
return pd.concat(path) # Combine edges into a single GeoDataFrame
return None

def find_and_append_degree_4_paths(
_edge_paths: list
) -> list:
def find_and_append_degree_4_paths(_edge_paths: list) -> list:
_edge_paths_results = _edge_paths
boundary_nodes = list(node_path - filtered_degree_4_set)
if len(boundary_nodes) == 2:
Expand All @@ -184,7 +188,11 @@ def find_and_append_degree_4_paths(
return _edge_paths_results

# Convert edges to an adjacency list using vectorized operations
edge_dict = get_adjacency_list(edges_gdf=snkit_network.edges, from_id_column="from_id", to_id_column="to_id")
edge_dict = get_adjacency_list(
edges_gdf=snkit_network.edges,
from_id_column="from_id",
to_id_column="to_id",
)
_edge_paths: list = []

# find the edge paths for the nodes in node_set
Expand Down Expand Up @@ -221,9 +229,9 @@ def find_and_append_degree_4_paths(
else:
# node_path has nodes with degree 2 => find the edges connected to the intermediates
edge_paths_gdf = snkit_network.edges[
snkit_network.edges.from_id.isin(intermediates) |
snkit_network.edges.to_id.isin(intermediates)
]
snkit_network.edges.from_id.isin(intermediates)
| snkit_network.edges.to_id.isin(intermediates)
]
_edge_paths.append(edge_paths_gdf)
return _edge_paths

Expand All @@ -238,10 +246,14 @@ def find_and_append_degree_4_paths(
# E.g. filters on 2 in 1->2, 1->5, 2->3, 3->4, 3->5
# For _node_set with all degree 4: Check if the predecessors and successors are the same nodes.
# E.g. filters on 2 in 1->2, 2->3, 2->1, 3->2.
degree_2_set = set(list(snkit_network.nodes[id_col].loc[snkit_network.nodes.degree == 2]))
degree_2_set = set(
list(snkit_network.nodes[id_col].loc[snkit_network.nodes.degree == 2])
)
filtered_degree_2_set = filter_node(degree_2_set, _degrees=2)

degree_4_set = set(list(snkit_network.nodes[id_col].loc[snkit_network.nodes.degree == 4]))
degree_4_set = set(
list(snkit_network.nodes[id_col].loc[snkit_network.nodes.degree == 4])
)
filtered_degree_4_set = filter_node(degree_4_set, _degrees=4)

nodes_of_interest = filtered_degree_2_set | filtered_degree_4_set
Expand All @@ -253,7 +265,9 @@ def find_and_append_degree_4_paths(
~snkit_network.edges["id"].isin(edge_ids_to_update)
]

updated_edges = get_merged_edges(paths_to_group=edge_paths, by=by, aggfunc=aggregate_func, net=snkit_network)
updated_edges = get_merged_edges(
paths_to_group=edge_paths, by=by, aggfunc=aggregate_func, net=snkit_network
)
edges_to_keep = edges_to_keep.drop(columns=["id"])
updated_edges = updated_edges.reset_index(drop=True)

Expand Down

0 comments on commit 8b4b679

Please sign in to comment.