Note
Go to the end to download the full example code.
On-policy Deterministic Policy Gradient#
This example takes inspiration from the linear MPC numerical experiment in [5], but applies DPG to it instead of Q-learning. We are given an RL environment whose cost function is
where \(s\) is the state, \(a\) is the action, \(w\) is a weight vector, and \(\underline{s}\) and \(\overline{s}\) are the lower and upper bounds of the state, respectively. The dynamics of the real environment are
where \(e \sim \mathcal{U}(-0.1, 0)\). Given the state \(s_k\), the following MPC scheme is used to control the system
with \(\gamma = 0.9\), and the learnable parameters are
The parameters are initialized differently, and in particular, the prediction model of the MPC is initialized wrongly as
and \(S\) is the solution to the corresponding discrete-time algebraic Riccati equation, i.e., computed with the wrong dynamics matrices. The task is simple: find a parametrization \(\theta\) such that the cost function is minimized. To solve it, we will employ a first-order LSTD DPG algorithm.
import logging
from typing import Any, Optional
import casadi as cs
import gymnasium as gym
import numpy as np
import numpy.typing as npt
from csnlp import Nlp
from csnlp.wrappers import Mpc
from gymnasium.spaces import Box
from gymnasium.wrappers import TimeLimit
from mpcrl import (
LearnableParameter,
LearnableParametersDict,
LstdDpgAgent,
UpdateStrategy,
)
from mpcrl import exploration as E
from mpcrl.optim import GradientDescent
from mpcrl.util.control import dlqr
from mpcrl.wrappers.agents import Log, RecordUpdates
from mpcrl.wrappers.envs import MonitorEpisodes
Defining the environment#
First things first, we need to build the environment. We will use the gymnasium
library to do so. The most important methods are gymnasium.Env.reset and
gymnasium.Env.step, which will be called to reset the environment to its
initial state and to step the dynamics and receive a realization of the reward signal,
respectively. The environment is defined as a the following class.
class LtiSystem(gym.Env[npt.NDArray[np.floating], float]):
"""A simple discrete-time LTI system affected by uniform noise."""
nx = 2 # number of states
nu = 1 # number of inputs
A = np.asarray([[0.9, 0.35], [0, 1.1]]) # state-space matrix A
B = np.asarray([[0.0813], [0.2]]) # state-space matrix B
x_bnd = (np.asarray([[0], [-1]]), np.asarray([[1], [1]])) # bounds of state
a_bnd = (-1, 1) # bounds of control input
w = np.asarray([[1e2], [1e2]]) # penalty weight for bound violations
e_bnd = (-1e-1, 0) # uniform noise bounds
# extremely recommended to bound the action space with additive exploration so that
# we can clip the action before applying it to the system
action_space = Box(*a_bnd, (nu,), np.float64)
def reset(
self,
*,
seed: Optional[int] = None,
options: Optional[dict[str, Any]] = None,
) -> tuple[npt.NDArray[np.floating], dict[str, Any]]:
"""Resets the state of the LTI system."""
super().reset(seed=seed, options=options)
self.x = np.asarray([0, 0.15]).reshape(self.nx, 1)
return self.x, {}
def get_stage_cost(self, state: npt.NDArray[np.floating], action: float) -> float:
"""Computes the stage cost :math:`L(s,a)`."""
lb, ub = self.x_bnd
return (
0.5
* (
np.square(state).sum()
+ 0.5 * action**2
+ self.w.T @ np.maximum(0, lb - state)
+ self.w.T @ np.maximum(0, state - ub)
).item()
)
def step(
self, action: cs.DM
) -> tuple[npt.NDArray[np.floating], float, bool, bool, dict[str, Any]]:
"""Steps the LTI system."""
action = np.asarray(action).item()
x_new = self.A @ self.x + self.B * action
x_new[0] += self.np_random.uniform(*self.e_bnd)
r = self.get_stage_cost(self.x, action)
self.x = x_new
return x_new, r, False, False, {}
Defining the MPC controller#
The second component is the MPC controller. We’ll create a custom that, of course,
inherits from csnlp.wrappers.Mpc. The implementation is as follows, and it is
in line with the theory presented above.
class LinearMpc(Mpc[cs.SX]):
"""A simple linear MPC controller."""
horizon = 10
discount_factor = 0.9
learnable_pars_init = {
"V0": np.asarray([0, 0]),
"x_lb": np.asarray([0, 0]),
"x_ub": np.asarray([1, 0]),
"b": np.zeros(LtiSystem.nx),
"f": np.zeros(LtiSystem.nx + LtiSystem.nu),
"A": np.asarray([[1, 0.25], [0, 1]]),
"B": np.asarray([[0.0312], [0.25]]),
}
def __init__(self) -> None:
N = self.horizon
gamma = self.discount_factor
w = LtiSystem.w
nx, nu = LtiSystem.nx, LtiSystem.nu
x_bnd, a_bnd = LtiSystem.x_bnd, LtiSystem.a_bnd
nlp = Nlp[cs.SX]()
super().__init__(nlp, N)
# parameters
V0 = self.parameter("V0", (nx,))
x_lb = self.parameter("x_lb", (nx,))
x_ub = self.parameter("x_ub", (nx,))
b = self.parameter("b", (nx, 1))
f = self.parameter("f", (nx + nu, 1))
A = self.parameter("A", (nx, nx))
B = self.parameter("B", (nx, nu))
# variables (state, action, slack)
x, _ = self.state("x", nx, bound_initial=False)
u, _ = self.action("u", nu, lb=a_bnd[0], ub=a_bnd[1])
s, _, _ = self.variable("s", (nx, N), lb=0)
# dynamics
self.set_affine_dynamics(A, B, c=b)
# other constraints
self.constraint("x_lb", x_bnd[0] + x_lb - s, "<=", x[:, 1:])
self.constraint("x_ub", x[:, 1:], "<=", x_bnd[1] + x_ub + s)
# objective
A_init, B_init = self.learnable_pars_init["A"], self.learnable_pars_init["B"]
S = cs.DM(dlqr(A_init, B_init, 0.5 * np.eye(nx), 0.25 * np.eye(nu))[1])
gammapowers = cs.DM(gamma ** np.arange(N)).T
self.minimize(
V0.T @ x[:, 0] # to have a derivative, V0 must be a function of the state
+ cs.bilin(S, x[:, -1])
+ cs.sum2(f.T @ cs.vertcat(x[:, :-1], u))
+ 0.5
* cs.sum2(
gammapowers * (cs.sum1(x[:, :-1] ** 2) + 0.5 * cs.sum1(u**2) + w.T @ s)
)
)
# solver
opts = {
"expand": True,
"print_time": False,
"bound_consistency": True,
"calc_lam_p": False,
"fatrop": {"max_iter": 500, "print_level": 0},
}
self.init_solver(opts, solver="fatrop", type="nlp")
Simulation#
So far, we have only defined the classes for the environment and the MPC controller. Now, it is time to instantiate these and run the simulation. This is comprised of multiple steps, which are detailed below.
We instantiate the environment. Note how it is wrapped in two different wrappers:
gymnasium.wrappers.TimeLimitis used to impose a maximum amount of steps to be simulated, whereasmpcrl.wrappers.envs.MonitorEpisodesis used to record the state, action and reward signals at each time step for plotting purposes.We instantiate the MPC controller and define its learnable parameters.
We instantiate the DPG agent. We pass different options to it, such as the update strategy, the optimizer, the Hessian type, etc. For plotting purposes, it is also wrapped such that the updated parameters are recorded. And we also log the progress of the simulation.
We run the simulation. Under the hood, the agent will interact with the environment, collect data, and update the parameters of the MPC controller.
Finally, we plot the results. The first plot shows the evolution of the states and the control action, and the corresponding bounds. The second plot shows the performance per rollout, the norm of the estimated policy gradient per rollout, and time-wise stage cost realizations. The last plot shows how each learnable parameter evolves over time.
if __name__ == "__main__":
# instantiate the env and wrap it - since we will train for only one long episode,
# tell the DPG agent to perform its LSTD computations over subtrajectories of length
# 100.
env = MonitorEpisodes(TimeLimit(LtiSystem(), max_episode_steps=5_000))
rollout_length = 100
# now build the MPC and the dict of learnable parameters
mpc = LinearMpc()
learnable_pars = LearnableParametersDict(
(
LearnableParameter(name, val.shape, val)
for name, val in mpc.learnable_pars_init.items()
)
)
# build and wrap appropriately the agent
agent = Log(
RecordUpdates(
LstdDpgAgent(
mpc=mpc,
learnable_parameters=learnable_pars,
discount_factor=mpc.discount_factor,
optimizer=GradientDescent(learning_rate=1e-6),
update_strategy=UpdateStrategy(rollout_length, "on_timestep_end"),
rollout_length=rollout_length,
exploration=E.OrnsteinUhlenbeckExploration(0.0, 0.05, mode="additive"),
record_policy_performance=True,
record_policy_gradient=True,
)
),
level=logging.DEBUG,
log_frequencies={"on_timestep_end": 1000},
)
# launch the training simulation
agent.train(env=env, episodes=1, seed=69)
# plot the results
import matplotlib.pyplot as plt
X = env.get_wrapper_attr("observations")[0].squeeze().T
U = env.get_wrapper_attr("actions")[0].squeeze()
R = env.get_wrapper_attr("rewards")[0]
_, axs = plt.subplots(3, 1, constrained_layout=True, sharex=True)
axs[0].plot(X[0])
axs[1].plot(X[1])
axs[2].plot(U)
for i in range(2):
axs[0].axhline(env.get_wrapper_attr("x_bnd")[i][0], color="r")
axs[1].axhline(env.get_wrapper_attr("x_bnd")[i][1], color="r")
axs[2].axhline(env.get_wrapper_attr("a_bnd")[i], color="r")
axs[0].set_ylabel("$s_1$")
axs[1].set_ylabel("$s_2$")
axs[2].set_ylabel("$a$")
_, axs = plt.subplots(3, 1, constrained_layout=True)
axs[0].plot(agent.policy_performances)
axs[1].semilogy(np.linalg.norm(agent.policy_gradients, axis=1))
axs[2].semilogy(R, "o", markersize=1)
axs[0].set_ylabel(r"$J(\pi_\theta)$")
axs[1].set_ylabel(r"$||\nabla_\theta J(\pi_\theta)||$")
axs[2].set_ylabel("$L$")
_, axs = plt.subplots(3, 2, constrained_layout=True, sharex=True)
axs[0, 0].plot(np.asarray(agent.updates_history["b"]))
axs[0, 1].plot(
np.stack(
[np.asarray(agent.updates_history[n])[:, 0] for n in ("x_lb", "x_ub")], -1
),
)
axs[1, 0].plot(np.asarray(agent.updates_history["f"]))
axs[1, 1].plot(np.asarray(agent.updates_history["V0"]))
axs[2, 0].plot(np.asarray(agent.updates_history["A"]).reshape(-1, 4))
axs[2, 1].plot(np.asarray(agent.updates_history["B"]).squeeze())
axs[0, 0].set_ylabel("$b$")
axs[0, 1].set_ylabel("$x_1$")
axs[1, 0].set_ylabel("$f$")
axs[1, 1].set_ylabel("$V_0$")
axs[2, 0].set_ylabel("$A$")
axs[2, 1].set_ylabel("$B$")
plt.show()
LstdDpgAgent0@2026-03-11,08:57:30> training of <MonitorEpisodes<TimeLimit<LtiSystem instance>>> started.
/home/docs/checkouts/readthedocs.org/user_builds/mpc-reinforcement-learning/envs/latest/lib/python3.14/site-packages/mpcrl/agents/lstd_dpg.py:378: NumbaTypeSafetyWarning: unsafe cast from uint64 to int64. Precision may be lost.
s, e, cost, _, sol_val = rollout[i]
LstdDpgAgent0@2026-03-11,08:57:46> episode 0 stepped at time 1000.
LstdDpgAgent0@2026-03-11,08:57:50> episode 0 stepped at time 2000.
LstdDpgAgent0@2026-03-11,08:57:54> episode 0 stepped at time 3000.
LstdDpgAgent0@2026-03-11,08:57:58> episode 0 stepped at time 4000.
LstdDpgAgent0@2026-03-11,08:58:02> episode 0 stepped at time 5000.
LstdDpgAgent0@2026-03-11,08:58:02> training of <MonitorEpisodes<TimeLimit<LtiSystem instance>>> concluded with returns=[899.401].
Total running time of the script: (0 minutes 34.384 seconds)
Estimated memory usage: 351 MB


