-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathdijkstra_algorithm.py
190 lines (153 loc) · 6.42 KB
/
dijkstra_algorithm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# -*- coding: utf-8 -*-
"""
/***************************************************************************
LeastCostPath
A QGIS plugin
Find the least cost path with given cost raster and points
Generated by Plugin Builder: http://g-sherman.github.io/Qgis-Plugin-Builder/
-------------------
begin : 2018-12-12
copyright : (C) 2018 by FlowMap [email protected]
email : [email protected]
***************************************************************************/
/***************************************************************************
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
***************************************************************************/
"""
__author__ = 'FlowMap [email protected]'
__date__ = '2018-12-12'
__copyright__ = '(C) 2018 by FlowMap [email protected]'
# This will get replaced with a git SHA1 when you do a git archive
__revision__ = '$Format:%H$'
from math import sqrt
import queue
import collections
sqrt2 = sqrt(2)
def dijkstra(start_tuple, end_tuples, block, find_nearest, feedback=None):
class Grid:
def __init__(self, matrix):
self.map = matrix
self.h = len(matrix)
self.w = len(matrix[0])
self.manhattan_boundry = None
self.curr_boundry = None
def _in_bounds(self, id):
x, y = id
return 0 <= x < self.h and 0 <= y < self.w
def _passable(self, id):
x, y = id
return self.map[x][y] is not None
def is_valid(self, id):
return self._in_bounds(id) and self._passable(id)
def neighbors(self, id):
x, y = id
results = [(x + 1, y), (x, y - 1), (x - 1, y), (x, y + 1),
(x + 1, y - 1), (x + 1, y + 1), (x - 1, y - 1), (x - 1, y + 1)]
results = list(filter(self.is_valid, results))
return results
@staticmethod
def manhattan_distance(id1, id2):
x1, y1 = id1
x2, y2 = id2
return abs(x1 - x2) + abs(y1 - y2)
@staticmethod
def min_manhattan(curr_node, end_nodes):
return min(map(lambda node: Grid.manhattan_distance(curr_node, node), end_nodes))
@staticmethod
def max_manhattan(curr_node, end_nodes):
return max(map(lambda node: Grid.manhattan_distance(curr_node, node), end_nodes))
@staticmethod
def all_manhattan(curr_node, end_nodes):
return {end_node: Grid.manhattan_distance(curr_node, end_node) for end_node in end_nodes}
def simple_cost(self, cur, nex):
cx, cy = cur
nx, ny = nex
currV = self.map[cx][cy]
offsetV = self.map[nx][ny]
if cx == nx or cy == ny:
return (currV + offsetV) / 2
else:
return sqrt2 * (currV + offsetV) / 2
result = []
grid = Grid(block)
end_dict = collections.defaultdict(list)
for end_tuple in end_tuples:
end_dict[end_tuple[0]].append(end_tuple)
end_row_cols = set(end_dict.keys())
end_row_col_list = list(end_row_cols)
start_row_col = start_tuple[0]
frontier = queue.PriorityQueue()
frontier.put((0, start_row_col))
came_from = {}
cost_so_far = {}
decided = set()
if not grid.is_valid(start_row_col):
return result
# init progress
index = 0
distance_dic = grid.all_manhattan(start_row_col, end_row_cols)
if find_nearest:
total_manhattan = min(distance_dic.values())
else:
total_manhattan = sum(distance_dic.values())
total_manhattan = total_manhattan + 1
bound = total_manhattan
if feedback:
feedback.setProgress(1 + 100 * (1 - bound / total_manhattan))
came_from[start_row_col] = None
cost_so_far[start_row_col] = 0
while not frontier.empty():
_, current_node = frontier.get()
if current_node in decided:
continue
decided.add(current_node)
# update the progress bar
if feedback:
if feedback.isCanceled():
return None
index = (index + 1) % len(end_row_col_list)
target_node = end_row_col_list[index]
new_manhattan = grid.manhattan_distance(current_node, target_node)
if new_manhattan < distance_dic[target_node]:
if find_nearest:
curr_bound = new_manhattan
else:
curr_bound = bound - (distance_dic[target_node] - new_manhattan)
distance_dic[target_node] = new_manhattan
if curr_bound < bound:
bound = curr_bound
if feedback:
feedback.setProgress(1 + 100 * (1 - bound / total_manhattan)*(1 - bound / total_manhattan))
# reacn destination
if current_node in end_row_cols:
path = []
costs = []
traverse_node = current_node
while traverse_node is not None:
path.append(traverse_node)
costs.append(cost_so_far[traverse_node])
traverse_node = came_from[traverse_node]
# start point and end point overlaps
if len(path) == 1:
path.append(start_row_col)
costs.append(0.0)
path.reverse()
costs.reverse()
result.append((path, costs, end_dict[current_node]))
end_row_cols.remove(current_node)
end_row_col_list.remove(current_node)
if len(end_row_cols) == 0 or find_nearest:
break
# relax distance
for nex in grid.neighbors(current_node):
new_cost = cost_so_far[current_node] + grid.simple_cost(current_node, nex)
if nex not in cost_so_far or new_cost < cost_so_far[nex]:
cost_so_far[nex] = new_cost
frontier.put((new_cost, nex))
came_from[nex] = current_node
return result