Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve acados mpc formulation #182

Closed
wants to merge 15 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
improve acados mpc
aghezz1 committed Dec 4, 2024
commit 9b166cc8cc72f08e47f8b5e2f469d6ecfeb8369d
197 changes: 128 additions & 69 deletions safe_control_gym/controllers/mpc/mpc_acados.py
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@

try:
from acados_template import AcadosModel, AcadosOcp, AcadosOcpSolver
from acados_template.mpc_utils import detect_constraint_structure
except ImportError as e:
print(colored(f'Error: {e}', 'red'))
print(colored('acados not installed, cannot use acados-based controller. Exiting.', 'red'))
@@ -123,14 +124,7 @@ def setup_acados_model(self) -> AcadosModel:
acados_model.u = self.model.u_sym
acados_model.name = self.env.NAME

# set up rk4 (acados need symbolic expression of dynamics, not function)
k1 = self.model.fc_func(acados_model.x, acados_model.u)
k2 = self.model.fc_func(acados_model.x + self.dt / 2 * k1, acados_model.u)
k3 = self.model.fc_func(acados_model.x + self.dt / 2 * k2, acados_model.u)
k4 = self.model.fc_func(acados_model.x + self.dt * k3, acados_model.u)
f_disc = acados_model.x + self.dt / 6 * (k1 + 2 * k2 + 2 * k3 + k4)

acados_model.disc_dyn_expr = f_disc
acados_model.f_expl_expr = self.model.fc_func(acados_model.x, acados_model.u)

# store meta information # NOTE: unit is missing
acados_model.x_labels = self.env.STATE_LABELS
@@ -162,12 +156,12 @@ def setup_acados_optimizer(self):
ocp.model = self.acados_model

# set dimensions
ocp.dims.N = self.T # prediction horizon
ocp.solver_options.N_horizon = self.T # prediction horizon

