# Source code for examples.use_emb_es

import os
import sys
import time
from typing import Optional

import numpy as np
from scipy import interpolate

module_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(module_path)
from interface.Receiver import ZMQReceiver as Rec
from interface.Sender import ZMQSender as Snd


# Status codes related to the energy strategy (ESIM) interface.
# NOTE: ESIM_FINISHED and ESIM_UPDATED share the value 0, and ESIM_CALCULATING and ESIM_OUTDATED share the
# value 1, so the names within each pair cannot be told apart by value.
ESIM_FINISHED = 0  # not referenced in the visible part of this file
ESIM_CALCULATING = 1  # returned by VarPowerLimits.receive_esim_update() when the receiver delivered no update
ESIM_UPDATED = 0  # returned by VarPowerLimits.receive_esim_update() after a received update has been applied
ESIM_OUTDATED = 1  # not referenced in the visible part of this file


class VarPowerLimits:

    __slots__ = ('input_path',
                 '__s_var_pwr',
                 's_max_var_pwr',
                 '__P_var_pwr',
                 'f_pwr_intp',
                 'rec_recalc',
                 'snd_recalc')

    def __init__(self):
        """
        Class to store and interpolate variable power limits to be used during driving to feed into the variable
        P_max constraint.

        The limits are kept as two equally long float arrays (s coordinate [m], max. power [kW]) together with an
        interpolator ``f_pwr_intp`` built from them. Until the first update has been received, the limits are
        0 kW on s = [0, 1000] m.

        :Authors:
            Thomas Herrmann <thomas.herrmann@tum.de>

        :Created on:
            01.02.2020
        """

        # sender / receiver towards the energy strategy; both are created by init_interface_recalc()
        self.snd_recalc = None
        self.rec_recalc = None

        # s coordinate [m] (stored as float array so that every later update works on the same type)
        self.__s_var_pwr = np.array([0.0, 1000.0])
        # max. s coordinate [m] (initialized here; no method of this class modifies it afterwards)
        self.s_max_var_pwr = 0
        # max. power values [kW]
        self.__P_var_pwr = np.array([0.0, 0.0])

        # queries outside of the stored s range return NaN instead of raising (bounds_error=False)
        self.f_pwr_intp = interpolate.interp1d(self.__s_var_pwr, self.__P_var_pwr, bounds_error=False)

    def init_interface_recalc(self):
        """Creates the sender and the receiver used to communicate with the energy strategy (ESIM).

        Must be called before :meth:`trigger_recalc` or :meth:`receive_esim_update` is used, as both rely on the
        objects created here.
        """

        # --- Initialize sender to trigger ESIM recalculation
        self.snd_recalc = Snd(theme='receiver_exp_esim')

        # --- Initialize receiver for information from ESIM
        self.rec_recalc = Rec(theme='sender_imp_vplanner')

    def receive_esim_update(self):
        """Function to receive ESIM update. Call this function manually instead of a while loop in order not to
        block the velocity planner function.

        The receiver is polled once. A received update is indexed as a 2D array with the s coordinates in column 0
        and the power values in column 1. It replaces the stored limits; one point of the previous plan located
        shortly before the start of the new plan is kept to enforce a smooth transition. Negative power values are
        set to 0.

        :return: ESIM_UPDATED if an update was received and applied, ESIM_CALCULATING otherwise
        """

        # receive updates from ESIM
        update_esim = self.rec_recalc.run()

        if update_esim is None:
            return ESIM_CALCULATING

        # Copy both columns into own float arrays. Plain column slices would be views into the received array,
        # such that the clamping below would silently overwrite the received message, and an integer-typed
        # update would truncate the float anchor value inserted below.
        update_esim = np.asarray(update_esim)
        s_new = np.array(update_esim[:, 0], dtype=float)
        P_new = np.array(update_esim[:, 1], dtype=float)

        s_old = np.asarray(self.__s_var_pwr, dtype=float)
        P_old = np.asarray(self.__P_var_pwr, dtype=float)

        # match first s-coordinate of new plan into indices of old plan and step one point back (not below 0)
        idx_old = max(int(np.abs(s_old - s_new[0]).argmin()) - 1, 0)

        # point of old plan, which is a few meters behind starting s-coordinate of new plan
        s_anchor = s_old[idx_old]
        P_anchor = P_old[idx_old]

        # if the old point lies before the new first s-coordinate, reuse it to enforce smooth transitions between
        # updates
        if s_anchor < s_new[0]:
            s_new = np.insert(s_new, 0, s_anchor)
            P_new = np.insert(P_new, 0, P_anchor)

        # --- Postprocess variable power array (no negative values)
        P_new[P_new < 0] = 0

        self.__s_var_pwr = s_new
        self.__P_var_pwr = P_new

        # re-interpolate (s coordinates are taken as already sorted in ascending order)
        self.f_pwr_intp = interpolate.interp1d(self.__s_var_pwr, self.__P_var_pwr,
                                               bounds_error=False,
                                               assume_sorted=True)

        return ESIM_UPDATED

    def trigger_recalc(self,
                       s_meas: float = 1.0,
                       meas_diff: Optional[np.ndarray] = None,
                       phase: str = 'r',
                       track: str = 'mnt',
                       laps: int = 12,
                       x0: Optional[np.ndarray] = None):
        """Triggers energy strategy recalculation with measurement values.

        :param s_meas: global s-coordinate where vehicle currently is
        :param meas_diff: measurement values expressed as difference to energy strategy values at current position
            (s [m], v [m/s], t [s], soc_batt [], temp_batt, temp_mach, temp_inv, temp_cool_mi, temp_cool_b [°C]);
            defaults to nine zeros

        Only necessary for the initialization phase are the following parameters:

        :param phase: which ES phase to call: 'r', recalculation (default); 'i', init.; 'v', v-ref calculation
        :param track: which track: 'mnt', Monteblanco (default), ...
        :param laps: how many laps: any integer, 12 (default)
        :param x0: starting values; defaults to [1, 0, 0, 0.5, 35, 35, 35, 35, 35]

        :Authors:
            Thomas Herrmann <thomas.herrmann@tum.de>

        :Created on:
            01.02.2020
        """

        # Build the default arrays per call: arrays in the signature would be created only once and shared by
        # all calls (and by every message they were sent with).
        if meas_diff is None:
            meas_diff = np.array([0, 0, 0, 0, 0, 0, 0, 0, 0])
        if x0 is None:
            x0 = np.array([1, 0, 0, 0.5, 35, 35, 35, 35, 35])

        # test interface: start ESIM and v-planner afterwards: a message on the ESIM-side should appear.
        # --- v_ref:     'v' + track-ID
        # --- init.:     'i' + track-ID + number of laps + initial state x0
        # --- re-optim.: 'r' + global coordinate of measurement diff. [m] + measurement diff. [various]
        zs_data = {'phase': phase,
                   'track': track,
                   'num_laps': laps,
                   'x0': x0,
                   's_meas': s_meas,
                   'meas_diff': meas_diff}

        self.snd_recalc.send(zs_data)
