11from collections import defaultdict
2- from typing import List
2+ from typing import Any , List , Optional
33
44import networkx as nx
55import overpy
1616 get_additional_signals ,
1717 get_opposite_edge_pairs ,
1818 get_signal_classification_number ,
19+ get_signal_direction ,
1920 get_signal_function ,
2021 get_signal_kind ,
2122 get_signal_name ,
2223 get_signal_states ,
23- getSignalDirection ,
2424 is_end_node ,
2525 is_same_edge ,
2626 is_signal ,
@@ -35,15 +35,18 @@ def __init__(self):
3535 self .top_nodes : list [OverpyNode ] = []
3636 self .node_data : dict [str , OverpyNode ] = {}
3737 self .ways : dict [str , List [overpy .Way ]] = defaultdict (list )
38- self .paths : dict [str , List [List ]] = defaultdict (list )
38+ self .paths : dict [tuple [ Optional [ Any ], Optional [ Any ]] , List [List ]] = defaultdict (list )
3939 self .api = overpy .Overpass (url = "https://osm.hpi.de/overpass/api/interpreter" )
4040 self .topology = Topology ()
4141
4242 def _get_track_objects (self , polygon : str , railway_option_types : list [str ]):
4343 query_parts = ""
44- for type in railway_option_types :
45- query_parts = query_parts + f'way["railway"="{ type } "](poly: "{ polygon } ");node(w)(poly: "{ polygon } ");'
46- query = f'({ query_parts } );out body;'
44+ for _type in railway_option_types :
45+ query_parts = (
46+ query_parts
47+ + f'way["railway"="{ _type } "](poly: "{ polygon } ");node(w)(poly: "{ polygon } ");'
48+ )
49+ query = f"({ query_parts } );out body;"
4750 print (query )
4851 return self ._query_api (query )
4952
@@ -52,21 +55,21 @@ def _query_api(self, query):
5255 return result
5356
5457 def _build_graph (self , track_objects ):
55- G = nx .Graph ()
58+ graph = nx .Graph ()
5659 for way in track_objects .ways :
5760 previous_node = None
5861 for idx , node_id in enumerate (way ._node_ids ):
5962 try :
6063 node = track_objects .get_node (node_id )
6164 self .node_data [node_id ] = node
6265 self .ways [str (node_id )].append (way )
63- G .add_node (node .id )
66+ graph .add_node (node .id )
6467 if previous_node :
65- G .add_edge (previous_node .id , node .id )
68+ graph .add_edge (previous_node .id , node .id )
6669 previous_node = node
6770 except overpy .exception .DataIncomplete :
6871 continue
69- return G
72+ return graph
7073
7174 def _get_next_top_node (self , node , edge : "tuple[str, str]" , path ):
7275 node_to_id = edge [1 ]
@@ -115,7 +118,7 @@ def _add_signals(self, path, edge: model.Edge, node_before, node_after):
115118 signal_geo_point
116119 ),
117120 side_distance = dist_edge (node_before , node_after , node ),
118- direction = getSignalDirection (
121+ direction = get_signal_direction (
119122 edge , self .ways , path , node .tags ["railway:signal:direction" ]
120123 ),
121124 function = get_signal_function (node ),
@@ -134,8 +137,8 @@ def _get_edge_speed(self, edge: Edge):
134137 common_ways = ways_a .intersection (ways_b )
135138 if len (common_ways ) != 1 :
136139 return None
137- maxspeed = common_ways .pop ().tags .get ("maxspeed" , None )
138- return int (maxspeed ) if maxspeed else None
140+ max_speed = common_ways .pop ().tags .get ("maxspeed" , None )
141+ return int (max_speed ) if max_speed else None
139142
140143 def _should_add_edge (self , node_a : model .Node , node_b : model .Node , path : list [int ]):
141144 edge_not_present = not self .topology .get_edge_by_nodes (node_a , node_b )
@@ -151,7 +154,8 @@ def run(self, polygon, railway_option_types: list[str] = None):
151154 track_objects = self ._get_track_objects (polygon , railway_option_types )
152155 self .graph = self ._build_graph (track_objects )
153156
154- # ToDo: Check whether all edges really link to each other in ORM or if there might be edges missing for nodes that are just a few cm from each other
157+ # ToDo: Check whether all edges really link to each other in ORM or if there might be
158+ # edges missing for nodes that are just a few cm from each other
155159 # Only nodes with max 1 edge or that are a switch can be top nodes
156160 for node_id in self .graph .nodes :
157161 node = self .node_data [node_id ]
@@ -205,7 +209,8 @@ def run(self, polygon, railway_option_types: list[str] = None):
205209 e for e in self .topology .edges .values () if e .node_a == node or e .node_b == node
206210 ]
207211
208- # merge edges, this means removing the switch and allowing only one path for each origin
212+ # merge edges, this means removing the switch and
213+ # allowing only one path for each origin
209214 edge_pair_1 , edge_pair_2 = get_opposite_edge_pairs (connected_edges , node )
210215 new_edge_1 = merge_edges (* edge_pair_1 , node )
211216 new_edge_2 = merge_edges (* edge_pair_2 , node )
@@ -230,8 +235,10 @@ def run(self, polygon, railway_option_types: list[str] = None):
230235 except DataIncomplete :
231236 nodes = way .get_nodes (resolve_missing = True )
232237 for candidate in nodes :
233- # we are only interested in nodes outside the bounding box as every node
234- # that has been previously known was already visited as part of the graph
238+ # we are only interested in nodes outside the
239+ # bounding box as every node that has been
240+ # previously known was already visited as
241+ # part of the graph
235242 if (
236243 candidate .id != int (node .name )
237244 and candidate .id not in self .node_data .keys ()
@@ -250,9 +257,9 @@ def run(self, polygon, railway_option_types: list[str] = None):
250257 break
251258 if not substitute_found :
252259 # if no substitute was found, the third node seems to be inside the bounding box
253- # this can happen when a node is connected to the same node twice (e.g. station on
254- # lines with only one track). WARNING: this produced weird results in the past.
255- # It should be okay to do it after the check above.
260+ # this can happen when a node is connected to the same node twice (e.g.
261+ # station on lines with only one track). WARNING: this produced weird
262+ # results in the past. It should be okay to do it after the check above.
256263 connected_edges = [
257264 e
258265 for e in self .topology .edges .values ()
0 commit comments