-
Notifications
You must be signed in to change notification settings - Fork 11
/
speed_info.py
59 lines (49 loc) · 1.88 KB
/
speed_info.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import pandas as pd
import numpy as np
class SpeedInfo:
def __init__(self, file_path, unit_time=60):
'''
Speed information for each road.
You can use your own speed implementation!
:param file_path: Path to speed info data.
:param unit_time: Default is 60. We will use linear interpolation for intermediate values.
'''
data = pd.read_csv(file_path, encoding='euc-kr')
d = data.drop(columns=['일자', '링크아이디', '거리', '차선수']).groupby('도로명')
speed_mean = d.mean()
speed_std = d.std()
road_names = speed_mean.index
speed_mean = speed_mean.values
speed_std = speed_std.values
# set nan data
speed_mean[np.isnan(speed_mean)] = 24
speed_std[np.isnan(speed_std)] = 4
speed_std = speed_std / 1.5
self.speed_mean = speed_mean
self.speed_std = speed_std
self.road_names = road_names
self.road_names_dict = {}
for i, k in enumerate(road_names):
self.road_names_dict[k] = i
self.unit_time = unit_time
self.max_time = 24
def set_speed(self, city):
"""
Set speed information to city's current time.
:param city:
:return:
"""
k1 = city.city_time // self.unit_time
k2 = k1 + 1
r = (city.city_time / self.unit_time) - k1
k1 = k1 % self.max_time
k2 = k2 % self.max_time
# use linear interpolation.
lerped_mean = self.speed_mean[:,k1] * (1 - r) + self.speed_mean[:,k2] * r
lerped_std = self.speed_std[:,k1] * (1 - r) + self.speed_std[:,k2] * r
for road in city.roads:
road_index = road.speed_info_closest_road_index
f = np.random.normal(lerped_mean[road_index], lerped_std[road_index])
f = max(f, 5)
f = min(f, 50)
road.speed = f