if __name__ == '__main__':
    # Example: request an energy strategy initialization, wait for the result and print the interpolated power
    # limits at a few global s coordinates.

    # Object to store energy strategy results
    vpl = VarPowerLimits()

    # initialize interfaces to energy strategy
    vpl.init_interface_recalc()

    # --- trigger v-ref calculation (disabled example) -----------------------------------------------------------------
    # vpl.trigger_recalc(phase='v',
    #                    track='mnt')

    # --- trigger initialization ---------------------------------------------------------------------------------------
    # x0 = s [m], v[m/s], t[s], soc_batt [], temp_batt, temp_mach, temp_inv, temp_cool_mi, temp_cool_b [°C]
    vpl.trigger_recalc(s_meas=0.0,
                       meas_diff=np.array(0),
                       phase='i',
                       track='mnt',
                       laps=12,
                       x0=np.array([1, 0, 0, 0.5, 35, 35, 35, 35, 35]))

    s_meas_ = 500  # meters

    # --- trigger recalculation (disabled example) ---------------------------------------------------------------------
    # meas_diff = s [m], v[m/s], t[s], soc_batt [], temp_batt, temp_mach, temp_inv, temp_cool_mi, temp_cool_b [°C]
    # vpl.trigger_recalc(s_meas=s_meas_,
    #                    meas_diff=np.array([0, 0, 0, -0.02, 0, 3.0, 0, 0, 0]))

    # receive an update by the energy strategy if calculation has finished; poll once per second.
    # The status codes are plain integers and therefore compared by value, not by identity.
    while vpl.receive_esim_update() != ESIM_UPDATED:
        time.sleep(1)

    print("Power on global coordinates s_meas, 1, 2 km: ", vpl.f_pwr_intp([s_meas_, 1000, 2000]))