# Source code for mpcrl.agents.common.globopt_learning_agent

from abc import ABC
from typing import Any, Optional

from gymnasium import Env

from mpcrl.agents.common.agent import ActType, ObsType

from ...core.update import UpdateStrategy
from ...optim.gradient_free_optimizer import GradientFreeOptimizer
from .learning_agent import LearningAgent, SymType


class GlobOptLearningAgent(LearningAgent[SymType, None], ABC):
    """Class for learning agents that employ gradient-free Global Optimization
    strategies (e.g., Bayesian Optimization) to learn/improve the MPC policy.

    Contrary to :class:`RlLearningAgent`, this class does not require a discount
    factor, but requires an instance of a :class:`optim.GradientFreeOptimizer` that
    adheres to the ask-tell interface, i.e., it must implement the
    :func:`optim.GradientFreeOptimizer.ask` and
    :func:`optim.GradientFreeOptimizer.tell` methods.

    Parameters
    ----------
    optimizer : GradientFreeOptimizer
        An instance of :class:`optim.GradientFreeOptimizer` optimizer to ask for a
        suggested set of parameters to try out, and later tell the value of the
        objective function for that suggested set of parameters.
    kwargs
        Additional arguments to be passed to :class:`LearningAgent`. Note: the
        following kwargs are not yet supported

        - ``"experience"``: usually, GO strategies do not require experience replay
        - ``"update_strategy"``: updates are fixed at the end of each episode.

    Raises
    ------
    ValueError
        If ``"experience"`` or ``"update_strategy"`` is passed in ``kwargs``.
    """

    def __init__(self, optimizer: GradientFreeOptimizer, **kwargs: Any) -> None:
        # reject the kwargs that this class fixes itself (see class docstring)
        for key in ("experience", "update_strategy"):
            if key in kwargs:
                raise ValueError(
                    f"{self.__class__.__name__} does not yet support `{key}` kwargs."
                )
        # NOTE(review): the optimizer is stored before the base initialiser runs;
        # presumably the base class may access it - confirm before reordering.
        self.optimizer = optimizer
        # the update strategy is hard-wired to trigger once at every episode's end
        super().__init__(update_strategy=UpdateStrategy(1, "on_episode_end"), **kwargs)
        # hand the learnable parameters (assumed to be created by the base
        # initialiser) over to the optimizer
        self.optimizer.set_learnable_parameters(self._learnable_pars)

    def train(self, *args: Any, **kwargs: Any) -> Any:
        """Asks the optimizer for an initial set of parameters, and then runs the
        base class' training.

        Parameters
        ----------
        args, kwargs
            Forwarded unchanged to the ``train`` method of the base class.

        Returns
        -------
        Any
            Whatever the ``train`` method of the base class returns.
        """
        # prime the initial value of the learnable parameters by asking it to the
        # optimizer. This is the usual way to start Global Optimization strategies,
        # whereas in RL we usually start the user's given initial values.
        self.update()
        return super().train(*args, **kwargs)

    def train_one_episode(
        self,
        env: Env[ObsType, ActType],
        episode: int,
        init_state: ObsType,
        raises: bool = True,
    ) -> float:
        """Runs the MPC on the environment for one episode with the current set of
        parameters, and then tells the optimizer the value obtained.

        Parameters
        ----------
        env : Env[ObsType, ActType]
            The environment that is stepped until the episode ends.
        episode : int
            Index of the episode, forwarded to the callbacks.
        init_state : ObsType
            State from which the episode starts.
        raises : bool, optional
            Forwarded to ``on_mpc_failure`` when the MPC solution is not
            successful. By default, ``True``.

        Returns
        -------
        float
            The sum of the rewards collected from ``env.step`` during the episode.
        """
        # simply evaluate the MPC on the env with the current set of parameters for
        # one episode, and then tell the optimizer the value of the objective
        # function
        rewards = 0.0
        state = init_state
        terminated, truncated, timestep = False, False, 0

        while not (terminated or truncated):
            action, sol = self.state_value(state, False)
            if not sol.success:
                self.on_mpc_failure(episode, timestep, sol.status, raises)

            # Gymnasium's ``step`` returns (obs, reward, terminated, truncated, info)
            state, r, terminated, truncated, _ = env.step(action)
            self.on_env_step(env, episode, timestep)

            rewards += float(r)
            timestep += 1
            # note: this callback receives the already-incremented timestep
            self.on_timestep_end(env, episode, timestep)

        # pass the parameters' values in the format the optimizer prefers
        values = (
            self._learnable_pars.value_as_dict
            if self.optimizer.prefers_dict
            else self._learnable_pars.value
        )
        self.optimizer.tell(values, rewards)
        return rewards

    def update(self) -> Optional[str]:
        """Asks the optimizer for a new set of parameters and applies it.

        Returns
        -------
        Optional[str]
            The status returned by the optimizer's ``ask`` method alongside the new
            parameters.
        """
        # simply ask the optimizer for a new set of parameters, and then update the
        # current parameters with this new set
        theta_new, status = self.optimizer.ask()
        self._learnable_pars.update_values(theta_new)
        return status