# set cost (NOTE: safe-control-gym uses quadratic cost)
ocp.cost.cost_type = 'LINEAR_LS'
ocp.cost.cost_type_e = 'LINEAR_LS'
ocp.cost.W = scipy.linalg.block_diag(self.Q, self.R)
ocp.cost.W = scipy.linalg.block_diag(self.Q/self.dt, self.R/self.dt) # scaling by dt as Q, R are given at discrete time
ocp.cost.W_e = self.Q if not self.use_lqr_gain_and_terminal_cost else self.P
ocp.cost.Vx = np.zeros((ny, nx))
ocp.cost.Vx[:nx, :nx] = np.eye(nx)
@@ -223,9 +217,9 @@ def setup_acados_optimizer(self):
# set up solver options
ocp.solver_options.qp_solver = 'PARTIAL_CONDENSING_HPIPM'
ocp.solver_options.hessian_approx = 'GAUSS_NEWTON'
ocp.solver_options.integrator_type = 'DISCRETE'
ocp.solver_options.integrator_type = 'ERK'
ocp.solver_options.nlp_solver_type = 'SQP' if not self.use_RTI else 'SQP_RTI'
ocp.solver_options.nlp_solver_max_iter = 25 if not self.use_RTI else 1
ocp.solver_options.nlp_solver_max_iter = 200
ocp.solver_options.globalization = 'MERIT_BACKTRACKING'
# ocp.solver_options.globalization = 'FUNNEL_L1PEN_LINESEARCH' if not self.use_RTI else 'MERIT_BACKTRACKING'
ocp.solver_options.tf = self.T * self.dt # prediction horizon
@@ -260,38 +254,120 @@ def processing_acados_constraints_expression(self,
ocp.constraints.ubu = self.env.constraints.input_constraints[0].upper_bounds
ocp.constraints.idxbu = idxbu # active constraints dimension
'''

ub = {'h': set_acados_constraint_bound(h_expr, 'ub', self.constraint_tol),
'h0': set_acados_constraint_bound(h0_expr, 'ub', self.constraint_tol),
'he': set_acados_constraint_bound(he_expr, 'ub', self.constraint_tol), }

lb = {'h': set_acados_constraint_bound(h_expr, 'lb'),
'h0': set_acados_constraint_bound(h0_expr, 'lb'),
'he': set_acados_constraint_bound(he_expr, 'lb'), }

# make sure all the ub and lb are 1D numpy arrays
# (see: https://discourse.acados.org/t/infeasible-qps-when-using-nonlinear-casadi-constraint-expressions/1595/5?u=mxche)
for key in ub.keys():
ub[key] = ub[key].flatten() if ub[key].ndim != 1 else ub[key]
lb[key] = lb[key].flatten() if lb[key].ndim != 1 else lb[key]
# check ub and lb dimensions
for key in ub.keys():
assert ub[key].ndim == 1, f'ub[{key}] is not 1D numpy array'
assert lb[key].ndim == 1, f'lb[{key}] is not 1D numpy array'
assert ub['h'].shape == lb['h'].shape, 'h_ub and h_lb have different shapes'

# pass the constraints to the ocp object
ocp.model.con_h_expr_0, ocp.model.con_h_expr, ocp.model.con_h_expr_e = \
h0_expr, h_expr, he_expr
ocp.dims.nh_0, ocp.dims.nh, ocp.dims.nh_e = \
h0_expr.shape[0], h_expr.shape[0], he_expr.shape[0]
# assign constraints upper and lower bounds
ocp.constraints.uh_0 = ub['h0']
ocp.constraints.lh_0 = lb['h0']
ocp.constraints.uh = ub['h']
ocp.constraints.lh = lb['h']
ocp.constraints.uh_e = ub['he']
ocp.constraints.lh_e = lb['he']
ub = {}
lb = {}
# Check if given constraints are double-sided with heuristics based on the constraint jacobian:
h_list = [h0_expr, h_expr, he_expr]
stage_type = ["initial", "path", "terminal"]
for h_s, s_type in zip(h_list, stage_type):
jac_h_expr = cs.jacobian(h_s, cs.vertcat(ocp.model.x, ocp.model.u))
jac_h_fn = cs.Function("jac_h_fn", [ocp.model.x, ocp.model.u], [jac_h_expr])
jac_eval = jac_h_fn(0, 0).full()

# Check jacobian by blocks:
nx = ocp.model.x.shape[0]
nu = ocp.model.u.shape[0]
if jac_eval.shape[0] == nx * 2:
if np.all(jac_eval[:nx, :nx] == -jac_eval[nx:, :nx]):
h_double_bounded = True
case = 1
elif jac_eval.shape[0] == nu * 2:
if np.all(jac_eval[:nu, nx:] == -jac_eval[nu:, nx:]):
h_double_bounded = True
case = 2
elif jac_eval.shape[0] == nx * 2 + nu * 2:
if np.all(jac_eval[:nx, :nx] == -jac_eval[nx:2*nx, :nx]) and \
np.all (jac_eval[nx*2:nx*2+nu, nx:] == -jac_eval[nx*2+nu:, nx:]):
h_double_bounded = True
case = 3
else:
h_double_bounded = False

if h_double_bounded:
h_fn = cs.Function("h_fn", [ocp.model.x, ocp.model.u], [h_s])
if all(jac_eval[0, :] >= 0):
start_with_ub = True
else:
start_with_ub = False
if case == 1:
if start_with_ub:
uh = np.array(-h_fn(0,0)[:nx])
lh = np.array(h_fn(0,0)[nx:])
h_s_short = h_s[:nx] + uh
else:
lh = np.array(h_fn(0,0)[:nx])
uh = np.array(-h_fn(0,0)[nx:])
h_s_short = h_s[:nx] - lh
elif case == 2:
if start_with_ub:
uh = np.array(-h_fn(0,0)[:nu])
lh = np.array(h_fn(0,0)[nu:])
h_s_short = h_s[:nu] + uh
else:
lh = np.array(h_fn(0,0)[:nu])
uh = np.array(-h_fn(0,0)[nu:])
h_s_short = h_s[:nu] - lh
elif case == 3:
if start_with_ub:
uh = np.concatenate([-h_fn(0,0)[:nx], -h_fn(0,0)[nx*2:nx*2+nu]])
lh = np.concatenate([h_fn(0,0)[nx:nx*2], h_fn(0,0)[nx*2+nu:]])
h_s_short = cs.vertcat(h_s[:nx], h_s[2*nx:nx*2+nu]) + uh
else:
lh = np.concatenate([h_fn(0,0)[:nx], h_fn(0,0)[nx*2:nx*2+nu]])
uh = np.concatenate([-h_fn(0,0)[nx:nx*2], -h_fn(0,0)[nx*2+nu:]])
h_s_short = cs.vertcat(h_s[:nx], h_s[2*nx:nx*2+nu]) - lh
if s_type == "initial":
ocp.model.con_h_expr_0 = -h_s_short
ocp.constraints.lh_0 = lh
ocp.constraints.uh_0 = uh
elif s_type == "path":
ocp.model.con_h_expr = -h_s_short
ocp.constraints.lh = lh
ocp.constraints.uh = uh
elif s_type == "terminal":
ocp.model.con_h_expr_e = -h_s_short
ocp.constraints.lh_e = lh
ocp.constraints.uh_e = uh
detect_constraint_structure(ocp.model, ocp.constraints, stage_type=s_type)
else:
if s_type == "initial":
ub.update({'h0': set_acados_constraint_bound(h0_expr, 'ub', self.constraint_tol)})
lb.update({'h0': set_acados_constraint_bound(h0_expr, 'lb')})
elif s_type == "path":
ub.update({'h': set_acados_constraint_bound(h_expr, 'ub', self.constraint_tol)})
lb.update({'h': set_acados_constraint_bound(h_expr, 'lb')})
elif s_type == "terminal":
ub.update({'he': set_acados_constraint_bound(he_expr, 'ub', self.constraint_tol)})
lb.update({'he': set_acados_constraint_bound(he_expr, 'lb')})

if ub != {}:
# make sure all the ub and lb are 1D numpy arrays
# (see: https://discourse.acados.org/t/infeasible-qps-when-using-nonlinear-casadi-constraint-expressions/1595/5?u=mxche)
for key in ub.keys():
ub[key] = ub[key].flatten() if ub[key].ndim != 1 else ub[key]
lb[key] = lb[key].flatten() if lb[key].ndim != 1 else lb[key]
# check ub and lb dimensions
for key in ub.keys():
assert ub[key].ndim == 1, f'ub[{key}] is not 1D numpy array'
assert lb[key].ndim == 1, f'lb[{key}] is not 1D numpy array'
assert ub['h'].shape == lb['h'].shape, 'h_ub and h_lb have different shapes'
# update acados ocp constraints
for key in ub.keys():
if key == 'h0':
ocp.model.con_h_expr_0 = h0_expr
ocp.dims.nh_0 = h0_expr.shape[0]
ocp.constraints.uh_0 = ub['h0']
ocp.constraints.lh_0 = lb['h0']
elif key == 'h':
ocp.model.con_h_expr = h_expr
ocp.dims.nh = h_expr.shape[0]
ocp.constraints.uh = ub['h']
ocp.constraints.lh = lb['h']
elif key == 'he':
ocp.model.con_h_expr_e = he_expr
ocp.dims.nh_e = he_expr.shape[0]
ocp.constraints.uh_e = ub['he']
ocp.constraints.lh_e = lb['he']

return ocp

@@ -344,31 +420,14 @@ def select_action(self,
self.acados_ocp_solver.set(self.T, 'yref', y_ref_e)

# solve the optimization problem
if self.use_RTI:
# preparation phase
self.acados_ocp_solver.options_set('rti_phase', 1)
status = self.acados_ocp_solver.solve()

# feedback phase
self.acados_ocp_solver.options_set('rti_phase', 2)
status = self.acados_ocp_solver.solve()

if status not in [0, 2]:
self.acados_ocp_solver.print_statistics()
raise Exception(f'acados returned status {status}. Exiting.')
if status == 2:
print(f'acados returned status {status}. ')

action = self.acados_ocp_solver.get(0, 'u')

elif not self.use_RTI:
status = self.acados_ocp_solver.solve()
if status not in [0, 2]:
self.acados_ocp_solver.print_statistics()
raise Exception(f'acados returned status {status}. Exiting.')
if status == 2:
print(f'acados returned status {status}. ')
action = self.acados_ocp_solver.get(0, 'u')
status = self.acados_ocp_solver.solve()
if status not in [0, 2]:
self.acados_ocp_solver.print_statistics()
raise Exception(f'acados returned status {status}. Exiting.')
if status == 2:
self.acados_ocp_solver.print_statistics()
print(f'acados returned status {status}. ')
action = self.acados_ocp_solver.get(0, 'u')

# get the open-loop solution
if self.x_prev is None and self.u_prev is None: