-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathinference_testset.py
227 lines (190 loc) · 11 KB
/
inference_testset.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
import os
import shutil
import sys
sys.path.append("Detector/yolov9")
import yaml
import cv2
import torch
import numpy as np
from glob import glob
from tqdm import tqdm
from datetime import datetime
from Detector.yolov9.models.common import DetectMultiBackend, AutoShape
from Extractor.model.feature_extractor import ReIDModel, match_features
from Tracker.tacker import Tracklet, are_directions_opposite
from common.plot_boxes import get_random_color, plot_box_on_img
from common.txt_writer import MOT_TXT, write_txt_by_line
# Load the run configuration and split it into per-component sections.
# Fix: open with an explicit encoding instead of the platform-dependent default.
with open('inference_testset.yaml', 'r', encoding='utf-8') as file:
    config = yaml.safe_load(file)

config_default = config['Default']      # run-level options (folders, output flags)
config_detector = config['Detector']    # YOLO detector weights / thresholds
config_extractor = config['Extractor']  # ReID feature-extractor settings
config_tracker = config['Tracker']      # tracker matching / lifecycle parameters
########################################################################################################################
# Initialize the experiment folder layout and archive the code + config used for this run.
FRAME_FOLDER = config_default['FRAME_FOLDER']
RESULT_FOLDER = config_default['RESULT_FOLDER']
os.makedirs(RESULT_FOLDER, exist_ok=True)

# Each run gets its own timestamped sub-folder so repeated runs never overwrite each other.
EXP_FOLDER = os.path.join(RESULT_FOLDER, datetime.now().strftime('%Y%m%d%H%M%S'))
YAML_LOG_FOLDER = os.path.join(EXP_FOLDER, 'yaml_log_results')
os.makedirs(YAML_LOG_FOLDER, exist_ok=True)

# Snapshot this script and its YAML config for reproducibility.
# Fix: use shutil.copyfile instead of a manual read-all / write-all round-trip
# (same content result, no explicit decode/encode step needed).
current_file_path = os.path.abspath(__file__)
shutil.copyfile(src=current_file_path, dst=os.path.join(YAML_LOG_FOLDER, "codes_record.py"))
if os.path.exists("inference_testset.yaml"):
    shutil.copyfile(src="inference_testset.yaml", dst=os.path.join(YAML_LOG_FOLDER, "param_record.yaml"))
########################################################################################################################
# Initialize Detector, Extractor
# [Detector] Initialize YOLO model — prefer GPU when available.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
if config_detector['ENSEMBLE']:  # ensemble model
    from Detector.ensemble_detector import DetectMultiBackendEnsemble
    # NOTE(review): ENSEMBLE_MODEL_LIST feeds `weights_list` and ENSEMBLE_WEIGHT_LIST feeds
    # `ensemble_weight_list`; the local names below read as if swapped (a "weight list"
    # holding model entries) — confirm against DetectMultiBackendEnsemble's signature.
    weight_list = config_detector['ENSEMBLE_MODEL_LIST']
    weight_path_list = config_detector['ENSEMBLE_WEIGHT_LIST']
    model = DetectMultiBackendEnsemble(weights_list=weight_list, ensemble_weight_list=weight_path_list, device=device)
else:  # single yolov9 model
    model = DetectMultiBackend(weights=config_detector['DETECTOR_WEIGHT'], device=device, fuse=True)
    # NOTE(review): indentation reconstructed — AutoShape is assumed to wrap only the
    # single-model path (the ensemble path is consumed without `.pred` below); confirm.
    model = AutoShape(model)
