import numpy as np
import os
import sys
import time
# Append custom modules to python path
mod_global_trajectory_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# print(mod_global_trajectory_path)
sys.path.append(mod_global_trajectory_path)
# Custom modules
import src
from interface import Receiver, Sender, unpack_inputs
class AcPmModel:
    """Interface to the algorithms needed to calculate an energy strategy (ES) for a race vehicle."""

    def __init__(self,
                 track_name: str,
                 b_visualize: bool = False,
                 ci: bool = False):
        """Class to interface the algorithms which are necessary to calculate an energy strategy for a race vehicle.

        :param track_name: name of the track where the energy strategy shall be calculated on; must be one of the
            keys of the internal track table, otherwise a ``KeyError`` is raised
        :param b_visualize: switch to visualize the energy strategy results
        :param ci: switch to be activated in the CI jobs

        :Authors:
            Thomas Herrmann <thomas.herrmann@tum.de>

        :Created on:
            01.12.2020
        """

        # --------------------------------------------------------------------------------------------------------------
        # GLOBAL ATTRIBUTES --------------------------------------------------------------------------------------------
        # --------------------------------------------------------------------------------------------------------------

        self.CI = ci  # switch for CI jobs
        self.track = track_name  # Track name
        self.track_data = None  # Dictionary containing the track information

        self.idx_meas = 0  # measurement idx in global s-coordinate
        self.dpx_meas_local = 0  # measurement idx in local solver indices
        self.idx_meas_last = 0  # measurement idx in global s-coordinates from previous iteration

        # Set switches -------------------------------------------------------------------------------------------------
        self.PLOT = True  # Plot results
        self.COMP_TO_IPOPT = True  # Plot IPOPT results additionally

        # Initialization of objects ------------------------------------------------------------------------------------
        self.DDref = src.DrivingDynamicsReference.DrivingDynamicsReference(b_visualize=b_visualize)
        self.ES = src.OnlineES.OnlineES(b_visualize=b_visualize)

        # Data of all callable tracks: Name, Number of discr. points, v0 [m/s], length [m] -----------------------------
        # The table is built per instance so that the selected entry is never shared between instances.
        track_dict = {"Berlin": {"Name": 'Berlin', "discr": 290, "v0": 20, "Length": 2318},
                      "Hong_Kong": {"Name": 'Hong_Kong', "discr": 231, "v0": 20, "Length": 1845},
                      "Modena": {"Name": 'Modena', "discr": 251, "v0": 15, "Length": 2000},
                      "Monteblanco": {"Name": 'Monteblanco', "discr": 298, "v0": 45, "Length": 2382},
                      "Paris": {"Name": 'Paris', "discr": 235, "v0": 20, "Length": 1874},
                      "Upper_Heyford": {"Name": 'Upper_Heyford', "discr": 167, "v0": 20, "Length": 1332}}

        # Select track according to args -------------------------------------------------------------------------------
        self.track_data = track_dict[self.track]

        print('[INFO] ES set up for ' + track_name + '\n')

    # ------------------------------------------------------------------------------------------------------------------
    # - DEFINE CI exit helper ------------------------------------------------------------------------------------------
    # ------------------------------------------------------------------------------------------------------------------
    def _exit_in_ci(self, stat) -> None:
        """Terminates the process when the CI switch is active; does nothing otherwise.

        :param stat: status returned by the called algorithm; ``0`` maps to exit code 0, anything else to exit code 1
        """

        if self.CI:
            sys.exit(0 if stat == 0 else 1)

    # ------------------------------------------------------------------------------------------------------------------
    # - DEFINE create_dd_ref METHOD ------------------------------------------------------------------------------------
    # ------------------------------------------------------------------------------------------------------------------
    def create_dd_ref(self) -> None:
        """Calls the calculation of the reference velocity profile.

        In CI mode the process exits afterwards with a code derived from the returned status.

        :Authors:
            Thomas Herrmann <thomas.herrmann@tum.de>

        :Created on:
            01.12.2020
        """

        stat = self.DDref.create_ref(track_data=self.track_data)
        self._exit_in_ci(stat)

    # ------------------------------------------------------------------------------------------------------------------
    # - DEFINE init_es METHOD ------------------------------------------------------------------------------------------
    # ------------------------------------------------------------------------------------------------------------------
    def init_es(self,
                x0: np.ndarray,
                num_laps: int) -> None:
        """Calls the initialization of the energy strategy.

        In CI mode the process exits afterwards with a code derived from the returned status.

        :param x0: initial values of the DAEs; NOTE: entry 1 (velocity) is overwritten in place
        :param num_laps: number of race laps

        :Authors:
            Thomas Herrmann <thomas.herrmann@tum.de>

        :Created on:
            01.12.2020
        """

        # Overwrite velocity of initial state with a feasible one ------------------------------------------------------
        # s_pm, v, t_pm, soc_batt, temp_batt, temp_mach, temp_inv, temp_cool_mi, temp_cool_b
        x0[1] = self.track_data["v0"]

        stat = self.ES.initialize_es(x0=x0,
                                     track_data=self.track_data,
                                     laps=num_laps)
        self._exit_in_ci(stat)

    # ------------------------------------------------------------------------------------------------------------------
    # - DEFINE re-optimization METHOD ----------------------------------------------------------------------------------
    # ------------------------------------------------------------------------------------------------------------------
    def re_optimize(self,
                    dp: int,
                    dp_dist: int,
                    x_meas: np.ndarray) -> None:
        """Calls the reoptimization of the energy strategy.

        :param dp: bounds on dp_th node (forwarded to the solver as ``dpx``)
        :param dp_dist: local position index of travelled vehicle in current solver instance
        :param x_meas: measurement value

        :Authors:
            Thomas Herrmann <thomas.herrmann@tum.de>

        :Created on:
            01.12.2020
        """

        # bind measurement values on the states to max. and min. constraints
        # (use this instance's solver, not a module-level object that may not exist)
        x_meas = self.ES.bind_meas_to_constraints(x_meas=x_meas)

        self.ES.recalc_es(dpx=dp,
                          dpx_dist=dp_dist,
                          x0=x_meas)

    # ------------------------------------------------------------------------------------------------------------------
    # - DEFINE get_idx METHOD ------------------------------------------------------------------------------------------
    # ------------------------------------------------------------------------------------------------------------------
    def get_idx(self,
                vp_in: dict) -> None:
        """Determines the discretization index that is closest to the measured s-coordinate.

        Updates ``idx_meas`` (global index), ``dpx_meas_local`` (difference to the index of the previous call) and
        ``idx_meas_last``.

        :param vp_in: velocity planner input; the entry ``'s_meas'`` is read

        :Authors:
            Thomas Herrmann <thomas.herrmann@tum.de>

        :Created on:
            01.03.2021
        """

        # get discrete position in NLP matching the s-coordinate of the measurement
        self.idx_meas = (np.abs(self.ES.s_track - vp_in['s_meas'])).argmin()
        print("[INFO] Triggered ES re-optimization on s-ID (new x0): ", self.idx_meas)

        # measurement ID in local solver
        self.dpx_meas_local = self.idx_meas - self.idx_meas_last

        # overwrite last measurement ID with current one for next iteration
        self.idx_meas_last = self.idx_meas
