diff --git a/Control/__pycache__/controller.cpython-38.pyc b/Control/__pycache__/controller.cpython-38.pyc
new file mode 100644
index 0000000..c7cfb62
Binary files /dev/null and b/Control/__pycache__/controller.cpython-38.pyc differ
diff --git a/Control/__pycache__/drive.cpython-38.pyc b/Control/__pycache__/drive.cpython-38.pyc
new file mode 100644
index 0000000..bbfad73
Binary files /dev/null and b/Control/__pycache__/drive.cpython-38.pyc differ
diff --git a/Control/controllers/MPC_controller.py b/Control/controllers/MPC_controller.py
new file mode 100644
index 0000000..b6bcf68
--- /dev/null
+++ b/Control/controllers/MPC_controller.py
@@ -0,0 +1,259 @@
+import matplotlib.pyplot as plt
+import numpy as np
+from math import pi, sin, cos, tan, atan
+from cvxopt import matrix, solvers
+
+
+class MPC:
+    def __init__(self):
+        self.Np = 60  # prediction horizon
+        self.Nc = 60  # control horizon
+
+        self.dt = 0.1  # time step
+        self.Length = 1.0  # wheelbase
+
+        max_steer = 30 * pi / 180  # maximum steering angle
+        max_steer_v = 15 * pi / 180  # maximum steering rate
+        max_v = 8.7  # maximum speed
+        max_a = 1.0  # maximum acceleration
+
+        # cost-function matrices
+        self.Q = 50 * np.identity(3 * self.Np)  # pose-error weight
+        self.R = 100 * np.identity(2 * self.Nc)  # control weight
+
+        self.kesi = np.zeros((5, 1))  # error state [dx, dy, dyaw, dv, dsteer]
+
+        self.A = np.identity(5)
+
+        self.B = np.block([
+            [np.zeros((3, 2))],
+            [np.identity(2)]
+        ])
+
+        self.C = np.block([
+            [np.identity(3), np.zeros((3, 2))]
+        ])
+
+        self.PHI = np.zeros((3 * self.Np, 5))
+        self.THETA = np.zeros((3 * self.Np, 2 * self.Nc))
+
+        self.CA = (self.Np + 1) * [self.C]
+
+        self.H = np.zeros((2 * self.Nc, 2 * self.Nc))
+
+        self.f = np.zeros((2 * self.Nc, 1))
+
+        # inequality-constraint matrices
+        A_t = np.zeros((self.Nc, self.Nc))
+        for p in range(self.Nc):
+            for q in range(p + 1):
+                A_t[p][q] = 1
+
+        A_I = np.kron(A_t, np.identity(2))
+
+        # bounds on the control inputs
+        umin = np.array([[-max_v], [-max_steer]])
+        umax = np.array([[max_v], [max_steer]])
+        self.Umin = np.kron(np.ones((self.Nc, 1)), umin)
+        self.Umax = np.kron(np.ones((self.Nc, 1)), umax)
+
+        # bounds on the control increments
+        delta_umin = np.array([[-max_a * self.dt], [-max_steer_v * self.dt]])
+        delta_umax = np.array([[max_a * self.dt], [max_steer_v * self.dt]])
+        delta_Umin = np.kron(np.ones((self.Nc, 1)), delta_umin)
+        delta_Umax = np.kron(np.ones((self.Nc, 1)), delta_umax)
+
+        self.A_cons = np.zeros((2 * 2 * self.Nc, 2 * self.Nc))
+        self.A_cons[0:2 * self.Nc, 0:2 * self.Nc] = A_I
+        self.A_cons[2 * self.Nc:4 * self.Nc, 0:2 * self.Nc] = np.identity(2 * self.Nc)
+
+        self.lb_cons = np.zeros((2 * 2 * self.Nc, 1))
+        self.lb_cons[2 * self.Nc:4 * self.Nc, 0:1] = delta_Umin
+
+        self.ub_cons = np.zeros((2 * 2 * self.Nc, 1))
+        self.ub_cons[2 * self.Nc:4 * self.Nc, 0:1] = delta_Umax
+
+    def mpcControl(self, x, y, yaw, v, angle, tar_x, tar_y, tar_yaw, tar_v, tar_angle):  # MPC optimization step
+        T = self.dt
+        L = self.Length
+
+        # update the error state
+        self.kesi[0][0] = x - tar_x
+        self.kesi[1][0] = y - tar_y
+        self.kesi[2][0] = self.normalizeTheta(yaw - tar_yaw)
+        self.kesi[3][0] = v - tar_v
+        self.kesi[4][0] = angle - tar_angle
+
+        # update the A matrix (kinematics linearized at the reference)
+        self.A[0][2] = -tar_v * sin(tar_yaw) * T
+        self.A[0][3] = cos(tar_yaw) * T
+        self.A[1][2] = tar_v * cos(tar_yaw) * T
+        self.A[1][3] = sin(tar_yaw) * T
+        self.A[2][3] = tan(tar_angle) * T / L
+        self.A[2][4] = tar_v * T / (L * (cos(tar_angle) ** 2))
+
+        # update the B matrix
+        self.B[0][0] = cos(tar_yaw) * T
+        self.B[1][0] = sin(tar_yaw) * T
+        self.B[2][0] = tan(tar_angle) * T / L
+        self.B[2][1] = tar_v * T / (L * (cos(tar_angle) ** 2))
+
+        # update CA, where CA[i] = C * A^i
+        for i in range(1, self.Np + 1):
+            self.CA[i] = np.dot(self.CA[i - 1], self.A)
+
+        # update PHI and THETA
+        for j in range(self.Np):
+            self.PHI[3 * j:3 * (j + 1), 0:5] = self.CA[j + 1]
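+            # Block structure of the prediction matrices: row block j of PHI
+            # propagates the current error through C * A^(j+1), while THETA
+            # block (j, k) = C * A^(j-k) * B maps the k-th control increment
+            # into the j-th predicted output error.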
+            for k in range(min(self.Nc, j + 1)):
+                self.THETA[3 * j:3 * (j + 1), 2 * k: 2 * (k + 1)] = np.dot(self.CA[j - k], self.B)
+
+        # update H
+        self.H = np.dot(np.dot(self.THETA.transpose(), self.Q), self.THETA) + self.R
+
+        # update f
+        self.f = 2 * np.dot(np.dot(self.THETA.transpose(), self.Q), np.dot(self.PHI, self.kesi))
+
+        # update the constraints
+        Ut = np.kron(np.ones((self.Nc, 1)), np.array([[v], [angle]]))
+        self.lb_cons[0:2 * self.Nc, 0:1] = self.Umin - Ut
+        self.ub_cons[0:2 * self.Nc, 0:1] = self.Umax - Ut
+
+        # solve the QP; cvxopt minimizes (1/2) x'Px + q'x, so P must be 2H to
+        # stay consistent with f = 2 * THETA' Q PHI kesi above
+        P = matrix(2 * self.H)
+        q = matrix(self.f)
+        G = matrix(np.block([
+            [self.A_cons],
+            [-self.A_cons]
+        ]))
+        h = matrix(np.block([
+            [self.ub_cons],
+            [-self.lb_cons]
+        ]))
+
+        solvers.options['show_progress'] = False
+        sol = solvers.qp(P, q, G, h)
+        X = sol['x']
+
+        # apply the first control increment
+        v += X[0]
+        angle += X[1]
+
+        return v, angle
+
+    def normalizeTheta(self, angle):  # wrap an angle into [-pi, pi)
+        while angle >= pi:
+            angle -= 2 * pi
+
+        while angle < -pi:
+            angle += 2 * pi
+
+        return angle
+
+    def findIdx(self, x, y, cx, cy):  # index of the closest path point (squared Euclidean distance)
+        min_dis = float('inf')
+        idx = 0
+
+        for i in range(len(cx)):
+            dx = x - cx[i]
+            dy = y - cy[i]
+            dis = dx ** 2 + dy ** 2
+            if dis < min_dis:
+                min_dis = dis
+                idx = i
+
+        return idx
+
+    def update(self, x, y, yaw, v, angle):  # simulate the vehicle kinematics
+        x += v * cos(yaw) * self.dt
+        y += v * sin(yaw) * self.dt
+        yaw += v / self.Length * tan(angle) * self.dt
+
+        return x, y, yaw
+
+import time
+if __name__ == '__main__':
+    cx = np.linspace(0, 200, 20)
+    cy = np.zeros(len(cx))
+    dx = np.zeros(len(cx))
+    ddx = np.zeros(len(cy))
+    cyaw = np.zeros(len(cx))
+    ck = np.zeros(len(cx))
+
+    for i in range(len(cx)):
+        cy[i] = cos(cx[i] / 10) * cx[i] / 10
+
+    # first derivative
+    for i in range(len(cx) - 1):
+        dx[i] = (cy[i + 1] - cy[i]) / (cx[i + 1] - cx[i])
+    dx[len(cx) - 1] = dx[len(cx) - 2]
+
+    # second derivative
+    for i in range(len(cx) - 2):
+        ddx[i] = (cy[i + 2] - 2 * cy[i + 1] + cy[i]) / (0.5 * (cx[i + 2] - cx[i])) ** 2
+    ddx[len(cx) - 2] = ddx[len(cx) - 3]
+    ddx[len(cx) - 1] = ddx[len(cx) - 2]
+
+    # yaw along the path
+    for i in range(len(cx)):
+        cyaw[i] = atan(dx[i])
+
+    # curvature along the path
+    for i in range(len(cx)):
+        ck[i] = ddx[i] / (1 + dx[i] ** 2) ** 1.5
+
+    # initial state
+    x = 0.0
+    y = 5.0
+    yaw = 0.0
+    v = 0.0
+    angle = 0.0
+    t = 0
+
+    # state history
+    xs = [x]
+    ys = [y]
+    vs = [v]
+    angles = [angle]
+    ts = [t]
+
+    # instantiate the controller
+    mpc = MPC()
+
+    while True:
+        time_start = time.time()
+        idx = mpc.findIdx(x, y, cx, cy)
+        if idx == len(cx) - 1:
+            break
+
+        tar_v = 30.0 / 3.6
+        tar_angle = atan(mpc.Length * ck[idx])
+
+        (v, angle) = mpc.mpcControl(x, y, yaw, v, angle,
+                                    cx[idx], cy[idx], cyaw[idx], tar_v, tar_angle)
+
+        (x, y, yaw) = mpc.update(x, y, yaw, v, angle)
+        print(time.time() - time_start)
+        # record the state
+        xs.append(x)
+        ys.append(y)
+        vs.append(v)
+        angles.append(angle)
+        t = t + 0.1
+        ts.append(t)
+
+        # visualize
+        plt.plot(cx, cy)
+        plt.scatter(xs, ys, c='r', marker='*')
+        plt.pause(0.01)  # pause for 0.01 s
+        plt.clf()
+
+    plt.close()
+    plt.subplot(2, 1, 1)
+    plt.plot(ts, vs)
+    plt.subplot(2, 1, 2)
+    plt.plot(ts, angles)
+    plt.show()
\ No newline at end of file
diff --git a/Control/controllers/PID_controller.py b/Control/controllers/PID_controller.py
new file mode 100644
index 0000000..ca46c24
--- /dev/null
+++ b/Control/controllers/PID_controller.py
@@ -0,0 +1,28 @@
+import numpy as np
+import copy
+
+
+class PID:
+    def __init__(self, kp, ki, kd):
+        self.kp = kp
+        self.ki = ki
+        self.kd = kd
+        self.ep = 0.0
+        self.ei = 0.0
+        self.ed = 0.0
+        self.dt = 0.1
+
+    def update_e(self, e):
+        self.ed = e - self.ep
+        self.ei += e
+        self.ep = copy.deepcopy(e)
+
+    def get_u(self):
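+        # Positional PID on the stored terms: ep is the latest error, ei the
+        # running sum, ed the last difference; the steering command is
+        # saturated at +/- pi/6 rad (30 deg) to match the vehicle limits.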
+ u = self.kp * self.ep + self.ki * self.ei + self.kd * self.ed + if u > np.pi / 6: u = np.pi / 6 + if u < -np.pi / 6: u = -np.pi / 6 + return u + + def get_a(self): + a = self.kp * self.ep + self.ki * self.ei + self.kd * self.ed + return -a \ No newline at end of file diff --git a/Control/controllers/__pycache__/PID_controller.cpython-38.pyc b/Control/controllers/__pycache__/PID_controller.cpython-38.pyc new file mode 100644 index 0000000..ee4ddc9 Binary files /dev/null and b/Control/controllers/__pycache__/PID_controller.cpython-38.pyc differ diff --git a/Control/controllers/__pycache__/cubic_spline.cpython-38.pyc b/Control/controllers/__pycache__/cubic_spline.cpython-38.pyc new file mode 100644 index 0000000..3b1cd37 Binary files /dev/null and b/Control/controllers/__pycache__/cubic_spline.cpython-38.pyc differ diff --git a/Control/controllers/__pycache__/fuzzy_controller.cpython-38.pyc b/Control/controllers/__pycache__/fuzzy_controller.cpython-38.pyc new file mode 100644 index 0000000..2b712ad Binary files /dev/null and b/Control/controllers/__pycache__/fuzzy_controller.cpython-38.pyc differ diff --git a/Control/controllers/__pycache__/purepursuit_controller.cpython-38.pyc b/Control/controllers/__pycache__/purepursuit_controller.cpython-38.pyc new file mode 100644 index 0000000..9fa2511 Binary files /dev/null and b/Control/controllers/__pycache__/purepursuit_controller.cpython-38.pyc differ diff --git a/Control/controllers/cubic_spline.py b/Control/controllers/cubic_spline.py new file mode 100644 index 0000000..c6fedb3 --- /dev/null +++ b/Control/controllers/cubic_spline.py @@ -0,0 +1,182 @@ +import math +import numpy as np +import bisect + + +class Spline: + u""" + Cubic Spline class + """ + + def __init__(self, x, y): + self.b, self.c, self.d, self.w = [], [], [], [] + + self.x = x + self.y = y + + self.nx = len(x) # dimension of x + h = np.diff(x) + + # calc coefficient c + self.a = [iy for iy in y] + + # calc coefficient c + A = self.__calc_A(h) + B = self.__calc_B(h) + self.c = np.linalg.solve(A, B) + # print(self.c1) + + # calc spline coefficient b and d + for i in range(self.nx - 1): + self.d.append((self.c[i + 1] - self.c[i]) / (3.0 * h[i])) + tb = (self.a[i + 1] - self.a[i]) / h[i] - h[i] * \ + (self.c[i + 1] + 2.0 * self.c[i]) / 3.0 + self.b.append(tb) + + def calc(self, t): + u""" + Calc position + if t is outside of the input x, return None + """ + + if t < self.x[0]: + return None + elif t > self.x[-1]: + return None + + i = self.__search_index(t) + dx = t - self.x[i] + result = self.a[i] + self.b[i] * dx + \ + self.c[i] * dx ** 2.0 + self.d[i] * dx ** 3.0 + + return result + + def calcd(self, t): + u""" + Calc first derivative + if t is outside of the input x, return None + """ + + if t < self.x[0]: + return None + elif t > self.x[-1]: + return None + + i = self.__search_index(t) + dx = t - self.x[i] + result = self.b[i] + 2.0 * self.c[i] * dx + 3.0 * self.d[i] * dx ** 2.0 + return result + + def calcdd(self, t): + u""" + Calc second derivative + """ + + if t < self.x[0]: + return None + elif t > self.x[-1]: + return None + + i = self.__search_index(t) + dx = t - self.x[i] + result = 2.0 * self.c[i] + 6.0 * self.d[i] * dx + return result + + def __search_index(self, x): + u""" + search data segment index + """ + return bisect.bisect(self.x, x) - 1 + + def __calc_A(self, h): + u""" + calc matrix A for spline coefficient c + """ + A = np.zeros((self.nx, self.nx)) + A[0, 0] = 1.0 + for i in range(self.nx - 1): + if i != (self.nx - 2): + A[i + 1, i + 1] = 2.0 * (h[i] + h[i + 
1])
+            A[i + 1, i] = h[i]
+            A[i, i + 1] = h[i]
+
+        A[0, 1] = 0.0
+        A[self.nx - 1, self.nx - 2] = 0.0
+        A[self.nx - 1, self.nx - 1] = 1.0
+        # print(A)
+        return A
+
+    def __calc_B(self, h):
+        """
+        calc matrix B for spline coefficient c
+        """
+        B = np.zeros(self.nx)
+        for i in range(self.nx - 2):
+            B[i + 1] = 3.0 * (self.a[i + 2] - self.a[i + 1]) / \
+                h[i + 1] - 3.0 * (self.a[i + 1] - self.a[i]) / h[i]
+        # print(B)
+        return B
+
+
+class Spline2D:
+    """
+    2D Cubic Spline class
+    """
+
+    def __init__(self, x, y):
+        self.s = self.__calc_s(x, y)
+        self.sx = Spline(self.s, x)
+        self.sy = Spline(self.s, y)
+
+    def __calc_s(self, x, y):
+        dx = np.diff(x)
+        dy = np.diff(y)
+        self.ds = [math.sqrt(idx ** 2 + idy ** 2)
+                   for (idx, idy) in zip(dx, dy)]
+        s = [0]
+        s.extend(np.cumsum(self.ds))
+        return s
+
+    def calc_position(self, s):
+        """
+        calc position
+        """
+        x = self.sx.calc(s)
+        y = self.sy.calc(s)
+
+        return x, y
+
+    def calc_curvature(self, s):
+        """
+        calc curvature
+        """
+        dx = self.sx.calcd(s)
+        ddx = self.sx.calcdd(s)
+        dy = self.sy.calcd(s)
+        ddy = self.sy.calcdd(s)
+        k = (ddy * dx - ddx * dy) / (dx ** 2 + dy ** 2) ** (3 / 2)
+        return k
+
+    def calc_yaw(self, s):
+        """
+        calc yaw
+        """
+        dx = self.sx.calcd(s)
+        dy = self.sy.calcd(s)
+        yaw = math.atan2(dy, dx)
+        return yaw
+
+
+def calc_spline_course(x, y, ds=0.1):
+    sp = Spline2D(x, y)
+    s = list(np.arange(0, sp.s[-1], ds))
+
+    rx, ry, ryaw, rk = [], [], [], []
+    for i_s in s:
+        ix, iy = sp.calc_position(i_s)
+        rx.append(ix)
+        ry.append(iy)
+        ryaw.append(sp.calc_yaw(i_s))
+        rk.append(sp.calc_curvature(i_s))
+
+    return rx, ry, ryaw, rk, s
diff --git a/Control/controllers/fuzzy_controller.py b/Control/controllers/fuzzy_controller.py
new file mode 100644
index 0000000..75369df
--- /dev/null
+++ b/Control/controllers/fuzzy_controller.py
@@ -0,0 +1,110 @@
+import numpy as np
+import skfuzzy as fuzz
+import skfuzzy.control as ctrl
+
+
+def fuzzy_initialization():
+    # universes of discourse
+    dspeed_range = np.arange(-30, 30, 1, np.float32)  # speed difference
+    distance_range = np.arange(0, 200, 2, np.float32)  # distance difference
+    acc_range = np.arange(0.2, 0.8, 0.01, np.float32)  # output acceleration command
+    # fuzzy control variables
+    dspeed = ctrl.Antecedent(dspeed_range, 'dspeed')
+    distance = ctrl.Antecedent(distance_range, 'distance')
+    acc = ctrl.Consequent(acc_range, 'acc')
+    # fuzzy sets and their membership functions
+    dspeed['NL'] = fuzz.trimf(dspeed_range, [-30, -20, -10])
+    dspeed['NB'] = fuzz.trimf(dspeed_range, [-20, -15, -5])
+    dspeed['NS'] = fuzz.trimf(dspeed_range, [-10, -5, 0])
+    dspeed['0'] = fuzz.trimf(dspeed_range, [-1, 0, 1])
+    dspeed['PS'] = fuzz.trimf(dspeed_range, [0, 5, 10])
+    dspeed['PB'] = fuzz.trimf(dspeed_range, [5, 10, 20])
+    dspeed['PL'] = fuzz.trimf(dspeed_range, [10, 20, 30])
+
+    distance['NL'] = fuzz.trimf(distance_range, [50, 60, 70])
+    distance['NB'] = fuzz.trimf(distance_range, [65, 70, 75])
+    distance['NS'] = fuzz.trimf(distance_range, [70, 75, 80])
+    distance['0'] = fuzz.trimf(distance_range, [79, 80, 81])
+    distance['PS'] = fuzz.trimf(distance_range, [80, 90, 100])
+    distance['PB'] = fuzz.trimf(distance_range, [90, 120, 140])
+    distance['PL'] = fuzz.trimf(distance_range, [130, 150, 200])
+
+    acc['NL'] = fuzz.trimf(acc_range, [0.7, 0.75, 0.8])
+    acc['NB'] = fuzz.trimf(acc_range, [0.6, 0.7, 0.75])
+    acc['NS'] = fuzz.trimf(acc_range, [0.5, 0.55, 0.65])
+    acc['0'] = fuzz.trimf(acc_range, [0.48, 0.5, 0.52])
+    acc['PS'] = fuzz.trimf(acc_range, [0.35, 0.45, 0.5])
+    acc['PB'] = fuzz.trimf(acc_range, [0.25, 0.3, 0.4])
+    acc['PL'] = fuzz.trimf(acc_range, [0.2, 0.25, 0.3])
+
+    acc.defuzzify_method = 'centroid'  # centroid defuzzification for the output
+    # rule base
+    ruleNL = ctrl.Rule(antecedent=((dspeed['PB'] & distance['NL']) |
+                                   (dspeed['PL'] & distance['NL']) |
+                                   (dspeed['PL'] & distance['NB'])),
+                       consequent=acc['NL'], label='rule NL')
+    ruleNB = ctrl.Rule(antecedent=((dspeed['0'] & distance['NL']) |
+                                   (dspeed['PS'] & distance['NL']) |
+                                   (dspeed['PS'] & distance['NB']) |
+                                   (dspeed['PB'] & distance['NB']) |
+                                   (dspeed['PB'] & distance['NS']) |
+                                   (dspeed['PL'] & distance['NS']) |
+                                   (dspeed['PL'] & distance['0'])),
+                       consequent=acc['NB'], label='rule NB')
+    ruleNS = ctrl.Rule(antecedent=((dspeed['NB'] & distance['NL']) |
+                                   (dspeed['NS'] & distance['NL']) |
+                                   (dspeed['0'] & distance['NB']) |
+                                   (dspeed['PS'] & distance['NS']) |
+                                   (dspeed['PB'] & distance['0']) |
+                                   (dspeed['PB'] & distance['PS']) |
+                                   (dspeed['PL'] & distance['PS']) |
+                                   (dspeed['PL'] & distance['PB'])),
+                       consequent=acc['NS'], label='rule NS')
+    rule0 = ctrl.Rule(antecedent=((dspeed['NL'] & distance['NL']) |
+                                  (dspeed['NB'] & distance['NB']) |
+                                  (dspeed['NS'] & distance['NB']) |
+                                  (dspeed['NS'] & distance['NS']) |
+                                  (dspeed['0'] & distance['NS']) |
+                                  (dspeed['0'] & distance['0']) |
+                                  (dspeed['PS'] & distance['0']) |
+                                  (dspeed['PS'] & distance['PS']) |
+                                  (dspeed['PB'] & distance['PB']) |
+                                  (dspeed['PL'] & distance['PL'])),
+                      consequent=acc['0'], label='rule 0')
+    rulePS = ctrl.Rule(antecedent=((dspeed['NL'] & distance['NB']) |
+                                   (dspeed['NL'] & distance['NS']) |
+                                   (dspeed['NB'] & distance['NS']) |
+                                   (dspeed['NB'] & distance['0']) |
+                                   (dspeed['NS'] & distance['0']) |
+                                   (dspeed['NS'] & distance['PS']) |
+                                   (dspeed['0'] & distance['PS']) |
+                                   (dspeed['0'] & distance['PB']) |
+                                   (dspeed['PS'] & distance['PB']) |
+                                   (dspeed['PS'] & distance['PL']) |
+                                   (dspeed['PB'] & distance['PL'])),
+                       consequent=acc['PS'], label='rule PS')
+    rulePB = ctrl.Rule(antecedent=((dspeed['NL'] & distance['0']) |
+                                   (dspeed['NB'] & distance['PS']) |
+                                   (dspeed['NB'] & distance['PB']) |
+                                   (dspeed['NS'] & distance['PB']) |
+                                   (dspeed['NS'] & distance['PL']) |
+                                   (dspeed['0'] & distance['PL'])),
+                       consequent=acc['PB'], label='rule PB')
+    rulePL = ctrl.Rule(antecedent=((dspeed['NL'] & distance['NS']) |
+                                   (dspeed['NL'] & distance['PB']) |
+                                   (dspeed['NL'] & distance['PL']) |
+                                   (dspeed['NB'] & distance['PL'])),
+                       consequent=acc['PL'], label='rule PL')
+    # build the control system and its runtime simulation
+    # (rulePL was previously missing from this list and ruleNL was duplicated)
+    system = ctrl.ControlSystem(
+        rules=[ruleNL, ruleNB, ruleNS, rule0, rulePS, rulePB, rulePL])
+    fuzzy_system = ctrl.ControlSystemSimulation(system)
+    return fuzzy_system
+
+
+def fuzzy_compute(fuzzy_system, dspeed, distance):
+    fuzzy_system.input['dspeed'] = dspeed
+    fuzzy_system.input['distance'] = distance
+    fuzzy_system.compute()
+    acc = fuzzy_system.output['acc']
+    return acc
diff --git a/Control/controllers/purepursuit_controller.py b/Control/controllers/purepursuit_controller.py
new file mode 100644
index 0000000..e6f7141
--- /dev/null
+++ b/Control/controllers/purepursuit_controller.py
@@ -0,0 +1,12 @@
+import numpy as np
+import math
+
+
+def Purepursuit(truck, nav_line):
+    robot_state = np.zeros(2)  # vehicle reference position
+    robot_state[0] = truck.x
+    robot_state[1] = truck.y + 60  # rear-axle center in BEV pixels
+    dx, dy = np.average(nav_line[-12:], axis=0) - robot_state
+    alpha = np.arctan(-dx / dy)  # heading of the (dx, dy) vector
+    ang = math.atan2(2.0 * truck.bev_l * np.sin(alpha), truck.ld) + 0.5  # pure pursuit law, shifted onto the [0, 1] steering axis
+    return ang
\ No newline at end of file
diff --git a/Control/drive.py b/Control/drive.py
new file mode 100644
index 0000000..6510070
--- /dev/null
+++ b/Control/drive.py
@@ -0,0 +1,53 @@
+import pyvjoy
+import numpy as np
+
+global j
+j = pyvjoy.VJoyDevice(1)
+
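+# The normalized [0, 1] commands below are scaled by 32767 onto the vJoy
+# axes, so 0.5 maps to the axis center; end() recenters both axes to
+# release steering and pedals.
+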
+class Truck:  # Ackermann steering model
+    def __init__(self):  # L: wheelbase
+        self.x = 400  # X
+        self.y = 475  # Y
+        self.theta = 0  # angle between vehicle heading and trajectory
+        self.dx = 0
+        self.dy = 0
+        self.dtheta = 0
+        self.ang = 0
+        self.acc = 0
+        self.l = 6.0  # wheelbase
+        self.bev_l = 60  # wheelbase in BEV pixels
+        self.speed = 0  # speed
+        self.bev_speed = 0  # speed in BEV pixels
+        self.dv = 0  # speed delta
+        self.L = 6.0  # overall vehicle length
+        self.k = 1.0
+        self.ld = self.k * self.bev_speed + self.L  # look-ahead distance from the rear axle
+        self.dt = 0.2  # decision interval
+        self.delta = 0  # angle between vehicle and trajectory
+
+    def update(self, ang, acc, refer_time, speed):  # update the vehicle state
+        self.ang = ang
+        self.acc = acc
+        self.dv = speed - self.speed
+        self.speed = speed
+        self.bev_speed = self.speed / 3.6 * 6
+        self.ld = self.k * self.bev_speed + self.bev_l
+        self.dt = refer_time  # use the measured inference time as the interval
+        self.dx = self.speed * np.cos(self.theta) * self.dt
+        self.dy = self.speed * np.sin(self.theta) * self.dt
+        self.dtheta = self.speed * np.tan(self.ang) / self.l * self.dt
+
+
+def driver(ang, acc):
+    j.data.wAxisX = int(ang * 32767)
+    j.data.wAxisY = int(acc * 32767)
+    j.data.wAxisZ = 0
+    j.update()
+
+
+def end():
+    j.data.wAxisX = int(0.5 * 32767)
+    j.data.wAxisY = int(0.5 * 32767)
+    j.data.wAxisZ = 0
+    j.update()
diff --git a/Navigation/BevDraw.py b/Navigation/BevDraw.py
new file mode 100644
index 0000000..443fa30
--- /dev/null
+++ b/Navigation/BevDraw.py
@@ -0,0 +1,115 @@
+import cv2
+import numpy as np
+
+
+def draw_bev(nav_line, obstacles, traffic_light, bev_lanes, stop_line):  # main entry for nav/BEV visualization
+    bevmap = np.ones((600, 800)) * 75
+    bevmap = draw_ruler(bevmap)
+    bevmap = draw_nav_line(bevmap, nav_line.pts)
+    bevmap = cv2.cvtColor(np.uint8(bevmap), cv2.COLOR_GRAY2BGR)
+    bevmap = display_bev_lanes(bevmap, bev_lanes, stop_line, width=2)
+    bevmap = display_objects(bevmap, obstacles, traffic_light)
+    return bevmap
+
+
+def draw_ruler(bevmap):
+    bevmap = cv2.line(bevmap, (500, 470), (500, 200), [255, 255, 255], thickness=1, lineType=4)
+    for x in range(200, 470, 30):
+        bevmap = cv2.line(bevmap, (495, x), (500, x), [255, 255, 255], thickness=1, lineType=4)
+    return bevmap
+
+
+def draw_nav_line(bevmap, nav_pts):  # draw the navigation line on the BEV
+    if len(nav_pts):
+        for nav_pt in nav_pts:
+            cv2.circle(bevmap, [int(nav_pt[0]), int(nav_pt[1])], radius=1, color=(0, 0, 0), thickness=-1)
+    return bevmap
+
+
+def display_objects(bevmap, obstacles, traffic_light):
+    bevmap = cv2.rectangle(bevmap, (394, 470), (406, 530), [255, 255, 255], thickness=-1, lineType=4)  # ego truck
+    if obstacles is not None:
+        for obstacle in obstacles:
+            pos = obstacle.position
+            x_middle = int(pos[0] + pos[2] / 2)
+            y_middle = int(pos[1] + pos[3] / 2)
+            speed = obstacle.speed
+            if np.abs(x_middle - 400) >= 50 or y_middle <= 250:
+                continue
+            obstacle_color = [int(pos[4] * 125), int(pos[4] * 125), int(pos[4] * 125)]
+            if obstacle.lane == 0:
+                obstacle_color = [0, 0, 0]
+                cv2.putText(bevmap, '{:^.1f}'.format(obstacle.speed[1]), (int(pos[0]), int(pos[1] - 1)),
+                            fontFace=cv2.FONT_HERSHEY_SIMPLEX,
+                            fontScale=0.4, color=(0, 0, 0), thickness=1)
+                cv2.putText(bevmap, '{:^.1f}'.format(470 - (pos[1] + pos[3])), (int(pos[0]), int(pos[1] + pos[3] + 10)),
+                            fontFace=cv2.FONT_HERSHEY_SIMPLEX,
+                            fontScale=0.4, color=(0, 0, 0), thickness=1)
+            elif obstacle.lane == -1:
+                obstacle_color = [0, 50, 0]
+            elif obstacle.lane == 1:
+                obstacle_color = [0, 0, 50]
+            bevmap = cv2.rectangle(bevmap, (int(pos[0]), int(pos[1])), (int(pos[0] + 8), int(pos[1] + pos[3])), obstacle_color, thickness=-1, lineType=4)  # fixed 8 px width, since trucks are often detected as cars
+            bevmap = cv2.line(bevmap, (int(x_middle), int(y_middle)), (int(x_middle + speed[0] * 0.4), int(y_middle + speed[1] * 0.4)), [150, 150, 150], thickness=2, lineType=4)
+
+    if traffic_light is not None:  # draw the traffic light
+        distance = traffic_light.distance
+        tlcolor = traffic_light.tlcolor
+        if tlcolor == 0:  # green
+            bevmap = cv2.circle(bevmap, (420, int(470 - distance)), 5, (0, 125, 0), -1)
+        elif tlcolor == 1:  # red
+            bevmap = cv2.circle(bevmap, (420, int(470 - distance)), 5, (0, 0, 125), -1)
+        elif tlcolor == 2:  # yellow
+            bevmap = cv2.circle(bevmap, (420, int(470 - distance)), 5, (0, 125, 125), -1)
+
+    return bevmap
+
+
+def display_bev_lanes(bevmap, bev_lanes, stop_line, width=2):
+    if len(bev_lanes) >= 1:
+        for bev_lane in bev_lanes:
+            bev_lane_pts = bev_lane.pts
+            if bev_lane.position_type in [-1.0, 0.0, 1.0]:  # emphasize the lines adjacent to the ego lane
+                width = 2
+            else:
+                width = 1
+            for i in range(1, len(bev_lane_pts)):
+                cv2.line(bevmap, bev_lane_pts[i - 1], bev_lane_pts[i], [200, 200, 200], thickness=width)
+    '''
+    if bev_lane.cls == 1:  # draw a solid line
+        for i in range(1, len(bev_lane_pts)):
+            cv2.line(bevmap, bev_lane_pts[i - 1], bev_lane_pts[i], [200, 200, 200], thickness=width)
+    else:  # draw a dashed line
+        # time_now = int(time.time()*100) % 10  # animated dashes; did not help
+        for i in range(1, len(bev_lane_pts)):
+            if i % 5 == 1 or i % 5 == 2:
+                continue
+            cv2.line(bevmap, bev_lane_pts[i - 1], bev_lane_pts[i], [200, 200, 200], thickness=width)
+    '''
+
+    if stop_line is not None:
+        stop_line_y = stop_line.center_y
+        length = stop_line.length
+        cv2.line(bevmap, [int(400 - length / 2), int(470 - stop_line_y)], [int(400 + length / 2), int(470 - stop_line_y)], [255, 255, 255], thickness=2)
+
+    return bevmap
+
+
+def print_info(bevmap, refer_time, truck, speed_limit, state, weather):
+    cv2.putText(bevmap, 'Infer time: {:.4f}s'.format(refer_time), (10, 15), fontFace=cv2.FONT_HERSHEY_SIMPLEX,
+                fontScale=0.5, color=(200, 200, 200), thickness=1)
+    cv2.putText(bevmap, "Steer angle: {:.2f}".format((truck.ang - 0.5) * 180), (10, 30), fontFace=cv2.FONT_HERSHEY_SIMPLEX,
+                fontScale=0.5, color=(255, 255, 255), thickness=1)
+    cv2.putText(bevmap, "speed limit: {}".format(speed_limit), (10, 45), fontFace=cv2.FONT_HERSHEY_SIMPLEX,
+                fontScale=0.5, color=(255, 255, 255), thickness=1)
+    cv2.putText(bevmap, "speed: {}".format(truck.speed), (10, 60), fontFace=cv2.FONT_HERSHEY_SIMPLEX,
+                fontScale=0.5, color=(255, 255, 255), thickness=1)
+    cv2.putText(bevmap, "power: {:.1f}".format((1 - truck.acc) * 10), (10, 75), fontFace=cv2.FONT_HERSHEY_SIMPLEX,
+                fontScale=0.5, color=(255, 255, 255), thickness=1)
+
+    if state is not None:
+        cv2.putText(bevmap, state, (10, 90), fontFace=cv2.FONT_HERSHEY_SIMPLEX,
+                    fontScale=0.5, color=(255, 255, 255), thickness=1)
+
+    cv2.putText(bevmap, "weather: {}".format(weather), (10, 105), fontFace=cv2.FONT_HERSHEY_SIMPLEX,
+                fontScale=0.5, color=(255, 255, 255), thickness=1)
+
+    return bevmap
\ No newline at end of file
diff --git a/Navigation/Navigation_Process.py b/Navigation/Navigation_Process.py
new file mode 100644
index 0000000..93ab71c
--- /dev/null
+++ b/Navigation/Navigation_Process.py
@@ -0,0 +1,89 @@
+import cv2
+import numpy as np
+
+
+class Nav_Line:  # container for navigation-line information
+    def __init__(self, fit, pts, pts_x, pts_y):
+        self.fit = fit
+        self.pts = pts
+        self.pts_x = pts_x
+        self.pts_y = pts_y
+
+
+def nav_process(nav):
+    bev_nav = nav2bev(nav)
+    nav_line = get_nav_line(bev_nav)
+    # map the quadratic coefficient (a curvature proxy) to a speed limit
+    curve = np.abs(nav_line.fit[0])
+    if curve <= 0.0005:
+        curve_speed_limit = 80
+    elif curve <= 0.001:
+        curve_speed_limit = 75
+    elif curve <= 0.002:
+        curve_speed_limit = 65
+    elif curve <= 0.004:
+        curve_speed_limit = 55
+    elif curve <= 0.005:
+        curve_speed_limit = 40
+    else:  # curve > 0.005
+        curve_speed_limit = 20
+    return nav_line, curve_speed_limit
+
+
+def nav2bev(map):  # warp the navigation map onto the BEV canvas
+    pts_src = np.float32([
+        [0, 0], [200, 0],
+        [0, 130], [200, 130], ])
+
+    pts_dst = np.float32([
+        [0, 0], [800, 0],
+        [300, 600], [500, 600], ])
+    h = cv2.getPerspectiveTransform(pts_src, pts_dst)
+    map = cv2.warpPerspective(map, h, (800, 600))
+    map = filter_out_red(map)
+    return map
+
+
+def filter_out_red(img):
+    img_blue = img[:, :, 0]
+    _, mask_blue = cv2.threshold(img_blue, 180, 125, cv2.THRESH_BINARY)
+    img_red = img[:, :, 2]
+    _, mask_red = cv2.threshold(img_red, 100, 125, cv2.THRESH_BINARY_INV)
+    mask = mask_blue + mask_red
+    _, mask = cv2.threshold(mask, 200, 255, cv2.THRESH_BINARY)
+    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (4, 4))
+    mask = cv2.erode(mask, kernel, iterations=1)
+    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (8, 8))
+    mask = cv2.dilate(mask, kernel, iterations=6)
+    mask = cv2.Canny(mask, 50, 150)
+    _, mask = cv2.threshold(mask, 180, 180, cv2.THRESH_BINARY)
+    mask = mask + 75
+    return mask
+
+
+def get_nav_line(img):
+    middle_pts = []
+    x = 400
+    for i in range(540, 340, -5):
+        for j in range(x, 0, -1):
+            if img[i, j] == 255:
+                break
+        for k in range(x, 800, 1):
+            if img[i, k] == 255:
+                break
+        if j == 0 or j == 799 or k == 0 or k == 799:
+            break
+        x = int((j + k) / 2)
+        middle_pts.append([x, i])
+    middle_pts = np.array(middle_pts)  # centerline of the route
+    if len(middle_pts):
+        fit = np.polyfit(np.array([i[1] for i in middle_pts]), np.array([i[0] for i in middle_pts]), 2)
+        pts_y = np.linspace(300, 470, 50)
+        pts_x = fit[0] * pts_y ** 2 + fit[1] * pts_y + fit[2]
+    else:
+        pts_x = [400]
+        pts_y = [470]
+        fit = [0, 0, 0]
+    nav_pts = []
+    if len(pts_y):
+        for pt_x, pt_y in zip(pts_x, pts_y):
+            nav_pt = [pt_x, pt_y]
+            nav_pts.append(nav_pt)
+    nav_line = Nav_Line(fit, nav_pts, pts_x, pts_y)
+    return nav_line
\ No newline at end of file
diff --git a/Navigation/__pycache__/BevDraw.cpython-38.pyc b/Navigation/__pycache__/BevDraw.cpython-38.pyc
new file mode 100644
index 0000000..450dd5a
Binary files /dev/null and b/Navigation/__pycache__/BevDraw.cpython-38.pyc differ
diff --git a/Navigation/__pycache__/Navigation_Process.cpython-38.pyc b/Navigation/__pycache__/Navigation_Process.cpython-38.pyc
new file mode 100644
index 0000000..8124de0
Binary files /dev/null and b/Navigation/__pycache__/Navigation_Process.cpython-38.pyc differ
diff --git a/OCR/inference.pdiparams b/OCR/inference.pdiparams
new file mode 100644
index 0000000..5eff539
Binary files /dev/null and b/OCR/inference.pdiparams differ
diff --git a/OCR/inference.pdiparams.info b/OCR/inference.pdiparams.info
new file mode 100644
index 0000000..9e133bf
Binary files /dev/null and b/OCR/inference.pdiparams.info differ
diff --git a/OCR/inference.pdmodel b/OCR/inference.pdmodel
new file mode 100644
index 0000000..585fcdc
Binary files /dev/null and b/OCR/inference.pdmodel differ
diff --git a/Perception/LaneDetection/__pycache__/clrnet_trt.cpython-38.pyc b/Perception/LaneDetection/__pycache__/clrnet_trt.cpython-38.pyc
new file mode 100644
index 0000000..6fcccea
Binary files /dev/null and b/Perception/LaneDetection/__pycache__/clrnet_trt.cpython-38.pyc differ
diff --git a/Perception/LaneDetection/__pycache__/postprocess.cpython-38.pyc b/Perception/LaneDetection/__pycache__/postprocess.cpython-38.pyc
new file mode 100644
index 0000000..1864d58
Binary files /dev/null and b/Perception/LaneDetection/__pycache__/postprocess.cpython-38.pyc differ
diff --git
a/Perception/LaneDetection/clrnet_trt.py b/Perception/LaneDetection/clrnet_trt.py new file mode 100644 index 0000000..4837991 --- /dev/null +++ b/Perception/LaneDetection/clrnet_trt.py @@ -0,0 +1,310 @@ +import cv2 +import numpy as np +import tensorrt as trt +import pycuda.driver as cuda +import pycuda.autoinit +from scipy.interpolate import InterpolatedUnivariateSpline +from .postprocess import postprocess, middel_line, line_classification + +COLORS = [ + (255, 0, 0), + (0, 255, 0), + (0, 0, 255), + (255, 255, 0), + (255, 0, 255), + (0, 255, 255) +] + + +class Lane: + def __init__(self, points=None, invalid_value=-2., metadata=None): + super(Lane, self).__init__() + self.curr_iter = 0 + self.points = points + self.invalid_value = invalid_value + self.function = InterpolatedUnivariateSpline(points[:, 1], + points[:, 0], + k=min(3, + len(points) - 1)) + self.min_y = points[:, 1].min() - 0.01 + self.max_y = points[:, 1].max() + 0.01 + + self.metadata = metadata or {} + + self.sample_y = range(710, 150, -10) + self.ori_img_w = 1280 + self.ori_img_h = 720 + + def __repr__(self): + return '[Lane]\n' + str(self.points) + '\n[/Lane]' + + def __call__(self, lane_ys): + lane_xs = self.function(lane_ys) + + lane_xs[(lane_ys < self.min_y) | + (lane_ys > self.max_y)] = self.invalid_value + return lane_xs + + def to_array(self): + sample_y = self.sample_y + img_w, img_h = self.ori_img_w, self.ori_img_h + ys = np.array(sample_y) / float(img_h) + xs = self(ys) + valid_mask = (xs >= 0) & (xs < 1) + lane_xs = xs[valid_mask] * img_w + lane_ys = ys[valid_mask] * img_h + lane = np.concatenate((lane_xs.reshape(-1, 1), lane_ys.reshape(-1, 1)), + axis=1) + return lane + + def __iter__(self): + return self + + def __next__(self): + if self.curr_iter < len(self.points): + self.curr_iter += 1 + return self.points[self.curr_iter - 1] + self.curr_iter = 0 + raise StopIteration + +class CLRNet: + def __init__(self, engine_path): + self.logger = trt.Logger(trt.Logger.ERROR) + with open(engine_path, "rb") as f, trt.Runtime(self.logger) as runtime: + self.engine = runtime.deserialize_cuda_engine(f.read()) + self.context = self.engine.create_execution_context() + + self.inputs = [] + self.outputs = [] + self.allocations = [] + for i in range(self.engine.num_bindings): + is_input = False + if self.engine.binding_is_input(i): + is_input = True + name = self.engine.get_binding_name(i) + dtype = self.engine.get_binding_dtype(i) + shape = self.engine.get_binding_shape(i) + if is_input: + self.batch_size = shape[0] + size = np.dtype(trt.nptype(dtype)).itemsize + for s in shape: + size *= s + allocation = cuda.mem_alloc(size) + binding = { + 'index': i, + 'name': name, + 'dtype': np.dtype(trt.nptype(dtype)), + 'shape': list(shape), + 'allocation': allocation, + } + self.allocations.append(allocation) + if self.engine.binding_is_input(i): + self.inputs.append(binding) + else: + self.outputs.append(binding) + + self.conf_threshold = 0.6 + self.nms_thres = 90 + self.max_lanes = 4 + self.sample_points = 48 + self.num_points = 72 + self.n_offsets = 72 + self.n_strips = 71 + self.img_w = 1280 + self.img_h = 720 + self.ori_img_w = 1280 + self.ori_img_h = 720 + self.cut_height = 320 + + self.input_width = 800 + self.input_height = 320 + + self.sample_x_indexs = (np.linspace(0, 1, self.sample_points) * self.n_strips) + self.prior_feat_ys = np.flip((1 - self.sample_x_indexs / self.n_strips)) + self.prior_ys = np.linspace(1,0, self.n_offsets) + + def softmax(self, x, axis=None): + x = x - x.max(axis=axis, keepdims=True) + y = np.exp(x) + 
+        return y / y.sum(axis=axis, keepdims=True)
+
+    def Lane_nms(self, proposals, scores, overlap=50, top_k=4):
+        keep_index = []
+        sorted_score = np.sort(scores)[-1]  # highest score
+        indices = np.argsort(-scores)  # from big to small
+
+        r_filters = np.zeros(len(scores))
+
+        for i, indice in enumerate(indices):
+            if r_filters[i] == 1:  # skip proposals already filtered by NMS
+                continue
+            keep_index.append(indice)
+            if len(keep_index) > top_k:  # stop once more than top_k are kept
+                break
+            if i == (len(scores) - 1):  # stop at the last proposal
+                break
+            sub_indices = indices[i + 1:]
+            for sub_i, sub_indice in enumerate(sub_indices):
+                r_filter = self.Lane_IOU(proposals[indice, :], proposals[sub_indice, :], overlap)
+                if r_filter: r_filters[i + 1 + sub_i] = 1
+        num_to_keep = len(keep_index)
+        keep_index = list(map(lambda x: x.item(), keep_index))
+        return keep_index, num_to_keep
+
+    def Lane_IOU(self, parent_box, compared_box, threshold):
+        '''
+        Calculate the distance between one pair of proposal lines;
+        return True if the distance is less than the threshold.
+        '''
+        n_offsets = 72
+        n_strips = n_offsets - 1
+
+        start_a = (parent_box[2] * n_strips + 0.5).astype(int)  # add 0.5 so int() behaves like round()
+        start_b = (compared_box[2] * n_strips + 0.5).astype(int)
+        start = max(start_a, start_b)
+        end_a = start_a + parent_box[4] - 1 + 0.5 - (((parent_box[4] - 1) < 0).astype(int))
+        end_b = start_b + compared_box[4] - 1 + 0.5 - (((compared_box[4] - 1) < 0).astype(int))
+        end = min(min(end_a, end_b), 71)
+        if (end - start) < 0:
+            return False
+        dist = 0
+        for i in range(5 + start, 5 + int(end)):
+            if i > (5 + end):
+                break
+            if parent_box[i] < compared_box[i]:
+                dist += compared_box[i] - parent_box[i]
+            else:
+                dist += parent_box[i] - compared_box[i]
+        return dist < (threshold * (end - start + 1))
+
+    def predictions_to_pred(self, predictions):
+        lanes = []
+        for lane in predictions:
+            lane_xs = lane[6:]  # normalized values
+            start = min(max(0, int(round(lane[2].item() * self.n_strips))),
+                        self.n_strips)
+            length = int(round(lane[5].item()))
+            end = start + length - 1
+            end = min(end, len(self.prior_ys) - 1)
+            # end = label_end
+            # if the prediction does not start at the bottom of the image,
+            # extend its prediction until the x is outside the image
+            mask = ~((((lane_xs[:start] >= 0.) & (lane_xs[:start] <= 1.)
+                       )[::-1].cumprod()[::-1]).astype(bool))
+
+            lane_xs[end + 1:] = -2
+            lane_xs[:start][mask] = -2
+            lane_ys = self.prior_ys[lane_xs >= 0]
+            lane_xs = lane_xs[lane_xs >= 0]
+
+            lane_xs = np.double(lane_xs)
+            lane_xs = np.flip(lane_xs, axis=0)
+            lane_ys = np.flip(lane_ys, axis=0)
+            lane_ys = (lane_ys * (self.ori_img_h - self.cut_height) +
+                       self.cut_height) / self.ori_img_h
+            if len(lane_xs) <= 1:
+                continue
+
+            points = np.stack(
+                (lane_xs.reshape(-1, 1), lane_ys.reshape(-1, 1)),
+                axis=1).squeeze(2)
+
+            lane = Lane(points=points,
+                        metadata={
+                            'start_x': lane[3],
+                            'start_y': lane[2],
+                            'conf': lane[1]
+                        })
+            lanes.append(lane)
+        return lanes
+
+    def get_lanes(self, output, as_lanes=True):
+        '''
+        Convert model output to lanes.
+        '''
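+        # Pipeline: softmax over the 2-way logits gives per-proposal confidence,
+        # low-confidence proposals are dropped, row-anchor NMS removes
+        # overlapping lines, and the survivors are decoded into Lane objects.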
+        decoded = []
+        for predictions in output:
+            # filter out proposals below the confidence threshold
+            scores = self.softmax(predictions[:, :2], 1)[:, 1]
+
+            keep_inds = scores >= self.conf_threshold
+            predictions = predictions[keep_inds]
+            scores = scores[keep_inds]
+
+            if predictions.shape[0] == 0:
+                decoded.append([])
+                continue
+            nms_predictions = predictions
+
+            nms_predictions = np.concatenate(
+                [nms_predictions[..., :4], nms_predictions[..., 5:]], axis=-1)
+
+            nms_predictions[..., 4] = nms_predictions[..., 4] * self.n_strips
+            nms_predictions[..., 5:] = nms_predictions[..., 5:] * (self.img_w - 1)
+
+            keep, num_to_keep = self.Lane_nms(
+                nms_predictions,
+                scores,
+                self.nms_thres,
+                self.max_lanes)
+
+            keep = keep[:num_to_keep]
+            predictions = predictions[keep]
+
+            if predictions.shape[0] == 0:
+                decoded.append([])
+                continue
+
+            predictions[:, 5] = np.round(predictions[:, 5] * self.n_strips)
+            pred = self.predictions_to_pred(predictions)
+            decoded.append(pred)
+
+        return decoded
+
+    def forward(self, img, im1, CAM):
+        img = img[self.cut_height:, :, :]
+        img = cv2.resize(img, (self.input_width, self.input_height), interpolation=cv2.INTER_CUBIC)
+        img = img.astype(np.float32) / 255.0
+        img = np.transpose(np.float32(img[:, :, :, np.newaxis]), (3, 2, 0, 1))
+        img = np.ascontiguousarray(img)
+        cuda.memcpy_htod(self.inputs[0]['allocation'], img)
+        self.context.execute_v2(self.allocations)
+        outputs = []
+        for out in self.outputs:
+            output = np.zeros(out['shape'], out['dtype'])
+            cuda.memcpy_dtoh(output, out['allocation'])
+            outputs.append(output)
+        output = outputs[0]
+        output = self.get_lanes(output)
+        res, bev_lanes = postprocess(im1, output, CAM)
+        return res, bev_lanes
+
+
+'''
+# test harness
+if __name__ == "__main__":
+    cap = cv2.VideoCapture("D:/autodrive/script/test3.mp4")
+    isnet = CLRNet('tusimple_r18.engine', np.ones(shape=(3,3)))
+    fourcc = cv2.VideoWriter_fourcc(*'MP4V')
+    fps = cap.get(cv2.CAP_PROP_FPS)
+    out = cv2.VideoWriter('D:/autodrive/script/result.mp4', fourcc, fps, (1280, 720))
+    while True:
+        success, img = cap.read()
+        img = cv2.cvtColor(cv2.resize(img, (1280, 720)), cv2.COLOR_BGR2RGB)
+        # image = cv2.imread('test.jpg')
+        img1, lanes, bev_lanes, bev_img, middle_lane = isnet.forward(img)
+        cv2.imshow('demo', img1)
+        out.write(img1)
+        if cv2.waitKey(1) & 0xFF == ord('z'):
+            cv2.waitKey(0)
+        if cv2.waitKey(25) & 0xFF == ord('q'):
+            cv2.destroyAllWindows()
+            break
+'''
diff --git a/Perception/LaneDetection/postprocess.py b/Perception/LaneDetection/postprocess.py
new file mode 100644
index 0000000..c8e23bc
--- /dev/null
+++ b/Perception/LaneDetection/postprocess.py
@@ -0,0 +1,250 @@
+import heapq
+import numpy as np
+from sklearn.cluster import DBSCAN
+import cv2
+
+
+class Bev_Lane:  # container for a BEV lane line
+    def __init__(self, cls, position_type, fit, endpoint, startpoint, lanes_pts):
+        self.cls = cls  # line style: dashed = 0, solid = 1
+        self.position_type = position_type
+        ############ position types ############
+        # left road boundary:                -4
+        # third line to the left:            -3
+        # second line to the left
+        #   (left edge of the adjacent lane): -2
+        # left boundary of the ego lane:     -1
+        # centerline of the ego lane:         0
+        # right boundary of the ego lane:     1
+        # second line to the right
+        #   (right edge of the adjacent lane): 2
+        # third line to the right:            3
+        # right road boundary:                4
+        # unknown (default before classification): 5
+        ########################################
+        self.fit = fit  # cubic-fit coefficients, highest order first; x runs along the vertical axis, y along the horizontal
+        self.endpoint = endpoint  # endpoint farthest from the ego vehicle
+        self.startpoint = startpoint  # endpoint closest to the ego vehicle
+        self.pts = lanes_pts  # all points on the lane line
+
+
+def postprocess(img_, output, CAM):
+    lanes_xys = lane2lane_xys(output[0])
+    lines_cls = line_classification(img_, lanes_xys)  # dashed/solid classification
+    res, lanes = imshow_lanes(img_, lanes_xys)
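+    # estimate the vanishing point from pairwise intersections of the detected
+    # lines; it can be used to correct the camera geometry (see the commented
+    # CAM.update_intrinsic call in bev_lane below)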
+    vanish_point = vanish_point_detection(lanes)  # compute the vanishing point
+    bev_lanes = bev_lane(lanes, CAM, lines_cls, vanish_point)
+    bev_lanes = line_positiontype_classfication(bev_lanes)
+    return res, bev_lanes
+
+
+def lane2lane_xys(lanes):
+    lanes = [lane.to_array() for lane in lanes]
+    lanes_xys = []
+    for _, lane in enumerate(lanes):
+        xys = []
+        for x, y in lane:
+            if x <= 0 or y <= 0:
+                continue
+            x, y = int(x), int(y)
+            xys.append([x, y])
+        if len(xys) > 0:
+            lanes_xys.append(xys)
+    lanes_xys.sort(key=lambda xys: xys[0][0])
+    return lanes_xys
+
+
+def imshow_lanes(img, lanes_xys, width=4):
+    for idx, xys in enumerate(lanes_xys):
+        for i in range(1, len(xys)):
+            # cv2.line(img, xys[i - 1], xys[i], COLORS[idx], thickness=width)
+            cv2.line(img, xys[i - 1], xys[i], [0, 255, 127], thickness=width)
+
+    '''
+    middle_lanes = []  # compute the centerline
+    if len(lanes_xys) >= 2:
+        middle_lanes, middle_pt = middel_line(lanes_xys)
+        for pt in middle_lanes:
+            cv2.circle(img, pt, radius=3, color=(200, 200, 200), thickness=-1)
+        # cv2.line(img, middle_pt, (middle_pt[0], middle_pt[1]-5), [0, 255, 127], thickness=width)
+    '''
+    return img, lanes_xys
+
+
+def bev_lane(lanes, CAM, lines_cls, vanish_point):
+    # CAM.update_intrinsic(vanish_point)  # correct the camera extrinsics from the vanishing point
+    bev_lanes = []
+    for i in range(len(lanes)):
+        if len(lanes[i]) < 10:  # drop lines with fewer than 10 points
+            continue
+        bev_lane_pts = []
+        bev_xy = []  # points before fitting
+        for pt in lanes[i]:
+            pt = list(pt)
+            pt[1] = 720 - pt[1]
+            bev_pt = CAM.pixel_to_world(pt).tolist()
+            if 470 - bev_pt[0][1] / 2 <= 250 or 470 - bev_pt[0][1] / 2 >= 500:
+                continue
+            bev_xy.append([int(bev_pt[0][0] / 5) + 400, 470 - int(bev_pt[0][1] / 1.8)])
+        bev_xy = np.array(bev_xy).T
+        fit = np.polyfit(bev_xy[1], bev_xy[0], 3)  # cubic fit
+        bev_y = np.linspace(250, 470, 20)  # only 20 samples to keep the cost down
+        bev_x = fit[0] * bev_y ** 3 + fit[1] * bev_y ** 2 + fit[2] * bev_y + fit[3]
+        for j in range(20):
+            if np.abs(bev_x[j] - 400) >= 60: continue
+            bev_lane_pts.append((int(bev_x[j]), int(bev_y[j])))
+        if len(bev_lane_pts) >= 1:  # the distance filter above may have removed every point
+            bev_line = Bev_Lane(lines_cls[i], 5, fit, bev_lane_pts[-1], bev_lane_pts[0], bev_lane_pts)  # wrap the BEV lane line
+            bev_lanes.append(bev_line)
+    return bev_lanes
+
+
+def vanish_point_detection(lanes):  # vanishing-point detection
+    if len(lanes) == 0:
+        vanish_point = np.array([[640], [335], [0]])
+    elif len(lanes) == 1:  # with a single lane, intersect it with the image center column
+        lane = np.array(lanes[0])
+        fit = np.polyfit(lane[:, 0], lane[:, 1], 1)
+        vanish_point = np.array([[640], [fit[0] * 640 + fit[1]], [0]])
+    elif len(lanes) > 1:  # with several lanes, intersect every pair and average
+        vpts = []
+        fits = []
+        for lane in lanes:
+            lane = np.array(lane)
+            fit = np.polyfit(lane[:, 0], lane[:, 1], 1)
+            fits.append(fit)
+        for i in range(len(fits) - 1):
+            for j in range(i + 1, len(fits)):
+                fit1 = fits[i]
+                fit2 = fits[j]
+                vpt_x = -(fit1[1] - fit2[1]) / (fit1[0] - fit2[0])
+                vpt_y = fit1[0] * vpt_x + fit1[1]
+                vpts.append([vpt_x, vpt_y])
+        vpts = np.array(vpts)
+        vanish_point = np.array([[np.average(vpts[:, 0])], [np.average(vpts[:, 1])], [0]])
+    return vanish_point
+
+
+def middel_line(lanes_xys):
+    line_to_middle = []
+    middle_lane = []
+    lanes = lanes_xys.copy()
+    for i in range(len(lanes) - 1, -1, -1):
+        if len(lanes[i]) <= 30:
+            del lanes[i]
+    if len(lanes) >= 2:
+        for i in range(len(lanes)):
+            line_to_middle.append(np.abs(lanes[i][0][0] - 640))
+        current_lane_idx = list(map(line_to_middle.index, heapq.nsmallest(2, line_to_middle)))
+        current_lane_0 = lanes[current_lane_idx[0]]
+        current_lane_1 = lanes[current_lane_idx[1]]
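+        # fit both boundary lines with cubics and average the coefficients
+        # to synthesize the centerline of the ego lane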
+        fit_0 = np.polyfit(np.array([i[1] for i in current_lane_0]), np.array([i[0] for i in current_lane_0]), 3)
+        fit_1 = np.polyfit(np.array([i[1] for i in current_lane_1]), np.array([i[0] for i in current_lane_1]), 3)
+        pts_y = np.linspace(500, 699, 50)
+        pts_x = (fit_0[0] + fit_1[0]) / 2 * pts_y ** 3 + (fit_0[1] + fit_1[1]) / 2 * pts_y ** 2 + (fit_0[2] + fit_1[2]) / 2 * pts_y + (fit_0[3] + fit_1[3]) / 2
+        for i in range(50):
+            middle_lane.append((int(pts_x[i]), int(pts_y[i])))
+        middle_pt = ((current_lane_0[0][0] + current_lane_1[0][0]) // 2, (current_lane_0[0][1] + current_lane_1[0][1]) // 2)
+        return middle_lane, middle_pt
+    elif len(lanes) == 1:
+        if lanes[0][0][0] > 640:
+            middle_lane = [(lanes[0][0][0] - 550, lanes[0][0][1]), (lanes[0][0][0] - 550, lanes[0][1][1]), (lanes[0][0][0] - 550, lanes[0][2][1])]
+        elif lanes[0][0][0] < 640:
+            middle_lane = [(lanes[0][0][0] + 550, lanes[0][0][1]), (lanes[0][0][0] - 550, lanes[0][1][1]), (lanes[0][0][0] - 550, lanes[0][2][1])]
+        return middle_lane, []
+    return middle_lane, []
+
+
+def line_positiontype_classfication(bev_lanes):
+    positiontypes = np.zeros((1, len(bev_lanes)))  # position type of every line
+    bev_lanes_sorted = [0] * len(bev_lanes)
+    if len(bev_lanes) >= 1:
+        start_xs = []
+        for bev_lane in bev_lanes:
+            start_x = bev_lane.pts[-3][0]  # x coordinate of the BEV line start point
+            start_xs.append(start_x - 400)  # lateral offset of the start point from the ego centerline
+        middle_lane_exist = False  # whether some line runs under the vehicle center
+
+        start_xs = np.array(start_xs)  # re-sort the lines by start position
+        lane_sort = np.argsort(start_xs)
+        start_xs = np.sort(start_xs)
+        for i in range(len(bev_lanes)):
+            bev_lanes_sorted[i] = bev_lanes[lane_sort[i]]
+
+        for lane_id in range(len(start_xs)):  # look for a line under the vehicle center
+            if np.abs(start_xs[lane_id]) < 4:
+                positiontypes[0, lane_id] = 0
+                middle_lane_exist = True
+        if middle_lane_exist == True:  # a center line exists
+            middle_lane_id = lane_id
+            for i in reversed(range(middle_lane_id)):
+                positiontypes[0, i] = i - middle_lane_id
+        else:  # no center line
+            len_left = 0  # number of lines left of the vehicle
+            for i in range(len(bev_lanes)):
+                if start_xs[i] >= 0:
+                    break
+                len_left = len_left + 1
+            if len_left > 0:
+                left_len = len_left
+                for i in reversed(range(len_left)):
+                    if i - len_left == -1:  # before assigning the ego-left line, filter by lateral distance
+                        if start_xs[i] <= -20:
+                            left_len = left_len + 1
+                    positiontypes[0, i] = i - left_len
+            if len_left < len(bev_lanes_sorted):
+                right_len = len_left
+                for i in range(len_left, len(bev_lanes)):
+                    if i - len_left == -1:  # before assigning the ego-right line, filter by lateral distance
+                        if start_xs[i] >= 20:
+                            right_len = right_len - 1
+                    positiontypes[0, i] = i - right_len + 1
+
+    if 1 in positiontypes and -1 not in positiontypes and 0 not in positiontypes:  # only the right boundary was detected: estimate the left one
+        ego_left = bev_lanes_sorted[positiontypes.tolist()[0].index(1)]
+        ego_left_fit = ego_left.fit
+        endpoint = ego_left.endpoint
+        startpoint = ego_left.startpoint
+        pts = ego_left.pts
+        pts_pre = []
+        for pt in pts:
+            pts_pre.append([pt[0] - 20, pt[1]])
+        bev_lane_pre = Bev_Lane(0, 1, ego_left_fit, [endpoint[0] - 20, endpoint[1]], [startpoint[0] - 20, startpoint[1]], pts_pre)
+        bev_lanes_sorted.append(bev_lane_pre)
+
+    if -1 in positiontypes and 1 not in positiontypes and 0 not in positiontypes:  # only the left boundary was detected: estimate the right one
+        ego_right = bev_lanes_sorted[positiontypes.tolist()[0].index(-1)]
+        ego_right_fit = ego_right.fit
+        endpoint = ego_right.endpoint
+        startpoint = ego_right.startpoint
+        pts = ego_right.pts
+        pts_pre = []
+        for pt in pts:
+            pts_pre.append([pt[0] + 20, pt[1]])
+        bev_lane_pre = Bev_Lane(0, 1, ego_right_fit, [endpoint[0] + 20, endpoint[1]], [startpoint[0] + 20, startpoint[1]], pts_pre)
+        bev_lanes_sorted.append(bev_lane_pre)
+
+    for i in range(len(bev_lanes)):
+        bev_lanes_sorted[i].position_type = positiontypes[0, i]
+    return bev_lanes_sorted
+
+
+def line_classification(img, lines):  # dashed vs. solid classification
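+    # Sample gray levels along each polyline (and segment midpoints) in the
+    # 400-700 px band, then cluster them with DBSCAN: a single cluster means
+    # continuous paint (solid); extra clusters or outliers indicate gaps (dashed).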
+    line_cls = []  # line style per lane: solid = 1, dashed = 0
+    for line in lines:
+        colors = []
+        for i in range(len(line) - 1):
+            if line[i][1] >= 700 or line[i][1] <= 400:
+                continue
+            pt_pre = [int((line[i][0] + line[i + 1][0]) / 2), int((line[i][1] + line[i + 1][1]) / 2)]
+            colors.append([np.average(img[line[i][1], line[i][0], :])])
+            colors.append([np.average(img[pt_pre[1], pt_pre[0], :])])
+        if len(colors) >= 1:
+            colors = np.array(colors)
+            db = DBSCAN(eps=20, min_samples=2).fit(colors)
+            k = np.max(db.labels_)
+        else:
+            k = -1
+        if k == 0:
+            line_cls.append(1)
+        else:
+            line_cls.append(0)
+    return line_cls
\ No newline at end of file
diff --git a/Perception/ObjectDetection/ByteTrack/tracker/__pycache__/basetrack.cpython-38.pyc b/Perception/ObjectDetection/ByteTrack/tracker/__pycache__/basetrack.cpython-38.pyc
new file mode 100644
index 0000000..af08a40
Binary files /dev/null and b/Perception/ObjectDetection/ByteTrack/tracker/__pycache__/basetrack.cpython-38.pyc differ
diff --git a/Perception/ObjectDetection/ByteTrack/tracker/__pycache__/byte_tracker.cpython-38.pyc b/Perception/ObjectDetection/ByteTrack/tracker/__pycache__/byte_tracker.cpython-38.pyc
new file mode 100644
index 0000000..9a5cf9a
Binary files /dev/null and b/Perception/ObjectDetection/ByteTrack/tracker/__pycache__/byte_tracker.cpython-38.pyc differ
diff --git a/Perception/ObjectDetection/ByteTrack/tracker/__pycache__/kalman_filter.cpython-38.pyc b/Perception/ObjectDetection/ByteTrack/tracker/__pycache__/kalman_filter.cpython-38.pyc
new file mode 100644
index 0000000..c0aeaec
Binary files /dev/null and b/Perception/ObjectDetection/ByteTrack/tracker/__pycache__/kalman_filter.cpython-38.pyc differ
diff --git a/Perception/ObjectDetection/ByteTrack/tracker/__pycache__/matching.cpython-38.pyc b/Perception/ObjectDetection/ByteTrack/tracker/__pycache__/matching.cpython-38.pyc
new file mode 100644
index 0000000..6f9cb9d
Binary files /dev/null and b/Perception/ObjectDetection/ByteTrack/tracker/__pycache__/matching.cpython-38.pyc differ
diff --git a/Perception/ObjectDetection/ByteTrack/tracker/basetrack.py b/Perception/ObjectDetection/ByteTrack/tracker/basetrack.py
new file mode 100644
index 0000000..4fe2233
--- /dev/null
+++ b/Perception/ObjectDetection/ByteTrack/tracker/basetrack.py
@@ -0,0 +1,52 @@
+import numpy as np
+from collections import OrderedDict
+
+
+class TrackState(object):
+    New = 0
+    Tracked = 1
+    Lost = 2
+    Removed = 3
+
+
+class BaseTrack(object):
+    _count = 0
+
+    track_id = 0
+    is_activated = False
+    state = TrackState.New
+
+    history = OrderedDict()
+    features = []
+    curr_feature = None
+    score = 0
+    start_frame = 0
+    frame_id = 0
+    time_since_update = 0
+
+    # multi-camera
+    location = (np.inf, np.inf)
+
+    @property
+    def end_frame(self):
+        return self.frame_id
+
+    @staticmethod
+    def next_id():
+        BaseTrack._count += 1
+        return BaseTrack._count
+
+    def activate(self, *args):
+        raise NotImplementedError
+
+    def predict(self):
+        raise NotImplementedError
+
+    def update(self, *args, **kwargs):
+        raise NotImplementedError
+
+    def mark_lost(self):
+        self.state = TrackState.Lost
+
+    def mark_removed(self):
+        self.state = TrackState.Removed
diff --git a/Perception/ObjectDetection/ByteTrack/tracker/byte_tracker.py b/Perception/ObjectDetection/ByteTrack/tracker/byte_tracker.py
new file mode 100644
index 0000000..c1e5cdb
--- /dev/null
+++ b/Perception/ObjectDetection/ByteTrack/tracker/byte_tracker.py
@@ -0,0 +1,327 @@
+import numpy as np
+
+from .kalman_filter import KalmanFilter
+from ..tracker import matching
+from .basetrack import BaseTrack, TrackState
+
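+# ByteTrack associates detections in two passes: high-score boxes are matched
+# to existing tracks by IoU first, then low-score boxes are used to rescue the
+# still-unmatched tracks before new tracklets are spawned (see BYTETracker.update).
+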
+class STrack(BaseTrack):
+    shared_kalman = KalmanFilter()
+
+    def __init__(self, tlwh, score):
+
+        # wait activate
+        self._tlwh = np.asarray(tlwh, dtype=np.float64)
+        self.kalman_filter = None
+        self.mean, self.covariance = None, None
+        self.is_activated = False
+
+        self.score = score
+        self.tracklet_len = 0
+
+    def predict(self):
+        mean_state = self.mean.copy()
+        if self.state != TrackState.Tracked:
+            mean_state[7] = 0
+        self.mean, self.covariance = self.kalman_filter.predict(mean_state, self.covariance)
+
+    @staticmethod
+    def multi_predict(stracks):
+        if len(stracks) > 0:
+            multi_mean = np.asarray([st.mean.copy() for st in stracks])
+            multi_covariance = np.asarray([st.covariance for st in stracks])
+            for i, st in enumerate(stracks):
+                if st.state != TrackState.Tracked:
+                    multi_mean[i][7] = 0
+            multi_mean, multi_covariance = STrack.shared_kalman.multi_predict(multi_mean, multi_covariance)
+            for i, (mean, cov) in enumerate(zip(multi_mean, multi_covariance)):
+                stracks[i].mean = mean
+                stracks[i].covariance = cov
+
+    def activate(self, kalman_filter, frame_id):
+        """Start a new tracklet"""
+        self.kalman_filter = kalman_filter
+        self.track_id = self.next_id()
+        self.mean, self.covariance = self.kalman_filter.initiate(self.tlwh_to_xyah(self._tlwh))
+
+        self.tracklet_len = 0
+        self.state = TrackState.Tracked
+        if frame_id == 1:
+            self.is_activated = True
+        # self.is_activated = True
+        self.frame_id = frame_id
+        self.start_frame = frame_id
+
+    def re_activate(self, new_track, frame_id, new_id=False):
+        self.mean, self.covariance = self.kalman_filter.update(
+            self.mean, self.covariance, self.tlwh_to_xyah(new_track.tlwh)
+        )
+        self.tracklet_len = 0
+        self.state = TrackState.Tracked
+        self.is_activated = True
+        self.frame_id = frame_id
+        if new_id:
+            self.track_id = self.next_id()
+        self.score = new_track.score
+
+    def update(self, new_track, frame_id):
+        """
+        Update a matched track
+        :type new_track: STrack
+        :type frame_id: int
+        :type update_feature: bool
+        :return:
+        """
+        self.frame_id = frame_id
+        self.tracklet_len += 1
+
+        new_tlwh = new_track.tlwh
+        self.mean, self.covariance = self.kalman_filter.update(
+            self.mean, self.covariance, self.tlwh_to_xyah(new_tlwh))
+        self.state = TrackState.Tracked
+        self.is_activated = True
+
+        self.score = new_track.score
+
+    @property
+    # @jit(nopython=True)
+    def tlwh(self):
+        """Get current position in bounding box format `(top left x, top left y,
+        width, height)`.
+        """
+        if self.mean is None:
+            return self._tlwh.copy()
+        ret = self.mean[:4].copy()
+        ret[2] *= ret[3]
+        ret[:2] -= ret[2:] / 2
+        return ret
+
+    @property
+    # @jit(nopython=True)
+    def tlbr(self):
+        """Convert bounding box to format `(min x, min y, max x, max y)`, i.e.,
+        `(top left, bottom right)`.
+        """
+        ret = self.tlwh.copy()
+        ret[2:] += ret[:2]
+        return ret
+
+    @staticmethod
+    # @jit(nopython=True)
+    def tlwh_to_xyah(tlwh):
+        """Convert bounding box to format `(center x, center y, aspect ratio,
+        height)`, where the aspect ratio is `width / height`.
+ """ + ret = np.asarray(tlwh).copy() + ret[:2] += ret[2:] / 2 + ret[2] /= ret[3] + return ret + + def to_xyah(self): + return self.tlwh_to_xyah(self.tlwh) + + @staticmethod + # @jit(nopython=True) + def tlbr_to_tlwh(tlbr): + ret = np.asarray(tlbr).copy() + ret[2:] -= ret[:2] + return ret + + @staticmethod + # @jit(nopython=True) + def tlwh_to_tlbr(tlwh): + ret = np.asarray(tlwh).copy() + ret[2:] += ret[:2] + return ret + + def __repr__(self): + return 'OT_{}_({}-{})'.format(self.track_id, self.start_frame, self.end_frame) + + +class BYTETracker(object): + def __init__(self, track_thresh, track_buffer, match_thresh, mot20, frame_rate): + self.tracked_stracks = [] # type: list[STrack] + self.lost_stracks = [] # type: list[STrack] + self.removed_stracks = [] # type: list[STrack] + + self.frame_id = 0 + # self.args = args + # self.det_thresh = args.track_thresh + self.track_thresh = track_thresh + self.match_thresh = match_thresh + self.mot20 = mot20 + self.det_thresh = self.track_thresh + 0.1 + self.buffer_size = int(frame_rate / 30.0 * track_buffer) + self.max_time_lost = self.buffer_size + self.kalman_filter = KalmanFilter() + + def update(self, output_results, img_info, img_size): + self.frame_id += 1 + activated_starcks = [] + refind_stracks = [] + lost_stracks = [] + removed_stracks = [] + + if output_results.shape[1] == 5: + scores = output_results[:, 4] + bboxes = output_results[:, :4] + else: + output_results = output_results.cpu().numpy() + scores = output_results[:, 4] * output_results[:, 5] + bboxes = output_results[:, :4] # x1y1x2y2 + img_h, img_w = img_info[0], img_info[1] + scale = min(img_size[0] / float(img_h), img_size[1] / float(img_w)) + bboxes /= scale + + remain_inds = scores > self.track_thresh + inds_low = scores > 0.1 + inds_high = scores < self.track_thresh + + inds_second = np.logical_and(inds_low, inds_high) + dets_second = bboxes[inds_second] + dets = bboxes[remain_inds] + scores_keep = scores[remain_inds] + scores_second = scores[inds_second] + + if len(dets) > 0: + '''Detections''' + detections = [STrack(STrack.tlbr_to_tlwh(tlbr), s) for + (tlbr, s) in zip(dets, scores_keep)] + else: + detections = [] + + ''' Add newly detected tracklets to tracked_stracks''' + unconfirmed = [] + tracked_stracks = [] # type: list[STrack] + for track in self.tracked_stracks: + if not track.is_activated: + unconfirmed.append(track) + else: + tracked_stracks.append(track) + + ''' Step 2: First association, with high score detection boxes''' + strack_pool = joint_stracks(tracked_stracks, self.lost_stracks) + # Predict the current location with KF + STrack.multi_predict(strack_pool) + dists = matching.iou_distance(strack_pool, detections) + if not self.mot20: + dists = matching.fuse_score(dists, detections) + matches, u_track, u_detection = matching.linear_assignment(dists, thresh=self.match_thresh) + + for itracked, idet in matches: + track = strack_pool[itracked] + det = detections[idet] + if track.state == TrackState.Tracked: + track.update(detections[idet], self.frame_id) + activated_starcks.append(track) + else: + track.re_activate(det, self.frame_id, new_id=False) + refind_stracks.append(track) + + ''' Step 3: Second association, with low score detection boxes''' + # association the untrack to the low score detections + if len(dets_second) > 0: + '''Detections''' + detections_second = [STrack(STrack.tlbr_to_tlwh(tlbr), s) for + (tlbr, s) in zip(dets_second, scores_second)] + else: + detections_second = [] + r_tracked_stracks = [strack_pool[i] for i in u_track if 
strack_pool[i].state == TrackState.Tracked] + dists = matching.iou_distance(r_tracked_stracks, detections_second) + matches, u_track, u_detection_second = matching.linear_assignment(dists, thresh=0.5) + for itracked, idet in matches: + track = r_tracked_stracks[itracked] + det = detections_second[idet] + if track.state == TrackState.Tracked: + track.update(det, self.frame_id) + activated_starcks.append(track) + else: + track.re_activate(det, self.frame_id, new_id=False) + refind_stracks.append(track) + + for it in u_track: + track = r_tracked_stracks[it] + if not track.state == TrackState.Lost: + track.mark_lost() + lost_stracks.append(track) + + '''Deal with unconfirmed tracks, usually tracks with only one beginning frame''' + detections = [detections[i] for i in u_detection] + dists = matching.iou_distance(unconfirmed, detections) + if not self.mot20: + dists = matching.fuse_score(dists, detections) + matches, u_unconfirmed, u_detection = matching.linear_assignment(dists, thresh=0.7) + for itracked, idet in matches: + unconfirmed[itracked].update(detections[idet], self.frame_id) + activated_starcks.append(unconfirmed[itracked]) + for it in u_unconfirmed: + track = unconfirmed[it] + track.mark_removed() + removed_stracks.append(track) + + """ Step 4: Init new stracks""" + for inew in u_detection: + track = detections[inew] + if track.score < self.det_thresh: + continue + track.activate(self.kalman_filter, self.frame_id) + activated_starcks.append(track) + """ Step 5: Update state""" + for track in self.lost_stracks: + if self.frame_id - track.end_frame > self.max_time_lost: + track.mark_removed() + removed_stracks.append(track) + + # print('Ramained match {} s'.format(t4-t3)) + + self.tracked_stracks = [t for t in self.tracked_stracks if t.state == TrackState.Tracked] + self.tracked_stracks = joint_stracks(self.tracked_stracks, activated_starcks) + self.tracked_stracks = joint_stracks(self.tracked_stracks, refind_stracks) + self.lost_stracks = sub_stracks(self.lost_stracks, self.tracked_stracks) + self.lost_stracks.extend(lost_stracks) + self.lost_stracks = sub_stracks(self.lost_stracks, self.removed_stracks) + self.removed_stracks.extend(removed_stracks) + self.tracked_stracks, self.lost_stracks = remove_duplicate_stracks(self.tracked_stracks, self.lost_stracks) + # get scores of lost tracks + output_stracks = [track for track in self.tracked_stracks if track.is_activated] + + return output_stracks + + +def joint_stracks(tlista, tlistb): + exists = {} + res = [] + for t in tlista: + exists[t.track_id] = 1 + res.append(t) + for t in tlistb: + tid = t.track_id + if not exists.get(tid, 0): + exists[tid] = 1 + res.append(t) + return res + + +def sub_stracks(tlista, tlistb): + stracks = {} + for t in tlista: + stracks[t.track_id] = t + for t in tlistb: + tid = t.track_id + if stracks.get(tid, 0): + del stracks[tid] + return list(stracks.values()) + + +def remove_duplicate_stracks(stracksa, stracksb): + pdist = matching.iou_distance(stracksa, stracksb) + pairs = np.where(pdist < 0.15) + dupa, dupb = list(), list() + for p, q in zip(*pairs): + timep = stracksa[p].frame_id - stracksa[p].start_frame + timeq = stracksb[q].frame_id - stracksb[q].start_frame + if timep > timeq: + dupb.append(q) + else: + dupa.append(p) + resa = [t for i, t in enumerate(stracksa) if not i in dupa] + resb = [t for i, t in enumerate(stracksb) if not i in dupb] + return resa, resb diff --git a/Perception/ObjectDetection/ByteTrack/tracker/kalman_filter.py b/Perception/ObjectDetection/ByteTrack/tracker/kalman_filter.py 
new file mode 100644 index 0000000..deda8a2 --- /dev/null +++ b/Perception/ObjectDetection/ByteTrack/tracker/kalman_filter.py @@ -0,0 +1,270 @@ +# vim: expandtab:ts=4:sw=4 +import numpy as np +import scipy.linalg + + +""" +Table for the 0.95 quantile of the chi-square distribution with N degrees of +freedom (contains values for N=1, ..., 9). Taken from MATLAB/Octave's chi2inv +function and used as Mahalanobis gating threshold. +""" +chi2inv95 = { + 1: 3.8415, + 2: 5.9915, + 3: 7.8147, + 4: 9.4877, + 5: 11.070, + 6: 12.592, + 7: 14.067, + 8: 15.507, + 9: 16.919} + + +class KalmanFilter(object): + """ + A simple Kalman filter for tracking bounding boxes in image space. + + The 8-dimensional state space + + x, y, a, h, vx, vy, va, vh + + contains the bounding box center position (x, y), aspect ratio a, height h, + and their respective velocities. + + Object motion follows a constant velocity model. The bounding box location + (x, y, a, h) is taken as direct observation of the state space (linear + observation model). + + """ + + def __init__(self): + ndim, dt = 4, 1. + + # Create Kalman filter model matrices. + self._motion_mat = np.eye(2 * ndim, 2 * ndim) + for i in range(ndim): + self._motion_mat[i, ndim + i] = dt + self._update_mat = np.eye(ndim, 2 * ndim) + + # Motion and observation uncertainty are chosen relative to the current + # state estimate. These weights control the amount of uncertainty in + # the model. This is a bit hacky. + self._std_weight_position = 1. / 20 + self._std_weight_velocity = 1. / 160 + + def initiate(self, measurement): + """Create track from unassociated measurement. + + Parameters + ---------- + measurement : ndarray + Bounding box coordinates (x, y, a, h) with center position (x, y), + aspect ratio a, and height h. + + Returns + ------- + (ndarray, ndarray) + Returns the mean vector (8 dimensional) and covariance matrix (8x8 + dimensional) of the new track. Unobserved velocities are initialized + to 0 mean. + + """ + mean_pos = measurement + mean_vel = np.zeros_like(mean_pos) + mean = np.r_[mean_pos, mean_vel] + + std = [ + 2 * self._std_weight_position * measurement[3], + 2 * self._std_weight_position * measurement[3], + 1e-2, + 2 * self._std_weight_position * measurement[3], + 10 * self._std_weight_velocity * measurement[3], + 10 * self._std_weight_velocity * measurement[3], + 1e-5, + 10 * self._std_weight_velocity * measurement[3]] + covariance = np.diag(np.square(std)) + return mean, covariance + + def predict(self, mean, covariance): + """Run Kalman filter prediction step. + + Parameters + ---------- + mean : ndarray + The 8 dimensional mean vector of the object state at the previous + time step. + covariance : ndarray + The 8x8 dimensional covariance matrix of the object state at the + previous time step. + + Returns + ------- + (ndarray, ndarray) + Returns the mean vector and covariance matrix of the predicted + state. Unobserved velocities are initialized to 0 mean. 
+ + """ + std_pos = [ + self._std_weight_position * mean[3], + self._std_weight_position * mean[3], + 1e-2, + self._std_weight_position * mean[3]] + std_vel = [ + self._std_weight_velocity * mean[3], + self._std_weight_velocity * mean[3], + 1e-5, + self._std_weight_velocity * mean[3]] + motion_cov = np.diag(np.square(np.r_[std_pos, std_vel])) + + #mean = np.dot(self._motion_mat, mean) + mean = np.dot(mean, self._motion_mat.T) + covariance = np.linalg.multi_dot(( + self._motion_mat, covariance, self._motion_mat.T)) + motion_cov + + return mean, covariance + + def project(self, mean, covariance): + """Project state distribution to measurement space. + + Parameters + ---------- + mean : ndarray + The state's mean vector (8 dimensional array). + covariance : ndarray + The state's covariance matrix (8x8 dimensional). + + Returns + ------- + (ndarray, ndarray) + Returns the projected mean and covariance matrix of the given state + estimate. + + """ + std = [ + self._std_weight_position * mean[3], + self._std_weight_position * mean[3], + 1e-1, + self._std_weight_position * mean[3]] + innovation_cov = np.diag(np.square(std)) + + mean = np.dot(self._update_mat, mean) + covariance = np.linalg.multi_dot(( + self._update_mat, covariance, self._update_mat.T)) + return mean, covariance + innovation_cov + + def multi_predict(self, mean, covariance): + """Run Kalman filter prediction step (Vectorized version). + Parameters + ---------- + mean : ndarray + The Nx8 dimensional mean matrix of the object states at the previous + time step. + covariance : ndarray + The Nx8x8 dimensional covariance matrics of the object states at the + previous time step. + Returns + ------- + (ndarray, ndarray) + Returns the mean vector and covariance matrix of the predicted + state. Unobserved velocities are initialized to 0 mean. + """ + std_pos = [ + self._std_weight_position * mean[:, 3], + self._std_weight_position * mean[:, 3], + 1e-2 * np.ones_like(mean[:, 3]), + self._std_weight_position * mean[:, 3]] + std_vel = [ + self._std_weight_velocity * mean[:, 3], + self._std_weight_velocity * mean[:, 3], + 1e-5 * np.ones_like(mean[:, 3]), + self._std_weight_velocity * mean[:, 3]] + sqr = np.square(np.r_[std_pos, std_vel]).T + + motion_cov = [] + for i in range(len(mean)): + motion_cov.append(np.diag(sqr[i])) + motion_cov = np.asarray(motion_cov) + + mean = np.dot(mean, self._motion_mat.T) + left = np.dot(self._motion_mat, covariance).transpose((1, 0, 2)) + covariance = np.dot(left, self._motion_mat.T) + motion_cov + + return mean, covariance + + def update(self, mean, covariance, measurement): + """Run Kalman filter correction step. + + Parameters + ---------- + mean : ndarray + The predicted state's mean vector (8 dimensional). + covariance : ndarray + The state's covariance matrix (8x8 dimensional). + measurement : ndarray + The 4 dimensional measurement vector (x, y, a, h), where (x, y) + is the center position, a the aspect ratio, and h the height of the + bounding box. + + Returns + ------- + (ndarray, ndarray) + Returns the measurement-corrected state distribution. 
+ + """ + projected_mean, projected_cov = self.project(mean, covariance) + + chol_factor, lower = scipy.linalg.cho_factor( + projected_cov, lower=True, check_finite=False) + kalman_gain = scipy.linalg.cho_solve( + (chol_factor, lower), np.dot(covariance, self._update_mat.T).T, + check_finite=False).T + innovation = measurement - projected_mean + + new_mean = mean + np.dot(innovation, kalman_gain.T) + new_covariance = covariance - np.linalg.multi_dot(( + kalman_gain, projected_cov, kalman_gain.T)) + return new_mean, new_covariance + + def gating_distance(self, mean, covariance, measurements, + only_position=False, metric='maha'): + """Compute gating distance between state distribution and measurements. + A suitable distance threshold can be obtained from `chi2inv95`. If + `only_position` is False, the chi-square distribution has 4 degrees of + freedom, otherwise 2. + Parameters + ---------- + mean : ndarray + Mean vector over the state distribution (8 dimensional). + covariance : ndarray + Covariance of the state distribution (8x8 dimensional). + measurements : ndarray + An Nx4 dimensional matrix of N measurements, each in + format (x, y, a, h) where (x, y) is the bounding box center + position, a the aspect ratio, and h the height. + only_position : Optional[bool] + If True, distance computation is done with respect to the bounding + box center position only. + Returns + ------- + ndarray + Returns an array of length N, where the i-th element contains the + squared Mahalanobis distance between (mean, covariance) and + `measurements[i]`. + """ + mean, covariance = self.project(mean, covariance) + if only_position: + mean, covariance = mean[:2], covariance[:2, :2] + measurements = measurements[:, :2] + + d = measurements - mean + if metric == 'gaussian': + return np.sum(d * d, axis=1) + elif metric == 'maha': + cholesky_factor = np.linalg.cholesky(covariance) + z = scipy.linalg.solve_triangular( + cholesky_factor, d.T, lower=True, check_finite=False, + overwrite_b=True) + squared_maha = np.sum(z * z, axis=0) + return squared_maha + else: + raise ValueError('invalid distance metric') \ No newline at end of file diff --git a/Perception/ObjectDetection/ByteTrack/tracker/matching.py b/Perception/ObjectDetection/ByteTrack/tracker/matching.py new file mode 100644 index 0000000..97b01b1 --- /dev/null +++ b/Perception/ObjectDetection/ByteTrack/tracker/matching.py @@ -0,0 +1,179 @@ +import numpy as np +import scipy +import lap +from scipy.spatial.distance import cdist + +from cython_bbox import bbox_overlaps as bbox_ious +from ..tracker import kalman_filter + +def merge_matches(m1, m2, shape): + O,P,Q = shape + m1 = np.asarray(m1) + m2 = np.asarray(m2) + + M1 = scipy.sparse.coo_matrix((np.ones(len(m1)), (m1[:, 0], m1[:, 1])), shape=(O, P)) + M2 = scipy.sparse.coo_matrix((np.ones(len(m2)), (m2[:, 0], m2[:, 1])), shape=(P, Q)) + + mask = M1*M2 + match = mask.nonzero() + match = list(zip(match[0], match[1])) + unmatched_O = tuple(set(range(O)) - set([i for i, j in match])) + unmatched_Q = tuple(set(range(Q)) - set([j for i, j in match])) + + return match, unmatched_O, unmatched_Q + + +def _indices_to_matches(cost_matrix, indices, thresh): + matched_cost = cost_matrix[tuple(zip(*indices))] + matched_mask = (matched_cost <= thresh) + + matches = indices[matched_mask] + unmatched_a = tuple(set(range(cost_matrix.shape[0])) - set(matches[:, 0])) + unmatched_b = tuple(set(range(cost_matrix.shape[1])) - set(matches[:, 1])) + + return matches, unmatched_a, unmatched_b + + +def linear_assignment(cost_matrix, 
thresh):
+    if cost_matrix.size == 0:
+        return np.empty((0, 2), dtype=int), tuple(range(cost_matrix.shape[0])), tuple(range(cost_matrix.shape[1]))
+    matches, unmatched_a, unmatched_b = [], [], []
+    cost, x, y = lap.lapjv(cost_matrix, extend_cost=True, cost_limit=thresh)
+    for ix, mx in enumerate(x):
+        if mx >= 0:
+            matches.append([ix, mx])
+    unmatched_a = np.where(x < 0)[0]
+    unmatched_b = np.where(y < 0)[0]
+    matches = np.asarray(matches)
+    return matches, unmatched_a, unmatched_b
+
+
+def ious(atlbrs, btlbrs):
+    """
+    Compute cost based on IoU
+    :type atlbrs: list[tlbr] | np.ndarray
+    :type btlbrs: list[tlbr] | np.ndarray
+
+    :rtype ious np.ndarray
+    """
+    ious = np.zeros((len(atlbrs), len(btlbrs)), dtype=float)
+    if ious.size == 0:
+        return ious
+
+    ious = bbox_ious(
+        np.ascontiguousarray(atlbrs, dtype=float),
+        np.ascontiguousarray(btlbrs, dtype=float)
+    )
+
+    return ious
+
+
+def iou_distance(atracks, btracks):
+    """
+    Compute cost based on IoU
+    :type atracks: list[STrack]
+    :type btracks: list[STrack]
+
+    :rtype cost_matrix np.ndarray
+    """
+
+    if (len(atracks) > 0 and isinstance(atracks[0], np.ndarray)) or (len(btracks) > 0 and isinstance(btracks[0], np.ndarray)):
+        atlbrs = atracks
+        btlbrs = btracks
+    else:
+        atlbrs = [track.tlbr for track in atracks]
+        btlbrs = [track.tlbr for track in btracks]
+    _ious = ious(atlbrs, btlbrs)
+    cost_matrix = 1 - _ious
+
+    return cost_matrix
+
+def v_iou_distance(atracks, btracks):
+    """
+    Compute cost based on IoU
+    :type atracks: list[STrack]
+    :type btracks: list[STrack]
+
+    :rtype cost_matrix np.ndarray
+    """
+
+    if (len(atracks) > 0 and isinstance(atracks[0], np.ndarray)) or (len(btracks) > 0 and isinstance(btracks[0], np.ndarray)):
+        atlbrs = atracks
+        btlbrs = btracks
+    else:
+        atlbrs = [track.tlwh_to_tlbr(track.pred_bbox) for track in atracks]
+        btlbrs = [track.tlwh_to_tlbr(track.pred_bbox) for track in btracks]
+    _ious = ious(atlbrs, btlbrs)
+    cost_matrix = 1 - _ious
+
+    return cost_matrix
+
+def embedding_distance(tracks, detections, metric='cosine'):
+    """
+    :param tracks: list[STrack]
+    :param detections: list[BaseTrack]
+    :param metric:
+    :return: cost_matrix np.ndarray
+    """
+
+    cost_matrix = np.zeros((len(tracks), len(detections)), dtype=float)
+    if cost_matrix.size == 0:
+        return cost_matrix
+    det_features = np.asarray([track.curr_feat for track in detections], dtype=float)
+    #for i, track in enumerate(tracks):
+        #cost_matrix[i, :] = np.maximum(0.0, cdist(track.smooth_feat.reshape(1,-1), det_features, metric))
+    track_features = np.asarray([track.smooth_feat for track in tracks], dtype=float)
+    cost_matrix = np.maximum(0.0, cdist(track_features, det_features, metric))  # normalized features
+    return cost_matrix
+
+
+def gate_cost_matrix(kf, cost_matrix, tracks, detections, only_position=False):
+    if cost_matrix.size == 0:
+        return cost_matrix
+    gating_dim = 2 if only_position else 4
+    gating_threshold = kalman_filter.chi2inv95[gating_dim]
+    measurements = np.asarray([det.to_xyah() for det in detections])
+    for row, track in enumerate(tracks):
+        gating_distance = kf.gating_distance(
+            track.mean, track.covariance, measurements, only_position)
+        cost_matrix[row, gating_distance > gating_threshold] = np.inf
+    return cost_matrix
+
+
+def fuse_motion(kf, cost_matrix, tracks, detections, only_position=False, lambda_=0.98):
+    if cost_matrix.size == 0:
+        return cost_matrix
+    gating_dim = 2 if only_position else 4
+    gating_threshold = kalman_filter.chi2inv95[gating_dim]
+    measurements = np.asarray([det.to_xyah() for det in detections])
+    for row, track in enumerate(tracks):
+        gating_distance = kf.gating_distance(
+            track.mean, track.covariance, measurements, only_position, metric='maha')
+        cost_matrix[row, gating_distance > gating_threshold] = np.inf
+        cost_matrix[row] = lambda_ * cost_matrix[row] + (1 - lambda_) * gating_distance
+    return cost_matrix
+
+
+def fuse_iou(cost_matrix, tracks, detections):
+    if cost_matrix.size == 0:
+        return cost_matrix
+    reid_sim = 1 - cost_matrix
+    iou_dist = iou_distance(tracks, detections)
+    iou_sim = 1 - iou_dist
+    fuse_sim = reid_sim * (1 + iou_sim) / 2
+    det_scores = np.array([det.score for det in detections])
+    det_scores = np.expand_dims(det_scores, axis=0).repeat(cost_matrix.shape[0], axis=0)
+    #fuse_sim = fuse_sim * (1 + det_scores) / 2
+    fuse_cost = 1 - fuse_sim
+    return fuse_cost
+
+
+def fuse_score(cost_matrix, detections):
+    if cost_matrix.size == 0:
+        return cost_matrix
+    iou_sim = 1 - cost_matrix
+    det_scores = np.array([det.score for det in detections])
+    det_scores = np.expand_dims(det_scores, axis=0).repeat(cost_matrix.shape[0], axis=0)
+    fuse_sim = iou_sim * det_scores
+    fuse_cost = 1 - fuse_sim
+    return fuse_cost
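+
+
+# Worked example (illustrative only): fuse_score blends box overlap with detector
+# confidence. With an IoU similarity of 0.8, a detection scored 0.9 yields a fused
+# cost of 1 - 0.8 * 0.9 = 0.28, while a detection scored 0.3 raises the cost of
+# the same overlap to 1 - 0.8 * 0.3 = 0.76, so confident boxes win the assignment.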
diff --git a/Perception/ObjectDetection/__pycache__/cipv_notice.cpython-38.pyc b/Perception/ObjectDetection/__pycache__/cipv_notice.cpython-38.pyc new file mode 100644 index 0000000..ea7ce81 Binary files /dev/null and b/Perception/ObjectDetection/__pycache__/cipv_notice.cpython-38.pyc differ
diff --git a/Perception/ObjectDetection/__pycache__/postprocess.cpython-38.pyc b/Perception/ObjectDetection/__pycache__/postprocess.cpython-38.pyc new file mode 100644 index 0000000..0d3119c Binary files /dev/null and b/Perception/ObjectDetection/__pycache__/postprocess.cpython-38.pyc differ
diff --git a/Perception/ObjectDetection/__pycache__/yolov6_trt.cpython-38.pyc b/Perception/ObjectDetection/__pycache__/yolov6_trt.cpython-38.pyc new file mode 100644 index 0000000..5f036f5 Binary files /dev/null and b/Perception/ObjectDetection/__pycache__/yolov6_trt.cpython-38.pyc differ
diff --git a/Perception/ObjectDetection/cipv_notice.py b/Perception/ObjectDetection/cipv_notice.py
new file mode 100644
index 0000000..e364dc5
--- /dev/null
+++ b/Perception/ObjectDetection/cipv_notice.py
@@ -0,0 +1,60 @@
+def cipv_notice(obstacles, bev_lanes):
+    cipv = None
+    if obstacles is not None:
+        ego_left = None
+        ego_right = None
+        adjacent_left = None
+        adjacent_right = None
+        for bev_lane in bev_lanes:
+            if bev_lane.position_type == -1:
+                ego_left = bev_lane
+            elif bev_lane.position_type == 1:
+                ego_right = bev_lane
+            elif bev_lane.position_type == -2:
+                adjacent_left = bev_lane
+            elif bev_lane.position_type == 2:
+                adjacent_right = bev_lane
+        for obstacle in obstacles:
+            x = obstacle.position[0] + obstacle.position[2] / 2
+            y = obstacle.position[1] + obstacle.position[3]  # rear-center point of the box
+            if y >= 320:  # filter out objects beyond the monitored range
+                if y <= 470:
+                    if ego_left is not None and ego_right is not None:  # CIPV in the ego lane
+                        if ego_left.fit[0]*y**3 + ego_left.fit[1]*y**2 + ego_left.fit[2]*y + ego_left.fit[3] < x and ego_right.fit[0]*y**3 + ego_right.fit[1]*y**2 + ego_right.fit[2]*y + ego_right.fit[3] > x:
+                            obstacle.lane = 0
+                            if cipv is None:
+                                cipv = obstacle
+                            else:
+                                if cipv.position[1] + cipv.position[3] <= y:
+                                    cipv = obstacle
+                            continue
+                    else:
+                        if x >= 390 and x <= 410:
+                            if cipv is None:
+                                cipv = obstacle
+                            else:
+                                if cipv.position[1] + cipv.position[3] <= y:
+                                    cipv = obstacle
+                            obstacle.lane = 0
+                            continue
+                    if adjacent_left is not None and ego_left is not None:  # obstacles in the left lane
+                        if adjacent_left.fit[0]*y**3 + adjacent_left.fit[1]*y**2 + adjacent_left.fit[2]*y + adjacent_left.fit[3] < x and ego_left.fit[0]*y**3 + ego_left.fit[1]*y**2 + ego_left.fit[2]*y + ego_left.fit[3] > x:
+                            obstacle.lane = -1
+                            continue
+                    else:
+                        if x >= 370 and x <= 390:
+                            obstacle.lane = -1
+                            continue
+                    if ego_right is not None and adjacent_right is not None:  # obstacles in the right lane
+                        if ego_right.fit[0]*y**3 + ego_right.fit[1]*y**2 + ego_right.fit[2]*y + ego_right.fit[3] < x and adjacent_right.fit[0]*y**3 + adjacent_right.fit[1]*y**2 + adjacent_right.fit[2]*y + adjacent_right.fit[3] > x:
+                            obstacle.lane = 1
+                            continue
+                    else:
+                        if x >= 410 and x <= 430:
+                            obstacle.lane = 1
+                            continue
+        return obstacles, cipv
+    else:
+        return None, None
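+
+
+if __name__ == '__main__':
+    # Minimal sketch with hypothetical coefficients: each lane boundary above is a
+    # cubic fit x = f[0]*y**3 + f[1]*y**2 + f[2]*y + f[3] in BEV pixels, so a point
+    # belongs to a lane when it lies between the left and right boundary curves.
+    left_fit = [0.0, 0.0, 0.0, 390.0]   # hypothetical boundary: vertical line x = 390
+    right_fit = [0.0, 0.0, 0.0, 410.0]  # hypothetical boundary: vertical line x = 410
+    x, y = 400.0, 430.0
+    left_x = left_fit[0]*y**3 + left_fit[1]*y**2 + left_fit[2]*y + left_fit[3]
+    right_x = right_fit[0]*y**3 + right_fit[1]*y**2 + right_fit[2]*y + right_fit[3]
+    print('in ego lane:', left_x < x < right_x)  # -> True for these values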
diff --git a/Perception/ObjectDetection/postprocess.py b/Perception/ObjectDetection/postprocess.py
new file mode 100644
index 0000000..331cd5d
--- /dev/null
+++ b/Perception/ObjectDetection/postprocess.py
@@ -0,0 +1,181 @@
+import numpy as np
+import torch
+
+class Traffic_Light():  # wrapper for traffic-light information
+    def __init__(self, tlcolor_type, distance):
+        self.tlcolor = tlcolor_type  # traffic-light state
+        ###traffic-light states###
+        # green:  0
+        # red:    1
+        # yellow: 2
+        #########################
+        self.distance = distance
+
+class Obstacle:  # wrapper for obstacle information
+    def __init__(self, id, position, cls, speed, track):
+        self.id = id
+        self.position = position  # top-left position
+        self.cls = cls  # class: 0: car  1: bus/truck
+        self.speed = speed  # velocity vector [speed_x, speed_y]
+        self.track = track  # trajectory over the last five frames
+        self.lane = None
+        #####lane assignment#####
+        # left lane:  -1
+        # same lane:   0
+        # right lane:  1
+        # unknown:     None
+        ########################
+        self.cipv = None  # True if this is the closest in-path vehicle
+
+
+def postprocess(dets, CAM, CAM_BL, tracker, tracks, infer_time):  # main post-processing entry for tracking
+    objs, traffic_lights_list = position(dets, CAM, CAM_BL)
+    if len(objs) >= 1:
+        objs = np.array(objs)
+        targets = tracker.update(torch.tensor(objs), [600, 800], (600, 800))
+        tracks, obstacles = update_tracks(tracks, targets, infer_time)
+    else:
+        obstacles = None
+        objs = None
+    if len(traffic_lights_list):
+        traffic_light = tl_filter(traffic_lights_list)
+    else:
+        traffic_light = None
+    return obstacles, objs, tracks, traffic_light
+
+
+def position(dets, CAM, CAM_BL):  # localization
+    objs = []
+    traffic_lights_list = []
+    for det in dets:
+        box = np.copy(det)
+        if box[5] in (0, 1, 2, 3, 4) and box[4] >= 0.5:  # vehicles/pedestrians above the confidence threshold
+            x0 = box[0]  # left edge
+            x1 = box[2]  # right edge
+            y1 = box[3]  # bottom edge
+            if (x0+x1)/2 > 0 and (x0+x1)/2 < 180 and y1 <= 240:  # left rear-view mirror
+                position = CAM_BL.pixel_to_world([(x0 + x1) / 2, 240 - y1])[0]  # relative distance
+                if box[5] == 2:
+                    box[0] = 392 + int((position[0, 0]))
+                    box[1] = 470 + int(position[0, 1])
+                    box[2] = 400 + int((position[0, 0]))
+                    box[3] = 490 + int(position[0, 1])
+                elif box[5] == 3 or box[5] == 4:
+                    box[0] = 392 + int((position[0, 0]))
+                    box[1] = 470 + int(position[0, 1])
+                    box[2] = 400 + int((position[0, 0]))
+                    box[3] = 530 + int(position[0, 1])
+            elif (x0+x1)/2 < 1280 and (x0+x1)/2 > 1100 and y1 <= 240:  # right rear-view mirror (the left-mirror camera model is reused)
+                position = CAM_BL.pixel_to_world([(x0 + x1) / 2, 240 - y1])[0]  # relative distance
+                if box[5] == 2:
+                    box[0] = 400 + int((position[0, 0]))
+                    box[1] = 470 + int(position[0, 1])
+                    box[2] = 408 + int((position[0, 0]))
+                    box[3] = 490 + int(position[0, 1])
+                elif box[5] == 3 or box[5] == 4:
+                    box[0] = 400 + int((position[0, 0]))
+                    box[1] = 470 + int(position[0, 1])
+                    box[2] = 408 + int((position[0, 0]))
+                    box[3] = 530 + int(position[0, 1])
+            else:  # front camera
+                if box[5] == 2:
+                    position = CAM.pixel_to_world([(x0+x1)/2, 720 - y1])[0]
+                    box[0] = 396 + int((position[0, 0] / 5))
+                    box[1] = 450 - int(position[0, 1] / 1.8)
+                    box[2] = 404 + int((position[0, 0] / 5))
+                    box[3] = 470 - int(position[0, 1] / 1.8)
+                elif box[5] == 3 or box[5] == 4:
+                    position = CAM.pixel_to_world([(x0 + x1) / 2, 720 - y1])[0]
+                    box[0] = 396 + int((position[0, 0] / 5))
+                    box[1] = 410 - int(position[0, 1] / 1.8)
+                    box[2] = 404 + int((position[0, 0] / 5))
+                    box[3] = 470 - int(position[0, 1] / 1.8)
+            if x1 >= 1270 and x1-x0 >= 50:  # lateral-distance compensation for nearby objects
+                box[0] = 416
+                box[2] = 424
+            elif x0 <= 10 and x1-x0 >= 50:  # lateral-distance compensation for nearby objects
+                box[0] = 376
+                box[2] = 384
+            if box[3] <= 250 or np.abs((box[0] + box[2]) / 2 - 400) >= 50:
+                continue
+            objs.append(box)
+
+        elif box[5] in (5, 6, 7) and box[4] >= 0.95:  # traffic lights
+            s = (box[2] - box[0]) * (box[3] - box[1])  # box area
+            distance = scale_depth(s, f=0.07) * 5
+            traffic_lights_list.append([box[5]-5, distance])
+    return objs, traffic_lights_list
+
+
+def tl_filter(tl_list):  # aggregate the traffic-light detections
+    tl_list = np.array(tl_list)
+    tl_color = tl_list[:, 0].astype(np.int8)
+    tl_distance = tl_list[:, 1]
+    tl_color_filted = np.argmax(np.bincount(tl_color))  # majority vote on the color
+    tl_distance_filted = np.average(tl_distance)
+    traffic_light = Traffic_Light(tl_color_filted, tl_distance_filted)
+    return traffic_light
+
+def update_tracks(tracks, targets, infer_time):
+    ids = []
+    new_tracks = []
+    obstacles = []
+    for t in targets:
+        tlwh = t.tlwh
+        tid = t.track_id
+        ids.append(int(tid))
+        if len(tracks) == 0:
+            tracks.append([int(tid), [tlwh[0], tlwh[1], tlwh[2], tlwh[3], t.score]])
+        else:
+            for i in range(len(tracks)):
+                if tid == tracks[i][0]:
+                    tracks[i].append([tlwh[0], tlwh[1], tlwh[2], tlwh[3], t.score])
+                    break
+                if i == len(tracks) - 1:
+                    tracks.append([int(tid), [tlwh[0], tlwh[1], tlwh[2], tlwh[3], t.score]])
+
+    for tid in ids:  # keep only tracks that appear in the current frame
+        for track in tracks:
+            if tid == track[0]:
+                if len(track) >= 6:
+                    track = [track[0]] + track[-5:]  # keep the last five boxes
+                new_tracks.append(track)
+
+    if len(new_tracks) > 0:
+        for track in new_tracks:
+            pos_now = track[-1]
+            speed_x, speed_y = speed_estimation(track, infer_time)
+
+            if pos_now[3] >= 50:
+                cls = 1
+                pos_now[3] = 60
+            else:
+                cls = 0
+                pos_now[3] = 20
+            obstacle = Obstacle(track[0], pos_now, cls, [speed_x, speed_y], track[1:])  # wrap as an Obstacle
+            obstacles.append(obstacle)
+    else:
+        obstacles = None
+    return new_tracks, obstacles
+
+
+def scale_depth(s, f=0.07):  # geometric traffic-light ranging; f: focal length
+    common_size = 0.75
+    proportion = np.sqrt(common_size / (s*0.0002**2))
+    distance = (proportion+1)*f
+    return distance
+
+
+def speed_estimation(track, infer_time):  # estimate the relative velocity
+    if len(track) >= 4:
+        pts = np.array(track[1:])
+        fit = np.polyfit(pts[:, 1], pts[:, 0], 1)
+        speed_y = (track[-1][1] - track[-3][1]) / (infer_time * 2)  # speed of surrounding vehicles
+        speed_x = speed_y * fit[0]
+    elif len(track) == 3:
+        speed_y = (track[-1][1] - track[-2][1]) / infer_time
+        speed_x = (track[-1][0] - track[-2][0]) / infer_time
+    else:
+        speed_x = 0
+        speed_y = 0
+    return speed_x, speed_y
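+
+
+if __name__ == '__main__':
+    # Minimal sketch of speed_estimation. A track is [id, [x, y, w, h, score], ...];
+    # with four or more entries it fits x = k*y + b over the saved boxes and derives
+    # speed_x from the y-speed. The numbers below are hypothetical.
+    demo_track = [7, [100.0, 300.0, 20.0, 40.0, 0.9],
+                     [101.0, 304.0, 20.0, 40.0, 0.9],
+                     [102.0, 308.0, 20.0, 40.0, 0.9]]
+    print(speed_estimation(demo_track, infer_time=0.1))  # ~ (10.0, 40.0)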
diff --git a/Perception/ObjectDetection/utils/__init__.py b/Perception/ObjectDetection/utils/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/Perception/ObjectDetection/utils/__pycache__/__init__.cpython-38.pyc b/Perception/ObjectDetection/utils/__pycache__/__init__.cpython-38.pyc new file mode 100644 index 0000000..c9f1c4f Binary files /dev/null and b/Perception/ObjectDetection/utils/__pycache__/__init__.cpython-38.pyc differ
diff --git a/Perception/ObjectDetection/utils/__pycache__/utils.cpython-38.pyc b/Perception/ObjectDetection/utils/__pycache__/utils.cpython-38.pyc new file mode 100644 index 0000000..cdfe6e1 Binary files /dev/null and b/Perception/ObjectDetection/utils/__pycache__/utils.cpython-38.pyc differ
diff --git a/Perception/ObjectDetection/utils/utils.py b/Perception/ObjectDetection/utils/utils.py
new file mode 100644
index 0000000..5fd4ec0
--- /dev/null
+++ b/Perception/ObjectDetection/utils/utils.py
@@ -0,0 +1,122 @@
+import tensorrt as trt
+import pycuda.autoinit
+import pycuda.driver as cuda
+import numpy as np
+import cv2
+from ..postprocess import postprocess  # post-processing
+
+class BaseEngine(object):
+    def __init__(self, engine_path, imgsz=(640, 640)):
+        self.imgsz = imgsz
+        self.n_classes = 10
+        self.class_names = ["person", "rider", "car", "bus", "truck", "tl_green", "tl_red", "tl_yellow", "tl_none",
+                            "traffic sign"]
+        logger = trt.Logger(trt.Logger.WARNING)
+        runtime = trt.Runtime(logger)
+        trt.init_libnvinfer_plugins(logger, '')  # initialize TensorRT plugins
+        with open(engine_path, "rb") as f:
+            serialized_engine = f.read()
+        engine = runtime.deserialize_cuda_engine(serialized_engine)
+        self.context = engine.create_execution_context()
+        self.inputs, self.outputs, self.bindings = [], [], []
+        self.stream = cuda.Stream()
+        for binding in engine:
+            size = trt.volume(engine.get_binding_shape(binding))
+            dtype = trt.nptype(engine.get_binding_dtype(binding))
+            host_mem = cuda.pagelocked_empty(size, dtype)
+            device_mem = cuda.mem_alloc(host_mem.nbytes)
+            self.bindings.append(int(device_mem))
+            if engine.binding_is_input(binding):
+                self.inputs.append({'host': host_mem, 'device': device_mem})
+            else:
+                self.outputs.append({'host': host_mem, 'device': device_mem})
+
+    def infer(self, img):
+        self.inputs[0]['host'] = np.ravel(img)
+        for inp in self.inputs:  # copy the inputs to the GPU
+            cuda.memcpy_htod_async(inp['device'], inp['host'], self.stream)
+        self.context.execute_async_v2(
+            bindings=self.bindings,
+            stream_handle=self.stream.handle)  # run inference
+        for out in self.outputs:
+            cuda.memcpy_dtoh_async(out['host'], out['device'], self.stream)  # fetch the outputs from the GPU
+        self.stream.synchronize()  # synchronize the CUDA stream
+        data = [out['host'] for out in self.outputs]
+        return data
+
+    def inference(self, origin_img, CAM, CAM_BL, img_show, tracker, tracks, infer_time, conf=0.5):
+        img, ratio = preproc(origin_img, self.imgsz)
+        data = self.infer(img)
+        num, final_boxes, final_scores, final_cls_inds = data
+        final_boxes = np.reshape(final_boxes/ratio, (-1, 4))
+        dets = np.concatenate([final_boxes[:num[0]], np.array(final_scores)[:num[0]].reshape(-1, 1),
+                               np.array(final_cls_inds)[:num[0]].reshape(-1, 1)], axis=-1)
+        objs = None
+        obstacles = None
+        traffic_light = None
+        if dets is not None and len(dets):
+            obstacles, objs, tracks, traffic_light = postprocess(dets, CAM, CAM_BL, tracker, tracks, infer_time)
+            final_boxes, final_scores, final_cls_inds = dets[:, :4], dets[:, 4], dets[:, 5]
+            img_show = vis(img_show, final_boxes, final_scores, final_cls_inds, conf=conf,
+                           class_names=self.class_names)
+        return img_show, obstacles, objs, tracks, traffic_light
+
+
+def preproc(img, input_size, swap=(2, 0, 1)):
+    padded_img = np.ones((input_size[0], input_size[1], 3)) * 114.0
+    r = min(input_size[0] / img.shape[0], input_size[1] / img.shape[1])
+    resized_img = cv2.resize(
+        img,
+        (int(img.shape[1] * r), int(img.shape[0] * r)),
+        interpolation=cv2.INTER_LINEAR,
+    ).astype(np.float32)
+    padded_img[: int(img.shape[0] * r), : int(img.shape[1] * r)] = resized_img
+    padded_img = padded_img[:, :, ::-1]
+    padded_img /= 255.0
+    padded_img = padded_img.transpose(swap)
+    padded_img = np.ascontiguousarray(padded_img, dtype=np.float32)
+    return padded_img, r
+
+
+def vis(img,
boxes, scores, cls_ids, conf=0.5, class_names=None): + for i in range(len(boxes)): + box = boxes[i] + cls_id = int(cls_ids[i]) + score = scores[i] + if score < conf: + continue + x0 = int(box[0]) + y0 = int(box[1]) + x1 = int(box[2]) + y1 = int(box[3]) + + color = (_COLORS[cls_id] * 255).astype(np.uint8).tolist() + cv2.rectangle(img, (x0, y0), (x1, y1), color, 2) + ''' + #text = '{}'.format(class_names[cls_id]) + #txt_color = (0, 0, 0) if np.mean(_COLORS[cls_id]) > 0.5 else (255, 255, 255) + #font = cv2.FONT_HERSHEY_SIMPLEX + #txt_size = cv2.getTextSize(text, font, 1, 2)[0] + #txt_bk_color = (_COLORS[cls_id] * 255 * 0.7).astype(np.uint8).tolist() + #cv2.rectangle(img, (x0, y0 + 1), (x0 + txt_size[0] + 1, y0 + int(1.5 * txt_size[1])), txt_bk_color, -1) + #cv2.putText(img, text, (x0, y0 + txt_size[1]), font, 1, txt_color, thickness=2) + ''' + return img + + +_COLORS = np.array( + [ + 0.000, 0.447, 0.741, + 0.850, 0.325, 0.098, + 0.929, 0.694, 0.125, + 0.494, 0.184, 0.556, + 0.466, 0.674, 0.188, + 0.301, 0.745, 0.933, + 0.635, 0.078, 0.184, + 0.300, 0.300, 0.300, + 0.600, 0.600, 0.600, + 1.000, 0.000, 0.000, + ] +).astype(np.float32).reshape(-1, 3) + + diff --git a/Perception/ObjectDetection/yolov6_trt.py b/Perception/ObjectDetection/yolov6_trt.py new file mode 100644 index 0000000..4bee237 --- /dev/null +++ b/Perception/ObjectDetection/yolov6_trt.py @@ -0,0 +1,7 @@ +from ObjectDetection.utils.utils import BaseEngine + +class YOLOPredictor(BaseEngine): + def __init__(self, engine_path , imgsz=(640,640)): + super(YOLOPredictor, self).__init__(engine_path) + self.imgsz = imgsz # your model infer image size + self.n_classes = 10 # your model classes \ No newline at end of file diff --git a/Perception/StopLineDetection/Linedetection.py b/Perception/StopLineDetection/Linedetection.py new file mode 100644 index 0000000..d00b0df --- /dev/null +++ b/Perception/StopLineDetection/Linedetection.py @@ -0,0 +1,38 @@ +import cv2 +import numpy as np + +class Stop_Line: # 对停止线进行封装 + def __init__(self, center_y): + self.center_y = center_y + self.length = 80 + +def line_filter(img, M): + img = img[400:680, 100:1000, :] + img = cv2.warpPerspective(img, M, (900, 900)) + img = cv2.resize(img, (300, 300)) + + img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) + _, img = cv2.threshold(img, 127, 255, cv2.THRESH_BINARY) + + kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2)) + img = cv2.erode(img, kernel, iterations=1) + + img = cv2.Sobel(img, -1, 0, 2, ksize=3) + img = cv2.Canny(img, 250, 300) + img = cv2.dilate(img, kernel, iterations=1) + kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3)) + img = cv2.erode(img, kernel, iterations=1) + + stop_line = detect_stop_line(img, 0.15) + + return stop_line + +def detect_stop_line(img, conf): + sum_y = np.average(img, axis=1) / 255 + idxs = np.where(sum_y >= conf)[0] + if len(idxs.tolist()) >= 1: + stop_line_pos = 300 - np.average(idxs) + stop_line = Stop_Line(stop_line_pos/4) + else: + stop_line = None + return stop_line \ No newline at end of file diff --git a/Perception/StopLineDetection/__pycache__/Linedetection.cpython-38.pyc b/Perception/StopLineDetection/__pycache__/Linedetection.cpython-38.pyc new file mode 100644 index 0000000..591b99a Binary files /dev/null and b/Perception/StopLineDetection/__pycache__/Linedetection.cpython-38.pyc differ diff --git a/Planning/Cruise.py b/Planning/Cruise.py new file mode 100644 index 0000000..e64cfc7 --- /dev/null +++ b/Planning/Cruise.py @@ -0,0 +1,15 @@ +import numpy as np +import sys +sys.path.insert(0, 
'D:/autodrive/Control')
+from controllers.purepursuit_controller import Purepursuit
+
+
+def Cruise(vertical_pid, horizontal_pid, truck, speed_limit, nav_line):
+    vertical_pid.update_e(speed_limit - truck.speed)
+    acc = vertical_pid.get_a() + 0.5
+    # horizontal_pid.update_e(float(np.average(nav_line.pts_x[-5:])) - 400)
+    # ang = horizontal_pid.get_u() / np.pi + 0.5
+    ang = Purepursuit(truck, nav_line.pts)
+    if acc > 1: acc = 1
+    elif acc < 0: acc = 0
+    return acc, ang
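+
+# Note (sketch): the clamp at the end of Cruise() is equivalent to
+# acc = float(np.clip(acc, 0.0, 1.0)); `vertical_pid.get_a()` is assumed here to
+# be an acceleration-scaled variant of the PID's get_u() output.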
diff --git a/Planning/Decider.py b/Planning/Decider.py
new file mode 100644
index 0000000..d3f5a12
--- /dev/null
+++ b/Planning/Decider.py
@@ -0,0 +1 @@
+
diff --git a/Planning/FSMPlaning.py b/Planning/FSMPlaning.py
new file mode 100644
index 0000000..2962631
--- /dev/null
+++ b/Planning/FSMPlaning.py
@@ -0,0 +1,90 @@
+from transitions import Machine
+
+class FSMPlanner(object):
+    states = ['Cruise', 'Follow', 'Passing', 'ActiveCollisionAvoidance']  # passing is currently only performed to the left
+
+    def __init__(self, name):
+        self.name = name
+        self.kittens_rescued = 0  # decision counter
+        self.machine = Machine(model=self, states=FSMPlanner.states, initial='Cruise')  # initialize the state machine
+        self.machine.add_transition(trigger='Cruise2Follow', source='Cruise', dest='Follow')  # cruise -> follow
+        self.machine.add_transition(trigger='Follow2Cruise', source='Follow', dest='Cruise')  # follow -> cruise
+        self.machine.add_transition(trigger='Follow2Passing', source='Follow', dest='Passing')  # follow -> passing
+        self.machine.add_transition(trigger='Passing2Follow', source='Passing', dest='Follow')  # passing -> follow
+        self.machine.add_transition(trigger='Passing2Cruise', source='Passing', dest='Cruise')  # passing -> cruise
+        self.machine.add_transition(trigger='Follow2ACA', source='Follow', dest='ActiveCollisionAvoidance')  # follow -> active collision avoidance
+        self.machine.add_transition(trigger='ACA2Follow', source='ActiveCollisionAvoidance', dest='Follow')  # active collision avoidance -> follow
+        self.machine.add_transition(trigger='Passing2ACA', source='Passing', dest='ActiveCollisionAvoidance')  # passing -> active collision avoidance
+        self.machine.add_transition(trigger='ACA2Cruise', source='ActiveCollisionAvoidance', dest='Cruise')  # active collision avoidance -> cruise
+
+    def update_journal(self):  # update the decision counter
+        self.kittens_rescued += 1
+
+
+class PlanTrigger:
+    def __init__(self):
+        self.value = None
+        self.obstacles = None
+        self.lanes = None
+
+        self.ego_lane_infor = 0  # 0: nothing ahead  1: lead vehicle within following range  2: lead vehicle within collision-avoidance range
+        self.speed_infor = 0  # 0: lead vehicle below the minimum following speed  1: above the maximum following speed
+        self.near_lane_infor = [0, 0]  # [0: left lane exists / 1: no left lane, 0: right lane exists / 1: no right lane]
+        self.near_car_infor = [0, 0]  # [0: left lane clear / 1: vehicle in the left lane, 0: right lane clear / 1: vehicle in the right lane]
+        self.t_sheld = 1  # 0: lane-change time threshold not met  1: met
+
+        self.state_now = 'Cruise'
+        self.state_trigger = None
+
+    def update_infor(self, obstacles, lanes):  # refresh the basic conditions
+        self.obstacles = obstacles
+        self.ego_lane_infor = 0
+        for obstacle in self.obstacles:  # refresh the surrounding-vehicle information
+            if obstacle.lane == 0 and obstacle.position[1] + obstacle.position[3] >= 350:
+                if obstacle.speed[1] <= -1:
+                    self.speed_infor = 1
+                elif obstacle.speed[1] >= -1:
+                    self.speed_infor = 0
+                if obstacle.position[1] + obstacle.position[3] <= 420:
+                    self.ego_lane_infor = 1
+                else:
+                    self.ego_lane_infor = 2
+
+            elif obstacle.lane == -1 and obstacle.position[1] + obstacle.position[3] >= 420:
+                self.near_car_infor[0] = 1
+
+            elif obstacle.lane == 1 and obstacle.position[1] + obstacle.position[3] >= 420:
+                self.near_car_infor[1] = 1
+
+        for lane in lanes:
+            if lane.position_type == -2:
+                self.near_lane_infor[0] = 1
+            elif lane.position_type == 2:
+                self.near_lane_infor[1] = 1
+
+    def update_trigger(self, state_now, obstacles, lane):
+        self.update_infor(obstacles, lane)
+
+        self.state_now = state_now
+        if self.state_now == 'Cruise' and self.ego_lane_infor == 1 and self.speed_infor == 0:  # lead vehicle detected and the speed difference is negative
+            self.state_trigger = 'Cruise2Follow'
+        elif self.state_now == 'Follow' and (self.ego_lane_infor == 0 or (self.ego_lane_infor == 1 and self.speed_infor == 1)):
+            self.state_trigger = 'Follow2Cruise'
+        elif self.state_now == 'Follow' and self.ego_lane_infor == 2 and self.near_car_infor == [1, 1]:
+            self.state_trigger = 'Follow2ACA'
+        elif self.state_now == 'ActiveCollisionAvoidance' and self.ego_lane_infor == 1 and self.speed_infor == 0 and self.near_car_infor != [1, 1]:
+            self.state_trigger = 'ACA2Follow'
+        elif self.state_now == 'Follow' and self.near_car_infor != [1, 1] and self.t_sheld == 1:
+            self.state_trigger = 'Follow2Passing'
+        elif self.state_now == 'Passing' and self.ego_lane_infor == 1 and (self.near_lane_infor[0] != 0 or self.near_lane_infor[1] != 0):
+            self.state_trigger = 'Passing2Follow'
+        elif self.state_now == 'Passing' and self.ego_lane_infor == 0:
+            self.state_trigger = 'Passing2Cruise'
+        elif self.state_now == 'Passing' and self.ego_lane_infor == 2:
+            self.state_trigger = 'Passing2ACA'
+        elif self.state_now == 'ActiveCollisionAvoidance' and self.ego_lane_infor == 0:
+            self.state_trigger = 'ACA2Cruise'
+        else:
+            self.state_trigger = None
+        return self.state_trigger
+    # fsmplanner = FSMPlanner("autotruck")
diff --git a/Planning/Follow.py b/Planning/Follow.py
new file mode 100644
index 0000000..b82b0af
--- /dev/null
+++ b/Planning/Follow.py
@@ -0,0 +1,15 @@
+import numpy as np
+import sys
+sys.path.insert(0, 'D:/autodrive/Control')
+from controllers.fuzzy_controller import fuzzy_compute
+
+
+def Follow(cipv, vertical_fuzzy, horizontal_pid, nav_line):
+    dspeed = int(cipv.speed[1])
+    distance = 470 - int(cipv.position[1] + cipv.position[3])
+    #print(dspeed)
+    #print(distance)
+    horizontal_pid.update_e(float(np.average(nav_line.pts_x[-5:])) - 400)
+    acc = fuzzy_compute(vertical_fuzzy, dspeed, distance)
+    ang = horizontal_pid.get_u() / np.pi + 0.5
+    return acc, ang
\ No newline at end of file
diff --git a/Planning/__pycache__/Cruise.cpython-38.pyc b/Planning/__pycache__/Cruise.cpython-38.pyc new file mode 100644 index 0000000..a143784 Binary files /dev/null and b/Planning/__pycache__/Cruise.cpython-38.pyc differ
diff --git a/Planning/__pycache__/FSMPlaning.cpython-38.pyc b/Planning/__pycache__/FSMPlaning.cpython-38.pyc new file mode 100644 index 0000000..a81b553 Binary files /dev/null and b/Planning/__pycache__/FSMPlaning.cpython-38.pyc differ
diff --git a/Planning/__pycache__/Follow.cpython-38.pyc b/Planning/__pycache__/Follow.cpython-38.pyc new file mode 100644 index 0000000..037961f Binary files /dev/null and b/Planning/__pycache__/Follow.cpython-38.pyc differ
diff --git a/script/ABS.py b/script/ABS.py
new file mode 100644
index 0000000..fb3bef0
--- /dev/null
+++ b/script/ABS.py
@@ -0,0 +1,13 @@
+class TTC:
+    def __init__(self, distance):
+        self.distance = distance
+        self.history_distance = distance
+        self.relative_speed = None
+        self.TTC = None
+    def update(self, distance, delta_t):
+        self.history_distance = self.distance
+        self.distance = distance
+        self.relative_speed = (self.history_distance - self.distance)/delta_t
+        if self.relative_speed <= 0:  # not closing in: avoid a zero or negative divisor
+            self.TTC = float('inf')
+        else:
+            self.TTC = self.distance / self.relative_speed
+        return self.TTC
+
diff --git a/script/__pycache__/ABS.cpython-38.pyc b/script/__pycache__/ABS.cpython-38.pyc new file mode 100644 index 0000000..9101200 Binary files /dev/null and b/script/__pycache__/ABS.cpython-38.pyc differ diff --git
a/script/__pycache__/NAV.cpython-38.pyc b/script/__pycache__/NAV.cpython-38.pyc new file mode 100644 index 0000000..49c0255 Binary files /dev/null and b/script/__pycache__/NAV.cpython-38.pyc differ diff --git a/script/__pycache__/Tracks.cpython-38.pyc b/script/__pycache__/Tracks.cpython-38.pyc new file mode 100644 index 0000000..547bada Binary files /dev/null and b/script/__pycache__/Tracks.cpython-38.pyc differ diff --git a/script/__pycache__/bev.cpython-38.pyc b/script/__pycache__/bev.cpython-38.pyc new file mode 100644 index 0000000..5d0b931 Binary files /dev/null and b/script/__pycache__/bev.cpython-38.pyc differ diff --git a/script/__pycache__/camera.cpython-38.pyc b/script/__pycache__/camera.cpython-38.pyc new file mode 100644 index 0000000..cba78ab Binary files /dev/null and b/script/__pycache__/camera.cpython-38.pyc differ diff --git a/script/__pycache__/cipv_notice.cpython-38.pyc b/script/__pycache__/cipv_notice.cpython-38.pyc new file mode 100644 index 0000000..bfa8afc Binary files /dev/null and b/script/__pycache__/cipv_notice.cpython-38.pyc differ diff --git a/script/__pycache__/control.cpython-38.pyc b/script/__pycache__/control.cpython-38.pyc new file mode 100644 index 0000000..4ec591c Binary files /dev/null and b/script/__pycache__/control.cpython-38.pyc differ diff --git a/script/__pycache__/drive.cpython-38.pyc b/script/__pycache__/drive.cpython-38.pyc new file mode 100644 index 0000000..556c9eb Binary files /dev/null and b/script/__pycache__/drive.cpython-38.pyc differ diff --git a/script/__pycache__/grab_screen.cpython-38.pyc b/script/__pycache__/grab_screen.cpython-38.pyc new file mode 100644 index 0000000..068da62 Binary files /dev/null and b/script/__pycache__/grab_screen.cpython-38.pyc differ diff --git a/script/__pycache__/ocr_tool.cpython-38.pyc b/script/__pycache__/ocr_tool.cpython-38.pyc new file mode 100644 index 0000000..678bb75 Binary files /dev/null and b/script/__pycache__/ocr_tool.cpython-38.pyc differ diff --git a/script/camera.py b/script/camera.py new file mode 100644 index 0000000..91bc44e --- /dev/null +++ b/script/camera.py @@ -0,0 +1,95 @@ +import numpy as np + +class cam: + def __init__(self): + self.R = np.array([[1, 0, 0], + [0, 1, -11], + [0, 11, 1]]) # 机身坐标与地面坐标的转换矩阵 + self.T = [0, 0, 300] # 机身坐标与地面坐标间的平移向量 + self.f = [3000, 4000] + self.center = [640, 0] + ''' + # 已经调好的固定参数, 改坏后恢复 + self.R = [[1, 0, 0], + [0, 1, -11], + [0, 11, 1]] # 机身坐标与地面坐标的转换矩阵 + self.T = [0, 0, 300] # 机身坐标与地面坐标间的平移向量 + self.f = [3000, 4000] + self.center = [640, 0] + ''' + self.camera_intrinsic = [[self.f[0], 0, self.center[0]], + [ 0, self.f[1], self.center[1]], + [ 0, 0, 1]] # 相机内参 + self.K_inv = np.mat(self.camera_intrinsic).I # 相机内参的逆 + self.R_inv = np.mat(self.R).I + self.t = np.asmatrix(self.T).T + self.R_inv_T = np.dot(self.R_inv, np.asmatrix(self.t)) + + def update_intrinsic(self, vanish_point): + R3 = vanish_point[1][0]*np.dot(self.K_inv, vanish_point) + self.R[1, 2] = -R3[1][0]/3 + self.R[2, 1] = R3[1][0] / 3 + + def pixel_to_world(self, img_point): + coords = np.array([[img_point[0]], [img_point[1]], [1.0]]) # 像素坐标 + cam_point = np.dot(self.K_inv, coords) # 像素坐标->物理成像坐标 + cam_R_inv = np.dot(self.R_inv, cam_point) # 求zc + scale = self.R_inv_T[2][0] / cam_R_inv[2][0] + scale_world = np.multiply(scale, cam_R_inv) + world_point = np.asmatrix(scale_world) - np.asmatrix(self.R_inv_T) + return world_point.T + +class backcam_left: + def __init__(self): + self.R = [[1, 0, 0], + [0, 1, 0], + [0, 0, 1]] # 机身坐标与地面坐标的转换矩阵 + self.T = [0, 0, 100] # 机身坐标与地面坐标间的平移向量 + self.f = 
[1000, 1000] + self.center = [180, 0] + self.camera_intrinsic = [[self.f[0], 0, self.center[0]], + [ 0, self.f[1], self.center[1]], + [ 0, 0, 1]] # 相机内参 + self.K_inv = np.mat(self.camera_intrinsic).I # 相机内参的逆 + self.R_inv = np.mat(self.R).I + self.t = np.asmatrix(self.T).T + self.R_inv_T = np.dot(self.R_inv, np.asmatrix(self.t)) + + def pixel_to_world(self, img_point): + coords = np.array([[img_point[0]], [img_point[1]], [1.0]]) # 像素坐标 + cam_point = np.dot(self.K_inv, coords) # 像素坐标->物理成像坐标 + cam_R_inv = np.dot(self.R_inv, cam_point) # 求zc + scale = self.R_inv_T[2][0] / cam_R_inv[2][0] + scale_world = np.multiply(scale, cam_R_inv) + world_point = np.asmatrix(scale_world) - np.asmatrix(self.R_inv_T) + return world_point.T + +class backcam_right: + def __init__(self): + self.R = [[1, 0, 0], + [0, 1, 0], + [0, 0, 1]] # 机身坐标与地面坐标的转换矩阵 + self.T = [0, 0, 100] # 机身坐标与地面坐标间的平移向量 + self.f = [1000, 1000] + self.center = [1100, 0] + self.camera_intrinsic = [[self.f[0], 0, self.center[0]], + [ 0, self.f[1], self.center[1]], + [ 0, 0, 1]] # 相机内参 + self.K_inv = np.mat(self.camera_intrinsic).I # 相机内参的逆 + self.R_inv = np.mat(self.R).I + self.t = np.asmatrix(self.T).T + self.R_inv_T = np.dot(self.R_inv, np.asmatrix(self.t)) + + def pixel_to_world(self, img_point): + coords = np.array([[img_point[0]], [img_point[1]], [1.0]]) # 像素坐标 + cam_point = np.dot(self.K_inv, coords) # 像素坐标->物理成像坐标 + cam_R_inv = np.dot(self.R_inv, cam_point) # 求zc + scale = self.R_inv_T[2][0] / cam_R_inv[2][0] + scale_world = np.multiply(scale, cam_R_inv) + world_point = np.asmatrix(scale_world) - np.asmatrix(self.R_inv_T) + return world_point.T + + +def transposition(model, pts): + output = model(pts) + return output diff --git a/script/grab_screen.py b/script/grab_screen.py new file mode 100644 index 0000000..fa602f7 --- /dev/null +++ b/script/grab_screen.py @@ -0,0 +1,47 @@ +import numpy as np +import win32gui, win32ui, win32con + +def grab_screen(region): + hwin = win32gui.GetDesktopWindow() + left,top,x2,y2 = region + width = x2 - left + 1 + height = y2 - top + 1 + hwindc = win32gui.GetWindowDC(hwin) + srcdc = win32ui.CreateDCFromHandle(hwindc) + memdc = srcdc.CreateCompatibleDC() + bmp = win32ui.CreateBitmap() + bmp.CreateCompatibleBitmap(srcdc, width, height) + memdc.SelectObject(bmp) + memdc.BitBlt((0, 0), (width, height), srcdc, (left, top), win32con.SRCCOPY) + + signedIntsArray = bmp.GetBitmapBits(True) + img = np.frombuffer(signedIntsArray, dtype='uint8') + img.shape = (height,width,4) + + srcdc.DeleteDC() + memdc.DeleteDC() + win32gui.ReleaseDC(hwin, hwindc) + win32gui.DeleteObject(bmp.GetHandle()) + + return img + +''' +import cv2 +import time +from PIL import ImageGrab, Image +def cv2_shot(region): + img = ImageGrab.grab(bbox=region) + img_cv = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR) + return img_cv + + +last_time = time.time() +while True: + img = grab_screen(region=(0,30,800,630)) + cv2.imshow('window', img) + print('{}s to detect'.format(time.time() - last_time)) + last_time = time.time() + if cv2.waitKey(25) & 0xFF == ord('q'): + cv2.destroyAllWindows() + break +''' \ No newline at end of file diff --git a/script/main.py b/script/main.py new file mode 100644 index 0000000..e05ad93 --- /dev/null +++ b/script/main.py @@ -0,0 +1,202 @@ +''' +自动驾驶主程序 +''' +import warnings +warnings.filterwarnings('ignore') +import os, sys + +sys.path.insert(0, 'D:/autodrive/Perception') +from ObjectDetection.yolov6_trt import YOLOPredictor +from ObjectDetection.cipv_notice import cipv_notice +from 
ObjectDetection.ByteTrack.tracker.byte_tracker import BYTETracker
+from LaneDetection.clrnet_trt import CLRNet
+from StopLineDetection.Linedetection import line_filter
+#from DepthEstimation.monodepth2.monodepth import load_monodepth, depth_detect
+'''
+sys.path.insert(0,'D:/autodrive/PaddleSeg')
+from deploy.python.seg_infer import SegPredictor
+'''
+sys.path.insert(0,'D:/autodrive/Planning')
+from FSMPlaning import FSMPlanner, PlanTrigger
+from Cruise import Cruise
+from Follow import Follow
+
+sys.path.insert(0,'D:/autodrive/Control')
+from drive import driver, end, Truck
+from controllers.PID_controller import PID
+from controllers.fuzzy_controller import fuzzy_initialization
+
+sys.path.insert(0,'D:/autodrive/Navigation')
+from BevDraw import draw_bev, print_info
+from Navigation_Process import nav_process
+
+from ocr_tool import speed_detect
+from grab_screen import grab_screen
+import cv2
+import time
+import numpy as np
+import torch
+from paddleocr import PaddleOCR
+from ABS import TTC
+from camera import cam, backcam_left
+from threading import Thread
+import threading
+last_time = time.time()
+
+torch.backends.cudnn.benchmark = True
+torch.backends.cudnn.deterministic = True
+device = 'cuda:0'
+
+CAM = cam()  # front camera model
+CAM_BL = backcam_left()  # left rear-view mirror camera model
+#encoder, depth_decoder, feed_height, feed_width = load_monodepth(device)  # load the depth-estimation model
+yolopredictor = YOLOPredictor(engine_path="D:/autodrive/Perception/ObjectDetection/weights/yolov6s_bdd_300.engine")
+clrnet = CLRNet("D:/autodrive/Perception/LaneDetection/weights/llamas_dla34.engine")
+ocr = PaddleOCR(enable_mkldnn=True, use_tensorrt=True, use_angle_cls=False, lang="en", use_gpu=False, show_log=False)
+#segpredictor = SegPredictor("D:/autodrive/PaddleSeg/output/inference_model_mobile_seg/deploy.yaml")
+fsmplanner = FSMPlanner("autotruck")
+planetrigger = PlanTrigger()
+
+track_thresh = 0.4
+track_buffer = 15
+match_thresh = 0.75
+frame_rate = 5
+aspect_ratio_thresh = 1.6
+min_box_area = 10
+mot20_check = False
+vehicle_tracker = BYTETracker(track_thresh, track_buffer, match_thresh, mot20_check, frame_rate)
+
+truck = Truck()  # initialize the vehicle state model
+tracks = [[1]]
+
+horizontal_pid = PID(0.02, 0.0005, 0.01)  # lateral PID controller
+vertical_pid = PID(0.05, 0, 0.1)  # longitudinal PID controller
+vertical_fuzzy = fuzzy_initialization()  # fuzzy controller for following
+
+pts1 = np.array([[0, 0], [900, 0],
+                 [0, 280], [900, 280]], dtype=np.float32)
+pts2 = np.array([[0, 0], [900, 0],
+                 [350, 900], [550, 900]], dtype=np.float32)
+M = cv2.getPerspectiveTransform(pts1, pts2)  # perspective-transform matrix for the raw image
+
+speed = 0
+truck.ang = 0.5
+speed_limit = 0
+stop = 0
+car_stop = 0
+red_stop = 0
+state = 'Cruise'
+
+class MyThread(threading.Thread):
+    def __init__(self, func, args=()):
+        super(MyThread, self).__init__()
+        self.func = func
+        self.args = args
+    def run(self):
+        self.result = self.func(*self.args)
+    def get_result(self):
+        threading.Thread.join(self)  # wait for the thread to finish
+        try:
+            return self.result
+        except Exception:
+            return None
+
+img = cv2.cvtColor(grab_screen(region=(0, 40, 1360, 807)), cv2.COLOR_RGB2BGR)  # [768, 1360, 3] RGB image
+im0 = img.copy()
+while True:
+    if 'last_time' in locals().keys():
+        refer_time = time.time() - last_time
+    else:
+        refer_time = 0.18
+    last_time = time.time()
+
+    img = cv2.cvtColor(grab_screen(region=(0, 40, 1360, 807)), cv2.COLOR_RGB2BGR)  # [768, 1360, 3] RGB image
+    im0 = img.copy()
+
+    # environment perception
+    if np.average(img) <= 50: weather = 'dark'  # rough weather/lighting estimate
+    elif np.average(img) >= 200: weather = 'snowy'
+    else: weather = 'white'
+    '''
+    img_seg = segpredictor.run(cv2.resize(img[60:570, 185:1095, :], (1280, 720)))
+    img_seg = cv2.cvtColor(np.asarray(img_seg), cv2.COLOR_RGB2BGR)
+    cv2.imshow('test', cv2.resize(img_seg, (416, 234)))
+    '''
+
+    im1 = cv2.resize(cv2.cvtColor(img, cv2.COLOR_RGB2BGR), (1280, 720)).copy()  # canvas for the detection results
+
+    im1, obstacles, objs, tracks, traffic_light = yolopredictor.inference(cv2.resize(cv2.cvtColor(img, cv2.COLOR_RGB2BGR), (1280, 720)), CAM, CAM_BL, im1, vehicle_tracker, tracks, refer_time, conf=0.4)
+
+    if traffic_light is None:
+        im1, bev_lanes = clrnet.forward(cv2.resize(img, (1280, 720)), im1, CAM)  # expects an RGB image
+    else:
+        bev_lanes = []
+
+    obstacles, cipv = cipv_notice(obstacles, bev_lanes)
+    if traffic_light is not None:
+        if traffic_light.tlcolor == 1 or traffic_light.tlcolor == 2:  # only look for the stop line on a red or yellow light
+            stop_line = line_filter(cv2.resize(img, (1280, 720)), M)
+        else:
+            stop_line = None
+    else:
+        stop_line = None
+    '''
+    if weather == 'white' and truck.speed <= 45:
+        depth_img = depth_detect(encoder, depth_decoder, img, feed_height, feed_width)
+    '''
+
+    # navigation-map processing
+    navmap = cv2.cvtColor(img[610:740, 580:780, :], cv2.COLOR_RGB2BGR)  # crop the navigation map [130, 200, 3]
+    navmap[:, 0:50, :] = np.zeros([130, 50, 3])
+    navmap[:, 150:200, :] = np.zeros([130, 50, 3])
+    nav_line, curve_speed_limit = nav_process(navmap)
+    im1 = np.uint8(im1)
+
+    # ego-state monitoring
+    bar = cv2.cvtColor(img[750:768, 545:595, :], cv2.COLOR_RGB2BGR)  # crop the info bar [18, 50, 3]
+    speed = speed_detect(ocr, bar, truck.speed)
+    speed_limit = curve_speed_limit
+
+    # decision making
+    if obstacles is not None:
+        state_trigger = planetrigger.update_trigger(fsmplanner.state, obstacles, bev_lanes)
+        if state_trigger is not None:
+            fsmplanner.trigger(state_trigger)
+    state = fsmplanner.state
+
+    # control
+    acc = 0.5
+    ang = 0.5
+    # if state == 'Cruise':
+    acc, ang = Cruise(vertical_pid, horizontal_pid, truck, speed_limit, nav_line)
+    if (state == 'Follow' or state == 'Passing') and cipv is not None:
+        acc, ang = Follow(cipv, vertical_fuzzy, horizontal_pid, nav_line)
+
+    driver(ang, acc)
+
+    truck.update(ang, acc, refer_time, speed)
+
+    # draw the BEV view
+    bevmap = draw_bev(nav_line, obstacles, traffic_light, bev_lanes, stop_line)
+    bevmap = bevmap[160:, 160:640, :]
+    bevmap = cv2.resize(bevmap, (416, 346))
+    bevmap = print_info(bevmap, refer_time, truck, speed_limit, state, weather)
+
+    img_show = cv2.vconcat([cv2.resize(im1, (416, 234)), bevmap])
+    '''
+    if weather == 'dark' or truck.speed >= 45:
+        img_show = cv2.vconcat([cv2.resize(im1, (416, 234)),
+                                cv2.resize(bevmap, (416, 346))])
+    else:
+        img_show = cv2.vconcat([cv2.resize(im1, (416, 234)),
+                                cv2.cvtColor(cv2.resize(depth_img, (416, 234)), cv2.COLOR_GRAY2BGR),
+                                bevmap])
+    '''
+    cv2.imshow('detect', img_show)
+
+    if cv2.waitKey(25) & 0xFF == ord('q'):
+        cv2.destroyAllWindows()
+        break
+
+time.sleep(0.1)
+end()
diff --git a/script/ocr_tool.py b/script/ocr_tool.py
new file mode 100644
index 0000000..d5419fa
--- /dev/null
+++ b/script/ocr_tool.py
@@ -0,0 +1,18 @@
+from paddleocr import PaddleOCR
+import re
+import string
+
+def speed_detect(ocr, bar, speed_last):
+    text = ocr.ocr(bar, cls=False)
+    if len(text) > 0:
+        if len(text[0]) > 0:
+            speed = text[0][0][1][0]
+            speed = re.sub('[a-zA-Z]', '', speed)
+            for i in string.punctuation:
+                speed = speed.replace(i, '')
+            if ' ' in speed: speed = speed.replace(' ', '')
+            if speed == '': speed = 0
+            speed = int(speed)
+            return speed
+        return speed_last
+    return speed_last
\ No newline at end of file
diff --git a/tools/YOLOPV2onnx.py b/tools/YOLOPV2onnx.py
new file mode 100644
index 0000000..1da1af5
--- /dev/null
+++ b/tools/YOLOPV2onnx.py
@@ -0,0 +1,106 @@
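+# ONNX Runtime wrapper for a YOLOPv2-style panoptic driving model. The network is
+# assumed to expose outputs named "segmentation", "classification" and
+# "regression" (see process_output below); helpers such as transform_boxes,
+# nms_fast and the util_draw_* functions are referenced here but defined elsewhere.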
+import onnxruntime +import numpy as np +import cv2 + +class YOLOPV2onnx(): + def __init__(self, model_path, anchor_path, conf_thres=0.5, iou_thres=0.5): + # Initialize model + self.initialize_model(model_path, anchor_path, conf_thres, iou_thres) + + def __call__(self, image): + return self.estimate_road(image) + + def initialize_model(self, model_path, anchor_path, conf_thres=0.5, iou_thres=0.5): + self.session = onnxruntime.InferenceSession(model_path, providers=['CUDAExecutionProvider', 'CPUExecutionProvider']) + self.conf_thres = conf_thres + self.iou_thres = iou_thres + # Read the anchors from the file + self.anchors = np.squeeze(np.load(anchor_path)) + # Get model info + self.get_input_details() + self.get_output_details() + + def estimate_road(self, image): + input_tensor = self.prepare_input(image) + # Perform inference on the image + outputs = self.inference(input_tensor) + # Process output data + self.seg_map, self.filtered_boxes, self.filtered_scores = self.process_output(outputs) + return self.seg_map, self.filtered_boxes, self.filtered_scores + + def prepare_input(self, image): + self.img_height, self.img_width = image.shape[:2] + input_img = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) + # Resize input image + input_img = cv2.resize(input_img, (self.input_width,self.input_height)) + # Scale input pixel values to -1 to 1 + mean=[0.485, 0.456, 0.406] + std=[0.229, 0.224, 0.225] + input_img = ((input_img/ 255.0 - mean) / std) + input_img = input_img.transpose(2, 0, 1) + input_tensor = input_img[np.newaxis,:,:,:].astype(np.float32) + return input_tensor + + def inference(self, input_tensor): + # start = time.time() + outputs = self.session.run(self.output_names, {self.input_names[0]: input_tensor}) + # print(time.time() - start) + return outputs + + def process_output(self, outputs): + # Process segmentation map + seg_map = np.squeeze(np.argmax(outputs[self.output_names.index("segmentation")], axis=1)) + # Process detections + scores = np.squeeze(outputs[self.output_names.index("classification")]) + boxes = np.squeeze(outputs[self.output_names.index("regression")]) + filtered_boxes, filtered_scores = self.process_detections(scores, boxes) + return seg_map, filtered_boxes, filtered_scores + + def process_detections(self, scores, boxes): + transformed_boxes = transform_boxes(boxes, self.anchors) + + # Filter out low score detections + filtered_boxes = transformed_boxes[scores>self.conf_thres] + filtered_scores = scores[scores>self.conf_thres] + + # Resize the boxes with image size + filtered_boxes[:,[0,2]] *= self.img_width/self.input_width + filtered_boxes[:,[1,3]] *= self.img_height/self.input_height + + # Perform nms filtering + filtered_boxes, filtered_scores = nms_fast(filtered_boxes, filtered_scores, self.iou_thres) + return filtered_boxes, filtered_scores + + + def draw_segmentation(self, image, alpha = 0.5): + return util_draw_seg(self.seg_map, image, alpha) + + def draw_boxes(self, image, text=True): + return util_draw_detections(self.filtered_boxes, self.filtered_scores, image, text) + + def draw_2D(self, image, alpha = 0.5, text=True): + front_view = self.draw_segmentation(image, alpha) + return self.draw_boxes(front_view, text) + + def draw_bird_eye(self, image, horizon_points): + seg_map = self.draw_2D(image, 0.00001, text=False) + return util_draw_bird_eye_view(seg_map, horizon_points) + + def draw_all(self, image, horizon_points, alpha = 0.5): + front_view = self.draw_segmentation(image, alpha) + front_view = self.draw_boxes(front_view) + bird_eye_view = 
self.draw_bird_eye(image, horizon_points) + combined_img = np.hstack((front_view, bird_eye_view)) + return combined_img + + def get_input_details(self): + model_inputs = self.session.get_inputs() + self.input_names = [model_inputs[i].name for i in range(len(model_inputs))] + self.input_shape = model_inputs[0].shape + self.input_height = self.input_shape[2] + self.input_width = self.input_shape[3] + + def get_output_details(self): + + model_outputs = self.session.get_outputs() + self.output_names = [model_outputs[i].name for i in range(len(model_outputs))] \ No newline at end of file diff --git a/tools/resize.py b/tools/resize.py new file mode 100644 index 0000000..fe0739d --- /dev/null +++ b/tools/resize.py @@ -0,0 +1,5 @@ +import cv2 + +img = cv2.imread(r"D:/autodrive/script/test_Moment.jpg") +img = cv2.resize(img, (1280, 720)) +cv2.imwrite(r"D:/autodrive/script/test_Moment_small.jpg", img) \ No newline at end of file diff --git a/tools/test.py b/tools/test.py new file mode 100644 index 0000000..fd68a2b --- /dev/null +++ b/tools/test.py @@ -0,0 +1,22 @@ +import time + +import onnx +import numpy as np +import onnxruntime as ort + +# load onnx model +model = onnx.load_model('alexnet.onnx') +onnx.checker.check_model(model=model) +onnx.helper.printable_graph(graph=model.graph) + +# onnx_runtime val +# sess_options = ort.SessionOptions() +# sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_DISABLE_ALL +# sess_options.optimized_model_filepath = "optimized_mnist_original.onnx" +# session = ort.InferenceSession("mnist_original.onnx", sess_options) +ort_session = ort.InferenceSession("alexnet.onnx") +while 1: + start = time.time() + outputs = ort_session.run(None, {'inputs': np.random.randn(10, 3, 224, 224).astype(np.float32)}) # inputs is same to orig + + print(time.time() - start)
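+
+'''
+# Optional refinement (sketch): exclude one-time initialization from the latency
+# measurement by running a few warm-up inferences before timing.
+x = np.random.randn(10, 3, 224, 224).astype(np.float32)
+for _ in range(5):
+    ort_session.run(None, {'inputs': x})   # warm-up passes
+start = time.time()
+for _ in range(100):
+    ort_session.run(None, {'inputs': x})   # timed passes
+print((time.time() - start) / 100)         # average seconds per run
+'''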