# [Feature Extractor] ReID model used to embed detection crops for appearance matching.
extractor = ReIDModel(trained_weight=config_extractor['EXTRACTOR_WEIGHT'], model_type=config_extractor['EXTRACTOR_TYPE'])
########################################################################################################################
# Main loop: for each date/time folder, run detection -> ReID feature extraction ->
# appearance + direction-aware matching, draw/record the resulting tracks.
#
# NOTE(review): this file was recovered without indentation; the nesting below was
# reconstructed from the control-flow keywords — confirm against the original revision.
#
# Fixes applied:
#   * removed the dead hard-coded Windows path constant `PATH`
#   * dropped the unused outer loop index `i` (it was shadowed by two inner loops)
#   * camera id parse no longer raises on filenames containing extra underscores
#   * the per-folder cv2.VideoWriter is now released at the end of each folder
#     (previously only the last writer was released, leaking handles and leaving
#     earlier videos unfinalized)
track_id = 0  # global track-id counter, monotonically increasing across all folders
folder_list = os.listdir(FRAME_FOLDER)
for DATE_TIME in tqdm(folder_list):
    # Per-folder scratch dir holding detection crops fed to the ReID extractor.
    TEMP_CROP_FOLDER = os.path.join(EXP_FOLDER, 'crop_results')
    os.makedirs(TEMP_CROP_FOLDER, exist_ok=True)
    frames = glob(os.path.join(FRAME_FOLDER, DATE_TIME, "*.jpg")) + glob(os.path.join(FRAME_FOLDER, DATE_TIME, "*.png"))
    if config_default['WRITE_MOT_TXT']:
        MOT_TXT_FOLDER = os.path.join(EXP_FOLDER, 'mot_txt_results')
        os.makedirs(MOT_TXT_FOLDER, exist_ok=True)
        txt_file = os.path.join(MOT_TXT_FOLDER, f"{DATE_TIME}.txt")
    if config_default['SAVE_OUT_VIDEO']:
        VIDEO_FOLDER = os.path.join(EXP_FOLDER, 'video_results')
        os.makedirs(VIDEO_FOLDER, exist_ok=True)
        FPS = config_default['SAVE_OUT_VIDEO_FPS']
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        # NOTE(review): frameSize is hard-coded; cv2.VideoWriter.write silently drops
        # frames of any other size — confirm input resolution or derive it from the
        # first decoded frame.
        writer = cv2.VideoWriter(f"{VIDEO_FOLDER}/{DATE_TIME}.mp4", fourcc, fps=FPS, frameSize=(1280, 720))

    # Start tracking: frame ids are 1-based within each folder (MOT convention).
    frame_id = 1
    previous_camera_id = None
    for frame_index, frame_path in enumerate(frames):
        frame_current = cv2.imread(frame_path)
        results = model(frame_current)
        frame_name = os.path.basename(frame_path)
        # Frame files are assumed to be named "<camera_id>_<rest>"; take the first token.
        # Fix: the previous strict 2-way unpack raised ValueError on extra underscores.
        camera_id = frame_name.split("_")[0]
        if not config_detector['ENSEMBLE']:
            results = results.pred[0]  # AutoShape output: detections for the single image

        detection_label = []  # per kept detection: [x1, y1, x2, y2, conf, class_id]
        detection_image = []  # crop file paths, same order as detection_label
        for det_idx, det in enumerate(results):
            label, confidence, bbox = det[5], det[4], det[:4]
            if confidence < config_detector['DETECTOR_CONFIDENCE']:
                continue
            x1, y1, x2, y2 = map(int, bbox)
            class_id = int(label)
            if not config_detector['ENSEMBLE']:
                # single-model path yields tensors; detach confidence to a python float
                detection_label.append([x1, y1, x2, y2, confidence.cpu().item(), class_id])
            else:
                detection_label.append([x1, y1, x2, y2, confidence, class_id])
            # Save the crop to disk so the ReID extractor can batch-load it.
            cropped_image = frame_current[y1:y2, x1:x2, :]
            save_path = os.path.join(TEMP_CROP_FOLDER, f"{os.path.basename(frame_path)[:-4]}_det_{det_idx:04}.png")
            cv2.imwrite(save_path, cropped_image)
            detection_image.append(save_path)

        # assumes the extractor emits 512-d embeddings (matches the empty fallback) — TODO confirm
        current_features = extractor.get_features(detection_image) if len(detection_label) != 0 else np.empty((0, 512))
        current_detects = np.array(detection_label) if len(detection_label) != 0 else np.empty((0, 6))

        if frame_index == 0 or (previous_camera_id is not None and camera_id != previous_camera_id):
            # First frame of the folder, or the camera changed: drop all tracks and re-seed.
            tracks = []
            track_id += 1
            for detect, feature in zip(current_detects, current_features):
                current_box = detect[:4]
                init_track = Tracklet(track_id, detect[-1], current_box, feature, get_random_color())
                tracks.append(init_track)
                frame_current = plot_box_on_img(frame_current, current_box, init_track.track_id, init_track.color)
                track_id += 1
                cv2.imshow("Test tracking", frame_current)
                if config_default['WRITE_MOT_TXT']:
                    x1, y1, x2, y2 = current_box[0], current_box[1], current_box[2], current_box[3]
                    mot_txt_line = MOT_TXT(frame_id, init_track.track_id, x1, y1, x2, y2, detect[4])
                    write_txt_by_line(txt_file, mot_txt_line)
        else:  # Subsequent frames logic
            previous_features = np.array([track.feature for track in tracks])
            # Direction gate: disallow appearance matches between a track and a detection
            # whose motion directions are opposite.
            direction_matrix = np.zeros((len(previous_features), len(current_features)), dtype=bool)
            if len(previous_features) != 0 and len(current_features) != 0:
                for prev_idx, prev_track in enumerate(tracks):
                    for curr_idx, curr_bbox in enumerate(current_detects):
                        if prev_track.direction_vector is None:
                            direction_matrix[prev_idx, curr_idx] = True  # Allow matching if direction is not determined yet
                        else:
                            temp_track = Tracklet(None, None, curr_bbox[:4], None, None)
                            temp_track.update_direction_vector(curr_bbox[:4])
                            direction_matrix[prev_idx, curr_idx] = \
                                not are_directions_opposite(prev_track.direction_vector, temp_track.direction_vector)
                # Fix: the returned similarity matrix was never used — discard it explicitly.
                matched_indices, _ = (
                    match_features(previous_features, current_features, direction_matrix, config_extractor['EXTRACTOR_THRESHOLD'])
                )
                used_indices = set()
                for prev_idx, curr_idx in matched_indices:
                    bbox = current_detects[curr_idx][:4]
                    tracks[prev_idx].update(bbox=bbox, feature=current_features[curr_idx], mode=config_tracker["TRACKER_MOTION_PREDICT"])
                    used_indices.add(curr_idx)
                    frame_current = plot_box_on_img(frame_current, bbox, tracks[prev_idx].track_id, tracks[prev_idx].color)
                    if config_default['WRITE_MOT_TXT']:
                        x1, y1, x2, y2 = bbox[0], bbox[1], bbox[2], bbox[3]
                        mot_txt_line = MOT_TXT(frame_id, tracks[prev_idx].track_id, x1, y1, x2, y2, current_detects[curr_idx][4])
                        write_txt_by_line(txt_file, mot_txt_line)
                # Unmatched detections start brand-new tracks.
                for i, feature in enumerate(current_features):
                    if i not in used_indices:
                        current_box = current_detects[i][:4]
                        new_track = Tracklet(track_id, current_detects[i][-1], current_box, feature, get_random_color())
                        tracks.append(new_track)
                        frame_current = plot_box_on_img(frame_current, current_box, new_track.track_id, new_track.color)
                        track_id += 1
                        if config_default['WRITE_MOT_TXT']:
                            x1, y1, x2, y2 = current_box[0], current_box[1], current_box[2], current_box[3]
                            mot_txt_line = MOT_TXT(frame_id, new_track.track_id, x1, y1, x2, y2, current_detects[i][4])
                            write_txt_by_line(txt_file, mot_txt_line)
                # Age out tracks that stayed unmatched for too many frames.
                matched_prev_indices = {prev_idx for prev_idx, curr_idx in matched_indices}
                to_remove = []
                for i, track in enumerate(tracks):
                    if track.active:
                        if i not in matched_prev_indices:
                            track.increment_unmatched(mode=config_tracker['TRACKER_MOTION_PREDICT'])
                            if track.unmatched_count > config_tracker['TRACKER_MAX_UNMATCH_FRAME']:
                                track.active = False
                                to_remove.append(i)
                    else:
                        to_remove.append(i)
                for index in sorted(to_remove, reverse=True):
                    del tracks[index]
            elif len(current_features) != 0 and len(previous_features) == 0:
                # No live tracks but fresh detections: seed new tracks from scratch.
                for detect, feature in zip(current_detects, current_features):
                    current_box = detect[:4]
                    init_track = Tracklet(track_id, detect[-1], current_box, feature, get_random_color())
                    tracks.append(init_track)
                    frame_current = plot_box_on_img(frame_current, current_box, init_track.track_id, init_track.color)
                    track_id += 1
                    if config_default['WRITE_MOT_TXT']:
                        x1, y1, x2, y2 = current_box[0], current_box[1], current_box[2], current_box[3]
                        mot_txt_line = MOT_TXT(frame_id, init_track.track_id, x1, y1, x2, y2, detect[4])
                        write_txt_by_line(txt_file, mot_txt_line)
            elif len(current_features) == 0 and len(previous_features) != 0:
                # Everything disappeared: reset track list and burn an id gap.
                tracks = []
                track_id += 1

        previous_camera_id = camera_id
        cv2.imshow("Test tracking", frame_current)
        if config_default['SAVE_OUT_VIDEO']:
            writer.write(frame_current)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            # NOTE(review): 'q' only breaks out of this folder's frame loop; the outer
            # folder loop continues — confirm whether a full quit was intended.
            break
        frame_id += 1

    # Per-folder cleanup: drop the crop scratch dir and finalize this folder's video.
    if os.path.exists(TEMP_CROP_FOLDER):
        shutil.rmtree(TEMP_CROP_FOLDER)
    if config_default['SAVE_OUT_VIDEO']:
        # Fix: release inside the loop so every folder's mp4 is properly finalized
        # (previously only the last writer was ever released).
        writer.release()
cv2.destroyAllWindows()