def translate_track_id(track_id: str) -> str:
    """Translates the track ID into its full name.

    Prints a message and exits the process with code 1 if the ID is not known.

    :param track_id: short ID of race track
    :return track_name: full name of race track

    :Authors:
        Thomas Herrmann <thomas.herrmann@tum.de>

    :Created on:
        01.12.2020
    """

    # short ID -> full track name
    full_names = {
        'mnt': 'Monteblanco',
        'mod': 'Modena',
        'prs': 'Paris',
        'ber': 'Berlin',
        'hok': 'Hong_Kong',
        'upp': 'Upper_Heyford',
    }

    resolved = full_names.get(track_id)
    if resolved is None:
        print('No valid track ID found. Exiting.')
        sys.exit(1)

    return resolved
if __name__ == "__main__":
    # This script section provides an example on how to use the energy strategy algorithm in this repository.
    #
    # Authors:
    #     Thomas Herrmann <thomas.herrmann@tum.de>
    #
    # Created on:
    #     01.12.2020

    # --- frequency of main loop task
    f_main_loop_Hz = 1

    b_visual = False

    # --- Check command line args --------------------------------------------------------------------------------------
    if len(sys.argv) >= 2:
        # activate CI behavior
        b_ci = bool(int(sys.argv[1]))
    else:
        b_ci = False

    # --- initialize interfaces ----------------------------------------------------------------------------------------
    zmq_rcv = Receiver.ZMQReceiver(theme="sender_imp_esim")
    zmq_snd = Sender.ZMQSender(theme='receiver_exp_vplanner')

    # --- ES handle; stays None until an 'i' (initialize) message has been processed -----------------------------------
    es = None

    # --- embedded ES main loop ----------------------------------------------------------------------------------------
    while True:

        print('ES online.')

        # --- get input from velocity planner module
        vp_input = zmq_rcv.run()

        if vp_input is not None:

            unpack_inputs.unpack(msg_in=vp_input)

            # read the phase only after unpacking the message
            phase = vp_input['phase']

            # --- Case: Calculate v_ref
            if phase == 'v':
                track_name_ = translate_track_id(vp_input['track'])
                es = AcPmModel(track_name=track_name_,
                               b_visualize=b_visual,
                               ci=b_ci)

                es.create_dd_ref()

                # drop solver object as a new solver is necessary for all future online functionality
                es = None

            # --- Case: Initialize ES
            elif phase == 'i':
                track_name_ = translate_track_id(vp_input['track'])
                es = AcPmModel(track_name=track_name_,
                               b_visualize=b_visual,
                               ci=b_ci)

                es.init_es(x0=vp_input['x0'],
                           num_laps=vp_input['num_laps'])

            # --- Case: Re-optimize ES
            elif phase == 'r':
                # a re-optimization needs an initialized ES; fail with a clear message instead of a NameError
                if es is None:
                    print('ES re-optimization requested before ES initialization! Exiting.')
                    sys.exit(1)

                # get discrete position in NLP matching the s-coordinate of the measurement
                es.get_idx(vp_in=vp_input)

                # measurement value: calculated state + measurement difference
                x_meas_ = es.ES.res.x[es.dpx_meas_local, :] + vp_input['meas_diff']

                # re-create solver with less discretization points
                es.re_optimize(dp=es.idx_meas,
                               dp_dist=es.dpx_meas_local,
                               x_meas=x_meas_)

                print("[INFO] New state at measurement point: ", es.ES.res.x[0, :])

            # --- Case: not known
            else:
                print('Specified ES phase not known! Exiting.')
                sys.exit(1)

            # --- send variable power limits to trajectory planning module
            if phase in ('i', 'r'):
                # data: global s (without last coordinate) and max. power values in between discretization points
                s_glo = es.ES.res.x[:-1, 0] + es.ES.s_track[es.idx_meas]
                zmq_snd.send(data=np.column_stack((s_glo, es.ES.res.pwr)))

        time.sleep(1 / f_main_loop_Hz)