Source code for src.OnlineES

import sys
import numpy as np
import time
# custom modules
import src
from postproc import comp_ipopt_lap


[docs]class OnlineES: def __init__(self, b_visualize: bool = False): """Class containing all the functionality to initialize and recalculate the energy strategy. :param b_visualize: activate interactive plots after every optimization of the energy strategy. :Authors: Thomas Herrmann <thomas.herrmann@tum.de> :Created on: 01.12.2020 """ self.s_ref = None self.kappa_disc = None self.s_track = None self.v_ref = None self.es_model = None self.es_constraints = None self.es_solver = None self.nlp_steplen_init = None self.b_visualize = b_visualize self.res = src.GetESresults.ESres()
[docs] def initialize_es(self, x0: np.array, track_data: dict, laps: int) -> int: """Initializes the energy strategy. This is the first attempt to create an energy for a given race lap, which takes a reference velocity profile as an initial guess. You can create this reference velocity profile using the class `DrivingDynamicsReference`. :param x0: initial values of the ODEs :param track_data: dictionary containing all the necessary track information :param laps: number of race laps :return stat: return the solver status, which must be '0' for success :Authors: Thomas Herrmann <thomas.herrmann@tum.de> :Created on: 01.12.2020 """ b_save_tikz = self.b_visualize # load driving dynamics reference speed profile self.v_ref = np.loadtxt('v_ref.csv', dtype=np.float64) # -------------------------------------------------------------------------------------------------------------- # - GET TRACK DATA --------------------------------------------------------------------------------------------- # -------------------------------------------------------------------------------------------------------------- # import SINGLE race lap with variable mesh [self.s_ref, self.kappa_disc, self.s_track] = src.get_track.get_track(laps=1, TRACK=track_data) # multiples of delta_s self.s_ref = np.tile(self.s_ref, laps) # multiples of kappa_disc by removing first entry in kappa array and appending to it tmp_kappa_disc = self.kappa_disc tmp_v_ref = self.v_ref for i in range(0, laps - 1): self.kappa_disc = np.concatenate((self.kappa_disc, tmp_kappa_disc[1:])) self.v_ref = np.concatenate((self.v_ref, tmp_v_ref[1:])) # calculate global s coordinate starting from 0 s_global = np.insert(np.cumsum(self.s_ref), 0, 0) self.s_track = s_global # Get number of discretization points N = len(self.s_ref) # -------------------------------------------------------------------------------------------------------------- # SET MODEL AND SOLVER ----------------------------------------------------------------------------------------- # -------------------------------------------------------------------------------------------------------------- # initialize model and constraints self.es_model, self.es_constraints = src.def_es_model.es_model(b_dd_ref=False, b_battery_simple=False, b_inverter_simple=False, b_machine_simple=False) # set solver and its costs ------------------------------------------------------------------------------------- self.es_solver = src.def_es_solver.es_solver(model=self.es_model, constraint=self.es_constraints, time_steps=self.s_ref, hessian_method="EXACT", x0=x0) if b_save_tikz: comp_ipopt_lap.comp_ipopt_lap(s_global=s_global, v_ref_long=self.v_ref) for i in range(0, N + 1): # set the track kappa values self.es_solver.set(i, "p", np.array([self.kappa_disc[i]])) # --- set the initial guess for the state variables using v_ref self.es_solver.set(i, 'x', np.hstack((x0[0], 0.5 * self.v_ref[i], x0[2:]))) if i < N: self.es_solver.set(i, 'u', np.array([0.5 * self.es_model.f_drive_max, 0])) # sets x in acados to x0-constraint to reduce feasibility issues self.set_state_constraints(x=x0) # --- bounds on first node: set x0 manually -------------------------------------------------------------------- print('[INFO] Initializing ES ...') self.nlp_steplen_init, \ stat = \ self.solve_es(N=N, kappa_disc_recalc=self.kappa_disc, b_save_tikz=b_save_tikz) return stat
[docs] def recalc_es(self, dpx: int, dpx_dist: int, x0: np.array): """Recalculates the energy strategy. Based on current measurement data (x0), a reoptimization will take place for the remaining race distance. :param x0: initial values of the ODEs (measurement) :param dpx: index in the global s-coordinate where the vehicle currently is and the measurements were taken :param dpx_dist: local index in the global s-coordinate where the vehicle currently is and the measurements were taken :Authors: Thomas Herrmann <thomas.herrmann@tum.de> :Created on: 01.12.2020 """ b_save_tikz = self.b_visualize # -------------------------------------------------------------------------------------------------------------- # - GET REDUCED TRACK DATA ------------------------------------------------------------------------------------- # -------------------------------------------------------------------------------------------------------------- ts = time.time() # Get number of discretization points s_ref_recalc = self.s_ref[dpx:] kappa_disc_recalc = self.kappa_disc[dpx:] # length: N + 1 N = len(s_ref_recalc) # -------------------------------------------------------------------------------------------------------------- # - SET NEW SOLVER --------------------------------------------------------------------------------------------- # -------------------------------------------------------------------------------------------------------------- # set solver and its costs ------------------------------------------------------------------------------------- # delete old solver instance to free bound libraries del self.es_solver # this solver has less discretization points starting from the current position of the vehicle # TODO: try different initial NLP steplength from initialization solver self.es_solver = src.def_es_solver.es_solver(model=self.es_model, constraint=self.es_constraints, time_steps=s_ref_recalc, hessian_method="EXACT", x0=x0, nlp_steplen=self.nlp_steplen_init) # --- set ALL the available values from the previous solving process to speed up the computation --------------- for i in range(0, N + 1): # set the track kappa values for the reduced track self.es_solver.set(i, "p", np.array([kappa_disc_recalc[i]])) # states x self.es_solver.set(i, 'x', self.res.x[i + dpx_dist, :]) # controls u if i < N: u_guess = self.res.u[i + dpx_dist, :] # to enforce the complementarity constraint F_d * F_b == 0 if u_guess[0] > 0.5: u_guess[1] = 0 elif u_guess[1] < -0.5: u_guess[0] = 0 self.es_solver.set(i, 'u', u_guess) self.set_state_constraints(x=x0) print('[INFO] Resolving with measurement value ...') self.solve_es(N=N, kappa_disc_recalc=kappa_disc_recalc, b_save_tikz=b_save_tikz, dpx=dpx) print("[INFO] Re-solving time [s]:", time.time() - ts)
[docs] def set_state_constraints(self, x: np.array): """Set solver values to match the set constraints on the initial DAE value :param x: states vector on first discretization point :Authors: Thomas Herrmann <thomas.herrmann@tum.de> :Created on: 01.12.2020 """ # set solver values to match the set constraints on the initial DAE value self.es_solver.set(0, 'x', x)
[docs] def reset_state_constraints(self): # --- Reset of constraints on specific discretization point (idx: 0) """ # lower bounds self.es_solver.\ set(0, 'lbx', np.array((self.es_model.v_min, self.es_model.soc_min, self.es_model.temp_batt_min, self.es_model.temp_mach_min, self.es_model.temp_inv_min, self.es_model.temp_cool_mach_inv_min, self.es_model.temp_cool_batt_min))) # upper bounds self.es_solver.\ set(0, 'ubx', np.array((self.es_model.v_max, self.es_model.soc_max, self.es_model.temp_batt_max, self.es_model.temp_mach_max, self.es_model.temp_inv_max, self.es_model.temp_cool_mach_inv_max, self.es_model.temp_cool_batt_max))) """ pass
[docs] def bind_meas_to_constraints(self, x_meas: np.array) -> np.array: """Binds the measurement values to the linear OCP state constraints. :param x_meas: measured states vector :return x_meas: returns modified measurement values :Authors: Thomas Herrmann <thomas.herrmann@tum.de> :Created on: 01.12.2020 """ # Velocity [m/s] id_state = 1 if x_meas[id_state] > self.es_model.v_max: x_meas[id_state] = self.es_model.v_max if x_meas[id_state] < self.es_model.v_min: x_meas[id_state] = self.es_model.v_min # SOC [] id_state = 3 if x_meas[id_state] > self.es_model.soc_max: x_meas[id_state] = self.es_model.soc_max if x_meas[id_state] < self.es_model.soc_min: x_meas[id_state] = self.es_model.soc_min # T_Batt [degC] id_state = 4 if x_meas[id_state] > self.es_model.temp_batt_max: x_meas[id_state] = self.es_model.temp_batt_max if x_meas[id_state] < self.es_model.temp_batt_min: x_meas[id_state] = self.es_model.temp_batt_min # T_Machine [degC] id_state = 5 if x_meas[id_state] > self.es_model.temp_mach_max: x_meas[id_state] = self.es_model.temp_mach_max if x_meas[id_state] < self.es_model.temp_mach_min: x_meas[id_state] = self.es_model.temp_mach_min # T_Inverter [degC] id_state = 6 if x_meas[id_state] > self.es_model.temp_inv_max: x_meas[id_state] = self.es_model.temp_inv_max if x_meas[id_state] < self.es_model.temp_inv_min: x_meas[id_state] = self.es_model.temp_inv_min # T_Cool_MI [degC] id_state = 7 if x_meas[id_state] > self.es_model.temp_cool_mach_inv_max: x_meas[id_state] = self.es_model.temp_cool_mach_inv_max if x_meas[id_state] < self.es_model.temp_cool_mach_inv_min: x_meas[id_state] = self.es_model.temp_cool_mach_inv_min # T_Cool_B [degC] id_state = 8 if x_meas[id_state] > self.es_model.temp_cool_batt_max: x_meas[id_state] = self.es_model.temp_cool_batt_max if x_meas[id_state] < self.es_model.temp_cool_batt_min: x_meas[id_state] = self.es_model.temp_cool_batt_min return x_meas
[docs] def solve_es(self, N: int, kappa_disc_recalc: np.array, b_save_tikz: bool, dpx: int = 0): """Solves the OCP to provide energy strategy results. :param N: Length of discrete optimization horizon :param kappa_disc_recalc: curvature of the track, where the ES shall be solved on [rad/m] :param b_save_tikz: bool to save results as separate .tex-files :param dpx: discrete position index in the global s-coordinate frame where the vehicle currently is :Authors: Thomas Herrmann <thomas.herrmann@tum.de> :Created on: 01.12.2020 """ print("[INFO] NLP steplength:", self.es_solver.acados_ocp.solver_options.nlp_solver_step_length) # --- solve the given problem status = self.es_solver.solve() # --- First check: QP solver returned failure if status == 4: for i in range(0, N + 1): if i < N: # control guess u based on scenario. Longer run --> lower u-guess self.es_solver.set(i, 'u', np.array([0.1 * self.es_model.f_drive_max, 0])) # resolve OCP print("[INFO] ES re-solving with reduced control inputs guess ...") status = self.es_solver.solve() if status == 0: print("[INFO] ES re-solve | solving time [s]:", float(self.es_solver.get_stats('time_tot'))) print("[INFO] ES re-solve | SQP iterations:", int(self.es_solver.get_stats('sqp_iter'))) return 0 else: print("[ERROR] ES re-solve | Can't get rid of solver error! Check your problem.") nlp_steplength = 1.0 # --- Second check: Problem was solved directly if status == 0: print("[INFO] ES solver successful.") print("[INFO] ES | NLP solver status:", status) print("[INFO] ES | solving time [s]:", float(self.es_solver.get_stats('time_tot'))) print("[INFO] ES | SQP iterations:", int(self.es_solver.get_stats('sqp_iter'))) # --- Third check: NLP iterations limit elif status == 2: nlp_steplength_reduction = 0.25 nlp_steplength = self.es_solver.acados_ocp.solver_options.nlp_solver_step_length for i in range(1, int(1 / nlp_steplength_reduction)): print("[INFO] ES | solving time [s]:", float(self.es_solver.get_stats('time_tot'))) print("[INFO] ES problem seems to be challenging. Reducing NLP step length.") if self.es_solver.acados_ocp.solver_options.nlp_solver_step_length - nlp_steplength_reduction > 0.0: # reduce current NLP solver step length nlp_steplength -= nlp_steplength_reduction # set new step length in OCP solver self.es_solver.options_set('step_length', nlp_steplength) print("[INFO] NLP step length reduced to", nlp_steplength) # resolve OCP print("[INFO] Solving again to optimize ES ...") status = self.es_solver.solve() if status == 0: print("[INFO] ES | solving time [s]:", float(self.es_solver.get_stats('time_tot'))) print("[INFO] ES | SQP iterations:", int(self.es_solver.get_stats('sqp_iter'))) print("[INFO] ES solver successful.") break else: print("[INFO] ES | solving time [s]:", float(self.es_solver.get_stats('time_tot'))) else: print("[ERROR] ES problem solver failed with reduced NLP step length.") sys.exit(1) # --- Last check: Unknown status else: print("[ERROR] ES optimization failed.") self.res.convert_results(es_model=self.es_model, es_constraints=self.es_constraints, es_solver=self.es_solver, kappa_disc=kappa_disc_recalc) if self.b_visualize: # self.res.show_statistics(solver=self.es_solver) self.res.plot_results(es_model=self.es_model, s_glo_offset=self.s_track[dpx], b_save_tikz=b_save_tikz) return nlp_steplength, status
if __name__ == '__main__': pass