Source code for mpcrl.agents.lstd_q_learning

import sys
from collections.abc import Collection, Iterable
from functools import partial
from typing import Callable, Generic, Literal, Optional, SupportsFloat, Union

import casadi as cs
import numpy as np
import numpy.typing as npt
from csnlp import Solution
from csnlp.wrappers import Mpc, NlpSensitivity
from gymnasium import Env

if sys.version_info >= (3, 10):
    from typing import TypeAlias
else:
    from typing_extensions import TypeAlias

from ..core.experience import ExperienceReplay
from ..core.exploration import ExplorationStrategy
from ..core.parameters import LearnableParametersDict
from ..core.update import UpdateStrategy
from ..core.warmstart import WarmStartStrategy
from ..optim.gradient_based_optimizer import GradientBasedOptimizer
from .common.agent import ActType, ObsType, SymType
from .common.rl_learning_agent import LrType, RlLearningAgent

# the experience buffer contains the gradient and, possibly, the hessian of the Bellman
# residuals w.r.t. the learnable parameters theta
ExpType: TypeAlias = tuple[
    npt.NDArray[np.floating], tuple[npt.NDArray[np.floating], npt.NDArray[np.floating]]
]


[docs] class LstdQLearningAgent( RlLearningAgent[SymType, ExpType, LrType], Generic[SymType, LrType] ): r"""Second-order Least-Squares Temporal Difference (LSTD) Q-learning agent, as first proposed in a simpler format in :cite:`gros_datadriven_2020`, and then in :cite:`esfahani_approximate_2021`. The Q-learning agent uses an MPC controller as policy provider and function approximation, and adjusts its parametrization according to the temporal-difference error, with the goal of improving the policy, in an indirect fashion by learning the action value function. Parameters ---------- mpc : :class:`csnlp.wrappers.Mpc` or tuple of :class:`csnlp.wrappers.Mpc` The MPC controller used as policy provider by this agent. If a tuple, the first entry is used to create the approximation of the state function :math:`V_\theta(s)` and the second for that of :math:`Q_\theta(s,a)`. Otherwise, the instance is modified in place to create both approximations, so it is recommended not to modify it further after initialization of the agent. Moreover, some parameter and constraint names will need to be created, so an error is thrown if these names are already in use in the mpc. update_strategy : UpdateStrategy or int The strategy used to decide which frequency to update the mpc parameters with. If an ``int`` is passed, then the default strategy that updates every ``n`` env's steps is used (where ``n`` is the argument passed); otherwise, an instance of :class:`core.update.UpdateStrategy` can be passed to specify the desired strategy in more details. discount_factor : float In RL, the factor that discounts future rewards in favor of immediate rewards. Usually denoted as :math:`\gamma`. It should satisfy :math:`\gamma \in (0, 1]`. optimizer : GradientBasedOptimizer A gradient-based optimizer (e.g., :class:`optim.GradientDescent`) to compute the updates of the learnable parameters, based on the current gradient-based RL algorithm. learnable_parameters : :class:`core.parameters.LearnableParametersDict` A special dict containing the learnable parameters of the MPC (usually referred to as :math:`\theta`), together with their bounds and values. This dict is complementary to :attr:`fixed_parameters`, which contains the MPC parameters that are not learnt by the agent. fixed_parameters : dict of (str, array_like) or collection of, optional A dict (or collection of dict, in case of the ``mpc`` wrapping an underlying :class:`csnlp.multistart.MultistartNlp` instance) whose keys are the names of the MPC parameters and the values are their corresponding values. Use this to specify fixed parameters, that is, non-learnable. If ``None``, then no fixed parameter is assumed. exploration : :class:`core.exploration.ExplorationStrategy`, optional Exploration strategy for inducing exploration in the online MPC policy. By default ``None``, in which case :class:`core.exploration.NoExploration` is used. experience : int or ExperienceReplay, optional The container for experience replay memory. If ``None`` is passed, then a memory with unitary length is created, i.e., it keeps only the latest memory transition. If an integer ``n`` is passed, then a memory with the length ``n`` is created and with sample size ``n``. Otherwise, pass an instance of :class:`core.experience.ExperienceReplay` to specify the requirements in more details. warmstart : "last" or "last-successful" or WarmStartStrategy, optional The warmstart strategy for the MPC's NLP. If ``"last-successful"``, the last successful solution is used to warmstart the solver for the next iteration. If ``"last"``, the last solution is used, regardless of success or failure. Furthermore, an instance of :class:`core.warmstart.WarmStartStrategy` can be passed to specify a strategy for generating multiple warmstart points for the MPC's NLP instance. This is useful to generate multiple initial conditions for highly non-convex, nonlinear problems. This feature can only be used with an MPC that has an underlying multistart NLP problem (see :mod:`csnlp.multistart`). hessian_type : {"approx", "full"}, optional The type of hessian to use in this (potentially) second-order algorithm. If the provided ``optimizer`` is first-order only, then this option is ignored. Otherwise, if ``"approx"``, a computationally lighter approximation of full Hessian is used; otherwise, the full hessian is computed, but this is usually much more expensive. record_td_errors: bool, optional If ``True``, the TD errors are recorded in the field :attr:`td_errors`, which otherwise is ``None``. By default, does not record them. use_last_action_on_fail : bool, optional In case the MPC solver fails - if ``False``, the action from the last solver's iteration is returned anyway (though suboptimal) - if ``True``, the action from the last successful call to the MPC is returned instead (if the MPC has been solved at least once successfully). By default, ``False``. remove_bounds_on_initial_action : bool, optional When ``True``, the upper and lower bounds on the initial action are removed in the action-value function approximator :math:`Q_\theta(s,a)` since the first action is constrained to be equal to the provided action :math:`a`. This is useful to avoid issues in the LICQ of the NLP. However, it can lead to numerical problems. By default, ``False``. name : str, optional Name of the agent. If ``None``, one is automatically created from a counter of the class' instancies. """ def __init__( self, mpc: Mpc[SymType] | tuple[Mpc[SymType], Mpc[SymType]], update_strategy: Union[int, UpdateStrategy], discount_factor: float, optimizer: GradientBasedOptimizer, learnable_parameters: LearnableParametersDict, fixed_parameters: Union[ None, dict[str, npt.ArrayLike], Collection[dict[str, npt.ArrayLike]] ] = None, exploration: Optional[ExplorationStrategy] = None, experience: Union[None, int, ExperienceReplay[ExpType]] = None, warmstart: Union[ Literal["last", "last-successful"], WarmStartStrategy ] = "last-successful", hessian_type: Literal["approx", "full"] = "approx", record_td_errors: bool = False, use_last_action_on_fail: bool = False, remove_bounds_on_initial_action: bool = False, name: Optional[str] = None, ) -> None: super().__init__( mpc=mpc, update_strategy=update_strategy, discount_factor=discount_factor, learnable_parameters=learnable_parameters, optimizer=optimizer, fixed_parameters=fixed_parameters, exploration=exploration, experience=experience, warmstart=warmstart, use_last_action_on_fail=use_last_action_on_fail, remove_bounds_on_initial_action=remove_bounds_on_initial_action, name=name, ) self._sensitivity = self._init_sensitivity(hessian_type) self.td_errors: Optional[list[float]] = [] if record_td_errors else None
[docs] def update(self) -> Optional[str]: if len(self.experience) <= 0: return "Experience buffer empty." sample = self.experience.sample() if self.optimizer.order == 1: gradient = np.mean(list(sample), 0) return self.optimizer.update(gradient) gradients, hessians = zip(*sample) gradient = np.mean(gradients, 0) hessian = np.mean(hessians, 0) return self.optimizer.update(gradient, hessian)
[docs] def train_one_episode( self, env: Env[ObsType, ActType], episode: int, init_state: ObsType, raises: bool = True, ) -> float: truncated = terminated = False timestep = 0 rewards = 0.0 state = init_state action_space = getattr(env, "action_space", None) # solve for the first action action, solV = self.state_value(state, False, action_space=action_space) if not solV.success: self.on_mpc_failure(episode, None, solV.status, raises) while not (truncated or terminated): # compute Q(s,a) solQ = self.action_value(state, action) # step the system with action computed at the previous iteration new_state, cost, truncated, terminated, _ = env.step(action) self.on_env_step(env, episode, timestep) # compute V(s+) and store transition new_action, solV = self.state_value( new_state, False, action_space=action_space ) if not self._try_store_experience(cost, solQ, solV): self.on_mpc_failure( episode, timestep, f"{solQ.status} (Q); {solV.status} (V)", raises ) # increase counters state = new_state action = new_action rewards += float(cost) timestep += 1 self.on_timestep_end(env, episode, timestep) return rewards
[docs] def train_one_rollout( self, rollout: Iterable[tuple[ObsType, ActType, float, ObsType]], episode: int, raises: bool = True, ) -> None: # in the case of off-policy q-learning, rollouts are made of # State-Action-Reward-next State (SARS) tuples for timestep, (state, action, cost, new_state) in enumerate(rollout, start=1): # compute Q(s,a) solQ = self.action_value(state, action) # compute V(s+) and store transition _, solV = self.state_value(new_state, False) if not self._try_store_experience(cost, solQ, solV): self.on_mpc_failure( episode, timestep, f"{solQ.status} (Q); {solV.status} (V)", raises ) self.on_timestep_end("off-policy", episode, timestep)
def _init_sensitivity( self, hessian_type: Literal["approx", "full"] ) -> Union[ Callable[[Solution], np.ndarray], Callable[[Solution], tuple[np.ndarray, float]], Callable[[Solution], tuple[np.ndarray, np.ndarray]], ]: """Internal utility to compute the derivative of ``Q(s,a)`` w.r.t. the learnable parameters, a.k.a., ``theta``.""" ord = self.optimizer.order nlp = self._Q.nlp theta = cs.vvcat([nlp.parameters[p] for p in self._learnable_pars]) x = nlp.x p = nlp.p lam_g_and_h = cs.vertcat(nlp.lam_g, nlp.lam_h) # compute first order sensitivity - necessary whatever the hessian type is snlp = NlpSensitivity(nlp, theta) gradient = snlp.jacobian("L-p") # exact gradient, i.e., dQ/dtheta if ord == 1: sensitivity = cs.Function( "lag_sens", [x, p, lam_g_and_h], [gradient], ["x", "p", "lam_g"], ["dQ"], {"cse": True}, ) elif hessian_type == "approx": hessian = snlp.hessian("L-pp") # approximate hessian # check if the hessian is not all zeros. If that's the case, we fall back to # computing just the gradient if hessian.nnz() > 0: sensitivity = cs.Function( "lag_sens", [x, p, lam_g_and_h], [gradient, hessian], ["x", "p", "lam_g"], ["dQ", "ddQ"], {"cse": True}, ) else: sensitivity = cs.Function( "lag_sens", [x, p, lam_g_and_h], [gradient], ["x", "p", "lam_g"], ["dQ"], {"cse": True}, ) else: lam_lbx_and_ubx = cs.vertcat(nlp.lam_lbx, nlp.lam_ubx) Kp = snlp.jacobian("K-p") Ky = snlp.jacobian("K-y") dydtheta = -cs.solve(Ky, Kp) Lpy = cs.jacobian(gradient, nlp.primal_dual) hessian = snlp.hessian("L-pp") + Lpy @ dydtheta # not sure if Lpy or Kp.T # check if the hessian is not all zeros. If that's the case, we fall back to # computing just the gradient if hessian.nnz() > 0: sensitivity = cs.Function( "lag_sens", [x, p, lam_g_and_h, lam_lbx_and_ubx], [gradient, hessian], ["x", "p", "lam_g", "lam_x"], ["dQ", "ddQ"], {"cse": True}, ) else: sensitivity = cs.Function( "lag_sens", [x, p, lam_g_and_h], # should lam_lbx_and_ubx be included? [gradient], ["x", "p", "lam_g"], ["dQ"], {"cse": True}, ) # convenience partial to avoid local lambdas return_zero_hessian = sensitivity.n_out() == 1 and ord > 1 return partial(_sol_sensitivities, sensitivity, return_zero_hessian) def _try_store_experience( self, cost: SupportsFloat, solQ: Solution[SymType], solV: Solution[SymType] ) -> bool: """Internal utility that tries to store the gradient and hessian for the current transition in memory, if both ``V`` and ``Q`` were successful; otherwise, does not store it. Returns whether it was successful or not.""" success = solQ.success and solV.success if success: td_error = cost + self.discount_factor * solV.f - solQ.f sensitivities = self._sensitivity(solQ) if self.optimizer.order == 1: dQ = sensitivities gradient = -td_error * dQ self.store_experience(gradient) else: dQ, ddQ = sensitivities gradient = -td_error * dQ hessian = np.multiply.outer(dQ, dQ) - td_error * ddQ self.store_experience((gradient, hessian)) else: td_error = np.nan if self.td_errors is not None: self.td_errors.append(td_error) return success
def _sol_sensitivities( sens: cs.Function, return_zero_hessian: bool, s: Solution ) -> Union[np.ndarray, tuple[np.ndarray, float], tuple[np.ndarray, np.ndarray]]: """Internal utility to compute sensitivities.""" out = ( sens(s.x, s.p, s.lam_g_and_h) if sens.n_in() == 3 else sens(s.x, s.p, s.lam_g_and_h, s.lam_lbx_and_ubx) ) if sens.n_out() == 1: J = np.asarray(out.elements()) if return_zero_hessian: return J, 0.0 return J J, H = out hessian_shape = sens.size_out("ddQ") return np.asarray(J.elements()), np.reshape(H.elements(), hessian_shape, "F")